package com.taobao.hsf.notify.extend;

import com.ali.unit.rule.Router;
import com.taobao.eagleeye.EagleEye;
import com.taobao.hsf.notify.client.Binding;
import com.taobao.hsf.notify.client.MessageStatus;
import com.taobao.hsf.notify.client.message.Message;
import com.taobao.hsf.notify.client.message.MessageAccessor;
import com.taobao.notify.client.manager.ClientSubscriptionManager;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.UniqId;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:lib/hsf-notify-client-3.2.2.jar:com/taobao/hsf/notify/extend/MessageWorker.class */
/**
 * Drives one message through three phases: conversion into a typed payload, an optional
 * unit-routing check, and processing.
 *
 * <p>A {@link TypedMessageConverter} and a {@link MessageProcessor} must both be set before
 * {@link #execute(Message, MessageStatus)} is called. A {@link MessageRouteCallback} is optional
 * and is only consulted when {@code Router.isUnitMode()} is true.
 *
 * @param <T> payload type produced by the converter and consumed by the processor
 */
public class MessageWorker<T> {
    private static final Log logger = LogFactory.getLog(MessageWorker.class);
    private static final String LOG_PREFIX = LoggerPrefix.makeLogPrefix(MessageWorker.class);

    /** Reason recorded on the message status whenever the routing check rejects a message. */
    private static final String ROUTE_REJECT_REASON = "路由错乱，打回重投递";

    /** Buy id recorded when the routing check rejects a message without a usable buyer id. */
    private static final String UNKNOWN_BUYER_ID = "-1";

    private TypedMessageConverter<T> converter;
    private MessageProcessor<T> processor;
    private MessageRouteCallback<T> messageRoute = null;

    /**
     * Sets the converter that turns a raw message into the typed payload.
     *
     * @param typedMessageConverter converter to use; required before {@code execute}
     */
    public void setMessageConverter(TypedMessageConverter<T> typedMessageConverter) {
        this.converter = typedMessageConverter;
    }

    /**
     * Sets the processor that consumes the typed payload.
     *
     * @param messageProcessor processor to use; required before {@code execute}
     */
    public void setMessageProcessor(MessageProcessor<T> messageProcessor) {
        this.processor = messageProcessor;
    }

    /**
     * Sets the optional callback used to derive the routing user from the payload.
     *
     * @param messageRouteCallback callback to use, or {@code null} for none
     */
    public void setMessageRoute(MessageRouteCallback<T> messageRouteCallback) {
        this.messageRoute = messageRouteCallback;
    }

    /**
     * Converts {@code message}, verifies its routing when unit mode is active, and processes it.
     *
     * <p>The three phases are kept separate so that each failure is reported by its own handler:
     * a conversion or processing failure is rethrown to the caller, while a routing failure marks
     * {@code messageStatus} rollback-only and returns without processing.
     *
     * @param message       the raw message to handle
     * @param messageStatus receives rollback-only, buy id and reason when the message is rejected
     *                      by the routing check or the processor reports failure
     * @throws RuntimeException if the converter or processor is missing, or if conversion or
     *                          processing throws a {@code RuntimeException} (kept as the cause)
     */
    public void execute(Message message, MessageStatus messageStatus) {
        if (this.converter == null || this.processor == null) {
            logger.error(LOG_PREFIX + "no corresponding converter or processor attach with " + getClass().getName());
            throw new RuntimeException("MessageWorker的Converter或者Processor不全部有值。");
        }
        ClassLoader workerLoader = this.converter.getClass().getClassLoader();
        ClassLoader callerLoader = Thread.currentThread().getContextClassLoader();

        T payload = convert(message, workerLoader, callerLoader);
        if (!passesRouteCheck(message, payload, messageStatus)) {
            return;
        }
        ProcessStatus processStatus = process(payload, workerLoader, callerLoader);
        if (!processStatus.isSuccess()) {
            messageStatus.setRollbackOnly();
            messageStatus.setReason(processStatus.getReason());
        }
    }

    /**
     * Returns the message type reported by the converter.
     *
     * @return the converter's message type, or {@code null} (with a warning logged) when no
     *         converter has been set
     */
    public String getMessageType() {
        if (this.converter != null) {
            return this.converter.getMessageType();
        }
        logger.warn(LOG_PREFIX + "no corresponding converter attach with " + getClass().getName());
        return null;
    }

