/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.jobmanager.scheduler;

import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
import eu.stratosphere.nephele.executiongraph.ExecutionGate;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
import eu.stratosphere.nephele.executiongraph.ExecutionStage;
import eu.stratosphere.nephele.executiongraph.ExecutionStageListener;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
import eu.stratosphere.nephele.executiongraph.JobStatusListener;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.AllocationID;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.DeploymentManager;
import eu.stratosphere.nephele.jobmanager.scheduler.DefaultExecutionListener;
import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
import eu.stratosphere.util.StringUtils;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DefaultScheduler
implements InstanceListener,
JobStatusListener,
ExecutionStageListener {
    protected static final Log LOG = LogFactory.getLog(DefaultScheduler.class);
    private final InstanceManager instanceManager;
    private final DeploymentManager deploymentManager;
    private final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>();
    private Deque<ExecutionGraph> jobQueue = new ArrayDeque<ExecutionGraph>();

    public DefaultScheduler(DeploymentManager deploymentManager, InstanceManager instanceManager) {
        this.deploymentManager = deploymentManager;
        this.instanceManager = instanceManager;
        this.instanceManager.setInstanceListener(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeJobFromSchedule(ExecutionGraph executionGraphToRemove) {
        boolean removedFromQueue = false;
        Deque<ExecutionGraph> deque = this.jobQueue;
        synchronized (deque) {
            Iterator<ExecutionGraph> it = this.jobQueue.iterator();
            while (it.hasNext()) {
                ExecutionGraph executionGraph = it.next();
                if (!executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) continue;
                removedFromQueue = true;
                it.remove();
                break;
            }
        }
        if (!removedFromQueue) {
            LOG.error((Object)("Cannot find job " + executionGraphToRemove.getJobName() + " (" + executionGraphToRemove.getJobID() + ") to remove"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void scheduleJob(ExecutionGraph executionGraph) throws SchedulingException {
        Object vertex;
        int availableSlots;
        int requiredSlots = executionGraph.getRequiredSlots();
        if (requiredSlots > (availableSlots = this.getInstanceManager().getNumberOfSlots())) {
            throw new SchedulingException("Not enough slots to schedule job " + executionGraph.getJobID());
        }
        executionGraph.registerJobStatusListener(this);
        ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
        while (it2.hasNext()) {
            vertex = it2.next();
            ((ExecutionVertex)vertex).registerExecutionListener(new DefaultExecutionListener(this, (ExecutionVertex)vertex));
        }
        executionGraph.registerExecutionStageListener(this);
        vertex = this.jobQueue;
        synchronized (vertex) {
            this.jobQueue.add(executionGraph);
        }
        ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
        try {
            this.requestInstances(executionStage);
        }
        catch (InstanceException e) {
            String exceptionMessage = StringUtils.stringifyException((Throwable)e);
            LOG.error((Object)exceptionMessage);
            this.jobQueue.remove(executionGraph);
            throw new SchedulingException(exceptionMessage);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ExecutionGraph getExecutionGraphByID(JobID jobID) {
        Deque<ExecutionGraph> deque = this.jobQueue;
        synchronized (deque) {
            for (ExecutionGraph executionGraph : this.jobQueue) {
                if (!executionGraph.getJobID().equals(jobID)) continue;
                return executionGraph;
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        Deque<ExecutionGraph> deque = this.jobQueue;
        synchronized (deque) {
            this.jobQueue.clear();
        }
    }

    @Override
    public void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus newJobStatus, String optionalMessage) {
        if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED || newJobStatus == InternalJobStatus.CANCELED) {
            this.removeJobFromSchedule(executionGraph);
        }
    }

    @Override
    public void nextExecutionStageEntered(JobID jobID, ExecutionStage executionStage) {
        try {
            this.requestInstances(executionStage);
        }
        catch (InstanceException e) {
            LOG.error((Object)StringUtils.stringifyException((Throwable)e));
        }
        this.deployAssignedInputVertices(executionStage.getExecutionGraph());
    }

    public InstanceManager getInstanceManager() {
        return this.instanceManager;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void requestInstances(ExecutionStage executionStage) throws InstanceException {
        ExecutionGraph executionGraph = executionStage.getExecutionGraph();
        ExecutionStage executionStage2 = executionStage;
        synchronized (executionStage2) {
            int requiredSlots = executionStage.getRequiredSlots();
            LOG.info((Object)("Requesting " + requiredSlots + " slots for job " + executionGraph.getJobID()));
            this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(), requiredSlots);
            ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph.getIndexOfCurrentExecutionStage(), true, true);
            while (it2.hasNext()) {
                it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
            }
        }
    }

    void findVerticesToBeDeployed(ExecutionVertex vertex, Map<Instance, List<ExecutionVertex>> verticesToBeDeployed, Set<ExecutionVertex> alreadyVisited) {
        if (!alreadyVisited.add(vertex)) {
            return;
        }
        if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
            List<ExecutionVertex> verticesForInstance;
            Instance instance = vertex.getAllocatedResource().getInstance();
            if (instance instanceof DummyInstance) {
                LOG.error((Object)("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance"));
            }
            if ((verticesForInstance = verticesToBeDeployed.get(instance)) == null) {
                verticesForInstance = new ArrayList<ExecutionVertex>();
                verticesToBeDeployed.put(instance, verticesForInstance);
            }
            verticesForInstance.add(vertex);
        }
        int numberOfOutputGates = vertex.getNumberOfOutputGates();
        for (int i = 0; i < numberOfOutputGates; ++i) {
            boolean deployTarget;
            ExecutionGate outputGate = vertex.getOutputGate(i);
            switch (outputGate.getChannelType()) {
                case NETWORK: {
                    deployTarget = false;
                    break;
                }
                case IN_MEMORY: {
                    deployTarget = true;
                    break;
                }
                default: {
                    throw new IllegalStateException("Unknown channel type");
                }
            }
            if (!deployTarget) continue;
            int numberOfOutputChannels = outputGate.getNumberOfEdges();
            for (int j = 0; j < numberOfOutputChannels; ++j) {
                ExecutionEdge outputChannel = outputGate.getEdge(j);
                ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
                this.findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
            }
        }
    }

    public void deployAssignedVertices(ExecutionVertex startVertex) {
        JobID jobID = startVertex.getExecutionGraph().getJobID();
        HashMap<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
        HashSet<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
        this.findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
        if (!verticesToBeDeployed.isEmpty()) {
            for (Map.Entry entry : verticesToBeDeployed.entrySet()) {
                this.deploymentManager.deploy(jobID, (Instance)entry.getKey(), (List)entry.getValue());
            }
        }
    }

    public void deployAssignedPipeline(ExecutionPipeline pipeline) {
        JobID jobID = null;
        HashMap<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
        HashSet<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
        Iterator<ExecutionVertex> it = pipeline.iterator();
        while (it.hasNext()) {
            this.findVerticesToBeDeployed(it.next(), verticesToBeDeployed, alreadyVisited);
        }
        if (!verticesToBeDeployed.isEmpty()) {
            for (Map.Entry entry : verticesToBeDeployed.entrySet()) {
                this.deploymentManager.deploy(jobID, (Instance)entry.getKey(), (List)entry.getValue());
            }
        }
    }

    public void deployAssignedVertices(Collection<ExecutionVertex> startVertices) {
        JobID jobID = null;
        HashMap<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
        HashSet<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
        for (ExecutionVertex startVertex : startVertices) {
            if (jobID == null) {
                jobID = startVertex.getExecutionGraph().getJobID();
            }
            this.findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
        }
        if (!verticesToBeDeployed.isEmpty()) {
            for (Map.Entry entry : verticesToBeDeployed.entrySet()) {
                this.deploymentManager.deploy(jobID, (Instance)entry.getKey(), (List)entry.getValue());
            }
        }
    }

    public void deployAssignedInputVertices(ExecutionGraph executionGraph) {
        HashMap<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
        ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
        HashSet<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
        for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
            ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
            if (!startVertex.isInputVertex()) continue;
            for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
                ExecutionVertex vertex = startVertex.getGroupMember(j);
                this.findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
            }
        }
        if (!verticesToBeDeployed.isEmpty()) {
            for (Map.Entry entry : verticesToBeDeployed.entrySet()) {
                this.deploymentManager.deploy(executionGraph.getJobID(), (Instance)entry.getKey(), (List)entry.getValue());
            }
        }
    }

    @Override
    public void resourcesAllocated(JobID jobID, final List<AllocatedResource> allocatedResources) {
        if (allocatedResources == null) {
            LOG.error((Object)"Resource to lock is null!");
            return;
        }
        for (AllocatedResource allocatedResource : allocatedResources) {
            if (!(allocatedResource.getInstance() instanceof DummyInstance)) continue;
            LOG.debug((Object)"Available instance is of type DummyInstance!");
            return;
        }
        final ExecutionGraph eg = this.getExecutionGraphByID(jobID);
        if (eg == null) {
            try {
                for (AllocatedResource allocatedResource : allocatedResources) {
                    this.getInstanceManager().releaseAllocatedResource(allocatedResource);
                }
            }
            catch (InstanceException instanceException) {
                LOG.error((Object)instanceException);
            }
            return;
        }
        Runnable runnable = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                ExecutionStage stage;
                ExecutionStage executionStage = stage = eg.getCurrentExecutionStage();
                synchronized (executionStage) {
                    for (AllocatedResource allocatedResource : allocatedResources) {
                        AllocatedResource resourceToBeReplaced = null;
                        ExecutionGroupVertexIterator groupIterator = new ExecutionGroupVertexIterator(eg, true, stage.getStageNumber());
                        while (groupIterator.hasNext()) {
                            ExecutionGroupVertex groupVertex = (ExecutionGroupVertex)groupIterator.next();
                            for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
                                ExecutionVertex vertex = groupVertex.getGroupMember(i);
                                if (vertex.getExecutionState() != ExecutionState.SCHEDULED || vertex.getAllocatedResource() == null) continue;
                                resourceToBeReplaced = vertex.getAllocatedResource();
                                break;
                            }
                            if (resourceToBeReplaced == null) continue;
                            break;
                        }
                        if (resourceToBeReplaced == null) {
                            LOG.error((Object)("Instance " + allocatedResource.getInstance() + " is not required for job" + eg.getJobID()));
                            try {
                                DefaultScheduler.this.getInstanceManager().releaseAllocatedResource(allocatedResource);
                            }
                            catch (InstanceException e) {
                                LOG.error((Object)e);
                            }
                            return;
                        }
                        Iterator<ExecutionVertex> it = resourceToBeReplaced.assignedVertices();
                        while (it.hasNext()) {
                            ExecutionVertex vertex = it.next();
                            vertex.setAllocatedResource(allocatedResource);
                            vertex.updateExecutionState(ExecutionState.ASSIGNED);
                        }
                    }
                }
                DefaultScheduler.this.deployAssignedInputVertices(eg);
            }
        };
        eg.executeCommand(runnable);
    }

    public void checkAndReleaseAllocatedResource(ExecutionGraph executionGraph, AllocatedResource allocatedResource) {
        if (allocatedResource == null) {
            LOG.error((Object)"Resource to lock is null!");
            return;
        }
        if (allocatedResource.getInstance() instanceof DummyInstance) {
            LOG.debug((Object)"Available instance is of type DummyInstance!");
            return;
        }
        boolean resourceCanBeReleased = true;
        Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
        while (it.hasNext()) {
            ExecutionVertex vertex = it.next();
            ExecutionState state = vertex.getExecutionState();
            if (state == ExecutionState.CREATED || state == ExecutionState.FINISHED || state == ExecutionState.FAILED || state == ExecutionState.CANCELED) continue;
            resourceCanBeReleased = false;
            break;
        }
        if (resourceCanBeReleased) {
            LOG.info((Object)("Releasing instance " + allocatedResource.getInstance()));
            try {
                this.getInstanceManager().releaseAllocatedResource(allocatedResource);
            }
            catch (InstanceException e) {
                LOG.error((Object)StringUtils.stringifyException((Throwable)e));
            }
        }
    }

    DeploymentManager getDeploymentManager() {
        return this.deploymentManager;
    }

    protected void replayCheckpointsFromPreviousStage(ExecutionGraph executionGraph) {
        int currentStageIndex = executionGraph.getIndexOfCurrentExecutionStage();
        ExecutionStage previousStage = executionGraph.getStage(currentStageIndex - 1);
        ArrayList<ExecutionVertex> verticesToBeReplayed = new ArrayList<ExecutionVertex>();
        for (int i = 0; i < previousStage.getNumberOfOutputExecutionVertices(); ++i) {
            ExecutionVertex vertex = previousStage.getOutputExecutionVertex(i);
            vertex.updateExecutionState(ExecutionState.ASSIGNED);
            verticesToBeReplayed.add(vertex);
        }
        this.deployAssignedVertices(verticesToBeReplayed);
    }

    Map<ExecutionVertexID, ExecutionVertex> getVerticesToBeRestarted() {
        return this.verticesToBeRestarted;
    }

    @Override
    public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
        final ExecutionGraph eg = this.getExecutionGraphByID(jobID);
        if (eg == null) {
            LOG.error((Object)("Cannot find execution graph for job with ID " + jobID));
            return;
        }
        Runnable command = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                ExecutionGraph executionGraph = eg;
                synchronized (executionGraph) {
                    for (AllocatedResource allocatedResource : allocatedResources) {
                        LOG.info((Object)("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID + " died."));
                        ExecutionGraph executionGraph2 = DefaultScheduler.this.getExecutionGraphByID(jobID);
                        if (executionGraph2 == null) {
                            LOG.error((Object)("Cannot find execution graph for job " + jobID));
                            return;
                        }
                        Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();
                        DummyInstance dummyInstance = DummyInstance.createDummyInstance();
                        AllocatedResource dummyResource = new AllocatedResource(dummyInstance, new AllocationID());
                        while (vertexIter.hasNext()) {
                            ExecutionVertex vertex = vertexIter.next();
                            vertex.setAllocatedResource(dummyResource);
                        }
                        String failureMessage = allocatedResource.getInstance().getName() + " died";
                        vertexIter = allocatedResource.assignedVertices();
                        while (vertexIter.hasNext()) {
                            ExecutionVertex vertex = vertexIter.next();
                            ExecutionState state = vertex.getExecutionState();
                            switch (state) {
                                case ASSIGNED: 
                                case READY: 
                                case STARTING: 
                                case RUNNING: 
                                case FINISHING: {
                                    vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
                                    break;
                                }
                            }
                        }
                    }
                }
                InternalJobStatus js = eg.getJobStatus();
                if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
                    ExecutionStage stage = eg.getCurrentExecutionStage();
                    try {
                        DefaultScheduler.this.requestInstances(stage);
                    }
                    catch (InstanceException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        eg.executeCommand(command);
    }
}

