"""
return self.wait(guids, states = [ResourceState.STARTED,
ResourceState.STOPPED,
+ ResourceState.FAILED,
ResourceState.FINISHED])
def wait(self, guids, states = [ResourceState.FINISHED,
- ResourceState.STOPPED]):
+ ResourceState.FAILED,
+ ResourceState.STOPPED]):
""" Blocking method that waits until all the RM from the 'guid' list
reached state 'state' or until a failure occurs
if isinstance(guids, int):
guids = [guids]
- while not all([self.state(guid) in states for guid in guids]) and \
- not any([self.state(guid) in [
- ResourceState.FAILED] for guid in guids]) and \
- not self.finished:
- # debug logging
- waited = ""
- for guid in guids:
- waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
- self.logger.debug(" WAITING FOR %s " % waited )
-
- # We keep the sleep big to decrease the number of RM state queries
- time.sleep(2)
-
+ # we randomly alter the order of the guids to avoid ordering
+ # dependencies (e.g. LinuxApplication RMs runing on the same
+ # linux host will be synchronized by the LinuxNode SSH lock)
+ random.shuffle(guids)
+
+ while True:
+ # If no more guids to wait for or an error occured, then exit
+ if len(guids) == 0 or self.finished:
+ break
+
+ # If a guid reached one of the target states, remove it from list
+ guid = guids[0]
+ state = self.state(guid)
+
+ if state in states:
+ guids.remove(guid)
+ else:
+ # Debug...
+ self.logger.debug(" WAITING FOR %g - state %s " % (guid,
+ self.state(guid, hr = True)))
+
+ # Take the opportunity to 'refresh' the states of the RMs.
+ # Query only the first up to N guids (not to overwhelm
+ # the local machine)
+ n = 100
+ lim = n if len(guids) > n else ( len(guids) -1 )
+ nguids = guids[0: lim]
+
+ # schedule state request for all guids (take advantage of
+ # scheduler multi threading).
+ for guid in nguids:
+ callback = functools.partial(self.state, guid)
+ self.schedule("0s", callback)
+
+ # If the guid is not in one of the target states, wait and
+ # continue quering. We keep the sleep big to decrease the
+ # number of RM state queries
+ time.sleep(2)
+
def get_task(self, tid):
""" Get a specific task