X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=planetstack%2Fec2_observer%2Fevent_loop.py;fp=planetstack%2Fec2_observer%2Fevent_loop.py;h=5a3dae99f13da445c460a0df93cb9d2449467b18;hb=4a1335ce7adbd530af6f498539e1950adff7f0f7;hp=caabd1e32afe8d300cc26a1683a6fb3e819e9401;hpb=97e18bd6fabf05ca40526b07f87e1f70452984ec;p=plstackapi.git diff --git a/planetstack/ec2_observer/event_loop.py b/planetstack/ec2_observer/event_loop.py index caabd1e..5a3dae9 100644 --- a/planetstack/ec2_observer/event_loop.py +++ b/planetstack/ec2_observer/event_loop.py @@ -31,11 +31,16 @@ class StepNotReady(Exception): class NoOpDriver: def __init__(self): self.enabled = True + self.dependency_graph = None class PlanetStackObserver: #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector] sync_steps = [] + STEP_STATUS_WORKING=1 + STEP_STATUS_OK=2 + STEP_STATUS_KO=3 + def __init__(self): # The Condition object that gets signalled by Feefie events self.step_lookup = {} @@ -154,9 +159,9 @@ class PlanetStackObserver: pass # no dependencies, pass - dependency_graph = step_graph + self.dependency_graph = step_graph - self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps)) + self.ordered_steps = toposort(self.dependency_graph, map(lambda s:s.__name__,self.sync_steps)) print "Order of steps=",self.ordered_steps self.load_run_times() @@ -220,75 +225,113 @@ class PlanetStackObserver: if (failed_step in step.dependencies): raise StepNotReady - def sync(self, ordered_steps, error_mapper, deletion): - # Set of whole steps that failed - failed_steps = [] + def sync(self, S, deletion): + step = self.step_lookup[S] + start_time=time.time() + + # Wait for step dependencies to be met + deps = self.dependency_graph[S] + for d in deps: + cond = self.step_conditions[d] + acquire(cond) + if (self.step_status is STEP_STATUS_WORKING): + cond.wait() + cond.release() + + sync_step = step(driver=self.driver,error_map=error_mapper) + sync_step.__name__ = step.__name__ + sync_step.dependencies = [] + try: + mlist = sync_step.provides + + for m in mlist: + sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) + except KeyError: + pass + sync_step.debug_mode = debug_mode - # Set of individual objects within steps that failed - failed_step_objects = set() + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.__name__) + self.failed_steps.append(sync_step) + except Exception,e: + logging.error('%r',e) + logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) + self.failed_steps.append(sync_step) - for S in ordered_steps: - step = self.step_lookup[S] - start_time=time.time() - - sync_step = step(driver=self.driver,error_map=error_mapper) - sync_step.__name__ = step.__name__ - sync_step.dependencies = [] + if (should_run): try: - mlist = sync_step.provides - - for m in mlist: - sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) - except KeyError: - pass - sync_step.debug_mode = debug_mode + duration=time.time() - start_time - should_run = False - try: - # Various checks that decide whether - # this step runs or not - self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed - self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour - should_run = True - except StepNotReady: - logging.info('Step not ready: %s'%sync_step.__name__) - failed_steps.append(sync_step) - except Exception,e: - logging.error('%r',e) - logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) - failed_steps.append(sync_step) + logger.info('Executing step %s' % sync_step.__name__) - if (should_run): - try: - duration=time.time() - start_time + failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion) + + self.check_duration(sync_step, duration) - logger.info('Executing step %s' % sync_step.__name__) + if failed_objects: + self.failed_step_objects.update(failed_objects) - # ********* This is the actual sync step - #import pdb - #pdb.set_trace() - failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion) + my_status = STEP_STATUS_OK + self.update_run_time(sync_step,deletion) + except Exception,e: + logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) + logger.log_exc(e) + self.failed_steps.append(S) + my_status = STEP_STATUS_KO + else: + my_status = STEP_STATUS_OK + + try: + my_cond = self.step_conditions[S] + my_cond.acquire() + self.step_status[S]=my_status + my_cond.notify_all() + my_cond.release() + except: + pass + if (self.step_conditions.has_key(S)): - self.check_duration(sync_step, duration) - if failed_objects: - failed_step_objects.update(failed_objects) - self.update_run_time(sync_step,deletion) - except Exception,e: - logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) - logger.log_exc(e) - failed_steps.append(S) def run(self): if not self.driver.enabled: return + if (self.driver_kind=="openstack") and (not self.driver.has_openstack): return while True: try: error_map_file = getattr(Config(), "error_map_path", "/opt/planetstack/error_map.txt") - error_mapper = ErrorMapper(error_map_file) + self.error_mapper = ErrorMapper(error_map_file) + + # Set of whole steps that failed + self.failed_steps = [] + + # Set of individual objects within steps that failed + self.failed_step_objects = set() + + # Set up conditions and step status + # This is needed for steps to run in parallel + # while obeying dependencies. + + providers = set() + for v in self.dependency_graph.values(): + if (v): + providers.update(v) + self.step_conditions = {} + self.step_status = {} + for p in list(providers): + self.step_conditions[p] = threading.Condition() + self.step_status[p] = STEP_STATUS_IDLE + logger.info('Waiting for event') tBeforeWait = time.time() @@ -299,8 +342,10 @@ class PlanetStackObserver: for deletion in [False,True]: threads = [] logger.info('Deletion=%r...'%deletion) - ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps) - thread = threading.Thread(target=self.sync, args=(ordered_steps,error_mapper,deletion)) + schedule = self.sync_schedule if not deletion else self.delete_schedule + + thread = threading.Thread(target=self.sync, args=(schedule.start_conditions, schedule.ordered_steps,deletion, schedule.signal_sem)) + logger.info('Deletion=%r...'%deletion) threads.append(thread)