/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.MQMessageUtils;
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.kafka.MessageSerializer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.spi.CanalMQProducer;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CanalKafkaProducer
implements CanalMQProducer {
    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
    private Producer<String, Message> producer;
    private Producer<String, String> producer2;
    private MQProperties kafkaProperties;

    @Override
    public void init(MQProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", kafkaProperties.getAcks());
        properties.put("compression.type", kafkaProperties.getCompressionType());
        properties.put("batch.size", (Object)kafkaProperties.getBatchSize());
        properties.put("linger.ms", (Object)kafkaProperties.getLingerMs());
        properties.put("max.request.size", (Object)kafkaProperties.getMaxRequestSize());
        properties.put("buffer.memory", (Object)kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("max.in.flight.requests.per.connection", (Object)1);
        if (!kafkaProperties.getProperties().isEmpty()) {
            properties.putAll((Map<?, ?>)kafkaProperties.getProperties());
        }
        properties.put("retries", (Object)kafkaProperties.getRetries());
        if (kafkaProperties.isKerberosEnable()) {
            File krb5File = new File(kafkaProperties.getKerberosKrb5FilePath());
            File jaasFile = new File(kafkaProperties.getKerberosJaasFilePath());
            if (krb5File.exists() && jaasFile.exists()) {
                System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
                System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
                System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
                properties.put("security.protocol", "SASL_PLAINTEXT");
                properties.put("sasl.kerberos.service.name", "kafka");
            } else {
                String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it";
                logger.error(errorMsg);
                throw new RuntimeException(errorMsg);
            }
        }
        if (!kafkaProperties.getFlatMessage()) {
            properties.put("value.serializer", MessageSerializer.class.getName());
            this.producer = new KafkaProducer(properties);
        } else {
            properties.put("value.serializer", StringSerializer.class.getName());
            this.producer2 = new KafkaProducer(properties);
        }
    }

    @Override
    public void stop() {
        try {
            logger.info("## stop the kafka producer");
            if (this.producer != null) {
                this.producer.close();
            }
            if (this.producer2 != null) {
                this.producer2.close();
            }
        }
        catch (Throwable e) {
            logger.warn("##something goes wrong when stopping kafka producer:", e);
        }
        finally {
            logger.info("## kafka producer is down.");
        }
    }

    @Override
    public void send(MQProperties.CanalDestination canalDestination, Message message, CanalMQProducer.Callback callback) {
        try {
            if (!StringUtils.isEmpty((String)canalDestination.getDynamicTopic())) {
                Map<String, Message> messageMap = MQMessageUtils.messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
                for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
                    String topicName = entry.getKey();
                    Message messageSub = entry.getValue();
                    if (logger.isDebugEnabled()) {
                        logger.debug("## Send message to kafka topic: " + topicName);
                    }
                    this.send(canalDestination, topicName, messageSub);
                }
            } else {
                this.send(canalDestination, canalDestination.getTopic(), message);
            }
            callback.commit();
        }
        catch (Throwable e) {
            logger.error(e.getMessage(), e);
            callback.rollback();
        }
    }

    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message) throws Exception {
        if (!this.kafkaProperties.getFlatMessage()) {
            ArrayList<ProducerRecord> records = new ArrayList<ProducerRecord>();
            if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                Message[] messages = MQMessageUtils.messagePartition(message, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                int length = messages.length;
                for (int i = 0; i < length; ++i) {
                    Message messagePartition = messages[i];
                    if (messagePartition == null) continue;
                    records.add(new ProducerRecord(topicName, Integer.valueOf(i), null, (Object)messagePartition));
                }
            } else {
                int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                records.add(new ProducerRecord(topicName, Integer.valueOf(partition), null, (Object)message));
            }
            this.produce(topicName, records, false);
        } else {
            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
            ArrayList<ProducerRecord> records = new ArrayList<ProducerRecord>();
            if (flatMessages != null) {
                for (FlatMessage flatMessage : flatMessages) {
                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                        int length = partitionFlatMessage.length;
                        for (int i = 0; i < length; ++i) {
                            FlatMessage flatMessagePart = partitionFlatMessage[i];
                            if (flatMessagePart == null) continue;
                            records.add(new ProducerRecord(topicName, Integer.valueOf(i), null, (Object)JSON.toJSONString((Object)flatMessagePart, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue})));
                        }
                    } else {
                        int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                        records.add(new ProducerRecord(topicName, Integer.valueOf(partition), null, (Object)JSON.toJSONString((Object)flatMessage, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue})));
                    }
                    this.produce(topicName, records, true);
                    records.clear();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void produce(String topicName, List<ProducerRecord> records, boolean flatMessage) {
        Object producerTmp = null;
        producerTmp = flatMessage ? this.producer2 : this.producer;
        ArrayList<Future> futures = new ArrayList<Future>();
        try {
            for (ProducerRecord record : records) {
                futures.add(producerTmp.send(record));
            }
        }
        finally {
            if (logger.isDebugEnabled()) {
                for (ProducerRecord record : records) {
                    logger.debug("Send  message to kafka topic: [{}], packet: {}", (Object)topicName, (Object)record.toString());
                }
            }
            producerTmp.flush();
            for (Future future : futures) {
                try {
                    future.get();
                }
                catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

