From: Alina Quereilhac Date: Thu, 20 Jun 2013 00:16:33 +0000 (-0700) Subject: First CCN RMs working example for Linux X-Git-Tag: nepi-3.0.0~93 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=4c64cfa394d6210febf4c384276820093b104106;p=nepi.git First CCN RMs working example for Linux --- diff --git a/examples/linux/ccn/vlc_2_hosts_ccndrms.py b/examples/linux/ccn/vlc_2_hosts_ccndrms.py index d0638c9f..d531c1ad 100755 --- a/examples/linux/ccn/vlc_2_hosts_ccndrms.py +++ b/examples/linux/ccn/vlc_2_hosts_ccndrms.py @@ -19,6 +19,17 @@ # # Author: Alina Quereilhac +# +# topology: +# +# 0 +# / \ +# 0 --- 0 0 --- 0 +# \ / +# 0 +# +# + from nepi.execution.ec import ExperimentController, ECState from nepi.execution.resource import ResourceState, ResourceAction, \ populate_factory @@ -40,16 +51,40 @@ def add_node(ec, host, user, ssh_key = None): def add_ccnd(ec, node): ccnd = ec.register_resource("LinuxCCND") + ec.set(ccnd, "debug", 7) ec.register_connection(ccnd, node) return ccnd -def add_ccnr(ec, ccnd, node): +def add_ccnr(ec, ccnd): ccnr = ec.register_resource("LinuxCCNR") - ec.register_connection(ccnr, node) ec.register_connection(ccnr, ccnd) - return ccnr +def add_fib_entry(ec, ccnd, peer_host): + entry = ec.register_resource("LinuxFIBEntry") + ec.set(entry, "host", peer_host) + ec.register_connection(entry, ccnd) + return entry + +def add_content(ec, ccnr, content_name, content): + co = ec.register_resource("LinuxCCNContent") + ec.set(co, "contentName", content_name) + ec.set(co, "content", content) + ec.register_connection(co, ccnr) + return co + +def add_stream(ec, ccnd, content_name): + command = "sudo -S dbus-uuidgen --ensure ; ( ccncat %s | vlc - ) " % \ + content_name + + app = ec.register_resource("LinuxCCNDApplication") + ec.set(app, "depends", "vlc") + ec.set(app, "forwardX11", True) + ec.set(app, "command", command) + ec.register_connection(app, ccnd) + + return app + if __name__ == '__main__': # Search for available RMs populate_factory() @@ -64,20 +99,27 @@ if __name__ == '__main__': user1 = "inria_alina" user2 = "alina" + content_name = "ccnx:/VIDEO" + video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts" + # Register a ResourceManagers (RMs) node1 = add_node(ec, host1, user1) ccnd1 = add_ccnd(ec, node1) - ccnr1 = add_ccnr(ec, ccnd1, node1) + ccnr1 = add_ccnr(ec, ccnd1) + fibentry1 = add_fib_entry(ec, ccnd1, host2) + co = add_content(ec, ccnr1, content_name, video) node2 = add_node(ec, host2, user2) ccnd2 = add_ccnd(ec, node2) - ccnr2 = add_ccnr(ec, ccnd2, node2) + ccnr2 = add_ccnr(ec, ccnd2) + fibentry2 = add_fib_entry(ec, ccnd2, host1) + app = add_stream(ec, ccnd2, content_name) # Deploy all ResourceManagers ec.deploy() - ec.wait_started([ccnd1, ccnr1, ccnd2, ccnr2]) + ec.wait_finished([app]) # Shutdown the experiment controller ec.shutdown() diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index d4ddf92d..a68307b7 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -413,8 +413,6 @@ class LinuxApplication(ResourceManager): command = environ + command command = self.replace_paths(command) - self.info("Starting command IN FOREGROUND '%s'" % command) - # We save the reference to the process in self._proc # to be able to kill the process from the stop method. # We also set blocking = False, since we don't want the diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index fbe7c956..d5500118 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -23,46 +23,45 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState ResourceAction from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnr import LinuxCCNR -from nepi.resources.linux.node import OSType - -from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import strfnow, strfdiff + import os reschedule_delay = "0.5s" @clsinit_copy -class LinuxCCNR(LinuxApplication): +class LinuxCCNContent(LinuxApplication): _rtype = "LinuxCCNContent" @classmethod def _register_attributes(cls): content_name = Attribute("contentName", "The name of the content to publish (e.g. ccn:/VIDEO) ", - flags = Flags.ExecReadOnly) + flags = Flags.ExecReadOnly) + content = Attribute("content", "The content to publish. It can be a path to a file or plain text ", - flags = Flags.ExecReadOnly) - + flags = Flags.ExecReadOnly) cls._register_attribute(content_name) cls._register_attribute(content) - @classmethod - def _register_traces(cls): - log = Trace("log", "CCND log output") - - cls._register_trace(log) - def __init__(self, ec, guid): super(LinuxCCNContent, self).__init__(ec, guid) - + self._home = "content-%s" % self.guid + self._published = False + @property def ccnr(self): ccnr = self.get_connected(LinuxCCNR.rtype()) if ccnr: return ccnr[0] return None + @property + def node(self): + if self.ccnr: return self.ccnr.node + return None + def deploy(self): if not self.get("command"): self.set("command", self._default_command) @@ -70,20 +69,53 @@ class LinuxCCNR(LinuxApplication): if not self.get("env"): self.set("env", self._default_environment) + if not self.get("stdin"): + # set content to stdin, so the content will be + # uploaded during provision + self.set("stdin", self.get("content")) + # Wait until associated ccnd is provisioned ccnr = self.ccnr - if not ccnr or ccnr.state < ResourceState.STARTED: + if not ccnr or ccnr.state < ResourceState.READY: + # ccnr needs to wait until ccnd is deployed and running self.ec.schedule(reschedule_delay, self.deploy) else: - # Add a start after condition so CCNR will not start - # before CCND does - self.ec.register_condition(self.guid, ResourceAction.START, - ccnd.guid, ResourceState.STARTED) - # Invoke the actual deployment super(LinuxCCNContent, self).deploy() + # As soon as the ccnr is running we can push the content + # to the repository ( we don't want to lose time launching + # writting the content to the repository later on ) + if self._state == ResourceState.READY: + self._start_in_background() + self._published = True + + def start(self): + # CCNR should already be started by now. + # Nothing to do but to set the state to STARTED + if self._published: + self._start_time = strfnow() + self._state = ResourceState.STARTED + else: + msg = "Failed to execute command '%s'" % command + self.error(msg, out, err) + self._state = ResourceState.FAILED + raise RuntimeError, msg + + @property + def state(self): + state = super(LinuxCCNContent, self).state + if self._state in [ResourceState.FINISHED, ResourceState.FAILED]: + self._published = False + + if self._state == ResourceState.READY: + # CCND is really deployed only when ccn daemon is running + if not self._published: + return ResourceState.PROVISIONED + + return self._state + @property def _default_command(self): return "ccnseqwriter -r %s " % self.get("contentName") diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index 35d4de2e..a393768a 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -22,9 +22,8 @@ from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.node import OSType - -from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import strfnow, strfdiff + import os @clsinit_copy @@ -46,6 +45,7 @@ class LinuxCCND(LinuxApplication): " 128 - face registration debugging \n" " -1 - max logging \n" " Or apply bitwise OR to these values to get combinations of them", + type = Types.Integer, flags = Flags.ExecReadOnly) port = Attribute("port", "Sets the CCN_LOCAL_PORT environmental variable. " @@ -121,6 +121,8 @@ class LinuxCCND(LinuxApplication): def __init__(self, ec, guid): super(LinuxCCND, self).__init__(ec, guid) + self._home = "ccnd-%s" % self.guid + # Marks whether daemon is running self._running = False @@ -193,9 +195,9 @@ class LinuxCCND(LinuxApplication): @property def state(self): # First check if the ccnd has failed + state_check_delay = 0.5 if self._running and strfdiff(strfnow(), self._last_state_check) > state_check_delay: - state_check_delay = 0.5 - (out, err), proc = self._cndstatus() + (out, err), proc = self._ccndstatus retcode = proc.poll() @@ -206,7 +208,7 @@ class LinuxCCND(LinuxApplication): elif retcode: # other errors ... self._running = False - msg = " Failed to execute command '%s'" % command + msg = " Failed to execute command '%s'" % self.get("command") self.error(msg, out, err) self._state = ResourceState.FAILED @@ -223,7 +225,7 @@ class LinuxCCND(LinuxApplication): def _ccndstatus(self): env = self.get('env') or "" environ = self.node.format_environment(env, inline = True) - command = environ + "; ccndstatus" + command = environ + " ccndstatus" command = self.replace_paths(command) return self.node.execute(command) @@ -246,7 +248,7 @@ class LinuxCCND(LinuxApplication): @property def _default_sources(self): - return "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz" + return "http://www.ccnx.org/releases/ccnx-0.7.2.tar.gz" @property def _default_build(self): @@ -302,7 +304,7 @@ class LinuxCCND(LinuxApplication): }) env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " - env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \ + env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), str(self.get(k))) \ if self.get(k) else "", envs.keys())) return env diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index 72ccef40..5103719b 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -23,10 +23,8 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState ResourceAction from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND -from nepi.resources.linux.node import OSType - -from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import strfnow, strfdiff + import os reschedule_delay = "0.5s" @@ -185,6 +183,8 @@ class LinuxCCNR(LinuxApplication): def __init__(self, ec, guid): super(LinuxCCNR, self).__init__(ec, guid) + self._home = "ccnr-%s" % self.guid + # Marks whether ccnr is running self._running = False @@ -194,6 +194,11 @@ class LinuxCCNR(LinuxApplication): if ccnd: return ccnd[0] return None + @property + def node(self): + if self.ccnd: return self.ccnd.node + return None + def deploy(self): if not self.get("command"): self.set("command", self._default_command) @@ -211,9 +216,8 @@ class LinuxCCNR(LinuxApplication): # Invoke the actual deployment super(LinuxCCNR, self).deploy() - # As soon as the ccnd sources are deployed, we launch the - # daemon ( we don't want to lose time launching the ccn - # daemon later on ) + # As soon as deployment is finished, we launch the ccnr + # command ( we don't want to lose time ccnr later on ) if self._state == ResourceState.READY: self._start_in_background() self._running = True @@ -232,8 +236,8 @@ class LinuxCCNR(LinuxApplication): @property def state(self): - state = super(LinuxCCNR, self).state() - if self._state in [ResourceState.TERMINATED, ResourceState.FAILED]: + state = super(LinuxCCNR, self).state + if self._state in [ResourceState.FINISHED, ResourceState.FAILED]: self._running = False if self._state == ResourceState.READY: diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py index 5fe15d90..b1e489ce 100755 --- a/test/resources/linux/application.py +++ b/test/resources/linux/application.py @@ -35,7 +35,6 @@ import unittest class LinuxApplicationTestCase(unittest.TestCase): def setUp(self): self.fedora_host = "nepi2.pl.sophia.inria.fr" - self.fedora_host = "planetlab2.u-strasbg.fr" self.fedora_user = "inria_nepi" self.ubuntu_host = "roseval.pl.sophia.inria.fr" diff --git a/test/resources/linux/interface.py b/test/resources/linux/interface.py index 6b1f5496..0f332f37 100755 --- a/test/resources/linux/interface.py +++ b/test/resources/linux/interface.py @@ -35,11 +35,10 @@ import unittest class LinuxInterfaceTestCase(unittest.TestCase): def setUp(self): self.fedora_host = "nepi2.pl.sophia.inria.fr" - self.fedora_host = "planetlab2.u-strasbg.fr" - self.fedora_user = 'inria_nepi' + self.fedora_user = "inria_nepi" - self.ubuntu_host = 'roseval.pl.sophia.inria.fr' - self.ubuntu_user = 'alina' + self.ubuntu_host = "roseval.pl.sophia.inria.fr" + self.ubuntu_user = "alina" @skipIfNotAlive def t_deploy(self, host, user): diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index e4fbcb86..ef3d8df9 100755 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -32,7 +32,6 @@ import unittest class LinuxNodeTestCase(unittest.TestCase): def setUp(self): self.fedora_host = "nepi2.pl.sophia.inria.fr" - self.fedora_host = "planetlab2.u-strasbg.fr" self.fedora_user = "inria_nepi" self.ubuntu_host = "roseval.pl.sophia.inria.fr"