package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Associated;
import com.oracle.coherence.common.base.Blocking;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.util.Duration;
import com.tangosol.internal.net.DebouncedFlowControl;
import com.tangosol.internal.net.topic.impl.paged.BatchingOperationsQueue;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicPublisher;
import com.tangosol.internal.net.topic.impl.paged.agent.OfferProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.TailAdvancer;
import com.tangosol.internal.net.topic.impl.paged.agent.TopicInitialiseProcessor;
import com.tangosol.internal.net.topic.impl.paged.model.Page;
import com.tangosol.internal.net.topic.impl.paged.model.Usage;
import com.tangosol.internal.util.DaemonPool;
import com.tangosol.io.Serializer;
import com.tangosol.net.PagedTopicService;
import com.tangosol.net.partition.KeyPartitioningStrategy;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Publisher;
import com.tangosol.net.topic.TopicException;
import com.tangosol.net.topic.TopicPublisherException;
import com.tangosol.util.Binary;
import com.tangosol.util.InvocableMapHelper;
import com.tangosol.util.LongArray;
import com.tangosol.util.SparseArray;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

/* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/PagedTopicChannelPublisher.class */
public class PagedTopicChannelPublisher {
    /** Typed {@code null} returned by {@code handleError} so that it can be used as a future {@code handle} callback. */
    private static final Void VOID = null;
    /** Identifier of the owning publisher; part of equals/hashCode and of diagnostic messages. */
    private final long f_lPublisherId;
    /** The channel this publisher publishes to; also the association key of tasks submitted to the daemon pool. */
    private final int f_nChannel;
    /** Channel count passed to the topic service's {@code ensureChannelCount} call when (re)connecting. */
    private final int f_nChannelCount;
    /** Topic name, captured from the caches at construction time. */
    private final String f_sTopicName;
    /** Optional callback (may be {@code null}) invoked with the failure and the channel when an error is handled. */
    private final BiConsumer<Throwable, Integer> f_onErrorHandler;
    /** The topic caches; set to {@code null} once this publisher is closed. */
    private PagedTopicCaches m_caches;
    /** Serializer obtained from the caches; handed to the {@code TopicPublisherException} factories. */
    private final Serializer f_serializer;
    /** Strategy used to find the partition (and hence unit of order) of a page key. */
    private final KeyPartitioningStrategy f_keyPartitioningStrategy;
    /** Value passed to each {@code OfferProcessor}; when zero, a "topic full" offer fails the remaining elements instead of pausing the queue. */
    private final int f_nNotifyPostFull;
    /** Usage key for this channel, obtained from the caches; target of the initialise and tail-advance invocations. */
    private final Usage.Key f_keyUsageSync;
    /** Unit of order of the partition owning {@link #f_keyUsageSync}. */
    private final int f_nUsageSyncUnitOfOrder;
    /** Queue of pending values and their completion futures; offers are made from its current batch. */
    private final BatchingOperationsQueue<Binary, Publisher.Status> f_batchingQueue;
    /** Lifecycle state; volatile because it is read without holding the monitor. */
    private volatile State m_state;
    /** Page id that offers are sent to; {@code -1} until a page id has been recorded by {@code updatePageId}. */
    private volatile long m_lTail = -1;
    /** Lazily created future of the {@code TopicInitialiseProcessor} invocation; never reset once assigned. */
    private volatile CompletableFuture<Long> futurePageId;
    /** The in-flight {@code TailAdvancer} invocation, if any; cleared by {@code updatePageId}. */
    private CompletableFuture<Long> m_futureMovePage;
    /** Number of completed offers. */
    private long m_cOffers;
    /** Value of {@link #m_cOffers} at the previous {@code toString} call. */
    private long m_cOffersLast;
    /** Total number of elements accepted by completed offers. */
    private long m_cAccepted;
    /** Value of {@link #m_cAccepted} at the previous {@code toString} call. */
    private long m_cAcceptedLast;
    /** Number of completed offers that accepted no elements. */
    private long m_cMisses;
    /** Value of {@link #m_cMisses} at the previous {@code toString} call. */
    private long m_cMissesLast;
    /** Number of notifications that resumed the batching queue. */
    private long m_cWait;
    /** Value of {@link #m_cWait} at the previous {@code toString} call. */
    private long m_cWaitsLast;
    /** Number of notifications received. */
    private long m_cNotify;
    /** Value of {@link #m_cNotify} at the previous {@code toString} call. */
    private long m_cNotifyLast;

