/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.iterative.task;

import com.google.common.base.Preconditions;
import eu.stratosphere.api.common.aggregators.Aggregator;
import eu.stratosphere.api.common.aggregators.AggregatorWithName;
import eu.stratosphere.api.common.aggregators.ConvergenceCriterion;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.types.IntegerRecord;
import eu.stratosphere.pact.runtime.iterative.event.AllWorkersDoneEvent;
import eu.stratosphere.pact.runtime.iterative.event.TerminationEvent;
import eu.stratosphere.pact.runtime.iterative.event.WorkerDoneEvent;
import eu.stratosphere.pact.runtime.iterative.task.SyncEventHandler;
import eu.stratosphere.pact.runtime.iterative.task.Terminable;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.types.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class IterationSynchronizationSinkTask
extends AbstractInvokable
implements Terminable {
    private static final Log log = LogFactory.getLog(IterationSynchronizationSinkTask.class);
    private MutableRecordReader<IntegerRecord> headEventReader;
    private ClassLoader userCodeClassLoader;
    private SyncEventHandler eventHandler;
    private ConvergenceCriterion<Value> convergenceCriterion;
    private Map<String, Aggregator<?>> aggregators;
    private String convergenceAggregatorName;
    private int currentIteration = 1;
    private int maxNumberOfIterations;
    private final AtomicBoolean terminated = new AtomicBoolean(false);

    @Override
    public void registerInputOutput() {
        this.headEventReader = new MutableRecordReader(this);
    }

    @Override
    public void invoke() throws Exception {
        this.userCodeClassLoader = LibraryCacheManager.getClassLoader(this.getEnvironment().getJobID());
        TaskConfig taskConfig = new TaskConfig(this.getTaskConfiguration());
        this.aggregators = new HashMap();
        for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators()) {
            this.aggregators.put(aggWithName.getName(), aggWithName.getAggregator());
        }
        if (taskConfig.usesConvergenceCriterion()) {
            this.convergenceCriterion = taskConfig.getConvergenceCriterion();
            this.convergenceAggregatorName = taskConfig.getConvergenceCriterionAggregatorName();
            Preconditions.checkNotNull((Object)this.convergenceAggregatorName);
        }
        this.maxNumberOfIterations = taskConfig.getNumberOfIterations();
        int numEventsTillEndOfSuperstep = taskConfig.getNumberOfEventsUntilInterruptInIterativeGate(0);
        this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.userCodeClassLoader);
        this.headEventReader.subscribeToEvent(this.eventHandler, WorkerDoneEvent.class);
        IntegerRecord dummy = new IntegerRecord();
        while (!this.terminationRequested()) {
            if (log.isInfoEnabled()) {
                log.info((Object)this.formatLogString("starting iteration [" + this.currentIteration + "]"));
            }
            this.readHeadEventChannel(dummy);
            if (log.isInfoEnabled()) {
                log.info((Object)this.formatLogString("finishing iteration [" + this.currentIteration + "]"));
            }
            if (this.checkForConvergence()) {
                if (log.isInfoEnabled()) {
                    log.info((Object)this.formatLogString("signaling that all workers are to terminate in iteration [" + this.currentIteration + "]"));
                }
                this.requestTermination();
                this.sendToAllWorkers(new TerminationEvent());
                continue;
            }
            if (log.isInfoEnabled()) {
                log.info((Object)this.formatLogString("signaling that all workers are done in iteration [" + this.currentIteration + "]"));
            }
            AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent(this.aggregators);
            this.sendToAllWorkers(allWorkersDoneEvent);
            for (Aggregator<?> agg : this.aggregators.values()) {
                agg.reset();
            }
            ++this.currentIteration;
        }
    }

    private boolean checkForConvergence() {
        if (this.maxNumberOfIterations == this.currentIteration) {
            if (log.isInfoEnabled()) {
                log.info((Object)this.formatLogString("maximum number of iterations [" + this.currentIteration + "] reached, terminating..."));
            }
            return true;
        }
        if (this.convergenceAggregatorName != null) {
            Aggregator<?> aggregator = this.aggregators.get(this.convergenceAggregatorName);
            if (aggregator == null) {
                throw new RuntimeException("Error: Aggregator for convergence criterion was null.");
            }
            Value aggregate = aggregator.getAggregate();
            if (this.convergenceCriterion.isConverged(this.currentIteration, aggregate)) {
                if (log.isInfoEnabled()) {
                    log.info((Object)this.formatLogString("convergence reached after [" + this.currentIteration + "] iterations, terminating..."));
                }
                return true;
            }
        }
        return false;
    }

    private void readHeadEventChannel(IntegerRecord rec) throws IOException {
        block3: {
            this.eventHandler.resetEndOfSuperstep();
            try {
                if (this.headEventReader.next(rec)) {
                    throw new RuntimeException("Synchronization task must not see any records!");
                }
            }
            catch (InterruptedException iex) {
                if (this.eventHandler.isEndOfSuperstep()) break block3;
                throw new RuntimeException("Event handler interrupted without reaching end-of-superstep.");
            }
        }
    }

    private void sendToAllWorkers(AbstractTaskEvent event) throws IOException, InterruptedException {
        this.headEventReader.publishEvent(event);
    }

    private String formatLogString(String message) {
        return RegularPactTask.constructLogString(message, this.getEnvironment().getTaskName(), this);
    }

    @Override
    public boolean terminationRequested() {
        return this.terminated.get();
    }

    @Override
    public void requestTermination() {
        this.terminated.set(true);
    }
}

