From f3deba9f453d3911a1b7f3cb653d19be5be2982e Mon Sep 17 00:00:00 2001 From: Sapan Bhatia Date: Wed, 3 Sep 2014 11:29:22 -0400 Subject: [PATCH] Bug fixes to parallelization --- planetstack/ec2_observer/event_loop.py | 38 +++++++++++++++++--------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/planetstack/ec2_observer/event_loop.py b/planetstack/ec2_observer/event_loop.py index 88725db..77ae263 100644 --- a/planetstack/ec2_observer/event_loop.py +++ b/planetstack/ec2_observer/event_loop.py @@ -241,20 +241,31 @@ class PlanetStackObserver: step = self.step_lookup[S] start_time=time.time() + dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph + # Wait for step dependencies to be met - deps = self.dependency_graph[S] - for d in deps: - cond = self.step_conditions[d] - acquire(cond) - if (self.step_status[S] is STEP_STATUS_WORKING): - cond.wait() - cond.release() - - if (self.step_status[S] is not STEP_STATUS_OK): + try: + deps = self.dependency_graph[S] + has_deps = True + except KeyError: + has_deps = False + + if (has_deps): + for d in deps: + cond = self.step_conditions[d] + cond.acquire() + if (self.step_status[d] is STEP_STATUS_WORKING): + cond.wait() + cond.release() + go = self.step_status[d] == STEP_STATUS_OK + else: + go = True + + if (not go): self.failed_steps.append(sync_step) my_status = STEP_STATUS_KO else: - sync_step = step(driver=self.driver,error_map=error_mapper) + sync_step = step(driver=self.driver,error_map=self.error_mapper) sync_step.__name__ = step.__name__ sync_step.dependencies = [] try: @@ -361,10 +372,11 @@ class PlanetStackObserver: logger.info('Deletion=%r...'%deletion) schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps) - thread = threading.Thread(target=self.sync, args=(schedule.start_conditions, schedule.ordered_steps,deletion, schedule.signal_sem)) + for S in schedule: + thread = threading.Thread(target=self.sync, args=(S, deletion)) - logger.info('Deletion=%r...'%deletion) - threads.append(thread) + logger.info('Deletion=%r...'%deletion) + threads.append(thread) # Start threads for t in threads: -- 2.47.0