package com.aol.cyclops.types.futurestream;

import com.aol.cyclops.control.LazyReact;
import com.aol.cyclops.data.async.Queue;
import com.aol.cyclops.internal.react.async.future.CompletedException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:com/aol/cyclops/types/futurestream/LazyToQueue.class */
public interface LazyToQueue<U> extends ToQueue<U> {
    <R> LazyFutureStream<R> then(Function<? super U, ? extends R> function, Executor executor);

    <R> LazyFutureStream<R> thenSync(Function<? super U, ? extends R> function);

    LazyReact getPopulator();

    LazyFutureStream<U> peekSync(Consumer<? super U> consumer);

    @Override // com.aol.cyclops.types.futurestream.ToQueue, com.aol.cyclops.types.futurestream.EagerToQueue
    default Queue<U> toQueue() {
        Queue<U> build = getQueueFactory().build();
        build.getClass();
        build.addContinuation(peekSync(build::add).self(lazyFutureStream -> {
            if (getPopulator().isPoolingActive()) {
                lazyFutureStream.peekSync((Consumer) obj -> {
                    throw new CompletedException(obj);
                });
            }
        }).runContinuation(() -> {
            build.close();
        }));
        return build;
    }

    @Override // com.aol.cyclops.types.futurestream.ToQueue
    default Queue<U> toQueue(Function<Queue, Queue> function) {
        Queue<U> apply = function.apply(getQueueFactory().build());
        apply.getClass();
        apply.addContinuation(thenSync(apply::add).self(lazyFutureStream -> {
            if (getPopulator().isPoolingActive()) {
                lazyFutureStream.peekSync((Consumer) bool -> {
                    throw new CompletedException(bool);
                });
            }
        }).runContinuation(() -> {
            apply.close();
        }));
        return apply;
    }

    @Override // com.aol.cyclops.types.futurestream.ToQueue
    default void addToQueue(Queue queue) {
        queue.getClass();
        queue.addContinuation(thenSync(queue::add).self(lazyFutureStream -> {
            if (getPopulator().isPoolingActive()) {
                lazyFutureStream.peekSync((Consumer) bool -> {
                    throw new CompletedException(bool);
                });
            }
        }).runContinuation(() -> {
            throw new Queue.ClosedQueueException();
        }));
    }

    @Override // com.aol.cyclops.types.futurestream.ToQueue
    default <K> void toQueue(Map<K, Queue<U>> map, Function<? super U, ? extends K> function) {
        LazyReact populator = getPopulator();
        then(obj -> {
            return Boolean.valueOf(((Queue) map.get(function.apply(obj))).offer(obj));
        }, populator.getExecutor()).runThread(() -> {
            map.values().forEach(queue -> {
                queue.close();
            });
            returnPopulator(populator);
        });
    }

    void returnPopulator(LazyReact lazyReact);

    default U add(U u, Queue<U> queue) {
        if (queue.add(u)) {
            return u;
        }
        throw new RuntimeException();
    }
}
