/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.util.Arrays;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * A source function that emits a fixed collection of elements in two rounds.
 *
 * <p>In each round every element is handed to the source context while the
 * checkpoint lock is held; the source then blocks until two further
 * checkpoint-completion notifications have been counted, or until
 * {@link #cancel()} has been called.
 *
 * @param <T> type of the emitted elements
 */
public class FiniteTestSource<T>
implements SourceFunction<T>,
CheckpointListener {
    private static final long serialVersionUID = 1L;
    /** Elements emitted once per round, in iteration order. */
    private final Iterable<T> elements;
    /** Cleared by {@link #cancel()} so that a pending wait loop terminates. */
    private volatile boolean running = true;
    /** Count of completion notifications received so far; transient, so not serialized. */
    private transient int numCheckpointsComplete;

    /**
     * Creates a source over the given elements.
     *
     * @param elements the elements to emit in each round
     */
    @SafeVarargs
    public FiniteTestSource(T ... elements) {
        this(Arrays.asList(elements));
    }

    /**
     * Creates a source over the given iterable.
     *
     * @param elements the elements to emit in each round; iterated once per round
     */
    public FiniteTestSource(Iterable<T> elements) {
        this.elements = elements;
    }

    /**
     * Runs two identical rounds, each emitting all elements and then waiting
     * for two more completed checkpoints.
     *
     * @param ctx context used to emit elements and to obtain the checkpoint lock
     * @throws Exception if emitting or waiting fails
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        for (int round = 0; round < 2; round++) {
            emitElementsAndWaitForCheckpoints(ctx, 2);
        }
    }

    /**
     * Emits every element under the checkpoint lock, then waits until the
     * completed-checkpoint counter has advanced by {@code checkpointsToWaitFor}.
     *
     * @param ctx context used to emit elements and to obtain the checkpoint lock
     * @param checkpointsToWaitFor number of additional completions to wait for
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void emitElementsAndWaitForCheckpoints(SourceFunction.SourceContext<T> ctx, int checkpointsToWaitFor) throws InterruptedException {
        final Object checkpointLock = ctx.getCheckpointLock();

        final int targetCount;
        synchronized (checkpointLock) {
            // The target is fixed before emitting, in the same critical section as the emission.
            targetCount = numCheckpointsComplete + checkpointsToWaitFor;
            for (T element : elements) {
                ctx.collect(element);
            }
        }

        awaitCheckpointCount(checkpointLock, targetCount);
    }

    /**
     * Re-acquires the lock and polls, in 1 ms waits, until the counter reaches
     * {@code targetCount} or the source is no longer running.
     *
     * @param checkpointLock monitor to synchronize and wait on
     * @param targetCount counter value at which waiting stops
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void awaitCheckpointCount(Object checkpointLock, int targetCount) throws InterruptedException {
        synchronized (checkpointLock) {
            // Nothing in this class calls notify on the lock, so the timed wait acts as a poll.
            while (running && numCheckpointsComplete < targetCount) {
                checkpointLock.wait(1L);
            }
        }
    }

    /** Signals the source to stop waiting for checkpoints. */
    public void cancel() {
        running = false;
    }

    /**
     * Counts one more completed checkpoint.
     *
     * <p>NOTE(review): the increment is not synchronized here; presumably the
     * caller already holds the checkpoint lock — confirm against the runtime.
     *
     * @param checkpointId id of the completed checkpoint (unused)
     * @throws Exception declared by the signature; not thrown by this implementation
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        numCheckpointsComplete++;
    }

    /**
     * Ignores aborted checkpoints; they do not affect the counter.
     *
     * @param checkpointId id of the aborted checkpoint (unused)
     */
    public void notifyCheckpointAborted(long checkpointId) {
    }
}

