Ticket #29: Implement dependencies to support testbed-in-PL
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 29 Apr 2011 15:24:33 +0000 (17:24 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 29 Apr 2011 15:24:33 +0000 (17:24 +0200)
 - Generic dependencies
 - (python)path modifiers
 - NEPI-in-NEPI dependency class

src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/node.py
test/testbeds/planetlab/execute.py

index c09976f..cbde9d2 100644 (file)
@@ -14,7 +14,19 @@ import rspawn
 from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \
         STATUS_FINISHED
 
-class Application(object):
+class Dependency(object):
+    """
+    A Dependency is in every respect like an application.
+    
+    It depends on some packages, it may require building binaries, it must deploy
+    them...
+    
+    But it has no command. Dependencies aren't ever started, or stopped, and have
+    no status.
+    """
+
+    TRACES = ('buildlog')
+
     def __init__(self, api=None):
         if not api:
             api = plcapi.PLCAPI()
@@ -35,10 +47,10 @@ class Application(object):
         self.stderr = None
         self.buildlog = None
         
+        self.add_to_path = True
+        
         # Those are filled when the app is configured
         self.home_path = None
-        self.ident_path = None
-        self.slicename = None
         
         # Those are filled when an actual node is connected
         self.node = None
@@ -53,125 +65,26 @@ class Application(object):
         self._ppid = None
     
     def __str__(self):
-        return "%s<command:%s%s>" % (
+        return "%s<%s>" % (
             self.__class__.__name__,
-            "sudo " if self.sudo else "",
-            self.command,
+            ' '.join(list(self.depends or [])
+                   + list(self.sources or []))
         )
     
     def validate(self):
         if self.home_path is None:
             raise AssertionError, "Misconfigured application: missing home path"
-        if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
+        if self.node.ident_path is None or not os.access(self.node.ident_path, os.R_OK):
             raise AssertionError, "Misconfigured application: missing slice SSH key"
         if self.node is None:
             raise AssertionError, "Misconfigured application: unconnected node"
         if self.node.hostname is None:
             raise AssertionError, "Misconfigured application: misconfigured node"
-        if self.slicename is None:
+        if self.node.slicename is None:
             raise AssertionError, "Misconfigured application: unspecified slice"
-
-    def start(self):
-        # Create shell script with the command
-        # This way, complex commands and scripts can be ran seamlessly
-        # sync files
-        (out,err),proc = server.popen_scp(
-            cStringIO.StringIO(self.command),
-            '%s@%s:%s' % (self.slicename, self.node.hostname, 
-                os.path.join(self.home_path, "app.sh")),
-            port = None,
-            agent = None,
-            ident_key = self.ident_path,
-            server_key = self.node.server_key
-            )
-        
-        if proc.wait():
-            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-        
-        # Start process in a "daemonized" way, using nohup and heavy
-        # stdin/out redirection to avoid connection issues
-        (out,err),proc = rspawn.remote_spawn(
-            self._replace_paths("bash ./app.sh"),
-            
-            pidfile = './pid',
-            home = self.home_path,
-            stdin = 'stdin' if self.stdin is not None else '/dev/null',
-            stdout = 'stdout' if self.stdout else '/dev/null',
-            stderr = 'stderr' if self.stderr else '/dev/null',
-            sudo = self.sudo,
-            
-            host = self.node.hostname,
-            port = None,
-            user = self.slicename,
-            agent = None,
-            ident_key = self.ident_path,
-            server_key = self.node.server_key
-            )
-        
-        if proc.wait():
-            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-
-        self._started = True
-
-    def checkpid(self):            
-        # Get PID/PPID
-        # NOTE: wait a bit for the pidfile to be created
-        if self._started and not self._pid or not self._ppid:
-            pidtuple = rspawn.remote_check_pid(
-                os.path.join(self.home_path,'pid'),
-                host = self.node.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.node.server_key
-                )
-            
-            if pidtuple:
-                self._pid, self._ppid = pidtuple
-    
-    def status(self):
-        self.checkpid()
-        if not self._started:
-            return STATUS_NOT_STARTED
-        elif not self._pid or not self._ppid:
-            return STATUS_NOT_STARTED
-        else:
-            status = rspawn.remote_status(
-                self._pid, self._ppid,
-                host = self.node.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path
-                )
-            
-            if status is rspawn.NOT_STARTED:
-                return STATUS_NOT_STARTED
-            elif status is rspawn.RUNNING:
-                return STATUS_RUNNING
-            elif status is rspawn.FINISHED:
-                return STATUS_FINISHED
-            else:
-                # WTF?
-                return STATUS_NOT_STARTED
-    
-    def kill(self):
-        status = self.status()
-        if status == STATUS_RUNNING:
-            # kill by ppid+pid - SIGTERM first, then try SIGKILL
-            rspawn.remote_kill(
-                self._pid, self._ppid,
-                host = self.node.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.node.server_key
-                )
     
     def remote_trace_path(self, whichtrace):
-        if whichtrace in ('stdout','stderr'):
+        if whichtrace in self.TRACES:
             tracefile = os.path.join(self.home_path, whichtrace)
         else:
             tracefile = None
@@ -192,16 +105,16 @@ class Application(object):
             stdin = open("/dev/null","r"))
 
         if proc.wait():
-            raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
+            raise RuntimeError, "Failed to synchronize trace"
         
         # sync files
         (out,err),proc = server.popen_scp(
-            '%s@%s:%s' % (self.slicename, self.node.hostname, 
+            '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
                 tracefile),
             local_path,
             port = None,
             agent = None,
-            ident_key = self.ident_path,
+            ident_key = self.node.ident_path,
             server_key = self.node.server_key
             )
         
@@ -238,9 +151,9 @@ class Application(object):
             "mkdir -p %s" % (server.shell_escape(self.home_path),),
             host = self.node.hostname,
             port = None,
-            user = self.slicename,
+            user = self.node.slicename,
             agent = None,
-            ident_key = self.ident_path,
+            ident_key = self.node.ident_path,
             server_key = self.node.server_key
             )
         
