/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.dubbo;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.serialize.support.SerializableClassRegistry;
import org.apache.dubbo.common.serialize.support.SerializationOptimizer;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.Transporter;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProtocolServer;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.protocol.dubbo.DubboExporter;
import org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker;
import org.apache.dubbo.rpc.protocol.dubbo.DubboProtocolServer;
import org.apache.dubbo.rpc.protocol.dubbo.LazyConnectExchangeClient;
import org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient;

/**
 * {@link Protocol} implementation registered under the extension name {@value #NAME}.
 *
 * <p>Exporting an invoker opens (or resets) an exchange server for the URL's address and routes
 * incoming {@link Invocation}s to the matching {@link DubboExporter}. Referring a service creates a
 * {@link DubboInvoker} backed either by dedicated exchange clients or by reference-counted clients
 * shared per remote address.
 */
public class DubboProtocol extends AbstractProtocol {
    /** Extension name of this protocol; also used as the {@code codec} URL parameter value. */
    public static final String NAME = "dubbo";
    /** Port reported by {@link #getDefaultPort()}. */
    public static final int DEFAULT_PORT = 20880;
    /** Attachment key set by {@link #getInvoker} to mark an invocation as a callback-service call. */
    private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
    /**
     * Marker stored in {@link #referenceClientMap} while one thread is (re)building the shared
     * clients of an address; other threads wait on the map until it is replaced.
     */
    private static final Object PENDING_OBJECT = new Object();
    /**
     * Shared clients per remote address. A value is either a
     * {@code List<ReferenceCountExchangeClient>} or {@link #PENDING_OBJECT}.
     */
    private final Map<String, Object> referenceClientMap = new ConcurrentHashMap<String, Object>();
    /** Class names of serialization optimizers whose classes have already been registered. */
    private final Set<String> optimizers = new ConcurrentHashSet<String>();
    /** Set once by {@link #destroy()}; checked before exporting or referring. */
    private final AtomicBoolean destroyed = new AtomicBoolean();
    /** Handler passed to every server bind and client connect performed by this protocol. */
    private final ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        /**
         * Resolves the invoker for the incoming invocation and invokes it.
         *
         * @return the invocation result as a future, or {@code null} when a callback invocation
         *         names a method that is not listed in the invoker URL's {@code methods} parameter
         * @throws RemotingException if the message is not an {@link Invocation} or no exporter matches
         */
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (!(message instanceof Invocation)) {
                throw new RemotingException((Channel) channel, "Unsupported request: " + (message == null ? null : message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
            inv.setServiceModel(invoker.getUrl().getServiceModel());
            // Run the invocation with the class loader of the target service, when one is known.
            if (invoker.getUrl().getServiceModel() != null) {
                Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
            }
            // getInvoker() flags callback invocations; those are only allowed for declared methods.
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(DubboProtocol.IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    for (String method : methodsStr.split(",")) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    DubboProtocol.this.logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

        /** Dispatches invocations to {@link #reply}; everything else goes to the adapter default. */
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                this.reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        /** Fires the method named by the channel URL's {@code onconnect} parameter, if any. */
        @Override
        public void connected(Channel channel) throws RemotingException {
            this.invoke(channel, "onconnect");
        }

        /** Fires the method named by the channel URL's {@code ondisconnect} parameter, if any. */
        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (DubboProtocol.this.logger.isDebugEnabled()) {
                DubboProtocol.this.logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            this.invoke(channel, "ondisconnect");
        }

        /**
         * Builds an event invocation for {@code methodKey} and feeds it through {@link #received}.
         * Failures are logged and never propagated to the caller.
         */
        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = this.createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation == null) {
                return;
            }
            try {
                this.received(channel, invocation);
            } catch (Throwable t) {
                DubboProtocol.this.logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
            }
        }

        /**
         * Creates a no-argument invocation of the method configured under {@code methodKey}.
         *
         * @return the invocation, or {@code null} when the URL has no value for {@code methodKey}
         */
        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(url.getServiceModel(), method, url.getParameter("interface"), "", new Class[0], new Object[0]);
            // These attachments are what getInvoker() reads to rebuild the service key.
            invocation.setAttachment("path", url.getPath());
            invocation.setAttachment("group", url.getGroup());
            invocation.setAttachment("interface", url.getParameter("interface"));
            invocation.setAttachment("version", url.getVersion());
            if (url.getParameter("dubbo.stub.event", false)) {
                invocation.setAttachment("dubbo.stub.event", Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

    /**
     * Looks up the {@value #NAME} protocol through the static extension loader.
     *
     * @deprecated {@link #getDubboProtocol(ScopeModel)} performs the same lookup against an
     *             explicit scope model.
     */
    @Deprecated
    public static DubboProtocol getDubboProtocol() {
        return (DubboProtocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(NAME, false);
    }

    /** Looks up the {@value #NAME} protocol through the extension loader of {@code scopeModel}. */
    public static DubboProtocol getDubboProtocol(ScopeModel scopeModel) {
        return (DubboProtocol) scopeModel.getExtensionLoader(Protocol.class).getExtension(NAME, false);
    }

    /** Returns a read-only view of the currently registered exporters. */
    @Override
    public Collection<Exporter<?>> getExporters() {
        return Collections.unmodifiableCollection(this.exporterMap.values());
    }

    /**
     * Tells whether the channel's remote address equals the address in the channel's URL, i.e.
     * whether this end of the channel is the one that connected to that URL.
     */
    private boolean isClientSide(Channel channel) {
        InetSocketAddress address = channel.getRemoteAddress();
        URL url = channel.getUrl();
        return url.getPort() == address.getPort()
                && NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
    }

    /**
     * Finds the invoker exported for an incoming invocation.
     *
     * <p>The service key is built from a port, the {@code path}, {@code version} and {@code group}
     * attachments. The port is the channel's local port, or its remote port for stub events. For
     * callback invocations (client side and not a stub event) the path is suffixed with the
     * {@code callback.service.instid} attachment and the invocation is flagged as a callback.
     *
     * @throws RemotingException if no exporter is registered under the computed service key
     */
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        int port = channel.getLocalAddress().getPort();
        String path = (String) inv.getObjectAttachments().get("path");

        boolean isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get("dubbo.stub.event"));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }

        boolean isCallBackServiceInvoke = this.isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path = path + "." + inv.getObjectAttachments().get("callback.service.instid");
            inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }

        String serviceKey = DubboProtocol.serviceKey(port, path, (String) inv.getObjectAttachments().get("version"), (String) inv.getObjectAttachments().get("group"));
        DubboExporter exporter = (DubboExporter) this.exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + this.getInvocationWithoutData(inv));
        }
        return exporter.getInvoker();
    }

    /** Returns a read-only view of the invokers created by this protocol. */
    public Collection<Invoker<?>> getInvokers() {
        return Collections.unmodifiableCollection(this.invokers);
    }

    /** Returns {@link #DEFAULT_PORT}. */
    @Override
    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    /**
     * Exports {@code invoker}: creates its exporter, makes sure a server is open for the URL's
     * address and applies the URL's serialization optimizer, if any.
     *
     * @throws RpcException if the server cannot be started or the optimizer cannot be loaded
     * @throws IllegalStateException if this protocol has been destroyed
     */
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        this.checkDestroyed();
        URL url = invoker.getUrl();

        String key = DubboProtocol.serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, this.exporterMap);

        // Stub events are enabled but no stub methods are configured: warn, do not fail.
        boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
        boolean isCallbackService = url.getParameter("is_callback_service", false);
        if (isStubSupportEvent && !isCallbackService) {
            String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
            if ((stubServiceMethods == null || stubServiceMethods.length() == 0) && this.logger.isWarnEnabled()) {
                this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
            }
        }

        this.openServer(url);
        this.optimizeSerialization(url);
        return exporter;
    }

    /**
     * Ensures a server exists for the URL's address when the {@code isserver} parameter is true
     * (the default). A new server is created at most once per address under this object's lock;
     * an already existing server is reset with {@code url} instead.
     */
    private void openServer(URL url) {
        this.checkDestroyed();
        String key = url.getAddress();
        boolean isServer = url.getParameter("isserver", true);
        if (!isServer) {
            return;
        }
        ProtocolServer server = (ProtocolServer) this.serverMap.get(key);
        if (server != null) {
            server.reset(url);
            return;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have created the server meanwhile.
            server = (ProtocolServer) this.serverMap.get(key);
            if (server == null) {
                this.serverMap.put(key, this.createServer(url));
            } else {
                server.reset(url);
            }
        }
    }

    /** @throws IllegalStateException if {@link #destroy()} has already run */
    private void checkDestroyed() {
        if (this.destroyed.get()) {
            throw new IllegalStateException(this.getClass().getSimpleName() + " is destroyed");
        }
    }

    /**
     * Binds a new exchange server for {@code url}.
     *
     * <p>The URL is first completed with {@code channel.readonly.sent=true} and a 60000
     * {@code heartbeat} (both only if absent) and {@code codec=dubbo}. The {@code server} and
     * {@code client} transporter types are validated before binding, so an invalid configuration
     * never leaves a bound server behind.
     *
     * @throws RpcException if a transporter type is unsupported or binding fails
     */
    private ProtocolServer createServer(URL url) {
        url = URLBuilder.from(url)
                .addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString())
                .addParameterIfAbsent("heartbeat", String.valueOf(60000))
                .addParameter("codec", NAME)
                .build();

        String serverType = url.getParameter("server", "netty");
        if (StringUtils.isNotEmpty(serverType) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(serverType)) {
            throw new RpcException("Unsupported server type: " + serverType + ", url: " + url);
        }

        String clientType = url.getParameter("client");
        if (StringUtils.isNotEmpty(clientType)) {
            Set<String> supportedTypes = url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(clientType)) {
                throw new RpcException("Unsupported client type: " + clientType);
            }
        }

        ExchangeServer server;
        try {
            server = Exchangers.bind(url, this.requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), (Throwable) e);
        }

        DubboProtocolServer protocolServer = new DubboProtocolServer(server);
        this.loadServerProperties(protocolServer);
        return protocolServer;
    }

    /**
     * Loads the class named by the URL's {@code optimizer} parameter and registers every class it
     * reports with {@link SerializableClassRegistry}. Does nothing when the parameter is empty or
     * the optimizer has already been applied.
     *
     * @throws RpcException if the class cannot be found, is not a {@link SerializationOptimizer},
     *                      or cannot be instantiated
     */
    private void optimizeSerialization(URL url) throws RpcException {
        String className = url.getParameter("optimizer", "");
        if (StringUtils.isEmpty(className) || this.optimizers.contains(className)) {
            return;
        }
        this.logger.info("Optimizing the serialization process for Kryo, FST, etc...");
        try {
            Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
            if (!SerializationOptimizer.class.isAssignableFrom(clazz)) {
                throw new RpcException("The serialization optimizer " + className + " isn't an instance of " + SerializationOptimizer.class.getName());
            }
            SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance();
            Collection<Class<?>> serializableClasses = optimizer.getSerializableClasses();
            if (serializableClasses == null) {
                return;
            }
            for (Class<?> c : serializableClasses) {
                SerializableClassRegistry.registerClass(c);
            }
            this.optimizers.add(className);
        } catch (ClassNotFoundException e) {
            throw new RpcException("Cannot find the serialization optimizer class: " + className, (Throwable) e);
        } catch (IllegalAccessException | InstantiationException e) {
            throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, (Throwable) e);
        }
    }

    /**
     * Creates an invoker for the remote service described by {@code url}.
     *
     * @throws IllegalStateException if this protocol has been destroyed
     */
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        this.checkDestroyed();
        return this.protocolBindingRefer(type, url);
    }

    /**
     * Applies the URL's serialization optimizer, obtains the clients for {@code url} and wraps
     * them in a {@link DubboInvoker} that is recorded in the protocol's invoker set.
     */
    @Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        this.checkDestroyed();
        this.optimizeSerialization(url);
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, this.getClients(url), this.invokers);
        this.invokers.add(invoker);
        return invoker;
    }

    /**
     * Returns the exchange clients an invoker for {@code url} should use.
     *
     * <p>With {@code connections == 0} (the default) the clients shared per remote address are
     * used; the requested share size comes from the URL's {@code shareconnections} parameter or,
     * when blank, from the {@code shareconnections} configuration property (default {@code "1"}).
     * The returned array holds exactly the shared clients that were handed out, so it always
     * matches the clients whose reference counts were taken. Otherwise {@code connections}
     * dedicated clients are created.
     */
    private ExchangeClient[] getClients(URL url) {
        int connections = url.getParameter("connections", 0);
        if (connections == 0) {
            String shareConnectionsStr = url.getParameter("shareconnections", (String) null);
            int shareConnections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr)
                    ? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), "shareconnections", "1")
                    : shareConnectionsStr);
            List<ReferenceCountExchangeClient> shareClients = this.getSharedClient(url, shareConnections);
            // Size by the list actually returned: an existing shared list may be larger or smaller
            // than requested, and a request <= 0 still yields at least one shared client.
            return shareClients.toArray(new ExchangeClient[0]);
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; ++i) {
            clients[i] = this.initClient(url);
        }
        return clients;
    }

    /**
     * Returns the shared clients for the URL's address, creating or repairing them if needed,
     * with each client's reference count incremented for the caller.
     *
     * <p>Only one thread builds clients for an address at a time: it parks
     * {@link #PENDING_OBJECT} in {@link #referenceClientMap}, connects outside the lock, then
     * publishes the result (or removes the marker on failure) and wakes up waiting threads.
     * If the calling thread is interrupted while waiting, waiting continues and the interrupt
     * status is restored before this method returns.
     *
     * @param connectNum number of clients to create for a new list; values below 1 are treated as 1
     */
    @SuppressWarnings("unchecked") // values of referenceClientMap are either PENDING_OBJECT or a client list
    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
        String key = url.getAddress();

        // Fast path without the lock: a usable list is already published.
        Object clients = this.referenceClientMap.get(key);
        if (clients instanceof List) {
            List<ReferenceCountExchangeClient> cached = (List<ReferenceCountExchangeClient>) clients;
            if (this.checkClientCanUse(cached)) {
                this.batchClientRefIncr(cached);
                return cached;
            }
        }

        List<ReferenceCountExchangeClient> typedClients = null;
        boolean interrupted = false;
        try {
            synchronized (this.referenceClientMap) {
                while (true) {
                    clients = this.referenceClientMap.get(key);
                    if (clients instanceof List) {
                        typedClients = (List<ReferenceCountExchangeClient>) clients;
                        if (this.checkClientCanUse(typedClients)) {
                            this.batchClientRefIncr(typedClients);
                            return typedClients;
                        }
                        // Stale list: claim the slot and repair it below.
                        this.referenceClientMap.put(key, PENDING_OBJECT);
                        break;
                    }
                    if (clients != PENDING_OBJECT) {
                        // Nothing there yet: claim the slot and build below.
                        this.referenceClientMap.put(key, PENDING_OBJECT);
                        break;
                    }
                    try {
                        this.referenceClientMap.wait();
                    } catch (InterruptedException e) {
                        // Keep waiting; re-asserting the flag here would make wait() throw at once.
                        interrupted = true;
                    }
                }
            }

            try {
                connectNum = Math.max(connectNum, 1);
                if (CollectionUtils.isEmpty(typedClients)) {
                    typedClients = this.buildReferenceCountExchangeClientList(url, connectNum);
                } else {
                    for (int i = 0; i < typedClients.size(); ++i) {
                        ReferenceCountExchangeClient existing = typedClients.get(i);
                        if (existing == null || existing.isClosed()) {
                            // Replace unusable entries with a fresh client.
                            typedClients.set(i, this.buildReferenceCountExchangeClient(url));
                        } else {
                            existing.incrementAndGetCount();
                        }
                    }
                }
            } finally {
                // Always release the PENDING marker, even when connecting failed.
                synchronized (this.referenceClientMap) {
                    if (typedClients == null) {
                        this.referenceClientMap.remove(key);
                    } else {
                        this.referenceClientMap.put(key, typedClients);
                    }
                    this.referenceClientMap.notifyAll();
                }
            }
            return typedClients;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns {@code true} only if the list is non-empty and every client is non-null, has a
     * positive reference count and is not closed.
     */
    private boolean checkClientCanUse(List<ReferenceCountExchangeClient> referenceCountExchangeClients) {
        if (CollectionUtils.isEmpty(referenceCountExchangeClients)) {
            return false;
        }
        for (ReferenceCountExchangeClient client : referenceCountExchangeClients) {
            if (client == null || client.getCount() <= 0 || client.isClosed()) {
                return false;
            }
        }
        return true;
    }

    /** Increments the reference count of every non-null client in the list. */
    private void batchClientRefIncr(List<ReferenceCountExchangeClient> referenceCountExchangeClients) {
        if (CollectionUtils.isEmpty(referenceCountExchangeClients)) {
            return;
        }
        for (ReferenceCountExchangeClient client : referenceCountExchangeClients) {
            if (client != null) {
                client.incrementAndGetCount();
            }
        }
    }

    /** Builds {@code connectNum} new reference-counted clients for {@code url}. */
    private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
        List<ReferenceCountExchangeClient> clients = new ArrayList<ReferenceCountExchangeClient>();
        for (int i = 0; i < connectNum; ++i) {
            clients.add(this.buildReferenceCountExchangeClient(url));
        }
        return clients;
    }

    /**
     * Creates one client for {@code url}, wraps it in a {@link ReferenceCountExchangeClient} and
     * sets its shutdown wait time from the server shutdown timeout of the URL's scope model.
     */
    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
        ExchangeClient exchangeClient = this.initClient(url);
        ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, NAME);
        int shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());
        client.setShutdownWaitTime(shutdownTimeout);
        return client;
    }

    /**
     * Creates a single exchange client for {@code url}.
     *
     * <p>The transporter type is the {@code client} parameter, falling back to {@code server} and
     * then {@code "netty"}. The URL gets {@code codec=dubbo} and, if absent, a 60000
     * {@code heartbeat}. With {@code lazy=true} a {@link LazyConnectExchangeClient} is returned
     * instead of connecting immediately.
     *
     * @throws RpcException if the transporter type is unsupported or connecting fails
     */
    private ExchangeClient initClient(URL url) {
        String clientType = url.getParameter("client", url.getParameter("server", "netty"));
        url = url.addParameter("codec", NAME);
        url = url.addParameterIfAbsent("heartbeat", String.valueOf(60000));

        if (StringUtils.isNotEmpty(clientType) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(clientType)) {
            throw new RpcException("Unsupported client type: " + clientType + ", supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        try {
            if (url.getParameter("lazy", false)) {
                return new LazyConnectExchangeClient(url, this.requestHandler, NAME, url.getParameters());
            }
            return Exchangers.connect(url, this.requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), (Throwable) e);
        }
    }

    /**
     * Closes every server and shared client held by this protocol, then delegates to
     * {@code super.destroy()}. Only the first call has any effect; failures while closing a
     * single server or client are logged and do not stop the remaining cleanup.
     */
    @Override
    public void destroy() {
        if (!this.destroyed.compareAndSet(false, true)) {
            return;
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Destroying protocol [" + this.getClass().getSimpleName() + "] ...");
        }

        // Iterate over a snapshot of the keys because entries are removed while looping.
        for (String key : new ArrayList<String>(this.serverMap.keySet())) {
            ProtocolServer protocolServer = (ProtocolServer) this.serverMap.remove(key);
            if (protocolServer == null) {
                continue;
            }
            RemotingServer server = protocolServer.getRemotingServer();
            try {
                if (this.logger.isInfoEnabled()) {
                    this.logger.info("Closing dubbo server: " + server.getLocalAddress());
                }
                server.close(this.getServerShutdownTimeout(protocolServer));
            } catch (Throwable t) {
                this.logger.warn("Close dubbo server [" + server.getLocalAddress() + "] failed: " + t.getMessage(), t);
            }
        }
        this.serverMap.clear();

        for (String key : new ArrayList<String>(this.referenceClientMap.keySet())) {
            Object clients = this.referenceClientMap.remove(key);
            // PENDING_OBJECT markers (or anything that is not a client list) have nothing to close.
            if (!(clients instanceof List)) {
                continue;
            }
            @SuppressWarnings("unchecked") // list values in referenceClientMap only ever hold clients
            List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;
            if (CollectionUtils.isEmpty(typedClients)) {
                continue;
            }
            for (ReferenceCountExchangeClient client : typedClients) {
                this.closeReferenceCountExchangeClient(client);
            }
        }
        this.referenceClientMap.clear();

        super.destroy();
    }

    /** Closes {@code client} with its own shutdown wait time; {@code null} is ignored, errors are logged. */
    private void closeReferenceCountExchangeClient(ReferenceCountExchangeClient client) {
        if (client == null) {
            return;
        }
        try {
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
            }
            client.close(client.getShutdownWaitTime());
        } catch (Throwable t) {
            this.logger.warn(t.getMessage(), t);
        }
    }

    /**
     * Prepares an invocation for inclusion in an error message.
     *
     * <p>Unless debug logging is enabled, the arguments of an {@link RpcInvocation} are cleared
     * on the passed instance itself (no copy is made) so they do not end up in the message.
     */
    private Invocation getInvocationWithoutData(Invocation invocation) {
        if (this.logger.isDebugEnabled()) {
            return invocation;
        }
        if (invocation instanceof RpcInvocation) {
            RpcInvocation rpcInvocation = (RpcInvocation) invocation;
            rpcInvocation.setArguments(null);
            return rpcInvocation;
        }
        return invocation;
    }
}

