/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class RecoveredInputChannelTest {
    private final boolean isRemote;

    @Parameterized.Parameters(name="isRemote = {0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList({true}, {false});
    }

    public RecoveredInputChannelTest(boolean isRemote) {
        this.isRemote = isRemote;
    }

    @Test
    public void testConcurrentReadStateAndProcess() throws Exception {
        this.testConcurrentReadStateAndProcess(this.isRemote);
    }

    @Test
    public void testConcurrentReadStateAndRelease() throws Exception {
        this.testConcurrentReadStateAndRelease(this.isRemote);
    }

    @Test
    public void testConcurrentReadStateAndProcessAndRelease() throws Exception {
        this.testConcurrentReadStateAndProcessAndRelease(this.isRemote);
    }

    @Test
    public void testReadEmptyState() throws Exception {
        this.testReadEmptyStateOrThrowException(this.isRemote, ChannelStateReader.NO_OP);
    }

    @Test(expected=IOException.class)
    public void testReadStateWithException() throws Exception {
        this.testReadEmptyStateOrThrowException(this.isRemote, new ResultPartitionTest.ChannelStateReaderWithException());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testReadEmptyStateOrThrowException(boolean isRemote, ChannelStateReader reader) throws Exception {
        int totalBuffers = 10;
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = this.createInputGate(globalPool);
        RecoveredInputChannel inputChannel = this.createRecoveredChannel(isRemote, inputGate);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.setup();
            inputChannel.readRecoveredState(reader);
            Assert.assertEquals((long)1L, (long)inputChannel.getNumberOfQueuedBuffers());
            Assert.assertFalse((boolean)inputChannel.getNextBuffer().isPresent());
            Assert.assertTrue((boolean)inputChannel.getStateConsumedFuture().isDone());
        }
        finally {
            inputGate.close();
            globalPool.destroyAllBufferPools();
            Assert.assertEquals((long)10L, (long)globalPool.getNumberOfAvailableMemorySegments());
            globalPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testConcurrentReadStateAndProcess(boolean isRemote) throws Exception {
        int totalBuffers = 10;
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = this.createInputGate(globalPool);
        RecoveredInputChannel inputChannel = this.createRecoveredChannel(isRemote, inputGate);
        int totalStates = 15;
        int[] states = new int[]{1, 2, 3, 4};
        ResultPartitionTest.FiniteChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(15, states);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Throwable thrown = null;
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.setup();
            Callable<Void> processTask = this.processRecoveredBufferTask(inputChannel, 15, states, false);
            Callable<Void> readStateTask = this.readRecoveredStateTask(inputChannel, reader, false);
            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{readStateTask, processTask});
        }
        catch (Throwable t) {
            try {
                thrown = t;
            }
            catch (Throwable throwable) {
                RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
                throw throwable;
            }
            RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
        }
        RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testConcurrentReadStateAndRelease(boolean isRemote) throws Exception {
        int totalBuffers = 10;
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = this.createInputGate(globalPool);
        RecoveredInputChannel inputChannel = this.createRecoveredChannel(isRemote, inputGate);
        int totalStates = 15;
        int[] states = new int[]{1, 2, 3, 4};
        ResultPartitionTest.FiniteChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(15, states);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Throwable thrown = null;
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.setup();
            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{this.readRecoveredStateTask(inputChannel, reader, true), this.releaseChannelTask(inputChannel)});
        }
        catch (Throwable t) {
            try {
                thrown = t;
            }
            catch (Throwable throwable) {
                RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
                throw throwable;
            }
            RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
        }
        RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testConcurrentReadStateAndProcessAndRelease(boolean isRemote) throws Exception {
        int totalBuffers = 10;
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = this.createInputGate(globalPool);
        RecoveredInputChannel inputChannel = this.createRecoveredChannel(isRemote, inputGate);
        int totalStates = 15;
        int[] states = new int[]{1, 2, 3, 4};
        ResultPartitionTest.FiniteChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(15, states);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Throwable thrown = null;
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.setup();
            Callable<Void> processTask = this.processRecoveredBufferTask(inputChannel, 15, states, true);
            Callable<Void> readStateTask = this.readRecoveredStateTask(inputChannel, reader, true);
            Callable<Void> releaseTask = this.releaseChannelTask(inputChannel);
            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{readStateTask, processTask, releaseTask});
        }
        catch (Throwable t) {
            try {
                thrown = t;
            }
            catch (Throwable throwable) {
                RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
                throw throwable;
            }
            RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
        }
        RemoteInputChannelTest.cleanup(globalPool, executor, null, thrown, new InputChannel[]{inputChannel});
    }

    private Callable<Void> readRecoveredStateTask(RecoveredInputChannel inputChannel, ChannelStateReader reader, boolean verifyRelease) {
        return () -> {
            block2: {
                try {
                    inputChannel.readRecoveredState(reader);
                }
                catch (Throwable t) {
                    if (verifyRelease && inputChannel.isReleased()) break block2;
                    throw new AssertionError("Exceptions are expected here only if the input channel was released", t);
                }
            }
            return null;
        };
    }

    private Callable<Void> processRecoveredBufferTask(RecoveredInputChannel inputChannel, int totalStates, int[] states, boolean verifyRelease) {
        return () -> {
            int numProcessedStates = 0;
            while (!(numProcessedStates >= totalStates || verifyRelease && inputChannel.isReleased())) {
                if (inputChannel.getNumberOfQueuedBuffers() == 0) {
                    Thread.sleep(1L);
                    continue;
                }
                try {
                    Optional bufferAndAvailability = inputChannel.getNextBuffer();
                    if (!bufferAndAvailability.isPresent()) continue;
                    Buffer buffer = ((InputChannel.BufferAndAvailability)bufferAndAvailability.get()).buffer();
                    BufferBuilderAndConsumerTest.assertContent(buffer, null, states);
                    buffer.recycleBuffer();
                    ++numProcessedStates;
                }
                catch (Throwable t) {
                    if (!verifyRelease || !inputChannel.isReleased()) {
                        throw new AssertionError("Exceptions are expected here only if the input channel was released", t);
                    }
                }
            }
            return null;
        };
    }

    private Callable<Void> releaseChannelTask(RecoveredInputChannel inputChannel) {
        return () -> {
            inputChannel.releaseAllResources();
            return null;
        };
    }

    private RecoveredInputChannel createRecoveredChannel(boolean isRemote, SingleInputGate gate) {
        if (isRemote) {
            return new InputChannelBuilder().buildRemoteRecoveredChannel(gate);
        }
        return new InputChannelBuilder().buildLocalRecoveredChannel(gate);
    }

    private SingleInputGate createInputGate(NetworkBufferPool globalPool) throws Exception {
        return new SingleInputGateBuilder().setBufferPoolFactory(globalPool.createBufferPool(8, 8)).setSegmentProvider((MemorySegmentProvider)globalPool).build();
    }
}

