package com.alibaba.dts.client.executor.grid.queue.receive;

import com.alibaba.dts.client.executor.grid.processor.GridTaskProcessor;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/* loaded from: input_file:com/alibaba/dts/client/executor/grid/queue/receive/TaskReceiveHandler.class */
/**
 * Drains a queue of {@link ExecutableTask}s on a dedicated thread and hands each received task
 * to the {@link ExecutorService} registered for its job instance id.
 */
public class TaskReceiveHandler {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger((Class<?>) TaskReceiveHandler.class);

    /** Context used to reach the grid task sender and passed to every {@link GridTaskProcessor}. */
    private final ClientContextImpl clientContext;

    /** Executor lookup keyed by job instance id; the map instance is supplied (and owned) by the caller. */
    private final Map<Long, ExecutorService> executorServiceMap;

    /**
     * @param clientContextImpl context handed to every {@link GridTaskProcessor} created here
     * @param map               job instance id to executor mapping; stored by reference, not copied
     */
    public TaskReceiveHandler(ClientContextImpl clientContextImpl, Map<Long, ExecutorService> map) {
        this.clientContext = clientContextImpl;
        this.executorServiceMap = map;
    }

    /**
     * Starts a new thread that repeatedly takes tasks from {@code blockingQueue} and dispatches them.
     *
     * <p>Any failure while handling a single task is logged and the loop continues with the next
     * task. If the thread is interrupted while waiting on the queue, the interrupt status is
     * restored and the thread exits.
     *
     * @param blockingQueue queue the receiver thread blocks on
     */
    public void listen(final BlockingQueue<ExecutableTask> blockingQueue) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        dispatch(blockingQueue.take());
                    } catch (InterruptedException e) {
                        TaskReceiveHandler.logger.warn("executableTask is interrupted");
                        // Restore the flag and stop: swallowing the interrupt would leave this
                        // thread impossible to shut down.
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Throwable th) {
                        // One bad task must not kill the receiver thread.
                        TaskReceiveHandler.logger.error("ExecutableTask receive process error", th);
                    }
                }
            }
        }, "TaskReceiveHandler-listen").start();
    }

    /**
     * Routes one received task to the executor registered for its job instance.
     *
     * <p>The task is dropped with a warning when it is {@code null}, when the grid task sender
     * reports its job instance as interrupted, or when no usable executor is registered for it.
     */
    private void dispatch(ExecutableTask executableTask) {
        // Must be checked before any dereference below.
        if (executableTask == null) {
            logger.warn("executableTask is null");
            return;
        }

        long jobInstanceId = executableTask.getJobInstanceSnapshot().getId();
        if (clientContext.getGridTaskSender().isInterruptedInstance(jobInstanceId)) {
            logger.warn("[TaskReceiveHandler]: force interrupt:,jobId:" + executableTask.getJob().getId() + ",jobInstanceId:" + jobInstanceId + ",total tasks:" + executableTask.getTaskSnapshotList().size());
            return;
        }

        ExecutorService executorService = executorServiceMap.get(Long.valueOf(jobInstanceId));
        // isShutdown() alone is sufficient: an executor is never terminated without having been shut down.
        if (executorService == null || executorService.isShutdown()) {
            logger.warn("executorservice is null or terminated, {}", executableTask);
            return;
        }

        TaskSnapshot singleSnapshot = executableTask.getTaskSnapshot();
        if (singleSnapshot != null) {
            executorService.submit(new GridTaskProcessor(clientContext, executableTask, singleSnapshot));
            return;
        }

        // No single snapshot set: submit one processor per listed snapshot, stamping each with
        // the send node address carried by the enclosing task.
        for (TaskSnapshot snapshot : executableTask.getTaskSnapshotList()) {
            snapshot.setSendNodeAddress(executableTask.getSendNodeAddress());
            executorService.submit(new GridTaskProcessor(clientContext, executableTask, snapshot));
        }
    }
}
