/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri;

import io.netty.util.AsciiString;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.remoting.api.ConnectionManager;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.StatusRpcException;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.PackableMethod;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.DeadlineFuture;
import org.apache.dubbo.rpc.protocol.tri.ReflectionPackableMethod;
import org.apache.dubbo.rpc.protocol.tri.RequestMetadata;
import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
import org.apache.dubbo.rpc.protocol.tri.call.ClientCall;
import org.apache.dubbo.rpc.protocol.tri.call.ObserverToClientCallListenerAdapter;
import org.apache.dubbo.rpc.protocol.tri.call.TripleClientCall;
import org.apache.dubbo.rpc.protocol.tri.call.UnaryClientCallListener;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
import org.apache.dubbo.rpc.support.RpcUtils;

public class TripleInvoker<T>
extends AbstractInvoker<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TripleInvoker.class);
    private final Connection connection;
    private final ReentrantLock destroyLock = new ReentrantLock();
    private final Set<Invoker<?>> invokers;
    private final ExecutorService streamExecutor;
    private final String acceptEncodings;

    public TripleInvoker(Class<T> serviceType, URL url, String acceptEncodings, ConnectionManager connectionManager, Set<Invoker<?>> invokers, ExecutorService streamExecutor) {
        super(serviceType, url, new String[]{"interface", "group", "token"});
        this.invokers = invokers;
        this.connection = connectionManager.connect(url);
        this.acceptEncodings = acceptEncodings;
        this.streamExecutor = streamExecutor;
    }

    private static AsciiString getSchemeFromUrl(URL url) {
        boolean ssl = url.getParameter("ssl-enabled", false);
        return ssl ? TripleConstant.HTTPS_SCHEME : TripleConstant.HTTP_SCHEME;
    }

    @Override
    protected Result doInvoke(Invocation invocation) {
        if (!this.connection.isAvailable()) {
            CompletableFuture<AppResponse> future = new CompletableFuture<AppResponse>();
            StatusRpcException exception = TriRpcStatus.UNAVAILABLE.withDescription(String.format("upstream %s is unavailable", this.getUrl().getAddress())).asException();
            future.completeExceptionally(exception);
            return new AsyncRpcResult(future, invocation);
        }
        ConsumerModel consumerModel = (ConsumerModel)(invocation.getServiceModel() != null ? invocation.getServiceModel() : this.getUrl().getServiceModel());
        ServiceDescriptor serviceDescriptor = consumerModel.getServiceModel();
        MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(invocation.getMethodName(), invocation.getParameterTypes());
        TripleClientCall call = new TripleClientCall(this.connection, this.streamExecutor, this.getUrl().getOrDefaultFrameworkModel());
        try {
            AsyncRpcResult result;
            switch (methodDescriptor.getRpcType()) {
                case UNARY: {
                    result = this.invokeUnary(methodDescriptor, invocation, call);
                    break;
                }
                case SERVER_STREAM: {
                    result = this.invokeServerStream(methodDescriptor, invocation, call);
                    break;
                }
                case CLIENT_STREAM: 
                case BI_STREAM: {
                    result = this.invokeBiOrClientStream(methodDescriptor, invocation, call);
                    break;
                }
                default: {
                    throw new IllegalStateException("Can not reach here");
                }
            }
            return result;
        }
        catch (Throwable t) {
            TriRpcStatus status = TriRpcStatus.INTERNAL.withCause(t).withDescription("Call aborted cause client exception");
            StatusRpcException e = status.asException();
            try {
                call.cancelByLocal(e);
            }
            catch (Throwable t1) {
                LOGGER.error("Cancel triple request failed", t1);
            }
            CompletableFuture<AppResponse> future = new CompletableFuture<AppResponse>();
            future.completeExceptionally(e);
            return new AsyncRpcResult(future, invocation);
        }
    }

    AsyncRpcResult invokeServerStream(MethodDescriptor methodDescriptor, Invocation invocation, ClientCall call) {
        RequestMetadata request = this.createRequest(methodDescriptor, invocation, null);
        StreamObserver responseObserver = (StreamObserver)invocation.getArguments()[1];
        StreamObserver<Object> requestObserver = this.streamCall(call, request, responseObserver);
        requestObserver.onNext(invocation.getArguments()[0]);
        requestObserver.onCompleted();
        return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation);
    }

    AsyncRpcResult invokeBiOrClientStream(MethodDescriptor methodDescriptor, Invocation invocation, ClientCall call) {
        RequestMetadata request = this.createRequest(methodDescriptor, invocation, null);
        StreamObserver responseObserver = (StreamObserver)invocation.getArguments()[0];
        StreamObserver<Object> requestObserver = this.streamCall(call, request, responseObserver);
        AsyncRpcResult result = new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse(requestObserver)), invocation);
        return result;
    }

    StreamObserver<Object> streamCall(ClientCall call, RequestMetadata metadata, StreamObserver<Object> responseObserver) {
        ObserverToClientCallListenerAdapter listener = new ObserverToClientCallListenerAdapter(responseObserver);
        StreamObserver<Object> streamObserver = call.start(metadata, listener);
        if (responseObserver instanceof CancelableStreamObserver) {
            CancellationContext context = new CancellationContext();
            CancelableStreamObserver cancelableStreamObserver = (CancelableStreamObserver)responseObserver;
            cancelableStreamObserver.setCancellationContext(context);
            context.addListener(context1 -> call.cancelByLocal(new IllegalStateException("Canceled by app")));
            listener.setOnStartConsumer(dummy -> cancelableStreamObserver.startRequest());
            cancelableStreamObserver.beforeStart((ClientCallToObserverAdapter)streamObserver);
        }
        return streamObserver;
    }

    AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation invocation, ClientCall call) {
        ExecutorService callbackExecutor = this.getCallbackExecutor(this.getUrl(), invocation);
        int timeout = this.calculateTimeout(invocation, invocation.getMethodName());
        invocation.setAttachment("timeout", timeout);
        DeadlineFuture future = DeadlineFuture.newFuture(this.getUrl().getPath(), methodDescriptor.getMethodName(), this.getUrl().getAddress(), timeout, callbackExecutor);
        RequestMetadata request = this.createRequest(methodDescriptor, invocation, timeout);
        Object pureArgument = methodDescriptor instanceof StubMethodDescriptor ? invocation.getArguments()[0] : invocation.getArguments();
        AsyncRpcResult result = new AsyncRpcResult(future, invocation);
        FutureContext.getContext().setCompatibleFuture(future);
        result.setExecutor(callbackExecutor);
        UnaryClientCallListener callListener = new UnaryClientCallListener(future);
        StreamObserver<Object> requestObserver = call.start(request, callListener);
        requestObserver.onNext(pureArgument);
        requestObserver.onCompleted();
        return result;
    }

    RequestMetadata createRequest(MethodDescriptor methodDescriptor, Invocation invocation, Integer timeout) {
        String application;
        String methodName = RpcUtils.getMethodName(invocation);
        Objects.requireNonNull(methodDescriptor, "MethodDescriptor not found for" + methodName + " params:" + Arrays.toString(invocation.getCompatibleParamSignatures()));
        RequestMetadata meta = new RequestMetadata();
        URL url = this.getUrl();
        meta.packableMethod = methodDescriptor instanceof PackableMethod ? (PackableMethod)((Object)methodDescriptor) : ReflectionPackableMethod.init(methodDescriptor, url);
        meta.method = methodDescriptor;
        meta.scheme = TripleInvoker.getSchemeFromUrl(url);
        meta.compressor = Compressor.NONE;
        meta.cancellationContext = RpcContext.getCancellationContext();
        meta.address = url.getAddress();
        meta.service = url.getPath();
        meta.group = url.getGroup();
        meta.version = url.getVersion();
        meta.acceptEncoding = this.acceptEncodings;
        if (timeout != null) {
            meta.timeout = timeout + "m";
        }
        if ((application = (String)invocation.getObjectAttachmentWithoutConvert("application")) == null) {
            application = (String)invocation.getObjectAttachmentWithoutConvert("remote.application");
        }
        meta.application = application;
        meta.attachments = invocation.getObjectAttachments();
        return meta;
    }

    @Override
    public boolean isAvailable() {
        if (!super.isAvailable()) {
            return false;
        }
        return this.connection.isAvailable();
    }

    @Override
    public void destroy() {
        if (!super.isDestroyed()) {
            this.destroyLock.lock();
            try {
                if (super.isDestroyed()) {
                    return;
                }
                super.destroy();
                if (this.invokers != null) {
                    this.invokers.remove(this);
                }
                try {
                    this.connection.release();
                }
                catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
            finally {
                this.destroyLock.unlock();
            }
        }
    }

    private int calculateTimeout(Invocation invocation, String methodName) {
        int timeout;
        if (invocation.getObjectAttachment("timeout") != null) {
            return (Integer)invocation.getObjectAttachment("timeout");
        }
        Object countdown = RpcContext.getClientAttachment().getObjectAttachment("timeout-countdown");
        if (countdown == null) {
            timeout = (int)RpcUtils.getTimeout(this.getUrl(), methodName, RpcContext.getClientAttachment(), 3000L);
            if (this.getUrl().getParameter("enable-timeout-countdown", false)) {
                invocation.setObjectAttachment("_TO", timeout);
            }
        } else {
            TimeoutCountDown timeoutCountDown = (TimeoutCountDown)countdown;
            timeout = (int)timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
            invocation.setObjectAttachment("_TO", timeout);
        }
        return timeout;
    }
}

