package com.alibaba.dts.client.executor.grid.queue.send;

import com.alibaba.dts.client.executor.grid.queue.TaskEvent;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.job.context.JobContextImpl;
import com.alibaba.dts.client.store.access.TaskSnapshotAccess;
import com.alibaba.dts.common.context.InvocationContext;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.result.ResultCode;
import com.alibaba.dts.common.domain.store.ExecutionCounter;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.service.NodeServerService;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/alibaba/dts/client/executor/grid/queue/send/TaskSender.class */
public class TaskSender implements Runnable {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger((Class<?>) TaskSender.class);
    private ClientContextImpl clientContext;
    private SendManager sendManager;
    private TaskSnapshotAccess taskSnapshotDao;

    public TaskSender(ClientContextImpl clientContextImpl, SendManager sendManager) {
        this.sendManager = sendManager;
        this.clientContext = clientContextImpl;
        this.taskSnapshotDao = clientContextImpl.getStore().getTaskSnapshotDao();
    }

    @Override // java.lang.Runnable
    public void run() {
        BlockingQueue<TaskEvent> sendQueue = this.sendManager.getSendQueue();
        while (true) {
            try {
                TaskEvent take = sendQueue.take();
                long id = take.getExecutableTask().getJobInstanceSnapshot().getId();
                if (this.sendManager.isInterruptedJobInstance(take.getExecutableTask().getJobInstanceSnapshot().getId())) {
                    logger.debug("job instance interrupted, jobId=" + take.getExecutableTask().getJob().getId() + ", jobInstanceId=" + id);
                } else {
                    ExecutableTask executableTask = take.getExecutableTask();
                    if (executableTask != null) {
                        RemoteMachine targetMachine = take.getTargetMachine();
                        targetMachine.setTimeout(30000L);
                        InvocationContext.setRemoteMachine(targetMachine);
                        NodeServerService nodeServerService = this.clientContext.getNodeServerService();
                        executableTask.setSendNodeAddress(this.clientContext.getNodeConfig().getLocalAddress());
                        Result<Boolean> result = null;
                        try {
                            result = nodeServerService.receiveTasks(executableTask);
                            logger.debug("[TaskSender] send task,targetMachine:" + take.getTargetMachine() + ",result:" + result + ",jobID:" + executableTask.getJob().getId() + ",jobInstanceID:" + executableTask.getJobInstanceSnapshot().getId() + ",compensation:" + executableTask.isCompensation() + ",total tasks:" + executableTask.getTaskSnapshotList().size());
                        } catch (Throwable th) {
                            logger.error("[TaskSender]:task send error,,jobID:" + executableTask.getJob().getId() + ",jobInstanceID:" + executableTask.getJobInstanceSnapshot().getId(), th);
                        }
                        if (result != null) {
                            try {
                            } catch (Throwable th2) {
                                logger.error("[TaskSender] process updateStatusBatch error,", th2);
                            }
                            if (result.getResultCode() == ResultCode.SUCCESS) {
                                String remoteAddress = take.getTargetMachine().getRemoteAddress();
                                this.taskSnapshotDao.updateReceiveNodeBatch(executableTask.getTaskSnapshotList(), remoteAddress);
                                if (!executableTask.isCompensation()) {
                                    this.taskSnapshotDao.updateStatus2QueueIfStatusIsInitBatch(executableTask.getTaskSnapshotList());
                                }
                                ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> concurrentHashMap = this.clientContext.getExecutionCounterTable().get(Long.valueOf(id));
                                if (concurrentHashMap == null) {
                                    concurrentHashMap = new ConcurrentHashMap<>();
                                    ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> putIfAbsent = this.clientContext.getExecutionCounterTable().putIfAbsent(Long.valueOf(id), concurrentHashMap);
                                    if (putIfAbsent != null) {
                                        concurrentHashMap = putIfAbsent;
                                    }
                                }
                                ConcurrentHashMap<String, ExecutionCounter> concurrentHashMap2 = concurrentHashMap.get(remoteAddress);
                                if (concurrentHashMap2 == null) {
                                    concurrentHashMap2 = new ConcurrentHashMap<>();
                                    ConcurrentHashMap<String, ExecutionCounter> putIfAbsent2 = concurrentHashMap.putIfAbsent(remoteAddress, concurrentHashMap2);
                                    if (putIfAbsent2 != null) {
                                        concurrentHashMap2 = putIfAbsent2;
                                    }
                                }
                                List<TaskSnapshot> taskSnapshotList = executableTask.getTaskSnapshotList();
                                if (taskSnapshotList == null || taskSnapshotList.isEmpty()) {
                                    return;
                                }
                                Iterator<TaskSnapshot> it = taskSnapshotList.iterator();
                                while (it.hasNext()) {
                                    String taskName = it.next().getTaskName();
                                    ExecutionCounter executionCounter = concurrentHashMap2.get(taskName);
                                    if (executionCounter == null) {
                                        executionCounter = new ExecutionCounter();
                                        executionCounter.setReceiveNode(remoteAddress);
                                        executionCounter.setTaskName(taskName);
                                        ExecutionCounter putIfAbsent3 = concurrentHashMap2.putIfAbsent(taskName, executionCounter);
                                        if (putIfAbsent3 != null) {
                                            executionCounter = putIfAbsent3;
                                        }
                                    }
                                    executionCounter.getTotalCounter().getAndIncrement();
                                    executionCounter.getQueuedCounter().getAndIncrement();
                                }
                            }
                        }
                        if (result == null) {
                            reSendTasks(take);
                        } else if (result.getResultCode() == ResultCode.NODE_RECEIVE_QUEUE_NOT_AVAILABLE) {
                            reSendTasks(take);
                        }
                    } else {
                        logger.warn("[TaskSender] executableTask is nulltargetMachine:" + take.getTargetMachine());
                    }
                }
            } catch (Throwable th3) {
                logger.error("[TaskSender] process error,", th3);
            }
        }
    }

