package com.oracle.coherence.patterns.eventdistribution.distributors.coherence;

import com.oracle.coherence.common.events.Event;
import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.common.identifiers.StringBasedIdentifier;
import com.oracle.coherence.common.processors.InvokeMethodProcessor;
import com.oracle.coherence.common.tuples.Pair;
import com.oracle.coherence.patterns.eventdistribution.EventChannelController;
import com.oracle.coherence.patterns.eventdistribution.EventChannelNotReadyException;
import com.oracle.coherence.patterns.eventdistribution.EventDistributor;
import com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController;
import com.oracle.coherence.patterns.messaging.DefaultMessageTracker;
import com.oracle.coherence.patterns.messaging.Message;
import com.oracle.coherence.patterns.messaging.MessageIdentifier;
import com.oracle.coherence.patterns.messaging.MessageKey;
import com.oracle.coherence.patterns.messaging.MessageTracker;
import com.oracle.coherence.patterns.messaging.SubscriptionIdentifier;
import com.oracle.coherence.patterns.messaging.entryprocessors.AcknowledgeMessageProcessor;
import com.oracle.coherence.patterns.messaging.entryprocessors.AcknowledgeSubscriptionMessagesProcessor;
import com.oracle.coherence.patterns.messaging.entryprocessors.DrainSubscriptionMessagesProcessor;
import com.tangosol.coherence.config.builder.ParameterizedBuilder;
import com.tangosol.config.expression.ParameterResolver;
import com.tangosol.io.Serializer;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.processor.ExtractorProcessor;
import com.tangosol.util.processor.UpdaterProcessor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/coherence/CoherenceEventChannelController.class */
/**
 * An event channel controller whose pending events are tracked through the
 * messaging-pattern caches {@code coherence.messagingpattern.subscriptions}
 * and {@code coherence.messagingpattern.messages}.
 *
 * <p>The controller is bound to a single {@link SubscriptionIdentifier}, built from the
 * external names of the distributor and controller identifiers. Configuration changes
 * made through the setters are applied locally (via the superclass) and then pushed to
 * the subscription cache entry with an {@link UpdaterProcessor}.
 */
public class CoherenceEventChannelController extends AbstractEventChannelController<MessageTracker> implements CoherenceEventChannelControllerMBean {
    private static final Logger logger = Logger.getLogger(CoherenceEventChannelController.class.getName());

    /** Name of the cache holding the subscription entries. */
    private static final String SUBSCRIPTIONS_CACHE_NAME = "coherence.messagingpattern.subscriptions";

    /** Name of the cache holding the messages themselves. */
    private static final String MESSAGES_CACHE_NAME = "coherence.messagingpattern.messages";

    /** Identifies the subscription this controller reads from and acknowledges against. */
    private final SubscriptionIdentifier subscriptionIdentifier;

    /** Last message sequence number seen per publisher; used only to detect out-of-order delivery. */
    private final ConcurrentHashMap<Identifier, Long> prevMessageIdentifierMap;

    /**
     * Creates the controller and derives its {@link SubscriptionIdentifier} from the
     * external names of the two identifiers.
     *
     * @param identifier           identifier of the owning event distributor
     * @param identifier2          identifier of this event channel controller
     * @param dependencies         controller dependencies passed to the superclass
     * @param classLoader          class loader passed to the superclass
     * @param parameterResolver    parameter resolver passed to the superclass
     * @param parameterizedBuilder serializer builder passed to the superclass
     */
    public CoherenceEventChannelController(EventDistributor.Identifier identifier, EventChannelController.Identifier identifier2, EventChannelController.Dependencies dependencies, ClassLoader classLoader, ParameterResolver parameterResolver, ParameterizedBuilder<Serializer> parameterizedBuilder) {
        super(identifier, identifier2, dependencies, classLoader, parameterResolver, parameterizedBuilder);
        this.prevMessageIdentifierMap = new ConcurrentHashMap<>();
        this.subscriptionIdentifier = new SubscriptionIdentifier(StringBasedIdentifier.newInstance(identifier.getExternalName()), identifier2.getExternalName());
    }

