/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.functions.GenericCombine;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.pact.runtime.sort.AsynchronousPartialSorter;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.PactDriver;
import eu.stratosphere.pact.runtime.task.PactTaskContext;
import eu.stratosphere.pact.runtime.task.util.CloseableInputProvider;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.pact.runtime.util.KeyGroupedIterator;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Driver that runs a {@link GenericCombine} stub over its single input.
 *
 * <p>During {@link #prepare()} the input is wrapped in an {@link AsynchronousPartialSorter}.
 * During {@link #run()} the sorter's output is walked with a {@link KeyGroupedIterator}, and
 * the stub's {@code combine} method is invoked once for every key the iterator reports,
 * writing to the task's output collector.
 *
 * @param <T> the type of the records that are read from the input and emitted by the stub
 */
public class GroupReduceCombineDriver<T>
implements PactDriver<GenericCombine<T>, T> {
    private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);

    /** Task context handed over in {@link #setup(PactTaskContext)}. */
    private PactTaskContext<GenericCombine<T>, T> taskContext;

    /** Sorter-backed input provider; created in {@link #prepare()}, released in {@link #cleanup()}. */
    private CloseableInputProvider<T> input;

    /** Serializer factory of input 0, obtained from the task context in {@link #prepare()}. */
    private TypeSerializerFactory<T> serializerFactory;

    /** Comparator of input 0, obtained from the task context in {@link #prepare()}. */
    private TypeComparator<T> comparator;

    /**
     * Loop guard for {@link #run()}: set by {@link #setup(PactTaskContext)}, cleared by
     * {@link #cancel()}. Declared volatile, presumably because cancellation is issued from a
     * different thread than the one executing {@code run()} (verify against the caller).
     */
    private volatile boolean running;

    /**
     * Stores the task context and marks the driver as running.
     *
     * @param context the context giving access to configuration, inputs, stub and output
     */
    @Override
    public void setup(PactTaskContext<GenericCombine<T>, T> context) {
        this.taskContext = context;
        this.running = true;
    }

    /**
     * @return always {@code 1}; this driver only reads input 0
     */
    @Override
    public int getNumberOfInputs() {
        return 1;
    }

    /**
     * @return the class object of {@link GenericCombine}, typed with this driver's record type
     */
    @Override
    public Class<GenericCombine<T>> getStubType() {
        // A class literal cannot carry type arguments, so GenericCombine.class is a
        // Class<GenericCombine> (raw). A direct cast to Class<GenericCombine<T>> is rejected by
        // the compiler; going through Class<?> makes the unchecked cast legal.
        @SuppressWarnings("unchecked")
        final Class<GenericCombine<T>> clazz = (Class<GenericCombine<T>>) (Class<?>) GenericCombine.class;
        return clazz;
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean requiresComparatorOnInput() {
        return true;
    }

    /**
     * Fetches serializer factory and comparator of input 0 from the task context and wraps the
     * input in an {@link AsynchronousPartialSorter}.
     *
     * @throws RuntimeException if the configured driver strategy is anything other than
     *         {@code SORTED_GROUP_COMBINE} (including {@code null})
     * @throws Exception if any of the task context accessors or the sorter constructor fails
     */
    @Override
    public void prepare() throws Exception {
        final TaskConfig config = this.taskContext.getTaskConfig();
        final DriverStrategy strategy = config.getDriverStrategy();
        final MemoryManager memoryManager = this.taskContext.getMemoryManager();
        final MutableObjectIterator<T> in = this.taskContext.getInput(0);
        this.serializerFactory = this.taskContext.getInputSerializer(0);
        this.comparator = this.taskContext.getInputComparator(0);

        // Only one strategy is supported. Checking explicitly (instead of switching on the
        // value) also turns a missing strategy into the descriptive error below rather than a
        // bare NullPointerException.
        if (strategy != DriverStrategy.SORTED_GROUP_COMBINE) {
            throw new RuntimeException("Invalid local strategy provided for CombineTask.");
        }

        // The sorter receives its own duplicate of the comparator; the original instance is
        // used later by the KeyGroupedIterator in run().
        this.input = new AsynchronousPartialSorter<T>(memoryManager, in, this.taskContext.getOwningNepheleTask(),
                this.serializerFactory, this.comparator.duplicate(), config.getRelativeMemoryDriver());
    }

    /**
     * Iterates over the prepared input key by key and calls the stub's {@code combine} method
     * with the values of each key, until the input is exhausted or {@link #cancel()} is called.
     *
     * @throws Exception if obtaining the iterator, advancing it, or the stub invocation fails
     */
    @Override
    public void run() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Preprocessing done, iterator obtained."));
        }

        final KeyGroupedIterator<T> iter = new KeyGroupedIterator<T>(
                this.input.getIterator(), this.serializerFactory.getSerializer(), this.comparator);

        // Resolve stub and collector once instead of on every iteration.
        final GenericCombine<T> stub = this.taskContext.getStub();
        final Collector<T> output = this.taskContext.getOutputCollector();

        // The running flag is tested first, so after a cancel() no further key is requested.
        while (this.running && iter.nextKey()) {
            stub.combine(iter.getValues(), output);
        }
    }

    /**
     * Closes the input provider created in {@link #prepare()}, if any, and drops the reference
     * so that a repeated call does nothing.
     *
     * @throws Exception if closing the input provider fails
     */
    @Override
    public void cleanup() throws Exception {
        if (this.input != null) {
            this.input.close();
            this.input = null;
        }
    }

    /**
     * Clears the running flag so that the loop in {@link #run()} stops before the next key.
     */
    @Override
    public void cancel() {
        this.running = false;
    }
}

