/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.apache.flink.runtime.operators.coordination.OperatorEventValve;
import org.apache.flink.runtime.operators.coordination.TestEventSender;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link OperatorEventValve}.
 *
 * <p>Each test wires the valve to a {@link TestEventSender}, which records the events handed to
 * it, and then checks which events were forwarded and in which state the futures returned by
 * {@code sendEvent} ended up.
 */
public class OperatorEventValveTest {

    /** An event sent through a valve that was never shut is forwarded right away. */
    @Test
    public void eventsPassThroughOpenValve() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        final TestOperatorEvent event = new TestOperatorEvent();
        final CompletableFuture<?> future = valve.sendEvent(new SerializedValue<>(event), 11);

        Assert.assertThat(
                sender.events, Matchers.contains(new TestEventSender.EventWithSubtask(event, 11)));
        Assert.assertTrue(future.isDone());
    }

    /** Shutting a valve that was not marked for any checkpoint is rejected. */
    @Test
    public void errorShuttingUnmarkedValve() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        assertShutValveRejected(valve, 123L);
    }

    /** Shutting a valve for a checkpoint other than the marked one is rejected. */
    @Test
    public void errorShuttingValveForOtherMark() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        valve.markForCheckpoint(100L);

        assertShutValveRejected(valve, 123L);
    }

    /** While the valve is shut, a sent event is neither forwarded nor is its future completed. */
    @Test
    public void eventsBlockedByClosedValve() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        valve.markForCheckpoint(1L);
        valve.shutValve(1L);

        final CompletableFuture<?> future =
                valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);

        Assert.assertTrue(sender.events.isEmpty());
        Assert.assertFalse(future.isDone());
    }

    /** Events sent while the valve was shut are forwarded once the valve is opened again. */
    @Test
    public void eventsReleasedAfterOpeningValve() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        final TestOperatorEvent event1 = new TestOperatorEvent();
        final TestOperatorEvent event2 = new TestOperatorEvent();
        final CompletableFuture<?> future1 = valve.sendEvent(new SerializedValue<>(event1), 3);
        final CompletableFuture<?> future2 = valve.sendEvent(new SerializedValue<>(event2), 0);

        valve.openValveAndUnmarkCheckpoint();

        // Only the set of forwarded events is checked; their relative order is not.
        Assert.assertThat(
                sender.events,
                Matchers.containsInAnyOrder(
                        new TestEventSender.EventWithSubtask(event1, 3),
                        new TestEventSender.EventWithSubtask(event2, 0)));
        Assert.assertTrue(future1.isDone());
        Assert.assertTrue(future2.isDone());
    }

    /**
     * With a sender constructed from a failure cause, the future of an event that was held back
     * by the shut valve is completed exceptionally after the valve is opened.
     */
    @Test
    public void releasedEventsForwardSendFailures() throws Exception {
        final TestEventSender sender = new TestEventSender(new FlinkException("test"));
        final OperatorEventValve valve = new OperatorEventValve(sender);

        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        final CompletableFuture<?> future =
                valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10);
        valve.openValveAndUnmarkCheckpoint();

        Assert.assertTrue(future.isCompletedExceptionally());
    }

    /** After {@code reset()}, none of the events sent while shut reach the sender. */
    @Test
    public void resetDropsAllEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
        valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);

        valve.reset();
        valve.openValveAndUnmarkCheckpoint();

        Assert.assertTrue(sender.events.isEmpty());
    }

    /**
     * After {@code resetForTask(1)}, the event for subtask 1 is not forwarded and its future is
     * completed exceptionally, while the event for subtask 0 is still forwarded.
     */
    @Test
    public void resetForTaskDropsSelectiveEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        final TestOperatorEvent event1 = new TestOperatorEvent();
        final TestOperatorEvent event2 = new TestOperatorEvent();
        final CompletableFuture<?> future1 = valve.sendEvent(new SerializedValue<>(event1), 0);
        final CompletableFuture<?> future2 = valve.sendEvent(new SerializedValue<>(event2), 1);

        valve.resetForTask(1);
        valve.openValveAndUnmarkCheckpoint();

        Assert.assertThat(
                sender.events, Matchers.contains(new TestEventSender.EventWithSubtask(event1, 0)));
        Assert.assertTrue(future1.isDone());
        Assert.assertTrue(future2.isCompletedExceptionally());
    }

    /** A shut valve reports itself as not shut after {@code reset()}. */
    @Test
    public void resetOpensValve() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorEventValve valve = new OperatorEventValve(sender);

        valve.markForCheckpoint(17L);
        valve.shutValve(17L);

        valve.reset();

        Assert.assertFalse(valve.isShut());
    }

    /**
     * Asserts that {@code valve.shutValve(checkpointId)} throws an {@link IllegalStateException}.
     *
     * <p>The expectation is scoped to this single call so that an {@link IllegalStateException}
     * raised by any setup step of the calling test cannot make that test pass by accident.
     *
     * @param valve the valve to shut
     * @param checkpointId the checkpoint ID passed to {@code shutValve}
     * @throws Exception any exception other than the expected one, propagated to fail the test
     */
    private static void assertShutValveRejected(OperatorEventValve valve, long checkpointId)
            throws Exception {
        try {
            valve.shutValve(checkpointId);
        } catch (IllegalStateException expected) {
            return;
        }
        // Assert.fail raises an AssertionError, which the catch clause above does not intercept.
        Assert.fail("Expected shutValve(" + checkpointId + ") to throw IllegalStateException");
    }
}

