package org.springframework.integration.kafka.listener;

import com.gs.collections.api.RichIterable;
import com.gs.collections.api.block.function.Function;
import com.gs.collections.api.block.predicate.Predicate;
import com.gs.collections.api.list.ImmutableList;
import com.gs.collections.api.list.MutableList;
import com.gs.collections.api.multimap.list.ImmutableListMultimap;
import com.gs.collections.api.partition.PartitionIterable;
import com.gs.collections.api.set.MutableSet;
import com.gs.collections.api.tuple.Pair;
import com.gs.collections.impl.block.factory.Functions;
import com.gs.collections.impl.block.function.checked.CheckedFunction;
import com.gs.collections.impl.factory.Lists;
import com.gs.collections.impl.factory.Sets;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import com.gs.collections.impl.utility.ArrayIterate;
import com.gs.collections.impl.utility.Iterate;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import kafka.common.ErrorMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.ConsumerException;
import org.springframework.integration.kafka.core.FetchRequest;
import org.springframework.integration.kafka.core.KafkaConsumerDefaults;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.KafkaMessageBatch;
import org.springframework.integration.kafka.core.KafkaTemplate;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.Result;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer.class */
public class KafkaMessageListenerContainer implements SmartLifecycle {
    public static final int DEFAULT_WAIT_FOR_LEADER_REFRESH_RETRY = 5000;
    private static final int DEFAULT_STOP_TIMEOUT = 1000;
    private static final Log log = LogFactory.getLog(KafkaMessageListenerContainer.class);
    private final KafkaTemplate kafkaTemplate;
    private final String[] topics;
    private Partition[] partitions;
    private Executor fetchTaskExecutor;
    private Executor adminTaskExecutor;
    private Executor dispatcherTaskExecutor;
    private Object messageListener;
    private volatile OffsetManager offsetManager;
    private ConcurrentMap<Partition, Long> fetchOffsets;
    private ConcurrentMessageListenerDispatcher messageDispatcher;
    private final GetOffsetForPartitionFunction getOffset = new GetOffsetForPartitionFunction();
    private final PartitionToLeaderFunction getLeader = new PartitionToLeaderFunction();
    private final Function<Partition, Partition> passThru = Functions.getPassThru();
    private final Object lifecycleMonitor = new Object();
    public boolean autoStartup = true;
    private int concurrency = 1;
    private volatile boolean running = false;
    private int maxFetch = KafkaConsumerDefaults.FETCH_SIZE_INT;
    private int queueSize = 1024;
    private int stopTimeout = 1000;
    private int recoveryInterval = 5000;
    private ErrorHandler errorHandler = new LoggingErrorHandler();
    private final ConcurrentMap<BrokerAddress, FetchTask> fetchTasksByBroker = new ConcurrentHashMap();
    private boolean autoCommitOnError = false;

    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$FetchTask.class */
    public class FetchTask implements SchedulingAwareRunnable {
        private final BrokerAddress brokerAddress;
        private final PartitionToFetchRequestFunction partitionToFetchRequestFunction;
        private final MutableSet<Partition> listenedPartitions = Sets.mutable.of().asSynchronized();
        private final IsLeaderErrorPredicate isLeaderPredicate = new IsLeaderErrorPredicate();
        private final IsOffsetOutOfRangePredicate offsetOutOfRangePredicate = new IsOffsetOutOfRangePredicate();
        private volatile boolean active = true;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$FetchTask$IsLeaderErrorPredicate.class */
        public class IsLeaderErrorPredicate implements Predicate<Map.Entry<Partition, Short>> {
            private IsLeaderErrorPredicate() {
            }

