package com.vcarecity.utilkafka;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/vcarecity/utilkafka/KafkaConsumerUtil.class */
public class KafkaConsumerUtil implements BaseKafkaListener {
    private AtomicBoolean kafkaConsumerSwitch = new AtomicBoolean(true);
    private List<KafkaConsumer<String, String>> consumerList = new ArrayList();
    private String consumerConfig;
    private String groupId;
    private String nameFormat;
    private BaseKafkaConsumerListener<String, String> consumerListener;
    private int corePoolSize;
    private int maxNumPoolSize;
    private int maxQueueSize;
    private List<String> topics;

    /* loaded from: input_file:com/vcarecity/utilkafka/KafkaConsumerUtil$Builder.class */
    public static class Builder {
        private String consumerConfig;
        private List<String> topics = new ArrayList();
        private String nameFormat = "VC-Kafka-Consumer-%d";
        private int corePoolSize = 1;
        private int maxNumPoolSize = 4;
        private int maxQueueSize = 1024;
        private String groupId = null;
        private boolean isStart = true;

        public Builder setConsumerConfig(String str) {
            this.consumerConfig = str;
            return this;
        }

        public Builder setTopics(List<String> list) {
            this.topics = list;
            return this;
        }

        public Builder setNameFormat(String str) {
            this.nameFormat = str;
            return this;
        }

        public Builder setCorePoolSize(int i) {
            this.corePoolSize = i;
            return this;
        }

        public Builder setMaxNumPoolSize(int i) {
            this.maxNumPoolSize = i;
            return this;
        }

        public Builder setMaxQueueSize(int i) {
            this.maxQueueSize = i;
            return this;
        }

        public Builder setGroupId(String str) {
            this.groupId = str;
            return this;
        }

        public KafkaConsumerUtil build() {
            return new KafkaConsumerUtil(this.consumerConfig, this.groupId, this.topics, this.nameFormat, this.corePoolSize, this.maxNumPoolSize, this.maxQueueSize);
        }
    }

    public KafkaConsumerUtil(String str, String str2, List<String> list, String str3, int i, int i2, int i3) {
        this.consumerConfig = str;
        this.groupId = str2;
        this.topics = list;
        this.nameFormat = str3;
        this.corePoolSize = i;
        this.maxNumPoolSize = i2;
        this.maxQueueSize = i3;
    }

    public void startKafka(BaseKafkaConsumerListener<String, String> baseKafkaConsumerListener) throws IOException {
        this.consumerListener = baseKafkaConsumerListener;
        startKafka();
    }

    @Override // com.vcarecity.utilkafka.BaseKafkaListener
    public void startKafka() throws IOException {
        Properties properties = new Properties();
        properties.load(new FileInputStream(this.consumerConfig));
        if (this.groupId != null) {
            properties.put("group.id", this.groupId);
        }
        System.out.println(properties);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(this.corePoolSize, this.maxNumPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(this.maxQueueSize), new ThreadFactoryBuilder().setNameFormat(this.nameFormat).build(), new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < this.corePoolSize; i++) {
            threadPoolExecutor.execute(() -> {
                if (this.topics.isEmpty()) {
                    return;
                }
                KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
                kafkaConsumer.subscribe(this.topics);
                this.consumerList.add(kafkaConsumer);
                while (this.kafkaConsumerSwitch.get()) {
                    ConsumerRecords poll = kafkaConsumer.poll(100L);
                    if (this.consumerListener != null) {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            this.consumerListener.receiveMessage((ConsumerRecord) it.next());
                        }
                    }
                }
            });
        }
        threadPoolExecutor.shutdown();
    }

    @Override // com.vcarecity.utilkafka.BaseKafkaListener
    public void stopKafka() {
        Iterator<KafkaConsumer<String, String>> it = this.consumerList.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    public void setConsumerListener(BaseKafkaConsumerListener<String, String> baseKafkaConsumerListener) {
        this.consumerListener = baseKafkaConsumerListener;
    }
}
