/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri;

import io.netty.util.AsciiString;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.Configuration;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.StatusRpcException;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.PackableMethod;
import org.apache.dubbo.rpc.model.PackableMethodFactory;
import org.apache.dubbo.rpc.model.ScopeModelUtil;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.DeadlineFuture;
import org.apache.dubbo.rpc.protocol.tri.RequestMetadata;
import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
import org.apache.dubbo.rpc.protocol.tri.TripleProtocol;
import org.apache.dubbo.rpc.protocol.tri.call.ClientCall;
import org.apache.dubbo.rpc.protocol.tri.call.ObserverToClientCallListenerAdapter;
import org.apache.dubbo.rpc.protocol.tri.call.TripleClientCall;
import org.apache.dubbo.rpc.protocol.tri.call.UnaryClientCallListener;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import org.apache.dubbo.rpc.support.RpcUtils;

/**
 * Consumer-side invoker that issues each invocation as a {@link TripleClientCall} over a shared
 * {@link AbstractConnectionClient}.
 *
 * <p>{@link #doInvoke(Invocation)} dispatches on the RPC type of the resolved
 * {@link MethodDescriptor}: unary calls go through {@link #invokeUnary}, server-streaming calls
 * through {@link #invokeServerStream}, and client/bidirectional streaming calls through
 * {@link #invokeBiOrClientStream}.
 *
 * @param <T> the service interface type
 */
public class TripleInvoker<T> extends AbstractInvoker<T> {
    private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(TripleInvoker.class);
    /**
     * Read once from the {@code future.sync.set} system property (default {@code "true"}). When
     * {@code true} the unary future is published to {@link FutureContext} even for sync calls.
     */
    private static final boolean setFutureWhenSync = Boolean.parseBoolean(System.getProperty("future.sync.set", "true"));

    private final AbstractConnectionClient connectionClient;
    /** Serializes concurrent {@link #destroy()} calls. */
    private final ReentrantLock destroyLock = new ReentrantLock();
    /** Registry this invoker removes itself from on destroy; may be {@code null}. */
    private final Set<Invoker<?>> invokers;
    /** Callback executor used for every call that is not a synchronous unary call. */
    private final ExecutorService streamExecutor;
    private final String acceptEncodings;
    private final TripleWriteQueue writeQueue = new TripleWriteQueue(256);

    /**
     * Creates an invoker bound to the given connection.
     *
     * @param serviceType      the service interface class
     * @param url              the consumer URL
     * @param acceptEncodings  value copied into {@link RequestMetadata#acceptEncoding} for each request
     * @param connectionClient connection used for every call; released in {@link #destroy()}
     * @param invokers         set this invoker is removed from on destroy; may be {@code null}
     * @param streamExecutor   callback executor for non-sync and streaming calls
     */
    public TripleInvoker(Class<T> serviceType, URL url, String acceptEncodings, AbstractConnectionClient connectionClient, Set<Invoker<?>> invokers, ExecutorService streamExecutor) {
        super(serviceType, url, new String[]{"interface", "group", "token"});
        this.invokers = invokers;
        this.connectionClient = connectionClient;
        this.acceptEncodings = acceptEncodings;
        this.streamExecutor = streamExecutor;
    }

    /**
     * Picks the request scheme from the URL's {@code ssl-enabled} parameter (default {@code false}).
     */
    private static AsciiString getSchemeFromUrl(URL url) {
        boolean ssl = url.getParameter("ssl-enabled", false);
        return ssl ? TripleConstant.HTTPS_SCHEME : TripleConstant.HTTP_SCHEME;
    }

    /**
     * Resolves the compressor named by {@code dubbo.rpc.tri.compressor} in the default application
     * model's environment configuration, falling back to {@code "identity"}.
     */
    private static Compressor getCompressorFromEnv() {
        Configuration configuration = ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
        String compressorKey = configuration.getString("dubbo.rpc.tri.compressor", "identity");
        return Compressor.getCompressor(ScopeModelUtil.getFrameworkModel(ApplicationModel.defaultModel()), compressorKey);
    }

