/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.eventtime;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.util.Preconditions;

@Internal
public class WatermarkOutputMultiplexer {
    private final WatermarkOutput underlyingOutput;
    private long combinedWatermark = Long.MIN_VALUE;
    private final Map<String, OutputState> watermarkPerOutputId;
    private final List<OutputState> watermarkOutputs;

    public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
        this.underlyingOutput = underlyingOutput;
        this.watermarkPerOutputId = new HashMap<String, OutputState>();
        this.watermarkOutputs = new ArrayList<OutputState>();
    }

    public void registerNewOutput(String id) {
        OutputState outputState = new OutputState();
        OutputState previouslyRegistered = this.watermarkPerOutputId.putIfAbsent(id, outputState);
        Preconditions.checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
        this.watermarkOutputs.add(outputState);
    }

    public boolean unregisterOutput(String id) {
        OutputState output = this.watermarkPerOutputId.remove(id);
        if (output != null) {
            this.watermarkOutputs.remove(output);
            return true;
        }
        return false;
    }

    public WatermarkOutput getImmediateOutput(String outputId) {
        OutputState outputState = this.watermarkPerOutputId.get(outputId);
        Preconditions.checkArgument(outputState != null, "no output registered under id %s", outputId);
        return new ImmediateOutput(outputState);
    }

    public WatermarkOutput getDeferredOutput(String outputId) {
        OutputState outputState = this.watermarkPerOutputId.get(outputId);
        Preconditions.checkArgument(outputState != null, "no output registered under id %s", outputId);
        return new DeferredOutput(outputState);
    }

    public void onPeriodicEmit() {
        this.updateCombinedWatermark();
    }

    private void updateCombinedWatermark() {
        long minimumOverAllOutputs = Long.MAX_VALUE;
        boolean hasOutputs = false;
        boolean allIdle = true;
        for (OutputState outputState : this.watermarkOutputs) {
            if (!outputState.isIdle()) {
                minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
                allIdle = false;
            }
            hasOutputs = true;
        }
        if (!hasOutputs) {
            return;
        }
        if (allIdle) {
            this.underlyingOutput.markIdle();
        } else if (minimumOverAllOutputs > this.combinedWatermark) {
            this.combinedWatermark = minimumOverAllOutputs;
            this.underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
        }
    }

    private static class DeferredOutput
    implements WatermarkOutput {
        private final OutputState state;

        public DeferredOutput(OutputState state) {
            this.state = state;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            this.state.setWatermark(watermark.getTimestamp());
        }

        @Override
        public void markIdle() {
            this.state.setIdle(true);
        }
    }

    private class ImmediateOutput
    implements WatermarkOutput {
        private final OutputState state;

        public ImmediateOutput(OutputState state) {
            this.state = state;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            long timestamp = watermark.getTimestamp();
            boolean wasUpdated = this.state.setWatermark(timestamp);
            if (wasUpdated && timestamp > WatermarkOutputMultiplexer.this.combinedWatermark) {
                WatermarkOutputMultiplexer.this.updateCombinedWatermark();
            }
        }

        @Override
        public void markIdle() {
            this.state.setIdle(true);
            WatermarkOutputMultiplexer.this.updateCombinedWatermark();
        }
    }

    private static class OutputState {
        private long watermark = Long.MIN_VALUE;
        private boolean idle = false;

        private OutputState() {
        }

        public long getWatermark() {
            Preconditions.checkState(!this.idle, "Output is idle.");
            return this.watermark;
        }

        public boolean setWatermark(long watermark) {
            this.idle = false;
            boolean updated = watermark > this.watermark;
            this.watermark = Math.max(watermark, this.watermark);
            return updated;
        }

        public boolean isIdle() {
            return this.idle;
        }

        public void setIdle(boolean idle) {
            this.idle = idle;
        }
    }
}