    /**
     * Records the sequence number of {@code message} for its publisher and checks that it
     * is not lower than the number previously recorded for that publisher.
     *
     * @param message the message to check
     * @return {@code true} if this is the first message seen from the publisher or its
     *         sequence number is greater than or equal to the previous one; {@code false}
     *         (after logging at SEVERE) otherwise
     */
    private boolean verifyMessageSequence(Message message) {
        long sequenceNumber = message.getRequestIdentifier().getMessageSequenceNumber();
        Identifier publisherIdentifier = message.getRequestIdentifier().getPublisherIdentifier();

        // put() returns the previous mapping, so the read and the update happen in one map operation.
        Long previous = this.prevMessageIdentifierMap.put(publisherIdentifier, Long.valueOf(sequenceNumber));
        if (previous == null || previous.longValue() <= sequenceNumber) {
            return true;
        }
        if (logger.isLoggable(Level.SEVERE)) {
            // Report the previous AND the current sequence number (the latter was previously logged wrongly).
            logger.log(Level.SEVERE, "Recovery in process. Previously at message {0}, now at message: {1}.", new Object[]{previous, Long.valueOf(sequenceNumber)});
        }
        return false;
    }

    /**
     * @return the identifier of the subscription used by this controller
     */
    public SubscriptionIdentifier getSubscriptionIdentifier() {
        return this.subscriptionIdentifier;
    }

    /**
     * Invokes the {@code enable} method on the subscription cache entry.
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalEnable() {
        CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new InvokeMethodProcessor("enable"));
    }

    /**
     * Invokes the {@code disable} method on the subscription cache entry and then drains
     * the subscription's outstanding messages.
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalDisable() {
        CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new InvokeMethodProcessor("disable"));
        internalDrain();
    }

    /**
     * Drains the subscription: a {@link DrainSubscriptionMessagesProcessor} is invoked
     * synchronously on the subscription entry, and the messages it returns are then
     * acknowledged in batches on a newly started background thread.
     * No thread is started when there is nothing to drain.
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalDrain() {
        final MessageTracker messageTracker = (MessageTracker) CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new DrainSubscriptionMessagesProcessor());
        if (messageTracker == null || messageTracker.isEmpty()) {
            return;
        }
        new Thread(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                acknowledgeDrainedMessages(messageTracker);
            }
        }).start();
    }

    /**
     * Acknowledges every message in {@code messageTracker} against the messages cache,
     * in batches of {@link #getBatchSize()} separated by
     * {@link #getBatchDistributionDelay()} milliseconds. Stops early when the current
     * thread is interrupted.
     *
     * @param messageTracker the non-empty tracker of drained message identifiers
     */
    private void acknowledgeDrainedMessages(MessageTracker messageTracker) {
        long total = messageTracker.size();
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Commenced draining {0} messages from {1} (in the background using batches of {2} with an inter-batch delay of {3} ms)", new Object[]{Long.valueOf(total), this.subscriptionIdentifier, Integer.valueOf(getBatchSize()), Long.valueOf(getBatchDistributionDelay())});
        }

