/*
 * Decompiled with CFR 0.152.
 */
package stream.monitor;

import java.util.concurrent.atomic.AtomicLong;
import stream.Data;
import stream.ProcessorList;
import stream.annotations.Description;
import stream.data.Statistics;
import stream.statistics.StatisticsService;

/**
 * A processor list that measures how long its inner processors take per item
 * and publishes the average as a statistic.
 *
 * <p>Every call to {@link #process(Data)} delegates to the superclass and adds
 * the elapsed time to a running total. {@link #getStatistics()} reports the
 * total divided by the number of items, expressed in milliseconds.
 *
 * <p>The two counters are updated and read independently of each other, so a
 * statistics snapshot taken while items are being processed (or while
 * {@link #reset()} runs) may combine a total and a count that are off by one
 * item.
 */
@Description(group="Streams.Monitoring", text="Measures the time per item of inner processors")
public class AverageThroughput
extends ProcessorList
implements StatisticsService {
    /** Conversion factor used to report accumulated nanoseconds as milliseconds. */
    private static final double NANOS_PER_MILLI = 1000000.0;

    /** Total time spent in {@code super.process}, in nanoseconds. */
    AtomicLong nanoTime = new AtomicLong(0L);

    /** Number of items for which {@code super.process} returned normally. */
    AtomicLong itemsProcessed = new AtomicLong(0L);

    /**
     * Runs the inner processors on the item and records how long that took.
     *
     * <p>If the superclass call throws, the exception propagates and neither
     * counter is updated.
     *
     * @param input the item handed to the inner processors
     * @return whatever the superclass implementation returned for the item
     */
    public Data process(Data input) {
        // System.nanoTime() is monotonic and fine-grained; a millisecond wall
        // clock rounds most per-item durations down to zero.
        long start = System.nanoTime();
        Data processed = super.process(input);
        long elapsedNanos = System.nanoTime() - start;
        this.nanoTime.addAndGet(elapsedNanos);
        this.itemsProcessed.incrementAndGet();
        return processed;
    }

    /**
     * Clears the accumulated time and the item count.
     *
     * @throws Exception declared for signature compatibility; this
     *         implementation does not throw
     */
    public void reset() throws Exception {
        this.nanoTime.set(0L);
        this.itemsProcessed.set(0L);
    }

    /**
     * Builds a statistics object named {@code "Average Throughput"} holding the
     * average processing time per item under the key
     * {@code "@avg:milliseconds-per-item"}.
     *
     * @return the statistics; the average is {@code 0.0} when no item has been
     *         counted yet
     */
    @Override
    public Statistics getStatistics() {
        Statistics stats = new Statistics();
        long items = this.itemsProcessed.get();
        long totalNanos = this.nanoTime.get();
        // Guard the empty case: 0.0 / 0.0 would otherwise publish NaN.
        double millisPerItem = items > 0L
                ? (totalNanos / NANOS_PER_MILLI) / items
                : 0.0;
        stats.setName("Average Throughput");
        stats.put("@avg:milliseconds-per-item", Double.valueOf(millisPerItem));
        return stats;
    }
}

