package com.taobao.gecko.service.impl;

import com.taobao.gecko.core.command.Constants;
import com.taobao.gecko.core.command.RequestCommand;
import com.taobao.gecko.core.command.ResponseCommand;
import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.gecko.core.command.kernel.HeartBeatRequestCommand;
import com.taobao.gecko.core.core.Handler;
import com.taobao.gecko.core.core.Session;
import com.taobao.gecko.core.nio.NioSession;
import com.taobao.gecko.core.nio.impl.TimerRef;
import com.taobao.gecko.core.util.ExceptionMonitor;
import com.taobao.gecko.core.util.RemotingUtils;
import com.taobao.gecko.core.util.SelectorFactory;
import com.taobao.gecko.service.Connection;
import com.taobao.gecko.service.ConnectionLifeCycleListener;
import com.taobao.gecko.service.RemotingController;
import com.taobao.gecko.service.RemotingServer;
import com.taobao.gecko.service.RequestProcessor;
import com.taobao.gecko.service.SingleRequestCallBackListener;
import com.taobao.gecko.service.exception.IllegalMessageException;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/gecko/service/impl/GeckoHandler.class */
public class GeckoHandler implements Handler {
    private final DefaultRemotingContext remotingContext;
    private final RemotingController remotingController;
    private ReconnectManager reconnectManager;
    private static final Log log = LogFactory.getLog(GeckoHandler.class);

    /* loaded from: input_file:com/taobao/gecko/service/impl/GeckoHandler$HeartBeatListener.class */
    private static final class HeartBeatListener implements SingleRequestCallBackListener {
        private final Connection conn;
        static final String HEARBEAT_FAIL_COUNT = "connection_heartbeat_fail_count";

        @Override // com.taobao.gecko.service.SingleRequestCallBackListener
        public ThreadPoolExecutor getExecutor() {
            return null;
        }

        private HeartBeatListener(Connection connection) {
            this.conn = connection;
        }

        @Override // com.taobao.gecko.service.SingleRequestCallBackListener
        public void onException(Exception exc) {
            innerCloseConnection(this.conn);
        }

        @Override // com.taobao.gecko.service.SingleRequestCallBackListener
        public void onResponse(ResponseCommand responseCommand, Connection connection) {
            if (responseCommand != null && responseCommand.getResponseStatus() == ResponseStatus.NO_ERROR) {
                this.conn.removeAttribute(HEARBEAT_FAIL_COUNT);
                return;
            }
            Integer num = (Integer) this.conn.setAttributeIfAbsent(HEARBEAT_FAIL_COUNT, 1);
            if (num != null) {
                Integer valueOf = Integer.valueOf(num.intValue() + 1);
                if (valueOf.intValue() < 3) {
                    connection.setAttribute(HEARBEAT_FAIL_COUNT, valueOf);
                } else {
                    innerCloseConnection(connection);
                }
            }
        }

