package com.ctrip.framework.apollo.biz.message;

import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;
import com.ctrip.framework.apollo.tracer.spi.Transaction;
import com.google.common.collect.Queues;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
/* loaded from: input_file:com/ctrip/framework/apollo/biz/message/DatabaseMessageSender.class */
public class DatabaseMessageSender implements MessageSender {
    private static final Logger logger = LoggerFactory.getLogger(DatabaseMessageSender.class);
    private static final int CLEAN_QUEUE_MAX_SIZE = 100;
    private BlockingQueue<Long> toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
    private final ExecutorService cleanExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("DatabaseMessageSender", true));
    private final AtomicBoolean cleanStopped = new AtomicBoolean(false);

    @Autowired
    private ReleaseMessageRepository releaseMessageRepository;

    @Override // com.ctrip.framework.apollo.biz.message.MessageSender
    @Transactional
    public void sendMessage(String str, String str2) {
        logger.info("Sending message {} to channel {}", str, str2);
        if (!Objects.equals(str2, Topics.APOLLO_RELEASE_TOPIC)) {
            logger.warn("Channel {} not supported by DatabaseMessageSender!");
            return;
        }
        Tracer.logEvent("Apollo.AdminService.ReleaseMessage", str);
        Transaction newTransaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
        try {
            try {
                this.toClean.offer(Long.valueOf(((ReleaseMessage) this.releaseMessageRepository.save(new ReleaseMessage(str))).getId()));
                newTransaction.setStatus("0");
                newTransaction.complete();
            } catch (Throwable th) {
                logger.error("Sending message to database failed", th);
                newTransaction.setStatus(th);
                throw th;
            }
        } catch (Throwable th2) {
            newTransaction.complete();
            throw th2;
        }
    }

    @PostConstruct
    private void initialize() {
        this.cleanExecutorService.submit(() -> {
            while (!this.cleanStopped.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    Long poll = this.toClean.poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        cleanMessage(poll);
                    } else {
                        TimeUnit.SECONDS.sleep(5L);
                    }
                } catch (Throwable th) {
                    Tracer.logError(th);
                }
            }
        });
    }

    private void cleanMessage(Long l) {
        boolean z = true;
        ReleaseMessage releaseMessage = (ReleaseMessage) this.releaseMessageRepository.findOne(l);
        if (releaseMessage == null) {
            return;
        }
        while (z && !Thread.currentThread().isInterrupted()) {
            List<ReleaseMessage> findFirst100ByMessageAndIdLessThanOrderByIdAsc = this.releaseMessageRepository.findFirst100ByMessageAndIdLessThanOrderByIdAsc(releaseMessage.getMessage(), Long.valueOf(releaseMessage.getId()));
            this.releaseMessageRepository.delete(findFirst100ByMessageAndIdLessThanOrderByIdAsc);
            z = findFirst100ByMessageAndIdLessThanOrderByIdAsc.size() == CLEAN_QUEUE_MAX_SIZE;
            findFirst100ByMessageAndIdLessThanOrderByIdAsc.forEach(releaseMessage2 -> {
                Tracer.logEvent(String.format("ReleaseMessage.Clean.%s", releaseMessage2.getMessage()), String.valueOf(releaseMessage2.getId()));
            });
        }
    }

    void stopClean() {
        this.cleanStopped.set(true);
    }
}
