From 51f489340de333158a57d0fafeb5c284cd34b946 Mon Sep 17 00:00:00 2001 From: Sapan Bhatia Date: Mon, 25 Aug 2014 04:17:12 -0400 Subject: [PATCH] Initial plumbing for parallel Observer --- planetstack/ec2_observer/event_loop.py | 140 ++++++++++--------- planetstack/openstack_observer/event_loop.py | 128 +++++++++-------- 2 files changed, 146 insertions(+), 122 deletions(-) diff --git a/planetstack/ec2_observer/event_loop.py b/planetstack/ec2_observer/event_loop.py index b366f31..12e88ab 100644 --- a/planetstack/ec2_observer/event_loop.py +++ b/planetstack/ec2_observer/event_loop.py @@ -16,21 +16,21 @@ from openstack.driver import OpenStackDriver from util.logger import Logger, logging, logger #from timeout import timeout from planetstack.config import Config -from ec2_observer.steps import * +from observer.steps import * from syncstep import SyncStep from toposort import toposort -from ec2_observer.error_mapper import * +from observer.error_mapper import * debug_mode = False logger = Logger(level=logging.INFO) class StepNotReady(Exception): - pass + pass class NoOpDriver: - def __init__(self): - self.enabled = True + def __init__(self): + self.enabled = True class PlanetStackObserver: #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector] @@ -98,7 +98,7 @@ class PlanetStackObserver: # This contains dependencies between backend records self.backend_dependency_graph = json.loads(open(backend_path).read()) except Exception,e: - logger.info('Backend dependency graph not loaded: %s'%str(e)) + logger.info('Backend dependency graph not loaded') # We can work without a backend graph self.backend_dependency_graph = {} @@ -111,7 +111,6 @@ class PlanetStackObserver: except KeyError: provides_dict[m.__name__]=[s.__name__] - step_graph = {} for k,v in self.model_dependency_graph.iteritems(): try: @@ -221,6 +220,65 @@ class PlanetStackObserver: if (failed_step in step.dependencies): raise StepNotReady + def sync(self, ordered_steps, error_mapper, deletion): + # Set of whole steps that failed + failed_steps = [] + + # Set of individual objects within steps that failed + failed_step_objects = set() + + for S in ordered_steps: + step = self.step_lookup[S] + start_time=time.time() + + sync_step = step(driver=self.driver,error_map=error_mapper) + sync_step.__name__ = step.__name__ + sync_step.dependencies = [] + try: + mlist = sync_step.provides + + for m in mlist: + sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) + except KeyError: + pass + sync_step.debug_mode = debug_mode + + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.__name__) + failed_steps.append(sync_step) + except Exception,e: + logging.error('%r',e) + logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) + failed_steps.append(sync_step) + + if (should_run): + try: + duration=time.time() - start_time + + logger.info('Executing step %s' % sync_step.__name__) + + # ********* This is the actual sync step + #import pdb + #pdb.set_trace() + failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion) + + + self.check_duration(sync_step, duration) + if failed_objects: + failed_step_objects.update(failed_objects) + + self.update_run_time(sync_step,deletion) + except Exception,e: + logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) + logger.log_exc(e) + failed_steps.append(S) def run(self): if not self.driver.enabled: return @@ -239,67 +297,21 @@ class PlanetStackObserver: # Two passes. One for sync, the other for deletion. for deletion in [False,True]: + threads = [] logger.info('Deletion=%r...'%deletion) - # Set of whole steps that failed - failed_steps = [] - - # Set of individual objects within steps that failed - failed_step_objects = set() - ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps) + thread = threading.Thread(target=self.sync, args=(ordered_steps,error_mapper,deletion)) + logger.info('Deletion=%r...'%deletion) + threads.append(thread) - for S in ordered_steps: - step = self.step_lookup[S] - start_time=time.time() - - sync_step = step(driver=self.driver,error_map=error_mapper) - sync_step.__name__ = step.__name__ - sync_step.dependencies = [] - try: - mlist = sync_step.provides - - for m in mlist: - sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) - except KeyError: - pass - sync_step.debug_mode = debug_mode + # Start threads + for t in threads: + t.start() + + # Wait for all threads to finish before continuing with the run loop + for t in threads: + t.join() - should_run = False - try: - # Various checks that decide whether - # this step runs or not - self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed - self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour - should_run = True - except StepNotReady: - logging.info('Step not ready: %s'%sync_step.__name__) - failed_steps.append(sync_step) - except Exception,e: - logging.error('%r',e) - logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) - failed_steps.append(sync_step) - - if (should_run): - try: - duration=time.time() - start_time - - logger.info('Executing step %s' % sync_step.__name__) - - # ********* This is the actual sync step - #import pdb - #pdb.set_trace() - failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion) - - - self.check_duration(sync_step, duration) - if failed_objects: - failed_step_objects.update(failed_objects) - - self.update_run_time(sync_step,deletion) - except Exception,e: - logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) - logger.log_exc(e) - failed_steps.append(S) self.save_run_times() except Exception, e: logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) diff --git a/planetstack/openstack_observer/event_loop.py b/planetstack/openstack_observer/event_loop.py index 90f99ad..12e88ab 100644 --- a/planetstack/openstack_observer/event_loop.py +++ b/planetstack/openstack_observer/event_loop.py @@ -111,7 +111,6 @@ class PlanetStackObserver: except KeyError: provides_dict[m.__name__]=[s.__name__] - step_graph = {} for k,v in self.model_dependency_graph.iteritems(): try: @@ -221,6 +220,65 @@ class PlanetStackObserver: if (failed_step in step.dependencies): raise StepNotReady + def sync(self, ordered_steps, error_mapper, deletion): + # Set of whole steps that failed + failed_steps = [] + + # Set of individual objects within steps that failed + failed_step_objects = set() + + for S in ordered_steps: + step = self.step_lookup[S] + start_time=time.time() + + sync_step = step(driver=self.driver,error_map=error_mapper) + sync_step.__name__ = step.__name__ + sync_step.dependencies = [] + try: + mlist = sync_step.provides + + for m in mlist: + sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) + except KeyError: + pass + sync_step.debug_mode = debug_mode + + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.__name__) + failed_steps.append(sync_step) + except Exception,e: + logging.error('%r',e) + logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) + failed_steps.append(sync_step) + + if (should_run): + try: + duration=time.time() - start_time + + logger.info('Executing step %s' % sync_step.__name__) + + # ********* This is the actual sync step + #import pdb + #pdb.set_trace() + failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion) + + + self.check_duration(sync_step, duration) + if failed_objects: + failed_step_objects.update(failed_objects) + + self.update_run_time(sync_step,deletion) + except Exception,e: + logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) + logger.log_exc(e) + failed_steps.append(S) def run(self): if not self.driver.enabled: return @@ -239,67 +297,21 @@ class PlanetStackObserver: # Two passes. One for sync, the other for deletion. for deletion in [False,True]: + threads = [] logger.info('Deletion=%r...'%deletion) - # Set of whole steps that failed - failed_steps = [] - - # Set of individual objects within steps that failed - failed_step_objects = set() - ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps) + thread = threading.Thread(target=self.sync, args=(ordered_steps,error_mapper,deletion)) + logger.info('Deletion=%r...'%deletion) + threads.append(thread) - for S in ordered_steps: - step = self.step_lookup[S] - start_time=time.time() - - sync_step = step(driver=self.driver,error_map=error_mapper) - sync_step.__name__ = step.__name__ - sync_step.dependencies = [] - try: - mlist = sync_step.provides - - for m in mlist: - sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) - except KeyError: - pass - sync_step.debug_mode = debug_mode + # Start threads + for t in threads: + t.start() + + # Wait for all threads to finish before continuing with the run loop + for t in threads: + t.join() - should_run = False - try: - # Various checks that decide whether - # this step runs or not - self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed - self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour - should_run = True - except StepNotReady: - logging.info('Step not ready: %s'%sync_step.__name__) - failed_steps.append(sync_step) - except Exception,e: - logging.error('%r',e) - logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion)) - failed_steps.append(sync_step) - - if (should_run): - try: - duration=time.time() - start_time - - logger.info('Executing step %s' % sync_step.__name__) - - # ********* This is the actual sync step - #import pdb - #pdb.set_trace() - failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion) - - - self.check_duration(sync_step, duration) - if failed_objects: - failed_step_objects.update(failed_objects) - - self.update_run_time(sync_step,deletion) - except Exception,e: - logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) - logger.log_exc(e) - failed_steps.append(S) self.save_run_times() except Exception, e: logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e) -- 2.43.0