From dd274713d7098ab54c81516d67d700c53fc01e56 Mon Sep 17 00:00:00 2001
From: Thierry Parmentelat
Date: Wed, 27 Jun 2012 11:09:29 +0200
Subject: [PATCH] keep both coresched_vs and coresched_lxc - cgroups in the
 lxc package

coresched not being a plugin, we had to mess with database.py; hopefully this
is temporary
---
 coresched.py => coresched_lxc.py |   0
 coresched_vs.py                  | 378 +++++++++++++++++++++++++++++++
 database.py                      |  10 +-
 setup-lib.py                     |   2 -
 setup-lxc.py                     |   2 +
 setup-vs.py                      |   1 +
 6 files changed, 388 insertions(+), 5 deletions(-)
 rename coresched.py => coresched_lxc.py (100%)
 create mode 100644 coresched_vs.py

diff --git a/coresched.py b/coresched_lxc.py
similarity index 100%
rename from coresched.py
rename to coresched_lxc.py
diff --git a/coresched_vs.py b/coresched_vs.py
new file mode 100644
index 0000000..d064e93
--- /dev/null
+++ b/coresched_vs.py
@@ -0,0 +1,378 @@
+"""Whole core scheduling
+
+"""
+
+import logger
+import os
+
+glo_coresched_simulate = False
+
+class CoreSched:
+    """ Whole-core scheduler
+
+        The main entrypoint is adjustCores(self, slivers) which takes a
+        dictionary of sliver records. The cpu_cores field is pulled from the
+        effective rspec (rec["_rspec"]) for each sliver.
+
+        If cpu_cores > 0 for a sliver, then that sliver will reserve one or
+        more of the cpu_cores on the machine.
+
+        One core is always left unreserved for system slices.
+    """
+
+    def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
+        self.cpus = []
+        self.cgroup_var_name = cgroup_var_name
+        self.slice_attr_name = slice_attr_name
+        self.cgroup_mem_name = "cpuset.mems"
+        self.mems=[]
+        self.mems_map={}
+        self.cpu_siblings={}
+
+    def get_cgroup_var(self, name=None, filename=None):
+        """ decode cpuset.cpus or cpuset.mems into a list of units that can
+            be reserved.
+        """
+
+        assert(filename!=None or name!=None)
+
+        if filename==None:
+            filename="/dev/cgroup/" + name
+
+        data = open(filename).readline().strip()
+
+        if not data:
+            return []
+
+        units = []
+
+        # cpuset.cpus could be something as arbitrary as:
+        #    0,1,2-3,4,5-6
+        # deal with commas and ranges
+        for part in data.split(","):
+            unitRange = part.split("-")
+            if len(unitRange) == 1:
+                unitRange = (unitRange[0], unitRange[0])
+            for i in range(int(unitRange[0]), int(unitRange[1])+1):
+                if not i in units:
+                    units.append(i)
+
+        return units
+
+    def get_cpus(self):
+        """ return a list of available cpu identifiers: [0,1,2,3...]
+        """
+
+        # the cpus never change, so if it's already been computed then don't
+        # worry about it.
+        if self.cpus!=[]:
+            return self.cpus
+
+        self.cpus = self.get_cgroup_var(self.cgroup_var_name)
+
+        self.cpu_siblings = {}
+        for item in self.cpus:
+            self.cpu_siblings[item] = self.get_core_siblings(item)
+
+        return self.cpus
+
+    def find_cpu_mostsiblings(self, cpus):
+        bestCount = -1
+        bestCpu = -1
+        for cpu in cpus:
+            count = 0
+            for candidate in self.cpu_siblings[cpu]:
+                if candidate in cpus:
+                    count = count + 1
+                if (count > bestCount):
+                    bestCount = count
+                    bestCpu = cpu
+
+        assert(bestCpu >= 0)
+        return bestCpu
+
+
+    def find_compatible_cpu(self, cpus, compatCpu):
+        if compatCpu==None:
+            return self.find_cpu_mostsiblings(cpus)
+
+        # find a sibling if we can
+        bestDelta = None
+        bestCpu = None
+        for cpu in cpus:
+            if compatCpu in self.cpu_siblings[cpu]:
+                return cpu
+
+        return self.find_cpu_mostsiblings(cpus)
+
+    def get_cgroups (self):
+        """ return a list of cgroups
+            this might change as vservers are instantiated, so always compute
+            it dynamically.
+ """ + cgroups = [] + filenames = os.listdir("/dev/cgroup") + for filename in filenames: + if os.path.isdir(os.path.join("/dev/cgroup", filename)): + cgroups.append(filename) + return cgroups + + def decodeCoreSpec (self, cores): + """ Decode the value of the core attribute. It's a number, followed by + an optional letter "b" to indicate besteffort cores should also + be supplied. + """ + bestEffort = False + + if cores.endswith("b"): + cores = cores[:-1] + bestEffort = True + + try: + cores = int(cores) + except ValueError: + cores = 0 + + return (cores, bestEffort) + + def adjustCores (self, slivers): + """ slivers is a dict of {sliver_name: rec} + rec is a dict of attributes + rec['_rspec'] is the effective rspec + """ + + cpus = self.get_cpus()[:] + mems = self.get_mems()[:] + + memSchedule=True + if (len(mems) != len(cpus)): + logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled") + memSchedule=False + + logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus)) + + reservations = {} + mem_reservations = {} + + # allocate the cores to the slivers that have them reserved + # TODO: Need to sort this from biggest cpu_cores to smallest + for name, rec in slivers.iteritems(): + rspec = rec["_rspec"] + cores = rspec.get(self.slice_attr_name, 0) + (cores, bestEffort) = self.decodeCoreSpec(cores) + + lastCpu = None + + while (cores>0): + # one cpu core reserved for best effort and system slices + if len(cpus)<=1: + logger.log("CoreSched: ran out of units while scheduling sliver " + name) + else: + cpu = self.find_compatible_cpu(cpus, lastCpu) + cpus.remove(cpu) + lastCpu = cpu + + logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name) + reservations[name] = reservations.get(name,[]) + [cpu] + + # now find a memory node to go with the cpu + if memSchedule: + mem = self.find_associated_memnode(mems, cpu) + if mem != None: + mems.remove(mem) + logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name) + mem_reservations[name] = mem_reservations.get(name,[]) + [mem] + else: + logger.log("CoreSched: failed to find memory node for cpu" + str(cpu)) + + cores = cores-1 + + # the leftovers go to everyone else + logger.log("CoreSched: allocating unit " + str(cpus) + " to _default") + reservations["_default"] = cpus[:] + mem_reservations["_default"] = mems[:] + + # now check and see if any of our slices had the besteffort flag + # set + for name, rec in slivers.iteritems(): + rspec = rec["_rspec"] + cores = rspec.get(self.slice_attr_name, 0) + (cores, bestEffort) = self.decodeCoreSpec(cores) + + # if the bestEffort flag isn't set then we have nothing to do + if not bestEffort: + continue + + # note that if a reservation is [], then we don't need to add + # bestEffort cores to it, since it is bestEffort by default. + + if reservations.get(name,[]) != []: + reservations[name] = reservations[name] + reservations["_default"] + mem_reservations[name] = mem_reservations.get(name,[]) + mem_reservations["_default"] + logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name])) + + self.reserveUnits(self.cgroup_var_name, reservations) + + self.reserveUnits(self.cgroup_mem_name, mem_reservations) + + def reserveUnits (self, var_name, reservations): + """ give a set of reservations (dictionary of slicename:cpuid_list), + write those reservations to the appropriate cgroup files. 
+ + reservations["_default"] is assumed to be the default reservation + for slices that do not reserve cores. It's essentially the leftover + cpu cores. + """ + + default = reservations["_default"] + + # set the default vserver cpuset. this will deal with any vservers + # that might be created before the nodemanager has had a chance to + # update the cpusets. + self.reserveDefault(var_name, default) + + for cgroup in self.get_cgroups(): + if cgroup in reservations: + cpus = reservations[cgroup] + logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus)) + else: + # no log message for default; too much verbosity in the common case + cpus = default + + if glo_coresched_simulate: + print "R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus) + else: + file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write( self.listToRange(cpus) + "\n" ) + + def reserveDefault (self, var_name, cpus): + if not os.path.exists("/etc/vservers/.defaults/cgroup"): + os.makedirs("/etc/vservers/.defaults/cgroup") + + if glo_coresched_simulate: + print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus) + else: + file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" ) + + def listToRange (self, list): + """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5" + for now, just comma-separate + """ + return ",".join( [str(i) for i in list] ) + + def get_mems(self): + """ return a list of available cpu identifiers: [0,1,2,3...] + """ + + # the cpus never change, so if it's already been computed then don't + # worry about it. + if self.mems!=[]: + return self.mems + + self.mems = self.get_cgroup_var(self.cgroup_mem_name) + + # build a mapping from memory nodes to the cpus they can be used with + + mems_map={} + for item in self.mems: + mems_map[item] = self.get_memnode_cpus(item) + + if (len(mems_map)>0): + # when NUMA_EMU is enabled, only the last memory node will contain + # the cpu_map. For example, if there were originally 2 nodes and + # we used NUM_EMU to raise it to 12, then + # mems_map[0]=[] + # ... + # mems_map[4]=[] + # mems_map[5]=[1,3,5,7,9,11] + # mems_map[6]=[] + # ... + # mems_map[10]=[] + # mems_map[11]=[0,2,4,6,8,10] + # so, we go from back to front, copying the entries as necessary. + + if mems_map[self.mems[0]] == []: + work = [] + for item in reversed(self.mems): + if mems_map[item]!=[]: + work = mems_map[item] + else: # mems_map[item]==[] + mems_map[item] = work + + self.mems_map = mems_map + + return self.mems + + def find_associated_memnode(self, mems, cpu): + """ Given a list of memory nodes and a cpu, see if one of the nodes in + the list can be used with that cpu. + """ + for item in mems: + if cpu in self.mems_map[item]: + return item + return None + + def get_memnode_cpus(self, index): + """ for a given memory node, return the CPUs that it is associated + with. 
+ """ + fn = "/sys/devices/system/node/node" + str(index) + "/cpulist" + if not os.path.exists(fn): + logger.log("CoreSched: failed to locate memory node" + fn) + return [] + + return self.get_cgroup_var(filename=fn) + + def get_core_siblings(self, index): + # use core_siblings rather than core_siblings_list, as it's compatible + # with older kernels + fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings" + if not os.path.exists(fn): + return [] + siblings = [] + + x = int(open(fn,"rt").readline().strip(),16) + cpuid = 0 + while (x>0): + if (x&1)!=0: + siblings.append(cpuid) + x = x >> 1 + cpuid += 1 + + return siblings + + +# a little self-test +if __name__=="__main__": + glo_coresched_simulate = True + + x = CoreSched() + + print "cgroups:", ",".join(x.get_cgroups()) + + print "cpus:", x.listToRange(x.get_cpus()) + print "sibling map:" + for item in x.get_cpus(): + print " ", item, ",".join([str(y) for y in x.cpu_siblings.get(item,[])]) + + print "mems:", x.listToRange(x.get_mems()) + print "cpu to memory map:" + for item in x.get_mems(): + print " ", item, ",".join([str(y) for y in x.mems_map.get(item,[])]) + + rspec_sl_test1 = {"cpu_cores": "1"} + rec_sl_test1 = {"_rspec": rspec_sl_test1} + + rspec_sl_test2 = {"cpu_cores": "5"} + rec_sl_test2 = {"_rspec": rspec_sl_test2} + + rspec_sl_test3 = {"cpu_cores": "3b"} + rec_sl_test3 = {"_rspec": rspec_sl_test3} + + #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2} + + slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3} + + #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3} + + x.adjustCores(slivers) + diff --git a/database.py b/database.py index 96664af..7064335 100644 --- a/database.py +++ b/database.py @@ -18,11 +18,15 @@ import threading import time import account -import coresched import logger import tools import bwmon +# hopefully temporary +# is there a good reason to have this done here and not in a plugin ? +try: from coresched_lxc import CoreSched +except: from coresched_vs import CoreSched + # We enforce minimum allocations to keep the clueless from hosing their slivers. # Disallow disk loans because there's currently no way to punish slivers over quota. MINIMUM_ALLOCATION = {'cpu_pct': 0, @@ -120,8 +124,8 @@ It may be necessary in the future to do something smarter.""" self._compute_effective_rspecs() try: - x = coresched.CoreSched() - x.adjustCores(self) + coresched = CoreSched() + coresched.adjustCores(self) except: logger.log_exc("database: exception while doing core sched") diff --git a/setup-lib.py b/setup-lib.py index 32fc7a8..7d6d255 100644 --- a/setup-lib.py +++ b/setup-lib.py @@ -15,11 +15,9 @@ setup( 'api_calls', 'bwmon', 'bwlimitlxc', - 'cgroups', 'conf_files', 'config', 'controller', - 'coresched', 'curlwrapper', 'database', 'initscript', diff --git a/setup-lxc.py b/setup-lxc.py index 7948e0f..50b77a3 100644 --- a/setup-lxc.py +++ b/setup-lxc.py @@ -12,6 +12,8 @@ setup( py_modules=[ 'sliver_libvirt', 'sliver_lxc', + 'cgroups', + 'coresched_lxc', ], scripts = [ ], diff --git a/setup-vs.py b/setup-vs.py index a2032dc..88be064 100644 --- a/setup-vs.py +++ b/setup-vs.py @@ -11,6 +11,7 @@ from distutils.core import setup, Extension setup( py_modules=[ 'sliver_vs', + 'coresched_vs', ], scripts = [ ], -- 2.43.0