package com.aliyun.openservices.ons.api.impl.notify;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ExpressionType;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.MessageSelector;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.impl.notify.util.NotifyMessageConvertUtil;
import com.aliyun.openservices.ons.api.impl.rocketmq.ONSClientAbstract;
import com.taobao.notify.common.config.MessageProperties;
import com.taobao.notify.config.AllNotifyClientProperties;
import com.taobao.notify.config.NotifyClientConfig;
import com.taobao.notify.message.Message;
import com.taobao.notify.remotingclient.DefaultNotifyManager;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.remotingclient.NotifyManager;
import com.taobao.notify.subscription.Binding;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;

/* loaded from: input_file:lib/ons-sdk-1.8.0-EagleEye.jar:com/aliyun/openservices/ons/api/impl/notify/ConsumerImpl.class */
/**
 * {@link Consumer} implementation that delegates subscriptions to a Notify
 * {@link NotifyManager}.
 *
 * <p>The manager is created lazily on the first successful call to
 * {@link #subscribeNotify(String, String, boolean, MessageListener)}. Only one
 * listener is kept per topic; a second subscription for an already registered
 * topic is ignored. {@link #unsubscribe(String)} is not supported.
 */
public class ConsumerImpl extends ONSClientAbstract implements Consumer {
    /** Sub-expression that selects every tag of a topic. */
    private static final String TAG_WILDCARDS = "*";
    /** Literal separator between tags inside a sub-expression. */
    private static final String TAG_SEPARATOR = "||";
    /** Regular-expression form of {@link #TAG_SEPARATOR}, for {@link String#split(String)}. */
    private static final String TAG_SEPARATOR_REGEX = "\\|\\|";

    /** All bindings created so far; the whole list is handed to the manager on every subscribe. */
    List<Binding> bindingList;
    /** Listener registered for each topic, looked up when a message is delivered. */
    private final ConcurrentHashMap<String, MessageListener> topicListenerMap;
    /** Single context instance passed to every listener invocation. */
    private final ConsumeContext consumeContext;
    /** Guards lazy creation of {@link #notifyManager} against a concurrent {@link #shutdown()}. */
    private final Object managerLock = new Object();
    /** Lazily created manager; read and written only while holding {@link #managerLock}. */
    private NotifyManager notifyManager;
    private final AtomicBoolean closed;
    private final AtomicBoolean started;

    /**
     * Creates a consumer configured by {@code properties}.
     *
     * @param properties client configuration, forwarded to {@link ONSClientAbstract}
     */
    public ConsumerImpl(Properties properties) {
        super(properties);
        this.bindingList = new CopyOnWriteArrayList<>();
        this.topicListenerMap = new ConcurrentHashMap<>();
        this.consumeContext = new ConsumeContext();
        this.closed = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
    }

    /** Intentionally a no-op in this implementation. */
    @Override
    protected void updateNameServerAddr(String str) {
    }

    /** Marks this consumer as started; calling it again has no further effect. */
    @Override
    public void start() {
        this.started.compareAndSet(false, true);
    }

