/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.XORShiftRandom;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

public class RecordWriterTest {
    /** When {@code true}, {@code createRecordWriter} configures a broadcast channel selector; otherwise the builder defaults are used. */
    private final boolean isBroadcastWriter;
    /** JUnit-managed temporary directory; its root path is passed to the spilling deserializer in {@code testBroadcastEmitRecord}. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** Creates the test in its non-broadcast configuration. */
    public RecordWriterTest() {
        this(false);
    }

    /**
     * Creates the test for a specific writer flavour.
     *
     * @param isBroadcastWriter whether record writers created by this test use the broadcast channel selector
     */
    RecordWriterTest(boolean isBroadcastWriter) {
        this.isBroadcastWriter = isBroadcastWriter;
    }

    /**
     * Emits and flushes one record on a worker thread, then cancels (interrupts) the worker while
     * it attempts a second emit, clears the writer's buffers, and checks buffer accounting on the
     * provider: nothing is available while the partition writer still keeps the produced
     * consumers, and exactly one buffer is available once the partition writer is closed.
     */
    @Test
    public void testClearBuffersAfterInterruptDuringBlockingBufferRequest() throws Exception {
        ExecutorService worker = null;
        try {
            worker = Executors.newSingleThreadExecutor();
            TestPooledBufferProvider provider = new TestPooledBufferProvider(1);
            KeepingPartitionWriter keepingWriter = new KeepingPartitionWriter(provider);
            RecordWriter recordWriter = this.createRecordWriter(keepingWriter);
            CountDownLatch firstRecordFlushed = new CountDownLatch(1);

            Callable<Void> emitTwice = () -> {
                IntValue value = new IntValue(0);
                try {
                    recordWriter.emit(value);
                    recordWriter.flushAll();
                    firstRecordFlushed.countDown();
                    // Presumably blocks on the buffer request until the cancel below interrupts it.
                    recordWriter.emit(value);
                } catch (InterruptedException e) {
                    recordWriter.clearBuffers();
                }
                return null;
            };
            Future<Void> pendingEmit = worker.submit(emitTwice);

            // Only cancel once the first record has been emitted and flushed.
            firstRecordFlushed.await();
            pendingEmit.cancel(true);
            recordWriter.clearBuffers();

            Assert.assertEquals(0L, provider.getNumberOfAvailableBuffers());
            keepingWriter.close();
            Assert.assertEquals(1L, provider.getNumberOfAvailableBuffers());
        } finally {
            if (worker != null) {
                worker.shutdown();
            }
        }
    }

    /**
     * Emits one record, clears the writer's buffers, then flushes. The test has no explicit
     * assertions: it passes as long as none of the three calls throws.
     */
    @Test
    public void testSerializerClearedAfterClearBuffers() throws Exception {
        TestPooledBufferProvider provider = new TestPooledBufferProvider(1, 16);
        ResultPartitionWriter spiedWriter = Mockito.spy(new RecyclingPartitionWriter(provider));
        RecordWriter recordWriter = this.createRecordWriter(spiedWriter);

        // Write a record without flushing it ...
        recordWriter.emit(new IntValue(0));
        // ... drop whatever the writer currently holds ...
        recordWriter.clearBuffers();
        // ... and make sure a subsequent flush still completes normally.
        recordWriter.flushAll();
    }

    /**
     * Broadcasts a checkpoint barrier through a writer that has not emitted any record and
     * verifies that the provider created no buffer and that each of the four channel queues holds
     * exactly that one event.
     */
    @Test
    public void testBroadcastEventNoRecords() throws Exception {
        final int channelCount = 4;
        final int segmentSize = 32;

        @SuppressWarnings("unchecked")
        Queue<BufferConsumer>[] channelQueues = new Queue[channelCount];
        for (int channel = 0; channel < channelQueues.length; channel++) {
            channelQueues[channel] = new ArrayDeque<>();
        }

        TestPooledBufferProvider provider = new TestPooledBufferProvider(Integer.MAX_VALUE, segmentSize);
        RecordWriter writer = this.createRecordWriter(new CollectingPartitionWriter(channelQueues, provider));

        CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation());
        writer.broadcastEvent(barrier);

        // The event must not have caused any buffer to be created by the provider.
        Assert.assertEquals(0L, provider.getNumberOfCreatedBuffers());

