package io.jboot.core.mq.aliyunmq;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import io.jboot.Jboot;
import io.jboot.core.cache.ehredis.JbootEhredisCacheImpl;
import io.jboot.core.mq.Jbootmq;
import io.jboot.core.mq.JbootmqBase;
import io.jboot.exception.JbootException;
import io.jboot.utils.StringUtils;
import java.util.Properties;

/* loaded from: input_file:io/jboot/core/mq/aliyunmq/JbootAliyunmqImpl.class */
public class JbootAliyunmqImpl extends JbootmqBase implements Jbootmq, MessageListener {
    private Producer producer;
    private Consumer consumer;

    public void JbootAliyunmq() {
        JbootAliyunmqConfig jbootAliyunmqConfig = (JbootAliyunmqConfig) Jboot.config(JbootAliyunmqConfig.class);
        Properties properties = new Properties();
        properties.put("AccessKey", jbootAliyunmqConfig.getAccessKey());
        properties.put("SecretKey", jbootAliyunmqConfig.getSecretKey());
        properties.put("ProducerId", jbootAliyunmqConfig.getProducerId());
        properties.put("ONSAddr", jbootAliyunmqConfig.getAddr());
        properties.setProperty("SendMsgTimeoutMillis", jbootAliyunmqConfig.getSendMsgTimeoutMillis());
        this.producer = ONSFactory.createProducer(properties);
        this.consumer = ONSFactory.createConsumer(properties);
        String channel = jbootAliyunmqConfig.getChannel();
        if (StringUtils.isBlank(channel)) {
            throw new JbootException("jboot.mq.aliyun.channel config cannot empty in jboot.properties");
        }
        for (String str : channel.split(",")) {
            this.consumer.subscribe(str, "*", this);
        }
        this.consumer.subscribe(JbootEhredisCacheImpl.DEFAULT_NOTIFY_CHANNEL, "*", this);
        this.producer.start();
        this.consumer.start();
    }

    @Override // io.jboot.core.mq.Jbootmq
    public void enqueue(Object obj, String str) {
        throw new RuntimeException("not finished!");
    }

    @Override // io.jboot.core.mq.Jbootmq
    public void publish(Object obj, String str) {
        this.producer.send(new Message(str, "*", Jboot.me().getSerializer().serialize(obj)));
    }

    public Action consume(Message message, ConsumeContext consumeContext) {
        notifyListeners(message.getTopic(), Jboot.me().getSerializer().deserialize(message.getBody()));
        return Action.CommitMessage;
    }
}
