import os
import time
-import tempfile
def add_node(ec, host, user, ssh_key = None):
node = ec.register_resource("LinuxNode")
ec.set(node, "hostname", host)
ec.set(node, "username", user)
ec.set(node, "identity", ssh_key)
- ec.set(node, "cleanHome", True)
+ #ec.set(node, "cleanHome", True)
ec.set(node, "cleanProcesses", True)
return node
return app
+def add_collector(ec, trace_name, store_dir):
+ collector = ec.register_resource("Collector")
+ ec.set(collector, "traceName", trace_name)
+ ec.set(collector, "storeDir", store_dir)
+
+ return collector
+
def get_options():
pl_slice = os.environ.get("PL_SLICE")
default_key = default_key if os.path.exists(default_key) else None
pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
- usage = "usage: %prog -s <pl-user> -m <movie> -c -e <exp-id> -i <ssh_key>"
+ usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results"
parser = OptionParser(usage=usage)
parser.add_option("-s", "--pl-user", dest="pl_user",
parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key",
help="Path to private SSH key to be used for connection",
default = pl_ssh_key, type="str")
+ parser.add_option("-r", "--results", dest="results", default = "/tmp",
+ help="Path to directory where to store results", type="str")
(options, args) = parser.parse_args()
if not options.movie:
parser.error("movie is a required argument")
- return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key)
-
+ return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key,
+ options.results)
if __name__ == '__main__':
content_name = "ccnx:/test/VIDEO"
- ( pl_user, movie, exp_id, pl_ssh_key ) = get_options()
+ ( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options()
# Search for available RMs
populate_factory()
ec = ExperimentController(exp_id = exp_id)
- # hosts
- host1 = "planetlab2.u-strasbg.fr"
- host2 = "planet1.servers.ua.pt"
- host3 = "planetlab1.cs.uoi.gr"
- host4 = "planetlab1.aston.ac.uk"
- host5 = "planetlab2.willab.fi"
- host6 = "planetlab-1.fokus.fraunhofer.de"
-
+ # hosts in Europe
+ #host1 = "planetlab2.u-strasbg.fr"
+ #host2 = "planet1.servers.ua.pt"
+ #host3 = "planetlab1.cs.uoi.gr"
+ #host4 = "planetlab1.aston.ac.uk"
+ #host5 = "planetlab2.willab.fi"
+ #host6 = "planetlab-1.fokus.fraunhofer.de"
+
+ # host in the US
+ host1 = "planetlab4.wail.wisc.edu"
+ host2 = "planetlab2.cs.columbia.edu"
+ host3 = "ricepl-2.cs.rice.edu"
+ host4 = "node1.planetlab.albany.edu"
+ host5 = "earth.cs.brown.edu"
+ host6 = "planetlab2.engr.uconn.edu"
+
# describe nodes in the central ring
ring_hosts = [host1, host2, host3, host4]
ccnds = dict()
for i in xrange(len(ring_hosts)):
host = ring_hosts[i]
- node = add_node(ec, host, pl_user)
+ node = add_node(ec, host, pl_user, pl_ssh_key)
ccnd = add_ccnd(ec, node)
ccnr = add_ccnr(ec, ccnd)
ccnds[host] = ccnd
l5d = add_fib_entry(ec, ccnds[host3], host1)
# border node 1
- bnode1 = add_node(ec, host5, pl_user)
+ bnode1 = add_node(ec, host5, pl_user, pl_ssh_key)
ccndb1 = add_ccnd(ec, bnode1)
ccnrb1 = add_ccnr(ec, ccndb1)
ccnds[host5] = ccndb1
co = add_content(ec, ccnrb1, content_name, movie)
# border node 2
- bnode2 = add_node(ec, host6, pl_user)
+ bnode2 = add_node(ec, host6, pl_user, pl_ssh_key)
ccndb2 = add_ccnd(ec, bnode2)
ccnrb2 = add_ccnr(ec, ccndb2)
ccnds[host6] = ccndb2
ec.register_condition(l5d, ResourceAction.STOP,
app, ResourceState.STARTED, time = "10s")
+ # Register a collector to automatically collect traces
+ collector = add_collector(ec, "stderr", results_dir)
+ for ccnd in ccnds.values():
+ ec.register_connection(collector, ccnd)
+
# deploy all ResourceManagers
ec.deploy()
+ # Wait until ccncat has started retrieving the content
ec.wait_started([app])
rvideo_path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- (stdout, stderr) = proc2.communicate()
-
- dirpath = tempfile.mkdtemp()
- print "Storing to DIRPATH ", dirpath
-
- for ccnd in ccnds.values():
- stdout = ec.trace(ccnd, "stderr")
- fname = "log-%d" % ccnd
- path = os.path.join(dirpath, fname)
- f = open(path, "w")
- f.write(stdout)
- f.close()
+ (stdout, stderr) = proc2.communicate()
# shutdown the experiment controller
ec.shutdown()
app3.set('xmppPort', "5222")
app3.set('xmppPassword', "1234")
-# Connection
-app3.connect(node1.guid)
-node1.connect(app3.guid)
+# register_connection
+app3.register_connection(node1.guid)
+node1.register_connection(app3.guid)
-app1.connect(node1.guid)
-node1.connect(app1.guid)
+app1.register_connection(node1.guid)
+node1.register_connection(app1.guid)
-node1.connect(iface1.guid)
-iface1.connect(node1.guid)
+node1.register_connection(iface1.guid)
+iface1.register_connection(node1.guid)
-iface1.connect(channel.guid)
-channel.connect(iface1.guid)
+iface1.register_connection(channel.guid)
+channel.register_connection(iface1.guid)
-channel.connect(iface2.guid)
-iface2.connect(channel.guid)
+channel.register_connection(iface2.guid)
+iface2.register_connection(channel.guid)
-iface2.connect(node2.guid)
-node2.connect(iface2.guid)
+iface2.register_connection(node2.guid)
+node2.register_connection(iface2.guid)
-node2.connect(app2.guid)
-app2.connect(node2.guid)
+node2.register_connection(app2.guid)
+app2.register_connection(node2.guid)
# Local Deploy
node1.deploy()
"nepi.design",
"nepi.execution",
"nepi.resources",
+ "nepi.resources.all",
"nepi.resources.linux",
"nepi.resources.linux.ccn",
"nepi.resources.netns",
rm1 = self.get_resource(guid1)
rm2 = self.get_resource(guid2)
- rm1.connect(guid2)
- rm2.connect(guid1)
+ rm1.register_connection(guid2)
+ rm2.register_connection(guid1)
def register_condition(self, group1, action, group2, state,
time = None):
import pkgutil
import weakref
-reschedule_delay = "0.5s"
+reschedule_delay = "1s"
class ResourceAction:
""" Action that a user can order to a Resource Manager
"""
return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
- def connect(self, guid):
- """ Establishes a connection to the RM identified by guid
+ def register_connection(self, guid):
+ """ Registers a connection to the RM identified by guid
:param guid: Global unique identified of the RM to connect to
:type guid: int
"""
if self.valid_connection(guid):
+ self.connect(guid)
self._connections.add(guid)
- def disconnect(self, guid):
- """ Removes connection to the RM identified by guid
+ def unregister_connection(self, guid):
+ """ Removes a registered connection to the RM identified by guid
:param guid: Global unique identified of the RM to connect to
:type guid: int
"""
if guid in self._connections:
+ self.disconnect(guid)
self._connections.remove(guid)
def discover(self):
"""
pass
- def register_condition(self, action, group, state,
- time = None):
+ def register_condition(self, action, group, state, time = None):
""" Registers a condition on the resource manager to allow execution
of 'action' only after 'time' has elapsed from the moment all resources
in 'group' reached state 'state'
:type time: str
"""
+
+ if not action in self.conditions:
+ self._conditions[action] = list()
+
conditions = self.conditions.get(action)
- if not conditions:
- conditions = list()
- self._conditions[action] = conditions
# For each condition to register a tuple of (group, state, time) is
# added to the 'action' list
conditions.append((group, state, time))
+ def unregister_condition(self, group, action = None):
+ """ Removed conditions for a certain group of guids
+
+ :param action: Action to restrict to condition (either 'START' or 'STOP')
+ :type action: str
+
+ :param group: Group of RMs to wait for (list of guids)
+ :type group: int or list of int
+
+ """
+ # For each condition a tuple of (group, state, time) is
+ # added to the 'action' list
+ if not isinstance(group, list):
+ group = [group]
+
+ for act, conditions in self.conditions.iteritems():
+ if action and act != action:
+ continue
+
+ for condition in list(conditions):
+ (grp, state, time) = condition
+
+ # If there is an intersection between grp and group,
+ # then remove intersected elements
+ intsec = set(group).intersection(set(grp))
+ if intsec:
+ idx = conditions.index(condition)
+ newgrp = set(grp)
+ newgrp.difference_update(intsec)
+ conditions[idx] = (newgrp, state, time)
+
def get_connected(self, rtype = None):
""" Returns the list of RM with the type 'rtype'
for guid in group:
rm = self.ec.get_resource(guid)
# If the RM state is lower than the requested state we must
- # reschedule (e.g. if RM is READY but we required STARTED)
+ # reschedule (e.g. if RM is READY but we required STARTED).
if rm.state < state:
reschedule = True
break
# time still to wait
wait = tdiffsec(stabsformat(time), stabsformat(waited))
- if wait > 0:
+ if wait > 0.001:
reschedule = True
delay = "%fs" % wait
break
self._failed_time = tnow()
self._state = ResourceState.FAILED
+ def connect(self, guid):
+ """ Performs actions that need to be taken upon associating RMs.
+ This method should be redefined when necessary in child classes.
+ """
+ pass
+
+ def disconnect(self, guid):
+ """ Performs actions that need to be taken upon disassociating RMs.
+ This method should be redefined when necessary in child classes.
+ """
+ pass
+
def valid_connection(self, guid):
"""Checks whether a connection with the other RM
is valid.
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.trace import Trace, TraceAttr
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+ ResourceAction
+from nepi.util.sshfuncs import ProcStatus
+from nepi.util.timefuncs import tsformat
+
+import os
+import tempfile
+
+@clsinit
+class Collector(ResourceManager):
+ """ The collector is reponsible of collecting traces
+ of a same type associated to RMs into a local directory.
+
+ .. class:: Class Args :
+
+ :param ec: The Experiment controller
+ :type ec: ExperimentController
+ :param guid: guid of the RM
+ :type guid: int
+ :param creds: Credentials to communicate with the rm (XmppClient)
+ :type creds: dict
+
+ """
+ _rtype = "Collector"
+
+ @classmethod
+ def _register_attributes(cls):
+ trace_name = Attribute("traceName", "Name of the trace to be collected",
+ flags = Flags.ExecReadOnly)
+ store_dir = Attribute("storeDir", "Path to local directory to store trace results",
+ default = tempfile.gettempdir(),
+ flags = Flags.ExecReadOnly)
+
+ cls._register_attribute(trace_name)
+ cls._register_attribute(store_dir)
+
+ def __init__(self, ec, guid):
+ super(Collector, self).__init__(ec, guid)
+ self._store_path = None
+
+ @property
+ def store_path(self):
+ return self._store_path
+
+ def provision(self):
+ trace_name = self.get("traceName")
+ if not trace_name:
+ self.fail()
+
+ msg = "No traceName was specified"
+ self.error(msg)
+ raise RuntimeError, msg
+
+ store_dir = self.get("storeDir")
+ timestamp = tsformat()
+ self._store_path = os.path.join(store_dir, self.ec.exp_id, timestamp)
+
+ msg = "Creating local directory at %s to store %s traces " % (
+ store_dir, trace_name)
+ self.info(msg)
+
+ try:
+ os.makedirs(self.store_path)
+ except OSError:
+ pass
+
+ super(Collector, self).provision()
+
+ def deploy(self):
+ try:
+ self.discover()
+ self.provision()
+ except:
+ self.fail()
+ raise
+
+ super(Collector, self).deploy()
+
+ def release(self):
+ trace_name = self.get("traceName")
+
+ msg = "Collecting '%s' traces to local directory %s" % (
+ trace_name, self.store_path)
+ self.info(msg)
+
+ rms = self.get_connected()
+ for rm in rms:
+ result = self.ec.trace(rm.guid, trace_name)
+ fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
+ trace_name))
+ f = open(fpath, "w")
+ f.write(result)
+ f.close()
+
+ super(Collector, self).release()
+
+ def valid_connection(self, guid):
+ # TODO: Validate!
+ return True
+
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+ reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
node = self.node
if not node or node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
-
- reschedule_delay = "0.5s"
self.ec.schedule(reschedule_delay, self.deploy)
else:
try:
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.resource import clsinit_copy, ResourceState, \
+ ResourceAction
+from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.ccn.ccnd import LinuxCCND
+
+import os
+
+@clsinit_copy
+class LinuxCCNApplication(LinuxApplication):
+ _rtype = "LinuxCCNApplication"
+
+ def __init__(self, ec, guid):
+ super(LinuxCCNApplication, self).__init__(ec, guid)
+ self._home = "ccnapp-%s" % self.guid
+
+ @property
+ def ccnd(self):
+ ccnd = self.get_connected(LinuxCCND.rtype())
+ if ccnd: return ccnd[0]
+ return None
+
+ @property
+ def node(self):
+ if self.ccnd: return self.ccnd.node
+ return None
+
+ def deploy(self):
+ if not self.get("env"):
+ self.set("env", self._environment)
+
+ super(LinuxCCNApplication, self).deploy()
+
+ @property
+ def _environment(self):
+ env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+ return env
+
+ def execute_command(self, command, env):
+ environ = self.node.format_environment(env, inline = True)
+ command = environ + command
+ command = self.replace_paths(command)
+
+ (out, err), proc = self.node.execute(command)
+
+ if proc.poll():
+ self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ def valid_connection(self, guid):
+ # TODO: Validate!
+ return True
+
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction
+ ResourceAction, reschedule_delay
from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.resources.linux.ccn.ccnr import LinuxCCNR
from nepi.util.timefuncs import tnow
if not self.ccnr or self.ccnr.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
- reschedule_delay = "0.5s"
# ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+ reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.node import OSType
-from nepi.util.timefuncs import tnow, tdiff
+from nepi.util.timefuncs import tnow, tdiffsec
import os
# TODO: use ccndlogging to dynamically change the logging level
+
@clsinit_copy
class LinuxCCND(LinuxApplication):
_rtype = "LinuxCCND"
if not self.node or self.node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
- reschedule_delay = "0.5s"
- # ccnr needs to wait until ccnd is deployed and running
+ # ccnd needs to wait until node is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
if not self.get("command"):
# First check if the ccnd has failed
state_check_delay = 0.5
if self._state == ResourceState.STARTED and \
- tdiff(tnow(), self._last_state_check) > state_check_delay:
+ tdiffsec(tnow(), self._last_state_check) > state_check_delay:
(out, err), proc = self._ccndstatus
retcode = proc.poll()
@property
def _dependencies(self):
- if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]:
+ if self.node.use_rpm:
return ( " autoconf openssl-devel expat-devel libpcap-devel "
" ecryptfs-utils-devel libxml2-devel automake gawk "
" gcc gcc-c++ git pcre-devel make ")
- elif self.node.os in [ OSType.UBUNTU , OSType.DEBIAN]:
+ elif self.node.use_deb:
return ( " autoconf libssl-dev libexpat-dev libpcap-dev "
" libecryptfs0 libxml2-utils automake gawk gcc g++ "
" git-core pkg-config libpcre3-dev make ")
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction
+ ResourceAction, reschedule_delay
from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow
if not self.ccnd or self.ccnd.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
- reschedule_delay = "0.5s"
# ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction
+ ResourceAction, reschedule_delay
from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.util.timefuncs import tnow
import os
-reschedule_delay = "0.5s"
# TODO: Add rest of options for ccndc!!!
# Implement ENTRY DELETE!!
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Types, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+ reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.resources.linux.channel import LinuxChannel
# TODO: UP, MTU attributes!
-reschedule_delay = "0.5s"
@clsinit
class LinuxInterface(ResourceManager):
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+ reschedule_delay
from nepi.resources.linux import rpmfuncs, debfuncs
from nepi.util import sshfuncs, execfuncs
from nepi.util.sshfuncs import ProcStatus
# TODO: Unify delays!!
# TODO: Validate outcome of uploads!!
-reschedule_delay = "0.5s"
class ExitCode:
"""
"""
Supported flavors of Linux OS
"""
+ FEDORA_8 = "f8"
FEDORA_12 = "f12"
FEDORA_14 = "f14"
FEDORA = "fedora"
self.error(msg, out, err)
raise RuntimeError, "%s - %s - %s" %( msg, out, err )
- if out.find("Fedora release 12") == 0:
+ if out.find("Fedora release 8") == 0:
+ self._os = OSType.FEDORA_8
+ elif out.find("Fedora release 12") == 0:
self._os = OSType.FEDORA_12
elif out.find("Fedora release 14") == 0:
self._os = OSType.FEDORA_14
return self._os
+ @property
+ def use_deb(self):
+ return self.os in [OSType.DEBIAN, OSType.UBUNTU]
+
+ @property
+ def use_rpm(self):
+ return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
+ OSType.FEDORA]
+
@property
def localhost(self):
return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
def install_packages(self, packages, home):
command = ""
- if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+ if self.use_rpm:
command = rpmfuncs.install_packages_command(self.os, packages)
- elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+ elif self.use_deb:
command = debfuncs.install_packages_command(self.os, packages)
else:
msg = "Error installing packages ( OS not known ) "
def remove_packages(self, packages, home):
command = ""
- if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+ if self.use_rpm:
command = rpmfuncs.remove_packages_command(self.os, packages)
- elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+ elif self.use_deb:
command = debfuncs.remove_packages_command(self.os, packages)
else:
msg = "Error removing packages ( OS not known ) "
if not isinstance(packages, list):
packages = [packages]
- cmd = install_rpmfusion_command(os) + " && "
+ cmd = install_rpmfusion_command(os)
+ if cmd: cmd += " && "
cmd += " && ".join(map(lambda p:
" { rpm -q %(package)s || sudo -S yum -y install %(package)s ; } " % {
'package': p}, packages))
cmd = " { rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s ; } "
if os in [OSType.FEDORA, OSType.FEDORA_12]:
+ # For f12
cmd = cmd % {'package': RPM_FUSION_URL_F12}
elif os == OSType.FEDORA_14:
- # This one works for f13+
+ # For f13+
cmd = cmd % {'package': RPM_FUSION_URL}
else:
+ # Fedora 8 is unmaintained
cmd = ""
return cmd
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+ reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
-reschedule_delay = "0.5s"
@clsinit
class OMFApplication(ResourceManager):
"""
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+ reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
-reschedule_delay = "0.5s"
@clsinit
class OMFChannel(ResourceManager):
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+ reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
-reschedule_delay = "0.5s"
@clsinit
class OMFWifiInterface(ResourceManager):
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+ reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
import time
-reschedule_delay = "0.5s"
@clsinit
class OMFNode(ResourceManager):
from nepi.resources.omf.omf_client import OMFClient
from nepi.resources.omf.messages_5_4 import MessageHandler
-from nepi.util.timefuncs import tsfromat
+from nepi.util.timefuncs import tsformat
class OMFAPI(Logger):
"""
"""
super(OMFAPI, self).__init__("OMFAPI")
- date = tsfromat()
+ date = tsformat()
tz = -time.altzone if time.daylight != 0 else -time.timezone
date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
self._user = "%s-%s" % (slice, date)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+ reschedule_delay
from nepi.resources.linux.node import LinuxNode
-
from nepi.resources.planetlab.plcapi import PLCAPIFactory
-reschedule_delay = "0.5s"
@clsinit_copy
class PlanetlabNode(LinuxNode):
"""
return datetime.datetime.strptime(sdate, _strf).date()
-def tsfromat(date = None):
+def tsformat(date = None):
""" Formats a datetime object to a string with format YYYYMMddHHMMSSffff.
If no date is given, the current date is used.
from nepi.execution.attribute import Attribute
from nepi.execution.ec import ExperimentController
-from nepi.execution.resource import ResourceManager, ResourceState, clsinit
+from nepi.execution.resource import ResourceManager, ResourceState, clsinit, \
+ ResourceAction
import random
import time
def __init__(self, ec, guid):
super(AnotherResource, self).__init__(ec, guid)
-
-class ResourceFactoryTestCase(unittest.TestCase):
- def test_add_resource_factory(self):
- from nepi.execution.resource import ResourceFactory
-
- ResourceFactory.register_type(MyResource)
- ResourceFactory.register_type(AnotherResource)
-
- self.assertEquals(MyResource.rtype(), "MyResource")
- self.assertEquals(len(MyResource._attributes), 1)
-
- self.assertEquals(ResourceManager.rtype(), "Resource")
- self.assertEquals(len(ResourceManager._attributes), 0)
-
- self.assertEquals(AnotherResource.rtype(), "AnotherResource")
- self.assertEquals(len(AnotherResource._attributes), 0)
- self.assertEquals(len(ResourceFactory.resource_types()), 2)
class Channel(ResourceManager):
_rtype = "Channel"
super(Application, self).start()
time.sleep(random.random() * 5)
self._state = ResourceState.FINISHED
+
+
+class ResourceFactoryTestCase(unittest.TestCase):
+ def test_add_resource_factory(self):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(MyResource)
+ ResourceFactory.register_type(AnotherResource)
+
+ self.assertEquals(MyResource.rtype(), "MyResource")
+ self.assertEquals(len(MyResource._attributes), 1)
+
+ self.assertEquals(ResourceManager.rtype(), "Resource")
+ self.assertEquals(len(ResourceManager._attributes), 0)
+
+ self.assertEquals(AnotherResource.rtype(), "AnotherResource")
+ self.assertEquals(len(AnotherResource._attributes), 0)
+
+ self.assertEquals(len(ResourceFactory.resource_types()), 2)
class ResourceManagerTestCase(unittest.TestCase):
+ def test_register_condition(self):
+ ec = ExperimentController()
+ rm = ResourceManager(ec, 15)
+
+ group = [1,3,5,7]
+ rm.register_condition(ResourceAction.START, group,
+ ResourceState.STARTED)
+
+ group = [10,8]
+ rm.register_condition(ResourceAction.START,
+ group, ResourceState.STARTED, time = "10s")
+
+ waiting_for = []
+ conditions = rm.conditions.get(ResourceAction.START)
+ for (group, state, time) in conditions:
+ waiting_for.extend(group)
+
+ self.assertEquals(waiting_for, [1, 3, 5, 7, 10, 8])
+
+ group = [1, 2, 3, 4, 6]
+ rm.unregister_condition(group)
+
+ waiting_for = []
+ conditions = rm.conditions.get(ResourceAction.START)
+ for (group, state, time) in conditions:
+ waiting_for.extend(group)
+
+ self.assertEquals(waiting_for, [5, 7, 10, 8])
+
def test_deploy_in_order(self):
"""
Test scenario: 2 applications running one on 1 node each.