package io.jafka.producer;

import io.jafka.cluster.Broker;
import io.jafka.cluster.Partition;
import io.jafka.common.InvalidPartitionException;
import io.jafka.common.NoBrokersForPartitionException;
import io.jafka.common.annotations.ClientSide;
import io.jafka.producer.BrokerPartitionInfo;
import io.jafka.producer.async.CallbackHandler;
import io.jafka.producer.async.EventHandler;
import io.jafka.producer.serializer.Encoder;
import io.jafka.utils.Closer;
import io.jafka.utils.Utils;
import io.jafka.utils.ZKConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side producer that routes {@link ProducerData} to a broker partition and
 * hands it to a {@link ProducerPool} for delivery.
 *
 * <p>Broker/partition metadata comes from a {@link BrokerPartitionInfo}. When the
 * configuration has a ZooKeeper connect string, a {@link ZKBrokerPartitionInfo} is
 * created (with this producer registered as its callback); otherwise a
 * {@link ConfigBrokerPartitionInfo} built from the configuration is used.
 *
 * @param <K> type of the key used to choose a partition
 * @param <V> type of the message payload
 */
@ClientSide
public class Producer<K, V> implements BrokerPartitionInfo.Callback, IProducer<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    ProducerConfig config;
    private Partitioner<K> partitioner;
    ProducerPool<V> producerPool;
    boolean populateProducerPool;
    BrokerPartitionInfo brokerPartitionInfo;
    private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
    private final Random random = new Random();
    private final boolean zkEnabled;
    /** Encoder returned by {@link #getEncoder()}; resolved lazily from the configuration when not supplied. */
    private Encoder<V> encoder;

    /**
     * Creates a producer from explicitly supplied collaborators.
     *
     * @param producerConfig       producer configuration; must not be {@code null}
     * @param partitioner          partitioner for keyed messages, or {@code null} to load the one
     *                             named by the configuration on first use
     * @param producerPool         pool used for delivery, or {@code null} to create one from the
     *                             configuration and {@link #getEncoder()}
     * @param populateProducerPool whether brokers known to the partition info (now, and later via
     *                             {@link #producerCbk}) are registered with the pool
     * @param brokerPartitionInfo  broker/partition metadata source, or {@code null} to create one
     *                             according to whether ZooKeeper is configured
     */
    public Producer(ProducerConfig producerConfig, Partitioner<K> partitioner, ProducerPool<V> producerPool, boolean populateProducerPool, BrokerPartitionInfo brokerPartitionInfo) {
        // config must be assigned first: getEncoder() below reads it.
        this.config = producerConfig;
        this.partitioner = partitioner;
        if (producerPool != null) {
            this.producerPool = producerPool;
        } else {
            this.producerPool = new ProducerPool<>(producerConfig, getEncoder());
        }
        this.populateProducerPool = populateProducerPool;
        this.brokerPartitionInfo = brokerPartitionInfo;
        this.zkEnabled = producerConfig.getZkConnect() != null;

        if (this.brokerPartitionInfo == null) {
            if (this.zkEnabled) {
                Properties zkProps = new Properties();
                zkProps.put("zk.connect", producerConfig.getZkConnect());
                zkProps.put("zk.sessiontimeout.ms", "" + producerConfig.getZkSessionTimeoutMs());
                zkProps.put("zk.connectiontimeout.ms", "" + producerConfig.getZkConnectionTimeoutMs());
                zkProps.put("zk.synctime.ms", "" + producerConfig.getZkSyncTimeMs());
                this.brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), this);
            } else {
                this.brokerPartitionInfo = new ConfigBrokerPartitionInfo(producerConfig);
            }
        }

        if (this.populateProducerPool) {
            // Register every currently known broker with the pool.
            for (Map.Entry<Integer, Broker> entry : this.brokerPartitionInfo.getAllBrokerInfo().entrySet()) {
                Broker known = entry.getValue();
                this.producerPool.addProducer(new Broker(entry.getKey().intValue(), known.host, known.host, known.port, known.autocreated));
            }
        }
    }

    /**
     * Creates a producer whose partitioner, pool and partition info are all derived
     * from the configuration.
     *
     * @param producerConfig producer configuration; must not be {@code null}
     */
    public Producer(ProducerConfig producerConfig) {
        this(producerConfig, (Partitioner<K>) null, (ProducerPool<V>) null, true, (BrokerPartitionInfo) null);
    }

    /**
     * Creates a producer with a pool built from the given encoder and handlers.
     *
     * @param producerConfig  producer configuration; must not be {@code null}
     * @param encoder         encoder handed to the pool; also returned by {@link #getEncoder()}
     *                        when non-null
     * @param eventHandler    event handler handed to the pool
     * @param callbackHandler callback handler handed to the pool
     * @param partitioner     partitioner for keyed messages, or {@code null} to load the one
     *                        named by the configuration on first use
     */
    public Producer(ProducerConfig producerConfig, Encoder<V> encoder, EventHandler<V> eventHandler, CallbackHandler<V> callbackHandler, Partitioner<K> partitioner) {
        this(producerConfig, partitioner, new ProducerPool<V>(producerConfig, encoder, eventHandler, callbackHandler), true, (BrokerPartitionInfo) null);
        // Remember the caller's encoder so getEncoder() reports the one the pool was built with
        // instead of instantiating the configured serializer class.
        this.encoder = encoder;
    }

    /**
     * Returns the encoder associated with this producer.
     *
     * <p>If no encoder was supplied at construction, one is instantiated from the
     * configured serializer class on first call and then reused.
     *
     * @return the encoder; may be {@code null} if {@code Utils.getObject} yields
     *         {@code null} for the configured serializer class
     */
    @Override
    public Encoder<V> getEncoder() {
        if (this.encoder == null) {
            // The configured class is only known by name, so the cast cannot be checked.
            @SuppressWarnings("unchecked")
            Encoder<V> configured = (Encoder<V>) Utils.getObject(this.config.getSerializerClass());
            this.encoder = configured;
        }
        return this.encoder;
    }

    /**
     * Sends the given data, choosing the ZooKeeper-based or configuration-based
     * routing depending on how this producer was configured. A {@code null}
     * argument is ignored.
     *
     * @param producerData topic, key and payload to send; may be {@code null}
     * @throws NoBrokersForPartitionException if the topic has no partitions, or (ZooKeeper
     *                                        mode) no broker could be resolved after retries
     * @throws InvalidPartitionException      if the partitioner returns an out-of-range index
     */
    @Override
    public void send(ProducerData<K, V> producerData) throws NoBrokersForPartitionException, InvalidPartitionException {
        if (producerData == null) {
            return;
        }
        if (this.zkEnabled) {
            zkSend(producerData);
        } else {
            configSend(producerData);
        }
    }

    /** Sends via a randomly chosen broker from the statically configured partition list. */
    private void configSend(ProducerData<K, V> producerData) {
        this.producerPool.send(create(producerData));
    }

    /**
     * Resolves a partition and its broker from the partition info and sends the data.
     * If the broker lookup yields nothing, the partition info is refreshed and the
     * lookup retried, up to {@code config.getZkReadRetries()} additional times.
     */
    private void zkSend(ProducerData<K, V> producerData) {
        Broker broker = null;
        Partition partition = null;
        for (int attempt = 0; attempt <= this.config.getZkReadRetries() && broker == null; attempt++) {
            if (attempt > 0) {
                logger.info("Try #{} ZK producer cache is stale. Refreshing it by reading from ZK again", attempt);
                this.brokerPartitionInfo.updateInfo();
            }
            // Copy into an indexable list so the partitioner's index can be applied.
            ArrayList<Partition> candidates = new ArrayList<>(getPartitionListForTopic(producerData));
            partition = candidates.get(getPartition(producerData.getKey(), candidates.size()));
            if (partition != null) {
                broker = this.brokerPartitionInfo.getBrokerInfo(partition.brokerId);
            }
        }
        if (broker == null) {
            throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " + producerData.getTopic() + " and key: " + producerData.getKey());
        }
        // broker != null implies partition != null (broker is only assigned inside that check).
        Partition target = new Partition(partition.brokerId, partition.partId);
        this.producerPool.send(this.producerPool.getProducerPoolData(producerData.getTopic(), target, producerData.getData()));
    }

    /**
     * Picks an index into the partition list: random for a {@code null} key,
     * otherwise whatever the partitioner returns.
     *
     * @param key           message key, may be {@code null}
     * @param numPartitions number of available partitions; must be positive
     * @return an index in {@code [0, numPartitions)}
     * @throws InvalidPartitionException if {@code numPartitions <= 0} or the chosen index is
     *                                   out of range
     */
    private int getPartition(K key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions + "\n Valid values are > 0");
        }
        final int index;
        if (key == null) {
            index = this.random.nextInt(numPartitions);
        } else {
            index = getPartitioner().partition(key, numPartitions);
        }
        if (index < 0 || index >= numPartitions) {
            throw new InvalidPartitionException("Invalid partition id : " + index + "\n Valid values are in the range inclusive [0, " + (numPartitions - 1) + "]");
        }
        return index;
    }

    /**
     * Callback from the partition info: registers the described broker with the
     * pool when {@code populateProducerPool} is set, otherwise only logs.
     *
     * @param brokerId    broker id
     * @param host        broker host (passed as both the second and third {@code Broker} argument)
     * @param port        broker port
     * @param autocreated forwarded as the last {@code Broker} constructor argument
     */
    @Override
    public void producerCbk(int brokerId, String host, int port, boolean autocreated) {
        if (this.populateProducerPool) {
            this.producerPool.addProducer(new Broker(brokerId, host, host, port, autocreated));
        } else {
            logger.debug("Skipping the callback since populateProducerPool = false");
        }
    }

    /**
     * Builds pool data addressed to a randomly chosen broker of the topic, with the
     * partition id set to {@code -1}.
     */
    private ProducerPoolData<V> create(ProducerData<K, V> producerData) {
        ArrayList<Partition> candidates = new ArrayList<>(getPartitionListForTopic(producerData));
        Partition chosen = candidates.get(this.random.nextInt(candidates.size()));
        return this.producerPool.getProducerPoolData(producerData.getTopic(), new Partition(chosen.brokerId, -1), producerData.getData());
    }

    /**
     * Looks up the partitions registered for the data's topic.
     *
     * @return the non-empty set of partitions for the topic
     * @throws NoBrokersForPartitionException if the topic has no partitions
     */
    private Collection<Partition> getPartitionListForTopic(ProducerData<K, V> producerData) {
        SortedSet<Partition> partitions = this.brokerPartitionInfo.getBrokerPartitionInfo(producerData.getTopic());
        if (partitions.isEmpty()) {
            throw new NoBrokersForPartitionException("Partition= " + producerData.getTopic());
        }
        return partitions;
    }

    /**
     * Closes the producer pool and the partition info. Only the first call has any
     * effect; later calls are no-ops.
     */
    @Override
    public void close() {
        if (this.hasShutdown.compareAndSet(false, true)) {
            Closer.closeQuietly(this.producerPool);
            Closer.closeQuietly(this.brokerPartitionInfo);
        }
    }

    /**
     * Returns the partitioner, instantiating it from the configured partitioner
     * class on first use if none was supplied.
     */
    @Override
    public Partitioner<K> getPartitioner() {
        if (this.partitioner == null) {
            // The configured class is only known by name, so the cast cannot be checked.
            @SuppressWarnings("unchecked")
            Partitioner<K> configured = (Partitioner<K>) Utils.getObject(this.config.getPartitionerClass());
            this.partitioner = configured;
        }
        return this.partitioner;
    }

    /**
     * Replaces the partitioner used for keyed messages.
     *
     * @param partitioner new partitioner; {@code null} causes the configured one to be loaded
     *                    on the next {@link #getPartitioner()} call
     */
    public void setPartitioner(Partitioner<K> partitioner) {
        this.partitioner = partitioner;
    }
}
