package com.oracle.coherence.patterns.eventdistribution.distributors.coherence;

import com.oracle.coherence.common.events.Event;
import com.oracle.coherence.common.identifiers.StringBasedIdentifier;
import com.oracle.coherence.patterns.eventdistribution.EventChannelController;
import com.oracle.coherence.patterns.eventdistribution.EventDistributor;
import com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController;
import com.oracle.coherence.patterns.messaging.DefaultMessagingSession;
import com.oracle.coherence.patterns.messaging.DefaultSubscriptionConfiguration;
import com.oracle.coherence.patterns.messaging.MessagingSession;
import com.oracle.coherence.patterns.messaging.entryprocessors.TopicSubscribeProcessor;
import com.tangosol.coherence.config.ParameterList;
import com.tangosol.coherence.config.builder.ParameterizedBuilder;
import com.tangosol.config.expression.ParameterResolver;
import com.tangosol.io.Serializer;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.ContinuousQueryCache;
import com.tangosol.util.Base;
import com.tangosol.util.Filter;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapEventTransformer;
import com.tangosol.util.MapListener;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.extractor.KeyExtractor;
import com.tangosol.util.extractor.ReflectionExtractor;
import com.tangosol.util.filter.AndFilter;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.MapEventFilter;
import com.tangosol.util.filter.MapEventTransformerFilter;
import com.tangosol.util.filter.OrFilter;
import com.tangosol.util.filter.ValueChangeEventFilter;
import com.tangosol.util.transformer.ExtractorEventTransformer;
import com.tangosol.util.transformer.SemiLiteEventTransformer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/coherence/CoherenceEventDistributor.class */
/**
 * An {@link EventDistributor} that publishes events to a messaging-pattern topic and keeps a
 * running count of the event channels that are not {@code DISABLED}, so that
 * {@link #distribute(Event)} can skip publishing when that count is zero.
 *
 * <p>The count starts at {@link #NO_CHANNELS_OBSERVED}; while it holds that value events are
 * always published.
 */
public class CoherenceEventDistributor implements EventDistributor, CoherenceEventDistributorMBean {
    // NOTE(review): the logger is named after CoherenceEventDistributorBuilder rather than this
    // class. Left unchanged so existing logging configuration keeps applying - confirm intent.
    private static final Logger logger = Logger.getLogger(CoherenceEventDistributorBuilder.class.getName());

    /** Name of the cache queried for event channel subscriptions. */
    private static final String SUBSCRIPTIONS_CACHE_NAME = "coherence.messagingpattern.subscriptions";

    /** Name of the cache against which topic subscriptions are registered. */
    private static final String DESTINATIONS_CACHE_NAME = "coherence.messagingpattern.destinations";

    /** Initial value of the active channel counter: no non-disabled channel has been counted yet. */
    private static final int NO_CHANNELS_OBSERVED = -1;

    /** Identifier (symbolic and external name) of this distributor. */
    private final EventDistributor.Identifier identifier;

    /** Session used to create the topic and publish events to it. */
    private final MessagingSession messagingSession;

    /** Builder used to lazily realize {@link #serializer}. */
    private final ParameterizedBuilder<Serializer> serializerBuilder;

    /** Serializer realized on the first call to {@code establishEventChannelController}. */
    private Serializer serializer;

    /** Lazily created view of the starting modes of this distributor's subscriptions. */
    private volatile ContinuousQueryCache eventChannelStartingModes;

    /** Number of channels currently not {@code DISABLED}, or {@link #NO_CHANNELS_OBSERVED}. */
    private final AtomicInteger activeEventChannelCount;

    /**
     * A {@link ContinuousQueryCache} whose values are transformed with a
     * {@code getStartingMode} reflection extractor and whose event filter only lets through
     * inserts, deletes and updates for which the extracted value changed.
     */
    public static class CustomContinuousQueryCache extends ContinuousQueryCache {
        /** Extractor applied to cached values; must be static because it is passed to {@code super(...)}. */
        private static final ValueExtractor STARTING_MODE_EXTRACTOR = new ReflectionExtractor("getStartingMode");

        /**
         * Creates the cache, forwarding all arguments to {@link ContinuousQueryCache} together
         * with the {@code getStartingMode} extractor as the transformer.
         *
         * @param namedCache  the underlying cache to observe
         * @param filter      the filter selecting the entries of interest
         * @param z           passed through as the third super-constructor argument
         *                    (presumably the "cache values" flag - TODO confirm)
         * @param mapListener the listener registered with the cache
         */
        public CustomContinuousQueryCache(NamedCache namedCache, Filter filter, boolean z, MapListener mapListener) {
            super(namedCache, filter, z, mapListener, STARTING_MODE_EXTRACTOR);
        }

