/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.FunctionMasterCheckpointHookFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphHasher;
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
import org.apache.flink.streaming.api.graph.StreamGraphUserHashHasher;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamingJobGraphGenerator {
    /** Logger for this generator. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
    // NOTE(review): presumably the decimal scale used when computing managed memory fractions;
    // its usage is not visible in this part of the file - confirm.
    private static final int MANAGED_MEMORY_FRACTION_SCALE = 16;
    // Same value that checkAndResetBufferTimeout() assigns to pipelined edges whose timeout is undefined.
    private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
    /** Sentinel buffer timeout; checkAndResetBufferTimeout() treats -1 as "not set". */
    public static final long UNDEFINED_NETWORK_BUFFER_TIMEOUT = -1L;
    /** The stream graph that is translated into the job graph. */
    private final StreamGraph streamGraph;
    /** Job vertices created so far, keyed by the stream node id passed to createJobVertex(). */
    private final Map<Integer, JobVertex> jobVertices;
    /** The job graph being assembled; returned by createJobGraph(). */
    private final JobGraph jobGraph;
    /** Stream node ids for which createJobVertex() has already created a job vertex. */
    private final Collection<Integer> builtVertices;
    /** Edges between job vertices, in the order connect() registered them. */
    private final List<StreamEdge> physicalEdgesInOrder;
    /** Chain start node id -> (chained node id -> config) for the non-head operators of each chain. */
    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
    /** Stream node id -> config, filled by setVertexConfig() for every visited node. */
    private final Map<Integer, StreamConfig> vertexConfigs;
    /** Stream node id -> display name of the (sub)chain starting at that node. */
    private final Map<Integer, String> chainedNames;
    /** Stream node id -> merged minimum resources of the (sub)chain starting at that node. */
    private final Map<Integer, ResourceSpec> chainedMinResources;
    /** Stream node id -> merged preferred resources of the (sub)chain starting at that node. */
    private final Map<Integer, ResourceSpec> chainedPreferredResources;
    /** Chain start node id -> container collecting the input/output formats of that chain. */
    private final Map<Integer, InputOutputFormatContainer> chainedInputOutputFormats;
    /** Hasher producing the primary node hashes. */
    private final StreamGraphHasher defaultStreamGraphHasher;
    /** Additional hashers whose results are passed along as legacy hashes. */
    private final List<StreamGraphHasher> legacyStreamGraphHashers;

    /**
     * Translates the given {@link StreamGraph} into a {@link JobGraph} without supplying a job id.
     *
     * @param streamGraph the stream graph to translate
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        final JobID noJobId = null;
        return createJobGraph(streamGraph, noJobId);
    }

    /**
     * Translates the given {@link StreamGraph} into a {@link JobGraph}.
     *
     * @param streamGraph the stream graph to translate
     * @param jobID the id handed to the {@link JobGraph} constructor; may be {@code null}
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        final StreamingJobGraphGenerator generator = new StreamingJobGraphGenerator(streamGraph, jobID);
        return generator.createJobGraph();
    }

    /**
     * Sets up an empty generator for one translation run.
     *
     * @param streamGraph the stream graph to translate
     * @param jobID the id handed to the {@link JobGraph} constructor; may be {@code null}
     */
    private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
        // Input and hashing strategy.
        this.streamGraph = streamGraph;
        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());

        // Bookkeeping for vertices and the edges between them.
        this.jobVertices = new HashMap<>();
        this.builtVertices = new HashSet<>();
        this.physicalEdgesInOrder = new ArrayList<>();

        // Per-node and per-chain intermediate results.
        this.chainedConfigs = new HashMap<>();
        this.vertexConfigs = new HashMap<>();
        this.chainedNames = new HashMap<>();
        this.chainedMinResources = new HashMap<>();
        this.chainedPreferredResources = new HashMap<>();
        this.chainedInputOutputFormats = new HashMap<>();

        // The result object that the translation fills in.
        this.jobGraph = new JobGraph(jobID, streamGraph.getJobName());
    }

    /**
     * Runs the full translation of the stream graph into the job graph.
     *
     * <p>Steps, in order: validation, node hashing, operator chaining (which also creates the job
     * vertices and connects them), physical in-edge assignment, slot sharing and co-location,
     * managed memory fractions, checkpointing configuration, and finally copying the savepoint
     * restore settings, user artifacts and execution config onto the job graph.
     *
     * @return the populated job graph
     * @throws IllegalConfigurationException if the execution config cannot be serialized; the
     *     underlying {@link IOException} is attached as the cause
     */
    private JobGraph createJobGraph() {
        this.preValidate();
        this.jobGraph.setScheduleMode(this.streamGraph.getScheduleMode());

        // Primary hashes plus one hash map per legacy hasher, in hasher order.
        Map<Integer, byte[]> hashes = this.defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(this.streamGraph);
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(this.legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : this.legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(this.streamGraph));
        }

        this.setChaining(hashes, legacyHashes);
        this.setPhysicalEdges();
        this.setSlotSharingAndCoLocation();
        // The helper only gets read-only views of the generator's maps.
        StreamingJobGraphGenerator.setManagedMemoryFraction(
                Collections.unmodifiableMap(this.jobVertices),
                Collections.unmodifiableMap(this.vertexConfigs),
                Collections.unmodifiableMap(this.chainedConfigs),
                id -> this.streamGraph.getStreamNode(id).getMinResources(),
                id -> this.streamGraph.getStreamNode(id).getManagedMemoryWeight());
        this.configureCheckpointing();

        this.jobGraph.setSavepointRestoreSettings(this.streamGraph.getSavepointRestoreSettings());
        JobGraphUtils.addUserArtifactEntries(this.streamGraph.getUserArtifacts(), this.jobGraph);
        try {
            this.jobGraph.setExecutionConfig(this.streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            // Keep the original exception as the cause so the offending type can be diagnosed.
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig. This indicates that non-serializable types (like custom serializers) were registered", e);
        }
        return this.jobGraph;
    }

    /**
     * Rejects stream graphs whose checkpointing setup is unsupported and adjusts the unaligned
     * checkpoint flag where it cannot apply.
     *
     * @throws UnsupportedOperationException if checkpointing is enabled for an iterative job
     *     without being forced, or for an operator implementing {@link InputSelectable}
     */
    private void preValidate() {
        final CheckpointConfig config = this.streamGraph.getCheckpointConfig();

        if (config.isCheckpointingEnabled()) {
            if (this.streamGraph.isIterative() && !config.isForceCheckpointing()) {
                throw new UnsupportedOperationException("Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. \nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
            }

            final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
            for (StreamNode streamNode : this.streamGraph.getStreamNodes()) {
                final StreamOperatorFactory<?> factory = streamNode.getOperatorFactory();
                if (factory == null) {
                    continue;
                }
                final Class<?> operatorClass = factory.getStreamOperatorClass(contextLoader);
                if (InputSelectable.class.isAssignableFrom(operatorClass)) {
                    throw new UnsupportedOperationException("Checkpointing is currently not supported for operators that implement InputSelectable:" + operatorClass.getName());
                }
            }
        }

        // Unaligned checkpoints are switched off (with a warning) unless the mode is EXACTLY_ONCE.
        if (config.isUnalignedCheckpointsEnabled()) {
            if (this.getCheckpointingMode(config) != CheckpointingMode.EXACTLY_ONCE) {
                LOG.warn("Unaligned checkpoints can only be used with checkpointing mode EXACTLY_ONCE");
                config.enableUnalignedCheckpoints(false);
            }
        }
    }

    /**
     * Groups the recorded physical edges by target node id, keeping their recorded order, and
     * stores each group as the in-physical-edges of the target's config.
     */
    private void setPhysicalEdges() {
        final Map<Integer, List<StreamEdge>> inEdgesByTarget = new HashMap<>();
        for (StreamEdge edge : this.physicalEdgesInOrder) {
            inEdgesByTarget
                    .computeIfAbsent(edge.getTargetId(), ignored -> new ArrayList<>())
                    .add(edge);
        }

        inEdgesByTarget.forEach(
                (targetId, inEdges) -> this.vertexConfigs.get(targetId).setInPhysicalEdges(inEdges));
    }

    /**
     * Starts a chain at every source of the stream graph; the recursion in createChain() reaches
     * the remaining nodes from there.
     *
     * @param hashes node hashes from the default hasher
     * @param legacyHashes node hashes from the legacy hashers
     */
    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
        for (Integer sourceId : this.streamGraph.getSourceIDs()) {
            final OperatorChainInfo sourceChain = new OperatorChainInfo(sourceId, hashes, legacyHashes, this.streamGraph);
            this.createChain(sourceId, 0, sourceChain);
        }
    }

    /**
     * Recursively builds the operator chain that contains {@code currentNodeId} and, along the
     * way, every chain reachable from it.
     *
     * @param currentNodeId the stream node being added to the chain
     * @param chainIndex position of the node inside its chain (0 for the chain start)
     * @param chainInfo bookkeeping for the chain that {@code currentNodeId} belongs to
     * @return the out edges leaving the (sub)chain rooted at {@code currentNodeId}; an empty list
     *     if a job vertex for the chain's start node was already built
     */
    private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
        Integer startNodeId = chainInfo.getStartNodeId();
        // The guard is on the chain's START node: builtVertices is filled in createJobVertex().
        if (!this.builtVertices.contains(startNodeId)) {
            ArrayList<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
            ArrayList<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            ArrayList<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
            StreamNode currentNode = this.streamGraph.getStreamNode(currentNodeId);
            // Split the out edges into those that stay inside this chain and those that leave it.
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (StreamingJobGraphGenerator.isChainable(outEdge, this.streamGraph)) {
                    chainableOutputs.add(outEdge);
                    continue;
                }
                nonChainableOutputs.add(outEdge);
            }
            // Descend into chained targets first; this fills chainedNames / chainedMinResources /
            // chainedPreferredResources for them before they are read further down.
            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(this.createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
            }
            // Each non-chainable edge leaves this chain and starts a new chain at its target.
            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                this.createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
            }
            this.chainedNames.put(currentNodeId, this.createChainedName(currentNodeId, chainableOutputs));
            this.chainedMinResources.put(currentNodeId, this.createChainedMinResources(currentNodeId, chainableOutputs));
            this.chainedPreferredResources.put(currentNodeId, this.createChainedPreferredResources(currentNodeId, chainableOutputs));
            OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, this.chainedNames.get(currentNodeId));
            // Formats are collected per chain (keyed by the start node), not per operator.
            if (currentNode.getInputFormat() != null) {
                this.getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }
            if (currentNode.getOutputFormat() != null) {
                this.getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }
            // Only the chain start gets a job vertex; chained operators get a standalone config.
            StreamConfig config = currentNodeId.equals(startNodeId) ? this.createJobVertex(startNodeId, chainInfo) : new StreamConfig(new Configuration());
            this.setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
            if (currentNodeId.equals(startNodeId)) {
                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(this.streamGraph.getStreamNode(currentNodeId).getOperatorName());
                // Wire the job vertex of this chain to the targets of all edges leaving the chain.
                for (StreamEdge edge : transitiveOutEdges) {
                    this.connect(startNodeId, edge);
                }
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setTransitiveChainedTaskConfigs(this.chainedConfigs.get(startNodeId));
            } else {
                // Register the chained operator's config under its chain start.
                this.chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap());
                config.setChainIndex(chainIndex);
                StreamNode node = this.streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                this.chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }
            config.setOperatorID(currentOperatorId);
            // A node without chainable outputs is the last operator of its chain.
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;
        }
        return new ArrayList<StreamEdge>();
    }

    /**
     * Returns the format container registered for the given chain start, creating and
     * registering one (bound to the thread's context class loader) on first use.
     *
     * @param startNodeId id of the chain's start node
     * @return the container for that chain
     */
    private InputOutputFormatContainer getOrCreateFormatContainer(Integer startNodeId) {
        InputOutputFormatContainer container = this.chainedInputOutputFormats.get(startNodeId);
        if (container == null) {
            container = new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader());
            this.chainedInputOutputFormats.put(startNodeId, container);
        }
        return container;
    }

    /**
     * Builds the display name of the (sub)chain starting at {@code vertexID}.
     *
     * <p>No chained output: the operator name. One chained output: {@code "name -> child"}.
     * Several: {@code "name -> (child1, child2, ...)"}. Child names are read from
     * {@code chainedNames}.
     *
     * @param vertexID id of the stream node heading the (sub)chain
     * @param chainedOutputs the node's out edges that stay inside the chain
     * @return the chained name
     */
    private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
        final String ownName = this.streamGraph.getStreamNode(vertexID).getOperatorName();

        if (chainedOutputs.isEmpty()) {
            return ownName;
        }
        if (chainedOutputs.size() == 1) {
            final String onlyChild = this.chainedNames.get(chainedOutputs.get(0).getTargetId());
            return ownName + " -> " + onlyChild;
        }

        final List<String> childNames = new ArrayList<>();
        for (StreamEdge chainedEdge : chainedOutputs) {
            childNames.add(this.chainedNames.get(chainedEdge.getTargetId()));
        }
        return ownName + " -> (" + StringUtils.join(childNames, ", ") + ")";
    }

    /**
     * Merges the node's own minimum resources with the already computed minimum resources of
     * every chained output target.
     *
     * @param vertexID id of the stream node heading the (sub)chain
     * @param chainedOutputs the node's out edges that stay inside the chain
     * @return the merged minimum resources
     */
    private ResourceSpec createChainedMinResources(Integer vertexID, List<StreamEdge> chainedOutputs) {
        ResourceSpec merged = this.streamGraph.getStreamNode(vertexID).getMinResources();
        for (StreamEdge chainedEdge : chainedOutputs) {
            final ResourceSpec childResources = this.chainedMinResources.get(chainedEdge.getTargetId());
            merged = merged.merge(childResources);
        }
        return merged;
    }

    /**
     * Merges the node's own preferred resources with the already computed preferred resources
     * of every chained output target.
     *
     * @param vertexID id of the stream node heading the (sub)chain
     * @param chainedOutputs the node's out edges that stay inside the chain
     * @return the merged preferred resources
     */
    private ResourceSpec createChainedPreferredResources(Integer vertexID, List<StreamEdge> chainedOutputs) {
        ResourceSpec merged = this.streamGraph.getStreamNode(vertexID).getPreferredResources();
        for (StreamEdge chainedEdge : chainedOutputs) {
            final ResourceSpec childResources = this.chainedPreferredResources.get(chainedEdge.getTargetId());
            merged = merged.merge(childResources);
        }
        return merged;
    }

    /**
     * Creates the {@link JobVertex} for the chain starting at {@code streamNodeId}, registers it
     * with this generator and the job graph, and returns a config backed by the vertex'
     * configuration.
     *
     * @param streamNodeId id of the chain's start node
     * @param chainInfo bookkeeping of the chain (hashes, chained operator hashes, coordinators)
     * @return a {@link StreamConfig} wrapping the new vertex' configuration
     * @throws IllegalStateException if no hash is known for the node
     * @throws FlinkRuntimeException if an operator coordinator provider cannot be serialized; the
     *     underlying {@link IOException} is attached as the cause
     */
    private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo chainInfo) {
        StreamNode streamNode = this.streamGraph.getStreamNode(streamNodeId);
        byte[] hash = chainInfo.getHash(streamNodeId);
        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. Did you generate them before calling this method?");
        }
        JobVertexID jobVertexId = new JobVertexID(hash);

        // f0 is the generated operator hash; f1 is the optional (nullable) second hash.
        List<Tuple2<byte[], byte[]>> chainedOperators = chainInfo.getChainedOperatorHashes(streamNodeId);
        List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
        if (chainedOperators != null) {
            for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
                OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
                operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
            }
        }

        // Declared as the common supertype so that both branches type-check.
        JobVertex jobVertex;
        if (this.chainedInputOutputFormats.containsKey(streamNodeId)) {
            jobVertex = new InputOutputFormatVertex(this.chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
            this.chainedInputOutputFormats.get(streamNodeId).write(new TaskConfig(jobVertex.getConfiguration()));
        } else {
            jobVertex = new JobVertex(this.chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
        }

        for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) {
            try {
                jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
            }
            catch (IOException e) {
                // Keep the serialization failure as the cause instead of discarding it.
                throw new FlinkRuntimeException(String.format("Coordinator Provider for node %s is not serializable.", this.chainedNames.get(streamNodeId)), e);
            }
        }

        jobVertex.setResources(this.chainedMinResources.get(streamNodeId), this.chainedPreferredResources.get(streamNodeId));
        jobVertex.setInvokableClass(streamNode.getJobVertexClass());

        // A non-positive parallelism on the node leaves the vertex' own value untouched.
        int parallelism = streamNode.getParallelism();
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        } else {
            parallelism = jobVertex.getParallelism();
        }
        jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
        }

        jobVertex.setInputDependencyConstraint(this.streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
        this.jobVertices.put(streamNodeId, jobVertex);
        this.builtVertices.add(streamNodeId);
        this.jobGraph.addVertex(jobVertex);
        return new StreamConfig(jobVertex.getConfiguration());
    }

    /**
     * Copies the settings of stream node {@code vertexID} and the graph-wide settings into
     * {@code config}, then records the config in {@code vertexConfigs}.
     *
     * @param vertexID id of the stream node being configured
     * @param config the config to fill
     * @param chainableOutputs the node's out edges that stay inside its chain
     * @param nonChainableOutputs the node's out edges that leave its chain
     */
    private void setVertexConfig(Integer vertexID, StreamConfig config, List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
        final StreamNode streamNode = this.streamGraph.getStreamNode(vertexID);

        // Identity and main input/output serializers.
        config.setVertexID(vertexID);
        config.setTypeSerializersIn(streamNode.getTypeSerializersIn());
        config.setTypeSerializerOut(streamNode.getTypeSerializerOut());

        // Side-output serializers for every tagged edge, chained edges first.
        for (StreamEdge chained : chainableOutputs) {
            if (chained.getOutputTag() != null) {
                config.setTypeSerializerSideOut(chained.getOutputTag(), chained.getOutputTag().getTypeInfo().createSerializer(this.streamGraph.getExecutionConfig()));
            }
        }
        for (StreamEdge nonChained : nonChainableOutputs) {
            if (nonChained.getOutputTag() != null) {
                config.setTypeSerializerSideOut(nonChained.getOutputTag(), nonChained.getOutputTag().getTypeInfo().createSerializer(this.streamGraph.getExecutionConfig()));
            }
        }

        // Operator and output wiring.
        config.setStreamOperatorFactory(streamNode.getOperatorFactory());
        config.setOutputSelectors(streamNode.getOutputSelectors());
        config.setNumberOfOutputs(nonChainableOutputs.size());
        config.setNonChainedOutputs(nonChainableOutputs);
        config.setChainedOutputs(chainableOutputs);
        config.setTimeCharacteristic(this.streamGraph.getTimeCharacteristic());

        // State backend and checkpointing.
        final CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();
        config.setStateBackend(this.streamGraph.getStateBackend());
        config.setCheckpointingEnabled(checkpointConfig.isCheckpointingEnabled());
        config.setCheckpointMode(this.getCheckpointingMode(checkpointConfig));
        config.setUnalignedCheckpointsEnabled(checkpointConfig.isUnalignedCheckpointsEnabled());

        // Keyed-state partitioning.
        for (int index = 0; index < streamNode.getStatePartitioners().length; ++index) {
            config.setStatePartitioner(index, streamNode.getStatePartitioners()[index]);
        }
        config.setStateKeySerializer(streamNode.getStateKeySerializer());

        // Iteration heads and tails additionally get the broker id and loop timeout.
        final Class<? extends AbstractInvokable> invokableClass = streamNode.getJobVertexClass();
        final boolean iterationVertex = invokableClass.equals(StreamIterationHead.class) || invokableClass.equals(StreamIterationTail.class);
        if (iterationVertex) {
            config.setIterationId(this.streamGraph.getBrokerID(vertexID));
            config.setIterationWaitTime(this.streamGraph.getLoopTimeout(vertexID));
        }

        this.vertexConfigs.put(vertexID, config);
    }

    /**
     * Resolves the checkpointing mode to apply.
     *
     * @param checkpointConfig the checkpoint configuration to read
     * @return the configured mode if checkpointing is enabled, otherwise
     *     {@link CheckpointingMode#AT_LEAST_ONCE}
     * @throws IllegalArgumentException if the configured mode is neither EXACTLY_ONCE nor
     *     AT_LEAST_ONCE
     */
    private CheckpointingMode getCheckpointingMode(CheckpointConfig checkpointConfig) {
        final CheckpointingMode configuredMode = checkpointConfig.getCheckpointingMode();
        final boolean knownMode = configuredMode == CheckpointingMode.EXACTLY_ONCE
                || configuredMode == CheckpointingMode.AT_LEAST_ONCE;
        Preconditions.checkArgument(knownMode, "Unexpected checkpointing mode.");

        return checkpointConfig.isCheckpointingEnabled() ? configuredMode : CheckpointingMode.AT_LEAST_ONCE;
    }

    /**
     * Connects the job vertex of the chain headed by {@code headOfChain} to the job vertex of
     * the edge's target, and records the edge as a physical edge.
     *
     * @param headOfChain id of the start node of the upstream chain
     * @param edge the stream edge leaving that chain
     * @throws UnsupportedOperationException if the edge's shuffle mode is not handled, or if the
     *     buffer timeout check rejects the edge
     */
    private void connect(Integer headOfChain, StreamEdge edge) {
        this.physicalEdgesInOrder.add(edge);

        final Integer targetId = edge.getTargetId();
        final JobVertex upstream = this.jobVertices.get(headOfChain);
        final JobVertex downstream = this.jobVertices.get(targetId);

        // The target gains one more input.
        final StreamConfig targetConfig = new StreamConfig(downstream.getConfiguration());
        targetConfig.setNumberOfInputs(targetConfig.getNumberOfInputs() + 1);

        final StreamPartitioner<?> partitioner = edge.getPartitioner();
        final ShuffleMode shuffleMode = edge.getShuffleMode();
        final ResultPartitionType partitionType;
        switch (shuffleMode) {
            case PIPELINED:
                partitionType = ResultPartitionType.PIPELINED_BOUNDED;
                break;
            case BATCH:
                partitionType = ResultPartitionType.BLOCKING;
                break;
            case UNDEFINED:
                // No explicit mode on the edge: fall back to the graph-wide exchange mode.
                partitionType = this.determineResultPartitionType(partitioner);
                break;
            default:
                throw new UnsupportedOperationException("Data exchange mode " + edge.getShuffleMode() + " is not supported yet.");
        }
        this.checkAndResetBufferTimeout(partitionType, edge);

        final DistributionPattern pattern = StreamingJobGraphGenerator.isPointwisePartitioner(partitioner)
                ? DistributionPattern.POINTWISE
                : DistributionPattern.ALL_TO_ALL;
        final JobEdge jobEdge = downstream.connectNewDataSetAsInput(upstream, pattern, partitionType);
        jobEdge.setShipStrategyName(partitioner.toString());

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(), headOfChain, targetId);
        }
    }

    /**
     * Validates the edge's buffer timeout against the partition type and fills in a value for
     * pipelined edges that have none.
     *
     * @param type the result partition type chosen for the edge
     * @param edge the edge whose buffer timeout is checked and possibly set
     * @throws UnsupportedOperationException if a blocking partition has a buffer timeout set
     */
    private void checkAndResetBufferTimeout(ResultPartitionType type, StreamEdge edge) {
        final long timeout = edge.getBufferTimeout();
        // -1 is the value of UNDEFINED_NETWORK_BUFFER_TIMEOUT, i.e. "no timeout set".
        final boolean timeoutUndefined = timeout == -1L;

        if (type.isBlocking() && !timeoutUndefined) {
            throw new UnsupportedOperationException("Blocking partition does not support buffer timeout " + timeout + " for src operator in edge " + edge.toString() + ". \nPlease either reset buffer timeout as -1 or use the non-blocking partition.");
        }
        if (type.isPipelined() && timeoutUndefined) {
            // 100 is the value of DEFAULT_NETWORK_BUFFER_TIMEOUT.
            edge.setBufferTimeout(100L);
        }
    }

    /**
     * Tells whether the partitioner is one of the two treated as pointwise here.
     *
     * @param partitioner the partitioner to classify
     * @return {@code true} for {@link ForwardPartitioner} and {@link RescalePartitioner}
     */
    private static boolean isPointwisePartitioner(StreamPartitioner<?> partitioner) {
        if (partitioner instanceof ForwardPartitioner) {
            return true;
        }
        return partitioner instanceof RescalePartitioner;
    }

    /**
     * Chooses the result partition type for an edge from the graph-wide data exchange mode.
     *
     * @param partitioner the partitioner of the edge
     * @return {@code PIPELINED_BOUNDED} or {@code BLOCKING}, depending on the mode and on
     *     whether the partitioner is forward / pointwise
     * @throws RuntimeException if the exchange mode is not one of the handled values
     */
    private ResultPartitionType determineResultPartitionType(StreamPartitioner<?> partitioner) {
        switch (this.streamGraph.getGlobalDataExchangeMode()) {
            case ALL_EDGES_BLOCKING:
                return ResultPartitionType.BLOCKING;
            case FORWARD_EDGES_PIPELINED:
                return partitioner instanceof ForwardPartitioner
                        ? ResultPartitionType.PIPELINED_BOUNDED
                        : ResultPartitionType.BLOCKING;
            case POINTWISE_EDGES_PIPELINED:
                return StreamingJobGraphGenerator.isPointwisePartitioner(partitioner)
                        ? ResultPartitionType.PIPELINED_BOUNDED
                        : ResultPartitionType.BLOCKING;
            case ALL_EDGES_PIPELINED:
                return ResultPartitionType.PIPELINED_BOUNDED;
            default:
                throw new RuntimeException("Unrecognized global data exchange mode " + this.streamGraph.getGlobalDataExchangeMode());
        }
    }

    /**
     * Decides whether the two operators connected by {@code edge} can be chained.
     *
     * <p>All of the following must hold, checked in this order: the target has exactly one in
     * edge, both nodes share a slot sharing group, the operators are chainable, the edge uses a
     * {@link ForwardPartitioner}, the shuffle mode is not {@code BATCH}, both nodes have the same
     * parallelism, and chaining is enabled on the graph.
     *
     * @param edge the edge to test
     * @param streamGraph the graph the edge belongs to
     * @return {@code true} if source and target of the edge can be chained
     */
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        final StreamNode source = streamGraph.getSourceVertex(edge);
        final StreamNode target = streamGraph.getTargetVertex(edge);

        if (target.getInEdges().size() != 1) {
            return false;
        }
        if (!source.isSameSlotSharingGroup(target)) {
            return false;
        }
        if (!StreamingJobGraphGenerator.areOperatorsChainable(source, target, streamGraph)) {
            return false;
        }
        if (!(edge.getPartitioner() instanceof ForwardPartitioner)) {
            return false;
        }
        if (edge.getShuffleMode() == ShuffleMode.BATCH) {
            return false;
        }
        if (source.getParallelism() != target.getParallelism()) {
            return false;
        }
        return streamGraph.isChainingEnabled();
    }

    /**
     * Checks whether the operator factories of two adjacent nodes allow chaining.
     *
     * @param upStreamVertex the upstream node
     * @param downStreamVertex the downstream node
     * @param streamGraph the graph both nodes belong to
     * @return {@code false} if either factory is missing, if the upstream strategy is
     *     {@code NEVER} or the downstream strategy is not {@code ALWAYS}, or if the downstream
     *     factory is a {@link YieldingOperatorFactory} and the head operator of the upstream
     *     chain is a stream source; {@code true} otherwise
     */
    @VisibleForTesting
    static boolean areOperatorsChainable(StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) {
        final StreamOperatorFactory<?> upstreamFactory = upStreamVertex.getOperatorFactory();
        final StreamOperatorFactory<?> downstreamFactory = downStreamVertex.getOperatorFactory();
        if (upstreamFactory == null || downstreamFactory == null) {
            return false;
        }

        final boolean strategiesAllowChaining = upstreamFactory.getChainingStrategy() != ChainingStrategy.NEVER
                && downstreamFactory.getChainingStrategy() == ChainingStrategy.ALWAYS;
        if (!strategiesAllowChaining) {
            return false;
        }

        if (!(downstreamFactory instanceof YieldingOperatorFactory)) {
            return true;
        }
        // A yielding operator is not chained behind a chain whose head is a stream source.
        final StreamOperatorFactory<?> headFactory = StreamingJobGraphGenerator.getHeadOperator(upStreamVertex, streamGraph);
        return !headFactory.isStreamSource();
    }

    /**
     * Walks upstream along chainable single in-edges and returns the operator factory of the
     * node where that walk stops, i.e. the head of the chain containing {@code upStreamVertex}.
     *
     * @param upStreamVertex the node to start from
     * @param streamGraph the graph the node belongs to
     * @return the operator factory of the chain head
     */
    private static StreamOperatorFactory<?> getHeadOperator(StreamNode upStreamVertex, StreamGraph streamGraph) {
        StreamNode current = upStreamVertex;
        while (current.getInEdges().size() == 1
                && StreamingJobGraphGenerator.isChainable(current.getInEdges().get(0), streamGraph)) {
            current = streamGraph.getSourceVertex(current.getInEdges().get(0));
        }
        return current.getOperatorFactory();
    }

    /**
     * Assigns slot sharing groups first and co-location groups second; setCoLocation() reads
     * the slot sharing group that setSlotSharing() stored on each vertex.
     */
    private void setSlotSharingAndCoLocation() {
        setSlotSharing();
        setCoLocation();
    }

    /**
     * Sets the slot sharing group of every job vertex.
     *
     * <p>A node without a group key gets {@code null}; a node with the key {@code "default"}
     * gets the group computed for its pipelined region; any other key maps to one shared group
     * per key.
     */
    private void setSlotSharing() {
        final Map<String, SlotSharingGroup> groupsByKey = new HashMap<>();
        final Map<JobVertexID, SlotSharingGroup> groupsByRegionVertex = this.buildVertexRegionSlotSharingGroups();

        for (Map.Entry<Integer, JobVertex> entry : this.jobVertices.entrySet()) {
            final JobVertex jobVertex = entry.getValue();
            final String groupKey = this.streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();

            final SlotSharingGroup group;
            if (groupKey == null) {
                group = null;
            } else if (groupKey.equals("default")) {
                group = groupsByRegionVertex.get(jobVertex.getID());
            } else {
                group = groupsByKey.computeIfAbsent(groupKey, ignored -> new SlotSharingGroup());
            }
            jobVertex.setSlotSharingGroup(group);
        }
    }

    /**
     * Computes a slot sharing group for every vertex of every logical pipelined region of the
     * job graph.
     *
     * <p>If the stream graph puts all vertices into the same group by default, a single group is
     * shared by all regions; otherwise each region gets its own group.
     *
     * @return job vertex id to the slot sharing group of its region
     */
    private Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups() {
        final Map<JobVertexID, SlotSharingGroup> groupsByVertex = new HashMap<>();
        final SlotSharingGroup sharedGroup = new SlotSharingGroup();
        final boolean oneGroupForAll = this.streamGraph.isAllVerticesInSameSlotSharingGroupByDefault();

        for (DefaultLogicalPipelinedRegion region : new DefaultLogicalTopology(this.jobGraph).getLogicalPipelinedRegions()) {
            final SlotSharingGroup regionGroup;
            if (oneGroupForAll) {
                regionGroup = sharedGroup;
            } else {
                regionGroup = new SlotSharingGroup();
            }
            for (JobVertexID vertexId : region.getVertexIDs()) {
                groupsByVertex.put(vertexId, regionGroup);
            }
        }
        return groupsByVertex;
    }

    /**
     * Puts job vertices whose stream nodes name the same co-location group key into one
     * {@link CoLocationGroup}.
     *
     * @throws IllegalStateException if a co-located vertex has no slot sharing group, or if two
     *     vertices with the same key belong to different slot sharing group instances
     */
    private void setCoLocation() {
        // Per key: the slot sharing group of the first vertex seen, and the co-location group.
        final Map<String, Tuple2<SlotSharingGroup, CoLocationGroup>> constraintsByKey = new HashMap<>();

        for (Map.Entry<Integer, JobVertex> entry : this.jobVertices.entrySet()) {
            final StreamNode streamNode = this.streamGraph.getStreamNode(entry.getKey());
            final JobVertex jobVertex = entry.getValue();
            final SlotSharingGroup sharingGroup = jobVertex.getSlotSharingGroup();
            final String coLocationKey = streamNode.getCoLocationGroup();

            if (coLocationKey != null) {
                if (sharingGroup == null) {
                    throw new IllegalStateException("Cannot use a co-location constraint without a slot sharing group");
                }
                final Tuple2<SlotSharingGroup, CoLocationGroup> constraint = constraintsByKey.computeIfAbsent(
                        coLocationKey, ignored -> new Tuple2<>(sharingGroup, new CoLocationGroup()));
                // Identity comparison: it must be the very same group instance.
                if (constraint.f0 != sharingGroup) {
                    throw new IllegalStateException("Cannot co-locate operators from different slot sharing groups");
                }
                final CoLocationGroup coLocationGroup = constraint.f1;
                jobVertex.updateCoLocationGroup(coLocationGroup);
                coLocationGroup.addVertex(jobVertex);
            }
        }
    }

    /**
     * Collects, per job vertex, its head operator and all of its operators, then sets the
     * managed memory fractions for each distinct slot sharing group.
     *
     * @param jobVertices job vertices keyed by the id of their head operator
     * @param operatorConfigs configs of all operators, keyed by operator (stream node) id
     * @param vertexChainedConfigs head operator id to the configs of its chained operators
     * @param operatorResourceRetriever looks up the resource spec of an operator
     * @param operatorManagedMemoryWeightRetriever looks up the managed memory weight of an operator
     * @throws IllegalStateException if a job vertex has no slot sharing group
     */
    private static void setManagedMemoryFraction(Map<Integer, JobVertex> jobVertices, Map<Integer, StreamConfig> operatorConfigs, Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs, Function<Integer, ResourceSpec> operatorResourceRetriever, Function<Integer, Integer> operatorManagedMemoryWeightRetriever) {
        // Groups are de-duplicated by identity, not by equals().
        final Set<SlotSharingGroup> distinctGroups = Collections.newSetFromMap(new IdentityHashMap<SlotSharingGroup, Boolean>());
        final Map<JobVertexID, Integer> headOperatorByVertex = new HashMap<>();
        final Map<JobVertexID, Set<Integer>> operatorsByVertex = new HashMap<>();

        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            final int headId = entry.getKey();
            final JobVertex vertex = entry.getValue();

            final SlotSharingGroup group = vertex.getSlotSharingGroup();
            Preconditions.checkState(group != null, "JobVertex slot sharing group must not be null");
            distinctGroups.add(group);

            headOperatorByVertex.put(vertex.getID(), headId);

            // The head operator plus every operator chained behind it.
            final Set<Integer> operatorsOfVertex = new HashSet<>();
            operatorsOfVertex.add(headId);
            operatorsOfVertex.addAll(vertexChainedConfigs.getOrDefault(headId, Collections.emptyMap()).keySet());
            operatorsByVertex.put(vertex.getID(), operatorsOfVertex);
        }

        for (SlotSharingGroup group : distinctGroups) {
            StreamingJobGraphGenerator.setManagedMemoryFractionForSlotSharingGroup(group, headOperatorByVertex, operatorsByVertex, operatorConfigs, vertexChainedConfigs, operatorResourceRetriever, operatorManagedMemoryWeightRetriever);
        }
    }

    /**
     * Sets the managed memory fraction of every operator that belongs to the given slot
     * sharing group.
     *
     * <p>The group's total managed memory weight is the sum of the weights of all operators in
     * all job vertices of the group. Each operator's fraction is then derived by
     * {@code setManagedMemoryFractionForOperator}. After the operators of a vertex have been
     * processed, the chained configs are written onto the vertex's head operator config again.
     *
     * @param slotSharingGroup the group whose operators are processed
     * @param vertexHeadOperators head operator id per job vertex
     * @param vertexOperators all operator ids (head and chained) per job vertex
     * @param operatorConfigs stream configs keyed by operator id
     * @param vertexChainedConfigs chained operator configs keyed by head operator id
     * @param operatorResourceRetriever looks up the resource spec of an operator id
     * @param operatorManagedMemoryWeightRetriever looks up the managed memory weight of an
     *     operator id
     */
    private static void setManagedMemoryFractionForSlotSharingGroup(SlotSharingGroup slotSharingGroup, Map<JobVertexID, Integer> vertexHeadOperators, Map<JobVertexID, Set<Integer>> vertexOperators, Map<Integer, StreamConfig> operatorConfigs, Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs, Function<Integer, ResourceSpec> operatorResourceRetriever, Function<Integer, Integer> operatorManagedMemoryWeightRetriever) {
        // Pass 1: total weight of the group across all of its vertices and operators.
        int totalGroupWeight = 0;
        for (JobVertexID vertexId : slotSharingGroup.getJobVertexIds()) {
            for (Integer operatorId : vertexOperators.get(vertexId)) {
                totalGroupWeight += operatorManagedMemoryWeightRetriever.apply(operatorId);
            }
        }

        // Pass 2: per-operator fraction, then refresh the head config of each vertex.
        for (JobVertexID vertexId : slotSharingGroup.getJobVertexIds()) {
            for (int operatorId : vertexOperators.get(vertexId)) {
                final StreamConfig config = operatorConfigs.get(operatorId);
                final ResourceSpec operatorSpec = operatorResourceRetriever.apply(operatorId);
                final int operatorWeight = operatorManagedMemoryWeightRetriever.apply(operatorId);
                final ResourceSpec groupSpec = slotSharingGroup.getResourceSpec();
                StreamingJobGraphGenerator.setManagedMemoryFractionForOperator(operatorSpec, groupSpec, operatorWeight, totalGroupWeight, config);
            }

            // NOTE(review): presumably needed because the head config holds its own copy of
            // the chained configs, which would otherwise miss the fractions set above - confirm.
            final int headId = vertexHeadOperators.get(vertexId);
            final StreamConfig headConfig = operatorConfigs.get(headId);
            headConfig.setTransitiveChainedTaskConfigs(vertexChainedConfigs.get(headId));
        }
    }

    /**
     * Derives the managed memory fraction of a single operator and stores it in its config.
     *
     * <ul>
     *   <li>If the group's resource spec equals {@code ResourceSpec.UNKNOWN}, the fraction is
     *       the operator's weight divided by the group's total weight.
     *   <li>Otherwise the fraction is the operator's managed memory bytes divided by the
     *       group's managed memory bytes.
     * </ul>
     *
     * <p>In both cases the quotient is rounded down, and the fraction is {@code 0.0} when the
     * divisor is not positive.
     *
     * @param operatorResourceSpec resource spec of the operator
     * @param groupResourceSpec resource spec of the operator's slot sharing group
     * @param operatorManagedMemoryWeight managed memory weight of the operator
     * @param groupManagedMemoryWeight summed managed memory weight of the group
     * @param operatorConfig config that receives the computed fraction
     */
    private static void setManagedMemoryFractionForOperator(ResourceSpec operatorResourceSpec, ResourceSpec groupResourceSpec, int operatorManagedMemoryWeight, int groupManagedMemoryWeight, StreamConfig operatorConfig) {
        final double fraction;
        if (groupResourceSpec.equals((Object)ResourceSpec.UNKNOWN)) {
            // No explicit group resources: split by relative weight.
            if (groupManagedMemoryWeight > 0) {
                fraction = StreamingJobGraphGenerator.getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight);
            } else {
                fraction = 0.0;
            }
        } else {
            // Explicit group resources: split by declared managed memory size.
            final long groupBytes = groupResourceSpec.getManagedMemory().getBytes();
            if (groupBytes > 0L) {
                fraction = StreamingJobGraphGenerator.getFractionRoundedDown(operatorResourceSpec.getManagedMemory().getBytes(), groupBytes);
            } else {
                fraction = 0.0;
            }
        }
        operatorConfig.setManagedMemoryFraction(fraction);
    }

    /**
     * Divides {@code dividend} by {@code divisor} with 16 decimal places, truncating any
     * further digits (rounding towards zero), and returns the quotient as a {@code double}.
     *
     * @param dividend the numerator
     * @param divisor the denominator; must not be zero
     * @return the quotient, rounded down at the 16th decimal place
     * @throws ArithmeticException if {@code divisor} is zero
     */
    private static double getFractionRoundedDown(long dividend, long divisor) {
        // RoundingMode.DOWN replaces the deprecated int constant BigDecimal.ROUND_DOWN (== 1);
        // fully qualified so the file's import block does not need to change.
        return BigDecimal.valueOf(dividend)
                .divide(BigDecimal.valueOf(divisor), 16, java.math.RoundingMode.DOWN)
                .doubleValue();
    }

    /**
     * Translates the stream graph's checkpoint configuration into
     * {@code JobCheckpointingSettings} and installs them on the job graph.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Determine the checkpoint interval (values below 10 become {@code Long.MAX_VALUE}).
     *   <li>Collect trigger vertices (input vertices only) and ack/commit vertices (all).
     *   <li>Derive the retention policy from the externalized-checkpoint settings.
     *   <li>Collect and serialize master hooks from user functions that implement
     *       {@code WithMasterCheckpointHook}.
     *   <li>Serialize the state backend, if one is set.
     * </ol>
     *
     * @throws IllegalStateException if externalized checkpoints are enabled without a cleanup
     *     mode
     * @throws FlinkRuntimeException if the hooks or the state backend cannot be serialized
     */
    private void configureCheckpointing() {
        final CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();

        // NOTE(review): presumably Long.MAX_VALUE effectively disables periodic checkpoints
        // and 10 is the minimal supported interval - confirm against the coordinator config.
        final long configuredInterval = checkpointConfig.getCheckpointInterval();
        final long effectiveInterval = configuredInterval < 10L ? Long.MAX_VALUE : configuredInterval;

        // Only input vertices trigger; every vertex acknowledges and gets commit notifications.
        final int vertexCount = this.jobVertices.size();
        final List<JobVertexID> verticesToTrigger = new ArrayList<>();
        final List<JobVertexID> verticesToAck = new ArrayList<>(vertexCount);
        final List<JobVertexID> verticesToCommit = new ArrayList<>(vertexCount);
        for (JobVertex jobVertex : this.jobVertices.values()) {
            if (jobVertex.isInputVertex()) {
                verticesToTrigger.add(jobVertex.getID());
            }
            verticesToCommit.add(jobVertex.getID());
            verticesToAck.add(jobVertex.getID());
        }

        final CheckpointRetentionPolicy retentionPolicy;
        if (!checkpointConfig.isExternalizedCheckpointsEnabled()) {
            retentionPolicy = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        } else {
            final CheckpointConfig.ExternalizedCheckpointCleanup cleanupMode = checkpointConfig.getExternalizedCheckpointCleanup();
            if (cleanupMode == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            if (cleanupMode.deleteOnCancellation()) {
                retentionPolicy = CheckpointRetentionPolicy.RETAIN_ON_FAILURE;
            } else {
                retentionPolicy = CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
            }
        }

        // Gather a hook factory for every UDF operator whose user function provides a hook.
        final List<FunctionMasterCheckpointHookFactory> hookFactories = new ArrayList<>();
        for (StreamNode streamNode : this.streamGraph.getStreamNodes()) {
            if (streamNode.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
                final org.apache.flink.api.common.functions.Function userFunction = ((UdfStreamOperatorFactory)streamNode.getOperatorFactory()).getUserFunction();
                if (userFunction instanceof WithMasterCheckpointHook) {
                    hookFactories.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook)userFunction));
                }
            }
        }

        // Stays null when there are no hooks.
        SerializedValue serializedHooks = null;
        if (!hookFactories.isEmpty()) {
            try {
                final MasterTriggerRestoreHook.Factory[] hookArray = hookFactories.toArray(new MasterTriggerRestoreHook.Factory[hookFactories.size()]);
                serializedHooks = new SerializedValue<>(hookArray);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", (Throwable)e);
            }
        }

        // Stays null when the stream graph has no state backend.
        SerializedValue serializedStateBackend = null;
        if (this.streamGraph.getStateBackend() != null) {
            try {
                serializedStateBackend = new SerializedValue<>(this.streamGraph.getStateBackend());
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", (Throwable)e);
            }
        }

        final CheckpointCoordinatorConfiguration coordinatorConfiguration = CheckpointCoordinatorConfiguration.builder()
                .setCheckpointInterval(effectiveInterval)
                .setCheckpointTimeout(checkpointConfig.getCheckpointTimeout())
                .setMinPauseBetweenCheckpoints(checkpointConfig.getMinPauseBetweenCheckpoints())
                .setMaxConcurrentCheckpoints(checkpointConfig.getMaxConcurrentCheckpoints())
                .setCheckpointRetentionPolicy(retentionPolicy)
                .setExactlyOnce(this.getCheckpointingMode(checkpointConfig) == CheckpointingMode.EXACTLY_ONCE)
                .setPreferCheckpointForRecovery(checkpointConfig.isPreferCheckpointForRecovery())
                .setTolerableCheckpointFailureNumber(checkpointConfig.getTolerableCheckpointFailureNumber())
                .setUnalignedCheckpointsEnabled(checkpointConfig.isUnalignedCheckpointsEnabled())
                .build();

        final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(verticesToTrigger, verticesToAck, verticesToCommit, coordinatorConfiguration, serializedStateBackend, serializedHooks);
        this.jobGraph.setSnapshotSettings(checkpointingSettings);
    }

    /**
     * Bookkeeping for a single operator chain: the node the chain starts at, the hash pairs of
     * the operators added to it, and the coordinator providers contributed by those operators.
     *
     * <p>The hash maps and the stream graph are shared by reference with every chain created
     * through {@link #newChain(Integer)}; the chained-operator hashes and coordinator providers
     * are per instance.
     */
    private static class OperatorChainInfo {
        /** Id of the stream node at which this chain starts. */
        private final Integer startNodeId;
        /** Primary hash bytes per stream node id. */
        private final Map<Integer, byte[]> hashes;
        /** Additional hash maps; each contributes one (primary, legacy) pair per added node. */
        private final List<Map<Integer, byte[]>> legacyHashes;
        /** (primary, legacy) hash pairs of the operators added so far, keyed by start node id. */
        private final Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes;
        /** Coordinator providers of the operators added to this chain, in insertion order. */
        private final List<OperatorCoordinator.Provider> coordinatorProviders;
        private final StreamGraph streamGraph;

        private OperatorChainInfo(int startNodeId, Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, StreamGraph streamGraph) {
            this.startNodeId = startNodeId;
            this.hashes = hashes;
            this.legacyHashes = legacyHashes;
            this.chainedOperatorHashes = new HashMap<>();
            this.coordinatorProviders = new ArrayList<>();
            this.streamGraph = streamGraph;
        }

        /** Returns the primary hash of the given stream node, or {@code null} if none is known. */
        byte[] getHash(Integer streamNodeId) {
            return this.hashes.get(streamNodeId);
        }

        private Integer getStartNodeId() {
            return this.startNodeId;
        }

        /**
         * Returns the hash pairs recorded under the given start node id, or {@code null} if no
         * operator has been added under that id.
         */
        private List<Tuple2<byte[], byte[]>> getChainedOperatorHashes(int startNodeId) {
            return this.chainedOperatorHashes.get(startNodeId);
        }

        private List<OperatorCoordinator.Provider> getCoordinatorProviders() {
            return this.coordinatorProviders;
        }

        /**
         * Records the given node as part of this chain.
         *
         * <p>For every legacy hash map, a (primary hash, legacy hash) pair is appended to the
         * list kept under this chain's start node. If the node supplies a coordinator provider
         * for the given operator name, it is appended to this chain's providers.
         *
         * @param currentNodeId id of the stream node being added
         * @param operatorName name passed on when asking the node for a coordinator provider
         * @return the operator id built from the node's primary hash
         */
        private OperatorID addNodeToChain(int currentNodeId, String operatorName) {
            final List<Tuple2<byte[], byte[]>> operatorHashes = this.chainedOperatorHashes.computeIfAbsent(this.startNodeId, k -> new ArrayList<>());
            final byte[] primaryHashBytes = this.hashes.get(currentNodeId);
            for (Map<Integer, byte[]> legacyHash : this.legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }
            // ifPresent rather than map: the call is made purely for its side effect, and the
            // mapped result was discarded anyway.
            this.streamGraph.getStreamNode(currentNodeId)
                    .getCoordinatorProvider(operatorName, new OperatorID(primaryHashBytes))
                    .ifPresent(this.coordinatorProviders::add);
            return new OperatorID(primaryHashBytes);
        }

        /**
         * Creates the info object for a new chain starting at the given node. It reuses this
         * instance's hash maps and stream graph but starts with empty per-chain state.
         */
        private OperatorChainInfo newChain(Integer startNodeId) {
            return new OperatorChainInfo(startNodeId, this.hashes, this.legacyHashes, this.streamGraph);
        }
    }
}

