/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SourceCoordinatorProviderTest {
    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
    private static final int NUM_SPLITS = 10;
    private SourceCoordinatorProvider<MockSourceSplit> provider;

    @Before
    public void setup() {
        this.provider = new SourceCoordinatorProvider("SourceCoordinatorProviderTest", OPERATOR_ID, (Source)new MockSource(Boundedness.BOUNDED, 10), 1);
    }

    @Test
    public void testCreate() throws Exception {
        OperatorCoordinator coordinator = this.provider.create((OperatorCoordinator.Context)new MockOperatorCoordinatorContext(OPERATOR_ID, 10));
        Assert.assertTrue((boolean)(coordinator instanceof RecreateOnResetOperatorCoordinator));
    }

    @Test
    public void testCheckpointAndReset() throws Exception {
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 10);
        RecreateOnResetOperatorCoordinator coordinator = (RecreateOnResetOperatorCoordinator)this.provider.create((OperatorCoordinator.Context)context);
        SourceCoordinator sourceCoordinator = (SourceCoordinator)coordinator.getInternalCoordinator();
        coordinator.start();
        coordinator.handleEventFromOperator(0, (OperatorEvent)new ReaderRegistrationEvent(0, "location"));
        CompletableFuture future = new CompletableFuture();
        coordinator.checkpointCoordinator(0L, future);
        byte[] bytes = (byte[])future.get();
        coordinator.handleEventFromOperator(1, (OperatorEvent)new ReaderRegistrationEvent(1, "location"));
        while (sourceCoordinator.getContext().registeredReaders().size() < 2) {
            Thread.sleep(1L);
        }
        coordinator.resetToCheckpoint(0L, bytes);
        SourceCoordinator restoredSourceCoordinator = (SourceCoordinator)coordinator.getInternalCoordinator();
        Assert.assertNotEquals((String)"The restored source coordinator should be a different instance", (Object)restoredSourceCoordinator, (Object)sourceCoordinator);
        Assert.assertEquals((String)"There should only be one registered reader.", (long)1L, (long)restoredSourceCoordinator.getContext().registeredReaders().size());
        Assert.assertNotNull((String)"The only registered reader should be reader 0", restoredSourceCoordinator.getContext().registeredReaders().get(0));
    }

    @Test
    public void testCallAsyncExceptionFailsJob() throws Exception {
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, 10);
        RecreateOnResetOperatorCoordinator coordinator = (RecreateOnResetOperatorCoordinator)this.provider.create((OperatorCoordinator.Context)context);
        SourceCoordinator sourceCoordinator = (SourceCoordinator)coordinator.getInternalCoordinator();
        sourceCoordinator.getContext().callAsync(() -> null, (ignored, e) -> {
            throw new RuntimeException();
        });
        CommonTestUtils.waitUtil(context::isJobFailed, (Duration)Duration.ofSeconds(10L), (String)"The job did not fail before timeout.");
    }
}

