/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.rocketmq;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import org.apache.dubbo.remoting.buffer.DynamicChannelBuffer;
import org.apache.dubbo.remoting.buffer.HeapChannelBuffer;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProtocolServer;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.rocketmq.RocketMQChannel;
import org.apache.dubbo.rpc.rocketmq.RocketMQExporter;
import org.apache.dubbo.rpc.rocketmq.RocketMQInvoker;
import org.apache.dubbo.rpc.rocketmq.RocketMQProtocolConstant;
import org.apache.dubbo.rpc.rocketmq.RocketMQProtocolServer;
import org.apache.dubbo.rpc.rocketmq.codec.RocketMQCountCodec;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMQProtocol
extends AbstractProtocol {
    public static final String NAME = "rocketmq";
    public static final int DEFAULT_PORT = 20880;

    public static RocketMQProtocol getDubboProtocol(ScopeModel scopeModel) {
        return (RocketMQProtocol)((Object)scopeModel.getExtensionLoader(Protocol.class).getExtension(NAME, false));
    }

    public int getDefaultPort() {
        return 9876;
    }

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        RocketMQProtocolServer rocketMQProtocolServer;
        URL url = invoker.getUrl();
        RocketMQExporter<T> exporter = new RocketMQExporter<T>(invoker, url, this.exporterMap);
        String topic = exporter.getKey();
        try {
            rocketMQProtocolServer = this.openServer(url, "provider");
        }
        catch (Exception e) {
            String exeptionInfo = String.format("create rocketmq client fail, url is %s , topic is %s, cause is %s", url, topic, e.getMessage());
            this.logger.error(exeptionInfo, (Throwable)e);
            throw new RpcException(exeptionInfo, (Throwable)e);
        }
        try {
            String groupModel = url.getParameter("groupModel");
            if (Objects.nonNull(groupModel) && Objects.equals(groupModel, "select")) {
                rocketMQProtocolServer.getDefaultMQPushConsumer().subscribe(topic, this.createMessageSelector(url));
            } else {
                rocketMQProtocolServer.getDefaultMQPushConsumer().subscribe(topic, "*");
            }
            return exporter;
        }
        catch (Exception e) {
            String exeptionInfo = String.format("topic subscirbe fail, topic is %s, cause is %s", topic, e.getMessage());
            this.logger.error(exeptionInfo, (Throwable)e);
            throw new RpcException(exeptionInfo, (Throwable)e);
        }
    }

    private MessageSelector createMessageSelector(URL url) {
        if (Objects.isNull(url.getParameter("group")) && Objects.isNull(url.getParameter("version"))) {
            throw new RuntimeException("group and version is not null");
        }
        StringBuffer stringBuffer = new StringBuffer();
        boolean isGroup = false;
        if (Objects.nonNull(url.getParameter("group"))) {
            stringBuffer.append("group").append("=").append(url.getParameter("group"));
            isGroup = true;
        }
        if (Objects.nonNull(url.getParameter("version"))) {
            if (isGroup) {
                stringBuffer.append(" and ");
            }
            stringBuffer.append("version").append("=").append(url.getParameter("version"));
        }
        return MessageSelector.bySql((String)stringBuffer.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private RocketMQProtocolServer openServer(URL url, String model) {
        String key = url.getAddress();
        ProtocolServer server = (ProtocolServer)this.serverMap.get(key);
        if (server == null) {
            RocketMQProtocol rocketMQProtocol = this;
            synchronized (rocketMQProtocol) {
                server = (ProtocolServer)this.serverMap.get(key);
                if (server == null) {
                    this.serverMap.put(key, this.createServer(url, key, model));
                }
                server = (ProtocolServer)this.serverMap.get(key);
                RocketMQProtocolServer rocketMQProtocolServer = (RocketMQProtocolServer)server;
                return rocketMQProtocolServer;
            }
        }
        return (RocketMQProtocolServer)server;
    }

    private ProtocolServer createServer(URL url, String key, String model) {
        RocketMQProtocolServer rocketMQProtocolServer = new RocketMQProtocolServer();
        rocketMQProtocolServer.setModel(model);
        DubboMessageListenerConcurrently dubboMessageListenerConcurrently = new DubboMessageListenerConcurrently();
        dubboMessageListenerConcurrently.defaultMQProducer = rocketMQProtocolServer.getDefaultMQProducer();
        dubboMessageListenerConcurrently.rocketMQProtocolServer = rocketMQProtocolServer;
        rocketMQProtocolServer.setMessageListenerConcurrently(dubboMessageListenerConcurrently);
        rocketMQProtocolServer.reset(url);
        return rocketMQProtocolServer;
    }

    protected <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException {
        try {
            RocketMQProtocolServer rocketMQProtocolServer = this.openServer(url, "consumer");
            RocketMQInvoker<T> rocketMQInvoker = new RocketMQInvoker<T>(type, url, rocketMQProtocolServer);
            return rocketMQInvoker;
        }
        catch (Exception e) {
            String exceptionInfo = String.format("protocol binding refer fail, url is %s , cause is %s ", url, e.getMessage());
            this.logger.error(exceptionInfo, (Throwable)e);
            throw new RpcException(exceptionInfo, (Throwable)e);
        }
    }

    private class DubboMessageListenerConcurrently
    implements MessageListenerConcurrently {
        private RocketMQCountCodec rocketmqCountCodec = new RocketMQCountCodec(FrameworkModel.defaultModel());
        private DefaultMQProducer defaultMQProducer;
        private RocketMQProtocolServer rocketMQProtocolServer;

        private DubboMessageListenerConcurrently() {
        }

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (final MessageExt messageExt : msgs) {
                this.rocketMQProtocolServer.getExecutorService().submit(new Runnable(){

                    @Override
                    public void run() {
                        DubboMessageListenerConcurrently.this.execute(messageExt);
                    }
                });
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        private void execute(MessageExt messageExt) {
            RpcContext.getContext().setRemoteAddress(messageExt.getUserProperty("send_address"), 9876);
            String urlString = messageExt.getUserProperty("url_string");
            URL url = URL.valueOf((String)urlString);
            RocketMQChannel channel = new RocketMQChannel();
            channel.setRemoteAddress(RpcContext.getContext().getRemoteAddress());
            channel.setUrl(url);
            channel.setUrlString(urlString);
            channel.setMessageExt(messageExt);
            channel.setDefaultMQProducer(this.defaultMQProducer);
            channel.setRocketMQCountCodec(this.rocketmqCountCodec);
            Response response = this.invoke(messageExt, channel, url);
            if (Objects.isNull(response)) {
                return;
            }
            ChannelBuffer buffer = this.createChannelBuffer(channel, response, url);
            if (Objects.isNull(buffer)) {
                return;
            }
            this.sendMessage(messageExt, buffer, url, urlString);
        }

        private Response invoke(MessageExt messageExt, Channel channel, URL url) {
            Response response = new Response();
            try {
                String timeoutString = messageExt.getUserProperty("timeout");
                Long timeout = Long.valueOf(timeoutString);
                if (RocketMQProtocol.this.logger.isDebugEnabled()) {
                    RocketMQProtocol.this.logger.debug(String.format("reply message ext is : %s", messageExt));
                }
                if (Objects.isNull(messageExt.getProperty("CLUSTER"))) {
                    MQClientException exception = new MQClientException(10007, "create reply message fail, requestMessage error, property[CLUSTER] is null.");
                    response.setErrorMessage(exception.getMessage());
                    response.setStatus((byte)40);
                    RocketMQProtocol.this.logger.error((Throwable)exception);
                } else {
                    HeapChannelBuffer heapChannelBuffer = new HeapChannelBuffer(messageExt.getBody());
                    Object object = this.rocketmqCountCodec.decode(channel, (ChannelBuffer)heapChannelBuffer);
                    String topic = messageExt.getTopic();
                    Invocation inv = (Invocation)((Request)object).getData();
                    if (timeout < System.currentTimeMillis()) {
                        RocketMQProtocol.this.logger.warn(String.format("message timeoute time is %d invocation is %s ", timeout, inv));
                        return null;
                    }
                    Invoker invoker = ((Exporter)RocketMQProtocol.this.exporterMap.get(topic)).getInvoker();
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    Result result = invoker.invoke(inv);
                    response.setStatus((byte)20);
                    response.setResult((Object)result);
                }
            }
            catch (Exception e) {
                String exceptionInfo = String.format("data decode or invoke fail, url is %s cause is %s", url, e.getMessage());
                response.setErrorMessage(exceptionInfo);
                response.setStatus((byte)40);
                RocketMQProtocol.this.logger.error(exceptionInfo, (Throwable)e);
            }
            return response;
        }

        private ChannelBuffer createChannelBuffer(Channel channel, Response response, URL url) {
            DynamicChannelBuffer buffer = new DynamicChannelBuffer(2048);
            try {
                this.rocketmqCountCodec.encode(channel, (ChannelBuffer)buffer, response);
            }
            catch (Exception e) {
                String exceptionInfo = String.format("encode fail, url is %s cause is %s", url, e.getMessage());
                response.setErrorMessage(exceptionInfo);
                response.setStatus((byte)40);
                RocketMQProtocol.this.logger.error(exceptionInfo, (Throwable)e);
                try {
                    buffer = new DynamicChannelBuffer(2048);
                    this.rocketmqCountCodec.encode(channel, (ChannelBuffer)buffer, response);
                }
                catch (IOException e1) {
                    String exceptionInfo1 = String.format("encode exception response fail, url is %s cause is %s", url, e.getMessage());
                    RocketMQProtocol.this.logger.error(exceptionInfo1, (Throwable)e1);
                    buffer = null;
                }
            }
            return buffer;
        }

        private boolean sendMessage(MessageExt messageExt, ChannelBuffer buffer, URL url, String urlString) {
            try {
                Message newMessage = MessageUtil.createReplyMessage((Message)messageExt, (byte[])buffer.array());
                newMessage.putUserProperty("send_address", RocketMQProtocolConstant.LOCAL_ADDRESS.getHostString());
                newMessage.putUserProperty("url_string", urlString);
                SendResult sendResult = this.defaultMQProducer.send(newMessage, 3000L);
                if (RocketMQProtocol.this.logger.isDebugEnabled()) {
                    RocketMQProtocol.this.logger.debug(String.format("send result is : %s", sendResult));
                }
                return true;
            }
            catch (Exception e) {
                String exceptionInfo = String.format("send response fail, url is %s cause is %s", url, e.getMessage());
                RocketMQProtocol.this.logger.error(exceptionInfo, (Throwable)e);
                return false;
            }
        }
    }
}