    /**
     * Builds a result whose future is already completed exceptionally with {@code cause}.
     */
    private static AsyncRpcResult failedResult(Throwable cause, Invocation invocation) {
        CompletableFuture<AppResponse> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return new AsyncRpcResult(future, invocation);
    }

    /**
     * Fails with a descriptive {@link NullPointerException} when no method descriptor was resolved.
     * The message is built lazily so the successful path pays no string-building cost.
     */
    private static MethodDescriptor requireMethodDescriptor(MethodDescriptor methodDescriptor, Invocation invocation) {
        return Objects.requireNonNull(methodDescriptor,
                () -> "MethodDescriptor not found for " + RpcUtils.getMethodName(invocation)
                        + " params:" + Arrays.toString(invocation.getCompatibleParamSignatures()));
    }

    /**
     * Dispatches the invocation according to the RPC type of the target method.
     *
     * <p>If the connection is not connected, a result failed with an {@code UNAVAILABLE} status is
     * returned without creating a call. Any throwable raised while starting the call is wrapped in
     * an {@code INTERNAL} status, the call is cancelled locally, and a failed result is returned.
     *
     * @param invocation the invocation to send
     * @return the asynchronous result of the call
     * @throws NullPointerException if no method descriptor matches the invocation
     */
    @Override
    protected Result doInvoke(Invocation invocation) {
        if (!this.connectionClient.isConnected()) {
            StatusRpcException unavailable = TriRpcStatus.UNAVAILABLE
                    .withDescription(String.format("upstream %s is unavailable", this.getUrl().getAddress()))
                    .asException();
            return failedResult(unavailable, invocation);
        }
        ConsumerModel consumerModel = (ConsumerModel) (invocation.getServiceModel() != null
                ? invocation.getServiceModel()
                : this.getUrl().getServiceModel());
        ServiceDescriptor serviceDescriptor = consumerModel.getServiceModel();
        // Validate before the descriptor is dereferenced below, so an unknown method reports
        // which method/signature was missing instead of a bare NPE.
        MethodDescriptor methodDescriptor = requireMethodDescriptor(
                serviceDescriptor.getMethod(invocation.getMethodName(), invocation.getParameterTypes()), invocation);
        ExecutorService callbackExecutor = TripleInvoker.isSync(methodDescriptor, invocation)
                ? new ThreadlessExecutor()
                : this.streamExecutor;
        TripleClientCall call = new TripleClientCall(this.connectionClient, callbackExecutor,
                this.getUrl().getOrDefaultFrameworkModel(), this.writeQueue);
        try {
            switch (methodDescriptor.getRpcType()) {
                case UNARY:
                    return this.invokeUnary(methodDescriptor, invocation, call, callbackExecutor);
                case SERVER_STREAM:
                    return this.invokeServerStream(methodDescriptor, invocation, call);
                case CLIENT_STREAM:
                case BI_STREAM:
                    return this.invokeBiOrClientStream(methodDescriptor, invocation, call);
                default:
                    throw new IllegalStateException("Can not reach here");
            }
        } catch (Throwable t) {
            StatusRpcException aborted = TriRpcStatus.INTERNAL
                    .withCause(t)
                    .withDescription("Call aborted cause client exception")
                    .asException();
            try {
                call.cancelByLocal(aborted);
            } catch (Throwable t1) {
                // Cancellation is best effort; the caller still receives the original failure.
                LOGGER.error("4-11", "", "", "Cancel triple request failed", t1);
            }
            return failedResult(aborted, invocation);
        }
    }

    /**
     * Returns {@code true} only for a unary method invoked through an {@link RpcInvocation} whose
     * invoke mode is {@link InvokeMode#SYNC}.
     */
    private static boolean isSync(MethodDescriptor methodDescriptor, Invocation invocation) {
        if (!(invocation instanceof RpcInvocation)) {
            return false;
        }
        InvokeMode invokeMode = ((RpcInvocation) invocation).getInvokeMode();
        return MethodDescriptor.RpcType.UNARY == methodDescriptor.getRpcType() && InvokeMode.SYNC == invokeMode;
    }

