package io.jafka.consumer;

import com.github.zkclient.ZkClient;
import io.jafka.cluster.Cluster;
import io.jafka.common.annotations.ClientSide;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ClientSide
/* loaded from: input_file:io/jafka/consumer/Fetcher.class */
public class Fetcher {
    private final ConsumerConfig config;
    private final ZkClient zkClient;
    private final Logger logger = LoggerFactory.getLogger(Fetcher.class);
    private volatile List<FetcherRunnable> fetcherThreads = new ArrayList(0);

    public Fetcher(ConsumerConfig consumerConfig, ZkClient zkClient) {
        this.config = consumerConfig;
        this.zkClient = zkClient;
    }

    public void stopConnectionsToAllBrokers() {
        List<FetcherRunnable> list = this.fetcherThreads;
        this.fetcherThreads = new ArrayList(0);
        Iterator<FetcherRunnable> it = list.iterator();
        while (it.hasNext()) {
            try {
                it.next().shutdown();
            } catch (InterruptedException e) {
                this.logger.warn(e.getMessage(), e);
            }
        }
    }

    public <T> void startConnections(Iterable<PartitionTopicInfo> iterable, Cluster cluster, Map<String, List<MessageStream<T>>> map) {
        if (iterable == null) {
            return;
        }
        HashMap hashMap = new HashMap();
        for (PartitionTopicInfo partitionTopicInfo : iterable) {
            if (cluster.getBroker(Integer.valueOf(partitionTopicInfo.brokerId)) == null) {
                throw new IllegalStateException("Broker " + partitionTopicInfo.brokerId + " is unavailable, fetchers could not be started");
            }
            List list = (List) hashMap.get(Integer.valueOf(partitionTopicInfo.brokerId));
            if (list == null) {
                list = new ArrayList();
                hashMap.put(Integer.valueOf(partitionTopicInfo.brokerId), list);
            }
            list.add(partitionTopicInfo);
        }
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : hashMap.entrySet()) {
            FetcherRunnable fetcherRunnable = new FetcherRunnable("FetchRunnable-" + entry.getKey(), this.zkClient, this.config, cluster.getBroker((Integer) entry.getKey()), (List) entry.getValue());
            arrayList.add(fetcherRunnable);
            fetcherRunnable.start();
        }
        this.fetcherThreads = arrayList;
    }

    public <T> void clearFetcherQueues(Collection<BlockingQueue<FetchedDataChunk>> collection, Collection<List<MessageStream<T>>> collection2) {
        Iterator<BlockingQueue<FetchedDataChunk>> it = collection.iterator();
        while (it.hasNext()) {
            it.next().clear();
        }
        Iterator<List<MessageStream<T>>> it2 = collection2.iterator();
        while (it2.hasNext()) {
            Iterator<MessageStream<T>> it3 = it2.next().iterator();
            while (it3.hasNext()) {
                it3.next().clear();
            }
        }
    }
}
