package com.aol.cyclops.types.futurestream;

import com.aol.cyclops.control.LazyReact;
import com.aol.cyclops.control.StreamUtils;
import com.aol.cyclops.data.async.QueueFactory;
import com.aol.cyclops.data.collections.extensions.standard.ListX;
import com.aol.cyclops.internal.react.async.future.FastFuture;
import com.aol.cyclops.internal.react.exceptions.FilteredExecutionPathException;
import com.aol.cyclops.internal.react.stream.LazyStreamWrapper;
import com.aol.cyclops.react.SimpleReactFailedStageException;
import com.aol.cyclops.react.async.subscription.Continueable;
import com.nurkiewicz.asyncretry.RetryExecutor;
import com.nurkiewicz.asyncretry.policy.AbortRetryException;
import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/* loaded from: input_file:com/aol/cyclops/types/futurestream/LazySimpleReactStream.class */
public interface LazySimpleReactStream<U> extends BlockingStream<U>, ConfigurableStream<U, FastFuture<U>>, ToQueue<U>, BaseSimpleReactStream<U> {
    LazyReact getSimpleReact();

    LazySimpleReactStream<U> withTaskExecutor(Executor executor);

    LazySimpleReactStream<U> withRetrier(RetryExecutor retryExecutor);

    LazySimpleReactStream<U> withQueueFactory(QueueFactory<U> queueFactory);

    LazySimpleReactStream<U> withErrorHandler(Optional<Consumer<Throwable>> optional);

    LazySimpleReactStream<U> withSubscription(Continueable continueable);

    LazySimpleReactStream<U> withAsync(boolean z);

    Continueable getSubscription();

    <R> LazySimpleReactStream<R> withLastActive(LazyStreamWrapper<R> lazyStreamWrapper);

    @Override // com.aol.cyclops.types.futurestream.BlockingStream, com.aol.cyclops.types.futurestream.BaseSimpleReactStream, com.aol.cyclops.types.futurestream.LazyStream, com.aol.cyclops.types.stream.reactive.FutureStreamSynchronousPublisher
    LazyStreamWrapper<U> getLastActive();

