/*
 * Decompiled with CFR 0.152.
 */
package net.opentsdb.core;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.opentsdb.core.AppendDataPoints;
import net.opentsdb.core.ByteBufferList;
import net.opentsdb.core.ColumnDatapointIterator;
import net.opentsdb.core.Const;
import net.opentsdb.core.IllegalDataException;
import net.opentsdb.core.Internal;
import net.opentsdb.core.TSDB;
import net.opentsdb.meta.Annotation;
import net.opentsdb.stats.StatsCollector;
import net.opentsdb.utils.JSON;
import org.hbase.async.Bytes;
import org.hbase.async.HBaseRpc;
import org.hbase.async.KeyValue;
import org.hbase.async.PleaseThrottleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class CompactionQueue
extends ConcurrentSkipListMap<byte[], Boolean> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionQueue.class);
    private final AtomicInteger size = new AtomicInteger();
    private final AtomicLong duplicates_different = new AtomicLong();
    private final AtomicLong duplicates_same = new AtomicLong();
    private final AtomicLong compaction_count = new AtomicLong();
    private final AtomicLong written_cells = new AtomicLong();
    private final AtomicLong deleted_cells = new AtomicLong();
    private final TSDB tsdb;
    private final short metric_width;
    private final int flush_interval;
    private final int min_flush_threshold;
    private final int max_concurrent_flushes;
    private final int flush_speed;
    private final CompactCB compactcb = new CompactCB();
    private final HandleErrorCB handle_read_error = new HandleErrorCB("read");
    private final HandleErrorCB handle_write_error = new HandleErrorCB("write");
    private final HandleErrorCB handle_delete_error = new HandleErrorCB("delete");
    static final long serialVersionUID = 1307386642L;

    public CompactionQueue(TSDB tsdb) {
        super(new Cmp(tsdb));
        this.tsdb = tsdb;
        this.metric_width = tsdb.metrics.width();
        this.flush_interval = tsdb.config.getInt("tsd.storage.compaction.flush_interval");
        this.min_flush_threshold = tsdb.config.getInt("tsd.storage.compaction.min_flush_threshold");
        this.max_concurrent_flushes = tsdb.config.getInt("tsd.storage.compaction.max_concurrent_flushes");
        this.flush_speed = tsdb.config.getInt("tsd.storage.compaction.flush_speed");
        if (tsdb.config.enable_compactions()) {
            this.startCompactionThread();
        }
    }

    @Override
    public int size() {
        return this.size.get();
    }

    public void add(byte[] row) {
        if (super.put(row, Boolean.TRUE) == null) {
            this.size.incrementAndGet();
        }
    }

    public Deferred<ArrayList<Object>> flush() {
        int size = this.size();
        if (size > 0) {
            LOG.info("Flushing all old outstanding rows out of " + size + " rows");
        }
        long now = System.currentTimeMillis();
        return this.flush(now / 1000L - 3600L - 1L, Integer.MAX_VALUE);
    }

    void collectStats(StatsCollector collector) {
        collector.record("compaction.count", this.compaction_count);
        collector.record("compaction.duplicates", this.duplicates_same, "type=identical");
        collector.record("compaction.duplicates", this.duplicates_different, "type=variant");
        if (!this.tsdb.config.enable_compactions()) {
            return;
        }
        collector.record("compaction.queue.size", this.size);
        collector.record("compaction.errors", this.handle_read_error.errors, "rpc=read");
        collector.record("compaction.errors", this.handle_write_error.errors, "rpc=put");
        collector.record("compaction.errors", this.handle_delete_error.errors, "rpc=delete");
        collector.record("compaction.writes", this.written_cells);
        collector.record("compaction.deletes", this.deleted_cells);
    }

    private Deferred<ArrayList<Object>> flush(final long cut_off, int maxflushes) {
        assert (maxflushes > 0) : "maxflushes must be > 0, but I got " + maxflushes;
        if ((maxflushes = Math.min(maxflushes, this.size())) == 0) {
            return Deferred.fromResult(new ArrayList(0));
        }
        ArrayList<Deferred> ds = new ArrayList<Deferred>(Math.min(maxflushes, this.max_concurrent_flushes));
        int nflushes = 0;
        int seed = (int)(System.nanoTime() % 3L);
        for (byte[] row : this.keySet()) {
            if (maxflushes == 0) break;
            if (seed == row.hashCode() % 3) continue;
            long base_time = Bytes.getUnsignedInt((byte[])row, (int)(Const.SALT_WIDTH() + this.metric_width));
            if (base_time > cut_off || nflushes == this.max_concurrent_flushes) break;
            if (super.remove(row) == null) continue;
            ++nflushes;
            --maxflushes;
            this.size.decrementAndGet();
            ds.add(this.tsdb.get(row).addCallbacks((Callback)this.compactcb, (Callback)this.handle_read_error));
        }
        Deferred group = Deferred.group(ds);
        if (nflushes == this.max_concurrent_flushes && maxflushes > 0) {
            this.tsdb.getClient().flush();
            final int maxflushez = maxflushes;
            final class FlushMoreCB
            implements Callback<Deferred<ArrayList<Object>>, ArrayList<Object>> {
                FlushMoreCB() {
                }

                public Deferred<ArrayList<Object>> call(ArrayList<Object> arg) {
                    return CompactionQueue.this.flush(cut_off, maxflushez);
                }

                public String toString() {
                    return "Continue flushing with cut_off=" + cut_off + ", maxflushes=" + maxflushez;
                }
            }
            group.addCallbackDeferring((Callback)new FlushMoreCB());
        }
        return group;
    }

    KeyValue compact(ArrayList<KeyValue> row, List<Annotation> annotations) {
        KeyValue[] compacted = new KeyValue[]{null};
        this.compact(row, compacted, annotations);
        return compacted[0];
    }

    protected static boolean isDatapoint(KeyValue kv) {
        return (kv.qualifier().length & 1) == 0;
    }

    Deferred<Object> compact(ArrayList<KeyValue> row, KeyValue[] compacted, List<Annotation> annotations) {
        return new Compaction(row, compacted, annotations).compact();
    }

    private void startCompactionThread() {
        Thrd thread = new Thrd();
        thread.setDaemon(true);
        thread.start();
    }

    private static final class Cmp
    implements Comparator<byte[]> {
        private final short timestamp_pos;

        public Cmp(TSDB tsdb) {
            this.timestamp_pos = (short)(Const.SALT_WIDTH() + tsdb.metrics.width());
        }

        @Override
        public int compare(byte[] a, byte[] b) {
            int c = Bytes.memcmp((byte[])a, (byte[])b, (int)this.timestamp_pos, (int)4);
            return c != 0 ? c : Bytes.memcmp((byte[])a, (byte[])b);
        }
    }

    final class Thrd
    extends Thread {
        public Thrd() {
            super("CompactionThread");
        }

        @Override
        public void run() {
            while (true) {
                try {
                    int size = CompactionQueue.this.size();
                    if (size > CompactionQueue.this.min_flush_threshold) {
                        int maxflushes = Math.max(CompactionQueue.this.min_flush_threshold, size * CompactionQueue.this.flush_interval * CompactionQueue.this.flush_speed / 3600);
                        long now = System.currentTimeMillis();
                        CompactionQueue.this.flush(now / 1000L - 3600L - 1L, maxflushes);
                        if (LOG.isDebugEnabled()) {
                            int newsize = CompactionQueue.this.size();
                            LOG.debug("flush() took " + (System.currentTimeMillis() - now) + "ms, new queue size=" + newsize + " (" + (newsize - size) + ')');
                        }
                    }
                }
                catch (Exception e) {
                    LOG.error("Uncaught exception in compaction thread", (Throwable)e);
                }
                catch (OutOfMemoryError e) {
                    int sz = CompactionQueue.this.size.get();
                    CompactionQueue.super.clear();
                    CompactionQueue.this.size.set(0);
                    LOG.error("Discarded the compaction queue, size=" + sz, (Throwable)e);
                }
                catch (Throwable e) {
                    LOG.error("Uncaught *Throwable* in compaction thread", e);
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException i) {
                        LOG.error("Compaction thread interrupted in error handling", (Throwable)i);
                        return;
                    }
                    CompactionQueue.this.startCompactionThread();
                    return;
                }
                try {
                    Thread.sleep(CompactionQueue.this.flush_interval * 1000);
                }
                catch (InterruptedException e) {
                    LOG.error("Compaction thread interrupted, doing one last flush", (Throwable)e);
                    CompactionQueue.this.flush();
                    return;
                }
            }
        }
    }

    private final class HandleErrorCB
    implements Callback<Object, Exception> {
        private volatile int errors;
        private final String what;

        public HandleErrorCB(String what) {
            this.what = what;
        }

        public Object call(Exception e) {
            if (e instanceof PleaseThrottleException) {
                HBaseRpc rpc = ((PleaseThrottleException)e).getFailedRpc();
                if (rpc instanceof HBaseRpc.HasKey) {
                    CompactionQueue.this.add(((HBaseRpc.HasKey)rpc).key());
                    return Boolean.TRUE;
                }
                LOG.error("WTF?  Cannot retry this RPC, and this shouldn't happen: " + rpc);
            }
            if (++this.errors % 100 == 1) {
                LOG.error("Failed to " + this.what + " a row to re-compact", (Throwable)e);
            }
            return e;
        }

        public String toString() {
            return "handle " + this.what + " error";
        }
    }

    private final class DeleteCompactedCB
    implements Callback<Object, Object> {
        private final byte[] key;
        private final byte[][] qualifiers;

        public DeleteCompactedCB(List<KeyValue> cells) {
            KeyValue first = cells.get(0);
            this.key = first.key();
            this.qualifiers = new byte[cells.size()][];
            for (int i = 0; i < this.qualifiers.length; ++i) {
                this.qualifiers[i] = cells.get(i).qualifier();
            }
        }

        public Object call(Object arg) {
            return CompactionQueue.this.tsdb.delete(this.key, this.qualifiers).addErrback((Callback)CompactionQueue.this.handle_delete_error);
        }

        public String toString() {
            return "delete compacted cells";
        }
    }

    private class Compaction {
        private final ArrayList<KeyValue> row;
        private final KeyValue[] compacted;
        private final List<Annotation> annotations;
        private final int nkvs;
        private final List<KeyValue> to_delete;
        private PriorityQueue<ColumnDatapointIterator> heap;
        private boolean ms_in_row;
        private boolean s_in_row;
        private KeyValue longest;
        private KeyValue last_append_column;

        public Compaction(ArrayList<KeyValue> row, KeyValue[] compacted, List<Annotation> annotations) {
            this.nkvs = row.size();
            this.row = row;
            this.compacted = compacted;
            this.annotations = annotations;
            this.to_delete = new ArrayList<KeyValue>(this.nkvs);
        }

        private boolean noMergesOrFixups() {
            switch (this.heap.size()) {
                case 0: {
                    return true;
                }
                case 1: {
                    ColumnDatapointIterator col = this.heap.peek();
                    return (col.qualifier.length == 2 || col.qualifier.length == 4 && Internal.inMilliseconds(col.qualifier)) && !col.needsFixup();
                }
            }
            return false;
        }

        public Deferred<Object> compact() {
            if (this.nkvs == 0) {
                return null;
            }
            this.heap = new PriorityQueue(this.nkvs);
            int tot_values = this.buildHeapProcessAnnotations();
            if (this.noMergesOrFixups()) {
                if (this.compacted != null && this.heap.size() == 1) {
                    this.compacted[0] = this.findFirstDatapointColumn();
                }
                return null;
            }
            ByteBufferList compacted_qual = new ByteBufferList(tot_values);
            ByteBufferList compacted_val = new ByteBufferList(tot_values);
            CompactionQueue.this.compaction_count.incrementAndGet();
            this.mergeDatapoints(compacted_qual, compacted_val);
            if (compacted_qual.segmentCount() == 0) {
                return null;
            }
            KeyValue compact = this.buildCompactedColumn(compacted_qual, compacted_val);
            boolean write = this.updateDeletesCheckForWrite(compact);
            if (this.compacted != null) {
                long cut_off;
                this.compacted[0] = compact;
                long base_time = Bytes.getUnsignedInt((byte[])compact.key(), (int)(Const.SALT_WIDTH() + CompactionQueue.this.metric_width));
                if (base_time > (cut_off = System.currentTimeMillis() / 1000L - 3600L - 1L)) {
                    return null;
                }
            }
            if (!((CompactionQueue)CompactionQueue.this).tsdb.config.enable_compactions() || !write && this.to_delete.isEmpty()) {
                return null;
            }
            byte[] key = compact.key();
            CompactionQueue.this.deleted_cells.addAndGet(this.to_delete.size());
            if (write) {
                CompactionQueue.this.written_cells.incrementAndGet();
                Deferred deferred = CompactionQueue.this.tsdb.put(key, compact.qualifier(), compact.value());
                if (!this.to_delete.isEmpty()) {
                    deferred = deferred.addCallbacks((Callback)new DeleteCompactedCB(this.to_delete), (Callback)CompactionQueue.this.handle_write_error);
                }
                return deferred;
            }
            if (this.last_append_column == null) {
                new DeleteCompactedCB(this.to_delete).call(null);
                return null;
            }
            return null;
        }

        private KeyValue findFirstDatapointColumn() {
            if (this.last_append_column != null) {
                return this.last_append_column;
            }
            for (KeyValue kv : this.row) {
                if (!CompactionQueue.isDatapoint(kv)) continue;
                return kv;
            }
            return null;
        }

        private int buildHeapProcessAnnotations() {
            int tot_values = 0;
            for (KeyValue kv : this.row) {
                ColumnDatapointIterator col;
                byte[] qual = kv.qualifier();
                int len = qual.length;
                if ((len & 1) != 0) {
                    if (qual[0] == Annotation.PREFIX()) {
                        this.annotations.add(JSON.parseToObject(kv.value(), Annotation.class));
                        continue;
                    }
                    if (qual[0] == 5) {
                        AppendDataPoints adp = new AppendDataPoints();
                        tot_values += adp.parseKeyValue(CompactionQueue.this.tsdb, kv).size();
                        this.last_append_column = new KeyValue(kv.key(), kv.family(), adp.qualifier(), kv.timestamp(), adp.value());
                        if (this.longest == null || this.longest.qualifier().length < this.last_append_column.qualifier().length) {
                            this.longest = this.last_append_column;
                        }
                        if (!(col = new ColumnDatapointIterator(this.last_append_column)).hasMoreData()) continue;
                        this.heap.add(col);
                        continue;
                    }
                    LOG.warn("Ignoring unexpected extended format type " + qual[0]);
                    continue;
                }
                int entry_size = Internal.inMilliseconds(qual) ? 4 : 2;
                tot_values += (len + entry_size - 1) / entry_size;
                if (this.longest == null || this.longest.qualifier().length < kv.qualifier().length) {
                    this.longest = kv;
                }
                if ((col = new ColumnDatapointIterator(kv)).hasMoreData()) {
                    this.heap.add(col);
                }
                this.to_delete.add(kv);
            }
            return tot_values;
        }

        private void mergeDatapoints(ByteBufferList compacted_qual, ByteBufferList compacted_val) {
            int prevTs = -1;
            while (!this.heap.isEmpty()) {
                ColumnDatapointIterator col = (ColumnDatapointIterator)this.heap.remove();
                int ts = col.getTimestampOffsetMs();
                if (ts == prevTs) {
                    byte[] discardedVal;
                    byte[] existingVal = compacted_val.getLastSegment();
                    if (!Arrays.equals(existingVal, discardedVal = col.getCopyOfCurrentValue())) {
                        CompactionQueue.this.duplicates_different.incrementAndGet();
                        if (!((CompactionQueue)CompactionQueue.this).tsdb.config.fix_duplicates()) {
                            throw new IllegalDataException("Duplicate timestamp for key=" + Arrays.toString(this.row.get(0).key()) + ", ms_offset=" + ts + ", older=" + Arrays.toString(existingVal) + ", newer=" + Arrays.toString(discardedVal) + "; set tsd.storage.fix_duplicates=true to fix automatically or run Fsck");
                        }
                        LOG.warn("Duplicate timestamp for key=" + Arrays.toString(this.row.get(0).key()) + ", ms_offset=" + ts + ", kept=" + Arrays.toString(existingVal) + ", discarded=" + Arrays.toString(discardedVal));
                    } else {
                        CompactionQueue.this.duplicates_same.incrementAndGet();
                    }
                } else {
                    prevTs = ts;
                    col.writeToBuffers(compacted_qual, compacted_val);
                    this.ms_in_row |= col.isMilliseconds();
                    this.s_in_row |= !col.isMilliseconds();
                }
                if (!col.advance()) continue;
                this.heap.add(col);
            }
        }

        private KeyValue buildCompactedColumn(ByteBufferList compacted_qual, ByteBufferList compacted_val) {
            int metadata_length = compacted_val.segmentCount() > 1 ? 1 : 0;
            byte[] cq = compacted_qual.toBytes(0);
            byte[] cv = compacted_val.toBytes(metadata_length);
            if (metadata_length > 0) {
                byte metadata_flag = 0;
                if (this.ms_in_row && this.s_in_row) {
                    metadata_flag = (byte)(metadata_flag | 1);
                }
                cv[cv.length - 1] = metadata_flag;
            }
            KeyValue first = this.row.get(0);
            return new KeyValue(first.key(), first.family(), cq, cv);
        }

        private boolean updateDeletesCheckForWrite(KeyValue compact) {
            if (this.last_append_column != null) {
                return false;
            }
            if (this.longest != null && this.longest.qualifier().length >= compact.qualifier().length) {
                Iterator<KeyValue> deleteIterator = this.to_delete.iterator();
                while (deleteIterator.hasNext()) {
                    KeyValue cur = deleteIterator.next();
                    if (!Arrays.equals(cur.qualifier(), compact.qualifier())) continue;
                    deleteIterator.remove();
                    return !Arrays.equals(cur.value(), compact.value());
                }
            }
            return true;
        }
    }

    private final class CompactCB
    implements Callback<Object, ArrayList<KeyValue>> {
        private CompactCB() {
        }

        public Object call(ArrayList<KeyValue> row) {
            return CompactionQueue.this.compact(row, null);
        }

        public String toString() {
            return "compact";
        }
    }
}

