/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.drc.clusterclient.partition;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.impl.ClientCluster;
import com.aliyun.drc.clusterclient.partition.Partition;
import com.aliyun.drc.clustermanager.ClusterManager;
import com.aliyun.drc.clustermanager.Register;
import com.aliyun.drc.clustermanager.RegisteredInfo;
import com.aliyun.drc.regionmanager.RegionRouter;
import com.aliyun.drc.regionmanager.RegionRouterInfo;
import com.aliyun.drc.util.AbnormalThreadHook;
import com.aliyun.drc.util.CipherUtils;
import com.aliyun.drc.util.ReadManifest;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionPool
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(PartitionPool.class);
    private static final int KEEP_ALIVE_PERIOD = 10000;
    private volatile boolean exited;
    private String guid;
    private String ip;
    private String seq;
    private Register register;
    private final ClientCluster cluster;
    private Map<String, Partition> pool;
    private Map<Thread, AbnormalThreadHook> monitoredThreads;
    private RegionRouter regionRouter;
    private RegionRouterInfo regionRouterInfo;

    public PartitionPool(RegionContext context, ClientCluster cluster) {
        this.setName("DTS-Keep-Alive-Thread");
        this.cluster = cluster;
        this.regionRouter = new RegionRouter(context);
        this.pool = new ConcurrentHashMap<String, Partition>();
        this.monitoredThreads = new ConcurrentHashMap<Thread, AbnormalThreadHook>();
    }

    public void init() throws Exception {
        this.regionRouterInfo = this.regionRouter.getRegionRouterInfo(this.guid);
        if (this.regionRouterInfo == null) {
            logger.error("region router info is null, guid:" + this.guid);
            return;
        }
        ClusterManager cm = new ClusterManager(this.regionRouterInfo.getClusterUrl());
        this.register = new Register(cm);
        this.cluster.setRegionRouterInfo(this.regionRouterInfo);
    }

    public void addPartition(Partition partition) {
        partition.setClientCluster(this.cluster);
        partition.setRegister(this.register);
        partition.setRegionRouterInfo(this.regionRouterInfo);
        partition.setGuid(this.guid);
        partition.setIp(this.ip);
        partition.setSeq(this.seq);
        this.pool.put(partition.getName(), partition);
    }

    public void removePartition(String partition) {
        if (this.pool.containsKey(partition)) {
            this.pool.remove(partition);
        }
    }

    public Partition getPartition(String partition) {
        if (this.pool.containsKey(partition)) {
            return this.pool.get(partition);
        }
        return null;
    }

    public void addMonitoredThread(Thread thread, AbnormalThreadHook hook) {
        this.monitoredThreads.put(thread, hook);
    }

    private void keepAlive() throws Exception {
        JSONArray jsonArray;
        ArrayList<BasicNameValuePair> query = new ArrayList<BasicNameValuePair>();
        query.add(new BasicNameValuePair("ts", String.valueOf(System.currentTimeMillis())));
        query.add(new BasicNameValuePair("group", this.regionRouterInfo.getConsumerGroup()));
        query.add(new BasicNameValuePair("guid", this.guid));
        query.add(new BasicNameValuePair("ip", this.ip));
        query.add(new BasicNameValuePair("seq", this.seq));
        query.add(new BasicNameValuePair("consumer", this.regionRouterInfo.getUsername()));
        query.add(new BasicNameValuePair("password", this.regionRouterInfo.getPassword()));
        query.add(new BasicNameValuePair("maxconns", "10240"));
        logger.info("register client, ip:" + this.ip + ", seq:" + this.seq);
        HashMap<String, String> props = new HashMap<String, String>();
        String token = CipherUtils.encrypt(URLEncodedUtils.format(query, (Charset)Charset.defaultCharset()));
        props.put("token", token);
        RegisteredInfo registeredInfo = this.register.registerClientAsIdle(props);
        ArrayList<String> list = new ArrayList<String>();
        if (registeredInfo.getIsSuccess().booleanValue() && (jsonArray = registeredInfo.getData()) != null && !jsonArray.isEmpty()) {
            for (Object o : jsonArray) {
                JSONObject partitionObject = new JSONObject((Map)o);
                String partition = partitionObject.getJSONObject("partition").getString("name");
                if (this.pool.get(partition) == null) {
                    logger.info("start new partition: " + partitionObject.toJSONString());
                    this.cluster.doStart(partitionObject);
                }
                list.add(partition);
            }
        }
        if (!this.pool.isEmpty()) {
            Iterator<Map.Entry<String, Partition>> iterator = this.pool.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Partition> entry = iterator.next();
                if (!list.contains(entry.getKey())) {
                    iterator.remove();
                    this.cluster.doStop(entry.getKey());
                    continue;
                }
                entry.getValue().sendHeartbeat();
                entry.getValue().forceActAsConsumed();
            }
        } else {
            logger.info("client partition is empty, wait partition balance");
        }
    }

    private void monitorThreadAlive() {
        Iterator<Map.Entry<Thread, AbnormalThreadHook>> iterator = this.monitoredThreads.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Thread, AbnormalThreadHook> entry = iterator.next();
            Thread thread = entry.getKey();
            if (thread.isAlive()) continue;
            entry.getValue().notifyThreadFailed(thread);
            iterator.remove();
        }
    }

    @Override
    public void run() {
        String SDKVersion = ReadManifest.getValue("Build-Version");
        logger.info("Welcome to start keep alive thread, SDK version is " + (SDKVersion != null ? SDKVersion : "4.6.27.12.0"));
        while (!this.exited) {
            try {
                this.keepAlive();
                this.monitorThreadAlive();
                Thread.sleep(10000L);
            }
            catch (InterruptedException e) {
                logger.warn("keep alive thread interrupted...");
                this.exited = true;
                break;
            }
            catch (Exception e) {
                logger.error("keep alive thread exception: ", (Throwable)e);
            }
        }
    }

    public void shutdown() throws InterruptedException {
        this.exited = true;
        this.interrupt();
        this.join();
        logger.info("partition pool has been shutdown...");
    }

    public void setGuid(String guid) {
        this.guid = guid;
    }

    public final String getGuid() {
        return this.guid;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public final String getIp() {
        return this.ip;
    }

    public String getSeq() {
        return this.seq;
    }

    public void setSeq(String seq) {
        this.seq = seq;
    }
}

