package com.we.kafka.client.consumer;

import com.stumbleupon.async.Deferred;
import com.we.kafka.client.consumer.inter.WeKakfaDataProcessing;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/we/kafka/client/consumer/BaseConsumerSelector.class */
public class BaseConsumerSelector extends Thread {
    public static Logger LOG = LoggerFactory.getLogger(BaseConsumerSelector.class);
    private static final Long MIN_SLEEP_TIME = 1L;
    private static AtomicLong sleepTime = new AtomicLong(MIN_SLEEP_TIME.longValue());
    private static final Long MAX_SLEEP_TIME = 10000L;
    private KafkaConsumer<byte[], byte[]> consumer;
    private Long timeout;
    private AtomicBoolean started;
    private WeKakfaDataProcessing procesing;

    public BaseConsumerSelector(KafkaConsumer<byte[], byte[]> kafkaConsumer, WeKakfaDataProcessing weKakfaDataProcessing, String str) {
        this.timeout = 1000L;
        this.started = new AtomicBoolean(false);
        this.consumer = kafkaConsumer;
        this.procesing = weKakfaDataProcessing;
        super.setName(str);
    }

    public BaseConsumerSelector(KafkaConsumer<byte[], byte[]> kafkaConsumer, Long l, WeKakfaDataProcessing weKakfaDataProcessing, String str) {
        this.timeout = 1000L;
        this.started = new AtomicBoolean(false);
        this.consumer = kafkaConsumer;
        this.timeout = l;
        this.procesing = weKakfaDataProcessing;
        super.setName(str);
    }

    public BaseConsumerSelector(KafkaConsumer<byte[], byte[]> kafkaConsumer, boolean z, WeKakfaDataProcessing weKakfaDataProcessing, String str) {
        this.timeout = 1000L;
        this.started = new AtomicBoolean(false);
        this.consumer = kafkaConsumer;
        this.started = new AtomicBoolean(z);
        this.procesing = weKakfaDataProcessing;
        super.setName(str);
    }

    public BaseConsumerSelector(KafkaConsumer<byte[], byte[]> kafkaConsumer, Long l, boolean z, WeKakfaDataProcessing weKakfaDataProcessing, String str) {
        this.timeout = 1000L;
        this.started = new AtomicBoolean(false);
        this.consumer = kafkaConsumer;
        this.timeout = l;
        this.started = new AtomicBoolean(z);
        this.procesing = weKakfaDataProcessing;
        super.setName(str);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.started.set(true);
        while (this.started.get()) {
            try {
                ConsumerRecords poll = this.consumer.poll(this.timeout.longValue());
                if (!poll.isEmpty()) {
                    try {
                        ArrayList arrayList = new ArrayList();
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            arrayList.add(this.procesing.onReceive((byte[]) ((ConsumerRecord) it.next()).value()));
                        }
                        Deferred.group(arrayList).addCallback(arrayList2 -> {
                            this.consumer.commitSync();
                            int i = 0;
                            Iterator it2 = arrayList2.iterator();
                            while (it2.hasNext()) {
                                i += ((Integer) it2.next()).intValue();
                            }
                            return Integer.valueOf(i);
                        }).addErrback(exc -> {
                            LOG.warn("consume fail, try to deal next poll.", exc);
                            return 0;
                        }).joinUninterruptibly(MAX_SLEEP_TIME.longValue());
                    } catch (Exception e) {
                        LOG.error("except in consume.", e);
                    }
                }
                Thread.sleep(sleepTime.get());
            } catch (Exception e2) {
                LOG.warn("except in consume loop.", e2);
            }
        }
    }

    public KafkaConsumer<byte[], byte[]> getConsumer() {
        return this.consumer;
    }

    public Long getTimeout() {
        return this.timeout;
    }

    public AtomicBoolean getStarted() {
        return this.started;
    }

    public WeKakfaDataProcessing getProcesing() {
        return this.procesing;
    }

    public void setConsumer(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    public void setTimeout(Long l) {
        this.timeout = l;
    }

    public void setStarted(AtomicBoolean atomicBoolean) {
        this.started = atomicBoolean;
    }

    public void setProcesing(WeKakfaDataProcessing weKakfaDataProcessing) {
        this.procesing = weKakfaDataProcessing;
    }
}
