package com.zkhy.teach.mq.starter;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.zkhy.teach.config.properties.BaseCoreProperties;
import com.zkhy.teach.config.properties.RocketMqProperties;
import com.zkhy.teach.mq.annotation.EnableRocketMq;
import com.zkhy.teach.mq.annotation.RocketMqListener;
import com.zkhy.teach.util.PubUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
 * Starts one ONS {@link Consumer} for every method annotated with {@link RocketMqListener}
 * that is declared on a bean annotated with {@link EnableRocketMq}.
 *
 * <p>Consumers are created once from {@link #start()} and are shut down again by
 * {@link #shutdown()}. Both methods synchronize on this instance.
 */
@Configuration
public class RocketMqConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMqConsumer.class);

    @Autowired
    public ApplicationContext applicationContext;
    private volatile boolean initConsumer = false;

    @Autowired
    private BaseCoreProperties baseCoreProperties;

    /** Properties passed to the two-argument constructor; only a fallback when none are injected. */
    private RocketMqProperties rocketMqProperties;

    /** Every consumer created so far, keyed by {@code beanName#method}; accessed under this monitor. */
    private final Map<String, Consumer> consumerMap = new LinkedHashMap<>();

    public RocketMqConsumer() {
    }

    /**
     * Creates an instance without relying on field injection.
     *
     * @param applicationContext context used to look up listener beans and resolve property keys
     * @param rocketMqProperties connection settings used when no {@link BaseCoreProperties} is injected
     */
    public RocketMqConsumer(ApplicationContext applicationContext, RocketMqProperties rocketMqProperties) {
        this.applicationContext = applicationContext;
        this.rocketMqProperties = rocketMqProperties;
    }

    /**
     * Creates and starts all listener consumers. A second call only logs a warning.
     *
     * @throws Exception declared for backward compatibility; failures surface as unchecked exceptions
     */
    @PostConstruct
    public synchronized void start() throws Exception {
        if (this.initConsumer) {
            log.warn("请不要重复初始化RocketMQ消费者");
            return;
        }
        try {
            initializeConsumer();
        } catch (RuntimeException | Error e) {
            // Initialization failed part-way: release the consumers that were already created,
            // because a bean whose init callback failed cannot be relied on to get a destroy callback.
            shutdownConsumers();
            throw e;
        }
        this.initConsumer = true;
    }

    /** Shuts down every consumer created by {@link #start()} and allows a later restart. */
    @PreDestroy
    public synchronized void shutdown() {
        shutdownConsumers();
        this.initConsumer = false;
    }

    /** Shuts down all tracked consumers; one failing consumer does not stop the others. */
    private void shutdownConsumers() {
        for (Map.Entry<String, Consumer> tracked : this.consumerMap.entrySet()) {
            try {
                tracked.getValue().shutdown();
            } catch (RuntimeException e) {
                log.warn("failed to shut down RocketMQ consumer [{}]", tracked.getKey(), e);
            }
        }
        this.consumerMap.clear();
    }

    private void initializeConsumer() {
        initCommonConsumer();
    }

    /** Scans {@link EnableRocketMq} beans and activates a consumer per {@link RocketMqListener} method. */
    private void initCommonConsumer() {
        RocketMqProperties rocketMq = resolveRocketMqProperties();
        Map<String, Object> beansWithAnnotation = this.applicationContext.getBeansWithAnnotation(EnableRocketMq.class);
        if (!PubUtils.isNotNull(new Object[]{beansWithAnnotation})) {
            return;
        }
        for (Map.Entry<String, Object> entry : beansWithAnnotation.entrySet()) {
            String beanName = entry.getKey();
            Arrays.stream(entry.getValue().getClass().getDeclaredMethods())
                    .filter(method -> method.isAnnotationPresent(RocketMqListener.class))
                    .forEach(method -> commonConsumerActive(
                            beanName, rocketMq, method, this.applicationContext.getBean(beanName)));
        }
    }

    /**
     * Picks the connection settings: the injected {@link BaseCoreProperties} first, otherwise the
     * instance given to the two-argument constructor.
     *
     * @return the settings, or {@code null} when neither source provides them
     */
    private RocketMqProperties resolveRocketMqProperties() {
        if (this.baseCoreProperties != null) {
            RocketMqProperties fromCore = this.baseCoreProperties.getRocketMq();
            if (fromCore != null) {
                return fromCore;
            }
        }
        return this.rocketMqProperties;
    }

    /**
     * Creates, subscribes and starts the consumer for a single listener method.
     *
     * @param beanName           name of the bean declaring the listener, used in the tracking key
     * @param rocketMqProperties connection settings (access key, secret key, name server address)
     * @param method             listener method, invoked reflectively with each received message
     * @param obj                bean instance the method is invoked on
     * @throws IllegalStateException if the settings are missing or a property key does not resolve
     */
    private void commonConsumerActive(String beanName, RocketMqProperties rocketMqProperties, Method method, Object obj) {
        if (rocketMqProperties == null) {
            throw new IllegalStateException("No RocketMQ properties available for listener " + method);
        }
        RocketMqListener rocketMqListener = method.getAnnotation(RocketMqListener.class);
        Environment environment = this.applicationContext.getEnvironment();
        // The annotation carries property KEYS; the actual group and topic come from the environment.
        String groupId = requireProperty(environment, rocketMqListener.consumerGroup(), "consumerGroup", method);
        String topic = requireProperty(environment, rocketMqListener.topic(), "topic", method);

        Properties properties = new Properties();
        properties.put("GROUP_ID", groupId);
        properties.put("AccessKey", rocketMqProperties.getAccessKey());
        properties.put("SecretKey", rocketMqProperties.getSecretKey());
        properties.put("NAMESRV_ADDR", rocketMqProperties.getNameServerAddr());
        properties.put("MessageModel", rocketMqListener.messageModel());

        Consumer consumer = ONSFactory.createConsumer(properties);
        // Track before subscribe/start so a failure in either still leaves the consumer reachable for cleanup.
        this.consumerMap.put(beanName + "#" + method.toGenericString(), consumer);
        consumer.subscribe(topic, rocketMqListener.tag(), (message, consumeContext) -> {
            try {
                method.invoke(obj, message);
                return Action.CommitMessage;
            } catch (IllegalAccessException e) {
                log.error("message [{}] has IllegalAccessException, reconsume times [{}]",
                        message.getMsgID(), message.getReconsumeTimes(), e);
                return Action.ReconsumeLater;
            } catch (InvocationTargetException e) {
                // The wrapped cause is the listener's own failure; log it rather than the reflection wrapper.
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                log.error("message [{}] has InvocationTargetException, reconsume times [{}]",
                        message.getMsgID(), message.getReconsumeTimes(), cause);
                return Action.ReconsumeLater;
            } catch (Exception e) {
                log.error("message [{}] has Exception, reconsume times [{}]",
                        message.getMsgID(), message.getReconsumeTimes(), e);
                return Action.ReconsumeLater;
            }
        });
        consumer.start();
    }

    /**
     * Resolves a property key from the environment, failing with a descriptive error instead of
     * letting a {@code null} reach {@link Properties#put} or the consumer subscription.
     */
    private static String requireProperty(Environment environment, String key, String attribute, Method method) {
        String value = environment.getProperty(key);
        if (value == null) {
            throw new IllegalStateException("@RocketMqListener " + attribute + " key [" + key + "] on "
                    + method + " does not resolve to a property in the environment");
        }
        return value;
    }
}
