package com.taobao.drc.clusterclient.impl;

import com.alibaba.fastjson.JSON;
import com.taobao.drc.clusterclient.BaseClusterContext;
import com.taobao.drc.clusterclient.ConsumerState;
import com.taobao.drc.clusterclient.MessageNotifier;
import com.taobao.drc.clusterclient.NotifyController;
import com.taobao.drc.clusterclient.PartitionClient;
import com.taobao.drc.clusterclient.PartitionClientFactory;
import com.taobao.drc.clusterclient.PartitionManager;
import com.taobao.drc.clusterclient.clustermanager.ExpectedConsumerStatus;
import com.taobao.drc.clusterclient.clustermanager.ExpectedPartitionStatus;
import com.taobao.drc.clusterclient.clustermanager.LocalConsumerStatus;
import com.taobao.drc.clusterclient.clustermanager.LocalPartitionStatus;
import com.taobao.drc.clusterclient.clustermanager.PartitionInfo;
import com.taobao.drc.clusterclient.coordinator.Coordinator;
import com.taobao.drc.clusterclient.coordinator.CoordinatorManager;
import com.taobao.drc.clusterclient.partition.IPartition;
import com.taobao.drc.clusterclient.partition.PartitionRef;
import com.taobao.drc.clusterclient.partition.PartitionState;
import com.taobao.drc.clusterclient.partition.PartitionStateChangeListener;
import com.taobao.drc.clusterclient.util.Futures;
import com.taobao.drc.clusterclient.util.Time;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/taobao/drc/clusterclient/impl/DefaultPartitionManager.class */
public class DefaultPartitionManager<C extends BaseClusterContext, T extends MessageNotifier> implements PartitionManager<C, T> {
    static final int CODE_NO_ERROR = 0;
    static final int CODE_WRITE_DB_FAILURE = 440;
    static final int CODE_INVALID_GENERATION = 511;
    static final long ACTION_ID_NOOP = -1;
    static final long METRICS_SUBMISSION_PERIOD = 55000;
    private static final Logger logger = LoggerFactory.getLogger(DefaultPartitionManager.class);
    private final CoordinatorManager coordinatorManager;
    private final Coordinator coordinator;
    private final PartitionClientFactory<C, T> partitionClientFactory;
    private final C context;
    private final String version;
    private String ip;
    final Map<String, ActivePartition> activePartitionMap;
    private final List<T> notifiers;
    private final List<PartitionStateChangeListener> partitionStateChangeListeners;
    private final Time time;
    private int nextListenerIdx;
    private long actionId;
    private boolean submittingMetrics;
    private long lastMetricsSubmitMs;
    private ConsumerState state;
    private volatile String seq;
    private final BlockingQueue<Future> futuresOnStopped;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/taobao/drc/clusterclient/impl/DefaultPartitionManager$ActivePartition.class */
    public static class ActivePartition {
        private final PartitionManager partitionManager;
        private final String partition;
        private final List<PartitionStateChangeListener> stateChangeListeners;
        private final PartitionRef ref;
        private final Time time;
        private PartitionInfo partitionInfo;
        private long generation;
        private String initOffset;
        private PartitionState state;
        private long stateStartTime;
        private PartitionClient client;
        private long lastSuccessCommitAt;
        private long currentActionId;
        private String message;

        public ActivePartition(PartitionManager partitionManager, String str, Time time) {
            this(partitionManager, str, time, Collections.emptyList());
        }

        public ActivePartition(PartitionManager partitionManager, String str, Time time, List<PartitionStateChangeListener> list) {
            this.state = PartitionState.INIT;
            this.currentActionId = DefaultPartitionManager.ACTION_ID_NOOP;
            this.partitionManager = partitionManager;
            this.partition = str;
            this.time = time;
            this.stateChangeListeners = list;
            this.ref = new PartitionRef(partitionManager.getContext().getAppGuid(), partitionManager.getContext().getAppGroup(), str);
            setState(PartitionState.INIT);
        }

        public String getPartition() {
            return this.partition;
        }

        public PartitionRef getRef() {
            return this.ref;
        }

        public PartitionInfo getPartitionInfo() {
            return this.partitionInfo;
        }

        public void setPartitionInfo(PartitionInfo partitionInfo) {
            this.partitionInfo = partitionInfo;
        }

