Scheduler optimization
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 22 May 2013 22:45:01 +0000 (00:45 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 22 May 2013 22:45:01 +0000 (00:45 +0200)
examples/linux/scalability.py
src/nepi/execution/ec.py
src/nepi/resources/linux/application.py
src/nepi/resources/omf/omf_api.py

index b85d25b..86f6a3f 100644 (file)
@@ -64,7 +64,7 @@ if __name__ == '__main__':
     apps = []
   
     hostnames = [
-             "planetlab-2.research.netlab.hut.fi",
+             #"planetlab-2.research.netlab.hut.fi",
              "planetlab2.willab.fi",
              "planetlab3.hiit.fi",
              "planetlab4.hiit.fi",
@@ -74,10 +74,10 @@ if __name__ == '__main__':
              "planetlab-1.ida.liu.se",
              "planetlab2.s3.kth.se",
              "planetlab1.sics.se",
-             "planetlab1.tlm.unavarra.es",
-             "planetlab2.uc3m.es",
-             "planetlab1.uc3m.es",
-             "planetlab2.um.es",
+             #"planetlab1.tlm.unavarra.es",
+             #"planetlab2.uc3m.es",
+             #"planetlab1.uc3m.es",
+             #"planetlab2.um.es",
              "planet1.servers.ua.pt",
              "planetlab2.fct.ualg.pt",
              "planetlab-1.tagus.ist.utl.pt",
index 42bb138..d6fcf87 100644 (file)
@@ -17,6 +17,7 @@
 
 """
 
+import functools
 import logging
 import os
 import random
@@ -285,71 +286,59 @@ class ExperimentController(object):
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
-        stop = []
-
-        def steps(rm):
-            try:
-                rm.deploy()
-                rm.start_with_conditions()
-
-                # Only if the RM has STOP conditions we
-                # schedule a stop. Otherwise the RM will stop immediately
-                if rm.conditions.get(ResourceAction.STOP):
-                    rm.stop_with_conditions()
-            except:
-                import traceback
-                err = traceback.format_exc()
-                
-                self._logger.error("Error occurred while deploying resources: %s" % err)
-
-                # stop deployment
-                stop.append(None)
-
         if not group:
             group = self.resources
 
         # Before starting deployment we disorder the group list with the
         # purpose of speeding up the whole deployment process.
         # It is likely that the user inserted in the 'group' list closely
-        # resources resources one after another (e.g. all applications
+        # resources one after another (e.g. all applications
         # connected to the same node can likely appear one after another).
         # This can originate a slow down in the deployment since the N 
         # threads the parallel runner uses to processes tasks may all
         # be taken up by the same family of resources waiting for the 
-        # same conditions. 
-        # If we disorder the group list, this problem can be mitigated
+        # same conditions (e.g. LinuxApplications running on a same 
+        # node share a single lock, so they will tend to be serialized).
+        # If we disorder the group list, this problem can be mitigated.
         random.shuffle(group)
 
-        threads = []
+        def wait_all_and_start(group):
+            reschedule = False
+            for guid in group:
+                rm = self.get_resource(guid)
+                if rm.state < ResourceState.READY:
+                    reschedule = True
+                    break
+
+            if reschedule:
+                callback = functools.partial(wait_all_and_start, group)
+                self.schedule("1s", callback)
+            else:
+                # If all resources are read, we schedule the start
+                for guid in group:
+                    rm = self.get_resource(guid)
+                    self.schedule("0.01s", rm.start_with_conditions)
+
+        if wait_all_ready:
+            # Schedule the function that will check all resources are
+            # READY, and only then it will schedule the start.
+            # This is aimed to reduce the number of tasks looping in the scheduler.
+            # Intead of having N start tasks, we will have only one
+            callback = functools.partial(wait_all_and_start, group)
+            self.schedule("1s", callback)
+
         for guid in group:
             rm = self.get_resource(guid)
+            self.schedule("0.001s", rm.deploy)
 
-            if wait_all_ready:
-                towait = list(group)
-                towait.remove(guid)
-                self.register_condition(guid, ResourceAction.START, 
-                        towait, ResourceState.READY)
+            if not wait_all_ready:
+                self.schedule("1s", rm.start_with_conditions)
 
-            thread = threading.Thread(target = steps, args = (rm,))
-            threads.append(thread)
-            thread.setDaemon(True)
-            thread.start()
-
-        while list(threads) and not self.finished and not stop:
-            thread = threads[0]
-            # Time out after 5 seconds to check EC not terminated
-            thread.join(1)
-            if not thread.is_alive():
-                threads.remove(thread)
-
-        if stop:
-            # stop the scheduler
-            self._stop_scheduler()
-
-            if self._thread.is_alive():
-               self._thread.join()
+            if rm.conditions.get(ResourceAction.STOP):
+                # Only if the RM has STOP conditions we
+                # schedule a stop. Otherwise the RM will stop immediately
+                self.schedule("2s", rm.stop_with_conditions)
 
-            raise RuntimeError, "Error occurred, interrupting deployment " 
 
     def release(self, group = None):
         if not group:
@@ -369,7 +358,7 @@ class ExperimentController(object):
             thread.join(5)
             if not thread.is_alive():
                 threads.remove(thread)
-
+        
     def shutdown(self):
         self.release()
 
@@ -417,7 +406,7 @@ class ExperimentController(object):
                 self._cond.acquire()
                 task = self._scheduler.next()
                 self._cond.release()
-
+                
                 if not task:
                     # It there are not tasks in the tasks queue we need to 
                     # wait until a call to schedule wakes us up
@@ -440,18 +429,17 @@ class ExperimentController(object):
                     else:
                         # Process tasks in parallel
                         runner.put(self._execute, task)
-                
         except: 
             import traceback
             err = traceback.format_exc()
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-        finally:
-            runner.sync()
    
         # Mark EC state as terminated
         if self.ecstate == ECState.RUNNING:
+            # Synchronize to get errors if occurred
+            runner.sync()
             self._state = ECState.TERMINATED
 
     def _execute(self, task):
index 5529f36..5e0c72b 100644 (file)
@@ -335,7 +335,7 @@ class LinuxApplication(ResourceManager):
         else:
             try:
                 command = self.get("command") or ""
-                self.info(" Deploying command '%s' " % command)
+                self.info("Deploying command '%s' " % command)
                 self.discover()
                 self.provision()
             except:
index 77746a0..1223ed4 100644 (file)
@@ -27,7 +27,7 @@ import nepi
 import threading
 
 from nepi.resources.omf.omf_client import OMFClient
-from nepi.resources.omf.omf_messages_5_4 import MessageHandler
+from nepi.resources.omf.messages_5_4 import MessageHandler
 
 class OMFAPI(object):
     """