-# $Id$
-# $URL$
-
"""Whole core scheduling
"""
import logger
import os
+import cgroups
+
+glo_coresched_simulate = False
+joinpath = os.path.join
class CoreSched:
""" Whole-core scheduler
One core is always left unreserved for system slices.
"""
- def __init__(self):
+ def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
self.cpus = []
+ self.cgroup_var_name = cgroup_var_name
+ self.slice_attr_name = slice_attr_name
+ self.cgroup_mem_name = "cpuset.mems"
+ self.mems=[]
+ self.mems_map={}
+ self.cpu_siblings={}
+
+ def get_cgroup_var(self, name=None, subsys=None, filename=None):
+ """ decode cpuset.cpus or cpuset.mems into a list of units that can
+ be reserved.
+ """
+
+ assert(filename!=None or name!=None)
+
+ if filename==None:
+ # filename="/dev/cgroup/" + name
+ filename = reduce(lambda a, b: joinpath(a, b) if b else a, [subsys, name],
+ cgroups.get_base_path())
+
+ data = open(filename).readline().strip()
+
+ if not data:
+ return []
+
+ units = []
+
+ # cpuset.cpus could be something as arbitrary as:
+ # 0,1,2-3,4,5-6
+ # deal with commas and ranges
+ for part in data.split(","):
+ unitRange = part.split("-")
+ if len(unitRange) == 1:
+ unitRange = (unitRange[0], unitRange[0])
+ for i in range(int(unitRange[0]), int(unitRange[1])+1):
+ if not i in units:
+ units.append(i)
+
+ return units
def get_cpus(self):
""" return a list of available cpu identifiers: [0,1,2,3...]
if self.cpus!=[]:
return self.cpus
- cpuset_cpus = open("/dev/cgroup/cpuset.cpus").readline().strip()
+ self.cpus = self.get_cgroup_var(self.cgroup_var_name, 'cpuset')
- # cpuset.cpus could be something as arbitrary as:
- # 0,1,2-3,4,5-6
- # deal with commas and ranges
- for part in cpuset_cpus.split(","):
- cpuRange = part.split("-")
- if len(cpuRange) == 1:
- cpuRange = (cpuRange[0], cpuRange[0])
- for i in range(int(cpuRange[0]), int(cpuRange[1])+1):
- if not i in self.cpus:
- self.cpus.append(i)
+ self.cpu_siblings = {}
+ for item in self.cpus:
+ self.cpu_siblings[item] = self.get_core_siblings(item)
- return self.cpus
+ return self.cpus
+
+ def find_cpu_mostsiblings(self, cpus):
+ bestCount = -1
+ bestCpu = -1
+ for cpu in cpus:
+ count = 0
+ for candidate in self.cpu_siblings[cpu]:
+ if candidate in cpus:
+ count = count + 1
+ if (count > bestCount):
+ bestCount = count
+ bestCpu = cpu
+
+ assert(bestCpu >= 0)
+ return bestCpu
+
+
+ def find_compatible_cpu(self, cpus, compatCpu):
+ if compatCpu==None:
+ return self.find_cpu_mostsiblings(cpus)
+
+ # find a sibling if we can
+ bestDelta = None
+ bestCpu = None
+ for cpu in cpus:
+ if compatCpu in self.cpu_siblings[cpu]:
+ return cpu
+
+ return self.find_cpu_mostsiblings(cpus)
def get_cgroups (self):
""" return a list of cgroups
this might change as vservers are instantiated, so always compute
it dynamically.
"""
- cgroups = []
- filenames = os.listdir("/dev/cgroup")
- for filename in filenames:
- if os.path.isdir(os.path.join("/dev/cgroup", filename)):
- cgroups.append(filename)
- return cgroups
+ return cgroups.get_cgroups()
+ #cgroups = []
+ #filenames = os.listdir("/dev/cgroup")
+ #for filename in filenames:
+ # if os.path.isdir(os.path.join("/dev/cgroup", filename)):
+ # cgroups.append(filename)
+ #return cgroups
def decodeCoreSpec (self, cores):
""" Decode the value of the core attribute. It's a number, followed by
rec['_rspec'] is the effective rspec
"""
- logger.log("CoreSched: adjusting cores")
-
cpus = self.get_cpus()[:]
+ mems = self.get_mems()[:]
+
+ memSchedule=True
+ if (len(mems) != len(cpus)):
+ logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
+ memSchedule=False
+
+ logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
reservations = {}
+ mem_reservations = {}
# allocate the cores to the slivers that have them reserved
+ # TODO: Need to sort this from biggest cpu_cores to smallest
for name, rec in slivers.iteritems():
rspec = rec["_rspec"]
- cores = rspec.get("cpu_cores", 0)
+ cores = rspec.get(self.slice_attr_name, 0)
(cores, bestEffort) = self.decodeCoreSpec(cores)
+ lastCpu = None
+
while (cores>0):
# one cpu core reserved for best effort and system slices
if len(cpus)<=1:
- logger.log("CoreSched: ran out of cpu cores while scheduling: " + name)
+ logger.log("CoreSched: ran out of units while scheduling sliver " + name)
else:
- cpu = cpus.pop()
- logger.log("CoreSched: allocating cpu " + str(cpu) + " to slice " + name)
+ cpu = self.find_compatible_cpu(cpus, lastCpu)
+ cpus.remove(cpu)
+ lastCpu = cpu
+
+ logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
reservations[name] = reservations.get(name,[]) + [cpu]
+ # now find a memory node to go with the cpu
+ if memSchedule:
+ mem = self.find_associated_memnode(mems, cpu)
+ if mem != None:
+ mems.remove(mem)
+ logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
+ mem_reservations[name] = mem_reservations.get(name,[]) + [mem]
+ else:
+ logger.log("CoreSched: failed to find memory node for cpu" + str(cpu))
+
cores = cores-1
# the leftovers go to everyone else
- logger.log("CoreSched: allocating cpus " + str(cpus) + " to _default")
+ logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
reservations["_default"] = cpus[:]
+ mem_reservations["_default"] = mems[:]
- # now check and see if any of our reservations had the besteffort flag
+ # now check and see if any of our slices had the besteffort flag
# set
for name, rec in slivers.iteritems():
rspec = rec["_rspec"]
- cores = rspec.get("cpu_cores", 0)
+ cores = rspec.get(self.slice_attr_name, 0)
(cores, bestEffort) = self.decodeCoreSpec(cores)
- if not (reservations.get(name,[])):
- # if there is no reservation for this slice, then it's already
- # besteffort by default.
+ # if the bestEffort flag isn't set then we have nothing to do
+ if not bestEffort:
continue
- if bestEffort:
+ # note that if a reservation is [], then we don't need to add
+ # bestEffort cores to it, since it is bestEffort by default.
+
+ if reservations.get(name,[]) != []:
reservations[name] = reservations[name] + reservations["_default"]
- logger.log("CoreSched: adding besteffort cores to " + name + ". new cores = " + str(reservations[name]))
+ mem_reservations[name] = mem_reservations.get(name,[]) + mem_reservations["_default"]
+ logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
- self.reserveCores(reservations)
+ self.reserveUnits(self.cgroup_var_name, reservations)
- def reserveCores (self, reservations):
+ self.reserveUnits(self.cgroup_mem_name, mem_reservations)
+
+ def reserveUnits (self, var_name, reservations):
""" give a set of reservations (dictionary of slicename:cpuid_list),
write those reservations to the appropriate cgroup files.
# set the default vserver cpuset. this will deal with any vservers
# that might be created before the nodemanager has had a chance to
# update the cpusets.
- self.reserveDefault(default)
+ self.reserveDefault(var_name, default)
for cgroup in self.get_cgroups():
- cpus = reservations.get(cgroup, default)
-
- logger.log("CoreSched: reserving " + cgroup + " " + str(cpus))
-
- file("/dev/cgroup/" + cgroup + "/cpuset.cpus", "w").write( self.listToRange(cpus) + "\n" )
-
- def reserveDefault (self, cpus):
- if not os.path.exists("/etc/vservers/.defaults/cgroup"):
- os.makedirs("/etc/vservers/.defaults/cgroup")
-
- file("/etc/vservers/.defaults/cgroup/cpuset.cpus", "w").write( self.listToRange(cpus) + "\n" )
+ if cgroup in reservations:
+ cpus = reservations[cgroup]
+ logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
+ else:
+ # no log message for default; too much verbosity in the common case
+ cpus = default
+
+ if glo_coresched_simulate:
+ print "R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus)
+ else:
+ cgroups.write(cgroup, var_name, self.listToRange(cpus))
+ #file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
+
+ def reserveDefault (self, var_name, cpus):
+ #if not os.path.exists("/etc/vservers/.defaults/cgroup"):
+ # os.makedirs("/etc/vservers/.defaults/cgroup")
+
+ #if glo_coresched_simulate:
+ # print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
+ #else:
+ # file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
+ pass
def listToRange (self, list):
""" take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
"""
return ",".join( [str(i) for i in list] )
+ def get_mems(self):
+ """ return a list of available cpu identifiers: [0,1,2,3...]
+ """
+
+ # the cpus never change, so if it's already been computed then don't
+ # worry about it.
+ if self.mems!=[]:
+ return self.mems
+
+ self.mems = self.get_cgroup_var(self.cgroup_mem_name, 'cpuset')
+
+ # build a mapping from memory nodes to the cpus they can be used with
+
+ mems_map={}
+ for item in self.mems:
+ mems_map[item] = self.get_memnode_cpus(item)
+
+ if (len(mems_map)>0):
+ # when NUMA_EMU is enabled, only the last memory node will contain
+ # the cpu_map. For example, if there were originally 2 nodes and
+ # we used NUM_EMU to raise it to 12, then
+ # mems_map[0]=[]
+ # ...
+ # mems_map[4]=[]
+ # mems_map[5]=[1,3,5,7,9,11]
+ # mems_map[6]=[]
+ # ...
+ # mems_map[10]=[]
+ # mems_map[11]=[0,2,4,6,8,10]
+ # so, we go from back to front, copying the entries as necessary.
+
+ if mems_map[self.mems[0]] == []:
+ work = []
+ for item in reversed(self.mems):
+ if mems_map[item]!=[]:
+ work = mems_map[item]
+ else: # mems_map[item]==[]
+ mems_map[item] = work
+
+ self.mems_map = mems_map
+
+ return self.mems
+
+ def find_associated_memnode(self, mems, cpu):
+ """ Given a list of memory nodes and a cpu, see if one of the nodes in
+ the list can be used with that cpu.
+ """
+ for item in mems:
+ if cpu in self.mems_map[item]:
+ return item
+ return None
+
+ def get_memnode_cpus(self, index):
+ """ for a given memory node, return the CPUs that it is associated
+ with.
+ """
+ fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
+ if not os.path.exists(fn):
+ logger.log("CoreSched: failed to locate memory node" + fn)
+ return []
+
+ return self.get_cgroup_var(filename=fn)
+
+ def get_core_siblings(self, index):
+ # use core_siblings rather than core_siblings_list, as it's compatible
+ # with older kernels
+ fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
+ if not os.path.exists(fn):
+ return []
+ siblings = []
+
+ x = open(fn, 'rt').readline().strip().split(',')[-1]
+ x = int(x, 16)
+
+ cpuid = 0
+ while (x>0):
+ if (x&1)!=0:
+ siblings.append(cpuid)
+ x = x >> 1
+ cpuid += 1
+
+ return siblings
+
+
# a little self-test
if __name__=="__main__":
+ glo_coresched_simulate = True
+
x = CoreSched()
- print "cpus:", x.listToRange(x.get_cpus())
print "cgroups:", ",".join(x.get_cgroups())
- # a quick self-test for ScottLab slices sl_test1 and sl_test2
- # sl_test1 = 1 core
- # sl_test2 = 1 core
+ print "cpus:", x.listToRange(x.get_cpus())
+ print "sibling map:"
+ for item in x.get_cpus():
+ print " ", item, ",".join([str(y) for y in x.cpu_siblings.get(item,[])])
+
+ print "mems:", x.listToRange(x.get_mems())
+ print "cpu to memory map:"
+ for item in x.get_mems():
+ print " ", item, ",".join([str(y) for y in x.mems_map.get(item,[])])
- rspec_sl_test1 = {"cpu_cores": 1}
+ rspec_sl_test1 = {"cpu_cores": "1"}
rec_sl_test1 = {"_rspec": rspec_sl_test1}
- rspec_sl_test2 = {"cpu_cores": 1}
+ rspec_sl_test2 = {"cpu_cores": "5"}
rec_sl_test2 = {"_rspec": rspec_sl_test2}
- slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
+ rspec_sl_test3 = {"cpu_cores": "3b"}
+ rec_sl_test3 = {"_rspec": rspec_sl_test3}
+
+ #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
+
+ slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
+
+ #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}
x.adjustCores(slivers)