/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.runtime.io.gates;

import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.channels.InputChannel;
import eu.stratosphere.runtime.io.gates.Gate;
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.runtime.io.gates.RecordAvailabilityListener;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Gate that groups a set of {@link InputChannel}s and lets a consumer read records from
 * whichever channel has announced available data.
 *
 * <p>Channels announce data through {@link #notifyRecordIsAvailable(int)}; the announced
 * channel indices are queued and consumed by {@link #readRecord(IOReadableWritable)}.
 * All buffer-related calls are delegated to a {@link LocalBufferPool} that is created in
 * {@link #registerGlobalBufferPool(GlobalBufferPool)}; that method must therefore have been
 * called before any of the buffer methods are used.
 *
 * @param <T> the type of record read through this gate
 */
public class InputGate<T extends IOReadableWritable>
extends Gate<T>
implements BufferProvider,
LocalBufferPoolOwner {
    private static final Log LOG = LogFactory.getLog(InputGate.class);

    /** The channels of this gate; assigned in {@link #initializeChannels(GateDeploymentDescriptor)}. */
    private InputChannel<T>[] channels;

    /** Indices of channels that have announced available data, in order of announcement. */
    private final BlockingQueue<Integer> availableChannels = new LinkedBlockingQueue<Integer>();

    /** The optional listener informed about newly available records; can be set at most once. */
    private final AtomicReference<RecordAvailabilityListener<T>> recordAvailabilityListener =
            new AtomicReference<RecordAvailabilityListener<T>>(null);

    /** The event picked up by the most recent read that returned {@code TASK_EVENT}, if not yet fetched. */
    private AbstractTaskEvent currentEvent;

    /** Cached result of {@link #isClosed()} once every channel has reported closed. */
    private boolean isClosed = false;

    /** Index of the channel currently being read, or -1 if a new channel has to be picked. */
    private int channelToReadFrom = -1;

    /** The local buffer pool backing this gate; assigned in {@link #registerGlobalBufferPool(GlobalBufferPool)}. */
    private LocalBufferPool bufferPool;

    /**
     * Creates an input gate without channels; the arguments are passed on unchanged to the
     * {@link Gate} constructor.
     *
     * @param jobID the ID of the job
     * @param gateID the ID of the gate
     * @param index the index of the gate
     */
    public InputGate(JobID jobID, GateID gateID, int index) {
        super(jobID, gateID, index);
    }

    /**
     * Creates one {@link InputChannel} per channel descriptor of the given gate descriptor and
     * adopts the descriptor's channel type.
     *
     * @param inputGateDescriptor the descriptor to create the channels from
     */
    @SuppressWarnings("unchecked") // generic array creation is only possible through the raw type
    public void initializeChannels(GateDeploymentDescriptor inputGateDescriptor) {
        final int numChannels = inputGateDescriptor.getNumberOfChannelDescriptors();
        this.channels = new InputChannel[numChannels];
        this.setChannelType(inputGateDescriptor.getChannelType());
        for (int i = 0; i < numChannels; ++i) {
            ChannelDeploymentDescriptor cdd = inputGateDescriptor.getChannelDescriptor(i);
            this.channels[i] = new InputChannel<T>(this, i, cdd.getInputChannelID(), cdd.getOutputChannelID(), this.getChannelType());
        }
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isInputGate() {
        return true;
    }

    /**
     * @return the number of channels of this gate
     */
    public int getNumberOfInputChannels() {
        return this.channels.length;
    }

    /**
     * @param pos the index of the requested channel
     * @return the channel stored at the given index
     */
    public InputChannel<T> getInputChannel(int pos) {
        return this.channels[pos];
    }

    /**
     * @return the internal channel array itself (not a copy)
     */
    public InputChannel<T>[] channels() {
        return this.channels;
    }

    /**
     * Reads the next record from the channel currently being read, first waiting for a channel
     * to announce data if no channel is selected.
     *
     * @param target the record instance handed to the channel's {@code readRecord} call
     * @return the result reported by the channel, except that the channel's {@code END_OF_STREAM}
     *         is reported as {@code NONE} as long as not all channels of this gate are closed
     * @throws IOException forwarded from the channel or from {@link #isClosed()}
     * @throws InterruptedException if the calling thread is interrupted before or while waiting
     *         for an available channel
     */
    public InputChannelResult readRecord(T target) throws IOException, InterruptedException {
        if (this.channelToReadFrom == -1) {
            if (this.isClosed()) {
                return InputChannelResult.END_OF_STREAM;
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            this.channelToReadFrom = this.waitForAnyChannelToBecomeAvailable();
        }

        // Resolve the channel once: channelToReadFrom can be reset to -1 through
        // notifyDataUnitConsumed(), and the event below must come from the channel just read.
        final InputChannel<T> channel = this.getInputChannel(this.channelToReadFrom);
        final InputChannelResult result = channel.readRecord(target);
        switch (result) {
            case INTERMEDIATE_RECORD_FROM_BUFFER: {
                // Keep the channel selected for the next call.
                return result;
            }
            case LAST_RECORD_FROM_BUFFER:
            case END_OF_SUPERSTEP:
            case NONE: {
                this.channelToReadFrom = -1;
                return result;
            }
            case TASK_EVENT: {
                this.currentEvent = channel.getCurrentEvent();
                this.channelToReadFrom = -1;
                return result;
            }
            case END_OF_STREAM: {
                this.channelToReadFrom = -1;
                // A single finished channel only ends the gate when all channels are closed.
                return this.isClosed() ? InputChannelResult.END_OF_STREAM : InputChannelResult.NONE;
            }
            default: {
                throw new RuntimeException("Unexpected input channel result: " + result);
            }
        }
    }

    /**
     * Returns the stored task event and clears it, so that a second call returns {@code null}
     * unless a new event has been read in between.
     *
     * @return the stored event, or {@code null} if there is none
     */
    public AbstractTaskEvent getCurrentEvent() {
        AbstractTaskEvent e = this.currentEvent;
        this.currentEvent = null;
        return e;
    }

    /**
     * Queues the given channel index as available and informs the registered
     * {@link RecordAvailabilityListener}, if any.
     *
     * @param channelIndex the index of the channel that has data available
     */
    public void notifyRecordIsAvailable(int channelIndex) {
        this.availableChannels.add(channelIndex);
        RecordAvailabilityListener<T> listener = this.recordAvailabilityListener.get();
        if (listener != null) {
            listener.reportRecordAvailability(this);
        }
    }

    /**
     * Blocks until a channel index has been queued by {@link #notifyRecordIsAvailable(int)}
     * and removes it from the queue.
     *
     * @return the index of the next available channel
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public int waitForAnyChannelToBecomeAvailable() throws InterruptedException {
        return this.availableChannels.take();
    }

    /**
     * Checks whether every channel of this gate reports closed. A positive result is cached,
     * so the channels are not queried again afterwards.
     *
     * @return {@code true} if all channels are closed, {@code false} otherwise
     */
    @Override
    public boolean isClosed() throws IOException, InterruptedException {
        if (this.isClosed) {
            return true;
        }
        for (InputChannel<T> inputChannel : this.channels) {
            if (!inputChannel.isClosed()) {
                return false;
            }
        }
        this.isClosed = true;
        return true;
    }

    /**
     * Calls {@code close()} on every channel of this gate, in index order.
     */
    public void close() throws IOException, InterruptedException {
        for (InputChannel<T> inputChannel : this.channels) {
            inputChannel.close();
        }
    }

    @Override
    public String toString() {
        return "Input " + super.toString();
    }

    /**
     * Passes the given event to {@code transferEvent} of every channel of this gate.
     *
     * @param event the event to hand to the channels
     */
    @Override
    public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
        for (InputChannel<T> inputChannel : this.channels) {
            inputChannel.transferEvent(event);
        }
    }

    /**
     * Calls {@code releaseAllResources()} on every channel of this gate.
     */
    @Override
    public void releaseAllChannelResources() {
        for (InputChannel<T> inputChannel : this.channels) {
            inputChannel.releaseAllResources();
        }
    }

    /**
     * Registers the listener that is informed whenever a channel announces available data.
     *
     * @param listener the listener to register
     * @throws IllegalStateException if a listener has already been registered
     */
    public void registerRecordAvailabilityListener(RecordAvailabilityListener<T> listener) {
        if (!this.recordAvailabilityListener.compareAndSet(null, listener)) {
            throw new IllegalStateException(this.recordAvailabilityListener + " is already registered as a record availability listener");
        }
    }

    /**
     * Deselects the channel currently being read, so the next read waits for an available
     * channel again. The given index is not used.
     *
     * @param channelIndex the index of the channel whose data unit was consumed (ignored)
     */
    public void notifyDataUnitConsumed(int channelIndex) {
        this.channelToReadFrom = -1;
    }

    /** Delegates to the local buffer pool. */
    @Override
    public Buffer requestBuffer(int minBufferSize) throws IOException {
        return this.bufferPool.requestBuffer(minBufferSize);
    }

    /** Delegates to the local buffer pool. */
    @Override
    public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
        return this.bufferPool.requestBufferBlocking(minBufferSize);
    }

    /** Delegates to the local buffer pool. */
    @Override
    public int getBufferSize() {
        return this.bufferPool.getBufferSize();
    }

    /**
     * @return the same value as {@link #getNumberOfInputChannels()}
     */
    @Override
    public int getNumberOfChannels() {
        return this.getNumberOfInputChannels();
    }

    /** Forwards the designated number of buffers to the local buffer pool. */
    @Override
    public void setDesignatedNumberOfBuffers(int numBuffers) {
        this.bufferPool.setNumDesignatedBuffers(numBuffers);
    }

    /** Destroys the local buffer pool. */
    @Override
    public void clearLocalBufferPool() {
        this.bufferPool.destroy();
    }

    /**
     * Creates the local buffer pool of this gate on top of the given global pool.
     *
     * @param globalBufferPool the global pool the local pool is created from
     */
    @Override
    public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
        // NOTE(review): the second argument is presumably the initial number of designated
        // buffers - confirm against the LocalBufferPool constructor.
        this.bufferPool = new LocalBufferPool(globalBufferPool, 1);
    }

    /** Logs the available, requested and designated buffer counts of the local pool at info level. */
    @Override
    public void logBufferUtilization() {
        LOG.info(String.format("\t%s: %d available, %d requested, %d designated", this, this.bufferPool.numAvailableBuffers(), this.bufferPool.numRequestedBuffers(), this.bufferPool.numDesignatedBuffers()));
    }

    /** Delegates to the local buffer pool. */
    @Override
    public void reportAsynchronousEvent() {
        this.bufferPool.reportAsynchronousEvent();
    }

    /** Delegates to the local buffer pool. */
    @Override
    public BufferProvider.BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
        return this.bufferPool.registerBufferAvailabilityListener(listener);
    }
}

