import socket
import sys
import logging
+import ipaddr
+import operator
from nepi.util import server
from nepi.util import parallel
import application
+MAX_VROUTE_ROUTES = 5
+
class UnresponsiveNodeError(RuntimeError):
pass
'architecture' : ('arch','value'),
'operatingSystem' : ('fcdistro','value'),
'pl_distro' : ('pldistro','value'),
+ 'city' : ('city','value'),
+ 'country' : ('country','value'),
+ 'region' : ('region','value'),
'minReliability' : ('reliability%(timeframe)s', ']value'),
'maxReliability' : ('reliability%(timeframe)s', '[value'),
'minBandwidth' : ('bw%(timeframe)s', ']value'),
'maxBandwidth' : ('bw%(timeframe)s', '[value'),
+ 'minLoad' : ('load%(timeframe)s', ']value'),
+ 'maxLoad' : ('load%(timeframe)s', '[value'),
+ 'minCpu' : ('cpu%(timeframe)s', ']value'),
+ 'maxCpu' : ('cpu%(timeframe)s', '[value'),
}
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
maxReliability = _castproperty(float, '_maxReliability')
minBandwidth = _castproperty(float, '_minBandwidth')
maxBandwidth = _castproperty(float, '_maxBandwidth')
+ minCpu = _castproperty(float, '_minCpu')
+ maxCpu = _castproperty(float, '_maxCpu')
+ minLoad = _castproperty(float, '_minLoad')
+ maxLoad = _castproperty(float, '_maxLoad')
def __init__(self, api=None):
if not api:
self.operatingSystem = None
self.pl_distro = None
self.site = None
+ self.city = None
+ self.country = None
+ self.region = None
self.minReliability = None
self.maxReliability = None
self.minBandwidth = None
self.maxBandwidth = None
+ self.minCpu = None
+ self.maxCpu = None
+ self.minLoad = None
+ self.maxLoad = None
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
self.timeframe = 'm'
self.rpmFusion = False
self.env = collections.defaultdict(list)
+ # Some special applications - initialized when connected
+ self.multicast_forwarder = None
+
# Testbed-derived attributes
self.slicename = None
self.ident_path = None
self.server_key = None
self.home_path = None
+ self.enable_cleanup = False
# Those are filled when an actual node is allocated
self._node_id = None
self.minReliability = \
self.maxReliability = \
self.minBandwidth = \
- self.maxBandwidth = None
+ self.maxBandwidth = \
+ self.minCpu = \
+ self.maxCpu = \
+ self.minLoad = \
+ self.maxLoad = None
def install_dependencies(self):
if self.required_packages and not self._installed:
user = self.slicename,
agent = None,
ident_key = self.ident_path,
- server_key = self.server_key
+ server_key = self.server_key,
+ timeout = 600,
)
if proc.wait():
# If we're above that delay, the unresponsiveness is not due
# to this delay.
raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+
+ # Ensure the node is clean (no apps running that could interfere with operations)
+ if self.enable_cleanup:
+ self.do_cleanup()
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
# Wait for the p2p installer
user = self.slicename,
agent = None,
ident_key = self.ident_path,
- server_key = self.server_key
+ server_key = self.server_key,
+ timeout = 60,
+ err_on_timeout = False
)
if proc.wait():
else:
return False
+ def destroy(self):
+ if self.enable_cleanup:
+ self.do_cleanup()
+
+ def do_cleanup(self):
+ if self.testbed().recovering:
+ # WOW - not now
+ return
+
+ self._logger.info("Cleaning up %s", self.hostname)
+
+ cmds = [
+ "sudo -S killall python tcpdump || /bin/true ; "
+ "sudo -S killall python tcpdump || /bin/true ; "
+ "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
+ "sudo -S killall -u %(slicename)s || /bin/true ",
+ "sudo -S killall -u root || /bin/true ",
+ "sudo -S killall -u %(slicename)s || /bin/true ",
+ "sudo -S killall -u root || /bin/true ",
+ ]
+
+ for cmd in cmds:
+ (out,err),proc = server.popen_ssh_command(
+ # Some apps need two kills
+ cmd % {
+ 'slicename' : self.slicename ,
+ },
+ host = self.hostname,
+ port = None,
+ user = self.slicename,
+ agent = None,
+ ident_key = self.ident_path,
+ server_key = self.server_key,
+ tty = True, # so that ps -N -T works as advertised...
+ timeout = 60,
+ retry = 3
+ )
+ proc.wait()
+
def prepare_dependencies(self):
# Configure p2p yum dependency installer
if self.required_packages and not self._installed:
self._yum_dependencies.home_path = "nepi-yumdep"
self._yum_dependencies.depends = ' '.join(self.required_packages)
- def configure_routes(self, routes, devs):
+ def routing_method(self, routes, vsys_vnet):
"""
- Add the specified routes to the node's routing table
+ There are two methods, vroute and sliceip.
+
+ vroute:
+ Modifies the node's routing table directly, validating that the IP
+ range lies within the network given by the slice's vsys_vnet tag.
+ This method is the most scalable for very small routing tables
+ that need not modify other routes (including the default)
+
+ sliceip:
+ Uses policy routing and iptables filters to create per-sliver
+ routing tables. It's the most flexible way, but it doesn't scale
+ as well since only 155 routing tables can be created this way.
+
+ This method will return the most appropriate routing method, which will
+ prefer vroute for small routing tables.
"""
- rules = []
+ # For now, sliceip results in kernel panics
+ # so we HAVE to use vroute
+ return 'vroute'
+
+ # We should not make the routing table grow too big
+ if len(routes) > MAX_VROUTE_ROUTES:
+ return 'sliceip'
+
+ vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
+ for route in routes:
+ dest, prefix, nexthop, metric = route
+ dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
+ nexthop = ipaddr.IPAddress(nexthop)
+ if dest not in vsys_vnet or nexthop not in vsys_vnet:
+ return 'sliceip'
+
+ return 'vroute'
+
+ def format_route(self, route, dev, method, action):
+ dest, prefix, nexthop, metric = route
+ if method == 'vroute':
+ return (
+ "%s %s%s gw %s %s" % (
+ action,
+ dest,
+ (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+ nexthop,
+ dev,
+ )
+ )
+ elif method == 'sliceip':
+ return (
+ "route %s to %s%s via %s metric %s dev %s" % (
+ action,
+ dest,
+ (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+ nexthop,
+ metric or 1,
+ dev,
+ )
+ )
+ else:
+ raise AssertionError, "Unknown method"
+
+ def _annotate_routes_with_devs(self, routes, devs, method):
+ dev_routes = []
for route in routes:
for dev in devs:
if dev.routes_here(route):
- # Schedule rule
- dest, prefix, nexthop, metric = route
- rules.append(
- "add %s%s gw %s %s" % (
- dest,
- (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
- nexthop,
- dev.if_name,
- )
- )
+ dev_routes.append(tuple(route) + (dev.if_name,))
# Stop checking
break
else:
- raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
- "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+ if method == 'sliceip':
+ dev_routes.append(tuple(route) + ('eth0',))
+ else:
+ raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
+ "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+ return dev_routes
+
+ def configure_routes(self, routes, devs, vsys_vnet):
+ """
+ Add the specified routes to the node's routing table
+ """
+ rules = []
+ method = self.routing_method(routes, vsys_vnet)
+ tdevs = set()
+
+ # annotate routes with devices
+ dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+ for route in dev_routes:
+ route, dev = route[:-1], route[-1]
+
+ # Schedule rule
+ tdevs.add(dev)
+ rules.append(self.format_route(route, dev, method, 'add'))
+
+ if method == 'sliceip':
+ rules = map('enable '.__add__, tdevs) + rules
- self._logger.info("Setting up routes for %s", self.hostname)
+ self._logger.info("Setting up routes for %s using %s", self.hostname, method)
+ self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
+
+ self.apply_route_rules(rules, method)
+
+ self._configured_routes = set(routes)
+ self._configured_devs = tdevs
+ self._configured_method = method
+
+ def reconfigure_routes(self, routes, devs, vsys_vnet):
+ """
+ Updates the routes in the node's routing table to match
+ the given route list
+ """
+ method = self._configured_method
+
+ dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+
+ current = self._configured_routes
+ current_devs = self._configured_devs
+ new = set(dev_routes)
+ new_devs = set(map(operator.itemgetter(-1), dev_routes))
+
+ deletions = current - new
+ insertions = new - current
+
+ dev_deletions = current_devs - new_devs
+ dev_insertions = new_devs - current_devs
+
+ # Generate rules
+ rules = []
+
+ # Rule deletions first
+ for route in deletions:
+ route, dev = route[:-1], route[-1]
+ rules.append(self.format_route(route, dev, method, 'del'))
+
+ if method == 'sliceip':
+ # Dev deletions now
+ rules.extend(map('disable '.__add__, dev_deletions))
+
+ # Dev insertions now
+ rules.extend(map('enable '.__add__, dev_insertions))
+
+ # Rule insertions now
+ for route in insertions:
+ route, dev = route[:-1], dev[-1]
+ rules.append(self.format_route(route, dev, method, 'add'))
+
+ # Apply
+ self.apply_route_rules(rules, method)
+
+ self._configured_routes = dev_routes
+ self._configured_devs = new_devs
+
+ def apply_route_rules(self, rules, method):
(out,err),proc = server.popen_ssh_command(
- "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
- home = server.shell_escape(self.home_path)),
+ "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
+ home = server.shell_escape(self.home_path),
+ method = method),
host = self.hostname,
port = None,
user = self.slicename,
agent = None,
ident_key = self.ident_path,
server_key = self.server_key,
- stdin = '\n'.join(rules)
+ stdin = '\n'.join(rules),
+ timeout = 300
)
if proc.wait() or err:
raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
-
-
+ elif out or err:
+ logger.debug("%s said: %s%s", method, out, err)