package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.execution.CancelTaskException;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.template.InputSplitProvider;
import eu.stratosphere.pact.runtime.shipping.OutputCollector;
import eu.stratosphere.pact.runtime.shipping.RecordOutputCollector;
import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver;
import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.BufferWriter;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/task/DataSourceTask.class */
/**
 * Task that reads records from a user-supplied {@link InputFormat}, one input split at a time,
 * and hands every non-null record to the task's output {@link Collector}.
 *
 * <p>Lifecycle as visible in this class: {@link #registerInputOutput()} instantiates and configures
 * the input format and sets up the outputs, {@link #invoke()} performs the actual reading, and
 * {@link #cancel()} raises a flag that makes the read loops in {@code invoke()} stop.
 *
 * @param <OT> the type of the records produced by the input format
 */
public class DataSourceTask<OT> extends AbstractInvokable {

    private static final Log LOG = LogFactory.getLog(DataSourceTask.class);

    /** Writers filled in by {@code RegularPactTask.initOutputs}; initialized again at the start of {@link #invoke()}. */
    private List<BufferWriter> eventualOutputs;

    /** Collector that receives every record read from the input format. */
    private Collector<OT> output;

    /** The user-defined input format, created and configured in {@link #initInputFormat()}. */
    private InputFormat<OT, InputSplit> format;

    /** Factory for the serializer that creates the reusable record instances. */
    private TypeSerializerFactory<OT> serializerFactory;

    /** Task configuration wrapping the configuration returned by {@code getTaskConfiguration()}. */
    private TaskConfig config;

    /** Chained tasks that are opened, closed and cancelled together with this task. */
    private ArrayList<ChainedDriver<?, ?>> chainedTasks;

    /** Class loader used to instantiate user code; looked up from the library cache if not set explicitly. */
    private ClassLoader userCodeClassLoader;

    /**
     * Cancellation flag checked by all read loops. Declared volatile, presumably because
     * {@link #cancel()} is called from a different thread than {@link #invoke()} (verify against caller).
     */
    private volatile boolean taskCanceled = false;

    /**
     * Instantiates and configures the input format, then initializes the outputs and chained tasks.
     *
     * @throws RuntimeException if the input format cannot be set up or the output initialization fails
     */
    @Override
    public void registerInputOutput() {
        initInputFormat();
        if (LOG.isDebugEnabled()) {
            LOG.debug(getLogString("Start registering input and output"));
        }
        try {
            initOutputs(this.userCodeClassLoader);
            if (LOG.isDebugEnabled()) {
                LOG.debug(getLogString("Finished registering input and output"));
            }
        } catch (Exception e) {
            throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " + e.getMessage(), e);
        }
    }

