package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.base.TimeHelper;
import com.oracle.coherence.common.util.Options;
import com.oracle.coherence.common.util.SafeClock;
import com.oracle.coherence.common.util.Sentry;
import com.tangosol.coherence.config.Config;
import com.tangosol.internal.net.DebouncedFlowControl;
import com.tangosol.internal.net.metrics.Meter;
import com.tangosol.internal.net.topic.impl.paged.BatchingOperationsQueue;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
import com.tangosol.internal.net.topic.impl.paged.agent.CloseSubscriptionProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.CommitProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.DestroySubscriptionProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.EvictSubscriber;
import com.tangosol.internal.net.topic.impl.paged.agent.HeadAdvancer;
import com.tangosol.internal.net.topic.impl.paged.agent.PollProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.SeekProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.SubscriberHeartbeatProcessor;
import com.tangosol.internal.net.topic.impl.paged.model.ContentKey;
import com.tangosol.internal.net.topic.impl.paged.model.NotificationKey;
import com.tangosol.internal.net.topic.impl.paged.model.Page;
import com.tangosol.internal.net.topic.impl.paged.model.PageElement;
import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
import com.tangosol.internal.net.topic.impl.paged.model.PagedTopicSubscription;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberInfo;
import com.tangosol.internal.net.topic.impl.paged.model.Subscription;
import com.tangosol.internal.sleepycat.je.utilint.DbLsn;
import com.tangosol.internal.util.Daemons;
import com.tangosol.io.Serializer;
import com.tangosol.net.Cluster;
import com.tangosol.net.FlowControl;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PagedTopicService;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.events.EventDispatcher;
import com.tangosol.net.events.EventDispatcherAwareInterceptor;
import com.tangosol.net.events.partition.cache.EntryEvent;
import com.tangosol.net.events.partition.cache.PartitionedCacheDispatcher;
import com.tangosol.net.management.MBeanHelper;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.TopicException;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.Base;
import com.tangosol.util.Binary;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.Filters;
import com.tangosol.util.Gate;
import com.tangosol.util.InvocableMapHelper;
import com.tangosol.util.Listeners;
import com.tangosol.util.LongArray;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.SparseArray;
import com.tangosol.util.TaskDaemon;
import com.tangosol.util.ThreadGateLite;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.aggregator.ComparableMin;
import com.tangosol.util.aggregator.GroupAggregator;
import com.tangosol.util.aggregator.LongMin;
import com.tangosol.util.extractor.ReflectionExtractor;
import com.tangosol.util.filter.InKeySetFilter;
import com.tangosol.util.listener.SimpleMapListener;
import java.io.PrintStream;
import java.lang.invoke.SerializedLambda;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber.class */
public class PagedTopicSubscriber<V> implements Subscriber<V>, AutoCloseable {
    // Subscriber lifecycle state codes. The values run 0..5 in the same order as the
    // display names in STATES below, so a code can be used as an index into that array.
    public static final int STATE_INITIAL = 0;
    public static final int STATE_CONNECTING = 1;
    public static final int STATE_CONNECTED = 2;
    public static final int STATE_DISCONNECTED = 3;
    public static final int STATE_CLOSING = 4;
    public static final int STATE_CLOSED = 5;
    // Human-readable names for the STATE_* codes above.
    // NOTE(review): "CLosing" looks like a typo for "Closing". It is a public runtime string,
    // so confirm nothing matches on the current spelling before correcting it.
    public static final String[] STATES = {"Initial", "Connecting", "Connected", "Disconnected", "CLosing", "Closed"};
    // Timeout in whole seconds, parsed from the "coherence.topic.subscriber.close.timeout"
    // config property (default "30s"); the parsed millisecond value is truncated to seconds.
    public static final long CLOSE_TIMEOUT_SECS = TimeUnit.MILLISECONDS.toSeconds(Base.parseTime(Config.getProperty("coherence.topic.subscriber.close.timeout", "30s"), 1000));
    // Timeout in whole seconds, parsed from the "coherence.topic.subscriber.init.timeout"
    // config property (default "30s"); the parsed millisecond value is truncated to seconds.
    public static final long INIT_TIMEOUT_SECS = TimeUnit.MILLISECONDS.toSeconds(Base.parseTime(Config.getProperty("coherence.topic.subscriber.init.timeout", "30s"), 1000));
    // ----- identity and configuration ---------------------------------------------------
    private final NamedTopic<?> f_topic;
    // Source of the topic name used in the deactivation log messages.
    protected final PagedTopicCaches m_caches;
    // When true, a missing channel allocation does not trigger a disconnect
    // (see ChannelListener.onChannelAllocation).
    protected final boolean f_fAnonymous;
    protected final SubscriberId f_id;
    protected final SubscriberInfo.Key f_key;
    protected final SubscriberHeartbeatProcessor f_heartbeatProcessor;
    protected final Filter<V> f_filter;
    protected final ValueExtractor<V, ?> f_extractor;
    // Used by CommittableElement to deserialize the binary page element.
    protected final Serializer f_serializer;
    protected final SubscriberGroupId f_subscriberGroupId;
    protected long m_subscriptionId;
    protected volatile long m_connectionTimestamp;
    protected final int f_nNotificationId;
    protected final Filter<Object> f_filterNotification;
    protected final boolean f_fCompleteOnEmpty;

    // ----- concurrency control and request queueing -------------------------------------
    private final Gate<?> f_gate;
    private final Gate<?> f_gateState;
    private volatile boolean m_fForceReconnect;
    protected final BatchingOperationsQueue<Request, ?> f_queueReceiveOrders;
    protected final DebouncedFlowControl f_backlog;

    // ----- channel state ----------------------------------------------------------------
    protected volatile PagedTopicChannel[] m_aChannel;
    protected volatile int[] m_aChannelOwned;
    protected volatile int m_nChannel;

    // ----- executors and listeners ------------------------------------------------------
    protected final TaskDaemon f_daemon;
    // Executor on which the deactivation listeners run closeInternal(true) asynchronously.
    private final Executor f_executor;
    // Daemon on which ChannelListener runs its channel (re)allocation callbacks.
    protected final TaskDaemon f_daemonChannels;
    private final Executor f_executorChannels;
    private final SimpleMapListener f_listenerNotification;
    protected final PagedTopicSubscriber<V>.ChannelListener m_listenerChannelAllocation;
    protected final Subscriber.ChannelOwnershipListener[] m_aChannelOwnershipListener;

    // ----- plain counters ---------------------------------------------------------------
    // NOTE(review): each counter is paired with a "...Last" field, presumably the value at the
    // previous sample; the code that maintains them is outside this view - confirm.
    protected long m_cPolls;
    protected long m_cPollsLast;
    protected long m_cValues;
    protected long m_cValuesLast;
    protected long m_cWait;
    protected long m_cWaitsLast;
    protected long m_cMisses;
    protected long m_cMissesLast;
    protected long m_cNotify;
    protected long m_cNotifyLast;

    // ----- remaining listeners, tasks and eagerly initialised state ---------------------
    protected final PagedTopicSubscriber<V>.GroupDeactivationListener m_listenerGroupDeactivation;
    private PagedTopicSubscriber<V>.CommittableElement m_elementEmpty;
    private final ReconnectTask f_taskReconnect;
    private final String f_sIdentifyingName;
    private final Lock f_receiveLock = new ReentrantLock();
    // Starts at 0, which is the value of STATE_INITIAL.
    private volatile int m_nState = 0;
    protected final ConcurrentLinkedDeque<PagedTopicSubscriber<V>.CommittableElement> m_queueValuesPrefetched = new ConcurrentLinkedDeque<>();
    protected final PagedTopicSubscriber<V>.DeactivationListener f_listenerDeactivation = new DeactivationListener();
    private final List<Runnable> f_listOnCloseActions = new ArrayList();
    private final Meter m_cReceived = new Meter();
    private final Meter m_cReceivedEmpty = new Meter();
    private final Meter m_cReceivedError = new Meter();
    private final Meter m_cSubscribe = new Meter();
    private final Meter m_cDisconnect = new Meter();
    private final Listeners f_stateListeners = new Listeners();
    private final LongAdder f_cReceiveRequests = new LongAdder();
    private final LongAdder f_cCancelled = new LongAdder();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$ChannelListener.class */
    /**
     * Subscription listener that reacts to updates and deletions of a single
     * {@link PagedTopicSubscription}, identified by key, and translates them into
     * channel ownership changes on the enclosing subscriber.
     */
    public class ChannelListener implements PagedTopicSubscription.Listener {
        /** Counted down the first time a non-empty channel allocation is applied; replaced by {@link #reset()}. */
        private CountDownLatch m_latch = new CountDownLatch(1);
        /** Identity of the subscriber whose channel allocation is tracked. */
        private final SubscriberId f_id;
        /** Key of the only subscription this listener reacts to. */
        private final PagedTopicSubscription.Key f_key;

        /**
         * Create a listener.
         *
         * @param subscriberId  the subscriber whose owned channels are looked up
         * @param key           the key of the subscription to react to
         */
        public ChannelListener(SubscriberId subscriberId, PagedTopicSubscription.Key key) {
            this.f_key = key;
            this.f_id = subscriberId;
        }

        /**
         * Schedule a channel allocation check on the channels daemon when the
         * updated subscription is the one this listener tracks.
         */
        @Override
        public void onUpdate(PagedTopicSubscription pagedTopicSubscription) {
            if (!Objects.equals(pagedTopicSubscription.getKey(), this.f_key)) {
                return;
            }
            PagedTopicSubscriber.this.f_daemonChannels.executeTask(() -> onChannelAllocation(pagedTopicSubscription));
        }

        /**
         * Schedule removal of all channel ownership on the channels daemon when the
         * deleted subscription is the one this listener tracks.
         */
        @Override
        public void onDelete(PagedTopicSubscription pagedTopicSubscription) {
            if (!Objects.equals(pagedTopicSubscription.getKey(), this.f_key)) {
                return;
            }
            PagedTopicSubscriber.this.f_daemonChannels.executeTask(
                    () -> PagedTopicSubscriber.this.updateChannelOwnership(PagedTopicSubscription.NO_CHANNELS, true));
        }

        /** Two listeners are equal when they share the same subscriber id and subscription key. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            ChannelListener that = (ChannelListener) obj;
            return Objects.equals(this.f_id, that.f_id) && Objects.equals(this.f_key, that.f_key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.f_id, this.f_key);
        }

        /**
         * If the latch has already been released, drop all channel ownership and
         * install a fresh latch; otherwise do nothing.
         */
        public void reset() {
            if (this.m_latch.getCount() != 0) {
                return;
            }
            PagedTopicSubscriber.this.updateChannelOwnership(PagedTopicSubscription.NO_CHANNELS, true);
            this.m_latch = new CountDownLatch(1);
        }