    /**
     * Marks this consumer as closed and closes the underlying manager if one was created.
     * Only the first call has any effect.
     */
    @Override
    public void shutdown() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        NotifyManager manager;
        synchronized (this.managerLock) {
            manager = this.notifyManager;
        }
        // Close outside the lock so foreign code never runs while we hold it.
        if (manager != null) {
            manager.close();
        }
    }

    /**
     * Subscribes {@code messageListener} to {@code str} using the tag expression {@code str2}.
     *
     * @param str             topic name
     * @param str2            tag expression: {@code "*"} or tags separated by {@code "||"}
     * @param messageListener listener invoked for delivered messages
     * @throws ONSClientException if an argument is null, the client is closed, or no group id is configured
     */
    @Override
    public void subscribe(String str, String str2, MessageListener messageListener) {
        subscribeNotify(str, str2, true, messageListener);
    }

    /**
     * Subscribes {@code messageListener} to {@code str} using a selector. A {@code null}
     * selector subscribes to all tags; otherwise only tag selectors are accepted.
     *
     * @param str             topic name
     * @param messageSelector selector, or {@code null} for all tags
     * @param messageListener listener invoked for delivered messages
     * @throws ONSClientException if the selector type is null or is not {@link ExpressionType#TAG},
     *                            or for any reason listed on
     *                            {@link #subscribeNotify(String, String, boolean, MessageListener)}
     */
    @Override
    public void subscribe(String str, MessageSelector messageSelector, MessageListener messageListener) {
        if (messageSelector == null) {
            subscribeNotify(str, TAG_WILDCARDS, true, messageListener);
            return;
        }
        if (messageSelector.getType() == null) {
            throw new ONSClientException("Expression type is null");
        }
        if (!messageSelector.getType().equals(ExpressionType.TAG)) {
            throw new ONSClientException("Notify only support tag expression!");
        }
        subscribeNotify(str, messageSelector.getSubExpression(), true, messageListener);
    }

    /**
     * Registers {@code messageListener} for topic {@code str} and subscribes the matching bindings.
     *
     * <p>If the topic already has a listener the call returns without changing anything.
     * The manager is created before the listener is recorded, so a failed creation does
     * not leave the topic registered and the call can be retried.
     *
     * @param str             topic name
     * @param str2            tag expression: {@code "*"} for a fanout binding, otherwise one
     *                        direct binding per {@code "||"}-separated tag
     * @param z               flag forwarded unchanged as the last boolean argument of
     *                        {@code Binding.fanout} / {@code Binding.direct}
     * @param messageListener listener invoked for delivered messages
     * @throws ONSClientException if an argument is null, the client is closed, or no group id is configured
     */
    public void subscribeNotify(String str, String str2, boolean z, MessageListener messageListener) {
        if (null == str) {
            throw new ONSClientException("topic is null");
        }
        if (null == str2) {
            throw new ONSClientException("subExpression is null");
        }
        if (null == messageListener) {
            throw new ONSClientException("listener is null");
        }
        if (this.closed.get()) {
            throw new ONSClientException("Client 已关闭,请检查...");
        }
        if (this.topicListenerMap.containsKey(str)) {
            return;
        }
        // Build the manager first: if this throws, nothing has been registered yet.
        NotifyManager manager = ensureNotifyManager();
        // Atomic check-and-register; a concurrent subscriber of the same topic wins at most once.
        if (this.topicListenerMap.putIfAbsent(str, messageListener) != null) {
            return;
        }
        String group = consumerGroup();
        if (StringUtils.equals(TAG_WILDCARDS, str2.trim())) {
            this.bindingList.add(Binding.fanout(str, group, -1, z));
        } else {
            String[] tags = str2.contains(TAG_SEPARATOR) ? str2.split(TAG_SEPARATOR_REGEX) : new String[]{str2};
            for (String tag : tags) {
                this.bindingList.add(Binding.direct(str, tag.trim(), group, -1, z, "center"));
            }
        }
        manager.subscribe(this.bindingList);
    }

    /** Returns the configured group id, falling back to the legacy consumer id property. */
    private String consumerGroup() {
        return this.properties.getProperty(PropertyKeyConst.GROUP_ID,
                this.properties.getProperty(PropertyKeyConst.ConsumerId));
    }

    /**
     * Returns the manager, creating it on first use. Creation is serialized with
     * {@link #shutdown()} so a manager is never created after the client was closed.
     */
    private NotifyManager ensureNotifyManager() {
        synchronized (this.managerLock) {
            if (this.closed.get()) {
                throw new ONSClientException("Client 已关闭,请检查...");
            }
            if (this.notifyManager == null) {
                this.notifyManager = buildNotifyManager();
            }
            return this.notifyManager;
        }
    }

    /**
     * Builds and configures a new manager for the configured group.
     *
     * @return the newly built manager
     * @throws ONSClientException    if neither a group id nor a consumer id is configured
     * @throws NumberFormatException if the consume-thread-count property is not an integer
     */
    private NotifyManager buildNotifyManager() {
        String group = consumerGroup();
        // Validate before touching system properties so a missing id leaves no trace.
        if (StringUtils.isBlank(group)) {
            throw new ONSClientException("GROUP_ID or ConsumerID is null.");
        }
        // Parse the optional thread count up front so a bad value fails before a manager exists.
        Integer consumeThreads = null;
        if (this.properties.containsKey(PropertyKeyConst.ConsumeThreadNums)) {
            consumeThreads = Integer.valueOf(this.properties.get(PropertyKeyConst.ConsumeThreadNums).toString().trim());
        }
        System.setProperty("notify_check_" + group, "");

        AllNotifyClientProperties allNotifyClientProperties = new AllNotifyClientProperties();
        NotifyClientConfig notifyClientConfig = new NotifyClientConfig();
        allNotifyClientProperties.setMessageProperties(new MessageProperties());
        DefaultNotifyManager.Builder builder = new DefaultNotifyManager.Builder(group, "aliyun-notify-name", "aliyun-notify-desc");
        builder.setMsgListener(new com.taobao.notify.remotingclient.MessageListener() {
            public void receiveMessage(Message message, MessageStatus messageStatus) {
                MessageListener messageListener = ConsumerImpl.this.topicListenerMap.get(message.getTopic());
                if (messageListener == null) {
                    // No listener for this topic: roll back instead of acknowledging.
                    messageStatus.setRollbackOnly();
                    messageStatus.setReason(" no message Listener");
                    System.out.print(" 没有设置 topic " + message.getTopic() + " 的 message Listener\n");
                    return;
                }
                try {
                    Action action = messageListener.consume(
                            NotifyMessageConvertUtil.convert2ONSMessage(message), ConsumerImpl.this.consumeContext);
                    if (action == Action.ReconsumeLater) {
                        messageStatus.setRollbackOnly();
                    }
                } catch (Throwable th) {
                    // Roll back, then rethrow wrapped with the original failure kept as the cause.
                    messageStatus.setRollbackOnly();
                    ONSClientException wrapped = new ONSClientException(th.getMessage());
                    wrapped.initCause(th);
                    throw wrapped;
                }
            }
        });
        NotifyManager build = builder.setProperties(allNotifyClientProperties).setNotifyClientConfig(notifyClientConfig).build();
        build.getNotifyClient().setNSAddressLoadMode(notifyClientConfig.getNSAddressLoadMode());
        build.getNotifyClientConfig().copyFrom(notifyClientConfig);
        build.setConnectionCount(notifyClientConfig.getConnectionCount());
        build.setMaxRetry(notifyClientConfig.getMaxRetry());
        if (consumeThreads != null) {
            // The same value is used for both the core and the maximum pool size.
            build.setMessageTPCorePoolSize(consumeThreads.intValue());
            build.setMessageTPMaximumPoolSize(consumeThreads.intValue());
        }
        return build;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void unsubscribe(String str) {
        throw new UnsupportedOperationException("Notify not support unsubscribe method.");
    }

    /** Returns whether {@link #start()} has been called. */
    @Override
    public boolean isStarted() {
        return this.started.get();
    }

    /** Returns whether {@link #shutdown()} has been called. */
    @Override
    public boolean isClosed() {
        return this.closed.get();
    }
}
