package org.springframework.integration.rsocket.inbound;

import io.rsocket.Payload;
import java.util.Arrays;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.rsocket.AbstractRSocketConnector;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.rsocket.PayloadUtils;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:org/springframework/integration/rsocket/inbound/RSocketInboundGateway.class */
/**
 * A {@link MessagingGatewaySupport} extension that acts as an {@link IntegrationRSocketEndpoint}.
 *
 * <p>An incoming request message is decoded with the configured {@link RSocketStrategies} and
 * passed downstream. When the request carries an {@code rsocketResponse} header, the downstream
 * reply is encoded and published through the {@link MonoProcessor} found in that header;
 * otherwise the decoded request is only sent and no reply is produced.
 */
public class RSocketInboundGateway extends MessagingGatewaySupport implements IntegrationRSocketEndpoint {
    private final String[] path;
    private RSocketInteractionModel[] interactionModels = RSocketInteractionModel.values();
    private RSocketStrategies rsocketStrategies = RSocketStrategies.create();

    @Nullable
    private AbstractRSocketConnector rsocketConnector;

    @Nullable
    private ResolvableType requestElementType;
    private boolean decodeFluxAsUnit;

    /**
     * Create a gateway serving the given path(s).
     *
     * @param pathArg the path(s) this endpoint is mapped to; must not be {@code null}.
     *                The array is copied, so later changes by the caller are not observed.
     */
    public RSocketInboundGateway(String... pathArg) {
        Assert.notNull(pathArg, "'pathArg' must not be null");
        this.path = Arrays.copyOf(pathArg, pathArg.length);
    }

