cosmetic - slightly changed message when a freezer cgroup file can't be figured out
[nodemanager.git] / coresched_lxc.py
1 """
2 Whole-core scheduling
3 """
4
5 import logger
6 import os
7 import os.path
8 import cgroups
9
10 glo_coresched_simulate = False
11 joinpath = os.path.join
12
13 class CoreSched:
14     """ 
15     Whole-core scheduler
16
17     The main entrypoint is adjustCores(self, slivers) which takes a
18     dictionary of sliver records. The cpu_cores field is pulled from the
19     effective rspec (rec["_rspec"]) for each sliver.
20
21     If cpu_cores > 0 for a sliver, then that sliver will reserve one or
22     more of the cpu_cores on the machine.
23
24     One core is always left unreserved for system slices.
25     """
26
27     def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
28         self.cpus = []
29         self.cgroup_var_name = cgroup_var_name
30         self.slice_attr_name = slice_attr_name
31         self.cgroup_mem_name = "cpuset.mems"
32         self.mems=[]
33         self.mems_map={}
34         self.cpu_siblings={}
35
36     def get_cgroup_var(self, name=None, subsys=None, filename=None):
37         """ 
38         decode cpuset.cpus or cpuset.mems into a list of units that can
39         be reserved.
40         """
41
42         assert(filename!=None or name!=None)
43
44         if filename==None:
45             # filename="/dev/cgroup/" + name
46             filename = reduce(lambda a, b: joinpath(a, b) if b else a, [subsys, name],
47                               cgroups.get_base_path())
48
49         data = open(filename).readline().strip()
50
51         if not data:
52            return []
53
54         units = []
55
56         # cpuset.cpus could be something as arbitrary as:
57         #    0,1,2-3,4,5-6
58         # deal with commas and ranges
59         for part in data.split(","):
60             unitRange = part.split("-")
61             if len(unitRange) == 1:
62                 unitRange = (unitRange[0], unitRange[0])
63             for i in range(int(unitRange[0]), int(unitRange[1])+1):
64                 if not i in units:
65                     units.append(i)
66
67         return units
68
69     def get_cpus(self):
70         """ 
71         return a list of available cpu identifiers: [0,1,2,3...]
72         """
73
74         # the cpus never change, so if it's already been computed then don't
75         # worry about it.
76         if self.cpus!=[]:
77             return self.cpus
78
79         self.cpus = self.get_cgroup_var(self.cgroup_var_name, 'cpuset')
80
81         self.cpu_siblings = {}
82         for item in self.cpus:
83            self.cpu_siblings[item] = self.get_core_siblings(item)
84
85         return self.cpus
86
87     def find_cpu_mostsiblings(self, cpus):
88         bestCount = -1
89         bestCpu = -1
90         for cpu in cpus:
91             count = 0
92             for candidate in self.cpu_siblings[cpu]:
93                 if candidate in cpus:
94                     count = count + 1
95                 if (count > bestCount):
96                     bestCount = count
97                     bestCpu = cpu
98
99         assert(bestCpu >= 0)
100         return bestCpu
101
102
103     def find_compatible_cpu(self, cpus, compatCpu):
104         if compatCpu==None:
105            return self.find_cpu_mostsiblings(cpus)
106
107         # find a sibling if we can
108         bestDelta = None
109         bestCpu = None
110         for cpu in cpus:
111            if compatCpu in self.cpu_siblings[cpu]:
112                return cpu
113
114         return self.find_cpu_mostsiblings(cpus)
115
116     def get_cgroups (self):
117         """ 
118         return a list of cgroups
119         this might change as vservers are instantiated, 
120         so always compute it dynamically.
121         """
122         return cgroups.get_cgroups()
123         #cgroups = []
124         #filenames = os.listdir("/dev/cgroup")
125         #for filename in filenames:
126         #    if os.path.isdir(os.path.join("/dev/cgroup", filename)):
127         #        cgroups.append(filename)
128         #return cgroups
129
130     def decodeCoreSpec (self, cores):
131         """ 
132         Decode the value of the core attribute. 
133         It's a number, followed by an optional letter "b" to indicate besteffort
134         cores should also be supplied.
135         """
136         bestEffort = False
137
138         if cores.endswith("b"):
139            cores = cores[:-1]
140            bestEffort = True
141
142         try:
143             cores = int(cores)
144         except ValueError:
145             cores = 0
146
147         return (cores, bestEffort)
148
149     def adjustCores (self, slivers):
150         """ 
151         slivers is a dict of {sliver_name: rec}
152            rec is a dict of attributes
153            rec['_rspec'] is the effective rspec
154         """
155
156         cpus = self.get_cpus()[:]
157         mems = self.get_mems()[:]
158
159         memSchedule=True
160         if (len(mems) != len(cpus)):
161             logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
162             memSchedule=False
163
164         logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
165
166         reservations = {}
167         mem_reservations = {}
168
169         # allocate the cores to the slivers that have them reserved
170         # TODO: Need to sort this from biggest cpu_cores to smallest
171         for name, rec in slivers.iteritems():
172             rspec = rec["_rspec"]
173             cores = rspec.get(self.slice_attr_name, 0)
174             (cores, bestEffort) = self.decodeCoreSpec(cores)
175
176             lastCpu = None
177
178             while (cores>0):
179                 # one cpu core reserved for best effort and system slices
180                 if len(cpus)<=1:
181                     logger.log("CoreSched: ran out of units while scheduling sliver " + name)
182                 else:
183                     cpu = self.find_compatible_cpu(cpus, lastCpu)
184                     cpus.remove(cpu)
185                     lastCpu = cpu
186
187                     logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
188                     reservations[name] = reservations.get(name, []) + [cpu]
189
190                     # now find a memory node to go with the cpu
191                     if memSchedule:
192                         mem = self.find_associated_memnode(mems, cpu)
193                         if mem != None:
194                             mems.remove(mem)
195                             logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
196                             mem_reservations[name] = mem_reservations.get(name, []) + [mem]
197                         else:
198                             logger.log("CoreSched: failed to find memory node for cpu" + str(cpu))
199
200                 cores = cores-1
201
202         # the leftovers go to everyone else
203         logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
204         reservations["_default"] = cpus[:]
205         mem_reservations["_default"] = mems[:]
206
207         freezeList = {}
208
209         # now check and see if any of our slices had the besteffort flag
210         # set
211         for name, rec in slivers.iteritems():
212             rspec = rec["_rspec"]
213             cores = rspec.get(self.slice_attr_name, 0)
214             (cores, bestEffort) = self.decodeCoreSpec(cores)
215
216             freezable = rspec.get("cpu_freezable", 0)
217             if (cores==0) and (freezable == 1):
218                freezeList[name] = "FROZEN"
219             else:
220                freezeList[name] = "THAWED"
221
222             # if the bestEffort flag isn't set then we have nothing to do
223             if not bestEffort:
224                 continue
225
226             # note that if a reservation is [], then we don't need to add
227             # bestEffort cores to it, since it is bestEffort by default.
228
229             if reservations.get(name, []) != []:
230                 reservations[name] = reservations[name] + reservations["_default"]
231                 mem_reservations[name] = mem_reservations.get(name, []) + mem_reservations["_default"]
232                 logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
233
234         self.reserveUnits(self.cgroup_var_name, reservations)
235
236         self.reserveUnits(self.cgroup_mem_name, mem_reservations)
237
238         self.freezeUnits("freezer.state", freezeList)
239
240     def freezeUnits (self, var_name, freezeList):
241         for (slicename, freeze) in freezeList.items():
242             try:
243                 cgroup_path = cgroups.get_cgroup_path(slicename, 'freezer')
244                 logger.verbose("CoreSched: setting freezer for {} to {} - path={} var={}"
245                                .format(slicename,freeze, cgroup_path, var_name))
246                 cgroup = os.path.join(cgroup_path, var_name)
247                 if not cgroup:
248                     logger.log("Warning: Could not spot 'freezer' cgroup file for slice {} - ignored".format(slicename))
249                     break
250
251                 if glo_coresched_simulate:
252                         print "F", cgroup
253                 else:
254                     with open(cgroup, "w") as f:
255                         f.write(freeze)
256             except Exception as e:
257                 # the cgroup probably didn't exit...
258                 logger.log("CoreSched: exception while setting freeze for {} ({})".format(slicename, e))
259
260     def reserveUnits (self, var_name, reservations):
261         """ 
262         give a set of reservations (dictionary of slicename:cpuid_list),
263         write those reservations to the appropriate cgroup files.
264
265         reservations["_default"] is assumed to be the default reservation
266         for slices that do not reserve cores. It's essentially the leftover
267         cpu cores.
268         """
269
270         default = reservations["_default"]
271
272         # set the default vserver cpuset. this will deal with any vservers
273         # that might be created before the nodemanager has had a chance to
274         # update the cpusets.
275         self.reserveDefault(var_name, default)
276
277         for cgroup in self.get_cgroups():
278             if cgroup in reservations:
279                 cpus = reservations[cgroup]
280                 logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
281             else:
282                 # no log message for default; too much verbosity in the common case
283                 cpus = default
284
285             if glo_coresched_simulate:
286                 print "R", cgroup + "/" + var_name, self.listToRange(cpus)
287             else:
288                 cgroups.write(cgroup, var_name, self.listToRange(cpus))
289
290     def reserveDefault (self, var_name, cpus):
291         #if not os.path.exists("/etc/vservers/.defaults/cgroup"):
292         #    os.makedirs("/etc/vservers/.defaults/cgroup")
293
294         #if glo_coresched_simulate:
295         #    print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
296         #else:
297         #    file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
298         pass
299
300     def listToRange (self, list):
301         """ 
302         take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
303         for now, just comma-separate
304         """
305         return ",".join( [str(i) for i in list] )
306
307     def get_mems(self):
308         """ 
309         return a list of available cpu identifiers: [0,1,2,3...]
310         """
311
312         # the cpus never change, so if it's already been computed then don't
313         # worry about it.
314         if self.mems!=[]:
315             return self.mems
316
317         self.mems = self.get_cgroup_var(self.cgroup_mem_name, 'cpuset')
318
319         # build a mapping from memory nodes to the cpus they can be used with
320
321         mems_map={}
322         for item in self.mems:
323            mems_map[item] = self.get_memnode_cpus(item)
324
325         if (len(mems_map)>0):
326             # when NUMA_EMU is enabled, only the last memory node will contain
327             # the cpu_map. For example, if there were originally 2 nodes and
328             # we used NUM_EMU to raise it to 12, then
329             #    mems_map[0]=[]
330             #    ...
331             #    mems_map[4]=[]
332             #    mems_map[5]=[1,3,5,7,9,11]
333             #    mems_map[6]=[]
334             #    ...
335             #    mems_map[10]=[]
336             #    mems_map[11]=[0,2,4,6,8,10]
337             # so, we go from back to front, copying the entries as necessary.
338
339             if mems_map[self.mems[0]] == []:
340                 work = []
341                 for item in reversed(self.mems):
342                     if mems_map[item]!=[]:
343                         work = mems_map[item]
344                     else:  # mems_map[item]==[]
345                         mems_map[item] = work
346
347             self.mems_map = mems_map
348
349         return self.mems
350
351     def find_associated_memnode(self, mems, cpu):
352         """ 
353         Given a list of memory nodes and a cpu, see if one of the nodes in
354         the list can be used with that cpu.
355         """
356         for item in mems:
357             if cpu in self.mems_map[item]:
358                 return item
359         return None
360
361     def get_memnode_cpus(self, index):
362         """
363         for a given memory node, return the CPUs that it is associated with.
364         """
365         fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
366         if not os.path.exists(fn):
367             logger.log("CoreSched: failed to locate memory node" + fn)
368             return []
369
370         return self.get_cgroup_var(filename=fn)
371
372     def get_core_siblings(self, index):
373         # use core_siblings rather than core_siblings_list, as it's compatible
374         # with older kernels
375         fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
376         if not os.path.exists(fn):
377             return []
378         siblings = []
379
380         x = open(fn, 'rt').readline().strip().split(',')[-1]
381         x = int(x, 16)
382
383         cpuid = 0
384         while (x>0):
385             if (x&1)!=0:
386                 siblings.append(cpuid)
387             x = x >> 1
388             cpuid += 1
389
390         return siblings
391
392
393 # a little self-test
394 if __name__=="__main__":
395     glo_coresched_simulate = True
396
397     x = CoreSched()
398
399     print "cgroups:", ",".join(x.get_cgroups())
400
401     print "cpus:", x.listToRange(x.get_cpus())
402     print "sibling map:"
403     for item in x.get_cpus():
404         print " ", item, ",".join([str(y) for y in x.cpu_siblings.get(item, [])])
405
406     print "mems:", x.listToRange(x.get_mems())
407     print "cpu to memory map:"
408     for item in x.get_mems():
409         print " ", item, ",".join([str(y) for y in x.mems_map.get(item, [])])
410
411     rspec_sl_test1 = {"cpu_cores": "1"}
412     rec_sl_test1 = {"_rspec": rspec_sl_test1}
413
414     rspec_sl_test2 = {"cpu_cores": "5"}
415     rec_sl_test2 = {"_rspec": rspec_sl_test2}
416
417     rspec_sl_test3 = {"cpu_cores": "3b"}
418     rec_sl_test3 = {"_rspec": rspec_sl_test3}
419
420     #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
421
422     slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
423
424     #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}
425
426     x.adjustCores(slivers)
427