/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.client;

import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.accumulators.AccumulatorHelper;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.AbstractJobResult;
import eu.stratosphere.nephele.client.JobCancelResult;
import eu.stratosphere.nephele.client.JobExecutionException;
import eu.stratosphere.nephele.client.JobProgressResult;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.JobEvent;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobStatus;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.protocols.JobManagementProtocol;
import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
import eu.stratosphere.nephele.types.IntegerRecord;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class JobClient {
    private static final Log LOG = LogFactory.getLog(JobClient.class);
    private final JobManagementProtocol jobSubmitClient;
    private AccumulatorProtocol accumulatorProtocolProxy;
    private final JobGraph jobGraph;
    private final Configuration configuration;
    private final JobCleanUp jobCleanUp;
    private long lastProcessedEventSequenceNumber = -1L;
    private PrintStream console;

    public JobClient(JobGraph jobGraph) throws IOException {
        this(jobGraph, new Configuration());
    }

    public JobClient(JobGraph jobGraph, Configuration configuration) throws IOException {
        String address = configuration.getString("jobmanager.rpc.address", null);
        int port = configuration.getInteger("jobmanager.rpc.port", 6123);
        InetSocketAddress inetaddr = new InetSocketAddress(address, port);
        this.jobSubmitClient = RPC.getProxy(JobManagementProtocol.class, inetaddr, NetUtils.getSocketFactory());
        this.accumulatorProtocolProxy = RPC.getProxy(AccumulatorProtocol.class, inetaddr, NetUtils.getSocketFactory());
        this.jobGraph = jobGraph;
        this.configuration = configuration;
        this.jobCleanUp = new JobCleanUp(this);
    }

    public JobClient(JobGraph jobGraph, Configuration configuration, InetSocketAddress jobManagerAddress) throws IOException {
        this.jobSubmitClient = RPC.getProxy(JobManagementProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
        this.jobGraph = jobGraph;
        this.configuration = configuration;
        this.jobCleanUp = new JobCleanUp(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        Object object = this.jobSubmitClient;
        synchronized (object) {
            RPC.stopProxy(this.jobSubmitClient);
        }
        object = this.accumulatorProtocolProxy;
        synchronized (object) {
            RPC.stopProxy(this.accumulatorProtocolProxy);
        }
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobSubmissionResult submitJob() throws IOException {
        JobManagementProtocol jobManagementProtocol = this.jobSubmitClient;
        synchronized (jobManagementProtocol) {
            return this.jobSubmitClient.submitJob(this.jobGraph);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobCancelResult cancelJob() throws IOException {
        JobManagementProtocol jobManagementProtocol = this.jobSubmitClient;
        synchronized (jobManagementProtocol) {
            return this.jobSubmitClient.cancelJob(this.jobGraph.getJobID());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobProgressResult getJobProgress() throws IOException {
        JobManagementProtocol jobManagementProtocol = this.jobSubmitClient;
        synchronized (jobManagementProtocol) {
            return this.jobSubmitClient.getJobProgress(this.jobGraph.getJobID());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobExecutionResult submitJobAndWait() throws IOException, JobExecutionException {
        JobManagementProtocol jobManagementProtocol = this.jobSubmitClient;
        synchronized (jobManagementProtocol) {
            JobSubmissionResult submissionResult = this.jobSubmitClient.submitJob(this.jobGraph);
            if (submissionResult.getReturnCode() == AbstractJobResult.ReturnCode.ERROR) {
                LOG.error((Object)("ERROR: " + submissionResult.getDescription()));
                throw new JobExecutionException(submissionResult.getDescription(), false);
            }
            Runtime.getRuntime().addShutdownHook(this.jobCleanUp);
        }
        long sleep = 0L;
        try {
            IntegerRecord interval = this.jobSubmitClient.getRecommendedPollingInterval();
            sleep = interval.getValue() * 1000;
        }
        catch (IOException ioe) {
            Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
            throw ioe;
        }
        try {
            Thread.sleep(sleep / 2L);
        }
        catch (InterruptedException e) {
            Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
            this.logErrorAndRethrow(StringUtils.stringifyException((Throwable)e));
        }
        long startTimestamp = -1L;
        while (true) {
            if (Thread.interrupted()) {
                this.logErrorAndRethrow("Job client has been interrupted");
            }
            JobProgressResult jobProgressResult = null;
            try {
                jobProgressResult = this.getJobProgress();
            }
            catch (IOException ioe) {
                Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
                throw ioe;
            }
            if (jobProgressResult == null) {
                this.logErrorAndRethrow("Returned job progress is unexpectedly null!");
            }
            if (jobProgressResult.getReturnCode() == AbstractJobResult.ReturnCode.ERROR) {
                this.logErrorAndRethrow("Could not retrieve job progress: " + jobProgressResult.getDescription());
            }
            Iterator<AbstractEvent> it = jobProgressResult.getEvents();
            while (it.hasNext()) {
                AbstractEvent event = it.next();
                if (this.lastProcessedEventSequenceNumber >= event.getSequenceNumber()) continue;
                LOG.info((Object)event.toString());
                if (this.console != null) {
                    this.console.println(event.toString());
                }
                this.lastProcessedEventSequenceNumber = event.getSequenceNumber();
                if (!(event instanceof JobEvent)) continue;
                JobEvent jobEvent = (JobEvent)event;
                JobStatus jobStatus = jobEvent.getCurrentJobStatus();
                if (jobStatus == JobStatus.SCHEDULED) {
                    startTimestamp = jobEvent.getTimestamp();
                }
                if (jobStatus == JobStatus.FINISHED) {
                    Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
                    long jobDuration = jobEvent.getTimestamp() - startTimestamp;
                    Map accumulators = null;
                    try {
                        accumulators = AccumulatorHelper.toResultMap(this.getAccumulators().getAccumulators());
                    }
                    catch (IOException ioe) {
                        Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
                        throw ioe;
                    }
                    return new JobExecutionResult(jobDuration, accumulators);
                }
                if (jobStatus != JobStatus.CANCELED && jobStatus != JobStatus.FAILED) continue;
                Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
                LOG.info((Object)jobEvent.getOptionalMessage());
                if (jobStatus == JobStatus.CANCELED) {
                    throw new JobExecutionException(jobEvent.getOptionalMessage(), true);
                }
                throw new JobExecutionException(jobEvent.getOptionalMessage(), false);
            }
            try {
                Thread.sleep(sleep);
                continue;
            }
            catch (InterruptedException e) {
                this.logErrorAndRethrow(StringUtils.stringifyException((Throwable)e));
                continue;
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getRecommendedPollingInterval() throws IOException {
        JobManagementProtocol jobManagementProtocol = this.jobSubmitClient;
        synchronized (jobManagementProtocol) {
            return this.jobSubmitClient.getRecommendedPollingInterval().getValue();
        }
    }

    private void logErrorAndRethrow(String errorMessage) throws IOException {
        LOG.error((Object)errorMessage);
        throw new IOException(errorMessage);
    }

    public void setConsoleStreamForReporting(PrintStream stream) {
        this.console = stream;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private AccumulatorEvent getAccumulators() throws IOException {
        JobManagementProtocol jobManagementProtocol = this.jobSubmitClient;
        synchronized (jobManagementProtocol) {
            return this.accumulatorProtocolProxy.getAccumulatorResults(this.jobGraph.getJobID());
        }
    }

    public static class JobCleanUp
    extends Thread {
        private final JobClient jobClient;

        public JobCleanUp(JobClient jobClient) {
            this.jobClient = jobClient;
        }

        @Override
        public void run() {
            this.jobClient.close();
        }
    }
}

