package com.netflix.turbine;

import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.aggregator.TypeAndNameKey;
import com.netflix.turbine.discovery.StreamAction;
import com.netflix.turbine.discovery.StreamDiscovery;
import com.netflix.turbine.internal.JsonUtility;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.observables.GroupedObservable;

/* loaded from: input_file:com/netflix/turbine/Turbine.class */
public class Turbine {
    private static final Logger logger = LoggerFactory.getLogger(Turbine.class);

    public static void startServerSentEventServer(int i, Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> observable) {
        logger.info("Turbine => Starting server on " + i);
        Observable refCount = observable.doOnUnsubscribe(() -> {
            logger.info("Turbine => Unsubscribing aggregation.");
        }).doOnSubscribe(() -> {
            logger.info("Turbine => Starting aggregation");
        }).flatMap(groupedObservable -> {
            return groupedObservable;
        }).publish().refCount();
        RxNetty.createHttpServer(i, (httpServerRequest, httpServerResponse) -> {
            logger.info("Turbine => SSE Request Received");
            httpServerResponse.getHeaders().setHeader("Content-Type", "text/event-stream");
            return refCount.doOnUnsubscribe(() -> {
                logger.info("Turbine => Unsubscribing RxNetty server connection");
            }).flatMap(map -> {
                return httpServerResponse.writeAndFlush(new ServerSentEvent((String) null, (String) null, JsonUtility.mapToJson(map)));
            });
        }, PipelineConfigurators.sseServerConfigurator()).startAndWait();
    }

    public static void startServerSentEventServer(int i, StreamDiscovery streamDiscovery) {
        startServerSentEventServer(i, aggregateHttpSSE(streamDiscovery));
    }

    public static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> aggregateHttpSSE(URI... uriArr) {
        return aggregateHttpSSE(() -> {
            return Observable.from(uriArr).map(uri -> {
                return StreamAction.create(StreamAction.ActionType.ADD, uri);
            }).concatWith(Observable.never());
        });
    }

    public static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> aggregateHttpSSE(StreamDiscovery streamDiscovery) {
        Observable refCount = streamDiscovery.getInstanceList().publish().refCount();
        Observable filter = refCount.filter(streamAction -> {
            return Boolean.valueOf(streamAction.getType() == StreamAction.ActionType.ADD);
        });
        Observable filter2 = refCount.filter(streamAction2 -> {
            return Boolean.valueOf(streamAction2.getType() == StreamAction.ActionType.REMOVE);
        });
        return StreamAggregator.aggregateGroupedStreams(filter.map(streamAction3 -> {
            URI uri = streamAction3.getUri();
            return GroupedObservable.from(InstanceKey.create(uri.toASCIIString()), Observable.defer(() -> {
                return RxNetty.createHttpClient(uri.getHost(), uri.getPort(), PipelineConfigurators.sseClientConfigurator()).submit(HttpClientRequest.createGet(uri.toASCIIString())).flatMap(httpClientResponse -> {
                    return httpClientResponse.getStatus().code() != 200 ? Observable.error(new RuntimeException("Failed to connect: " + httpClientResponse.getStatus())) : httpClientResponse.getContent().doOnSubscribe(() -> {
                        logger.info("Turbine => Aggregate Stream from URI: " + uri.toASCIIString());
                    }).doOnUnsubscribe(() -> {
                        logger.info("Turbine => Unsubscribing Stream: " + uri);
                    }).takeUntil(filter2.filter(streamAction3 -> {
                        return Boolean.valueOf(streamAction3.getUri().equals(streamAction3.getUri()));
                    })).map(serverSentEvent -> {
                        return JsonUtility.jsonToMap(serverSentEvent.getEventData());
                    });
                }).retryWhen(observable -> {
                    return observable.flatMap(th -> {
                        return Observable.timer(1L, TimeUnit.SECONDS).doOnEach(notification -> {
                            logger.info("Turbine => Retrying connection to: " + uri);
                        });
                    });
                });
            }));
        }));
    }
}
