/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.hash;

import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.core.memory.MemorySegmentSource;
import eu.stratosphere.core.memory.SeekableDataInputView;
import eu.stratosphere.core.memory.SeekableDataOutputView;
import eu.stratosphere.nephele.services.iomanager.BlockChannelWriter;
import eu.stratosphere.nephele.services.iomanager.Channel;
import eu.stratosphere.nephele.services.iomanager.ChannelWriterOutputView;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.AbstractPagedInputView;
import eu.stratosphere.nephele.services.memorymanager.AbstractPagedOutputView;
import eu.stratosphere.pact.runtime.io.RandomAccessOutputView;
import eu.stratosphere.pact.runtime.util.MathUtils;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class HashPartition<BT, PT>
extends AbstractPagedInputView
implements SeekableDataInputView {
    protected MemorySegment[] overflowSegments;
    protected int numOverflowSegments;
    protected int nextOverflowBucket;
    private final TypeSerializer<BT> buildSideSerializer;
    private final TypeSerializer<PT> probeSideSerializer;
    protected MemorySegment[] partitionBuffers;
    private int currentBufferNum;
    private int finalBufferLimit;
    private BuildSideBuffer<BT> buildSideWriteBuffer;
    protected ChannelWriterOutputView probeSideBuffer;
    private RandomAccessOutputView overwriteBuffer;
    private long buildSideRecordCounter;
    protected long probeSideRecordCounter;
    private final int segmentSizeBits;
    private final int memorySegmentSize;
    private final int partitionNumber;
    protected int recursionLevel;
    private BlockChannelWriter buildSideChannel;
    protected BlockChannelWriter probeSideChannel;
    protected boolean furtherPartitioning = false;

    protected void setFurtherPatitioning(boolean v) {
        this.furtherPartitioning = v;
    }

    HashPartition(TypeSerializer<BT> buildSideAccessors, TypeSerializer<PT> probeSideAccessors, int partitionNumber, int recursionLevel, MemorySegment initialBuffer, MemorySegmentSource memSource, int segmentSize) {
        super(0);
        this.buildSideSerializer = buildSideAccessors;
        this.probeSideSerializer = probeSideAccessors;
        this.partitionNumber = partitionNumber;
        this.recursionLevel = recursionLevel;
        this.memorySegmentSize = segmentSize;
        this.segmentSizeBits = MathUtils.log2strict(segmentSize);
        this.overflowSegments = new MemorySegment[2];
        this.numOverflowSegments = 0;
        this.nextOverflowBucket = 0;
        this.buildSideWriteBuffer = new BuildSideBuffer(initialBuffer, memSource);
    }

    HashPartition(TypeSerializer<BT> buildSideAccessors, TypeSerializer<PT> probeSideAccessors, int partitionNumber, int recursionLevel, List<MemorySegment> buffers, long buildSideRecordCounter, int segmentSize, int lastSegmentLimit) {
        super(0);
        this.buildSideSerializer = buildSideAccessors;
        this.probeSideSerializer = probeSideAccessors;
        this.partitionNumber = partitionNumber;
        this.recursionLevel = recursionLevel;
        this.memorySegmentSize = segmentSize;
        this.segmentSizeBits = MathUtils.log2strict(segmentSize);
        this.finalBufferLimit = lastSegmentLimit;
        this.partitionBuffers = buffers.toArray(new MemorySegment[buffers.size()]);
        this.buildSideRecordCounter = buildSideRecordCounter;
        this.overflowSegments = new MemorySegment[2];
        this.numOverflowSegments = 0;
        this.nextOverflowBucket = 0;
    }

    public int getPartitionNumber() {
        return this.partitionNumber;
    }

    public int getRecursionLevel() {
        return this.recursionLevel;
    }

    public final boolean isInMemory() {
        return this.buildSideChannel == null;
    }

    public int getBuildSideBlockCount() {
        return this.partitionBuffers == null ? this.buildSideWriteBuffer.getBlockCount() : this.partitionBuffers.length;
    }

    public int getProbeSideBlockCount() {
        return this.probeSideBuffer == null ? -1 : this.probeSideBuffer.getBlockCount();
    }

    public long getBuildSideRecordCount() {
        return this.buildSideRecordCounter;
    }

    public long getProbeSideRecordCount() {
        return this.probeSideRecordCounter;
    }

    public BlockChannelWriter getBuildSideChannel() {
        return this.buildSideChannel;
    }

    public BlockChannelWriter getProbeSideChannel() {
        return this.probeSideChannel;
    }

    public final long insertIntoBuildBuffer(BT record) throws IOException {
        ++this.buildSideRecordCounter;
        if (this.isInMemory()) {
            long pointer = this.buildSideWriteBuffer.getCurrentPointer();
            this.buildSideSerializer.serialize(record, this.buildSideWriteBuffer);
            return this.isInMemory() ? pointer : -1L;
        }
        this.buildSideSerializer.serialize(record, this.buildSideWriteBuffer);
        return -1L;
    }

    public final void insertIntoProbeBuffer(PT record) throws IOException {
        this.probeSideSerializer.serialize(record, (DataOutputView)this.probeSideBuffer);
        ++this.probeSideRecordCounter;
    }

    public int spillPartition(List<MemorySegment> target, IOManager ioAccess, Channel.ID targetChannel, LinkedBlockingQueue<MemorySegment> bufferReturnQueue) throws IOException {
        if (!this.isInMemory()) {
            throw new RuntimeException("Bug in Hybrid Hash Join: Request to spill a partition that has already been spilled.");
        }
        if (this.getBuildSideBlockCount() + this.numOverflowSegments < 2) {
            throw new RuntimeException("Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.");
        }
        for (int i = 0; i < this.numOverflowSegments; ++i) {
            target.add(this.overflowSegments[i]);
        }
        this.overflowSegments = null;
        this.numOverflowSegments = 0;
        this.nextOverflowBucket = 0;
        this.buildSideChannel = ioAccess.createBlockChannelWriter(targetChannel, bufferReturnQueue);
        return this.buildSideWriteBuffer.spill(this.buildSideChannel);
    }

    public void finalizeBuildPhase(IOManager ioAccess, Channel.Enumerator probeChannelEnumerator, LinkedBlockingQueue<MemorySegment> bufferReturnQueue) throws IOException {
        this.finalBufferLimit = this.buildSideWriteBuffer.getCurrentPositionInSegment();
        this.partitionBuffers = this.buildSideWriteBuffer.close();
        if (!this.isInMemory()) {
            this.buildSideChannel.close();
            this.probeSideChannel = ioAccess.createBlockChannelWriter(probeChannelEnumerator.next(), bufferReturnQueue);
            this.probeSideBuffer = new ChannelWriterOutputView(this.probeSideChannel, this.memorySegmentSize);
        }
    }

    public int finalizeProbePhase(List<MemorySegment> freeMemory, List<HashPartition<BT, PT>> spilledPartitions) throws IOException {
        if (this.isInMemory()) {
            for (int k = 0; k < this.numOverflowSegments; ++k) {
                freeMemory.add(this.overflowSegments[k]);
            }
            this.overflowSegments = null;
            this.numOverflowSegments = 0;
            this.nextOverflowBucket = 0;
            for (int i = 0; i < this.partitionBuffers.length; ++i) {
                freeMemory.add(this.partitionBuffers[i]);
            }
            this.partitionBuffers = null;
            return 0;
        }
        if (this.probeSideRecordCounter == 0L) {
            freeMemory.add(this.probeSideBuffer.getCurrentSegment());
            this.probeSideChannel.close();
            this.buildSideChannel.deleteChannel();
            this.probeSideChannel.deleteChannel();
            return 0;
        }
        this.probeSideBuffer.close();
        this.probeSideChannel.close();
        spilledPartitions.add(this);
        return 1;
    }

    public void clearAllMemory(List<MemorySegment> target) {
        int k;
        if (this.buildSideWriteBuffer != null) {
            if (this.buildSideWriteBuffer.getCurrentSegment() != null) {
                target.add(this.buildSideWriteBuffer.getCurrentSegment());
            }
            target.addAll(((BuildSideBuffer)this.buildSideWriteBuffer).targetList);
            ((BuildSideBuffer)this.buildSideWriteBuffer).targetList.clear();
            this.buildSideWriteBuffer = null;
        }
        if (this.probeSideBuffer != null && this.probeSideBuffer.getCurrentSegment() != null) {
            target.add(this.probeSideBuffer.getCurrentSegment());
            this.probeSideBuffer = null;
        }
        if (this.overflowSegments != null) {
            for (k = 0; k < this.numOverflowSegments; ++k) {
                target.add(this.overflowSegments[k]);
            }
        }
        if (this.partitionBuffers != null) {
            for (k = 0; k < this.partitionBuffers.length; ++k) {
                target.add(this.partitionBuffers[k]);
            }
            this.partitionBuffers = null;
        }
        try {
            if (this.buildSideChannel != null) {
                this.buildSideChannel.close();
                this.buildSideChannel.deleteChannel();
            }
            if (this.probeSideChannel != null) {
                this.probeSideChannel.close();
                this.probeSideChannel.deleteChannel();
            }
        }
        catch (IOException ioex) {
            throw new RuntimeException("Error deleting the partition files. Some temporary files might not be removed.");
        }
    }

    final PartitionIterator getPartitionIterator(TypeComparator<BT> comparator) throws IOException {
        return new PartitionIterator(comparator);
    }

    final int getLastSegmentLimit() {
        return this.finalBufferLimit;
    }

    final SeekableDataOutputView getWriteView() {
        if (this.overwriteBuffer == null) {
            this.overwriteBuffer = new RandomAccessOutputView(this.partitionBuffers, this.memorySegmentSize);
        }
        return this.overwriteBuffer;
    }

    public void prepareProbePhase(IOManager ioAccess, Channel.Enumerator probeChannelEnumerator, LinkedBlockingQueue<MemorySegment> bufferReturnQueue) throws IOException {
        if (this.isInMemory()) {
            return;
        }
        this.probeSideChannel = ioAccess.createBlockChannelWriter(probeChannelEnumerator.next(), bufferReturnQueue);
        this.probeSideBuffer = new ChannelWriterOutputView(this.probeSideChannel, this.memorySegmentSize);
    }

    public void setReadPosition(long pointer) {
        int bufferNum = (int)(pointer >>> this.segmentSizeBits);
        int offset = (int)(pointer & (long)(this.memorySegmentSize - 1));
        this.currentBufferNum = bufferNum;
        this.seekInput(this.partitionBuffers[bufferNum], offset, bufferNum < this.partitionBuffers.length - 1 ? this.memorySegmentSize : this.finalBufferLimit);
    }

    @Override
    protected MemorySegment nextSegment(MemorySegment current) throws IOException {
        ++this.currentBufferNum;
        if (this.currentBufferNum < this.partitionBuffers.length) {
            return this.partitionBuffers[this.currentBufferNum];
        }
        throw new EOFException();
    }

    @Override
    protected int getLimitForSegment(MemorySegment segment) {
        return segment == this.partitionBuffers[this.partitionBuffers.length - 1] ? this.finalBufferLimit : this.memorySegmentSize;
    }

    final class PartitionIterator
    implements MutableObjectIterator<BT> {
        private final TypeComparator<BT> comparator;
        private long currentPointer;
        private int currentHashCode;

        private PartitionIterator(TypeComparator<BT> comparator) throws IOException {
            this.comparator = comparator;
            HashPartition.this.setReadPosition(0L);
        }

        public final BT next(BT reuse) throws IOException {
            int pos = HashPartition.this.getCurrentPositionInSegment();
            int buffer = HashPartition.this.currentBufferNum;
            this.currentPointer = ((long)buffer << HashPartition.this.segmentSizeBits) + (long)pos;
            try {
                reuse = HashPartition.this.buildSideSerializer.deserialize(reuse, (DataInputView)HashPartition.this);
                this.currentHashCode = this.comparator.hash(reuse);
                return reuse;
            }
            catch (EOFException eofex) {
                return null;
            }
        }

        protected final long getPointer() {
            return this.currentPointer;
        }

        protected final int getCurrentHashCode() {
            return this.currentHashCode;
        }
    }

    protected static final class BuildSideBuffer<BT>
    extends AbstractPagedOutputView {
        private final ArrayList<MemorySegment> targetList = new ArrayList();
        private final MemorySegmentSource memSource;
        private BlockChannelWriter writer;
        private int currentBlockNumber;
        private final int sizeBits;

        private BuildSideBuffer(MemorySegment initialSegment, MemorySegmentSource memSource) {
            super(initialSegment, initialSegment.size(), 0);
            this.memSource = memSource;
            this.sizeBits = MathUtils.log2strict(initialSegment.size());
        }

        @Override
        protected MemorySegment nextSegment(MemorySegment current, int bytesUsed) throws IOException {
            MemorySegment next;
            this.finalizeSegment(current, bytesUsed);
            if (this.writer == null) {
                this.targetList.add(current);
                next = this.memSource.nextSegment();
            } else {
                this.writer.writeBlock(current);
                try {
                    next = (MemorySegment)((LinkedBlockingQueue)this.writer.getReturnQueue()).take();
                }
                catch (InterruptedException iex) {
                    throw new IOException("Hash Join Partition was interrupted while grabbing a new write-behind buffer.");
                }
            }
            ++this.currentBlockNumber;
            return next;
        }

        long getCurrentPointer() {
            return ((long)this.currentBlockNumber << this.sizeBits) + (long)this.getCurrentPositionInSegment();
        }

        int getBlockCount() {
            return this.currentBlockNumber + 1;
        }

        int spill(BlockChannelWriter writer) throws IOException {
            this.writer = writer;
            int numSegments = this.targetList.size();
            for (int i = 0; i < numSegments; ++i) {
                this.writer.writeBlock(this.targetList.get(i));
            }
            this.targetList.clear();
            return numSegments;
        }

        MemorySegment[] close() throws IOException {
            MemorySegment current = this.getCurrentSegment();
            if (current == null) {
                throw new IllegalStateException("Illegal State in HashPartition: No current buffer when finilizing build side.");
            }
            this.finalizeSegment(current, this.getCurrentPositionInSegment());
            this.clear();
            if (this.writer == null) {
                this.targetList.add(current);
                MemorySegment[] buffers = this.targetList.toArray(new MemorySegment[this.targetList.size()]);
                this.targetList.clear();
                return buffers;
            }
            this.writer.writeBlock(current);
            return null;
        }

        private final void finalizeSegment(MemorySegment seg, int bytesUsed) {
        }
    }
}

