from neco.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
+# TODO: Improve speed. Too slow... !!
+
+class ECState(object):
+ RUNNING = 1
+ FAILED = 2
+ TERMINATED = 3
class ExperimentController(object):
def __init__(self, exp_id = None, root_dir = "/tmp"):
# Resource managers
self._resources = dict()
- # Resource managers
- self._group = dict()
-
# Scheduler
self._scheduler = HeapScheduler()
self._tasks = dict()
# Event processing thread
- self._stop = False
self._cond = threading.Condition()
self._thread = threading.Thread(target = self._process)
+ self._thread.setDaemon(True)
self._thread.start()
+ # EC state
+ self._state = ECState.RUNNING
+
# Logging
self._logger = logging.getLogger("ExperimentController")
def logger(self):
return self._logger
+ @property
+ def ecstate(self):
+ return self._state
+
@property
def exp_id(self):
exp_id = self._exp_id
exp_id = "nepi-" + exp_id
return exp_id
+ @property
+ def finished(self):
+ return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+
+ def wait_finished(self, guids):
+ while not all([self.state(guid) == ResourceState.FINISHED \
+ for guid in guids]) and not self.finished:
+ time.sleep(1)
+
def get_task(self, tid):
return self._tasks.get(tid)
return guid
- def register_group(self, group):
- guid = self._guid_generator.next()
-
- if not isinstance(group, list):
- group = [group]
-
- self._groups[guid] = group
-
- return guid
-
def get_attributes(self, guid):
rm = self.get_resource(guid)
return rm.get_attributes()
thread = threading.Thread(target = steps, args = (rm,))
threads.append(thread)
+ thread.setDaemon(True)
thread.start()
- for thread in threads:
- thread.join()
+ while list(threads) and not self.finished:
+ thread = threads[0]
+ # Time out after 5 seconds to check EC not terminated
+ thread.join(5)
+ if not thread.is_alive():
+ threads.remove(thread)
def release(self, group = None):
if not group:
rm = self.get_resource(guid)
thread = threading.Thread(target=rm.release)
threads.append(thread)
+ thread.setDaemon(True)
thread.start()
- for thread in threads:
- thread.join()
+ while list(threads) and not self.finished:
+ thread = threads[0]
+ # Time out after 5 seconds to check EC not terminated
+ thread.join(5)
+ if not thread.is_alive():
+ threads.remove(thread)
+
+ self._state = ECState.TERMINATED
def shutdown(self):
self.release()
- self._stop = True
self._cond.acquire()
self._cond.notify()
self._cond.release()
+
if self._thread.is_alive():
self._thread.join()
runner.start()
try:
- while not self._stop:
+ while not self.finished:
self._cond.acquire()
task = self._scheduler.next()
self._cond.release()
else:
# Process tasks in parallel
runner.put(self._execute, task)
- except:
+
+ except:
import traceback
err = traceback.format_exc()
self._logger.error("Error while processing tasks in the EC: %s" % err)
+ self._state = ECState.FAILED
+ return
+
+ # Mark EC state as terminated
+ if self.ecstate == ECState.RUNNING:
+ self._state = ECState.TERMINATED
+
def _execute(self, task):
# Invoke callback
task.status = TaskStatus.DONE
except:
import traceback
err = traceback.format_exc()
- self._logger.error("Error while executing event: %s" % err)
-
task.result = err
task.status = TaskStatus.ERROR
+
+ self._logger.error("Error occurred while executing task: %s" % err)
+
+ # Mark the EC as failed
+ self._state = ECState.FAILED
+
+ # Wake up the EC in case it was sleeping
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+
+ # Propage error to the ParallelRunner
+ raise
import copy
import functools
+import inspect
import logging
+import os
+import pkgutil
import weakref
-_reschedule_delay = "1s"
+reschedule_delay = "0.5s"
class ResourceAction:
DEPLOY = 0
"""
reschedule = False
- delay = _reschedule_delay
+ delay = reschedule_delay
# check state and time elapsed on all RMs
for guid in group:
"""
reschedule = False
- delay = _reschedule_delay
+ delay = reschedule_delay
## evaluate if set conditions are met
"""
reschedule = False
- delay = _reschedule_delay
+ delay = reschedule_delay
## evaluate if set conditions are met
"""
reschedule = False
- delay = _reschedule_delay
+ delay = reschedule_delay
## evaluate if set conditions are met
rclass = cls._resource_types[rtype]
return rclass(ec, guid)
+def populate_factory():
+ for rclass in find_types():
+ ResourceFactory.register_type(rclass)
+
+def find_types():
+ search_path = os.environ.get("NECO_SEARCH_PATH", "")
+ search_path = set(search_path.split(" "))
+
+ import neco.resources
+ path = os.path.dirname(neco.resources.__file__)
+ search_path.add(path)
+
+ types = []
+
+ for importer, modname, ispkg in pkgutil.walk_packages(search_path):
+ loader = importer.find_module(modname)
+ try:
+ module = loader.load_module(loader.fullname)
+ for attrname in dir(module):
+ if attrname.startswith("_"):
+ continue
+
+ attr = getattr(module, attrname)
+
+ if attr == ResourceManager:
+ continue
+
+ if not inspect.isclass(attr):
+ continue
+
+ if issubclass(attr, ResourceManager):
+ types.append(attr)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ logger = logging.getLogger("Resource.find_types()")
+ logger.error("Error while lading Resource Managers %s" % err)
+
+ return types
+
+
import logging
import os
-DELAY ="1s"
+reschedule_delay = "0.5s"
# TODO: Resolve wildcards in commands!!
stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
- update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
- "re-upload before starting experiment. If not keep the same directory",
- default = True,
- type = Types.Bool,
- flags = Flags.ExecReadOnly)
-
tear_down = Attribute("tearDown", "Bash script to be executed before "
"releasing the resource",
flags = Flags.ReadOnly)
cls._register_attribute(stdin)
cls._register_attribute(stdout)
cls._register_attribute(stderr)
- cls._register_attribute(update_home)
cls._register_attribute(tear_down)
@classmethod
return None
@property
- def home(self):
+ def app_home(self):
return os.path.join(self.node.exp_dir, self._home)
@property
def src_dir(self):
- return os.path.join(self.home, 'src')
+ return os.path.join(self.app_home, 'src')
@property
def build_dir(self):
- return os.path.join(self.home, 'build')
+ return os.path.join(self.app_home, 'build')
@property
def pid(self):
return self._ppid
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
- path = os.path.join(self.home, name)
+ self.info("Retrieving '%s' trace %s " % (name, attr))
+
+ path = os.path.join(self.app_home, name)
cmd = "(test -f %s && echo 'success') || echo 'error'" % path
(out, err), proc = self.node.execute(cmd)
return path
if attr == TraceAttr.ALL:
- (out, err), proc = self.node.check_output(self.home, name)
+ (out, err), proc = self.node.check_output(self.app_home, name)
if err and proc.poll():
msg = " Couldn't read trace %s " % name
return out
def provision(self, filters = None):
- # TODO: verify home hash or clean home
-
# create home dir for application
- self.node.mkdir(self.home)
+ self.node.mkdir(self.app_home)
# upload sources
self.upload_sources()
# Install
self.install()
+ command = self.replace_paths(self.get("command"))
+ x11 = self.get("forwardX11") or False
+ if not x11:
+ self.info("Uploading command '%s'" % command)
+
+ # If the command runs asynchronous, pre upload the command
+ # to the app.sh file in the remote host
+ dst = os.path.join(self.app_home, "app.sh")
+ self.node.upload(command, dst, text = True)
+
super(LinuxApplication, self).provision()
def upload_sources(self):
- # check if sources need to be uploaded and upload them
+ # TODO: check if sources need to be uploaded and upload them
sources = self.get("sources")
if sources:
self.info(" Uploading sources ")
# create dir for sources
self.node.mkdir(self.src_dir)
- sources = self.sources.split(' ')
+ sources = sources.split(' ')
http_sources = list()
for source in list(sources):
# Download http sources
for source in http_sources:
dst = os.path.join(self.src_dir, source.split("/")[-1])
+ # TODO: Check if the tar.gz is already downloaded using a hash
+ # and don't download twice !!
command = "wget -o %s %s" % (dst, source)
self.node.execute(command)
depends = self.get("depends")
if depends:
self.info(" Installing dependencies %s" % depends)
- self.node.install_packages(depends, home = self.home)
+ self.node.install_packages(depends, home = self.app_home)
def build(self):
build = self.get("build")
cmd = self.replace_paths(build)
- (out, err), proc = self.run_and_wait(cmd, self.home,
+ (out, err), proc = self.run_and_wait(cmd, self.app_home,
pidfile = "build_pid",
- stdout = "build_log",
+ stdout = "build_out",
stderr = "build_err",
raise_on_error = True)
cmd = self.replace_paths(install)
- (out, err), proc = self.run_and_wait(cmd, self.home,
+ (out, err), proc = self.run_and_wait(cmd, self.app_home,
pidfile = "install_pid",
- stdout = "install_log",
+ stdout = "install_out",
stderr = "install_err",
raise_on_error = True)
def deploy(self):
+ command = self.replace_paths(self.get("command"))
+
+ self.info(" Deploying command '%s' " % command)
+
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
- self.ec.schedule(DELAY, self.deploy)
+ self.ec.schedule(reschedule_delay, self.deploy)
else:
try:
self.discover()
super(LinuxApplication, self).start()
- self.info("Starting command %s" % command)
+ self.info("Starting command '%s'" % command)
if x11:
(out, err), proc = self.node.execute(command,
if proc.poll() and err:
failed = True
else:
- (out, err), proc = self.node.run(command, self.home,
+ # Run the command asynchronously
+ command = "bash ./app.sh"
+ (out, err), proc = self.node.run(command, self.app_home,
stdin = stdin,
sudo = sudo)
failed = True
if not failed:
- pid, ppid = self.node.wait_pid(home = self.home)
+ pid, ppid = self.node.wait_pid(home = self.app_home)
if pid: self._pid = int(pid)
if ppid: self._ppid = int(ppid)
if not self.pid or not self.ppid:
failed = True
- (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
+ (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
if failed or out or chkerr:
# check if execution errors occurred
@property
def state(self):
if self._state == ResourceState.STARTED:
- (out, err), proc = self.node.check_output(self.home, 'stderr')
+ (out, err), proc = self.node.check_output(self.app_home, 'stderr')
if out or err:
if err.find("No such file or directory") >= 0 :
return ( command
.replace("${SOURCES}", self.src_dir)
.replace("${BUILD}", self.build_dir)
- .replace("${APPHOME}", self.home)
+ .replace("${APPHOME}", self.app_home)
.replace("${NODEHOME}", self.node.home) )
cmd = ""
for p in packages:
- cmd += " ( dpkg -s %(package)s || sudo apt-get -y install %(package)s ) ; " % {
+ cmd += " ( dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ) ; " % {
'package': p}
#cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...)
cmd = ""
for p in packages:
- cmd += " ( dpkg -s %(package)s && sudo apt-get -y purge %(package)s ) ; " % {
+ cmd += " ( dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ) ; " % {
'package': p}
#cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...)
# TODO: Verify files and dirs exists already
# TODO: Blacklist nodes!
+# TODO: Unify delays!!
-DELAY ="1s"
+reschedule_delay = "0.5s"
@clsinit
class LinuxNode(ResourceManager):
@property
def exp_dir(self):
exp_dir = os.path.join(self.home, self.ec.exp_id)
- return exp_dir if exp_dir.startswith('/') else "${HOME}/"
+ return exp_dir if exp_dir.startswith('/') or \
+ exp_dir.startswith("~/") else "~/"
@property
- def node_dir(self):
- node_dir = "node-%d" % self.guid
- return os.path.join(self.exp_dir, node_dir)
+ def node_home(self):
+ node_home = "node-%d" % self.guid
+ return os.path.join(self.exp_dir, node_home)
@property
def os(self):
if self.get("cleanHome"):
self.clean_home()
- self.mkdir(self.node_dir)
+ self.mkdir(self.node_home)
super(LinuxNode, self).provision()
ifaces = self.get_connected(LinuxInterface.rtype())
for iface in ifaces:
if iface.state < ResourceState.READY:
- self.ec.schedule(DELAY, self.deploy)
+ self.ec.schedule(reschedule_delay, self.deploy)
return
super(LinuxNode, self).deploy()
"sudo -S killall -u %s || /bin/true ; " % self.get("username") +
"sudo -S killall -u %s || /bin/true ; " % self.get("username"))
-
out = err = ""
(out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
return self.copy(src, dst)
def install_packages(self, packages, home = None):
- home = home or self.node_dir
+ home = home or self.node_home
cmd = ""
if self.os in ["f12", "f14"]:
out = err = ""
(out, err), proc = self.run_and_wait(cmd, home,
pidfile = "instpkg_pid",
- stdout = "instpkg_log",
- stderr = "instpkg_err",
+ stdout = "instpkg_out",
+ stderr = "instpkg_err",
raise_on_error = True)
return (out, err), proc
def remove_packages(self, packages, home = None):
- home = home or self.node_dir
+ home = home or self.node_home
cmd = ""
if self.os in ["f12", "f14"]:
out = err = ""
(out, err), proc = self.run_and_wait(cmd, home,
pidfile = "rmpkg_pid",
- stdout = "rmpkg_log",
- stderr = "rmpkg_err",
+ stdout = "rmpkg_out",
+ stderr = "rmpkg_err",
raise_on_error = True)
return (out, err), proc
stdout = 'stdout',
stderr = 'stderr',
sudo = False,
+ tty = False,
raise_on_error = False):
""" runs a command in background on the remote host, but waits
until the command finishes execution.
stdin = stdin,
stdout = stdout,
stderr = stderr,
- sudo = sudo)
+ sudo = sudo,
+ tty = tty)
# check no errors occurred
if proc.poll() and err:
def check_output(self, home, filename):
""" checks file content """
(out, err), proc = self.execute("cat %s" %
- os.path.join(home, filename), with_lock = True)
+ os.path.join(home, filename), retry = 1, with_lock = True)
return (out, err), proc
def is_alive(self):
stdin = None,
stdout = 'stdout',
stderr = 'stderr',
- sudo = False):
+ sudo = False,
+ tty = False):
- self.debug("Running %s" % command)
+ self.debug("Running command '%s'" % command)
if self.localhost:
(out, err), proc = execfuncs.lspawn(command, pidfile,
port = self.get("port"),
agent = True,
identity = self.get("identity"),
- server_key = self.get("serverKey")
+ server_key = self.get("serverKey"),
+ tty = tty
)
return (out, err), proc
cmd = "( %s )" % install_rpmfusion_command(os)
for p in packages:
- cmd += " ; ( rpm -q %(package)s || sudo yum -y install %(package)s ) " % {
+ cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
'package': p}
#cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
cmd = ""
for p in packages:
- cmd += " ( rpm -q %(package)s && sudo yum -y remove %(package)s ) ; " % {
+ cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
'package': p}
#cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
"""Register the attributes of an OMF interface
"""
- alias = Attribute("alias","Alias of the interface", default_value = "w0")
+ alias = Attribute("alias","Alias of the interface", default = "w0")
mode = Attribute("mode","Mode of the interface")
type = Attribute("type","Type of the interface")
essid = Attribute("essid","Essid of the interface")
if skip:
t = x*2
- msg = "SLEEPING %d ... ATEMP %d - host %s - command %s " % (
+ msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
t, x, host, " ".join(args))
log(msg, logging.DEBUG)
#!/usr/bin/env python
-from neco.execution.ec import ExperimentController
+from neco.execution.ec import ExperimentController, ECState
from neco.execution.scheduler import TaskStatus
import datetime
return 'hola!'
ec = ExperimentController()
+
+ tid = ec.schedule("0s", myfunc, track=True)
- try:
- tid = ec.schedule("0s", myfunc, track=True)
-
- while True:
- task = ec.get_task(tid)
- if task.status != TaskStatus.NEW:
- break
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
- time.sleep(1)
+ time.sleep(1)
- self.assertEquals('hola!', task.result)
+ self.assertEquals('hola!', task.result)
- finally:
- ec.shutdown()
+ ec.shutdown()
def test_schedule_date(self):
def get_time():
ec = ExperimentController()
- try:
- schedule_time = datetime.datetime.now()
-
- tid = ec.schedule("4s", get_time, track=True)
+ schedule_time = datetime.datetime.now()
+
+ tid = ec.schedule("4s", get_time, track=True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
+
+ time.sleep(1)
- while True:
- task = ec.get_task(tid)
- if task.status != TaskStatus.NEW:
- break
+ execution_time = task.result
+ delta = execution_time - schedule_time
+ self.assertTrue(delta > datetime.timedelta(seconds=4))
+ self.assertTrue(delta < datetime.timedelta(seconds=5))
- time.sleep(1)
+ ec.shutdown()
- execution_time = task.result
- delta = execution_time - schedule_time
- self.assertTrue(delta > datetime.timedelta(seconds=4))
- self.assertTrue(delta < datetime.timedelta(seconds=5))
+ def test_schedule_exception(self):
+ def raise_error():
+ raise RuntimeError, "the error"
- finally:
- ec.shutdown()
+ ec = ExperimentController()
+ ec.schedule("2s", raise_error)
+
+ while ec.ecstate not in [ECState.FAILED, ECState.TERMINATED]:
+ time.sleep(1)
+
+ self.assertEquals(ec.ecstate, ECState.FAILED)
+ ec.shutdown()
if __name__ == '__main__':
ec.register_connection(iface1, chan)
ec.register_connection(iface2, chan)
- try:
- ec.deploy()
+ ec.deploy()
- while not all([ ec.state(guid) == ResourceState.STARTED \
- for guid in [app1, app2, node1, node2, iface1, iface2, chan]]):
- time.sleep(0.5)
+ while not all([ ec.state(guid) == ResourceState.STARTED \
+ for guid in [app1, app2, node1, node2, iface1, iface2, chan]]) \
+ and not ec.finished:
+ time.sleep(0.5)
- finally:
- ec.shutdown()
+ ec.shutdown()
rmapp1 = ec.get_resource(app1)
rmapp2 = ec.get_resource(app2)
#!/usr/bin/env python
from neco.execution.ec import ExperimentController
-from neco.execution.resource import ResourceState
+from neco.execution.resource import ResourceState, ResourceAction
from neco.execution.trace import TraceAttr
from neco.resources.linux.node import LinuxNode
from neco.resources.linux.application import LinuxApplication
self.target = 'nepi5.pl.sophia.inria.fr'
+ @skipIfNotAlive
+ def t_stdout(self, host, user):
+ from neco.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxApplication)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ app = ec.register_resource("LinuxApplication")
+ cmd = "echo 'HOLA'"
+ ec.set(app, "command", cmd)
+ ec.register_connection(app, node)
+
+ ec.deploy()
+
+ ec.wait_finished([app])
+
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+ stdout = ec.trace(app, 'stdout')
+ self.assertTrue(stdout.strip() == "HOLA")
+
+ ec.shutdown()
+
@skipIfNotAlive
def t_ping(self, host, user):
from neco.execution.resource import ResourceFactory
ec.register_connection(app, node)
- try:
- ec.deploy()
+ ec.deploy()
+
+ ec.wait_finished([app])
- while not ec.state(app) == ResourceState.FINISHED:
- time.sleep(0.5)
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+ stdout = ec.trace(app, 'stdout')
+ size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+ self.assertEquals(len(stdout), size)
+
+ block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+ self.assertEquals(block, stdout[5:10])
- self.assertTrue(ec.state(node) == ResourceState.STARTED)
- self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+ path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+ rm = ec.get_resource(app)
+ p = os.path.join(rm.app_home, 'stdout')
+ self.assertEquals(path, p)
+
+ ec.shutdown()
+
+ @skipIfNotAlive
+ def t_concurrency(self, host, user):
+ from neco.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxApplication)
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ apps = list()
+ for i in xrange(50):
+ app = ec.register_resource("LinuxApplication")
+ cmd = "ping -c5 %s" % self.target
+ ec.set(app, "command", cmd)
+ ec.register_connection(app, node)
+ apps.append(app)
+
+ ec.deploy()
+
+ ec.wait_finished(apps)
+
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(
+ all([ec.state(guid) == ResourceState.FINISHED \
+ for guid in apps])
+ )
+
+ for app in apps:
stdout = ec.trace(app, 'stdout')
size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
self.assertEquals(len(stdout), size)
path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
rm = ec.get_resource(app)
- p = os.path.join(rm.home, 'stdout')
+ p = os.path.join(rm.app_home, 'stdout')
self.assertEquals(path, p)
- finally:
- ec.shutdown()
+ ec.shutdown()
@skipIfNotAlive
- def t_concurrency(self, host, user):
+ def t_condition(self, host, user, depends):
from neco.execution.resource import ResourceFactory
ResourceFactory.register_type(LinuxNode)
ec.set(node, "cleanHome", True)
ec.set(node, "cleanProcesses", True)
- apps = list()
- for i in xrange(50):
- app = ec.register_resource("LinuxApplication")
- cmd = "ping -c5 %s" % self.target
- ec.set(app, "command", cmd)
- ec.register_connection(app, node)
- apps.append(app)
+ server = ec.register_resource("LinuxApplication")
+ cmd = "echo 'HOLA' | nc -l 3333"
+ ec.set(server, "command", cmd)
+ ec.set(server, "depends", depends)
+ ec.register_connection(server, node)
+
+ client = ec.register_resource("LinuxApplication")
+ cmd = "nc 127.0.0.1 3333"
+ ec.set(client, "command", cmd)
+ ec.register_connection(client, node)
+
+ ec.register_condition(client, ResourceAction.START, server, ResourceState.STARTED)
- try:
- ec.deploy()
+ apps = [client, server]
+
+ ec.deploy()
+
+ ec.wait_finished(apps)
- while not all([ec.state(guid) == ResourceState.FINISHED \
- for guid in apps]):
- time.sleep(0.5)
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(ec.state(server) == ResourceState.FINISHED)
+ self.assertTrue(ec.state(client) == ResourceState.FINISHED)
- self.assertTrue(ec.state(node) == ResourceState.STARTED)
- self.assertTrue(
- all([ec.state(guid) == ResourceState.FINISHED \
- for guid in apps])
- )
+ stdout = ec.trace(client, 'stdout')
+ self.assertTrue(stdout.strip() == "HOLA")
- for app in apps:
- stdout = ec.trace(app, 'stdout')
- size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
- self.assertEquals(len(stdout), size)
-
- block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
- self.assertEquals(block, stdout[5:10])
+ ec.shutdown()
- path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
- rm = ec.get_resource(app)
- p = os.path.join(rm.home, 'stdout')
- self.assertEquals(path, p)
+ def test_stdout_fedora(self):
+ self.t_stdout(self.fedora_host, self.fedora_user)
- finally:
- ec.shutdown()
+ def test_stdout_ubuntu(self):
+ self.t_stdout(self.ubuntu_host, self.ubuntu_user)
def test_ping_fedora(self):
self.t_ping(self.fedora_host, self.fedora_user)
- def test_fing_ubuntu(self):
+ def test_ping_ubuntu(self):
self.t_ping(self.ubuntu_host, self.ubuntu_user)
def test_concurrency_fedora(self):
def test_concurrency_ubuntu(self):
self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
+ def test_condition_fedora(self):
+ self.t_condition(self.fedora_host, self.fedora_user, "nc")
+
+ def test_condition_ubuntu(self):
+ self.t_condition(self.ubuntu_host, self.ubuntu_user, "netcat")
+
+ # TODO: test compilation, sources, dependencies, etc!!!
if __name__ == '__main__':
unittest.main()
ec.register_connection(iface, node)
ec.register_connection(iface, chan)
- try:
- ec.deploy()
+ ec.deploy()
- while not all([ ec.state(guid) == ResourceState.STARTED \
- for guid in [node, iface]]):
- time.sleep(0.5)
+ while not all([ ec.state(guid) == ResourceState.STARTED \
+ for guid in [node, iface]]) and not ec.finished:
+ time.sleep(0.5)
- self.assertTrue(ec.state(node) == ResourceState.STARTED)
- self.assertTrue(ec.state(iface) == ResourceState.STARTED)
- self.assertTrue(ec.get(iface, "deviceName") == "eth0")
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(ec.state(iface) == ResourceState.STARTED)
+ self.assertTrue(ec.get(iface, "deviceName") == "eth0")
- finally:
- ec.shutdown()
+ ec.shutdown()
def test_deploy_fedora(self):
self.t_deploy(self.fedora_host, self.fedora_user)
def t_install(self, host, user):
node, ec = create_node(host, user)
+ (out, err), proc = node.install_packages('gcc')
+ self.assertEquals(out, "")
+
+ (out, err), proc = node.remove_packages('gcc')
+
+ self.assertEquals(out, "")
+
+
+ @skipIfNotAlive
+ def t_compile(self, host, user):
+ node, ec = create_node(host, user)
+
app_home = os.path.join(node.exp_dir, "my-app")
node.mkdir(app_home, clean = True)
def test_install_ubuntu(self):
self.t_install(self.ubuntu_host, self.ubuntu_user)
+
+ def test_compile_fedora(self):
+ self.t_compile(self.fedora_host, self.fedora_user)
+
+ def test_compile_ubuntu(self):
+ self.t_compile(self.ubuntu_host, self.ubuntu_user)
@skipInteractive
def test_xterm_ubuntu(self):
def skipInteractive(func):
name = func.__name__
def wrapped(*args, **kwargs):
- mode = os.environ.get("NEPI_INTERACTIVE", False).lower() in ['true', 'yes']
+ mode = os.environ.get("NEPI_INTERACTIVE", False)
+ mode = mode and mode.lower() in ['true', 'yes']
if not mode:
print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
return