package reactor.ipc.stream;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Operators;
import reactor.ipc.connector.Inbound;
import reactor.util.Logger;
import reactor.util.Loggers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/ipc/stream/StreamOperationsImpl.class */
public final class StreamOperationsImpl<IN, OUT> extends AtomicLong implements StreamOperations {
    static Logger log = Loggers.getLogger(StreamOperationsImpl.class);
    final ConcurrentMap<Long, Subscriber<OUT>> subscribers;
    final ConcurrentMap<Long, Subscription> subscriptions;
    final StreamOutbound remote;
    final String name;
    final OnStream onNew;
    final Runnable onTerminate;
    final AtomicBoolean terminateOnce;
    final Inbound<? extends IN> channel;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamOperationsImpl(String str, OnStream onStream, StreamOutbound streamOutbound, Inbound<? extends IN> inbound, Runnable runnable) {
        super(1L);
        this.name = str;
        this.channel = inbound;
        this.remote = streamOutbound;
        this.onNew = onStream;
        this.onTerminate = runnable;
        this.terminateOnce = new AtomicBoolean();
        this.subscribers = new ConcurrentHashMap();
        this.subscriptions = new ConcurrentHashMap();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long newStreamId() {
        return getAndIncrement();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerSubscription(long j, Subscription subscription) {
        if (this.subscriptions.putIfAbsent(Long.valueOf(j), subscription) != null) {
            throw new IllegalStateException("StreamID " + j + " already registered");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerSubscriber(long j, Subscriber<OUT> subscriber) {
        if (this.subscribers.putIfAbsent(Long.valueOf(j), subscriber) != null) {
            throw new IllegalStateException("StreamID " + j + " already registered");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean deregister(long j) {
        this.subscribers.remove(Long.valueOf(j));
        return this.subscriptions.remove(Long.valueOf(j)) != null;
    }

    @Override // reactor.ipc.stream.StreamOperations
    public void onNew(long j, String str) {
        if (log.isDebugEnabled()) {
            log.debug("{}/onStream/{}/{}", new Object[]{this.name, Long.valueOf(j), str});
        }
        if (this.onNew.onStream(j, str, this)) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("{}/onStream/{} {}", new Object[]{this.name, Long.valueOf(j), "New stream(" + str + ") rejected"});
        }
        sendCancel(j, "New stream(" + str + ") rejected");
    }

    @Override // reactor.ipc.stream.StreamOperations
    public void onNext(long j, Object obj) {
        if (log.isDebugEnabled()) {
            log.debug("{}/onNext/{}/value={}", new Object[]{this.name, Long.valueOf(j), obj});
        }
        Subscriber<OUT> subscriber = this.subscribers.get(Long.valueOf(j));
        if (subscriber != null) {
            try {
                subscriber.onNext(obj);
            } catch (Throwable th) {
                if (log.isDebugEnabled()) {
                    log.debug("{}/onNextError/{}/value={}", new Object[]{this.name, Long.valueOf(j), obj, th});
                }
                sendCancel(j, th.toString());
                subscriber.onError(th);
            }
        }
    }

    @Override // reactor.ipc.stream.StreamOperations
    public void onError(long j, String str) {
        onError(j, new Exception(str));
    }

    @Override // reactor.ipc.stream.StreamOperations
    public void onError(long j, Throwable th) {
        if (log.isDebugEnabled()) {
            log.debug("{}/onError/{}", new Object[]{this.name, Long.valueOf(j), th});
        }
        if (j > 0) {
            Subscriber<OUT> subscriber = this.subscribers.get(Long.valueOf(j));
            if (subscriber != null) {
                subscriber.onError(th);
                return;
            }
        } else if (j < 0) {
            if (this.terminateOnce.compareAndSet(false, true)) {
                this.onTerminate.run();
            }
            if (isClosed()) {
                return;
            }
        }
        Operators.onErrorDropped(th);
    }

    @Override // reactor.ipc.stream.StreamOperations
    public void onComplete(long j) {
        Subscriber<OUT> subscriber = this.subscribers.get(Long.valueOf(j));
        if (subscriber != null) {
            subscriber.onComplete();
        }
    }

    @Override // reactor.ipc.stream.StreamOperations
    public void onCancel(long j, String str) {
        if (log.isDebugEnabled()) {
            log.debug("{}/onCancel/{} {}", new Object[]{this.name, Long.valueOf(j), str});
        }
        Subscription subscription = this.subscriptions.get(Long.valueOf(j));
        if (subscription != null) {
            subscription.cancel();
        }
    }

    @Override // reactor.ipc.stream.StreamOperations
    public void onRequested(long j, long j2) {
        if (log.isDebugEnabled()) {
            log.debug("{}/onRequested/{}/{}", new Object[]{this.name, Long.valueOf(j), Long.valueOf(j2)});
        }
        Subscription subscription = this.subscriptions.get(Long.valueOf(j));
        if (subscription != null) {
            subscription.request(j2);
        }
    }

    @Override // reactor.ipc.stream.StreamOutbound
    public void sendNew(long j, String str) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendNew/{}/{}", new Object[]{this.name, Long.valueOf(j), str});
        }
        this.remote.sendNew(j, str);
    }

    @Override // reactor.ipc.stream.StreamOutbound
    public void sendCancel(long j, String str) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendCancel/{} {}", new Object[]{this.name, Long.valueOf(j), str});
        }
        this.remote.sendCancel(j, str);
    }

    @Override // reactor.ipc.stream.StreamOutbound
    public void sendNext(long j, Object obj) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendNext/{}/value={}", new Object[]{this.name, Long.valueOf(j), obj});
        }
        this.remote.sendNext(j, obj);
    }

    @Override // reactor.ipc.stream.StreamOutbound
    public void sendError(long j, Throwable th) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendError/{}", new Object[]{this.name, Long.valueOf(j), th});
        }
        this.remote.sendError(j, th);
    }

    @Override // reactor.ipc.stream.StreamOutbound
    public void sendComplete(long j) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendComplete/{}", new Object[]{this.name, Long.valueOf(j)});
        }
        this.remote.sendComplete(j);
    }

    @Override // reactor.ipc.stream.StreamOutbound
    public void sendRequested(long j, long j2) {
        if (log.isDebugEnabled()) {
            log.debug("{}/sendRequested/{}/{}", new Object[]{this.name, Long.valueOf(j), Long.valueOf(j2)});
        }
        this.remote.sendRequested(j, j2);
    }

    @Override // reactor.ipc.stream.StreamOutbound
    public boolean isClosed() {
        return this.remote.isClosed();
    }
}
