Merge branch 'master' into lxc_devel
authorMarco Yuen <marcoy@gmail.com>
Thu, 15 Dec 2011 16:28:05 +0000 (11:28 -0500)
committerMarco Yuen <marcoy@gmail.com>
Thu, 15 Dec 2011 16:28:05 +0000 (11:28 -0500)
NodeManager.spec
bwlimit.py [new file with mode: 0644]
cgroups.py [new file with mode: 0644]
config_template.xml [new file with mode: 0644]
coresched.py
sliver_libvirt.py [new file with mode: 0644]
sliver_lxc.py [new file with mode: 0644]
sliver_vs.py
slivermanager.py
sshsh [new file with mode: 0755]

index 6320d71..9ed035a 100644 (file)
@@ -72,6 +72,7 @@ chmod 755 $RPM_BUILD_ROOT/%{_initrddir}/*
 install -d -m 755 $RPM_BUILD_ROOT/var/lib/nodemanager
 
 install -D -m 644 logrotate/nodemanager $RPM_BUILD_ROOT/%{_sysconfdir}/logrotate.d/nodemanager
+install -D -m 755 sshsh $RPM_BUILD_ROOT/bin/sshsh
 
 ##########
 %post
@@ -129,6 +130,7 @@ rm -rf $RPM_BUILD_ROOT
 %{_initrddir}/
 %{_sysconfdir}/logrotate.d/nodemanager
 /var/lib/
+/bin/sshsh
 
 %changelog
 * Fri Dec 09 2011 Thierry Parmentelat <thierry.parmentelat@sophia.inria.fr> - nodemanager-2.0-34
diff --git a/bwlimit.py b/bwlimit.py
new file mode 100644 (file)
index 0000000..e310762
--- /dev/null
@@ -0,0 +1,781 @@
+#!/usr/bin/python
+# 
+# Bandwidth limit module for PlanetLab nodes. The intent is to use the
+# Hierarchical Token Bucket (HTB) queueing discipline (qdisc) to allow
+# slices to fairly share access to available node bandwidth. We
+# currently define three classes of "available node bandwidth":
+#
+# 1. Available hardware bandwidth (bwmax): The maximum rate of the
+# hardware.
+#
+# 2. Available capped bandwidth (bwcap): The maximum rate allowed to
+# non-exempt destinations. By default, equal to bwmax, but may be
+# lowered by PIs.
+#
+# 3. Available uncapped ("exempt") bandwidth: The difference between
+# bwmax and what is currently being used of bwcap, or the maximum rate
+# allowed to destinations exempt from caps (e.g., Internet2).
+#
+# All three classes of bandwidth are fairly shared according to the
+# notion of "shares". For instance, if the node is capped at 5 Mbps,
+# there are N slices, and each slice has 1 share, then each slice
+# should get at least 5/N Mbps of bandwidth. How HTB is implemented
+# makes this statement a little too simplistic. What it really means
+# is that during any single time period, only a certain number of
+# bytes can be sent onto the wire. Each slice is guaranteed that at
+# least some small number of its bytes will be sent. Whatever is left
+# over from the budget, is split in proportion to the number of shares
+# each slice has.
+#
+# Even if the node is not capped at a particular limit (bwcap ==
+# bwmax), this module enforces fair share access to bwmax. Also, if
+# the node is capped at a particular limit, rules may optionally be
+# defined that classify certain packets into the "exempt" class. This
+# class receives whatever bandwidth is leftover between bwcap and
+# bwmax; slices fairly share this bandwidth as well.
+#
+# The root context is exempt from sharing and can send as much as it
+# needs to.
+#
+# Some relevant URLs:
+#
+# 1. http://lartc.org/howto               for how to use tc
+# 2. http://luxik.cdi.cz/~devik/qos/htb/  for info on HTB
+#
+# Andy Bavier <acb@cs.princeton.edu>
+# Mark Huang <mlhuang@cs.princeton.edu>
+# Copyright (C) 2006 The Trustees of Princeton University
+#
+# $Id: bwlimit.py,v 1.15 2007/02/07 04:21:11 mlhuang Exp $
+#
+
+import sys, os, re, getopt
+import pwd
+
+
+# Where the tc binary lives
+TC = "/sbin/tc"
+
+# Where the ebtables binary lives
+EBTABLES = "/sbin/ebtables"
+
+# Default interface
+dev = "eth0"
+
+# Verbosity level
+verbose = 0
+
+# bwmin should be small enough that it can be considered negligibly
+# slow compared to the hardware. 8 bits/second appears to be the
+# smallest value supported by tc.
+bwmin = 1000
+
+# bwmax should be large enough that it can be considered at least as
+# fast as the hardware.
+bwmax = 1000*1000*1000
+
+# quantum is the maximum number of bytes that can be borrowed by a
+# share (or slice, if each slice gets 1 share) in one time period
+# (with HZ=1000, 1 ms). If multiple slices are competing for bandwidth
+# above their guarantees, and each is attempting to borrow up to the
+# node bandwidth cap, quantums control how the excess bandwidth is
+# distributed. Slices with 2 shares will borrow twice the amount in
+# one time period as slices with 1 share, so averaged over time, they
+# will get twice as much of the excess bandwidth. The value should be
+# as small as possible and at least 1 MTU. By default, it would be
+# calculated as bwmin/10, but since we use such small a value for
+# bwmin, it's better to just set it to a value safely above 1 Ethernet
+# MTU.
+quantum = 1600
+
+# cburst is the maximum number of bytes that can be burst onto the
+# wire in one time period (with HZ=1000, 1 ms). If multiple slices
+# have data queued for transmission, cbursts control how long each
+# slice can have the wire for. If not specified, it is set to the
+# smallest possible value that would enable the slice's "ceil" rate
+# (usually the node bandwidth cap), to be reached if a slice was able
+# to borrow enough bandwidth to do so. For now, it's unclear how or if
+# to relate this to the notion of shares, so just let tc set the
+# default.
+cburst = None
+
+# There is another parameter that controls how bandwidth is allocated
+# between slices on nodes that is outside the scope of HTB. We enforce
+# a 16 GByte/day total limit on each slice, which works out to about
+# 1.5mbit. If a slice exceeds this byte limit before the day finishes,
+# it is capped at (i.e., its "ceil" rate is set to) the smaller of the
+# node bandwidth cap or 1.5mbit. pl_mom is in charge of enforcing this
+# rule and executes this script to override "ceil".
+
+# We support multiple bandwidth limits, by reserving the top nibble of
+# the minor classid to be the "subclassid". Theoretically, we could
+# support up to 15 subclasses, but for now, we only define two: the
+# "default" subclass 1:10 that is capped at the node bandwidth cap (in
+# this example, 5mbit) and the "exempt" subclass 1:20 that is capped
+# at bwmax (i.e., not capped). The 1:1 parent class exists only to
+# make the borrowing model work. All bandwidth above minimum
+# guarantees is fairly shared (in this example, slice 2 is guaranteed
+# at least 1mbit in addition to fair access to the rest), subject to
+# the restrictions of the class hierarchy: namely, that the total
+# bandwidth to non-exempt destinations should not exceed the node
+# bandwidth cap.
+#
+#                         1:
+#                         |
+#                    1:1 (1gbit)
+#           ______________|_____________
+#          |                            |
+#   1:10 (8bit, 5mbit)           1:20 (8bit, 1gbit)
+#          |                            |
+#  1:100 (8bit, 5mbit)                  |
+#          |                            |
+# 1:1000 (8bit, 5mbit),        1:2000 (8bit, 1gbit),
+# 1:1001 (8bit, 5mbit),        1:2001 (8bit, 1gbit),
+# 1:1002 (1mbit, 5mbit),       1:2002 (1mbit, 1gbit),
+# ...                          ...
+# 1:1FFF (8bit, 5mbit)         1:2FFF (8bit, 1gbit)
+#
+default_minor = 0x1000
+exempt_minor = 0x2000
+
+# root_xid is for the root context. The root context is exempt from
+# fair sharing in both the default and exempt subclasses. The root
+# context gets 5 shares by default.
+root_xid = 0x0000
+root_share = 5
+
+# default_xid is for unclassifiable packets. Packets should not be
+# classified here very often. They can be if a slice's HTB classes are
+# deleted before its processes are. Each slice gets 1 share by
+# default.
+default_xid = 0x0FFF
+default_share = 1
+
+# See tc_util.c and http://physics.nist.gov/cuu/Units/binary.html. Be
+# warned that older versions of tc interpret "kbps", "mbps", "mbit",
+# and "kbit" to mean (in this system) "kibps", "mibps", "mibit", and
+# "kibit" and that if an older version is installed, all rates will
+# be off by a small fraction.
+suffixes = {
+    "":         1,
+    "bit":      1,
+    "kibit":    1024,
+    "kbit":     1000,
+    "mibit":    1024*1024,
+    "mbit":     1000000,
+    "gibit":    1024*1024*1024,
+    "gbit":     1000000000,
+    "tibit":    1024*1024*1024*1024,
+    "tbit":     1000000000000,
+    "bps":      8,
+    "kibps":    8*1024,
+    "kbps":     8000,
+    "mibps":    8*1024*1024,
+    "mbps":     8000000,
+    "gibps":    8*1024*1024*1024,
+    "gbps":     8000000000,
+    "tibps":    8*1024*1024*1024*1024,
+    "tbps":     8000000000000
+}
+
+
+def get_tc_rate(s):
+    """
+    Parses an integer or a tc rate string (e.g., 1.5mbit) into bits/second
+    """
+
+    if type(s) == int:
+        return s
+    m = re.match(r"([0-9.]+)(\D*)", s)
+    if m is None:
+        return -1
+    suffix = m.group(2).lower()
+    if suffixes.has_key(suffix):
+        return int(float(m.group(1)) * suffixes[suffix])
+    else:
+        return -1
+
+def format_bytes(bytes, si = True):
+    """
+    Formats bytes into a string
+    """
+    if si:
+        kilo = 1000.
+    else:
+        # Officially, a kibibyte
+        kilo = 1024.
+
+    if bytes >= (kilo * kilo * kilo):
+        return "%.1f GB" % (bytes / (kilo * kilo * kilo))
+    elif bytes >= 1000000:
+        return "%.1f MB" % (bytes / (kilo * kilo))
+    elif bytes >= 1000:
+        return "%.1f KB" % (bytes / kilo)
+    else:
+        return "%.0f bytes" % bytes
+
+def format_tc_rate(rate):
+    """
+    Formats a bits/second rate into a tc rate string
+    """
+
+    if rate >= 1000000000 and (rate % 1000000000) == 0:
+        return "%.0fgbit" % (rate / 1000000000.)
+    elif rate >= 1000000 and (rate % 1000000) == 0:
+        return "%.0fmbit" % (rate / 1000000.)
+    elif rate >= 1000:
+        return "%.0fkbit" % (rate / 1000.)
+    else:
+        return "%.0fbit" % rate
+
+
+# Parse /etc/planetlab/bwcap (or equivalent)
+def read_bwcap(bwcap_file):
+    bwcap = bwmax
+    try:
+        fp = open(bwcap_file, "r")
+        line = fp.readline().strip()
+        if line:
+            bwcap = get_tc_rate(line)
+    except:
+        pass
+    if bwcap == -1:
+        bwcap = bwmax
+    return bwcap
+
+
+def get_bwcap(dev = dev):
+    """
+    Get the current (live) value of the node bandwidth cap
+    """
+
+    state = tc("-d class show dev %s" % dev)
+    base_re = re.compile(r"class htb 1:10 parent 1:1 .*ceil ([^ ]+) .*")
+    base_classes = filter(None, map(base_re.match, state))
+    if not base_classes:
+        return -1
+    if len(base_classes) > 1:
+        raise Exception, "unable to get current bwcap"
+    return get_tc_rate(base_classes[0].group(1))
+
+
+def get_slice(xid):
+    """
+    Get slice name ("princeton_mlh") from slice xid (500)
+    """
+
+    if xid == root_xid:
+        return "root"
+    if xid == default_xid:
+        return "default"
+    try:
+        return pwd.getpwuid(xid).pw_name
+    except KeyError:
+        pass
+
+    return None
+
+def get_xid(slice):
+    """
+    Get slice xid ("500") from slice name ("princeton_mlh")
+    """
+
+    if slice == "root":
+        return root_xid
+    if slice == "default":
+        return default_xid
+    try:
+        try:
+            return int(slice)
+        except ValueError:
+            pass
+        return pwd.getpwnam(slice).pw_uid
+    except KeyError:
+        pass
+
+    return None
+
+def run(cmd, input = None):
+    """
+    Shortcut for running a shell command
+    """
+
+    try:
+        if verbose:
+            sys.stderr.write("Executing: " + cmd + "\n")
+        if input is None:
+            fileobj = os.popen(cmd, "r")
+            output = fileobj.readlines()
+        else:
+            fileobj = os.popen(cmd, "w")
+            fileobj.write(input)
+            output = None
+        if fileobj.close() is None:
+            return output
+    except Exception, e:
+        pass
+    return None
+
+
+def tc(cmd):
+    """
+    Shortcut for running a tc command
+    """
+
+    return run(TC + " " + cmd)
+
+def ebtables(cmd):
+    """
+    Shortcut for running a ebtables command
+    """
+
+    return run(EBTABLES + " " + cmd)
+
+
+def stop(dev = dev):
+    '''
+    Turn off all queing.  Stops all slice HTBS and reverts to pfifo_fast (the default).
+    '''
+    try:
+        for i in range(0,2):
+            tc("qdisc del dev %s root" % dev)
+    except: pass
+
+
+def init(dev = dev, bwcap = bwmax):
+    """
+    (Re)initialize the bandwidth limits on this node
+    """
+
+    # Load the module used to manage exempt classes
+    #run("/sbin/modprobe ip_set_iphash")
+    # Test the new module included in kernel 3 series
+    run("/sbin/modprobe ip_set_hash_ip")
+
+    # Save current settings
+    paramslist = get(None, dev)
+
+    # Delete root qdisc 1: if it exists. This will also automatically
+    # delete any child classes.
+    for line in tc("qdisc show dev %s" % dev):
+        # Search for the root qdisc 1:
+        m = re.match(r"qdisc htb 1:", line)
+        if m is not None:
+            tc("qdisc del dev %s root handle 1:" % dev)
+            break
+
+    # Initialize HTB. The "default" clause specifies that if a packet
+    # fails classification, it should go into the class with handle
+    # 1FFF.
+    tc("qdisc add dev %s root handle 1: htb default %x" % \
+       (dev, default_minor | default_xid))
+
+    # Set up a parent class from which all subclasses borrow.
+    tc("class add dev %s parent 1: classid 1:1 htb rate %dbit" % \
+       (dev, bwmax))
+
+    # Set up a subclass that represents the node bandwidth cap. We
+    # allow each slice to borrow up to this rate, so it is also
+    # usually the "ceil" rate for each slice.
+    tc("class add dev %s parent 1:1 classid 1:10 htb rate %dbit ceil %dbit" % \
+       (dev, bwmin, bwcap))
+
+    # Set up a subclass for DRL(Distributed Rate Limiting). 
+    # DRL will directly modify that subclass implementing the site limits.
+    tc("class add dev %s parent 1:10 classid 1:100 htb rate %dbit ceil %dbit" % \
+       (dev, bwmin, bwcap))
+
+
+    # Set up a subclass that represents "exemption" from the node
+    # bandwidth cap. Once the node bandwidth cap is reached, bandwidth
+    # to exempt destinations can still be fairly shared up to bwmax.
+    tc("class add dev %s parent 1:1 classid 1:20 htb rate %dbit ceil %dbit" % \
+       (dev, bwmin, bwmax))
+
+    # Set up the root class (and tell VNET what it is). Packets sent
+    # by root end up here and are capped at the node bandwidth
+    # cap.
+    #on(root_xid, dev, share = root_share)
+    #try:
+    #    file("/proc/sys/vnet/root_class", "w").write("%d" % ((1 << 16) | default_minor | root_xid))
+    #except:
+    #    pass
+
+    # Set up the default class. Packets that fail classification end
+    # up here.
+    on(default_xid, dev, share = default_share)
+
+    # Restore old settings
+    for (xid, share,
+         minrate, maxrate,
+         minexemptrate, maxexemptrate,
+         bytes, exemptbytes) in paramslist:
+        if xid not in (root_xid, default_xid):
+            on(xid, dev, share, minrate, maxrate, minexemptrate, maxexemptrate)
+
+
+def get(xid = None, dev = dev):
+    """
+    Get the bandwidth limits and current byte totals for a
+    particular slice xid as a tuple (xid, share, minrate, maxrate,
+    minexemptrate, maxexemptrate, bytes, exemptbytes), or all classes
+    as a list of such tuples.
+    """
+
+    if xid is None:
+        ret = []
+    else:
+        ret = None
+
+    rates = {}
+    rate = None
+
+    # ...
+    # class htb 1:1000 parent 1:10 leaf 1000: prio 0 quantum 8000 rate 8bit ceil 10000Kbit ...
+    #  Sent 6851486 bytes 49244 pkt (dropped 0, overlimits 0 requeues 0)
+    # ...
+    # class htb 1:2000 parent 1:20 leaf 2000: prio 0 quantum 8000 rate 8bit ceil 1000Mbit ...
+    #  Sent 0 bytes 0 pkt (dropped 0, overlimits 0 requeues 0) 
+    # ...
+    for line in tc("-s -d class show dev %s" % dev):
+        # Rate parameter line
+        params = re.match(r"class htb 1:([0-9a-f]+) parent 1:(10|20)", line)
+        # Statistics line
+        stats = re.match(r".* Sent ([0-9]+) bytes", line)
+        # Another class
+        ignore = re.match(r"class htb", line)
+
+        if params is not None:
+            # Which class
+            if params.group(2) == "10":
+                min = 'min'
+                max = 'max'
+                bytes = 'bytes'
+            else:
+                min = 'minexempt'
+                max = 'maxexempt'
+                bytes = 'exemptbytes'
+
+            # Slice ID
+            id = int(params.group(1), 16) & 0x0FFF;
+
+            if rates.has_key(id):
+                rate = rates[id]
+            else:
+                rate = {'id': id}
+
+            # Parse share
+            rate['share'] = 1
+            m = re.search(r"quantum (\d+)", line)
+            if m is not None:
+                rate['share'] = int(m.group(1)) / quantum
+
+            # Parse minrate
+            rate[min] = bwmin
+            m = re.search(r"rate (\w+)", line)
+            if m is not None:
+                rate[min] = get_tc_rate(m.group(1))
+
+            # Parse maxrate
+            rate[max] = bwmax
+            m = re.search(r"ceil (\w+)", line)
+            if m is not None:
+                rate[max] = get_tc_rate(m.group(1))
+
+            # Which statistics to parse
+            rate['stats'] = bytes
+
+            rates[id] = rate
+
+        elif stats is not None:
+            if rate is not None:
+                rate[rate['stats']] = int(stats.group(1))
+
+        elif ignore is not None:
+            rate = None
+
+        # Keep parsing until we get everything
+        if rate is not None and \
+           rate.has_key('min') and rate.has_key('minexempt') and \
+           rate.has_key('max') and rate.has_key('maxexempt') and \
+           rate.has_key('bytes') and rate.has_key('exemptbytes'):
+            params = (rate['id'], rate['share'],
+                      rate['min'], rate['max'],
+                      rate['minexempt'], rate['maxexempt'],
+                      rate['bytes'], rate['exemptbytes'])
+            if xid is None:
+                # Return a list of parameters
+                ret.append(params)
+                rate = None
+            elif xid == rate['id']:
+                # Return the parameters for this class
+                ret = params
+                break
+
+    return ret
+
+
+def on(xid, dev = dev, share = None, minrate = None, maxrate = None, minexemptrate = None, maxexemptrate = None):
+    """
+    Apply specified bandwidth limit to the specified slice xid
+    """
+
+    # Get defaults from current state if available
+    cap = get(xid, dev)
+    if cap is not None:
+        if share is None:
+            share = cap[1]
+        if minrate is None:
+            minrate = cap[2]
+        if maxrate is None:
+            maxrate = cap[3]
+        if minexemptrate is None:
+            minexemptrate = cap[4]
+        if maxexemptrate is None:
+            maxexemptrate = cap[5]
+
+    # Figure out what the current node bandwidth cap is
+    bwcap = get_bwcap(dev)
+
+    # Set defaults
+    if share is None:
+        share = default_share
+    if minrate is None:
+        minrate = bwmin
+    else:
+        minrate = get_tc_rate(minrate)
+    if maxrate is None:
+        maxrate = bwcap
+    else:
+        maxrate = get_tc_rate(maxrate)
+    if minexemptrate is None:
+        minexemptrate = minrate
+    else:
+        minexemptrate = get_tc_rate(minexemptrate)
+    if maxexemptrate is None:
+        maxexemptrate = bwmax
+    else:
+        maxexemptrate = get_tc_rate(maxexemptrate)
+
+    # Sanity checks
+    if maxrate < bwmin:
+        maxrate = bwmin
+    if maxrate > bwcap:
+        maxrate = bwcap
+    if minrate < bwmin:
+        minrate = bwmin
+    if minrate > maxrate:
+        minrate = maxrate
+    if maxexemptrate < bwmin:
+        maxexemptrate = bwmin
+    if maxexemptrate > bwmax:
+        maxexemptrate = bwmax
+    if minexemptrate < bwmin:
+        minexemptrate = bwmin
+    if minexemptrate > maxexemptrate:
+        minexemptrate = maxexemptrate
+
+    # Set up subclasses for the slice
+    tc("class replace dev %s parent 1:100 classid 1:%x htb rate %dbit ceil %dbit quantum %d" % \
+       (dev, default_minor | xid, minrate, maxrate, share * quantum))
+
+    tc("class replace dev %s parent 1:20 classid 1:%x htb rate %dbit ceil %dbit quantum %d" % \
+       (dev, exempt_minor | xid, minexemptrate, maxexemptrate, share * quantum))
+    
+    # Attach a FIFO to each subclass, which helps to throttle back
+    # processes that are sending faster than the token buckets can
+    # support.
+    tc("qdisc replace dev %s parent 1:%x handle %x pfifo" % \
+       (dev, default_minor | xid, default_minor | xid))
+
+    tc("qdisc replace dev %s parent 1:%x handle %x pfifo" % \
+       (dev, exempt_minor | xid, exempt_minor | xid))
+
+    # Setup a filter rule to the root class so each packet originated by a
+    # container interface is classified to it corresponding class
+    # The handle number is a mark created by ebtables with the xid
+    tc("filter replace dev %s parent 1: protocol ip prio 1 handle %d fw flowid 1:%x" % \
+        (dev, xid, default_minor | xid))
+
+def set(xid, share = None, minrate = None, maxrate = None, minexemptrate = None, maxexemptrate = None, dev = dev ):
+    on(xid = xid, dev = dev, share = share,
+       minrate = minrate, maxrate = maxrate,
+       minexemptrate = minexemptrate, maxexemptrate = maxexemptrate)
+
+
+# Remove class associated with specified slice xid. If further packets
+# are seen from this slice, they will be classified into the default
+# class 1:1FFF.
+def off(xid, dev = dev):
+    """
+    Remove class associated with specified slice xid. If further
+    packets are seen from this slice, they will be classified into the
+    default class 1:1FFF.
+    """
+
+    cap = get(xid, dev)
+    if cap is not None:
+        tc("class del dev %s classid 1:%x" % (dev, default_minor | xid))
+        tc("class del dev %s classid 1:%x" % (dev, exempt_minor | xid))
+
+
+def exempt_init(group_name, node_ips):
+    """
+    Initialize the list of destinations exempt from the node bandwidth
+    (burst) cap.
+    """
+
+    # Check of set exists
+    set = run("/sbin/ipset -S " + group_name)
+    if set == None:
+        # Create a hashed IP set of all of these destinations
+        lines = ["-N %s iphash" % group_name]
+        add_cmd = "-A %s " % group_name
+        lines += [(add_cmd + ip) for ip in node_ips]
+        lines += ["COMMIT"]
+        restore = "\n".join(lines) + "\n"
+        run("/sbin/ipset -R", restore)
+    else: # set exists
+        # Check all hosts and add missing.
+        for nodeip in node_ips:
+            if not run("/sbin/ipset -T %s %s" % (group_name, nodeip)):
+               run("/sbin/ipset -A %s %s" % (group_name, nodeip))
+
+
+def usage():
+    bwcap_description = format_tc_rate(get_bwcap())
+        
+    print """
+Usage:
+
+%s [OPTION]... [COMMAND] [ARGUMENT]...
+
+Options:
+        -d device   Network interface (default: %s)
+        -r rate         Node bandwidth cap (default: %s)
+        -q quantum      Share multiplier (default: %d bytes)
+        -n              Print rates in numeric bits per second
+        -v              Enable verbose debug messages
+        -h              This message
+
+Commands:
+        init
+                (Re)initialize all bandwidth parameters
+        on slice [share|-] [minrate|-] [maxrate|-] [minexemptrate|-] [maxexemptrate|-]
+                Set bandwidth parameter(s) for the specified slice
+        off slice
+                Remove all bandwidth parameters for the specified slice
+        get
+                Get all bandwidth parameters for all slices
+        get slice
+                Get bandwidth parameters for the specified slice
+""" % (sys.argv[0], dev, bwcap_description, quantum)
+    sys.exit(1)
+    
+
+def main():
+    global dev, quantum, verbose
+
+    # Defaults
+    numeric = False
+    bwcap = None
+
+    (opts, argv) = getopt.getopt(sys.argv[1:], "d:nr:q:vh")
+    for (opt, optval) in opts:
+        if opt == '-d':
+            dev = optval
+        elif opt == '-n':
+            numeric = True
+        elif opt == '-r':
+            bwcap = get_tc_rate(optval)
+        elif opt == '-q':
+            quantum = int(optval)
+        elif opt == '-v':
+            verbose += 1
+        elif opt == '-h':
+            usage()
+
+    if not bwcap:
+        bwcap = get_bwcap(dev)
+
+    if bwcap == -1:
+        return 0
+
+    if len(argv):
+        if argv[0] == "init" or (argv[0] == "on" and len(argv) == 1):
+            # (Re)initialize
+            init(dev, get_tc_rate(bwcap))
+
+        elif argv[0] == "get" or argv[0] == "show":
+            # Show
+            if len(argv) >= 2:
+                # Show a particular slice
+                xid = get_xid(argv[1])
+                if xid is None:
+                    sys.stderr.write("Error: Invalid slice name or context '%s'\n" % argv[1])
+                    usage()
+                params = get(xid, dev)
+                if params is None:
+                    paramslist = []
+                else:
+                    paramslist = [params]
+            else:
+                # Show all slices
+                paramslist = get(None, dev)
+
+            for (xid, share,
+                 minrate, maxrate,
+                 minexemptrate, maxexemptrate,
+                 bytes, exemptbytes) in paramslist:
+                slice = get_slice(xid)
+                if slice is None:
+                    # Orphaned (not associated with a slice) class
+                    slice = "%d?" % xid
+                if numeric:
+                    print "%s %d %d %d %d %d %d %d" % \
+                          (slice, share,
+                           minrate, maxrate,
+                           minexemptrate, maxexemptrate,
+                           bytes, exemptbytes)
+                else:
+                    print "%s %d %s %s %s %s %s %s" % \
+                          (slice, share,
+                           format_tc_rate(minrate), format_tc_rate(maxrate),
+                           format_tc_rate(minexemptrate), format_tc_rate(maxexemptrate),
+                           format_bytes(bytes), format_bytes(exemptbytes))
+
+        elif len(argv) >= 2:
+            # slice, ...
+            xid = get_xid(argv[1])
+            if xid is None:
+                sys.stderr.write("Error: Invalid slice name or context '%s'\n" % argv[1])
+                usage()
+
+            if argv[0] == "on" or argv[0] == "add" or argv[0] == "replace" or argv[0] == "set":
+                # Enable cap
+                args = []
+                if len(argv) >= 3:
+                    # ... share, minrate, maxrate, minexemptrate, maxexemptrate
+                    casts = [int, get_tc_rate, get_tc_rate, get_tc_rate, get_tc_rate]
+                    for i, arg in enumerate(argv[2:]):
+                        if i >= len(casts):
+                            break
+                        if arg == "-":
+                            args.append(None)
+                        else:
+                            args.append(casts[i](arg))
+                on(xid, dev, *args)
+
+            elif argv[0] == "off" or argv[0] == "del":
+                # Disable cap
+                off(xid, dev)
+
+            else:
+                usage()
+
+        else:
+            usage()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/cgroups.py b/cgroups.py
new file mode 100644 (file)
index 0000000..cf6ca98
--- /dev/null
@@ -0,0 +1,75 @@
+# Simple wrapper arround cgroups so we don't have to worry the type of
+# virtualization the sliver runs on (lxc, qemu/kvm, etc.) managed by libvirt
+#
+# Xavi Leon <xleon@ac.upc.edu>
+
+import os
+import pyinotify
+import logger
+
+# Base dir for libvirt
+BASE_DIR = '/cgroup/libvirt/'
+VIRT_TECHS = ['lxc']
+
+# Global cgroup mapping. 
+CGROUPS = dict()
+
+class CgroupWatch(pyinotify.ProcessEvent):
+
+    def process_IN_CREATE(self, event):
+       path = os.path.join(event.path, event.name)
+       CGROUPS[event.name] = path
+       logger.verbose("Cgroup Notify: Created cgroup %s on %s" % \
+                       (event.name, event.path))
+        
+    def process_IN_DELETE(self, event):
+        try:
+           del CGROUPS[event.name]
+        except:
+            logger.verbose("Cgroup Notify: Cgroup %s does not exist, continuing..."%event.name)
+       logger.verbose("Cgroup Notify: Deleted cgroup %s on %s" % \
+                       (event.name, event.path))
+
+
+logger.verbose("Cgroups: Recognizing already existing cgroups...")
+for virt in VIRT_TECHS:
+    filenames = os.listdir(os.path.join(BASE_DIR, virt))
+    for filename in filenames:
+       path = os.path.join(BASE_DIR, virt, filename)
+       if os.path.isdir(path):
+           CGROUPS[filename] = path
+
+logger.verbose("Cgroups: Initializing watchers...")
+wm = pyinotify.WatchManager()
+notifier = pyinotify.ThreadedNotifier(wm, CgroupWatch())
+for virt in VIRT_TECHS:
+       wdd = wm.add_watch(os.path.join(BASE_DIR, virt),
+                          pyinotify.IN_DELETE | pyinotify.IN_CREATE,
+                          rec=False)
+notifier.daemon = True
+notifier.start()
+
+def get_cgroup_path(name):
+    """ Returns the base path for the cgroup with a specific name """
+    assert CGROUPS.has_key(name), \
+           "No sliver %s managed by libvirt through cgroup!" % name
+    return CGROUPS[name]
+
+def get_base_path():
+    return BASE_DIR
+
+def get_cgroups():
+    """ Returns the list of cgroups active at this moment on the node """
+    return CGROUPS.keys()
+
+def write(name, key, value):
+    """ Writes a value to the file key with the cgroup with name """
+    base_path = get_cgroup_path(name)
+    with open(os.path.join(base_path, key), 'w') as f:
+        print >>f, value
+
+def append(name, key, value):
+    """ Appends a value to the file key with the cgroup with name """
+    base_path = get_cgroup_path(name)
+    with open(os.path.join(base_path, key), 'a') as f:
+        print >>f, value       
diff --git a/config_template.xml b/config_template.xml
new file mode 100644 (file)
index 0000000..a4d6152
--- /dev/null
@@ -0,0 +1,25 @@
+<domain type='lxc'>
+  <name>$name</name>
+  <memory>32768</memory>
+  <os>
+    <type>exe</type>
+    <init>/sbin/init</init>
+  </os>
+  <vcpu>1</vcpu>
+  <clock offset='utc'/>
+  <on_poweroff>destroy</on_poweroff>
+  <on_reboot>restart</on_reboot>
+  <on_crash>destroy</on_crash>
+  <devices>
+    <emulator>/usr/libexec/libvirt_lxc</emulator>
+    <filesystem type='mount'>
+      <source dir='/vservers/$name/'/>
+      <target dir='/'/>
+    </filesystem>
+    <interface type='network'>
+      <source network='default'/>
+      <target dev='veth%xid'/>
+    </interface>
+    <console type='pty' />
+  </devices>
+</domain>
index 3954da9..3959e48 100644 (file)
@@ -7,6 +7,7 @@
 
 import logger
 import os
+import cgroups
 
 glo_coresched_simulate = False
 
@@ -40,8 +41,8 @@ class CoreSched:
         assert(filename!=None or name!=None)
 
         if filename==None:
-            filename="/dev/cgroup/" + name
-
+            filename="/dev/cgroup/" + name
+            filename=cgroups.get_base_path() + name
         data = open(filename).readline().strip()
 
         if not data:
@@ -113,12 +114,13 @@ class CoreSched:
             this might change as vservers are instantiated, so always compute
             it dynamically.
         """
-        cgroups = []
-        filenames = os.listdir("/dev/cgroup")
-        for filename in filenames:
-            if os.path.isdir(os.path.join("/dev/cgroup", filename)):
-                cgroups.append(filename)
-        return cgroups
+        return cgroups.get_cgroups()
+        #cgroups = []
+        #filenames = os.listdir("/dev/cgroup")
+        #for filename in filenames:
+        #    if os.path.isdir(os.path.join("/dev/cgroup", filename)):
+        #        cgroups.append(filename)
+        #return cgroups
 
     def decodeCoreSpec (self, cores):
         """ Decode the value of the core attribute. It's a number, followed by
@@ -245,16 +247,18 @@ class CoreSched:
             if glo_coresched_simulate:
                 print "R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus)
             else:
-                file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
+                cgroups.write(cgroup, var_name, self.listToRange(cpus))
+                #file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
 
     def reserveDefault (self, var_name, cpus):
-        if not os.path.exists("/etc/vservers/.defaults/cgroup"):
-            os.makedirs("/etc/vservers/.defaults/cgroup")
-
-        if glo_coresched_simulate:
-            print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
-        else:
-            file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
+        #if not os.path.exists("/etc/vservers/.defaults/cgroup"):
+        #    os.makedirs("/etc/vservers/.defaults/cgroup")
+
+        #if glo_coresched_simulate:
+        #    print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
+        #else:
+        #    file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write( self.listToRange(cpus) + "\n" )
+        pass
 
     def listToRange (self, list):
         """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
diff --git a/sliver_libvirt.py b/sliver_libvirt.py
new file mode 100644 (file)
index 0000000..52c8bde
--- /dev/null
@@ -0,0 +1,160 @@
+#
+
+"""LibVirt slivers"""
+
+import accounts
+import logger
+import subprocess
+import os
+import os.path
+import libvirt
+import sys
+import shutil
+import bwlimit
+import cgroups
+
+from string import Template
+
+STATES = {
+    libvirt.VIR_DOMAIN_NOSTATE: 'no state',
+    libvirt.VIR_DOMAIN_RUNNING: 'running',
+    libvirt.VIR_DOMAIN_BLOCKED: 'blocked on resource',
+    libvirt.VIR_DOMAIN_PAUSED: 'paused by user',
+    libvirt.VIR_DOMAIN_SHUTDOWN: 'being shut down',
+    libvirt.VIR_DOMAIN_SHUTOFF: 'shut off',
+    libvirt.VIR_DOMAIN_CRASHED: 'crashed',
+}
+
+connections = dict()
+
+# Helper methods
+
+def getConnection(sliver_type):
+    # TODO: error checking
+    # vtype is of the form sliver.[LXC/QEMU] we need to lower case to lxc/qemu
+    vtype = sliver_type.split('.')[1].lower()
+    uri = vtype + '://'
+    return connections.setdefault(uri, libvirt.open(uri))
+
+def debuginfo(dom):
+    ''' Helper method to get a "nice" output of the info struct for debug'''
+    [state, maxmem, mem, ncpu, cputime] = dom.info()
+    return '%s is %s, maxmem = %s, mem = %s, ncpu = %s, cputime = %s' % (dom.name(), STATES.get(state, state), maxmem, mem, ncpu, cputime)
+
+# Common Libvirt code
+
+class Sliver_Libvirt(accounts.Account):
+
+    def __init__(self, rec):
+        self.name = rec['name']
+        logger.verbose ('sliver_libvirt: %s init'%(self.name))
+         
+        # Assume the directory with the image and config files
+        # are in place
+        
+        self.keys = ''
+        self.rspec = {}
+        self.slice_id = rec['slice_id']
+        self.enabled = True
+        self.conn = getConnection(rec['type'])
+        self.xid = bwlimit.get_xid(self.name)
+        
+        try:
+            self.dom = self.conn.lookupByName(self.name)
+        except:
+            logger.verbose('sliver_libvirt: Domain %s does not exist UNEXPECTED: %s'%(self.name, sys.exc_info()[0]))
+
+
+    def start(self, delay=0):
+        ''' Just start the sliver '''
+        logger.verbose('sliver_libvirt: %s start'%(self.name))
+
+        # Check if it's running to avoid throwing an exception if the
+        # domain was already running, create actually means start
+        if not self.is_running():
+            self.dom.create()
+        else:
+            logger.verbose('sliver_libvirt: sliver %s already started'%(self.name))
+
+        # After the VM is started... we can play with the virtual interface
+        # Create the ebtables rule to mark the packets going out from the virtual
+        # interface to the actual device so the filter canmatch against the mark
+        bwlimit.ebtables("-A INPUT -i veth%d -j mark --set-mark %d" % \
+            (self.xid, self.xid))
+           
+
+    def stop(self):
+        logger.verbose('sliver_libvirt: %s stop'%(self.name))
+        
+        # Remove the ebtables rule before stopping 
+        bwlimit.ebtables("-D INPUT -i veth%d -j mark --set-mark %d" % \
+            (self.xid, self.xid))
+        
+        try:
+            self.dom.destroy()
+        except:
+            logger.verbose('sliver_libvirt: Domain %s not running UNEXPECTED: %s'%(self.name, sys.exc_info()[0]))
+            print 'sliver_libvirt: Domain %s not running UNEXPECTED: %s'%(self.name, sys.exc_info()[0])
+        
+    def is_running(self):
+        ''' Return True if the domain is running '''
+        logger.verbose('sliver_libvirt: %s is_running'%self.name)
+        try:
+            [state, _, _, _, _] = self.dom.info()
+            if state == libvirt.VIR_DOMAIN_RUNNING:
+                logger.verbose('sliver_libvirt: %s is RUNNING'%self.name)
+                return True
+            else:
+                info = debuginfo(self.dom)
+                logger.verbose('sliver_libvirt: %s is NOT RUNNING...\n%s'%(self.name, info))
+                return False
+        except:
+            logger.verbose('sliver_libvirt: UNEXPECTED ERROR in %s...\n%s'%(self.name, sys.exc_info[0]))
+            print 'sliver_libvirt: UNEXPECTED ERROR in %s...\n%s'%(self.name, sys.exc_info[0])
+
+    def configure(self, rec):
+
+        #sliver.[LXC/QEMU] tolower case
+        #sliver_type = rec['type'].split('.')[1].lower() 
+
+        #BASE_DIR = '/cgroup/libvirt/%s/%s/'%(sliver_type, self.name)
+
+        # Disk allocation
+        # No way through cgroups... figure out how to do that with user/dir quotas.
+        # There is no way to do quota per directory. Chown-ing would create
+        # problems as username namespaces are not yet implemented (and thus, host
+        # and containers share the same name ids
+
+        # Btrfs support quota per volumes
+
+        # It will depend on the FS selection
+        if rec.has_key('disk_max'):
+            disk_max = rec['disk_max']
+            if disk_max == 0:
+                # unlimited 
+                pass
+            else:
+                # limit to certain number
+                pass
+
+        # Memory allocation
+        if rec.has_key('memlock_hard'):
+            mem = rec['memlock_hard'] * 1024 # hard limit in bytes
+            cgroups.write(self.name, 'memory.limit_in_bytes', mem)
+        if rec.has_key('memlock_soft'):
+            mem = rec['memlock_soft'] * 1024 # soft limit in bytes
+            cgroups.write(self.name, 'memory.soft_limit_in_bytes', mem)
+
+        # CPU allocation
+        # Only cpu_shares until figure out how to provide limits and guarantees
+        # (RT_SCHED?)
+        if rec.has_key('cpu_share'): 
+            cpu_share = rec['cpu_share']
+            cgroups.write(self.name, 'cpu.shares', cpu_share)
+
+        # Call the upper configure method (ssh keys...)
+        accounts.Account.configure(self, rec)
+
+
+
+
diff --git a/sliver_lxc.py b/sliver_lxc.py
new file mode 100644 (file)
index 0000000..1b64f34
--- /dev/null
@@ -0,0 +1,141 @@
+#
+
+"""LXC slivers"""
+
+import accounts
+import logger
+import subprocess
+import os
+import libvirt
+import sys
+from string import Template
+import bwlimit
+import sliver_libvirt as lv
+
+
+class Sliver_LXC(lv.Sliver_Libvirt):
+    """This class wraps LXC commands"""
+   
+    SHELL = '/bin/sshsh' 
+    
+    TYPE = 'sliver.LXC'
+    # Need to add a tag at myplc to actually use this account
+    # type = 'sliver.LXC'
+
+    REF_IMG_BASE_DIR = '/vservers/.lvref'
+    CON_BASE_DIR     = '/vservers'
+
+    @staticmethod
+    def create(name, rec=None):
+        logger.verbose ('sliver_lxc: %s create'%(name))
+        conn = lv.getConnection(Sliver_LXC.TYPE)
+       
+        ''' Create dirs, copy fs image, lxc_create '''
+        # Get the type of image from vref myplc tags specified as:
+        # pldistro = lxc
+        # fcdistro = squeeze
+        # arch x86_64
+        vref = rec['vref']
+        if vref is None:
+            logger.log('sliver_libvirt: %s: WARNING - no vref attached defaults to lxc-debian' % (name))
+            vref = "lxc-squeeze-x86_64"
+
+        refImgDir    = os.path.join(Sliver_LXC.REF_IMG_BASE_DIR, vref)
+        containerDir = os.path.join(Sliver_LXC.CON_BASE_DIR, name)
+
+        # check the template exists -- there's probably a better way..
+        if not os.path.isdir(refImgDir):
+            logger.log('sliver_lxc: %s: ERROR Could not create sliver - reference image %s not found' % (name,vref))
+            return
+
+        # Snapshot the reference image fs (assume the reference image is in its own
+        # subvolume)
+        command = ['btrfs', 'subvolume', 'snapshot', refImgDir, containerDir]
+        logger.log_call(command, timeout=15*60)
+        command = ['chmod', '755', containerDir]
+        logger.log_call(command, timeout=15*60)
+
+        # TODO: set quotas...
+
+        # Set hostname. A valid hostname cannot have '_'
+        #with open(os.path.join(containerDir, 'etc/hostname'), 'w') as f:
+        #    print >>f, name.replace('_', '-')
+
+        # Add slices group if not already present
+        command = ['/usr/sbin/groupadd', 'slices']
+        logger.log_call(command, timeout=15*60)
+        
+        # Add unix account (TYPE is specified in the subclass)
+        command = ['/usr/sbin/useradd', '-g', 'slices', '-s', Sliver_LXC.SHELL, name, '-p', '*']
+        logger.log_call(command, timeout=15*60)
+        command = ['mkdir', '/home/%s/.ssh'%name]
+        logger.log_call(command, timeout=15*60)
+
+        # Create PK pair keys to connect from the host to the guest without
+        # password... maybe remove the need for authentication inside the
+        # guest?
+        command = ['su', '-s', '/bin/bash', '-c', 'ssh-keygen -t rsa -N "" -f /home/%s/.ssh/id_rsa'%(name)]
+        logger.log_call(command, timeout=15*60)
+        
+        command = ['chown', '-R', '%s.slices'%name, '/home/%s/.ssh'%name]
+        logger.log_call(command, timeout=15*60)
+        
+        command = ['mkdir', '%s/root/.ssh'%containerDir]
+        logger.log_call(command, timeout=15*60)
+
+        command = ['cp', '/home/%s/.ssh/id_rsa.pub'%name, '%s/root/.ssh/authorized_keys'%containerDir]
+        logger.log_call(command, timeout=15*60)
+
+        # Lookup for xid and create template after the user is created so we
+        # can get the correct xid based on the name of the slice
+        xid = bwlimit.get_xid(name)
+
+        # Template for libvirt sliver configuration
+        try:
+            with open(Sliver_LXC.REF_IMG_BASE_DIR + '/config_template.xml') as f:
+                template = Template(f.read())
+                xml  = template.substitute(name=name, xid=xid)
+        except IOError:
+            logger.log('Cannot find XML template file')
+            return
+        # Lookup for the sliver before actually
+        # defining it, just in case it was already defined.
+        try:
+            dom = conn.lookupByName(name)
+        except:
+            dom = conn.defineXML(xml)
+        logger.verbose('lxc_create: %s -> %s'%(name, lv.debuginfo(dom)))
+
+
+    @staticmethod
+    def destroy(name):
+        logger.verbose ('sliver_lxc: %s destroy'%(name))
+        conn = lv.getConnection(Sliver_LXC.TYPE)
+        
+        containerDir = Sliver_LXC.CON_BASE_DIR + '/%s'%(name)
+
+        try:
+            # Destroy libvirt domain
+            dom = conn.lookupByName(name)
+        except:
+            logger.verbose('sliver_lxc: Domain %s does not exist! UNEXPECTED'%name)
+            return
+
+        try:    
+            dom.destroy()
+        except:
+            logger.verbose('sliver_lxc: Domain %s not running... continuing.'%name)
+        
+        dom.undefine()
+
+        # Remove user after destroy domain to force logout
+        command = ['/usr/sbin/userdel', '-f', '-r', name]
+        logger.log_call(command, timeout=15*60)
+            
+        # Remove rootfs of destroyed domain
+        command = ['btrfs', 'subvolume', 'delete', containerDir]
+        logger.log_call(command, timeout=15*60)
+
+        logger.verbose('sliver_libvirt: %s destroyed.'%name)
+
index 8382c6d..2febb37 100644 (file)
@@ -67,7 +67,6 @@ class Sliver_VS(accounts.Account, vserver.VServer):
             self.create(name, rec)
             logger.log("sliver_vs: %s: second chance..."%name)
             vserver.VServer.__init__(self, name,logfile='/var/log/nodemanager')
-
         self.keys = ''
         self.rspec = {}
         self.slice_id = rec['slice_id']
@@ -187,7 +186,7 @@ class Sliver_VS(accounts.Account, vserver.VServer):
             if code:
                 logger.log("vsliver_vs: %s: Installed new initscript in %s"%(self.name,sliver_initscript))
                 if self.is_running():
-                    # Only need to rerun the initscript if the vserver is
+                     # Only need to rerun the initscript if the vserver is
                     # already running. If the vserver isn't running, then the
                     # initscript will automatically be started by
                     # /etc/rc.d/vinit when the vserver is started.
index 383379e..3e4fb1f 100644 (file)
@@ -17,6 +17,7 @@ import database
 import accounts
 import controller
 import sliver_vs
+import sliver_lxc
 
 try: from bwlimit import bwmin, bwmax
 except ImportError: bwmin, bwmax = 8, 1000*1000*1000
@@ -208,7 +209,9 @@ def start():
     for resname, default_amount in sliver_vs.DEFAULT_ALLOCATION.iteritems():
         DEFAULT_ALLOCATION[resname]=default_amount
 
-    accounts.register_class(sliver_vs.Sliver_VS)
+    #accounts.register_class(sliver_vs.Sliver_VS)
+    #accounts.register_class(sliver_libvirt.Sliver_LV)
+    accounts.register_class(sliver_lxc.Sliver_LXC)
     accounts.register_class(controller.Controller)
     database.start()
     api_calls.deliver_ticket = deliver_ticket
diff --git a/sshsh b/sshsh
new file mode 100755 (executable)
index 0000000..edcd667
--- /dev/null
+++ b/sshsh
@@ -0,0 +1,5 @@
+#!/bin/bash
+MAC=$(virsh --connect lxc:// --readonly dumpxml $USER|grep mac|awk -F\' '{print $2}')
+IP=$(cat /var/lib/libvirt/dnsmasq/default.leases | grep $MAC | awk '{print $3}')
+shift
+ssh root@$IP "$@"