package com.taobao.notify.client.mock;

import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.middleware.logger.Logger;
import com.taobao.notify.client.NotifyClient;
import com.taobao.notify.client.exception.NotifyClientIllegalArgumentException;
import com.taobao.notify.client.impl.DefaultNotifyClient;
import com.taobao.notify.client.log.NotifyClientLogger;
import com.taobao.notify.client.manager.NotifyGroup;
import com.taobao.notify.common.config.MessageProperties;
import com.taobao.notify.config.SubscriptMsgDetailInfo;
import com.taobao.notify.message.Message;
import com.taobao.notify.message.PackagedMessage;
import com.taobao.notify.remotingclient.CheckMessageListener;
import com.taobao.notify.remotingclient.InnerSendResult;
import com.taobao.notify.remotingclient.MessageListener;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.remotingclient.SendResultType;
import com.taobao.notify.subscription.Binding;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.UniqId;
import com.taobao.notify.utils.threadpool.ManagedThreadPoolExecutor;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ThreadPoolExecutor;

/* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/client/mock/MockNotifyClient.class */
public class MockNotifyClient extends DefaultNotifyClient implements NotifyClient {
    static final Logger logger = NotifyClientLogger.logger();
    private static final String LogPrefix = LoggerPrefix.makeLogPrefix(MockNotifyClient.class);
    private final Set<Binding> closedSubscriptions = new CopyOnWriteArraySet();
    private final ThreadPoolExecutor recvMessageWorkTP = new ManagedThreadPoolExecutor(5, 5, 1000, 10, "recvMsgTP-" + hashCode(), new ThreadPoolExecutor.AbortPolicy());
    private MockNotifyClientControl control = new MockNotifyClientControl(this);
    private int connectionCount = 1;
    private int messageTPCorePoolSize = 1;
    private int messageTPMaximumPoolSize = 1;
    private long messageTPKeepAliveTime = 1000;
    private int checkMessageTPCorePoolSize = 1;
    private int checkMessageTPMaximumPoolSize = 1;
    private long checkMessageTPKeepAliveTime = 1000;

    /* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/client/mock/MockNotifyClient$MockCheckMessageListenerTask.class */
    /**
     * Runnable that, after an optional delay, looks up the group named by the message's group id
     * and hands the message to that group's {@link CheckMessageListener}.
     *
     * <p>Every outcome (missing group, missing listener, rollback, normal completion, listener
     * exception) is only logged at warn level; nothing is reported back to the caller.
     */
    public class MockCheckMessageListenerTask implements Runnable {
        /** Message passed to the check listener. */
        private final Message message;
        /** Milliseconds to sleep before delivering; values {@code <= 0} mean no delay. */
        private final long delayTime;

        /**
         * @param message message to hand to the check listener
         * @param j       delay in milliseconds before the listener is invoked
         */
        public MockCheckMessageListenerTask(Message message, long j) {
            this.message = message;
            this.delayTime = j;
        }

