package com.citahub.cita.protocol.rx;

import com.citahub.cita.protocol.CITAj;
import com.citahub.cita.protocol.core.DefaultBlockParameter;
import com.citahub.cita.protocol.core.DefaultBlockParameterName;
import com.citahub.cita.protocol.core.DefaultBlockParameterNumber;
import com.citahub.cita.protocol.core.filters.BlockFilter;
import com.citahub.cita.protocol.core.filters.Callback;
import com.citahub.cita.protocol.core.filters.Filter;
import com.citahub.cita.protocol.core.filters.LogFilter;
import com.citahub.cita.protocol.core.filters.PendingTransactionFilter;
import com.citahub.cita.protocol.core.methods.request.AppFilter;
import com.citahub.cita.protocol.core.methods.response.AppBlock;
import com.citahub.cita.protocol.core.methods.response.AppTransaction;
import com.citahub.cita.protocol.core.methods.response.Log;
import com.citahub.cita.protocol.core.methods.response.Transaction;
import com.citahub.cita.utils.Flowables;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.functions.Cancellable;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.reactivestreams.Publisher;

/**
 * RxJava 2 {@link Flowable} wrappers around the {@link CITAj} client.
 *
 * <p>Two families of streams are offered:
 * <ul>
 *   <li>filter-backed streams ({@code app*Flowable}, {@link #blockFlowable},
 *       {@link #transactionFlowable}, {@link #pendingTransactionFlowable}) that start a
 *       {@link Filter} on the supplied executor and forward every filter event to the
 *       subscriber;</li>
 *   <li>replay / catch-up streams that fetch a range of blocks by number.</li>
 * </ul>
 *
 * <p>Throughout this class the {@code long j} parameter is handed unchanged to
 * {@code Filter.run(executor, j)} (presumably the polling interval; confirm against
 * {@code Filter}), and the {@code boolean z} parameter is handed unchanged to
 * {@code appGetBlockByHash}/{@code appGetBlockByNumber} (presumably "return full transaction
 * objects"; confirm against {@code CITAj}).
 */
public class JsonRpc2_0Rx {
    private final CITAj citaj;
    private final ScheduledExecutorService scheduledExecutorService;
    /** Scheduler view of {@link #scheduledExecutorService}, used for {@code subscribeOn}. */
    private final Scheduler scheduler;

    /**
     * @param cITAj client used for every JSON-RPC call issued by this class
     * @param scheduledExecutorService executor on which filters run and on which the
     *     replay/catch-up streams are subscribed
     */
    public JsonRpc2_0Rx(CITAj cITAj, ScheduledExecutorService scheduledExecutorService) {
        this.citaj = cITAj;
        this.scheduledExecutorService = scheduledExecutorService;
        this.scheduler = Schedulers.from(scheduledExecutorService);
    }

    /**
     * Emits every value reported by a {@link BlockFilter}.
     *
     * @param j forwarded to {@code Filter.run}
     * @return a buffered-backpressure stream of the filter's string events
     */
    public Flowable<String> appBlockHashFlowable(long j) {
        return Flowable.create(
                emitter -> run(new BlockFilter(this.citaj, emitter::onNext), emitter, j),
                BackpressureStrategy.BUFFER);
    }

    /**
     * Emits every value reported by a {@link PendingTransactionFilter}.
     *
     * @param j forwarded to {@code Filter.run}
     * @return a buffered-backpressure stream of the filter's string events
     */
    public Flowable<String> appPendingTransactionHashFlowable(long j) {
        // The decompiled body referenced an undeclared register ("r3"); the callback simply
        // forwards each event to the emitter.
        return Flowable.create(
                emitter -> run(new PendingTransactionFilter(this.citaj, emitter::onNext), emitter, j),
                BackpressureStrategy.BUFFER);
    }

    /**
     * Emits every {@link Log} reported by a {@link LogFilter} built from {@code appFilter}.
     *
     * @param appFilter filter request passed to the {@link LogFilter}
     * @param j forwarded to {@code Filter.run}
     * @return a buffered-backpressure stream of log events
     */
    public Flowable<Log> appLogFlowable(final AppFilter appFilter, final long j) {
        return Flowable.create(
                emitter -> run(new LogFilter(this.citaj, emitter::onNext, appFilter), emitter, j),
                BackpressureStrategy.BUFFER);
    }

    /**
     * Starts {@code filter} on the shared executor and ties its lifetime to the subscription:
     * when the emitter is cancelled, {@code filter.cancel()} is invoked.
     *
     * <p>The decompiler notes this method was originally {@code private}; it is left
     * {@code public} here so the visible signature of this source is unchanged.
     *
     * @param filter filter to start
     * @param flowableEmitter emitter whose cancellation stops the filter
     * @param j forwarded to {@code Filter.run}
     */
    public <T> void run(final Filter<T> filter, FlowableEmitter<? super T> flowableEmitter, long j) {
        filter.run(this.scheduledExecutorService, j);
        flowableEmitter.setCancellable(filter::cancel);
    }

