/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

public class CoordinatorEventsExactlyOnceITCase
extends TestLogger {
    /**
     * Key in the task configuration under which {@code buildJobVertex} stores the name of the
     * accumulator that the vertex's {@code EventCollectingTask} publishes its result to.
     */
    private static final ConfigOption<String> ACC_NAME = ConfigOptions.key((String)"acc").stringType().noDefaultValue();
    /** Accumulator name used by the first vertex ("TASK_1") of the test job. */
    private static final String OPERATOR_1_ACCUMULATOR = "op-acc-1";
    /** Accumulator name used by the second vertex ("TASK_2") of the test job. */
    private static final String OPERATOR_2_ACCUMULATOR = "op-acc-2";
    /** Cluster shared by the tests in this class; created in {@code startMiniCluster} and closed in {@code shutdownMiniCluster}. */
    private static MiniCluster miniCluster;

    /**
     * Starts the shared {@link MiniCluster} with two task managers of one slot each and the
     * REST bind port set to {@code "0"}.
     *
     * @throws Exception if the cluster cannot be started
     */
    @BeforeClass
    public static void startMiniCluster() throws Exception {
        final Configuration flinkConfig = new Configuration();
        flinkConfig.setString(RestOptions.BIND_PORT, "0");

        final MiniClusterConfiguration miniClusterConfig =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(2)
                        .setNumSlotsPerTaskManager(1)
                        .setConfiguration(flinkConfig)
                        .build();

        final MiniCluster cluster = new MiniCluster(miniClusterConfig);
        // Publish the instance before starting it, as the original did, so that the
        // class-level teardown can still close a cluster whose start() failed.
        miniCluster = cluster;
        cluster.start();
    }

    /**
     * Closes the shared {@link MiniCluster}, if one was created.
     *
     * <p>The null check matters because the class-level teardown also runs when
     * {@code startMiniCluster} failed before assigning the field; without it the teardown would
     * add a {@link NullPointerException} on top of the real startup failure.
     *
     * @throws Exception if closing the cluster fails
     */
    @AfterClass
    public static void shutdownMiniCluster() throws Exception {
        if (miniCluster != null) {
            miniCluster.close();
        }
    }

    /**
     * Runs a two-vertex job in which each vertex's coordinator sends a sequence of integer
     * events to its task, and verifies that each task's accumulator ends up holding exactly the
     * sequence {@code 0 .. numEvents - 1}.
     *
     * <p>NOTE(review): the test is marked {@code @Ignore}; the reason is not visible in this
     * file.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    @Ignore
    public void test() throws Exception {
        // Task 1 receives many events in quick succession, task 2 few events slowly.
        final int numEvents1 = 200;
        final int numEvents2 = 5;
        final int delay1 = 1;
        final int delay2 = 200;

        final JobVertex task1 = buildJobVertex("TASK_1", numEvents1, delay1, OPERATOR_1_ACCUMULATOR);
        final JobVertex task2 = buildJobVertex("TASK_2", numEvents2, delay2, OPERATOR_2_ACCUMULATOR);

        final JobGraph jobGraph = new JobGraph("Coordinator Events Job", new JobVertex[]{task1, task2});
        jobGraph.setSnapshotSettings(createCheckpointSettings(task1, task2));

        final JobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);

        final List<Integer> eventsOfTask2 = result.getAccumulatorResult(OPERATOR_2_ACCUMULATOR);
        checkListContainsSequence(eventsOfTask2, numEvents2);

        final List<Integer> eventsOfTask1 = result.getAccumulatorResult(OPERATOR_1_ACCUMULATOR);
        checkListContainsSequence(eventsOfTask1, numEvents1);
    }

    /**
     * Fails the test unless {@code ints} is exactly the sequence {@code 0, 1, ..., length - 1}.
     *
     * @param ints the list to verify
     * @param length the expected number of elements
     */
    private static void checkListContainsSequence(List<Integer> ints, int length) {
        if (ints.size() != length) {
            failList(ints, length);
        }

        int expected = 0;
        for (int actual : ints) {
            final boolean matches = actual == expected;
            expected++;
            if (!matches) {
                failList(ints, length);
            }
        }
    }

    /**
     * Fails the test with a message showing the expected sequence length and the actual list.
     *
     * @param ints the list that did not match
     * @param length the expected number of elements
     */
    private static void failList(List<Integer> ints, int length) {
        final String message =
                "List did not contain expected sequence of " + length + " elements, but was: " + ints;
        Assert.fail(message);
    }

    /**
     * Builds a parallelism-1 vertex that runs an {@code EventCollectingTask} and has an
     * {@code EventSendingCoordinator} attached under the operator ID derived from the vertex ID.
     *
     * @param name the vertex name
     * @param numEvents the number of events passed to the coordinator
     * @param delay the delay passed to the coordinator
     * @param accName the accumulator name, stored in the vertex configuration under {@code ACC_NAME}
     * @return the configured vertex
     * @throws IOException if the coordinator provider cannot be serialized
     */
    private static JobVertex buildJobVertex(String name, final int numEvents, final int delay, String accName) throws IOException {
        final JobVertex jobVertex = new JobVertex(name);
        jobVertex.setInvokableClass(EventCollectingTask.class);
        jobVertex.setParallelism(1);
        jobVertex.getConfiguration().setString(ACC_NAME, accName);

        // The task derives the same ID from its vertex ID, which ties task and coordinator together.
        final OperatorID coordinatedOperator = OperatorID.fromJobVertexID((JobVertexID)jobVertex.getID());

        final OperatorCoordinator.Provider coordinatorProvider = new OperatorCoordinator.Provider(){

            public OperatorID getOperatorId() {
                return coordinatedOperator;
            }

            public OperatorCoordinator create(OperatorCoordinator.Context context) {
                return new EventSendingCoordinator(context, numEvents, delay);
            }
        };

        jobVertex.addOperatorCoordinator(new SerializedValue((Object)coordinatorProvider));
        return jobVertex;
    }

    /**
     * Creates checkpointing settings in which every given vertex is passed for all three vertex
     * lists of {@link JobCheckpointingSettings}, with a checkpoint interval of 10, a timeout of
     * 100000 and at most one concurrent checkpoint.
     *
     * @param vertices the vertices taking part in checkpointing
     * @return the checkpointing settings for the job
     */
    private static JobCheckpointingSettings createCheckpointSettings(JobVertex ... vertices) {
        final List<JobVertexID> vertexIds =
                Arrays.stream(vertices).map(JobVertex::getID).collect(Collectors.toList());

        final CheckpointCoordinatorConfiguration checkpointConfig =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMaxConcurrentCheckpoints(1)
                        .setCheckpointInterval(10L)
                        .setCheckpointTimeout(100000L)
                        .build();

        return new JobCheckpointingSettings(vertexIds, vertexIds, vertexIds, checkpointConfig, null);
    }

    /**
     * Encodes an int as four bytes in little-endian order.
     *
     * @param value the value to encode
     * @return a new 4-byte array holding {@code value}, least significant byte first
     */
    static byte[] intToBytes(int value) {
        final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putInt(0, value);
        // A heap buffer from allocate() is backed by an array of exactly the requested size.
        return buffer.array();
    }

    /**
     * Decodes a little-endian int from a 4-byte array; the inverse of {@code intToBytes}.
     *
     * @param bytes the encoded value; asserted to have length 4
     * @return the decoded int
     */
    static int bytesToInt(byte[] bytes) {
        Assert.assertEquals((long)4L, (long)bytes.length);
        final ByteBuffer view = ByteBuffer.wrap(bytes);
        view.order(ByteOrder.LITTLE_ENDIAN);
        return view.getInt(0);
    }

    /**
     * Serializes the given list and wraps the bytes in a {@link ByteStreamStateHandle} named
     * {@code "state"}.
     *
     * @param state the list to snapshot
     * @return a state handle holding the serialized list
     * @throws IOException if serialization fails
     */
    static ByteStreamStateHandle stateToHandle(List<Integer> state) throws IOException {
        return new ByteStreamStateHandle("state", InstantiationUtil.serializeObject(state));
    }

    /**
     * Deserializes the list stored in a handle produced by {@code stateToHandle}.
     *
     * @param handle the state handle; must be a {@link ByteStreamStateHandle}
     * @return the deserialized list
     * @throws IOException if deserialization fails
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    static List<Integer> handleToState(StreamStateHandle handle) throws IOException, ClassNotFoundException {
        final byte[] serialized = ((ByteStreamStateHandle)handle).getData();
        final ClassLoader classLoader = EventCollectingTask.class.getClassLoader();
        return (List)InstantiationUtil.deserializeObject((byte[])serialized, (ClassLoader)classLoader);
    }

    /**
     * Wraps a stream state handle into a {@link TaskStateSnapshot} that carries it as the single
     * managed operator state entry of the given operator.
     *
     * @param handle the handle holding the state bytes
     * @param operatorId the operator the state belongs to
     * @return a snapshot containing only that operator's state
     */
    static TaskStateSnapshot createSnapshot(StreamStateHandle handle, OperatorID operatorId) {
        final long[] offsets = new long[]{0L};
        final OperatorStateHandle.StateMetaInfo stateMetaInfo =
                new OperatorStateHandle.StateMetaInfo(offsets, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);

        final OperatorStreamStateHandle operatorState = new OperatorStreamStateHandle(
                Collections.singletonMap("\u00e9tat_et_moi_:_\u00e7a_fait_deux", stateMetaInfo),
                handle);

        // Only the first (managed operator state) collection is populated.
        final OperatorSubtaskState subtaskState = new OperatorSubtaskState(
                StateObjectCollection.singleton((StateObject)operatorState),
                StateObjectCollection.empty(),
                StateObjectCollection.empty(),
                StateObjectCollection.empty());

        return new TaskStateSnapshot(Collections.singletonMap(operatorId, subtaskState));
    }

    /**
     * Reads back the stream state handle that {@code createSnapshot} stored for an operator.
     *
     * @param stateManager the task's state manager
     * @param operatorId the operator whose state is requested
     * @return the delegate stream handle of the operator's only managed operator state entry, or
     *     {@code null} if the operator was not restored from a checkpoint
     */
    @Nullable
    static StreamStateHandle readSnapshot(TaskStateManager stateManager, OperatorID operatorId) {
        final PrioritizedOperatorSubtaskState poss = stateManager.prioritizedOperatorState(operatorId);
        if (!poss.isRestored()) {
            return null;
        }

        // Reuse the state already looked up above instead of asking the state manager again.
        final StateObjectCollection<OperatorStateHandle> opState =
                poss.getPrioritizedManagedOperatorState().get(0);
        final OperatorStateHandle handle = Iterators.getOnlyElement(opState.iterator());
        return handle.getDelegateStateHandle();
    }

    /**
     * Task that collects the integers sent by its coordinator and, once an {@code EndEvent}
     * arrives, publishes them as a list accumulator.
     *
     * <p>Operator events and checkpoint triggers are handed over through the {@code actions}
     * queue and processed one at a time by the thread running {@link #invoke()}. The collected
     * integers are snapshotted on every checkpoint and restored on start.
     */
    public static final class EventCollectingTask extends AbstractInvokable {
        /** Operator ID derived from the vertex ID, matching the one the coordinator is registered under. */
        private final OperatorID operatorID;
        /** Name of the accumulator to publish the collected integers under. */
        private final String accumulatorName;
        /** Pending work for the invoke loop: operator events and {@link CheckpointMetaData} triggers. */
        private final LinkedBlockingQueue<Object> actions;
        /** Cleared by {@link #cancel()}; checked by the invoke loop before each queue take. */
        private volatile boolean running = true;

        public EventCollectingTask(Environment environment) {
            super(environment);
            this.operatorID = OperatorID.fromJobVertexID(environment.getJobVertexId());
            this.accumulatorName = environment.getTaskConfiguration().get(ACC_NAME);
            this.actions = new LinkedBlockingQueue<>();
        }

        /**
         * Restores previously collected integers, announces itself to the coordinator with a
         * {@code StartEvent}, then processes queued actions until an {@code EndEvent} arrives or
         * the task is cancelled. The accumulator is only published when the task was not
         * cancelled.
         *
         * @throws Exception if an unrecognized action is dequeued, or restoring, checkpointing
         *     or sending the start event fails
         */
        public void invoke() throws Exception {
            final List<Integer> collectedInts = new ArrayList<>();
            this.restoreState(collectedInts);

            this.getEnvironment().getOperatorCoordinatorEventGateway().sendOperatorEventToCoordinator(
                    this.operatorID, new SerializedValue<>(new StartEvent()));

            while (this.running) {
                final Object next = this.actions.take();
                if (next instanceof EndEvent) {
                    break;
                }
                if (next instanceof IntegerEvent) {
                    collectedInts.add(((IntegerEvent) next).value);
                } else if (next instanceof CheckpointMetaData) {
                    this.takeCheckpoint(((CheckpointMetaData) next).getCheckpointId(), collectedInts);
                } else {
                    throw new Exception("Unrecognized: " + next);
                }
            }

            if (this.running) {
                final ListAccumulator<Integer> acc = new ListAccumulator<>();
                collectedInts.forEach(acc::add);
                this.getEnvironment().getAccumulatorRegistry().getUserMap().put(this.accumulatorName, acc);
            }
        }

        /**
         * Marks the task as no longer running. This does not wake an invoke loop that is blocked
         * on the queue.
         */
        public void cancel() throws Exception {
            this.running = false;
        }

        /** Queues the checkpoint trigger for the invoke loop and reports it as accepted. */
        public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            this.actions.add(checkpointMetaData);
            return CompletableFuture.completedFuture(true);
        }

        /** No-op; returns an already completed future. */
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }

        /** No-op; returns an already completed future. */
        public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }

        /**
         * Deserializes an event sent by the coordinator and queues it for the invoke loop.
         *
         * @throws FlinkException if the event cannot be deserialized
         */
        public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
            try {
                final OperatorEvent opEvent = event.deserializeValue(this.getUserCodeClassLoader());
                this.actions.add(opEvent);
            }
            catch (IOException | ClassNotFoundException e) {
                throw new FlinkException(e);
            }
        }

        /** Snapshots the collected integers and acknowledges the checkpoint with that snapshot. */
        private void takeCheckpoint(long checkpointId, List<Integer> state) throws Exception {
            final ByteStreamStateHandle handle = stateToHandle(state);
            final TaskStateSnapshot snapshot = createSnapshot(handle, this.operatorID);
            this.getEnvironment().acknowledgeCheckpoint(checkpointId, new CheckpointMetrics(), snapshot);
        }

        /** Appends the integers from the restored snapshot, if there is one, to {@code target}. */
        private void restoreState(List<Integer> target) throws Exception {
            final StreamStateHandle stateHandle =
                    readSnapshot(this.getEnvironment().getTaskStateManager(), this.operatorID);
            if (stateHandle != null) {
                target.addAll(handleToState(stateHandle));
            }
        }
    }

    /**
     * Coordinator that, once its task has sent a {@code StartEvent}, periodically sends the
     * integers {@code 0 .. maxNumber - 1} followed by an {@code EndEvent} to subtask 0, and
     * fails the job once at a randomly chosen point in the sequence.
     *
     * <p>The counters ({@code nextNumber}, {@code nextToComplete}, {@code failedBefore}) are
     * only touched by tasks running on the single-threaded {@code executor}. The fields written
     * by the coordinator callbacks ({@code periodicTask}, {@code requestedCheckpoint}) are
     * volatile.
     */
    private static final class EventSendingCoordinator implements OperatorCoordinator, Runnable {
        private final OperatorCoordinator.Context context;
        /** Single-threaded executor running the periodic send loop and all counter updates. */
        private final ScheduledExecutorService executor;
        /** Handle of the periodic send loop; {@code null} while no loop is scheduled. */
        private volatile ScheduledFuture<?> periodicTask;
        /** Initial delay and delay between loop iterations, in milliseconds. */
        private final int delay;
        /** Number of integer events to send before the end event. */
        private final int maxNumber;
        /** Next value to send; equals {@code maxNumber} when the end event is due. */
        private int nextNumber;
        /** Checkpoint future handed in by {@code checkpointCoordinator}, not yet seen by the loop. */
        private volatile CompletableFuture<byte[]> requestedCheckpoint;
        /** Checkpoint future picked up in the previous loop iteration, completed in the next one. */
        private CompletableFuture<byte[]> nextToComplete;
        /** Value of {@code nextNumber} at or after which the one-time job failure is triggered. */
        private final int failAtMessage;
        private boolean failedBefore;

        private EventSendingCoordinator(OperatorCoordinator.Context context, int numEvents, int delay) {
            Preconditions.checkArgument(delay > 0);
            // At least 3 events are needed so that numEvents / 3 is a valid bound for nextInt.
            Preconditions.checkArgument(numEvents >= 3);
            this.context = context;
            this.maxNumber = numEvents;
            this.delay = delay;
            this.executor = Executors.newSingleThreadScheduledExecutor();
            // Fail somewhere in the middle third of the sequence.
            this.failAtMessage = numEvents / 3 + new Random().nextInt(numEvents / 3);
        }

        public void start() throws Exception {
        }

        /** Stops the executor and waits up to 10 seconds for it to terminate. */
        public void close() throws Exception {
            this.executor.shutdownNow();
            this.executor.awaitTermination(10L, TimeUnit.SECONDS);
        }

        /**
         * Starts the periodic send loop when subtask 0 reports a {@code StartEvent}.
         *
         * @throws Exception if the event or subtask is unexpected, or a loop is already scheduled
         */
        public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
            if (subtask != 0 || !(event instanceof StartEvent)) {
                throw new Exception(String.format("Don't recognize event '%s' from task %d.", event, subtask));
            }
            if (this.periodicTask != null) {
                throw new Exception("periodic already running");
            }
            this.periodicTask = this.executor.scheduleWithFixedDelay(this, this.delay, this.delay, TimeUnit.MILLISECONDS);
        }

        /**
         * Stops the periodic send loop, if one is scheduled, and resets the sequence to 0.
         *
         * <p>The loop may not be scheduled yet when the subtask fails before its start event
         * arrived, or may already have been cancelled by an earlier failure notification, so
         * the handle is checked for {@code null} first.
         */
        public void subtaskFailed(int subtask, @Nullable Throwable reason) {
            final ScheduledFuture<?> task = this.periodicTask;
            if (task != null) {
                task.cancel(false);
                this.periodicTask = null;
            }
            this.executor.execute(() -> {
                this.nextNumber = 0;
            });
        }

        public void subtaskReset(int subtask, long checkpointId) {
        }

        /**
         * Resets the sequence position to the one stored in the checkpoint, or to 0 when there
         * is no checkpoint data.
         *
         * <p>The data is decoded on the calling thread so that a malformed payload is reported
         * to the caller instead of failing unnoticed inside the executor.
         */
        public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
            final int restoredNumber = checkpointData == null ? 0 : bytesToInt(checkpointData);
            this.executor.execute(() -> {
                this.nextNumber = restoredNumber;
            });
        }

        /** Stores the future; the send loop picks it up and completes it later. */
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
            this.requestedCheckpoint = result;
        }

        public void notifyCheckpointComplete(long checkpointId) {
        }

        /**
         * One iteration of the send loop: progress pending checkpoints, send the next event,
         * and possibly trigger the one-time failure.
         */
        @Override
        public void run() {
            try {
                this.handleCheckpoint();
                this.sendNextEvent();
                this.checkWhetherToTriggerFailure();
            }
            catch (Throwable t) {
                // NOTE(review): the hard exit looks deliberate, so that an error on the executor
                // thread is not silently dropped by the scheduled future; confirm before changing.
                t.printStackTrace();
                System.exit(-1);
            }
        }

        /**
         * Completes the checkpoint picked up in the previous iteration with the current
         * sequence position, then picks up a newly requested checkpoint. A checkpoint is thus
         * completed one iteration after it was picked up.
         */
        private void handleCheckpoint() {
            if (this.nextToComplete != null) {
                final int numToCheckpoint = Math.min(this.nextNumber, this.maxNumber);
                this.nextToComplete.complete(intToBytes(numToCheckpoint));
                this.nextToComplete = null;
            }
            if (this.requestedCheckpoint != null) {
                this.nextToComplete = this.requestedCheckpoint;
                this.requestedCheckpoint = null;
            }
        }

        /**
         * Sends the next integer event, or the end event once all integers are sent. Does
         * nothing after the end event has gone out.
         */
        private void sendNextEvent() {
            if (this.nextNumber > this.maxNumber) {
                return;
            }
            try {
                if (this.nextNumber == this.maxNumber) {
                    this.context.sendEvent(new EndEvent(), 0);
                } else {
                    this.context.sendEvent(new IntegerEvent(this.nextNumber), 0);
                }
                ++this.nextNumber;
            }
            catch (TaskNotRunningException ignored) {
                // Deliberately ignored: nextNumber was not advanced, so the same event is
                // attempted again on the next iteration.
            }
        }

        /** Fails the job once, the first time the sequence position reaches {@code failAtMessage}. */
        private void checkWhetherToTriggerFailure() {
            if (this.nextNumber >= this.failAtMessage && !this.failedBefore) {
                this.failedBefore = true;
                this.context.failJob(new Exception("test failure"));
            }
        }
    }

    /** Event carrying one integer of the sequence sent from the coordinator to the task. */
    private static final class IntegerEvent implements OperatorEvent {
        /** The sequence number carried by this event. */
        final int value;

        IntegerEvent(int value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "IntegerEvent " + value;
        }
    }

    /**
     * Marker event sent by the coordinator after the last integer; the collecting task leaves
     * its processing loop when it dequeues one.
     */
    private static final class EndEvent
    implements OperatorEvent {
        private EndEvent() {
        }
    }

    /**
     * Marker event sent by the collecting task to its coordinator once state is restored; the
     * coordinator starts its periodic send loop when it receives one.
     */
    private static final class StartEvent
    implements OperatorEvent {
        private StartEvent() {
        }
    }
}