        NamedCache messages = CacheFactory.getCache(MESSAGES_CACHE_NAME);
        Iterator<?> identifiers = messageTracker.iterator();
        while (identifiers.hasNext() && !Thread.currentThread().isInterrupted()) {
            // Batch size is re-read for each batch so that runtime changes take effect.
            List<MessageKey> batch = new ArrayList<>(getBatchSize());
            while (identifiers.hasNext() && batch.size() < getBatchSize()) {
                batch.add(Message.getKey(this.subscriptionIdentifier.getDestinationIdentifier(), (MessageIdentifier) identifiers.next()));
            }
            messages.invokeAll(batch, new AcknowledgeMessageProcessor(this.subscriptionIdentifier));

            // Read the delay once so the value that was checked is the value that is slept.
            long delay = getBatchDistributionDelay();
            if (delay > 0) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    // Restore the interrupt status (cleared by sleep) and stop draining.
                    Thread.currentThread().interrupt();
                    if (logger.isLoggable(Level.INFO)) {
                        logger.log(Level.INFO, "Draining {0} was interrupted.", new Object[]{this.subscriptionIdentifier});
                    }
                    return;
                }
            }
        }

        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Completed draining {0} messages from {1} (in the background)", new Object[]{Long.valueOf(total), this.subscriptionIdentifier});
        }
    }

    /**
     * Connects the underlying channel using the distributor and controller identifiers.
     *
     * @throws EventChannelNotReadyException declared by the superclass contract;
     *         presumably raised by the channel when it cannot connect yet
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalStart() throws EventChannelNotReadyException {
        this.channel.connect(this.distributorIdentifier, this.controllerIdentifier);
    }

    /**
     * Disconnects the underlying channel.
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalStop() {
        this.channel.disconnect();
    }

    /**
     * Logs and applies a new batch distribution delay, then pushes it to the subscription entry.
     *
     * @param j the new delay in milliseconds
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController, com.oracle.coherence.patterns.eventdistribution.EventChannelControllerMBean
    public void setBatchDistributionDelay(long j) {
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Changing Batch Distribution Delay from {0} ms to {1} ms", new Object[]{Long.valueOf(this.controllerDependencies.getBatchDistributionDelay()), Long.valueOf(j)});
        }
        super.setBatchDistributionDelay(j);
        CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new UpdaterProcessor("getEventControllerDependencies.setBatchDistributionDelay", Long.valueOf(j)));
    }

    /**
     * Logs and applies a new batch size, then pushes it to the subscription entry.
     * Values less than or equal to zero are replaced by {@code 1}.
     *
     * @param i the requested batch size
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController, com.oracle.coherence.patterns.eventdistribution.EventChannelControllerMBean
    public void setBatchSize(int i) {
        int effectiveSize = i <= 0 ? 1 : i;
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Changing Batch Size from {0} to {1}", new Object[]{Integer.valueOf(this.controllerDependencies.getBatchSize()), Integer.valueOf(effectiveSize)});
        }
        super.setBatchSize(effectiveSize);
        CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new UpdaterProcessor("getEventControllerDependencies.setBatchSize", Integer.valueOf(effectiveSize)));
    }

    /**
     * Logs and applies a new restart delay, then pushes it to the subscription entry.
     *
     * @param j the new restart delay in milliseconds
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController, com.oracle.coherence.patterns.eventdistribution.EventChannelControllerMBean
    public void setRestartDelay(long j) {
        if (logger.isLoggable(Level.INFO)) {
            // The "from" value is the current restart delay (previously the batch distribution delay was logged here).
            logger.log(Level.INFO, "Changing Restart Delay from {0} ms to {1} ms", new Object[]{Long.valueOf(this.controllerDependencies.getRestartDelay()), Long.valueOf(j)});
        }
        super.setRestartDelay(j);
        CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new UpdaterProcessor("getEventControllerDependencies.setRestartDelay", Long.valueOf(j)));
    }

    /**
     * Logs and applies a new starting mode, then pushes it to the subscription entry.
     * Does nothing when {@code mode} is the same instance as the current starting mode.
     *
     * @param mode the new starting mode
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController, com.oracle.coherence.patterns.eventdistribution.EventChannelController
    public void setStartingMode(EventChannelController.Mode mode) {
        if (mode == getStartingMode()) {
            return;
        }
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Changing Starting Mode from {0} to {1}", new Object[]{this.controllerDependencies.getStartingMode(), mode});
        }
        super.setStartingMode(mode);
        CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new UpdaterProcessor("getEventControllerDependencies.setStartingMode", mode));
    }

    /**
     * Collects the next batch of events to distribute.
     *
     * <p>Up to one batch of message identifiers is taken from the subscription's visible
     * message tracker. All of them are added to the returned tracker, but only messages
     * that exist, are not yet acknowledged by this subscription and carry an
     * {@link Event} payload contribute to the returned event list.
     *
     * @return a pair of the events to distribute and the tracker of the message
     *         identifiers that make up this batch; both are empty when nothing is visible
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected Pair<List<Event>, MessageTracker> getEventsToDistribute() {
        // Use one batch size for the whole call so the lists and the loop bound agree.
        int batchSize = this.controllerDependencies.getBatchSize();
        DefaultMessageTracker batchTracker = new DefaultMessageTracker();
        List<Event> events = new ArrayList<>(batchSize);

        MessageTracker visibleTracker = (MessageTracker) CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new ExtractorProcessor("getVisibleMessageTracker"));
        if (visibleTracker != null && !visibleTracker.isEmpty()) {
            List<MessageKey> keys = new ArrayList<>(batchSize);
            Iterator<?> identifiers = visibleTracker.iterator();
            while (identifiers.hasNext() && keys.size() < batchSize) {
                MessageIdentifier messageIdentifier = (MessageIdentifier) identifiers.next();
                keys.add(Message.getKey(this.subscriptionIdentifier.getDestinationIdentifier(), messageIdentifier));
                batchTracker.add(messageIdentifier);
            }

            Map<?, ?> messagesByKey = CacheFactory.getCache(MESSAGES_CACHE_NAME).getAll(keys);
            // Walk the key list (not the map) to keep the tracker's iteration order.
            for (MessageKey key : keys) {
                Message message = (Message) messagesByKey.get(key);
                if (message != null && !message.isAcknowledgedBy(this.subscriptionIdentifier)) {
                    // Result intentionally unused: the check only logs out-of-order delivery.
                    verifyMessageSequence(message);
                    Object payload = message.getPayload();
                    if (payload instanceof Event) {
                        events.add((Event) payload);
                    }
                }
            }
        }
        return new Pair<List<Event>, MessageTracker>(events, batchTracker);
    }

    /**
     * Acknowledges the messages in {@code messageTracker}: first on each message entry in
     * the messages cache, then on the subscription entry.
     *
     * @param list           the events that were distributed; only its size is used,
     *                       as the initial capacity of the key list
     * @param messageTracker tracker of the message identifiers to acknowledge
     */
    /* renamed from: acknowledgeDistributedEvents, reason: avoid collision after fix types in other method */
    protected void acknowledgeDistributedEvents2(List<Event> list, MessageTracker messageTracker) {
        List<MessageKey> keys = new ArrayList<>(list.size());
        Iterator<?> identifiers = messageTracker.iterator();
        while (identifiers.hasNext()) {
            keys.add(Message.getKey(this.subscriptionIdentifier.getDestinationIdentifier(), (MessageIdentifier) identifiers.next()));
        }
        CacheFactory.getCache(MESSAGES_CACHE_NAME).invokeAll(keys, new AcknowledgeMessageProcessor(this.subscriptionIdentifier));
        CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invoke(this.subscriptionIdentifier, new AcknowledgeSubscriptionMessagesProcessor(messageTracker));
    }

    /**
     * Superclass hook; delegates to {@link #acknowledgeDistributedEvents2(List, MessageTracker)}.
     *
     * @param list           the events that were distributed
     * @param messageTracker tracker of the message identifiers to acknowledge
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw List is part of the existing signature
    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected /* bridge */ /* synthetic */ void acknowledgeDistributedEvents(List list, MessageTracker messageTracker) {
        acknowledgeDistributedEvents2((List<Event>) list, messageTracker);
    }
}
