From 6337302c0db631641b3e6a47c6e57c4864711acc Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Thu, 9 May 2013 20:02:51 +0200 Subject: [PATCH] Added unit tests for linux application --- src/neco/execution/ec.py | 86 ++++++++---- src/neco/execution/resource.py | 54 +++++++- src/neco/resources/linux/application.py | 71 +++++----- src/neco/resources/linux/debfuncs.py | 4 +- src/neco/resources/linux/node.py | 43 +++--- src/neco/resources/linux/rpmfuncs.py | 4 +- src/neco/resources/omf/omf_interface.py | 2 +- src/neco/util/sshfuncs.py | 2 +- test/execution/ec.py | 63 +++++---- test/execution/resource.py | 13 +- test/resources/linux/application.py | 165 ++++++++++++++++++------ test/resources/linux/interface.py | 18 ++- test/resources/linux/node.py | 18 +++ test/resources/linux/test_utils.py | 3 +- 14 files changed, 376 insertions(+), 170 deletions(-) diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index c9e4e069..ba064d49 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -13,6 +13,12 @@ from neco.execution.scheduler import HeapScheduler, Task, TaskStatus from neco.execution.trace import TraceAttr # TODO: use multiprocessing instead of threading +# TODO: Improve speed. Too slow... !! + +class ECState(object): + RUNNING = 1 + FAILED = 2 + TERMINATED = 3 class ExperimentController(object): def __init__(self, exp_id = None, root_dir = "/tmp"): @@ -29,9 +35,6 @@ class ExperimentController(object): # Resource managers self._resources = dict() - # Resource managers - self._group = dict() - # Scheduler self._scheduler = HeapScheduler() @@ -39,11 +42,14 @@ class ExperimentController(object): self._tasks = dict() # Event processing thread - self._stop = False self._cond = threading.Condition() self._thread = threading.Thread(target = self._process) + self._thread.setDaemon(True) self._thread.start() + # EC state + self._state = ECState.RUNNING + # Logging self._logger = logging.getLogger("ExperimentController") @@ -51,6 +57,10 @@ class ExperimentController(object): def logger(self): return self._logger + @property + def ecstate(self): + return self._state + @property def exp_id(self): exp_id = self._exp_id @@ -58,6 +68,15 @@ class ExperimentController(object): exp_id = "nepi-" + exp_id return exp_id + @property + def finished(self): + return self.ecstate in [ECState.FAILED, ECState.TERMINATED] + + def wait_finished(self, guids): + while not all([self.state(guid) == ResourceState.FINISHED \ + for guid in guids]) and not self.finished: + time.sleep(1) + def get_task(self, tid): return self._tasks.get(tid) @@ -80,16 +99,6 @@ class ExperimentController(object): return guid - def register_group(self, group): - guid = self._guid_generator.next() - - if not isinstance(group, list): - group = [group] - - self._groups[guid] = group - - return guid - def get_attributes(self, guid): rm = self.get_resource(guid) return rm.get_attributes() @@ -278,10 +287,15 @@ class ExperimentController(object): thread = threading.Thread(target = steps, args = (rm,)) threads.append(thread) + thread.setDaemon(True) thread.start() - for thread in threads: - thread.join() + while list(threads) and not self.finished: + thread = threads[0] + # Time out after 5 seconds to check EC not terminated + thread.join(5) + if not thread.is_alive(): + threads.remove(thread) def release(self, group = None): if not group: @@ -292,18 +306,25 @@ class ExperimentController(object): rm = self.get_resource(guid) thread = threading.Thread(target=rm.release) threads.append(thread) + thread.setDaemon(True) thread.start() - for thread in threads: - thread.join() + while list(threads) and not self.finished: + thread = threads[0] + # Time out after 5 seconds to check EC not terminated + thread.join(5) + if not thread.is_alive(): + threads.remove(thread) + + self._state = ECState.TERMINATED def shutdown(self): self.release() - self._stop = True self._cond.acquire() self._cond.notify() self._cond.release() + if self._thread.is_alive(): self._thread.join() @@ -342,7 +363,7 @@ class ExperimentController(object): runner.start() try: - while not self._stop: + while not self.finished: self._cond.acquire() task = self._scheduler.next() self._cond.release() @@ -369,11 +390,19 @@ class ExperimentController(object): else: # Process tasks in parallel runner.put(self._execute, task) - except: + + except: import traceback err = traceback.format_exc() self._logger.error("Error while processing tasks in the EC: %s" % err) + self._state = ECState.FAILED + return + + # Mark EC state as terminated + if self.ecstate == ECState.RUNNING: + self._state = ECState.TERMINATED + def _execute(self, task): # Invoke callback task.status = TaskStatus.DONE @@ -383,8 +412,19 @@ class ExperimentController(object): except: import traceback err = traceback.format_exc() - self._logger.error("Error while executing event: %s" % err) - task.result = err task.status = TaskStatus.ERROR + + self._logger.error("Error occurred while executing task: %s" % err) + + # Mark the EC as failed + self._state = ECState.FAILED + + # Wake up the EC in case it was sleeping + self._cond.acquire() + self._cond.notify() + self._cond.release() + + # Propage error to the ParallelRunner + raise diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index 322e2764..0eb0d647 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -3,10 +3,13 @@ from neco.execution.trace import TraceAttr import copy import functools +import inspect import logging +import os +import pkgutil import weakref -_reschedule_delay = "1s" +reschedule_delay = "0.5s" class ResourceAction: DEPLOY = 0 @@ -374,7 +377,7 @@ class ResourceManager(object): """ reschedule = False - delay = _reschedule_delay + delay = reschedule_delay # check state and time elapsed on all RMs for guid in group: @@ -429,7 +432,7 @@ class ResourceManager(object): """ reschedule = False - delay = _reschedule_delay + delay = reschedule_delay ## evaluate if set conditions are met @@ -452,7 +455,7 @@ class ResourceManager(object): """ reschedule = False - delay = _reschedule_delay + delay = reschedule_delay ## evaluate if set conditions are met @@ -483,7 +486,7 @@ class ResourceManager(object): """ reschedule = False - delay = _reschedule_delay + delay = reschedule_delay ## evaluate if set conditions are met @@ -553,3 +556,44 @@ class ResourceFactory(object): rclass = cls._resource_types[rtype] return rclass(ec, guid) +def populate_factory(): + for rclass in find_types(): + ResourceFactory.register_type(rclass) + +def find_types(): + search_path = os.environ.get("NECO_SEARCH_PATH", "") + search_path = set(search_path.split(" ")) + + import neco.resources + path = os.path.dirname(neco.resources.__file__) + search_path.add(path) + + types = [] + + for importer, modname, ispkg in pkgutil.walk_packages(search_path): + loader = importer.find_module(modname) + try: + module = loader.load_module(loader.fullname) + for attrname in dir(module): + if attrname.startswith("_"): + continue + + attr = getattr(module, attrname) + + if attr == ResourceManager: + continue + + if not inspect.isclass(attr): + continue + + if issubclass(attr, ResourceManager): + types.append(attr) + except: + import traceback + err = traceback.format_exc() + logger = logging.getLogger("Resource.find_types()") + logger.error("Error while lading Resource Managers %s" % err) + + return types + + diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index bdd271b9..b7b3f4eb 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -7,7 +7,7 @@ from neco.util import sshfuncs import logging import os -DELAY ="1s" +reschedule_delay = "0.5s" # TODO: Resolve wildcards in commands!! @@ -62,12 +62,6 @@ class LinuxApplication(ResourceManager): stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly) stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly) stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly) - update_home = Attribute("updateHome", "If application hash has changed remove old directory and" - "re-upload before starting experiment. If not keep the same directory", - default = True, - type = Types.Bool, - flags = Flags.ExecReadOnly) - tear_down = Attribute("tearDown", "Bash script to be executed before " "releasing the resource", flags = Flags.ReadOnly) @@ -84,7 +78,6 @@ class LinuxApplication(ResourceManager): cls._register_attribute(stdin) cls._register_attribute(stdout) cls._register_attribute(stderr) - cls._register_attribute(update_home) cls._register_attribute(tear_down) @classmethod @@ -116,16 +109,16 @@ class LinuxApplication(ResourceManager): return None @property - def home(self): + def app_home(self): return os.path.join(self.node.exp_dir, self._home) @property def src_dir(self): - return os.path.join(self.home, 'src') + return os.path.join(self.app_home, 'src') @property def build_dir(self): - return os.path.join(self.home, 'build') + return os.path.join(self.app_home, 'build') @property def pid(self): @@ -136,7 +129,9 @@ class LinuxApplication(ResourceManager): return self._ppid def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): - path = os.path.join(self.home, name) + self.info("Retrieving '%s' trace %s " % (name, attr)) + + path = os.path.join(self.app_home, name) cmd = "(test -f %s && echo 'success') || echo 'error'" % path (out, err), proc = self.node.execute(cmd) @@ -150,7 +145,7 @@ class LinuxApplication(ResourceManager): return path if attr == TraceAttr.ALL: - (out, err), proc = self.node.check_output(self.home, name) + (out, err), proc = self.node.check_output(self.app_home, name) if err and proc.poll(): msg = " Couldn't read trace %s " % name @@ -177,10 +172,8 @@ class LinuxApplication(ResourceManager): return out def provision(self, filters = None): - # TODO: verify home hash or clean home - # create home dir for application - self.node.mkdir(self.home) + self.node.mkdir(self.app_home) # upload sources self.upload_sources() @@ -197,10 +190,20 @@ class LinuxApplication(ResourceManager): # Install self.install() + command = self.replace_paths(self.get("command")) + x11 = self.get("forwardX11") or False + if not x11: + self.info("Uploading command '%s'" % command) + + # If the command runs asynchronous, pre upload the command + # to the app.sh file in the remote host + dst = os.path.join(self.app_home, "app.sh") + self.node.upload(command, dst, text = True) + super(LinuxApplication, self).provision() def upload_sources(self): - # check if sources need to be uploaded and upload them + # TODO: check if sources need to be uploaded and upload them sources = self.get("sources") if sources: self.info(" Uploading sources ") @@ -208,7 +211,7 @@ class LinuxApplication(ResourceManager): # create dir for sources self.node.mkdir(self.src_dir) - sources = self.sources.split(' ') + sources = sources.split(' ') http_sources = list() for source in list(sources): @@ -219,6 +222,8 @@ class LinuxApplication(ResourceManager): # Download http sources for source in http_sources: dst = os.path.join(self.src_dir, source.split("/")[-1]) + # TODO: Check if the tar.gz is already downloaded using a hash + # and don't download twice !! command = "wget -o %s %s" % (dst, source) self.node.execute(command) @@ -239,7 +244,7 @@ class LinuxApplication(ResourceManager): depends = self.get("depends") if depends: self.info(" Installing dependencies %s" % depends) - self.node.install_packages(depends, home = self.home) + self.node.install_packages(depends, home = self.app_home) def build(self): build = self.get("build") @@ -251,9 +256,9 @@ class LinuxApplication(ResourceManager): cmd = self.replace_paths(build) - (out, err), proc = self.run_and_wait(cmd, self.home, + (out, err), proc = self.run_and_wait(cmd, self.app_home, pidfile = "build_pid", - stdout = "build_log", + stdout = "build_out", stderr = "build_err", raise_on_error = True) @@ -264,18 +269,22 @@ class LinuxApplication(ResourceManager): cmd = self.replace_paths(install) - (out, err), proc = self.run_and_wait(cmd, self.home, + (out, err), proc = self.run_and_wait(cmd, self.app_home, pidfile = "install_pid", - stdout = "install_log", + stdout = "install_out", stderr = "install_err", raise_on_error = True) def deploy(self): + command = self.replace_paths(self.get("command")) + + self.info(" Deploying command '%s' " % command) + # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) - self.ec.schedule(DELAY, self.deploy) + self.ec.schedule(reschedule_delay, self.deploy) else: try: self.discover() @@ -296,7 +305,7 @@ class LinuxApplication(ResourceManager): super(LinuxApplication, self).start() - self.info("Starting command %s" % command) + self.info("Starting command '%s'" % command) if x11: (out, err), proc = self.node.execute(command, @@ -310,7 +319,9 @@ class LinuxApplication(ResourceManager): if proc.poll() and err: failed = True else: - (out, err), proc = self.node.run(command, self.home, + # Run the command asynchronously + command = "bash ./app.sh" + (out, err), proc = self.node.run(command, self.app_home, stdin = stdin, sudo = sudo) @@ -318,14 +329,14 @@ class LinuxApplication(ResourceManager): failed = True if not failed: - pid, ppid = self.node.wait_pid(home = self.home) + pid, ppid = self.node.wait_pid(home = self.app_home) if pid: self._pid = int(pid) if ppid: self._ppid = int(ppid) if not self.pid or not self.ppid: failed = True - (out, chkerr), proc = self.node.check_output(self.home, 'stderr') + (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr') if failed or out or chkerr: # check if execution errors occurred @@ -374,7 +385,7 @@ class LinuxApplication(ResourceManager): @property def state(self): if self._state == ResourceState.STARTED: - (out, err), proc = self.node.check_output(self.home, 'stderr') + (out, err), proc = self.node.check_output(self.app_home, 'stderr') if out or err: if err.find("No such file or directory") >= 0 : @@ -432,7 +443,7 @@ class LinuxApplication(ResourceManager): return ( command .replace("${SOURCES}", self.src_dir) .replace("${BUILD}", self.build_dir) - .replace("${APPHOME}", self.home) + .replace("${APPHOME}", self.app_home) .replace("${NODEHOME}", self.node.home) ) diff --git a/src/neco/resources/linux/debfuncs.py b/src/neco/resources/linux/debfuncs.py index 7cf72605..fdc8d3c1 100644 --- a/src/neco/resources/linux/debfuncs.py +++ b/src/neco/resources/linux/debfuncs.py @@ -6,7 +6,7 @@ def install_packages_command(os, packages): cmd = "" for p in packages: - cmd += " ( dpkg -s %(package)s || sudo apt-get -y install %(package)s ) ; " % { + cmd += " ( dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ) ; " % { 'package': p} #cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...) @@ -18,7 +18,7 @@ def remove_packages_command(os, packages): cmd = "" for p in packages: - cmd += " ( dpkg -s %(package)s && sudo apt-get -y purge %(package)s ) ; " % { + cmd += " ( dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ) ; " % { 'package': p} #cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...) diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index f7c9daac..4907c21f 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -14,8 +14,9 @@ import threading # TODO: Verify files and dirs exists already # TODO: Blacklist nodes! +# TODO: Unify delays!! -DELAY ="1s" +reschedule_delay = "0.5s" @clsinit class LinuxNode(ResourceManager): @@ -83,12 +84,13 @@ class LinuxNode(ResourceManager): @property def exp_dir(self): exp_dir = os.path.join(self.home, self.ec.exp_id) - return exp_dir if exp_dir.startswith('/') else "${HOME}/" + return exp_dir if exp_dir.startswith('/') or \ + exp_dir.startswith("~/") else "~/" @property - def node_dir(self): - node_dir = "node-%d" % self.guid - return os.path.join(self.exp_dir, node_dir) + def node_home(self): + node_home = "node-%d" % self.guid + return os.path.join(self.exp_dir, node_home) @property def os(self): @@ -138,7 +140,7 @@ class LinuxNode(ResourceManager): if self.get("cleanHome"): self.clean_home() - self.mkdir(self.node_dir) + self.mkdir(self.node_home) super(LinuxNode, self).provision() @@ -157,7 +159,7 @@ class LinuxNode(ResourceManager): ifaces = self.get_connected(LinuxInterface.rtype()) for iface in ifaces: if iface.state < ResourceState.READY: - self.ec.schedule(DELAY, self.deploy) + self.ec.schedule(reschedule_delay, self.deploy) return super(LinuxNode, self).deploy() @@ -190,7 +192,6 @@ class LinuxNode(ResourceManager): "sudo -S killall -u %s || /bin/true ; " % self.get("username") + "sudo -S killall -u %s || /bin/true ; " % self.get("username")) - out = err = "" (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) @@ -242,7 +243,7 @@ class LinuxNode(ResourceManager): return self.copy(src, dst) def install_packages(self, packages, home = None): - home = home or self.node_dir + home = home or self.node_home cmd = "" if self.os in ["f12", "f14"]: @@ -257,14 +258,14 @@ class LinuxNode(ResourceManager): out = err = "" (out, err), proc = self.run_and_wait(cmd, home, pidfile = "instpkg_pid", - stdout = "instpkg_log", - stderr = "instpkg_err", + stdout = "instpkg_out", + stderr = "instpkg_err", raise_on_error = True) return (out, err), proc def remove_packages(self, packages, home = None): - home = home or self.node_dir + home = home or self.node_home cmd = "" if self.os in ["f12", "f14"]: @@ -279,8 +280,8 @@ class LinuxNode(ResourceManager): out = err = "" (out, err), proc = self.run_and_wait(cmd, home, pidfile = "rmpkg_pid", - stdout = "rmpkg_log", - stderr = "rmpkg_err", + stdout = "rmpkg_out", + stderr = "rmpkg_err", raise_on_error = True) return (out, err), proc @@ -301,6 +302,7 @@ class LinuxNode(ResourceManager): stdout = 'stdout', stderr = 'stderr', sudo = False, + tty = False, raise_on_error = False): """ runs a command in background on the remote host, but waits until the command finishes execution. @@ -314,7 +316,8 @@ class LinuxNode(ResourceManager): stdin = stdin, stdout = stdout, stderr = stderr, - sudo = sudo) + sudo = sudo, + tty = tty) # check no errors occurred if proc.poll() and err: @@ -395,7 +398,7 @@ class LinuxNode(ResourceManager): def check_output(self, home, filename): """ checks file content """ (out, err), proc = self.execute("cat %s" % - os.path.join(home, filename), with_lock = True) + os.path.join(home, filename), retry = 1, with_lock = True) return (out, err), proc def is_alive(self): @@ -513,9 +516,10 @@ class LinuxNode(ResourceManager): stdin = None, stdout = 'stdout', stderr = 'stderr', - sudo = False): + sudo = False, + tty = False): - self.debug("Running %s" % command) + self.debug("Running command '%s'" % command) if self.localhost: (out, err), proc = execfuncs.lspawn(command, pidfile, @@ -544,7 +548,8 @@ class LinuxNode(ResourceManager): port = self.get("port"), agent = True, identity = self.get("identity"), - server_key = self.get("serverKey") + server_key = self.get("serverKey"), + tty = tty ) return (out, err), proc diff --git a/src/neco/resources/linux/rpmfuncs.py b/src/neco/resources/linux/rpmfuncs.py index 7f44887c..b5a7b3db 100644 --- a/src/neco/resources/linux/rpmfuncs.py +++ b/src/neco/resources/linux/rpmfuncs.py @@ -9,7 +9,7 @@ def install_packages_command(os, packages): cmd = "( %s )" % install_rpmfusion_command(os) for p in packages: - cmd += " ; ( rpm -q %(package)s || sudo yum -y install %(package)s ) " % { + cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % { 'package': p} #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim)) @@ -21,7 +21,7 @@ def remove_packages_command(os, packages): cmd = "" for p in packages: - cmd += " ( rpm -q %(package)s && sudo yum -y remove %(package)s ) ; " % { + cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % { 'package': p} #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...) diff --git a/src/neco/resources/omf/omf_interface.py b/src/neco/resources/omf/omf_interface.py index d4366352..1bc0fc48 100644 --- a/src/neco/resources/omf/omf_interface.py +++ b/src/neco/resources/omf/omf_interface.py @@ -35,7 +35,7 @@ class OMFWifiInterface(ResourceManager): """Register the attributes of an OMF interface """ - alias = Attribute("alias","Alias of the interface", default_value = "w0") + alias = Attribute("alias","Alias of the interface", default = "w0") mode = Attribute("mode","Mode of the interface") type = Attribute("type","Type of the interface") essid = Attribute("essid","Essid of the interface") diff --git a/src/neco/util/sshfuncs.py b/src/neco/util/sshfuncs.py index b5d8f0ed..043bc64d 100644 --- a/src/neco/util/sshfuncs.py +++ b/src/neco/util/sshfuncs.py @@ -265,7 +265,7 @@ def rexec(command, host, user, if skip: t = x*2 - msg = "SLEEPING %d ... ATEMP %d - host %s - command %s " % ( + msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( t, x, host, " ".join(args)) log(msg, logging.DEBUG) diff --git a/test/execution/ec.py b/test/execution/ec.py index fba3cd5b..93225406 100755 --- a/test/execution/ec.py +++ b/test/execution/ec.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from neco.execution.ec import ExperimentController +from neco.execution.ec import ExperimentController, ECState from neco.execution.scheduler import TaskStatus import datetime @@ -13,21 +13,19 @@ class ExecuteControllersTestCase(unittest.TestCase): return 'hola!' ec = ExperimentController() + + tid = ec.schedule("0s", myfunc, track=True) - try: - tid = ec.schedule("0s", myfunc, track=True) - - while True: - task = ec.get_task(tid) - if task.status != TaskStatus.NEW: - break + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break - time.sleep(1) + time.sleep(1) - self.assertEquals('hola!', task.result) + self.assertEquals('hola!', task.result) - finally: - ec.shutdown() + ec.shutdown() def test_schedule_date(self): def get_time(): @@ -35,25 +33,36 @@ class ExecuteControllersTestCase(unittest.TestCase): ec = ExperimentController() - try: - schedule_time = datetime.datetime.now() - - tid = ec.schedule("4s", get_time, track=True) + schedule_time = datetime.datetime.now() + + tid = ec.schedule("4s", get_time, track=True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break + + time.sleep(1) - while True: - task = ec.get_task(tid) - if task.status != TaskStatus.NEW: - break + execution_time = task.result + delta = execution_time - schedule_time + self.assertTrue(delta > datetime.timedelta(seconds=4)) + self.assertTrue(delta < datetime.timedelta(seconds=5)) - time.sleep(1) + ec.shutdown() - execution_time = task.result - delta = execution_time - schedule_time - self.assertTrue(delta > datetime.timedelta(seconds=4)) - self.assertTrue(delta < datetime.timedelta(seconds=5)) + def test_schedule_exception(self): + def raise_error(): + raise RuntimeError, "the error" - finally: - ec.shutdown() + ec = ExperimentController() + ec.schedule("2s", raise_error) + + while ec.ecstate not in [ECState.FAILED, ECState.TERMINATED]: + time.sleep(1) + + self.assertEquals(ec.ecstate, ECState.FAILED) + ec.shutdown() if __name__ == '__main__': diff --git a/test/execution/resource.py b/test/execution/resource.py index 8f5a2b5c..128c6df8 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -147,15 +147,14 @@ class ResourceManagerTestCase(unittest.TestCase): ec.register_connection(iface1, chan) ec.register_connection(iface2, chan) - try: - ec.deploy() + ec.deploy() - while not all([ ec.state(guid) == ResourceState.STARTED \ - for guid in [app1, app2, node1, node2, iface1, iface2, chan]]): - time.sleep(0.5) + while not all([ ec.state(guid) == ResourceState.STARTED \ + for guid in [app1, app2, node1, node2, iface1, iface2, chan]]) \ + and not ec.finished: + time.sleep(0.5) - finally: - ec.shutdown() + ec.shutdown() rmapp1 = ec.get_resource(app1) rmapp2 = ec.get_resource(app2) diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py index 445cef7d..b6066f1c 100644 --- a/test/resources/linux/application.py +++ b/test/resources/linux/application.py @@ -1,6 +1,6 @@ #!/usr/bin/env python from neco.execution.ec import ExperimentController -from neco.execution.resource import ResourceState +from neco.execution.resource import ResourceState, ResourceAction from neco.execution.trace import TraceAttr from neco.resources.linux.node import LinuxNode from neco.resources.linux.application import LinuxApplication @@ -22,6 +22,38 @@ class LinuxApplicationTestCase(unittest.TestCase): self.target = 'nepi5.pl.sophia.inria.fr' + @skipIfNotAlive + def t_stdout(self, host, user): + from neco.execution.resource import ResourceFactory + + ResourceFactory.register_type(LinuxNode) + ResourceFactory.register_type(LinuxApplication) + + ec = ExperimentController() + + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", host) + ec.set(node, "username", user) + ec.set(node, "cleanHome", True) + ec.set(node, "cleanProcesses", True) + + app = ec.register_resource("LinuxApplication") + cmd = "echo 'HOLA'" + ec.set(app, "command", cmd) + ec.register_connection(app, node) + + ec.deploy() + + ec.wait_finished([app]) + + self.assertTrue(ec.state(node) == ResourceState.STARTED) + self.assertTrue(ec.state(app) == ResourceState.FINISHED) + + stdout = ec.trace(app, 'stdout') + self.assertTrue(stdout.strip() == "HOLA") + + ec.shutdown() + @skipIfNotAlive def t_ping(self, host, user): from neco.execution.resource import ResourceFactory @@ -43,15 +75,61 @@ class LinuxApplicationTestCase(unittest.TestCase): ec.register_connection(app, node) - try: - ec.deploy() + ec.deploy() + + ec.wait_finished([app]) - while not ec.state(app) == ResourceState.FINISHED: - time.sleep(0.5) + self.assertTrue(ec.state(node) == ResourceState.STARTED) + self.assertTrue(ec.state(app) == ResourceState.FINISHED) + + stdout = ec.trace(app, 'stdout') + size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE) + self.assertEquals(len(stdout), size) + + block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1) + self.assertEquals(block, stdout[5:10]) - self.assertTrue(ec.state(node) == ResourceState.STARTED) - self.assertTrue(ec.state(app) == ResourceState.FINISHED) + path = ec.trace(app, 'stdout', attr = TraceAttr.PATH) + rm = ec.get_resource(app) + p = os.path.join(rm.app_home, 'stdout') + self.assertEquals(path, p) + + ec.shutdown() + + @skipIfNotAlive + def t_concurrency(self, host, user): + from neco.execution.resource import ResourceFactory + + ResourceFactory.register_type(LinuxNode) + ResourceFactory.register_type(LinuxApplication) + ec = ExperimentController() + + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", host) + ec.set(node, "username", user) + ec.set(node, "cleanHome", True) + ec.set(node, "cleanProcesses", True) + + apps = list() + for i in xrange(50): + app = ec.register_resource("LinuxApplication") + cmd = "ping -c5 %s" % self.target + ec.set(app, "command", cmd) + ec.register_connection(app, node) + apps.append(app) + + ec.deploy() + + ec.wait_finished(apps) + + self.assertTrue(ec.state(node) == ResourceState.STARTED) + self.assertTrue( + all([ec.state(guid) == ResourceState.FINISHED \ + for guid in apps]) + ) + + for app in apps: stdout = ec.trace(app, 'stdout') size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE) self.assertEquals(len(stdout), size) @@ -61,14 +139,13 @@ class LinuxApplicationTestCase(unittest.TestCase): path = ec.trace(app, 'stdout', attr = TraceAttr.PATH) rm = ec.get_resource(app) - p = os.path.join(rm.home, 'stdout') + p = os.path.join(rm.app_home, 'stdout') self.assertEquals(path, p) - finally: - ec.shutdown() + ec.shutdown() @skipIfNotAlive - def t_concurrency(self, host, user): + def t_condition(self, host, user, depends): from neco.execution.resource import ResourceFactory ResourceFactory.register_type(LinuxNode) @@ -82,47 +159,44 @@ class LinuxApplicationTestCase(unittest.TestCase): ec.set(node, "cleanHome", True) ec.set(node, "cleanProcesses", True) - apps = list() - for i in xrange(50): - app = ec.register_resource("LinuxApplication") - cmd = "ping -c5 %s" % self.target - ec.set(app, "command", cmd) - ec.register_connection(app, node) - apps.append(app) + server = ec.register_resource("LinuxApplication") + cmd = "echo 'HOLA' | nc -l 3333" + ec.set(server, "command", cmd) + ec.set(server, "depends", depends) + ec.register_connection(server, node) + + client = ec.register_resource("LinuxApplication") + cmd = "nc 127.0.0.1 3333" + ec.set(client, "command", cmd) + ec.register_connection(client, node) + + ec.register_condition(client, ResourceAction.START, server, ResourceState.STARTED) - try: - ec.deploy() + apps = [client, server] + + ec.deploy() + + ec.wait_finished(apps) - while not all([ec.state(guid) == ResourceState.FINISHED \ - for guid in apps]): - time.sleep(0.5) + self.assertTrue(ec.state(node) == ResourceState.STARTED) + self.assertTrue(ec.state(server) == ResourceState.FINISHED) + self.assertTrue(ec.state(client) == ResourceState.FINISHED) - self.assertTrue(ec.state(node) == ResourceState.STARTED) - self.assertTrue( - all([ec.state(guid) == ResourceState.FINISHED \ - for guid in apps]) - ) + stdout = ec.trace(client, 'stdout') + self.assertTrue(stdout.strip() == "HOLA") - for app in apps: - stdout = ec.trace(app, 'stdout') - size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE) - self.assertEquals(len(stdout), size) - - block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1) - self.assertEquals(block, stdout[5:10]) + ec.shutdown() - path = ec.trace(app, 'stdout', attr = TraceAttr.PATH) - rm = ec.get_resource(app) - p = os.path.join(rm.home, 'stdout') - self.assertEquals(path, p) + def test_stdout_fedora(self): + self.t_stdout(self.fedora_host, self.fedora_user) - finally: - ec.shutdown() + def test_stdout_ubuntu(self): + self.t_stdout(self.ubuntu_host, self.ubuntu_user) def test_ping_fedora(self): self.t_ping(self.fedora_host, self.fedora_user) - def test_fing_ubuntu(self): + def test_ping_ubuntu(self): self.t_ping(self.ubuntu_host, self.ubuntu_user) def test_concurrency_fedora(self): @@ -131,6 +205,13 @@ class LinuxApplicationTestCase(unittest.TestCase): def test_concurrency_ubuntu(self): self.t_concurrency(self.ubuntu_host, self.ubuntu_user) + def test_condition_fedora(self): + self.t_condition(self.fedora_host, self.fedora_user, "nc") + + def test_condition_ubuntu(self): + self.t_condition(self.ubuntu_host, self.ubuntu_user, "netcat") + + # TODO: test compilation, sources, dependencies, etc!!! if __name__ == '__main__': unittest.main() diff --git a/test/resources/linux/interface.py b/test/resources/linux/interface.py index 7b914d27..f94c7e6e 100644 --- a/test/resources/linux/interface.py +++ b/test/resources/linux/interface.py @@ -41,19 +41,17 @@ class LinuxInterfaceTestCase(unittest.TestCase): ec.register_connection(iface, node) ec.register_connection(iface, chan) - try: - ec.deploy() + ec.deploy() - while not all([ ec.state(guid) == ResourceState.STARTED \ - for guid in [node, iface]]): - time.sleep(0.5) + while not all([ ec.state(guid) == ResourceState.STARTED \ + for guid in [node, iface]]) and not ec.finished: + time.sleep(0.5) - self.assertTrue(ec.state(node) == ResourceState.STARTED) - self.assertTrue(ec.state(iface) == ResourceState.STARTED) - self.assertTrue(ec.get(iface, "deviceName") == "eth0") + self.assertTrue(ec.state(node) == ResourceState.STARTED) + self.assertTrue(ec.state(iface) == ResourceState.STARTED) + self.assertTrue(ec.get(iface, "deviceName") == "eth0") - finally: - ec.shutdown() + ec.shutdown() def test_deploy_fedora(self): self.t_deploy(self.fedora_host, self.fedora_user) diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index 075c3539..cd56757d 100644 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -75,6 +75,18 @@ class LinuxNodeTestCase(unittest.TestCase): def t_install(self, host, user): node, ec = create_node(host, user) + (out, err), proc = node.install_packages('gcc') + self.assertEquals(out, "") + + (out, err), proc = node.remove_packages('gcc') + + self.assertEquals(out, "") + + + @skipIfNotAlive + def t_compile(self, host, user): + node, ec = create_node(host, user) + app_home = os.path.join(node.exp_dir, "my-app") node.mkdir(app_home, clean = True) @@ -142,6 +154,12 @@ main (void) def test_install_ubuntu(self): self.t_install(self.ubuntu_host, self.ubuntu_user) + + def test_compile_fedora(self): + self.t_compile(self.fedora_host, self.fedora_user) + + def test_compile_ubuntu(self): + self.t_compile(self.ubuntu_host, self.ubuntu_user) @skipInteractive def test_xterm_ubuntu(self): diff --git a/test/resources/linux/test_utils.py b/test/resources/linux/test_utils.py index 369e022a..1cf8aba3 100644 --- a/test/resources/linux/test_utils.py +++ b/test/resources/linux/test_utils.py @@ -35,7 +35,8 @@ def skipIfNotAlive(func): def skipInteractive(func): name = func.__name__ def wrapped(*args, **kwargs): - mode = os.environ.get("NEPI_INTERACTIVE", False).lower() in ['true', 'yes'] + mode = os.environ.get("NEPI_INTERACTIVE", False) + mode = mode and mode.lower() in ['true', 'yes'] if not mode: print "*** WARNING: Skipping test %s: Interactive mode off \n" % name return -- 2.43.0