/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.resettable;

import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.memorymanager.ListMemorySegmentSource;
import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.io.RandomAccessInputView;
import eu.stratosphere.pact.runtime.io.SimpleCollectingOutputView;
import eu.stratosphere.pact.runtime.util.MemoryBlockIterator;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

abstract class AbstractBlockResettableIterator<T>
implements MemoryBlockIterator {
    protected static final Log LOG = LogFactory.getLog(AbstractBlockResettableIterator.class);
    protected final RandomAccessInputView readView;
    protected final SimpleCollectingOutputView collectingView;
    protected final TypeSerializer<T> serializer;
    protected int numRecordsInBuffer;
    protected int numRecordsReturned;
    protected final ArrayList<MemorySegment> emptySegments;
    protected final ArrayList<MemorySegment> fullSegments;
    private final MemoryManager memoryManager;
    protected volatile boolean closed;

    protected AbstractBlockResettableIterator(TypeSerializer<T> serializer, MemoryManager memoryManager, int numPages, AbstractInvokable ownerTask) throws MemoryAllocationException {
        if (numPages < 1) {
            throw new IllegalArgumentException("Block Resettable iterator requires at leat one page of memory");
        }
        this.memoryManager = memoryManager;
        this.serializer = serializer;
        this.emptySegments = new ArrayList(numPages);
        this.fullSegments = new ArrayList(numPages);
        memoryManager.allocatePages(ownerTask, this.emptySegments, numPages);
        this.collectingView = new SimpleCollectingOutputView(this.fullSegments, new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize());
        this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize());
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Iterator initalized using " + numPages + " memory buffers."));
        }
    }

    public void open() {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)"Block Resettable Iterator opened.");
        }
    }

    public void reset() {
        if (this.closed) {
            throw new IllegalStateException("Iterator was closed.");
        }
        this.readView.setReadPosition(0L);
        this.numRecordsReturned = 0;
    }

    @Override
    public boolean nextBlock() throws IOException {
        this.numRecordsInBuffer = 0;
        for (int i = this.fullSegments.size() - 1; i >= 0; --i) {
            this.emptySegments.add(this.fullSegments.remove(i));
        }
        this.collectingView.reset();
        this.readView.setReadPosition(0L);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        AbstractBlockResettableIterator abstractBlockResettableIterator = this;
        synchronized (abstractBlockResettableIterator) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        this.numRecordsInBuffer = 0;
        this.numRecordsReturned = 0;
        for (int i = this.fullSegments.size() - 1; i >= 0; --i) {
            this.emptySegments.add(this.fullSegments.remove(i));
        }
        this.memoryManager.release(this.emptySegments);
        this.emptySegments.clear();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)"Block Resettable Iterator closed.");
        }
    }

    protected boolean writeNextRecord(T record) throws IOException {
        try {
            this.serializer.serialize(record, (DataOutputView)this.collectingView);
            ++this.numRecordsInBuffer;
            return true;
        }
        catch (EOFException eofex) {
            return false;
        }
    }

    protected T getNextRecord(T reuse) throws IOException {
        if (this.numRecordsReturned < this.numRecordsInBuffer) {
            ++this.numRecordsReturned;
            return (T)this.serializer.deserialize(reuse, (DataInputView)this.readView);
        }
        return null;
    }
}

