/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.task.chaining;

import eu.stratosphere.api.common.functions.Function;
import eu.stratosphere.api.common.functions.GenericCombine;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.sort.FixedLengthRecordSorter;
import eu.stratosphere.pact.runtime.sort.InMemorySorter;
import eu.stratosphere.pact.runtime.sort.NormalizedKeySorter;
import eu.stratosphere.pact.runtime.sort.QuickSort;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver;
import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
import eu.stratosphere.pact.runtime.util.KeyGroupedIterator;
import eu.stratosphere.util.Collector;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

/**
 * Chained driver that applies a user-defined {@link GenericCombine} function to the records
 * passed to {@link #collect(Object)}.
 *
 * <p>Incoming records are written into an in-memory sort buffer. Whenever the buffer rejects
 * a record (i.e. it is full), and once more when {@link #close()} is called, the buffered
 * records are sorted, grouped by key and handed group by group to the combine function, whose
 * results go to the chained output collector.
 *
 * @param <T> type of the records that are consumed and produced by the combiner
 */
public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {

    /**
     * Largest fixed record length, as reported by {@link TypeSerializer#getLength()}, for which
     * the {@link FixedLengthRecordSorter} is selected instead of the {@link NormalizedKeySorter}.
     */
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    /** Sort buffer that collects the records between two combine runs. */
    private InMemorySorter<T> sorter;

    /** The user-defined combine function. */
    private GenericCombine<T> combiner;

    private TypeSerializer<T> serializer;

    /** Comparator used to detect key group boundaries; the sorter works on its own duplicate. */
    private TypeComparator<T> comparator;

    private AbstractInvokable parent;

    private final QuickSort sortAlgo = new QuickSort();

    private MemoryManager memManager;

    /** Cleared by {@link #cancelTask()}; volatile because it is re-read inside the combine loop. */
    private volatile boolean running = true;

    /**
     * Instantiates the user-defined combine function and hands it the runtime context.
     *
     * @param parent the invokable that owns this chained driver
     */
    @Override
    public void setup(AbstractInvokable parent) {
        this.parent = parent;

        // Only the raw class literal can be passed, hence the unchecked conversion to GenericCombine<T>.
        @SuppressWarnings("unchecked")
        final GenericCombine<T> stub =
                RegularPactTask.instantiateUserCode(this.config, this.userCodeClassLoader, GenericCombine.class);
        this.combiner = stub;
        stub.setRuntimeContext(this.getUdfRuntimeContext());
    }

    /**
     * Opens the user code, allocates the driver's memory and creates the sort buffer.
     *
     * @throws Exception if opening the user code or setting up the sorter fails
     */
    @Override
    public void openTask() throws Exception {
        final Configuration stubConfig = this.config.getStubParameters();
        RegularPactTask.openUserCode(this.combiner, stubConfig);

        this.memManager = this.parent.getEnvironment().getMemoryManager();
        final int numMemoryPages = this.memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());

        final TypeSerializerFactory<T> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
        final TypeComparatorFactory<T> comparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
        this.serializer = serializerFactory.getSerializer();
        this.comparator = comparatorFactory.createComparator();

        final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);

        // Both sorter variants get their own comparator copy, so the instance kept in
        // this.comparator is used exclusively by the key-grouped iterator.
        if (this.comparator.supportsSerializationWithKeyNormalization()
                && this.serializer.getLength() > 0
                && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) {
            this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator.duplicate(), memory);
        } else {
            this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
        }
    }

    /**
     * Releases the sort buffer memory and, unless the task was canceled, closes the user code.
     *
     * @throws Exception if closing the user code fails
     */
    @Override
    public void closeTask() throws Exception {
        this.releaseSorterMemory();
        if (!this.running) {
            return;
        }
        RegularPactTask.closeUserCode(this.combiner);
    }

    /**
     * Marks the driver as canceled, which stops a running combine loop at the next key group,
     * and releases the sort buffer memory.
     */
    @Override
    public void cancelTask() {
        this.running = false;
        this.releaseSorterMemory();
    }

    @Override
    public Function getStub() {
        return this.combiner;
    }

    @Override
    public String getTaskName() {
        return this.taskName;
    }

    /**
     * Buffers the record. If the sort buffer is full, its content is sorted and combined first,
     * the buffer is reset and the record is written into the emptied buffer.
     *
     * @param record the record to add
     * @throws ExceptionInChainedStubException if writing, sorting or combining fails, or if the
     *         record does not even fit into an empty sort buffer
     */
    @Override
    public void collect(T record) {
        // Fast path: the record still fits into the current buffer.
        try {
            if (this.sorter.write(record)) {
                return;
            }
        }
        catch (IOException e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }

        // Buffer is full: flush it through the combiner.
        try {
            this.sortAndCombine();
        }
        catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
        this.sorter.reset();

        // Retry on the emptied buffer; a second rejection means the record can never fit.
        try {
            if (!this.sorter.write(record)) {
                throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
            }
        }
        catch (IOException e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
    }

    /**
     * Combines the records that are still buffered and then closes the output collector.
     *
     * @throws ExceptionInChainedStubException if sorting or combining fails
     */
    public void close() {
        try {
            this.sortAndCombine();
        }
        catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
        this.outputCollector.close();
    }

    /**
     * Sorts the buffered records and invokes the combine function once per key group, for as
     * long as the task has not been canceled. Does nothing if the buffer is empty.
     *
     * @throws Exception if sorting, iterating or the user-defined combine function fails
     */
    private void sortAndCombine() throws Exception {
        final InMemorySorter<T> buffer = this.sorter;
        if (buffer.isEmpty()) {
            return;
        }

        this.sortAlgo.sort(buffer);

        final KeyGroupedIterator<T> keyIter =
                new KeyGroupedIterator<T>(buffer.getIterator(), this.serializer, this.comparator);
        final GenericCombine<T> stub = this.combiner;
        final Collector<T> output = this.outputCollector;

        while (this.running && keyIter.nextKey()) {
            final Iterator<T> values = keyIter.getValues();
            stub.combine(values, output);
        }
    }

    /**
     * Disposes the sorter and returns its memory to the memory manager. Does nothing when the
     * sorter or the memory manager was never set up, e.g. because {@link #openTask()} failed or
     * was not called before cancellation, so that cleanup cannot fail with a
     * {@link NullPointerException} and hide the original error.
     */
    private void releaseSorterMemory() {
        final InMemorySorter<T> currentSorter = this.sorter;
        final MemoryManager manager = this.memManager;
        if (currentSorter != null && manager != null) {
            manager.release(currentSorter.dispose());
        }
    }
}

