/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.jobmanager.splitassigner;

import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitAssigner;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Input split assigner that hands out the splits of a group vertex in queue order.
 *
 * <p>For every registered {@link ExecutionGroupVertex} the assigner keeps one queue holding
 * the vertex's input splits. Each call to {@link #getNextInputSplit(ExecutionVertex)} removes
 * and returns the head of the queue belonging to the requesting vertex's group vertex.
 *
 * <p>All state lives in a {@link ConcurrentMap} of {@link ConcurrentLinkedQueue}s; no further
 * locking is performed by this class.
 */
public class DefaultInputSplitAssigner
implements InputSplitAssigner {
    private static final Log LOG = LogFactory.getLog(DefaultInputSplitAssigner.class);

    /** Remaining (not yet assigned) input splits, keyed by the group vertex they belong to. */
    private final ConcurrentMap<ExecutionGroupVertex, Queue<InputSplit>> splitMap = new ConcurrentHashMap<ExecutionGroupVertex, Queue<InputSplit>>();

    /**
     * Registers a group vertex and stores its input splits for later assignment.
     *
     * <p>Group vertices without input splits ({@code null} or empty array) are ignored. If a
     * queue is already registered for the vertex, the existing queue is kept untouched and an
     * error is logged.
     *
     * @param groupVertex the group vertex whose input splits shall be managed
     */
    @Override
    public void registerGroupVertex(ExecutionGroupVertex groupVertex) {
        InputSplit[] inputSplits = groupVertex.getInputSplits();
        if (inputSplits == null || inputSplits.length == 0) {
            return;
        }
        // Fill the queue BEFORE publishing it in the map. Publishing an empty queue first
        // would let a concurrent getNextInputSplit() poll it and return null even though
        // splits are about to be added.
        Queue<InputSplit> queue = new ConcurrentLinkedQueue<InputSplit>(Arrays.asList(inputSplits));
        if (this.splitMap.putIfAbsent(groupVertex, queue) != null) {
            // The previously registered queue stays in place; the new one is discarded.
            LOG.error("Group vertex " + groupVertex.getName() + " already has a split queue");
        }
    }

    /**
     * Removes the split queue of the given group vertex, if one is registered.
     *
     * @param groupVertex the group vertex to unregister
     */
    @Override
    public void unregisterGroupVertex(ExecutionGroupVertex groupVertex) {
        this.splitMap.remove(groupVertex);
    }

    /**
     * Returns the next unassigned input split for the given vertex.
     *
     * @param vertex the vertex requesting a split; the queue is looked up via its group vertex
     * @return the next input split, or {@code null} if the queue of the vertex's group vertex
     *         is empty or no queue is registered for it (the latter is logged as an error)
     */
    @Override
    public InputSplit getNextInputSplit(ExecutionVertex vertex) {
        Queue<InputSplit> queue = this.splitMap.get(vertex.getGroupVertex());
        if (queue == null) {
            JobID jobID = vertex.getExecutionGraph().getJobID();
            LOG.error("Cannot find split queue for vertex " + vertex.getGroupVertex() + " (job " + jobID + ")");
            return null;
        }
        // poll() yields null once the queue has been drained.
        InputSplit nextSplit = queue.poll();
        if (nextSplit != null && LOG.isDebugEnabled()) {
            LOG.debug("Assigning split " + nextSplit.getSplitNumber() + " to " + vertex);
        }
        return nextSplit;
    }
}

