/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.transport.netty.tcp;

import io.netty.channel.ChannelOption;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.transport.netty.Receiver;
import io.scalecube.transport.netty.TransportImpl;
import io.scalecube.transport.netty.tcp.TcpChannelInitializer;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

final class TcpReceiver
implements Receiver {
    private final TransportConfig config;

    TcpReceiver(TransportConfig config) {
        this.config = config;
    }

    @Override
    public Mono<DisposableServer> bind() {
        return Mono.deferContextual(context -> Mono.just((Object)context.get(TransportImpl.ReceiverContext.class))).flatMap(context -> this.newTcpServer((TransportImpl.ReceiverContext)context).handle((in, out) -> in.receive().retain().doOnNext(context::onMessage).then()).bind().cast(DisposableServer.class));
    }

    private TcpServer newTcpServer(TransportImpl.ReceiverContext context) {
        return (TcpServer)((TcpServer)((TcpServer)((TcpServer)TcpServer.create().runOn(context.loopResources()).port(this.config.port()).childOption(ChannelOption.TCP_NODELAY, (Object)true)).childOption(ChannelOption.SO_KEEPALIVE, (Object)true)).childOption(ChannelOption.SO_REUSEADDR, (Object)true)).doOnChannelInit((connectionObserver, channel, remoteAddress) -> new TcpChannelInitializer(this.config.maxFrameLength()).accept(connectionObserver, channel));
    }
}

