package com.oracle.coherence.patterns.eventdistribution.channels.cache;

import com.oracle.coherence.common.events.Event;
import com.oracle.coherence.common.resourcing.InvocationServiceSupervisedResourceProvider;
import com.oracle.coherence.common.resourcing.SupervisedResourceProvider;
import com.oracle.coherence.patterns.eventdistribution.EventChannelBuilder;
import com.oracle.coherence.patterns.eventdistribution.EventChannelController;
import com.oracle.coherence.patterns.eventdistribution.EventChannelNotReadyException;
import com.oracle.coherence.patterns.eventdistribution.EventDistributor;
import com.oracle.coherence.patterns.eventdistribution.channels.RemoteClusterEventChannel;
import com.oracle.coherence.patterns.eventdistribution.events.DistributableEntryEvent;
import com.tangosol.coherence.config.builder.ParameterizedBuilder;
import com.tangosol.config.expression.ParameterResolver;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.InvocationObserver;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.RequestTimeoutException;
import com.tangosol.util.Base;
import com.tangosol.util.NullImplementation;
import com.tangosol.util.ResourceRegistry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.MissingResourceException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/channels/cache/ParallelLocalCacheEventChannel.class */
public class ParallelLocalCacheEventChannel implements CacheEventChannel {
    private static Logger logger = Logger.getLogger(ParallelLocalCacheEventChannel.class.getName());
    public static final long TOTAL_WAIT_MILLIS = 60000;
    private EventDistributor.Identifier distributorIdentifier;
    private EventChannelController.Identifier controllerIdentifier;
    private String targetCacheName;
    private ParameterizedBuilder<ConflictResolver> conflictResolverBuilder;
    private EventChannelBuilder remoteChannelBuilder;
    private ParameterResolver resolver;
    private ResourceRegistry registry;
    private DistributedCacheService service;
    private String invocationServiceName;
    private final AtomicInteger completedCount = new AtomicInteger();
    private final Object NOTIFIER = new Object();

    /* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/channels/cache/ParallelLocalCacheEventChannel$RemoteChannelAgentObserver.class */
    public class RemoteChannelAgentObserver implements InvocationObserver {
        private final List<Event> eventList;
        private volatile boolean completed;
        private volatile boolean memberLeft;
        private volatile Throwable exception;
        private volatile long ldtStart = Base.getSafeTimeMillis();

        public RemoteChannelAgentObserver(List<Event> list) {
            this.eventList = list;
        }

        public Iterator<Event> getEvents() {
            return this.eventList.iterator();
        }

        public boolean isCompleted() {
            return this.completed;
        }

        public boolean isMemberLeft() {
            return this.memberLeft;
        }

        public Throwable getException() {
            return this.exception;
        }

        public void memberCompleted(Member member, Object obj) {
            ParallelLocalCacheEventChannel.this.incrementCompleteCount(((Integer) obj).intValue());
            this.completed = true;
        }

        public void memberFailed(Member member, Throwable th) {
            this.exception = th;
        }

        public void memberLeft(Member member) {
            this.memberLeft = true;
        }

        public void invocationCompleted() {
            synchronized (ParallelLocalCacheEventChannel.this.NOTIFIER) {
                ParallelLocalCacheEventChannel.this.NOTIFIER.notifyAll();
            }
        }
    }

    public ParallelLocalCacheEventChannel(String str, String str2, EventChannelBuilder eventChannelBuilder, ResourceRegistry resourceRegistry, ParameterResolver parameterResolver) {
        this.targetCacheName = str;
        this.invocationServiceName = str2;
        this.remoteChannelBuilder = eventChannelBuilder;
        this.resolver = parameterResolver;
        this.registry = resourceRegistry;
    }

