Fixes for F16.
[nodemanager.git] / coresched.py
1 # $Id$
2 # $URL$
3
4 """Whole core scheduling
5
6 """
7
8 import logger
9 import os
10 import cgroups
11
12 glo_coresched_simulate = False
13 joinpath = os.path.join
14
15 class CoreSched:
16     """ Whole-core scheduler
17
18         The main entrypoint is adjustCores(self, slivers) which takes a
19         dictionary of sliver records. The cpu_cores field is pulled from the
20         effective rspec (rec["_rspec"]) for each sliver.
21
22         If cpu_cores > 0 for a sliver, then that sliver will reserve one or
23         more of the cpu_cores on the machine.
24
25         One core is always left unreserved for system slices.
26     """
27
28     def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
29         self.cpus = []
30         self.cgroup_var_name = cgroup_var_name
31         self.slice_attr_name = slice_attr_name
32         self.cgroup_mem_name = "cpuset.mems"
33         self.mems=[]
34         self.mems_map={}
35         self.cpu_siblings={}
36
37     def get_cgroup_var(self, name=None, subsys=None, filename=None):
38         """ decode cpuset.cpus or cpuset.mems into a list of units that can
39             be reserved.
40         """
41
42         assert(filename!=None or name!=None)
43
44         if filename==None:
45             # filename="/dev/cgroup/" + name
46             filename = reduce(lambda a, b: joinpath(a, b) if b else a, [subsys, name],
47                               cgroups.get_base_path())
48
49         data = open(filename).readline().strip()
50
51         if not data:
52            return []
53
54         units = []
55
56         # cpuset.cpus could be something as arbitrary as:
57         #    0,1,2-3,4,5-6
58         # deal with commas and ranges
59         for part in data.split(","):
60             unitRange = part.split("-")
61             if len(unitRange) == 1:
62                 unitRange = (unitRange[0], unitRange[0])
63             for i in range(int(unitRange[0]), int(unitRange[1])+1):
64                 if not i in units:
65                     units.append(i)
66
67         return units
68
69     def get_cpus(self):
70         """ return a list of available cpu identifiers: [0,1,2,3...]
71         """
72
73         # the cpus never change, so if it's already been computed then don't
74         # worry about it.
75         if self.cpus!=[]:
76             return self.cpus
77
78         self.cpus = self.get_cgroup_var(self.cgroup_var_name, 'cpuset')
79
80         self.cpu_siblings = {}
81         for item in self.cpus:
82            self.cpu_siblings[item] = self.get_core_siblings(item)
83
84         return self.cpus
85
86     def find_cpu_mostsiblings(self, cpus):
87         bestCount = -1
88         bestCpu = -1
89         for cpu in cpus:
90             count = 0
91             for candidate in self.cpu_siblings[cpu]:
92                 if candidate in cpus:
93                     count = count + 1
94                 if (count > bestCount):
95                     bestCount = count
96                     bestCpu = cpu
97
98         assert(bestCpu >= 0)
99         return bestCpu
100
101
102     def find_compatible_cpu(self, cpus, compatCpu):
103         if compatCpu==None:
104            return self.find_cpu_mostsiblings(cpus)
105
106         # find a sibling if we can
107         bestDelta = None
108         bestCpu = None
109         for cpu in cpus:
110            if compatCpu in self.cpu_siblings[cpu]:
111                return cpu
112
113         return self.find_cpu_mostsiblings(cpus)
114
115     def get_cgroups (self):
116         """ return a list of cgroups
117             this might change as vservers are instantiated, so always compute
118             it dynamically.
119         """
120         return cgroups.get_cgroups()
121         #cgroups = []
122         #filenames = os.listdir("/dev/cgroup")
123         #for filename in filenames:
124         #    if os.path.isdir(os.path.join("/dev/cgroup", filename)):
125         #        cgroups.append(filename)
126         #return cgroups
127
128     def decodeCoreSpec (self, cores):
129         """ Decode the value of the core attribute. It's a number, followed by
130             an optional letter "b" to indicate besteffort cores should also
131             be supplied.
132         """
133         bestEffort = False
134
135         if cores.endswith("b"):
136            cores = cores[:-1]
137            bestEffort = True
138
139         try:
140             cores = int(cores)
141         except ValueError:
142             cores = 0
143
144         return (cores, bestEffort)
145
146     def adjustCores (self, slivers):
147         """ slivers is a dict of {sliver_name: rec}
148                 rec is a dict of attributes
149                     rec['_rspec'] is the effective rspec
150         """
151
152         cpus = self.get_cpus()[:]
153         mems = self.get_mems()[:]
154
155         memSchedule=True
156         if (len(mems) != len(cpus)):
157             logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
158             memSchedule=False
159
160         logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
161
162         reservations = {}
163         mem_reservations = {}
164
165         # allocate the cores to the slivers that have them reserved
166         # TODO: Need to sort this from biggest cpu_cores to smallest
167         for name, rec in slivers.iteritems():
168             rspec = rec["_rspec"]
169             cores = rspec.get(self.slice_attr_name, 0)
170             (cores, bestEffort) = self.decodeCoreSpec(cores)
171
172             lastCpu = None
173
174             while (cores>0):
175                 # one cpu core reserved for best effort and system slices
176                 if len(cpus)<=1:
177                     logger.log("CoreSched: ran out of units while scheduling sliver " + name)
178                 else:
179                     cpu = self.find_compatible_cpu(cpus, lastCpu)
180                     cpus.remove(cpu)
181                     lastCpu = cpu
182
183                     logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
184                     reservations[name] = reservations.get(name,[]) + [cpu]
185
186                     # now find a memory node to go with the cpu
187                     if memSchedule:
188                         mem = self.find_associated_memnode(mems, cpu)
189                         if mem != None:
190                             mems.remove(mem)
191                             logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
192                             mem_reservations[name] = mem_reservations.get(name,[]) + [mem]
193                         else:
194                             logger.log("CoreSched: failed to find memory node for cpu" + str(cpu))
195
196                 cores = cores-1
197
198         # the leftovers go to everyone else
199         logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
200         reservations["_default"] = cpus[:]
201         mem_reservations["_default"] = mems[:]
202
203         # now check and see if any of our slices had the besteffort flag
204         # set
205         for name, rec in slivers.iteritems():
206             rspec = rec["_rspec"]
207             cores = rspec.get(self.slice_attr_name, 0)
208             (cores, bestEffort) = self.decodeCoreSpec(cores)
209
210             # if the bestEffort flag isn't set then we have nothing to do
211             if not bestEffort:
212                 continue
213
214             # note that if a reservation is [], then we don't need to add
215             # bestEffort cores to it, since it is bestEffort by default.
216
217             if reservations.get(name,[]) != []:
218                 reservations[name] = reservations[name] + reservations["_default"]
219                 mem_reservations[name] = mem_reservations.get(name,[]) + mem_reservations["_default"]
220                 logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
221
222         self.reserveUnits(self.cgroup_var_name, reservations)
223
224         self.reserveUnits(self.cgroup_mem_name, mem_reservations)
225
226     def reserveUnits (self, var_name, reservations):
227         """ give a set of reservations (dictionary of slicename:cpuid_list),
228             write those reservations to the appropriate cgroup files.
229
230             reservations["_default"] is assumed to be the default reservation
231             for slices that do not reserve cores. It's essentially the leftover
232             cpu cores.
233         """
234
235         default = reservations["_default"]
236
237         # set the default vserver cpuset. this will deal with any vservers
238         # that might be created before the nodemanager has had a chance to
239         # update the cpusets.
240         self.reserveDefault(var_name, default)
241
242         for cgroup in self.get_cgroups():
243             if cgroup in reservations:
244                 cpus = reservations[cgroup]
245                 logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
246             else:
247                 # no log message for default; too much verbosity in the common case
248                 cpus = default
249
250             if glo_coresched_simulate:
251                 print "R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus)
252             else:
253                 cgroups.write(cgroup, var_name, self.listToRange(cpus))
254                 #file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
255
256     def reserveDefault (self, var_name, cpus):
257         #if not os.path.exists("/etc/vservers/.defaults/cgroup"):
258         #    os.makedirs("/etc/vservers/.defaults/cgroup")
259
260         #if glo_coresched_simulate:
261         #    print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
262         #else:
263         #    file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
264         pass
265
266     def listToRange (self, list):
267         """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
268             for now, just comma-separate
269         """
270         return ",".join( [str(i) for i in list] )
271
272     def get_mems(self):
273         """ return a list of available cpu identifiers: [0,1,2,3...]
274         """
275
276         # the cpus never change, so if it's already been computed then don't
277         # worry about it.
278         if self.mems!=[]:
279             return self.mems
280
281         self.mems = self.get_cgroup_var(self.cgroup_mem_name, 'cpuset')
282
283         # build a mapping from memory nodes to the cpus they can be used with
284
285         mems_map={}
286         for item in self.mems:
287            mems_map[item] = self.get_memnode_cpus(item)
288
289         if (len(mems_map)>0):
290             # when NUMA_EMU is enabled, only the last memory node will contain
291             # the cpu_map. For example, if there were originally 2 nodes and
292             # we used NUM_EMU to raise it to 12, then
293             #    mems_map[0]=[]
294             #    ...
295             #    mems_map[4]=[]
296             #    mems_map[5]=[1,3,5,7,9,11]
297             #    mems_map[6]=[]
298             #    ...
299             #    mems_map[10]=[]
300             #    mems_map[11]=[0,2,4,6,8,10]
301             # so, we go from back to front, copying the entries as necessary.
302
303             if mems_map[self.mems[0]] == []:
304                 work = []
305                 for item in reversed(self.mems):
306                     if mems_map[item]!=[]:
307                         work = mems_map[item]
308                     else:  # mems_map[item]==[]
309                         mems_map[item] = work
310
311             self.mems_map = mems_map
312
313         return self.mems
314
315     def find_associated_memnode(self, mems, cpu):
316         """ Given a list of memory nodes and a cpu, see if one of the nodes in
317             the list can be used with that cpu.
318         """
319         for item in mems:
320             if cpu in self.mems_map[item]:
321                 return item
322         return None
323
324     def get_memnode_cpus(self, index):
325         """ for a given memory node, return the CPUs that it is associated
326             with.
327         """
328         fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
329         if not os.path.exists(fn):
330             logger.log("CoreSched: failed to locate memory node" + fn)
331             return []
332
333         return self.get_cgroup_var(filename=fn)
334
335     def get_core_siblings(self, index):
336         # use core_siblings rather than core_siblings_list, as it's compatible
337         # with older kernels
338         fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
339         if not os.path.exists(fn):
340             return []
341         siblings = []
342
343         x = open(fn, 'rt').readline().strip().split(',')[-1]
344         x = int(x, 16)
345
346         cpuid = 0
347         while (x>0):
348             if (x&1)!=0:
349                 siblings.append(cpuid)
350             x = x >> 1
351             cpuid += 1
352
353         return siblings
354
355
356 # a little self-test
357 if __name__=="__main__":
358     glo_coresched_simulate = True
359
360     x = CoreSched()
361
362     print "cgroups:", ",".join(x.get_cgroups())
363
364     print "cpus:", x.listToRange(x.get_cpus())
365     print "sibling map:"
366     for item in x.get_cpus():
367         print " ", item, ",".join([str(y) for y in x.cpu_siblings.get(item,[])])
368
369     print "mems:", x.listToRange(x.get_mems())
370     print "cpu to memory map:"
371     for item in x.get_mems():
372         print " ", item, ",".join([str(y) for y in x.mems_map.get(item,[])])
373
374     rspec_sl_test1 = {"cpu_cores": "1"}
375     rec_sl_test1 = {"_rspec": rspec_sl_test1}
376
377     rspec_sl_test2 = {"cpu_cores": "5"}
378     rec_sl_test2 = {"_rspec": rspec_sl_test2}
379
380     rspec_sl_test3 = {"cpu_cores": "3b"}
381     rec_sl_test3 = {"_rspec": rspec_sl_test3}
382
383     #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
384
385     slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
386
387     #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}
388
389     x.adjustCores(slivers)
390