        int channel = 0;
        for (Queue<BufferConsumer> channelQueue : channelQueues) {
            Assert.assertEquals(1L, channelQueue.size());
            BufferOrEvent received = RecordWriterTest.parseBuffer(channelQueue.remove(), channel);
            Assert.assertTrue(received.isEvent());
            Assert.assertEquals(barrier, received.getEvent());
            Assert.assertEquals(0L, channelQueue.size());
            channel++;
        }
    }

    /**
     * Emits three records of different sizes (half a buffer, one byte more than a buffer, and a
     * buffer minus the 4-byte {@code lengthPrefixBytes}), then broadcasts a checkpoint barrier.
     * Verifies the number of buffers created and that on every channel the data buffers precede
     * the barrier event. Expectations differ between the broadcast and non-broadcast writer.
     */
    @Test
    public void testBroadcastEventMixedRecords() throws Exception {
        final int channelCount = 4;
        final int segmentSize = 32;
        final int lengthPrefixBytes = 4;
        XORShiftRandom random = new XORShiftRandom();

        @SuppressWarnings("unchecked")
        Queue<BufferConsumer>[] channelQueues = new Queue[channelCount];
        for (int channel = 0; channel < channelCount; channel++) {
            channelQueues[channel] = new ArrayDeque<>();
        }

        TestPooledBufferProvider provider = new TestPooledBufferProvider(Integer.MAX_VALUE, segmentSize);
        RecordWriter writer = this.createRecordWriter(new CollectingPartitionWriter(channelQueues, provider));
        CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation());

        // Payload sizes, emitted in this order before the event is broadcast.
        int[] payloadSizes = {segmentSize / 2, segmentSize + 1, segmentSize - lengthPrefixBytes};
        for (int payloadSize : payloadSizes) {
            byte[] payload = new byte[payloadSize];
            random.nextBytes(payload);
            writer.emit(new ByteArrayIO(payload));
        }
        writer.broadcastEvent(barrier);

        if (!this.isBroadcastWriter) {
            Assert.assertEquals(4L, provider.getNumberOfCreatedBuffers());

            // Expected data buffers per channel; each channel additionally holds the event.
            int[] expectedDataBuffers = {1, 2, 1, 0};
            for (int channel = 0; channel < channelCount; channel++) {
                Assert.assertEquals(expectedDataBuffers[channel] + 1L, channelQueues[channel].size());
                for (int consumed = 0; consumed < expectedDataBuffers[channel]; consumed++) {
                    Assert.assertTrue(RecordWriterTest.parseBuffer(channelQueues[channel].remove(), channel).isBuffer());
                }
            }
            for (int channel = 0; channel < channelCount; channel++) {
                BufferOrEvent received = RecordWriterTest.parseBuffer(channelQueues[channel].remove(), channel);
                Assert.assertTrue(received.isEvent());
                Assert.assertEquals(barrier, received.getEvent());
            }
            return;
        }

        Assert.assertEquals(3L, provider.getNumberOfCreatedBuffers());
        for (int channel = 0; channel < channelCount; channel++) {
            Queue<BufferConsumer> channelQueue = channelQueues[channel];
            // Three data buffers followed by the event.
            Assert.assertEquals(4L, channelQueue.size());
            for (int consumed = 0; consumed < 3; consumed++) {
                Assert.assertTrue(RecordWriterTest.parseBuffer(channelQueue.remove(), 0).isBuffer());
            }
            BufferOrEvent received = RecordWriterTest.parseBuffer(channelQueue.remove(), channel);
            Assert.assertTrue(received.isEvent());
            Assert.assertEquals(barrier, received.getEvent());
        }
    }

    /**
     * Broadcasts an {@link EndOfPartitionEvent} to two channels and verifies that, after both
     * queued consumers have been parsed via {@code parseBuffer}, each consumer reports itself as
     * recycled.
     */
    @Test
    public void testBroadcastEventBufferReferenceCounting() throws Exception {
        @SuppressWarnings("unchecked")
        ArrayDeque<BufferConsumer>[] channelQueues = new ArrayDeque[]{new ArrayDeque<BufferConsumer>(), new ArrayDeque<BufferConsumer>()};
        TestPooledBufferProvider provider = new TestPooledBufferProvider(Integer.MAX_VALUE);
        RecordWriter writer = this.createRecordWriter(new CollectingPartitionWriter(channelQueues, provider));

        writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

        Assert.assertEquals(1L, channelQueues[0].size());
        Assert.assertEquals(1L, channelQueues[1].size());

        // Keep references to the consumers so their state can be inspected after parsing.
        BufferConsumer firstConsumer = channelQueues[0].getFirst();
        BufferConsumer secondConsumer = channelQueues[1].getFirst();

        int channel = 0;
        for (ArrayDeque<BufferConsumer> channelQueue : channelQueues) {
            Assert.assertTrue(RecordWriterTest.parseBuffer(channelQueue.remove(), channel).isEvent());
            channel++;
        }

        Assert.assertTrue(firstConsumer.isRecycled());
        Assert.assertTrue(secondConsumer.isRecycled());
    }

    /** Runs the shared buffer-independence check with {@code broadcastEvent = true}, i.e. for a broadcast event. */
    @Test
    public void testBroadcastEventBufferIndependence() throws Exception {
        this.verifyBroadcastBufferOrEventIndependence(true);
    }

    /** Runs the shared buffer-independence check with {@code broadcastEvent = false}, i.e. for a broadcast-emitted record. */
    @Test
    public void testBroadcastEmitBufferIndependence() throws Exception {
        this.verifyBroadcastBufferOrEventIndependence(false);
    }

    /**
     * Broadcast-emits {@code numValues} random INT records to {@code numberOfChannels} channels
     * and verifies (a) how many buffers the provider had to create and (b) that every channel
     * received all records, in order, spread over {@code numRequiredBuffers} buffers.
     *
     * <p>All expectations are derived from the named parameters below rather than repeated as
     * literals, so changing one parameter keeps the assertions consistent.
     */
    @Test
    public void testBroadcastEmitRecord() throws Exception {
        final int numberOfChannels = 4;
        final int bufferSize = 32;
        final int numValues = 8;
        final int serializationLength = 4;
        // NOTE(review): assumes every serialized record carries a 4-byte length prefix,
        // matching lenBytes in testBroadcastEventMixedRecords - confirm against the serializer.
        final int lengthPrefixBytes = 4;

        @SuppressWarnings("unchecked")
        Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
        for (int i = 0; i < numberOfChannels; ++i) {
            queues[i] = new ArrayDeque<>();
        }

        TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
        CollectingPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
        RecordWriter writer = this.createRecordWriter(partitionWriter);
        RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(new String[]{this.tempFolder.getRoot().getAbsolutePath()});

        ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
        Util.MockRecords records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
        for (SerializationTestType record : records) {
            serializedRecords.add(record);
            writer.broadcastEmit(record);
        }

        // Records per buffer = bufferSize / (prefix + payload); evaluates to 2 buffers here.
        final int numRequiredBuffers = numValues / (bufferSize / (lengthPrefixBytes + serializationLength));
        if (this.isBroadcastWriter) {
            // The broadcast writer is expected to create each buffer once for all channels.
            Assert.assertEquals((long) numRequiredBuffers, bufferProvider.getNumberOfCreatedBuffers());
        } else {
            // The default writer is expected to create separate buffers per channel.
            Assert.assertEquals((long) numRequiredBuffers * numberOfChannels, bufferProvider.getNumberOfCreatedBuffers());
        }

        for (int i = 0; i < numberOfChannels; ++i) {
            Assert.assertEquals((long) numRequiredBuffers, queues[i].size());
            // Each channel is verified against its own copy of the expected records.
            this.verifyDeserializationResults(queues[i], deserializer, serializedRecords.clone(), numRequiredBuffers, numValues);
        }
    }

    /**
     * Checks the writer's availability future against a local pool created with
     * {@code createBufferPool(1, 1, ...)}: done initially, not done while a buffer builder
     * obtained from the partition is outstanding, and done again (and equal to
     * {@code RecordWriter.AVAILABLE}) after the built buffer has been recycled.
     */
    @Test
    public void testIsAvailableOrNot() throws Exception {
        final NetworkBufferPool networkPool = new NetworkBufferPool(10, 128, 2);
        final BufferPool singleBufferPool = networkPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
        final ResultPartition partition = new ResultPartitionBuilder().setBufferPoolFactory(owner -> singleBufferPool).build();
        partition.setup();
        final ConsumableNotifyingResultPartitionWriterDecorator notifyingWriter = new ConsumableNotifyingResultPartitionWriterDecorator(new NoOpTaskActions(), new JobID(), partition, new NoOpResultPartitionConsumableNotifier());
        final RecordWriter recordWriter = this.createRecordWriter(notifyingWriter);

        try {
            // Nothing requested yet: the writer reports itself available.
            Assert.assertTrue(recordWriter.getAvailableFuture().isDone());

            // Take a buffer builder from the partition; the writer must turn unavailable.
            BufferBuilder outstandingBuilder = partition.getBufferBuilder(0);
            Assert.assertNotNull(outstandingBuilder);
            Assert.assertFalse(recordWriter.getAvailableFuture().isDone());

            // Recycling the buffer must make the writer available again.
            BufferBuilderTestUtils.buildSingleBuffer(outstandingBuilder).recycleBuffer();
            Assert.assertTrue(recordWriter.getAvailableFuture().isDone());
            Assert.assertEquals(RecordWriter.AVAILABLE, recordWriter.getAvailableFuture());
        } finally {
            singleBufferPool.lazyDestroy();
            networkPool.destroy();
        }
    }

    /**
     * Reads recovered channel state into a partition, then broadcast-emits four int records and
     * verifies the buffers of every subpartition: the first {@code totalStates} buffers must
     * contain the recovered {@code states}, the following ones the emitted records.
     *
     * <p>Sizes and counts are expressed through the named locals so the setup and the
     * assertions cannot drift apart.
     */
    @Test
    public void testEmitRecordWithPartitionStateRecovery() throws Exception {
        final int totalBuffers = 10;
        final int totalStates = 2;
        final int[] states = {1, 2, 3, 4};
        final int[] records = {5, 6, 7, 8};
        // One buffer holds exactly one full copy of the states array.
        final int bufferSize = states.length * Integer.BYTES;

        NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, bufferSize, 1);
        ResultPartitionTest.FiniteChannelStateReader stateReader = new ResultPartitionTest.FiniteChannelStateReader(totalStates, states);
        ResultPartition partition = new ResultPartitionBuilder().setNetworkBufferPool(globalPool).build();
        RecordWriter recordWriter = new RecordWriterBuilder().build(partition);
        try {
            partition.setup();
            partition.readRecoveredState(stateReader);
            for (int record : records) {
                recordWriter.broadcastEmit(new IntValue(record));
            }

            // Expected int content of the record buffers; each record value is preceded by 4
            // (presumably its serialized length in bytes - confirm against the serializer).
            final int[][] expectedRecordsInBuffer = {{4, 5, 4, 6}, {4, 7, 4, 8}};
            for (ResultSubpartition subpartition : partition.getAllPartitions()) {
                PipelinedSubpartitionView view = new PipelinedSubpartitionView((PipelinedSubpartition) subpartition, new NoOpBufferAvailablityListener());
                int numConsumedBuffers = 0;
                ResultSubpartition.BufferAndBacklog bufferAndBacklog;
                while ((bufferAndBacklog = view.getNextBuffer()) != null) {
                    Buffer buffer = bufferAndBacklog.buffer();
                    // State buffers come first, record buffers afterwards.
                    int[] expected = numConsumedBuffers < totalStates ? states : expectedRecordsInBuffer[numConsumedBuffers - totalStates];
                    BufferBuilderAndConsumerTest.assertContent(buffer, partition.getBufferPool().getSubpartitionBufferRecyclers()[subpartition.getSubPartitionIndex()], expected);
                    buffer.recycleBuffer();
                    ++numConsumedBuffers;
                }
                Assert.assertEquals((long) (totalStates + expectedRecordsInBuffer.length), (long) numConsumedBuffers);
            }
        } finally {
            globalPool.destroyAllBufferPools();
            globalPool.destroy();
        }
    }

    /**
     * Verifies the writer's idle-time metric: its count is zero while a buffer request is served
     * immediately, and greater than zero after a request had to wait for the pool's only buffer
     * to be recycled.
     *
     * <p>A failure inside the asynchronous request is recorded and reported as the cause of the
     * test failure instead of being swallowed, and the buffer pools are destroyed in a
     * {@code finally} block, mirroring {@code testIsAvailableOrNot}.
     *
     * @throws IOException if setting up the partition or requesting a buffer fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testIdleTime() throws IOException, InterruptedException {
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 2);
        BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
        try {
            ResultPartition resultPartition = new ResultPartitionBuilder().setBufferPoolFactory(p -> localPool).build();
            resultPartition.setup();
            ConsumableNotifyingResultPartitionWriterDecorator partitionWrapper = new ConsumableNotifyingResultPartitionWriterDecorator(new NoOpTaskActions(), new JobID(), resultPartition, new NoOpResultPartitionConsumableNotifier());
            final RecordWriter recordWriter = this.createRecordWriter(partitionWrapper);

            // Take the pool's buffer, finish it and read it back so it stays outstanding.
            BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
            BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish();
            ResultSubpartitionView readView = resultPartition.getSubpartition(0).createReadView(new NoOpBufferAvailablityListener());
            Buffer buffer = readView.getNextBuffer().buffer();

            // The first request did not have to wait, so no idle time was recorded.
            Assert.assertEquals(0L, recordWriter.getIdleTimeMsPerSecond().getCount());

            final CountDownLatch syncLock = new CountDownLatch(1);
            final AtomicReference<BufferBuilder> asyncRequestResult = new AtomicReference<>();
            final AtomicReference<Throwable> asyncRequestFailure = new AtomicReference<>();
            Thread requestThread = new Thread(() -> {
                try {
                    // Signal that the request thread is running, then wait for a buffer.
                    syncLock.countDown();
                    asyncRequestResult.set(recordWriter.requestNewBufferBuilder(0));
                } catch (Throwable t) {
                    // Keep the cause so the test thread can report it.
                    asyncRequestFailure.set(t);
                }
            });
            requestThread.start();

            syncLock.await();
            // Give the request thread time to block before the buffer is handed back.
            Thread.sleep(10L);
            buffer.recycleBuffer();
            requestThread.join();

            Throwable failure = asyncRequestFailure.get();
            if (failure != null) {
                throw new AssertionError("Asynchronous buffer request failed", failure);
            }
            Assert.assertThat(recordWriter.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L));
            Assert.assertNotNull(asyncRequestResult.get());
        } finally {
            localPool.lazyDestroy();
            globalPool.destroy();
        }
    }

    /**
     * Broadcasts either an {@link EndOfPartitionEvent} or a single {@link IntValue} record to two
     * channels and verifies that moving the reader index of the first channel's buffer leaves the
     * reader index of the second channel's buffer untouched.
     *
     * @param broadcastEvent {@code true} to broadcast the event, {@code false} to broadcast-emit the record
     */
    private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) throws Exception {
        @SuppressWarnings("unchecked")
        ArrayDeque<BufferConsumer>[] channelQueues = new ArrayDeque[]{new ArrayDeque<BufferConsumer>(), new ArrayDeque<BufferConsumer>()};
        TestPooledBufferProvider provider = new TestPooledBufferProvider(Integer.MAX_VALUE);
        RecordWriter writer = this.createRecordWriter(new CollectingPartitionWriter(channelQueues, provider));

        if (!broadcastEvent) {
            writer.broadcastEmit(new IntValue(0));
        } else {
            writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
        }

        // Exactly one consumer must have arrived on each channel.
        Assert.assertEquals(1L, channelQueues[0].size());
        Assert.assertEquals(1L, channelQueues[1].size());

        Buffer firstBuffer = BufferBuilderTestUtils.buildSingleBuffer(channelQueues[0].remove());
        Buffer secondBuffer = BufferBuilderTestUtils.buildSingleBuffer(channelQueues[1].remove());
        Assert.assertEquals(0L, firstBuffer.getReaderIndex());
        Assert.assertEquals(0L, secondBuffer.getReaderIndex());

        // Advancing one buffer's reader index must not be visible through the other buffer.
        firstBuffer.setReaderIndex(1);
        Assert.assertEquals("Buffer 2 shares the same reader index as buffer 1", 0L, secondBuffer.getReaderIndex());
    }

    /**
     * Removes {@code numRequiredBuffers} consumers from {@code queue}, feeds each resulting
     * buffer to {@code deserializer}, and asserts that the number of records reported by
     * {@code DeserializationUtils.deserializeRecords} adds up to {@code numValues}.
     *
     * @param queue consumers of one channel, in arrival order
     * @param deserializer deserializer that receives each buffer in turn
     * @param expectedRecords expected records, passed on to {@code DeserializationUtils.deserializeRecords}
     * @param numRequiredBuffers number of consumers to take from the queue
     * @param numValues total number of records expected across all buffers
     */
    protected void verifyDeserializationResults(Queue<BufferConsumer> queue, RecordDeserializer<SerializationTestType> deserializer, ArrayDeque<SerializationTestType> expectedRecords, int numRequiredBuffers, int numValues) throws Exception {
        int deserializedTotal = 0;
        for (int remaining = numRequiredBuffers; remaining > 0; remaining--) {
            Buffer nextBuffer = BufferBuilderTestUtils.buildSingleBuffer(queue.remove());
            deserializer.setNextBuffer(nextBuffer);
            int deserializedNow = DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
            deserializedTotal += deserializedNow;
        }
        Assert.assertEquals((long) numValues, (long) deserializedTotal);
    }

    /**
     * Builds the record writer under test on top of {@code writer}. When this test runs in
     * broadcast mode, a {@code BROADCAST} {@link OutputEmitter} is installed as channel selector;
     * otherwise the builder's defaults are used.
     *
     * @param writer partition writer the record writer emits to
     * @return the newly built record writer
     */
    private RecordWriter createRecordWriter(ResultPartitionWriter writer) {
        RecordWriterBuilder builder = new RecordWriterBuilder();
        if (this.isBroadcastWriter) {
            ChannelSelector broadcastSelector = new OutputEmitter(ShipStrategyType.BROADCAST, 0);
            builder = builder.setChannelSelector(broadcastSelector);
        }
        return builder.build(writer);
    }

    /**
     * Builds the single buffer held by {@code bufferConsumer} and wraps it as a
     * {@link BufferOrEvent} for input channel {@code (0, targetChannel)}.
     *
     * <p>A data buffer is passed through as-is, so the caller remains responsible for it. An
     * event buffer is deserialized into its {@link AbstractEvent} and then recycled; the recycle
     * happens in a {@code finally} block so the buffer is released even if deserialization fails.
     *
     * @param bufferConsumer consumer to build the buffer from
     * @param targetChannel channel index recorded in the resulting {@link InputChannelInfo}
     * @return the buffer or the deserialized event
     * @throws IOException if the event cannot be deserialized
     */
    static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
        Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferConsumer);
        InputChannelInfo channelInfo = new InputChannelInfo(0, targetChannel);
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, channelInfo);
        }
        AbstractEvent event;
        try {
            event = EventSerializer.fromBuffer(buffer, RecordWriterTest.class.getClassLoader());
        } finally {
            // The event has been copied out (or parsing failed); the buffer is not needed anymore.
            buffer.recycleBuffer();
        }
        return new BufferOrEvent(event, channelInfo);
    }

    /**
     * {@link BufferRecycler} that only records the memory segments handed to it. Both methods
     * synchronize on this instance.
     */
    private static class TrackingBufferRecycler implements BufferRecycler {
        /** Segments passed to {@link #recycle}, in call order. */
        private final ArrayList<MemorySegment> recycledMemorySegments = new ArrayList<>();

        private TrackingBufferRecycler() {
        }

        /** Appends {@code memorySegment} to the list of recycled segments. */
        public void recycle(MemorySegment memorySegment) {
            synchronized (this) {
                recycledMemorySegments.add(memorySegment);
            }
        }

        /** Returns the internal (live, not copied) list of segments recycled so far. */
        public List<MemorySegment> getRecycledMemorySegments() {
            synchronized (this) {
                return recycledMemorySegments;
            }
        }
    }

    /** Minimal {@link IOReadableWritable} whose entire payload is one raw byte array. */
    private static class ByteArrayIO
    implements IOReadableWritable {
        /** Payload written by {@code write} and overwritten in place by {@code read}. */
        private final byte[] bytes;

        /** Wraps the given array; the reference is stored as-is, not copied. */
        public ByteArrayIO(byte[] bytes) {
            this.bytes = bytes;
        }

        /** Writes the payload array to {@code out}. */
        public void write(DataOutputView out) throws IOException {
            out.write(this.bytes);
        }

        /** Fills the existing payload array from {@code in} via {@code readFully}. */
        public void read(DataInputView in) throws IOException {
            in.readFully(this.bytes);
        }
    }

    /**
     * Partition writer that keeps every added {@link BufferConsumer}, grouped by target channel,
     * until {@link #close()} closes all of them. Buffer builders come from the supplied
     * {@link BufferProvider}.
     */
    static class KeepingPartitionWriter extends MockResultPartitionWriter {
        private final BufferProvider bufferProvider;
        /** Consumers added so far, keyed by target channel, in insertion order per channel. */
        private Map<Integer, List<BufferConsumer>> produced = new HashMap<>();

        KeepingPartitionWriter(BufferProvider bufferProvider) {
            this.bufferProvider = bufferProvider;
        }

        /** Delegates to the provider's blocking buffer-builder request. */
        @Override
        public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
            return bufferProvider.requestBufferBuilderBlocking(targetChannel);
        }

        /** Delegates to the provider's non-blocking buffer-builder request. */
        @Override
        public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
            return bufferProvider.requestBufferBuilder(targetChannel);
        }

        /** Stores the consumer under its target channel; always returns {@code true}. */
        @Override
        public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel, boolean isPriorityEvent) {
            produced.computeIfAbsent(targetChannel, channel -> new ArrayList<>()).add(bufferConsumer);
            return true;
        }

        /** Returns the consumers kept for the given channel, or {@code null} if none were added. */
        public List<BufferConsumer> getAddedBufferConsumers(int subpartitionIndex) {
            return produced.get(subpartitionIndex);
        }

        /** Closes every kept consumer and forgets all of them. */
        @Override
        public void close() {
            produced.values().forEach(consumers -> consumers.forEach(BufferConsumer::close));
            produced.clear();
        }
    }

    /**
     * Partition writer that only overrides the buffer-builder requests, forwarding them to the
     * supplied {@link BufferProvider}; everything else is inherited from
     * {@link MockResultPartitionWriter}.
     */
    private static class RecyclingPartitionWriter
    extends MockResultPartitionWriter {
        /** Source of all buffer builders handed out by this writer. */
        private final BufferProvider bufferProvider;

        private RecyclingPartitionWriter(BufferProvider bufferProvider) {
            this.bufferProvider = bufferProvider;
        }

        /** Delegates to the provider's blocking buffer-builder request. */
        @Override
        public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
            return this.bufferProvider.requestBufferBuilderBlocking(targetChannel);
        }

        /** Delegates to the provider's non-blocking buffer-builder request. */
        @Override
        public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
            return this.bufferProvider.requestBufferBuilder(targetChannel);
        }
    }

    /**
     * Partition writer that collects every added {@link BufferConsumer} in a caller-supplied
     * queue per target channel. Buffer builders come from the supplied {@link BufferProvider}.
     */
    static class CollectingPartitionWriter
    extends MockResultPartitionWriter {
        /** One queue per subpartition; the array index is the target channel. */
        private final Queue<BufferConsumer>[] queues;
        /** Source of all buffer builders handed out by this writer. */
        private final BufferProvider bufferProvider;

        /**
         * @param queues one queue per output channel; the array is stored as-is, so the caller sees added consumers
         * @param bufferProvider provider used to serve buffer-builder requests
         */
        CollectingPartitionWriter(Queue<BufferConsumer>[] queues, BufferProvider bufferProvider) {
            this.queues = queues;
            this.bufferProvider = bufferProvider;
        }

        /** The number of subpartitions equals the number of supplied queues. */
        @Override
        public int getNumberOfSubpartitions() {
            return this.queues.length;
        }

        /** Delegates to the provider's blocking buffer-builder request. */
        @Override
        public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
            return this.bufferProvider.requestBufferBuilderBlocking(targetChannel);
        }

        /** Delegates to the provider's non-blocking buffer-builder request. */
        @Override
        public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
            return this.bufferProvider.requestBufferBuilder(targetChannel);
        }

        /** Appends the consumer to the queue of its target channel and returns the queue's {@code add} result. */
        @Override
        public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel, boolean isPriorityEvent) {
            return this.queues[targetChannel].add(buffer);
        }
    }
}