    /**
     * A {@link BatchingOperationsQueue.Executor} that hands every task to a
     * {@link DaemonPool}, wrapped in an {@link AssociatedTask} so that the task
     * carries this publisher's channel as its association key.
     */
    protected class AssociatedExecutor implements BatchingOperationsQueue.Executor {
        /** The pool that runs the wrapped tasks. */
        private final DaemonPool f_pool;

        /**
         * @param daemonPool the pool to submit tasks to
         */
        public AssociatedExecutor(DaemonPool daemonPool) {
            f_pool = daemonPool;
        }

        /**
         * Wrap the task in an {@link AssociatedTask} and add it to the pool.
         *
         * @param runnable the task to execute
         */
        @Override
        public void execute(Runnable runnable) {
            AssociatedTask task = new AssociatedTask(runnable);
            f_pool.add(task);
        }
    }

    /**
     * A {@link Runnable} wrapper whose association key is the channel of the
     * enclosing publisher.
     */
    protected class AssociatedTask implements Runnable, Associated<Integer> {
        /** The wrapped task. */
        private final Runnable f_task;

        /**
         * @param runnable the task to delegate to
         */
        public AssociatedTask(Runnable runnable) {
            f_task = runnable;
        }

        /**
         * @return the enclosing publisher's channel
         */
        @Override
        public Integer getAssociatedKey() {
            int nChannel = PagedTopicChannelPublisher.this.f_nChannel;
            return nChannel;
        }

        /** Run the wrapped task on the calling thread. */
        @Override
        public void run() {
            f_task.run();
        }
    }

    /**
     * Lifecycle states of a channel publisher. The only transitions made by
     * this class are Active to Closing (in {@code stop}) and Closing to Closed
     * (in {@code close}).
     */
    public enum State {
        /** Initial state, assigned by the constructor. */
        Active,
        /** Entered from {@code stop()}, which also closes the batching queue. */
        Closing,
        /** Entered from {@code close()}; outstanding operations are cancelled and the caches reference is dropped. */
        Closed
    }

    /**
     * Create a publisher for one channel of a paged topic.
     *
     * @param j                     the identifier of the owning publisher
     * @param i                     the channel to publish to
     * @param i2                    the channel count to pass to the topic service when connecting
     * @param pagedTopicCaches      the topic caches (must not be {@code null})
     * @param i3                    the "notify post full" value passed to every offer
     * @param debouncedFlowControl  the flow control handed to the batching queue
     * @param daemonPool            the pool that runs the batching queue's tasks
     * @param biConsumer            optional handler called with the error and the channel on failure
     */
    public PagedTopicChannelPublisher(long j, int i, int i2, PagedTopicCaches pagedTopicCaches, int i3, DebouncedFlowControl debouncedFlowControl, DaemonPool daemonPool, BiConsumer<Throwable, Integer> biConsumer) {
        // plain identity and configuration
        f_lPublisherId = j;
        f_nChannel = i;
        f_nChannelCount = i2;

        // values derived from the caches
        f_sTopicName = pagedTopicCaches.getTopicName();
        f_onErrorHandler = biConsumer;
        m_caches = pagedTopicCaches;
        f_nNotifyPostFull = i3;
        f_keyUsageSync = pagedTopicCaches.getUsageSyncKey(i);
        f_nUsageSyncUnitOfOrder = pagedTopicCaches.getUnitOfOrder(f_keyUsageSync.getPartitionId());
        f_serializer = pagedTopicCaches.getSerializer();
        f_keyPartitioningStrategy = pagedTopicCaches.getService().getKeyPartitioningStrategy();

        // the batching queue calls back into addQueuedElements and runs its
        // work on the pool through a channel-associated executor
        AssociatedExecutor executor = new AssociatedExecutor(daemonPool);
        NamedTopic.ElementCalculator calculator = pagedTopicCaches.getElementCalculator();
        Consumer<Integer> onBatchReady = this::addQueuedElements;
        // the method reference null-checks the calculator when it is evaluated
        f_batchingQueue = new BatchingOperationsQueue<>(onBatchReady, 1, debouncedFlowControl, calculator::calculateUnits, executor);

        m_state = State.Active;
    }

