Synchronization logic for parallel steps
authorSapan Bhatia <gwsapan@gmail.com>
Wed, 3 Sep 2014 09:28:42 +0000 (05:28 -0400)
committerSapan Bhatia <gwsapan@gmail.com>
Wed, 3 Sep 2014 09:28:42 +0000 (05:28 -0400)
planetstack/ec2_observer/event_loop.py

index 5a3dae9..f6b92ea 100644 (file)
@@ -234,59 +234,65 @@ class PlanetStackObserver:
                for d in deps:
                        cond = self.step_conditions[d]
                        acquire(cond)
-                       if (self.step_status is STEP_STATUS_WORKING):
+                       if (self.step_status[S] is STEP_STATUS_WORKING):
                                cond.wait()
                        cond.release()
 
-               sync_step = step(driver=self.driver,error_map=error_mapper)
-               sync_step.__name__ = step.__name__
-               sync_step.dependencies = []
-               try:
-                       mlist = sync_step.provides
-                       
-                       for m in mlist:
-                               sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
-               except KeyError:
-                       pass
-               sync_step.debug_mode = debug_mode
-
-               should_run = False
-               try:
-                       # Various checks that decide whether
-                       # this step runs or not
-                       self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed
-                       self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
-                       should_run = True
-               except StepNotReady:
-                       logging.info('Step not ready: %s'%sync_step.__name__)
-                       self.failed_steps.append(sync_step)
-               except Exception,e:
-                       logging.error('%r',e)
-                       logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
+               if (self.step_status[S] is not STEP_STATUS_OK):
                        self.failed_steps.append(sync_step)
+                       my_status = STEP_STATUS_KO
+               else:
+                       sync_step = step(driver=self.driver,error_map=error_mapper)
+                       sync_step.__name__ = step.__name__
+                       sync_step.dependencies = []
+                       try:
+                               mlist = sync_step.provides
+                               
+                               for m in mlist:
+                                       sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
+                       except KeyError:
+                               pass
+                       sync_step.debug_mode = debug_mode
 
-               if (should_run):
+                       should_run = False
                        try:
-                               duration=time.time() - start_time
+                               # Various checks that decide whether
+                               # this step runs or not
+                               self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed
+                               self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
+                               should_run = True
+                       except StepNotReady:
+                               logging.info('Step not ready: %s'%sync_step.__name__)
+                               self.failed_steps.append(sync_step)
+                               my_status = STEP_STATUS_KO
+                       except Exception,e:
+                               logging.error('%r',e)
+                               logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
+                               self.failed_steps.append(sync_step)
+                               my_status = STEP_STATUS_KO
+
+                       if (should_run):
+                               try:
+                                       duration=time.time() - start_time
 
-                               logger.info('Executing step %s' % sync_step.__name__)
+                                       logger.info('Executing step %s' % sync_step.__name__)
 
-                               failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
+                                       failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
 
-                               self.check_duration(sync_step, duration)
+                                       self.check_duration(sync_step, duration)
 
-                               if failed_objects:
-                                       self.failed_step_objects.update(failed_objects)
+                                       if failed_objects:
+                                               self.failed_step_objects.update(failed_objects)
 
+                                       my_status = STEP_STATUS_OK
+                                       self.update_run_time(sync_step,deletion)
+                               except Exception,e:
+                                       logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
+                                       logger.log_exc(e)
+                                       self.failed_steps.append(S)
+                                       my_status = STEP_STATUS_KO
+                       else:
                                my_status = STEP_STATUS_OK
-                               self.update_run_time(sync_step,deletion)
-                       except Exception,e:
-                               logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
-                               logger.log_exc(e)
-                               self.failed_steps.append(S)
-                               my_status = STEP_STATUS_KO
-               else:
-                       my_status = STEP_STATUS_OK
                
                try:
                        my_cond = self.step_conditions[S]
@@ -294,11 +300,9 @@ class PlanetStackObserver:
                        self.step_status[S]=my_status
                        my_cond.notify_all()
                        my_cond.release()
-               except:
+               except KeyError,e:
+                       logging.info('Step %r is a leaf')
                        pass
-               if (self.step_conditions.has_key(S)):
-
-
 
        def run(self):
                if not self.driver.enabled: