/*
 * Decompiled with CFR 0.152.
 */
package net.opentsdb.tsd;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.opentsdb.stats.StatsCollector;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.embedder.CodecEmbedderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Channel handler that keeps track of the connections made to the TSD.
 * <p>
 * Accepted channels are registered in a shared channel group so that they can
 * all be closed at once, an optional limit on concurrent connections is
 * enforced, and counters are kept for established/rejected connections and for
 * the different kinds of exceptions seen on the channels.
 */
final class ConnectionManager extends SimpleChannelHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);

    // Counters shared by every instance; they are reported by collectStats().
    private static final AtomicLong connections_established = new AtomicLong();
    private static final AtomicLong connections_rejected = new AtomicLong();
    private static final AtomicLong exceptions_unknown = new AtomicLong();
    private static final AtomicLong exceptions_closed = new AtomicLong();
    private static final AtomicLong exceptions_reset = new AtomicLong();
    private static final AtomicLong exceptions_timeout = new AtomicLong();

    /** Maximum number of concurrent connections; values <= 0 disable the check. */
    private final int connections_limit;

    /** Number of channels opened but not yet closed; only maintained when a limit is set. */
    private final AtomicInteger open_connections = new AtomicInteger();

    /** Every channel accepted by {@link #channelOpen}. */
    private static final DefaultChannelGroup channels = new DefaultChannelGroup("all-channels");

    /** Closes every channel in the group and waits until the close has completed. */
    static void closeAllConnections() {
        channels.close().awaitUninterruptibly();
    }

    /** Creates a manager that does not limit the number of concurrent connections. */
    public ConnectionManager() {
        this.connections_limit = 0;
    }

    /**
     * Creates a manager that refuses connections beyond the given limit.
     *
     * @param connections_limit maximum number of concurrent connections; a
     *        value of zero or less disables the limit
     */
    public ConnectionManager(int connections_limit) {
        LOG.info("TSD concurrent connection limit set to: " + connections_limit);
        this.connections_limit = connections_limit;
    }

    /**
     * Records the connection and exception counters of this class.
     *
     * @param collector the collector the data points are recorded into
     */
    public static void collectStats(StatsCollector collector) {
        collector.record("connectionmgr.connections", channels.size(), "type=open");
        collector.record("connectionmgr.connections", connections_rejected, "type=rejected");
        collector.record("connectionmgr.connections", connections_established, "type=total");
        collector.record("connectionmgr.exceptions", exceptions_closed, "type=closed");
        collector.record("connectionmgr.exceptions", exceptions_reset, "type=reset");
        collector.record("connectionmgr.exceptions", exceptions_timeout, "type=timeout");
        collector.record("connectionmgr.exceptions", exceptions_unknown, "type=unknown");
    }

    /**
     * Registers a newly opened channel, or refuses it when the connection
     * limit has been exceeded.
     *
     * @param ctx the handler context (unused)
     * @param e the event carrying the channel that was opened
     * @throws ConnectionRefusedException (unchecked) if a limit is set and the
     *         number of open connections exceeds it; {@link #exceptionCaught}
     *         handles it by counting the rejection and closing the channel
     */
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws IOException {
        if (this.connections_limit > 0) {
            // The increment is kept even when the channel is refused: the
            // matching decrement happens in channelClosed().
            int channel_size = this.open_connections.incrementAndGet();
            if (channel_size > this.connections_limit) {
                throw new ConnectionRefusedException("Channel size (" + channel_size + ") exceeds total connection limit (" + this.connections_limit + ")");
            }
        }
        channels.add(e.getChannel());
        connections_established.incrementAndGet();
    }

    /**
     * Releases the connection slot taken in {@link #channelOpen}.
     *
     * @param ctx the handler context (unused)
     * @param e the close event (unused)
     */
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws IOException {
        // Mirror channelOpen(): the counter is only incremented when a limit
        // is configured, so only decrement it in that case. Otherwise it would
        // drift towards negative values without bound.
        if (this.connections_limit > 0) {
            this.open_connections.decrementAndGet();
        }
    }

    /**
     * Logs channel state events at INFO level before dispatching the event to
     * the regular handler methods.
     *
     * @param ctx the handler context
     * @param e the upstream event
     * @throws Exception whatever the superclass dispatch throws
     */
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        // Skip building the event's string form when INFO logging is off.
        if (e instanceof ChannelStateEvent && LOG.isInfoEnabled()) {
            LOG.info(e.toString());
        }
        super.handleUpstream(ctx, e);
    }

    /**
     * Classifies an exception raised on a channel, updates the matching
     * counter and closes the channel when appropriate.
     *
     * @param ctx the handler context the exception was caught in
     * @param e the event carrying the cause
     */
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        Throwable cause = e.getCause();
        Channel chan = ctx.getChannel();
        if (cause instanceof ClosedChannelException) {
            exceptions_closed.incrementAndGet();
            LOG.warn("Attempt to write to closed channel " + chan);
            return;
        }
        // ConnectionRefusedException derives from Netty's ChannelException,
        // which is an unchecked exception and not an IOException, so it has to
        // be tested outside of the IOException branch below. Nested inside it,
        // the check could never match and refused connections were reported
        // as unknown errors instead of being counted as rejections.
        if (cause instanceof ConnectionRefusedException) {
            connections_rejected.incrementAndGet();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Refusing connection from " + chan, e.getCause());
            }
            chan.close();
            return;
        }
        if (cause instanceof IOException) {
            // These two conditions are only recognisable by their message.
            // They are counted and otherwise ignored.
            String message = cause.getMessage();
            if ("Connection reset by peer".equals(message)) {
                exceptions_reset.incrementAndGet();
                return;
            }
            if ("Connection timed out".equals(message)) {
                exceptions_timeout.incrementAndGet();
                return;
            }
            // Any other IOException falls through to the generic handling.
        }
        if (cause instanceof CodecEmbedderException) {
            LOG.warn("Http codec error : " + cause.getMessage());
            e.getChannel().close();
            return;
        }
        exceptions_unknown.incrementAndGet();
        LOG.error("Unexpected exception from downstream for " + chan, cause);
        e.getChannel().close();
    }

    /** Thrown by {@link #channelOpen} to refuse a connection over the limit. */
    private static class ConnectionRefusedException extends ChannelException {
        private static final long serialVersionUID = 5348377149312597939L;

        /**
         * @param message description of why the connection was refused
         */
        public ConnectionRefusedException(String message) {
            super(message);
        }
    }
}

