package io.jboot.core.mq.zbus;

import com.google.common.collect.Maps;
import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.core.cache.ehredis.JbootEhredisCacheImpl;
import io.jboot.core.mq.Jbootmq;
import io.jboot.core.mq.JbootmqBase;
import io.jboot.exception.JbootException;
import io.jboot.utils.StringUtils;
import io.zbus.mq.Broker;
import io.zbus.mq.ConsumeGroup;
import io.zbus.mq.Consumer;
import io.zbus.mq.ConsumerConfig;
import io.zbus.mq.Message;
import io.zbus.mq.MessageHandler;
import io.zbus.mq.MqClient;
import io.zbus.mq.Producer;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/* loaded from: input_file:io/jboot/core/mq/zbus/JbootZbusmqImpl.class */
/**
 * {@link Jbootmq} implementation backed by a zbus {@link Broker}.
 *
 * <p>On construction it starts one {@link Consumer} per configured channel
 * (plus the ehredis notify channel) and one per configured queue, all of
 * which deliver to {@link #handle(Message, MqClient)}. Outgoing messages are
 * sent through per-topic {@link Producer} instances that are created on first
 * use and cached.
 */
public class JbootZbusmqImpl extends JbootmqBase implements Jbootmq, MessageHandler {
    private static final Log LOG = Log.getLog(JbootZbusmqImpl.class);

    /** Broker shared by every consumer and producer created by this instance. */
    private final Broker broker;

    /** Topic name to cached producer; filled lazily by {@link #getProducer(String)}. */
    private final ConcurrentMap<String, Producer> producerMap = Maps.newConcurrentMap();

    /**
     * Reads {@link JbootZbusmqConfig}, connects to the configured broker and
     * starts the channel and queue consumers.
     *
     * <p>A consumer that fails to start is logged and skipped; the remaining
     * consumers are still started.
     *
     * @throws JbootException if the channel configuration is blank
     */
    public JbootZbusmqImpl() {
        JbootZbusmqConfig config = (JbootZbusmqConfig) Jboot.config(JbootZbusmqConfig.class);
        String channel = config.getChannel();
        if (StringUtils.isBlank(channel)) {
            throw new JbootException("channel config cannot empty in jboot.properties");
        }

        // The ehredis notify channel is always subscribed in addition to the configured ones.
        // NOTE(review): the literal below presumably equals DEFAULT_NOTIFY_CHANNEL — confirm.
        String channels = channel.endsWith(",")
                ? channel + JbootEhredisCacheImpl.DEFAULT_NOTIFY_CHANNEL
                : channel + ",jboot_ehredis_channel";

        this.broker = new Broker(config.getBroker());

        // Note: "this" is registered as the message handler before the constructor returns.
        for (String topic : channels.split(",")) {
            startZbusConsumer(topic, true);
        }

        String queue = config.getQueue();
        if (!StringUtils.isBlank(queue)) {
            for (String topic : queue.split(",")) {
                startZbusConsumer(topic, false);
            }
        }
    }

    /**
     * Starts a consumer on one topic, with this instance as its message handler.
     *
     * @param rawTopic  topic name as taken from the comma-separated configuration;
     *                  surrounding whitespace is trimmed and blank entries are ignored
     * @param broadcast when {@code true} the consumer uses the consume group returned by
     *                  {@link ConsumeGroup#createTempBroadcastGroup()}; when {@code false}
     *                  no consume group is set on the {@link ConsumerConfig}
     */
    private void startZbusConsumer(String rawTopic, boolean broadcast) {
        if (StringUtils.isBlank(rawTopic)) {
            // e.g. "a,,b" or "a, ,b" in the configuration: nothing to subscribe to.
            return;
        }
        String topic = rawTopic.trim();

        ConsumerConfig consumerConfig = new ConsumerConfig(this.broker);
        consumerConfig.setTopic(topic);
        consumerConfig.setMessageHandler(this);
        if (broadcast) {
            consumerConfig.setConsumeGroup(ConsumeGroup.createTempBroadcastGroup());
        }

        try {
            new Consumer(consumerConfig).start();
        } catch (IOException e) {
            // Log through the application logger (as publish/getProducer do) and keep going,
            // so one bad topic does not prevent the others from being consumed.
            LOG.error("failed to start zbus consumer for topic: " + topic, e);
        }
    }

    /**
     * Sends {@code obj} to the topic {@code str}; delegates to {@link #publish(Object, String)}.
     *
     * @param obj payload to serialize and send
     * @param str topic name
     */
    @Override // io.jboot.core.mq.Jbootmq
    public void enqueue(Object obj, String str) {
        publish(obj, str);
    }

    /**
     * Serializes {@code obj} with the Jboot serializer and publishes it to the topic
     * {@code str}. A failure while publishing is logged and not rethrown.
     *
     * @param obj payload to serialize and send
     * @param str topic name
     */
    @Override // io.jboot.core.mq.Jbootmq
    public void publish(Object obj, String str) {
        Producer producer = getProducer(str);
        Message message = new Message();
        message.setTopic(str);
        message.setBody(Jboot.me().getSerializer().serialize(obj));
        try {
            producer.publish(message);
        } catch (Exception e) {
            LOG.error(e.toString(), e);
        }
    }

    /**
     * Returns the cached producer for the topic {@code str}, creating it and declaring
     * the topic on first use. A failure while declaring the topic is logged and the
     * producer is still cached and returned.
     *
     * @param str topic name
     * @return the producer associated with {@code str}
     */
    public Producer getProducer(String str) {
        Producer cached = this.producerMap.get(str);
        if (cached != null) {
            return cached;
        }

        Producer created = new Producer(this.broker);
        try {
            created.declareTopic(str);
        } catch (Exception e) {
            LOG.error(e.toString(), e);
        }

        // Atomic insert: if another thread registered a producer for this topic in the
        // meantime, use that one so every caller shares a single instance per topic.
        Producer raced = this.producerMap.putIfAbsent(str, created);
        return raced != null ? raced : created;
    }

    /**
     * Message callback for every consumer started by this instance: deserializes the
     * message body with the Jboot serializer and passes it, with the message's topic,
     * to {@code notifyListeners}.
     *
     * @param message  the received message
     * @param mqClient the client the message arrived on (unused)
     * @throws IOException declared by the handler contract
     */
    @Override // io.zbus.mq.MessageHandler
    public void handle(Message message, MqClient mqClient) throws IOException {
        notifyListeners(message.getTopic(), Jboot.me().getSerializer().deserialize(message.getBody()));
    }
}
