reguire gnupg1 on f>=31; sense the system to use gpg1 when installed
[nodemanager.git] / coresched_lxc.py
1 """
2 Whole-core scheduling
3 """
4
5 import logger
6 import os
7 import os.path
8 import cgroups
9 from functools import reduce
10
11 glo_coresched_simulate = False
12 joinpath = os.path.join
13
14 class CoreSched:
15     """ 
16     Whole-core scheduler
17
18     The main entrypoint is adjustCores(self, slivers) which takes a
19     dictionary of sliver records. The cpu_cores field is pulled from the
20     effective rspec (rec["_rspec"]) for each sliver.
21
22     If cpu_cores > 0 for a sliver, then that sliver will reserve one or
23     more of the cpu_cores on the machine.
24
25     One core is always left unreserved for system slices.
26     """
27
28     def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
29         self.cpus = []
30         self.cgroup_var_name = cgroup_var_name
31         self.slice_attr_name = slice_attr_name
32         self.cgroup_mem_name = "cpuset.mems"
33         self.mems=[]
34         self.mems_map={}
35         self.cpu_siblings={}
36
37     def get_cgroup_var(self, name=None, subsys=None, filename=None):
38         """ 
39         decode cpuset.cpus or cpuset.mems into a list of units that can
40         be reserved.
41         """
42
43         assert(filename!=None or name!=None)
44
45         if filename==None:
46             # filename="/dev/cgroup/" + name
47             filename = reduce(lambda a, b: joinpath(a, b) if b else a, [subsys, name],
48                               cgroups.get_base_path())
49
50         data = open(filename).readline().strip()
51
52         if not data:
53            return []
54
55         units = []
56
57         # cpuset.cpus could be something as arbitrary as:
58         #    0,1,2-3,4,5-6
59         # deal with commas and ranges
60         for part in data.split(","):
61             unitRange = part.split("-")
62             if len(unitRange) == 1:
63                 unitRange = (unitRange[0], unitRange[0])
64             for i in range(int(unitRange[0]), int(unitRange[1])+1):
65                 if not i in units:
66                     units.append(i)
67
68         return units
69
70     def get_cpus(self):
71         """ 
72         return a list of available cpu identifiers: [0,1,2,3...]
73         """
74
75         # the cpus never change, so if it's already been computed then don't
76         # worry about it.
77         if self.cpus!=[]:
78             return self.cpus
79
80         self.cpus = self.get_cgroup_var(self.cgroup_var_name, 'cpuset')
81
82         self.cpu_siblings = {}
83         for item in self.cpus:
84            self.cpu_siblings[item] = self.get_core_siblings(item)
85
86         return self.cpus
87
88     def find_cpu_mostsiblings(self, cpus):
89         bestCount = -1
90         bestCpu = -1
91         for cpu in cpus:
92             count = 0
93             for candidate in self.cpu_siblings[cpu]:
94                 if candidate in cpus:
95                     count = count + 1
96                 if (count > bestCount):
97                     bestCount = count
98                     bestCpu = cpu
99
100         assert(bestCpu >= 0)
101         return bestCpu
102
103
104     def find_compatible_cpu(self, cpus, compatCpu):
105         if compatCpu==None:
106            return self.find_cpu_mostsiblings(cpus)
107
108         # find a sibling if we can
109         bestDelta = None
110         bestCpu = None
111         for cpu in cpus:
112            if compatCpu in self.cpu_siblings[cpu]:
113                return cpu
114
115         return self.find_cpu_mostsiblings(cpus)
116
117     def get_cgroups (self):
118         """ 
119         return a list of cgroups
120         this might change as vservers are instantiated, 
121         so always compute it dynamically.
122         """
123         return cgroups.get_cgroups()
124         #cgroups = []
125         #filenames = os.listdir("/dev/cgroup")
126         #for filename in filenames:
127         #    if os.path.isdir(os.path.join("/dev/cgroup", filename)):
128         #        cgroups.append(filename)
129         #return cgroups
130
131     def decodeCoreSpec (self, cores):
132         """ 
133         Decode the value of the core attribute. 
134         It's a number, followed by an optional letter "b" to indicate besteffort
135         cores should also be supplied.
136         """
137         bestEffort = False
138
139         if cores.endswith("b"):
140            cores = cores[:-1]
141            bestEffort = True
142
143         try:
144             cores = int(cores)
145         except ValueError:
146             cores = 0
147
148         return (cores, bestEffort)
149
150     def adjustCores (self, slivers):
151         """ 
152         slivers is a dict of {sliver_name: rec}
153            rec is a dict of attributes
154            rec['_rspec'] is the effective rspec
155         """
156
157         cpus = self.get_cpus()[:]
158         mems = self.get_mems()[:]
159
160         memSchedule=True
161         if (len(mems) != len(cpus)):
162             logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
163             memSchedule=False
164
165         logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
166
167         reservations = {}
168         mem_reservations = {}
169
170         # allocate the cores to the slivers that have them reserved
171         # TODO: Need to sort this from biggest cpu_cores to smallest
172         for name, rec in slivers.items():
173             rspec = rec["_rspec"]
174             cores = rspec.get(self.slice_attr_name, 0)
175             (cores, bestEffort) = self.decodeCoreSpec(cores)
176
177             lastCpu = None
178
179             while (cores>0):
180                 # one cpu core reserved for best effort and system slices
181                 if len(cpus)<=1:
182                     logger.log("CoreSched: ran out of units while scheduling sliver " + name)
183                 else:
184                     cpu = self.find_compatible_cpu(cpus, lastCpu)
185                     cpus.remove(cpu)
186                     lastCpu = cpu
187
188                     logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
189                     reservations[name] = reservations.get(name, []) + [cpu]
190
191                     # now find a memory node to go with the cpu
192                     if memSchedule:
193                         mem = self.find_associated_memnode(mems, cpu)
194                         if mem != None:
195                             mems.remove(mem)
196                             logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
197                             mem_reservations[name] = mem_reservations.get(name, []) + [mem]
198                         else:
199                             logger.log("CoreSched: failed to find memory node for cpu" + str(cpu))
200
201                 cores = cores-1
202
203         # the leftovers go to everyone else
204         logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
205         reservations["_default"] = cpus[:]
206         mem_reservations["_default"] = mems[:]
207
208         freezeList = {}
209
210         # now check and see if any of our slices had the besteffort flag
211         # set
212         for name, rec in slivers.items():
213             rspec = rec["_rspec"]
214             cores = rspec.get(self.slice_attr_name, 0)
215             (cores, bestEffort) = self.decodeCoreSpec(cores)
216
217             freezable = rspec.get("cpu_freezable", 0)
218             if (cores==0) and (freezable == 1):
219                freezeList[name] = "FROZEN"
220             else:
221                freezeList[name] = "THAWED"
222
223             # if the bestEffort flag isn't set then we have nothing to do
224             if not bestEffort:
225                 continue
226
227             # note that if a reservation is [], then we don't need to add
228             # bestEffort cores to it, since it is bestEffort by default.
229
230             if reservations.get(name, []) != []:
231                 reservations[name] = reservations[name] + reservations["_default"]
232                 mem_reservations[name] = mem_reservations.get(name, []) + mem_reservations["_default"]
233                 logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
234
235         self.reserveUnits(self.cgroup_var_name, reservations)
236
237         self.reserveUnits(self.cgroup_mem_name, mem_reservations)
238
239         self.freezeUnits("freezer.state", freezeList)
240
241     def freezeUnits (self, var_name, freezeList):
242         for (slicename, freeze) in list(freezeList.items()):
243             try:
244                 cgroup_path = cgroups.get_cgroup_path(slicename, 'freezer')
245                 logger.verbose("CoreSched: setting freezer for {} to {} - path={} var={}"
246                                .format(slicename,freeze, cgroup_path, var_name))
247                 cgroup = os.path.join(cgroup_path, var_name)
248                 if not cgroup:
249                     logger.log("Warning: Could not spot 'freezer' cgroup file for slice {} - ignored".format(slicename))
250                     break
251
252                 if glo_coresched_simulate:
253                         print("F", cgroup)
254                 else:
255                     with open(cgroup, "w") as f:
256                         f.write(freeze)
257             except Exception as e:
258                 # the cgroup probably didn't exit...
259                 logger.log("CoreSched: exception while setting freeze for {} ({})".format(slicename, e))
260
261     def reserveUnits (self, var_name, reservations):
262         """ 
263         give a set of reservations (dictionary of slicename:cpuid_list),
264         write those reservations to the appropriate cgroup files.
265
266         reservations["_default"] is assumed to be the default reservation
267         for slices that do not reserve cores. It's essentially the leftover
268         cpu cores.
269         """
270
271         default = reservations["_default"]
272
273         # set the default vserver cpuset. this will deal with any vservers
274         # that might be created before the nodemanager has had a chance to
275         # update the cpusets.
276         self.reserveDefault(var_name, default)
277
278         for cgroup in self.get_cgroups():
279             if cgroup in reservations:
280                 cpus = reservations[cgroup]
281                 logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
282             else:
283                 # no log message for default; too much verbosity in the common case
284                 cpus = default
285
286             if glo_coresched_simulate:
287                 print("R", cgroup + "/" + var_name, self.listToRange(cpus))
288             else:
289                 cgroups.write(cgroup, var_name, self.listToRange(cpus))
290
291     def reserveDefault (self, var_name, cpus):
292         #if not os.path.exists("/etc/vservers/.defaults/cgroup"):
293         #    os.makedirs("/etc/vservers/.defaults/cgroup")
294
295         #if glo_coresched_simulate:
296         #    print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
297         #else:
298         #    file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
299         pass
300
301     def listToRange (self, list):
302         """ 
303         take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
304         for now, just comma-separate
305         """
306         return ",".join( [str(i) for i in list] )
307
308     def get_mems(self):
309         """ 
310         return a list of available cpu identifiers: [0,1,2,3...]
311         """
312
313         # the cpus never change, so if it's already been computed then don't
314         # worry about it.
315         if self.mems!=[]:
316             return self.mems
317
318         self.mems = self.get_cgroup_var(self.cgroup_mem_name, 'cpuset')
319
320         # build a mapping from memory nodes to the cpus they can be used with
321
322         mems_map={}
323         for item in self.mems:
324            mems_map[item] = self.get_memnode_cpus(item)
325
326         if (len(mems_map)>0):
327             # when NUMA_EMU is enabled, only the last memory node will contain
328             # the cpu_map. For example, if there were originally 2 nodes and
329             # we used NUM_EMU to raise it to 12, then
330             #    mems_map[0]=[]
331             #    ...
332             #    mems_map[4]=[]
333             #    mems_map[5]=[1,3,5,7,9,11]
334             #    mems_map[6]=[]
335             #    ...
336             #    mems_map[10]=[]
337             #    mems_map[11]=[0,2,4,6,8,10]
338             # so, we go from back to front, copying the entries as necessary.
339
340             if mems_map[self.mems[0]] == []:
341                 work = []
342                 for item in reversed(self.mems):
343                     if mems_map[item]!=[]:
344                         work = mems_map[item]
345                     else:  # mems_map[item]==[]
346                         mems_map[item] = work
347
348             self.mems_map = mems_map
349
350         return self.mems
351
352     def find_associated_memnode(self, mems, cpu):
353         """ 
354         Given a list of memory nodes and a cpu, see if one of the nodes in
355         the list can be used with that cpu.
356         """
357         for item in mems:
358             if cpu in self.mems_map[item]:
359                 return item
360         return None
361
362     def get_memnode_cpus(self, index):
363         """
364         for a given memory node, return the CPUs that it is associated with.
365         """
366         fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
367         if not os.path.exists(fn):
368             logger.log("CoreSched: failed to locate memory node" + fn)
369             return []
370
371         return self.get_cgroup_var(filename=fn)
372
373     def get_core_siblings(self, index):
374         # use core_siblings rather than core_siblings_list, as it's compatible
375         # with older kernels
376         fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
377         if not os.path.exists(fn):
378             return []
379         siblings = []
380
381         x = open(fn, 'rt').readline().strip().split(',')[-1]
382         x = int(x, 16)
383
384         cpuid = 0
385         while (x>0):
386             if (x&1)!=0:
387                 siblings.append(cpuid)
388             x = x >> 1
389             cpuid += 1
390
391         return siblings
392
393
394 # a little self-test
395 if __name__=="__main__":
396     glo_coresched_simulate = True
397
398     x = CoreSched()
399
400     print("cgroups:", ",".join(x.get_cgroups()))
401
402     print("cpus:", x.listToRange(x.get_cpus()))
403     print("sibling map:")
404     for item in x.get_cpus():
405         print(" ", item, ",".join([str(y) for y in x.cpu_siblings.get(item, [])]))
406
407     print("mems:", x.listToRange(x.get_mems()))
408     print("cpu to memory map:")
409     for item in x.get_mems():
410         print(" ", item, ",".join([str(y) for y in x.mems_map.get(item, [])]))
411
412     rspec_sl_test1 = {"cpu_cores": "1"}
413     rec_sl_test1 = {"_rspec": rspec_sl_test1}
414
415     rspec_sl_test2 = {"cpu_cores": "5"}
416     rec_sl_test2 = {"_rspec": rspec_sl_test2}
417
418     rspec_sl_test3 = {"cpu_cores": "3b"}
419     rec_sl_test3 = {"_rspec": rspec_sl_test3}
420
421     #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
422
423     slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
424
425     #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}
426
427     x.adjustCores(slivers)
428