/*
 * Decompiled with CFR 0.152.
 */
package stream.monitor;

import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.ProcessContext;
import stream.monitor.StreamMonitor;
import stream.monitor.TimeRateService;

/**
 * Stream monitor that relates the progress of an index attribute carried by
 * the data items to the wall-clock time that has passed.
 *
 * <p>Each time more than {@link #getEvery() every} milliseconds of wall-clock
 * time (measured with {@link System#currentTimeMillis()}) have elapsed since
 * the last measurement, the difference of the index attribute between the two
 * measurements is divided by the elapsed wall-clock milliseconds. The result
 * is written into the item under {@code @timeRate}; the index difference
 * divided by 1000 is written under {@code @processedTime}.
 *
 * <p>NOTE(review): the log message labels the processed time as seconds, which
 * suggests the index attribute is a timestamp in milliseconds - confirm
 * against the producers of that attribute.
 *
 * <p>This class keeps mutable state without any synchronization.
 */
public class TimeRate
extends StreamMonitor
implements TimeRateService {
    static Logger logger = LoggerFactory.getLogger(TimeRate.class);

    /** Wall-clock time (ms) at which the current measurement window started; {@code null} before the first item. */
    protected Long start;
    /** Index value at the start of the current measurement window; {@code null} if none has been seen yet. */
    protected Long startIndex;
    /** Index value read at the most recent interval check; {@code null} if that item carried no usable index. */
    protected Long nowIndex;
    /** Number of rate samples that have been folded into {@link #mean}. */
    protected long n;
    /** Running mean of the measured rates; only updated while dweeting is enabled. */
    protected float mean;
    /** Most recently measured rate (index difference per elapsed wall-clock millisecond). */
    protected Float rate;
    /** Most recently measured index difference divided by 1000. */
    protected Float time;
    /** Minimum wall-clock interval in milliseconds between two measurements; {@code null} disables measuring. */
    protected Integer every = null;
    /** Key of the item attribute that holds the index value. */
    protected String index;

    /**
     * Creates a monitor with cleared measurement state.
     */
    public TimeRate() {
        // Uses the private helper rather than the overridable reset(), so no
        // subclass code runs before the subclass is fully constructed and no
        // exception has to be swallowed here.
        this.clearState();
    }

    /**
     * @return the key of the item attribute that holds the index value
     */
    public String getIndex() {
        return this.index;
    }

    /**
     * @param index the key of the item attribute that holds the index value
     */
    public void setIndex(String index) {
        this.index = index;
    }

    /**
     * @return the minimum wall-clock interval in milliseconds between two
     *         measurements, or {@code null} if none has been configured
     */
    public Integer getEvery() {
        return this.every;
    }

    /**
     * @param every the minimum wall-clock interval in milliseconds between two
     *              measurements
     */
    public void setEvery(Integer every) {
        this.every = every;
    }

    /**
     * Restricts the dweeted keys to {@code @timeRate} when dweeting is
     * enabled and then delegates to the superclass initialization.
     *
     * @param ctx the process context handed to the superclass
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void init(ProcessContext ctx) throws Exception {
        if (this.dweet.booleanValue()) {
            this.keys = new String[]{"@timeRate"};
        }
        super.init(ctx);
    }

    /**
     * Updates the rate measurement with the given item.
     *
     * <p>The item is returned unchanged unless a measurement is due, i.e. an
     * interval is configured, more than that interval has elapsed since the
     * window start, and both the window start and the current item carry a
     * {@code Long} index value.
     *
     * @param data the item to inspect and, when a measurement is due, annotate
     * @return the same item instance
     */
    public Data process(Data data) {
        long now = System.currentTimeMillis();
        if (this.start == null) {
            this.start = now;
            this.startIndex = this.getIndex(data);
        }

        // Without a configured interval there is nothing to compare against;
        // previously this dereferenced the null value and threw.
        if (this.every == null) {
            return data;
        }

        long elapsed = now - this.start;
        // Clamping to zero guarantees elapsed > 0 below, so the division can
        // never produce Infinity/NaN even for a negative interval.
        if (elapsed <= Math.max(0L, this.every.longValue())) {
            return data;
        }

        this.nowIndex = this.getIndex(data);
        if (this.nowIndex == null) {
            return data;
        }

        if (this.startIndex == null) {
            // The window was opened by an item without a usable index: take
            // this item as the baseline instead of unboxing null.
            this.start = now;
            this.startIndex = this.nowIndex;
            return data;
        }

        long indexDiff = this.nowIndex.longValue() - this.startIndex.longValue();
        this.rate = Float.valueOf((float)indexDiff / (float)elapsed);
        this.time = Float.valueOf((float)indexDiff / 1000.0f);
        data.put("@timeRate", this.rate);
        data.put("@processedTime", this.time);

        if (this.log.booleanValue()) {
            logger.info("Time rate {}. {} time (s) processed. @index={}.Time-rate is: {}/second", new Object[]{this.getId(), this.time, this.nowIndex, this.rate});
        }

        if (this.dweet.booleanValue()) {
            // Incremental running mean; the dweeted item carries the mean
            // instead of the latest sample under @timeRate.
            ++this.n;
            float delta = this.rate.floatValue() - this.mean;
            this.mean += delta / (float)this.n;
            data.put("@timeRate", Float.valueOf(this.mean));
            this.dweetWriter.process(data);
        }

        // Start the next measurement window at this item.
        this.start = now;
        this.startIndex = this.nowIndex;
        return data;
    }

    /**
     * Reads the index attribute from the given item.
     *
     * @param data the item to read from
     * @return the attribute value if it is a {@code Long}, otherwise
     *         {@code null} (attribute missing or of a different type)
     */
    private Long getIndex(Data data) {
        Object value = data.get(this.index);
        if (value instanceof Long) {
            return (Long)value;
        }
        return null;
    }

    /**
     * Delegates to the superclass and logs that this monitor has finished.
     *
     * @throws Exception if the superclass fails to finish
     */
    public void finish() throws Exception {
        super.finish();
        logger.info("TimeRate finished");
    }

    /**
     * @return the most recently measured rate, widened to a {@code Double}
     */
    @Override
    public Double getTimeRate() {
        return Double.valueOf(this.rate.floatValue());
    }

    /**
     * Clears all measurement state so that the next processed item opens a
     * new measurement window.
     *
     * @throws Exception declared for compatibility with callers; this
     *                   implementation itself does not throw
     */
    public void reset() throws Exception {
        this.clearState();
    }

    /**
     * Sets every measurement field back to its initial value.
     */
    private void clearState() {
        this.n = 0L;
        this.start = null;
        this.startIndex = null;
        this.nowIndex = null;
        this.rate = Float.valueOf(0.0f);
        this.time = Float.valueOf(0.0f);
        this.mean = 0.0f;
    }
}

