package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.EventExecutorGroup;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeResultMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.ClientMessageListener;
import io.seata.core.rpc.ClientMessageSender;
import io.seata.core.rpc.netty.NettyPoolKey;
import io.seata.discovery.loadbalance.LoadBalanceFactory;
import io.seata.discovery.registry.RegistryFactory;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/seata/core/rpc/netty/AbstractRpcRemotingClient.class */
/**
 * Client-side base of the Netty RPC remoting layer.
 *
 * <p>On {@link #init()} it starts the {@link RpcClientBootstrap}, schedules a periodic
 * reconnect for the transaction service group and starts one background thread that
 * drains the inherited {@code basketMap} queues, packing the queued messages of each
 * key into a single {@link MergedWarpMessage} that is sent over a channel acquired for
 * that key. Incoming {@link MergeResultMessage}s are split up again in
 * {@link #channelRead(ChannelHandlerContext, Object)} and handed to the waiting
 * {@link MessageFuture}s.
 */
public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting implements RegisterMsgListener, ClientMessageSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRpcRemotingClient.class);
    private static final String MSG_ID_PREFIX = "msgId:";
    private static final String FUTURES_PREFIX = "futures:";
    private static final String SINGLE_LOG_POSTFIX = ";";
    /** Maximum time, in milliseconds, the merge thread waits on {@code mergeLock} per round. */
    private static final int MAX_MERGE_SEND_MILLS = 1;
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
    /** Size of the merge-send thread pool. */
    private static final int MAX_MERGE_SEND_THREAD = 1;
    /** Keep-alive of the merge-send pool, interpreted as milliseconds by {@link #init()}. */
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
    /** Initial delay and period of the reconnect task; {@link #init()} schedules it in SECONDS. */
    private static final int SCHEDULE_INTERVAL_MILLS = 5;
    private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";

    private final RpcClientBootstrap clientBootstrap;
    private final NettyClientChannelManager clientChannelManager;
    private ClientMessageListener clientMessageListener;
    private final NettyPoolKey.TransactionRole transactionRole;
    /** Created in {@link #init()}; stays {@code null} until then. */
    private ExecutorService mergeSendExecutorService;

    /**
     * Background task that repeatedly drains every queue in {@code basketMap} and sends
     * the drained messages as one {@link MergedWarpMessage} per key.
     *
     * <p>The loop ends when the executing thread is interrupted, which is what
     * {@link AbstractRpcRemotingClient#destroy()} triggers through {@code shutdownNow()}.
     */
    private class MergedSendRunnable implements Runnable {

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop: an interrupt is the shutdown signal.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                isSending = true;
                for (String address : basketMap.keySet()) {
                    sendMergedMessages(address, basketMap.get(address));
                }
                isSending = false;
            }
        }

        /**
         * Drains {@code basket} into one merged message and sends it over a channel
         * acquired for {@code address}.
         *
         * <p>If sending fails with a {@link FrameworkException}, every future registered
         * for one of the merged message ids is removed and completed with {@code null}.
         *
         * @param address key of the basket, also used to acquire the channel
         * @param basket  queue of pending messages for that key; may be {@code null} or empty
         */
        private void sendMergedMessages(String address, BlockingQueue<RpcMessage> basket) {
            if (basket == null || basket.isEmpty()) {
                return;
            }
            MergedWarpMessage mergeMessage = new MergedWarpMessage();
            RpcMessage pending;
            while ((pending = basket.poll()) != null) {
                mergeMessage.msgs.add((AbstractMessage) pending.getBody());
                mergeMessage.msgIds.add(Integer.valueOf(pending.getId()));
            }
            if (mergeMessage.msgIds.isEmpty()) {
                return;
            }
            if (mergeMessage.msgIds.size() > 1) {
                printMergeMessageLog(mergeMessage);
            }
            Channel sendChannel = null;
            try {
                sendChannel = clientChannelManager.acquireChannel(address);
                sendRequest(sendChannel, mergeMessage);
            } catch (FrameworkException e) {
                if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                    destroyChannel(address, sendChannel);
                }
                // Release every caller waiting on one of the merged messages.
                for (Integer msgId : mergeMessage.msgIds) {
                    MessageFuture future = futures.remove(msgId);
                    if (future != null) {
                        future.setResultMessage(null);
                    }
                }
                LOGGER.error("client merge call failed", e);
            }
        }

        /**
         * Writes the merged messages, their ids and the ids of all currently pending
         * futures to the debug log; does nothing unless debug logging is enabled.
         *
         * @param mergedWarpMessage the merged message about to be sent
         */
        private void printMergeMessageLog(MergedWarpMessage mergedWarpMessage) {
            if (!LOGGER.isDebugEnabled()) {
                return;
            }
            LOGGER.debug("merge msg size:" + mergedWarpMessage.msgIds.size());
            for (AbstractMessage message : mergedWarpMessage.msgs) {
                LOGGER.debug(message.toString());
            }
            StringBuilder sb = new StringBuilder();
            for (Integer msgId : mergedWarpMessage.msgIds) {
                sb.append(MSG_ID_PREFIX).append(msgId.intValue()).append(SINGLE_LOG_POSTFIX);
            }
            sb.append("\n");
            for (Integer futureId : futures.keySet()) {
                sb.append(FUTURES_PREFIX).append(futureId.intValue()).append(SINGLE_LOG_POSTFIX);
            }
            LOGGER.debug(sb.toString());
        }
    }

    /**
     * Creates the client together with its bootstrap and channel manager.
     *
     * @param nettyClientConfig  configuration handed to the bootstrap and the channel manager
     * @param eventExecutorGroup executor group handed to the bootstrap
     * @param threadPoolExecutor executor passed to the super class constructor
     * @param transactionRole    role of this client; also part of the merge thread name
     */
    public AbstractRpcRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor threadPoolExecutor, NettyPoolKey.TransactionRole transactionRole) {
        super(threadPoolExecutor);
        this.transactionRole = transactionRole;
        this.clientBootstrap = new RpcClientBootstrap(nettyClientConfig, eventExecutorGroup, this, transactionRole);
        this.clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, this.clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
    }

    /**
     * @return the channel manager used by this client
     */
    public NettyClientChannelManager getClientChannelManager() {
        return this.clientChannelManager;
    }

    /**
     * @return function handed to the channel manager that maps a string key to a {@link NettyPoolKey}
     */
    protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();

    /**
     * @return the transaction service group used for reconnecting and load balancing
     */
    protected abstract String getTransactionServiceGroup();

    /**
     * Starts the bootstrap, schedules the periodic reconnect task, starts the merge-send
     * thread and finally delegates to {@code super.init()}.
     */
    @Override
    public void init() {
        this.clientBootstrap.start();
        this.timerExecutor.scheduleAtFixedRate(
            () -> clientChannelManager.reconnect(getTransactionServiceGroup()),
            SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
        this.mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        this.mergeSendExecutorService.submit(new MergedSendRunnable());
        super.init();
    }

    /**
     * Shuts down the bootstrap and stops the merge-send thread.
     *
     * <p>{@code shutdownNow()} is required because the merge task never returns on its
     * own; it only leaves its loop when its thread is interrupted. Safe to call when
     * {@link #init()} has not run.
     */
    @Override
    public void destroy() {
        this.clientBootstrap.shutdown();
        if (this.mergeSendExecutorService != null) {
            this.mergeSendExecutorService.shutdownNow();
        }
        // NOTE(review): super.destroy() is not invoked here - confirm that is intended.
    }

    /**
     * Handles an inbound message.
     *
     * <p>Objects that are not {@link RpcMessage}s are ignored. A PONG heartbeat is only
     * logged. A {@link MergeResultMessage} is split up and each contained result is set
     * on the future of the matching merged request. Everything else is delegated to
     * {@code super.channelRead}.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj                   the inbound message
     * @throws Exception if the super class handler fails
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof RpcMessage)) {
            return;
        }
        RpcMessage rpcMessage = (RpcMessage) obj;
        Object body = rpcMessage.getBody();
        if (body == HeartbeatMessage.PONG) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("received PONG from {}", channelHandlerContext.channel().remoteAddress());
            }
            return;
        }
        if (!(body instanceof MergeResultMessage)) {
            super.channelRead(channelHandlerContext, obj);
            return;
        }
        MergeResultMessage mergeResultMessage = (MergeResultMessage) body;
        Integer mergedId = Integer.valueOf(rpcMessage.getId());
        MergedWarpMessage mergedWarpMessage = (MergedWarpMessage) this.mergeMsgMap.remove(mergedId);
        if (mergedWarpMessage == null) {
            // Without the original request the results cannot be matched to futures.
            LOGGER.warn("merged request for msg: {} is not found, merge result is dropped.", mergedId);
            return;
        }
        for (int i = 0; i < mergedWarpMessage.msgs.size(); i++) {
            Integer msgId = mergedWarpMessage.msgIds.get(i);
            MessageFuture future = this.futures.remove(msgId);
            if (future != null) {
                future.setResultMessage(mergeResultMessage.getMsgs()[i]);
            } else if (LOGGER.isInfoEnabled()) {
                LOGGER.info("msg: {} is not found in futures.", msgId);
            }
        }
    }

    /**
     * Forwards the message to the registered {@link ClientMessageListener}, if any.
     *
     * @param rpcMessage            the message to dispatch
     * @param channelHandlerContext context whose remote address is passed to the listener
     */
    @Override
    public void dispatch(RpcMessage rpcMessage, ChannelHandlerContext channelHandlerContext) {
        if (this.clientMessageListener != null) {
            this.clientMessageListener.onMessage(rpcMessage, NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress()), this);
        }
    }

    /**
     * Releases the channel at the channel manager and delegates to the super class;
     * does nothing once {@code messageExecutor} has been shut down.
     *
     * @param channelHandlerContext context of the channel that became inactive
     * @throws Exception if the super class handler fails
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.messageExecutor.isShutdown()) {
            return;
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("channel inactive: {}", channelHandlerContext.channel());
        }
        this.clientChannelManager.releaseChannel(channelHandlerContext.channel(), NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress()));
        super.channelInactive(channelHandlerContext);
    }

    /**
     * Reacts to idle events: a reader-idle channel is invalidated and released, a
     * writer-idle event triggers a PING heartbeat. Other events are ignored.
     *
     * @param channelHandlerContext context of the channel the event belongs to
     * @param obj                   the user event
     */
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (!(obj instanceof IdleStateEvent)) {
            return;
        }
        IdleStateEvent idleStateEvent = (IdleStateEvent) obj;
        if (idleStateEvent.state() == IdleState.READER_IDLE) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("channel" + channelHandlerContext.channel() + " read idle.");
            }
            try {
                this.clientChannelManager.invalidateObject(NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress()), channelHandlerContext.channel());
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            } finally {
                // The channel is released whether or not invalidation succeeded.
                this.clientChannelManager.releaseChannel(channelHandlerContext.channel(), getAddressFromContext(channelHandlerContext));
            }
        }
        // NOTE(review): identity comparison against the shared WRITER_IDLE_STATE_EVENT
        // instance; Netty also defines FIRST_WRITER_IDLE_STATE_EVENT, which does not
        // match here - confirm that skipping the first writer-idle event is intended.
        if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("will send ping msg,channel" + channelHandlerContext.channel());
                }
                sendRequest(channelHandlerContext.channel(), HeartbeatMessage.PING);
            } catch (Throwable th) {
                LOGGER.error("send request error", th);
            }
        }
    }

    /**
     * Logs the failure, releases the channel at the channel manager and delegates to
     * the super class.
     *
     * @param channelHandlerContext context of the failing channel
     * @param th                    the caught error
     * @throws Exception if the super class handler fails
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        LOGGER.error("{} {} connect exception. {}",
            FrameworkErrorCode.ExceptionCaught.getErrCode(),
            NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress()),
            th.getMessage(), th);
        this.clientChannelManager.releaseChannel(channelHandlerContext.channel(), getAddressFromChannel(channelHandlerContext.channel()));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("remove exception rm channel:" + channelHandlerContext.channel());
        }
        super.exceptionCaught(channelHandlerContext, th);
    }

    /**
     * Sends a message to a load-balanced server of the transaction service group and
     * returns the response.
     *
     * @param obj the message to send
     * @param j   timeout handed to {@code sendAsyncRequestWithResponse}
     * @return the response object
     * @throws TimeoutException   if no response arrives in time
     * @throws FrameworkException if no server is available
     */
    @Override
    public Object sendMsgWithResponse(Object obj, long j) throws TimeoutException {
        String serverAddress = loadBalance(getTransactionServiceGroup());
        Channel channel = this.clientChannelManager.acquireChannel(serverAddress);
        return super.sendAsyncRequestWithResponse(serverAddress, channel, obj, j);
    }

    /**
     * Same as {@link #sendMsgWithResponse(Object, long)} using
     * {@code NettyClientConfig.getRpcRequestTimeout()} as the timeout.
     *
     * @param obj the message to send
     * @return the response object
     * @throws TimeoutException if no response arrives in time
     */
    @Override
    public Object sendMsgWithResponse(Object obj) throws TimeoutException {
        return sendMsgWithResponse(obj, NettyClientConfig.getRpcRequestTimeout());
    }

    /**
     * Sends a message to the given server address and returns the response.
     *
     * @param str the server address used to acquire the channel
     * @param obj the message to send
     * @param j   timeout handed to {@code sendAsyncRequestWithResponse}
     * @return the response object
     * @throws TimeoutException if no response arrives in time
     */
    @Override
    public Object sendMsgWithResponse(String str, Object obj, long j) throws TimeoutException {
        return sendAsyncRequestWithResponse(str, this.clientChannelManager.acquireChannel(str), obj, j);
    }

    /**
     * Sends a response for {@code rpcMessage} over a channel acquired for {@code str}.
     *
     * @param rpcMessage the request being answered
     * @param str        the server address used to acquire the channel
     * @param obj        the response payload
     */
    @Override
    public void sendResponse(RpcMessage rpcMessage, String str, Object obj) {
        super.sendResponse(rpcMessage, this.clientChannelManager.acquireChannel(str), obj);
    }

    /**
     * @return the listener receiving dispatched messages, or {@code null} if none is set
     */
    public ClientMessageListener getClientMessageListener() {
        return this.clientMessageListener;
    }

    /**
     * @param clientMessageListener the listener that should receive dispatched messages
     */
    public void setClientMessageListener(ClientMessageListener clientMessageListener) {
        this.clientMessageListener = clientMessageListener;
    }

    /**
     * Destroys the channel through the channel manager.
     *
     * @param str     the server address the channel belongs to
     * @param channel the channel to destroy
     */
    @Override
    public void destroyChannel(String str, Channel channel) {
        this.clientChannelManager.destroyChannel(str, channel);
    }

    /**
     * Looks up the servers registered for the group and selects one of them.
     *
     * @param str the transaction service group to look up
     * @return the selected server address as a string
     * @throws FrameworkException with {@code NoAvailableService} if the lookup fails or
     *                            no server is selected
     */
    private String loadBalance(String str) {
        InetSocketAddress selected = null;
        try {
            selected = (InetSocketAddress) LoadBalanceFactory.getInstance().select(RegistryFactory.getInstance().lookup(str));
        } catch (Exception e) {
            // Keep the stack trace; the caller only sees NoAvailableService below.
            LOGGER.error(e.getMessage(), e);
        }
        if (selected == null) {
            throw new FrameworkException(FrameworkErrorCode.NoAvailableService);
        }
        return NetUtil.toStringAddress(selected);
    }

    /**
     * @return the name prefix of the merge-send thread, including the transaction role
     */
    private String getThreadPrefix() {
        return MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + this.transactionRole.name();
    }
}
