/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec;
import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
import org.apache.flink.table.planner.plan.logical.WindowSpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.WindowTableFunctionOperator;
import org.apache.flink.table.runtime.operators.window.assigners.CumulativeWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@JsonIgnoreProperties(ignoreUnknown=true)
public class StreamExecWindowTableFunction
extends ExecNodeBase<RowData>
implements StreamExecNode<RowData>,
SingleTransformationTranslator<RowData> {
    public static final String FIELD_NAME_WINDOWING = "windowing";
    public static final String FIELD_NAME_EMIT_PER_RECORD = "emitPerRecord";
    @JsonProperty(value="windowing")
    private final TimeAttributeWindowingStrategy windowingStrategy;
    @JsonProperty(value="emitPerRecord")
    private final Boolean emitPerRecord;

    public StreamExecWindowTableFunction(TimeAttributeWindowingStrategy windowingStrategy, Boolean emitPerRecord, InputProperty inputProperty, RowType outputType, String description) {
        this(windowingStrategy, emitPerRecord, StreamExecWindowTableFunction.getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
    }

    @JsonCreator
    public StreamExecWindowTableFunction(@JsonProperty(value="windowing") TimeAttributeWindowingStrategy windowingStrategy, @JsonProperty(value="emitPerRecord") Boolean emitPerRecord, @JsonProperty(value="id") int id, @JsonProperty(value="inputProperties") List<InputProperty> inputProperties, @JsonProperty(value="outputType") RowType outputType, @JsonProperty(value="description") String description) {
        super(id, inputProperties, outputType, description);
        Preconditions.checkArgument((inputProperties.size() == 1 ? 1 : 0) != 0);
        this.windowingStrategy = (TimeAttributeWindowingStrategy)Preconditions.checkNotNull((Object)windowingStrategy);
        this.emitPerRecord = (Boolean)Preconditions.checkNotNull((Object)emitPerRecord);
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        ExecEdge inputEdge = this.getInputEdges().get(0);
        RowType inputRowType = (RowType)inputEdge.getOutputType();
        String[] inputFieldNames = inputRowType.getFieldNames().toArray(new String[0]);
        String windowSummary = this.windowingStrategy.toSummaryString(inputFieldNames);
        if (!this.emitPerRecord.booleanValue()) {
            throw new TableException(String.format("Currently Flink doesn't support individual window table-valued function %s.\n Please use window table-valued function with the following computations:\n1. aggregate using window_start and window_end as group keys.\n2. topN using window_start and window_end as partition key.\n3. join with join condition contains window starts equality of input tables and window ends equality of input tables.\n", windowSummary));
        }
        if (!this.windowingStrategy.isRowtime()) {
            throw new TableException("Processing time Window TableFunction is not supported yet.");
        }
        Transformation<?> inputTransform = inputEdge.translateToPlan(planner);
        WindowSpec windowSpec = this.windowingStrategy.getWindow();
        WindowAssigner<TimeWindow> windowAssigner = this.createWindowAssigner(windowSpec);
        ZoneId shiftTimeZone = TimeWindowUtil.getShiftTimeZone(this.windowingStrategy.getTimeAttributeType(), planner.getTableConfig());
        WindowTableFunctionOperator windowTableFunctionOperator = new WindowTableFunctionOperator(windowAssigner, this.windowingStrategy.getTimeAttributeIndex(), shiftTimeZone);
        return new OneInputTransformation(inputTransform, this.getDescription(), (OneInputStreamOperator)windowTableFunctionOperator, InternalTypeInfo.of(this.getOutputType()), inputTransform.getParallelism());
    }

    private WindowAssigner<TimeWindow> createWindowAssigner(WindowSpec windowSpec) {
        if (windowSpec instanceof TumblingWindowSpec) {
            TumblingWindowSpec tumblingWindowSpec = (TumblingWindowSpec)windowSpec;
            TumblingWindowAssigner windowAssigner = TumblingWindowAssigner.of(tumblingWindowSpec.getSize());
            if (tumblingWindowSpec.getOffset() != null) {
                windowAssigner = windowAssigner.withOffset(tumblingWindowSpec.getOffset());
            }
            return windowAssigner;
        }
        if (windowSpec instanceof HoppingWindowSpec) {
            HoppingWindowSpec hoppingWindowSpec = (HoppingWindowSpec)windowSpec;
            SlidingWindowAssigner windowAssigner = SlidingWindowAssigner.of(hoppingWindowSpec.getSize(), hoppingWindowSpec.getSlide());
            if (hoppingWindowSpec.getOffset() != null) {
                windowAssigner = windowAssigner.withOffset(hoppingWindowSpec.getOffset());
            }
            return windowAssigner;
        }
        if (windowSpec instanceof CumulativeWindowSpec) {
            CumulativeWindowSpec cumulativeWindowSpec = (CumulativeWindowSpec)windowSpec;
            CumulativeWindowAssigner windowAssigner = CumulativeWindowAssigner.of(cumulativeWindowSpec.getMaxSize(), cumulativeWindowSpec.getStep());
            if (cumulativeWindowSpec.getOffset() != null) {
                windowAssigner = windowAssigner.withOffset(cumulativeWindowSpec.getOffset());
            }
            return windowAssigner;
        }
        throw new TableException(String.format("Unknown window spec: %s", windowSpec.getClass().getSimpleName()));
    }
}