    /**
     * Starts a server-streaming call: argument 0 is sent as the single request message and
     * argument 1 is used as the response observer.
     *
     * @return an already-completed result carrying an empty {@link AppResponse}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    AsyncRpcResult invokeServerStream(MethodDescriptor methodDescriptor, Invocation invocation, ClientCall call) {
        RequestMetadata request = this.createRequest(methodDescriptor, invocation, null);
        StreamObserver responseObserver = (StreamObserver) invocation.getArguments()[1];
        StreamObserver<Object> requestObserver = this.streamCall(call, request, responseObserver);
        requestObserver.onNext(invocation.getArguments()[0]);
        requestObserver.onCompleted();
        return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation);
    }

    /**
     * Starts a client-streaming or bidirectional call: argument 0 is used as the response observer.
     *
     * @return an already-completed result whose value is the request observer the caller writes to
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    AsyncRpcResult invokeBiOrClientStream(MethodDescriptor methodDescriptor, Invocation invocation, ClientCall call) {
        RequestMetadata request = this.createRequest(methodDescriptor, invocation, null);
        StreamObserver responseObserver = (StreamObserver) invocation.getArguments()[0];
        StreamObserver<Object> requestObserver = this.streamCall(call, request, responseObserver);
        return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse(requestObserver)), invocation);
    }

    /**
     * Starts {@code call} with a listener that forwards to {@code responseObserver}.
     *
     * <p>When the observer is a {@link CancelableStreamObserver}, it is additionally given a fresh
     * {@link CancellationContext} whose listener cancels the call locally, a start hook, and the
     * request-side observer via {@code beforeStart}.
     *
     * @return the observer used to write request messages
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    StreamObserver<Object> streamCall(ClientCall call, RequestMetadata metadata, StreamObserver<Object> responseObserver) {
        ObserverToClientCallListenerAdapter listener = new ObserverToClientCallListenerAdapter(responseObserver);
        StreamObserver<Object> streamObserver = call.start(metadata, listener);
        if (responseObserver instanceof CancelableStreamObserver) {
            CancellationContext context = new CancellationContext();
            CancelableStreamObserver cancelableStreamObserver = (CancelableStreamObserver) responseObserver;
            cancelableStreamObserver.setCancellationContext(context);
            context.addListener(context1 -> call.cancelByLocal(new IllegalStateException("Canceled by app")));
            listener.setOnStartConsumer(dummy -> cancelableStreamObserver.startRequest());
            cancelableStreamObserver.beforeStart((ClientCallToObserverAdapter) streamObserver);
        }
        return streamObserver;
    }

    /**
     * Sends a unary request and returns a result backed by a {@link DeadlineFuture}.
     *
     * <p>If the calculated timeout is not positive, the call is not started and a default result
     * carrying an {@link RpcException} is returned instead.
     *
     * @param callbackExecutor executor attached to both the deadline future and the result
     */
    AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation invocation, ClientCall call, ExecutorService callbackExecutor) {
        int timeout = RpcUtils.calculateTimeout(this.getUrl(), invocation, RpcUtils.getMethodName(invocation), 3000L);
        if (timeout <= 0) {
            // NOTE(review): code 8 is presumably RpcException.TIMEOUT_TERMINATE - confirm.
            return AsyncRpcResult.newDefaultAsyncResult(new RpcException(8, "No time left for making the following call: " + invocation.getServiceName() + "." + RpcUtils.getMethodName(invocation) + ", terminate directly."), invocation);
        }
        invocation.setAttachment("timeout", String.valueOf(timeout));
        DeadlineFuture future = DeadlineFuture.newFuture(this.getUrl().getPath(), methodDescriptor.getMethodName(), this.getUrl().getAddress(), timeout, callbackExecutor);
        RequestMetadata request = this.createRequest(methodDescriptor, invocation, timeout);
        // Stub methods send their single argument directly; other methods send the argument array.
        Object pureArgument = methodDescriptor instanceof StubMethodDescriptor
                ? invocation.getArguments()[0]
                : invocation.getArguments();
        AsyncRpcResult result = new AsyncRpcResult(future, invocation);
        // Guard the cast the same way isSync does: anything that is not an RpcInvocation is
        // treated as non-sync rather than failing with a ClassCastException.
        boolean syncInvocation = invocation instanceof RpcInvocation
                && ((RpcInvocation) invocation).getInvokeMode() == InvokeMode.SYNC;
        if (setFutureWhenSync || !syncInvocation) {
            FutureContext.getContext().setCompatibleFuture(future);
        }
        result.setExecutor(callbackExecutor);
        UnaryClientCallListener callListener = new UnaryClientCallListener(future);
        StreamObserver<Object> requestObserver = call.start(request, callListener);
        requestObserver.onNext(pureArgument);
        requestObserver.onCompleted();
        return result;
    }

    /**
     * Builds the per-request metadata from the invoker URL, the method descriptor and the
     * invocation attachments.
     *
     * @param timeout timeout value to advertise, or {@code null} to leave
     *                {@link RequestMetadata#timeout} unset
     * @throws NullPointerException if {@code methodDescriptor} is {@code null}
     */
    RequestMetadata createRequest(MethodDescriptor methodDescriptor, Invocation invocation, Integer timeout) {
        requireMethodDescriptor(methodDescriptor, invocation);
        RequestMetadata meta = new RequestMetadata();
        URL url = this.getUrl();
        if (methodDescriptor instanceof PackableMethod) {
            meta.packableMethod = (PackableMethod) ((Object) methodDescriptor);
        } else {
            // Descriptor cannot pack itself: ask the configured PackableMethodFactory extension.
            String factoryName = ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
                    .getString("dubbo.application.parameters.serialize.packable.factory", "default");
            meta.packableMethod = url.getOrDefaultFrameworkModel()
                    .getExtensionLoader(PackableMethodFactory.class)
                    .getExtension(factoryName)
                    .create(methodDescriptor, url, "application/grpc+proto");
        }
        meta.convertNoLowerHeader = TripleProtocol.CONVERT_NO_LOWER_HEADER;
        meta.ignoreDefaultVersion = TripleProtocol.IGNORE_1_0_0_VERSION;
        meta.method = methodDescriptor;
        meta.scheme = TripleInvoker.getSchemeFromUrl(url);
        meta.compressor = TripleInvoker.getCompressorFromEnv();
        meta.cancellationContext = RpcContext.getCancellationContext();
        meta.address = url.getAddress();
        meta.service = url.getPath();
        meta.group = url.getGroup();
        meta.version = url.getVersion();
        meta.acceptEncoding = this.acceptEncodings;
        if (timeout != null) {
            // NOTE(review): the "m" suffix is presumably the gRPC timeout unit for milliseconds - confirm.
            meta.timeout = timeout + "m";
        }
        // Prefer the "application" attachment, falling back to "remote.application".
        String application = (String) invocation.getObjectAttachmentWithoutConvert("application");
        if (application == null) {
            application = (String) invocation.getObjectAttachmentWithoutConvert("remote.application");
        }
        meta.application = application;
        meta.attachments = invocation.getObjectAttachments();
        return meta;
    }

    /**
     * Available only when the superclass reports available and the connection is connected.
     */
    @Override
    public boolean isAvailable() {
        return super.isAvailable() && this.connectionClient.isConnected();
    }

    /**
     * Destroys this invoker once: marks it destroyed, removes it from the invoker set (if any) and
     * releases the connection. The destroyed flag is re-checked after acquiring the lock so that
     * concurrent callers release the connection only once.
     */
    @Override
    public void destroy() {
        if (super.isDestroyed()) {
            return;
        }
        this.destroyLock.lock();
        try {
            if (super.isDestroyed()) {
                return;
            }
            super.destroy();
            if (this.invokers != null) {
                this.invokers.remove(this);
            }
            try {
                this.connectionClient.release();
            } catch (Throwable t) {
                // Releasing is best effort. `logger` is not declared in this file; presumably
                // inherited from AbstractInvoker - kept as-is to preserve the log category.
                logger.warn("4-17", "", "", t.getMessage(), t);
            }
        } finally {
            this.destroyLock.unlock();
        }
    }
}

