From 285decbfb49359489309449c60de59216d9ca94b Mon Sep 17 00:00:00 2001 From: Sapan Bhatia Date: Wed, 30 Apr 2014 00:31:44 -0400 Subject: [PATCH] Changed core Observer logic to incorporate explicit deletions. --- planetstack/observer/event_loop.py | 136 +++++++++++++++++------------ 1 file changed, 81 insertions(+), 55 deletions(-) diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py index 3c8c1ee..ad9c073 100644 --- a/planetstack/observer/event_loop.py +++ b/planetstack/observer/event_loop.py @@ -170,11 +170,17 @@ class PlanetStackObserver: # S doesn't have a deadline pass - def update_run_time(self, step): - self.last_run_times[step.__name__]=time.time() + def update_run_time(self, step, deletion): + if (not deletion): + self.last_run_times[step.__name__]=time.time() + else: + self.last_deletion_run_times[step.__name__]=time.time() + - def check_schedule(self, step): - time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0) + def check_schedule(self, step, deletion): + last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times + + time_since_last_run = time.time() - last_run_times.get(step.__name__, 0) try: if (time_since_last_run < step.requested_interval): raise StepNotReady @@ -190,12 +196,23 @@ class PlanetStackObserver: self.last_run_times={} for e in self.ordered_steps: self.last_run_times[e]=0 + try: + jrun_times = open('/tmp/observer_deletion_run_times').read() + self.last_deletion_run_times = json.loads(jrun_times) + except: + self.last_deletion_run_times={} + for e in self.ordered_steps: + self.last_deletion_run_times[e]=0 + def save_run_times(self): run_times = json.dumps(self.last_run_times) open('/tmp/observer_run_times','w').write(run_times) + deletion_run_times = json.dumps(self.last_deletion_run_times) + open('/tmp/observer_deletion_run_times','w').write(deletion_run_times) + def check_class_dependency(self, step, failed_steps): step.dependenices = [] for obj in step.provides: @@ -220,63 +237,72 @@ class PlanetStackObserver: self.wait_for_event(timeout=30) logger.info('Observer woke up') - # Set of whole steps that failed - failed_steps = [] + # Two passes. One for sync, the other for deletion. + for deletion in (False,True): + logger.info('Creation pass...') + # Set of whole steps that failed + failed_steps = [] - # Set of individual objects within steps that failed - failed_step_objects = set() + # Set of individual objects within steps that failed + failed_step_objects = set() - for S in self.ordered_steps: - step = self.step_lookup[S] - start_time=time.time() - - sync_step = step(driver=self.driver,error_map=error_mapper) - sync_step.__name__ = step.__name__ - sync_step.dependencies = [] - try: - mlist = sync_step.provides + ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps) + + for S in ordered_steps: + step = self.step_lookup[S] + start_time=time.time() - for m in mlist: - sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) - except KeyError: - pass - sync_step.debug_mode = debug_mode - - should_run = False - try: - # Various checks that decide whether - # this step runs or not - self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed - self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour - should_run = True - except StepNotReady: - logging.info('Step not ready: %s'%sync_step.__name__) - failed_steps.append(sync_step) - except Exception,e: - logging.error('%r',e) - logger.log_exc("sync step failed: %r!"%sync_step) - failed_steps.append(sync_step) - - if (should_run): + sync_step = step(driver=self.driver,error_map=error_mapper) + sync_step.__name__ = step.__name__ + sync_step.dependencies = [] try: - duration=time.time() - start_time - - logger.info('Executing step %s' % sync_step.__name__) - - # ********* This is the actual sync step - #import pdb - #pdb.set_trace() - failed_objects = sync_step(failed=list(failed_step_objects)) - + mlist = sync_step.provides + + for m in mlist: + sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) + except KeyError: + pass + sync_step.debug_mode = debug_mode - self.check_duration(sync_step, duration) - if failed_objects: - failed_step_objects.update(failed_objects) - self.update_run_time(sync_step) + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step,deletion) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.__name__) + failed_steps.append(sync_step) except Exception,e: - logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) - logger.log_exc(e) - failed_steps.append(S) + logging.error('%r',e) + logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) + failed_steps.append(sync_step) + + if (should_run): + try: + duration=time.time() - start_time + + logger.info('Executing step %s' % sync_step.__name__) + + # ********* This is the actual sync step + #import pdb + #pdb.set_trace() + failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion) + + + self.check_duration(sync_step, duration) + if failed_objects: + failed_step_objects.update(failed_objects) + + if (not deletion): + self.update_run_time(sync_step) + else: + self.update_deletion_run_time(sync_step) + except Exception,e: + logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) + logger.log_exc(e) + failed_steps.append(S) self.save_run_times() except Exception, e: logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) -- 2.43.0