adding vlc broadcasting over Planetlab example
[nepi.git] / src / nepi / testbeds / planetlab / node.py
index c357462..59485ef 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
 from constants import TESTBED_ID
@@ -15,6 +14,7 @@ import sys
 import logging
 import ipaddr
 import operator
+import re
 
 from nepi.util import server
 from nepi.util import parallel
@@ -64,7 +64,15 @@ class Node(object):
         'maxLoad' : ('load%(timeframe)s', '[value'),
         'minCpu' : ('cpu%(timeframe)s', ']value'),
         'maxCpu' : ('cpu%(timeframe)s', '[value'),
-    }    
+    }
+    
+    RATE_FACTORS = (
+        # (<tag name>, <weight>, <default>)
+        ('bw%(timeframe)s', -0.001, 1024.0),
+        ('cpu%(timeframe)s', 0.1, 40.0),
+        ('load%(timeframe)s', -0.2, 3.0),
+        ('reliability%(timeframe)s', 1, 100.0),
+    )
     
     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
@@ -80,10 +88,11 @@ class Node(object):
     minLoad = _castproperty(float, '_minLoad')
     maxLoad = _castproperty(float, '_maxLoad')
     
-    def __init__(self, api=None):
+    def __init__(self, api=None, sliceapi=None):
         if not api:
             api = plcapi.PLCAPI()
         self._api = api
+        self._sliceapi = sliceapi or api
         
         # Attributes
         self.hostname = None
@@ -104,7 +113,7 @@ class Node(object):
         self.maxLoad = None
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
-        self.timeframe = 'm'
+        self._timeframe = 'w'
         
         # Applications and routes add requirements to connected nodes
         self.required_packages = set()
@@ -113,12 +122,16 @@ class Node(object):
         self.rpmFusion = False
         self.env = collections.defaultdict(list)
         
+        # Some special applications - initialized when connected
+        self.multicast_forwarder = None
+        
         # Testbed-derived attributes
         self.slicename = None
         self.ident_path = None
         self.server_key = None
         self.home_path = None
-        self.enable_cleanup = False
+        self.enable_proc_cleanup = False
+        self.enable_home_cleanup = False
         
         # Those are filled when an actual node is allocated
         self._node_id = None
@@ -127,6 +140,27 @@ class Node(object):
 
         # Logging
         self._logger = logging.getLogger('nepi.testbeds.planetlab')
+
+    def set_timeframe(self, timeframe):
+        if timeframe == "latest":
+            self._timeframe = ""
+        elif timeframe == "month":
+            self._timeframe = "m"
+        elif timeframe == "year":
+            self._timeframe = "y"
+        else:
+            self._timeframe = "w"
+
+    def get_timeframe(self):
+        if self._timeframe == "":
+            return "latest"
+        if self._timeframe == "m":
+            return "month"
+        if self._timeframe == "y":
+            return "year"
+        return "week"
+
+    timeframe = property(get_timeframe, set_timeframe)
     
     def _nepi_testbed_environment_setup_get(self):
         command = cStringIO.StringIO()
@@ -141,8 +175,10 @@ class Node(object):
                 for envval in envvals:
                     command.write(' ; export %s=%s' % (envkey, envval))
         return command.getvalue()
+
     def _nepi_testbed_environment_setup_set(self, value):
         pass
+
     _nepi_testbed_environment_setup = property(
         _nepi_testbed_environment_setup_get,
         _nepi_testbed_environment_setup_set)
@@ -166,7 +202,7 @@ class Node(object):
         self._logger.info("Finding candidates for %s", self.make_filter_description())
         
         fields = ('node_id',)
-        replacements = {'timeframe':self.timeframe}
+        replacements = {'timeframe':self._timeframe}
         
         # get initial candidates (no tag filters)
         basefilters = self.build_filters({}, self.BASEFILTERS)
