/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.hashtable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base for hybrid hash tables that can spill to disk.
 *
 * <p>The class owns a {@link LazyMemorySegmentPool} of {@code totalNumBuffers} pages, keeps
 * track of segments that are expected to come back through {@code buildSpillReturnBuffers},
 * and implements {@link MemorySegmentPool} so that {@link #nextSegment()} can ask the subclass
 * to spill a partition (via {@code spillPartition()}) when no segment is available.
 */
public abstract class BaseHybridHashTable
implements MemorySegmentPool {
    /** Logger shared with subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(BaseHybridHashTable.class);
    // Not referenced in this class; presumably the re-partitioning depth limit enforced by
    // subclasses -- TODO confirm against the subclasses.
    protected static final int MAX_RECURSION_DEPTH = 3;
    // Partition-count ceiling; getPartitioningFanOutNoEstimates() caps its estimate at the same value.
    protected static final int MAX_NUM_PARTITIONS = 127;
    // Same value as the minimum segment count the constructor checks totalNumBuffers against.
    private static final int MIN_NUM_MEMORY_SEGMENTS = 33;
    /** Initial fan-out: min(getPartitioningFanOutNoEstimates(), maxNumPartition()), fixed in the constructor. */
    protected final int initPartitionFanOut;
    /** Average record length supplied by the caller; input to the fan-out estimate. */
    private final int avgRecordLen;
    /** Build-side row count supplied by the caller; input to the fan-out estimate. */
    protected final long buildRowCount;
    /** Number of pages: reservedMemorySize divided by the memory manager's page size. */
    protected final int totalNumBuffers;
    /** Pool from which segments are handed out and to which returnPage() gives them back. */
    protected final LazyMemorySegmentPool internalPool;
    /** I/O manager used to create block channel readers. */
    protected final IOManager ioManager;
    /** Page size of the memory manager; the constructor requires it to be a power of two. */
    protected final int segmentSize;
    // Queue this class take()s / poll()s segments from; nothing in this class adds to it
    // (presumably filled by spill writers created in subclasses -- TODO confirm).
    protected final LinkedBlockingQueue<MemorySegment> buildSpillReturnBuffers;
    /** log2(segmentSize). */
    public final int segmentSizeBits;
    /** segmentSize - 1. */
    public final int segmentSizeMask;
    /** Flipped to true by close(); free() refuses to run while it is still false. */
    protected AtomicBoolean closed = new AtomicBoolean();
    /** Stored from the constructor argument; not interpreted by this class. */
    public final boolean tryDistinctBuildRow;
    /** Set to 0 by the constructor; not otherwise modified in this class. */
    protected int currentRecursionDepth;
    // Count of segments still expected on buildSpillReturnBuffers; decremented every time this
    // class takes one from the queue and reset to 0 by close().
    protected int buildSpillRetBufferNumbers;
    /** Not used in this class; maintained by subclasses. */
    protected HeaderlessChannelReaderInputView currentSpilledBuildSide;
    /** If non-null when close() runs, its channel is closed and deleted. */
    protected AbstractChannelReaderInputView currentSpilledProbeSide;
    /** Not used in this class; maintained by subclasses. */
    protected FileIOChannel.Enumerator currentEnumerator;
    /** Value of TABLE_EXEC_SPILL_COMPRESSION_ENABLED read in the constructor. */
    protected final boolean compressionEnable;
    /** LZ4 block compression factory when compression is enabled, otherwise null. */
    protected final BlockCompressionFactory compressionCodecFactory;
    /** Byte size parsed from TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE. */
    protected final int compressionBlockSize;
    /** Exposed through getNumSpillFiles(); never written in this class. */
    protected transient long numSpillFiles;
    /** Exposed through getSpillInBytes(); never written in this class. */
    protected transient long spillInBytes;

    public BaseHybridHashTable(Configuration conf, Object owner, MemoryManager memManager, long reservedMemorySize, IOManager ioManager, int avgRecordLen, long buildRowCount, boolean tryDistinctBuildRow) {
        this.compressionEnable = conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
        this.compressionCodecFactory = this.compressionEnable ? BlockCompressionFactory.createBlockCompressionFactory((String)BlockCompressionFactory.CompressionFactoryName.LZ4.toString()) : null;
        this.compressionBlockSize = (int)MemorySize.parse((String)conf.getString(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes();
        this.avgRecordLen = avgRecordLen;
        this.buildRowCount = buildRowCount;
        this.tryDistinctBuildRow = tryDistinctBuildRow;
        this.totalNumBuffers = (int)(reservedMemorySize / (long)memManager.getPageSize());
        Preconditions.checkArgument((this.totalNumBuffers >= 33 ? 1 : 0) != 0);
        this.internalPool = new LazyMemorySegmentPool(owner, memManager, this.totalNumBuffers);
        this.ioManager = ioManager;
        this.segmentSize = memManager.getPageSize();
        Preconditions.checkArgument((boolean)MathUtils.isPowerOf2((long)this.segmentSize));
        this.buildSpillReturnBuffers = new LinkedBlockingQueue();
        this.segmentSizeBits = MathUtils.log2strict((int)this.segmentSize);
        this.segmentSizeMask = this.segmentSize - 1;
        this.currentRecursionDepth = 0;
        this.initPartitionFanOut = Math.min(this.getPartitioningFanOutNoEstimates(), this.maxNumPartition());
        this.closed.set(false);
        LOG.info(String.format("Initialize hash table with %d memory segments, each size [%d], the memory %d MB.", this.totalNumBuffers, this.segmentSize, (long)this.totalNumBuffers * (long)this.segmentSize / 1024L / 1024L));
    }

    /**
     * Upper bound for the number of partitions.
     *
     * @return half of the buffers that are free in the internal pool or still expected back
     *     from spilling
     */
    protected int maxNumPartition() {
        int obtainableBuffers = this.internalPool.freePages() + this.buildSpillRetBufferNumbers;
        return obtainableBuffers / 2;
    }

    /**
     * Derives a partition fan-out from the build-side size hints.
     *
     * <p>The raw value is {@code buildRowCount * avgRecordLen / (10 * segmentSize)}, capped at
     * 127 (the value of {@code MAX_NUM_PARTITIONS}), lowered to the nearest prime via
     * {@code findSmallerPrime}, and finally raised to at least 11.
     *
     * @return the fan-out, never below 11
     */
    private int getPartitioningFanOutNoEstimates() {
        long estimatedBuildBytes = this.buildRowCount * (long) this.avgRecordLen;
        long divisor = (long) (10 * this.segmentSize);
        long capped = Math.min(estimatedBuildBytes / divisor, 127L);
        int primeFanOut = findSmallerPrime((int) capped);
        return Math.max(11, primeFanOut);
    }

    /**
     * Finds the largest prime that is less than or equal to {@code num}.
     *
     * @param num starting value
     * @return the largest prime {@code <= num}; {@code num} itself when {@code num <= 1}
     */
    private static int findSmallerPrime(int num) {
        for (int candidate = num; candidate > 1; --candidate) {
            if (isPrimeNumber(candidate)) {
                return candidate;
            }
        }
        // Only reachable when num <= 1: for any larger start the scan stops at 2 at the latest.
        return num;
    }

    /**
     * Tests primality by trial division with odd divisors.
     *
     * @param num value to test
     * @return {@code true} if {@code num} is prime; {@code false} for every value below 2
     */
    private static boolean isPrimeNumber(int num) {
        if (num == 2) {
            return true;
        }
        if (num < 2 || num % 2 == 0) {
            return false;
        }
        // The bound does not change inside the loop, so compute the square root once instead
        // of once per iteration. For int inputs floor(sqrt(num)) is exact in double precision.
        final int limit = (int) Math.sqrt(num);
        for (int divisor = 3; divisor <= limit; divisor += 2) {
            if (num % divisor == 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Obtains a segment without triggering a spill.
     *
     * <p>The internal pool is asked first. If it has nothing and segments are still expected on
     * {@code buildSpillReturnBuffers}, this call blocks until one arrives, then moves every
     * further segment that is already waiting in the queue back into the internal pool.
     *
     * @return a segment, or {@code null} if the pool is empty and no segments are outstanding
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is
     *     restored and the {@link InterruptedException} is attached as the cause
     */
    public MemorySegment getNextBuffer() {
        MemorySegment segment = this.internalPool.nextSegment();
        if (segment != null) {
            return segment;
        }
        if (this.buildSpillRetBufferNumbers <= 0) {
            return null;
        }

        MemorySegment toReturn;
        try {
            toReturn = this.buildSpillReturnBuffers.take();
        } catch (InterruptedException iex) {
            // Keep the interruption visible to callers further up the stack and keep the cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Hybrid Hash Join was interrupted while taking a buffer.", iex);
        }
        --this.buildSpillRetBufferNumbers;

        // Opportunistically collect whatever else has already been returned, without blocking.
        MemorySegment currBuff;
        while (this.buildSpillRetBufferNumbers > 0 && (currBuff = this.buildSpillReturnBuffers.poll()) != null) {
            this.returnPage(currBuff);
            --this.buildSpillRetBufferNumbers;
        }
        return toReturn;
    }

    /**
     * Obtains exactly {@code bufferSize} segments through {@link #getNextBuffer()}.
     *
     * @param bufferSize number of segments to fetch
     * @return an array of {@code bufferSize} non-null segments
     * @throws RuntimeException if {@link #getNextBuffer()} returns {@code null} before the array
     *     is full
     */
    public MemorySegment[] getNextBuffers(int bufferSize) {
        MemorySegment[] result = new MemorySegment[bufferSize];
        int filled = 0;
        while (filled < bufferSize) {
            MemorySegment next = getNextBuffer();
            if (next == null) {
                throw new RuntimeException("No enough buffers!");
            }
            result[filled] = next;
            ++filled;
        }
        return result;
    }

    /**
     * Like {@link #getNextBuffer()}, but treats an exhausted pool as a programming error.
     *
     * @return a non-null segment
     * @throws RuntimeException if no segment is available
     */
    protected MemorySegment getNotNullNextBuffer() {
        MemorySegment candidate = getNextBuffer();
        if (candidate != null) {
            return candidate;
        }
        throw new RuntimeException("Bug in HybridHashJoin: No memory became available.");
    }

    /**
     * Returns the next free segment, spilling a partition if none is available right away.
     *
     * @return a non-null segment
     * @throws RuntimeException if spilling fails with an {@link IOException} (attached as the
     *     cause), or if no segment is available even after spilling
     */
    public MemorySegment nextSegment() {
        MemorySegment available = getNextBuffer();
        if (available == null) {
            try {
                spillPartition();
            } catch (IOException ioex) {
                String detail = ioex.getMessage();
                String suffix = detail == null ? "." : ": " + detail;
                throw new RuntimeException("Error spilling Hash Join Partition" + suffix, ioex);
            }
            // Spilling is expected to have made at least one buffer obtainable.
            available = getNextBuffer();
            if (available == null) {
                throw new RuntimeException("BUG in Hybrid Hash Join: Spilling did not free a buffer.");
            }
        }
        return available;
    }

    /**
     * Not supported by this pool.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public int freePages() {
        final String reason = "Contains spill memories, it is hard to estimate free pages.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * @return the size of one memory segment, as taken from the memory manager's page size in
     *     the constructor
     */
    @Override
    public int pageSize() {
        return segmentSize;
    }

    /**
     * Returns every non-null segment of the list through {@link #returnPage(MemorySegment)}.
     *
     * @param memory segments to give back; {@code null} entries are skipped
     */
    @Override
    public void returnAll(List<MemorySegment> memory) {
        for (MemorySegment candidate : memory) {
            if (candidate != null) {
                returnPage(candidate);
            }
        }
    }

    /**
     * Spills a partition so that memory becomes available again.
     *
     * <p>Called by {@link #nextSegment()} when {@link #getNextBuffer()} yields no segment.
     * After this method returns, {@link #nextSegment()} expects {@link #getNextBuffer()} to
     * produce a segment and throws otherwise.
     *
     * @return an implementation-defined value; {@link #nextSegment()} ignores it
     *     (NOTE(review): meaning is defined by subclasses -- confirm there)
     * @throws IOException if spilling fails
     */
    protected abstract int spillPartition() throws IOException;

    /**
     * Blocks until the internal pool holds at least {@code minRequiredAvailable} free pages,
     * moving segments from {@code buildSpillReturnBuffers} into the pool as they arrive.
     *
     * @param minRequiredAvailable number of pages that must be free in the internal pool
     * @throws IllegalArgumentException if the request exceeds the free pages plus the segments
     *     still expected back
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is
     *     restored and the {@link InterruptedException} is attached as the cause
     */
    public void ensureNumBuffersReturned(int minRequiredAvailable) {
        if (minRequiredAvailable > this.internalPool.freePages() + this.buildSpillRetBufferNumbers) {
            throw new IllegalArgumentException("More buffers requested available than totally available.");
        }
        try {
            while (this.internalPool.freePages() < minRequiredAvailable) {
                this.returnPage(this.buildSpillReturnBuffers.take());
                --this.buildSpillRetBufferNumbers;
            }
        } catch (InterruptedException iex) {
            // Keep the interruption visible to callers further up the stack and keep the cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Hash Join was interrupted.", iex);
        }
    }

    /**
     * Closes the hash table; only the first call has an effect.
     *
     * <p>Deletes the current spilled probe-side channel (best effort, failures are only
     * logged), lets the subclass clear its partitions, and then waits for every segment still
     * expected on {@code buildSpillReturnBuffers} and returns it to the pool.
     *
     * @throws RuntimeException if the thread is interrupted while collecting outstanding
     *     segments; the interrupt flag is restored and the {@link InterruptedException} is
     *     attached as the cause
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        if (this.currentSpilledProbeSide != null) {
            try {
                this.currentSpilledProbeSide.getChannel().closeAndDelete();
            } catch (Throwable t) {
                // Deliberately best-effort: a failed temp-file cleanup must not abort close().
                LOG.warn("Could not close and delete the temp file for the current spilled partition probe side.", t);
            }
        }
        this.clearPartitions();
        for (int i = 0; i < this.buildSpillRetBufferNumbers; ++i) {
            try {
                this.returnPage(this.buildSpillReturnBuffers.take());
            } catch (InterruptedException iex) {
                // Keep the interruption visible to callers further up the stack and keep the cause.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Hashtable closing was interrupted", iex);
            }
        }
        this.buildSpillRetBufferNumbers = 0;
    }

    /**
     * Releases the partitions held by the subclass.
     *
     * <p>Called once from {@link #close()}, after the current spilled probe-side channel has
     * been deleted and before the outstanding spill buffers are collected.
     */
    protected abstract void clearPartitions();

    /**
     * Releases the memory cached by the internal pool. May only be called after
     * {@link #close()}.
     *
     * @throws IllegalStateException if the table has not been closed yet
     */
    public void free() {
        if (this.closed.get()) {
            freeCurrent();
            return;
        }
        throw new IllegalStateException("Cannot release memory until BinaryHashTable is closed!");
    }

    /** Cleans the internal pool's cache, without checking whether the table is closed. */
    public void freeCurrent() {
        internalPool.cleanCache();
    }

    /** @return the internal segment pool (package-private accessor) */
    LazyMemorySegmentPool getInternalPool() {
        return internalPool;
    }

    /**
     * Gives a single segment back to the internal pool.
     *
     * @param segment segment to return
     */
    public void returnPage(MemorySegment segment) {
        internalPool.returnPage(segment);
    }

    /**
     * @return free pages of the internal pool plus the segments still expected back from
     *     spilling
     */
    public int remainBuffers() {
        int pooled = internalPool.freePages();
        return pooled + buildSpillRetBufferNumbers;
    }

    /**
     * @return the number of pages not currently free in the internal pool, multiplied by the
     *     pool's page size
     */
    public long getUsedMemoryInBytes() {
        int usedPages = totalNumBuffers - internalPool.freePages();
        long bytesPerPage = (long) internalPool.pageSize();
        return (long) usedPages * bytesPerPage;
    }

    /** @return the current value of the {@code numSpillFiles} counter */
    public long getNumSpillFiles() {
        return numSpillFiles;
    }

    /** @return the current value of the {@code spillInBytes} counter */
    public long getSpillInBytes() {
        return spillInBytes;
    }

    /**
     * Computes {@code (totalNumBuffers - 2) / 6 / partitions}, but never less than 1.
     *
     * @param partitions number of partitions to divide by; must not be 0
     * @return the resulting buffer count, at least 1
     */
    public int maxInitBufferOfBucketArea(int partitions) {
        int perPartition = (totalNumBuffers - 2) / 6 / partitions;
        return Math.max(1, perPartition);
    }

    /**
     * Reads {@code blockCount} blocks of a spilled channel into segments of the internal pool
     * and deletes the channel afterwards.
     *
     * <p>{@link #ensureNumBuffersReturned(int)} is called first so that the internal pool holds
     * enough free pages for all blocks.
     *
     * @param id channel to read
     * @param blockCount number of blocks to read
     * @return the segments the reader placed on its return queue
     * @throws IOException if reading or closing the channel fails; when a read fails, the
     *     reader is still closed and deleted and any failure of that cleanup is attached as a
     *     suppressed exception
     */
    protected List<MemorySegment> readAllBuffers(FileIOChannel.ID id, int blockCount) throws IOException {
        this.ensureNumBuffersReturned(blockCount);
        LinkedBlockingQueue<MemorySegment> retSegments = new LinkedBlockingQueue<>();
        BlockChannelReader<MemorySegment> reader = FileChannelUtil.createBlockChannelReader(this.ioManager, id, retSegments, this.compressionEnable, this.compressionCodecFactory, this.compressionBlockSize, this.segmentSize);
        try {
            for (int i = 0; i < blockCount; ++i) {
                reader.readBlock(this.internalPool.nextSegment());
            }
        } catch (IOException | RuntimeException e) {
            // Do not leak the open channel and its temp file when a read request fails.
            try {
                reader.closeAndDelete();
            } catch (Throwable cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
        reader.closeAndDelete();

        List<MemorySegment> buffers = new ArrayList<>(Math.max(blockCount, 0));
        retSegments.drainTo(buffers);
        return buffers;
    }

    /**
     * Creates an input view over a spilled channel, backed by two freshly allocated unpooled
     * segments of {@code segmentSize} bytes.
     *
     * @param id channel to read
     * @param blockCount number of blocks, passed through to the view
     * @param lastSegmentLimit limit of the last block, passed through to the view
     * @return the input view
     * @throws IOException if the reader or the view cannot be created
     */
    protected HeaderlessChannelReaderInputView createInputView(FileIOChannel.ID id, int blockCount, int lastSegmentLimit) throws IOException {
        LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<MemorySegment>();
        BlockChannelReader<MemorySegment> inReader = FileChannelUtil.createBlockChannelReader(this.ioManager, id, returnQueue, this.compressionEnable, this.compressionCodecFactory, this.compressionBlockSize, this.segmentSize);
        MemorySegment firstReadBuffer = MemorySegmentFactory.allocateUnpooledSegment(this.segmentSize);
        MemorySegment secondReadBuffer = MemorySegmentFactory.allocateUnpooledSegment(this.segmentSize);
        List<MemorySegment> readBuffers = Arrays.asList(firstReadBuffer, secondReadBuffer);
        return new HeaderlessChannelReaderInputView(inReader, readBuffers, blockCount, lastSegmentLimit, false);
    }

    /**
     * Derives a non-negative, level-dependent hash from a hash code.
     *
     * <p>The hash code is rotated left by {@code level * 11} bits; a negative result
     * {@code c} is mapped to {@code -(c + 1)}.
     *
     * @param hashCode the input hash code
     * @param level the level that selects the rotation distance
     * @return a value in {@code [0, Integer.MAX_VALUE]}
     */
    public static int hash(int hashCode, int level) {
        final int rotated = Integer.rotateLeft(hashCode, level * 11);
        if (rotated >= 0) {
            return rotated;
        }
        return -(rotated + 1);
    }

    /**
     * XORs the upper 16 bits of the hash into its lower 16 bits.
     *
     * @param hash the input hash
     * @return {@code hash ^ (hash >>> 16)}
     */
    static int partitionLevelHash(int hash) {
        final int upperHalf = hash >>> 16;
        return hash ^ upperHalf;
    }
}

