fix coresched locating cgroup and reduce verbosity
[nodemanager.git] / coresched_lxc.py
1 """Whole core scheduling
2
3 """
4
5 import logger
6 import os
7 import os.path
8 import cgroups
9
10 glo_coresched_simulate = False
11 joinpath = os.path.join
12
13 class CoreSched:
14     """ Whole-core scheduler
15
16         The main entrypoint is adjustCores(self, slivers) which takes a
17         dictionary of sliver records. The cpu_cores field is pulled from the
18         effective rspec (rec["_rspec"]) for each sliver.
19
20         If cpu_cores > 0 for a sliver, then that sliver will reserve one or
21         more of the cpu_cores on the machine.
22
23         One core is always left unreserved for system slices.
24     """
25
26     def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
27         self.cpus = []
28         self.cgroup_var_name = cgroup_var_name
29         self.slice_attr_name = slice_attr_name
30         self.cgroup_mem_name = "cpuset.mems"
31         self.mems=[]
32         self.mems_map={}
33         self.cpu_siblings={}
34
35     def get_cgroup_var(self, name=None, subsys=None, filename=None):
36         """ decode cpuset.cpus or cpuset.mems into a list of units that can
37             be reserved.
38         """
39
40         assert(filename!=None or name!=None)
41
42         if filename==None:
43             # filename="/dev/cgroup/" + name
44             filename = reduce(lambda a, b: joinpath(a, b) if b else a, [subsys, name],
45                               cgroups.get_base_path())
46
47         data = open(filename).readline().strip()
48
49         if not data:
50            return []
51
52         units = []
53
54         # cpuset.cpus could be something as arbitrary as:
55         #    0,1,2-3,4,5-6
56         # deal with commas and ranges
57         for part in data.split(","):
58             unitRange = part.split("-")
59             if len(unitRange) == 1:
60                 unitRange = (unitRange[0], unitRange[0])
61             for i in range(int(unitRange[0]), int(unitRange[1])+1):
62                 if not i in units:
63                     units.append(i)
64
65         return units
66
67     def get_cpus(self):
68         """ return a list of available cpu identifiers: [0,1,2,3...]
69         """
70
71         # the cpus never change, so if it's already been computed then don't
72         # worry about it.
73         if self.cpus!=[]:
74             return self.cpus
75
76         self.cpus = self.get_cgroup_var(self.cgroup_var_name, 'cpuset')
77
78         self.cpu_siblings = {}
79         for item in self.cpus:
80            self.cpu_siblings[item] = self.get_core_siblings(item)
81
82         return self.cpus
83
84     def find_cpu_mostsiblings(self, cpus):
85         bestCount = -1
86         bestCpu = -1
87         for cpu in cpus:
88             count = 0
89             for candidate in self.cpu_siblings[cpu]:
90                 if candidate in cpus:
91                     count = count + 1
92                 if (count > bestCount):
93                     bestCount = count
94                     bestCpu = cpu
95
96         assert(bestCpu >= 0)
97         return bestCpu
98
99
100     def find_compatible_cpu(self, cpus, compatCpu):
101         if compatCpu==None:
102            return self.find_cpu_mostsiblings(cpus)
103
104         # find a sibling if we can
105         bestDelta = None
106         bestCpu = None
107         for cpu in cpus:
108            if compatCpu in self.cpu_siblings[cpu]:
109                return cpu
110
111         return self.find_cpu_mostsiblings(cpus)
112
113     def get_cgroups (self):
114         """ return a list of cgroups
115             this might change as vservers are instantiated, so always compute
116             it dynamically.
117         """
118         return cgroups.get_cgroups()
119         #cgroups = []
120         #filenames = os.listdir("/dev/cgroup")
121         #for filename in filenames:
122         #    if os.path.isdir(os.path.join("/dev/cgroup", filename)):
123         #        cgroups.append(filename)
124         #return cgroups
125
126     def decodeCoreSpec (self, cores):
127         """ Decode the value of the core attribute. It's a number, followed by
128             an optional letter "b" to indicate besteffort cores should also
129             be supplied.
130         """
131         bestEffort = False
132
133         if cores.endswith("b"):
134            cores = cores[:-1]
135            bestEffort = True
136
137         try:
138             cores = int(cores)
139         except ValueError:
140             cores = 0
141
142         return (cores, bestEffort)
143
144     def adjustCores (self, slivers):
145         """ slivers is a dict of {sliver_name: rec}
146                 rec is a dict of attributes
147                     rec['_rspec'] is the effective rspec
148         """
149
150         cpus = self.get_cpus()[:]
151         mems = self.get_mems()[:]
152
153         memSchedule=True
154         if (len(mems) != len(cpus)):
155             logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
156             memSchedule=False
157
158         logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
159
160         reservations = {}
161         mem_reservations = {}
162
163         # allocate the cores to the slivers that have them reserved
164         # TODO: Need to sort this from biggest cpu_cores to smallest
165         for name, rec in slivers.iteritems():
166             rspec = rec["_rspec"]
167             cores = rspec.get(self.slice_attr_name, 0)
168             (cores, bestEffort) = self.decodeCoreSpec(cores)
169
170             lastCpu = None
171
172             while (cores>0):
173                 # one cpu core reserved for best effort and system slices
174                 if len(cpus)<=1:
175                     logger.log("CoreSched: ran out of units while scheduling sliver " + name)
176                 else:
177                     cpu = self.find_compatible_cpu(cpus, lastCpu)
178                     cpus.remove(cpu)
179                     lastCpu = cpu
180
181                     logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
182                     reservations[name] = reservations.get(name,[]) + [cpu]
183
184                     # now find a memory node to go with the cpu
185                     if memSchedule:
186                         mem = self.find_associated_memnode(mems, cpu)
187                         if mem != None:
188                             mems.remove(mem)
189                             logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
190                             mem_reservations[name] = mem_reservations.get(name,[]) + [mem]
191                         else:
192                             logger.log("CoreSched: failed to find memory node for cpu" + str(cpu))
193
194                 cores = cores-1
195
196         # the leftovers go to everyone else
197         logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
198         reservations["_default"] = cpus[:]
199         mem_reservations["_default"] = mems[:]
200
201         freezeList = {}
202
203         # now check and see if any of our slices had the besteffort flag
204         # set
205         for name, rec in slivers.iteritems():
206             rspec = rec["_rspec"]
207             cores = rspec.get(self.slice_attr_name, 0)
208             (cores, bestEffort) = self.decodeCoreSpec(cores)
209
210             freezable = rspec.get("cpu_freezable", 0)
211             if (cores==0) and (freezable == 1):
212                freezeList[name] = "FROZEN"
213             else:
214                freezeList[name] = "THAWED"
215
216             # if the bestEffort flag isn't set then we have nothing to do
217             if not bestEffort:
218                 continue
219
220             # note that if a reservation is [], then we don't need to add
221             # bestEffort cores to it, since it is bestEffort by default.
222
223             if reservations.get(name,[]) != []:
224                 reservations[name] = reservations[name] + reservations["_default"]
225                 mem_reservations[name] = mem_reservations.get(name,[]) + mem_reservations["_default"]
226                 logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
227
228         self.reserveUnits(self.cgroup_var_name, reservations)
229
230         self.reserveUnits(self.cgroup_mem_name, mem_reservations)
231
232         self.freezeUnits("freezer.state", freezeList)
233
234     def freezeUnits (self, var_name, freezeList):
235         for (cgroup, freeze) in freezeList.items():
236             try:
237                 logger.verbose("CoreSched: setting freezer for " + cgroup + " to " + freeze)
238                 attempts = []
239 #                attempts.append("/dev/cgroup/{}/var_name".format(cgroup, var_name))
240                 attempts.append("/sys/fs/cgroup/freezer/libvirt/lxc/{}/{}".format(cgroup, var_name))
241                 attempts.append("/sys/fs/cgroup/freezer/machine.slice/machine-lxc\\x2d{}.scope/{}".format(cgroup, var_name))
242                 if glo_coresched_simulate:
243                     for attempt in attempts: print "F", attempt
244                 else:
245                     ok = False
246                     for attempt in attempts:
247                         if os.path.exists(attempt):
248                             file(attempt, "w").write(freeze)
249                             ok = True
250                             break
251                     if not ok:
252                         logger.log("CoreSched: could not freezeUnits with {}".format(cgroup))
253             except Exception as e:
254                 # the cgroup probably didn't exit...
255                 logger.log("CoreSched: exception while setting freeze for {} ({})".format(cgroup, e))
256
257     def reserveUnits (self, var_name, reservations):
258         """ 
259         give a set of reservations (dictionary of slicename:cpuid_list),
260         write those reservations to the appropriate cgroup files.
261
262         reservations["_default"] is assumed to be the default reservation
263         for slices that do not reserve cores. It's essentially the leftover
264         cpu cores.
265         """
266
267         default = reservations["_default"]
268
269         # set the default vserver cpuset. this will deal with any vservers
270         # that might be created before the nodemanager has had a chance to
271         # update the cpusets.
272         self.reserveDefault(var_name, default)
273
274         for cgroup in self.get_cgroups():
275             if cgroup in reservations:
276                 cpus = reservations[cgroup]
277                 logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
278             else:
279                 # no log message for default; too much verbosity in the common case
280                 cpus = default
281
282             if glo_coresched_simulate:
283                 print "R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus)
284             else:
285                 cgroups.write(cgroup, var_name, self.listToRange(cpus))
286                 #file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
287
288     def reserveDefault (self, var_name, cpus):
289         #if not os.path.exists("/etc/vservers/.defaults/cgroup"):
290         #    os.makedirs("/etc/vservers/.defaults/cgroup")
291
292         #if glo_coresched_simulate:
293         #    print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
294         #else:
295         #    file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
296         pass
297
298     def listToRange (self, list):
299         """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
300             for now, just comma-separate
301         """
302         return ",".join( [str(i) for i in list] )
303
304     def get_mems(self):
305         """ return a list of available cpu identifiers: [0,1,2,3...]
306         """
307
308         # the cpus never change, so if it's already been computed then don't
309         # worry about it.
310         if self.mems!=[]:
311             return self.mems
312
313         self.mems = self.get_cgroup_var(self.cgroup_mem_name, 'cpuset')
314
315         # build a mapping from memory nodes to the cpus they can be used with
316
317         mems_map={}
318         for item in self.mems:
319            mems_map[item] = self.get_memnode_cpus(item)
320
321         if (len(mems_map)>0):
322             # when NUMA_EMU is enabled, only the last memory node will contain
323             # the cpu_map. For example, if there were originally 2 nodes and
324             # we used NUM_EMU to raise it to 12, then
325             #    mems_map[0]=[]
326             #    ...
327             #    mems_map[4]=[]
328             #    mems_map[5]=[1,3,5,7,9,11]
329             #    mems_map[6]=[]
330             #    ...
331             #    mems_map[10]=[]
332             #    mems_map[11]=[0,2,4,6,8,10]
333             # so, we go from back to front, copying the entries as necessary.
334
335             if mems_map[self.mems[0]] == []:
336                 work = []
337                 for item in reversed(self.mems):
338                     if mems_map[item]!=[]:
339                         work = mems_map[item]
340                     else:  # mems_map[item]==[]
341                         mems_map[item] = work
342
343             self.mems_map = mems_map
344
345         return self.mems
346
347     def find_associated_memnode(self, mems, cpu):
348         """ Given a list of memory nodes and a cpu, see if one of the nodes in
349             the list can be used with that cpu.
350         """
351         for item in mems:
352             if cpu in self.mems_map[item]:
353                 return item
354         return None
355
356     def get_memnode_cpus(self, index):
357         """ for a given memory node, return the CPUs that it is associated
358             with.
359         """
360         fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
361         if not os.path.exists(fn):
362             logger.log("CoreSched: failed to locate memory node" + fn)
363             return []
364
365         return self.get_cgroup_var(filename=fn)
366
367     def get_core_siblings(self, index):
368         # use core_siblings rather than core_siblings_list, as it's compatible
369         # with older kernels
370         fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
371         if not os.path.exists(fn):
372             return []
373         siblings = []
374
375         x = open(fn, 'rt').readline().strip().split(',')[-1]
376         x = int(x, 16)
377
378         cpuid = 0
379         while (x>0):
380             if (x&1)!=0:
381                 siblings.append(cpuid)
382             x = x >> 1
383             cpuid += 1
384
385         return siblings
386
387
388 # a little self-test
389 if __name__=="__main__":
390     glo_coresched_simulate = True
391
392     x = CoreSched()
393
394     print "cgroups:", ",".join(x.get_cgroups())
395
396     print "cpus:", x.listToRange(x.get_cpus())
397     print "sibling map:"
398     for item in x.get_cpus():
399         print " ", item, ",".join([str(y) for y in x.cpu_siblings.get(item,[])])
400
401     print "mems:", x.listToRange(x.get_mems())
402     print "cpu to memory map:"
403     for item in x.get_mems():
404         print " ", item, ",".join([str(y) for y in x.mems_map.get(item,[])])
405
406     rspec_sl_test1 = {"cpu_cores": "1"}
407     rec_sl_test1 = {"_rspec": rspec_sl_test1}
408
409     rspec_sl_test2 = {"cpu_cores": "5"}
410     rec_sl_test2 = {"_rspec": rspec_sl_test2}
411
412     rspec_sl_test3 = {"cpu_cores": "3b"}
413     rec_sl_test3 = {"_rspec": rspec_sl_test3}
414
415     #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
416
417     slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
418
419     #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}
420
421     x.adjustCores(slivers)
422