/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.OperatorEventValve;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.util.FlinkException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link OperatorEventValve}.
 *
 * <p>Every test constructs its own valve. Tests that send events additionally create an
 * {@link EventReceivingTasks} instance, build send actions from it, and afterwards inspect its
 * {@code events} field together with the future handed to {@code sendEvent}.
 *
 * <p>NOTE(review): {@code CompletableFuture} and {@code Matcher} are used as raw types because the
 * generic signatures of {@code OperatorEventValve.sendEvent} and {@code EventReceivingTasks.events}
 * are declared outside this file - tighten them once those signatures are confirmed.
 */
public class OperatorEventValveTest {

    /**
     * Sends one event through a freshly constructed valve and checks that the event shows up in
     * the sender's {@code events} for subtask 11 and that the result future is done.
     */
    @Test
    public void eventsPassThroughOpenValve() {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorEventValve openValve = new OperatorEventValve();
        final TestOperatorEvent sentEvent = new TestOperatorEvent();
        final CompletableFuture result = new CompletableFuture();

        openValve.sendEvent(tasks.createSendAction(sentEvent, 11), result);

        final EventReceivingTasks.EventWithSubtask[] expected = {
            new EventReceivingTasks.EventWithSubtask(sentEvent, 11)
        };
        Assert.assertThat(tasks.events, (Matcher) Matchers.contains((Object[]) expected));
        Assert.assertTrue(result.isDone());
    }

    /** Marks the valve for checkpoint 200 and expects shutting it for that same id to succeed. */
    @Test
    public void shuttingMarkedValve() {
        final OperatorEventValve markedValve = new OperatorEventValve();
        markedValve.markForCheckpoint(200L);

        Assert.assertTrue(markedValve.tryShutValve(200L));
    }

    /** Expects {@code tryShutValve} to return {@code false} when no checkpoint was marked. */
    @Test
    public void notShuttingUnmarkedValve() {
        final OperatorEventValve unmarkedValve = new OperatorEventValve();

        Assert.assertFalse(unmarkedValve.tryShutValve(123L));
    }

    /**
     * Marks the valve for checkpoint 100 and expects {@code tryShutValve} to return {@code false}
     * for the different id 123.
     */
    @Test
    public void notShuttingValveForOtherMark() {
        final OperatorEventValve markedValve = new OperatorEventValve();
        markedValve.markForCheckpoint(100L);

        Assert.assertFalse(markedValve.tryShutValve(123L));
    }

    /**
     * Sends an event after the valve was marked and shut for checkpoint 1 and checks that nothing
     * reached the sender and that the result future is not yet done.
     */
    @Test
    public void eventsBlockedByClosedValve() {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorEventValve shutValve = newValveShutFor(1L);
        final CompletableFuture result = new CompletableFuture();

        shutValve.sendEvent(tasks.createSendAction(new TestOperatorEvent(), 1), result);

        Assert.assertTrue(tasks.events.isEmpty());
        Assert.assertFalse(result.isDone());
    }

    /**
     * Sends two events while the valve is shut for checkpoint 17, reopens the valve, and checks
     * that both events arrived at the sender in sending order and that both futures are done.
     */
    @Test
    public void eventsReleasedAfterOpeningValve() {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorEventValve shutValve = newValveShutFor(17L);
        final TestOperatorEvent firstEvent = new TestOperatorEvent();
        final TestOperatorEvent secondEvent = new TestOperatorEvent();

        final CompletableFuture firstResult = new CompletableFuture();
        shutValve.sendEvent(tasks.createSendAction(firstEvent, 3), firstResult);
        final CompletableFuture secondResult = new CompletableFuture();
        shutValve.sendEvent(tasks.createSendAction(secondEvent, 0), secondResult);

        shutValve.openValveAndUnmarkCheckpoint();

        final EventReceivingTasks.EventWithSubtask[] expected = {
            new EventReceivingTasks.EventWithSubtask(firstEvent, 3),
            new EventReceivingTasks.EventWithSubtask(secondEvent, 0)
        };
        Assert.assertThat(tasks.events, (Matcher) Matchers.contains((Object[]) expected));
        Assert.assertTrue(firstResult.isDone());
        Assert.assertTrue(secondResult.isDone());
    }

    /**
     * Uses a sender created via {@code createForRunningTasksFailingRpcs}, sends an event while the
     * valve is shut for checkpoint 17, reopens the valve, and checks that the result future
     * completed exceptionally.
     */
    @Test
    public void releasedEventsForwardSendFailures() {
        final EventReceivingTasks failingTasks =
                EventReceivingTasks.createForRunningTasksFailingRpcs((Throwable) new FlinkException("test"));
        final OperatorEventValve shutValve = newValveShutFor(17L);
        final CompletableFuture result = new CompletableFuture();

        shutValve.sendEvent(failingTasks.createSendAction(new TestOperatorEvent(), 10), result);
        shutValve.openValveAndUnmarkCheckpoint();

        Assert.assertTrue(result.isCompletedExceptionally());
    }

    /**
     * Creates a new valve, marks it for the given checkpoint, and calls {@code tryShutValve} with
     * the same id. The boolean returned by {@code tryShutValve} is deliberately not checked here.
     *
     * @param checkpointId id passed to both {@code markForCheckpoint} and {@code tryShutValve}
     * @return the valve after both calls
     */
    private static OperatorEventValve newValveShutFor(long checkpointId) {
        final OperatorEventValve valve = new OperatorEventValve();
        valve.markForCheckpoint(checkpointId);
        valve.tryShutValve(checkpointId);
        return valve;
    }
}

