/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.NoSuchElementException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.RefCountedFile;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

@Internal
public class FileBasedBufferIterator
implements CloseableIterator<Buffer> {
    private final RefCountedFile file;
    private final FileInputStream stream;
    private final int bufferSize;
    private int offset;
    private int bytesToRead;

    public FileBasedBufferIterator(RefCountedFile file, int bytesToRead, int bufferSize) throws FileNotFoundException {
        Preconditions.checkNotNull(file);
        Preconditions.checkArgument(bytesToRead >= 0);
        Preconditions.checkArgument(bufferSize > 0);
        this.stream = new FileInputStream(file.getFile());
        this.file = file;
        this.bufferSize = bufferSize;
        this.bytesToRead = bytesToRead;
        file.retain();
    }

    @Override
    public boolean hasNext() {
        return this.bytesToRead > 0;
    }

    @Override
    public Buffer next() {
        byte[] buffer = new byte[this.bufferSize];
        int bytesRead = this.read(buffer);
        Preconditions.checkState(bytesRead >= 0, "unexpected end of file, file = " + this.file.getFile() + ", offset=" + this.offset);
        this.offset += bytesRead;
        this.bytesToRead -= bytesRead;
        return new NetworkBuffer(MemorySegmentFactory.wrap(buffer), FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, bytesRead);
    }

    private int read(byte[] buffer) {
        int limit = Math.min(buffer.length, this.bytesToRead);
        try {
            return this.stream.read(buffer, 0, limit);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        AutoCloseable[] autoCloseableArray = new AutoCloseable[2];
        autoCloseableArray[0] = this.stream;
        autoCloseableArray[1] = this.file::release;
        IOUtils.closeAll(autoCloseableArray);
    }
}

