/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.src.assigners;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class LocalityAwareSplitAssigner
implements FileSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(LocalityAwareSplitAssigner.class);
    private final HashSet<SplitWithInfo> unassigned = new HashSet();
    private final HashMap<String, LocatableSplitChooser> localPerHost = new HashMap();
    private final LocatableSplitChooser remoteSplitChooser;
    private final SimpleCounter localAssignments;
    private final SimpleCounter remoteAssignments;

    /**
     * Creates an assigner over the given initial splits.
     *
     * <p>Every split is wrapped in a {@code SplitWithInfo} and recorded as unassigned. The chooser
     * that serves non-local requests is then seeded from that unassigned set, and both assignment
     * counters start out fresh.
     *
     * @param splits the splits that are initially available for assignment
     */
    public LocalityAwareSplitAssigner(Collection<FileSourceSplit> splits) {
        this.localAssignments = new SimpleCounter();
        this.remoteAssignments = new SimpleCounter();
        splits.stream().map(SplitWithInfo::new).forEach(this.unassigned::add);
        // Seed the remote chooser from the set (not from the input collection) so that it sees
        // the wrappers in the set's own iteration order.
        this.remoteSplitChooser = new LocatableSplitChooser(this.unassigned);
    }

    /**
     * Hands out the next split, preferring one that is local to the requesting host.
     *
     * <p>If {@code host} is null or whitespace-only, the request is served straight from the
     * remote chooser. Otherwise the host name is normalized, a per-host chooser is looked up (and
     * built on first use from the currently unassigned splits), and a local split is returned if
     * one is available. When the host has no local split left, the remote chooser is used as a
     * fallback.
     *
     * @param host the name of the requesting host, may be {@code null}
     * @return the assigned split, or an empty {@code Optional} if no split is left
     * @throws IllegalStateException if the chosen local split was not in the unassigned set
     */
    @Override
    public Optional<FileSourceSplit> getNext(@Nullable String host) {
        if (StringUtils.isNullOrWhitespaceOnly(host)) {
            final Optional<FileSourceSplit> split = this.getRemoteSplit();
            // Log the split itself rather than the Optional wrapper ("Optional[...]").
            split.ifPresent(s -> LOG.info("Assigning split to non-localized request: {}", s));
            return split;
        }

        // Keep the caller's argument untouched; all lookups and log lines use the normalized name.
        final String normalizedHost = LocalityAwareSplitAssigner.normalizeHostName(host);
        final LocatableSplitChooser localSplits =
                this.localPerHost.computeIfAbsent(
                        normalizedHost,
                        theHost ->
                                LocalityAwareSplitAssigner.buildChooserForHost(
                                        theHost, this.unassigned));

        final SplitWithInfo localSplit =
                localSplits.getNextUnassignedMinLocalCountSplit(this.unassigned);
        if (localSplit != null) {
            Preconditions.checkState(
                    this.unassigned.remove(localSplit),
                    (Object) "Selected split has already been assigned. This should not happen!");
            LOG.info(
                    "Assigning local split to requesting host '{}': {}",
                    normalizedHost,
                    localSplit.getSplit());
            this.localAssignments.inc();
            return Optional.of(localSplit.getSplit());
        }

        // Nothing local is left for this host: fall back to any remaining split.
        final Optional<FileSourceSplit> remoteSplit = this.getRemoteSplit();
        remoteSplit.ifPresent(
                s ->
                        LOG.info(
                                "Assigning remote split to requesting host '{}': {}",
                                normalizedHost,
                                s));
        return remoteSplit;
    }

    /**
     * Adds further splits to the pool of unassigned splits.
     *
     * <p>Each split is wrapped in a new {@code SplitWithInfo}. Hosts that already requested splits
     * have a chooser cached in {@code localPerHost} which is only built once, so the new split is
     * also registered with every cached chooser whose host it is local to (bumping its local
     * count accordingly). Without this step such hosts could only ever receive the new split
     * through the remote path. Hosts without a cached chooser pick the split up when their chooser
     * is built from the unassigned set on first request.
     *
     * @param splits the splits to make available for assignment
     */
    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        for (FileSourceSplit split : splits) {
            final SplitWithInfo info = new SplitWithInfo(split);
            final String[] splitHosts = info.getNormalizedHosts();

            // Mirror what buildChooserForHost does for splits known at chooser-creation time.
            this.localPerHost.forEach(
                    (knownHost, chooser) -> {
                        if (LocalityAwareSplitAssigner.isLocal(knownHost, splitHosts)) {
                            info.incrementLocalCount();
                            chooser.addInputSplit(info);
                        }
                    });

            this.remoteSplitChooser.addInputSplit(info);
            this.unassigned.add(info);
        }
    }

    /**
     * Returns the splits that have not been assigned yet.
     *
     * @return a newly collected list holding the underlying split of every unassigned entry
     */
    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        return this.unassigned.stream()
                .collect(Collectors.mapping(SplitWithInfo::getSplit, Collectors.toList()));
    }

    /**
     * Takes the next split from the remote chooser, removes it from the unassigned set and counts
     * it as a remote assignment.
     *
     * @return the chosen split, or an empty {@code Optional} if the chooser has nothing to offer
     * @throws IllegalStateException if the chosen split was not in the unassigned set
     */
    private Optional<FileSourceSplit> getRemoteSplit() {
        final SplitWithInfo candidate =
                this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
        if (candidate != null) {
            final boolean wasUnassigned = this.unassigned.remove(candidate);
            Preconditions.checkState(
                    wasUnassigned,
                    (Object) "Selected split has already been assigned. This should not happen!");
            this.remoteAssignments.inc();
            return Optional.of(candidate.getSplit());
        }
        return Optional.empty();
    }

    /**
     * Reports how many splits have been handed out through the local path so far.
     *
     * @return the local assignment counter, narrowed to {@code int} via
     *     {@code MathUtils.checkedDownCast}
     */
    @VisibleForTesting
    int getNumberOfLocalAssignments() {
        final long assignedLocally = this.localAssignments.getCount();
        return MathUtils.checkedDownCast(assignedLocally);
    }

    /**
     * Reports how many splits have been handed out through the remote path so far.
     *
     * @return the remote assignment counter, narrowed to {@code int} via
     *     {@code MathUtils.checkedDownCast}
     */
    @VisibleForTesting
    int getNumberOfRemoteAssignments() {
        final long assignedRemotely = this.remoteAssignments.getCount();
        return MathUtils.checkedDownCast(assignedRemotely);
    }

    /**
     * Brings a host name into the canonical form used for locality comparisons.
     *
     * <p>The name is passed through {@code NetUtils.getHostnameFromFQDN} and then lower-cased with
     * {@code Locale.US}.
     *
     * @param hostName the host name to normalize, may be {@code null}
     * @return the normalized name, or {@code null} if {@code hostName} is {@code null}
     */
    static String normalizeHostName(String hostName) {
        if (hostName == null) {
            return null;
        }
        final String shortName = NetUtils.getHostnameFromFQDN(hostName);
        return shortName.toLowerCase(Locale.US);
    }

    /**
     * Normalizes every entry of the given host name array.
     *
     * <p>A new array is only returned if normalization produced a different {@code String}
     * instance for at least one entry; otherwise the input array itself is handed back.
     *
     * @param hostNames the host names to normalize, may be {@code null}
     * @return the normalized host names (possibly the same array), or {@code null} for
     *     {@code null} input
     */
    static String[] normalizeHostNames(String[] hostNames) {
        if (hostNames == null) {
            return null;
        }

        final String[] result = new String[hostNames.length];
        boolean anyReplaced = false;
        int idx = 0;
        for (String raw : hostNames) {
            final String cleaned = LocalityAwareSplitAssigner.normalizeHostName(raw);
            result[idx++] = cleaned;
            // Deliberate reference comparison: only checks whether a new instance was produced.
            if (cleaned != raw) {
                anyReplaced = true;
            }
        }

        if (anyReplaced) {
            return result;
        }
        return hostNames;
    }

    /**
     * Checks whether the given host appears among a split's hosts.
     *
     * @param flinkHost the host to look for, may be {@code null}
     * @param hosts the hosts of the split, may be {@code null} and may contain {@code null}
     *     entries
     * @return {@code true} if some entry of {@code hosts} equals {@code flinkHost}; {@code false}
     *     otherwise, including when either argument is {@code null}
     */
    private static boolean isLocal(String flinkHost, String[] hosts) {
        if (flinkHost != null && hosts != null) {
            for (int i = 0; i < hosts.length; i++) {
                // flinkHost is non-null here, so null entries simply compare as unequal.
                if (flinkHost.equals(hosts[i])) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Builds the chooser holding all splits that are local to the given host.
     *
     * <p>Every split whose normalized hosts contain {@code host} gets its local count incremented
     * and is then added to the new chooser.
     *
     * @param host the (normalized) host to build the chooser for
     * @param splits the splits to pick the local ones from
     * @return a new chooser containing the splits local to {@code host}
     */
    private static LocatableSplitChooser buildChooserForHost(String host, Set<SplitWithInfo> splits) {
        final LocatableSplitChooser hostChooser = new LocatableSplitChooser();
        splits.forEach(
                candidate -> {
                    final boolean local =
                            LocalityAwareSplitAssigner.isLocal(host, candidate.getNormalizedHosts());
                    if (local) {
                        // Count first, so the chooser sees the count that includes this host.
                        candidate.incrementLocalCount();
                        hostChooser.addInputSplit(candidate);
                    }
                });
        return hostChooser;
    }

    /**
     * Queue of splits that hands out, among the splits still unassigned, those with the smallest
     * local count first.
     *
     * <p>The queue is processed in cycles. {@code minLocalCount} is the local count served in the
     * current cycle, {@code elementCycleCount} is the number of elements at the head of the queue
     * that still have to be inspected in this cycle, and {@code nextMinLocalCount} tracks the
     * smallest local count seen so far among the elements deferred to a later cycle. The value
     * {@code -1} means "unknown" for both count fields.
     *
     * <p>Local counts of queued splits may grow after insertion, so the count is re-read whenever
     * an element is polled.
     */
    private static class LocatableSplitChooser {
        /** The queued splits; elements of the current cycle sit at the head. */
        private final LinkedList<SplitWithInfo> splits = new LinkedList<>();

        /** Local count served in the current cycle, {@code -1} if unknown. */
        private int minLocalCount = -1;

        /** Smallest local count among deferred elements, {@code -1} if unknown. */
        private int nextMinLocalCount = -1;

        /** Number of head elements left to inspect in the current cycle. */
        private int elementCycleCount = 0;

        /** Creates an empty chooser. */
        LocatableSplitChooser() {
        }

        /**
         * Creates a chooser that contains the given splits.
         *
         * @param splits the splits to add, in the collection's iteration order
         */
        LocatableSplitChooser(Collection<SplitWithInfo> splits) {
            for (SplitWithInfo split : splits) {
                this.addInputSplit(split);
            }
        }

        /**
         * Adds a split to the queue, positioned according to its current local count.
         *
         * @param split the split to add
         */
        void addInputSplit(SplitWithInfo split) {
            final int localCount = split.getLocalCount();
            if (this.minLocalCount == -1) {
                // No minimum known yet: this split defines it and starts a one-element cycle.
                this.minLocalCount = localCount;
                this.elementCycleCount = 1;
                this.splits.offerFirst(split);
            } else if (localCount < this.minLocalCount) {
                // New minimum: the previous minimum becomes the candidate for the next cycle.
                this.nextMinLocalCount = this.minLocalCount;
                this.minLocalCount = localCount;
                this.elementCycleCount = 1;
                this.splits.offerFirst(split);
            } else if (localCount == this.minLocalCount) {
                // Same as the current minimum: serve it within the current cycle.
                ++this.elementCycleCount;
                this.splits.offerFirst(split);
            } else {
                // Above the current minimum: defer it to a later cycle. It is queued behind the
                // current cycle and will not be inspected before the cycle ends, so its count
                // must be folded into nextMinLocalCount now - also when that value is still
                // unknown (-1), just like the re-queue path in
                // getNextUnassignedMinLocalCountSplit does.
                if (this.nextMinLocalCount == -1 || localCount < this.nextMinLocalCount) {
                    this.nextMinLocalCount = localCount;
                }
                this.splits.offerLast(split);
            }
        }

        /**
         * Returns the next queued split that is still unassigned and whose local count does not
         * exceed the minimum of the current cycle.
         *
         * <p>Polled elements that are no longer contained in {@code unassignedSplits} are dropped
         * from the queue. Elements whose local count exceeds the current minimum are moved to the
         * tail for a later cycle. The returned split is removed from this queue, but not from
         * {@code unassignedSplits}.
         *
         * @param unassignedSplits the splits that may still be handed out
         * @return the chosen split, or {@code null} if the queue holds no assignable split
         */
        @Nullable
        SplitWithInfo getNextUnassignedMinLocalCountSplit(Set<SplitWithInfo> unassignedSplits) {
            if (this.splits.isEmpty()) {
                return null;
            }
            do {
                --this.elementCycleCount;
                SplitWithInfo split = this.splits.pollFirst();
                if (unassignedSplits.contains(split)) {
                    final int localCount = split.getLocalCount();
                    if (localCount > this.minLocalCount) {
                        // The count grew since insertion (or belongs to a later cycle): defer.
                        this.splits.offerLast(split);
                        if (this.nextMinLocalCount == -1 || localCount < this.nextMinLocalCount) {
                            this.nextMinLocalCount = localCount;
                        }
                        split = null;
                    }
                } else {
                    // Already assigned through another chooser: drop it.
                    split = null;
                }
                if (this.elementCycleCount == 0) {
                    // Cycle finished: start the next one over everything that is still queued.
                    this.minLocalCount = this.nextMinLocalCount;
                    this.nextMinLocalCount = -1;
                    this.elementCycleCount = this.splits.size();
                }
                if (split != null) {
                    return split;
                }
            } while (this.elementCycleCount > 0);
            return null;
        }
    }

    /**
     * A file source split together with its normalized host names and a mutable local count.
     *
     * <p>The class does not override {@code equals}/{@code hashCode}, so hash-based collections
     * treat instances by identity.
     */
    private static class SplitWithInfo {
        /** The wrapped split. */
        private final FileSourceSplit split;

        /** The split's host names after normalization. */
        private final String[] normalizedHosts;

        /** Counter bumped via {@link #incrementLocalCount()}; starts at zero. */
        private int localCount;

        /**
         * Wraps the given split and normalizes its host names.
         *
         * @param split the split to wrap
         */
        public SplitWithInfo(FileSourceSplit split) {
            final String[] rawHosts = split.hostnames();
            this.split = split;
            this.normalizedHosts = LocalityAwareSplitAssigner.normalizeHostNames(rawHosts);
            this.localCount = 0;
        }

        /** Increases the local count by one. */
        public void incrementLocalCount() {
            this.localCount += 1;
        }

        /**
         * @return the current local count
         */
        public int getLocalCount() {
            return localCount;
        }

        /**
         * @return the wrapped split
         */
        public FileSourceSplit getSplit() {
            return split;
        }

        /**
         * @return the normalized host names of the wrapped split (the internal array, not a copy)
         */
        public String[] getNormalizedHosts() {
            return normalizedHosts;
        }
    }
}

