From 25a46fdfb9753474f35fa2dc677182f8500c52a9 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Fri, 21 Jun 2013 12:06:37 -0700 Subject: [PATCH] More examples and code for Linux CCN RMs --- examples/linux/ccn/vlc_2_hosts_ccndrms.py | 126 ------------ examples/linux/ccn/vlc_extended_ring_topo.py | 192 +++++++++++++++++++ src/nepi/execution/ec.py | 14 +- src/nepi/resources/linux/application.py | 40 ++-- src/nepi/resources/linux/ccn/ccncontent.py | 85 ++++---- src/nepi/resources/linux/ccn/ccnd.py | 121 +++++++----- src/nepi/resources/linux/ccn/ccnr.py | 97 ++++------ src/nepi/resources/linux/node.py | 14 +- 8 files changed, 394 insertions(+), 295 deletions(-) delete mode 100755 examples/linux/ccn/vlc_2_hosts_ccndrms.py create mode 100755 examples/linux/ccn/vlc_extended_ring_topo.py diff --git a/examples/linux/ccn/vlc_2_hosts_ccndrms.py b/examples/linux/ccn/vlc_2_hosts_ccndrms.py deleted file mode 100755 index d531c1ad..00000000 --- a/examples/linux/ccn/vlc_2_hosts_ccndrms.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python - -# -# NEPI, a framework to manage network experiments -# Copyright (C) 2013 INRIA -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -# Author: Alina Quereilhac - -# -# topology: -# -# 0 -# / \ -# 0 --- 0 0 --- 0 -# \ / -# 0 -# -# - -from nepi.execution.ec import ExperimentController, ECState -from nepi.execution.resource import ResourceState, ResourceAction, \ - populate_factory -from nepi.resources.linux.node import OSType - -from optparse import OptionParser, SUPPRESS_HELP - -import os -import time - -def add_node(ec, host, user, ssh_key = None): - node = ec.register_resource("LinuxNode") - ec.set(node, "hostname", host) - ec.set(node, "username", user) - ec.set(node, "identity", ssh_key) - ec.set(node, "cleanHome", True) - ec.set(node, "cleanProcesses", True) - return node - -def add_ccnd(ec, node): - ccnd = ec.register_resource("LinuxCCND") - ec.set(ccnd, "debug", 7) - ec.register_connection(ccnd, node) - return ccnd - -def add_ccnr(ec, ccnd): - ccnr = ec.register_resource("LinuxCCNR") - ec.register_connection(ccnr, ccnd) - return ccnr - -def add_fib_entry(ec, ccnd, peer_host): - entry = ec.register_resource("LinuxFIBEntry") - ec.set(entry, "host", peer_host) - ec.register_connection(entry, ccnd) - return entry - -def add_content(ec, ccnr, content_name, content): - co = ec.register_resource("LinuxCCNContent") - ec.set(co, "contentName", content_name) - ec.set(co, "content", content) - ec.register_connection(co, ccnr) - return co - -def add_stream(ec, ccnd, content_name): - command = "sudo -S dbus-uuidgen --ensure ; ( ccncat %s | vlc - ) " % \ - content_name - - app = ec.register_resource("LinuxCCNDApplication") - ec.set(app, "depends", "vlc") - ec.set(app, "forwardX11", True) - ec.set(app, "command", command) - ec.register_connection(app, ccnd) - - return app - -if __name__ == '__main__': - # Search for available RMs - populate_factory() - - ec = ExperimentController(exp_id = "olahh") - - # hosts - host1 = 'planetlab2.u-strasbg.fr' - host2 = 'roseval.pl.sophia.inria.fr' - - # users - user1 = "inria_alina" - user2 = "alina" - - content_name = "ccnx:/VIDEO" - video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts" - - # Register a ResourceManagers (RMs) - - node1 = add_node(ec, host1, user1) - ccnd1 = add_ccnd(ec, node1) - ccnr1 = add_ccnr(ec, ccnd1) - fibentry1 = add_fib_entry(ec, ccnd1, host2) - co = add_content(ec, ccnr1, content_name, video) - - node2 = add_node(ec, host2, user2) - ccnd2 = add_ccnd(ec, node2) - ccnr2 = add_ccnr(ec, ccnd2) - fibentry2 = add_fib_entry(ec, ccnd2, host1) - app = add_stream(ec, ccnd2, content_name) - - # Deploy all ResourceManagers - ec.deploy() - - ec.wait_finished([app]) - - # Shutdown the experiment controller - ec.shutdown() - diff --git a/examples/linux/ccn/vlc_extended_ring_topo.py b/examples/linux/ccn/vlc_extended_ring_topo.py new file mode 100755 index 00000000..b246c9bf --- /dev/null +++ b/examples/linux/ccn/vlc_extended_ring_topo.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python + +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +# +# CCN topology: +# +# h2 +# 0 +# content l1 / \ l2 ccncat +# b1 /l5 \ b2 +# 0 ----- h1 0 --- 0 h3 ------ 0 +# \ / +# l4 \ / l3 +# 0 +# h4 +# Experiment: +# - t0 : b2 retrives video published in b1 +# - t1 : l1 goes down +# - t2 : l2 goes down +# - t3 : l5 goes up +# + +from nepi.execution.ec import ExperimentController, ECState +from nepi.execution.resource import ResourceState, ResourceAction, \ + populate_factory +from nepi.resources.linux.node import OSType + +from optparse import OptionParser, SUPPRESS_HELP + +import os +import time +import tempfile + +def add_node(ec, host, user, ssh_key = None): + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", host) + ec.set(node, "username", user) + ec.set(node, "identity", ssh_key) + ec.set(node, "cleanHome", True) + ec.set(node, "cleanProcesses", True) + return node + +def add_ccnd(ec, node): + ccnd = ec.register_resource("LinuxCCND") + ec.set(ccnd, "debug", 7) + ec.register_connection(ccnd, node) + return ccnd + +def add_ccnr(ec, ccnd): + ccnr = ec.register_resource("LinuxCCNR") + ec.register_connection(ccnr, ccnd) + return ccnr + +def add_fib_entry(ec, ccnd, peer_host): + entry = ec.register_resource("LinuxFIBEntry") + ec.set(entry, "host", peer_host) + ec.register_connection(entry, ccnd) + return entry + +def add_content(ec, ccnr, content_name, content): + co = ec.register_resource("LinuxCCNContent") + ec.set(co, "contentName", content_name) + ec.set(co, "content", content) + ec.register_connection(co, ccnr) + return co + +def add_stream(ec, ccnd, content_name): + command = "sudo -S dbus-uuidgen --ensure ; ccncat %s | vlc - " % \ + content_name + + app = ec.register_resource("LinuxCCNApplication") + ec.set(app, "depends", "vlc") + ec.set(app, "forwardX11", True) + ec.set(app, "command", command) + ec.register_connection(app, ccnd) + + return app + +if __name__ == '__main__': + # Search for available RMs + populate_factory() + + ec = ExperimentController(exp_id = "olahh") + + # hosts + host1 = "planetlab2.u-strasbg.fr" + host2 = "planet1.servers.ua.pt" + host3 = "planetlab1.cs.uoi.gr" + host4 = "planetlab1.aston.ac.uk" + host5 = "itchy.comlab.bth.se" + host6 = "roseval.pl.sophia.inria.fr" + + # users + pluser = "inria_alina" + user = "alina" + + content_name = "ccnx:/VIDEO" + video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts" + """ + # describe nodes in the central ring + ring_hosts = [host1, host2, host3, host4] + ccnds = dict() + + for i in xrange(len(ring_hosts)): + host = ring_hosts[i] + node = add_node(ec, host, pluser) + ccnd = add_ccnd(ec, node) + ccnr = add_ccnr(ec, ccnd) + ccnds[host] = ccnd + + ## Add ccn ring links + # l1 : h1 - h2 , h2 - h1 + l1u = add_fib_entry(ec, ccnds[host1], host2) + l1d = add_fib_entry(ec, ccnds[host2], host1) + + # l2 : h2 - h3 , h3 - h2 + l2u = add_fib_entry(ec, ccnds[host2], host3) + l2d = add_fib_entry(ec, ccnds[host3], host2) + + # l3 : h3 - h4 , h4 - h3 + l3u = add_fib_entry(ec, ccnds[host3], host4) + l3d = add_fib_entry(ec, ccnds[host4], host3) + + # l4 : h4 - h1 , h1 - h4 + l4u = add_fib_entry(ec, ccnds[host4], host1) + l4d = add_fib_entry(ec, ccnds[host1], host4) + + # l5 : h1 - h3 , h3 - h1 + l5u = add_fib_entry(ec, ccnds[host1], host3) + l5d = add_fib_entry(ec, ccnds[host3], host1) + """ + # border node 1 + bnode1 = add_node(ec, host5, pluser) + ccndb1 = add_ccnd(ec, bnode1) + ccnrb1 = add_ccnr(ec, ccndb1) + co = add_content(ec, ccnrb1, content_name, video) + + # border node 2 + bnode2 = add_node(ec, host6, user) + ccndb2 = add_ccnd(ec, bnode2) + ccnrb2 = add_ccnr(ec, ccndb2) + app = add_stream(ec, ccndb2, content_name) + + # connect border nodes + #add_fib_entry(ec, ccndb1, host1) + #add_fib_entry(ec, ccnds[host1], host5) + + #add_fib_entry(ec, ccndb2, host3) + #add_fib_entry(ec, ccnds[host3], host6) + + add_fib_entry(ec, ccndb2, host5) + add_fib_entry(ec, ccndb1, host6) + + # deploy all ResourceManagers + ec.deploy() + + ec.wait_finished([app]) + + """ + proc2 = subprocess.Popen(['vlc', + '--ffmpeg-threads=1', + '--sub-filter', 'marq', + '--marq-marquee', + '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org', + '--marq-position=8', + '--no-video-title-show', '-'], + stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + dirpath = tempfile.mkdtemp() + """ + + # shutdown the experiment controller + ec.shutdown() + diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index b5663c0f..d9508048 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -152,9 +152,12 @@ class ExperimentController(object): :param guids: List of guids :type guids: list """ - return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED]) + return self.wait(guids, states = [ResourceState.STARTED, + ResourceState.STOPPED, + ResourceState.FINISHED]) - def wait(self, guids, states = [ResourceState.FINISHED]): + def wait(self, guids, states = [ResourceState.FINISHED, + ResourceState.STOPPED]): """ Blocking method that waits until all the RM from the 'guid' list reached state 'state' or until a failure occurs @@ -166,9 +169,14 @@ class ExperimentController(object): while not all([self.state(guid) in states for guid in guids]) and \ not any([self.state(guid) in [ - ResourceState.STOPPED, ResourceState.FAILED] for guid in guids]) and \ not self.finished: + # debug logging + waited = "" + for guid in guids: + waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True)) + self.logger.debug(" WAITING FOR %s " % waited ) + # We keep the sleep big to decrease the number of RM state queries time.sleep(2) diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index a68307b7..440e2d1c 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -111,6 +111,7 @@ class LinuxApplication(ResourceManager): self._pid = None self._ppid = None self._home = "app-%s" % self.guid + self._in_foreground = False # keep a reference to the running process handler when # the command is not executed as remote daemon in background @@ -155,12 +156,12 @@ class LinuxApplication(ResourceManager): This means that command will be executed using 'execute' instead of 'run' ('run' executes a command in background and detached from the terminal) - + When using X11 forwarding option, the command can not run in background and detached from a terminal, since we need to keep the terminal attached to interact with it. """ - return self.get("forwardX11") or False + return self.get("forwardX11") or self._in_foreground def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): self.info("Retrieving '%s' trace %s " % (name, attr)) @@ -312,8 +313,14 @@ class LinuxApplication(ResourceManager): if stdin: # create dir for sources self.info(" Uploading stdin ") - + dst = os.path.join(self.app_home, "stdin") + + # TODO: + # Check wether file already exists and if it exists + # wether the file we want to upload is the same + # (using md5sum) + self.node.upload(stdin, dst, text = True) def install_dependencies(self): @@ -391,6 +398,7 @@ class LinuxApplication(ResourceManager): # installation), then the application is directly marked as FINISHED self._state = ResourceState.FINISHED else: + if self.in_foreground: self._start_in_foreground() else: @@ -400,7 +408,6 @@ class LinuxApplication(ResourceManager): def _start_in_foreground(self): command = self.get("command") - env = self.get("env") stdin = "stdin" if self.get("stdin") else None sudo = self.get("sudo") or False x11 = self.get("forwardX11") @@ -409,6 +416,7 @@ class LinuxApplication(ResourceManager): # terminal using the node 'execute' in non blocking mode. # Export environment + env = self.get("env") environ = self.node.format_environment(env, inline = True) command = environ + command command = self.replace_paths(command) @@ -463,7 +471,8 @@ class LinuxApplication(ResourceManager): # If the process is not running, check for error information # on the remote machine if not self.pid or not self.ppid: - (out, err), proc = self.check_errors(home, ecodefile, stderr) + (out, err), proc = self.node.check_errors(self.app_home, + stderr = stderr) # Out is what was written in the stderr file if err: @@ -490,14 +499,17 @@ class LinuxApplication(ResourceManager): if self._proc: self._proc.kill() else: - (out, err), proc = self.node.kill(self.pid, self.ppid) - - if out or err: - # check if execution errors occurred - msg = " Failed to STOP command '%s' " % self.get("command") - self.error(msg, out, err) - self._state = ResourceState.FAILED - stopped = False + # Only try to kill the process if the pid and ppid + # were retrieved + if self.pid and self.ppid: + (out, err), proc = self.node.kill(self.pid, self.ppid) + + if out or err: + # check if execution errors occurred + msg = " Failed to STOP command '%s' " % self.get("command") + self.error(msg, out, err) + self._state = ResourceState.FAILED + stopped = False if stopped: super(LinuxApplication, self).stop() @@ -523,7 +535,7 @@ class LinuxApplication(ResourceManager): # Check if the process we used to execute the command # is still running ... retcode = self._proc.poll() - + # retcode == None -> running # retcode > 0 -> error # retcode == 0 -> finished diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index d5500118..3b081710 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -18,19 +18,16 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ +from nepi.execution.resource import clsinit_copy, ResourceState, \ ResourceAction -from nepi.resources.linux.application import LinuxApplication +from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.resources.linux.ccn.ccnr import LinuxCCNR from nepi.util.timefuncs import strfnow, strfdiff import os -reschedule_delay = "0.5s" - @clsinit_copy -class LinuxCCNContent(LinuxApplication): +class LinuxCCNContent(LinuxCCNApplication): _rtype = "LinuxCCNContent" @classmethod @@ -49,7 +46,6 @@ class LinuxCCNContent(LinuxApplication): def __init__(self, ec, guid): super(LinuxCCNContent, self).__init__(ec, guid) self._home = "content-%s" % self.guid - self._published = False @property def ccnr(self): @@ -63,68 +59,63 @@ class LinuxCCNContent(LinuxApplication): return None def deploy(self): - if not self.get("command"): - self.set("command", self._default_command) - - if not self.get("env"): - self.set("env", self._default_environment) + if not self.ccnr or self.ccnr.state < ResourceState.READY: + self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) + + reschedule_delay = "0.5s" + # ccnr needs to wait until ccnd is deployed and running + self.ec.schedule(reschedule_delay, self.deploy) + else: + command = self._start_command + env = self._environment + + self.set("command", command) + self.set("env", env) - if not self.get("stdin"): # set content to stdin, so the content will be # uploaded during provision self.set("stdin", self.get("content")) - # Wait until associated ccnd is provisioned - ccnr = self.ccnr + self.info("Deploying command '%s' " % command) - if not ccnr or ccnr.state < ResourceState.READY: - # ccnr needs to wait until ccnd is deployed and running - self.ec.schedule(reschedule_delay, self.deploy) - else: - # Invoke the actual deployment - super(LinuxCCNContent, self).deploy() + self.node.mkdir(self.app_home) - # As soon as the ccnr is running we can push the content - # to the repository ( we don't want to lose time launching - # writting the content to the repository later on ) - if self._state == ResourceState.READY: - self._start_in_background() - self._published = True + # upload content + self.upload_stdin() + + # We want to make sure the content is published + # before the experiment starts. + # Run the command as a bash script in the background, + # in the host ( but wait until the command has + # finished to continue ) + self.execute_command(command, env) + + self.debug("----- READY ---- ") + self._ready_time = strfnow() + self._state = ResourceState.READY def start(self): - # CCNR should already be started by now. - # Nothing to do but to set the state to STARTED - if self._published: + if self._state == ResourceState.READY: + command = self.get("command") + self.info("Starting command '%s'" % command) + self._start_time = strfnow() self._state = ResourceState.STARTED else: - msg = "Failed to execute command '%s'" % command + msg = " Failed to execute command '%s'" % command self.error(msg, out, err) self._state = ResourceState.FAILED raise RuntimeError, msg @property def state(self): - state = super(LinuxCCNContent, self).state - if self._state in [ResourceState.FINISHED, ResourceState.FAILED]: - self._published = False - - if self._state == ResourceState.READY: - # CCND is really deployed only when ccn daemon is running - if not self._published: - return ResourceState.PROVISIONED - return self._state @property - def _default_command(self): - return "ccnseqwriter -r %s " % self.get("contentName") + def _start_command(self): + return "ccnseqwriter -r %s < %s" % (self.get("contentName"), + os.path.join(self.app_home, 'stdin')) - @property - def _default_environment(self): - env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " - return env - def valid_connection(self, guid): # TODO: Validate! return True diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index a393768a..bd001b4a 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -26,6 +26,8 @@ from nepi.util.timefuncs import strfnow, strfdiff import os +# TODO: use ccndlogging to dynamically change the logging level + @clsinit_copy class LinuxCCND(LinuxApplication): _rtype = "LinuxCCND" @@ -123,41 +125,80 @@ class LinuxCCND(LinuxApplication): super(LinuxCCND, self).__init__(ec, guid) self._home = "ccnd-%s" % self.guid - # Marks whether daemon is running - self._running = False - def deploy(self): - if not self.get("command"): - self.set("command", self._default_command) - - if not self.get("depends"): - self.set("depends", self._default_dependencies) + if not self.node or self.node.state < ResourceState.READY: + self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) + + reschedule_delay = "0.5s" + # ccnr needs to wait until ccnd is deployed and running + self.ec.schedule(reschedule_delay, self.deploy) + else: + if not self.get("command"): + self.set("command", self._start_command) + + if not self.get("depends"): + self.set("depends", self._dependencies) - if not self.get("sources"): - self.set("sources", self._default_sources) + if not self.get("sources"): + self.set("sources", self._sources) - if not self.get("build"): - self.set("build", self._default_build) + if not self.get("build"): + self.set("build", self._build) - if not self.get("install"): - self.set("install", self._default_install) + if not self.get("install"): + self.set("install", self._install) - if not self.get("env"): - self.set("env", self._default_environment) + if not self.get("env"): + self.set("env", self._environment) - super(LinuxCCND, self).deploy() + command = self.get("command") + env = self.get("env") - # As soon as the ccnd sources are deployed, we launch the - # daemon ( we don't want to lose time launching the ccn - # daemon later on ) - if self._state == ResourceState.READY: - self._start_in_background() - self._running = True + self.info("Deploying command '%s' " % command) + + # create home dir for application + self.node.mkdir(self.app_home) + + # upload sources + self.upload_sources() + + # upload code + self.upload_code() + + # upload stdin + self.upload_stdin() + + # install dependencies + self.install_dependencies() + + # build + self.build() + + # Install + self.install() + + # We want to make sure the repository is running + # before the experiment starts. + # Run the command as a bash script in background, + # in the host ( but wait until the command has + # finished to continue ) + env = self.replace_paths(env) + command = self.replace_paths(command) + + self.node.run_and_wait(command, self.app_home, + env = env, + shfile = "app.sh", + raise_on_error = True) + + self.debug("----- READY ---- ") + self._ready_time = strfnow() + self._state = ResourceState.READY def start(self): - # CCND should already be started by now. - # Nothing to do but to set the state to STARTED - if self._running: + if self._state == ResourceState.READY: + command = self.get("command") + self.info("Starting command '%s'" % command) + self._start_time = strfnow() self._state = ResourceState.STARTED else: @@ -189,36 +230,30 @@ class LinuxCCND(LinuxApplication): stdout = "ccndstop_stdout", stderr = "ccndstop_stderr") - - super(LinuxCCND, self).stop() - + self._stop_time = strfnow() + self._state = ResourceState.STOPPED + @property def state(self): # First check if the ccnd has failed state_check_delay = 0.5 - if self._running and strfdiff(strfnow(), self._last_state_check) > state_check_delay: + if self._state == ResourceState.STARTED and \ + strfdiff(strfnow(), self._last_state_check) > state_check_delay: (out, err), proc = self._ccndstatus retcode = proc.poll() if retcode == 1 and err.find("No such file or directory") > -1: # ccnd is not running (socket not found) - self._running = False self._state = ResourceState.FINISHED elif retcode: # other errors ... - self._running = False msg = " Failed to execute command '%s'" % self.get("command") self.error(msg, out, err) self._state = ResourceState.FAILED self._last_state_check = strfnow() - if self._state == ResourceState.READY: - # CCND is really deployed only when ccn daemon is running - if not self._running: - return ResourceState.PROVISIONED - return self._state @property @@ -231,11 +266,11 @@ class LinuxCCND(LinuxApplication): return self.node.execute(command) @property - def _default_command(self): + def _start_command(self): return "ccndstart" @property - def _default_dependencies(self): + def _dependencies(self): if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]: return ( " autoconf openssl-devel expat-devel libpcap-devel " " ecryptfs-utils-devel libxml2-devel automake gawk " @@ -247,11 +282,11 @@ class LinuxCCND(LinuxApplication): return "" @property - def _default_sources(self): + def _sources(self): return "http://www.ccnx.org/releases/ccnx-0.7.2.tar.gz" @property - def _default_build(self): + def _build(self): sources = self.get("sources").split(" ")[0] sources = os.path.basename(sources) @@ -272,7 +307,7 @@ class LinuxCCND(LinuxApplication): " )") % ({ 'sources': sources }) @property - def _default_install(self): + def _install(self): return ( # Evaluate if ccnx binaries are already installed " ( " @@ -286,7 +321,7 @@ class LinuxCCND(LinuxApplication): ) @property - def _default_environment(self): + def _environment(self): envs = dict({ "debug": "CCND_DEBUG", "port": "CCN_LOCAL_PORT", diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index 5103719b..ef7ac26a 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -19,18 +19,16 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ +from nepi.execution.resource import clsinit_copy, ResourceState, \ ResourceAction -from nepi.resources.linux.application import LinuxApplication +from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND from nepi.util.timefuncs import strfnow, strfdiff import os -reschedule_delay = "0.5s" - @clsinit_copy -class LinuxCCNR(LinuxApplication): +class LinuxCCNR(LinuxCCNApplication): _rtype = "LinuxCCNR" @classmethod @@ -185,47 +183,49 @@ class LinuxCCNR(LinuxApplication): super(LinuxCCNR, self).__init__(ec, guid) self._home = "ccnr-%s" % self.guid - # Marks whether ccnr is running - self._running = False + def deploy(self): + if not self.ccnd or self.ccnd.state < ResourceState.READY: + self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state ) + + reschedule_delay = "0.5s" + # ccnr needs to wait until ccnd is deployed and running + self.ec.schedule(reschedule_delay, self.deploy) + else: + command = self._start_command + env = self._environment - @property - def ccnd(self): - ccnd = self.get_connected(LinuxCCND.rtype()) - if ccnd: return ccnd[0] - return None + self.set("command", command) + self.set("env", env) - @property - def node(self): - if self.ccnd: return self.ccnd.node - return None + self.info("Deploying command '%s' " % command) - def deploy(self): - if not self.get("command"): - self.set("command", self._default_command) - - if not self.get("env"): - self.set("env", self._default_environment) + self.node.mkdir(self.app_home) - # Wait until associated ccnd is provisioned - ccnd = self.ccnd + # upload sources + self.upload_sources() - if not ccnd or ccnd.state < ResourceState.READY: - # ccnr needs to wait until ccnd is deployed and running - self.ec.schedule(reschedule_delay, self.deploy) - else: - # Invoke the actual deployment - super(LinuxCCNR, self).deploy() + # We want to make sure the repository is running + # before the experiment starts. + # Run the command as a bash script in background, + # in the host ( but wait until the command has + # finished to continue ) + env = self.replace_paths(env) + command = self.replace_paths(command) + + self.node.run_and_wait(command, self.app_home, + env = env, + shfile = "app.sh", + raise_on_error = True) - # As soon as deployment is finished, we launch the ccnr - # command ( we don't want to lose time ccnr later on ) - if self._state == ResourceState.READY: - self._start_in_background() - self._running = True + self.debug("----- READY ---- ") + self._ready_time = strfnow() + self._state = ResourceState.READY def start(self): - # CCND should already be started by now. - # Nothing to do but to set the state to STARTED - if self._running: + if self._state == ResourceState.READY: + command = self.get("command") + self.info("Starting command '%s'" % command) + self._start_time = strfnow() self._state = ResourceState.STARTED else: @@ -235,24 +235,11 @@ class LinuxCCNR(LinuxApplication): raise RuntimeError, msg @property - def state(self): - state = super(LinuxCCNR, self).state - if self._state in [ResourceState.FINISHED, ResourceState.FAILED]: - self._running = False - - if self._state == ResourceState.READY: - # CCND is really deployed only when ccn daemon is running - if not self._running: - return ResourceState.PROVISIONED - - return self._state + def _start_command(self): + return "ccnr &" @property - def _default_command(self): - return "ccnr" - - @property - def _default_environment(self): + def _environment(self): envs = dict({ "maxFanout": "CCNR_BTREE_MAX_FANOUT", "maxLeafEntries": "CCNR_BTREE_MAX_LEAF_ENTRIES", @@ -285,7 +272,7 @@ class LinuxCCNR(LinuxApplication): env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \ if self.get(k) else "", envs.keys())) - + return env def valid_connection(self, guid): diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 44bd29df..a96267f1 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -429,12 +429,10 @@ class LinuxNode(ResourceManager): sudo = False, tty = False, raise_on_error = False): - """ - runs a command in background on the remote host, busy-waiting - until the command finishes execution. - This is more robust than doing a simple synchronized 'execute', - since in the remote host the command can continue to run detached - even if network disconnections occur + """ + Uploads the 'command' to a bash script in the host. + Then runs the script detached in background in the host, and + busy-waites until the script finishes executing. """ self.upload_command(command, home, shfile = shfile, @@ -537,7 +535,9 @@ class LinuxNode(ResourceManager): as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n) """ if not env: return "" - env = env.strip() + + # Remove extra white spaces + env = re.sub(r'\s+', ' ', env.strip()) sep = ";" if inline else "\n" return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep -- 2.43.0