    /**
     * Flattens {@link #blockFlowable(boolean, long) blockFlowable(true, j)} into the
     * transactions contained in each block.
     */
    public Flowable<Transaction> transactionFlowable(long j) {
        return blockFlowable(true, j).flatMapIterable(JsonRpc2_0Rx::toTransactions);
    }

    /**
     * Looks up each hash from {@link #appPendingTransactionHashFlowable(long)} via
     * {@code appGetTransactionByHash} and emits the transaction of every response that
     * carries one; responses whose transaction is {@code null} are dropped.
     */
    public Flowable<Transaction> pendingTransactionFlowable(long j) {
        return appPendingTransactionHashFlowable(j)
                .flatMap(hash -> this.citaj.appGetTransactionByHash(hash).flowable())
                .filter(response -> response.getTransaction() != null)
                .map(response -> response.getTransaction());
    }

    /**
     * Looks up each hash from {@link #appBlockHashFlowable(long)} via
     * {@code appGetBlockByHash(hash, z)} and emits the resulting blocks.
     */
    public Flowable<AppBlock> blockFlowable(boolean z, long j) {
        return appBlockHashFlowable(j)
                .flatMap(hash -> this.citaj.appGetBlockByHash(hash, z).flowable());
    }

    /** Same as {@link #replayBlocksFlowable(DefaultBlockParameter, DefaultBlockParameter, boolean, boolean)} with {@code z2 = true}. */
    public Flowable<AppBlock> replayBlocksFlowable(DefaultBlockParameter defaultBlockParameter, DefaultBlockParameter defaultBlockParameter2, boolean z) {
        return replayBlocksFlowable(defaultBlockParameter, defaultBlockParameter2, z, true);
    }

    /**
     * Emits the blocks between the two block parameters, subscribed on this instance's
     * scheduler.
     *
     * @param defaultBlockParameter first bound of the range
     * @param defaultBlockParameter2 second bound of the range
     * @param z forwarded to {@code appGetBlockByNumber}
     * @param z2 when {@code true} uses {@code Flowables.range(a, b)}, otherwise
     *     {@code Flowables.range(a, b, false)} (presumably ascending vs. descending order;
     *     confirm against {@code Flowables})
     */
    public Flowable<AppBlock> replayBlocksFlowable(DefaultBlockParameter defaultBlockParameter, DefaultBlockParameter defaultBlockParameter2, boolean z, boolean z2) {
        return replayBlocksFlowableSync(defaultBlockParameter, defaultBlockParameter2, z, z2).subscribeOn(this.scheduler);
    }

    private Flowable<AppBlock> replayBlocksFlowableSync(DefaultBlockParameter defaultBlockParameter, DefaultBlockParameter defaultBlockParameter2, boolean z) {
        return replayBlocksFlowableSync(defaultBlockParameter, defaultBlockParameter2, z, true);
    }

    /**
     * Resolves both bounds to block numbers (this may perform a blocking RPC, see
     * {@link #getBlockNumber}) and builds the stream that fetches each block in the range.
     *
     * <p>If resolving a bound fails with an {@link IOException}, the returned stream signals
     * that error instead of continuing with unresolved ({@code null}) bounds.
     */
    private Flowable<AppBlock> replayBlocksFlowableSync(DefaultBlockParameter defaultBlockParameter, DefaultBlockParameter defaultBlockParameter2, boolean z, boolean z2) {
        BigInteger startBlockNumber;
        BigInteger endBlockNumber;
        try {
            startBlockNumber = getBlockNumber(defaultBlockParameter);
            endBlockNumber = getBlockNumber(defaultBlockParameter2);
        } catch (IOException e) {
            // Previously the error Flowable was created and discarded, so the failure was
            // swallowed and the range was built from null bounds.
            return Flowable.error(e);
        }
        if (z2) {
            return Flowables.range(startBlockNumber, endBlockNumber).flatMap(number -> {
                return this.citaj.appGetBlockByNumber(new DefaultBlockParameterNumber(number), z).flowable();
            });
        }
        return Flowables.range(startBlockNumber, endBlockNumber, false).flatMap(number -> {
            return this.citaj.appGetBlockByNumber(new DefaultBlockParameterNumber(number), z).flowable();
        });
    }

    /** Flattens {@link #replayBlocksFlowable(DefaultBlockParameter, DefaultBlockParameter, boolean) replayBlocksFlowable(..., true)} into transactions. */
    public Flowable<Transaction> replayTransactionsFlowable(DefaultBlockParameter defaultBlockParameter, DefaultBlockParameter defaultBlockParameter2) {
        return replayBlocksFlowable(defaultBlockParameter, defaultBlockParameter2, true).flatMapIterable(JsonRpc2_0Rx::toTransactions);
    }

