package io.jafka.network;

import io.jafka.api.RequestKeys;
import io.jafka.mx.SocketServerStats;
import io.jafka.utils.Closer;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/jafka/network/Processor.class */
public class Processor extends AbstractServerThread {
    private final BlockingQueue<SocketChannel> newConnections;
    private final Logger requestLogger = LoggerFactory.getLogger("jafka.request.logger");
    private RequestHandlerFactory requesthandlerFactory;
    private SocketServerStats stats;
    private int maxRequestSize;

    public Processor(RequestHandlerFactory requestHandlerFactory, SocketServerStats socketServerStats, int i, int i2) {
        this.requesthandlerFactory = requestHandlerFactory;
        this.stats = socketServerStats;
        this.maxRequestSize = i;
        this.newConnections = new ArrayBlockingQueue(i2);
    }

    @Override // java.lang.Runnable
    public void run() {
        startupComplete();
        while (isRunning()) {
            try {
                configureNewConnections();
                Selector selector = getSelector();
                if (selector.select(500L) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext() && isRunning()) {
                        SelectionKey selectionKey = null;
                        try {
                            try {
                                selectionKey = it.next();
                                it.remove();
                                if (!selectionKey.isReadable()) {
                                    if (!selectionKey.isWritable()) {
                                        if (selectionKey.isValid()) {
                                            throw new IllegalStateException("Unrecognized key state for processor thread.");
                                            break;
                                        }
                                        close(selectionKey);
                                    } else {
                                        write(selectionKey);
                                    }
                                } else {
                                    read(selectionKey);
                                }
                            } catch (InvalidRequestException e) {
                                Socket socket = channelFor(selectionKey).socket();
                                this.logger.info(String.format("Closing socket connection to %s:%d due to invalid request: %s", socket.getInetAddress(), Integer.valueOf(socket.getPort()), e.getMessage()));
                                close(selectionKey);
                            }
                        } catch (EOFException e2) {
                            Socket socket2 = channelFor(selectionKey).socket();
                            this.logger.debug(String.format("connection closed by %s:%d.", socket2.getInetAddress(), Integer.valueOf(socket2.getPort())));
                            close(selectionKey);
                        } catch (Throwable th) {
                            Socket socket3 = channelFor(selectionKey).socket();
                            if (this.logger.isDebugEnabled()) {
                                this.logger.error(String.format("Closing socket for %s:%d becaulse of error %s", socket3.getInetAddress(), Integer.valueOf(socket3.getPort()), th.getMessage()), th);
                            } else {
                                this.logger.info(String.format("Closing socket for %s:%d becaulse of error %s", socket3.getInetAddress(), Integer.valueOf(socket3.getPort()), th.getMessage()));
                            }
                            close(selectionKey);
                        }
                    }
                }
            } catch (IOException e3) {
                this.logger.error(e3.getMessage(), e3);
            }
        }
        this.logger.debug("Closing selector while shutting down");
        closeSelector();
        shutdownComplete();
    }

    private SocketChannel channelFor(SelectionKey selectionKey) {
        return (SocketChannel) selectionKey.channel();
    }

    private void close(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Closing connection from " + socketChannel.socket().getRemoteSocketAddress());
        }
        Closer.closeQuietly(socketChannel.socket());
        Closer.closeQuietly(socketChannel);
        selectionKey.attach(null);
        selectionKey.cancel();
    }

    private void write(SelectionKey selectionKey) throws IOException {
        Send send = (Send) selectionKey.attachment();
        this.stats.recordBytesWritten(send.writeTo(channelFor(selectionKey)));
        if (send.complete()) {
            selectionKey.attach(null);
            selectionKey.interestOps(1);
        } else {
            selectionKey.interestOps(4);
            getSelector().wakeup();
        }
    }

    private void read(SelectionKey selectionKey) throws IOException {
        Receive receive;
        SocketChannel channelFor = channelFor(selectionKey);
        if (selectionKey.attachment() == null) {
            receive = new BoundedByteBufferReceive(this.maxRequestSize);
            selectionKey.attach(receive);
        } else {
            receive = (Receive) selectionKey.attachment();
        }
        int readFrom = receive.readFrom(channelFor);
        this.stats.recordBytesRead(readFrom);
        if (readFrom < 0) {
            close(selectionKey);
            return;
        }
        if (!receive.complete()) {
            selectionKey.interestOps(1);
            getSelector().wakeup();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("reading request not been done. " + receive);
                return;
            }
            return;
        }
        Send handle = handle(selectionKey, receive);
        selectionKey.attach(null);
        if (handle != null) {
            selectionKey.attach(handle);
            selectionKey.interestOps(4);
        }
    }

    private Send handle(SelectionKey selectionKey, Receive receive) {
        short s = receive.buffer().getShort();
        RequestKeys valueOf = RequestKeys.valueOf(s);
        if (this.requestLogger.isTraceEnabled()) {
            if (valueOf == null) {
                throw new InvalidRequestException("No mapping found for handler id " + ((int) s));
            }
            this.requestLogger.trace(String.format("Handling %s request from %s", valueOf, channelFor(selectionKey).socket().getRemoteSocketAddress()));
        }
        RequestHandler mapping = this.requesthandlerFactory.mapping(valueOf, receive);
        if (mapping == null) {
            throw new InvalidRequestException("No handler found for request");
        }
        long nanoTime = System.nanoTime();
        Send handler = mapping.handler(valueOf, receive);
        this.stats.recordRequest(valueOf, System.nanoTime() - nanoTime);
        return handler;
    }

    private void configureNewConnections() throws ClosedChannelException {
        while (this.newConnections.size() > 0) {
            SocketChannel poll = this.newConnections.poll();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Listening to new connection from " + poll.socket().getRemoteSocketAddress());
            }
            poll.register(getSelector(), 1);
        }
    }

    public void accept(SocketChannel socketChannel) {
        this.newConnections.add(socketChannel);
        getSelector().wakeup();
    }
}
