package org.jetlinks.core.message.codec.context;

import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/core/message/codec/context/QueueSerialContext.class */
/**
 * {@link SerialContext} implementation that forwards queued inputs to
 * {@link #listen()} one at a time.
 *
 * <p>Each call to {@link #inputAndAwait(Object, Duration)} enqueues its input.
 * Only one input is "in flight" at any moment: it is emitted to the listeners
 * and the next {@link #output(Object)} call completes its {@code Mono}. The
 * next queued input is emitted once the in-flight one is answered, times out
 * or is cancelled.
 */
class QueueSerialContext<IN, OUT> implements SerialContext<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(QueueSerialContext.class);

    /** Queue length at which further requests are rejected. */
    int maxSize = 256;

    /** Processor through which in-flight inputs are published to {@link #listen()}. */
    EmitterProcessor<IN> inputProcessor = EmitterProcessor.create(false);

    /** Sink used to push inputs into {@link #inputProcessor}. */
    FluxSink<IN> inputSink = this.inputProcessor.sink(FluxSink.OverflowStrategy.BUFFER);

    /** Requests waiting for their turn: the input paired with the callback that completes its Mono. */
    Queue<Tuple2<IN, Consumer<OUT>>> inputQueue = new ConcurrentLinkedQueue<>();

    /** Completion callback of the in-flight request, or {@code null} when nothing is in flight. */
    AtomicReference<Consumer<OUT>> output = new AtomicReference<>();

    /** {@code true} while a request is in flight; guards {@link #drain()} so only one input is emitted at a time. */
    AtomicBoolean awaiting = new AtomicBoolean();

    /**
     * Enqueues {@code in} and returns a {@code Mono} that completes with the value
     * passed to {@link #output(Object)} while this request is in flight.
     *
     * @param in       the input to emit to {@link #listen()} when its turn comes
     * @param duration maximum time to wait before the returned Mono fails with
     *                 {@link TimeoutException}
     * @return a Mono that yields the matching output; it fails with
     *         {@link UnsupportedOperationException} when the queue already holds
     *         {@code maxSize} requests, or with {@link TimeoutException} on timeout
     */
    @Override
    public Mono<OUT> inputAndAwait(IN in, Duration duration) {
        return Mono.<OUT>create(sink -> {
            if (this.inputQueue.size() >= this.maxSize) {
                sink.error(new UnsupportedOperationException("out of serial queue"));
                drain();
                return;
            }
            // Keep a handle on the callback so the dispose hook can recognise
            // whether this request is the one currently in flight.
            Consumer<OUT> callback = sink::success;
            Tuple2<IN, Consumer<OUT>> request = Tuples.of(in, callback);
            this.inputQueue.add(request);
            drain();
            sink.onDispose(() -> {
                this.inputQueue.remove(request);
                // If this request is still in flight (timeout or cancellation before
                // any output arrived), release the slot so that the queue does not
                // stall until some later output() call. After a normal completion
                // output() has already cleared the reference, so this CAS fails.
                // NOTE(review): an output that arrives late, after this release,
                // is handed to whichever request is in flight at that moment.
                if (this.output.compareAndSet(callback, null)) {
                    this.awaiting.set(false);
                }
                drain();
            });
        }).timeout(duration, Mono.error(TimeoutException::new));
    }

    /**
     * Emits the next queued input if nothing is currently in flight.
     *
     * <p>The in-flight slot is claimed with a CAS on {@link #awaiting}. When the
     * slot was claimed but the queue turned out to be empty, the slot is released
     * again and the queue is re-checked, so that a request added concurrently
     * (whose own CAS failed) is not left behind.
     */
    private void drain() {
        while (!this.inputQueue.isEmpty() && this.awaiting.compareAndSet(false, true)) {
            Tuple2<IN, Consumer<OUT>> next = this.inputQueue.poll();
            if (next != null) {
                this.output.set(next.getT2());
                this.inputSink.next(next.getT1());
                return;
            }
            // Someone removed the last element between the emptiness check and
            // the poll: give the slot back instead of leaving it claimed forever.
            this.awaiting.set(false);
        }
    }

    /**
     * Completes the in-flight request, if any, with {@code out} and then emits
     * the next queued input.
     *
     * @param out the value handed to the in-flight request's callback
     */
    @Override
    public void output(OUT out) {
        Consumer<OUT> pending = this.output.getAndSet(null);
        if (pending == null) {
            drain();
            return;
        }
        try {
            pending.accept(out);
        } finally {
            // Release the slot even if the callback throws, otherwise no further
            // input would ever be emitted.
            this.awaiting.set(false);
            drain();
        }
    }

    /**
     * Returns the stream of inputs, emitted one at a time as they become in flight.
     *
     * @return the processor that receives every emitted input
     */
    @Override
    public Flux<IN> listen() {
        return this.inputProcessor;
    }
}