        public long getGeneration() {
            return this.generation;
        }

        public void setGeneration(long j) {
            this.generation = j;
        }

        public String getInitOffset() {
            return this.initOffset;
        }

        public void setInitOffset(String str) {
            this.initOffset = str;
        }

        public String offset() {
            if (this.client != null) {
                return this.client.offset();
            }
            return null;
        }

        public PartitionState getState() {
            return this.state;
        }

        public void setState(PartitionState partitionState) {
            this.state = partitionState;
            this.stateStartTime = this.time.millis();
            if (this.stateChangeListeners.isEmpty()) {
                return;
            }
            for (PartitionStateChangeListener partitionStateChangeListener : this.stateChangeListeners) {
                IPartition iPartition = DefaultPartitionManager.CODE_NO_ERROR;
                if (this.client != null) {
                    iPartition = this.client.getPartition();
                }
                partitionStateChangeListener.onStateChanged(getRef(), partitionState, iPartition);
            }
        }

        public long getStateStartTime() {
            return this.stateStartTime;
        }

        public long getLastSuccessCommitAt() {
            return this.lastSuccessCommitAt;
        }

        public void setLastSuccessCommitAt(long j) {
            this.lastSuccessCommitAt = j;
        }

        public void setCurrentActionId(long j) {
            this.currentActionId = j;
        }

        public long getCurrentActionId() {
            return this.currentActionId;
        }

        public PartitionClient getClient() {
            return this.client;
        }

        public long extendLease(long j) {
            long lastSuccessCommitAt = getLastSuccessCommitAt() + j;
            if (getClient() != null) {
                DefaultPartitionManager.logger.debug("To extend lease of partition [{}] to [{}]", getPartition(), Long.valueOf(lastSuccessCommitAt));
                getClient().getNotifyController().extendLeaseTo(lastSuccessCommitAt);
            } else {
                DefaultPartitionManager.logger.error("No client for partition {} to extend the lease", this);
            }
            return lastSuccessCommitAt;
        }

        public void setClient(PartitionClient partitionClient) {
            this.client = partitionClient;
        }

        public String getMessage(boolean z) {
            HashMap hashMap = new HashMap();
            if (this.client != null) {
                DefaultPartitionManager.logger.info("Partition {} metrics: {}", this.ref, this.client.getMetrics());
                if (z) {
                    hashMap.put("metrics", this.client.getMetrics());
                }
            }
            if (this.message != null) {
                hashMap.put("message", this.message);
            }
            return JSON.toJSONString(hashMap);
        }

        public void setMessage(String str) {
            this.message = str;
        }

