/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.ListMemorySegmentSource;
import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.io.InputViewIterator;
import eu.stratosphere.pact.runtime.io.SpillingBuffer;
import eu.stratosphere.pact.runtime.task.util.CloseableInputProvider;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import java.util.ArrayList;

/**
 * Input provider that drains an input iterator into a {@link SpillingBuffer} on a background
 * thread and only hands out an iterator over the buffered records once the input has been
 * consumed completely.
 *
 * <p>Typical usage: construct, call {@link #startReading()}, call {@link #getIterator()} (which
 * blocks until the writer thread is finished or has failed), and finally call {@link #close()}
 * to stop the writer thread and give the memory pages back to the {@link MemoryManager}.
 *
 * <p>State shared between the caller and the writer thread ({@code exception},
 * {@code writingDone}, {@code closed}) is only modified while holding {@code lock}; waiters are
 * woken through {@code lock.notifyAll()}.
 *
 * @param <T> type of the records that are buffered
 */
public class TempBarrier<T>
implements CloseableInputProvider<T> {
    /** Buffer the writer thread serializes into and {@link #getIterator()} reads back from. */
    private final SpillingBuffer buffer;
    /** Serializer used by the reading side; the writer thread gets its own instance. */
    private final TypeSerializer<T> serializer;
    private final TempWritingThread tempWriter;
    private final MemoryManager memManager;
    /** Monitor guarding the state flags below and used for wait/notify. */
    private final Object lock = new Object();
    /** First failure of the writer thread, or a marker exception once the barrier is closed. */
    private volatile Throwable exception;
    /** Pages allocated in the constructor; also the segment source of {@link #buffer}. */
    private final ArrayList<MemorySegment> memory;
    private volatile boolean writingDone;
    private volatile boolean closed;

    /**
     * Allocates {@code numPages} memory pages for {@code owner} and prepares (but does not
     * start) the thread that will copy {@code input} into the buffer.
     *
     * @param owner task on whose behalf the memory pages are allocated
     * @param input records to buffer
     * @param serializerFactory source of the serializers; one instance is requested for the
     *        reading side and a second one for the writer thread
     * @param memManager memory manager to allocate pages from and release them to
     * @param ioManager I/O manager handed to the {@link SpillingBuffer}
     * @param numPages number of memory pages to allocate
     * @throws MemoryAllocationException if the memory manager cannot provide the pages
     */
    public TempBarrier(AbstractInvokable owner, MutableObjectIterator<T> input, TypeSerializerFactory<T> serializerFactory, MemoryManager memManager, IOManager ioManager, int numPages) throws MemoryAllocationException {
        this.serializer = serializerFactory.getSerializer();
        this.memManager = memManager;
        this.memory = new ArrayList<MemorySegment>(numPages);
        memManager.allocatePages(owner, this.memory, numPages);
        this.buffer = new SpillingBuffer(ioManager, new ListMemorySegmentSource(this.memory), memManager.getPageSize());
        this.tempWriter = new TempWritingThread(input, serializerFactory.getSerializer(), this.buffer);
    }

    /**
     * Starts the background thread that consumes the input and fills the buffer.
     */
    public void startReading() {
        this.tempWriter.start();
    }

    /**
     * Blocks until the writer thread has either consumed the whole input or failed, then
     * returns an iterator over the buffered records.
     *
     * @return iterator over the records that were written to the buffer
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException if flipping the buffer for reading fails
     * @throws RuntimeException if the writer thread failed or the barrier has been closed; the
     *         recorded exception is attached as the cause
     */
    @Override
    public MutableObjectIterator<T> getIterator() throws InterruptedException, IOException {
        synchronized (this.lock) {
            // The timeout is only a safety net; state changes are announced via notifyAll().
            while (this.exception == null && !this.writingDone) {
                this.lock.wait(5000L);
            }
        }
        if (this.exception != null) {
            throw new RuntimeException("An error occurred creating the temp table.", this.exception);
        }
        // The loop above only exits without an exception once writingDone is set, and neither
        // field is ever reset, so the buffer is complete at this point.
        DataInputView in = this.buffer.flip();
        return new InputViewIterator<T>(in, this.serializer);
    }

    /**
     * Stops the writer thread, waits for it to terminate and releases the buffer's segments and
     * the allocated pages. Calls after a successful close are no-ops.
     *
     * <p>After closing, {@link #getIterator()} fails because a marker exception is recorded
     * (unless a real failure was recorded earlier).
     *
     * @throws IOException if the calling thread is interrupted while waiting for the writer
     *         thread (the interrupt flag is restored and the barrier may be closed again), or if
     *         closing the buffer fails
     */
    @Override
    public void close() throws IOException {
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            // Mark as closed right away so that repeated or concurrent calls do not release the
            // buffer and the memory pages a second time.
            this.closed = true;
            if (this.exception == null) {
                this.exception = new Exception("The dam has been closed.");
            }
            this.lock.notifyAll();
        }
        try {
            this.tempWriter.shutdown();
            this.tempWriter.join();
        }
        catch (InterruptedException iex) {
            // Nothing has been released yet: allow a later close() to retry the cleanup.
            synchronized (this.lock) {
                this.closed = false;
            }
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted", iex);
        }
        try {
            this.memManager.release(this.buffer.close());
        }
        finally {
            // Hand back the remaining pages even if closing the buffer failed.
            this.memManager.release(this.memory);
        }
    }

    /**
     * Records a failure of the writer thread and wakes up anyone blocked in
     * {@link #getIterator()}.
     *
     * <p>Resource cleanup is deliberately left to {@link #close()}: this method runs on the
     * writer thread, and {@code close()} joins that thread, which a thread cannot usefully do
     * with itself.
     *
     * @param t the failure to report to readers
     */
    private void setException(Throwable t) {
        synchronized (this.lock) {
            this.exception = t;
            this.lock.notifyAll();
        }
    }

    /**
     * Marks the input as completely buffered and wakes up anyone blocked in
     * {@link #getIterator()}.
     */
    private void writingDone() {
        synchronized (this.lock) {
            this.writingDone = true;
            this.lock.notifyAll();
        }
    }

    /**
     * Daemon thread that pulls records from the input and serializes them into the buffer until
     * the input is exhausted or {@link #shutdown()} is called.
     */
    private final class TempWritingThread
    extends Thread {
        private final MutableObjectIterator<T> input;
        private final TypeSerializer<T> serializer;
        private final SpillingBuffer buffer;
        /** Cleared by {@link #shutdown()} to stop the copy loop. */
        private volatile boolean running;

        private TempWritingThread(MutableObjectIterator<T> input, TypeSerializer<T> serializer, SpillingBuffer buffer) {
            super("Temp writer");
            this.running = true;
            this.setDaemon(true);
            this.input = input;
            this.serializer = serializer;
            this.buffer = buffer;
        }

        /**
         * Copies the input into the buffer. On normal termination of the loop the barrier is
         * marked as done; any {@link Throwable} is reported through
         * {@code TempBarrier.setException}.
         */
        @Override
        public void run() {
            try {
                // The record instance is handed back to the iterator on every call.
                T record = this.serializer.createInstance();
                while (this.running && (record = this.input.next(record)) != null) {
                    this.serializer.serialize(record, this.buffer);
                }
                TempBarrier.this.writingDone();
            }
            catch (Throwable t) {
                TempBarrier.this.setException(t);
            }
        }

        /**
         * Asks the thread to stop: clears the running flag and interrupts the thread so that a
         * blocking call in the copy loop is aborted.
         */
        public void shutdown() {
            this.running = false;
            this.interrupt();
        }
    }
}

