/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.clientlibrary.consumer;

import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordKey;
import com.aliyun.datahub.client.util.FormatUtils;
import com.aliyun.datahub.clientlibrary.callback.ShardReadEndCallback;
import com.aliyun.datahub.clientlibrary.common.BackEndTask;
import com.aliyun.datahub.clientlibrary.common.ClientHelper;
import com.aliyun.datahub.clientlibrary.config.CheckTaskConfig;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.OffsetCoordinator;
import com.aliyun.datahub.clientlibrary.consumer.ShardCoordinator;
import com.aliyun.datahub.clientlibrary.consumer.ShardGroupReader;
import com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer;
import com.aliyun.datahub.clientlibrary.models.Assignment;
import com.aliyun.datahub.clientlibrary.models.Offset;
import com.aliyun.datahub.clientlibrary.models.RecordKeyImpl;
import com.aliyun.datahub.clientlibrary.models.ShardRecordKey;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads records of one topic on behalf of a subscription and acknowledges
 * them through an {@link OffsetCoordinator}.
 *
 * <p>Three construction modes are supported:
 * <ul>
 *   <li>without explicit shards: a {@link ShardCoordinator} is created and the
 *       shard readers follow the assignments it reports;</li>
 *   <li>with a list of shard ids: offsets for exactly those shards are opened
 *       through the offset coordinator and used as start positions;</li>
 *   <li>with an offset map: the given offsets are used as start positions.</li>
 * </ul>
 *
 * <p>A background {@code PeriodicCheckTask} tracks whether {@link #read(int)}
 * is still being called and, when configured, leaves the consumer group after
 * a period of inactivity.
 *
 * <p>NOTE(review): {@code assignmentLock} is only taken in {@code leaveGroup()}
 * (background thread); {@code syncAssignment()} updates the same version fields
 * from the reading thread without it. Confirm whether both should hold the lock.
 */
public class Consumer {
    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
    /** How long {@link #read(int)} waits for a reader signal between attempts. */
    private static final long READ_WAIT_MS = 1000L;
    /** Minimum interval between two "consumer is not called" log lines. */
    private static final long IDLE_LOG_INTERVAL_MS = 60000L;
    private ClientHelper clientHelper;
    private OffsetCoordinator offsetCoordinator;
    private ShardGroupReader shardGroupReader;
    private ShardCoordinator shardCoordinator;
    private PeriodicCheckTask checkTask;
    private ShardReadEndCallback shardReadEndCallback;
    private long assignmentVersion = -1L;
    private long commitVersion = -1L;
    // Key of the record handed out by the last read(); acked on the next read() or on close().
    private ShardRecordKey lastKey = null;
    // Record whose offset check failed; it is retried first on the next read().
    private RecordEntry stageRecord = null;
    private int hashId = System.identityHashCode(this);
    private String projectName;
    private String topicName;
    private String subId;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Object assignmentLock = new Object();

    /**
     * Creates a consumer whose shards are managed by a {@link ShardCoordinator}.
     *
     * @param projectName project name, validated by {@code FormatUtils.checkProjectName}
     * @param topicName   topic name, validated by {@code FormatUtils.checkTopicName}
     * @param subId       subscription id, must not be empty
     * @param config      consumer configuration
     * @throws InvalidParameterException if a name or the subscription id is invalid
     */
    public Consumer(String projectName, String topicName, String subId, ConsumerConfig config) {
        this.preCheck(projectName, topicName, subId);
        try {
            this.initReader(projectName, topicName, subId, config);
            // The shard coordinator must exist first: getConsumerId() delegates to it.
            this.shardCoordinator = new ShardCoordinator(projectName, topicName, subId, config, this.clientHelper.getClient());
            this.offsetCoordinator = this.newOffsetCoordinator(projectName, topicName, subId, config);
            this.startCheckTask(config);
        }
        catch (Exception e) {
            this.closeAfterInitFailure(e);
            throw e;
        }
    }

    /**
     * Creates a consumer that reads a fixed set of shards, starting from the
     * offsets returned by the offset coordinator for those shards.
     *
     * @param projectName project name
     * @param topicName   topic name
     * @param subId       subscription id, must not be empty
     * @param shardIds    shards to read, must not be null or empty
     * @param config      consumer configuration
     * @throws InvalidParameterException if any argument fails validation
     */
    public Consumer(String projectName, String topicName, String subId, List<String> shardIds, ConsumerConfig config) {
        this.preCheck(projectName, topicName, subId, shardIds);
        try {
            this.initReader(projectName, topicName, subId, config);
            this.offsetCoordinator = this.newOffsetCoordinator(projectName, topicName, subId, config);
            this.shardGroupReader.createShardReader(this.offsetCoordinator.openAndGetOffsets(shardIds));
            this.startCheckTask(config);
        }
        catch (Exception e) {
            this.closeAfterInitFailure(e);
            throw e;
        }
    }

    /**
     * Creates a consumer that reads the shards named by {@code offsetMap},
     * starting from the offsets given in the map.
     *
     * @param projectName project name
     * @param topicName   topic name
     * @param subId       subscription id, must not be empty
     * @param offsetMap   start offset per shard id, must not be null or empty
     * @param config      consumer configuration
     * @throws InvalidParameterException if any argument fails validation
     */
    public Consumer(String projectName, String topicName, String subId, Map<String, Offset> offsetMap, ConsumerConfig config) {
        this.preCheck(projectName, topicName, subId, offsetMap);
        try {
            this.initReader(projectName, topicName, subId, config);
            this.offsetCoordinator = this.newOffsetCoordinator(projectName, topicName, subId, config);
            // The offsets are opened in the coordinator, but the readers start from the caller's map.
            this.offsetCoordinator.openAndGetOffsets(new ArrayList<String>(offsetMap.keySet()));
            this.shardGroupReader.createShardReader(offsetMap);
            this.startCheckTask(config);
        }
        catch (Exception e) {
            this.closeAfterInitFailure(e);
            throw e;
        }
    }

    /**
     * Returns the next record, acknowledging the record returned by the
     * previous call first.
     *
     * <p>Up to {@code maxRetry + 1} attempts are made; between attempts the
     * method waits on the shard group reader for up to one second.
     *
     * @param maxRetry number of additional attempts after the first one, must not be negative
     * @return the next record, or {@code null} if none was available within the
     *         attempts or the consumer was closed meanwhile
     * @throws InvalidParameterException if the consumer is closed or {@code maxRetry} is negative
     */
    public RecordEntry read(int maxRetry) {
        if (this.closed.get()) {
            throw new InvalidParameterException("This consumer has already been closed");
        }
        if (maxRetry < 0) {
            throw new InvalidParameterException("Retry must not be negative");
        }
        this.ackLast();
        this.checkTask.touch();
        for (int i = 0; i <= maxRetry && !this.closed.get(); ++i) {
            this.syncAssignment();
            this.offsetCoordinator.checkCommitTask();
            // A record staged by a previously failed check takes precedence over new data.
            RecordEntry record = this.stageRecord == null ? this.shardGroupReader.read() : this.stageRecord;
            if (record != null) {
                try {
                    RecordKeyImpl recordKey = new RecordKeyImpl(record.getSequence(), record.getSystemTime());
                    this.offsetCoordinator.check(record.getShardId(), recordKey);
                    this.lastKey = new ShardRecordKey(record.getShardId(), recordKey);
                    record.setKey((RecordKey)recordKey);
                    this.stageRecord = null;
                    return record;
                }
                catch (Throwable e) {
                    LOG.warn("Check record key failed, project: {}, topic: {}, subId: {}, consumer: {}", new Object[]{this.projectName, this.topicName, this.subId, this.getConsumerId(), e});
                    // Keep the record so that it is not lost; the next read() retries it.
                    this.stageRecord = record;
                    throw e;
                }
            }
            LOG.debug("No records yet and return null, project: {}, topic: {}, subId: {}, consumer: {}", new Object[]{this.projectName, this.topicName, this.subId, this.getConsumerId()});
            // No point in waiting after the last attempt or once closed.
            if (i < maxRetry && !this.closed.get()) {
                this.shardGroupReader.waitSignal(READ_WAIT_MS);
            }
        }
        return null;
    }

    /**
     * Returns the id of this consumer.
     *
     * @return the shard coordinator's consumer id when a coordinator exists,
     *         otherwise {@code subId + "-hashId-" + identityHashCode}
     */
    public String getConsumerId() {
        return this.shardCoordinator == null ? this.subId + "-hashId-" + this.hashId : this.shardCoordinator.getConsumerId();
    }

    /**
     * Closes this consumer and every component it created. Only the first call
     * has an effect.
     *
     * <p>Every component is closed even if an earlier one fails, so a single
     * failure does not leak the remaining resources. The first failure is
     * rethrown at the end with later ones attached as suppressed exceptions.
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        RuntimeException failure = null;
        if (this.checkTask != null) {
            try {
                this.checkTask.stop();
            }
            catch (RuntimeException e) {
                failure = Consumer.mergeCloseFailure(failure, e);
            }
        }
        if (this.shardGroupReader != null) {
            try {
                this.shardGroupReader.close();
            }
            catch (RuntimeException e) {
                failure = Consumer.mergeCloseFailure(failure, e);
            }
        }
        if (this.shardCoordinator != null) {
            try {
                this.shardCoordinator.close();
            }
            catch (RuntimeException e) {
                failure = Consumer.mergeCloseFailure(failure, e);
            }
        }
        if (this.offsetCoordinator != null) {
            // Ack the last delivered record before the coordinator goes away.
            try {
                this.ackLast();
            }
            catch (RuntimeException e) {
                failure = Consumer.mergeCloseFailure(failure, e);
            }
            try {
                this.offsetCoordinator.close();
            }
            catch (RuntimeException e) {
                failure = Consumer.mergeCloseFailure(failure, e);
            }
        }
        if (this.clientHelper != null) {
            try {
                this.clientHelper.close();
            }
            catch (RuntimeException e) {
                failure = Consumer.mergeCloseFailure(failure, e);
            }
        }
        LOG.info("Consumer closed");
        if (failure != null) {
            throw failure;
        }
    }

    /** Keeps the first close failure and attaches every later one to it as suppressed. */
    private static RuntimeException mergeCloseFailure(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        // addSuppressed rejects self-suppression, so skip an identical instance.
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /** Builds the client helper, read-end callback and shard group reader shared by all constructors. */
    private void initReader(String projectName, String topicName, String subId, ConsumerConfig config) {
        this.clientHelper = config.getHelperBuilder().setProjectName(projectName).setTopicName(topicName).setSubId(subId).build();
        this.shardReadEndCallback = config.getShardReadEndCallback();
        this.shardGroupReader = new ShardGroupReader(projectName, topicName, config, this.clientHelper);
        this.shardGroupReader.setSubId(subId);
    }

    /** Creates an offset coordinator bound to the current consumer id. */
    private OffsetCoordinator newOffsetCoordinator(String projectName, String topicName, String subId, ConsumerConfig config) {
        return new OffsetCoordinator(projectName, topicName, subId, config, this.clientHelper.getClient(), this.getConsumerId());
    }

    /** Creates and starts the background check task. */
    private void startCheckTask(ConsumerConfig config) {
        this.checkTask = new PeriodicCheckTask(config.getCheckTaskConfig());
        this.checkTask.start();
    }

    /**
     * Releases whatever a failed constructor managed to create, without
     * letting a cleanup failure replace the original construction error.
     */
    private void closeAfterInitFailure(Exception cause) {
        try {
            this.close();
        }
        catch (RuntimeException closeFailure) {
            if (closeFailure != cause) {
                cause.addSuppressed(closeFailure);
            }
        }
    }

    /** Validates the names and stores them in the corresponding fields. */
    private void preCheck(String projectName, String topicName, String subId) {
        if (!FormatUtils.checkProjectName(projectName)) {
            throw new InvalidParameterException("ProjectName format is invalid");
        }
        if (!FormatUtils.checkTopicName(topicName)) {
            throw new InvalidParameterException("TopicName format is invalid");
        }
        if (StringUtils.isEmpty(subId)) {
            throw new InvalidParameterException("SubId format is invalid");
        }
        this.projectName = projectName;
        this.topicName = topicName;
        this.subId = subId;
    }

    /** Same as the three-argument check, and additionally requires a non-empty shard list. */
    private void preCheck(String projectName, String topicName, String subId, List<String> shardIds) {
        this.preCheck(projectName, topicName, subId);
        if (shardIds == null || shardIds.isEmpty()) {
            throw new InvalidParameterException("ShardIds must not be empty");
        }
    }

    /** Same as the three-argument check, and additionally requires a non-empty offset map. */
    private void preCheck(String projectName, String topicName, String subId, Map<String, Offset> offsetMap) {
        this.preCheck(projectName, topicName, subId);
        if (offsetMap == null || offsetMap.isEmpty()) {
            throw new InvalidParameterException("Offset map must not be empty");
        }
    }

    /** Acknowledges the record returned by the last successful read, if any. */
    private void ackLast() {
        if (this.lastKey != null) {
            this.offsetCoordinator.ack(this.lastKey.getShardId(), this.lastKey.getKey());
            this.lastKey = null;
        }
    }

    /**
     * Brings the shard readers in line with the current assignment.
     *
     * <p>Without a shard coordinator this only reports shards that were read
     * to their end to the callback. With one, it rejoins if needed, applies a
     * changed assignment and finally syncs the group with the read-end shards.
     */
    private void syncAssignment() {
        if (this.shardCoordinator == null) {
            // The read-end list is only computed when somebody is listening.
            if (this.shardReadEndCallback != null) {
                List<String> readEndShardList = this.getReadEndShardList();
                if (!readEndShardList.isEmpty()) {
                    this.shardReadEndCallback.onShardReadEnd(readEndShardList);
                }
            }
            return;
        }
        if (this.shardCoordinator.rejoinIfNeeded()) {
            this.offsetCoordinator.setConsumerId(this.getConsumerId());
        }
        Assignment newAssignment = this.shardCoordinator.getNewAssignment();
        if (newAssignment.getVersion() != this.assignmentVersion) {
            this.offsetCoordinator.releaseOffsets(newAssignment);
            this.shardGroupReader.removeShardReader(newAssignment);
            try {
                List<String> newShardList = newAssignment.getNewShardList(this.shardGroupReader.getShards());
                Map<String, Offset> newOffsetMap = this.offsetCoordinator.openAndGetOffsets(newShardList);
                this.shardGroupReader.createShardReader(newOffsetMap, false);
                // Only record the version once the readers exist, so a failure is retried.
                this.assignmentVersion = newAssignment.getVersion();
            }
            catch (DatahubClientException e) {
                LOG.warn("create new reader failed, project: {}, topic: {}, subId: {}, consumer: {}", new Object[]{this.projectName, this.topicName, this.subId, this.getConsumerId(), e});
                // Non-fatal errors fall through to the group sync below.
                if (ExceptionRetryer.isFatalException(e)) {
                    throw e;
                }
            }
        }
        this.shardCoordinator.syncGroup(this.getReadEndShardList());
    }

    /**
     * Leaves the consumer group and drops all offsets and shard readers.
     * Does nothing unless coordinator, offset coordinator and reader all exist.
     */
    private void leaveGroup() {
        if (this.shardCoordinator == null || this.offsetCoordinator == null || this.shardGroupReader == null) {
            return;
        }
        synchronized (this.assignmentLock) {
            this.shardCoordinator.leaveGroup();
            this.offsetCoordinator.removeAllOffsets();
            this.shardGroupReader.removeAllShardReader();
            // Reset both versions so that the next sync starts from scratch.
            this.assignmentVersion = -1L;
            this.commitVersion = -1L;
        }
    }

    /**
     * Returns the shards that were read to their end, evaluated only when the
     * offset coordinator's last commit time changed since the previous call.
     *
     * @return the read-end shard ids, or an empty list if nothing was committed since
     */
    private List<String> getReadEndShardList() {
        long lastCommitTime = this.offsetCoordinator.getLastCommitTime();
        if (lastCommitTime == this.commitVersion) {
            return Collections.emptyList();
        }
        Map<String, Long> endSeqMap = this.shardGroupReader.getEndSeqMap();
        List<String> readEndShardList = this.offsetCoordinator.getReadEndShardList(endSeqMap);
        this.commitVersion = lastCommitTime;
        return readEndShardList;
    }

    /**
     * Background task that watches whether {@link Consumer#read(int)} is still
     * being called, logs inactivity and leaves the group when configured to.
     */
    private class PeriodicCheckTask
    extends BackEndTask {
        private CheckTaskConfig config;
        private long lastReadTime;
        private long lastLogTime;
        // Set by the reading thread, consumed atomically by the task thread.
        private final AtomicBoolean readCalled = new AtomicBoolean(false);

        PeriodicCheckTask(CheckTaskConfig config) {
            this.taskName = "periodic-check";
            this.config = config;
            this.lastReadTime = this.lastLogTime = System.currentTimeMillis();
        }

        /** Records that {@code read} was called since the last check. */
        public void touch() {
            this.readCalled.set(true);
        }

        /** Whether auto-leave is enabled and the idle time exceeds the configured timeout. */
        private boolean needLeaveGroup(long now) {
            return Consumer.this.shardCoordinator != null && this.config.isAutoLeaveGroup() && now - this.lastReadTime > this.config.getLeaveGroupTimeoutMs();
        }

        @Override
        protected void run() {
            LOG.info("Periodic check task start, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, IntervalMs: {}", new Object[]{Consumer.this.projectName, Consumer.this.topicName, Consumer.this.subId, Consumer.this.getConsumerId(), this.config.getPeriodicCheckIntervalMs()});
            while (this.isRunning()) {
                try {
                    long now = System.currentTimeMillis();
                    // Read and clear in one step so that a concurrent touch() is never lost.
                    if (this.readCalled.getAndSet(false)) {
                        this.lastReadTime = now;
                    } else {
                        if (now - this.lastLogTime > IDLE_LOG_INTERVAL_MS) {
                            LOG.info("Consumer is not called for {}ms, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{now - this.lastReadTime, Consumer.this.projectName, Consumer.this.topicName, Consumer.this.subId, Consumer.this.getConsumerId()});
                            this.lastLogTime = now;
                        }
                        if (this.needLeaveGroup(now)) {
                            Consumer.this.leaveGroup();
                        }
                    }
                }
                catch (Throwable e) {
                    // A failed check must not end the task; log it and try again next round.
                    LOG.error("Periodic check failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{Consumer.this.projectName, Consumer.this.topicName, Consumer.this.subId, Consumer.this.getConsumerId(), e});
                }
                this.waitSignal(this.config.getPeriodicCheckIntervalMs());
            }
            LOG.info("Periodic check task stop, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{Consumer.this.projectName, Consumer.this.topicName, Consumer.this.subId, Consumer.this.getConsumerId()});
        }
    }
}

