/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.runtime.io.api;

import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.runtime.io.gates.InputGate;
import eu.stratosphere.runtime.io.gates.RecordAvailabilityListener;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

public abstract class AbstractUnionRecordReader<T extends IOReadableWritable>
extends AbstractRecordReader
implements RecordAvailabilityListener<T> {
    private final InputGate<T>[] allInputGates;
    private final Set<InputGate<T>> remainingInputGates;
    private final ArrayDeque<InputGate<T>> availableInputGates = new ArrayDeque();
    private InputGate<T> nextInputGateToReadFrom;

    @Override
    public boolean isInputClosed() {
        return this.remainingInputGates.isEmpty();
    }

    protected AbstractUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
        if (recordReaders == null) {
            throw new IllegalArgumentException("Provided argument recordReaders is null");
        }
        if (recordReaders.length < 2) {
            throw new IllegalArgumentException("The mutable union record reader must at least be initialized with two individual mutable record readers");
        }
        this.allInputGates = new InputGate[recordReaders.length];
        this.remainingInputGates = new HashSet<InputGate<T>>((int)((float)recordReaders.length * 1.6f));
        for (int i = 0; i < recordReaders.length; ++i) {
            InputGate inputGate = recordReaders[i].getInputGate();
            inputGate.registerRecordAvailabilityListener(this);
            this.allInputGates[i] = inputGate;
            this.remainingInputGates.add(inputGate);
        }
    }

    @Override
    public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
        for (InputGate<T> gate : this.allInputGates) {
            gate.publishEvent(event);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reportRecordAvailability(InputGate<T> inputGate) {
        ArrayDeque<InputGate<T>> arrayDeque = this.availableInputGates;
        synchronized (arrayDeque) {
            this.availableInputGates.add(inputGate);
            this.availableInputGates.notifyAll();
        }
    }

    protected boolean getNextRecord(T target) throws IOException, InterruptedException {
        while (true) {
            if (this.nextInputGateToReadFrom == null) {
                if (this.remainingInputGates.isEmpty()) {
                    return false;
                }
                this.nextInputGateToReadFrom = this.getNextAvailableInputGate();
            }
            InputChannelResult result = this.nextInputGateToReadFrom.readRecord(target);
            switch (result) {
                case INTERMEDIATE_RECORD_FROM_BUFFER: {
                    return true;
                }
                case LAST_RECORD_FROM_BUFFER: {
                    this.nextInputGateToReadFrom = null;
                    return true;
                }
                case END_OF_SUPERSTEP: {
                    this.nextInputGateToReadFrom = null;
                    if (!this.incrementEndOfSuperstepEventAndCheck()) break;
                    return false;
                }
                case TASK_EVENT: {
                    this.handleEvent(this.nextInputGateToReadFrom.getCurrentEvent());
                    this.nextInputGateToReadFrom = null;
                    break;
                }
                case END_OF_STREAM: {
                    this.remainingInputGates.remove(this.nextInputGateToReadFrom);
                    this.nextInputGateToReadFrom = null;
                    break;
                }
                case NONE: {
                    this.nextInputGateToReadFrom = null;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private InputGate<T> getNextAvailableInputGate() throws InterruptedException {
        ArrayDeque<InputGate<T>> arrayDeque = this.availableInputGates;
        synchronized (arrayDeque) {
            while (this.availableInputGates.isEmpty()) {
                this.availableInputGates.wait();
            }
            return this.availableInputGates.pop();
        }
    }
}

