package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.Preconditions;

/**
 * {@link StreamTaskInput} that pulls buffers and events from a {@link CheckpointedInputGate} and
 * turns the buffers into {@link StreamElement}s.
 *
 * <p>One {@link RecordDeserializer} is kept per input channel of the gate; the deserializer of the
 * channel that delivered the most recent buffer is the "current" one and is drained before the
 * gate is polled again.
 */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.class */
public final class StreamTaskNetworkInput implements StreamTaskInput {
    /** Gate that supplies buffers and events. */
    private final CheckpointedInputGate checkpointedInputGate;
    /** Delegate handed to the deserializers; holds the most recently deserialized element. */
    private final DeserializationDelegate<StreamElement> deserializationDelegate;
    /** One deserializer per input channel, indexed by channel index. */
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
    /** Index of this input, as passed to the constructor. */
    private final int inputIndex;
    /** Channel index of the last buffer received, or -1 if no buffer has been received yet. */
    private int lastChannel = -1;
    /** Deserializer currently holding an unconsumed buffer, or null if there is none. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer = null;
    /** Set once the gate reported that it is finished. */
    private boolean isFinished = false;

    /**
     * Creates the input and allocates one spilling deserializer per input channel of the gate.
     *
     * @param checkpointedInputGate gate to read buffers and events from
     * @param typeSerializer serializer for the record type, wrapped in a {@link StreamElementSerializer}
     * @param iOManager source of the spilling directory paths given to each deserializer
     * @param i index of this input, returned by {@link #getInputIndex()}
     */
    public StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<?> typeSerializer, IOManager iOManager, int i) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate(new StreamElementSerializer(typeSerializer));

        final int channelCount = checkpointedInputGate.getNumberOfInputChannels();
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[channelCount];
        int channel = 0;
        while (channel < channelCount) {
            this.recordDeserializers[channel] = new SpillingAdaptiveSpanningRecordDeserializer(iOManager.getSpillingDirectoriesPaths());
            channel++;
        }

        this.inputIndex = i;
    }

    /**
     * Returns the next fully deserialized element, or {@code null} when the gate currently has
     * nothing to offer (including the case where the gate is finished and empty).
     *
     * @return the next element, or {@code null} if none is available right now
     * @throws IllegalStateException if the gate is finished but reports that it is not empty
     * @throws Exception if polling the gate or deserializing a record fails
     */
    @Nullable
    /* renamed from: pollNextNullable, reason: merged with bridge method [inline-methods] */
    public StreamElement m85pollNextNullable() throws Exception {
        for (;;) {
            // First drain whatever the current deserializer still holds.
            final RecordDeserializer<DeserializationDelegate<StreamElement>> active = this.currentRecordDeserializer;
            if (active != null) {
                final RecordDeserializer.DeserializationResult outcome = active.getNextRecord(this.deserializationDelegate);
                if (outcome.isBufferConsumed()) {
                    // The buffer is used up: give it back and forget the deserializer.
                    active.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (outcome.isFullRecord()) {
                    return (StreamElement) this.deserializationDelegate.getInstance();
                }
            }

            // No complete record yet: fetch the next buffer or event from the gate.
            final Optional<BufferOrEvent> polled = this.checkpointedInputGate.pollNext();
            if (polled.isPresent()) {
                processBufferOrEvent(polled.get());
                continue;
            }

            if (this.checkpointedInputGate.isFinished()) {
                this.isFinished = true;
                Preconditions.checkState(this.checkpointedInputGate.isAvailable().isDone(), "Finished BarrierHandler should be available");
                if (!this.checkpointedInputGate.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
            }
            return null;
        }
    }

    /**
     * Routes a buffer to the deserializer of its channel, or validates an event.
     *
     * @param bufferOrEvent item polled from the gate
     * @throws IOException if the item is an event other than {@link EndOfPartitionEvent}, or if
     *     handing the buffer to the deserializer fails
     */
    private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOException {
        if (!bufferOrEvent.isBuffer()) {
            // Only end-of-partition events are tolerated here; they need no further handling.
            final AbstractEvent received = bufferOrEvent.getEvent();
            if (received.getClass() != EndOfPartitionEvent.class) {
                throw new IOException("Unexpected event: " + received);
            }
            return;
        }

        // Remember the channel before selecting its deserializer and feeding it the buffer.
        this.lastChannel = bufferOrEvent.getChannelIndex();
        final RecordDeserializer<DeserializationDelegate<StreamElement>> target = this.recordDeserializers[this.lastChannel];
        this.currentRecordDeserializer = target;
        target.setNextBuffer(bufferOrEvent.getBuffer());
    }

    /** Returns the channel index of the last received buffer, or -1 if there was none yet. */
    @Override // org.apache.flink.streaming.runtime.io.StreamTaskInput
    public int getLastChannel() {
        return this.lastChannel;
    }

    /** Returns the input index supplied at construction time. */
    @Override // org.apache.flink.streaming.runtime.io.StreamTaskInput
    public int getInputIndex() {
        return this.inputIndex;
    }

    /** Returns whether a poll has already observed the gate as finished. */
    public boolean isFinished() {
        return this.isFinished;
    }

    /**
     * Returns {@code AVAILABLE} while a deserializer still holds an unconsumed buffer; otherwise
     * defers to the availability future of the gate.
     */
    public CompletableFuture<?> isAvailable() {
        if (this.currentRecordDeserializer == null) {
            return this.checkpointedInputGate.isAvailable();
        }
        return AVAILABLE;
    }

    /**
     * Recycles any buffer still held by a deserializer, clears every deserializer, and then cleans
     * up the gate.
     *
     * @throws IOException if cleaning up the gate fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            final Buffer pending = deserializer.getCurrentBuffer();
            // Skip buffers that are absent or were already handed back.
            if (pending != null && !pending.isRecycled()) {
                pending.recycleBuffer();
            }
            deserializer.clear();
        }
        this.checkpointedInputGate.cleanup();
    }
}
