LinuxApplication: stdin made symlink to file in shared directory
author Alina Quereilhac <alina.quereilhac@inria.fr>
Sat, 6 Jul 2013 13:18:05 +0000 (06:18 -0700)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Sat, 6 Jul 2013 13:18:05 +0000 (06:18 -0700)
src/nepi/execution/ec.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/node.py

index 5ec3411..0a90dec 100644 (file)
@@ -194,10 +194,12 @@ class ExperimentController(object):
         """
         return self.wait(guids, states = [ResourceState.STARTED,
             ResourceState.STOPPED,
+            ResourceState.FAILED,
             ResourceState.FINISHED])
 
     def wait(self, guids, states = [ResourceState.FINISHED, 
-        ResourceState.STOPPED]):
+            ResourceState.FAILED,
+            ResourceState.STOPPED]):
         """ Blocking method that waits until all the RM from the 'guid' list 
             reached state 'state' or until a failure occurs
             
@@ -207,19 +209,45 @@ class ExperimentController(object):
         if isinstance(guids, int):
             guids = [guids]
 
-        while not all([self.state(guid) in states for guid in guids]) and \
-                not any([self.state(guid) in [
-                        ResourceState.FAILED] for guid in guids]) and \
-                not self.finished:
-            # debug logging
-            waited = ""
-            for guid in guids:
-                waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
-            self.logger.debug(" WAITING FOR %s " % waited )
-            
-            # We keep the sleep big to decrease the number of RM state queries
-            time.sleep(2)
-   
+        # we randomly alter the order of the guids to avoid ordering
+        # dependencies (e.g. LinuxApplication RMs running on the same
+        # linux host will be synchronized by the LinuxNode SSH lock)
+        random.shuffle(guids)
+
+        while True:
+            # If no more guids to wait for or an error occurred, then exit
+            if len(guids) == 0 or self.finished:
+                break
+
+            # If a guid reached one of the target states, remove it from list
+            guid = guids[0]
+            state = self.state(guid)
+
+            if state in states:
+                guids.remove(guid)
+            else:
+                # Debug...
+                self.logger.debug(" WAITING FOR %g - state %s " % (guid,
+                    self.state(guid, hr = True)))
+
+                # Take the opportunity to 'refresh' the states of the RMs.
+                # Query only the first up to N guids (not to overwhelm 
+                # the local machine)
+                n = 100
+                lim = n if len(guids) > n else ( len(guids) -1 )
+                nguids = guids[0: lim]
+
+                # schedule state request for all guids (take advantage of
+                # scheduler multi threading).
+                for guid in nguids:
+                    callback = functools.partial(self.state, guid)
+                    self.schedule("0s", callback)
+
+                # If the guid is not in one of the target states, wait and
+                # continue querying. We keep the sleep big to decrease the
+                # number of RM state queries
+                time.sleep(2)
+  
     def get_task(self, tid):
         """ Get a specific task
 
index 506285a..453010e 100644 (file)
@@ -29,12 +29,6 @@ import os
 import subprocess
 
 # TODO: Resolve wildcards in commands!!
-# TODO: During provisioning, everything that is not scp could be
-#       uploaded to a same script, http_sources download, etc...
-#       and like that require performing less ssh connections!!!
-# TODO: Make stdin be a symlink to the original file in ${SHARE}
-#       - later use md5sum to check wether the file needs to be re-upload
-
 
 @clsinit
 class LinuxApplication(ResourceManager):
@@ -430,9 +424,16 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.info("Uploading stdin")
             
-            dst = os.path.join(self.app_home, "stdin")
+            # upload stdin file to ${SHARE_DIR} directory
+            basename = os.path.basename(stdin)
+            dst = os.path.join(self.node.share_dir, basename)
             self.node.upload(stdin, dst, overwrite = False, text = True)
 
+            # create "stdin" symlink on ${APP_HOME} directory
+            command = "( cd %s ; ln -s %s stdin )" % ( self.app_home, dst)
+
+            return command
+
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
index 5080a6c..49f5342 100644 (file)
@@ -754,18 +754,19 @@ class LinuxNode(ResourceManager):
         # wait until command finishes to execute
         self.wait_run(pid, ppid)
       
-        (out, err), proc = self.check_errors(home,
+        (eout, err), proc = self.check_errors(home,
             ecodefile = ecodefile,
-            stdout = stdout,
-            stderr= stderr)
+            stderr = stderr)
 
         # Out is what was written in the stderr file
         if err:
             msg = " Failed to run command '%s' " % command
-            self.error(msg, out, err)
+            self.error(msg, eout, err)
 
             if raise_on_error:
                 raise RuntimeError, msg
+
+        (out, oerr), proc = self.check_output(home, stdout)
         
         return (out, err), proc
 
@@ -833,7 +834,6 @@ class LinuxNode(ResourceManager):
 
     def check_errors(self, home, 
             ecodefile = "exitcode", 
-            stdout = "stdout",
             stderr = "stderr"):
         """
         Checks whether errors occurred while running a command.
@@ -843,8 +843,6 @@ class LinuxNode(ResourceManager):
         """
         proc = None
         err = ""
-        # retrive standard output from the file
-        (out, oerr), oproc = self.check_output(home, stdout)
 
         # get exit code saved in the 'exitcode' file
         ecode = self.exitcode(home, ecodefile)
@@ -862,7 +860,7 @@ class LinuxNode(ResourceManager):
             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
                 err = "" 
             
-        return (out, err), proc
+        return ("", err), proc
  
     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
         """ Waits until the pid file for the command is generated,