package org.apache.flink.streaming.util;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.class */
public class AbstractStreamOperatorTestHarnessTest extends TestLogger {

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest$SideOutputTypeInformationTestFunction.class */
    private static class SideOutputTypeInformationTestFunction extends ProcessFunction<Integer, Integer> {
        private final OutputTag<Integer> outputTag;

        SideOutputTypeInformationTestFunction(OutputTag<Integer> outputTag) {
            this.outputTag = outputTag;
        }

        public void processElement(Integer num, ProcessFunction<Integer, Integer>.Context context, Collector<Integer> collector) throws Exception {
            context.output(this.outputTag, num);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (ProcessFunction<Integer, Integer>.Context) context, (Collector<Integer>) collector);
        }
    }

    @Test
    public void testInitializeAfterOpenning() throws Throwable {
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage(CoreMatchers.containsString("TestHarness has already been initialized."));
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator) new AbstractStreamOperator<Integer>() { // from class: org.apache.flink.streaming.util.AbstractStreamOperatorTestHarnessTest.1
        }, 1, 1, 0);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.open();
        abstractStreamOperatorTestHarness.initializeState(OperatorSubtaskState.builder().build());
    }

    @Test
    public void testSetTtlTimeProvider() throws Exception {
        AbstractStreamOperator<Integer> abstractStreamOperator = new AbstractStreamOperator<Integer>() { // from class: org.apache.flink.streaming.util.AbstractStreamOperatorTestHarnessTest.2
        };
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator) abstractStreamOperator, 1, 1, 0);
        Throwable th = null;
        try {
            try {
                abstractStreamOperatorTestHarness.config.setStateKeySerializer(IntSerializer.INSTANCE);
                Time hours = Time.hours(1L);
                abstractStreamOperatorTestHarness.initializeState(OperatorSubtaskState.builder().build());
                abstractStreamOperatorTestHarness.open();
                ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("test", IntSerializer.INSTANCE);
                valueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(hours).build());
                KeyedStateBackend keyedStateBackend = abstractStreamOperator.getKeyedStateBackend();
                ValueState partitionedState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
                keyedStateBackend.setCurrentKey(1);
                abstractStreamOperatorTestHarness.setStateTtlProcessingTime(0L);
                partitionedState.update(42);
                Assert.assertEquals(42, ((Integer) partitionedState.value()).intValue());
                abstractStreamOperatorTestHarness.setStateTtlProcessingTime(hours.toMilliseconds() + 1);
                Assert.assertNull(partitionedState.value());
                if (abstractStreamOperatorTestHarness != null) {
                    if (0 == 0) {
                        abstractStreamOperatorTestHarness.close();
                        return;
                    }
                    try {
                        abstractStreamOperatorTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (abstractStreamOperatorTestHarness != null) {
                if (th != null) {
                    try {
                        abstractStreamOperatorTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    abstractStreamOperatorTestHarness.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSideOutputTypeInformation() throws Throwable {
        TypeSerializer typeSerializer = (TypeSerializer) Mockito.spy(TypeSerializer.class);
        TypeInformation typeInformation = (TypeInformation) Mockito.spy(Types.INT);
        Mockito.when(typeInformation.createSerializer((ExecutionConfig) ArgumentMatchers.any(ExecutionConfig.class))).thenReturn(typeSerializer);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new ProcessOperator(new SideOutputTypeInformationTestFunction(new OutputTag("test", typeInformation))));
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(12, 1000L);
        ((TypeSerializer) Mockito.verify(typeSerializer, Mockito.times(1))).copy(Integer.valueOf(ArgumentMatchers.eq(12)));
    }
}