    /**
     * Reads all input splits handed out by the split provider and forwards their records to the output.
     *
     * <p>For every split the format is opened, drained via {@link #emitRecords(InputFormat, Object)} and
     * closed again. After the last split the output and the chained tasks are closed and the accumulators
     * are reported. On failure the format is closed on a best-effort basis and the chained tasks are
     * cancelled; the failure is rethrown unless the task has been cancelled in the meantime.
     *
     * @throws Exception a {@link CancelTaskException} is always propagated; any other exception is
     *         passed to {@code RegularPactTask.logAndThrowException} unless the task was cancelled
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void invoke() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(getLogString("Starting data source operator"));
        }

        final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();

        try {
            RegularPactTask.initOutputWriters(this.eventualOutputs);
            RegularPactTask.openChainedTasks(this.chainedTasks, this);

            final Iterator<InputSplit> splits = getInputSplits();
            while (!this.taskCanceled && splits.hasNext()) {
                final InputSplit split = splits.next();

                // A fresh reuse object is created per split and recycled for all records of that split.
                final OT reuse = serializer.createInstance();

                if (LOG.isDebugEnabled()) {
                    LOG.debug(getLogString("Opening input split " + split.toString()));
                }

                final InputFormat<OT, InputSplit> splitFormat = this.format;
                splitFormat.open(split);

                if (LOG.isDebugEnabled()) {
                    LOG.debug(getLogString("Starting to read input from split " + split.toString()));
                }

                try {
                    emitRecords(splitFormat, reuse);

                    if (LOG.isDebugEnabled() && !this.taskCanceled) {
                        LOG.debug(getLogString("Closing input split " + split.toString()));
                    }
                } finally {
                    // Close exactly once per opened split, whether reading succeeded or failed.
                    splitFormat.close();
                }
            }

            this.output.close();
            RegularPactTask.closeChainedTasks(this.chainedTasks, this);
            // Raw HashMap kept on purpose: the accumulator map's element type is not imported in this file.
            RegularPactTask.reportAndClearAccumulators(getEnvironment(), new HashMap(), this.chainedTasks);
        } catch (Exception e) {
            try {
                this.format.close();
            } catch (Throwable ignored) {
                // Best-effort cleanup: the format may already be closed or broken, and the
                // original failure 'e' is the one that must be reported.
            }

            RegularPactTask.cancelChainedTasks(this.chainedTasks);

            final Exception unwrapped = ExceptionInChainedStubException.exceptionUnwrap(e);
            if (unwrapped instanceof CancelTaskException) {
                throw unwrapped;
            }
            // Failures that surface after a cancel request are not reported.
            if (!this.taskCanceled) {
                RegularPactTask.logAndThrowException(unwrapped, this);
            }
        }

        if (this.taskCanceled) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(getLogString("Data source operator cancelled"));
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(getLogString("Finished data source operator"));
        }
    }

    /**
     * Drains the currently opened split of {@code splitFormat} into {@link #output}.
     *
     * <p>Records for which {@code nextRecord} returns {@code null} are skipped. In that case the
     * previous reuse object is kept, so the next iteration never works on a {@code null} reuse
     * instance. The loops stop when the format reports its end or the task is cancelled.
     *
     * <p>The {@code instanceof} dispatch only selects a more specific static type for the collector
     * (presumably to give the JIT simpler call sites); the branches forward the same records.
     * Only the {@link Record} branches clear the reuse object before each read.
     *
     * @param splitFormat the input format, already opened on the split to read
     * @param reuse the reuse object handed to the format; must not be {@code null}
     * @throws Exception if reading from the format or collecting a record fails
     */
    @SuppressWarnings("unchecked")
    private void emitRecords(InputFormat<OT, InputSplit> splitFormat, OT reuse) throws Exception {
        if (reuse.getClass() == Record.class) {
            Record record = (Record) reuse;
            // Safe: the reuse object is exactly a Record, so OT is Record for this task.
            final InputFormat<Record, InputSplit> recordFormat = (InputFormat<Record, InputSplit>) splitFormat;

            if (this.output instanceof RecordOutputCollector) {
                final RecordOutputCollector recordCollector = (RecordOutputCollector) this.output;
                while (!this.taskCanceled && !recordFormat.reachedEnd()) {
                    record.clear();
                    final Record returned = recordFormat.nextRecord(record);
                    if (returned != null) {
                        recordCollector.collect(returned);
                    }
                }
            } else if (this.output instanceof ChainedCollectorMapDriver) {
                final ChainedCollectorMapDriver<Record, ?> mapDriver = (ChainedCollectorMapDriver<Record, ?>) this.output;
                while (!this.taskCanceled && !recordFormat.reachedEnd()) {
                    record.clear();
                    final Record returned = recordFormat.nextRecord(record);
                    if (returned != null) {
                        record = returned;
                        mapDriver.collect(returned);
                    }
                }
            } else {
                final Collector<Record> recordCollector = (Collector<Record>) this.output;
                while (!this.taskCanceled && !recordFormat.reachedEnd()) {
                    record.clear();
                    final Record returned = recordFormat.nextRecord(record);
                    if (returned != null) {
                        record = returned;
                        recordCollector.collect(returned);
                    }
                }
            }
        } else if (this.output instanceof OutputCollector) {
            final OutputCollector<OT> outputCollector = (OutputCollector<OT>) this.output;
            OT current = reuse;
            while (!this.taskCanceled && !splitFormat.reachedEnd()) {
                final OT returned = splitFormat.nextRecord(current);
                if (returned != null) {
                    current = returned;
                    outputCollector.collect(returned);
                }
            }
        } else if (this.output instanceof ChainedCollectorMapDriver) {
            final ChainedCollectorMapDriver<OT, ?> mapDriver = (ChainedCollectorMapDriver<OT, ?>) this.output;
            OT current = reuse;
            while (!this.taskCanceled && !splitFormat.reachedEnd()) {
                final OT returned = splitFormat.nextRecord(current);
                if (returned != null) {
                    current = returned;
                    mapDriver.collect(returned);
                }
            }
        } else {
            final Collector<OT> collector = this.output;
            OT current = reuse;
            while (!this.taskCanceled && !splitFormat.reachedEnd()) {
                final OT returned = splitFormat.nextRecord(current);
                if (returned != null) {
                    current = returned;
                    collector.collect(returned);
                }
            }
        }
    }

