/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spectator.atlas;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.netflix.spectator.api.AbstractRegistry;
import com.netflix.spectator.api.Clock;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.DistributionSummary;
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Measurement;
import com.netflix.spectator.api.Meter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.RegistryConfig;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.atlas.AtlasConfig;
import com.netflix.spectator.atlas.AtlasCounter;
import com.netflix.spectator.atlas.AtlasDistributionSummary;
import com.netflix.spectator.atlas.AtlasGauge;
import com.netflix.spectator.atlas.AtlasMaxGauge;
import com.netflix.spectator.atlas.AtlasTimer;
import com.netflix.spectator.atlas.OverridableClock;
import com.netflix.spectator.atlas.RollupPolicy;
import com.netflix.spectator.atlas.StepClock;
import com.netflix.spectator.atlas.SubscriptionManager;
import com.netflix.spectator.atlas.impl.Consolidator;
import com.netflix.spectator.atlas.impl.EvalPayload;
import com.netflix.spectator.atlas.impl.Evaluator;
import com.netflix.spectator.atlas.impl.MeasurementSerializer;
import com.netflix.spectator.atlas.impl.PublishPayload;
import com.netflix.spectator.impl.AsciiSet;
import com.netflix.spectator.impl.Scheduler;
import com.netflix.spectator.ipc.http.HttpClient;
import com.netflix.spectator.ipc.http.HttpResponse;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
public final class AtlasRegistry
extends AbstractRegistry
implements AutoCloseable {
    /** Name of the timer that {@code recordClockSkew} records into. */
    private static final String CLOCK_SKEW_TIMER = "spectator.atlas.clockSkew";
    /** Name of the timer returned by {@code publishTaskTimer}; an "id" tag distinguishes the tasks. */
    private static final String PUBLISH_TASK_TIMER = "spectator.atlas.publishTaskTime";
    /** {@code StepClock} built from the supplied clock and the LWC step; handed to gauges in {@code newGauge}. */
    private final Clock stepClock;
    /** Configuration; kept so {@code enabled()} and {@code lwcEnabled()} are consulted on every run. */
    private final AtlasConfig config;
    /** Interval at which {@code sendToAtlas} is scheduled. */
    private final Duration step;
    /** {@link #step} in milliseconds. */
    private final long stepMillis;
    /** {@code config.meterTTL()} in milliseconds; passed to every meter created by this registry. */
    private final long meterTTL;
    /** Endpoint that {@code sendBatch} posts publish payloads to. */
    private final URI uri;
    /** Interval at which {@code sendToLWC} is scheduled; validated in the constructor against {@link #step}. */
    private final Duration lwcStep;
    /** {@link #lwcStep} in milliseconds. */
    private final long lwcStepMillis;
    /** Delay between runs of {@code fetchSubscriptions}. */
    private final Duration configRefreshFrequency;
    /** Endpoint that {@code sendToLWC} posts evaluation payloads to. */
    private final URI evalUri;
    /** Connect timeout in milliseconds applied to outgoing HTTP requests. */
    private final int connectTimeout;
    /** Read timeout in milliseconds applied to outgoing HTTP requests. */
    private final int readTimeout;
    /** Maximum number of measurements per batch produced by {@code getBatches}. */
    private final int batchSize;
    /** Size of the sender pool; also passed as the thread count when creating the scheduler. */
    private final int numThreads;
    /** Sorted copy ({@code TreeMap}) of {@code config.commonTags()}, taken at construction time. */
    private final Map<String, String> commonTags;
    /** Characters allowed in tag keys, values and names; others are replaced with '_' in {@code toMap}. */
    private final AsciiSet charset;
    /** JSON mapper with the measurement serializer registered; used for trace logging and LWC payloads. */
    private final ObjectMapper jsonMapper;
    /** Smile mapper with the measurement serializer registered; used for publish payloads. */
    private final ObjectMapper smileMapper;
    /** {@code config.debugRegistry()} if non-null, otherwise this registry; receives the internal timers and summaries. */
    private final Registry debugRegistry;
    /** Applied to the consolidated measurements in {@code getBatches} before they are split into batches. */
    private final RollupPolicy rollupPolicy;
    /** HTTP client created against {@link #debugRegistry}. */
    private final HttpClient client;
    /** Created in {@code start()} and reset to {@code null} in {@code stop()}; non-null means "started". */
    private Scheduler scheduler;
    /** Pool that runs {@code sendBatch} tasks; created in {@code start()}, shut down and nulled in {@code stop()}. */
    private ExecutorService senderPool;
    /** Refreshed by {@code fetchSubscriptions}; its subscriptions are synced into {@link #evaluator}. */
    private final SubscriptionManager subManager;
    /** Updated with every polled measurement and evaluated in {@code sendToLWC}. */
    private final Evaluator evaluator;
    /** Timestamp of the last poll that collected data; read and written only inside synchronized {@code pollMeters}. */
    private long lastPollTimestamp = -1L;
    /** Per-id consolidators; accessed only from the synchronized methods {@code pollMeters} and {@code getBatches}. */
    private final Map<Id, Consolidator> atlasMeasurements = new HashMap<Id, Consolidator>();

    /**
     * Creates a registry that reads all of its settings from {@code config} and, when
     * {@code config.autoStart()} is true, starts the background tasks immediately.
     *
     * @param clock  underlying clock; wrapped in an {@code OverridableClock} for the registry itself
     *               and in a {@code StepClock} (LWC step) for gauges
     * @param config source of step sizes, URIs, timeouts, batch size, thread count, common tags,
     *               valid tag characters, optional debug registry and rollup policy
     * @throws IllegalArgumentException if {@code lwcStep} is not positive, is larger than
     *                                  {@code step}, or does not evenly divide {@code step}
     */
    @Inject
    public AtlasRegistry(Clock clock, AtlasConfig config) {
        super(new OverridableClock(clock), config);
        this.config = config;

        // Read each step once so the step clock and the validated value cannot disagree.
        this.step = config.step();
        this.stepMillis = this.step.toMillis();
        this.lwcStep = config.lwcStep();
        this.lwcStepMillis = this.lwcStep.toMillis();
        this.stepClock = new StepClock(clock, this.lwcStepMillis);
        this.meterTTL = config.meterTTL().toMillis();
        this.uri = URI.create(config.uri());

        // A zero LWC step would otherwise surface as "ArithmeticException: / by zero" from the
        // modulo check below, which says nothing about the misconfigured setting.
        if (this.lwcStepMillis <= 0L) {
            throw new IllegalArgumentException("lwcStep must be at least 1ms (" + this.lwcStep + ")");
        }
        if (this.lwcStepMillis > this.stepMillis) {
            throw new IllegalArgumentException("lwcStep cannot be larger than step (" + this.lwcStep + " > " + this.step + ")");
        }
        if (this.stepMillis % this.lwcStepMillis != 0L) {
            throw new IllegalArgumentException("step is not an even multiple of lwcStep (" + this.step + " % " + this.lwcStep + " != 0)");
        }

        this.configRefreshFrequency = config.configRefreshFrequency();
        this.evalUri = URI.create(config.evalUri());
        this.connectTimeout = (int) config.connectTimeout().toMillis();
        this.readTimeout = (int) config.readTimeout().toMillis();
        this.batchSize = config.batchSize();
        this.numThreads = config.numThreads();
        this.commonTags = new TreeMap<>(config.commonTags());
        this.charset = AsciiSet.fromPattern(config.validTagCharacters());

        // The same module (and therefore the same measurement serializer) backs both mappers.
        SimpleModule module = new SimpleModule().addSerializer(Measurement.class, (JsonSerializer) new MeasurementSerializer(this.charset));
        this.jsonMapper = new ObjectMapper(new JsonFactory()).registerModule(module);
        this.smileMapper = new ObjectMapper(new SmileFactory()).registerModule(module);

        // Fall back to this registry when no dedicated debug registry is configured.
        this.debugRegistry = Optional.ofNullable(config.debugRegistry()).orElse(this);
        this.rollupPolicy = config.rollupPolicy();
        this.client = HttpClient.create(this.debugRegistry);
        this.subManager = new SubscriptionManager(this.jsonMapper, this.client, clock, config);
        this.evaluator = new Evaluator(this.commonTags, this::toMap, this.lwcStepMillis);

        if (config.autoStart()) {
            this.start();
        }
    }

    /**
     * Starts the sender pool and schedules the three background tasks: publishing to Atlas every
     * {@code step}, sending to LWC every {@code lwcStep}, and refreshing subscriptions with a fixed
     * delay of {@code configRefreshFrequency}. A second call while already started only logs a warning.
     */
    public void start() {
        if (this.scheduler != null) {
            this.logger.warn("registry already started, ignoring duplicate request");
            return;
        }
        this.logger.info("common tags: {}", this.commonTags);

        // Daemon threads named spectator-atlas-publish-N, numbered from 0.
        final AtomicInteger nextThreadId = new AtomicInteger();
        final ThreadFactory daemonFactory = task -> {
            Thread worker = new Thread(task, "spectator-atlas-publish-" + nextThreadId.getAndIncrement());
            worker.setDaemon(true);
            return worker;
        };
        this.senderPool = Executors.newFixedThreadPool(this.numThreads, daemonFactory);

        final Scheduler.Options publishOptions = this.fixedRateOptions(this.step, this.stepMillis);
        this.scheduler = new Scheduler(this.debugRegistry, "spectator-reg-atlas", this.numThreads);
        this.scheduler.schedule(publishOptions, this::sendToAtlas);
        this.logger.info("started collecting metrics every {} reporting to {}", this.step, this.uri);

        final Scheduler.Options lwcOptions = this.fixedRateOptions(this.lwcStep, this.lwcStepMillis);
        this.scheduler.schedule(lwcOptions, this::sendToLWC);

        final Scheduler.Options refreshOptions = new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_DELAY, this.configRefreshFrequency)
            .withStopOnFailure(false);
        this.scheduler.schedule(refreshOptions, this::fetchSubscriptions);
    }

    /** Builds options for a skip-if-long fixed-rate task whose first run is offset via {@code getInitialDelay}. */
    private Scheduler.Options fixedRateOptions(Duration frequency, long frequencyMillis) {
        return new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_RATE_SKIP_IF_LONG, frequency)
            .withInitialDelay(Duration.ofMillis(this.getInitialDelay(frequencyMillis)))
            .withStopOnFailure(false);
    }

    /**
     * Computes how long to wait before the first run of a periodic task so that it fires one
     * tenth of a step after a step boundary.
     *
     * @param stepSize step size in milliseconds
     * @return milliseconds from the current wall time until the next such offset; if the offset in
     *         the current step has already been reached, the one in the following step is used
     */
    long getInitialDelay(long stepSize) {
        final long now = this.clock().wallTime();
        final long boundary = now - now % stepSize;
        final long target = boundary + stepSize / 10L;
        if (target <= now) {
            // Already at or past the offset for this step; aim for the next one.
            return target + stepSize - now;
        }
        return target - now;
    }

    /**
     * Stops the scheduler, makes a best-effort final flush to Atlas, and shuts down the sender pool.
     *
     * <p>The flush advances the overridable clock to the end of the current LWC step, polls the
     * meters, then advances it to the end of the current publish step and calls
     * {@link #sendToAtlas()}. Any exception from the flush is logged and swallowed so shutdown
     * always completes.
     */
    public void stop() {
        if (this.scheduler != null) {
            this.scheduler.shutdown();
            this.scheduler = null;
            // step is a Duration; its text form already carries the unit, so no "ms" suffix.
            this.logger.info("stopped collecting metrics every {} reporting to {}", this.step, this.uri);
        } else {
            this.logger.warn("registry stopped, but was never started");
        }

        try {
            this.logger.info("flushing data for final interval to Atlas");
            OverridableClock overridableClock = (OverridableClock) this.clock();
            // Capture the time once so both boundaries are derived from the same instant.
            long now = this.clock().wallTime();
            overridableClock.setWallTime(now / this.lwcStepMillis * this.lwcStepMillis + this.lwcStepMillis);
            this.pollMeters(overridableClock.wallTime());
            overridableClock.setWallTime(now / this.stepMillis * this.stepMillis + this.stepMillis);
            this.sendToAtlas();
            // NOTE(review): the overridden wall time is not restored afterwards — confirm whether
            // the registry is expected to be usable again after stop().
        } catch (Exception e) {
            this.logger.warn("failed to flush data to Atlas", e);
        } finally {
            // Release the sender threads even if the flush fails with something other than Exception.
            if (this.senderPool != null) {
                this.senderPool.shutdown();
                this.senderPool = null;
            }
        }
    }

    /** Closes the registry by delegating to {@link #stop()}. */
    @Override
    public void close() {
        stop();
    }

    /**
     * Rounds the current wall time down to a multiple of {@code s}.
     *
     * @param s step size in milliseconds
     * @return the largest multiple of {@code s} that is not after the current wall time
     *         (for non-negative wall times)
     */
    private long lastCompletedTimestamp(long s) {
        final long wallTime = this.clock().wallTime();
        return wallTime - wallTime % s;
    }

    /**
     * Looks up the publish-task timer for one task in the debug registry.
     *
     * @param id value of the "id" tag that identifies the task being timed
     * @return the timer named {@code PUBLISH_TASK_TIMER} carrying that tag
     */
    private Timer publishTaskTimer(String id) {
        final String[] tags = {"id", id};
        return this.debugRegistry.timer(PUBLISH_TASK_TIMER, tags);
    }

    /**
     * Posts one batch of measurements to the publish URI as a compressed Smile payload and records
     * the clock skew derived from the response's "Date" header. Failures are logged, not thrown.
     *
     * @param batch measurements to send together with the common tags
     */
    private void sendBatch(List<Measurement> batch) {
        this.publishTaskTimer("sendBatch").record(() -> {
            try {
                final PublishPayload payload = new PublishPayload(this.commonTags, batch);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("publish payload: {}", this.jsonMapper.writeValueAsString(payload));
                }
                final HttpResponse response = this.client.post(this.uri)
                    .withConnectTimeout(this.connectTimeout)
                    .withReadTimeout(this.readTimeout)
                    .withContent("application/x-jackson-smile", this.smileMapper.writeValueAsBytes(payload))
                    .compress(1)
                    .send();
                // 0 tells recordClockSkew that the response carried no usable Date header.
                final Instant serverDate = response.dateHeader("Date");
                final long serverMillis = serverDate == null ? 0L : serverDate.toEpochMilli();
                this.recordClockSkew(serverMillis);
            } catch (Exception e) {
                this.logger.warn("failed to send metrics (uri={})", this.uri, e);
            }
        });
    }

    /**
     * Publishes the data for the last completed step to Atlas, then removes expired meters.
     *
     * <p>When publishing is enabled the meters are polled, the consolidated values are split into
     * batches, and every batch is sent via {@code sendBatch}; the method returns only after all
     * batches have been processed. Expired meters are removed whether or not publishing is enabled.
     */
    void sendToAtlas() {
        this.publishTaskTimer("sendToAtlas").record(() -> {
            if (this.config.enabled()) {
                long t = this.lastCompletedTimestamp(this.stepMillis);
                this.pollMeters(t);
                this.logger.debug("sending to Atlas for time: {}", t);

                // Snapshot the pool: the field is null until start() and is cleared by stop().
                // CompletableFuture.runAsync rejects a null executor with a NullPointerException,
                // so without a pool the batches are sent on the calling thread instead.
                final ExecutorService pool = this.senderPool;
                List<CompletableFuture<Void>> futures = new ArrayList<>();
                for (List<Measurement> batch : this.getBatches(t)) {
                    if (pool == null) {
                        this.sendBatch(batch);
                    } else {
                        futures.add(CompletableFuture.runAsync(() -> this.sendBatch(batch), pool));
                    }
                }
                // Wait for every asynchronous send before the task is considered finished.
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            } else {
                this.logger.debug("publishing is disabled, skipping collection");
            }
            this.removeExpiredMeters();
        });
    }

    /**
     * Polls the meters for the last completed LWC step and, when LWC is enabled, evaluates the
     * subscriptions and posts the result as JSON to the eval URI. Nothing is posted when the
     * payload has no metrics. Failures while evaluating or sending are logged, not thrown.
     */
    void sendToLWC() {
        this.publishTaskTimer("sendToLWC").record(() -> {
            final long timestamp = this.lastCompletedTimestamp(this.lwcStepMillis);
            // Polling happens even when LWC is disabled.
            this.pollMeters(timestamp);

            if (!this.config.lwcEnabled()) {
                this.logger.debug("lwc is disabled, skipping subscriptions");
                return;
            }

            this.logger.debug("sending to LWC for time: {}", timestamp);
            try {
                final EvalPayload payload = this.evaluator.eval(timestamp);
                if (payload.getMetrics().isEmpty()) {
                    return;
                }
                final String json = this.jsonMapper.writeValueAsString(payload);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("eval payload: {}", json);
                }
                this.client.post(this.evalUri)
                    .withConnectTimeout(this.connectTimeout)
                    .withReadTimeout(this.readTimeout)
                    .withJsonContent(json)
                    .send();
            } catch (Exception e) {
                this.logger.warn("failed to send metrics for subscriptions (uri={})", this.evalUri, e);
            }
        });
    }

    /**
     * Collects the current measurements from all non-expired meters and feeds them into the
     * per-id consolidators and the evaluator.
     *
     * <p>The call is a no-op unless {@code t} is later than the timestamp of the previous
     * successful poll, so the two scheduled tasks can both call it without double counting.
     * Measurements whose value is NaN are dropped.
     *
     * @param t timestamp of the interval being collected
     */
    synchronized void pollMeters(long t) {
        this.publishTaskTimer("pollMeters").record(() -> {
            if (t > this.lastPollTimestamp) {
                this.logger.debug("collecting measurements for time: {}", t);
                final List<Measurement> measurements = new ArrayList<>(this.atlasMeasurements.size());
                this.publishTaskTimer("pollMeasurements").record(() -> {
                    Iterator<Meter> meters = this.iterator();
                    while (meters.hasNext()) {
                        Meter meter = meters.next();
                        if (meter.hasExpired()) {
                            continue;
                        }
                        for (Measurement m : meter.measure()) {
                            if (!Double.isNaN(m.value())) {
                                measurements.add(m);
                            }
                        }
                    }
                });

                this.logger.debug("updating evaluator for time: {}", t);
                // Number of LWC steps per publish step; the same for every consolidator.
                final int multiple = (int) (this.stepMillis / this.lwcStepMillis);
                for (Measurement m : measurements) {
                    Consolidator consolidator = this.atlasMeasurements.computeIfAbsent(
                        m.id(), id -> Consolidator.create(id, this.stepMillis, multiple));
                    consolidator.update(m);
                    this.evaluator.update(m);
                }
                this.lastPollTimestamp = t;
            }
        });
    }

    /**
     * Removes expired meters by delegating to the superclass implementation.
     *
     * <p>Declared {@code public} in this class; it is invoked at the end of every
     * {@code sendToAtlas} run.
     */
    @Override
    public void removeExpiredMeters() {
        super.removeExpiredMeters();
    }

    /**
     * Refreshes the subscription list and syncs it into the evaluator; does nothing except log
     * when LWC is disabled.
     */
    private void fetchSubscriptions() {
        if (!this.config.lwcEnabled()) {
            this.logger.debug("lwc is disabled, skipping subscription config refresh");
            return;
        }
        this.subManager.refresh();
        this.evaluator.sync(this.subManager.subscriptions());
    }

    /**
     * Records the difference between the local wall time and a server-provided timestamp.
     *
     * <p>The absolute difference is recorded on the clock-skew timer, tagged "fast" when the local
     * clock is at or ahead of the server and "slow" when it is behind.
     *
     * @param responseTimestamp server time in epoch milliseconds, or 0 when the response carried
     *                          no date, in which case nothing is recorded
     */
    private void recordClockSkew(long responseTimestamp) {
        if (responseTimestamp == 0L) {
            this.logger.debug("no date timestamp on response, cannot record skew");
            return;
        }
        final long delta = this.clock().wallTime() - responseTimestamp;
        final String direction = delta >= 0L ? "fast" : "slow";
        this.debugRegistry
            .timer(CLOCK_SKEW_TIMER, new String[]{"id", direction})
            .record(Math.abs(delta), TimeUnit.MILLISECONDS);
        this.logger.debug("clock skew between client and server: {}ms", delta);
    }

    /**
     * Flattens an id into a tag map with every key, value and the name sanitized against the
     * valid character set (invalid characters become '_').
     *
     * @param id id to convert
     * @return a new mutable map of the sanitized tags plus a "name" entry; the name is written
     *         last, so it replaces any tag whose sanitized key is also "name"
     */
    private Map<String, String> toMap(Id id) {
        final Map<String, String> result = new HashMap<>();
        id.tags().forEach(tag -> {
            final String key = this.charset.replaceNonMembers(tag.key(), '_');
            final String value = this.charset.replaceNonMembers(tag.value(), '_');
            result.put(key, value);
        });
        result.put("name", this.charset.replaceNonMembers(id.name(), '_'));
        return result;
    }

    /**
     * Produces the measurements to publish for timestamp {@code t}, split into batches.
     *
     * <p>Every consolidator is advanced to {@code t} with a NaN update, its value for {@code t} is
     * emitted unless it is NaN, and consolidators that report empty afterwards are dropped from
     * the map. The rollup policy is then applied and the result is cut into consecutive
     * sub-lists of at most {@code batchSize} elements.
     *
     * @param t timestamp of the step being published
     * @return the batches, possibly empty; the inner lists are views of the rollup result
     */
    synchronized List<List<Measurement>> getBatches(long t) {
        final List<List<Measurement>> batches = new ArrayList<>();
        this.publishTaskTimer("getBatches").record(() -> {
            int n = this.atlasMeasurements.size();
            this.debugRegistry.distributionSummary("spectator.registrySize").record((long) n);

            List<Measurement> input = new ArrayList<>(n);
            Iterator<Map.Entry<Id, Consolidator>> it = this.atlasMeasurements.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Id, Consolidator> entry = it.next();
                Consolidator consolidator = entry.getValue();
                // The NaN update moves the consolidator forward to t without adding data.
                consolidator.update(t, Double.NaN);
                double v = consolidator.value(t);
                if (!Double.isNaN(v)) {
                    input.add(new Measurement(entry.getKey(), t, v));
                }
                if (consolidator.isEmpty()) {
                    it.remove();
                }
            }

            @SuppressWarnings("unchecked")
            List<Measurement> ms = (List<Measurement>) this.rollupPolicy.apply(input);
            this.debugRegistry.distributionSummary("spectator.rollupResultSize").record((long) ms.size());

            // A batch size of 0 would never advance the index (endless loop of empty batches) and
            // a negative one would make subList throw, so use at least one element per batch.
            final int size = Math.max(1, this.batchSize);
            for (int i = 0; i < ms.size(); i += size) {
                batches.add(ms.subList(i, Math.min(ms.size(), i + size)));
            }
        });
        return batches;
    }

    /**
     * Polls the meters for the last completed publish step and returns the resulting batched
     * measurements as one flat stream.
     *
     * @return stream over every measurement produced by {@code getBatches} for that step
     */
    public Stream<Measurement> measurements() {
        final long timestamp = this.lastCompletedTimestamp(this.stepMillis);
        this.pollMeters(timestamp);
        final List<List<Measurement>> batches = this.getBatches(timestamp);
        return batches.stream().flatMap(List::stream);
    }

    /**
     * Creates the counter implementation for this registry.
     *
     * @param id identifier of the new counter
     * @return an {@code AtlasCounter} using the registry clock, the meter TTL and the LWC step
     */
    @Override
    protected Counter newCounter(Id id) {
        return new AtlasCounter(id, this.clock(), this.meterTTL, this.lwcStepMillis);
    }

    /**
     * Creates the distribution summary implementation for this registry.
     *
     * @param id identifier of the new distribution summary
     * @return an {@code AtlasDistributionSummary} using the registry clock, the meter TTL and the
     *         LWC step
     */
    @Override
    protected DistributionSummary newDistributionSummary(Id id) {
        return new AtlasDistributionSummary(id, this.clock(), this.meterTTL, this.lwcStepMillis);
    }

    /**
     * Creates the timer implementation for this registry.
     *
     * @param id identifier of the new timer
     * @return an {@code AtlasTimer} using the registry clock, the meter TTL and the LWC step
     */
    @Override
    protected Timer newTimer(Id id) {
        return new AtlasTimer(id, this.clock(), this.meterTTL, this.lwcStepMillis);
    }

    /**
     * Creates the gauge implementation for this registry.
     *
     * <p>Unlike the other meter factories in this class, the gauge is given the step clock
     * rather than the registry clock, and no step size.
     *
     * @param id identifier of the new gauge
     * @return an {@code AtlasGauge} using the step clock and the meter TTL
     */
    @Override
    protected Gauge newGauge(Id id) {
        return new AtlasGauge(id, this.stepClock, this.meterTTL);
    }

    /**
     * Creates the max-gauge implementation for this registry.
     *
     * @param id identifier of the new max gauge
     * @return an {@code AtlasMaxGauge} using the registry clock, the meter TTL and the LWC step
     */
    @Override
    protected Gauge newMaxGauge(Id id) {
        return new AtlasMaxGauge(id, this.clock(), this.meterTTL, this.lwcStepMillis);
    }
}

