/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ots.internal.streamclient.core;

import com.aliyun.openservices.ots.internal.streamclient.DependencyException;
import com.aliyun.openservices.ots.internal.streamclient.StreamClientException;
import com.aliyun.openservices.ots.internal.streamclient.StreamConfig;
import com.aliyun.openservices.ots.internal.streamclient.core.DataFetcher;
import com.aliyun.openservices.ots.internal.streamclient.core.RecordProcessorCheckpointer;
import com.aliyun.openservices.ots.internal.streamclient.core.ShardSyncer;
import com.aliyun.openservices.ots.internal.streamclient.core.task.BlockOnParentShardTask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ITask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.InitializeTask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ProcessTask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.RetryingTaskDecorator;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ShutdownTask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskResult;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskType;
import com.aliyun.openservices.ots.internal.streamclient.lease.ShardLease;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.model.ICheckpointTracker;
import com.aliyun.openservices.ots.internal.streamclient.model.IRecordProcessor;
import com.aliyun.openservices.ots.internal.streamclient.model.IRetryStrategy;
import com.aliyun.openservices.ots.internal.streamclient.model.IShutdownMarker;
import com.aliyun.openservices.ots.internal.streamclient.model.ShardInfo;
import com.aliyun.openservices.ots.internal.streamclient.model.ShutdownReason;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ShardConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
    private final ShardInfo shardInfo;
    private final StreamConfig streamConfig;
    private final IRecordProcessor recordProcessor;
    private final ILeaseManager<ShardLease> leaseManager;
    private final long parentShardPollIntervalMillis;
    private final ExecutorService executorService;
    private ICheckpointTracker checkpointTracker;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final DataFetcher dataFetcher;
    private final ShardSyncer shardSyncer;
    private final IRetryStrategy taskRetryStrategy;
    private ITask currentTask;
    private long currentTaskSubmitTime;
    private Future<TaskResult> future;
    private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS;
    private boolean beginShutdown;
    private ShutdownReason shutdownReason;
    private IShutdownMarker shutdownMarker = new IShutdownMarker(){

        @Override
        public void markForProcessDone() {
            ShardConsumer.this.markForShutdown(ShutdownReason.PROCESS_DONE);
        }

        @Override
        public void markForProcessRestart() {
            ShardConsumer.this.markForShutdown(ShutdownReason.PROCESS_RESTART);
        }
    };

    public ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpointTracker checkpointTracker, IRecordProcessor recordProcessor, ILeaseManager<ShardLease> leaseManager, long parentShardPollIntervalMillis, ExecutorService executorService, ShardSyncer shardSyncer, IRetryStrategy taskRetryStrategy) {
        this.shardInfo = shardInfo;
        this.streamConfig = streamConfig;
        this.checkpointTracker = checkpointTracker;
        this.recordProcessor = recordProcessor;
        this.leaseManager = leaseManager;
        this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
        this.executorService = executorService;
        this.shardSyncer = shardSyncer;
        this.taskRetryStrategy = taskRetryStrategy;
        this.dataFetcher = new DataFetcher(streamConfig.getOTSClient(), shardInfo);
        this.recordProcessorCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpointTracker, this.dataFetcher);
    }

    public synchronized boolean consumeShard() throws StreamClientException, DependencyException {
        return this.checkAndSubmitNextTask();
    }

    synchronized boolean checkAndSubmitNextTask() throws StreamClientException, DependencyException {
        if (this.future != null && !this.future.isCancelled() && !this.future.isDone()) {
            return false;
        }
        boolean isPhaseCompleted = false;
        if (this.future != null && this.future.isDone()) {
            try {
                TaskResult result = this.future.get();
                if (result.getException() != null) {
                    LOG.error("ShardId: {}, Task: {}, Exception: {}.", new Object[]{this.shardInfo.getShardId(), this.currentTask.getTaskType(), result.getException()});
                    throw result.getException();
                }
                isPhaseCompleted = result.isPhaseCompleted();
                LOG.debug("PreviousTaskDone, ShardId: {}, Task: {}, IsPhaseCompleted: {}.", new Object[]{this.shardInfo.getShardId(), this.currentTask.getTaskType(), isPhaseCompleted});
            }
            catch (DependencyException e) {
                throw e;
            }
            catch (StreamClientException e) {
                throw e;
            }
            catch (Exception e) {
                throw new StreamClientException(e.getMessage(), e);
            }
        }
        boolean submittedNewTask = false;
        this.updateState(isPhaseCompleted);
        ITask nextTask = this.getNextTask();
        if (nextTask != null) {
            this.currentTask = nextTask;
            this.future = this.executorService.submit(this.currentTask);
            this.currentTaskSubmitTime = System.currentTimeMillis();
            submittedNewTask = true;
            LOG.debug("SubmitNewTask, ShardId: {}, Task: {}.", (Object)this.shardInfo.getShardId(), (Object)this.currentTask.getTaskType());
        }
        return submittedNewTask;
    }

    ITask getNextTask() {
        ITask task = null;
        RetryingTaskDecorator taskDecorated = null;
        switch (this.currentState) {
            case WAITING_ON_PARENT_SHARDS: {
                task = new BlockOnParentShardTask(this.shardInfo, this.leaseManager, this.checkpointTracker, this.parentShardPollIntervalMillis);
                taskDecorated = new RetryingTaskDecorator(IRetryStrategy.RetryableAction.TASK_BLOCK_ON_PARENT_SHARD, this.taskRetryStrategy, task);
                break;
            }
            case INITIALIZING: {
                task = new InitializeTask(this.shardInfo, this.recordProcessor, this.checkpointTracker, this.recordProcessorCheckpointer, this.dataFetcher, this.shutdownMarker);
                taskDecorated = new RetryingTaskDecorator(IRetryStrategy.RetryableAction.TASK_INITIALIZE, this.taskRetryStrategy, task);
                break;
            }
            case PROCESSING: {
                task = new ProcessTask(this.shardInfo, this.recordProcessor, this.recordProcessorCheckpointer, this.dataFetcher, this.streamConfig, this.shutdownMarker);
                taskDecorated = new RetryingTaskDecorator(IRetryStrategy.RetryableAction.TASK_PROCESS, this.taskRetryStrategy, task);
                break;
            }
            case SHUTTING_DOWN: {
                task = new ShutdownTask(this.shardInfo, this.recordProcessor, this.recordProcessorCheckpointer, this.shutdownReason, this.shardSyncer);
                taskDecorated = new RetryingTaskDecorator(IRetryStrategy.RetryableAction.TASK_SHUTDOWN, this.taskRetryStrategy, task);
                break;
            }
            case SHUTDOWN_COMPLETE: {
                break;
            }
        }
        return taskDecorated;
    }

    synchronized boolean beginShutdown(ShutdownReason reason) throws StreamClientException, DependencyException {
        if (!this.isShutdown()) {
            this.markForShutdown(reason);
            this.checkAndSubmitNextTask();
        }
        return this.isShutdown();
    }

    synchronized void markForShutdown(ShutdownReason reason) {
        this.beginShutdown = true;
        this.shutdownReason = reason;
    }

    public boolean isShutdown() {
        return this.currentState == ShardConsumerState.SHUTDOWN_COMPLETE;
    }

    public ShutdownReason getShutdownReason() {
        return this.shutdownReason;
    }

    void updateState(boolean isPhaseCompleted) {
        switch (this.currentState) {
            case WAITING_ON_PARENT_SHARDS: {
                if (isPhaseCompleted && TaskType.BLOCK_ON_PARENT_SHARDS.equals((Object)this.currentTask.getTaskType())) {
                    if (this.beginShutdown) {
                        this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                        break;
                    }
                    this.currentState = ShardConsumerState.INITIALIZING;
                    break;
                }
                if (this.currentTask != null || !this.beginShutdown) break;
                this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                break;
            }
            case INITIALIZING: {
                if (isPhaseCompleted && TaskType.INITIALIZE.equals((Object)this.currentTask.getTaskType())) {
                    if (this.beginShutdown) {
                        this.currentState = ShardConsumerState.SHUTTING_DOWN;
                        break;
                    }
                    this.currentState = ShardConsumerState.PROCESSING;
                    break;
                }
                if (this.currentTask != null || !this.beginShutdown) break;
                this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                break;
            }
            case PROCESSING: {
                if (!TaskType.PROCESS.equals((Object)this.currentTask.getTaskType())) break;
                if (this.beginShutdown) {
                    this.currentState = ShardConsumerState.SHUTTING_DOWN;
                    break;
                }
                if (!isPhaseCompleted) break;
                this.markForShutdown(ShutdownReason.TERMINATE);
                this.currentState = ShardConsumerState.SHUTTING_DOWN;
                break;
            }
            case SHUTTING_DOWN: {
                if (this.currentTask != null && (!isPhaseCompleted || !TaskType.SHUTDOWN.equals((Object)this.currentTask.getTaskType()))) break;
                this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                break;
            }
            case SHUTDOWN_COMPLETE: {
                break;
            }
            default: {
                LOG.error("Unexpected state: " + (Object)((Object)this.currentState));
            }
        }
    }

    ShardConsumerState getCurrentState() {
        return this.currentState;
    }

    static enum ShardConsumerState {
        WAITING_ON_PARENT_SHARDS,
        INITIALIZING,
        PROCESSING,
        SHUTTING_DOWN,
        SHUTDOWN_COMPLETE;

    }
}

