import os
import logger
import cgroups
from functools import reduce

glo_coresched_simulate = False
joinpath = os.path.join
    The main entrypoint is adjustCores(self, slivers) which takes a
    dictionary of sliver records. The cpu_cores field is pulled from the
    effective rspec (rec["_rspec"]) for each sliver.

    If cpu_cores > 0 for a sliver, then that sliver will reserve one or
    more of the cpu cores on the machine.

    One core is always left unreserved for system slices.
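
    For illustration only (slice names and core counts are made up), the
    slivers argument passed to adjustCores might look like:

        {"some_slice":  {"_rspec": {"cpu_cores": "2"}},
         "other_slice": {"_rspec": {"cpu_cores": "1b"}}}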
    def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
        self.cgroup_var_name = cgroup_var_name
        self.slice_attr_name = slice_attr_name
        self.cgroup_mem_name = "cpuset.mems"
    def get_cgroup_var(self, name=None, subsys=None, filename=None):
        decode cpuset.cpus or cpuset.mems into a list of units that can
        be reserved.

        assert(filename != None or name != None)

        if filename == None:
            # filename = "/dev/cgroup/" + name
            filename = reduce(lambda a, b: joinpath(a, b) if b else a, [subsys, name],
                              cgroups.get_base_path())

        data = open(filename).readline().strip()

        # cpuset.cpus could be something as arbitrary as:
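        #     e.g. (illustrative value, not taken from a real node) "0-2,7",
        #     which the loop below expands to the unit list [0, 1, 2, 7]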
        # deal with commas and ranges
        for part in data.split(","):
            unitRange = part.split("-")
            if len(unitRange) == 1:
                unitRange = (unitRange[0], unitRange[0])
            for i in range(int(unitRange[0]), int(unitRange[1])+1):
    def get_cpus(self):
        return a list of available cpu identifiers: [0,1,2,3...]

        # the cpus never change, so if it's already been computed then don't
        # recompute it
        self.cpus = self.get_cgroup_var(self.cgroup_var_name, 'cpuset')

        self.cpu_siblings = {}
        for item in self.cpus:
            self.cpu_siblings[item] = self.get_core_siblings(item)
    def find_cpu_mostsiblings(self, cpus):
            for candidate in self.cpu_siblings[cpu]:
                if (count > bestCount):
    def find_compatible_cpu(self, cpus, compatCpu):
        if compatCpu == None:
            return self.find_cpu_mostsiblings(cpus)

        # find a sibling if we can
        for cpu in cpus:
            if compatCpu in self.cpu_siblings[cpu]:
                return cpu

        return self.find_cpu_mostsiblings(cpus)
    def get_cgroups (self):
        return a list of cgroups
        this might change as vservers are instantiated,
        so always compute it dynamically.

        return cgroups.get_cgroups()

        #filenames = os.listdir("/dev/cgroup")
        #for filename in filenames:
        #    if os.path.isdir(os.path.join("/dev/cgroup", filename)):
        #        cgroups.append(filename)
    def decodeCoreSpec (self, cores):
        Decode the value of the core attribute.
        It's a number, followed by an optional letter "b" to indicate besteffort
        cores should also be supplied.
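
        For illustration: a value of "2" asks for two reserved cores with
        bestEffort False, while "2b" asks for two reserved cores with
        bestEffort True.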
        if cores.endswith("b"):

        return (cores, bestEffort)
    def adjustCores (self, slivers):
        slivers is a dict of {sliver_name: rec}
        rec is a dict of attributes
        rec['_rspec'] is the effective rspec
        cpus = self.get_cpus()[:]
        mems = self.get_mems()[:]

        if (len(mems) != len(cpus)):
            logger.log("CoreSched: number of mems does not match number of " + self.cgroup_var_name + " units; mem scheduling disabled")

        logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
        reservations = {}
        mem_reservations = {}

        # allocate the cores to the slivers that have them reserved
        # TODO: Need to sort this from biggest cpu_cores to smallest
        for name, rec in slivers.items():
            rspec = rec["_rspec"]
            cores = rspec.get(self.slice_attr_name, 0)
            (cores, bestEffort) = self.decodeCoreSpec(cores)
            # one cpu core reserved for best effort and system slices
            logger.log("CoreSched: ran out of units while scheduling sliver " + name)

            cpu = self.find_compatible_cpu(cpus, lastCpu)

            logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
            reservations[name] = reservations.get(name, []) + [cpu]

            # now find a memory node to go with the cpu
            mem = self.find_associated_memnode(mems, cpu)

            logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
            mem_reservations[name] = mem_reservations.get(name, []) + [mem]

            logger.log("CoreSched: failed to find memory node for cpu " + str(cpu))
        # the leftovers go to everyone else
        logger.log("CoreSched: allocating units " + str(cpus) + " to _default")
        reservations["_default"] = cpus[:]
        mem_reservations["_default"] = mems[:]
        # now check and see if any of our slices had the besteffort flag set
        freezeList = {}
        for name, rec in slivers.items():
            rspec = rec["_rspec"]
            cores = rspec.get(self.slice_attr_name, 0)
            (cores, bestEffort) = self.decodeCoreSpec(cores)

            freezable = rspec.get("cpu_freezable", 0)
            if (cores == 0) and (freezable == 1):
                freezeList[name] = "FROZEN"
            else:
                freezeList[name] = "THAWED"

            # if the bestEffort flag isn't set then we have nothing to do
            if not bestEffort:
                continue

            # note that if a reservation is [], then we don't need to add
            # bestEffort cores to it, since it is bestEffort by default.
            if reservations.get(name, []) != []:
                reservations[name] = reservations[name] + reservations["_default"]
                mem_reservations[name] = mem_reservations.get(name, []) + mem_reservations["_default"]
                logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
        self.reserveUnits(self.cgroup_var_name, reservations)

        self.reserveUnits(self.cgroup_mem_name, mem_reservations)

        self.freezeUnits("freezer.state", freezeList)
    def freezeUnits (self, var_name, freezeList):
        for (slicename, freeze) in list(freezeList.items()):
            try:
                cgroup_path = cgroups.get_cgroup_path(slicename, 'freezer')
                logger.verbose("CoreSched: setting freezer for {} to {} - path={} var={}"
                               .format(slicename, freeze, cgroup_path, var_name))
                cgroup = os.path.join(cgroup_path, var_name)
                if not os.path.exists(cgroup):
                    logger.log("Warning: Could not spot 'freezer' cgroup file for slice {} - ignored".format(slicename))
                    continue

                # in simulation mode, skip the actual write
                if not glo_coresched_simulate:
                    with open(cgroup, "w") as f:
                        f.write(freeze)
            except Exception as e:
                # the cgroup probably didn't exist...
                logger.log("CoreSched: exception while setting freeze for {} ({})".format(slicename, e))
    def reserveUnits (self, var_name, reservations):
        give a set of reservations (dictionary of slicename:cpuid_list),
        write those reservations to the appropriate cgroup files.

        reservations["_default"] is assumed to be the default reservation
        for slices that do not reserve cores. It's essentially the leftover
        cpus.
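
        For illustration (slice name made up), a reservations argument might
        look like {"_default": [3], "some_slice": [0, 1, 2]}.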
        default = reservations["_default"]

        # set the default vserver cpuset. this will deal with any vservers
        # that might be created before the nodemanager has had a chance to
        # update the cpusets.
        self.reserveDefault(var_name, default)

        for cgroup in self.get_cgroups():
            if cgroup in reservations:
                cpus = reservations[cgroup]
                logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
            else:
                # no log message for default; too much verbosity in the common case
                cpus = default

            if glo_coresched_simulate:
                print("R", cgroup + "/" + var_name, self.listToRange(cpus))
            else:
                cgroups.write(cgroup, var_name, self.listToRange(cpus))
    def reserveDefault (self, var_name, cpus):
        #if not os.path.exists("/etc/vservers/.defaults/cgroup"):
        #    os.makedirs("/etc/vservers/.defaults/cgroup")

        #if glo_coresched_simulate:
        #    print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
        #else:
        #    file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
        pass
    def listToRange (self, list):
        take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
        for now, just comma-separate
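
        e.g. with the current comma-separated form, [1, 2, 3, 5] is rendered
        as "1,2,3,5".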
        return ",".join( [str(i) for i in list] )
    def get_mems(self):
        return a list of available memory node identifiers: [0,1,2,3...]

        # the mems never change, so if it's already been computed then don't
        # recompute it
        self.mems = self.get_cgroup_var(self.cgroup_mem_name, 'cpuset')

        # build a mapping from memory nodes to the cpus they can be used with
        mems_map = {}
        for item in self.mems:
            mems_map[item] = self.get_memnode_cpus(item)
        if (len(mems_map) > 0):
            # when NUMA_EMU is enabled, only the last memory node will contain
            # the cpu_map. For example, if there were originally 2 nodes and
            # we used NUMA_EMU to raise it to 12, then
            #     mems_map[5] = [1,3,5,7,9,11]
            #     mems_map[11] = [0,2,4,6,8,10]
            # so, we go from back to front, copying the entries as necessary.

            if mems_map[self.mems[0]] == []:
                for item in reversed(self.mems):
                    if mems_map[item] != []:
                        work = mems_map[item]
                    else: # mems_map[item]==[]
                        mems_map[item] = work

        self.mems_map = mems_map
    def find_associated_memnode(self, mems, cpu):
        Given a list of memory nodes and a cpu, see if one of the nodes in
        the list can be used with that cpu.

        for item in mems:
            if cpu in self.mems_map[item]:
                return item
        return None
    def get_memnode_cpus(self, index):
        for a given memory node, return the CPUs that it is associated with.
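
        The kernel exposes this as /sys/devices/system/node/node<N>/cpulist,
        which holds a range string such as "0-3" (illustrative value).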
        fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
        if not os.path.exists(fn):
            logger.log("CoreSched: failed to locate memory node " + fn)
            return []

        return self.get_cgroup_var(filename=fn)
    def get_core_siblings(self, index):
        siblings = []

        # use core_siblings rather than core_siblings_list, as it's compatible
        # with older kernels
        fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
        if not os.path.exists(fn):
            return siblings

        x = open(fn, 'rt').readline().strip().split(',')[-1]

                siblings.append(cpuid)
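
        # Illustration (assumed mask value, not from this file): a core_siblings
        # value of "0000000f" has bits 0-3 set, so the computed siblings list
        # would be [0, 1, 2, 3].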
if __name__ == "__main__":
    glo_coresched_simulate = True

    x = CoreSched()

    print("cgroups:", ",".join(x.get_cgroups()))

    print("cpus:", x.listToRange(x.get_cpus()))
    print("sibling map:")
    for item in x.get_cpus():
        print(" ", item, ",".join([str(y) for y in x.cpu_siblings.get(item, [])]))

    print("mems:", x.listToRange(x.get_mems()))
    print("memory node to cpu map:")
    for item in x.get_mems():
        print(" ", item, ",".join([str(y) for y in x.mems_map.get(item, [])]))
    rspec_sl_test1 = {"cpu_cores": "1"}
    rec_sl_test1 = {"_rspec": rspec_sl_test1}

    rspec_sl_test2 = {"cpu_cores": "5"}
    rec_sl_test2 = {"_rspec": rspec_sl_test2}

    rspec_sl_test3 = {"cpu_cores": "3b"}
    rec_sl_test3 = {"_rspec": rspec_sl_test3}

    #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
    slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
    #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}

    x.adjustCores(slivers)