from nepi.execution.ec import ExperimentController, ECState
from nepi.execution.resource import ResourceState, ResourceAction, \
populate_factory
+from nepi.resources.linux.node import OSType
from optparse import OptionParser, SUPPRESS_HELP
ec.set(node, "hostname", host)
ec.set(node, "username", user)
ec.set(node, "identity", ssh_key)
- ec.set(node, "cleanHome", True)
+ #ec.set(node, "cleanHome", True)
ec.set(node, "cleanProcesses", True)
return node
def add_ccnd(ec, os_type, peers):
- if os_type == "f12":
+ if os_type == OSType.FEDORA:
depends = ( " autoconf openssl-devel expat-devel libpcap-devel "
" ecryptfs-utils-devel libxml2-devel automake gawk "
" gcc gcc-c++ git pcre-devel make ")
- elif os_type == "ubuntu":
+ else: # UBUNTU
depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev "
" libecryptfs0 libxml2-utils automake gawk gcc g++ "
" git-core pkg-config libpcre3-dev make ")
" ) && "
"cd ${SOURCES}/ccnx && "
# Just execute and silence warnings...
- "( ( ./configure && make ) 2>&1 )"
+ "( ./configure && make ) "
" )")
install = (
env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
# BASH command -> ' ccndstart ; ccndc add ccnx:/ udp host ; ccnr '
- command = "ccndstart ; "
+ command = "ccndstart && "
peers = map(lambda peer: "ccndc add ccnx:/ udp %s" % peer, peers)
- command += " ; ".join(peers) + " ; "
- command += " ccnr "
+ command += " ; ".join(peers) + " && "
+ command += " ccnr & "
app = ec.register_resource("LinuxApplication")
ec.set(app, "depends", depends)
node1 = add_node(ec, host1, pl_slice, pl_ssh_key)
peers = [host2]
- ccnd1 = add_ccnd(ec, "f12", peers)
+ ccnd1 = add_ccnd(ec, OSType.FEDORA, peers)
ec.register_connection(ccnd1, node1)
def wait_finished(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reach the state FINISHED
+ reached the state FINISHED
+ :param guids: List of guids
+ :type guids: list
+ """
+ return self.wait(guids)
+
+ def wait_started(self, guids):
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reached the state STARTED
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
+
+ def wait(self, guids, states = [ResourceState.FINISHED]):
+ """ Blocking method that waits until all the RM from the 'guid' list
+ reached state 'state' or until a failure occurs
+
:param guids: List of guids
:type guids: list
"""
if isinstance(guids, int):
guids = [guids]
- while not all([self.state(guid) in [ResourceState.FINISHED,
- ResourceState.STOPPED,
- ResourceState.FAILED] \
- for guid in guids]) and not self.finished:
- # We keep the sleep as large as possible to
- # decrese the number of RM state requests
+ while not all([self.state(guid) in states for guid in guids]) and \
+ not any([self.state(guid) in [
+ ResourceState.STOPPED,
+ ResourceState.FAILED] for guid in guids]) and \
+ not self.finished:
+ # We keep the sleep big to decrease the number of RM state queries
time.sleep(2)
-
+
def get_task(self, tid):
""" Get a specific task
self.logger.debug(" ------- DEPLOY START ------ ")
if not group:
- group = self.resources
-
+ # By default, if not deployment group is indicated,
+ # all RMs that are undeployed will be deployed
+ group = []
+ for guid in self.resources:
+ if self.state(guid) == ResourceState.NEW:
+ group.append(guid)
+
if isinstance(group, int):
group = [group]
self._state = ECState.FAILED
finally:
+ self._logger.info("Exiting the task processing loop ... ")
runner.sync()
def _execute(self, task):
def _register_traces(cls):
stdout = Trace("stdout", "Standard output stream")
stderr = Trace("stderr", "Standard error stream")
- buildlog = Trace("buildlog", "Output of the build process")
cls._register_trace(stdout)
cls._register_trace(stderr)
- cls._register_trace(buildlog)
def __init__(self, ec, guid):
super(LinuxApplication, self).__init__(ec, guid)
# replace application specific paths in the command
command = self.replace_paths(command)
+ env = env and self.replace_paths(env)
self.node.upload_command(command, self.app_home,
shfile = "app.sh",
# Download http sources remotely
if http_sources:
- command = " wget -c --directory-prefix=${SOURCES} "
- check = ""
+ command = [" wget -c --directory-prefix=${SOURCES} "]
+ check = []
for source in http_sources:
- command += " %s " % (source)
- check += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+ command.append(" %s " % (source))
+ check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
+ command = " ".join(command)
+ check = " ; ".join(check)
+
# Append the command to check that the sources were downloaded
command += " ; %s " % check
self.node.mkdir(self.build_dir)
# replace application specific paths in the command
- command = self.replace_paths(command)
+ command = self.replace_paths(build)
# Upload the command to a file, and execute asynchronously
self.node.run_and_wait(command, self.app_home,
self.info(" Installing sources ")
# replace application specific paths in the command
- command = self.replace_paths(command)
+ command = self.replace_paths(install)
# Upload the command to a file, and execute asynchronously
self.node.run_and_wait(command, self.app_home,
for var in env.split(" "):
environ += ' %s ' % var
- command = "(" + environ + " ; " + command + ")"
+ command = "{" + environ + " ; " + command + " ; }"
command = self.replace_paths(command)
# If the command requires X11 forwarding, we
def valid_connection(self, guid):
# TODO: Validate!
return True
- # XXX: What if it is connected to more than one node?
- resources = self.find_resources(exact_tags = [tags.NODE])
- self._node = resources[0] if len(resources) == 1 else None
- return self._node
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.trace import Trace, TraceAttr
+from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.node import OSType
+
+import os
+
+@clsinit_copy
+class LinuxCCND(LinuxApplication):
+ _rtype = "LinuxCCND"
+
+ @classmethod
+ def _register_attributes(cls):
+ debug = Attribute("debug", "Sets the CCND_DEBUG environmental variable. "
+ " Allowed values are : \n"
+ " 0 - no messages \n"
+ " 1 - basic messages (any non-zero value gets these) \n"
+ " 2 - interest messages \n"
+ " 4 - content messages \n"
+ " 8 - matching details \n"
+ " 16 - interest details \n"
+ " 32 - gory interest details \n"
+ " 64 - log occasional human-readable timestamps \n"
+ " 128 - face registration debugging \n"
+ " -1 - max logging \n"
+ " Or apply bitwise OR to these values to get combinations of them",
+ flags = Flags.ExecReadOnly)
+
+ port = Attribute("port", "Sets the CCN_LOCAL_PORT environmental variable. "
+ "Defaults to 9695 ",
+ flags = Flags.ExecReadOnly)
+
+ sockname = Attribute("sockname",
+ "Sets the CCN_LOCAL_SCOKNAME environmental variable. "
+ "Defaults to /tmp/.ccnd.sock",
+ flags = Flags.ExecReadOnly)
+
+ capacity = Attribute("capacity",
+ "Sets the CCND_CAP environmental variable. "
+ "Capacity limit in terms of ContentObjects",
+ flags = Flags.ExecReadOnly)
+
+ mtu = Attribute("mtu", "Sets the CCND_MTU environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ data_pause = Attribute("dataPauseMicrosec",
+ "Sets the CCND_DATA_PAUSE_MICROSEC environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ default_stale = Attribute("defaultTimeToStale",
+ "Sets the CCND_DEFAULT_TIME_TO_STALE environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ max_stale = Attribute("maxTimeToStale",
+ "Sets the CCND_MAX_TIME_TO_STALE environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ max_rte = Attribute("maxRteMicrosec",
+ "Sets the CCND_MAX_RTE_MICROSEC environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ keystore = Attribute("keyStoreDirectory",
+ "Sets the CCND_KEYSTORE_DIRECTORY environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ listen_on = Attribute("listenOn",
+ "Sets the CCND_LISTEN_ON environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ autoreg = Attribute("autoreg",
+ "Sets the CCND_AUTOREG environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ prefix = Attribute("prefix",
+ "Sets the CCND_PREFIX environmental variable. ",
+ flags = Flags.ExecReadOnly)
+
+ cls._register_attribute(debug)
+ cls._register_attribute(port)
+ cls._register_attribute(sockname)
+ cls._register_attribute(capacity)
+ cls._register_attribute(mtu)
+ cls._register_attribute(data_pause)
+ cls._register_attribute(default_stale)
+ cls._register_attribute(max_stale)
+ cls._register_attribute(max_rte)
+ cls._register_attribute(keystore)
+ cls._register_attribute(listen_on)
+ cls._register_attribute(autoreg)
+ cls._register_attribute(prefix)
+
+ @classmethod
+ def _register_traces(cls):
+ log = Trace("log", "CCND log output")
+ status = Trace("status", "ccndstatus output")
+
+ cls._register_trace(log)
+ cls._register_trace(status)
+
+ def __init__(self, ec, guid):
+ super(LinuxCCND, self).__init__(ec, guid)
+
+ def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+ self.info("Retrieving '%s' trace %s " % (name, attr))
+
+ path = os.path.join(self.app_home, name)
+
+ command = "(test -f %s && echo 'success') || echo 'error'" % path
+ (out, err), proc = self.node.execute(command)
+
+ if (err and proc.poll()) or out.find("error") != -1:
+ msg = " Couldn't find trace %s " % name
+ self.error(msg, out, err)
+ return None
+
+ if attr == TraceAttr.PATH:
+ return path
+
+ if attr == TraceAttr.ALL:
+ (out, err), proc = self.node.check_output(self.app_home, name)
+
+ if err and proc.poll():
+ msg = " Couldn't read trace %s " % name
+ self.error(msg, out, err)
+ return None
+
+ return out
+
+ if attr == TraceAttr.STREAM:
+ cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
+ elif attr == TraceAttr.SIZE:
+ cmd = "stat -c%%s %s " % path
+
+ (out, err), proc = self.node.execute(cmd)
+
+ if err and proc.poll():
+ msg = " Couldn't find trace %s " % name
+ self.error(msg, out, err)
+ return None
+
+ if attr == TraceAttr.SIZE:
+ out = int(out.strip())
+
+ return out
+
+ def deploy(self):
+ if not self.get("command"):
+ self.set("command", self._default_command)
+
+ if not self.get("depends"):
+ self.set("depends", self._default_dependencies)
+
+ if not self.get("sources"):
+ self.set("sources", self._default_sources)
+
+ if not self.get("build"):
+ self.set("build", self._default_build)
+
+ if not self.get("install"):
+ self.set("install", self._default_install)
+
+ if not self.get("env"):
+ self.set("env", self._default_environment)
+
+ super(LinuxCCND, self).deploy()
+
+ def stop(self):
+ command = self.get('command') or ''
+ state = self.state
+
+ if state == ResourceState.STARTED:
+ self.info("Stopping command '%s'" % command)
+
+ (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+ if out or err:
+ # check if execution errors occurred
+ msg = " Failed to STOP command '%s' " % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+ stopped = False
+ else:
+ super(LinuxApplication, self).stop()
+
+
+ @property
+ def _default_command(self):
+ return "ccndstart"
+
+ @property
+ def _default_dependencies(self):
+ if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]:
+ return ( " autoconf openssl-devel expat-devel libpcap-devel "
+ " ecryptfs-utils-devel libxml2-devel automake gawk "
+ " gcc gcc-c++ git pcre-devel make ")
+ elif self.node.os in [ OSType.UBUNTU , OSType.DEBIAN]:
+ return ( " autoconf libssl-dev libexpat-dev libpcap-dev "
+ " libecryptfs0 libxml2-utils automake gawk gcc g++ "
+ " git-core pkg-config libpcre3-dev make ")
+ return ""
+
+ @property
+ def _default_sources(self):
+ return "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
+
+ @property
+ def _default_build(self):
+ sources = self.get("sources").split(" ")[0]
+ sources = os.path.basename(sources)
+
+ return (
+ # Evaluate if ccnx binaries are already installed
+ " ( "
+ " test -f ${EXP_HOME}/ccnx/bin/ccnd"
+ " ) || ( "
+ # If not, untar and build
+ " ( "
+ " mkdir -p ${SOURCES}/ccnx && "
+ " tar xf ${SOURCES}/%(sources)s --strip-components=1 -C ${SOURCES}/ccnx "
+ " ) && "
+ "cd ${SOURCES}/ccnx && "
+ # Just execute and silence warnings...
+ " ( ./configure && make ) "
+ " )") % ({ 'sources': sources })
+
+ @property
+ def _default_install(self):
+ return (
+ # Evaluate if ccnx binaries are already installed
+ " ( "
+ " test -f ${EXP_HOME}/ccnx/bin/ccnd"
+ " ) || ( "
+ " mkdir -p ${EXP_HOME}/ccnx/bin && "
+ " cp -r ${SOURCES}/ccnx ${EXP_HOME}"
+ " )"
+ )
+
+ @property
+ def _default_environment(self):
+ envs = dict({
+ "debug": "CCND_DEBUG",
+ "port": "CCN_LOCAL_PORT",
+ "sockname" : "CCN_LOCAL_SOCKNAME",
+ "capacity" : "CCND_CAP",
+ "mtu" : "CCND_MTU",
+ "dataPauseMicrosec" : "CCND_DATA_PAUSE_MICROSEC",
+ "defaultTimeToStale" : "CCND_DEFAULT_TIME_TO_STALE",
+ "maxTimeToStale" : "CCND_MAX_TIME_TO_STALE",
+ "maxRteMicrosec" : "CCND_MAX_RTE_MICROSEC",
+ "keyStoreDirectory" : "CCND_KEYSTORE_DIRECTORY",
+ "listenOn" : "CCND_LISTEN_ON",
+ "autoreg" : "CCND_AUTOREG",
+ "prefix" : "CCND_PREFIX",
+ })
+
+ env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+ for key in envs.keys():
+ val = self.get(key)
+ if val:
+ env += " %s=%s" % (key, val)
+
+ return env
+
+ def valid_connection(self, guid):
+ # TODO: Validate!
+ return True
+
@classmethod
def _register_attributes(cls):
ip4 = Attribute("ip4", "IPv4 Address",
- flags = Flags.ExecReadOnly)
+ flags = Flags.ExecReadOnly)
ip6 = Attribute("ip6", "IPv6 Address",
flags = Flags.ExecReadOnly)
flags = Flags.ExecReadOnly)
mtu = Attribute("mtu", "Maximum transmition unit for device",
- type = Types.Integer)
+ type = Types.Integer)
devname = Attribute("deviceName",
"Name of the network interface (e.g. eth0, wlan0, etc)",
ERROR = -3
OK = 0
+class OSType:
+ """
+ Supported flavors of Linux OS
+ """
+ FEDORA_12 = "f12"
+ FEDORA_14 = "f14"
+ FEDORA = "fedora"
+ UBUNTU = "ubuntu"
+ DEBIAN = "debian"
+
@clsinit
class LinuxNode(ResourceManager):
_rtype = "LinuxNode"
raise RuntimeError, "%s - %s - %s" %( msg, out, err )
if out.find("Fedora release 12") == 0:
- self._os = "f12"
+ self._os = OSType.FEDORA_12
elif out.find("Fedora release 14") == 0:
- self._os = "f14"
+ self._os = OSType.FEDORA_14
elif out.find("Debian") == 0:
- self._os = "debian"
+ self._os = OSType.DEBIAN
elif out.find("Ubuntu") ==0:
- self._os = "ubuntu"
+ self._os = OSType.UBUNTU
else:
msg = "Unsupported OS"
self.error(msg, out)
def install_packages(self, packages, home):
command = ""
- if self.os in ["f12", "f14"]:
+ if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
command = rpmfuncs.install_packages_command(self.os, packages)
- elif self.os in ["debian", "ubuntu"]:
+ elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
command = debfuncs.install_packages_command(self.os, packages)
else:
msg = "Error installing packages ( OS not known ) "
def remove_packages(self, packages, home):
command = ""
- if self.os in ["f12", "f14"]:
+ if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
command = rpmfuncs.remove_packages_command(self.os, packages)
- elif self.os in ["debian", "ubuntu"]:
+ elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
command = debfuncs.remove_packages_command(self.os, packages)
else:
msg = "Error removing packages ( OS not known ) "
ecodefile = "exitcode",
env = None):
- command = "{ ( %(command)s ) ; } ; echo $? > %(ecodefile)s " % {
+ command = " ( %(command)s ) ; echo $? > %(ecodefile)s " % {
'command': command,
'ecodefile': ecodefile,
}
# If the stderr file was not found, assume nothing happened.
# We just ignore the error.
- if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: # cat - No such file or directory
- err = ""
+ # (cat returns 1 for error "No such file or directory")
+ if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
+ out = err = ""
return (out, err), proc
return cmd
def install_rpmfusion_command(os):
+ from nepi.resources.linux.node import OSType
+
cmd = "rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s"
- if os == "f12":
+ if os in [OSType.FEDORA, OSType.FEDORA_12]:
cmd = cmd % {'package': RPM_FUSION_URL_F12}
- elif os == "f14":
+ elif os == OSType.FEDORA_14:
# This one works for f13+
cmd = cmd % {'package': RPM_FUSION_URL}
else:
from nepi.execution.ec import ExperimentController
from nepi.execution.resource import ResourceManager, ResourceState, clsinit
+import random
import time
import unittest
if node.state < ResourceState.READY:
self.ec.schedule("0.5s", self.deploy)
else:
+ time.sleep(random.random() * 5)
super(Application, self).deploy()
self.logger.debug(" -------- DEPLOYED ------- ")
+ def start(self):
+ super(Application, self).start()
+ time.sleep(random.random() * 5)
+ self._state = ResourceState.FINISHED
+
class ResourceManagerTestCase(unittest.TestCase):
def test_deploy_in_order(self):
"""
ec.deploy()
- while not all([ ec.state(guid) == ResourceState.STARTED \
- for guid in [app1, app2, node1, node2, iface1, iface2, chan]]) \
- and not ec.finished:
- time.sleep(0.5)
+ guids = [app1, app2]
+ ec.wait_finished(guids)
ec.shutdown()
self.assertTrue(rmchan.ready_time < rmiface1.ready_time)
self.assertTrue(rmchan.ready_time < rmiface2.ready_time)
+ def test_concurrency(self):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(Application)
+ ResourceFactory.register_type(Node)
+ ResourceFactory.register_type(Interface)
+ ResourceFactory.register_type(Channel)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("Node")
+
+ apps = list()
+ for i in xrange(5000):
+ app = ec.register_resource("Application")
+ ec.register_connection(app, node)
+ apps.append(app)
+
+ ec.deploy()
+
+ ec.wait_finished(apps)
+
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(
+ all([ec.state(guid) == ResourceState.FINISHED \
+ for guid in apps])
+ )
+
+ ec.shutdown()
+
def test_start_with_condition(self):
# TODO!!!
pass
class LinuxApplicationTestCase(unittest.TestCase):
def setUp(self):
- self.fedora_host = "nepi2.pl.sophia.inria.fr"
+ self.fedora_host = "nepi5.pl.sophia.inria.fr"
self.fedora_user = "inria_nepi"
self.ubuntu_host = "roseval.pl.sophia.inria.fr"
self.ubuntu_user = "alina"
- self.target = "nepi5.pl.sophia.inria.fr"
+ self.target = "nepi3.pl.sophia.inria.fr"
@skipIfNotAlive
def t_stdout(self, host, user):
def test_ping_ubuntu(self):
self.t_ping(self.ubuntu_host, self.ubuntu_user)
- def test_concurrency_fedora(self):
+ def ztest_concurrency_fedora(self):
self.t_concurrency(self.fedora_host, self.fedora_user)
- def test_concurrency_ubuntu(self):
+ def ztest_concurrency_ubuntu(self):
self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
def test_condition_fedora(self):