/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.coordinator.SplitAssignmentTracker;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Assert;
import org.junit.Test;

public class SourceCoordinatorContextTest
extends SourceCoordinatorTestBase {
    @Test
    public void testRegisterReader() {
        List<ReaderInfo> readerInfo = this.registerReaders();
        Assert.assertTrue((boolean)this.context.registeredReaders().containsKey(0));
        Assert.assertTrue((boolean)this.context.registeredReaders().containsKey(1));
        Assert.assertEquals((Object)readerInfo.get(0), this.context.registeredReaders().get(0));
        Assert.assertEquals((Object)readerInfo.get(1), this.context.registeredReaders().get(1));
    }

    @Test
    public void testUnregisterReader() {
        List<ReaderInfo> readerInfo = this.registerReaders();
        Assert.assertEquals((Object)readerInfo.get(0), this.context.registeredReaders().get(0));
        this.context.unregisterSourceReader(0);
        Assert.assertEquals((String)"Only reader 2 should be registered.", (long)2L, (long)this.context.registeredReaders().size());
        Assert.assertNull(this.context.registeredReaders().get(0));
        Assert.assertEquals((Object)readerInfo.get(1), this.context.registeredReaders().get(1));
        Assert.assertEquals((Object)readerInfo.get(2), this.context.registeredReaders().get(2));
    }

    @Test
    public void testUnregisterUnregisteredReader() {
        this.context.unregisterSourceReader(0);
    }

    @Test
    public void testAssignSplitsFromCoordinatorExecutor() throws Exception {
        this.testAssignSplits(true);
    }

    @Test
    public void testAssignSplitsFromOtherThread() throws Exception {
        this.testAssignSplits(false);
    }

    private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception {
        this.registerReaders();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        if (fromCoordinatorExecutor) {
            this.coordinatorExecutor.submit(() -> this.context.assignSplits(splitsAssignment)).get();
        } else {
            this.context.assignSplits(splitsAssignment);
        }
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("0"), (Collection)this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("1", "2"), (Collection)this.splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));
        Assert.assertEquals((String)"There should be two events sent to the subtasks.", (long)2L, (long)this.operatorCoordinatorContext.getEventsToOperator().size());
        List<OperatorEvent> eventsToSubtask0 = this.operatorCoordinatorContext.getEventsToOperatorBySubtaskId(0);
        Assert.assertEquals((long)1L, (long)eventsToSubtask0.size());
        OperatorEvent event = eventsToSubtask0.get(0);
        Assert.assertTrue((boolean)(event instanceof AddSplitEvent));
        CoordinatorTestUtils.verifyAssignment(Arrays.asList("0"), ((AddSplitEvent)event).splits((SimpleVersionedSerializer)new MockSourceSplitSerializer()));
    }

    @Test
    public void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() {
        this.testAssignSplitToUnregisterdReader(true);
    }

    @Test
    public void testAssignSplitToUnregisteredReaderFromOtherThread() {
        this.testAssignSplitToUnregisterdReader(false);
    }

    private void testAssignSplitToUnregisterdReader(boolean fromCoordinatorExecutor) {
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        CoordinatorTestUtils.verifyException((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            if (fromCoordinatorExecutor) {
                this.coordinatorExecutor.submit(() -> this.context.assignSplits(splitsAssignment)).get();
            } else {
                this.context.assignSplits(splitsAssignment);
            }
        }), "assignSplits() should fail to assign the splits to a reader that is not registered.", "Cannot assign splits");
    }

    @Test
    public void testSnapshotAndRestore() throws Exception {
        SourceCoordinatorContext restoredContext;
        this.registerReaders();
        SplitsAssignment<MockSourceSplit> splitsAssignment = CoordinatorTestUtils.getSplitsAssignment(2, 0);
        this.coordinatorExecutor.submit(() -> this.context.assignSplits(splitsAssignment)).get();
        byte[] bytes = this.takeSnapshot((SourceCoordinatorContext<MockSourceSplit>)this.context, 100L);
        SplitAssignmentTracker restoredTracker = new SplitAssignmentTracker();
        SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory = new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString(), this.getClass().getClassLoader());
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
             DataInputStream in = new DataInputStream(bais);){
            restoredContext = new SourceCoordinatorContext(this.coordinatorExecutor, coordinatorThreadFactory, 1, (OperatorCoordinator.Context)this.operatorCoordinatorContext, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), restoredTracker);
            restoredContext.restoreState((SimpleVersionedSerializer)new MockSourceSplitSerializer(), in);
        }
        Assert.assertEquals((Object)this.context.registeredReaders(), (Object)restoredContext.registeredReaders());
        Assert.assertEquals((Object)this.splitSplitAssignmentTracker.uncheckpointedAssignments(), (Object)restoredTracker.uncheckpointedAssignments());
        Assert.assertEquals((Object)this.splitSplitAssignmentTracker.assignmentsByCheckpointId(), (Object)restoredTracker.assignmentsByCheckpointId());
    }

    private List<ReaderInfo> registerReaders() {
        ReaderInfo readerInfo0 = new ReaderInfo(0, "subtask_0_location");
        ReaderInfo readerInfo1 = new ReaderInfo(1, "subtask_1_location");
        ReaderInfo readerInfo2 = new ReaderInfo(2, "subtask_1_location");
        this.context.registerSourceReader(readerInfo0);
        this.context.registerSourceReader(readerInfo1);
        this.context.registerSourceReader(readerInfo2);
        return Arrays.asList(readerInfo0, readerInfo1, readerInfo2);
    }

    private byte[] takeSnapshot(SourceCoordinatorContext<MockSourceSplit> context, long checkpointId) throws Exception {
        byte[] bytes;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper((OutputStream)baos);){
            context.snapshotState(checkpointId, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), (DataOutputStream)out);
            out.flush();
            bytes = baos.toByteArray();
        }
        return bytes;
    }
}