@@ -252,11 +165,11 @@ class Application(object):
             # Write program input
             (out,err),proc = server.popen_scp(
                 cStringIO.StringIO(self.stdin),
-                '%s@%s:%s' % (self.slicename, self.node.hostname, 
+                '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
                     os.path.join(self.home_path, 'stdin') ),
                 port = None,
                 agent = None,
-                ident_key = self.ident_path,
+                ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
             
@@ -280,9 +193,9 @@ class Application(object):
             # Copy all sources
             (out,err),proc = server.popen_scp(
                 sources,
-                "%s@%s:%s" % (self.slicename, self.node.hostname, 
+                "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
                     os.path.join(self.home_path,'.'),),
-                ident_key = self.ident_path,
+                ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
         
@@ -297,9 +210,9 @@ class Application(object):
                 },
                 host = self.node.hostname,
                 port = None,
-                user = self.slicename,
+                user = self.node.slicename,
                 agent = None,
-                ident_key = self.ident_path,
+                ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
         
@@ -316,9 +229,9 @@ class Application(object):
                 },
                 host = self.node.hostname,
                 port = None,
-                user = self.slicename,
+                user = self.node.slicename,
                 agent = None,
-                ident_key = self.ident_path,
+                ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
         
@@ -333,9 +246,9 @@ class Application(object):
                 },
                 host = self.node.hostname,
                 port = None,
-                user = self.slicename,
+                user = self.node.slicename,
                 agent = None,
-                ident_key = self.ident_path,
+                ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
         
@@ -351,12 +264,221 @@ class Application(object):
                 },
                 host = self.node.hostname,
                 port = None,
-                user = self.slicename,
+                user = self.node.slicename,
                 agent = None,
-                ident_key = self.ident_path,
+                ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
         
             if proc.wait():
                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
 
