"""
return self.wait(guids, states = [ResourceState.STARTED,
ResourceState.STOPPED,
+ ResourceState.FAILED,
ResourceState.FINISHED])
def wait(self, guids, states = [ResourceState.FINISHED,
- ResourceState.STOPPED]):
+ ResourceState.FAILED,
+ ResourceState.STOPPED]):
""" Blocking method that waits until all the RM from the 'guid' list
reached state 'state' or until a failure occurs
if isinstance(guids, int):
guids = [guids]
- while not all([self.state(guid) in states for guid in guids]) and \
- not any([self.state(guid) in [
- ResourceState.FAILED] for guid in guids]) and \
- not self.finished:
- # debug logging
- waited = ""
- for guid in guids:
- waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
- self.logger.debug(" WAITING FOR %s " % waited )
-
- # We keep the sleep big to decrease the number of RM state queries
- time.sleep(2)
-
+ # we randomly alter the order of the guids to avoid ordering
+ # dependencies (e.g. LinuxApplication RMs runing on the same
+ # linux host will be synchronized by the LinuxNode SSH lock)
+ random.shuffle(guids)
+
+ while True:
+ # If no more guids to wait for or an error occured, then exit
+ if len(guids) == 0 or self.finished:
+ break
+
+ # If a guid reached one of the target states, remove it from list
+ guid = guids[0]
+ state = self.state(guid)
+
+ if state in states:
+ guids.remove(guid)
+ else:
+ # Debug...
+ self.logger.debug(" WAITING FOR %g - state %s " % (guid,
+ self.state(guid, hr = True)))
+
+ # Take the opportunity to 'refresh' the states of the RMs.
+ # Query only the first up to N guids (not to overwhelm
+ # the local machine)
+ n = 100
+ lim = n if len(guids) > n else ( len(guids) -1 )
+ nguids = guids[0: lim]
+
+ # schedule state request for all guids (take advantage of
+ # scheduler multi threading).
+ for guid in nguids:
+ callback = functools.partial(self.state, guid)
+ self.schedule("0s", callback)
+
+ # If the guid is not in one of the target states, wait and
+ # continue quering. We keep the sleep big to decrease the
+ # number of RM state queries
+ time.sleep(2)
+
def get_task(self, tid):
""" Get a specific task
import subprocess
# TODO: Resolve wildcards in commands!!
-# TODO: During provisioning, everything that is not scp could be
-# uploaded to a same script, http_sources download, etc...
-# and like that require performing less ssh connections!!!
-# TODO: Make stdin be a symlink to the original file in ${SHARE}
-# - later use md5sum to check wether the file needs to be re-upload
-
@clsinit
class LinuxApplication(ResourceManager):
# create dir for sources
self.info("Uploading stdin")
- dst = os.path.join(self.app_home, "stdin")
+ # upload stdin file to ${SHARE_DIR} directory
+ basename = os.path.basename(stdin)
+ dst = os.path.join(self.node.share_dir, basename)
self.node.upload(stdin, dst, overwrite = False, text = True)
+ # create "stdin" symlink on ${APP_HOME} directory
+ command = "( cd %s ; ln -s %s stdin )" % ( self.app_home, dst)
+
+ return command
+
def install_dependencies(self):
depends = self.get("depends")
if depends:
# wait until command finishes to execute
self.wait_run(pid, ppid)
- (out, err), proc = self.check_errors(home,
+ (eout, err), proc = self.check_errors(home,
ecodefile = ecodefile,
- stdout = stdout,
- stderr= stderr)
+ stderr = stderr)
# Out is what was written in the stderr file
if err:
msg = " Failed to run command '%s' " % command
- self.error(msg, out, err)
+ self.error(msg, eout, err)
if raise_on_error:
raise RuntimeError, msg
+
+ (out, oerr), proc = self.check_output(home, stdout)
return (out, err), proc
def check_errors(self, home,
ecodefile = "exitcode",
- stdout = "stdout",
stderr = "stderr"):
"""
Checks whether errors occurred while running a command.
"""
proc = None
err = ""
- # retrive standard output from the file
- (out, oerr), oproc = self.check_output(home, stdout)
# get exit code saved in the 'exitcode' file
ecode = self.exitcode(home, ecodefile)
if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
err = ""
- return (out, err), proc
+ return ("", err), proc
def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
""" Waits until the pid file for the command is generated,