/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ResultPartitionTest {
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    private static FileChannelManager fileChannelManager;

    /** Creates the static file channel manager, rooted at the temporary file directory. */
    @BeforeClass
    public static void setUp() {
        final String[] spillDirectories = {tempDir};
        fileChannelManager = new FileChannelManagerImpl(spillDirectories, "testing");
    }

    /**
     * Closes the static file channel manager after the tests of this class have run.
     *
     * <p>The manager is still {@code null} when {@link #setUp()} failed before assigning it;
     * skipping the close in that case avoids stacking a NullPointerException on top of the
     * original set-up failure.
     *
     * @throws Exception if closing the file channel manager fails
     */
    @AfterClass
    public static void shutdown() throws Exception {
        if (fileChannelManager != null) {
            fileChannelManager.close();
        }
    }

    /**
     * Checks that every subpartition reports the index of the partition it was built for together
     * with its own position inside that partition.
     */
    @Test
    public void testResultSubpartitionInfo() {
        final int numPartitions = 2;
        final int numSubpartitions = 3;

        for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
            final ResultPartition partition = new ResultPartitionBuilder()
                    .setResultPartitionIndex(partitionIndex)
                    .setNumberOfSubpartitions(numSubpartitions)
                    .build();

            final ResultSubpartition[] subpartitions = partition.getAllPartitions();
            // Guard against a vacuous pass: an empty array would skip every assertion below.
            Assert.assertEquals(numSubpartitions, subpartitions.length);

            for (int subpartitionIndex = 0; subpartitionIndex < subpartitions.length; subpartitionIndex++) {
                final ResultSubpartitionInfo info = subpartitions[subpartitionIndex].getSubpartitionInfo();
                Assert.assertEquals(partitionIndex, info.getPartitionIdx());
                Assert.assertEquals(subpartitionIndex, info.getSubPartitionIdx());
            }
        }
    }

    /**
     * Verifies that adding a buffer triggers exactly one consumable notification for a pipelined
     * partition and none for a blocking partition.
     */
    @Test
    public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
        final JobID jobId = new JobID();
        final TaskActions taskActions = new NoOpTaskActions();

        // Pipelined partition: the added buffer must be announced exactly once.
        final ResultPartitionConsumableNotifier pipelinedNotifier =
                Mockito.mock(ResultPartitionConsumableNotifier.class);
        final ResultPartitionWriter pipelinedWriter = createConsumableNotifyingResultPartitionWriter(
                ResultPartitionType.PIPELINED, taskActions, jobId, pipelinedNotifier);
        pipelinedWriter.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768), 0);
        Mockito.verify(pipelinedNotifier, Mockito.times(1)).notifyPartitionConsumable(
                Matchers.eq(jobId),
                Matchers.eq(pipelinedWriter.getPartitionId()),
                Matchers.eq(taskActions));

        // Blocking partition: adding a buffer must not notify anybody.
        final ResultPartitionConsumableNotifier blockingNotifier =
                Mockito.mock(ResultPartitionConsumableNotifier.class);
        final ResultPartitionWriter blockingWriter = createConsumableNotifyingResultPartitionWriter(
                ResultPartitionType.BLOCKING, taskActions, jobId, blockingNotifier);
        blockingWriter.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768), 0);
        Mockito.verify(blockingNotifier, Mockito.never()).notifyPartitionConsumable(
                Matchers.eq(jobId),
                Matchers.eq(blockingWriter.getPartitionId()),
                Matchers.eq(taskActions));
    }

    /** Runs the add-on-finished-partition scenario for a {@code PIPELINED} partition. */
    @Test
    public void testAddOnFinishedPipelinedPartition() throws Exception {
        testAddOnFinishedPartition(ResultPartitionType.PIPELINED);
    }

    /** Runs the add-on-finished-partition scenario for a {@code BLOCKING} partition. */
    @Test
    public void testAddOnFinishedBlockingPartition() throws Exception {
        testAddOnFinishedPartition(ResultPartitionType.BLOCKING);
    }

    /**
     * Builds a finished blocking partition that is not released on consumption and checks, over
     * two rounds of creating and releasing a subpartition view, that the partition stays
     * registered as unreleased with the manager and is not marked released.
     */
    @Test
    public void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsumption() throws IOException {
        final ResultPartitionManager manager = new ResultPartitionManager();
        final ResultPartition partition = new ResultPartitionBuilder()
                .isReleasedOnConsumption(false)
                .setResultPartitionManager(manager)
                .setResultPartitionType(ResultPartitionType.BLOCKING)
                .setFileChannelManager(fileChannelManager)
                .build();

        manager.registerResultPartition(partition);
        partition.finish();
        MatcherAssert.assertThat(
                manager.getUnreleasedPartitions(),
                org.hamcrest.Matchers.contains(partition.getPartitionId()));

        for (int round = 0; round < 2; round++) {
            final ResultSubpartitionView view = partition.createSubpartitionView(0, () -> {});
            view.releaseAllResources();

            // Releasing the view must leave the partition itself untouched.
            MatcherAssert.assertThat(
                    manager.getUnreleasedPartitions(),
                    org.hamcrest.Matchers.contains(partition.getPartitionId()));
            Assert.assertFalse(partition.isReleased());
        }
    }

    /**
     * Finishes a partition of the given type and then tries to add a buffer to it.
     *
     * <p>The add is expected to be rejected with an {@link IllegalStateException}. Afterwards the
     * rejected buffer consumer must be recycled, and the notifier must not have been called since
     * it was reset.
     *
     * @param partitionType type of the partition under test
     */
    private void testAddOnFinishedPartition(ResultPartitionType partitionType) throws Exception {
        final BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768);
        final ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
        final JobID jobId = new JobID();
        final TaskActions taskActions = new NoOpTaskActions();
        final ResultPartitionWriter writer =
                createConsumableNotifyingResultPartitionWriter(partitionType, taskActions, jobId, notifier);

        try {
            writer.finish();
            // Forget any interactions recorded up to this point; only the add below is of interest.
            Mockito.reset(notifier);
            writer.addBufferConsumer(bufferConsumer, 0);
            Assert.fail("exception expected");
        } catch (IllegalStateException expected) {
            // Expected: this is the outcome the test is looking for.
        } finally {
            if (!bufferConsumer.isRecycled()) {
                bufferConsumer.close();
                Assert.fail("bufferConsumer not recycled");
            }
            Mockito.verify(notifier, Mockito.never()).notifyPartitionConsumable(
                    Matchers.eq(jobId),
                    Matchers.eq(writer.getPartitionId()),
                    Matchers.eq(taskActions));
        }
    }

    /** Runs the add-on-released-partition scenario for a {@code PIPELINED} partition. */
    @Test
    public void testAddOnReleasedPipelinedPartition() throws Exception {
        testAddOnReleasedPartition(ResultPartitionType.PIPELINED);
    }

    /** Runs the add-on-released-partition scenario for a {@code BLOCKING} partition. */
    @Test
    public void testAddOnReleasedBlockingPartition() throws Exception {
        testAddOnReleasedPartition(ResultPartitionType.BLOCKING);
    }

    /**
     * Releases a partition of the given type and then adds a buffer through its decorated writer.
     *
     * <p>The add itself is not expected to throw. Afterwards the partition must report itself as
     * released, the buffer consumer must be recycled, and the notifier must never have been
     * called.
     *
     * @param partitionType type of the partition under test
     */
    private void testAddOnReleasedPartition(ResultPartitionType partitionType) throws Exception {
        final BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768);
        final ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
        final JobID jobId = new JobID();
        final TaskActions taskActions = new NoOpTaskActions();

        // Only the blocking variant is created with the file channel manager.
        final ResultPartition partition;
        if (partitionType == ResultPartitionType.BLOCKING) {
            partition = PartitionTestUtils.createPartition(partitionType, fileChannelManager);
        } else {
            partition = PartitionTestUtils.createPartition(partitionType);
        }

        final ResultPartitionWriter writer = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
                new ResultPartitionWriter[] {partition},
                taskActions,
                jobId,
                notifier)[0];

        try {
            partition.release();
            writer.addBufferConsumer(bufferConsumer, 0);
            Assert.assertTrue(partition.isReleased());
        } finally {
            if (!bufferConsumer.isRecycled()) {
                bufferConsumer.close();
                Assert.fail("bufferConsumer not recycled");
            }
            Mockito.verify(notifier, Mockito.never()).notifyPartitionConsumable(
                    Matchers.eq(jobId),
                    Matchers.eq(partition.getPartitionId()),
                    Matchers.eq(taskActions));
        }
    }

    /** Runs the plain add-buffer scenario for a {@code PIPELINED} partition. */
    @Test
    public void testAddOnPipelinedPartition() throws Exception {
        testAddOnPartition(ResultPartitionType.PIPELINED);
    }

    /** Runs the plain add-buffer scenario for a {@code BLOCKING} partition. */
    @Test
    public void testAddOnBlockingPartition() throws Exception {
        testAddOnPartition(ResultPartitionType.BLOCKING);
    }

    /**
     * Registers a partition with a manager, fails it, and checks via
     * {@code PartitionTestUtils.verifyCreateSubpartitionViewThrowsException} that creating a
     * subpartition view through the manager now throws.
     */
    @Test
    public void testCreateSubpartitionOnFailingPartition() throws Exception {
        final ResultPartitionManager manager = new ResultPartitionManager();
        final ResultPartition partition = new ResultPartitionBuilder()
                .setResultPartitionManager(manager)
                .build();
        manager.registerResultPartition(partition);

        // Fail without supplying a cause.
        partition.fail(null);

        PartitionTestUtils.verifyCreateSubpartitionViewThrowsException(manager, partition.getPartitionId());
    }

    /**
     * Adds one buffer to a fresh partition of the given type.
     *
     * <p>The buffer consumer must not be recycled right after the add. For pipelined partition
     * types the notifier must have been called exactly once; for other types no verification of
     * the notifier takes place.
     *
     * @param partitionType type of the partition under test
     */
    private void testAddOnPartition(ResultPartitionType partitionType) throws Exception {
        final ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
        final JobID jobId = new JobID();
        final TaskActions taskActions = new NoOpTaskActions();
        final ResultPartitionWriter writer =
                createConsumableNotifyingResultPartitionWriter(partitionType, taskActions, jobId, notifier);
        final BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768);

        try {
            writer.addBufferConsumer(bufferConsumer, 0);
            Assert.assertFalse(
                    "bufferConsumer should not be recycled (still in the queue)",
                    bufferConsumer.isRecycled());
        } finally {
            // Clean up the consumer ourselves if it has not been recycled.
            if (!bufferConsumer.isRecycled()) {
                bufferConsumer.close();
            }
            if (partitionType.isPipelined()) {
                Mockito.verify(notifier, Mockito.times(1)).notifyPartitionConsumable(
                        Matchers.eq(jobId),
                        Matchers.eq(writer.getPartitionId()),
                        Matchers.eq(taskActions));
            }
        }
    }

    /** Runs the release-memory scenario for a {@code PIPELINED} partition. */
    @Test
    public void testReleaseMemoryOnPipelinedPartition() throws Exception {
        testReleaseMemory(ResultPartitionType.PIPELINED);
    }

    /**
     * Fills a single-subpartition partition with every buffer of a ten-buffer shuffle
     * environment, finishes it, and then shrinks its local pool to four buffers.
     *
     * <p>Before shrinking, no memory segment may be available. After shrinking, four segments are
     * expected to be available when the partition type has no back pressure, and zero otherwise.
     *
     * @param resultPartitionType type of the partition under test
     */
    private void testReleaseMemory(ResultPartitionType resultPartitionType) throws Exception {
        final int numAllBuffers = 10;
        final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
                .setNumNetworkBuffers(numAllBuffers)
                .build();
        final ResultPartition resultPartition = PartitionTestUtils.createPartition(network, resultPartitionType, 1);

        try {
            resultPartition.setup();

            // Take every buffer and hand it to the partition.
            for (int i = 0; i < numAllBuffers; i++) {
                final BufferBuilder bufferBuilder = resultPartition.getBufferPool().requestBufferBuilderBlocking();
                resultPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
            }
            resultPartition.finish();
            Assert.assertEquals(0, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());

            final int numLocalBuffers = 4;
            resultPartition.getBufferPool().setNumBuffers(numLocalBuffers);

            final int expectedAvailable = resultPartitionType.hasBackPressure() ? 0 : numLocalBuffers;
            Assert.assertEquals(
                    expectedAvailable,
                    resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
        } finally {
            // Nested so that a failure while releasing the partition cannot skip closing the environment.
            try {
                resultPartition.release();
            } finally {
                network.close();
            }
        }
    }

    /**
     * Checks the partition's availability future around buffer requests: with the local pool
     * limited to two buffers the future is done before any request, and no longer done after two
     * buffer builders have been requested.
     */
    @Test
    public void testIsAvailableOrNot() throws IOException, InterruptedException {
        final int numAllBuffers = 10;
        final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
                .setNumNetworkBuffers(numAllBuffers)
                .build();
        final ResultPartition resultPartition =
                PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED, 1);

        try {
            resultPartition.setup();
            resultPartition.getBufferPool().setNumBuffers(2);
            Assert.assertTrue(resultPartition.getAvailableFuture().isDone());

            // Request as many builders as the pool size set above.
            resultPartition.getBufferBuilder(0);
            resultPartition.getBufferBuilder(0);
            Assert.assertFalse(resultPartition.getAvailableFuture().isDone());
        } finally {
            // Nested so that a failure while releasing the partition cannot skip closing the environment.
            try {
                resultPartition.release();
            } finally {
                network.close();
            }
        }
    }

    /** Runs the buffer-pool sizing check for a {@code PIPELINED_BOUNDED} partition. */
    @Test
    public void testPipelinedPartitionBufferPool() throws Exception {
        testPartitionBufferPool(ResultPartitionType.PIPELINED_BOUNDED);
    }

    /** Runs the buffer-pool sizing check for a {@code BLOCKING} partition. */
    @Test
    public void testBlockingPartitionBufferPool() throws Exception {
        testPartitionBufferPool(ResultPartitionType.BLOCKING);
    }

    /**
     * Checks the sizing of the local buffer pool created for a partition of the given type.
     *
     * <p>The required number of segments must be the number of subpartitions plus one. The
     * maximum must be {@code 2 * subpartitions + 8} for bounded types (two buffers per channel
     * and eight floating buffers, as configured on the builder) and {@link Integer#MAX_VALUE}
     * otherwise.
     *
     * @param type type of the partition under test
     */
    private void testPartitionBufferPool(ResultPartitionType type) throws Exception {
        final int networkBuffersPerChannel = 2;
        final int floatingNetworkBuffersPerGate = 8;
        final NetworkBufferPool globalPool = new NetworkBufferPool(20, 1, 1);
        final ResultPartition partition = new ResultPartitionBuilder()
                .setResultPartitionType(type)
                .setFileChannelManager(fileChannelManager)
                .setNetworkBuffersPerChannel(networkBuffersPerChannel)
                .setFloatingNetworkBuffersPerGate(floatingNetworkBuffersPerGate)
                .setNetworkBufferPool(globalPool)
                .build();

        try {
            partition.setup();
            final BufferPool bufferPool = partition.getBufferPool();

            Assert.assertEquals(
                    partition.getNumberOfSubpartitions() + 1,
                    bufferPool.getNumberOfRequiredMemorySegments());

            final int expectedMaxBuffers = type.isBounded()
                    ? networkBuffersPerChannel * partition.getNumberOfSubpartitions() + floatingNetworkBuffersPerGate
                    : Integer.MAX_VALUE;
            Assert.assertEquals(expectedMaxBuffers, bufferPool.getMaxNumberOfMemorySegments());
        } finally {
            globalPool.destroyAllBufferPools();
            globalPool.destroy();
        }
    }

    /**
     * Creates a partition of the given type and wraps it with
     * {@link ConsumableNotifyingResultPartitionWriterDecorator}.
     *
     * <p>Blocking partitions are created with the static file channel manager; all other types
     * are created without one.
     *
     * @param partitionType type of the partition to create
     * @param taskActions task actions handed to the decorator
     * @param jobId job id handed to the decorator
     * @param notifier notifier handed to the decorator
     * @return the first (and only) writer returned by the decorator
     */
    private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter(ResultPartitionType partitionType, TaskActions taskActions, JobID jobId, ResultPartitionConsumableNotifier notifier) {
        final ResultPartition partition;
        if (partitionType == ResultPartitionType.BLOCKING) {
            partition = PartitionTestUtils.createPartition(partitionType, fileChannelManager);
        } else {
            partition = PartitionTestUtils.createPartition(partitionType);
        }

        return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
                new ResultPartitionWriter[] {partition},
                taskActions,
                jobId,
                notifier)[0];
    }

    /**
     * Reads recovered state from the no-op channel state reader and checks that no subpartition
     * ends up with any buffers and that, once the local pool is lazily destroyed, both global
     * buffers are available again.
     */
    @Test
    public void testInitializeEmptyState() throws Exception {
        final int totalBuffers = 2;
        final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1, 1);
        final ResultPartition partition = new ResultPartitionBuilder()
                .setNetworkBufferPool(globalPool)
                .build();
        final ChannelStateReader stateReader = ChannelStateReader.NO_OP;

        try {
            partition.setup();
            partition.readRecoveredState(stateReader);

            for (ResultSubpartition subpartition : partition.getAllPartitions()) {
                Assert.assertEquals(0L, subpartition.getTotalNumberOfBuffers());
            }

            // Destroy the local pool, then check that every buffer is back in the global pool.
            partition.getBufferPool().lazyDestroy();
            Assert.assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments());
        } finally {
            globalPool.destroyAllBufferPools();
            globalPool.destroy();
        }
    }

    /**
     * Restores five states through a global pool that holds only two buffers while a second
     * thread concurrently consumes the restored buffers.
     *
     * <p>The consumer polls each subpartition until it has seen five buffers, checks the content
     * of each against the expected states, recycles it, and finally asserts that no further
     * buffer is pending. After the consumer finishes (within 20 seconds) and the local pool is
     * lazily destroyed, both global buffers must be available again.
     */
    @Test
    public void testInitializeMoreStateThanBuffer() throws Exception {
        final int totalBuffers = 2;
        final int totalStates = 5;
        final int[] states = {1, 2, 3, 4};
        final int bufferSize = states.length * Integer.BYTES;

        final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, bufferSize, 1);
        final FiniteChannelStateReader stateReader = new FiniteChannelStateReader(totalStates, states);
        final ResultPartition partition = new ResultPartitionBuilder()
                .setNetworkBufferPool(globalPool)
                .build();
        final ExecutorService executor = Executors.newFixedThreadPool(1);

        try {
            final Callable<Void> partitionConsumeTask = () -> {
                for (ResultSubpartition subpartition : partition.getAllPartitions()) {
                    final PipelinedSubpartitionView view = new PipelinedSubpartitionView(
                            (PipelinedSubpartition) subpartition,
                            new NoOpBufferAvailablityListener());

                    int numConsumedBuffers = 0;
                    while (numConsumedBuffers != totalStates) {
                        final ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
                        if (bufferAndBacklog == null) {
                            // Nothing to consume yet; back off briefly before polling again.
                            Thread.sleep(5L);
                            continue;
                        }

                        final Buffer buffer = bufferAndBacklog.buffer();
                        BufferBuilderAndConsumerTest.assertContent(
                                buffer,
                                partition.getBufferPool().getSubpartitionBufferRecyclers()[subpartition.getSubPartitionIndex()],
                                states);
                        buffer.recycleBuffer();
                        numConsumedBuffers++;
                    }
                    Assert.assertNull(view.getNextBuffer());
                }
                return null;
            };
            final Future<Void> result = executor.submit(partitionConsumeTask);

            partition.setup();
            partition.readRecoveredState(stateReader);

            // Propagates any failure from the consumer thread and bounds the wait.
            result.get(20L, TimeUnit.SECONDS);

            partition.getBufferPool().lazyDestroy();
            Assert.assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments());
        } finally {
            // shutdownNow() interrupts a consumer that is still polling (for example after a
            // timeout or a failure above), so a failed run cannot leave a worker thread looping
            // forever; after a successful run there is nothing left to interrupt.
            executor.shutdownNow();
            globalPool.destroyAllBufferPools();
            globalPool.destroy();
        }
    }

    /**
     * Reads recovered state from a reader whose read methods always throw an {@link IOException}
     * with the message {@code "test"}.
     *
     * <p>The exception must surface from {@code readRecoveredState} carrying that message, and
     * after destroying all local pools both global buffers must be available again.
     */
    @Test
    public void testReadRecoveredStateWithException() throws Exception {
        final int totalBuffers = 2;
        final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1, 1);
        final ResultPartition partition = new ResultPartitionBuilder()
                .setNetworkBufferPool(globalPool)
                .build();
        final ChannelStateReader stateReader = new ChannelStateReaderWithException();

        try {
            partition.setup();
            partition.readRecoveredState(stateReader);
            // Without this the test would pass silently if the reader's exception were swallowed.
            Assert.fail("readRecoveredState should have propagated the reader's IOException");
        } catch (IOException e) {
            MatcherAssert.assertThat("should throw custom exception message", e.getMessage().contains("test"));
        } finally {
            globalPool.destroyAllBufferPools();
            // Checked before destroying the global pool: every buffer must be back in it.
            Assert.assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments());
            globalPool.destroy();
        }
    }

    /**
     * A {@link ChannelStateReader} that claims to have channel state but throws an
     * {@link IOException} with the message {@code "test"} from every read method.
     */
    public static final class ChannelStateReaderWithException
            implements ChannelStateReader {

        /** Builds the exception thrown by both read methods. */
        private static IOException readFailure() {
            return new IOException("test");
        }

        /** Always reports that channel state is present. */
        public boolean hasChannelStates() {
            return true;
        }

        /** Always fails with the {@code "test"} exception. */
        public ChannelStateReader.ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException {
            throw readFailure();
        }

        /** Always fails with the {@code "test"} exception. */
        public ChannelStateReader.ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) throws IOException {
            throw readFailure();
        }

        /** Does nothing. */
        public void close() {
            // Intentionally empty.
        }
    }

    /**
     * A {@link ChannelStateReader} that serves a fixed number of identical states.
     *
     * <p>Every read writes the configured {@code states} values into the supplied buffer. Input
     * reads are counted per {@link InputChannelInfo}; output reads share a single counter. A read
     * reports {@code HAS_MORE_DATA} while the relevant count is below {@code totalStates} and
     * {@code NO_MORE_DATA} from then on.
     */
    public static final class FiniteChannelStateReader
            implements ChannelStateReader {
        private final int totalStates;
        private int numRestoredStates;
        private final int[] states;
        private final Map<InputChannelInfo, Integer> counters = new HashMap<InputChannelInfo, Integer>();

        /**
         * @param totalStates number of reads after which no more data is reported
         * @param states values written into the buffer on every read
         */
        public FiniteChannelStateReader(int totalStates, int[] states) {
            this.totalStates = totalStates;
            this.states = states;
        }

        /** Always reports that channel state is present. */
        public boolean hasChannelStates() {
            return true;
        }

        /**
         * Writes all state values into the buffer as ints and bumps the read count of the given
         * channel.
         */
        public ChannelStateReader.ReadResult readInputData(InputChannelInfo info, Buffer buffer) {
            for (int i = 0; i < states.length; i++) {
                buffer.asByteBuf().writeInt(states[i]);
            }
            // First read for a channel stores 1; later reads add 1 to the stored count.
            final int readsForChannel = counters.merge(info, 1, Integer::sum);
            return getReadResult(readsForChannel);
        }

        /** Appends all state values to the buffer builder and bumps the shared output counter. */
        public ChannelStateReader.ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) {
            bufferBuilder.appendAndCommit(BufferBuilderAndConsumerTest.toByteBuffer(states));
            numRestoredStates += 1;
            return getReadResult(numRestoredStates);
        }

        /** Maps a read count to the corresponding read result. */
        private ChannelStateReader.ReadResult getReadResult(int numRestoredStates) {
            return numRestoredStates < totalStates
                    ? ChannelStateReader.ReadResult.HAS_MORE_DATA
                    : ChannelStateReader.ReadResult.NO_MORE_DATA;
        }

        /** Does nothing. */
        public void close() {
            // Intentionally empty.
        }
    }
}

