/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.compiler;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.SingleInputOperator;
import eu.stratosphere.api.common.operators.Union;
import eu.stratosphere.api.common.operators.base.BulkIterationBase;
import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase;
import eu.stratosphere.api.common.operators.base.CollectorMapOperatorBase;
import eu.stratosphere.api.common.operators.base.CrossOperatorBase;
import eu.stratosphere.api.common.operators.base.DeltaIterationBase;
import eu.stratosphere.api.common.operators.base.FilterOperatorBase;
import eu.stratosphere.api.common.operators.base.FlatMapOperatorBase;
import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
import eu.stratosphere.api.common.operators.base.GenericDataSourceBase;
import eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase;
import eu.stratosphere.api.common.operators.base.JoinOperatorBase;
import eu.stratosphere.api.common.operators.base.MapOperatorBase;
import eu.stratosphere.api.common.operators.base.ReduceOperatorBase;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.costs.CostEstimator;
import eu.stratosphere.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.compiler.dag.BinaryUnionNode;
import eu.stratosphere.compiler.dag.BulkIterationNode;
import eu.stratosphere.compiler.dag.BulkPartialSolutionNode;
import eu.stratosphere.compiler.dag.CoGroupNode;
import eu.stratosphere.compiler.dag.CollectorMapNode;
import eu.stratosphere.compiler.dag.CrossNode;
import eu.stratosphere.compiler.dag.DataSinkNode;
import eu.stratosphere.compiler.dag.DataSourceNode;
import eu.stratosphere.compiler.dag.FilterNode;
import eu.stratosphere.compiler.dag.FlatMapNode;
import eu.stratosphere.compiler.dag.GroupReduceNode;
import eu.stratosphere.compiler.dag.IterationNode;
import eu.stratosphere.compiler.dag.MapNode;
import eu.stratosphere.compiler.dag.MatchNode;
import eu.stratosphere.compiler.dag.OptimizerNode;
import eu.stratosphere.compiler.dag.PactConnection;
import eu.stratosphere.compiler.dag.ReduceNode;
import eu.stratosphere.compiler.dag.SinkJoiner;
import eu.stratosphere.compiler.dag.SolutionSetNode;
import eu.stratosphere.compiler.dag.TempMode;
import eu.stratosphere.compiler.dag.WorksetIterationNode;
import eu.stratosphere.compiler.dag.WorksetNode;
import eu.stratosphere.compiler.deadlockdetect.DeadlockPreventer;
import eu.stratosphere.compiler.plan.BinaryUnionPlanNode;
import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
import eu.stratosphere.compiler.plan.BulkPartialSolutionPlanNode;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.plan.IterationPlanNode;
import eu.stratosphere.compiler.plan.NAryUnionPlanNode;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plan.PlanNode;
import eu.stratosphere.compiler.plan.SinkJoinerPlanNode;
import eu.stratosphere.compiler.plan.SinkPlanNode;
import eu.stratosphere.compiler.plan.SolutionSetPlanNode;
import eu.stratosphere.compiler.plan.SourcePlanNode;
import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
import eu.stratosphere.compiler.plan.WorksetPlanNode;
import eu.stratosphere.compiler.postpass.OptimizerPostPass;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.InstantiationUtil;
import eu.stratosphere.util.Visitor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class PactCompiler {
    // ------------------------------------------------------------------------
    // Compiler hint keys and values.
    // NOTE(review): none of these constants is read inside this class; presumably
    // they are looked up elsewhere in the optimizer (e.g. by the DAG nodes) from
    // the operator parameters - confirm against the callers.
    // ------------------------------------------------------------------------

    // Keys naming a ship strategy hint (for a single input, or per input of a two-input operator).
    public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY";
    public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = "INPUT_LEFT_SHIP_STRATEGY";
    public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = "INPUT_RIGHT_SHIP_STRATEGY";
    // Values for the ship strategy hint keys.
    public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD";
    public static final String HINT_SHIP_STRATEGY_REPARTITION = "SHIP_REPARTITION";
    public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = "SHIP_REPARTITION_HASH";
    public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = "SHIP_REPARTITION_RANGE";
    public static final String HINT_SHIP_STRATEGY_BROADCAST = "SHIP_BROADCAST";
    // Key naming a local strategy hint, followed by its values.
    public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY";
    public static final String HINT_LOCAL_STRATEGY_SORT = "LOCAL_STRATEGY_SORT";
    public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = "LOCAL_STRATEGY_COMBINING_SORT";
    public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = "LOCAL_STRATEGY_SORT_BOTH_MERGE";
    public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = "LOCAL_STRATEGY_SORT_FIRST_MERGE";
    public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = "LOCAL_STRATEGY_SORT_SECOND_MERGE";
    public static final String HINT_LOCAL_STRATEGY_MERGE = "LOCAL_STRATEGY_MERGE";
    public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = "LOCAL_STRATEGY_HASH_BUILD_FIRST";
    public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = "LOCAL_STRATEGY_HASH_BUILD_SECOND";
    public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST";
    public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND";
    public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST";
    public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND";
    /** Logger shared by the compiler and its nested visitors. */
    public static final Log LOG = LogFactory.getLog(PactCompiler.class);
    /** Statistics handed to the estimate computation of every optimizer node; may be null. */
    private final DataStatistics statistics;
    /** Cost estimator used for interesting properties and for enumerating plan alternatives. */
    private final CostEstimator costEstimator;
    /** Parallelism used when the compiled program does not declare a positive default parallelism. */
    private int defaultDegreeOfParallelism;

    /**
     * Creates a compiler without data statistics that uses a new
     * {@link DefaultCostEstimator}.
     */
    public PactCompiler() {
        this(null, new DefaultCostEstimator());
    }

    /**
     * Creates a compiler that uses the given statistics and a new
     * {@link DefaultCostEstimator}.
     *
     * @param stats the statistics passed on to the nodes' output estimation
     */
    public PactCompiler(DataStatistics stats) {
        this(stats, new DefaultCostEstimator());
    }

    /**
     * Creates a compiler without data statistics that uses the given cost estimator.
     *
     * @param estimator the cost estimator used during optimization
     */
    public PactCompiler(CostEstimator estimator) {
        this(null, estimator);
    }

    /**
     * Creates a compiler that uses the given statistics and cost estimator.
     *
     * <p>The default degree of parallelism is read from the global configuration
     * under the key {@code parallelization.degree.default} (default {@code 1}).
     * A configured value below one is ignored with a warning and replaced by
     * {@code 1}, so that this field obeys the same invariant that
     * {@link #setDefaultDegreeOfParallelism(int)} enforces.
     *
     * @param stats the statistics passed on to the nodes' output estimation; may be null
     * @param estimator the cost estimator used during optimization
     */
    public PactCompiler(DataStatistics stats, CostEstimator estimator) {
        this.statistics = stats;
        this.costEstimator = estimator;

        Configuration config = GlobalConfiguration.getConfiguration();
        int configuredParallelism = config.getInteger("parallelization.degree.default", 1);
        if (configuredParallelism < 1) {
            // A non-positive value would otherwise silently become the parallelism of every operator.
            LOG.warn("Config value " + configuredParallelism + " for option 'parallelization.degree.default'"
                    + " is invalid. Ignoring and using a value of 1.");
            configuredParallelism = 1;
        }
        this.defaultDegreeOfParallelism = configuredParallelism;
    }

    /**
     * Returns the parallelism this compiler falls back to when a program
     * does not declare a positive default parallelism of its own.
     *
     * @return the compiler-wide default degree of parallelism
     */
    public int getDefaultDegreeOfParallelism() {
        return defaultDegreeOfParallelism;
    }

    /**
     * Sets the parallelism this compiler falls back to when a program does not
     * declare a positive default parallelism of its own.
     *
     * @param defaultDegreeOfParallelism the new default; must be at least one
     * @throws IllegalArgumentException if the value is zero or negative
     */
    public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
        final boolean positive = defaultDegreeOfParallelism >= 1;
        if (!positive) {
            throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
        }
        this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
    }

    /**
     * Compiles the given program into an optimized plan, using the post pass
     * whose class name is declared by the program itself.
     *
     * @param program the program to optimize
     * @return the optimized plan
     * @throws CompilerException if the post pass cannot be obtained or the optimization fails
     */
    public OptimizedPlan compile(Plan program) throws CompilerException {
        // The post pass is resolved first, so a bad post pass class fails before any optimization work.
        return compile(program, getPostPassFromPlan(program));
    }

    /**
     * Translates the program into the optimizer's graph representation, runs the
     * optimization passes over it and returns the finalized plan.
     *
     * <p>The steps, in order: build the optimizer node graph; join multiple sinks
     * into a single root via {@link SinkJoiner}s; assign ids and compute estimates;
     * compute interesting properties; compute the open-branch information;
     * enumerate the plan alternatives; resolve deadlocks; finalize the plan;
     * replace binary unions; run the post pass.
     *
     * @param program the program to optimize; must not be null
     * @param postPasser the post pass applied to the finished plan; must not be null
     * @return the optimized plan
     * @throws NullPointerException if {@code program} or {@code postPasser} is null
     * @throws CompilerException if the program has no sinks, the branch tracking is
     *         inconsistent, or the enumeration does not yield exactly one best plan
     */
    private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
        if (program == null) {
            throw new NullPointerException("The program to compile must not be null.");
        }
        if (postPasser == null) {
            throw new NullPointerException("The optimizer post pass must not be null.");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
        }

        // The program's own default parallelism wins over the compiler-wide default.
        int defaultParallelism = program.getDefaultParallelism() > 0
                ? program.getDefaultParallelism()
                : this.defaultDegreeOfParallelism;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Using a default degree of parallelism of " + defaultParallelism + '.');
        }

        // Step 1: translate the operators into optimizer nodes.
        GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism);
        program.accept(graphCreator);

        // Step 2: the optimizer needs a single root; chain all sinks together with sink joiners.
        if (graphCreator.sinks.isEmpty()) {
            throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
        }
        Iterator<DataSinkNode> sinkIter = graphCreator.sinks.iterator();
        OptimizerNode rootNode = sinkIter.next();
        while (sinkIter.hasNext()) {
            rootNode = new SinkJoiner(rootNode, sinkIter.next());
        }

        // Step 3: the analysis passes over the optimizer graph.
        rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
        rootNode.accept(new InterestingPropertyVisitor(this.costEstimator));
        rootNode.accept(new BranchesVisitor());
        if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
            throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly.");
        }

        // Step 4: enumerate the alternatives; exactly one plan must survive at the root.
        List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
        if (bestPlan.isEmpty()) {
            throw new CompilerException("Error in compiler: no best plan was created!");
        }
        if (bestPlan.size() > 1) {
            throw new CompilerException("Error in compiler: more than one best plan was created!");
        }

        // Step 5: collect the sinks of the chosen plan (unwrapping the sink joiners).
        PlanNode bestPlanRoot = bestPlan.get(0);
        List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
        if (bestPlanRoot instanceof SinkPlanNode) {
            bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
        } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
            ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
        }

        // Step 6: post-processing of the chosen plan.
        new DeadlockPreventer().resolveDeadlocks(bestPlanSinks);
        OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
        plan.accept(new BinaryUnionReplacer());
        postPasser.postPass(plan);
        return plan;
    }

    /**
     * Builds the optimizer node graph for the given program without optimizing
     * it, using a default parallelism of one.
     *
     * @param program the program to translate
     * @return the data sink nodes of the created graph
     */
    public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
        final GraphCreatingVisitor visitor = new GraphCreatingVisitor(1);
        program.accept(visitor);
        return visitor.sinks;
    }

    /**
     * Loads and instantiates the optimizer post pass whose class name is
     * declared by the given program.
     *
     * @param program the program naming its post pass class
     * @return a new instance of the post pass
     * @throws CompilerException if the class name is null, the class cannot be
     *         loaded, is not an {@link OptimizerPostPass}, or cannot be
     *         instantiated (when the instantiation failure carries a cause)
     */
    private OptimizerPostPass getPostPassFromPlan(Plan program) {
        final String className = program.getPostPassClassName();
        if (className == null) {
            throw new CompilerException("Optimizer Post Pass class description is null");
        }
        try {
            // asSubclass yields Class<? extends OptimizerPostPass>; it throws ClassCastException on mismatch.
            Class<? extends OptimizerPostPass> clazz = Class.forName(className).asSubclass(OptimizerPostPass.class);
            try {
                return InstantiationUtil.instantiate(clazz, OptimizerPostPass.class);
            }
            catch (RuntimeException rtex) {
                // Unwrap the underlying failure when there is one; otherwise propagate unchanged.
                if (rtex.getCause() != null) {
                    throw new CompilerException("Cannot instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause());
                }
                throw rtex;
            }
        }
        catch (ClassNotFoundException cnfex) {
            throw new CompilerException("Cannot load Optimizer post-pass class '" + className + "'.", cnfex);
        }
        catch (ClassCastException ccex) {
            throw new CompilerException("Class '" + className + "' is not an optimizer post passer.", ccex);
        }
    }

    /**
     * Plan visitor that replaces every {@link BinaryUnionPlanNode} by a single
     * {@link NAryUnionPlanNode}. Because the replacement happens in the
     * post-visit, unions feeding into a union have already been replaced, and
     * their inputs are pulled up into the new n-ary node.
     */
    private static final class BinaryUnionReplacer implements Visitor<PlanNode> {
        /** Nodes already visited; prevents descending twice into shared sub-plans. */
        private final Set<PlanNode> seenBefore = new HashSet<PlanNode>();

        private BinaryUnionReplacer() {
        }

        /**
         * Descends into each node once; for iterations, the step function is
         * traversed as well before the iteration's own inputs.
         */
        public boolean preVisit(PlanNode visitable) {
            final boolean firstVisit = this.seenBefore.add(visitable);
            if (!firstVisit) {
                return false;
            }
            if (visitable instanceof IterationPlanNode) {
                ((IterationPlanNode) visitable).acceptForStepFunction(this);
            }
            return true;
        }

        /**
         * Swaps a binary union for an n-ary union over the flattened inputs and
         * re-targets all affected channels.
         */
        public void postVisit(PlanNode visitable) {
            if (!(visitable instanceof BinaryUnionPlanNode)) {
                return;
            }
            final BinaryUnionPlanNode binaryUnion = (BinaryUnionPlanNode) visitable;

            final List<Channel> flattened = new ArrayList<Channel>();
            this.collect(binaryUnion.getInput1(), flattened);
            this.collect(binaryUnion.getInput2(), flattened);

            final NAryUnionPlanNode naryUnion = new NAryUnionPlanNode(
                    binaryUnion.getOptimizerNode(), flattened, binaryUnion.getGlobalProperties());

            // Inputs now feed the n-ary node ...
            for (Channel input : flattened) {
                input.setTarget(naryUnion);
            }
            // ... and the consumers of the old union read from the n-ary node.
            for (Channel outgoing : binaryUnion.getOutgoingChannels()) {
                outgoing.swapUnionNodes(naryUnion);
            }
        }

        /**
         * Adds the channel to the input list, or, if the channel comes from an
         * already replaced union, adds that union's inputs instead.
         *
         * @throws CompilerException if a channel between two unions carries a
         *         ship strategy other than FORWARD or a local strategy other than NONE
         */
        private void collect(Channel in, List<Channel> inputs) {
            if (!(in.getSource() instanceof NAryUnionPlanNode)) {
                inputs.add(in);
                return;
            }
            if (in.getShipStrategy() != ShipStrategyType.FORWARD) {
                throw new CompilerException("Bug: Plan generation for Unions picked a ship strategy between binary plan operators.");
            }
            final LocalStrategy local = in.getLocalStrategy();
            if (local != null && local != LocalStrategy.NONE) {
                throw new CompilerException("Bug: Plan generation for Unions picked a local strategy between binary plan operators.");
            }
            final NAryUnionPlanNode nestedUnion = (NAryUnionPlanNode) in.getSource();
            inputs.addAll(nestedUnion.getListOfInputs());
        }
    }

    private static final class PlanFinalizer
    implements Visitor<PlanNode> {
        private final Set<PlanNode> allNodes = new HashSet<PlanNode>();
        private final List<SourcePlanNode> sources = new ArrayList<SourcePlanNode>();
        private final List<SinkPlanNode> sinks = new ArrayList<SinkPlanNode>();
        private final Deque<IterationPlanNode> stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
        private int memoryConsumerWeights;

        private PlanFinalizer() {
        }

        private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
            this.memoryConsumerWeights = 0;
            for (SinkPlanNode sinkPlanNode : sinks) {
                sinkPlanNode.accept(this);
            }
            if (this.memoryConsumerWeights > 0) {
                for (PlanNode planNode : this.allNodes) {
                    int consumerWeight = planNode.getMemoryConsumerWeight();
                    if (consumerWeight > 0) {
                        double relativeMem = (double)consumerWeight / (double)this.memoryConsumerWeights;
                        planNode.setRelativeMemoryPerSubtask(relativeMem);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Assigned " + relativeMem + " of total memory to each subtask of " + planNode.getPactContract().getName() + "."));
                        }
                    }
                    for (Channel c : planNode.getInputs()) {
                        double relativeMem;
                        if (c.getLocalStrategy().dams()) {
                            relativeMem = 1.0 / (double)this.memoryConsumerWeights;
                            c.setRelativeMemoryLocalStrategy(relativeMem);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug((Object)("Assigned " + relativeMem + " of total memory to each local strategy " + "instance of " + c + "."));
                            }
                        }
                        if (c.getTempMode() == TempMode.NONE) continue;
                        relativeMem = 1.0 / (double)this.memoryConsumerWeights;
                        c.setRelativeTempMemory(relativeMem);
                        if (!LOG.isDebugEnabled()) continue;
                        LOG.debug((Object)("Assigned " + relativeMem + " of total memory to each instance of the temp " + "table" + " " + "for " + c + "."));
                    }
                }
            }
            return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
        }

        public boolean preVisit(PlanNode visitable) {
            if (!this.allNodes.add(visitable)) {
                return false;
            }
            if (visitable instanceof SinkPlanNode) {
                this.sinks.add((SinkPlanNode)visitable);
            } else if (visitable instanceof SourcePlanNode) {
                this.sources.add((SourcePlanNode)visitable);
            } else if (visitable instanceof BulkPartialSolutionPlanNode) {
                BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode)visitable;
                IterationPlanNode iterationPlanNode = this.stackOfIterationNodes.peekLast();
                if (iterationPlanNode == null || !(iterationPlanNode instanceof BulkIterationPlanNode)) {
                    throw new CompilerException("Bug: Error finalizing the plan. Cannot associate the node for a partial solutions with its containing iteration.");
                }
                pspn.setContainingIterationNode((BulkIterationPlanNode)iterationPlanNode);
            } else if (visitable instanceof WorksetPlanNode) {
                WorksetPlanNode wspn = (WorksetPlanNode)visitable;
                IterationPlanNode iterationPlanNode = this.stackOfIterationNodes.peekLast();
                if (iterationPlanNode == null || !(iterationPlanNode instanceof WorksetIterationPlanNode)) {
                    throw new CompilerException("Bug: Error finalizing the plan. Cannot associate the node for a partial solutions with its containing iteration.");
                }
                wspn.setContainingIterationNode((WorksetIterationPlanNode)iterationPlanNode);
            } else if (visitable instanceof SolutionSetPlanNode) {
                Iterator<Channel> sspn = (SolutionSetPlanNode)visitable;
                IterationPlanNode iterationPlanNode = this.stackOfIterationNodes.peekLast();
                if (iterationPlanNode == null || !(iterationPlanNode instanceof WorksetIterationPlanNode)) {
                    throw new CompilerException("Bug: Error finalizing the plan. Cannot associate the node for a partial solutions with its containing iteration.");
                }
                ((SolutionSetPlanNode)((Object)sspn)).setContainingIterationNode((WorksetIterationPlanNode)iterationPlanNode);
            }
            for (Channel channel : visitable.getInputs()) {
                channel.setTarget(visitable);
                channel.getSource().addOutgoingChannel(channel);
            }
            for (Channel channel : visitable.getBroadcastInputs()) {
                channel.setTarget(visitable);
                channel.getSource().addOutgoingChannel(channel);
            }
            this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
            for (Channel channel : visitable.getInputs()) {
                if (channel.getLocalStrategy().dams()) {
                    ++this.memoryConsumerWeights;
                }
                if (channel.getTempMode() == TempMode.NONE) continue;
                ++this.memoryConsumerWeights;
            }
            for (Channel channel : visitable.getBroadcastInputs()) {
                if (channel.getLocalStrategy().dams()) {
                    ++this.memoryConsumerWeights;
                }
                if (channel.getTempMode() == TempMode.NONE) continue;
                ++this.memoryConsumerWeights;
            }
            if (visitable instanceof IterationPlanNode) {
                IterationPlanNode iterNode = (IterationPlanNode)((Object)visitable);
                this.stackOfIterationNodes.addLast(iterNode);
                ((IterationPlanNode)((Object)visitable)).acceptForStepFunction(this);
                this.stackOfIterationNodes.removeLast();
            }
            return true;
        }

        public void postVisit(PlanNode visitable) {
        }
    }

    /**
     * Visitor that makes every optimizer node compute its stack of unclosed
     * branches, bottom-up. Step functions of iterations are traversed before
     * the iteration node itself computes its stack.
     */
    private static final class BranchesVisitor implements Visitor<OptimizerNode> {
        private BranchesVisitor() {
        }

        /** Descends only into nodes that have no open-branch information yet. */
        public boolean preVisit(OptimizerNode node) {
            final boolean notYetComputed = node.getOpenBranches() == null;
            return notYetComputed;
        }

        /** Handles the nested step function first, then the node itself. */
        public void postVisit(OptimizerNode node) {
            if (node instanceof IterationNode) {
                final IterationNode iteration = (IterationNode) node;
                iteration.acceptForStepFunction(this);
            }
            node.computeUnclosedBranchStack();
        }
    }

    /**
     * Visitor that computes the interesting properties top-down. A node is
     * processed only once, and only after all of its output connections carry
     * interesting properties.
     */
    public static final class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
        /** Estimator handed to the nodes when they derive the properties for their inputs. */
        private final CostEstimator estimator;

        /**
         * @param estimator the cost estimator passed to each node
         */
        public InterestingPropertyVisitor(CostEstimator estimator) {
            this.estimator = estimator;
        }

        /**
         * Computes the node's interesting properties and those for its inputs,
         * provided the node is still unprocessed and all successors are done.
         *
         * @return true if the node was processed and the traversal should descend
         */
        public boolean preVisit(OptimizerNode node) {
            if (node.getInterestingProperties() != null) {
                return false;
            }
            if (!node.haveAllOutputConnectionInterestingProperties()) {
                return false;
            }
            node.computeUnionOfInterestingPropertiesFromSuccessors();
            node.computeInterestingPropertiesForInputs(this.estimator);
            return true;
        }

        /** Nothing to do on the way back up. */
        public void postVisit(OptimizerNode visitable) {
        }
    }

    /**
     * Visitor that, bottom-up, assigns consecutive ids (starting at one) to the
     * optimizer nodes, initializes the depth of their incoming connections and
     * lets them compute their output estimates.
     */
    private static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
        private final DataStatistics statistics;
        /** The id handed to the next node. */
        private int id = 1;

        private IdAndEstimatesVisitor(DataStatistics statistics) {
            this.statistics = statistics;
        }

        /** Descends only into nodes that have no id yet (id of -1). */
        public boolean preVisit(OptimizerNode visitable) {
            final boolean unassigned = visitable.getId() == -1;
            return unassigned;
        }

        /**
         * Processes the node after its inputs; the step function of an
         * iteration is traversed after the iteration node itself was handled.
         */
        public void postVisit(OptimizerNode visitable) {
            final int assignedId = this.id++;
            visitable.initId(assignedId);

            for (PactConnection incoming : visitable.getIncomingConnections()) {
                incoming.initMaxDepth();
            }
            for (PactConnection broadcast : visitable.getBroadcastConnections()) {
                broadcast.initMaxDepth();
            }

            visitable.computeOutputEstimates(this.statistics);

            if (!(visitable instanceof IterationNode)) {
                return;
            }
            ((IterationNode) visitable).acceptForStepFunction(this);
        }
    }

    /**
     * Visitor that asks every reachable node, bottom-up and exactly once, to
     * identify whether it lies on the dynamic path, passing along a fixed
     * cost weight.
     */
    private static final class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
        /** Nodes already visited. */
        private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>();
        /** The weight passed to every node. */
        private final int costWeight;

        private StaticDynamicPathIdentifier(int costWeight) {
            this.costWeight = costWeight;
        }

        /** Descends into a node only the first time it is encountered. */
        public boolean preVisit(OptimizerNode visitable) {
            final boolean firstEncounter = this.seenBefore.add(visitable);
            return firstEncounter;
        }

        /** Lets the node classify itself once its inputs are classified. */
        public void postVisit(OptimizerNode visitable) {
            final int weight = this.costWeight;
            visitable.identifyDynamicPath(weight);
        }
    }

    /**
     * Visitor over the program's operators that creates the optimizer node for
     * each operator (pre-visit) and connects the nodes to their inputs
     * (post-visit). For iterations, the step function is translated by a nested
     * visitor that starts from a copy of this visitor's operator-to-node map.
     */
    private static final class GraphCreatingVisitor implements Visitor<Operator<?>> {
        /** Maps every translated operator to its optimizer node. */
        private final Map<Operator<?>, OptimizerNode> con2node;
        private final List<DataSourceNode> sources;
        private final List<DataSinkNode> sinks;
        /** Parallelism given to nodes whose operator declares none. */
        private final int defaultParallelism;
        /** The visitor of the enclosing data flow, or null for the outermost visitor. */
        private final GraphCreatingVisitor parent;
        /** If true, operator parallelism differing from the default is overridden with the default. */
        private final boolean forceDOP;

        private GraphCreatingVisitor(int defaultParallelism) {
            this(null, false, defaultParallelism, null);
        }

        /**
         * @param parent the visitor of the enclosing data flow; may be null
         * @param forceDOP whether to force the default parallelism onto all nodes
         * @param defaultParallelism the parallelism for nodes without their own
         * @param closure the initial operator-to-node map; a fresh map is used if null
         */
        private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism, HashMap<Operator<?>, OptimizerNode> closure) {
            this.con2node = closure == null ? new HashMap<Operator<?>, OptimizerNode>() : closure;
            this.sources = new ArrayList<DataSourceNode>(4);
            this.sinks = new ArrayList<DataSinkNode>(2);
            this.defaultParallelism = defaultParallelism;
            this.parent = parent;
            this.forceDOP = forceDOP;
        }

        /**
         * Creates the optimizer node matching the operator's type, registers it
         * and determines its degree of parallelism.
         *
         * @return false if the operator was translated before, true otherwise
         * @throws IllegalArgumentException if the operator type is not known
         */
        public boolean preVisit(Operator<?> c) {
            if (this.con2node.containsKey(c)) {
                return false;
            }

            final OptimizerNode n;
            if (c instanceof GenericDataSinkBase) {
                DataSinkNode sinkNode = new DataSinkNode((GenericDataSinkBase)c);
                this.sinks.add(sinkNode);
                n = sinkNode;
            } else if (c instanceof GenericDataSourceBase) {
                DataSourceNode sourceNode = new DataSourceNode((GenericDataSourceBase)c);
                this.sources.add(sourceNode);
                n = sourceNode;
            } else if (c instanceof MapOperatorBase) {
                n = new MapNode((SingleInputOperator<?, ?, ?>)((MapOperatorBase)c));
            } else if (c instanceof CollectorMapOperatorBase) {
                n = new CollectorMapNode((SingleInputOperator<?, ?, ?>)((CollectorMapOperatorBase)c));
            } else if (c instanceof FlatMapOperatorBase) {
                n = new FlatMapNode((FlatMapOperatorBase)c);
            } else if (c instanceof FilterOperatorBase) {
                n = new FilterNode((FilterOperatorBase)c);
            } else if (c instanceof ReduceOperatorBase) {
                n = new ReduceNode((ReduceOperatorBase)c);
            } else if (c instanceof GroupReduceOperatorBase) {
                n = new GroupReduceNode((GroupReduceOperatorBase)c);
            } else if (c instanceof JoinOperatorBase) {
                n = new MatchNode((JoinOperatorBase)c);
            } else if (c instanceof CoGroupOperatorBase) {
                n = new CoGroupNode((CoGroupOperatorBase)c);
            } else if (c instanceof CrossOperatorBase) {
                n = new CrossNode((CrossOperatorBase)c);
            } else if (c instanceof BulkIterationBase) {
                n = new BulkIterationNode((BulkIterationBase)c);
            } else if (c instanceof DeltaIterationBase) {
                n = new WorksetIterationNode((DeltaIterationBase)c);
            } else if (c instanceof Union) {
                n = new BinaryUnionNode((Union)c);
            } else if (c instanceof BulkIterationBase.PartialSolutionPlaceHolder) {
                // Place-holders are linked to the iteration node known to the enclosing visitor
                // and inherit that iteration's parallelism.
                BulkIterationBase.PartialSolutionPlaceHolder holder = (BulkIterationBase.PartialSolutionPlaceHolder)c;
                BulkIterationBase enclosingIteration = holder.getContainingBulkIteration();
                BulkIterationNode containingIterationNode = (BulkIterationNode)this.parent.con2node.get(enclosingIteration);
                BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
                p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
                n = p;
            } else if (c instanceof DeltaIterationBase.WorksetPlaceHolder) {
                DeltaIterationBase.WorksetPlaceHolder holder = (DeltaIterationBase.WorksetPlaceHolder)c;
                DeltaIterationBase enclosingIteration = holder.getContainingWorksetIteration();
                WorksetIterationNode containingIterationNode = (WorksetIterationNode)this.parent.con2node.get(enclosingIteration);
                WorksetNode p = new WorksetNode((DeltaIterationBase.WorksetPlaceHolder<?>)holder, containingIterationNode);
                p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
                n = p;
            } else if (c instanceof DeltaIterationBase.SolutionSetPlaceHolder) {
                DeltaIterationBase.SolutionSetPlaceHolder holder = (DeltaIterationBase.SolutionSetPlaceHolder)c;
                DeltaIterationBase enclosingIteration = holder.getContainingWorksetIteration();
                WorksetIterationNode containingIterationNode = (WorksetIterationNode)this.parent.con2node.get(enclosingIteration);
                SolutionSetNode p = new SolutionSetNode((DeltaIterationBase.SolutionSetPlaceHolder<?>)holder, containingIterationNode);
                p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
                n = p;
            } else {
                throw new IllegalArgumentException("Unknown operator type: " + c);
            }

            this.con2node.put(c, n);

            // Nodes that did not get a parallelism above take the operator's, or the default.
            if (n.getDegreeOfParallelism() < 1) {
                int par = c.getDegreeOfParallelism();
                if (par > 0) {
                    if (this.forceDOP && par != this.defaultParallelism) {
                        par = this.defaultParallelism;
                        LOG.warn("The degree-of-parallelism of nested Dataflows (such as step functions in iterations) is currently fixed to the degree-of-parallelism of the surrounding operator (the iteration).");
                    }
                } else {
                    par = this.defaultParallelism;
                }
                n.setDegreeOfParallelism(par);
            }
            return true;
        }

        /**
         * Connects the operator's node to its (broadcast) inputs and, for
         * iterations, translates the step function.
         */
        public void postVisit(Operator<?> c) {
            OptimizerNode n = this.con2node.get(c);
            n.setInput(this.con2node);
            n.setBroadcastInputs(this.con2node);

            if (n instanceof BulkIterationNode) {
                this.buildBulkIterationStepFunction((BulkIterationNode) n);
            } else if (n instanceof WorksetIterationNode) {
                this.buildWorksetIterationStepFunction((WorksetIterationNode) n);
            }
        }

        /** Translates the step function and optional termination criterion of a bulk iteration. */
        private void buildBulkIterationStepFunction(BulkIterationNode iterNode) {
            BulkIterationBase<?> iter = iterNode.getIterationContract();

            // The nested visitor works on a copy, so step function nodes do not leak into this visitor.
            HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(this.con2node);
            GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true, iterNode.getDegreeOfParallelism(), closure);

            iter.getNextPartialSolution().accept(recursiveCreator);
            BulkPartialSolutionNode partialSolution = (BulkPartialSolutionNode)recursiveCreator.con2node.get(iter.getPartialSolution());
            OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution());
            if (partialSolution == null) {
                throw new CompilerException("Error: The step functions result does not depend on the partial solution.");
            }

            // The termination criterion may or may not have been reached from the next partial solution.
            OptimizerNode terminationCriterion = null;
            if (iter.getTerminationCriterion() != null) {
                terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
                if (terminationCriterion == null) {
                    iter.getTerminationCriterion().accept(recursiveCreator);
                    terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
                }
            }

            iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
            iterNode.setPartialSolution(partialSolution);

            StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
            rootOfStepFunction.accept(identifier);
            if (terminationCriterion != null) {
                terminationCriterion.accept(identifier);
            }
        }

        /**
         * Translates the step function of a workset (delta) iteration and marks
         * the joins / co-groups that consume the solution set.
         */
        private void buildWorksetIterationStepFunction(WorksetIterationNode iterNode) {
            DeltaIterationBase<?, ?> iter = iterNode.getIterationContract();

            HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(this.con2node);
            GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true, iterNode.getDegreeOfParallelism(), closure);

            // Descending from the solution set delta has to create both place-holder nodes.
            iter.getSolutionSetDelta().accept(recursiveCreator);
            SolutionSetNode solutionSetNode = (SolutionSetNode)recursiveCreator.con2node.get(iter.getSolutionSet());
            WorksetNode worksetNode = (WorksetNode)recursiveCreator.con2node.get(iter.getWorkset());
            if (worksetNode == null) {
                throw new CompilerException("In the given plan, the solution set delta does not depend on the workset. This is a prerequisite in workset iterations.");
            }

            iter.getNextWorkset().accept(recursiveCreator);

            if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty()) {
                throw new CompilerException("Error: The step function does not reference the solution set.");
            }

            // Exact class comparison: only plain Match and CoGroup nodes may consume the solution set.
            for (PactConnection conn : solutionSetNode.getOutgoingConnections()) {
                OptimizerNode successor = conn.getTarget();
                if (successor.getClass() == MatchNode.class) {
                    MatchNode mn = (MatchNode) successor;
                    if (mn.getFirstPredecessorNode() == solutionSetNode) {
                        mn.makeJoinWithSolutionSet(0);
                    } else if (mn.getSecondPredecessorNode() == solutionSetNode) {
                        mn.makeJoinWithSolutionSet(1);
                    } else {
                        throw new CompilerException("Bug: The solution set is not an input of the Join it is connected to.");
                    }
                } else if (successor.getClass() == CoGroupNode.class) {
                    CoGroupNode cg = (CoGroupNode) successor;
                    if (cg.getFirstPredecessorNode() == solutionSetNode) {
                        cg.makeCoGroupWithSolutionSet(0);
                    } else if (cg.getSecondPredecessorNode() == solutionSetNode) {
                        cg.makeCoGroupWithSolutionSet(1);
                    } else {
                        throw new CompilerException("Bug: The solution set is not an input of the CoGroup it is connected to.");
                    }
                } else {
                    throw new CompilerException("Error: The only operations allowed on the solution set are Join and CoGroup.");
                }
            }

            OptimizerNode nextWorksetNode = recursiveCreator.con2node.get(iter.getNextWorkset());
            OptimizerNode solutionSetDeltaNode = recursiveCreator.con2node.get(iter.getSolutionSetDelta());
            iterNode.setPartialSolution(solutionSetNode, worksetNode);
            iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode);

            StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
            nextWorksetNode.accept(pathIdentifier);
            iterNode.getSolutionSetDelta().accept(pathIdentifier);
        }
    }
}

