package io.jafka.consumer;

import com.github.zkclient.ZkClient;
import io.jafka.api.FetchRequest;
import io.jafka.api.OffsetRequest;
import io.jafka.cluster.Broker;
import io.jafka.cluster.Partition;
import io.jafka.common.ErrorMapping;
import io.jafka.common.annotations.ClientSide;
import io.jafka.message.ByteBufferMessageSet;
import io.jafka.utils.Closer;
import io.jafka.utils.zookeeper.ZkGroupTopicDirs;
import io.jafka.utils.zookeeper.ZkUtils;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ClientSide
/* loaded from: input_file:io/jafka/consumer/FetcherRunnable.class */
public class FetcherRunnable extends Thread {
    private final CountDownLatch shutdownLatch;
    private final SimpleConsumer simpleConsumer;
    private volatile boolean stopped;
    private final ConsumerConfig config;
    private final Broker broker;
    private final ZkClient zkClient;
    private final List<PartitionTopicInfo> partitionTopicInfos;
    private final Logger logger;
    private static final AtomicInteger threadIndex = new AtomicInteger(0);

    public FetcherRunnable(String str, ZkClient zkClient, ConsumerConfig consumerConfig, Broker broker, List<PartitionTopicInfo> list) {
        super(str + "-" + threadIndex.getAndIncrement());
        this.shutdownLatch = new CountDownLatch(1);
        this.stopped = false;
        this.logger = LoggerFactory.getLogger(FetcherRunnable.class);
        this.zkClient = zkClient;
        this.config = consumerConfig;
        this.broker = broker;
        this.partitionTopicInfos = list;
        this.simpleConsumer = new SimpleConsumer(broker.host, broker.port, consumerConfig.getSocketTimeoutMs(), consumerConfig.getSocketBufferSize());
    }

    public void shutdown() throws InterruptedException {
        this.logger.info("shutdown the fetcher " + getName());
        this.stopped = true;
        interrupt();
        this.shutdownLatch.await(5L, TimeUnit.SECONDS);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        StringBuilder sb = new StringBuilder("[");
        for (PartitionTopicInfo partitionTopicInfo : this.partitionTopicInfos) {
            sb.append(String.format("%s-%d-%d,", partitionTopicInfo.topic, Integer.valueOf(partitionTopicInfo.partition.brokerId), Integer.valueOf(partitionTopicInfo.partition.partId)));
        }
        sb.append(']');
        this.logger.info(String.format("%s comsume at %s:%d with %s", getName(), this.broker.host, Integer.valueOf(this.broker.port), sb.toString()));
        try {
            long maxFetchBackoffMs = this.config.getMaxFetchBackoffMs();
            long fetchBackoffMs = this.config.getFetchBackoffMs();
            while (!this.stopped) {
                if (fetchOnce() == 0) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("backing off " + fetchBackoffMs + " ms");
                    }
                    Thread.sleep(fetchBackoffMs);
                    if (fetchBackoffMs < maxFetchBackoffMs) {
                        fetchBackoffMs += fetchBackoffMs / 10;
                    }
                } else {
                    fetchBackoffMs = this.config.getFetchBackoffMs();
                }
            }
        } catch (ClosedByInterruptException e) {
            this.logger.info("FetcherRunnable " + this + " interrupted");
        } catch (Exception e2) {
            if (this.stopped) {
                this.logger.info("FetcherRunnable " + this + " interrupted");
            } else {
                this.logger.error("error in FetcherRunnable ", e2);
            }
        }
        this.logger.debug("stopping fetcher " + getName() + " to broker " + this.broker);
        Closer.closeQuietly(this.simpleConsumer);
        shutdownComplete();
    }

    private long fetchOnce() throws IOException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (PartitionTopicInfo partitionTopicInfo : this.partitionTopicInfos) {
            arrayList.add(new FetchRequest(partitionTopicInfo.topic, partitionTopicInfo.partition.partId, partitionTopicInfo.getFetchedOffset(), this.config.getFetchSize()));
        }
        int i = 0;
        long j = 0;
        Iterator<ByteBufferMessageSet> it = this.simpleConsumer.multifetch(arrayList).iterator();
        while (it.hasNext()) {
            ByteBufferMessageSet next = it.next();
            PartitionTopicInfo partitionTopicInfo2 = this.partitionTopicInfos.get(i);
            try {
                j += processMessages(next, partitionTopicInfo2);
                i++;
            } catch (IOException e) {
                throw e;
            } catch (InterruptedException e2) {
                if (!this.stopped) {
                    this.logger.error("error in FetcherRunnable for " + partitionTopicInfo2, e2);
                    partitionTopicInfo2.enqueueError(e2, partitionTopicInfo2.getFetchedOffset());
                }
                throw e2;
            } catch (RuntimeException e3) {
                if (!this.stopped) {
                    this.logger.error("error in FetcherRunnable for " + partitionTopicInfo2, e3);
                    partitionTopicInfo2.enqueueError(e3, partitionTopicInfo2.getFetchedOffset());
                }
                throw e3;
            }
        }
        return j;
    }

    private long processMessages(ByteBufferMessageSet byteBufferMessageSet, PartitionTopicInfo partitionTopicInfo) throws IOException, InterruptedException {
        boolean z = false;
        if (byteBufferMessageSet.getErrorCode() == ErrorMapping.OffsetOutOfRangeCode) {
            this.logger.warn("offset for " + partitionTopicInfo + " out of range, now we fix it");
            long resetConsumerOffsets = resetConsumerOffsets(partitionTopicInfo.topic, partitionTopicInfo.partition);
            if (resetConsumerOffsets >= 0) {
                partitionTopicInfo.resetFetchOffset(resetConsumerOffsets);
                partitionTopicInfo.resetConsumeOffset(resetConsumerOffsets);
                z = true;
            }
        }
        if (z) {
            return 0L;
        }
        return partitionTopicInfo.enqueue(byteBufferMessageSet, partitionTopicInfo.getFetchedOffset());
    }

    private void shutdownComplete() {
        this.shutdownLatch.countDown();
    }

    private long resetConsumerOffsets(String str, Partition partition) throws IOException {
        long j = -1;
        String autoOffsetReset = this.config.getAutoOffsetReset();
        if (OffsetRequest.SMALLES_TIME_STRING.equals(autoOffsetReset)) {
            j = -2;
        } else if (OffsetRequest.LARGEST_TIME_STRING.equals(autoOffsetReset)) {
            j = -1;
        }
        ZkGroupTopicDirs zkGroupTopicDirs = new ZkGroupTopicDirs(this.config.getGroupId(), str);
        long[] offsetsBefore = this.simpleConsumer.getOffsetsBefore(str, partition.partId, j, 1);
        ZkUtils.updatePersistentPath(this.zkClient, zkGroupTopicDirs.consumerOffsetDir + "/" + partition.getName(), "" + offsetsBefore[0]);
        return offsetsBefore[0];
    }
}
