package com.taobao.notify.remotingservice;

import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.gecko.core.util.RemotingUtils;
import com.taobao.gecko.service.Connection;
import com.taobao.gecko.service.RemotingClient;
import com.taobao.gecko.service.RemotingFactory;
import com.taobao.gecko.service.RemotingServer;
import com.taobao.gecko.service.RequestProcessor;
import com.taobao.gecko.service.config.ClientConfig;
import com.taobao.gecko.service.config.ServerConfig;
import com.taobao.gecko.service.config.WireFormatType;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.middleware.logger.Logger;
import com.taobao.notify.client.IOClientSelector;
import com.taobao.notify.client.exception.NotifyClientException;
import com.taobao.notify.client.impl.RandomIOClientSelector;
import com.taobao.notify.client.log.NotifyClientLogger;
import com.taobao.notify.client.manager.ClientSubscriptionManager;
import com.taobao.notify.client.manager.NotifyGroup;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.message.Message;
import com.taobao.notify.remoting.core.command.request.DeliverMessageCommand;
import com.taobao.notify.remoting.core.command.response.MessageAckCommand;
import com.taobao.notify.remoting.service.config.NotifyWireFormatType;
import com.taobao.notify.remotingclient.CheckMessageListener;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.remotingclient.addresses.impl.NSAddressLoadMode;
import com.taobao.notify.remotingclient.processor.DeliverMessageProcessor;
import com.taobao.notify.remotingservice.responsitory.UrlManager;
import com.taobao.notify.subscription.Binding;
import com.taobao.notify.tools.DataIdTools;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.task.TaskManager;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/remotingservice/IntegratedMockRemotingService.class */
public class IntegratedMockRemotingService implements RemotingService {
    /** Prefix built by {@link LoggerPrefix#makeLogPrefix} for this class; prepended to log messages. */
    private static final String LogPrefix = LoggerPrefix.makeLogPrefix(IntegratedMockRemotingService.class);
    /** Logger obtained from {@link NotifyClientLogger#logger()}. */
    static final Logger logger = NotifyClientLogger.logger();
    /** Local server created and started by the constructor; stopped by {@link #closeAllIOClients()}. */
    final RemotingServer server;
    /** Client obtained from {@code RemotingFactory.connect} in the constructor; stopped by {@link #closeAllIOClients()}. */
    final RemotingClient client;
    /** Port handed to the local server's configuration. Stored, but not read anywhere else in this class. */
    final int localServerPort;
    /** Consulted by {@link InnerDeliverMessageProcessor} to decide whether a delivered message has a valid subscription. */
    final ClientSubscriptionManager clientSubscriptionManager;
    /** Single-thread executor that runs the {@link CheckMessageTask} submitted by the constructor. */
    final ExecutorService checkMessageService;
    /** Bindings closed through {@link #closeSubscription(Binding)} and not yet reopened. */
    final Set<Binding> closedBindingSet = new CopyOnWriteArraySet<Binding>();
    /** Subscribers parsed from the descriptors passed to the constructor; only populated there. */
    final Set<RemoteSubscriber> remoteSubscribers = new HashSet<RemoteSubscriber>();
    /**
     * Messages waiting to be processed by {@link CheckMessageTask}. The list is handed to every
     * {@code IntegratedMockIOClient} created by {@link #getIOClient(String, String)}.
     * NOTE(review): presumably that client is what adds entries; the producer is not visible in this file.
     */
    final List<Message> checkMsgList = new CopyOnWriteArrayList<Message>();
    /** Loop flag of {@link CheckMessageTask}; cleared by {@link #closeAllIOClients()} and by {@code createIOClients}. */
    private volatile boolean bRun = true;
    // The thread-pool and connection settings below are only stored and returned by their
    // accessors; nothing in this class reads them to size a pool or open connections.
    private volatile int messageTPCorePoolSize = 1;
    private volatile int messageTPMaximumPoolSize = 1;
    private volatile long messageTPKeepAliveTime = 1000;
    private volatile int checkMessageTPCorePoolSize = 1;
    private volatile int checkMessageTPMaximumPoolSize = 1;
    private volatile long checkMessageTPKeepAliveTime = 1000;
    private volatile int connectionCount = 1;
    /** Written by {@link #setNSAddressLoadMode(NSAddressLoadMode)}; never read in this class. */
    private NSAddressLoadMode loadMode = NSAddressLoadMode.ConfigServer;

