From ea00a3f2149a18ec31e9b292e81cfe3dccc6f849 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sat, 6 Jul 2013 06:18:05 -0700 Subject: [PATCH] LinuxApplication: stdin made symlink to file in shared directory --- src/nepi/execution/ec.py | 56 ++++++++++++++++++------- src/nepi/resources/linux/application.py | 15 +++---- src/nepi/resources/linux/node.py | 14 +++---- 3 files changed, 56 insertions(+), 29 deletions(-) diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 5ec34110..0a90dec7 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -194,10 +194,12 @@ class ExperimentController(object): """ return self.wait(guids, states = [ResourceState.STARTED, ResourceState.STOPPED, + ResourceState.FAILED, ResourceState.FINISHED]) def wait(self, guids, states = [ResourceState.FINISHED, - ResourceState.STOPPED]): + ResourceState.FAILED, + ResourceState.STOPPED]): """ Blocking method that waits until all the RM from the 'guid' list reached state 'state' or until a failure occurs @@ -207,19 +209,45 @@ class ExperimentController(object): if isinstance(guids, int): guids = [guids] - while not all([self.state(guid) in states for guid in guids]) and \ - not any([self.state(guid) in [ - ResourceState.FAILED] for guid in guids]) and \ - not self.finished: - # debug logging - waited = "" - for guid in guids: - waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True)) - self.logger.debug(" WAITING FOR %s " % waited ) - - # We keep the sleep big to decrease the number of RM state queries - time.sleep(2) - + # we randomly alter the order of the guids to avoid ordering + # dependencies (e.g. LinuxApplication RMs runing on the same + # linux host will be synchronized by the LinuxNode SSH lock) + random.shuffle(guids) + + while True: + # If no more guids to wait for or an error occured, then exit + if len(guids) == 0 or self.finished: + break + + # If a guid reached one of the target states, remove it from list + guid = guids[0] + state = self.state(guid) + + if state in states: + guids.remove(guid) + else: + # Debug... + self.logger.debug(" WAITING FOR %g - state %s " % (guid, + self.state(guid, hr = True))) + + # Take the opportunity to 'refresh' the states of the RMs. + # Query only the first up to N guids (not to overwhelm + # the local machine) + n = 100 + lim = n if len(guids) > n else ( len(guids) -1 ) + nguids = guids[0: lim] + + # schedule state request for all guids (take advantage of + # scheduler multi threading). + for guid in nguids: + callback = functools.partial(self.state, guid) + self.schedule("0s", callback) + + # If the guid is not in one of the target states, wait and + # continue quering. We keep the sleep big to decrease the + # number of RM state queries + time.sleep(2) + def get_task(self, tid): """ Get a specific task diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 506285ab..453010ef 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -29,12 +29,6 @@ import os import subprocess # TODO: Resolve wildcards in commands!! -# TODO: During provisioning, everything that is not scp could be -# uploaded to a same script, http_sources download, etc... -# and like that require performing less ssh connections!!! -# TODO: Make stdin be a symlink to the original file in ${SHARE} -# - later use md5sum to check wether the file needs to be re-upload - @clsinit class LinuxApplication(ResourceManager): @@ -430,9 +424,16 @@ class LinuxApplication(ResourceManager): # create dir for sources self.info("Uploading stdin") - dst = os.path.join(self.app_home, "stdin") + # upload stdin file to ${SHARE_DIR} directory + basename = os.path.basename(stdin) + dst = os.path.join(self.node.share_dir, basename) self.node.upload(stdin, dst, overwrite = False, text = True) + # create "stdin" symlink on ${APP_HOME} directory + command = "( cd %s ; ln -s %s stdin )" % ( self.app_home, dst) + + return command + def install_dependencies(self): depends = self.get("depends") if depends: diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 5080a6c4..49f5342e 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -754,18 +754,19 @@ class LinuxNode(ResourceManager): # wait until command finishes to execute self.wait_run(pid, ppid) - (out, err), proc = self.check_errors(home, + (eout, err), proc = self.check_errors(home, ecodefile = ecodefile, - stdout = stdout, - stderr= stderr) + stderr = stderr) # Out is what was written in the stderr file if err: msg = " Failed to run command '%s' " % command - self.error(msg, out, err) + self.error(msg, eout, err) if raise_on_error: raise RuntimeError, msg + + (out, oerr), proc = self.check_output(home, stdout) return (out, err), proc @@ -833,7 +834,6 @@ class LinuxNode(ResourceManager): def check_errors(self, home, ecodefile = "exitcode", - stdout = "stdout", stderr = "stderr"): """ Checks whether errors occurred while running a command. @@ -843,8 +843,6 @@ class LinuxNode(ResourceManager): """ proc = None err = "" - # retrive standard output from the file - (out, oerr), oproc = self.check_output(home, stdout) # get exit code saved in the 'exitcode' file ecode = self.exitcode(home, ecodefile) @@ -862,7 +860,7 @@ class LinuxNode(ResourceManager): if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: err = "" - return (out, err), proc + return ("", err), proc def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False): """ Waits until the pid file for the command is generated, -- 2.43.0