/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.protocol.tri.AbstractClientStream;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
import org.apache.dubbo.rpc.protocol.tri.InboundTransportObserver;
import org.apache.dubbo.rpc.protocol.tri.Stream;

/**
 * Client-side stream for calls in which the caller supplies a {@link StreamObserver}.
 *
 * <p>{@link #doOnStartCall()} dispatches to one of two call shapes depending on
 * {@code getMethodDescriptor().isServerStream()}: a server-streaming call (one request
 * argument followed by the response observer) or a bi-directional call (only the response
 * observer as argument). Inbound transport events are handled by
 * {@link ClientStreamInboundTransportObserverImpl}.
 */
public class ClientStream
extends AbstractClientStream
implements Stream {
    /**
     * Creates a client stream for the given URL.
     *
     * @param url passed unchanged to the {@link AbstractClientStream} constructor
     */
    protected ClientStream(URL url) {
        super(url);
    }

    /**
     * Supplies the observer that receives inbound transport events for this stream.
     *
     * @return a new {@link ClientStreamInboundTransportObserverImpl} bound to this stream
     */
    @Override
    protected InboundTransportObserver createInboundTransportObserver() {
        return new ClientStreamInboundTransportObserverImpl();
    }

    /**
     * Starts the call and hands the resulting {@link AppResponse} to
     * {@link DefaultFuture2#received} for this stream's connection.
     *
     * <p>The response is created with this stream's request id and the version string
     * {@code "1.0.0"}.
     */
    @Override
    protected void doOnStartCall() {
        Response response = new Response(this.getRequestId(), "1.0.0");
        AppResponse result = this.getMethodDescriptor().isServerStream() ? this.callServerStream() : this.callBiStream();
        response.setResult(result);
        DefaultFuture2.received(this.getConnection(), response);
    }

    /**
     * Performs a server-streaming call.
     *
     * <p>Invocation argument {@code [0]} is sent as the single request message and argument
     * {@code [1]} is subscribed as the response observer. The request side is completed
     * immediately after the single message is pushed.
     *
     * @return an empty {@link AppResponse}
     */
    private AppResponse callServerStream() {
        StreamObserver<Object> obServer = (StreamObserver<Object>)this.getRpcInvocation().getArguments()[1];
        obServer = this.attachCancelContext(obServer, this.getCancellationContext());
        this.subscribe(obServer);
        this.inboundMessageObserver().onNext(this.getRpcInvocation().getArguments()[0]);
        this.inboundMessageObserver().onCompleted();
        return new AppResponse();
    }

    /**
     * Performs a bi-directional streaming call.
     *
     * <p>Invocation argument {@code [0]} is subscribed as the response observer.
     *
     * @return an {@link AppResponse} whose value is {@code inboundMessageObserver()}
     *         (presumably the observer the caller writes request messages to - confirm
     *         against the invoker that consumes this result)
     */
    private AppResponse callBiStream() {
        StreamObserver<Object> obServer = (StreamObserver<Object>)this.getRpcInvocation().getArguments()[0];
        obServer = this.attachCancelContext(obServer, this.getCancellationContext());
        this.subscribe(obServer);
        return new AppResponse(this.inboundMessageObserver());
    }

    /**
     * Gives the observer access to the call's cancellation context when it supports one.
     *
     * @param observer the caller-supplied response observer
     * @param context  the cancellation context to attach
     * @param <T>      the observer's message type
     * @return the same {@code observer} instance that was passed in
     */
    private <T> StreamObserver<T> attachCancelContext(StreamObserver<T> observer, CancellationContext context) {
        if (observer instanceof CancelableStreamObserver) {
            ((CancelableStreamObserver)observer).setCancellationContext(context);
        }
        // Always the same reference; only its cancellation context may have been set.
        return observer;
    }

    /**
     * Bridges inbound transport events to the subscriber returned by
     * {@code outboundMessageSubscriber()}.
     */
    private class ClientStreamInboundTransportObserverImpl
    extends InboundTransportObserver {
        /** Set once the first error has been reported; later errors are dropped. */
        private boolean error = false;

        private ClientStreamInboundTransportObserverImpl() {
        }

        /**
         * Deserializes one inbound message and forwards it to the subscriber.
         *
         * <p>The work is submitted through {@code ClientStream.this.execute(...)}. Any
         * {@link Throwable} raised while deserializing or while the subscriber handles the
         * message is routed to {@link #onError(Throwable)}.
         *
         * @param data      serialized response message
         * @param endStream not consulted by this implementation
         */
        @Override
        public void onData(byte[] data, boolean endStream) {
            ClientStream.this.execute(() -> {
                try {
                    Object resp = ClientStream.this.deserializeResponse(data);
                    ClientStream.this.outboundMessageSubscriber().onNext(resp);
                }
                catch (Throwable throwable) {
                    this.onError(throwable);
                }
            });
        }

        /**
         * Reports a transport-level error status to the subscriber.
         *
         * @param status the status to convert with {@code asException()} and report
         */
        @Override
        public void onError(GrpcStatus status) {
            this.onError(status.asException());
        }

        /**
         * Handles the end of the inbound stream.
         *
         * <p>Marks the stream state via {@code setServerEndStreamReceived()}, extracts the
         * final status from the received headers, and then either completes the subscriber
         * (OK status) or reports the status as an error.
         */
        @Override
        public void onComplete() {
            ClientStream.this.execute(() -> {
                ClientStream.this.getState().setServerEndStreamReceived();
                GrpcStatus status = this.extractStatusFromMeta(this.getHeaders());
                if (GrpcStatus.Code.isOk(status.code.code)) {
                    ClientStream.this.outboundMessageSubscriber().onCompleted();
                } else {
                    // Convert the status the same way onError(GrpcStatus) does. Forwarding
                    // only status.cause would drop the status itself.
                    // NOTE(review): a status rebuilt from headers may have no cause, in
                    // which case a null would be forwarded - confirm against
                    // extractStatusFromMeta.
                    this.onError(status.asException());
                }
            });
        }

        /**
         * Reports an error to the subscriber at most once.
         *
         * <p>If the stream state does not report {@code serverSendStreamReceived()}, the
         * stream is cancelled with the same throwable before the subscriber is notified.
         *
         * @param throwable the failure to report
         */
        private void onError(Throwable throwable) {
            if (this.error) {
                return;
            }
            this.error = true;
            if (!ClientStream.this.getState().serverSendStreamReceived()) {
                ClientStream.this.cancel(throwable);
            }
            ClientStream.this.outboundMessageSubscriber().onError(throwable);
        }
    }
}

