/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.clientlibrary.consumer;

import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.model.SubscriptionOffset;
import com.aliyun.datahub.clientlibrary.common.BackEndTask;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.exception.ClientException;
import com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer;
import com.aliyun.datahub.clientlibrary.models.Assignment;
import com.aliyun.datahub.clientlibrary.models.Offset;
import com.aliyun.datahub.clientlibrary.models.OffsetManager;
import com.aliyun.datahub.clientlibrary.models.OffsetWrapper;
import com.aliyun.datahub.clientlibrary.models.RecordKeyImpl;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class OffsetCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(OffsetCoordinator.class);
    private static final long MIN_ASYNC_COMMIT_INTERVAL_MS = 3000L;
    private String projectName;
    private String topicName;
    private String subId;
    private String consumerId;
    private ConsumerConfig config;
    private DatahubClient client;
    private CommitTask commitTask;
    private CheckpointTask checkpointTask;
    private volatile long lastCommitTime = -1L;
    private volatile long lastLogTime = -1L;
    private final OffsetManager offsetManager = new OffsetManager();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Object commitLock = new Object();
    private final Object updateLock = new Object();

    OffsetCoordinator(String projectName, String topicName, String subId, ConsumerConfig consumerConfig, DatahubClient client, String consumerId) {
        this.config = consumerConfig;
        this.projectName = projectName;
        this.topicName = topicName;
        this.subId = subId;
        this.consumerId = consumerId;
        this.client = client;
        this.commitTask = new CommitTask();
        this.commitTask.start();
        if (!this.config.isAutoCommit()) {
            this.checkpointTask = new CheckpointTask();
            this.checkpointTask.start();
        }
    }

    void checkCommitTask() {
        if (!this.commitTask.checkRunning()) {
            this.commitTask = new CommitTask();
            this.commitTask.start();
        }
    }

    void check(String shardId, RecordKeyImpl key) {
        if (!this.config.isAutoCommit()) {
            OffsetWrapper offsetWrapper = this.offsetManager.getHeldOffset(shardId);
            if (offsetWrapper == null) {
                LOG.error("setOffset failed, invalid status, offset not match shard assigned, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, ShardId: {}", new Object[]{this.projectName, this.topicName, this.subId, this.consumerId, shardId});
                throw new ClientException("Invalid status, offset not match shard assigned");
            }
            offsetWrapper.getQueue().add(key);
        }
    }

    void ack(String shardId, RecordKeyImpl key) {
        if (this.config.isAutoCommit()) {
            this.updateOffset(shardId, key.getOffset());
        }
    }

    void updateOffset(String shardId, SubscriptionOffset offset) {
        OffsetWrapper offsetWrapper = this.offsetManager.getHeldOffset(shardId);
        if (offsetWrapper != null) {
            offsetWrapper.updateAckedOffset(offset);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Map<String, Offset> openAndGetOffsets(List<String> shards) {
        HashMap<String, Offset> result = new HashMap<String, Offset>();
        List<String> shardIds = shards.stream().filter(shard -> !this.offsetManager.getHeldOffsets().keySet().contains(shard)).collect(Collectors.toList());
        if (!shardIds.isEmpty()) {
            Map<String, SubscriptionOffset> newOffsets = this.openSubscriptionSession(shardIds);
            Object object = this.updateLock;
            synchronized (object) {
                this.offsetManager.addOffsets(newOffsets);
            }
        }
        for (String shardId : shards) {
            SubscriptionOffset offset = this.offsetManager.getHeldOffset(shardId).getAckedOffset();
            result.put(shardId, this.getNextOffset(offset));
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void releaseOffsets(Assignment assignment) {
        List<String> shards = assignment.getReleaseShardList(this.offsetManager.getHeldOffsets().keySet());
        if (!shards.isEmpty()) {
            Object object = this.updateLock;
            synchronized (object) {
                this.offsetManager.release(shards);
            }
            LOG.info("start releasing offsets, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", new Object[]{this.projectName, this.topicName, this.subId, this.consumerId, shards.toString()});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeAllOffsets() {
        Object object = this.updateLock;
        synchronized (object) {
            this.offsetManager.removeAll();
        }
    }

    List<String> getReadEndShardList(Map<String, Long> endSequenceMap) {
        if (endSequenceMap.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> result = this.offsetManager.getReadEndShardList(endSequenceMap);
        if (result.size() < endSequenceMap.size()) {
            this.commitTask.triggerUpdate();
        }
        return result;
    }

    void setConsumerId(String consumerId) {
        this.consumerId = consumerId;
    }

    long getLastCommitTime() {
        return this.lastCommitTime;
    }

    void close() {
        if (this.commitTask != null) {
            this.commitTask.stop();
        }
        if (this.checkpointTask != null) {
            this.checkpointTask.stop();
            if (this.checkpointTask.waitStopped(2000L)) {
                this.offsetManager.getHeldOffsets().entrySet().parallelStream().forEach(e -> this.flushAckedOffset((String)e.getKey(), (OffsetWrapper)e.getValue(), null));
                this.offsetManager.getReleasingOffsets().entrySet().parallelStream().forEach(e -> this.flushAckedOffset((String)e.getKey(), (OffsetWrapper)e.getValue(), null));
            }
        }
        this.commitReleased();
        try {
            this.commit(true);
        }
        catch (DatahubClientException e2) {
            LOG.warn("Commit offset by `close` failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{this.projectName, this.topicName, this.subId, this.consumerId, e2});
        }
    }

    private Map<String, SubscriptionOffset> openSubscriptionSession(final List<String> shardIds) {
        return (Map)new ExceptionRetryer<Map<String, SubscriptionOffset>>(){

            @Override
            protected Map<String, SubscriptionOffset> func() {
                return OffsetCoordinator.this.client.openSubscriptionSession(OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, shardIds).getOffsets();
            }

            @Override
            protected void onExceedRetryLimit(DatahubClientException e) {
                LOG.error("Init offset failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, e});
            }

            @Override
            protected boolean isTerminated() {
                return OffsetCoordinator.this.closed.get();
            }
        }.run(3, 1000L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean commit(boolean printLog) {
        Object object = this.commitLock;
        synchronized (object) {
            Map<String, SubscriptionOffset> offsetsToCommit;
            Object object2 = this.updateLock;
            synchronized (object2) {
                offsetsToCommit = this.offsetManager.getAckedOffsets();
            }
            try {
                return this.commit(offsetsToCommit, printLog);
            }
            catch (OffsetSessionChangedException e) {
                Object object3 = this.updateLock;
                synchronized (object3) {
                    if (this.offsetManager.getHeldOffsets().keySet().containsAll(offsetsToCommit.keySet())) {
                        throw e;
                    }
                }
                LOG.info("Offset session changed for removed shard, will retry, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", new Object[]{this.projectName, this.topicName, this.subId, this.consumerId, offsetsToCommit.keySet().toString()});
                return false;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean commitReleased() {
        Object object = this.commitLock;
        synchronized (object) {
            Map<String, SubscriptionOffset> offsetsToRelease;
            Object object2 = this.updateLock;
            synchronized (object2) {
                offsetsToRelease = this.offsetManager.getOffsetsToRelease();
            }
            if (offsetsToRelease.isEmpty()) {
                return true;
            }
            try {
                this.commit(offsetsToRelease, false);
                List<String> releaseShards = this.offsetManager.getReleasedShardList();
                if (!releaseShards.isEmpty()) {
                    LOG.info("Offset released, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", new Object[]{this.projectName, this.topicName, this.subId, this.consumerId, releaseShards.toString()});
                }
            }
            catch (DatahubClientException e) {
                if (!ExceptionRetryer.canSwallow(e)) {
                    LOG.info("Remove releasing offset, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", new Object[]{this.projectName, this.topicName, this.subId, this.consumerId, offsetsToRelease.keySet().toString(), e});
                    this.offsetManager.removeReleased(offsetsToRelease.keySet());
                }
            }
            catch (Throwable e) {
                LOG.info("Commit releasing offset fail, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", new Object[]{this.projectName, this.topicName, this.subId, this.consumerId, offsetsToRelease.keySet().toString(), e});
            }
            return offsetsToRelease.isEmpty();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean commit(Map<String, SubscriptionOffset> offsetsToCommit, boolean printLog) {
        if (offsetsToCommit == null || offsetsToCommit.isEmpty()) {
            return false;
        }
        this.client.commitSubscriptionOffset(this.projectName, this.topicName, this.subId, offsetsToCommit);
        Object object = this.updateLock;
        synchronized (object) {
            this.offsetManager.setCommittedOffsets(offsetsToCommit);
        }
        long now = System.currentTimeMillis();
        if (printLog || now - this.lastLogTime > 60000L) {
            Object object2 = this.updateLock;
            synchronized (object2) {
                LOG.info("Committed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Offset: {}", new Object[]{this.projectName, this.topicName, this.subId, this.consumerId, this.offsetManager.getCommittedOffsets().toString()});
            }
            this.lastLogTime = now;
        }
        this.lastCommitTime = now;
        return true;
    }

    private Offset getNextOffset(SubscriptionOffset subscriptionOffset) {
        long sequence = subscriptionOffset.getSequence();
        long timestamp = subscriptionOffset.getTimestamp();
        if (timestamp <= 0L) {
            timestamp = 0L;
            sequence = 0L;
        } else if (sequence > -1L) {
            ++sequence;
        }
        return new Offset(sequence, timestamp);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flushAckedOffset(String shardId, OffsetWrapper offsetWrapper, LoopFlag loopFlag) {
        if (offsetWrapper == null) {
            return;
        }
        Object object = offsetWrapper.getCheckLock();
        synchronized (object) {
            RecordKeyImpl key;
            ConcurrentLinkedQueue<RecordKeyImpl> queue = offsetWrapper.getQueue();
            RecordKeyImpl latestAck = null;
            while ((loopFlag == null || loopFlag.proceed()) && (key = queue.peek()) != null) {
                if (!key.isReady()) {
                    long now = System.currentTimeMillis();
                    if (key.getOffset().getSequence() == offsetWrapper.getLastUnReadySeq()) {
                        long interval = now - offsetWrapper.getLastCheckTime();
                        if (interval <= 60000L * (offsetWrapper.getWarningTimes().get() + 1L)) break;
                        LOG.warn("Record is not ack for {}ms, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, ShardId: {}, Seq: {}", new Object[]{interval, this.projectName, this.topicName, this.subId, this.consumerId, shardId, key.getOffset().getSequence()});
                        offsetWrapper.getWarningTimes().incrementAndGet();
                        break;
                    }
                    offsetWrapper.setLastUnReadySeq(key.getOffset().getSequence());
                    offsetWrapper.setLastCheckTime(now);
                    offsetWrapper.getWarningTimes().set(0L);
                    break;
                }
                latestAck = key;
                queue.poll();
            }
            if (latestAck != null) {
                offsetWrapper.updateAckedOffset(latestAck.getOffset());
            }
        }
    }

    private class CheckpointTask
    extends BackEndTask {
        CheckpointTask() {
            this.taskName = "checkpoint-task";
        }

        @Override
        protected void run() {
            LOG.info("Backend Checkpoint task start, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId});
            while (this.isRunning()) {
                try {
                    OffsetCoordinator.this.offsetManager.getHeldOffsets().entrySet().parallelStream().forEach(e -> OffsetCoordinator.this.flushAckedOffset((String)e.getKey(), (OffsetWrapper)e.getValue(), this::isRunning));
                    OffsetCoordinator.this.offsetManager.getReleasingOffsets().entrySet().parallelStream().forEach(e -> OffsetCoordinator.this.flushAckedOffset((String)e.getKey(), (OffsetWrapper)e.getValue(), this::isRunning));
                    if (OffsetCoordinator.this.offsetManager.hasShardToRelease()) {
                        OffsetCoordinator.this.commitTask.triggerUpdate();
                    }
                    this.waitSignal(1000L);
                }
                catch (Throwable e2) {
                    LOG.error("Check failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, e2});
                }
            }
            LOG.info("Backend Checkpoint task stop, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId});
        }
    }

    @FunctionalInterface
    static interface LoopFlag {
        public boolean proceed();
    }

    private class CommitTask
    extends BackEndTask {
        private volatile DatahubClientException exception = null;
        private int retryTime = 0;

        CommitTask() {
            this.taskName = "commit-task";
        }

        boolean checkRunning() {
            if (this.exception != null) {
                DatahubClientException ex = this.exception;
                this.exception = null;
                throw ex;
            }
            return !this.isStopped();
        }

        @Override
        protected void run() {
            LOG.info("Backend Commit task start, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, IntervalMs: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, OffsetCoordinator.this.config.getOffsetCommitTimeoutMs()});
            while (this.isRunning()) {
                long interval = 3000L;
                try {
                    if (OffsetCoordinator.this.commitReleased() && OffsetCoordinator.this.commit(false)) {
                        interval = OffsetCoordinator.this.config.getOffsetCommitTimeoutMs();
                    }
                    this.retryTime = 0;
                }
                catch (DatahubClientException e) {
                    if (ExceptionRetryer.canRetry(e) && this.retryTime < 3) {
                        ++this.retryTime;
                        LOG.info("Committed failed, try next time, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, e});
                    } else if (!ExceptionRetryer.canSwallow(e)) {
                        this.exception = e;
                        LOG.error("Committed failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, e});
                    }
                }
                catch (Throwable e) {
                    LOG.error("Committed failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, e});
                    this.exception = new ClientException(e.getMessage());
                }
                if (this.exception != null) {
                    this.stop();
                    break;
                }
                this.waitSignal(interval, 3000L);
            }
            LOG.info("Backend Commit task stop, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", new Object[]{OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId});
        }
    }
}

