/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterTest;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the {@link RecordWriterDelegate} implementations {@link SingleRecordWriter} and
 * {@link MultipleRecordWriters}.
 *
 * <p>Two aspects are covered for each implementation: the availability reporting
 * ({@code isAvailable()} / {@code getAvailableFuture()}) and the broadcasting of events to
 * all wrapped record writers.
 */
public class RecordWriterDelegateTest
extends TestLogger {
    private static final int numberOfBuffers = 10;
    private static final int memorySegmentSize = 128;
    private static final int numberOfSegmentsToRequest = 2;

    /** Global pool that backs the local buffer pools created by the availability tests. */
    private NetworkBufferPool globalPool;

    /** Creates a fresh global buffer pool before every test. */
    @Before
    public void setup() {
        this.globalPool = new NetworkBufferPool(numberOfBuffers, memorySegmentSize, numberOfSegmentsToRequest);
    }

    /** Destroys all local pools first and then the global pool itself. */
    @After
    public void teardown() {
        this.globalPool.destroyAllBufferPools();
        this.globalPool.destroy();
    }

    /** Checks the availability reporting of a delegate that wraps a single record writer. */
    @Test
    @SuppressWarnings("unchecked") // the test deliberately works with raw RecordWriter types
    public void testSingleRecordWriterAvailability() throws Exception {
        final RecordWriter recordWriter = createRecordWriter(this.globalPool);
        final RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter);

        Assert.assertEquals(recordWriter, writerDelegate.getRecordWriter(0));
        verifyAvailability(writerDelegate);
    }

    /** Checks the availability reporting of a delegate that wraps several record writers. */
    @Test
    @SuppressWarnings("unchecked") // the test deliberately works with raw RecordWriter types
    public void testMultipleRecordWritersAvailability() throws Exception {
        final int numRecordWriters = 2;
        final ArrayList<RecordWriter> recordWriters = new ArrayList<>(numRecordWriters);
        for (int i = 0; i < numRecordWriters; i++) {
            recordWriters.add(createRecordWriter(this.globalPool));
        }

        final RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters);
        // The delegate must hand back the writers in the order in which they were supplied.
        for (int i = 0; i < numRecordWriters; i++) {
            Assert.assertEquals(recordWriters.get(i), writerDelegate.getRecordWriter(i));
        }
        verifyAvailability(writerDelegate);
    }

    /** Checks that a single-writer delegate delivers a broadcast event once per queue. */
    @Test
    @SuppressWarnings("unchecked") // generic array creation is not possible in Java
    public void testSingleRecordWriterBroadcastEvent() throws Exception {
        final ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] {
            new ArrayDeque<BufferConsumer>(), new ArrayDeque<BufferConsumer>()
        };
        final RecordWriter recordWriter = createRecordWriter(queues);
        final RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter);

        verifyBroadcastEvent(writerDelegate, queues, 1);
    }

    /**
     * Checks that a multi-writer delegate delivers a broadcast event through every wrapped
     * writer. All writers share the same queues, so each queue must end up with one event per
     * writer.
     */
    @Test
    @SuppressWarnings("unchecked") // generic array creation is not possible in Java
    public void testMultipleRecordWritersBroadcastEvent() throws Exception {
        final int numRecordWriters = 2;
        final ArrayList<RecordWriter> recordWriters = new ArrayList<>(numRecordWriters);
        final ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] {
            new ArrayDeque<BufferConsumer>(), new ArrayDeque<BufferConsumer>()
        };
        for (int i = 0; i < numRecordWriters; i++) {
            recordWriters.add(createRecordWriter(queues));
        }

        final RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters);
        verifyBroadcastEvent(writerDelegate, queues, numRecordWriters);
    }

    /**
     * Builds a record writer on top of a real {@link ResultPartition} whose buffer pool is a
     * local pool obtained from the given global pool.
     *
     * @param globalPool the global pool from which the local pool is created
     * @return a record writer targeting the newly set-up partition
     * @throws Exception if creating the local pool or setting up the partition fails
     */
    private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception {
        final BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
        final ResultPartition partition = new ResultPartitionBuilder()
            .setBufferPoolFactory(p -> localPool)
            .build();
        partition.setup();

        return new RecordWriterBuilder().build(partition);
    }

    /**
     * Builds a record writer on top of a {@code CollectingPartitionWriter} that is handed the
     * given queues, so that tests can inspect what was written.
     *
     * @param queues the queues passed to the collecting partition writer
     * @return a record writer targeting the collecting partition writer
     */
    private RecordWriter createRecordWriter(ArrayDeque<BufferConsumer>[] queues) {
        final RecordWriterTest.CollectingPartitionWriter partition =
            new RecordWriterTest.CollectingPartitionWriter(queues, new TestPooledBufferProvider(1));

        return new RecordWriterBuilder().build(partition);
    }

    /**
     * Drives the delegate through an available, unavailable, available cycle and asserts the
     * reported state at each step. Only the first record writer of the delegate is exercised.
     *
     * @param writerDelegate the delegate under test
     * @throws Exception if requesting, reading or recycling the buffer fails
     */
    private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception {
        // The delegate must report itself as available before anything was requested.
        Assert.assertTrue(writerDelegate.isAvailable());
        Assert.assertTrue(writerDelegate.getAvailableFuture().isDone());

        // Requesting a buffer builder for channel 0 is expected to make the delegate unavailable.
        final RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
        final BufferBuilder bufferBuilder = Preconditions.checkNotNull(recordWriter.getBufferBuilder(0));
        Assert.assertFalse(writerDelegate.isAvailable());
        // Keep this exact future: it must complete once the buffer has been recycled below.
        final CompletableFuture<?> future = writerDelegate.getAvailableFuture();
        Assert.assertFalse(future.isDone());

        // Finish the buffer, read it back through a subpartition view and recycle it.
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish();
        final ResultSubpartitionView readView = recordWriter.getTargetPartition()
            .getSubpartition(0)
            .createReadView(new NoOpBufferAvailablityListener());
        final Buffer buffer = readView.getNextBuffer().buffer();
        buffer.recycleBuffer();

        // After recycling, both the previously obtained future and a fresh one must be done.
        Assert.assertTrue(future.isDone());
        Assert.assertTrue(writerDelegate.isAvailable());
        Assert.assertTrue(writerDelegate.getAvailableFuture().isDone());
    }

    /**
     * Broadcasts a {@link CancelCheckpointMarker} through the delegate and asserts that every
     * queue received exactly {@code numRecordWriters} copies of that event.
     *
     * @param writerDelegate the delegate under test
     * @param queues the queues that collect what the wrapped writers emit
     * @param numRecordWriters the number of writers wrapped by the delegate
     * @throws Exception if broadcasting or parsing a buffer fails
     */
    private void verifyBroadcastEvent(RecordWriterDelegate writerDelegate, ArrayDeque<BufferConsumer>[] queues, int numRecordWriters) throws Exception {
        final CancelCheckpointMarker message = new CancelCheckpointMarker(1L);
        writerDelegate.broadcastEvent(message);

        for (int i = 0; i < queues.length; i++) {
            Assert.assertEquals(numRecordWriters, queues[i].size());

            // Drain the queue; every entry must be the event that was just broadcast.
            for (int j = 0; j < numRecordWriters; j++) {
                final BufferOrEvent boe = RecordWriterTest.parseBuffer(queues[i].remove(), i);
                Assert.assertTrue(boe.isEvent());
                Assert.assertEquals(message, boe.getEvent());
            }
        }
    }
}

