package com.alibaba.dts.client.executor.parallel.processor;

import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.parallel.unit.ExecutorUnit;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/dts/client/executor/parallel/processor/PullProcessor.class */
public class PullProcessor extends Thread implements Constants {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger((Class<?>) PullProcessor.class);
    private ExecutorUnit executorUnit;
    private volatile boolean stop = false;
    private final ClientContextImpl clientContext;

    public PullProcessor(ClientContextImpl clientContextImpl, ExecutorUnit executorUnit) {
        this.clientContext = clientContextImpl;
        this.executorUnit = executorUnit;
        super.setName(Constants.PULL_TASK_THREAD_NAME + this.executorUnit.getExecutableTask().getJob().getId() + "-" + executorUnit.getExecutableTask().getJob().getJobProcessor() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getFireTime() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
    }

    public void refresh(ExecutorUnit executorUnit) {
        this.executorUnit = executorUnit;
        super.setName(Constants.PULL_TASK_THREAD_NAME + this.executorUnit.getExecutableTask().getJob().getId() + "-" + executorUnit.getExecutableTask().getJob().getJobProcessor() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getFireTime() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            try {
                BlockingQueue<TaskSnapshot> queue = this.executorUnit.getQueue();
                while (!this.stop) {
                    try {
                        pullAndPut(queue);
                    } catch (Throwable th) {
                        logger.error("[PullProcessor]: pullAndPut error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th);
                    }
                }
                if (this.clientContext.getClientConfig().isFinishLog()) {
                    logger.warn("[PullProcessor]: finally stopTask, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", stop:" + isStop());
                }
            } catch (Throwable th2) {
                logger.error("[PullProcessor]: run error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th2);
                if (this.clientContext.getClientConfig().isFinishLog()) {
                    logger.warn("[PullProcessor]: finally stopTask, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", stop:" + isStop());
                }
            }
        } catch (Throwable th3) {
            if (this.clientContext.getClientConfig().isFinishLog()) {
                logger.warn("[PullProcessor]: finally stopTask, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", stop:" + isStop());
            }
            throw th3;
        }
    }

    private void pullAndPut(BlockingQueue<TaskSnapshot> blockingQueue) {
        Result<ExecutableTask> result = null;
        try {
            result = this.clientContext.getExecutor().pull(this.executorUnit.getExecutableTask());
        } catch (Throwable th) {
            logger.error("[PullProcessor]: pull error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th);
        }
        if (null == result) {
            logger.error("[PullProcessor]: pullResult is null, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
            try {
                Thread.sleep(10000L);
                return;
            } catch (Throwable th2) {
                logger.error("[PullProcessor]: pullResult sleep error, executorUnit:" + this.executorUnit, th2);
                return;
            }
        }
        ExecutableTask data = result.getData();
        if (null != data) {
            List<TaskSnapshot> taskSnapshotList = data.getTaskSnapshotList();
            if (CollectionUtils.isEmpty(taskSnapshotList)) {
                logger.warn("[PullProcessor]: taskSnapshotList is empty error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
                try {
                    Thread.sleep(500L);
                    return;
                } catch (Throwable th3) {
                    logger.error("[PullProcessor]: PULL_TASK_SUCCESS Empty sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th3);
                    return;
                }
            }
            for (TaskSnapshot taskSnapshot : taskSnapshotList) {
                try {
                    blockingQueue.put(taskSnapshot);
                } catch (Throwable th4) {
                    logger.error("[PullProcessor]: put error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), th4);
                }
            }
            try {
                Thread.sleep(500L);
                return;
            } catch (Throwable th5) {
                logger.error("[PullProcessor]: PULL_TASK_SUCCESS sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th5);
                return;
            }
        }
        switch (result.getResultCode()) {
            case PULL_TASK_LIST_OVER:
                try {
                    Thread.sleep(this.clientContext.getClientConfig().getPullTaskListOverSleepTime());
                    return;
                } catch (Throwable th6) {
                    logger.error("[PullProcessor]: PULL_TASK_LIST_OVER sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th6);
                    return;
                }
            case PULL_TASK_GET_LOCK_FAILURE:
                try {
                    Thread.sleep(500L);
                    return;
                } catch (Throwable th7) {
                    logger.error("[PullProcessor]: PULL_TASK_GET_LOCK_FAILURE sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th7);
                    return;
                }
            case PULL_OVER:
                try {
                    Thread.sleep(10000L);
                } catch (Throwable th8) {
                    logger.error("[PullProcessor]: PULL_OVER sleep before error, executorUnit:" + this.executorUnit, th8);
                }
                try {
                    try {
                        this.executorUnit.getParallelPool().stopTask(this.executorUnit.getExecutableTask().getJob().getId(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
                        this.executorUnit.stopTask();
                        if (this.clientContext.getClientConfig().isFinishLog()) {
                            logger.warn("[PullProcessor]: PULL_OVER EXIT, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", stop:" + isStop());
                        }
                    } catch (Throwable th9) {
                        logger.error("[PullProcessor]: PULL_OVER error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th9);
                        if (this.clientContext.getClientConfig().isFinishLog()) {
                            logger.warn("[PullProcessor]: PULL_OVER EXIT, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", stop:" + isStop());
                        }
                    }
                    try {
                        Thread.sleep(10000L);
                        return;
                    } catch (Throwable th10) {
                        logger.error("[PullProcessor]: PULL_OVER sleep after error, executorUnit:" + this.executorUnit, th10);
                        return;
                    }
                } catch (Throwable th11) {
                    if (this.clientContext.getClientConfig().isFinishLog()) {
                        logger.warn("[PullProcessor]: PULL_OVER EXIT, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", stop:" + isStop());
                    }
                    throw th11;
                }
            default:
                logger.error("[PullProcessor]: executableTask is null, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", pullResult:" + result.toString());
                try {
                    Thread.sleep(500L);
                    return;
                } catch (Throwable th12) {
                    logger.error("[PullProcessor]: PULL_TASK_SUCCESS null sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), th12);
                    return;
                }
        }
    }

    public boolean isStop() {
        return this.stop;
    }

    public void setStop(boolean z) {
        this.stop = z;
    }
}
