package eu.stratosphere.nephele.taskmanager;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.execution.ExecutionListener;
import eu.stratosphere.nephele.execution.ExecutionObserver;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.execution.ExecutionStateTransition;
import eu.stratosphere.nephele.execution.RuntimeEnvironment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import java.lang.Thread;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/Task.class */
/**
 * Task-manager-side handle for a single task.
 * <p>
 * A {@code Task} ties an {@link ExecutionVertexID} to the {@link RuntimeEnvironment} that executes it. It installs
 * itself as the environment's {@link ExecutionObserver}, keeps track of the most recently reported
 * {@link ExecutionState}, and forwards state changes and user-thread events to the registered
 * {@link ExecutionListener} objects. State changes are additionally forwarded to the owning {@link TaskManager}.
 */
public final class Task implements ExecutionObserver {

    /** Logger for this class. */
    private static final Log LOG = LogFactory.getLog(Task.class);

    /** ID of the execution vertex this task belongs to. */
    private final ExecutionVertexID vertexID;

    /** Environment that runs the task; also the source of the job ID and the executing thread. */
    private final RuntimeEnvironment environment;

    /** Task manager that is informed about every state change of this task. */
    private final TaskManager taskManager;

    /** Flipped to {@code true} by the first cancel or kill request and never reset. */
    private final AtomicBoolean canceled = new AtomicBoolean(false);

    /** Last state passed to {@link #executionStateChanged(ExecutionState, String)}; starts as {@code STARTING}. */
    private volatile ExecutionState executionState = ExecutionState.STARTING;

    /** Listeners notified about state changes and user-thread events. */
    private final Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();

    /**
     * Creates the task and registers it as the execution observer of the given environment.
     *
     * @param executionVertexID
     *        the ID of the vertex this task represents
     * @param runtimeEnvironment
     *        the environment that executes the task
     * @param taskManager
     *        the task manager to report state changes to
     */
    public Task(ExecutionVertexID executionVertexID, RuntimeEnvironment runtimeEnvironment, TaskManager taskManager) {
        this.vertexID = executionVertexID;
        this.environment = runtimeEnvironment;
        this.taskManager = taskManager;
        this.environment.setExecutionObserver(this);
    }

    /**
     * Returns the ID of the job this task belongs to, as reported by the environment.
     *
     * @return the job ID
     */
    public JobID getJobID() {
        return this.environment.getJobID();
    }

    /**
     * Returns the ID of the vertex this task represents.
     *
     * @return the vertex ID
     */
    public ExecutionVertexID getVertexID() {
        return this.vertexID;
    }

    /**
     * Returns the environment of this task, typed as the general {@link Environment} interface.
     *
     * @return the task's environment
     */
    public Environment getEnvironment() {
        return this.environment;
    }

    /**
     * Reports the state {@code FAILED} for this task with the message
     * "Execution thread died unexpectedly".
     */
    public void markAsFailed() {
        executionStateChanged(ExecutionState.FAILED, "Execution thread died unexpectedly");
    }

    /**
     * Requests cancellation of the task. Currently handled identically to {@link #killExecution()}.
     */
    public void cancelExecution() {
        cancelOrKillExecution();
    }

    /**
     * Requests the task to be killed. Currently handled identically to {@link #cancelExecution()}.
     */
    public void killExecution() {
        cancelOrKillExecution();
    }

    /**
     * Shared implementation of cancel and kill.
     * <p>
     * Only the first invocation has any effect; the {@code canceled} flag is set by it regardless of the current
     * state. If the task is {@code RUNNING} or {@code FINISHING} at that point, the state {@code CANCELING} is
     * reported and the environment is asked to cancel the execution. In any other state nothing further happens,
     * and because the flag is already set, later invocations do nothing either.
     */
    private void cancelOrKillExecution() {
        // Let exactly one caller through.
        if (!this.canceled.compareAndSet(false, true)) {
            return;
        }

        if (this.executionState != ExecutionState.RUNNING && this.executionState != ExecutionState.FINISHING) {
            return;
        }

        executionStateChanged(ExecutionState.CANCELING, null);

        try {
            this.environment.cancelExecution();
        } catch (Throwable th) {
            // Best effort: a failing cancel request is logged, never propagated to the caller.
            LOG.error("Error while cancelling the task.", th);
        }
    }

    /**
     * Checks whether the environment's executing thread has terminated.
     *
     * @return {@code true} if the executing thread is in state {@link Thread.State#TERMINATED}
     */
    public boolean isTerminated() {
        return this.environment.getExecutingThread().getState() == Thread.State.TERMINATED;
    }

    /**
     * Starts the environment's executing thread.
     */
    public void startExecution() {
        this.environment.getExecutingThread().start();
    }

