package com.alicloud.openservices.tablestore.tunnel.worker;

import com.alicloud.openservices.tablestore.TunnelClientInterface;
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.model.tunnel.internal.Channel;
import com.alicloud.openservices.tablestore.model.tunnel.internal.GetCheckpointRequest;
import com.alicloud.openservices.tablestore.model.tunnel.internal.GetCheckpointResponse;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alicloud/openservices/tablestore/tunnel/worker/TunnelStateMachine.class */
public class TunnelStateMachine {
    private static final Logger LOG = LoggerFactory.getLogger(TunnelStateMachine.class);
    private String tunnelId;
    private String clientId;
    private TunnelClientInterface client;
    private IChannelDialer dialer;
    private IChannelProcessorFactory processorFactory;
    private volatile ConcurrentHashMap<String, IChannelConnect> channelConnects;
    private volatile ConcurrentHashMap<String, Channel> currentChannels;

    public TunnelStateMachine(String str, String str2, IChannelDialer iChannelDialer, IChannelProcessorFactory iChannelProcessorFactory, TunnelClientInterface tunnelClientInterface) {
        Preconditions.checkArgument((str == null || str.isEmpty()) ? false : true, "The tunnel id should not be null or empty.");
        Preconditions.checkArgument((str2 == null || str2.isEmpty()) ? false : true, "The client id should not be null or empty.");
        Preconditions.checkNotNull(iChannelDialer, "Channel dialer cannot be null.");
        Preconditions.checkNotNull(iChannelProcessorFactory, "Channel process factory cannot be null.");
        Preconditions.checkNotNull(tunnelClientInterface, "Tunnel client cannot be null.");
        this.tunnelId = str;
        this.clientId = str2;
        this.dialer = iChannelDialer;
        this.processorFactory = iChannelProcessorFactory;
        this.client = tunnelClientInterface;
        this.channelConnects = new ConcurrentHashMap<>();
        this.currentChannels = new ConcurrentHashMap<>();
    }

    public void updateStatus(Channel channel) {
        LOG.debug("Begin update channel status, channel: {}", channel);
        String channelId = channel.getChannelId();
        Channel channel2 = this.currentChannels.get(channelId);
        if (channel2 == null) {
            LOG.info("Redundant channel, channelId: {}, status: {}", channelId, channel.getStatus().name());
            return;
        }
        LOG.debug("CurrentChannel: {}, UpdateChannel: {}", channel2, channel);
        if (channel2.getVersion() >= channel.getVersion()) {
            LOG.info("Expired channel version, channelId: {}, current version: {}, old version: {}", new Object[]{channelId, Long.valueOf(channel2.getVersion()), Long.valueOf(channel.getVersion())});
            return;
        }
        this.currentChannels.put(channelId, channel);
        IChannelConnect iChannelConnect = this.channelConnects.get(channelId);
        if (iChannelConnect == null || !iChannelConnect.closed()) {
            return;
        }
        this.channelConnects.remove(channelId);
    }

    public List<IChannelConnect> batchGetChannelConnects() {
        return new ArrayList(this.channelConnects.values());
    }

    public List<Channel> batchGetChannels() {
        return new ArrayList(this.currentChannels.values());
    }

    public void batchUpdateChannels(List<Channel> list) {
        LOG.info("Begin batch update channels");
        this.currentChannels = mergeChannels(list);
        for (Map.Entry<String, Channel> entry : this.currentChannels.entrySet()) {
            String key = entry.getKey();
            IChannelConnect iChannelConnect = this.channelConnects.get(key);
            if (iChannelConnect == null) {
                try {
                    GetCheckpointResponse checkpoint = this.client.getCheckpoint(new GetCheckpointRequest(this.tunnelId, this.clientId, key));
                    LOG.info("Get checkpoint response, channelId: {}, checkpoint: {}, sequenceNumber: {}", new Object[]{key, checkpoint.getCheckpoint(), Long.valueOf(checkpoint.getSequenceNumber())});
                    iChannelConnect = this.dialer.channelDial(this.tunnelId, this.clientId, key, checkpoint.getCheckpoint(), this.processorFactory.createProcessor(this.tunnelId, this.clientId, key, new Checkpointer(this.client, this.tunnelId, this.clientId, key, checkpoint.getSequenceNumber() + 1)), this);
                    this.channelConnects.put(key, iChannelConnect);
                } catch (Exception e) {
                    LOG.warn("Failed to update channel, error detail: {}", e.toString());
                    iChannelConnect = new FailedChannelConnect(this);
                }
            }
            iChannelConnect.notifyStatus(entry.getValue());
        }
        for (Map.Entry<String, IChannelConnect> entry2 : this.channelConnects.entrySet()) {
            String key2 = entry2.getKey();
            if (this.currentChannels.get(key2) == null) {
                LOG.info("Clear redundant channel connect, channelId: {}", key2);
                IChannelConnect value = entry2.getValue();
                if (!value.closed()) {
                    value.close();
                }
                this.channelConnects.remove(key2);
            }
        }
    }

    private ConcurrentHashMap<String, Channel> mergeChannels(List<Channel> list) {
        ConcurrentHashMap<String, Channel> concurrentHashMap = new ConcurrentHashMap<>(list.size());
        for (Channel channel : list) {
            String channelId = channel.getChannelId();
            Channel channel2 = this.currentChannels.get(channelId);
            if (channel2 == null) {
                concurrentHashMap.put(channelId, channel);
            } else if (channel.getVersion() >= channel2.getVersion()) {
                concurrentHashMap.put(channelId, channel);
            } else {
                concurrentHashMap.put(channelId, channel2);
            }
        }
        return concurrentHashMap;
    }

    public void close() {
        LOG.info("Begin close tunnel state machine.");
        Iterator<IChannelConnect> it = this.channelConnects.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        LOG.info("Tunnel state machine is closed");
    }
}
