Merge branch 'master' of ssh://git.planet-lab.org/git/nodemanager
[nodemanager.git] / coresched_lxc.py
1 """Whole core scheduling
2
3 """
4
5 import logger
6 import os
7 import cgroups
8
9 glo_coresched_simulate = False
10 joinpath = os.path.join
11
12 class CoreSched:
13     """ Whole-core scheduler
14
15         The main entrypoint is adjustCores(self, slivers) which takes a
16         dictionary of sliver records. The cpu_cores field is pulled from the
17         effective rspec (rec["_rspec"]) for each sliver.
18
19         If cpu_cores > 0 for a sliver, then that sliver will reserve one or
20         more of the cpu_cores on the machine.
21
22         One core is always left unreserved for system slices.
23     """
24
25     def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
26         self.cpus = []
27         self.cgroup_var_name = cgroup_var_name
28         self.slice_attr_name = slice_attr_name
29         self.cgroup_mem_name = "cpuset.mems"
30         self.mems=[]
31         self.mems_map={}
32         self.cpu_siblings={}
33
34     def get_cgroup_var(self, name=None, subsys=None, filename=None):
35         """ decode cpuset.cpus or cpuset.mems into a list of units that can
36             be reserved.
37         """
38
39         assert(filename!=None or name!=None)
40
41         if filename==None:
42             # filename="/dev/cgroup/" + name
43             filename = reduce(lambda a, b: joinpath(a, b) if b else a, [subsys, name],
44                               cgroups.get_base_path())
45
46         data = open(filename).readline().strip()
47
48         if not data:
49            return []
50
51         units = []
52
53         # cpuset.cpus could be something as arbitrary as:
54         #    0,1,2-3,4,5-6
55         # deal with commas and ranges
56         for part in data.split(","):
57             unitRange = part.split("-")
58             if len(unitRange) == 1:
59                 unitRange = (unitRange[0], unitRange[0])
60             for i in range(int(unitRange[0]), int(unitRange[1])+1):
61                 if not i in units:
62                     units.append(i)
63
64         return units
65
66     def get_cpus(self):
67         """ return a list of available cpu identifiers: [0,1,2,3...]
68         """
69
70         # the cpus never change, so if it's already been computed then don't
71         # worry about it.
72         if self.cpus!=[]:
73             return self.cpus
74
75         self.cpus = self.get_cgroup_var(self.cgroup_var_name, 'cpuset')
76
77         self.cpu_siblings = {}
78         for item in self.cpus:
79            self.cpu_siblings[item] = self.get_core_siblings(item)
80
81         return self.cpus
82
83     def find_cpu_mostsiblings(self, cpus):
84         bestCount = -1
85         bestCpu = -1
86         for cpu in cpus:
87             count = 0
88             for candidate in self.cpu_siblings[cpu]:
89                 if candidate in cpus:
90                     count = count + 1
91                 if (count > bestCount):
92                     bestCount = count
93                     bestCpu = cpu
94
95         assert(bestCpu >= 0)
96         return bestCpu
97
98
99     def find_compatible_cpu(self, cpus, compatCpu):
100         if compatCpu==None:
101            return self.find_cpu_mostsiblings(cpus)
102
103         # find a sibling if we can
104         bestDelta = None
105         bestCpu = None
106         for cpu in cpus:
107            if compatCpu in self.cpu_siblings[cpu]:
108                return cpu
109
110         return self.find_cpu_mostsiblings(cpus)
111
112     def get_cgroups (self):
113         """ return a list of cgroups
114             this might change as vservers are instantiated, so always compute
115             it dynamically.
116         """
117         return cgroups.get_cgroups()
118         #cgroups = []
119         #filenames = os.listdir("/dev/cgroup")
120         #for filename in filenames:
121         #    if os.path.isdir(os.path.join("/dev/cgroup", filename)):
122         #        cgroups.append(filename)
123         #return cgroups
124
125     def decodeCoreSpec (self, cores):
126         """ Decode the value of the core attribute. It's a number, followed by
127             an optional letter "b" to indicate besteffort cores should also
128             be supplied.
129         """
130         bestEffort = False
131
132         if cores.endswith("b"):
133            cores = cores[:-1]
134            bestEffort = True
135
136         try:
137             cores = int(cores)
138         except ValueError:
139             cores = 0
140
141         return (cores, bestEffort)
142
143     def adjustCores (self, slivers):
144         """ slivers is a dict of {sliver_name: rec}
145                 rec is a dict of attributes
146                     rec['_rspec'] is the effective rspec
147         """
148
149         cpus = self.get_cpus()[:]
150         mems = self.get_mems()[:]
151
152         memSchedule=True
153         if (len(mems) != len(cpus)):
154             logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
155             memSchedule=False
156
157         logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
158
159         reservations = {}
160         mem_reservations = {}
161
162         # allocate the cores to the slivers that have them reserved
163         # TODO: Need to sort this from biggest cpu_cores to smallest
164         for name, rec in slivers.iteritems():
165             rspec = rec["_rspec"]
166             cores = rspec.get(self.slice_attr_name, 0)
167             (cores, bestEffort) = self.decodeCoreSpec(cores)
168
169             lastCpu = None
170
171             while (cores>0):
172                 # one cpu core reserved for best effort and system slices
173                 if len(cpus)<=1:
174                     logger.log("CoreSched: ran out of units while scheduling sliver " + name)
175                 else:
176                     cpu = self.find_compatible_cpu(cpus, lastCpu)
177                     cpus.remove(cpu)
178                     lastCpu = cpu
179
180                     logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
181                     reservations[name] = reservations.get(name,[]) + [cpu]
182
183                     # now find a memory node to go with the cpu
184                     if memSchedule:
185                         mem = self.find_associated_memnode(mems, cpu)
186                         if mem != None:
187                             mems.remove(mem)
188                             logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
189                             mem_reservations[name] = mem_reservations.get(name,[]) + [mem]
190                         else:
191                             logger.log("CoreSched: failed to find memory node for cpu" + str(cpu))
192
193                 cores = cores-1
194
195         # the leftovers go to everyone else
196         logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
197         reservations["_default"] = cpus[:]
198         mem_reservations["_default"] = mems[:]
199
200         freezeList = {}
201
202         # now check and see if any of our slices had the besteffort flag
203         # set
204         for name, rec in slivers.iteritems():
205             rspec = rec["_rspec"]
206             cores = rspec.get(self.slice_attr_name, 0)
207             (cores, bestEffort) = self.decodeCoreSpec(cores)
208
209             freezable = rspec.get("cpu_freezable", 0)
210             logger.log("CoreSched: " + str(name) + " " + str(freezable) + " " + str(type(freezable)) + " " + str(cores) + " " + str(type(cores)))
211             if (cores==0) and (freezable == 1):
212                freezeList[name] = "FROZEN"
213             else:
214                freezeList[name] = "THAWED"
215
216             # if the bestEffort flag isn't set then we have nothing to do
217             if not bestEffort:
218                 continue
219
220             # note that if a reservation is [], then we don't need to add
221             # bestEffort cores to it, since it is bestEffort by default.
222
223             if reservations.get(name,[]) != []:
224                 reservations[name] = reservations[name] + reservations["_default"]
225                 mem_reservations[name] = mem_reservations.get(name,[]) + mem_reservations["_default"]
226                 logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
227
228         self.reserveUnits(self.cgroup_var_name, reservations)
229
230         self.reserveUnits(self.cgroup_mem_name, mem_reservations)
231
232         self.freezeUnits("freezer.state", freezeList)
233
234     def freezeUnits (self, var_name, freezeList):
235         for (cgroup, freeze) in freezeList.items():
236             try:
237                 logger.log("CoreSched: setting freezer for " + cgroup + " to " + freeze)
238                 if glo_coresched_simulate:
239                     print "F", "/dev/cgroup/" + cgroup + "/" + var_name, freeze
240                 else:
241                     #file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write(freeze)
242                     file("/sys/fs/cgroup/freezer/libvirt/lxc/" + cgroup + "/" + var_name, "w").write(freeze)
243             except:
244                 # the cgroup probably didn't exit...
245                 logger.log("CoreSched: exception while setting freeze for " + cgroup)
246
247     def reserveUnits (self, var_name, reservations):
248         """ give a set of reservations (dictionary of slicename:cpuid_list),
249             write those reservations to the appropriate cgroup files.
250
251             reservations["_default"] is assumed to be the default reservation
252             for slices that do not reserve cores. It's essentially the leftover
253             cpu cores.
254         """
255
256         default = reservations["_default"]
257
258         # set the default vserver cpuset. this will deal with any vservers
259         # that might be created before the nodemanager has had a chance to
260         # update the cpusets.
261         self.reserveDefault(var_name, default)
262
263         for cgroup in self.get_cgroups():
264             if cgroup in reservations:
265                 cpus = reservations[cgroup]
266                 logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
267             else:
268                 # no log message for default; too much verbosity in the common case
269                 cpus = default
270
271             if glo_coresched_simulate:
272                 print "R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus)
273             else:
274                 cgroups.write(cgroup, var_name, self.listToRange(cpus))
275                 #file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
276
277     def reserveDefault (self, var_name, cpus):
278         #if not os.path.exists("/etc/vservers/.defaults/cgroup"):
279         #    os.makedirs("/etc/vservers/.defaults/cgroup")
280
281         #if glo_coresched_simulate:
282         #    print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
283         #else:
284         #    file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
285         pass
286
287     def listToRange (self, list):
288         """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
289             for now, just comma-separate
290         """
291         return ",".join( [str(i) for i in list] )
292
293     def get_mems(self):
294         """ return a list of available cpu identifiers: [0,1,2,3...]
295         """
296
297         # the cpus never change, so if it's already been computed then don't
298         # worry about it.
299         if self.mems!=[]:
300             return self.mems
301
302         self.mems = self.get_cgroup_var(self.cgroup_mem_name, 'cpuset')
303
304         # build a mapping from memory nodes to the cpus they can be used with
305
306         mems_map={}
307         for item in self.mems:
308            mems_map[item] = self.get_memnode_cpus(item)
309
310         if (len(mems_map)>0):
311             # when NUMA_EMU is enabled, only the last memory node will contain
312             # the cpu_map. For example, if there were originally 2 nodes and
313             # we used NUM_EMU to raise it to 12, then
314             #    mems_map[0]=[]
315             #    ...
316             #    mems_map[4]=[]
317             #    mems_map[5]=[1,3,5,7,9,11]
318             #    mems_map[6]=[]
319             #    ...
320             #    mems_map[10]=[]
321             #    mems_map[11]=[0,2,4,6,8,10]
322             # so, we go from back to front, copying the entries as necessary.
323
324             if mems_map[self.mems[0]] == []:
325                 work = []
326                 for item in reversed(self.mems):
327                     if mems_map[item]!=[]:
328                         work = mems_map[item]
329                     else:  # mems_map[item]==[]
330                         mems_map[item] = work
331
332             self.mems_map = mems_map
333
334         return self.mems
335
336     def find_associated_memnode(self, mems, cpu):
337         """ Given a list of memory nodes and a cpu, see if one of the nodes in
338             the list can be used with that cpu.
339         """
340         for item in mems:
341             if cpu in self.mems_map[item]:
342                 return item
343         return None
344
345     def get_memnode_cpus(self, index):
346         """ for a given memory node, return the CPUs that it is associated
347             with.
348         """
349         fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
350         if not os.path.exists(fn):
351             logger.log("CoreSched: failed to locate memory node" + fn)
352             return []
353
354         return self.get_cgroup_var(filename=fn)
355
356     def get_core_siblings(self, index):
357         # use core_siblings rather than core_siblings_list, as it's compatible
358         # with older kernels
359         fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
360         if not os.path.exists(fn):
361             return []
362         siblings = []
363
364         x = open(fn, 'rt').readline().strip().split(',')[-1]
365         x = int(x, 16)
366
367         cpuid = 0
368         while (x>0):
369             if (x&1)!=0:
370                 siblings.append(cpuid)
371             x = x >> 1
372             cpuid += 1
373
374         return siblings
375
376
377 # a little self-test
378 if __name__=="__main__":
379     glo_coresched_simulate = True
380
381     x = CoreSched()
382
383     print "cgroups:", ",".join(x.get_cgroups())
384
385     print "cpus:", x.listToRange(x.get_cpus())
386     print "sibling map:"
387     for item in x.get_cpus():
388         print " ", item, ",".join([str(y) for y in x.cpu_siblings.get(item,[])])
389
390     print "mems:", x.listToRange(x.get_mems())
391     print "cpu to memory map:"
392     for item in x.get_mems():
393         print " ", item, ",".join([str(y) for y in x.mems_map.get(item,[])])
394
395     rspec_sl_test1 = {"cpu_cores": "1"}
396     rec_sl_test1 = {"_rspec": rspec_sl_test1}
397
398     rspec_sl_test2 = {"cpu_cores": "5"}
399     rec_sl_test2 = {"_rspec": rspec_sl_test2}
400
401     rspec_sl_test3 = {"cpu_cores": "3b"}
402     rec_sl_test3 = {"_rspec": rspec_sl_test3}
403
404     #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
405
406     slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
407
408     #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}
409
410     x.adjustCores(slivers)
411