/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.api.impl;

import com.google.common.base.Preconditions;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.TypedSPIConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
import org.apache.shardingsphere.scaling.core.job.environment.ScalingEnvironmentManager;
import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RuleAlteredJobAPIImpl
extends AbstractPipelineJobAPIImpl
implements RuleAlteredJobAPI {
    // SLF4J logger bound to this class.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RuleAlteredJobAPIImpl.class);
    // Snapshot of the DataConsistencyCheckAlgorithm singletons returned by SingletonSPIRegistry,
    // copied into a TreeMap so iteration follows the natural ordering of the String keys.
    // NOTE(review): keys are presumably the algorithm type names - confirm against SingletonSPIRegistry.
    private static final Map<String, DataConsistencyCheckAlgorithm> DATA_CONSISTENCY_CHECK_ALGORITHM_MAP = new TreeMap<String, DataConsistencyCheckAlgorithm>(SingletonSPIRegistry.getTypedSingletonInstancesMap(DataConsistencyCheckAlgorithm.class));

    /**
     * List all visible jobs.
     *
     * <p>Requires cluster mode; every job brief info that passes the name filter is
     * resolved into a {@link JobInfo} by job name.</p>
     *
     * @return job infos, in the order the brief infos were returned
     */
    public List<JobInfo> list() {
        checkModeConfig();
        return getJobBriefInfos()
                .map(JobBriefInfo::getJobName)
                .map(this::getJobInfo)
                .collect(Collectors.toList());
    }

    /**
     * Verify that a mode configuration exists and that its type is {@code Cluster} (case-insensitive).
     *
     * @throws NullPointerException if no mode configuration is available
     * @throws IllegalArgumentException if the mode type is not {@code Cluster}
     */
    private void checkModeConfig() {
        ModeConfiguration currentMode = PipelineContext.getModeConfig();
        Preconditions.checkNotNull(currentMode, "Mode configuration is required.");
        boolean clusterMode = "Cluster".equalsIgnoreCase(currentMode.getType());
        Preconditions.checkArgument(clusterMode, "Mode must be `Cluster`.");
    }

    /**
     * Stream the brief info of every job whose name does not start with an underscore.
     *
     * @return lazily filtered stream of job brief infos
     */
    private Stream<JobBriefInfo> getJobBriefInfos() {
        return PipelineAPIFactory.getJobStatisticsAPI()
                .getAllJobsBriefInfo()
                .stream()
                .filter(jobBriefInfo -> {
                    String jobName = jobBriefInfo.getJobName();
                    return !jobName.startsWith("_");
                });
    }

    /**
     * Build the {@link JobInfo} of a job from its persisted elastic-job configuration.
     *
     * @param jobName job name, used to create the job info
     * @return populated job info
     */
    private JobInfo getJobInfo(String jobName) {
        JobInfo jobInfo = new JobInfo(jobName);
        JobConfigurationPOJO elasticJobConfig = getElasticJobConfigPOJO(jobInfo.getJobId());
        JobConfiguration pipelineJobConfig = getJobConfig(elasticJobConfig);
        // Values taken from the elastic-job level configuration.
        Properties jobProps = elasticJobConfig.getProps();
        jobInfo.setActive(!elasticJobConfig.isDisabled());
        // Values taken from the pipeline job configuration carried in the job parameter.
        jobInfo.setShardingTotalCount(pipelineJobConfig.getHandleConfig().getJobShardingCount());
        jobInfo.setTables(pipelineJobConfig.getHandleConfig().getLogicTables());
        jobInfo.setCreateTime(jobProps.getProperty("create_time"));
        jobInfo.setStopTime(jobProps.getProperty("stop_time"));
        jobInfo.setJobParameter(elasticJobConfig.getJobParameter());
        return jobInfo;
    }

    /**
     * Start a scaling job by persisting its configuration into the governance repository.
     *
     * <p>If a configuration for the same job id is already persisted, nothing is written
     * and the existing job id is returned.</p>
     *
     * @param jobConfig job configuration; its handle configuration is built by this call
     * @return the job id
     * @throws PipelineJobCreationException if the job sharding count is 0
     */
    public Optional<String> start(JobConfiguration jobConfig) {
        jobConfig.buildHandleConfig();
        if (0 == jobConfig.getHandleConfig().getJobShardingCount()) {
            log.warn("Invalid scaling job config!");
            throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
        }
        log.info("Start scaling job by {}", jobConfig.getHandleConfig());
        GovernanceRepositoryAPI governanceRepository = PipelineAPIFactory.getGovernanceRepositoryAPI();
        String jobId = jobConfig.getHandleConfig().getJobId();
        String configNodePath = String.format("%s/%s/config", "/scaling", jobId);
        if (!governanceRepository.isExisted(configNodePath)) {
            // The job root node stores the job class name, the config node stores the YAML configuration.
            governanceRepository.persist(String.format("%s/%s", "/scaling", jobId), RuleAlteredJob.class.getName());
            governanceRepository.persist(configNodePath, createJobConfig(jobConfig));
        } else {
            log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", configNodePath);
        }
        return Optional.of(jobId);
    }

    /**
     * Create the YAML text of the elastic-job configuration for a pipeline job.
     *
     * <p>The pipeline job configuration itself is marshalled into the job parameter,
     * and the current local time is recorded under the {@code create_time} property.</p>
     *
     * @param jobConfig pipeline job configuration
     * @return marshalled elastic-job configuration
     */
    private String createJobConfig(JobConfiguration jobConfig) {
        JobConfigurationPOJO elasticJobConfig = new JobConfigurationPOJO();
        elasticJobConfig.setJobName(jobConfig.getHandleConfig().getJobId());
        elasticJobConfig.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
        String marshalledJobConfig = YamlEngine.marshal(jobConfig);
        elasticJobConfig.setJobParameter(marshalledJobConfig);
        String createTime = LocalDateTime.now().format(DATE_TIME_FORMATTER);
        elasticJobConfig.getProps().setProperty("create_time", createTime);
        return YamlEngine.marshal(elasticJobConfig);
    }

    /**
     * Get the progress of every sharding item of a job.
     *
     * @param jobId job id
     * @return progress keyed by sharding item
     */
    public Map<Integer, JobProgress> getProgress(String jobId) {
        checkModeConfig();
        return getProgress(getJobConfig(jobId));
    }

    /**
     * Get the progress of every sharding item of a job.
     *
     * <p>Sharding items {@code 0 .. jobShardingCount - 1} are looked up one by one. An item
     * without stored progress is still present in the result, mapped to {@code null}.
     * Every non-null progress gets its active flag set from the elastic-job disabled flag.</p>
     *
     * @param jobConfig job configuration
     * @return insertion-ordered map of sharding item to progress (values may be {@code null})
     */
    public Map<Integer, JobProgress> getProgress(JobConfiguration jobConfig) {
        String jobId = jobConfig.getHandleConfig().getJobId();
        JobConfigurationPOJO elasticJobConfig = getElasticJobConfigPOJO(jobId);
        int shardingCount = jobConfig.getHandleConfig().getJobShardingCount();
        Map<Integer, JobProgress> progressByItem = new LinkedHashMap<>();
        for (int shardingItem = 0; shardingItem < shardingCount; shardingItem++) {
            JobProgress itemProgress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, shardingItem);
            if (null != itemProgress) {
                itemProgress.setActive(!elasticJobConfig.isDisabled());
            }
            progressByItem.put(shardingItem, itemProgress);
        }
        return progressByItem;
    }

    /**
     * Verify that the job runs in manual mode, i.e. no completion detect algorithm is configured.
     *
     * @param jobConfig job configuration
     * @throws PipelineVerifyFailedException if a completion detect algorithm is configured
     */
    private void verifyManualMode(JobConfiguration jobConfig) {
        RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        boolean autoMode = null != context.getCompletionDetectAlgorithm();
        if (autoMode) {
            throw new PipelineVerifyFailedException("It's not necessary to do it in auto mode.");
        }
    }

    /**
     * Verify that the job is not completed yet, according to {@link RuleAlteredJobProgressDetector}.
     *
     * @param jobConfig job configuration
     * @throws PipelineVerifyFailedException if the job is already completed
     */
    private void verifyJobNotCompleted(JobConfiguration jobConfig) {
        int shardingCount = jobConfig.getHandleConfig().getJobShardingCount();
        Collection<JobProgress> progresses = getProgress(jobConfig).values();
        boolean completed = RuleAlteredJobProgressDetector.isJobCompleted(shardingCount, progresses);
        if (completed) {
            throw new PipelineVerifyFailedException("Job is completed, it's not necessary to do it.");
        }
    }

    /**
     * Verify that source writing is stopped, i.e. the schema lock of the job's schema exists and is locked.
     *
     * @param jobConfig job configuration
     * @throws PipelineVerifyFailedException if the schema lock is absent or not locked
     */
    private void verifySourceWritingStopped(JobConfiguration jobConfig) {
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        String schemaName = jobConfig.getWorkflowConfig().getSchemaName();
        ShardingSphereLock schemaLock = lockContext.getSchemaLock(schemaName).orElse(null);
        boolean writingStopped = null != schemaLock && schemaLock.isLocked(schemaName);
        if (!writingStopped) {
            throw new PipelineVerifyFailedException("Source writing is not stopped. You could run `STOP SCALING SOURCE WRITING {jobId}` to stop it.");
        }
    }

    /**
     * Stop source writing for a job running in manual mode.
     *
     * <p>The job must be in manual mode, not stopped and not completed.</p>
     *
     * @param jobId job id
     */
    public void stopClusterWriteDB(String jobId) {
        checkModeConfig();
        log.info("stopClusterWriteDB for job {}", jobId);
        JobConfigurationPOJO elasticJobConfig = getElasticJobConfigPOJO(jobId);
        JobConfiguration pipelineJobConfig = getJobConfig(elasticJobConfig);
        // Preconditions, checked in this order: manual mode, job not stopped, job not completed.
        verifyManualMode(pipelineJobConfig);
        verifyJobNotStopped(elasticJobConfig);
        verifyJobNotCompleted(pipelineJobConfig);
        stopClusterWriteDB(pipelineJobConfig.getWorkflowConfig().getSchemaName(), jobId);
    }

    /**
     * Stop source writing by acquiring the schema lock.
     *
     * <p>Returns without doing anything if the schema is already locked.</p>
     *
     * @param schemaName schema name to lock
     * @param jobId job id (not used by this method)
     * @throws RuntimeException if the lock could not be acquired
     */
    public void stopClusterWriteDB(String schemaName, String jobId) {
        ShardingSphereLock schemaLock = PipelineContext.getContextManager().getInstanceContext().getLockContext().getOrCreateSchemaLock(schemaName);
        if (schemaLock.isLocked(schemaName)) {
            log.info("stopClusterWriteDB, already stopped");
            return;
        }
        boolean acquired = schemaLock.tryLock(schemaName);
        log.info("stopClusterWriteDB, tryLockSuccess={}", acquired);
        if (acquired) {
            return;
        }
        throw new RuntimeException("Stop source writing failed");
    }

    /**
     * Restore source writing for a job running in manual mode.
     *
     * @param jobId job id
     */
    public void restoreClusterWriteDB(String jobId) {
        checkModeConfig();
        log.info("restoreClusterWriteDB for job {}", jobId);
        JobConfiguration pipelineJobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
        verifyManualMode(pipelineJobConfig);
        restoreClusterWriteDB(pipelineJobConfig.getWorkflowConfig().getSchemaName(), jobId);
    }

    /**
     * Restore source writing by releasing the schema lock.
     *
     * <p>Nothing is released when the schema lock does not exist or is not currently locked.</p>
     *
     * @param schemaName schema name to unlock
     * @param jobId job id (only used for logging)
     */
    public void restoreClusterWriteDB(String schemaName, String jobId) {
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        ShardingSphereLock schemaLock = lockContext.getSchemaLock(schemaName).orElse(null);
        if (null == schemaLock) {
            log.info("restoreClusterWriteDB, lock is null");
        } else if (!schemaLock.isLocked(schemaName)) {
            log.info("restoreClusterWriteDB, isLocked false, schemaName={}", schemaName);
        } else {
            log.info("restoreClusterWriteDB, before releaseLock, schemaName={}, jobId={}", schemaName, jobId);
            schemaLock.releaseLock(schemaName);
        }
    }

    /**
     * List the registered data consistency check algorithms.
     *
     * @return one info object per algorithm held in the class-level algorithm map
     */
    public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
        checkModeConfig();
        return DATA_CONSISTENCY_CHECK_ALGORITHM_MAP.values().stream()
                .map(RuleAlteredJobAPIImpl::toDataConsistencyCheckAlgorithmInfo)
                .collect(Collectors.toList());
    }

    /**
     * Copy type, description, supported database types and provider of an algorithm into an info object.
     *
     * @param algorithm data consistency check algorithm
     * @return algorithm info
     */
    private static DataConsistencyCheckAlgorithmInfo toDataConsistencyCheckAlgorithmInfo(DataConsistencyCheckAlgorithm algorithm) {
        DataConsistencyCheckAlgorithmInfo info = new DataConsistencyCheckAlgorithmInfo();
        info.setType(algorithm.getType());
        info.setDescription(algorithm.getDescription());
        info.setSupportedDatabaseTypes(algorithm.getSupportedDatabaseTypes());
        info.setProvider(algorithm.getProvider());
        return info;
    }

    /**
     * Tell whether a data consistency check is needed for a job.
     *
     * @param jobId job id
     * @return {@code true} if a data consistency check algorithm is configured for the job
     */
    public boolean isDataConsistencyCheckNeeded(String jobId) {
        log.info("isDataConsistencyCheckNeeded for job {}", jobId);
        return isDataConsistencyCheckNeeded(getJobConfig(jobId));
    }

    /**
     * Tell whether a data consistency check is needed for a job configuration.
     *
     * @param jobConfig job configuration
     * @return {@code true} if a data consistency check algorithm is configured
     */
    public boolean isDataConsistencyCheckNeeded(JobConfiguration jobConfig) {
        return isDataConsistencyCheckNeeded(RuleAlteredJobWorker.createRuleAlteredContext(jobConfig));
    }

    /**
     * Tell whether the rule altered context carries a data consistency check algorithm.
     *
     * @param ruleAlteredContext rule altered context
     * @return {@code true} if the algorithm is not {@code null}
     */
    private boolean isDataConsistencyCheckNeeded(RuleAlteredContext ruleAlteredContext) {
        DataConsistencyCheckAlgorithm configuredAlgorithm = ruleAlteredContext.getDataConsistencyCheckAlgorithm();
        return configuredAlgorithm != null;
    }

    /**
     * Verify the preconditions of a manually triggered data consistency check:
     * the job is in manual mode and source writing is stopped.
     *
     * @param jobConfigPOJO elastic-job configuration (not used by this method)
     * @param jobConfig pipeline job configuration
     */
    private void verifyDataConsistencyCheck(JobConfigurationPOJO jobConfigPOJO, JobConfiguration jobConfig) {
        // Manual mode is checked first, so an auto-mode job fails before the lock state is inspected.
        verifyManualMode(jobConfig);
        verifySourceWritingStopped(jobConfig);
    }

    /**
     * Run the data consistency check of a job with the algorithm configured for it.
     *
     * @param jobId job id
     * @return check result per key of the checker's result map; empty if no algorithm is configured
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId) {
        checkModeConfig();
        log.info("Data consistency check for job {}", jobId);
        JobConfigurationPOJO elasticJobConfig = getElasticJobConfigPOJO(jobId);
        JobConfiguration pipelineJobConfig = getJobConfig(elasticJobConfig);
        verifyDataConsistencyCheck(elasticJobConfig, pipelineJobConfig);
        return dataConsistencyCheck(pipelineJobConfig);
    }

    /**
     * Run the data consistency check for a job configuration with its configured algorithm.
     *
     * @param jobConfig job configuration
     * @return check results, or an empty map if no check algorithm is configured
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(JobConfiguration jobConfig) {
        RuleAlteredContext context = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        if (isDataConsistencyCheckNeeded(context)) {
            return dataConsistencyCheck0(jobConfig, context.getDataConsistencyCheckAlgorithm());
        }
        log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check is ignored.");
        return Collections.emptyMap();
    }

    /**
     * Run the data consistency check of a job with an explicitly chosen algorithm type.
     *
     * <p>The algorithm is created from the given type with empty properties.</p>
     *
     * @param jobId job id
     * @param algorithmType type of the data consistency check algorithm to create
     * @return check result per key of the checker's result map
     */
    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType) {
        checkModeConfig();
        log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
        JobConfigurationPOJO elasticJobConfig = getElasticJobConfigPOJO(jobId);
        JobConfiguration pipelineJobConfig = getJobConfig(elasticJobConfig);
        verifyDataConsistencyCheck(elasticJobConfig, pipelineJobConfig);
        Properties emptyProps = new Properties();
        ShardingSphereAlgorithmConfiguration algorithmConfig = new ShardingSphereAlgorithmConfiguration(algorithmType, emptyProps);
        DataConsistencyCheckAlgorithm algorithm = (DataConsistencyCheckAlgorithm) ShardingSphereAlgorithmFactory.createAlgorithm((TypedSPIConfiguration) algorithmConfig, DataConsistencyCheckAlgorithm.class);
        return dataConsistencyCheck0(pipelineJobConfig, algorithm);
    }

    /**
     * Run the records-count check and, only if every count matched, the records-content check,
     * then persist the aggregated outcome for the job.
     *
     * @param jobConfig job configuration
     * @param checkAlgorithm algorithm used for the content check
     * @return check result per key of the checker's result map
     */
    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(JobConfiguration jobConfig, DataConsistencyCheckAlgorithm checkAlgorithm) {
        String jobId = jobConfig.getHandleConfig().getJobId();
        DataConsistencyChecker checker = EnvironmentCheckerFactory.newInstance(jobConfig);
        Map<String, DataConsistencyCheckResult> checkResults = checker.checkRecordsCount();
        boolean allCountsMatched = checkResults.values().stream().allMatch(DataConsistencyCheckResult::isRecordsCountMatched);
        if (allCountsMatched) {
            Map<String, Boolean> contentMatchedByKey = checker.checkRecordsContent(checkAlgorithm);
            for (Map.Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
                // A key missing from the content check result counts as not matched.
                boolean contentMatched = contentMatchedByKey.getOrDefault(entry.getKey(), false);
                entry.getValue().setRecordsContentMatched(contentMatched);
            }
        }
        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, checkAlgorithm.getClass().getName(), checkResults);
        boolean aggregated = aggregateDataConsistencyCheckResults(jobId, checkResults);
        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, aggregated);
        return checkResults;
    }

    /**
     * Aggregate the per-table check results into a single pass/fail flag.
     *
     * <p>An empty result map is treated as failure. The first entry whose count or content
     * did not match is logged and ends the aggregation.</p>
     *
     * @param jobId job id (only used for logging)
     * @param checkResultMap check result per table
     * @return {@code true} only if the map is non-empty and every entry matched on both count and content
     */
    public boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResultMap) {
        if (checkResultMap.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, DataConsistencyCheckResult> tableResult : checkResultMap.entrySet()) {
            boolean countMatched = tableResult.getValue().isRecordsCountMatched();
            boolean contentMatched = tableResult.getValue().isRecordsContentMatched();
            if (!contentMatched || !countMatched) {
                log.error("Scaling job: {}, table: {} data consistency check failed, recordsContentMatched: {}, recordsCountMatched: {}", jobId, tableResult.getKey(), contentMatched, countMatched);
                return false;
            }
        }
        return true;
    }

    /**
     * Switch the cluster configuration for a job running in manual mode.
     *
     * <p>The job must be in manual mode, not stopped and not completed.</p>
     *
     * @param jobId job id
     */
    public void switchClusterConfiguration(String jobId) {
        checkModeConfig();
        log.info("Switch cluster configuration for job {}", jobId);
        JobConfigurationPOJO elasticJobConfig = getElasticJobConfigPOJO(jobId);
        JobConfiguration pipelineJobConfig = getJobConfig(elasticJobConfig);
        // Preconditions, checked in this order: manual mode, job not stopped, job not completed.
        verifyManualMode(pipelineJobConfig);
        verifyJobNotStopped(elasticJobConfig);
        verifyJobNotCompleted(pipelineJobConfig);
        switchClusterConfiguration(pipelineJobConfig);
    }

    /**
     * Switch the cluster configuration for a job configuration.
     *
     * <p>Steps, in order: require a successful persisted data consistency check result when a
     * check algorithm is configured; post a {@link ScalingTaskFinishedEvent}; mark every sharding
     * item as {@link JobStatus#FINISHED}; stop the job; wait one second; clean up via
     * {@link RuleAlteredJobPreparer} and close the job context.</p>
     *
     * <p>If the thread is interrupted during the wait, the interrupt flag is restored and the
     * cleanup still runs. The job context is closed even if the cleanup throws.</p>
     *
     * @param jobConfig job configuration
     * @throws PipelineVerifyFailedException if a check is required but its result is missing or failed
     */
    public void switchClusterConfiguration(JobConfiguration jobConfig) {
        String jobId = jobConfig.getHandleConfig().getJobId();
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        // The persisted check result is only consulted when a check algorithm is configured.
        if (isDataConsistencyCheckNeeded(ruleAlteredContext)) {
            Optional<Boolean> checkResult = repositoryAPI.getJobCheckResult(jobId);
            if (!checkResult.orElse(false)) {
                throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
            }
        }
        WorkflowConfiguration workflowConfig = jobConfig.getWorkflowConfig();
        ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(workflowConfig.getSchemaName(), workflowConfig.getActiveVersion().intValue(), workflowConfig.getNewVersion().intValue());
        ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
        for (int each : repositoryAPI.getShardingItems(jobId)) {
            repositoryAPI.updateShardingJobStatus(jobId, each, JobStatus.FINISHED);
        }
        stop(jobId);
        try {
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers can still observe it; cleanup below still runs.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting after stopping job {}", jobId, ex);
        }
        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
        try {
            RuleAlteredJobPreparer jobPreparer = new RuleAlteredJobPreparer();
            jobPreparer.cleanup(jobContext);
        } finally {
            // Close the context even when cleanup fails.
            jobContext.close();
        }
    }

    /**
     * Reset a stopped job by cleaning up its target tables.
     *
     * @param jobId job id
     * @throws PipelineJobExecutionException if cleaning up the target tables fails with a
     *         {@link SQLException}; the SQL exception is attached as the cause
     */
    public void reset(String jobId) {
        this.checkModeConfig();
        log.info("Scaling job {} reset target table", (Object)jobId);
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(jobId);
        this.verifyJobStopped(jobConfigPOJO);
        try {
            new ScalingEnvironmentManager().cleanupTargetTables(this.getJobConfig(jobConfigPOJO));
        }
        catch (SQLException ex) {
            // Attach the SQL failure as the cause instead of dropping it; initCause is used because
            // only the message-only constructor is known to exist from this file.
            PipelineJobExecutionException failure = new PipelineJobExecutionException("Reset target table failed for job " + jobId);
            failure.initCause(ex);
            throw failure;
        }
    }

    /**
     * Load the pipeline job configuration of a job.
     *
     * @param jobId job id
     * @return job configuration unmarshalled from the job parameter of the elastic-job configuration
     */
    public JobConfiguration getJobConfig(String jobId) {
        JobConfigurationPOJO elasticJobConfig = getElasticJobConfigPOJO(jobId);
        return getJobConfig(elasticJobConfig);
    }

    /**
     * Unmarshal the pipeline job configuration stored in the job parameter of an elastic-job configuration.
     *
     * @param elasticJobConfigPOJO elastic-job configuration
     * @return pipeline job configuration
     */
    private JobConfiguration getJobConfig(JobConfigurationPOJO elasticJobConfigPOJO) {
        String jobParameter = elasticJobConfigPOJO.getJobParameter();
        // The third argument is passed as true, exactly as before.
        // NOTE(review): presumably it makes unmarshalling skip unknown properties - confirm in YamlEngine.
        return (JobConfiguration) YamlEngine.unmarshal(jobParameter, JobConfiguration.class, true);
    }
}

