package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Converter;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.util.MemorySize;
import com.oracle.coherence.common.util.Options;
import com.tangosol.coherence.config.Config;
import com.tangosol.internal.net.DebouncedFlowControl;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
import com.tangosol.internal.net.topic.impl.paged.model.NotificationKey;
import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
import com.tangosol.internal.sleepycat.je.utilint.DbLsn;
import com.tangosol.internal.util.DaemonPool;
import com.tangosol.internal.util.Daemons;
import com.tangosol.internal.util.DefaultDaemonPoolDependencies;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.Serializer;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.Cluster;
import com.tangosol.net.FlowControl;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Publisher;
import com.tangosol.util.Base;
import com.tangosol.util.Binary;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.MapListener;
import com.tangosol.util.filter.InKeySetFilter;
import com.tangosol.util.listener.SimpleMapListener;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicPublisher.class */
public class PagedTopicPublisher<V> implements Publisher<V> {
    // Maximum time, in seconds, that closeInternal() waits for the final flush to complete.
    // Read from the "coherence.topic.publisher.close.timeout" property (default "30s").
    public static final long CLOSE_TIMEOUT_SECS = TimeUnit.MILLISECONDS.toSeconds(Base.parseTime(Config.getProperty("coherence.topic.publisher.close.timeout", "30s"), 1000));
    // Current lifecycle state; volatile because it is read without holding f_lock.
    private volatile State m_state;
    // The topic this publisher publishes to (returned by getNamedTopic()).
    private final NamedTopic<V> m_topic;
    // The topic's underlying caches; set to null by closeInternal() once the publisher is closed.
    private PagedTopicCaches m_caches;
    // Topic name captured from the caches at construction time.
    private final String f_sTopicName;
    // Serializes a published value to a Binary using the caches' serializer.
    private final Converter<V, Binary> f_convValueToBinary;
    // Zero when the FailOnFull option was supplied, otherwise this publisher's identity hash code;
    // a non-zero value causes the notification listener to be registered.
    private final int f_nNotifyPostFull;
    // Ordering option used to pick a channel for values that are not Publisher.Orderable.
    private final Publisher.OrderBy<V> f_funcOrder;
    // Flow control shared by all channel publishers (returned by getFlowControl()).
    private final DebouncedFlowControl f_flowControl;
    // One channel publisher per channel; entries are replaced on restart and nulled on close.
    protected final PagedTopicChannelPublisher[] f_aChannel;
    // Bits are set by handlePublished(); toString() reports and then clears them.
    private final BitSet f_setOfferedChannel;
    // Listener forwarding deleted notification entries to onNotification().
    private final MapListener<NotificationKey, int[]> f_listenerNotification;
    // Filter used with f_listenerNotification; null when f_nNotifyPostFull is zero.
    private final Filter<int[]> f_filterListenerNotification;
    // Daemon pool passed to the channel publishers and backing f_executor.
    private final DaemonPool f_daemon;
    // Executor that adds tasks to f_daemon.
    private final Executor f_executor;
    // Publisher identifier built by createId() from the identity hash code and the local member id.
    private final long f_nId;
    // Failure policy consulted by handlePublishError() and ensureChannelPublisher().
    private final Publisher.OnFailure f_onFailure;
    // Listener that closes this publisher when the topic is destroyed or released.
    private final PagedTopicPublisher<V>.DeactivationListener f_listenerDeactivation = new DeactivationListener();
    // Actions registered through onClose(); run by closeInternal().
    private final List<Runnable> f_listOnCloseActions = new ArrayList();
    // Guards closeInternal() and the channel-publisher restart in ensureChannelPublisher().
    private final Lock f_lock = new ReentrantLock();

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicPublisher$ChannelCount.class */
    /**
     * Publisher option carrying the number of channels a publisher should use.
     * <p>
     * A negative count means "use the channel count configured for the topic".
     * The option can be serialized both as {@link ExternalizableLite} and as a
     * {@link PortableObject}; in both formats only the channel count is written.
     */
    public static class ChannelCount implements Publisher.Option<Object>, ExternalizableLite, PortableObject {
        /** Shared instance whose count is -1, i.e. "use the configured channel count". */
        public static final ChannelCount USE_CONFIGURED = new ChannelCount(-1);

