Ticket #73: part 1, enable spanning tree deployment of package dependencies
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 26 Jul 2011 15:47:08 +0000 (17:47 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 26 Jul 2011 15:47:08 +0000 (17:47 +0200)
src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/node.py

index 5dcd263..80c9396 100644 (file)
@@ -905,4 +905,60 @@ class NS3Dependency(Dependency):
                 
         return self._tarball
 
+class YumDependency(Dependency):
+    """
+    This dependency is an internal helper class used to
+    efficiently distribute yum-downloaded rpms.
+    
+    It temporarily sets the yum cache as persistent in the
+    build master, and installs all the required packages.
+    
+    The rpm packages left in the yum cache are gathered and
+    distributed by the underlying Dependency in an efficient
+    manner. Build slaves will then install those rpms back in
+    the cache before issuing the install command.
+    
+    When packages have been installed already, nothing but an
+    empty tar is distributed.
+    """
+    
+    # Class attribute holding a *weak* reference to the shared NEPI tar file
+    # so that they may share it. Don't operate on the file itself, it would
+    # be a mess, just use its path.
+    _shared_nepi_tar = None
+    
+    def _build_get(self):
+        # canonical representation of dependencies
+        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+        
+        # download rpms and pack into a tar archive
+        return (
+            "sudo -S sed -i -r 's/keepcache *= *0/keepcache=1/' /etc/yum.conf && "
+            " ( ( "
+                "sudo -S yum -y install %s ; "
+                "tar -C /var/cache/yum -rf ${BUILD}/packages.tar $(find /var/cache/yum -iname '*.rpm')"
+            " ) || /bin/true ) && "
+            "sudo -S sed -i -r 's/keepcache *= *1/keepcache=0/' /etc/yum.conf && "
+            "sudo -S yum -y clean packages "
+        ) % ( depends, )
+    def _build_set(self, value):
+        # ignore
+        return
+    build = property(_build_get, _build_set)
+    
+    def _install_get(self):
+        # canonical representation of dependencies
+        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+        
+        # unpack cached rpms into yum cache, install, and cleanup
+        return (
+            "tar -k --keep-newer-files -C /var/cache/yum xzf packages.tar && "
+            "yum -y install %s && "
+            "yum -y clean packages "
+        ) % ( depends, )
+    def _install_set(self, value):
+        # ignore
+        return
+    isntall = property(_install_get, _install_set)
+        
 
index 7b8a19f..c63a954 100644 (file)
@@ -267,6 +267,9 @@ class TestbedController(testbed_impl.TestbedController):
                     )
                     
                     self._logger.info("READY Node %s at %s", guid, node.hostname)
+                    
+                    # Prepare dependency installer now
+                    node.prepare_dependencies()
         except self._node.UnresponsiveNodeError:
             # Uh... 
             self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
@@ -310,6 +313,10 @@ class TestbedController(testbed_impl.TestbedController):
         for element in self._elements.itervalues():
             if isinstance(element, self._app.Dependency):
                 depgroups[dephash(element)].append(element)
+            elif isinstance(element, self._node.Node):
+                deps = element._yum_dependencies
+                if deps:
+                    depgroups[dephash(deps)].append(deps)
         
         # Set up spanning deployment for those applications that
         # have been deployed in several nodes.
index 7efdb97..734d998 100644 (file)
@@ -17,6 +17,8 @@ import logging
 from nepi.util import server
 from nepi.util import parallel
 
+import application
+
 class UnresponsiveNodeError(RuntimeError):
     pass
 
@@ -78,6 +80,7 @@ class Node(object):
         
         # Those are filled when an actual node is allocated
         self._node_id = None
+        self._yum_dependencies = None
 
         # Logging
         self._logger = logging.getLogger('nepi.testbeds.planetlab')
@@ -315,10 +318,6 @@ class Node(object):
 
     def install_dependencies(self):
         if self.required_packages:
-            # TODO: make dependant on the experiment somehow...
-            pidfile = self.DEPENDS_PIDFILE
-            logfile = self.DEPENDS_LOGFILE
-            
             # If we need rpmfusion, we must install the repo definition and the gpg keys
             if self.rpmFusion:
                 if self.operatingSystem == 'f12':
@@ -336,31 +335,25 @@ class Node(object):
             else:
                 rpmFusion = ''
             
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
-            (out,err),proc = rspawn.remote_spawn(
-                "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
-                    'packages' : ' '.join(self.required_packages),
-                    'rpmfusion' : rpmFusion,
-                },
-                pidfile = pidfile,
-                stdout = logfile,
-                stderr = rspawn.STDOUT,
+            if rpmFusion:
+                (out,err),proc = server.popen_ssh_command(
+                    rpmFusion,
+                    host = self.hostname,
+                    port = None,
+                    user = self.slicename,
+                    agent = None,
+                    ident_key = self.ident_path,
+                    server_key = self.server_key
+                    )
                 
-                host = self.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.server_key,
-                sudo = True
-                )
+                if proc.wait():
+                    raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
             
-            if proc.wait():
-                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+            # Launch p2p yum dependency installer
+            self._yum_dependencies.async_setup()
     
     def wait_provisioning(self, timeout = 20*60):
-        # recently provisioned nodes may not be up yet
+        # Wait for the p2p installer
         sleeptime = 1.0
         totaltime = 0.0
         while not self.is_alive():
@@ -375,61 +368,9 @@ class Node(object):
                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
     
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
-        if self.required_packages:
-            pidfile = self.DEPENDS_PIDFILE
-            
-            # get PID
-            pid = ppid = None
-            for probenum in xrange(pidmax):
-                pidtuple = rspawn.remote_check_pid(
-                    pidfile = pidfile,
-                    host = self.hostname,
-                    port = None,
-                    user = self.slicename,
-                    agent = None,
-                    ident_key = self.ident_path,
-                    server_key = self.server_key
-                    )
-                if pidtuple:
-                    pid, ppid = pidtuple
-                    break
-                else:
-                    time.sleep(pidprobe)
-            else:
-                raise RuntimeError, "Failed to obtain pidfile for dependency installer"
-        
-            # wait for it to finish
-            while rspawn.RUNNING is rspawn.remote_status(
-                    pid, ppid,
-                    host = self.hostname,
-                    port = None,
-                    user = self.slicename,
-                    agent = None,
-                    ident_key = self.ident_path,
-                    server_key = self.server_key
-                    ):
-                time.sleep(probe)
-                probe = min(probemax, 1.5*probe)
-            
-            # check results
-            logfile = self.DEPENDS_LOGFILE
-            
-            (out,err),proc = server.popen_ssh_command(
-                "cat %s" % (server.shell_escape(logfile),),
-                host = self.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.server_key
-                )
-            
-            if proc.wait():
-                raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
-            
-            success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
-            if not success:
-                raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
+        # Wait for the p2p installer
+        if self._yum_dependencies:
+            self._yum_dependencies.async_setup_wait()
         
     def is_alive(self):
         # Make sure all the paths are created where 
@@ -451,6 +392,13 @@ class Node(object):
         else:
             return False
     
+    def prepare_dependencies(self):
+        # Configure p2p yum dependency installer
+        if self.required_packages:
+            self._yum_dependencies = application.YumDependency(self._api)
+            self._yum_dependencies.node = self
+            self._yum_dependencies.home_path = "nepi-yumdep"
+            self._yum_dependencies.depends = ' '.join(self.required_packages)
 
     def configure_routes(self, routes, devs):
         """