package org.springframework.integration.kafka.listener;

import com.gs.collections.api.block.procedure.Procedure;
import com.gs.collections.api.block.procedure.Procedure2;
import com.gs.collections.api.map.MutableMap;
import com.gs.collections.impl.factory.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.util.Assert;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/springframework/integration/kafka/listener/ConcurrentMessageListenerDispatcher.class */
public class ConcurrentMessageListenerDispatcher {
    private static final Log log = LogFactory.getLog(ConcurrentMessageListenerDispatcher.class);
    private static final StartDelegateProcedure startDelegateProcedure = new StartDelegateProcedure();
    private static final StopDelegateProcedure stopDelegateProcedure = new StopDelegateProcedure();
    private final Object lifecycleMonitor = new Object();
    private final Collection<Partition> partitions;
    private final int consumers;
    private final Object delegateListener;
    private final ErrorHandler errorHandler;
    private final OffsetManager offsetManager;
    private final int queueSize;
    private final Executor taskExecutor;
    private volatile boolean running;
    private MutableMap<Partition, QueueingMessageListenerInvoker> delegates;
    private boolean autoCommitOnError;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/kafka/listener/ConcurrentMessageListenerDispatcher$StartDelegateProcedure.class */
    public static class StartDelegateProcedure implements Procedure<QueueingMessageListenerInvoker> {
        private StartDelegateProcedure() {
        }

        public void value(QueueingMessageListenerInvoker queueingMessageListenerInvoker) {
            queueingMessageListenerInvoker.start();
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/listener/ConcurrentMessageListenerDispatcher$StopDelegateProcedure.class */
    private static class StopDelegateProcedure implements Procedure2<QueueingMessageListenerInvoker, Integer> {
        private StopDelegateProcedure() {
        }

        public void value(QueueingMessageListenerInvoker queueingMessageListenerInvoker, Integer num) {
            try {
                queueingMessageListenerInvoker.stop(num.intValue());
            } catch (Exception e) {
                if (ConcurrentMessageListenerDispatcher.log.isInfoEnabled()) {
                    ConcurrentMessageListenerDispatcher.log.info("Exception thrown while stopping dispatcher:", e);
                }
            }
        }
    }

    public ConcurrentMessageListenerDispatcher(Object obj, ErrorHandler errorHandler, Collection<Partition> collection, OffsetManager offsetManager, int i, int i2, Executor executor, boolean z) {
        Assert.isTrue((obj instanceof MessageListener) || (obj instanceof AcknowledgingMessageListener), "Either a " + MessageListener.class.getName() + " or a " + AcknowledgingMessageListener.class.getName() + " must be provided");
        Assert.notEmpty(collection, "A set of partitions must be provided");
        Assert.isTrue(i <= collection.size(), "The number of consumers must be smaller or equal to the number of partitions");
        Assert.notNull(obj, "A delegate must be provided");
        this.delegateListener = obj;
        this.errorHandler = errorHandler;
        this.partitions = collection;
        this.offsetManager = offsetManager;
        this.consumers = Math.min(collection.size(), i);
        this.queueSize = i2;
        this.taskExecutor = executor;
        this.autoCommitOnError = z;
    }

    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (!this.running) {
                initializeAndStartDispatching();
                this.running = true;
            }
        }
    }

    public void stop(int i) {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                this.running = false;
                this.delegates.flip().keyBag().toSet().forEachWith(stopDelegateProcedure, Integer.valueOf(i));
            }
            this.delegates = null;
        }
    }

    public void dispatch(KafkaMessage kafkaMessage) {
        if (this.running) {
            ((QueueingMessageListenerInvoker) this.delegates.get(kafkaMessage.getMetadata().getPartition())).enqueue(kafkaMessage);
        }
    }

    private void initializeAndStartDispatching() {
        ArrayList arrayList = new ArrayList(this.consumers);
        for (int i = 0; i < this.consumers; i++) {
            arrayList.add(new QueueingMessageListenerInvoker(this.queueSize, this.offsetManager, this.delegateListener, this.errorHandler, this.taskExecutor, this.autoCommitOnError));
        }
        int i2 = 0;
        this.delegates = Maps.mutable.of();
        Iterator<Partition> it = this.partitions.iterator();
        while (it.hasNext()) {
            int i3 = i2;
            i2++;
            this.delegates.put(it.next(), arrayList.get(i3 % this.consumers));
        }
        this.delegates.flip().keyBag().toSet().forEach(startDelegateProcedure);
    }
}
