/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.sort;

import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.iomanager.BlockChannelAccess;
import eu.stratosphere.nephele.services.iomanager.BlockChannelReader;
import eu.stratosphere.nephele.services.iomanager.BlockChannelWriter;
import eu.stratosphere.nephele.services.iomanager.Channel;
import eu.stratosphere.nephele.services.iomanager.ChannelReaderInputView;
import eu.stratosphere.nephele.services.iomanager.ChannelWriterOutputView;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.io.ChannelReaderInputViewIterator;
import eu.stratosphere.pact.runtime.sort.ExceptionHandler;
import eu.stratosphere.pact.runtime.sort.FixedLengthRecordSorter;
import eu.stratosphere.pact.runtime.sort.InMemorySorter;
import eu.stratosphere.pact.runtime.sort.IndexedSorter;
import eu.stratosphere.pact.runtime.sort.MergeIterator;
import eu.stratosphere.pact.runtime.sort.NormalizedKeySorter;
import eu.stratosphere.pact.runtime.sort.QuickSort;
import eu.stratosphere.pact.runtime.sort.Sorter;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class UnilateralSortMerger<E>
implements Sorter<E> {
    private static final Log LOG = LogFactory.getLog(UnilateralSortMerger.class);
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    protected static final int MIN_NUM_WRITE_BUFFERS = 2;
    protected static final int MAX_NUM_WRITE_BUFFERS = 64;
    protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
    private final ThreadBase<E> readThread;
    private final ThreadBase<E> sortThread;
    private final ThreadBase<E> spillThread;
    protected final ArrayList<MemorySegment> sortReadMemory;
    protected final ArrayList<MemorySegment> writeMemory;
    protected final MemoryManager memoryManager;
    private final HashSet<BlockChannelAccess<?, ?>> openChannels;
    private final HashSet<Channel.ID> channelsToDeleteAtShutdown;
    protected final Object iteratorLock = new Object();
    protected volatile MutableObjectIterator<E> iterator;
    protected volatile IOException iteratorException;
    protected volatile boolean closed;
    private static final CircularElement<Object> EOF_MARKER = new CircularElement();
    private static final CircularElement<Object> SPILLING_MARKER = new CircularElement();

    public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction) throws IOException, MemoryAllocationException {
        this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, memoryFraction, -1, maxNumFileHandles, startSpillingFraction);
    }

    public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction) throws IOException, MemoryAllocationException {
        this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false);
    }

    protected UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory) throws IOException, MemoryAllocationException {
        int numWriteBuffers;
        if (memoryManager == null | (ioManager == null && !noSpillingMemory) | serializerFactory == null | comparator == null) {
            throw new NullPointerException();
        }
        if (parentTask == null) {
            throw new NullPointerException("Parent Task must not be null.");
        }
        if (maxNumFileHandles < 2) {
            throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
        }
        this.memoryManager = memoryManager;
        int numPagesTotal = memoryManager.computeNumberOfPages(memoryFraction);
        if (numPagesTotal < 12) {
            throw new IllegalArgumentException("Too little memory provided to sorter to perform task. Required are at least 12 pages. Current page size is " + memoryManager.getPageSize() + " bytes.");
        }
        if (noSpillingMemory) {
            numWriteBuffers = 0;
        } else {
            int minBuffers = 2 + maxNumFileHandles;
            int desiredBuffers = 2 + 2 * maxNumFileHandles;
            if (desiredBuffers > numPagesTotal) {
                numWriteBuffers = 2;
                if (minBuffers > numPagesTotal) {
                    maxNumFileHandles = numPagesTotal - 2;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Reducing maximal merge fan-in to " + maxNumFileHandles + " due to limited memory availability during merge"));
                    }
                }
            } else {
                int designatedWriteBuffers = numPagesTotal / (maxNumFileHandles + 1);
                int fractional = numPagesTotal / 64;
                int maximal = numPagesTotal - 10;
                numWriteBuffers = Math.max(2, Math.min(Math.min(64, maximal), Math.min(designatedWriteBuffers, fractional)));
            }
        }
        int sortMemPages = numPagesTotal - numWriteBuffers;
        long sortMemory = (long)sortMemPages * (long)memoryManager.getPageSize();
        if (numSortBuffers < 1) {
            numSortBuffers = sortMemory > 0x6000000L ? 3 : (sortMemPages >= 20 ? 2 : 1);
        }
        int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Instantiating sorter with " + sortMemPages + " pages of sorting memory (=" + sortMemory + " bytes total) divided over " + numSortBuffers + " sort buffers (" + numSegmentsPerSortBuffer + " pages per buffer). Using " + numWriteBuffers + " buffers for writing sorted results and merging maximally " + maxNumFileHandles + " streams at once."));
        }
        this.writeMemory = new ArrayList(numWriteBuffers);
        this.sortReadMemory = new ArrayList(sortMemPages);
        memoryManager.allocatePages(parentTask, this.sortReadMemory, sortMemPages);
        if (numWriteBuffers > 0) {
            memoryManager.allocatePages(parentTask, this.writeMemory, numWriteBuffers);
        }
        CircularQueues circularQueues = new CircularQueues();
        TypeSerializer serializer = serializerFactory.getSerializer();
        Iterator<MemorySegment> segments = this.sortReadMemory.iterator();
        for (int i = 0; i < numSortBuffers; ++i) {
            int k;
            ArrayList<MemorySegment> sortSegments = new ArrayList<MemorySegment>(numSegmentsPerSortBuffer);
            int n = k = i == numSortBuffers - 1 ? Integer.MAX_VALUE : numSegmentsPerSortBuffer;
            while (k > 0 && segments.hasNext()) {
                sortSegments.add(segments.next());
                --k;
            }
            TypeComparator comp = comparator.duplicate();
            InMemorySorter buffer = comp.supportsSerializationWithKeyNormalization() && serializer.getLength() > 0 && serializer.getLength() <= 32 ? new FixedLengthRecordSorter(serializerFactory.getSerializer(), comp, sortSegments) : new NormalizedKeySorter(serializerFactory.getSerializer(), comp, sortSegments);
            CircularElement element = new CircularElement(i, buffer);
            circularQueues.empty.add(element);
        }
        ExceptionHandler<IOException> exceptionHandler = new ExceptionHandler<IOException>(){

            @Override
            public void handleException(IOException exception) {
                if (!UnilateralSortMerger.this.closed) {
                    UnilateralSortMerger.this.setResultIteratorException(exception);
                    UnilateralSortMerger.this.close();
                }
            }
        };
        this.channelsToDeleteAtShutdown = new HashSet(64);
        this.openChannels = new HashSet(64);
        this.readThread = this.getReadingThread(exceptionHandler, input, circularQueues, parentTask, serializer, (long)(startSpillingFraction * (float)sortMemory));
        this.sortThread = this.getSortingThread(exceptionHandler, circularQueues, parentTask);
        this.spillThread = this.getSpillingThread(exceptionHandler, circularQueues, parentTask, memoryManager, ioManager, serializerFactory, comparator, this.sortReadMemory, this.writeMemory, maxNumFileHandles);
        this.startThreads();
    }

    protected void startThreads() {
        if (this.readThread != null) {
            this.readThread.start();
        }
        if (this.sortThread != null) {
            this.sortThread.start();
        }
        if (this.spillThread != null) {
            this.spillThread.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Object object = this;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        try {
            object = this.iteratorLock;
            synchronized (object) {
                if (this.iteratorException == null) {
                    this.iteratorException = new IOException("The sorter has been closed.");
                    this.iteratorLock.notifyAll();
                }
            }
            if (this.readThread != null) {
                try {
                    this.readThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error((Object)("Error shutting down reader thread: " + t.getMessage()), t);
                }
            }
            if (this.sortThread != null) {
                try {
                    this.sortThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error((Object)("Error shutting down sorter thread: " + t.getMessage()), t);
                }
            }
            if (this.spillThread != null) {
                try {
                    this.spillThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error((Object)("Error shutting down spilling thread: " + t.getMessage()), t);
                }
            }
            try {
                if (this.readThread != null) {
                    this.readThread.join();
                }
                if (this.sortThread != null) {
                    this.sortThread.join();
                }
                if (this.spillThread != null) {
                    this.spillThread.join();
                }
            }
            catch (InterruptedException iex) {
                LOG.debug((Object)"Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.", (Throwable)iex);
            }
        }
        finally {
            Object channel;
            Iterator<Object> channels;
            try {
                if (!this.writeMemory.isEmpty()) {
                    this.memoryManager.release(this.writeMemory);
                }
                this.writeMemory.clear();
            }
            catch (Throwable t) {}
            try {
                if (!this.sortReadMemory.isEmpty()) {
                    this.memoryManager.release(this.sortReadMemory);
                }
                this.sortReadMemory.clear();
            }
            catch (Throwable t) {}
            while (!this.openChannels.isEmpty()) {
                try {
                    channels = this.openChannels.iterator();
                    while (channels.hasNext()) {
                        channel = channels.next();
                        channels.remove();
                        ((BlockChannelAccess)channel).closeAndDelete();
                    }
                }
                catch (Throwable t) {
                }
            }
            while (!this.channelsToDeleteAtShutdown.isEmpty()) {
                try {
                    channels = this.channelsToDeleteAtShutdown.iterator();
                    while (channels.hasNext()) {
                        channel = (Channel.ID)channels.next();
                        channels.remove();
                        try {
                            File f = new File(((Channel.ID)channel).getPath());
                            if (!f.exists()) continue;
                            f.delete();
                        }
                        catch (Throwable throwable) {}
                    }
                }
                catch (Throwable throwable) {
                }
            }
        }
    }

    protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, CircularQueues<E> queues, AbstractInvokable parentTask, TypeSerializer<E> serializer, long startSpillingBytes) {
        return new ReadingThread<Object>(exceptionHandler, reader, queues, serializer.createInstance(), parentTask, startSpillingBytes);
    }

    protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask) {
        return new SortingThread<E>(exceptionHandler, queues, parentTask);
    }

    protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles) {
        return new SpillingThread(exceptionHandler, queues, parentTask, memoryManager, ioManager, serializerFactory.getSerializer(), comparator, sortReadMemory, writeMemory, maxFileHandles);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MutableObjectIterator<E> getIterator() throws InterruptedException {
        Object object = this.iteratorLock;
        synchronized (object) {
            while (this.iterator == null && this.iteratorException == null) {
                this.iteratorLock.wait();
            }
            if (this.iteratorException != null) {
                throw new RuntimeException("Error obtaining the sorted input: " + this.iteratorException.getMessage(), this.iteratorException);
            }
            return this.iterator;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void setResultIterator(MutableObjectIterator<E> iterator) {
        Object object = this.iteratorLock;
        synchronized (object) {
            if (this.iteratorException == null) {
                this.iterator = iterator;
                this.iteratorLock.notifyAll();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void setResultIteratorException(IOException ioex) {
        Object object = this.iteratorLock;
        synchronized (object) {
            if (this.iteratorException == null) {
                this.iteratorException = ioex;
                this.iteratorLock.notifyAll();
            }
        }
    }

    protected static <T> CircularElement<T> endMarker() {
        CircularElement<Object> c = EOF_MARKER;
        return c;
    }

    protected static <T> CircularElement<T> spillingMarker() {
        CircularElement<Object> c = SPILLING_MARKER;
        return c;
    }

    protected static final class ChannelWithBlockCount {
        private final Channel.ID channel;
        private final int blockCount;

        public ChannelWithBlockCount(Channel.ID channel, int blockCount) {
            this.channel = channel;
            this.blockCount = blockCount;
        }

        public Channel.ID getChannel() {
            return this.channel;
        }

        public int getBlockCount() {
            return this.blockCount;
        }
    }

    public static final class InputDataCollector<E>
    implements Collector<E> {
        private final CircularQueues<E> queues;
        private InMemorySorter<E> currentBuffer;
        private CircularElement<E> currentElement;
        private long bytesUntilSpilling;
        private boolean spillingInThisBuffer;
        private volatile boolean running;

        public InputDataCollector(CircularQueues<E> queues, long startSpillingBytes) {
            this.queues = queues;
            this.bytesUntilSpilling = startSpillingBytes;
            this.running = true;
            this.grabBuffer();
        }

        private void grabBuffer() {
            while (this.currentElement == null) {
                try {
                    this.currentElement = this.queues.empty.take();
                }
                catch (InterruptedException iex) {
                    if (this.running) {
                        LOG.error((Object)"Reading thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
            }
            this.currentBuffer = this.currentElement.buffer;
            if (!this.currentBuffer.isEmpty()) {
                throw new RuntimeException("New sort-buffer is not empty.");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Retrieved empty read buffer " + this.currentElement.id + "."));
            }
            this.spillingInThisBuffer = this.currentBuffer.getCapacity() <= this.bytesUntilSpilling;
        }

        public void collect(E record) {
            try {
                if (this.spillingInThisBuffer) {
                    if (this.currentBuffer.write(record)) {
                        if (this.bytesUntilSpilling - this.currentBuffer.getOccupancy() <= 0L) {
                            this.bytesUntilSpilling = 0L;
                            this.queues.sort.add(UnilateralSortMerger.spillingMarker());
                        }
                        return;
                    }
                } else if (this.currentBuffer.write(record)) {
                    return;
                }
                if (this.bytesUntilSpilling > 0L) {
                    this.bytesUntilSpilling -= this.currentBuffer.getCapacity();
                    if (this.bytesUntilSpilling <= 0L) {
                        this.bytesUntilSpilling = 0L;
                        this.queues.sort.add(UnilateralSortMerger.spillingMarker());
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Emitting full buffer from reader thread: " + this.currentElement.id + "."));
                }
                this.queues.sort.add(this.currentElement);
                this.currentElement = null;
                while (this.running && this.currentElement == null) {
                    try {
                        this.currentElement = this.queues.empty.take();
                    }
                    catch (InterruptedException iex) {
                        if (this.running) {
                            LOG.error((Object)"Reading thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                            continue;
                        }
                        return;
                    }
                }
                if (!this.running) {
                    return;
                }
                this.currentBuffer = this.currentElement.buffer;
                if (!this.currentBuffer.isEmpty()) {
                    throw new RuntimeException("BUG: New sort-buffer is not empty.");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Retrieved empty read buffer " + this.currentElement.id + "."));
                }
                if (!this.currentBuffer.write(record)) {
                    throw new RuntimeException("Record could not be written to empty sort-buffer: Serialized record exceeds buffer capacity.");
                }
            }
            catch (IOException ioex) {
                throw new RuntimeException("BUG: An error occurred while writing a record to the sort buffer: " + ioex.getMessage(), ioex);
            }
        }

        public void close() {
            if (this.running) {
                this.running = false;
                if (this.currentBuffer != null && this.currentElement != null) {
                    if (this.currentBuffer.isEmpty()) {
                        this.queues.empty.add(this.currentElement);
                    } else {
                        this.queues.sort.add(this.currentElement);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Emitting last buffer from input collector: " + this.currentElement.id + "."));
                        }
                    }
                }
                this.currentBuffer = null;
                this.currentElement = null;
                this.queues.sort.add(UnilateralSortMerger.endMarker());
            }
        }
    }

    protected class SpillingThread
    extends ThreadBase<E> {
        protected final MemoryManager memManager;
        protected final IOManager ioManager;
        protected final TypeSerializer<E> serializer;
        protected final TypeComparator<E> comparator;
        protected final List<MemorySegment> writeMemory;
        protected final List<MemorySegment> sortReadMemory;
        protected final int maxNumFileHandles;
        protected final int numWriteBuffersToCluster;

        public SpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memManager, IOManager ioManager, TypeSerializer<E> serializer, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles) {
            super(exceptionHandler, "SortMerger spilling thread", queues, parentTask);
            this.memManager = memManager;
            this.ioManager = ioManager;
            this.serializer = serializer;
            this.comparator = comparator;
            this.sortReadMemory = sortReadMemory;
            this.writeMemory = writeMemory;
            this.maxNumFileHandles = maxNumFileHandles;
            this.numWriteBuffersToCluster = writeMemory.size() >= 4 ? writeMemory.size() / 2 : 1;
        }

        /*
         * Exception decompiling
         */
        @Override
        public void go() throws IOException {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * java.lang.NullPointerException: Cannot invoke "org.benf.cfr.reader.bytecode.analysis.types.BindingSuperContainer.getBoundAssignable(org.benf.cfr.reader.bytecode.analysis.types.JavaGenericRefTypeInstance, org.benf.cfr.reader.bytecode.analysis.types.JavaGenericRefTypeInstance)" because "maybeBindingContainer" is null
             *     at org.benf.cfr.reader.bytecode.analysis.types.GenericTypeBinder.extractBaseBindings(GenericTypeBinder.java:125)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExplicitTypeCallRewriter$InnerExplicitTypeCallRewriter.rewriteFunctionInvokation(ExplicitTypeCallRewriter.java:37)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExplicitTypeCallRewriter$InnerExplicitTypeCallRewriter.rewriteExpression(ExplicitTypeCallRewriter.java:56)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExpressionRewriterHelper.applyForwards(ExpressionRewriterHelper.java:12)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.expression.AbstractMemberFunctionInvokation.applyExpressionRewriterToArgs(AbstractMemberFunctionInvokation.java:101)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExplicitTypeCallRewriter.rewriteExpression(ExplicitTypeCallRewriter.java:71)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.statement.ExpressionStatement.rewriteExpressions(ExpressionStatement.java:40)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.rewrite(Op03SimpleStatement.java:479)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.Op03Rewriters.rewriteWith(Op03Rewriters.java:23)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:819)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        protected final void disposeSortBuffers(boolean releaseMemory) {
            while (!this.queues.empty.isEmpty()) {
                try {
                    InMemorySorter sorter = this.queues.empty.take().buffer;
                    List<MemorySegment> sorterMem = sorter.dispose();
                    if (!releaseMemory) continue;
                    this.memManager.release(sorterMem);
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        LOG.error((Object)"Spilling thread was interrupted (without being shut down) while collecting empty buffers to release them. Retrying to collect buffers...");
                        continue;
                    }
                    return;
                }
            }
        }

        protected final CircularElement<E> takeNext(BlockingQueue<CircularElement<E>> queue, Queue<CircularElement<E>> cache) throws InterruptedException {
            return cache.isEmpty() ? queue.take() : cache.poll();
        }

        protected final MergeIterator<E> getMergingIterator(List<ChannelWithBlockCount> channelIDs, List<List<MemorySegment>> inputSegments, List<BlockChannelAccess<?, ?>> readerList) throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Performing merge of " + channelIDs.size() + " sorted streams."));
            }
            ArrayList iterators = new ArrayList(channelIDs.size());
            for (int i = 0; i < channelIDs.size(); ++i) {
                ChannelWithBlockCount channel = channelIDs.get(i);
                List<MemorySegment> segsForChannel = inputSegments.get(i);
                BlockChannelReader reader = segsForChannel.size() >= 4 ? this.ioManager.createBlockChannelReader(channel.getChannel(), segsForChannel.size() / 2) : this.ioManager.createBlockChannelReader(channel.getChannel());
                readerList.add(reader);
                this.registerOpenChannelToBeRemovedAtShudown(reader);
                this.unregisterChannelToBeRemovedAtShudown(channel.getChannel());
                ChannelReaderInputView inView = new ChannelReaderInputView(reader, segsForChannel, channel.getBlockCount(), false);
                iterators.add(new ChannelReaderInputViewIterator(inView, null, this.serializer));
            }
            return new MergeIterator(iterators, this.serializer, this.comparator);
        }

        protected final List<ChannelWithBlockCount> mergeChannelList(List<ChannelWithBlockCount> channelIDs, List<MemorySegment> allReadBuffers, List<MemorySegment> writeBuffers) throws IOException {
            double numMerges = Math.ceil((double)channelIDs.size() / (double)this.maxNumFileHandles);
            int channelsToMergePerStep = (int)Math.ceil((double)channelIDs.size() / numMerges);
            ArrayList<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelsToMergePerStep);
            this.getSegmentsForReaders(readBuffers, allReadBuffers, channelsToMergePerStep);
            ArrayList<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>((int)(numMerges + 1.0));
            ArrayList<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
            int channelNum = 0;
            while (this.isRunning() && channelNum < channelIDs.size()) {
                channelsToMergeThisStep.clear();
                for (int i = 0; i < channelsToMergePerStep && channelNum < channelIDs.size(); ++i, ++channelNum) {
                    channelsToMergeThisStep.add(channelIDs.get(channelNum));
                }
                if (channelsToMergeThisStep.size() < 2) {
                    mergedChannelIDs.addAll(channelsToMergeThisStep);
                    continue;
                }
                mergedChannelIDs.add(this.mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
            }
            return mergedChannelIDs;
        }

        protected ChannelWithBlockCount mergeChannels(List<ChannelWithBlockCount> channelIDs, List<List<MemorySegment>> readBuffers, List<MemorySegment> writeBuffers) throws IOException {
            ArrayList channelAccesses = new ArrayList(channelIDs.size());
            MergeIterator<Object> mergeIterator = this.getMergingIterator(channelIDs, readBuffers, channelAccesses);
            Channel.ID mergedChannelID = this.ioManager.createChannel();
            this.registerChannelToBeRemovedAtShudown(mergedChannelID);
            BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID, this.numWriteBuffersToCluster);
            this.registerOpenChannelToBeRemovedAtShudown(writer);
            ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, this.memManager.getPageSize());
            TypeSerializer serializer = this.serializer;
            Object rec = serializer.createInstance();
            while ((rec = mergeIterator.next(rec)) != null) {
                serializer.serialize(rec, (DataOutputView)output);
            }
            output.close();
            int numBlocksWritten = output.getBlockCount();
            this.unregisterOpenChannelToBeRemovedAtShudown(writer);
            for (int i = 0; i < channelAccesses.size(); ++i) {
                BlockChannelAccess access = (BlockChannelAccess)channelAccesses.get(i);
                access.closeAndDelete();
                this.unregisterOpenChannelToBeRemovedAtShudown(access);
            }
            return new ChannelWithBlockCount(mergedChannelID, numBlocksWritten);
        }

        protected final void getSegmentsForReaders(List<List<MemorySegment>> target, List<MemorySegment> memory, int numChannels) {
            int k;
            ArrayList<MemorySegment> segs;
            int i;
            int numBuffers = memory.size();
            int buffersPerChannelLowerBound = numBuffers / numChannels;
            int numChannelsWithOneMore = numBuffers % numChannels;
            Iterator<MemorySegment> segments = memory.iterator();
            for (i = 0; i < numChannelsWithOneMore; ++i) {
                segs = new ArrayList<MemorySegment>(buffersPerChannelLowerBound + 1);
                target.add(segs);
                for (k = buffersPerChannelLowerBound; k >= 0; --k) {
                    segs.add(segments.next());
                }
            }
            for (i = numChannelsWithOneMore; i < numChannels; ++i) {
                segs = new ArrayList(buffersPerChannelLowerBound);
                target.add(segs);
                for (k = buffersPerChannelLowerBound; k > 0; --k) {
                    segs.add(segments.next());
                }
            }
        }

        protected void registerChannelToBeRemovedAtShudown(Channel.ID channel) {
            UnilateralSortMerger.this.channelsToDeleteAtShutdown.add(channel);
        }

        protected void unregisterChannelToBeRemovedAtShudown(Channel.ID channel) {
            UnilateralSortMerger.this.channelsToDeleteAtShutdown.remove(channel);
        }

        protected void registerOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
            UnilateralSortMerger.this.openChannels.add(channel);
        }

        protected void unregisterOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
            UnilateralSortMerger.this.openChannels.remove(channel);
        }
    }

    protected static class SortingThread<E>
    extends ThreadBase<E> {
        private final IndexedSorter sorter = new QuickSort();

        public SortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask) {
            super(exceptionHandler, "SortMerger sorting thread", queues, parentTask);
        }

        @Override
        public void go() throws IOException {
            boolean alive = true;
            while (this.isRunning() && alive) {
                CircularElement element = null;
                try {
                    element = this.queues.sort.take();
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        if (!LOG.isErrorEnabled()) continue;
                        LOG.error((Object)"Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (element != EOF_MARKER && element != SPILLING_MARKER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Sorting buffer " + element.id + "."));
                    }
                    this.sorter.sort(element.buffer);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Sorted buffer " + element.id + "."));
                    }
                } else if (element == EOF_MARKER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)"Sorting thread done.");
                    }
                    alive = false;
                }
                this.queues.spill.add(element);
            }
        }
    }

    protected static class ReadingThread<E>
    extends ThreadBase<E> {
        private final MutableObjectIterator<E> reader;
        private final long startSpillingBytes;
        private final E readTarget;

        public ReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, CircularQueues<E> queues, E readTarget, AbstractInvokable parentTask, long startSpillingBytes) {
            super(exceptionHandler, "SortMerger Reading Thread", queues, parentTask);
            this.reader = reader;
            this.readTarget = readTarget;
            this.startSpillingBytes = startSpillingBytes;
        }

        @Override
        public void go() throws IOException {
            MutableObjectIterator<E> reader = this.reader;
            Object current = this.readTarget;
            Object leftoverRecord = null;
            CircularElement element = null;
            long bytesUntilSpilling = this.startSpillingBytes;
            boolean done = false;
            if (bytesUntilSpilling < 1L) {
                bytesUntilSpilling = 0L;
                this.queues.sort.add(UnilateralSortMerger.spillingMarker());
            }
            while (!done && this.isRunning()) {
                while (element == null) {
                    try {
                        element = this.queues.empty.take();
                    }
                    catch (InterruptedException iex) {
                        if (this.isRunning()) {
                            LOG.error((Object)"Reading thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                            continue;
                        }
                        return;
                    }
                }
                InMemorySorter buffer = element.buffer;
                if (!buffer.isEmpty()) {
                    throw new IOException("New buffer is not empty.");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Retrieved empty read buffer " + element.id + "."));
                }
                if (leftoverRecord != null) {
                    if (!buffer.write(leftoverRecord)) {
                        throw new IOException("Record could not be written to empty buffer: Serialized record exceeds buffer capacity.");
                    }
                    leftoverRecord = null;
                }
                boolean available = true;
                if (bytesUntilSpilling > 0L && buffer.getCapacity() >= bytesUntilSpilling) {
                    CircularElement SPILLING_MARKER;
                    Object newCurrent;
                    boolean fullBuffer = false;
                    while (this.isRunning() && (available = (newCurrent = reader.next(current)) != null)) {
                        current = newCurrent;
                        if (!buffer.write(current)) {
                            leftoverRecord = current;
                            fullBuffer = true;
                            break;
                        }
                        if (bytesUntilSpilling - buffer.getOccupancy() > 0L) continue;
                        bytesUntilSpilling = 0L;
                        SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                        this.queues.sort.add(SPILLING_MARKER);
                        break;
                    }
                    if (fullBuffer) {
                        if (bytesUntilSpilling > 0L && (bytesUntilSpilling -= buffer.getCapacity()) <= 0L) {
                            bytesUntilSpilling = 0L;
                            SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                            this.queues.sort.add(SPILLING_MARKER);
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Emitting full buffer from reader thread: " + element.id + "."));
                        }
                        this.queues.sort.add(element);
                        element = null;
                        continue;
                    }
                } else if (bytesUntilSpilling > 0L && (bytesUntilSpilling -= buffer.getCapacity()) <= 0L) {
                    bytesUntilSpilling = 0L;
                    CircularElement SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                    this.queues.sort.add(SPILLING_MARKER);
                }
                if (available) {
                    Object newCurrent;
                    while (this.isRunning() && (newCurrent = reader.next(current)) != null) {
                        current = newCurrent;
                        if (buffer.write(current)) continue;
                        leftoverRecord = current;
                        break;
                    }
                }
                if (leftoverRecord != null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Emitting full buffer from reader thread: " + element.id + "."));
                    }
                } else {
                    done = true;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Emitting final buffer from reader thread: " + element.id + "."));
                    }
                }
                if (!buffer.isEmpty()) {
                    this.queues.sort.add(element);
                } else {
                    this.queues.empty.add(element);
                }
                element = null;
            }
            if (!this.isRunning()) {
                return;
            }
            CircularElement EOF_MARKER = UnilateralSortMerger.endMarker();
            this.queues.sort.add(EOF_MARKER);
            LOG.debug((Object)"Reading thread done.");
        }
    }

    protected static abstract class ThreadBase<E>
    extends Thread
    implements Thread.UncaughtExceptionHandler {
        protected final CircularQueues<E> queues;
        private final ExceptionHandler<IOException> exceptionHandler;
        private final AbstractInvokable parentTask;
        private volatile boolean alive;

        protected ThreadBase(ExceptionHandler<IOException> exceptionHandler, String name, CircularQueues<E> queues, AbstractInvokable parentTask) {
            super(name);
            this.setDaemon(true);
            this.exceptionHandler = exceptionHandler;
            this.setUncaughtExceptionHandler(this);
            this.queues = queues;
            this.parentTask = parentTask;
            this.alive = true;
        }

        @Override
        public void run() {
            try {
                if (this.parentTask != null) {
                    this.parentTask.userThreadStarted(this);
                }
                this.go();
            }
            catch (Throwable t) {
                this.internalHandleException(new IOException("Thread '" + this.getName() + "' terminated due to an exception: " + t.getMessage(), t));
            }
            finally {
                if (this.parentTask != null) {
                    this.parentTask.userThreadFinished(this);
                }
            }
        }

        protected abstract void go() throws IOException;

        public boolean isRunning() {
            return this.alive;
        }

        public void shutdown() {
            this.alive = false;
            this.interrupt();
        }

        protected final void internalHandleException(IOException ioex) {
            if (!this.isRunning()) {
                return;
            }
            if (this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.handleException(ioex);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            this.internalHandleException(new IOException("Thread '" + t.getName() + "' terminated due to an uncaught exception: " + e.getMessage(), e));
        }
    }

    protected static final class CircularQueues<E> {
        final BlockingQueue<CircularElement<E>> empty;
        final BlockingQueue<CircularElement<E>> sort;
        final BlockingQueue<CircularElement<E>> spill;

        public CircularQueues() {
            this.empty = new LinkedBlockingQueue<CircularElement<E>>();
            this.sort = new LinkedBlockingQueue<CircularElement<E>>();
            this.spill = new LinkedBlockingQueue<CircularElement<E>>();
        }

        public CircularQueues(int numElements) {
            this.empty = new ArrayBlockingQueue<CircularElement<E>>(numElements);
            this.sort = new ArrayBlockingQueue<CircularElement<E>>(numElements);
            this.spill = new ArrayBlockingQueue<CircularElement<E>>(numElements);
        }
    }

    protected static final class CircularElement<E> {
        final int id;
        final InMemorySorter<E> buffer;

        public CircularElement() {
            this.buffer = null;
            this.id = -1;
        }

        public CircularElement(int id, InMemorySorter<E> buffer) {
            this.id = id;
            this.buffer = buffer;
        }
    }
}

