More examples and code for Linux CCN RMs
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 21 Jun 2013 19:06:37 +0000 (12:06 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 21 Jun 2013 19:06:37 +0000 (12:06 -0700)
examples/linux/ccn/vlc_2_hosts_ccndrms.py [deleted file]
examples/linux/ccn/vlc_extended_ring_topo.py [new file with mode: 0755]
src/nepi/execution/ec.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/node.py

diff --git a/examples/linux/ccn/vlc_2_hosts_ccndrms.py b/examples/linux/ccn/vlc_2_hosts_ccndrms.py
deleted file mode 100755 (executable)
index d531c1a..0000000
+++ /dev/null
@@ -1,126 +0,0 @@
-#!/usr/bin/env python
-
-#
-#    NEPI, a framework to manage network experiments
-#    Copyright (C) 2013 INRIA
-#
-#    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
-#
-#    This program is distributed in the hope that it will be useful,
-#    but WITHOUT ANY WARRANTY; without even the implied warranty of
-#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-#    GNU General Public License for more details.
-#
-#    You should have received a copy of the GNU General Public License
-#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-
-#
-# topology:
-#
-#           0
-#         /   \
-#  0 --- 0     0 --- 0
-#         \   /
-#           0
-#
-#
-
-from nepi.execution.ec import ExperimentController, ECState 
-from nepi.execution.resource import ResourceState, ResourceAction, \
-        populate_factory
-from nepi.resources.linux.node import OSType
-
-from optparse import OptionParser, SUPPRESS_HELP
-
-import os
-import time
-
-def add_node(ec, host, user, ssh_key = None):
-    node = ec.register_resource("LinuxNode")
-    ec.set(node, "hostname", host)
-    ec.set(node, "username", user)
-    ec.set(node, "identity", ssh_key)
-    ec.set(node, "cleanHome", True)
-    ec.set(node, "cleanProcesses", True)
-    return node
-
-def add_ccnd(ec, node):
-    ccnd = ec.register_resource("LinuxCCND")
-    ec.set(ccnd, "debug", 7)
-    ec.register_connection(ccnd, node)
-    return ccnd
-
-def add_ccnr(ec, ccnd):
-    ccnr = ec.register_resource("LinuxCCNR")
-    ec.register_connection(ccnr, ccnd)
-    return ccnr
-
-def add_fib_entry(ec, ccnd, peer_host):
-    entry = ec.register_resource("LinuxFIBEntry")
-    ec.set(entry, "host", peer_host)
-    ec.register_connection(entry, ccnd)
-    return entry
-
-def add_content(ec, ccnr, content_name, content):
-    co = ec.register_resource("LinuxCCNContent")
-    ec.set(co, "contentName", content_name)
-    ec.set(co, "content", content)
-    ec.register_connection(co, ccnr)
-    return co
-
-def add_stream(ec, ccnd, content_name):
-    command = "sudo -S dbus-uuidgen --ensure ; ( ccncat %s | vlc - ) " % \
-            content_name
-
-    app = ec.register_resource("LinuxCCNDApplication")
-    ec.set(app, "depends", "vlc")
-    ec.set(app, "forwardX11", True)
-    ec.set(app, "command", command)
-    ec.register_connection(app, ccnd)
-
-    return app
-
-if __name__ == '__main__':
-    # Search for available RMs
-    populate_factory()
-    
-    ec = ExperimentController(exp_id = "olahh")
-    
-    # hosts
-    host1 = 'planetlab2.u-strasbg.fr'
-    host2 = 'roseval.pl.sophia.inria.fr'
-
-    # users
-    user1 = "inria_alina"
-    user2 = "alina"
-
-    content_name = "ccnx:/VIDEO"
-    video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts"
-
-    # Register a ResourceManagers (RMs)
-
-    node1 = add_node(ec, host1, user1)
-    ccnd1 = add_ccnd(ec, node1)
-    ccnr1 = add_ccnr(ec, ccnd1)
-    fibentry1 = add_fib_entry(ec, ccnd1, host2)
-    co = add_content(ec, ccnr1, content_name, video)
-
-    node2 = add_node(ec, host2, user2)
-    ccnd2 = add_ccnd(ec, node2)
-    ccnr2 = add_ccnr(ec, ccnd2)
-    fibentry2 = add_fib_entry(ec, ccnd2, host1)
-    app = add_stream(ec, ccnd2, content_name)
-    # Deploy all ResourceManagers
-    ec.deploy()
-
-    ec.wait_finished([app])
-
-    # Shutdown the experiment controller
-    ec.shutdown()
-
diff --git a/examples/linux/ccn/vlc_extended_ring_topo.py b/examples/linux/ccn/vlc_extended_ring_topo.py
new file mode 100755 (executable)
index 0000000..b246c9b
--- /dev/null
@@ -0,0 +1,192 @@
+#!/usr/bin/env python
+
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+#
+# CCN topology:
+#
+#                h2
+#                0 
+#  content   l1 / \ l2         ccncat
+#  b1          /l5 \           b2
+#  0 ----- h1 0 --- 0 h3 ------ 0
+#              \   / 
+#            l4 \ / l3
+#                0
+#                h4
+# Experiment:
+# - t0 : b2 retrives video published in b1
+# - t1 : l1 goes down
+# - t2 : l2 goes down
+# - t3 : l5 goes up
+#
+
+from nepi.execution.ec import ExperimentController, ECState 
+from nepi.execution.resource import ResourceState, ResourceAction, \
+        populate_factory
+from nepi.resources.linux.node import OSType
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+import os
+import time
+import tempfile
+
+def add_node(ec, host, user, ssh_key = None):
+    node = ec.register_resource("LinuxNode")
+    ec.set(node, "hostname", host)
+    ec.set(node, "username", user)
+    ec.set(node, "identity", ssh_key)
+    ec.set(node, "cleanHome", True)
+    ec.set(node, "cleanProcesses", True)
+    return node
+
+def add_ccnd(ec, node):
+    ccnd = ec.register_resource("LinuxCCND")
+    ec.set(ccnd, "debug", 7)
+    ec.register_connection(ccnd, node)
+    return ccnd
+
+def add_ccnr(ec, ccnd):
+    ccnr = ec.register_resource("LinuxCCNR")
+    ec.register_connection(ccnr, ccnd)
+    return ccnr
+
+def add_fib_entry(ec, ccnd, peer_host):
+    entry = ec.register_resource("LinuxFIBEntry")
+    ec.set(entry, "host", peer_host)
+    ec.register_connection(entry, ccnd)
+    return entry
+
+def add_content(ec, ccnr, content_name, content):
+    co = ec.register_resource("LinuxCCNContent")
+    ec.set(co, "contentName", content_name)
+    ec.set(co, "content", content)
+    ec.register_connection(co, ccnr)
+    return co
+
+def add_stream(ec, ccnd, content_name):
+    command = "sudo -S dbus-uuidgen --ensure ; ccncat %s | vlc - " % \
+            content_name
+
+    app = ec.register_resource("LinuxCCNApplication")
+    ec.set(app, "depends", "vlc")
+    ec.set(app, "forwardX11", True)
+    ec.set(app, "command", command)
+    ec.register_connection(app, ccnd)
+
+    return app
+
+if __name__ == '__main__':
+    # Search for available RMs
+    populate_factory()
+    
+    ec = ExperimentController(exp_id = "olahh")
+    
+    # hosts
+    host1 = "planetlab2.u-strasbg.fr"
+    host2 = "planet1.servers.ua.pt"
+    host3 = "planetlab1.cs.uoi.gr"
+    host4 = "planetlab1.aston.ac.uk"
+    host5 = "itchy.comlab.bth.se"
+    host6 = "roseval.pl.sophia.inria.fr"
+
+    # users
+    pluser = "inria_alina"
+    user = "alina"
+
+    content_name = "ccnx:/VIDEO"
+    video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts"
+    """
+    # describe nodes in the central ring
+    ring_hosts = [host1, host2, host3, host4]
+    ccnds = dict()
+
+    for i in xrange(len(ring_hosts)):
+        host = ring_hosts[i]
+        node = add_node(ec, host, pluser)
+        ccnd = add_ccnd(ec, node)
+        ccnr = add_ccnr(ec, ccnd)
+        ccnds[host] = ccnd
+    
+    ## Add ccn ring links
+    # l1 : h1 - h2 , h2 - h1
+    l1u = add_fib_entry(ec, ccnds[host1], host2)
+    l1d = add_fib_entry(ec, ccnds[host2], host1)
+
+    # l2 : h2 - h3 , h3 - h2
+    l2u = add_fib_entry(ec, ccnds[host2], host3)
+    l2d = add_fib_entry(ec, ccnds[host3], host2)
+
+    # l3 : h3 - h4 , h4 - h3
+    l3u = add_fib_entry(ec, ccnds[host3], host4)
+    l3d = add_fib_entry(ec, ccnds[host4], host3)
+
+    # l4 : h4 - h1 , h1 - h4
+    l4u = add_fib_entry(ec, ccnds[host4], host1)
+    l4d = add_fib_entry(ec, ccnds[host1], host4)
+
+    # l5 : h1 - h3 , h3 - h1
+    l5u = add_fib_entry(ec, ccnds[host1], host3)
+    l5d = add_fib_entry(ec, ccnds[host3], host1)
+    """  
+    # border node 1
+    bnode1 = add_node(ec, host5, pluser)
+    ccndb1 = add_ccnd(ec, bnode1)
+    ccnrb1 = add_ccnr(ec, ccndb1)
+    co = add_content(ec, ccnrb1, content_name, video)
+
+    # border node 2
+    bnode2 = add_node(ec, host6, user)
+    ccndb2 = add_ccnd(ec, bnode2)
+    ccnrb2 = add_ccnr(ec, ccndb2)
+    app = add_stream(ec, ccndb2, content_name)
+    # connect border nodes
+    #add_fib_entry(ec, ccndb1, host1)
+    #add_fib_entry(ec, ccnds[host1], host5)
+
+    #add_fib_entry(ec, ccndb2, host3)
+    #add_fib_entry(ec, ccnds[host3], host6)
+    add_fib_entry(ec, ccndb2, host5)
+    add_fib_entry(ec, ccndb1, host6)
+    # deploy all ResourceManagers
+    ec.deploy()
+
+    ec.wait_finished([app])
+    
+    """
+    proc2 = subprocess.Popen(['vlc', 
+        '--ffmpeg-threads=1',
+        '--sub-filter', 'marq', 
+        '--marq-marquee', 
+        '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org', 
+        '--marq-position=8', 
+        '--no-video-title-show', '-'], 
+        stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    dirpath = tempfile.mkdtemp()
+    """
+
+    # shutdown the experiment controller
+    ec.shutdown()
+
index b5663c0..d950804 100644 (file)
@@ -152,9 +152,12 @@ class ExperimentController(object):
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
+        return self.wait(guids, states = [ResourceState.STARTED,
+            ResourceState.STOPPED,
+            ResourceState.FINISHED])
 
-    def wait(self, guids, states = [ResourceState.FINISHED]):
+    def wait(self, guids, states = [ResourceState.FINISHED, 
+        ResourceState.STOPPED]):
         """ Blocking method that waits until all the RM from the 'guid' list 
             reached state 'state' or until a failure occurs
             
@@ -166,9 +169,14 @@ class ExperimentController(object):
 
         while not all([self.state(guid) in states for guid in guids]) and \
                 not any([self.state(guid) in [
-                        ResourceState.STOPPED, 
                         ResourceState.FAILED] for guid in guids]) and \
                 not self.finished:
+            # debug logging
+            waited = ""
+            for guid in guids:
+                waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
+            self.logger.debug(" WAITING FOR %s " % waited )
+            
             # We keep the sleep big to decrease the number of RM state queries
             time.sleep(2)
    
index a68307b..440e2d1 100644 (file)
@@ -111,6 +111,7 @@ class LinuxApplication(ResourceManager):
         self._pid = None
         self._ppid = None
         self._home = "app-%s" % self.guid
+        self._in_foreground = False
 
         # keep a reference to the running process handler when 
         # the command is not executed as remote daemon in background
@@ -155,12 +156,12 @@ class LinuxApplication(ResourceManager):
         This means that command will be executed using 'execute' instead of
         'run' ('run' executes a command in background and detached from the 
         terminal)
-
+        
         When using X11 forwarding option, the command can not run in background
         and detached from a terminal, since we need to keep the terminal attached 
         to interact with it.
         """
-        return self.get("forwardX11") or False
+        return self.get("forwardX11") or self._in_foreground
 
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
@@ -312,8 +313,14 @@ class LinuxApplication(ResourceManager):
         if stdin:
             # create dir for sources
             self.info(" Uploading stdin ")
-
+            
             dst = os.path.join(self.app_home, "stdin")
+
+            # TODO:
+            # Check wether file already exists and if it exists 
+            # wether the file we want to upload is the same
+            # (using md5sum)
+
             self.node.upload(stdin, dst, text = True)
 
     def install_dependencies(self):
@@ -391,6 +398,7 @@ class LinuxApplication(ResourceManager):
             # installation), then the application is directly marked as FINISHED
             self._state = ResourceState.FINISHED
         else:
+
             if self.in_foreground:
                 self._start_in_foreground()
             else:
@@ -400,7 +408,6 @@ class LinuxApplication(ResourceManager):
 
     def _start_in_foreground(self):
         command = self.get("command")
-        env = self.get("env")
         stdin = "stdin" if self.get("stdin") else None
         sudo = self.get("sudo") or False
         x11 = self.get("forwardX11")
@@ -409,6 +416,7 @@ class LinuxApplication(ResourceManager):
         # terminal using the node 'execute' in non blocking mode.
 
         # Export environment
+        env = self.get("env")
         environ = self.node.format_environment(env, inline = True)
         command = environ + command
         command = self.replace_paths(command)
@@ -463,7 +471,8 @@ class LinuxApplication(ResourceManager):
         # If the process is not running, check for error information
         # on the remote machine
         if not self.pid or not self.ppid:
-            (out, err), proc = self.check_errors(home, ecodefile, stderr)
+            (out, err), proc = self.node.check_errors(self.app_home,
+                    stderr = stderr) 
 
             # Out is what was written in the stderr file
             if err:
@@ -490,14 +499,17 @@ class LinuxApplication(ResourceManager):
             if self._proc:
                 self._proc.kill()
             else:
-                (out, err), proc = self.node.kill(self.pid, self.ppid)
-
-                if out or err:
-                    # check if execution errors occurred
-                    msg = " Failed to STOP command '%s' " % self.get("command")
-                    self.error(msg, out, err)
-                    self._state = ResourceState.FAILED
-                    stopped = False
+                # Only try to kill the process if the pid and ppid
+                # were retrieved
+                if self.pid and self.ppid:
+                    (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+                    if out or err:
+                        # check if execution errors occurred
+                        msg = " Failed to STOP command '%s' " % self.get("command")
+                        self.error(msg, out, err)
+                        self._state = ResourceState.FAILED
+                        stopped = False
 
             if stopped:
                 super(LinuxApplication, self).stop()
@@ -523,7 +535,7 @@ class LinuxApplication(ResourceManager):
                 # Check if the process we used to execute the command
                 # is still running ...
                 retcode = self._proc.poll()
-                
+
                 # retcode == None -> running
                 # retcode > 0 -> error
                 # retcode == 0 -> finished
index d550011..3b08171 100644 (file)
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
     ResourceAction
-from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
 from nepi.resources.linux.ccn.ccnr import LinuxCCNR
 from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
 
-reschedule_delay = "0.5s"
-
 @clsinit_copy
-class LinuxCCNContent(LinuxApplication):
+class LinuxCCNContent(LinuxCCNApplication):
     _rtype = "LinuxCCNContent"
 
     @classmethod
@@ -49,7 +46,6 @@ class LinuxCCNContent(LinuxApplication):
     def __init__(self, ec, guid):
         super(LinuxCCNContent, self).__init__(ec, guid)
         self._home = "content-%s" % self.guid
-        self._published = False
         
     @property
     def ccnr(self):
@@ -63,68 +59,63 @@ class LinuxCCNContent(LinuxApplication):
         return None
 
     def deploy(self):
-        if not self.get("command"):
-            self.set("command", self._default_command)
-        
-        if not self.get("env"):
-            self.set("env", self._default_environment)
+        if not self.ccnr or self.ccnr.state < ResourceState.READY:
+            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+            
+            reschedule_delay = "0.5s"
+            # ccnr needs to wait until ccnd is deployed and running
+            self.ec.schedule(reschedule_delay, self.deploy)
+        else:
+            command = self._start_command
+            env = self._environment
+
+            self.set("command", command)
+            self.set("env", env)
 
-        if not self.get("stdin"):
             # set content to stdin, so the content will be
             # uploaded during provision
             self.set("stdin", self.get("content"))
 
-        # Wait until associated ccnd is provisioned
-        ccnr = self.ccnr
+            self.info("Deploying command '%s' " % command)
 
-        if not ccnr or ccnr.state < ResourceState.READY:
-            # ccnr needs to wait until ccnd is deployed and running
-            self.ec.schedule(reschedule_delay, self.deploy)
-        else:
-            # Invoke the actual deployment
-            super(LinuxCCNContent, self).deploy()
+            self.node.mkdir(self.app_home)
 
-            # As soon as the ccnr is running we can push the content
-            # to the repository ( we don't want to lose time launching 
-            # writting the content to the repository later on )
-            if self._state == ResourceState.READY:
-                self._start_in_background()
-                self._published = True
+            # upload content 
+            self.upload_stdin()
+
+            # We want to make sure the content is published
+            # before the experiment starts.
+            # Run the command as a bash script in the background, 
+            # in the host ( but wait until the command has
+            # finished to continue )
+            self.execute_command(command, env)
+
+            self.debug("----- READY ---- ")
+            self._ready_time = strfnow()
+            self._state = ResourceState.READY
 
     def start(self):
-        # CCNR should already be started by now.
-        # Nothing to do but to set the state to STARTED
-        if self._published:
+        if self._state == ResourceState.READY:
+            command = self.get("command")
+            self.info("Starting command '%s'" % command)
+
             self._start_time = strfnow()
             self._state = ResourceState.STARTED
         else:
-            msg = "Failed to execute command '%s'" % command
+            msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self._state = ResourceState.FAILED
             raise RuntimeError, msg
 
     @property
     def state(self):
-        state = super(LinuxCCNContent, self).state
-        if self._state in [ResourceState.FINISHED, ResourceState.FAILED]:
-            self._published = False
-
-        if self._state == ResourceState.READY:
-            # CCND is really deployed only when ccn daemon is running 
-            if not self._published:
-                return ResourceState.PROVISIONED
         return self._state
 
     @property
-    def _default_command(self):
-        return "ccnseqwriter -r %s " % self.get("contentName")
+    def _start_command(self):
+        return "ccnseqwriter -r %s < %s" % (self.get("contentName"),
+                os.path.join(self.app_home, 'stdin'))
 
-    @property
-    def _default_environment(self):
-        env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
-        return env            
-        
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
index a393768..bd001b4 100644 (file)
@@ -26,6 +26,8 @@ from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
 
+# TODO: use ccndlogging to dynamically change the logging level
+
 @clsinit_copy
 class LinuxCCND(LinuxApplication):
     _rtype = "LinuxCCND"
@@ -123,41 +125,80 @@ class LinuxCCND(LinuxApplication):
         super(LinuxCCND, self).__init__(ec, guid)
         self._home = "ccnd-%s" % self.guid
 
-        # Marks whether daemon is running
-        self._running = False
-
     def deploy(self):
-        if not self.get("command"):
-            self.set("command", self._default_command)
-        
-        if not self.get("depends"):
-            self.set("depends", self._default_dependencies)
+        if not self.node or self.node.state < ResourceState.READY:
+            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+            
+            reschedule_delay = "0.5s"
+            # ccnr needs to wait until ccnd is deployed and running
+            self.ec.schedule(reschedule_delay, self.deploy)
+        else:
+            if not self.get("command"):
+                self.set("command", self._start_command)
+            
+            if not self.get("depends"):
+                self.set("depends", self._dependencies)
 
-        if not self.get("sources"):
-            self.set("sources", self._default_sources)
+            if not self.get("sources"):
+                self.set("sources", self._sources)
 
-        if not self.get("build"):
-            self.set("build", self._default_build)
+            if not self.get("build"):
+                self.set("build", self._build)
 
-        if not self.get("install"):
-            self.set("install", self._default_install)
+            if not self.get("install"):
+                self.set("install", self._install)
 
-        if not self.get("env"):
-            self.set("env", self._default_environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
-        super(LinuxCCND, self).deploy()
+            command = self.get("command")
+            env = self.get("env")
 
-        # As soon as the ccnd sources are deployed, we launch the
-        # daemon ( we don't want to lose time launching the ccn 
-        # daemon later on )
-        if self._state == ResourceState.READY:
-            self._start_in_background()
-            self._running = True
+            self.info("Deploying command '%s' " % command)
+
+            # create home dir for application
+            self.node.mkdir(self.app_home)
+
+            # upload sources
+            self.upload_sources()
+
+            # upload code
+            self.upload_code()
+
+            # upload stdin
+            self.upload_stdin()
+
+            # install dependencies
+            self.install_dependencies()
+
+            # build
+            self.build()
+
+            # Install
+            self.install()
+
+            # We want to make sure the repository is running
+            # before the experiment starts.
+            # Run the command as a bash script in background,
+            # in the host ( but wait until the command has
+            # finished to continue )
+            env = self.replace_paths(env)
+            command = self.replace_paths(command)
+
+            self.node.run_and_wait(command, self.app_home,
+                    env = env,
+                    shfile = "app.sh",
+                    raise_on_error = True)
+    
+            self.debug("----- READY ---- ")
+            self._ready_time = strfnow()
+            self._state = ResourceState.READY
 
     def start(self):
-        # CCND should already be started by now.
-        # Nothing to do but to set the state to STARTED
-        if self._running:
+        if self._state == ResourceState.READY:
+            command = self.get("command")
+            self.info("Starting command '%s'" % command)
+
             self._start_time = strfnow()
             self._state = ResourceState.STARTED
         else:
@@ -189,36 +230,30 @@ class LinuxCCND(LinuxApplication):
                         stdout = "ccndstop_stdout", 
                         stderr = "ccndstop_stderr")
 
-
-            super(LinuxCCND, self).stop()
-
+            self._stop_time = strfnow()
+            self._state = ResourceState.STOPPED
+    
     @property
     def state(self):
         # First check if the ccnd has failed
         state_check_delay = 0.5
-        if self._running and strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+        if self._state == ResourceState.STARTED and \
+                strfdiff(strfnow(), self._last_state_check) > state_check_delay:
             (out, err), proc = self._ccndstatus
 
             retcode = proc.poll()
 
             if retcode == 1 and err.find("No such file or directory") > -1:
                 # ccnd is not running (socket not found)
-                self._running = False
                 self._state = ResourceState.FINISHED
             elif retcode:
                 # other errors ...
-                self._running = False
                 msg = " Failed to execute command '%s'" % self.get("command")
                 self.error(msg, out, err)
                 self._state = ResourceState.FAILED
 
             self._last_state_check = strfnow()
 
-        if self._state == ResourceState.READY:
-            # CCND is really deployed only when ccn daemon is running 
-            if not self._running:
-                return ResourceState.PROVISIONED
-
         return self._state
 
     @property
@@ -231,11 +266,11 @@ class LinuxCCND(LinuxApplication):
         return self.node.execute(command)
 
     @property
-    def _default_command(self):
+    def _start_command(self):
         return "ccndstart"
 
     @property
-    def _default_dependencies(self):
+    def _dependencies(self):
         if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]:
             return ( " autoconf openssl-devel  expat-devel libpcap-devel "
                 " ecryptfs-utils-devel libxml2-devel automake gawk " 
@@ -247,11 +282,11 @@ class LinuxCCND(LinuxApplication):
         return ""
 
     @property
-    def _default_sources(self):
+    def _sources(self):
         return "http://www.ccnx.org/releases/ccnx-0.7.2.tar.gz"
 
     @property
-    def _default_build(self):
+    def _build(self):
         sources = self.get("sources").split(" ")[0]
         sources = os.path.basename(sources)
 
@@ -272,7 +307,7 @@ class LinuxCCND(LinuxApplication):
              " )") % ({ 'sources': sources })
 
     @property
-    def _default_install(self):
+    def _install(self):
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
@@ -286,7 +321,7 @@ class LinuxCCND(LinuxApplication):
             )
 
     @property
