/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.broker.transaction.queue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.GetResult;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;

public class TransactionalMessageServiceImpl
implements TransactionalMessageService {
    private static final InternalLogger log = InternalLoggerFactory.getLogger((String)"RocketmqTransaction");
    private TransactionalMessageBridge transactionalMessageBridge;
    private static final int PULL_MSG_RETRY_NUMBER = 1;
    private static final int MAX_PROCESS_TIME_LIMIT = 60000;
    private static final int MAX_RETRY_COUNT_WHEN_HALF_NULL = 1;
    private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap();

    public TransactionalMessageServiceImpl(TransactionalMessageBridge transactionBridge) {
        this.transactionalMessageBridge = transactionBridge;
    }

    @Override
    public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
        return this.transactionalMessageBridge.putHalfMessage(messageInner);
    }

    private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) {
        String checkTimes = msgExt.getProperty("TRANSACTION_CHECK_TIMES");
        int checkTime = 1;
        if (null != checkTimes) {
            checkTime = this.getInt(checkTimes);
            if (checkTime >= transactionCheckMax) {
                return true;
            }
            ++checkTime;
        }
        msgExt.putUserProperty("TRANSACTION_CHECK_TIMES", String.valueOf(checkTime));
        return false;
    }

    private boolean needSkip(MessageExt msgExt) {
        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
        if (valueOfCurrentMinusBorn > (long)this.transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime() * 3600L * 1000L) {
            log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}", (Object)msgExt.getMsgId(), (Object)msgExt.getBornTimestamp());
            return true;
        }
        return false;
    }

    private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
        PutMessageResult putMessageResult = this.putBackToHalfQueueReturnResult(msgExt);
        if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            msgExt.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
            msgExt.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
            msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            log.info("Send check message, the offset={} restored in queueOffset={} commitLogOffset={} newMsgId={} realMsgId={} topic={}", new Object[]{offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(), msgExt.getUserProperty("UNIQ_KEY"), msgExt.getTopic()});
            return true;
        }
        log.error("PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, msgId: {}", new Object[]{msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId()});
        return false;
    }

    @Override
    public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = "RMQ_SYS_TRANS_HALF_TOPIC";
            Set<MessageQueue> msgQueues = this.transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.info("Check topic={}, queues={}", (Object)topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long newOpOffset;
                long startTime = System.currentTimeMillis();
                MessageQueue opQueue = this.getOpQueue(messageQueue);
                long halfOffset = this.transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                long opOffset = this.transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", new Object[]{messageQueue, halfOffset, opOffset});
                if (halfOffset < 0L || opOffset < 0L) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", new Object[]{messageQueue, halfOffset, opOffset});
                    continue;
                }
                HashMap<Long, Long> removeMap = new HashMap<Long, Long>();
                ArrayList<Long> doneOpOffset = new ArrayList<Long>();
                PullResult pullResult = this.fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", new Object[]{messageQueue, halfOffset, opOffset});
                    continue;
                }
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                while (true) {
                    if (System.currentTimeMillis() - startTime > 60000L) {
                        log.info("Queue={} process time reach max={}", (Object)messageQueue, (Object)60000);
                        break;
                    }
                    if (removeMap.containsKey(i)) {
                        log.info("Half offset {} has been committed/rolled back", (Object)i);
                        removeMap.remove(i);
                    } else {
                        List opMsg;
                        boolean isNeedCheck;
                        GetResult getResult = this.getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > 1) break;
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", new Object[]{i, messageQueue, getMessageNullCount, getResult.getPullResult()});
                                break;
                            }
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", new Object[]{i, messageQueue, getMessageNullCount, getResult.getPullResult()});
                            newOffset = i = getResult.getPullResult().getNextBeginOffset();
                            continue;
                        }
                        if (this.needDiscard(msgExt, transactionCheckMax) || this.needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1L;
                            ++i;
                            continue;
                        }
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.info("Fresh stored. the miss offset={}, check it later, store={}", (Object)i, (Object)new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        String checkImmunityTimeStr = msgExt.getUserProperty("CHECK_IMMUNITY_TIME_IN_SECONDS");
                        if (null != checkImmunityTimeStr) {
                            checkImmunityTime = this.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime && this.checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                newOffset = i + 1L;
                                ++i;
                                continue;
                            }
                        } else if (0L <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {
                            log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", new Object[]{i, checkImmunityTime, new Date(msgExt.getBornTimestamp())});
                            break;
                        }
                        boolean bl = isNeedCheck = (opMsg = pullResult.getMsgFoundList()) == null && valueOfCurrentMinusBorn > checkImmunityTime || opMsg != null && ((MessageExt)opMsg.get(opMsg.size() - 1)).getBornTimestamp() - startTime > transactionTimeout || valueOfCurrentMinusBorn <= -1L;
                        if (isNeedCheck) {
                            if (!this.putBackHalfMsgQueue(msgExt, i)) continue;
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            pullResult = this.fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", new Object[]{i, messageQueue, pullResult});
                            continue;
                        }
                    }
                    newOffset = i + 1L;
                    ++i;
                }
                if (newOffset != halfOffset) {
                    this.transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                if ((newOpOffset = this.calculateOpOffset(doneOpOffset, opOffset)) == opOffset) continue;
                this.transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            log.error("Check error", (Throwable)e);
        }
    }

    private long getImmunityTime(String checkImmunityTimeStr, long transactionTimeout) {
        long checkImmunityTime = this.getLong(checkImmunityTimeStr);
        checkImmunityTime = -1L == checkImmunityTime ? transactionTimeout : (checkImmunityTime *= 1000L);
        return checkImmunityTime;
    }

    private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
        PullResult pullResult = this.pullOpMsg(opQueue, pullOffsetOfOp, 32);
        if (null == pullResult) {
            return null;
        }
        if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
            log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", new Object[]{pullOffsetOfOp, opQueue, pullResult});
            this.transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
            return pullResult;
        }
        if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) {
            log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", new Object[]{pullOffsetOfOp, opQueue, pullResult});
            return pullResult;
        }
        List opMsg = pullResult.getMsgFoundList();
        if (opMsg == null) {
            log.warn("The miss op offset={} in queue={} is empty, pullResult={}", new Object[]{pullOffsetOfOp, opQueue, pullResult});
            return pullResult;
        }
        for (MessageExt opMessageExt : opMsg) {
            Long queueOffset = this.getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
            log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", new Object[]{opMessageExt.getTopic(), opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset});
            if ("d".equals(opMessageExt.getTags())) {
                if (queueOffset < miniOffset) {
                    doneOpOffset.add(opMessageExt.getQueueOffset());
                    continue;
                }
                removeMap.put(queueOffset, opMessageExt.getQueueOffset());
                continue;
            }
            log.error("Found a illegal tag in opMessageExt= {} ", (Object)opMessageExt);
        }
        log.debug("Remove map: {}", removeMap);
        log.debug("Done op list: {}", doneOpOffset);
        return pullResult;
    }

    private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset, MessageExt msgExt) {
        String prepareQueueOffsetStr = msgExt.getUserProperty("TRAN_PREPARED_QUEUE_OFFSET");
        if (null == prepareQueueOffsetStr) {
            return this.putImmunityMsgBackToHalfQueue(msgExt);
        }
        long prepareQueueOffset = this.getLong(prepareQueueOffsetStr);
        if (-1L == prepareQueueOffset) {
            return false;
        }
        if (removeMap.containsKey(prepareQueueOffset)) {
            long tmpOpOffset = removeMap.remove(prepareQueueOffset);
            doneOpOffset.add(tmpOpOffset);
            return true;
        }
        return this.putImmunityMsgBackToHalfQueue(msgExt);
    }

    private PutMessageResult putBackToHalfQueueReturnResult(MessageExt messageExt) {
        PutMessageResult putMessageResult = null;
        try {
            MessageExtBrokerInner msgInner = this.transactionalMessageBridge.renewHalfMessageInner(messageExt);
            putMessageResult = this.transactionalMessageBridge.putMessageReturnResult(msgInner);
        }
        catch (Exception e) {
            log.warn("PutBackToHalfQueueReturnResult error", (Throwable)e);
        }
        return putMessageResult;
    }

    private boolean putImmunityMsgBackToHalfQueue(MessageExt messageExt) {
        MessageExtBrokerInner msgInner = this.transactionalMessageBridge.renewImmunityHalfMessageInner(messageExt);
        return this.transactionalMessageBridge.putMessage(msgInner);
    }

    private PullResult pullHalfMsg(MessageQueue mq, long offset, int nums) {
        return this.transactionalMessageBridge.getHalfMessage(mq.getQueueId(), offset, nums);
    }

    private PullResult pullOpMsg(MessageQueue mq, long offset, int nums) {
        return this.transactionalMessageBridge.getOpMessage(mq.getQueueId(), offset, nums);
    }

    private Long getLong(String s) {
        long v = -1L;
        try {
            v = Long.valueOf(s);
        }
        catch (Exception e) {
            log.error("GetLong error", (Throwable)e);
        }
        return v;
    }

    private Integer getInt(String s) {
        int v = -1;
        try {
            v = Integer.valueOf(s);
        }
        catch (Exception e) {
            log.error("GetInt error", (Throwable)e);
        }
        return v;
    }

    private long calculateOpOffset(List<Long> doneOffset, long oldOffset) {
        Collections.sort(doneOffset);
        long newOffset = oldOffset;
        for (int i = 0; i < doneOffset.size() && doneOffset.get(i) == newOffset; ++newOffset, ++i) {
        }
        return newOffset;
    }

    private MessageQueue getOpQueue(MessageQueue messageQueue) {
        MessageQueue opQueue = this.opQueueMap.get(messageQueue);
        if (opQueue == null) {
            opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), messageQueue.getBrokerName(), messageQueue.getQueueId());
            this.opQueueMap.put(messageQueue, opQueue);
        }
        return opQueue;
    }

    private GetResult getHalfMsg(MessageQueue messageQueue, long offset) {
        GetResult getResult = new GetResult();
        PullResult result = this.pullHalfMsg(messageQueue, offset, 1);
        getResult.setPullResult(result);
        List messageExts = result.getMsgFoundList();
        if (messageExts == null) {
            return getResult;
        }
        getResult.setMsg((MessageExt)messageExts.get(0));
        return getResult;
    }

    private OperationResult getHalfMessageByOffset(long commitLogOffset) {
        OperationResult response = new OperationResult();
        MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
        if (messageExt != null) {
            response.setPrepareMessage(messageExt);
            response.setResponseCode(0);
        } else {
            response.setResponseCode(1);
            response.setResponseRemark("Find prepared transaction message failed");
        }
        return response;
    }

    @Override
    public boolean deletePrepareMessage(MessageExt msgExt) {
        if (this.transactionalMessageBridge.putOpMessage(msgExt, "d")) {
            log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", new Object[]{msgExt.getMsgId(), msgExt.getQueueId(), msgExt});
            return true;
        }
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", (Object)msgExt.getMsgId(), (Object)msgExt.getQueueId());
        return false;
    }

    @Override
    public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
        return this.getHalfMessageByOffset(requestHeader.getCommitLogOffset());
    }

    @Override
    public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
        return this.getHalfMessageByOffset(requestHeader.getCommitLogOffset());
    }

    @Override
    public boolean open() {
        return true;
    }

    @Override
    public void close() {
    }
}

