/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.clientlibrary.producer;

import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.exception.ShardSealedException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.util.FormatUtils;
import com.aliyun.datahub.clientlibrary.common.BackEndTask;
import com.aliyun.datahub.clientlibrary.common.ClientHelper;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.exception.ClientException;
import com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer;
import com.aliyun.datahub.clientlibrary.interceptor.WriteInterceptor;
import com.aliyun.datahub.clientlibrary.models.Assignment;
import com.aliyun.datahub.clientlibrary.producer.ShardAssigner;
import com.aliyun.datahub.clientlibrary.producer.ShardGroupWriter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer {
    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
    private ClientHelper clientHelper;
    private ShardGroupWriter shardGroupWriter;
    private ShardAssigner shardAssigner;
    private boolean autoAssigned;
    private volatile long assignmentVersion = -1L;
    private Heartbeat heartbeat;
    private final Object updateLock = new Object();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final WriteInterceptor interceptor;

    public Producer(String projectName, String topicName, ProducerConfig config) {
        this.preCheck(projectName, topicName);
        this.clientHelper = config.getHelperBuilder().setProjectName(projectName).setTopicName(topicName).build();
        this.shardGroupWriter = new ShardGroupWriter(projectName, topicName, config, this.clientHelper);
        this.shardAssigner = new ShardAssigner(this.clientHelper.getShardManager());
        this.autoAssigned = true;
        this.interceptor = config.getInterceptorBuilder().buildWriteInterceptor();
        this.interceptor.setProperty("topic", topicName);
        this.startHeartbeat(projectName, topicName, config);
    }

    public Producer(String projectName, String topicName, List<String> shardIds, ProducerConfig config) {
        this.preCheck(projectName, topicName, shardIds);
        this.clientHelper = config.getHelperBuilder().setProjectName(projectName).setTopicName(topicName).build();
        this.shardAssigner = new ShardAssigner(this.clientHelper.getShardManager());
        if (!this.shardAssigner.checkAllActive(shardIds)) {
            throw new InvalidParameterException("Shard must be valid and active");
        }
        this.shardGroupWriter = new ShardGroupWriter(projectName, topicName, config, this.clientHelper);
        this.shardGroupWriter.createShardWriter(shardIds);
        this.autoAssigned = false;
        this.interceptor = config.getInterceptorBuilder().buildWriteInterceptor();
        this.interceptor.setProperty("topic", topicName);
        this.startHeartbeat(projectName, topicName, config);
    }

    public void send(List<RecordEntry> records, int maxRetry) {
        this.send(records, null, maxRetry);
    }

    public void send(List<RecordEntry> records, String shardId, int maxRetry) {
        if (this.closed.get()) {
            throw new ClientException("This producer has already been closed");
        }
        if (maxRetry < 0) {
            throw new InvalidParameterException("Retry must not be negative");
        }
        if (this.heartbeat != null && this.heartbeat.isStopped()) {
            this.heartbeat.start();
        }
        List<RecordEntry> processed = this.interceptor.beforeWrite(records);
        for (int retry = 0; retry <= maxRetry && !this.closed.get(); ++retry) {
            try {
                this.syncAssignmentIfNeeded();
                this.shardGroupWriter.write(processed, shardId);
                return;
            }
            catch (InvalidParameterException | MalformedRecordException | ClientException e) {
                this.fail((DatahubClientException)((Object)e), true);
                continue;
            }
            catch (ShardNotFoundException | ShardSealedException e) {
                if (retry < maxRetry) {
                    this.shardAssigner.triggerUpdateAndWait();
                } else {
                    this.shardAssigner.triggerUpdate();
                }
                this.fail((DatahubClientException)e, retry == maxRetry);
                continue;
            }
            catch (DatahubClientException e) {
                this.fail(e, retry == maxRetry);
            }
        }
        throw new ClientException("Send records failed, retry limit exceeded");
    }

    public void close() {
        if (this.heartbeat != null) {
            this.heartbeat.stop();
        }
        if (this.closed.compareAndSet(false, true)) {
            this.shardGroupWriter.close();
        }
        if (this.clientHelper != null) {
            this.clientHelper.close();
        }
    }

    private void preCheck(String projectName, String topicName) {
        if (!FormatUtils.checkProjectName((String)projectName)) {
            throw new InvalidParameterException("ProjectName format is invalid");
        }
        if (!FormatUtils.checkTopicName((String)topicName)) {
            throw new InvalidParameterException("TopicName format is invalid");
        }
    }

    private void preCheck(String projectName, String topicName, List<String> shardIds) {
        this.preCheck(projectName, topicName);
        if (shardIds == null || shardIds.isEmpty()) {
            throw new InvalidParameterException("ShardIds must not be empty");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void syncAssignmentIfNeeded() {
        if (!this.autoAssigned) {
            return;
        }
        Assignment newAssignment = this.shardAssigner.getAssignment();
        if (this.assignmentVersion != newAssignment.getVersion()) {
            Object object = this.updateLock;
            synchronized (object) {
                if (this.assignmentVersion != newAssignment.getVersion()) {
                    List<String> newShardList = newAssignment.getNewShardList(this.shardGroupWriter.getShards());
                    List<String> releaseShardList = newAssignment.getReleaseShardList(this.shardGroupWriter.getShards());
                    this.shardGroupWriter.createShardWriter(newShardList);
                    this.shardGroupWriter.removeShardWriter(releaseShardList);
                    this.assignmentVersion = newAssignment.getVersion();
                }
            }
        }
    }

    private void fail(DatahubClientException exception, boolean needThrow) {
        if (needThrow) {
            LOG.error("Send records failed", (Throwable)exception);
            throw exception;
        }
        try {
            Thread.sleep(50L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        ExceptionRetryer.invalidUriProtect(exception);
        LOG.debug("Send records retrying, Exception: {}", (Object)exception.getMessage());
    }

    private void startHeartbeat(String projectName, String topicName, ProducerConfig config) {
        if (config.isSendHeartbeat() && config.getHeartbeatGenerator() != null) {
            this.heartbeat = new Heartbeat(projectName, topicName, config);
            this.heartbeat.start();
        }
    }

    private class Heartbeat
    extends BackEndTask {
        private String projectName;
        private String topicName;
        private ProducerConfig config;

        Heartbeat(String projectName, String topicName, ProducerConfig config) {
            this.taskName = "producer-heartbeat";
            this.projectName = projectName;
            this.topicName = topicName;
            this.config = config;
        }

        void heartbeat() {
            RecordEntry heartbeatMessage = this.config.getHeartbeatGenerator().genHeartbeat();
            if (heartbeatMessage != null) {
                String shard;
                Producer.this.syncAssignmentIfNeeded();
                int size = Producer.this.shardAssigner.getAssignment().getSize();
                ArrayList<RecordEntry> records = new ArrayList<RecordEntry>();
                records.add(heartbeatMessage);
                HashSet<String> visited = new HashSet<String>();
                for (int i = 0; i < size && !visited.contains(shard = Producer.this.shardGroupWriter.write(records, null)); ++i) {
                    visited.add(shard);
                }
            }
        }

        @Override
        protected void run() {
            LOG.info("Producer heartbeat start, Project: {}, Topic: {}, IntervalMs: {}", new Object[]{this.projectName, this.topicName, 30000});
            while (this.isRunning()) {
                this.waitSignal(30000L, 30000L);
                try {
                    this.heartbeat();
                    LOG.debug("Producer send heartbeat message, Project: {}, Topic: {}", (Object)this.projectName, (Object)this.topicName);
                }
                catch (DatahubClientException e) {
                    if (!ExceptionRetryer.isFatalException(e)) continue;
                    LOG.error("Producer send heartbeat fatal error, task stopped, Project: {}, Topic: {}", new Object[]{this.projectName, this.topicName, e});
                    this.stop();
                    break;
                }
                catch (Throwable e) {
                    LOG.error("Producer send heartbeat failed, Project: {}, Topic: {}", new Object[]{this.projectName, this.topicName, e});
                }
            }
            LOG.info("Producer heartbeat stop, Project: {}, Topic: {}", (Object)this.projectName, (Object)this.topicName);
        }
    }
}

