/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.broker;

import java.util.Map;
import org.noear.folkmq.broker.MqBorker;
import org.noear.folkmq.broker.MqBorkerInternal;
import org.noear.folkmq.broker.MqBorkerListener;
import org.noear.folkmq.broker.MqWatcher;
import org.noear.socketd.SocketD;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.server.Server;
import org.noear.socketd.transport.server.ServerConfig;
import org.noear.socketd.transport.server.ServerConfigHandler;
import org.noear.socketd.utils.RunnableEx;
import org.noear.socketd.utils.StrUtils;

/**
 * Default {@link MqBorker} implementation.
 *
 * <p>Wraps a Socket.D {@link Server} and an {@link MqBorkerListener}: access
 * keys and the watcher are forwarded to the listener, while the server is
 * created and configured lazily in {@link #start(int)}.
 */
public class MqBorkerDefault
implements MqBorker {
    /** Schema used when the caller supplies none. */
    private static final String DEFAULT_SCHEMA = "sd:tcp";
    /** Default upper bound passed to {@code maxMemoryRatio}. */
    private static final float DEFAULT_MAX_MEMORY_RATIO = 0.8f;
    /** Default stream timeout (presumably milliseconds - confirm against Socket.D). */
    private static final long DEFAULT_STREAM_TIMEOUT = 300000L;
    /** Default size used for the io, codec and exchange thread settings. */
    private static final int DEFAULT_THREADS = 1;

    private final MqBorkerListener serverListener;
    private final String serverSchema;
    /** Created by {@link #start(int)}; {@code null} until then. */
    private Server server;
    /** Optional user configuration applied after the built-in defaults. */
    private ServerConfigHandler serverConfigHandler;

    /**
     * Creates a broker.
     *
     * @param schema         server schema; {@code "sd:tcp"} is used when empty
     * @param serverListener listener to use; a new {@code MqBorkerListener(false)}
     *                       is created when {@code null}
     */
    public MqBorkerDefault(String schema, MqBorkerListener serverListener) {
        this.serverSchema = StrUtils.isEmpty(schema) ? DEFAULT_SCHEMA : schema;
        this.serverListener = serverListener == null ? new MqBorkerListener(false) : serverListener;
    }

    /**
     * Creates a broker with the given schema and a default listener.
     *
     * @param schema server schema; {@code "sd:tcp"} is used when empty
     */
    public MqBorkerDefault(String schema) {
        this(schema, null);
    }

    /** Creates a broker with the default schema and a default listener. */
    public MqBorkerDefault() {
        this(null, null);
    }

    /**
     * Registers an extra server configuration step. It is applied in
     * {@link #start(int)} after the built-in defaults and before the port is set.
     *
     * @param configHandler configuration callback, may be {@code null}
     * @return this broker
     */
    @Override
    public MqBorker config(ServerConfigHandler configHandler) {
        this.serverConfigHandler = configHandler;
        return this;
    }

    /**
     * Forwards the watcher to the underlying listener.
     *
     * @param watcher watcher to install
     * @return this broker
     */
    @Override
    public MqBorker watcher(MqWatcher watcher) {
        this.serverListener.watcher(watcher);
        return this;
    }

    /**
     * Forwards a single access key pair to the underlying listener.
     *
     * @param accessKey       access key
     * @param accessSecretKey secret belonging to {@code accessKey}
     * @return this broker
     */
    @Override
    public MqBorker addAccess(String accessKey, String accessSecretKey) {
        this.serverListener.addAccess(accessKey, accessSecretKey);
        return this;
    }

    /**
     * Forwards a map of access key pairs to the underlying listener.
     *
     * @param accessMap access key to secret mapping
     * @return this broker
     */
    @Override
    public MqBorker addAccessAll(Map<String, String> accessMap) {
        this.serverListener.addAccessAll(accessMap);
        return this;
    }

    /**
     * Creates the server for the configured schema, applies the default and
     * user-supplied configuration, binds the listener and hands a callback that
     * starts the server to the listener's own {@code start}.
     *
     * @param port port the server is configured with
     * @return this broker
     * @throws Exception if the listener or server fails to start
     */
    @Override
    public MqBorker start(int port) throws Exception {
        this.server = SocketD.createServer(this.serverSchema);

        // Built-in defaults first, so that the user handler below can override them.
        this.server.config(c -> c.serialSend(true)
                .maxMemoryRatio(DEFAULT_MAX_MEMORY_RATIO)
                .streamTimeout(DEFAULT_STREAM_TIMEOUT)
                .ioThreads(DEFAULT_THREADS)
                .codecThreads(DEFAULT_THREADS)
                .exchangeThreads(DEFAULT_THREADS));
        if (this.serverConfigHandler != null) {
            this.server.config(this.serverConfigHandler);
        }

        // The port is applied last so the handler cannot change it.
        this.server.config(c -> c.port(port)).listen(this.serverListener);

        RunnableEx<Exception> startServer = () -> this.server.start();
        this.serverListener.start(startServer);
        return this;
    }

    /**
     * Asks the server to pre-stop. Does nothing when {@link #start(int)} has
     * not been called yet.
     */
    @Override
    public void prestop() {
        Server current = this.server;
        if (current != null) {
            current.prestop();
        }
    }

    /**
     * Stops the listener, giving it a callback that stops the server. The
     * callback is a no-op when {@link #start(int)} has not been called yet.
     */
    @Override
    public void stop() {
        this.serverListener.stop(() -> {
            Server current = this.server;
            if (current != null) {
                current.stop();
            }
        });
    }

    /**
     * Exposes the underlying listener through its internal broker interface.
     *
     * @return the listener backing this broker
     */
    @Override
    public MqBorkerInternal getServerInternal() {
        return this.serverListener;
    }
}

