package io.jafka.producer.async;

import io.jafka.common.AsyncProducerInterruptedException;
import io.jafka.common.QueueClosedException;
import io.jafka.common.QueueFullException;
import io.jafka.mx.AsyncProducerQueueSizeStats;
import io.jafka.mx.AsyncProducerStats;
import io.jafka.producer.ProducerConfig;
import io.jafka.producer.SyncProducer;
import io.jafka.producer.serializer.Encoder;
import io.jafka.utils.Utils;
import java.io.Closeable;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/jafka/producer/async/AsyncProducer.class */
/**
 * Asynchronous producer front-end: callers hand events to {@link #send(String, Object)},
 * which places them on a bounded in-memory queue that is handed to a
 * {@code ProducerSendThread} together with the underlying {@link SyncProducer}.
 *
 * <p>Life cycle: construct, call {@link #start()} to start the send thread, call
 * {@code send} any number of times, then {@link #close()} exactly once (further
 * {@code close()} calls are no-ops).
 *
 * <p>The enqueue behaviour is selected by the configured enqueue timeout:
 * <ul>
 *   <li>{@code 0} – non-blocking {@code offer}; fails immediately if the queue is full;</li>
 *   <li>negative – blocking {@code put}; waits until space is available;</li>
 *   <li>positive – {@code offer} that waits at most that many milliseconds.</li>
 * </ul>
 *
 * @param <T> type of the events accepted by this producer
 */
public class AsyncProducer<T> implements Closeable {
    private final Logger logger;
    private static final Random random = new Random();
    /** Base JMX name of the queue-size MBean; the producer id is appended per instance. */
    private static final String ProducerQueueSizeMBeanName = "jafka.producer.Producer:type=AsyncProducerQueueSizeStats";
    private final SyncProducer producer;
    /** Optional user callback; {@code null} when no callback handler is configured. */
    private final CallbackHandler<T> callbackHandler;
    /** Set once by {@link #close()}; checked by {@code send} to reject late events. */
    private final AtomicBoolean closed;
    private final LinkedBlockingQueue<QueueItem<T>> queue;
    /** Random id used to make the send-thread name and the MBean name unique. */
    private final int asyncProducerID;
    private final ProducerSendThread<T> sendThread;
    private final int enqueueTimeoutMs;

    /**
     * Creates a producer from explicitly supplied collaborators. The send thread is
     * created here but not started; call {@link #start()}.
     *
     * @param asyncProducerConfig source of the queue size, enqueue timeout, queue time and batch size
     * @param syncProducer        producer passed to the send thread and closed by {@link #close()}
     * @param encoder             encoder passed to the send thread
     * @param eventHandler        custom event handler, or {@code null} to use a {@code DefaultEventHandler}
     * @param properties          properties passed to {@code eventHandler.init} (only when it is non-null)
     * @param callbackHandler     optional callback handler, may be {@code null}
     * @param properties2         properties passed to {@code callbackHandler.init} (only when it is non-null)
     */
    public AsyncProducer(AsyncProducerConfig asyncProducerConfig, SyncProducer syncProducer, Encoder<T> encoder, EventHandler<T> eventHandler, Properties properties, CallbackHandler<T> callbackHandler, Properties properties2) {
        this.logger = LoggerFactory.getLogger(AsyncProducer.class);
        this.closed = new AtomicBoolean(false);
        this.asyncProducerID = random.nextInt();
        this.producer = syncProducer;
        this.callbackHandler = callbackHandler;
        this.enqueueTimeoutMs = asyncProducerConfig.getEnqueueTimeoutMs();
        this.queue = new LinkedBlockingQueue<>(asyncProducerConfig.getQueueSize());
        if (eventHandler != null) {
            eventHandler.init(properties);
        }
        if (callbackHandler != null) {
            callbackHandler.init(properties2);
        }
        // Fall back to the default handler when the caller did not supply one.
        EventHandler<T> effectiveHandler = eventHandler != null
                ? eventHandler
                : new DefaultEventHandler<>(new ProducerConfig(asyncProducerConfig.getProperties()), callbackHandler);
        this.sendThread = new ProducerSendThread<>("ProducerSendThread-" + this.asyncProducerID, this.queue, encoder, syncProducer, effectiveHandler, callbackHandler, asyncProducerConfig.getQueueTime(), asyncProducerConfig.getBatchSize());
        this.sendThread.setDaemon(false);
        // Register a per-instance MBean over the queue; the id suffix keeps names distinct.
        AsyncProducerQueueSizeStats asyncProducerQueueSizeStats = new AsyncProducerQueueSizeStats(this.queue);
        asyncProducerQueueSizeStats.setMbeanName(ProducerQueueSizeMBeanName + "-" + this.asyncProducerID);
        Utils.registerMBean(asyncProducerQueueSizeStats);
    }

