/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.longtime.unit;

import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.longtime.LongTimePool;
import com.alibaba.dts.client.executor.longtime.processor.LongTimeTaskProcessor;
import com.alibaba.dts.client.executor.longtime.processor.PullProcessor;
import com.alibaba.dts.client.executor.longtime.processor.ReFillingProcessor;
import com.alibaba.dts.client.executor.longtime.unit.StatesReportTimer;
import com.alibaba.dts.client.executor.longtime.unit.TaskRunStatistic;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.exception.InitException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.util.CollectionUtils;

public class ExecutorUnit
implements Constants {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(ExecutorUnit.class);
    private volatile boolean releaseTaskFlag = false;
    private volatile boolean pullTaskFlag = false;
    private final ClientContextImpl clientContext;
    private ExecutableTask executableTask;
    private PullProcessor pullProcessor = null;
    private ReFillingProcessor reFillingProcessor = null;
    private BlockingQueue<TaskSnapshot> queue = null;
    private BlockingQueue<TaskSnapshot> completedqueue = null;
    private LongTimeTaskProcessor[] longTimeTaskProcessors = null;
    private ConcurrentHashMap<Long, TaskRunStatistic> taskRunStatisticMap = new ConcurrentHashMap();
    private BlockingQueue<TaskSnapshot> releaseQueue = new LinkedBlockingQueue<TaskSnapshot>();
    private final AtomicInteger threadCounter = new AtomicInteger();
    private final LongTimePool longTimePool;
    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new ThreadFactory(){

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "DTS-LongTimeTaskStates-report-thread-" + ExecutorUnit.this.executableTask.getJob().getId() + "-" + ExecutorUnit.this.executableTask.getJob().getJobProcessor() + "-" + ExecutorUnit.this.executableTask.getJobInstanceSnapshot().getId());
        }
    });

    public ConcurrentHashMap<Long, TaskRunStatistic> getTaskRunStatisticMap() {
        return this.taskRunStatisticMap;
    }

    public ExecutorUnit(ClientContextImpl clientContext, LongTimePool longTimePool, ExecutableTask executableTask) {
        this.clientContext = clientContext;
        this.longTimePool = longTimePool;
        this.executableTask = executableTask;
        int pageSize = clientContext.getClientConfig().getPageSize();
        Map<String, Integer> pageSizeMap = clientContext.getClientConfig().getPageSizeMap();
        if (!CollectionUtils.isEmpty(pageSizeMap) && pageSizeMap.get(executableTask.getJob().getJobProcessor()) != null) {
            pageSize = clientContext.getClientConfig().checkPageSize(pageSizeMap.get(executableTask.getJob().getJobProcessor()));
        }
        this.executableTask.setLength(1);
        this.releaseQueue = new LinkedBlockingQueue<TaskSnapshot>(10000);
    }

    public boolean isPullTaskFlag() {
        return this.pullTaskFlag;
    }

    public void setPullTaskFlag(boolean pullTaskLock) {
        this.pullTaskFlag = pullTaskLock;
    }

    public boolean isReleaseTaskFlag() {
        return this.releaseTaskFlag;
    }

    public void setReleaseTaskFlag(boolean releaseTaskLock) {
        this.releaseTaskFlag = releaseTaskLock;
    }

    public boolean isExistsInTaskRunStatisticMap(Long taskid) {
        return this.taskRunStatisticMap.containsKey(taskid);
    }

    public void addTaskRunStatisticMap(Long taskid) {
        this.taskRunStatisticMap.put(taskid, new TaskRunStatistic(taskid));
    }

    public void addTaskRunStatisticMap(Long taskid, int lastProcessResult) {
        this.taskRunStatisticMap.put(taskid, new TaskRunStatistic(taskid, lastProcessResult));
    }

    public void updateTaskRunStatisticMap(Long taskid, Long runtime) {
        try {
            TaskRunStatistic taskRunStatistic = this.taskRunStatisticMap.get(taskid);
            taskRunStatistic.setRuntimes(taskRunStatistic.getRuntimes() + 1);
            taskRunStatistic.setLastTimeConsuming(runtime);
            taskRunStatistic.setLastRunTime(new Date());
            this.taskRunStatisticMap.put(taskid, taskRunStatistic);
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: updateTaskRunStatisticMap error, taskid:" + taskid + ", runtime:" + runtime, e);
        }
    }

    public void updateTaskRunStatisticMap(Long taskid, Long runtime, int lastProcessResult) {
        try {
            TaskRunStatistic taskRunStatistic = this.taskRunStatisticMap.get(taskid);
            taskRunStatistic.setRuntimes(taskRunStatistic.getRuntimes() + 1);
            taskRunStatistic.setLastTimeConsuming(runtime);
            taskRunStatistic.setLastRunTime(new Date());
            taskRunStatistic.setProcessResult(lastProcessResult);
            this.taskRunStatisticMap.put(taskid, taskRunStatistic);
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: updateTaskRunStatisticMap error, taskid:" + taskid + ", runtime:" + runtime + ", lastProcessResult:" + lastProcessResult, e);
        }
    }

    public void deleteTaskRunStatisticMap(Long taskid) {
        try {
            this.taskRunStatisticMap.remove(taskid);
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: deleteTaskRunStatisticMap error, taskid:" + taskid, e);
        }
    }

    public String getTaskRunStatisticMapStr() {
        StringBuffer taskRunStatisticStr = new StringBuffer();
        taskRunStatisticStr.append("[");
        for (TaskRunStatistic taskRunStatistic : this.taskRunStatisticMap.values()) {
            taskRunStatisticStr.append("," + taskRunStatistic.toString());
        }
        taskRunStatisticStr.append("]");
        return taskRunStatisticStr.toString();
    }

    public void refresh(ExecutableTask executableTask) {
        this.executableTask = executableTask;
        int pageSize = this.clientContext.getClientConfig().getPageSize();
        Map<String, Integer> pageSizeMap = this.clientContext.getClientConfig().getPageSizeMap();
        if (!CollectionUtils.isEmpty(pageSizeMap) && pageSizeMap.get(executableTask.getJob().getJobProcessor()) != null) {
            pageSize = this.clientContext.getClientConfig().checkPageSize(pageSizeMap.get(executableTask.getJob().getJobProcessor()));
        }
        this.executableTask.setLength(pageSize);
        for (int i = 0; i < this.longTimeTaskProcessors.length; ++i) {
            this.longTimeTaskProcessors[i].refresh(this, i);
        }
        this.pullProcessor.refresh(this);
    }

    public void init() throws InitException {
        this.pullProcessor = new PullProcessor(this.clientContext, this);
        this.reFillingProcessor = new ReFillingProcessor(this.clientContext, this);
        this.queue = new LinkedBlockingQueue<TaskSnapshot>(10000);
        this.completedqueue = new LinkedBlockingQueue<TaskSnapshot>(10000);
        this.pullProcessor.start();
        this.reFillingProcessor.start();
        this.initTaskProcessors();
        this.initStatesReportTimer();
    }

    public void initStatesReportTimer() throws InitException {
        try {
            this.executorService = Executors.newScheduledThreadPool(1, new ThreadFactory(){

                @Override
                public Thread newThread(Runnable runnable) {
                    return new Thread(runnable, "DTS-LongTimeTaskStates-report-thread-" + ExecutorUnit.this.executableTask.getJob().getId() + "-" + ExecutorUnit.this.executableTask.getJob().getJobProcessor() + "-" + ExecutorUnit.this.executableTask.getJobInstanceSnapshot().getId());
                }
            });
            this.executorService.scheduleAtFixedRate(new StatesReportTimer(this, this.clientContext), 60000L, 120000L, TimeUnit.MILLISECONDS);
        }
        catch (Throwable e) {
            throw new InitException("[ExecutorUnit]: initStatesReportTimer error", e);
        }
    }

    public void activeInit() throws InitException {
        this.pullProcessor = new PullProcessor(this.clientContext, this);
        this.reFillingProcessor = new ReFillingProcessor(this.clientContext, this);
        this.queue = new LinkedBlockingQueue<TaskSnapshot>(10000);
        this.completedqueue = new LinkedBlockingQueue<TaskSnapshot>(10000);
        this.pullProcessor.setStop(true);
        if (this.reFillingProcessor.isStop() || !this.reFillingProcessor.isAlive()) {
            this.reFillingProcessor = new ReFillingProcessor(this.clientContext, this);
            this.reFillingProcessor.start();
        }
        this.initTaskProcessors();
    }

    private void initTaskProcessors() {
        try {
            int consumerThreads = this.clientContext.getClientConfig().getConsumerThreads();
            Map<String, Integer> consumerThreadsMap = this.clientContext.getClientConfig().getConsumerThreadsMap();
            if (!CollectionUtils.isEmpty(consumerThreadsMap) && consumerThreadsMap.get(this.executableTask.getJob().getJobProcessor()) != null) {
                consumerThreads = this.clientContext.getClientConfig().checkConsumerThreads(consumerThreadsMap.get(this.executableTask.getJob().getJobProcessor()));
            }
            if (this.executableTask.getRunThreads() > 0) {
                consumerThreads = this.executableTask.getRunThreads();
            }
            this.longTimeTaskProcessors = new LongTimeTaskProcessor[consumerThreads];
            for (int i = 0; i < consumerThreads; ++i) {
                this.longTimeTaskProcessors[i] = new LongTimeTaskProcessor(this.clientContext, this, i, this.threadCounter);
                this.longTimeTaskProcessors[i].start();
            }
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: initTaskProcessors error", e);
        }
    }

    public void restartPull() {
        try {
            if (!this.isExistsProcessors()) {
                this.initTaskProcessors();
            }
            if (this.pullProcessor == null || this.pullProcessor.isStop() || !this.pullProcessor.isAlive()) {
                this.pullProcessor = new PullProcessor(this.clientContext, this);
                this.pullProcessor.start();
            }
            logger.info("[LExecutorUnit]: restartPull start!");
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: restartPull error", e);
        }
    }

    private boolean isExistsProcessors() {
        Boolean result = false;
        if (this.longTimeTaskProcessors == null) {
            return false;
        }
        for (int i = 0; i < this.longTimeTaskProcessors.length; ++i) {
            if (this.longTimeTaskProcessors[i] == null && this.longTimeTaskProcessors[i].isStop() && !this.longTimeTaskProcessors[i].isAlive()) continue;
            return true;
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseCompleteTask() {
        if (!this.isPullTaskFlag()) {
            try {
                TaskSnapshot taskSnapshot;
                this.setReleaseTaskFlag(true);
                BlockingQueue<TaskSnapshot> queue = this.getQueue();
                BlockingQueue<TaskSnapshot> completeQueue = this.getCompletedqueue();
                int queueTotal = queue.size();
                int completeQueueTotal = completeQueue.size();
                int releaseQueueSuccess = 0;
                while (!queue.isEmpty()) {
                    try {
                        taskSnapshot = (TaskSnapshot)queue.poll();
                        if (taskSnapshot == null) continue;
                        this.releaseQueue.put(taskSnapshot);
                    }
                    catch (Throwable e) {
                        logger.error("[LReleaseProcessor]: pullQueue error, instanceId:" + this.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                }
                while (!completeQueue.isEmpty()) {
                    try {
                        taskSnapshot = (TaskSnapshot)completeQueue.poll();
                        if (taskSnapshot == null) continue;
                        this.releaseQueue.put(taskSnapshot);
                    }
                    catch (Throwable e) {
                        logger.error("[LReleaseProcessor]: pullAndPut error, instanceId:" + this.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                }
                int releaseQueueTotal = this.releaseQueue.size();
                while (!this.releaseQueue.isEmpty()) {
                    try {
                        Result<Boolean> ackResult = null;
                        TaskSnapshot taskSnapshot2 = (TaskSnapshot)this.releaseQueue.poll();
                        if (taskSnapshot2 != null) {
                            ackResult = this.clientContext.getExecutor().acknowledgeRes(taskSnapshot2, 7, 0);
                        }
                        if (ackResult != null && ((Boolean)ackResult.getData()).booleanValue()) {
                            this.deleteTaskRunStatisticMap(taskSnapshot2.getId());
                            ++releaseQueueSuccess;
                            logger.info("[LReleaseProcessor] release task, instanceid:" + taskSnapshot2.getJobInstanceId() + ",taskid(db):" + taskSnapshot2.getId());
                            continue;
                        }
                        if (taskSnapshot2 == null) continue;
                        queue.put(taskSnapshot2);
                        logger.warn("[LReleaseProcessor] release task failur, reenter queue, instanceid:" + taskSnapshot2.getJobInstanceId() + ",taskid(db):" + taskSnapshot2.getId() + ",ackResult:" + ackResult.toString());
                    }
                    catch (Throwable e) {
                        logger.error("[LReleaseProcessor]: pullCompleteQueue error, instanceId:" + this.getExecutableTask().getJobInstanceSnapshot().getId(), e);
                    }
                }
                logger.info("[LReleaseProcessor]: releaseQueue end, queueTotal:" + queueTotal + ", completeQueueTotal:" + completeQueueTotal + ", releaseQueueTotal:" + releaseQueueTotal + ", releaseQueueSuccess:" + releaseQueueSuccess);
            }
            catch (Throwable e) {
                logger.error("[LReleaseProcessor]: run error, instanceId:" + this.getExecutableTask().getJobInstanceSnapshot().getId(), e);
            }
            finally {
                this.setReleaseTaskFlag(false);
            }
        }
    }

    public void clear() {
        try {
            int queueTotal = this.queue.size();
            int completeQueueTotal = this.completedqueue.size();
            int releaseQueueTotal = this.releaseQueue.size();
            int taskRunStatisticMapTotal = this.taskRunStatisticMap.size();
            this.queue.clear();
            this.completedqueue.clear();
            this.releaseQueue.clear();
            this.taskRunStatisticMap.clear();
            logger.info("[LExecutorUnit]: clear success, queueTotal:" + queueTotal + ", completeQueueTotal:" + completeQueueTotal + ", releaseQueueTotal:" + releaseQueueTotal + ", taskRunStatisticMapTotal:" + taskRunStatisticMapTotal);
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: clear error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
        }
    }

    public void stopTask() {
        try {
            this.pullProcessor.setStop(true);
            this.reFillingProcessor.setStop(true);
            this.executorService.shutdown();
            for (int i = 0; i < this.longTimeTaskProcessors.length; ++i) {
                this.longTimeTaskProcessors[i].setStop(true);
            }
            this.clear();
            logger.info("[LExecutorUnit]: stopTask end");
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: stopTask error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
        }
    }

    public void forceStopTask() {
        try {
            try {
                this.pullProcessor.stop();
            }
            catch (Throwable e) {
                logger.error("[LExecutorUnit]: forceStopTask pullProcessor error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
            }
            try {
                this.reFillingProcessor.stop();
            }
            catch (Throwable e) {
                logger.error("[LExecutorUnit]: forceStopTask pullProcessor error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
            }
            this.clear();
            this.executorService.shutdownNow();
            for (int i = 0; i < this.longTimeTaskProcessors.length; ++i) {
                try {
                    this.longTimeTaskProcessors[i].stop();
                    continue;
                }
                catch (Throwable e) {
                    logger.error("[LExecutorUnit]: forceStopTask parallelTaskProcessors error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
                }
            }
            logger.info("[LExecutorUnit]: stopTask end");
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: forceStopTask error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
        }
    }

    public boolean isExecutorStop() {
        return this.queue.isEmpty() && this.threadCounter.get() == 0;
    }

    public boolean offer(TaskSnapshot taskSnapshot) {
        boolean result = false;
        try {
            result = this.queue.offer(taskSnapshot, 5000L, TimeUnit.MILLISECONDS);
            logger.info("[LExecutorUnit]: offer task,instanceId:" + taskSnapshot.getJobInstanceId() + ",taskid:" + taskSnapshot.getId() + ",result:" + result);
        }
        catch (Throwable e) {
            logger.error("[LExecutorUnit]: offer error, jobInstanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), e);
        }
        return result;
    }

    public void taskPostProcess(TaskSnapshot taskSnapshot) {
        try {
            this.completedqueue.add(taskSnapshot);
            logger.info("[LExecutorUnit]: taskPostProcess,taskid:" + taskSnapshot.getId());
        }
        catch (Throwable e) {
            logger.error("[LPullProcessor]: put error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), e);
        }
    }

    public BlockingQueue<TaskSnapshot> getCompletedqueue() {
        return this.completedqueue;
    }

    public ExecutableTask getExecutableTask() {
        return this.executableTask;
    }

    public BlockingQueue<TaskSnapshot> getQueue() {
        return this.queue;
    }

    public LongTimeTaskProcessor[] getLongTimeTaskProcessors() {
        return this.longTimeTaskProcessors;
    }

    public AtomicInteger getThreadCounter() {
        return this.threadCounter;
    }

    public LongTimePool getLongTimePool() {
        return this.longTimePool;
    }

    public String toString() {
        return "ExecutorUnit [executableTask=" + this.executableTask + "]";
    }

    public ReFillingProcessor getReFillingProcessor() {
        return this.reFillingProcessor;
    }
}