    private long getCountQueued() {
        long j = 0;
        Iterator<ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>>> it = this.clientContext.getExecutionCounterTable().values().iterator();
        while (it.hasNext()) {
            Iterator<ConcurrentHashMap<String, ExecutionCounter>> it2 = it.next().values().iterator();
            while (it2.hasNext()) {
                Iterator<ExecutionCounter> it3 = it2.next().values().iterator();
                while (it3.hasNext()) {
                    j += it3.next().getQueuedCounter().get();
                }
            }
        }
        return j;
    }

    private void reSendTasks(TaskEvent taskEvent) {
        final ExecutableTask executableTask = taskEvent.getExecutableTask();
        if (this.clientContext.getGridTaskSender().isInterruptedInstance(executableTask.getJobInstanceSnapshot().getId())) {
            logger.warn("[TaskSender]: reSendTasks force interrupt:,jobId:" + executableTask.getJob().getId() + ",jobInstanceId:" + executableTask.getJobInstanceSnapshot().getId() + ",total tasks:" + executableTask.getTaskSnapshotList().size());
            return;
        }
        final JobContextImpl jobContextImpl = new JobContextImpl();
        jobContextImpl.setJob(executableTask.getJob());
        jobContextImpl.setJobInstanceSnapshot(executableTask.getJobInstanceSnapshot());
        this.clientContext.getGridTaskSender().getReSendExecutorService().submit(new Runnable() { // from class: com.alibaba.dts.client.executor.grid.queue.send.TaskSender.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    Result<Boolean> dispatchRetryTaskList = TaskSender.this.clientContext.getGridTaskSender().dispatchRetryTaskList(executableTask.getTaskSnapshotList(), jobContextImpl);
                    if (dispatchRetryTaskList != null && (dispatchRetryTaskList.getData().booleanValue() || dispatchRetryTaskList.getResultCode() == ResultCode.TASK_SEND_INTERRUPT)) {
                        return;
                    } else {
                        try {
                            TimeUnit.SECONDS.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    }
                }
            }
        });
        logger.warn("[TaskSender] retry send tasks, previous receiveNodeAddress: " + taskEvent.getTargetMachine().getRemoteAddress() + ", jobId:" + executableTask.getJob().getId() + " ,jobInstanceId:" + executableTask.getJobInstanceSnapshot().getId() + " ,total tasks:" + executableTask.getTaskSnapshotList().size());
    }
}
