/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.profiling.impl;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.profiling.ProfilingException;
import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
import eu.stratosphere.nephele.profiling.impl.EnvironmentListenerImpl;
import eu.stratosphere.nephele.profiling.impl.EnvironmentThreadSet;
import eu.stratosphere.nephele.profiling.impl.InstanceProfiler;
import eu.stratosphere.nephele.profiling.impl.ProfilerImplProtocol;
import eu.stratosphere.nephele.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.InternalInstanceProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.ProfilingDataContainer;
import eu.stratosphere.nephele.taskmanager.Task;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class TaskManagerProfilerImpl
extends TimerTask
implements TaskManagerProfiler {
    private static final Log LOG = LogFactory.getLog(TaskManagerProfilerImpl.class);
    private final ProfilerImplProtocol jobManagerProfiler;
    private final Timer timer;
    private final ThreadMXBean tmx;
    private final long timerInterval;
    private final ProfilingDataContainer profilingDataContainer = new ProfilingDataContainer();
    private final InstanceProfiler instanceProfiler;
    private final Map<Environment, EnvironmentThreadSet> monitoredThreads = new HashMap<Environment, EnvironmentThreadSet>();

    public TaskManagerProfilerImpl(InetAddress jobManagerAddress, InstanceConnectionInfo instanceConnectionInfo) throws ProfilingException {
        InetSocketAddress profilingAddress = new InetSocketAddress(jobManagerAddress, GlobalConfiguration.getInteger((String)"jobmanager.profiling.rpc.port", (int)6124));
        ProfilerImplProtocol jobManagerProfilerTmp = null;
        try {
            jobManagerProfilerTmp = RPC.getProxy(ProfilerImplProtocol.class, profilingAddress, NetUtils.getSocketFactory());
        }
        catch (IOException e) {
            throw new ProfilingException(StringUtils.stringifyException((Throwable)e));
        }
        this.jobManagerProfiler = jobManagerProfilerTmp;
        this.tmx = ManagementFactory.getThreadMXBean();
        if (!this.tmx.isThreadContentionMonitoringSupported()) {
            throw new ProfilingException("The thread contention monitoring is not supported.");
        }
        this.tmx.setThreadContentionMonitoringEnabled(true);
        this.instanceProfiler = new InstanceProfiler(instanceConnectionInfo);
        this.timerInterval = GlobalConfiguration.getInteger((String)"taskmanager.profiling.reportinterval", (int)2) * 1000;
        long initialDelay = (long)(Math.random() * (double)this.timerInterval);
        this.timer = new Timer(true);
        this.timer.schedule((TimerTask)this, initialDelay, this.timerInterval);
    }

    @Override
    public void registerExecutionListener(Task task, Configuration jobConfiguration) {
        task.registerExecutionListener(new EnvironmentListenerImpl(this, task.getRuntimeEnvironment()));
    }

    @Override
    public void unregisterExecutionListener(ExecutionVertexID id) {
    }

    @Override
    public void shutdown() {
        this.timer.cancel();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        long timestamp = System.currentTimeMillis();
        InternalInstanceProfilingData instanceProfilingData = null;
        Object object = this.monitoredThreads;
        synchronized (object) {
            for (Environment environment : this.monitoredThreads.keySet()) {
                EnvironmentThreadSet environmentThreadSet = this.monitoredThreads.get(environment);
                InternalExecutionVertexThreadProfilingData threadProfilingData = environmentThreadSet.captureCPUUtilization(environment.getJobID(), this.tmx, timestamp);
                if (threadProfilingData == null) continue;
                this.profilingDataContainer.addProfilingData(threadProfilingData);
            }
            if (!this.monitoredThreads.isEmpty()) {
                try {
                    instanceProfilingData = this.instanceProfiler.generateProfilingData(timestamp);
                }
                catch (ProfilingException e) {
                    LOG.error((Object)"Error while retrieving instance profiling data: ", (Throwable)e);
                }
            }
        }
        object = this.profilingDataContainer;
        synchronized (object) {
            if (instanceProfilingData != null) {
                this.profilingDataContainer.addProfilingData(instanceProfilingData);
            }
            if (!this.profilingDataContainer.isEmpty()) {
                try {
                    this.jobManagerProfiler.reportProfilingData(this.profilingDataContainer);
                    this.profilingDataContainer.clear();
                }
                catch (IOException e) {
                    LOG.error((Object)e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerMainThreadForCPUProfiling(Environment environment, Thread thread, ExecutionVertexID executionVertexID) {
        Map<Environment, EnvironmentThreadSet> map = this.monitoredThreads;
        synchronized (map) {
            LOG.debug((Object)("Registering thread " + thread.getName() + " for CPU monitoring"));
            if (this.monitoredThreads.containsKey(environment)) {
                LOG.error((Object)("There is already a main thread registered for environment object " + environment.getTaskName()));
            }
            this.monitoredThreads.put(environment, new EnvironmentThreadSet(this.tmx, thread, executionVertexID));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerUserThreadForCPUProfiling(Environment environment, Thread userThread) {
        Map<Environment, EnvironmentThreadSet> map = this.monitoredThreads;
        synchronized (map) {
            EnvironmentThreadSet environmentThreadList = this.monitoredThreads.get(environment);
            if (environmentThreadList == null) {
                LOG.error((Object)("Trying to register " + userThread.getName() + " but no main thread found!"));
                return;
            }
            environmentThreadList.addUserThread(this.tmx, userThread);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterMainThreadFromCPUProfiling(Environment environment, Thread thread) {
        Map<Environment, EnvironmentThreadSet> map = this.monitoredThreads;
        synchronized (map) {
            LOG.debug((Object)("Unregistering thread " + thread.getName() + " from CPU monitoring"));
            EnvironmentThreadSet environmentThreadSet = this.monitoredThreads.remove(environment);
            if (environmentThreadSet != null) {
                if (environmentThreadSet.getMainThread() != thread) {
                    LOG.error((Object)("The thread " + thread.getName() + " is not the main thread of this environment"));
                }
                if (environmentThreadSet.getNumberOfUserThreads() > 0) {
                    LOG.error((Object)("Thread " + environmentThreadSet.getMainThread().getName() + " has still unfinished user threads!"));
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterUserThreadFromCPUProfiling(Environment environment, Thread userThread) {
        Map<Environment, EnvironmentThreadSet> map = this.monitoredThreads;
        synchronized (map) {
            EnvironmentThreadSet environmentThreadSet = this.monitoredThreads.get(environment);
            if (environmentThreadSet == null) {
                LOG.error((Object)("Trying to unregister " + userThread.getName() + " but no main thread found!"));
                return;
            }
            environmentThreadSet.removeUserThread(userThread);
        }
    }
}

