/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.instance;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.AllocatedSlot;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceNotifier;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkNode;
import eu.stratosphere.nephele.topology.NetworkTopology;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Default {@link InstanceManager} implementation that keeps track of the registered task managers
 * (hosts), hands out their slots to jobs and periodically evicts hosts whose heartbeat is overdue.
 *
 * <p>All accesses to the host registry performed by this class are guarded by a single internal
 * lock. A daemon {@link Timer} runs the stale-host cleanup once per second.
 */
public class DefaultInstanceManager
implements InstanceManager {
    private static final Log LOG = LogFactory.getLog(DefaultInstanceManager.class);

    /** Default cleanup interval in seconds. */
    private static final int DEFAULT_CLEANUP_INTERVAL = 120;

    /** Configuration key for the cleanup interval (value is read as seconds). */
    private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";

    /** Delay before the first cleanup run and period between runs, in milliseconds. */
    private static final long CLEANUP_TASK_PERIOD_MILLIS = 1000L;

    /** Guards {@link #registeredHosts}, {@link #instanceListener} and {@link #shutdown}. */
    private final Object lock = new Object();

    /** Interval passed to {@code Instance.isStillAlive}; configured seconds times 1000. */
    private final long cleanUpInterval;

    private final Map<InstanceConnectionInfo, Instance> registeredHosts;
    private final NetworkTopology networkTopology;

    /** Timer driving {@link #cleanupStaleMachines}; kept so that it can be cancelled on shutdown. */
    private final Timer cleanupTimer;

    private InstanceListener instanceListener;
    private boolean shutdown;

    /**
     * Periodic task that removes every host that is no longer alive from the registry and reports
     * the slots those hosts had allocated to the instance listener, grouped by job.
     */
    private final TimerTask cleanupStaleMachines = new TimerTask() {

        @Override
        public void run() {
            synchronized (DefaultInstanceManager.this.lock) {
                Map<JobID, List<AllocatedResource>> staleResources = new HashMap<JobID, List<AllocatedResource>>();

                Iterator<Instance> hostIterator = DefaultInstanceManager.this.registeredHosts.values().iterator();
                while (hostIterator.hasNext()) {
                    Instance host = hostIterator.next();
                    if (host.isStillAlive(DefaultInstanceManager.this.cleanUpInterval)) {
                        continue;
                    }

                    // Collect the slots of the dead host, grouped by the job that owned them.
                    Collection<AllocatedSlot> slots = host.removeAllocatedSlots();
                    for (AllocatedSlot slot : slots) {
                        JobID jobID = slot.getJobID();
                        List<AllocatedResource> staleResourcesOfJob = staleResources.get(jobID);
                        if (staleResourcesOfJob == null) {
                            staleResourcesOfJob = new ArrayList<AllocatedResource>();
                            staleResources.put(jobID, staleResourcesOfJob);
                        }
                        staleResourcesOfJob.add(new AllocatedResource(host, slot.getAllocationID()));
                    }

                    // Removing through the iterator drops the host from the backing map.
                    hostIterator.remove();
                }

                InstanceListener listener = DefaultInstanceManager.this.instanceListener;
                if (listener == null) {
                    return;
                }
                for (Map.Entry<JobID, List<AllocatedResource>> entry : staleResources.entrySet()) {
                    listener.allocatedResourcesDied(entry.getKey(), entry.getValue());
                }
            }
        }
    };

    /**
     * Creates the instance manager, reads the cleanup interval from the global configuration and
     * starts the periodic stale-host cleanup on a daemon timer thread.
     */
    public DefaultInstanceManager() {
        this.registeredHosts = new HashMap<InstanceConnectionInfo, Instance>();

        // Validate in seconds and convert once. The fallback used to be assigned as a bare 120
        // although the field holds seconds * 1000, which made the effective interval 120 ms.
        int intervalSeconds = GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL);
        if (intervalSeconds <= 0) {
            LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of "
                + DEFAULT_CLEANUP_INTERVAL + " secs.");
            intervalSeconds = DEFAULT_CLEANUP_INTERVAL;
        }
        this.cleanUpInterval = intervalSeconds * 1000L;

        this.networkTopology = NetworkTopology.createEmptyTopology();

        boolean runTimerAsDaemon = true;
        this.cleanupTimer = new Timer(runTimerAsDaemon);
        this.cleanupTimer.schedule(this.cleanupStaleMachines, CLEANUP_TASK_PERIOD_MILLIS, CLEANUP_TASK_PERIOD_MILLIS);
    }

    /**
     * Stops the periodic cleanup, destroys the proxies of all registered hosts and clears the
     * registry. Calling this method more than once has no further effect.
     */
    @Override
    public void shutdown() {
        synchronized (this.lock) {
            if (this.shutdown) {
                return;
            }
            this.cleanupStaleMachines.cancel();
            // Also stop the timer itself so its thread does not linger after shutdown.
            this.cleanupTimer.cancel();

            for (Instance host : this.registeredHosts.values()) {
                host.destroyProxies();
            }
            this.registeredHosts.clear();
            this.shutdown = true;
        }
    }

    /**
     * Releases the slot identified by the given resource on the instance that owns it.
     *
     * @param allocatedResource the resource whose slot is to be released
     * @throws InstanceException declared by the interface contract
     */
    @Override
    public void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException {
        synchronized (this.lock) {
            Instance clusterInstance = allocatedResource.getInstance();
            clusterInstance.releaseSlot(allocatedResource.getAllocationID());
        }
    }

    /**
     * Creates the {@link Instance} for a newly registered task manager and determines its parent
     * node in the network topology.
     *
     * <p>The topology is searched for a stub node named like the host name, successively dropping
     * the last dot-separated component; if none is found, the textual address (with {@code "/"}
     * removed) is tried. A stub that is found is removed from the topology and its parent, if any,
     * becomes the parent of the new instance; otherwise the topology root is used.
     *
     * @param instanceConnectionInfo connection info of the task manager
     * @param hardwareDescription hardware description passed through to the instance
     * @param numberOfSlots number of slots passed through to the instance
     * @return the newly created instance, never {@code null}
     */
    private Instance createNewHost(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
        String instanceName = instanceConnectionInfo.hostname();
        NetworkNode parentNode = this.networkTopology.getRootNode();

        // Try the host name first, then progressively shorter prefixes of it.
        NetworkNode currentStubNode = this.networkTopology.getNodeByName(instanceName);
        while (currentStubNode == null) {
            int pos = instanceName.lastIndexOf('.');
            if (pos == -1) {
                break;
            }
            instanceName = instanceName.substring(0, pos);
            currentStubNode = this.networkTopology.getNodeByName(instanceName);
        }

        // Fall back to the address string with the slash stripped.
        if (currentStubNode == null) {
            instanceName = instanceConnectionInfo.address().toString().replace("/", "");
            currentStubNode = this.networkTopology.getNodeByName(instanceName);
        }

        if (currentStubNode != null) {
            if (currentStubNode.getParentNode() != null) {
                parentNode = currentStubNode.getParentNode();
            }
            // The stub is removed from the topology before the instance is created under its parent.
            currentStubNode.remove();
        }

        LOG.info("Creating instance for " + instanceConnectionInfo + ", parent is " + parentNode.getName());
        return new Instance(instanceConnectionInfo, parentNode, this.networkTopology, hardwareDescription, numberOfSlots);
    }

    /**
     * Records a heartbeat for the task manager with the given connection info. Heartbeats of
     * unregistered task managers are logged as an error and otherwise ignored.
     *
     * @param instanceConnectionInfo connection info identifying the task manager
     */
    @Override
    public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo) {
        synchronized (this.lock) {
            Instance host = this.registeredHosts.get(instanceConnectionInfo);
            if (host == null) {
                LOG.error("Task manager with connection info " + instanceConnectionInfo + " has not been registered.");
                return;
            }
            host.reportHeartBeat();
        }
    }

    /**
     * Registers a new task manager and records an initial heartbeat for it. A task manager whose
     * connection info is already registered is rejected with an error log entry.
     *
     * @param instanceConnectionInfo connection info identifying the task manager
     * @param hardwareDescription hardware description of the task manager
     * @param numberOfSlots number of slots the task manager offers
     */
    @Override
    public void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
        synchronized (this.lock) {
            if (this.registeredHosts.containsKey(instanceConnectionInfo)) {
                LOG.error("Task manager with connection info " + instanceConnectionInfo + " has already been registered.");
                return;
            }

            // createNewHost always returns a fresh object, so no null check is needed here.
            Instance host = this.createNewHost(instanceConnectionInfo, hardwareDescription, numberOfSlots);
            this.registeredHosts.put(instanceConnectionInfo, host);
            LOG.info("New number of registered hosts is " + this.registeredHosts.size());
            host.reportHeartBeat();
        }
    }

    /**
     * Allocates {@code requiredSlots} slots for the given job across the registered hosts and, if
     * a listener is set, starts an {@link InstanceNotifier} to announce them.
     *
     * <p>Availability is checked before anything is allocated, so a request that cannot be
     * satisfied no longer leaves partially allocated slots behind.
     *
     * @param jobID the job the slots are allocated for
     * @param conf not used by this implementation
     * @param requiredSlots number of slots to allocate
     * @throws InstanceException if fewer than {@code requiredSlots} slots are available
     */
    @Override
    public void requestInstance(JobID jobID, Configuration conf, int requiredSlots) throws InstanceException {
        synchronized (this.lock) {
            // Count first: the lock is held, so the numbers stay valid for the allocation below.
            long availableSlots = 0L;
            for (Instance candidate : this.registeredHosts.values()) {
                availableSlots += Math.max(0, candidate.getNumberOfAvailableSlots());
            }
            if (availableSlots < requiredSlots) {
                throw new InstanceException("Cannot allocate the required number of slots: " + requiredSlots + ".");
            }

            List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
            Iterator<Instance> clusterIterator = this.registeredHosts.values().iterator();
            while (clusterIterator.hasNext() && allocatedResources.size() < requiredSlots) {
                Instance instance = clusterIterator.next();
                while (instance.getNumberOfAvailableSlots() > 0 && allocatedResources.size() < requiredSlots) {
                    allocatedResources.add(instance.allocateSlot(jobID));
                }
            }

            // Not expected after the availability check above; kept as a safeguard.
            if (allocatedResources.size() < requiredSlots) {
                throw new InstanceException("Cannot allocate the required number of slots: " + requiredSlots + ".");
            }

            if (this.instanceListener != null) {
                InstanceNotifier instanceNotifier = new InstanceNotifier(this.instanceListener, jobID, allocatedResources);
                instanceNotifier.start();
            }
        }
    }

    /**
     * Returns the network topology managed by this instance manager.
     *
     * @param jobID ignored; the same topology is returned for every job
     * @return the network topology
     */
    @Override
    public NetworkTopology getNetworkTopology(JobID jobID) {
        return this.networkTopology;
    }

    /**
     * Sets the listener that is informed about allocated and died resources.
     *
     * @param instanceListener the listener, or {@code null} to disable notifications
     */
    @Override
    public void setInstanceListener(InstanceListener instanceListener) {
        synchronized (this.lock) {
            this.instanceListener = instanceListener;
        }
    }

    /**
     * Looks up a registered instance by its name.
     *
     * @param name the instance name, must not be {@code null}
     * @return the first registered instance with that name, or {@code null} if there is none
     * @throws IllegalArgumentException if {@code name} is {@code null}
     */
    @Override
    public Instance getInstanceByName(String name) {
        if (name == null) {
            throw new IllegalArgumentException("Argument name must not be null");
        }
        synchronized (this.lock) {
            for (Instance instance : this.registeredHosts.values()) {
                if (name.equals(instance.getName())) {
                    return instance;
                }
            }
        }
        return null;
    }

    /**
     * Returns the number of currently registered task managers.
     *
     * @return the size of the host registry
     */
    @Override
    public int getNumberOfTaskTrackers() {
        // The registry is a plain HashMap mutated by the cleanup timer thread; read it under the lock.
        synchronized (this.lock) {
            return this.registeredHosts.size();
        }
    }

    /**
     * Returns the total number of slots over all registered task managers.
     *
     * @return the sum of {@code getNumberOfSlots()} of every registered instance
     */
    @Override
    public int getNumberOfSlots() {
        // Iterating without the lock could fail if the cleanup task removes a host concurrently.
        synchronized (this.lock) {
            int slots = 0;
            for (Instance instance : this.registeredHosts.values()) {
                slots += instance.getNumberOfSlots();
            }
            return slots;
        }
    }

    /**
     * Returns the registry of task managers.
     *
     * <p>NOTE(review): this hands out the live internal map, not a copy; it is modified under the
     * internal lock by other threads, so callers iterating it are not protected. Kept as is
     * because callers may rely on seeing the live view - confirm before changing.
     *
     * @return the internal map from connection info to instance
     */
    @Override
    public Map<InstanceConnectionInfo, Instance> getInstances() {
        return this.registeredHosts;
    }
}