            public boolean accept(Map.Entry<Partition, Short> entry) {
                return entry.getValue().shortValue() == ErrorMapping.NotLeaderForPartitionCode() || entry.getValue().shortValue() == ErrorMapping.UnknownTopicOrPartitionCode();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$FetchTask$IsOffsetOutOfRangePredicate.class */
        public class IsOffsetOutOfRangePredicate implements Predicate<Map.Entry<Partition, Short>> {
            private IsOffsetOutOfRangePredicate() {
            }

            public boolean accept(Map.Entry<Partition, Short> entry) {
                return entry.getValue().shortValue() == ErrorMapping.OffsetOutOfRangeCode();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$FetchTask$UpdateLeadersTask.class */
        public class UpdateLeadersTask implements SchedulingAwareRunnable {
            private final Iterable<Partition> partitionsToReset;

            public UpdateLeadersTask(Iterable<Partition> iterable) {
                this.partitionsToReset = iterable;
            }

            public boolean isLongLived() {
                return true;
            }

            public void run() {
                boolean z = false;
                while (!z && KafkaMessageListenerContainer.this.isRunning()) {
                    try {
                        KafkaMessageListenerContainer.this.kafkaTemplate.getConnectionFactory().refreshMetadata(FastList.newList(this.partitionsToReset).collect(new PartitionToTopicFunction()).distinct());
                        for (Pair pair : UnifiedMap.newMap(KafkaMessageListenerContainer.this.kafkaTemplate.getConnectionFactory().getLeaders(this.partitionsToReset)).flip().keyMultiValuePairsView()) {
                            synchronized (KafkaMessageListenerContainer.this.fetchTasksByBroker) {
                                FetchTask fetchTask = (FetchTask) KafkaMessageListenerContainer.this.fetchTasksByBroker.get(pair.getOne());
                                if (!(fetchTask != null ? fetchTask.addListenedPartitionsIfActive((Iterable) pair.getTwo()) : false)) {
                                    SchedulingAwareRunnable fetchTask2 = new FetchTask((BrokerAddress) pair.getOne(), (RichIterable) pair.getTwo());
                                    KafkaMessageListenerContainer.this.fetchTaskExecutor.execute(fetchTask2);
                                    KafkaMessageListenerContainer.this.fetchTasksByBroker.put(pair.getOne(), fetchTask2);
                                }
                            }
                        }
                        z = true;
                    } catch (Exception e) {
                        if (KafkaMessageListenerContainer.this.isRunning()) {
                            try {
                                Thread.sleep(KafkaMessageListenerContainer.this.recoveryInterval);
                            } catch (InterruptedException e2) {
                                Thread.currentThread().interrupt();
                                KafkaMessageListenerContainer.log.error("Interrupted after refresh leaders failure for: " + Iterate.makeString(this.partitionsToReset, ","));
                                z = true;
                            }
                        }
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$FetchTask$UpdateOffsetsTask.class */
        public class UpdateOffsetsTask implements Runnable {
            private final Collection<Partition> partitionsToResetOffsets;

            public UpdateOffsetsTask(Collection<Partition> collection) {
                this.partitionsToResetOffsets = collection;
            }

            @Override // java.lang.Runnable
            public void run() {
                KafkaMessageListenerContainer.this.offsetManager.resetOffsets(this.partitionsToResetOffsets);
                for (Partition partition : this.partitionsToResetOffsets) {
                    KafkaMessageListenerContainer.this.fetchOffsets.replace(partition, Long.valueOf(KafkaMessageListenerContainer.this.offsetManager.getOffset(partition)));
                }
                synchronized (KafkaMessageListenerContainer.this.fetchTasksByBroker) {
                    FetchTask fetchTask = (FetchTask) KafkaMessageListenerContainer.this.fetchTasksByBroker.get(FetchTask.this.brokerAddress);
                    if (!(fetchTask != null ? fetchTask.addListenedPartitionsIfActive(this.partitionsToResetOffsets) : false)) {
                        SchedulingAwareRunnable fetchTask2 = new FetchTask(FetchTask.this.brokerAddress, Sets.immutable.ofAll(this.partitionsToResetOffsets));
                        KafkaMessageListenerContainer.this.fetchTaskExecutor.execute(fetchTask2);
                        KafkaMessageListenerContainer.this.fetchTasksByBroker.put(FetchTask.this.brokerAddress, fetchTask2);
                    }
                }
            }
        }

        public FetchTask(BrokerAddress brokerAddress, RichIterable<Partition> richIterable) {
            this.partitionToFetchRequestFunction = new PartitionToFetchRequestFunction();
            this.brokerAddress = brokerAddress;
            this.listenedPartitions.addAll(richIterable.toSet());
        }

        public boolean isLongLived() {
            return true;
        }

        public boolean addListenedPartitionsIfActive(Iterable<Partition> iterable) {
            boolean z;
            synchronized (this.listenedPartitions) {
                if (this.active) {
                    this.listenedPartitions.addAllIterable(iterable);
                }
                z = this.active;
            }
            return z;
        }

        public void run() {
            while (this.active && KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    synchronized (this.listenedPartitions) {
                        try {
                            if (this.listenedPartitions.isEmpty()) {
                                this.active = false;
                            } else {
                                Result<KafkaMessageBatch> fetchAvailableData = fetchAvailableData();
                                handleSuccessful(fetchAvailableData);
                                if (fetchAvailableData.getErrors().size() > 0) {
                                    handleErrors(fetchAvailableData);
                                }
                            }
                        } catch (ConsumerException e) {
                            this.active = false;
                            KafkaMessageListenerContainer.this.kafkaTemplate.getConnectionFactory().disconnect(this.brokerAddress);
                            resetLeaders(this.listenedPartitions.toImmutable());
                        }
                    }
                } catch (Throwable th) {
                    this.active = false;
                    synchronized (KafkaMessageListenerContainer.this.fetchTasksByBroker) {
                        if (KafkaMessageListenerContainer.this.fetchTasksByBroker.get(this.brokerAddress) == this) {
                            KafkaMessageListenerContainer.this.fetchTasksByBroker.remove(this.brokerAddress);
                        }
                        throw th;
                    }
                }
            }
            this.active = false;
            synchronized (KafkaMessageListenerContainer.this.fetchTasksByBroker) {
                if (KafkaMessageListenerContainer.this.fetchTasksByBroker.get(this.brokerAddress) == this) {
                    KafkaMessageListenerContainer.this.fetchTasksByBroker.remove(this.brokerAddress);
                }
            }
        }

        private Result<KafkaMessageBatch> fetchAvailableData() {
            return KafkaMessageListenerContainer.this.kafkaTemplate.receive(this.listenedPartitions.collect(this.partitionToFetchRequestFunction));
        }

        private void handleSuccessful(Result<KafkaMessageBatch> result) {
            for (KafkaMessageBatch kafkaMessageBatch : result.getResults().values()) {
                if (!kafkaMessageBatch.getMessages().isEmpty()) {
                    long j = 0;
                    for (KafkaMessage kafkaMessage : kafkaMessageBatch.getMessages()) {
                        if (kafkaMessage.getMetadata().getOffset() >= ((Long) KafkaMessageListenerContainer.this.fetchOffsets.get(kafkaMessageBatch.getPartition())).longValue()) {
                            KafkaMessageListenerContainer.this.messageDispatcher.dispatch(kafkaMessage);
                        }
                        j = Math.max(j, kafkaMessage.getMetadata().getNextOffset());
                    }
                    KafkaMessageListenerContainer.this.fetchOffsets.replace(kafkaMessageBatch.getPartition(), Long.valueOf(j));
                }
            }
        }

        private void handleErrors(Result<KafkaMessageBatch> result) {
            PartitionIterable partition = Iterate.partition(result.getErrors().entrySet(), this.isLeaderPredicate);
            resetLeaders(partition.getSelected().collect(Functions.getKeyFunction()));
            PartitionIterable partition2 = partition.getRejected().partition(this.offsetOutOfRangePredicate);
            resetOffsets(partition2.getSelected().collect(Functions.getKeyFunction()).toSet());
            this.listenedPartitions.removeAllIterable(partition2.getRejected().collect(Functions.getKeyFunction()));
        }

        private void resetLeaders(Iterable<Partition> iterable) {
            this.listenedPartitions.removeAllIterable(iterable);
            KafkaMessageListenerContainer.this.adminTaskExecutor.execute(new UpdateLeadersTask(iterable));
        }

        private void resetOffsets(Collection<Partition> collection) {
            this.listenedPartitions.removeAllIterable(collection);
            KafkaMessageListenerContainer.this.adminTaskExecutor.execute(new UpdateOffsetsTask(collection));
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$GetOffsetForPartitionFunction.class */
    class GetOffsetForPartitionFunction extends CheckedFunction<Partition, Long> {
        GetOffsetForPartitionFunction() {
        }

        public Long safeValueOf(Partition partition) throws Exception {
            try {
                return Long.valueOf(KafkaMessageListenerContainer.this.offsetManager.getOffset(partition));
            } catch (Exception e) {
                KafkaMessageListenerContainer.log.error(e);
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$GetPartitionsForTopic.class */
    public static class GetPartitionsForTopic extends CheckedFunction<String, Iterable<Partition>> {
        private final ConnectionFactory connectionFactory;

        public GetPartitionsForTopic(ConnectionFactory connectionFactory) {
            this.connectionFactory = connectionFactory;
        }

        public Iterable<Partition> safeValueOf(String str) throws Exception {
            return this.connectionFactory.getPartitions(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$PartitionToFetchRequestFunction.class */
    public class PartitionToFetchRequestFunction implements Function<Partition, FetchRequest> {
        private PartitionToFetchRequestFunction() {
        }

        public FetchRequest valueOf(Partition partition) {
            return new FetchRequest(partition, ((Long) KafkaMessageListenerContainer.this.fetchOffsets.get(partition)).longValue(), KafkaMessageListenerContainer.this.maxFetch);
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$PartitionToLeaderFunction.class */
    private class PartitionToLeaderFunction implements Function<Partition, BrokerAddress> {
        private PartitionToLeaderFunction() {
        }

        public BrokerAddress valueOf(Partition partition) {
            return KafkaMessageListenerContainer.this.kafkaTemplate.getConnectionFactory().getLeader(partition);
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaMessageListenerContainer$PartitionToTopicFunction.class */
    private class PartitionToTopicFunction implements Function<Partition, String> {
        private PartitionToTopicFunction() {
        }

        public String valueOf(Partition partition) {
            return partition.getTopic();
        }
    }

    public KafkaMessageListenerContainer(ConnectionFactory connectionFactory, Partition... partitionArr) {
        Assert.notNull(connectionFactory, "A connection factory must be supplied");
        Assert.notEmpty(partitionArr, "A list of partitions must be provided");
        Assert.noNullElements(partitionArr, "The list of partitions cannot contain null elements");
        this.kafkaTemplate = new KafkaTemplate(connectionFactory);
        this.partitions = partitionArr;
        this.topics = null;
    }

    public KafkaMessageListenerContainer(ConnectionFactory connectionFactory, String... strArr) {
        Assert.notNull(connectionFactory, "A connection factory must be supplied");
        Assert.notNull(strArr, "A list of topics must be provided");
        Assert.noNullElements(strArr, "The list of topics cannot contain null elements");
        this.kafkaTemplate = new KafkaTemplate(connectionFactory);
        this.topics = strArr;
    }

    public OffsetManager getOffsetManager() {
        return this.offsetManager;
    }

    public void setOffsetManager(OffsetManager offsetManager) {
        this.offsetManager = offsetManager;
    }

    public Object getMessageListener() {
        return this.messageListener;
    }

    public void setMessageListener(Object obj) {
        Assert.isTrue((obj instanceof MessageListener) || (obj instanceof AcknowledgingMessageListener), "Either a " + MessageListener.class.getName() + " or a " + AcknowledgingMessageListener.class.getName() + " must be provided");
        this.messageListener = obj;
    }

    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    public int getConcurrency() {
        return this.concurrency;
    }

    public void setConcurrency(int i) {
        this.concurrency = i;
    }

    public void setStopTimeout(int i) {
        this.stopTimeout = i;
    }

    public int getStopTimeout() {
        return this.stopTimeout;
    }

    public Executor getFetchTaskExecutor() {
        return this.fetchTaskExecutor;
    }

    public void setFetchTaskExecutor(Executor executor) {
        this.fetchTaskExecutor = executor;
    }

    public Executor getAdminTaskExecutor() {
        return this.adminTaskExecutor;
    }

    public void setAdminTaskExecutor(Executor executor) {
        this.adminTaskExecutor = executor;
    }

    public void setDispatcherTaskExecutor(Executor executor) {
        this.dispatcherTaskExecutor = executor;
    }

    public int getMaxFetch() {
        return this.maxFetch;
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public void setQueueSize(int i) {
        Assert.isTrue(i > 0 && Integer.bitCount(i) == 1, "'queueSize' must be a positive number and a power of 2");
        this.queueSize = i;
    }

    public void setMaxFetch(int i) {
        this.maxFetch = i;
    }

    public void setAutoCommitOnError(boolean z) {
        this.autoCommitOnError = z;
    }

    public boolean isAutoCommitOnError() {
        return this.autoCommitOnError;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    public int getRecoveryInterval() {
        return this.recoveryInterval;
    }

    public void setRecoveryInterval(int i) {
        this.recoveryInterval = i;
    }

    public void stop(Runnable runnable) {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                this.running = false;
                try {
                    this.offsetManager.flush();
                } catch (IOException e) {
                    log.error("Error while flushing:", e);
                }
                this.messageDispatcher.stop(this.stopTimeout);
            }
        }
        if (runnable != null) {
            runnable.run();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (!this.running) {
                if (this.partitions == null) {
                    this.partitions = getPartitionsForTopics(this.kafkaTemplate.getConnectionFactory(), this.topics);
                }
                this.running = true;
                if (this.offsetManager == null) {
                    this.offsetManager = new MetadataStoreOffsetManager(this.kafkaTemplate.getConnectionFactory());
                }
                ImmutableList with = Lists.immutable.with(this.partitions);
                this.fetchOffsets = new ConcurrentHashMap((Map) with.toMap(this.passThru, this.getOffset));
                this.messageDispatcher = new ConcurrentMessageListenerDispatcher(this.messageListener, this.errorHandler, Arrays.asList(this.partitions), this.offsetManager, this.concurrency, this.queueSize, this.dispatcherTaskExecutor, this.autoCommitOnError);
                this.messageDispatcher.start();
                this.fetchTasksByBroker.clear();
                ImmutableListMultimap groupBy = with.groupBy(this.getLeader);
                if (this.fetchTaskExecutor == null) {
                    this.fetchTaskExecutor = new SimpleAsyncTaskExecutor("kafka-fetch-");
                }
                if (this.adminTaskExecutor == null) {
                    this.adminTaskExecutor = Executors.newSingleThreadExecutor();
                }
                for (Pair pair : groupBy.keyMultiValuePairsView()) {
                    SchedulingAwareRunnable fetchTask = new FetchTask((BrokerAddress) pair.getOne(), (RichIterable) pair.getTwo());
                    this.fetchTaskExecutor.execute(fetchTask);
                    this.fetchTasksByBroker.put(pair.getOne(), fetchTask);
                }
            }
        }
    }

    public void stop() {
        stop(null);
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    private static Partition[] getPartitionsForTopics(ConnectionFactory connectionFactory, String[] strArr) {
        MutableList flatCollect = ArrayIterate.flatCollect(strArr, new GetPartitionsForTopic(connectionFactory));
        return (Partition[]) flatCollect.toArray(new Partition[flatCollect.size()]);
    }
}
