/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.noear.folkmq.broker.MqBorkerListener;
import org.noear.folkmq.broker.MqMessageHolder;
import org.noear.folkmq.broker.MqNextTime;
import org.noear.folkmq.broker.MqQueue;
import org.noear.folkmq.broker.MqQueueBase;
import org.noear.folkmq.broker.MqWatcher;
import org.noear.socketd.transport.client.ClientSession;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.utils.SessionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqQueueDefault
extends MqQueueBase
implements MqQueue {
    private static final Logger log = LoggerFactory.getLogger(MqQueueDefault.class);
    private final MqBorkerListener serviceListener;
    private final boolean transaction;
    private final AtomicReference<Boolean> sequenceLock = new AtomicReference<Boolean>(false);
    private final String topic;
    private final String consumerGroup;
    private final String queueName;
    private final MqWatcher watcher;

    public MqQueueDefault(MqBorkerListener serviceListener, MqWatcher watcher, String topic, String consumerGroup, String queueName) {
        this.serviceListener = serviceListener;
        this.topic = topic;
        this.consumerGroup = consumerGroup;
        this.queueName = queueName;
        this.transaction = "!".equals(consumerGroup);
        this.watcher = watcher;
    }

    @Override
    public boolean isTransaction() {
        return this.transaction;
    }

    @Override
    public String getTopic() {
        return this.topic;
    }

    @Override
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    @Override
    public String getQueueName() {
        return this.queueName;
    }

    public Map<String, MqMessageHolder> getMessageMap() {
        return Collections.unmodifiableMap(this.messageMap);
    }

    @Override
    public void add(MqMessageHolder messageHolder) {
        this.messageAddLock.lock();
        try {
            if (messageHolder.getDistributeTime() != this.messageDistributeTime.get() && messageHolder.getDistributeTime() < System.currentTimeMillis() + 1000L) {
                this.messageDistributeTime.set(messageHolder.getDistributeTime());
                this.messageIndexer.set(0L);
            }
            messageHolder.setDistributeIdx(this.messageIndexer.incrementAndGet());
            this.messageMap.put(messageHolder.getKey(), messageHolder);
            this.internalAdd(messageHolder);
        }
        finally {
            this.messageAddLock.unlock();
        }
    }

    @Override
    public boolean distribute() {
        MqMessageHolder messageHolder;
        if (this.sequenceLock.get().booleanValue()) {
            return true;
        }
        if (this.targetSessionCount() == 0) {
            messageHolder = (MqMessageHolder)this.messageQueue.peek();
            if (messageHolder == null) {
                return false;
            }
            if (messageHolder.getExpiration() < 1L) {
                return false;
            }
            if (messageHolder.getExpiration() > System.currentTimeMillis()) {
                return false;
            }
        }
        if ((messageHolder = this.messageQueue.poll()) != null) {
            if (messageHolder.isTransaction()) {
                return this.transpond0(messageHolder);
            }
            return this.distribute0(messageHolder);
        }
        return false;
    }

    protected int targetSessionCount() {
        if (this.isTransaction()) {
            return this.serviceListener.brokerListener.getSessionCount();
        }
        return this.sessionCount();
    }

    @Override
    public void removeAt(String key) {
        MqMessageHolder messageHolder = (MqMessageHolder)this.messageMap.get(key);
        if (messageHolder != null) {
            this.internalDelete(messageHolder);
            this.internalRemove(messageHolder);
        }
    }

    @Override
    public void affirmAt(String key, boolean isRollback) {
        MqMessageHolder messageHolder = (MqMessageHolder)this.messageMap.get(key);
        if (messageHolder != null) {
            this.internalRemove(messageHolder);
            this.affirmAtDo(messageHolder, isRollback);
        }
    }

    protected void affirmAtDo(MqMessageHolder messageHolder, boolean isRollback) {
        this.internalDelete(messageHolder);
        if (!isRollback) {
            Message message = messageHolder.noTransaction();
            this.serviceListener.routingDo(messageHolder.mr, message);
        }
    }

    private void internalAdd(MqMessageHolder mh) {
        this.messageQueue.add(mh);
    }

    private void internalRemove(MqMessageHolder mh) {
        this.messageQueue.remove(mh);
    }

    @Override
    public void forceClear() {
        this.messageAddLock.lock();
        try {
            this.messageIndexer.set(0L);
            this.messageMap.clear();
            this.messageQueue.clear();
            System.gc();
        }
        finally {
            this.messageAddLock.unlock();
        }
    }

    @Override
    public void forceDistribute(int times, int count) {
        if (count == 0 || count > this.messageTotal()) {
            count = this.messageTotal();
        }
        ArrayList<MqMessageHolder> msgList = new ArrayList<MqMessageHolder>(count);
        for (Map.Entry kv : this.messageMap.entrySet()) {
            MqMessageHolder msg = (MqMessageHolder)kv.getValue();
            if (!(msg.isDone() || msg.getDistributeCount() < times && msg.getDistributeCountPre() < times)) {
                msgList.add(msg);
            }
            if (msgList.size() != count) continue;
            break;
        }
        for (MqMessageHolder msg : msgList) {
            this.messageQueue.remove(msg);
            msg.delayed(System.currentTimeMillis());
            this.messageQueue.add(msg);
        }
    }

    protected boolean transpond0(MqMessageHolder messageHolder) {
        if (messageHolder.isDone()) {
            this.internalDelete(messageHolder);
            return true;
        }
        if (messageHolder.getExpiration() > 0L && messageHolder.getExpiration() < System.currentTimeMillis()) {
            this.internalDelete(messageHolder);
            if (log.isWarnEnabled()) {
                log.warn("Queue: message have expired, key={}", (Object)messageHolder.getKey());
            }
            return true;
        }
        Session s1 = null;
        s1 = this.serviceListener.proxyMode ? this.serviceListener.brokerListener.getSessionAny() : this.serviceListener.brokerListener.getPlayerAny(messageHolder.getSender());
        if (s1 != null) {
            try {
                this.serviceListener.qpsDistribute.record();
                s1.sendAndRequest("mq.event.request", (Entity)messageHolder.getEntity(), MqNextTime.maxConsumeMillis()).thenReply(r -> {
                    int ack = Integer.parseInt(r.metaOrDefault("mq.ack", "0"));
                    if (ack == 1) {
                        this.affirmAtDo(messageHolder, false);
                    } else {
                        this.affirmAtDo(messageHolder, true);
                    }
                }).thenError(err -> {
                    this.internalAdd(messageHolder.delayed());
                    if (log.isDebugEnabled()) {
                        log.debug("Queue: request then error, key={}", (Object)messageHolder.getKey(), err);
                    }
                });
            }
            catch (Throwable e) {
                if (s1 != null && !s1.isValid()) {
                    this.sessionRemove(s1);
                }
                this.internalAdd(messageHolder.delayed());
                if (log.isWarnEnabled()) {
                    log.warn("Queue: request error, key={}", (Object)messageHolder.getKey(), (Object)e);
                }
            }
        } else {
            this.internalAdd(messageHolder.delayed());
            if (log.isDebugEnabled()) {
                if (this.serviceListener.proxyMode) {
                    log.debug("Queue: request: broker no sessions, times={}, key={}", (Object)messageHolder.getDistributeCount(), (Object)messageHolder.getKey());
                } else {
                    log.debug("Queue: request: @{} no sessions, times={}, key={}", new Object[]{messageHolder.getSender(), messageHolder.getDistributeCount(), messageHolder.getKey()});
                }
            }
        }
        return true;
    }

    protected boolean distribute0(MqMessageHolder messageHolder) {
        block12: {
            if (messageHolder.isDone()) {
                this.internalDelete(messageHolder);
                return true;
            }
            if (messageHolder.getExpiration() > 0L && messageHolder.getExpiration() < System.currentTimeMillis()) {
                this.internalDelete(messageHolder);
                if (log.isWarnEnabled()) {
                    log.warn("Queue: message have expired, key={}", (Object)messageHolder.getKey());
                }
                return true;
            }
            if (messageHolder.isSequence()) {
                if (messageHolder.getDistributeTimeRef() > System.currentTimeMillis()) {
                    this.internalAdd(messageHolder);
                    return false;
                }
                this.sequenceLock.set(true);
            }
            if (this.sessionCount() > 0) {
                Session s1 = this.sessionGetOne(messageHolder);
                try {
                    if (s1 == null) {
                        this.internalAdd(messageHolder.delayed());
                        this.sequenceLock.set(false);
                        break block12;
                    }
                    this.distributeDo(s1, messageHolder);
                }
                catch (Throwable e) {
                    if (s1 != null && !s1.isValid()) {
                        this.sessionRemove(s1);
                    }
                    this.internalAdd(messageHolder.delayed());
                    this.sequenceLock.set(false);
                    if (log.isWarnEnabled()) {
                        log.warn("Queue: distribute error, key={}", (Object)messageHolder.getKey(), (Object)e);
                    }
                    break block12;
                }
            }
            this.internalAdd(messageHolder.delayed());
            this.sequenceLock.set(false);
            if (log.isDebugEnabled()) {
                log.debug("Queue: distribute: @{} no sessions, times={}, key={}", new Object[]{this.consumerGroup, messageHolder.getDistributeCount(), messageHolder.getKey()});
            }
        }
        return true;
    }

    private void distributeDo(Session s1, MqMessageHolder messageHolder) throws IOException {
        this.watcher.onDistribute(this.topic, this.consumerGroup, messageHolder);
        if (messageHolder.getQos() > 0 && !messageHolder.isBroadcast()) {
            this.serviceListener.qpsDistribute.record();
            s1.sendAndRequest("mq.event.distribute", (Entity)messageHolder.getEntity(), MqNextTime.maxConsumeMillis()).thenReply(r -> {
                int ack = Integer.parseInt(r.metaOrDefault("mq.ack", "0"));
                this.acknowledgeDo(messageHolder, ack, true);
            }).thenError(err -> this.acknowledgeDo(messageHolder, 0, true));
            messageHolder.preDelayed(System.currentTimeMillis() + MqNextTime.maxConsumeMillis());
            this.internalAdd(messageHolder);
        } else {
            if (messageHolder.isBroadcast()) {
                for (Session s0 : this.sessionAll()) {
                    if (!SessionUtils.isActive((ClientSession)s0)) continue;
                    this.serviceListener.qpsDistribute.record();
                    s0.send("mq.event.distribute", (Entity)messageHolder.getEntity());
                }
            } else {
                this.serviceListener.qpsDistribute.record();
                s1.send("mq.event.distribute", (Entity)messageHolder.getEntity());
            }
            this.acknowledgeDo(messageHolder, 1, false);
        }
    }

    @Override
    public void acknowledgeAt(String key, int ack) {
        MqMessageHolder messageHolder = (MqMessageHolder)this.messageMap.get(key);
        if (messageHolder != null) {
            this.acknowledgeDo(messageHolder, ack, true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void acknowledgeDo(MqMessageHolder messageHolder, int ack, boolean removeQueue) {
        try {
            if (!this.messageMap.containsKey(messageHolder.getKey())) {
                return;
            }
            if (ack > 0) {
                this.internalDelete(messageHolder);
                if (removeQueue) {
                    this.internalRemove(messageHolder);
                }
                messageHolder.setDone(true);
            } else {
                this.internalRemove(messageHolder);
                this.internalAdd(messageHolder.delayed());
            }
            this.watcher.onAcknowledge(this.topic, this.consumerGroup, messageHolder, ack > 0);
        }
        finally {
            this.sequenceLock.set(false);
        }
    }

    protected void internalDelete(MqMessageHolder messageHolder) {
        this.watcher.onRemove(this.topic, this.consumerGroup, messageHolder);
        this.messageMap.remove(messageHolder.getKey());
    }
}