    /**
     * Requests cancellation by raising the flag that the read loops in {@link #invoke()} poll.
     *
     * @throws Exception declared by the overridden method; this implementation throws nothing itself
     */
    @Override
    public void cancel() throws Exception {
        this.taskCanceled = true;
        if (LOG.isDebugEnabled()) {
            LOG.debug(getLogString("Cancelling data source operator"));
        }
    }

    /**
     * Sets the class loader used for user code. When set before {@link #registerInputOutput()},
     * the lookup in the library cache is skipped.
     *
     * @param classLoader the class loader to use for loading user code
     */
    public void setUserCodeClassLoader(ClassLoader classLoader) {
        this.userCodeClassLoader = classLoader;
    }

    /**
     * Resolves the user-code class loader, builds the {@link TaskConfig}, instantiates the input
     * format from the stub wrapper, configures it and obtains the output serializer factory.
     *
     * @throws RuntimeException if the class loader cannot be obtained, the stub is not an
     *         {@link InputFormat}, or configuring it (including the serializer lookup that shares
     *         the same try block) fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void initInputFormat() {
        if (this.userCodeClassLoader == null) {
            try {
                this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
            } catch (IOException e) {
                throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " + getEnvironment().getJobID(), e);
            }
        }

        final Configuration taskConfiguration = getTaskConfiguration();
        taskConfiguration.setClassLoader(this.userCodeClassLoader);
        this.config = new TaskConfig(taskConfiguration);

        try {
            this.format = (InputFormat) this.config.getStubWrapper(this.userCodeClassLoader).getUserCodeObject(InputFormat.class, this.userCodeClassLoader);

            if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + InputFormat.class.getName() + "' as is required.");
            }

            // Failures in here are reported as user-code errors; note that the serializer
            // factory lookup is covered by the same handler.
            try {
                this.format.configure(this.config.getStubParameters());
                this.serializerFactory = this.config.getOutputSerializer(this.userCodeClassLoader);
            } catch (Throwable t) {
                throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
            }
        } catch (ClassCastException e) {
            throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(), e);
        }
    }

    /**
     * Creates fresh containers for the chained tasks and writers and lets {@code RegularPactTask}
     * fill them while building the output collector.
     *
     * @param classLoader the user-code class loader passed on to {@code RegularPactTask.initOutputs}
     * @throws Exception if {@code RegularPactTask.initOutputs} fails
     */
    private void initOutputs(ClassLoader classLoader) throws Exception {
        this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
        this.eventualOutputs = new ArrayList<BufferWriter>();
        this.output = RegularPactTask.initOutputs(this, classLoader, this.config, this.chainedTasks, this.eventualOutputs);
    }

    /**
     * Builds a log line for this task using the task name from the environment.
     *
     * @param str the message to log
     * @return the string produced by {@code RegularPactTask.constructLogString}
     */
    private String getLogString(String str) {
        return getLogString(str, getEnvironment().getTaskName());
    }

    /**
     * Builds a log line for this task using an explicit task name.
     *
     * @param str the message to log
     * @param str2 the task name to include
     * @return the string produced by {@code RegularPactTask.constructLogString}
     */
    private String getLogString(String str, String str2) {
        return RegularPactTask.constructLogString(str, str2, this);
    }

    /**
     * Wraps the environment's {@link InputSplitProvider} in an iterator that fetches splits lazily.
     *
     * <p>A split is requested from the provider only when {@code hasNext()} or {@code next()} needs
     * one. Once the provider returns {@code null} the iterator is exhausted and does not ask again.
     *
     * @return an iterator over the input splits; {@code remove()} is not supported
     */
    private Iterator<InputSplit> getInputSplits() {
        final InputSplitProvider provider = getEnvironment().getInputSplitProvider();

        return new Iterator<InputSplit>() {

            /** Split fetched by hasNext() but not yet handed out by next(). */
            private InputSplit nextSplit;

            /** Set once the provider has returned null. */
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }

                final InputSplit fetched = provider.getNextInputSplit();
                if (fetched == null) {
                    this.exhausted = true;
                    return false;
                }
                this.nextSplit = fetched;
                return true;
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                final InputSplit result = this.nextSplit;
                this.nextSplit = null;
                return result;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
