package com.taobao.notify.remotingclient.processor;

import com.taobao.gecko.core.command.ResponseCommand;
import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.gecko.core.command.kernel.BooleanAckCommand;
import com.taobao.gecko.service.Connection;
import com.taobao.gecko.service.RequestProcessor;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.middleware.logger.Logger;
import com.taobao.notify.client.log.ErrorCode;
import com.taobao.notify.client.log.NotifyClientLogger;
import com.taobao.notify.client.manager.NotifyGroup;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.message.Message;
import com.taobao.notify.remoting.core.command.request.CheckMessageCommand;
import com.taobao.notify.remoting.core.command.request.NotifyRequestCommand;
import com.taobao.notify.remoting.core.command.response.CheckMessageAckCommand;
import com.taobao.notify.remoting.core.command.response.NotifyBooleanAckCommand;
import com.taobao.notify.remotingclient.CheckMessageListener;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.UniqId;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

/* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/remotingclient/processor/NewCheckMessageProcessor.class */
/**
 * Request processor for {@link CheckMessageCommand}.
 *
 * <p>For each incoming command it looks up the {@link NotifyGroup} named by the message's target
 * group, asks that group's {@link CheckMessageListener} for the message status, and writes the
 * resulting acknowledgement back on the originating {@link Connection}. When the group supplies
 * its own check-message executor the listener is invoked on that executor; otherwise it is invoked
 * on the thread that called {@link #handleRequest}.
 */
public class NewCheckMessageProcessor implements RequestProcessor<CheckMessageCommand> {
    static final Logger logger = NotifyClientLogger.logger();
    private static final String LOG_PREFIX = LoggerPrefix.makeLogPrefix(NewCheckMessageProcessor.class);
    private final NotifyGroupManager notifyGroupManager;
    private final ThreadPoolExecutor checkMessageWorkTP;

    /**
     * @param notifyGroupManager used to resolve the group (and hence listener/executor) of each message
     * @param threadPoolExecutor executor returned from {@link #getExecutor()}; must not be null
     * @throws IllegalArgumentException if {@code threadPoolExecutor} is null
     */
    public NewCheckMessageProcessor(NotifyGroupManager notifyGroupManager, ThreadPoolExecutor threadPoolExecutor) {
        if (null == threadPoolExecutor) {
            throw new IllegalArgumentException("需要为NewCheckMessageProcessor设置工作线程池");
        }
        this.notifyGroupManager = notifyGroupManager;
        this.checkMessageWorkTP = threadPoolExecutor;
    }

    /**
     * @return the executor handed to the constructor
     */
    @Override // com.taobao.gecko.service.RequestProcessor
    public ThreadPoolExecutor getExecutor() {
        return this.checkMessageWorkTP;
    }