    /**
     * Creates a producer whose collaborators (sync producer, encoder, event handler and
     * callback handler) are all derived from the given configuration.
     *
     * @param asyncProducerConfig configuration naming the serializer, event handler and
     *                            callback handler classes and their properties
     */
    public AsyncProducer(AsyncProducerConfig asyncProducerConfig) {
        // Raw casts are kept as in the original: the objects are created reflectively from
        // class names, so their type arguments cannot be checked at compile time.
        this(asyncProducerConfig, new SyncProducer(asyncProducerConfig), (Encoder) Utils.getObject(asyncProducerConfig.getSerializerClass()), (EventHandler) Utils.getObject(asyncProducerConfig.getEventHandler()), asyncProducerConfig.getEventHandlerProperties(), (CallbackHandler) Utils.getObject(asyncProducerConfig.getCbkHandler()), asyncProducerConfig.getCbkHandlerProperties());
    }

    /** Starts the background send thread. */
    public void start() {
        this.sendThread.start();
    }

    /**
     * Enqueues an event without an explicit partition (delegates with partition {@code -1}).
     *
     * @param str topic name stored in the queue item
     * @param t   the event
     * @see #send(String, Object, int)
     */
    public void send(String str, T t) {
        send(str, t, -1);
    }

    /**
     * Enqueues an event for asynchronous delivery.
     *
     * <p>If a callback handler is configured, its {@code beforeEnqueue} may replace the
     * queue item (a {@code null} result means nothing is enqueued) and its
     * {@code afterEnqueue} is told whether the item was added.
     *
     * @param str topic name stored in the queue item
     * @param t   the event
     * @param i   partition stored in the queue item
     * @throws QueueClosedException              if this producer has already been closed
     * @throws AsyncProducerInterruptedException if the calling thread is interrupted while waiting
     *                                           for queue space; the interrupt flag is restored first
     * @throws QueueFullException                if the item was not added to the queue
     */
    public void send(String str, T t, int i) {
        AsyncProducerStats.recordEvent();
        if (this.closed.get()) {
            throw new QueueClosedException("Attempt to add event to a closed queue.");
        }
        QueueItem<T> queueItem = new QueueItem<>(t, i, str);
        if (this.callbackHandler != null) {
            queueItem = this.callbackHandler.beforeEnqueue(queueItem);
        }
        boolean added = false;
        if (queueItem != null) {
            try {
                if (this.enqueueTimeoutMs == 0) {
                    // Non-blocking: fail fast when the queue is full.
                    added = this.queue.offer(queueItem);
                } else if (this.enqueueTimeoutMs < 0) {
                    // Negative timeout: block until space is available.
                    this.queue.put(queueItem);
                    added = true;
                } else {
                    added = this.queue.offer(queueItem, this.enqueueTimeoutMs, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe
                // the interruption after it has been translated to an unchecked exception.
                Thread.currentThread().interrupt();
                throw new AsyncProducerInterruptedException(e);
            }
        }
        if (this.callbackHandler != null) {
            this.callbackHandler.afterEnqueue(queueItem, added);
        }
        if (added) {
            return;
        }
        AsyncProducerStats.recordDroppedEvents();
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + t);
    }

    /**
     * Closes this producer: closes the callback handler (if any), shuts the send thread
     * down and waits for it, then closes the underlying sync producer.
     *
     * <p>Only the first call does any work; later calls return immediately, as required
     * by the {@link Closeable} contract.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        // Atomically flip the flag so that exactly one caller performs the shutdown.
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        if (this.callbackHandler != null) {
            this.callbackHandler.close();
        }
        this.sendThread.shutdown();
        this.sendThread.awaitShutdown();
        this.producer.close();
        this.logger.info("Closed AsyncProducer");
    }
}