    /* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/remotingservice/IntegratedMockRemotingService$CheckMessageTask.class */
    /**
     * Background task that repeatedly walks {@code checkMsgList} and asks the owning group's
     * {@link CheckMessageListener} to resolve each pending message.
     * <p>
     * The loop runs until the enclosing service clears {@code bRun} or the worker thread is
     * interrupted, pausing 20 ms between passes.
     */
    public class CheckMessageTask implements Runnable {
        private final NotifyGroupManager notifyGroupManager;
        private final WrappedIOClient ioclient;

        /**
         * @param notifyGroupManager used to look up the group of each pending message
         * @param remotingClient     client wrapped by the internal {@code IntegratedMockIOClient}
         * @param set                subscribers passed to the internal {@code IntegratedMockIOClient}
         */
        public CheckMessageTask(NotifyGroupManager notifyGroupManager, RemotingClient remotingClient, Set<RemoteSubscriber> set) {
            this.notifyGroupManager = notifyGroupManager;
            // The third argument (the check-message list) is deliberately null for this internal client.
            this.ioclient = new IntegratedMockIOClient(remotingClient, set, null);
        }

        /**
         * Processes every pending check message, sleeps briefly, and repeats until stopped.
         */
        @Override // java.lang.Runnable
        public void run() {
            // Also stop on interruption: sleep() restores the interrupt flag, and without this
            // check the loop would spin on an immediately-failing Thread.sleep.
            while (IntegratedMockRemotingService.this.bRun && !Thread.currentThread().isInterrupted()) {
                // checkMsgList is a CopyOnWriteArrayList, so removing entries while iterating is safe.
                for (Message message : IntegratedMockRemotingService.this.checkMsgList) {
                    processCheckMessage(message);
                }
                sleep();
            }
        }

        /**
         * Resolves a single pending message. A message whose group is unknown stays in the list
         * and is retried on the next pass; every other outcome removes it from the list.
         */
        private void processCheckMessage(Message message) {
            String groupId = message.getGroupId();
            NotifyGroup group = this.notifyGroupManager.getGroup(groupId);
            if (null == group) {
                IntegratedMockRemotingService.logger.warn(IntegratedMockRemotingService.LogPrefix + "没有对应的GroupID：" + groupId);
                return;
            }
            CheckMessageListener checkMsgListener = group.getCheckMsgListener();
            if (null == checkMsgListener) {
                IntegratedMockRemotingService.logger.warn(IntegratedMockRemotingService.LogPrefix + "没有设置CheckMessageListener，GroupID为" + groupId);
            } else {
                MessageStatus messageStatus = new MessageStatus();
                try {
                    checkMsgListener.receiveCheckMessage(message, messageStatus);
                } catch (Throwable th) {
                    IntegratedMockRemotingService.logger.warn(IntegratedMockRemotingService.LogPrefix + "接收到CheckMessage后处理抛异常，请客户自查", th);
                }
                // Unless the listener marked the status rollback-only, the message is committed and re-sent.
                if (!messageStatus.isRollbackOnly()) {
                    message.setCommitted(true);
                    try {
                        this.ioclient.sendWithSync(message, 1000L);
                    } catch (RuntimeException e) {
                        // A failed send must not end the only worker task; log it and drop the
                        // message, as already happens when the (ignored) send result is a failure.
                        IntegratedMockRemotingService.logger.warn(IntegratedMockRemotingService.LogPrefix + "发送CheckMessage处理结果失败", e);
                    }
                }
            }
            IntegratedMockRemotingService.this.checkMsgList.remove(message);
        }

