package com.taobao.tair.reactive;

import com.taobao.tair.async.TairFuture;
import com.taobao.tair.async.TairFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:lib/tair-client-4.2.3.jar:com/taobao/tair/reactive/TairPublisher.class */
/**
 * Reactive Streams {@link Publisher} adapter around a factory of {@link TairFuture}s.
 *
 * <p>Every call to {@link #subscribe(Subscriber)} creates an independent subscription. The
 * factory is not invoked at subscribe time; it is invoked on the first effective
 * {@code request(n)} of that subscription. Each subscription delivers at most one
 * {@code onNext} followed by {@code onComplete}, or a single {@code onError}.
 *
 * @param <T> type of the value produced by the wrapped future
 */
public class TairPublisher<T> implements Publisher<T> {
    /** Factory called once per subscription to start the underlying operation. */
    Function<TairFuture<T>> function;

    /**
     * Single-shot subscription: the first positive {@code request} triggers the operation,
     * every later {@code request} is a no-op.
     */
    final class SubscriptionImpl implements Subscription {
        final Subscriber<? super T> subscriber;

        // Both flags are read from the future-listener callback as well as from
        // request()/cancel(). NOTE(review): the listener presumably runs on a different
        // thread than the subscriber's calls -- volatile guarantees the writes are visible there.
        volatile boolean cancelled = false;
        /** Set once the operation has been started or a terminal error has been signalled. */
        volatile boolean completed = false;

        SubscriptionImpl(Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        /** Starts the subscription handshake. */
        void init() {
            doSubscribe();
        }

        /** Hands this subscription to the subscriber via {@code onSubscribe}. */
        void doSubscribe() {
            this.subscriber.onSubscribe(this);
        }

        /**
         * Starts the underlying operation on the first call with a positive demand.
         *
         * <p>Calls made after the operation was started, after an error was signalled, or after
         * {@link #cancel()} are ignored. A non-positive demand on a live subscription is
         * reported through {@code onError} with an {@link IllegalArgumentException}
         * (Reactive Streams rule 3.9) and terminates the subscription.
         *
         * @param j number of elements requested; any positive value yields at most one element
         */
        @Override
        public void request(long j) {
            if (this.completed || this.cancelled) {
                return;
            }
            // Mark as started/terminated first so re-entrant request() calls become no-ops.
            this.completed = true;
            if (j <= 0) {
                this.subscriber.onError(new IllegalArgumentException(
                        "request amount must be positive (Reactive Streams rule 3.9) but was " + j));
                return;
            }
            try {
                TairPublisher.this.function.call().addListener(new TairFutureListener<T>() {
                    @Override
                    public void operationComplete(TairFuture<? extends T> tairFuture) {
                        if (SubscriptionImpl.this.cancelled) {
                            return;
                        }
                        T value;
                        try {
                            value = tairFuture.get();
                        } catch (Throwable th) {
                            // A failed future must end the stream with onError instead of
                            // leaking the exception into the code that invoked the listener.
                            SubscriptionImpl.this.subscriber.onError(th);
                            return;
                        }
                        SubscriptionImpl.this.subscriber.onNext(value);
                        // onNext may have cancelled the subscription; re-check before completing.
                        if (!SubscriptionImpl.this.cancelled) {
                            SubscriptionImpl.this.subscriber.onComplete();
                        }
                    }
                });
            } catch (Throwable th) {
                // Failure while creating the future or registering the listener.
                if (this.cancelled) {
                    return;
                }
                this.subscriber.onError(th);
            }
        }

        /**
         * Marks the subscription as cancelled so that no further signals are delivered.
         * The underlying future itself is not cancelled.
         */
        @Override
        public void cancel() {
            this.cancelled = true;
        }
    }

    /**
     * Creates a publisher backed by the given future factory.
     *
     * @param function factory invoked once per subscription to obtain the future to observe
     */
    public TairPublisher(Function<TairFuture<T>> function) {
        this.function = function;
    }

    /**
     * Creates a new subscription for {@code subscriber} and immediately calls its
     * {@code onSubscribe}.
     *
     * @param subscriber receiver of the signals; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            // Reactive Streams rule 1.9: a null subscriber must be rejected with an NPE.
            throw new NullPointerException("subscriber");
        }
        new SubscriptionImpl(subscriber).init();
    }
}