@@ -186,8 +222,8 @@ class Node(object):
             extra['peer'] = self.site
             
         candidates = set(map(operator.itemgetter('node_id'), 
-            self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
-        
+            self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
+
         # filter by tag, one tag at a time
         applicable = self.applicable_filters
         for tagfilter in self.TAGFILTERS.iteritems():
@@ -197,22 +233,22 @@ class Node(object):
             if attr in applicable:
                 tagfilter = rootfilters.copy()
                 tagfilter['tagname'] = tagname % replacements
-                tagfilter[expr % replacements] = getattr(self,attr)
+                tagfilter[expr % replacements] = str(getattr(self,attr))
                 tagfilter['node_id'] = list(candidates)
-                
+              
                 candidates &= set(map(operator.itemgetter('node_id'),
-                    self._api.GetNodeTags(filters=tagfilter, fields=fields)))
-        
+                    self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
+
         # filter by vsys tags - special case since it doesn't follow
         # the usual semantics
         if self.required_vsys:
             newcandidates = collections.defaultdict(set)
             
-            vsys_tags = self._api.GetNodeTags(
+            vsys_tags = self._sliceapi.GetNodeTags(
                 tagname='vsys', 
                 node_id = list(candidates), 
                 fields = ['node_id','value'])
-            
+
             vsys_tags = map(
                 operator.itemgetter(['node_id','value']),
                 vsys_tags)
@@ -234,7 +270,7 @@ class Node(object):
             filters = basefilters.copy()
             filters['node_id'] = list(candidates)
             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
-                self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
+                self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
             
             # filter candidates by interface count
             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
@@ -248,14 +284,16 @@ class Node(object):
                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
             
             candidates = set(filter(predicate, candidates))
-        
+       
         # make sure hostnames are resolvable
+        hostnames = dict() 
         if candidates:
             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
-            
+           
             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
-                self._api.GetNodes(list(candidates), ['node_id','hostname'])
+                self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
             ))
+
             def resolvable(node_id):
                 try:
                     addr = socket.gethostbyname(hostnames[node_id])
@@ -266,8 +304,14 @@ class Node(object):
                 maxthreads = 16))
 
             self._logger.info("  Found %s reachable candidates.", len(candidates))
