/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.grpc.common.impl;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.grpc.common.CodecException;
import io.vertx.grpc.common.GrpcError;
import io.vertx.grpc.common.GrpcMessage;
import io.vertx.grpc.common.GrpcMessageDecoder;
import io.vertx.grpc.common.GrpcReadStream;
import java.util.function.BiConsumer;
import java.util.stream.Collector;

public abstract class GrpcReadStreamBase<S extends GrpcReadStreamBase<S, T>, T>
implements GrpcReadStream<T>,
Handler<Buffer> {
    static final GrpcMessage END_SENTINEL = new GrpcMessage(){

        @Override
        public String encoding() {
            return null;
        }

        @Override
        public Buffer payload() {
            return null;
        }
    };
    protected final ContextInternal context;
    private final String encoding;
    private final ReadStream<Buffer> stream;
    private final InboundBuffer<GrpcMessage> queue;
    private Buffer buffer;
    private Handler<GrpcError> errorHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<GrpcMessage> messageHandler;
    private Handler<Void> endHandler;
    private GrpcMessage last;
    private final GrpcMessageDecoder<T> messageDecoder;
    private final Promise<Void> end;

    protected GrpcReadStreamBase(Context context, ReadStream<Buffer> stream, String encoding, GrpcMessageDecoder<T> messageDecoder) {
        this.context = (ContextInternal)context;
        this.encoding = encoding;
        this.stream = stream;
        this.queue = new InboundBuffer(context);
        this.messageDecoder = messageDecoder;
        this.end = ((ContextInternal)context).promise();
    }

    public void init() {
        this.stream.handler((Handler)this);
        this.stream.endHandler(v -> this.queue.write((Object)END_SENTINEL));
        this.stream.exceptionHandler(err -> {
            if (err instanceof StreamResetException) {
                this.handleReset(((StreamResetException)err).getCode());
            } else {
                this.handleException((Throwable)err);
            }
        });
        this.queue.drainHandler(v -> this.stream.resume());
        this.queue.handler(msg -> {
            if (msg == END_SENTINEL) {
                this.handleEnd();
            } else {
                this.handleMessage((GrpcMessage)msg);
            }
        });
    }

    protected T decodeMessage(GrpcMessage msg) throws CodecException {
        switch (msg.encoding()) {
            case "identity": {
                break;
            }
            case "gzip": {
                msg = GrpcMessage.message("identity", GrpcMessageDecoder.GZIP.decode(msg));
                break;
            }
            default: {
                throw new UnsupportedOperationException();
            }
        }
        return this.messageDecoder.decode(msg);
    }

    public void handle(Buffer chunk) {
        int len;
        if (this.buffer == null) {
            this.buffer = chunk;
        } else {
            this.buffer.appendBuffer(chunk);
        }
        int idx = 0;
        boolean pause = false;
        while (idx + 5 <= this.buffer.length() && idx + 5 + (len = this.buffer.getInt(idx + 1)) <= this.buffer.length()) {
            boolean compressed;
            boolean bl = compressed = this.buffer.getByte(idx) == 1;
            if (compressed && this.encoding == null) {
                throw new UnsupportedOperationException("Handle me");
            }
            Buffer payload = this.buffer.slice(idx + 5, idx + 5 + len);
            GrpcMessage message = GrpcMessage.message(compressed ? this.encoding : "identity", payload);
            pause |= !this.queue.write((Object)message);
            idx += 5 + len;
        }
        if (pause) {
            this.stream.pause();
        }
        this.buffer = idx < this.buffer.length() ? this.buffer.getBuffer(idx, this.buffer.length()) : null;
    }

    public S pause() {
        this.queue.pause();
        return (S)this;
    }

    public S resume() {
        this.queue.resume();
        return (S)this;
    }

    public S fetch(long amount) {
        this.queue.fetch(amount);
        return (S)this;
    }

    public S errorHandler(Handler<GrpcError> handler) {
        this.errorHandler = handler;
        return (S)this;
    }

    public S exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return (S)this;
    }

    public S messageHandler(Handler<GrpcMessage> handler) {
        this.messageHandler = handler;
        return (S)this;
    }

    public S endHandler(Handler<Void> endHandler) {
        this.endHandler = endHandler;
        return (S)this;
    }

    protected void handleReset(long code) {
        GrpcError error;
        Handler<GrpcError> handler = this.errorHandler;
        if (handler != null && (error = GrpcError.mapHttp2ErrorCode(code)) != null) {
            handler.handle((Object)error);
        }
    }

    protected void handleException(Throwable err) {
        this.end.tryFail(err);
        Handler<Throwable> handler = this.exceptionHandler;
        if (handler != null) {
            handler.handle((Object)err);
        }
    }

    protected void handleEnd() {
        this.end.tryComplete();
        Handler<Void> handler = this.endHandler;
        if (handler != null) {
            handler.handle(null);
        }
    }

    protected void handleMessage(GrpcMessage msg) {
        this.last = msg;
        Handler<GrpcMessage> handler = this.messageHandler;
        if (handler != null) {
            handler.handle((Object)msg);
        }
    }

    @Override
    public Future<T> last() {
        return this.end().map(v -> this.decodeMessage(this.last));
    }

    @Override
    public Future<Void> end() {
        return this.end.future();
    }

    @Override
    public <R, C> Future<R> collecting(Collector<T, C, R> collector) {
        PromiseInternal promise = this.context.promise();
        Object cumulation = collector.supplier().get();
        BiConsumer accumulator = collector.accumulator();
        this.handler(elt -> accumulator.accept(cumulation, elt));
        this.endHandler(v -> {
            Object result = collector.finisher().apply(cumulation);
            promise.tryComplete(result);
        });
        this.exceptionHandler(arg_0 -> ((PromiseInternal)promise).tryFail(arg_0));
        return promise.future();
    }
}

