/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.pact.runtime.task.chaining;

import eu.stratosphere.api.common.functions.Function;
import eu.stratosphere.api.common.functions.GenericFlatMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver;
import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;

/**
 * Chained driver that hands every record it receives to a user-defined
 * {@link GenericFlatMap} and lets that function emit its results into this
 * driver's output collector.
 *
 * <p>The fields {@code config}, {@code userCodeClassLoader}, {@code taskName} and
 * {@code outputCollector} used below are not declared in this class; they are
 * inherited.
 *
 * @param <IT> type of the records passed to {@code collect}
 * @param <OT> type of the records the flat-map function emits
 */
public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {

    /** The user function; assigned in {@link #setup(AbstractInvokable)}. */
    private GenericFlatMap<IT, OT> mapper;

    /**
     * Instantiates the user's flat-map function from the task configuration using the
     * user-code class loader, stores it, and gives it the UDF runtime context.
     *
     * @param parent the invokable this driver is chained to; not used by this method
     */
    @Override
    public void setup(AbstractInvokable parent) {
        // Only the raw class token GenericFlatMap.class exists, so binding the result to
        // GenericFlatMap<IT, OT> is inherently unchecked. The suppression is scoped to
        // this one declaration instead of leaking a raw-typed local into the method.
        @SuppressWarnings("unchecked")
        final GenericFlatMap<IT, OT> userFunction =
                RegularPactTask.instantiateUserCode(this.config, this.userCodeClassLoader, GenericFlatMap.class);
        // Store first, then configure: the field is set even if the context call throws.
        this.mapper = userFunction;
        userFunction.setRuntimeContext(this.getUdfRuntimeContext());
    }

    /**
     * Opens the user function, passing it the stub parameters read from the task
     * configuration.
     *
     * @throws Exception whatever {@code RegularPactTask.openUserCode} throws
     */
    @Override
    public void openTask() throws Exception {
        Configuration stubConfig = this.config.getStubParameters();
        RegularPactTask.openUserCode(this.mapper, stubConfig);
    }

    /**
     * Closes the user function via {@code RegularPactTask.closeUserCode}.
     *
     * @throws Exception whatever {@code RegularPactTask.closeUserCode} throws
     */
    @Override
    public void closeTask() throws Exception {
        RegularPactTask.closeUserCode(this.mapper);
    }

    /**
     * Best-effort close of the user function on cancellation. Any failure is
     * discarded, including a {@link NullPointerException} raised when the function
     * was never instantiated, so this method never throws.
     */
    @Override
    public void cancelTask() {
        try {
            this.mapper.close();
        }
        catch (Throwable ignored) {
            // Deliberately ignored: the task is being cancelled, so a failure while
            // closing the user function is not propagated to the caller.
        }
    }

    /**
     * Returns the user function held by this driver.
     *
     * @return the flat-map function, or {@code null} if {@code setup} has not assigned it yet
     */
    @Override
    public Function getStub() {
        return this.mapper;
    }

    /**
     * Returns the name of this chained task.
     *
     * @return the value of the inherited {@code taskName} field
     */
    @Override
    public String getTaskName() {
        return this.taskName;
    }

    /**
     * Applies the user's flat-map function to one record; the function writes its
     * results to this driver's output collector.
     *
     * @param record the record to process
     * @throws ExceptionInChainedStubException wrapping any {@link Exception} thrown by the
     *         user function, together with this task's name
     */
    @Override
    public void collect(IT record) {
        try {
            this.mapper.flatMap(record, this.outputCollector);
        }
        catch (Exception ex) {
            // Keep the original exception as the cause so the failing stub stays traceable.
            throw new ExceptionInChainedStubException(this.taskName, ex);
        }
    }

    /**
     * Closes this driver's output collector.
     *
     * <p>NOTE(review): this presumably implements a {@code close()} declared by a
     * supertype; {@code @Override} was not added because that declaration is not
     * visible from this file - confirm and annotate.
     */
    public void close() {
        this.outputCollector.close();
    }
}

