/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.iterative.task;

import eu.stratosphere.api.common.aggregators.Aggregator;
import eu.stratosphere.api.common.aggregators.LongSumAggregator;
import eu.stratosphere.api.common.functions.Function;
import eu.stratosphere.api.common.functions.IterationRuntimeContext;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.pact.runtime.hash.CompactingHashTable;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannel;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannelBroker;
import eu.stratosphere.pact.runtime.iterative.concurrent.Broker;
import eu.stratosphere.pact.runtime.iterative.concurrent.IterationAggregatorBroker;
import eu.stratosphere.pact.runtime.iterative.concurrent.SolutionSetBroker;
import eu.stratosphere.pact.runtime.iterative.io.SolutionSetUpdateOutputCollector;
import eu.stratosphere.pact.runtime.iterative.io.WorksetUpdateOutputCollector;
import eu.stratosphere.pact.runtime.iterative.task.RuntimeAggregatorRegistry;
import eu.stratosphere.pact.runtime.iterative.task.Terminable;
import eu.stratosphere.pact.runtime.task.PactDriver;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.ResettablePactDriver;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.pact.runtime.udf.RuntimeUDFContext;
import eu.stratosphere.runtime.io.api.MutableReader;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.InstantiationUtil;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public abstract class AbstractIterativePactTask<S extends Function, OT>
extends RegularPactTask<S, OT>
implements Terminable {
    private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class);
    protected LongSumAggregator worksetAggregator;
    protected BlockingBackChannel worksetBackChannel;
    protected boolean isWorksetIteration;
    protected boolean isWorksetUpdate;
    protected boolean isSolutionSetUpdate;
    private RuntimeAggregatorRegistry iterationAggregators;
    private String brokerKey;
    private int superstepNum = 1;
    private volatile boolean terminationRequested;

    @Override
    protected void initialize() throws Exception {
        super.initialize();
        if (this.driver instanceof ResettablePactDriver) {
            ResettablePactDriver resDriver = (ResettablePactDriver)this.driver;
            for (int i = 0; i < resDriver.getNumberOfInputs(); ++i) {
                if (!resDriver.isInputResettable(i)) continue;
                this.excludeFromReset(i);
            }
        }
        TaskConfig config = this.getLastTasksConfig();
        this.isWorksetIteration = config.getIsWorksetIteration();
        this.isWorksetUpdate = config.getIsWorksetUpdate();
        this.isSolutionSetUpdate = config.getIsSolutionSetUpdate();
        if (this.isWorksetUpdate) {
            this.worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(this.brokerKey());
            if (this.isWorksetIteration) {
                this.worksetAggregator = (LongSumAggregator)this.getIterationAggregators().getAggregator("pact.runtime.workset-empty-aggregator");
                if (this.worksetAggregator == null) {
                    throw new RuntimeException("Missing workset elements count aggregator.");
                }
            }
        }
    }

    @Override
    public void run() throws Exception {
        if (this.inFirstIteration()) {
            if (this.driver instanceof ResettablePactDriver) {
                ((ResettablePactDriver)this.driver).initialize();
            }
        } else {
            this.reinstantiateDriver();
            this.resetAllInputs();
            for (int i : this.iterativeBroadcastInputs) {
                String name = this.getTaskConfig().getBroadcastInputName(i);
                this.readAndSetBroadcastInput(i, name, this.runtimeUdfContext);
            }
        }
        super.run();
    }

    @Override
    protected void closeLocalStrategiesAndCaches() {
        try {
            super.closeLocalStrategiesAndCaches();
        }
        finally {
            if (this.driver instanceof ResettablePactDriver) {
                ResettablePactDriver resDriver = (ResettablePactDriver)this.driver;
                try {
                    resDriver.teardown();
                }
                catch (Throwable t) {
                    log.error((Object)"Error while shutting down an iterative operator.", t);
                }
            }
        }
    }

    @Override
    public RuntimeUDFContext createRuntimeContext(String taskName) {
        Environment env = this.getEnvironment();
        return new IterativeRuntimeUdfContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup());
    }

    protected boolean inFirstIteration() {
        return this.superstepNum == 1;
    }

    protected int currentIteration() {
        return this.superstepNum;
    }

    protected void incrementIterationCounter() {
        ++this.superstepNum;
    }

    public String brokerKey() {
        if (this.brokerKey == null) {
            int iterationId = this.config.getIterationId();
            this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getIndexInSubtaskGroup();
        }
        return this.brokerKey;
    }

    private void reinstantiateDriver() throws Exception {
        if (this.driver instanceof ResettablePactDriver) {
            ResettablePactDriver resDriver = (ResettablePactDriver)this.driver;
            resDriver.reset();
        } else {
            Class driverClass = this.config.getDriver();
            this.driver = (PactDriver)InstantiationUtil.instantiate(driverClass, PactDriver.class);
            try {
                this.driver.setup(this);
            }
            catch (Throwable t) {
                throw new Exception("The pact driver setup for '" + this.getEnvironment().getTaskName() + "' , caused an error: " + t.getMessage(), t);
            }
        }
    }

    public RuntimeAggregatorRegistry getIterationAggregators() {
        if (this.iterationAggregators == null) {
            this.iterationAggregators = (RuntimeAggregatorRegistry)IterationAggregatorBroker.instance().get(this.brokerKey());
        }
        return this.iterationAggregators;
    }

    protected void checkForTerminationAndResetEndOfSuperstepState() throws IOException {
        MutableReader reader;
        if (this.iterativeInputs.length == 0 && this.iterativeBroadcastInputs.length == 0) {
            throw new IllegalStateException();
        }
        boolean anyClosed = false;
        boolean allClosed = true;
        for (int inputNum : this.iterativeInputs) {
            reader = this.inputReaders[inputNum];
            if (reader.isInputClosed()) {
                anyClosed = true;
                continue;
            }
            if (reader.hasReachedEndOfSuperstep()) {
                allClosed = false;
                reader.startNextSuperstep();
                continue;
            }
            MutableObjectIterator inIter = this.inputIterators[inputNum];
            Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
            while ((o = inIter.next(o)) != null) {
            }
            if (reader.isInputClosed()) {
                anyClosed = true;
                continue;
            }
            allClosed = false;
            reader.startNextSuperstep();
        }
        for (int inputNum : this.iterativeBroadcastInputs) {
            reader = this.broadcastInputReaders[inputNum];
            if (reader.isInputClosed()) {
                anyClosed = true;
                continue;
            }
            if (!reader.hasReachedEndOfSuperstep()) {
                throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
            }
            allClosed = false;
            reader.startNextSuperstep();
        }
        if (allClosed != anyClosed) {
            throw new IllegalStateException("Inconsistent state: Iteration termination received on some, but not all inputs.");
        }
        if (allClosed) {
            this.requestTermination();
        }
    }

    @Override
    public boolean terminationRequested() {
        return this.terminationRequested;
    }

    @Override
    public void requestTermination() {
        this.terminationRequested = true;
    }

    @Override
    public void cancel() throws Exception {
        this.requestTermination();
        super.cancel();
    }

    protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
        DataOutputView outputView = this.worksetBackChannel.getWriteEnd();
        TypeSerializer<OT> serializer = this.getOutputSerializer();
        return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
    }

    protected Collector<OT> createWorksetUpdateOutputCollector() {
        return this.createWorksetUpdateOutputCollector(null);
    }

    protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
        Broker<CompactingHashTable<?>> solutionSetBroker = SolutionSetBroker.instance();
        CompactingHashTable<?> solutionSet = solutionSetBroker.get(this.brokerKey());
        TypeSerializer<OT> serializer = this.getOutputSerializer();
        return new SolutionSetUpdateOutputCollector(solutionSet, serializer, delegate);
    }

    private TypeSerializer<OT> getOutputSerializer() {
        TypeSerializerFactory serializerFactory = this.getLastTasksConfig().getOutputSerializer(this.userCodeClassLoader);
        if (serializerFactory == null) {
            throw new RuntimeException("Missing output serializer for workset update.");
        }
        return serializerFactory.getSerializer();
    }

    private class IterativeRuntimeUdfContext
    extends RuntimeUDFContext
    implements IterationRuntimeContext {
        public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex) {
            super(name, numParallelSubtasks, subtaskIndex);
        }

        public int getSuperstepNumber() {
            return AbstractIterativePactTask.this.superstepNum;
        }

        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
            return AbstractIterativePactTask.this.getIterationAggregators().getAggregator(name);
        }

        public <T extends Value> T getPreviousIterationAggregate(String name) {
            return (T)AbstractIterativePactTask.this.getIterationAggregators().getPreviousGlobalAggregate(name);
        }
    }
}

