package io.jafka.message;

import io.jafka.mx.LogFlushStats;
import io.jafka.utils.IteratorTemplate;
import io.jafka.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/jafka/message/FileMessageSet.class */
/**
 * A {@link MessageSet} whose bytes live in a file accessed through a {@link FileChannel}.
 *
 * <p>On disk every entry is a 4-byte length prefix followed by that many bytes of message
 * payload (this is the layout the iterator and the recovery scan both read).
 *
 * <p>An instance is either <em>mutable</em> (covers the whole file and supports
 * {@link #append(MessageSet)} / {@link #flush()}) or an immutable <em>view</em> over the byte
 * range {@code [offset, limit)} of the file. Views handed out by {@link #read(long, long)} are
 * capped at the high water mark, which only advances in {@link #flush()} and during recovery.
 */
public class FileMessageSet extends MessageSet {
    private static final Logger logger = LoggerFactory.getLogger(FileMessageSet.class);

    /** Number of bytes used by the length prefix that precedes each message in the file. */
    private static final int SIZE_PREFIX_BYTES = 4;

    private final FileChannel channel;
    /** Absolute file position at which this set starts. */
    private final long offset;
    private final boolean mutable;
    private final AtomicBoolean needRecover;
    /** Current size of this set in bytes. */
    private final AtomicLong setSize = new AtomicLong();
    /** Size of the set as of the last flush (or recovery); upper bound for read views. */
    private final AtomicLong setHighWaterMark = new AtomicLong();

    /**
     * Creates a message set over {@code channel}.
     *
     * <p>For an immutable set the size is {@code min(channel.size(), limit) - offset}. A mutable
     * set must cover the whole file ({@code offset == 0}, {@code limit == Long.MAX_VALUE}); it is
     * either recovered (when {@code needRecover} is set) or positioned at the end of the file so
     * that subsequent appends extend it.
     *
     * @param channel     channel of the backing file
     * @param offset      absolute start position of the set within the file
     * @param limit       absolute end position (exclusive) of the set within the file
     * @param mutable     whether the set may be appended to and flushed
     * @param needRecover when {@code true} for a mutable set, the file is scanned and truncated
     *                    to its last valid message; the flag is cleared once recovery completes
     * @throws IOException              if the channel cannot be read, truncated or positioned
     * @throws IllegalArgumentException if a mutable set is requested with an offset or a limit
     */
    public FileMessageSet(FileChannel channel, long offset, long limit, boolean mutable, AtomicBoolean needRecover) throws IOException {
        this.channel = channel;
        this.offset = offset;
        this.mutable = mutable;
        this.needRecover = needRecover;
        if (!mutable) {
            long viewSize = Math.min(channel.size(), limit) - offset;
            this.setSize.set(viewSize);
            this.setHighWaterMark.set(viewSize);
            return;
        }
        if (limit < Long.MAX_VALUE || offset > 0) {
            throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.");
        }
        if (needRecover.get()) {
            // Take the start timestamp BEFORE recovering so the reported duration is real.
            long startMs = System.currentTimeMillis();
            long truncated = recover();
            long elapsedSeconds = (System.currentTimeMillis() - startMs) / 1000;
            logger.info("Recovery succeeded in " + elapsedSeconds + " seconds. " + truncated + " bytes truncated.");
        } else {
            long fileSize = channel.size();
            this.setSize.set(fileSize);
            this.setHighWaterMark.set(fileSize);
            // Position at the end of the file so appends extend it.
            channel.position(channel.size());
        }
    }

    /**
     * Creates a set covering the whole file behind {@code channel}, without recovery.
     *
     * @param channel channel of the backing file
     * @param mutable whether the set may be appended to and flushed
     * @throws IOException if the channel cannot be read or positioned
     */
    public FileMessageSet(FileChannel channel, boolean mutable) throws IOException {
        this(channel, 0L, Long.MAX_VALUE, mutable, new AtomicBoolean(false));
    }

    /**
     * Opens {@code file} via {@code Utils.openChannel} and creates a set covering the whole
     * file, without recovery.
     *
     * @param file    backing file
     * @param mutable whether the set may be appended to and flushed
     * @throws IOException if the file cannot be opened, read or positioned
     */
    public FileMessageSet(File file, boolean mutable) throws IOException {
        this(Utils.openChannel(file, mutable), mutable);
    }

    /**
     * Creates a set covering the whole file behind {@code channel}.
     *
     * @param channel     channel of the backing file
     * @param mutable     whether the set may be appended to and flushed
     * @param needRecover whether a mutable set must be recovered on open
     * @throws IOException if the channel cannot be read, truncated or positioned
     */
    public FileMessageSet(FileChannel channel, boolean mutable, AtomicBoolean needRecover) throws IOException {
        this(channel, 0L, Long.MAX_VALUE, mutable, needRecover);
    }

