/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri.call;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.PackableMethod;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.call.BiStreamServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.call.ServerCall;
import org.apache.dubbo.rpc.protocol.tri.call.ServerStreamServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.call.UnaryServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;

/**
 * Base implementation of a server-side call that sits between a {@link ServerStream}
 * (from which it receives headers, messages, completion and cancellation as a
 * {@link ServerStream.Listener}) and a {@link ServerCall.Listener} created per request
 * in {@link #startInternalCall(RpcInvocation, MethodDescriptor, Invoker)}.
 *
 * <p>Subclasses supply {@link #parseSingleMessage(byte[])} and are expected to populate
 * {@link #methodDescriptor} and {@link #packableMethod} before messages are exchanged;
 * this class reads both but never assigns them.
 *
 * <p>NOTE(review): the {@code closed} and {@code headerSent} flags are plain fields with no
 * synchronization in this class — presumably all callbacks run on one serializing executor;
 * confirm against the stream implementation.
 */
public abstract class AbstractServerCall implements ServerCall, ServerStream.Listener {
    /** Key under which the remote address of the stream is stored on the invocation. */
    public static final String REMOTE_ADDRESS_KEY = "tri.remote.address";
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServerCall.class);
    public final Invoker<?> invoker;
    public final FrameworkModel frameworkModel;
    public final ServerStream stream;
    public final Executor executor;
    public final String methodName;
    public final String serviceName;
    public final ServiceDescriptor serviceDescriptor;
    /** Value echoed back in the accept-encoding response header when non-null. */
    private final String acceptEncoding;
    public boolean autoRequestN = true;
    /** Request timeout in milliseconds parsed from the request metadata; null when absent or unparsable. */
    public Long timeout;
    /** Per-request listener; stays null if {@link #startInternalCall} failed. */
    ServerCall.Listener listener;
    private Compressor compressor;
    private boolean headerSent;
    private boolean closed;
    CancellationContext cancellationContext;
    protected MethodDescriptor methodDescriptor;
    protected PackableMethod packableMethod;
    protected Map<String, Object> requestMetadata;

    /**
     * Creates the call and stores its collaborators.
     *
     * @throws NullPointerException if {@code serviceDescriptor} is null
     */
    AbstractServerCall(Invoker<?> invoker, ServerStream stream, FrameworkModel frameworkModel, ServiceDescriptor serviceDescriptor, String acceptEncoding, String serviceName, String methodName, Executor executor) {
        // Supplier overload: the message (and invoker.getUrl()) is only evaluated on failure.
        Objects.requireNonNull(serviceDescriptor, () -> "No service descriptor found for " + invoker.getUrl());
        this.invoker = invoker;
        this.executor = executor;
        this.frameworkModel = frameworkModel;
        this.serviceDescriptor = serviceDescriptor;
        this.serviceName = serviceName;
        this.methodName = methodName;
        this.stream = stream;
        this.acceptEncoding = acceptEncoding;
    }

    /**
     * Records the request metadata and starts the call.
     *
     * @param requestMetadata metadata received with the request headers
     */
    @Override
    public void onHeader(Map<String, Object> requestMetadata) {
        this.requestMetadata = requestMetadata;
        // Defensive only: the constructor already rejects a null descriptor.
        if (this.serviceDescriptor == null) {
            this.responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Service not found:" + this.serviceName));
            return;
        }
        this.startCall();
    }

    /**
     * Builds the invocation for {@link #methodDescriptor} and installs the resulting listener.
     * The listener is left null when {@link #startInternalCall} fails.
     */
    protected void startCall() {
        RpcInvocation invocation = this.buildInvocation(this.methodDescriptor);
        this.listener = this.startInternalCall(invocation, this.methodDescriptor, this.invoker);
    }

    /** Forwards the request for {@code numMessages} more messages to the stream. */
    @Override
    public final void request(int numMessages) {
        this.stream.request(numMessages);
    }

    /**
     * Sends one response message.
     *
     * @throws IllegalStateException if the call is already closed
     */
    @Override
    public final void sendMessage(Object message) {
        if (this.closed) {
            throw new IllegalStateException("Stream has already canceled");
        }
        this.doSendMessage(message);
    }

    /**
     * Sends the response header if needed, packs the message, optionally compresses it and
     * writes it to the stream. Serialization failures and a null payload close the call with
     * an INTERNAL status; a failed write cancels the call.
     */
    private void doSendMessage(Object message) {
        if (this.closed) {
            return;
        }
        if (!this.headerSent) {
            this.sendHeader();
        }
        byte[] data;
        try {
            data = this.packableMethod.packResponse(message);
        }
        catch (Throwable e) {
            this.close(TriRpcStatus.INTERNAL.withDescription("Serialize response failed").withCause(e), null);
            LOGGER.error(String.format("Serialize triple response failed, service=%s method=%s", this.serviceName, this.methodName), e);
            return;
        }
        if (data == null) {
            this.close(TriRpcStatus.INTERNAL.withDescription("Missing response"), null);
            return;
        }
        Future<?> future;
        if (this.compressor != null) {
            // The "identity" encoding is flagged as uncompressed (0); anything else as compressed (1).
            int compressedFlag = "identity".equals(this.compressor.getMessageEncoding()) ? 0 : 1;
            byte[] compressed = this.compressor.compress(data);
            future = this.stream.sendMessage(compressed, compressedFlag);
        } else {
            future = this.stream.sendMessage(data, 0);
        }
        future.addListener(f -> {
            if (!f.isSuccess()) {
                this.cancelDual(TriRpcStatus.CANCELLED.withDescription("Send message failed").withCause(f.cause()));
            }
        });
    }

    /**
     * Notifies the listener that the remote side finished sending. Does nothing when no
     * listener was installed (call start-up failed), instead of throwing a NullPointerException.
     */
    @Override
    public final void onComplete() {
        if (this.listener == null) {
            return;
        }
        this.listener.onComplete();
    }

    /**
     * Parses one inbound message and hands it to the listener. Any failure closes the call
     * with an UNKNOWN status and is logged. The thread context class loader captured on entry
     * is passed to {@code ClassLoadUtil.switchContextLoader} on exit.
     */
    @Override
    public final void onMessage(byte[] message) {
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        try {
            Object instance = this.parseSingleMessage(message);
            this.listener.onMessage(instance);
        }
        catch (Throwable t) {
            TriRpcStatus status = TriRpcStatus.UNKNOWN.withDescription("Server error").withCause(t);
            this.close(status, null);
            LOGGER.error("Process request failed. service=" + this.serviceName + " method=" + this.methodName, t);
        }
        finally {
            ClassLoadUtil.switchContextLoader(tccl);
        }
    }

    /**
     * Converts one raw inbound message into the object passed to the listener.
     *
     * @param var1 raw message bytes
     * @return the decoded message
     */
    protected abstract Object parseSingleMessage(byte[] var1) throws IOException, ClassNotFoundException;

    /**
     * Marks the call closed after a remote cancellation and propagates it to the cancellation
     * context and the listener, each only if it has been set up.
     */
    @Override
    public final void onCancelByRemote(TriRpcStatus status) {
        this.closed = true;
        if (this.cancellationContext != null) {
            this.cancellationContext.cancel(status.cause);
        }
        if (this.listener != null) {
            this.listener.onCancel(status);
        }
    }

    /** @return whether this call has been closed or cancelled */
    public final boolean isClosed() {
        return this.closed;
    }

    /**
     * Builds the {@link RpcInvocation} for this request from the invoker URL, the method
     * descriptor and the request metadata. As a side effect, sets {@link #timeout} when the
     * timeout header is present and parsable.
     *
     * @param methodDescriptor descriptor of the method being invoked
     * @return the populated invocation (arguments are an empty array at this point)
     */
    protected RpcInvocation buildInvocation(MethodDescriptor methodDescriptor) {
        URL url = this.invoker.getUrl();
        RpcInvocation inv = new RpcInvocation(url.getServiceModel(), methodDescriptor.getMethodName(), this.serviceDescriptor.getInterfaceName(), url.getProtocolServiceKey(), methodDescriptor.getParameterClasses(), new Object[0]);
        inv.setTargetServiceUniqueName(url.getServiceKey());
        inv.setReturnTypes(methodDescriptor.getReturnTypes());
        inv.setObjectAttachments(StreamUtils.toAttachments(this.requestMetadata));
        inv.put(REMOTE_ADDRESS_KEY, this.stream.remoteAddress());
        String timeout = (String) this.requestMetadata.get(TripleHeaderEnum.TIMEOUT.getHeader());
        try {
            if (Objects.nonNull(timeout)) {
                this.timeout = this.parseTimeoutToMills(timeout);
            }
        }
        catch (Throwable t) {
            // A malformed timeout is not fatal: log it (with the cause) and continue without one.
            LOGGER.warn(String.format("Failed to parse request timeout set from:%s, service=%s method=%s", timeout, this.serviceDescriptor.getInterfaceName(), this.methodName), t);
        }
        Object consumerAppName = this.requestMetadata.get(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader());
        if (consumerAppName != null) {
            // The enum constant itself (not its header string) is the key, as in the original code.
            inv.put(TripleHeaderEnum.CONSUMER_APP_NAME_KEY, consumerAppName);
        }
        return inv;
    }

    /**
     * Writes the response header (status 200, content type, optional accept-encoding and
     * message encoding). A failed write cancels the call.
     *
     * @throws IllegalStateException if the header was already sent
     */
    private void sendHeader() {
        if (this.closed) {
            return;
        }
        if (this.headerSent) {
            throw new IllegalStateException("Header has already sent");
        }
        this.headerSent = true;
        DefaultHttp2Headers headers = new DefaultHttp2Headers();
        headers.status(HttpResponseStatus.OK.codeAsText());
        headers.set(HttpHeaderNames.CONTENT_TYPE, "application/grpc+proto");
        if (this.acceptEncoding != null) {
            headers.set(HttpHeaderNames.ACCEPT_ENCODING, this.acceptEncoding);
        }
        if (this.compressor != null) {
            headers.set(TripleHeaderEnum.GRPC_ENCODING.getHeader(), this.compressor.getMessageEncoding());
        }
        this.stream.sendHeader(headers).addListener(f -> {
            if (!f.isSuccess()) {
                this.cancelDual(TriRpcStatus.INTERNAL.withCause(f.cause()));
            }
        });
    }

    /**
     * Marks the call closed and notifies both the listener and the cancellation context,
     * skipping whichever has not been set up yet.
     */
    private void cancelDual(TriRpcStatus status) {
        this.closed = true;
        if (this.listener != null) {
            this.listener.onCancel(status);
        }
        if (this.cancellationContext != null) {
            this.cancellationContext.cancel(status.asException());
        }
    }

    /**
     * Cancels the call from the local side; a no-op when already closed.
     *
     * @param throwable cause of the cancellation
     */
    public void cancelByLocal(Throwable throwable) {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.cancellationContext.cancel(throwable);
        this.stream.cancelByLocal(TriRpcStatus.CANCELLED.withCause(throwable));
    }

    /**
     * Selects the compressor used for response messages.
     *
     * @param compression name passed to {@code Compressor.getCompressor}
     * @throws IllegalStateException if the response header was already sent
     */
    public void setCompression(String compression) {
        if (this.headerSent) {
            throw new IllegalStateException("Can not set compression after header sent");
        }
        this.compressor = Compressor.getCompressor(this.frameworkModel, compression);
    }

    /** Sets {@link #autoRequestN} to false. */
    public void disableAutoRequestN() {
        this.autoRequestN = false;
    }

    /** @return the current value of {@link #autoRequestN} */
    public boolean isAutoRequestN() {
        return this.autoRequestN;
    }

    /**
     * Completes the stream with the given status and attachments; a no-op when already closed.
     */
    @Override
    public void close(TriRpcStatus status, Map<String, Object> attachments) {
        this.doClose(status, attachments);
    }

    /** Marks the call closed once and completes the underlying stream. */
    private void doClose(TriRpcStatus status, Map<String, Object> attachments) {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.stream.complete(status, attachments);
    }

    /**
     * Parses a timeout of the form {@code <number><unit>} into milliseconds. The last character
     * is the unit: {@code n} nanoseconds, {@code u} microseconds, {@code m} milliseconds,
     * {@code S} seconds, {@code M} minutes, {@code H} hours.
     *
     * @param timeoutVal raw timeout value
     * @return the timeout in milliseconds, or null when the value is empty, contains
     *         {@code "null"}, or ends with an unknown unit
     * @throws NumberFormatException if the part before the unit is not a valid long
     */
    protected Long parseTimeoutToMills(String timeoutVal) {
        if (StringUtils.isEmpty(timeoutVal) || StringUtils.isContains(timeoutVal, "null")) {
            return null;
        }
        int unitIndex = timeoutVal.length() - 1;
        long value = Long.parseLong(timeoutVal.substring(0, unitIndex));
        char unit = timeoutVal.charAt(unitIndex);
        switch (unit) {
            case 'n': {
                return TimeUnit.NANOSECONDS.toMillis(value);
            }
            case 'u': {
                return TimeUnit.MICROSECONDS.toMillis(value);
            }
            case 'm': {
                return value;
            }
            case 'S': {
                return TimeUnit.SECONDS.toMillis(value);
            }
            case 'M': {
                return TimeUnit.MINUTES.toMillis(value);
            }
            case 'H': {
                return TimeUnit.HOURS.toMillis(value);
            }
            default: {
                return null;
            }
        }
    }

    /**
     * Completes the stream with an error status and logs it; a no-op when already closed.
     *
     * @param status error status to report
     */
    protected void responseErr(TriRpcStatus status) {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.stream.complete(status, null);
        LOGGER.error("Triple request error: service=" + this.serviceName + " method=" + this.methodName, status.asException());
    }

    /**
     * Captures the current cancellation context, creates the listener matching the method's
     * RPC type and requests the initial messages from the stream (2 for unary and
     * server-stream, 1 for client-stream and bi-stream).
     *
     * @return the created listener, or null if creation failed (the failure is logged and
     *         reported through {@link #responseErr(TriRpcStatus)})
     */
    protected ServerCall.Listener startInternalCall(RpcInvocation invocation, MethodDescriptor methodDescriptor, Invoker<?> invoker) {
        this.cancellationContext = RpcContext.getCancellationContext();
        ServerCallToObserverAdapter<Object> responseObserver = new ServerCallToObserverAdapter<Object>(this, this.cancellationContext);
        try {
            AbstractServerCallListener callListener;
            switch (methodDescriptor.getRpcType()) {
                case UNARY: {
                    callListener = new UnaryServerCallListener(invocation, invoker, responseObserver);
                    this.request(2);
                    break;
                }
                case SERVER_STREAM: {
                    callListener = new ServerStreamServerCallListener(invocation, invoker, responseObserver);
                    this.request(2);
                    break;
                }
                case BI_STREAM:
                case CLIENT_STREAM: {
                    callListener = new BiStreamServerCallListener(invocation, invoker, responseObserver);
                    this.request(1);
                    break;
                }
                default: {
                    throw new IllegalStateException("Can not reach here");
                }
            }
            return callListener;
        }
        catch (Throwable t) {
            LOGGER.error("Create triple stream failed", t);
            this.responseErr(TriRpcStatus.INTERNAL.withDescription("Create stream failed").withCause(t));
            return null;
        }
    }
}

