package org.springframework.integration.kafka.listener;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.util.Assert;
import reactor.Environment;
import reactor.core.processor.RingBufferProcessor;
import reactor.fn.BiFunction;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.stream.GroupedStream;

/* loaded from: input_file:org/springframework/integration/kafka/listener/WindowingOffsetManager.class */
public class WindowingOffsetManager implements OffsetManager, InitializingBean, DisposableBean {
    private static final BiFunction<Long, Long, Long> maxFunction;
    private static final Function<PartitionAndOffset, Long> offsetFunction;
    private static final ComputeMaximumOffsetByPartitionFunction findHighestOffsetInPartitionGroup;
    private static final Function<PartitionAndOffset, Partition> getPartitionFunction;
    private static final FindHighestOffsetsByPartitionFunction findHighestOffsetsByPartition;
    private final OffsetManager delegate;
    private volatile RingBufferProcessor<PartitionAndOffset> offsets;
    private volatile boolean closed;
    private final Consumer<PartitionAndOffset> delegateUpdateOffset = new Consumer<PartitionAndOffset>() { // from class: org.springframework.integration.kafka.listener.WindowingOffsetManager.4
        public void accept(PartitionAndOffset partitionAndOffset) {
            WindowingOffsetManager.this.delegate.updateOffset(partitionAndOffset.getPartition(), partitionAndOffset.getOffset().longValue());
        }
    };
    private final Consumer<Void> offsetComplete = new Consumer<Void>() { // from class: org.springframework.integration.kafka.listener.WindowingOffsetManager.5
        public void accept(Void r3) {
            WindowingOffsetManager.this.createOffsetsStream();
        }
    };
    private final ReadWriteLock offsetsLock = new ReentrantReadWriteLock();
    private long timespan = 10000;
    private int count = Integer.MAX_VALUE;
    private int shutdownTimeout = 2000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/kafka/listener/WindowingOffsetManager$ComputeMaximumOffsetByPartitionFunction.class */
    public static class ComputeMaximumOffsetByPartitionFunction implements Function<GroupedStream<Partition, PartitionAndOffset>, Stream<PartitionAndOffset>> {
        private ComputeMaximumOffsetByPartitionFunction() {
        }

        public Stream<PartitionAndOffset> apply(final GroupedStream<Partition, PartitionAndOffset> groupedStream) {
            return groupedStream.map(WindowingOffsetManager.offsetFunction).reduce(WindowingOffsetManager.maxFunction).map(new Function<Long, PartitionAndOffset>() { // from class: org.springframework.integration.kafka.listener.WindowingOffsetManager.ComputeMaximumOffsetByPartitionFunction.1
                public PartitionAndOffset apply(Long l) {
                    return new PartitionAndOffset((Partition) groupedStream.key(), l);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/kafka/listener/WindowingOffsetManager$FindHighestOffsetsByPartitionFunction.class */
    public static class FindHighestOffsetsByPartitionFunction implements Function<Stream<PartitionAndOffset>, Stream<PartitionAndOffset>> {
        private FindHighestOffsetsByPartitionFunction() {
        }

        public Stream<PartitionAndOffset> apply(Stream<PartitionAndOffset> stream) {
            return stream.groupBy(WindowingOffsetManager.getPartitionFunction).flatMap(WindowingOffsetManager.findHighestOffsetInPartitionGroup);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/kafka/listener/WindowingOffsetManager$PartitionAndOffset.class */
    public static class PartitionAndOffset {
        private final Partition partition;
        private final Long offset;

        public PartitionAndOffset(Partition partition, Long l) {
            this.partition = partition;
            this.offset = l;
        }

        public Partition getPartition() {
            return this.partition;
        }

        public Long getOffset() {
            return this.offset;
        }

        public String toString() {
            return "PartitionAndOffset{partition=" + this.partition + ", offset=" + this.offset + '}';
        }
    }

    public WindowingOffsetManager(OffsetManager offsetManager) {
        this.delegate = offsetManager;
    }

    public void setTimespan(long j) {
        Assert.isTrue(j >= 0, "Timespan must be a positive value");
        this.timespan = j;
    }

    public void setCount(int i) {
        Assert.isTrue(i >= 0, "Count must be a positive value");
        this.count = i;
    }

    public void setShutdownTimeout(int i) {
        this.shutdownTimeout = i;
    }

    public void afterPropertiesSet() throws Exception {
        if (this.count != 1) {
            createOffsetsStream();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createOffsetsStream() {
        if (this.closed) {
            return;
        }
        this.offsetsLock.writeLock().lock();
        try {
            this.offsets = RingBufferProcessor.share("spring-integration-kafka-offset", 1024);
            this.offsetsLock.writeLock().unlock();
            Streams.wrap(this.offsets).window(this.count, this.timespan, TimeUnit.MILLISECONDS).flatMap(findHighestOffsetsByPartition).consume(this.delegateUpdateOffset, (Consumer) null, this.offsetComplete);
        } catch (Throwable th) {
            this.offsetsLock.writeLock().unlock();
            throw th;
        }
    }

    public void destroy() throws Exception {
        flush();
        close();
        if (this.delegate instanceof DisposableBean) {
            this.delegate.destroy();
        }
    }

    @Override // org.springframework.integration.kafka.listener.OffsetManager
    public void updateOffset(Partition partition, long j) {
        if (this.offsets == null) {
            this.delegate.updateOffset(partition, j);
            return;
        }
        this.offsetsLock.readLock().lock();
        try {
            this.offsets.onNext(new PartitionAndOffset(partition, Long.valueOf(j)));
            this.offsetsLock.readLock().unlock();
        } catch (Throwable th) {
            this.offsetsLock.readLock().unlock();
            throw th;
        }
    }

    @Override // org.springframework.integration.kafka.listener.OffsetManager
    public long getOffset(Partition partition) {
        doFlush();
        return this.delegate.getOffset(partition);
    }

    @Override // org.springframework.integration.kafka.listener.OffsetManager
    public void deleteOffset(Partition partition) {
        doFlush();
        this.delegate.deleteOffset(partition);
    }

    @Override // org.springframework.integration.kafka.listener.OffsetManager
    public void resetOffsets(Collection<Partition> collection) {
        doFlush();
        this.delegate.resetOffsets(collection);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closed = true;
        this.delegate.close();
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        if (this.offsets != null) {
            this.offsets.awaitAndShutdown(this.shutdownTimeout, TimeUnit.MILLISECONDS);
        }
        this.delegate.flush();
    }

    private void doFlush() {
        try {
            flush();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static {
        Environment.initializeIfEmpty();
        maxFunction = new BiFunction<Long, Long, Long>() { // from class: org.springframework.integration.kafka.listener.WindowingOffsetManager.1
            public Long apply(Long l, Long l2) {
                return Long.valueOf(Math.max(l.longValue(), l2.longValue()));
            }
        };
        offsetFunction = new Function<PartitionAndOffset, Long>() { // from class: org.springframework.integration.kafka.listener.WindowingOffsetManager.2
            public Long apply(PartitionAndOffset partitionAndOffset) {
                return partitionAndOffset.getOffset();
            }
        };
        findHighestOffsetInPartitionGroup = new ComputeMaximumOffsetByPartitionFunction();
        getPartitionFunction = new Function<PartitionAndOffset, Partition>() { // from class: org.springframework.integration.kafka.listener.WindowingOffsetManager.3
            public Partition apply(PartitionAndOffset partitionAndOffset) {
                return partitionAndOffset.getPartition();
            }
        };
        findHighestOffsetsByPartition = new FindHighestOffsetsByPartitionFunction();
    }
}
