package com.alibaba.fescar.core.rpc.netty;

import com.alibaba.fescar.common.exception.FrameworkErrorCode;
import com.alibaba.fescar.common.exception.FrameworkException;
import com.alibaba.fescar.common.thread.NamedThreadFactory;
import com.alibaba.fescar.common.util.NetUtil;
import com.alibaba.fescar.core.protocol.AbstractMessage;
import com.alibaba.fescar.core.protocol.HeartbeatMessage;
import com.alibaba.fescar.core.protocol.MergeResultMessage;
import com.alibaba.fescar.core.protocol.MergedWarpMessage;
import com.alibaba.fescar.core.protocol.MessageFuture;
import com.alibaba.fescar.core.protocol.RpcMessage;
import com.alibaba.fescar.core.rpc.ClientMessageListener;
import com.alibaba.fescar.core.rpc.ClientMessageSender;
import com.alibaba.fescar.core.rpc.RemotingService;
import com.alibaba.fescar.core.rpc.netty.NettyPoolKey;
import com.alibaba.fescar.core.service.ServiceManager;
import com.alibaba.fescar.core.service.ServiceManagerStaticConfigImpl;
import com.alibaba.fescar.discovery.registry.RegistryFactory;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.PlatformDependent;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/fescar/core/rpc/netty/AbstractRpcRemotingClient.class */
public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting implements RemotingService, RegisterMsgListener, ClientMessageSender {
    /** Class-wide SLF4J logger; also used by the inner and anonymous classes of this client. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRpcRemotingClient.class);
    /** Client settings; the constructor substitutes a default instance when it is given {@code null}. */
    private final NettyClientConfig nettyClientConfig;
    /** Netty bootstrap, configured in {@link #start()} and used by {@code getNewChannel} to open connections. */
    private final Bootstrap bootstrap;
    /** NIO event loop group created in the constructor and shut down in {@link #shutdown()}. */
    private final EventLoopGroup eventLoopGroupWorker;
    /** Executor group for pipeline handlers; either supplied to the constructor or created in {@link #start()}. */
    private EventExecutorGroup defaultEventExecutorGroup;
    /** Per-address channel pools; only created by {@link #start()} when the config enables the connection pool. */
    private AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool> clientChannelPool;
    /** Set to {@code true} by the first {@link #start()} call; in this class it only gates the "has started" log line. */
    private final AtomicBoolean initialized;
    /** Prefix written before each merged message id in the debug log. */
    private static final String MSG_ID_PREFIX = "msgId:";
    /** Prefix written before each pending future id in the debug log. */
    private static final String FUTURES_PREFIX = "futures:";
    /** Separator appended after each id in the debug log. */
    private static final String SINGLE_LOG_POSTFIX = ";";
    /** Presumably the pause, in milliseconds, between two passes of the merge-send loop - confirm against callers. */
    private static final int MAX_MERGE_SEND_MILLS = 1;
    /** Separator placed between a configured thread-name prefix and the transaction role name. */
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
    /** Assigned a {@link ServiceManagerStaticConfigImpl} in {@link #init()}. */
    protected ServiceManager serviceManager;
    /** Keyed channel pool created in {@link #init()} and configured with {@link #getNettyPoolConfig()}. */
    protected GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;
    /** Optional listener that receives dispatched messages; {@code dispatch} does nothing while it is {@code null}. */
    protected ClientMessageListener clientMessageListener;

    /* loaded from: input_file:com/alibaba/fescar/core/rpc/netty/AbstractRpcRemotingClient$MergedSendRunnable.class */
    public class MergedSendRunnable implements Runnable {
        public MergedSendRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                synchronized (AbstractRpcRemotingClient.this.mergeLock) {
                    try {
                        AbstractRpcRemotingClient.this.mergeLock.wait(1L);
                    } catch (InterruptedException e) {
                    }
                }
                AbstractRpcRemotingClient.this.isSending = true;
                Iterator it = AbstractRpcRemotingClient.this.basketMap.keySet().iterator();
                while (it.hasNext()) {
                    String str = (String) it.next();
                    BlockingQueue<RpcMessage> blockingQueue = AbstractRpcRemotingClient.this.basketMap.get(str);
                    if (!blockingQueue.isEmpty()) {
                        MergedWarpMessage mergedWarpMessage = new MergedWarpMessage();
                        while (!blockingQueue.isEmpty()) {
                            RpcMessage poll = blockingQueue.poll();
                            mergedWarpMessage.msgs.add((AbstractMessage) poll.getBody());
                            mergedWarpMessage.msgIds.add(Long.valueOf(poll.getId()));
                        }
                        if (mergedWarpMessage.msgIds.size() > 1) {
                            printMergeMessageLog(mergedWarpMessage);
                        }
                        Channel connect = AbstractRpcRemotingClient.this.connect(str);
                        try {
                            AbstractRpcRemotingClient.this.sendRequest(connect, mergedWarpMessage);
                        } catch (FrameworkException e2) {
                            if (e2.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && str != null) {
                                AbstractRpcRemotingClient.this.destroyChannel(str, connect);
                            }
                            AbstractRpcRemotingClient.LOGGER.error("", "client merge call failed", e2);
                        }
                    }
                }
                AbstractRpcRemotingClient.this.isSending = false;
            }
        }

        private void printMergeMessageLog(MergedWarpMessage mergedWarpMessage) {
            if (AbstractRpcRemotingClient.LOGGER.isDebugEnabled()) {
                AbstractRpcRemotingClient.LOGGER.debug("merge msg size:" + mergedWarpMessage.msgIds.size());
                Iterator<AbstractMessage> it = mergedWarpMessage.msgs.iterator();
                while (it.hasNext()) {
                    AbstractRpcRemotingClient.LOGGER.debug(it.next().toString());
                }
                StringBuffer stringBuffer = new StringBuffer();
                Iterator<Long> it2 = mergedWarpMessage.msgIds.iterator();
                while (it2.hasNext()) {
                    stringBuffer.append(AbstractRpcRemotingClient.MSG_ID_PREFIX).append(it2.next().longValue()).append(AbstractRpcRemotingClient.SINGLE_LOG_POSTFIX);
                }
                stringBuffer.append("\n");
                Iterator it3 = AbstractRpcRemotingClient.this.futures.keySet().iterator();
                while (it3.hasNext()) {
                    stringBuffer.append(AbstractRpcRemotingClient.FUTURES_PREFIX).append(((Long) it3.next()).longValue()).append(AbstractRpcRemotingClient.SINGLE_LOG_POSTFIX);
                }
                AbstractRpcRemotingClient.LOGGER.debug(stringBuffer.toString());
            }
        }
    }

    /**
     * Creates a client with no caller-supplied handler executor group and no message executor.
     * Delegates to the three-argument constructor, passing {@code null} for both.
     *
     * @param nettyClientConfig client settings; {@code null} selects a default configuration
     */
    public AbstractRpcRemotingClient(NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, null, null);
    }

    /**
     * Creates a client and its NIO selector event loop group.
     *
     * <p>Note that the selector thread name is built through {@link #getThreadPrefix(String)},
     * which calls the abstract {@link #getTransactionRole()} while the subclass is still being
     * constructed.
     *
     * @param nettyClientConfig  client settings; a default {@link NettyClientConfig} is used when {@code null}
     * @param eventExecutorGroup executor group for pipeline handlers; when {@code null} one is created in {@link #start()}
     * @param threadPoolExecutor executor handed to the superclass constructor
     */
    public AbstractRpcRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor threadPoolExecutor) {
        super(threadPoolExecutor);
        this.bootstrap = new Bootstrap();
        this.initialized = new AtomicBoolean(false);

        NettyClientConfig effectiveConfig = nettyClientConfig;
        if (effectiveConfig == null) {
            effectiveConfig = new NettyClientConfig();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("use default netty client config.");
            }
        }
        this.nettyClientConfig = effectiveConfig;

        final int selectorThreads = effectiveConfig.getClientSelectorThreadSize();
        final String selectorThreadPrefix = getThreadPrefix(effectiveConfig.getClientSelectorThreadPrefix());
        NamedThreadFactory selectorThreadFactory = new NamedThreadFactory(selectorThreadPrefix, selectorThreads);
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreads, selectorThreadFactory);
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }

    /**
     * Creates the keyed channel pool, applies the subclass-provided pool configuration,
     * installs the static-config service manager and then runs the superclass initialisation.
     */
    @Override // com.alibaba.fescar.core.rpc.netty.AbstractRpcRemoting
    public void init() {
        GenericKeyedObjectPool<NettyPoolKey, Channel> keyedPool = new GenericKeyedObjectPool<>(new NettyPoolableFactory(this));
        // Publish the pool before configuring it, so the field is already set while
        // getNettyPoolConfig() runs.
        this.nettyClientKeyPool = keyedPool;
        keyedPool.setConfig(getNettyPoolConfig());
        this.serviceManager = new ServiceManagerStaticConfigImpl();
        super.init();
    }

    /**
     * Configures the Netty bootstrap and, depending on the client config, either a per-address
     * fixed channel pool or a plain channel initializer.
     *
     * <p>NOTE(review): the {@code initialized} flag only gates the final log line. Calling this
     * method again re-applies the whole bootstrap configuration - confirm callers invoke it once.
     */
    @Override // com.alibaba.fescar.core.rpc.RemotingService
    public void start() {
        // Create the handler executor group lazily when none was supplied to the constructor.
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(this.nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientWorkerThreadPrefix()), this.nettyClientConfig.getClientWorkerThreads()));
        }
        // Event loop, channel class and socket options all come from the client config.
        this.bootstrap.group(this.eventLoopGroupWorker).channel(this.nettyClientConfig.getClientChannelClazz()).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.nettyClientConfig.getConnectTimeoutMillis())).option(ChannelOption.SO_SNDBUF, Integer.valueOf(this.nettyClientConfig.getClientSocketSndBufSize())).option(ChannelOption.SO_RCVBUF, Integer.valueOf(this.nettyClientConfig.getClientSocketRcvBufSize()));
        // Epoll-specific options are applied only when native transport is enabled and the
        // platform is not macOS; on macOS the method just logs.
        if (this.nettyClientConfig.enableNative()) {
            if (!PlatformDependent.isOsx()) {
                this.bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);
            } else if (LOGGER.isInfoEnabled()) {
                LOGGER.info("client run on macOS");
            }
        }
        if (this.nettyClientConfig.isUseConnPool()) {
            // Pooled mode: one FixedChannelPool per remote address, created on demand by the map.
            this.clientChannelPool = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() { // from class: com.alibaba.fescar.core.rpc.netty.AbstractRpcRemotingClient.1
                /* JADX INFO: Access modifiers changed from: protected */
                public FixedChannelPool newPool(InetSocketAddress inetSocketAddress) {
                    // NOTE(review): remoteAddress(...) is set on the shared bootstrap instance here.
                    return new FixedChannelPool(AbstractRpcRemotingClient.this.bootstrap.remoteAddress(inetSocketAddress), new DefaultChannelPoolHandler() { // from class: com.alibaba.fescar.core.rpc.netty.AbstractRpcRemotingClient.1.1
                        @Override // com.alibaba.fescar.core.rpc.netty.DefaultChannelPoolHandler
                        public void channelCreated(Channel channel) throws Exception {
                            super.channelCreated(channel);
                            // Idle detection and the RPC client handler run on the handler executor group.
                            // NOTE(review): no MessageCodecHandler is added in this branch; presumably
                            // super.channelCreated installs the codec - confirm.
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(AbstractRpcRemotingClient.this.defaultEventExecutorGroup, new ChannelHandler[]{new IdleStateHandler(AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxReadIdleSeconds(), AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxWriteIdleSeconds(), AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxAllIdleSeconds())});
                            pipeline.addLast(AbstractRpcRemotingClient.this.defaultEventExecutorGroup, new ChannelHandler[]{new RpcClientHandler()});
                        }
                    }, ChannelHealthChecker.ACTIVE, FixedChannelPool.AcquireTimeoutAction.FAIL, AbstractRpcRemotingClient.this.nettyClientConfig.getMaxAcquireConnMills(), AbstractRpcRemotingClient.this.nettyClientConfig.getPerHostMaxConn(), AbstractRpcRemotingClient.this.nettyClientConfig.getPendingConnSize(), false);
                }
            };
        } else {
            // Non-pooled mode: every new channel gets idle detection, the message codec and
            // any additional handlers held in channelHandlers.
            this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { // from class: com.alibaba.fescar.core.rpc.netty.AbstractRpcRemotingClient.2
                public void initChannel(SocketChannel socketChannel) {
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxReadIdleSeconds(), AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxWriteIdleSeconds(), AbstractRpcRemotingClient.this.nettyClientConfig.getChannelMaxAllIdleSeconds())}).addLast(new ChannelHandler[]{new MessageCodecHandler()});
                    if (null != AbstractRpcRemotingClient.this.channelHandlers) {
                        AbstractRpcRemotingClient.this.addChannelPipelineLast(socketChannel, AbstractRpcRemotingClient.this.channelHandlers);
                    }
                }
            });
        }
        // Log the start message only on the first call.
        if (this.initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("AbstractRpcRemotingClient has started");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Opens a new connection to the given address and waits for it to be established.
     *
     * <p>The wait is bounded by the configured connect timeout. If the wait runs out, or the
     * waiting thread is interrupted, the pending channel is closed so that an abandoned attempt
     * does not leave an unreferenced connection behind. An interrupt is restored on the current
     * thread before the exception is thrown.
     *
     * @param inetSocketAddress remote address to connect to
     * @return the connected channel
     * @throws FrameworkException if the attempt is cancelled, fails, times out or is interrupted;
     *         failures detected inside the wait are wrapped once more with the generic
     *         "can not connect" message, as before
     */
    public Channel getNewChannel(InetSocketAddress inetSocketAddress) {
        ChannelFuture connect = this.bootstrap.connect(inetSocketAddress);
        try {
            boolean completed = connect.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
            if (!completed) {
                // await() gave up before Netty resolved the attempt: release the half-open
                // channel and report the timeout explicitly.
                connect.channel().close();
                throw new FrameworkException(connect.cause(), "connect timeout, can not connect to fescar-server.");
            }
            if (connect.isCancelled()) {
                throw new FrameworkException(connect.cause(), "connect cancelled, can not connect to fescar-server.");
            }
            if (connect.isSuccess()) {
                return connect.channel();
            }
            throw new FrameworkException(connect.cause(), "connect failed, can not connect to fescar-server.");
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers and do not leave the attempt dangling.
            Thread.currentThread().interrupt();
            connect.channel().close();
            throw new FrameworkException(e, "can not connect to fescar-server.");
        } catch (Exception e) {
            throw new FrameworkException(e, "can not connect to fescar-server.");
        }
    }

    /**
     * Releases the client's resources: the channel pool map (if one was created), the selector
     * event loop group, the handler executor group (if any) and finally the superclass state.
     *
     * <p>The steps are chained with {@code finally} so that a failure while closing the pool or
     * shutting down the executors does not skip the remaining steps. Any exception is logged
     * with its stack trace and not rethrown.
     */
    @Override // com.alibaba.fescar.core.rpc.RemotingService
    public void shutdown() {
        try {
            try {
                if (null != this.clientChannelPool) {
                    this.clientChannelPool.close();
                }
            } finally {
                try {
                    this.eventLoopGroupWorker.shutdownGracefully();
                    if (this.defaultEventExecutorGroup != null) {
                        this.defaultEventExecutorGroup.shutdownGracefully();
                    }
                } finally {
                    super.destroy();
                }
            }
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is kept, not only the message.
            LOGGER.error("shutdown error:" + e.getMessage(), e);
        }
    }

    /**
     * Handles an inbound message.
     *
     * <ul>
     *   <li>Anything that is not an {@link RpcMessage} is passed to the superclass unchanged.</li>
     *   <li>A heartbeat PONG is logged at debug level and consumed.</li>
     *   <li>A {@link MergeResultMessage} is split up: each contained result completes the future
     *       registered for the request id at the same position of the original merged request.</li>
     *   <li>Every other body is passed to the superclass.</li>
     * </ul>
     *
     * <p>A merge result with no matching merged request is logged and dropped. Requests for which
     * the response carries no result at their position are logged and their futures stay
     * registered.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj                   the inbound message
     * @throws Exception if the superclass handling fails
     */
    @Override // com.alibaba.fescar.core.rpc.netty.AbstractRpcRemoting
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof RpcMessage)) {
            // Casting below would throw ClassCastException; let the superclass decide.
            super.channelRead(channelHandlerContext, obj);
            return;
        }
        RpcMessage rpcMessage = (RpcMessage) obj;
        Object body = rpcMessage.getBody();
        if (body == HeartbeatMessage.PONG) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("received PONG from " + channelHandlerContext.channel().remoteAddress());
            }
            return;
        }
        if (!(body instanceof MergeResultMessage)) {
            super.channelRead(channelHandlerContext, obj);
            return;
        }
        MergeResultMessage mergeResultMessage = (MergeResultMessage) body;
        MergedWarpMessage mergedWarpMessage = (MergedWarpMessage) this.mergeMsgMap.remove(Long.valueOf(rpcMessage.getId()));
        if (mergedWarpMessage == null) {
            // The merged request is no longer registered (for example a duplicate response).
            LOGGER.warn("merged request of msg:" + rpcMessage.getId() + " is not found in mergeMsgMap.");
            return;
        }
        int resultCount = mergeResultMessage.getMsgs() == null ? 0 : mergeResultMessage.getMsgs().length;
        int size = mergedWarpMessage.msgs.size();
        for (int i = 0; i < size; i++) {
            long msgId = mergedWarpMessage.msgIds.get(i).longValue();
            if (i >= resultCount) {
                // The response carries fewer results than the request had messages; leave the
                // future registered instead of failing with an index error.
                LOGGER.warn("msg:" + msgId + " has no result in the merged response.");
                continue;
            }
            MessageFuture future = this.futures.remove(Long.valueOf(msgId));
            if (future != null) {
                future.setResultMessage(mergeResultMessage.getMsgs()[i]);
            } else if (LOGGER.isInfoEnabled()) {
                LOGGER.info("msg:" + msgId + " is not found in futures.");
            }
        }
    }

    /**
     * Returns the listener that receives dispatched messages.
     *
     * @return the current listener, or {@code null} when none has been set
     */
    public ClientMessageListener getClientMessageListener() {
        return this.clientMessageListener;
    }

    /**
     * Sets the listener that receives dispatched messages.
     *
     * @param clientMessageListener the listener to use; while it is {@code null}, dispatched messages are dropped
     */
    public void setClientMessageListener(ClientMessageListener clientMessageListener) {
        this.clientMessageListener = clientMessageListener;
    }

    /**
     * Forwards a message to the registered client message listener, together with the remote
     * address of the channel it arrived on. Does nothing when no listener is set.
     *
     * @param j                     the message id
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj                   the message to forward
     */
    @Override // com.alibaba.fescar.core.rpc.netty.AbstractRpcRemoting
    public void dispatch(long j, ChannelHandlerContext channelHandlerContext, Object obj) {
        if (this.clientMessageListener == null) {
            return;
        }
        this.clientMessageListener.onMessage(
            j,
            NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress()),
            obj,
            this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Looks up the servers registered under the given key and returns their addresses as strings.
     *
     * @param str the key passed to the registry lookup
     * @return the string form of every address found; an empty list when the lookup yields nothing
     * @throws Exception if the registry lookup fails
     */
    public List<String> getAvailServerList(String str) throws Exception {
        List<String> addresses = new ArrayList<>();
        List<?> servers = RegistryFactory.getInstance().lookup(str);
        if (CollectionUtils.isEmpty(servers)) {
            return addresses;
        }
        for (Object server : servers) {
            addresses.add(NetUtil.toStringAddress((InetSocketAddress) server));
        }
        return addresses;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a thread-name prefix of the form {@code <str>_<ROLE>}, where {@code ROLE} is the
     * name of the value returned by {@link #getTransactionRole()}.
     *
     * @param str the configured base prefix
     * @return the base prefix, the split character and the role name, concatenated
     */
    public String getThreadPrefix(String str) {
        return str + THREAD_PREFIX_SPLIT_CHAR + getTransactionRole().name();
    }

    /**
     * Returns a channel for the given server address. Within this class it is called by the
     * merge-send loop with each address key that has queued messages.
     *
     * @param str the server address key
     * @return the channel to send on
     */
    protected abstract Channel connect(String str);

    /**
     * Supplies the configuration that {@link #init()} applies to the keyed channel pool.
     *
     * @return the pool configuration
     */
    protected abstract GenericKeyedObjectPool.Config getNettyPoolConfig();

    /**
     * Returns the transaction role of this client. Its name is appended to thread-name prefixes
     * by {@link #getThreadPrefix(String)}, which the constructor already calls.
     *
     * @return the role of the concrete client
     */
    protected abstract NettyPoolKey.TransactionRole getTransactionRole();
}
