/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tephra;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongArraySet;
import it.unimi.dsi.fastutil.longs.LongList;
import it.unimi.dsi.fastutil.longs.LongListIterator;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.ChangeId;
import org.apache.tephra.InvalidTruncateTimeException;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionNotInProgressException;
import org.apache.tephra.TransactionType;
import org.apache.tephra.TxConstants;
import org.apache.tephra.manager.InvalidTxList;
import org.apache.tephra.metrics.DefaultMetricsCollector;
import org.apache.tephra.metrics.MetricsCollector;
import org.apache.tephra.persist.NoOpTransactionStateStorage;
import org.apache.tephra.persist.TransactionEdit;
import org.apache.tephra.persist.TransactionLog;
import org.apache.tephra.persist.TransactionLogReader;
import org.apache.tephra.persist.TransactionSnapshot;
import org.apache.tephra.persist.TransactionStateStorage;
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Guava {@link AbstractService} that keeps the in-memory transaction state (in-progress,
 * invalid, committing and committed transactions plus the read and write pointers) and
 * persists it through a {@link TransactionStateStorage} as snapshots and log edits.
 *
 * <p>In the methods of this class, pointer and map state is mutated while holding the
 * instance monitor; edits are appended to the current log while holding {@code logReadLock},
 * and the log is swapped while holding {@code logWriteLock}.
 */
