First CCN RMs working example for Linux
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 20 Jun 2013 00:16:33 +0000 (17:16 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 20 Jun 2013 00:16:33 +0000 (17:16 -0700)
examples/linux/ccn/vlc_2_hosts_ccndrms.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
test/resources/linux/application.py
test/resources/linux/interface.py
test/resources/linux/node.py

index d0638c9..d531c1a 100755 (executable)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
+#
+# topology:
+#
+#           0
+#         /   \
+#  0 --- 0     0 --- 0
+#         \   /
+#           0
+#
+#
+
 from nepi.execution.ec import ExperimentController, ECState 
 from nepi.execution.resource import ResourceState, ResourceAction, \
         populate_factory
@@ -40,16 +51,40 @@ def add_node(ec, host, user, ssh_key = None):
 
 def add_ccnd(ec, node):
     ccnd = ec.register_resource("LinuxCCND")
+    ec.set(ccnd, "debug", 7)
     ec.register_connection(ccnd, node)
     return ccnd
 
-def add_ccnr(ec, ccnd, node):
+def add_ccnr(ec, ccnd):
     ccnr = ec.register_resource("LinuxCCNR")
-    ec.register_connection(ccnr, node)
     ec.register_connection(ccnr, ccnd)
-
     return ccnr
 
+def add_fib_entry(ec, ccnd, peer_host):
+    entry = ec.register_resource("LinuxFIBEntry")
+    ec.set(entry, "host", peer_host)
+    ec.register_connection(entry, ccnd)
+    return entry
+
+def add_content(ec, ccnr, content_name, content):
+    co = ec.register_resource("LinuxCCNContent")
+    ec.set(co, "contentName", content_name)
+    ec.set(co, "content", content)
+    ec.register_connection(co, ccnr)
+    return co
+
+def add_stream(ec, ccnd, content_name):
+    command = "sudo -S dbus-uuidgen --ensure ; ( ccncat %s | vlc - ) " % \
+            content_name
+
+    app = ec.register_resource("LinuxCCNDApplication")
+    ec.set(app, "depends", "vlc")
+    ec.set(app, "forwardX11", True)
+    ec.set(app, "command", command)
+    ec.register_connection(app, ccnd)
+
+    return app
+
 if __name__ == '__main__':
     # Search for available RMs
     populate_factory()
@@ -64,20 +99,27 @@ if __name__ == '__main__':
     user1 = "inria_alina"
     user2 = "alina"
 
+    content_name = "ccnx:/VIDEO"
+    video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts"
+
     # Register a ResourceManagers (RMs)
 
     node1 = add_node(ec, host1, user1)
     ccnd1 = add_ccnd(ec, node1)
-    ccnr1 = add_ccnr(ec, ccnd1, node1)
+    ccnr1 = add_ccnr(ec, ccnd1)
+    fibentry1 = add_fib_entry(ec, ccnd1, host2)
+    co = add_content(ec, ccnr1, content_name, video)
 
     node2 = add_node(ec, host2, user2)
     ccnd2 = add_ccnd(ec, node2)
-    ccnr2 = add_ccnr(ec, ccnd2, node2)
+    ccnr2 = add_ccnr(ec, ccnd2)
+    fibentry2 = add_fib_entry(ec, ccnd2, host1)
+    app = add_stream(ec, ccnd2, content_name)
  
     # Deploy all ResourceManagers
     ec.deploy()
 
-    ec.wait_started([ccnd1, ccnr1, ccnd2, ccnr2])
+    ec.wait_finished([app])
 
     # Shutdown the experiment controller
     ec.shutdown()
index d4ddf92..a68307b 100644 (file)
@@ -413,8 +413,6 @@ class LinuxApplication(ResourceManager):
         command = environ + command
         command = self.replace_paths(command)
 
-        self.info("Starting command IN FOREGROUND '%s'" % command)
-        
         # We save the reference to the process in self._proc 
         # to be able to kill the process from the stop method.
         # We also set blocking = False, since we don't want the
index fbe7c95..d550011 100644 (file)
@@ -23,46 +23,45 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
     ResourceAction
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnr import LinuxCCNR
-from nepi.resources.linux.node import OSType
-
-from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
+
 import os
 
 reschedule_delay = "0.5s"
 
 @clsinit_copy
-class LinuxCCNR(LinuxApplication):
+class LinuxCCNContent(LinuxApplication):
     _rtype = "LinuxCCNContent"
 
     @classmethod
     def _register_attributes(cls):
         content_name = Attribute("contentName",
                 "The name of the content to publish (e.g. ccn:/VIDEO) ",
-            flags = Flags.ExecReadOnly)
+                flags = Flags.ExecReadOnly)
+
         content = Attribute("content",
                 "The content to publish. It can be a path to a file or plain text ",
-            flags = Flags.ExecReadOnly)
-
+                flags = Flags.ExecReadOnly)
 
         cls._register_attribute(content_name)
         cls._register_attribute(content)
 
