/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;

/**
 * {@link CheckpointBarrierBehaviourController} that, on the first barrier of a checkpoint,
 * initializes the checkpoint on the {@link SubtaskCheckpointCoordinator} and notifies every
 * registered {@link CheckpointableInput} that the checkpoint has started. The same inputs are
 * told that the checkpoint has stopped when {@link #postProcessLastBarrier} or
 * {@link #abortPendingCheckpoint} is invoked.
 *
 * <p>{@link #barrierReceived} and {@link #obsoleteBarrierReceived} are deliberate no-ops in this
 * controller.
 */
@Internal
public class UnalignedController implements CheckpointBarrierBehaviourController {

    /** Coordinator on which each new checkpoint is initialized. */
    private final SubtaskCheckpointCoordinator checkpointCoordinator;

    /** Inputs that are notified about checkpoint start and stop. */
    private final CheckpointableInput[] inputs;

    /**
     * Creates a controller for the given coordinator and inputs.
     *
     * @param checkpointCoordinator coordinator used to initialize checkpoints
     * @param inputs inputs to notify; the array is stored as passed, without copying
     */
    public UnalignedController(
            SubtaskCheckpointCoordinator checkpointCoordinator, CheckpointableInput... inputs) {
        this.checkpointCoordinator = checkpointCoordinator;
        this.inputs = inputs;
    }

    /** No-op: this controller takes no action for an individual received barrier. */
    @Override
    public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
        // intentionally empty
    }

    /**
     * Initializes the checkpoint identified by {@code barrier} on the coordinator and then tells
     * each input, in array order, that the checkpoint has started.
     *
     * @param channelInfo channel the barrier arrived on (not used here)
     * @param barrier first barrier of the checkpoint
     * @return always {@code true}; NOTE(review): the meaning of the flag is defined by the
     *     interface - confirm there
     * @throws IOException declared by the interface; may be propagated from the coordinator or
     *     the inputs
     * @throws CheckpointException declared by the interface; may be propagated from the
     *     coordinator or the inputs
     */
    @Override
    public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier)
            throws IOException, CheckpointException {
        final long checkpointId = barrier.getId();
        checkpointCoordinator.initCheckpoint(checkpointId, barrier.getCheckpointOptions());
        for (int i = 0; i < inputs.length; i++) {
            inputs[i].checkpointStarted(barrier);
        }
        return true;
    }

    /**
     * Tells each input that the checkpoint identified by {@code barrier} has stopped.
     *
     * @param channelInfo channel the barrier arrived on (not used here)
     * @param barrier last barrier of the checkpoint
     * @return always {@code false}; NOTE(review): the meaning of the flag is defined by the
     *     interface - confirm there
     */
    @Override
    public boolean postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
        final long checkpointId = barrier.getId();
        notifyInputsCheckpointStopped(checkpointId);
        return false;
    }

    /**
     * Tells each input that the checkpoint with the given id has stopped. The supplied exception
     * is not inspected by this controller.
     *
     * @param cancelledId id of the checkpoint being aborted
     * @param exception reason for the abort (not used here)
     */
    @Override
    public void abortPendingCheckpoint(long cancelledId, CheckpointException exception) {
        notifyInputsCheckpointStopped(cancelledId);
    }

    /** No-op: this controller takes no action for obsolete barriers. */
    @Override
    public void obsoleteBarrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) {
        // intentionally empty
    }

    /** Calls {@code checkpointStopped(checkpointId)} on every input, in array order. */
    private void notifyInputsCheckpointStopped(long checkpointId) {
        for (int i = 0; i < inputs.length; i++) {
            inputs[i].checkpointStopped(checkpointId);
        }
    }
}

