/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.execution;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
import eu.stratosphere.nephele.execution.CancelTaskException;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.execution.ExecutionObserver;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.template.InputSplitProvider;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.runtime.io.channels.OutputChannel;
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.gates.InputGate;
import eu.stratosphere.runtime.io.gates.OutputGate;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.FutureTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Execution environment of a single task instance.
 * <p>
 * The environment instantiates the task's {@link AbstractInvokable}, keeps the input and output
 * gates that are registered through {@link #createAndRegisterInputGate()} and
 * {@link #createAndRegisterOutputGate()}, and, as a {@link Runnable}, reports execution state
 * changes to an {@link ExecutionObserver} while running the invokable (see {@link #run()}).
 * <p>
 * It additionally implements {@link BufferProvider} and {@link LocalBufferPoolOwner} by delegating
 * to a {@link LocalBufferPool} that is created in
 * {@link #registerGlobalBufferPool(GlobalBufferPool)}.
 */
public class RuntimeEnvironment
implements Environment,
BufferProvider,
LocalBufferPoolOwner,
Runnable {
    /** Logger used by this class. */
    private static final Log LOG = LogFactory.getLog(RuntimeEnvironment.class);
    /** Interval value of 100; presumably the polling sleep time in milliseconds - TODO confirm. */
    private static final int SLEEPINTERVAL = 100;
    /** Output gates in registration order; a gate's position is the index passed to its constructor. */
    private final List<OutputGate> outputGates = new CopyOnWriteArrayList<OutputGate>();
    /** Input gates in registration order; a gate's position is the index passed to its constructor. */
    private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList<InputGate<? extends IOReadableWritable>>();
    /** Queue served by getNextUnboundInputGateID(); no code in this class adds elements to it. */
    private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<GateID>();
    /** Memory manager handed in at construction time. */
    private final MemoryManager memoryManager;
    /** I/O manager handed in at construction time. */
    private final IOManager ioManager;
    /** Class of the invokable, taken from the task deployment descriptor. */
    private final Class<? extends AbstractInvokable> invokableClass;
    /** Invokable instance created from {@link #invokableClass} in the constructor. */
    private final AbstractInvokable invokable;
    /** ID of the job, taken from the task deployment descriptor. */
    private final JobID jobID;
    /** Job configuration, taken from the task deployment descriptor. */
    private final Configuration jobConfiguration;
    /** Task configuration, taken from the task deployment descriptor. */
    private final Configuration taskConfiguration;
    /** Input split provider handed in at construction time. */
    private final InputSplitProvider inputSplitProvider;
    /** Observer notified about state changes and user threads; null until setExecutionObserver() is called. */
    private volatile ExecutionObserver executionObserver = null;
    /** Thread that runs this environment; created lazily by getExecutingThread(). */
    private volatile Thread executingThread;
    /** Accumulator protocol proxy handed in at construction time. */
    private AccumulatorProtocol accumulatorProtocolProxy = null;
    /** Index of this subtask, taken from the task deployment descriptor (printed one-based in getTaskNameWithIndex()). */
    private final int indexInSubtaskGroup;
    /** Current number of subtasks, taken from the task deployment descriptor. */
    private final int currentNumberOfSubtasks;
    /** Name of the task, taken from the task deployment descriptor. */
    private final String taskName;
    /** Buffer pool backing the BufferProvider methods; null until registerGlobalBufferPool() is called. */
    private LocalBufferPool outputBufferPool;
    /** Copy tasks for cache files, keyed by name. */
    private final Map<String, FutureTask<Path>> cacheCopyTasks;
    /** Set to true by cancelExecution(); ends the polling loop in waitForInputChannelsToBeClosed(). */
    private volatile boolean canceled;

    /**
     * Creates the environment for the task described by the given deployment descriptor.
     * <p>
     * After all fields are assigned, the invokable is instantiated, bound to this environment and
     * asked to register its gates. The gates registered that way are then initialized, position by
     * position, from the gate descriptors of the deployment descriptor.
     *
     * @param tdd the deployment descriptor providing job ID, task name, invokable class,
     *        configurations, subtask numbers and gate descriptors
     * @param memoryManager the memory manager made available to the task
     * @param ioManager the I/O manager made available to the task
     * @param inputSplitProvider the input split provider made available to the task
     * @param accumulatorProtocolProxy the accumulator protocol proxy made available to the task
     * @param cpTasks the copy tasks for cache files, keyed by name
     * @throws Exception if the invokable cannot be instantiated or any setup step fails
     */
    public RuntimeEnvironment(TaskDeploymentDescriptor tdd, MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider, AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks) throws Exception {
        // Values read from the deployment descriptor.
        this.jobID = tdd.getJobID();
        this.taskName = tdd.getTaskName();
        this.invokableClass = tdd.getInvokableClass();
        this.jobConfiguration = tdd.getJobConfiguration();
        this.taskConfiguration = tdd.getTaskConfiguration();
        this.indexInSubtaskGroup = tdd.getIndexInSubtaskGroup();
        this.currentNumberOfSubtasks = tdd.getCurrentNumberOfSubtasks();

        // Services and helpers handed in by the caller.
        this.memoryManager = memoryManager;
        this.ioManager = ioManager;
        this.inputSplitProvider = inputSplitProvider;
        this.accumulatorProtocolProxy = accumulatorProtocolProxy;
        this.cacheCopyTasks = cpTasks;

        // The invokable gets a reference to this environment, so every field above must be set first.
        this.invokable = this.invokableClass.newInstance();
        this.invokable.setEnvironment(this);
        this.invokable.registerInputOutput();

        final int outputDescriptorCount = tdd.getNumberOfOutputGateDescriptors();
        for (int gateIndex = 0; gateIndex < outputDescriptorCount; gateIndex++) {
            final OutputGate outputGate = this.outputGates.get(gateIndex);
            outputGate.initializeChannels(tdd.getOutputGateDescriptor(gateIndex));
        }

        final int inputDescriptorCount = tdd.getNumberOfInputGateDescriptors();
        for (int gateIndex = 0; gateIndex < inputDescriptorCount; gateIndex++) {
            final InputGate<? extends IOReadableWritable> inputGate = this.inputGates.get(gateIndex);
            inputGate.initializeChannels(tdd.getInputGateDescriptor(gateIndex));
        }
    }

    /**
     * Returns the invokable instance that was created in the constructor.
     *
     * @return the invokable of this environment
     */
    public AbstractInvokable getInvokable() {
        return invokable;
    }

    /**
     * Returns the job ID taken from the task deployment descriptor.
     *
     * @return the ID of the job
     */
    @Override
    public JobID getJobID() {
        return jobID;
    }

    /**
     * Removes and returns the head of the queue of unbound input gate IDs.
     *
     * @return the next unbound input gate ID, or {@code null} if the queue is empty
     */
    @Override
    public GateID getNextUnboundInputGateID() {
        return unboundInputGateIDs.poll();
    }

    /**
     * Creates a new output gate with a fresh gate ID and appends it to the list of output gates.
     * The gate's index is the number of output gates registered before the call.
     *
     * @return the newly registered output gate
     */
    @Override
    public OutputGate createAndRegisterOutputGate() {
        final JobID owningJob = getJobID();
        final GateID freshGateId = new GateID();
        final int gateIndex = getNumberOfOutputGates();

        final OutputGate newGate = new OutputGate(owningJob, freshGateId, gateIndex);
        outputGates.add(newGate);
        return newGate;
    }

    /**
     * Runs the task: invokes the invokable, closes the gates and reports every state change
     * (RUNNING, FINISHING, FINISHED, or CANCELED / FAILED) through changeExecutionState().
     * <p>
     * NOTE(review): the execution observer is dereferenced here without a null check, although
     * changeExecutionState() treats a missing observer as legal - confirm that an observer is
     * always set before the thread is started.
     */
    @Override
    public void run() {
        // A missing invokable is only logged; execution continues past this check.
        if (this.invokable == null) {
            LOG.fatal((Object)"ExecutionEnvironment has no Invokable set");
        }
        this.changeExecutionState(ExecutionState.RUNNING, null);
        // Canceled before the user code started: report CANCELED and stop right away.
        if (this.executionObserver.isCanceled()) {
            this.changeExecutionState(ExecutionState.CANCELED, null);
            return;
        }
        try {
            // Run the user code with the class loader registered for this job as context class loader.
            ClassLoader cl = LibraryCacheManager.getClassLoader(this.jobID);
            Thread.currentThread().setContextClassLoader(cl);
            this.invokable.invoke();
            // A cancellation observed after invoke() returned is routed into the catch block below.
            if (this.executionObserver.isCanceled()) {
                throw new InterruptedException();
            }
        }
        catch (Throwable t) {
            // If the failure was not caused by a cancellation, ask the invokable to cancel itself;
            // errors raised by that call are logged and otherwise ignored.
            if (!this.executionObserver.isCanceled()) {
                try {
                    this.invokable.cancel();
                }
                catch (Throwable t2) {
                    LOG.error((Object)StringUtils.stringifyException((Throwable)t2));
                }
            }
            this.releaseAllChannelResources();
            // Cancellation (flagged by the observer or signalled via CancelTaskException) ends in
            // CANCELED, everything else in FAILED with the stringified exception as message.
            if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
                this.changeExecutionState(ExecutionState.CANCELED, null);
            } else {
                this.changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException((Throwable)t));
            }
            return;
        }
        // User code finished normally: close all gates and wait until the channels are closed.
        this.changeExecutionState(ExecutionState.FINISHING, null);
        try {
            this.closeInputGates();
            this.requestAllOutputGatesToClose();
            this.waitForInputChannelsToBeClosed();
            this.waitForOutputChannelsToBeClosed();
        }
        catch (Throwable t) {
            // Same CANCELED / FAILED decision as above, but without calling invokable.cancel().
            this.releaseAllChannelResources();
            if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
                this.changeExecutionState(ExecutionState.CANCELED, null);
            } else {
                this.changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException((Throwable)t));
            }
            return;
        }
        this.releaseAllChannelResources();
        this.changeExecutionState(ExecutionState.FINISHED, null);
    }

    /**
     * Creates a new input gate with a fresh gate ID and appends it to the list of input gates.
     * The gate's index is the number of input gates registered before the call.
     *
     * @param <T> the record type handled by the gate
     * @return the newly registered input gate
     */
    @Override
    public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
        // Parameterized with <T> instead of the raw type, so no unchecked conversion is needed
        // when the gate is returned to the caller.
        final InputGate<T> gate = new InputGate<T>(this.getJobID(), new GateID(), this.getNumberOfInputGates());
        this.inputGates.add(gate);
        return gate;
    }

    /**
     * Returns how many output gates are currently registered.
     *
     * @return the number of registered output gates
     */
    @Override
    public int getNumberOfOutputGates() {
        return outputGates.size();
    }

    /**
     * Returns how many input gates are currently registered.
     *
     * @return the number of registered input gates
     */
    @Override
    public int getNumberOfInputGates() {
        return inputGates.size();
    }

    /**
     * Sums up the channel counts of all registered output gates.
     *
     * @return the total number of output channels
     */
    @Override
    public int getNumberOfOutputChannels() {
        int channelTotal = 0;
        for (OutputGate gate : outputGates) {
            channelTotal += gate.getNumChannels();
        }
        return channelTotal;
    }

    /**
     * Sums up the channel counts of all registered input gates.
     *
     * @return the total number of input channels
     */
    @Override
    public int getNumberOfInputChannels() {
        int channelTotal = 0;
        for (InputGate<? extends IOReadableWritable> gate : inputGates) {
            channelTotal += gate.getNumberOfInputChannels();
        }
        return channelTotal;
    }

    /**
     * Returns the input gate registered at the given position.
     *
     * @param pos the position of the input gate
     * @return the input gate at position {@code pos}, or {@code null} if no gate is registered at
     *         that position (including negative positions)
     */
    public InputGate<? extends IOReadableWritable> getInputGate(int pos) {
        // Negative positions are treated like any other non-existing position instead of
        // surfacing as an IndexOutOfBoundsException from the list.
        if (pos >= 0 && pos < this.inputGates.size()) {
            return this.inputGates.get(pos);
        }
        return null;
    }

    /**
     * Returns the output gate registered at the given index.
     *
     * @param index the index of the output gate
     * @return the output gate at {@code index}, or {@code null} if no gate is registered at that
     *         index (including negative indices)
     */
    public OutputGate getOutputGate(int index) {
        // Negative indices are treated like any other non-existing index instead of surfacing
        // as an IndexOutOfBoundsException from the list.
        if (index >= 0 && index < this.outputGates.size()) {
            return this.outputGates.get(index);
        }
        return null;
    }

    /**
     * Returns the thread that executes this environment, creating it on the first call.
     * <p>
     * Creation happens while holding this object's monitor. The thread is named after
     * {@link #getTaskNameWithIndex()} unless the task name is {@code null}, in which case the
     * default thread name is used.
     *
     * @return the thread bound to this environment
     */
    public Thread getExecutingThread() {
        synchronized (this) {
            if (executingThread == null) {
                if (taskName == null) {
                    executingThread = new Thread(this);
                } else {
                    executingThread = new Thread(this, getTaskNameWithIndex());
                }
            }
            return executingThread;
        }
    }

    /**
     * Cancels the task.
     * <p>
     * The canceled flag is set, the invokable's {@code cancel()} method is called (errors are
     * logged), and the executing thread is interrupted. The method then waits up to five seconds
     * for the thread to terminate and afterwards keeps re-interrupting it once per second until
     * it has died.
     * <p>
     * If no executing thread has been created yet, only the flag is set and the invokable is
     * canceled. If the calling thread is interrupted while waiting, the wait continues and the
     * caller's interrupt status is restored before this method returns.
     */
    public void cancelExecution() {
        this.canceled = true;
        LOG.info("Canceling " + this.getTaskNameWithIndex());

        if (this.invokable != null) {
            try {
                this.invokable.cancel();
            }
            catch (Throwable e) {
                LOG.error("Error while cancelling the task.", e);
            }
        }

        // Read the volatile field once; the thread is created lazily and may not exist yet.
        final Thread thread = this.executingThread;
        if (thread == null) {
            return;
        }

        // Remember interrupts received while waiting so they can be re-asserted at the end;
        // restoring the flag earlier would make every following join() fail immediately.
        boolean callerInterrupted = false;
        try {
            thread.interrupt();
            try {
                thread.join(5000L);
            }
            catch (InterruptedException e) {
                callerInterrupted = true;
            }

            while (thread.isAlive()) {
                LOG.warn("Task " + this.getTaskName() + " did not react to cancelling signal. Sending repeated interrupt.");
                if (LOG.isDebugEnabled()) {
                    final StringBuilder bld = new StringBuilder("Task ").append(this.getTaskName()).append(" is stuck in method:\n");
                    for (StackTraceElement frame : thread.getStackTrace()) {
                        bld.append(frame).append('\n');
                    }
                    LOG.debug(bld.toString());
                }
                thread.interrupt();
                try {
                    thread.join(1000L);
                }
                catch (InterruptedException e) {
                    callerInterrupted = true;
                }
            }
        }
        finally {
            if (callerInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Waits for every output gate to be closed, unless the observer reports that the task has
     * been canceled, in which case the method returns immediately.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for a gate
     */
    private void waitForOutputChannelsToBeClosed() throws InterruptedException {
        if (!executionObserver.isCanceled()) {
            for (OutputGate gate : outputGates) {
                gate.waitForGateToBeClosed();
            }
        }
    }

    /**
     * Polls the input gates until all of them report to be closed.
     * <p>
     * The loop ends without further checks once {@code cancelExecution()} has set the canceled
     * flag. Between two polls the thread sleeps for {@code SLEEPINTERVAL} milliseconds.
     *
     * @throws IOException if querying a gate's closed state fails
     * @throws InterruptedException if the observer reports a cancellation or the thread is
     *         interrupted while sleeping
     */
    private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
        while (!this.canceled) {
            if (this.executionObserver.isCanceled()) {
                throw new InterruptedException();
            }

            // Every gate is queried in each round, even after an open one has been found.
            boolean allClosed = true;
            for (int i = 0; i < this.getNumberOfInputGates(); ++i) {
                final InputGate<? extends IOReadableWritable> gate = this.inputGates.get(i);
                if (!gate.isClosed()) {
                    allClosed = false;
                }
            }
            if (allClosed) {
                break;
            }

            // Use the class constant instead of repeating its value as a literal.
            Thread.sleep(SLEEPINTERVAL);
        }
    }

    /**
     * Closes all registered input gates in registration order.
     *
     * @throws IOException if closing a gate fails
     * @throws InterruptedException if the thread is interrupted while closing a gate
     */
    private void closeInputGates() throws IOException, InterruptedException {
        for (InputGate<? extends IOReadableWritable> gate : inputGates) {
            gate.close();
        }
    }

    /**
     * Requests every registered output gate to close, in registration order.
     *
     * @throws IOException if a close request fails
     * @throws InterruptedException if the thread is interrupted while issuing a request
     */
    private void requestAllOutputGatesToClose() throws IOException, InterruptedException {
        for (OutputGate gate : outputGates) {
            gate.requestClose();
        }
    }

    /**
     * Returns the I/O manager that was passed to the constructor.
     *
     * @return the I/O manager of this environment
     */
    @Override
    public IOManager getIOManager() {
        return ioManager;
    }

    /**
     * Returns the memory manager that was passed to the constructor.
     *
     * @return the memory manager of this environment
     */
    @Override
    public MemoryManager getMemoryManager() {
        return memoryManager;
    }

    /**
     * Returns the task configuration taken from the task deployment descriptor.
     *
     * @return the configuration of the task
     */
    @Override
    public Configuration getTaskConfiguration() {
        return taskConfiguration;
    }

    /**
     * Returns the job configuration taken from the task deployment descriptor.
     *
     * @return the configuration of the job
     */
    @Override
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /**
     * Returns the current number of subtasks taken from the task deployment descriptor.
     *
     * @return the current number of subtasks
     */
    @Override
    public int getCurrentNumberOfSubtasks() {
        return currentNumberOfSubtasks;
    }

    /**
     * Returns the index of this subtask taken from the task deployment descriptor.
     *
     * @return the index of this subtask within its subtask group
     */
    @Override
    public int getIndexInSubtaskGroup() {
        return indexInSubtaskGroup;
    }

    /**
     * Reports a new execution state to the execution observer, if one is set.
     *
     * @param newExecutionState the state the task has entered
     * @param optionalMessage an optional message describing the change, may be {@code null}
     */
    private void changeExecutionState(ExecutionState newExecutionState, String optionalMessage) {
        // Read the volatile field once so that a concurrent setExecutionObserver(null) cannot
        // slip in between the null check and the call.
        final ExecutionObserver observer = this.executionObserver;
        if (observer != null) {
            observer.executionStateChanged(newExecutionState, optionalMessage);
        }
    }

    /**
     * Returns the task name taken from the task deployment descriptor.
     *
     * @return the name of the task, possibly {@code null}
     */
    @Override
    public String getTaskName() {
        return taskName;
    }

    /**
     * Returns the task name followed by the one-based subtask index and the current number of
     * subtasks, formatted as {@code name (index/total)}.
     *
     * @return the task name decorated with its subtask position
     */
    public String getTaskNameWithIndex() {
        final int oneBasedIndex = getIndexInSubtaskGroup() + 1;
        final int subtaskTotal = getCurrentNumberOfSubtasks();
        return String.format("%s (%d/%d)", taskName, oneBasedIndex, subtaskTotal);
    }

    /**
     * Sets the observer that is notified about execution state changes and user threads.
     *
     * @param executionObserver the observer to use; stored as is, without a null check
     */
    public void setExecutionObserver(ExecutionObserver executionObserver) {
        this.executionObserver = executionObserver;
    }

    /**
     * Returns the input split provider that was passed to the constructor.
     *
     * @return the input split provider of this environment
     */
    @Override
    public InputSplitProvider getInputSplitProvider() {
        return inputSplitProvider;
    }

    /**
     * Forwards the start of a user thread to the execution observer, if one is set.
     *
     * @param userThread the user thread that has been started
     */
    @Override
    public void userThreadStarted(Thread userThread) {
        // Read the volatile field once so that a concurrent setExecutionObserver(null) cannot
        // slip in between the null check and the call.
        final ExecutionObserver observer = this.executionObserver;
        if (observer != null) {
            observer.userThreadStarted(userThread);
        }
    }

    /**
     * Forwards the termination of a user thread to the execution observer, if one is set.
     *
     * @param userThread the user thread that has finished
     */
    @Override
    public void userThreadFinished(Thread userThread) {
        // Read the volatile field once so that a concurrent setExecutionObserver(null) cannot
        // slip in between the null check and the call.
        final ExecutionObserver observer = this.executionObserver;
        if (observer != null) {
            observer.userThreadFinished(userThread);
        }
    }

    /**
     * Asks every input gate and then every output gate to release all of its channel resources.
     */
    private void releaseAllChannelResources() {
        for (InputGate<? extends IOReadableWritable> inputGate : inputGates) {
            inputGate.releaseAllChannelResources();
        }
        for (OutputGate outputGate : outputGates) {
            outputGate.releaseAllChannelResources();
        }
    }

    /**
     * Collects the IDs of all channels of all registered output gates.
     *
     * @return an unmodifiable set with the IDs of all output channels
     */
    @Override
    public Set<ChannelID> getOutputChannelIDs() {
        final Set<ChannelID> channelIds = new HashSet<ChannelID>();
        for (OutputGate outputGate : outputGates) {
            for (OutputChannel outputChannel : outputGate.channels()) {
                final ChannelID id = outputChannel.getID();
                channelIds.add(id);
            }
        }
        return Collections.unmodifiableSet(channelIds);
    }

    /**
     * Collects the IDs of all channels of all registered input gates.
     *
     * @return an unmodifiable set with the IDs of all input channels
     */
    @Override
    public Set<ChannelID> getInputChannelIDs() {
        final Set<ChannelID> channelIds = new HashSet<ChannelID>();
        for (InputGate<? extends IOReadableWritable> inputGate : inputGates) {
            for (int channelIndex = 0; channelIndex < inputGate.getNumberOfInputChannels(); channelIndex++) {
                channelIds.add(inputGate.getInputChannel(channelIndex).getID());
            }
        }
        return Collections.unmodifiableSet(channelIds);
    }

    /**
     * Collects the gate IDs of all registered input gates.
     *
     * @return an unmodifiable set with the IDs of all input gates
     */
    @Override
    public Set<GateID> getInputGateIDs() {
        final Set<GateID> gateIds = new HashSet<GateID>();
        for (InputGate<? extends IOReadableWritable> inputGate : inputGates) {
            gateIds.add(inputGate.getGateID());
        }
        return Collections.unmodifiableSet(gateIds);
    }

    /**
     * Collects the gate IDs of all registered output gates.
     *
     * @return an unmodifiable set with the IDs of all output gates
     */
    @Override
    public Set<GateID> getOutputGateIDs() {
        final Set<GateID> gateIds = new HashSet<GateID>();
        for (OutputGate outputGate : outputGates) {
            gateIds.add(outputGate.getGateID());
        }
        return Collections.unmodifiableSet(gateIds);
    }

    /**
     * Collects the channel IDs of the output gate with the given gate ID. If several gates carry
     * that ID, the first one in registration order is used.
     *
     * @param gateID the ID of the output gate to look up
     * @return an unmodifiable set with the IDs of that gate's channels
     * @throws IllegalArgumentException if no registered output gate has the given ID
     */
    @Override
    public Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID) {
        for (OutputGate gate : outputGates) {
            if (gate.getGateID().equals(gateID)) {
                final Set<ChannelID> channelIds = new HashSet<ChannelID>();
                for (int channelIndex = 0; channelIndex < gate.getNumChannels(); channelIndex++) {
                    channelIds.add(gate.getChannel(channelIndex).getID());
                }
                return Collections.unmodifiableSet(channelIds);
            }
        }
        throw new IllegalArgumentException("Cannot find output gate with ID " + gateID);
    }

    /**
     * Collects the channel IDs of the input gate with the given gate ID. If several gates carry
     * that ID, the first one in registration order is used.
     *
     * @param gateID the ID of the input gate to look up
     * @return an unmodifiable set with the IDs of that gate's channels
     * @throws IllegalArgumentException if no registered input gate has the given ID
     */
    @Override
    public Set<ChannelID> getInputChannelIDsOfGate(GateID gateID) {
        for (InputGate<? extends IOReadableWritable> gate : inputGates) {
            if (gate.getGateID().equals(gateID)) {
                final Set<ChannelID> channelIds = new HashSet<ChannelID>();
                for (int channelIndex = 0; channelIndex < gate.getNumberOfInputChannels(); channelIndex++) {
                    channelIds.add(gate.getInputChannel(channelIndex).getID());
                }
                return Collections.unmodifiableSet(channelIds);
            }
        }
        throw new IllegalArgumentException("Cannot find input gate with ID " + gateID);
    }

    /**
     * Returns the internal list of output gates. The list itself is returned, not a copy.
     *
     * @return the registered output gates in registration order
     */
    public List<OutputGate> outputGates() {
        return outputGates;
    }

    /**
     * Returns the internal list of input gates. The list itself is returned, not a copy.
     *
     * @return the registered input gates in registration order
     */
    public List<InputGate<? extends IOReadableWritable>> inputGates() {
        return inputGates;
    }

    /**
     * Returns the accumulator protocol proxy that was passed to the constructor.
     *
     * @return the accumulator protocol proxy of this environment
     */
    @Override
    public AccumulatorProtocol getAccumulatorProtocolProxy() {
        return accumulatorProtocolProxy;
    }

    /**
     * Stores a copy task under the given name in the map of cache copy tasks, replacing any task
     * previously stored under that name.
     *
     * @param name the name to store the task under
     * @param copyTask the task that yields the path of the copied file
     */
    public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
        cacheCopyTasks.put(name, copyTask);
    }

    /**
     * Returns the map of cache copy tasks. The map itself is returned, not a copy.
     *
     * @return the copy tasks keyed by name
     */
    @Override
    public Map<String, FutureTask<Path>> getCopyTask() {
        return cacheCopyTasks;
    }

    /**
     * Returns the buffer provider for the output side, which is this environment itself.
     *
     * @return this environment
     */
    @Override
    public BufferProvider getOutputBufferProvider() {
        final BufferProvider self = this;
        return self;
    }

    /**
     * Requests a buffer from the output buffer pool. The pool is only created by
     * {@link #registerGlobalBufferPool(GlobalBufferPool)}, which therefore has to be called first.
     *
     * @param minBufferSize the minimum buffer size, passed through to the pool
     * @return whatever the pool's {@code requestBuffer} call returns
     * @throws IOException if the pool reports an I/O problem
     */
    @Override
    public Buffer requestBuffer(int minBufferSize) throws IOException {
        final Buffer buffer = outputBufferPool.requestBuffer(minBufferSize);
        return buffer;
    }

    /**
     * Forwards a blocking buffer request to the output buffer pool. The pool is only created by
     * {@link #registerGlobalBufferPool(GlobalBufferPool)}, which therefore has to be called first.
     *
     * @param minBufferSize the minimum buffer size, passed through to the pool
     * @return whatever the pool's {@code requestBufferBlocking} call returns
     * @throws IOException if the pool reports an I/O problem
     * @throws InterruptedException if the pool's call is interrupted
     */
    @Override
    public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
        final Buffer buffer = outputBufferPool.requestBufferBlocking(minBufferSize);
        return buffer;
    }

    /**
     * Returns the buffer size reported by the output buffer pool.
     *
     * @return the pool's buffer size
     */
    @Override
    public int getBufferSize() {
        return outputBufferPool.getBufferSize();
    }

    /**
     * Registers the given listener with the output buffer pool.
     *
     * @param listener the listener to register
     * @return the registration result reported by the pool
     */
    @Override
    public BufferProvider.BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
        final BufferProvider.BufferAvailabilityRegistration registration =
            outputBufferPool.registerBufferAvailabilityListener(listener);
        return registration;
    }

    /**
     * Returns the number of channels of this buffer pool owner, which is the total number of
     * output channels.
     *
     * @return the same value as {@link #getNumberOfOutputChannels()}
     */
    @Override
    public int getNumberOfChannels() {
        final int outputChannelCount = getNumberOfOutputChannels();
        return outputChannelCount;
    }

    /**
     * Passes the designated number of buffers on to the output buffer pool.
     *
     * @param numBuffers the number of buffers designated to the pool
     */
    @Override
    public void setDesignatedNumberOfBuffers(int numBuffers) {
        outputBufferPool.setNumDesignatedBuffers(numBuffers);
    }

    /**
     * Destroys the output buffer pool by calling its {@code destroy()} method.
     */
    @Override
    public void clearLocalBufferPool() {
        outputBufferPool.destroy();
    }

    /**
     * Creates the output buffer pool on top of the given global buffer pool. Calls made after
     * the pool has been created have no effect.
     *
     * @param globalBufferPool the global pool passed to the new {@link LocalBufferPool}
     */
    @Override
    public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
        if (outputBufferPool != null) {
            // A pool already exists; keep it.
            return;
        }
        outputBufferPool = new LocalBufferPool(globalBufferPool, 1);
    }

    /**
     * Logs, at info level, the task name with index together with the available, requested and
     * designated buffer counts reported by the output buffer pool.
     */
    @Override
    public void logBufferUtilization() {
        final String utilization = String.format(
            "\t%s: %d available, %d requested, %d designated",
            getTaskNameWithIndex(),
            outputBufferPool.numAvailableBuffers(),
            outputBufferPool.numRequestedBuffers(),
            outputBufferPool.numDesignatedBuffers());
        LOG.info(utilization);
    }

    /**
     * Forwards the report of an asynchronous event to the output buffer pool.
     */
    @Override
    public void reportAsynchronousEvent() {
        outputBufferPool.reportAsynchronousEvent();
    }
}