    /**
     * Returns the message topic reported by the converter.
     *
     * @return the converter's message topic, or {@code null} (with a warning logged) when no
     *         converter has been set
     */
    public String getMessageTopic() {
        if (this.converter != null) {
            return this.converter.getMessageTopic();
        }
        logger.warn(LOG_PREFIX + "no corresponding converter attach with " + getClass().getName());
        return null;
    }

    /** Runs the converter under {@code workerLoader}, always restoring {@code callerLoader}. */
    private T convert(Message message, ClassLoader workerLoader, ClassLoader callerLoader) {
        Thread current = Thread.currentThread();
        try {
            try {
                current.setContextClassLoader(workerLoader);
                return this.converter.fromMessage(message);
            } finally {
                current.setContextClassLoader(callerLoader);
            }
        } catch (RuntimeException e) {
            logger.error(LOG_PREFIX + "message converter exception: ", e);
            throw new RuntimeException("消息转换出错:" + e.getMessage(), e);
        }
    }

    /** Runs the processor under {@code workerLoader}, always restoring {@code callerLoader}. */
    private ProcessStatus process(T payload, ClassLoader workerLoader, ClassLoader callerLoader) {
        ProcessStatus processStatus = new ProcessStatus();
        Thread current = Thread.currentThread();
        try {
            try {
                current.setContextClassLoader(workerLoader);
                this.processor.process(payload, processStatus);
            } finally {
                current.setContextClassLoader(callerLoader);
            }
        } catch (RuntimeException e) {
            logger.warn(LOG_PREFIX + "message process exception: ", e);
            throw new RuntimeException("消息处理发生异常:" + e.getMessage(), e);
        }
        return processStatus;
    }

    /**
     * Decides whether the message may be processed here; on rejection {@code messageStatus} is
     * already marked rollback-only with a buy id and reason.
     *
     * @return {@code true} to continue with processing, {@code false} to stop
     */
    private boolean passesRouteCheck(Message message, T payload, MessageStatus messageStatus) {
        try {
            if (!Router.isUnitMode()) {
                return true;
            }
            String messageBuyer = message.getStringProperty(Binding.BUYER);
            if (messageBuyer == null
                    && ClientSubscriptionManager.getInstance().isBuyerSubscription(MessageAccessor.getMessage(message))
                    && this.messageRoute == null) {
                // Buyer subscription, but neither the message nor a callback can supply a buyer id.
                logger.error(LOG_PREFIX + describe(message) + " buyer订阅，但没有设置 routerCheck，打回重投递");
                reject(messageStatus, UNKNOWN_BUYER_ID);
                return false;
            }
            if (Router.isCenterUnit() && this.messageRoute == null) {
                // In the center unit the buyer is only checked when a route callback is configured.
                return true;
            }

            RouteUser routeUser = new RouteUser();
            if (this.messageRoute != null) {
                this.messageRoute.handleRouteUser(payload, routeUser);
            }
            String buyerId = messageBuyer;
            if (buyerId == null) {
                logger.warn(LOG_PREFIX + "使用MessageRouteCallback设置的buyerid ：" + routeUser.getRouteUser());
                buyerId = routeUser.getRouteUser();
            }
            if (buyerId == null || Router.isInCurrentUnit(Long.parseLong(buyerId))) {
                return true;
            }

            reject(messageStatus, buyerId);
            logger.error(LOG_PREFIX + describe(message) + " b: " + buyerId + "路由错乱，打回重投递");
            // Trace the message's own buyer property unless it is absent or "0"; then use the resolved id.
            boolean messageBuyerUsable = messageBuyer != null && !"0".equals(messageBuyer.trim());
            EagleEye.attribute("b", (messageBuyerUsable ? messageBuyer : buyerId) + "_y");
            return false;
        } catch (Exception e) {
            // Any failure while evaluating the route (e.g. a non-numeric buyer id) rejects the message.
            logger.error(LOG_PREFIX + describe(message) + " 路由check出错:", e);
            reject(messageStatus, UNKNOWN_BUYER_ID);
            return false;
        }
    }

    /** Marks {@code messageStatus} rollback-only with the given buy id and the routing reason. */
    private static void reject(MessageStatus messageStatus, String buyId) {
        messageStatus.setRollbackOnly();
        messageStatus.setBuyId(buyId);
        messageStatus.setReason(ROUTE_REJECT_REASON);
    }

    /** Builds the topic / message type / target group / id fragment shared by the routing logs. */
    private static String describe(Message message) {
        return "消息 t: " + message.getTopic()
                + " mt: " + message.getMessageType()
                + " tg: " + message.getTargetGroup()
                + " id:" + UniqId.getInstance().bytes2string(message.getMessageId());
    }
}