    /**
     * Replays blocks from {@code defaultBlockParameter} up to the latest known block, then
     * continues with {@code flowable}. Subscribed on this instance's scheduler.
     *
     * @param defaultBlockParameter block to start from
     * @param z forwarded to {@code appGetBlockByNumber}
     * @param flowable stream to continue with once caught up
     */
    public Flowable<AppBlock> catchUpToLatestBlockFlowable(DefaultBlockParameter defaultBlockParameter, boolean z, Flowable<AppBlock> flowable) {
        return catchUpToLatestBlockFlowableSync(defaultBlockParameter, z, flowable).subscribeOn(this.scheduler);
    }

    /** Same as the three-argument overload, completing (empty continuation) once caught up. */
    public Flowable<AppBlock> catchUpToLatestBlockFlowable(DefaultBlockParameter defaultBlockParameter, boolean z) {
        return catchUpToLatestBlockFlowable(defaultBlockParameter, z, Flowable.empty());
    }

    /**
     * One catch-up step: if the start block is at or past the latest block, hand over to
     * {@code flowable}; otherwise replay {@code [start, latest]} and then, lazily, repeat
     * from {@code latest + 1} to pick up blocks produced while replaying.
     *
     * <p>An {@link IOException} while resolving block numbers is reported through the
     * returned stream.
     */
    private Flowable<AppBlock> catchUpToLatestBlockFlowableSync(DefaultBlockParameter defaultBlockParameter, boolean z, Flowable<AppBlock> flowable) {
        try {
            BigInteger startBlockNumber = getBlockNumber(defaultBlockParameter);
            BigInteger latestBlockNumber = getLatestBlockNumber();
            if (startBlockNumber.compareTo(latestBlockNumber) >= 0) {
                return flowable;
            }
            Flowable<AppBlock> replayed = replayBlocksFlowableSync(
                    new DefaultBlockParameterNumber(startBlockNumber),
                    new DefaultBlockParameterNumber(latestBlockNumber),
                    z);
            // defer: the next step must look up the latest block only after the replay is done.
            Flowable<AppBlock> next = Flowable.defer(() -> {
                return catchUpToLatestBlockFlowableSync(
                        new DefaultBlockParameterNumber(latestBlockNumber.add(BigInteger.ONE)), z, flowable);
            });
            return Flowable.concat(replayed, next);
        } catch (IOException e) {
            return Flowable.error(e);
        }
    }

    /** Flattens {@code catchUpToLatestBlockFlowable(defaultBlockParameter, true, empty)} into transactions. */
    public Flowable<Transaction> catchUpToLatestTransactionFlowable(DefaultBlockParameter defaultBlockParameter) {
        return catchUpToLatestBlockFlowable(defaultBlockParameter, true, Flowable.empty()).flatMapIterable(JsonRpc2_0Rx::toTransactions);
    }

    /** Catches up from {@code defaultBlockParameter}, then continues with {@link #blockFlowable(boolean, long)}. */
    public Flowable<AppBlock> catchUpToLatestAndSubscribeToNewBlocksFlowable(DefaultBlockParameter defaultBlockParameter, boolean z, long j) {
        return catchUpToLatestBlockFlowable(defaultBlockParameter, z, blockFlowable(z, j));
    }

    /** Transaction-level view of {@link #catchUpToLatestAndSubscribeToNewBlocksFlowable} with {@code z = true}. */
    public Flowable<Transaction> catchUpToLatestAndSubscribeToNewTransactionsFlowable(DefaultBlockParameter defaultBlockParameter, long j) {
        return catchUpToLatestAndSubscribeToNewBlocksFlowable(defaultBlockParameter, true, j).flatMapIterable(JsonRpc2_0Rx::toTransactions);
    }

    /** Resolves {@link DefaultBlockParameterName#PENDING} to a block number. */
    private BigInteger getLatestBlockNumber() throws IOException {
        return getBlockNumber(DefaultBlockParameterName.PENDING);
    }

    /**
     * Returns the number carried by a {@link DefaultBlockParameterNumber} directly; for any
     * other parameter, fetches the block synchronously ({@code send()}) and reads the number
     * from its header.
     *
     * @throws IOException if the synchronous request fails
     */
    private BigInteger getBlockNumber(DefaultBlockParameter defaultBlockParameter) throws IOException {
        if (defaultBlockParameter instanceof DefaultBlockParameterNumber) {
            return ((DefaultBlockParameterNumber) defaultBlockParameter).getBlockNumber();
        }
        return this.citaj.appGetBlockByNumber(defaultBlockParameter, false).send().getBlock().getHeader().getNumberDec();
    }

    /** Unwraps every transaction object in the block's body into a new list, in order. */
    private static List<Transaction> toTransactions(AppBlock appBlock) {
        List<AppBlock.TransactionObject> transactionObjects = appBlock.getBlock().getBody().getTransactions();
        List<Transaction> transactions = new ArrayList<>(transactionObjects.size());
        for (AppBlock.TransactionObject transactionObject : transactionObjects) {
            // Explicit cast kept: the declared return type of get() is not visible here.
            transactions.add((Transaction) transactionObject.get());
        }
        return transactions;
    }
}
