package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.base.NonBlocking;
import com.tangosol.internal.net.DebouncedFlowControl;
import com.tangosol.util.Gate;
import com.tangosol.util.LongArray;
import com.tangosol.util.NullImplementation;
import com.tangosol.util.SimpleLongArray;
import com.tangosol.util.TaskDaemon;
import com.tangosol.util.ThreadGateLite;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.class */
public class BatchingOperationsQueue<V, R> {
    /** Trigger state in which {@link #triggerOperations(int)} will invoke the batch function; set by {@link #resetTrigger()}. */
    public static final int TRIGGER_OPEN = 0;
    /** Trigger state entered when the batch function has been triggered, or when {@link #resume()} succeeds. */
    public static final int TRIGGER_CLOSED = 1;
    /** Trigger state set by {@link #pause()}; only {@link #resume()} or {@link #resetTrigger()} leaves it. */
    public static final int TRIGGER_WAIT = 2;
    /** Callback invoked by {@link #triggerOperations(int)} with this queue and the requested batch size. */
    private final BiConsumer<BatchingOperationsQueue<V, R>, Integer> f_functionBatch;
    /** Batch size handed to the batch function when operations are triggered by an add or a retry. */
    private final int f_cbInitialBatch;
    /** Elements queued by {@code add}/{@code addFirst} (and re-queued by a retry). */
    private final Deque<BatchingOperationsQueue<V, R>.Element> f_queuePending;
    /** Elements of the batch currently being processed; the completeElement(s) methods poll from its head. */
    private final Deque<BatchingOperationsQueue<V, R>.Element> f_queueCurrentBatch;
    // Running size total of the current batch: decremented as elements are completed, retried or purged and
    // reset to zero by doErrorAction. NOTE(review): presumably incremented in fillCurrentBatch, whose body is
    // not visible in this file - confirm against the original source.
    private long m_cbCurrentBatch;
    /** Gate entered by {@code add} and closed (exclusively) by close, flush, error handling and purging. */
    private final Gate<?> f_gate;
    /** Holds the current trigger state, one of the {@code TRIGGER_*} constants. */
    private final AtomicInteger f_lockTrigger;
    /** Flow control whose backlog is adjusted as elements are added, re-queued or discarded. */
    private final DebouncedFlowControl f_backlog;
    /** Computes the backlog contribution of a value; the constructor substitutes a constant 1 when none is supplied. */
    private final ToLongFunction<V> f_backlogCalculator;
    /** Executor used to complete element futures; the constructor substitutes a same-thread executor when none is supplied. */
    private final Executor f_executor;
    /** {@code true} until {@link #close()} is called; adding to an inactive queue throws {@link IllegalStateException}. */
    private boolean m_fActive;

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue$Element.class */
    public class Element {
        private final V f_value;
        private long m_cbSize;
        private volatile boolean m_fDone = false;
        private volatile boolean m_fCancelled = false;
        private final CompletableFuture<R> f_future = new CompletableFuture<>();

        public Element(V v) {
            this.f_value = v;
            this.f_future.handle((obj, th) -> {
                this.m_fCancelled = th instanceof CancellationException;
                this.m_fDone = true;
                return null;
            });
        }

        public V getValue() {
            return this.f_value;
        }

        public CompletableFuture<R> getFuture() {
            return this.f_future;
        }

        public boolean isDone() {
            return this.m_fDone || this.f_future.isDone();
        }

        public boolean isCancelled() {
            return this.m_fCancelled || this.f_future.isCancelled();
        }

        public void complete(R r, Consumer<R> consumer) {
            if (this.m_fDone) {
                return;
            }
            this.m_fDone = true;
            BatchingOperationsQueue.this.f_executor.complete(this.f_future, r, consumer);
        }

        public boolean completeSynchronous(R r, Consumer<R> consumer) {
            boolean z = false;
            if (!this.m_fDone) {
                this.m_fDone = true;
                z = this.f_future.complete(r);
                if (z && consumer != null) {
                    try {
                        consumer.accept(r);
                    } catch (Throwable th) {
                        Logger.err(th);
                    }
                }
            }
            return z;
        }