        /**
         * Builds the transformer filter used for event registration.
         *
         * @param mapEventFilter the base event filter
         * @return a {@link MapEventTransformerFilter} that accepts events matching
         *         {@code mapEventFilter} which are either inserts/deletes or changes of the
         *         extracted starting mode, and that transforms the event with the extractor
         */
        protected Filter createTransformerFilter(MapEventFilter mapEventFilter) {
            // Inserts and deletes always pass; other events only when the starting mode changed.
            Filter insertOrDelete = new MapEventFilter(MapEventFilter.E_INSERTED | MapEventFilter.E_DELETED);
            Filter startingModeChanged = new ValueChangeEventFilter(STARTING_MODE_EXTRACTOR);
            Filter acceptedEvents = new AndFilter(mapEventFilter, new OrFilter(insertOrDelete, startingModeChanged));

            MapEventTransformer eventTransformer =
                    new ExtractorEventTransformer((ValueExtractor) null, STARTING_MODE_EXTRACTOR);

            return new MapEventTransformerFilter(acceptedEvents, eventTransformer);
        }
    }

    /**
     * Creates the distributor and the topic named after its external name.
     *
     * @param str                  first component of the distributor identifier
     *                             (presumably the symbolic name - TODO confirm)
     * @param str2                 second component of the distributor identifier
     *                             (presumably the external name - TODO confirm)
     * @param parameterizedBuilder builder used later to realize the {@link Serializer}
     */
    public CoherenceEventDistributor(String str, String str2, ParameterizedBuilder<Serializer> parameterizedBuilder) {
        // Validate before logging or using any argument.
        Base.azzert(str != null);
        Base.azzert(str2 != null);
        Base.azzert(parameterizedBuilder != null);

        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, String.format("Using Coherence-based Event Distributor [%s] (%s).", str, str2));
        }

        this.identifier = new EventDistributor.Identifier(str, str2);
        this.serializerBuilder = parameterizedBuilder;
        this.messagingSession = DefaultMessagingSession.getInstance();
        this.messagingSession.createTopic(getIdentifier().getExternalName());
        this.eventChannelStartingModes = null;
        this.activeEventChannelCount = new AtomicInteger(NO_CHANNELS_OBSERVED);
    }

    /**
     * @return the identifier this distributor was created with
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.EventDistributor
    public EventDistributor.Identifier getIdentifier() {
        return this.identifier;
    }

    /**
     * Registers a subscription for a new event channel on this distributor's topic.
     *
     * <p>Ensures the starting-mode view exists, realizes the serializer on first use, and then
     * invokes a {@link TopicSubscribeProcessor} against the destinations cache.
     *
     * @param dependencies      supplies the channel name and external name of the controller
     * @param parameterResolver resolver used for the serializer and the {@code cache-name} parameter
     * @param classLoader       class loader used when realizing the serializer
     * @return the identifier of the newly established event channel controller
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.EventDistributor
    public EventChannelController.Identifier establishEventChannelController(EventChannelController.Dependencies dependencies, ParameterResolver parameterResolver, ClassLoader classLoader) {
        establishEventChannelStartingMode();

        if (this.serializer == null) {
            this.serializer = (Serializer) this.serializerBuilder.realize(parameterResolver, classLoader, (ParameterList) null);
        }

        EventChannelController.Identifier controllerIdentifier =
                new EventChannelController.Identifier(dependencies.getChannelName(), dependencies.getExternalName());

        CoherenceEventChannelSubscription subscription = new CoherenceEventChannelSubscription(
                getIdentifier(),
                controllerIdentifier,
                dependencies,
                (String) parameterResolver.resolve("cache-name").evaluate(parameterResolver).get(),
                parameterResolver);

        CacheFactory.getCache(DESTINATIONS_CACHE_NAME).invoke(
                subscription.getIdentifier().getDestinationIdentifier(),
                new TopicSubscribeProcessor(subscription.getIdentifier(), new DefaultSubscriptionConfiguration(), subscription));

        return controllerIdentifier;
    }

    /**
     * Returns the view of this distributor's subscriptions, creating it on first use.
     *
     * <p>Creation is guarded by a null check on the volatile field, repeated inside a
     * {@code synchronized (this)} block, so the view is constructed at most once.
     *
     * @return the continuous query cache tracking event channel starting modes
     */
    protected NamedCache establishEventChannelStartingMode() {
        if (this.eventChannelStartingModes == null) {
            synchronized (this) {
                if (this.eventChannelStartingModes == null) {
                    this.eventChannelStartingModes = new CustomContinuousQueryCache(
                            CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME),
                            createSubscriptionFilter(),
                            false,
                            createStartingModeListener());
                }
            }
        }
        return this.eventChannelStartingModes;
    }

    /** Builds the listener that keeps {@link #activeEventChannelCount} in step with mode changes. */
    private MapListener createStartingModeListener() {
        return new MapListener() {
            public void entryInserted(MapEvent mapEvent) {
                if (((EventChannelController.Mode) mapEvent.getNewValue()) != EventChannelController.Mode.DISABLED) {
                    recordChannelActivated();
                }
            }

            public void entryUpdated(MapEvent mapEvent) {
                boolean wasDisabled = ((EventChannelController.Mode) mapEvent.getOldValue()) == EventChannelController.Mode.DISABLED;
                boolean isDisabled = ((EventChannelController.Mode) mapEvent.getNewValue()) == EventChannelController.Mode.DISABLED;

                if (wasDisabled && !isDisabled) {
                    recordChannelActivated();
                } else if (!wasDisabled && isDisabled) {
                    CoherenceEventDistributor.this.activeEventChannelCount.decrementAndGet();
                }
            }

            public void entryDeleted(MapEvent mapEvent) {
                if (((EventChannelController.Mode) mapEvent.getOldValue()) != EventChannelController.Mode.DISABLED) {
                    CoherenceEventDistributor.this.activeEventChannelCount.decrementAndGet();
                }
            }
        };
    }

    /**
     * Counts one more non-disabled channel in a single atomic step.
     *
     * <p>From {@link #NO_CHANNELS_OBSERVED} the counter jumps straight to 1; otherwise it is
     * incremented. Doing this with one compare-and-set means the counter never passes through
     * 0 on the way, which {@link #distribute(Event)} would read as "all channels disabled".
     */
    private void recordChannelActivated() {
        int current;
        int updated;
        do {
            current = this.activeEventChannelCount.get();
            updated = (current == NO_CHANNELS_OBSERVED) ? 1 : current + 1;
        } while (!this.activeEventChannelCount.compareAndSet(current, updated));
    }

    /** Builds the filter matching subscriptions whose key's destination identifier is this distributor's external name. */
    private Filter createSubscriptionFilter() {
        return new EqualsFilter(
                new KeyExtractor("getDestinationIdentifier"),
                StringBasedIdentifier.newInstance(this.identifier.getExternalName()));
    }

    /**
     * Publishes the event to this distributor's topic unless the active channel count is zero.
     *
     * @param event the event to distribute
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.EventDistributor
    public void distribute(Event event) {
        if (this.activeEventChannelCount.get() == 0) {
            if (logger.isLoggable(Level.FINEST)) {
                logger.log(Level.FINEST, "Skipping {0} (all event channels disabled).", event);
            }
            return;
        }

        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "Distributing {0}.", event);
        }
        this.messagingSession.publishMessage(StringBasedIdentifier.newInstance(getIdentifier().getExternalName()), event);
    }

    /**
     * Distributes each event of the list, in iteration order, via {@link #distribute(Event)}.
     *
     * @param list the events to distribute
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.EventDistributor
    public void distribute(List<Event> list) {
        for (Event event : list) {
            distribute(event);
        }
    }

    /**
     * @return the symbolic name of this distributor's identifier
     */
    @Override // com.oracle.coherence.patterns.eventdistribution.EventDistributorMBean
    public String getEventDistributorName() {
        return this.identifier.getSymbolicName();
    }

    /**
     * Invokes a {@code RaiseControllerEventProcessor} carrying the given event against every
     * subscription entry that belongs to this distributor.
     *
     * @param controllerEvent the controller event to raise
     */
    private void universallyRaiseEventChannelControllerEvent(AbstractEventChannelController.ControllerEvent controllerEvent) {
        CacheFactory.getCache(SUBSCRIPTIONS_CACHE_NAME).invokeAll(
                createSubscriptionFilter(),
                new AbstractEventChannelController.RaiseControllerEventProcessor(controllerEvent));
    }

    /** Raises {@code START} on all of this distributor's event channel controllers. */
    @Override // com.oracle.coherence.patterns.eventdistribution.EventDistributorMBean
    public void startAll() {
        universallyRaiseEventChannelControllerEvent(AbstractEventChannelController.ControllerEvent.START);
    }

    /** Raises {@code SUSPEND} on all of this distributor's event channel controllers. */
    @Override // com.oracle.coherence.patterns.eventdistribution.EventDistributorMBean
    public void suspendAll() {
        universallyRaiseEventChannelControllerEvent(AbstractEventChannelController.ControllerEvent.SUSPEND);
    }

    /** Raises {@code DISABLE} on all of this distributor's event channel controllers. */
    @Override // com.oracle.coherence.patterns.eventdistribution.EventDistributorMBean
    public void disableAll() {
        universallyRaiseEventChannelControllerEvent(AbstractEventChannelController.ControllerEvent.DISABLE);
    }
}
