/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.services.iomanager;

import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.iomanager.BlockChannelWriter;
import eu.stratosphere.nephele.services.memorymanager.AbstractPagedOutputView;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A paged output view that hands every filled memory segment to a {@link BlockChannelWriter}.
 *
 * <p>Each block starts with an 8 byte header that this class fills in right before the block
 * is handed to the writer:
 * <ul>
 *   <li>offset 0 (short): {@link #HEADER_MAGIC_NUMBER}</li>
 *   <li>offset {@link #HEADER_FLAGS_OFFSET} (short): flags, {@link #FLAG_LAST_BLOCK} for the
 *       final block and 0 otherwise</li>
 *   <li>offset {@link #HEAD_BLOCK_LENGTH_OFFSET} (int): the write position inside the block
 *       at the time it was written</li>
 * </ul>
 *
 * <p>Memory segments are obtained from the writer via
 * {@link BlockChannelWriter#getNextReturnedSegment()}; segments supplied to the constructor are
 * placed into the writer's return queue up front.
 */
public final class ChannelWriterOutputView extends AbstractPagedOutputView {

    /** Magic number (0xC0FE as an unsigned short) stored at offset 0 of every block. */
    protected static final short HEADER_MAGIC_NUMBER = -16130;

    /** Length of the block header in bytes. */
    protected static final int HEADER_LENGTH = 8;

    /** Offset of the flags field (a short) inside the block header. */
    protected static final int HEADER_FLAGS_OFFSET = 2;

    /** Offset of the block length field (an int) inside the block header. */
    protected static final int HEAD_BLOCK_LENGTH_OFFSET = 4;

    /** Flag value marking the last block written by this view. */
    protected static final short FLAG_LAST_BLOCK = 1;

    /** The writer that receives the filled blocks and returns memory segments. */
    private final BlockChannelWriter writer;

    /** Sum of (write position - header length) over all segments handed to the writer so far. */
    private long bytesBeforeSegment;

    /** Number of segments requested from the writer so far, including the current one. */
    private int blockCount;

    /** Number of segments supplied at construction time; this many are collected in {@link #close()}. */
    private final int numSegments;

    /**
     * Creates an output view that writes to the given channel and contributes the given memory
     * segments to the writer's return queue.
     *
     * <p>All segments are checked before any of them is added to the queue, so a failed size
     * check leaves the writer's return queue untouched.
     *
     * @param writer the writer that receives the blocks; must not be {@code null}
     * @param memory the segments to add to the writer's return queue, or {@code null} to add none
     * @param segmentSize the size every supplied segment must have
     * @throws NullPointerException if {@code writer} is {@code null}
     * @throws IllegalArgumentException if a supplied segment does not have size {@code segmentSize}
     * @throws RuntimeException if fetching the first block raises an {@link IOException}
     */
    public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize) {
        super(segmentSize, HEADER_LENGTH);
        if (writer == null) {
            throw new NullPointerException();
        }
        this.writer = writer;

        if (memory == null) {
            this.numSegments = 0;
        } else {
            // Validate everything first so that a bad segment does not leave a partially
            // filled return queue behind.
            for (MemorySegment seg : memory) {
                if (seg.size() != segmentSize) {
                    throw new IllegalArgumentException("The supplied memory segments are not of the specified size.");
                }
            }
            this.numSegments = memory.size();
            final LinkedBlockingQueue<MemorySegment> queue = returnQueue();
            // Enqueue back to front, i.e. the last list element is queued first.
            for (int i = memory.size() - 1; i >= 0; --i) {
                queue.add(memory.get(i));
            }
        }

        // Obtain the first segment so that the view is immediately writable.
        try {
            this.advance();
        } catch (IOException ioex) {
            throw new RuntimeException("BUG: IOException occurred while getting first block for ChannelWriterOutputView.", ioex);
        }
    }

    /**
     * Creates an output view that writes to the given channel without contributing any memory
     * segments of its own; equivalent to passing {@code null} as the memory list.
     *
     * @param writer the writer that receives the blocks; must not be {@code null}
     * @param segmentSize the segment size passed on to the paged output view
     */
    public ChannelWriterOutputView(BlockChannelWriter writer, int segmentSize) {
        this(writer, null, segmentSize);
    }

    /**
     * Writes the current segment as the last block, closes the underlying writer and collects
     * as many segments from the writer's return queue as were supplied at construction time.
     *
     * @return the collected memory segments; empty if none were supplied at construction time
     * @throws IOException if writing the last block or closing the writer fails
     * @throws RuntimeException if the return queue holds fewer segments than expected
     */
    public List<MemorySegment> close() throws IOException {
        this.writeSegment(this.getCurrentSegment(), this.getCurrentPositionInSegment(), true);
        this.clear();

        // Grab the queue reference before closing the writer, as the original code did.
        final LinkedBlockingQueue<MemorySegment> queue = returnQueue();
        this.writer.close();

        // NOTE(review): poll() does not block, so this relies on the writer having returned
        // all segments by the time close() comes back - confirm against BlockChannelWriter.
        final List<MemorySegment> list = new ArrayList<MemorySegment>(this.numSegments);
        for (int i = 0; i < this.numSegments; ++i) {
            final MemorySegment m = queue.poll();
            if (m == null) {
                throw new RuntimeException("ChannelWriterOutputView: MemorySegments have been taken from return queue by different actor.");
            }
            list.add(m);
        }
        return list;
    }

    /**
     * Gets the number of blocks used by this view, counting the current one.
     *
     * @return the number of segments requested from the writer so far
     */
    public int getBlockCount() {
        return this.blockCount;
    }

    /**
     * Gets the number of payload bytes written so far, excluding the block headers.
     *
     * @return the bytes of all written blocks plus the bytes in the current segment, each
     *         reduced by the header length
     */
    public long getBytesWritten() {
        return this.bytesBeforeSegment + (long) this.getCurrentPositionInSegment() - HEADER_LENGTH;
    }

    /**
     * Gets the number of bytes occupied in memory segments, including block headers: all
     * previous blocks count with the full segment size, the current one with its write position.
     *
     * @return the number of bytes used across all blocks
     */
    public long getBytesMemoryUsed() {
        // Widen before multiplying; int arithmetic would wrap for outputs larger than 2 GiB.
        return (long) (this.blockCount - 1) * this.getSegmentSize() + this.getCurrentPositionInSegment();
    }

    /**
     * Hands the current segment (if any) to the writer and fetches the next segment from it.
     *
     * @param current the segment that has been filled, or {@code null} if there is none yet
     * @param posInSegment the write position inside {@code current}
     * @return the next segment as returned by the writer
     * @throws IOException if writing the block or obtaining the next segment fails
     */
    @Override
    protected final MemorySegment nextSegment(MemorySegment current, int posInSegment) throws IOException {
        if (current != null) {
            this.writeSegment(current, posInSegment, false);
        }
        final MemorySegment next = this.writer.getNextReturnedSegment();
        ++this.blockCount;
        return next;
    }

    /**
     * Fills in the block header of the given segment, passes the segment to the writer and
     * adds its payload size to the running byte count.
     *
     * @param segment the segment to write
     * @param writePosition the write position inside the segment, stored in the header
     * @param lastSegment whether the block is flagged as the last block
     * @throws IOException if the writer fails to accept the block
     */
    private void writeSegment(MemorySegment segment, int writePosition, boolean lastSegment) throws IOException {
        segment.putShort(0, HEADER_MAGIC_NUMBER);
        segment.putShort(HEADER_FLAGS_OFFSET, lastSegment ? FLAG_LAST_BLOCK : 0);
        segment.putInt(HEAD_BLOCK_LENGTH_OFFSET, writePosition);
        this.writer.writeBlock(segment);
        this.bytesBeforeSegment += (long) (writePosition - HEADER_LENGTH);
    }

    /**
     * Returns the writer's return queue, typed as a queue of memory segments.
     *
     * <p>The cast mirrors the one in the original code. The declared return type of
     * {@code getReturnQueue()} is not visible from this file, so the cast may be an unchecked
     * one; the warning is suppressed for this accessor only.
     */
    @SuppressWarnings("unchecked")
    private LinkedBlockingQueue<MemorySegment> returnQueue() {
        return (LinkedBlockingQueue<MemorySegment>) this.writer.getReturnQueue();
    }
}