        @Override
        public void run() {
            // Only sleep for positive delays: Thread.sleep rejects negative values with
            // IllegalArgumentException, which previously aborted the task.
            if (this.delayTime > 0) {
                try {
                    Thread.sleep(this.delayTime);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the owning executor can still observe it;
                    // delivery continues as before.
                    Thread.currentThread().interrupt();
                    MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "sleep报异常", e);
                }
            }
            NotifyGroup group = MockNotifyClient.this.notifyGroupManager.getGroup(this.message.getGroupId());
            if (null == group) {
                MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "没有对应的GroupID");
                return;
            }
            CheckMessageListener checkMsgListener = group.getCheckMsgListener();
            if (null == checkMsgListener) {
                MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "没有设置CheckMessageListener");
                return;
            }
            MessageStatus messageStatus = new MessageStatus();
            try {
                checkMsgListener.receiveCheckMessage(this.message, messageStatus);
                if (messageStatus.isRollbackOnly()) {
                    MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "CheckMessage被Rollback");
                } else {
                    MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "CheckMessage正常完成");
                }
            } catch (Exception e2) {
                // A failing listener must not escape the worker thread; just record it.
                MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "call receive check message error", e2);
            }
        }
    }

    /* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/client/mock/MockNotifyClient$MockMessageListenerTask.class */
    /**
     * Runnable that, after an optional delay, delivers a message to the {@link MessageListener}
     * of the group named by the message's target group.
     *
     * <p>The outcome is only logged: {@code NO_ERROR} when the listener completes without
     * requesting rollback, {@code ERROR} when it marks the status rollback-only, and
     * {@code EXCEPTION} when it throws a {@link RuntimeException}.
     */
    public class MockMessageListenerTask implements Runnable {
        /** Message passed to the listener. */
        private final Message message;
        /** Milliseconds to sleep before delivering; values {@code <= 0} mean no delay. */
        private final long delayTime;

        /**
         * @param message message to deliver
         * @param j       delay in milliseconds before the listener is invoked
         */
        public MockMessageListenerTask(Message message, long j) {
            this.message = message;
            this.delayTime = j;
        }

        /**
         * @throws NullPointerException if the message has no target group
         */
        @Override
        public void run() {
            // Only sleep for positive delays: Thread.sleep rejects negative values with
            // IllegalArgumentException, which previously aborted the task.
            if (this.delayTime > 0) {
                try {
                    Thread.sleep(this.delayTime);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the owning executor can still observe it;
                    // delivery continues as before.
                    Thread.currentThread().interrupt();
                    MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "sleep报异常", e);
                }
            }
            String topic = this.message.getTopic();
            String targetGroup = this.message.getTargetGroup();
            if (null == targetGroup) {
                throw new NullPointerException("Null targetGroup. \r\nMessage: \r\n" + this.message.toStringWithoutBody());
            }
            // NOTE(review): the key repeats the topic (the third component was presumably meant to
            // be the message type) and is a String tested against a Set<Binding>, so as written
            // this filter looks unable to match. Behaviour is kept as-is; confirm the intended
            // key before changing it.
            String subscriptionKey = targetGroup + ":" + topic + ":" + topic;
            if (MockNotifyClient.this.closedSubscriptions.contains(subscriptionKey)) {
                MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "由于设置了暂时关闭订阅消息，消息被过滤，不被接收的MessageListener处理");
                return;
            }
            NotifyGroup group = MockNotifyClient.this.notifyGroupManager.getGroup(targetGroup);
            if (null == group) {
                MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "接收端没有对应的TargetGroup");
                return;
            }
            MessageListener msgListener = group.getMsgListener();
            if (null == msgListener) {
                MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "订阅者没有设置MessageListener");
                return;
            }
            MessageStatus messageStatus = new MessageStatus();
            ResponseStatus responseStatus = ResponseStatus.NO_ERROR;
            try {
                msgListener.receiveMessage(this.message, messageStatus);
                if (messageStatus.isRollbackOnly()) {
                    String reason = messageStatus.getReason();
                    if (null == reason || "".equals(reason)) {
                        MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "消息经处理后，被订阅者回滚。但没有给出原因，请设置原因，方便追踪问题!!!");
                    }
                    responseStatus = ResponseStatus.ERROR;
                }
            } catch (RuntimeException e2) {
                MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + "处理消息发生异常", e2);
                responseStatus = ResponseStatus.EXCEPTION;
            } finally {
                // Logged exactly once on every path, including when an Error propagates
                // (in which case the status is whatever had been reached so far).
                MockNotifyClient.logger.warn(MockNotifyClient.LogPrefix + responseStatus);
            }
        }
    }

    /**
     * Registers a new group with the group manager.
     *
     * <p>If a group with the same id is already registered, a warning is logged and the call
     * returns without touching the existing registration. Previously the warning said duplicates
     * were not allowed, yet the group was added regardless.
     *
     * @param str                  group id
     * @param str2                 second argument of {@code NotifyGroup.Builder} (meaning not visible here)
     * @param str3                 third argument of {@code NotifyGroup.Builder} (meaning not visible here)
     * @param messageListener      listener stored on the group for received messages
     * @param checkMessageListener listener stored on the group for check messages
     * @param messageProperties    message properties stored on the group
     * @param j                    value passed to {@code setWaitForConnTime}
     */
    @Override
    public void addGroup(String str, String str2, String str3, MessageListener messageListener, CheckMessageListener checkMessageListener, MessageProperties messageProperties, long j) {
        if (this.notifyGroupManager.containsGroup(str)) {
            logger.warn(LogPrefix + "已添加该组，不允许重复添加, GroupID为：" + str);
            return;
        }
        this.notifyGroupManager.addGroup(new NotifyGroup.Builder(str, str2, str3).setMsgListener(messageListener).setCheckMsgListener(checkMessageListener).setMessageProperties(messageProperties).setWaitForConnTime(j).build());
    }

    /**
     * Removes every registered group via {@link #removeGroup(String)} and then clears the group
     * manager.
     *
     * <p>NOTE(review): the receive thread pool held by this class is not shut down here —
     * confirm whether that is intended.
     */
    @Override
    public void close() {
        // Iterate over a snapshot because removeGroup mutates the manager's id collection.
        for (String groupId : new HashSet<String>(notifyGroupManager.getGroupIds())) {
            removeGroup(groupId);
        }
        notifyGroupManager.clear();
    }

    /**
     * Unregisters a group together with its publish topics and subscription bindings.
     * An unknown group id only produces a warning.
     *
     * @param str group id to remove
     */
    @Override
    public void removeGroup(String str) {
        if (notifyGroupManager.containsGroup(str)) {
            publishTopicsManager.removeTopics(str);
            clientSubscriptionManager.removeBindingByGroup(str);
            notifyGroupManager.removeGroup(str);
        } else {
            logger.warn(LogPrefix + "没有添加过该组，不能删除, GroupID为：" + str);
        }
    }

    /**
     * Stores the check-message pool core size.
     *
     * @param i new value; must be positive
     * @throws NotifyClientIllegalArgumentException if {@code i} is zero or negative
     */
    @Override
    public void setCheckMessageTPCorePoolSize(int i) {
        if (i < 1) {
            throw new NotifyClientIllegalArgumentException("无效的CheckMessageTPCorePoolSize");
        }
        checkMessageTPCorePoolSize = i;
    }

    /**
     * Stores the check-message pool maximum size.
     *
     * @param i new value; must be positive
     * @throws NotifyClientIllegalArgumentException if {@code i} is zero or negative
     */
    @Override
    public void setCheckMessageTPMaximumPoolSize(int i) {
        if (i < 1) {
            throw new NotifyClientIllegalArgumentException("无效的CheckMessageTPMaximumPoolSize");
        }
        checkMessageTPMaximumPoolSize = i;
    }

    /**
     * Stores the check-message pool keep-alive time.
     *
     * @param j new value; must be positive
     * @throws NotifyClientIllegalArgumentException if {@code j} is zero or negative
     */
    @Override
    public void setCheckMessageTPKeepAliveTime(long j) {
        if (j < 1L) {
            throw new NotifyClientIllegalArgumentException("无效的CheckMessageTPKeepAliveTime");
        }
        checkMessageTPKeepAliveTime = j;
    }

    /**
     * Stores the message pool keep-alive time.
     *
     * @param j new value; must be positive
     * @throws NotifyClientIllegalArgumentException if {@code j} is zero or negative
     */
    @Override
    public void setMessageTPKeepAliveTime(long j) {
        if (j < 1L) {
            throw new NotifyClientIllegalArgumentException("无效的MessageTPKeepAliveTime");
        }
        messageTPKeepAliveTime = j;
    }

    /**
     * Stores the message pool core size.
     *
     * @param i new value; must be positive
     * @throws NotifyClientIllegalArgumentException if {@code i} is zero or negative
     */
    @Override
    public void setMessageTPCorePoolSize(int i) {
        if (i < 1) {
            throw new NotifyClientIllegalArgumentException("无效的MessageTPCorePoolSize");
        }
        messageTPCorePoolSize = i;
    }

    /**
     * Stores the message pool maximum size.
     *
     * @param i new value; must be positive
     * @throws NotifyClientIllegalArgumentException if {@code i} is zero or negative
     */
    @Override
    public void setMessageTPMaximumPoolSize(int i) {
        if (i < 1) {
            throw new NotifyClientIllegalArgumentException("无效的MessageTPMaximumPoolSize");
        }
        messageTPMaximumPoolSize = i;
    }

    /** @return the stored check-message pool core size */
    @Override
    public int getCheckMessageTPCorePoolSize() {
        return checkMessageTPCorePoolSize;
    }

    /** @return the stored check-message pool maximum size */
    @Override
    public int getCheckMessageTPMaximumPoolSize() {
        return checkMessageTPMaximumPoolSize;
    }

    /** @return the stored check-message pool keep-alive time */
    @Override
    public long getCheckMessageTPKeepAliveTime() {
        return checkMessageTPKeepAliveTime;
    }

    /** @return the stored message pool keep-alive time */
    @Override
    public long getMessageTPKeepAliveTime() {
        return messageTPKeepAliveTime;
    }

    /** @return the stored message pool core size */
    @Override
    public int getMessageTPCorePoolSize() {
        return messageTPCorePoolSize;
    }

    /** @return the stored message pool maximum size */
    @Override
    public int getMessageTPMaximumPoolSize() {
        return messageTPMaximumPoolSize;
    }

    /**
     * No-op in the mock: this override has an empty body and reports nothing.
     */
    @Override // com.taobao.notify.client.impl.DefaultNotifyClient, com.taobao.notify.client.NotifyClientStub
    public void showStatus() {
        // Intentionally empty.
    }

    /**
     * Adds a publish topic to the topics manager unless the manager already reports the
     * pair as valid.
     *
     * @param str  first argument passed to the topics manager (a group id at the call site in
     *             {@code checkMessage})
     * @param str2 second argument passed to the topics manager (a topic at that call site)
     */
    @Override
    public void addPublishTopic(String str, String str2) {
        if (!publishTopicsManager.isValidTopic(str, str2)) {
            publishTopicsManager.addTopic(str, str2);
        }
    }

    /**
     * Delegates straight to the topics manager's {@code resetTopics}.
     *
     * @param str        first argument forwarded to {@code resetTopics}
     * @param collection topics forwarded to {@code resetTopics}
     */
    @Override
    public void resetPublishTopics(String str, Collection<String> collection) {
        publishTopicsManager.resetTopics(str, collection);
    }

    /**
     * Builds a send result from the outcome supplied by the {@code MockNotifyClientControl}.
     *
     * <p>The control is asked for the next {@link SendResultType}; {@code null} is treated as
     * {@code SUCCESS}. Each known type maps to a fixed error text, and any type not listed in
     * the switch is reported as {@code ERROR}. The result always carries the message id with a
     * {@code null} second value.
     *
     * @param message message whose id (and, for some outcomes, string form) goes into the result
     * @param str     unused by the mock
     * @param str2    unused by the mock
     * @return a freshly built result; never {@code null}
     */
    @Override
    protected InnerSendResult innerSendMessage(Message message, String str, String str2) {
        SendResultType scripted = this.control.peekSendMessageProcessResultInChain();
        if (scripted == null) {
            scripted = SendResultType.SUCCESS;
        }

        // Pick the error text (and, for unlisted types, the substitute type) first;
        // the result object is assembled once below.
        SendResultType reported = scripted;
        String errorText;
        switch (scripted) {
            case SUCCESS:
                // The literal string "null", not a null reference.
                errorText = "null";
                break;
            case TIMEOUT:
                errorText = "发消息时通信异常：发送超时, " + message.toString();
                break;
            case NO_CONNECTION:
                errorText = "发消息时无对应连接" + message.toString();
                break;
            case ERROR:
                errorText = "发消息时错误";
                break;
            case ERROR_COMM:
                errorText = "发消息时通信异常";
                break;
            case EXCEPTION:
                errorText = "发消息时异常";
                break;
            case FORMAT_ERROR:
                errorText = "发消息时格式错误";
                break;
            case NO_PROCESSOR:
                errorText = "没有对应处理器";
                break;
            case THREADPOOL_BUSY:
                errorText = "发消息时线程池繁忙";
                break;
            default:
                errorText = "未知异常";
                reported = SendResultType.ERROR;
                break;
        }

        InnerSendResult outcome = new InnerSendResult();
        outcome.setSuccess(scripted == SendResultType.SUCCESS);
        outcome.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
        outcome.setErrorMessage(errorText);
        outcome.setSendResultType(reported);
        return outcome;
    }

    /**
     * Creates a direct binding and registers it with the subscription manager.
     * The arguments are handed to {@code Binding.direct} in the order
     * {@code (str2, str3, str, i, z)}.
     */
    @Override
    public void subscribeMessage(String str, String str2, String str3, boolean z, int i) {
        Binding directBinding = Binding.direct(str2, str3, str, i, z);
        clientSubscriptionManager.addBinding(directBinding);
    }

    /**
     * Converts the subscription map with the inherited {@code innerSubsrcibeMessages} helper
     * and registers the resulting bindings with the subscription manager.
     *
     * @param str first argument forwarded to {@code innerSubsrcibeMessages}
     * @param map subscription details forwarded to {@code innerSubsrcibeMessages}
     */
    @Override
    public void subscribeMessages(String str, Map<String, Map<String, SubscriptMsgDetailInfo>> map) {
        clientSubscriptionManager.addBindings(innerSubsrcibeMessages(str, map));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules delivery of a message to the subscriber listener on the receive thread pool.
     *
     * @param message message to deliver
     * @param j       delay in milliseconds applied by the task before delivery
     */
    public void receivedMessage(Message message, long j) {
        Runnable delivery = new MockMessageListenerTask(message, j);
        recvMessageWorkTP.execute(delivery);
    }

    /**
     * Schedules delivery of a check message to the publisher's check listener on the receive
     * thread pool.
     *
     * @param message message to deliver
     * @param j       delay in milliseconds applied by the task before delivery
     */
    public void receivedCheckMessage(Message message, long j) {
        Runnable checkDelivery = new MockCheckMessageListenerTask(message, j);
        recvMessageWorkTP.execute(checkDelivery);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces the control object consulted by {@code innerSendMessage} and {@code checkMessage}.
     *
     * @param mockNotifyClientControl new control; not validated here
     */
    public void setControl(MockNotifyClientControl mockNotifyClientControl) {
        control = mockNotifyClientControl;
    }

    /**
     * Registers a single binding with the subscription manager.
     *
     * @param binding binding to add
     */
    @Override
    public void subscribe(Binding binding) {
        clientSubscriptionManager.addBinding(binding);
    }

    /**
     * Registers several bindings, then asks the subscription manager for the binding string of
     * every group those bindings mention. The returned strings are discarded.
     *
     * @param list bindings to add
     */
    @Override
    public void subscribe(List<Binding> list) {
        clientSubscriptionManager.addBindings(list);
        for (String group : getGroupSetFromBindingList(list)) {
            // Result intentionally unused, exactly as in the original flow.
            clientSubscriptionManager.getBindingStringByGroup(group);
        }
    }

    /**
     * Replaces bindings through the subscription manager, then asks it for the binding string
     * of every group mentioned in {@code list}. The returned strings are discarded.
     *
     * @param list bindings handed to {@code replaceBindings}
     * @return whatever {@code replaceBindings} returned
     */
    @Override
    public List<Binding> subscribeWithReplaceBinding(List<Binding> list) {
        List<Binding> replaced = clientSubscriptionManager.replaceBindings(list);
        for (String group : getGroupSetFromBindingList(list)) {
            // Result intentionally unused, exactly as in the original flow.
            clientSubscriptionManager.getBindingStringByGroup(group);
        }
        return replaced;
    }

    /**
     * Collects the distinct group names of the given bindings.
     *
     * @param list bindings to inspect
     * @return a new set containing each binding's {@code getGroup()} value
     */
    private Set<String> getGroupSetFromBindingList(List<Binding> list) {
        Set<String> groups = new HashSet<String>();
        for (Binding binding : list) {
            groups.add(binding.getGroup());
        }
        return groups;
    }

    /** @return the stored connection count */
    @Override
    public int getConnectionCount() {
        return connectionCount;
    }

    /**
     * Stores the connection count. Unlike the pool-size setters, no range check is applied.
     *
     * @param i new connection count
     */
    @Override
    public void setConnectionCount(int i) {
        connectionCount = i;
    }

    /**
     * Validates a message before it is sent.
     *
     * <p>Checks run in this order: topic set, message type set, group id set, a
     * {@link PackagedMessage} is not empty, and the group id is registered. The first failing
     * check produces a failure result. When all pass, the topic is registered as a publish
     * topic for the group if the topics manager does not already consider it valid.
     *
     * @param message message to validate
     * @return {@code null} when the message is acceptable, otherwise a failure result
     */
    @Override
    protected InnerSendResult checkMessage(Message message) {
        String topic = message.getTopic();
        String groupId = message.getGroupId();
        String messageType = message.getMessageType();

        if (topic == null) {
            return mockRejection(message, "没有设置Topic", SendResultType.FORMAT_ERROR);
        }
        if (messageType == null) {
            return mockRejection(message, "没有设置MessageType", SendResultType.FORMAT_ERROR);
        }
        if (groupId == null) {
            return mockRejection(message, "没有设置GroupID", SendResultType.FORMAT_ERROR);
        }
        if (message instanceof PackagedMessage && ((PackagedMessage) message).getMessageListSize() == 0) {
            return mockRejection(message, "不能传输不包含Message的PackagedMessage", SendResultType.ERROR);
        }
        if (!this.notifyGroupManager.containsGroup(groupId)) {
            return mockRejection(message, "设置了无效的GroupID", SendResultType.FORMAT_ERROR);
        }

        if (!this.publishTopicsManager.isValidTopic(groupId, topic)) {
            addPublishTopic(groupId, topic);
        }
        return null;
    }

    /**
     * Logs a validation failure and builds the matching failure result.
     *
     * <p>The control's {@code peekSendMessageProcessResultInChain()} is invoked and its value
     * ignored — presumably to consume the outcome scripted for this send; confirm against
     * {@code MockNotifyClientControl}.
     *
     * @param message rejected message
     * @param reason  text used both in the log line and as the result's error message
     * @param type    result type to report
     * @return the failure result
     */
    private InnerSendResult mockRejection(Message message, String reason, SendResultType type) {
        logger.error("", LogPrefix + reason + message.toExtStringWithoutBody());
        InnerSendResult rejection = new InnerSendResult();
        rejection.clearSendResultMessageInfo();
        rejection.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
        rejection.setSuccess(false);
        rejection.setErrorMessage(reason);
        rejection.setSendResultType(type);
        this.control.peekSendMessageProcessResultInChain();
        return rejection;
    }
}
