/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.client;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerPersistence;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.EventReportUtil;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Chooses which peer of a remote instance to communicate with next.
 *
 * <p>The selector keeps a cached set of {@link PeerStatus} objects (restored from and saved to an
 * optional {@link PeerPersistence}), refreshes that cache through the {@link PeerStatusProvider}
 * when it is missing, empty or older than {@link #PEER_CACHE_MILLIS}, assigns every peer a
 * percentage weight derived from its flowfile count, and picks a peer at random in proportion
 * to those weights while skipping peers that are currently penalized.
 */
public class PeerSelector {
    private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class);

    /** Maximum age of the peer status cache before a refresh is needed: one minute, in milliseconds. */
    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.MINUTES);

    /** Optional store for the peer status cache; may be {@code null}, in which case nothing is restored or saved. */
    private final PeerPersistence peerPersistence;
    private final PeerStatusProvider peerStatusProvider;

    /** Maps a peer to the epoch-millisecond timestamp at which its penalty ends. */
    private final ConcurrentMap<PeerDescription, Long> peerPenaltyExpirations = new ConcurrentHashMap<>();
    private volatile PeerStatusCache peerStatusCache;
    private EventReporter eventReporter;

    /**
     * Creates a selector and immediately tries to restore a previously persisted peer status cache.
     *
     * @param peerStatusProvider source of the current transport protocol, remote instance URIs,
     *                           bootstrap peer and remote peer statuses
     * @param peerPersistence    store used to restore and save the cache; may be {@code null}
     */
    public PeerSelector(PeerStatusProvider peerStatusProvider, PeerPersistence peerPersistence) {
        this.peerStatusProvider = peerStatusProvider;
        this.peerPersistence = peerPersistence;
        restoreInitialPeerStatusCache();
    }

    /**
     * Loads the persisted cache, discarding it when the remote instance URIs or the transport
     * protocol it was recorded with differ from the provider's current values. An
     * {@link IOException} during restore is logged and leaves the cache unset.
     */
    private void restoreInitialPeerStatusCache() {
        try {
            PeerStatusCache restored = null;
            if (peerPersistence != null) {
                restored = peerPersistence.restore();
                if (restored != null) {
                    final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
                    final SiteToSiteTransportProtocol cachedProtocol = restored.getTransportProtocol();
                    final String currentRemoteInstanceUris = peerStatusProvider.getRemoteInstanceUris();
                    final String cachedRemoteInstanceUris = restored.getRemoteInstanceUris();

                    if (!currentRemoteInstanceUris.equals(cachedRemoteInstanceUris)) {
                        logger.info("Discard stored peer statuses in {} because remote instance URIs has changed from {} to {}",
                                peerPersistence.getClass().getSimpleName(), cachedRemoteInstanceUris, currentRemoteInstanceUris);
                        restored = null;
                    }
                    // Both checks always run so that each mismatch gets its own log line.
                    if (!currentProtocol.equals(cachedProtocol)) {
                        logger.warn("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
                                peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
                        restored = null;
                    }
                }
            }
            this.peerStatusCache = restored;
        } catch (final IOException ioe) {
            // Only restore() declares IOException here, so peerPersistence is non-null at this point.
            logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file",
                    peerPersistence.getClass().getSimpleName(), ioe);
        }
    }

    /**
     * Computes a peer's weight as a percentage floored to two decimal places.
     *
     * <p>A single peer always gets {@code 100.0}. When no flowfiles are counted at all, every peer
     * gets an equal share. Otherwise the weight is the peer's share of all flowfiles, except for
     * {@link TransferDirection#SEND}, where the complement of that share is spread over the other
     * peers so that peers holding fewer flowfiles receive a larger weight.
     *
     * @param direction          the transfer direction
     * @param totalFlowFileCount sum of the flowfile counts of all peers
     * @param flowFileCount      flowfile count of this peer
     * @param peerCount          number of peers being weighted
     * @return the weight in percent, floored to two decimals
     */
    private static double calculateNormalizedWeight(TransferDirection direction, long totalFlowFileCount, int flowFileCount, int peerCount) {
        if (peerCount == 1) {
            return 100.0;
        }

        final double cappedPercent;
        if (totalFlowFileCount == 0L) {
            cappedPercent = 1.0 / (double) peerCount;
        } else {
            final double percentageOfFlowFiles = (double) flowFileCount / (double) totalFlowFileCount;
            cappedPercent = direction == TransferDirection.SEND
                    ? (1.0 - percentageOfFlowFiles) / (double) (peerCount - 1)
                    : percentageOfFlowFiles;
        }
        return new BigDecimal(cappedPercent * 100.0).setScale(2, RoundingMode.FLOOR).doubleValue();
    }

    /**
     * Returns a new map containing the entries of {@code unsortedMap} ordered from highest to lowest weight.
     *
     * @param unsortedMap peers mapped to their weights
     * @return an insertion-ordered map with the heaviest peer first
     */
    private static LinkedHashMap<PeerStatus, Double> sortMapByWeight(Map<PeerStatus, Double> unsortedMap) {
        final ArrayList<Map.Entry<PeerStatus, Double>> entries = new ArrayList<>(unsortedMap.entrySet());
        // Sort ascending and then reverse (rather than sorting with a reversed comparator) so that
        // entries of equal weight keep the same relative order as before.
        entries.sort(Map.Entry.comparingByValue());
        Collections.reverse(entries);

        final LinkedHashMap<PeerStatus, Double> result = new LinkedHashMap<>();
        for (final Map.Entry<PeerStatus, Double> entry : entries) {
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    /**
     * Logs, at debug level only, the percentage of data each peer is expected to send or receive.
     *
     * @param sortedPeerWorkloads peers mapped to their weights; ignored when {@code null}
     * @param direction           the transfer direction, used to pick the verb in the message
     */
    private static void printDistributionStatistics(Map<PeerStatus, Double> sortedPeerWorkloads, TransferDirection direction) {
        if (!logger.isDebugEnabled() || sortedPeerWorkloads == null) {
            return;
        }

        final DecimalFormat df = new DecimalFormat("##.##");
        df.setRoundingMode(RoundingMode.FLOOR);
        final String verb = direction == TransferDirection.RECEIVE ? " send " : " receive ";

        final StringBuilder distributionDescription = new StringBuilder("New weighted distribution of nodes:");
        for (final Map.Entry<PeerStatus, Double> entry : sortedPeerWorkloads.entrySet()) {
            final double percentage = entry.getValue();
            distributionDescription.append("\n")
                    .append(entry.getKey())
                    .append(" will")
                    .append(verb)
                    .append(df.format(percentage))
                    .append("% of data");
        }
        logger.debug(distributionDescription.toString());
    }

    /**
     * Sums all weights in the given map.
     *
     * @param peerWeightMap peers mapped to their weights
     * @return the total of all values
     */
    private static double sumMapValues(Map<PeerStatus, Double> peerWeightMap) {
        return peerWeightMap.values().stream().mapToDouble(Double::doubleValue).sum();
    }

    /** Removes all recorded peer penalties. */
    public void clear() {
        peerPenaltyExpirations.clear();
    }

    /**
     * Picks the next peer to use, refreshing the peer status cache first if needed.
     *
     * @param direction the transfer direction used to weight the peers
     * @return the selected peer, or {@code null} when no unpenalized peer is available
     */
    public PeerStatus getNextPeerStatus(TransferDirection direction) {
        final Set<PeerStatus> peerStatuses = getPeerStatuses();
        final LinkedHashMap<PeerStatus, Double> orderedPeerStatuses = buildWeightedPeerMap(peerStatuses, direction);
        return getAvailablePeerStatus(orderedPeerStatuses);
    }

    /**
     * Tells whether the given peer has a penalty that has not yet expired.
     *
     * @param peerStatus the peer to check
     * @return {@code true} if a penalty expiration is recorded and lies in the future
     */
    public boolean isPenalized(PeerStatus peerStatus) {
        final Long expirationEnd = peerPenaltyExpirations.get(peerStatus.getPeerDescription());
        return expirationEnd != null && expirationEnd > System.currentTimeMillis();
    }

    /**
     * Penalizes the given peer for the given duration.
     *
     * @param peer               the peer to penalize
     * @param penalizationMillis penalty duration in milliseconds, counted from now
     */
    public void penalize(Peer peer, long penalizationMillis) {
        penalize(peer.getDescription(), penalizationMillis);
    }

    /**
     * Penalizes the described peer for the given duration. An existing penalty is never
     * shortened: the later of the existing and the new expiration is kept.
     *
     * @param peerDescription    the peer to penalize
     * @param penalizationMillis penalty duration in milliseconds, counted from now
     */
    public void penalize(PeerDescription peerDescription, long penalizationMillis) {
        final long candidateExpiration = System.currentTimeMillis() + penalizationMillis;
        // merge() performs the read-compare-write as a single atomic map operation.
        peerPenaltyExpirations.merge(peerDescription, candidateExpiration, Long::max);
    }

    /**
     * Refreshes the peer status cache if it is missing, empty or expired; otherwise does nothing
     * beyond debug logging.
     */
    public void refresh() {
        final long cacheAgeMs = getCacheAge();
        logger.debug("External refresh triggered. Last refresh was {} ms ago", cacheAgeMs);
        if (isPeerRefreshNeeded()) {
            logger.debug("Refreshing peer status cache");
            refreshPeerStatusCache();
        } else {
            logger.debug("Cache is still valid; skipping refresh");
        }
    }

    /**
     * Sets the reporter used when refresh or persistence problems are reported.
     *
     * @param eventReporter the reporter to use
     */
    public void setEventReporter(EventReporter eventReporter) {
        this.eventReporter = eventReporter;
    }

    /**
     * Weights the given peers for the given direction and orders them from highest to lowest weight.
     *
     * @param statuses  the peers to weight
     * @param direction the transfer direction
     * @return peers mapped to their weights, heaviest first; empty when there are no peers
     */
    LinkedHashMap<PeerStatus, Double> buildWeightedPeerMap(Set<PeerStatus> statuses, TransferDirection direction) {
        final Map<PeerStatus, Double> peerWorkloads = createDestinationMap(statuses, direction);
        if (peerWorkloads.isEmpty()) {
            logger.debug("No peers available");
            return new LinkedHashMap<>();
        }

        final LinkedHashMap<PeerStatus, Double> sortedPeerWorkloads = sortMapByWeight(peerWorkloads);
        printDistributionStatistics(sortedPeerWorkloads, direction);
        return sortedPeerWorkloads;
    }

    /**
     * Maps every given peer to its normalized weight.
     *
     * @param peerStatuses the peers to weight
     * @param direction    the transfer direction
     * @return an unordered map of peer to weight in percent
     */
    @NotNull
    private Map<PeerStatus, Double> createDestinationMap(Set<PeerStatus> peerStatuses, TransferDirection direction) {
        final Map<PeerStatus, Double> peerWorkloads = new HashMap<>();
        final long totalFlowFileCount = peerStatuses.stream().mapToLong(PeerStatus::getFlowFileCount).sum();
        logger.debug("Building weighted map of peers with total remote NiFi flowfile count: {}", totalFlowFileCount);

        final int peerCount = peerStatuses.size();
        for (final PeerStatus nodeInfo : peerStatuses) {
            final double normalizedWeight = calculateNormalizedWeight(direction, totalFlowFileCount, nodeInfo.getFlowFileCount(), peerCount);
            peerWorkloads.put(nodeInfo, normalizedWeight);
        }
        return peerWorkloads;
    }

    /**
     * Asks each of the given peers for the statuses of the remote peers and merges the answers,
     * keeping only statuses flagged as queryable for peers. A failure talking to one peer is
     * logged and the remaining peers are still tried.
     *
     * @param peersToRequestClusterInfoFrom the peers to ask
     * @return the union of all queryable peer statuses that were retrieved
     * @throws IOException if nothing was retrieved and at least one request failed; the last
     *                     failure is attached as the cause
     */
    private Set<PeerStatus> fetchRemotePeerStatuses(Set<PeerDescription> peersToRequestClusterInfoFrom) throws IOException {
        logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom);
        Exception lastFailure = null;
        final Set<PeerStatus> allPeerStatuses = new HashSet<>();

        for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
            try {
                final Set<PeerStatus> statusesForPeerDescription = peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
                final Set<PeerStatus> queryableStatuses = statusesForPeerDescription.stream()
                        .filter(PeerStatus::isQueryForPeers)
                        .collect(Collectors.toSet());
                allPeerStatuses.addAll(queryableStatuses);
            } catch (final Exception e) {
                logger.warn("Could not communicate with {}:{} to determine which node(s) exist in the remote NiFi instance, due to {}",
                        peerDescription.getHostname(), peerDescription.getPort(), e.toString());
                lastFailure = e;
            }
        }

        if (allPeerStatuses.isEmpty() && lastFailure != null) {
            throw new IOException("Unable to retrieve nodes from remote instance", lastFailure);
        }
        return allPeerStatuses;
    }

    /**
     * Picks one unpenalized peer at random, with probability proportional to its weight.
     *
     * @param orderedPeerStatuses peers mapped to their weights, in the order they should be considered
     * @return the selected peer, or {@code null} when the input is {@code null}/empty or every
     *         peer is penalized
     */
    private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double> orderedPeerStatuses) {
        if (orderedPeerStatuses == null || orderedPeerStatuses.isEmpty()) {
            logger.warn("Available peers collection is empty; no peer available");
            return null;
        }

        // Keep the caller's ordering while dropping penalized peers.
        final Map<PeerStatus, Double> unpenalizedPeers = new LinkedHashMap<>();
        for (final Map.Entry<PeerStatus, Double> entry : orderedPeerStatuses.entrySet()) {
            if (!isPenalized(entry.getKey())) {
                unpenalizedPeers.put(entry.getKey(), entry.getValue());
            }
        }

        final double totalWeights = sumMapValues(unpenalizedPeers);
        logger.debug("Determining next available peer ({} peers with total weight {})", unpenalizedPeers.keySet().size(), totalWeights);

        final double random = Math.random() * Math.min(100.0, totalWeights);
        logger.debug("Generated random value {}", random);

        double threshold = 0.0;
        PeerStatus lastCandidate = null;
        for (final Map.Entry<PeerStatus, Double> entry : unpenalizedPeers.entrySet()) {
            final double weight = entry.getValue();
            logger.debug("Initial threshold was {}; added peer value {}; total {}", threshold, entry.getValue(), threshold + weight);
            threshold += weight;
            if (random <= threshold) {
                return entry.getKey();
            }
            lastCandidate = entry.getKey();
        }

        logger.debug("Did not select a peer; r {}, t {}, w {}", random, threshold, orderedPeerStatuses.values());
        if (lastCandidate != null) {
            // The running threshold is accumulated differently from totalWeights, so rounding can
            // leave the random value marginally above the final threshold. The value then belongs
            // to the last bucket; return that peer instead of reporting that none is available.
            return lastCandidate;
        }
        logger.debug("All peers appear to be penalized; returning null");
        return null;
    }

    /**
     * Returns how long ago the cache was populated.
     *
     * @return the cache age in milliseconds, or {@code -1} when there is no cache
     */
    private long getCacheAge() {
        final PeerStatusCache cache = peerStatusCache;
        if (cache == null) {
            return -1L;
        }
        return System.currentTimeMillis() - cache.getTimestamp();
    }

    /**
     * Returns the statuses held by the current cache.
     *
     * @return the cached statuses, or an empty set when there is no cache
     */
    @NotNull
    private Set<PeerStatus> getLastFetchedQueryablePeers() {
        final PeerStatusCache cache = peerStatusCache;
        if (cache == null) {
            return Collections.emptySet();
        }
        return cache.getStatuses();
    }

    /**
     * Returns the cached peer statuses, refreshing the cache first when a refresh is needed.
     *
     * @return the peer statuses held by the cache after the optional refresh
     */
    @NotNull
    private Set<PeerStatus> getPeerStatuses() {
        if (isPeerRefreshNeeded()) {
            refreshPeerStatusCache();
        }
        return getLastFetchedQueryablePeers();
    }

    /**
     * Builds the set of peers to ask for remote peer statuses: every peer in the current cache
     * plus the provider's bootstrap peer.
     *
     * @return the peers to query
     * @throws IOException if the bootstrap peer description cannot be obtained
     */
    private Set<PeerDescription> getPeersToQuery() throws IOException {
        final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
        final Set<PeerStatus> lastFetched = getLastFetchedQueryablePeers();
        if (lastFetched != null && !lastFetched.isEmpty()) {
            for (final PeerStatus peerStatus : lastFetched) {
                peersToRequestClusterInfoFrom.add(peerStatus.getPeerDescription());
            }
        }
        peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
        return peersToRequestClusterInfoFrom;
    }

    /**
     * Tells whether the given cache is absent or older than {@link #PEER_CACHE_MILLIS}.
     *
     * @param cache the cache to check; may be {@code null}
     * @return {@code true} if the cache is {@code null} or expired
     */
    private boolean isCacheExpired(PeerStatusCache cache) {
        return cache == null || cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis();
    }

    /**
     * Tells whether the cache must be refreshed before it is used.
     *
     * @return {@code true} if there is no cache, the cache is empty, or it has expired
     */
    private boolean isPeerRefreshNeeded() {
        final PeerStatusCache cache = peerStatusCache;
        return cache == null || cache.isEmpty() || isCacheExpired(cache);
    }

    /**
     * Installs the given cache as the current one and, when a persistence is configured, saves it.
     * A failure to save is reported and logged but does not undo the in-memory update.
     *
     * @param peerStatusCache the freshly built cache
     */
    private void persistPeerStatuses(PeerStatusCache peerStatusCache) {
        this.peerStatusCache = peerStatusCache;
        if (peerPersistence == null) {
            // Persistence is optional (see restoreInitialPeerStatusCache); nothing to save to.
            return;
        }
        try {
            peerPersistence.save(peerStatusCache);
        } catch (final IOException e) {
            EventReportUtil.error(logger, eventReporter, "Failed to persist list of peers due to {}; if restarted and the nodes specified at the remote instance are down, may be unable to transfer data until communications with those nodes are restored", e.toString());
            logger.error("", e);
        }
    }

    /**
     * Queries the known peers for the current remote peer statuses and replaces the cache with
     * the result. Any failure is reported as a warning and leaves the existing cache in place.
     */
    private void refreshPeerStatusCache() {
        try {
            final Set<PeerDescription> peersToQuery = getPeersToQuery();
            final Set<PeerStatus> statuses = fetchRemotePeerStatuses(peersToQuery);
            if (statuses.isEmpty()) {
                logger.info("No peers were retrieved from the remote group {}",
                        peersToQuery.stream().map(p -> p.getHostname() + ":" + p.getPort()).collect(Collectors.joining(",")));
            }

            final PeerStatusCache refreshedCache = new PeerStatusCache(statuses, System.currentTimeMillis(),
                    peerStatusProvider.getRemoteInstanceUris(), peerStatusProvider.getTransportProtocol());
            persistPeerStatuses(refreshedCache);
            logger.info("Successfully refreshed peer status cache; remote group consists of {} peers", statuses.size());
        } catch (final Exception e) {
            EventReportUtil.warn(logger, eventReporter, "Unable to refresh remote group peers due to: {}", e.getMessage());
            // The stack trace is only emitted when debug logging is on and there is an underlying cause.
            if (logger.isDebugEnabled() && e.getCause() != null) {
                logger.warn("Caused by: ", e);
            }
        }
    }
}

