/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.serde;

import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Module;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.DurationJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.DurationJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.ObjectIdentifierJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.ObjectIdentifierJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.RexLiteralJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.SerdeContext;
import org.apache.flink.table.planner.plan.nodes.exec.serde.ShuffleJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.ShuffleJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitorImpl;
import org.apache.flink.table.planner.plan.utils.ReflectionsUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;

public class ExecNodeGraphJsonPlanGenerator {
    public static String generateJsonPlan(ExecNodeGraph execGraph, SerdeContext serdeCtx) throws IOException {
        ExecNodeGraphJsonPlanGenerator.validate(execGraph);
        ObjectMapper mapper = JsonSerdeUtil.createObjectMapper(serdeCtx);
        SimpleModule module = new SimpleModule();
        ExecNodeGraphJsonPlanGenerator.registerSerializers(module);
        mapper.registerModule((Module)module);
        StringWriter writer = new StringWriter(1024);
        try (JsonGenerator gen = mapper.getFactory().createGenerator((Writer)writer);){
            JsonPlanGraph jsonPlanGraph = JsonPlanGraph.fromExecNodeGraph(execGraph);
            gen.writeObject((Object)jsonPlanGraph);
        }
        return writer.toString();
    }

    public static ExecNodeGraph generateExecNodeGraph(String jsonPlan, SerdeContext serdeCtx) throws IOException {
        ObjectMapper mapper = JsonSerdeUtil.createObjectMapper(serdeCtx);
        SimpleModule module = new SimpleModule();
        Set<Class<ExecNode>> nodeClasses = ReflectionsUtil.scanSubClasses("org.apache.flink.table.planner.plan.nodes.exec", ExecNode.class);
        nodeClasses.forEach(c -> module.registerSubtypes(new NamedType[]{new NamedType(c, c.getName())}));
        ExecNodeGraphJsonPlanGenerator.registerDeserializers(module);
        mapper.registerModule((Module)module);
        JsonPlanGraph jsonPlanGraph = (JsonPlanGraph)mapper.readValue(jsonPlan, JsonPlanGraph.class);
        return jsonPlanGraph.convertToExecNodeGraph(serdeCtx);
    }

    private static void registerSerializers(SimpleModule module) {
        module.addSerializer((JsonSerializer)new ObjectIdentifierJsonSerializer());
        module.addSerializer((JsonSerializer)new LogicalTypeJsonSerializer());
        module.addSerializer((JsonSerializer)new RelDataTypeJsonSerializer());
        module.addSerializer((JsonSerializer)new RexNodeJsonSerializer());
        module.addSerializer((JsonSerializer)new AggregateCallJsonSerializer());
        module.addSerializer((JsonSerializer)new DurationJsonSerializer());
    }

    private static void registerDeserializers(SimpleModule module) {
        module.addDeserializer(ObjectIdentifier.class, (JsonDeserializer)new ObjectIdentifierJsonDeserializer());
        module.addDeserializer(LogicalType.class, (JsonDeserializer)new LogicalTypeJsonDeserializer());
        module.addDeserializer(RelDataType.class, (JsonDeserializer)new RelDataTypeJsonDeserializer());
        module.addDeserializer(RexNode.class, (JsonDeserializer)new RexNodeJsonDeserializer());
        module.addDeserializer(RexLiteral.class, (JsonDeserializer)new RexLiteralJsonDeserializer());
        module.addDeserializer(AggregateCall.class, (JsonDeserializer)new AggregateCallJsonDeserializer());
        module.addDeserializer(Duration.class, (JsonDeserializer)new DurationJsonDeserializer());
    }

    private static void validate(ExecNodeGraph execGraph) {
        AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor(){

            @Override
            protected void visitNode(ExecNode<?> node) {
                StreamExecLookupJoin streamExecLookupJoin;
                if (!JsonSerdeUtil.hasJsonCreatorAnnotation(node.getClass())) {
                    throw new TableException(String.format("%s does not implement @JsonCreator annotation on constructor.", node.getClass().getCanonicalName()));
                }
                if (node instanceof StreamExecLookupJoin && null == (streamExecLookupJoin = (StreamExecLookupJoin)node).getTemporalTableSourceSpec().getTableSourceSpec()) {
                    throw new TableException("TemporalTableSourceSpec can not be serialized.");
                }
                super.visitInputs(node);
            }
        };
        execGraph.getRootNodes().forEach(visitor::visit);
    }