        public void completeExceptionally(Throwable th, BiFunction<Throwable, V, Throwable> biFunction) {
            if (this.m_fDone) {
                return;
            }
            this.m_fDone = true;
            BatchingOperationsQueue.this.f_executor.completeExceptionally(this.f_future, biFunction.apply(th, this.f_value));
        }

        public void cancel(BiFunction<Throwable, V, Throwable> biFunction, String str) {
            if (this.m_fDone) {
                return;
            }
            OperationCancelledException operationCancelledException = (str == null || str.isEmpty()) ? new OperationCancelledException() : new OperationCancelledException(str);
            BatchingOperationsQueue.this.f_executor.completeExceptionally(this.f_future, biFunction == null ? operationCancelledException : biFunction.apply(operationCancelledException, this.f_value));
        }

        public long getSize() {
            return this.m_cbSize;
        }

        public void setSize(long j) {
            this.m_cbSize = j;
        }
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue$Executor.class */
    /**
     * Strategy used to run future completions, so that completion callbacks can be
     * moved off the thread that processes the batch.
     */
    public interface Executor {
        /**
         * Run the given task.
         *
         * @param runnable  the task to run
         */
        void execute(Runnable runnable);

        /**
         * Submit a task that completes the future with a value and, when that call
         * actually completed the future, passes the value to the optional callback.
         * Anything thrown by the callback is logged.
         *
         * @param completableFuture  the future to complete
         * @param r                  the value to complete it with
         * @param consumer           optional callback; may be {@code null}
         * @param <R>                the result type
         */
        default <R> void complete(CompletableFuture<R> completableFuture, R r, Consumer<R> consumer) {
            execute(() -> {
                boolean fCompleted = completableFuture.complete(r);
                if (fCompleted && consumer != null) {
                    try {
                        consumer.accept(r);
                    } catch (Throwable t) {
                        Logger.err(t);
                    }
                }
            });
        }

        /**
         * Submit a task that completes the future exceptionally.
         *
         * @param completableFuture  the future to complete
         * @param th                 the throwable to complete it with
         */
        default void completeExceptionally(CompletableFuture<?> completableFuture, Throwable th) {
            execute(() -> completableFuture.completeExceptionally(th));
        }

        /**
         * @return an executor that runs every task immediately on the calling thread
         */
        static Executor sameThread() {
            return Runnable::run;
        }

        /**
         * @param taskDaemon  the daemon to hand tasks to; must not be {@code null}
         *
         * @return an executor that passes every task to the daemon's {@code executeTask}
         *
         * @throws NullPointerException if the daemon is {@code null}
         */
        static Executor fromTaskDaemon(TaskDaemon taskDaemon) {
            TaskDaemon daemon = Objects.requireNonNull(taskDaemon);
            return daemon::executeTask;
        }
    }

    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue$OnErrorAction.class */
    /**
     * The action {@code handleError} applies to the elements that have not completed yet.
     */
    public enum OnErrorAction {
        /** Move the unfinished elements of the current batch back to the front of the pending queue and trigger processing again. */
        Retry,
        /** Complete every unfinished element (current batch and pending) with a {@code null} result. */
        Complete,
        /** As {@link #Complete}, then close the queue. */
        CompleteAndClose,
        /** Complete every unfinished element exceptionally with the throwable produced by the caller's function; used when no action is given. */
        CompleteWithException,
        /** As {@link #CompleteWithException}, then close the queue. */
        CompleteWithExceptionAndClose,
        /** Cancel every unfinished element. */
        Cancel,
        /** As {@link #Cancel}, then close the queue. */
        CancelAndClose
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue$OperationCancelledException.class */
    /**
     * The {@link CancellationException} used to complete the future of a queued
     * operation that has been cancelled.
     */
    public static class OperationCancelledException extends CancellationException {
        /**
         * Create a cancellation without a detail message.
         */
        public OperationCancelledException() {
            super();
        }

        /**
         * Create a cancellation with a detail message.
         *
         * @param str  the reason for the cancellation
         */
        public OperationCancelledException(String str) {
            super(str);
        }
    }

