3954da957fd694d518bcc97b9610fc8b1d32923b
[nodemanager.git] / coresched.py
1 # $Id$
2 # $URL$
3
4 """Whole core scheduling
5
6 """
7
8 import logger
9 import os
10
11 glo_coresched_simulate = False
12
13 class CoreSched:
14     """ Whole-core scheduler
15
16         The main entrypoint is adjustCores(self, slivers) which takes a
17         dictionary of sliver records. The cpu_cores field is pulled from the
18         effective rspec (rec["_rspec"]) for each sliver.
19
20         If cpu_cores > 0 for a sliver, then that sliver will reserve one or
21         more of the cpu_cores on the machine.
22
23         One core is always left unreserved for system slices.
24     """
25
26     def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
27         self.cpus = []
28         self.cgroup_var_name = cgroup_var_name
29         self.slice_attr_name = slice_attr_name
30         self.cgroup_mem_name = "cpuset.mems"
31         self.mems=[]
32         self.mems_map={}
33         self.cpu_siblings={}
34
35     def get_cgroup_var(self, name=None, filename=None):
36         """ decode cpuset.cpus or cpuset.mems into a list of units that can
37             be reserved.
38         """
39
40         assert(filename!=None or name!=None)
41
42         if filename==None:
43             filename="/dev/cgroup/" + name
44
45         data = open(filename).readline().strip()
46
47         if not data:
48            return []
49
50         units = []
51
52         # cpuset.cpus could be something as arbitrary as:
53         #    0,1,2-3,4,5-6
54         # deal with commas and ranges
55         for part in data.split(","):
56             unitRange = part.split("-")
57             if len(unitRange) == 1:
58                 unitRange = (unitRange[0], unitRange[0])
59             for i in range(int(unitRange[0]), int(unitRange[1])+1):
60                 if not i in units:
61                     units.append(i)
62
63         return units
64
65     def get_cpus(self):
66         """ return a list of available cpu identifiers: [0,1,2,3...]
67         """
68
69         # the cpus never change, so if it's already been computed then don't
70         # worry about it.
71         if self.cpus!=[]:
72             return self.cpus
73
74         self.cpus = self.get_cgroup_var(self.cgroup_var_name)
75
76         self.cpu_siblings = {}
77         for item in self.cpus:
78            self.cpu_siblings[item] = self.get_core_siblings(item)
79
80         return self.cpus
81
82     def find_cpu_mostsiblings(self, cpus):
83         bestCount = -1
84         bestCpu = -1
85         for cpu in cpus:
86             count = 0
87             for candidate in self.cpu_siblings[cpu]:
88                 if candidate in cpus:
89                     count = count + 1
90                 if (count > bestCount):
91                     bestCount = count
92                     bestCpu = cpu
93
94         assert(bestCpu >= 0)
95         return bestCpu
96
97
98     def find_compatible_cpu(self, cpus, compatCpu):
99         if compatCpu==None:
100            return self.find_cpu_mostsiblings(cpus)
101
102         # find a sibling if we can
103         bestDelta = None
104         bestCpu = None
105         for cpu in cpus:
106            if compatCpu in self.cpu_siblings[cpu]:
107                return cpu
108
109         return self.find_cpu_mostsiblings(cpus)
110
111     def get_cgroups (self):
112         """ return a list of cgroups
113             this might change as vservers are instantiated, so always compute
114             it dynamically.
115         """
116         cgroups = []
117         filenames = os.listdir("/dev/cgroup")
118         for filename in filenames:
119             if os.path.isdir(os.path.join("/dev/cgroup", filename)):
120                 cgroups.append(filename)
121         return cgroups
122
123     def decodeCoreSpec (self, cores):
124         """ Decode the value of the core attribute. It's a number, followed by
125             an optional letter "b" to indicate besteffort cores should also
126             be supplied.
127         """
128         bestEffort = False
129
130         if cores.endswith("b"):
131            cores = cores[:-1]
132            bestEffort = True
133
134         try:
135             cores = int(cores)
136         except ValueError:
137             cores = 0
138
139         return (cores, bestEffort)
140
141     def adjustCores (self, slivers):
142         """ slivers is a dict of {sliver_name: rec}
143                 rec is a dict of attributes
144                     rec['_rspec'] is the effective rspec
145         """
146
147         cpus = self.get_cpus()[:]
148         mems = self.get_mems()[:]
149
150         memSchedule=True
151         if (len(mems) != len(cpus)):
152             logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
153             memSchedule=False
154
155         logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
156
157         reservations = {}
158         mem_reservations = {}
159
160         # allocate the cores to the slivers that have them reserved
161         # TODO: Need to sort this from biggest cpu_cores to smallest
162         for name, rec in slivers.iteritems():
163             rspec = rec["_rspec"]
164             cores = rspec.get(self.slice_attr_name, 0)
165             (cores, bestEffort) = self.decodeCoreSpec(cores)
166
167             lastCpu = None
168
169             while (cores>0):
170                 # one cpu core reserved for best effort and system slices
171                 if len(cpus)<=1:
172                     logger.log("CoreSched: ran out of units while scheduling sliver " + name)
173                 else:
174                     cpu = self.find_compatible_cpu(cpus, lastCpu)
175                     cpus.remove(cpu)
176                     lastCpu = cpu
177
178                     logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
179                     reservations[name] = reservations.get(name,[]) + [cpu]
180
181                     # now find a memory node to go with the cpu
182                     if memSchedule:
183                         mem = self.find_associated_memnode(mems, cpu)
184                         if mem != None:
185                             mems.remove(mem)
186                             logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
187                             mem_reservations[name] = mem_reservations.get(name,[]) + [mem]
188                         else:
189                             logger.log("CoreSched: failed to find memory node for cpu" + str(cpu))
190
191                 cores = cores-1
192
193         # the leftovers go to everyone else
194         logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
195         reservations["_default"] = cpus[:]
196         mem_reservations["_default"] = mems[:]
197
198         # now check and see if any of our slices had the besteffort flag
199         # set
200         for name, rec in slivers.iteritems():
201             rspec = rec["_rspec"]
202             cores = rspec.get(self.slice_attr_name, 0)
203             (cores, bestEffort) = self.decodeCoreSpec(cores)
204
205             # if the bestEffort flag isn't set then we have nothing to do
206             if not bestEffort:
207                 continue
208
209             # note that if a reservation is [], then we don't need to add
210             # bestEffort cores to it, since it is bestEffort by default.
211
212             if reservations.get(name,[]) != []:
213                 reservations[name] = reservations[name] + reservations["_default"]
214                 mem_reservations[name] = mem_reservations.get(name,[]) + mem_reservations["_default"]
215                 logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
216
217         self.reserveUnits(self.cgroup_var_name, reservations)
218
219         self.reserveUnits(self.cgroup_mem_name, mem_reservations)
220
221     def reserveUnits (self, var_name, reservations):
222         """ give a set of reservations (dictionary of slicename:cpuid_list),
223             write those reservations to the appropriate cgroup files.
224
225             reservations["_default"] is assumed to be the default reservation
226             for slices that do not reserve cores. It's essentially the leftover
227             cpu cores.
228         """
229
230         default = reservations["_default"]
231
232         # set the default vserver cpuset. this will deal with any vservers
233         # that might be created before the nodemanager has had a chance to
234         # update the cpusets.
235         self.reserveDefault(var_name, default)
236
237         for cgroup in self.get_cgroups():
238             if cgroup in reservations:
239                 cpus = reservations[cgroup]
240                 logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
241             else:
242                 # no log message for default; too much verbosity in the common case
243                 cpus = default
244
245             if glo_coresched_simulate:
246                 print "R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus)
247             else:
248                 file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
249
250     def reserveDefault (self, var_name, cpus):
251         if not os.path.exists("/etc/vservers/.defaults/cgroup"):
252             os.makedirs("/etc/vservers/.defaults/cgroup")
253
254         if glo_coresched_simulate:
255             print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
256         else:
257             file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
258
259     def listToRange (self, list):
260         """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
261             for now, just comma-separate
262         """
263         return ",".join( [str(i) for i in list] )
264
265     def get_mems(self):
266         """ return a list of available cpu identifiers: [0,1,2,3...]
267         """
268
269         # the cpus never change, so if it's already been computed then don't
270         # worry about it.
271         if self.mems!=[]:
272             return self.mems
273
274         self.mems = self.get_cgroup_var(self.cgroup_mem_name)
275
276         # build a mapping from memory nodes to the cpus they can be used with
277
278         mems_map={}
279         for item in self.mems:
280            mems_map[item] = self.get_memnode_cpus(item)
281
282         if (len(mems_map)>0):
283             # when NUMA_EMU is enabled, only the last memory node will contain
284             # the cpu_map. For example, if there were originally 2 nodes and
285             # we used NUM_EMU to raise it to 12, then
286             #    mems_map[0]=[]
287             #    ...
288             #    mems_map[4]=[]
289             #    mems_map[5]=[1,3,5,7,9,11]
290             #    mems_map[6]=[]
291             #    ...
292             #    mems_map[10]=[]
293             #    mems_map[11]=[0,2,4,6,8,10]
294             # so, we go from back to front, copying the entries as necessary.
295
296             if mems_map[self.mems[0]] == []:
297                 work = []
298                 for item in reversed(self.mems):
299                     if mems_map[item]!=[]:
300                         work = mems_map[item]
301                     else:  # mems_map[item]==[]
302                         mems_map[item] = work
303
304             self.mems_map = mems_map
305
306         return self.mems
307
308     def find_associated_memnode(self, mems, cpu):
309         """ Given a list of memory nodes and a cpu, see if one of the nodes in
310             the list can be used with that cpu.
311         """
312         for item in mems:
313             if cpu in self.mems_map[item]:
314                 return item
315         return None
316
317     def get_memnode_cpus(self, index):
318         """ for a given memory node, return the CPUs that it is associated
319             with.
320         """
321         fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
322         if not os.path.exists(fn):
323             logger.log("CoreSched: failed to locate memory node" + fn)
324             return []
325
326         return self.get_cgroup_var(filename=fn)
327
328     def get_core_siblings(self, index):
329         # use core_siblings rather than core_siblings_list, as it's compatible
330         # with older kernels
331         fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
332         if not os.path.exists(fn):
333             return []
334         siblings = []
335
336         x = int(open(fn,"rt").readline().strip(),16)
337         cpuid = 0
338         while (x>0):
339             if (x&1)!=0:
340                 siblings.append(cpuid)
341             x = x >> 1
342             cpuid += 1
343
344         return siblings
345
346
347 # a little self-test
348 if __name__=="__main__":
349     glo_coresched_simulate = True
350
351     x = CoreSched()
352
353     print "cgroups:", ",".join(x.get_cgroups())
354
355     print "cpus:", x.listToRange(x.get_cpus())
356     print "sibling map:"
357     for item in x.get_cpus():
358         print " ", item, ",".join([str(y) for y in x.cpu_siblings.get(item,[])])
359
360     print "mems:", x.listToRange(x.get_mems())
361     print "cpu to memory map:"
362     for item in x.get_mems():
363         print " ", item, ",".join([str(y) for y in x.mems_map.get(item,[])])
364
365     rspec_sl_test1 = {"cpu_cores": "1"}
366     rec_sl_test1 = {"_rspec": rspec_sl_test1}
367
368     rspec_sl_test2 = {"cpu_cores": "5"}
369     rec_sl_test2 = {"_rspec": rspec_sl_test2}
370
371     rspec_sl_test3 = {"cpu_cores": "3b"}
372     rec_sl_test3 = {"_rspec": rspec_sl_test3}
373
374     #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
375
376     slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
377
378     #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}
379
380     x.adjustCores(slivers)
381