From 6ff37c462f6b230988933c31e9c2f433dc4e5900 Mon Sep 17 00:00:00 2001 From: Sapan Bhatia Date: Wed, 3 Sep 2014 05:28:42 -0400 Subject: [PATCH] Synchronization logic for parallel steps --- planetstack/ec2_observer/event_loop.py | 94 ++++++++++++++------------ 1 file changed, 49 insertions(+), 45 deletions(-) diff --git a/planetstack/ec2_observer/event_loop.py b/planetstack/ec2_observer/event_loop.py index 5a3dae9..f6b92ea 100644 --- a/planetstack/ec2_observer/event_loop.py +++ b/planetstack/ec2_observer/event_loop.py @@ -234,59 +234,65 @@ class PlanetStackObserver: for d in deps: cond = self.step_conditions[d] acquire(cond) - if (self.step_status is STEP_STATUS_WORKING): + if (self.step_status[S] is STEP_STATUS_WORKING): cond.wait() cond.release() - sync_step = step(driver=self.driver,error_map=error_mapper) - sync_step.__name__ = step.__name__ - sync_step.dependencies = [] - try: - mlist = sync_step.provides - - for m in mlist: - sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) - except KeyError: - pass - sync_step.debug_mode = debug_mode - - should_run = False - try: - # Various checks that decide whether - # this step runs or not - self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed - self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour - should_run = True - except StepNotReady: - logging.info('Step not ready: %s'%sync_step.__name__) - self.failed_steps.append(sync_step) - except Exception,e: - logging.error('%r',e) - logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) + if (self.step_status[S] is not STEP_STATUS_OK): self.failed_steps.append(sync_step) + my_status = STEP_STATUS_KO + else: + sync_step = step(driver=self.driver,error_map=error_mapper) + sync_step.__name__ = step.__name__ + sync_step.dependencies = [] + try: + mlist = sync_step.provides + + for m in mlist: + sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) + except KeyError: + pass + sync_step.debug_mode = debug_mode - if (should_run): + should_run = False try: - duration=time.time() - start_time + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.__name__) + self.failed_steps.append(sync_step) + my_status = STEP_STATUS_KO + except Exception,e: + logging.error('%r',e) + logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) + self.failed_steps.append(sync_step) + my_status = STEP_STATUS_KO + + if (should_run): + try: + duration=time.time() - start_time - logger.info('Executing step %s' % sync_step.__name__) + logger.info('Executing step %s' % sync_step.__name__) - failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion) + failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion) - self.check_duration(sync_step, duration) + self.check_duration(sync_step, duration) - if failed_objects: - self.failed_step_objects.update(failed_objects) + if failed_objects: + self.failed_step_objects.update(failed_objects) + my_status = STEP_STATUS_OK + self.update_run_time(sync_step,deletion) + except Exception,e: + logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) + logger.log_exc(e) + self.failed_steps.append(S) + my_status = STEP_STATUS_KO + else: my_status = STEP_STATUS_OK - self.update_run_time(sync_step,deletion) - except Exception,e: - logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) - logger.log_exc(e) - self.failed_steps.append(S) - my_status = STEP_STATUS_KO - else: - my_status = STEP_STATUS_OK try: my_cond = self.step_conditions[S] @@ -294,11 +300,9 @@ class PlanetStackObserver: self.step_status[S]=my_status my_cond.notify_all() my_cond.release() - except: + except KeyError,e: + logging.info('Step %r is a leaf') pass - if (self.step_conditions.has_key(S)): - - def run(self): if not self.driver.enabled: -- 2.47.0