/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri;

import com.google.protobuf.ByteString;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AsciiString;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.serialize.MultipleSerialization;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceModel;
import org.apache.dubbo.rpc.protocol.tri.AbstractStream;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
import org.apache.dubbo.rpc.protocol.tri.ClientOutboundTransportObserver;
import org.apache.dubbo.rpc.protocol.tri.ClientStream;
import org.apache.dubbo.rpc.protocol.tri.ClientStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.Compressor;
import org.apache.dubbo.rpc.protocol.tri.DefaultMetadata;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
import org.apache.dubbo.rpc.protocol.tri.Metadata;
import org.apache.dubbo.rpc.protocol.tri.Stream;
import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.UnaryClientStream;
import org.apache.dubbo.rpc.protocol.tri.WriteQueue;
import org.apache.dubbo.triple.TripleWrapper;

/**
 * Base class for the consumer (client) side of a Triple stream.
 *
 * <p>An instance is configured through the fluent setters ({@link #request(Request)},
 * {@link #service(ConsumerModel)}, {@link #connection(Connection)}) and is normally
 * created by {@link #newClientStream(Request, Connection)}. It knows how to build the
 * request metadata, how to encode request payloads and how to decode response payloads.
 */
public abstract class AbstractClientStream extends AbstractStream implements Stream {
    /** URL scheme header value, resolved once from the URL in the constructor. */
    private final AsciiString scheme;
    /** Consumer-side service model; unset until {@link #service(ConsumerModel)} is called. */
    private ConsumerModel consumerModel;
    /** Connection associated with this stream; unset until {@link #connection(Connection)} is called. */
    private Connection connection;
    /** Invocation carried by the request; unset until {@link #request(Request)} is called. */
    private RpcInvocation rpcInvocation;
    /** Id of the request, used to look up the matching {@link DefaultFuture2}. */
    private long requestId;

    /**
     * Creates a stream for the given URL and registers a cancellation listener that
     * reports the cancellation cause to the outbound transport observer as a
     * {@code CANCELLED} status.
     *
     * @param url the URL this stream is created for
     */
    protected AbstractClientStream(URL url) {
        super(url);
        this.scheme = this.getSchemeFromUrl(url);
        this.getCancellationContext().addListener(context -> {
            Throwable throwable = this.getCancellationContext().getCancellationCause();
            if (LOGGER.isWarnEnabled()) {
                // describeCall() is null-safe so that a missing consumer model can never
                // prevent the onError below from being delivered.
                LOGGER.warn("Triple request to " + this.describeCall() + " was canceled by local exception ", throwable);
            }
            this.outboundTransportObserver().onError(GrpcStatus.fromCode(GrpcStatus.Code.CANCELLED).withCause(throwable));
        });
    }

    /**
     * Creates a new unary client stream.
     *
     * @param url the URL the stream is created for
     * @return a new {@link UnaryClientStream}
     */
    public static UnaryClientStream unary(URL url) {
        return new UnaryClientStream(url);
    }

    /**
     * Creates a new non-unary client stream.
     *
     * @param url the URL the stream is created for
     * @return a new {@link ClientStream}
     */
    public static ClientStream stream(URL url) {
        return new ClientStream(url);
    }

    /**
     * Builds and fully configures a client stream for the given request.
     *
     * <p>The consumer model is taken from the invocation when present, otherwise from the
     * invoker URL. A unary or streaming implementation is chosen from the resolved method
     * descriptor.
     *
     * <p>NOTE(review): the thread context class loader is switched to the consumer model's
     * class loader here and is not restored by this method — confirm callers rely on that.
     *
     * @param req        request whose data is the {@link RpcInvocation} to send
     * @param connection connection to associate with the stream
     * @return the configured stream
     * @throws IllegalStateException if no method descriptor matches the invocation
     */
    public static AbstractClientStream newClientStream(Request req, Connection connection) {
        RpcInvocation inv = (RpcInvocation) req.getData();
        URL url = inv.getInvoker().getUrl();
        ConsumerModel consumerModel = inv.getServiceModel() != null
            ? (ConsumerModel) inv.getServiceModel()
            : (ConsumerModel) url.getServiceModel();
        MethodDescriptor methodDescriptor = AbstractClientStream.getTriMethodDescriptor(consumerModel, inv);
        ClassLoadUtil.switchContextLoader(consumerModel.getClassLoader());
        AbstractClientStream stream = methodDescriptor.isUnary()
            ? AbstractClientStream.unary(url)
            : AbstractClientStream.stream(url);
        Compressor compressor = AbstractClientStream.getCompressor(url, consumerModel);
        stream.request(req)
            .service(consumerModel)
            .connection(connection)
            .serialize((String) inv.getObjectAttachment("serialization"))
            .method(methodDescriptor)
            .setCompressor(compressor);
        return stream;
    }