        /** Pauses for 20 ms; on interruption logs a warning and restores the thread's interrupt flag. */
        private void sleep() {
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                IntegratedMockRemotingService.logger.warn(IntegratedMockRemotingService.LogPrefix + "sleep失败");
                Thread.currentThread().interrupt();
            }
        }
    }

    /* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/remotingservice/IntegratedMockRemotingService$InnerDeliverMessageProcessor.class */
    /**
     * Request processor registered on the local server for {@link DeliverMessageCommand}s.
     * It filters incoming messages before delegating to a {@link DeliverMessageProcessor}:
     * messages without a valid subscription, or matching a closed binding, are acknowledged
     * with {@code ResponseStatus.NO_ERROR} and not forwarded.
     */
    public class InnerDeliverMessageProcessor implements RequestProcessor<DeliverMessageCommand> {
        private final Set<Binding> closedBindingSet;
        private final DeliverMessageProcessor deliverMessageProcessor;

        /**
         * @param notifyGroupManager passed to the delegate {@link DeliverMessageProcessor}
         * @param set                bindings whose matching messages are acknowledged and dropped
         */
        public InnerDeliverMessageProcessor(NotifyGroupManager notifyGroupManager, Set<Binding> set) {
            this.closedBindingSet = set;
            this.deliverMessageProcessor = new DeliverMessageProcessor(notifyGroupManager, null);
        }

        /** @return always {@code null}; this processor supplies no executor of its own */
        @Override // com.taobao.gecko.service.RequestProcessor
        public ThreadPoolExecutor getExecutor() {
            return null;
        }

        /** Forwards the command to the delegate processor only when it passes the subscription filter. */
        @Override // com.taobao.gecko.service.RequestProcessor
        public void handleRequest(DeliverMessageCommand deliverMessageCommand, Connection connection) {
            if (!innerCheckRequest(deliverMessageCommand, connection)) {
                return;
            }
            this.deliverMessageProcessor.handleRequest(deliverMessageCommand, connection);
        }

        /**
         * Decides whether the command should reach the delegate processor.
         *
         * @return {@code true} when the message has a valid subscription and matches no closed
         *         binding; {@code false} after the message has been acknowledged and filtered out
         */
        private boolean innerCheckRequest(DeliverMessageCommand deliverMessageCommand, Connection connection) {
            Message message = deliverMessageCommand.getMessage();
            if (!IntegratedMockRemotingService.this.clientSubscriptionManager.isValidSubscription(message)) {
                acknowledge(deliverMessageCommand, connection, message, "有效订阅");
                return false;
            }
            for (Binding closed : this.closedBindingSet) {
                if (closed.match(message)) {
                    acknowledge(deliverMessageCommand, connection, message, "关闭订阅关系");
                    return false;
                }
            }
            return true;
        }

        /**
         * Sends a {@code NO_ERROR} ack for the message; a failure to respond is only logged,
         * using {@code failureLabel} as the log text.
         */
        private void acknowledge(DeliverMessageCommand deliverMessageCommand, Connection connection, Message message, String failureLabel) {
            try {
                connection.response(new MessageAckCommand(deliverMessageCommand, ResponseStatus.NO_ERROR, null, message.getMessageId()));
            } catch (NotifyRemotingException e) {
                IntegratedMockRemotingService.logger.warn(IntegratedMockRemotingService.LogPrefix + failureLabel, e);
            }
        }
    }

    /* loaded from: input_file:lib/notify-tr-client-5.0.4.jar:com/taobao/notify/remotingservice/IntegratedMockRemotingService$RemoteSubscriber.class */
    /**
     * Mutable description of a remote subscriber: the topic and group it subscribes with, and
     * the IP address and port it can be reached at.
     */
    public static class RemoteSubscriber {
        private String topic;
        private String groupId;
        private String ip;
        private int port;

        /**
         * @param str  topic
         * @param str2 group id
         * @param str3 IP address
         * @param i    port
         */
        public RemoteSubscriber(String str, String str2, String str3, int i) {
            this.topic = str;
            this.groupId = str2;
            this.ip = str3;
            this.port = i;
        }

        /**
         * Parses a colon-separated descriptor of the form {@code topic:groupId:ip:port}.
         * Fields after the fourth are ignored.
         *
         * @param str the descriptor to parse
         * @throws NullPointerException     if {@code str} is {@code null}
         * @throws IllegalArgumentException if the descriptor has fewer than four fields
         * @throws NumberFormatException    if the port field is not a valid integer
         */
        public RemoteSubscriber(String str) {
            if (str == null) {
                throw new NullPointerException("subscriber descriptor must not be null");
            }
            // String.split drops trailing empty fields, so "t:g:ip:" yields only three parts.
            String[] split = str.split(":");
            if (split.length < 4) {
                throw new IllegalArgumentException(
                        "Expected subscriber descriptor 'topic:groupId:ip:port' but got: " + str);
            }
            this.topic = split[0];
            this.groupId = split[1];
            this.ip = split[2];
            this.port = Integer.parseInt(split[3]);
        }

        public String getTopic() {
            return this.topic;
        }

        public void setTopic(String str) {
            this.topic = str;
        }

        public String getGroupId() {
            return this.groupId;
        }

        public void setGroupId(String str) {
            this.groupId = str;
        }

        public String getIp() {
            return this.ip;
        }

        public void setIp(String str) {
            this.ip = str;
        }

        public int getPort() {
            return this.port;
        }

        public void setPort(int i) {
            this.port = i;
        }
    }

    /**
     * Starts a local server on the given port, opens a client, connects that client to every
     * remote subscriber described in {@code set}, and starts the background
     * {@link CheckMessageTask}.
     * <p>
     * A failure to start the server, or to connect to an individual subscriber, is logged and
     * construction continues. A failure to create the client aborts construction; the server
     * is stopped first so that it is not left running.
     *
     * @param clientSubscriptionManager used to validate delivered messages
     * @param set                       subscriber descriptors, each parsed by
     *                                  {@link RemoteSubscriber#RemoteSubscriber(String)}
     * @param notifyGroupManager        group lookup for message delivery and check messages
     * @param i                         port of the local server
     * @throws NotifyClientException if the client cannot be created
     */
    public IntegratedMockRemotingService(ClientSubscriptionManager clientSubscriptionManager, Set<String> set, NotifyGroupManager notifyGroupManager, int i) {
        this.clientSubscriptionManager = clientSubscriptionManager;
        this.localServerPort = i;
        ServerConfig serverConfig = new ServerConfig();
        serverConfig.setPort(i);
        this.server = RemotingFactory.newRemotingServer(serverConfig);
        this.server.registerProcessor(DeliverMessageCommand.class, new InnerDeliverMessageProcessor(notifyGroupManager, this.closedBindingSet));
        try {
            this.server.start();
        } catch (NotifyRemotingException e) {
            logger.error("", LogPrefix + "server.start() 错误", e);
        }
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setWireFormatType(new NotifyWireFormatType());
        try {
            this.client = RemotingFactory.connect(clientConfig);
            for (String descriptor : set) {
                RemoteSubscriber remoteSubscriber = new RemoteSubscriber(descriptor);
                this.remoteSubscribers.add(remoteSubscriber);
                String formatServerUrl = RemotingUtils.formatServerUrl(WireFormatType.valueOf("NOTIFY_V1"), remoteSubscriber.getIp(), remoteSubscriber.getPort());
                try {
                    this.client.connect(formatServerUrl, 1);
                } catch (NotifyRemotingException e2) {
                    // One unreachable subscriber must not prevent connecting to the others.
                    logger.error("", LogPrefix + "client.connect(url, 1) 错误, URL为：" + formatServerUrl, e2);
                }
            }
            this.checkMessageService = Executors.newSingleThreadExecutor();
            this.checkMessageService.execute(new CheckMessageTask(notifyGroupManager, this.client, this.remoteSubscribers));
        } catch (NotifyRemotingException e3) {
            logger.error("", LogPrefix + "client.start() 错误", e3);
            // The server was started above and nobody will ever hold a reference to this
            // half-built instance, so stop it here rather than leak it.
            try {
                this.server.stop();
            } catch (NotifyRemotingException stopFailure) {
                logger.error("", LogPrefix + "server.stop() 错误", stopFailure);
            }
            throw new NotifyClientException("Client.start失败", e3);
        }
    }

    /**
     * Shuts the mock service down: clears the {@code bRun} flag so the check-message loop
     * ends, shuts down its executor, then stops the local server and the client.
     * <p>
     * The server and the client are stopped independently, so a failure to stop the server
     * does not leave the client running. Failures are logged, not rethrown.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void closeAllIOClients() {
        this.bRun = false;
        this.checkMessageService.shutdown();
        try {
            this.server.stop();
        } catch (NotifyRemotingException e) {
            logger.error("", LogPrefix + "closeAllIOClients() 错误", e);
        }
        try {
            this.client.stop();
        } catch (NotifyRemotingException e) {
            logger.error("", LogPrefix + "closeAllIOClients() 错误", e);
        }
    }

    /**
     * Identifies this implementation as a mock.
     *
     * @return always {@code true}
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public boolean isMock() {
        return true;
    }

    /**
     * Closes nothing in this mock: all three arguments are ignored and the method only calls
     * {@link #showStatus()}, which has an empty body in this class.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void closeIOClients(String str, String str2, boolean z) {
        showStatus();
    }

    /**
     * Marks a binding as closed. Delivered messages that match a closed binding are
     * acknowledged and dropped by {@link InnerDeliverMessageProcessor}.
     *
     * @param binding the binding to add to the closed set
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void closeSubscription(Binding binding) {
        closedBindingSet.add(binding);
    }

    /**
     * Creates no IO client in this mock: all arguments are ignored and the only effect is
     * clearing the {@code bRun} flag, which ends the {@link CheckMessageTask} loop.
     * <p>
     * NOTE(review): stopping the check-message loop from a "create" call looks unintended
     * (the same flag is cleared by {@link #closeAllIOClients()}); confirm against the
     * original source before relying on or changing this.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void createIOClients(boolean z, String str, NotifyGroup notifyGroup) {
        this.bRun = false;
    }

    /**
     * Returns a copy of the currently closed bindings.
     *
     * @return a new {@link HashSet}; modifying it does not affect this service
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public Set<Binding> getClosedSubscriptions() {
        Set<Binding> snapshot = new HashSet<Binding>(closedBindingSet);
        return snapshot;
    }

    /**
     * @return the value last passed to {@link #setConnectionCount(int)}, or 1 if it was never
     *         called; this class does not use it when opening connections
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getConnectionCount() {
        return connectionCount;
    }

    /**
     * Returns an IO client for the topic encoded in the given data id, provided at least one
     * configured remote subscriber uses that topic.
     *
     * @param str  data id from which the topic is extracted via {@code DataIdTools}
     * @param str2 group id, used only in the error message
     * @return a new {@code IntegratedMockIOClient} sharing this service's client, subscribers
     *         and check-message list
     * @throws RuntimeException if no remote subscriber is registered for the topic
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public WrappedIOClient getIOClient(String str, String str2) throws RuntimeException {
        String topic = DataIdTools.getTopicFromNSDataId(str);
        boolean subscribed = false;
        if (topic != null) {
            for (RemoteSubscriber candidate : remoteSubscribers) {
                if (topic.equals(candidate.getTopic())) {
                    subscribed = true;
                    break;
                }
            }
        }
        if (!subscribed) {
            String reason = "GroupID为：[" + str2 + "]的NotifyManager没有向配置中心订阅TOPIC[" + str + "]对应的NS地址.";
            logger.warn(LogPrefix + reason);
            throw new RuntimeException(reason);
        }
        return new IntegratedMockIOClient(client, remoteSubscribers, checkMsgList);
    }

    /**
     * Counts the remote subscribers whose topic equals the topic encoded in the given data id.
     *
     * @param str data id from which the topic is extracted via {@code DataIdTools}
     * @return the number of matching subscribers; 0 when the extracted topic is {@code null}
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getUrlCount(String str) {
        String topic = DataIdTools.getTopicFromNSDataId(str);
        if (topic == null) {
            return 0;
        }
        int matches = 0;
        for (RemoteSubscriber candidate : remoteSubscribers) {
            if (topic.equals(candidate.getTopic())) {
                matches++;
            }
        }
        return matches;
    }

    /**
     * Builds the server URLs of all remote subscribers whose topic equals the topic encoded
     * in the given data id.
     *
     * @param str data id from which the topic is extracted via {@code DataIdTools}
     * @return a new, possibly empty list of URLs formatted with the {@code NOTIFY_V1} wire format
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public List<String> getUrls(String str) {
        List<String> urls = new LinkedList<String>();
        String topic = DataIdTools.getTopicFromNSDataId(str);
        if (topic == null) {
            return urls;
        }
        for (RemoteSubscriber candidate : remoteSubscribers) {
            if (!topic.equals(candidate.getTopic())) {
                continue;
            }
            String url = RemotingUtils.formatServerUrl(WireFormatType.valueOf("NOTIFY_V1"), candidate.getIp(), candidate.getPort());
            urls.add(url);
        }
        return urls;
    }

    /**
     * Reopens a binding by removing it from the closed set, so matching messages are
     * delivered again.
     *
     * @param binding the binding to remove from the closed set
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void openSubscription(Binding binding) {
        closedBindingSet.remove(binding);
    }

    /** No-op in this mock: the body is empty and all arguments are ignored. */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void sendSubscription(String str, String str2, ClientSubscriptionManager clientSubscriptionManager, boolean z) {
    }

    /**
     * Records the check-message pool core size. The value is only stored for
     * {@link #getCheckMessageTPCorePoolSize()}; this class resizes no thread pool.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setCheckMessageTPCorePoolSize(int i) {
        checkMessageTPCorePoolSize = i;
    }

    /**
     * Records the check-message pool keep-alive time. The value is only stored for
     * {@link #getCheckMessageTPKeepAliveTime()}; this class configures no thread pool with it.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setCheckMessageTPKeepAliveTime(long j) {
        checkMessageTPKeepAliveTime = j;
    }

    /**
     * Records the check-message pool maximum size. The value is only stored for
     * {@link #getCheckMessageTPMaximumPoolSize()}; this class resizes no thread pool.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setCheckMessageTPMaximumPoolSize(int i) {
        checkMessageTPMaximumPoolSize = i;
    }

    /**
     * Records the connection count. The value is only stored for
     * {@link #getConnectionCount()}; the constructor always connects with a literal count of 1.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setConnectionCount(int i) {
        connectionCount = i;
    }

    /**
     * Records the message pool core size. The value is only stored for
     * {@link #getMessageTPCorePoolSize()}; this class resizes no thread pool.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setMessageTPCorePoolSize(int i) {
        messageTPCorePoolSize = i;
    }

    /**
     * Records the message pool keep-alive time. The value is only stored for
     * {@link #getMessageTPKeepAliveTime()}; this class configures no thread pool with it.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setMessageTPKeepAliveTime(long j) {
        messageTPKeepAliveTime = j;
    }

    /**
     * Records the message pool maximum size. The value is only stored for
     * {@link #getMessageTPMaximumPoolSize()}; this class resizes no thread pool.
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setMessageTPMaximumPoolSize(int i) {
        messageTPMaximumPoolSize = i;
    }

    /** @return the stored message pool core size (initially 1) */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getMessageTPCorePoolSize() {
        return messageTPCorePoolSize;
    }

    /** @return the stored message pool maximum size (initially 1) */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getMessageTPMaximumPoolSize() {
        return messageTPMaximumPoolSize;
    }

    /** @return the stored message pool keep-alive time (initially 1000) */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public long getMessageTPKeepAliveTime() {
        return messageTPKeepAliveTime;
    }

    /** @return the stored check-message pool core size (initially 1) */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getCheckMessageTPCorePoolSize() {
        return checkMessageTPCorePoolSize;
    }

    /** @return the stored check-message pool maximum size (initially 1) */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getCheckMessageTPMaximumPoolSize() {
        return checkMessageTPMaximumPoolSize;
    }

    /** @return the stored check-message pool keep-alive time (initially 1000) */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public long getCheckMessageTPKeepAliveTime() {
        return checkMessageTPKeepAliveTime;
    }

    /**
     * @return a freshly created {@link RandomIOClientSelector} on every call; selectors passed
     *         to {@link #setIoClientSelector(IOClientSelector)} are never returned
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public IOClientSelector getIoClientSelector() {
        IOClientSelector selector = new RandomIOClientSelector();
        return selector;
    }

    /** No-op in this mock: the supplied selector is ignored. */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setIoClientSelector(IOClientSelector iOClientSelector) {
    }

    /** No-op in this mock: the body is empty, so nothing is reported. */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void showStatus() {
    }

    /** Returns immediately: the body is empty, so this mock never blocks the caller. */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void awaitReadyInterruptibly() throws InterruptedException {
    }

    /** Returns immediately: the body is empty and the timeout arguments are ignored. */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void awaitReadyInterruptibly(long j, TimeUnit timeUnit) throws InterruptedException {
    }

    /**
     * Records the address load mode. The stored value is not read anywhere in this class.
     *
     * @param nSAddressLoadMode the mode to store
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setNSAddressLoadMode(NSAddressLoadMode nSAddressLoadMode) {
        loadMode = nSAddressLoadMode;
    }

    /** @return always the immutable empty map; this mock tracks no topic-to-group mapping */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public Map<String, Set<String>> getSnapshotOfTopics2Group() {
        return Collections.<String, Set<String>>emptyMap();
    }

    /** @return always the immutable empty map; this mock tracks no topic-to-URL mapping */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public Map<String, List<String>> getSnapshotOfTopics2ServerUrl() {
        return Collections.<String, List<String>>emptyMap();
    }

    /** No-op in this mock: the supplied flag is ignored. */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setUnitSlave(Boolean bool) {
    }

    /**
     * @return always {@code null}; callers must be prepared for the absence of a task manager
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public TaskManager getTaskManager() {
        return null;
    }

    /**
     * @return always {@code null}; the internal {@code client} field is not exposed by this method
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public RemotingClient getRemotingClient() {
        return null;
    }

    /**
     * @return always {@code null}; callers must be prepared for the absence of a URL manager
     */
    @Override // com.taobao.notify.remotingservice.RemotingService
    public UrlManager getUrlManager() {
        return null;
    }
}
