/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.client.program;

import com.google.common.base.Preconditions;
import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.operators.translation.JavaPlan;
import eu.stratosphere.client.program.ContextEnvironment;
import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.client.program.PackagedProgram;
import eu.stratosphere.client.program.ProgramInvocationException;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.contextcheck.ContextChecker;
import eu.stratosphere.compiler.costs.CostEstimator;
import eu.stratosphere.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.client.AbstractJobResult;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class Client {
    private static final Log LOG = LogFactory.getLog(Client.class);
    private final Configuration configuration;
    private final PactCompiler compiler;
    private boolean printStatusDuringExecution;

    public Client(InetSocketAddress jobManagerAddress, Configuration config) {
        Preconditions.checkNotNull((Object)config, (Object)"Configuration is null");
        this.configuration = config;
        this.configuration.setString("jobmanager.rpc.address", jobManagerAddress.getAddress().getHostAddress());
        this.configuration.setInteger("jobmanager.rpc.port", jobManagerAddress.getPort());
        this.compiler = new PactCompiler(new DataStatistics(), (CostEstimator)new DefaultCostEstimator());
    }

    public Client(Configuration config) {
        Preconditions.checkNotNull((Object)config, (Object)"Configuration is null");
        this.configuration = config;
        String address = config.getString("jobmanager.rpc.address", null);
        if (address == null) {
            throw new CompilerException("Cannot find address to job manager's RPC service in the global configuration.");
        }
        int port = GlobalConfiguration.getInteger((String)"jobmanager.rpc.port", (int)6123);
        if (port < 0) {
            throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
        }
        this.compiler = new PactCompiler(new DataStatistics(), (CostEstimator)new DefaultCostEstimator());
    }

    public void setPrintStatusDuringExecution(boolean print) {
        this.printStatusDuringExecution = print;
    }

    public String getJobManagerAddress() {
        return this.configuration.getString("jobmanager.rpc.address", null);
    }

    public int getJobManagerPort() {
        return this.configuration.getInteger("jobmanager.rpc.port", -1);
    }

    public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
        return jsonGen.getOptimizerPlanAsJSON(this.getOptimizedPlan(prog, parallelism));
    }

    public OptimizedPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) {
            return this.getOptimizedPlan(prog.getPlanWithJars(), parallelism);
        }
        if (prog.isUsingInteractiveMode()) {
            OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler);
            if (parallelism > 0) {
                env.setDegreeOfParallelism(parallelism);
            }
            env.setAsContext();
            PrintStream originalOut = System.out;
            PrintStream originalErr = System.err;
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            System.setOut(new PrintStream(baos));
            ByteArrayOutputStream baes = new ByteArrayOutputStream();
            System.setErr(new PrintStream(baes));
            try {
                ContextEnvironment.disableLocalExecution();
                prog.invokeInteractiveModeForExecution();
            }
            catch (ProgramInvocationException e) {
                throw e;
            }
            catch (Throwable t) {
                if (env.optimizerPlan != null) {
                    OptimizedPlan optimizedPlan = env.optimizerPlan;
                    return optimizedPlan;
                }
                throw new ProgramInvocationException("The program caused an error: ", t);
            }
            finally {
                System.setOut(originalOut);
                System.setErr(originalErr);
                System.err.println(baes);
                System.out.println(baos);
            }
            throw new ProgramInvocationException("The program plan could not be fetched. The program silently swallowed the control flow exceptions.\nSystem.err: " + StringEscapeUtils.escapeHtml((String)baes.toString()) + " \n" + "System.out: " + StringEscapeUtils.escapeHtml((String)baos.toString()) + " \n");
        }
        throw new RuntimeException();
    }

    public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
        if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
            p.setDefaultParallelism(parallelism);
        }
        ContextChecker checker = new ContextChecker();
        checker.check(p);
        return this.compiler.compile(p);
    }

    public OptimizedPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
        return this.getOptimizedPlan(prog.getPlan(), parallelism);
    }

    public JobGraph getJobGraph(PackagedProgram prog, OptimizedPlan optPlan) throws ProgramInvocationException {
        return this.getJobGraph(optPlan, prog.getAllLibraries());
    }

    private JobGraph getJobGraph(OptimizedPlan optPlan, List<File> jarFiles) {
        NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator();
        JobGraph job = gen.compileJobGraph(optPlan);
        for (File jar : jarFiles) {
            job.addJar(new Path(jar.getAbsolutePath()));
        }
        return job;
    }

    public JobExecutionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) {
            return this.run(prog.getPlanWithJars(), parallelism, wait);
        }
        if (prog.isUsingInteractiveMode()) {
            ContextEnvironment env = new ContextEnvironment(this, prog.getAllLibraries(), prog.getUserCodeClassLoader());
            if (parallelism > 0) {
                env.setDegreeOfParallelism(parallelism);
            }
            env.setAsContext();
            ContextEnvironment.disableLocalExecution();
            if (wait) {
                prog.invokeInteractiveModeForExecution();
            } else {
                Thread backGroundRunner = new Thread("Program Runner"){

                    @Override
                    public void run() {
                        try {
                            prog.invokeInteractiveModeForExecution();
                        }
                        catch (Throwable t) {
                            LOG.error((Object)"The program execution failed.", t);
                        }
                    }
                };
                backGroundRunner.start();
            }
            return null;
        }
        throw new RuntimeException();
    }

    public JobExecutionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
        return this.run(optimizedPlan, prog.getAllLibraries(), wait);
    }

    public JobExecutionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
        return this.run(this.getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
    }

    public JobExecutionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
        JobGraph job = this.getJobGraph(compiledPlan, libraries);
        return this.run(job, wait);
    }

    public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
        JobClient client;
        try {
            client = new JobClient(jobGraph, this.configuration);
        }
        catch (IOException e) {
            throw new ProgramInvocationException("Could not open job manager: " + e.getMessage());
        }
        client.setConsoleStreamForReporting(this.printStatusDuringExecution ? System.out : null);
        try {
            if (wait) {
                return client.submitJobAndWait();
            }
            JobSubmissionResult result = client.submitJob();
            if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
                throw new ProgramInvocationException("The job was not successfully submitted to the nephele job manager" + (result.getDescription() == null ? "." : ": " + result.getDescription()));
            }
        }
        catch (IOException e) {
            throw new ProgramInvocationException("Could not submit job to job manager: " + e.getMessage());
        }
        catch (JobExecutionException jex) {
            if (jex.isJobCanceledByUser()) {
                throw new ProgramInvocationException("The program has been canceled");
            }
            throw new ProgramInvocationException("The program execution failed: " + jex.getMessage());
        }
        return new JobExecutionResult(-1L, null);
    }

    public static final class ProgramAbortException
    extends Error {
        private static final long serialVersionUID = 1L;
    }

    private static final class OptimizerPlanEnvironment
    extends ExecutionEnvironment {
        private final PactCompiler compiler;
        private OptimizedPlan optimizerPlan;

        private OptimizerPlanEnvironment(PactCompiler compiler) {
            this.compiler = compiler;
        }

        public JobExecutionResult execute(String jobName) throws Exception {
            JavaPlan plan = this.createProgramPlan(jobName);
            this.optimizerPlan = this.compiler.compile((Plan)plan);
            throw new ProgramAbortException();
        }

        public String getExecutionPlan() throws Exception {
            JavaPlan plan = this.createProgramPlan();
            this.optimizerPlan = this.compiler.compile((Plan)plan);
            throw new ProgramAbortException();
        }

        private void setAsContext() {
            OptimizerPlanEnvironment.initializeContextEnvironment((ExecutionEnvironment)this);
        }
    }
}

