/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.tri.AbstractServerStream;
import org.apache.dubbo.rpc.protocol.tri.Compressor;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
import org.apache.dubbo.rpc.protocol.tri.InboundTransportObserver;
import org.apache.dubbo.rpc.protocol.tri.Metadata;
import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.Stream;
import org.apache.dubbo.rpc.protocol.tri.TransportObserver;
import org.apache.dubbo.rpc.protocol.tri.TripleConstant;

public class ServerStream
extends AbstractServerStream
implements Stream {
    protected ServerStream(URL url) {
        super(url);
    }

    @Override
    protected StreamObserver<Object> createStreamObserver() {
        return new ServerStreamObserverImpl();
    }

    @Override
    protected InboundTransportObserver createInboundTransportObserver() {
        return new ServerStreamInboundTransportObserver();
    }

    private class ServerStreamInboundTransportObserver
    extends InboundTransportObserver
    implements TransportObserver {
        private ServerStreamInboundTransportObserver() {
        }

        @Override
        public void onMetadata(Metadata metadata, boolean endStream) {
            super.onMetadata(metadata, endStream);
            if (ServerStream.this.getMethodDescriptor().isServerStream()) {
                return;
            }
            ServerStream.this.execute(() -> {
                try {
                    RpcContext.restoreCancellationContext(ServerStream.this.getCancellationContext());
                    RpcInvocation inv = ServerStream.this.buildInvocation(metadata);
                    inv.setArguments(new Object[]{ServerStream.this.inboundMessageObserver()});
                    Result result = ServerStream.this.getInvoker().invoke(inv);
                    if (result.hasException()) {
                        ServerStream.this.transportError(GrpcStatus.getStatus(result.getException()));
                        return;
                    }
                    try {
                        ServerStream.this.subscribe((StreamObserver)result.getValue());
                    }
                    catch (Throwable t) {
                        ServerStream.this.transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL).withDescription("Failed to create server's observer"));
                    }
                }
                finally {
                    RpcContext.removeCancellationContext();
                }
            });
        }

        @Override
        public void onData(byte[] in, boolean endStream) {
            ServerStream.this.execute(() -> {
                try {
                    if (ServerStream.this.getMethodDescriptor().isServerStream()) {
                        this.serverStreamOnData(in);
                        return;
                    }
                    this.biStreamOnData(in);
                }
                catch (Throwable t) {
                    ServerStream.this.transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL).withDescription("Deserialize request failed").withCause(t));
                }
            });
        }

        @Override
        public void onError(GrpcStatus status) {
        }

        private void biStreamOnData(byte[] in) {
            Object[] arguments = ServerStream.this.deserializeRequest(in);
            if (arguments != null) {
                ServerStream.this.outboundMessageSubscriber().onNext(arguments[0]);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void serverStreamOnData(byte[] in) {
            try {
                RpcContext.restoreCancellationContext(ServerStream.this.getCancellationContext());
                RpcInvocation inv = ServerStream.this.buildInvocation(this.getHeaders());
                Object[] arguments = ServerStream.this.deserializeRequest(in);
                if (arguments != null) {
                    inv.setArguments(new Object[]{arguments[0], ServerStream.this.inboundMessageObserver()});
                    Result result = ServerStream.this.getInvoker().invoke(inv);
                    if (result.hasException()) {
                        ServerStream.this.transportError(GrpcStatus.getStatus(result.getException()));
                    }
                }
            }
            finally {
                RpcContext.removeCancellationContext();
            }
        }

        @Override
        public void onComplete() {
            if (ServerStream.this.getMethodDescriptor().isServerStream()) {
                return;
            }
            ServerStream.this.execute(() -> ServerStream.this.outboundMessageSubscriber().onCompleted());
        }
    }

    private class ServerStreamObserverImpl
    implements ServerStreamObserver<Object> {
        private ServerStreamObserverImpl() {
        }

        @Override
        public void onNext(Object data) {
            byte[] bytes;
            if (ServerStream.this.getState().allowSendMeta()) {
                ServerStream.this.outboundTransportObserver().onMetadata(ServerStream.this.createResponseMeta(), false);
            }
            if ((bytes = ServerStream.this.encodeResponse(data)) == null) {
                return;
            }
            if (ServerStream.this.getState().allowSendData()) {
                ServerStream.this.outboundTransportObserver().onData(bytes, false);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (!ServerStream.this.getState().allowSendEndStream()) {
                return;
            }
            GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL).withCause(throwable).withDescription("Biz exception");
            ServerStream.this.transportError(status);
        }

        @Override
        public void onCompleted() {
            if (!ServerStream.this.getState().allowSendEndStream()) {
                return;
            }
            ServerStream.this.outboundTransportObserver().onMetadata(TripleConstant.SUCCESS_RESPONSE_META, true);
        }

        @Override
        public void setCompression(String compression) {
            if (!ServerStream.this.getState().allowSendMeta()) {
                GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL).withDescription("Metadata already has been sent,can not set compression");
                ServerStream.this.transportError(status);
                return;
            }
            Compressor compressor = Compressor.getCompressor(ServerStream.this.getUrl().getOrDefaultFrameworkModel(), compression);
            ServerStream.this.setCompressor(compressor);
        }
    }
}

