/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.hash;

import eu.stratosphere.api.common.functions.GenericJoiner;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypePairComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.hash.MutableHashTable;
import eu.stratosphere.pact.runtime.task.util.JoinTaskIterator;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import java.util.List;

/**
 * A {@link JoinTaskIterator} that joins two inputs through a {@link MutableHashTable}
 * whose build side is the <em>second</em> input and whose probe side is the
 * <em>first</em> input.
 *
 * <p>The hash table's working memory is allocated from the {@link MemoryManager} when
 * the iterator is constructed and handed back to it in {@link #close()}.
 *
 * @param <V1> type of the records of the first (probe side) input
 * @param <V2> type of the records of the second (build side) input
 * @param <O>  type of the records emitted by the join function
 */
public class BuildSecondHashMatchIterator<V1, V2, O>
        implements JoinTaskIterator<V1, V2, O> {

    /** Hash table built from the second input and probed with the first input. */
    protected final MutableHashTable<V2, V1> hashJoin;

    /** Reusable instance handed to the build side iterator for the first match (and the tail of the chain). */
    private final V2 nextBuildSideObject;

    /** Reusable instance handed to the build side iterator for the second match. */
    private final V2 tempBuildSideRecord;

    /** Reusable target for copies of the current probe record. */
    private final V1 probeCopy;

    /** Serializer of the probe side; used to copy the probe record before each join call. */
    protected final TypeSerializer<V1> probeSideSerializer;

    /** Memory manager that receives the hash table's segments back in {@link #close()}. */
    private final MemoryManager memManager;

    private final MutableObjectIterator<V1> firstInput;

    private final MutableObjectIterator<V2> secondInput;

    /**
     * Cleared by {@link #abort()} to stop the per-key match loop.
     * Declared volatile, presumably because abort() is invoked from another thread.
     */
    private volatile boolean running = true;

    /**
     * Creates the iterator, the reusable record instances and the hash table.
     *
     * @param firstInput     probe side input
     * @param secondInput    build side input
     * @param serializer1    serializer for the first input
     * @param comparator1    comparator for the first input
     * @param serializer2    serializer for the second input
     * @param comparator2    comparator for the second input
     * @param pairComparator comparator between records of the first and the second input
     * @param memManager     memory manager the hash table memory is taken from and returned to
     * @param ioManager      I/O manager passed on to the hash table
     * @param ownerTask      task on whose behalf the memory is allocated
     * @param memoryFraction fraction of memory from which the number of pages is computed
     * @throws MemoryAllocationException if the memory for the hash table cannot be allocated
     */
    public BuildSecondHashMatchIterator(MutableObjectIterator<V1> firstInput, MutableObjectIterator<V2> secondInput, TypeSerializer<V1> serializer1, TypeComparator<V1> comparator1, TypeSerializer<V2> serializer2, TypeComparator<V2> comparator2, TypePairComparator<V1, V2> pairComparator, MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction) throws MemoryAllocationException {
        this.memManager = memManager;
        this.firstInput = firstInput;
        this.secondInput = secondInput;
        this.probeSideSerializer = serializer1;

        this.nextBuildSideObject = serializer2.createInstance();
        this.tempBuildSideRecord = serializer2.createInstance();
        this.probeCopy = serializer1.createInstance();

        // NOTE: getHashJoin is public and non-final, and it is invoked from this constructor;
        // an overriding implementation runs before the subclass' own fields are initialized.
        this.hashJoin = this.getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
    }

    /**
     * Opens the hash table, passing the second input as its first argument and the
     * first input as its second argument.
     */
    @Override
    public void open() throws IOException, MemoryAllocationException, InterruptedException {
        this.hashJoin.open(this.secondInput, this.firstInput);
    }

    /**
     * Closes the hash table and then returns the memory segments it reports as freed
     * to the memory manager.
     */
    @Override
    public void close() {
        this.hashJoin.close();

        final List<MemorySegment> freedSegments = this.hashJoin.getFreedMemory();
        this.memManager.release(freedSegments);
    }

    /**
     * Advances the hash table to its next record and calls the join function once for
     * every build side record returned by the build side iterator for it.
     *
     * <p>If exactly one build side record is found, the probe record is passed to the
     * join function directly. If more than one is found, a fresh copy of the probe record
     * is made before every call (presumably because the join function may modify the
     * record it receives).
     *
     * @param matchFunction join function invoked for each matching pair
     * @param collector     collector handed to the join function
     * @return {@code true} if the hash table had a next record (even if no build side
     *         record matched it), {@code false} otherwise
     * @throws Exception forwarded from the hash table, the serializer or the join function
     */
    @Override
    public boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector) throws Exception {
        if (!this.hashJoin.nextRecord()) {
            return false;
        }

        final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();

        V2 nextBuildSideRecord = buildSideIterator.next(this.nextBuildSideObject);
        if (nextBuildSideRecord == null) {
            // No build side record for this probe record; the record still counts as consumed.
            return true;
        }

        final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();

        // Look ahead for a second build side record using a separate reuse instance,
        // so the first one stays intact until it has been joined.
        final V2 secondBuildSideRecord = buildSideIterator.next(this.tempBuildSideRecord);
        if (secondBuildSideRecord == null) {
            // Exactly one match: the probe record is handed over without copying.
            matchFunction.join(probeRecord, nextBuildSideRecord, collector);
            return true;
        }

        // Typed as V1 (not Object) so that it can be passed to the serializer and the join function.
        V1 probeReuse = this.probeSideSerializer.copy(probeRecord, this.probeCopy);
        matchFunction.join(probeReuse, nextBuildSideRecord, collector);

        probeReuse = this.probeSideSerializer.copy(probeRecord, probeReuse);
        matchFunction.join(probeReuse, secondBuildSideRecord, collector);

        // Remaining matches; stops early once abort() has cleared the running flag.
        while (this.running && (nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
            probeReuse = this.probeSideSerializer.copy(probeRecord, probeReuse);
            matchFunction.join(probeReuse, nextBuildSideRecord, collector);
        }
        return true;
    }

    /**
     * Clears the running flag checked by {@link #callWithNextKey} and aborts the hash table.
     */
    @Override
    public void abort() {
        this.running = false;
        this.hashJoin.abort();
    }

    /**
     * Allocates the memory pages for the hash table and creates it.
     *
     * <p>This method is called from the constructor of this class.
     *
     * @param buildSideSerializer serializer for the build side records
     * @param buildSideComparator comparator for the build side records
     * @param probeSideSerializer serializer for the probe side records
     * @param probeSideComparator comparator for the probe side records
     * @param pairComparator      comparator between probe side and build side records
     * @param memManager          memory manager to allocate the pages from
     * @param ioManager           I/O manager passed to the hash table
     * @param ownerTask           task on whose behalf the pages are allocated
     * @param memoryFraction      fraction of memory from which the number of pages is computed
     * @param <BT> build side record type
     * @param <PT> probe side record type
     * @return the newly created hash table
     * @throws MemoryAllocationException if the pages cannot be allocated
     */
    public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator, TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator, TypePairComparator<PT, BT> pairComparator, MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction) throws MemoryAllocationException {
        final int numPages = memManager.computeNumberOfPages(memoryFraction);
        final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
        return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
    }
}