    /**
     * Create a queue whose batch function only needs the batch size, using a new
     * flow control built from the initial batch size, a backlog contribution of 1
     * per value, and completion on the calling thread.
     *
     * @param consumer  the batch function, called with the batch size
     * @param i         the initial batch size
     */
    public BatchingOperationsQueue(Consumer<Integer> consumer, int i) {
        this((queue, cBatch) -> consumer.accept(cBatch),
                i,
                new DebouncedFlowControl(i, SimpleLongArray.MAX),
                value -> 1L,
                Runnable::run);
    }

    /**
     * Create a queue with a caller-supplied flow control, a backlog contribution of 1
     * per value, and completion on the calling thread.
     *
     * @param consumer              the batch function, called with the batch size
     * @param i                     the initial batch size
     * @param debouncedFlowControl  the flow control tracking the backlog
     */
    public BatchingOperationsQueue(Consumer<Integer> consumer, int i, DebouncedFlowControl debouncedFlowControl) {
        this(consumer, i, debouncedFlowControl, value -> 1L, Runnable::run);
    }

    /**
     * Create a queue whose batch function only needs the batch size; the function is
     * adapted to the two-argument form by ignoring the queue argument.
     *
     * @param consumer              the batch function, called with the batch size
     * @param i                     the initial batch size
     * @param debouncedFlowControl  the flow control tracking the backlog
     * @param toLongFunction        computes a value's backlog contribution; may be {@code null}
     * @param executor              completes the futures; may be {@code null}
     */
    public BatchingOperationsQueue(Consumer<Integer> consumer, int i, DebouncedFlowControl debouncedFlowControl, ToLongFunction<V> toLongFunction, Executor executor) {
        this((queue, cBatch) -> consumer.accept(cBatch), i, debouncedFlowControl, toLongFunction, executor);
    }

    /**
     * Create a queue.
     *
     * @param biConsumer            the batch function, called with this queue and the batch size
     * @param i                     the initial batch size
     * @param debouncedFlowControl  the flow control tracking the backlog
     * @param toLongFunction        computes a value's backlog contribution; {@code null}
     *                              means every value counts as 1
     * @param executor              completes the futures; {@code null} means completion
     *                              on the calling thread
     */
    public BatchingOperationsQueue(BiConsumer<BatchingOperationsQueue<V, R>, Integer> biConsumer, int i, DebouncedFlowControl debouncedFlowControl, ToLongFunction<V> toLongFunction, Executor executor) {
        f_functionBatch = biConsumer;
        f_cbInitialBatch = i;
        f_backlog = debouncedFlowControl;

        f_queuePending = new ConcurrentLinkedDeque<>();
        f_queueCurrentBatch = new ConcurrentLinkedDeque<>();
        f_gate = new ThreadGateLite();
        f_lockTrigger = new AtomicInteger(TRIGGER_OPEN);

        if (toLongFunction == null) {
            f_backlogCalculator = value -> 1L;
        } else {
            f_backlogCalculator = toLongFunction;
        }

        if (executor == null) {
            f_executor = Executor.sameThread();
        } else {
            f_executor = executor;
        }

        m_fActive = true;
        resetTrigger();
    }

    /**
     * Queue a value at the tail of the pending operations.
     *
     * @param v  the value to queue
     *
     * @return the future for the queued operation
     *
     * @throws IllegalStateException if this queue is no longer active
     */
    public CompletableFuture<R> add(V v) {
        final boolean fFirst = false;
        return add(v, fFirst);
    }

    /**
     * Queue a value at the head of the pending operations.
     *
     * @param v  the value to queue
     *
     * @return the future for the queued operation
     *
     * @throws IllegalStateException if this queue is no longer active
     */
    public CompletableFuture<R> addFirst(V v) {
        final boolean fFirst = true;
        return add(v, fFirst);
    }

    /**
     * Mark this queue as inactive so that further additions are rejected. The flag
     * is flipped while the gate is closed, and the gate is always reopened.
     */
    public void close() {
        final Gate<?> gateQueue = getGate();
        gateQueue.close(-1L);
        try {
            m_fActive = false;
        } finally {
            gateQueue.open();
        }
    }

