/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.jobmanager;

import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.ExecutionStateChangeEvent;
import eu.stratosphere.nephele.event.job.JobEvent;
import eu.stratosphere.nephele.event.job.ManagementEvent;
import eu.stratosphere.nephele.event.job.RecentJobEvent;
import eu.stratosphere.nephele.event.job.VertexAssignmentEvent;
import eu.stratosphere.nephele.event.job.VertexEvent;
import eu.stratosphere.nephele.execution.ExecutionListener;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
import eu.stratosphere.nephele.executiongraph.JobStatusListener;
import eu.stratosphere.nephele.executiongraph.ManagementGraphFactory;
import eu.stratosphere.nephele.executiongraph.VertexAssignmentListener;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobgraph.JobStatus;
import eu.stratosphere.nephele.jobgraph.JobVertexID;
import eu.stratosphere.nephele.jobmanager.archive.ArchiveListener;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
import eu.stratosphere.nephele.managementgraph.ManagementVertex;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.profiling.ProfilingListener;
import eu.stratosphere.nephele.profiling.types.ProfilingEvent;
import eu.stratosphere.nephele.topology.NetworkTopology;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

public final class EventCollector
extends TimerTask
implements ProfilingListener {
    private final long timerTaskInterval;
    private final Map<JobID, List<AbstractEvent>> collectedEvents = new HashMap<JobID, List<AbstractEvent>>();
    private final Map<JobID, RecentJobEvent> recentJobs = new HashMap<JobID, RecentJobEvent>();
    private final Map<JobID, ManagementGraph> recentManagementGraphs = new HashMap<JobID, ManagementGraph>();
    private final Map<JobID, NetworkTopology> recentNetworkTopologies = new HashMap<JobID, NetworkTopology>();
    private final Timer timer;
    private List<ArchiveListener> archivists = new ArrayList<ArchiveListener>();

    public EventCollector(int clientQueryInterval) {
        this.timerTaskInterval = (long)clientQueryInterval * 1000L * 2L;
        this.timer = new Timer();
        this.timer.schedule((TimerTask)this, this.timerTaskInterval, this.timerTaskInterval);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void getEventsForJob(JobID jobID, List<AbstractEvent> eventList, boolean includeManagementEvents) {
        Map<JobID, List<AbstractEvent>> map = this.collectedEvents;
        synchronized (map) {
            List<AbstractEvent> eventsForJob = this.collectedEvents.get(jobID);
            if (eventsForJob != null) {
                for (AbstractEvent event : eventsForJob) {
                    boolean isManagementEvent = event instanceof ManagementEvent;
                    if (isManagementEvent && !includeManagementEvents) continue;
                    eventList.add(event);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void getRecentJobs(List<RecentJobEvent> eventList) {
        Map<JobID, RecentJobEvent> map = this.recentJobs;
        synchronized (map) {
            Iterator<RecentJobEvent> it = this.recentJobs.values().iterator();
            while (it.hasNext()) {
                eventList.add(it.next());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        Map<JobID, Object> map = this.collectedEvents;
        synchronized (map) {
            this.collectedEvents.clear();
        }
        map = this.recentJobs;
        synchronized (map) {
            this.recentJobs.clear();
        }
        this.timer.cancel();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addEvent(JobID jobID, AbstractEvent event) {
        Map<JobID, List<AbstractEvent>> map = this.collectedEvents;
        synchronized (map) {
            List<AbstractEvent> eventList = this.collectedEvents.get(jobID);
            if (eventList == null) {
                eventList = new ArrayList<AbstractEvent>();
                this.collectedEvents.put(jobID, eventList);
            }
            eventList.add(event);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateRecentJobEvent(JobID jobID, String jobName, boolean isProfilingEnabled, long submissionTimestamp, JobStatus jobStatus) {
        long currentTime = System.currentTimeMillis();
        RecentJobEvent recentJobEvent = new RecentJobEvent(jobID, jobName, jobStatus, isProfilingEnabled, submissionTimestamp, currentTime);
        Map<JobID, RecentJobEvent> map = this.recentJobs;
        synchronized (map) {
            this.recentJobs.put(jobID, recentJobEvent);
        }
    }

    public void registerJob(ExecutionGraph executionGraph, boolean profilingAvailable, long submissionTimestamp) {
        ExecutionGraphIterator it = new ExecutionGraphIterator(executionGraph, true);
        while (it.hasNext()) {
            ExecutionVertex vertex = (ExecutionVertex)it.next();
            vertex.registerExecutionListener(new ExecutionListenerWrapper(this, vertex));
            vertex.registerVertexAssignmentListener(new VertexAssignmentListenerWrapper(this, executionGraph.getJobID()));
        }
        executionGraph.registerJobStatusListener(new JobStatusListenerWrapper(this, executionGraph.getJobName(), profilingAvailable, submissionTimestamp));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Iterator<Object> it;
        long currentTime = System.currentTimeMillis();
        Map<JobID, Object> map = this.collectedEvents;
        synchronized (map) {
            it = this.collectedEvents.keySet().iterator();
            while (it.hasNext()) {
                JobID jobID = it.next();
                List<AbstractEvent> eventList = this.collectedEvents.get(jobID);
                if (eventList == null) continue;
                Iterator<AbstractEvent> it2 = eventList.iterator();
                while (it2.hasNext()) {
                    AbstractEvent event = it2.next();
                    if (event.getTimestamp() + this.timerTaskInterval >= currentTime) continue;
                    this.archiveEvent(jobID, event);
                    it2.remove();
                }
                if (!eventList.isEmpty()) continue;
                it.remove();
            }
        }
        map = this.recentJobs;
        synchronized (map) {
            it = this.recentJobs.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry)it.next();
                JobStatus jobStatus = ((RecentJobEvent)entry.getValue()).getJobStatus();
                if (jobStatus != JobStatus.FINISHED && jobStatus != JobStatus.CANCELED && jobStatus != JobStatus.FAILED || ((RecentJobEvent)entry.getValue()).getTimestamp() + this.timerTaskInterval >= currentTime) continue;
                this.archiveJobevent((JobID)entry.getKey(), (RecentJobEvent)entry.getValue());
                it.remove();
                Map<JobID, Object> map2 = this.recentManagementGraphs;
                synchronized (map2) {
                    this.archiveManagementGraph((JobID)entry.getKey(), this.recentManagementGraphs.get(entry.getKey()));
                    this.recentManagementGraphs.remove(entry.getValue());
                }
                map2 = this.recentNetworkTopologies;
                synchronized (map2) {
                    this.archiveNetworkTopology((JobID)entry.getKey(), this.recentNetworkTopologies.get(entry.getKey()));
                    this.recentNetworkTopologies.remove(entry.getValue());
                }
            }
        }
    }

    @Override
    public void processProfilingEvents(ProfilingEvent profilingEvent) {
        this.addEvent(profilingEvent.getJobID(), profilingEvent);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addManagementGraph(JobID jobID, ManagementGraph managementGraph) {
        Map<JobID, ManagementGraph> map = this.recentManagementGraphs;
        synchronized (map) {
            this.recentManagementGraphs.put(jobID, managementGraph);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ManagementGraph getManagementGraph(JobID jobID) {
        Map<JobID, ManagementGraph> map = this.recentManagementGraphs;
        synchronized (map) {
            return this.recentManagementGraphs.get(jobID);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateManagementGraph(JobID jobID, VertexAssignmentEvent vertexAssignmentEvent) {
        Map<JobID, ManagementGraph> map = this.recentManagementGraphs;
        synchronized (map) {
            ManagementGraph managementGraph = this.recentManagementGraphs.get(jobID);
            if (managementGraph == null) {
                return;
            }
            ManagementVertex vertex = managementGraph.getVertexByID(vertexAssignmentEvent.getVertexID());
            if (vertex == null) {
                return;
            }
            vertex.setInstanceName(vertexAssignmentEvent.getInstanceName());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateManagementGraph(JobID jobID, ExecutionStateChangeEvent executionStateChangeEvent, String optionalMessage) {
        Map<JobID, ManagementGraph> map = this.recentManagementGraphs;
        synchronized (map) {
            ManagementGraph managementGraph = this.recentManagementGraphs.get(jobID);
            if (managementGraph == null) {
                return;
            }
            ManagementVertex vertex = managementGraph.getVertexByID(executionStateChangeEvent.getVertexID());
            if (vertex == null) {
                return;
            }
            vertex.setExecutionState(executionStateChangeEvent.getNewExecutionState());
            if (executionStateChangeEvent.getNewExecutionState() == ExecutionState.FAILED) {
                vertex.setOptMessage(optionalMessage);
            }
        }
    }

    public void registerArchivist(ArchiveListener al) {
        this.archivists.add(al);
    }

    private void archiveEvent(JobID jobId, AbstractEvent event) {
        for (ArchiveListener al : this.archivists) {
            al.archiveEvent(jobId, event);
        }
    }

    private void archiveJobevent(JobID jobId, RecentJobEvent event) {
        for (ArchiveListener al : this.archivists) {
            al.archiveJobevent(jobId, event);
        }
    }

    private void archiveManagementGraph(JobID jobId, ManagementGraph graph) {
        for (ArchiveListener al : this.archivists) {
            al.archiveManagementGraph(jobId, graph);
        }
    }

    private void archiveNetworkTopology(JobID jobId, NetworkTopology topology) {
        for (ArchiveListener al : this.archivists) {
            al.archiveNetworkTopology(jobId, topology);
        }
    }

    private static final class VertexAssignmentListenerWrapper
    implements VertexAssignmentListener {
        private final EventCollector eventCollector;
        private final JobID jobID;

        public VertexAssignmentListenerWrapper(EventCollector eventCollector, JobID jobID) {
            this.eventCollector = eventCollector;
            this.jobID = jobID;
        }

        @Override
        public void vertexAssignmentChanged(ExecutionVertexID id, AllocatedResource newAllocatedResource) {
            VertexAssignmentEvent event;
            ManagementVertexID managementVertexID = id.toManagementVertexID();
            long timestamp = System.currentTimeMillis();
            Instance instance = newAllocatedResource.getInstance();
            if (instance == null) {
                event = new VertexAssignmentEvent(timestamp, managementVertexID, "null");
            } else {
                String instanceName = null;
                instanceName = instance.getInstanceConnectionInfo() != null ? instance.getInstanceConnectionInfo().toString() : instance.toString();
                event = new VertexAssignmentEvent(timestamp, managementVertexID, instanceName);
            }
            this.eventCollector.updateManagementGraph(this.jobID, event);
            this.eventCollector.addEvent(this.jobID, event);
        }
    }

    private static final class JobStatusListenerWrapper
    implements JobStatusListener {
        private final EventCollector eventCollector;
        private final String jobName;
        private final boolean isProfilingAvailable;
        private final long submissionTimestamp;

        public JobStatusListenerWrapper(EventCollector eventCollector, String jobName, boolean isProfilingAvailable, long submissionTimestamp) {
            this.eventCollector = eventCollector;
            this.jobName = jobName;
            this.isProfilingAvailable = isProfilingAvailable;
            this.submissionTimestamp = submissionTimestamp;
        }

        @Override
        public void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus newJobStatus, String optionalMessage) {
            JobStatus jobStatus;
            JobID jobID = executionGraph.getJobID();
            if (newJobStatus == InternalJobStatus.SCHEDULED) {
                ManagementGraph managementGraph = ManagementGraphFactory.fromExecutionGraph(executionGraph);
                this.eventCollector.addManagementGraph(jobID, managementGraph);
            }
            if ((jobStatus = InternalJobStatus.toJobStatus(newJobStatus)) != null) {
                this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable, this.submissionTimestamp, jobStatus);
                this.eventCollector.addEvent(jobID, new JobEvent(System.currentTimeMillis(), jobStatus, optionalMessage));
            }
        }
    }

    private static final class ExecutionListenerWrapper
    implements ExecutionListener {
        private final EventCollector eventCollector;
        private final ExecutionVertex vertex;

        public ExecutionListenerWrapper(EventCollector eventCollector, ExecutionVertex vertex) {
            this.eventCollector = eventCollector;
            this.vertex = vertex;
        }

        @Override
        public void executionStateChanged(JobID jobID, ExecutionVertexID vertexID, ExecutionState newExecutionState, String optionalMessage) {
            long timestamp = System.currentTimeMillis();
            JobVertexID jobVertexID = this.vertex.getGroupVertex().getJobVertexID();
            String taskName = this.vertex.getGroupVertex().getName();
            int totalNumberOfSubtasks = this.vertex.getGroupVertex().getCurrentNumberOfGroupMembers();
            int indexInSubtaskGroup = this.vertex.getIndexInVertexGroup();
            VertexEvent vertexEvent = new VertexEvent(timestamp, jobVertexID, taskName, totalNumberOfSubtasks, indexInSubtaskGroup, newExecutionState, optionalMessage);
            this.eventCollector.addEvent(jobID, vertexEvent);
            ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, vertexID.toManagementVertexID(), newExecutionState);
            this.eventCollector.updateManagementGraph(jobID, executionStateChangeEvent, optionalMessage);
            this.eventCollector.addEvent(jobID, executionStateChangeEvent);
        }

        @Override
        public void userThreadStarted(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {
        }

        @Override
        public void userThreadFinished(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {
        }

        @Override
        public int getPriority() {
            return 20;
        }
    }
}