    public static class JsonPlanEdge {
        public static final String FIELD_NAME_SOURCE = "source";
        public static final String FIELD_NAME_TARGET = "target";
        public static final String FIELD_NAME_SHUFFLE = "shuffle";
        public static final String FIELD_NAME_SHUFFLE_MODE = "shuffleMode";
        @JsonProperty(value="source")
        private final int sourceId;
        @JsonProperty(value="target")
        private final int targetId;
        @JsonProperty(value="shuffle")
        @JsonSerialize(using=ShuffleJsonSerializer.class)
        @JsonDeserialize(using=ShuffleJsonDeserializer.class)
        private final ExecEdge.Shuffle shuffle;
        @JsonProperty(value="shuffleMode")
        private final StreamExchangeMode exchangeMode;

        @JsonCreator
        public JsonPlanEdge(@JsonProperty(value="source") int sourceId, @JsonProperty(value="target") int targetId, @JsonProperty(value="shuffle") ExecEdge.Shuffle shuffle, @JsonProperty(value="shuffleMode") StreamExchangeMode exchangeMode) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.shuffle = shuffle;
            this.exchangeMode = exchangeMode;
        }

        @JsonIgnore
        public int getSourceId() {
            return this.sourceId;
        }

        @JsonIgnore
        public int getTargetId() {
            return this.targetId;
        }

        @JsonIgnore
        public ExecEdge.Shuffle getShuffle() {
            return this.shuffle;
        }

        @JsonIgnore
        public StreamExchangeMode getExchangeMode() {
            return this.exchangeMode;
        }

