package com.taobao.drc.clusterclient.coordinator.impl;

import com.taobao.drc.clusterclient.PartitionManager;
import com.taobao.drc.clusterclient.clustermanager.APIProxy;
import com.taobao.drc.clusterclient.clustermanager.BatchCommitRequest;
import com.taobao.drc.clusterclient.clustermanager.BatchCommitResponse;
import com.taobao.drc.clusterclient.clustermanager.BatchGetPartitionRequest;
import com.taobao.drc.clusterclient.clustermanager.BatchGetPartitionResponse;
import com.taobao.drc.clusterclient.clustermanager.ExpectedConsumerStatus;
import com.taobao.drc.clusterclient.clustermanager.LocalConsumerStatus;
import com.taobao.drc.clusterclient.clustermanager.LocalPartitionStatus;
import com.taobao.drc.clusterclient.clustermanager.PartitionInfo;
import com.taobao.drc.clusterclient.coordinator.Coordinator;
import com.taobao.drc.clusterclient.partition.PartitionRef;
import com.taobao.drc.clusterclient.util.Futures;
import com.taobao.drc.clusterclient.util.SettableFuture;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/taobao/drc/clusterclient/coordinator/impl/DefaultCoordinator.class */
public class DefaultCoordinator implements Coordinator {
    static final long MIN_DELAY_MS_BETWEEN_COMMIT = 3000;
    static final long DEFAULT_COMMIT_PERIOD_MS = 10000;
    private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
    private final String clusterAddress;
    private final APIProxy apiProxy;
    private final int id;
    private AtomicBoolean stopped;
    private volatile boolean inFinalCommit;
    private final SettableFuture finalCommitFuture;
    private volatile long commitPeriodMs;
    private long sessionTimeoutMs;
    private long autoIncId;
    private final ScheduledExecutorService commitExecutor;
    private final ExecutorService eventExecutor;
    private final ExecutorService ioExecutor;
    private final Map<String, PartitionManager> seqToManager;
    private final Map<PartitionManager, String> managerToSeq;
    private final Queue<FinalCommit> finalCommits;
    private final Map<String, FinalCommit> pendingFinalCommits;
    private final Runnable normalCommitTask;
    private final Runnable finalCommitTask;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/taobao/drc/clusterclient/coordinator/impl/DefaultCoordinator$FinalCommit.class */
    public static class FinalCommit {
        private final LocalConsumerStatus consumerStatus;
        private final SettableFuture<ExpectedConsumerStatus> promise;

        public FinalCommit(LocalConsumerStatus localConsumerStatus, SettableFuture<ExpectedConsumerStatus> settableFuture) {
            this.consumerStatus = localConsumerStatus;
            this.promise = settableFuture;
        }

        public LocalConsumerStatus getConsumerStatus() {
            return this.consumerStatus;
        }