    default <R> LazySimpleReactStream<R> then(Function<? super U, ? extends R> function, Executor executor) {
        return withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.thenApplyAsync(handleExceptions(function), executor);
        }));
    }

    @Override // com.aol.cyclops.types.futurestream.BaseSimpleReactStream, com.aol.cyclops.types.futurestream.EagerToQueue
    default <R> LazySimpleReactStream<R> thenSync(Function<? super U, ? extends R> function) {
        return withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.thenApply(handleExceptions(function));
        }));
    }

    default <R> LazySimpleReactStream<R> retry(Function<? super U, ? extends R> function) {
        return withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.thenApplyAsync(obj -> {
                return getRetrier().getWithRetry(() -> {
                    return handleExceptions(function).apply(obj);
                }).join();
            }, getTaskExecutor());
        }));
    }

    default <R> LazySimpleReactStream<R> then(Function<? super U, ? extends R> function) {
        if (!isAsync()) {
            return thenSync((Function) function);
        }
        return withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.thenApplyAsync(handleExceptions(function), getTaskExecutor());
        }));
    }

    default LazySimpleReactStream<U> peek(Consumer<? super U> consumer) {
        return !isAsync() ? peekSync((Consumer) consumer) : (LazySimpleReactStream<U>) then((Function) obj -> {
            consumer.accept(obj);
            return obj;
        });
    }

    default LazySimpleReactStream<U> peekSync(Consumer<? super U> consumer) {
        return (LazySimpleReactStream<U>) thenSync((Function) obj -> {
            consumer.accept(obj);
            return obj;
        });
    }

    static <U, R> Function<U, R> handleExceptions(Function<? super U, ? extends R> function) {
        return obj -> {
            try {
                return function.apply(obj);
            } catch (Throwable th) {
                if (th instanceof AbortRetryException) {
                    throw th;
                }
                throw new SimpleReactFailedStageException(obj, th);
            }
        };
    }

    @Override // com.aol.cyclops.types.futurestream.BaseSimpleReactStream
    default <R> LazySimpleReactStream<R> flatMapToCompletableFuture(Function<? super U, CompletableFuture<? extends R>> function) {
        if (!isAsync()) {
            return flatMapToCompletableFutureSync((Function) function);
        }
        return withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.thenComposeAsync(handleExceptions(function), getTaskExecutor());
        }));
    }

    @Override // com.aol.cyclops.types.futurestream.BaseSimpleReactStream
    default <R> LazySimpleReactStream<R> flatMapToCompletableFutureSync(Function<? super U, CompletableFuture<? extends R>> function) {
        return withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.thenCompose(handleExceptions(function));
        }));
    }

    default <R> LazySimpleReactStream<R> flatMap(Function<? super U, ? extends Stream<? extends R>> function) {
        return getSimpleReact().construct(Stream.of(new Object[0])).withSubscription(getSubscription()).withQueueFactory((QueueFactory) getQueueFactory()).fromStream((Stream) toQueue().stream(getSubscription()).flatMap(function));
    }

    default ListX<BaseSimpleReactStream<U>> copySimpleReactStream(int i) {
        return (ListX<BaseSimpleReactStream<U>>) StreamUtils.toBufferingCopier(iterator(), i).mo60stream().map(it -> {
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 16), false);
        }).map(obj -> {
            return getSimpleReact().construct((Stream) obj);
        }).toListX();
    }

    default LazySimpleReactStream<U> filter(Predicate<? super U> predicate) {
        if (!isAsync()) {
            return filterSync((Predicate) predicate);
        }
        return (LazySimpleReactStream<U>) withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.thenApplyAsync(obj -> {
                if (predicate.test(obj)) {
                    return obj;
                }
                throw new FilteredExecutionPathException();
            }, getTaskExecutor());
        }));
    }

    @Override // com.aol.cyclops.types.futurestream.BaseSimpleReactStream
    default LazySimpleReactStream<U> filterSync(Predicate<? super U> predicate) {
        return (LazySimpleReactStream<U>) withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.thenApply(obj -> {
                if (predicate.test(obj)) {
                    return obj;
                }
                throw new FilteredExecutionPathException();
            });
        }));
    }

    @Override // com.aol.cyclops.types.futurestream.BaseSimpleReactStream
    default <T> Stream<CompletableFuture<T>> streamCompletableFutures() {
        return getLastActive().stream();
    }

    default LazySimpleReactStream<U> onFail(Function<? super SimpleReactFailedStageException, ? extends U> function) {
        return onFail(Throwable.class, (Function) function);
    }

    default LazySimpleReactStream<U> onFail(Class<? extends Throwable> cls, Function<? super SimpleReactFailedStageException, ? extends U> function) {
        return (LazySimpleReactStream<U>) withLastActive(getLastActive().operation(pipelineBuilder -> {
            return pipelineBuilder.exceptionally(th -> {
                if (th instanceof FilteredExecutionPathException) {
                    throw ((FilteredExecutionPathException) th);
                }
                Throwable th = th;
                if (th instanceof CompletionException) {
                    th = ((Exception) th).getCause();
                }
                SimpleReactFailedStageException assureSimpleReactException = assureSimpleReactException(th);
                if (cls.isAssignableFrom(assureSimpleReactException.getCause().getClass())) {
                    return function.apply(assureSimpleReactException);
                }
                throw assureSimpleReactException;
            });
        }));
    }

    static SimpleReactFailedStageException assureSimpleReactException(Throwable th) {
        return th instanceof SimpleReactFailedStageException ? (SimpleReactFailedStageException) th : new SimpleReactFailedStageException(null, th);
    }

    default LazySimpleReactStream<U> capture(Consumer<Throwable> consumer) {
        return withErrorHandler(Optional.of(consumer));
    }

    /* bridge */ /* synthetic */ default ConfigurableStream withErrorHandler(Optional optional) {
        return withErrorHandler((Optional<Consumer<Throwable>>) optional);
    }

    /* bridge */ /* synthetic */ default BaseSimpleReactStream capture(Consumer consumer) {
        return capture((Consumer<Throwable>) consumer);
    }

    /* bridge */ /* synthetic */ default BaseSimpleReactStream onFail(Class cls, Function function) {
        return onFail((Class<? extends Throwable>) cls, function);
    }
}
