package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.functions.GenericCombine;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.pact.runtime.sort.AsynchronousPartialSorter;
import eu.stratosphere.pact.runtime.task.util.CloseableInputProvider;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.pact.runtime.util.KeyGroupedIterator;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/task/GroupReduceCombineDriver.class */
public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>, T> {
    private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);
    private PactTaskContext<GenericCombine<T>, T> taskContext;
    private CloseableInputProvider<T> input;
    private TypeSerializerFactory<T> serializerFactory;
    private TypeComparator<T> comparator;
    private volatile boolean running;

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void setup(PactTaskContext<GenericCombine<T>, T> pactTaskContext) {
        this.taskContext = pactTaskContext;
        this.running = true;
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public int getNumberOfInputs() {
        return 1;
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public Class<GenericCombine<T>> getStubType() {
        return GenericCombine.class;
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public boolean requiresComparatorOnInput() {
        return true;
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void prepare() throws Exception {
        TaskConfig taskConfig = this.taskContext.getTaskConfig();
        DriverStrategy driverStrategy = taskConfig.getDriverStrategy();
        MemoryManager memoryManager = this.taskContext.getMemoryManager();
        MutableObjectIterator<X> input = this.taskContext.getInput(0);
        this.serializerFactory = (TypeSerializerFactory<T>) this.taskContext.getInputSerializer(0);
        this.comparator = (TypeComparator<T>) this.taskContext.getInputComparator(0);
        switch (driverStrategy) {
            case SORTED_GROUP_COMBINE:
                this.input = new AsynchronousPartialSorter(memoryManager, input, this.taskContext.getOwningNepheleTask(), this.serializerFactory, this.comparator.duplicate(), taskConfig.getRelativeMemoryDriver());
                return;
            default:
                throw new RuntimeException("Invalid local strategy provided for CombineTask.");
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void run() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Preprocessing done, iterator obtained."));
        }
        KeyGroupedIterator keyGroupedIterator = new KeyGroupedIterator(this.input.getIterator(), this.serializerFactory.getSerializer(), this.comparator);
        GenericCombine<T> stub = this.taskContext.getStub();
        Collector<T> outputCollector = this.taskContext.getOutputCollector();
        while (this.running && keyGroupedIterator.nextKey()) {
            stub.combine(keyGroupedIterator.getValues(), outputCollector);
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void cleanup() throws Exception {
        if (this.input != null) {
            this.input.close();
            this.input = null;
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void cancel() {
        this.running = false;
    }
}
