/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.file.Path;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.BoundedData;
import org.apache.flink.runtime.io.network.partition.BoundedDataTestBase;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.FileChannelBoundedData;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class FileChannelBoundedDataTest
extends BoundedDataTestBase {
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    private static FileChannelManager fileChannelManager;

    @BeforeClass
    public static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[]{tempDir}, "testing");
    }

    @AfterClass
    public static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    @Override
    protected boolean isRegionBased() {
        return false;
    }

    @Override
    protected BoundedData createBoundedData(Path tempFilePath) throws IOException {
        return FileChannelBoundedData.create((Path)tempFilePath, (int)0x100000);
    }

    @Override
    protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Test
    public void testReadNextBuffer() throws Exception {
        int numberOfBuffers = 3;
        try (BoundedData data = this.createBoundedData();){
            FileChannelBoundedDataTest.writeBuffers(data, 3);
            BoundedData.Reader reader = data.createReader();
            Buffer buffer1 = reader.nextBuffer();
            Buffer buffer2 = reader.nextBuffer();
            Assert.assertNotNull((Object)buffer1);
            Assert.assertNotNull((Object)buffer2);
            Assert.assertNull((Object)reader.nextBuffer());
            buffer1.recycleBuffer();
            buffer2.recycleBuffer();
        }
    }

    @Test
    public void testRecycleBufferForNotifyingSubpartitionView() throws Exception {
        int numberOfBuffers = 2;
        try (BoundedData data = this.createBoundedData();){
            FileChannelBoundedDataTest.writeBuffers(data, 2);
            VerifyNotificationResultSubpartitionView subpartitionView = new VerifyNotificationResultSubpartitionView();
            BoundedData.Reader reader = data.createReader((ResultSubpartitionView)subpartitionView);
            Buffer buffer1 = reader.nextBuffer();
            Buffer buffer2 = reader.nextBuffer();
            Assert.assertNotNull((Object)buffer1);
            Assert.assertNotNull((Object)buffer2);
            Assert.assertFalse((boolean)subpartitionView.isAvailable);
            buffer1.recycleBuffer();
            Assert.assertTrue((boolean)subpartitionView.isAvailable);
            subpartitionView.resetAvailable();
            Assert.assertFalse((boolean)subpartitionView.isAvailable);
            Assert.assertNull((Object)reader.nextBuffer());
            buffer2.recycleBuffer();
            Assert.assertFalse((boolean)subpartitionView.isAvailable);
        }
    }

    @Test
    public void testRecycleBufferForNotifyingBufferAvailabilityListener() throws Exception {
        ResultSubpartition subpartition = FileChannelBoundedDataTest.createFileBoundedBlockingSubpartition();
        int numberOfBuffers = 2;
        FileChannelBoundedDataTest.writeBuffers(subpartition, 2);
        VerifyNotificationBufferAvailabilityListener listener = new VerifyNotificationBufferAvailabilityListener();
        ResultSubpartitionView subpartitionView = subpartition.createReadView((BufferAvailabilityListener)listener);
        Assert.assertTrue((boolean)listener.isAvailable);
        listener.resetAvailable();
        Assert.assertFalse((boolean)listener.isAvailable);
        ResultSubpartition.BufferAndBacklog buffer1 = subpartitionView.getNextBuffer();
        ResultSubpartition.BufferAndBacklog buffer2 = subpartitionView.getNextBuffer();
        Assert.assertNotNull((Object)buffer1);
        Assert.assertNotNull((Object)buffer2);
        Assert.assertFalse((boolean)subpartitionView.isAvailable(Integer.MAX_VALUE));
        buffer1.buffer().recycleBuffer();
        Assert.assertTrue((boolean)listener.isAvailable);
        buffer2.buffer().recycleBuffer();
        subpartitionView.releaseAllResources();
        subpartition.release();
    }

    private static ResultSubpartition createFileBoundedBlockingSubpartition() {
        ResultPartition resultPartition = new ResultPartitionBuilder().setNetworkBufferSize(0x100000).setResultPartitionType(ResultPartitionType.BLOCKING).setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE).setFileChannelManager(fileChannelManager).build();
        return resultPartition.subpartitions[0];
    }

    private static void writeBuffers(BoundedData data, int numberOfBuffers) throws IOException {
        for (int i = 0; i < numberOfBuffers; ++i) {
            data.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(0x100000));
        }
        data.finishWrite();
    }

    private static void writeBuffers(ResultSubpartition subpartition, int numberOfBuffers) throws IOException {
        for (int i = 0; i < numberOfBuffers; ++i) {
            subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0x100000));
        }
        subpartition.finish();
    }

    private static class VerifyNotificationBufferAvailabilityListener
    implements BufferAvailabilityListener {
        private boolean isAvailable;

        private VerifyNotificationBufferAvailabilityListener() {
        }

        public void notifyDataAvailable() {
            this.isAvailable = true;
        }

        private void resetAvailable() {
            this.isAvailable = false;
        }
    }

    private static class VerifyNotificationResultSubpartitionView
    extends NoOpResultSubpartitionView {
        private boolean isAvailable;

        private VerifyNotificationResultSubpartitionView() {
        }

        public void notifyDataAvailable() {
            this.isAvailable = true;
        }

        private void resetAvailable() {
            this.isAvailable = false;
        }
    }
}

