#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+from nepi.util.timefuncs import stformat, tsformat
+
from xml.dom import minidom
+import datetime
import sys
import os
def xmlencode(s):
if isinstance(s, str):
rv = s.decode("latin1")
+ if isinstance(s, datetime.datetime):
+ rv = tsformat(s)
elif not isinstance(s, unicode):
rv = unicode(s)
else:
rv = s
return rv.replace(u'\x00',u'�')
-def xmldecode(s):
- return s.replace(u'�',u'\x00').encode("utf8")
+def xmldecode(s, cast = str):
+ ret = s.replace(u'�',u'\x00').encode("ascii")
+ ret = cast(ret)
+ if s == "None":
+ return None
+ return ret
def from_type(value):
- if value == None:
- return str("None")
-
- if isinstance(value, str):
- return STRING
if isinstance(value, bool):
return BOOL
if isinstance(value, int):
if isinstance(value, float):
return DOUBLE
+ return STRING
+
def to_type(type, value):
+ if not value:
+ return value
+
if type == STRING:
- if value == "None":
- return None
return str(value)
if type == BOOL:
return value == "True"
for action, conds in rm._conditions.iteritems():
conditions = True
for (group, state, time) in conds:
- cnnode = doc.createElement("condition")
+ ccnnode = doc.createElement("condition")
ccnnode.setAttribute("action", xmlencode(action))
ccnnode.setAttribute("group", xmlencode(group))
ccnnode.setAttribute("state", xmlencode(state))
ecnode_list = doc.getElementsByTagName("experiment")
for ecnode in ecnode_list:
if ecnode.nodeType == doc.ELEMENT_NODE:
- exp_id = ecnode.getAttribute("exp_id")
- run_id = ecnode.getAttribute("run_id")
- nthreads = int(ecnode.getAttribute("nthreads"))
+ exp_id = xmldecode(ecnode.getAttribute("exp_id"))
+ run_id = xmldecode(ecnode.getAttribute("run_id"))
+ nthreads = xmldecode(ecnode.getAttribute("nthreads"))
- os.environ["NEPI_NTHREADS"] = str(nthreads)
+ os.environ["NEPI_NTHREADS"] = nthreads
ec = ExperimentController(exp_id = exp_id)
connections = set()
release_time = None
failed_time = None
- guid = int(rmnode.getAttribute("guid"))
+ guid = xmldecode(rmnode.getAttribute("guid"), int)
rtype = xmldecode(rmnode.getAttribute("rtype"))
- state = int(rmnode.getAttribute("state"))
+
+ # FOR NOW ONLY STATE NEW IS ALLOWED
+ state = 0
+ """
+ state = xmldecode(rmnode.getAttribute("state"), int)
if rmnode.hasAttribute("start_time"):
- start_time = xmldecode(rmnode.getAttribute("start_time"))
+ start_time = xmldecode(rmnode.getAttribute("start_time"),
+ datetime.datetime)
if rmnode.hasAttribute("stop_time"):
- stop_time = xmldecode(rmnode.getAttribute("stop_time"))
+ stop_time = xmldecode(rmnode.getAttribute("stop_time"),
+ datetime.datetime)
if rmnode.hasAttribute("discover_time"):
- dicover_time = xmldecode(rmnode.getAttribute("discover_time"))
+ dicover_time = xmldecode(rmnode.getAttribute("discover_time"),
+ datetime.datetime)
if rmnode.hasAttribute("provision_time"):
- provision_time = xmldecode(rmnode.getAttribute("provision_time"))
+ provision_time = xmldecode(rmnode.getAttribute("provision_time"),
+ datetime.datetime)
if rmnode.hasAttribute("ready_time"):
- ready_time = xmldecode(rmnode.getAttribute("ready_time"))
+ ready_time = xmldecode(rmnode.getAttribute("ready_time"),
+ datetime.datetime)
if rmnode.hasAttribute("release_time"):
- release_time = xmldecode(rmnode.getAttribute("release_time"))
+ release_time = xmldecode(rmnode.getAttribute("release_time"),
+ datetime.datetime)
if rmnode.hasAttribute("failed_time"):
- failed_time = xmldecode(rmnode.getAttribute("failed_time"))
+ failed_time = xmldecode(rmnode.getAttribute("failed_time"),
+ datetime.datetime)
+ """
ec.register_resource(rtype, guid = guid)
rm = ec.get_resource(guid)
for aanode in aanode_list:
name = xmldecode(aanode.getAttribute("name"))
value = xmldecode(aanode.getAttribute("value"))
- type = xmldecode(aanode.getAttribute("type"))
- value = to_type(type, value)
+ tipe = xmldecode(aanode.getAttribute("type"))
+ value = to_type(tipe, value)
rm.set(name, value)
cnode_list = rmnode.getElementsByTagName("connections")
if cnode_list:
ccnode_list = cnode_list[0].getElementsByTagName("connection")
for ccnode in ccnode_list:
- guid2 = int(ccnode.getAttribute("guid"))
+ guid2 = xmldecode(ccnode.getAttribute("guid"), int)
connections.add((guid, guid2))
tnode_list = rmnode.getElementsByTagName("traces")
if cnnode_list:
ccnnode_list = cnnode_list[0].getElementsByTagName("condition")
for ccnnode in ccnnode_list:
- action = int(ccnnode.getAttribute("action"))
- group = int(ccnnode.getAttribute("group"))
- state = int(ccnnode.getAttribute("state"))
- time = ccnnode.getAttribute("time")
+ action = xmldecode(ccnnode.getAttribute("action"), int)
+ group = xmldecode(ccnnode.getAttribute("group"), eval)
+ state = xmldecode(ccnnode.getAttribute("state"), int)
+ time = xmldecode(ccnnode.getAttribute("time"))
+ time = to_type('STRING', time)
ec.register_condition(guid, action, group, state, time = time)
ec.shutdown()
+ @skipIfNotAlive
+ def t_condition_serialize(self, host, user, depends):
+
+ dirpath = tempfile.mkdtemp()
+
+ ec = ExperimentController(exp_id="test-condition-serial")
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ server = ec.register_resource("LinuxApplication")
+ cmd = "echo 'HOLA' | nc -l 3333"
+ ec.set(server, "command", cmd)
+ ec.set(server, "depends", depends)
+ ec.register_connection(server, node)
+
+ client = ec.register_resource("LinuxApplication")
+ cmd = "nc 127.0.0.1 3333"
+ ec.set(client, "command", cmd)
+ ec.register_connection(client, node)
+
+ ec.register_condition(client, ResourceAction.START, server, ResourceState.STARTED)
+
+ apps = [client, server]
+
+ filepath = ec.save(dirpath)
+
+ ec.deploy()
+
+ ec.wait_finished(apps)
+
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(ec.state(server) == ResourceState.STOPPED)
+ self.assertTrue(ec.state(client) == ResourceState.STOPPED)
+
+ stdout = ec.trace(client, "stdout")
+ self.assertTrue(stdout.strip() == "HOLA")
+
+ ec.shutdown()
+
+ # Load serialized experiment
+ ec2 = ExperimentController.load(filepath)
+
+ ec2.deploy()
+ ec2.wait_finished(apps)
+
+ self.assertEquals(len(ec.resources), len(ec2.resources))
+
+ self.assertTrue(ec2.state(node) == ResourceState.STARTED)
+ self.assertTrue(ec2.state(server) == ResourceState.STOPPED)
+ self.assertTrue(ec2.state(client) == ResourceState.STOPPED)
+
+ stdout = ec2.trace(client, "stdout")
+
+ self.assertTrue(stdout.strip() == "HOLA")
+
+ ec2.shutdown()
+
+ shutil.rmtree(dirpath)
+
@skipIfNotAlive
def t_http_sources(self, host, user):
ec.register_connection(app, node)
+
ec.deploy()
ec.wait_finished([app])
def test_condition_ubuntu(self):
self.t_condition(self.ubuntu_host, self.ubuntu_user, "netcat")
+ def test_condition_serialize_fedora(self):
+ self.t_condition_serialize(self.fedora_host, self.fedora_user, "nc")
+
+ def test_condition_serialize_ubuntu(self):
+ self.t_condition_serialize(self.ubuntu_host, self.ubuntu_user, "netcat")
+
def test_http_sources_fedora(self):
self.t_http_sources(self.fedora_host, self.fedora_user)