+++ /dev/null
-#!/usr/bin/env python
-
-#
-# NEPI, a framework to manage network experiments
-# Copyright (C) 2013 INRIA
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-
-#
-# topology:
-#
-# 0
-# / \
-# 0 --- 0 0 --- 0
-# \ /
-# 0
-#
-#
-
-from nepi.execution.ec import ExperimentController, ECState
-from nepi.execution.resource import ResourceState, ResourceAction, \
- populate_factory
-from nepi.resources.linux.node import OSType
-
-from optparse import OptionParser, SUPPRESS_HELP
-
-import os
-import time
-
-def add_node(ec, host, user, ssh_key = None):
- node = ec.register_resource("LinuxNode")
- ec.set(node, "hostname", host)
- ec.set(node, "username", user)
- ec.set(node, "identity", ssh_key)
- ec.set(node, "cleanHome", True)
- ec.set(node, "cleanProcesses", True)
- return node
-
-def add_ccnd(ec, node):
- ccnd = ec.register_resource("LinuxCCND")
- ec.set(ccnd, "debug", 7)
- ec.register_connection(ccnd, node)
- return ccnd
-
-def add_ccnr(ec, ccnd):
- ccnr = ec.register_resource("LinuxCCNR")
- ec.register_connection(ccnr, ccnd)
- return ccnr
-
-def add_fib_entry(ec, ccnd, peer_host):
- entry = ec.register_resource("LinuxFIBEntry")
- ec.set(entry, "host", peer_host)
- ec.register_connection(entry, ccnd)
- return entry
-
-def add_content(ec, ccnr, content_name, content):
- co = ec.register_resource("LinuxCCNContent")
- ec.set(co, "contentName", content_name)
- ec.set(co, "content", content)
- ec.register_connection(co, ccnr)
- return co
-
-def add_stream(ec, ccnd, content_name):
- command = "sudo -S dbus-uuidgen --ensure ; ( ccncat %s | vlc - ) " % \
- content_name
-
- app = ec.register_resource("LinuxCCNDApplication")
- ec.set(app, "depends", "vlc")
- ec.set(app, "forwardX11", True)
- ec.set(app, "command", command)
- ec.register_connection(app, ccnd)
-
- return app
-
-if __name__ == '__main__':
- # Search for available RMs
- populate_factory()
-
- ec = ExperimentController(exp_id = "olahh")
-
- # hosts
- host1 = 'planetlab2.u-strasbg.fr'
- host2 = 'roseval.pl.sophia.inria.fr'
-
- # users
- user1 = "inria_alina"
- user2 = "alina"
-
- content_name = "ccnx:/VIDEO"
- video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts"
-
- # Register a ResourceManagers (RMs)
-
- node1 = add_node(ec, host1, user1)
- ccnd1 = add_ccnd(ec, node1)
- ccnr1 = add_ccnr(ec, ccnd1)
- fibentry1 = add_fib_entry(ec, ccnd1, host2)
- co = add_content(ec, ccnr1, content_name, video)
-
- node2 = add_node(ec, host2, user2)
- ccnd2 = add_ccnd(ec, node2)
- ccnr2 = add_ccnr(ec, ccnd2)
- fibentry2 = add_fib_entry(ec, ccnd2, host1)
- app = add_stream(ec, ccnd2, content_name)
-
- # Deploy all ResourceManagers
- ec.deploy()
-
- ec.wait_finished([app])
-
- # Shutdown the experiment controller
- ec.shutdown()
-
--- /dev/null
+#!/usr/bin/env python
+
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+#
+# CCN topology:
+#
+# h2
+# 0
+# content l1 / \ l2 ccncat
+# b1 /l5 \ b2
+# 0 ----- h1 0 --- 0 h3 ------ 0
+# \ /
+# l4 \ / l3
+# 0
+# h4
+# Experiment:
+# - t0 : b2 retrives video published in b1
+# - t1 : l1 goes down
+# - t2 : l2 goes down
+# - t3 : l5 goes up
+#
+
+from nepi.execution.ec import ExperimentController, ECState
+from nepi.execution.resource import ResourceState, ResourceAction, \
+ populate_factory
+from nepi.resources.linux.node import OSType
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+import os
+import time
+import tempfile
+
+def add_node(ec, host, user, ssh_key = None):
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "identity", ssh_key)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+ return node
+
+def add_ccnd(ec, node):
+ ccnd = ec.register_resource("LinuxCCND")
+ ec.set(ccnd, "debug", 7)
+ ec.register_connection(ccnd, node)
+ return ccnd
+
+def add_ccnr(ec, ccnd):
+ ccnr = ec.register_resource("LinuxCCNR")
+ ec.register_connection(ccnr, ccnd)
+ return ccnr
+
+def add_fib_entry(ec, ccnd, peer_host):
+ entry = ec.register_resource("LinuxFIBEntry")
+ ec.set(entry, "host", peer_host)
+ ec.register_connection(entry, ccnd)
+ return entry
+
+def add_content(ec, ccnr, content_name, content):
+ co = ec.register_resource("LinuxCCNContent")
+ ec.set(co, "contentName", content_name)
+ ec.set(co, "content", content)
+ ec.register_connection(co, ccnr)
+ return co
+
+def add_stream(ec, ccnd, content_name):
+ command = "sudo -S dbus-uuidgen --ensure ; ccncat %s | vlc - " % \
+ content_name
+
+ app = ec.register_resource("LinuxCCNApplication")
+ ec.set(app, "depends", "vlc")
+ ec.set(app, "forwardX11", True)
+ ec.set(app, "command", command)
+ ec.register_connection(app, ccnd)
+
+ return app
+
+if __name__ == '__main__':
+ # Search for available RMs
+ populate_factory()
+
+ ec = ExperimentController(exp_id = "olahh")
+
+ # hosts
+ host1 = "planetlab2.u-strasbg.fr"
+ host2 = "planet1.servers.ua.pt"
+ host3 = "planetlab1.cs.uoi.gr"
+ host4 = "planetlab1.aston.ac.uk"
+ host5 = "itchy.comlab.bth.se"
+ host6 = "roseval.pl.sophia.inria.fr"
+
+ # users
+ pluser = "inria_alina"
+ user = "alina"
+
+ content_name = "ccnx:/VIDEO"
+ video = "/home/alina/repos/nepi/examples/big_buck_bunny_240p_mpeg4_lq.ts"
+ """
+ # describe nodes in the central ring
+ ring_hosts = [host1, host2, host3, host4]
+ ccnds = dict()
+
+ for i in xrange(len(ring_hosts)):
+ host = ring_hosts[i]
+ node = add_node(ec, host, pluser)
+ ccnd = add_ccnd(ec, node)
+ ccnr = add_ccnr(ec, ccnd)
+ ccnds[host] = ccnd
+
+ ## Add ccn ring links
+ # l1 : h1 - h2 , h2 - h1
+ l1u = add_fib_entry(ec, ccnds[host1], host2)
+ l1d = add_fib_entry(ec, ccnds[host2], host1)
+
+ # l2 : h2 - h3 , h3 - h2
+ l2u = add_fib_entry(ec, ccnds[host2], host3)
+ l2d = add_fib_entry(ec, ccnds[host3], host2)
+
+ # l3 : h3 - h4 , h4 - h3
+ l3u = add_fib_entry(ec, ccnds[host3], host4)
+ l3d = add_fib_entry(ec, ccnds[host4], host3)
+
+ # l4 : h4 - h1 , h1 - h4
+ l4u = add_fib_entry(ec, ccnds[host4], host1)
+ l4d = add_fib_entry(ec, ccnds[host1], host4)
+
+ # l5 : h1 - h3 , h3 - h1
+ l5u = add_fib_entry(ec, ccnds[host1], host3)
+ l5d = add_fib_entry(ec, ccnds[host3], host1)
+ """
+ # border node 1
+ bnode1 = add_node(ec, host5, pluser)
+ ccndb1 = add_ccnd(ec, bnode1)
+ ccnrb1 = add_ccnr(ec, ccndb1)
+ co = add_content(ec, ccnrb1, content_name, video)
+
+ # border node 2
+ bnode2 = add_node(ec, host6, user)
+ ccndb2 = add_ccnd(ec, bnode2)
+ ccnrb2 = add_ccnr(ec, ccndb2)
+ app = add_stream(ec, ccndb2, content_name)
+
+ # connect border nodes
+ #add_fib_entry(ec, ccndb1, host1)
+ #add_fib_entry(ec, ccnds[host1], host5)
+
+ #add_fib_entry(ec, ccndb2, host3)
+ #add_fib_entry(ec, ccnds[host3], host6)
+
+ add_fib_entry(ec, ccndb2, host5)
+ add_fib_entry(ec, ccndb1, host6)
+
+ # deploy all ResourceManagers
+ ec.deploy()
+
+ ec.wait_finished([app])
+
+ """
+ proc2 = subprocess.Popen(['vlc',
+ '--ffmpeg-threads=1',
+ '--sub-filter', 'marq',
+ '--marq-marquee',
+ '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org',
+ '--marq-position=8',
+ '--no-video-title-show', '-'],
+ stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ dirpath = tempfile.mkdtemp()
+ """
+
+ # shutdown the experiment controller
+ ec.shutdown()
+
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
+ return self.wait(guids, states = [ResourceState.STARTED,
+ ResourceState.STOPPED,
+ ResourceState.FINISHED])
- def wait(self, guids, states = [ResourceState.FINISHED]):
+ def wait(self, guids, states = [ResourceState.FINISHED,
+ ResourceState.STOPPED]):
""" Blocking method that waits until all the RM from the 'guid' list
reached state 'state' or until a failure occurs
while not all([self.state(guid) in states for guid in guids]) and \
not any([self.state(guid) in [
- ResourceState.STOPPED,
ResourceState.FAILED] for guid in guids]) and \
not self.finished:
+ # debug logging
+ waited = ""
+ for guid in guids:
+ waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
+ self.logger.debug(" WAITING FOR %s " % waited )
+
# We keep the sleep big to decrease the number of RM state queries
time.sleep(2)
self._pid = None
self._ppid = None
self._home = "app-%s" % self.guid
+ self._in_foreground = False
# keep a reference to the running process handler when
# the command is not executed as remote daemon in background
This means that command will be executed using 'execute' instead of
'run' ('run' executes a command in background and detached from the
terminal)
-
+
When using X11 forwarding option, the command can not run in background
and detached from a terminal, since we need to keep the terminal attached
to interact with it.
"""
- return self.get("forwardX11") or False
+ return self.get("forwardX11") or self._in_foreground
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
self.info("Retrieving '%s' trace %s " % (name, attr))
if stdin:
# create dir for sources
self.info(" Uploading stdin ")
-
+
dst = os.path.join(self.app_home, "stdin")
+
+ # TODO:
+ # Check wether file already exists and if it exists
+ # wether the file we want to upload is the same
+ # (using md5sum)
+
self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
# installation), then the application is directly marked as FINISHED
self._state = ResourceState.FINISHED
else:
+
if self.in_foreground:
self._start_in_foreground()
else:
def _start_in_foreground(self):
command = self.get("command")
- env = self.get("env")
stdin = "stdin" if self.get("stdin") else None
sudo = self.get("sudo") or False
x11 = self.get("forwardX11")
# terminal using the node 'execute' in non blocking mode.
# Export environment
+ env = self.get("env")
environ = self.node.format_environment(env, inline = True)
command = environ + command
command = self.replace_paths(command)
# If the process is not running, check for error information
# on the remote machine
if not self.pid or not self.ppid:
- (out, err), proc = self.check_errors(home, ecodefile, stderr)
+ (out, err), proc = self.node.check_errors(self.app_home,
+ stderr = stderr)
# Out is what was written in the stderr file
if err:
if self._proc:
self._proc.kill()
else:
- (out, err), proc = self.node.kill(self.pid, self.ppid)
-
- if out or err:
- # check if execution errors occurred
- msg = " Failed to STOP command '%s' " % self.get("command")
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
- stopped = False
+ # Only try to kill the process if the pid and ppid
+ # were retrieved
+ if self.pid and self.ppid:
+ (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+ if out or err:
+ # check if execution errors occurred
+ msg = " Failed to STOP command '%s' " % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+ stopped = False
if stopped:
super(LinuxApplication, self).stop()
# Check if the process we used to execute the command
# is still running ...
retcode = self._proc.poll()
-
+
# retcode == None -> running
# retcode > 0 -> error
# retcode == 0 -> finished
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
ResourceAction
-from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.resources.linux.ccn.ccnr import LinuxCCNR
from nepi.util.timefuncs import strfnow, strfdiff
import os
-reschedule_delay = "0.5s"
-
@clsinit_copy
-class LinuxCCNContent(LinuxApplication):
+class LinuxCCNContent(LinuxCCNApplication):
_rtype = "LinuxCCNContent"
@classmethod
def __init__(self, ec, guid):
super(LinuxCCNContent, self).__init__(ec, guid)
self._home = "content-%s" % self.guid
- self._published = False
@property
def ccnr(self):
return None
def deploy(self):
- if not self.get("command"):
- self.set("command", self._default_command)
-
- if not self.get("env"):
- self.set("env", self._default_environment)
+ if not self.ccnr or self.ccnr.state < ResourceState.READY:
+ self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+
+ reschedule_delay = "0.5s"
+ # ccnr needs to wait until ccnd is deployed and running
+ self.ec.schedule(reschedule_delay, self.deploy)
+ else:
+ command = self._start_command
+ env = self._environment
+
+ self.set("command", command)
+ self.set("env", env)
- if not self.get("stdin"):
# set content to stdin, so the content will be
# uploaded during provision
self.set("stdin", self.get("content"))
- # Wait until associated ccnd is provisioned
- ccnr = self.ccnr
+ self.info("Deploying command '%s' " % command)
- if not ccnr or ccnr.state < ResourceState.READY:
- # ccnr needs to wait until ccnd is deployed and running
- self.ec.schedule(reschedule_delay, self.deploy)
- else:
- # Invoke the actual deployment
- super(LinuxCCNContent, self).deploy()
+ self.node.mkdir(self.app_home)
- # As soon as the ccnr is running we can push the content
- # to the repository ( we don't want to lose time launching
- # writting the content to the repository later on )
- if self._state == ResourceState.READY:
- self._start_in_background()
- self._published = True
+ # upload content
+ self.upload_stdin()
+
+ # We want to make sure the content is published
+ # before the experiment starts.
+ # Run the command as a bash script in the background,
+ # in the host ( but wait until the command has
+ # finished to continue )
+ self.execute_command(command, env)
+
+ self.debug("----- READY ---- ")
+ self._ready_time = strfnow()
+ self._state = ResourceState.READY
def start(self):
- # CCNR should already be started by now.
- # Nothing to do but to set the state to STARTED
- if self._published:
+ if self._state == ResourceState.READY:
+ command = self.get("command")
+ self.info("Starting command '%s'" % command)
+
self._start_time = strfnow()
self._state = ResourceState.STARTED
else:
- msg = "Failed to execute command '%s'" % command
+ msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
self._state = ResourceState.FAILED
raise RuntimeError, msg
@property
def state(self):
- state = super(LinuxCCNContent, self).state
- if self._state in [ResourceState.FINISHED, ResourceState.FAILED]:
- self._published = False
-
- if self._state == ResourceState.READY:
- # CCND is really deployed only when ccn daemon is running
- if not self._published:
- return ResourceState.PROVISIONED
-
return self._state
@property
- def _default_command(self):
- return "ccnseqwriter -r %s " % self.get("contentName")
+ def _start_command(self):
+ return "ccnseqwriter -r %s < %s" % (self.get("contentName"),
+ os.path.join(self.app_home, 'stdin'))
- @property
- def _default_environment(self):
- env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
- return env
-
def valid_connection(self, guid):
# TODO: Validate!
return True
import os
+# TODO: use ccndlogging to dynamically change the logging level
+
@clsinit_copy
class LinuxCCND(LinuxApplication):
_rtype = "LinuxCCND"
super(LinuxCCND, self).__init__(ec, guid)
self._home = "ccnd-%s" % self.guid
- # Marks whether daemon is running
- self._running = False
-
def deploy(self):
- if not self.get("command"):
- self.set("command", self._default_command)
-
- if not self.get("depends"):
- self.set("depends", self._default_dependencies)
+ if not self.node or self.node.state < ResourceState.READY:
+ self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+
+ reschedule_delay = "0.5s"
+ # ccnr needs to wait until ccnd is deployed and running
+ self.ec.schedule(reschedule_delay, self.deploy)
+ else:
+ if not self.get("command"):
+ self.set("command", self._start_command)
+
+ if not self.get("depends"):
+ self.set("depends", self._dependencies)
- if not self.get("sources"):
- self.set("sources", self._default_sources)
+ if not self.get("sources"):
+ self.set("sources", self._sources)
- if not self.get("build"):
- self.set("build", self._default_build)
+ if not self.get("build"):
+ self.set("build", self._build)
- if not self.get("install"):
- self.set("install", self._default_install)
+ if not self.get("install"):
+ self.set("install", self._install)
- if not self.get("env"):
- self.set("env", self._default_environment)
+ if not self.get("env"):
+ self.set("env", self._environment)
- super(LinuxCCND, self).deploy()
+ command = self.get("command")
+ env = self.get("env")
- # As soon as the ccnd sources are deployed, we launch the
- # daemon ( we don't want to lose time launching the ccn
- # daemon later on )
- if self._state == ResourceState.READY:
- self._start_in_background()
- self._running = True
+ self.info("Deploying command '%s' " % command)
+
+ # create home dir for application
+ self.node.mkdir(self.app_home)
+
+ # upload sources
+ self.upload_sources()
+
+ # upload code
+ self.upload_code()
+
+ # upload stdin
+ self.upload_stdin()
+
+ # install dependencies
+ self.install_dependencies()
+
+ # build
+ self.build()
+
+ # Install
+ self.install()
+
+ # We want to make sure the repository is running
+ # before the experiment starts.
+ # Run the command as a bash script in background,
+ # in the host ( but wait until the command has
+ # finished to continue )
+ env = self.replace_paths(env)
+ command = self.replace_paths(command)
+
+ self.node.run_and_wait(command, self.app_home,
+ env = env,
+ shfile = "app.sh",
+ raise_on_error = True)
+
+ self.debug("----- READY ---- ")
+ self._ready_time = strfnow()
+ self._state = ResourceState.READY
def start(self):
- # CCND should already be started by now.
- # Nothing to do but to set the state to STARTED
- if self._running:
+ if self._state == ResourceState.READY:
+ command = self.get("command")
+ self.info("Starting command '%s'" % command)
+
self._start_time = strfnow()
self._state = ResourceState.STARTED
else:
stdout = "ccndstop_stdout",
stderr = "ccndstop_stderr")
-
- super(LinuxCCND, self).stop()
-
+ self._stop_time = strfnow()
+ self._state = ResourceState.STOPPED
+
@property
def state(self):
# First check if the ccnd has failed
state_check_delay = 0.5
- if self._running and strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+ if self._state == ResourceState.STARTED and \
+ strfdiff(strfnow(), self._last_state_check) > state_check_delay:
(out, err), proc = self._ccndstatus
retcode = proc.poll()
if retcode == 1 and err.find("No such file or directory") > -1:
# ccnd is not running (socket not found)
- self._running = False
self._state = ResourceState.FINISHED
elif retcode:
# other errors ...
- self._running = False
msg = " Failed to execute command '%s'" % self.get("command")
self.error(msg, out, err)
self._state = ResourceState.FAILED
self._last_state_check = strfnow()
- if self._state == ResourceState.READY:
- # CCND is really deployed only when ccn daemon is running
- if not self._running:
- return ResourceState.PROVISIONED
-
return self._state
@property
return self.node.execute(command)
@property
- def _default_command(self):
+ def _start_command(self):
return "ccndstart"
@property
- def _default_dependencies(self):
+ def _dependencies(self):
if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]:
return ( " autoconf openssl-devel expat-devel libpcap-devel "
" ecryptfs-utils-devel libxml2-devel automake gawk "
return ""
@property
- def _default_sources(self):
+ def _sources(self):
return "http://www.ccnx.org/releases/ccnx-0.7.2.tar.gz"
@property
- def _default_build(self):
+ def _build(self):
sources = self.get("sources").split(" ")[0]
sources = os.path.basename(sources)
" )") % ({ 'sources': sources })
@property
- def _default_install(self):
+ def _install(self):
return (
# Evaluate if ccnx binaries are already installed
" ( "
)
@property
- def _default_environment(self):
+ def _environment(self):
envs = dict({
"debug": "CCND_DEBUG",
"port": "CCN_LOCAL_PORT",
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
ResourceAction
-from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import strfnow, strfdiff
import os
-reschedule_delay = "0.5s"
-
@clsinit_copy
-class LinuxCCNR(LinuxApplication):
+class LinuxCCNR(LinuxCCNApplication):
_rtype = "LinuxCCNR"
@classmethod
super(LinuxCCNR, self).__init__(ec, guid)
self._home = "ccnr-%s" % self.guid
- # Marks whether ccnr is running
- self._running = False
+ def deploy(self):
+ if not self.ccnd or self.ccnd.state < ResourceState.READY:
+ self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
+
+ reschedule_delay = "0.5s"
+ # ccnr needs to wait until ccnd is deployed and running
+ self.ec.schedule(reschedule_delay, self.deploy)
+ else:
+ command = self._start_command
+ env = self._environment
- @property
- def ccnd(self):
- ccnd = self.get_connected(LinuxCCND.rtype())
- if ccnd: return ccnd[0]
- return None
+ self.set("command", command)
+ self.set("env", env)
- @property
- def node(self):
- if self.ccnd: return self.ccnd.node
- return None
+ self.info("Deploying command '%s' " % command)
- def deploy(self):
- if not self.get("command"):
- self.set("command", self._default_command)
-
- if not self.get("env"):
- self.set("env", self._default_environment)
+ self.node.mkdir(self.app_home)
- # Wait until associated ccnd is provisioned
- ccnd = self.ccnd
+ # upload sources
+ self.upload_sources()
- if not ccnd or ccnd.state < ResourceState.READY:
- # ccnr needs to wait until ccnd is deployed and running
- self.ec.schedule(reschedule_delay, self.deploy)
- else:
- # Invoke the actual deployment
- super(LinuxCCNR, self).deploy()
+ # We want to make sure the repository is running
+ # before the experiment starts.
+ # Run the command as a bash script in background,
+ # in the host ( but wait until the command has
+ # finished to continue )
+ env = self.replace_paths(env)
+ command = self.replace_paths(command)
+
+ self.node.run_and_wait(command, self.app_home,
+ env = env,
+ shfile = "app.sh",
+ raise_on_error = True)
- # As soon as deployment is finished, we launch the ccnr
- # command ( we don't want to lose time ccnr later on )
- if self._state == ResourceState.READY:
- self._start_in_background()
- self._running = True
+ self.debug("----- READY ---- ")
+ self._ready_time = strfnow()
+ self._state = ResourceState.READY
def start(self):
- # CCND should already be started by now.
- # Nothing to do but to set the state to STARTED
- if self._running:
+ if self._state == ResourceState.READY:
+ command = self.get("command")
+ self.info("Starting command '%s'" % command)
+
self._start_time = strfnow()
self._state = ResourceState.STARTED
else:
raise RuntimeError, msg
@property
- def state(self):
- state = super(LinuxCCNR, self).state
- if self._state in [ResourceState.FINISHED, ResourceState.FAILED]:
- self._running = False
-
- if self._state == ResourceState.READY:
- # CCND is really deployed only when ccn daemon is running
- if not self._running:
- return ResourceState.PROVISIONED
-
- return self._state
+ def _start_command(self):
+ return "ccnr &"
@property
- def _default_command(self):
- return "ccnr"
-
- @property
- def _default_environment(self):
+ def _environment(self):
envs = dict({
"maxFanout": "CCNR_BTREE_MAX_FANOUT",
"maxLeafEntries": "CCNR_BTREE_MAX_LEAF_ENTRIES",
env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \
if self.get(k) else "", envs.keys()))
-
+
return env
def valid_connection(self, guid):
sudo = False,
tty = False,
raise_on_error = False):
- """
- runs a command in background on the remote host, busy-waiting
- until the command finishes execution.
- This is more robust than doing a simple synchronized 'execute',
- since in the remote host the command can continue to run detached
- even if network disconnections occur
+ """
+ Uploads the 'command' to a bash script in the host.
+ Then runs the script detached in background in the host, and
+ busy-waites until the script finishes executing.
"""
self.upload_command(command, home,
shfile = shfile,
as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
"""
if not env: return ""
- env = env.strip()
+
+ # Remove extra white spaces
+ env = re.sub(r'\s+', ' ', env.strip())
sep = ";" if inline else "\n"
return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep