-
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
import copy
import functools
import logging
import weakref
-import time as TIME
_reschedule_delay = "1s"
class ResourceAction:
- DEPLOYED = 0
+ DEPLOY = 0
START = 1
STOP = 2
class ResourceState:
NEW = 0
- DEPLOYED = 1
- STARTED = 2
- STOPPED = 3
- FAILED = 4
- RELEASED = 5
+ DISCOVERED = 1
+ PROVISIONED = 2
+ READY = 3
+ STARTED = 4
+ STOPPED = 5
+ FAILED = 6
+ RELEASED = 7
def clsinit(cls):
cls._clsinit()
_rtype = "Resource"
_filters = None
_attributes = None
- _waiters = []
@classmethod
def _register_filter(cls, attr):
def rtype(cls):
return cls._rtype
- @classmethod
- def waiters(cls):
- return cls._waiters
-
@classmethod
def get_filters(cls):
""" Returns a copy of the filters
self._start_time = None
self._stop_time = None
+ self._discover_time = None
+ self._provision_time = None
+ self._ready_time = None
+ self._release_time = None
# Logging
- self._logger = logging.getLogger("neco.execution.resource.Resource.%s" %
- self.guid)
+ self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " % (self._rtype, self.guid))
@property
def logger(self):
@property
def start_time(self):
- """ timestamp with """
+ """ Returns timestamp with the time the RM started """
return self._start_time
@property
def stop_time(self):
+ """ Returns timestamp with the time the RM stopped """
return self._stop_time
@property
- def deploy_time(self):
- return self._deploy_time
+ def discover_time(self):
+ """ Returns timestamp with the time the RM passed to state discovered """
+ return self._discover_time
+
+ @property
+ def provision_time(self):
+ """ Returns timestamp with the time the RM passed to state provisioned """
+ return self._provision_time
+
+ @property
+ def ready_time(self):
+ """ Returns timestamp with the time the RM passed to state ready """
+ return self._ready_time
+
+ @property
+ def release_time(self):
+ """ Returns timestamp with the time the RM was released """
+ return self._release_time
@property
def state(self):
return self._state
def connect(self, guid):
- if (self._validate_connection(guid)):
+ if self.valid_connection(guid):
self._connections.add(guid)
def discover(self, filters = None):
- pass
+ self._discover_time = strfnow()
+ self._state = ResourceState.DISCOVERED
def provision(self, filters = None):
- pass
+ self._provision_time = strfnow()
+ self._state = ResourceState.PROVISIONED
def start(self):
""" Start the Resource Manager
"""
- if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
+ if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
self.logger.error("Wrong state %s for start" % self.state)
+ return
self._start_time = strfnow()
self._state = ResourceState.STARTED
"""
if not self._state in [ResourceState.STARTED]:
self.logger.error("Wrong state %s for stop" % self.state)
+ return
self._stop_time = strfnow()
self._state = ResourceState.STOPPED
def register_condition(self, action, group, state,
time = None):
- """ Do the 'action' after 'time' on the current RM when 'group'
- reach the state 'state'
+ """ Registers a condition on the resource manager to allow execution
+ of 'action' only after 'time' has elapsed from the moment all resources
+ in 'group' reached state 'state'
- :param action: Action to do. Either 'START' or 'STOP'
+ :param action: Action to restrict to condition (either 'START' or 'STOP')
:type action: str
- :param group: group of RM
- :type group: str
- :param state: RM that are part of the condition
- :type state: list
- :param time: Time to wait after the state is reached (ex : '2s' )
+ :param group: Group of RMs to wait for (list of guids)
+ :type group: int or list of int
+ :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
+ :type state: str
+ :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
"""
- if action not in self.conditions:
- self._conditions[action] = set()
+ conditions = self.conditions.get(action)
+ if not conditions:
+ conditions = list()
+ self._conditions[action] = conditions
- # We need to use only sequence inside a set and not a list.
- # As group is a list, we need to change it.
- #print (tuple(group), state, time)
- self.conditions.get(action).add((tuple(group), state, time))
+ # For each condition to register a tuple of (group, state, time) is
+ # added to the 'action' list
+ if not isinstance(group, list):
+ group = [group]
+
+ conditions.append((group, state, time))
def _needs_reschedule(self, group, state, time):
""" Internal method that verify if 'time' has elapsed since
all elements in 'group' have reached state 'state'.
- :param group: RM that are part of the condition
- :type group: list
- :param state: State that group need to reach for the condtion
+ :param group: Group of RMs to wait for (list of guids)
+ :type group: int or list of int
+ :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
:type state: str
- :param time: time to wait after the state
+ :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
.. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
# check state and time elapsed on all RMs
for guid in group:
rm = self.ec.get_resource(guid)
- # If the RMs is lower than the requested state we must
- # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
+ # If the RM state is lower than the requested state we must
+ # reschedule (e.g. if RM is READY but we required STARTED)
if rm.state < state:
reschedule = True
break
+ # If there is a time restriction, we must verify the
+ # restriction is satisfied
if time:
- if state == ResourceState.DEPLOYED:
- t = rm.deploy_time
+ if state == ResourceState.DISCOVERED:
+ t = rm.discover_time
+ if state == ResourceState.PROVISIONED:
+ t = rm.provision_time
+ elif state == ResourceState.READY:
+ t = rm.ready_time
elif state == ResourceState.STARTED:
t = rm.start_time
elif state == ResourceState.STOPPED:
break
d = strfdiff(strfnow(), t)
- #print "This is the value of d : " + str(d) + " // With the value of t : " + str(t) + " // With the value of time : " + str(time)
wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
if wait > 0.001:
reschedule = True
def set_with_conditions(self, name, value, group, state, time):
""" Set value 'value' on attribute with name 'name' when 'time'
has elapsed since all elements in 'group' have reached state
- 'state'.
+ 'state'
- :param name: Name of the attribute
+ :param name: Name of the attribute to set
:type name: str
- :param name: Value of the attribute
+ :param name: Value of the attribute to set
:type name: str
- :param group: RM that are part of the condition
- :type group: list
- :param state: State that group need to reach before set
+ :param group: Group of RMs to wait for (list of guids)
+ :type group: int or list of int
+ :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
:type state: str
- :param time: Time to wait after the state is reached (ex : '2s' )
+ :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
"""
self.set(name, value)
def start_with_conditions(self):
- """ Starts when all the conditions are reached
+ """ Starts RM when all the conditions in self.conditions for
+ action 'START' are satisfied.
"""
reschedule = False
## evaluate if set conditions are met
- # only can start when RM is either STOPPED or DEPLOYED
- if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]:
+ # only can start when RM is either STOPPED or READY
+ if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
reschedule = True
else:
- print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- start condition : " + str(self.conditions.items())
- # Need to separate because it could have more that tuple of condition
- # for the same action.
- conditions_start = self.conditions.get(ResourceAction.START, [])
- for (group, state, time) in conditions_start:
+ self.logger.debug("---- START CONDITIONS ---- %s" %
+ self.conditions.get(ResourceAction.START))
+
+ # Verify all start conditions are met
+ start_conditions = self.conditions.get(ResourceAction.START, [])
+ for (group, state, time) in start_conditions:
reschedule, delay = self._needs_reschedule(group, state, time)
if reschedule:
break
if reschedule:
- callback = functools.partial(self.start_with_conditions)
- self.ec.schedule(delay, callback)
+ self.ec.schedule(delay, self.start_with_conditions)
else:
- print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
-------------------------------------------------------------------------------\
----------------------------------------------------------------- STARTING -- "
+ self.logger.debug("----- STARTING ---- ")
self.start()
def stop_with_conditions(self):
- """ Stop when all the conditions are reached
+ """ Stops RM when all the conditions in self.conditions for
+ action 'STOP' are satisfied.
"""
reschedule = False
if self.state != ResourceState.STARTED:
reschedule = True
else:
- print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
- (Guid : "+ str(self.guid) +") ---- stop condition : " + str(self.conditions.items())
- conditions_stop = self.conditions.get(ResourceAction.STOP, [])
- for (group, state, time) in conditions_stop:
+ self.logger.debug(" ---- STOP CONDITIONS ---- %s" %
+ self.conditions.get(ResourceAction.STOP))
+
+ stop_conditions = self.conditions.get(ResourceAction.STOP, [])
+ for (group, state, time) in stop_conditions:
reschedule, delay = self._needs_reschedule(group, state, time)
if reschedule:
break
+
if reschedule:
callback = functools.partial(self.stop_with_conditions)
self.ec.schedule(delay, callback)
else:
- print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
-------------------------------------------------------------------------------\
----------------------------------------------------------------- STOPPING -- "
+ self.logger.debug(" ----- STOPPING ---- ")
self.stop()
def deploy(self):
- """Execute all the differents steps required to reach the state DEPLOYED
+ """ Execute all steps required for the RM to reach the state READY
"""
- self.deploy_restriction()
- self.discover()
- self.provision()
- self.deploy_with_conditions()
-
- def deploy_restriction(self):
- dep = set()
- for guid in self.connections:
- if self.ec.get_resource(guid).rtype() in self.__class__._waiters:
- dep.add(guid)
- self.register_condition(ResourceAction.DEPLOYED, dep, ResourceState.DEPLOYED)
-
-
- def deploy_with_conditions(self):
- """ Starts when all the conditions are reached
-
- """
- reschedule = False
- delay = _reschedule_delay
-
- ## evaluate if set conditions are met
-
- # only can deploy when RM is NEW
- if not self._state in [ResourceState.NEW]:
- self.logger.error("Wrong state %s for stop" % self.state)
+ if self._state > ResourceState.READY:
+ self.logger.error("Wrong state %s for deploy" % self.state)
return
- else:
- print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- deploy condition : " + str(self.conditions.items())
- # Need to separate because it could have more that tuple of condition
- # for the same action.
- conditions_deployed = self.conditions.get(ResourceAction.DEPLOYED, [])
- for (group, state, time) in conditions_deployed:
- reschedule, delay = self._needs_reschedule(group, state, time)
- if reschedule:
- break
-
- if reschedule:
- callback = functools.partial(self.deploy_with_conditions)
- self.ec.schedule(delay, callback)
- else:
- print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
-------------------------------------------------------------------------------\
----------------------------------------------------------------- DEPLOY -- "
- self.deploy_action()
-
- def deploy_action(self):
-
- self._deploy_time = strfnow()
- self._state = ResourceState.DEPLOYED
+ self._ready_time = strfnow()
+ self._state = ResourceState.READY
def release(self):
"""Clean the resource at the end of the Experiment and change the status
"""
+ self._release_time = strfnow()
self._state = ResourceState.RELEASED
- def _validate_connection(self, guid):
+ def valid_connection(self, guid):
"""Check if the connection is available.
:param guid: Guid of the current Resource Manager
#!/usr/bin/env python
-from neco.execution.resource import ResourceManager, ResourceFactory, clsinit
from neco.execution.attribute import Attribute
+from neco.execution.ec import ExperimentController
+from neco.execution.resource import ResourceManager, ResourceState, clsinit
+import time
import unittest
@clsinit
def __init__(self, ec, guid):
super(AnotherResource, self).__init__(ec, guid)
-class EC(object):
- pass
-
class ResourceFactoryTestCase(unittest.TestCase):
def test_add_resource_factory(self):
+ from neco.execution.resource import ResourceFactory
+
ResourceFactory.register_type(MyResource)
ResourceFactory.register_type(AnotherResource)
self.assertEquals(len(ResourceFactory.resource_types()), 2)
-# TODO:!!!
+def get_connected(connections, rtype, ec):
+ connected = []
+ for guid in connections:
+ rm = ec.get_resource(guid)
+ if rm.rtype() == rtype:
+ connected.append(rm)
+ return connected
+
+class Channel(ResourceManager):
+ _rtype = "Channel"
+
+ def __init__(self, ec, guid):
+ super(Channel, self).__init__(ec, guid)
+
+ def deploy(self):
+ time.sleep(1)
+ super(Channel, self).deploy()
+ self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Interface(ResourceManager):
+ _rtype = "Interface"
+
+ def __init__(self, ec, guid):
+ super(Interface, self).__init__(ec, guid)
+
+ def deploy(self):
+ node = get_connected(self.connections, Node.rtype(), self.ec)[0]
+ chan = get_connected(self.connections, Channel.rtype(), self.ec)[0]
+
+ if node.state < ResourceState.PROVISIONED:
+ self.ec.schedule("0.5s", self.deploy)
+ elif chan.state < ResourceState.READY:
+ self.ec.schedule("0.5s", self.deploy)
+ else:
+ time.sleep(2)
+ super(Interface, self).deploy()
+ self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Node(ResourceManager):
+ _rtype = "Node"
+
+ def __init__(self, ec, guid):
+ super(Node, self).__init__(ec, guid)
+
+ def deploy(self):
+ if self.state == ResourceState.NEW:
+ self.discover()
+ self.provision()
+ self.logger.debug(" -------- PROVISIONED ------- ")
+ self.ec.schedule("3s", self.deploy)
+ elif self.state == ResourceState.PROVISIONED:
+ ifaces = get_connected(self.connections, Interface.rtype(), self.ec)
+ for rm in ifaces:
+ if rm.state < ResourceState.READY:
+ self.ec.schedule("0.5s", self.deploy)
+ return
+
+ super(Node, self).deploy()
+ self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Application(ResourceManager):
+ _rtype = "Application"
+
+ def __init__(self, ec, guid):
+ super(Application, self).__init__(ec, guid)
+
+ def deploy(self):
+ node = get_connected(self.connections, Node.rtype(), self.ec)[0]
+ if node.state < ResourceState.READY:
+ self.ec.schedule("0.5s", self.deploy)
+ else:
+ super(Application, self).deploy()
+ self.logger.debug(" -------- DEPLOYED ------- ")
+
class ResourceManagerTestCase(unittest.TestCase):
+ def test_deploy_in_order(self):
+ """
+ Test scenario: 2 applications running one on 1 node each.
+ Nodes are connected to Interfaces which are connected
+ through a channel between them.
+
+ - Application needs to wait until Node is ready to be ready
+ - Node needs to wait until Interface is ready to be ready
+ - Interface needs to wait until Node is provisioned to be ready
+ - Interface needs to wait until Channel is ready to be ready
+ - The channel doesn't wait for any other resource to be ready
+
+ """
+ from neco.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(Application)
+ ResourceFactory.register_type(Node)
+ ResourceFactory.register_type(Interface)
+ ResourceFactory.register_type(Channel)
+
+ ec = ExperimentController()
+
+ app1 = ec.register_resource("Application")
+ app2 = ec.register_resource("Application")
+ node1 = ec.register_resource("Node")
+ node2 = ec.register_resource("Node")
+ iface1 = ec.register_resource("Interface")
+ iface2 = ec.register_resource("Interface")
+ chan = ec.register_resource("Channel")
+
+ ec.register_connection(app1, node1)
+ ec.register_connection(app2, node2)
+ ec.register_connection(iface1, node1)
+ ec.register_connection(iface2, node2)
+ ec.register_connection(iface1, chan)
+ ec.register_connection(iface2, chan)
+
+ try:
+ ec.deploy()
+
+ while not all([ ec.state(guid) == ResourceState.STARTED \
+ for guid in [app1, app2, node1, node2, iface1, iface2, chan]]):
+ time.sleep(0.5)
+
+ finally:
+ ec.shutdown()
+
+ rmapp1 = ec.get_resource(app1)
+ rmapp2 = ec.get_resource(app2)
+ rmnode1 = ec.get_resource(node1)
+ rmnode2 = ec.get_resource(node2)
+ rmiface1 = ec.get_resource(iface1)
+ rmiface2 = ec.get_resource(iface2)
+ rmchan = ec.get_resource(chan)
+
+ ## Validate deploy order
+ # - Application needs to wait until Node is ready to be ready
+ self.assertTrue(rmnode1.ready_time < rmapp1.ready_time)
+ self.assertTrue(rmnode2.ready_time < rmapp2.ready_time)
+
+ # - Node needs to wait until Interface is ready to be ready
+ self.assertTrue(rmnode1.ready_time > rmiface1.ready_time)
+ self.assertTrue(rmnode2.ready_time > rmiface2.ready_time)
+
+ # - Interface needs to wait until Node is provisioned to be ready
+ self.assertTrue(rmnode1.provision_time < rmiface1.ready_time)
+ self.assertTrue(rmnode2.provision_time < rmiface2.ready_time)
+
+ # - Interface needs to wait until Channel is ready to be ready
+ self.assertTrue(rmchan.ready_time < rmiface1.ready_time)
+ self.assertTrue(rmchan.ready_time < rmiface2.ready_time)
+
def test_start_with_condition(self):
+ # TODO!!!
pass
def test_stop_with_condition(self):
+ # TODO!!!
pass
def test_set_with_condition(self):
+ # TODO!!!
pass