/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SplitAssignmentTracker;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link SplitAssignmentTracker}.
 *
 * <p>The tests cover recording split assignments, snapshotting and restoring the tracker state,
 * handling of completed checkpoints, and retrieving the splits of a subtask that have to be put
 * back.
 *
 * <p>Judging by the assertions in this class, {@code CoordinatorTestUtils.getSplitsAssignment(n,
 * startId)} assigns {@code i + 1} splits to subtask {@code i} (for {@code i} in {@code [0, n)}),
 * with consecutive split ids beginning at {@code startId}.
 */
public class SplitAssignmentTrackerTest {

    /**
     * Verifies that assignments recorded in separate calls are accumulated per subtask in the
     * uncheckpointed assignments.
     */
    @Test
    public void testRecordIncrementalSplitAssignment() {
        SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(3, 0));
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(2, 6));

        // Subtasks 0 and 1 received splits from both calls, subtask 2 only from the first one.
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("0", "6"), tracker.uncheckpointedAssignments().get(0));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("1", "2", "7", "8"), tracker.uncheckpointedAssignments().get(1));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("3", "4", "5"), tracker.uncheckpointedAssignments().get(2));
    }

    /**
     * Verifies that taking a snapshot leaves the uncheckpointed assignments empty and files the
     * previously recorded assignments under the snapshot's checkpoint id.
     */
    @Test
    public void testTakeSnapshot() throws Exception {
        final long checkpointId = 123L;
        SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(3, 0));

        // Only the side effects on the tracker matter here; the serialized bytes are ignored.
        takeSnapshot(tracker, checkpointId);

        // Nothing is left uncheckpointed after the snapshot.
        Assert.assertTrue(tracker.uncheckpointedAssignments().isEmpty());

        // Exactly one checkpoint is tracked, and it holds the assignment recorded above.
        SortedMap<Long, ? extends Map<Integer, ? extends Collection<MockSourceSplit>>>
                assignmentsByCheckpoints = tracker.assignmentsByCheckpointId();
        Assert.assertEquals(1, assignmentsByCheckpoints.size());

        Map<Integer, ? extends Collection<MockSourceSplit>> assignmentForCheckpoint =
                assignmentsByCheckpoints.get(checkpointId);
        Assert.assertNotNull(assignmentForCheckpoint);

        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("0"), assignmentForCheckpoint.get(0));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("1", "2"), assignmentForCheckpoint.get(1));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("3", "4", "5"), assignmentForCheckpoint.get(2));
    }

    /**
     * Verifies that a tracker restored from a snapshot exposes the same per-checkpoint and
     * uncheckpointed assignments as the tracker the snapshot was taken from.
     */
    @Test
    public void testRestore() throws Exception {
        final long checkpointId = 123L;
        SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(1, 0));

        // Round trip: serialize the state, then load it into a fresh tracker.
        byte[] bytes = takeSnapshot(tracker, checkpointId);
        SplitAssignmentTracker<MockSourceSplit> deserializedTracker = restoreSnapshot(bytes);

        // The original tracker is the expected value, the restored one the actual value.
        Assert.assertEquals(
                tracker.assignmentsByCheckpointId(),
                deserializedTracker.assignmentsByCheckpointId());
        Assert.assertEquals(
                tracker.uncheckpointedAssignments(),
                deserializedTracker.uncheckpointedAssignments());
    }

    /**
     * Verifies that completing a checkpoint drops the assignments tracked for that checkpoint
     * while the assignments of a later checkpoint stay available.
     */
    @Test
    public void testOnCheckpointComplete() throws Exception {
        final long checkpointId1 = 100L;
        final long checkpointId2 = 101L;
        SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();

        // Assign some splits and take the first snapshot.
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(2, 0));
        takeSnapshot(tracker, checkpointId1);
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("0"), tracker.assignmentsByCheckpointId(checkpointId1).get(0));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("1", "2"), tracker.assignmentsByCheckpointId(checkpointId1).get(1));

        // Assign more splits and take the second snapshot; the first one must be unaffected.
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(2, 3));
        takeSnapshot(tracker, checkpointId2);
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("0"), tracker.assignmentsByCheckpointId(checkpointId1).get(0));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("1", "2"), tracker.assignmentsByCheckpointId(checkpointId1).get(1));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));

        // Completing the first checkpoint removes its entry and keeps the second one.
        tracker.onCheckpointComplete(checkpointId1);
        Assert.assertNull(tracker.assignmentsByCheckpointId(checkpointId1));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));
    }

    /**
     * Verifies that, for a checkpoint id older than every snapshot taken, the splits assigned to
     * the subtask before both snapshots are all returned.
     */
    @Test
    public void testGetAndRemoveUncheckpointedAssignment() throws Exception {
        final long checkpointId1 = 100L;
        final long checkpointId2 = 101L;
        SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();

        // Assign some splits and take the first snapshot.
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(2, 0));
        takeSnapshot(tracker, checkpointId1);

        // Assign more splits and take the second snapshot.
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(2, 3));
        takeSnapshot(tracker, checkpointId2);

        // Query subtask 0 with an id just below the first snapshot: both assignments come back.
        List<MockSourceSplit> splitsToPutBack =
                tracker.getAndRemoveUncheckpointedAssignment(0, checkpointId1 - 1);
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("0", "3"), splitsToPutBack);
    }

    /**
     * Verifies that, for the id of the first snapshot, only the splits assigned to the subtask
     * after that snapshot are returned.
     */
    @Test
    public void testGetAndRemoveSplitsAfterSomeCheckpoint() throws Exception {
        final long checkpointId1 = 100L;
        final long checkpointId2 = 101L;
        SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();

        // Assign some splits and take the first snapshot.
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(2, 0));
        takeSnapshot(tracker, checkpointId1);

        // Assign more splits and take the second snapshot.
        tracker.recordSplitAssignment(CoordinatorTestUtils.getSplitsAssignment(2, 3));
        takeSnapshot(tracker, checkpointId2);

        // Query subtask 0 with the first snapshot's id: split "0" is excluded, "3" comes back.
        List<MockSourceSplit> splitsToPutBack =
                tracker.getAndRemoveUncheckpointedAssignment(0, checkpointId1);
        CoordinatorTestUtils.verifyAssignment(Collections.singletonList("3"), splitsToPutBack);
    }

    /**
     * Snapshots the tracker state for the given checkpoint into an in-memory buffer.
     *
     * @param tracker the tracker whose state is snapshotted
     * @param checkpointId the checkpoint id passed to {@code snapshotState}
     * @return the bytes written by the tracker
     * @throws Exception if snapshotting or closing the streams fails
     */
    private byte[] takeSnapshot(SplitAssignmentTracker<MockSourceSplit> tracker, long checkpointId)
            throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
            tracker.snapshotState(checkpointId, new MockSourceSplitSerializer(), out);
            // Flush the wrapper before reading the buffer so that no written data is missed.
            out.flush();
            return baos.toByteArray();
        }
    }

    /**
     * Creates a fresh tracker and restores its state from the given serialized snapshot.
     *
     * @param bytes the bytes of a snapshot, as returned by {@code takeSnapshot}
     * @return the tracker populated via {@code restoreState}
     * @throws Exception if restoring or closing the streams fails
     */
    private SplitAssignmentTracker<MockSourceSplit> restoreSnapshot(byte[] bytes)
            throws Exception {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais)) {
            SplitAssignmentTracker<MockSourceSplit> deserializedTracker =
                    new SplitAssignmentTracker<>();
            deserializedTracker.restoreState(new MockSourceSplitSerializer(), in);
            return deserializedTracker;
        }
    }
}

