/*
 * Decompiled with CFR 0.152.
 */
package com.dianping.cat.message.io;

import com.dianping.cat.analyzer.LocalAggregator;
import com.dianping.cat.configuration.ClientConfigService;
import com.dianping.cat.configuration.DefaultClientConfigService;
import com.dianping.cat.configuration.MessageType;
import com.dianping.cat.configuration.client.entity.Server;
import com.dianping.cat.internal.shaded.io.netty.buffer.ByteBuf;
import com.dianping.cat.internal.shaded.io.netty.channel.ChannelFuture;
import com.dianping.cat.log.CatLogger;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.io.ChannelManager;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.queue.DefaultMessageQueue;
import com.dianping.cat.message.queue.PriorityMessageQueue;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.NativeMessageCodec;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.internal.DefaultMessageStatistics;
import com.dianping.cat.status.AbstractCollector;
import com.dianping.cat.status.StatusExtensionRegister;
import com.dianping.cat.util.Threads;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TcpSocketSender
implements Threads.Task,
MessageSender {
    private MessageCodec nativeCodec = new NativeMessageCodec();
    private MessageStatistics statistics = new DefaultMessageStatistics();
    private ClientConfigService configService = DefaultClientConfigService.getInstance();
    private MessageQueue messageQueue = new PriorityMessageQueue(5000);
    private MessageIdFactory factory = MessageIdFactory.getInstance();
    private AtomicMessageManager atomicQueueManager = new AtomicMessageManager(5000);
    private ChannelManager channelManager = ChannelManager.getInstance();
    private boolean active;
    private static final int SIZE = 5000;
    private static final long HOUR = 3600000L;
    private static CatLogger LOGGER = CatLogger.getInstance();
    private static TcpSocketSender INSTANCE = new TcpSocketSender();

    public static TcpSocketSender getInstance() {
        return INSTANCE;
    }

    private TcpSocketSender() {
        List<Server> servers = this.configService.getServers();
        ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
        for (Server server : servers) {
            if (!server.isEnabled()) continue;
            addresses.add(new InetSocketAddress(server.getIp(), server.getPort()));
        }
        this.initialize(addresses);
    }

    @Override
    public String getName() {
        return "netty-tcp-data-sender";
    }

    private void initialize(List<InetSocketAddress> addresses) {
        Threads.forGroup("cat").start(this.channelManager);
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                LOGGER.info("shut down cat client in runtime shut down hook!");
                TcpSocketSender.this.shutdown();
            }
        });
        StatusExtensionRegister.getInstance().register(new AbstractCollector(){

            @Override
            public String getId() {
                return "cat.status";
            }

            @Override
            public Map<String, String> getProperties() {
                LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
                map.put("cat.status.send.sample.ratio", String.valueOf(TcpSocketSender.this.configService.getSamplingRate() * 100.0));
                map.put("cat.status.send.queue.size", String.valueOf(TcpSocketSender.this.messageQueue.size()));
                map.put("cat.status.send.atomic.queue.size", String.valueOf(TcpSocketSender.this.atomicQueueManager.getQueueSize()));
                Map<String, Long> values = TcpSocketSender.this.statistics.getStatistics();
                for (Map.Entry<String, Long> entry : values.entrySet()) {
                    map.put(entry.getKey(), String.valueOf(entry.getValue()));
                }
                return map;
            }
        });
    }

    private void logMessageDiscard(MessageTree tree) {
        this.statistics.onOverflowed(tree);
    }

    private void offer(MessageTree tree) {
        MessageType type = this.configService.parseMessageType(tree);
        boolean result = true;
        switch (type) {
            case NORMAL_MESSAGE: {
                result = this.messageQueue.offer(tree);
                break;
            }
            case SMALL_TRANSACTION: {
                result = this.atomicQueueManager.offerToQueue(tree);
                break;
            }
            case STAND_ALONE_EVENT: {
                this.processTreeInClient(tree);
            }
        }
        if (!result) {
            this.processTreeInClient(tree);
            if (!tree.canDiscard()) {
                this.logMessageDiscard(tree);
            }
        }
    }

    private void processMessage() {
        block12: {
            ChannelFuture channel = this.channelManager.channel();
            if (channel != null) {
                MessageTree tree = null;
                try {
                    tree = this.messageQueue.poll();
                    if (tree != null) {
                        this.sendInternal(channel, tree);
                        tree.setMessage(null);
                        break block12;
                    }
                    try {
                        Thread.sleep(5L);
                    }
                    catch (Exception e) {
                        this.active = false;
                    }
                }
                catch (Throwable t) {
                    LOGGER.error(PlainTextMessageCodec.encodeTree(tree));
                    LOGGER.error("Error when sending message over TCP socket!", t);
                }
            } else {
                long current = System.currentTimeMillis();
                long oldTimestamp = current - 3600000L;
                try {
                    MessageTree tree;
                    while ((tree = this.messageQueue.peek()) != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                        MessageTree discardTree = this.messageQueue.poll();
                        if (discardTree == null) continue;
                        this.statistics.onOverflowed(discardTree);
                    }
                }
                catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
                try {
                    Thread.sleep(5L);
                }
                catch (Exception e) {
                    this.active = false;
                }
            }
        }
    }

    private void processTreeInClient(MessageTree tree) {
        LocalAggregator.aggregate(tree);
    }

    @Override
    public void run() {
        MessageTree tree;
        this.active = true;
        while (this.active) {
            this.processMessage();
            this.atomicQueueManager.processAtomicMessage();
        }
        this.atomicQueueManager.processAtomicMessage();
        while ((tree = this.messageQueue.poll()) != null) {
            ChannelFuture channel = this.channelManager.channel();
            if (channel != null) {
                this.sendInternal(channel, tree);
                continue;
            }
            this.offer(tree);
        }
    }

    @Override
    public void send(MessageTree tree) {
        if (!this.configService.isMessageBlock()) {
            double sampleRatio = this.configService.getSamplingRate();
            if (tree.canDiscard() && sampleRatio < 1.0 && !tree.isHitSample()) {
                this.processTreeInClient(tree);
            } else {
                this.offer(tree);
            }
        }
    }

    private void sendInternal(ChannelFuture channel, MessageTree tree) {
        if (tree.getMessageId() == null) {
            tree.setMessageId(this.factory.getNextId());
        }
        ByteBuf buf = this.nativeCodec.encode(tree);
        int size = buf.readableBytes();
        channel.channel().writeAndFlush(buf);
        if (this.statistics != null) {
            this.statistics.onBytes(size);
        }
    }

    @Override
    public void shutdown() {
        this.active = false;
        this.channelManager.shutdown();
    }

    public class AtomicMessageManager {
        private MessageQueue smallMessages;
        private static final long HOUR = 3600000L;
        private static final int MAX_CHILD_NUMBER = 200;
        private static final int MAX_DURATION = 30000;

        public AtomicMessageManager(int size) {
            this.smallMessages = new DefaultMessageQueue(size);
        }

        public int getQueueSize() {
            return this.smallMessages.size();
        }

        private boolean isSameHour(long time1, long time2) {
            int hour1 = (int)(time1 / 3600000L);
            int hour2 = (int)(time2 / 3600000L);
            return hour1 == hour2;
        }

        private MessageTree mergeTree(MessageQueue queue) {
            long nextTimestamp;
            MessageTree tree;
            DefaultTransaction t = new DefaultTransaction("System", "AtomicAggregator");
            MessageTree first = queue.poll();
            Message message = first.getMessage();
            long timestamp = message.getTimestamp();
            t.setStatus("0");
            t.setCompleted(true);
            t.setDurationStart(timestamp);
            t.setTimestamp(timestamp);
            t.setDurationInMicros(0L);
            t.addChild(message);
            for (int max = 200; max >= 0 && (tree = queue.peek()) != null && this.isSameHour(timestamp, nextTimestamp = tree.getMessage().getTimestamp()) && (tree = queue.poll()) != null; --max) {
                t.addChild(tree.getMessage());
            }
            first.setMessage(t);
            return first;
        }

        public boolean offerToQueue(MessageTree tree) {
            return this.smallMessages.offer(tree);
        }

        public void processAtomicMessage() {
            this.processNormalAtomicMessage();
        }

        void processNormalAtomicMessage() {
            while (this.shouldMerge(this.smallMessages)) {
                MessageTree tree = this.mergeTree(this.smallMessages);
                TcpSocketSender.this.offer(tree);
            }
        }

        private boolean shouldMerge(MessageQueue queue) {
            MessageTree tree = queue.peek();
            if (tree != null) {
                long firstTime = tree.getMessage().getTimestamp();
                return System.currentTimeMillis() - firstTime > 30000L || queue.size() >= 200;
            }
            return false;
        }
    }
}

