import pkgutil
import weakref
-reschedule_delay = "0.5s"
+reschedule_delay = "1s"
class ResourceAction:
""" Action that a user can order to a Resource Manager
"""
return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
- def connect(self, guid):
- """ Establishes a connection to the RM identified by guid
+ def register_connection(self, guid):
+ """ Registers a connection to the RM identified by guid
:param guid: Global unique identified of the RM to connect to
:type guid: int
"""
if self.valid_connection(guid):
+ self.connect(guid)
self._connections.add(guid)
- def disconnect(self, guid):
- """ Removes connection to the RM identified by guid
+ def unregister_connection(self, guid):
+ """ Removes a registered connection to the RM identified by guid
:param guid: Global unique identified of the RM to connect to
:type guid: int
"""
if guid in self._connections:
+ self.disconnect(guid)
self._connections.remove(guid)
def discover(self):
"""
pass
- def register_condition(self, action, group, state,
- time = None):
+ def register_condition(self, action, group, state, time = None):
""" Registers a condition on the resource manager to allow execution
of 'action' only after 'time' has elapsed from the moment all resources
in 'group' reached state 'state'
:type time: str
"""
+
+ if not action in self.conditions:
+ self._conditions[action] = list()
+
conditions = self.conditions.get(action)
- if not conditions:
- conditions = list()
- self._conditions[action] = conditions
# For each condition to register a tuple of (group, state, time) is
# added to the 'action' list
conditions.append((group, state, time))
+ def unregister_condition(self, group, action = None):
+ """ Removed conditions for a certain group of guids
+
+ :param action: Action to restrict to condition (either 'START' or 'STOP')
+ :type action: str
+
+ :param group: Group of RMs to wait for (list of guids)
+ :type group: int or list of int
+
+ """
+ # For each condition a tuple of (group, state, time) is
+ # added to the 'action' list
+ if not isinstance(group, list):
+ group = [group]
+
+ for act, conditions in self.conditions.iteritems():
+ if action and act != action:
+ continue
+
+ for condition in list(conditions):
+ (grp, state, time) = condition
+
+ # If there is an intersection between grp and group,
+ # then remove intersected elements
+ intsec = set(group).intersection(set(grp))
+ if intsec:
+ idx = conditions.index(condition)
+ newgrp = set(grp)
+ newgrp.difference_update(intsec)
+ conditions[idx] = (newgrp, state, time)
+
def get_connected(self, rtype = None):
""" Returns the list of RM with the type 'rtype'
for guid in group:
rm = self.ec.get_resource(guid)
# If the RM state is lower than the requested state we must
- # reschedule (e.g. if RM is READY but we required STARTED)
+ # reschedule (e.g. if RM is READY but we required STARTED).
if rm.state < state:
reschedule = True
break
# time still to wait
wait = tdiffsec(stabsformat(time), stabsformat(waited))
- if wait > 0:
+ if wait > 0.001:
reschedule = True
delay = "%fs" % wait
break
self._failed_time = tnow()
self._state = ResourceState.FAILED
+ def connect(self, guid):
+ """ Performs actions that need to be taken upon associating RMs.
+ This method should be redefined when necessary in child classes.
+ """
+ pass
+
+ def disconnect(self, guid):
+ """ Performs actions that need to be taken upon disassociating RMs.
+ This method should be redefined when necessary in child classes.
+ """
+ pass
+
def valid_connection(self, guid):
"""Checks whether a connection with the other RM
is valid.