    /**
     * Replace the default {@link RSocketStrategies} used for decoding requests and encoding replies.
     * Note that a connector set via {@link #setRSocketConnector} overrides this value during
     * initialization.
     *
     * @param rsocketStrategies the strategies to use; must not be {@code null}
     */
    public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
        Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
        this.rsocketStrategies = rsocketStrategies;
    }

    /**
     * Provide the connector this gateway registers itself with on initialization.
     *
     * @param rsocketConnector the connector; must not be {@code null}
     */
    public void setRSocketConnector(AbstractRSocketConnector rsocketConnector) {
        Assert.notNull(rsocketConnector, "'rsocketConnector' must not be null");
        this.rsocketConnector = rsocketConnector;
    }

    /**
     * Restrict the interaction models reported by this endpoint (all models by default).
     *
     * @param interactionModelsArg the models to report; must not be {@code null}. The array is copied.
     */
    public void setInteractionModels(RSocketInteractionModel... interactionModelsArg) {
        Assert.notNull(interactionModelsArg, "'interactionModelsArg' must not be null");
        this.interactionModels = Arrays.copyOf(interactionModelsArg, interactionModelsArg.length);
    }

    /**
     * @return a defensive copy of the configured interaction models
     */
    @Override // org.springframework.integration.rsocket.IntegrationRSocketEndpoint
    public RSocketInteractionModel[] getInteractionModels() {
        return Arrays.copyOf(this.interactionModels, this.interactionModels.length);
    }

    /**
     * @return a defensive copy of the path(s) this endpoint was created with
     */
    @Override // org.springframework.integration.rsocket.IntegrationRSocketEndpoint
    public String[] getPath() {
        return Arrays.copyOf(this.path, this.path.length);
    }

    /**
     * Convenience variant of {@link #setRequestElementType(ResolvableType)} taking a plain class.
     *
     * @param requestElementClass the class request payloads are decoded to
     */
    public void setRequestElementClass(Class<?> requestElementClass) {
        setRequestElementType(ResolvableType.forClass(requestElementClass));
    }

    /**
     * Set the type request payloads are decoded to. When left unset, the type is chosen per
     * message from its {@code contentType} header: {@link String} for a {@code text} MIME type,
     * {@code byte[]} otherwise.
     *
     * @param requestElementType the target element type
     */
    public void setRequestElementType(ResolvableType requestElementType) {
        this.requestElementType = requestElementType;
    }

    /**
     * Choose how a {@link Publisher} payload is decoded: as one unit through the decoder's
     * publisher-based {@code decode} ({@code true}), or buffer by buffer ({@code false}, the default).
     *
     * @param decodeFluxAsUnit whether to hand the whole publisher to the decoder
     */
    public void setDecodeFluxAsUnit(boolean decodeFluxAsUnit) {
        this.decodeFluxAsUnit = decodeFluxAsUnit;
    }

    /**
     * Runs the superclass initialization and, if a connector is configured, registers this
     * gateway with it and adopts the connector's {@link RSocketStrategies}.
     */
    protected void onInit() {
        super.onInit();
        AbstractRSocketConnector connector = this.rsocketConnector;
        if (connector != null) {
            connector.addEndpoint(this);
            // The connector's strategies take precedence over any set directly on this gateway.
            this.rsocketStrategies = connector.getRSocketStrategies();
        }
    }

    /**
     * Runs the superclass start logic and, when the configured connector is a
     * {@link ClientRSocketConnector}, asks it to connect.
     */
    protected void doStart() {
        super.doStart();
        AbstractRSocketConnector connector = this.rsocketConnector;
        if (connector instanceof ClientRSocketConnector) {
            ((ClientRSocketConnector) connector).connect();
        }
    }

    /**
     * Handle an incoming request message.
     *
     * @param message the request; its payload is expected to be a {@link DataBuffer} or a
     *                {@link Publisher} of them
     * @return a {@link Mono} completing when the request was sent (no {@code rsocketResponse}
     *         header) or when the reply was handed to the response processor; an error
     *         {@link Mono} carrying a {@link MessageDeliveryException} if this gateway is not running
     */
    public Mono<Void> handleMessage(Message<?> message) {
        if (!isRunning()) {
            return Mono.error(new MessageDeliveryException(message, "The RSocket Inbound Gateway '" + getComponentName() + "' is stopped; service for path(s) " + Arrays.toString(this.path) + " is not available at the moment."));
        }
        Mono<Message<?>> requestMono = decodeRequestMessage(message);
        MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
        if (replyMono == null) {
            // No response holder in the headers: send the request and produce no reply.
            return requestMono.doOnNext(this::send).then();
        }
        return requestMono
                .flatMap(this::sendAndReceiveMessageReactive)
                .flatMap(replyMessage -> {
                    Flux<DataBuffer> reply = createReply(replyMessage.getPayload(), message);
                    return new ChannelSendOperator<>(reply, publisher -> sendReply(publisher, replyMono));
                });
    }

    /**
     * Build the message to send downstream: the decoded payload with the request's headers,
     * or the request itself when decoding yields {@code null}.
     */
    private Mono<Message<?>> decodeRequestMessage(Message<?> requestMessage) {
        Object decoded = decodePayload(requestMessage);
        if (decoded == null) {
            return Mono.just(requestMessage);
        }
        return Mono.just(decoded)
                .map(payload -> MessageBuilder.withPayload(payload).copyHeaders(requestMessage.getHeaders()).build());
    }

    /**
     * Decode the request payload using the decoder resolved from the strategies.
     *
     * @return the decoded value for a single {@link DataBuffer}; otherwise a {@link Flux}
     *         produced either by the decoder itself or by decoding each buffer individually
     *         (skipping {@code null} results)
     */
    @Nullable
    private Object decodePayload(Message<?> requestMessage) {
        MimeType mimeType = requestMessage.getHeaders().get("contentType", MimeType.class);
        final ResolvableType elementType =
                this.requestElementType != null ? this.requestElementType : defaultElementType(mimeType);
        Object payload = requestMessage.getPayload();
        Decoder<Object> decoder = this.rsocketStrategies.decoder(elementType, mimeType);
        if (payload instanceof DataBuffer) {
            return decoder.decode((DataBuffer) payload, elementType, mimeType, null);
        }
        // NOTE(review): any non-DataBuffer payload is assumed to be a Publisher<DataBuffer>;
        // the element type cannot be checked at runtime, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        Publisher<DataBuffer> buffers = (Publisher<DataBuffer>) payload;
        if (this.decodeFluxAsUnit) {
            return decoder.decode(buffers, elementType, mimeType, null);
        }
        return Flux.from(buffers).handle((buffer, sink) -> {
            Object value = decoder.decode(buffer, elementType, mimeType, null);
            if (value != null) {
                sink.next(value);
            }
        });
    }

    /**
     * Pick the element type used when none is configured: {@link String} for a {@code text}
     * MIME type, {@code byte[]} for anything else (including a missing MIME type).
     */
    private static ResolvableType defaultElementType(@Nullable MimeType mimeType) {
        boolean isText = mimeType != null && "text".equals(mimeType.getType());
        return isText ? ResolvableType.forClass(String.class) : ResolvableType.forClass(byte[].class);
    }

    /**
     * Encode the reply payload, taking the buffer factory and content type from the
     * <em>request</em> headers and falling back to the strategies' buffer factory.
     */
    private Flux<DataBuffer> createReply(Object reply, Message<?> requestMessage) {
        MessageHeaders requestHeaders = requestMessage.getHeaders();
        DataBufferFactory bufferFactory = requestHeaders.get("dataBufferFactory", DataBufferFactory.class);
        if (bufferFactory == null) {
            bufferFactory = this.rsocketStrategies.dataBufferFactory();
        }
        MimeType mimeType = requestHeaders.get("contentType", MimeType.class);
        return encodeContent(reply, ResolvableType.forInstance(reply), bufferFactory, mimeType);
    }

    /**
     * Adapt the content to a {@link Publisher} (through a registered {@link ReactiveAdapter},
     * or as a single-element {@link Flux}) and encode every emitted value.
     */
    private Flux<DataBuffer> encodeContent(Object content, ResolvableType returnValueType, DataBufferFactory bufferFactory, @Nullable MimeType mimeType) {
        ReactiveAdapter adapter =
                this.rsocketStrategies.reactiveAdapterRegistry().getAdapter(returnValueType.resolve(), content);
        Publisher<?> publisher;
        if (adapter != null) {
            publisher = adapter.toPublisher(content);
        } else {
            publisher = Flux.just(content);
        }
        return Flux.from(publisher).map(value -> encodeValue(value, bufferFactory, mimeType));
    }

    /**
     * Encode one value with the encoder the strategies resolve for its runtime type.
     */
    private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, @Nullable MimeType mimeType) {
        ResolvableType valueType = ResolvableType.forInstance(value);
        return this.rsocketStrategies.encoder(valueType, mimeType).encodeValue(value, bufferFactory, valueType, mimeType, null);
    }

    /**
     * Publish the encoded reply, mapped to RSocket payloads, through the response processor
     * and complete it.
     */
    private Mono<Void> sendReply(Publisher<DataBuffer> encodedReply, MonoProcessor<Flux<Payload>> replyMono) {
        replyMono.onNext(Flux.from(encodedReply).map(PayloadUtils::createPayload));
        replyMono.onComplete();
        return Mono.empty();
    }

    /**
     * Look up the response processor in the {@code rsocketResponse} header.
     *
     * @return the processor, or {@code null} when the header is absent
     * @throws IllegalStateException if the header holds something other than a {@link MonoProcessor}
     */
    @Nullable
    @SuppressWarnings("unchecked") // only the raw MonoProcessor type can be verified at runtime
    private static MonoProcessor<Flux<Payload>> getReplyMono(Message<?> message) {
        Object header = message.getHeaders().get("rsocketResponse");
        Assert.state(header == null || header instanceof MonoProcessor, "Expected MonoProcessor");
        return (MonoProcessor<Flux<Payload>>) header;
    }
}