        /** The requested channel count; negative means use the configured value. */
        private int m_cChannel;

        /**
         * Default constructor (used for deserialization); equivalent to a count of -1.
         */
        public ChannelCount() {
            this(-1);
        }

        /**
         * Create an option with the given channel count.
         *
         * @param cChannel the channel count; a negative value means use the configured count
         */
        public ChannelCount(int cChannel) {
            m_cChannel = cChannel;
        }

        /**
         * Create an option with the given channel count.
         *
         * @param cChannel the channel count
         * @return a new {@link ChannelCount} holding {@code cChannel}
         */
        public static ChannelCount of(int cChannel) {
            return new ChannelCount(cChannel);
        }

        /**
         * Obtain the default option.
         *
         * @return the shared {@link #USE_CONFIGURED} instance
         */
        @Options.Default
        public static ChannelCount useConfigured() {
            return USE_CONFIGURED;
        }

        /**
         * @return {@code true} if the stored channel count is negative
         */
        public boolean isUseConfigured() {
            return m_cChannel < 0;
        }

        /**
         * @return the stored channel count (negative when the configured count should be used)
         */
        public int getChannelCount() {
            return m_cChannel;
        }

        @Override // com.tangosol.io.ExternalizableLite
        public void readExternal(DataInput in) throws IOException {
            m_cChannel = in.readInt();
        }

        @Override // com.tangosol.io.ExternalizableLite
        public void writeExternal(DataOutput out) throws IOException {
            out.writeInt(m_cChannel);
        }

        @Override // com.tangosol.io.pof.PortableObject
        public void readExternal(PofReader in) throws IOException {
            // the channel count is the only POF property, at index 0
            m_cChannel = in.readInt(0);
        }

