import resourcealloc
import socket
import sys
+import logging
+import ipaddr
+import operator
from nepi.util import server
from nepi.util import parallel
+import application
+
+MAX_VROUTE_ROUTES = 5
+
class UnresponsiveNodeError(RuntimeError):
    """Raised when a node cannot be reached (e.g. is_alive keeps failing past the timeout)."""
    pass
+def _castproperty(typ, propattr):
+ def _get(self):
+ return getattr(self, propattr)
+ def _set(self, value):
+ if value is not None or (isinstance(value, basestring) and not value):
+ value = typ(value)
+ return setattr(self, propattr, value)
+ def _del(self, value):
+ return delattr(self, propattr)
+ _get.__name__ = propattr + '_get'
+ _set.__name__ = propattr + '_set'
+ _del.__name__ = propattr + '_del'
+ return property(_get, _set, _del)
+
class Node(object):
BASEFILTERS = {
# Map Node attribute to plcapi filter name
'architecture' : ('arch','value'),
'operatingSystem' : ('fcdistro','value'),
'pl_distro' : ('pldistro','value'),
+ 'city' : ('city','value'),
+ 'country' : ('country','value'),
+ 'region' : ('region','value'),
'minReliability' : ('reliability%(timeframe)s', ']value'),
'maxReliability' : ('reliability%(timeframe)s', '[value'),
'minBandwidth' : ('bw%(timeframe)s', ']value'),
'maxBandwidth' : ('bw%(timeframe)s', '[value'),
+ 'minLoad' : ('load%(timeframe)s', ']value'),
+ 'maxLoad' : ('load%(timeframe)s', '[value'),
+ 'minCpu' : ('cpu%(timeframe)s', ']value'),
+ 'maxCpu' : ('cpu%(timeframe)s', '[value'),
}
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
+ RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
+ RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
+
+ minReliability = _castproperty(float, '_minReliability')
+ maxReliability = _castproperty(float, '_maxReliability')
+ minBandwidth = _castproperty(float, '_minBandwidth')
+ maxBandwidth = _castproperty(float, '_maxBandwidth')
+ minCpu = _castproperty(float, '_minCpu')
+ maxCpu = _castproperty(float, '_maxCpu')
+ minLoad = _castproperty(float, '_minLoad')
+ maxLoad = _castproperty(float, '_maxLoad')
def __init__(self, api=None):
if not api:
self.operatingSystem = None
self.pl_distro = None
self.site = None
- self.emulation = None
+ self.city = None
+ self.country = None
+ self.region = None
self.minReliability = None
self.maxReliability = None
self.minBandwidth = None
self.maxBandwidth = None
+ self.minCpu = None
+ self.maxCpu = None
+ self.minLoad = None
+ self.maxLoad = None
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
self.timeframe = 'm'
self.required_packages = set()
self.required_vsys = set()
self.pythonpath = []
+ self.rpmFusion = False
self.env = collections.defaultdict(list)
+ # Some special applications - initialized when connected
+ self.multicast_forwarder = None
+
# Testbed-derived attributes
self.slicename = None
self.ident_path = None
self.server_key = None
self.home_path = None
+ self.enable_cleanup = False
# Those are filled when an actual node is allocated
self._node_id = None
+ self._yum_dependencies = None
+ self._installed = False
+
+ # Logging
+ self._logger = logging.getLogger('nepi.testbeds.planetlab')
- @property
- def _nepi_testbed_environment_setup(self):
+ def _nepi_testbed_environment_setup_get(self):
command = cStringIO.StringIO()
command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
for envval in envvals:
command.write(' ; export %s=%s' % (envkey, envval))
return command.getvalue()
+ def _nepi_testbed_environment_setup_set(self, value):
+ pass
+ _nepi_testbed_environment_setup = property(
+ _nepi_testbed_environment_setup_get,
+ _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
for attr, tag in filter_map.iteritems():
)
def find_candidates(self, filter_slice_id=None):
- print >>sys.stderr, "Finding candidates for", self.make_filter_description()
+ self._logger.info("Finding candidates for %s", self.make_filter_description())
fields = ('node_id',)
replacements = {'timeframe':self.timeframe}
# make sure hostnames are resolvable
if candidates:
- print >>sys.stderr, " Found", len(candidates), "candidates. Checking for reachability..."
+ self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
hostnames = dict(map(operator.itemgetter('node_id','hostname'),
self._api.GetNodes(list(candidates), ['node_id','hostname'])
candidates = set(parallel.pfilter(resolvable, candidates,
maxthreads = 16))
- print >>sys.stderr, " Found", len(candidates), "reachable candidates."
+ self._logger.info(" Found %s reachable candidates.", len(candidates))
return candidates
if self.slicename is None:
raise AssertionError, "Misconfigured node: unspecified slice"
+ def recover(self):
+ # Mark dependencies installed
+ self._installed = True
+
+ # Clear load attributes, they impair re-discovery
+ self.minReliability = \
+ self.maxReliability = \
+ self.minBandwidth = \
+ self.maxBandwidth = \
+ self.minCpu = \
+ self.maxCpu = \
+ self.minLoad = \
+ self.maxLoad = None
+
def install_dependencies(self):
- if self.required_packages:
- # TODO: make dependant on the experiment somehow...
- pidfile = self.DEPENDS_PIDFILE
- logfile = self.DEPENDS_LOGFILE
+ if self.required_packages and not self._installed:
+ # If we need rpmfusion, we must install the repo definition and the gpg keys
+ if self.rpmFusion:
+ if self.operatingSystem == 'f12':
+ # Fedora 12 requires a different rpmfusion package
+ RPM_FUSION_URL = self.RPM_FUSION_URL_F12
+ else:
+ # This one works for f13+
+ RPM_FUSION_URL = self.RPM_FUSION_URL
+
+ rpmFusion = (
+ '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
+ ) % {
+ 'RPM_FUSION_URL' : RPM_FUSION_URL
+ }
+ else:
+ rpmFusion = ''
- # Start process in a "daemonized" way, using nohup and heavy
- # stdin/out redirection to avoid connection issues
- (out,err),proc = rspawn.remote_spawn(
- "( yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
- 'packages' : ' '.join(self.required_packages),
- },
- pidfile = pidfile,
- stdout = logfile,
- stderr = rspawn.STDOUT,
+ if rpmFusion:
+ (out,err),proc = server.popen_ssh_command(
+ rpmFusion,
+ host = self.hostname,
+ port = None,
+ user = self.slicename,
+ agent = None,
+ ident_key = self.ident_path,
+ server_key = self.server_key,
+ timeout = 600,
+ )
- host = self.hostname,
- port = None,
- user = self.slicename,
- agent = None,
- ident_key = self.ident_path,
- server_key = self.server_key,
- sudo = True
- )
+ if proc.wait():
+ raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
- if proc.wait():
- raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+ # Launch p2p yum dependency installer
+ self._yum_dependencies.async_setup()
def wait_provisioning(self, timeout = 20*60):
- # recently provisioned nodes may not be up yet
+ # Wait for the p2p installer
sleeptime = 1.0
totaltime = 0.0
while not self.is_alive():
# If we're above that delay, the unresponsiveness is not due
# to this delay.
raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+
+ # Ensure the node is clean (no apps running that could interfere with operations)
+ if self.enable_cleanup:
+ self.do_cleanup()
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
- if self.required_packages:
- pidfile = self.DEPENDS_PIDFILE
-
- # get PID
- pid = ppid = None
- for probenum in xrange(pidmax):
- pidtuple = rspawn.remote_check_pid(
- pidfile = pidfile,
- host = self.hostname,
- port = None,
- user = self.slicename,
- agent = None,
- ident_key = self.ident_path,
- server_key = self.server_key
- )
- if pidtuple:
- pid, ppid = pidtuple
- break
- else:
- time.sleep(pidprobe)
- else:
- raise RuntimeError, "Failed to obtain pidfile for dependency installer"
-
- # wait for it to finish
- while rspawn.RUNNING is rspawn.remote_status(
- pid, ppid,
- host = self.hostname,
- port = None,
- user = self.slicename,
- agent = None,
- ident_key = self.ident_path,
- server_key = self.server_key
- ):
- time.sleep(probe)
- probe = min(probemax, 1.5*probe)
-
- # check results
- logfile = self.DEPENDS_LOGFILE
-
- (out,err),proc = server.popen_ssh_command(
- "cat %s" % (server.shell_escape(logfile),),
- host = self.hostname,
- port = None,
- user = self.slicename,
- agent = None,
- ident_key = self.ident_path,
- server_key = self.server_key
- )
-
- if proc.wait():
- raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
-
- success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
- if not success:
- raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
+ # Wait for the p2p installer
+ if self._yum_dependencies and not self._installed:
+ self._yum_dependencies.async_setup_wait()
+ self._installed = True
def is_alive(self):
# Make sure all the paths are created where
# they have to be created for deployment
- (out,err),proc = server.popen_ssh_command(
+ (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
"echo 'ALIVE'",
host = self.hostname,
port = None,
user = self.slicename,
agent = None,
ident_key = self.ident_path,
- server_key = self.server_key
+ server_key = self.server_key,
+ timeout = 60,
+ err_on_timeout = False
)
if proc.wait():
else:
return False
+ def destroy(self):
+ if self.enable_cleanup:
+ self.do_cleanup()
+
+ def do_cleanup(self):
+ if self.testbed().recovering:
+ # WOW - not now
+ return
+
+ self._logger.info("Cleaning up %s", self.hostname)
+
+ cmds = [
+ "sudo -S killall python tcpdump || /bin/true ; "
+ "sudo -S killall python tcpdump || /bin/true ; "
+ "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
+ "sudo -S killall -u %(slicename)s || /bin/true ",
+ "sudo -S killall -u root || /bin/true ",
+ "sudo -S killall -u %(slicename)s || /bin/true ",
+ "sudo -S killall -u root || /bin/true ",
+ ]
+
+ for cmd in cmds:
+ (out,err),proc = server.popen_ssh_command(
+ # Some apps need two kills
+ cmd % {
+ 'slicename' : self.slicename ,
+ },
+ host = self.hostname,
+ port = None,
+ user = self.slicename,
+ agent = None,
+ ident_key = self.ident_path,
+ server_key = self.server_key,
+ tty = True, # so that ps -N -T works as advertised...
+ timeout = 60,
+ retry = 3
+ )
+ proc.wait()
+
+ def prepare_dependencies(self):
+ # Configure p2p yum dependency installer
+ if self.required_packages and not self._installed:
+ self._yum_dependencies = application.YumDependency(self._api)
+ self._yum_dependencies.node = self
+ self._yum_dependencies.home_path = "nepi-yumdep"
+ self._yum_dependencies.depends = ' '.join(self.required_packages)
- def configure_routes(self, routes, devs):
+ def routing_method(self, routes, vsys_vnet):
"""
- Add the specified routes to the node's routing table
+ There are two methods, vroute and sliceip.
+
+ vroute:
+ Modifies the node's routing table directly, validating that the IP
+ range lies within the network given by the slice's vsys_vnet tag.
+ This method is the most scalable for very small routing tables
+ that need not modify other routes (including the default)
+
+ sliceip:
+ Uses policy routing and iptables filters to create per-sliver
+ routing tables. It's the most flexible way, but it doesn't scale
+ as well since only 155 routing tables can be created this way.
+
+ This method will return the most appropriate routing method, which will
+ prefer vroute for small routing tables.
"""
- rules = []
+ # For now, sliceip results in kernel panics
+ # so we HAVE to use vroute
+ return 'vroute'
+
+ # We should not make the routing table grow too big
+ if len(routes) > MAX_VROUTE_ROUTES:
+ return 'sliceip'
+
+ vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
+ for route in routes:
+ dest, prefix, nexthop, metric = route
+ dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
+ nexthop = ipaddr.IPAddress(nexthop)
+ if dest not in vsys_vnet or nexthop not in vsys_vnet:
+ return 'sliceip'
+
+ return 'vroute'
+
+ def format_route(self, route, dev, method, action):
+ dest, prefix, nexthop, metric = route
+ if method == 'vroute':
+ return (
+ "%s %s%s gw %s %s" % (
+ action,
+ dest,
+ (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+ nexthop,
+ dev,
+ )
+ )
+ elif method == 'sliceip':
+ return (
+ "route %s to %s%s via %s metric %s dev %s" % (
+ action,
+ dest,
+ (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+ nexthop,
+ metric or 1,
+ dev,
+ )
+ )
+ else:
+ raise AssertionError, "Unknown method"
+
+ def _annotate_routes_with_devs(self, routes, devs, method):
+ dev_routes = []
for route in routes:
for dev in devs:
if dev.routes_here(route):
- # Schedule rule
- dest, prefix, nexthop = route
- rules.append(
- "add %s%s gw %s %s" % (
- dest,
- (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
- nexthop,
- dev.if_name,
- )
- )
+ dev_routes.append(tuple(route) + (dev.if_name,))
# Stop checking
break
else:
- raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
- "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+ if method == 'sliceip':
+ dev_routes.append(tuple(route) + ('eth0',))
+ else:
+ raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
+ "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+ return dev_routes
+
+ def configure_routes(self, routes, devs, vsys_vnet):
+ """
+ Add the specified routes to the node's routing table
+ """
+ rules = []
+ method = self.routing_method(routes, vsys_vnet)
+ tdevs = set()
+
+ # annotate routes with devices
+ dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+ for route in dev_routes:
+ route, dev = route[:-1], route[-1]
+
+ # Schedule rule
+ tdevs.add(dev)
+ rules.append(self.format_route(route, dev, method, 'add'))
+
+ if method == 'sliceip':
+ rules = map('enable '.__add__, tdevs) + rules
+ self._logger.info("Setting up routes for %s using %s", self.hostname, method)
+ self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
+
+ self.apply_route_rules(rules, method)
+
+ self._configured_routes = set(routes)
+ self._configured_devs = tdevs
+ self._configured_method = method
+
+ def reconfigure_routes(self, routes, devs, vsys_vnet):
+ """
+ Updates the routes in the node's routing table to match
+ the given route list
+ """
+ method = self._configured_method
+
+ dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+
+ current = self._configured_routes
+ current_devs = self._configured_devs
+
+ new = set(dev_routes)
+ new_devs = set(map(operator.itemgetter(-1), dev_routes))
+
+ deletions = current - new
+ insertions = new - current
+
+ dev_deletions = current_devs - new_devs
+ dev_insertions = new_devs - current_devs
+
+ # Generate rules
+ rules = []
+
+ # Rule deletions first
+ for route in deletions:
+ route, dev = route[:-1], route[-1]
+ rules.append(self.format_route(route, dev, method, 'del'))
+
+ if method == 'sliceip':
+ # Dev deletions now
+ rules.extend(map('disable '.__add__, dev_deletions))
+
+ # Dev insertions now
+ rules.extend(map('enable '.__add__, dev_insertions))
+
+ # Rule insertions now
+ for route in insertions:
+ route, dev = route[:-1], dev[-1]
+ rules.append(self.format_route(route, dev, method, 'add'))
+
+ # Apply
+ self.apply_route_rules(rules, method)
+
+ self._configured_routes = dev_routes
+ self._configured_devs = new_devs
+
+ def apply_route_rules(self, rules, method):
(out,err),proc = server.popen_ssh_command(
- "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
- home = server.shell_escape(self.home_path)),
+ "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
+ home = server.shell_escape(self.home_path),
+ method = method),
host = self.hostname,
port = None,
user = self.slicename,
agent = None,
ident_key = self.ident_path,
server_key = self.server_key,
- stdin = '\n'.join(rules)
+ stdin = '\n'.join(rules),
+ timeout = 300
)
if proc.wait() or err:
raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
-
-
+ elif out or err:
+ logger.debug("%s said: %s%s", method, out, err)