""" Whole-core scheduling """ import logger import os import os.path import cgroups from functools import reduce glo_coresched_simulate = False joinpath = os.path.join class CoreSched: """ Whole-core scheduler The main entrypoint is adjustCores(self, slivers) which takes a dictionary of sliver records. The cpu_cores field is pulled from the effective rspec (rec["_rspec"]) for each sliver. If cpu_cores > 0 for a sliver, then that sliver will reserve one or more of the cpu_cores on the machine. One core is always left unreserved for system slices. """ def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"): self.cpus = [] self.cgroup_var_name = cgroup_var_name self.slice_attr_name = slice_attr_name self.cgroup_mem_name = "cpuset.mems" self.mems=[] self.mems_map={} self.cpu_siblings={} def get_cgroup_var(self, name=None, subsys=None, filename=None): """ decode cpuset.cpus or cpuset.mems into a list of units that can be reserved. """ assert(filename!=None or name!=None) if filename==None: # filename="/dev/cgroup/" + name filename = reduce(lambda a, b: joinpath(a, b) if b else a, [subsys, name], cgroups.get_base_path()) data = open(filename).readline().strip() if not data: return [] units = [] # cpuset.cpus could be something as arbitrary as: # 0,1,2-3,4,5-6 # deal with commas and ranges for part in data.split(","): unitRange = part.split("-") if len(unitRange) == 1: unitRange = (unitRange[0], unitRange[0]) for i in range(int(unitRange[0]), int(unitRange[1])+1): if not i in units: units.append(i) return units def get_cpus(self): """ return a list of available cpu identifiers: [0,1,2,3...] """ # the cpus never change, so if it's already been computed then don't # worry about it. if self.cpus!=[]: return self.cpus self.cpus = self.get_cgroup_var(self.cgroup_var_name, 'cpuset') self.cpu_siblings = {} for item in self.cpus: self.cpu_siblings[item] = self.get_core_siblings(item) return self.cpus def find_cpu_mostsiblings(self, cpus): bestCount = -1 bestCpu = -1 for cpu in cpus: count = 0 for candidate in self.cpu_siblings[cpu]: if candidate in cpus: count = count + 1 if (count > bestCount): bestCount = count bestCpu = cpu assert(bestCpu >= 0) return bestCpu def find_compatible_cpu(self, cpus, compatCpu): if compatCpu==None: return self.find_cpu_mostsiblings(cpus) # find a sibling if we can bestDelta = None bestCpu = None for cpu in cpus: if compatCpu in self.cpu_siblings[cpu]: return cpu return self.find_cpu_mostsiblings(cpus) def get_cgroups (self): """ return a list of cgroups this might change as vservers are instantiated, so always compute it dynamically. """ return cgroups.get_cgroups() #cgroups = [] #filenames = os.listdir("/dev/cgroup") #for filename in filenames: # if os.path.isdir(os.path.join("/dev/cgroup", filename)): # cgroups.append(filename) #return cgroups def decodeCoreSpec (self, cores): """ Decode the value of the core attribute. It's a number, followed by an optional letter "b" to indicate besteffort cores should also be supplied. """ bestEffort = False if cores.endswith("b"): cores = cores[:-1] bestEffort = True try: cores = int(cores) except ValueError: cores = 0 return (cores, bestEffort) def adjustCores (self, slivers): """ slivers is a dict of {sliver_name: rec} rec is a dict of attributes rec['_rspec'] is the effective rspec """ cpus = self.get_cpus()[:] mems = self.get_mems()[:] memSchedule=True if (len(mems) != len(cpus)): logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled") memSchedule=False logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus)) reservations = {} mem_reservations = {} # allocate the cores to the slivers that have them reserved # TODO: Need to sort this from biggest cpu_cores to smallest for name, rec in slivers.items(): rspec = rec["_rspec"] cores = rspec.get(self.slice_attr_name, 0) (cores, bestEffort) = self.decodeCoreSpec(cores) lastCpu = None while (cores>0): # one cpu core reserved for best effort and system slices if len(cpus)<=1: logger.log("CoreSched: ran out of units while scheduling sliver " + name) else: cpu = self.find_compatible_cpu(cpus, lastCpu) cpus.remove(cpu) lastCpu = cpu logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name) reservations[name] = reservations.get(name, []) + [cpu] # now find a memory node to go with the cpu if memSchedule: mem = self.find_associated_memnode(mems, cpu) if mem != None: mems.remove(mem) logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name) mem_reservations[name] = mem_reservations.get(name, []) + [mem] else: logger.log("CoreSched: failed to find memory node for cpu" + str(cpu)) cores = cores-1 # the leftovers go to everyone else logger.log("CoreSched: allocating unit " + str(cpus) + " to _default") reservations["_default"] = cpus[:] mem_reservations["_default"] = mems[:] freezeList = {} # now check and see if any of our slices had the besteffort flag # set for name, rec in slivers.items(): rspec = rec["_rspec"] cores = rspec.get(self.slice_attr_name, 0) (cores, bestEffort) = self.decodeCoreSpec(cores) freezable = rspec.get("cpu_freezable", 0) if (cores==0) and (freezable == 1): freezeList[name] = "FROZEN" else: freezeList[name] = "THAWED" # if the bestEffort flag isn't set then we have nothing to do if not bestEffort: continue # note that if a reservation is [], then we don't need to add # bestEffort cores to it, since it is bestEffort by default. if reservations.get(name, []) != []: reservations[name] = reservations[name] + reservations["_default"] mem_reservations[name] = mem_reservations.get(name, []) + mem_reservations["_default"] logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name])) self.reserveUnits(self.cgroup_var_name, reservations) self.reserveUnits(self.cgroup_mem_name, mem_reservations) self.freezeUnits("freezer.state", freezeList) def freezeUnits (self, var_name, freezeList): for (slicename, freeze) in list(freezeList.items()): try: cgroup_path = cgroups.get_cgroup_path(slicename, 'freezer') logger.verbose("CoreSched: setting freezer for {} to {} - path={} var={}" .format(slicename,freeze, cgroup_path, var_name)) cgroup = os.path.join(cgroup_path, var_name) if not cgroup: logger.log("Warning: Could not spot 'freezer' cgroup file for slice {} - ignored".format(slicename)) break if glo_coresched_simulate: print("F", cgroup) else: with open(cgroup, "w") as f: f.write(freeze) except Exception as e: # the cgroup probably didn't exit... logger.log("CoreSched: exception while setting freeze for {} ({})".format(slicename, e)) def reserveUnits (self, var_name, reservations): """ give a set of reservations (dictionary of slicename:cpuid_list), write those reservations to the appropriate cgroup files. reservations["_default"] is assumed to be the default reservation for slices that do not reserve cores. It's essentially the leftover cpu cores. """ default = reservations["_default"] # set the default vserver cpuset. this will deal with any vservers # that might be created before the nodemanager has had a chance to # update the cpusets. self.reserveDefault(var_name, default) for cgroup in self.get_cgroups(): if cgroup in reservations: cpus = reservations[cgroup] logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus)) else: # no log message for default; too much verbosity in the common case cpus = default if glo_coresched_simulate: print("R", cgroup + "/" + var_name, self.listToRange(cpus)) else: cgroups.write(cgroup, var_name, self.listToRange(cpus)) def reserveDefault (self, var_name, cpus): #if not os.path.exists("/etc/vservers/.defaults/cgroup"): # os.makedirs("/etc/vservers/.defaults/cgroup") #if glo_coresched_simulate: # print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus) #else: # file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" ) pass def listToRange (self, list): """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5" for now, just comma-separate """ return ",".join( [str(i) for i in list] ) def get_mems(self): """ return a list of available cpu identifiers: [0,1,2,3...] """ # the cpus never change, so if it's already been computed then don't # worry about it. if self.mems!=[]: return self.mems self.mems = self.get_cgroup_var(self.cgroup_mem_name, 'cpuset') # build a mapping from memory nodes to the cpus they can be used with mems_map={} for item in self.mems: mems_map[item] = self.get_memnode_cpus(item) if (len(mems_map)>0): # when NUMA_EMU is enabled, only the last memory node will contain # the cpu_map. For example, if there were originally 2 nodes and # we used NUM_EMU to raise it to 12, then # mems_map[0]=[] # ... # mems_map[4]=[] # mems_map[5]=[1,3,5,7,9,11] # mems_map[6]=[] # ... # mems_map[10]=[] # mems_map[11]=[0,2,4,6,8,10] # so, we go from back to front, copying the entries as necessary. if mems_map[self.mems[0]] == []: work = [] for item in reversed(self.mems): if mems_map[item]!=[]: work = mems_map[item] else: # mems_map[item]==[] mems_map[item] = work self.mems_map = mems_map return self.mems def find_associated_memnode(self, mems, cpu): """ Given a list of memory nodes and a cpu, see if one of the nodes in the list can be used with that cpu. """ for item in mems: if cpu in self.mems_map[item]: return item return None def get_memnode_cpus(self, index): """ for a given memory node, return the CPUs that it is associated with. """ fn = "/sys/devices/system/node/node" + str(index) + "/cpulist" if not os.path.exists(fn): logger.log("CoreSched: failed to locate memory node" + fn) return [] return self.get_cgroup_var(filename=fn) def get_core_siblings(self, index): # use core_siblings rather than core_siblings_list, as it's compatible # with older kernels fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings" if not os.path.exists(fn): return [] siblings = [] x = open(fn, 'rt').readline().strip().split(',')[-1] x = int(x, 16) cpuid = 0 while (x>0): if (x&1)!=0: siblings.append(cpuid) x = x >> 1 cpuid += 1 return siblings # a little self-test if __name__=="__main__": glo_coresched_simulate = True x = CoreSched() print("cgroups:", ",".join(x.get_cgroups())) print("cpus:", x.listToRange(x.get_cpus())) print("sibling map:") for item in x.get_cpus(): print(" ", item, ",".join([str(y) for y in x.cpu_siblings.get(item, [])])) print("mems:", x.listToRange(x.get_mems())) print("cpu to memory map:") for item in x.get_mems(): print(" ", item, ",".join([str(y) for y in x.mems_map.get(item, [])])) rspec_sl_test1 = {"cpu_cores": "1"} rec_sl_test1 = {"_rspec": rspec_sl_test1} rspec_sl_test2 = {"cpu_cores": "5"} rec_sl_test2 = {"_rspec": rspec_sl_test2} rspec_sl_test3 = {"cpu_cores": "3b"} rec_sl_test3 = {"_rspec": rspec_sl_test3} #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2} slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3} #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3} x.adjustCores(slivers)