package org.frameworkset.tran.kafka.output;

import com.frameworkset.orm.annotation.BatchContext;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.frameworkset.soa.BBossStringWriter;
import org.frameworkset.tran.BaseCommonRecordDataTran;
import org.frameworkset.tran.CommonRecord;
import org.frameworkset.tran.DataImportException;
import org.frameworkset.tran.ESDataImportException;
import org.frameworkset.tran.TranErrorWrapper;
import org.frameworkset.tran.TranResultSet;
import org.frameworkset.tran.context.Context;
import org.frameworkset.tran.context.ImportContext;
import org.frameworkset.tran.metrics.SerialImportCount;
import org.frameworkset.tran.schedule.Status;
import org.frameworkset.tran.schedule.TaskContext;
import org.frameworkset.tran.task.TaskCall;
import org.slf4j.Logger;

/* loaded from: input_file:org/frameworkset/tran/kafka/output/KafkaOutputDataTran.class */
public class KafkaOutputDataTran extends BaseCommonRecordDataTran {
    protected String taskInfo;
    private CountDownLatch countDownLatch;
    private KafkaOutputContext kafkaOutputContext;

    public void logTaskStart(Logger logger) {
        logger.info(this.taskInfo);
    }

    public void init() {
        super.init();
        this.kafkaOutputContext = this.targetImportContext;
        this.taskInfo = "import data to kafka topic[" + this.kafkaOutputContext.getTopic() + "].";
    }

    public KafkaOutputDataTran(TaskContext taskContext, TranResultSet tranResultSet, ImportContext importContext, ImportContext importContext2, CountDownLatch countDownLatch, Status status) {
        super(taskContext, tranResultSet, importContext, importContext2, status);
        this.countDownLatch = countDownLatch;
    }

    public String serialExecute() {
        this.logger.info("import data to kafka start.");
        Object obj = null;
        long currentTimeMillis = System.currentTimeMillis();
        Status status = this.currentStatus;
        Object lastValue = status != null ? status.getLastValue() : null;
        SerialImportCount serialImportCount = new SerialImportCount();
        long j = 0;
        boolean z = false;
        try {
            while (true) {
                try {
                    try {
                        Boolean next = this.jdbcResultSet.next();
                        if (next != null) {
                            if (!next.booleanValue()) {
                                break;
                            }
                            if (obj == null) {
                                try {
                                    obj = this.importContext.max(lastValue, getLastValue());
                                } catch (Exception e) {
                                    throw new DataImportException(e);
                                }
                            } else {
                                obj = this.importContext.max(obj, getLastValue());
                            }
                            Context buildContext = this.importContext.buildContext(this.taskContext, this.jdbcResultSet, (BatchContext) null);
                            if (!z) {
                                z = buildContext.reachEOFClosed();
                            }
                            if (buildContext.removed()) {
                                serialImportCount.increamentIgnoreTotalCount();
                            } else {
                                buildContext.refactorData();
                                buildContext.afterRefactor();
                                if (buildContext.isDrop()) {
                                    serialImportCount.increamentIgnoreTotalCount();
                                } else {
                                    CommonRecord buildRecord = buildRecord(buildContext);
                                    StringBuilder sb = new StringBuilder();
                                    this.kafkaOutputContext.generateReocord(buildContext, buildRecord, new BBossStringWriter(sb));
                                    KafkaCommand kafkaCommand = new KafkaCommand(serialImportCount, this.importContext, this.targetImportContext, 1L, -1, serialImportCount.getJobNo(), obj, this.taskContext, status, z);
                                    kafkaCommand.setDatas(sb.toString());
                                    kafkaCommand.setKey(buildRecord.getRecordKey());
                                    TaskCall.asynCall(kafkaCommand);
                                    j++;
                                }
                            }
                        }
                    } catch (DataImportException e2) {
                        throw e2;
                    }
                } catch (Exception e3) {
                    throw new DataImportException(e3);
                }
            }
            if (isPrintTaskLog()) {
                this.logger.info("Serial import Take time:" + (System.currentTimeMillis() - currentTimeMillis) + "ms,Import total " + j + " records,IgnoreTotalCount " + serialImportCount.getIgnoreTotalCount() + " records.");
            }
            return null;
        } finally {
            if (!TranErrorWrapper.assertCondition((Exception) null, this.importContext)) {
                if (this.importContext.getDataTranPlugin().isMultiTran()) {
                    stopTranOnly();
                } else {
                    stop();
                }
            }
            if (this.importContext.isCurrentStoped()) {
                stopTranOnly();
            }
            serialImportCount.setJobEndTime(new Date());
        }
    }

    public String parallelBatchExecute() {
        return serialExecute();
    }

    public String batchExecute() {
        return serialExecute();
    }

    public void stop() {
        if (this.esTranResultSet != null) {
            this.esTranResultSet.stop();
            this.esTranResultSet = null;
        }
        super.stop();
    }

    public void stopTranOnly() {
        if (this.esTranResultSet != null) {
            this.esTranResultSet.stopTranOnly();
            this.esTranResultSet = null;
        }
        super.stopTranOnly();
    }

    public String tran() throws ESDataImportException {
        try {
            return super.tran();
        } finally {
            if (this.countDownLatch != null) {
                this.countDownLatch.countDown();
            }
        }
    }
}
