/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.CapacitySchedulerPreemptionContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.CapacitySchedulerPreemptionUtils;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.PreemptionCandidatesSelector;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TempQueuePerPartition;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TempSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public class QueuePriorityContainerCandidateSelector
extends PreemptionCandidatesSelector {
    private static final Log LOG = LogFactory.getLog(QueuePriorityContainerCandidateSelector.class);
    private long minTimeout;
    private boolean allowMoveReservation;
    private List<RMContainer> reservedContainers;
    private Table<String, String, Boolean> priorityDigraph = HashBasedTable.create();
    private Resource clusterResource;
    private Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates;
    private Resource totalPreemptionAllowed;
    private Map<NodeId, TempSchedulerNode> tempSchedulerNodeMap = new HashMap<NodeId, TempSchedulerNode>();
    private Set<NodeId> touchedNodes;
    private Table<String, String, Resource> toPreemptedFromOtherQueues = HashBasedTable.create();
    private final Comparator<RMContainer> CONTAINER_CREATION_TIME_COMPARATOR = new Comparator<RMContainer>(){

        @Override
        public int compare(RMContainer o1, RMContainer o2) {
            if (QueuePriorityContainerCandidateSelector.this.preemptionAllowed(o1.getQueueName(), o2.getQueueName())) {
                return -1;
            }
            if (QueuePriorityContainerCandidateSelector.this.preemptionAllowed(o2.getQueueName(), o1.getQueueName())) {
                return 1;
            }
            return Long.compare(o1.getCreationTime(), o2.getCreationTime());
        }
    };

    QueuePriorityContainerCandidateSelector(CapacitySchedulerPreemptionContext preemptionContext) {
        super(preemptionContext);
        CapacitySchedulerConfiguration csc = preemptionContext.getScheduler().getConfiguration();
        this.minTimeout = csc.getPUOrderingPolicyUnderUtilizedPreemptionDelay();
        this.allowMoveReservation = csc.getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation();
    }

    private List<TempQueuePerPartition> getPathToRoot(TempQueuePerPartition tq) {
        ArrayList<TempQueuePerPartition> list = new ArrayList<TempQueuePerPartition>();
        while (tq != null) {
            list.add(tq);
            tq = tq.parent;
        }
        return list;
    }

    private void intializePriorityDigraph() {
        LOG.info((Object)"Initializing priority preemption directed graph:");
        for (String q1 : this.preemptionContext.getLeafQueueNames()) {
            for (String q2 : this.preemptionContext.getLeafQueueNames()) {
                if (q1.compareTo(q2) >= 0) continue;
                TempQueuePerPartition tq1 = this.preemptionContext.getQueueByPartition(q1, "");
                TempQueuePerPartition tq2 = this.preemptionContext.getQueueByPartition(q2, "");
                List<TempQueuePerPartition> path1 = this.getPathToRoot(tq1);
                List<TempQueuePerPartition> path2 = this.getPathToRoot(tq2);
                int i = path1.size() - 1;
                int j = path2.size() - 1;
                while (path1.get((int)i).queueName.equals(path2.get((int)j).queueName)) {
                    --i;
                    --j;
                }
                int p1 = path1.get((int)i).relativePriority;
                int p2 = path2.get((int)j).relativePriority;
                if (p1 < p2) {
                    this.priorityDigraph.put((Object)q2, (Object)q1, (Object)true);
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.info((Object)("- Added priority ordering edge: " + q2 + " >> " + q1));
                    continue;
                }
                if (p2 >= p1) continue;
                this.priorityDigraph.put((Object)q1, (Object)q2, (Object)true);
                if (!LOG.isDebugEnabled()) continue;
                LOG.info((Object)("- Added priority ordering edge: " + q1 + " >> " + q2));
            }
        }
    }

    private boolean preemptionAllowed(String demandingQueue, String toBePreemptedQueue) {
        return this.priorityDigraph.contains((Object)demandingQueue, (Object)toBePreemptedQueue);
    }

    private boolean canPreemptEnoughResourceForAsked(Resource requiredResource, String demandingQueue, FiCaSchedulerNode schedulerNode, boolean lookingForNewReservationPlacement, List<RMContainer> newlySelectedContainers) {
        if (this.touchedNodes.contains(schedulerNode.getNodeID())) {
            return false;
        }
        TempSchedulerNode node = this.tempSchedulerNodeMap.get(schedulerNode.getNodeID());
        if (null == node) {
            node = TempSchedulerNode.fromSchedulerNode(schedulerNode);
            this.tempSchedulerNodeMap.put(schedulerNode.getNodeID(), node);
        }
        if (null != schedulerNode.getReservedContainer() && lookingForNewReservationPlacement) {
            return false;
        }
        Resource lacking = Resources.subtract((Resource)requiredResource, (Resource)Resources.subtract((Resource)node.getTotalResource(), (Resource)node.getAllocatedResource()));
        List<RMContainer> runningContainers = node.getRunningContainers();
        Collections.sort(runningContainers, this.CONTAINER_CREATION_TIME_COMPARATOR);
        for (RMContainer runningContainer : runningContainers) {
            if (!CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(runningContainer, this.selectedCandidates)) continue;
            Resources.subtractFrom((Resource)lacking, (Resource)runningContainer.getAllocatedResource());
        }
        if (Resources.fitsIn((ResourceCalculator)this.rc, (Resource)this.clusterResource, (Resource)lacking, (Resource)Resources.none())) {
            return true;
        }
        Resource allowed = Resources.clone((Resource)this.totalPreemptionAllowed);
        Resource selected = Resources.createResource((int)0);
        for (RMContainer runningContainer : runningContainers) {
            if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(runningContainer, this.selectedCandidates) || !this.preemptionAllowed(demandingQueue, runningContainer.getQueueName()) || runningContainer.isAMContainer()) continue;
            if (Resources.greaterThanOrEqual((ResourceCalculator)this.rc, (Resource)this.clusterResource, (Resource)allowed, (Resource)runningContainer.getAllocatedResource())) {
                Resources.subtractFrom((Resource)allowed, (Resource)runningContainer.getAllocatedResource());
                Resources.subtractFrom((Resource)lacking, (Resource)runningContainer.getAllocatedResource());
                Resources.addTo((Resource)selected, (Resource)runningContainer.getAllocatedResource());
                if (null != newlySelectedContainers) {
                    newlySelectedContainers.add(runningContainer);
                }
            }
            if (!Resources.fitsIn((ResourceCalculator)this.rc, (Resource)this.clusterResource, (Resource)lacking, (Resource)Resources.none())) continue;
            return true;
        }
        return false;
    }

    private boolean preChecksForMovingReservedContainerToNode(RMContainer reservedContainer, FiCaSchedulerNode newNode) {
        if (reservedContainer.getReservedSchedulerKey().getContainerToUpdate() != null) {
            return false;
        }
        FiCaSchedulerApp app = this.preemptionContext.getScheduler().getApplicationAttempt(reservedContainer.getApplicationAttemptId());
        if (!app.getAppSchedulingInfo().canDelayTo(reservedContainer.getReservedSchedulerKey(), "*")) {
            return false;
        }
        return StringUtils.equals((String)reservedContainer.getNodeLabelExpression(), (String)newNode.getPartition());
    }

    private void tryToMakeBetterReservationPlacement(RMContainer reservedContainer, List<FiCaSchedulerNode> allSchedulerNodes) {
        for (FiCaSchedulerNode targetNode : allSchedulerNodes) {
            if (!this.preChecksForMovingReservedContainerToNode(reservedContainer, targetNode) || !this.canPreemptEnoughResourceForAsked(reservedContainer.getReservedResource(), reservedContainer.getQueueName(), targetNode, true, null)) continue;
            NodeId fromNode = reservedContainer.getNodeId();
            if (!this.preemptionContext.getScheduler().moveReservedContainer(reservedContainer, targetNode)) continue;
            LOG.info((Object)("Successfully moved reserved container=" + reservedContainer.getContainerId() + " from targetNode=" + fromNode + " to targetNode=" + targetNode.getNodeID()));
            this.touchedNodes.add(targetNode.getNodeID());
        }
    }

    private boolean isQueueSatisfied(String demandingQueue, String partition) {
        TempQueuePerPartition tq = this.preemptionContext.getQueueByPartition(demandingQueue, partition);
        if (null == tq) {
            return false;
        }
        Resource guaranteed = tq.getGuaranteed();
        Resource usedDeductReservd = Resources.subtract((Resource)tq.getUsed(), (Resource)tq.getReserved());
        Resource markedToPreemptFromOtherQueue = (Resource)this.toPreemptedFromOtherQueues.get((Object)demandingQueue, (Object)partition);
        if (null == markedToPreemptFromOtherQueue) {
            markedToPreemptFromOtherQueue = Resources.none();
        }
        boolean flag = Resources.greaterThanOrEqual((ResourceCalculator)this.rc, (Resource)this.clusterResource, (Resource)Resources.add((Resource)usedDeductReservd, (Resource)markedToPreemptFromOtherQueue), (Resource)guaranteed);
        return flag;
    }

    private void incToPreempt(String queue, String partition, Resource allocated) {
        Resource total = (Resource)this.toPreemptedFromOtherQueues.get((Object)queue, (Object)partition);
        if (null == total) {
            total = Resources.createResource((int)0);
            this.toPreemptedFromOtherQueues.put((Object)queue, (Object)partition, (Object)total);
        }
        Resources.addTo((Resource)total, (Resource)allocated);
    }

    @Override
    public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed) {
        this.priorityDigraph.clear();
        this.intializePriorityDigraph();
        if (this.priorityDigraph.isEmpty()) {
            return selectedCandidates;
        }
        this.selectedCandidates = selectedCandidates;
        this.clusterResource = clusterResource;
        this.totalPreemptionAllowed = totalPreemptedResourceAllowed;
        this.toPreemptedFromOtherQueues.clear();
        this.reservedContainers = new ArrayList<RMContainer>();
        this.tempSchedulerNodeMap.clear();
        this.touchedNodes = new HashSet<NodeId>();
        List<FiCaSchedulerNode> allSchedulerNodes = this.preemptionContext.getScheduler().getAllNodes();
        for (FiCaSchedulerNode node : allSchedulerNodes) {
            RMContainer reservedContainer = node.getReservedContainer();
            if (null == reservedContainer || !this.priorityDigraph.containsRow((Object)reservedContainer.getQueueName())) continue;
            this.reservedContainers.add(reservedContainer);
        }
        Collections.sort(this.reservedContainers, this.CONTAINER_CREATION_TIME_COMPARATOR);
        long currentTime = System.currentTimeMillis();
        for (RMContainer reservedContainer : this.reservedContainers) {
            FiCaSchedulerNode node;
            if (currentTime - reservedContainer.getCreationTime() < this.minTimeout || null == (node = this.preemptionContext.getScheduler().getNode(reservedContainer.getReservedNode()))) continue;
            ArrayList<RMContainer> newlySelectedToBePreemptContainers = new ArrayList<RMContainer>();
            String demandingQueueName = reservedContainer.getQueueName();
            boolean demandingQueueSatisfied = this.isQueueSatisfied(demandingQueueName, node.getPartition());
            boolean canPreempt = false;
            if (!demandingQueueSatisfied) {
                canPreempt = this.canPreemptEnoughResourceForAsked(reservedContainer.getReservedResource(), demandingQueueName, node, false, newlySelectedToBePreemptContainers);
            }
            if (canPreempt) {
                this.touchedNodes.add(node.getNodeID());
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Trying to preempt following containers to make reserved container=" + reservedContainer.getContainerId() + " on node=" + node.getNodeID() + " can be allocated:"));
                }
                this.incToPreempt(demandingQueueName, node.getPartition(), reservedContainer.getReservedResource());
                for (RMContainer c : newlySelectedToBePreemptContainers) {
                    Set<RMContainer> containers;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)(" --container=" + c.getContainerId() + " resource=" + c.getReservedResource()));
                    }
                    if (null == (containers = selectedCandidates.get(c.getApplicationAttemptId()))) {
                        containers = new HashSet<RMContainer>();
                        selectedCandidates.put(c.getApplicationAttemptId(), containers);
                    }
                    containers.add(c);
                    Resources.subtractFrom((Resource)totalPreemptedResourceAllowed, (Resource)c.getAllocatedResource());
                }
                continue;
            }
            if (demandingQueueSatisfied || !this.allowMoveReservation) continue;
            this.tryToMakeBetterReservationPlacement(reservedContainer, allSchedulerNodes);
        }
        return selectedCandidates;
    }
}

