package com.maihaoche.starter.mq.base;

import com.google.gson.Gson;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/maihaoche/starter/mq/base/AbstractMQPushConsumer.class */
public abstract class AbstractMQPushConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractMQPushConsumer.class);
    private static Gson gson = new Gson();

    public abstract boolean process(T t);

    public ConsumeConcurrentlyStatus dealMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        Iterator<MessageExt> it = list.iterator();
        while (it.hasNext()) {
            T parseMessage = parseMessage(it.next());
            if (null != parseMessage && !process(parseMessage)) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private T parseMessage(MessageExt messageExt) {
        if (messageExt == null || messageExt.getBody() == null) {
            return null;
        }
        Type messageType = getMessageType();
        if (messageType instanceof Class) {
            return (T) gson.fromJson(new String(messageExt.getBody()), messageType);
        }
        log.warn("Parse msg error. {}", messageExt);
        return null;
    }

    private Type getMessageType() {
        Type genericSuperclass = getClass().getGenericSuperclass();
        if (genericSuperclass instanceof ParameterizedType) {
            return ((ParameterizedType) genericSuperclass).getActualTypeArguments()[0];
        }
        throw new RuntimeException("Unkown parameterized type.");
    }
}