-    @classmethod
-    def _register_traces(cls):
-        log = Trace("log", "CCND log output")
-
-        cls._register_trace(log)
-
     def __init__(self, ec, guid):
         super(LinuxCCNContent, self).__init__(ec, guid)
-
+        self._home = "content-%s" % self.guid
+        self._published = False
+        
     @property
     def ccnr(self):
         ccnr = self.get_connected(LinuxCCNR.rtype())
         if ccnr: return ccnr[0]
         return None
 
+    @property
+    def node(self):
+        if self.ccnr: return self.ccnr.node
+        return None
+
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._default_command)
@@ -70,20 +69,53 @@ class LinuxCCNR(LinuxApplication):
         if not self.get("env"):
             self.set("env", self._default_environment)
 
+        if not self.get("stdin"):
+            # set content to stdin, so the content will be
+            # uploaded during provision
+            self.set("stdin", self.get("content"))
+
         # Wait until associated ccnd is provisioned
         ccnr = self.ccnr
 
-        if not ccnr or ccnr.state < ResourceState.STARTED:
+        if not ccnr or ccnr.state < ResourceState.READY:
+            # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            # Add a start after condition so CCNR will not start
-            # before CCND does
-            self.ec.register_condition(self.guid, ResourceAction.START, 
-                ccnd.guid, ResourceState.STARTED)
             # Invoke the actual deployment
             super(LinuxCCNContent, self).deploy()
 
+            # As soon as the ccnr is running we can push the content
+            # to the repository ( we don't want to lose time launching 
+            # writting the content to the repository later on )
+            if self._state == ResourceState.READY:
+                self._start_in_background()
+                self._published = True
+
+    def start(self):
+        # CCNR should already be started by now.
+        # Nothing to do but to set the state to STARTED
+        if self._published:
+            self._start_time = strfnow()
+            self._state = ResourceState.STARTED
+        else:
+            msg = "Failed to execute command '%s'" % command
+            self.error(msg, out, err)
+            self._state = ResourceState.FAILED
+            raise RuntimeError, msg
+
+    @property
+    def state(self):
+        state = super(LinuxCCNContent, self).state
+        if self._state in [ResourceState.FINISHED, ResourceState.FAILED]:
+            self._published = False
+
+        if self._state == ResourceState.READY:
+            # CCND is really deployed only when ccn daemon is running 
+            if not self._published:
+                return ResourceState.PROVISIONED
+        return self._state
+
     @property
     def _default_command(self):
         return "ccnseqwriter -r %s " % self.get("contentName")
index 35d4de2..a393768 100644 (file)
@@ -22,9 +22,8 @@ from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
-
-from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
+
 import os
 
 @clsinit_copy
@@ -46,6 +45,7 @@ class LinuxCCND(LinuxApplication):
             "  128 - face registration debugging \n"
             "  -1 - max logging \n"
             "  Or apply bitwise OR to these values to get combinations of them",
+            type = Types.Integer,
             flags = Flags.ExecReadOnly)
 
         port = Attribute("port", "Sets the CCN_LOCAL_PORT environmental variable. "
@@ -121,6 +121,8 @@ class LinuxCCND(LinuxApplication):
 
     def __init__(self, ec, guid):
         super(LinuxCCND, self).__init__(ec, guid)
+        self._home = "ccnd-%s" % self.guid
+
         # Marks whether daemon is running
         self._running = False
 
@@ -193,9 +195,9 @@ class LinuxCCND(LinuxApplication):
     @property
     def state(self):
         # First check if the ccnd has failed
+        state_check_delay = 0.5
         if self._running and strfdiff(strfnow(), self._last_state_check) > state_check_delay:
-            state_check_delay = 0.5
-            (out, err), proc = self._cndstatus()
+            (out, err), proc = self._ccndstatus
 
             retcode = proc.poll()
 
@@ -206,7 +208,7 @@ class LinuxCCND(LinuxApplication):
             elif retcode:
                 # other errors ...
                 self._running = False
-                msg = " Failed to execute command '%s'" % command
+                msg = " Failed to execute command '%s'" % self.get("command")
                 self.error(msg, out, err)
                 self._state = ResourceState.FAILED
 
@@ -223,7 +225,7 @@ class LinuxCCND(LinuxApplication):
     def _ccndstatus(self):
         env = self.get('env') or ""
         environ = self.node.format_environment(env, inline = True)
-        command = environ + "; ccndstatus"
+        command = environ + " ccndstatus"
         command = self.replace_paths(command)
     
         return self.node.execute(command)
@@ -246,7 +248,7 @@ class LinuxCCND(LinuxApplication):
 
     @property
     def _default_sources(self):
-        return "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
+        return "http://www.ccnx.org/releases/ccnx-0.7.2.tar.gz"
 
     @property
     def _default_build(self):
@@ -302,7 +304,7 @@ class LinuxCCND(LinuxApplication):
             })
 
         env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
