package org.springframework.integration.rsocket;

import io.rsocket.RSocketFactory;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;

/* loaded from: input_file:org/springframework/integration/rsocket/ServerRSocketConnector.class */
public class ServerRSocketConnector extends AbstractRSocketConnector implements ApplicationEventPublisherAware {
    private final ServerTransport<CloseableChannel> serverTransport;
    private Consumer<RSocketServer> serverConfigurer;
    private Mono<CloseableChannel> serverMono;

    public ServerRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
        super(serverRSocketMessageHandler);
        this.serverConfigurer = rSocketServer -> {
        };
        this.serverTransport = null;
    }

    public ServerRSocketConnector(String str, int i) {
        this((ServerTransport<CloseableChannel>) TcpServerTransport.create(str, i));
    }

    public ServerRSocketConnector(HttpServer httpServer) {
        this((ServerTransport<CloseableChannel>) WebsocketServerTransport.create(httpServer));
    }

    public ServerRSocketConnector(ServerTransport<CloseableChannel> serverTransport) {
        super(new ServerRSocketMessageHandler());
        this.serverConfigurer = rSocketServer -> {
        };
        Assert.notNull(serverTransport, "'serverTransport' must not be null");
        this.serverTransport = serverTransport;
    }

    private ServerRSocketMessageHandler serverRSocketMessageHandler() {
        return (ServerRSocketMessageHandler) this.rSocketMessageHandler;
    }

    @Deprecated
    public void setFactoryConfigurer(Consumer<RSocketFactory.ServerRSocketFactory> consumer) {
        Assert.notNull(consumer, "'factoryConfigurer' must not be null");
        setServerConfigurer(rSocketServer -> {
            consumer.accept(new RSocketFactory.ServerRSocketFactory(rSocketServer));
        });
    }

    public void setServerConfigurer(Consumer<RSocketServer> consumer) {
        this.serverConfigurer = consumer;
    }

    public void setClientRSocketKeyStrategy(BiFunction<Map<String, Object>, DataBuffer, Object> biFunction) {
        if (this.serverTransport != null) {
            serverRSocketMessageHandler().setClientRSocketKeyStrategy(biFunction);
        }
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void setDataMimeType(@Nullable MimeType mimeType) {
        if (this.serverTransport != null) {
            super.setDataMimeType(mimeType);
        }
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void setMetadataMimeType(MimeType mimeType) {
        if (this.serverTransport != null) {
            super.setMetadataMimeType(mimeType);
        }
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void setRSocketStrategies(RSocketStrategies rSocketStrategies) {
        if (this.serverTransport != null) {
            super.setRSocketStrategies(rSocketStrategies);
        }
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (this.serverTransport != null) {
            super.setApplicationContext(applicationContext);
        }
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        if (this.serverTransport != null) {
            serverRSocketMessageHandler().setApplicationEventPublisher(applicationEventPublisher);
        }
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void afterPropertiesSet() {
        if (this.serverTransport != null) {
            super.afterPropertiesSet();
            RSocketServer create = RSocketServer.create();
            this.serverConfigurer.accept(create);
            this.serverMono = create.acceptor(serverRSocketMessageHandler().responder()).bind(this.serverTransport).cache();
        }
    }

    public Map<Object, RSocketRequester> getClientRSocketRequesters() {
        return serverRSocketMessageHandler().getClientRSocketRequesters();
    }

    @Nullable
    public RSocketRequester getClientRSocketRequester(Object obj) {
        return serverRSocketMessageHandler().getClientRSocketRequester(obj);
    }

    public Mono<Integer> getBoundPort() {
        return this.serverTransport != null ? this.serverMono.map(closeableChannel -> {
            return Integer.valueOf(closeableChannel.address().getPort());
        }) : Mono.empty();
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    protected void doStart() {
        if (this.serverTransport != null) {
            this.serverMono.subscribe();
        }
    }

    public void destroy() {
        if (this.serverTransport != null) {
            this.serverMono.doOnNext((v0) -> {
                v0.dispose();
            }).subscribe();
        }
    }

    @Override // org.springframework.integration.rsocket.AbstractRSocketConnector
    public void afterSingletonsInstantiated() {
        super.afterSingletonsInstantiated();
        serverRSocketMessageHandler().registerHandleConnectionSetupMethod();
    }
}