    /**
     * Resolves the listener and executor for the message's target group and dispatches the check.
     *
     * <p>If the group is unknown or has no dedicated executor, the check runs synchronously on the
     * calling thread. If the group's executor rejects the task, a {@code THREADPOOL_BUSY}
     * acknowledgement is sent instead.
     *
     * @param checkMessageCommand the incoming check request
     * @param connection          the connection the response is written to
     * @throws NullPointerException if the message carries no target group
     */
    @Override // com.taobao.gecko.service.RequestProcessor
    public void handleRequest(final CheckMessageCommand checkMessageCommand, final Connection connection) {
        final MessageStatus messageStatus = new MessageStatus();
        String targetGroup = checkMessageCommand.getMessage().getTargetGroup();
        final Message processType = checkMessageCommand.getMessage().toProcessType();
        if (targetGroup == null) {
            throw new NullPointerException("Null targetGroup. \r\nMessage: \r\n" + processType.toStringWithoutBody());
        }

        final CheckMessageListener listener;
        final ThreadPoolExecutor groupExecutor;
        NotifyGroup group = this.notifyGroupManager.getGroup(targetGroup);
        if (null != group) {
            listener = group.getCheckMsgListener();
            groupExecutor = group.getCheckMessageWorkTP();
        } else {
            listener = null;
            groupExecutor = null;
        }

        if (groupExecutor == null) {
            // No group-specific pool: run inline on the current thread.
            innerHandleRequest(checkMessageCommand, connection, messageStatus, processType, null, listener);
            return;
        }
        try {
            groupExecutor.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    NewCheckMessageProcessor.this.innerHandleRequest(checkMessageCommand, connection, messageStatus, processType, null, listener);
                }
            });
        } catch (RejectedExecutionException e) {
            responseThreadPoolBusy(checkMessageCommand, connection);
        }
    }

    /**
     * Invokes the listener (if any), converts the resulting {@link MessageStatus} into a response
     * command and sends it on {@code connection}.
     *
     * <ul>
     *   <li>No listener: an {@code UNKNOWN} boolean ack describing the message is sent.</li>
     *   <li>Listener throws an {@link Exception}: an {@code EXCEPTION} boolean ack is sent.</li>
     *   <li>Listener leaves the status null: {@code ROLLBACK} if the status is rollback-only,
     *       otherwise {@code COMMITTED}.</li>
     *   <li>Listener sets a status this method does not recognise and no {@code responseCommand}
     *       was supplied: an {@code UNKNOWN} boolean ack is sent rather than a null response.</li>
     * </ul>
     *
     * @param checkMessageCommand  the request being answered
     * @param connection           the connection the response is written to
     * @param messageStatus        status holder passed to the listener
     * @param message              the message handed to the listener
     * @param responseCommand      initial response; replaced whenever a branch above produces one
     * @param checkMessageListener listener to consult, may be null
     */
    public void innerHandleRequest(CheckMessageCommand checkMessageCommand, Connection connection, MessageStatus messageStatus, Message message, ResponseCommand responseCommand, CheckMessageListener checkMessageListener) {
        if (null != checkMessageListener) {
            try {
                checkMessageListener.receiveCheckMessage(message, messageStatus);
                if (messageStatus.getStatus() != null) {
                    switch (messageStatus.getStatus()) {
                        case COMMITTED:
                            responseCommand = newCheckMessageAckCommand(checkMessageCommand, CheckMessageAckCommand.CheckResult.COMMITTED, message);
                            break;
                        case ROLLBACK:
                            responseCommand = newCheckMessageAckCommand(checkMessageCommand, CheckMessageAckCommand.CheckResult.ROLLBACK, message);
                            break;
                        case NOACTION:
                            responseCommand = newCheckMessageAckCommand(checkMessageCommand, CheckMessageAckCommand.CheckResult.NOACTION, message);
                            break;
                        default:
                            // Without this branch an unrecognised status would leave the response
                            // null and connection.response(null) would be invoked.
                            if (responseCommand == null) {
                                String reason = describeMessage(message).append("] unrecognized check status: [").append(messageStatus.getStatus()).append("]").toString();
                                logger.warn(LOG_PREFIX + reason);
                                responseCommand = new NotifyBooleanAckCommand((NotifyRequestCommand) checkMessageCommand, ResponseStatus.UNKNOWN, reason);
                            }
                            break;
                    }
                } else {
                    CheckMessageAckCommand.CheckResult result = messageStatus.isRollbackOnly() ? CheckMessageAckCommand.CheckResult.ROLLBACK : CheckMessageAckCommand.CheckResult.COMMITTED;
                    responseCommand = newCheckMessageAckCommand(checkMessageCommand, result, message);
                }
            } catch (Exception e) {
                logger.error(ErrorCode.NotifyClient_Send_Check_Message.name(), ErrorCode.NotifyClient_Send_Check_Message.toString(), LOG_PREFIX + "call receive check message error", e);
                responseCommand = new NotifyBooleanAckCommand((NotifyRequestCommand) checkMessageCommand, ResponseStatus.EXCEPTION, "check异常:" + e.getMessage());
            }
        } else {
            String reason = describeMessage(message).append("] 没有对应的CheckMessageListener").toString();
            responseCommand = new NotifyBooleanAckCommand((NotifyRequestCommand) checkMessageCommand, ResponseStatus.UNKNOWN, reason);
            logger.warn(ErrorCode.NotifyClient_Send_No_Check_Message_Listener.name(), ErrorCode.NotifyClient_Send_No_Check_Message_Listener.toString() + reason);
        }
        response(connection, responseCommand);
    }

    /**
     * Builds the {@code "MessageID: [..] Topic: [..] MessageType: [.."} prefix used in diagnostics.
     * The final bracket is intentionally left open so callers can append {@code "] ..."}.
     */
    private static StringBuilder describeMessage(Message message) {
        StringBuilder sb = new StringBuilder();
        sb.append("MessageID: [").append(message.getMessageId() != null ? UniqId.getInstance().bytes2string(message.getMessageId()) : "unknown_id");
        sb.append("] Topic: [").append(message.getTopic());
        sb.append("] MessageType: [").append(message.getMessageType());
        return sb;
    }

    /**
     * Creates a {@code NO_ERROR} check acknowledgement carrying the given result, the message id,
     * the request's server data and the message's post-delay time.
     */
    private ResponseCommand newCheckMessageAckCommand(CheckMessageCommand checkMessageCommand, CheckMessageAckCommand.CheckResult checkResult, Message message) {
        return new CheckMessageAckCommand(checkMessageCommand, ResponseStatus.NO_ERROR, message.getMessageId(), checkResult, checkMessageCommand.getServerData(), message.getPostDelayTime());
    }

    /**
     * Sends {@code responseCommand} on {@code connection}; a {@link NotifyRemotingException} is
     * logged as a warning and not propagated.
     */
    private void response(Connection connection, ResponseCommand responseCommand) {
        try {
            connection.response(responseCommand);
        } catch (NotifyRemotingException e) {
            logger.warn(LOG_PREFIX + "发送CheckMessage的响应消息失败", e);
        }
    }

    /**
     * Replies with a {@code THREADPOOL_BUSY} boolean ack and logs the same text as a warning.
     */
    private void responseThreadPoolBusy(CheckMessageCommand checkMessageCommand, Connection connection) {
        // Build the text once; it is used for both the ack payload and the log line.
        String busyMessage = "groupId:[" + checkMessageCommand.getMessage().toProcessType().getTargetGroup() + "]的checkMessageWorkTP线程池繁忙";
        BooleanAckCommand busyAck = connection.getRemotingContext().getCommandFactory().createBooleanAckCommand(checkMessageCommand.getRequestHeader(), ResponseStatus.THREADPOOL_BUSY, busyMessage);
        logger.warn(busyMessage);
        response(connection, busyAck);
    }
}
