/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.importer;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractImporter
extends AbstractLifecycleExecutor
implements Importer {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractImporter.class);
    private static final DataRecordMerger MERGER = new DataRecordMerger();
    private final ImporterConfiguration importerConfig;
    private final PipelineDataSourceManager dataSourceManager;
    private final PipelineSQLBuilder pipelineSqlBuilder;
    private final PipelineChannel channel;

    protected AbstractImporter(ImporterConfiguration importerConfig, PipelineDataSourceManager dataSourceManager, PipelineChannel channel) {
        this.importerConfig = importerConfig;
        this.dataSourceManager = dataSourceManager;
        this.channel = channel;
        this.pipelineSqlBuilder = this.createSQLBuilder(importerConfig.getShardingColumnsMap());
    }

    protected abstract PipelineSQLBuilder createSQLBuilder(Map<String, Set<String>> var1);

    protected void doStart() {
        this.write();
    }

    private void write() {
        log.info("importer write");
        int round = 1;
        int rowCount = 0;
        boolean finishedByBreak = false;
        int batchSize = this.importerConfig.getBatchSize() * 2;
        while (this.isRunning()) {
            List records = this.channel.fetchRecords(batchSize, 3);
            if (null == records || records.isEmpty()) continue;
            ++round;
            rowCount += records.size();
            this.flush((DataSource)this.dataSourceManager.getDataSource(this.importerConfig.getDataSourceConfig()), records);
            this.channel.ack(records);
            if (log.isDebugEnabled()) {
                log.debug("importer write, round={}, rowCount={}", (Object)round, (Object)rowCount);
            } else if (0 == round % 50) {
                log.info("importer write, round={}, rowCount={}", (Object)round, (Object)rowCount);
            }
            if (!FinishedRecord.class.equals(((Record)records.get(records.size() - 1)).getClass())) continue;
            log.info("write, get FinishedRecord, break");
            finishedByBreak = true;
            break;
        }
        log.info("importer write done, rowCount={}, finishedByBreak={}", (Object)rowCount, (Object)finishedByBreak);
    }

    private void flush(DataSource dataSource, List<Record> buffer) {
        List<GroupedDataRecord> groupedDataRecords = MERGER.group(buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord)each).collect(Collectors.toList()));
        groupedDataRecords.forEach(each -> {
            this.flushInternal(dataSource, each.getDeleteDataRecords());
            this.flushInternal(dataSource, each.getInsertDataRecords());
            this.flushInternal(dataSource, each.getUpdateDataRecords());
        });
    }

    private void flushInternal(DataSource dataSource, List<DataRecord> buffer) {
        if (null == buffer || buffer.isEmpty()) {
            return;
        }
        boolean success = this.tryFlush(dataSource, buffer);
        if (this.isRunning() && !success) {
            throw new PipelineJobExecutionException("write failed.");
        }
    }

    private boolean tryFlush(DataSource dataSource, List<DataRecord> buffer) {
        for (int i = 0; this.isRunning() && i <= this.importerConfig.getRetryTimes(); ++i) {
            try {
                this.doFlush(dataSource, buffer);
                return true;
            }
            catch (SQLException ex) {
                log.error("flush failed {}/{} times.", new Object[]{i, this.importerConfig.getRetryTimes(), ex});
                ThreadUtil.sleep(Math.min(300000L, 1000L << i));
                continue;
            }
        }
        return false;
    }

    private void doFlush(DataSource dataSource, List<DataRecord> buffer) throws SQLException {
        try (Connection connection = dataSource.getConnection();){
            connection.setAutoCommit(false);
            switch (buffer.get(0).getType()) {
                case "INSERT": {
                    this.executeBatchInsert(connection, buffer);
                    break;
                }
                case "UPDATE": {
                    this.executeUpdate(connection, buffer);
                    break;
                }
                case "DELETE": {
                    this.executeBatchDelete(connection, buffer);
                    break;
                }
            }
            connection.commit();
        }
    }

    private void executeBatchInsert(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        String insertSql = this.pipelineSqlBuilder.buildInsertSQL(dataRecords.get(0));
        try (PreparedStatement ps = connection.prepareStatement(insertSql);){
            ps.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                for (int i = 0; i < each.getColumnCount(); ++i) {
                    ps.setObject(i + 1, each.getColumn(i).getValue());
                }
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }

    private void executeUpdate(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            this.executeUpdate(connection, each);
        }
    }

    private void executeUpdate(Connection connection, DataRecord record) throws SQLException {
        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, (Set)this.importerConfig.getShardingColumnsMap().get(record.getTableName()));
        List updatedColumns = this.pipelineSqlBuilder.extractUpdatedColumns((Collection)record.getColumns(), record);
        String updateSql = this.pipelineSqlBuilder.buildUpdateSQL(record, conditionColumns);
        try (PreparedStatement ps = connection.prepareStatement(updateSql);){
            int i;
            for (i = 0; i < updatedColumns.size(); ++i) {
                ps.setObject(i + 1, ((Column)updatedColumns.get(i)).getValue());
            }
            for (i = 0; i < conditionColumns.size(); ++i) {
                Column keyColumn = conditionColumns.get(i);
                ps.setObject(updatedColumns.size() + i + 1, keyColumn.isPrimaryKey() && keyColumn.isUpdated() ? keyColumn.getOldValue() : keyColumn.getValue());
            }
            ps.execute();
        }
    }

    private void executeBatchDelete(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        List<Column> conditionColumns = RecordUtil.extractConditionColumns(dataRecords.get(0), (Set)this.importerConfig.getShardingColumnsMap().get(dataRecords.get(0).getTableName()));
        String deleteSQL = this.pipelineSqlBuilder.buildDeleteSQL(dataRecords.get(0), conditionColumns);
        try (PreparedStatement ps = connection.prepareStatement(deleteSQL);){
            ps.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                conditionColumns = RecordUtil.extractConditionColumns(each, (Set)this.importerConfig.getShardingColumnsMap().get(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); ++i) {
                    ps.setObject(i + 1, conditionColumns.get(i).getValue());
                }
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }

    protected void doStop() {
    }
}

