/*
 * Decompiled with CFR 0.152.
 */
package com.alicloud.openservices.tablestore.tunnel.pipeline;

import com.alicloud.openservices.tablestore.model.StreamRecord;
import com.alicloud.openservices.tablestore.model.tunnel.internal.CheckpointResponse;
import com.alicloud.openservices.tablestore.model.tunnel.internal.ReadRecordsRequest;
import com.alicloud.openservices.tablestore.model.tunnel.internal.ReadRecordsResponse;
import com.alicloud.openservices.tablestore.tunnel.pipeline.AbstractStage;
import com.alicloud.openservices.tablestore.tunnel.pipeline.Pipeline;
import com.alicloud.openservices.tablestore.tunnel.pipeline.ProcessDataBackoff;
import com.alicloud.openservices.tablestore.tunnel.pipeline.ProcessDataPipelineContext;
import com.alicloud.openservices.tablestore.tunnel.pipeline.Stage;
import com.alicloud.openservices.tablestore.tunnel.pipeline.StageException;
import com.alicloud.openservices.tablestore.tunnel.worker.ChannelConnect;
import com.alicloud.openservices.tablestore.tunnel.worker.ChannelConnectStatus;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessDataPipeline
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessDataPipeline.class);
    private final ChannelConnect connect;
    private volatile boolean started = false;
    private final ThreadPoolExecutor readRecordsExecutor;
    private final ThreadPoolExecutor processRecordsExecutor;
    private final ExecutorService pipelineHelperExecutor;
    private ProcessDataBackoff backoff;
    private Pipeline<ReadRecordsRequest, CheckpointResponse> pipeline;
    private static final int COUNT_BAR = 500;
    private static final int SIZE_BAR = 921600;

    public ProcessDataPipeline(ChannelConnect connect, ExecutorService helperExecutor, ThreadPoolExecutor readRecordsExecutor, ThreadPoolExecutor processRecordsExecutor) {
        this.connect = connect;
        this.pipelineHelperExecutor = helperExecutor;
        this.readRecordsExecutor = readRecordsExecutor;
        this.processRecordsExecutor = processRecordsExecutor;
    }

    @Override
    public void run() {
        if (!this.started) {
            LOG.info("Initial process data pipeline.");
            this.pipeline = this.buildPipeline();
            this.pipeline.init(new ProcessDataPipelineContext(this.connect));
            this.started = true;
        }
        this.pipeline.process(new ReadRecordsRequest(this.connect.getTunnelId(), this.connect.getClientId(), this.connect.getChannelId(), this.connect.getToken()));
    }

    private Pipeline<ReadRecordsRequest, CheckpointResponse> buildPipeline() {
        Pipeline<ReadRecordsRequest, CheckpointResponse> pipeline = new Pipeline<ReadRecordsRequest, CheckpointResponse>(this.pipelineHelperExecutor);
        Stage<ReadRecordsRequest, ProcessRecordsInput> readRecordsStage = this.createReadRecordsStage();
        pipeline.addExecutorForStage(readRecordsStage, this.readRecordsExecutor);
        Stage<ProcessRecordsInput, Boolean> processRecordStage = this.createProcessRecordsStage();
        pipeline.addExecutorForStage(processRecordStage, this.processRecordsExecutor);
        return pipeline;
    }

    private Stage<ReadRecordsRequest, ProcessRecordsInput> createReadRecordsStage() {
        return new AbstractStage<ReadRecordsRequest, ProcessRecordsInput>(){

            @Override
            public ProcessRecordsInput doProcess(ReadRecordsRequest readRecordsRequest) throws StageException {
                if (ProcessDataPipeline.this.connect.getStatus() == ChannelConnectStatus.RUNNING) {
                    if (ProcessDataPipeline.this.connect.getToken() != null && !"finished".equals(ProcessDataPipeline.this.connect.getToken())) {
                        try {
                            LOG.debug("Begin read records, connect: {}", (Object)ProcessDataPipeline.this.connect);
                            long beginTs = System.currentTimeMillis();
                            ReadRecordsResponse resp = ProcessDataPipeline.this.connect.getClient().readRecords(readRecordsRequest);
                            List<StreamRecord> records = resp.getRecords();
                            LOG.info("GetRecords, Num: {}, Channel connect: {}, Latency: {} ms, Next Token: {}", new Object[]{records.size(), ProcessDataPipeline.this.connect, System.currentTimeMillis() - beginTs, resp.getNextToken()});
                            if (ProcessDataPipeline.this.backoff != null) {
                                if (ProcessDataPipeline.this.checkDataEnough(resp.getRecords().size(), resp.getMemoizedSerializedSize())) {
                                    LOG.debug("Backoff is reset");
                                    ProcessDataPipeline.this.backoff.reset();
                                } else {
                                    long sleepMills = ProcessDataPipeline.this.backoff.nextBackOffMillis();
                                    LOG.debug("Data is not full, sleep {} msec.", (Object)sleepMills);
                                    Thread.sleep(ProcessDataPipeline.this.backoff.nextBackOffMillis());
                                }
                            }
                            return new ProcessRecordsInput(resp.getRecords(), resp.getNextToken(), resp.getRequestId());
                        }
                        catch (Exception e) {
                            throw new StageException(this, readRecordsRequest, e.getMessage(), e);
                        }
                    }
                    LOG.info("Channel is finished, channel will be closed.");
                    ProcessDataPipeline.this.connect.close(true);
                    throw new StageException(this, readRecordsRequest, "Channel connect is finished.");
                }
                throw new StageException(this, readRecordsRequest, "Channel is not running.");
            }
        };
    }

    private Stage<ProcessRecordsInput, Boolean> createProcessRecordsStage() {
        return new AbstractStage<ProcessRecordsInput, Boolean>(){

            @Override
            public Boolean doProcess(ProcessRecordsInput processRecordsInput) throws StageException {
                if (ProcessDataPipeline.this.connect.getStatus() == ChannelConnectStatus.RUNNING) {
                    try {
                        IChannelProcessor processor = ProcessDataPipeline.this.connect.getProcessor();
                        processor.process(processRecordsInput);
                        ProcessDataPipeline.this.connect.setToken(processRecordsInput.getNextToken());
                        LOG.info("Continue run pipeline, connect: {}", (Object)ProcessDataPipeline.this.connect);
                        ProcessDataPipeline.this.connect.getChannelExecutorService().submit(ProcessDataPipeline.this.connect.getProcessPipeline());
                        return true;
                    }
                    catch (Exception e) {
                        throw new StageException(this, processRecordsInput, e.getMessage(), e);
                    }
                }
                throw new StageException(this, processRecordsInput, "Channel is not running.");
            }
        };
    }

    private boolean checkDataEnough(int numRec, int size) {
        return numRec > 500 || size > 921600;
    }

    public ProcessDataBackoff getBackoff() {
        return this.backoff;
    }

    public void setBackoff(ProcessDataBackoff backoff) {
        this.backoff = backoff;
    }
}