public class TransactionManager
extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class);
    // Presumably the snapshot thread's poll interval in milliseconds (that thread sleeps 1000 ms) - TODO confirm.
    private static final long SNAPSHOT_POLL_INTERVAL = 1000L;
    // Presumably the metrics thread's poll interval in milliseconds (that thread sleeps 10000 ms) - TODO confirm.
    private static final long METRICS_POLL_INTERVAL = 10000L;
    // Client id used by the start methods that do not take one.
    private static final String DEFAULT_CLIENTID = "unknown";
    // In-progress transactions keyed by write pointer.
    private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long, InProgressTx>();
    // Transactions that have been invalidated (for example because they timed out).
    private final InvalidTxList invalidTxList = new InvalidTxList();
    // Change sets of committed transactions; presumably keyed by commit pointer - verify against doCommit.
    private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = new ConcurrentSkipListMap<Long, Set<ChangeId>>();
    // Change sets registered by canCommit(), keyed by transaction id.
    private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap();
    private long readPointer;
    // Highest write pointer handed out or replayed so far.
    private long lastWritePointer;
    private MetricsCollector txMetricsCollector;
    private final TransactionStateStorage persistor;
    // Seconds between timed-out transaction cleanup runs; a value <= 0 disables the cleanup thread.
    private final int cleanupInterval;
    // Default timeout, in seconds, for short transactions.
    private final int defaultTimeout;
    // Timeout, in seconds, applied to long transactions.
    private final int defaultLongTimeout;
    // Upper bound, in seconds, for a client-supplied short transaction timeout.
    private final int maxTimeout;
    private DaemonThreadExecutor cleanupThread = null;
    // Log that edits are currently appended to; replaced when a non-closing snapshot is taken.
    private volatile TransactionLog currentLog;
    // Timestamp of the last snapshot that was written or restored; 0 if none.
    private long lastSnapshotTime;
    // A value <= 0 disables the periodic snapshot thread.
    private final long snapshotFrequencyInSeconds;
    // Number of snapshots to keep when pruning old ones; at least 1.
    private final int snapshotRetainCount;
    private DaemonThreadExecutor snapshotThread;
    private DaemonThreadExecutor metricsThread;
    // Read lock: held while appending edits. Write lock: held while swapping the current log.
    private final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
    private final Lock logReadLock = this.logLock.readLock();
    private final Lock logWriteLock = this.logLock.writeLock();
    // Tolerance used by txnBackwardsCompatCheck() when classifying untyped transactions as long-running.
    private final long longTimeoutTolerance;

    /**
     * Creates a manager backed by a {@link NoOpTransactionStateStorage} and a
     * {@link DefaultMetricsCollector}.
     *
     * @param config configuration passed to the snapshot codec provider and to the main constructor
     */
    public TransactionManager(Configuration config) {
        this(config, new NoOpTransactionStateStorage(new SnapshotCodecProvider(config)), new DefaultMetricsCollector());
    }

    @Inject
    public TransactionManager(Configuration conf, @Nonnull TransactionStateStorage persistor, MetricsCollector txMetricsCollector) {
        this.persistor = persistor;
        this.cleanupInterval = conf.getInt("data.tx.cleanup.interval", 10);
        this.maxTimeout = conf.getInt("data.tx.max.timeout", Integer.MAX_VALUE);
        this.defaultTimeout = conf.getInt("data.tx.timeout", 30);
        this.defaultLongTimeout = conf.getInt("data.tx.long.timeout", TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT);
        this.snapshotFrequencyInSeconds = conf.getLong("data.tx.snapshot.interval", 300L);
        this.snapshotRetainCount = Math.max(conf.getInt("data.tx.snapshot.retain", 10), 1);
        this.longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000L);
        this.txMetricsCollector = txMetricsCollector;
        this.txMetricsCollector.configure(conf);
        this.clear();
    }

    /**
     * Resets all in-memory transaction state: empties every collection and zeroes the
     * pointers and the last snapshot time.
     */
    private void clear() {
        // Collections first.
        invalidTxList.clear();
        inProgress.clear();
        committedChangeSets.clear();
        committingChangeSets.clear();
        // Then the scalar state.
        lastWritePointer = 0L;
        readPointer = 0L;
        lastSnapshotTime = 0L;
    }

    /**
     * @return {@code true} if the service state is currently {@code STOPPING}
     */
    private boolean isStopping() {
        Service.State current = state();
        return Service.State.STOPPING.equals(current);
    }

    /**
     * Starts the metrics collector and storage, recovers persisted state, launches the
     * background threads, opens a log and finally reports the service as started.
     */
    public synchronized void doStart() {
        LOG.info("Starting transaction manager.");
        txMetricsCollector.start();
        persistor.startAndWait();
        try {
            persistor.setupStorage();
        } catch (IOException e) {
            // propagate() always throws; "throw" makes that visible to the reader and compiler.
            throw Throwables.propagate(e);
        }

        // Start from a clean slate, then rebuild from the latest snapshot and logs.
        clear();
        recoverState();

        startCleanupThread();
        startSnapshotThread();
        startMetricsThread();
        initLog();

        // Nothing was recovered: seed both pointers with a fresh write pointer.
        if (lastWritePointer == 0L) {
            long initialPointer = getNextWritePointer();
            lastWritePointer = initialPointer;
            readPointer = initialPointer;
        }
        notifyStarted();
    }

    /**
     * Creates the current transaction log, stamped with the current time, if none is open yet.
     * An {@link IOException} from the storage is rethrown via {@code Throwables.propagate}.
     */
    private void initLog() {
        if (currentLog != null) {
            return;
        }
        try {
            long createdAt = System.currentTimeMillis();
            currentLog = persistor.createLog(createdAt);
        } catch (IOException ioe) {
            throw Throwables.propagate(ioe);
        }
    }

    /**
     * Starts the daemon thread that periodically invalidates timed-out transactions.
     * Does nothing when the cleanup interval or the default timeout is not positive.
     */
    private void startCleanupThread() {
        if (this.cleanupInterval <= 0 || this.defaultTimeout <= 0) {
            return;
        }
        LOG.info("Starting periodic timed-out transaction cleanup every " + this.cleanupInterval + " seconds with default timeout of " + this.defaultTimeout + " seconds.");
        this.cleanupThread = new DaemonThreadExecutor("tx-clean-timeout"){

            @Override
            public void doRun() {
                TransactionManager.this.cleanupTimedOutTransactions();
            }

            @Override
            public long getSleepMillis() {
                // Convert in long arithmetic: an int multiplication by 1000 overflows for
                // intervals above ~2.1 million seconds and would yield a negative sleep time.
                return TimeUnit.SECONDS.toMillis(TransactionManager.this.cleanupInterval);
            }
        };
        this.cleanupThread.start();
    }

    /**
     * Starts the daemon thread that takes periodic snapshots and a final one on shutdown.
     * Does nothing when the snapshot frequency is not positive.
     */
    private void startSnapshotThread() {
        if (this.snapshotFrequencyInSeconds <= 0L) {
            return;
        }
        LOG.info("Starting periodic snapshot thread, frequency = " + this.snapshotFrequencyInSeconds + " seconds, location = " + this.persistor.getLocation());
        this.snapshotThread = new DaemonThreadExecutor("tx-snapshot") {

            @Override
            public void doRun() {
                long now = System.currentTimeMillis();
                long oldestAcceptable = now - TransactionManager.this.snapshotFrequencyInSeconds * 1000L;
                // The last snapshot is still recent enough: nothing to do on this poll.
                if (TransactionManager.this.lastSnapshotTime >= oldestAcceptable) {
                    return;
                }
                try {
                    TransactionManager.this.doSnapshot(false);
                } catch (IOException ioe) {
                    LOG.error("Periodic snapshot failed!", (Throwable) ioe);
                }
            }

            @Override
            protected void onShutdown() {
                try {
                    LOG.info("Writing final snapshot prior to shutdown");
                    TransactionManager.this.doSnapshot(true);
                } catch (IOException ioe) {
                    LOG.error("Failed performing final snapshot on shutdown", (Throwable) ioe);
                }
            }

            @Override
            public long getSleepMillis() {
                return 1000L;
            }
        };
        this.snapshotThread.start();
    }

    /**
     * Starts the daemon thread that emits the size gauges of the transaction collections
     * every 10 seconds and once more when it shuts down.
     */
    private void startMetricsThread() {
        LOG.info("Starting periodic Metrics Emitter thread, frequency = 10000");
        this.metricsThread = new DaemonThreadExecutor("tx-metrics") {

            /** Publishes the current size of each tracked transaction collection. */
            private void emitSizeGauges() {
                MetricsCollector collector = TransactionManager.this.txMetricsCollector;
                collector.gauge("committing.size", TransactionManager.this.committingChangeSets.size(), new String[0]);
                collector.gauge("committed.size", TransactionManager.this.committedChangeSets.size(), new String[0]);
                collector.gauge("inprogress.size", TransactionManager.this.inProgress.size(), new String[0]);
                collector.gauge("invalid.size", TransactionManager.this.getInvalidSize(), new String[0]);
            }

            @Override
            public void doRun() {
                emitSizeGauges();
            }

            @Override
            protected void onShutdown() {
                // Emit one last time so the final sizes are recorded.
                emitSizeGauges();
            }

            @Override
            public long getSleepMillis() {
                return 10000L;
            }
        };
        this.metricsThread.start();
    }

    /**
     * Moves every in-progress transaction whose expiration has passed (or whose expiration
     * is negative) to the invalid list, and appends an INVALID edit to the log for each one
     * that is not a checkpoint entry. Does nothing if the service is not running.
     *
     * <p>The state change happens under the instance monitor; the log append happens after
     * the monitor is released but still under the log read lock.
     */
    private void cleanupTimedOutTransactions() {
        List<TransactionEdit> invalidEdits = null;
        this.logReadLock.lock();
        try {
            synchronized (this) {
                if (!this.isRunning()) {
                    return;
                }
                long currentTime = System.currentTimeMillis();
                Map<Long, InProgressType> timedOut = Maps.newHashMap();
                for (Map.Entry<Long, InProgressTx> tx : this.inProgress.entrySet()) {
                    InProgressTx inProgressTx = tx.getValue();
                    long expiration = inProgressTx.getExpiration();
                    if (expiration >= 0L && currentTime > expiration) {
                        timedOut.put(tx.getKey(), inProgressTx.getType());
                        LOG.info("Tx invalid list: added tx {} belonging to client '{}' because of timeout.", tx.getKey(), inProgressTx.getClientId());
                    } else if (expiration < 0L) {
                        LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the transaction was not migrated correctly, this transaction will be expired immediately", tx.getKey(), expiration);
                        timedOut.put(tx.getKey(), InProgressType.LONG);
                    }
                }
                if (!timedOut.isEmpty()) {
                    invalidEdits = Lists.newArrayListWithCapacity(timedOut.size());
                    this.invalidTxList.addAll(timedOut.keySet());
                    for (Map.Entry<Long, InProgressType> tx : timedOut.entrySet()) {
                        this.inProgress.remove(tx.getKey());
                        // Checkpoint entries are only dropped from inProgress; no edit is logged for them.
                        if (InProgressType.CHECKPOINT.equals(tx.getValue())) {
                            continue;
                        }
                        this.committingChangeSets.remove(tx.getKey());
                        invalidEdits.add(TransactionEdit.createInvalid(tx.getKey()));
                    }
                    LOG.info("Invalidated {} transactions due to timeout.", timedOut.size());
                }
            }
            if (invalidEdits != null) {
                this.appendToLog(invalidEdits);
            }
        }
        finally {
            this.logReadLock.unlock();
        }
    }

    /**
     * Captures the current transaction state.
     *
     * <p>If the current millisecond equals the last snapshot time or the current log's
     * timestamp, the method first sleeps for one millisecond, presumably so the new
     * snapshot gets a distinct timestamp.
     *
     * @return the captured state, or {@code null} if the service is neither running nor stopping
     * @throws IOException declared for callers; not thrown by the code visible in this method
     */
    public synchronized TransactionSnapshot getSnapshot() throws IOException {
        if (!this.isRunning() && !this.isStopping()) {
            return null;
        }
        long now = System.currentTimeMillis();
        if (now == this.lastSnapshotTime || this.currentLog != null && now == this.currentLog.getTimestamp()) {
            try {
                TimeUnit.MILLISECONDS.sleep(1L);
            }
            catch (InterruptedException ignored) {
                // Best effort: the snapshot proceeds even if the short pause is cut off.
                // NOTE(review): the interrupt status is intentionally not restored here, as in
                // the original; confirm that downstream snapshot I/O tolerates an interrupt before changing this.
            }
        }
        TransactionSnapshot snapshot = this.getCurrentState();
        LOG.debug("Starting snapshot of transaction state with timestamp {}", snapshot.getTimestamp());
        // Parameterized so the (potentially large) snapshot is only stringified when debug is enabled.
        LOG.debug("Returning snapshot of state: {}", snapshot);
        return snapshot;
    }

    /**
     * Captures the current state and writes it to {@code out}.
     *
     * @param out destination stream for the serialized snapshot
     * @return {@code true} if a snapshot was written, {@code false} if none could be taken
     * @throws IOException if writing the snapshot fails
     */
    public boolean takeSnapshot(OutputStream out) throws IOException {
        TransactionSnapshot captured = getSnapshot();
        if (captured == null) {
            return false;
        }
        persistor.writeSnapshot(out, captured);
        return true;
    }

    /**
     * Takes a snapshot of the current state, rolls the transaction log, persists the
     * snapshot and prunes old snapshots and logs.
     *
     * <p>Under the log write lock and the instance monitor the state is captured and, unless
     * {@code closing}, a new log is created; the previous log is then closed. The snapshot is
     * written after the write lock is released. Any {@link IOException} is handed to
     * {@code abortService} instead of being rethrown.
     *
     * @param closing {@code true} when called for the final snapshot; no new log is opened
     * @throws IOException declared for callers; failures seen here are routed to {@code abortService}
     */
    private void doSnapshot(boolean closing) throws IOException {
        long snapshotTime = 0L;
        TransactionSnapshot snapshot = null;
        TransactionLog previousLog = null;
        try {
            logWriteLock.lock();
            try {
                synchronized (this) {
                    snapshot = getSnapshot();
                    // No state available and not shutting down: nothing to do.
                    if (snapshot == null && !closing) {
                        return;
                    }
                    if (snapshot != null) {
                        snapshotTime = snapshot.getTimestamp();
                    }
                    previousLog = currentLog;
                    if (!closing) {
                        currentLog = persistor.createLog(snapshot.getTimestamp());
                    }
                }
                // Closed outside the monitor but still under the write lock.
                if (previousLog != null) {
                    previousLog.close();
                }
            } finally {
                logWriteLock.unlock();
            }

            if (snapshot != null) {
                persistor.writeSnapshot(snapshot);
                lastSnapshotTime = snapshotTime;
                long oldestRetained = persistor.deleteOldSnapshots(snapshotRetainCount);
                persistor.deleteLogsOlderThan(oldestRetained);
            }
        } catch (IOException ioe) {
            abortService("Snapshot (timestamp " + snapshotTime + ") failed due to: " + ioe.getMessage(), ioe);
        }
    }

    /**
     * @return a snapshot copied from the current pointers and collections, stamped with the current time
     */
    public synchronized TransactionSnapshot getCurrentState() {
        long timestamp = System.currentTimeMillis();
        return TransactionSnapshot.copyFrom(
            timestamp,
            readPointer,
            lastWritePointer,
            (Collection<Long>) invalidTxList.toRawList(),
            inProgress,
            committingChangeSets,
            committedChangeSets);
    }

    /**
     * Rebuilds the in-memory state from the latest persisted snapshot, if any, and then
     * replays the logs written since that snapshot. An {@link IOException} is logged and
     * rethrown via {@code Throwables.propagate}.
     */
    public synchronized void recoverState() {
        try {
            TransactionSnapshot latest = persistor.getLatestSnapshot();
            if (latest != null) {
                restoreSnapshot(latest);
            }
            // lastSnapshotTime is the restored snapshot's timestamp, or unchanged if none was restored.
            List<TransactionLog> pendingLogs = persistor.getLogsSince(lastSnapshotTime);
            if (pendingLogs != null) {
                replayLogs(pendingLogs);
            }
        } catch (IOException e) {
            LOG.error("Unable to read back transaction state:", (Throwable) e);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Loads the given snapshot into this manager. The manager must be in its cleared state.
     *
     * @param snapshot persisted state to restore
     * @throws IllegalStateException if any pointer is non-zero or any collection is non-empty
     */
    private void restoreSnapshot(TransactionSnapshot snapshot) {
        LOG.info("Restoring transaction state from snapshot at " + snapshot.getTimestamp());

        // Refuse to overwrite existing state.
        Preconditions.checkState(lastSnapshotTime == 0L, (Object) "lastSnapshotTime has been set!");
        Preconditions.checkState(readPointer == 0L, (Object) "readPointer has been set!");
        Preconditions.checkState(lastWritePointer == 0L, (Object) "lastWritePointer has been set!");
        Preconditions.checkState(invalidTxList.isEmpty(), (Object) "invalid list should be empty!");
        Preconditions.checkState(inProgress.isEmpty(), (Object) "inProgress map should be empty!");
        Preconditions.checkState(committingChangeSets.isEmpty(), (Object) "committingChangeSets should be empty!");
        Preconditions.checkState(committedChangeSets.isEmpty(), (Object) "committedChangeSets should be empty!");

        LOG.info("Restoring snapshot of state: " + snapshot);
        lastSnapshotTime = snapshot.getTimestamp();
        readPointer = snapshot.getReadPointer();
        lastWritePointer = snapshot.getWritePointer();
        invalidTxList.addAll(snapshot.getInvalid());
        // Entries without a type are given one before they are loaded.
        Map<Long, InProgressTx> migrated = TransactionManager.txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress());
        inProgress.putAll(migrated);
        committingChangeSets.putAll(snapshot.getCommittingChangeSets());
        committedChangeSets.putAll(snapshot.getCommittedChangeSets());
    }

    /**
     * Assigns a type to every in-progress entry that has none, modifying the map in place.
     *
     * <p>An untyped entry becomes {@code LONG}, with an expiration derived from its write
     * pointer, when its expiration is negative or lies within {@code longTimeoutTolerance}
     * of that derived expiration. Otherwise it becomes {@code SHORT} and keeps its expiration.
     * Entries that already have a type are left untouched.
     *
     * @param defaultLongTimeout   long transaction timeout in seconds
     * @param longTimeoutTolerance tolerance compared against the difference between the derived and stored expiration
     * @param inProgress           map of write pointer to transaction; updated in place
     * @return the same {@code inProgress} map
     */
    public static Map<Long, InProgressTx> txnBackwardsCompatCheck(int defaultLongTimeout, long longTimeoutTolerance, Map<Long, InProgressTx> inProgress) {
        for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) {
            long writePointer = entry.getKey();
            InProgressTx existing = entry.getValue();
            long expiration = existing.getExpiration();
            if (existing.getType() != null) {
                continue;
            }

            boolean treatAsLong = expiration < 0L
                || TransactionManager.getTxExpirationFromWritePointer(writePointer, defaultLongTimeout) - expiration < longTimeoutTolerance;
            InProgressTx replacement;
            if (treatAsLong) {
                long derivedExpiration = TransactionManager.getTxExpirationFromWritePointer(writePointer, defaultLongTimeout);
                replacement = new InProgressTx(existing.getVisibilityUpperBound(), derivedExpiration, InProgressType.LONG, existing.getCheckpointWritePointers());
            } else {
                replacement = new InProgressTx(existing.getVisibilityUpperBound(), expiration, InProgressType.SHORT, existing.getCheckpointWritePointers());
            }
            entry.setValue(replacement);
        }
        return inProgress;
    }

    /**
     * Clears all in-memory state while holding the log write lock. A snapshot is taken
     * before clearing and another one afterwards. An {@link IOException} from either
     * snapshot is logged and not rethrown.
     */
    public void resetState() {
        this.logWriteLock.lock();
        try {
            // Snapshot of the state as it was before the reset.
            this.doSnapshot(false);
            this.clear();
            // Snapshot of the now-empty state.
            this.doSnapshot(false);
        }
        catch (IOException e) {
            // The logger already records the stack trace; do not also dump it to stderr.
            LOG.error("Snapshot failed when resetting state!", (Throwable)e);
        }
        finally {
            this.logWriteLock.unlock();
        }
    }

    /**
     * Re-applies every edit from the given transaction logs to the in-memory state.
     *
     * <p>Each edit is dispatched on its state to the matching {@code do*}/{@code add*}
     * method. {@link IOException} and {@link InvalidTruncateTimeException} are rethrown via
     * {@code Throwables.propagate}.
     *
     * @param logs logs to replay, processed in iteration order
     * @throws IllegalArgumentException if an edit has a state not handled by the switch
     */
    private void replayLogs(Collection<TransactionLog> logs) {
        for (TransactionLog log : logs) {
            LOG.info("Replaying edits from transaction log " + log.getName());
            int editCnt = 0;
            try {
                // NOTE(review): the reader is never closed in this method - confirm whether it holds a resource.
                TransactionLogReader reader = log.getReader();
                // A log without a reader is skipped (the trailing "Read N edits" message is skipped too).
                if (reader == null) continue;
                TransactionEdit edit = null;
                block13: while ((edit = reader.next()) != null) {
                    ++editCnt;
                    switch (edit.getState()) {
                        case INPROGRESS: {
                            long expiration = edit.getExpiration();
                            TransactionType type = edit.getType();
                            if (expiration < 0L) {
                                // Negative expiration: treat as a long transaction and derive the expiration from the write pointer.
                                expiration = TransactionManager.getTxExpirationFromWritePointer(edit.getWritePointer(), this.defaultLongTimeout);
                                type = TransactionType.LONG;
                            } else if (type == null) {
                                type = TransactionType.SHORT;
                            }
                            this.addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), expiration, type, null);
                            continue block13;
                        }
                        case COMMITTING: {
                            this.addCommittingChangeSet(edit.getWritePointer(), edit.getChanges());
                            continue block13;
                        }
                        case COMMITTED: {
                            long transactionId = edit.getWritePointer();
                            long[] checkpointPointers = edit.getCheckpointPointers();
                            // Use the last checkpoint pointer as the write pointer when there is one, else the transaction id.
                            long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ? transactionId : checkpointPointers[checkpointPointers.length - 1];
                            this.doCommit(transactionId, writePointer, edit.getChanges(), edit.getCommitPointer(), edit.getCanCommit());
                            continue block13;
                        }
                        case INVALID: {
                            this.doInvalidate(edit.getWritePointer());
                            continue block13;
                        }
                        case ABORTED: {
                            TransactionType type = edit.getType();
                            if (type == null) {
                                // The edit carries no type: try to recover it from the in-progress entry.
                                InProgressTx inProgressTx = (InProgressTx)this.inProgress.get(edit.getWritePointer());
                                if (inProgressTx != null) {
                                    InProgressType inProgressType = inProgressTx.getType();
                                    if (InProgressType.CHECKPOINT.equals((Object)inProgressType)) {
                                        LOG.debug("Ignoring ABORTED edit for a checkpoint transaction {}", (Object)edit.getWritePointer());
                                        continue block13;
                                    }
                                    if (inProgressType != null) {
                                        type = inProgressType.getTransactionType();
                                    }
                                } else {
                                    // Unknown transaction and unknown type: invalidate instead of aborting.
                                    LOG.warn("Invalidating transaction {} as it's type cannot be determined during replay", (Object)edit.getWritePointer());
                                    this.doInvalidate(edit.getWritePointer());
                                    continue block13;
                                }
                            }
                            this.doAbort(edit.getWritePointer(), edit.getCheckpointPointers(), type);
                            continue block13;
                        }
                        case TRUNCATE_INVALID_TX: {
                            // A non-zero truncate time selects time-based truncation; otherwise the explicit list is used.
                            if (edit.getTruncateInvalidTxTime() != 0L) {
                                this.doTruncateInvalidTxBefore(edit.getTruncateInvalidTxTime());
                                continue block13;
                            }
                            this.doTruncateInvalidTx(edit.getTruncateInvalidTx());
                            continue block13;
                        }
                        case CHECKPOINT: {
                            this.doCheckpoint(edit.getWritePointer(), edit.getParentWritePointer());
                            continue block13;
                        }
                    }
                    // Reached only when no case above matched the edit's state.
                    throw new IllegalArgumentException("Invalid state for WAL entry: " + (Object)((Object)edit.getState()));
                }
            }
            catch (IOException ioe) {
                throw Throwables.propagate((Throwable)ioe);
            }
            catch (InvalidTruncateTimeException e) {
                throw Throwables.propagate((Throwable)e);
            }
            LOG.info("Read " + editCnt + " edits from log " + log.getName());
        }
    }

    /**
     * Shuts down the cleanup, metrics and snapshot threads (waiting up to 30 seconds for
     * each), then stops the storage and the metrics collector and reports the service as
     * stopped. If the calling thread is interrupted while waiting, its interrupt status is
     * restored and shutdown continues.
     */
    public void doStop() {
        Stopwatch timer = new Stopwatch().start();
        LOG.info("Shutting down gracefully...");
        if (this.cleanupThread != null) {
            this.cleanupThread.shutdown();
            try {
                this.cleanupThread.join(30000L);
            }
            catch (InterruptedException ie) {
                LOG.warn("Interrupted waiting for cleanup thread to stop");
                Thread.currentThread().interrupt();
            }
        }
        if (this.metricsThread != null) {
            this.metricsThread.shutdown();
            try {
                this.metricsThread.join(30000L);
            }
            catch (InterruptedException ie) {
                // This branch waits on the metrics thread; the message used to name the cleanup thread.
                LOG.warn("Interrupted waiting for metrics thread to stop");
                Thread.currentThread().interrupt();
            }
        }
        if (this.snapshotThread != null) {
            this.snapshotThread.shutdown();
            try {
                this.snapshotThread.join(30000L);
            }
            catch (InterruptedException ie) {
                LOG.warn("Interrupted waiting for snapshot thread to stop");
                Thread.currentThread().interrupt();
            }
        }
        // The snapshot thread has been asked to stop before the storage is shut down.
        this.persistor.stopAndWait();
        this.txMetricsCollector.stop();
        timer.stop();
        LOG.info("Took " + timer + " to stop");
        this.notifyStopped();
    }

    /**
     * Logs the failure and marks the service as failed. Does nothing if the service is not running.
     *
     * @param message description of what went wrong
     * @param error   cause reported through {@code notifyFailed}
     */
    private void abortService(String message, Throwable error) {
        if (!isRunning()) {
            return;
        }
        LOG.error("Aborting transaction manager due to: " + message, error);
        notifyFailed(error);
    }

    /**
     * @throws IllegalStateException if the service is not running
     */
    private void ensureAvailable() {
        boolean running = isRunning();
        Preconditions.checkState(running, (Object) "Transaction Manager is not running.");
    }

    /**
     * Starts a short transaction with the default timeout.
     *
     * @return the new transaction
     */
    public Transaction startShort() {
        int timeoutInSeconds = defaultTimeout;
        return startShort(timeoutInSeconds);
    }

    /**
     * Starts a short transaction for the given client with the default timeout.
     *
     * @param clientId id of the client starting the transaction
     * @return the new transaction
     */
    public Transaction startShort(String clientId) {
        int timeoutInSeconds = defaultTimeout;
        return startShort(clientId, timeoutInSeconds);
    }

    /**
     * Starts a short transaction with the given timeout, using the default client id.
     *
     * @param timeoutInSeconds timeout in seconds
     * @return the new transaction
     */
    public Transaction startShort(int timeoutInSeconds) {
        String clientId = DEFAULT_CLIENTID;
        return startShort(clientId, timeoutInSeconds);
    }

    /**
     * Starts a short transaction for the given client and records start metrics.
     *
     * @param clientId         id of the client starting the transaction; must not be null
     * @param timeoutInSeconds timeout in seconds; must be positive and at most the configured maximum
     * @return the new transaction
     * @throws IllegalArgumentException if {@code clientId} is null or the timeout is out of range
     */
    public Transaction startShort(String clientId, int timeoutInSeconds) {
        Preconditions.checkArgument(clientId != null, (Object) "clientId must not be null");
        Preconditions.checkArgument(timeoutInSeconds > 0,
            "timeout must be positive but is %s seconds", new Object[]{timeoutInSeconds});
        Preconditions.checkArgument(timeoutInSeconds <= maxTimeout,
            "timeout must not exceed %s seconds but is %s seconds", new Object[]{maxTimeout, timeoutInSeconds});

        txMetricsCollector.rate("start.short");
        Stopwatch timer = new Stopwatch().start();
        long expiration = TransactionManager.getTxExpiration(timeoutInSeconds);
        Transaction started = startTx(expiration, TransactionType.SHORT, clientId);
        int latencyMillis = (int) timer.elapsedMillis();
        txMetricsCollector.histogram("start.short.latency", latencyMillis);
        return started;
    }

    /**
     * @param timeoutInSeconds timeout in seconds
     * @return the current time in milliseconds plus the timeout converted to milliseconds
     */
    private static long getTxExpiration(long timeoutInSeconds) {
        long now = System.currentTimeMillis();
        long timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutInSeconds);
        return now + timeoutMillis;
    }

    /**
     * Derives an expiration time from a write pointer.
     *
     * @param writePointer     write pointer; divided by 1,000,000 to obtain a millisecond value
     * @param timeoutInSeconds timeout in seconds
     * @return {@code writePointer / 1000000} plus the timeout converted to milliseconds
     */
    public static long getTxExpirationFromWritePointer(long writePointer, long timeoutInSeconds) {
        long startMillis = writePointer / 1000000L;
        long timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutInSeconds);
        return startMillis + timeoutMillis;
    }

    /**
     * @return the larger of {@code lastWritePointer + 1} and the current time in milliseconds
     *         multiplied by 1,000,000
     */
    private long getNextWritePointer() {
        long successor = lastWritePointer + 1L;
        long timeBased = System.currentTimeMillis() * 1000000L;
        return Math.max(successor, timeBased);
    }

    /**
     * Starts a long transaction using the default client id.
     *
     * @return the new transaction
     */
    public Transaction startLong() {
        String clientId = DEFAULT_CLIENTID;
        return startLong(clientId);
    }

    /**
     * Starts a long transaction for the given client, using the configured long timeout,
     * and records start metrics.
     *
     * @param clientId id of the client starting the transaction; must not be null
     * @return the new transaction
     * @throws IllegalArgumentException if {@code clientId} is null
     */
    public Transaction startLong(String clientId) {
        Preconditions.checkArgument(clientId != null, (Object) "clientId must not be null");

        txMetricsCollector.rate("start.long");
        Stopwatch timer = new Stopwatch().start();
        long expiration = TransactionManager.getTxExpiration(defaultLongTimeout);
        Transaction started = startTx(expiration, TransactionType.LONG, clientId);
        int latencyMillis = (int) timer.elapsedMillis();
        txMetricsCollector.histogram("start.long.latency", latencyMillis);
        return started;
    }

    /**
     * Creates a transaction with the next write pointer, registers it as in progress and
     * appends a STARTED edit to the log.
     *
     * <p>The state change happens under the instance monitor; the log append happens after
     * the monitor is released but still under the log read lock.
     *
     * @param expiration expiration time recorded for the transaction
     * @param type       transaction type
     * @param clientId   id of the client, may be null
     * @return the new transaction
     * @throws IllegalStateException if the service is not running
     */
    private Transaction startTx(long expiration, TransactionType type, @Nullable String clientId) {
        Transaction started;
        logReadLock.lock();
        try {
            synchronized (this) {
                ensureAvailable();
                long txId = getNextWritePointer();
                started = createTransaction(txId, type);
                addInProgressAndAdvance(started.getTransactionId(), started.getVisibilityUpperBound(), expiration, type, clientId);
            }
            TransactionEdit startedEdit = TransactionEdit.createStarted(started.getTransactionId(), started.getVisibilityUpperBound(), expiration, type);
            appendToLog(startedEdit);
        } finally {
            logReadLock.unlock();
        }
        return started;
    }

    /**
     * Converts the transaction type to an {@link InProgressType} and delegates to the
     * {@link InProgressType}-based overload.
     */
    private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound, long expiration, TransactionType type, @Nullable String clientId) {
        InProgressType inProgressType = InProgressType.of(type);
        addInProgressAndAdvance(writePointer, visibilityUpperBound, expiration, inProgressType, clientId);
    }

    /**
     * Registers a transaction as in progress under its write pointer and advances
     * {@code lastWritePointer} if this write pointer is larger.
     */
    private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound, long expiration, InProgressType type, @Nullable String clientId) {
        InProgressTx entry = new InProgressTx(clientId, visibilityUpperBound, expiration, type);
        inProgress.put(writePointer, entry);
        advanceWritePointer(writePointer);
    }

    /**
     * Raises {@code lastWritePointer} to {@code writePointer} if the latter is larger;
     * never lowers it.
     */
    private void advanceWritePointer(long writePointer) {
        if (writePointer <= lastWritePointer) {
            return;
        }
        lastWritePointer = writePointer;
    }

    /**
     * Checks whether the transaction's changes conflict with other transactions and, if
     * not, records them as its committing change set and appends a COMMITTING edit to the log.
     *
     * @param tx        transaction that wants to commit
     * @param changeIds raw change keys written by the transaction
     * @return {@code true} if no conflict was found and the change set was recorded,
     *         {@code false} if a conflict was detected
     * @throws TransactionNotInProgressException if the transaction is not in progress
     * @throws IllegalStateException             if the service is not running
     */
    public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
        this.txMetricsCollector.rate("canCommit");
        Stopwatch timer = new Stopwatch().start();
        if (this.inProgress.get(tx.getTransactionId()) == null) {
            // The invalid list is consulted under the instance monitor to choose the message.
            synchronized (this) {
                if (this.invalidTxList.contains(tx.getTransactionId())) {
                    throw new TransactionNotInProgressException(String.format("canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", tx.getTransactionId()));
                }
                throw new TransactionNotInProgressException(String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
            }
        }
        // Typed set instead of a raw HashSet, so no unchecked conversion is needed below.
        Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size());
        for (byte[] change : changeIds) {
            set.add(new ChangeId(change));
        }
        // On a conflict the method returns before the latency histogram is recorded.
        if (this.hasConflicts(tx, set)) {
            return false;
        }
        this.logReadLock.lock();
        try {
            synchronized (this) {
                this.ensureAvailable();
                this.addCommittingChangeSet(tx.getTransactionId(), set);
            }
            this.appendToLog(TransactionEdit.createCommitting(tx.getTransactionId(), set));
        }
        finally {
            this.logReadLock.unlock();
        }
        this.txMetricsCollector.histogram("canCommit.latency", (int)timer.elapsedMillis());
        return true;
    }

    /**
     * Records {@code changes} as the committing change set of the transaction identified by
     * {@code writePointer}, replacing any change set previously stored under that key.
     *
     * @param writePointer key under which the change set is stored
     * @param changes the change set to record
     */
    private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) {
        committingChangeSets.put(writePointer, changes);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Commits a transaction.
     *
     * <p>Under the log read lock and the manager's monitor, the method computes the commit pointer
     * as {@code lastWritePointer + 1}, verifies that the transaction is in progress, removes its
     * committing change set and re-checks that set for conflicts. If no committing change set was
     * recorded for the transaction, nothing is added to the committed change sets. After the state
     * change, a committed edit is appended to the transaction log (outside the monitor, still under
     * the log read lock).
     *
     * <p>When a conflict is detected the method returns {@code false} without calling
     * {@code doCommit}; the committing change set has already been removed at that point and the
     * "commit.latency" histogram is not updated.
     *
     * @param tx the transaction to commit
     * @return {@code true} if the transaction was committed, {@code false} on conflict
     * @throws TransactionNotInProgressException if the transaction id is not in the in-progress map
     */
    public boolean commit(Transaction tx) throws TransactionNotInProgressException {
        this.txMetricsCollector.rate("commit");
        Stopwatch timer = new Stopwatch().start();
        boolean addToCommitted = true;
        this.logReadLock.lock();
        try {
            long commitPointer;
            Set<ChangeId> changeSet;
            synchronized (this) {
                this.ensureAvailable();
                commitPointer = this.lastWritePointer + 1L;
                if (this.inProgress.get(tx.getTransactionId()) == null) {
                    // The messages name commit() (not canCommit()) so the failing call can be identified.
                    if (this.invalidTxList.contains(tx.getTransactionId())) {
                        throw new TransactionNotInProgressException(String.format("commit() is called for transaction %d that is not in progress (it is known to be invalid)", tx.getTransactionId()));
                    }
                    throw new TransactionNotInProgressException(String.format("commit() is called for transaction %d that is not in progress", tx.getTransactionId()));
                }
                changeSet = this.committingChangeSets.remove(tx.getTransactionId());
                if (changeSet == null) {
                    // No change set was recorded for this transaction: nothing to add to the committed sets.
                    addToCommitted = false;
                } else if (this.hasConflicts(tx, changeSet)) {
                    return false;
                }
                this.doCommit(tx.getTransactionId(), tx.getWritePointer(), changeSet, commitPointer, addToCommitted);
            }
            this.appendToLog(TransactionEdit.createCommitted(tx.getTransactionId(), changeSet, commitPointer, addToCommitted));
        }
        finally {
            this.logReadLock.unlock();
        }
        this.txMetricsCollector.histogram("commit.latency", (int)timer.elapsedMillis());
        return true;
    }

    /**
     * Applies a commit to the in-memory state.
     *
     * <p>Steps, in order: drop the committing change set of the transaction; if requested and the
     * change set is non-empty, merge it with any change set already stored under
     * {@code commitPointer} and store the result there; remove the transaction (and its checkpoint
     * write pointers) from the in-progress map, or, if it was not in progress, try to remove it from
     * the invalid list; move the read pointer; finally prune committed change sets whose key is
     * below the value returned by {@code TxUtils.getFirstShortInProgress}.
     *
     * @param transactionId id of the committed transaction
     * @param writePointer write pointer of the transaction; replaced by the last checkpoint write
     *                     pointer when the in-progress entry has checkpoints
     * @param changes change set of the transaction; only dereferenced when {@code addToCommitted} is true
     * @param commitPointer key under which the change set is stored in the committed change sets
     * @param addToCommitted whether the change set should be added to the committed change sets
     */
    private void doCommit(long transactionId, long writePointer, Set<ChangeId> changes, long commitPointer, boolean addToCommitted) {
        this.committingChangeSets.remove(transactionId);
        if (addToCommitted && !changes.isEmpty()) {
            Set alreadyCommitted = (Set)this.committedChangeSets.get(commitPointer);
            if (alreadyCommitted != null) {
                // Merge into the new set rather than mutating the set that is already published in the map.
                changes.addAll(alreadyCommitted);
            }
            this.committedChangeSets.put(commitPointer, changes);
        }

        long effectiveWritePointer = writePointer;
        InProgressTx previous = (InProgressTx)this.inProgress.remove(transactionId);
        if (previous != null) {
            LongArrayList checkpointPointers = previous.getCheckpointWritePointers();
            if (!checkpointPointers.isEmpty()) {
                effectiveWritePointer = checkpointPointers.getLong(checkpointPointers.size() - 1);
                this.inProgress.keySet().removeAll((Collection<?>)checkpointPointers);
            }
        } else if (this.invalidTxList.remove(transactionId)) {
            LOG.info("Tx invalid list: removed committed tx {}", (Object)transactionId);
        }

        this.moveReadPointerIfNeeded(effectiveWritePointer);
        this.committedChangeSets.headMap(TxUtils.getFirstShortInProgress(this.inProgress)).clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Aborts a transaction: updates the in-memory state via {@code doAbort} while holding the
     * manager's monitor, then appends an aborted edit to the transaction log. Both steps and the
     * "abort.latency" histogram update run under the log read lock.
     *
     * @param tx the transaction to abort
     */
    public void abort(Transaction tx) {
        this.txMetricsCollector.rate("abort");
        Stopwatch timer = new Stopwatch().start();
        this.logReadLock.lock();
        try {
            synchronized (this) {
                this.ensureAvailable();
                this.doAbort(tx.getTransactionId(), tx.getCheckpointWritePointers(), tx.getType());
            }
            TransactionEdit abortedEdit = TransactionEdit.createAborted(tx.getTransactionId(), tx.getType(), tx.getCheckpointWritePointers());
            this.appendToLog(abortedEdit);
            int elapsedMillis = (int)timer.elapsedMillis();
            this.txMetricsCollector.histogram("abort.latency", elapsedMillis);
        }
        finally {
            this.logReadLock.unlock();
        }
    }

    /**
     * Applies an abort to the in-memory state.
     *
     * <p>The committing change set is always dropped. A {@code LONG} transaction is invalidated
     * instead of aborted. Otherwise the transaction is removed from the in-progress map; if it was
     * not there but could be removed from the invalid list, its checkpoint write pointers are
     * removed from the invalid list as well, else they are removed from the in-progress map.
     * Finally the read pointer is moved if needed.
     *
     * @param writePointer id of the transaction being aborted
     * @param checkpointWritePointers checkpoint write pointers of the transaction, may be {@code null}
     * @param type type of the transaction
     */
    private void doAbort(long writePointer, long[] checkpointWritePointers, TransactionType type) {
        this.committingChangeSets.remove(writePointer);
        if (type == TransactionType.LONG) {
            this.doInvalidate(writePointer);
            return;
        }

        InProgressTx removed = (InProgressTx)this.inProgress.remove(writePointer);
        boolean hasCheckpoints = checkpointWritePointers != null;
        if (removed == null && this.invalidTxList.remove(writePointer)) {
            // The transaction had been moved to the invalid list: clean its checkpoints out of that list too.
            if (hasCheckpoints) {
                for (long checkpoint : checkpointWritePointers) {
                    this.invalidTxList.remove(checkpoint);
                }
            }
            LOG.info("Tx invalid list: removed aborted tx {}", (Object)writePointer);
        } else if (hasCheckpoints) {
            for (long checkpoint : checkpointWritePointers) {
                this.inProgress.remove(checkpoint);
            }
        }
        this.moveReadPointerIfNeeded(writePointer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Invalidates the transaction with the given id.
     *
     * <p>The in-memory change is made by {@code doInvalidate} under the manager's monitor; an
     * invalid edit is then appended to the transaction log regardless of the outcome. Everything
     * runs under the log read lock.
     *
     * @param tx id of the transaction to invalidate
     * @return the result of {@code doInvalidate}: {@code true} if the transaction was in progress or
     *         had a committing change set
     */
    public boolean invalidate(long tx) {
        this.txMetricsCollector.rate("invalidate");
        Stopwatch timer = new Stopwatch().start();
        this.logReadLock.lock();
        try {
            boolean invalidated;
            synchronized (this) {
                this.ensureAvailable();
                invalidated = this.doInvalidate(tx);
            }
            this.appendToLog(TransactionEdit.createInvalid(tx));
            this.txMetricsCollector.histogram("invalidate.latency", (int)timer.elapsedMillis());
            return invalidated;
        }
        finally {
            this.logReadLock.unlock();
        }
    }

    /**
     * Moves a transaction to the invalid list.
     *
     * <p>The transaction is removed from both the committing change sets and the in-progress map.
     * If it was found in neither, nothing else happens. Otherwise its write pointer, together with
     * any checkpoint write pointers of the in-progress entry, is added to the invalid list, and the
     * read pointer is moved when the removed in-progress entry was not long-running.
     *
     * @param writePointer id of the transaction to invalidate
     * @return {@code true} if the transaction was in progress or had a committing change set,
     *         {@code false} otherwise
     */
    private boolean doInvalidate(long writePointer) {
        Set<ChangeId> previousChangeSet = this.committingChangeSets.remove(writePointer);
        InProgressTx previous = (InProgressTx)this.inProgress.remove(writePointer);
        if (previous == null && previousChangeSet == null) {
            return false;
        }

        this.invalidTxList.add(writePointer);
        String clientId = DEFAULT_CLIENTID;
        if (previous == null) {
            LOG.debug("Invalidating tx {} in committing change sets but not in-progress", (Object)writePointer);
        } else {
            LongArrayList childWritePointers = previous.getCheckpointWritePointers();
            if (!childWritePointers.isEmpty()) {
                this.invalidTxList.addAll((LongList)childWritePointers);
                this.inProgress.keySet().removeAll((Collection<?>)childWritePointers);
            }
            if (previous.getClientId() != null) {
                clientId = previous.getClientId();
            }
        }
        LOG.info("Tx invalid list: added tx {} belonging to client '{}' because of invalidate", (Object)writePointer, (Object)clientId);

        if (previous != null && !previous.isLongRunning()) {
            this.moveReadPointerIfNeeded(writePointer);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the given transaction ids from the invalid list and appends a matching truncate edit
     * to the transaction log. The in-memory change happens under the manager's monitor; the whole
     * operation runs under the log read lock.
     *
     * @param invalidTxIds ids to remove from the invalid list
     * @return the result of {@code doTruncateInvalidTx}
     */
    public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
        this.txMetricsCollector.rate("truncateInvalidTx");
        Stopwatch timer = new Stopwatch().start();
        this.logReadLock.lock();
        try {
            boolean truncated;
            synchronized (this) {
                this.ensureAvailable();
                truncated = this.doTruncateInvalidTx(invalidTxIds);
            }
            this.appendToLog(TransactionEdit.createTruncateInvalidTx(invalidTxIds));
            this.txMetricsCollector.histogram("truncateInvalidTx.latency", (int)timer.elapsedMillis());
            return truncated;
        }
        finally {
            this.logReadLock.unlock();
        }
    }

    /**
     * Logs and removes the given ids from the invalid list.
     *
     * @param toRemove ids to remove
     * @return whatever {@code invalidTxList.removeAll} reports for these ids
     */
    private boolean doTruncateInvalidTx(Set<Long> toRemove) {
        LOG.info("Removing tx ids {} from invalid list", toRemove);
        boolean removed = this.invalidTxList.removeAll(toRemove);
        return removed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes from the invalid list every transaction id that lies before the given time and
     * appends a matching truncate edit to the transaction log. The in-memory change happens under
     * the manager's monitor; the whole operation runs under the log read lock.
     *
     * @param time time bound passed to {@code doTruncateInvalidTxBefore}
     * @return the result of {@code doTruncateInvalidTxBefore}
     * @throws InvalidTruncateTimeException if a transaction started before {@code time} is still in progress
     */
    public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
        this.txMetricsCollector.rate("truncateInvalidTxBefore");
        Stopwatch timer = new Stopwatch().start();
        this.logReadLock.lock();
        try {
            boolean truncated;
            synchronized (this) {
                this.ensureAvailable();
                truncated = this.doTruncateInvalidTxBefore(time);
            }
            this.appendToLog(TransactionEdit.createTruncateInvalidTxBefore(time));
            this.txMetricsCollector.histogram("truncateInvalidTxBefore.latency", (int)timer.elapsedMillis());
            return truncated;
        }
        finally {
            this.logReadLock.unlock();
        }
    }

    /**
     * Removes from the invalid list all write pointers smaller than {@code time * 1000000}.
     *
     * <p>The operation is refused when the in-progress map contains any key strictly below that
     * bound.
     *
     * @param time time bound; it is scaled by 1,000,000 to obtain the write-pointer bound
     * @return whatever {@code invalidTxList.removeAll} reports for the selected write pointers
     * @throws InvalidTruncateTimeException if an in-progress key lies below the write-pointer bound
     */
    private boolean doTruncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
        LOG.info("Removing tx ids before {} from invalid list", (Object)time);
        long truncateWp = time * 1000000L;
        if (this.inProgress.lowerKey(truncateWp) != null) {
            throw new InvalidTruncateTimeException("Transactions started earlier than " + time + " are in-progress");
        }

        // Collect first, remove afterwards, so the list is not modified while it is being iterated.
        LongArraySet toTruncate = new LongArraySet();
        for (LongListIterator it = this.invalidTxList.toRawList().iterator(); it.hasNext(); ) {
            long invalidWp = it.nextLong();
            if (invalidWp < truncateWp) {
                toTruncate.add(invalidWp);
            }
        }
        LOG.info("Removing tx ids {} from invalid list", (Object)toTruncate);
        return this.invalidTxList.removeAll((Collection<? extends Long>)toTruncate);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a checkpoint for an in-progress transaction.
     *
     * <p>Under the log read lock and the manager's monitor, a new write pointer is obtained from
     * {@code getNextWritePointer()}, registered as a checkpoint of the parent transaction, and a new
     * {@link Transaction} is built from the original one, the new write pointer and the parent's
     * checkpoint write pointers. A checkpoint edit is then appended to the transaction log.
     *
     * @param originalTx the transaction to checkpoint
     * @return the transaction carrying the new write pointer
     * @throws TransactionNotInProgressException if the transaction id is not in the in-progress map
     */
    public Transaction checkpoint(Transaction originalTx) throws TransactionNotInProgressException {
        this.txMetricsCollector.rate("checkpoint");
        Stopwatch timer = new Stopwatch().start();
        Transaction checkpointedTx = null;
        long txId = originalTx.getTransactionId();
        long newWritePointer = 0L;
        this.logReadLock.lock();
        try {
            synchronized (this) {
                this.ensureAvailable();
                InProgressTx parentTx = (InProgressTx)this.inProgress.get(txId);
                if (parentTx == null) {
                    String template = this.invalidTxList.contains(txId)
                        ? "Transaction %d is not in progress because it was invalidated"
                        : "Transaction %d is not in progress";
                    throw new TransactionNotInProgressException(String.format(template, txId));
                }
                newWritePointer = this.getNextWritePointer();
                this.doCheckpoint(newWritePointer, txId);
                // doCheckpoint has added the new pointer to parentTx, so the array below includes it.
                long[] checkpointPointers = parentTx.getCheckpointWritePointers().toLongArray();
                checkpointedTx = new Transaction(originalTx, newWritePointer, checkpointPointers);
            }
            this.appendToLog(TransactionEdit.createCheckpoint(newWritePointer, txId));
        }
        finally {
            this.logReadLock.unlock();
        }
        this.txMetricsCollector.histogram("checkpoint.latency", (int)timer.elapsedMillis());
        return checkpointedTx;
    }

    /**
     * Registers {@code newWritePointer} as a checkpoint of the in-progress transaction stored under
     * {@code parentWritePointer}, and adds a {@code CHECKPOINT} in-progress entry for it that copies
     * the parent's visibility upper bound, expiration and client id.
     *
     * <p>The parent entry is dereferenced without a null check, so it must be present in the
     * in-progress map.
     *
     * @param newWritePointer write pointer of the new checkpoint
     * @param parentWritePointer key of the parent transaction in the in-progress map
     */
    private void doCheckpoint(long newWritePointer, long parentWritePointer) {
        InProgressTx parent = (InProgressTx)this.inProgress.get(parentWritePointer);
        parent.addCheckpointWritePointer(newWritePointer);
        long visibilityUpperBound = parent.getVisibilityUpperBound();
        long expiration = parent.getExpiration();
        String clientId = parent.getClientId();
        this.addInProgressAndAdvance(newWritePointer, visibilityUpperBound, expiration, InProgressType.CHECKPOINT, clientId);
    }

    /**
     * Returns the size of the invalid list plus the size of the in-progress map.
     *
     * <p>This method itself is not synchronized; only the invalid-list size is read through the
     * synchronized {@link #getInvalidSize()}.
     *
     * @return number of invalid plus in-progress entries
     */
    public int getExcludedListSize() {
        int invalidCount = this.getInvalidSize();
        int inProgressCount = this.inProgress.size();
        return invalidCount + inProgressCount;
    }

    /**
     * Returns the number of entries in the invalid transaction list, read while holding the
     * manager's monitor.
     *
     * @return size of the invalid list
     */
    public synchronized int getInvalidSize() {
        return invalidTxList.size();
    }

    /**
     * Returns the number of entries in the committed change sets map.
     *
     * @return size of the committed change sets
     */
    int getCommittedSize() {
        return committedChangeSets.size();
    }

    /**
     * Tells whether {@code changeIds} overlaps any committed change set whose key is strictly
     * greater than the transaction's id.
     *
     * @param tx the transaction being checked
     * @param changeIds the transaction's change set; an empty set never conflicts
     * @return {@code true} if an overlapping committed change set was found
     */
    private boolean hasConflicts(Transaction tx, Set<ChangeId> changeIds) {
        if (changeIds.isEmpty()) {
            return false;
        }
        for (Map.Entry committed : this.committedChangeSets.entrySet()) {
            // Only change sets stored under a key above this transaction's id are considered.
            boolean storedAfterTx = (Long)committed.getKey() > tx.getTransactionId();
            if (storedAfterTx && this.overlap((Set)committed.getValue(), changeIds)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether two change sets share at least one element. The smaller set is iterated and
     * each element is looked up in the other one.
     *
     * @param a first change set
     * @param b second change set
     * @return {@code true} if the sets have a common element
     */
    private boolean overlap(Set<ChangeId> a, Set<ChangeId> b) {
        Set<ChangeId> scanned;
        Set<ChangeId> probed;
        if (a.size() > b.size()) {
            scanned = b;
            probed = a;
        } else {
            scanned = a;
            probed = b;
        }
        for (ChangeId candidate : scanned) {
            if (probed.contains(candidate)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Raises {@code readPointer} to {@code committedWritePointer} when the latter is strictly
     * larger; otherwise leaves the field untouched (no write is performed).
     *
     * @param committedWritePointer candidate value for the read pointer
     */
    private void moveReadPointerIfNeeded(long committedWritePointer) {
        if (committedWritePointer <= this.readPointer) {
            return;
        }
        this.readPointer = committedWritePointer;
    }

    /**
     * Builds a {@link Transaction} from the current state: the read pointer, the sorted invalid
     * list, the keys of the in-progress map (in the map's iteration order) and the first
     * in-progress key whose entry is not long-running ({@code Long.MAX_VALUE} if there is none).
     *
     * @param writePointer write pointer of the new transaction
     * @param type type of the new transaction
     * @return the new transaction object
     */
    private Transaction createTransaction(long writePointer, TransactionType type) {
        LongArrayList inProgressIds = new LongArrayList(this.inProgress.size());
        long firstShortTx = Long.MAX_VALUE;
        for (Map.Entry entry : this.inProgress.entrySet()) {
            long txId = (Long)entry.getKey();
            inProgressIds.add(txId);
            // Remember only the first entry that is not long-running.
            if (firstShortTx == Long.MAX_VALUE && !((InProgressTx)entry.getValue()).isLongRunning()) {
                firstShortTx = txId;
            }
        }
        return new Transaction(this.readPointer, writePointer, this.invalidTxList.toSortedArray(), inProgressIds.toLongArray(), firstShortTx, type);
    }

    /**
     * Appends a single edit to the current transaction log and records the "wal.append.count" rate
     * and "wal.append.latency" histogram. An {@link IOException} from the log is not propagated;
     * it is handed to {@code abortService}.
     *
     * @param edit the edit to append
     */
    private void appendToLog(TransactionEdit edit) {
        Stopwatch timer = new Stopwatch().start();
        try {
            this.currentLog.append(edit);
            this.txMetricsCollector.rate("wal.append.count");
            int elapsedMillis = (int)timer.elapsedMillis();
            this.txMetricsCollector.histogram("wal.append.latency", elapsedMillis);
        }
        catch (IOException ioe) {
            this.abortService("Error appending to transaction log", ioe);
        }
    }

    /**
     * Appends a batch of edits to the current transaction log, bumps the "wal.append.count" rate
     * by the batch size and records the "wal.append.latency" histogram. An {@link IOException}
     * from the log is not propagated; it is handed to {@code abortService}.
     *
     * @param edits the edits to append
     */
    private void appendToLog(List<TransactionEdit> edits) {
        Stopwatch timer = new Stopwatch().start();
        try {
            this.currentLog.append(edits);
            this.txMetricsCollector.rate("wal.append.count", edits.size());
            int elapsedMillis = (int)timer.elapsedMillis();
            this.txMetricsCollector.histogram("wal.append.latency", elapsedMillis);
        }
        catch (IOException ioe) {
            this.abortService("Error appending to transaction log", ioe);
        }
    }

    /**
     * Logs, at info level, the last write pointer and the sizes of the invalid list, the
     * in-progress map, the committing change sets and the committed change sets.
     */
    public void logStatistics() {
        String statistics = "Transaction Statistics: write pointer = " + this.lastWritePointer
            + ", invalid = " + this.getInvalidSize()
            + ", in progress = " + this.inProgress.size()
            + ", committing = " + this.committingChangeSets.size()
            + ", committed = " + this.committedChangeSets.size();
        LOG.info(statistics);
    }

    /**
     * Exposes the state storage held in the {@code persistor} field; annotated as visible for tests.
     *
     * @return the transaction state storage used by this manager
     */
    @VisibleForTesting
    public TransactionStateStorage getTransactionStateStorage() {
        return persistor;
    }

    /**
     * State kept for one in-progress transaction: its visibility upper bound, expiration, type,
     * the write pointers of its checkpoints and the id of the owning client.
     *
     * <p>All fields are final, but the checkpoint list is mutable and is handed out directly by
     * {@link #getCheckpointWritePointers()}. The client id does not take part in
     * {@link #equals(Object)} or {@link #hashCode()}.
     */
    public static final class InProgressTx {
        private final long visibilityUpperBound;
        private final long expiration;
        private final InProgressType type;
        private final LongArrayList checkpointWritePointers;
        private final String clientId;

        /** Creates an entry with a client id and an empty checkpoint list. */
        public InProgressTx(String clientId, long visibilityUpperBound, long expiration, InProgressType type) {
            this(clientId, visibilityUpperBound, expiration, type, new LongArrayList());
        }

        /** Creates an entry without a client id and with an empty checkpoint list. */
        public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type) {
            this(null, visibilityUpperBound, expiration, type, new LongArrayList());
        }

        /**
         * Creates a fully specified entry. The given checkpoint list is stored as is (not copied).
         *
         * @param clientId id of the owning client, may be {@code null}
         * @param visibilityUpperBound visibility upper bound of the transaction
         * @param expiration expiration value of the transaction
         * @param type in-progress type, may be {@code null}
         * @param checkpointWritePointers list that holds the checkpoint write pointers
         */
        public InProgressTx(String clientId, long visibilityUpperBound, long expiration, InProgressType type, LongArrayList checkpointWritePointers) {
            this.clientId = clientId;
            this.visibilityUpperBound = visibilityUpperBound;
            this.expiration = expiration;
            this.type = type;
            this.checkpointWritePointers = checkpointWritePointers;
        }

        /** Creates an entry without a client id, using the given checkpoint list (not copied). */
        public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type, LongArrayList checkpointWritePointers) {
            this(null, visibilityUpperBound, expiration, type, checkpointWritePointers);
        }

        /** Creates an entry with neither type nor client id and an empty checkpoint list. */
        @Deprecated
        public InProgressTx(long visibilityUpperBound, long expiration) {
            this(visibilityUpperBound, expiration, (InProgressType)null);
        }

        public long getVisibilityUpperBound() {
            return visibilityUpperBound;
        }

        public long getExpiration() {
            return expiration;
        }

        @Nullable
        public InProgressType getType() {
            return type;
        }

        @Nullable
        public String getClientId() {
            return clientId;
        }

        /**
         * Tells whether this entry is long-running: by type when a type is set, otherwise by the
         * expiration being {@code -1}.
         */
        public boolean isLongRunning() {
            return this.type != null
                ? this.type == InProgressType.LONG
                : this.expiration == -1L;
        }

        /** Appends a checkpoint write pointer to this entry's checkpoint list. */
        public void addCheckpointWritePointer(long checkpointWritePointer) {
            checkpointWritePointers.add(checkpointWritePointer);
        }

        /** Returns the internal (live) list of checkpoint write pointers. */
        public LongArrayList getCheckpointWritePointers() {
            return checkpointWritePointers;
        }

        /** Compares visibility upper bound, expiration, type and checkpoint list; ignores the client id. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof InProgressTx)) {
                return false;
            }
            InProgressTx that = (InProgressTx)o;
            return this.visibilityUpperBound == that.visibilityUpperBound
                && this.expiration == that.expiration
                && this.type == that.type
                && Objects.equal((Object)this.checkpointWritePointers, (Object)that.checkpointWritePointers);
        }

        /** Hashes the same four fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hashCode(this.visibilityUpperBound, this.expiration, this.type, this.checkpointWritePointers);
        }

        @Override
        public String toString() {
            return Objects.toStringHelper((Object)this)
                .add("visibilityUpperBound", this.visibilityUpperBound)
                .add("expiration", this.expiration)
                .add("type", (Object)this.type)
                .add("checkpointWritePointers", (Object)this.checkpointWritePointers)
                .add("clientId", (Object)this.clientId)
                .toString();
        }
    }

    /**
     * Type of an in-progress entry. {@code SHORT} and {@code LONG} map to the corresponding
     * {@link TransactionType}; {@code CHECKPOINT} has no transaction type.
     */
    public enum InProgressType {
        SHORT(TransactionType.SHORT),
        LONG(TransactionType.LONG),
        CHECKPOINT(null);

        private final TransactionType transactionType;

        InProgressType(TransactionType transactionType) {
            this.transactionType = transactionType;
        }

        /**
         * Maps a {@link TransactionType} to its in-progress type.
         *
         * @param type the transaction type; must not be {@code null}
         * @return {@code SHORT} or {@code LONG}
         * @throws IllegalArgumentException if the type is neither {@code SHORT} nor {@code LONG}
         */
        public static InProgressType of(TransactionType type) {
            switch (type) {
                case SHORT:
                    return SHORT;
                case LONG:
                    return LONG;
                default:
                    throw new IllegalArgumentException("Unknown TransactionType " + type);
            }
        }

        /** Returns the matching transaction type, or {@code null} for {@code CHECKPOINT}. */
        @Nullable
        public TransactionType getTransactionType() {
            return transactionType;
        }
    }

    /**
     * Daemon thread that calls {@link #doRun()} repeatedly, pausing for {@link #getSleepMillis()}
     * between calls, until it is interrupted or {@link #shutdown()} is called. On exit it calls
     * {@link #onShutdown()} once.
     */
    private static abstract class DaemonThreadExecutor
    extends Thread {
        // Doubles as the stop flag and as the monitor used to pause between runs.
        private final AtomicBoolean stopped = new AtomicBoolean(false);

        public DaemonThreadExecutor(String name) {
            super(name);
            this.setDaemon(true);
        }

        /**
         * Runs the work loop. The pause between iterations is a timed wait on the {@code stopped}
         * monitor so that {@link #shutdown()} can cut it short.
         */
        @Override
        public void run() {
            try {
                while (!this.isInterrupted() && !this.stopped.get()) {
                    this.doRun();
                    synchronized (this.stopped) {
                        // Re-check the flag while holding the monitor: shutdown() sets the flag and then
                        // notifies under this same monitor, so a shutdown that happened after the loop
                        // condition was evaluated is seen here instead of being slept through.
                        if (!this.stopped.get()) {
                            this.stopped.wait(this.getSleepMillis());
                        }
                    }
                }
            }
            catch (InterruptedException ie) {
                // The interrupt flag is intentionally left cleared: onShutdown() still runs on this thread.
                LOG.info("Interrupted thread " + this.getName());
            }
            this.onShutdown();
            LOG.info("Exiting thread " + this.getName());
        }

        /** One unit of periodic work. */
        public abstract void doRun();

        /** Pause between two {@link #doRun()} calls, in milliseconds. */
        protected abstract long getSleepMillis();

        /** Hook called once on the worker thread after the loop has ended; does nothing by default. */
        protected void onShutdown() {
        }

        /**
         * Requests the loop to stop and wakes the thread if it is pausing. Only the first call has
         * an effect.
         */
        public void shutdown() {
            if (this.stopped.compareAndSet(false, true)) {
                synchronized (this.stopped) {
                    this.stopped.notifyAll();
                }
            }
        }
    }
}

