/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.functions.GenericCrosser;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.pact.runtime.resettable.BlockResettableMutableObjectIterator;
import eu.stratosphere.pact.runtime.resettable.SpillingResettableMutableObjectIterator;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.PactDriver;
import eu.stratosphere.pact.runtime.task.PactTaskContext;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Driver that feeds every combination of records from its two inputs to a
 * {@link GenericCrosser}, using one of four nested-loop variants selected by
 * the {@link DriverStrategy} found in the task configuration.
 *
 * <p>In the <i>blocked</i> variants one input is consumed block-wise through a
 * {@link BlockResettableMutableObjectIterator} while the other input is
 * replayed through a {@link SpillingResettableMutableObjectIterator}. In the
 * <i>streamed</i> variants one input is read record by record and the other is
 * replayed through a spilling iterator for each of those records.
 *
 * @param <T1> record type of the first input
 * @param <T2> record type of the second input
 * @param <OT> record type emitted by the crosser
 */
public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2, OT>, OT> {

    private static final Log LOG = LogFactory.getLog(CrossDriver.class);

    private PactTaskContext<GenericCrosser<T1, T2, OT>, OT> taskContext;

    private MemoryManager memManager;

    /** Spilling iterator created by the active run method; closed in {@link #cleanup()}. */
    private SpillingResettableMutableObjectIterator<?> spillIter;

    /** Block iterator created by the blocked run methods; closed in {@link #cleanup()}. */
    private BlockResettableMutableObjectIterator<?> blockIter;

    private int memPagesForBlockSide;

    private int memPagesForSpillingSide;

    /** {@code true} for the block-nested-loops strategies, {@code false} for the streamed ones. */
    private boolean blocked;

    /** {@code true} if the first input is the outer side of the nested loops. */
    private boolean firstIsOuter;

    /**
     * Cleared by {@link #cancel()} and polled by every loop in the run methods.
     * Volatile so the write becomes visible to the running loop (presumably
     * {@code cancel()} is invoked from a different thread -- confirm with caller).
     */
    private volatile boolean running;

    /**
     * Stores the task context and marks the driver as running.
     *
     * @param context the context this driver obtains inputs, serializers, memory and the stub from
     */
    @Override
    public void setup(PactTaskContext<GenericCrosser<T1, T2, OT>, OT> context) {
        this.taskContext = context;
        this.running = true;
    }

    /**
     * @return always {@code 2}; the cross consumes two inputs
     */
    @Override
    public int getNumberOfInputs() {
        return 2;
    }

    /**
     * @return the {@link GenericCrosser} class object, typed with this driver's type parameters
     */
    @Override
    public Class<GenericCrosser<T1, T2, OT>> getStubType() {
        // A class literal cannot carry type arguments, so the parameterized
        // Class type has to be reached through an unchecked cast via Class<?>.
        @SuppressWarnings("unchecked")
        final Class<GenericCrosser<T1, T2, OT>> clazz =
                (Class<GenericCrosser<T1, T2, OT>>) (Class<?>) GenericCrosser.class;
        return clazz;
    }

    /**
     * @return always {@code false}; no comparator is used by this driver
     */
    @Override
    public boolean requiresComparatorOnInput() {
        return false;
    }

    /**
     * Reads the driver strategy from the task configuration and splits the
     * driver's memory pages between the block side and the spilling side.
     *
     * @throws RuntimeException if the configured strategy is not one of the four
     *         nested-loop strategies, or if fewer than two memory pages are available
     */
    @Override
    public void prepare() throws Exception {
        final TaskConfig config = this.taskContext.getTaskConfig();
        final DriverStrategy ls = config.getDriverStrategy();

        switch (ls) {
            case NESTEDLOOP_BLOCKED_OUTER_FIRST:
                this.blocked = true;
                this.firstIsOuter = true;
                break;
            case NESTEDLOOP_BLOCKED_OUTER_SECOND:
                this.blocked = true;
                this.firstIsOuter = false;
                break;
            case NESTEDLOOP_STREAMED_OUTER_FIRST:
                this.blocked = false;
                this.firstIsOuter = true;
                break;
            case NESTEDLOOP_STREAMED_OUTER_SECOND:
                this.blocked = false;
                this.firstIsOuter = false;
                break;
            default:
                throw new RuntimeException("Invalid local strategy for CROSS: " + ls);
        }

        this.memManager = this.taskContext.getMemoryManager();
        final int numPages = this.memManager.computeNumberOfPages(config.getRelativeMemoryDriver());
        if (numPages < 2) {
            throw new RuntimeException("The Cross task was initialized with too little memory. Cross requires at least 2 memory pages.");
        }

        if (this.blocked) {
            // The spilling side gets one page, or two when more than 32 pages
            // are available; everything else goes to the block side.
            this.memPagesForSpillingSide = numPages > 32 ? 2 : 1;
            this.memPagesForBlockSide = numPages - this.memPagesForSpillingSide;
        } else {
            // Streamed strategies have no block side; all pages go to the spilling iterator.
            this.memPagesForSpillingSide = numPages;
            this.memPagesForBlockSide = 0;
        }
    }

    /**
     * Runs the nested-loop variant chosen in {@link #prepare()}.
     */
    @Override
    public void run() throws Exception {
        if (this.blocked) {
            if (this.firstIsOuter) {
                runBlockedOuterFirst();
            } else {
                runBlockedOuterSecond();
            }
        } else {
            if (this.firstIsOuter) {
                runStreamedOuterFirst();
            } else {
                runStreamedOuterSecond();
            }
        }
    }

    /**
     * Closes the iterators created by the run methods, if any, and clears the
     * references to them.
     */
    @Override
    public void cleanup() throws Exception {
        try {
            if (this.spillIter != null) {
                this.spillIter.close();
                this.spillIter = null;
            }
        } finally {
            // Close the block iterator even when closing the spilling iterator
            // failed, so that its resources are not leaked.
            if (this.blockIter != null) {
                this.blockIter.close();
                this.blockIter = null;
            }
        }
    }

    /**
     * Requests that the running loops stop; they poll the flag between records.
     */
    @Override
    public void cancel() {
        this.running = false;
    }

    /**
     * Block-nested-loops with the first input on the block side and the second
     * input on the spilling side. For each block of the first input, the second
     * input is replayed and each of its records is crossed with every record of
     * the block.
     */
    private void runBlockedOuterFirst() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: First input is outer (blocking) side, second input is inner (spilling) side."));
        }

        final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
        final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
        final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
        final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();

        final BlockResettableMutableObjectIterator<T1> blockVals = new BlockResettableMutableObjectIterator<T1>(
                this.memManager, in1, serializer1, this.memPagesForBlockSide, this.taskContext.getOwningNepheleTask());
        this.blockIter = blockVals;

        final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(
                in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
                this.taskContext.getOwningNepheleTask());
        this.spillIter = spillVals;

        final T1 val1Reuse = serializer1.createInstance();
        final T2 val2Reuse = serializer2.createInstance();
        T2 val2Copy = serializer2.createInstance();

        final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
        final Collector<OT> collector = this.taskContext.getOutputCollector();

        T1 val1;
        T2 val2;

        // for all blocks of the first input
        do {
            // for all records of the spilling side
            while (this.running && (val2 = spillVals.next(val2Reuse)) != null) {
                // for all records in the current block; 'running' is checked here
                // as well so that cancel() takes effect without first finishing the block
                while (this.running && (val1 = blockVals.next(val1Reuse)) != null) {
                    // The crosser gets a copy of the spilling-side record, presumably so
                    // that user code changing it cannot affect later pairs -- TODO confirm.
                    val2Copy = serializer2.copy(val2, val2Copy);
                    crosser.cross(val1, val2Copy, collector);
                }
                blockVals.reset();
            }
            spillVals.reset();
        } while (this.running && blockVals.nextBlock());
    }

    /**
     * Block-nested-loops with the first input on the spilling side and the
     * second input on the block side. For each block of the second input, the
     * first input is replayed and each of its records is crossed with every
     * record of the block.
     */
    private void runBlockedOuterSecond() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: First input is inner (spilling) side, second input is outer (blocking) side."));
        }

        final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
        final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
        final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
        final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();

        final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(
                in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
                this.taskContext.getOwningNepheleTask());
        this.spillIter = spillVals;

        final BlockResettableMutableObjectIterator<T2> blockVals = new BlockResettableMutableObjectIterator<T2>(
                this.memManager, in2, serializer2, this.memPagesForBlockSide, this.taskContext.getOwningNepheleTask());
        this.blockIter = blockVals;

        final T1 val1Reuse = serializer1.createInstance();
        T1 val1Copy = serializer1.createInstance();
        final T2 val2Reuse = serializer2.createInstance();

        final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
        final Collector<OT> collector = this.taskContext.getOutputCollector();

        T1 val1;
        T2 val2;

        // for all blocks of the second input
        do {
            // for all records of the spilling side
            while (this.running && (val1 = spillVals.next(val1Reuse)) != null) {
                // for all records in the current block
                while (this.running && (val2 = blockVals.next(val2Reuse)) != null) {
                    // The crosser gets a copy of the spilling-side record, presumably so
                    // that user code changing it cannot affect later pairs -- TODO confirm.
                    val1Copy = serializer1.copy(val1, val1Copy);
                    crosser.cross(val1Copy, val2, collector);
                }
                blockVals.reset();
            }
            spillVals.reset();
        } while (this.running && blockVals.nextBlock());
    }

    /**
     * Nested-loops with the first input streamed as the outer side. For each
     * record of the first input, the second input is replayed through a spilling
     * iterator and each of its records is crossed with the outer record.
     */
    private void runStreamedOuterFirst() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: First input is outer side, second input is inner (spilling) side."));
        }

        final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
        final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
        final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
        final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();

        final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(
                in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
                this.taskContext.getOwningNepheleTask());
        this.spillIter = spillVals;

        final T1 val1Reuse = serializer1.createInstance();
        T1 val1Copy = serializer1.createInstance();
        final T2 val2Reuse = serializer2.createInstance();

        final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
        final Collector<OT> collector = this.taskContext.getOutputCollector();

        T1 val1;
        T2 val2;

        // for all records of the outer (first) input
        while (this.running && (val1 = in1.next(val1Reuse)) != null) {
            // for all records of the inner (second) input
            while (this.running && (val2 = spillVals.next(val2Reuse)) != null) {
                // The crosser gets a copy of the outer record, presumably so that
                // user code changing it cannot affect later pairs -- TODO confirm.
                val1Copy = serializer1.copy(val1, val1Copy);
                crosser.cross(val1Copy, val2, collector);
            }
            spillVals.reset();
        }
    }

    /**
     * Nested-loops with the second input streamed as the outer side. For each
     * record of the second input, the first input is replayed through a spilling
     * iterator and each of its records is crossed with the outer record.
     */
    private void runStreamedOuterSecond() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: First input is inner (spilling) side, second input is outer side."));
        }

        final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
        final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
        final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
        final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();

        final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(
                in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
                this.taskContext.getOwningNepheleTask());
        this.spillIter = spillVals;

        final T1 val1Reuse = serializer1.createInstance();
        final T2 val2Reuse = serializer2.createInstance();
        T2 val2Copy = serializer2.createInstance();

        final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
        final Collector<OT> collector = this.taskContext.getOutputCollector();

        T1 val1;
        T2 val2;

        // for all records of the outer (second) input
        while (this.running && (val2 = in2.next(val2Reuse)) != null) {
            // for all records of the inner (first) input
            while (this.running && (val1 = spillVals.next(val1Reuse)) != null) {
                // The crosser gets a copy of the outer record, presumably so that
                // user code changing it cannot affect later pairs -- TODO confirm.
                val2Copy = serializer2.copy(val2, val2Copy);
                crosser.cross(val1, val2Copy, collector);
            }
            spillVals.reset();
        }
    }
}

