/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.clientlibrary.consumer;

import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.ExpiredAccessTokenException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.SeekOutOfRangeException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.exception.ShardSealedException;
import com.aliyun.datahub.client.model.CursorType;
import com.aliyun.datahub.client.model.GetCursorResult;
import com.aliyun.datahub.client.model.GetRecordsResult;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.ShardState;
import com.aliyun.datahub.clientlibrary.common.ClientHelper;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer;
import com.aliyun.datahub.clientlibrary.interceptor.ReadInterceptor;
import com.aliyun.datahub.clientlibrary.models.Offset;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShardReader {
    private static final Logger LOG = LoggerFactory.getLogger(ShardReader.class);
    private ConsumerConfig config;
    private ClientHelper clientHelper;
    private String projectName;
    private String topicName;
    private String shardId;
    private volatile Offset offset;
    private ExecutorService executor;
    private volatile String cursor;
    private volatile Future currentTask;
    private volatile boolean fetchEnd = false;
    private volatile boolean readEnd = false;
    private volatile DatahubClientException exception;
    private volatile long endSequence = -1L;
    private final ConcurrentLinkedQueue<RecordEntry> fetchedQueue = new ConcurrentLinkedQueue();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ReadInterceptor interceptor;
    private final AtomicInteger queueSize = new AtomicInteger(0);
    private final Object emptyCond;
    private final AtomicInteger feasibleFetchSize;
    private final AtomicBoolean needMore = new AtomicBoolean(false);
    private volatile boolean fetchLatest = false;
    private volatile boolean shardNotFound = false;
    private String subId = "";

    ShardReader(String projectName, String topicName, String shardId, Offset offset, ConsumerConfig config, ExecutorService executor, ClientHelper clientHelper, Object emptyCond, ReadInterceptor interceptor) {
        this.projectName = projectName;
        this.topicName = topicName;
        this.shardId = shardId;
        this.offset = offset;
        this.config = config;
        this.executor = executor;
        this.currentTask = null;
        this.clientHelper = clientHelper;
        this.emptyCond = emptyCond;
        this.feasibleFetchSize = new AtomicInteger(config.getFetchSize());
        this.interceptor = interceptor;
        if (offset.hasCursor()) {
            this.cursor = offset.getCursor();
        }
    }

    String getShardId() {
        return this.shardId;
    }

    boolean isClosed() {
        return this.closed.get();
    }

    void initCursor() {
        if (this.cursor == null) {
            this.cursor = this.seekCursor(this.offset);
        }
    }

    RecordEntry read() {
        if (this.closed.get() || this.isReadEnd()) {
            return null;
        }
        this.readEnd = this.fetchEnd && this.isShardClosed() && this.fetchedQueue.isEmpty();
        this.fetchIfNeeded();
        RecordEntry result = this.fetchedQueue.peek();
        if (result != null) {
            this.fetchedQueue.poll();
            this.queueSize.decrementAndGet();
            this.fetchIfNeeded();
            return result;
        }
        if (this.exception != null) {
            DatahubClientException ex = this.exception;
            this.exception = null;
            throw ex;
        }
        return null;
    }

    boolean isReadEnd() {
        return this.readEnd && this.isShardClosed();
    }

    long getEndSequence() {
        return this.endSequence;
    }

    long frontRecordTime() {
        if (this.fetchedQueue.isEmpty() || this.fetchedQueue.peek() == null) {
            return Long.MIN_VALUE;
        }
        return this.fetchedQueue.peek().getSystemTime();
    }

    void close() {
        if (this.closed.compareAndSet(false, true) && this.currentTask != null && !this.currentTask.isDone()) {
            this.currentTask.cancel(false);
        }
    }

    void setSubId(String subId) {
        this.subId = subId;
    }

    private ShardState getShardState() {
        return this.clientHelper.getShardManager().getShardMeta().getStateMap().get(this.shardId);
    }

    private boolean isShardClosed() {
        return ShardState.CLOSED.equals((Object)this.getShardState());
    }

    private boolean isShardInactive() {
        return ShardState.INACTIVE.equals((Object)this.getShardState());
    }

    private void fetchIfNeeded() {
        if (this.closed.get() || this.isShardInactive() || this.fetchEnd && this.isShardClosed()) {
            return;
        }
        this.feasibleFetchSize.set(Math.min(this.feasibleFetchSize.get(), this.config.getFetchSize()));
        if (this.queueSize.get() >= this.feasibleFetchSize.get()) {
            return;
        }
        this.needMore.set(true);
        if (this.isTaskRunning()) {
            return;
        }
        this.needMore.set(false);
        this.submitFetchTask();
    }

    private void submitFetchTask() {
        block4: {
            if (this.shardNotFound) {
                boolean bl = this.shardNotFound = !this.clientHelper.getShardManager().getShardMeta().getShardIds().contains(this.shardId);
                if (this.shardNotFound) {
                    return;
                }
            }
            FetchTask task = new FetchTask(this.config.getFetchSize());
            try {
                this.currentTask = this.executor.submit(task);
            }
            catch (RejectedExecutionException e) {
                if (this.closed.get()) break block4;
                LOG.warn("Submit fetch task failed, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Exception: {}", new Object[]{this.projectName, this.topicName, this.shardId, this.subId, e.getMessage()});
            }
        }
    }

    private String seekCursor(Offset offset) {
        if (offset.isInvalid()) {
            throw new InvalidParameterException("Sequence and system time are all invalid");
        }
        CursorType cursorType = offset.hasSequence() ? CursorType.SEQUENCE : CursorType.SYSTEM_TIME;
        long param = cursorType.equals((Object)CursorType.SEQUENCE) ? offset.getSequence() : offset.getTimestamp();
        try {
            return this.getCursor(cursorType, param, 3);
        }
        catch (SeekOutOfRangeException e) {
            if (cursorType.equals((Object)CursorType.SYSTEM_TIME) || !offset.hasTimestamp()) {
                throw e;
            }
            LOG.warn("Get cursor by sequence failed, use system time instead, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Exception: {}", new Object[]{this.projectName, this.topicName, this.shardId, this.subId, e.getMessage()});
            return this.getCursor(CursorType.SYSTEM_TIME, offset.getTimestamp(), 3);
        }
    }

    private String getCursor(final CursorType cursorType, final long param, int retry) {
        return (String)new ExceptionRetryer<String>(){

            @Override
            protected String func() {
                LOG.info("Get cursor, Project: {}, Topic: {}, ShardId: {}, SubId: {}, CursorType: {}, Param: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, cursorType.name(), param});
                GetCursorResult getCursorResult = ShardReader.this.clientHelper.getDataClient().getCursor(ShardReader.this.shardId, cursorType, param);
                LOG.info("Get cursor result, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Cursor: {}, Sequence: {}, Timestamp: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, getCursorResult.getCursor(), getCursorResult.getSequence(), getCursorResult.getTimestamp()});
                return getCursorResult.getCursor();
            }

            @Override
            protected void onExceedRetryLimit(DatahubClientException e) {
                if (!ShardReader.this.closed.get()) {
                    LOG.error("Get cursor failed, Project: {}, Topic: {}, ShardId: {}, SubId: {}, CursorType: {}, Param: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, cursorType.name(), param, e});
                }
            }

            @Override
            protected boolean isTerminated() {
                return ShardReader.this.closed.get();
            }
        }.run(retry, 1000L);
    }

    private boolean isTaskRunning() {
        if (this.currentTask != null && this.currentTask.isDone()) {
            try {
                this.currentTask.get();
            }
            catch (InterruptedException | CancellationException exception) {
            }
            catch (ExecutionException e) {
                LOG.error("Fetch task failed, Project: {}, Topic: {}, ShardId: {}, SubId: {}", new Object[]{this.projectName, this.topicName, this.shardId, this.subId, e});
                this.exception = new DatahubClientException(e.getMessage());
            }
            this.currentTask = null;
        }
        return this.currentTask != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void wakeEmpty() {
        Object object = this.emptyCond;
        synchronized (object) {
            this.emptyCond.notifyAll();
        }
    }

    private GetRecordsResult fetchRecords(final int fetchSizeOnce) {
        return (GetRecordsResult)new ExceptionRetryer<GetRecordsResult>(){

            @Override
            protected GetRecordsResult func() {
                return ShardReader.this.clientHelper.getDataClient().getRecords(ShardReader.this.shardId, ShardReader.this.cursor, fetchSizeOnce);
            }

            @Override
            protected void onExceedRetryLimit(DatahubClientException e) {
                if (!ShardReader.this.closed.get()) {
                    LOG.error("Fetch task failed, Project: {}, Topic: {}, ShardId: {}, SubId: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, e});
                }
            }

            @Override
            protected boolean isTerminated() {
                return ShardReader.this.closed.get();
            }
        }.run(30, 1000L);
    }

    private void fetch(int fetchSize) {
        try {
            GetRecordsResult getRecordsResult;
            this.initCursor();
            if (this.fetchLatest) {
                this.fetchLatest = false;
                Thread.sleep(500L);
            }
            if ((getRecordsResult = this.fetchRecords(fetchSize)).getRecordCount() > 0) {
                this.fetchedQueue.addAll(this.interceptor.afterRead(getRecordsResult.getRecords()));
                RecordEntry lastRecord = (RecordEntry)getRecordsResult.getRecords().get(getRecordsResult.getRecords().size() - 1);
                this.queueSize.addAndGet(getRecordsResult.getRecordCount());
                this.endSequence = lastRecord.getSequence();
                this.feasibleFetchSize.set((getRecordsResult.getRecordCount() + this.feasibleFetchSize.get() + this.config.getFetchSize()) / 3);
                this.fetchEnd = false;
                this.readEnd = false;
                this.wakeEmpty();
                LOG.debug("Fetch records, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Num: {}, QueueSize: {}", new Object[]{this.projectName, this.topicName, this.shardId, this.subId, getRecordsResult.getRecordCount(), this.queueSize.get()});
            } else if (this.cursor.equals(getRecordsResult.getNextCursor())) {
                this.fetchLatest = true;
            }
            this.cursor = getRecordsResult.getNextCursor();
        }
        catch (InvalidParameterException e) {
            if ("InvalidCursor".equalsIgnoreCase(e.getErrorCode())) {
                LOG.warn("Cursor is expired, seek to OLDEST, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Cursor: {}, Exception: {}", new Object[]{this.projectName, this.topicName, this.shardId, this.subId, this.cursor, e.getMessage()});
                try {
                    this.cursor = this.getCursor(CursorType.OLDEST, 0L, 1);
                }
                catch (DatahubClientException datahubClientException) {}
            }
            throw e;
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private class FetchTask
    implements Runnable {
        private int fetchSize;

        FetchTask(int fetchSize) {
            this.fetchSize = fetchSize;
        }

        @Override
        public void run() {
            block9: {
                try {
                    ShardReader.this.fetch(this.fetchSize);
                    ShardReader.this.exception = null;
                }
                catch (ShardSealedException e) {
                    ShardReader.this.clientHelper.getShardManager().triggerUpdateAndWait();
                    LOG.info("Fetch end of shard, Project: {}, Topic: {}, ShardId: {}, SubId: {}, endSeq: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, ShardReader.this.endSequence});
                    ShardReader.this.fetchEnd = true;
                }
                catch (ExpiredAccessTokenException e) {
                    LOG.info("Refresh token, Project: {}, Topic: {}, ShardId: {}, SubId: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId});
                }
                catch (ShardNotFoundException e) {
                    ShardReader.this.clientHelper.getShardManager().triggerUpdateAndWait();
                    ShardReader.this.shardNotFound = !ShardReader.this.clientHelper.getShardManager().getShardMeta().getShardIds().contains(ShardReader.this.shardId);
                    LOG.info("Trigger refresh shard info, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Msg: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, e.getErrorMessage()});
                }
                catch (DatahubClientException e) {
                    if (!ShardReader.this.closed.get()) {
                        LOG.warn("Fetch records fail, Project: {}, Topic: {}, ShardId: {}, SubId: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, e});
                        if (!ExceptionRetryer.canSwallow(e)) {
                            ShardReader.this.exception = e;
                        }
                    }
                }
                catch (Exception e) {
                    if (ShardReader.this.closed.get()) break block9;
                    LOG.error("Fetch records fail, Project: {}, Topic: {}, ShardId: {}, SubId: {}", new Object[]{ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, e});
                    ShardReader.this.exception = new DatahubClientException(e.getMessage());
                }
            }
            if (ShardReader.this.needMore.compareAndSet(true, false)) {
                ShardReader.this.submitFetchTask();
            }
        }
    }
}