        private void innerCloseConnection(Connection connection) {
            GeckoHandler.log.info("心跳检测失败，关闭连接" + connection.getRemoteSocketAddress() + ",分组信息" + connection.getGroupSet());
            try {
                connection.close(true);
            } catch (NotifyRemotingException e) {
                GeckoHandler.log.error("关闭连接失败", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/taobao/gecko/service/impl/GeckoHandler$ProcessorRunner.class */
    public static final class ProcessorRunner<T extends RequestCommand> implements Runnable {
        private final DefaultConnection defaultConnection;
        private final RequestProcessor<T> processor;
        private final T message;

        private ProcessorRunner(DefaultConnection defaultConnection, RequestProcessor<T> requestProcessor, T t) {
            this.defaultConnection = defaultConnection;
            this.processor = requestProcessor;
            this.message = t;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.processor.handleRequest(this.message, this.defaultConnection);
        }
    }

    public void setReconnectManager(ReconnectManager reconnectManager) {
        this.reconnectManager = reconnectManager;
    }

    private void responseThreadPoolBusy(Session session, Object obj, DefaultConnection defaultConnection) {
        if (defaultConnection == null || !(obj instanceof RequestCommand)) {
            return;
        }
        try {
            defaultConnection.response(defaultConnection.getRemotingContext().getCommandFactory().createBooleanAckCommand(((RequestCommand) obj).getRequestHeader(), ResponseStatus.THREADPOOL_BUSY, "线程池繁忙"));
        } catch (NotifyRemotingException e) {
            onExceptionCaught(session, e);
        }
    }

    public GeckoHandler(RemotingController remotingController) {
        this.remotingContext = (DefaultRemotingContext) remotingController.getRemotingContext();
        this.remotingController = remotingController;
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onExceptionCaught(Session session, Throwable th) {
        if (th.getCause() != null) {
            ExceptionMonitor.getInstance().exceptionCaught(th.getCause());
        } else {
            ExceptionMonitor.getInstance().exceptionCaught(th);
        }
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onMessageReceived(Session session, Object obj) {
        DefaultConnection connectionBySession = this.remotingContext.getConnectionBySession((NioSession) session);
        if (connectionBySession == null) {
            log.error("Connection[" + RemotingUtils.getAddrString(session.getRemoteSocketAddress()) + "]已经被关闭，无法处理消息");
            session.close();
        } else if (obj instanceof RequestCommand) {
            processRequest(session, obj, connectionBySession);
        } else {
            if (!(obj instanceof ResponseCommand)) {
                throw new IllegalMessageException("未知的消息类型" + obj);
            }
            processResponse(obj, connectionBySession);
        }
    }

    private void processResponse(Object obj, DefaultConnection defaultConnection) {
        ResponseCommand responseCommand = (ResponseCommand) obj;
        responseCommand.setResponseHost(defaultConnection.getRemoteSocketAddress());
        responseCommand.setResponseTime(System.currentTimeMillis());
        RequestCallBack requestCallBack = defaultConnection.getRequestCallBack(responseCommand.getOpaque());
        if (requestCallBack != null) {
            requestCallBack.onResponse(null, responseCommand, defaultConnection);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T extends RequestCommand> void processRequest(Session session, Object obj, DefaultConnection defaultConnection) {
        RequestProcessor processorByMessage = getProcessorByMessage(obj);
        if (processorByMessage != null) {
            executeProcessor(session, (RequestCommand) obj, defaultConnection, processorByMessage);
        } else {
            log.error("未找到" + obj.getClass().getCanonicalName() + "对应的处理器");
            responseNoProcessor(session, obj, defaultConnection);
        }
    }

    private <T extends RequestCommand> RequestProcessor<T> getProcessorByMessage(Object obj) {
        return (RequestProcessor<T>) (obj instanceof HeartBeatRequestCommand ? this.remotingContext.processorMap.get(HeartBeatRequestCommand.class) : this.remotingContext.processorMap.get(obj.getClass()));
    }

    private <T extends RequestCommand> void executeProcessor(Session session, T t, DefaultConnection defaultConnection, RequestProcessor<T> requestProcessor) {
        if (requestProcessor.getExecutor() == null) {
            requestProcessor.handleRequest(t, defaultConnection);
            return;
        }
        try {
            requestProcessor.getExecutor().execute(new ProcessorRunner(defaultConnection, requestProcessor, t));
        } catch (RejectedExecutionException e) {
            responseThreadPoolBusy(session, t, defaultConnection);
        }
    }

    private void responseNoProcessor(Session session, Object obj, DefaultConnection defaultConnection) {
        if (defaultConnection == null || !(obj instanceof RequestCommand)) {
            return;
        }
        try {
            defaultConnection.response(defaultConnection.getRemotingContext().getCommandFactory().createBooleanAckCommand(((RequestCommand) obj).getRequestHeader(), ResponseStatus.NO_PROCESSOR, "未注册请求处理器，请求处理器类为" + obj.getClass().getCanonicalName()));
        } catch (NotifyRemotingException e) {
            onExceptionCaught(session, e);
        }
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onMessageSent(Session session, Object obj) {
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onSessionClosed(Session session) {
        InetSocketAddress remoteSocketAddress = session.getRemoteSocketAddress();
        DefaultConnection connectionBySession = this.remotingContext.getConnectionBySession((NioSession) session);
        if (connectionBySession == null) {
            session.close();
            return;
        }
        log.debug("远端连接" + RemotingUtils.getAddrString(remoteSocketAddress) + "断开,分组信息" + connectionBySession.getGroupSet());
        if (connectionBySession.isAllowReconnect() && this.reconnectManager != null) {
            waitForReady(connectionBySession);
            addReconnectTask(remoteSocketAddress, connectionBySession);
        }
        removeFromGroups(connectionBySession);
        connectionBySession.dispose();
        this.remotingContext.removeSession2ConnectionMapping((NioSession) session);
        adjustMaxScheduleWrittenBytes();
        this.remotingContext.notifyConnectionClosed(connectionBySession);
    }

    private void removeFromGroups(DefaultConnection defaultConnection) {
        Iterator<String> it = defaultConnection.getGroupSet().iterator();
        while (it.hasNext()) {
            this.remotingContext.removeConnectionFromGroup(it.next(), defaultConnection);
        }
    }

    private void addReconnectTask(InetSocketAddress inetSocketAddress, DefaultConnection defaultConnection) {
        Set<String> groupSet = defaultConnection.getGroupSet();
        log.info("远端连接" + RemotingUtils.getAddrString(inetSocketAddress) + "关闭，启动重连任务");
        synchronized (defaultConnection) {
            if (!groupSet.isEmpty() && !hasOnlyDefaultGroup(groupSet) && defaultConnection.isAllowReconnect()) {
                this.reconnectManager.addReconnectTask(new ReconnectTask(groupSet, inetSocketAddress));
                defaultConnection.setAllowReconnect(false);
            }
        }
    }

    private boolean hasOnlyDefaultGroup(Set<String> set) {
        return set.size() == 1 && set.contains(Constants.DEFAULT_GROUP);
    }

    private void waitForReady(DefaultConnection defaultConnection) {
        synchronized (defaultConnection) {
            int i = 0;
            while (!defaultConnection.isReady() && defaultConnection.isAllowReconnect()) {
                int i2 = i;
                i++;
                if (i2 >= 3) {
                    break;
                }
                try {
                    defaultConnection.wait(SelectorFactory.timeout);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onSessionConnected(Session session, Object... objArr) {
        TimerRef timerRef;
        Set<String> set = (Set) objArr[0];
        if (objArr.length >= 3 && (timerRef = (TimerRef) objArr[2]) != null) {
            timerRef.cancel();
        }
        DefaultConnection connectionBySession = this.remotingContext.getConnectionBySession((NioSession) session);
        if (connectionBySession != null) {
            try {
                if (!set.isEmpty()) {
                    addConnection2Group(connectionBySession, set);
                    if (connectionBySession == null && connectionBySession.isConnected()) {
                        notifyConnectionReady(connectionBySession);
                        return;
                    }
                }
            } catch (Throwable th) {
                if (connectionBySession != null && connectionBySession.isConnected()) {
                    notifyConnectionReady(connectionBySession);
                }
                throw th;
            }
        }
        session.close();
        log.error("建立的连接没有对应的connection");
        if (connectionBySession == null) {
        }
    }

    private void addConnection2Group(DefaultConnection defaultConnection, Set<String> set) {
        if (set.isEmpty() || hasOnlyDefaultGroup(set)) {
            closeConnectionWithoutReconnect(defaultConnection);
            return;
        }
        for (String str : set) {
            Object attribute = this.remotingController.getAttribute(str, Constants.CONNECTION_COUNT_ATTR);
            if (attribute == null) {
                log.info("连接被强制断开，由于分组" + str + "没有发起过连接请求");
                closeConnectionWithoutReconnect(defaultConnection);
                return;
            }
            int intValue = ((Integer) attribute).intValue();
            synchronized (this) {
                if (this.remotingController.getConnectionCount(str) < intValue) {
                    addConnectionToGroup(defaultConnection, str, intValue);
                } else if (removeDisconnectedConnection(str)) {
                    addConnectionToGroup(defaultConnection, str, intValue);
                } else {
                    log.warn("连接数(" + defaultConnection.getRemoteSocketAddress() + ")超过设定值" + intValue + "，连接将被关闭");
                    closeConnectionWithoutReconnect(defaultConnection);
                }
            }
        }
    }

    private void closeConnectionWithoutReconnect(DefaultConnection defaultConnection) {
        try {
            defaultConnection.close(false);
        } catch (NotifyRemotingException e) {
            log.error("关闭连接失败", e);
        }
    }

    private void notifyConnectionReady(DefaultConnection defaultConnection) {
        if (defaultConnection != null) {
            synchronized (defaultConnection) {
                defaultConnection.setReady(true);
                defaultConnection.notifyAll();
            }
            Iterator<ConnectionLifeCycleListener> it = this.remotingContext.connectionLifeCycleListenerList.iterator();
            while (it.hasNext()) {
                try {
                    it.next().onConnectionReady(defaultConnection);
                } catch (Throwable th) {
                    log.error("调用ConnectionLifeCycleListener.onConnectionReady异常", th);
                }
            }
        }
    }

    private boolean removeDisconnectedConnection(String str) {
        List<Connection> connectionsByGroup = this.remotingController.getRemotingContext().getConnectionsByGroup(str);
        Connection connection = null;
        if (connectionsByGroup != null) {
            synchronized (connectionsByGroup) {
                ListIterator<Connection> listIterator = connectionsByGroup.listIterator();
                while (true) {
                    if (!listIterator.hasNext()) {
                        break;
                    }
                    Connection next = listIterator.next();
                    if (!next.isConnected()) {
                        connection = next;
                        break;
                    }
                    if (!((DefaultConnection) next).isReady() && !next.getGroupSet().isEmpty()) {
                        notifyConnectionReady((DefaultConnection) next);
                    }
                }
            }
        }
        if (connection != null) {
            return connectionsByGroup.remove(connection);
        }
        return false;
    }

    private void addConnectionToGroup(DefaultConnection defaultConnection, String str, int i) {
        defaultConnection.getRemotingContext().addConnectionToGroup(str, defaultConnection);
        Object attribute = this.remotingController.getAttribute(str, Constants.GROUP_CONNECTION_READY_LOCK);
        if (attribute != null) {
            synchronized (attribute) {
                if (this.remotingController.getConnectionCount(str) >= i) {
                    attribute.notifyAll();
                }
            }
        }
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onSessionCreated(Session session) {
        log.debug("连接建立，远端信息:" + RemotingUtils.getAddrString(session.getRemoteSocketAddress()));
        DefaultConnection defaultConnection = new DefaultConnection((NioSession) session, this.remotingContext);
        this.remotingContext.addConnection(defaultConnection);
        this.remotingContext.addSession2ConnectionMapping((NioSession) session, defaultConnection);
        this.remotingContext.notifyConnectionCreated(defaultConnection);
        adjustMaxScheduleWrittenBytes();
    }

    private void adjustMaxScheduleWrittenBytes() {
        if (this.remotingController instanceof RemotingServer) {
            List<Connection> connectionsByGroup = this.remotingContext.getConnectionsByGroup(Constants.DEFAULT_GROUP);
            int size = connectionsByGroup != null ? connectionsByGroup.size() : 0;
            if (size > 0) {
                this.remotingContext.getConfig().setMaxScheduleWrittenBytes((Runtime.getRuntime().maxMemory() / 3) / size);
            }
        }
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onSessionExpired(Session session) {
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onSessionIdle(Session session) {
        DefaultConnection connectionBySession = this.remotingContext.getConnectionBySession((NioSession) session);
        try {
            connectionBySession.send(connectionBySession.getRemotingContext().getCommandFactory().createHeartBeatCommand(), new HeartBeatListener(connectionBySession), SelectorFactory.timeout, TimeUnit.MILLISECONDS);
        } catch (NotifyRemotingException e) {
            log.error("发送心跳命令失败", e);
        }
    }

    @Override // com.taobao.gecko.core.core.Handler
    public void onSessionStarted(Session session) {
    }
}
