package org.frameworkset.tran.kafka.input;

import org.frameworkset.tran.BaseDataTran;
import org.frameworkset.tran.BaseDataTranPlugin;
import org.frameworkset.tran.DataTranPlugin;
import org.frameworkset.tran.ESDataImportException;
import org.frameworkset.tran.TranResultSet;
import org.frameworkset.tran.context.ImportContext;
import org.frameworkset.tran.kafka.KafkaContext;
import org.frameworkset.tran.kafka.KafkaResultSet;
import org.frameworkset.tran.schedule.Status;
import org.frameworkset.tran.schedule.TaskContext;

/* loaded from: input_file:org/frameworkset/tran/kafka/input/BaseKafkaInputPlugin.class */
public abstract class BaseKafkaInputPlugin extends BaseDataTranPlugin implements DataTranPlugin {
    protected KafkaContext kafkaContext;

    protected void init(ImportContext importContext, ImportContext importContext2) {
        super.init(importContext, importContext2);
        this.kafkaContext = (KafkaContext) importContext;
    }

    public void initStatusTableId() {
    }

    public BaseKafkaInputPlugin(ImportContext importContext, ImportContext importContext2) {
        super(importContext, importContext2);
    }

    public void importData() throws ESDataImportException {
        long currentTimeMillis = System.currentTimeMillis();
        doImportData(null);
        long currentTimeMillis2 = System.currentTimeMillis();
        if (isPrintTaskLog()) {
            this.logger.info("Execute job Take " + (currentTimeMillis2 - currentTimeMillis) + " ms");
        }
    }

    public void beforeInit() {
    }

    public void afterInit() {
    }

    protected abstract void initKafkaTranBatchConsumer2ndStore(BaseDataTran baseDataTran) throws Exception;

    protected abstract BaseDataTran createBaseDataTran(TaskContext taskContext, TranResultSet tranResultSet, Status status);

    public void doImportData(TaskContext taskContext) throws ESDataImportException {
        final BaseDataTran createBaseDataTran = createBaseDataTran(taskContext, new KafkaResultSet(this.importContext), this.currentStatus);
        setHasTran();
        try {
            try {
                new Thread(new Runnable() { // from class: org.frameworkset.tran.kafka.input.BaseKafkaInputPlugin.1
                    @Override // java.lang.Runnable
                    public void run() {
                        createBaseDataTran.tran();
                    }
                }, "kafka-elasticsearch-Tran").start();
                initKafkaTranBatchConsumer2ndStore(createBaseDataTran);
            } catch (Exception e) {
                throw new ESDataImportException(e);
            }
        } catch (ESDataImportException e2) {
            throw e2;
        }
    }

    public void initSchedule() {
        this.logger.info("Ignore initSchedule for plugin {}", getClass().getName());
    }
}
