--- /dev/null
+"""
+ NEPI, a framework to manage network experiments
+ Copyright (C) 2013 INRIA
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+ Julien Tribino <julien.tribino@inria.fr>
+
+ Example :
+ - Testbed : Nitos
+ - Explanation :
+
+ CCN topology:
+
+ content ccncat
+ b1 b2
+ 0 ------------------ 0
+
+ - Experiment:
+ * t0 : b2 retrieves video published in b1
+
+"""
+
+#!/usr/bin/env python
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+### Define OMF Method to simplify definition of resources ###
def add_node(ec, hostname, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register an OMFNode resource on the controller and configure it.

    Returns the guid of the newly registered node.
    """
    guid = ec.register_resource("OMFNode")
    # 'hostname' identifies the physical testbed node; the xmpp* values
    # are the credentials of the XMPP server used to drive it via OMF.
    for attr, value in (('hostname', hostname),
                        ('xmppSlice', xmppSlice),
                        ('xmppHost', xmppHost),
                        ('xmppPort', xmppPort),
                        ('xmppPassword', xmppPassword)):
        ec.set(guid, attr, value)
    return guid
+
def add_interface(ec, ip, xmppSlice, xmppHost, essid = "ccn", alias = "w0", mode = "adhoc",
                  typ = "g", xmppPort = "5222", xmppPassword = "1234"):
    """ Register an OMFWifiInterface resource and configure it.

    :param ip: IPv4 address to assign to the wireless interface
    :param essid/alias/mode/typ: 802.11 settings (defaults: adhoc 'ccn' on w0, mode g)
    Returns the guid of the registered interface.

    Bug fix: the xmppSlice/xmppHost/xmppPort/xmppPassword arguments were
    accepted but silently ignored, unlike in add_node()/add_channel().
    They are now applied so the interface RM gets the same XMPP
    credentials as every other OMF resource in this script.
    """
    iface = ec.register_resource("OMFWifiInterface")
    ec.set(iface, 'alias', alias)
    ec.set(iface, 'mode', mode)
    ec.set(iface, 'type', typ)
    ec.set(iface, 'essid', essid)
    ec.set(iface, 'ip', ip)
    # Previously dropped on the floor; see docstring.
    ec.set(iface, 'xmppSlice', xmppSlice)
    ec.set(iface, 'xmppHost', xmppHost)
    ec.set(iface, 'xmppPort', xmppPort)
    ec.set(iface, 'xmppPassword', xmppPassword)
    return iface
+
def add_channel(ec, channel, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register an OMFChannel resource (the 802.11 channel shared by the
    interfaces) and return its guid.
    """
    guid = ec.register_resource("OMFChannel")
    settings = (('channel', channel),
                ('xmppSlice', xmppSlice),
                ('xmppHost', xmppHost),
                ('xmppPort', xmppPort),
                ('xmppPassword', xmppPassword))
    for attr, value in settings:
        ec.set(guid, attr, value)
    return guid
+
def add_app(ec, appid, command, args, env, xmppSlice, xmppHost,
            xmppPort = "5222", xmppPassword = "1234"):
    """ Register an OMFApplication resource and configure what it runs.

    :param appid: OMF application identifier (e.g. "#ccnd")
    :param command: executable/command line, stored in the 'path' attribute
    :param args: command line arguments string
    :param env: shell environment string for the application
    Returns the guid of the registered application.

    NOTE(review): the xmppSlice/xmppHost/xmppPort/xmppPassword parameters
    are accepted but never applied to the resource -- confirm whether they
    should be set here as add_node()/add_channel() do.
    """
    app = ec.register_resource("OMFApplication")
    ec.set(app, 'appid', appid)
    # 'path' carries the full command to execute on the node.
    ec.set(app, 'path', command)
    ec.set(app, 'args', args)
    ec.set(app, 'env', env)
    return app
+
+
+### Define a CCND application ###
def add_ccnd(ec, peers, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register an application that configures ccnd routes.

    :param peers: list of peer IP addresses; a 'ccndc add ccnx:/ udp <ip>'
        command is generated for each one
    Returns the guid of the registered application.

    Bug fix: the original built a command per peer but then used only the
    first one (``command = peers[0]``); it also subscripted the result of
    ``map``, which fails on Python 3. All peers are now joined with ';',
    which is identical behavior for the single-peer lists the callers pass.
    """
    env = 'PATH=$PATH:/root/ccnx-0.7.2/bin HOME=/root \
CCNR_DIRECTORY="/root" CCNR_STATUS_PORT="8080"'

    # BASH command -> 'ccndc add ccnx:/ udp <peer1> ; ccndc add ... <peerN>'
    command = " ; ".join("ccndc add ccnx:/ udp %s" % peer for peer in peers)

    app = add_app(ec, "#ccnd", command, "", env, xmppSlice, xmppHost,
                  xmppPort = xmppPort, xmppPassword = xmppPassword)
    return app
+
+### Define a CCN SeqWriter application ###
def add_publish(ec, movie, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register a ccnseqwriter application that publishes the file
    *movie* under the CCN name ccnx:/VIDEO.

    Returns the guid of the registered application.
    """
    env = ('PATH=$PATH:/root/ccnx-0.7.2/bin HOME=/root '
           'CCNR_DIRECTORY="/root" CCNR_STATUS_PORT="8080"')

    # Shell command: ccnseqwriter -r ccnx:/VIDEO < <movie>
    command = "ccnseqwriter -r ccnx:/VIDEO < %s" % movie

    return add_app(ec, "#ccn_write", command, "", env, xmppSlice, xmppHost,
                   xmppPort = xmppPort, xmppPassword = xmppPassword)
+
+### Define a streaming application ###
def add_stream(ec, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register a consumer application that pulls ccnx:/VIDEO with
    ccncat and pipes it into VLC for playback.
    """
    env = ('PATH=$PATH:/root/ccnx-0.7.2/bin HOME=/root '
           'CCNR_DIRECTORY="/root" CCNR_STATUS_PORT="8080"')
    # NOTE(review): 'ddbus-uuidgen' looks like a typo of 'dbus-uuidgen';
    # kept byte-identical to preserve behavior (the ';' lets the pipeline
    # run even if the first step fails) -- confirm before changing.
    command = (" ddbus-uuidgen --ensure ; "
               "( /root/ccnx-0.7.2/bin/ccncat ccnx:/VIDEO"
               " | /root/vlc-1.1.13/cvlc - ) ")
    return add_app(ec, "#ccn_stream", command, "", env, xmppSlice, xmppHost,
                   xmppPort = xmppPort, xmppPassword = xmppPassword)
+
+### Many options to easily adapt the script ####
def get_options():
    """ Parse the command line.

    Only -m/--movie is enforced; every other option defaults to None.
    Returns the tuple (sender_host, sender_ip, receiver_host, receiver_ip,
    channel, xmpp_slice, xmpp_host, movie).
    """
    usage = "usage: %prog -w <sender-node> -r <receiver-node> -s <slice> -m <movie>"

    parser = OptionParser(usage=usage)
    option_specs = (
        ("-w", "--sender-host", "sender_host", "Hostname of the sender Node"),
        ("-i", "--sender-ip", "sender_ip", "IP of the sender Node"),
        ("-r", "--receiver-host", "receiver_host", "Hostname of the receiver node"),
        ("-p", "--receiver-ip", "receiver_ip", "IP of the receiver node"),
        ("-c", "--channel", "channel", "Channel of the communication"),
        ("-s", "--xmpp-slice", "xmpp_slice", "Name of the slice XMPP"),
        ("-x", "--xmpp-host", "xmpp_host", "Name of the host XMPP"),
        ("-m", "--movie", "movie", "Stream movie"),
    )
    for short_flag, long_flag, dest, help_text in option_specs:
        parser.add_option(short_flag, long_flag, dest=dest,
                          help=help_text, type="str")

    options, _ = parser.parse_args()

    # The movie path is the only mandatory argument.
    if not options.movie:
        parser.error("movie is a required argument")

    return (options.sender_host, options.sender_ip, options.receiver_host,
            options.receiver_ip, options.channel, options.xmpp_slice,
            options.xmpp_host, options.movie)
+
+### Script itself ###
if __name__ == '__main__':
    # Parse the command line; all values arrive as strings (or None for
    # options that were not given -- only -m/--movie is enforced).
    (sender, sender_ip, receiver, receiver_ip, channel, xmpp_slice, xmpp_host, movie) = get_options()

    # Shell environment shared by the CCN helper applications below.
    env = 'PATH=$PATH:/root/ccnx-0.7.2/bin HOME=/root CCNR_DIRECTORY="/root" CCNR_STATUS_PORT="8080"'

# Create the EC
    ec = ExperimentController()

# Create the topology: two nodes, one wireless interface each, both
# interfaces attached to the same channel.
    node1 = add_node(ec, sender, xmpp_slice, xmpp_host)
    iface1 = add_interface(ec, sender_ip, xmpp_slice, xmpp_host)
    ec.register_connection(node1, iface1)

    node2 = add_node(ec, receiver, xmpp_slice, xmpp_host)
    iface2 = add_interface(ec, receiver_ip, xmpp_slice, xmpp_host)
    ec.register_connection(node2, iface2)

    chann = add_channel(ec, channel, xmpp_slice, xmpp_host)
    ec.register_connection(iface1, chann)
    ec.register_connection(iface2, chann)

# CCN setup for the sender: daemon start, route to the peer, repository.
    ccndstart1 = add_app(ec, "#ccndstart", "ccndstart &", "", env, xmpp_slice, xmpp_host)
    ec.register_connection(ccndstart1, node1)

    peers = [receiver_ip]
    ccnd1 = add_ccnd(ec, peers, xmpp_slice, xmpp_host)
    ec.register_connection(ccnd1, node1)

    ccnr1 = add_app(ec, "#ccnr", "ccnr &", "", env, xmpp_slice, xmpp_host)
    ec.register_connection(ccnr1, node1)

    # Register content producer application (ccnseqwriter)
    pub = add_publish(ec, movie, xmpp_slice, xmpp_host)
    ec.register_connection(pub, node1)

    # Startup ordering on the sender: ccndstart -> ccnd routes -> ccnr
    # repository -> publisher. The movie can only be published once the
    # daemon chain is up.
    ec.register_condition(ccnd1, ResourceAction.START, ccndstart1, ResourceState.STARTED, "1s")
    ec.register_condition(ccnr1, ResourceAction.START, ccnd1, ResourceState.STARTED, "1s")
    ec.register_condition(pub, ResourceAction.START, ccnr1, ResourceState.STARTED, "2s")

# CCN setup for the receiver (mirror image of the sender side).
    ccndstart2 = add_app(ec, "#ccndstart", "ccndstart &", "", env, xmpp_slice, xmpp_host)
    ec.register_connection(ccndstart2, node2)

    peers = [sender_ip]
    ccnd2 = add_ccnd(ec, peers, xmpp_slice, xmpp_host)
    ec.register_connection(ccnd2, node2)

    ccnr2 = add_app(ec, "#ccnr", "ccnr &", "", env, xmpp_slice, xmpp_host)
    ec.register_connection(ccnr2, node2)

    # Register consumer application (ccncat piped into VLC)
    stream = add_stream(ec, xmpp_slice, xmpp_host)
    ec.register_connection(stream, node2)

    # The stream can only be retrieved after ccnd is running.
    ec.register_condition(ccnd2, ResourceAction.START, ccndstart2, ResourceState.STARTED, "1s")
    ec.register_condition(ccnr2, ResourceAction.START, ccnd2, ResourceState.STARTED, "1s")
    ec.register_condition(stream, ResourceAction.START, ccnr2, ResourceState.STARTED, "2s")

# And also, the stream can only be retrieved after it was published.
    ec.register_condition(stream, ResourceAction.START, pub, ResourceState.STARTED, "5s")


# Cleanup when the experiment stops: stop the ccn daemons on both nodes
# and kill any leftover shells.
    ccndstop1 = add_app(ec, "#ccndstop", "ccndstop", "", env, xmpp_slice, xmpp_host)
    ec.register_connection(ccndstop1, node1)
    ccndstop2 = add_app(ec, "#ccndstop", "ccndstop", "", env, xmpp_slice, xmpp_host)
    ec.register_connection(ccndstop2, node2)
    ccndstops = [ccndstop1, ccndstop2]

    killall = add_app(ec, "#kill", "killall sh", "", "", xmpp_slice, xmpp_host)
    ec.register_connection(killall, node2)

    apps = [ccndstart1, ccnd1, ccnr1, pub, ccndstart2, ccnd2, ccnr2, stream]

    # Stop every experiment application 20s after the stream started...
    ec.register_condition(apps, ResourceAction.STOP, stream, ResourceState.STARTED, "20s")

    # ...then run the cleanup applications, and finally stop those too.
    ec.register_condition(ccndstops, ResourceAction.START, apps, ResourceState.STOPPED, "1s")
    ec.register_condition(killall, ResourceAction.START, ccndstops, ResourceState.STARTED)
    ec.register_condition(ccndstops, ResourceAction.STOP, ccndstops, ResourceState.STARTED, "1s")
    ec.register_condition(killall, ResourceAction.STOP, ccndstops, ResourceState.STOPPED)

# Deploy all ResourceManagers
    ec.deploy()

# Wait until the cleanup applications have finished
    ec.wait_finished(ccndstops)

# Shutdown the experiment controller
    ec.shutdown()
+
--- /dev/null
+"""
+ NEPI, a framework to manage network experiments
+ Copyright (C) 2013 INRIA
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+ Julien Tribino <julien.tribino@inria.fr>
+
+ Example :
+ - Testbed : Nitos
+ - Explanation :
+
+ CCN topology:
+
+ h2
+ 0
+ content l1 / \ l2 ccncat
+ b1 /l5 \ b2
+ 0 ----- h1 0 --- 0 h3 ------ 0
+ \ /
+ l4 \ / l3
+ 0
+ h4
+
+ - Experiment:
+ * t0 : b2 retrieves video published in b1
+ * t1 : l5 goes down after 15 sec
+
+"""
+
+#!/usr/bin/env python
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+### Define OMF Method to simplify definition of resources ###
def add_node(ec, hostname, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register and configure an OMFNode for *hostname*; return its guid. """
    node_guid = ec.register_resource("OMFNode")
    ec.set(node_guid, 'hostname', hostname)
    # XMPP credentials the OMF RM uses to talk to the testbed controller.
    ec.set(node_guid, 'xmppSlice', xmppSlice)
    ec.set(node_guid, 'xmppHost', xmppHost)
    ec.set(node_guid, 'xmppPort', xmppPort)
    ec.set(node_guid, 'xmppPassword', xmppPassword)
    return node_guid
+
def add_interface(ec, ip, xmppSlice, xmppHost, essid = "ccn", alias = "w0", mode = "adhoc",
                  typ = "g", xmppPort = "5222", xmppPassword = "1234"):
    """ Register an OMFWifiInterface resource and configure it.

    :param ip: IPv4 address to assign to the wireless interface
    Returns the guid of the registered interface.

    Bug fix: the xmppSlice/xmppHost/xmppPort/xmppPassword arguments were
    accepted but silently ignored, unlike in add_node()/add_channel().
    They are now applied so the interface RM gets the same XMPP
    credentials as every other OMF resource in this script.
    """
    iface = ec.register_resource("OMFWifiInterface")
    ec.set(iface, 'alias', alias)
    ec.set(iface, 'mode', mode)
    ec.set(iface, 'type', typ)
    ec.set(iface, 'essid', essid)
    ec.set(iface, 'ip', ip)
    # Previously dropped on the floor; see docstring.
    ec.set(iface, 'xmppSlice', xmppSlice)
    ec.set(iface, 'xmppHost', xmppHost)
    ec.set(iface, 'xmppPort', xmppPort)
    ec.set(iface, 'xmppPassword', xmppPassword)
    return iface
+
def add_channel(ec, channel, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register an OMFChannel resource and return its guid. """
    guid = ec.register_resource("OMFChannel")
    for attr, value in (('channel', channel),
                        ('xmppSlice', xmppSlice),
                        ('xmppHost', xmppHost),
                        ('xmppPort', xmppPort),
                        ('xmppPassword', xmppPassword)):
        ec.set(guid, attr, value)
    return guid
+
def add_app(ec, host, appid, command, args, env, xmppSlice, xmppHost,
            xmppPort = "5222", xmppPassword = "1234"):
    """ Register an OMFApplication, configure it and attach it to *host*.

    :param command: executable/command line, stored in the 'path' attribute
    Returns the guid of the registered application.

    NOTE(review): the xmpp* parameters are accepted but not applied to the
    resource -- kept as-is to preserve behavior; confirm intent.
    """
    app = ec.register_resource("OMFApplication")
    for attr, value in (('appid', appid), ('path', command),
                        ('args', args), ('env', env)):
        ec.set(app, attr, value)
    # An application must be connected to the node it runs on.
    ec.register_connection(app, host)
    return app
+
+
+### Define a CCND application ###
def add_ccnd(ec, host, peers, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register an application on *host* that adds a ccnd udp route
    towards the peer address *peers* (a single IP string here).
    """
    env = 'PATH=$PATH:/root/ccnx-0.7.2/bin HOME=/root \
CCNR_DIRECTORY="/root" CCNR_STATUS_PORT="8080"'

    # Shell command: register a udp face towards the peer.
    command = "ccndc add ccnx:/ udp %s" % peers
    return add_app(ec, host, "#ccnd", command, "", env, xmppSlice, xmppHost,
                   xmppPort = xmppPort, xmppPassword = xmppPassword)
+
+### Define a CCN SeqWriter application ###
def add_publish(ec, host, movie, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register a ccnseqwriter application on *host* that publishes the
    file *movie* under the CCN name ccnx:/VIDEO.

    Returns the guid of the registered application.
    """
    env = 'PATH=$PATH:/root/ccnx-0.7.2/bin HOME=/root CCNR_DIRECTORY="/root" CCNR_STATUS_PORT="8080"'

    # BASH command -> 'ccnseqwriter -r ccnx:/VIDEO < movie'
    command = "ccnseqwriter -r ccnx:/VIDEO"
    command += " < " + movie

    app = add_app(ec, host, "#ccn_write", command, "", env, xmppSlice, xmppHost,
                  xmppPort = xmppPort, xmppPassword = xmppPassword)
    return app
+
+### Define a streaming application ###
def add_stream(ec, host, xmppSlice, xmppHost, xmppPort = "5222", xmppPassword = "1234"):
    """ Register a consumer application on *host* that reads ccnx:/VIDEO
    with ccncat and pipes it into VLC for playback.

    Returns the guid of the registered application.
    """
    env = 'PATH=$PATH:/root/ccnx-0.7.2/bin HOME=/root CCNR_DIRECTORY="/root" CCNR_STATUS_PORT="8080"'
    # NOTE(review): 'ddbus-uuidgen' looks like a typo for 'dbus-uuidgen';
    # the ';' lets the ccncat|vlc pipeline run even if that step fails.
    command = " ddbus-uuidgen --ensure ; ( /root/ccnx-0.7.2/bin/ccncat ccnx:/VIDEO | /root/vlc-1.1.13/cvlc - ) "
    app = add_app(ec, host, "#ccn_stream", command, "", env, xmppSlice, xmppHost,
                  xmppPort = xmppPort, xmppPassword = xmppPassword)
    return app
+
+### Many options to easily adapt the script ####
def get_options():
    """ Parse the command line.

    Only -m/--movie is enforced; the other options default to None.
    Returns the tuple (channel, xmpp_slice, xmpp_host, movie).
    """
    usage = "usage: %prog -c <channel> -s <xmpp_slice> -x <xmpp_host> -m <movie>"

    parser = OptionParser(usage=usage)
    for short_flag, long_flag, dest, help_text in (
            ("-c", "--channel", "channel", "Channel of the communication"),
            ("-s", "--xmpp-slice", "xmpp_slice", "Name of the slice XMPP"),
            ("-x", "--xmpp-host", "xmpp_host", "Name of the host XMPP"),
            ("-m", "--movie", "movie", "Stream movie")):
        parser.add_option(short_flag, long_flag, dest=dest,
                          help=help_text, type="str")

    options, _ = parser.parse_args()

    # The movie path is the only mandatory argument.
    if not options.movie:
        parser.error("movie is a required argument")

    return (options.channel, options.xmpp_slice,
            options.xmpp_host, options.movie)
+
+### Script itself ###
if __name__ == '__main__':
    # Parse the command line (all values are strings; only -m is enforced).
    (channel, xmpp_slice, xmpp_host, movie) = get_options()

    # Shell environment shared by the CCN helper applications below.
    env = 'PATH=$PATH:/root/ccnx-0.7.2/bin HOME=/root CCNR_DIRECTORY="/root" CCNR_STATUS_PORT="8080"'

# Create the EC
    ec = ExperimentController()

# Create the topology: a ring h1-h2-h3-h4 plus the diagonal l5 (h1-h3)
# and two border nodes b1 (producer) and b2 (consumer).
    host1 = "omf.nitos.node022"
    host2 = "omf.nitos.node028"
    host3 = "omf.nitos.node024"
    host4 = "omf.nitos.node025"
    host5 = "omf.nitos.node026" # b1
    host6 = "omf.nitos.node027" # b2

    ip1 = "192.168.0.22"
    ip2 = "192.168.0.28"
    ip3 = "192.168.0.24"
    ip4 = "192.168.0.25"
    ip5 = "192.168.0.26"
    ip6 = "192.168.0.27"

    all_hosts = [host1, host2, host3, host4, host5, host6]
    all_ip = [ip1, ip2, ip3, ip4, ip5, ip6]

    ring_hosts = [host1, host2, host3, host4]
    nodes = dict()

    # One node + one wifi interface per host, all on the same channel.
    chann = add_channel(ec, channel, xmpp_slice, xmpp_host)
    for i in xrange(len(all_hosts)):
        node = add_node(ec, all_hosts[i], xmpp_slice, xmpp_host)
        iface = add_interface(ec, all_ip[i], xmpp_slice, xmpp_host)
        ec.register_connection(node, iface)
        ec.register_connection(iface, chann)
        nodes[all_hosts[i]] = node

# CCN setup for every node: start the daemon, then the repository.
    ccnds = dict()
    ccnrs = dict()
    for i in xrange(len(all_hosts)):
        ccndstart = add_app(ec, nodes[all_hosts[i]], "#ccndstart", "ccndstart &",
                            "", env, xmpp_slice, xmpp_host)
        ccnr = add_app(ec, nodes[all_hosts[i]], "#ccnr", "ccnr &",
                       "", env, xmpp_slice, xmpp_host)
        ccnds[all_hosts[i]] = ccndstart
        ccnrs[all_hosts[i]] = ccnr
        ec.register_condition(ccnr, ResourceAction.START, ccndstart, ResourceState.STARTED, "1s")

# CCNDC setup: one 'ccndc add' application per direction of each link.
    # l1 : h1 - h2 , h2 - h1
    l1u = add_ccnd(ec, nodes[host1], ip2, xmpp_slice, xmpp_host)
    l1d = add_ccnd(ec, nodes[host2], ip1, xmpp_slice, xmpp_host)

    # l2 : h2 - h3 , h3 - h2
    l2u = add_ccnd(ec, nodes[host2], ip3, xmpp_slice, xmpp_host)
    l2d = add_ccnd(ec, nodes[host3], ip2, xmpp_slice, xmpp_host)

    # l3 : h3 - h4 , h4 - h3
    l3u = add_ccnd(ec, nodes[host3], ip4, xmpp_slice, xmpp_host)
    l3d = add_ccnd(ec, nodes[host4], ip3, xmpp_slice, xmpp_host)

    # l4 : h4 - h1 , h1 - h4
    l4u = add_ccnd(ec, nodes[host4], ip1, xmpp_slice, xmpp_host)
    l4d = add_ccnd(ec, nodes[host1], ip4, xmpp_slice, xmpp_host)

    # l5 : h1 - h3 , h3 - h1 (the diagonal that is torn down later)
    l5u = add_ccnd(ec, nodes[host1], ip3, xmpp_slice, xmpp_host)
    l5d = add_ccnd(ec, nodes[host3], ip1, xmpp_slice, xmpp_host)

    # connect border nodes
    b1u = add_ccnd(ec, nodes[host5], ip1, xmpp_slice, xmpp_host)
    b1d = add_ccnd(ec, nodes[host1], ip5, xmpp_slice, xmpp_host)

    b2u = add_ccnd(ec, nodes[host6], ip3, xmpp_slice, xmpp_host)
    b2d = add_ccnd(ec, nodes[host3], ip6, xmpp_slice, xmpp_host)

    link = [l1u, l1d, l2u, l2d, l3u, l3d, l4u, l4d, l5u, l5d, b1u, b1d, b2u, b2d]

# Route registration waits for every repository to be up.
    # NOTE(review): the first condition duplicates the one already
    # registered inside the CCN setup loop above -- confirm intent.
    for i in xrange(len(all_hosts)):
        ec.register_condition(ccnrs[all_hosts[i]], ResourceAction.START, ccnds[all_hosts[i]], ResourceState.STARTED, "1s")
        ec.register_condition(link, ResourceAction.START, ccnrs[all_hosts[i]], ResourceState.STARTED, "1s")

# Streaming server (ccnseqwriter on b1)
    pub = add_publish(ec, nodes[host5], movie, xmpp_slice, xmpp_host)

# Streaming client (ccncat | vlc on b2)
    stream = add_stream(ec, nodes[host6], xmpp_slice, xmpp_host)

# Streaming conditions: both ends wait for the routes; the client also
# waits for the publisher.
    ec.register_condition(pub, ResourceAction.START, link, ResourceState.STARTED, "2s")
    ec.register_condition(stream, ResourceAction.START, link, ResourceState.STARTED, "2s")
    ec.register_condition(stream, ResourceAction.START, pub, ResourceState.STARTED, "2s")

# Break the link l5 (h1 <-> h3): remove the udp faces in both directions.
    ccndcstop1 = add_app(ec, nodes[host1], "#ccndc", "ccndc del ccnx:/ udp " + ip3, "", env, xmpp_slice, xmpp_host)
    ccndcstop2 = add_app(ec, nodes[host3], "#ccndc", "ccndc del ccnx:/ udp " + ip1, "", env, xmpp_slice, xmpp_host)


# Change the behaviour 10s after the stream starts: stop the l5 route
# applications and run the 'ccndc del' cleanups.
    ec.register_condition(l5u, ResourceAction.STOP, stream, ResourceState.STARTED, "10s")
    ec.register_condition(l5d, ResourceAction.STOP, stream, ResourceState.STARTED, "10s")

    ec.register_condition(ccndcstop1, ResourceAction.START, stream, ResourceState.STARTED, "10s")
    ec.register_condition(ccndcstop2, ResourceAction.START, stream, ResourceState.STARTED, "10s")

# Cleanup when the experiment stops: one ccndstop per node plus a killall.
    ccndstops = []
    for i in xrange(len(all_hosts)):
        ccndstop = add_app(ec, nodes[all_hosts[i]], "#ccndstop", "ccndstop", "", env, xmpp_slice, xmpp_host)
        ccndstops.append(ccndstop)

    killall = add_app(ec, nodes[host6], "#kill", "killall sh", "", "", xmpp_slice, xmpp_host)

# Conditions to stop and clean the experiment: stop everything 30s after
# the stream started, then run the cleanup apps and stop them in turn.
    apps = []
    for i in xrange(len(all_hosts)):
        apps.append(ccnds[all_hosts[i]])
        apps.append(ccnrs[all_hosts[i]])
    apps += link
    apps.append(pub)
    apps.append(stream)

    ec.register_condition(apps, ResourceAction.STOP, stream, ResourceState.STARTED, "30s")

    ec.register_condition(ccndstops + [killall], ResourceAction.START, apps, ResourceState.STOPPED, "1s")
    ec.register_condition(ccndstops + [killall], ResourceAction.STOP, ccndstops, ResourceState.STARTED, "1s")

# Deploy all ResourceManagers
    ec.deploy()

# Wait until the cleanup applications have finished
    ec.wait_finished(ccndstops)

# Shutdown the experiment controller
    ec.shutdown()
+
+
--- /dev/null
+"""
+ NEPI, a framework to manage network experiments
+ Copyright (C) 2013 INRIA
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ Author: Julien Tribino <julien.tribino@inria.fr>
+
+ Example :
+ - Testbed : Plexus
+ - Explanation :
+
+ Test the STDIN Message
+
+ Node
+ wlab17
+ 0
+ |
+ |
+ 0
+ Application CTRL_test.rb
+
+ - Experiment:
+ * t0 : Deployment
+ * t1 : After the application started, one stdin message is sent
+ * t2 (t1 + 5s) : An other message is send
+
+"""
+
+#!/usr/bin/env python
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
+
+import time
+
# Create the EC
ec = ExperimentController()

# Create and Configure the Node
node1 = ec.register_resource("OMFNode")
ec.set(node1, 'hostname', 'omf.plexus.wlab17')
# XMPP credentials used to reach the OMF controller of the testbed.
ec.set(node1, 'xmppSlice', "nepi")
ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
ec.set(node1, 'xmppPort', "5222")
ec.set(node1, 'xmppPassword', "1234")

# Create and Configure the Application (a ruby controller script)
app1 = ec.register_resource("OMFApplication")
ec.set(app1, 'appid', "robot")
ec.set(app1, 'path', "/root/CTRL_test.rb")
ec.set(app1, 'args', "coord.csv")
# Forward the X display so the application can open windows on the node.
ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")

# Connection: the application runs on node1.
ec.register_connection(app1, node1)

# Stop the application 20s after it has started.
ec.register_condition(app1, ResourceAction.STOP, app1, ResourceState.STARTED , "20s")

# Deploy
ec.deploy()

# First stdin message, sent as soon as the application is running.
ec.wait_started([app1])
ec.set(app1, 'stdin', "xxxxxxxxxxxxxxxxx")

# Second stdin message, 5 seconds later.
time.sleep(5)
ec.set(app1, 'stdin', "xxxxxxxxxxxxxxxxx")

ec.wait_finished([app1])

# Stop Experiment
ec.shutdown()
--- /dev/null
+"""
+ NEPI, a framework to manage network experiments
+ Copyright (C) 2013 INRIA
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ Author: Julien Tribino <julien.tribino@inria.fr>
+
+ Example :
+ - Testbed : iMinds
+ - Explanation :
+
+ Test the STDIN Message
+
+ Node Zotack
+ node0.nepi-robot.nepi.wilab2.ilabt.iminds.be
+ 0
+ |
+ |
+ 0
+ Application RobotCTRLComm.rb
+
+ - Experiment:
+ - t0 : Deployment
+ - t1 : After the application started, send the message START_DRIVE
+ - t2 (t1 + 83s) : Open Left eye of robot 1
+ - t3 (t2 + 2s) : Open Left eye of robot 2
+
+"""
+
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
+
# Bug fix: 'time' is used by the sleeps below but was never imported,
# which crashed the script with a NameError right after deployment.
import time

# Create the EC
ec = ExperimentController()

# Create and Configure the Node
node1 = ec.register_resource("OMFNode")
# If the hostname is not declared, Nepi will take SFA to provision one.
ec.set(node1, 'hostname', 'node0.nepi-robot.nepi.wilab2.ilabt.iminds.be')
# XMPP credentials
ec.set(node1, 'xmppSlice', "default_slice_iminds")
ec.set(node1, 'xmppHost', "am.wilab2.ilabt.iminds.be")
ec.set(node1, 'xmppPort', "5222")
ec.set(node1, 'xmppPassword', "1234")

# Create and Configure the Application (robot controller script)
app1 = ec.register_resource("OMFRobotApplication")
ec.set(app1, 'appid', "robot")
ec.set(app1, 'path', "/users/jtribino/RobotCTRLComm.rb")
ec.set(app1, 'args', "/users/jtribino/coordinate.csv")
ec.set(app1, 'env', " ")
# Local file uploaded to the node before the application starts.
ec.set(app1, 'sources', "/home/wlab18/Desktop/coordinate.csv")
ec.set(app1, 'sshUser', "jtribino")

# Connection: the application runs on node1.
ec.register_connection(app1, node1)

# The Application should run during 350sec
ec.register_condition(app1, ResourceAction.STOP, app1, ResourceState.STARTED , "350s")

# Deploy
ec.deploy()

ec.wait_started([app1])

# Drive the robot through stdin messages.
ec.set(app1, 'stdin', "START_DRIVE")

# Open the left eye of robot 1, 83s into the drive.
time.sleep(83)
ec.set(app1, 'stdin', "1;openlefteye")

# Open the left eye of robot 2, 2s later.
time.sleep(2)
ec.set(app1, 'stdin', "2;openlefteye")

ec.wait_finished([app1])

# Stop Experiment  (bug fix: this line was missing the leading '#',
# which made the whole script a SyntaxError)
ec.shutdown()
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
+ Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+ Julien Tribino <julien.tribino@inria.fr>
+
+ Example :
+ - Testbed : Plexus
+ - Explanation :
+
+ VLC Streaming on VLC
+
+ Node Node
+ omf.plexus.wlab17 omf.plexus.wlab37
+ 0--------------------------------------------------0
+ | |
+ | |
+ 0 0
+ VLC Server VLC Client
+
+ - Experiment:
+ - t0 : Deployment
+ - t1 : VLC Server start
+ - t2 (t1 + 4s) : VLC Client start
+ - t3 (t2 + 22s) : Client and Server Stop
+ - t4 (t3 + 3s): Kill all the applications
+
"""
#!/usr/bin/env python
ec.set(iface1, 'type', "g")
ec.set(iface1, 'essid', "vlcexp")
ec.set(iface1, 'ip', "10.0.0.17")
-ec.set(iface1, 'xmppSlice', "nepi")
-ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
-ec.set(iface1, 'xmppPort', "5222")
-ec.set(iface1, 'xmppPassword', "1234")
iface2 = ec.register_resource("OMFWifiInterface")
ec.set(iface2, 'alias', "w0")
ec.set(iface2, 'type', 'g')
ec.set(iface2, 'essid', "vlcexp")
ec.set(iface2, 'ip', "10.0.0.37")
-ec.set(iface2, 'xmppSlice', "nepi")
-ec.set(iface2, 'xmppHost', "xmpp-plexus.onelab.eu")
-ec.set(iface2, 'xmppPort', "5222")
-ec.set(iface2, 'xmppPassword', "1234")
# Create and Configure the Channel
channel = ec.register_resource("OMFChannel")
#ec.set(app1, 'args', "--quiet /opt/big_buck_bunny_240p_mpeg4.ts --sout '#rtp{dst=10.0.0.XX,port=1234,mux=ts} '")
#ec.set(app1, 'args', "--quiet /opt/big_buck_bunny_240p_mpeg4_lq.ts --sout '#rtp{dst=10.0.0.XX,port=1234,mux=ts} '")
ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
-ec.set(app1, 'xmppSlice', "nepi")
-ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
-ec.set(app1, 'xmppPort', "5222")
-ec.set(app1, 'xmppPassword', "1234")
app2 = ec.register_resource("OMFApplication")
ec.set(app2, 'appid', 'Vlc#2')
ec.set(app2, 'path', "/opt/vlc-1.1.13/cvlc")
ec.set(app2, 'args', "--quiet rtp://10.0.0.37:1234")
ec.set(app2, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
-ec.set(app2, 'xmppSlice', "nepi")
-ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu")
-ec.set(app2, 'xmppPort', "5222")
-ec.set(app2, 'xmppPassword', "1234")
app3 = ec.register_resource("OMFApplication")
ec.set(app3, 'appid', 'Kill#2')
ec.set(app3, 'path', "/usr/bin/killall")
ec.set(app3, 'args', "vlc")
ec.set(app3, 'env', " ")
-ec.set(app3, 'xmppSlice', "nepi")
-ec.set(app3, 'xmppHost', "xmpp-plexus.onelab.eu")
-ec.set(app3, 'xmppPort', "5222")
-ec.set(app3, 'xmppPassword', "1234")
# Connection
ec.register_connection(app3, node1)
Author: Alina Quereilhac <alina.quereilhac@inria.fr>
Julien Tribino <julien.tribino@inria.fr>
+ Example :
+ - Testbed : Nitos
+ - Explanation :
+
+ VLC Streaming on VLC
+
+ Node Node
+ omf.nitos.node0xx omf.nitos.node0xx
+ 0--------------------------------------------------0
+ | |
+ | |
+ 0 0
+ VLC Server VLC Client
+
+ - Experiment:
+ - t0 : Deployment
+ - t1 : VLC Server start
+ - t2 (t1 + 4s) : VLC Client start
+ - t3 (t2 + 22s) : Client and Server Stop
+ - t4 (t3 + 3s): Kill all the applications
+
"""
#!/usr/bin/env python
# Create the EC
ec = ExperimentController()
-
# Create and Configure the Nodes
node1 = ec.register_resource("OMFNode")
ec.set(node1, 'hostname', 'omf.nitos.node0XX')
ec.set(iface1, 'type', "g")
ec.set(iface1, 'essid', "vlcexp")
ec.set(iface1, 'ip', "192.168.0.XX")
-ec.set(iface1, 'xmppSlice', "ZZZ")
-ec.set(iface1, 'xmppHost', "nitlab.inf.uth.gr")
-ec.set(iface1, 'xmppPort', "5222")
-ec.set(iface1, 'xmppPassword', "1234")
iface2 = ec.register_resource("OMFWifiInterface")
ec.set(iface2, 'alias', "w0")
ec.set(iface2, 'type', 'g')
ec.set(iface2, 'essid', "vlcexp")
ec.set(iface2, 'ip', "192.168.0.YY")
-ec.set(iface2, 'xmppSlice', "ZZZ")
-ec.set(iface2, 'xmppHost', "nitlab.inf.uth.gr")
-ec.set(iface2, 'xmppPort', "5222")
-ec.set(iface2, 'xmppPassword', "1234")
# Create and Configure the Channel
channel = ec.register_resource("OMFChannel")
ec.set(app1, 'path', "/root/vlc-1.1.13/cvlc")
ec.set(app1, 'args', "/root/10-by-p0d.avi --sout '#rtp{dst=192.168.0.YY,port=1234,mux=ts}'")
ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
-ec.set(app1, 'xmppSlice', "ZZZ")
-ec.set(app1, 'xmppHost', "nitlab.inf.uth.gr")
-ec.set(app1, 'xmppPort', "5222")
-ec.set(app1, 'xmppPassword', "1234")
app2 = ec.register_resource("OMFApplication")
ec.set(app2, 'appid', 'Vlc#2')
ec.set(app2, 'path', "/root/vlc-1.1.13/cvlc")
ec.set(app2, 'args', "rtp://192.168.0.YY:1234")
ec.set(app2, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
-ec.set(app2, 'xmppSlice', "ZZZ")
-ec.set(app2, 'xmppHost', "nitlab.inf.uth.gr")
-ec.set(app2, 'xmppPort', "5222")
-ec.set(app2, 'xmppPassword', "1234")
app3 = ec.register_resource("OMFApplication")
ec.set(app3, 'appid', 'Kill#2')
ec.set(app3, 'path', "/usr/bin/killall")
ec.set(app3, 'args', "vlc_app")
ec.set(app3, 'env', " ")
-ec.set(app3, 'xmppSlice', "ZZZ")
-ec.set(app3, 'xmppHost', "nitlab.inf.uth.gr")
-ec.set(app3, 'xmppPort', "5222")
-ec.set(app3, 'xmppPassword', "1234")
app4 = ec.register_resource("OMFApplication")
ec.set(app4, 'appid', 'Kill#1')
ec.set(app4, 'path', "/usr/bin/killall")
ec.set(app4, 'args', "vlc_app")
ec.set(app4, 'env', " ")
-ec.set(app4, 'xmppSlice', "ZZZ")
-ec.set(app4, 'xmppHost', "nitlab.inf.uth.gr")
-ec.set(app4, 'xmppPort', "5222")
-ec.set(app4, 'xmppPassword', "1234")
# Connection
ec.register_connection(app3, node1)
Author: Alina Quereilhac <alina.quereilhac@inria.fr>
Julien Tribino <julien.tribino@inria.fr>
+ Example :
+ - Testbed : Nitos
+ - Explanation :
+
+ VLC Streaming on VLC
+
+ Node
+ omf.nitos.node0xx
+ 0
+ |
+ |
+ 0
+ xEyes
+
+ - Experiment:
+ - t0 : Deployment
+ - t1 : xEyes Start
+ - t2 (t1 + 10s) : xEyes stop
+ - t3 (t2 + 2s) : Kill the application
"""
#!/usr/bin/env python
ec.set(app1, 'path', "/usr/bin/xeyes")
ec.set(app1, 'args', " ")
ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
-ec.set(app1, 'xmppSlice', "ZZZ")
-ec.set(app1, 'xmppHost', "nitlab.inf.uth.gr")
-ec.set(app1, 'xmppPort', "5222")
-ec.set(app1, 'xmppPassword', "1234")
app2 = ec.register_resource("OMFApplication")
ec.set(app2, 'appid', 'Kill#1')
ec.set(app2, 'path', "/usr/bin/kill")
ec.set(app2, 'args', "xeyes")
ec.set(app2, 'env', " ")
-ec.set(app2, 'xmppSlice', "ZZZ")
-ec.set(app2, 'xmppHost', "nitlab.inf.uth.gr")
-ec.set(app2, 'xmppPort', "5222")
-ec.set(app2, 'xmppPassword', "1234")
# Connection
ec.register_connection(app2, node1)
import sys
import time
import threading
+import weakref
+
class FailureLevel(object):
    """ Enumeration of failure severities tracked by the FailureManager.

    OK         -- no failure detected
    RM_FAILURE -- a critical ResourceManager failed
    EC_FAILURE -- the ExperimentController itself failed
    """
    OK = 1
    RM_FAILURE = 2
    EC_FAILURE = 3
+
class FailureManager(object):
    """ The FailureManager is responsible for handling errors,
    and deciding whether an experiment should be aborted
    """

    def __init__(self, ec):
        # Hold only a weak reference to the EC: the EC owns this manager,
        # so a strong reference would create a cycle.
        self._ec = weakref.ref(ec)
        # Current severity; starts clean and is only ever escalated.
        self._failure_level = FailureLevel.OK

    @property
    def ec(self):
        """ Returns the Experiment Controller """
        # Dereference the weakref (None if the EC has been collected).
        return self._ec()

    @property
    def abort(self):
        """ True when the experiment should be aborted.

        While no failure has been recorded, scan all resources and
        escalate to RM_FAILURE on the first critical resource found in
        FAILED state. Once escalated, the level is sticky and the scan
        is skipped on subsequent calls.
        """
        if self._failure_level == FailureLevel.OK:
            for guid in self.ec.resources:
                state = self.ec.state(guid)
                critical = self.ec.get(guid, "critical")

                # Only critical resources can abort the experiment;
                # non-critical failures are tolerated.
                if state == ResourceState.FAILED and critical:
                    self._failure_level = FailureLevel.RM_FAILURE
                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
                        " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
                    break

        return self._failure_level != FailureLevel.OK

    def set_ec_failure(self):
        # Explicitly mark the controller itself as failed (highest level).
        self._failure_level = FailureLevel.EC_FAILURE
+
class ECState(object):
""" State of the Experiment Controller
.. class:: Class Args :
:param exp_id: Human readable identifier for the experiment scenario.
- It will be used in the name of the directory
- where experiment related information is stored
:type exp_id: str
.. note::
- An experiment, or scenario, is defined by a concrete use, behavior,
- configuration and interconnection of resources that describe a single
- experiment case (We call this the experiment description).
- A same experiment (scenario) can be run many times.
+ An experiment, or scenario, is defined by a concrete set of resources,
+ behavior, configuration and interconnection of those resources.
+ The Experiment Description (ED) is a detailed representation of a
+ single experiment. It contains all the necessary information to
+ allow repeating the experiment. NEPI allows to describe
+ experiments by registering components (resources), configuring them
+ and interconnecting them.
+
+ A same experiment (scenario) can be executed many times, generating
+ different results. We call an experiment execution (instance) a 'run'.
- The ExperimentController (EC), is the entity responsible for
- managing an experiment instance (run). The same scenario can be
+ The ExperimentController (EC), is the entity responsible of
+ managing an experiment run. The same scenario can be
recreated (and re-run) by instantiating an EC and recreating
the same experiment description.
single resource. ResourceManagers are specific to a resource
type (i.e. An RM to control a Linux application will not be
the same as the RM used to control a ns-3 simulation).
- In order for a new type of resource to be supported in NEPI
- a new RM must be implemented. NEPI already provides different
+ To support a new type of resource in NEPI, a new RM must be
+ implemented. NEPI already provides a variety of
RMs to control basic resources, and new can be extended from
the existing ones.
Through the EC interface the user can create ResourceManagers (RMs),
- configure them and interconnect them, in order to describe an experiment.
+ configure them and interconnect them, to describe an experiment.
Describing an experiment through the EC does not run the experiment.
- Only when the 'deploy()' method is invoked on the EC, will the EC take
+ Only when the 'deploy()' method is invoked on the EC, the EC will take
actions to transform the 'described' experiment into a 'running' experiment.
While the experiment is running, it is possible to continue to
However, since a same 'experiment' can be run many times, the experiment
id is not enough to identify an experiment instance (run).
 For this reason, the ExperimentController has two identifiers, the
- exp_id, which can be re-used by different ExperimentController instances,
- and the run_id, which unique to a ExperimentController instance, and
+ exp_id, which can be re-used in different ExperimentController,
+ and the run_id, which is unique to one ExperimentController instance, and
is automatically generated by NEPI.
"""
# Logging
self._logger = logging.getLogger("ExperimentController")
- # Run identifier. It identifies a concrete instance (run) of an experiment.
- # Since a same experiment (same configuration) can be run many times,
- # this id permits to identify concrete exoeriment run
+ # Run identifier. It identifies a concrete execution instance (run)
+ # of an experiment.
+ # Since a same experiment (same configuration) can be executed many
+ # times, this run_id permits to separate result files generated on
+ # different experiment executions
self._run_id = tsformat()
# Experiment identifier. Usually assigned by the user
+ # Identifies the experiment scenario (i.e. configuration,
+ # resources used, etc)
self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
# generator of globally unique ids
# Resource managers
self._resources = dict()
- # Scheduler
+ # Scheduler. It a queue that holds tasks scheduled for
+ # execution, and yields the next task to be executed
+ # ordered by execution and arrival time
self._scheduler = HeapScheduler()
# Tasks
self._tasks = dict()
- # RM groups
+ # RM groups (for deployment)
self._groups = dict()
# generator of globally unique id for groups
self._group_id_generator = guid.GuidGenerator()
-
+
+ # Flag to stop processing thread
+ self._stop = False
+
+ # Entity in charge of managing system failures
+ self._fm = FailureManager(self)
+
+ # EC state
+ self._state = ECState.RUNNING
+
+ # The runner is a pool of threads used to parallelize
+ # execution of tasks
+ nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+ self._runner = ParallelRun(maxthreads = nthreads)
+
# Event processing thread
self._cond = threading.Condition()
self._thread = threading.Thread(target = self._process)
self._thread.setDaemon(True)
self._thread.start()
- # EC state
- self._state = ECState.RUNNING
-
@property
def logger(self):
""" Return the logger of the Experiment Controller
return self._run_id
@property
- def finished(self):
- """ Put the state of the Experiment Controller into a final state :
- Either TERMINATED or FAILED
-
- """
- return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+ def abort(self):
+ return self._fm.abort
def wait_finished(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or
+ RELEASED ) or until a System Failure occurs (e.g. Task Failure)
:param guids: List of guids
:type guids: list
+
"""
- return self.wait(guids)
+
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.STOPPED,
+ quit = quit)
def wait_started(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state >= STARTED or until a System Failure occurs
+ (e.g. Task Failure)
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, state = ResourceState.STARTED)
+
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.STARTED,
+ quit = quit)
def wait_released(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reached the state RELEASED (or FAILED)
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state = RELEASED or until the EC fails
:param guids: List of guids
:type guids: list
"""
- # TODO: solve state concurrency BUG and !!!!
- # correct waited release state to state = ResourceState.FAILED)
- return self.wait(guids, state = ResourceState.FINISHED)
+
+ def quit():
+ return self._state == ECState.FAILED
+
+ return self.wait(guids, state = ResourceState.RELEASED,
+ quit = quit)
def wait_deployed(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reached the state READY (or any higher state)
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state >= READY or until a System Failure occurs
+ (e.g. Task Failure)
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, state = ResourceState.READY)
- def wait(self, guids, state = ResourceState.STOPPED):
- """ Blocking method that waits until all the RM from the 'guid' list
- reached state 'state' or until a failure occurs
-
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.READY,
+ quit = quit)
+
+ def wait(self, guids, state, quit):
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state >= 'state' or until quit yields True
+
:param guids: List of guids
:type guids: list
"""
if isinstance(guids, int):
guids = [guids]
- # we randomly alter the order of the guids to avoid ordering
- # dependencies (e.g. LinuxApplication RMs runing on the same
- # linux host will be synchronized by the LinuxNode SSH lock)
- random.shuffle(guids)
-
while True:
- # If no more guids to wait for or an error occured, then exit
- if len(guids) == 0 or self.finished:
+ # If there are no more guids to wait for
+ # or the quit function returns True, exit the loop
+ if len(guids) == 0 or quit():
break
# If a guid reached one of the target states, remove it from list
guid = guids[0]
rstate = self.state(guid)
+
+ hrrstate = ResourceState2str.get(rstate)
+ hrstate = ResourceState2str.get(state)
if rstate >= state:
guids.remove(guid)
+ self.logger.debug(" guid %d DONE - state is %s, required is >= %s " % (
+ guid, hrrstate, hrstate))
else:
# Debug...
- self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
- self.state(guid, hr = True), state))
-
- # Take the opportunity to 'refresh' the states of the RMs.
- # Query only the first up to N guids (not to overwhelm
- # the local machine)
- n = 100
- lim = n if len(guids) > n else ( len(guids) -1 )
- nguids = guids[0: lim]
-
- # schedule state request for all guids (take advantage of
- # scheduler multi threading).
- for guid in nguids:
- callback = functools.partial(self.state, guid)
- self.schedule("0s", callback)
-
- # If the guid is not in one of the target states, wait and
- # continue quering. We keep the sleep big to decrease the
- # number of RM state queries
- time.sleep(4)
+ self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+ guid, hrrstate, hrstate))
+ time.sleep(0.5)
def get_task(self, tid):
""" Get a specific task
rm = self.get_resource(guid)
rm.set_with_conditions(name, value, guids2, state, time)
- def stop_with_conditions(self, guid):
- """ Stop a specific RM defined by its 'guid' only if all the conditions are true
-
- :param guid: Guid of the RM
- :type guid: int
-
- """
- rm = self.get_resource(guid)
- return rm.stop_with_conditions()
-
- def start_with_conditions(self, guid):
- """ Start a specific RM defined by its 'guid' only if all the conditions are true
-
- :param guid: Guid of the RM
- :type guid: int
-
- """
- rm = self.get_resource(guid)
- return rm.start_with_conditions()
-
def deploy(self, guids = None, wait_all_ready = True, group = None):
""" Deploy all resource manager in guids list
self.logger.debug(" ------- DEPLOY START ------ ")
if not guids:
- # If no guids list was indicated, all 'NEW' RMs will be deployed
+ # If no guids list was passed, all 'NEW' RMs will be deployed
guids = []
for guid in self.resources:
if self.state(guid) == ResourceState.NEW:
guids = [guids]
# Create deployment group
+ # New guids can be added to a same deployment group later on
new_group = False
if not group:
new_group = True
self._groups[group].extend(guids)
- # Before starting deployment we disorder the guids list with the
- # purpose of speeding up the whole deployment process.
- # It is likely that the user inserted in the 'guids' list closely
- # resources one after another (e.g. all applications
- # connected to the same node can likely appear one after another).
- # This can originate a slow down in the deployment since the N
- # threads the parallel runner uses to processes tasks may all
- # be taken up by the same family of resources waiting for the
- # same conditions (e.g. LinuxApplications running on a same
- # node share a single lock, so they will tend to be serialized).
- # If we disorder the guids list, this problem can be mitigated.
- random.shuffle(guids)
-
def wait_all_and_start(group):
+ # Function that checks if all resources are READY
+ # before scheduling a start_with_conditions for each RM
reschedule = False
# Get all guids in group
callback = functools.partial(wait_all_and_start, group)
self.schedule("1s", callback)
else:
- # If all resources are read, we schedule the start
+ # If all resources are ready, we schedule the start
for guid in guids:
rm = self.get_resource(guid)
self.schedule("0s", rm.start_with_conditions)
if wait_all_ready and new_group:
# Schedule a function to check that all resources are
# READY, and only then schedule the start.
- # This aimes at reducing the number of tasks looping in the
+ # This aims at reducing the number of tasks looping in the
# scheduler.
- # Intead of having N start tasks, we will have only one for
+ # Instead of having many start tasks, we will have only one for
# the whole group.
callback = functools.partial(wait_all_and_start, group)
- self.schedule("1s", callback)
+ self.schedule("0s", callback)
for guid in guids:
rm = self.get_resource(guid)
self.schedule("0s", rm.deploy_with_conditions)
if not wait_all_ready:
- self.schedule("1s", rm.start_with_conditions)
+ self.schedule("0s", rm.start_with_conditions)
if rm.conditions.get(ResourceAction.STOP):
# Only if the RM has STOP conditions we
# schedule a stop. Otherwise the RM will stop immediately
- self.schedule("2s", rm.stop_with_conditions)
+ self.schedule("0s", rm.stop_with_conditions)
def release(self, guids = None):
""" Release al RMs on the guids list or
if not guids:
guids = self.resources
+ # Remove all pending tasks from the scheduler queue
+ for tid in list(self._scheduler.pending):
+ self._scheduler.remove(tid)
+
+ self._runner.empty()
+
for guid in guids:
rm = self.get_resource(guid)
self.schedule("0s", rm.release)
Releases all the resources and stops task processing thread
"""
+ # If there was a major failure we can't exit gracefully
+ if self._state == ECState.FAILED:
+ raise RuntimeError("EC failure. Can not exit gracefully")
+
self.release()
# Mark the EC state as TERMINATED
self._state = ECState.TERMINATED
+ # Stop processing thread
+ self._stop = True
+
# Notify condition to wake up the processing thread
self._notify()
that might have been raised by the workers.
"""
- nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
- runner = ParallelRun(maxthreads = nthreads)
- runner.start()
+ self._runner.start()
- try:
- while not self.finished:
+ while not self._stop:
+ try:
self._cond.acquire()
task = self._scheduler.next()
if task:
# Process tasks in parallel
- runner.put(self._execute, task)
- except:
- import traceback
- err = traceback.format_exc()
- self.logger.error("Error while processing tasks in the EC: %s" % err)
+ self._runner.put(self._execute, task)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.logger.error("Error while processing tasks in the EC: %s" % err)
+
+ # Set the EC to FAILED state
+ self._state = ECState.FAILED
+
+ # Set the FailureManager failure level to EC failure
+ self._fm.set_ec_failure()
- self._state = ECState.FAILED
- finally:
- self.logger.debug("Exiting the task processing loop ... ")
- runner.sync()
- runner.destroy()
+ self.logger.debug("Exiting the task processing loop ... ")
+
+ self._runner.sync()
+ self._runner.destroy()
def _execute(self, task):
""" Executes a single task.
self.logger.error("Error occurred while executing task: %s" % err)
- # Set the EC to FAILED state (this will force to exit the task
- # processing thread)
- self._state = ECState.FAILED
-
- # Notify condition to wake up the processing thread
- self._notify()
-
- # Propage error to the ParallelRunner
- raise
-
def _notify(self):
""" Awakes the processing thread in case it is blocked waiting
for a new task to be scheduled.
from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
from nepi.util.logger import Logger
+from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import TraceAttr
import copy
cls._clsinit_copy()
return cls
def failtrap(func):
    """ Decorator for RM methods. If the wrapped method raises, the RM
    is logged as failed and moved to state FAILED, and the original
    exception is re-raised to the caller.
    """
    def wrapped(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except:
            # Record the full traceback before marking the RM as FAILED
            import traceback
            self.error(traceback.format_exc())
            self.debug("SETTING guid %d to state FAILED" % self.guid)
            self.fail()
            raise

    return wrapped
+
# Decorator to invoke class initialization method
@clsinit
class ResourceManager(Logger):
resource attributes
"""
- pass
+ critical = Attribute("critical", "Defines whether the resource is critical. "
+ " A failure on a critical resource will interrupt the experiment. ",
+ type = Types.Bool,
+ default = True,
+ flags = Flags.ExecReadOnly)
+ cls._register_attribute(critical)
+
@classmethod
def _register_traces(cls):
""" Resource subclasses will invoke this method to register
@property
def state(self):
- """ Get the state of the current RM """
+ """ Get the current state of the RM """
return self._state
def log_message(self, msg):
def discover(self):
""" Performs resource discovery.
- This method is resposible for selecting an individual resource
+ This method is responsible for selecting an individual resource
matching user requirements.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
self.set_discovered()
def provision(self):
""" Performs resource provisioning.
- This method is resposible for provisioning one resource.
+ This method is responsible for provisioning one resource.
After this method has been successfully invoked, the resource
- should be acccesible/controllable by the RM.
+ should be accessible/controllable by the RM.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
self.set_provisioned()
def start(self):
- """ Starts the resource.
+ """ Starts the RM.
There is no generic start behavior for all resources.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
self.set_started()
def stop(self):
- """ Stops the resource.
+ """ Interrupts the RM, stopping any tasks the RM was performing.
There is no generic stop behavior for all resources.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
if not self.state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
self.set_stopped()
def deploy(self):
- """ Execute all steps required for the RM to reach the state READY
+ """ Execute all steps required for the RM to reach the state READY.
+
+ This method is responsible for deploying the resource (and invoking the
+ discover and provision methods).
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
"""
if self.state > ResourceState.READY:
self.set_ready()
def release(self):
+ """ Perform actions to free resources used by the RM.
+
+ This method is responsible for releasing resources that were
+ used during the experiment by the RM.
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, this method should never
+ raise an error and it must ensure the RM is set to state RELEASED.
+
+ """
self.set_released()
def finish(self):
+ """ Sets the RM to state FINISHED.
+
+ The FINISHED state is different from STOPPED in that it should not be
+ directly invoked by the user.
+ STOPPED indicates that the user interrupted the RM, FINISHED means
+ that the RM concluded normally the actions it was supposed to perform.
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
+ """
+
self.set_finished()
def fail(self):
+ """ Sets the RM to state FAILED.
+
+ """
+
self.set_failed()
def set(self, name, value):
reschedule = False
delay = reschedule_delay
- ## evaluate if set conditions are met
+ ## evaluate if conditions to start are met
+ if self.ec.abort:
+ return
- # only can start when RM is either STOPPED or READY
+ # Can only start when RM is either STOPPED or READY
if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
reschedule = True
self.debug("---- RESCHEDULING START ---- state %s " % self.state )
reschedule = False
delay = reschedule_delay
- ## evaluate if set conditions are met
+ ## evaluate if conditions to stop are met
+ if self.ec.abort:
+ return
# only can stop when RM is STARTED
if self.state != ResourceState.STARTED:
reschedule = True
+ self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
else:
self.debug(" ---- STOP CONDITIONS ---- %s" %
self.conditions.get(ResourceAction.STOP))
reschedule = False
delay = reschedule_delay
- ## evaluate if set conditions are met
+ ## evaluate if conditions to deploy are met
+ if self.ec.abort:
+ return
# only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
self.debug("----- STARTING ---- ")
self.deploy()
-
def connect(self, guid):
""" Performs actions that need to be taken upon associating RMs.
This method should be redefined when necessary in child classes.
def set_started(self):
""" Mark ResourceManager as STARTED """
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_state(ResourceState.STARTED, "_start_time")
def set_stopped(self):
""" Mark ResourceManager as STOPPED """
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
+ self.set_state(ResourceState.STOPPED, "_stop_time")
def set_ready(self):
""" Mark ResourceManager as READY """
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_state(ResourceState.READY, "_ready_time")
def set_released(self):
 """ Mark ResourceManager as RELEASED """
- self._release_time = tnow()
- self._state = ResourceState.RELEASED
+ self.set_state(ResourceState.RELEASED, "_release_time")
def set_finished(self):
""" Mark ResourceManager as FINISHED """
- self._finish_time = tnow()
- self._state = ResourceState.FINISHED
+ self.set_state(ResourceState.FINISHED, "_finish_time")
def set_failed(self):
""" Mark ResourceManager as FAILED """
- self._failed_time = tnow()
- self._state = ResourceState.FAILED
+ self.set_state(ResourceState.FAILED, "_failed_time")
def set_discovered(self):
""" Mark ResourceManager as DISCOVERED """
- self._discover_time = tnow()
- self._state = ResourceState.DISCOVERED
+ self.set_state(ResourceState.DISCOVERED, "_discover_time")
def set_provisioned(self):
""" Mark ResourceManager as PROVISIONED """
- self._provision_time = tnow()
- self._state = ResourceState.PROVISIONED
+ self.set_state(ResourceState.PROVISIONED, "_provision_time")
+
def set_state(self, state, state_time_attr):
    """ Transition the RM to 'state' and record the transition time.

    :param state: New ResourceState value for the RM
    :param state_time_attr: Name of the instance attribute that stores
        the timestamp of this transition (e.g. "_start_time")
    """
    # RELEASED is a terminal state: once reached, ignore any
    # further state-change requests
    if self._state == ResourceState.RELEASED:
        return

    setattr(self, state_time_attr, tnow())
    self._state = state
class ResourceFactory(object):
_resource_types = dict()
DONE = 1
ERROR = 2
-
class Task(object):
""" This class is to define a task, that is represented by an id,
an execution time 'timestamp' and an action 'callback """
self._valid = set()
self._idgen = itertools.count(1)
+ @property
+ def pending(self):
+ """ Returns the list of pending task ids """
+ return self._valid
+
def schedule(self, task):
""" Add the task 'task' in the heap of the scheduler
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- ResourceAction
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, ResourceAction, failtrap
from nepi.util.sshfuncs import ProcStatus
import os
import tempfile
-@clsinit
+@clsinit_copy
class Collector(ResourceManager):
 """ The collector is responsible for collecting traces
of a same type associated to RMs into a local directory.
@property
def store_path(self):
return self._store_path
-
+
+ @failtrap
def provision(self):
trace_name = self.get("traceName")
if not trace_name:
super(Collector, self).provision()
+ @failtrap
def deploy(self):
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
+ self.discover()
+ self.provision()
super(Collector, self).deploy()
def release(self):
- trace_name = self.get("traceName")
- rename = self.get("rename") or trace_name
-
- msg = "Collecting '%s' traces to local directory %s" % (
- trace_name, self.store_path)
- self.info(msg)
-
- rms = self.get_connected()
- for rm in rms:
- result = self.ec.trace(rm.guid, trace_name)
- fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
- rename))
- try:
- f = open(fpath, "w")
- f.write(result)
- f.close()
- except:
- msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name,
- rm.guid, fpath)
- self.error(msg)
- continue
+ try:
+ trace_name = self.get("traceName")
+ rename = self.get("rename") or trace_name
+
+ msg = "Collecting '%s' traces to local directory %s" % (
+ trace_name, self.store_path)
+ self.info(msg)
+
+ rms = self.get_connected()
+ for rm in rms:
+ result = self.ec.trace(rm.guid, trace_name)
+ fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
+ rename))
+ try:
+ f = open(fpath, "w")
+ f.write(result)
+ f.close()
+ except:
+ msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name,
+ rm.guid, fpath)
+ self.error(msg)
+ continue
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(Collector, self).release()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
# TODO: Resolve wildcards in commands!!
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
-@clsinit
+@clsinit_copy
class LinuxApplication(ResourceManager):
"""
.. class:: Class Args :
_help = "Runs an application on a Linux host with a BASH command "
_backend_type = "linux"
-
@classmethod
def _register_attributes(cls):
command = Attribute("command", "Command to execute at application start. "
out = int(out.strip())
return out
-
+
+ @failtrap
def provision(self):
# create run dir for application
self.node.mkdir(self.run_home)
# Since provisioning takes a long time, before
# each step we check that the EC is still
for step in steps:
- if self.ec.finished:
- raise RuntimeError, "EC finished"
+ if self.ec.abort:
+ self.debug("Interrupting provisioning. EC says 'ABORT")
+ return
ret = step()
if ret:
# replace application specific paths in the command
return self.replace_paths(install)
+ @failtrap
def deploy(self):
# Wait until node is associated and deployed
node = self.node
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- command = self.get("command") or ""
- self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
+ command = self.get("command") or ""
+ self.info("Deploying command '%s' " % command)
+ self.discover()
+ self.provision()
super(LinuxApplication, self).deploy()
-
+
+ @failtrap
def start(self):
command = self.get("command")
# installation), then the application is directly marked as FINISHED
self.set_finished()
else:
-
if self.in_foreground:
self._run_in_foreground()
else:
command = self.get("command")
sudo = self.get("sudo") or False
x11 = self.get("forwardX11")
+ env = self.get("env")
# For a command being executed in foreground, if there is stdin,
# it is expected to be text string not a file or pipe
# to be able to kill the process from the stop method.
# We also set blocking = False, since we don't want the
# thread to block until the execution finishes.
- (out, err), self._proc = self.execute_command(self, command,
+ (out, err), self._proc = self.execute_command(command,
env = env,
sudo = sudo,
stdin = stdin,
blocking = False)
if self._proc.poll():
- self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
msg = " Failed to start command '%s' " % command
if proc.poll():
- self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
# Out is what was written in the stderr file
if err:
- self.fail()
msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
-
+
+ @failtrap
def stop(self):
""" Stops application execution
"""
if proc.poll() or err:
msg = " Failed to STOP command '%s' " % self.get("command")
self.error(msg, out, err)
- self.fail()
- if self.state == ResourceState.STARTED:
super(LinuxApplication, self).stop()
def release(self):
self.info("Releasing resource")
- tear_down = self.get("tearDown")
- if tear_down:
- self.node.execute(tear_down)
+ try:
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.node.execute(tear_down)
- self.stop()
+ self.stop()
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
- if self.state != ResourceState.FAILED:
- self.info("Resource released")
-
- super(LinuxApplication, self).release()
-
+ super(LinuxApplication, self).release()
+
@property
def state(self):
""" Returns the state of the application
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
+ reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow, tdiffsec
if self.ccnd: return self.ccnd.node
return None
+ @failtrap
def deploy(self):
if not self.ccnd or self.ccnd.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- command = self.get("command") or ""
+ command = self.get("command") or ""
- self.info("Deploying command '%s' " % command)
-
- if not self.get("env"):
- self.set("env", self._environment)
+ self.info("Deploying command '%s' " % command)
+
+ if not self.get("env"):
+ self.set("env", self._environment)
+
+ self.discover()
+ self.provision()
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
-
self.debug("----- READY ---- ")
self.set_ready()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay
+ ResourceAction, reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnr import LinuxCCNR
from nepi.util.timefuncs import tnow
if self.ccnr: return self.ccnr.node
return None
+ @failtrap
def deploy(self):
if not self.ccnr or self.ccnr.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
# ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- if not self.get("command"):
- self.set("command", self._start_command)
+ if not self.get("command"):
+ self.set("command", self._start_command)
- if not self.get("env"):
- self.set("env", self._environment)
+ if not self.get("env"):
+ self.set("env", self._environment)
- # set content to stdin, so the content will be
- # uploaded during provision
- self.set("stdin", self.get("content"))
+ # set content to stdin, so the content will be
+ # uploaded during provision
+ self.set("stdin", self.get("content"))
- command = self.get("command")
+ command = self.get("command")
+
+ self.info("Deploying command '%s' " % command)
- self.info("Deploying command '%s' " % command)
+ self.discover()
+ self.provision()
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
-
self.debug("----- READY ---- ")
self.set_ready()
env, blocking = True)
if proc.poll():
- self.fail()
msg = "Failed to execute command"
self.error(msg, out, err)
raise RuntimeError, msg
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- sef.fail()
raise RuntimeError, msg
@property
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.node import OSType
from nepi.util.timefuncs import tnow, tdiffsec
def path(self):
return "PATH=$PATH:${BIN}/%s/" % self.version
+ @failtrap
def deploy(self):
if not self.node or self.node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
# ccnd needs to wait until node is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- if not self.get("command"):
- self.set("command", self._start_command)
-
- if not self.get("depends"):
- self.set("depends", self._dependencies)
+ if not self.get("command"):
+ self.set("command", self._start_command)
+
+ if not self.get("depends"):
+ self.set("depends", self._dependencies)
- if not self.get("sources"):
- self.set("sources", self._sources)
+ if not self.get("sources"):
+ self.set("sources", self._sources)
- sources = self.get("sources")
- source = sources.split(" ")[0]
- basename = os.path.basename(source)
- self._version = ( basename.strip().replace(".tar.gz", "")
- .replace(".tar","")
- .replace(".gz","")
- .replace(".zip","") )
+ sources = self.get("sources")
+ source = sources.split(" ")[0]
+ basename = os.path.basename(source)
+ self._version = ( basename.strip().replace(".tar.gz", "")
+ .replace(".tar","")
+ .replace(".gz","")
+ .replace(".zip","") )
- if not self.get("build"):
- self.set("build", self._build)
+ if not self.get("build"):
+ self.set("build", self._build)
- if not self.get("install"):
- self.set("install", self._install)
+ if not self.get("install"):
+ self.set("install", self._install)
- if not self.get("env"):
- self.set("env", self._environment)
+ if not self.get("env"):
+ self.set("env", self._environment)
- command = self.get("command")
+ command = self.get("command")
- self.info("Deploying command '%s' " % command)
+ self.info("Deploying command '%s' " % command)
+
+ self.discover()
+ self.provision()
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
-
self.debug("----- READY ---- ")
self.set_ready()
env = env,
raise_on_error = True)
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.set_failed()
raise RuntimeError, msg
+ @failtrap
def stop(self):
command = self.get('command') or ''
state_check_delay = 0.5
if self._state == ResourceState.STARTED and \
tdiffsec(tnow(), self._last_state_check) > state_check_delay:
- (out, err), proc = self._ccndstatus
+ (out, err), proc = self._ccndstatus()
retcode = proc.poll()
return self._state
- @property
def _ccndstatus(self):
env = self.get('env') or ""
environ = self.node.format_environment(env, inline = True)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.ccn.ccnpingserver import LinuxCCNPingServer
from nepi.util.timefuncs import tnow, tdiffsec
if ccnpingserver: return ccnpingserver[0]
return None
+ @failtrap
def start(self):
if not self.ccnpingserver or \
self.ccnpingserver.state < ResourceState.STARTED:
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.util.timefuncs import tnow, tdiffsec
super(LinuxCCNPingServer, self).__init__(ec, guid)
self._home = "ccnping-serv-%s" % self.guid
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay
+ ResourceAction, reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow
if self.ccnd: return self.ccnd.node
return None
+ @failtrap
def deploy(self):
if not self.ccnd or self.ccnd.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
# ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- if not self.get("command"):
- self.set("command", self._start_command)
+ if not self.get("command"):
+ self.set("command", self._start_command)
- if not self.get("env"):
- self.set("env", self._environment)
+ if not self.get("env"):
+ self.set("env", self._environment)
- command = self.get("command")
+ command = self.get("command")
+
+ self.info("Deploying command '%s' " % command)
- self.info("Deploying command '%s' " % command)
+ self.discover()
+ self.provision()
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
-
self.debug("----- READY ---- ")
self.set_ready()
env = env,
raise_on_error = True)
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
raise RuntimeError, msg
@property
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import clsinit_copy, ResourceState, \
- ResourceAction, reschedule_delay
+ ResourceAction, reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
from nepi.util.timefuncs import tnow
return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
-
+
+ @failtrap
def deploy(self):
# Wait until associated ccnd is provisioned
if not self.ccnd or self.ccnd.state < ResourceState.READY:
# ccnr needs to wait until ccnd is deployed and running
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- if not self.get("ip"):
- host = self.get("host")
- ip = socket.gethostbyname(host)
- self.set("ip", ip)
+ if not self.get("ip"):
+ host = self.get("host")
+ ip = socket.gethostbyname(host)
+ self.set("ip", ip)
+
+ if not self.get("command"):
+ self.set("command", self._start_command)
- if not self.get("command"):
- self.set("command", self._start_command)
+ if not self.get("env"):
+ self.set("env", self._environment)
- if not self.get("env"):
- self.set("env", self._environment)
+ command = self.get("command")
- command = self.get("command")
+ self.info("Deploying command '%s' " % command)
- self.info("Deploying command '%s' " % command)
+ self.discover()
+ self.provision()
+ self.configure()
- self.discover()
- self.provision()
- self.configure()
- except:
- self.fail()
- raise
-
self.debug("----- READY ---- ")
self.set_ready()
if proc.poll():
msg = "Failed to execute command"
self.error(msg, out, err)
- self.fail()
raise RuntimeError, msg
def configure(self):
# schedule mtr deploy
self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
raise RuntimeError, msg
+ @failtrap
def stop(self):
command = self.get('command')
env = self.get('env')
if err:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
raise RuntimeError, msg
@property
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState
-@clsinit
+@clsinit_copy
class LinuxChannel(ResourceManager):
_rtype = "LinuxChannel"
_help = "Represents a wireless channel on a network of Linux hosts"
def valid_connection(self, guid):
# TODO: Validate!
return True
+
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Types, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.node import LinuxNode
from nepi.resources.linux.channel import LinuxChannel
# TODO: UP, MTU attributes!
-@clsinit
+@clsinit_copy
class LinuxInterface(ResourceManager):
_rtype = "LinuxInterface"
_help = "Controls network devices on Linux hosts through the ifconfig tool"
if chan: return chan[0]
return None
+ @failtrap
def discover(self):
devname = self.get("deviceName")
ip4 = self.get("ip4")
super(LinuxInterface, self).discover()
+ @failtrap
def provision(self):
devname = self.get("deviceName")
ip4 = self.get("ip4")
super(LinuxInterface, self).provision()
+ @failtrap
def deploy(self):
# Wait until node is provisioned
node = self.node
else:
# Verify if the interface exists in node. If not, configue
# if yes, load existing configuration
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
+ self.discover()
+ self.provision()
super(LinuxInterface, self).deploy()
def release(self):
- tear_down = self.get("tearDown")
- if tear_down:
- self.execute(tear_down)
+ try:
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.execute(tear_down)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(LinuxInterface, self).release()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "mtr-%s" % self.guid
self._sudo_kill = True
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux import rpmfuncs, debfuncs
from nepi.util import sshfuncs, execfuncs
from nepi.util.sshfuncs import ProcStatus
UBUNTU = "ubuntu"
DEBIAN = "debian"
-@clsinit
+@clsinit_copy
class LinuxNode(ResourceManager):
"""
.. class:: Class Args :
clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
" from node home folder before starting experiment",
+ type = Types.Bool,
+ default = False,
flags = Flags.ExecReadOnly)
clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
" from a previous same experiment, before the new experiment starts",
+ type = Types.Bool,
+ default = False,
flags = Flags.ExecReadOnly)
clean_processes = Attribute("cleanProcesses",
"Kill all running processes before starting experiment",
+ type = Types.Bool,
+ default = False,
flags = Flags.ExecReadOnly)
tear_down = Attribute("tearDown", "Bash script to be executed before " + \
time.sleep(min(30.0, retrydelay))
retrydelay *= 1.5
-
@property
def use_deb(self):
return self.os in [OSType.DEBIAN, OSType.UBUNTU]
def localhost(self):
return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
+ @failtrap
def provision(self):
# check if host is alive
if not self.is_alive():
- self.fail()
-
msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
self.error(msg)
raise RuntimeError, msg
super(LinuxNode, self).provision()
+ @failtrap
def deploy(self):
if self.state == ResourceState.NEW:
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
+ self.info("Deploying node")
+ self.discover()
+ self.provision()
# Node needs to wait until all associated interfaces are
# ready before it can finalize deployment
super(LinuxNode, self).deploy()
def release(self):
- # Node needs to wait until all associated RMs are released
- # to be released
- rms = self.get_connected()
- for rm in rms:
- if rm.state < ResourceState.STOPPED:
- self.ec.schedule(reschedule_delay, self.release)
- return
-
- tear_down = self.get("tearDown")
- if tear_down:
- self.execute(tear_down)
+ try:
+ rms = self.get_connected()
+ for rm in rms:
+ # Node needs to wait until all associated RMs are released
+ # before it can be released
+ if rm.state < ResourceState.STOPPED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.execute(tear_down)
- self.clean_processes()
+ self.clean_processes()
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(LinuxNode, self).release()
return (out, err), proc
-
def upload(self, src, dst, text = False, overwrite = True):
""" Copy content to destination
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "nping-%s" % self.guid
self._sudo_kill = True
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
super(LinuxPing, self).__init__(ec, guid)
self._home = "ping-%s" % self.guid
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
self._home = "tcpdump-%s" % self.guid
self._sudo_kill = True
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy
+from nepi.execution.resource import clsinit_copy, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
super(LinuxTraceroute, self).__init__(ec, guid)
self._home = "traceroute-%s" % self.guid
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
-from nepi.execution.resource import clsinit_copy
+ reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.timefuncs import tnow
super(LinuxUdpTest, self).__init__(ec, guid)
self._home = "udptest-%s" % self.guid
+ @failtrap
def deploy(self):
if not self.get("command"):
self.set("command", self._start_command)
# finished to continue )
self._run_in_background()
+ @failtrap
def start(self):
if self.get("s") == True:
# Server is already running
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
- raise RuntimeError, msg
+ raise RuntimeError, msg
else:
super(LinuxUdpTest, self).start()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
+ reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
msg = " Failed to connect endpoints "
if proc.poll():
- self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
(out, err), proc = endpoint.node.check_errors(self.run_home(endpoint))
# Out is what was written in the stderr file
if err:
- self.fail()
msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
port = self.wait_local_port(endpoint)
return (port, pid, ppid)
+ @failtrap
def provision(self):
# create run dir for tunnel on each node
self.endpoint1.node.mkdir(self.run_home(self.endpoint1))
self.set_provisioned()
+ @failtrap
def deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
(not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
+ self.discover()
+ self.provision()
self.debug("----- READY ---- ")
self.set_ready()
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
raise RuntimeError, msg
+ @failtrap
def stop(self):
""" Stops application execution
"""
# check if execution errors occurred
msg = " Failed to STOP tunnel"
self.error(msg, err1, err2)
- self.fail()
+ raise RuntimeError, msg
- if self.state == ResourceState.STARTED:
self.set_stopped()
@property
else:
msg = "Couldn't retrieve %s" % filename
self.error(msg, out, err)
- self.fail()
raise RuntimeError, msg
return result
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
from nepi.resources.omf.node import OMFNode
from nepi.resources.omf.omf_api import OMFAPIFactory
-
@clsinit_copy
class OMFApplication(OMFResource):
"""
attr = self._attrs["stdin"]
attr.set_hook = self.stdin_hook
-
def valid_connection(self, guid):
""" Check if the connection with the guid in parameter is possible.
Only meaningful connections are allowed.
return True
+ @failtrap
def deploy(self):
""" Deploy the RM. It means nothing special for an application
for now (later it will be upload sources, ...)
if not self._omf_api :
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
super(OMFApplication, self).deploy()
+ @failtrap
def start(self):
""" Start the RM. It means : Send Xmpp Message Using OMF protocol
to execute the application.
if not (self.get('appid') and self.get('path')) :
msg = "Application's information are not initialized"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if not self.get('args'):
self.set('args', " ")
except AttributeError:
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
raise
super(OMFApplication, self).start()
+ @failtrap
def stop(self):
""" Stop the RM. It means : Send Xmpp Message Using OMF protocol to
kill the application.
except AttributeError:
msg = "Credentials were not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- #raise
+ raise
super(OMFApplication, self).stop()
self.set_finished()
""" Clean the RM at the end of the experiment and release the API.
"""
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
+ try:
+ if self._omf_api :
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(OMFApplication, self).release()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
res.append(couple)
return res
- def discover(self):
- """ Discover the availables channels
-
- """
- pass
-
- def provision(self):
- """ Provision some availables channels
-
- """
- pass
-
+ @failtrap
def deploy(self):
""" Deploy the RM. It means : Get the xmpp client and send messages
using OMF 5.4 protocol to configure the channel.
if not self._omf_api :
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if not self.get('channel'):
msg = "Channel's value is not initialized"
self.error(msg)
- self.fail()
- raise
+ raise RuntimeError, msg
+
+ self._nodes_guid = self._get_target(self._connections)
- self._nodes_guid = self._get_target(self._connections)
if self._nodes_guid == "reschedule" :
self.ec.schedule("2s", self.deploy)
- return False
-
- try:
- for couple in self._nodes_guid:
- #print "Couple node/alias : " + couple[0] + " , " + couple[1]
- attrval = self.get('channel')
- attrname = "net/%s/%s" % (couple[1], 'channel')
- self._omf_api.configure(couple[0], attrname, attrval)
- except AttributeError:
- msg = "Credentials are not initialzed. XMPP Connections impossible"
- self.error(msg)
- self.fail()
- raise
-
- super(OMFChannel, self).deploy()
-
- def start(self):
- """ Start the RM. It means nothing special for a channel for now
- It becomes STARTED as soon as this method starts.
-
- """
-
- super(OMFChannel, self).start()
-
- def stop(self):
- """ Stop the RM. It means nothing special for a channel for now
- It becomes STOPPED as soon as this method is called
-
- """
- super(OMFChannel, self).stop()
- self.set_finished()
+ else:
+ try:
+ for couple in self._nodes_guid:
+ #print "Couple node/alias : " + couple[0] + " , " + couple[1]
+ attrval = self.get('channel')
+ attrname = "net/%s/%s" % (couple[1], 'channel')
+ self._omf_api.configure(couple[0], attrname, attrval)
+ except AttributeError:
+ msg = "Credentials are not initialzed. XMPP Connections impossible"
+ self.error(msg)
+ raise
+
+ super(OMFChannel, self).deploy()
def release(self):
""" Clean the RM at the end of the experiment and release the API
"""
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
+ try:
+ if self._omf_api :
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(OMFChannel, self).release()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.node import OMFNode
if rm_list: return rm_list[0]
return None
-
def configure_iface(self):
""" Configure the interface without the ip
return True
+ @failtrap
def deploy(self):
""" Deploy the RM. It means : Get the xmpp client and send messages
using OMF 5.4 protocol to configure the interface.
if not self._omf_api :
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if not (self.get('mode') and self.get('type') and self.get('essid') \
and self.get('ip')):
msg = "Interface's variable are not initialized"
self.error(msg)
- self.fail()
- return False
+ raise RuntimeError, msg
if not self.node.get('hostname') :
msg = "The channel is connected with an undefined node"
self.error(msg)
- self.fail()
- return False
+ raise RuntimeError, msg
# Just for information
self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
self.get('essid') + " : " + self.get('ip'))
# Check if the node is already deployed
- chk1 = True
if self.state < ResourceState.PROVISIONED:
- chk1 = self.configure_iface()
- if chk1:
- chk2 = self.configure_ip()
+ if self.configure_iface():
+ self.configure_ip()
- if not (chk1 and chk2) :
- return False
-
super(OMFWifiInterface, self).deploy()
- return True
-
-
- def start(self):
- """ Start the RM. It means nothing special for a channel for now
- It becomes STARTED as soon as this method starts.
-
- """
-
- super(OMFWifiInterface, self).start()
-
- def stop(self):
- """ Stop the RM. It means nothing special for a channel for now
- It becomes STOPPED as soon as this method is called
-
- """
- super(OMFWifiInterface, self).stop()
- self.set_finished()
def release(self):
""" Clean the RM at the end of the experiment and release the API
"""
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
+ try:
+ if self._omf_api :
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(OMFWifiInterface, self).release()
"""
payload = ET.Element("omf-message")
stdin = self._id_element(payload,"STDIN")
- value = self._attr_element(stdin,"TARGET",target)
+ value = self._attr_element(stdin,"VALUE",value)
sliceid = self._attr_element(stdin,"SLICEID",self._slice_id)
expid = self._attr_element(stdin,"EXPID",self._exp_id)
target = self._attr_element(stdin,"TARGET",target)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
from nepi.resources.omf.omf_api import OMFAPIFactory
import time
-
@clsinit_copy
class OMFNode(OMFResource):
"""
return False
+ @failtrap
def deploy(self):
""" Deploy the RM. It means : Send Xmpp Message Using OMF protocol
to enroll the node into the experiment.
if not self._omf_api :
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- return
+ raise RuntimeError, msg
if not self.get('hostname') :
msg = "Hostname's value is not initialized"
self.error(msg)
- self.fail()
- return False
+ raise RuntimeError, msg
try:
self._omf_api.enroll_host(self.get('hostname'))
except AttributeError:
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
- self.fail()
- #raise AttributeError, msg
+ raise
super(OMFNode, self).deploy()
- def discover(self):
- """ Discover the availables nodes
-
- """
- pass
-
- def provision(self):
- """ Provision some availables nodes
-
- """
- pass
-
- def start(self):
- """Start the RM. It means nothing special for an interface for now
- It becomes STARTED as soon as this method starts.
-
- """
-
- super(OMFNode, self).start()
-
- def stop(self):
- """Stop the RM. It means nothing special for an interface for now
- It becomes STOPPED as soon as this method stops
-
- """
- super(OMFNode, self).stop()
- self.set_finished()
-
def release(self):
"""Clean the RM at the end of the experiment
"""
- if self._omf_api :
- self._omf_api.release(self.get('hostname'))
-
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
+ try:
+ if self._omf_api :
+ self._omf_api.release(self.get('hostname'))
+
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(OMFNode, self).release()
# Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay
+
class ResourceGateway:
"""
Dictionary used to set OMF gateway depending on Testbed information.
"""
+ #XXX: A.Q. COMMENT: This looks a bit hardcoded
+ # SHOULDN'T THIS BE IN A SEPARATED FILE RATHER THAN IN THE
+ # BASE CLASS FOR ALL OMF RESOURCES?
TestbedtoGateway = dict({
"wilabt" : "ops.wilab2.ilabt.iminds.be",
"nitos" : "nitlab.inf.uth.gr",
})
-@clsinit
+ AMtoGateway = dict({
+ "am.wilab2.ilabt.iminds.be" : "ops.wilab2.ilabt.iminds.be",
+ "nitlab.inf.uth.gr" : "nitlab.inf.uth.gr",
+ "nicta" : "??.??.??",
+ })
+
+@clsinit_copy
class OMFResource(ResourceManager):
"""
Generic resource gathering XMPP credential information and common methods
+++ /dev/null
-#
-# NEPI, a framework to manage network experiments
-# Copyright (C) 2013 INRIA
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-# Julien Tribino <julien.tribino@inria.fr>
-
-from nepi.execution.resource import ResourceManager, clsinit
-from nepi.execution.attribute import Attribute
-
-from nepi.resources.omf.omf_api import OMFAPIFactory
-
-@clsinit
-class OMFResource(ResourceManager):
- _rtype = "OMFResource"
-
- @classmethod
- def _register_attributes(cls):
- xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
- xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
- xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
- xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
- cls._register_attribute(xmppSlice)
- cls._register_attribute(xmppHost)
- cls._register_attribute(xmppPort)
- cls._register_attribute(xmppPassword)
-
- def __init__(self, ec, guid, creds):
- super(OMFNode, self).__init__(ec, guid)
- self.set('xmppSlice', creds['xmppSlice'])
- self.set('xmppHost', creds['xmppHost'])
- self.set('xmppPort', creds['xmppPort'])
- self.set('xmppPassword', creds['xmppPassword'])
-
- self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
-
- def discover(self):
- pass
-
- def provision(self, credential):
- pass
-
-
# Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.node import LinuxNode
from nepi.resources.planetlab.plcapi import PLCAPIFactory
from nepi.util.execfuncs import lexec
cls._register_attribute(min_cpu)
cls._register_attribute(max_cpu)
cls._register_attribute(timeframe)
-
def __init__(self, ec, guid):
super(PlanetlabNode, self).__init__(ec, guid)
# Alexandros Kouvakas <alexandros.kouvakas@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, failtrap
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.linux.application import LinuxApplication
# TODO: Validate!
return True
+ @failtrap
def provision(self):
# create home dir for ovs
self.node.mkdir(self.ovs_home)
stderr = "check_cmd_stderr")
(out, err), proc = self.node.check_output(self.ovs_checks, 'check_cmd_exitcode')
+
if out != "0\n":
msg = "Command sliver-ovs does not exist on the VM"
self.debug(msg)
raise RuntimeError, msg
+
msg = "Command sliver-ovs exists"
self.debug(msg)
+ @failtrap
def deploy(self):
""" Wait until node is associated and deployed
"""
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- self.discover()
- self.provision()
- self.check_sliver_ovs()
- self.servers_on()
- self.create_bridge()
- self.assign_contr()
- self.ovs_status()
- except:
- self._state = ResourceState.FAILED
- raise
-
- self._state = ResourceState.READY
+ self.discover()
+ self.provision()
+ self.check_sliver_ovs()
+ self.servers_on()
+ self.create_bridge()
+ self.assign_contr()
+ self.ovs_status()
+
+ super(OVSWitch, self).deploy()
def servers_on(self):
""" Start the openvswitch servers and also checking
# Check if the servers are running or not
(out, err), proc = self.node.check_output(self.ovs_checks, 'status_srv_exitcode')
+
if out != "0\n":
self.debug("Servers are not running")
raise RuntimeError, msg
+
self.info("Servers started")
def del_old_br(self):
(out, err), proc = self.node.check_output(self.ovs_home, 'show_stdout')
self.info(out)
- def start(self):
- """ Start the RM. It means nothing special for
- ovswitch for now.
- """
- pass
-
- def stop(self):
- """ Stop the RM.It means nothing
- for ovswitch for now.
- """
- pass
-
def release(self):
""" Delete the bridge and
close the servers
"""
# Node needs to wait until all associated RMs are released
# to be released
- from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
- rm = self.get_connected(OVSPort.rtype())
+ try:
+ from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
+ rm = self.get_connected(OVSPort.rtype())
- if rm[0].state < ResourceState.FINISHED:
- self.ec.schedule(reschedule_delay, self.release)
- return
+ if rm[0].state < ResourceState.FINISHED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
+ msg = "Deleting the bridge %s" % self.get('bridge_name')
+ self.info(msg)
+ cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
+ (out, err), proc = self.node.run(cmd, self.ovs_checks,
+ sudo = True)
+ cmd = "sliver-ovs stop"
+ (out, err), proc = self.node.run(cmd, self.ovs_checks,
+ sudo = True)
- msg = "Deleting the bridge %s" % self.get('bridge_name')
- self.info(msg)
- cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
- (out, err), proc = self.node.run(cmd, self.ovs_checks,
- sudo = True)
- cmd = "sliver-ovs stop"
- (out, err), proc = self.node.run(cmd, self.ovs_checks,
- sudo = True)
-
- if proc.poll():
- self.fail()
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- self._state = ResourceState.RELEASED
-
+ if proc.poll():
+ self.fail()
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+
+ super(OVSWitch, self).release()
+
# Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, failtrap
from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.linux.application import LinuxApplication
command = self.replace_paths(command)
return command
- def provision(self):
- """ Provision the ports.No meaning.
- """
- pass
-
- def discover(self):
- """ Discover the ports.No meaning
- """
- pass
-
+ @failtrap
def deploy(self):
""" Wait until ovswitch is started
"""
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- self.discover()
- self.provision()
- self.get_host_ip()
- self.create_port()
- self.get_local_end()
- self.ovswitch.ovs_status()
- self._state = ResourceState.READY
- except:
- self._state = ResourceState.FAILED
- raise
-
- def start(self):
- """ Start the RM. It means nothing special for
- ovsport for now.
- """
- pass
-
- def stop(self):
- """ Stop the RM. It means nothing special for
- ovsport for now.
- """
- pass
-
+ self.discover()
+ self.provision()
+ self.get_host_ip()
+ self.create_port()
+ self.get_local_end()
+ self.ovswitch.ovs_status()
+ super(OVSPort, self).deploy()
+
def release(self):
""" Release the port RM means delete the ports
"""
# OVS needs to wait until all associated RMs are released
# to be released
- from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
- rm = self.get_connected(Tunnel.rtype())
- if rm and rm[0].state < ResourceState.FINISHED:
- self.ec.schedule(reschedule_delay, self.release)
- return
-
- msg = "Deleting the port %s" % self.get('port_name')
- self.info(msg)
- cmd = "sliver-ovs del_port %s" % self.get('port_name')
- (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
- sudo = True)
-
- if proc.poll():
- self.fail()
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- self._state = ResourceState.RELEASED
+ try:
+ from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
+ rm = self.get_connected(Tunnel.rtype())
+ if rm and rm[0].state < ResourceState.FINISHED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
+ msg = "Deleting the port %s" % self.get('port_name')
+ self.info(msg)
+ cmd = "sliver-ovs del_port %s" % self.get('port_name')
+ (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
+ sudo = True)
+
+ if proc.poll():
+ self.fail()
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+
+ super(OVSPort, self).release()
# Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
import time
import socket
-
reschedule_delay = "0.5s"
@clsinit_copy
-class Tunnel(LinuxApplication):
+class OVSTunnel(LinuxApplication):
"""
.. class:: Class Args :
"""
- _rtype = "Tunnel"
+ _rtype = "OVSTunnel"
_authorized_connections = ["OVSPort", "PlanetlabTap"]
@classmethod
:type guid: int
"""
- super(Tunnel, self).__init__(ec, guid)
+ super(OVSTunnel, self).__init__(ec, guid)
self._home = "tunnel-%s" % self.guid
self.port_info_tunl = []
self._nodes = []
self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
+
msg = "Connection on host %s configured" \
% self.node.get("hostname")
self.info(msg)
self.info(msg)
return
+ @failtrap
def provision(self):
""" Provision the tunnel
"""
(self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
- self.debug("------- READY -------")
- self._provision_time = tnow()
- self._state = ResourceState.PROVISIONED
-
- def discover(self):
- """ Discover the tunnel
-
- """
- pass
+ super(OVSTunnel, self).provision()
+ @failtrap
def deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
(not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
-
- self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
-
- def start(self):
- """ Start the RM. It means nothing special for
- ovsport for now.
- """
- pass
-
-
- def stop(self):
- """ Stop the RM. It means nothing special for
- ovsport for now.
- """
- pass
+ self.discover()
+ self.provision()
+ super(OVSTunnel, self).deploy()
+
def release(self):
""" Release the udp_tunnel on endpoint2.
On endpoint1 means nothing special.
"""
- if not self.check_endpoints():
- # Kill the TAP devices
- # TODO: Make more generic Release method of PLTAP
- if self._pid and self._ppid:
- self._nodes = self.get_node(self.endpoint2)
- (out, err), proc = self.node.kill(self._pid,
- self._ppid, sudo = True)
- if err or proc.poll():
- # check if execution errors occurred
- msg = " Failed to delete TAP device"
- self.error(msg, err, err)
- self.fail()
-
- self._state = ResourceState.RELEASED
-
-
-
-
-
-
+ try:
+ if not self.check_endpoints():
+ # Kill the TAP devices
+ # TODO: Make the PLTAP Release method more generic
+ if self._pid and self._ppid:
+ self._nodes = self.get_node(self.endpoint2)
+ (out, err), proc = self.node.kill(self._pid,
+ self._ppid, sudo = True)
+ if err or proc.poll():
+ # check if execution errors occurred
+ msg = " Failed to delete TAP device"
+ self.error(msg, err, err)
+ self.fail()
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+
+ super(OVSTunnel, self).release()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
+ reschedule_delay, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.util.timefuncs import tnow, tdiffsec
if_name = self.wait_if_name()
self.set("deviceName", if_name)
+ @failtrap
def deploy(self):
if not self.node or self.node.state < ResourceState.PROVISIONED:
self.ec.schedule(reschedule_delay, self.deploy)
if not self.get("install"):
self.set("install", self._install)
- try:
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
-
+ self.discover()
+ self.provision()
+
self.debug("----- READY ---- ")
self.set_ready()
+ @failtrap
def start(self):
if self.state == ResourceState.READY:
command = self.get("command")
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self.fail()
raise RuntimeError, msg
+ @failtrap
def stop(self):
command = self.get('command') or ''
if out.strip().find(self.get("deviceName")) == -1:
# tap is not running is not running (socket not found)
- self._finish_time = tnow()
- self._state = ResourceState.FINISHED
+ self.finish()
self._last_state_check = tnow()
def release(self):
# Node needs to wait until all associated RMs are released
# to be released
- from nepi.resources.linux.udptunnel import UdpTunnel
- rms = self.get_connected(UdpTunnel.rtype())
- for rm in rms:
- if rm.state < ResourceState.STOPPED:
- self.ec.schedule(reschedule_delay, self.release)
- return
+ try:
+ from nepi.resources.linux.udptunnel import UdpTunnel
+ rms = self.get_connected(UdpTunnel.rtype())
+ for rm in rms:
+ if rm.state < ResourceState.STOPPED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
super(PlanetlabTap, self).release()
else:
msg = "Couldn't retrieve if_name"
self.error(msg, out, err)
- self.fail()
raise RuntimeError, msg
return if_name
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author: Claudio Freire <claudio-daniel.freire@inria.fr>
+# Alina Quereilhac <alina.quereilhac@inria.fr>
#
-# A.Q. TODO: BUG FIX THREADCACHE. Not needed!! remove it completely!
-
import threading
import Queue
import traceback
N_PROCS = None
-#THREADCACHE = []
-#THREADCACHEPID = None
-
class WorkerThread(threading.Thread):
class QUIT:
pass
- class REASSIGNED:
- pass
-
+
def run(self):
while True:
task = self.queue.get()
- if task is None:
- self.done = True
- self.queue.task_done()
- continue
- elif task is self.QUIT:
- self.done = True
+
+ if task is self.QUIT:
self.queue.task_done()
break
- elif task is self.REASSIGNED:
- continue
- else:
- self.done = False
-
+
try:
try:
callable, args, kwargs = task
except:
traceback.print_exc(file = sys.stderr)
self.delayed_exceptions.append(sys.exc_info())
-
- def waitdone(self):
- while not self.queue.empty() and not self.done:
- self.queue.join()
-
+
def attach(self, queue, rvqueue, delayed_exceptions):
- if self.isAlive():
- self.waitdone()
- oldqueue = self.queue
self.queue = queue
self.rvqueue = rvqueue
self.delayed_exceptions = delayed_exceptions
- if self.isAlive():
- oldqueue.put(self.REASSIGNED)
-
- def detach(self):
- if self.isAlive():
- self.waitdone()
- self.oldqueue = self.queue
- self.queue = Queue.Queue()
- self.rvqueue = None
- self.delayed_exceptions = []
-
- def detach_signal(self):
- if self.isAlive():
- self.oldqueue.put(self.REASSIGNED)
- del self.oldqueue
-
+
def quit(self):
self.queue.put(self.QUIT)
- self.join()
-class ParallelMap(object):
+class ParallelRun(object):
def __init__(self, maxthreads = None, maxqueue = None, results = True):
- global N_PROCS
- #global THREADCACHE
- #global THREADCACHEPID
+ self.maxqueue = maxqueue
+ self.maxthreads = maxthreads
+
+ self.queue = Queue.Queue(self.maxqueue or 0)
+ self.delayed_exceptions = []
+
+ if results:
+ self.rvqueue = Queue.Queue()
+ else:
+ self.rvqueue = None
+
+ self.initialize_workers()
+
+ def initialize_workers(self):
+ global N_PROCS
+
+ maxthreads = self.maxthreads
+
+ # Compute maximum number of threads allowed by the system
if maxthreads is None:
if N_PROCS is None:
try:
if maxthreads is None:
maxthreads = 4
-
- self.queue = Queue.Queue(maxqueue or 0)
-
- self.delayed_exceptions = []
-
- if results:
- self.rvqueue = Queue.Queue()
- else:
- self.rvqueue = None
-
- # Check threadcache
- #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
- # del THREADCACHE[:]
- # THREADCACHEPID = os.getpid()
-
+
self.workers = []
+
+ # initialize workers
for x in xrange(maxthreads):
- t = None
- #if THREADCACHE:
- # try:
- # t = THREADCACHE.pop()
- # except:
- # pass
- if t is None:
- t = WorkerThread()
- t.setDaemon(True)
- else:
- t.waitdone()
- t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
- self.workers.append(t)
+ worker = WorkerThread()
+ worker.attach(self.queue, self.rvqueue, self.delayed_exceptions)
+ worker.setDaemon(True)
+
+ self.workers.append(worker)
def __del__(self):
self.destroy()
-
- def destroy(self):
- # Check threadcache
- #global THREADCACHE
- #global THREADCACHEPID
- #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
- # del THREADCACHE[:]
- # THREADCACHEPID = os.getpid()
- for worker in self.workers:
- worker.waitdone()
- for worker in self.workers:
- worker.detach()
- for worker in self.workers:
- worker.detach_signal()
- for worker in self.workers:
- worker.quit()
-
- # TO FIX:
- # THREADCACHE.extend(self.workers)
+ def empty(self):
+ while True:
+ try:
+ self.queue.get(block = False)
+ self.queue.task_done()
+ except Queue.Empty:
+ break
+
+ def destroy(self):
+ self.join()
del self.workers[:]
self.queue.put_nowait((callable, args, kwargs))
def start(self):
- for thread in self.workers:
- if not thread.isAlive():
- thread.start()
+ for worker in self.workers:
+ if not worker.isAlive():
+ worker.start()
def join(self):
- for thread in self.workers:
- # That's the sync signal
- self.queue.put(None)
-
+ # Wait until all queued tasks have been processed
self.queue.join()
- for thread in self.workers:
- thread.waitdone()
-
- if self.delayed_exceptions:
- typ,val,loc = self.delayed_exceptions[0]
- del self.delayed_exceptions[:]
- raise typ,val,loc
-
- self.destroy()
+
+ for worker in self.workers:
+ worker.quit()
+
+ for worker in self.workers:
+ worker.join()
def sync(self):
- self.queue.join()
if self.delayed_exceptions:
typ,val,loc = self.delayed_exceptions[0]
del self.delayed_exceptions[:]
except Queue.Empty:
raise StopIteration
-
-class ParallelFilter(ParallelMap):
- class _FILTERED:
- pass
-
- def __filter(self, x):
- if self.filter_condition(x):
- return x
- else:
- return self._FILTERED
-
- def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
- super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
- self.filter_condition = filter_condition
-
- def put(self, what):
- super(ParallelFilter, self).put(self.__filter, what)
-
- def put_nowait(self, what):
- super(ParallelFilter, self).put_nowait(self.__filter, what)
-
- def __iter__(self):
- for rv in super(ParallelFilter, self).__iter__():
- if rv is not self._FILTERED:
- yield rv
-
-class ParallelRun(ParallelMap):
- def __run(self, x):
- fn, args, kwargs = x
- return fn(*args, **kwargs)
-
- def __init__(self, maxthreads = None, maxqueue = None):
- super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
-
- def put(self, what, *args, **kwargs):
- super(ParallelRun, self).put(self.__run, (what, args, kwargs))
-
- def put_nowait(self, what, *args, **kwargs):
- super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
-
-
-def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
- mapper = ParallelMap(
- maxthreads = maxthreads,
- maxqueue = maxqueue,
- results = True)
- mapper.start()
- for elem in iterable:
- mapper.put(elem)
- rv = list(mapper)
- mapper.join()
- return rv
-
-def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
- filtrer = ParallelFilter(
- condition,
- maxthreads = maxthreads,
- maxqueue = maxqueue)
- filtrer.start()
- for elem in iterable:
- filtrer.put(elem)
- rv = list(filtrer)
- filtrer.join()
- return rv
-
def test_schedule_exception(self):
def raise_error():
+ # When this task is executed and the error is raised,
+ # the FailureManager should set its failure level to
+ # TASK_FAILURE
raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
ec = ExperimentController()
- ec.schedule("2s", raise_error)
- while ec.ecstate not in [ECState.FAILED, ECState.TERMINATED]:
- time.sleep(1)
+ tid = ec.schedule("2s", raise_error, track = True)
- self.assertEquals(ec.ecstate, ECState.FAILED)
- ec.shutdown()
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
+
+ time.sleep(1)
+ self.assertEquals(task.status, TaskStatus.ERROR)
if __name__ == '__main__':
unittest.main()
from nepi.execution.attribute import Attribute
-from nepi.execution.ec import ExperimentController
-from nepi.execution.resource import ResourceManager, ResourceState, clsinit, \
- ResourceAction
+from nepi.execution.ec import ExperimentController, FailureLevel
+from nepi.execution.resource import ResourceManager, ResourceState, \
+ clsinit_copy, ResourceAction, failtrap
import random
import time
import unittest
-@clsinit
+@clsinit_copy
class MyResource(ResourceManager):
_rtype = "MyResource"
def __init__(self, ec, guid):
super(MyResource, self).__init__(ec, guid)
-@clsinit
+@clsinit_copy
class AnotherResource(ResourceManager):
_rtype = "AnotherResource"
def __init__(self, ec, guid):
super(AnotherResource, self).__init__(ec, guid)
-
class Channel(ResourceManager):
_rtype = "Channel"
self.discover()
self.provision()
self.logger.debug(" -------- PROVISIONED ------- ")
- self.ec.schedule("3s", self.deploy)
+ self.ec.schedule("1s", self.deploy)
elif self.state == ResourceState.PROVISIONED:
ifaces = self.get_connected(Interface.rtype())
for rm in ifaces:
if node.state < ResourceState.READY:
self.ec.schedule("0.5s", self.deploy)
else:
- time.sleep(random.random() * 5)
+ time.sleep(random.random() * 2)
super(Application, self).deploy()
self.logger.debug(" -------- DEPLOYED ------- ")
def start(self):
super(Application, self).start()
- time.sleep(random.random() * 5)
+ time.sleep(random.random() * 3)
self._state = ResourceState.FINISHED
-
+
+class ErrorApplication(ResourceManager):
+ _rtype = "ErrorApplication"
+
+ def __init__(self, ec, guid):
+ super(ErrorApplication, self).__init__(ec, guid)
+
+ @failtrap
+ def deploy(self):
+ node = self.get_connected(Node.rtype())[0]
+ if node.state < ResourceState.READY:
+ self.ec.schedule("0.5s", self.deploy)
+ else:
+ time.sleep(random.random() * 2)
+ raise RuntimeError, "NOT A REAL ERROR. JUST TESTING"
class ResourceFactoryTestCase(unittest.TestCase):
def test_add_resource_factory(self):
ResourceFactory.register_type(AnotherResource)
self.assertEquals(MyResource.rtype(), "MyResource")
- self.assertEquals(len(MyResource._attributes), 1)
+ self.assertEquals(len(MyResource._attributes), 2)
self.assertEquals(ResourceManager.rtype(), "Resource")
- self.assertEquals(len(ResourceManager._attributes), 0)
+ self.assertEquals(len(ResourceManager._attributes), 1)
self.assertEquals(AnotherResource.rtype(), "AnotherResource")
- self.assertEquals(len(AnotherResource._attributes), 0)
+ self.assertEquals(len(AnotherResource._attributes), 1)
self.assertEquals(len(ResourceFactory.resource_types()), 2)
ec.shutdown()
- def test_start_with_condition(self):
+ def test_exception(self):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(ErrorApplication)
+ ResourceFactory.register_type(Node)
+ ResourceFactory.register_type(Interface)
+ ResourceFactory.register_type(Channel)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("Node")
+
+ apps = list()
+ for i in xrange(10):
+ app = ec.register_resource("ErrorApplication")
+ ec.register_connection(app, node)
+ apps.append(app)
+
+
+ ec.deploy()
+
+ ec.wait_finished(apps)
+
+ ec.shutdown()
+
+ self.assertTrue(ec._fm._failure_level == FailureLevel.RM_FAILURE)
+
+
+ def ztest_start_with_condition(self):
# TODO!!!
pass
- def test_stop_with_condition(self):
+ def ztest_stop_with_condition(self):
# TODO!!!
pass
- def test_set_with_condition(self):
+ def ztest_set_with_condition(self):
# TODO!!!
pass
self.assertEquals(len(OMFChannel._attributes), 5)
self.assertEquals(OMFApplication.rtype(), "OMFApplication")
- self.assertEquals(len(OMFApplication._attributes), 9)
+ self.assertEquals(len(OMFApplication._attributes), 12)
class OMFEachTestCase(unittest.TestCase):
self.ec.set(self.iface1, 'type', "g")
self.ec.set(self.iface1, 'essid', "vlcexp")
self.ec.set(self.iface1, 'ip', "10.0.0.17")
- self.ec.set(self.iface1, 'xmppSlice', "nepi")
- self.ec.set(self.iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
- self.ec.set(self.iface1, 'xmppPort', "5222")
- self.ec.set(self.iface1, 'xmppPassword', "1234")
self.channel = self.ec.register_resource("OMFChannel")
self.ec.set(self.channel, 'channel', "6")
self.ec.set(self.app1, 'path', "/opt/vlc-1.1.13/cvlc")
self.ec.set(self.app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
self.ec.set(self.app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
- self.ec.set(self.app1, 'xmppSlice', "nepi")
- self.ec.set(self.app1, 'xmppHost', "xmpp-plexus.onelab.eu")
- self.ec.set(self.app1, 'xmppPort', "5222")
- self.ec.set(self.app1, 'xmppPassword', "1234")
self.app2 = self.ec.register_resource("OMFApplication")
self.assertEquals(self.ec.get(self.iface1, 'type'), 'g')
self.assertEquals(self.ec.get(self.iface1, 'essid'), 'vlcexp')
self.assertEquals(self.ec.get(self.iface1, 'ip'), '10.0.0.17')
- self.assertEquals(self.ec.get(self.iface1, 'xmppSlice'), 'nepi')
- self.assertEquals(self.ec.get(self.iface1, 'xmppHost'), 'xmpp-plexus.onelab.eu')
- self.assertEquals(self.ec.get(self.iface1, 'xmppPort'), '5222')
- self.assertEquals(self.ec.get(self.iface1, 'xmppPassword'), '1234')
def test_creation_and_configuration_channel(self):
self.assertEquals(self.ec.get(self.channel, 'channel'), '6')
self.assertEquals(self.ec.get(self.app1, 'path'), '/opt/vlc-1.1.13/cvlc')
self.assertEquals(self.ec.get(self.app1, 'args'), "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
self.assertEquals(self.ec.get(self.app1, 'env'), 'DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority')
- self.assertEquals(self.ec.get(self.app1, 'xmppSlice'), 'nepi')
- self.assertEquals(self.ec.get(self.app1, 'xmppHost'), 'xmpp-plexus.onelab.eu')
- self.assertEquals(self.ec.get(self.app1, 'xmppPort'), '5222')
- self.assertEquals(self.ec.get(self.app1, 'xmppPassword'), '1234')
def test_connection(self):
self.assertEquals(len(self.ec.get_resource(self.node1).connections), 6)
class OMFVLCTestCaseComplete(unittest.TestCase):
- def test_deploy(self):
+ def xtest_deploy(self):
ec = DummyEC(exp_id = "5421" )
self.node1 = ec.register_resource("OMFNode")
ec.set(self.iface1, 'type', "g")
ec.set(self.iface1, 'essid', "vlcexp")
ec.set(self.iface1, 'ip', "10.0.0.17")
- ec.set(self.iface1, 'xmppSlice', "nepi")
- ec.set(self.iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.iface1, 'xmppPort', "5222")
- ec.set(self.iface1, 'xmppPassword', "1234")
self.channel = ec.register_resource("OMFChannel")
ec.set(self.channel, 'channel', "6")
ec.set(self.app1, 'path', "/opt/vlc-1.1.13/cvlc")
ec.set(self.app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
ec.set(self.app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
- ec.set(self.app1, 'xmppSlice', "nepi")
- ec.set(self.app1, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.app1, 'xmppPort', "5222")
- ec.set(self.app1, 'xmppPassword', "1234")
self.app2 = ec.register_resource("OMFApplication")
ec.set(self.app2, 'appid', 'Test#1')
ec.set(self.app2, 'path', "/usr/bin/test")
ec.set(self.app2, 'args', "-1")
ec.set(self.app2, 'env', " ")
- ec.set(self.app2, 'xmppSlice', "nepi")
- ec.set(self.app2, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.app2, 'xmppPort', "5222")
- ec.set(self.app2, 'xmppPassword', "1234")
self.app3 = ec.register_resource("OMFApplication")
ec.set(self.app3, 'appid', 'Test#2')
ec.set(self.app3, 'path', "/usr/bin/test")
ec.set(self.app3, 'args', "-2")
ec.set(self.app3, 'env', " ")
- ec.set(self.app3, 'xmppSlice', "nepi")
- ec.set(self.app3, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.app3, 'xmppPort', "5222")
- ec.set(self.app3, 'xmppPassword', "1234")
self.app4 = ec.register_resource("OMFApplication")
ec.set(self.app4, 'appid', 'Test#3')
ec.set(self.app4, 'path', "/usr/bin/test")
ec.set(self.app4, 'args', "-3")
ec.set(self.app4, 'env', " ")
- ec.set(self.app4, 'xmppSlice', "nepi")
- ec.set(self.app4, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.app4, 'xmppPort', "5222")
- ec.set(self.app4, 'xmppPassword', "1234")
self.app5 = ec.register_resource("OMFApplication")
ec.set(self.app5, 'appid', 'Kill#2')
ec.set(self.app5, 'path', "/usr/bin/killall")
ec.set(self.app5, 'args', "vlc")
ec.set(self.app5, 'env', " ")
- ec.set(self.app5, 'xmppSlice', "nepi")
- ec.set(self.app5, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.app5, 'xmppPort', "5222")
- ec.set(self.app5, 'xmppPassword', "1234")
ec.register_connection(self.app1, self.node1)
ec.register_connection(self.app2, self.node1)
ec.wait_finished([self.app1, self.app2, self.app3,self.app4, self.app5])
- time.sleep(2)
+ time.sleep(1)
self.assertEquals(round(tdiffsec(ec.get_resource(self.app2).start_time, ec.get_resource(self.app1).start_time),0), 3.0)
self.assertEquals(round(tdiffsec(ec.get_resource(self.app3).start_time, ec.get_resource(self.app2).start_time),0), 2.0)
self.assertEquals(ec.get_resource(self.app5).state, ResourceState.FINISHED)
ec.shutdown()
- time.sleep(2)
+ time.sleep(1)
self.assertEquals(ec.get_resource(self.node1).state, ResourceState.RELEASED)
self.assertEquals(ec.get_resource(self.iface1).state, ResourceState.RELEASED)
ec.set(self.iface1, 'type', "g")
ec.set(self.iface1, 'essid', "vlcexp")
ec.set(self.iface1, 'ip', "10.0.0.17")
- ec.set(self.iface1, 'xmppSlice', "nepi")
- ec.set(self.iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.iface1, 'xmppPort', "5222")
- ec.set(self.iface1, 'xmppPassword', "1234")
self.iface2 = ec.register_resource("OMFWifiInterface")
ec.set(self.app1, 'path', "/opt/vlc-1.1.13/cvlc")
ec.set(self.app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
ec.set(self.app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
- ec.set(self.app1, 'xmppSlice', "nepi")
- ec.set(self.app1, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.app1, 'xmppPort', "5222")
- ec.set(self.app1, 'xmppPassword', "1234")
self.app2 = ec.register_resource("OMFApplication")
- ec.set(self.app2, 'xmppSlice', "nepi")
- ec.set(self.app2, 'xmppHost', "xmpp-plexus.onelab.eu")
- ec.set(self.app2, 'xmppPort', "5222")
- ec.set(self.app2, 'xmppPassword', "1234")
self.app3 = ec.register_resource("OMFApplication")
ec.set(self.app3, 'appid', 'Kill#2')
ec.set(self.app3, 'args', "vlc")
ec.set(self.app3, 'env', " ")
- self.app4 = ec.register_resource("OMFApplication")
-
ec.register_connection(self.app1, self.node1)
ec.register_connection(self.app2, self.node1)
ec.register_connection(self.app3, self.node1)
- ec.register_connection(self.app4, self.node1)
ec.register_connection(self.node1, self.iface1)
ec.register_connection(self.iface1, self.channel)
ec.register_connection(self.node2, self.iface2)
ec.register_condition(self.app2, ResourceAction.START, self.app1, ResourceState.STARTED , "2s")
ec.register_condition(self.app3, ResourceAction.START, self.app2, ResourceState.STARTED , "2s")
- ec.register_condition(self.app4, ResourceAction.START, [self.app3, self.app2], ResourceState.STARTED , "2s")
- ec.register_condition([self.app1, self.app2, self.app3], ResourceAction.STOP, self.app1, ResourceState.STARTED , "8s")
+ ec.register_condition([self.app1, self.app2, self.app3], ResourceAction.STOP, self.app1, ResourceState.STARTED , "6s")
ec.deploy()
- ec.wait_finished([self.app1, self.app2, self.app3,self.app4])
+ ec.wait_finished([self.app1, self.app2, self.app3])
- self.assertEquals(ec.get_resource(self.node1).state, ResourceState.STARTED)
- self.assertEquals(ec.get_resource(self.node2).state, ResourceState.FAILED)
- self.assertEquals(ec.get_resource(self.iface1).state, ResourceState.STARTED)
- self.assertEquals(ec.get_resource(self.iface2).state, ResourceState.FAILED)
- self.assertEquals(ec.get_resource(self.channel).state, ResourceState.STARTED)
- self.assertEquals(ec.get_resource(self.app1).state, ResourceState.FINISHED)
- self.assertEquals(ec.get_resource(self.app2).state, ResourceState.FAILED)
- self.assertEquals(ec.get_resource(self.app3).state, ResourceState.FAILED)
- self.assertEquals(ec.get_resource(self.app4).state, ResourceState.FAILED)
+# self.assertEquals(ec.get_resource(self.node1).state, ResourceState.STARTED)
+# self.assertEquals(ec.get_resource(self.node2).state, ResourceState.FAILED)
+# self.assertEquals(ec.get_resource(self.iface1).state, ResourceState.STARTED)
+# self.assertEquals(ec.get_resource(self.iface2).state, ResourceState.FAILED)
+# self.assertEquals(ec.get_resource(self.channel).state, ResourceState.STARTED)
+# self.assertEquals(ec.get_resource(self.app1).state, ResourceState.FINISHED)
+# self.assertEquals(ec.get_resource(self.app2).state, ResourceState.FAILED)
+# self.assertEquals(ec.get_resource(self.app3).state, ResourceState.FINISHED)
time.sleep(1)
self.assertEquals(ec.get_resource(self.app1).state, ResourceState.RELEASED)
self.assertEquals(ec.get_resource(self.app2).state, ResourceState.RELEASED)
self.assertEquals(ec.get_resource(self.app3).state, ResourceState.RELEASED)
- self.assertEquals(ec.get_resource(self.app4).state, ResourceState.RELEASED)
if __name__ == '__main__':
unittest.main()
ec.set(tap2, "prefix4", 24)
ec.register_connection(tap2, node4)
- ovstun1 = ec.register_resource("Tunnel")
+ ovstun1 = ec.register_resource("OVSTunnel")
ec.register_connection(port1, ovstun1)
ec.register_connection(tap1, ovstun1)
- ovstun2 = ec.register_resource("Tunnel")
+ ovstun2 = ec.register_resource("OVSTunnel")
ec.register_connection(port3, ovstun2)
ec.register_connection(tap2, ovstun2)
- ovstun3 = ec.register_resource("Tunnel")
+ ovstun3 = ec.register_resource("OVSTunnel")
ec.register_connection(port2, ovstun3)
ec.register_connection(port4, ovstun3)
--- /dev/null
+#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+
+from nepi.util.parallel import ParallelRun
+
+import datetime
+import unittest
+
+class ParallelRunTestCase(unittest.TestCase):
+ # Unit tests for nepi.util.parallel.ParallelRun, a worker-thread pool
+ # (maxthreads workers) fed via put(callable, *args).
+ # NOTE: Python 2 code (xrange, assertEquals).
+ def test_run_simple(self):
+ # All 10 queued tasks must have executed (each incrementing the shared
+ # one-element list) by the time destroy() returns.
+ runner = ParallelRun(maxthreads = 4)
+ runner.start()
+
+ count = [0]
+
+ # Mutates the shared list so the effect survives the closure boundary.
+ def inc(count):
+ count[0] += 1
+
+ for x in xrange(10):
+ runner.put(inc, count)
+
+ # destroy() is expected to drain the queue before tearing workers down
+ # (implied by the assertion below) -- confirm against ParallelRun.
+ runner.destroy()
+
+ self.assertEquals(count[0], 10)
+
+ def test_run_interrupt(self):
+ # Queue 100 five-second sleeps, then empty() + destroy(): presumably
+ # empty() discards not-yet-started tasks so shutdown is quick. The
+ # 500 s bound only guards against the fully-serial worst case
+ # (100 * 5 s); it does not prove tasks were actually dropped.
+ def sleep():
+ import time
+ time.sleep(5)
+
+ startt = datetime.datetime.now()
+
+ runner = ParallelRun(maxthreads = 4)
+ runner.start()
+
+ for x in xrange(100):
+ runner.put(sleep)
+
+ runner.empty()
+ runner.destroy()
+
+ endt = datetime.datetime.now()
+ # .seconds of a timedelta, not total_seconds() -- fine here since the
+ # run is far shorter than a day.
+ time_elapsed = (endt - startt).seconds
+ self.assertTrue( time_elapsed < 500)
+
+ def test_run_error(self):
+ # A task that raises must not prevent the other queued tasks from
+ # running; the error should surface later through sync().
+ count = [0]
+
+ def inc(count):
+ count[0] += 1
+
+ def error():
+ raise RuntimeError()
+
+ runner = ParallelRun(maxthreads = 4)
+ runner.start()
+
+ for x in xrange(4):
+ runner.put(inc, count)
+
+ runner.put(error)
+
+ runner.destroy()
+
+ self.assertEquals(count[0], 4)
+
+ # NOTE(review): assumes sync() re-raises an exception captured in a
+ # worker thread, even after destroy() -- confirm against ParallelRun.
+ self.assertRaises(RuntimeError, runner.sync)
+
+if __name__ == '__main__':
+ unittest.main()
+