-    def _default_environment(self):
+    def _environment(self):
         envs = dict({
             "debug": "CCND_DEBUG",
             "port": "CCN_LOCAL_PORT",
index 5103719..ef7ac26 100644 (file)
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
     ResourceAction
-from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
 
-reschedule_delay = "0.5s"
-
 @clsinit_copy
-class LinuxCCNR(LinuxApplication):
+class LinuxCCNR(LinuxCCNApplication):
     _rtype = "LinuxCCNR"
 
     @classmethod
@@ -185,47 +183,49 @@ class LinuxCCNR(LinuxApplication):
         super(LinuxCCNR, self).__init__(ec, guid)
         self._home = "ccnr-%s" % self.guid
 
-        # Marks whether ccnr is running
-        self._running = False
+    def deploy(self):
+        if not self.ccnd or self.ccnd.state < ResourceState.READY:
+            self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
+            
+            reschedule_delay = "0.5s"
+            # ccnr needs to wait until ccnd is deployed and running
+            self.ec.schedule(reschedule_delay, self.deploy)
+        else:
+            command = self._start_command
+            env = self._environment
 
-    @property
-    def ccnd(self):
-        ccnd = self.get_connected(LinuxCCND.rtype())
-        if ccnd: return ccnd[0]
-        return None
+            self.set("command", command)
+            self.set("env", env)
 
-    @property
-    def node(self):
-        if self.ccnd: return self.ccnd.node
-        return None
+            self.info("Deploying command '%s' " % command)
 
-    def deploy(self):
-        if not self.get("command"):
-            self.set("command", self._default_command)
-        
-        if not self.get("env"):
-            self.set("env", self._default_environment)
+            self.node.mkdir(self.app_home)
 
-        # Wait until associated ccnd is provisioned
-        ccnd = self.ccnd
+            # upload sources
+            self.upload_sources()
 
-        if not ccnd or ccnd.state < ResourceState.READY:
-            # ccnr needs to wait until ccnd is deployed and running
-            self.ec.schedule(reschedule_delay, self.deploy)
-        else:
-            # Invoke the actual deployment
-            super(LinuxCCNR, self).deploy()
+            # We want to make sure the repository is running
+            # before the experiment starts.
+            # Run the command as a bash script in background,
+            # in the host ( but wait until the command has
+            # finished to continue )
+            env = self.replace_paths(env)
+            command = self.replace_paths(command)
+
+            self.node.run_and_wait(command, self.app_home,
+                    env = env,
+                    shfile = "app.sh",
+                    raise_on_error = True)
 
-            # As soon as deployment is finished, we launch the ccnr
-            # command ( we don't want to lose time ccnr later on )
-            if self._state == ResourceState.READY:
-                self._start_in_background()
-                self._running = True
+            self.debug("----- READY ---- ")
+            self._ready_time = strfnow()
+            self._state = ResourceState.READY
 
     def start(self):
-        # CCND should already be started by now.
-        # Nothing to do but to set the state to STARTED
-        if self._running:
+        if self._state == ResourceState.READY:
+            command = self.get("command")
+            self.info("Starting command '%s'" % command)
+
             self._start_time = strfnow()
             self._state = ResourceState.STARTED
         else:
@@ -235,24 +235,11 @@ class LinuxCCNR(LinuxApplication):
             raise RuntimeError, msg
 
     @property
-    def state(self):
-        state = super(LinuxCCNR, self).state
-        if self._state in [ResourceState.FINISHED, ResourceState.FAILED]:
-            self._running = False
-
-        if self._state == ResourceState.READY:
-            # CCND is really deployed only when ccn daemon is running 
-            if not self._running:
-                return ResourceState.PROVISIONED
-        return self._state
+    def _start_command(self):
+        return "ccnr &"
 
     @property
-    def _default_command(self):
-        return "ccnr"
-
-    @property
-    def _default_environment(self):
+    def _environment(self):
         envs = dict({
             "maxFanout": "CCNR_BTREE_MAX_FANOUT",
             "maxLeafEntries": "CCNR_BTREE_MAX_LEAF_ENTRIES",
@@ -285,7 +272,7 @@ class LinuxCCNR(LinuxApplication):
         env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
         env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \
             if self.get(k) else "", envs.keys()))
-        
+       
         return env            
         
     def valid_connection(self, guid):
index 44bd29d..a96267f 100644 (file)
@@ -429,12 +429,10 @@ class LinuxNode(ResourceManager):
             sudo = False,
             tty = False,
             raise_on_error = False):
-        """ 
-        runs a command in background on the remote host, busy-waiting
-        until the command finishes execution.
-        This is more robust than doing a simple synchronized 'execute',
-        since in the remote host the command can continue to run detached
-        even if network disconnections occur
+        """
+        Uploads the 'command' to a bash script in the host.
+        Then runs the script detached in background in the host, and
+        busy-waites until the script finishes executing.
         """
         self.upload_command(command, home, 
             shfile = shfile, 
@@ -537,7 +535,9 @@ class LinuxNode(ResourceManager):
         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
         """
         if not env: return ""
-        env = env.strip()
+
+        # Remove extra white spaces
+        env = re.sub(r'\s+', ' ', env.strip())
 
         sep = ";" if inline else "\n"
         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep