package org.springframework.integration.kafka.outbound;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.core.AttributeAccessorSupport;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/* loaded from: input_file:org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.class */
/**
 * Message handler that publishes Spring Integration messages to Kafka through a
 * {@link KafkaTemplate}.
 *
 * <p>Topic, partition, message key and timestamp are each taken from a configured
 * {@link Expression} when one is set, otherwise from the {@code kafka_topic},
 * {@code kafka_partitionId}, {@code kafka_messageKey} and {@code kafka_timestamp}
 * message headers. When an output channel and/or a send-failure channel is
 * configured, the outcome of the send is forwarded to it from the send future's
 * callback. In {@link #setSync(boolean) sync} mode the calling thread additionally
 * waits for the send future to complete.
 *
 * @param <K> the Kafka record key type
 * @param <V> the Kafka record value type
 */
public class KafkaProducerMessageHandler<K, V> extends AbstractMessageProducingHandler {

    /** Default time, in milliseconds, to wait for the send future in sync mode. */
    private static final long DEFAULT_SEND_TIMEOUT = 10000L;

    private final KafkaTemplate<K, V> kafkaTemplate;

    /** Created in {@link #onInit()}; shared by all expression evaluations. */
    private EvaluationContext evaluationContext;

    private volatile Expression topicExpression;

    private volatile Expression messageKeyExpression;

    private volatile Expression partitionIdExpression;

    private volatile Expression timestampExpression;

    private boolean sync;

    /** May be {@code null}, in which case no headers are added to the record. */
    private KafkaHeaderMapper headerMapper;

    private MessageChannel sendFailureChannel;

    private String sendFailureChannelName;

    private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);

    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();

    /**
     * Empty attribute holder handed to the {@link ErrorMessageStrategy} when an
     * error message is built for a failed send.
     */
    private static final class Attributes extends AttributeAccessorSupport {

        Attributes() {
        }

    }

    /**
     * Create a handler that sends through the given template. A
     * {@link DefaultKafkaHeaderMapper} is installed only when
     * {@link JacksonPresent#isJackson2Present()} reports {@code true}; otherwise no
     * header mapper is set until {@link #setHeaderMapper(KafkaHeaderMapper)} is called.
     *
     * @param kafkaTemplate the template used to send records; must not be {@code null}
     */
    public KafkaProducerMessageHandler(KafkaTemplate<K, V> kafkaTemplate) {
        Assert.notNull(kafkaTemplate, "kafkaTemplate cannot be null");
        this.kafkaTemplate = kafkaTemplate;
        if (JacksonPresent.isJackson2Present()) {
            this.headerMapper = new DefaultKafkaHeaderMapper();
        }
    }

    /**
     * @param topicExpression expression evaluated against the message to obtain the
     * topic; when {@code null} the {@code kafka_topic} header is used
     */
    public void setTopicExpression(Expression topicExpression) {
        this.topicExpression = topicExpression;
    }

    /**
     * @param messageKeyExpression expression evaluated against the message to obtain
     * the record key; when {@code null} the {@code kafka_messageKey} header is used
     */
    public void setMessageKeyExpression(Expression messageKeyExpression) {
        this.messageKeyExpression = messageKeyExpression;
    }

    /**
     * @param partitionIdExpression expression evaluated against the message to obtain
     * the partition; when {@code null} the {@code kafka_partitionId} header is used
     */
    public void setPartitionIdExpression(Expression partitionIdExpression) {
        this.partitionIdExpression = partitionIdExpression;
    }

    /**
     * @param timestampExpression expression evaluated against the message to obtain
     * the record timestamp; when {@code null} the {@code kafka_timestamp} header is used
     */
    public void setTimestampExpression(Expression timestampExpression) {
        this.timestampExpression = timestampExpression;
    }

    /**
     * @param headerMapper mapper used to copy message headers into the record
     * headers; {@code null} disables header mapping
     */
    public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
        this.headerMapper = headerMapper;
    }

    /**
     * @return the template this handler sends through
     */
    public KafkaTemplate<?, ?> getKafkaTemplate() {
        return this.kafkaTemplate;
    }

    /**
     * @param sync {@code true} to make the calling thread wait for the send future
     * to complete before {@code handleMessageInternal} returns
     */
    public void setSync(boolean sync) {
        this.sync = sync;
    }

    /**
     * Set a fixed send timeout. The value is passed to the superclass and also
     * replaces the send-timeout expression with a constant expression.
     *
     * @param sendTimeout the timeout in milliseconds
     */
    @Override
    public void setSendTimeout(long sendTimeout) {
        super.setSendTimeout(sendTimeout);
        setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
    }

    /**
     * @param sendTimeoutExpression expression evaluated against the message (as a
     * {@code Long}, in milliseconds) to obtain the sync-mode wait time; must not be
     * {@code null}. A {@code null} or negative result means wait without a timeout.
     */
    public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
        Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
        this.sendTimeoutExpression = sendTimeoutExpression;
    }

    /**
     * @param sendFailureChannel channel that receives an error message when a send fails
     */
    public void setSendFailureChannel(MessageChannel sendFailureChannel) {
        this.sendFailureChannel = sendFailureChannel;
    }

    /**
     * @param sendFailureChannelName name of the failure channel; resolved on first
     * use when no channel instance has been set
     */
    public void setSendFailureChannelName(String sendFailureChannelName) {
        this.sendFailureChannelName = sendFailureChannelName;
    }

    /**
     * @param errorMessageStrategy strategy used to build the error message sent to
     * the failure channel; must not be {@code null}
     */
    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    /**
     * Return the send-failure channel, resolving it from the configured channel
     * name (and caching the result) if no channel instance is set yet.
     *
     * @return the failure channel, or {@code null} when neither a channel nor a
     * channel name has been configured
     */
    protected MessageChannel getSendFailureChannel() {
        if (this.sendFailureChannel == null && this.sendFailureChannelName != null) {
            this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName);
        }
        return this.sendFailureChannel;
    }

    /**
     * Run the superclass initialisation and create the evaluation context used for
     * all expressions of this handler.
     */
    @Override
    protected void onInit() throws Exception {
        super.onInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
    }

    /**
     * Convert the message to a {@link ProducerRecord}, send it, register the result
     * callback when an output or failure channel is configured and, in sync mode,
     * wait for the send to complete.
     *
     * @param message the message to publish
     * @throws IllegalStateException if the resolved topic is {@code null} or blank
     * @throws MessageTimeoutException if, in sync mode, the send does not complete
     * within the evaluated timeout
     * @throws InterruptedException if, in sync mode, the waiting thread is
     * interrupted; the thread's interrupt flag is restored before rethrowing
     * @throws Exception any other failure raised while waiting on the send future
     */
    @Override
    protected void handleMessageInternal(final Message<?> message) throws Exception {
        final ProducerRecord<K, V> producerRecord = buildProducerRecord(message);
        ListenableFuture<SendResult<K, V>> future = this.kafkaTemplate.send(producerRecord);
        if (getSendFailureChannel() != null || getOutputChannel() != null) {
            registerCallback(message, producerRecord, future);
        }
        if (this.sync) {
            awaitSendResult(message, future);
        }
    }

    /**
     * Resolve topic, partition, key, timestamp, payload and headers for the message
     * and assemble them into a producer record.
     */
    private ProducerRecord<K, V> buildProducerRecord(Message<?> message) {
        String topic = this.topicExpression != null
                ? this.topicExpression.getValue(this.evaluationContext, message, String.class)
                : message.getHeaders().get("kafka_topic", String.class);
        Assert.state(StringUtils.hasText(topic), "The 'topic' can not be empty or null");

        Integer partitionId = this.partitionIdExpression != null
                ? this.partitionIdExpression.getValue(this.evaluationContext, message, Integer.class)
                : message.getHeaders().get("kafka_partitionId", Integer.class);

        Object messageKey = this.messageKeyExpression != null
                ? this.messageKeyExpression.getValue(this.evaluationContext, message)
                : message.getHeaders().get("kafka_messageKey");

        Long timestamp = this.timestampExpression != null
                ? this.timestampExpression.getValue(this.evaluationContext, message, Long.class)
                : message.getHeaders().get("kafka_timestamp", Long.class);

        // A KafkaNull payload is the marker for a record with a null value.
        Object payload = message.getPayload();
        if (payload instanceof KafkaNull) {
            payload = null;
        }

        Headers headers = null;
        if (this.headerMapper != null) {
            headers = new RecordHeaders();
            this.headerMapper.fromHeaders(message.getHeaders(), headers);
        }

        // The key and payload types are only known at runtime; the casts are erased and
        // any mismatch surfaces later (e.g. in the producer's serializers).
        @SuppressWarnings("unchecked")
        ProducerRecord<K, V> producerRecord =
                new ProducerRecord<K, V>(topic, partitionId, timestamp, (K) messageKey, (V) payload, headers);
        return producerRecord;
    }

    /**
     * Attach a callback that forwards the send outcome: on success a copy of the
     * message carrying the record metadata goes to the output channel (if any); on
     * failure an error message goes to the send-failure channel (if any).
     */
    private void registerCallback(final Message<?> message, final ProducerRecord<K, V> producerRecord,
            ListenableFuture<SendResult<K, V>> future) {

        future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {

            @Override
            public void onSuccess(SendResult<K, V> result) {
                if (getOutputChannel() != null) {
                    KafkaProducerMessageHandler.this.messagingTemplate.send(getOutputChannel(),
                            getMessageBuilderFactory().fromMessage(message)
                                    .setHeader("kafka_recordMetadata", result.getRecordMetadata())
                                    .build());
                }
            }

            @Override
            public void onFailure(Throwable ex) {
                if (getSendFailureChannel() != null) {
                    KafkaProducerMessageHandler.this.messagingTemplate.send(getSendFailureChannel(),
                            KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
                                    new KafkaSendFailureException(message, producerRecord, ex),
                                    new Attributes()));
                }
            }

        });
    }

    /**
     * Block until the send future completes, honouring the evaluated send timeout.
     * A {@code null} or negative timeout waits without a time limit.
     */
    private void awaitSendResult(Message<?> message, ListenableFuture<SendResult<K, V>> future)
            throws Exception {

        Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
        try {
            if (sendTimeout == null || sendTimeout < 0) {
                future.get();
            }
            else {
                future.get(sendTimeout, TimeUnit.MILLISECONDS);
            }
        }
        catch (TimeoutException e) {
            throw new MessageTimeoutException(message, "Timeout waiting for response from KafkaProducer", e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * @return the component type identifier of this handler
     */
    @Override
    public String getComponentType() {
        return "kafka:outbound-channel-adapter";
    }

}
