# Condition
# Topology behaviour : It should not be done by the user, but ....
-ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2)
-ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1)
-ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
+#ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2)
+#ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1)
+#ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
# User Behaviour
-ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4)
-ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , 20)
-ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , 25)
+ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s")
+ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "20s")
+ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "25s")
# Deploy
ec.deploy()
# Stop Experiment
-time.sleep(45)
+time.sleep(50)
ec.shutdown()
-from neco.util.timefuncs import strfnow, strfdiff, strfvalid
+from neco.util.timefuncs import strfnow, strfdiff, strfvalid
import copy
import functools
_reschedule_delay = "1s"
class ResourceAction:
- START = 0
- STOP = 1
+ DEPLOYED = 0
+ START = 1
+ STOP = 2
class ResourceState:
NEW = 0
_rtype = "Resource"
_filters = None
_attributes = None
+ _waiters = []
@classmethod
def _register_filter(cls, attr):
def rtype(cls):
return cls._rtype
+ @classmethod
+ def waiters(cls):
+ return cls._waiters
+
@classmethod
def get_filters(cls):
""" Returns a copy of the filters
def stop_time(self):
return self._stop_time
+ @property
+ def deploy_time(self):
+ return self._deploy_time
+
@property
def state(self):
return self._state
:param time: time to wait after the state
:type time: str
+ .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
+ If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
+ For the moment, 2m30s is not a correct syntax.
"""
reschedule = False
break
if time:
- if state == ResourceState.STARTED:
+ if state == ResourceState.DEPLOYED:
+ t = rm.deploy_time
+ elif state == ResourceState.STARTED:
t = rm.start_time
elif state == ResourceState.STOPPED:
t = rm.stop_time
# Only keep time information for START and STOP
break
- d = strfdiff(strfnow(), t)
- if d < time:
+ d = strfdiff(strfnow(), t)
+ #print "This is the value of d : " + str(d) + " // With the value of t : " + str(t) + " // With the value of time : " + str(time)
+ wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
+ if wait > 0.001:
reschedule = True
- delay = "%ds" % (int(time - d) +1)
+ delay = "%fs" % wait
break
return reschedule, delay
print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- start condition : " + str(self.conditions.items())
# Need to separate because it could have more that tuple of condition
# for the same action.
- conditions_start = self.conditions.get(ResourceAction.START, []):
+ conditions_start = self.conditions.get(ResourceAction.START, [])
for (group, state, time) in conditions_start:
reschedule, delay = self._needs_reschedule(group, state, time)
if reschedule:
if self.state != ResourceState.STARTED:
reschedule = True
else:
- #print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
+ print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
(Guid : "+ str(self.guid) +") ---- stop condition : " + str(self.conditions.items())
conditions_stop = self.conditions.get(ResourceAction.STOP, [])
for (group, state, time) in conditions_stop:
callback = functools.partial(self.stop_with_conditions)
self.ec.schedule(delay, callback)
else:
+ print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
+------------------------------------------------------------------------------\
+---------------------------------------------------------------- STOPPING -- "
self.stop()
def deploy(self):
"""Execute all the differents steps required to reach the state DEPLOYED
"""
+ self.deploy_restriction()
self.discover()
self.provision()
+ self.deploy_with_conditions()
+
+ def deploy_restriction(self):
+ dep = set()
+ for guid in self.connections:
+ if self.ec.get_resource(guid).rtype() in self.__class__._waiters:
+ dep.add(guid)
+ self.register_condition(ResourceAction.DEPLOYED, dep, ResourceState.DEPLOYED)
+
+
+ def deploy_with_conditions(self):
+ """ Starts when all the conditions are reached
+
+ """
+ reschedule = False
+ delay = _reschedule_delay
+
+ ## evaluate if set conditions are met
+
+ # only can deploy when RM is NEW
+ if not self._state in [ResourceState.NEW]:
+ self.logger.error("Wrong state %s for stop" % self.state)
+ return
+ else:
+ print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- deploy condition : " + str(self.conditions.items())
+ # Need to separate because it could have more that tuple of condition
+ # for the same action.
+ conditions_deployed = self.conditions.get(ResourceAction.DEPLOYED, [])
+ for (group, state, time) in conditions_deployed:
+ reschedule, delay = self._needs_reschedule(group, state, time)
+ if reschedule:
+ break
+
+ if reschedule:
+ callback = functools.partial(self.deploy_with_conditions)
+ self.ec.schedule(delay, callback)
+ else:
+ print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
+------------------------------------------------------------------------------\
+---------------------------------------------------------------- DEPLOY -- "
+ self.deploy_action()
+
+ def deploy_action(self):
+
+ self._deploy_time = strfnow()
self._state = ResourceState.DEPLOYED
+
def release(self):
"""Clean the resource at the end of the Experiment and change the status
"""
_rtype = "OMFApplication"
_authorized_connections = ["OMFNode"]
+ _waiters = ["OMFNode", "OMFChannel", "OMFWifiInterface"]
@classmethod
def _register_attributes(cls):
return rm
return None
- def deploy(self):
+ def deploy_action(self):
"""Deploy the RM
"""
- super(OMFApplication, self).deploy()
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ super(OMFApplication, self).deploy_action()
def start(self):
"""Send Xmpp Message Using OMF protocol to execute the application
"""
_rtype = "OMFChannel"
- _authorized_connections = ["OMFWifiInterface"]
+ _authorized_connections = ["OMFWifiInterface", "OMFNode"]
+ _waiters = ["OMFNode", "OMFWifiInterface"]
+
@classmethod
def _register_attributes(cls):
"""
for elt in conn_set:
rm_iface = self.ec.get_resource(elt)
- for conn in rm_iface._connections:
+ for conn in rm_iface.connections:
rm_node = self.ec.get_resource(conn)
if rm_node.rtype() == "OMFNode":
couple = [rm_node.get('hostname'), rm_iface.get('alias')]
self._nodes_guid.append(couple)
return self._nodes_guid
- def deploy(self):
+ def deploy_action(self):
"""Deploy the RM
"""
- super(OMFChannel, self).deploy()
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ if self.get('channel'):
+ set_nodes = self._get_target(self._connections)
+ print set_nodes
+ for couple in set_nodes:
+ #print "Couple node/alias : " + couple[0] + " , " + couple[1]
+ attrval = self.get('channel')
+ attrname = "net/%s/%s" % (couple[1], 'channel')
+ #print "Send the configure message"
+ self._omf_api.configure(couple[0], attrname, attrval)
+
+ super(OMFChannel, self).deploy_action()
+
def discover(self):
""" Discover the availables channels
"""Send Xmpp Message Using OMF protocol to configure Channel
"""
- if self.get('channel'):
- set_nodes = self._get_target(self._connections)
- print set_nodes
- for couple in set_nodes:
- #print "Couple node/alias : " + couple[0] + " , " + couple[1]
- attrval = self.get('channel')
- attrname = "net/%s/%s" % (couple[1], 'channel')
- #print "Send the configure message"
- self._omf_api.configure(couple[0], attrname, attrval)
+
super(OMFChannel, self).start()
def stop(self):
"""
_rtype = "OMFWifiInterface"
_authorized_connections = ["OMFNode" , "OMFChannel"]
+ _waiters = ["OMFNode"]
#alias2name = dict({'w0':'wlan0', 'w1':'wlan1'})
return rm
return None
- def deploy(self):
+ def deploy_action(self):
"""Deploy the RM
"""
- super(OMFWifiInterface, self).deploy()
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
-
- def start(self):
- """Send Xmpp Messages Using OMF protocol to configure Interface
-
- """
self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " +
self.get('mode') + " : " + self.get('type') + " : " +
self.get('essid') + " : " + self.get('ip'))
attrname = "net/%s/%s" % (self._alias, attrname)
#print "Send the configure message"
self._omf_api.configure(rm_node.get('hostname'), attrname, attrval)
+
+ super(OMFWifiInterface, self).deploy_action()
+
+
+ def start(self):
+ """Send Xmpp Messages Using OMF protocol to configure Interface
+
+ """
+
super(OMFWifiInterface, self).start()
def stop(self):
import neco
import logging
+import time
@clsinit
class OMFNode(ResourceManager):
"""
_rtype = "OMFNode"
_authorized_connections = ["OMFApplication" , "OMFWifiInterface"]
+ _waiters = []
@classmethod
def _register_attributes(cls):
(self.rtype(), self._guid, rm.rtype(), guid))
return False
- def deploy(self):
+ def deploy_action(self):
"""Deploy the RM
"""
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
- super(OMFNode, self).deploy()
+ self._omf_api.enroll_host(self.get('hostname'))
+
+ super(OMFNode, self).deploy_action()
def discover(self):
""" Discover the availables nodes
"""
super(OMFNode, self).start()
- self._omf_api.enroll_host(self.get('hostname'))
+
def stop(self):
"""Send Xmpp Message Using OMF protocol to disconnect the node
return (ret or 0.001)
def strfvalid(date):
- """ User defined date to scheduler date """
+ """ User defined date to scheduler date
+
+ :param date : user define date matchin the pattern _strf
+ :type date : date
+
+ """
if not date:
return strfnow()
if _reabs.match(date):
from neco.resources.omf.omf_api import OMFAPIFactory
from neco.util import guid
+from neco.util.timefuncs import *
import time
import unittest
self.ec.register_connection(node1, iface1)
self.ec.register_connection(iface1, channel)
- # For the moment
- self.ec.register_condition([iface1, channel], ResourceAction.START, node1, ResourceState.STARTED , 2)
- self.ec.register_condition(channel, ResourceAction.START, iface1, ResourceState.STARTED , 1)
- self.ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
+ self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s")
- # Real test
- self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4)
+ self.assertEquals(len(self.ec.get_resource(app2).conditions), 1)
- self.assertEquals(len(self.ec.get_resource(node1).conditions), 0)
- self.assertEquals(len(self.ec.get_resource(iface1).conditions), 1)
- self.assertEquals(len(self.ec.get_resource(channel).conditions), 1)
- self.assertEquals(len(self.ec.get_resource(app1).conditions), 1)
+ def test_deploy(self):
+ node1 = self.ec.register_resource("OMFNode")
+ self.ec.set(node1, 'hostname', 'omf.plexus.wlab17')
+ self.ec.set(node1, 'xmppSlice', "nepi")
+ self.ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(node1, 'xmppPort', "5222")
+ self.ec.set(node1, 'xmppPassword', "1234")
+
+ iface1 = self.ec.register_resource("OMFWifiInterface")
+ self.ec.set(iface1, 'alias', "w0")
+ self.ec.set(iface1, 'mode', "adhoc")
+ self.ec.set(iface1, 'type', "g")
+ self.ec.set(iface1, 'essid', "vlcexp")
+ self.ec.set(iface1, 'ip', "10.0.0.17")
+ self.ec.set(iface1, 'xmppSlice', "nepi")
+ self.ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(iface1, 'xmppPort', "5222")
+ self.ec.set(iface1, 'xmppPassword', "1234")
+
+ channel = self.ec.register_resource("OMFChannel")
+ self.ec.set(channel, 'channel', "6")
+ self.ec.set(channel, 'xmppSlice', "nepi")
+ self.ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(channel, 'xmppPort', "5222")
+ self.ec.set(channel, 'xmppPassword', "1234")
+
+ app1 = self.ec.register_resource("OMFApplication")
+ self.ec.set(app1, 'xmppSlice', "nepi")
+ self.ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(app1, 'xmppPort', "5222")
+ self.ec.set(app1, 'xmppPassword', "1234")
+
+ app2 = self.ec.register_resource("OMFApplication")
+ self.ec.set(app2, 'xmppSlice', "nepi")
+ self.ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(app2, 'xmppPort', "5222")
+ self.ec.set(app2, 'xmppPassword', "1234")
+
+ app3 = self.ec.register_resource("OMFApplication")
+ self.ec.set(app3, 'xmppSlice', "nepi")
+ self.ec.set(app3, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(app3, 'xmppPort', "5222")
+ self.ec.set(app3, 'xmppPassword', "1234")
+
+ app4 = self.ec.register_resource("OMFApplication")
+ self.ec.set(app4, 'xmppSlice', "nepi")
+ self.ec.set(app4, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(app4, 'xmppPort', "5222")
+ self.ec.set(app4, 'xmppPassword', "1234")
+
+ app5 = self.ec.register_resource("OMFApplication")
+ self.ec.set(app5, 'xmppSlice', "nepi")
+ self.ec.set(app5, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(app5, 'xmppPort', "5222")
+ self.ec.set(app5, 'xmppPassword', "1234")
+
+ self.ec.register_connection(app1, node1)
+ self.ec.register_connection(app2, node1)
+ self.ec.register_connection(app3, node1)
+ self.ec.register_connection(app4, node1)
+ self.ec.register_connection(app5, node1)
+ self.ec.register_connection(node1, iface1)
+ self.ec.register_connection(iface1, channel)
+
+ self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "3s")
+ self.ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "2s")
+ self.ec.register_condition(app4, ResourceAction.START, app3, ResourceState.STARTED , "3s")
+ self.ec.register_condition(app5, ResourceAction.START, [app3, app2], ResourceState.STARTED , "2s")
+ self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "1m20s")
+ self.ec.deploy()
+ time.sleep(150)
- def xtest_deploy(self):
- ec.deploy()
+ self.assertEquals(round(strfdiff(self.ec.get_resource(app2).start_time, self.ec.get_resource(app1).start_time),1), 3.0)
+ self.assertEquals(round(strfdiff(self.ec.get_resource(app3).start_time, self.ec.get_resource(app2).start_time),1), 2.0)
+ self.assertEquals(round(strfdiff(self.ec.get_resource(app4).start_time, self.ec.get_resource(app3).start_time),1), 3.0)
+ self.assertEquals(round(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app3).start_time),1), 2.0)
+ self.assertEquals(round(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app1).start_time),1), 7.0)
+ # Precision is at 1/10. So this one returns an error 7.03 != 7.0
+ #self.assertEquals(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app1).start_time), 7)
#In order to release everythings
- time.sleep(45)
- ec.shutdown()
+ time.sleep(5)
if __name__ == '__main__':