    /**
     * Registers this task with the given profiler.
     *
     * @param taskManagerProfiler
     *        the profiler to register with; must not be {@code null}
     * @param configuration
     *        the configuration handed to the profiler together with this task
     */
    public void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration configuration) {
        taskManagerProfiler.registerExecutionListener(this, configuration);
    }

    /**
     * Asks the given memory manager to release everything held for this task's invokable.
     *
     * @param memoryManager
     *        the memory manager, or {@code null}, in which case the call is a no-op
     */
    public void unregisterMemoryManager(MemoryManager memoryManager) {
        if (memoryManager != null) {
            memoryManager.releaseAll(this.environment.getInvokable());
        }
    }

    /**
     * Unregisters this task's vertex from the given profiler.
     *
     * @param taskManagerProfiler
     *        the profiler, or {@code null}, in which case the call is a no-op
     */
    public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) {
        if (taskManagerProfiler != null) {
            taskManagerProfiler.unregisterExecutionListener(this.vertexID);
        }
    }

    /**
     * Returns the state most recently passed to {@link #executionStateChanged(ExecutionState, String)}.
     *
     * @return the current execution state
     */
    public ExecutionState getExecutionState() {
        return this.executionState;
    }

    /**
     * Records a state change and propagates it.
     * <p>
     * The sequence is: check the transition, log the message at error level if the new state is {@code FAILED},
     * notify all registered listeners, store the new state, and finally notify the task manager. Listeners are
     * therefore called while {@link #getExecutionState()} still returns the previous state.
     *
     * @param executionState
     *        the new execution state
     * @param str
     *        an optional message describing the change; may be {@code null}
     */
    @Override // eu.stratosphere.nephele.execution.ExecutionObserver
    public void executionStateChanged(ExecutionState executionState, String str) {
        // NOTE(review): the leading 'false' presumably selects the non-job-manager rule set -
        // confirm against ExecutionStateTransition.checkTransition.
        ExecutionStateTransition.checkTransition(false, getTaskName(), this.executionState, executionState);

        if (executionState == ExecutionState.FAILED) {
            LOG.error(str);
        }

        for (ExecutionListener listener : this.registeredListeners) {
            listener.executionStateChanged(this.environment.getJobID(), this.vertexID, executionState, str);
        }

        // The field is updated only after the listeners ran and before the task manager is told.
        this.executionState = executionState;
        this.taskManager.executionStateChanged(this.environment.getJobID(), this.vertexID, executionState, str);
    }

    /**
     * Builds a display name of the form {@code "<task name> (<1-based subtask index>/<number of subtasks>)"}.
     *
     * @return the display name of this task
     */
    private String getTaskName() {
        return this.environment.getTaskName() + " (" + (this.environment.getIndexInSubtaskGroup() + 1) + "/" + this.environment.getCurrentNumberOfSubtasks() + ")";
    }

    /**
     * Forwards the start of a user thread to all registered listeners.
     *
     * @param thread
     *        the user thread that has been started
     */
    @Override // eu.stratosphere.nephele.execution.ExecutionObserver
    public void userThreadStarted(Thread thread) {
        for (ExecutionListener listener : this.registeredListeners) {
            listener.userThreadStarted(this.environment.getJobID(), this.vertexID, thread);
        }
    }

    /**
     * Forwards the end of a user thread to all registered listeners.
     *
     * @param thread
     *        the user thread that has finished
     */
    @Override // eu.stratosphere.nephele.execution.ExecutionObserver
    public void userThreadFinished(Thread thread) {
        for (ExecutionListener listener : this.registeredListeners) {
            listener.userThreadFinished(this.environment.getJobID(), this.vertexID, thread);
        }
    }

    /**
     * Adds a listener that will receive subsequent state changes and user-thread events.
     *
     * @param executionListener
     *        the listener to add
     */
    public void registerExecutionListener(ExecutionListener executionListener) {
        this.registeredListeners.add(executionListener);
    }

    /**
     * Removes a previously added listener.
     *
     * @param executionListener
     *        the listener to remove
     */
    public void unregisterExecutionListener(ExecutionListener executionListener) {
        this.registeredListeners.remove(executionListener);
    }

    /**
     * Tells whether a cancel or kill request has been received.
     *
     * @return {@code true} once {@link #cancelExecution()} or {@link #killExecution()} has been called
     */
    @Override // eu.stratosphere.nephele.execution.ExecutionObserver
    public boolean isCanceled() {
        return this.canceled.get();
    }

    /**
     * Returns the environment of this task with its concrete {@link RuntimeEnvironment} type.
     *
     * @return the task's runtime environment
     */
    public RuntimeEnvironment getRuntimeEnvironment() {
        return this.environment;
    }
}
