Merge branch 'devel' of ssh://git.planet-lab.org/git/nodemanager into devel
[nodemanager.git] / coresched.py
1 # $Id$
2 # $URL$
3
4 """Whole core scheduling
5
6 """
7
8 import logger
9 import os
10
11 class CoreSched:
12     """ Whole-core scheduler
13
14         The main entrypoint is adjustCores(self, slivers) which takes a
15         dictionary of sliver records. The cpu_cores field is pulled from the
16         effective rspec (rec["_rspec"]) for each sliver.
17
18         If cpu_cores > 0 for a sliver, then that sliver will reserve one or
19         more of the cpu_cores on the machine.
20
21         One core is always left unreserved for system slices.
22     """
23
24     def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
25         self.cpus = []
26         self.cgroup_var_name = cgroup_var_name
27         self.slice_attr_name = slice_attr_name
28
29     def get_cgroup_var(self, name):
30         """ decode cpuset.cpus or cpuset.mems into a list of units that can
31             be reserved.
32         """
33
34         data = open("/dev/cgroup/" + name).readline().strip()
35
36         units = []
37
38         # cpuset.cpus could be something as arbitrary as:
39         #    0,1,2-3,4,5-6
40         # deal with commas and ranges
41         for part in data.split(","):
42             unitRange = part.split("-")
43             if len(unitRange) == 1:
44                 unitRange = (unitRange[0], unitRange[0])
45             for i in range(int(unitRange[0]), int(unitRange[1])+1):
46                 if not i in units:
47                     units.append(i)
48
49         return units
50
51     def get_cpus(self):
52         """ return a list of available cpu identifiers: [0,1,2,3...]
53         """
54
55         # the cpus never change, so if it's already been computed then don't
56         # worry about it.
57         if self.cpus!=[]:
58             return self.cpus
59
60         self.cpus = self.get_cgroup_var(self.cgroup_var_name)
61
62         return self.cpus
63
64     def get_cgroups (self):
65         """ return a list of cgroups
66             this might change as vservers are instantiated, so always compute
67             it dynamically.
68         """
69         cgroups = []
70         filenames = os.listdir("/dev/cgroup")
71         for filename in filenames:
72             if os.path.isdir(os.path.join("/dev/cgroup", filename)):
73                 cgroups.append(filename)
74         return cgroups
75
76     def decodeCoreSpec (self, cores):
77         """ Decode the value of the core attribute. It's a number, followed by
78             an optional letter "b" to indicate besteffort cores should also
79             be supplied.
80         """
81         bestEffort = False
82
83         if cores.endswith("b"):
84            cores = cores[:-1]
85            bestEffort = True
86
87         try:
88             cores = int(cores)
89         except ValueError:
90             cores = 0
91
92         return (cores, bestEffort)
93
94     def adjustCores (self, slivers):
95         """ slivers is a dict of {sliver_name: rec}
96                 rec is a dict of attributes
97                     rec['_rspec'] is the effective rspec
98         """
99
100         cpus = self.get_cpus()[:]
101
102         logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
103
104         reservations = {}
105
106         # allocate the cores to the slivers that have them reserved
107         for name, rec in slivers.iteritems():
108             rspec = rec["_rspec"]
109             cores = rspec.get(self.slice_attr_name, 0)
110             (cores, bestEffort) = self.decodeCoreSpec(cores)
111
112             while (cores>0):
113                 # one cpu core reserved for best effort and system slices
114                 if len(cpus)<=1:
115                     logger.log("CoreSched: ran out of units while scheduling sliver " + name)
116                 else:
117                     cpu = cpus.pop()
118                     logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
119                     reservations[name] = reservations.get(name,[]) + [cpu]
120
121                 cores = cores-1
122
123         # the leftovers go to everyone else
124         logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
125         reservations["_default"] = cpus[:]
126
127         # now check and see if any of our slices had the besteffort flag
128         # set
129         for name, rec in slivers.iteritems():
130             rspec = rec["_rspec"]
131             cores = rspec.get(self.slice_attr_name, 0)
132             (cores, bestEffort) = self.decodeCoreSpec(cores)
133
134             # if the bestEffort flag isn't set then we have nothing to do
135             if not bestEffort:
136                 continue
137
138             # note that if a reservation is [], then we don't need to add
139             # bestEffort cores to it, since it is bestEffort by default.
140
141             if reservations.get(name,[]) != []:
142                 reservations[name] = reservations[name] + reservations["_default"]
143                 logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
144
145         self.reserveCores(reservations)
146
147     def reserveCores (self, reservations):
148         """ give a set of reservations (dictionary of slicename:cpuid_list),
149             write those reservations to the appropriate cgroup files.
150
151             reservations["_default"] is assumed to be the default reservation
152             for slices that do not reserve cores. It's essentially the leftover
153             cpu cores.
154         """
155
156         default = reservations["_default"]
157
158         # set the default vserver cpuset. this will deal with any vservers
159         # that might be created before the nodemanager has had a chance to
160         # update the cpusets.
161         self.reserveDefault(default)
162
163         for cgroup in self.get_cgroups():
164             if cgroup in reservations:
165                 cpus = reservations[cgroup]
166                 logger.log("CoreSched: reserving " + self.cgroup_var_name + " on " + cgroup + ": " + str(cpus))
167             else:
168                 # no log message for default; too much verbosity in the common case
169                 cpus = default
170
171             file("/dev/cgroup/" + cgroup + "/" + self.cgroup_var_name, "w").write( self.listToRange(cpus) + "\n" )
172
173     def reserveDefault (self, cpus):
174         if not os.path.exists("/etc/vservers/.defaults/cgroup"):
175             os.makedirs("/etc/vservers/.defaults/cgroup")
176
177         file("/etc/vservers/.defaults/cgroup/" + self.cgroup_var_name, "w").write( self.listToRange(cpus) + "\n" )
178
179     def listToRange (self, list):
180         """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
181             for now, just comma-separate
182         """
183         return ",".join( [str(i) for i in list] )
184
185 # a little self-test
186 if __name__=="__main__":
187     x = CoreSched()
188
189     print "cpus:", x.listToRange(x.get_cpus())
190     print "cgroups:", ",".join(x.get_cgroups())
191
192     # a quick self-test for ScottLab slices sl_test1 and sl_test2
193     #    sl_test1 = 1 core
194     #    sl_test2 = 1 core
195
196     rspec_sl_test1 = {"cpu_cores": 1}
197     rec_sl_test1 = {"_rspec": rspec_sl_test1}
198
199     rspec_sl_test2 = {"cpu_cores": 1}
200     rec_sl_test2 = {"_rspec": rspec_sl_test2}
201
202     slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
203
204     x.adjustCores(slivers)
205