    /**
     * Resolves the compressor: the URL parameter {@code dubbo.rpc.tri.compressor} wins,
     * otherwise the cached dynamic property of the same name is used, defaulting to
     * {@code identity}.
     */
    private static Compressor getCompressor(URL url, ServiceModel model) {
        String compressorStr = url.getParameter("dubbo.rpc.tri.compressor");
        if (compressorStr == null) {
            compressorStr = ConfigurationUtils.getCachedDynamicProperty(model.getModuleModel(), "dubbo.rpc.tri.compressor", "identity");
        }
        return Compressor.getCompressor(url.getOrDefaultFrameworkModel(), compressorStr);
    }

    /**
     * Finds the method descriptor whose real parameter classes equal the invocation's
     * parameter types.
     *
     * @throws IllegalStateException if the service has no method of that name, or none of
     *                               its overloads matches the parameter types
     */
    private static MethodDescriptor getTriMethodDescriptor(ConsumerModel consumerModel, RpcInvocation inv) {
        List<MethodDescriptor> methodDescriptors = consumerModel.getServiceModel().getMethods(inv.getMethodName());
        if (CollectionUtils.isEmpty(methodDescriptors)) {
            throw new IllegalStateException("methodDescriptors must not be null method=" + inv.getMethodName());
        }
        for (MethodDescriptor methodDescriptor : methodDescriptors) {
            if (Arrays.equals(inv.getParameterTypes(), methodDescriptor.getRealParameterClasses())) {
                return methodDescriptor;
            }
        }
        // Descriptors exist for this name, but no overload has these parameter types.
        throw new IllegalStateException("No method descriptor matches parameter types "
            + Arrays.toString(inv.getParameterTypes()) + " method=" + inv.getMethodName());
    }

    /**
     * Submits the start of the call to {@link #execute(Runnable)}: subscribes a
     * {@link ClientOutboundTransportObserver} built from the given queue and promise, then
     * invokes {@link #doOnStartCall()}. Any throwable from {@code doOnStartCall()} cancels
     * this stream and the pending future of the request.
     *
     * @param queue   write queue handed to the outbound transport observer
     * @param promise channel promise handed to the outbound transport observer
     */
    protected void startCall(WriteQueue queue, ChannelPromise promise) {
        this.execute(() -> {
            ClientOutboundTransportObserver clientTransportObserver = new ClientOutboundTransportObserver(queue, promise);
            this.subscribe(clientTransportObserver);
            try {
                this.doOnStartCall();
            } catch (Throwable throwable) {
                this.cancel(throwable);
                this.cancelPendingFuture();
            }
        });
    }

    /** Hook invoked by {@link #startCall(WriteQueue, ChannelPromise)} after the transport observer is subscribed. */
    protected abstract void doOnStartCall();

    @Override
    protected StreamObserver<Object> createStreamObserver() {
        return new ClientStreamObserverImpl(this.getCancellationContext());
    }

    /** Cancels the pending future of this request, if one is still registered. */
    @Override
    protected void cancelByRemoteReset() {
        this.cancelPendingFuture();
    }

    /** Cancels this stream's cancellation context with the given cause. */
    @Override
    protected void cancelByLocal(Throwable throwable) {
        this.getCancellationContext().cancel(throwable);
    }

