/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.CountingAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests the availability reporting of read views created on a finished
 * {@link BoundedBlockingSubpartition} that is backed by a file channel.
 *
 * <p>Every test releases the read view and then the subpartition in {@code finally} blocks, so
 * that a failing assertion does not leave them unreleased.
 */
public class BoundedBlockingSubpartitionAvailabilityTest {
    /** Parent directory for the per-partition data files; shared by all tests of this class. */
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    /** Size in bytes of every written buffer; also passed as the partition's read buffer size. */
    private static final int BUFFER_SIZE = 32768;

    /**
     * Creating a read view on a finished partition that holds data must notify the registered
     * listener exactly once.
     */
    @Test
    public void testInitiallyAvailable() throws Exception {
        BoundedBlockingSubpartition subpartition = createPartitionWithData(10);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            ResultSubpartitionView subpartitionView = subpartition.createReadView(listener);
            try {
                Assert.assertEquals(1L, (long) listener.numNotifications);
            } finally {
                subpartitionView.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * Reads buffers without recycling any of them until the reader returns {@code null}, then
     * checks that the reader reports itself as unavailable and that the last returned element
     * announced that no further data is available.
     */
    @Test
    public void testUnavailableWhenBuffersExhausted() throws Exception {
        // NOTE(review): 100000 buffers is presumably far more than the reader can hand out
        // without getting buffers back - confirm against the reader implementation.
        BoundedBlockingSubpartition subpartition = createPartitionWithData(100000);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            ResultSubpartitionView reader = subpartition.createReadView(listener);
            try {
                List<ResultSubpartition.BufferAndBacklog> data = drainAvailableData(reader);

                Assert.assertFalse(reader.isAvailable(Integer.MAX_VALUE));
                Assert.assertFalse(data.get(data.size() - 1).isDataAvailable());
            } finally {
                reader.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * After the reader stopped returning buffers, recycling two of the held buffers must make the
     * reader available again and bring the listener's notification count to two.
     */
    @Test
    public void testAvailabilityNotificationWhenBuffersReturn() throws Exception {
        BoundedBlockingSubpartition subpartition = createPartitionWithData(100000);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            ResultSubpartitionView reader = subpartition.createReadView(listener);
            try {
                List<ResultSubpartition.BufferAndBacklog> data = drainAvailableData(reader);

                data.get(0).buffer().recycleBuffer();
                data.get(1).buffer().recycleBuffer();

                Assert.assertTrue(reader.isAvailable(Integer.MAX_VALUE));
                // NOTE(review): presumably one notification from creating the view and one from
                // the returned buffers - confirm against the reader implementation.
                Assert.assertEquals(2L, (long) listener.numNotifications);
            } finally {
                reader.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * Reads and immediately recycles buffers until the reader returns {@code null}; afterwards the
     * reader must report itself as unavailable.
     */
    @Test
    public void testNotAvailableWhenEmpty() throws Exception {
        BoundedBlockingSubpartition subpartition = createPartitionWithData(100000);
        try {
            ResultSubpartitionView reader =
                    subpartition.createReadView(new NoOpBufferAvailablityListener());
            try {
                drainAllData(reader);

                Assert.assertFalse(reader.isAvailable(Integer.MAX_VALUE));
            } finally {
                reader.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * Creates a file-channel based subpartition in a fresh temporary folder, writes the given
     * number of buffers of {@link #BUFFER_SIZE} bytes into it and finishes it.
     *
     * @param numberOfBuffers number of buffers to write before finishing the partition
     * @return the finished subpartition; the caller is responsible for releasing it
     * @throws IOException if creating the folder, writing the buffers or finishing fails
     */
    private static BoundedBlockingSubpartition createPartitionWithData(int numberOfBuffers) throws IOException {
        ResultPartition parent = PartitionTestUtils.createPartition();
        File dataFile = new File(TMP_FOLDER.newFolder(), "data");
        BoundedBlockingSubpartition partition =
                BoundedBlockingSubpartition.createWithFileChannel(0, parent, dataFile, BUFFER_SIZE);
        writeBuffers(partition, numberOfBuffers);
        partition.finish();
        return partition;
    }

    /**
     * Adds {@code numberOfBuffers} buffer consumers of {@link #BUFFER_SIZE} bytes each to the
     * given partition.
     *
     * @param partition the partition to write into
     * @param numberOfBuffers how many buffers to add
     * @throws IOException if adding a buffer fails
     */
    private static void writeBuffers(ResultSubpartition partition, int numberOfBuffers) throws IOException {
        for (int i = 0; i < numberOfBuffers; ++i) {
            partition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(BUFFER_SIZE));
        }
    }

    /**
     * Polls the reader until it returns {@code null} and collects everything it returned. The
     * buffers are deliberately NOT recycled here, so they stay held by the returned list.
     *
     * @param reader the view to read from
     * @return all elements returned by the reader, in the order they were read
     */
    private static List<ResultSubpartition.BufferAndBacklog> drainAvailableData(ResultSubpartitionView reader) throws Exception {
        List<ResultSubpartition.BufferAndBacklog> list = new ArrayList<ResultSubpartition.BufferAndBacklog>();
        ResultSubpartition.BufferAndBacklog bab;
        while ((bab = reader.getNextBuffer()) != null) {
            list.add(bab);
        }
        return list;
    }

    /**
     * Polls the reader until it returns {@code null}, recycling each buffer right after it was
     * read.
     *
     * @param reader the view to read from
     */
    private static void drainAllData(ResultSubpartitionView reader) throws Exception {
        ResultSubpartition.BufferAndBacklog bab;
        while ((bab = reader.getNextBuffer()) != null) {
            bab.buffer().recycleBuffer();
        }
    }
}