        @Override // com.tangosol.io.pof.PortableObject
        public void writeExternal(PofWriter out) throws IOException {
            out.writeInt(0, m_cChannel);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicPublisher$DeactivationListener.class */
    /**
     * Listener on the topic caches that closes the enclosing publisher when
     * the topic is destroyed or released. Connect and disconnect events are ignored.
     */
    public class DeactivationListener implements PagedTopicCaches.Listener {
        protected DeactivationListener() {
        }

        @Override // com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches.Listener
        public void onConnect() {
            // nothing to do on connect
        }

        @Override // com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches.Listener
        public void onDisconnect() {
            // nothing to do on disconnect
        }

        /**
         * If the publisher is still active, log the destroy and close it,
         * telling {@code closeInternal} that the topic was destroyed.
         */
        @Override // com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches.Listener
        public void onDestroy() {
            final PagedTopicPublisher<V> publisher = PagedTopicPublisher.this;
            if (!publisher.isActive()) {
                return;
            }
            Logger.fine("Detected destroy of topic " + publisher.f_sTopicName + ", closing publisher " + publisher);
            publisher.closeInternal(true);
        }

        /**
         * If the publisher is still active, log the release and close it
         * as an ordinary (non-destroy) close.
         */
        @Override // com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches.Listener
        public void onRelease() {
            final PagedTopicPublisher<V> publisher = PagedTopicPublisher.this;
            if (!publisher.isActive()) {
                return;
            }
            Logger.fine("Detected release of topic " + publisher.f_sTopicName + ", closing publisher " + publisher);
            publisher.closeInternal(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicPublisher$FlushMode.class */
    /**
     * The mode passed to each channel publisher's {@code flush} call by
     * {@code flushInternal}.
     */
    public enum FlushMode {
        // Used by flush() and by closeInternal() for a close that was not caused by a topic destroy.
        FLUSH,
        // Used by closeInternal() when the close was triggered by the topic being destroyed.
        FLUSH_DESTROY,
        // Used by closeInternal() after the normal flush timed out.
        FLUSH_CLOSE_EXCEPTIONALLY
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicPublisher$PublishedStatus.class */
    /**
     * Immutable {@link Publisher.Status} implementation recording the channel
     * and the {@link PagedPosition} at which a value was published.
     */
    protected static class PublishedStatus implements Publisher.Status {
        /** The channel published to. */
        private final int f_nChannel;

        /** The position of the published value. */
        private final PagedPosition f_position;

        /**
         * Create a status.
         *
         * @param nChannel the channel published to
         * @param lPage    first argument passed to the {@link PagedPosition} constructor
         * @param nOffset  second argument passed to the {@link PagedPosition} constructor
         */
        public PublishedStatus(int nChannel, long lPage, int nOffset) {
            f_nChannel = nChannel;
            f_position = new PagedPosition(lPage, nOffset);
        }

        @Override // com.tangosol.net.topic.Publisher.Status
        public int getChannel() {
            return f_nChannel;
        }

        @Override // com.tangosol.net.topic.Publisher.Status
        public Position getPosition() {
            return f_position;
        }

        public String toString() {
            return "PublishedStatus(channel=" + f_nChannel + ", position=" + f_position + ")";
        }

        /**
         * Two statuses are equal when they are of exactly the same class and
         * have the same channel and position.
         */
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            PublishedStatus that = (PublishedStatus) obj;
            if (f_nChannel != that.f_nChannel) {
                return false;
            }
            return Objects.equals(f_position, that.f_position);
        }

        public int hashCode() {
            return Objects.hash(f_nChannel, f_position);
        }
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicPublisher$State.class */
    /**
     * Lifecycle states of the publisher, stored in {@code m_state}.
     */
    public enum State {
        // Set at the end of the constructor; isActive() returns true in this state.
        Active,
        // Set by closeInternal() while the close is in progress.
        Closing,
        // Set by closeInternal() once clean-up has finished.
        Closed,
        // Treated as active by isActive(). NOTE(review): never assigned in this class as visible here.
        Disconnected,
        // Set by handlePublishError() when the failure policy is Stop, before the asynchronous close.
        OnError
    }

    /**
     * Create a publisher for the given topic.
     * <p>
     * The constructor registers the deactivation listener, resolves the
     * publisher options, creates the flow control, starts a dedicated daemon
     * pool, creates one channel publisher per channel and, unless the
     * FailOnFull option was supplied, registers a listener on the
     * notifications cache. The state is set to {@link State#Active} last.
     *
     * @param namedTopic       the topic being published to; its channel count is used
     *                         unless a {@link ChannelCount} option overrides it
     * @param pagedTopicCaches the caches backing the topic; must not be null
     * @param optionArr        the publisher options
     *
     * @throws NullPointerException if {@code pagedTopicCaches} is null
     */
    @SafeVarargs
    public PagedTopicPublisher(NamedTopic<V> namedTopic, PagedTopicCaches pagedTopicCaches, Publisher.Option<? super V>... optionArr) {
        this.m_caches = (PagedTopicCaches) Objects.requireNonNull(pagedTopicCaches, "The PagedTopicCaches parameter cannot be null");
        // NOTE(review): namedTopic is not null-checked but is dereferenced below when no explicit channel count is given.
        this.m_topic = namedTopic;
        // NOTE(review): the listener is registered before the remaining fields are assigned and m_state is
        // still null at this point, so isActive() is false for any event delivered during construction.
        registerDeactivationListener();
        Serializer serializer = pagedTopicCaches.getSerializer();
        Cluster cluster = this.m_caches.getService().getCluster();
        Options from = Options.from(Publisher.Option.class, optionArr);
        // identifier combines this object's identity hash code with the local member id
        this.f_nId = createId(System.identityHashCode(this), cluster.getLocalMember().getId());
        this.f_convValueToBinary = obj -> {
            return ExternalizableHelper.toBinary(obj, serializer);
        };
        this.f_sTopicName = pagedTopicCaches.getTopicName();
        // zero disables post-full notifications (FailOnFull); otherwise the identity hash code is used as the notifier id
        this.f_nNotifyPostFull = from.contains(Publisher.FailOnFull.class) ? 0 : System.identityHashCode(this);
        this.f_funcOrder = computeOrderByOption(from);
        this.f_onFailure = (Publisher.OnFailure) from.get(Publisher.OnFailure.class);
        // an explicit ChannelCount option overrides the topic's channel count
        ChannelCount channelCount = (ChannelCount) from.get(ChannelCount.class, ChannelCount.USE_CONFIGURED);
        int channelCount2 = channelCount.isUseConfigured() ? namedTopic.getChannelCount() : channelCount.getChannelCount();
        long maxBatchSizeBytes = this.m_caches.getDependencies().getMaxBatchSizeBytes();
        this.f_aChannel = new PagedTopicChannelPublisher[channelCount2];
        this.f_setOfferedChannel = new BitSet(channelCount2);
        // flow control is built from two and three times the maximum batch size; the function formats a backlog as a memory size
        DebouncedFlowControl debouncedFlowControl = new DebouncedFlowControl(maxBatchSizeBytes * 2, maxBatchSizeBytes * 3, l -> {
            return new MemorySize(Math.abs(l.longValue())).toString();
        });
        this.f_flowControl = debouncedFlowControl;
        // dedicated daemon pool for this publisher: starts with one thread, no upper limit configured
        DefaultDaemonPoolDependencies defaultDaemonPoolDependencies = new DefaultDaemonPoolDependencies();
        defaultDaemonPoolDependencies.setName("Publisher-" + this.m_caches.getTopicName() + "-" + this.f_nId);
        defaultDaemonPoolDependencies.setThreadCountMin(1);
        defaultDaemonPoolDependencies.setThreadCount(1);
        defaultDaemonPoolDependencies.setThreadCountMax(Integer.MAX_VALUE);
        this.f_daemon = Daemons.newDaemonPool(defaultDaemonPoolDependencies);
        DaemonPool daemonPool = this.f_daemon;
        Objects.requireNonNull(daemonPool);
        this.f_executor = daemonPool::add;
        this.f_daemon.start();
        // one channel publisher per channel, all sharing the flow control, daemon pool and error handler
        for (int i = 0; i < channelCount2; i++) {
            this.f_aChannel[i] = new PagedTopicChannelPublisher(this.f_nId, i, channelCount2, this.m_caches, this.f_nNotifyPostFull, debouncedFlowControl, this.f_daemon, (v1, v2) -> {
                handlePublishError(v1, v2);
            });
        }
        // deleted notification entries carry the channels to notify as their old value
        this.f_listenerNotification = new SimpleMapListener().addDeleteHandler(mapEvent -> {
            onNotification((int[]) mapEvent.getOldValue());
        }).synchronous();
        this.f_filterListenerNotification = this.f_nNotifyPostFull == 0 ? null : new InKeySetFilter(null, pagedTopicCaches.getPartitionNotifierSet(this.f_nNotifyPostFull));
        if (this.f_nNotifyPostFull != 0) {
            pagedTopicCaches.Notifications.addMapListener((MapListener<? super NotificationKey, ? super int[]>) this.f_listenerNotification, (Filter) this.f_filterListenerNotification, false);
        }
        this.m_state = State.Active;
    }

    /**
     * Determine whether this publisher can still be used.
     *
     * @return {@code true} if the state is {@link State#Active} or {@link State#Disconnected}
     */
    @Override // com.tangosol.net.topic.Publisher
    public boolean isActive() {
        final State state = m_state;
        return state == State.Active || state == State.Disconnected;
    }

    /**
     * Asynchronously publish a value to the channel selected for it.
     * <p>
     * The publish is attempted at most twice: if an attempt throws an
     * {@link IllegalStateException} and the publisher is still active, it is
     * retried once. If both attempts fail, the first exception is thrown
     * with the second added as suppressed.
     *
     * @param v the value to publish
     * @return the future returned by the channel publisher
     *
     * @throws IllegalStateException if this publisher is not active
     */
    @Override // com.tangosol.net.topic.Publisher
    public CompletableFuture<Publisher.Status> publish(V v) {
        ensureActive();
        IllegalStateException failure = null;
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                final PagedTopicChannelPublisher channel = ensureChannelPublisher(v);
                Binary binValue = f_convValueToBinary.convert(v);
                CompletableFuture<Publisher.Status> future = channel.publish(binValue);
                // record which channel was published to once the publish completes (successfully or not)
                future.handleAsync((status, error) -> handlePublished(channel.getChannel()), f_executor);
                return future;
            } catch (IllegalStateException e) {
                if (failure != null) {
                    failure.addSuppressed(e);
                } else {
                    failure = e;
                }
                // only retry while the publisher itself is still active
                ensureActive();
            }
        }
        throw Exceptions.ensureRuntimeException(failure);
    }

    /**
     * @return the flow control created for this publisher and shared with its channel publishers
     */
    @Override // com.tangosol.net.topic.Publisher
    public FlowControl getFlowControl() {
        return f_flowControl;
    }

    /**
     * Flush every channel publisher using {@link FlushMode#FLUSH}.
     *
     * @return a future that completes when all channel flushes have completed
     *
     * @throws IllegalStateException if this publisher is not active
     */
    @Override // com.tangosol.net.topic.Publisher
    public CompletableFuture<Void> flush() {
        ensureActive();
        CompletableFuture<Void> future = flushInternal(FlushMode.FLUSH);
        return future;
    }

    /**
     * Close this publisher if it is still active; otherwise do nothing.
     */
    @Override // com.tangosol.net.topic.Publisher, java.lang.AutoCloseable
    public void close() {
        if (!isActive()) {
            return;
        }
        closeInternal(false);
    }

    /**
     * Register an action to be run when this publisher is closed.
     *
     * @param runnable the action to add to the list of on-close actions
     */
    @Override // com.tangosol.net.topic.Publisher
    public void onClose(Runnable runnable) {
        f_listOnCloseActions.add(runnable);
    }

    /**
     * Obtain the channel count reported by the underlying topic.
     * <p>
     * Note that this is the topic's count; the number of channel publishers
     * held by this publisher is taken from the {@link ChannelCount} option
     * when one was supplied at construction.
     *
     * @return the topic's channel count
     */
    @Override // com.tangosol.net.topic.Publisher
    public int getChannelCount() {
        return m_topic.getChannelCount();
    }

    /**
     * @return the topic passed to the constructor
     */
    @Override // com.tangosol.net.topic.Publisher
    public NamedTopic<V> getNamedTopic() {
        return m_topic;
    }

    /**
     * @return the topic name captured from the caches at construction time
     */
    public String getName() {
        return f_sTopicName;
    }

    /**
     * Forward a notification to the channel publisher of each listed channel.
     *
     * @param iArr the channel identifiers, used as indexes into the channel publisher array
     */
    protected void onNotification(int[] iArr) {
        for (int idx = 0; idx < iArr.length; idx++) {
            PagedTopicChannelPublisher channel = f_aChannel[iArr[idx]];
            channel.onNotification();
        }
    }

    /**
     * Compare this publisher with another object.
     * <p>
     * Two publishers are equal when they are of exactly the same class and
     * refer to equal caches. The comparison is null-safe because the caches
     * reference is cleared when a publisher is closed; previously calling
     * this method on a closed publisher threw a {@link NullPointerException}.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is a publisher of the same class with equal caches
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        PagedTopicPublisher<?> that = (PagedTopicPublisher<?>) obj;
        // either side may already be closed, in which case its caches reference is null
        return Objects.equals(this.m_caches, that.m_caches);
    }

    /**
     * Obtain the hash code of this publisher, derived from its caches.
     * <p>
     * The caches reference is cleared when the publisher is closed, so a
     * closed publisher returns zero instead of throwing a
     * {@link NullPointerException}. As a consequence the hash code changes
     * when the publisher is closed.
     *
     * @return the hash code of the caches, or zero once the publisher is closed
     */
    @Override
    public int hashCode() {
        return Objects.hashCode(this.m_caches);
    }

    /**
     * Describe this publisher.
     * <p>
     * A closed publisher (no caches) is reported as inactive. Otherwise the
     * description lists the topic, id, ordering, backlog and the number of
     * channels recorded as published to, followed by the details of each
     * such channel. The set of recorded channels is cleared after it has
     * been reported, so this method is not free of side effects.
     *
     * @return a description of this publisher
     */
    public String toString() {
        final PagedTopicCaches caches = m_caches;
        final String sName = getClass().getSimpleName();
        if (caches == null) {
            return sName + "(inactive)";
        }

        StringBuilder sb = new StringBuilder(sName);
        sb.append("(topic=").append(caches.getTopicName());
        sb.append(", id=").append(f_nId);
        sb.append(", orderBy=").append(f_funcOrder);
        sb.append(", backlog=").append(f_flowControl);
        sb.append(", channels=").append(f_setOfferedChannel.cardinality());

        if (f_setOfferedChannel.isEmpty()) {
            return sb.toString();
        }

        for (PagedTopicChannelPublisher channel : f_aChannel) {
            int nChannel = channel.getChannel();
            if (!f_setOfferedChannel.get(nChannel)) {
                continue;
            }
            sb.append("  ").append(nChannel).append(": ").append(channel);
        }
        // reset the record of channels published to now that it has been reported
        f_setOfferedChannel.clear();
        return sb.toString();
    }

    /**
     * Record that a publish to the given channel has completed.
     *
     * @param i the channel whose bit is set in the offered-channel set
     * @return always {@code null}, so the method can be used as the result of a
     *         {@code handleAsync} function
     */
    protected Void handlePublished(int i) {
        f_setOfferedChannel.set(i);
        return null;
    }

    /**
     * Handle a publishing error reported by a channel publisher, according
     * to the configured failure policy.
     * <p>
     * With the {@code Stop} policy the state is set to {@link State#OnError}
     * and the publisher is closed asynchronously on the common daemon pool.
     * With the {@code Continue} policy the error is only logged. Any other
     * policy value is ignored.
     *
     * @param th the error, or {@code null} in which case nothing is done
     * @param i  the channel that reported the error
     */
    protected void handlePublishError(Throwable th, int i) {
        if (th == null) {
            return;
        }
        switch (f_onFailure) {
            case Stop:
                Logger.fine("Closing publisher due to publishing error from channel " + i + ", " + th);
                m_state = State.OnError;
                // close on another thread rather than on the thread reporting the error
                CompletableFuture.runAsync(() -> closeInternal(false), Daemons.commonPool());
                break;
            case Continue:
                Logger.finer("Publisher set to continue on error, ignoring publishing error from channel " + i + ", " + th);
                break;
            default:
                break;
        }
    }

    /**
     * Obtain an active channel publisher for the given value.
     * <p>
     * The channel is the value's order id (from the value itself when it is
     * {@link Publisher.Orderable}, otherwise from the ordering option) modulo
     * the number of channel publishers. If the selected channel publisher is
     * not active then, with the {@code Stop} failure policy, this publisher
     * is closed and an exception is thrown; otherwise the channel publisher
     * is replaced with a new one while holding the lock.
     *
     * @param v the value about to be published
     * @return the channel publisher to publish the value with
     *
     * @throws IllegalStateException if the channel publisher is inactive and the failure policy is Stop
     */
    private PagedTopicChannelPublisher ensureChannelPublisher(V v) {
        int nOrder = v instanceof Publisher.Orderable
                ? ((Publisher.Orderable) v).getOrderId()
                : f_funcOrder.getOrderId(v);
        int nChannel = Base.mod(nOrder, f_aChannel.length);

        PagedTopicChannelPublisher channel = f_aChannel[nChannel];
        if (channel.isActive()) {
            return channel;
        }

        if (f_onFailure == Publisher.OnFailure.Stop) {
            closeInternal(false);
            throw new IllegalStateException("This publisher is no longer active");
        }

        f_lock.lock();
        try {
            // re-read under the lock, another thread may already have restarted the channel
            channel = f_aChannel[nChannel];
            if (isActive() && !channel.isActive()) {
                m_caches.ensureConnected();
                Logger.finer("Restarted publisher for channel " + nChannel + " topic " + m_caches.getTopicName() + " publisher " + f_nId);
                channel = new PagedTopicChannelPublisher(f_nId, nChannel, f_aChannel.length, m_caches, f_nNotifyPostFull, f_flowControl, f_daemon,
                        (error, nErrorChannel) -> handlePublishError(error, nErrorChannel));
                f_aChannel[nChannel] = channel;
            }
            return channel;
        } finally {
            f_lock.unlock();
        }
    }

    /**
     * Determine the ordering option to use.
     *
     * @param options the publisher options
     * @return the first {@link Publisher.OrderBy} instance found in the options,
     *         or {@link Publisher.OrderBy#thread()} when there is none
     */
    private Publisher.OrderBy computeOrderByOption(Options options) {
        Iterator itOrder = options.getInstancesOf(Publisher.OrderBy.class).iterator();
        if (itOrder.hasNext()) {
            return (Publisher.OrderBy) itOrder.next();
        }
        return Publisher.OrderBy.thread();
    }

    /**
     * Assert that this publisher is active.
     *
     * @throws IllegalStateException if {@link #isActive()} returns {@code false}
     */
    private void ensureActive() {
        if (isActive()) {
            return;
        }
        throw new IllegalStateException("This publisher is no longer active");
    }

    /**
     * Close this publisher and release its resources.
     * <p>
     * The close is performed at most once: calls made while the publisher is
     * closing or already closed return immediately. The steps are:
     * <ol>
     *   <li>unless the topic was destroyed, remove the deactivation listener
     *       and (when registered) the notification listener;</li>
     *   <li>stop every channel publisher;</li>
     *   <li>flush, waiting up to {@link #CLOSE_TIMEOUT_SECS} seconds; on
     *       timeout flush again with {@link FlushMode#FLUSH_CLOSE_EXCEPTIONALLY};</li>
     *   <li>close every channel publisher.</li>
     * </ol>
     * Whatever happens in those steps, the final clean-up always runs: the
     * caches reference and channel array are cleared, the on-close actions
     * are run, the daemon pool is shut down and the state becomes
     * {@link State#Closed}. Any exception from the steps above is then
     * propagated to the caller.
     *
     * @param z {@code true} when the close was triggered by the topic being
     *          destroyed, in which case listeners are not removed and the
     *          flush uses {@link FlushMode#FLUSH_DESTROY}
     */
    protected void closeInternal(boolean z) {
        // cheap check without the lock
        if (this.m_caches == null || this.m_state == State.Closing || this.m_state == State.Closed) {
            return;
        }
        boolean interrupted = false;
        this.f_lock.lock();
        try {
            // re-check now that the lock is held; the enclosing finally releases the lock,
            // so it must not be unlocked explicitly here
            if (this.m_caches == null || this.m_state == State.Closing || this.m_state == State.Closed) {
                return;
            }
            this.m_state = State.Closing;
            try {
                if (!z) {
                    unregisterDeactivationListener();
                    if (this.f_nNotifyPostFull != 0) {
                        PagedTopicCaches pagedTopicCaches = this.m_caches;
                        if (pagedTopicCaches.Notifications.isActive()) {
                            pagedTopicCaches.Notifications.removeMapListener(this.f_listenerNotification, this.f_filterListenerNotification);
                        }
                    }
                }
                for (PagedTopicChannelPublisher pagedTopicChannelPublisher : this.f_aChannel) {
                    pagedTopicChannelPublisher.stop();
                }
                try {
                    flushInternal(z ? FlushMode.FLUSH_DESTROY : FlushMode.FLUSH).get(CLOSE_TIMEOUT_SECS, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // carry on closing, but remember to restore the interrupt status for the caller
                    interrupted = true;
                } catch (ExecutionException ignored) {
                    // a failed flush must not prevent the close from completing
                } catch (TimeoutException e2) {
                    flushInternal(FlushMode.FLUSH_CLOSE_EXCEPTIONALLY).join();
                    Logger.warn("Publisher.close: timeout after waiting " + CLOSE_TIMEOUT_SECS + " seconds for completion with flush.join(), forcing complete exceptionally");
                }
                for (PagedTopicChannelPublisher pagedTopicChannelPublisher2 : this.f_aChannel) {
                    pagedTopicChannelPublisher2.close();
                }
            } finally {
                // always leave the publisher fully closed, even if one of the steps above failed
                this.m_caches = null;
                Arrays.fill(this.f_aChannel, (Object) null);
                this.f_listOnCloseActions.forEach(runnable -> {
                    try {
                        runnable.run();
                    } catch (Throwable th2) {
                        Logger.fine(getClass().getName() + ".close(): handled onClose exception: " + th2.getClass().getCanonicalName() + ": " + th2.getMessage());
                    }
                });
                this.f_daemon.shutdown();
                this.m_state = State.Closed;
            }
        } finally {
            this.f_lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Flush every channel publisher with the given mode.
     *
     * @param flushMode the mode passed to each channel publisher's flush
     * @return a future that completes when all of the channel flush futures have completed
     */
    private CompletableFuture<Void> flushInternal(FlushMode flushMode) {
        final PagedTopicChannelPublisher[] aChannel = f_aChannel;
        CompletableFuture[] aFuture = new CompletableFuture[aChannel.length];
        int cFuture = 0;
        for (PagedTopicChannelPublisher channel : aChannel) {
            aFuture[cFuture++] = channel.flush(flushMode);
        }
        return CompletableFuture.allOf(aFuture);
    }

    /**
     * Add the deactivation listener to the topic caches.
     * <p>
     * Registration is best effort: any {@link RuntimeException} raised while
     * adding the listener is ignored.
     */
    protected void registerDeactivationListener() {
        try {
            m_caches.addListener(f_listenerDeactivation);
        } catch (RuntimeException ignored) {
            // best effort, failure to register is not reported
        }
    }

    /**
     * Remove the deactivation listener from the topic caches.
     * <p>
     * Removal is best effort: any {@link RuntimeException} raised while
     * removing the listener is ignored.
     */
    protected void unregisterDeactivationListener() {
        try {
            m_caches.removeListener(f_listenerDeactivation);
        } catch (RuntimeException ignored) {
            // best effort, failure to unregister is not reported
        }
    }

    /**
     * Combine two values into a single 64-bit identifier.
     * <p>
     * The low 32 bits of {@code j2} form the upper half of the result and the
     * low 32 bits of {@code j} form the lower half. The constructor passes
     * the publisher's identity hash code as {@code j} and the local member
     * id as {@code j2}.
     *
     * @param j  the value supplying the lower 32 bits
     * @param j2 the value supplying the upper 32 bits
     * @return the combined identifier
     */
    static long createId(long j, long j2) {
        // mask with a plain 32-bit literal so that sign-extended bits of j cannot
        // leak into the upper half (and no unrelated library constant is needed)
        return (j2 << 32) | (j & 0xFFFFFFFFL);
    }
}
