/*
 * Decompiled with CFR 0.152.
 */
package de.codecentric.boot.admin.server.services;

import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegisteredEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegistrationUpdatedEvent;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import de.codecentric.boot.admin.server.services.AbstractEventHandler;
import de.codecentric.boot.admin.server.services.StatusUpdater;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;

public class StatusUpdateTrigger
extends AbstractEventHandler<InstanceEvent> {
    private static final Logger log = LoggerFactory.getLogger(StatusUpdateTrigger.class);
    private final StatusUpdater statusUpdater;
    private Map<InstanceId, Instant> lastQueried = new HashMap<InstanceId, Instant>();
    private Duration updateInterval = Duration.ofSeconds(10L);
    private Duration statusLifetime = Duration.ofSeconds(10L);
    private Disposable intervalSubscription;

    public StatusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent> publisher) {
        super(publisher, InstanceEvent.class);
        this.statusUpdater = statusUpdater;
    }

    @Override
    public void start() {
        super.start();
        this.intervalSubscription = Flux.interval((Duration)this.updateInterval).doOnSubscribe(s -> log.debug("Scheduled status update every {}", (Object)this.updateInterval)).log(log.getName(), Level.FINEST, new SignalType[0]).subscribeOn(Schedulers.newSingle((String)"status-monitor")).concatMap(i -> this.updateStatusForAllInstances()).onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses", ex)).subscribe();
    }

    @Override
    protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
        return publisher.subscribeOn(Schedulers.newSingle((String)"status-updater")).filter(event -> event instanceof InstanceRegisteredEvent || event instanceof InstanceRegistrationUpdatedEvent).flatMap(event -> this.updateStatus(event.getInstance()));
    }

    @Override
    public void stop() {
        super.stop();
        if (this.intervalSubscription != null) {
            this.intervalSubscription.dispose();
        }
    }

    protected Mono<Void> updateStatusForAllInstances() {
        log.debug("Updating status for all instances");
        Instant expiryInstant = Instant.now().minus(this.statusLifetime);
        return Flux.fromIterable(this.lastQueried.entrySet()).filter(e -> ((Instant)e.getValue()).isBefore(expiryInstant)).map(Map.Entry::getKey).flatMap(this::updateStatus).then();
    }

    protected Mono<Void> updateStatus(InstanceId instanceId) {
        return this.statusUpdater.updateStatus(instanceId).doFinally(s -> this.lastQueried.put(instanceId, Instant.now()));
    }

    public void setUpdateInterval(Duration updateInterval) {
        this.updateInterval = updateInterval;
    }

    public void setStatusLifetime(Duration statusLifetime) {
        this.statusLifetime = statusLifetime;
    }
}