        public SettableFuture<ExpectedConsumerStatus> getPromise() {
            return this.promise;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultCoordinator(String str, int i, String str2, int i2) {
        this(str, i, str2, new APIProxy(str), i2);
    }

    DefaultCoordinator(final String str, final int i, final String str2, APIProxy aPIProxy, int i2) {
        this.stopped = new AtomicBoolean(false);
        this.inFinalCommit = false;
        this.finalCommitFuture = new SettableFuture();
        this.commitPeriodMs = DEFAULT_COMMIT_PERIOD_MS;
        this.sessionTimeoutMs = 0L;
        this.autoIncId = 0L;
        this.seqToManager = new HashMap();
        this.managerToSeq = new IdentityHashMap();
        this.finalCommits = new ArrayDeque();
        this.pendingFinalCommits = new HashMap();
        this.normalCommitTask = new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.1
            @Override // java.lang.Runnable
            public void run() {
                DefaultCoordinator.this.collectAndCommit();
            }
        };
        this.finalCommitTask = new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.2
            @Override // java.lang.Runnable
            public void run() {
                DefaultCoordinator.this.doFinalCommits();
            }
        };
        if (i2 < 1) {
            throw new IllegalArgumentException("Invalid IO pool capacity [" + i2 + "]");
        }
        logger.debug("DefaultCoordinator:id:[{}],clusterAddress:[{}]", Integer.valueOf(i), str);
        this.clusterAddress = str;
        this.id = i;
        this.apiProxy = aPIProxy;
        this.commitExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.3
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, str2 + "Commit-Loop[" + i + "]-" + str);
            }
        });
        this.eventExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.4
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, str2 + "Event-Loop[" + i + "]-" + str);
            }
        });
        this.ioExecutor = new ThreadPoolExecutor(i2, i2, 30L, TimeUnit.SECONDS, new LinkedBlockingDeque(), new ThreadFactory() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.5
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, str2 + "IO-Thread[" + i + "]-" + str);
            }
        });
        scheduleCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
        scheduleFinalCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
    }

    @Override // com.taobao.drc.clusterclient.coordinator.Coordinator
    public String getClusterAddress() {
        return this.clusterAddress;
    }

    @Override // com.taobao.drc.clusterclient.coordinator.Coordinator
    public long getSessionTimeoutMs() {
        return this.sessionTimeoutMs;
    }

    @Override // com.taobao.drc.clusterclient.coordinator.Coordinator
    public long getCommitPeriodMs() {
        return this.commitPeriodMs;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String generateLocalSeq() {
        StringBuilder append = new StringBuilder().append("local-");
        long j = this.autoIncId;
        this.autoIncId = j + 1;
        return append.append(j).toString();
    }

    @Override // com.taobao.drc.clusterclient.coordinator.Coordinator
    public Future register(final PartitionManager partitionManager) {
        if (this.clusterAddress.equals(partitionManager.getContext().getClusterUrl())) {
            return this.eventExecutor.submit(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.6
                @Override // java.lang.Runnable
                public void run() {
                    if (DefaultCoordinator.this.managerToSeq.containsKey(partitionManager)) {
                        DefaultCoordinator.logger.info("The partition manager has been registered with seq [{}]!", DefaultCoordinator.this.managerToSeq.get(partitionManager));
                        throw new IllegalArgumentException("The partition manager has been registered!");
                    }
                    String generateLocalSeq = DefaultCoordinator.this.generateLocalSeq();
                    DefaultCoordinator.this.managerToSeq.put(partitionManager, generateLocalSeq);
                    DefaultCoordinator.this.seqToManager.put(generateLocalSeq, partitionManager);
                    DefaultCoordinator.logger.info("DefaultCoordinator::register Allocated temp-seq [{}] for partition manager [{}][{}]", new Object[]{generateLocalSeq, partitionManager.getContext().getAppGuid(), partitionManager.getContext().getAppGroup()});
                }
            });
        }
        throw new IllegalArgumentException("Invalid cluster address of partition manager, expected: [" + this.clusterAddress + "], actual: [" + partitionManager.getContext().getClusterUrl() + "]");
    }

    @Override // com.taobao.drc.clusterclient.coordinator.Coordinator
    public Future deregister(final PartitionManager partitionManager) {
        final SettableFuture settableFuture = new SettableFuture();
        this.eventExecutor.submit(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.7
            @Override // java.lang.Runnable
            public void run() {
                String str = (String) DefaultCoordinator.this.managerToSeq.remove(partitionManager);
                if (str == null) {
                    DefaultCoordinator.logger.warn("Partition manager of [{}][{}] was not registered on coordinator for [{}]", new Object[]{partitionManager.getContext().getAppGuid(), partitionManager.getContext().getAppGroup(), DefaultCoordinator.this.clusterAddress});
                    settableFuture.success(null);
                    return;
                }
                DefaultCoordinator.this.seqToManager.remove(str);
                LocalConsumerStatus localConsumerStatus = partitionManager.getLocalConsumerStatus();
                localConsumerStatus.setSeq(str);
                ArrayList arrayList = new ArrayList();
                Iterator<LocalPartitionStatus> it = localConsumerStatus.getPartitions().iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next().getPartition());
                }
                DefaultCoordinator.logger.info("Partition manager [{}][{}][{}] is about to stop, partitions {}", new Object[]{localConsumerStatus.getGuid(), localConsumerStatus.getGroup(), localConsumerStatus.getSeq(), arrayList});
                if (localConsumerStatus.getPartitions().isEmpty()) {
                    settableFuture.success(null);
                } else {
                    DefaultCoordinator.logger.info("Try to do final commit for partitions {} of [{}][{}]", new Object[]{arrayList, localConsumerStatus.getGuid(), localConsumerStatus.getGroup()});
                    DefaultCoordinator.this.finalCommits.add(new FinalCommit(localConsumerStatus, settableFuture));
                }
            }
        });
        return settableFuture;
    }

    private void scheduleCommitAfter(long j) {
        logger.debug("To schedule a batch commit for [{}][{}] after [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(j)});
        this.commitExecutor.schedule(this.normalCommitTask, j, TimeUnit.MILLISECONDS);
    }

    private void scheduleFinalCommitAfter(long j) {
        logger.trace("To schedule a final commit for [{}][{}] after [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(j)});
        this.commitExecutor.schedule(this.finalCommitTask, j, TimeUnit.MILLISECONDS);
    }

    private Future<BatchCommitRequest> collectRequest() {
        return this.eventExecutor.submit(new Callable<BatchCommitRequest>() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.8
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public BatchCommitRequest call() throws Exception {
                BatchCommitRequest batchCommitRequest = new BatchCommitRequest();
                for (Map.Entry entry : DefaultCoordinator.this.seqToManager.entrySet()) {
                    LocalConsumerStatus localConsumerStatus = ((PartitionManager) entry.getValue()).getLocalConsumerStatus();
                    localConsumerStatus.setSeq((String) entry.getKey());
                    batchCommitRequest.addConsumerStatus(localConsumerStatus);
                }
                return batchCommitRequest;
            }
        });
    }

    private Future<BatchCommitRequest> collectFinalRequest() {
        return this.eventExecutor.submit(new Callable<BatchCommitRequest>() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.9
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public BatchCommitRequest call() throws Exception {
                BatchCommitRequest batchCommitRequest = new BatchCommitRequest();
                while (!DefaultCoordinator.this.finalCommits.isEmpty()) {
                    FinalCommit finalCommit = (FinalCommit) DefaultCoordinator.this.finalCommits.poll();
                    batchCommitRequest.addConsumerStatus(finalCommit.getConsumerStatus());
                    DefaultCoordinator.this.pendingFinalCommits.put(finalCommit.getConsumerStatus().getSeq(), finalCommit);
                }
                return batchCommitRequest;
            }
        });
    }

    void collectAndCommit() {
        if (this.stopped.get()) {
            logger.info("Coordinator for [{}][{}] has been stopped", this.clusterAddress, Integer.valueOf(this.id));
            return;
        }
        long nanoTime = System.nanoTime();
        try {
            try {
                final BatchCommitRequest batchCommitRequest = collectRequest().get();
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                final BatchCommitResponse batchCommit = this.apiProxy.batchCommit(batchCommitRequest);
                if (!batchCommit.isSuccess()) {
                    throw new IllegalStateException("Batch commit for [" + this.clusterAddress + "] failed: [" + batchCommit.getErrMsg() + "]");
                }
                handleChangedParameter(batchCommit);
                this.eventExecutor.execute(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.10
                    @Override // java.lang.Runnable
                    public void run() {
                        DefaultCoordinator.this.handleBatchCommitSuccess(batchCommitRequest, batchCommit);
                    }
                });
                long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                logger.debug("Batch commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(millis)});
                if (this.stopped.get()) {
                    return;
                }
                scheduleCommitAfter(Math.max(getCommitPeriodMs() - millis, MIN_DELAY_MS_BETWEEN_COMMIT));
            } catch (InterruptedException e) {
                logger.info("Coordinator for [{}][{}] interrupted", this.clusterAddress, Integer.valueOf(this.id));
                long millis2 = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                logger.debug("Batch commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(millis2)});
                if (this.stopped.get()) {
                    return;
                }
                scheduleCommitAfter(Math.max(getCommitPeriodMs() - millis2, MIN_DELAY_MS_BETWEEN_COMMIT));
            } catch (Exception e2) {
                logger.error("Coordinator Failed to commit for [{}][{}],content:[{}],need check:cm", new Object[]{this.clusterAddress, Integer.valueOf(this.id), e2});
                this.eventExecutor.execute(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.11
                    @Override // java.lang.Runnable
                    public void run() {
                        DefaultCoordinator.this.handleBatchCommitFailure(e2);
                    }
                });
                long millis3 = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                logger.debug("Batch commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(millis3)});
                if (this.stopped.get()) {
                    return;
                }
                scheduleCommitAfter(Math.max(getCommitPeriodMs() - millis3, MIN_DELAY_MS_BETWEEN_COMMIT));
            }
        } catch (Throwable th) {
            long millis4 = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
            logger.debug("Batch commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(millis4)});
            if (!this.stopped.get()) {
                scheduleCommitAfter(Math.max(getCommitPeriodMs() - millis4, MIN_DELAY_MS_BETWEEN_COMMIT));
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doFinalCommits() {
        if (this.stopped.get()) {
            logger.info("To do the final commit for [{}][{}], remaining manager num: [{}]", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Integer.valueOf(this.seqToManager.size())});
            this.inFinalCommit = true;
        }
        long nanoTime = System.nanoTime();
        try {
            try {
                final BatchCommitRequest batchCommitRequest = collectFinalRequest().get();
                if (batchCommitRequest.getLocalConsumerStatusList().isEmpty()) {
                    logger.trace("No final commits for [{}][{}]", this.clusterAddress, Integer.valueOf(this.id));
                    if (this.inFinalCommit) {
                        this.finalCommitFuture.success(null);
                    }
                    if (0 != 0) {
                        logger.debug("Final commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
                    }
                    if (this.stopped.get()) {
                        return;
                    }
                    scheduleFinalCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
                    return;
                }
                logger.info("doFinalCommits:BatchCommitRequest clusterAddress:[{}] path:[{}],content:[{}]", new Object[]{this.clusterAddress, batchCommitRequest.getPath(), batchCommitRequest.getContent()});
                final BatchCommitResponse batchCommit = this.apiProxy.batchCommit(batchCommitRequest);
                if (!batchCommit.isSuccess()) {
                    throw new IllegalStateException("Batch commit to [" + this.clusterAddress + "] failed: [" + batchCommit.getErrMsg() + "]");
                }
                this.eventExecutor.execute(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.12
                    @Override // java.lang.Runnable
                    public void run() {
                        DefaultCoordinator.this.handleFinalCommitSuccess(batchCommitRequest, batchCommit);
                    }
                });
                if (this.inFinalCommit) {
                    this.finalCommitFuture.success(null);
                }
                if (1 != 0) {
                    logger.debug("Final commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
                }
                if (this.stopped.get()) {
                    return;
                }
                scheduleFinalCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
            } catch (Exception e) {
                logger.error("Failed to do final commit for [{}][{}]", new Object[]{this.clusterAddress, Integer.valueOf(this.id), e});
                this.eventExecutor.execute(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.13
                    @Override // java.lang.Runnable
                    public void run() {
                        DefaultCoordinator.this.handleFinalCommitFailure(e);
                    }
                });
                if (this.inFinalCommit) {
                    this.finalCommitFuture.success(null);
                }
                if (0 != 0) {
                    logger.debug("Final commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
                }
                if (this.stopped.get()) {
                    return;
                }
                scheduleFinalCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
            }
        } catch (Throwable th) {
            if (this.inFinalCommit) {
                this.finalCommitFuture.success(null);
            }
            if (0 != 0) {
                logger.debug("Final commit for [{}][{}] took [{}] ms", new Object[]{this.clusterAddress, Integer.valueOf(this.id), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
            }
            if (!this.stopped.get()) {
                scheduleFinalCommitAfter(MIN_DELAY_MS_BETWEEN_COMMIT);
            }
            throw th;
        }
    }

    private void handleChangedParameter(BatchCommitResponse batchCommitResponse) {
        long intValue = batchCommitResponse.getData().getSessionTimeoutSeconds().intValue() * 1000;
        long intValue2 = batchCommitResponse.getData().getCommitPeriodSeconds().intValue() * 1000;
        if (this.sessionTimeoutMs != intValue) {
            logger.info("DefaultCoordinator::handleChangedParameter Session timeout for cluster [{}] changed from [{}] to [{}]", new Object[]{this.clusterAddress, Long.valueOf(this.sessionTimeoutMs), Long.valueOf(intValue)});
            this.sessionTimeoutMs = intValue;
        }
        if (this.commitPeriodMs != intValue2) {
            logger.info("DefaultCoordinator::handleChangedParameter Commit period for cluster [{}] changed from [{}] to [{}]", new Object[]{this.clusterAddress, Long.valueOf(this.commitPeriodMs), Long.valueOf(intValue2)});
            this.commitPeriodMs = intValue2;
        }
    }

    void handleBatchCommitSuccess(BatchCommitRequest batchCommitRequest, BatchCommitResponse batchCommitResponse) {
        final BatchGetPartitionRequest batchGetPartitionRequest = new BatchGetPartitionRequest();
        final IdentityHashMap identityHashMap = new IdentityHashMap();
        HashSet hashSet = new HashSet();
        Iterator<LocalConsumerStatus> it = batchCommitRequest.getLocalConsumerStatusList().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getSeq());
        }
        for (ExpectedConsumerStatus expectedConsumerStatus : batchCommitResponse.getData().getConsumers()) {
            logger.info("DefaultCoordinator expectedConsumerStatus:[{}]", expectedConsumerStatus);
            if (expectedConsumerStatus.isNewSeqAllocated()) {
                logger.info("Allocated consumer seq [{}] for local seq [{}]", expectedConsumerStatus.getAllocatedSeq(), expectedConsumerStatus.getLocalSeq());
                PartitionManager partitionManager = this.seqToManager.get(expectedConsumerStatus.getLocalSeq());
                if (partitionManager == null) {
                    logger.warn("DefaultCoordinator Manager with local seq [{}] not exists", expectedConsumerStatus.getLocalSeq());
                } else {
                    this.seqToManager.remove(expectedConsumerStatus.getLocalSeq());
                    this.seqToManager.put(expectedConsumerStatus.getAllocatedSeq(), partitionManager);
                    this.managerToSeq.put(partitionManager, expectedConsumerStatus.getAllocatedSeq());
                    Map<PartitionRef, Long> onCommitComplete = partitionManager.onCommitComplete(expectedConsumerStatus, null);
                    identityHashMap.put(partitionManager, onCommitComplete);
                    for (PartitionRef partitionRef : onCommitComplete.keySet()) {
                        logger.info("isNewSeqAllocated request.addPartition:Guid:[{}],Group:[{}],Partition:[{}]", new Object[]{partitionRef.getGuid(), partitionRef.getGroup(), partitionRef.getPartition()});
                        batchGetPartitionRequest.addPartition(partitionRef.getGuid(), partitionRef.getGroup(), partitionRef.getPartition());
                    }
                }
                hashSet.remove(expectedConsumerStatus.getLocalSeq());
            } else {
                PartitionManager partitionManager2 = this.seqToManager.get(expectedConsumerStatus.getSeq());
                if (partitionManager2 != null) {
                    Map<PartitionRef, Long> onCommitComplete2 = partitionManager2.onCommitComplete(expectedConsumerStatus, null);
                    identityHashMap.put(partitionManager2, onCommitComplete2);
                    for (PartitionRef partitionRef2 : onCommitComplete2.keySet()) {
                        logger.info("not isNewSeqAllocated request.addPartition:Guid:[{}],Group:[{}],Partition:[{}]", new Object[]{partitionRef2.getGuid(), partitionRef2.getGroup(), partitionRef2.getPartition()});
                        batchGetPartitionRequest.addPartition(partitionRef2.getGuid(), partitionRef2.getGroup(), partitionRef2.getPartition());
                    }
                } else {
                    logger.error("Manager with seq [{}] not exists on [{}][{}]", new Object[]{expectedConsumerStatus.getSeq(), this.clusterAddress, Integer.valueOf(this.id)});
                }
                hashSet.remove(expectedConsumerStatus.getSeq());
            }
        }
        if (!hashSet.isEmpty()) {
            logger.error("Found consumers provided in batch commit request not exists in batch commit response: {}", hashSet);
        }
        if (batchGetPartitionRequest.getPartitionRefs().isEmpty()) {
            return;
        }
        logger.debug("handleBatchCommitSuccess begin ioExecutor batchGetPartition");
        this.ioExecutor.execute(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.14
            @Override // java.lang.Runnable
            public void run() {
                try {
                    final BatchGetPartitionResponse batchGetPartitions = DefaultCoordinator.this.apiProxy.batchGetPartitions(batchGetPartitionRequest);
                    if (!batchGetPartitions.isSuccess()) {
                        throw new IllegalStateException("Batch get partitions from [" + DefaultCoordinator.this.clusterAddress + "] failed: [" + batchGetPartitions.getErrMsg() + "]");
                    }
                    DefaultCoordinator.this.eventExecutor.execute(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.14.1
                        @Override // java.lang.Runnable
                        public void run() {
                            DefaultCoordinator.this.handleBatchGetPartitionSuccess(identityHashMap, batchGetPartitions);
                        }
                    });
                } catch (Exception e) {
                    DefaultCoordinator.this.eventExecutor.execute(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.14.2
                        @Override // java.lang.Runnable
                        public void run() {
                            DefaultCoordinator.this.handleBatchGetException(identityHashMap, e);
                        }
                    });
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleBatchCommitFailure(Exception exc) {
        Iterator<PartitionManager> it = this.seqToManager.values().iterator();
        while (it.hasNext()) {
            it.next().onCommitComplete(null, exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleFinalCommitSuccess(BatchCommitRequest batchCommitRequest, BatchCommitResponse batchCommitResponse) {
        Iterator<LocalConsumerStatus> it = batchCommitRequest.getLocalConsumerStatusList().iterator();
        while (it.hasNext()) {
            FinalCommit remove = this.pendingFinalCommits.remove(it.next().getSeq());
            logger.error("Final commit for [{}][{}][{}] succeeded", new Object[]{remove.getConsumerStatus().getGuid(), remove.getConsumerStatus().getGroup(), remove.getConsumerStatus().getSeq()});
            remove.getPromise().success(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleFinalCommitFailure(Exception exc) {
        for (FinalCommit finalCommit : this.pendingFinalCommits.values()) {
            logger.error("Final commit for [{}][{}][{}] failed", new Object[]{finalCommit.getConsumerStatus().getGuid(), finalCommit.getConsumerStatus().getGroup(), finalCommit.getConsumerStatus().getSeq(), exc});
            finalCommit.getPromise().success(null);
        }
        this.pendingFinalCommits.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleBatchGetPartitionSuccess(Map<PartitionManager, Map<PartitionRef, Long>> map, BatchGetPartitionResponse batchGetPartitionResponse) {
        Map<PartitionRef, PartitionInfo> groupPartitionInfoMap = batchGetPartitionResponse.groupPartitionInfoMap();
        for (Map.Entry<PartitionManager, Map<PartitionRef, Long>> entry : map.entrySet()) {
            PartitionManager key = entry.getKey();
            Map<PartitionRef, Long> value = entry.getValue();
            HashMap hashMap = new HashMap();
            for (Map.Entry<PartitionRef, Long> entry2 : value.entrySet()) {
                PartitionInfo partitionInfo = groupPartitionInfoMap.get(entry2.getKey());
                if (partitionInfo == null) {
                    logger.error("Got no partition info for [{}]", entry2.getKey());
                } else {
                    logger.info("DefaultCoordinator: partition:key:" + entry2.getKey() + ",partitionInfo:" + partitionInfo.toString());
                }
                hashMap.put(entry2.getKey(), partitionInfo);
            }
            key.onGetPartitionInfoComplete(hashMap, value, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleBatchGetException(Map<PartitionManager, Map<PartitionRef, Long>> map, Throwable th) {
        for (Map.Entry<PartitionManager, Map<PartitionRef, Long>> entry : map.entrySet()) {
            entry.getKey().onGetPartitionInfoComplete(new HashMap(), entry.getValue(), th);
        }
    }

    @Override // com.taobao.drc.clusterclient.coordinator.Coordinator
    public Future runOnIOPool(Runnable runnable) {
        return this.ioExecutor.submit(runnable);
    }

    @Override // com.taobao.drc.clusterclient.coordinator.Coordinator
    public Future runOnEventThread(Runnable runnable) {
        return this.eventExecutor.submit(runnable);
    }

    @Override // com.taobao.drc.clusterclient.coordinator.Coordinator
    public Future asyncClose() {
        if (!this.stopped.compareAndSet(false, true)) {
            logger.warn("The coordinator for [{}][{}] has already been closed", this.clusterAddress, Integer.valueOf(this.id));
            return Futures.success(null);
        }
        final SettableFuture settableFuture = new SettableFuture();
        Thread thread = new Thread(new Runnable() { // from class: com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinator.15
            @Override // java.lang.Runnable
            public void run() {
                try {
                    DefaultCoordinator.this.finalCommitFuture.get();
                    DefaultCoordinator.this.commitExecutor.shutdown();
                    DefaultCoordinator.this.ioExecutor.shutdown();
                    DefaultCoordinator.this.eventExecutor.shutdown();
                    DefaultCoordinator.this.eventExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                    DefaultCoordinator.this.commitExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                    DefaultCoordinator.this.ioExecutor.awaitTermination(10L, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    DefaultCoordinator.logger.info("Interrupted while closing coordinator for [{}][{}]", DefaultCoordinator.this.clusterAddress, Integer.valueOf(DefaultCoordinator.this.id));
                } catch (ExecutionException e2) {
                    DefaultCoordinator.logger.error("Failed to close coordinator for [{}][{}] peacefully", DefaultCoordinator.this.clusterAddress, Integer.valueOf(DefaultCoordinator.this.id));
                } finally {
                    settableFuture.success(false);
                }
            }
        });
        thread.setName("Coordinator-Finalizer-[" + this.id + "]-" + this.clusterAddress);
        thread.setDaemon(true);
        thread.start();
        return settableFuture;
    }
}
