package org.springframework.messaging.simp.stomp;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.support.tcp.ReactorNettyTcpClient;
import org.springframework.messaging.support.tcp.TcpConnection;
import org.springframework.messaging.support.tcp.TcpConnectionHandler;
import org.springframework.messaging.support.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;

/* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.class */
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
    /** Zero-length payload used for header-only frames built in this class (the CONNECT and ERROR frames). */
    private static final byte[] EMPTY_PAYLOAD = new byte[0];
    /** Pre-built HEARTBEAT-typed message whose payload is the single byte 10 (a line feed). */
    private static final Message<byte[]> HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[]{10}).setHeaders(SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT)).build();
    /** Factor applied to the negotiated read interval before a silent broker connection is treated as failed. */
    private static final long HEARTBEAT_MULTIPLIER = 3;
    /** Channel that messages coming back from the broker (and generated ERROR frames) are sent to. */
    private final MessageChannel messageChannel;
    /** Host passed to the default TCP client created in {@code startInternal()}. */
    private String relayHost;
    /** Port passed to the default TCP client created in {@code startInternal()}. */
    private int relayPort;
    /** Login set on the CONNECT frame of the "system" connection. */
    private String systemLogin;
    /** Passcode set on the CONNECT frame of the "system" connection. */
    private String systemPasscode;
    /** First value of the heart-beat header sent on the "system" CONNECT frame. */
    private long systemHeartbeatSendInterval;
    /** Second value of the heart-beat header sent on the "system" CONNECT frame. */
    private long systemHeartbeatReceiveInterval;
    /** Value for the STOMP "host" header; may be {@code null}, in which case client CONNECT frames keep their own host. */
    private String virtualHost;
    /** TCP client used for every broker connection; created in {@code startInternal()} when not injected beforehand. */
    private TcpOperations<byte[]> tcpClient;
    /** Connection handlers keyed by session id, including the "system" session. */
    private final Map<String, StompConnectionHandler> connectionHandlers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$StompConnectionHandler.class */
    public class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
        private final String sessionId;
        private final boolean isRemoteClientSession;
        private final StompHeaderAccessor connectHeaders;
        private volatile TcpConnection<byte[]> tcpConnection;
        private volatile boolean isStompConnected;

        private StompConnectionHandler(StompBrokerRelayMessageHandler stompBrokerRelayMessageHandler, String str, StompHeaderAccessor stompHeaderAccessor) {
            this(str, stompHeaderAccessor, true);
        }

        private StompConnectionHandler(String str, StompHeaderAccessor stompHeaderAccessor, boolean z) {
            Assert.notNull(str, "sessionId is required");
            Assert.notNull(stompHeaderAccessor, "connectHeaders is required");
            this.sessionId = str;
            this.connectHeaders = stompHeaderAccessor;
            this.isRemoteClientSession = z;
        }

        public String getSessionId() {
            return this.sessionId;
        }

        @Override // org.springframework.messaging.support.tcp.TcpConnectionHandler
        public void afterConnected(TcpConnection<byte[]> tcpConnection) {
            this.tcpConnection = tcpConnection;
            tcpConnection.send(MessageBuilder.withPayload(StompBrokerRelayMessageHandler.EMPTY_PAYLOAD).setHeaders(this.connectHeaders).build());
        }

        @Override // org.springframework.messaging.support.tcp.TcpConnectionHandler
        public void afterConnectFailure(Throwable th) {
            handleTcpConnectionFailure("Failed to connect to message broker", th);
        }

        protected void handleTcpConnectionFailure(String str, Throwable th) {
            if (StompBrokerRelayMessageHandler.this.logger.isErrorEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.error(str + ", sessionId=" + this.sessionId, th);
            }
            resetTcpConnection();
            sendStompErrorToClient(str);
        }

        private void sendStompErrorToClient(String str) {
            if (!this.isRemoteClientSession || StompBrokerRelayMessageHandler.this.removeConnectionHandler(this.sessionId) == null) {
                return;
            }
            StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.ERROR);
            create.setSessionId(this.sessionId);
            create.setMessage(str);
            sendMessageToClient(MessageBuilder.withPayload(StompBrokerRelayMessageHandler.EMPTY_PAYLOAD).setHeaders(create).build());
        }

        protected void sendMessageToClient(Message<?> message) {
            if (this.isRemoteClientSession) {
                StompBrokerRelayMessageHandler.this.messageChannel.send(message);
            }
        }

        @Override // org.springframework.messaging.support.tcp.TcpConnectionHandler
        public void handleMessage(Message<byte[]> message) {
            if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.trace("Reading message for sessionId=" + this.sessionId + ", " + message);
            }
            StompHeaderAccessor wrap = StompHeaderAccessor.wrap((Message<?>) message);
            if (StompCommand.CONNECTED == wrap.getCommand()) {
                afterStompConnected(wrap);
            }
            wrap.setSessionId(this.sessionId);
            sendMessageToClient(MessageBuilder.withPayload(message.getPayload()).setHeaders(wrap).build());
        }

        protected void afterStompConnected(StompHeaderAccessor stompHeaderAccessor) {
            this.isStompConnected = true;
            initHeartbeats(stompHeaderAccessor);
        }

        private void initHeartbeats(StompHeaderAccessor stompHeaderAccessor) {
            if (this.isRemoteClientSession) {
                return;
            }
            long j = this.connectHeaders.getHeartbeat()[0];
            long j2 = this.connectHeaders.getHeartbeat()[1];
            long j3 = stompHeaderAccessor.getHeartbeat()[0];
            long j4 = stompHeaderAccessor.getHeartbeat()[1];
            if (j > 0 && j4 > 0) {
                this.tcpConnection.onWriteInactivity(new Runnable() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler.1
                    @Override // java.lang.Runnable
                    public void run() {
                        TcpConnection tcpConnection = StompConnectionHandler.this.tcpConnection;
                        if (tcpConnection != null) {
                            tcpConnection.send(StompBrokerRelayMessageHandler.HEARTBEAT_MESSAGE).addCallback(new ListenableFutureCallback<Void>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler.1.1
                                public void onFailure(Throwable th) {
                                    StompConnectionHandler.this.handleTcpConnectionFailure("Failed to send heartbeat", th);
                                }

                                public void onSuccess(Void r2) {
                                }
                            });
                        }
                    }
                }, Math.max(j, j4));
            }
            if (j2 <= 0 || j3 <= 0) {
                return;
            }
            final long max = Math.max(j2, j3) * StompBrokerRelayMessageHandler.HEARTBEAT_MULTIPLIER;
            this.tcpConnection.onReadInactivity(new Runnable() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler.2
                @Override // java.lang.Runnable
                public void run() {
                    StompConnectionHandler.this.handleTcpConnectionFailure("No hearbeat from broker for more than " + max + "ms, closing connection", null);
                }
            }, max);
        }

        @Override // org.springframework.messaging.support.tcp.TcpConnectionHandler
        public void afterConnectionClosed() {
            sendStompErrorToClient("Connection to broker closed");
        }

        public ListenableFuture<Void> forward(final Message<?> message) {
            if (!this.isStompConnected) {
                if (StompBrokerRelayMessageHandler.this.logger.isWarnEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.warn("Connection to broker inactive or not ready, ignoring message=" + message);
                }
                return new ListenableFutureTask(new Callable<Void>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler.3
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        return null;
                    }
                });
            }
            if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.trace("Forwarding message to broker: " + message);
            }
            ListenableFuture<Void> send = this.tcpConnection.send(message);
            send.addCallback(new ListenableFutureCallback<Void>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler.4
                public void onSuccess(Void r4) {
                    if (StompHeaderAccessor.wrap((Message<?>) message).getCommand() == StompCommand.DISCONNECT) {
                        StompConnectionHandler.this.resetTcpConnection();
                    }
                }

                public void onFailure(Throwable th) {
                    StompConnectionHandler.this.handleTcpConnectionFailure("Failed to send message " + message, th);
                }
            });
            return send;
        }

        public void resetTcpConnection() {
            TcpConnection<byte[]> tcpConnection = this.tcpConnection;
            this.isStompConnected = false;
            this.tcpConnection = null;
            if (tcpConnection != null) {
                try {
                    this.tcpConnection.close();
                } catch (Throwable th) {
                }
            }
        }
    }

    /* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$SystemStompConnectionHandler.class */
    /**
     * Connection handler for the shared "system" session, registered under
     * {@link #SESSION_ID}. In addition to the base behavior it publishes broker
     * availability events and makes {@link #forward(Message)} synchronous.
     */
    private class SystemStompConnectionHandler extends StompConnectionHandler {
        public static final String SESSION_ID = "stompRelaySystemSessionId";

        /**
         * @param stompHeaderAccessor the headers of the CONNECT frame to send to the broker
         */
        public SystemStompConnectionHandler(StompHeaderAccessor stompHeaderAccessor) {
            super(SESSION_ID, stompHeaderAccessor, false);
        }

        @Override
        protected void afterStompConnected(StompHeaderAccessor stompHeaderAccessor) {
            super.afterStompConnected(stompHeaderAccessor);
            StompBrokerRelayMessageHandler.this.publishBrokerAvailableEvent();
        }

        @Override
        protected void handleTcpConnectionFailure(String str, Throwable th) {
            super.handleTcpConnectionFailure(str, th);
            StompBrokerRelayMessageHandler.this.publishBrokerUnavailableEvent();
        }

        @Override
        public void afterConnectionClosed() {
            super.afterConnectionClosed();
            StompBrokerRelayMessageHandler.this.publishBrokerUnavailableEvent();
        }

        /**
         * Forwards the message and waits for the write to finish.
         *
         * @param message the message to write to the broker
         * @return the completed future of the write
         * @throws MessageDeliveryException if forwarding fails or the wait is interrupted;
         *         in the latter case the thread's interrupt flag is restored first
         */
        @Override
        public ListenableFuture<Void> forward(Message<?> message) {
            try {
                ListenableFuture<Void> future = super.forward(message);
                future.get();
                return future;
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new MessageDeliveryException(message, ex);
            } catch (Throwable ex) {
                throw new MessageDeliveryException(message, ex);
            }
        }
    }

    /**
     * Creates a relay handler with default settings: relay host 127.0.0.1, port 61613,
     * system login and passcode "guest", and 10 second system heartbeat intervals.
     *
     * @param messageChannel the channel messages for clients are sent to, must not be {@code null}
     * @param collection passed through to the superclass constructor
     *        (presumably the destination prefixes to handle - verify against the superclass)
     */
    public StompBrokerRelayMessageHandler(MessageChannel messageChannel, Collection<String> collection) {
        super(collection);

        // Required collaborator.
        Assert.notNull(messageChannel, "messageChannel is required");
        this.messageChannel = messageChannel;
        this.connectionHandlers = new ConcurrentHashMap<String, StompConnectionHandler>();

        // Broker location defaults.
        this.relayHost = "127.0.0.1";
        this.relayPort = 61613;

        // "System" connection defaults.
        this.systemLogin = "guest";
        this.systemPasscode = "guest";
        this.systemHeartbeatSendInterval = 10000L;
        this.systemHeartbeatReceiveInterval = 10000L;
    }

    /**
     * Sets the host of the STOMP broker to relay to.
     *
     * @param str the host name or address; must contain text
     */
    public void setRelayHost(String str) {
        Assert.hasText(str, "relayHost must not be empty");
        relayHost = str;
    }

    /**
     * @return the configured STOMP broker host
     */
    public String getRelayHost() {
        return relayHost;
    }

    /**
     * Sets the port of the STOMP broker to relay to.
     *
     * @param i the port number
     */
    public void setRelayPort(int i) {
        relayPort = i;
    }

    /**
     * @return the configured STOMP broker port
     */
    public int getRelayPort() {
        return relayPort;
    }

    /**
     * Sets the first heart-beat value placed on the "system" connection's CONNECT frame.
     *
     * @param j the interval value
     */
    public void setSystemHeartbeatSendInterval(long j) {
        systemHeartbeatSendInterval = j;
    }

    /**
     * @return the heartbeat send interval configured for the "system" connection
     */
    public long getSystemHeartbeatSendInterval() {
        return systemHeartbeatSendInterval;
    }

    /**
     * Sets the second heart-beat value placed on the "system" connection's CONNECT frame.
     *
     * @param j the interval value
     */
    public void setSystemHeartbeatReceiveInterval(long j) {
        systemHeartbeatReceiveInterval = j;
    }

    /**
     * @return the heartbeat receive interval configured for the "system" connection
     */
    public long getSystemHeartbeatReceiveInterval() {
        return systemHeartbeatReceiveInterval;
    }

    /**
     * Sets the login used on the "system" connection's CONNECT frame.
     *
     * @param str the login; must contain text
     */
    public void setSystemLogin(String str) {
        Assert.hasText(str, "systemLogin must not be empty");
        systemLogin = str;
    }

    /**
     * @return the login configured for the "system" connection
     */
    public String getSystemLogin() {
        return systemLogin;
    }

    /**
     * Sets the passcode used on the "system" connection's CONNECT frame.
     * The value is stored as given, without validation.
     *
     * @param str the passcode
     */
    public void setSystemPasscode(String str) {
        systemPasscode = str;
    }

    /**
     * @return the passcode configured for the "system" connection
     */
    public String getSystemPasscode() {
        return systemPasscode;
    }

    /**
     * Sets the value used for the STOMP "host" header on CONNECT frames sent to the broker.
     *
     * @param str the virtual host, or {@code null} to leave client CONNECT frames unchanged
     */
    public void setVirtualHost(String str) {
        virtualHost = str;
    }

    /**
     * @return the configured virtual host, or {@code null} if none was set
     */
    public String getVirtualHost() {
        return virtualHost;
    }

    /**
     * Supplies the TCP client to use for broker connections. When none is supplied,
     * {@code startInternal()} creates a default one from the relay host and port.
     *
     * @param tcpOperations the TCP client
     */
    void setTcpClient(TcpOperations<byte[]> tcpOperations) {
        tcpClient = tcpOperations;
    }

    /**
     * Creates the default TCP client if none was supplied, then registers the "system"
     * connection handler and opens its connection, retrying at a fixed 5 second
     * interval.
     */
    @Override
    protected void startInternal() {
        if (this.tcpClient == null) {
            this.tcpClient = new ReactorNettyTcpClient<byte[]>(this.relayHost, this.relayPort, new StompCodec());
        }

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Initializing \"system\" TCP connection");
        }

        // CONNECT frame for the shared "system" session.
        StompHeaderAccessor connectHeaders = StompHeaderAccessor.create(StompCommand.CONNECT);
        connectHeaders.setAcceptVersion("1.1,1.2");
        connectHeaders.setLogin(this.systemLogin);
        connectHeaders.setPasscode(this.systemPasscode);
        connectHeaders.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
        connectHeaders.setHost(getVirtualHost());

        SystemStompConnectionHandler systemHandler = new SystemStompConnectionHandler(connectHeaders);
        String systemSessionId = systemHandler.getSessionId();
        this.connectionHandlers.put(systemSessionId, systemHandler);

        FixedIntervalReconnectStrategy reconnectStrategy = new FixedIntervalReconnectStrategy(5000L);
        this.tcpClient.connect(systemHandler, reconnectStrategy);
    }

    /**
     * Resets the TCP connection of every registered handler and then shuts the TCP
     * client down. Failures are logged and do not stop the remaining cleanup.
     */
    @Override
    protected void stopInternal() {
        for (StompConnectionHandler handler : this.connectionHandlers.values()) {
            try {
                handler.resetTcpConnection();
            } catch (Throwable th) {
                // Pass the throwable along so the stack trace is not lost.
                this.logger.error("Failed to close STOMP connection " + th.getMessage(), th);
            }
        }
        if (this.tcpClient == null) {
            // Never started, so there is no client to shut down.
            return;
        }
        try {
            this.tcpClient.shutdown();
        } catch (Throwable th) {
            this.logger.error("Error while shutting down TCP client", th);
        }
    }

    /**
     * Routes a message to the broker connection of its session.
     *
     * <ul>
     * <li>A MESSAGE without a session id is assigned to the "system" session, and its
     * STOMP command is updated before it is rebuilt.</li>
     * <li>Any other message without a session id is logged and dropped.</li>
     * <li>A message whose command requires a destination is silently ignored when the
     * destination fails the prefix check.</li>
     * <li>CONNECT registers a new connection handler and opens a TCP connection.</li>
     * <li>DISCONNECT removes the handler and forwards the frame through it.</li>
     * <li>Everything else is forwarded through the handler registered for the session.</li>
     * </ul>
     *
     * @param message the message to handle
     */
    @Override
    protected void handleMessageInternal(Message<?> message) {
        StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
        String sessionId = headers.getSessionId();
        String destination = headers.getDestination();
        StompCommand command = headers.getCommand();
        SimpMessageType messageType = headers.getMessageType();

        if (SimpMessageType.MESSAGE.equals(messageType)) {
            if (sessionId == null) {
                sessionId = SystemStompConnectionHandler.SESSION_ID;
            }
            headers.setSessionId(sessionId);
            command = headers.updateStompCommandAsClientMessage();
            message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
        }

        if (sessionId == null) {
            this.logger.error("No sessionId, ignoring message: " + message);
            return;
        }

        // Destination-bound commands are only relayed for matching destinations.
        if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
            return;
        }

        if (SimpMessageType.CONNECT.equals(messageType)) {
            String host = getVirtualHost();
            if (host != null) {
                headers.setHost(getVirtualHost());
            }
            StompConnectionHandler newHandler = new StompConnectionHandler(sessionId, headers);
            this.connectionHandlers.put(sessionId, newHandler);
            this.tcpClient.connect(newHandler);
            return;
        }

        if (SimpMessageType.DISCONNECT.equals(messageType)) {
            StompConnectionHandler removed = removeConnectionHandler(sessionId);
            if (removed != null) {
                removed.forward(message);
            } else if (this.logger.isTraceEnabled()) {
                this.logger.trace("Connection already removed for sessionId=" + sessionId);
            }
            return;
        }

        StompConnectionHandler existing = this.connectionHandlers.get(sessionId);
        if (existing != null) {
            existing.forward(message);
            return;
        }
        this.logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message: " + message);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the connection handler registered for the given session. The "system"
     * session's handler is never removed.
     *
     * @param str the session id
     * @return the removed handler, or {@code null} if the id is the system session id
     *         or no handler was registered for it
     */
    public StompConnectionHandler removeConnectionHandler(String str) {
        boolean isSystemSession = SystemStompConnectionHandler.SESSION_ID.equals(str);
        return isSystemSession ? null : this.connectionHandlers.remove(str);
    }
}