-            
-        return candidates
+
+            for h in hostnames.keys():
+                if h not in candidates:
+                    del hostnames[h]
+
+            hostnames = dict((v,k) for k, v in hostnames.iteritems())
+
+        return hostnames
     
     def make_filter_description(self):
         """
@@ -311,22 +355,59 @@ class Node(object):
     
     def unassign_node(self):
         self._node_id = None
-        self.__dict__.update(self.__orig_attrs)
+        self.hostip = None
+        
+        try:
+            orig_attrs = self.__orig_attrs
+        except AttributeError:
+            return
+            
+        for key, value in orig_attrs.iteritems():
+            setattr(self, key, value)
+        del self.__orig_attrs
     
+    def rate_nodes(self, nodes):
+        rates = collections.defaultdict(int)
+        tags = collections.defaultdict(dict)
+        replacements = {'timeframe':self._timeframe}
+        tagnames = [ tagname % replacements 
+                     for tagname, weight, default in self.RATE_FACTORS ]
+       
+        taginfo = self._sliceapi.GetNodeTags(
+            node_id=list(nodes), 
+            tagname=tagnames,
+            fields=('node_id','tagname','value'))
+
+        unpack = operator.itemgetter('node_id','tagname','value')
+        for value in taginfo:
+            node, tagname, value = unpack(value)
+            if value and value.lower() != 'n/a':
+                tags[tagname][node] = float(value)
+        
+        for tagname, weight, default in self.RATE_FACTORS:
+            taginfo = tags[tagname % replacements].get
+            for node in nodes:
+                rates[node] += weight * taginfo(node,default)
+        
+        return map(rates.__getitem__, nodes)
+            
     def fetch_node_info(self):
         orig_attrs = {}
         
-        info = self._api.GetNodes(self._node_id)[0]
+        info, tags = self._sliceapi.GetNodeInfo(self._node_id)
+        info = info[0]
+        
         tags = dict( (t['tagname'],t['value'])
-                     for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
+                     for t in tags )
 
         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
-        self.timeframe = 'm'
+        if not self._timeframe: self._timeframe = 'w'
         
-        replacements = {'timeframe':self.timeframe}
+        replacements = {'timeframe':self._timeframe}
+
         for attr, tag in self.BASEFILTERS.iteritems():
             if tag in info:
                 value = info[tag]
@@ -339,11 +420,13 @@ class Node(object):
                 value = tags[tag]
                 if hasattr(self, attr):
                     orig_attrs[attr] = getattr(self, attr)
+                if not value or value.lower() == 'n/a':
+                    value = None
                 setattr(self, attr, value)
         
         if 'peer_id' in info:
             orig_attrs['site'] = self.site
-            self.site = self._api.peer_map[info['peer_id']]
+            self.site = self._sliceapi.peer_map[info['peer_id']]
         
         if 'interface_ids' in info:
             self.min_num_external_ifaces = \
@@ -353,7 +436,12 @@ class Node(object):
             orig_attrs['server_key'] = self.server_key
             self.server_key = info['ssh_rsa_key']
         
-        self.__orig_attrs = orig_attrs
+        self.hostip = socket.gethostbyname(self.hostname)
+        
+        try:
+            self.__orig_attrs
+        except AttributeError:
+            self.__orig_attrs = orig_attrs
 
     def validate(self):
         if self.home_path is None:
@@ -389,7 +477,7 @@ class Node(object):
                     RPM_FUSION_URL = self.RPM_FUSION_URL
                     
                 rpmFusion = (
-                  '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
+                  'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
                 ) % {
                     'RPM_FUSION_URL' : RPM_FUSION_URL
                 }
@@ -404,10 +492,13 @@ class Node(object):
                     user = self.slicename,
                     agent = None,
                     ident_key = self.ident_path,
-                    server_key = self.server_key
+                    server_key = self.server_key,
+                    timeout = 600,
                     )
                 
                 if proc.wait():
+                    if self.check_bad_host(out,err):
+                        self.blacklist()
                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
             
             # Launch p2p yum dependency installer
@@ -426,19 +517,22 @@ class Node(object):
                 # PlanetLab has a 15' delay on configuration propagation
                 # If we're above that delay, the unresponsiveness is not due
                 # to this delay.
-                raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+                if not self.is_alive(verbose=True):
+                    raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
         
         # Ensure the node is clean (no apps running that could interfere with operations)
-        if self.enable_cleanup:
-            self.do_cleanup()
-    
+        if self.enable_proc_cleanup:
+            self.do_proc_cleanup()
+        if self.enable_home_cleanup:
+            self.do_home_cleanup()
+   
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
         # Wait for the p2p installer
         if self._yum_dependencies and not self._installed:
             self._yum_dependencies.async_setup_wait()
             self._installed = True
         
-    def is_alive(self):
+    def is_alive(self, verbose = False):
         # Make sure all the paths are created where 
         # they have to be created for deployment
         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
@@ -448,26 +542,39 @@ class Node(object):
             user = self.slicename,
             agent = None,
             ident_key = self.ident_path,
-            server_key = self.server_key
+            server_key = self.server_key,
+            timeout = 60,
+            err_on_timeout = False,
+            persistent = False
             )
         
         if proc.wait():
+            if verbose:
+                self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
             return False
         elif not err and out.strip() == 'ALIVE':
             return True
         else:
+            if verbose:
+                self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
             return False
     
     def destroy(self):
-        if self.enable_cleanup:
-            self.do_cleanup()
+        if self.enable_proc_cleanup:
+            self.do_proc_cleanup()
     
-    def do_cleanup(self):
+    def blacklist(self):
+        if self._node_id:
+            self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
+            import util
+            util.appendBlacklist(self.hostname)
+    
+    def do_proc_cleanup(self):
         if self.testbed().recovering:
             # WOW - not now
             return
             
-        self._logger.info("Cleaning up %s", self.hostname)
+        self._logger.info("Cleaning up processes on %s", self.hostname)
         
         cmds = [
             "sudo -S killall python tcpdump || /bin/true ; "
@@ -492,9 +599,40 @@ class Node(object):
                 ident_key = self.ident_path,
                 server_key = self.server_key,
                 tty = True, # so that ps -N -T works as advertised...
+                timeout = 60,
+                retry = 3
                 )
             proc.wait()
-    
+     
+    def do_home_cleanup(self):
+        if self.testbed().recovering:
+            # WOW - not now
+            return
+            
+        self._logger.info("Cleaning up home on %s", self.hostname)
+        
+        cmds = [
+            "find . -maxdepth 1 ! -name '.bash*' ! -name '.' -execdir rm -rf {} + "
+        ]
+
+        for cmd in cmds:
+            (out,err),proc = server.popen_ssh_command(
+                # Some apps need two kills
+                cmd % {
+                    'slicename' : self.slicename ,
+                },
+                host = self.hostname,
+                port = None,
+                user = self.slicename,
+                agent = None,
+                ident_key = self.ident_path,
+                server_key = self.server_key,
+                tty = True, # so that ps -N -T works as advertised...
+                timeout = 60,
+                retry = 3
+                )
+            proc.wait()
+   
     def prepare_dependencies(self):
         # Configure p2p yum dependency installer
         if self.required_packages and not self._installed:
@@ -530,10 +668,10 @@ class Node(object):
         if len(routes) > MAX_VROUTE_ROUTES:
             return 'sliceip'
         
-        vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
+        vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
         for route in routes:
-            dest, prefix, nexthop, metric = route
-            dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
+            dest, prefix, nexthop, metric, device = route
+            dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
             nexthop = ipaddr.IPAddress(nexthop)
             if dest not in vsys_vnet or nexthop not in vsys_vnet:
                 return 'sliceip'
@@ -541,7 +679,7 @@ class Node(object):
         return 'vroute'
     
     def format_route(self, route, dev, method, action):
-        dest, prefix, nexthop, metric = route
+        dest, prefix, nexthop, metric, device = route
         if method == 'vroute':
             return (
                 "%s %s%s gw %s %s" % (
@@ -670,7 +808,8 @@ class Node(object):
             agent = None,
             ident_key = self.ident_path,
             server_key = self.server_key,
-            stdin = '\n'.join(rules)
+            stdin = '\n'.join(rules),
+            timeout = 300
             )
         
         if proc.wait() or err:
@@ -678,3 +817,10 @@ class Node(object):
         elif out or err:
             logger.debug("%s said: %s%s", method, out, err)
 
+    def check_bad_host(self, out, err):
+        badre = re.compile(r'(?:'
+                           r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
+                           r'|Error: disk I/O error'
+                           r')', 
+                           re.I)
+        return badre.search(out) or badre.search(err)