package com.tangosol.internal.net.topic.impl.paged.agent;

import com.oracle.coherence.common.base.Logger;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriber;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberInfo;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.partition.PartitionSet;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.UUID;
import com.tangosol.util.filter.AlwaysFilter;
import com.tangosol.util.filter.PartitionedFilter;
import com.tangosol.util.processor.AbstractEvolvableProcessor;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/agent/CleanupSubscribers.class */
/**
 * An entry processor that removes {@link SubscriberInfo} entries for subscribers
 * that are no longer considered alive.
 *
 * <p>A subscriber entry is removed when any of the following holds (see
 * {@link #process(InvocableMap.Entry, Map)}):
 * <ul>
 *   <li>the member id encoded in the subscriber id is not among the current service members;</li>
 *   <li>the entry records an owning UUID that differs from that member's UUID;</li>
 *   <li>the member's timestamp is later than the subscriber's last heartbeat.</li>
 * </ul>
 *
 * <p>The processor carries no state, so its POF read/write methods are empty.
 */
public class CleanupSubscribers extends AbstractEvolvableProcessor<SubscriberInfo.Key, SubscriberInfo, Boolean> {
    /**
     * Runs the clean-up for every topic of the given service, using the partitions
     * owned by the local member.
     *
     * @param distributedCacheService the cache service whose topics should be cleaned up
     */
    public void execute(DistributedCacheService distributedCacheService) {
        execute(distributedCacheService, null);
    }

    /**
     * Runs the clean-up for every topic of the given service.
     *
     * <p>The invocations are asynchronous; this method does not wait for them to
     * complete. Nothing is done if the service is not running. A failure while
     * handling one topic is logged and does not prevent the remaining topics from
     * being processed.
     *
     * @param distributedCacheService the cache service whose topics should be cleaned up
     * @param partitionSet            the partitions to target, or {@code null} to target the
     *                                partitions owned by the local member
     */
    public void execute(DistributedCacheService distributedCacheService, PartitionSet partitionSet) {
        if (!distributedCacheService.isRunning()) {
            return;
        }

        ClassLoader loader = distributedCacheService.getContextClassLoader();
        Enumeration<?> cacheNames = distributedCacheService.getCacheNames();

        // Several caches map to the same topic, so de-duplicate the topic names.
        Set<String> topicNames = new HashSet<>();
        while (cacheNames.hasMoreElements()) {
            topicNames.add(PagedTopicCaches.Names.getTopicName((String) cacheNames.nextElement()));
        }

        for (String topicName : topicNames) {
            try {
                // The service may have been stopped while earlier topics were processed.
                if (!distributedCacheService.isRunning()) {
                    return;
                }
                cleanupTopic(distributedCacheService, partitionSet, topicName, loader);
            } catch (Throwable t) {
                Logger.err("Caught exception cleaning up subscribers in topic " + topicName, t);
            }
        }
    }

    /**
     * Asynchronously invokes this processor against the subscribers cache of a single topic
     * and logs the outcome when the invocation completes.
     */
    private void cleanupTopic(DistributedCacheService service, PartitionSet partitionSet, String topicName,
            ClassLoader loader) {
        PartitionSet parts = partitionSet == null
                ? service.getOwnedPartitions(service.getCluster().getLocalMember())
                : partitionSet;
        PartitionedFilter<Object> filter = new PartitionedFilter<>(AlwaysFilter.INSTANCE(), parts);
        String cacheName = PagedTopicCaches.Names.SUBSCRIBERS.cacheNameForTopicName(topicName);

        @SuppressWarnings("unchecked")
        NamedCache<SubscriberInfo.Key, SubscriberInfo> cache = service.ensureCache(cacheName, loader);
        if (!cache.isActive()) {
            return;
        }

        cache.async().invokeAll(filter, this).handle((removed, error) -> {
            if (error != null) {
                // An error from a cache that is no longer active is not worth reporting.
                if (cache.isActive()) {
                    Logger.err("Caught exception cleaning up subscribers in topic " + topicName, error);
                }
            } else if (!removed.isEmpty()) {
                logRemovedSubscribers(topicName, removed.keySet());
            }
            return null;
        });
    }

