package com.taobao.notify.remotingclient.impl;

import com.taobao.middleware.logger.Logger;
import com.taobao.notify.client.NotifyClient;
import com.taobao.notify.client.impl.MessageInStore4j;
import com.taobao.notify.client.impl.ProcessRegister;
import com.taobao.notify.client.impl.ReliableAsynSendManager;
import com.taobao.notify.client.log.ErrorCode;
import com.taobao.notify.client.log.NotifyClientLogger;
import com.taobao.notify.client.manager.NotifyGroup;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.message.Message;
import com.taobao.notify.message.MessageConverter;
import com.taobao.notify.remotingclient.CheckMessageListener;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.remotingclient.SendResult;
import com.taobao.notify.utils.BytesKey;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.threadpool.ManagedThreadPoolExecutor;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/remotingclient/impl/ReliableAsynTraverseMessageTask.class */
public class ReliableAsynTraverseMessageTask implements Runnable {
    static final Logger logger = NotifyClientLogger.logger();
    private static final String LogPrefix = LoggerPrefix.makeLogPrefix(AsynSendMessageTask.class);
    private final NotifyClient notifyClient;
    private final ReliableAsynSendManager reliableAsynSendManager;
    private final ThreadPoolExecutor asynSendMessageWorkTP;
    private volatile int corePoolSize;
    private volatile int maxPoolSize;
    private volatile long keepAliveTime;
    private volatile int maxQueueSize;
    private AtomicLong lostCount;
    private volatile boolean run;
    private volatile boolean suspend;
    private volatile int lostWaitWeight;
    private volatile NotifyGroupManager notifyGroupManager;

    /* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/remotingclient/impl/ReliableAsynTraverseMessageTask$ReliableAsynSendMessageTask.class */
    private static class ReliableAsynSendMessageTask implements Runnable {
        private final NotifyClient notifyClient;
        private final Message message;
        private final ReliableAsynSendManager reliableAsynSendManager;
        private final ReliableAsynTraverseMessageTask traverseTask;
        private final BytesKey messageIdKey;