    /**
     * Queue a serialized value for publishing on this channel.
     *
     * @param binary the serialized value
     * @return a future completed with the publish status of the value
     * @throws IllegalStateException if the batching queue rejects the value
     *         with an {@code IllegalStateException}; the original is kept as the cause
     */
    public CompletableFuture<Publisher.Status> publish(Binary binary) {
        ensureConnected();

        CompletableFuture<Publisher.Status> future;
        try {
            future = f_batchingQueue.add(binary);
        } catch (IllegalStateException cause) {
            throw new IllegalStateException("This publisher is no longer active", cause);
        }
        return future;
    }

    /**
     * Ensure the underlying caches are connected and the topic has enough
     * channels, retrying until the reconnect timeout expires.
     * <p>
     * The method returns as soon as a connection attempt succeeds, or
     * immediately if this publisher is no longer active. While the service is
     * suspended it polls without treating that as a failure. A
     * {@link TopicException} is not retried.
     *
     * @throws RuntimeException the last connection failure, if the timeout
     *         expires, a {@link TopicException} is raised, or the calling
     *         thread is interrupted while waiting
     */
    private void ensureConnected() {
        // poll interval while the service is suspended; also the retry delay
        // used if the dependencies cannot be read
        final long suspendedPollMillis = 100L;
        final long start = System.currentTimeMillis();

        long now = start;
        long deadline = start + PagedTopic.DEFAULT_RECONNECT_TIMEOUT_SECONDS.as(Duration.Magnitude.MILLI);
        long retryMillis = suspendedPollMillis;
        Throwable error = null;

        while (now < deadline) {
            PagedTopicCaches caches = this.m_caches;
            if (this.m_state != State.Active || caches == null) {
                // stopped or closed; nothing to connect
                return;
            }

            try {
                // prefer the configured retry delay and timeout over the defaults
                PagedTopicDependencies dependencies = caches.getDependencies();
                retryMillis = dependencies.getReconnectRetryMillis();
                deadline = start + dependencies.getReconnectTimeoutMillis();

                caches.ensureConnected();
                PagedTopicService service = caches.getService();
                if (!service.isSuspended()) {
                    int cChannel = service.ensureChannelCount(this.f_sTopicName, this.f_nChannel + 1, this.f_nChannelCount);
                    if (this.f_nChannel >= cChannel) {
                        Logger.warn((Supplier<String>) () ->
                                String.format("This publisher is publishing to channel %d, but the topic is configured with %d channels", Integer.valueOf(this.f_nChannel), Integer.valueOf(cChannel)));
                    }
                    // connected: stop retrying
                    return;
                }

                // a suspended service is not an error; wait for it to resume
                error = null;
                Blocking.sleep(suspendedPollMillis);
            } catch (Throwable t) {
                error = t;
                if (t instanceof TopicException) {
                    // not recoverable by reconnecting
                    break;
                }
                if (t instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    break;
                }

                now = System.currentTimeMillis();
                if (now < deadline) {
                    Logger.finer("Failed to reconnect publisher, will retry in " + retryMillis + " millis " + this + " due to " + t.getMessage());
                    try {
                        // never sleep past the deadline
                        Thread.sleep(Math.max(0L, Math.min(retryMillis, deadline - now)));
                    } catch (InterruptedException interrupted) {
                        // restore the flag and give up with the last failure
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }

            now = System.currentTimeMillis();
        }

        if (error != null) {
            throw Exceptions.ensureRuntimeException(error);
        }
    }

    /**
     * @return {@code true} while this publisher has been neither stopped nor closed
     */
    public boolean isActive() {
        State state = m_state;
        return State.Active == state;
    }

    /**
     * Move an active publisher to the {@code Closing} state and close its
     * batching queue. Does nothing in any other state.
     */
    public synchronized void stop() {
        if (m_state != State.Active) {
            return;
        }
        m_state = State.Closing;
        f_batchingQueue.close();
    }

    /**
     * Finish closing a publisher that was previously stopped: mark it
     * {@code Closed}, cancel everything still queued and drop the caches
     * reference. Does nothing unless the state is {@code Closing}.
     */
    public synchronized void close() {
        if (m_state != State.Closing) {
            return;
        }
        m_state = State.Closed;
        f_batchingQueue.cancelAllAndClose("Publisher has been closed", null);
        m_caches = null;
    }

    /**
     * Flush this channel's batching queue.
     * <p>
     * For {@code FLUSH} (and any unrecognised mode) the queue is simply
     * flushed. For {@code FLUSH_DESTROY} and {@code FLUSH_CLOSE_EXCEPTIONALLY}
     * the queue is first asked to complete its outstanding elements with a
     * {@link TopicPublisherException} describing the reason, then flushed.
     *
     * @param flushMode the kind of flush to perform
     * @return the future returned by the batching queue's flush
     */
    public CompletableFuture<Void> flush(PagedTopicPublisher.FlushMode flushMode) {
        String sReason;
        switch (flushMode) {
            case FLUSH_DESTROY:
                sReason = "Topic " + this.f_sTopicName + " was destroyed";
                break;
            case FLUSH_CLOSE_EXCEPTIONALLY:
                // name the publisher, the channel and the topic, in that order
                sReason = "Force Close of Publisher " + this.f_lPublisherId + " channel " + this.f_nChannel + " for topic " + this.f_sTopicName;
                break;
            case FLUSH:
            default:
                return this.f_batchingQueue.flush();
        }
        this.f_batchingQueue.handleError(TopicPublisherException.createFactory(this.f_serializer, sReason), BatchingOperationsQueue.OnErrorAction.CompleteWithException);
        return this.f_batchingQueue.flush();
    }

    /**
     * @return the channel this publisher publishes to
     */
    public int getChannel() {
        final int nChannel = f_nChannel;
        return nChannel;
    }

    /**
     * Record a notification and, if it resumes the batching queue, count the
     * wait and start offering again with a batch limit of one element.
     */
    public void onNotification() {
        m_cNotify++;

        boolean fResumed = f_batchingQueue.resume();
        if (!fResumed) {
            return;
        }
        m_cWait++;
        addQueuedElements(1);
    }

    /**
     * Ask the batching queue to fill its current batch and, if it reports
     * there is something to send, offer that batch to the current tail page
     * once the page id future has completed.
     * <p>
     * The value the page id future completes with is not used here; the offer
     * targets whatever {@code m_lTail} holds when the callback runs. Failures
     * are routed to {@code handleError}.
     *
     * @param i the limit passed to the batching queue's {@code fillCurrentBatch}
     */
    protected void addQueuedElements(int i) {
        if (!f_batchingQueue.fillCurrentBatch(i)) {
            return;
        }
        CompletableFuture<Long> pageReady = ensurePageId();
        pageReady.thenAccept(ignored -> addInternal(m_lTail))
                .handle(this::handleError);
    }

    /**
     * Asynchronously offer the values of the current batch to a page.
     * The result is passed to {@code handleOfferCompletion}; a failed
     * invocation is passed to {@code handleError}.
     *
     * @param j the id of the page to offer to
     */
    protected void addInternal(long j) {
        LinkedList<Binary> listValues = f_batchingQueue.getCurrentBatchValues();
        if (!listValues.isEmpty()) {
            PagedTopicCaches caches = m_caches;
            Page.Key keyPage = new Page.Key(f_keyUsageSync.getChannelId(), j);
            // order the invocation by the partition that owns the page
            int nUnitOfOrder = caches.getUnitOfOrder(f_keyPartitioningStrategy.getKeyPartition(keyPage));
            OfferProcessor processor = new OfferProcessor(listValues, f_nNotifyPostFull, false);

            InvocableMapHelper.invokeAsync(caches.Pages, keyPage, nUnitOfOrder, processor, (result, error) -> {
                if (error != null) {
                    handleError(null, error);
                } else {
                    handleOfferCompletion(result, j);
                }
            });
        }
    }

    /**
     * Process the result of an offer: update the statistics, complete the
     * corresponding queued elements, and decide what to do next.
     * <ul>
     *   <li>Page sealed: advance to the next page, then offer again.</li>
     *   <li>Topic full with a non-zero notify value: pause the queue.</li>
     *   <li>Anything else: offer again straight away.</li>
     * </ul>
     * Nothing further happens once this publisher is closed.
     *
     * @param result the result returned by the offer processor
     * @param j      the id of the page the offer was made to
     */
    protected void handleOfferCompletion(OfferProcessor.Result result, long j) {
        LongArray<Throwable> aErrors = result.getErrors();
        SparseArray aStatus = new SparseArray();
        int cAccepted = result.getAcceptedCount();
        final int nChannel = f_keyUsageSync.getChannelId();

        // statistics
        m_cOffers++;
        m_cAccepted += cAccepted;
        if (cAccepted == 0) {
            m_cMisses++;
        }

        boolean fFailRemainder = f_nNotifyPostFull == 0
                && result.getStatus() == OfferProcessor.Result.Status.TopicFull;
        if (fFailRemainder) {
            // the topic is full and the publisher does not wait for space:
            // every element that was not accepted gets a capacity error
            int cBatch = f_batchingQueue.getCurrentBatch().size();
            IllegalStateException atCapacity = new IllegalStateException("the topic is at capacity");
            if (aErrors == null) {
                aErrors = new SparseArray();
            }
            for (; cAccepted < cBatch; cAccepted++) {
                aErrors.add(atCapacity);
            }
        } else {
            // accepted elements without an error get consecutive offsets,
            // starting at the offset reported by the result
            int nOffset = result.getOffset();
            for (long iElement = 0; iElement < cAccepted; iElement++) {
                if (aErrors == null || aErrors.get(iElement) == null) {
                    aStatus.set(iElement, new PagedTopicPublisher.PublishedStatus(nChannel, j, nOffset));
                    nOffset++;
                }
            }
        }

        f_batchingQueue.completeElements(cAccepted, aErrors, aStatus, TopicPublisherException.createFactory(f_serializer), null);
        handleIndividualErrors(aErrors);

        if (m_state == State.Closed) {
            return;
        }

        switch (result.getStatus()) {
            case PageSealed:
                moveToNextPage(j)
                        .thenRun(() -> addQueuedElements(result.getPageCapacity()))
                        .handle(this::handleError);
                return;
            case TopicFull:
                if (f_nNotifyPostFull != 0) {
                    f_batchingQueue.pause();
                    return;
                }
                // deliberate fall-through: without notification, retry immediately
            default:
                addQueuedElements(result.getPageCapacity());
                return;
        }
    }

    /**
     * Handle a failure by shutting this publisher down. Has the
     * {@code (result, error)} shape of a future {@code handle} callback; when
     * {@code th} is {@code null} it does nothing.
     * <p>
     * On failure, while holding the monitor, the publisher is stopped, the
     * error handler (if any) is notified, the batching queue is told to cancel
     * and close, and the publisher is closed. Anything thrown by the error
     * handler is logged and otherwise ignored.
     *
     * @param obj the result of the preceding stage (unused)
     * @param th  the failure, or {@code null} if there was none
     * @return always {@code null}
     */
    protected Void handleError(Object obj, Throwable th) {
        if (th == null) {
            return VOID;
        }

        synchronized (this) {
            stop();

            BiConsumer<Throwable, Integer> handler = f_onErrorHandler;
            if (handler != null) {
                try {
                    handler.accept(th, f_nChannel);
                } catch (Throwable thrownByHandler) {
                    Logger.err(thrownByHandler);
                }
            }

            f_batchingQueue.handleError(TopicPublisherException.createFactory(f_serializer, th.getMessage()), BatchingOperationsQueue.OnErrorAction.CancelAndClose);
            close();
        }
        return VOID;
    }

    /**
     * If the array holds any per-element errors, treat the one at the first
     * index as a failure of the whole publisher.
     *
     * @param longArray the per-element errors; may be {@code null} or empty
     */
    protected void handleIndividualErrors(LongArray<Throwable> longArray) {
        boolean fHasErrors = longArray != null && !longArray.isEmpty();
        if (fHasErrors) {
            Throwable first = longArray.get(longArray.getFirstIndex());
            handleError(VOID, first);
        }
    }

    /**
     * @return the page id future, creating it on first use
     */
    protected CompletableFuture<Long> ensurePageId() {
        CompletableFuture<Long> future = futurePageId;
        if (future != null) {
            return future;
        }
        return initializePageId();
    }

    /**
     * Create the page id future, under the monitor, if it does not yet exist,
     * by invoking a {@code TopicInitialiseProcessor} against this channel's
     * usage key.
     *
     * @return the (possibly pre-existing) page id future
     */
    private synchronized CompletableFuture<Long> initializePageId() {
        if (futurePageId != null) {
            return futurePageId;
        }
        int nUnitOfOrder = m_caches.getUnitOfOrder(f_keyUsageSync.getPartitionId());
        futurePageId = InvocableMapHelper.invokeAsync(m_caches.Usages, f_keyUsageSync, nUnitOfOrder, new TopicInitialiseProcessor(), new BiConsumer[0]);
        return futurePageId;
    }

    /**
     * Advance the tail beyond the given page.
     * <p>
     * If the tail is already past the page, a completed future holding the
     * current tail is returned. Otherwise the in-flight advance is reused, or
     * a {@code TailAdvancer} for {@code j + 1} is started; its result is
     * recorded through {@code updatePageId} and its failure goes to
     * {@code handleError}.
     *
     * @param j the id of the page to move on from
     * @return a future for the new tail page id
     */
    protected CompletableFuture<Long> moveToNextPage(long j) {
        // fast path without the monitor
        long lTail = m_lTail;
        if (lTail <= j) {
            synchronized (this) {
                // re-check now that the monitor is held
                lTail = m_lTail;
                if (lTail <= j) {
                    // keep the future in a local: the field can be cleared by
                    // updatePageId without the monitor being held
                    CompletableFuture<Long> future = m_futureMovePage;
                    if (future == null) {
                        future = InvocableMapHelper.invokeAsync(m_caches.Usages, f_keyUsageSync, f_nUsageSyncUnitOfOrder, new TailAdvancer(j + 1), (lPage, error) -> {
                            if (error != null) {
                                handleError(lPage, error);
                            } else {
                                updatePageId(lPage.longValue());
                            }
                        });
                        m_futureMovePage = future;
                    }
                    return future;
                }
            }
        }
        return CompletableFuture.completedFuture(Long.valueOf(lTail));
    }

    /**
     * Record a new tail page id if it is greater than the current one, then
     * clear the in-flight move-page future. The tail never moves backwards.
     *
     * @param j the candidate tail page id
     */
    protected void updatePageId(long j) {
        boolean fBehind = m_lTail < j;
        if (fBehind) {
            synchronized (this) {
                // re-check under the monitor; another thread may have advanced further
                if (j > m_lTail) {
                    m_lTail = j;
                }
            }
        }
        m_futureMovePage = null;
    }

    /**
     * Two channel publishers are equal when they are of the same class and
     * have the same publisher id, channel and caches.
     *
     * @param obj the object to compare with
     * @return {@code true} if the objects are equal
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        PagedTopicChannelPublisher that = (PagedTopicChannelPublisher) obj;
        if (f_lPublisherId != that.f_lPublisherId) {
            return false;
        }
        if (f_nChannel != that.f_nChannel) {
            return false;
        }
        return Objects.equals(m_caches, that.m_caches);
    }

    /**
     * @return a hash of the publisher id, the channel and the caches
     */
    @Override
    public int hashCode() {
        Long publisherId = f_lPublisherId;
        Integer channel = f_nChannel;
        return Objects.hash(publisherId, channel, m_caches);
    }

    /**
     * Describe this publisher together with statistics covering the period
     * since the previous call.
     * <p>
     * Note that calling this method resets the baseline for the next call.
     * The reported figures are: the average number of elements accepted per
     * offer that accepted anything ({@code batchSize}), the percentage of
     * offers that accepted anything ({@code hitRate}), and waits and
     * notifications as percentages of offers ({@code waitNotifyRate}).
     *
     * @return a description of this publisher
     */
    @Override
    public String toString() {
        // snapshot the running totals
        long cOffersNow = this.m_cOffers;
        long cAcceptedNow = this.m_cAccepted;
        long cMissesNow = this.m_cMisses;
        long cWaitNow = this.m_cWait;
        long cNotifyNow = this.m_cNotify;

        // deltas since the previous call
        long cOffers = cOffersNow - this.m_cOffersLast;
        long cAccepted = cAcceptedNow - this.m_cAcceptedLast;
        long cMisses = cMissesNow - this.m_cMissesLast;
        long cWait = cWaitNow - this.m_cWaitsLast;
        long cNotify = cNotifyNow - this.m_cNotifyLast;

        this.m_cOffersLast = cOffersNow;
        this.m_cAcceptedLast = cAcceptedNow;
        this.m_cMissesLast = cMissesNow;
        this.m_cWaitsLast = cWaitNow;
        this.m_cNotifyLast = cNotifyNow;

        // Math.max(1, ...) guards the divisions when nothing happened
        long cHits = cOffers - cMisses;
        long cOffersSafe = Math.max(1L, cOffers);
        long nBatchSize = cAccepted / Math.max(1L, cHits);
        long nHitRate = (cHits * 100) / cOffersSafe;
        long nWaitRate = (cWait * 100) / cOffersSafe;
        long nNotifyRate = (cNotify * 100) / cOffersSafe;

        return getClass().getSimpleName()
                + "(topic=" + this.f_sTopicName
                + ", channel=" + this.f_nChannel
                + ", state=" + this.m_state
                + ", publisher=" + this.f_lPublisherId
                + ", batchSize=" + nBatchSize
                + ", hitRate=" + nHitRate
                + "%, waitNotifyRate=" + nWaitRate + "/" + nNotifyRate + "%)";
    }
}
