package io.jafka.server;

import io.jafka.consumer.MessageStream;
import io.jafka.message.Message;
import io.jafka.producer.Producer;
import io.jafka.producer.ProducerData;
import java.io.Closeable;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/jafka/server/MirroringThread.class */
/**
 * Thread that drains one consumer {@link MessageStream} and re-publishes every
 * message it yields to a {@link Producer} under a fixed topic.
 *
 * <p>The thread runs until the stream's iterator reports no more elements or
 * an exception is thrown while iterating/sending. In both cases the internal
 * {@code shutdownComplete} latch is released so that {@link #close()} can
 * return.
 *
 * <p>NOTE(review): {@link #close()} only <em>waits</em> for the thread to
 * finish; nothing in this class stops the stream. Presumably the owner shuts
 * the stream/connector down before calling {@code close()} - confirm against
 * the caller.
 */
public class MirroringThread extends Thread implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(MirroringThread.class);

    /** Maximum time, in seconds, {@link #close()} waits for {@link #run()} to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    private final MessageStream<Message> stream;
    private final String topic;
    private final int threadId;
    /** Released exactly when {@link #run()} is on its way out, normally or not. */
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);
    private final Producer<Void, Message> producer;

    /**
     * Creates a non-daemon mirroring thread named
     * {@code jafka-embedded-consumer-<topic>-<id>}. The thread is not started.
     *
     * @param messageStream stream whose messages are mirrored
     * @param str           topic the messages are published under (also used in the thread name)
     * @param i             numeric id of this stream/thread, used for naming and logging
     * @param producer      producer that receives every mirrored message
     */
    public MirroringThread(MessageStream<Message> messageStream, String str, int i, Producer<Void, Message> producer) {
        this.stream = messageStream;
        this.topic = str;
        this.threadId = i;
        this.producer = producer;
        setDaemon(false);
        setName(String.format("jafka-embedded-consumer-%s-%d", str, i));
    }

    /**
     * Forwards every message from the stream to the producer until the stream
     * is exhausted or an exception occurs. Exceptions are logged, not
     * propagated. The shutdown latch is always released on exit.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        logger.info(String.format("Starting mirroring thread %s for topic %s and stream %d", getName(), this.topic, this.threadId));
        try {
            Iterator<Message> it = this.stream.iterator();
            while (it.hasNext()) {
                this.producer.send(new ProducerData<>(this.topic, it.next()));
            }
        } catch (Exception e) {
            logger.error(this.topic + " stream " + this.threadId + " unexpectedly exited", e);
        } finally {
            // Single exit path: release waiters and log the stop exactly once,
            // whether the loop ended normally or through an exception.
            this.shutdownComplete.countDown();
            logger.info("Stopped mirroring thread " + getName() + " for topic " + this.topic + " and stream " + this.threadId);
        }
    }

    /**
     * Blocks until {@link #run()} has finished, for at most
     * {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds. A timeout is logged as a
     * warning. If the waiting thread is interrupted, the interruption is
     * logged and the caller's interrupt status is restored.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            boolean finished = this.shutdownComplete.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!finished) {
                logger.warn("Mirroring thread " + getName() + " did not stop within " + SHUTDOWN_TIMEOUT_SECONDS + " seconds");
            }
        } catch (InterruptedException e) {
            logger.error("Shutdown of thread " + getName() + " interrupted.  Mirroring thread might leak data!");
            // Preserve the interrupt so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
        }
    }
}
