/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.executiongraph;

import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
import eu.stratosphere.nephele.executiongraph.ExecutionGate;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.runtime.io.channels.ChannelType;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

public final class ExecutionStage {
    private final ExecutionGraph executionGraph;
    private final CopyOnWriteArrayList<ExecutionGroupVertex> stageMembers = new CopyOnWriteArrayList();
    private volatile int stageNum = -1;

    public ExecutionStage(ExecutionGraph executionGraph, int stageNum) {
        this.executionGraph = executionGraph;
        this.stageNum = stageNum;
    }

    public void setStageNumber(int stageNum) {
        this.stageNum = stageNum;
    }

    public int getStageNumber() {
        return this.stageNum;
    }

    public void addStageMember(ExecutionGroupVertex groupVertex) {
        if (this.stageMembers.addIfAbsent(groupVertex)) {
            groupVertex.setExecutionStage(this);
        }
    }

    public void removeStageMember(ExecutionGroupVertex groupVertex) {
        this.stageMembers.remove(groupVertex);
    }

    public int getNumberOfStageMembers() {
        return this.stageMembers.size();
    }

    public ExecutionGroupVertex getStageMember(int index) {
        try {
            return this.stageMembers.get(index);
        }
        catch (ArrayIndexOutOfBoundsException e) {
            return null;
        }
    }

    public int getNumberOfInputExecutionVertices() {
        int retVal = 0;
        for (ExecutionGroupVertex groupVertex : this.stageMembers) {
            if (!groupVertex.isInputVertex()) continue;
            retVal += groupVertex.getCurrentNumberOfGroupMembers();
        }
        return retVal;
    }

    public int getNumberOfOutputExecutionVertices() {
        int retVal = 0;
        for (ExecutionGroupVertex groupVertex : this.stageMembers) {
            if (!groupVertex.isOutputVertex()) continue;
            retVal += groupVertex.getCurrentNumberOfGroupMembers();
        }
        return retVal;
    }

    public ExecutionVertex getInputExecutionVertex(int index) {
        for (ExecutionGroupVertex groupVertex : this.stageMembers) {
            if (!groupVertex.isInputVertex()) continue;
            int numberOfMembers = groupVertex.getCurrentNumberOfGroupMembers();
            if (index >= numberOfMembers) {
                index -= numberOfMembers;
                continue;
            }
            return groupVertex.getGroupMember(index);
        }
        return null;
    }

    public ExecutionVertex getOutputExecutionVertex(int index) {
        for (ExecutionGroupVertex groupVertex : this.stageMembers) {
            if (!groupVertex.isOutputVertex()) continue;
            int numberOfMembers = groupVertex.getCurrentNumberOfGroupMembers();
            if (index >= numberOfMembers) {
                index -= numberOfMembers;
                continue;
            }
            return groupVertex.getGroupMember(index);
        }
        return null;
    }

    public ExecutionGraph getExecutionGraph() {
        return this.executionGraph;
    }

    void reconstructExecutionPipelines() {
        ExecutionVertex vertex;
        Iterator<ExecutionVertex> vertexIt;
        ExecutionGroupVertex groupVertex;
        Iterator<ExecutionGroupVertex> it = this.stageMembers.iterator();
        HashSet<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
        while (it.hasNext()) {
            groupVertex = it.next();
            if (!groupVertex.isInputVertex()) continue;
            vertexIt = groupVertex.iterator();
            while (vertexIt.hasNext()) {
                vertex = vertexIt.next();
                this.reconstructExecutionPipeline(vertex, true, alreadyVisited);
            }
        }
        it = this.stageMembers.iterator();
        alreadyVisited.clear();
        while (it.hasNext()) {
            groupVertex = it.next();
            if (!groupVertex.isOutputVertex()) continue;
            vertexIt = groupVertex.iterator();
            while (vertexIt.hasNext()) {
                vertex = vertexIt.next();
                this.reconstructExecutionPipeline(vertex, false, alreadyVisited);
            }
        }
    }

    private void reconstructExecutionPipeline(ExecutionVertex vertex, boolean forward, Set<ExecutionVertex> alreadyVisited) {
        ExecutionPipeline pipeline = vertex.getExecutionPipeline();
        if (pipeline == null) {
            pipeline = new ExecutionPipeline();
            vertex.setExecutionPipeline(pipeline);
        }
        alreadyVisited.add(vertex);
        if (forward) {
            int numberOfOutputGates = vertex.getNumberOfOutputGates();
            for (int i = 0; i < numberOfOutputGates; ++i) {
                ExecutionGate outputGate = vertex.getOutputGate(i);
                ChannelType channelType = outputGate.getChannelType();
                int numberOfOutputChannels = outputGate.getNumberOfEdges();
                for (int j = 0; j < numberOfOutputChannels; ++j) {
                    ExecutionEdge outputChannel = outputGate.getEdge(j);
                    ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
                    boolean recurse = false;
                    if (!alreadyVisited.contains(connectedVertex)) {
                        recurse = true;
                    }
                    if (channelType == ChannelType.IN_MEMORY && !pipeline.equals(connectedVertex.getExecutionPipeline())) {
                        connectedVertex.setExecutionPipeline(pipeline);
                        recurse = true;
                    }
                    if (!recurse) continue;
                    this.reconstructExecutionPipeline(connectedVertex, true, alreadyVisited);
                }
            }
        } else {
            int numberOfInputGates = vertex.getNumberOfInputGates();
            for (int i = 0; i < numberOfInputGates; ++i) {
                ExecutionGate inputGate = vertex.getInputGate(i);
                ChannelType channelType = inputGate.getChannelType();
                int numberOfInputChannels = inputGate.getNumberOfEdges();
                for (int j = 0; j < numberOfInputChannels; ++j) {
                    ExecutionEdge inputChannel = inputGate.getEdge(j);
                    ExecutionVertex connectedVertex = inputChannel.getOutputGate().getVertex();
                    boolean recurse = false;
                    if (!alreadyVisited.contains(connectedVertex)) {
                        recurse = true;
                    }
                    if (channelType == ChannelType.IN_MEMORY && !pipeline.equals(connectedVertex.getExecutionPipeline())) {
                        connectedVertex.setExecutionPipeline(pipeline);
                        recurse = true;
                    }
                    if (!recurse) continue;
                    this.reconstructExecutionPipeline(connectedVertex, false, alreadyVisited);
                }
            }
        }
    }

    public int getMaxNumberSubtasks() {
        int maxDegree = 0;
        for (int i = 0; i < this.getNumberOfStageMembers(); ++i) {
            ExecutionGroupVertex groupVertex = this.getStageMember(i);
            if (groupVertex.getCurrentNumberOfGroupMembers() <= maxDegree) continue;
            maxDegree = groupVertex.getCurrentNumberOfGroupMembers();
        }
        return maxDegree;
    }

    public int getRequiredSlots() {
        HashSet<Instance> instanceSet = new HashSet<Instance>();
        for (int i = 0; i < this.getNumberOfStageMembers(); ++i) {
            ExecutionGroupVertex groupVertex = this.getStageMember(i);
            Iterator<ExecutionVertex> vertexIterator = groupVertex.iterator();
            while (vertexIterator.hasNext()) {
                ExecutionVertex vertex = vertexIterator.next();
                instanceSet.add(vertex.getAllocatedResource().getInstance());
            }
        }
        return instanceSet.size();
    }
}

