reguire gnupg1 on f>=31; sense the system to use gpg1 when installed
[nodemanager.git] / coresched_vs.py
1 """Whole core scheduling
2
3 """
4
5 import logger
6 import os
7
8 glo_coresched_simulate = False
9
10 class CoreSched:
11     """ Whole-core scheduler
12
13         The main entrypoint is adjustCores(self, slivers) which takes a
14         dictionary of sliver records. The cpu_cores field is pulled from the
15         effective rspec (rec["_rspec"]) for each sliver.
16
17         If cpu_cores > 0 for a sliver, then that sliver will reserve one or
18         more of the cpu_cores on the machine.
19
20         One core is always left unreserved for system slices.
21     """
22
23     def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
24         self.cpus = []
25         self.cgroup_var_name = cgroup_var_name
26         self.slice_attr_name = slice_attr_name
27         self.cgroup_mem_name = "cpuset.mems"
28         self.mems=[]
29         self.mems_map={}
30         self.cpu_siblings={}
31
32     def get_cgroup_var(self, name=None, filename=None):
33         """ decode cpuset.cpus or cpuset.mems into a list of units that can
34             be reserved.
35         """
36
37         assert(filename!=None or name!=None)
38
39         if filename==None:
40             filename="/dev/cgroup/" + name
41
42         data = open(filename).readline().strip()
43
44         if not data:
45            return []
46
47         units = []
48
49         # cpuset.cpus could be something as arbitrary as:
50         #    0,1,2-3,4,5-6
51         # deal with commas and ranges
52         for part in data.split(","):
53             unitRange = part.split("-")
54             if len(unitRange) == 1:
55                 unitRange = (unitRange[0], unitRange[0])
56             for i in range(int(unitRange[0]), int(unitRange[1])+1):
57                 if not i in units:
58                     units.append(i)
59
60         return units
61
62     def get_cpus(self):
63         """ return a list of available cpu identifiers: [0,1,2,3...]
64         """
65
66         # the cpus never change, so if it's already been computed then don't
67         # worry about it.
68         if self.cpus!=[]:
69             return self.cpus
70
71         self.cpus = self.get_cgroup_var(self.cgroup_var_name)
72
73         self.cpu_siblings = {}
74         for item in self.cpus:
75            self.cpu_siblings[item] = self.get_core_siblings(item)
76
77         return self.cpus
78
79     def find_cpu_mostsiblings(self, cpus):
80         bestCount = -1
81         bestCpu = -1
82         for cpu in cpus:
83             count = 0
84             for candidate in self.cpu_siblings[cpu]:
85                 if candidate in cpus:
86                     count = count + 1
87                 if (count > bestCount):
88                     bestCount = count
89                     bestCpu = cpu
90
91         assert(bestCpu >= 0)
92         return bestCpu
93
94
95     def find_compatible_cpu(self, cpus, compatCpu):
96         if compatCpu==None:
97            return self.find_cpu_mostsiblings(cpus)
98
99         # find a sibling if we can
100         bestDelta = None
101         bestCpu = None
102         for cpu in cpus:
103            if compatCpu in self.cpu_siblings[cpu]:
104                return cpu
105
106         return self.find_cpu_mostsiblings(cpus)
107
108     def get_cgroups (self):
109         """ return a list of cgroups
110             this might change as vservers are instantiated, so always compute
111             it dynamically.
112         """
113         cgroups = []
114         filenames = os.listdir("/dev/cgroup")
115         for filename in filenames:
116             if os.path.isdir(os.path.join("/dev/cgroup", filename)):
117                 cgroups.append(filename)
118         return cgroups
119
120     def decodeCoreSpec (self, cores):
121         """ Decode the value of the core attribute. It's a number, followed by
122             an optional letter "b" to indicate besteffort cores should also
123             be supplied.
124         """
125         bestEffort = False
126
127         if cores.endswith("b"):
128            cores = cores[:-1]
129            bestEffort = True
130
131         try:
132             cores = int(cores)
133         except ValueError:
134             cores = 0
135
136         return (cores, bestEffort)
137
138     def adjustCores (self, slivers):
139         """ slivers is a dict of {sliver_name: rec}
140                 rec is a dict of attributes
141                     rec['_rspec'] is the effective rspec
142         """
143
144         cpus = self.get_cpus()[:]
145         mems = self.get_mems()[:]
146
147         memSchedule=True
148         if (len(mems) != len(cpus)):
149             logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
150             memSchedule=False
151
152         logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
153
154         reservations = {}
155         mem_reservations = {}
156
157         # allocate the cores to the slivers that have them reserved
158         # TODO: Need to sort this from biggest cpu_cores to smallest
159         for name, rec in slivers.items():
160             rspec = rec["_rspec"]
161             cores = rspec.get(self.slice_attr_name, 0)
162             (cores, bestEffort) = self.decodeCoreSpec(cores)
163
164             lastCpu = None
165
166             while (cores>0):
167                 # one cpu core reserved for best effort and system slices
168                 if len(cpus)<=1:
169                     logger.log("CoreSched: ran out of units while scheduling sliver " + name)
170                 else:
171                     cpu = self.find_compatible_cpu(cpus, lastCpu)
172                     cpus.remove(cpu)
173                     lastCpu = cpu
174
175                     logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
176                     reservations[name] = reservations.get(name, []) + [cpu]
177
178                     # now find a memory node to go with the cpu
179                     if memSchedule:
180                         mem = self.find_associated_memnode(mems, cpu)
181                         if mem != None:
182                             mems.remove(mem)
183                             logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
184                             mem_reservations[name] = mem_reservations.get(name, []) + [mem]
185                         else:
186                             logger.log("CoreSched: failed to find memory node for cpu" + str(cpu))
187
188                 cores = cores-1
189
190         # the leftovers go to everyone else
191         logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
192         reservations["_default"] = cpus[:]
193         mem_reservations["_default"] = mems[:]
194
195         # now check and see if any of our slices had the besteffort flag
196         # set
197         for name, rec in slivers.items():
198             rspec = rec["_rspec"]
199             cores = rspec.get(self.slice_attr_name, 0)
200             (cores, bestEffort) = self.decodeCoreSpec(cores)
201
202             # if the bestEffort flag isn't set then we have nothing to do
203             if not bestEffort:
204                 continue
205
206             # note that if a reservation is [], then we don't need to add
207             # bestEffort cores to it, since it is bestEffort by default.
208
209             if reservations.get(name, []) != []:
210                 reservations[name] = reservations[name] + reservations["_default"]
211                 mem_reservations[name] = mem_reservations.get(name, []) + mem_reservations["_default"]
212                 logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
213
214         self.reserveUnits(self.cgroup_var_name, reservations)
215
216         self.reserveUnits(self.cgroup_mem_name, mem_reservations)
217
218     def reserveUnits (self, var_name, reservations):
219         """ give a set of reservations (dictionary of slicename:cpuid_list),
220             write those reservations to the appropriate cgroup files.
221
222             reservations["_default"] is assumed to be the default reservation
223             for slices that do not reserve cores. It's essentially the leftover
224             cpu cores.
225         """
226
227         default = reservations["_default"]
228
229         # set the default vserver cpuset. this will deal with any vservers
230         # that might be created before the nodemanager has had a chance to
231         # update the cpusets.
232         self.reserveDefault(var_name, default)
233
234         for cgroup in self.get_cgroups():
235             if cgroup in reservations:
236                 cpus = reservations[cgroup]
237                 logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
238             else:
239                 # no log message for default; too much verbosity in the common case
240                 cpus = default
241
242             if glo_coresched_simulate:
243                 print("R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus))
244             else:
245                 with opwn("/dev/cgroup/{}/{}".format(cgroup, var_name), "w") as f:
246                     f.write( self.listToRange(cpus) + "\n" )
247
248     def reserveDefault (self, var_name, cpus):
249         if not os.path.exists("/etc/vservers/.defaults/cgroup"):
250             os.makedirs("/etc/vservers/.defaults/cgroup")
251
252         if glo_coresched_simulate:
253             print("RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus))
254         else:
255             with open("/etc/vservers/.defaults/cgroup/{}".format(var_name), "w") as f:
256                 f.write( self.listToRange(cpus) + "\n" )
257
258     def listToRange (self, list):
259         """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
260             for now, just comma-separate
261         """
262         return ",".join( [str(i) for i in list] )
263
264     def get_mems(self):
265         """ return a list of available cpu identifiers: [0,1,2,3...]
266         """
267
268         # the cpus never change, so if it's already been computed then don't
269         # worry about it.
270         if self.mems!=[]:
271             return self.mems
272
273         self.mems = self.get_cgroup_var(self.cgroup_mem_name)
274
275         # build a mapping from memory nodes to the cpus they can be used with
276
277         mems_map={}
278         for item in self.mems:
279            mems_map[item] = self.get_memnode_cpus(item)
280
281         if (len(mems_map)>0):
282             # when NUMA_EMU is enabled, only the last memory node will contain
283             # the cpu_map. For example, if there were originally 2 nodes and
284             # we used NUM_EMU to raise it to 12, then
285             #    mems_map[0]=[]
286             #    ...
287             #    mems_map[4]=[]
288             #    mems_map[5]=[1,3,5,7,9,11]
289             #    mems_map[6]=[]
290             #    ...
291             #    mems_map[10]=[]
292             #    mems_map[11]=[0,2,4,6,8,10]
293             # so, we go from back to front, copying the entries as necessary.
294
295             if mems_map[self.mems[0]] == []:
296                 work = []
297                 for item in reversed(self.mems):
298                     if mems_map[item] != []:
299                         work = mems_map[item]
300                     else:  # mems_map[item]==[]
301                         mems_map[item] = work
302
303             self.mems_map = mems_map
304
305         return self.mems
306
307     def find_associated_memnode(self, mems, cpu):
308         """ Given a list of memory nodes and a cpu, see if one of the nodes in
309             the list can be used with that cpu.
310         """
311         for item in mems:
312             if cpu in self.mems_map[item]:
313                 return item
314         return None
315
316     def get_memnode_cpus(self, index):
317         """ for a given memory node, return the CPUs that it is associated
318             with.
319         """
320         fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
321         if not os.path.exists(fn):
322             logger.log("CoreSched: failed to locate memory node" + fn)
323             return []
324
325         return self.get_cgroup_var(filename=fn)
326
327     def get_core_siblings(self, index):
328         # use core_siblings rather than core_siblings_list, as it's compatible
329         # with older kernels
330         fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
331         if not os.path.exists(fn):
332             return []
333         siblings = []
334
335         with open(fn, "rt") as f:
336             x = int(f.readline().strip(), 16)
337         cpuid = 0
338         while (x>0):
339             if (x&1)!=0:
340                 siblings.append(cpuid)
341             x = x >> 1
342             cpuid += 1
343
344         return siblings
345
346
347 # a little self-test
348 if __name__=="__main__":
349     glo_coresched_simulate = True
350
351     x = CoreSched()
352
353     print("cgroups:", ",".join(x.get_cgroups()))
354
355     print("cpus:", x.listToRange(x.get_cpus()))
356     print("sibling map:")
357     for item in x.get_cpus():
358         print(" ", item, ",".join([str(y) for y in x.cpu_siblings.get(item, [])]))
359
360     print("mems:", x.listToRange(x.get_mems()))
361     print("cpu to memory map:")
362     for item in x.get_mems():
363         print(" ", item, ",".join([str(y) for y in x.mems_map.get(item, [])]))
364
365     rspec_sl_test1 = {"cpu_cores": "1"}
366     rec_sl_test1 = {"_rspec": rspec_sl_test1}
367
368     rspec_sl_test2 = {"cpu_cores": "5"}
369     rec_sl_test2 = {"_rspec": rspec_sl_test2}
370
371     rspec_sl_test3 = {"cpu_cores": "3b"}
372     rec_sl_test3 = {"_rspec": rspec_sl_test3}
373
374     #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
375
376     slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
377
378     #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}
379
380     x.adjustCores(slivers)
381