    /**
     * Logs one info message per member id, listing the removed subscriber ids grouped by
     * subscriber group.
     */
    private static void logRemovedSubscribers(String topicName, Set<SubscriberInfo.Key> removedKeys) {
        Map<Integer, Map<SubscriberGroupId, List<Long>>> byMember = new HashMap<>();
        for (SubscriberInfo.Key key : removedKeys) {
            long subscriberId = key.getSubscriberId();
            byMember.computeIfAbsent(PagedTopicSubscriber.memberIdFromId(subscriberId), id -> new HashMap<>())
                    .computeIfAbsent(key.getGroupId(), group -> new ArrayList<>())
                    .add(subscriberId);
        }

        for (Map.Entry<Integer, Map<SubscriberGroupId, List<Long>>> memberEntry : byMember.entrySet()) {
            String groups = memberEntry.getValue().entrySet().stream()
                    .map(groupEntry -> "[Group='" + groupEntry.getKey().getGroupName()
                            + "' Subscribers=" + PagedTopicSubscriber.idToString(groupEntry.getValue()) + "]")
                    .collect(Collectors.joining(", "));
            Logger.info("Removed the following subscribers from topic " + topicName
                    + " due to departure of member " + memberEntry.getKey() + " " + groups);
        }
    }

    /**
     * Builds a map of the current service members keyed by member id, using the cache
     * service reachable from the given entry's backing map context.
     */
    private static Map<Integer, Member> serviceMembersById(
            InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo> entry) {
        DistributedCacheService service =
                (DistributedCacheService) entry.asBinaryEntry().getContext().getCacheService();
        @SuppressWarnings("unchecked")
        Set<Member> members = service.getInfo().getServiceMembers();
        return members.stream().collect(Collectors.toMap(Member::getId, member -> member));
    }

    /**
     * Processes all of the given entries, resolving the service member map once for the
     * whole set.
     *
     * @param set the entries to process
     * @return a map containing {@code true} for the key of every entry that was removed;
     *         entries that were kept are not included
     */
    @Override // com.tangosol.util.processor.AbstractEvolvableProcessor, com.tangosol.util.InvocableMap.EntryProcessor
    public Map<SubscriberInfo.Key, Boolean> processAll(Set<? extends InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo>> set) {
        Map<SubscriberInfo.Key, Boolean> removed = new HashMap<>();
        if (set.isEmpty()) {
            return removed;
        }

        // All entries belong to the same service, so any entry can supply the member map.
        Map<Integer, Member> members = serviceMembersById(set.iterator().next());
        for (InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo> entry : set) {
            if (process(entry, members)) {
                removed.put(entry.getKey(), Boolean.TRUE);
            }
        }
        return removed;
    }

    /**
     * Processes a single entry.
     *
     * @param entry the subscriber entry to check
     * @return {@code true} if the entry was removed, {@code false} if it was kept or not present
     */
    @Override // com.tangosol.util.processor.AbstractEvolvableProcessor, com.tangosol.util.InvocableMap.EntryProcessor
    public Boolean process(InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo> entry) {
        if (!entry.isPresent()) {
            return false;
        }
        return process(entry, serviceMembersById(entry));
    }

    @Override // com.tangosol.io.AbstractEvolvable, com.tangosol.io.Evolvable
    public int getImplVersion() {
        return 0;
    }

    /** No state to read. */
    @Override // com.tangosol.io.pof.PortableObject
    public void readExternal(PofReader pofReader) throws IOException {
    }

    /** No state to write. */
    @Override // com.tangosol.io.pof.PortableObject
    public void writeExternal(PofWriter pofWriter) throws IOException {
    }

    /**
     * Decides whether a subscriber entry should be removed and, if so, removes it.
     *
     * @param entry the subscriber entry to check
     * @param map   the current service members keyed by member id
     * @return {@code true} if the entry was removed, {@code false} if it was kept or not present
     */
    Boolean process(InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo> entry, Map<Integer, Member> map) {
        if (!entry.isPresent()) {
            return false;
        }

        long subscriberId = entry.getKey().getSubscriberId();
        SubscriberInfo info = entry.getValue();
        UUID owningUid = info.getOwningUid();
        Member member = map.get(PagedTopicSubscriber.memberIdFromId(subscriberId));

        boolean remove;
        if (member == null) {
            // The member encoded in the subscriber id is not a current service member.
            remove = true;
        } else if (owningUid != null && !member.getUuid().equals(owningUid)) {
            // The member id is in use, but by a member with a different UUID.
            remove = true;
        } else {
            // NOTE(review): presumably the member timestamp is its join time, making a heartbeat
            // older than it stale - confirm against Member.getTimestamp().
            remove = Instant.ofEpochMilli(member.getTimestamp())
                    .atZone(ZoneId.systemDefault())
                    .toLocalDateTime()
                    .isAfter(info.getLastHeartbeat());
        }

        if (remove) {
            entry.remove(false);
            return true;
        }
        return false;
    }
}