        public String toString() {
            return "[" + this.partitionManager.getContext().getAppGuid() + "][" + this.partitionManager.getContext().getAppGroup() + "][" + getPartition() + "][" + getGeneration() + "][" + getState() + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/taobao/drc/clusterclient/impl/DefaultPartitionManager$Version.class */
    public static class Version implements Comparable<Version> {
        private static final Pattern VERSION_PATTERN = Pattern.compile("^(\\d+)\\.(\\d+)\\.(\\d+).*");
        private final int majorVersion;
        private final int minorVersion;
        private final int revisionNumber;

        public Version(String str) {
            Matcher matcher = VERSION_PATTERN.matcher(str);
            if (matcher.matches()) {
                this.majorVersion = Integer.parseInt(matcher.group(1));
                this.minorVersion = Integer.parseInt(matcher.group(2));
                this.revisionNumber = Integer.parseInt(matcher.group(3));
            } else {
                this.majorVersion = DefaultPartitionManager.CODE_NO_ERROR;
                this.minorVersion = DefaultPartitionManager.CODE_NO_ERROR;
                this.revisionNumber = DefaultPartitionManager.CODE_NO_ERROR;
            }
        }

        public int getMajorVersion() {
            return this.majorVersion;
        }

        public int getMinorVersion() {
            return this.minorVersion;
        }

        public int getRevisionNumber() {
            return this.revisionNumber;
        }

        @Override // java.lang.Comparable
        public int compareTo(Version version) {
            return this.majorVersion != version.majorVersion ? Integer.signum(this.majorVersion - version.majorVersion) : this.minorVersion != version.minorVersion ? Integer.signum(this.minorVersion - version.minorVersion) : this.revisionNumber != version.revisionNumber ? Integer.signum(this.revisionNumber - version.revisionNumber) : DefaultPartitionManager.CODE_NO_ERROR;
        }
    }

    public DefaultPartitionManager(CoordinatorManager coordinatorManager, PartitionClientFactory<C, T> partitionClientFactory, C c, String str, String str2, List<T> list, List<PartitionStateChangeListener> list2) {
        this(coordinatorManager, partitionClientFactory, c, str, str2, list, list2, Time.SYSTEM);
    }

    public DefaultPartitionManager(CoordinatorManager coordinatorManager, PartitionClientFactory<C, T> partitionClientFactory, C c, String str, String str2, List<T> list, List<PartitionStateChangeListener> list2, Time time) {
        this.activePartitionMap = new HashMap();
        this.nextListenerIdx = CODE_NO_ERROR;
        this.actionId = 0L;
        this.submittingMetrics = false;
        this.lastMetricsSubmitMs = 0L;
        this.state = ConsumerState.STARTING;
        this.futuresOnStopped = new LinkedBlockingDeque();
        this.coordinatorManager = coordinatorManager;
        this.coordinator = coordinatorManager.acquireCoordinator(c.getClusterUrl());
        this.partitionClientFactory = partitionClientFactory;
        this.context = c;
        this.version = str;
        this.ip = str2;
        this.notifiers = list;
        this.partitionStateChangeListeners = list2;
        this.time = time;
        logger.info("DefaultPartitionManager:version:[{}]", str);
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public ConsumerState getState() {
        return this.state;
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public void start() throws ExecutionException, InterruptedException {
        this.coordinator.register(this).get();
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public void stop() {
        this.futuresOnStopped.add(this.coordinator.runOnEventThread(new Runnable() { // from class: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.1
            @Override // java.lang.Runnable
            public void run() {
                DefaultPartitionManager.logger.info("To stop partition manager [{}][{}]", DefaultPartitionManager.this.context.getAppGuid(), DefaultPartitionManager.this.context.getAppGroup());
                DefaultPartitionManager.this.state = ConsumerState.STOPPING;
                for (ActivePartition activePartition : DefaultPartitionManager.this.activePartitionMap.values()) {
                    switch (AnonymousClass4.$SwitchMap$com$taobao$drc$clusterclient$partition$PartitionState[activePartition.getState().ordinal()]) {
                        case 1:
                            Future maybeStopPartitionClient = DefaultPartitionManager.this.maybeStopPartitionClient(activePartition);
                            if (maybeStopPartitionClient != null) {
                                DefaultPartitionManager.this.futuresOnStopped.add(maybeStopPartitionClient);
                                break;
                            } else {
                                break;
                            }
                    }
                    activePartition.setState(PartitionState.STOPPING);
                }
            }
        }));
        this.futuresOnStopped.add(this.coordinator.deregister(this));
        this.futuresOnStopped.add(this.coordinatorManager.releaseCoordinator(this.coordinator));
    }

    private boolean isStopping() {
        return this.state == ConsumerState.STOPPING;
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public void waitForStop(long j) throws InterruptedException {
        long millis = this.time.millis();
        long millis2 = millis + TimeUnit.SECONDS.toMillis(j);
        Iterator it = this.futuresOnStopped.iterator();
        boolean z = true;
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (millis2 <= millis) {
                z = CODE_NO_ERROR;
                break;
            }
            try {
                ((Future) it.next()).get(millis2 - millis, TimeUnit.MILLISECONDS);
                it.remove();
            } catch (TimeoutException e) {
                z = CODE_NO_ERROR;
            } catch (Exception e2) {
                logger.error("Caught exception on stopping partition manager [{}][{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), e2});
                it.remove();
            }
            millis = this.time.millis();
        }
        if (z) {
            logger.info("The partition manager [{}][{}][{}] has been stopped completely", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), this.seq});
        } else {
            logger.warn("The partition manager [{}][{}][{}] has not been stopped completely within [{}] seconds", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), this.seq, Long.valueOf(j)});
        }
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public C getContext() {
        return this.context;
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public T getNextListener() {
        T t;
        synchronized (this) {
            this.nextListenerIdx %= this.notifiers.size();
            List<T> list = this.notifiers;
            int i = this.nextListenerIdx;
            this.nextListenerIdx = i + 1;
            t = list.get(i);
        }
        return t;
    }

    private long getSessionTimeoutMs() {
        return this.coordinator.getSessionTimeoutMs();
    }

    /*  JADX ERROR: Failed to decode insn: 0x0005: MOVE_MULTI, method: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.getNextActionId():long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:113)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    private long getNextActionId() {
        /*
            r8 = this;
            r0 = r8
            r1 = r0
            long r1 = r1.actionId
            // decode failed: arraycopy: source index -1 out of bounds for object array[8]
            r2 = 1
            long r1 = r1 + r2
            r0.actionId = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.getNextActionId():long");
    }

    private ActivePartition getActivePartition(String str) {
        return this.activePartitionMap.get(str);
    }

    private void startClient(final String str, final PartitionClient partitionClient, final long j) {
        this.coordinator.runOnIOPool(new Runnable() { // from class: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    partitionClient.start();
                    DefaultPartitionManager.this.coordinator.runOnEventThread(new Runnable() { // from class: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.2.1
                        @Override // java.lang.Runnable
                        public void run() {
                            DefaultPartitionManager.this.onClientStarted(str, j, partitionClient, null);
                        }
                    });
                } catch (Exception e) {
                    DefaultPartitionManager.this.coordinator.runOnEventThread(new Runnable() { // from class: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.2.2
                        @Override // java.lang.Runnable
                        public void run() {
                            DefaultPartitionManager.this.onClientStarted(str, j, partitionClient, e);
                        }
                    });
                }
            }
        });
    }

    private Future stopClient(final String str, final PartitionClient partitionClient, final long j) {
        return this.coordinator.runOnIOPool(new Runnable() { // from class: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    partitionClient.close();
                    DefaultPartitionManager.this.coordinator.runOnEventThread(new Runnable() { // from class: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.3.1
                        @Override // java.lang.Runnable
                        public void run() {
                            DefaultPartitionManager.this.onClientStopped(str, j, null);
                        }
                    });
                } catch (Exception e) {
                    DefaultPartitionManager.this.coordinator.runOnEventThread(new Runnable() { // from class: com.taobao.drc.clusterclient.impl.DefaultPartitionManager.3.2
                        @Override // java.lang.Runnable
                        public void run() {
                            DefaultPartitionManager.this.onClientStopped(str, j, e);
                        }
                    });
                }
            }
        });
    }

    Future maybeStopPartitionClient(ActivePartition activePartition) {
        if (activePartition == null) {
            return Futures.success(null);
        }
        switch (activePartition.getState()) {
            case RUNNING:
                activePartition.getClient().getNotifyController().close();
                activePartition.setState(PartitionState.STOPPING);
                activePartition.setCurrentActionId(getNextActionId());
                return stopClient(activePartition.getPartition(), activePartition.getClient(), activePartition.getCurrentActionId());
            case INIT:
                activePartition.setState(PartitionState.STOPPED);
                break;
            case STARTING:
                activePartition.setCurrentActionId(ACTION_ID_NOOP);
                activePartition.setState(PartitionState.STOPPED);
                break;
        }
        return Futures.success(null);
    }

    private boolean isStateTimeout(ActivePartition activePartition) {
        return this.time.millis() - activePartition.getStateStartTime() > getSessionTimeoutMs();
    }

    private String getMessage(boolean z) {
        TreeMap treeMap = new TreeMap();
        ArrayList arrayList = new ArrayList();
        Iterator<T> it = this.notifiers.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getMetrics());
        }
        logger.info("Notifier metrics for [{}][{}]: {}", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), arrayList});
        if (z) {
            treeMap.put("metrics", arrayList);
            logger.info("DefaultPartitionManager::getMessage: [{}][{}] return :[{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), JSON.toJSONString(treeMap)});
        }
        return JSON.toJSONString(treeMap);
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public LocalConsumerStatus getLocalConsumerStatus() {
        if (this.state == ConsumerState.RUNNING) {
            this.submittingMetrics = this.time.millis() - this.lastMetricsSubmitMs > METRICS_SUBMISSION_PERIOD;
        } else if (this.state == ConsumerState.STOPPING) {
            this.submittingMetrics = true;
        }
        LocalConsumerStatus localConsumerStatus = new LocalConsumerStatus(this.context.getAppGuid(), this.context.getAppGroup(), this.context.getAppGroupUserName(), this.context.getAppGroupPassword(), this.version, this.ip, this.context.getMaxConns(), this.state.toString(), getMessage(this.submittingMetrics));
        ArrayList<ActivePartition> arrayList = new ArrayList();
        for (ActivePartition activePartition : this.activePartitionMap.values()) {
            switch (activePartition.getState()) {
                case RUNNING:
                    if (activePartition.getClient().isActive()) {
                        if (activePartition.getClient().getNotifyController().isClosed()) {
                            logger.error("The notify controller for {} is closed");
                            maybeStopPartitionClient(activePartition);
                            break;
                        }
                    } else {
                        logger.error("The client for {} is not active", activePartition);
                        maybeStopPartitionClient(activePartition);
                        break;
                    }
                    break;
                case STOPPED:
                    arrayList.add(activePartition);
                    continue;
                default:
                    if (isStateTimeout(activePartition)) {
                        logger.error("DefaultPartitionManager State timeout for partition {}: action started at [{}]", activePartition, Long.valueOf(activePartition.getStateStartTime()));
                        activePartition.setState(PartitionState.STOPPED);
                        break;
                    }
                    break;
            }
            String offset = activePartition.offset();
            if (offset == null) {
                logger.info("Ignore partition {} the offset of which is still null", activePartition);
            } else {
                localConsumerStatus.addPartition(new LocalPartitionStatus(activePartition.getPartition(), activePartition.getGeneration(), offset, activePartition.getState().name(), activePartition.getMessage(this.submittingMetrics)));
            }
        }
        for (ActivePartition activePartition2 : arrayList) {
            logger.info("Partition {} has been stopped completely, remove it from active list", activePartition2);
            this.activePartitionMap.remove(activePartition2.getPartition());
        }
        logger.info("DefaultPartitionManager::getLocalConsumerStatus:state:[{}],AppGuid:[{}],AppGroup:[{}],AppGroupUserName:[{}],version:[{}],ip:[{}],activePartitionMap size:[{}]", new Object[]{this.state, this.context.getAppGuid(), this.context.getAppGroup(), this.context.getAppGroupUserName(), this.version, this.ip, Integer.valueOf(this.activePartitionMap.size())});
        Iterator<ActivePartition> it = this.activePartitionMap.values().iterator();
        while (it.hasNext()) {
            logger.info("DefaultPartitionManager::getLocalConsumerStatus: activePartition:[{}]", it.next());
        }
        return localConsumerStatus;
    }

    private boolean isUpgradeRequired(String str, String str2) {
        if (str == null || str2 == null) {
            return false;
        }
        try {
            return new Version(str).compareTo(new Version(str2)) > 0;
        } catch (Exception e) {
            logger.warn("Invalid version, current [{}], expected [{}]", str2, str);
            return false;
        }
    }

    private int getActivePartitionNum() {
        return this.activePartitionMap.size();
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public Map<PartitionRef, Long> onCommitComplete(ExpectedConsumerStatus expectedConsumerStatus, Throwable th) {
        if (isStopping()) {
            return Collections.emptyMap();
        }
        long millis = this.time.millis();
        if (th != null) {
            for (ActivePartition activePartition : this.activePartitionMap.values()) {
                if (activePartition.getState() != PartitionState.RUNNING) {
                    logger.info("Failed to commit for partition {}, while the state of the partition is [{}]", activePartition, activePartition.getState());
                } else if (activePartition.getClient().getNotifyController().isValid()) {
                    logger.info("Failed to commit for partition {}, while the partition is still valid for consuming", activePartition);
                } else {
                    logger.warn("The running partition {} has not committed successfully since [{}], exceeded session timeout [{}] for [{}] ms", new Object[]{activePartition, Long.valueOf(activePartition.getLastSuccessCommitAt()), Long.valueOf(getSessionTimeoutMs()), Long.valueOf((millis - activePartition.getLastSuccessCommitAt()) - getSessionTimeoutMs())});
                    NotifyController notifyController = activePartition.getClient().getNotifyController();
                    if (notifyController.isClosed() || (!notifyController.isNotifying() && millis - notifyController.getLastNotifiedMs() >= getSessionTimeoutMs())) {
                        logger.warn("The partition {} is not notifying message, and no message has arrived since [{}], stop the client", activePartition, Long.valueOf(notifyController.getLastNotifiedMs()));
                        maybeStopPartitionClient(activePartition);
                    } else {
                        long sessionTimeoutMs = millis + getSessionTimeoutMs();
                        logger.info("The client for partition {} is still notifying messages, last notified at [{}], extends the lease to [{}]", new Object[]{activePartition, Long.valueOf(notifyController.getLastNotifiedMs()), Long.valueOf(sessionTimeoutMs)});
                        notifyController.extendLeaseTo(sessionTimeoutMs);
                    }
                }
            }
            return Collections.emptyMap();
        }
        logger.info("DefaultPartitionManager::onCommitComplete enter:expectedConsumerStatus.getExpectedPartitionStatusList:size:[{}]", Integer.valueOf(expectedConsumerStatus.getExpectedPartitionStatusList().size()));
        if (this.state == ConsumerState.STARTING && expectedConsumerStatus.isNewSeqAllocated()) {
            this.state = ConsumerState.RUNNING;
            this.seq = expectedConsumerStatus.getAllocatedSeq();
        }
        if (expectedConsumerStatus.getErrCode().intValue() != 0) {
            logger.error("Failed to commit for [{}][{}], error code [{}], msg: [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), expectedConsumerStatus.getErrCode(), expectedConsumerStatus.getErrMsg()});
            return Collections.emptyMap();
        }
        if (this.submittingMetrics) {
            this.lastMetricsSubmitMs = this.time.millis();
        }
        if (isUpgradeRequired(expectedConsumerStatus.getVersion(), this.version)) {
            logger.warn("Please update to the version [{}], current version is [{}]", expectedConsumerStatus.getVersion(), this.version);
        }
        if (expectedConsumerStatus.getIp() != null && !StringUtils.equals(expectedConsumerStatus.getIp(), this.ip)) {
            logger.info("Changed ip of [{}] from [{}] to [{}] according to response", new Object[]{expectedConsumerStatus.getSeq(), this.ip, expectedConsumerStatus.getIp()});
            this.ip = expectedConsumerStatus.getIp();
        }
        HashMap hashMap = new HashMap();
        for (ExpectedPartitionStatus expectedPartitionStatus : expectedConsumerStatus.getExpectedPartitionStatusList()) {
            ActivePartition activePartition2 = getActivePartition(expectedPartitionStatus.getPartition());
            switch (expectedPartitionStatus.getErrCode().intValue()) {
                case CODE_NO_ERROR /* 0 */:
                    break;
                case CODE_WRITE_DB_FAILURE /* 440 */:
                    logger.warn("Cluster manager failed to commit checkpoint of [{}][{}][{}] on [{}] to database", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), expectedPartitionStatus.getPartition(), expectedConsumerStatus.getSeq()});
                    break;
                case CODE_INVALID_GENERATION /* 511 */:
                    if (activePartition2 != null) {
                        handleInvalidGeneration(activePartition2, expectedPartitionStatus);
                        hashMap.put(activePartition2.getRef(), expectedPartitionStatus.getGeneration());
                        break;
                    }
                    break;
                default:
                    logger.warn("Got error for partition [{}][{}][{}][{}]: [{}][{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), expectedPartitionStatus.getPartition(), expectedPartitionStatus.getGeneration(), expectedPartitionStatus.getErrCode(), expectedPartitionStatus.getErrMsg()});
                    maybeStopPartitionClient(activePartition2);
                    continue;
            }
            if (activePartition2 != null) {
                switch (activePartition2.getState()) {
                    case RUNNING:
                        activePartition2.extendLease(getSessionTimeoutMs());
                    default:
                        activePartition2.setLastSuccessCommitAt(millis);
                        break;
                }
            } else if (getActivePartitionNum() >= this.context.getMaxConns()) {
                logger.warn("DefaultPartitionManager Active partition number has reached [{}], do not start partition [{}][{}][{}][{}]", new Object[]{Integer.valueOf(this.context.getMaxConns()), this.context.getAppGuid(), this.context.getAppGroup(), expectedPartitionStatus.getPartition(), expectedPartitionStatus.getGeneration()});
            } else {
                logger.warn("onCommitComplete:Got new partition [{}][{}][{}] on [{}]: generation [{}], offset: [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), expectedPartitionStatus.getPartition(), this.seq, expectedPartitionStatus.getGeneration(), expectedPartitionStatus.getOffset()});
                activePartition2 = new ActivePartition(this, expectedPartitionStatus.getPartition(), this.time, this.partitionStateChangeListeners);
                activePartition2.setGeneration(expectedPartitionStatus.getGeneration().longValue());
                activePartition2.setInitOffset(expectedPartitionStatus.getOffset());
                this.activePartitionMap.put(expectedPartitionStatus.getPartition(), activePartition2);
                hashMap.put(activePartition2.getRef(), expectedPartitionStatus.getGeneration());
            }
            activePartition2.setLastSuccessCommitAt(millis);
        }
        return hashMap;
    }

