Use system.multicall to accelerate batch API calls
[nepi.git] / src / nepi / testbeds / planetlab / node.py
index 15e66e1..ed8ddbd 100644 (file)
@@ -64,7 +64,15 @@ class Node(object):
         'maxLoad' : ('load%(timeframe)s', '[value'),
         'minCpu' : ('cpu%(timeframe)s', ']value'),
         'maxCpu' : ('cpu%(timeframe)s', '[value'),
-    }    
+    }
+    
+    RATE_FACTORS = (
+        # (<tag name>, <weight>, <default>)
+        ('bw%(timeframe)s', -0.001, 1024.0),
+        ('cpu%(timeframe)s', 0.1, 40.0),
+        ('load%(timeframe)s', -0.2, 3.0),
+        ('reliability%(timeframe)s', 1, 100.0),
+    )
     
     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
@@ -113,6 +121,9 @@ class Node(object):
         self.rpmFusion = False
         self.env = collections.defaultdict(list)
         
+        # Some special applications - initialized when connected
+        self.multicast_forwarder = None
+        
         # Testbed-derived attributes
         self.slicename = None
         self.ident_path = None
@@ -313,12 +324,42 @@ class Node(object):
         self._node_id = None
         self.__dict__.update(self.__orig_attrs)
     
+    def rate_nodes(self, nodes):
+        rates = collections.defaultdict(int)
+        tags = collections.defaultdict(dict)
+        replacements = {'timeframe':self.timeframe}
+        tagnames = [ tagname % replacements 
+                     for tagname, weight, default in self.RATE_FACTORS ]
+        
+        taginfo = self._api.GetNodeTags(
+            node_id=list(nodes), 
+            tagname=tagnames,
+            fields=('node_id','tagname','value'))
+        
+        unpack = operator.itemgetter('node_id','tagname','value')
+        for value in taginfo:
+            node, tagname, value = unpack(value)
+            if value and value.lower() != 'n/a':
+                tags[tagname][int(node)] = float(value)
+        
+        for tagname, weight, default in self.RATE_FACTORS:
+            taginfo = tags[tagname % replacements].get
+            for node in nodes:
+                rates[node] += weight * taginfo(node,default)
+        
+        return map(rates.__getitem__, nodes)
+            
     def fetch_node_info(self):
         orig_attrs = {}
         
-        info = self._api.GetNodes(self._node_id)[0]
+        self._api.StartMulticall()
+        info = self._api.GetNodes(self._node_id)
+        tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
+        info, tags = self._api.FinishMulticall()
+        info = info[0]
+        
         tags = dict( (t['tagname'],t['value'])
-                     for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
+                     for t in tags )
 
         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
@@ -339,6 +380,8 @@ class Node(object):
                 value = tags[tag]
                 if hasattr(self, attr):
                     orig_attrs[attr] = getattr(self, attr)
+                if not value or value.lower() == 'n/a':
+                    value = None
                 setattr(self, attr, value)
         
         if 'peer_id' in info:
@@ -404,7 +447,8 @@ class Node(object):
                     user = self.slicename,
                     agent = None,
                     ident_key = self.ident_path,
-                    server_key = self.server_key
+                    server_key = self.server_key,
+                    timeout = 600,
                     )
                 
                 if proc.wait():
@@ -448,7 +492,9 @@ class Node(object):
             user = self.slicename,
             agent = None,
             ident_key = self.ident_path,
-            server_key = self.server_key
+            server_key = self.server_key,
+            timeout = 60,
+            err_on_timeout = False
             )
         
         if proc.wait():
@@ -468,27 +514,34 @@ class Node(object):
             return
             
         self._logger.info("Cleaning up %s", self.hostname)
-
-        (out,err),proc = server.popen_ssh_command(
-            # Some apps need two kills
+        
+        cmds = [
             "sudo -S killall python tcpdump || /bin/true ; "
             "sudo -S killall python tcpdump || /bin/true ; "
-            "sudo -S kill $(ps -N T -o pid --no-heading | sort) || /bin/true ; "
-            "sudo -S killall -u %(slicename)s || /bin/true ; "
-            "sudo -S killall -u %(slicename)s || /bin/true ; "
-            "sudo -S killall -u root || /bin/true ; "
-            "sudo -S killall -u root || /bin/true " % {
-                'slicename' : self.slicename ,
-            },
-            host = self.hostname,
-            port = None,
-            user = self.slicename,
-            agent = None,
-            ident_key = self.ident_path,
-            server_key = self.server_key,
-            tty = True, # so that ps -N -T works as advertised...
-            )
-        proc.wait()
+            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
+            "sudo -S killall -u %(slicename)s || /bin/true ",
+            "sudo -S killall -u root || /bin/true ",
+            "sudo -S killall -u %(slicename)s || /bin/true ",
+            "sudo -S killall -u root || /bin/true ",
+        ]
+
+        for cmd in cmds:
+            (out,err),proc = server.popen_ssh_command(
+                # Some apps need two kills
+                cmd % {
+                    'slicename' : self.slicename ,
+                },
+                host = self.hostname,
+                port = None,
+                user = self.slicename,
+                agent = None,
+                ident_key = self.ident_path,
+                server_key = self.server_key,
+                tty = True, # so that ps -N -T works as advertised...
+                timeout = 60,
+                retry = 3
+                )
+            proc.wait()
     
     def prepare_dependencies(self):
         # Configure p2p yum dependency installer
@@ -665,7 +718,8 @@ class Node(object):
             agent = None,
             ident_key = self.ident_path,
             server_key = self.server_key,
-            stdin = '\n'.join(rules)
+            stdin = '\n'.join(rules),
+            timeout = 300
             )
         
         if proc.wait() or err: