ec.set(node, "hostname", host)
ec.set(node, "username", user)
ec.set(node, "identity", ssh_key)
- #ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanHome", True)
ec.set(node, "cleanProcesses", True)
return node
--- /dev/null
+#!/usr/bin/env python
+
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.ec import ExperimentController, ECState
+from nepi.execution.resource import ResourceState, ResourceAction, \
+ populate_factory
+from nepi.resources.linux.node import OSType
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+import os
+import time
+
+def add_node(ec, host, user, ssh_key = None):
+    """ Registers a "LinuxNode" resource in the experiment controller
+
+    :param ec: controller used to register and configure the resource
+    :param host: value stored in the "hostname" attribute
+    :param user: value stored in the "username" attribute
+    :param ssh_key: value stored in the "identity" attribute (None by default)
+    :returns: whatever ec.register_resource returned for the new node
+    """
+    node = ec.register_resource("LinuxNode")
+
+    # Attributes are applied in this exact order
+    settings = (
+        ("hostname", host),
+        ("username", user),
+        ("identity", ssh_key),
+        ("cleanHome", True),
+        ("cleanProcesses", True),
+    )
+    for attr_name, attr_value in settings:
+        ec.set(node, attr_name, attr_value)
+
+    return node
+
+def add_ccnd(ec, node):
+    """ Registers a "LinuxCCND" resource and connects it to the given node
+
+    :param ec: controller used to register the resource
+    :param node: previously registered node to connect the daemon to
+    :returns: whatever ec.register_resource returned for the new resource
+    """
+    daemon = ec.register_resource("LinuxCCND")
+    ec.register_connection(daemon, node)
+    return daemon
+
+def add_ccnr(ec, ccnd, node):
+    """ Registers a "LinuxCCNR" resource connected to a node and a ccnd
+
+    :param ec: controller used to register the resource
+    :param ccnd: previously registered LinuxCCND resource
+    :param node: previously registered node
+    :returns: whatever ec.register_resource returned for the new resource
+    """
+    repo = ec.register_resource("LinuxCCNR")
+
+    # The node connection is registered first, then the ccnd one
+    for peer in (node, ccnd):
+        ec.register_connection(repo, peer)
+
+    return repo
+
+if __name__ == '__main__':
+    # Search for the available ResourceManagers (RMs)
+    populate_factory()
+
+    ec = ExperimentController(exp_id = "olahh")
+
+    # (hostname, username) pairs, one per host used in the experiment
+    testbed = [
+        ('planetlab2.u-strasbg.fr', "inria_alina"),
+        ('roseval.pl.sophia.inria.fr', "alina"),
+    ]
+
+    # Register, for every host, a node with its LinuxCCND and LinuxCCNR
+    to_wait = []
+    for host, user in testbed:
+        node = add_node(ec, host, user)
+        ccnd = add_ccnd(ec, node)
+        ccnr = add_ccnr(ec, ccnd, node)
+        to_wait.extend([ccnd, ccnr])
+
+    # Deploy all the registered RMs
+    ec.deploy()
+
+    # Wait for every ccnd and ccnr to be started
+    ec.wait_started(to_wait)
+
+    # Shutdown the experiment controller
+    ec.shutdown()
+
"nepi.execution",
"nepi.resources",
"nepi.resources.linux",
+ "nepi.resources.linux.ccn",
"nepi.resources.netns",
"nepi.resources.ns3",
"nepi.resources.omf",
def start(self):
+ """ Starts the application command
+
+ Logs the configured command and then either marks the application
+ as FINISHED (when no command is configured) or launches it through
+ _start_in_foreground / _start_in_background, depending on the value
+ of self.in_foreground.
+ """
command = self.get("command")
+
+ self.info("Starting command '%s'" % command)
+
+ if not command:
+ # If no command was given (i.e. Application was used for dependency
+ # installation), then the application is directly marked as FINISHED
+ self._state = ResourceState.FINISHED
+ else:
+ if self.in_foreground:
+ self._start_in_foreground()
+ else:
+ self._start_in_background()
+
+ # NOTE(review): confirm that the base class start() is not reached (or
+ # does not overwrite the state) when FINISHED was set above
+ super(LinuxApplication, self).start()
+
+ def _start_in_foreground(self):
+ """ Launches the command attached to the terminal, without blocking
+
+ The command is prefixed with the inline environment exports and run
+ through the node 'execute' method with blocking = False. The process
+ reference is kept in self._proc.
+
+ Raises RuntimeError (after setting the state to FAILED) if the
+ process has already exited with a non-zero code right after launch.
+ """
+ command = self.get("command")
env = self.get("env")
stdin = "stdin" if self.get("stdin") else None
- stdout = "stdout" if self.get("stdout") else "stdout"
- stderr = "stderr" if self.get("stderr") else "stderr"
sudo = self.get("sudo") or False
- failed = False
+ x11 = self.get("forwardX11")
- self.info("Starting command '%s'" % command)
+ # Command will be launched in foreground and attached to the
+ # terminal using the node 'execute' in non blocking mode.
- if self.in_foreground:
- # If command should run in foreground, we invoke 'execute' method
- # of the node
- if not command:
- msg = "No command is defined but X11 forwarding has been set"
- self.error(msg)
- self._state = ResourceState.FAILED
- raise RuntimeError, msg
+ # Export environment
+ environ = self.node.format_environment(env, inline = True)
+ command = environ + command
+ command = self.replace_paths(command)
- # Export environment
- environ = self.node.format_environment(env, inline = True)
+ self.info("Starting command IN FOREGROUND '%s'" % command)
+
+ # We save the reference to the process in self._proc
+ # to be able to kill the process from the stop method.
+ # We also set blocking = False, since we don't want the
+ # thread to block until the execution finishes.
+ (out, err), self._proc = self.node.execute(command,
+ sudo = sudo,
+ stdin = stdin,
+ forward_x11 = x11,
+ blocking = False)
+
+ # check if execution errors occurred
+ # (msg must be defined here, it is used by the failure path below)
+ msg = " Failed to start command '%s' " % command
+
+ if self._proc.poll():
+ self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ raise RuntimeError, msg
- command = environ + command
- command = self.replace_paths(command)
-
- x11 = self.get("forwardX11")
-
- # We save the reference to the process in self._proc
- # to be able to kill the process from the stop method.
- # We also set blocking = False, since we don't want the
- # thread to block until the execution finishes.
- (out, err), self._proc = self.node.execute(command,
- sudo = sudo,
- stdin = stdin,
- forward_x11 = x11,
- blocking = False)
-
- if self._proc.poll():
- out = ""
- err = self._proc.stderr.read()
- self._state = ResourceState.FAILED
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- super(LinuxApplication, self).start()
+ def _start_in_background(self):
+ """ Launches the previously uploaded app.sh script through node 'run'
+
+ After launching, waits for the pid file and stores the pid and ppid
+ in self._pid / self._ppid. If either is missing, error information
+ is looked up on the remote machine.
+
+ Raises RuntimeError (after setting the state to FAILED) if the
+ launch returns a non-zero code or if error output is found.
+ """
+ command = self.get("command")
+ env = self.get("env")
+ stdin = "stdin" if self.get("stdin") else None
+ stdout = "stdout" if self.get("stdout") else "stdout"
+ stderr = "stderr" if self.get("stderr") else "stderr"
+ sudo = self.get("sudo") or False
- elif command:
- # If command is set (i.e. application is not used only for dependency
- # installation), and it does not need to run in foreground, then we
- # invoke the 'run' method of the node to launch the application as a
- # daemon in background
-
- # The real command to execute was previously uploaded to a remote bash
- # script during deployment, now launch the remote script using 'run'
- # method from the node
- cmd = "bash ./app.sh"
- (out, err), proc = self.node.run(cmd, self.app_home,
- stdin = stdin,
- stdout = stdout,
- stderr = stderr,
- sudo = sudo)
-
- # check if execution errors occurred
- msg = " Failed to start command '%s' " % command
-
- if proc.poll():
+ # Command will be run as a daemon in background and detached from any
+ # terminal.
+ # The real command to run was previously uploaded to a bash script
+ # during deployment, now launch the remote script using 'run'
+ # method from the node
+ cmd = "bash ./app.sh"
+ (out, err), proc = self.node.run(cmd, self.app_home,
+ stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
+ sudo = sudo)
+
+ # check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
+
+ if proc.poll():
+ self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ # Wait for pid file to be generated
+ pid, ppid = self.node.wait_pid(self.app_home)
+ if pid: self._pid = int(pid)
+ if ppid: self._ppid = int(ppid)
+
+ # If the process is not running, check for error information
+ # on the remote machine
+ if not self.pid or not self.ppid:
+ # The application home and the "exitcode" file name are passed
+ # explicitly (the names 'home' and 'ecodefile' were never defined)
+ (out, err), proc = self.node.check_errors(self.app_home, "exitcode", stderr)
+
+ # A non-empty err means the command failed
+ if err:
+ self._state = ResourceState.FAILED
+ msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
- # Wait for pid file to be generated
- pid, ppid = self.node.wait_pid(self.app_home)
- if pid: self._pid = int(pid)
- if ppid: self._ppid = int(ppid)
-
- # If the process is not running, check for error information
- # on the remote machine
- if not self.pid or not self.ppid:
- (out, err), proc = self.check_errors(home, ecodefile, stderr)
-
- # Out is what was written in the stderr file
- if err:
- msg = " Failed to start command '%s' " % command
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- super(LinuxApplication, self).start()
-
- else:
- # If no command was given (i.e. Application was used for dependency
- # installation), then the application is directly marked as FINISHED
- self._state = ResourceState.FINISHED
-
def stop(self):
""" Stops application execution
"""
command = self.get('command') or ''
- state = self.state
- if state == ResourceState.STARTED:
+ if self.state == ResourceState.STARTED:
stopped = True
self.info("Stopping command '%s'" % command)
self.node.execute(tear_down)
self.stop()
+
if self.state == ResourceState.STOPPED:
super(LinuxApplication, self).release()
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.trace import Trace, TraceAttr
+from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+ ResourceAction
+from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.ccn.ccnr import LinuxCCNR
+from nepi.resources.linux.node import OSType
+
+from nepi.util.sshfuncs import ProcStatus
+from nepi.util.timefuncs import strfnow, strfdiff
+import os
+
+reschedule_delay = "0.5s"
+
+@clsinit_copy
+class LinuxCCNContent(LinuxApplication):
+    """ Application that publishes content through a connected LinuxCCNR
+
+    The class is named LinuxCCNContent to match its _rtype and its own
+    super() calls, and so that it does not shadow the LinuxCCNR class
+    imported from nepi.resources.linux.ccn.ccnr, which the 'ccnr' property
+    relies on.
+    """
+    _rtype = "LinuxCCNContent"
+
+    @classmethod
+    def _register_attributes(cls):
+        """ Registers the 'contentName' and 'content' attributes """
+        content_name = Attribute("contentName",
+                "The name of the content to publish (e.g. ccn:/VIDEO) ",
+                flags = Flags.ExecReadOnly)
+        content = Attribute("content",
+                "The content to publish. It can be a path to a file or plain text ",
+                flags = Flags.ExecReadOnly)
+
+        cls._register_attribute(content_name)
+        cls._register_attribute(content)
+
+    @classmethod
+    def _register_traces(cls):
+        """ Registers the 'log' trace """
+        log = Trace("log", "CCND log output")
+
+        cls._register_trace(log)
+
+    def __init__(self, ec, guid):
+        super(LinuxCCNContent, self).__init__(ec, guid)
+
+    @property
+    def ccnr(self):
+        """ First connected LinuxCCNR resource, or None if there is none """
+        ccnr = self.get_connected(LinuxCCNR.rtype())
+        if ccnr: return ccnr[0]
+        return None
+
+    def deploy(self):
+        """ Deploys the content once the associated ccnr is started
+
+        Default values are set for 'command' and 'env' when they are
+        missing. While no ccnr is connected, or its state is below STARTED,
+        the deployment is rescheduled after reschedule_delay.
+        """
+        if not self.get("command"):
+            self.set("command", self._default_command)
+
+        if not self.get("env"):
+            self.set("env", self._default_environment)
+
+        # Wait until the associated ccnr is started
+        ccnr = self.ccnr
+
+        if not ccnr or ccnr.state < ResourceState.STARTED:
+            self.ec.schedule(reschedule_delay, self.deploy)
+        else:
+            # Add a start after condition so the content will not start
+            # before the CCNR does (the guid used is the ccnr one; no
+            # 'ccnd' name exists in this scope)
+            self.ec.register_condition(self.guid, ResourceAction.START,
+                    ccnr.guid, ResourceState.STARTED)
+
+            # Invoke the actual deployment
+            super(LinuxCCNContent, self).deploy()
+
+    @property
+    def _default_command(self):
+        """ Command used when the 'command' attribute is not set """
+        return "ccnseqwriter -r %s " % self.get("contentName")
+
+    @property
+    def _default_environment(self):
+        """ Environment used when the 'env' attribute is not set """
+        env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+        return env
+
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+
def __init__(self, ec, guid):
super(LinuxCCND, self).__init__(ec, guid)
+ # Marks whether daemon is running
+ self._running = False
def deploy(self):
if not self.get("command"):
super(LinuxCCND, self).deploy()
+ # As soon as the ccnd sources are deployed, we launch the
+ # daemon ( we don't want to lose time launching the ccn
+ # daemon later on )
+ if self._state == ResourceState.READY:
+ self._start_in_background()
+ self._running = True
+
+ def start(self):
+     """ Marks the ccnd as STARTED if the daemon was already launched
+
+     No command is executed here: when self._running is set the start
+     time is recorded and the state becomes STARTED. Otherwise the state
+     is set to FAILED and RuntimeError is raised.
+     """
+     # CCND should already be started by now.
+     # Nothing to do but to set the state to STARTED
+     if self._running:
+         self._start_time = strfnow()
+         self._state = ResourceState.STARTED
+     else:
+         # Nothing was executed in this method, so there is no output
+         # to report; only the configured command is named
+         msg = " Failed to execute command '%s'" % self.get("command")
+         self.error(msg)
+         self._state = ResourceState.FAILED
+         raise RuntimeError(msg)
+
def stop(self):
command = self.get('command') or ''
state = self.state
@property
def state(self):
+ """ State of the ccnd, refreshed through the ccndstatus command
+
+ While the daemon is marked as running, and once more than
+ state_check_delay has elapsed since the last check, ccndstatus is
+ executed: return code 1 with "No such file or directory" in the error
+ output sets the state to FINISHED, any other non-zero return code
+ sets it to FAILED. A READY state is reported as PROVISIONED while
+ the daemon is not running.
+ """
- if self._state == ResourceState.STARTED:
- # we executed the ccndstart command. This should have started
- # a remote ccnd daemon. The way we can query wheather ccnd is
- # still running is by executing the ccndstatus command.
state_check_delay = 0.5
+ # First check if the ccnd has failed
+ if self._running and strfdiff(strfnow(), self._last_state_check) > state_check_delay:
- if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
- env = self.get('env') or ""
- environ = self.node.format_environment(env, inline = True)
- command = environ + "; ccndstatus"
- command = self.replace_paths(command)
-
- (out, err), proc = self.node.execute(command)
-
- retcode = proc.poll()
-
- if retcode == 1 and err.find("No such file or directory") > -1:
- # ccnd is not running (socket not found)
- self._state = ResourceState.FINISHED
- elif retcode:
- # other error
- msg = " Failed to execute command '%s'" % command
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
-
- self._last_state_check = strfnow()
+ # _ccndstatus is a property: reading it executes the command
+ (out, err), proc = self._ccndstatus
+
+ retcode = proc.poll()
+
+ if retcode == 1 and err.find("No such file or directory") > -1:
+ # ccnd is not running (socket not found)
+ self._running = False
+ self._state = ResourceState.FINISHED
+ elif retcode:
+ # other errors ...
+ self._running = False
+ msg = " Failed to execute command '%s'" % "ccndstatus"
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+
+ self._last_state_check = strfnow()
+
+ if self._state == ResourceState.READY:
+ # CCND is really deployed only when ccn daemon is running
+ if not self._running:
+ return ResourceState.PROVISIONED
return self._state
+ @property
+ def _ccndstatus(self):
+     """ Executes 'ccndstatus' on the node and returns the execute result
+
+     The command is prefixed with the inline exports built from the 'env'
+     attribute (empty string when unset) and passed through replace_paths.
+     """
+     exports = self.node.format_environment(self.get('env') or "",
+             inline = True)
+     status_cmd = self.replace_paths(exports + "; ccndstatus")
+
+     return self.node.execute(status_cmd)
+
@property
def _default_command(self):
return "ccndstart"
from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
ResourceAction
from nepi.resources.linux.application import LinuxApplication
-from nepi.resources.linux.ccnd import LinuxCCND
+from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.resources.linux.node import OSType
from nepi.util.sshfuncs import ProcStatus
def __init__(self, ec, guid):
super(LinuxCCNR, self).__init__(ec, guid)
+ # Marks whether ccnr is running
+ self._running = False
@property
def ccnd(self):
# Wait until associated ccnd is provisioned
ccnd = self.ccnd
- if not ccnd or ccnd.state < ResourceState.PROVISIONED:
+ if not ccnd or ccnd.state < ResourceState.READY:
+ # ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- # Add a start after condition so CCNR will not start
- # before CCND does
- self.ec.register_condition(self.guid, ResourceAction.START,
- ccnd.guid, ResourceState.STARTED)
-
# Invoke the actual deployment
super(LinuxCCNR, self).deploy()
+ # As soon as the ccnd sources are deployed, we launch the
+ # daemon ( we don't want to lose time launching the ccn
+ # daemon later on )
+ if self._state == ResourceState.READY:
+ self._start_in_background()
+ self._running = True
+
+ def start(self):
+     """ Marks the ccnr as STARTED if it was already launched
+
+     No command is executed here: when self._running is set the start
+     time is recorded and the state becomes STARTED. Otherwise the state
+     is set to FAILED and RuntimeError is raised.
+     """
+     # CCNR should already be started by now.
+     # Nothing to do but to set the state to STARTED
+     if self._running:
+         self._start_time = strfnow()
+         self._state = ResourceState.STARTED
+     else:
+         # Nothing was executed in this method, so there is no output
+         # to report; only the configured command is named
+         msg = " Failed to execute command '%s'" % self.get("command")
+         self.error(msg)
+         self._state = ResourceState.FAILED
+         raise RuntimeError(msg)
+
+ @property
+ def state(self):
+     """ State of the ccnr
+
+     A READY state is reported as PROVISIONED while the ccnr is not
+     running. self._running is cleared when the state is TERMINATED or
+     FAILED.
+     """
+     # 'state' is a property on the parent class, so it is read, not
+     # called. The value is unused; the read is kept because the parent
+     # presumably refreshes self._state -- TODO confirm
+     state = super(LinuxCCNR, self).state
+     if self._state in [ResourceState.TERMINATED, ResourceState.FAILED]:
+         self._running = False
+
+     if self._state == ResourceState.READY:
+         # CCNR is really deployed only when the ccnr process is running
+         if not self._running:
+             return ResourceState.PROVISIONED
+
+     return self._state
+
@property
def _default_command(self):
return "ccnr"
@clsinit
class LinuxNode(ResourceManager):
+ """
+ .. class:: Class Args :
+
+ :param ec: The Experiment controller
+ :type ec: ExperimentController
+ :param guid: guid of the RM
+ :type guid: int
+
+ .. note::
+
+ There are different ways in which commands can be executed using the
+ LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
+ 'run_and_wait').
+
+ Brief explanation:
+
+ * 'execute' (blocking mode) :
+
+ HOW IT WORKS: 'execute' forks a process and runs the
+ command synchronously, attached to the terminal, in
+ foreground.
+ The execute method will block until the command returns
+ the result on 'out', 'err' (so until it finishes executing).
+
+ USAGE: short-lived commands that must be executed attached
+ to a terminal and in foreground, for which it IS necessary
+ to block until the command has finished (e.g. if you want
+ to run 'ls' or 'cat').
+
+ * 'execute' (NON blocking mode - blocking = False) :
+
+ HOW IT WORKS: Same as before, except that execute method
+ will return immediately (even if command still running).
+
+ USAGE: long-lived commands that must be executed attached
+ to a terminal and in foreground, but for which it is not
+ necessary to block until the command has finished. (e.g.
+ start an application using X11 forwarding)
+
+ * 'run' :
+
+ HOW IT WORKS: Connects to the host ( using SSH if remote)
+ and launches the command in background, detached from any
+ terminal (daemonized), and returns. The command continues to
+ run remotely, but since it is detached from the terminal,
+ its pipes (stdin, stdout, stderr) can't be redirected to the
+ console (as normal non detached processes would), and so they
+ are explicitly redirected to files. The pidfile is created as
+ part of the process of launching the command. The pidfile
+ holds the pid and ppid of the process forked in background,
+ so later on it is possible to check whether the command is still
+ running.
+
+ USAGE: long-lived commands that can run detached in background,
+ for which it is NOT necessary to block (wait) until the command
+ has finished. (e.g. start an application that is not using X11
+ forwarding. It can run detached and remotely in background)
+
+ * 'run_and_wait' :
+
+ HOW IT WORKS: Similar to 'run' except that it 'blocks' until
+ the command has finished execution. It also checks whether
+ errors occurred during runtime by reading the exitcode file,
+ which contains the exit code of the command that was run
+ (checking stderr only is not always reliable since many
+ commands throw debugging info to stderr and the only way to
+ automatically know whether an error really happened is to
+ check the process exit code).
+
+ Another difference with respect to 'run', is that instead
+ of directly executing the command as a bash command line,
+ it uploads the command to a bash script and runs the script.
+ This makes it possible to use the bash script to debug
+ errors, since it remains at the remote host and can be run
+ manually to reproduce the error.
+
+ USAGE: medium-lived commands that can run detached in
+ background, for which it IS necessary to block (wait) until
+ the command has finished. (e.g. Package installation,
+ source compilation, file download, etc)
+
+ """
_rtype = "LinuxNode"
@classmethod
""" Saves the command as a bash script file in the remote host, and
forces to save the exit code of the command execution to the ecodefile
"""
+
+ if not (command.strip().endswith(";") or command.strip().endswith("&")):
+ command += ";"
# The exit code of the command will be stored in ecodefile
- command = " %(command)s ; echo $? > %(ecodefile)s ;" % {
+ command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
'command': command,
'ecodefile': ecodefile,
}
def format_environment(self, env, inline = False):
"""Format environmental variables for command to be executed either
- as an inline command (i.e. PYTHONPATH=src/.. python script.py) or
+ as an inline command
+ (i.e. export PYTHONPATH=src/..; export LALA=..; python script.py) or
as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
+
+ 'env' is a whitespace separated list of VAR=value entries. An empty,
+ None or whitespace-only 'env' yields an empty string.
"""
- sep = " " if inline else "\n"
- export = " " if inline else "export"
- return sep.join(map(lambda e: "%s %s" % (export, e),
- env.strip().split(" "))) + sep if env else ""
+ # split() without arguments ignores leading, trailing and repeated
+ # whitespace, so no empty 'export' statement is ever generated
+ variables = env.split() if env else []
+ if not variables: return ""
+
+ sep = ";" if inline else "\n"
+ return sep.join(map(lambda e: " export %s" % e, variables)) + sep
def check_errors(self, home,
ecodefile = "exitcode",
# by default, rexec calls _communicate which will block
# until the process has exit. The argument block == False
# forces to rexec to return immediately, without blocking
- if not blocking:
- return (("", ""), proc)
-
try:
- out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+ if blocking:
+ out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+ else:
+ out = err = ""
+ if proc.poll():
+ err = self._proc.stderr.read()
+
msg = " rexec - host %s - command %s " % (host, " ".join(args))
log(msg, logging.DEBUG, out, err)
class LinuxApplicationTestCase(unittest.TestCase):
def setUp(self):
self.fedora_host = "nepi2.pl.sophia.inria.fr"
+ self.fedora_host = "planetlab2.u-strasbg.fr"
self.fedora_user = "inria_nepi"
self.ubuntu_host = "roseval.pl.sophia.inria.fr"
class LinuxInterfaceTestCase(unittest.TestCase):
def setUp(self):
- self.fedora_host = 'nepi2.pl.sophia.inria.fr'
+ self.fedora_host = "nepi2.pl.sophia.inria.fr"
+ self.fedora_host = "planetlab2.u-strasbg.fr"
self.fedora_user = 'inria_nepi'
self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
class LinuxNodeTestCase(unittest.TestCase):
def setUp(self):
self.fedora_host = "nepi2.pl.sophia.inria.fr"
+ self.fedora_host = "planetlab2.u-strasbg.fr"
self.fedora_user = "inria_nepi"
self.ubuntu_host = "roseval.pl.sophia.inria.fr"
node, ec = create_node(host, user)
(out, err), proc = node.mkdir(node.node_home, clean = True)
- self.assertEquals(out, "")
+ self.assertEquals(err, "")
(out, err), proc = node.install_packages("gcc", node.node_home)
- self.assertEquals(out, "")
+ self.assertEquals(err, "")
(out, err), proc = node.remove_packages("gcc", node.node_home)
- self.assertEquals(out, "")
+ self.assertEquals(err, "")
(out, err), proc = node.rmdir(node.exp_home)
- self.assertEquals(out, "")
+ self.assertEquals(err, "")
@skipIfNotAlive
def t_xterm(self, host, user):
+ # Installs xterm on the node, runs it with X11 forwarding and removes
+ # it again, checking after each step that no error output was produced
node, ec = create_node(host, user)
(out, err), proc = node.mkdir(node.node_home, clean = True)
- self.assertEquals(out, "")
+ self.assertEquals(err, "")
- node.install_packages("xterm", node.node_home)
+ # Capture the result, otherwise the assertion below would check the
+ # error output of the previous mkdir call again
+ (out, err), proc = node.install_packages("xterm", node.node_home)
- self.assertEquals(out, "")
+ self.assertEquals(err, "")
(out, err), proc = node.execute("xterm", forward_x11 = True)
- self.assertEquals(out, "")
+ self.assertEquals(err, "")
(out, err), proc = node.remove_packages("xterm", node.node_home)
- self.assertEquals(out, "")
+ self.assertEquals(err, "")
@skipIfNotAlive
def t_compile(self, host, user):
from nepi.resources.planetlab.plcapi import PLCAPIFactory
-from nepi.util.sshfuncs import RUNNING, FINISHED
import os
import unittest