-        env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \
+        env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), str(self.get(k))) \
             if self.get(k) else "", envs.keys()))
         
         return env            
index 72ccef4..5103719 100644 (file)
@@ -23,10 +23,8 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
     ResourceAction
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
-from nepi.resources.linux.node import OSType
-
-from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
+
 import os
 
 reschedule_delay = "0.5s"
@@ -185,6 +183,8 @@ class LinuxCCNR(LinuxApplication):
 
     def __init__(self, ec, guid):
         super(LinuxCCNR, self).__init__(ec, guid)
+        self._home = "ccnr-%s" % self.guid
+
         # Marks whether ccnr is running
         self._running = False
 
@@ -194,6 +194,11 @@ class LinuxCCNR(LinuxApplication):
         if ccnd: return ccnd[0]
         return None
 
+    @property
+    def node(self):
+        if self.ccnd: return self.ccnd.node
+        return None
+
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._default_command)
@@ -211,9 +216,8 @@ class LinuxCCNR(LinuxApplication):
             # Invoke the actual deployment
             super(LinuxCCNR, self).deploy()
 
-            # As soon as the ccnd sources are deployed, we launch the
-            # daemon ( we don't want to lose time launching the ccn 
-            # daemon later on )
+            # As soon as deployment is finished, we launch the ccnr
+            # command ( we don't want to lose time ccnr later on )
             if self._state == ResourceState.READY:
                 self._start_in_background()
                 self._running = True
@@ -232,8 +236,8 @@ class LinuxCCNR(LinuxApplication):
 
     @property
     def state(self):
-        state = super(LinuxCCNR, self).state()
-        if self._state in [ResourceState.TERMINATED, ResourceState.FAILED]:
+        state = super(LinuxCCNR, self).state
+        if self._state in [ResourceState.FINISHED, ResourceState.FAILED]:
             self._running = False
 
         if self._state == ResourceState.READY:
index 5fe15d9..b1e489c 100755 (executable)
@@ -35,7 +35,6 @@ import unittest
 class LinuxApplicationTestCase(unittest.TestCase):
     def setUp(self):
         self.fedora_host = "nepi2.pl.sophia.inria.fr"
-        self.fedora_host = "planetlab2.u-strasbg.fr"
         self.fedora_user = "inria_nepi"
 
         self.ubuntu_host = "roseval.pl.sophia.inria.fr"
index 6b1f549..0f332f3 100755 (executable)
@@ -35,11 +35,10 @@ import unittest
 class LinuxInterfaceTestCase(unittest.TestCase):
     def setUp(self):
         self.fedora_host = "nepi2.pl.sophia.inria.fr"
-        self.fedora_host = "planetlab2.u-strasbg.fr"
-        self.fedora_user = 'inria_nepi'
+        self.fedora_user = "inria_nepi"
 
-        self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
-        self.ubuntu_user = 'alina'
+        self.ubuntu_host = "roseval.pl.sophia.inria.fr"
+        self.ubuntu_user = "alina"
 
     @skipIfNotAlive
     def t_deploy(self, host, user):
index e4fbcb8..ef3d8df 100755 (executable)
@@ -32,7 +32,6 @@ import unittest
 class LinuxNodeTestCase(unittest.TestCase):
     def setUp(self):
         self.fedora_host = "nepi2.pl.sophia.inria.fr"
-        self.fedora_host = "planetlab2.u-strasbg.fr"
         self.fedora_user = "inria_nepi"
 
         self.ubuntu_host = "roseval.pl.sophia.inria.fr"