/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.execution.CancelTaskException;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.template.InputSplitProvider;
import eu.stratosphere.pact.runtime.shipping.OutputCollector;
import eu.stratosphere.pact.runtime.shipping.RecordOutputCollector;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver;
import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.BufferWriter;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Task that reads records from an {@link InputFormat}, one input split at a time, and hands every
 * non-null record to the task's output {@link Collector}.
 *
 * <p>{@link #registerInputOutput()} instantiates and configures the input format and sets up the
 * outputs; {@link #invoke()} performs the actual reading; {@link #cancel()} sets a flag that makes
 * the read loops terminate at their next iteration.
 *
 * @param <OT> the type of the records produced by the input format
 */
public class DataSourceTask<OT> extends AbstractInvokable {

    private static final Log LOG = LogFactory.getLog(DataSourceTask.class);

    /** Writers filled in by {@code RegularPactTask.initOutputs} and initialized at the start of {@link #invoke()}. */
    private List<BufferWriter> eventualOutputs;

    /** Collector that receives every record read from the input format. */
    private Collector<OT> output;

    /** The user-supplied input format the records are read from. */
    private InputFormat<OT, InputSplit> format;

    /** Factory for the serializer that creates the reusable record instances. */
    private TypeSerializerFactory<OT> serializerFactory;

    /** Task configuration wrapping the configuration returned by {@code getTaskConfiguration()}. */
    private TaskConfig config;

    /** Tasks chained behind this source; opened, closed and cancelled together with it. */
    private ArrayList<ChainedDriver<?, ?>> chainedTasks;

    /** Class loader used to load the user code; looked up from the library cache when not set explicitly. */
    private ClassLoader userCodeClassLoader;

    /** Set by {@link #cancel()}; volatile because it is polled by the read loops in {@link #invoke()}. */
    private volatile boolean taskCanceled = false;

    /**
     * Instantiates and configures the input format, then initializes the outputs.
     *
     * @throws RuntimeException if the input format cannot be set up or the output initialization fails
     */
    @Override
    public void registerInputOutput() {
        this.initInputFormat();
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Start registering input and output"));
        }
        try {
            this.initOutputs(this.userCodeClassLoader);
        }
        catch (Exception ex) {
            throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " + ex.getMessage(), ex);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Finished registering input and output"));
        }
    }

    /**
     * Reads all input splits handed out by the input split provider and forwards their records to
     * the output collector.
     *
     * <p>For every split a fresh record instance is created by the serializer, the format is opened,
     * drained and closed again (the close happens in a {@code finally} block). After the last split
     * the output and the chained tasks are closed and the accumulators are reported.
     *
     * <p>On failure the format is closed on a best-effort basis and the chained tasks are cancelled.
     * A {@link CancelTaskException} is always rethrown; any other exception is passed to
     * {@code RegularPactTask.logAndThrowException} unless the task has been cancelled, in which case
     * it is dropped.
     *
     * @throws Exception the (unwrapped) failure that occurred while reading or emitting records
     */
    @Override
    public void invoke() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Starting data source operator"));
        }
        final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
        try {
            RegularPactTask.initOutputWriters(this.eventualOutputs);
            RegularPactTask.openChainedTasks(this.chainedTasks, this);

            final Iterator<InputSplit> splitIterator = this.getInputSplits();
            while (!this.taskCanceled && splitIterator.hasNext()) {
                final InputSplit split = splitIterator.next();
                final OT record = serializer.createInstance();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.getLogString("Opening input split " + split.toString()));
                }
                final InputFormat<OT, InputSplit> format = this.format;
                format.open(split);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.getLogString("Starting to read input from split " + split.toString()));
                }
                try {
                    // Record is special-cased so that the loops below work on concrete types.
                    if (record.getClass() == Record.class) {
                        this.emitRecordSplit(format, (Record) record);
                    } else {
                        this.emitGenericSplit(format, record);
                    }
                    if (LOG.isDebugEnabled() && !this.taskCanceled) {
                        LOG.debug(this.getLogString("Closing input split " + split.toString()));
                    }
                }
                finally {
                    format.close();
                }
            }

            this.output.close();
            RegularPactTask.closeChainedTasks(this.chainedTasks, this);
            // Raw map kept on purpose: the accumulator type is not imported by this file.
            RegularPactTask.reportAndClearAccumulators(this.getEnvironment(), new HashMap(), this.chainedTasks);
        }
        catch (Exception ex) {
            try {
                this.format.close();
            }
            catch (Throwable ignored) {
                // Deliberately best-effort: the format may already have been closed by the finally
                // block above, and a failure here must not mask the original exception.
            }
            RegularPactTask.cancelChainedTasks(this.chainedTasks);

            final Exception cause = ExceptionInChainedStubException.exceptionUnwrap(ex);
            if (cause instanceof CancelTaskException) {
                throw cause;
            }
            // Failures observed after cancellation are dropped; everything else is reported.
            if (!this.taskCanceled) {
                RegularPactTask.logAndThrowException(cause, this);
            }
        }

        if (!this.taskCanceled) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.getLogString("Finished data source operator"));
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Data source operator cancelled"));
        }
    }

    /**
     * Drains the currently opened split of a format that produces {@link Record} instances.
     *
     * <p>The record is cleared before every read. A {@code null} result from {@code nextRecord} is
     * skipped and leaves the reuse instance untouched, so the following iteration never clears or
     * passes on a {@code null} reference.
     *
     * @param format the opened input format; only called when its records are of type {@link Record}
     * @param reuse the record instance handed to the format for reuse
     * @throws IOException if the format fails while reading
     */
    private void emitRecordSplit(InputFormat<OT, InputSplit> format, Record reuse) throws IOException {
        // Safe because the caller verified that the serializer creates Record instances.
        @SuppressWarnings("unchecked")
        final InputFormat<Record, InputSplit> inFormat = (InputFormat<Record, InputSplit>) format;
        Record typedRecord = reuse;

        if (this.output instanceof RecordOutputCollector) {
            final RecordOutputCollector out = (RecordOutputCollector) this.output;
            while (!this.taskCanceled && !inFormat.reachedEnd()) {
                typedRecord.clear();
                final Record returned = inFormat.nextRecord(typedRecord);
                if (returned != null) {
                    out.collect(returned);
                }
            }
        } else if (this.output instanceof ChainedCollectorMapDriver) {
            @SuppressWarnings("unchecked")
            final ChainedCollectorMapDriver<Record, ?> out = (ChainedCollectorMapDriver<Record, ?>) this.output;
            while (!this.taskCanceled && !inFormat.reachedEnd()) {
                typedRecord.clear();
                final Record returned = inFormat.nextRecord(typedRecord);
                if (returned != null) {
                    // The returned instance becomes the reuse object for the next read.
                    typedRecord = returned;
                    out.collect(returned);
                }
            }
        } else {
            @SuppressWarnings("unchecked")
            final Collector<Record> out = (Collector<Record>) this.output;
            while (!this.taskCanceled && !inFormat.reachedEnd()) {
                typedRecord.clear();
                final Record returned = inFormat.nextRecord(typedRecord);
                if (returned != null) {
                    typedRecord = returned;
                    out.collect(returned);
                }
            }
        }
    }

    /**
     * Drains the currently opened split of a format that produces arbitrary record types.
     *
     * <p>The three loops are identical except for the static type of the collector they call. A
     * {@code null} result from {@code nextRecord} is skipped and leaves the reuse instance
     * untouched.
     *
     * @param format the opened input format
     * @param reuse the record instance handed to the format for reuse
     * @throws IOException if the format fails while reading
     */
    private void emitGenericSplit(InputFormat<OT, InputSplit> format, OT reuse) throws IOException {
        OT record = reuse;

        if (this.output instanceof OutputCollector) {
            final OutputCollector<OT> out = (OutputCollector<OT>) this.output;
            while (!this.taskCanceled && !format.reachedEnd()) {
                final OT returned = format.nextRecord(record);
                if (returned != null) {
                    record = returned;
                    out.collect(returned);
                }
            }
        } else if (this.output instanceof ChainedCollectorMapDriver) {
            @SuppressWarnings("unchecked")
            final ChainedCollectorMapDriver<OT, ?> out = (ChainedCollectorMapDriver<OT, ?>) this.output;
            while (!this.taskCanceled && !format.reachedEnd()) {
                final OT returned = format.nextRecord(record);
                if (returned != null) {
                    record = returned;
                    out.collect(returned);
                }
            }
        } else {
            final Collector<OT> out = this.output;
            while (!this.taskCanceled && !format.reachedEnd()) {
                final OT returned = format.nextRecord(record);
                if (returned != null) {
                    record = returned;
                    out.collect(returned);
                }
            }
        }
    }

    /**
     * Marks the task as cancelled. The read loops in {@link #invoke()} check the flag before every
     * record and every split.
     */
    @Override
    public void cancel() throws Exception {
        this.taskCanceled = true;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.getLogString("Cancelling data source operator"));
        }
    }

    /**
     * Sets the class loader used for user code. When set before {@link #registerInputOutput()},
     * no class loader is requested from the library cache.
     *
     * @param cl the user code class loader
     */
    public void setUserCodeClassLoader(ClassLoader cl) {
        this.userCodeClassLoader = cl;
    }

    /**
     * Resolves the user code class loader (if not set), builds the {@link TaskConfig}, instantiates
     * and configures the input format, and fetches the output serializer factory.
     *
     * @throws RuntimeException if the class loader cannot be obtained, the stub is not an
     *         {@link InputFormat}, or the format's {@code configure()} method fails
     */
    private void initInputFormat() {
        if (this.userCodeClassLoader == null) {
            try {
                this.userCodeClassLoader = LibraryCacheManager.getClassLoader(this.getEnvironment().getJobID());
            }
            catch (IOException ioe) {
                throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " + this.getEnvironment().getJobID(), ioe);
            }
        }

        final Configuration taskConf = this.getTaskConfiguration();
        taskConf.setClassLoader(this.userCodeClassLoader);
        this.config = new TaskConfig(taskConf);

        try {
            // NOTE(review): the explicit type argument replaces the raw cast emitted by the
            // decompiler — confirm against the signature of TaskConfig#getStubWrapper.
            this.format = this.config.<InputFormat<OT, InputSplit>>getStubWrapper(this.userCodeClassLoader)
                    .getUserCodeObject(InputFormat.class, this.userCodeClassLoader);
            if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + InputFormat.class.getName() + "' as is required.");
            }
        }
        catch (ClassCastException ccex) {
            throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(), ccex);
        }

        try {
            this.format.configure(this.config.getStubParameters());
        }
        catch (Throwable t) {
            // User code may fail in arbitrary ways, hence Throwable; the cause is preserved.
            throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
        }

        this.serializerFactory = this.config.getOutputSerializer(this.userCodeClassLoader);
    }

    /**
     * Creates the containers for chained tasks and writers and lets {@code RegularPactTask}
     * populate them and produce the output collector.
     *
     * @param cl the user code class loader
     * @throws Exception if {@code RegularPactTask.initOutputs} fails
     */
    private void initOutputs(ClassLoader cl) throws Exception {
        this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
        this.eventualOutputs = new ArrayList<BufferWriter>();
        this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs);
    }

    /**
     * Builds a log line for this task, using the task name from the environment.
     *
     * @param message the message to log
     * @return the string produced by {@code RegularPactTask.constructLogString}
     */
    private String getLogString(String message) {
        return this.getLogString(message, this.getEnvironment().getTaskName());
    }

    /**
     * Builds a log line for this task with an explicit task name.
     *
     * @param message the message to log
     * @param taskName the task name to include
     * @return the string produced by {@code RegularPactTask.constructLogString}
     */
    private String getLogString(String message, String taskName) {
        return RegularPactTask.constructLogString(message, taskName, this);
    }

    /**
     * Returns an iterator that pulls input splits from the environment's input split provider on
     * demand. A split is requested only when {@code hasNext()} or {@code next()} is called and no
     * split is buffered; once the provider returns {@code null} the iterator stays exhausted.
     *
     * @return a lazy, non-removable iterator over the splits assigned to this task
     */
    private Iterator<InputSplit> getInputSplits() {
        final InputSplitProvider provider = this.getEnvironment().getInputSplitProvider();
        return new Iterator<InputSplit>() {
            /** Split fetched by hasNext() but not yet handed out by next(). */
            private InputSplit nextSplit;

            /** True once the provider has returned null; no further requests are made. */
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                final InputSplit split = provider.getNextInputSplit();
                if (split != null) {
                    this.nextSplit = split;
                    return true;
                }
                this.exhausted = true;
                return false;
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !this.hasNext()) {
                    throw new NoSuchElementException();
                }
                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}

