/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.broker.embedded.mq;

import java.util.ArrayList;
import org.noear.folkmq.FolkMQ;
import org.noear.folkmq.broker.MqBorker;
import org.noear.folkmq.broker.MqBorkerInternal;
import org.noear.folkmq.broker.MqBorkerListener;
import org.noear.folkmq.broker.MqWatcher;
import org.noear.folkmq.broker.embedded.MqBrokerConfig;
import org.noear.folkmq.broker.embedded.admin.dso.QueueForceService;
import org.noear.folkmq.broker.embedded.admin.dso.ViewUtils;
import org.noear.folkmq.broker.embedded.admin.model.ServerInfoVo;
import org.noear.folkmq.broker.embedded.mq.FolkmqApiHandler;
import org.noear.folkmq.broker.store.fdb.MqSnapshotStore;
import org.noear.snack.ONode;
import org.noear.socketd.SocketD;
import org.noear.socketd.cluster.ClusterClientSession;
import org.noear.socketd.transport.client.ClientConfig;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.MessageHandler;
import org.noear.socketd.transport.server.ServerConfig;
import org.noear.socketd.utils.MemoryUtils;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.annotation.Component;
import org.noear.solon.annotation.Inject;
import org.noear.solon.boot.prop.impl.HttpServerProps;
import org.noear.solon.core.AppContext;
import org.noear.solon.core.bean.LifecycleBean;
import org.noear.solon.core.event.EventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Lifecycle component that boots and shuts down the embedded FolkMQ broker.
 *
 * <p>On {@link #start()} one of two modes is chosen from
 * {@code MqBrokerConfig.proxyServer}:
 * <ul>
 *   <li>empty value: a local broker server is created and started
 *       (see {@link #startLocalServerMode(MqWatcher)});</li>
 *   <li>otherwise: a cluster client session is opened towards the configured
 *       servers (see {@link #startBrokerSession(String, MqWatcher)}).</li>
 * </ul>
 * The chosen mode is exposed through {@link #isStandalone()}.
 */
@Component
public class FolkmqLifecycleBean
implements LifecycleBean {
    private static final Logger log = LoggerFactory.getLogger(FolkmqLifecycleBean.class);

    /** Value appended as the {@code @} query parameter when a server url does not carry one. */
    private static final String DEFAULT_AT_PARAM = "folkmq-server";

    /** Set by {@link #start()}: {@code true} when the local server mode was selected. */
    private static boolean isStandalone;

    @Inject
    private AppContext appContext;
    @Inject
    private QueueForceService queueForceService;

    /** Non-null only after the local server mode has been started. */
    private MqBorker localServer;
    /** Non-null only once the broker-session mode has begun starting. */
    private MqBorkerListener brokerServiceListener;
    /** Non-null only after the cluster client session has been opened. */
    private ClusterClientSession brokerSession;

    /**
     * @return {@code true} if the last {@link #start()} selected the local server
     *         mode, {@code false} if it selected the broker-session mode (or if
     *         {@link #start()} has not run yet)
     */
    public static boolean isStandalone() {
        return isStandalone;
    }

    /**
     * Creates the snapshot store, registers it in the application context under
     * {@code MqWatcher}, and starts the broker in the mode selected by
     * {@code MqBrokerConfig.proxyServer}.
     *
     * @throws Throwable if either start-up path fails
     */
    public void start() throws Throwable {
        MqSnapshotStore watcher = new MqSnapshotStore();
        this.appContext.wrapAndPut(MqWatcher.class, (Object)watcher);

        isStandalone = Utils.isEmpty((String)MqBrokerConfig.proxyServer);
        if (isStandalone) {
            this.startLocalServerMode((MqWatcher)watcher);
        } else {
            this.startBrokerSession(MqBrokerConfig.proxyServer, (MqWatcher)watcher);
        }

        log.info("Server:main: folkmq-broker: Started (SOCKET.D/{}-{}, folkmq/{})",
                SocketD.protocolVersion(), SocketD.version(), FolkMQ.versionName());
    }

    /**
     * Starts a broker server inside this process.
     *
     * <p>The listening port is {@code MqBrokerConfig.folkmqTransportPort} when it
     * is positive, otherwise the Solon server port plus 10000.
     *
     * @param watcher attached to the server only when {@code MqBrokerConfig.saveEnable} is set
     * @throws Exception if the server cannot be created or started
     */
    private void startLocalServerMode(MqWatcher watcher) throws Exception {
        String schema = Solon.cfg().get("folkmq.schema");

        this.localServer = FolkMQ.createBorker(schema).config(c -> {
            // Casts are kept from the decompiled form: the declared return type
            // of these fluent setters is not visible from this file.
            ServerConfig tuned = (ServerConfig)c.serialSend(true);
            tuned = (ServerConfig)tuned.maxMemoryRatio(0.8f);
            tuned = (ServerConfig)tuned.streamTimeout(MqBrokerConfig.streamTimeout);
            tuned = (ServerConfig)tuned.ioThreads(MqBrokerConfig.ioThreads);
            tuned = (ServerConfig)tuned.codecThreads(MqBrokerConfig.codecThreads);
            tuned.exchangeThreads(MqBrokerConfig.exchangeThreads);
            // Publish the config on the event bus after the defaults above are applied.
            EventBus.publish((Object)c);
        }).addAccessAll(MqBrokerConfig.getAccessMap());

        if (MqBrokerConfig.saveEnable) {
            this.localServer.watcher(watcher);
        }

        int port = MqBrokerConfig.folkmqTransportPort > 0
                ? MqBrokerConfig.folkmqTransportPort
                : Solon.cfg().serverPort() + 10000;
        this.localServer.start(port);

        this.addApiEvent(this.localServer.getServerInternal());
        this.appContext.wrapAndPut(MqBorkerInternal.class, (Object)this.localServer.getServerInternal());
        log.info("FlokMQ local server started!");
    }

    /**
     * Connects this node to the given servers through a cluster client session
     * and serves broker traffic via a {@code MqBorkerListener}.
     *
     * @param brokerServers comma-separated server urls
     * @param watcher       attached to the listener only when {@code MqBrokerConfig.saveEnable} is set
     * @throws IllegalArgumentException if {@code brokerServers} contains no usable url
     * @throws Exception                if the session cannot be opened or the listener cannot start
     */
    private void startBrokerSession(String brokerServers, MqWatcher watcher) throws Exception {
        // Parse first: a misconfigured list must fail before any listener state is created.
        ArrayList<String> serverUrls = FolkmqLifecycleBean.buildServerUrls(brokerServers);
        if (serverUrls.isEmpty()) {
            // The configured value is deliberately not echoed; urls may carry credentials.
            throw new IllegalArgumentException("folkmq proxy server list contains no usable url");
        }

        this.brokerServiceListener = new MqBorkerListener(true);
        this.registerAdminHandlers();
        this.addApiEvent((MqBorkerInternal)this.brokerServiceListener);
        if (MqBrokerConfig.saveEnable) {
            this.brokerServiceListener.watcher(watcher);
        }

        this.brokerSession = (ClusterClientSession)SocketD.createClusterClient(serverUrls).config(c -> {
            HttpServerProps serverProps = HttpServerProps.getInstance();
            // Metadata sent with the connection: broker version and this node's http host/port.
            c.metaPut("folkmq-version", FolkMQ.versionCodeAsString());
            c.metaPut("port", String.valueOf(serverProps.getWrapPort()));
            c.metaPut("host", serverProps.getWrapHost());

            // Casts are kept from the decompiled form (see startLocalServerMode).
            ClientConfig tuned = (ClientConfig)c.heartbeatInterval(6000L).serialSend(true);
            tuned = (ClientConfig)tuned.maxMemoryRatio(0.8f);
            tuned = (ClientConfig)tuned.ioThreads(MqBrokerConfig.ioThreads);
            tuned = (ClientConfig)tuned.codecThreads(MqBrokerConfig.codecThreads);
            tuned.exchangeThreads(MqBrokerConfig.exchangeThreads);
            EventBus.publish((Object)c);
        }).listen((Listener)this.brokerServiceListener).open();

        // NOTE(review): start(null) presumably loads persisted state via the watcher - confirm.
        this.brokerServiceListener.start(null);
        this.appContext.wrapAndPut(MqBorkerInternal.class, (Object)this.brokerServiceListener);
        log.info("FlokMQ broker service started!");
    }

    /**
     * Registers the {@code admin.*} event handlers on {@link #brokerServiceListener}:
     * two view handlers that reply with JSON, and three queue "force" handlers
     * that delegate to {@link #queueForceService}.
     */
    private void registerAdminHandlers() {
        this.brokerServiceListener.doOn("admin.view.queue", (s, m) -> {
            // Only messages that expect an answer are replied to.
            if (!m.isRequest() && !m.isSubscribe()) {
                return;
            }
            String json = ONode.stringify(ViewUtils.queueView((MqBorkerInternal)this.brokerServiceListener));
            s.replyEnd(m, (Entity)new StringEntity(json));
        });

        this.brokerServiceListener.doOn("admin.view.instance", (s, m) -> {
            if (!m.isRequest() && !m.isSubscribe()) {
                return;
            }
            ServerInfoVo infoVo = new ServerInfoVo();
            infoVo.memoryRatio = MemoryUtils.getUseMemoryRatio();
            String json = ONode.stringify((Object)infoVo);
            s.replyEnd(m, (Entity)new StringEntity(json));
        });

        this.brokerServiceListener.doOn("admin.queue.force.distribute", (s, m) -> {
            String topic = m.meta("mq.topic");
            String consumerGroup = m.meta("mq.consumer");
            this.queueForceService.forceDistribute((MqBorkerInternal)this.brokerServiceListener, topic, consumerGroup, false);
        });

        this.brokerServiceListener.doOn("admin.queue.force.delete", (s, m) -> {
            String topic = m.meta("mq.topic");
            String consumerGroup = m.meta("mq.consumer");
            this.queueForceService.forceDelete((MqBorkerInternal)this.brokerServiceListener, topic, consumerGroup, false);
        });

        this.brokerServiceListener.doOn("admin.queue.force.clear", (s, m) -> {
            String topic = m.meta("mq.topic");
            String consumerGroup = m.meta("mq.consumer");
            this.queueForceService.forceClear((MqBorkerInternal)this.brokerServiceListener, topic, consumerGroup, false);
        });
    }

    /**
     * Turns a comma-separated server list into connection urls.
     *
     * <p>Each entry is trimmed, every {@code folkmq://} occurrence is replaced
     * with {@code sd:tcp://}, blank entries are dropped, and entries that do not
     * contain {@code @=} get {@code @=folkmq-server} appended as a query parameter.
     *
     * @param brokerServers comma-separated server urls
     * @return the normalised urls in input order; empty if no entry was usable
     */
    private static ArrayList<String> buildServerUrls(String brokerServers) {
        ArrayList<String> serverUrls = new ArrayList<String>();
        for (String rawUrl : brokerServers.split(",")) {
            String url = rawUrl.trim().replace("folkmq://", "sd:tcp://");
            if (Utils.isEmpty(url)) {
                continue;
            }
            if (!url.contains("@=")) {
                String separator = url.contains("?") ? "&@=" : "?@=";
                url = url + separator + DEFAULT_AT_PARAM;
            }
            serverUrls.add(url);
        }
        return serverUrls;
    }

    /**
     * Signals the upcoming shutdown to whichever of the local server or the
     * broker session was started.
     *
     * @throws Throwable if the pre-stop call of the active component fails
     */
    public void preStop() throws Throwable {
        if (this.localServer != null) {
            this.localServer.prestop();
        }
        if (this.brokerSession != null) {
            this.brokerSession.preclose();
        }
        log.info("Server:main: folkmq-broker: Stop starting (SOCKET.D/{}-{}, folkmq/{})",
                SocketD.protocolVersion(), SocketD.version(), FolkMQ.versionName());
    }

    /**
     * Stops the local server and/or closes the broker session, then stops the
     * broker listener.
     *
     * @throws Throwable if stopping any component fails
     */
    public void stop() throws Throwable {
        if (this.localServer != null) {
            this.localServer.stop();
        }
        if (this.brokerSession != null) {
            try {
                this.brokerSession.close();
            } finally {
                // The listener must be stopped even when closing the session
                // fails; otherwise its shutdown work would be skipped.
                this.brokerServiceListener.stop(null);
            }
        }
        log.info("Server:main: folkmq-broker: Has Stopped (SOCKET.D/{}-{}, folkmq/{})",
                SocketD.protocolVersion(), SocketD.version(), FolkMQ.versionName());
    }

    /**
     * Registers the {@code mq.api} event handler on the given broker internals.
     *
     * @param serviceInternal broker internals; it is cast to {@code MqBorkerListener},
     *                        so it must be an instance of that type
     */
    private void addApiEvent(MqBorkerInternal serviceInternal) {
        FolkmqApiHandler handler = new FolkmqApiHandler(this.queueForceService, (MqBorkerListener)serviceInternal);
        serviceInternal.doOnEvent("mq.api", (MessageHandler)handler);
    }
}

