+ Implemented option to clean up directories on PlanetLab slivers.
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
index 292b92e..467d9fe 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
 from constants import TESTBED_ID, TESTBED_VERSION
@@ -9,6 +8,7 @@ from nepi.util.graphtools import mst
 from nepi.util import ipaddr2
 from nepi.util import environ
 from nepi.util.parallel import ParallelRun
+import threading
 import sys
 import os
 import os.path
@@ -26,6 +26,7 @@ import shutil
 import logging
 import metadata
 import weakref
+import util as plutil
 
 class TempKeyError(Exception):
     pass
@@ -37,16 +38,22 @@ class TestbedController(testbed_impl.TestbedController):
         self.slicename = None
         self._traces = dict()
 
-        import node, interfaces, application
+        import node, interfaces, application, multicast
         self._node = node
         self._interfaces = interfaces
         self._app = application
+        self._multicast = multicast
         
         self._blacklist = set()
         self._just_provisioned = set()
         
         self._load_blacklist()
-        
+
+        self._slice_id = None
+        self._plcapi = None
+        self._sliceapi = None
+        self._vsys_vnet = None
+
         self._logger = logging.getLogger('nepi.testbeds.planetlab')
         
         self.recovering = False
@@ -56,47 +63,39 @@ class TestbedController(testbed_impl.TestbedController):
         return self._home_directory
 
     @property
-    def plapi(self):
-        if not hasattr(self, '_plapi'):
+    def plcapi(self):
+        if not self._plcapi:
             import plcapi
-
-            if self.authUser:
-                self._plapi = plcapi.PLCAPI(
-                    username = self.authUser,
-                    password = self.authString,
-                    hostname = self.plcHost,
-                    urlpattern = self.plcUrl
+            self._plcapi = plcapi.plcapi(
+                    self.authUser,
+                    self.authString,
+                    self.plcHost,
+                    self.plcUrl
                     )
+        return self._plcapi
+
+    @property
+    def sliceapi(self):
+        if not self._sliceapi:
+            if not self.sfa:
+                self._sliceapi = self.plcapi
             else:
-                # anonymous access - may not be enough for much
-                self._plapi = plcapi.PLCAPI()
-        return self._plapi
+                from nepi.util import sfiapi
+                self._sliceapi = sfiapi.sfiapi(self.slice_id)
+        return self._sliceapi
 
     @property
     def slice_id(self):
-        if not hasattr(self, '_slice_id'):
-            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
-            if slices:
-                self._slice_id = slices[0]['slice_id']
-            else:
-                # If it wasn't found, don't remember this failure, keep trying
-                return None
+        if not self._slice_id:
+            self._slice_id = self.sliceapi.GetSliceId(self.slicename)
         return self._slice_id
     
     @property
     def vsys_vnet(self):
-        if not hasattr(self, '_vsys_vnet'):
-            slicetags = self.plapi.GetSliceTags(
-                name = self.slicename,
-                tagname = 'vsys_vnet',
-                fields=('value',))
-            if slicetags:
-                self._vsys_vnet = slicetags[0]['value']
-            else:
-                # If it wasn't found, don't remember this failure, keep trying
-                return None
+        if not self._vsys_vnet:
+            self._vsys_vnet = self.sliceapi.GetSliceVnetSysTag(self.slicename)
         return self._vsys_vnet
-    
+
     def _load_blacklist(self):
         blpath = environ.homepath('plblacklist')
         
@@ -108,9 +107,7 @@ class TestbedController(testbed_impl.TestbedController):
             
         try:
             self._blacklist = set(
-                map(int,
-                    map(str.strip, bl.readlines())
-                )
+                map(str.strip, bl.readlines())
             )
         finally:
             bl.close()
@@ -146,15 +143,26 @@ class TestbedController(testbed_impl.TestbedController):
             get_attribute_value("tapPortBase")
         self.p2pDeployment = self._attributes.\
             get_attribute_value("p2pDeployment")
-        self.dedicatedSlice = self._attributes.\
-            get_attribute_value("dedicatedSlice")
-        
+        self.cleanProc = self._attributes.\
+            get_attribute_value("cleanProc")
+        self.cleanHome = self._attributes.\
+            get_attribute_value("cleanHome")
+        self.sfa = self._attributes.\
+            get_attribute_value("sfa")
+        if self.sfa:
+            self._slice_id = self._attributes.\
+            get_attribute_value("sliceHrn")
+
         if not self.slicename:
             raise RuntimeError, "Slice not set"
         if not self.authUser:
             raise RuntimeError, "PlanetLab account username not set"
         if not self.authString:
             raise RuntimeError, "PlanetLab account passphrase not set"
+        if not self.sliceSSHKey:
+            raise RuntimeError, "PlanetLab account key not specified"
+        if not os.path.exists(self.sliceSSHKey):
+            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
         
         self._logger.setLevel(getattr(logging,self.logLevel))
         
@@ -203,78 +211,113 @@ class TestbedController(testbed_impl.TestbedController):
         reserved = set(self._blacklist)
         for guid, node in self._elements.iteritems():
             if isinstance(node, self._node.Node) and node._node_id is not None:
-                reserved.add(node._node_id)
+                reserved.add(node.hostname)
         
         # Initial algo:
         #   look for perfectly defined nodes
         #   (ie: those with only one candidate)
-        for guid, node in self._elements.iteritems():
-            if isinstance(node, self._node.Node) and node._node_id is None:
-                # Try existing nodes first
-                # If we have only one candidate, simply use it
-                candidates = node.find_candidates(
-                    filter_slice_id = self.slice_id)
-                candidates -= reserved
-                if len(candidates) == 1:
-                    node_id = iter(candidates).next()
-                    node.assign_node_id(node_id)
-                    reserved.add(node_id)
-                elif not candidates:
+        reserve_lock = threading.RLock()
+        def assignifunique(guid, node):
+            # Try existing nodes first
+            # If we have only one candidate, simply use it
+            candidates = node.find_candidates(
+                filter_slice_id = self.slice_id)
+            
+            node_id = None
+            candidate_hosts = set(candidates.keys() if candidates else [])
+            reserve_lock.acquire()
+            try:
+                candidate_hosts -= reserved
+                if len(candidate_hosts) == 1:
+                    hostname = iter(candidate_hosts).next()
+                    node_id = candidates[hostname]
+                    reserved.add(hostname)
+                elif not candidate_hosts:
                     # Try again including unassigned nodes
-                    candidates = node.find_candidates()
-                    candidates -= reserved
-                    if len(candidates) > 1:
-                        continue
-                    if len(candidates) == 1:
-                        node_id = iter(candidates).next()
-                        node.assign_node_id(node_id)
+                    reserve_lock.release()
+                    try:
+                        candidates = node.find_candidates()
+                    finally:
+                        reserve_lock.acquire()
+                    candidate_hosts = set(candidates.keys() if candidates else [])
+                    candidate_hosts -= reserved
+                    if len(candidate_hosts) > 1:
+                        return
+                    if len(candidate_hosts) == 1:
+                        hostname = iter(candidate_hosts).next()
+                        node_id = candidates[hostname]
                         to_provision.add(node_id)
-                        reserved.add(node_id)
+                        reserved.add(hostname)
                     elif not candidates:
-                        raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
+                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                             node.make_filter_description())
+            finally:
+                reserve_lock.release()
+           
+            if node_id is not None:
+                node.assign_node_id(node_id)
+        
+        runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
+        runner.start()
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is None:
+                runner.put(assignifunique, guid, node)
+        runner.sync()
         
         # Now do the backtracking search for a suitable solution
         # First with existing slice nodes
         reqs = []
         nodes = []
+        def genreqs(node, filter_slice_id=None):
+            # Collect the candidates for this node (restricted to
+            # filter_slice_id when given), dropping already-reserved entries
+            candidates = node.find_candidates(
+                filter_slice_id = filter_slice_id)
+            for r in reserved:
+                if candidates.has_key(r):
+                    del candidates[r]
+            reqs.append(candidates.values())
+            nodes.append(node)
         for guid, node in self._elements.iteritems():
             if isinstance(node, self._node.Node) and node._node_id is None:
-                # Try existing nodes first
-                # If we have only one candidate, simply use it
-                candidates = node.find_candidates(
-                    filter_slice_id = self.slice_id)
-                candidates -= reserved
-                reqs.append(candidates)
-                nodes.append(node)
-        
+                runner.put(genreqs, node, self.slice_id)
+        runner.sync()
+       
         if nodes and reqs:
             if recover:
                 raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
+
+            def pickbest(fullset, nreq, node=nodes[0]):
+                if len(fullset) > nreq:
+                    fullset = zip(node.rate_nodes(fullset),fullset)
+                    fullset.sort(reverse=True)
+                    del fullset[nreq:]
+                    return set(map(operator.itemgetter(1),fullset))
+                else:
+                    return fullset
             
             try:
-                solution = resourcealloc.alloc(reqs)
+                solution = resourcealloc.alloc(reqs, sample=pickbest)
             except resourcealloc.ResourceAllocationError:
                 # Failed, try again with all nodes
                 reqs = []
                 for node in nodes:
-                    candidates = node.find_candidates()
-                    candidates -= reserved
-                    reqs.append(candidates)
-                
-                solution = resourcealloc.alloc(reqs)
+                    runner.put(genreqs, node)
+                runner.sync()
+                solution = resourcealloc.alloc(reqs, sample=pickbest)
                 to_provision.update(solution)
             
             # Do assign nodes
             for node, node_id in zip(nodes, solution):
-                node.assign_node_id(node_id)
+                runner.put(node.assign_node_id, node_id)
+            runner.join()
 
     def do_provisioning(self):
         if self._to_provision:
             # Add new nodes to the slice
-            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
+            cur_nodes = self.sliceapi.GetSliceNodes(self.slice_id)
             new_nodes = list(set(cur_nodes) | self._to_provision)
-            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
+            self.sliceapi.AddSliceNodes(self.slice_id, nodes=new_nodes)
 
         # cleanup
         self._just_provisioned = self._to_provision
@@ -290,12 +333,12 @@ class TestbedController(testbed_impl.TestbedController):
             
                 # Show the magic
                 self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
-            
+        
         try:
-            for guid, node in self._elements.iteritems():
-                if isinstance(node, self._node.Node):
-                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
-                    
+            runner = ParallelRun(maxthreads=64, maxqueue=1)
+            abort = []
+            def waitforit(guid, node):
+                try:
                     node.wait_provisioning(
                         (20*60 if node._node_id in self._just_provisioned else 60)
                     )
@@ -304,9 +347,21 @@ class TestbedController(testbed_impl.TestbedController):
                     
                     # Prepare dependency installer now
                     node.prepare_dependencies()
+                except:
+                    abort.append(None)
+                    raise
+                
+            for guid, node in self._elements.iteritems():
+                if abort:
+                    break
+                if isinstance(node, self._node.Node):
+                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
+                    runner.put(waitforit, guid, node)
+            runner.join()
+                    
         except self._node.UnresponsiveNodeError:
             # Uh... 
-            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
+            self._logger.warn("UNRESPONSIVE Nodes")
             
             # Mark all dead nodes (which are unresponsive) on the blacklist
             # and re-raise
@@ -314,7 +369,7 @@ class TestbedController(testbed_impl.TestbedController):
                 if isinstance(node, self._node.Node):
                     if not node.is_alive():
                         self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
-                        self._blacklist.add(node._node_id)
+                        self._blacklist.add(node.hostname)
                         node.unassign_node()
             
             try:
@@ -340,6 +395,7 @@ class TestbedController(testbed_impl.TestbedController):
                 app.node.architecture,
                 app.node.operatingSystem,
                 app.node.pl_distro,
+                app.__class__,
             )
         
         depgroups = collections.defaultdict(list)
@@ -422,9 +478,7 @@ class TestbedController(testbed_impl.TestbedController):
             suffix = ".pub")
             
         # Create secure 256-bit temporary passphrase
-        passphrase = ''.join(map(chr,[rng.randint(0,255) 
-                                      for rng in (random.SystemRandom(),)
-                                      for i in xrange(32)] )).encode("hex")
+        passphrase = os.urandom(32).encode("hex")
                 
         # Copy keys
         oprk = open(self.sliceSSHKey, "rb")
@@ -479,6 +533,12 @@ class TestbedController(testbed_impl.TestbedController):
         # TODO: take into account the scheduled time for the task
         element = self._elements[guid]
         if element:
+            if name == "up":
+                if value == True:
+                    element.if_up()
+                else:
+                    element.if_down()
+
             try:
                 setattr(element, name, value)
             except:
@@ -544,10 +604,10 @@ class TestbedController(testbed_impl.TestbedController):
         self._traces.clear()
 
     def trace(self, guid, trace_id, attribute='value'):
-        app = self._elements[guid]
+        elem = self._elements[guid]
 
         if attribute == 'value':
-            path = app.sync_trace(self.home_directory, trace_id)
+            path = elem.sync_trace(self.home_directory, trace_id)
             if path:
                 fd = open(path, "r")
                 content = fd.read()
@@ -555,9 +615,9 @@ class TestbedController(testbed_impl.TestbedController):
             else:
                 content = None
         elif attribute == 'path':
-            content = app.remote_trace_path(trace_id)
+            content = elem.remote_trace_path(trace_id)
         elif attribute == 'name':
-            content = app.remote_trace_name(trace_id)
+            content = elem.remote_trace_name(trace_id)
         else:
             content = None
         return content
@@ -626,6 +686,7 @@ class TestbedController(testbed_impl.TestbedController):
                     Parallel(metadata.NS3DEPENDENCY),
                     Parallel(metadata.DEPENDENCY),
                     Parallel(metadata.APPLICATION),
+                    Parallel(metadata.CCNXDAEMON),
                 ])
 
             # Tunnels are not harmed by configuration after
@@ -654,8 +715,10 @@ class TestbedController(testbed_impl.TestbedController):
         finally:
             self.recovering = True
     
-    def _make_generic(self, parameters, kind):
-        app = kind(self.plapi)
+    def _make_generic(self, parameters, kind, **kwargs):
+        args = dict({'api': self.plcapi})
+        args.update(kwargs)
+        app = kind(**args)
         app.testbed = weakref.ref(self)
 
         # Note: there is 1-to-1 correspondence between attribute names
@@ -674,8 +737,10 @@ class TestbedController(testbed_impl.TestbedController):
         return app
 
     def _make_node(self, parameters):
-        node = self._make_generic(parameters, self._node.Node)
-        node.enable_cleanup = self.dedicatedSlice
+        args = dict({'sliceapi': self.sliceapi})
+        node = self._make_generic(parameters, self._node.Node, **args)
+        node.enable_proc_cleanup = self.cleanProc
+        node.enable_home_cleanup = self.cleanHome
         return node
 
     def _make_node_iface(self, parameters):
@@ -693,8 +758,10 @@ class TestbedController(testbed_impl.TestbedController):
     def _make_internet(self, parameters):
         return self._make_generic(parameters, self._interfaces.Internet)
 
-    def _make_application(self, parameters):
-        return self._make_generic(parameters, self._app.Application)
+    def _make_application(self, parameters, clazz = None):
+        if not clazz:
+            clazz = self._app.Application
+        return self._make_generic(parameters, clazz)
 
     def _make_dependency(self, parameters):
         return self._make_generic(parameters, self._app.Dependency)
@@ -708,3 +775,19 @@ class TestbedController(testbed_impl.TestbedController):
     def _make_tun_filter(self, parameters):
         return self._make_generic(parameters, self._interfaces.TunFilter)
 
+    def _make_class_queue_filter(self, parameters):
+        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
+
+    def _make_tos_queue_filter(self, parameters):
+        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
+
+    def _make_multicast_forwarder(self, parameters):
+        return self._make_generic(parameters, self._multicast.MulticastForwarder)
+
+    def _make_multicast_announcer(self, parameters):
+        return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
+
+    def _make_multicast_router(self, parameters):
+        return self._make_generic(parameters, self._multicast.MulticastRouter)
+
+