+class Application(Dependency):
+    """
+    An application also has dependencies, but also a command to be ran and monitored.
+    
+    It adds the output of that command as traces.
+    """
+    
+    TRACES = ('stdout','stderr','buildlog')
+    
+    def __init__(self, api=None):
+        super(Application,self).__init__(api)
+        
+        # Attributes
+        self.command = None
+        self.sudo = False
+        
+        self.stdin = None
+        self.stdout = None
+        self.stderr = None
+        
+        # Those are filled when the app is started
+        #   Having both pid and ppid makes it harder
+        #   for pid rollover to induce tracking mistakes
+        self._started = False
+        self._pid = None
+        self._ppid = None
+
+        # Do not add to the python path of nodes
+        self.add_to_path = False
+    
+    def __str__(self):
+        return "%s<command:%s%s>" % (
+            self.__class__.__name__,
+            "sudo " if self.sudo else "",
+            self.command,
+        )
+    
+    def start(self):
+        # Create shell script with the command
+        # This way, complex commands and scripts can be ran seamlessly
+        # sync files
+        command = cStringIO.StringIO()
+        command.write('export PYTHONPATH=$PYTHONPATH:%s\n' % (
+            ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
+        ))
+        command.write('export PATH=$PATH:%s\n' % (
+            ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
+        ))
+        command.write(self.command)
+        command.seek(0)
+        
+        (out,err),proc = server.popen_scp(
+            command,
+            '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
+                os.path.join(self.home_path, "app.sh")),
+            port = None,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+        
+        # Start process in a "daemonized" way, using nohup and heavy
+        # stdin/out redirection to avoid connection issues
+        (out,err),proc = rspawn.remote_spawn(
+            self._replace_paths("bash ./app.sh"),
+            
+            pidfile = './pid',
+            home = self.home_path,
+            stdin = 'stdin' if self.stdin is not None else '/dev/null',
+            stdout = 'stdout' if self.stdout else '/dev/null',
+            stderr = 'stderr' if self.stderr else '/dev/null',
+            sudo = self.sudo,
+            
+            host = self.node.hostname,
+            port = None,
+            user = self.node.slicename,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+        self._started = True
+
+    def checkpid(self):            
+        # Get PID/PPID
+        # NOTE: wait a bit for the pidfile to be created
+        if self._started and not self._pid or not self._ppid:
+            pidtuple = rspawn.remote_check_pid(
+                os.path.join(self.home_path,'pid'),
+                host = self.node.hostname,
+                port = None,
+                user = self.node.slicename,
+                agent = None,
+                ident_key = self.node.ident_path,
+                server_key = self.node.server_key
+                )
+            
+            if pidtuple:
+                self._pid, self._ppid = pidtuple
+    
+    def status(self):
+        self.checkpid()
+        if not self._started:
+            return STATUS_NOT_STARTED
+        elif not self._pid or not self._ppid:
+            return STATUS_NOT_STARTED
+        else:
+            status = rspawn.remote_status(
+                self._pid, self._ppid,
+                host = self.node.hostname,
+                port = None,
+                user = self.node.slicename,
+                agent = None,
+                ident_key = self.node.ident_path
+                )
+            
+            if status is rspawn.NOT_STARTED:
+                return STATUS_NOT_STARTED
+            elif status is rspawn.RUNNING:
+                return STATUS_RUNNING
+            elif status is rspawn.FINISHED:
+                return STATUS_FINISHED
+            else:
+                # WTF?
+                return STATUS_NOT_STARTED
+    
+    def kill(self):
+        status = self.status()
+        if status == STATUS_RUNNING:
+            # kill by ppid+pid - SIGTERM first, then try SIGKILL
+            rspawn.remote_kill(
+                self._pid, self._ppid,
+                host = self.node.hostname,
+                port = None,
+                user = self.node.slicename,
+                agent = None,
+                ident_key = self.node.ident_path,
+                server_key = self.node.server_key
+                )
+    
+class NepiDependency(Dependency):
+    """
+    A Dependency is in every respect like an application.
+    
+    It depends on some packages, it may require building binaries, it must deploy
+    them...
+    
+    But it has no command. Dependencies aren't ever started, or stopped, and have
+    no status.
+    """
+    
+    # Class attribute holding a *weak* reference to the shared NEPI tar file
+    # so that they may share it. Don't operate on the file itself, it would
+    # be a mess, just use its path.
+    _shared_nepi_tar = None
+    
+    def __init__(self, api = None):
+        super(NepiDependency, self).__init__(api)
+        
+        self._tarball = None
+        
+        self.depends = 'python python-ipaddrn python-setuptools'
+        
+        # our sources are in our ad-hoc tarball
+        self.sources = self.tarball.name
+        
+        tarname = os.path.basename(self.tarball.name)
+        
+        # it's already built - just move the tarball into place
+        self.build = "mv ${SOURCES}/%s ." % (tarname,)
+        
+        # unpack it into sources, and we're done
+        self.install = "tar xzf ${BUILD}/%s -C .." % (tarname,)
+    
+    @property
+    def tarball(self):
+        if self._tarball is None:
+            shared_tar = self._shared_nepi_tar and self._shared_nepi_tar()
+            if shared_tar is not None:
+                self._tarball = shared_tar
+            else:
+                # Build an ad-hoc tarball
+                # Prebuilt
+                import nepi
+                import tempfile
+                
+                shared_tar = tempfile.NamedTemporaryFile(prefix='nepi-src-', suffix='.tar.gz')
+                
+                proc = subprocess.Popen(
+                    ["tar", "czf", shared_tar.name, 
+                        '-C', os.path.join(os.path.dirname(os.path.dirname(nepi.__file__)),'.'), 
+                        'nepi'],
+                    stdout = open("/dev/null","w"),
+                    stdin = open("/dev/null","r"))
+
+                if proc.wait():
+                    raise RuntimeError, "Failed to create nepi tarball"
+                
+                self._tarball = self._shared_nepi_tar = shared_tar
+                
+        return self._tarball
+
+
index 1562f26..edbaa9a 100644 (file)
@@ -12,7 +12,7 @@ class TestbedController(testbed_impl.TestbedController):
         self._home_directory = None
         self.slicename = None
         self._traces = dict()
-        
+
         import node, interfaces, application
         self._node = node
         self._interfaces = interfaces
@@ -21,12 +21,12 @@ class TestbedController(testbed_impl.TestbedController):
     @property
     def home_directory(self):
         return self._home_directory
-    
+
     @property
     def plapi(self):
         if not hasattr(self, '_plapi'):
             import plcapi
-            
+
             if self.authUser:
                 self._plapi = plcapi.PLCAPI(
                     username = self.authUser,
@@ -35,7 +35,7 @@ class TestbedController(testbed_impl.TestbedController):
                 # anonymous access - may not be enough for much
                 self._plapi = plcapi.PLCAPI()
         return self._plapi
-    
+
     @property
     def slice_id(self):
         if not hasattr(self, '_slice_id'):
@@ -63,16 +63,16 @@ class TestbedController(testbed_impl.TestbedController):
         # Perform resource discovery if we don't have
         # specific resources assigned yet
         self.do_resource_discovery()
-        
+
         # Create PlanetLab slivers
         self.do_provisioning()
-        
+
         # Configure elements per XML data
         super(TestbedController, self).do_preconfigure()
-    
+
     def do_resource_discovery(self):
         # Do what?
-        
+
         # Provisional algo:
         #   look for perfectly defined nodes
         #   (ie: those with only one candidate)
@@ -103,18 +103,18 @@ class TestbedController(testbed_impl.TestbedController):
             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
             new_nodes = list(set(cur_nodes) | self._to_provision)
             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
-    
+
         # cleanup
         del self._to_provision
-    
+
 
     def set(self, time, guid, name, value):
         super(TestbedController, self).set(time, guid, name, value)
-        # TODO: take on account schedule time for the task 
+        # TODO: take on account schedule time for the task
         element = self._elements[guid]
         if element:
             setattr(element, name, value)
-            
+
             if hasattr(element, 'refresh'):
                 # invoke attribute refresh hook
                 element.refresh()
@@ -142,7 +142,7 @@ class TestbedController(testbed_impl.TestbedController):
 
     def get_address(self, guid, index, attribute='Address'):
         index = int(index)
-        
+
         # try the real stuff
         iface = self._elements.get(guid)
         if iface and index == 0:
@@ -152,7 +152,7 @@ class TestbedController(testbed_impl.TestbedController):
                 return iface.netprefix
             elif attribute == 'Broadcast':
                 return iface.broadcast
-        
+
         # if all else fails, query box
         try:
             return self.box_get_address(guid, index, attribute)
@@ -173,7 +173,7 @@ class TestbedController(testbed_impl.TestbedController):
 
     def trace(self, guid, trace_id, attribute='value'):
         app = self._elements[guid]
-        
+
         if attribute == 'value':
             path = app.sync_trace(self.home_directory, trace_id)
             if path:
@@ -187,68 +187,50 @@ class TestbedController(testbed_impl.TestbedController):
         else:
             content = None
         return content
-        
+
     def follow_trace(self, trace_id, trace):
         self._traces[trace_id] = trace
+    
+    def _make_generic(self, parameters, kind):
+        app = kind(self.plapi)
 
-    def _make_node(self, parameters):
-        node = self._node.Node(self.plapi)
-        
         # Note: there is 1-to-1 correspondence between attribute names
         #   If that changes, this has to change as well
         for attr,val in parameters.iteritems():
-            setattr(node, attr, val)
-        
-        # If emulation is enabled, we automatically need 
+            setattr(app, attr, val)
+
+        return app
+
+    def _make_node(self, parameters):
+        node = self._make_generic(parameters, self._node.Node)
+
+        # If emulation is enabled, we automatically need
         # some vsys interfaces and packages
         if node.emulation:
             node.required_vsys.add('ipfw-be')
             node.required_packages.add('ipfwslice')
-        
+
         return node
-    
+
     def _make_node_iface(self, parameters):
-        iface = self._interfaces.NodeIface(self.plapi)
-        
-        # Note: there is 1-to-1 correspondence between attribute names
-        #   If that changes, this has to change as well
-        for attr,val in parameters.iteritems():
-            setattr(iface, attr, val)
-        
-        return iface
-    
+        return self._make_generic(parameters, self._interfaces.NodeIface)
+
     def _make_tun_iface(self, parameters):
-        iface = self._interfaces.TunIface(self.plapi)
-        
-        # Note: there is 1-to-1 correspondence between attribute names
-        #   If that changes, this has to change as well
-        for attr,val in parameters.iteritems():
-            setattr(iface, attr, val)
-        
-        return iface
-    
+        return self._make_generic(parameters, self._interfaces.TunIface)
+
     def _make_netpipe(self, parameters):
-        iface = self._interfaces.NetPipe(self.plapi)
-        
-        # Note: there is 1-to-1 correspondence between attribute names
-        #   If that changes, this has to change as well
-        for attr,val in parameters.iteritems():
-            setattr(iface, attr, val)
-        
-        return iface
-    
+        return self._make_generic(parameters, self._interfaces.NetPipe)
+
     def _make_internet(self, parameters):
-        return self._interfaces.Internet(self.plapi)
-    
+        return self._make_generic(parameters, self._interfaces.Internet)
+
     def _make_application(self, parameters):
-        app = self._app.Application(self.plapi)
-        
-        # Note: there is 1-to-1 correspondence between attribute names
-        #   If that changes, this has to change as well
-        for attr,val in parameters.iteritems():
-            setattr(app, attr, val)
-        
-        return app
-        
+        return self._make_generic(parameters, self._app.Application)
+
+    def _make_dependency(self, parameters):
+        return self._make_generic(parameters, self._app.Dependency)
+
+    def _make_nepi_dependency(self, parameters):
+        return self._make_generic(parameters, self._app.NepiDependency)
 
 
index b1d51c6..34022bc 100644 (file)
@@ -18,6 +18,8 @@ NODE = "Node"
 NODEIFACE = "NodeInterface"
 TUNIFACE = "TunInterface"
 APPLICATION = "Application"
+DEPENDENCY = "Dependency"
+NEPIDEPENDENCY = "NepiDependency"
 INTERNET = "Internet"
 NETPIPE = "NetPipe"
 
@@ -93,12 +95,16 @@ def connect_tun_iface_peer(proto, testbed_instance, iface, peer_iface):
     iface.peer_proto = \
     iface.tun_proto = proto
 
-def connect_app(testbed_instance, node, app):
+def connect_dep(testbed_instance, node, app):
     app.node = node
     
     if app.depends:
         node.required_packages.update(set(
             app.depends.split() ))
+    
+    if app.add_to_path:
+        if app.home_path and app.home_path not in node.pythonpath:
+            node.pythonpath.append(app.home_path)
 
 def connect_node_netpipe(testbed_instance, node, netpipe):
     if not node.emulation:
@@ -137,6 +143,28 @@ def create_tuniface(testbed_instance, guid):
 def create_application(testbed_instance, guid):
     parameters = testbed_instance._get_parameters(guid)
     element = testbed_instance._make_application(parameters)
+    
+    # Just inject configuration stuff
+    element.home_path = "nepi-app-%s" % (guid,)
+    
+    testbed_instance.elements[guid] = element
+
+def create_dependency(testbed_instance, guid):
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_dependency(parameters)
+    
+    # Just inject configuration stuff
+    element.home_path = "nepi-dep-%s" % (guid,)
+    
+    testbed_instance.elements[guid] = element
+
+def create_nepi_dependency(testbed_instance, guid):
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_nepi_dependency(parameters)
+    
+    # Just inject configuration stuff
+    element.home_path = "nepi-nepi-%s" % (guid,)
+    
     testbed_instance.elements[guid] = element
 
 def create_internet(testbed_instance, guid):
@@ -260,11 +288,6 @@ def configure_node(testbed_instance, guid):
 def configure_application(testbed_instance, guid):
     app = testbed_instance._elements[guid]
     
-    # Just inject configuration stuff
-    app.home_path = "nepi-app-%s" % (guid,)
-    app.ident_path = testbed_instance.sliceSSHKey
-    app.slicename = testbed_instance.slicename
-    
     # Do some validations
     app.validate()
     
@@ -274,6 +297,18 @@ def configure_application(testbed_instance, guid):
     # Install stuff
     app.setup()
 
+def configure_dependency(testbed_instance, guid):
+    dep = testbed_instance._elements[guid]
+    
+    # Do some validations
+    dep.validate()
+    
+    # Wait for dependencies
+    dep.node.wait_dependencies()
+    
+    # Install stuff
+    dep.setup()
+
 def configure_netpipe(testbed_instance, guid):
     netpipe = testbed_instance._elements[guid]
     
@@ -301,6 +336,13 @@ connector_types = dict({
                 "max": -1, 
                 "min": 0
             }),
+    "deps": dict({
+                "help": "Connector from node to application dependencies "
+                        "(packages and applications that need to be installed)", 
+                "name": "deps",
+                "max": -1, 
+                "min": 0
+            }),
     "inet": dict({
                 "help": "Connector from network interfaces to the internet", 
                 "name": "inet",
@@ -356,7 +398,19 @@ connections = [
     dict({
         "from": (TESTBED_ID, NODE, "apps"),
         "to":   (TESTBED_ID, APPLICATION, "node"),
-        "init_code": connect_app,
+        "init_code": connect_dep,
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, NODE, "deps"),
+        "to":   (TESTBED_ID, DEPENDENCY, "node"),
+        "init_code": connect_dep,
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, NODE, "deps"),
+        "to":   (TESTBED_ID, NEPIDEPENDENCY, "node"),
+        "init_code": connect_dep,
         "can_cross": False
     }),
     dict({
@@ -684,9 +738,9 @@ traces = dict({
               }),
     })
 
-create_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, NETPIPE, APPLICATION ]
+create_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, DEPENDENCY, APPLICATION ]
 
-configure_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, NETPIPE, APPLICATION ]
+configure_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, DEPENDENCY, APPLICATION ]
 
 factories_info = dict({
     NODE: dict({
@@ -707,7 +761,7 @@ factories_info = dict({
                 "min_bandwidth",
                 "max_bandwidth",
             ],
-            "connector_types": ["devs", "apps", "pipes"]
+            "connector_types": ["devs", "apps", "pipes", "deps"]
        }),
     NODEIFACE: dict({
             "has_addresses": True,
@@ -744,7 +798,26 @@ factories_info = dict({
                                "depends", "build-depends", "build", "install",
                                "sources" ],
             "connector_types": ["node"],
-            "traces": ["stdout", "stderr"]
+            "traces": ["stdout", "stderr", "buildlog"]
+        }),
+    DEPENDENCY: dict({
+            "help": "Requirement for package or application to be installed on some node",
+            "category": "applications",
+            "create_function": create_dependency,
+            "configure_function": configure_dependency,
+            "box_attributes": ["depends", "build-depends", "build", "install",
+                               "sources" ],
+            "connector_types": ["node"],
+            "traces": ["buildlog"]
+        }),
+    NEPIDEPENDENCY: dict({
+            "help": "Requirement for NEPI inside NEPI - required to run testbed instances inside a node",
+            "category": "applications",
+            "create_function": create_nepi_dependency,
+            "configure_function": configure_dependency,
+            "box_attributes": [ ],
+            "connector_types": ["node"],
+            "traces": ["buildlog"]
         }),
     INTERNET: dict({
             "help": "Internet routing",
index db3828b..2214379 100644 (file)
@@ -56,6 +56,7 @@ class Node(object):
         # Applications and routes add requirements to connected nodes
         self.required_packages = set()
         self.required_vsys = set()
+        self.pythonpath = []
         
         # Testbed-derived attributes
         self.slicename = None
index 3ee224e..bfca936 100755 (executable)
@@ -54,34 +54,39 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
         instance.defer_add_trace(7, "stderr")
         instance.defer_connect(7, "node", 2, "apps")
 
-        instance.do_setup()
-        instance.do_create()
-        instance.do_connect_init()
-        instance.do_connect_compl()
-        instance.do_preconfigure()
-        
-        # Manually replace netref
-        instance.set(TIME_NOW, 7, "command",
-            instance.get(TIME_NOW, 7, "command")
-                .replace("{#[GUID-5].addr[0].[Address]#}", 
-                    instance.get_address(5, 0, "Address") )
-        )
-
-        instance.do_configure()
-        
-        instance.start()
-        while instance.status(7) != STATUS_FINISHED:
-            time.sleep(0.5)
-        ping_result = instance.trace(7, "stdout") or ""
         comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
 
 --- .* ping statistics ---
 1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
 """
+
+        try:
+            instance.do_setup()
+            instance.do_create()
+            instance.do_connect_init()
+            instance.do_connect_compl()
+            instance.do_preconfigure()
+            
+            # Manually replace netref
+            instance.set(TIME_NOW, 7, "command",
+                instance.get(TIME_NOW, 7, "command")
+                    .replace("{#[GUID-5].addr[0].[Address]#}", 
+                        instance.get_address(5, 0, "Address") )
+            )
+
+            instance.do_configure()
+            
+            instance.start()
+            while instance.status(7) != STATUS_FINISHED:
+                time.sleep(0.5)
+            ping_result = instance.trace(7, "stdout") or ""
+            instance.stop()
+        finally:
+            instance.shutdown()
+
+        # asserts at the end, to make sure there's proper cleanup
         self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
             "Unexpected trace:\n" + ping_result)
-        instance.stop()
-        instance.shutdown()
         
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_depends(self):
@@ -100,22 +105,26 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
         instance.defer_add_trace(5, "stderr")
         instance.defer_connect(5, "node", 2, "apps")
 
-        instance.do_setup()
-        instance.do_create()
-        instance.do_connect_init()
-        instance.do_connect_compl()
-        instance.do_preconfigure()
-        instance.do_configure()
-        
-        instance.start()
-        while instance.status(5) != STATUS_FINISHED:
-            time.sleep(0.5)
-        ping_result = instance.trace(5, "stdout") or ""
-        comp_result = r".*GNU Fortran \(GCC\).*"
+        try:
+            instance.do_setup()
+            instance.do_create()
+            instance.do_connect_init()
+            instance.do_connect_compl()
+            instance.do_preconfigure()
+            instance.do_configure()
+            
+            instance.start()
+            while instance.status(5) != STATUS_FINISHED:
+                time.sleep(0.5)
+            ping_result = instance.trace(5, "stdout") or ""
+            comp_result = r".*GNU Fortran \(GCC\).*"
+            instance.stop()
+        finally:
+            instance.shutdown()
+
+        # asserts at the end, to make sure there's proper cleanup
         self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
             "Unexpected trace:\n" + ping_result)
-        instance.stop()
-        instance.shutdown()
         
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_build(self):
@@ -137,17 +146,6 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
         instance.defer_add_trace(10, "stderr")
         instance.defer_connect(10, "node", 2, "apps")
 
-        instance.do_setup()
-        instance.do_create()
-        instance.do_connect_init()
-        instance.do_connect_compl()
-        instance.do_preconfigure()
-        instance.do_configure()
-        
-        instance.start()
-        while instance.status(10) != STATUS_FINISHED:
-            time.sleep(0.5)
-        ping_result = instance.trace(10, "stdout") or ""
         comp_result = \
 r""".*ETH_P_ALL = 0x[0-9a-fA-F]{8}
 ETH_P_IP = 0x[0-9a-fA-F]{8}
@@ -162,10 +160,26 @@ IFNAMSIZ = 0x[0-9a-fA-F]{8}
 IFREQ_SZ = 0x[0-9a-fA-F]{8}
 FIONREAD = 0x[0-9a-fA-F]{8}.*
 """
+
+        try:
+            instance.do_setup()
+            instance.do_create()
+            instance.do_connect_init()
+            instance.do_connect_compl()
+            instance.do_preconfigure()
+            instance.do_configure()
+            
+            instance.start()
+            while instance.status(10) != STATUS_FINISHED:
+                time.sleep(0.5)
+            ping_result = instance.trace(10, "stdout") or ""
+            instance.stop()
+        finally:
+            instance.shutdown()
+
+        # asserts at the end, to make sure there's proper cleanup
         self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
             "Unexpected trace:\n" + ping_result)
-        instance.stop()
-        instance.shutdown()
         
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_simple_vsys(self):
@@ -195,21 +209,25 @@ echo 'OKIDOKI'
         instance.defer_add_trace(6, "stderr")
         instance.defer_connect(6, "node", 2, "apps")
 
-        instance.do_setup()
-        instance.do_create()
-        instance.do_connect_init()
-        instance.do_connect_compl()
-        instance.do_preconfigure()
-        instance.do_configure()
-        
-        instance.start()
-        while instance.status(6) != STATUS_FINISHED:
-            time.sleep(0.5)
-        test_result = (instance.trace(6, "stdout") or "").strip()
-        comp_result = "OKIDOKI"
+        try:
+            instance.do_setup()
+            instance.do_create()
+            instance.do_connect_init()
+            instance.do_connect_compl()
+            instance.do_preconfigure()
+            instance.do_configure()
+            
+            instance.start()
+            while instance.status(6) != STATUS_FINISHED:
+                time.sleep(0.5)
+            test_result = (instance.trace(6, "stdout") or "").strip()
+            comp_result = "OKIDOKI"
+            instance.stop()
+        finally:
+            instance.shutdown()
+
+        # asserts at the end, to make sure there's proper cleanup
         self.assertEqual(comp_result, test_result)
-        instance.stop()
-        instance.shutdown()
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_emulation(self):
@@ -239,22 +257,24 @@ echo 'OKIDOKI'
         instance.defer_add_trace(8, "stderr")
         instance.defer_connect(8, "node", 2, "apps")
 
-        instance.do_setup()
-        instance.do_create()
-        instance.do_connect_init()
-        instance.do_connect_compl()
-        instance.do_preconfigure()
-        instance.do_configure()
-        
-        instance.start()
-        while instance.status(8) != STATUS_FINISHED:
-            time.sleep(0.5)
-        test_result = (instance.trace(8, "stderr") or "").strip()
-        comp_result = r".*real\s*(?P<min>[0-9]+)m(?P<sec>[0-9]+[.][0-9]+)s.*"
-        netpipe_stats = instance.trace(7, "netpipeStats")
-        
-        instance.stop()
-        instance.shutdown()
+        try:
+            instance.do_setup()
+            instance.do_create()
+            instance.do_connect_init()
+            instance.do_connect_compl()
+            instance.do_preconfigure()
+            instance.do_configure()
+            
+            instance.start()
+            while instance.status(8) != STATUS_FINISHED:
+                time.sleep(0.5)
+            test_result = (instance.trace(8, "stderr") or "").strip()
+            comp_result = r".*real\s*(?P<min>[0-9]+)m(?P<sec>[0-9]+[.][0-9]+)s.*"
+            netpipe_stats = instance.trace(7, "netpipeStats")
+            
+            instance.stop()
+        finally:
+            instance.shutdown()
 
         # asserts at the end, to make sure there's proper cleanup
         match = re.match(comp_result, test_result, re.MULTILINE)
@@ -326,36 +346,75 @@ echo 'OKIDOKI'
         instance.defer_add_trace(9, "stderr")
         instance.defer_connect(9, "node", 2, "apps")
 
-        instance.do_setup()
-        instance.do_create()
-        instance.do_connect_init()
-        instance.do_connect_compl()
-        instance.do_preconfigure()
-        
-        # Manually replace netref
-        instance.set(TIME_NOW, 9, "command",
-            instance.get(TIME_NOW, 9, "command")
-                .replace("{#[GUID-8].addr[0].[Address]#}", 
-                    instance.get_address(8, 0, "Address") )
-        )
-        
-        instance.do_configure()
-        
-        instance.start()
-        while instance.status(9) != STATUS_FINISHED:
-            time.sleep(0.5)
-        ping_result = instance.trace(9, "stdout") or ""
         comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
 
 --- .* ping statistics ---
 1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
 """
-        instance.stop()
-        instance.shutdown()
+
+        try:
+            instance.do_setup()
+            instance.do_create()
+            instance.do_connect_init()
+            instance.do_connect_compl()
+            instance.do_preconfigure()
+            
+            # Manually replace netref
+            instance.set(TIME_NOW, 9, "command",
+                instance.get(TIME_NOW, 9, "command")
+                    .replace("{#[GUID-8].addr[0].[Address]#}", 
+                        instance.get_address(8, 0, "Address") )
+            )
+            
+            instance.do_configure()
+            
+            instance.start()
+            while instance.status(9) != STATUS_FINISHED:
+                time.sleep(0.5)
+            ping_result = instance.trace(9, "stdout") or ""
+            instance.stop()
+        finally:
+            instance.shutdown()
 
         # asserts at the end, to make sure there's proper cleanup
         self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
             "Unexpected trace:\n" + ping_result)
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_nepi_depends(self):
+        instance = self.make_instance()
+        
+        instance.defer_create(2, "Node")
+        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create(3, "NodeInterface")
+        instance.defer_connect(2, "devs", 3, "node")
+        instance.defer_create(4, "Internet")
+        instance.defer_connect(3, "inet", 4, "devs")
+        instance.defer_create(5, "NepiDependency")
+        instance.defer_connect(5, "node", 2, "deps")
+        instance.defer_create(12, "Application")
+        instance.defer_connect(12, "node", 2, "apps")
+        instance.defer_create_set(12, "command", "python -c 'import nepi'")
+        instance.defer_add_trace(12, "stderr")
+
+        try:
+            instance.do_setup()
+            instance.do_create()
+            instance.do_connect_init()
+            instance.do_connect_compl()
+            instance.do_preconfigure()
+            instance.do_configure()
+            
+            instance.start()
+            while instance.status(12) != STATUS_FINISHED:
+                time.sleep(0.5)
+            ping_result = (instance.trace(12, "stderr") or "").strip()
+            instance.stop()
+        finally:
+            instance.shutdown()
+        
+        # asserts at the end, to make sure there's proper cleanup
+        self.assertEqual(ping_result, "")
         
 
 if __name__ == '__main__':