    /**
     * Obtain a future that completes once every operation that is outstanding at the
     * time of the call (current batch and pending) has completed. Failures of the
     * individual operations are not propagated; the returned future always completes
     * normally with {@code null}.
     *
     * @return a future completing when all currently outstanding operations are done
     */
    public CompletableFuture<Void> flush() {
        final Gate<?> gate = getGate();
        gate.close(-1L);
        try {
            // snapshot the outstanding futures while the gate is closed
            CompletableFuture<?>[] aOutstanding = Stream
                    .concat(getCurrentBatch().stream(), getPending().stream())
                    .map(element -> element.getFuture())
                    .filter(future -> !future.isDone())
                    .toArray(CompletableFuture[]::new);

            return CompletableFuture.allOf(aOutstanding).handle((ignored, error) -> null);
        } finally {
            gate.open();
        }
    }

    /**
     * Collect the values of the current batch's elements that are not done yet,
     * in queue order.
     *
     * @return a new list of the unfinished values in the current batch
     */
    public LinkedList<V> getCurrentBatchValues() {
        LinkedList<V> listValues = new LinkedList<>();
        for (BatchingOperationsQueue<V, R>.Element element : getCurrentBatch()) {
            if (!element.isDone()) {
                listValues.add(element.getValue());
            }
        }
        return listValues;
    }

    /**
     * Remove the completed elements from the current batch and report whether any
     * unfinished element is left in it.
     *
     * @return {@code true} if the current batch holds no unfinished elements
     */
    public boolean isBatchComplete() {
        boolean fComplete = purgeCurrentBatch();
        return fComplete;
    }

    /**
     * @return the number of elements in the current batch plus the number pending
     */
    public int size() {
        int cCurrent = getCurrentBatchSize();
        int cPending = getPendingSize();
        return cCurrent + cPending;
    }

    /**
     * @return the number of elements held in the current batch queue
     */
    public int getCurrentBatchSize() {
        return f_queueCurrentBatch.size();
    }

    /**
     * @return the number of elements held in the pending queue
     */
    public int getPendingSize() {
        return f_queuePending.size();
    }

    /**
     * Apply an error action to the operations that have not completed yet.
     * <p>
     * The whole operation runs with the gate closed, and the gate is reopened exactly
     * once when it finishes, whether normally or exceptionally.
     *
     * @param biFunction     maps a (possibly {@code null}) throwable and an element's
     *                       value to the throwable its future is completed with; used
     *                       by the complete-with-exception and cancel actions
     * @param onErrorAction  the action to apply; {@code null} is treated as
     *                       {@link OnErrorAction#CompleteWithException}
     */
    public void handleError(BiFunction<Throwable, V, Throwable> biFunction, OnErrorAction onErrorAction) {
        final OnErrorAction action = onErrorAction == null ? OnErrorAction.CompleteWithException : onErrorAction;

        Gate<?> gate = getGate();
        gate.close(-1L);
        try {
            switch (action) {
                case Retry: {
                    // push the current batch back onto the head of the pending queue, last element
                    // first so the original order is preserved, then start processing again
                    Deque<BatchingOperationsQueue<V, R>.Element> queueCurrent = getCurrentBatch();
                    BatchingOperationsQueue<V, R>.Element requeued;
                    while ((requeued = queueCurrent.pollLast()) != null) {
                        long cbElement = f_backlogCalculator.applyAsLong(requeued.getValue());
                        m_cbCurrentBatch -= cbElement;
                        if (!requeued.isDone()) {
                            f_backlog.adjustBacklog(cbElement);
                            getPending().offerFirst(requeued);
                        }
                    }
                    resetTrigger();
                    triggerOperations(f_cbInitialBatch);
                    break;
                }
                case Complete:
                case CompleteAndClose:
                    doErrorAction(element -> element.complete(null, null),
                            action == OnErrorAction.CompleteAndClose);
                    break;
                case CompleteWithException:
                case CompleteWithExceptionAndClose:
                    doErrorAction(element -> element.completeExceptionally(null, biFunction),
                            action == OnErrorAction.CompleteWithExceptionAndClose);
                    break;
                case Cancel:
                case CancelAndClose:
                    doErrorAction(element -> element.cancel(biFunction, null),
                            action == OnErrorAction.CancelAndClose);
                    break;
            }
        } finally {
            gate.open();
        }
    }

