/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.parallel.processor;

import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.parallel.ParallelPool;
import com.alibaba.dts.client.executor.parallel.unit.ExecutorUnit;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.springframework.util.CollectionUtils;

public class PullProcessor
extends Thread
implements Constants {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(PullProcessor.class);
    private ExecutorUnit executorUnit;
    private volatile boolean stop = false;
    private final ClientContextImpl clientContext;

    public PullProcessor(ClientContextImpl clientContext, ExecutorUnit executorUnit) {
        this.clientContext = clientContext;
        this.executorUnit = executorUnit;
        super.setName("DtsPullProcessor-" + this.executorUnit.getExecutableTask().getJob().getId() + "-" + executorUnit.getExecutableTask().getJob().getJobProcessor() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getFireTime() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
    }

    public void refresh(ExecutorUnit executorUnit) {
        this.executorUnit = executorUnit;
        super.setName("DtsPullProcessor-" + this.executorUnit.getExecutableTask().getJob().getId() + "-" + executorUnit.getExecutableTask().getJob().getJobProcessor() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getFireTime() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
    }

    @Override
    public void run() {
        try {
            BlockingQueue<TaskSnapshot> queue = this.executorUnit.getQueue();
            while (!this.stop) {
                try {
                    this.pullAndPut(queue);
                }
                catch (Throwable e) {
                    logger.error("[PullProcessor]: pullAndPut error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                }
            }
        }
        catch (Throwable e) {
            logger.error("[PullProcessor]: run error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
        }
        finally {
            if (this.clientContext.getClientConfig().isFinishLog()) {
                logger.warn("[PullProcessor]: finally stopTask, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", stop:" + this.isStop());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void pullAndPut(BlockingQueue<TaskSnapshot> queue) {
        ExecutableTask executableTaskResult;
        block34: {
            Result<ExecutableTask> pullResult = null;
            try {
                pullResult = this.clientContext.getExecutor().pull(this.executorUnit.getExecutableTask());
            }
            catch (Throwable e) {
                logger.error("[PullProcessor]: pull error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
            }
            if (null == pullResult) {
                logger.error("[PullProcessor]: pullResult is null, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
                try {
                    Thread.sleep(10000L);
                }
                catch (Throwable e) {
                    logger.error("[PullProcessor]: pullResult sleep error, executorUnit:" + this.executorUnit, e);
                }
                return;
            }
            executableTaskResult = pullResult.getData();
            if (null != executableTaskResult) break block34;
            switch (pullResult.getResultCode()) {
                case PULL_TASK_LIST_OVER: {
                    try {
                        Thread.sleep(this.clientContext.getClientConfig().getPullTaskListOverSleepTime());
                    }
                    catch (Throwable e) {
                        logger.error("[PullProcessor]: PULL_TASK_LIST_OVER sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                    break;
                }
                case PULL_TASK_GET_LOCK_FAILURE: {
                    try {
                        Thread.sleep(500L);
                    }
                    catch (Throwable e) {
                        logger.error("[PullProcessor]: PULL_TASK_GET_LOCK_FAILURE sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                    break;
                }
                case PULL_OVER: {
                    try {
                        Thread.sleep(10000L);
                    }
                    catch (Throwable e) {
                        logger.error("[PullProcessor]: PULL_OVER sleep before error, executorUnit:" + this.executorUnit, e);
                    }
                    try {
                        ParallelPool parallelPool = this.executorUnit.getParallelPool();
                        parallelPool.stopTask(this.executorUnit.getExecutableTask().getJob().getId(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
                        this.executorUnit.stopTask();
                    }
                    catch (Throwable e) {
                        logger.error("[PullProcessor]: PULL_OVER error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                    finally {
                        if (this.clientContext.getClientConfig().isFinishLog()) {
                            logger.warn("[PullProcessor]: PULL_OVER EXIT, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", stop:" + this.isStop());
                        }
                    }
                    try {
                        Thread.sleep(10000L);
                    }
                    catch (Throwable e) {
                        logger.error("[PullProcessor]: PULL_OVER sleep after error, executorUnit:" + this.executorUnit, e);
                    }
                    break;
                }
                default: {
                    logger.error("[PullProcessor]: executableTask is null, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + ", pullResult:" + pullResult.toString());
                    try {
                        Thread.sleep(500L);
                        break;
                    }
                    catch (Throwable e) {
                        logger.error("[PullProcessor]: PULL_TASK_SUCCESS null sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                }
            }
            return;
        }
        List<TaskSnapshot> taskSnapshotList = executableTaskResult.getTaskSnapshotList();
        if (CollectionUtils.isEmpty(taskSnapshotList)) {
            logger.warn("[PullProcessor]: taskSnapshotList is empty error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
            try {
                Thread.sleep(500L);
            }
            catch (Throwable e) {
                logger.error("[PullProcessor]: PULL_TASK_SUCCESS Empty sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
            }
            return;
        }
        for (TaskSnapshot taskSnapshot : taskSnapshotList) {
            try {
                queue.put(taskSnapshot);
            }
            catch (Throwable e) {
                logger.error("[PullProcessor]: put error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), e);
            }
        }
        try {
            Thread.sleep(500L);
        }
        catch (Throwable e) {
            logger.error("[PullProcessor]: PULL_TASK_SUCCESS sleep error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId(), e);
        }
    }

    public boolean isStop() {
        return this.stop;
    }

    public void setStop(boolean stop) {
        this.stop = stop;
    }
}

