From 54d2a201dca3af3dabf18601d4909bf506960627 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Thu, 27 Jun 2013 23:34:48 -0700 Subject: [PATCH] Adding trace Collector RM --- .../linux/ccn/ccncat_extended_ring_topo.py | 69 +++++----- examples/omf/manual_vlc_experiment_plexus.py | 30 ++--- setup.py | 1 + src/nepi/execution/ec.py | 4 +- src/nepi/execution/resource.py | 69 ++++++++-- src/nepi/resources/all/__init__.py | 0 src/nepi/resources/all/collector.py | 121 ++++++++++++++++++ src/nepi/resources/linux/application.py | 5 +- .../resources/linux/ccn/ccnapplication.py | 72 +++++++++++ src/nepi/resources/linux/ccn/ccncontent.py | 3 +- src/nepi/resources/linux/ccn/ccnd.py | 15 ++- src/nepi/resources/linux/ccn/ccnr.py | 3 +- src/nepi/resources/linux/ccn/fibentry.py | 3 +- src/nepi/resources/linux/interface.py | 4 +- src/nepi/resources/linux/node.py | 26 +++- src/nepi/resources/linux/rpmfuncs.py | 7 +- src/nepi/resources/omf/application.py | 4 +- src/nepi/resources/omf/channel.py | 4 +- src/nepi/resources/omf/interface.py | 4 +- src/nepi/resources/omf/node.py | 4 +- src/nepi/resources/omf/omf_api.py | 4 +- src/nepi/resources/planetlab/node.py | 5 +- src/nepi/util/timefuncs.py | 2 +- test/execution/resource.py | 68 +++++++--- 24 files changed, 410 insertions(+), 117 deletions(-) create mode 100644 src/nepi/resources/all/__init__.py create mode 100644 src/nepi/resources/all/collector.py create mode 100644 src/nepi/resources/linux/ccn/ccnapplication.py diff --git a/examples/linux/ccn/ccncat_extended_ring_topo.py b/examples/linux/ccn/ccncat_extended_ring_topo.py index 8c32436d..08625eaf 100755 --- a/examples/linux/ccn/ccncat_extended_ring_topo.py +++ b/examples/linux/ccn/ccncat_extended_ring_topo.py @@ -48,14 +48,13 @@ from optparse import OptionParser, SUPPRESS_HELP import os import time -import tempfile def add_node(ec, host, user, ssh_key = None): node = ec.register_resource("LinuxNode") ec.set(node, "hostname", host) ec.set(node, "username", user) ec.set(node, "identity", ssh_key) - ec.set(node, "cleanHome", True) + #ec.set(node, "cleanHome", True) ec.set(node, "cleanProcesses", True) return node @@ -94,6 +93,13 @@ def add_stream(ec, ccnd, content_name): return app +def add_collector(ec, trace_name, store_dir): + collector = ec.register_resource("Collector") + ec.set(collector, "traceName", trace_name) + ec.set(collector, "storeDir", store_dir) + + return collector + def get_options(): pl_slice = os.environ.get("PL_SLICE") @@ -103,7 +109,7 @@ def get_options(): default_key = default_key if os.path.exists(default_key) else None pl_ssh_key = os.environ.get("PL_SSHKEY", default_key) - usage = "usage: %prog -s -m -c -e -i " + usage = "usage: %prog -s -m -e -i -r 0: + if wait > 0.001: reschedule = True delay = "%fs" % wait break @@ -643,6 +676,18 @@ class ResourceManager(Logger): self._failed_time = tnow() self._state = ResourceState.FAILED + def connect(self, guid): + """ Performs actions that need to be taken upon associating RMs. + This method should be redefined when necessary in child classes. + """ + pass + + def disconnect(self, guid): + """ Performs actions that need to be taken upon disassociating RMs. + This method should be redefined when necessary in child classes. + """ + pass + def valid_connection(self, guid): """Checks whether a connection with the other RM is valid. diff --git a/src/nepi/resources/all/__init__.py b/src/nepi/resources/all/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py new file mode 100644 index 00000000..a9f76672 --- /dev/null +++ b/src/nepi/resources/all/collector.py @@ -0,0 +1,121 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.trace import Trace, TraceAttr +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + ResourceAction +from nepi.util.sshfuncs import ProcStatus +from nepi.util.timefuncs import tsformat + +import os +import tempfile + +@clsinit +class Collector(ResourceManager): + """ The collector is reponsible of collecting traces + of a same type associated to RMs into a local directory. + + .. class:: Class Args : + + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + :param creds: Credentials to communicate with the rm (XmppClient) + :type creds: dict + + """ + _rtype = "Collector" + + @classmethod + def _register_attributes(cls): + trace_name = Attribute("traceName", "Name of the trace to be collected", + flags = Flags.ExecReadOnly) + store_dir = Attribute("storeDir", "Path to local directory to store trace results", + default = tempfile.gettempdir(), + flags = Flags.ExecReadOnly) + + cls._register_attribute(trace_name) + cls._register_attribute(store_dir) + + def __init__(self, ec, guid): + super(Collector, self).__init__(ec, guid) + self._store_path = None + + @property + def store_path(self): + return self._store_path + + def provision(self): + trace_name = self.get("traceName") + if not trace_name: + self.fail() + + msg = "No traceName was specified" + self.error(msg) + raise RuntimeError, msg + + store_dir = self.get("storeDir") + timestamp = tsformat() + self._store_path = os.path.join(store_dir, self.ec.exp_id, timestamp) + + msg = "Creating local directory at %s to store %s traces " % ( + store_dir, trace_name) + self.info(msg) + + try: + os.makedirs(self.store_path) + except OSError: + pass + + super(Collector, self).provision() + + def deploy(self): + try: + self.discover() + self.provision() + except: + self.fail() + raise + + super(Collector, self).deploy() + + def release(self): + trace_name = self.get("traceName") + + msg = "Collecting '%s' traces to local directory %s" % ( + trace_name, self.store_path) + self.info(msg) + + rms = self.get_connected() + for rm in rms: + result = self.ec.trace(rm.guid, trace_name) + fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, + trace_name)) + f = open(fpath, "w") + f.write(result) + f.close() + + super(Collector, self).release() + + def valid_connection(self, guid): + # TODO: Validate! + return True + diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index dc502308..6c7f82cc 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -19,7 +19,8 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + reschedule_delay from nepi.resources.linux.node import LinuxNode from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -375,8 +376,6 @@ class LinuxApplication(ResourceManager): node = self.node if not node or node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) - - reschedule_delay = "0.5s" self.ec.schedule(reschedule_delay, self.deploy) else: try: diff --git a/src/nepi/resources/linux/ccn/ccnapplication.py b/src/nepi/resources/linux/ccn/ccnapplication.py new file mode 100644 index 00000000..e5ae00e0 --- /dev/null +++ b/src/nepi/resources/linux/ccn/ccnapplication.py @@ -0,0 +1,72 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.resource import clsinit_copy, ResourceState, \ + ResourceAction +from nepi.resources.linux.application import LinuxApplication +from nepi.resources.linux.ccn.ccnd import LinuxCCND + +import os + +@clsinit_copy +class LinuxCCNApplication(LinuxApplication): + _rtype = "LinuxCCNApplication" + + def __init__(self, ec, guid): + super(LinuxCCNApplication, self).__init__(ec, guid) + self._home = "ccnapp-%s" % self.guid + + @property + def ccnd(self): + ccnd = self.get_connected(LinuxCCND.rtype()) + if ccnd: return ccnd[0] + return None + + @property + def node(self): + if self.ccnd: return self.ccnd.node + return None + + def deploy(self): + if not self.get("env"): + self.set("env", self._environment) + + super(LinuxCCNApplication, self).deploy() + + @property + def _environment(self): + env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " + return env + + def execute_command(self, command, env): + environ = self.node.format_environment(env, inline = True) + command = environ + command + command = self.replace_paths(command) + + (out, err), proc = self.node.execute(command) + + if proc.poll(): + self._state = ResourceState.FAILED + self.error(msg, out, err) + raise RuntimeError, msg + + def valid_connection(self, guid): + # TODO: Validate! + return True + diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index f6082120..e5216d2e 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction + ResourceAction, reschedule_delay from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.resources.linux.ccn.ccnr import LinuxCCNR from nepi.util.timefuncs import tnow @@ -62,7 +62,6 @@ class LinuxCCNContent(LinuxCCNApplication): if not self.ccnr or self.ccnr.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) - reschedule_delay = "0.5s" # ccnr needs to wait until ccnd is deployed and running self.ec.schedule(reschedule_delay, self.deploy) else: diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index 6ebe5e37..44c1afb9 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -19,15 +19,17 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState +from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ + reschedule_delay from nepi.resources.linux.application import LinuxApplication from nepi.resources.linux.node import OSType -from nepi.util.timefuncs import tnow, tdiff +from nepi.util.timefuncs import tnow, tdiffsec import os # TODO: use ccndlogging to dynamically change the logging level + @clsinit_copy class LinuxCCND(LinuxApplication): _rtype = "LinuxCCND" @@ -129,8 +131,7 @@ class LinuxCCND(LinuxApplication): if not self.node or self.node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) - reschedule_delay = "0.5s" - # ccnr needs to wait until ccnd is deployed and running + # ccnd needs to wait until node is deployed and running self.ec.schedule(reschedule_delay, self.deploy) else: if not self.get("command"): @@ -238,7 +239,7 @@ class LinuxCCND(LinuxApplication): # First check if the ccnd has failed state_check_delay = 0.5 if self._state == ResourceState.STARTED and \ - tdiff(tnow(), self._last_state_check) > state_check_delay: + tdiffsec(tnow(), self._last_state_check) > state_check_delay: (out, err), proc = self._ccndstatus retcode = proc.poll() @@ -271,11 +272,11 @@ class LinuxCCND(LinuxApplication): @property def _dependencies(self): - if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]: + if self.node.use_rpm: return ( " autoconf openssl-devel expat-devel libpcap-devel " " ecryptfs-utils-devel libxml2-devel automake gawk " " gcc gcc-c++ git pcre-devel make ") - elif self.node.os in [ OSType.UBUNTU , OSType.DEBIAN]: + elif self.node.use_deb: return ( " autoconf libssl-dev libexpat-dev libpcap-dev " " libecryptfs0 libxml2-utils automake gawk gcc g++ " " git-core pkg-config libpcre3-dev make ") diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index f2e6c0e1..cdf48e37 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction + ResourceAction, reschedule_delay from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND from nepi.util.timefuncs import tnow @@ -187,7 +187,6 @@ class LinuxCCNR(LinuxCCNApplication): if not self.ccnd or self.ccnd.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state ) - reschedule_delay = "0.5s" # ccnr needs to wait until ccnd is deployed and running self.ec.schedule(reschedule_delay, self.deploy) else: diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py index c6698586..c6398feb 100644 --- a/src/nepi/resources/linux/ccn/fibentry.py +++ b/src/nepi/resources/linux/ccn/fibentry.py @@ -20,13 +20,12 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import clsinit_copy, ResourceState, \ - ResourceAction + ResourceAction, reschedule_delay from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.util.timefuncs import tnow import os -reschedule_delay = "0.5s" # TODO: Add rest of options for ccndc!!! # Implement ENTRY DELETE!! diff --git a/src/nepi/resources/linux/interface.py b/src/nepi/resources/linux/interface.py index 68fc0cd0..68c46eb0 100644 --- a/src/nepi/resources/linux/interface.py +++ b/src/nepi/resources/linux/interface.py @@ -18,7 +18,8 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Types, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + reschedule_delay from nepi.resources.linux.node import LinuxNode from nepi.resources.linux.channel import LinuxChannel @@ -31,7 +32,6 @@ import time # TODO: UP, MTU attributes! -reschedule_delay = "0.5s" @clsinit class LinuxInterface(ResourceManager): diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index a96267f1..0d2597fd 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -18,7 +18,8 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + reschedule_delay from nepi.resources.linux import rpmfuncs, debfuncs from nepi.util import sshfuncs, execfuncs from nepi.util.sshfuncs import ProcStatus @@ -36,7 +37,6 @@ import threading # TODO: Unify delays!! # TODO: Validate outcome of uploads!! -reschedule_delay = "0.5s" class ExitCode: """ @@ -52,6 +52,7 @@ class OSType: """ Supported flavors of Linux OS """ + FEDORA_8 = "f8" FEDORA_12 = "f12" FEDORA_14 = "f14" FEDORA = "fedora" @@ -227,7 +228,9 @@ class LinuxNode(ResourceManager): self.error(msg, out, err) raise RuntimeError, "%s - %s - %s" %( msg, out, err ) - if out.find("Fedora release 12") == 0: + if out.find("Fedora release 8") == 0: + self._os = OSType.FEDORA_8 + elif out.find("Fedora release 12") == 0: self._os = OSType.FEDORA_12 elif out.find("Fedora release 14") == 0: self._os = OSType.FEDORA_14 @@ -242,6 +245,15 @@ class LinuxNode(ResourceManager): return self._os + @property + def use_deb(self): + return self.os in [OSType.DEBIAN, OSType.UBUNTU] + + @property + def use_rpm(self): + return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8, + OSType.FEDORA] + @property def localhost(self): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] @@ -367,9 +379,9 @@ class LinuxNode(ResourceManager): def install_packages(self, packages, home): command = "" - if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]: + if self.use_rpm: command = rpmfuncs.install_packages_command(self.os, packages) - elif self.os in [OSType.DEBIAN, OSType.UBUNTU]: + elif self.use_deb: command = debfuncs.install_packages_command(self.os, packages) else: msg = "Error installing packages ( OS not known ) " @@ -389,9 +401,9 @@ class LinuxNode(ResourceManager): def remove_packages(self, packages, home): command = "" - if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]: + if self.use_rpm: command = rpmfuncs.remove_packages_command(self.os, packages) - elif self.os in [OSType.DEBIAN, OSType.UBUNTU]: + elif self.use_deb: command = debfuncs.remove_packages_command(self.os, packages) else: msg = "Error removing packages ( OS not known ) " diff --git a/src/nepi/resources/linux/rpmfuncs.py b/src/nepi/resources/linux/rpmfuncs.py index 5fa0592e..5497a57f 100644 --- a/src/nepi/resources/linux/rpmfuncs.py +++ b/src/nepi/resources/linux/rpmfuncs.py @@ -26,7 +26,8 @@ def install_packages_command(os, packages): if not isinstance(packages, list): packages = [packages] - cmd = install_rpmfusion_command(os) + " && " + cmd = install_rpmfusion_command(os) + if cmd: cmd += " && " cmd += " && ".join(map(lambda p: " { rpm -q %(package)s || sudo -S yum -y install %(package)s ; } " % { 'package': p}, packages)) @@ -51,11 +52,13 @@ def install_rpmfusion_command(os): cmd = " { rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s ; } " if os in [OSType.FEDORA, OSType.FEDORA_12]: + # For f12 cmd = cmd % {'package': RPM_FUSION_URL_F12} elif os == OSType.FEDORA_14: - # This one works for f13+ + # For f13+ cmd = cmd % {'package': RPM_FUSION_URL} else: + # Fedora 8 is unmaintained cmd = "" return cmd diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py index 6da1160a..170410b2 100644 --- a/src/nepi/resources/omf/application.py +++ b/src/nepi/resources/omf/application.py @@ -18,11 +18,11 @@ # Author: Alina Quereilhac # Julien Tribino -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_api import OMFAPIFactory -reschedule_delay = "0.5s" @clsinit class OMFApplication(ResourceManager): diff --git a/src/nepi/resources/omf/channel.py b/src/nepi/resources/omf/channel.py index fa8df03b..8d8a5db6 100644 --- a/src/nepi/resources/omf/channel.py +++ b/src/nepi/resources/omf/channel.py @@ -17,12 +17,12 @@ """ -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_api import OMFAPIFactory -reschedule_delay = "0.5s" @clsinit class OMFChannel(ResourceManager): diff --git a/src/nepi/resources/omf/interface.py b/src/nepi/resources/omf/interface.py index 0e1dad9d..b15be5ea 100644 --- a/src/nepi/resources/omf/interface.py +++ b/src/nepi/resources/omf/interface.py @@ -18,12 +18,12 @@ # Author: Alina Quereilhac # Julien Tribino -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_api import OMFAPIFactory -reschedule_delay = "0.5s" @clsinit class OMFWifiInterface(ResourceManager): diff --git a/src/nepi/resources/omf/node.py b/src/nepi/resources/omf/node.py index 4658cb54..fad525b0 100644 --- a/src/nepi/resources/omf/node.py +++ b/src/nepi/resources/omf/node.py @@ -19,14 +19,14 @@ # Julien Tribino -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_api import OMFAPIFactory import time -reschedule_delay = "0.5s" @clsinit class OMFNode(ResourceManager): diff --git a/src/nepi/resources/omf/omf_api.py b/src/nepi/resources/omf/omf_api.py index 3316f017..7d96cfb0 100644 --- a/src/nepi/resources/omf/omf_api.py +++ b/src/nepi/resources/omf/omf_api.py @@ -29,7 +29,7 @@ from nepi.util.logger import Logger from nepi.resources.omf.omf_client import OMFClient from nepi.resources.omf.messages_5_4 import MessageHandler -from nepi.util.timefuncs import tsfromat +from nepi.util.timefuncs import tsformat class OMFAPI(Logger): """ @@ -68,7 +68,7 @@ class OMFAPI(Logger): """ super(OMFAPI, self).__init__("OMFAPI") - date = tsfromat() + date = tsformat() tz = -time.altzone if time.daylight != 0 else -time.timezone date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds self._user = "%s-%s" % (slice, date) diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index bdcc19f1..25358a27 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -18,12 +18,11 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState +from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ + reschedule_delay from nepi.resources.linux.node import LinuxNode - from nepi.resources.planetlab.plcapi import PLCAPIFactory -reschedule_delay = "0.5s" @clsinit_copy class PlanetlabNode(LinuxNode): diff --git a/src/nepi/util/timefuncs.py b/src/nepi/util/timefuncs.py index 625dd704..1a8e102a 100644 --- a/src/nepi/util/timefuncs.py +++ b/src/nepi/util/timefuncs.py @@ -35,7 +35,7 @@ def stformat(sdate): """ return datetime.datetime.strptime(sdate, _strf).date() -def tsfromat(date = None): +def tsformat(date = None): """ Formats a datetime object to a string with format YYYYMMddHHMMSSffff. If no date is given, the current date is used. diff --git a/test/execution/resource.py b/test/execution/resource.py index 7122b92d..365ca08c 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -21,7 +21,8 @@ from nepi.execution.attribute import Attribute from nepi.execution.ec import ExperimentController -from nepi.execution.resource import ResourceManager, ResourceState, clsinit +from nepi.execution.resource import ResourceManager, ResourceState, clsinit, \ + ResourceAction import random import time @@ -45,24 +46,7 @@ class AnotherResource(ResourceManager): def __init__(self, ec, guid): super(AnotherResource, self).__init__(ec, guid) - -class ResourceFactoryTestCase(unittest.TestCase): - def test_add_resource_factory(self): - from nepi.execution.resource import ResourceFactory - - ResourceFactory.register_type(MyResource) - ResourceFactory.register_type(AnotherResource) - - self.assertEquals(MyResource.rtype(), "MyResource") - self.assertEquals(len(MyResource._attributes), 1) - - self.assertEquals(ResourceManager.rtype(), "Resource") - self.assertEquals(len(ResourceManager._attributes), 0) - - self.assertEquals(AnotherResource.rtype(), "AnotherResource") - self.assertEquals(len(AnotherResource._attributes), 0) - self.assertEquals(len(ResourceFactory.resource_types()), 2) class Channel(ResourceManager): _rtype = "Channel" @@ -135,8 +119,56 @@ class Application(ResourceManager): super(Application, self).start() time.sleep(random.random() * 5) self._state = ResourceState.FINISHED + + +class ResourceFactoryTestCase(unittest.TestCase): + def test_add_resource_factory(self): + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(MyResource) + ResourceFactory.register_type(AnotherResource) + + self.assertEquals(MyResource.rtype(), "MyResource") + self.assertEquals(len(MyResource._attributes), 1) + + self.assertEquals(ResourceManager.rtype(), "Resource") + self.assertEquals(len(ResourceManager._attributes), 0) + + self.assertEquals(AnotherResource.rtype(), "AnotherResource") + self.assertEquals(len(AnotherResource._attributes), 0) + + self.assertEquals(len(ResourceFactory.resource_types()), 2) class ResourceManagerTestCase(unittest.TestCase): + def test_register_condition(self): + ec = ExperimentController() + rm = ResourceManager(ec, 15) + + group = [1,3,5,7] + rm.register_condition(ResourceAction.START, group, + ResourceState.STARTED) + + group = [10,8] + rm.register_condition(ResourceAction.START, + group, ResourceState.STARTED, time = "10s") + + waiting_for = [] + conditions = rm.conditions.get(ResourceAction.START) + for (group, state, time) in conditions: + waiting_for.extend(group) + + self.assertEquals(waiting_for, [1, 3, 5, 7, 10, 8]) + + group = [1, 2, 3, 4, 6] + rm.unregister_condition(group) + + waiting_for = [] + conditions = rm.conditions.get(ResourceAction.START) + for (group, state, time) in conditions: + waiting_for.extend(group) + + self.assertEquals(waiting_for, [5, 7, 10, 8]) + def test_deploy_in_order(self): """ Test scenario: 2 applications running one on 1 node each. -- 2.43.0