/*
 * Decompiled with CFR 0.152.
 */
package com.vcarecity.utilkafka;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.vcarecity.utilkafka.BaseKafkaConsumerListener;
import com.vcarecity.utilkafka.BaseKafkaListener;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerUtil
implements BaseKafkaListener {
    private AtomicBoolean kafkaConsumerSwitch = new AtomicBoolean(true);
    private List<KafkaConsumer<String, String>> consumerList;
    private String consumerConfig;
    private String groupId;
    private String nameFormat;
    private BaseKafkaConsumerListener<String, String> consumerListener;
    private int corePoolSize;
    private int maxNumPoolSize;
    private int maxQueueSize;
    private List<String> topics;

    public KafkaConsumerUtil(String consumerConfig, String groupId, List<String> topics, String nameFormat, int corePoolSize, int maxNumPoolSize, int maxQueueSize) {
        this.consumerConfig = consumerConfig;
        this.groupId = groupId;
        this.topics = topics;
        this.nameFormat = nameFormat;
        this.corePoolSize = corePoolSize;
        this.maxNumPoolSize = maxNumPoolSize;
        this.maxQueueSize = maxQueueSize;
        this.consumerList = new ArrayList<KafkaConsumer<String, String>>();
    }

    public void startKafka(BaseKafkaConsumerListener<String, String> listener) throws IOException {
        this.consumerListener = listener;
        this.startKafka();
    }

    @Override
    public void startKafka() throws IOException {
        Properties props = new Properties();
        props.load(new FileInputStream(this.consumerConfig));
        if (this.groupId != null) {
            props.put("group.id", this.groupId);
        }
        System.out.println(props);
        ThreadFactory nameFactory = new ThreadFactoryBuilder().setNameFormat(this.nameFormat).build();
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(this.corePoolSize, this.maxNumPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(this.maxQueueSize), nameFactory, new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < this.corePoolSize; ++i) {
            threadPool.execute(() -> {
                if (this.topics.isEmpty()) {
                    return;
                }
                KafkaConsumer consumer = new KafkaConsumer(props);
                consumer.subscribe(this.topics);
                this.consumerList.add((KafkaConsumer<String, String>)consumer);
                while (this.kafkaConsumerSwitch.get()) {
                    ConsumerRecords records = consumer.poll(100L);
                    if (this.consumerListener == null) continue;
                    for (ConsumerRecord record : records) {
                        this.consumerListener.receiveMessage((ConsumerRecord<String, String>)record);
                    }
                }
            });
        }
        threadPool.shutdown();
    }

    @Override
    public void stopKafka() {
        for (KafkaConsumer<String, String> consumer : this.consumerList) {
            consumer.close();
        }
    }

    public void setConsumerListener(BaseKafkaConsumerListener<String, String> consumerListener) {
        this.consumerListener = consumerListener;
    }

    public static class Builder {
        private String consumerConfig;
        private List<String> topics = new ArrayList<String>();
        private String nameFormat = "VC-Kafka-Consumer-%d";
        private int corePoolSize = 1;
        private int maxNumPoolSize = 4;
        private int maxQueueSize = 1024;
        private String groupId = null;
        private boolean isStart = true;

        public Builder setConsumerConfig(String consumerConfig) {
            this.consumerConfig = consumerConfig;
            return this;
        }

        public Builder setTopics(List<String> topics) {
            this.topics = topics;
            return this;
        }

        public Builder setNameFormat(String nameFormat) {
            this.nameFormat = nameFormat;
            return this;
        }

        public Builder setCorePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
            return this;
        }

        public Builder setMaxNumPoolSize(int maxNumPoolSize) {
            this.maxNumPoolSize = maxNumPoolSize;
            return this;
        }

        public Builder setMaxQueueSize(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
            return this;
        }

        public Builder setGroupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        public KafkaConsumerUtil build() {
            return new KafkaConsumerUtil(this.consumerConfig, this.groupId, this.topics, this.nameFormat, this.corePoolSize, this.maxNumPoolSize, this.maxQueueSize);
        }
    }
}

