- while not all([self.state(guid) in states for guid in guids]) and \
- not any([self.state(guid) in [
- ResourceState.FAILED] for guid in guids]) and \
- not self.finished:
- # debug logging
- waited = ""
- for guid in guids:
- waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
- self.logger.debug(" WAITING FOR %s " % waited )
-
- # We keep the sleep big to decrease the number of RM state queries
- time.sleep(2)
-
+ # we randomly alter the order of the guids to avoid ordering
+ # dependencies (e.g. LinuxApplication RMs runing on the same
+ # linux host will be synchronized by the LinuxNode SSH lock)
+ random.shuffle(guids)
+
+ while True:
+ # If no more guids to wait for or an error occured, then exit
+ if len(guids) == 0 or self.finished:
+ break
+
+ # If a guid reached one of the target states, remove it from list
+ guid = guids[0]
+ state = self.state(guid)
+
+ if state in states:
+ guids.remove(guid)
+ else:
+ # Debug...
+ self.logger.debug(" WAITING FOR %g - state %s " % (guid,
+ self.state(guid, hr = True)))
+
+ # Take the opportunity to 'refresh' the states of the RMs.
+ # Query only the first up to N guids (not to overwhelm
+ # the local machine)
+ n = 100
+ lim = n if len(guids) > n else ( len(guids) -1 )
+ nguids = guids[0: lim]
+
+ # schedule state request for all guids (take advantage of
+ # scheduler multi threading).
+ for guid in nguids:
+ callback = functools.partial(self.state, guid)
+ self.schedule("0s", callback)
+
+ # If the guid is not in one of the target states, wait and
+ # continue quering. We keep the sleep big to decrease the
+ # number of RM state queries
+ time.sleep(2)
+