package com.we.kafka.client.consumer;

import com.we.kafka.client.consumer.inter.WeKakfaDataProcessing;
import com.we.kafka.client.init.KafkaConf;
import java.lang.Thread;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/we/kafka/client/consumer/WeKafkaConsumer.class */
public class WeKafkaConsumer {
    public static final String NAME = "kafkaConsumer";
    public static Logger LOG = LoggerFactory.getLogger(WeKafkaConsumer.class);
    private static KafkaConsumer<byte[], byte[]> consumer = null;
    public static final Long TIMEOUT = 1000L;
    private AtomicBoolean started = new AtomicBoolean(false);
    private Thread selector = null;
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler = null;

    @Resource
    KafkaConf conf;

    @PostConstruct
    public void init() {
        Executors.newScheduledThreadPool(1).schedule(new Runnable() { // from class: com.we.kafka.client.consumer.WeKafkaConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                Properties properties = new Properties();
                properties.put("bootstrap.servers", WeKafkaConsumer.this.conf.getKafkaCluster());
                properties.put("group.id", "metricDataWriter");
                properties.put("enable.auto.commit", "false");
                properties.put("session.timeout.ms", "30000");
                properties.put("request.timeout.ms", "35000");
                properties.put("heartbeat.interval.ms", "6000");
                properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                KafkaConsumer unused = WeKafkaConsumer.consumer = new KafkaConsumer(properties);
                WeKafkaConsumer.LOG.info("kafka consumer start successfully.");
            }
        }, 10L, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void destroy() {
        this.started.set(false);
        consumer.close();
        if (null != this.selector) {
            this.started.set(false);
        }
    }

    public void takeTopic(String str, WeKakfaDataProcessing weKakfaDataProcessing) {
        takeTopic(str, weKakfaDataProcessing, false, TIMEOUT);
    }

    public void takeTopic(String str, WeKakfaDataProcessing weKakfaDataProcessing, boolean z) {
        takeTopic(str, weKakfaDataProcessing, z, TIMEOUT);
    }

    public void takeTopic(String str, WeKakfaDataProcessing weKakfaDataProcessing, boolean z, Long l) {
        consumer.subscribe(Pattern.compile(str), new NoOpConsumerRebalanceListener());
        this.selector = new BaseConsumerSelector(consumer, l, z, weKakfaDataProcessing, NAME);
        this.uncaughtExceptionHandler = (thread, th) -> {
            LOG.warn("uncaught exception, restart kafkaConsumer.", th);
            this.selector = new BaseConsumerSelector(consumer, l, z, weKakfaDataProcessing, NAME);
            this.selector.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
            this.selector.start();
        };
        this.selector.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
        this.selector.start();
    }
}