        public ReliableAsynSendMessageTask(NotifyClient notifyClient, ReliableAsynSendManager reliableAsynSendManager, Message message, BytesKey bytesKey, ReliableAsynTraverseMessageTask reliableAsynTraverseMessageTask) {
            this.notifyClient = notifyClient;
            this.reliableAsynSendManager = reliableAsynSendManager;
            this.message = message;
            this.traverseTask = reliableAsynTraverseMessageTask;
            this.messageIdKey = bytesKey;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    if (!this.notifyClient.isValidGroup(this.message.getGroupId())) {
                        if (ReliableAsynTraverseMessageTask.logger.isDebugEnabled()) {
                            ReliableAsynTraverseMessageTask.logger.debug(ReliableAsynTraverseMessageTask.LogPrefix + "可靠异步的[同步发送消息]阶段，跳过无效的GroupID[" + this.message.getGroupId() + "]的消息，启动阶段为正常现象");
                        }
                        ProcessRegister.getInstance().unregister(this.messageIdKey);
                        return;
                    }
                    byte[] messageId = this.message.getMessageId();
                    SendResult sendMessage = this.notifyClient.sendMessage(this.message);
                    if (sendMessage.isSuccess()) {
                        this.reliableAsynSendManager.removeMessage(messageId);
                    } else {
                        this.traverseTask.addLostCount();
                        ReliableAsynTraverseMessageTask.logger.error(ErrorCode.NotifyClient_Send_Store4j_Message_Failed.name(), ErrorCode.NotifyClient_Send_Store4j_Message_Failed.toString(), ReliableAsynTraverseMessageTask.LogPrefix + "可靠异步的[同步发送消息]阶段错误，MessageID：" + sendMessage.getMessageId() + "，错误原因：" + sendMessage.getErrorMessage() + "，异常：" + sendMessage.getRuntimeException());
                    }
                    ProcessRegister.getInstance().unregister(this.messageIdKey);
                } catch (Throwable th) {
                    this.traverseTask.addLostCount();
                    ReliableAsynTraverseMessageTask.logger.error(ErrorCode.NotifyClient_Send_Store4j_Message_Failed.name(), ErrorCode.NotifyClient_Send_Store4j_Message_Failed.toString(), "可靠异步的[同步发送消息]阶段错误: ", th);
                    ProcessRegister.getInstance().unregister(this.messageIdKey);
                }
            } catch (Throwable th2) {
                ProcessRegister.getInstance().unregister(this.messageIdKey);
                throw th2;
            }
        }
    }

    public ReliableAsynTraverseMessageTask(NotifyClient notifyClient, NotifyGroupManager notifyGroupManager, ReliableAsynSendManager reliableAsynSendManager) {
        this(notifyClient, notifyGroupManager, reliableAsynSendManager, 5, 10, 1000L, 100);
    }

    public ReliableAsynTraverseMessageTask(NotifyClient notifyClient, NotifyGroupManager notifyGroupManager, ReliableAsynSendManager reliableAsynSendManager, int i, int i2, long j, int i3) {
        this.lostCount = new AtomicLong(0L);
        this.run = true;
        this.suspend = false;
        this.lostWaitWeight = 1;
        this.corePoolSize = i;
        this.maxPoolSize = i2;
        this.keepAliveTime = j;
        this.maxQueueSize = i3;
        if (null == notifyClient) {
            throw new NullPointerException("null == notifyManager");
        }
        if (null == reliableAsynSendManager) {
            throw new NullPointerException("null == reliableAsynSendManager");
        }
        if (null == notifyGroupManager) {
            throw new NullPointerException("null == notifyGroupManager");
        }
        this.reliableAsynSendManager = reliableAsynSendManager;
        this.notifyClient = notifyClient;
        this.notifyGroupManager = notifyGroupManager;
        this.asynSendMessageWorkTP = new ManagedThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveTime, this.maxQueueSize, "reliableAsynSendMsgTP-" + hashCode(), new ThreadPoolExecutor.AbortPolicy());
    }

    public int getWaitCount() {
        int storeSize = this.reliableAsynSendManager.storeSize();
        if (storeSize > this.maxPoolSize) {
            storeSize = this.maxPoolSize;
        }
        if (this.lostCount.get() > storeSize / 2) {
            logger.error("", LogPrefix + "在可靠异步发送中，上个周期异步发送消息发送不出去，休眠" + (10 * this.lostWaitWeight) + "秒，然后再次尝试可靠异步发送消息，未发送的消息存储在本地硬盘，请客户放心，消息是安全的");
            sleep(10000 * this.lostWaitWeight);
            this.lostWaitWeight++;
            if (this.lostWaitWeight > 30) {
                this.lostWaitWeight = 30;
            }
            this.lostCount.set(0L);
        } else {
            this.lostWaitWeight = 1;
        }
        return storeSize;
    }

    public boolean isSuspend() {
        return this.suspend;
    }

    public void setSuspend(boolean z) {
        this.suspend = z;
    }

    /* JADX WARN: Finally extract failed */
    @Override // java.lang.Runnable
    public void run() {
        logger.info(LogPrefix + "进入可靠异步发送消息流程");
        Thread.currentThread().setName("ReliableAsynTraverseMessageTask");
        boolean z = true;
        while (this.run) {
            boolean z2 = false;
            if (this.suspend) {
                if (z) {
                    logger.warn(">>>>>可靠异步发送消息流程被用户暂停");
                    z = false;
                }
                sleep(5000L);
            } else if (null == this.reliableAsynSendManager.getStore()) {
                sleep(1000L);
            } else {
                int waitCount = getWaitCount();
                try {
                    Iterator<byte[]> iterator = this.reliableAsynSendManager.getIterator();
                    while (true) {
                        if (!iterator.hasNext()) {
                            break;
                        }
                        byte[] next = iterator.next();
                        if (waitCount <= 0) {
                            waitCount = getWaitCount();
                        }
                        BytesKey bytesKey = new BytesKey(next);
                        if (ProcessRegister.getInstance().register(bytesKey)) {
                            MessageInStore4j messageInStore4j = this.reliableAsynSendManager.getMessageInStore4j(next);
                            if (null == messageInStore4j) {
                                this.reliableAsynSendManager.removeMessage(next);
                                ProcessRegister.getInstance().unregister(bytesKey);
                                z2 = true;
                                break;
                            }
                            if (messageInStore4j.isEnabledSend()) {
                                try {
                                    Message sendTypeMessage = MessageConverter.toSendTypeMessage(messageInStore4j.getMessage().toProcessType());
                                    if (sendTypeMessage != null) {
                                        try {
                                            try {
                                                this.asynSendMessageWorkTP.execute(new ReliableAsynSendMessageTask(this.notifyClient, this.reliableAsynSendManager, sendTypeMessage, bytesKey, this));
                                                z2 = true;
                                                waitCount--;
                                            } catch (Throwable th) {
                                                int i = waitCount - 1;
                                                throw th;
                                            }
                                        } catch (Exception e) {
                                            logger.warn(LogPrefix + "消息投递过快", e);
                                            ProcessRegister.getInstance().unregister(bytesKey);
                                            sleep(1000L);
                                            z2 = true;
                                            waitCount--;
                                        }
                                    }
                                } catch (Exception e2) {
                                    logger.warn(LogPrefix + "转化MessageInStore4j有误,消息为可投递状态 MsgId:[" + next + PropertyAccessor.PROPERTY_KEY_SUFFIX, e2);
                                    ProcessRegister.getInstance().unregister(bytesKey);
                                    z2 = true;
                                }
                            } else {
                                try {
                                    if (System.currentTimeMillis() - messageInStore4j.getCreateTime() > 10000) {
                                        try {
                                            Message sendTypeMessage2 = MessageConverter.toSendTypeMessage(messageInStore4j.getMessage().toProcessType());
                                            MessageStatus messageStatus = new MessageStatus();
                                            NotifyGroup group = this.notifyGroupManager.getGroup(sendTypeMessage2.getGroupId());
                                            CheckMessageListener checkMessageListener = null;
                                            if (null != group) {
                                                checkMessageListener = group.getCheckMsgListener();
                                            }
                                            if (null != checkMessageListener) {
                                                try {
                                                    checkMessageListener.receiveCheckMessage(sendTypeMessage2, messageStatus);
                                                    if (messageStatus.isRollbackOnly()) {
                                                        logger.warn(LogPrefix + "事务消息的事务没有完成，删除该Message MsgId:[" + next + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                                                        this.reliableAsynSendManager.removeMessage(next);
                                                    } else {
                                                        this.reliableAsynSendManager.commitMessage(sendTypeMessage2, new SendResult());
                                                    }
                                                } catch (Exception e3) {
                                                    logger.error(ErrorCode.NotifyClient_Store4j.name(), ErrorCode.NotifyClient_Store4j.toString(), LogPrefix + "call receive check message error", e3);
                                                }
                                            }
                                        } catch (Exception e4) {
                                            logger.warn(LogPrefix + "转化MessageInStore4j有误,消息为不可投递状态 MsgId:[" + next + PropertyAccessor.PROPERTY_KEY_SUFFIX, e4);
                                            ProcessRegister.getInstance().unregister(bytesKey);
                                        }
                                    }
                                    ProcessRegister.getInstance().unregister(bytesKey);
                                } catch (Throwable th2) {
                                    ProcessRegister.getInstance().unregister(bytesKey);
                                    throw th2;
                                }
                            }
                        }
                    }
                } catch (Throwable th3) {
                    logger.error(ErrorCode.NotifyClient_Store4j.name(), ErrorCode.NotifyClient_Store4j.toString(), LogPrefix + "遍历Store4j出错", th3);
                }
                if (!z2 && this.run) {
                    sleep(1000L);
                }
            }
        }
        logger.warn(LogPrefix, "可靠异步结束时，Store4j中消息的数量：" + this.reliableAsynSendManager.storeSize());
        this.asynSendMessageWorkTP.shutdown();
    }

    private void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            logger.error(ErrorCode.NotifyClient_Store4j.name(), ErrorCode.NotifyClient_Store4j.toString(), LogPrefix + "Thread.sleep()出错", e);
            Thread.currentThread().interrupt();
        }
    }

    public boolean isRun() {
        return this.run;
    }

    public void setRun(boolean z) {
        this.run = z;
    }

    public int getCorePoolSize() {
        return this.corePoolSize;
    }

    public void setCorePoolSize(int i) {
        this.corePoolSize = i;
    }

    public int getMaxPoolSize() {
        return this.maxPoolSize;
    }

    public void setMaxPoolSize(int i) {
        this.maxPoolSize = i;
    }

    public long getKeepAliveTime() {
        return this.keepAliveTime;
    }

    public void setKeepAliveTime(long j) {
        this.keepAliveTime = j;
    }

    public int getMaxQueueSize() {
        return this.maxQueueSize;
    }

    public void setMaxQueueSize(int i) {
        this.maxQueueSize = i;
    }

    public void addLostCount() {
        this.lostCount.incrementAndGet();
    }

    public void clearLostCount() {
        this.lostCount.set(0L);
    }
}
