/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.hash;

import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypePairComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.memorymanager.ListMemorySegmentSource;
import eu.stratosphere.pact.runtime.hash.AbstractHashTableProber;
import eu.stratosphere.pact.runtime.hash.AbstractMutableHashTable;
import eu.stratosphere.pact.runtime.hash.InMemoryPartition;
import eu.stratosphere.pact.runtime.util.MathUtils;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class CompactingHashTable<T>
extends AbstractMutableHashTable<T> {
    private static final Log LOG = LogFactory.getLog(CompactingHashTable.class);
    private static final int MIN_NUM_MEMORY_SEGMENTS = 33;
    private static final int MAX_NUM_PARTITIONS = 32;
    private static final int DEFAULT_RECORD_LEN = 24;
    private static final int HASH_CODE_LEN = 4;
    private static final int POINTER_LEN = 8;
    private static final int RECORD_TABLE_BYTES = 12;
    private static final int RECORD_OVERHEAD_BYTES = 12;
    private static final int NUM_INTRA_BUCKET_BITS = 7;
    private static final int HASH_BUCKET_SIZE = 128;
    private static final int BUCKET_HEADER_LENGTH = 16;
    private static final int NUM_ENTRIES_PER_BUCKET = 9;
    private static final int BUCKET_POINTER_START_OFFSET = 52;
    private static final int HEADER_PARTITION_OFFSET = 0;
    private static final int HEADER_COUNT_OFFSET = 4;
    private static final int HEADER_FORWARD_OFFSET = 8;
    private static final long BUCKET_FORWARD_POINTER_NOT_SET = -1L;
    private final ArrayList<MemorySegment> availableMemory;
    private final int segmentSize;
    private final int bucketsPerSegmentMask;
    private final int bucketsPerSegmentBits;
    private final int avgRecordLen;
    private final ArrayList<InMemoryPartition<T>> partitions;
    private MemorySegment[] buckets;
    private InMemoryPartition<T> compactionMemory;
    private int numBuckets;
    private AtomicBoolean closed = new AtomicBoolean();
    private boolean running = true;
    private int pageSizeInBits;

    public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments) {
        this(buildSideSerializer, buildSideComparator, memorySegments, 24);
    }

    public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments, int avgRecordLen) {
        super(buildSideSerializer, buildSideComparator);
        if (memorySegments == null) {
            throw new NullPointerException();
        }
        if (memorySegments.size() < 33) {
            throw new IllegalArgumentException("Too few memory segments provided. Hash Join needs at least 33 memory segments.");
        }
        this.availableMemory = memorySegments instanceof ArrayList ? (ArrayList<Object>)memorySegments : new ArrayList<MemorySegment>(memorySegments);
        this.avgRecordLen = buildSideSerializer.getLength() > 0 ? buildSideSerializer.getLength() : avgRecordLen;
        this.segmentSize = memorySegments.get(0).size();
        if ((this.segmentSize & this.segmentSize - 1) != 0) {
            throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
        }
        int bucketsPerSegment = this.segmentSize >> 7;
        if (bucketsPerSegment == 0) {
            throw new IllegalArgumentException("Hash Table requires buffers of at least 128 bytes.");
        }
        this.bucketsPerSegmentMask = bucketsPerSegment - 1;
        this.bucketsPerSegmentBits = MathUtils.log2strict(bucketsPerSegment);
        this.partitions = new ArrayList();
        this.closed.set(true);
    }

    @Override
    public void open() {
        if (!this.closed.compareAndSet(true, false)) {
            throw new IllegalStateException("Hash Table cannot be opened, because it is currently not closed.");
        }
        int partitionFanOut = CompactingHashTable.getPartitioningFanOutNoEstimates(this.availableMemory.size());
        this.createPartitions(partitionFanOut);
        int numBuckets = CompactingHashTable.getInitialTableSize(this.availableMemory.size(), this.segmentSize, partitionFanOut, this.avgRecordLen);
        this.initTable(numBuckets, (byte)partitionFanOut);
    }

    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        LOG.debug((Object)"Closing hash table and releasing resources.");
        this.releaseTable();
        this.clearPartitions();
    }

    @Override
    public void abort() {
        this.running = false;
        LOG.debug((Object)"Cancelling hash table operations.");
    }

    @Override
    public List<MemorySegment> getFreeMemory() {
        if (!this.closed.get()) {
            throw new IllegalStateException("Cannot return memory while join is open.");
        }
        return this.availableMemory;
    }

    @Override
    public void buildTable(MutableObjectIterator<T> input) throws IOException {
        Object record = this.buildSideSerializer.createInstance();
        while (this.running && (record = input.next(record)) != null) {
            this.insert(record);
        }
    }

    @Override
    public final void insert(T record) throws IOException {
        long pointer;
        if (this.closed.get()) {
            return;
        }
        int hashCode = CompactingHashTable.hash(this.buildSideComparator.hash(record));
        int posHashCode = hashCode % this.numBuckets;
        int bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
        int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << 7;
        MemorySegment bucket = this.buckets[bucketArrayPos];
        byte partitionNumber = bucket.get(bucketInSegmentPos + 0);
        InMemoryPartition<T> p = this.partitions.get(partitionNumber);
        try {
            pointer = p.appendRecord(record);
            if (pointer >> this.pageSizeInBits > (long)this.compactionMemory.getBlockCount()) {
                this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
            }
        }
        catch (EOFException e) {
            try {
                this.compactPartition(partitionNumber);
                pointer = this.partitions.get(partitionNumber).appendRecord(record);
            }
            catch (EOFException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Message: " + ex.getMessage());
            }
            catch (IndexOutOfBoundsException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Message: " + ex.getMessage());
            }
        }
        catch (IndexOutOfBoundsException e1) {
            try {
                this.compactPartition(partitionNumber);
                pointer = this.partitions.get(partitionNumber).appendRecord(record);
            }
            catch (EOFException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Message: " + ex.getMessage());
            }
            catch (IndexOutOfBoundsException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Message: " + ex.getMessage());
            }
        }
        this.insertBucketEntryFromStart(p, bucket, bucketInSegmentPos, hashCode, pointer);
    }

    public <PT> HashTableProber<PT> getProber(TypeComparator<PT> probeSideComparator, TypePairComparator<PT, T> pairComparator) {
        return new HashTableProber(probeSideComparator, pairComparator);
    }

    @Override
    public MutableObjectIterator<T> getEntryIterator() {
        return new EntryIterator(this);
    }

    @Override
    public void insertOrReplaceRecord(T record, T tempHolder) throws IOException {
        if (this.closed.get()) {
            return;
        }
        int searchHashCode = CompactingHashTable.hash(this.buildSideComparator.hash(record));
        int posHashCode = searchHashCode % this.numBuckets;
        MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits];
        int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << 7;
        MemorySegment bucket = originalBucket;
        int bucketInSegmentOffset = originalBucketOffset;
        byte partitionNumber = bucket.get(bucketInSegmentOffset + 0);
        InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
        MemorySegment[] overflowSegments = partition.overflowSegments;
        this.buildSideComparator.setReference(record);
        int countInSegment = bucket.getInt(bucketInSegmentOffset + 4);
        int numInSegment = 0;
        int posInSegment = bucketInSegmentOffset + 16;
        long currentForwardPointer = -1L;
        while (true) {
            if (numInSegment < countInSegment) {
                int thisCode = bucket.getInt(posInSegment);
                posInSegment += 4;
                if (thisCode == searchHashCode) {
                    int pointerOffset = bucketInSegmentOffset + 52 + numInSegment * 8;
                    long pointer = bucket.getLong(pointerOffset);
                    ++numInSegment;
                    try {
                        partition.readRecordAt(pointer, tempHolder);
                        if (!this.buildSideComparator.equalToReference(tempHolder)) continue;
                        long newPointer = partition.appendRecord(record);
                        bucket.putLong(pointerOffset, newPointer);
                        partition.setCompaction(false);
                        if (newPointer >> this.pageSizeInBits > (long)this.compactionMemory.getBlockCount()) {
                            this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits));
                        }
                        return;
                    }
                    catch (EOFException e) {
                        long newPointer;
                        try {
                            this.compactPartition(partition.getPartitionNumber());
                            newPointer = this.partitions.get(partitionNumber).appendRecord(record);
                        }
                        catch (EOFException ex) {
                            throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Message: " + ex.getMessage());
                        }
                        catch (IndexOutOfBoundsException ex) {
                            throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Message: " + ex.getMessage());
                        }
                        bucket.putLong(pointerOffset, newPointer);
                        return;
                    }
                    catch (IndexOutOfBoundsException e) {
                        long newPointer;
                        try {
                            this.compactPartition(partition.getPartitionNumber());
                            newPointer = this.partitions.get(partitionNumber).appendRecord(record);
                        }
                        catch (EOFException ex) {
                            throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Message: " + ex.getMessage());
                        }
                        catch (IndexOutOfBoundsException ex) {
                            throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Message: " + ex.getMessage());
                        }
                        bucket.putLong(pointerOffset, newPointer);
                        return;
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
                    }
                }
                ++numInSegment;
                continue;
            }
            long newForwardPointer = bucket.getLong(bucketInSegmentOffset + 8);
            if (newForwardPointer == -1L) {
                long pointer = partition.appendRecord(record);
                this.insertBucketEntryFromSearch(partition, originalBucket, bucket, originalBucketOffset, bucketInSegmentOffset, countInSegment, currentForwardPointer, searchHashCode, pointer);
                if (pointer >> this.pageSizeInBits > (long)this.compactionMemory.getBlockCount()) {
                    this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
                }
                return;
            }
            int overflowSegNum = (int)(newForwardPointer >>> 32);
            bucket = overflowSegments[overflowSegNum];
            bucketInSegmentOffset = (int)(newForwardPointer & 0xFFFFFFFFFFFFFFFFL);
            countInSegment = bucket.getInt(bucketInSegmentOffset + 4);
            posInSegment = bucketInSegmentOffset + 16;
            numInSegment = 0;
            currentForwardPointer = newForwardPointer;
        }
    }

    private final void insertBucketEntryFromStart(InMemoryPartition<T> p, MemorySegment bucket, int bucketInSegmentPos, int hashCode, long pointer) throws IOException {
        int count = bucket.getInt(bucketInSegmentPos + 4);
        if (count < 9) {
            bucket.putInt(bucketInSegmentPos + 16 + count * 4, hashCode);
            bucket.putLong(bucketInSegmentPos + 52 + count * 8, pointer);
            bucket.putInt(bucketInSegmentPos + 4, count + 1);
        } else {
            int overflowBucketNum;
            int overflowBucketOffset;
            MemorySegment overflowSeg;
            long forwardForNewBucket;
            long originalForwardPointer = bucket.getLong(bucketInSegmentPos + 8);
            if (originalForwardPointer != -1L) {
                int overflowSegNum = (int)(originalForwardPointer >>> 32);
                MemorySegment seg = p.overflowSegments[overflowSegNum];
                int segOffset = (int)(originalForwardPointer & 0xFFFFFFFFFFFFFFFFL);
                int obCount = seg.getInt(segOffset + 4);
                if (obCount < 9) {
                    seg.putInt(segOffset + 16 + obCount * 4, hashCode);
                    seg.putLong(segOffset + 52 + obCount * 8, pointer);
                    seg.putInt(segOffset + 4, obCount + 1);
                    return;
                }
                forwardForNewBucket = originalForwardPointer;
            } else {
                forwardForNewBucket = -1L;
            }
            if (p.nextOverflowBucket == 0) {
                overflowSeg = this.getNextBuffer();
                overflowBucketOffset = 0;
                overflowBucketNum = p.numOverflowSegments;
                if (p.overflowSegments.length <= p.numOverflowSegments) {
                    MemorySegment[] newSegsArray = new MemorySegment[p.overflowSegments.length * 2];
                    System.arraycopy(p.overflowSegments, 0, newSegsArray, 0, p.overflowSegments.length);
                    p.overflowSegments = newSegsArray;
                }
                p.overflowSegments[p.numOverflowSegments] = overflowSeg;
                ++p.numOverflowSegments;
            } else {
                overflowBucketNum = p.numOverflowSegments - 1;
                overflowSeg = p.overflowSegments[overflowBucketNum];
                overflowBucketOffset = p.nextOverflowBucket << 7;
            }
            p.nextOverflowBucket = p.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : p.nextOverflowBucket + 1;
            overflowSeg.putLong(overflowBucketOffset + 8, forwardForNewBucket);
            long pointerToNewBucket = (long)overflowBucketNum << 32 | (long)overflowBucketOffset;
            bucket.putLong(bucketInSegmentPos + 8, pointerToNewBucket);
            overflowSeg.putInt(overflowBucketOffset + 16, hashCode);
            overflowSeg.putLong(overflowBucketOffset + 52, pointer);
            overflowSeg.putInt(overflowBucketOffset + 4, 1);
        }
    }

    private final void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) {
        if (countInCurrentBucket < 9) {
            currentBucket.putInt(currentBucketOffset + 16 + countInCurrentBucket * 4, hashCode);
            currentBucket.putLong(currentBucketOffset + 52 + countInCurrentBucket * 8, pointer);
            currentBucket.putInt(currentBucketOffset + 4, countInCurrentBucket + 1);
        } else {
            int overflowBucketNum;
            int overflowBucketOffset;
            MemorySegment overflowSeg;
            if (partition.nextOverflowBucket == 0) {
                overflowSeg = this.getNextBuffer();
                overflowBucketOffset = 0;
                overflowBucketNum = partition.numOverflowSegments;
                if (partition.overflowSegments.length <= partition.numOverflowSegments) {
                    MemorySegment[] newSegsArray = new MemorySegment[partition.overflowSegments.length * 2];
                    System.arraycopy(partition.overflowSegments, 0, newSegsArray, 0, partition.overflowSegments.length);
                    partition.overflowSegments = newSegsArray;
                }
                partition.overflowSegments[partition.numOverflowSegments] = overflowSeg;
                ++partition.numOverflowSegments;
            } else {
                overflowBucketNum = partition.numOverflowSegments - 1;
                overflowSeg = partition.overflowSegments[overflowBucketNum];
                overflowBucketOffset = partition.nextOverflowBucket << 7;
            }
            partition.nextOverflowBucket = partition.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : partition.nextOverflowBucket + 1;
            overflowSeg.putLong(overflowBucketOffset + 8, currentForwardPointer);
            long pointerToNewBucket = (long)overflowBucketNum << 32 | (long)overflowBucketOffset;
            originalBucket.putLong(originalBucketOffset + 8, pointerToNewBucket);
            overflowSeg.putInt(overflowBucketOffset + 16, hashCode);
            overflowSeg.putLong(overflowBucketOffset + 52, pointer);
            overflowSeg.putInt(overflowBucketOffset + 4, 1);
        }
    }

    private void createPartitions(int numPartitions) {
        this.partitions.clear();
        ListMemorySegmentSource memSource = new ListMemorySegmentSource(this.availableMemory);
        this.pageSizeInBits = MathUtils.log2strict(this.segmentSize);
        for (int i = 0; i < numPartitions; ++i) {
            this.partitions.add(new InMemoryPartition(this.buildSideSerializer, i, memSource, this.segmentSize, this.pageSizeInBits));
        }
        this.compactionMemory = new InMemoryPartition(this.buildSideSerializer, -1, memSource, this.segmentSize, this.pageSizeInBits);
    }

    private void clearPartitions() {
        for (int i = 0; i < this.partitions.size(); ++i) {
            InMemoryPartition<T> p = this.partitions.get(i);
            p.clearAllMemory(this.availableMemory);
        }
        this.partitions.clear();
        this.compactionMemory.clearAllMemory(this.availableMemory);
    }

    private void initTable(int numBuckets, byte numPartitions) {
        int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
        int numSegs = (numBuckets >>> this.bucketsPerSegmentBits) + ((numBuckets & this.bucketsPerSegmentMask) == 0 ? 0 : 1);
        MemorySegment[] table = new MemorySegment[numSegs];
        int bucket = 0;
        for (int i = 0; i < numSegs && bucket < numBuckets; ++i) {
            MemorySegment seg = this.getNextBuffer();
            for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; ++k, ++bucket) {
                int bucketOffset = k * 128;
                byte partition = CompactingHashTable.assignPartition(bucket, numPartitions);
                seg.put(bucketOffset + 0, partition);
                seg.putInt(bucketOffset + 4, 0);
                seg.putLong(bucketOffset + 8, -1L);
            }
            table[i] = seg;
        }
        this.buckets = table;
        this.numBuckets = numBuckets;
    }

    private void releaseTable() {
        this.numBuckets = 0;
        if (this.buckets != null) {
            for (int i = 0; i < this.buckets.length; ++i) {
                this.availableMemory.add(this.buckets[i]);
            }
            this.buckets = null;
        }
    }

    private final MemorySegment getNextBuffer() {
        int s = this.availableMemory.size();
        if (s > 0) {
            return this.availableMemory.remove(s - 1);
        }
        throw new RuntimeException("Memory ran out. numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length);
    }

    private static final int getPartitioningFanOutNoEstimates(int numBuffers) {
        return Math.max(10, Math.min(numBuffers / 10, 32));
    }

    private int getMaxPartition() {
        int maxPartition = 0;
        for (InMemoryPartition<T> p1 : this.partitions) {
            if (p1.getBlockCount() <= maxPartition) continue;
            maxPartition = p1.getBlockCount();
        }
        return maxPartition;
    }

    private int getMinPartition() {
        int minPartition = Integer.MAX_VALUE;
        for (InMemoryPartition<T> p1 : this.partitions) {
            if (p1.getBlockCount() >= minPartition) continue;
            minPartition = p1.getBlockCount();
        }
        return minPartition;
    }

    private int getOverflowSegmentCount() {
        int result = 0;
        for (InMemoryPartition<T> p : this.partitions) {
            result += p.numOverflowSegments;
        }
        return result;
    }

    private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
        long totalSize = (long)bufferSize * (long)numBuffers;
        long numRecordsStorable = totalSize / (long)(recordLenBytes + 12);
        long bucketBytes = numRecordsStorable * 12L;
        long numBuckets = bucketBytes / 256L + 1L;
        return numBuckets > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)numBuckets;
    }

    private static final byte assignPartition(int bucket, byte numPartitions) {
        return (byte)(bucket % numPartitions);
    }

    private void compactPartition(int partitionNumber) throws IOException {
        if (this.partitions.get(partitionNumber).isCompacted() || this.closed.get()) {
            return;
        }
        this.compactionMemory.clearAllMemory(this.availableMemory);
        this.compactionMemory.allocateSegments(1);
        Object tempHolder = this.buildSideSerializer.createInstance();
        InMemoryPartition<Object> partition = this.partitions.remove(partitionNumber);
        int numPartitions = this.partitions.size() + 1;
        long pointer = 0L;
        int pointerOffset = 0;
        int bucketOffset = 0;
        int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
        int bucket = partitionNumber;
        for (int i = 0; i < this.buckets.length && bucket < this.numBuckets; ++i) {
            MemorySegment segment = this.buckets[i];
            block1: for (int k = bucket % bucketsPerSegment; k < bucketsPerSegment && bucket < this.numBuckets; k += numPartitions, bucket += numPartitions) {
                bucketOffset = k * 128;
                if (segment.get(bucketOffset + 0) != partitionNumber) {
                    throw new IOException("Accessed wrong bucket! ");
                }
                int count = segment.getInt(bucketOffset + 4);
                for (int j = 0; j < 9 && j < count; ++j) {
                    pointerOffset = bucketOffset + 52 + j * 8;
                    pointer = segment.getLong(pointerOffset);
                    partition.readRecordAt(pointer, tempHolder);
                    pointer = this.compactionMemory.appendRecord(tempHolder);
                    segment.putLong(pointerOffset, pointer);
                }
                long overflowPointer = segment.getLong(bucketOffset + 8);
                if (overflowPointer == -1L) continue;
                int current = 9;
                bucketOffset = (int)(overflowPointer & 0xFFFFFFFFFFFFFFFFL);
                pointerOffset = (int)(overflowPointer & 0xFFFFFFFFFFFFFFFFL) + 52;
                int overflowSegNum = (int)(overflowPointer >>> 32);
                count += partition.overflowSegments[overflowSegNum].getInt(bucketOffset + 4);
                while (current < count) {
                    pointer = partition.overflowSegments[overflowSegNum].getLong(pointerOffset);
                    partition.readRecordAt(pointer, tempHolder);
                    pointer = this.compactionMemory.appendRecord(tempHolder);
                    partition.overflowSegments[overflowSegNum].putLong(pointerOffset, pointer);
                    if (++current % 9 == 0) {
                        count += partition.overflowSegments[overflowSegNum].getInt(bucketOffset + 4);
                        overflowPointer = partition.overflowSegments[overflowSegNum].getLong(bucketOffset + 8);
                        if (overflowPointer == -1L) continue block1;
                        overflowSegNum = (int)(overflowPointer >>> 32);
                        bucketOffset = (int)(overflowPointer & 0xFFFFFFFFFFFFFFFFL);
                        pointerOffset = (int)(overflowPointer & 0xFFFFFFFFFFFFFFFFL) + 52;
                        continue;
                    }
                    pointerOffset += 8;
                }
            }
        }
        this.compactionMemory.setPartitionNumber(partitionNumber);
        this.partitions.add(partitionNumber, this.compactionMemory);
        this.compactionMemory = partition;
        this.partitions.get((int)partitionNumber).overflowSegments = this.compactionMemory.overflowSegments;
        this.partitions.get((int)partitionNumber).numOverflowSegments = this.compactionMemory.numOverflowSegments;
        this.partitions.get((int)partitionNumber).nextOverflowBucket = this.compactionMemory.nextOverflowBucket;
        this.partitions.get(partitionNumber).setCompaction(true);
        this.compactionMemory.resetRecordCounter();
        this.compactionMemory.setPartitionNumber(-1);
        int maxSegmentNumber = 0;
        for (InMemoryPartition<T> e : this.partitions) {
            if (e.getBlockCount() <= maxSegmentNumber) continue;
            maxSegmentNumber = e.getBlockCount();
        }
        this.compactionMemory.allocateSegments(maxSegmentNumber);
        if (this.compactionMemory.getBlockCount() > maxSegmentNumber) {
            this.compactionMemory.releaseSegments(maxSegmentNumber, this.availableMemory);
        }
    }

    private void fastCompactPartition(int partitionNumber) throws IOException {
        if (this.partitions.get(partitionNumber).isCompacted()) {
            return;
        }
    }

    private static final int hash(int code) {
        code = code + 2127912214 + (code << 12);
        code = code ^ 0xC761C23C ^ code >>> 19;
        code = code + 374761393 + (code << 5);
        code = code + -744332180 ^ code << 9;
        code = code + -42973499 + (code << 3);
        return (code = code ^ 0xB55A4F09 ^ code >>> 16) >= 0 ? code : -(code + 1);
    }

    public final class HashTableProber<PT>
    extends AbstractHashTableProber<PT, T> {
        private InMemoryPartition<T> partition;
        private MemorySegment bucket;
        private int pointerOffsetInBucket;

        private HashTableProber(TypeComparator<PT> probeTypeComparator, TypePairComparator<PT, T> pairComparator) {
            super(probeTypeComparator, pairComparator);
        }

        @Override
        public boolean getMatchFor(PT probeSideRecord, T targetForMatch) {
            if (CompactingHashTable.this.closed.get()) {
                return false;
            }
            int searchHashCode = CompactingHashTable.hash(this.probeTypeComparator.hash(probeSideRecord));
            int posHashCode = searchHashCode % CompactingHashTable.this.numBuckets;
            MemorySegment bucket = CompactingHashTable.this.buckets[posHashCode >> CompactingHashTable.this.bucketsPerSegmentBits];
            int bucketInSegmentOffset = (posHashCode & CompactingHashTable.this.bucketsPerSegmentMask) << 7;
            byte partitionNumber = bucket.get(bucketInSegmentOffset + 0);
            InMemoryPartition partition = (InMemoryPartition)CompactingHashTable.this.partitions.get(partitionNumber);
            MemorySegment[] overflowSegments = partition.overflowSegments;
            this.pairComparator.setReference(probeSideRecord);
            int countInSegment = bucket.getInt(bucketInSegmentOffset + 4);
            int numInSegment = 0;
            int posInSegment = bucketInSegmentOffset + 16;
            while (true) {
                if (numInSegment < countInSegment) {
                    int thisCode = bucket.getInt(posInSegment);
                    posInSegment += 4;
                    if (thisCode == searchHashCode) {
                        int pointerOffset = bucketInSegmentOffset + 52 + numInSegment * 8;
                        long pointer = bucket.getLong(pointerOffset);
                        ++numInSegment;
                        try {
                            partition.readRecordAt(pointer, targetForMatch);
                            if (!this.pairComparator.equalToReference(targetForMatch)) continue;
                            this.partition = partition;
                            this.bucket = bucket;
                            this.pointerOffsetInBucket = pointerOffset;
                            return true;
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
                        }
                    }
                    ++numInSegment;
                    continue;
                }
                long forwardPointer = bucket.getLong(bucketInSegmentOffset + 8);
                if (forwardPointer == -1L) {
                    return false;
                }
                int overflowSegNum = (int)(forwardPointer >>> 32);
                bucket = overflowSegments[overflowSegNum];
                bucketInSegmentOffset = (int)(forwardPointer & 0xFFFFFFFFFFFFFFFFL);
                countInSegment = bucket.getInt(bucketInSegmentOffset + 4);
                posInSegment = bucketInSegmentOffset + 16;
                numInSegment = 0;
            }
        }

        @Override
        public void updateMatch(T record) throws IOException {
            if (CompactingHashTable.this.closed.get()) {
                return;
            }
            long newPointer = this.partition.appendRecord(record);
            this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
            this.partition.setCompaction(false);
        }
    }

    public class EntryIterator
    implements MutableObjectIterator<T> {
        private CompactingHashTable<T> table;
        private ArrayList<T> cache;
        private int currentBucketIndex = 0;
        private int currentSegmentIndex = 0;
        private int currentBucketOffset = 0;
        private int bucketsPerSegment;
        private boolean done;

        private EntryIterator(CompactingHashTable<T> compactingHashTable) {
            this.table = compactingHashTable;
            this.cache = new ArrayList(64);
            this.done = false;
            this.bucketsPerSegment = this.table.bucketsPerSegmentMask + 1;
        }

        public T next(T reuse) throws IOException {
            if (this.done || this.table.closed.get()) {
                return null;
            }
            if (!this.cache.isEmpty()) {
                reuse = this.cache.remove(this.cache.size() - 1);
                return reuse;
            }
            while (!this.done && this.cache.isEmpty()) {
                this.done = !this.fillCache();
            }
            if (!this.done) {
                reuse = this.cache.remove(this.cache.size() - 1);
                return reuse;
            }
            return null;
        }

        private boolean fillCache() throws IOException {
            if (this.currentBucketIndex >= this.table.numBuckets) {
                return false;
            }
            MemorySegment bucket = this.table.buckets[this.currentSegmentIndex];
            byte partitionNumber = bucket.get(this.currentBucketOffset + 0);
            InMemoryPartition partition = (InMemoryPartition)this.table.partitions.get(partitionNumber);
            MemorySegment[] overflowSegments = partition.overflowSegments;
            int countInSegment = bucket.getInt(this.currentBucketOffset + 4);
            int numInSegment = 0;
            int posInSegment = this.currentBucketOffset + 52;
            int bucketOffset = this.currentBucketOffset;
            while (true) {
                if (numInSegment < countInSegment) {
                    long pointer = bucket.getLong(posInSegment);
                    posInSegment += 8;
                    ++numInSegment;
                    Object target = this.table.buildSideSerializer.createInstance();
                    try {
                        partition.readRecordAt(pointer, target);
                        this.cache.add(target);
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
                    }
                }
                long forwardPointer = bucket.getLong(bucketOffset + 8);
                if (forwardPointer == -1L) break;
                int overflowSegNum = (int)(forwardPointer >>> 32);
                bucket = overflowSegments[overflowSegNum];
                bucketOffset = (int)(forwardPointer & 0xFFFFFFFFFFFFFFFFL);
                countInSegment = bucket.getInt(bucketOffset + 4);
                posInSegment = bucketOffset + 52;
                numInSegment = 0;
            }
            ++this.currentBucketIndex;
            if (this.currentBucketIndex % this.bucketsPerSegment == 0) {
                ++this.currentSegmentIndex;
                this.currentBucketOffset = 0;
            } else {
                this.currentBucketOffset += 128;
            }
            return true;
        }
    }
}

