/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests that push generated records through a {@link UnilateralSortMerger} and check
 * that the records come back in non-decreasing key order and that none are lost.
 *
 * <p>Each test builds its own sorter on top of the {@link MemoryManager} and {@link IOManager}
 * created in {@link #beforeTest()}. After a successful test, {@link #afterTest()} additionally
 * verifies that all memory has been handed back to the memory manager.
 */
public class ExternalSortITCase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalSortITCase.class);

    /** Seed for the tuple generator, fixed so that every run sees the same input. */
    private static final long SEED = 649180756312423613L;

    /** Upper bound passed to the tuple generator for the generated keys. */
    private static final int KEY_MAX = Integer.MAX_VALUE;

    /** Value length passed to the tuple generator; equals the length of {@link #VAL}. */
    private static final int VALUE_LENGTH = 114;

    /** Constant value used by the tests that run the generator in {@code CONSTANT} value mode. */
    private static final String VAL = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    /** Number of records used by the three smaller tuple tests. */
    private static final int NUM_PAIRS = 200000;

    /** Total size of the memory manager in bytes: 78 MiB (0x4E00000). */
    private static final int MEMORY_SIZE = 78 * 1024 * 1024;

    /** Fraction handed to the sorter that corresponds to 64 MiB of the 78 MiB total. */
    private static final double LARGE_MEMORY_FRACTION = 64.0 / 78;

    /** Fraction handed to the sorter that corresponds to 16 MiB of the 78 MiB total. */
    private static final double SMALL_MEMORY_FRACTION = 16.0 / 78;

    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;
    private TypeSerializerFactory<Tuple2<Integer, String>> pactRecordSerializer;
    private TypeComparator<Tuple2<Integer, String>> pactRecordComparator;

    /** Set at the very end of a test; gates the memory-leak check in {@link #afterTest()}. */
    private boolean testSuccess;

    /** Creates a fresh memory manager, IO manager, serializer factory and comparator. */
    @Before
    public void beforeTest() {
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(MEMORY_SIZE).build();
        this.ioManager = new IOManagerAsync();
        this.pactRecordSerializer = TestData.getIntStringTupleSerializerFactory();
        this.pactRecordComparator = TestData.getIntStringTupleComparator();
    }

    /**
     * Releases the IO manager and the memory manager.
     *
     * <p>The memory manager is shut down regardless of the test outcome and even if closing the
     * IO manager fails. The leak check only runs after a successful test, because a failed test
     * may legitimately leave memory allocated.
     *
     * @throws Exception if closing the IO manager fails
     */
    @After
    public void afterTest() throws Exception {
        try {
            // May still be null if beforeTest() failed before creating it.
            if (this.ioManager != null) {
                this.ioManager.close();
            }
        } finally {
            if (this.memoryManager != null) {
                try {
                    if (this.testSuccess) {
                        Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", this.memoryManager.verifyEmpty());
                    }
                } finally {
                    this.memoryManager.shutdown();
                    this.memoryManager = null;
                }
            }
        }
    }

    /**
     * Sorts {@link #NUM_PAIRS} constant-value tuples using the large memory fraction.
     * Judging by the test name, this amount of data is expected to be sorted without spilling.
     *
     * @throws Exception if the sorter fails
     */
    @Test
    public void testInMemorySort() throws Exception {
        TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.CONSTANT, VAL);
        TestData.TupleGeneratorIterator source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);

        LOG.debug("Initializing sortmerger...");
        UnilateralSortMerger<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, LARGE_MEMORY_FRACTION, 2, 0.9f, true, true);
        try {
            LOG.debug("Reading and sorting data...");
            MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
            LOG.debug("Checking results...");
            assertSortedTuples(iterator, NUM_PAIRS);
        } finally {
            // Close even when an assertion fails so the sorter does not outlive the test.
            merger.close();
        }
        this.testSuccess = true;
    }

    /**
     * Same data as {@link #testInMemorySort()}, but uses the sorter constructor that takes an
     * additional integer argument (10 here; per the test name, the number of sort buffers).
     *
     * @throws Exception if the sorter fails
     */
    @Test
    public void testInMemorySortUsing10Buffers() throws Exception {
        TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.CONSTANT, VAL);
        TestData.TupleGeneratorIterator source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);

        LOG.debug("Initializing sortmerger...");
        UnilateralSortMerger<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, LARGE_MEMORY_FRACTION, 10, 2, 0.9f, true, false);
        try {
            LOG.debug("Reading and sorting data...");
            MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
            LOG.debug("Checking results...");
            assertSortedTuples(iterator, NUM_PAIRS);
        } finally {
            merger.close();
        }
        this.testSuccess = true;
    }

    /**
     * Sorts {@link #NUM_PAIRS} constant-value tuples using only the small memory fraction.
     * Judging by the test name, the reduced memory is expected to force spilling.
     *
     * @throws Exception if the sorter fails
     */
    @Test
    public void testSpillingSort() throws Exception {
        TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.CONSTANT, VAL);
        TestData.TupleGeneratorIterator source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);

        LOG.debug("Initializing sortmerger...");
        UnilateralSortMerger<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, SMALL_MEMORY_FRACTION, 64, 0.7f, true, true);
        try {
            LOG.debug("Reading and sorting data...");
            MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
            LOG.debug("Checking results...");
            assertSortedTuples(iterator, NUM_PAIRS);
        } finally {
            merger.close();
        }
        this.testSuccess = true;
    }

    /**
     * Sorts ten million fixed-length-value tuples with the large memory fraction.
     * Judging by the test name, this volume is expected to require intermediate merges.
     *
     * @throws Exception if the sorter fails
     */
    @Test
    public void testSpillingSortWithIntermediateMerge() throws Exception {
        final int pairs = 10000000;

        TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
        TestData.TupleGeneratorIterator source = new TestData.TupleGeneratorIterator(generator, pairs);

        LOG.debug("Initializing sortmerger...");
        UnilateralSortMerger<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, LARGE_MEMORY_FRACTION, 16, 0.7f, true, false);
        try {
            LOG.debug("Emitting data...");
            MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
            LOG.debug("Checking results...");
            assertSortedTuples(iterator, pairs);
        } finally {
            merger.close();
        }
        this.testSuccess = true;
    }

    /**
     * Sorts fifty million {@link IntPair} records with the large memory fraction and checks that
     * the keys come back in non-decreasing order.
     *
     * @throws Exception if the sorter fails
     */
    @Test
    public void testSpillingSortWithIntermediateMergeIntPair() throws Exception {
        final int pairs = 50000000;

        RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678L, pairs);
        TypeSerializerFactory<IntPair> serializerFactory = new IntPairSerializer.IntPairSerializerFactory();
        TypeComparator<IntPair> comparator = new TestData.IntPairComparator();

        LOG.debug("Initializing sortmerger...");
        UnilateralSortMerger<IntPair> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, generator, this.parentTask, serializerFactory, comparator, LARGE_MEMORY_FRACTION, 4, 0.7f, true, true);
        try {
            LOG.debug("Emitting data...");
            MutableObjectIterator<IntPair> iterator = merger.getIterator();
            LOG.debug("Checking results...");

            IntPair previous = iterator.next(new IntPair());
            Assert.assertNotNull("The sorter returned no records.", previous);

            IntPair current = new IntPair();
            int pairsRead = 1;
            while ((current = iterator.next(current)) != null) {
                // Compare directly: "k1 - k2 <= 0" overflows for keys of opposite sign and
                // could let an out-of-order pair slip through.
                Assert.assertTrue("Records are not sorted by key.", previous.getKey() <= current.getKey());

                // Swap the two instances so the next read is handed the older object and the
                // record needed for the next comparison is left untouched.
                IntPair spare = previous;
                previous = current;
                current = spare;
                pairsRead++;
            }
            Assert.assertEquals("Not all pairs were read back in.", pairs, pairsRead);
        } finally {
            merger.close();
        }
        this.testSuccess = true;
    }

    /**
     * Drains {@code iterator} and asserts that it yields exactly {@code expectedCount} tuples
     * whose integer keys are in non-decreasing order.
     *
     * @param iterator the sorted output to check
     * @param expectedCount the number of tuples the iterator must produce
     * @throws Exception if reading from the iterator fails
     */
    private static void assertSortedTuples(MutableObjectIterator<Tuple2<Integer, String>> iterator, int expectedCount) throws Exception {
        IntComparator keyComparator = new IntComparator(true);

        Tuple2<Integer, String> previous = iterator.next(new Tuple2<Integer, String>());
        Assert.assertNotNull("The sorter returned no records.", previous);

        Tuple2<Integer, String> current = new Tuple2<Integer, String>();
        int pairsRead = 1;
        while ((current = iterator.next(current)) != null) {
            Assert.assertTrue("Records are not sorted by key.", keyComparator.compare(previous.f0, current.f0) <= 0);

            // Swap the two instances so the next read is handed the older object and the
            // record needed for the next comparison is left untouched.
            Tuple2<Integer, String> spare = previous;
            previous = current;
            current = spare;
            pairsRead++;
        }
        Assert.assertEquals("Not all pairs were read back in.", expectedCount, pairsRead);
    }
}

