/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.jobmanager.splitassigner;

import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.core.io.GenericInputSplit;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.splitassigner.DefaultInputSplitAssigner;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitAssigner;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitTracker;
import eu.stratosphere.nephele.jobmanager.splitassigner.file.FileInputSplitAssigner;
import eu.stratosphere.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Hands out input splits to the execution vertices of registered jobs.
 * <p>
 * For every group vertex of a registered job that carries at least one input split, an
 * {@link InputSplitAssigner} matching the vertex's input split type is selected and remembered
 * in {@link #assignerCache}. Every split handed out through
 * {@link #getNextInputSplit(ExecutionVertex, int)} is additionally recorded in the
 * {@link InputSplitTracker}, so that a repeated request with the same sequence number is
 * answered from that log instead of consuming another split.
 * <p>
 * The per-group-vertex cache is a {@link ConcurrentHashMap}; the per-type map of loaded
 * assigners is a plain {@link HashMap} that is only accessed while holding its own monitor.
 */
public final class InputSplitManager {

    private static final Log LOG = LogFactory.getLog(InputSplitManager.class);

    /**
     * Prefix of the configuration key under which a custom assigner class may be configured.
     * The fully qualified class name of the input split type is appended to this prefix.
     */
    private static final String INPUT_SPLIT_CONFIG_KEY_PREFIX = "inputsplit.assigner.";

    /** Assigner chosen for each currently registered group vertex. */
    private final Map<ExecutionGroupVertex, InputSplitAssigner> assignerCache = new ConcurrentHashMap<ExecutionGroupVertex, InputSplitAssigner>();

    /** Assigners already loaded, keyed by input split type. Guarded by its own monitor. */
    private final Map<Class<? extends InputSplit>, InputSplitAssigner> loadedAssigners = new HashMap<Class<? extends InputSplit>, InputSplitAssigner>();

    /** Log of the splits that have been handed out, used to replay earlier assignments. */
    private final InputSplitTracker inputSplitTracker = new InputSplitTracker();

    /** Fallback used when no specific assigner can be found for an input split type. */
    private final InputSplitAssigner defaultAssigner = new DefaultInputSplitAssigner();

    /**
     * Registers a job with the manager.
     * <p>
     * Every group vertex of the graph that has at least one input split is bound to an
     * assigner and registered with it. Afterwards the job is registered with the input split
     * tracker.
     *
     * @param executionGraph the execution graph of the job to register
     */
    public void registerJob(ExecutionGraph executionGraph) {
        // NOTE(review): the arguments (true, -1) are presumed to request a forward traversal
        // over all stages - confirm against ExecutionGroupVertexIterator.
        ExecutionGroupVertexIterator it = new ExecutionGroupVertexIterator(executionGraph, true, -1);
        while (it.hasNext()) {
            ExecutionGroupVertex groupVertex = (ExecutionGroupVertex) it.next();
            if (!hasInputSplits(groupVertex)) {
                continue;
            }
            InputSplitAssigner assigner = getAssignerByType(groupVertex.getInputSplitType(), true);
            // Remember the choice so later split requests do not have to resolve the type again.
            this.assignerCache.put(groupVertex, assigner);
            assigner.registerGroupVertex(groupVertex);
        }
        this.inputSplitTracker.registerJob(executionGraph);
    }

    /**
     * Unregisters a job from the manager.
     * <p>
     * Every group vertex of the graph that has at least one input split is removed from the
     * assigner cache and unregistered from its assigner. A vertex that is missing from the
     * cache is logged as an error and skipped. Afterwards the job is unregistered from the
     * input split tracker.
     *
     * @param executionGraph the execution graph of the job to unregister
     */
    public void unregisterJob(ExecutionGraph executionGraph) {
        ExecutionGroupVertexIterator it = new ExecutionGroupVertexIterator(executionGraph, true, -1);
        while (it.hasNext()) {
            ExecutionGroupVertex groupVertex = (ExecutionGroupVertex) it.next();
            if (!hasInputSplits(groupVertex)) {
                continue;
            }
            InputSplitAssigner assigner = this.assignerCache.remove(groupVertex);
            if (assigner == null) {
                LOG.error("Group vertex " + groupVertex.getName() + " is unregistered, but cannot be found in assigner cache");
                continue;
            }
            assigner.unregisterGroupVertex(groupVertex);
        }
        this.inputSplitTracker.unregisterJob(executionGraph);
    }

    /**
     * Returns the next input split for the given vertex.
     * <p>
     * The tracker's log is consulted first: if it already holds a split for this vertex and
     * sequence number, that split is returned again. Otherwise the assigner cached for the
     * vertex's group vertex is asked for a new split, which is recorded in the log before it
     * is returned.
     *
     * @param vertex the vertex requesting a split
     * @param sequenceNumber the sequence number of the request, used as key into the tracker's log
     * @return the next input split, or {@code null} if the assigner returned no split or no
     *         assigner is cached for the vertex's group vertex
     */
    public InputSplit getNextInputSplit(ExecutionVertex vertex, int sequenceNumber) {
        InputSplit replayedSplit = this.inputSplitTracker.getInputSplitFromLog(vertex, sequenceNumber);
        if (replayedSplit != null) {
            LOG.info("Input split " + replayedSplit.getSplitNumber() + " for vertex " + vertex + " replayed from log");
            return replayedSplit;
        }

        ExecutionGroupVertex groupVertex = vertex.getGroupVertex();
        InputSplitAssigner inputSplitAssigner = this.assignerCache.get(groupVertex);
        if (inputSplitAssigner == null) {
            JobID jobID = groupVertex.getExecutionStage().getExecutionGraph().getJobID();
            LOG.error("Cannot find input assigner for group vertex " + groupVertex.getName() + " (job " + jobID + ")");
            return null;
        }

        InputSplit nextInputSplit = inputSplitAssigner.getNextInputSplit(vertex);
        if (nextInputSplit != null) {
            // Record the assignment so a repeated request with this sequence number is replayed.
            this.inputSplitTracker.addInputSplitToLog(vertex, sequenceNumber, nextInputSplit);
            LOG.info(vertex + " receives input split " + nextInputSplit.getSplitNumber());
        }
        return nextInputSplit;
    }

    /**
     * Tells whether the given group vertex carries at least one input split.
     *
     * @param groupVertex the group vertex to inspect
     * @return {@code true} if the vertex's input split array is neither {@code null} nor empty
     */
    private static boolean hasInputSplits(ExecutionGroupVertex groupVertex) {
        InputSplit[] inputSplits = groupVertex.getInputSplits();
        return inputSplits != null && inputSplits.length > 0;
    }

    /**
     * Returns the assigner responsible for the given input split type.
     * <p>
     * Lookup and optional loading happen while holding the monitor of
     * {@link #loadedAssigners}. A successfully loaded assigner is stored in that map; a failed
     * load is not stored, so it is attempted again on the next call.
     *
     * @param inputSplitType the type of input split an assigner is needed for
     * @param allowLoading whether an assigner may be loaded if none is known for the type yet
     * @return the assigner for the type, or the default assigner if none is known or loadable
     */
    private InputSplitAssigner getAssignerByType(Class<? extends InputSplit> inputSplitType, boolean allowLoading) {
        synchronized (this.loadedAssigners) {
            InputSplitAssigner assigner = this.loadedAssigners.get(inputSplitType);
            if (assigner == null && allowLoading) {
                assigner = loadInputSplitAssigner(inputSplitType);
                if (assigner != null) {
                    this.loadedAssigners.put(inputSplitType, assigner);
                }
            }
            if (assigner != null) {
                return assigner;
            }
        }
        LOG.warn("Unable to find specific input split provider for type " + inputSplitType.getName() + ", using default assigner");
        return this.defaultAssigner;
    }

    /**
     * Creates an assigner for the given input split type.
     * <p>
     * If the global configuration names an assigner class for the type, that class is
     * instantiated reflectively through its no-argument constructor. Without a configured
     * class, built-in assigners are used for {@link FileInputSplit} and
     * {@link GenericInputSplit}.
     *
     * @param inputSplitType the type of input split an assigner is needed for
     * @return a new assigner, or {@code null} if no assigner is configured or built in for the
     *         type, or if the configured class could not be loaded or instantiated
     */
    private InputSplitAssigner loadInputSplitAssigner(Class<? extends InputSplit> inputSplitType) {
        String className = inputSplitType.getName();
        String assignerKey = INPUT_SPLIT_CONFIG_KEY_PREFIX + className;
        LOG.info("Trying to load input split assigner for type " + className);

        String assignerClassName = GlobalConfiguration.getString(assignerKey, null);
        if (assignerClassName == null) {
            // Nothing configured: fall back to the built-in assigners for the known split types.
            if (FileInputSplit.class == inputSplitType) {
                return new FileInputSplitAssigner();
            }
            if (GenericInputSplit.class == inputSplitType) {
                return new DefaultInputSplitAssigner();
            }
            return null;
        }

        try {
            // asSubclass yields Class<? extends InputSplitAssigner>; it throws ClassCastException
            // if the configured class does not implement InputSplitAssigner.
            Class<? extends InputSplitAssigner> assignerClass = Class.forName(assignerClassName).asSubclass(InputSplitAssigner.class);
            // Constructor.newInstance wraps constructor failures in InvocationTargetException,
            // unlike the deprecated Class.newInstance which rethrows them undeclared.
            return assignerClass.getDeclaredConstructor().newInstance();
        }
        catch (Exception e) {
            // Deliberately broad: any failure to load the configured class makes the caller
            // fall back to the default assigner instead of failing the job registration.
            LOG.error(StringUtils.stringifyException(e));
            return null;
        }
    }
}

