/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.longtime.processor;

import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.longtime.LongTimePool;
import com.alibaba.dts.client.executor.longtime.unit.ExecutorUnit;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.springframework.util.CollectionUtils;

public class PullProcessor
extends Thread
implements Constants {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(PullProcessor.class);
    private int retryTimes = 0;
    private static final int MAX_RETRY_TIMES = 5;
    private static final long PULL_LOCK_SLEEP_TIME_INTERVAL = 150L;
    private static final long PULL_EMPTY_SLEEP_TIME_INTERVAL = 2000L;
    private static final long PULL_FAIL_SLEEP_TIME_INTERVAL = 2000L;
    private static final long PULL_SUCCESS_SLEEP_TIME_INTERVAL = 150L;
    private static final long PULL_SLEEP_TIME_INTERVAL = 10L;
    private ExecutorUnit executorUnit;
    private volatile boolean stop = false;
    private final ClientContextImpl clientContext;

    public PullProcessor(ClientContextImpl clientContext, ExecutorUnit executorUnit) {
        this.clientContext = clientContext;
        this.executorUnit = executorUnit;
        super.setName("LongTimeDtsPullProcessor-" + this.executorUnit.getExecutableTask().getJob().getId() + "-" + executorUnit.getExecutableTask().getJob().getJobProcessor() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getFireTime() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
    }

    public void refresh(ExecutorUnit executorUnit) {
        this.executorUnit = executorUnit;
        super.setName("LongTimeDtsPullProcessor-" + this.executorUnit.getExecutableTask().getJob().getId() + "-" + executorUnit.getExecutableTask().getJob().getJobProcessor() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getFireTime() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
    }

    @Override
    public void run() {
        try {
            this.retryTimes = 0;
            this.executorUnit.setPullTaskFlag(true);
            BlockingQueue<TaskSnapshot> queue = this.executorUnit.getQueue();
            while (!this.stop) {
                if (!this.executorUnit.isReleaseTaskFlag()) {
                    try {
                        this.pullAndPut(queue);
                    }
                    catch (Throwable e) {
                        logger.error("[LPullProcessor]: pullAndPut error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                    continue;
                }
                try {
                    Thread.sleep(10L);
                }
                catch (Throwable e) {
                    logger.error("[LPullProcessor]: ReleaseTaskLock not release, executorUnit:" + this.executorUnit, e);
                }
            }
        }
        catch (Throwable e) {
            logger.error("[LPullProcessor]: run error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
        }
        finally {
            this.executorUnit.setPullTaskFlag(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void pullAndPut(BlockingQueue<TaskSnapshot> queue) {
        Result<ExecutableTask> pullResult = null;
        try {
            pullResult = this.clientContext.getExecutor().pullLongTimeTask(this.executorUnit.getExecutableTask());
        }
        catch (Throwable e) {
            logger.error("[LPullProcessor]: pull error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
        }
        if (null == pullResult) {
            logger.error("[LPullProcessor]: pullResult is null, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
            try {
                Thread.sleep(2000L);
            }
            catch (Throwable e) {
                logger.error("[LPullProcessor]: pullResult sleep error, executorUnit:" + this.executorUnit, e);
            }
            return;
        }
        ExecutableTask executableTaskResult = pullResult.getData();
        if (null == executableTaskResult) {
            switch (pullResult.getResultCode()) {
                case PULL_TASK_LIST_OVER: {
                    ++this.retryTimes;
                    try {
                        Thread.sleep(2000L);
                    }
                    catch (Throwable e) {
                        logger.error("[LPullProcessor]: PULL_TASK_LIST_OVER sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                    if (this.retryTimes <= 5) break;
                    this.setStop(true);
                    break;
                }
                case PULL_TASK_GET_LOCK_FAILURE: {
                    try {
                        Thread.sleep(150L);
                    }
                    catch (Throwable e) {
                        logger.error("[LPullProcessor]: PULL_TASK_GET_LOCK_FAILURE sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                    break;
                }
                case PULL_OVER: {
                    ++this.retryTimes;
                    try {
                        Thread.sleep(2000L);
                    }
                    catch (Throwable e) {
                        logger.error("[LPullProcessor]: PULL_OVER sleep before error, executorUnit:" + this.executorUnit, e);
                    }
                    try {
                        if (this.retryTimes > 0) {
                            this.setStop(true);
                        }
                        LongTimePool longTimePool = this.executorUnit.getLongTimePool();
                        longTimePool.stopTask(this.executorUnit.getExecutableTask().getJob().getId(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
                        this.executorUnit.stopTask();
                        break;
                    }
                    catch (Throwable e) {
                        logger.error("[LPullProcessor]: PULL_OVER error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                        break;
                    }
                    finally {
                        if (this.clientContext.getClientConfig().isFinishLog()) {
                            logger.info("[LPullProcessor]: PULL_OVER EXIT, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
                        }
                    }
                }
                default: {
                    logger.error("[LPullProcessor]: executableTask is null, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", pullResult:" + pullResult.toString());
                }
            }
            return;
        }
        List<TaskSnapshot> taskSnapshotList = executableTaskResult.getTaskSnapshotList();
        if (CollectionUtils.isEmpty(taskSnapshotList)) {
            logger.warn("[LPullProcessor]: taskSnapshotList is empty error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
            return;
        }
        for (TaskSnapshot taskSnapshot : taskSnapshotList) {
            try {
                if (this.executorUnit.isExistsInTaskRunStatisticMap(taskSnapshot.getId())) {
                    this.executorUnit.updateTaskRunStatisticMap(taskSnapshot.getId(), 0L, 1);
                    logger.info("[LPullProcessor] pull duplicate task, instanceid:" + taskSnapshot.getJobInstanceId() + ",taskid(db):" + taskSnapshot.getId());
                    continue;
                }
                queue.put(taskSnapshot);
                logger.info("[LPullProcessor] pull task, instanceid:" + taskSnapshot.getJobInstanceId() + ",taskid(db):" + taskSnapshot.getId());
            }
            catch (Throwable e) {
                logger.error("[LPullProcessor]: put error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), e);
            }
        }
        try {
            Thread.sleep(150L);
        }
        catch (Throwable e) {
            logger.error("[LPullProcessor]: PULL_TASK_SUCCESS sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
        }
    }

    public boolean isStop() {
        return this.stop;
    }

    public void setStop(boolean stop) {
        this.stop = stop;
    }
}