    /**
     * Opens {@code file} via {@code Utils.openChannel} and creates a set covering the whole file.
     *
     * @param file        backing file
     * @param mutable     whether the set may be appended to and flushed
     * @param needRecover whether a mutable set must be recovered on open
     * @throws IOException if the file cannot be opened, read, truncated or positioned
     */
    public FileMessageSet(File file, boolean mutable, AtomicBoolean needRecover) throws IOException {
        this(Utils.openChannel(file, mutable), mutable, needRecover);
    }

    /**
     * Returns an iterator that walks the file from this set's start offset, one length-prefixed
     * message at a time. Iteration ends at the first incomplete length prefix, the first length
     * smaller than {@code Message.MinHeaderSize}, or the first incomplete payload.
     *
     * <p>Each {@link MessageAndOffset} carries the file position immediately after its message.
     * I/O failures surface as {@link RuntimeException} with the {@link IOException} as cause.
     */
    @Override // java.lang.Iterable
    public Iterator<MessageAndOffset> iterator() {
        return new IteratorTemplate<MessageAndOffset>() { // from class: io.jafka.message.FileMessageSet.1
            /** Absolute file position of the next length prefix to read. */
            private long location = FileMessageSet.this.offset;

            // The decompiler reports this override was widened from protected; it is kept
            // public so it stays compatible with whatever IteratorTemplate declares.
            @Override // io.jafka.utils.IteratorTemplate
            public MessageAndOffset makeNext() {
                try {
                    ByteBuffer sizeBuffer = ByteBuffer.allocate(SIZE_PREFIX_BYTES);
                    FileMessageSet.this.channel.read(sizeBuffer, this.location);
                    if (sizeBuffer.hasRemaining()) {
                        // Fewer than 4 bytes left: no further complete entry.
                        return allDone();
                    }
                    sizeBuffer.rewind();
                    int messageSize = sizeBuffer.getInt();
                    if (messageSize < Message.MinHeaderSize) {
                        return allDone();
                    }
                    ByteBuffer payload = ByteBuffer.allocate(messageSize);
                    FileMessageSet.this.channel.read(payload, this.location + SIZE_PREFIX_BYTES);
                    if (payload.hasRemaining()) {
                        // Payload is cut short: treat it as the end of the set.
                        return allDone();
                    }
                    payload.rewind();
                    // Widen before adding so a huge size cannot overflow int arithmetic.
                    this.location += (long) messageSize + SIZE_PREFIX_BYTES;
                    return new MessageAndOffset(new Message(payload), this.location);
                } catch (IOException e) {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        };
    }

    /** Returns the current size of this set in bytes. */
    @Override // io.jafka.message.MessageSet
    public long getSizeInBytes() {
        return this.setSize.get();
    }

    /**
     * Transfers bytes of this set to {@code destChannel} using
     * {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)}.
     *
     * @param destChannel channel to write to
     * @param writeOffset position, relative to the start of this set, to transfer from
     * @param maxSize     maximum number of bytes to transfer; capped at the size of this set
     * @return the number of bytes actually transferred
     * @throws IOException if the transfer fails
     */
    @Override // io.jafka.message.MessageSet
    public long writeTo(GatheringByteChannel destChannel, long writeOffset, long maxSize) throws IOException {
        // NOTE(review): the cap is the full set size, not (size - writeOffset); callers appear
        // to be expected to pass a maxSize that already accounts for writeOffset - confirm.
        long count = Math.min(maxSize, getSizeInBytes());
        return this.channel.transferTo(this.offset + writeOffset, count, destChannel);
    }

    /**
     * Returns an immutable view over a slice of this set, sharing the same channel.
     *
     * @param readOffset start of the slice, relative to the start of this set
     * @param size       requested length of the slice in bytes
     * @return a view starting at {@code readOffset} whose end is capped at the high water mark
     * @throws IOException if the channel size cannot be determined
     */
    public MessageSet read(long readOffset, long size) throws IOException {
        long start = this.offset + readOffset;
        long end = Math.min(start + size, highWaterMark());
        return new FileMessageSet(this.channel, start, end, false, new AtomicBoolean(false));
    }

    /**
     * Appends {@code messages} to the channel and grows this set by the number of bytes written.
     *
     * @param messages messages to append
     * @return a two-element array: {@code [bytesWritten, sizeOfThisSetBeforeTheAppend]}
     * @throws IOException           if writing fails
     * @throws IllegalStateException if this set is immutable
     */
    public long[] append(MessageSet messages) throws IOException {
        checkMutable();
        long written = 0L;
        // NOTE(review): every pass asks the source to write from its offset 0 again; whether a
        // partial write can occur (and would duplicate bytes) depends on the MessageSet
        // implementation - confirm before changing.
        while (written < messages.getSizeInBytes()) {
            written += messages.writeTo(this.channel, 0L, messages.getSizeInBytes());
        }
        long sizeBeforeAppend = this.setSize.getAndAdd(written);
        return new long[] {written, sizeBeforeAppend};
    }

    /**
     * Forces the channel's content and metadata to storage, records the elapsed time with
     * {@link LogFlushStats}, and advances the high water mark to the current size.
     *
     * @throws IOException           if forcing the channel fails
     * @throws IllegalStateException if this set is immutable
     */
    public void flush() throws IOException {
        checkMutable();
        long startMs = System.currentTimeMillis();
        this.channel.force(true);
        long elapsedMs = System.currentTimeMillis() - startMs;
        LogFlushStats.recordFlushRequest(elapsedMs);
        logger.debug("flush time {}", elapsedMs);
        this.setHighWaterMark.set(getSizeInBytes());
        logger.debug("flush high water mark:{}", highWaterMark());
    }

    /**
     * Flushes (mutable sets only) and closes the underlying channel. The channel is closed even
     * when the flush fails.
     *
     * @throws IOException if flushing or closing fails
     */
    public void close() throws IOException {
        try {
            if (this.mutable) {
                flush();
            }
        } finally {
            this.channel.close();
        }
    }

    /**
     * Scans the file from position 0, keeps the longest prefix made of valid messages, truncates
     * everything after it, and resets size, high water mark and channel position to that point.
     * Clears the {@code needRecover} flag when done.
     *
     * @return the number of bytes removed from the end of the file
     * @throws IOException if the channel cannot be read, truncated or positioned
     */
    private long recover() throws IOException {
        checkMutable();
        long fileSize = this.channel.size();
        ByteBuffer sizeBuffer = ByteBuffer.allocate(SIZE_PREFIX_BYTES);
        long validUpTo = 0L;
        long next = validateMessage(this.channel, validUpTo, fileSize, sizeBuffer);
        while (next >= 0) {
            validUpTo = next;
            next = validateMessage(this.channel, validUpTo, fileSize, sizeBuffer);
        }
        this.channel.truncate(validUpTo);
        this.setSize.set(validUpTo);
        this.setHighWaterMark.set(validUpTo);
        logger.info("recover high water mark:{}", highWaterMark());
        this.channel.position(validUpTo);
        this.needRecover.set(false);
        return fileSize - validUpTo;
    }

    /**
     * Checks the length-prefixed message starting at {@code start}.
     *
     * @param fileChannel channel to read from
     * @param start       absolute position of the message's length prefix
     * @param len         file size captured when recovery started; the message must end within it
     * @param sizeBuffer  reusable 4-byte buffer for the length prefix
     * @return the position immediately after the message when it is complete and
     *         {@code Message.isValid()} holds, otherwise {@code -1}
     * @throws IOException           if reading fails
     * @throws IllegalStateException if end-of-file is hit while reading a payload that should
     *                               lie within {@code len}
     */
    private long validateMessage(FileChannel fileChannel, long start, long len, ByteBuffer sizeBuffer) throws IOException {
        sizeBuffer.rewind();
        if (fileChannel.read(sizeBuffer, start) < SIZE_PREFIX_BYTES) {
            return -1L;
        }
        int messageSize = sizeBuffer.getInt(0);
        if (messageSize < Message.MinHeaderSize) {
            return -1L;
        }
        long next = start + SIZE_PREFIX_BYTES + messageSize;
        if (next > len) {
            return -1L;
        }
        ByteBuffer payload = ByteBuffer.allocate(messageSize);
        long readPosition = start + SIZE_PREFIX_BYTES;
        // A positional read may return fewer bytes than requested; keep going until full.
        while (payload.hasRemaining()) {
            int bytesRead = fileChannel.read(payload, readPosition);
            if (bytesRead < 0) {
                throw new IllegalStateException("File size changed during recovery!");
            }
            readPosition += bytesRead;
        }
        payload.rewind();
        return new Message(payload).isValid() ? next : -1L;
    }

    /**
     * Guards mutating operations.
     *
     * @throws IllegalStateException if this set is immutable
     */
    void checkMutable() {
        if (!this.mutable) {
            throw new IllegalStateException("Attempt to invoke mutation on immutable message set.");
        }
    }

    /**
     * Returns the high water mark: the set size recorded at construction, at the last
     * {@link #flush()}, or at the end of recovery.
     */
    public long highWaterMark() {
        return this.setHighWaterMark.get();
    }
}
