/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.wmassigners;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.apache.flink.util.Preconditions;

public class WatermarkAssignerOperator
extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData>,
ProcessingTimeCallback {
    private static final long serialVersionUID = 1L;
    private final int rowtimeFieldIndex;
    private final long idleTimeout;
    private final WatermarkGenerator watermarkGenerator;
    private transient long lastWatermark;
    private transient long watermarkInterval;
    private transient long currentWatermark;
    private transient long lastRecordTime;
    private transient StreamStatusMaintainer streamStatusMaintainer;
    private transient boolean functionsClosed = false;

    public WatermarkAssignerOperator(int rowtimeFieldIndex, WatermarkGenerator watermarkGenerator, long idleTimeout, ProcessingTimeService processingTimeService) {
        this.rowtimeFieldIndex = rowtimeFieldIndex;
        this.watermarkGenerator = watermarkGenerator;
        this.idleTimeout = idleTimeout;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
    }

    public void open() throws Exception {
        super.open();
        this.currentWatermark = 0L;
        this.watermarkInterval = this.getExecutionConfig().getAutoWatermarkInterval();
        this.lastRecordTime = this.getProcessingTimeService().getCurrentProcessingTime();
        this.streamStatusMaintainer = this.getContainingTask().getStreamStatusMaintainer();
        if (this.watermarkInterval > 0L) {
            long now = this.getProcessingTimeService().getCurrentProcessingTime();
            this.getProcessingTimeService().registerTimer(now + this.watermarkInterval, (ProcessingTimeCallback)this);
        }
        FunctionUtils.setFunctionRuntimeContext((Function)this.watermarkGenerator, (RuntimeContext)this.getRuntimeContext());
        FunctionUtils.openFunction((Function)this.watermarkGenerator, (Configuration)new Configuration());
    }

    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData row;
        if (this.idleTimeout > 0L) {
            this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            this.lastRecordTime = this.getProcessingTimeService().getCurrentProcessingTime();
        }
        if ((row = (RowData)element.getValue()).isNullAt(this.rowtimeFieldIndex)) {
            throw new RuntimeException("RowTime field should not be null, please convert it to a non-null long value.");
        }
        Long watermark = this.watermarkGenerator.currentWatermark(row);
        if (watermark != null) {
            this.currentWatermark = Math.max(this.currentWatermark, watermark);
        }
        this.output.collect(element);
        if (this.currentWatermark - this.lastWatermark > this.watermarkInterval) {
            this.advanceWatermark();
        }
    }

    private void advanceWatermark() {
        if (this.currentWatermark > this.lastWatermark) {
            this.lastWatermark = this.currentWatermark;
            this.output.emitWatermark(new Watermark(this.currentWatermark));
        }
    }

    public void onProcessingTime(long timestamp) throws Exception {
        long currentTime;
        this.advanceWatermark();
        if (this.idleTimeout > 0L && (currentTime = this.getProcessingTimeService().getCurrentProcessingTime()) - this.lastRecordTime > this.idleTimeout) {
            this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
        }
        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        this.getProcessingTimeService().registerTimer(now + this.watermarkInterval, (ProcessingTimeCallback)this);
    }

    public void processWatermark(Watermark mark) throws Exception {
        if (mark.getTimestamp() == Long.MAX_VALUE && this.currentWatermark != Long.MAX_VALUE) {
            if (this.idleTimeout > 0L) {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            }
            this.currentWatermark = Long.MAX_VALUE;
            this.output.emitWatermark(mark);
        }
    }

    public void close() throws Exception {
        this.processWatermark(Watermark.MAX_WATERMARK);
        this.functionsClosed = true;
        FunctionUtils.closeFunction((Function)this.watermarkGenerator);
    }

    public void dispose() throws Exception {
        super.dispose();
        if (!this.functionsClosed) {
            this.functionsClosed = true;
            FunctionUtils.closeFunction((Function)this.watermarkGenerator);
        }
    }
}

