/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.sort;

import eu.stratosphere.api.common.functions.GenericJoiner;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypePairComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.resettable.BlockResettableIterator;
import eu.stratosphere.pact.runtime.resettable.SpillingResettableIterator;
import eu.stratosphere.pact.runtime.task.util.JoinTaskIterator;
import eu.stratosphere.pact.runtime.util.KeyGroupedIterator;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MergeMatchIterator<T1, T2, O>
implements JoinTaskIterator<T1, T2, O> {
    private static final Log LOG = LogFactory.getLog(MergeMatchIterator.class);
    private TypePairComparator<T1, T2> comp;
    private KeyGroupedIterator<T1> iterator1;
    private KeyGroupedIterator<T2> iterator2;
    private final TypeSerializer<T1> serializer1;
    private final TypeSerializer<T2> serializer2;
    private T1 copy1;
    private T1 spillHeadCopy;
    private T2 copy2;
    private T2 blockHeadCopy;
    private final BlockResettableIterator<T2> blockIt;
    private final List<MemorySegment> memoryForSpillingIterator;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;

    public MergeMatchIterator(MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2, TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager, int numMemoryPages, AbstractInvokable parentTask) throws MemoryAllocationException {
        if (numMemoryPages < 2) {
            throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
        }
        this.comp = pairComparator;
        this.serializer1 = serializer1;
        this.serializer2 = serializer2;
        this.copy1 = serializer1.createInstance();
        this.spillHeadCopy = serializer1.createInstance();
        this.copy2 = serializer2.createInstance();
        this.blockHeadCopy = serializer2.createInstance();
        this.memoryManager = memoryManager;
        this.ioManager = ioManager;
        this.iterator1 = new KeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate());
        this.iterator2 = new KeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate());
        int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
        this.blockIt = new BlockResettableIterator<T2>(this.memoryManager, this.serializer2, numMemoryPages - numPagesForSpiller, parentTask);
        this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
    }

    @Override
    public void open() throws IOException {
    }

    @Override
    public void close() {
        if (this.blockIt != null) {
            try {
                this.blockIt.close();
            }
            catch (Throwable t) {
                LOG.error((Object)("Error closing block memory iterator: " + t.getMessage()), t);
            }
        }
        this.memoryManager.release(this.memoryForSpillingIterator);
    }

    @Override
    public void abort() {
        this.close();
    }

    @Override
    public boolean callWithNextKey(GenericJoiner<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
        int comp;
        if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
            while (this.iterator1.nextKey()) {
            }
            while (this.iterator2.nextKey()) {
            }
            return false;
        }
        TypePairComparator<T1, T2> comparator = this.comp;
        comparator.setReference(this.iterator1.getCurrent());
        T2 current2 = this.iterator2.getCurrent();
        while ((comp = comparator.compareToReference(current2)) != 0) {
            if (comp < 0) {
                if (!this.iterator2.nextKey()) {
                    return false;
                }
                current2 = this.iterator2.getCurrent();
                continue;
            }
            if (!this.iterator1.nextKey()) {
                return false;
            }
            comparator.setReference(this.iterator1.getCurrent());
        }
        KeyGroupedIterator.ValuesIterator values1 = this.iterator1.getValues();
        KeyGroupedIterator.ValuesIterator values2 = this.iterator2.getValues();
        Object firstV1 = values1.next();
        Object firstV2 = values2.next();
        boolean v1HasNext = values1.hasNext();
        boolean v2HasNext = values2.hasNext();
        if (v1HasNext) {
            if (v2HasNext) {
                this.crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
            } else {
                this.crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
            }
        } else if (v2HasNext) {
            this.crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
        } else {
            matchFunction.join(firstV1, firstV2, collector);
        }
        return true;
    }

    private void crossFirst1withNValues(T1 val1, T2 firstValN, Iterator<T2> valsN, GenericJoiner<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
        this.copy1 = this.serializer1.copy(val1, this.copy1);
        matchFunction.join(this.copy1, firstValN, collector);
        boolean more = true;
        do {
            T2 nRec = valsN.next();
            if (valsN.hasNext()) {
                this.copy1 = this.serializer1.copy(val1, this.copy1);
                matchFunction.join(this.copy1, nRec, collector);
                continue;
            }
            matchFunction.join(val1, nRec, collector);
            more = false;
        } while (more);
    }

    private void crossSecond1withNValues(T2 val1, T1 firstValN, Iterator<T1> valsN, GenericJoiner<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
        this.copy2 = this.serializer2.copy(val1, this.copy2);
        matchFunction.join(firstValN, this.copy2, collector);
        boolean more = true;
        do {
            T1 nRec = valsN.next();
            if (valsN.hasNext()) {
                this.copy2 = this.serializer2.copy(val1, this.copy2);
                matchFunction.join(nRec, this.copy2, collector);
                continue;
            }
            matchFunction.join(nRec, val1, collector);
            more = false;
        } while (more);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void crossMwithNValues(T1 firstV1, Iterator<T1> spillVals, T2 firstV2, Iterator<T2> blockVals, GenericJoiner<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
        this.copy1 = this.serializer1.copy(firstV1, this.copy1);
        this.blockHeadCopy = this.serializer2.copy(firstV2, this.blockHeadCopy);
        matchFunction.join(this.copy1, firstV2, collector);
        SpillingResettableIterator<T1> spillIt = null;
        try {
            T1 nextSpillVal;
            Iterator<T1> leftSideIter;
            this.blockIt.reopen(blockVals);
            while (this.blockIt.hasNext()) {
                T2 nextBlockRec = this.blockIt.next();
                this.copy1 = this.serializer1.copy(firstV1, this.copy1);
                matchFunction.join(this.copy1, nextBlockRec, collector);
            }
            this.blockIt.reset();
            boolean spillingRequired = this.blockIt.hasFurtherInput();
            if (spillingRequired) {
                spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1, this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
                leftSideIter = spillIt;
                spillIt.open();
                this.spillHeadCopy = this.serializer1.copy(firstV1, this.spillHeadCopy);
            } else {
                leftSideIter = spillVals;
            }
            while (leftSideIter.hasNext()) {
                nextSpillVal = leftSideIter.next();
                this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
                this.copy2 = this.serializer2.copy(this.blockHeadCopy, this.copy2);
                matchFunction.join(this.copy1, this.copy2, collector);
                while (this.blockIt.hasNext()) {
                    T2 nextBlockRec = this.blockIt.next();
                    this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
                    matchFunction.join(this.copy1, nextBlockRec, collector);
                }
                this.blockIt.reset();
            }
            if (!spillingRequired) {
                return;
            }
            while (this.blockIt.nextBlock()) {
                spillIt.reset();
                while (this.blockIt.hasNext()) {
                    this.copy1 = this.serializer1.copy(this.spillHeadCopy, this.copy1);
                    T2 nextBlockVal = this.blockIt.next();
                    matchFunction.join(this.copy1, nextBlockVal, collector);
                }
                this.blockIt.reset();
                while (spillIt.hasNext()) {
                    nextSpillVal = spillIt.next();
                    while (this.blockIt.hasNext()) {
                        T2 nextBlockVal = this.blockIt.next();
                        this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
                        matchFunction.join(this.copy1, nextBlockVal, collector);
                    }
                    this.blockIt.reset();
                }
                spillIt.reset();
            }
        }
        finally {
            if (spillIt != null) {
                this.memoryForSpillingIterator.addAll(spillIt.close());
            }
        }
    }
}

