/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import org.noear.folkmq.FolkMQ;
import org.noear.folkmq.broker.MqBorkerInternal;
import org.noear.folkmq.broker.MqBorkerListenerBase;
import org.noear.folkmq.broker.MqDraft;
import org.noear.folkmq.broker.MqQps;
import org.noear.folkmq.broker.MqQueue;
import org.noear.folkmq.broker.MqWatcher;
import org.noear.folkmq.broker.MqWatcherDefault;
import org.noear.folkmq.common.MqMetasResolver;
import org.noear.folkmq.common.MqUtils;
import org.noear.snack.ONode;
import org.noear.socketd.broker.BrokerListener;
import org.noear.socketd.exception.SocketDAlarmException;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.MessageHandler;
import org.noear.socketd.utils.RunUtils;
import org.noear.socketd.utils.RunnableEx;

public class MqBorkerListener
extends MqBorkerListenerBase
implements MqBorkerInternal {
    protected final BrokerListener brokerListener = new BrokerListener();
    protected final MqQps qpsPublish = new MqQps();
    protected final MqQps qpsDistribute = new MqQps();
    protected final ScheduledFuture<?> qpsScheduled;

    @Override
    public MqQps getQpsDistribute() {
        return this.qpsDistribute;
    }

    @Override
    public MqQps getQpsPublish() {
        return this.qpsPublish;
    }

    public MqBorkerListener(boolean proxyMode) {
        this.proxyMode = proxyMode;
        this.distributeThread = new Thread(this::distributeDo, "distributeThread");
        this.watcher = new MqWatcherDefault();
        this.watcher.init(this);
        this.qpsScheduled = RunUtils.delayAndRepeat(() -> {
            this.qpsPublish.reset();
            this.qpsDistribute.reset();
        }, (long)5000L);
        this.doOn("mq.event.subscribe", (s, m) -> {
            this.onSubscribe(s, m);
            this.confirmDo(s, m);
        });
        this.doOn("mq.event.unsubscribe", (s, m) -> {
            this.onUnsubscribe(s, m);
            this.confirmDo(s, m);
        });
        this.doOn("mq.event.publish", (s, m) -> {
            this.qpsPublish.record();
            MqMetasResolver mr = MqUtils.getOf((Entity)m);
            boolean isTrans = mr.isTransaction((Entity)m);
            if (proxyMode) {
                m.putMeta("X-Unlimited", "1");
            }
            if (isTrans) {
                mr.bakExpiration((Entity)m, true);
                mr.bakScheduled((Entity)m, true);
                mr.setExpiration((Entity)m, System.currentTimeMillis() + 3600000L);
                mr.setScheduled((Entity)m, System.currentTimeMillis() + 60000L);
                MqDraft draft = new MqDraft(mr, m);
                String queueName = draft.topic + "#" + "!";
                this.transactionMessageMap.put(draft.key, draft.topic);
                this.queueGetOrInit(draft.topic, "!", queueName);
                this.routingToQueueName(draft, queueName);
            } else {
                this.onPublish(s, m, mr);
            }
            this.confirmDo(s, m);
        });
        this.doOn("mq.event.publish2", (s, m) -> {
            boolean isRollback = "1".equals(m.meta("mq.rollback"));
            String[] keyAry = m.dataAsString().split(",");
            boolean confirmIsOk = true;
            String confirmHint = "";
            if (!isRollback) {
                for (String key : keyAry) {
                    if (this.transactionMessageMap.containsKey(key)) continue;
                    confirmIsOk = false;
                    confirmHint = confirmHint.length() > 0 ? confirmHint + "," + key : "Transaction messages have failed to be published: " + key;
                }
            }
            if (confirmIsOk) {
                for (String key : keyAry) {
                    String queueName;
                    MqQueue queue;
                    String topic = (String)this.transactionMessageMap.remove(key);
                    if (topic == null || (queue = this.getQueue(queueName = topic + "#" + "!")) == null) continue;
                    queue.affirmAt(key, isRollback);
                }
            }
            this.confirmDo(s, m, confirmIsOk, confirmHint);
        });
        this.doOn("mq.event.unpublish", (s, m) -> {
            this.onUnpublish(s, m);
            this.confirmDo(s, m);
        });
        this.doOn("mq.event.save", (s, m) -> RunUtils.asyncAndTry(() -> {
            this.save();
            this.confirmDo(s, m);
        }));
        this.doOn("mq.event.request", (s, m) -> {
            this.qpsPublish.record();
            String atName = m.atName();
            Session responder = this.brokerListener.getPlayerAny(atName, s, m);
            if (responder != null && responder.isValid()) {
                try {
                    this.qpsDistribute.record();
                    this.brokerListener.forwardToSession(s, m, responder);
                }
                catch (Throwable e) {
                    s.sendAlarm(m, "Broker: forward '@" + atName + "' error: " + e.getMessage());
                }
            } else {
                s.sendAlarm(m, "Broker: don't have '@" + atName + "' session");
            }
        });
    }

    public void onSend(Session session, Message message) {
        if ("mq.event.distribute".equals(message.event())) {
            MqMetasResolver mr = null;
            mr = this.proxyMode ? MqUtils.getOf((Entity)message) : MqUtils.getOf(session);
            if (mr.version() >= 3) {
                session.config().getStreamManger().removeStream(message.sid());
            }
        }
    }

    public void onReply(Session session, Message message) {
        if ("mq.event.distribute".equals(message.event())) {
            MqMetasResolver mr = null;
            mr = this.proxyMode ? MqUtils.getOf((Entity)message) : MqUtils.getOf(session);
            if (mr.version() >= 3) {
                String key = mr.getKey((Entity)message);
                String topic = mr.getTopic((Entity)message);
                String consumerGroup = mr.getConsumerGroup((Entity)message);
                String queueName = topic + "#" + consumerGroup;
                MqQueue queue = this.getQueue(queueName);
                if (queue != null) {
                    int ack = Integer.parseInt(message.metaOrDefault("mq.ack", "0"));
                    queue.acknowledgeAt(key, ack);
                }
            }
        }
    }

    public MqBorkerListener watcher(MqWatcher watcher) {
        if (watcher != null) {
            this.watcher = watcher;
            this.watcher.init(this);
        }
        return this;
    }

    public MqBorkerListener addAccess(String accessKey, String accessSecretKey) {
        this.serverAccessMap.put(accessKey, accessSecretKey);
        return this;
    }

    public MqBorkerListener addAccessAll(Map<String, String> accessMap) {
        if (accessMap != null) {
            this.serverAccessMap.putAll(accessMap);
        }
        return this;
    }

    @Override
    public void save() {
        this.watcher.onSave();
    }

    public void start(RunnableEx<Exception> onStart) throws Exception {
        this.watcher.onStartBefore();
        if (onStart != null) {
            onStart.run();
        }
        this.distributeThread.start();
        this.watcher.onStartAfter();
        this.isStarted.set(true);
    }

    public void stop(Runnable onStop) {
        this.watcher.onStopBefore();
        if (onStop != null) {
            onStop.run();
        }
        this.distributeThread.interrupt();
        this.watcher.onStopAfter();
        ArrayList queueList = new ArrayList(this.queueMap.values());
        for (MqQueue queue : queueList) {
            queue.close();
        }
        if (this.qpsScheduled != null) {
            this.qpsScheduled.cancel(true);
        }
        this.isStarted.set(false);
    }

    public void onOpen(Session session) throws IOException {
        super.onOpen(session);
        session.handshake().outMeta("folkmq-version", FolkMQ.versionCodeAsString());
        if (this.proxyMode) {
            session.send("mq.event.join", (Entity)new StringEntity("").metaPut("X-Unlimited", "1"));
            log.info("Broker: {} channel opened, sessionId={}", (Object)this.chanelType(), (Object)session.sessionId());
        } else {
            if (!this.auth(session)) {
                session.close();
                return;
            }
            log.info("Broker: {} channel opened, sessionId={}", (Object)this.chanelType(), (Object)session.sessionId());
        }
        this.sessionAllMap.put(session.sessionId(), session);
        this.brokerListener.onOpen(session);
    }

    protected boolean auth(Session session) {
        if (this.serverAccessMap.size() > 0) {
            String accessKey = session.param("ak");
            String accessSecretKey = session.param("sk");
            if (accessKey == null || accessSecretKey == null) {
                return false;
            }
            if (!accessSecretKey.equals(this.serverAccessMap.get(accessKey))) {
                return false;
            }
        }
        return true;
    }

    public void onClose(Session session) {
        super.onClose(session);
        log.info("Broker: {} channel closed, sessionId={}", (Object)this.chanelType(), (Object)session.sessionId());
        ArrayList queueNameList = new ArrayList(session.attrMap().keySet());
        for (String queueName : queueNameList) {
            MqQueue queue = (MqQueue)this.queueMap.get(queueName);
            if (queue == null) continue;
            queue.sessionRemove(session);
        }
        this.brokerListener.onClose(session);
    }

    public void onError(Session session, Throwable error) {
        super.onError(session, error);
        if (log.isWarnEnabled()) {
            if (error instanceof SocketDAlarmException) {
                SocketDAlarmException alarmException = (SocketDAlarmException)error;
                log.warn("Broker: {} channel error, sessionId={}, from={}", new Object[]{this.chanelType(), session.sessionId(), alarmException.getAlarm(), error});
            } else {
                log.warn("Broker: {} channel error, sessionId={}", new Object[]{this.chanelType(), session.sessionId(), error});
            }
        }
    }

    @Override
    public void doOnEvent(String event, MessageHandler handler) {
        this.doOn(event, handler);
    }

    protected void onSubscribe(Session s, Message m) throws IOException {
        String is_batch = m.meta("mq.batch");
        if ("1".equals(is_batch)) {
            ONode oNode = ONode.loadStr((String)m.dataAsString());
            Map subscribeData = (Map)oNode.toObject();
            if (subscribeData != null) {
                for (Map.Entry kv : subscribeData.entrySet()) {
                    for (String queueName : (Collection)kv.getValue()) {
                        String consumerGroup = queueName.split("#")[1];
                        this.watcher.onSubscribe((String)kv.getKey(), consumerGroup, s);
                        this.subscribeDo((String)kv.getKey(), consumerGroup, s);
                    }
                }
            }
        } else {
            String topic = m.meta("mq.topic");
            String consumerGroup = m.meta("mq.consumer");
            this.watcher.onSubscribe(topic, consumerGroup, s);
            this.subscribeDo(topic, consumerGroup, s);
        }
    }

    protected void onUnsubscribe(Session s, Message m) throws IOException {
        String topic = m.meta("mq.topic");
        String consumerGroup = m.meta("mq.consumer");
        this.watcher.onUnSubscribe(topic, consumerGroup, s);
        this.unsubscribeDo(topic, consumerGroup, s);
    }

    private void onPublish(Session s, Message m, MqMetasResolver mr) throws IOException {
        if (m == null) {
            return;
        }
        this.watcher.onPublish(m);
        this.routingDo(mr, m);
    }

    private void onUnpublish(Session s, Message m) throws IOException {
        this.watcher.onUnPublish(m);
        this.unRoutingDo(m);
    }

    private void confirmDo(Session s, Message m) throws IOException {
        this.confirmDo(s, m, true, "");
    }

    private void confirmDo(Session s, Message m, boolean isOk, String hint) throws IOException {
        if ((m.isRequest() || m.isSubscribe()) && s.isValid()) {
            s.replyEnd(m, (Entity)new StringEntity(hint).metaPut("mq.confirm", isOk ? "1" : "0"));
        }
    }
}