        /**
         * Apply the channel allocation carried by the given subscription.
         *
         * @param subscription  the updated subscription
         */
        private void onChannelAllocation(PagedTopicSubscription subscription) {
            PagedTopicSubscriber<V> subscriber = PagedTopicSubscriber.this;
            if (!subscriber.isActive()) {
                return;
            }

            SortedSet<Integer> setOwned = null;
            if (subscription.hasSubscriber(this.f_id)) {
                setOwned = subscription.getOwnedChannels(this.f_id);
            }

            if (setOwned == null) {
                // no allocation for this subscriber: a connected, non-anonymous subscriber is disconnected
                if (subscriber.isActive() && !subscriber.f_fAnonymous && subscriber.isConnected()) {
                    Logger.finest("Disconnecting Subscriber (null channel set) " + subscriber);
                    subscriber.updateChannelOwnership(PagedTopicSubscription.NO_CHANNELS, true);
                    subscriber.disconnectInternal(false);
                }
                return;
            }

            if (setOwned.isEmpty()) {
                // known to the subscription but owning nothing; the latch is left untouched
                subscriber.updateChannelOwnership(PagedTopicSubscription.NO_CHANNELS, true);
                return;
            }

            subscriber.updateChannelOwnership(setOwned, false);
            this.m_latch.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$CommittableElement.class */
    public class CommittableElement implements Subscriber.Element<V> {
        public static final int EMPTY = -1;
        private final PageElement<V> m_element;
        private final int f_nChannel;

        protected CommittableElement(Binary binary, int i) {
            this.m_element = PageElement.fromBinary(binary, PagedTopicSubscriber.this.f_serializer);
            this.f_nChannel = i;
        }

        PageElement<V> getElement() {
            return this.m_element;
        }

        @Override // com.tangosol.net.topic.Subscriber.Element
        public V getValue() {
            return this.m_element.getValue();
        }

        @Override // com.tangosol.net.topic.Subscriber.Element
        public Binary getBinaryValue() {
            return this.m_element.getBinaryValue();
        }

        @Override // com.tangosol.net.topic.Subscriber.Element
        public int getChannel() {
            return this.f_nChannel;
        }

        @Override // com.tangosol.net.topic.Subscriber.Element
        public Position getPosition() {
            return this.m_element.getPosition();
        }

        @Override // com.tangosol.net.topic.Subscriber.Element
        public Instant getTimestamp() {
            return this.m_element.getTimestamp();
        }

        @Override // com.tangosol.net.topic.Subscriber.Element
        public CompletableFuture<Subscriber.CommitResult> commitAsync() {
            try {
                return PagedTopicSubscriber.this.commitAsync(getChannel(), getPosition());
            } catch (Throwable th) {
                return CompletableFuture.completedFuture(new Subscriber.CommitResult(0, (Position) null, th));
            }
        }

        public boolean isEmpty() {
            return getChannel() == -1;
        }

        public String toString() {
            return "Element(channel=" + this.f_nChannel + ", position=" + String.valueOf(getPosition()) + ", timestamp=" + String.valueOf(getTimestamp()) + ", value=" + String.valueOf(getValue()) + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$DeactivationListener.class */
    /**
     * Listener on the topic caches: a disconnect of the caches disconnects the enclosing
     * subscriber, while destroy or release of the topic closes it asynchronously.
     */
    public class DeactivationListener implements PagedTopicCaches.Listener {
        protected DeactivationListener() {
        }

        /** Nothing to do on connect. */
        @Override
        public void onConnect() {
        }

        @Override
        public void onDisconnect() {
            PagedTopicSubscriber.this.disconnectInternal(false);
        }

        @Override
        public void onDestroy() {
            Logger.finest("Detected destroy of topic " + PagedTopicSubscriber.this.m_caches.getTopicName()
                    + ", closing subscriber " + PagedTopicSubscriber.this);
            closeSubscriberAsync();
        }

        @Override
        public void onRelease() {
            Logger.finest("Detected release of topic " + PagedTopicSubscriber.this.m_caches.getTopicName()
                    + ", closing subscriber " + PagedTopicSubscriber.this);
            closeSubscriberAsync();
        }

        /** Run {@code closeInternal(true)} for the enclosing subscriber on its executor. */
        private void closeSubscriberAsync() {
            CompletableFuture.runAsync(
                    () -> PagedTopicSubscriber.this.closeInternal(true),
                    PagedTopicSubscriber.this.f_executor);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$FlushMode.class */
    /**
     * Flush modes.
     * NOTE(review): the code that interprets these constants is outside this view; the
     * per-constant notes below are inferred from the names only and should be confirmed.
     */
    public enum FlushMode {
        // presumably a plain flush
        FLUSH,
        // presumably a flush performed because of a destroy - confirm
        FLUSH_DESTROY,
        // presumably a flush that completes outstanding work exceptionally on close - confirm
        FLUSH_CLOSE_EXCEPTIONALLY
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$FunctionalRequest.class */
    public static abstract class FunctionalRequest implements Request {
        protected FunctionalRequest() {
        }

        protected abstract void execute(PagedTopicSubscriber<?> pagedTopicSubscriber, BatchingOperationsQueue<Request, ?> batchingOperationsQueue);

        protected void onRequestComplete(Object obj) {
        }

        protected Throwable onRequestError(Throwable th, Object obj) {
            return new TopicException(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$GetPositionRequest.class */
    /**
     * Request that looks up per-channel positions (heads, tails or last committed,
     * depending on the {@link PositionType}) and completes a queue element with them.
     */
    public static class GetPositionRequest extends FunctionalRequest {
        /** Which kind of position to look up. */
        private final PositionType f_type;

        /**
         * @param positionType  the kind of position this request retrieves
         */
        public GetPositionRequest(PositionType positionType) {
            f_type = positionType;
        }

        /**
         * Look up the positions for this request's type and complete the queue element
         * with them, using {@link #onRequestComplete} as the completion callback.
         */
        @Override
        protected void execute(PagedTopicSubscriber<?> pagedTopicSubscriber, BatchingOperationsQueue<Request, ?> batchingOperationsQueue) {
            batchingOperationsQueue.completeElement(lookupPositions(pagedTopicSubscriber), this::onRequestComplete);
        }

        /**
         * Fetch the positions matching {@link #f_type} from the subscriber.
         *
         * @throws IllegalStateException if the type is not one of the known constants
         */
        private Map<Integer, Position> lookupPositions(PagedTopicSubscriber<?> subscriber) {
            switch (f_type) {
                case Head:
                    return subscriber.getHeadsInternal();
                case Tail:
                    return subscriber.getTailsInternal();
                case Committed:
                    return subscriber.getLastCommittedInternal();
                default:
                    throw new IllegalStateException("Unexpected value: " + f_type);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$GroupDeactivationListener.class */
    /**
     * Map listener that closes the enclosing subscriber asynchronously when the
     * observed entry is deleted while the subscriber is still active.
     */
    public class GroupDeactivationListener extends AbstractMapListener {
        protected GroupDeactivationListener() {
        }

        @Override
        public void entryDeleted(MapEvent mapEvent) {
            PagedTopicSubscriber<V> subscriber = PagedTopicSubscriber.this;
            if (!subscriber.isActive()) {
                // nothing to close
                return;
            }
            Logger.finest("Detected removal of subscriber group " + subscriber.f_subscriberGroupId.getGroupName()
                    + ", closing subscriber " + subscriber);
            CompletableFuture.runAsync(() -> subscriber.closeInternal(true), subscriber.f_executor);
        }
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$PagedTopicChannel.class */
    /**
     * Per-channel state held by a subscriber: ownership and emptiness flags, the head
     * page/next index, and poll/receive/commit statistics exposed through
     * {@link Subscriber.Channel}.
     * <p>
     * The empty flag and the version counter are updated together under an internal lock
     * (see {@link #setEmpty(long)} and {@link #setPopulated()}); the remaining plain fields
     * are not synchronized by this class.
     */
    public static class PagedTopicChannel implements Subscriber.Channel {
        /** Value of {@link #m_lHead} when the head page is not known. */
        public static final int HEAD_UNKNOWN = -1;
        /** True when the channel has been marked empty at the current version. */
        volatile boolean m_fEmpty;
        /** Key supplying this channel's id. */
        Subscription.Key subscriberPartitionSync;
        boolean m_fContended;
        /** Lazily built by {@link #ensureSubscriptionKeys(int, SubscriberGroupId)}. */
        Set<Subscription.Key> m_setSubscriptionKeys;
        PagedPosition m_lastReceived;
        long m_cPolls;
        PagedPosition m_firstPolled;
        long m_firstPolledTimestamp;
        PagedPosition m_lastPolled;
        long m_lastPolledTimestamp;
        PagedPosition m_lastCommit;
        long m_cCommited;
        boolean m_fPolled;
        boolean m_fHit;
        /** Head page, or {@link #HEAD_UNKNOWN}. */
        volatile long m_lHead = -1;
        /** Incremented every time the channel is marked populated. */
        AtomicLong m_lVersion = new AtomicLong();
        /** Number of populated notifications received. */
        AtomicLong m_cNotify = new AtomicLong();
        /** Second component of the head position (see {@link #getHead()}). */
        int m_nNext = -1;
        volatile boolean m_fOwned = true;
        /** Meter marked once per received element. */
        Meter m_cReceived = new Meter();
        /** Guards the combined update of {@link #m_fEmpty} and {@link #m_lVersion}. */
        private final Lock m_lock = new ReentrantLock();

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public int getId() {
            return this.subscriberPartitionSync.getChannelId();
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public PagedPosition getLastCommit() {
            return this.m_lastCommit;
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public long getCommitCount() {
            return this.m_cCommited;
        }

        /**
         * Record a commit: remember the position and bump the commit count.
         *
         * @param pagedPosition  the committed position
         */
        public void committed(PagedPosition pagedPosition) {
            this.m_lastCommit = pagedPosition;
            this.m_cCommited++;
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public PagedPosition getLastReceived() {
            return this.m_lastReceived;
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public long getReceiveCount() {
            return this.m_cReceived.getCount();
        }

        /**
         * Record a received element: remember the position and mark the receive meter.
         *
         * @param pagedPosition  the position of the received element
         */
        public void received(PagedPosition pagedPosition) {
            this.m_lastReceived = pagedPosition;
            this.m_cReceived.mark();
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public long getPolls() {
            return this.m_cPolls;
        }

        /**
         * Add the given delta to the poll count.
         *
         * @param j  the amount to add (may be negative)
         */
        public void adjustPolls(long j) {
            this.m_cPolls += j;
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public Position getFirstPolled() {
            return this.m_firstPolled;
        }

        /**
         * Record the first polled position and timestamp; only the first call that finds
         * no position recorded yet has any effect.
         *
         * @param pagedPosition  the polled position
         * @param j              the timestamp to record with it
         */
        public void setFirstPolled(PagedPosition pagedPosition, long j) {
            if (this.m_firstPolled == null) {
                this.m_firstPolled = pagedPosition;
                this.m_firstPolledTimestamp = j;
            }
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public long getFirstPolledTimestamp() {
            return this.m_firstPolledTimestamp;
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public Position getLastPolled() {
            return this.m_lastPolled;
        }

        /**
         * Record the most recently polled position and timestamp.
         *
         * @param pagedPosition  the polled position
         * @param j              the timestamp to record with it
         */
        public void setLastPolled(PagedPosition pagedPosition, long j) {
            this.m_lastPolled = pagedPosition;
            this.m_lastPolledTimestamp = j;
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public long getLastPolledTimestamp() {
            return this.m_lastPolledTimestamp;
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public boolean isEmpty() {
            return this.m_fEmpty;
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public boolean isOwned() {
            return this.m_fOwned;
        }

        /** @return 1 when the channel is owned, otherwise 0 */
        @Override // com.tangosol.net.topic.Subscriber.Channel
        public int getOwnedCode() {
            return isOwned() ? 1 : 0;
        }

        /**
         * @return {@link PagedPosition#NULL_POSITION} when the head is unknown, otherwise a
         *         position built from the head page and the next index
         */
        @Override // com.tangosol.net.topic.Subscriber.Channel
        public PagedPosition getHead() {
            // read the volatile head once so the check and the returned position agree
            long lHead = this.m_lHead;
            return lHead == HEAD_UNKNOWN ? PagedPosition.NULL_POSITION : new PagedPosition(lHead, this.m_nNext);
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public long getReceived() {
            return this.m_cReceived.getCount();
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public double getReceivedMeanRate() {
            return this.m_cReceived.getMeanRate();
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public double getReceivedOneMinuteRate() {
            return this.m_cReceived.getOneMinuteRate();
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public double getReceivedFiveMinuteRate() {
            return this.m_cReceived.getFiveMinuteRate();
        }

        @Override // com.tangosol.net.topic.Subscriber.Channel
        public double getReceivedFifteenMinuteRate() {
            return this.m_cReceived.getFifteenMinuteRate();
        }

        /**
         * Mark the channel empty, but only if its version still equals the given one;
         * a populate that happened in between (which bumps the version) wins.
         *
         * @param j  the version observed by the caller
         */
        protected void setEmpty(long j) {
            this.m_lock.lock();
            try {
                if (this.m_lVersion.get() == j) {
                    this.m_fEmpty = true;
                }
            } finally {
                this.m_lock.unlock();
            }
        }

        /** @return the current version, read under the lock */
        protected long getVersion() {
            this.m_lock.lock();
            try {
                return this.m_lVersion.get();
            } finally {
                this.m_lock.unlock();
            }
        }

        protected void setOwned() {
            this.m_fOwned = true;
        }

        protected void setUnowned() {
            this.m_fOwned = false;
        }

        /** Count a populated notification and mark the channel populated. */
        protected void onChannelPopulatedNotification() {
            this.m_cNotify.incrementAndGet();
            setPopulated();
        }

        /** Bump the version and clear the empty flag, atomically with respect to {@link #setEmpty(long)}. */
        protected void setPopulated() {
            this.m_lock.lock();
            try {
                this.m_lVersion.incrementAndGet();
                this.m_fEmpty = false;
            } finally {
                this.m_lock.unlock();
            }
        }

        /** @return the number of populated notifications counted so far */
        public long getNotify() {
            return this.m_cNotify.get();
        }

        public void setPolled() {
            this.m_fPolled = true;
        }

        public void clearPolled() {
            this.m_fPolled = false;
        }

        public boolean isPolled() {
            return this.m_fPolled;
        }

        public void setHit() {
            this.m_fHit = true;
        }

        public void clearHit() {
            this.m_fHit = false;
        }

        public boolean isHit() {
            return this.m_fHit;
        }

        /**
         * Return the set of subscription keys for this channel, building it on first use
         * with one key for each index in {@code [0, i)}. Later calls return the cached set
         * regardless of the arguments.
         *
         * @param i                  the number of keys to create on first use
         * @param subscriberGroupId  the subscriber group the keys belong to
         *
         * @return the (cached) key set
         */
        protected Set<Subscription.Key> ensureSubscriptionKeys(int i, SubscriberGroupId subscriberGroupId) {
            if (this.m_setSubscriptionKeys == null) {
                int channelId = this.subscriberPartitionSync.getChannelId();
                Set<Subscription.Key> setKeys = new HashSet<>();
                for (int i2 = 0; i2 < i; i2++) {
                    setKeys.add(new Subscription.Key(i2, channelId, subscriberGroupId));
                }
                this.m_setSubscriptionKeys = setKeys;
            }
            return this.m_setSubscriptionKeys;
        }

        /**
         * Render the channel state for diagnostics. Each label is followed by the field it
         * names.
         */
        @Override
        public String toString() {
            return "Channel=" + this.subscriberPartitionSync.getChannelId()
                    + ", owned=" + this.m_fOwned
                    + ", empty=" + this.m_fEmpty
                    + ", version=" + this.m_lVersion.get()
                    + ", head=" + this.m_lHead
                    + ", next=" + this.m_nNext
                    + ", polls=" + this.m_cPolls
                    + ", received=" + this.m_cReceived.getCount()
                    + ", committed=" + this.m_cCommited
                    + ", first=" + this.m_firstPolled
                    + ", firstTimestamp=" + this.m_firstPolledTimestamp
                    + ", last=" + this.m_lastPolled
                    + ", lastTimestamp=" + this.m_lastPolledTimestamp
                    + ", contended=" + this.m_fContended;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$PositionType.class */
    /**
     * The kind of per-channel position a GetPositionRequest retrieves; each constant
     * selects a different subscriber lookup in GetPositionRequest.execute.
     */
    public enum PositionType {
        // resolved via getHeadsInternal()
        Head,
        // resolved via getTailsInternal()
        Tail,
        // resolved via getLastCommittedInternal()
        Committed
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$ReceiveRequest.class */
    /**
     * Immutable request to receive elements, described by a batch flag and an element count.
     */
    public static class ReceiveRequest implements Request {
        /** Shared non-batch request for exactly one element. */
        public static final ReceiveRequest SINGLE = new ReceiveRequest(false, 1);
        /** Whether this is a batch request. */
        private final boolean f_fBatch;
        /** The element count carried by this request. */
        private final int f_cElement;

        /**
         * @param fBatch    true for a batch request
         * @param cElement  the element count
         */
        protected ReceiveRequest(boolean fBatch, int cElement) {
            f_cElement = cElement;
            f_fBatch = fBatch;
        }

        /** @return true when this is a batch request */
        public boolean isBatch() {
            return f_fBatch;
        }

        /** @return the element count carried by this request */
        public int getElementCount() {
            return f_cElement;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$ReconnectTask.class */
    /**
     * Runnable that asks a subscriber to reconnect and counts the runs that returned normally.
     */
    public static class ReconnectTask implements Runnable {
        /** The subscriber to reconnect. */
        private final PagedTopicSubscriber<?> m_subscriber;
        /** Number of completed {@link #run()} invocations. */
        private final AtomicInteger f_cExecution = new AtomicInteger();

        /**
         * @param subscriber  the subscriber this task reconnects
         */
        protected ReconnectTask(PagedTopicSubscriber<?> subscriber) {
            m_subscriber = subscriber;
        }

        /**
         * Reconnect the subscriber, then bump the execution count. The count is not
         * incremented when the reconnect throws.
         */
        @Override
        public void run() {
            m_subscriber.reconnectInternal();
            f_cExecution.incrementAndGet();
        }

        /** @return how many times {@link #run()} has completed */
        public int getExecutionCount() {
            return f_cExecution.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$Request.class */
    /**
     * Marker interface (no methods) for the request types defined in this file:
     * ReceiveRequest and FunctionalRequest implement it, and it is the element type of the
     * subscriber's BatchingOperationsQueue.
     */
    public interface Request {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$SeekRequest.class */
    /**
     * Request to reposition a subscriber: to the head or tail of the given channels, to
     * explicit per-channel positions, or to an instant in time.
     */
    public static class SeekRequest extends FunctionalRequest {
        /** The kind of seek. */
        protected final SeekType m_type;
        /** Target positions keyed by channel; required for {@link SeekType#Position}. */
        protected final Map<Integer, Position> m_mapPosition;
        /** Target instant; required for {@link SeekType#Instant}. */
        protected final Instant m_instant;
        /** Channels to seek; for {@link SeekType#Position} these are the keys of the position map. */
        protected final int[] m_anChannel;

        /**
         * Create a request with no positions and no instant.
         *
         * @param seekType  the kind of seek
         * @param iArr      the channels to seek
         *
         * @throws IllegalArgumentException if the type is {@link SeekType#Position} or
         *         {@link SeekType#Instant}, which need data this constructor cannot supply
         */
        public SeekRequest(SeekType seekType, int... iArr) {
            this(seekType, null, null, iArr);
        }

        /**
         * Create a {@link SeekType#Position} request; the channels are taken from the map keys.
         *
         * @param map  the target positions keyed by channel
         *
         * @throws IllegalArgumentException if {@code map} is null
         */
        public SeekRequest(Map<Integer, Position> map) {
            this(SeekType.Position, map, null, new int[0]);
        }

        /**
         * Create a {@link SeekType#Instant} request.
         *
         * @param instant  the instant to seek to
         * @param iArr     the channels to seek
         *
         * @throws IllegalArgumentException if {@code instant} is null
         */
        public SeekRequest(Instant instant, int... iArr) {
            this(SeekType.Instant, null, instant, iArr);
        }

        /**
         * Validate the arguments required by the seek type and store them.
         *
         * @throws IllegalArgumentException if a position map or instant required by the type is missing
         * @throws NullPointerException     if {@code seekType} is null
         */
        private SeekRequest(SeekType seekType, Map<Integer, Position> map, Instant instant, int... iArr) {
            // dispatch on the enum constants themselves, not on ordinal(), so that
            // reordering SeekType cannot silently change which validation runs
            switch (seekType) {
                case Position:
                    if (map == null) {
                        throw new IllegalArgumentException("Seek request of type " + String.valueOf(seekType) + " require a position");
                    }
                    // the channels are dictated by the positions supplied
                    iArr = map.keySet().stream().mapToInt(Integer::intValue).toArray();
                    break;
                case Instant:
                    if (instant == null) {
                        throw new IllegalArgumentException("Seek request of type " + String.valueOf(seekType) + " require an instant");
                    }
                    break;
                default:
                    // Head and Tail need no extra data
                    break;
            }
            this.m_type = seekType;
            this.m_mapPosition = map;
            this.m_instant = instant;
            this.m_anChannel = iArr;
        }

        /**
         * Perform the seek on the subscriber and complete the queue element with its result,
         * using {@link #onRequestComplete} as the completion callback.
         */
        @Override // com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriber.FunctionalRequest
        protected void execute(PagedTopicSubscriber<?> pagedTopicSubscriber, BatchingOperationsQueue<Request, ?> batchingOperationsQueue) {
            batchingOperationsQueue.completeElement(pagedTopicSubscriber.seekInternal(this), this::onRequestComplete);
        }

        /** @return the kind of seek */
        public SeekType getType() {
            return this.m_type;
        }

        /** @return the target positions, or null when the request was not created with any */
        public Map<Integer, Position> getPositions() {
            return this.m_mapPosition;
        }

        /** @return the target instant, or null when the request was not created with one */
        public Instant getInstant() {
            return this.m_instant;
        }

        /** @return the channels to seek (the internal array, not a copy) */
        public int[] getChannels() {
            return this.m_anChannel;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$SeekType.class */
    /**
     * The kinds of seek a {@link SeekRequest} can carry.
     */
    public enum SeekType {
        /** Seek to the head of the channel(s); used by {@code seekToHead}. */
        Head,
        /** Seek to the tail of the channel(s); used by {@code seekToTail}. */
        Tail,
        /** Seek to explicit per-channel positions; the request must carry a position map. */
        Position,
        /** Seek to a point in time; the request must carry an instant. */
        Instant
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$StateListener.class */
    /**
     * A listener that is told when a {@link PagedTopicSubscriber} changes state.
     */
    public interface StateListener extends EventListener {
        /**
         * Called on a state change of the given subscriber.
         *
         * @param pagedTopicSubscriber  the subscriber whose state changed
         * @param i                     a state value (presumably the new state; TODO confirm against the caller)
         * @param i2                    a state value (presumably the previous state; TODO confirm against the caller)
         */
        void onStateChange(PagedTopicSubscriber<?> pagedTopicSubscriber, int i, int i2);
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$TimeoutInterceptor.class */
    /**
     * An interceptor that reacts to entries being removed from the topic's
     * subscribers cache and, when appropriate, notifies that the removed
     * subscriber is closed. The work is handed off to a dedicated thread so
     * that it does not run on the thread delivering the event.
     */
    public static class TimeoutInterceptor implements EventDispatcherAwareInterceptor<EntryEvent<SubscriberInfo.Key, SubscriberInfo>> {
        /** Counter used to give each interceptor's worker thread a distinct name. */
        private static final AtomicInteger f_instance = new AtomicInteger();

        /** The single-threaded executor that processes subscriber removals. */
        private final Executor f_executor = Executors.newSingleThreadScheduledExecutor(task ->
                Base.makeThread(null, task, "PagedTopic:SubscriberTimeoutInterceptor:" + f_instance.incrementAndGet()));

        /**
         * Register this interceptor for REMOVED events, but only on the
         * dispatcher of a subscribers cache.
         */
        @Override
        public void introduceEventDispatcher(String str, EventDispatcher eventDispatcher) {
            if (!(eventDispatcher instanceof PartitionedCacheDispatcher)) {
                return;
            }
            String cacheName = ((PartitionedCacheDispatcher) eventDispatcher).getCacheName();
            if (PagedTopicCaches.Names.SUBSCRIBERS.equals(PagedTopicCaches.Names.fromCacheName(cacheName))) {
                eventDispatcher.addEventInterceptor(str, this, Collections.singleton(EntryEvent.Type.REMOVED), true);
            }
        }

        /**
         * Log the removal and schedule the clean-up on the worker thread.
         * Events of any type other than REMOVED are ignored.
         */
        @Override
        public void onEvent(EntryEvent<SubscriberInfo.Key, SubscriberInfo> entryEvent) {
            if (entryEvent.getType() != EntryEvent.Type.REMOVED) {
                return;
            }
            SubscriberInfo.Key removedKey = entryEvent.getKey();
            String message = String.format("Cleaning up subscriber %d in group '%s' owned by member %d",
                    Long.valueOf(removedKey.getSubscriberId()),
                    removedKey.getGroupId().getGroupName(),
                    Integer.valueOf(PagedTopicSubscriber.memberIdFromId(removedKey.getSubscriberId())));
            Logger.finest(message);
            this.f_executor.execute(() -> processSubscriberRemoval(entryEvent));
        }

        /**
         * Log why the subscriber was removed and, unless the subscription shows
         * a later connection by the same subscriber, notify that it is closed.
         *
         * @param entryEvent  the REMOVED event for the subscriber's info entry
         */
        private void processSubscriberRemoval(EntryEvent<SubscriberInfo.Key, SubscriberInfo> entryEvent) {
            SubscriberInfo.Key removedKey = entryEvent.getKey();
            SubscriberInfo removedInfo = entryEvent.getOriginalValue();
            long nSubscriber = removedKey.getSubscriberId();
            SubscriberGroupId group = removedKey.getGroupId();
            String sTopic = PagedTopicCaches.Names.getTopicName(entryEvent.getCacheName());
            String sSubscriptionsCache = PagedTopicCaches.Names.SUBSCRIPTIONS.cacheNameForTopicName(sTopic);
            PagedTopicService service = (PagedTopicService) entryEvent.getService();
            int nOwnerMember = PagedTopicSubscriber.memberIdFromId(nSubscriber);
            long ldtConnected = removedInfo.getConnectionTimestamp();

            if (entryEvent.getEntry().isSynthetic()) {
                // a synthetic removal is reported as an expiry
                Logger.finest(String.format("Subscriber expired after %d ms - groupId='%s', memberId=%d, notificationId=%d, last heartbeat at %s",
                        Long.valueOf(removedInfo.getTimeoutMillis()),
                        group.getGroupName(),
                        Integer.valueOf(nOwnerMember),
                        Integer.valueOf(PagedTopicSubscriber.notificationIdFromId(nSubscriber)),
                        removedInfo.getLastHeartbeat()));
            } else {
                // if the owning member is still in the service the removal was explicit
                boolean fOwnerPresent = service.getInfo().getServiceMembers().stream().anyMatch(member -> member.getId() == nOwnerMember);
                String sReason = fOwnerPresent ? "manual removal of subscriber(s)" : "departure of member " + nOwnerMember;
                Logger.finest(String.format("Subscriber %d in group '%s' removed due to %s", Long.valueOf(nSubscriber), group.getGroupName(), sReason));
            }

            SubscriberId removedId = new SubscriberId(nSubscriber, removedInfo.getOwningUid());
            long lSubscription = service.getSubscriptionId(sTopic, group);
            PagedTopicSubscription subscription = service.getSubscription(lSubscription);
            if (subscription != null && subscription.getSubscriberTimestamp(removedId) > ldtConnected) {
                // the subscription records a newer connection timestamp for this subscriber; nothing to close
                return;
            }
            PagedTopicSubscriber.notifyClosed(service.ensureCache(sSubscriptionsCache, null), group, lSubscription, removedId);
        }
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$WithIdentifyingName.class */
    /**
     * A subscriber option carrying an identifying name. The subscriber's
     * constructor reads it into the value returned by {@code getIdentifyingName()}.
     */
    public static class WithIdentifyingName implements Subscriber.Option {
        /** The identifying name. */
        private final String f_sName;

        /**
         * @param str  the identifying name (stored as-is, not validated)
         */
        public WithIdentifyingName(String str) {
            this.f_sName = str;
        }

        /** @return the identifying name passed to the constructor */
        public String getName() {
            return this.f_sName;
        }
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber$WithNotificationId.class */
    /**
     * A subscriber option that supplies a custom notification id. When present,
     * the subscriber's constructor uses it instead of an identity hash code and
     * logs a warning.
     */
    public interface WithNotificationId<V, U> extends Subscriber.Option<V, U> {
        /** @return the notification id the subscriber should use */
        int getId();
    }

    /**
     * Create a subscriber on the given topic. Besides initialising fields, the
     * constructor starts two daemons, registers listeners and the MBean, and
     * finishes by calling {@code ensureConnected()}.
     *
     * @param namedTopic        the topic being subscribed to; must not be {@code null}
     * @param pagedTopicCaches  the caches backing the topic; must not be {@code null}
     * @param optionArr         the subscriber options
     *
     * @throws NullPointerException if the topic or the caches are {@code null}
     */
    public <T> PagedTopicSubscriber(NamedTopic<?> namedTopic, PagedTopicCaches pagedTopicCaches, Subscriber.Option<? super T, V>... optionArr) {
        this.f_topic = (NamedTopic) Objects.requireNonNull(namedTopic);
        this.m_caches = (PagedTopicCaches) Objects.requireNonNull(pagedTopicCaches, "The TopicCaches parameter cannot be null");
        Options from = Options.from(Subscriber.Option.class, optionArr);
        // the subscriber is anonymous when no Name option (or a null name) is supplied
        Subscriber.Name name = (Subscriber.Name) from.get(Subscriber.Name.class, null);
        String name2 = name == null ? null : name.getName();
        this.f_fAnonymous = name2 == null;
        this.m_listenerGroupDeactivation = new GroupDeactivationListener();
        this.f_serializer = this.m_caches.getSerializer();
        this.f_listenerNotification = new SimpleMapListener().addDeleteHandler(this::onChannelPopulatedNotification);
        this.f_gate = new ThreadGateLite();
        this.f_gateState = new ThreadGateLite();
        this.m_aChannelOwnershipListener = (Subscriber.ChannelOwnershipListener[]) ((Subscriber.ChannelOwnershipListeners) from.get(Subscriber.ChannelOwnershipListeners.class, Subscriber.ChannelOwnershipListeners.none())).getListeners().toArray(new Subscriber.ChannelOwnershipListener[0]);
        Cluster cluster = this.m_caches.getService().getCluster();
        Member localMember = cluster.getLocalMember();
        // z records whether a custom notification id was supplied (warned about below)
        boolean z = false;
        WithNotificationId withNotificationId = (WithNotificationId) from.get(WithNotificationId.class);
        if (withNotificationId == null) {
            this.f_nNotificationId = System.identityHashCode(this);
        } else {
            this.f_nNotificationId = withNotificationId.getId();
            z = true;
        }
        this.f_fCompleteOnEmpty = from.contains(Subscriber.CompleteOnEmpty.class);
        this.f_filterNotification = new InKeySetFilter(null, this.m_caches.getPartitionNotifierSet(this.f_nNotificationId));
        // the subscriber id combines the notification id with the local member's id and UUID
        this.f_id = new SubscriberId(this.f_nNotificationId, localMember.getId(), localMember.getUuid());
        this.f_subscriberGroupId = this.f_fAnonymous ? SubscriberGroupId.anonymous() : SubscriberGroupId.withName(name2);
        this.f_key = new SubscriberInfo.Key(this.f_subscriberGroupId, this.f_id.getId());
        this.f_heartbeatProcessor = new SubscriberHeartbeatProcessor();
        this.m_listenerChannelAllocation = new ChannelListener(this.f_id, new PagedTopicSubscription.Key(this.f_topic.getName(), this.f_subscriberGroupId));
        // optional filter and converter; both stay null when the option is absent
        Subscriber.Filtered filtered = (Subscriber.Filtered) from.get(Subscriber.Filtered.class);
        this.f_filter = filtered == null ? null : filtered.getFilter();
        Subscriber.Convert convert = (Subscriber.Convert) from.get(Subscriber.Convert.class);
        this.f_extractor = convert == null ? null : convert.getExtractor();
        this.f_taskReconnect = new ReconnectTask(this);
        // three daemons are created, named after the topic and subscriber id
        this.f_daemon = new TaskDaemon("PagedTopic:Subscriber:" + this.m_caches.getTopicName() + ":" + this.f_id.getId());
        this.f_executor = new TaskDaemon("PagedTopic:Subscriber:" + this.m_caches.getTopicName() + ":Receive:" + this.f_id.getId());
        this.f_daemonChannels = new TaskDaemon("PagedTopic:Subscriber:" + this.m_caches.getTopicName() + ":Channels:" + this.f_id.getId());
        TaskDaemon taskDaemon = this.f_daemonChannels;
        Objects.requireNonNull(taskDaemon);
        this.f_executorChannels = taskDaemon::executeTask;
        // only f_daemon and f_daemonChannels are started here
        this.f_daemon.start();
        this.f_daemonChannels.start();
        // backlog flow control is sized from the cluster's publisher clogged count;
        // the first argument is two thirds of that count
        long publisherCloggedCount = cluster.getDependencies().getPublisherCloggedCount();
        this.f_backlog = new DebouncedFlowControl((publisherCloggedCount * 2) / 3, publisherCloggedCount);
        // the request queue calls back into trigger(int) and runs on f_daemon
        this.f_queueReceiveOrders = new BatchingOperationsQueue<>((Consumer<Integer>) (v1) -> {
            trigger(v1);
        }, 1, this.f_backlog, request -> {
            return 1L;
        }, BatchingOperationsQueue.Executor.fromTaskDaemon(this.f_daemon));
        this.m_aChannel = initializeChannels(this.m_caches, this.m_caches.getChannelCount(), this.f_subscriberGroupId);
        WithIdentifyingName withIdentifyingName = (WithIdentifyingName) from.get(WithIdentifyingName.class);
        this.f_sIdentifyingName = withIdentifyingName == null ? null : withIdentifyingName.getName();
        registerChannelAllocationListener();
        registerDeactivationListener();
        registerMBean();
        if (z) {
            Logger.warn("Subscriber " + String.valueOf(this.f_id) + " is being created with a custom notification id " + this.f_nNotificationId);
        }
        ensureConnected();
    }

    /** @return the numeric id held by this subscriber's {@link SubscriberId} */
    public long getId() {
        return this.f_id.getId();
    }

    /** @return this subscriber's {@link SubscriberId} */
    public SubscriberId getSubscriberId() {
        return this.f_id;
    }

    /** @return the id of the subscription this subscriber is using (the current value of {@code m_subscriptionId}) */
    public long getSubscriptionId() {
        return this.m_subscriptionId;
    }

    /** @return the identifying name supplied via {@link WithIdentifyingName}, or {@code null} if none was supplied */
    public String getIdentifyingName() {
        return this.f_sIdentifyingName;
    }

    /** @return this subscriber's notification id */
    public int getNotificationId() {
        return this.f_nNotificationId;
    }

    /** @return the {@link SubscriberInfo.Key} built from this subscriber's group id and subscriber id */
    public SubscriberInfo.Key getKey() {
        return this.f_key;
    }

    /** @return {@code true} if this subscriber was created without a subscriber group name */
    public boolean isAnonymous() {
        return this.f_fAnonymous;
    }

    /** @return the current backlog reported by this subscriber's flow control */
    public long getBacklog() {
        return this.f_backlog.getBacklog();
    }

    /** @return the excessive limit of this subscriber's backlog flow control */
    public long getMaxBacklog() {
        return this.f_backlog.getExcessiveLimit();
    }

    /** @return the filter supplied via the {@code Filtered} option, or {@code null} if none was supplied */
    public Filter<V> getFilter() {
        return this.f_filter;
    }

    /** @return the extractor supplied via the {@code Convert} option, or {@code null} if none was supplied */
    public ValueExtractor<V, ?> getConverter() {
        return this.f_extractor;
    }

    /** @return the serializer obtained from the topic caches when this subscriber was constructed */
    public Serializer getSerializer() {
        return this.f_serializer;
    }

    /** @return {@code true} if the {@code CompleteOnEmpty} option was supplied at construction */
    public boolean isCompleteOnEmpty() {
        return this.f_fCompleteOnEmpty;
    }

    /**
     * Print the owned channel list followed by one line per channel, flagging
     * the channel that is currently selected. Runs inside the subscriber gate.
     *
     * @param printStream  the stream to print to
     */
    public void printChannels(PrintStream printStream) {
        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            printStream.println("Owned: " + Arrays.toString(this.m_aChannelOwned));
            for (int nChannel = 0; nChannel < this.m_aChannel.length; nChannel++) {
                printStream.printf("%d: %s current=%b\n", nChannel, this.m_aChannel[nChannel], nChannel == this.m_nChannel);
            }
        } finally {
            gate.exit();
        }
    }

    /**
     * Print a header followed by every element currently held in the
     * pre-fetch queue, one per line. Runs inside the subscriber gate.
     *
     * @param printStream  the stream to print to
     */
    public void printPreFetchCache(PrintStream printStream) {
        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            printStream.println("Pre-Fetch Cache: ");
            // bound method reference; the previous lambda referred to an
            // undefined variable and could not compile
            this.m_queueValuesPrefetched.forEach(printStream::println);
        } finally {
            gate.exit();
        }
    }

    /**
     * @return the topic passed to the constructor, cast (unchecked) to the
     *         element type the caller asks for
     */
    @Override // com.tangosol.net.topic.Subscriber
    public <T> NamedTopic<T> getNamedTopic() {
        return (NamedTopic<T>) this.f_topic;
    }

    /**
     * Queue a request for a single element.
     * <p>
     * The receive-request counter is incremented for every call. If the
     * returned future ends in a {@link CancellationException} the cancelled
     * counter is incremented, and the cancellation is logged unless it is the
     * queue's own {@code OperationCancelledException}.
     *
     * @return a future completed with the received element
     */
    @Override
    public CompletableFuture<Subscriber.Element<V>> receive() {
        ensureActive();
        CompletableFuture<Subscriber.Element<V>> future = (CompletableFuture<Subscriber.Element<V>>) this.f_queueReceiveOrders.add(ReceiveRequest.SINGLE);
        this.f_cReceiveRequests.add(1L);
        future.handle((ignored, error) -> {
            if (error instanceof CancellationException) {
                if (!(error instanceof BatchingOperationsQueue.OperationCancelledException)) {
                    Logger.err("Receive cancelled", error);
                }
                this.f_cCancelled.add(1L);
            }
            return null;
        });
        return future;
    }

    /**
     * Queue a batch receive request built from the given size.
     * <p>
     * The receive-request counter is incremented once per call. If the
     * returned future ends in a {@link CancellationException} the cancelled
     * counter is incremented, and the cancellation is logged unless it is the
     * queue's own {@code OperationCancelledException}.
     *
     * @param i  the batch size passed to the receive request
     *
     * @return a future completed with the received elements
     */
    @Override
    public CompletableFuture<List<Subscriber.Element<V>>> receive(int i) {
        ensureActive();
        CompletableFuture<List<Subscriber.Element<V>>> future = (CompletableFuture<List<Subscriber.Element<V>>>) this.f_queueReceiveOrders.add(new ReceiveRequest(true, i));
        this.f_cReceiveRequests.add(1L);
        future.handle((ignored, error) -> {
            if (error instanceof CancellationException) {
                if (!(error instanceof BatchingOperationsQueue.OperationCancelledException)) {
                    Logger.err("Receive cancelled", error);
                }
                this.f_cCancelled.add(1L);
            }
            return null;
        });
        return future;
    }

    /**
     * Look at the element at the head of a channel without receiving it.
     * <p>
     * The pre-fetch queue is searched first for an element at the head
     * position; if none is there the element is read from the data cache.
     *
     * @param i  the channel to peek at
     *
     * @return the element at the channel's head, or empty if the channel has
     *         no head position or no element is stored at it
     */
    public Optional<Subscriber.Element<V>> peek(int i) {
        ensureActive();
        Position head = getHeads().get(Integer.valueOf(i));
        if (head == null) {
            return Optional.empty();
        }

        Optional prefetched = this.m_queueValuesPrefetched.stream()
                .filter(candidate -> candidate.getPosition().equals(head))
                .findFirst();
        if (prefetched.isPresent()) {
            return Optional.of(((CommittableElement) prefetched.get()).getElement());
        }

        // not pre-fetched: read the element straight from the data cache
        PagedPosition pagedHead = (PagedPosition) head;
        ContentKey contentKey = new ContentKey(i, pagedHead.getPage(), pagedHead.getOffset());
        Binary binElement = this.m_caches.Data.get(contentKey.toBinary(this.m_caches.getPartitionCount()));
        if (binElement == null) {
            return Optional.empty();
        }
        return Optional.of(PageElement.fromBinary(binElement, this.m_caches.getSerializer()));
    }

    /**
     * Commit a position in a single channel.
     * <p>
     * Apart from the initial active check, failures are never thrown from this
     * method; they are reported through the returned future, including the
     * {@link IllegalArgumentException} raised for a position that is not a
     * {@link PagedPosition}.
     *
     * @param i         the channel
     * @param position  the position to commit
     *
     * @return a future completed with the commit result, or completed
     *         exceptionally on failure
     */
    @Override
    public CompletableFuture<Subscriber.CommitResult> commitAsync(int i, Position position) {
        ensureActive();
        try {
            if (!(position instanceof PagedPosition)) {
                throw new IllegalArgumentException("Invalid position type");
            }
            return commitInternal(i, (PagedPosition) position, null);
        } catch (Throwable error) {
            CompletableFuture<Subscriber.CommitResult> failed = new CompletableFuture<>();
            failed.completeExceptionally(error);
            return failed;
        }
    }

    /**
     * Commit positions in several channels at once.
     * <p>
     * Entries whose position is not a {@link PagedPosition} are not committed;
     * they are recorded in the result with status {@code Rejected}. Every other
     * entry is committed through {@code commitInternal}, which is handed the
     * result map.
     * <p>
     * The returned future always completes normally with the result map once
     * all commits have finished, even if one of them failed.
     *
     * @param map  channel to position map
     *
     * @return a future completed with the per-channel commit results
     */
    @Override
    public CompletableFuture<Map<Integer, Subscriber.CommitResult>> commitAsync(Map<Integer, Position> map) {
        ensureActive();
        // parameterised maps: with raw maps the stream below is raw and its
        // lambda receives Object, so the entry accessors need casts
        Map<Integer, Subscriber.CommitResult> mapResult = new HashMap<>();
        Map<Integer, PagedPosition> mapPaged = new HashMap<>();
        for (Map.Entry<Integer, Position> entry : map.entrySet()) {
            Integer channel = entry.getKey();
            Position position = entry.getValue();
            if (position instanceof PagedPosition) {
                mapPaged.put(channel, (PagedPosition) position);
            } else {
                mapResult.put(channel, new Subscriber.CommitResult(channel.intValue(), position, Subscriber.CommitResultStatus.Rejected));
            }
        }

        // NOTE(review): mapResult is a plain HashMap that is also handed to
        // commitInternal; confirm it is never written concurrently
        CompletableFuture<?>[] aFuture = mapPaged.entrySet().stream()
                .map(entry -> commitInternal(entry.getKey().intValue(), entry.getValue(), mapResult))
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(aFuture).handle((ignored, error) -> mapResult);
    }

    /**
     * @return the ids from {@link #getChannelSet()} as a primitive array
     */
    @Override
    public int[] getChannels() {
        return getChannelSet().stream().mapToInt(Integer::intValue).toArray();
    }

    /**
     * Obtain the ids of the channels this subscriber owns.
     * <p>
     * The state is checked once before entering the gate and again inside it;
     * unless it is 2 on both checks the result is empty.
     *
     * @return the owned channel ids, or an empty set when the state is not 2
     */
    public Set<Integer> getChannelSet() {
        if (this.m_nState != 2) {
            return Collections.emptySet();
        }
        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            if (this.m_nState != 2) {
                // the state changed while waiting to enter the gate
                return Collections.emptySet();
            }
            return Arrays.stream(this.m_aChannel)
                    .filter(PagedTopicChannel::isOwned)
                    .map(channel -> Integer.valueOf(channel.subscriberPartitionSync.getChannelId()))
                    .collect(Collectors.toSet());
        } finally {
            gate.exit();
        }
    }

    /**
     * Determine whether this subscriber owns the given channel.
     * <p>
     * A channel id outside the current channel array (negative, or not less
     * than its length) is reported as not owned instead of raising an
     * {@link ArrayIndexOutOfBoundsException}.
     *
     * @param i  the channel id
     *
     * @return {@code true} only if the state is 2, the channel exists and it
     *         is flagged as owned
     */
    @Override
    public boolean isOwner(int i) {
        if (this.m_nState != 2) {
            return false;
        }
        // snapshot the array so the bound check and the read use the same instance
        PagedTopicChannel[] aChannel = this.m_aChannel;
        return aChannel != null && i >= 0 && i < aChannel.length && aChannel[i].isOwned();
    }

    /**
     * @return the length of this subscriber's channel array, or the channel
     *         count reported by the topic caches if the array is not yet set
     */
    @Override
    public int getChannelCount() {
        if (this.m_aChannel == null) {
            return this.m_caches.getChannelCount();
        }
        return this.m_aChannel.length;
    }

    /** @return the backlog flow control created in the constructor */
    @Override // com.tangosol.net.topic.Subscriber
    public FlowControl getFlowControl() {
        return this.f_backlog;
    }

    /**
     * Add an action to this subscriber's list of on-close actions.
     *
     * @param runnable  the action to add (presumably run when the subscriber closes; the code that runs it is not in this section)
     */
    @Override // com.tangosol.net.topic.Subscriber
    public void onClose(Runnable runnable) {
        this.f_listOnCloseActions.add(runnable);
    }

    /**
     * @return {@code true} unless the state is 5 or 4
     */
    @Override
    public boolean isActive() {
        return this.m_nState != 5 && this.m_nState != 4;
    }

    /**
     * Obtain the last committed positions. The request is placed at the front
     * of the request queue and this call blocks until it completes.
     *
     * @return the committed position for each channel
     */
    @Override
    public Map<Integer, Position> getLastCommitted() {
        ensureActive();
        GetPositionRequest request = new GetPositionRequest(PositionType.Committed);
        Object positions = this.f_queueReceiveOrders.addFirst(request).join();
        return (Map<Integer, Position>) positions;
    }

    /**
     * Obtain the head positions. The request is placed at the front of the
     * request queue and this call blocks until it completes.
     *
     * @return the head position for each channel
     */
    @Override
    public Map<Integer, Position> getHeads() {
        ensureActive();
        GetPositionRequest request = new GetPositionRequest(PositionType.Head);
        Object positions = this.f_queueReceiveOrders.addFirst(request).join();
        return (Map<Integer, Position>) positions;
    }

    /**
     * Obtain the tail positions. The request is placed at the front of the
     * request queue and this call blocks until it completes.
     *
     * @return the tail position for each channel
     */
    @Override
    public Map<Integer, Position> getTails() {
        ensureActive();
        GetPositionRequest request = new GetPositionRequest(PositionType.Tail);
        Object positions = this.f_queueReceiveOrders.addFirst(request).join();
        return (Map<Integer, Position>) positions;
    }

    /**
     * Seek a single channel to a position. The request is placed at the front
     * of the request queue and this call blocks until it completes.
     *
     * @param i         the channel to reposition
     * @param position  the position to seek to
     *
     * @return the entry for the channel in the seek result
     */
    @Override
    public Position seek(int i, Position position) {
        ensureActive();
        SeekRequest request = new SeekRequest(Collections.singletonMap(Integer.valueOf(i), position));
        Map<?, ?> result = (Map<?, ?>) this.f_queueReceiveOrders.addFirst(request).join();
        return (Position) result.get(Integer.valueOf(i));
    }

    /**
     * Seek several channels to explicit positions. The request is placed at
     * the front of the request queue and this call blocks until it completes.
     *
     * @param map  channel to position map
     *
     * @return the per-channel seek result
     */
    @Override
    public Map<Integer, Position> seek(Map<Integer, Position> map) {
        ensureActive();
        SeekRequest request = new SeekRequest(map);
        Object result = this.f_queueReceiveOrders.addFirst(request).join();
        return (Map<Integer, Position>) result;
    }

    /**
     * Seek a single channel to an instant in time. The request is placed at
     * the front of the request queue and this call blocks until it completes.
     *
     * @param i        the channel to reposition
     * @param instant  the instant to seek to
     *
     * @return the entry for the channel in the seek result
     */
    @Override
    public Position seek(int i, Instant instant) {
        ensureActive();
        SeekRequest request = new SeekRequest(instant, i);
        Map<?, ?> result = (Map<?, ?>) this.f_queueReceiveOrders.addFirst(request).join();
        return (Position) result.get(Integer.valueOf(i));
    }

    /**
     * Seek the given channels to their heads. The channels are first passed to
     * {@code ensureActiveAnOwnedChannels}; the request is then placed at the
     * front of the request queue and this call blocks until it completes.
     *
     * @param iArr  the channels to reposition
     *
     * @return the per-channel seek result
     */
    @Override
    public Map<Integer, Position> seekToHead(int... iArr) {
        ensureActiveAnOwnedChannels(iArr);
        SeekRequest request = new SeekRequest(SeekType.Head, iArr);
        Object result = this.f_queueReceiveOrders.addFirst(request).join();
        return (Map<Integer, Position>) result;
    }

    /**
     * Seek the given channels to their tails. The channels are first passed to
     * {@code ensureActiveAnOwnedChannels}; the request is then placed at the
     * front of the request queue and this call blocks until it completes.
     *
     * @param iArr  the channels to reposition
     *
     * @return the per-channel seek result
     */
    @Override
    public Map<Integer, Position> seekToTail(int... iArr) {
        ensureActiveAnOwnedChannels(iArr);
        SeekRequest request = new SeekRequest(SeekType.Tail, iArr);
        Object result = this.f_queueReceiveOrders.addFirst(request).join();
        return (Map<Integer, Position>) result;
    }

    /**
     * @return the remaining message count reported by the topic caches for
     *         this subscriber's group across the channels from {@link #getChannels()}
     */
    @Override // com.tangosol.net.topic.Subscriber
    public int getRemainingMessages() {
        return this.m_caches.getRemainingMessages(this.f_subscriberGroupId, getChannels());
    }

    /**
     * @param i  the channel
     *
     * @return the remaining message count reported by the topic caches for the
     *         channel, or zero if this subscriber does not own it
     */
    @Override
    public int getRemainingMessages(int i) {
        return isOwner(i) ? this.m_caches.getRemainingMessages(this.f_subscriberGroupId, i) : 0;
    }

    /**
     * Close this subscriber by delegating to {@code closeInternal(false)}.
     */
    @Override // com.tangosol.net.topic.Subscriber, java.lang.AutoCloseable
    public void close() {
        closeInternal(false);
    }

    /**
     * Describe this subscriber and its statistics.
     * <p>
     * NOTE: this method has a side effect. It records the current poll, value,
     * miss, wait and notify counters as the "last" values, so the batch size
     * and wait/notify rates in the result cover the period since the previous
     * call.
     *
     * @return a description of this subscriber, or just the class name with
     *         "(inactive)" when the state is 5
     */
    public String toString() {
        if (this.m_nState == 5) {
            return getClass().getSimpleName() + "(inactive)";
        }

        // snapshot the running totals
        long cPollsNow = this.m_cPolls;
        long cValuesNow = this.m_cValues;
        long cMissesNow = this.m_cMisses;
        long cWaitNow = this.m_cWait;
        long cNotifyNow = this.m_cNotify;

        // deltas since the previous toString() call
        long cPolls = cPollsNow - this.m_cPollsLast;
        long cValues = cValuesNow - this.m_cValuesLast;
        long cMisses = cMissesNow - this.m_cMissesLast;
        long cWait = cWaitNow - this.m_cWaitsLast;
        long cNotify = cNotifyNow - this.m_cNotifyLast;

        this.m_cPollsLast = cPollsNow;
        this.m_cValuesLast = cValuesNow;
        this.m_cMissesLast = cMissesNow;
        this.m_cWaitsLast = cWaitNow;
        this.m_cNotifyLast = cNotifyNow;

        PagedTopicChannel[] aChannel = this.m_aChannel;
        long cChannelsPolled = Arrays.stream(aChannel).filter(PagedTopicChannel::isPolled).count();
        String sChannelsPolled = Arrays.toString(Arrays.stream(aChannel).filter(PagedTopicChannel::isPolled).mapToInt(PagedTopicChannel::getId).toArray());
        long cChannelsHit = Arrays.stream(aChannel).filter(PagedTopicChannel::isHit).count();
        String sChannelsHit = Arrays.toString(Arrays.stream(aChannel).filter(PagedTopicChannel::isHit).mapToInt(PagedTopicChannel::getId).toArray());

        String sState = getStateName();
        String sName = this.f_sIdentifyingName == null ? "" : ", name=" + this.f_sIdentifyingName;
        String sAllocation = this.f_fAnonymous ? "[ALL]" : Arrays.toString(this.m_aChannelOwned);

        // the divisors are clamped to 1 so a period without polls cannot divide by zero
        long cBatch = cValues / Math.max(1L, cPolls - cMisses);
        long nWaitRate = (cWait * 100) / Math.max(1L, cPolls);
        long nNotifyRate = (cNotify * 100) / Math.max(1L, cPolls);

        return getClass().getSimpleName()
                + "(topic=" + this.m_caches.getTopicName()
                + sName
                + ", id=" + String.valueOf(this.f_id)
                + ", group=" + String.valueOf(this.f_subscriberGroupId)
                + ", subscriptionId=" + this.m_subscriptionId
                + ", durable=" + !this.f_fAnonymous
                + ", state=" + sState
                + ", prefetched=" + this.m_queueValuesPrefetched.size()
                + ", backlog=" + String.valueOf(this.f_backlog)
                + ", subscriptions=" + this.m_cSubscribe.getCount()
                + ", disconnections=" + this.m_cDisconnect.getCount()
                + ", received=" + this.m_cReceived.getCount()
                + ", receivedEmpty=" + this.m_cReceivedEmpty.getCount()
                + ", receivedError=" + this.m_cReceivedError.getCount()
                + ", channelAllocation=" + sAllocation
                + ", totalChannelsPolled=" + cPollsNow
                + ", channelsPolledSinceReallocation=" + cChannelsPolled + sChannelsPolled
                + ", channelsHit=" + cChannelsHit + sChannelsHit + "/" + cChannelsPolled
                + ", batchSize=" + cBatch
                + ", values=" + cValuesNow
                + ", notifications=" + cNotifyNow
                + ", waitNotifyRate=" + nWaitRate + "/" + nNotifyRate + "%)";
    }

    /**
     * Create a channel array of the given size with no existing channels to
     * carry over; delegates to the four-argument overload with {@code null}.
     *
     * @param pagedTopicCaches   the topic caches
     * @param i                  the number of channels
     * @param subscriberGroupId  the subscriber group the channels belong to
     *
     * @return the channel array
     */
    private PagedTopicChannel[] initializeChannels(PagedTopicCaches pagedTopicCaches, int i, SubscriberGroupId subscriberGroupId) {
        return initializeChannels(pagedTopicCaches, i, subscriberGroupId, null);
    }

    /**
     * Create, or grow, the channel array.
     * <p>
     * If the existing array already has at least the requested number of
     * channels it is returned unchanged. Otherwise a new array is built while
     * the subscriber gate is closed: channels present in the existing array
     * are carried over as the same instances, and only the additional slots
     * receive a new channel with its own sync key.
     *
     * @param pagedTopicCaches      the topic caches (source of the partition count)
     * @param i                     the required number of channels
     * @param subscriberGroupId     the subscriber group the channels belong to
     * @param pagedTopicChannelArr  the existing channels, or {@code null} if there are none
     *
     * @return an array holding at least {@code i} channels
     */
    private PagedTopicChannel[] initializeChannels(PagedTopicCaches pagedTopicCaches, int i, SubscriberGroupId subscriberGroupId, PagedTopicChannel[] pagedTopicChannelArr) {
        if (pagedTopicChannelArr != null && pagedTopicChannelArr.length >= i) {
            return pagedTopicChannelArr;
        }

        // try-with-resources tolerates a null resource, so a gate that returns
        // no sentry is handled as before
        try (Sentry<?> ignored = this.f_gate.close()) {
            PagedTopicChannel[] aChannel = new PagedTopicChannel[i];
            int cPartition = pagedTopicCaches.getPartitionCount();
            for (int nChannel = 0; nChannel < i; nChannel++) {
                if (pagedTopicChannelArr != null && nChannel < pagedTopicChannelArr.length) {
                    // keep the existing channel; it must not be replaced by a
                    // new instance or its state would be lost
                    aChannel[nChannel] = pagedTopicChannelArr[nChannel];
                } else {
                    aChannel[nChannel] = new PagedTopicChannel();
                    aChannel[nChannel].subscriberPartitionSync = Subscription.createSyncKey(subscriberGroupId, nChannel, cPartition);
                }
            }
            return aChannel;
        }
    }

    /**
     * @param i  the channel id
     *
     * @return the channel at the given index of the channel array, or a new
     *         {@code EmptyChannel} for an id at or past the end of the array
     */
    public Subscriber.Channel getChannel(int i) {
        if (i < this.m_aChannel.length) {
            return this.m_aChannel[i];
        }
        return new Subscriber.Channel.EmptyChannel(i);
    }

    /** @return the number of receive futures that ended in a cancellation */
    public long getCancelled() {
        return this.f_cCancelled.longValue();
    }

    /** @return the number of receive requests made (incremented once per {@code receive} call) */
    public long getReceiveRequests() {
        return this.f_cReceiveRequests.longValue();
    }

    /** @return the count of the received meter */
    public long getReceived() {
        return this.m_cReceived.getCount();
    }

    /** @return the mean rate of the received meter */
    public double getReceivedMeanRate() {
        return this.m_cReceived.getMeanRate();
    }

    /** @return the one-minute rate of the received meter */
    public double getReceivedOneMinuteRate() {
        return this.m_cReceived.getOneMinuteRate();
    }

    /** @return the five-minute rate of the received meter */
    public double getReceivedFiveMinuteRate() {
        return this.m_cReceived.getFiveMinuteRate();
    }

    /** @return the fifteen-minute rate of the received meter */
    public double getReceivedFifteenMinuteRate() {
        return this.m_cReceived.getFifteenMinuteRate();
    }

    /** @return the count of the received-empty meter */
    public long getReceivedEmpty() {
        return this.m_cReceivedEmpty.getCount();
    }

    /** @return the count of the received-error meter */
    public long getReceivedError() {
        return this.m_cReceivedError.getCount();
    }

    /** @return the count of the disconnect meter */
    public long getDisconnectCount() {
        return this.m_cDisconnect.getCount();
    }

    /** @return the execution count of this subscriber's reconnect task */
    public int getAutoReconnectTaskCount() {
        return this.f_taskReconnect.getExecutionCount();
    }

    /** @return the running poll counter */
    public long getPolls() {
        return this.m_cPolls;
    }

    /** @return the running counter of values (elements) polled */
    public long getElementsPolled() {
        return this.m_cValues;
    }

    /** @return the running wait counter */
    public long getWaitCount() {
        return this.m_cWait;
    }

    /** @return the running miss counter */
    public long getMisses() {
        return this.m_cMisses;
    }

    /** @return the running notification counter */
    public long getNotify() {
        return this.m_cNotify;
    }

    /**
     * Ask the topic caches whether a position in a channel has been committed
     * for this subscriber's group.
     *
     * @param i         the channel
     * @param position  the position to check
     *
     * @return the answer from the topic caches
     */
    public boolean isCommitted(int i, Position position) {
        ensureActive();
        return this.m_caches.isCommitted(this.f_subscriberGroupId, i, position);
    }

    /**
     * Connect (or reconnect) this subscriber.
     * <p>
     * Returns immediately if the state is already 2. Otherwise the subscriber
     * gate is closed and the connection steps are repeated until the state
     * becomes 2 or the subscriber is no longer active: ensure the subscription,
     * initialise the per-channel heads, work out channel ownership, heartbeat,
     * register the notification listener, and finally move the state from 1 to 2.
     *
     * @throws IllegalStateException if the subscriber group this subscriber was
     *                               previously subscribed to has been destroyed
     * @throws TimeoutException      if reading the subscription from the cache times out
     * @throws InterruptedException  if interrupted while waiting for that read
     * @throws ExecutionException    if that read fails
     */
    protected void initialise() throws InterruptedException, ExecutionException, TimeoutException {
        SortedSet<Integer> sortedSet;
        ensureActive();
        if (this.m_nState == 2) {
            return;
        }
        Sentry<?> close = this.f_gate.close();
        try {
            // a state of 3 on entry is treated as a reconnect: it enables the
            // log message below and is passed on to initializeSubscription
            boolean z = this.m_nState == 3;
            while (this.m_nState != 2 && isActive()) {
                // move to state 1, keeping the state that was replaced
                int state = setState(1);
                if (z) {
                    Logger.finest("Reconnecting subscriber " + String.valueOf(this));
                }
                this.m_caches.ensureConnected();
                if (!this.f_fAnonymous) {
                    PagedTopicService service = this.m_caches.getService();
                    if (this.m_subscriptionId == 0) {
                        // no subscription id yet: ensure the subscription by group id
                        this.m_subscriptionId = service.ensureSubscription(this.f_topic.getName(), this.f_subscriberGroupId, this.f_id, this.f_filter, this.f_extractor);
                    } else {
                        // already have a subscription id: re-ensure it, and give up
                        // if the group has been destroyed in the meantime
                        service.ensureSubscription(this.f_topic.getName(), this.m_subscriptionId, this.f_id, this.m_fForceReconnect);
                        if (service.isSubscriptionDestroyed(this.m_subscriptionId)) {
                            close();
                            throw new IllegalStateException("The subscriber group \"" + String.valueOf(this.f_subscriberGroupId) + "\" (id=" + this.m_subscriptionId + ") this subscriber was previously subscribed to has been destroyed");
                        }
                    }
                    // take the connection timestamp from the subscription if the
                    // service has it, otherwise use the current safe time
                    PagedTopicSubscription subscription = service.getSubscription(this.m_subscriptionId);
                    if (subscription != null) {
                        this.m_connectionTimestamp = subscription.getSubscriberTimestamp(this.f_id);
                    } else {
                        this.m_connectionTimestamp = SafeClock.INSTANCE.getSafeTimeMillis();
                    }
                    heartbeat(false);
                }
                // one value per channel is returned and stored as the channel's head
                long[] initializeSubscription = this.m_caches.initializeSubscription(this.f_subscriberGroupId, this.f_id, this.m_subscriptionId, this.f_filter, this.f_extractor, z, false, state == 3);
                int length = initializeSubscription.length;
                if (length > this.m_aChannel.length) {
                    // more channels than this subscriber knows about: grow the array
                    this.m_aChannel = initializeChannels(this.m_caches, length, this.f_subscriberGroupId, this.m_aChannel);
                }
                for (int i = 0; i < length; i++) {
                    PagedTopicChannel pagedTopicChannel = this.m_aChannel[i];
                    pagedTopicChannel.m_lHead = initializeSubscription[i];
                    pagedTopicChannel.m_nNext = -1;
                    pagedTopicChannel.setPopulated();
                }
                if (this.f_fAnonymous) {
                    // an anonymous subscriber owns every channel
                    TreeSet treeSet = new TreeSet();
                    for (int i2 = 0; i2 < length; i2++) {
                        treeSet.add(Integer.valueOf(i2));
                    }
                    updateChannelOwnership(treeSet, false);
                } else {
                    PagedTopicSubscription subscription2 = this.m_caches.getService().getSubscription(this.m_subscriptionId);
                    if (subscription2 != null) {
                        sortedSet = subscription2.getOwnedChannels(this.f_id);
                    } else {
                        // the service does not have the subscription: read the owned
                        // channels from the subscriptions cache, bounded by a timeout
                        CompletableFuture<Subscription> completableFuture = this.m_caches.Subscriptions.async().get(this.m_aChannel[0].subscriberPartitionSync);
                        try {
                            sortedSet = (SortedSet) Arrays.stream(completableFuture.get(INIT_TIMEOUT_SECS, TimeUnit.SECONDS).getChannels(this.f_id, length)).boxed().collect(Collectors.toCollection(TreeSet::new));
                        } catch (TimeoutException e) {
                            // cancel the read; rethrow unless the future is done and
                            // did not complete exceptionally
                            completableFuture.cancel(true);
                            if (!completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
                                throw e;
                            }
                            sortedSet = (SortedSet) Arrays.stream(completableFuture.get(INIT_TIMEOUT_SECS, TimeUnit.SECONDS).getChannels(this.f_id, length)).boxed().collect(Collectors.toCollection(TreeSet::new));
                        }
                    }
                    updateChannelOwnership(sortedSet, false);
                }
                heartbeat();
                registerNotificationListener();
                // only switch channel if this call moved the state from 1 to 2;
                // otherwise the loop condition decides whether to try again
                if (casState(1, 2)) {
                    switchChannel();
                }
            }
            this.m_cSubscribe.mark();
            if (close != null) {
                close.close();
            }
        } catch (Throwable th) {
            // release the gate on the failure path too, keeping any failure to
            // release as a suppressed exception
            if (close != null) {
                try {
                    close.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /**
     * Re-enters {@link #receiveInternal} for the receive-orders queue.
     *
     * @param i  the value forwarded (boxed) as the second argument of {@code receiveInternal}
     */
    private void trigger(int i) {
        final Integer boxed = Integer.valueOf(i);
        receiveInternal(f_queueReceiveOrders, boxed);
    }

    /**
     * Drives one step of the receive loop for the given request queue.
     * <p>
     * The method first makes sure the subscriber is connected, then (if the
     * current batch is incomplete, or a new batch could be filled) sends a
     * heartbeat, completes whatever requests it can from the prefetched
     * elements, and either polls the owned channel asynchronously, recurses,
     * or tries to switch channel under the gate.
     *
     * @param batchingOperationsQueue  the queue of pending requests to service
     * @param num                      the value passed to {@code fillCurrentBatch} and re-used
     *                                 when the loop is re-triggered
     */
    private void receiveInternal(BatchingOperationsQueue<Request, ?> batchingOperationsQueue, Integer num) {
        if (isActive()) {
            ensureConnected();
        }
        // Only proceed when there is an unfinished batch, or a new batch could be filled.
        if (!batchingOperationsQueue.isBatchComplete() || batchingOperationsQueue.fillCurrentBatch(num.intValue())) {
            heartbeat();
            // Satisfy as many requests as possible from already prefetched elements.
            complete(batchingOperationsQueue);
            int ensureOwnedChannel = ensureOwnedChannel();
            if (!batchingOperationsQueue.isBatchComplete() && ensureOwnedChannel >= 0) {
                PagedTopicChannel pagedTopicChannel = this.m_aChannel[ensureOwnedChannel];
                long version = pagedTopicChannel.getVersion();
                // A locally cached head of -1 means the head has to be looked up.
                long subscriptionHead = pagedTopicChannel.m_lHead == -1 ? getSubscriptionHead(pagedTopicChannel) : pagedTopicChannel.m_lHead;
                int keyPartition = ((PartitionedService) this.m_caches.Subscriptions.getCacheService()).getKeyPartitioningStrategy().getKeyPartition(new Page.Key(ensureOwnedChannel, subscriptionHead));
                // Poll the channel asynchronously; the poll result is handed to onReceiveResult.
                CompletableFuture invokeAsync = InvocableMapHelper.invokeAsync(this.m_caches.Subscriptions, new Subscription.Key(keyPartition, ensureOwnedChannel, this.f_subscriberGroupId), this.m_caches.getUnitOfOrder(keyPartition), new PollProcessor(subscriptionHead, Integer.MAX_VALUE, this.f_nNotificationId, this.f_id), this.f_executor, (result, th) -> {
                    onReceiveResult(pagedTopicChannel, version, subscriptionHead, result, th);
                });
                // Follow-up step: log a failure, otherwise complete requests from any
                // prefetched elements and re-trigger the loop.
                BiFunction biFunction = (result2, th2) -> {
                    if (th2 != null) {
                        Logger.err(th2);
                        return null;
                    }
                    if (!this.m_queueValuesPrefetched.isEmpty()) {
                        complete(batchingOperationsQueue);
                    }
                    trigger(num.intValue());
                    return null;
                };
                TaskDaemon taskDaemon = this.f_daemon;
                Objects.requireNonNull(taskDaemon);
                // The follow-up step is submitted through the daemon's executeTask.
                invokeAsync.handleAsync(biFunction, taskDaemon::executeTask);
                return;
            }
            // Either elements are still prefetched or a channel is owned: go round again.
            if (!this.m_queueValuesPrefetched.isEmpty() || ensureOwnedChannel >= 0) {
                receiveInternal(batchingOperationsQueue, num);
                return;
            }
            // Nothing prefetched and no owned channel selected: try to switch channel under the gate.
            Gate<?> gate = this.f_gate;
            gate.enter(-1L);
            try {
                if (switchChannel()) {
                    receiveInternal(batchingOperationsQueue, num);
                } else {
                    if (this.f_fCompleteOnEmpty) {
                        // Queue the empty-element marker so complete() can answer the pending requests.
                        this.m_queueValuesPrefetched.add(getEmptyElement());
                        complete(batchingOperationsQueue);
                    }
                    batchingOperationsQueue.resetTrigger();
                }
            } finally {
                gate.exit();
            }
        }
    }

    /**
     * Looks up the head page for a channel.
     * <p>
     * The value comes from the entry stored in the Subscriptions cache under the
     * channel's {@code subscriberPartitionSync} key. When no such entry exists,
     * {@code initialise()} is run and the channel's local {@code m_lHead} is
     * returned instead.
     *
     * @param pagedTopicChannel  the channel whose head is required
     *
     * @return the head page
     *
     * @throws RuntimeException wrapping any failure raised while re-initialising
     */
    private long getSubscriptionHead(PagedTopicChannel pagedTopicChannel) {
        Subscription stored = m_caches.Subscriptions.get(pagedTopicChannel.subscriberPartitionSync);
        if (stored == null) {
            try {
                initialise();
                return m_aChannel[pagedTopicChannel.getId()].m_lHead;
            } catch (Throwable failure) {
                throw Exceptions.ensureRuntimeException(failure);
            }
        }
        return stored.getSubscriptionHead();
    }

    /**
     * Completes requests in the queue's current batch.
     *
     * @param batchingOperationsQueue  the queue whose current batch values are processed
     */
    protected void complete(BatchingOperationsQueue<Request, ?> batchingOperationsQueue) {
        LinkedList<Request> currentBatch = batchingOperationsQueue.getCurrentBatchValues();
        complete(batchingOperationsQueue, currentBatch);
    }

    /**
     * Completes as many of the supplied requests as possible from the prefetched elements.
     * <p>
     * Leading {@code FunctionalRequest}s are executed first. The remaining requests are
     * then served under the gate: either with real elements polled from the prefetch
     * queue, or, when the head of that queue is the "empty" marker element, with empty
     * results for every request.
     *
     * @param batchingOperationsQueue  the queue that owns the requests and receives the completions
     * @param linkedList               the requests to complete
     */
    protected void complete(BatchingOperationsQueue<Request, ?> batchingOperationsQueue, LinkedList<Request> linkedList) {
        ConcurrentLinkedDeque<PagedTopicSubscriber<V>.CommittableElement> concurrentLinkedDeque = this.m_queueValuesPrefetched;
        // Run (and remove) any functional requests at the head of the list.
        Request peek = linkedList.peek();
        while (peek instanceof FunctionalRequest) {
            ((FunctionalRequest) linkedList.poll()).execute(this, batchingOperationsQueue);
            peek = linkedList.peek();
        }
        // i is the index of the next request to serve; size is the number of remaining requests.
        int i = 0;
        int size = linkedList.size();
        if (!isActive() || concurrentLinkedDeque.isEmpty()) {
            return;
        }
        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            LongArray<?> sparseArray = new SparseArray<>();
            PagedTopicSubscriber<V>.CommittableElement peek2 = concurrentLinkedDeque.peek();
            if (peek2 == null || !peek2.isEmpty()) {
                // Normal case: hand out prefetched elements while in state 2
                // (the value isConnected() tests for).
                // NOTE(review): i only advances for ReceiveRequest entries; a different
                // request type at index i would not advance the loop - confirm that cannot occur.
                while (this.m_nState == 2 && i < size && !concurrentLinkedDeque.isEmpty()) {
                    Request request = linkedList.get(i);
                    if (request instanceof ReceiveRequest) {
                        ReceiveRequest receiveRequest = (ReceiveRequest) request;
                        if (receiveRequest.isBatch()) {
                            // Batch receive: gather up to elementCount elements from owned channels.
                            int elementCount = receiveRequest.getElementCount();
                            LinkedList linkedList2 = new LinkedList();
                            for (int i2 = 0; i2 < elementCount && !concurrentLinkedDeque.isEmpty(); i2++) {
                                PagedTopicSubscriber<V>.CommittableElement poll = concurrentLinkedDeque.poll();
                                if (poll != null && !poll.isEmpty() && isOwner(poll.getChannel())) {
                                    linkedList2.add(poll);
                                }
                            }
                            i++;
                            if (!batchingOperationsQueue.completeElement(linkedList2, this::onReceiveComplete)) {
                                // Completion was refused: push the elements back in their original order.
                                while (true) {
                                    PagedTopicSubscriber<V>.CommittableElement committableElement = (CommittableElement) linkedList2.pollLast();
                                    if (committableElement == null) {
                                        break;
                                    } else {
                                        concurrentLinkedDeque.offerFirst(committableElement);
                                    }
                                }
                            }
                        } else {
                            // Single receive: elements that are empty markers or belong to channels
                            // no longer owned are dropped without advancing to the next request.
                            PagedTopicSubscriber<V>.CommittableElement poll2 = concurrentLinkedDeque.poll();
                            if (poll2 != null && !poll2.isEmpty() && isOwner(poll2.getChannel())) {
                                i++;
                                if (!batchingOperationsQueue.completeElement(poll2, this::onReceiveComplete)) {
                                    // Completion was refused: return the element to the head of the queue.
                                    concurrentLinkedDeque.offerFirst(poll2);
                                }
                            }
                        }
                    }
                }
            } else {
                // The head of the prefetch queue is the empty marker: consume it and answer
                // every request with an empty result (empty list for batch, null otherwise).
                concurrentLinkedDeque.poll();
                // NOTE(review): as above, i only advances for ReceiveRequest entries - confirm
                // no other request type can remain in the list at this point.
                while (i < size) {
                    Request request2 = linkedList.get(i);
                    if (request2 instanceof ReceiveRequest) {
                        if (((ReceiveRequest) request2).isBatch()) {
                            sparseArray.set(i, Collections.emptyList());
                        } else {
                            sparseArray.set(i, null);
                        }
                        i++;
                    }
                }
                batchingOperationsQueue.completeElements(i, sparseArray, (v1, v2) -> {
                    return onReceiveError(v1, v2);
                }, this::onReceiveComplete);
            }
        } finally {
            gate.exit();
        }
    }

    /**
     * Returns the current size of the receive-orders queue.
     *
     * @return the value of {@code f_queueReceiveOrders.size()}
     */
    public int getReceiveQueueSize() {
        final int cPending = f_queueReceiveOrders.size();
        return cPending;
    }

    /**
     * Records a failed receive in the metrics and wraps the cause.
     *
     * @param th   the failure cause
     * @param obj  not used by this method
     *
     * @return a new {@code TopicException} wrapping {@code th}
     */
    private Throwable onReceiveError(Throwable th, Object obj) {
        m_cReceived.mark();
        m_cReceivedError.mark();
        TopicException wrapped = new TopicException(th);
        return wrapped;
    }

    /**
     * Updates the receive metrics for a completed receive request.
     * <p>
     * A {@code null} result counts as an empty receive; a single element or a
     * collection of elements is forwarded element by element to the
     * element-level handler. Any other result type is ignored.
     *
     * @param obj  the result the request completed with
     */
    private void onReceiveComplete(Object obj) {
        if (obj == null) {
            m_cReceived.mark();
            m_cReceivedEmpty.mark();
            return;
        }
        if (obj instanceof Subscriber.Element) {
            m_cReceived.mark();
            onReceiveComplete((Subscriber.Element<?>) obj);
            return;
        }
        if (obj instanceof Collection) {
            m_cReceived.mark();
            for (Object item : (Collection<?>) obj) {
                onReceiveComplete((Subscriber.Element<?>) item);
            }
        }
    }

    /**
     * Records the receipt of a single element against its channel.
     * <p>
     * Nothing is recorded if the element's channel is not currently owned.
     *
     * @param element  the element that was received
     */
    private void onReceiveComplete(Subscriber.Element<?> element) {
        final int nChannel = element.getChannel();
        if (!m_aChannel[nChannel].isOwned()) {
            return;
        }
        m_aChannel[nChannel].m_lastReceived = (PagedPosition) element.getPosition();
        m_aChannel[nChannel].m_cReceived.mark();
    }

    /**
     * Commits a position in a channel.
     * <p>
     * The head increment for the page preceding the committed page is awaited
     * first (this blocks the caller), then a {@code CommitProcessor} is invoked
     * against all subscription keys of the channel on the common daemon pool.
     *
     * @param i              the channel to commit
     * @param pagedPosition  the position to commit
     * @param map            optional map that receives the result keyed by channel; may be {@code null}
     *
     * @return a future completing with the commit result for the partition that owns the
     *         committed page, or with a {@code Rejected} result if the invocation failed;
     *         if the preparation itself throws, the future is completed exceptionally
     */
    private CompletableFuture<Subscriber.CommitResult> commitInternal(int i, PagedPosition pagedPosition, Map<Integer, Subscriber.CommitResult> map) {
        try {
            long page = pagedPosition.getPage();
            int partitionCount = this.m_caches.getPartitionCount();
            int keyPartition = ((PartitionedService) this.m_caches.Subscriptions.getCacheService()).getKeyPartitioningStrategy().getKeyPartition(new Page.Key(i, page));
            scheduleHeadIncrement(this.m_aChannel[i], page - 1).join();
            Set<Subscription.Key> ensureSubscriptionKeys = this.m_aChannel[i].ensureSubscriptionKeys(partitionCount, this.f_subscriberGroupId);
            return CompletableFuture.supplyAsync(() -> {
                return this.m_caches.Subscriptions.invokeAll(ensureSubscriptionKeys, new CommitProcessor(pagedPosition, this.f_id));
            }, Daemons.commonPool()).handle((map2, th) -> {
                // The result is a CommitResult, not the subscriber's value type V.
                Subscriber.CommitResult commitResult;
                if (th == null) {
                    // Only the result from the partition owning the committed page is reported.
                    commitResult = (Subscriber.CommitResult) map2.get(new Subscription.Key(keyPartition, i, this.f_subscriberGroupId));
                } else {
                    Logger.err("Commit failure", th);
                    commitResult = new Subscriber.CommitResult(i, pagedPosition, Subscriber.CommitResultStatus.Rejected, th);
                }
                if (map != null) {
                    map.put(Integer.valueOf(i), commitResult);
                }
                // The channel is told about the commit on both the success and the failure path.
                this.m_aChannel[i].committed(pagedPosition);
                return commitResult;
            });
        } catch (Throwable th2) {
            CompletableFuture<Subscriber.CommitResult> completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(th2);
            return completableFuture;
        }
    }

    /**
     * Seeks a single channel and applies the outcome to the local channel state.
     *
     * @param i              the channel to seek
     * @param pagedPosition  the position to seek to
     *
     * @return the resulting seek position, or {@code PagedPosition.NULL_POSITION} when the
     *         seek produced no position
     */
    private Position seekChannel(int i, PagedPosition pagedPosition) {
        SeekProcessor.Result outcome = seekInternal(i, pagedPosition);
        Position seeked = updateSeekedChannel(i, outcome);
        if (seeked == null) {
            return PagedPosition.NULL_POSITION;
        }
        return seeked;
    }

    /**
     * Applies a seek result to the local state of a channel.
     * <p>
     * When the result carries a head, the channel's head page and next offset are
     * overwritten with it. Prefetched elements belonging to the channel are always
     * discarded.
     *
     * @param i       the channel that was seeked
     * @param result  the result returned by the seek
     *
     * @return the seek position reported by {@code result}
     */
    private Position updateSeekedChannel(int i, SeekProcessor.Result result) {
        PagedPosition newHead = result.getHead();
        PagedPosition seeked = result.getSeekPosition();
        if (newHead != null) {
            m_aChannel[i].m_lHead = newHead.getPage();
            m_aChannel[i].m_nNext = newHead.getOffset();
        }
        m_queueValuesPrefetched.removeIf(element -> element.getChannel() == i);
        return seeked;
    }

    /**
     * Invokes a {@code SeekProcessor} against every subscription key of a channel.
     *
     * @param i              the channel to seek
     * @param pagedPosition  the position to seek to
     *
     * @return the first non-null result in natural sort order, or {@code null} if
     *         every invocation returned {@code null}
     */
    private SeekProcessor.Result seekInternal(int i, PagedPosition pagedPosition) {
        return (SeekProcessor.Result) m_caches.Subscriptions
                .invokeAll(
                        m_aChannel[i].ensureSubscriptionKeys(m_caches.getPartitionCount(), f_subscriberGroupId),
                        new SeekProcessor(pagedPosition, f_id))
                .values()
                .stream()
                .filter(Objects::nonNull)
                .sorted()
                .findFirst()
                .orElse(null);
    }

    /**
     * Executes a seek request, dispatching on its {@code SeekType}.
     * <p>
     * A request without channels yields an empty map. Otherwise the subscriber must be
     * active and must own every requested channel.
     *
     * @param seekRequest  the request describing what to seek to
     *
     * @return the seeked position for each channel
     *
     * @throws IllegalArgumentException if the seek type is not handled
     */
    private Map<Integer, Position> seekInternal(SeekRequest seekRequest) {
        SeekType type = seekRequest.getType();
        int[] anChannel = seekRequest.getChannels();
        if (anChannel == null || anChannel.length == 0) {
            return new HashMap<>();
        }
        ensureActiveAnOwnedChannels(anChannel);
        switch (type) {
            case Head -> {
                // Lowest page id per channel, aggregated over the Pages cache.
                Map<?, ?> mapMinPage = (Map<?, ?>) m_caches.Pages.aggregate(GroupAggregator.createInstance(new ReflectionExtractor("getChannelId", new Object[0], 1), new LongMin(new ReflectionExtractor("getPageId", new Object[0], 1))));
                Map<Integer, Position> mapHeads = new HashMap<>();
                for (int nChannel : anChannel) {
                    // NOTE(review): unboxing fails if the aggregate has no entry for the channel - confirm.
                    long lPage = ((Long) mapMinPage.get(Integer.valueOf(nChannel))).longValue();
                    mapHeads.put(Integer.valueOf(nChannel), new PagedPosition(lPage, -1));
                }
                return seekInternal(mapHeads);
            }
            case Tail -> {
                return seekInternal(filterForChannel(m_caches.getTails(), anChannel));
            }
            case Position -> {
                return seekInternal(seekRequest.getPositions());
            }
            case Instant -> {
                Map<Integer, Position> mapSeeked = new HashMap<>();
                Instant instant = seekRequest.getInstant();
                for (int nChannel : seekRequest.getChannels()) {
                    mapSeeked.put(Integer.valueOf(nChannel), seekInternal(nChannel, instant));
                }
                return mapSeeked;
            }
            default -> throw new IllegalArgumentException("Invalid SeekType " + type);
        }
    }

    /**
     * Seeks each channel in the map to its associated position.
     * <p>
     * The receive queue is paused for the duration of the seek and its trigger is reset
     * afterwards, whether or not the seek succeeds. Every position is validated before
     * the first channel is seeked.
     *
     * @param map  the positions to seek to, keyed by channel
     *
     * @return the seeked position for each channel
     *
     * @throws IllegalStateException    if the subscriber is not active or does not own a channel
     * @throws IllegalArgumentException if a position is not a {@code PagedPosition}
     */
    private Map<Integer, Position> seekInternal(Map<Integer, Position> map) {
        ensureActive();
        List<Integer> listNotOwned = map.keySet().stream()
                .filter(nChannel -> !isOwner(nChannel.intValue()))
                .toList();
        if (!listNotOwned.isEmpty()) {
            throw new IllegalStateException("Subscriber is not allocated channels " + listNotOwned);
        }
        try {
            f_queueReceiveOrders.pause();

            // Phase one: validate and narrow all positions before touching any channel.
            Map<Integer, PagedPosition> mapValidated = new HashMap<>();
            for (Map.Entry<Integer, Position> entry : map.entrySet()) {
                Integer nChannel = entry.getKey();
                Position position = entry.getValue();
                if (position instanceof PagedPosition) {
                    mapValidated.put(nChannel, (PagedPosition) position);
                } else {
                    throw new IllegalArgumentException("Invalid position type for channel " + nChannel);
                }
            }

            // Phase two: seek each channel and record where it ended up.
            Map<Integer, Position> mapSeeked = new HashMap<>();
            for (Map.Entry<Integer, PagedPosition> entry : mapValidated.entrySet()) {
                int nChannel = entry.getKey().intValue();
                SeekProcessor.Result outcome = seekInternal(nChannel, entry.getValue());
                mapSeeked.put(Integer.valueOf(nChannel), updateSeekedChannel(nChannel, outcome));
            }
            return mapSeeked;
        } finally {
            f_queueReceiveOrders.resetTrigger();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Seeks a channel so that the next element received is the first one whose
     * timestamp is greater than the given instant.
     * <p>
     * The receive queue is paused while seeking and its trigger is reset on every exit path.
     *
     * @param i        the channel to seek; must be owned by this subscriber
     * @param instant  the timestamp to seek to; must not be {@code null}
     *
     * @return the position seeked to; when no element matches, the result of seeking the
     *         channel to its tail
     *
     * @throws IllegalStateException if the channel is not owned by this subscriber
     * @throws NullPointerException  if {@code instant} is {@code null}
     */
    private Position seekInternal(int i, Instant instant) {
        if (!isOwner(i)) {
            throw new IllegalStateException("Subscriber is not allocated channel " + i);
        }
        Objects.requireNonNull(instant);
        try {
            this.f_queueReceiveOrders.pause();
            // Aggregate the Data cache for the minimum position among elements of channel i
            // with a timestamp greater than the instant; the Binary result is converted
            // with the service's value-from-internal converter.
            PagedPosition pagedPosition = (PagedPosition) this.m_caches.getService().getBackingMapManager().getContext().getValueFromInternalConverter().convert((Binary) this.m_caches.Data.aggregate(Filters.equal((ValueExtractor<T, ? extends Integer>) Page.ElementExtractor.chained((v0) -> {
                return v0.getChannel();
            }), Integer.valueOf(i)).and(Filters.greater(Page.ElementExtractor.chained((v0) -> {
                return v0.getTimestamp();
            }), instant)), new ComparableMin(Page.ElementExtractor.chained((v0) -> {
                return v0.getPosition();
            }))));
            if (pagedPosition == null) {
                // No element is newer than the instant: seek to the tail instead.
                Position position = seekToTail(i).get(Integer.valueOf(i));
                this.f_queueReceiveOrders.resetTrigger();
                return position;
            }
            // Seek to the position immediately before the match: the previous offset in the
            // same page, or the previous page with offset Integer.MAX_VALUE when the match
            // is at offset 0.
            int offset = pagedPosition.getOffset();
            Position seekChannel = seekChannel(i, offset == 0 ? new PagedPosition(pagedPosition.getPage() - 1, Integer.MAX_VALUE) : new PagedPosition(pagedPosition.getPage(), offset - 1));
            this.f_queueReceiveOrders.resetTrigger();
            return seekChannel;
        } catch (Throwable th) {
            // Mirror of the success paths: always reset the trigger before propagating.
            this.f_queueReceiveOrders.resetTrigger();
            throw th;
        }
    }

    /**
     * Verifies that the subscriber is active and owns all of the given channels.
     *
     * @param iArr  the channels to check; {@code null} or empty skips the ownership check
     *
     * @throws IllegalStateException    if the subscriber is not active
     * @throws IllegalArgumentException if any channel is not owned by this subscriber
     */
    private void ensureActiveAnOwnedChannels(int... iArr) {
        ensureActive();
        if (iArr != null && iArr.length > 0) {
            List<Integer> listNotOwned = Arrays.stream(iArr)
                    .filter(nChannel -> !isOwner(nChannel))
                    .boxed()
                    .toList();
            if (!listNotOwned.isEmpty()) {
                throw new IllegalArgumentException("One or more channels are not allocated to this subscriber " + listNotOwned);
            }
        }
    }

    /**
     * Copies the entries for the given channels out of a position map.
     *
     * @param map   the source positions keyed by channel
     * @param iArr  the channels to keep
     *
     * @return a new map holding the non-null positions of the requested channels
     */
    private Map<Integer, Position> filterForChannel(Map<Integer, Position> map, int... iArr) {
        Map<Integer, Position> mapFiltered = new HashMap<>();
        for (int idx = 0; idx < iArr.length; idx++) {
            Integer nChannel = Integer.valueOf(iArr[idx]);
            Position position = map.get(nChannel);
            if (position == null) {
                continue;
            }
            mapFiltered.put(nChannel, position);
        }
        return mapFiltered;
    }

    /**
     * Builds the map of head positions for the channels returned by {@code getChannels()}.
     * <p>
     * Each channel starts with its own head; a prefetched element for that channel
     * replaces it when the channel has no head yet or the element's position sorts
     * before the current head.
     *
     * @return the head position for each channel
     *
     * @throws IllegalStateException if the subscriber is not active
     */
    private Map<Integer, Position> getHeadsInternal() {
        ensureActive();
        Map<Integer, Position> mapHeads = new HashMap<>();
        for (int nChannel : getChannels()) {
            mapHeads.put(Integer.valueOf(nChannel), m_aChannel[nChannel].getHead());
        }
        for (PagedTopicSubscriber<V>.CommittableElement element : m_queueValuesPrefetched) {
            int nChannel = element.getChannel();
            if (!mapHeads.containsKey(Integer.valueOf(nChannel))) {
                continue;
            }
            Position current = mapHeads.get(Integer.valueOf(nChannel));
            PagedPosition prefetched = (PagedPosition) element.getPosition();
            boolean fUsable = nChannel != -1 && prefetched != null && prefetched.getPage() != -1;
            if (fUsable && (current == null || current.compareTo(prefetched) > 0)) {
                mapHeads.put(Integer.valueOf(nChannel), prefetched);
            }
        }
        return mapHeads;
    }

    /**
     * Builds the map of tail positions for the channels returned by {@code getChannels()}.
     * <p>
     * A channel without a known tail falls back to its head position.
     *
     * @return the tail position for each channel
     */
    private Map<Integer, Position> getTailsInternal() {
        Map<Integer, Position> mapAllTails = m_caches.getTails();
        Map<Integer, Position> mapTails = new HashMap<>();
        for (int nChannel : getChannels()) {
            Position tail = mapAllTails.getOrDefault(Integer.valueOf(nChannel), m_aChannel[nChannel].getHead());
            mapTails.put(Integer.valueOf(nChannel), tail);
        }
        return mapTails;
    }

    /**
     * Builds the map of last committed positions for the owned channels.
     * <p>
     * A channel without a committed position maps to {@code PagedPosition.NULL_POSITION}.
     *
     * @return the last committed position for each owned channel
     *
     * @throws IllegalStateException if the subscriber is not active
     */
    private Map<Integer, Position> getLastCommittedInternal() {
        ensureActive();
        Map<Integer, Position> mapCommitted = m_caches.getLastCommitted(f_subscriberGroupId);
        int[] anOwned = m_aChannelOwned;
        Map<Integer, Position> mapResult = new HashMap<>();
        for (int idx = 0; idx < anOwned.length; idx++) {
            Integer nChannel = Integer.valueOf(anOwned[idx]);
            mapResult.put(nChannel, mapCommitted.getOrDefault(nChannel, PagedPosition.NULL_POSITION));
        }
        return mapResult;
    }

    /**
     * Asserts that the subscriber is active.
     *
     * @throws IllegalStateException if {@code isActive()} returns {@code false}
     */
    protected void ensureActive() {
        if (isActive()) {
            return;
        }
        throw new IllegalStateException("The subscriber is not active");
    }

    /**
     * Returns the subscriber's current state value.
     *
     * @return the current value of {@code m_nState}
     */
    public int getState() {
        return m_nState;
    }

    /**
     * Returns the display name of the subscriber's current state.
     *
     * @return the name of the current state
     */
    public String getStateName() {
        final int nState = m_nState;
        return getStateName(nState);
    }

    /**
     * Returns the display name for a state value.
     *
     * @param i  the state value
     *
     * @return the matching entry of {@code STATES}, or {@code "Unknown (i)"} when the
     *         value is outside the bounds of that array
     */
    public static String getStateName(int i) {
        if (i >= 0 && i < STATES.length) {
            return STATES[i];
        }
        return "Unknown (" + i + ")";
    }

    /**
     * Unconditionally sets the subscriber state while holding the state gate closed.
     * <p>
     * State listeners are notified (still under the gate) only when the value changes.
     *
     * @param i  the new state
     *
     * @return the previous state
     */
    protected int setState(int i) {
        try (Sentry<?> ignored = f_gateState.close()) {
            final int nPrevious = m_nState;
            m_nState = i;
            if (nPrevious != i) {
                notifyStateChange(i, nPrevious);
            }
            return nPrevious;
        }
    }

    /**
     * Sets the subscriber state to a new value only if it currently has the expected value.
     * <p>
     * The state is checked once without the gate as a fast path and again with the
     * state gate closed; listeners are notified under the gate when the change is made.
     *
     * @param i   the expected current state
     * @param i2  the state to change to
     *
     * @return {@code true} if the state was changed, {@code false} if it did not have the
     *         expected value
     */
    protected boolean casState(int i, int i2) {
        if (m_nState != i) {
            return false;
        }
        try (Sentry<?> ignored = f_gateState.close()) {
            if (m_nState != i) {
                return false;
            }
            m_nState = i2;
            notifyStateChange(i2, i);
            return true;
        }
    }

    /**
     * Notifies every registered state listener of a state change.
     * <p>
     * A failure in one listener is logged and does not stop the remaining
     * listeners from being called.
     *
     * @param i   the first state value passed to {@code onStateChange} (callers pass the new state)
     * @param i2  the second state value passed to {@code onStateChange} (callers pass the prior state)
     */
    private void notifyStateChange(int i, int i2) {
        for (EventListener registered : f_stateListeners.listeners()) {
            try {
                StateListener listener = (StateListener) registered;
                listener.onStateChange(this, i, i2);
            } catch (Throwable failure) {
                Logger.err(failure);
            }
        }
    }

    /**
     * Registers a listener with the state listener collection.
     *
     * @param stateListener  the listener to add
     */
    public void addStateListener(StateListener stateListener) {
        f_stateListeners.add(stateListener);
    }

    /**
     * Removes a listener from the state listener collection.
     *
     * @param stateListener  the listener to remove
     */
    public void removeStateListener(StateListener stateListener) {
        f_stateListeners.remove(stateListener);
    }

    /**
     * Connects this subscriber.
     *
     * @throws IllegalStateException if the subscriber is not active
     */
    public void connect() {
        // Fail fast on an inactive subscriber; ensureConnected() alone would silently return.
        ensureActive();
        ensureConnected();
    }

    /**
     * Attempts to reconnect this subscriber.
     * <p>
     * Nothing happens when the state is already 2 (the value {@code isConnected()}
     * tests for) or the subscriber is inactive. The reconnect is skipped while the
     * service is suspended or when no receive requests are pending. Any failure is
     * logged and not propagated.
     */
    protected void reconnectInternal() {
        if (getState() == 2 || !isActive()) {
            return;
        }
        try {
            if (m_caches.getService().isSuspended()) {
                Logger.finest("Skipping reconnect task, service is suspended for subscriber " + this);
                return;
            }
            if (f_queueReceiveOrders.size() <= 0) {
                Logger.finest("Skipping reconnect task, no pending receives for subscriber " + this);
                return;
            }
            Logger.finest("Running reconnect task, reconnecting " + this);
            ensureConnected();
            f_queueReceiveOrders.triggerOperations();
        } catch (Throwable failure) {
            Logger.finest("Failed to reconnect subscriber " + this, failure);
        }
    }

    /**
     * Ensures this subscriber is connected, retrying until the configured reconnect
     * timeout expires.
     * <p>
     * Nothing happens when the subscriber is inactive or already in state 2 (the value
     * {@code isConnected()} tests for). Otherwise the gate is closed and the caches are
     * reconnected and the subscriber re-initialised. A failed attempt is retried after
     * the configured retry interval, unless the failure is a {@code TopicException} or
     * the timeout has passed. The loop ends as soon as one attempt succeeds or the
     * service is found to be suspended.
     *
     * @throws IllegalStateException if the subscriber became inactive before the gate was closed
     * @throws RuntimeException      wrapping the last failure when reconnecting did not succeed
     */
    protected void ensureConnected() {
        if (!isActive() || m_nState == 2) {
            return;
        }
        try (Sentry<?> ignored = f_gate.close()) {
            ensureActive();
            PagedTopicDependencies dependencies = m_caches.getDependencies();
            long retryMillis = dependencies.getReconnectRetryMillis();
            long now = System.currentTimeMillis();
            long deadline = now + dependencies.getReconnectTimeoutMillis();
            Throwable error = null;
            // Re-check under the gate: another thread may have connected in the meantime.
            if (m_nState != 2) {
                while (now < deadline && isActive()) {
                    try {
                        if (m_caches.getService().isSuspended()) {
                            Logger.finer("Skipping ensureConnected, service is suspended " + this);
                            // Nothing can be done while suspended; do not spin.
                            break;
                        }
                        m_caches.ensureConnected();
                        initialise();
                        error = null;
                        // Connected: stop retrying.
                        break;
                    } catch (Throwable thrown) {
                        error = thrown;
                        if (error instanceof TopicException) {
                            // Not retryable.
                            break;
                        }
                    }
                    now = System.currentTimeMillis();
                    if (now < deadline) {
                        Logger.info("Failed to reconnect subscriber, will retry in " + retryMillis + " millis " + this + " due to " + error.getMessage());
                        Logger.finest(error);
                        try {
                            Thread.sleep(retryMillis);
                        } catch (InterruptedException interrupted) {
                            // Deliberately ignored (pre-existing behaviour): the loop simply retries.
                        }
                    }
                }
                if (error == null) {
                    f_queueReceiveOrders.triggerOperations();
                }
            }
            if (error != null) {
                throw Exceptions.ensureRuntimeException(error);
            }
        }
    }

    /**
     * Tests whether the subscriber is in the disconnected state.
     *
     * @return {@code true} if the current state value is 3
     */
    public boolean isDisconnected() {
        final int nState = m_nState;
        return nState == 3;
    }

    /**
     * Tests whether the subscriber is in the connected state.
     *
     * @return {@code true} if the current state value is 2
     */
    public boolean isConnected() {
        final int nState = m_nState;
        return nState == 2;
    }

    /**
     * Disconnects this subscriber without setting the force-reconnect flag.
     */
    public void disconnect() {
        final boolean fForceReconnect = false;
        disconnectInternal(fForceReconnect);
    }

    /**
     * Moves the subscriber to the disconnected state (state value 3).
     * <p>
     * The method loops until the state is 3. From state 2 the transition is made with
     * the gate closed and is abandoned if the connection timestamp changed since this
     * method was entered. On a successful transition the prefetch queue is cleared and
     * the reconnect task is scheduled after the configured reconnect wait.
     *
     * @param z  the value stored in {@code m_fForceReconnect} when the disconnect takes effect
     */
    private void disconnectInternal(boolean z) {
        final long nTimestamp = m_connectionTimestamp;
        if (!isActive()) {
            return;
        }
        for (;;) {
            final int nState = m_nState;
            if (nState != 2) {
                if (nState == 3 || casState(nState, 3)) {
                    return;
                }
                continue;
            }
            try (Sentry<?> ignored = f_gate.close()) {
                if (m_connectionTimestamp != nTimestamp) {
                    // The timestamp moved since entry, so this disconnect is abandoned.
                    return;
                }
                if (isActive() && casState(nState, 3)) {
                    m_fForceReconnect = z;
                    m_cDisconnect.mark();
                    if (!f_fAnonymous) {
                        m_listenerChannelAllocation.reset();
                    }
                    m_queueValuesPrefetched.clear();
                    long cWaitMillis = m_caches.getDependencies().getReconnectWaitMillis();
                    Logger.finest("Disconnected Subscriber " + this);
                    f_daemon.scheduleTask(f_taskReconnect, TimeHelper.getSafeTimeMillis() + cWaitMillis);
                }
            }
            // Loop again: the state is re-read and the method returns once it is 3.
        }
    }

    /**
     * Returns the identifier of the subscriber group this subscriber belongs to.
     *
     * @return the subscriber group identifier
     */
    public SubscriberGroupId getSubscriberGroupId() {
        return f_subscriberGroupId;
    }

    /**
     * Delivers a channel-populated notification for a single channel.
     *
     * @param i  the channel that has been populated
     */
    public void notifyChannel(int i) {
        int[] anChannel = {i};
        onChannelPopulatedNotification(anChannel);
    }

    /**
     * Handles a channel-populated map event by processing it asynchronously on the
     * channels executor.
     *
     * @param mapEvent  the event whose old value holds the populated channels as an {@code int[]}
     */
    private void onChannelPopulatedNotification(MapEvent<?, ?> mapEvent) {
        Runnable task = () -> onChannelPopulatedNotification((int[]) mapEvent.getOldValue());
        CompletableFuture.runAsync(task, f_executorChannels);
    }

    /**
     * Handles a notification that one or more channels have been populated.
     * <p>
     * With the gate closed, the notified channels are marked as populated. After the
     * gate has been released, an "empty" marker at the head of the prefetch queue is
     * removed and, if the current channel was empty (or no channel was selected) when
     * the notification arrived, the channel is switched and pending receives are
     * triggered.
     *
     * @param iArr  the populated channels; {@code null} or empty is ignored
     */
    protected void onChannelPopulatedNotification(int[] iArr) {
        if (iArr == null || iArr.length == 0) {
            return;
        }
        boolean fWasEmpty;
        // Only the channel bookkeeping runs under the gate, which is released exactly
        // once when this block exits; the follow-up work below runs outside it.
        try (Sentry<?> ignored = this.f_gate.close()) {
            if (this.m_aChannel == null || !isActive()) {
                return;
            }
            this.m_cNotify++;
            int nCurrent = this.m_nChannel;
            fWasEmpty = nCurrent < 0 || this.m_aChannel[nCurrent].isEmpty();
            for (int nChannel : iArr) {
                this.m_aChannel[nChannel].onChannelPopulatedNotification();
            }
        }
        // Drop a stale "empty" marker so it is not handed to a pending receive.
        PagedTopicSubscriber<V>.CommittableElement head = this.m_queueValuesPrefetched.peek();
        if (head != null && head.isEmpty()) {
            this.m_queueValuesPrefetched.poll();
        }
        if (fWasEmpty) {
            switchChannel();
            this.f_queueReceiveOrders.triggerOperations();
        }
    }

    /**
     * Marks a channel as empty, provided the subscriber is still connected and active.
     * <p>
     * The gate is entered for the update and exited exactly once, in the
     * {@code finally} block.
     *
     * @param i  the channel to mark as empty
     * @param j  the value passed to the channel's {@code setEmpty}
     */
    private void onChannelEmpty(int i, long j) {
        if (!isConnected()) {
            return;
        }
        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            // Re-check inside the gate: the state may have changed while waiting to enter.
            if (this.m_aChannel != null && isActive() && isConnected()) {
                this.m_aChannel[i].setEmpty(j);
            }
        } finally {
            gate.exit();
        }
    }

    /**
     * Asynchronously invokes a {@code HeadAdvancer} for {@code j + 1} against the
     * channel's subscription entry and updates the local channel state from the value
     * the invocation returns.
     *
     * @param pagedTopicChannel  the channel whose head is to be advanced
     * @param j                  the head value the caller assumes; the advancer is created with {@code j + 1}
     *
     * @return the future returned by the asynchronous invocation, or a future already
     *         completed with {@code -1} when the subscriber is not active
     */
    protected CompletableFuture<Long> scheduleHeadIncrement(PagedTopicChannel pagedTopicChannel, long j) {
        if (!isActive()) {
            return CompletableFuture.completedFuture(-1L);
        }
        return InvocableMapHelper.invokeAsync(
                m_caches.Subscriptions,
                pagedTopicChannel.subscriberPartitionSync,
                m_caches.getUnitOfOrder(pagedTopicChannel.subscriberPartitionSync.getPartitionId()),
                new HeadAdvancer(j + 1),
                f_executor,
                (lReturned, err) -> {
                    // NOTE(review): lReturned is dereferenced without checking err - confirm
                    // the continuation is never invoked with a null result.
                    if (lReturned.longValue() < j + 1) {
                        pagedTopicChannel.m_fContended = false;
                    } else {
                        if (j != -1 && !pagedTopicChannel.m_fContended) {
                            pagedTopicChannel.m_fContended = true;
                        }
                        if (lReturned.longValue() > pagedTopicChannel.m_lHead) {
                            // The returned head is ahead of the local one: adopt it and reset the offset.
                            pagedTopicChannel.m_lHead = lReturned.longValue();
                            pagedTopicChannel.m_nNext = -1;
                        }
                    }
                });
    }

    /**
     * Sends a heartbeat for this subscriber using the asynchronous invocation path.
     */
    @Override // com.tangosol.net.topic.Subscriber
    public void heartbeat() {
        final boolean fAsync = true;
        heartbeat(fAsync);
    }

    /**
     * Sends a heartbeat for this subscriber by invoking the heartbeat processor
     * against this subscriber's key in the Subscribers cache.
     * <p>
     * Anonymous subscribers do not send heartbeats.
     *
     * @param z  {@code true} to invoke the processor asynchronously, {@code false} to
     *           invoke it synchronously
     */
    private void heartbeat(boolean z) {
        if (!f_fAnonymous) {
            // Refresh the processor with the current member, subscription and connection details.
            f_heartbeatProcessor.setUuid(m_caches.getService().getCluster().getLocalMember().getUuid());
            f_heartbeatProcessor.setSubscription(m_subscriptionId);
            f_heartbeatProcessor.setlConnectionTimestamp(m_connectionTimestamp);
            if (!z) {
                m_caches.Subscribers.invoke(f_key, f_heartbeatProcessor);
            } else {
                m_caches.Subscribers.async().invoke(f_key, f_heartbeatProcessor);
            }
        }
    }

    /**
     * Apply a new channel allocation to this subscriber and notify the registered
     * channel ownership listeners.
     * <p>
     * The update runs with the subscriber's gate closed. If the allocation refers to a
     * channel beyond the current channel array, the subscriber is disconnected instead.
     * If the allocation equals the currently owned channels nothing is changed.
     * <p>
     * Every listener callback is isolated: an exception thrown by a listener is logged and
     * does not prevent the remaining callbacks, or the remaining listeners, from running.
     *
     * @param sortedSet  the channels now allocated to this subscriber
     * @param z          {@code true} to report channels that are no longer owned through
     *                   {@code onChannelsLost}, {@code false} to report them through
     *                   {@code onChannelsRevoked}
     */
    private void updateChannelOwnership(SortedSet<Integer> sortedSet, boolean z) {
        if (!isActive()) {
            return;
        }

        int[] anAssigned = sortedSet.stream().mapToInt(Integer::intValue).toArray();
        int nMaxChannel = sortedSet.stream().mapToInt(Integer::intValue).max().orElse(getChannelCount() - 1);

        // try-with-resources re-opens the gate exactly once on every exit path
        // (a null sentry is simply skipped)
        try (Sentry<?> ignored = this.f_gate.close()) {
            // re-check now that the gate is closed
            if (!isActive()) {
                return;
            }

            PagedTopicChannel[] aExisting = this.m_aChannel;
            if (nMaxChannel >= aExisting.length) {
                Logger.finer((Supplier<String>) () -> String.format("Disconnecting subscriber %d on topic %s due to increase in channel count from %d to %d", Long.valueOf(this.f_id.getId()), this.f_topic.getName(), Integer.valueOf(aExisting.length), Integer.valueOf(nMaxChannel)));
                disconnectInternal(true);
                return;
            }

            if (Arrays.equals(this.m_aChannelOwned, anAssigned)) {
                // nothing changed
                return;
            }

            // revoked = previously owned minus newly assigned; added = newly assigned minus previously owned
            Set<Integer> setRevokedMutable = new HashSet<>();
            Set<Integer> setAdded = new HashSet<>(sortedSet);
            if (this.m_aChannelOwned != null && this.m_aChannelOwned.length > 0) {
                for (int nChannel : this.m_aChannelOwned) {
                    setRevokedMutable.add(Integer.valueOf(nChannel));
                    setAdded.remove(Integer.valueOf(nChannel));
                }
                sortedSet.forEach(setRevokedMutable::remove);
            }
            Set<Integer> setRevoked = Collections.unmodifiableSet(setRevokedMutable);
            Set<Integer> setAssigned = Set.copyOf(sortedSet);

            Logger.finest(String.format("Subscriber %d (name=%s) channel allocation changed, assigned=%s added=%s revoked=%s", Long.valueOf(this.f_id.getId()), this.f_sIdentifyingName, setAssigned, setAdded, setRevoked));

            this.m_aChannelOwned = anAssigned;

            if (!this.f_fAnonymous) {
                if (this.m_nState == 0) {
                    for (PagedTopicChannel channel : this.m_aChannel) {
                        channel.setUnowned();
                        channel.setPopulated();
                    }
                }
                // reset every channel, then flag the ones that are now owned
                for (PagedTopicChannel channel : this.m_aChannel) {
                    channel.m_fContended = false;
                    channel.setUnowned();
                    channel.setPopulated();
                }
                for (int nChannel : this.m_aChannelOwned) {
                    PagedTopicChannel channel = this.m_aChannel[nChannel];
                    channel.m_fContended = false;
                    channel.setOwned();
                    channel.setPopulated();
                }
                // polled/hit statistics are cleared for every channel whose ownership changed
                for (int nChannel : setAdded) {
                    PagedTopicChannel channel = this.m_aChannel[nChannel];
                    channel.clearPolled();
                    channel.clearHit();
                }
                for (int nChannel : setRevoked) {
                    PagedTopicChannel channel = this.m_aChannel[nChannel];
                    channel.clearPolled();
                    channel.clearHit();
                }
            }

            // drop an "empty" marker sitting at the head of the pre-fetch queue
            PagedTopicSubscriber<V>.CommittableElement head = this.m_queueValuesPrefetched.peek();
            if (head != null && head.isEmpty()) {
                this.m_queueValuesPrefetched.poll();
            }

            for (Subscriber.ChannelOwnershipListener listener : this.m_aChannelOwnershipListener) {
                if (!setRevoked.isEmpty()) {
                    // both the "lost" and the "revoked" callback are guarded so that a
                    // misbehaving listener cannot abort the ownership update
                    try {
                        if (z) {
                            listener.onChannelsLost(setRevoked);
                        } else {
                            listener.onChannelsRevoked(setRevoked);
                        }
                    } catch (Throwable error) {
                        Logger.err(error);
                    }
                }
                if (!setAssigned.isEmpty()) {
                    try {
                        listener.onChannelsAssigned(setAssigned);
                    } catch (Throwable error) {
                        Logger.err(error);
                    }
                }
            }

            onChannelPopulatedNotification(this.m_aChannelOwned);
        }
    }

    /**
     * Return the current channel, first attempting to switch channel when there is no
     * current channel or the current channel is not owned by this subscriber.
     *
     * @return the value of the current channel field after any switch attempt
     */
    protected int ensureOwnedChannel() {
        boolean fCurrentOwned = this.m_nChannel >= 0 && this.m_aChannel[this.m_nChannel].isOwned();
        if (!fCurrentOwned) {
            switchChannel();
        }
        return this.m_nChannel;
    }

    /**
     * Select the next owned, non-empty channel to poll and store it as the current channel.
     * <p>
     * The selection is made while holding the subscriber's gate. The gate is entered once
     * and exited exactly once, in the {@code finally} block, whichever path returns.
     *
     * @return {@code true} if the current channel was set to an owned, non-empty channel;
     *         {@code false} if the subscriber has no channel array, is not active, is not
     *         connected, or no owned non-empty channel was found (in which case the
     *         current channel is set to {@code -1} when the search was actually performed)
     */
    protected boolean switchChannel() {
        if (this.m_aChannel == null || !isActive() || !isConnected()) {
            return false;
        }

        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            // re-check now that the gate is held
            if (this.m_aChannel == null || !isActive() || !isConnected()) {
                return false;
            }

            if (this.m_aChannelOwned.length == 0) {
                this.m_nChannel = -1;
                return false;
            }

            if (this.m_aChannelOwned.length == 1) {
                int nOnly = this.m_aChannelOwned[0];
                if (this.m_aChannel[nOnly].isEmpty()) {
                    this.m_nChannel = -1;
                    return false;
                }
                this.m_nChannel = nOnly;
                return true;
            }

            // locate the owned channel that follows the current channel
            int nCurrent = this.m_nChannel;
            int iOwned = 0;
            while (iOwned < this.m_aChannelOwned.length && this.m_aChannelOwned[iOwned] <= nCurrent) {
                boolean fMatch = this.m_aChannelOwned[iOwned] == nCurrent;
                iOwned++;
                if (fMatch) {
                    break;
                }
            }
            if (iOwned >= this.m_aChannelOwned.length) {
                // wrap around to the first owned channel
                iOwned = 0;
            }

            // walk the channel array (wrapping) until an owned, non-empty channel is found,
            // the current channel is reached again, or every channel has been visited
            int nCandidate = this.m_aChannelOwned[iOwned];
            int cVisited = 0;
            while (nCandidate != nCurrent && cVisited < this.m_aChannel.length && (!this.m_aChannel[nCandidate].isOwned() || this.m_aChannel[nCandidate].isEmpty())) {
                cVisited++;
                nCandidate++;
                if (nCandidate == this.m_aChannel.length) {
                    nCandidate = 0;
                }
            }

            if (!this.m_aChannel[nCandidate].isOwned() || this.m_aChannel[nCandidate].isEmpty()) {
                this.m_nChannel = -1;
                return false;
            }
            this.m_nChannel = nCandidate;
            return true;
        } finally {
            // the single exit matching the enter above
            gate.exit();
        }
    }

    /**
     * Process the outcome of a poll against a channel.
     * <p>
     * On success the received elements are appended to the pre-fetch queue, the poll
     * statistics and the channel's position are updated, and the remaining-element count
     * of the result decides what happens next:
     * <ul>
     *   <li>{@code -1}: the local head is moved past page {@code j2} (or a head increment
     *       is scheduled when {@code j2} is {@code -1}) and the channel is switched;</li>
     *   <li>{@code 0} or {@code -3}: for {@code 0} the channel is reported as empty; then a
     *       channel switch is attempted and, if it fails, either the empty element is
     *       queued (complete-on-empty) or the wait counter is incremented;</li>
     *   <li>{@code -2}: the subscriber is disconnected.</li>
     * </ul>
     * NOTE(review): the negative values presumably correspond to named constants on
     * {@code PollProcessor.Result} - confirm against that class.
     * <p>
     * On failure all queued receive requests are completed with the error.
     * The whole method runs while holding the receive lock.
     *
     * @param pagedTopicChannel  the channel that was polled
     * @param j                  the value passed on to {@code onChannelEmpty} when the poll
     *                           returned nothing
     * @param j2                 the page that was polled
     * @param result             the poll result; only read when {@code th} is {@code null}
     * @param th                 the poll error, or {@code null} if the poll succeeded
     */
    protected void onReceiveResult(PagedTopicChannel pagedTopicChannel, long j, long j2, PollProcessor.Result result, Throwable th) {
        int channelId = pagedTopicChannel.subscriberPartitionSync.getChannelId();
        this.f_receiveLock.lock();
        try {
            if (th != null) {
                this.f_queueReceiveOrders.handleError((cause, request) -> th, BatchingOperationsQueue.OnErrorAction.CompleteWithException);
                return;
            }

            Queue<Binary> elements = result.getElements();
            int cReceived = elements.size();
            int cRemaining = result.getRemainingElementCount();
            int nNextIndex = result.getNextIndex();

            pagedTopicChannel.setPolled();
            this.m_cPolls++;

            if (cReceived == 0) {
                this.m_cMisses++;
            } else if (!elements.isEmpty()) {
                pagedTopicChannel.setHit();
                this.m_cValues += cReceived;
                pagedTopicChannel.adjustPolls(cReceived);

                // append the received elements to the pre-fetch queue
                elements.stream()
                        .map(binary -> new CommittableElement(binary, channelId))
                        .forEach(this.m_queueValuesPrefetched::add);

                if (!this.m_queueValuesPrefetched.isEmpty()) {
                    long ldtNow = System.currentTimeMillis();
                    pagedTopicChannel.setFirstPolled((PagedPosition) this.m_queueValuesPrefetched.getFirst().getPosition(), ldtNow);
                    pagedTopicChannel.setLastPolled((PagedPosition) this.m_queueValuesPrefetched.getLast().getPosition(), ldtNow);
                }
            }

            pagedTopicChannel.m_nNext = nNextIndex;

            if (cRemaining == -1) {
                if (j2 >= pagedTopicChannel.m_lHead && j2 != -1) {
                    pagedTopicChannel.m_lHead = j2 + 1;
                    pagedTopicChannel.m_nNext = 0;
                }
                if (j2 == -1) {
                    scheduleHeadIncrement(pagedTopicChannel, j2);
                }
                switchChannel();
            } else if (cRemaining == 0 || cRemaining == -3) {
                if (cRemaining == 0) {
                    onChannelEmpty(channelId, j);
                }
                if (!switchChannel()) {
                    if (this.f_fCompleteOnEmpty) {
                        this.m_queueValuesPrefetched.add(getEmptyElement());
                    } else {
                        this.m_cWait++;
                    }
                }
            } else if (cRemaining == -2) {
                disconnectInternal(true);
            }
        } finally {
            this.f_receiveLock.unlock();
        }
    }

    /**
     * Destroy a subscription: first through the topic service, then, while the caches are
     * still active, by invoking a {@link DestroySubscriptionProcessor} against channel
     * {@code 0} of every partition and waiting for the invocation to complete.
     *
     * @param pagedTopicCaches   the caches of the topic that owns the subscription
     * @param subscriberGroupId  the subscriber group being destroyed
     * @param j                  the subscription identifier; when {@code 0} and the group
     *                           is not anonymous the identifier is looked up from the
     *                           topic service
     */
    public static void destroy(PagedTopicCaches pagedTopicCaches, SubscriberGroupId subscriberGroupId, long j) {
        PagedTopicService service = pagedTopicCaches.getService();

        long lSubscriptionId = j;
        if (lSubscriptionId == 0 && !subscriberGroupId.isAnonymous()) {
            lSubscriptionId = service.getSubscriptionId(pagedTopicCaches.getTopicName(), subscriberGroupId);
        }
        service.destroySubscription(lSubscriptionId);

        if (!pagedTopicCaches.isActive() || !pagedTopicCaches.Subscriptions.isActive()) {
            return;
        }

        int cParts = service.getPartitionCount();
        Set<Subscription.Key> setKeys = new HashSet<>(cParts);
        for (int nPart = 0; nPart < cParts; nPart++) {
            setKeys.add(new Subscription.Key(nPart, 0, subscriberGroupId));
        }

        DestroySubscriptionProcessor processor = new DestroySubscriptionProcessor(lSubscriptionId);
        InvocableMapHelper.invokeAllAsync(
                pagedTopicCaches.Subscriptions,
                setKeys,
                key -> pagedTopicCaches.getUnitOfOrder(key.getPartitionId()),
                processor,
                new BiConsumer[0]).join();
    }

    /**
     * Close this subscriber, unless its state is already {@code 5}.
     * <p>
     * The state is first moved to {@code 4} while the gate is closed; the gate is released
     * again before any further work is done. The receive queue is then closed and flushed
     * (bounded by {@code CLOSE_TIMEOUT_SECS}), listeners are removed, the subscription is
     * notified of the close and, for subscribers whose group carries a member timestamp,
     * the subscription is destroyed. Whatever happens, the state ends at {@code 5}, the
     * registered on-close actions run and both daemons are stopped.
     * <p>
     * NOTE(review): states {@code 4} and {@code 5} presumably mean "closing" and "closed" -
     * confirm against the state constants of this class.
     *
     * @param z  {@code true} if the close is the result of the topic being destroyed, in
     *           which case pending requests are completed exceptionally and listeners and
     *           the subscriber entry are left alone
     */
    private void closeInternal(boolean z) {
        if (this.m_nState == 5) {
            return;
        }

        // the gate is only needed for the state transition; releasing it here (and only here)
        // means a failure later on cannot release it a second time
        try (Sentry<?> ignored = this.f_gate.close()) {
            setState(4);
        }

        try {
            unregisterMBean();
            this.f_queueReceiveOrders.close();
            this.f_queueReceiveOrders.cancelAllAndClose("Subscriber has been closed", null);

            try {
                flushInternal(z ? FlushMode.FLUSH_DESTROY : FlushMode.FLUSH).get(CLOSE_TIMEOUT_SECS, TimeUnit.SECONDS);
            } catch (TimeoutException timeout) {
                // waited long enough; force everything outstanding to complete exceptionally
                flushInternal(FlushMode.FLUSH_CLOSE_EXCEPTIONALLY).join();
                Logger.warn("Subscriber.close: timeout after waiting " + CLOSE_TIMEOUT_SECS + " seconds for completion with flush.join(), forcing complete exceptionally");
            } catch (InterruptedException | ExecutionException ignored) {
                // deliberately ignored: the close carries on regardless of the flush outcome
            }

            if (!z) {
                unregisterDeactivationListener();
                unregisterChannelAllocationListener();
                unregisterNotificationListener();
            }
            notifyClosed(this.m_caches.Subscriptions, this.f_subscriberGroupId, this.m_subscriptionId, this.f_id);
            if (!z) {
                removeSubscriberEntry();
            }

            if (!z && this.f_subscriberGroupId.getMemberTimestamp() != 0) {
                destroy(this.m_caches, this.f_subscriberGroupId, this.m_subscriptionId);
            }
        } finally {
            setState(5);
            this.f_listOnCloseActions.forEach(action -> {
                try {
                    action.run();
                } catch (Throwable error) {
                    Logger.finest(getClass().getName() + ".close(): handled onClose exception: " + error.getClass().getCanonicalName() + ": " + error.getMessage());
                }
            });
            this.f_daemon.stop(true);
            this.f_daemonChannels.stop(false);
        }
    }

    /**
     * Flush the queue of receive requests according to the requested mode.
     * <p>
     * For {@code FLUSH_DESTROY} and {@code FLUSH_CLOSE_EXCEPTIONALLY} the queued requests
     * are completed with a {@link TopicException} describing the reason (the error handler
     * is invoked once per channel, as before) and a completed future is returned. Any
     * other mode returns the future of a normal queue flush.
     *
     * @param flushMode  how the queue should be flushed
     *
     * @return a future that completes when the flush is done
     */
    private CompletableFuture<Void> flushInternal(FlushMode flushMode) {
        final String sTopic = this.m_caches.getTopicName();
        final String sReason;

        switch (flushMode) {
            case FLUSH_DESTROY:
                sReason = "Topic " + sTopic + " was destroyed";
                break;
            case FLUSH_CLOSE_EXCEPTIONALLY:
                sReason = "Force Close of Subscriber " + String.valueOf(this.f_id) + " for topic " + sTopic;
                break;
            default:
                // FLUSH and anything unrecognised: a plain flush
                return this.f_queueReceiveOrders.flush();
        }

        for (PagedTopicChannel ignored : this.m_aChannel) {
            this.f_queueReceiveOrders.handleError(
                    (cause, request) -> new TopicException(sReason, cause),
                    BatchingOperationsQueue.OnErrorAction.CompleteWithException);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Tell the topic service and the subscription entries that a subscriber has closed.
     * <p>
     * The subscriber is removed from the subscription held by the topic service (if it is
     * registered there) and, while the cache is active, a {@link CloseSubscriptionProcessor}
     * is invoked against channel {@code 0} of every partition. Failures of that invocation
     * are ignored; any other failure is logged only if the cache is still active.
     * <p>
     * Note: the decompiler reports that this method was originally package-private.
     *
     * @param namedCache         the subscriptions cache
     * @param subscriberGroupId  the group the subscriber belongs to
     * @param j                  the subscription identifier; when {@code 0} it is looked
     *                           up from the topic service using the cache's topic name
     * @param subscriberId       the identifier of the subscriber that closed
     */
    public static void notifyClosed(NamedCache<Subscription.Key, Subscription> namedCache, SubscriberGroupId subscriberGroupId, long j, SubscriberId subscriberId) {
        PagedTopicService service = (PagedTopicService) namedCache.getCacheService();

        long lSubscriptionId = j;
        if (lSubscriptionId == 0) {
            String sTopic = PagedTopicCaches.Names.getTopicName(namedCache.getCacheName());
            lSubscriptionId = service.getSubscriptionId(sTopic, subscriberGroupId);
        }

        PagedTopicSubscription subscription = service.getSubscription(lSubscriptionId);
        if (subscription != null && subscription.hasSubscriber(subscriberId)) {
            service.destroySubscription(lSubscriptionId, subscriberId);
        }

        if (!namedCache.isActive()) {
            return;
        }

        try {
            int cParts = service.getPartitionCount();
            Collection<Subscription.Key> colKeys = new ArrayList<>(cParts);
            for (int nPart = 0; nPart < cParts; nPart++) {
                colKeys.add(new Subscription.Key(nPart, 0, subscriberGroupId));
            }

            // the cache may have been released while the keys were being built
            if (!namedCache.isActive()) {
                return;
            }
            try {
                namedCache.invokeAll(colKeys, new CloseSubscriptionProcessor(subscriberId));
            } catch (Exception e) {
            }
        } catch (Throwable error) {
            if (namedCache.isActive()) {
                String sSubscriber = SubscriberId.NullSubscriber.equals(subscriberId) ? "<ALL>" : idToString(subscriberId.getId());
                Logger.fine("Caught exception closing subscription for subscriber " + sSubscriber + " in group " + subscriberGroupId.getGroupName(), error);
            }
        }
    }

    /**
     * Evict this subscriber's entry from the Subscribers cache, if that cache is active.
     * <p>
     * Any failure of the eviction is logged and not propagated.
     */
    protected void removeSubscriberEntry() {
        NamedCache<SubscriberInfo.Key, SubscriberInfo> cacheSubscribers = this.m_caches.Subscribers;
        if (!cacheSubscribers.isActive()) {
            return;
        }
        try {
            cacheSubscribers.invoke(this.f_key, EvictSubscriber.INSTANCE);
        } catch (Throwable error) {
            Logger.err(error);
        }
    }

    /**
     * Add this subscriber's channel allocation listener to the topic service.
     * <p>
     * A {@link RuntimeException} raised by the registration is logged, not propagated.
     */
    protected void registerChannelAllocationListener() {
        try {
            this.m_caches.f_topicService.addSubscriptionListener(this.m_listenerChannelAllocation);
        } catch (RuntimeException error) {
            Logger.err(error);
        }
    }

    /**
     * Remove this subscriber's channel allocation listener from the topic service.
     * <p>
     * A {@link RuntimeException} raised by the removal is logged, not propagated.
     */
    protected void unregisterChannelAllocationListener() {
        try {
            this.m_caches.f_topicService.removeSubscriptionListener(this.m_listenerChannelAllocation);
        } catch (RuntimeException error) {
            Logger.err(error);
        }
    }

    /**
     * Add the notification listener, restricted by the notification filter, to the
     * Notifications cache. Nothing happens when that cache is not active.
     */
    protected void registerNotificationListener() {
        if (!this.m_caches.Notifications.isActive()) {
            return;
        }
        this.m_caches.Notifications.addMapListener((MapListener<? super NotificationKey, ? super int[]>) this.f_listenerNotification, (Filter) this.f_filterNotification, false);
    }

    /**
     * Remove the notification listener (and its filter) from the Notifications cache.
     * Nothing happens when that cache is not active.
     */
    protected void unregisterNotificationListener() {
        if (!this.m_caches.Notifications.isActive()) {
            return;
        }
        this.m_caches.Notifications.removeMapListener(this.f_listenerNotification, this.f_filterNotification);
    }

    /**
     * Register the listeners that detect deactivation of the subscriber group and of the
     * topic caches.
     * <p>
     * For non-anonymous subscribers with a group deactivation listener, that listener is
     * added to the Subscriptions cache against the subscription key of channel {@code 0}.
     * The caches deactivation listener is always added.
     * <p>
     * The key is passed as the {@code Subscription.Key} it is; it must not be cast to the
     * listener type, because the resulting failure would be hidden by the catch below and
     * the listener would silently never be registered.
     */
    protected void registerDeactivationListener() {
        try {
            if (!this.f_fAnonymous) {
                PagedTopicSubscriber<V>.GroupDeactivationListener listenerGroup = this.m_listenerGroupDeactivation;
                if (listenerGroup != null) {
                    this.m_caches.Subscriptions.addMapListener(listenerGroup, this.m_aChannel[0].subscriberPartitionSync, true);
                }
            }
            this.m_caches.addListener(this.f_listenerDeactivation);
        } catch (RuntimeException ignored) {
            // best effort: a registration failure is deliberately not propagated
        }
    }

    /**
     * Remove the listeners that detect deactivation of the subscriber group and of the
     * topic caches.
     * <p>
     * The group deactivation listener, if there is one, is removed from the Subscriptions
     * cache (while that cache is active) using the subscription key of channel {@code 0}.
     * The caches deactivation listener is always removed.
     * <p>
     * The key is passed as the {@code Subscription.Key} it is; it must not be cast to the
     * listener type, because the resulting failure would be hidden by the catch below and
     * the listener would silently stay registered.
     */
    protected void unregisterDeactivationListener() {
        try {
            PagedTopicSubscriber<V>.GroupDeactivationListener listenerGroup = this.m_listenerGroupDeactivation;
            if (listenerGroup != null && this.m_caches.Subscriptions.isActive()) {
                this.m_caches.Subscriptions.removeMapListener(listenerGroup, this.m_aChannel[0].subscriberPartitionSync);
            }
            this.m_caches.removeListener(this.f_listenerDeactivation);
        } catch (RuntimeException ignored) {
            // best effort: a removal failure is deliberately not propagated
        }
    }

    /**
     * Register the MBean for this subscriber by delegating to {@code MBeanHelper}.
     */
    protected void registerMBean() {
        MBeanHelper.registerSubscriberMBean(this);
    }

    /**
     * Unregister the MBean for this subscriber by delegating to {@code MBeanHelper}.
     */
    protected void unregisterMBean() {
        MBeanHelper.unregisterSubscriberMBean(this);
    }

    /**
     * Return the element used to represent "no value", creating and caching it on first use.
     * <p>
     * The element wraps a serialized {@code null} value and is created with channel
     * {@code -1}.
     *
     * @return the cached empty element
     */
    PagedTopicSubscriber<V>.CommittableElement getEmptyElement() {
        if (this.m_elementEmpty != null) {
            return this.m_elementEmpty;
        }
        this.m_elementEmpty = new CommittableElement(
                PageElement.toBinary(-1, 0L, 0, 0L, ExternalizableHelper.toBinary(null, this.f_serializer)),
                -1);
        return this.m_elementEmpty;
    }

    /**
     * Combine two values into a single 64-bit identifier.
     * <p>
     * The low 32 bits of {@code j} form the low half of the result and {@code j2} is
     * shifted into the high half. {@link #memberIdFromId(long)} and
     * {@link #notificationIdFromId(long)} read the two halves back.
     *
     * @param j   the value stored in the low 32 bits (higher bits are discarded)
     * @param j2  the value stored in the high 32 bits
     *
     * @return the combined identifier
     */
    public static long createId(long j, long j2) {
        // a plain 32-bit mask; the identifier layout must not depend on a constant that
        // belongs to an unrelated library
        return (j2 << 32) | (j & 0xFFFFFFFFL);
    }

    /**
     * Extract the high 32 bits of an identifier.
     *
     * @param j  the 64-bit identifier
     *
     * @return the upper half of {@code j} as an {@code int}
     */
    public static int memberIdFromId(long j) {
        final long lUpperHalf = j >> 32;
        return (int) lUpperHalf;
    }

    /**
     * Render an identifier for display as {@code <id>/<member id>}.
     *
     * @param j  the 64-bit identifier
     *
     * @return the identifier, a slash, and the member id taken from its high 32 bits
     */
    public static String idToString(long j) {
        // the member id is part of the text; it must not be computed and then dropped
        return j + "/" + memberIdFromId(j);
    }

    /**
     * Render a subscriber identifier for display as {@code <id>/<member id>}.
     *
     * @param subscriberId  the subscriber identifier
     *
     * @return the subscriber's id, a slash, and the subscriber's member id
     */
    public static String idToString(SubscriberId subscriberId) {
        // the member id is part of the text; it must not be fetched and then dropped
        return subscriberId.getId() + "/" + subscriberId.getMemberId();
    }

    /**
     * Render a collection of identifiers as a comma-separated list, formatting each
     * identifier with {@link #idToString(long)}.
     *
     * @param collection  the identifiers to render
     *
     * @return the formatted identifiers joined with {@code ","}; an empty string for an
     *         empty collection
     */
    public static String idToString(Collection<Long> collection) {
        StringBuilder sb = new StringBuilder();
        boolean fFirst = true;
        for (long nId : collection) {
            if (!fFirst) {
                sb.append(",");
            }
            sb.append(idToString(nId));
            fFirst = false;
        }
        return sb.toString();
    }

    /**
     * Render a collection of subscriber identifiers as a comma-separated list,
     * formatting each one with {@link #idToString(SubscriberId)}.
     *
     * @param collection  the subscriber identifiers to render
     *
     * @return the formatted identifiers joined with {@code ","}; an empty string for an
     *         empty collection
     */
    public static String subscriberIdToString(Collection<SubscriberId> collection) {
        StringBuilder sb = new StringBuilder();
        boolean fFirst = true;
        for (SubscriberId subscriberId : collection) {
            if (!fFirst) {
                sb.append(",");
            }
            sb.append(idToString(subscriberId));
            fFirst = false;
        }
        return sb.toString();
    }

    /**
     * Extract the low 32 bits of an identifier, i.e. the value that
     * {@link #createId(long, long)} stores from its first argument.
     *
     * @param j  the 64-bit identifier
     *
     * @return the lower half of {@code j} as an {@code int}
     */
    public static int notificationIdFromId(long j) {
        // a plain 32-bit mask; the identifier layout must not depend on a constant that
        // belongs to an unrelated library
        return (int) (j & 0xFFFFFFFFL);
    }

    /**
     * Create a subscriber option that carries an identifying name.
     *
     * @param str  the name to wrap
     *
     * @return a new {@code WithIdentifyingName} option holding {@code str}
     */
    public static Subscriber.Option withIdentifyingName(String str) {
        final WithIdentifyingName option = new WithIdentifyingName(str);
        return option;
    }

    /**
     * Create a subscriber option that supplies a fixed notification identifier.
     *
     * @param i  the notification identifier the option returns
     *
     * @return an option whose single method returns {@code i}
     */
    public static Subscriber.Option withNotificationId(int i) {
        return () -> i;
    }

    /**
     * Recreate one of this class's serializable lambdas from its serialized form.
     * <p>
     * Three lambdas are recognised, all of them {@code ValueExtractor} implementations
     * that call a method of {@code Subscriber.Element}: {@code getPosition},
     * {@code getChannel} and {@code getTimestamp}. The implementation method name selects
     * the candidate and the remaining metadata must match exactly.
     * <p>
     * This method is synthetic: the compiler normally generates it, and it appears here
     * only because the file was produced by a decompiler.
     *
     * @param serializedLambda  the serialized form of the lambda
     *
     * @return the recreated lambda
     *
     * @throws IllegalArgumentException if the serialized form matches none of the
     *                                  recognised lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // NOTE(review): the decompiler dropped the target type of the lambdas returned below;
        // presumably each needs a cast to ValueExtractor - confirm against the original sources.
        switch (serializedLambda.getImplMethodName()) {
            case "getPosition":
                if (isElementExtractor(serializedLambda, "()Lcom/tangosol/net/topic/Position;")) {
                    return (v0) -> {
                        return v0.getPosition();
                    };
                }
                break;
            case "getChannel":
                if (isElementExtractor(serializedLambda, "()I")) {
                    return (v0) -> {
                        return v0.getChannel();
                    };
                }
                break;
            case "getTimestamp":
                if (isElementExtractor(serializedLambda, "()Ljava/time/Instant;")) {
                    return (v0) -> {
                        return v0.getTimestamp();
                    };
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Check that a serialized lambda is a {@code ValueExtractor} whose implementation is an
     * interface method (kind {@code 9}, i.e. {@code MethodHandleInfo.REF_invokeInterface})
     * of {@code Subscriber.Element} with the given signature.
     *
     * @param serializedLambda  the serialized form to inspect
     * @param sImplSignature    the expected implementation method signature
     *
     * @return {@code true} if every piece of metadata matches
     */
    private static boolean isElementExtractor(SerializedLambda serializedLambda, String sImplSignature) {
        return serializedLambda.getImplMethodKind() == 9
                && serializedLambda.getFunctionalInterfaceClass().equals("com/tangosol/util/ValueExtractor")
                && serializedLambda.getFunctionalInterfaceMethodName().equals("extract")
                && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                && serializedLambda.getImplClass().equals("com/tangosol/net/topic/Subscriber$Element")
                && serializedLambda.getImplMethodSignature().equals(sImplSignature);
    }
}
