From b1c9e8465031caee7ca146a60be2d31fbe781bbb Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Thu, 23 May 2013 00:45:01 +0200 Subject: [PATCH] Scheduler optimization --- examples/linux/scalability.py | 10 +-- src/nepi/execution/ec.py | 94 +++++++++++-------------- src/nepi/resources/linux/application.py | 2 +- src/nepi/resources/omf/omf_api.py | 2 +- 4 files changed, 48 insertions(+), 60 deletions(-) diff --git a/examples/linux/scalability.py b/examples/linux/scalability.py index b85d25bd..86f6a3f9 100644 --- a/examples/linux/scalability.py +++ b/examples/linux/scalability.py @@ -64,7 +64,7 @@ if __name__ == '__main__': apps = [] hostnames = [ - "planetlab-2.research.netlab.hut.fi", + #"planetlab-2.research.netlab.hut.fi", "planetlab2.willab.fi", "planetlab3.hiit.fi", "planetlab4.hiit.fi", @@ -74,10 +74,10 @@ if __name__ == '__main__': "planetlab-1.ida.liu.se", "planetlab2.s3.kth.se", "planetlab1.sics.se", - "planetlab1.tlm.unavarra.es", - "planetlab2.uc3m.es", - "planetlab1.uc3m.es", - "planetlab2.um.es", + #"planetlab1.tlm.unavarra.es", + #"planetlab2.uc3m.es", + #"planetlab1.uc3m.es", + #"planetlab2.um.es", "planet1.servers.ua.pt", "planetlab2.fct.ualg.pt", "planetlab-1.tagus.ist.utl.pt", diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 42bb1387..d6fcf871 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -17,6 +17,7 @@ """ +import functools import logging import os import random @@ -285,71 +286,59 @@ class ExperimentController(object): """ self.logger.debug(" ------- DEPLOY START ------ ") - stop = [] - - def steps(rm): - try: - rm.deploy() - rm.start_with_conditions() - - # Only if the RM has STOP conditions we - # schedule a stop. Otherwise the RM will stop immediately - if rm.conditions.get(ResourceAction.STOP): - rm.stop_with_conditions() - except: - import traceback - err = traceback.format_exc() - - self._logger.error("Error occurred while deploying resources: %s" % err) - - # stop deployment - stop.append(None) - if not group: group = self.resources # Before starting deployment we disorder the group list with the # purpose of speeding up the whole deployment process. # It is likely that the user inserted in the 'group' list closely - # resources resources one after another (e.g. all applications + # resources one after another (e.g. all applications # connected to the same node can likely appear one after another). # This can originate a slow down in the deployment since the N # threads the parallel runner uses to processes tasks may all # be taken up by the same family of resources waiting for the - # same conditions. - # If we disorder the group list, this problem can be mitigated + # same conditions (e.g. LinuxApplications running on a same + # node share a single lock, so they will tend to be serialized). + # If we disorder the group list, this problem can be mitigated. random.shuffle(group) - threads = [] + def wait_all_and_start(group): + reschedule = False + for guid in group: + rm = self.get_resource(guid) + if rm.state < ResourceState.READY: + reschedule = True + break + + if reschedule: + callback = functools.partial(wait_all_and_start, group) + self.schedule("1s", callback) + else: + # If all resources are read, we schedule the start + for guid in group: + rm = self.get_resource(guid) + self.schedule("0.01s", rm.start_with_conditions) + + if wait_all_ready: + # Schedule the function that will check all resources are + # READY, and only then it will schedule the start. + # This is aimed to reduce the number of tasks looping in the scheduler. + # Intead of having N start tasks, we will have only one + callback = functools.partial(wait_all_and_start, group) + self.schedule("1s", callback) + for guid in group: rm = self.get_resource(guid) + self.schedule("0.001s", rm.deploy) - if wait_all_ready: - towait = list(group) - towait.remove(guid) - self.register_condition(guid, ResourceAction.START, - towait, ResourceState.READY) + if not wait_all_ready: + self.schedule("1s", rm.start_with_conditions) - thread = threading.Thread(target = steps, args = (rm,)) - threads.append(thread) - thread.setDaemon(True) - thread.start() - - while list(threads) and not self.finished and not stop: - thread = threads[0] - # Time out after 5 seconds to check EC not terminated - thread.join(1) - if not thread.is_alive(): - threads.remove(thread) - - if stop: - # stop the scheduler - self._stop_scheduler() - - if self._thread.is_alive(): - self._thread.join() + if rm.conditions.get(ResourceAction.STOP): + # Only if the RM has STOP conditions we + # schedule a stop. Otherwise the RM will stop immediately + self.schedule("2s", rm.stop_with_conditions) - raise RuntimeError, "Error occurred, interrupting deployment " def release(self, group = None): if not group: @@ -369,7 +358,7 @@ class ExperimentController(object): thread.join(5) if not thread.is_alive(): threads.remove(thread) - + def shutdown(self): self.release() @@ -417,7 +406,7 @@ class ExperimentController(object): self._cond.acquire() task = self._scheduler.next() self._cond.release() - + if not task: # It there are not tasks in the tasks queue we need to # wait until a call to schedule wakes us up @@ -440,18 +429,17 @@ class ExperimentController(object): else: # Process tasks in parallel runner.put(self._execute, task) - except: import traceback err = traceback.format_exc() self._logger.error("Error while processing tasks in the EC: %s" % err) self._state = ECState.FAILED - finally: - runner.sync() # Mark EC state as terminated if self.ecstate == ECState.RUNNING: + # Synchronize to get errors if occurred + runner.sync() self._state = ECState.TERMINATED def _execute(self, task): diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 5529f36f..5e0c72b5 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -335,7 +335,7 @@ class LinuxApplication(ResourceManager): else: try: command = self.get("command") or "" - self.info(" Deploying command '%s' " % command) + self.info("Deploying command '%s' " % command) self.discover() self.provision() except: diff --git a/src/nepi/resources/omf/omf_api.py b/src/nepi/resources/omf/omf_api.py index 77746a02..1223ed49 100644 --- a/src/nepi/resources/omf/omf_api.py +++ b/src/nepi/resources/omf/omf_api.py @@ -27,7 +27,7 @@ import nepi import threading from nepi.resources.omf.omf_client import OMFClient -from nepi.resources.omf.omf_messages_5_4 import MessageHandler +from nepi.resources.omf.messages_5_4 import MessageHandler class OMFAPI(object): """ -- 2.43.0