return self._tarball
+class YumDependency(Dependency):
+ """
+ This dependency is an internal helper class used to
+ efficiently distribute yum-downloaded rpms.
+
+ It temporarily sets the yum cache as persistent in the
+ build master, and installs all the required packages.
+
+ The rpm packages left in the yum cache are gathered and
+ distributed by the underlying Dependency in an efficient
+ manner. Build slaves will then install those rpms back in
+ the cache before issuing the install command.
+
+ When packages have been installed already, nothing but an
+ empty tar is distributed.
+ """
+
+ # Class attribute holding a *weak* reference to the shared NEPI tar file
+ # so that they may share it. Don't operate on the file itself, it would
+ # be a mess, just use its path.
+ _shared_nepi_tar = None
+
+ def _build_get(self):
+ # canonical representation of dependencies
+ depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+
+ # download rpms and pack into a tar archive
+ return (
+ "sudo -S sed -i -r 's/keepcache *= *0/keepcache=1/' /etc/yum.conf && "
+ " ( ( "
+ "sudo -S yum -y install %s ; "
+ "rm -f ${BUILD}/packages.tar ; "
+ "tar -C /var/cache/yum -rf ${BUILD}/packages.tar $(find /var/cache/yum -iname '*.rpm')"
+ " ) || /bin/true ) && "
+ "sudo -S sed -i -r 's/keepcache *= *1/keepcache=0/' /etc/yum.conf && "
+ "sudo -S yum -y clean packages "
+ ) % ( depends, )
+ def _build_set(self, value):
+ # ignore
+ return
+ build = property(_build_get, _build_set)
+
+ def _install_get(self):
+ # canonical representation of dependencies
+ depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+
+ # unpack cached rpms into yum cache, install, and cleanup
+ return (
+ "tar -k --keep-newer-files -C /var/cache/yum xzf packages.tar && "
+ "yum -y install %s && "
+ "yum -y clean packages "
+ ) % ( depends, )
+ def _install_set(self, value):
+ # ignore
+ return
+ isntall = property(_install_get, _install_set)
+
get_attribute_value("plLogLevel")
self.tapPortBase = self._attributes.\
get_attribute_value("tapPortBase")
+ self.p2pDeployment = self._attributes.\
+ get_attribute_value("p2pDeployment")
self._logger.setLevel(getattr(logging,self.logLevel))
# Oh... retry...
pass
- # Plan application deployment
- self.do_spanning_deployment_plan()
+ if self.p2pDeployment:
+ # Plan application deployment
+ self.do_spanning_deployment_plan()
# Configure elements per XML data
super(TestbedController, self).do_preconfigure()
)
self._logger.info("READY Node %s at %s", guid, node.hostname)
+
+ # Prepare dependency installer now
+ node.prepare_dependencies()
except self._node.UnresponsiveNodeError:
# Uh...
self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
for element in self._elements.itervalues():
if isinstance(element, self._app.Dependency):
depgroups[dephash(element)].append(element)
+ elif isinstance(element, self._node.Node):
+ deps = element._yum_dependencies
+ if deps:
+ depgroups[dephash(deps)].append(deps)
# Set up spanning deployment for those applications that
# have been deployed in several nodes.
"flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
"validation_function": validation.is_string
}),
+ "p2p_deployment": dict({
+ "name": "p2pDeployment",
+ "help": "Enable peer-to-peer deployment of applications and dependencies. "
+ "When enabled, dependency packages and applications are "
+ "deployed in a P2P fashion, picking a single node to do "
+ "the building or repo download, while all the others "
+ "cooperatively exchange resulting binaries or rpms. "
+ "When deploying to many nodes, this is a far more efficient "
+ "use of resources. It does require re-encrypting and distributing "
+ "the slice's private key. Though it is implemented in a secure "
+ "fashion, if they key's sole purpose is not PlanetLab, then this "
+ "feature should be disabled.",
+ "type": Attribute.BOOL,
+ "value": True,
+ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+ "validation_function": validation.is_bool
+ }),
"slice_ssh_key": dict({
"name": "sliceSSHKey",
"help": "The controller-local path to the slice user's ssh private key. "
from nepi.util import server
from nepi.util import parallel
+import application
+
class UnresponsiveNodeError(RuntimeError):
pass
# Those are filled when an actual node is allocated
self._node_id = None
+ self._yum_dependencies = None
# Logging
self._logger = logging.getLogger('nepi.testbeds.planetlab')
def install_dependencies(self):
if self.required_packages:
- # TODO: make dependant on the experiment somehow...
- pidfile = self.DEPENDS_PIDFILE
- logfile = self.DEPENDS_LOGFILE
-
# If we need rpmfusion, we must install the repo definition and the gpg keys
if self.rpmFusion:
if self.operatingSystem == 'f12':
else:
rpmFusion = ''
- # Start process in a "daemonized" way, using nohup and heavy
- # stdin/out redirection to avoid connection issues
- (out,err),proc = rspawn.remote_spawn(
- "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
- 'packages' : ' '.join(self.required_packages),
- 'rpmfusion' : rpmFusion,
- },
- pidfile = pidfile,
- stdout = logfile,
- stderr = rspawn.STDOUT,
+ if rpmFusion:
+ (out,err),proc = server.popen_ssh_command(
+ rpmFusion,
+ host = self.hostname,
+ port = None,
+ user = self.slicename,
+ agent = None,
+ ident_key = self.ident_path,
+ server_key = self.server_key
+ )
- host = self.hostname,
- port = None,
- user = self.slicename,
- agent = None,
- ident_key = self.ident_path,
- server_key = self.server_key,
- sudo = True
- )
+ if proc.wait():
+ raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
- if proc.wait():
- raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+ # Launch p2p yum dependency installer
+ self._yum_dependencies.async_setup()
def wait_provisioning(self, timeout = 20*60):
- # recently provisioned nodes may not be up yet
+ # Wait for the p2p installer
sleeptime = 1.0
totaltime = 0.0
while not self.is_alive():
raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
- if self.required_packages:
- pidfile = self.DEPENDS_PIDFILE
-
- # get PID
- pid = ppid = None
- for probenum in xrange(pidmax):
- pidtuple = rspawn.remote_check_pid(
- pidfile = pidfile,
- host = self.hostname,
- port = None,
- user = self.slicename,
- agent = None,
- ident_key = self.ident_path,
- server_key = self.server_key
- )
- if pidtuple:
- pid, ppid = pidtuple
- break
- else:
- time.sleep(pidprobe)
- else:
- raise RuntimeError, "Failed to obtain pidfile for dependency installer"
-
- # wait for it to finish
- while rspawn.RUNNING is rspawn.remote_status(
- pid, ppid,
- host = self.hostname,
- port = None,
- user = self.slicename,
- agent = None,
- ident_key = self.ident_path,
- server_key = self.server_key
- ):
- time.sleep(probe)
- probe = min(probemax, 1.5*probe)
-
- # check results
- logfile = self.DEPENDS_LOGFILE
-
- (out,err),proc = server.popen_ssh_command(
- "cat %s" % (server.shell_escape(logfile),),
- host = self.hostname,
- port = None,
- user = self.slicename,
- agent = None,
- ident_key = self.ident_path,
- server_key = self.server_key
- )
-
- if proc.wait():
- raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
-
- success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
- if not success:
- raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
+ # Wait for the p2p installer
+ if self._yum_dependencies:
+ self._yum_dependencies.async_setup_wait()
def is_alive(self):
# Make sure all the paths are created where
else:
return False
+ def prepare_dependencies(self):
+ # Configure p2p yum dependency installer
+ if self.required_packages:
+ self._yum_dependencies = application.YumDependency(self._api)
+ self._yum_dependencies.node = self
+ self._yum_dependencies.home_path = "nepi-yumdep"
+ self._yum_dependencies.depends = ' '.join(self.required_packages)
def configure_routes(self, routes, devs):
"""
instance.defer_configure("authPass", pl_pwd)
instance.defer_configure("plcHost", plchost)
instance.defer_configure("tapPortBase", self.port_base)
+ instance.defer_configure("p2pDeployment", False) # it's interactive, we don't want it in tests
return instance
pl_desc.set_attribute_value("authPass", pl_pwd)
pl_desc.set_attribute_value("plcHost", plchost)
pl_desc.set_attribute_value("tapPortBase", self.port_base)
+ pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
return pl_desc, exp_desc
controller.shutdown()
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
+ "Test is interactive, requires NEPI_FULL_TESTS=yes")
def test_spanning_deployment(self):
pl, exp = self.make_experiment_desc()
+
+ pl.set_attribute_value("p2pDeployment", True) # we do want it here - even if interactive
from nepi.testbeds import planetlab as plpackage
pl_desc.set_attribute_value("authPass", pl_pwd)
pl_desc.set_attribute_value("plcHost", plchost1)
pl_desc.set_attribute_value("tapPortBase", self.port_base)
+ pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
pl_desc2 = exp_desc.add_testbed_description(pl_provider)
pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2")
pl_desc2.set_attribute_value("authUser", pl_user)
pl_desc2.set_attribute_value("authPass", pl_pwd)
pl_desc2.set_attribute_value("plcHost", plchost2)
- pl_desc2.set_attribute_value("tapPortBase", self.port_base+100)
+ pl_desc2.set_attribute_value("tapPortBase", self.port_base+500)
+ pl_desc2.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
return pl_desc, pl_desc2, exp_desc
pl_desc.set_attribute_value("authPass", pl_pwd)
pl_desc.set_attribute_value("plcHost", plchost1)
pl_desc.set_attribute_value("tapPortBase", self.port_base)
+ pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
pl_desc2 = exp_desc.add_testbed_description(pl_provider)
pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2")
pl_desc2.set_attribute_value("authUser", pl_user)
pl_desc2.set_attribute_value("authPass", pl_pwd)
pl_desc2.set_attribute_value("plcHost", plchost2)
- pl_desc.set_attribute_value("tapPortBase", self.port_base+100)
+ pl_desc2.set_attribute_value("tapPortBase", self.port_base+500)
+ pl_desc2.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
return pl_desc, pl_desc2, exp_desc
pl_desc.set_attribute_value("authPass", pl_pwd)
pl_desc.set_attribute_value("plcHost", plchost)
pl_desc.set_attribute_value("tapPortBase", self.port_base)
+ pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
return pl_desc, exp_desc