    /**
     * Delegates to the parent executor and converts submission failures into an
     * {@code onError} on the outbound message subscriber: {@code RESOURCE_EXHAUSTED} for a
     * rejected task, {@code INTERNAL} for anything else.
     *
     * @param runnable task to run
     */
    @Override
    public void execute(Runnable runnable) {
        try {
            super.execute(runnable);
        } catch (RejectedExecutionException e) {
            LOGGER.error("Consumer's thread pool is full", e);
            this.outboundMessageSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
                .withDescription("Consumer's thread pool is full")
                .asException());
        } catch (Throwable t) {
            LOGGER.error("Consumer submit request to thread pool error ", t);
            this.outboundMessageSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                .withCause(t)
                .withDescription("Consumer's error")
                .asException());
        }
    }

    /**
     * Sets the consumer model.
     *
     * @param model consumer model of the invoked service
     * @return this stream, for chaining
     */
    public AbstractClientStream service(ConsumerModel model) {
        this.consumerModel = model;
        return this;
    }

    /**
     * Records the request id and the invocation carried by the request.
     *
     * @param request request whose data is an {@link RpcInvocation}
     * @return this stream, for chaining
     */
    public AbstractClientStream request(Request request) {
        this.requestId = request.getId();
        this.rpcInvocation = (RpcInvocation) request.getData();
        return this;
    }

    /** @return the invocation recorded by {@link #request(Request)}, or {@code null} if not set */
    protected RpcInvocation getRpcInvocation() {
        return this.rpcInvocation;
    }

    /** @return the scheme resolved from the URL at construction time */
    public AsciiString getScheme() {
        return this.scheme;
    }

    /** @return the request id recorded by {@link #request(Request)} */
    public long getRequestId() {
        return this.requestId;
    }

    /**
     * Resolves the scheme from the {@code ssl-enabled} URL parameter: HTTPS only when the
     * parameter is {@code true}; HTTP when it is absent, {@code false}, or cannot be read.
     */
    private AsciiString getSchemeFromUrl(URL url) {
        try {
            Boolean ssl = url.getParameter("ssl-enabled", Boolean.class);
            return Boolean.TRUE.equals(ssl) ? TripleConstant.HTTPS_SCHEME : TripleConstant.HTTP_SCHEME;
        } catch (Exception ignored) {
            // Best effort: an unreadable parameter falls back to plain HTTP.
            return TripleConstant.HTTP_SCHEME;
        }
    }

    /** @return the consumer model set by {@link #service(ConsumerModel)}, or {@code null} if not set */
    public ConsumerModel getConsumerModel() {
        return this.consumerModel;
    }

    /**
     * Sets the connection associated with this stream.
     *
     * @param connection the connection
     * @return this stream, for chaining
     */
    public AbstractClientStream connection(Connection connection) {
        this.connection = connection;
        return this;
    }

    /** @return the connection set by {@link #connection(Connection)}, or {@code null} if not set */
    public Connection getConnection() {
        return this.connection;
    }

    /**
     * Encodes a request value: wraps it when the method descriptor requires wrapping,
     * packs it to bytes and compresses the result.
     *
     * @param value the invocation (wrapped or unary calls) or the raw stream element
     * @return the packed and compressed request bytes
     */
    protected byte[] encodeRequest(Object value) {
        Object obj = this.getMethodDescriptor().isNeedWrap()
            ? this.getRequestWrapper(value)
            : this.getRequestValue(value);
        byte[] out = this.pack(obj);
        return super.compress(out);
    }

    /**
     * Builds the request wrapper. For stream methods {@code value} is a single element
     * typed by the first declared parameter class; otherwise {@code value} is the
     * {@link RpcInvocation} whose arguments are all wrapped.
     */
    private TripleWrapper.TripleRequestWrapper getRequestWrapper(Object value) {
        if (this.getMethodDescriptor().isStream()) {
            String type = this.getMethodDescriptor().getParameterClasses()[0].getName();
            return this.wrapReq(this.getUrl(), this.getSerializeType(), value, type, this.getMultipleSerialization());
        }
        RpcInvocation invocation = (RpcInvocation) value;
        return this.wrapReq(this.getUrl(), invocation, this.getMultipleSerialization());
    }

    /**
     * Wraps every argument of the invocation, using the serialization named by the
     * invocation's {@code serialization} attachment.
     *
     * @throws RuntimeException if serializing an argument fails with an {@link IOException}
     */
    private TripleWrapper.TripleRequestWrapper wrapReq(URL url, RpcInvocation invocation, MultipleSerialization serialization) {
        try {
            String serializationName = (String) invocation.getObjectAttachment("serialization");
            TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
                .setSerializeType(this.convertHessianToWrapper(serializationName));
            Object[] arguments = invocation.getArguments();
            Class<?>[] parameterTypes = invocation.getParameterTypes();
            for (int i = 0; i < arguments.length; ++i) {
                String argType = parameterTypes[i].getName();
                builder.addArgTypes(argType);
                try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
                    serialization.serialize(url, serializationName, argType, arguments[i], bos);
                    builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
                }
            }
            return builder.build();
        } catch (IOException e) {
            throw new RuntimeException("Failed to pack wrapper req", e);
        }
    }

    /**
     * Wraps a single request object.
     *
     * @param url                   URL passed to the serialization
     * @param serializeType         name of the serialization to use
     * @param req                   the object to serialize
     * @param type                  class name recorded as the argument type
     * @param multipleSerialization serialization used to write {@code req}
     * @return the wrapper holding the serialized object
     * @throws RuntimeException if serialization fails with an {@link IOException}
     */
    public TripleWrapper.TripleRequestWrapper wrapReq(URL url, String serializeType, Object req, String type, MultipleSerialization multipleSerialization) {
        TripleWrapper.TripleRequestWrapper.Builder builder = TripleWrapper.TripleRequestWrapper.newBuilder()
            .addArgTypes(type)
            .setSerializeType(this.convertHessianToWrapper(serializeType));
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            multipleSerialization.serialize(url, serializeType, type, req, bos);
            builder.addArgs(ByteString.copyFrom(bos.toByteArray()));
            return builder.build();
        } catch (IOException e) {
            throw new RuntimeException("Failed to pack wrapper req", e);
        }
    }

    /**
     * Returns the value to pack when no wrapper is needed: the first argument of the
     * invocation for unary methods, otherwise {@code value} itself.
     */
    private Object getRequestValue(Object value) {
        if (this.getMethodDescriptor().isUnary()) {
            RpcInvocation invocation = (RpcInvocation) value;
            return invocation.getArguments()[0];
        }
        return value;
    }

    /**
     * Decodes a response payload.
     *
     * <p>While decoding, the thread context class loader is switched to the consumer
     * model's class loader (when a consumer model is set) and always restored afterwards.
     * Wrapped responses are rejected when their serialization type differs from the one
     * this stream expects.
     *
     * @param data raw response bytes
     * @return the deserialized response object
     * @throws UnsupportedOperationException if the server used a different serialization type
     */
    protected Object deserializeResponse(byte[] data) {
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        try {
            if (this.getConsumerModel() != null) {
                ClassLoadUtil.switchContextLoader(this.getConsumerModel().getClassLoader());
            }
            if (!this.getMethodDescriptor().isNeedWrap()) {
                return this.unpack(data, this.getMethodDescriptor().getReturnClass());
            }
            TripleWrapper.TripleResponseWrapper wrapper = this.unpack(data, TripleWrapper.TripleResponseWrapper.class);
            if (!this.getSerializeType().equals(this.convertHessianFromWrapper(wrapper.getSerializeType()))) {
                throw new UnsupportedOperationException("Received inconsistent serialization type from server, reject to deserialize! Expected:" + this.getSerializeType() + " Actual:" + this.convertHessianFromWrapper(wrapper.getSerializeType()));
            }
            return this.unwrapResp(this.getUrl(), wrapper, this.getMultipleSerialization());
        } finally {
            ClassLoadUtil.switchContextLoader(tccl);
        }
    }

    /**
     * Deserializes the payload of a response wrapper using the serialization type and
     * class name recorded in the wrapper.
     *
     * @param url           URL passed to the serialization
     * @param wrap          response wrapper received from the server
     * @param serialization serialization used to read the payload
     * @return the deserialized object
     * @throws RuntimeException if deserialization fails for any reason
     */
    public Object unwrapResp(URL url, TripleWrapper.TripleResponseWrapper wrap, MultipleSerialization serialization) {
        String serializeType = this.convertHessianFromWrapper(wrap.getSerializeType());
        try (ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray())) {
            return serialization.deserialize(url, serializeType, wrap.getType(), bais);
        } catch (Exception e) {
            throw new RuntimeException("Failed to unwrap resp", e);
        }
    }

    /**
     * Builds the request metadata (headers) for an invocation: pseudo headers, content
     * type, timeout, {@code te: trailers}, service version/group, consumer application
     * name, compression headers, and finally the remaining attachments.
     *
     * <p>The {@code application} and {@code remote.application} attachments are removed
     * from the invocation while they are turned into the consumer application header.
     *
     * @param inv the invocation being sent
     * @return the populated metadata
     */
    protected Metadata createRequestMeta(RpcInvocation inv) {
        DefaultMetadata metadata = new DefaultMetadata();
        metadata.put((CharSequence) Http2Headers.PseudoHeaderName.SCHEME.value(), (CharSequence) this.getScheme())
            .put((CharSequence) Http2Headers.PseudoHeaderName.PATH.value(), this.getMethodPath(inv))
            .put((CharSequence) Http2Headers.PseudoHeaderName.AUTHORITY.value(), this.getUrl().getAddress())
            .put((CharSequence) Http2Headers.PseudoHeaderName.METHOD.value(), (CharSequence) HttpMethod.POST.asciiName());

        metadata.put(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), "application/grpc+proto");
        // Omit the timeout header when no timeout is set rather than sending "nullm".
        // NOTE(review): the "m" suffix presumably marks milliseconds — confirm the unit of "timeout".
        Object timeout = inv.get("timeout");
        metadata.putIfNotNull(TripleHeaderEnum.TIMEOUT.getHeader(), timeout == null ? null : timeout + "m");
        metadata.put((CharSequence) HttpHeaderNames.TE, (CharSequence) HttpHeaderValues.TRAILERS);

        metadata.putIfNotNull(TripleHeaderEnum.SERVICE_VERSION.getHeader(), this.getUrl().getVersion())
            .putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(), (String) inv.getObjectAttachments().remove("application"))
            .putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(), (String) inv.getObjectAttachments().remove("remote.application"))
            .putIfNotNull(TripleHeaderEnum.SERVICE_GROUP.getHeader(), this.getUrl().getGroup())
            .putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), this.getCompressor().getMessageEncoding())
            .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(this.getUrl().getOrDefaultFrameworkModel()));

        Map<String, Object> attachments = inv.getObjectAttachments();
        if (attachments != null) {
            this.convertAttachment(metadata, attachments);
        }
        return metadata;
    }

    /** Builds the request path as {@code /<path attachment>/<method name>}. */
    private String getMethodPath(RpcInvocation inv) {
        return "/" + inv.getObjectAttachment("path") + "/" + inv.getMethodName();
    }

    /**
     * Cancels the future registered for this request id, if any. The lookup result is
     * checked for {@code null} so that a missing future does not raise a
     * {@link NullPointerException} on the cancellation path.
     */
    private void cancelPendingFuture() {
        DefaultFuture2 future = DefaultFuture2.getFuture(this.getRequestId());
        if (future != null) {
            future.cancel();
        }
    }

    /**
     * Returns {@code <serviceName>#<methodName>} for log messages. The service name is
     * rendered as {@code null} when no consumer model has been set, instead of throwing.
     */
    private String describeCall() {
        ConsumerModel model = this.getConsumerModel();
        String serviceName = model == null ? null : model.getServiceName();
        return serviceName + "#" + this.getMethodName();
    }

    /**
     * Observer handed to user code for sending requests. Each callback consults the
     * enclosing stream's state before writing to the outbound transport observer.
     */
    protected class ClientStreamObserverImpl
        extends CancelableStreamObserver<Object>
        implements ClientStreamObserver<Object> {

        public ClientStreamObserverImpl(CancellationContext cancellationContext) {
            super(cancellationContext);
        }

        /**
         * Sends the request metadata first when the state still allows it, then encodes
         * and sends {@code data} when the state allows sending data.
         */
        @Override
        public void onNext(Object data) {
            if (AbstractClientStream.this.getState().allowSendMeta()) {
                Metadata metadata = AbstractClientStream.this.createRequestMeta(AbstractClientStream.this.getRpcInvocation());
                AbstractClientStream.this.outboundTransportObserver().onMetadata(metadata, false);
            }
            if (AbstractClientStream.this.getState().allowSendData()) {
                byte[] bytes = AbstractClientStream.this.encodeRequest(data);
                AbstractClientStream.this.outboundTransportObserver().onData(bytes, false);
            }
        }

        /**
         * Transports the error as a gRPC status when the stream may still be ended;
         * otherwise only logs it.
         */
        @Override
        public void onError(Throwable throwable) {
            if (AbstractClientStream.this.getState().allowSendEndStream()) {
                GrpcStatus status = GrpcStatus.getStatus(throwable);
                AbstractClientStream.this.transportError(status, null, AbstractClientStream.this.getState().allowSendMeta());
            } else if (Stream.LOGGER.isErrorEnabled()) {
                Stream.LOGGER.error("Triple request to " + AbstractClientStream.this.describeCall() + " was failed by exception ", throwable);
            }
        }

        /** Completes the outbound side when the state allows ending the stream. */
        @Override
        public void onCompleted() {
            if (AbstractClientStream.this.getState().allowSendEndStream()) {
                AbstractClientStream.this.outboundTransportObserver().onComplete();
            }
        }

        /**
         * Selects the compressor by name. Only possible before metadata has been sent;
         * afterwards the observer is cancelled with an {@link IllegalStateException}.
         *
         * @param compression name of the compressor to use
         */
        @Override
        public void setCompression(String compression) {
            if (!AbstractClientStream.this.getState().allowSendMeta()) {
                this.cancel(new IllegalStateException("Metadata already has been sent,can not set compression"));
                return;
            }
            Compressor compressor = Compressor.getCompressor(AbstractClientStream.this.getUrl().getOrDefaultFrameworkModel(), compression);
            AbstractClientStream.this.setCompressor(compressor);
        }
    }
}