    private void handleInvalidGeneration(ActivePartition activePartition, ExpectedPartitionStatus expectedPartitionStatus) {
        logger.warn("Generation of [{}][{}][{}] changed: [{}] -> [{}], offset: [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), expectedPartitionStatus.getPartition(), Long.valueOf(activePartition.getGeneration()), expectedPartitionStatus.getGeneration(), expectedPartitionStatus.getOffset()});
        switch (activePartition.getState()) {
            case RUNNING:
                maybeStopPartitionClient(activePartition);
                activePartition.setPartitionInfo(null);
                activePartition.setClient(null);
                break;
            default:
                activePartition.setCurrentActionId(ACTION_ID_NOOP);
                break;
        }
        activePartition.setGeneration(expectedPartitionStatus.getGeneration().longValue());
        activePartition.setInitOffset(expectedPartitionStatus.getOffset());
        activePartition.setState(PartitionState.INIT);
    }

    @Override // com.taobao.drc.clusterclient.PartitionManager
    public void onGetPartitionInfoComplete(Map<PartitionRef, PartitionInfo> map, Map<PartitionRef, Long> map2, Throwable th) {
        if (isStopping()) {
            return;
        }
        if (th != null) {
            logger.error("Failed to get partition info for {}", map2, th);
            for (Map.Entry<PartitionRef, Long> entry : map2.entrySet()) {
                ActivePartition activePartition = getActivePartition(entry.getKey().getPartition());
                if (activePartition.getGeneration() == entry.getValue().longValue() && activePartition.getState() == PartitionState.INIT) {
                    this.activePartitionMap.remove(activePartition.getPartition());
                    logger.error("Removed partition [{}][{}] due to getting partition info failure", new Object[]{activePartition.getPartition(), Long.valueOf(activePartition.getGeneration()), th});
                }
            }
            return;
        }
        for (Map.Entry<PartitionRef, PartitionInfo> entry2 : map.entrySet()) {
            Long l = map2.get(entry2.getKey());
            PartitionInfo value = entry2.getValue();
            ActivePartition activePartition2 = getActivePartition(value.getPartition());
            if (activePartition2 == null) {
                logger.warn("Got info of partition [{}][{}] while active partition not exists", value.getPartition(), l);
            } else if (l == null || activePartition2.getGeneration() != l.longValue()) {
                logger.warn("Got info of partition [{}] of generation [{}], while current generation of the partition is [{}]", new Object[]{value.getPartition(), l, Long.valueOf(activePartition2.getGeneration())});
            } else if (activePartition2.getPartitionInfo() != null) {
                logger.warn("Partition info has already been set for {}", activePartition2);
            } else if (value.getErrCode().intValue() != 0) {
                logger.error("Got invalid partition info [{}][{}] for [{}][{}]", new Object[]{value.getErrCode(), value.getErrMsg(), value.getPartition(), l});
                this.activePartitionMap.remove(activePartition2.getPartition());
            } else if (StringUtils.equals(value.getGuid(), this.context.getAppGuid())) {
                activePartition2.setPartitionInfo(value);
                try {
                    activePartition2.setClient(this.partitionClientFactory.create(this.context, value, getNextListener(), activePartition2.getInitOffset()));
                    activePartition2.setState(PartitionState.STARTING);
                    activePartition2.setCurrentActionId(getNextActionId());
                    startClient(activePartition2.getPartition(), activePartition2.getClient(), activePartition2.getCurrentActionId());
                } catch (Exception e) {
                    logger.error("Failed to create client for partition [{}], mark the partition as stopped", activePartition2, e);
                    activePartition2.setState(PartitionState.STOPPED);
                }
            } else {
                logger.error("Got invalid guid from the partition info [{}], expected [{}], group [{}]", new Object[]{value, this.context.getAppGuid(), this.context.getAppGroup()});
                this.activePartitionMap.remove(activePartition2.getPartition());
            }
        }
    }

    void onClientStarted(String str, long j, PartitionClient partitionClient, Throwable th) {
        logger.info("onClientStarted");
        if (th == null) {
            logger.info("Client for partition [{}][{}][{}] started, action id [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), str, Long.valueOf(j)});
        } else {
            logger.warn("Failed to start client for partition [{}][{}][{}], action id [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), str, Long.valueOf(j), th});
        }
        if (isStopping()) {
            if (th == null) {
                logger.warn("Client for partition [{}][{}][{}], action id [{}] started while the partition manager is not running", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), str, Long.valueOf(j)});
                stopClient(str, partitionClient, ACTION_ID_NOOP);
                return;
            }
            return;
        }
        ActivePartition activePartition = getActivePartition(str);
        if (activePartition == null) {
            if (th != null) {
                logger.warn("Client started failed while no active partition [{}][{}][{}] exists, action id [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), str, Long.valueOf(j), th});
                return;
            } else {
                logger.warn("Client started while no active partition [{}][{}][{}] exists, action id [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), str, Long.valueOf(j)});
                stopClient(str, partitionClient, ACTION_ID_NOOP);
                return;
            }
        }
        if (th != null) {
            if (activePartition.getCurrentActionId() != j) {
                logger.error("Failed to start client for [{}][{}][{}], action id [{}], while the current action id is [{}], ignore", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), str, Long.valueOf(j), Long.valueOf(activePartition.getCurrentActionId())});
                return;
            }
            logger.error("Failed to start client for partition {}", activePartition, th);
            activePartition.setCurrentActionId(ACTION_ID_NOOP);
            activePartition.setState(PartitionState.STOPPED);
            return;
        }
        switch (activePartition.getState()) {
            case STARTING:
                if (activePartition.getCurrentActionId() != j) {
                    logger.error("Client started for partition {}, but the action id does not match [{}]", activePartition, Long.valueOf(j));
                    stopClient(str, partitionClient, ACTION_ID_NOOP);
                    return;
                } else {
                    activePartition.setState(PartitionState.RUNNING);
                    activePartition.setClient(partitionClient);
                    activePartition.setCurrentActionId(ACTION_ID_NOOP);
                    activePartition.extendLease(getSessionTimeoutMs());
                    return;
                }
            default:
                logger.error("Client started for partition {}, but the partition is in an invalid state [{}]", activePartition, activePartition.getState());
                stopClient(str, partitionClient, j);
                activePartition.setState(PartitionState.STOPPING);
                return;
        }
    }

    void onClientStopped(String str, long j, Throwable th) {
        if (th == null) {
            logger.info("Client for partition [{}][{}][{}] stopped, action id [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), str, Long.valueOf(j)});
        } else {
            logger.warn("Failed to stop client for partition [{}][{}][{}], action id [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), str, Long.valueOf(j), th});
        }
        if (isStopping()) {
            return;
        }
        ActivePartition activePartition = getActivePartition(str);
        if (activePartition == null) {
            logger.warn("No active partition for [{}] exists, action id [{}]", str, Long.valueOf(j));
            return;
        }
        if (activePartition.getState() != PartitionState.STOPPING || activePartition.getCurrentActionId() != j) {
            logger.error("Partition [{}] is not in expected state, action id [{}]", activePartition, Long.valueOf(j));
            return;
        }
        activePartition.setState(PartitionState.STOPPED);
        activePartition.setCurrentActionId(ACTION_ID_NOOP);
        activePartition.setClient(null);
    }
}
