/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.ws;

import com.taosdata.jdbc.BlockData;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.enums.FetchState;
import com.taosdata.jdbc.rs.RestfulResultSet;
import com.taosdata.jdbc.utils.FetchDataUtil;
import com.taosdata.jdbc.utils.Utils;
import com.taosdata.jdbc.ws.Transport;
import com.taosdata.jdbc.ws.entity.Code;
import com.taosdata.jdbc.ws.entity.FetchBlockNewResp;
import com.taosdata.jdbc.ws.entity.QueryResp;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FetchBlockData {
    private static final Logger log = LoggerFactory.getLogger(FetchBlockData.class);
    private final Transport transport;
    private final long queryId;
    private final long reqId;
    private static final int CACHE_SIZE = 5;
    private BlockingQueue<BlockData> blockingQueue = new LinkedBlockingQueue<BlockData>(5);
    private ForkJoinPool dataHandleExecutor = Utils.getForkJoinPool();
    private List<RestfulResultSet.Field> fields;
    private volatile FetchState fetchState = FetchState.STOPPED;
    private final long timeoutMs;
    private final int precision;

    public FetchBlockData(Transport transport, QueryResp response, List<RestfulResultSet.Field> fields, int timeoutMs, int precision) {
        this.transport = transport;
        this.queryId = response.getId();
        this.reqId = response.getReqId();
        this.fields = fields;
        this.timeoutMs = timeoutMs;
        this.precision = precision;
    }

    public void handleReceiveBlockData(FetchBlockNewResp resp) throws InterruptedException {
        BlockData blockData = BlockData.getEmptyBlockData(this.fields, this.precision);
        resp.init();
        if (Code.SUCCESS.getCode() != resp.getCode()) {
            blockData.setReturnCode(resp.getCode());
            blockData.setErrorMessage(resp.getMessage());
            blockData.doneWithNoData();
            this.blockingQueue.put(blockData);
            this.fetchState = FetchState.FINISHED_ERROR;
            return;
        }
        if (resp.isCompleted()) {
            blockData.setCompleted(true);
            blockData.doneWithNoData();
            this.blockingQueue.put(blockData);
            FetchDataUtil.getFetchMap().remove(this.reqId);
            return;
        }
        blockData.setBuffer(resp.getBuffer());
        this.blockingQueue.put(blockData);
        this.dataHandleExecutor.submit(blockData::handleData);
        if (this.blockingQueue.remainingCapacity() > 0) {
            try {
                this.transport.sendFetchBlockAsync(this.reqId, this.queryId);
                return;
            }
            catch (SQLException e) {
                log.error("Error when sending fetch block request:", (Throwable)e);
                blockData.setReturnCode(9040);
                blockData.setErrorMessage("ErrorCode: " + e.getErrorCode() + ", Error when sending fetch block request: " + e.getMessage());
                this.blockingQueue.put(blockData);
            }
        }
        this.fetchState = FetchState.STOPPED;
    }

    public BlockData getBlockData() throws SQLException {
        BlockData blockData;
        if (this.fetchState == FetchState.STOPPED || this.fetchState == FetchState.FINISHED_ERROR && this.blockingQueue.isEmpty()) {
            this.transport.sendFetchBlockAsync(this.reqId, this.queryId);
            this.fetchState = FetchState.FETCHING;
        }
        try {
            blockData = this.blockingQueue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw TSDBError.createSQLException(9040, "FETCH DATA INTERRUPTED");
        }
        if (blockData == null) {
            this.fetchState = FetchState.STOPPED;
            throw TSDBError.createSQLException(8990, "Fetch block data timeout after " + this.timeoutMs + " ms");
        }
        return blockData;
    }
}