    private SupervisedResourceProvider<InvocationService> getSupervised() {
        return (SupervisedResourceProvider) this.registry.getResource(InvocationServiceSupervisedResourceProvider.class, this.invocationServiceName);
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.channels.cache.CacheEventChannel
    public String getTargetCacheName() {
        return this.targetCacheName;
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.channels.cache.CacheEventChannel
    public ParameterizedBuilder<ConflictResolver> getConflictResolverBuilder() {
        return this.conflictResolverBuilder;
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.EventChannel
    public void connect(EventDistributor.Identifier identifier, EventChannelController.Identifier identifier2) throws EventChannelNotReadyException {
        this.distributorIdentifier = identifier;
        this.controllerIdentifier = identifier2;
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.EventChannel
    public void disconnect() {
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.channels.cache.CacheEventChannel
    public NamedCache getTargetNamedCache() {
        return (getTargetCacheName() == null || getTargetCacheName().isEmpty()) ? CacheFactory.getCache(this.distributorIdentifier.getSymbolicName(), NullImplementation.getClassLoader()) : CacheFactory.getCache(getTargetCacheName(), NullImplementation.getClassLoader());
    }

    protected DistributedCacheService getCacheService() {
        if (this.service == null) {
            this.service = getTargetNamedCache().getCacheService();
        }
        return this.service;
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.EventChannel
    public int send(Iterator<Event> it) {
        Map<Member, List<Event>> mapEventsToMember = mapEventsToMember(it);
        HashSet hashSet = new HashSet();
        for (Map.Entry<Member, List<Event>> entry : mapEventsToMember.entrySet()) {
            hashSet.add(publishList(entry.getKey(), entry.getValue()));
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!hashSet.isEmpty() && System.currentTimeMillis() - currentTimeMillis < TOTAL_WAIT_MILLIS) {
            synchronized (this.NOTIFIER) {
                try {
                    this.NOTIFIER.wait(10L);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    throw Base.ensureRuntimeException(e);
                }
            }
            Iterator it2 = new HashSet(hashSet).iterator();
            while (it2.hasNext()) {
                RemoteChannelAgentObserver remoteChannelAgentObserver = (RemoteChannelAgentObserver) it2.next();
                if (remoteChannelAgentObserver.isCompleted()) {
                    hashSet.remove(remoteChannelAgentObserver);
                } else if (remoteChannelAgentObserver.isMemberLeft()) {
                    for (Map.Entry<Member, List<Event>> entry2 : mapEventsToMember(remoteChannelAgentObserver.getEvents()).entrySet()) {
                        hashSet.add(publishList(entry2.getKey(), entry2.getValue()));
                    }
                    hashSet.remove(remoteChannelAgentObserver);
                } else if (remoteChannelAgentObserver.getException() != null) {
                    throw Base.ensureRuntimeException(remoteChannelAgentObserver.getException());
                }
            }
        }
        if (hashSet.isEmpty()) {
            return getCompletedCount();
        }
        throw new RequestTimeoutException(String.format("InvocationService did not receive a response from members %s after %s ms", hashSet, Long.valueOf(System.currentTimeMillis() - currentTimeMillis)));
    }

    protected Map<Member, List<Event>> mapEventsToMember(Iterator<Event> it) {
        Member localMember;
        HashMap hashMap = new HashMap();
        while (it.hasNext()) {
            DistributableEntryEvent distributableEntryEvent = (Event) it.next();
            if (distributableEntryEvent instanceof DistributableEntryEvent) {
                localMember = getCacheService().getPartitionOwner(getCacheService().getBackingMapManager().getContext().getKeyPartition(distributableEntryEvent.m69getEntry().getBinaryKey()));
            } else {
                localMember = getCacheService().getCluster().getLocalMember();
            }
            List list = (List) hashMap.get(localMember);
            if (list == null) {
                list = new ArrayList();
            }
            list.add(distributableEntryEvent);
            hashMap.put(localMember, list);
        }
        return hashMap;
    }

    protected RemoteChannelAgentObserver publishList(Member member, List<Event> list) {
        SupervisedResourceProvider<InvocationService> supervised = getSupervised();
        if (supervised == null) {
            this.registry.registerResource(InvocationServiceSupervisedResourceProvider.class, this.invocationServiceName, new InvocationServiceSupervisedResourceProvider(this.invocationServiceName));
            supervised = getSupervised();
        }
        if (!supervised.isResourceAccessible()) {
            throw new MissingResourceException("Unable to resolve the following invocation service", InvocationService.class.toString(), this.invocationServiceName);
        }
        InvocationService invocationService = (InvocationService) supervised.getResource();
        RemoteChannelAgentObserver remoteChannelAgentObserver = new RemoteChannelAgentObserver(list);
        invocationService.execute(new RemoteClusterEventChannel.RemoteDistributionAgent(this.distributorIdentifier, this.controllerIdentifier, list, this.remoteChannelBuilder, this.resolver), Collections.singleton(member), remoteChannelAgentObserver);
        return remoteChannelAgentObserver;
    }

    public void incrementCompleteCount(int i) {
        this.completedCount.addAndGet(i);
    }

    public int getCompletedCount() {
        return this.completedCount.get();
    }
}
