Merge with tip
author     Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
           Tue, 26 Jul 2011 16:24:29 +0000 (18:24 +0200)
committer  Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
           Tue, 26 Jul 2011 16:24:29 +0000 (18:24 +0200)
src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/node.py
test/testbeds/planetlab/execute.py
test/testbeds/planetlab/integration.py
test/testbeds/planetlab/integration_cross.py
test/testbeds/planetlab/integration_multi.py
test/testbeds/planetlab/integration_ns3.py

index 5dcd263..0b149aa 100644 (file)
@@ -905,4 +905,61 @@ class NS3Dependency(Dependency):
                 
         return self._tarball
 
+class YumDependency(Dependency):
+    """
+    This dependency is an internal helper class used to
+    efficiently distribute yum-downloaded rpms.
+    
+    It temporarily sets the yum cache as persistent in the
+    build master, and installs all the required packages.
+    
+    The rpm packages left in the yum cache are gathered and
+    distributed by the underlying Dependency in an efficient
+    manner. Build slaves will then install those rpms back in
+    the cache before issuing the install command.
+    
+    When packages have been installed already, nothing but an
+    empty tar is distributed.
+    """
+    
+    # Class attribute holding a *weak* reference to the shared NEPI tar file,
+    # so that instances may share it. Don't operate on the file object itself
+    # (that would be a mess); just use its path.
+    _shared_nepi_tar = None
+    
+    def _build_get(self):
+        # canonical representation of dependencies
+        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+        
+        # download rpms and pack into a tar archive
+        return (
+            "sudo -S sed -i -r 's/keepcache *= *0/keepcache=1/' /etc/yum.conf && "
+            " ( ( "
+                "sudo -S yum -y install %s ; "
+                "rm -f ${BUILD}/packages.tar ; "
+                "tar -C /var/cache/yum -rf ${BUILD}/packages.tar $(find /var/cache/yum -iname '*.rpm')"
+            " ) || /bin/true ) && "
+            "sudo -S sed -i -r 's/keepcache *= *1/keepcache=0/' /etc/yum.conf && "
+            "sudo -S yum -y clean packages "
+        ) % ( depends, )
+    def _build_set(self, value):
+        # ignore
+        return
+    build = property(_build_get, _build_set)
+    
+    def _install_get(self):
+        # canonical representation of dependencies
+        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+        
+        # unpack cached rpms into yum cache, install, and cleanup
+        return (
+            "tar -k --keep-newer-files -C /var/cache/yum xzf packages.tar && "
+            "yum -y install %s && "
+            "yum -y clean packages "
+        ) % ( depends, )
+    def _install_set(self, value):
+        # ignore
+        return
+    install = property(_install_get, _install_set)
+        
 
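Note that the build and install commands above are exposed through properties whose setters deliberately do nothing, so whatever value the generic Dependency machinery assigns is discarded and the yum command is always recomputed from depends. A minimal standalone sketch of that pattern, with illustrative names only:

    class ComputedInstall(object):
        """Illustrative only: a command derived from 'depends', with a
        no-op setter so external assignments are silently ignored."""

        def __init__(self, depends=""):
            self.depends = depends

        def _install_get(self):
            # Canonical, order-independent form of the package list
            packages = ' '.join(sorted((self.depends or "").split()))
            return "yum -y install %s" % (packages,)

        def _install_set(self, value):
            # Ignore: the command is always computed from 'depends'
            return

        install = property(_install_get, _install_set)

    dep = ComputedInstall("gcc make openssl-devel")
    dep.install = "make install"      # silently ignored
    print(dep.install)                # yum -y install gcc make openssl-devel

This keeps the attribute interface expected by the Dependency machinery intact while making the commands effectively read-only.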
index 7b8a19f..7b98ed9 100644 (file)
@@ -125,6 +125,8 @@ class TestbedController(testbed_impl.TestbedController):
             get_attribute_value("plLogLevel")
         self.tapPortBase = self._attributes.\
             get_attribute_value("tapPortBase")
+        self.p2pDeployment = self._attributes.\
+            get_attribute_value("p2pDeployment")
         
         self._logger.setLevel(getattr(logging,self.logLevel))
         
@@ -160,8 +162,9 @@ class TestbedController(testbed_impl.TestbedController):
                 # Oh... retry...
                 pass
         
-        # Plan application deployment
-        self.do_spanning_deployment_plan()
+        if self.p2pDeployment:
+            # Plan application deployment
+            self.do_spanning_deployment_plan()
 
         # Configure elements per XML data
         super(TestbedController, self).do_preconfigure()
@@ -267,6 +270,9 @@ class TestbedController(testbed_impl.TestbedController):
                     )
                     
                     self._logger.info("READY Node %s at %s", guid, node.hostname)
+                    
+                    # Prepare dependency installer now
+                    node.prepare_dependencies()
         except self._node.UnresponsiveNodeError:
             # Uh... 
             self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
@@ -310,6 +316,10 @@ class TestbedController(testbed_impl.TestbedController):
         for element in self._elements.itervalues():
             if isinstance(element, self._app.Dependency):
                 depgroups[dephash(element)].append(element)
+            elif isinstance(element, self._node.Node):
+                deps = element._yum_dependencies
+                if deps:
+                    depgroups[dephash(deps)].append(deps)
         
         # Set up spanning deployment for those applications that
         # have been deployed in several nodes.
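The hunk above feeds each node's YumDependency into the same dephash-keyed groups used for spanning (P2P) deployment, so identical package sets are built or downloaded once and then shared. A minimal sketch of that grouping idea, with a hypothetical dephash keyed only on the package list (the real dephash in execute.py may take more attributes into account):

    from collections import defaultdict

    class FakeDep(object):
        # Illustrative stand-in for a Dependency / YumDependency
        def __init__(self, depends):
            self.depends = depends

    def dephash(dep):
        # Hypothetical: group dependencies that would build identically
        return hash(' '.join(sorted(dep.depends.split())))

    deps = [FakeDep("gcc make"), FakeDep("make gcc"), FakeDep("python")]

    depgroups = defaultdict(list)
    for dep in deps:
        depgroups[dephash(dep)].append(dep)

    # Groups with more than one member are candidates for spanning
    # deployment: build/download once, distribute to the rest.
    for group in depgroups.values():
        print([d.depends for d in group])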
index 92e0b98..fff9b8b 100644 (file)
@@ -1110,6 +1110,23 @@ testbed_attributes = dict({
             "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
             "validation_function": validation.is_string
         }),
+        "p2p_deployment": dict({
+            "name": "p2pDeployment",
+            "help": "Enable peer-to-peer deployment of applications and dependencies. "
+                    "When enabled, dependency packages and applications are "
+                    "deployed in a P2P fashion, picking a single node to do "
+                    "the building or repo download, while all the others "
+                    "cooperatively exchange resulting binaries or rpms. "
+                    "When deploying to many nodes, this is a far more efficient "
+                    "use of resources. It does require re-encrypting and distributing "
+                    "the slice's private key. Though it is implemented in a secure "
+                    "fashion, if they key's sole purpose is not PlanetLab, then this "
+                    "feature should be disabled.",
+            "type": Attribute.BOOL,
+            "value": True,
+            "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+            "validation_function": validation.is_bool
+        }),
         "slice_ssh_key": dict({
             "name": "sliceSSHKey",
             "help": "The controller-local path to the slice user's ssh private key. "
index 7efdb97..734d998 100644 (file)
@@ -17,6 +17,8 @@ import logging
 from nepi.util import server
 from nepi.util import parallel
 
+import application
+
 class UnresponsiveNodeError(RuntimeError):
     pass
 
@@ -78,6 +80,7 @@ class Node(object):
         
         # Those are filled when an actual node is allocated
         self._node_id = None
+        self._yum_dependencies = None
 
         # Logging
         self._logger = logging.getLogger('nepi.testbeds.planetlab')
@@ -315,10 +318,6 @@ class Node(object):
 
     def install_dependencies(self):
         if self.required_packages:
-            # TODO: make dependant on the experiment somehow...
-            pidfile = self.DEPENDS_PIDFILE
-            logfile = self.DEPENDS_LOGFILE
-            
             # If we need rpmfusion, we must install the repo definition and the gpg keys
             if self.rpmFusion:
                 if self.operatingSystem == 'f12':
@@ -336,31 +335,25 @@ class Node(object):
             else:
                 rpmFusion = ''
             
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
-            (out,err),proc = rspawn.remote_spawn(
-                "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
-                    'packages' : ' '.join(self.required_packages),
-                    'rpmfusion' : rpmFusion,
-                },
-                pidfile = pidfile,
-                stdout = logfile,
-                stderr = rspawn.STDOUT,
+            if rpmFusion:
+                (out,err),proc = server.popen_ssh_command(
+                    rpmFusion,
+                    host = self.hostname,
+                    port = None,
+                    user = self.slicename,
+                    agent = None,
+                    ident_key = self.ident_path,
+                    server_key = self.server_key
+                    )
                 
-                host = self.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.server_key,
-                sudo = True
-                )
+                if proc.wait():
+                    raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
             
-            if proc.wait():
-                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+            # Launch p2p yum dependency installer
+            self._yum_dependencies.async_setup()
     
     def wait_provisioning(self, timeout = 20*60):
-        # recently provisioned nodes may not be up yet
+        # Recently provisioned nodes may not be up yet, so poll until alive
         sleeptime = 1.0
         totaltime = 0.0
         while not self.is_alive():
@@ -375,61 +368,9 @@ class Node(object):
                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
     
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
-        if self.required_packages:
-            pidfile = self.DEPENDS_PIDFILE
-            
-            # get PID
-            pid = ppid = None
-            for probenum in xrange(pidmax):
-                pidtuple = rspawn.remote_check_pid(
-                    pidfile = pidfile,
-                    host = self.hostname,
-                    port = None,
-                    user = self.slicename,
-                    agent = None,
-                    ident_key = self.ident_path,
-                    server_key = self.server_key
-                    )
-                if pidtuple:
-                    pid, ppid = pidtuple
-                    break
-                else:
-                    time.sleep(pidprobe)
-            else:
-                raise RuntimeError, "Failed to obtain pidfile for dependency installer"
-        
-            # wait for it to finish
-            while rspawn.RUNNING is rspawn.remote_status(
-                    pid, ppid,
-                    host = self.hostname,
-                    port = None,
-                    user = self.slicename,
-                    agent = None,
-                    ident_key = self.ident_path,
-                    server_key = self.server_key
-                    ):
-                time.sleep(probe)
-                probe = min(probemax, 1.5*probe)
-            
-            # check results
-            logfile = self.DEPENDS_LOGFILE
-            
-            (out,err),proc = server.popen_ssh_command(
-                "cat %s" % (server.shell_escape(logfile),),
-                host = self.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.server_key
-                )
-            
-            if proc.wait():
-                raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
-            
-            success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
-            if not success:
-                raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
+        # Wait for the p2p installer
+        if self._yum_dependencies:
+            self._yum_dependencies.async_setup_wait()
         
     def is_alive(self):
         # Make sure all the paths are created where 
@@ -451,6 +392,13 @@ class Node(object):
         else:
             return False
     
+    def prepare_dependencies(self):
+        # Configure p2p yum dependency installer
+        if self.required_packages:
+            self._yum_dependencies = application.YumDependency(self._api)
+            self._yum_dependencies.node = self
+            self._yum_dependencies.home_path = "nepi-yumdep"
+            self._yum_dependencies.depends = ' '.join(self.required_packages)
 
     def configure_routes(self, routes, devs):
         """
index 137b258..40a967b 100755 (executable)
@@ -53,6 +53,7 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
         instance.defer_configure("authPass", pl_pwd)
         instance.defer_configure("plcHost", plchost)
         instance.defer_configure("tapPortBase", self.port_base)
+        instance.defer_configure("p2pDeployment", False) # it's interactive, we don't want it in tests
         
         return instance
 
index b5b2837..2d24e3e 100755 (executable)
@@ -58,6 +58,7 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
         pl_desc.set_attribute_value("authPass", pl_pwd)
         pl_desc.set_attribute_value("plcHost", plchost)
         pl_desc.set_attribute_value("tapPortBase", self.port_base)
+        pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
         
         return pl_desc, exp_desc
     
@@ -117,8 +118,12 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
             controller.shutdown()
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
+        "Test is interactive, requires NEPI_FULL_TESTS=yes")
     def test_spanning_deployment(self):
         pl, exp = self.make_experiment_desc()
+
+        pl.set_attribute_value("p2pDeployment", True) # we do want it here - even if interactive
         
         from nepi.testbeds import planetlab as plpackage
         
index e20ad97..8dcb8cb 100755 (executable)
@@ -67,6 +67,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
         pl_desc.set_attribute_value("authPass", pl_pwd)
         pl_desc.set_attribute_value("plcHost", plchost1)
         pl_desc.set_attribute_value("tapPortBase", self.port_base)
+        pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
 
         pl_desc2 = exp_desc.add_testbed_description(pl_provider)
         pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2")
@@ -75,7 +76,8 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
         pl_desc2.set_attribute_value("authUser", pl_user)
         pl_desc2.set_attribute_value("authPass", pl_pwd)
         pl_desc2.set_attribute_value("plcHost", plchost2)
-        pl_desc2.set_attribute_value("tapPortBase", self.port_base+100)
+        pl_desc2.set_attribute_value("tapPortBase", self.port_base+500)
+        pl_desc2.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
         
         return pl_desc, pl_desc2, exp_desc
     
index b8e8527..30f56c5 100755 (executable)
@@ -67,6 +67,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
         pl_desc.set_attribute_value("authPass", pl_pwd)
         pl_desc.set_attribute_value("plcHost", plchost1)
         pl_desc.set_attribute_value("tapPortBase", self.port_base)
+        pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
 
         pl_desc2 = exp_desc.add_testbed_description(pl_provider)
         pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2")
@@ -75,7 +76,8 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
         pl_desc2.set_attribute_value("authUser", pl_user)
         pl_desc2.set_attribute_value("authPass", pl_pwd)
         pl_desc2.set_attribute_value("plcHost", plchost2)
-        pl_desc.set_attribute_value("tapPortBase", self.port_base+100)
+        pl_desc2.set_attribute_value("tapPortBase", self.port_base+500)
+        pl_desc2.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
         
         return pl_desc, pl_desc2, exp_desc
     
index 8091468..b9ef952 100755 (executable)
@@ -57,6 +57,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         pl_desc.set_attribute_value("authPass", pl_pwd)
         pl_desc.set_attribute_value("plcHost", plchost)
         pl_desc.set_attribute_value("tapPortBase", self.port_base)
+        pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
         
         return pl_desc, exp_desc