One core is always left unreserved for system slices.
"""
- def __init__(self):
+ def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
self.cpus = []
+ self.cgroup_var_name = cgroup_var_name
+ self.slice_attr_name = slice_attr_name
+
+ def get_cgroup_var(self, name):
+ """ decode cpuset.cpus or cpuset.mems into a list of units that can
+ be reserved.
+ """
+
+ data = open("/dev/cgroup/" + name).readline().strip()
+
+ units = []
+
+ # cpuset.cpus could be something as arbitrary as:
+ # 0,1,2-3,4,5-6
+ # deal with commas and ranges
+ for part in data.split(","):
+ unitRange = part.split("-")
+ if len(unitRange) == 1:
+ unitRange = (unitRange[0], unitRange[0])
+ for i in range(int(unitRange[0]), int(unitRange[1])+1):
+ if not i in units:
+ units.append(i)
+
+ return units
def get_cpus(self):
""" return a list of available cpu identifiers: [0,1,2,3...]
if self.cpus!=[]:
return self.cpus
- cpuset_cpus = open("/dev/cgroup/cpuset.cpus").readline().strip()
+ self.cpus = self.get_cgroup_var(self.cgroup_var_name)
- # cpuset.cpus could be something as arbitrary as:
- # 0,1,2-3,4,5-6
- # deal with commas and ranges
- for part in cpuset_cpus.split(","):
- cpuRange = part.split("-")
- if len(cpuRange) == 1:
- cpuRange = (cpuRange[0], cpuRange[0])
- for i in range(int(cpuRange[0]), int(cpuRange[1])+1):
- if not i in self.cpus:
- self.cpus.append(i)
-
- return self.cpus
+ return self.cpus
def get_cgroups (self):
""" return a list of cgroups
rec['_rspec'] is the effective rspec
"""
- logger.log("CoreSched: adjusting cores")
-
cpus = self.get_cpus()[:]
+ logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))
+
reservations = {}
# allocate the cores to the slivers that have them reserved
for name, rec in slivers.iteritems():
rspec = rec["_rspec"]
- cores = rspec.get("cpu_cores", 0)
+ cores = rspec.get(self.slice_attr_name, 0)
(cores, bestEffort) = self.decodeCoreSpec(cores)
while (cores>0):
# one cpu core reserved for best effort and system slices
if len(cpus)<=1:
- logger.log("CoreSched: ran out of cpu cores while scheduling: " + name)
+ logger.log("CoreSched: ran out of units while scheduling sliver " + name)
else:
cpu = cpus.pop()
- logger.log("CoreSched: allocating cpu " + str(cpu) + " to slice " + name)
+ logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
reservations[name] = reservations.get(name,[]) + [cpu]
cores = cores-1
# the leftovers go to everyone else
- logger.log("CoreSched: allocating cpus " + str(cpus) + " to _default")
+ logger.log("CoreSched: allocating unit " + str(cpus) + " to _default")
reservations["_default"] = cpus[:]
- # now check and see if any of our reservations had the besteffort flag
+ # now check and see if any of our slices had the besteffort flag
# set
for name, rec in slivers.iteritems():
rspec = rec["_rspec"]
- cores = rspec.get("cpu_cores", 0)
+ cores = rspec.get(self.slice_attr_name, 0)
(cores, bestEffort) = self.decodeCoreSpec(cores)
- if not (reservations.get(name,[])):
- # if there is no reservation for this slice, then it's already
- # besteffort by default.
+ # if the bestEffort flag isn't set then we have nothing to do
+ if not bestEffort:
continue
- if bestEffort:
+ # note that if a reservation is [], then we don't need to add
+ # bestEffort cores to it, since it is bestEffort by default.
+
+ if reservations.get(name,[]) != []:
reservations[name] = reservations[name] + reservations["_default"]
- logger.log("CoreSched: adding besteffort cores to " + name + ". new cores = " + str(reservations[name]))
+ logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))
self.reserveCores(reservations)
self.reserveDefault(default)
for cgroup in self.get_cgroups():
- cpus = reservations.get(cgroup, default)
-
- logger.log("CoreSched: reserving " + cgroup + " " + str(cpus))
+ if cgroup in reservations:
+ cpus = reservations[cgroup]
+ logger.log("CoreSched: reserving " + self.cgroup_var_name + " on " + cgroup + ": " + str(cpus))
+ else:
+ # no log message for default; too much verbosity in the common case
+ cpus = default
- file("/dev/cgroup/" + cgroup + "/cpuset.cpus", "w").write( self.listToRange(cpus) + "\n" )
+ file("/dev/cgroup/" + cgroup + "/" + self.cgroup_var_name, "w").write( self.listToRange(cpus) + "\n" )
def reserveDefault (self, cpus):
if not os.path.exists("/etc/vservers/.defaults/cgroup"):
os.makedirs("/etc/vservers/.defaults/cgroup")
- file("/etc/vservers/.defaults/cgroup/cpuset.cpus", "w").write( self.listToRange(cpus) + "\n" )
+ file("/etc/vservers/.defaults/cgroup/" + self.cgroup_var_name, "w").write( self.listToRange(cpus) + "\n" )
def listToRange (self, list):
""" take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
# it turned out, however, that after around 10 cycles of the nodemanager,
# attempts to call GetSlivers were failing with a curl error 60
# we are thus reverting to the version from tag curlwrapper.py-NodeManager-2.0-8
-# the (broekn) pycurl version can be found in tags 2.0-9 and 2.0-10
+# the (broken) pycurl version can be found in tags 2.0-9 and 2.0-10
from subprocess import PIPE, Popen
from select import select
import logger
+verbose=False
+#verbose=True
+
class Sopen(Popen):
def kill(self, signal = signal.SIGTERM):
os.kill(self.pid, signal)
def retrieve(url, cacert=None, postdata=None, timeout=90):
-# options = ('/usr/bin/curl', '--fail', '--silent')
- options = ('/usr/bin/curl', '--fail', )
- if cacert: options += ('--cacert', cacert)
- if postdata: options += ('--data', '@-')
+# command = ('/usr/bin/curl', '--fail', '--silent')
+ command = ('/usr/bin/curl', '--fail', )
+ if cacert: command += ('--cacert', cacert)
+ if postdata: command += ('--data', '@-')
if timeout:
- options += ('--max-time', str(timeout))
- options += ('--connect-timeout', str(timeout))
- p = Sopen(options + (url,), stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
+ command += ('--max-time', str(timeout))
+ command += ('--connect-timeout', str(timeout))
+ command += (url,)
+ if verbose:
+ print 'Invoking ',command
+ if postdata: print 'with postdata=',postdata
+ p = Sopen(command , stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
if postdata: p.stdin.write(postdata)
p.stdin.close()
sout, sin, serr = select([p.stdout,p.stderr],[],[], timeout)