        public static JsonPlanEdge fromExecEdge(ExecEdge execEdge) {
            return new JsonPlanEdge(execEdge.getSource().getId(), execEdge.getTarget().getId(), execEdge.getShuffle(), execEdge.getExchangeMode());
        }
    }

    public static class JsonPlanGraph {
        public static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
        public static final String FIELD_NAME_NODES = "nodes";
        public static final String FIELD_NAME_EDGES = "edges";
        @JsonProperty(value="flinkVersion")
        private final String flinkVersion;
        @JsonProperty(value="nodes")
        private final List<ExecNode<?>> nodes;
        @JsonProperty(value="edges")
        private final List<JsonPlanEdge> edges;

        @JsonCreator
        public JsonPlanGraph(@JsonProperty(value="flinkVersion") String flinkVersion, @JsonProperty(value="nodes") List<ExecNode<?>> nodes, @JsonProperty(value="edges") List<JsonPlanEdge> edges) {
            this.flinkVersion = flinkVersion;
            this.nodes = nodes;
            this.edges = edges;
        }

        public static JsonPlanGraph fromExecNodeGraph(ExecNodeGraph execGraph) {
            final ArrayList allNodes = new ArrayList();
            final ArrayList<JsonPlanEdge> allEdges = new ArrayList<JsonPlanEdge>();
            final HashSet nodesIds = new HashSet();
            final Set visitedNodes = Sets.newIdentityHashSet();
            ExecNodeVisitorImpl visitor = new ExecNodeVisitorImpl(){

                @Override
                public void visit(ExecNode<?> node) {
                    if (visitedNodes.contains(node)) {
                        return;
                    }
                    super.visitInputs(node);
                    int id = node.getId();
                    if (nodesIds.contains(id)) {
                        throw new TableException(String.format("The id: %s is not unique for ExecNode: %s.\nplease check it.", id, node.getDescription()));
                    }
                    allNodes.add(node);
                    nodesIds.add(id);
                    visitedNodes.add(node);
                    for (ExecEdge execEdge : node.getInputEdges()) {
                        allEdges.add(JsonPlanEdge.fromExecEdge(execEdge));
                    }
                }
            };
            execGraph.getRootNodes().forEach(visitor::visit);
            Preconditions.checkArgument((allNodes.size() == nodesIds.size() ? 1 : 0) != 0);
            return new JsonPlanGraph(execGraph.getFlinkVersion(), allNodes, allEdges);
        }

        public ExecNodeGraph convertToExecNodeGraph(SerdeContext serdeCtx) {
            HashMap idToExecNodes = new HashMap();
            for (ExecNode<?> execNode : this.nodes) {
                int id = execNode.getId();
                if (idToExecNodes.containsKey(id)) {
                    throw new TableException(String.format("The id: %s is not unique for ExecNode: %s.\nplease check it.", id, execNode.getDescription()));
                }
                if (execNode instanceof CommonExecTableSourceScan) {
                    DynamicTableSourceSpec tableSourceSpec = ((CommonExecTableSourceScan)execNode).getTableSourceSpec();
                    tableSourceSpec.setReadableConfig((ReadableConfig)serdeCtx.getConfiguration());
                    tableSourceSpec.setClassLoader(serdeCtx.getClassLoader());
                } else if (execNode instanceof CommonExecSink) {
                    DynamicTableSinkSpec tableSinkSpec = ((CommonExecSink)execNode).getTableSinkSpec();
                    tableSinkSpec.setReadableConfig((ReadableConfig)serdeCtx.getConfiguration());
                    tableSinkSpec.setClassLoader(serdeCtx.getClassLoader());
                } else if (execNode instanceof StreamExecLookupJoin) {
                    StreamExecLookupJoin streamExecLookupJoin = (StreamExecLookupJoin)execNode;
                    TemporalTableSourceSpec temporalTableSourceSpec = streamExecLookupJoin.getTemporalTableSourceSpec();
                    if (null == temporalTableSourceSpec) {
                        throw new TableException("temporalTable can't be null, please check corresponding node.");
                    }
                    DynamicTableSourceSpec tableSourceSpec = temporalTableSourceSpec.getTableSourceSpec();
                    if (null == tableSourceSpec) {
                        throw new TableException("tableSourceSpec can't be null, please check corresponding node.");
                    }
                    tableSourceSpec.setReadableConfig((ReadableConfig)serdeCtx.getConfiguration());
                    tableSourceSpec.setClassLoader(serdeCtx.getClassLoader());
                }
                idToExecNodes.put(id, execNode);
            }
            HashMap<Integer, List> idToInputEdges = new HashMap<Integer, List>();
            HashMap<Integer, List> idToOutputEdges = new HashMap<Integer, List>();
            for (JsonPlanEdge edge : this.edges) {
                ExecNode source = (ExecNode)idToExecNodes.get(edge.sourceId);
                if (source == null) {
                    throw new TableException(String.format("Source node id: %s is not found in nodes.", edge.getSourceId()));
                }
                ExecNode target = (ExecNode)idToExecNodes.get(edge.getTargetId());
                if (target == null) {
                    throw new TableException(String.format("Target node id: %s is not found in nodes.", edge.getTargetId()));
                }
                ExecEdge execEdge = ExecEdge.builder().source(source).target(target).shuffle(edge.getShuffle()).exchangeMode(edge.getExchangeMode()).build();
                idToInputEdges.computeIfAbsent(target.getId(), n -> new ArrayList()).add(execEdge);
                idToOutputEdges.computeIfAbsent(source.getId(), n -> new ArrayList()).add(execEdge);
            }
            ArrayList rootNodes = new ArrayList();
            for (Map.Entry entry : idToExecNodes.entrySet()) {
                int id = (Integer)entry.getKey();
                ExecNode node = (ExecNode)entry.getValue();
                List inputEdges = idToInputEdges.getOrDefault(id, new ArrayList());
                node.setInputEdges(inputEdges);
                if (idToOutputEdges.containsKey(id)) continue;
                rootNodes.add(node);
            }
            return new ExecNodeGraph(rootNodes);
        }
    }
}

