/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.taskmanager;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.execution.ExecutionListener;
import eu.stratosphere.nephele.execution.ExecutionObserver;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.execution.ExecutionStateTransition;
import eu.stratosphere.nephele.execution.RuntimeEnvironment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.taskmanager.TaskManager;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public final class Task
implements ExecutionObserver {
    private static final Log LOG = LogFactory.getLog(Task.class);
    private final ExecutionVertexID vertexID;
    private final RuntimeEnvironment environment;
    private final TaskManager taskManager;
    private final AtomicBoolean canceled = new AtomicBoolean(false);
    private volatile ExecutionState executionState = ExecutionState.STARTING;
    private Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();

    public Task(ExecutionVertexID vertexID, RuntimeEnvironment environment, TaskManager taskManager) {
        this.vertexID = vertexID;
        this.environment = environment;
        this.taskManager = taskManager;
        this.environment.setExecutionObserver(this);
    }

    public JobID getJobID() {
        return this.environment.getJobID();
    }

    public ExecutionVertexID getVertexID() {
        return this.vertexID;
    }

    public Environment getEnvironment() {
        return this.environment;
    }

    public void markAsFailed() {
        this.executionStateChanged(ExecutionState.FAILED, "Execution thread died unexpectedly");
    }

    public void cancelExecution() {
        this.cancelOrKillExecution();
    }

    public void killExecution() {
        this.cancelOrKillExecution();
    }

    private void cancelOrKillExecution() {
        if (!this.canceled.compareAndSet(false, true)) {
            return;
        }
        if (this.executionState != ExecutionState.RUNNING && this.executionState != ExecutionState.FINISHING) {
            return;
        }
        this.executionStateChanged(ExecutionState.CANCELING, null);
        try {
            this.environment.cancelExecution();
        }
        catch (Throwable e) {
            LOG.error((Object)"Error while cancelling the task.", e);
        }
    }

    public boolean isTerminated() {
        Thread executingThread = this.environment.getExecutingThread();
        return executingThread.getState() == Thread.State.TERMINATED;
    }

    public void startExecution() {
        Thread thread = this.environment.getExecutingThread();
        thread.start();
    }

    public void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration) {
        taskManagerProfiler.registerExecutionListener(this, jobConfiguration);
    }

    public void unregisterMemoryManager(MemoryManager memoryManager) {
        if (memoryManager != null) {
            memoryManager.releaseAll(this.environment.getInvokable());
        }
    }

    public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) {
        if (taskManagerProfiler != null) {
            taskManagerProfiler.unregisterExecutionListener(this.vertexID);
        }
    }

    public ExecutionState getExecutionState() {
        return this.executionState;
    }

    @Override
    public void executionStateChanged(ExecutionState newExecutionState, String optionalMessage) {
        ExecutionStateTransition.checkTransition(false, this.getTaskName(), this.executionState, newExecutionState);
        if (newExecutionState == ExecutionState.FAILED) {
            LOG.error((Object)optionalMessage);
        }
        Iterator it = this.registeredListeners.iterator();
        while (it.hasNext()) {
            ((ExecutionListener)it.next()).executionStateChanged(this.environment.getJobID(), this.vertexID, newExecutionState, optionalMessage);
        }
        this.executionState = newExecutionState;
        this.taskManager.executionStateChanged(this.environment.getJobID(), this.vertexID, newExecutionState, optionalMessage);
    }

    private String getTaskName() {
        return this.environment.getTaskName() + " (" + (this.environment.getIndexInSubtaskGroup() + 1) + "/" + this.environment.getCurrentNumberOfSubtasks() + ")";
    }

    @Override
    public void userThreadStarted(Thread userThread) {
        Iterator it = this.registeredListeners.iterator();
        while (it.hasNext()) {
            ((ExecutionListener)it.next()).userThreadStarted(this.environment.getJobID(), this.vertexID, userThread);
        }
    }

    @Override
    public void userThreadFinished(Thread userThread) {
        Iterator it = this.registeredListeners.iterator();
        while (it.hasNext()) {
            ((ExecutionListener)it.next()).userThreadFinished(this.environment.getJobID(), this.vertexID, userThread);
        }
    }

    public void registerExecutionListener(ExecutionListener executionListener) {
        this.registeredListeners.add(executionListener);
    }

    public void unregisterExecutionListener(ExecutionListener executionListener) {
        this.registeredListeners.remove(executionListener);
    }

    @Override
    public boolean isCanceled() {
        return this.canceled.get();
    }

    public RuntimeEnvironment getRuntimeEnvironment() {
        return this.environment;
    }
}

