/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri.reactive.calls;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
import org.apache.dubbo.rpc.protocol.tri.reactive.ServerTripleReactorPublisher;
import org.apache.dubbo.rpc.protocol.tri.reactive.ServerTripleReactorSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class ReactorServerCalls {
    private ReactorServerCalls() {
    }

    public static <T, R> void oneToOne(T request, StreamObserver<R> responseObserver, Function<Mono<T>, Mono<R>> func) {
        func.apply(Mono.just(request)).subscribe(res -> CompletableFuture.completedFuture(res).whenComplete((r, t) -> {
            if (t != null) {
                responseObserver.onError((Throwable)t);
            } else {
                responseObserver.onNext(r);
                responseObserver.onCompleted();
            }
        }));
    }

    public static <T, R> void oneToMany(T request, StreamObserver<R> responseObserver, Function<Mono<T>, Flux<R>> func) {
        try {
            Flux<R> response = func.apply(Mono.just(request));
            ServerTripleReactorSubscriber subscriber = (ServerTripleReactorSubscriber)response.subscribeWith(new ServerTripleReactorSubscriber());
            subscriber.subscribe((ServerCallToObserverAdapter)responseObserver);
        }
        catch (Throwable throwable) {
            responseObserver.onError(throwable);
        }
    }

    public static <T, R> StreamObserver<T> manyToOne(StreamObserver<R> responseObserver, Function<Flux<T>, Mono<R>> func) {
        ServerTripleReactorPublisher serverPublisher = new ServerTripleReactorPublisher((CallStreamObserver)responseObserver);
        try {
            Mono<R> responseMono = func.apply(Flux.from(serverPublisher));
            responseMono.subscribe(value -> {
                if (!serverPublisher.isCancelled()) {
                    responseObserver.onNext(value);
                }
            }, throwable -> {
                if (!serverPublisher.isCancelled()) {
                    responseObserver.onError((Throwable)throwable);
                }
            }, responseObserver::onCompleted);
            serverPublisher.startRequest();
        }
        catch (Throwable throwable2) {
            responseObserver.onError(throwable2);
        }
        return serverPublisher;
    }

    public static <T, R> StreamObserver<T> manyToMany(StreamObserver<R> responseObserver, Function<Flux<T>, Flux<R>> func) {
        ServerTripleReactorPublisher serverPublisher = new ServerTripleReactorPublisher((CallStreamObserver)responseObserver);
        try {
            Flux<R> responseFlux = func.apply(Flux.from(serverPublisher));
            ServerTripleReactorSubscriber serverSubscriber = (ServerTripleReactorSubscriber)responseFlux.subscribeWith(new ServerTripleReactorSubscriber());
            serverSubscriber.subscribe((CallStreamObserver)responseObserver);
            serverPublisher.startRequest();
        }
        catch (Throwable throwable) {
            responseObserver.onError(throwable);
        }
        return serverPublisher;
    }
}