    /**
     * Cancel every operation that has not completed yet and close this queue.
     *
     * @param str         optional reason used as the cancellation message
     * @param biFunction  optional mapper from the cancellation and an element's value
     *                    to the throwable its future is completed with
     */
    public void cancelAllAndClose(String str, BiFunction<Throwable, V, Throwable> biFunction) {
        final Gate<?> gate = getGate();
        gate.close(-1L);
        try {
            final boolean fClose = true;
            doErrorAction(element -> element.cancel(biFunction, str), fClose);
        } finally {
            gate.open();
        }
    }

    /**
     * @return {@code true} until this queue has been closed
     */
    public boolean isActive() {
        return m_fActive;
    }

    /**
     * Create the queue element wrapping a value; called for every value that is added.
     *
     * @param v  the value being queued
     *
     * @return a new element bound to this queue
     */
    protected BatchingOperationsQueue<V, R>.Element createElement(V v) {
        return this.new Element(v);
    }

    /*  JADX ERROR: Failed to decode insn: 0x009D: MOVE_MULTI, method: com.tangosol.internal.net.topic.impl.paged.BatchingOperationsQueue.fillCurrentBatch(int):boolean
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    /**
     * NOTE(review): decompilation placeholder. The decompiler could not recover this
     * method's bytecode, so as written here it unconditionally throws
     * {@link UnsupportedOperationException}; the real logic must be restored from the
     * original source before this class can be used.
     *
     * @param r7  presumably the maximum size of the batch to fill - TODO confirm
     *            against the original source
     *
     * @return never returns normally in this form
     */
    public boolean fillCurrentBatch(int r7) {
        /*
            Method dump skipped, instructions count: 237
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.tangosol.internal.net.topic.impl.paged.BatchingOperationsQueue.fillCurrentBatch(int):boolean");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Unconditionally put the trigger into the {@link #TRIGGER_OPEN} state so that
     * the next trigger request invokes the batch function.
     */
    public void resetTrigger() {
        AtomicInteger trigger = getTrigger();
        trigger.set(TRIGGER_OPEN);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Unconditionally put the trigger into the {@link #TRIGGER_WAIT} state.
     */
    public void pause() {
        AtomicInteger trigger = getTrigger();
        trigger.set(TRIGGER_WAIT);
    }

    /**
     * Move the trigger from {@link #TRIGGER_WAIT} to {@link #TRIGGER_CLOSED}.
     *
     * @return {@code true} if the trigger was in the waiting state and has been moved
     */
    public boolean resume() {
        AtomicInteger trigger = getTrigger();
        return trigger.compareAndSet(TRIGGER_WAIT, TRIGGER_CLOSED);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Trigger the batch function with the initial batch size, using a size of at
     * least one.
     */
    public void triggerOperations() {
        int cBatch = f_cbInitialBatch < 1 ? 1 : f_cbInitialBatch;
        triggerOperations(cBatch);
    }

    /**
     * Invoke the batch function if, and only if, this call moves the trigger from
     * {@link #TRIGGER_OPEN} to {@link #TRIGGER_CLOSED}.
     *
     * @param i  the batch size passed to the batch function
     */
    protected void triggerOperations(int i) {
        AtomicInteger trigger = getTrigger();
        if (trigger.get() != TRIGGER_OPEN) {
            // already triggered or paused
            return;
        }
        if (trigger.compareAndSet(TRIGGER_OPEN, TRIGGER_CLOSED)) {
            f_functionBatch.accept(this, i);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Remove the element at the head of the current batch and, unless it is already
     * done, complete it on the calling thread with the given result.
     * <p>
     * The result parameter is declared as {@code Object} in this source; it is cast
     * to the queue's result type before being handed to the element. The cast is
     * unchecked, so the caller is responsible for passing a value of the right type.
     *
     * @param obj       the result to complete the element's future with
     * @param consumer  optional callback invoked with the result when the future was
     *                  completed by this call
     *
     * @return {@code true} if an element was removed and its future was completed by
     *         this call
     */
    @SuppressWarnings("unchecked")
    public boolean completeElement(Object obj, Consumer<R> consumer) {
        BatchingOperationsQueue<V, R>.Element element = getCurrentBatch().poll();
        if (element == null) {
            return false;
        }

        // the element has left the current batch, so its size no longer counts towards it
        V value = element.getValue();
        if (value != null) {
            m_cbCurrentBatch -= f_backlogCalculator.applyAsLong(value);
        }

        if (element.isDone()) {
            return false;
        }
        return element.completeSynchronous((R) obj, consumer);
    }

    /**
     * Complete elements from the head of the current batch with the given results,
     * treating none of them as failed.
     *
     * @param i           the number of elements to remove from the current batch
     * @param longArray   the results, indexed by position within the removed elements
     * @param biFunction  error mapper passed through to the five-argument overload
     * @param consumer    optional callback handed each completed result
     */
    public void completeElements(int i, LongArray<R> longArray, BiFunction<Throwable, V, Throwable> biFunction, Consumer<R> consumer) {
        LongArray<Throwable> aNoErrors = NullImplementation.getLongArray();
        completeElements(i, aNoErrors, longArray, biFunction, consumer);
    }

    /**
     * Remove up to the given number of elements from the head of the current batch
     * and complete each one that is not already done: exceptionally when an error is
     * recorded at its position, otherwise with the result at its position.
     *
     * @param i           the number of elements to remove from the current batch
     * @param longArray   the errors, indexed by position; may be {@code null}
     * @param longArray2  the results, indexed by position
     * @param biFunction  maps an error and the element's value to the throwable the
     *                    future is completed with
     * @param consumer    optional callback handed each successfully completed result
     */
    public void completeElements(int i, LongArray<Throwable> longArray, LongArray<R> longArray2, BiFunction<Throwable, V, Throwable> biFunction, Consumer<R> consumer) {
        final Deque<BatchingOperationsQueue<V, R>.Element> queueCurrent = getCurrentBatch();
        for (int nIndex = 0; nIndex < i; nIndex++) {
            BatchingOperationsQueue<V, R>.Element element = queueCurrent.poll();
            if (element == null) {
                // nothing at this position; keep counting so later positions stay aligned
                continue;
            }

            V value = element.getValue();
            if (value != null) {
                m_cbCurrentBatch -= f_backlogCalculator.applyAsLong(value);
            }

            if (element.isDone()) {
                continue;
            }

            Throwable error = null;
            if (longArray != null) {
                error = longArray.get(nIndex);
            }

            if (error != null) {
                element.completeExceptionally(error, biFunction);
            } else {
                element.complete(longArray2.get(nIndex), consumer);
            }
        }
    }

    /**
     * @return the gate guarding this queue's state
     */
    protected Gate<?> getGate() {
        return f_gate;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the queue of elements making up the current batch
     */
    public Deque<BatchingOperationsQueue<V, R>.Element> getCurrentBatch() {
        return f_queueCurrentBatch;
    }

    /**
     * @return the queue of pending elements
     */
    protected Deque<BatchingOperationsQueue<V, R>.Element> getPending() {
        return f_queuePending;
    }

    /**
     * @return the holder of the current trigger state
     */
    protected AtomicInteger getTrigger() {
        return f_lockTrigger;
    }

    /**
     * @return a description of this queue: current and pending element counts, the
     *         trigger state and the backlog
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("BatchingOperationsQueue(current=");
        sb.append(getCurrentBatch().size());
        sb.append(", pending=").append(getPending().size());
        sb.append(", trigger=").append(triggerToString(getTrigger().get()));
        sb.append(", backlog=").append(f_backlog);
        sb.append(")");
        return sb.toString();
    }

    /**
     * Queue a value at the head or the tail of the pending queue, trigger the batch
     * function if it is not already running, and account for the value in the backlog.
     *
     * @param v  the value to queue
     * @param z  {@code true} to queue at the head, {@code false} to queue at the tail
     *
     * @return the future for the queued operation
     *
     * @throws IllegalStateException if this queue is no longer active
     */
    private CompletableFuture<R> add(V v, boolean z) {
        final BatchingOperationsQueue<V, R>.Element element = createElement(v);
        final Gate<?> gate = getGate();

        gate.enter(-1L);
        try {
            assertActive();

            Deque<BatchingOperationsQueue<V, R>.Element> queuePending = getPending();
            if (z) {
                queuePending.addFirst(element);
            } else {
                queuePending.add(element);
            }

            triggerOperations(f_cbInitialBatch);

            long cbValue = f_backlogCalculator.applyAsLong(v);
            f_backlog.adjustBacklog(cbValue);

            return element.getFuture();
        } finally {
            gate.exit();
        }
    }

    /**
     * Verify that this queue has not been closed.
     *
     * @throws IllegalStateException if this queue is no longer active
     */
    protected void assertActive() {
        if (isActive()) {
            return;
        }
        throw new IllegalStateException("This batching queue is no longer active");
    }

    /**
     * Apply an action to every element (current batch first, then pending) that is
     * not done yet, zero the current batch size, and deduct the pending elements'
     * sizes from the backlog. Afterwards either close this queue or reset the trigger.
     *
     * @param consumer  the action applied to each unfinished element
     * @param z         {@code true} to close this queue afterwards, {@code false} to
     *                  reset the trigger instead
     */
    protected void doErrorAction(Consumer<BatchingOperationsQueue<V, R>.Element> consumer, boolean z) {
        final Deque<BatchingOperationsQueue<V, R>.Element> queueCurrent = getCurrentBatch();
        final Deque<BatchingOperationsQueue<V, R>.Element> queuePending = getPending();

        boolean fNothingQueued = queueCurrent.isEmpty() && queuePending.isEmpty();
        if (!fNothingQueued) {
            Stream.concat(queueCurrent.stream(), queuePending.stream())
                    .filter(element -> !element.isDone())
                    .forEach(consumer);

            m_cbCurrentBatch = 0L;

            // the backlog adjustment is made inside a NonBlocking scope
            try (NonBlocking ignored = new NonBlocking()) {
                long cbPending = queuePending.stream()
                        .mapToLong(element -> f_backlogCalculator.applyAsLong(element.getValue()))
                        .sum();
                f_backlog.adjustBacklog(-cbPending);
            }
        }

        if (z) {
            close();
        } else {
            resetTrigger();
        }
    }

    /**
     * Translate a trigger state into the name of the matching constant.
     *
     * @param i  the trigger state
     *
     * @return the constant's name, or {@code "TRIGGER_UNKNOWN"} for any other value
     */
    private String triggerToString(int i) {
        String sName;
        switch (i) {
            case TRIGGER_OPEN:
                sName = "TRIGGER_OPEN";
                break;
            case TRIGGER_CLOSED:
                sName = "TRIGGER_CLOSED";
                break;
            case TRIGGER_WAIT:
                sName = "TRIGGER_WAIT";
                break;
            default:
                sName = "TRIGGER_UNKNOWN";
                break;
        }
        return sName;
    }

    /**
     * Remove the elements that are already done from the current batch, deducting
     * their recorded sizes from the current batch size, and report whether any
     * unfinished element remains.
     * <p>
     * The removal runs with the gate closed. The gate is reopened exactly once, in a
     * {@code finally} block, before the remaining elements are inspected, so a
     * failure during that inspection can no longer cause a second {@code open()}.
     *
     * @return {@code true} if the current batch holds no unfinished elements
     */
    private boolean purgeCurrentBatch() {
        if (f_queueCurrentBatch.isEmpty()) {
            return true;
        }

        Gate<?> gate = getGate();
        gate.close(-1L);
        try {
            long cbRemaining = m_cbCurrentBatch;
            Iterator<BatchingOperationsQueue<V, R>.Element> it = f_queueCurrentBatch.iterator();
            while (it.hasNext()) {
                BatchingOperationsQueue<V, R>.Element element = it.next();
                if (element.isDone()) {
                    it.remove();
                    cbRemaining -= element.getSize();
                }
            }
            m_cbCurrentBatch = cbRemaining;
        } finally {
            gate.open();
        }

        return getCurrentBatchValues().isEmpty();
    }
}
