/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.oap.server.exporter.provider.grpc;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.exporter.grpc.ValueType;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCExporterSetting;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exports metric values to a remote gRPC metric-export service.
 *
 * <p>Export events accepted by {@link #export(ExportEvent)} are queued in a
 * {@link DataCarrier} buffer. This class registers itself as the buffer's consumer and, in
 * {@link #consume(List)}, streams each batch to the remote service and waits for the
 * server to finish the stream before returning.
 */
public class GRPCExporter
extends MetricFormatter
implements MetricValuesExportService,
IConsumer<GRPCExporter.ExportData> {
    private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);

    /** Number of consumer threads registered on the export buffer. */
    private static final int CONSUMER_THREAD_NUM = 1;
    /** Consume cycle, in milliseconds, passed to the export buffer. */
    private static final long CONSUME_CYCLE_MILLIS = 200L;
    /** Poll interval, in milliseconds, while waiting for the server to finish a stream. */
    private static final long WAIT_CYCLE_MILLIS = 100L;
    /** Accumulated wait, in milliseconds, after which each further poll logs a warning. */
    private static final long SLOW_EXPORT_THRESHOLD_MILLIS = 2000L;
    /** Poll interval, in milliseconds, used once the export is considered slow. */
    private static final long SLOW_WAIT_CYCLE_MILLIS = 2000L;

    private final GRPCExporterSetting setting;
    private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
    private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
    private final DataCarrier<ExportData> exportBuffer;
    private final Set<String> subscriptionSet;

    /**
     * Connects to the target configured in {@code setting}, creates the async and blocking
     * stubs on that channel, and starts consuming the export buffer.
     *
     * @param setting exporter configuration (target host/port, buffer channel number and size)
     */
    public GRPCExporter(GRPCExporterSetting setting) {
        this.setting = setting;
        GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
        client.connect();
        ManagedChannel channel = client.getChannel();
        this.exportServiceFutureStub = MetricExportServiceGrpc.newStub((Channel)channel);
        this.blockingStub = MetricExportServiceGrpc.newBlockingStub((Channel)channel);
        this.subscriptionSet = new HashSet<String>();
        this.exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
        // Register the consumer last so that `this` is fully initialised before the
        // buffer can call back into it.
        this.exportBuffer.consume(this, CONSUMER_THREAD_NUM, CONSUME_CYCLE_MILLIS);
    }

    /**
     * Queues the metrics carried by {@code event} for export.
     *
     * <p>Only events of type {@code TOTAL} whose metrics implement {@link WithMetadata} are
     * considered. The metrics are queued when the subscription set is empty or contains the
     * metrics name; otherwise the event is ignored.
     *
     * @param event the export event to inspect
     */
    public void export(ExportEvent event) {
        if (ExportEvent.EventType.TOTAL != event.getType()) {
            return;
        }
        Metrics metrics = event.getMetrics();
        if (!(metrics instanceof WithMetadata)) {
            return;
        }
        MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
        // An empty subscription set means "no filter": everything is exported.
        if (this.subscriptionSet.isEmpty() || this.subscriptionSet.contains(meta.getMetricsName())) {
            this.exportBuffer.produce(new ExportData(meta, metrics));
        }
    }

    /**
     * Fetches the subscribed metric names from the remote service through the blocking stub
     * and adds them to the local subscription set.
     */
    public void initSubscriptionList() {
        SubscriptionsResp subscription = this.blockingStub.subscription(SubscriptionReq.newBuilder().build());
        this.subscriptionSet.addAll(subscription.getMetricNamesList());
        logger.debug("Get exporter subscription list, {}", this.subscriptionSet);
    }

    /** Consumer initialisation hook; nothing to do. */
    public void init() {
    }

    /**
     * Streams one batch of buffered export data to the remote service and waits until the
     * server completes or fails the stream.
     *
     * <p>Rows whose metrics type is not a long/int/double value holder, or whose entity name
     * resolves to {@code null}, are skipped.
     *
     * @param data the batch taken from the export buffer; an empty batch is ignored
     */
    public void consume(List<ExportData> data) {
        if (data.isEmpty()) {
            return;
        }
        final ExportStatus status = new ExportStatus();
        StreamObserver<ExportMetricValue> streamObserver = this.exportServiceFutureStub.export(new StreamObserver<ExportResponse>(){

            public void onNext(ExportResponse response) {
            }

            public void onError(Throwable throwable) {
                // Surface the failure instead of dropping it; the wait below still ends.
                logger.warn("Export metrics to {}:{} failed.", GRPCExporter.this.setting.getTargetHost(), GRPCExporter.this.setting.getTargetPort(), throwable);
                status.done();
            }

            public void onCompleted() {
                status.done();
            }
        });
        int exportNum = 0;
        for (ExportData row : data) {
            ExportMetricValue value = this.buildExportValue(row);
            if (value == null) {
                continue;
            }
            streamObserver.onNext(value);
            ++exportNum;
        }
        streamObserver.onCompleted();
        this.awaitCompletion(status, exportNum);
    }

    /**
     * Converts one buffered row into the gRPC message.
     *
     * @param row the buffered metrics together with their metadata
     * @return the message to send, or {@code null} when the row must be skipped
     */
    private ExportMetricValue buildExportValue(ExportData row) {
        ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
        Metrics metrics = row.getMetrics();
        if (metrics instanceof LongValueHolder) {
            builder.setLongValue(((LongValueHolder)metrics).getValue());
            builder.setType(ValueType.LONG);
        } else if (metrics instanceof IntValueHolder) {
            // Int values are widened and sent as LONG.
            builder.setLongValue((long)((IntValueHolder)metrics).getValue());
            builder.setType(ValueType.LONG);
        } else if (metrics instanceof DoubleValueHolder) {
            builder.setDoubleValue(((DoubleValueHolder)metrics).getValue());
            builder.setType(ValueType.DOUBLE);
        } else {
            return null;
        }
        MetricsMetaInfo meta = row.getMeta();
        String entityName = this.getEntityName(meta);
        if (entityName == null) {
            return null;
        }
        builder.setMetricName(meta.getMetricsName());
        builder.setEntityName(entityName);
        builder.setEntityId(meta.getId());
        builder.setTimeBucket(metrics.getTimeBucket());
        return builder.build();
    }

    /**
     * Polls {@code status} until the response observer marks the stream as done.
     *
     * <p>Once the accumulated wait exceeds {@link #SLOW_EXPORT_THRESHOLD_MILLIS}, a warning
     * is logged on every poll and the poll interval is raised to
     * {@link #SLOW_WAIT_CYCLE_MILLIS}. If the waiting thread is interrupted, the interrupt
     * flag is restored and the wait is abandoned.
     *
     * @param status    completion flag set by the response observer
     * @param exportNum number of values sent on the stream, used for logging only
     */
    private void awaitCompletion(ExportStatus status, int exportNum) {
        long sleepTime = 0L;
        long cycle = WAIT_CYCLE_MILLIS;
        while (!status.isDone()) {
            try {
                sleepTime += cycle;
                Thread.sleep(cycle);
            }
            catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the owner of this thread and stop waiting;
                // sleeping again would only throw immediately.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting for export of {} metrics to {}:{} after {} milliseconds.", exportNum, this.setting.getTargetHost(), this.setting.getTargetPort(), sleepTime);
                return;
            }
            if (sleepTime > SLOW_EXPORT_THRESHOLD_MILLIS) {
                logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.", exportNum, this.setting.getTargetHost(), this.setting.getTargetPort(), sleepTime);
                cycle = SLOW_WAIT_CYCLE_MILLIS;
            }
        }
        logger.debug("Exported {} metrics to {}:{} in {} milliseconds.", exportNum, this.setting.getTargetHost(), this.setting.getTargetPort(), sleepTime);
    }

    /**
     * Logs a failure reported for a batch.
     *
     * @param data the batch that was being processed
     * @param t    the failure
     */
    public void onError(List<ExportData> data, Throwable t) {
        logger.error(t.getMessage(), t);
    }

    /** Consumer exit hook; nothing to do. */
    public void onExit() {
    }

    /**
     * Completion flag for one export stream. It is set from the response observer callbacks
     * and polled by the consuming thread, hence {@code volatile}.
     */
    private static final class ExportStatus {
        private volatile boolean done = false;

        private ExportStatus() {
        }

        private void done() {
            this.done = true;
        }

        public boolean isDone() {
            return this.done;
        }
    }

    /** One buffered export item: the metrics and the metadata describing them. */
    public class ExportData {
        private final MetricsMetaInfo meta;
        private final Metrics metrics;

        /**
         * @param meta    metadata of the metrics
         * @param metrics the metrics holding the value to export
         */
        public ExportData(MetricsMetaInfo meta, Metrics metrics) {
            this.meta = meta;
            this.metrics = metrics;
        }

        private MetricsMetaInfo getMeta() {
            return this.meta;
        }

        private Metrics getMetrics() {
            return this.metrics;
        }
    }
}

