X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=planetstack%2Fobserver%2Fevent_loop.py;h=1cd28d4c5c35922bf016534c25439aedd58c82e1;hb=2e0dc7fc4b93aa75a673619a5a764cd47187f583;hp=7ad3efc8104bae20fb69684f3378e79cf6c0a780;hpb=4fa85fb13f41338f263ae0b2b9d39aec87703d8b;p=plstackapi.git diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py index 7ad3efc..1cd28d4 100644 --- a/planetstack/observer/event_loop.py +++ b/planetstack/observer/event_loop.py @@ -1,3 +1,6 @@ +import os +import imp +import inspect import time import traceback import commands @@ -14,72 +17,76 @@ from util.logger import Logger, logging, logger #from timeout import timeout from planetstack.config import Config from observer.steps import * +from syncstep import SyncStep +from toposort import toposort debug_mode = False -logger = Logger(logfile='observer.log', level=logging.INFO) +logger = Logger(level=logging.INFO) class StepNotReady(Exception): pass -def toposort(g, steps): - reverse = {} - - for k,v in g.items(): - for rk in v: - try: - reverse[rk].append(k) - except: - reverse[rk]=k - - sources = [] - for k,v in g.items(): - if not reverse.has_key(k): - sources.append(k) - - - for k,v in reverse.iteritems(): - if (not v): - sources.append(k) - - order = [] - marked = [] - - while sources: - n = sources.pop() - try: - for m in g[n]: - if m not in marked: - sources.append(m) - marked.append(m) - except KeyError: - pass - order.append(n) - return order +class NoOpDriver: + def __init__(self): + self.enabled = True class PlanetStackObserver: - sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,GarbageCollector] + #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector] + sync_steps = [] def __init__(self): # The Condition object that gets signalled by Feefie events self.step_lookup = {} + self.load_sync_step_modules() self.load_sync_steps() self.event_cond = threading.Condition() - self.driver = OpenStackDriver() + + + self.driver_kind = getattr(Config(), "observer_driver", "openstack") + if self.driver_kind=="openstack": + self.driver = OpenStackDriver() + else: + self.driver = NoOpDriver() def wait_for_event(self, timeout): + logger.info('Waiting for event') self.event_cond.acquire() self.event_cond.wait(timeout) self.event_cond.release() - + def wake_up(self): logger.info('Wake up routine called. Event cond %r'%self.event_cond) self.event_cond.acquire() self.event_cond.notify() self.event_cond.release() + def load_sync_step_modules(self, step_dir=None): + if step_dir is None: + if hasattr(Config(), "observer_steps_dir"): + step_dir = Config().observer_steps_dir + else: + step_dir = "/opt/planetstack/observer/steps" + + for fn in os.listdir(step_dir): + pathname = os.path.join(step_dir,fn) + if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"): + module = imp.load_source(fn[:-3],pathname) + for classname in dir(module): + c = getattr(module, classname, None) + + # make sure 'c' is a descendent of SyncStep and has a + # provides field (this eliminates the abstract base classes + # since they don't have a provides) + + if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps): + self.sync_steps.append(c) + logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps])) + # print 'loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]) + def load_sync_steps(self): - dep_path = Config().observer_backend_dependency_graph + dep_path = Config().observer_dependency_graph + logger.info('Loading model dependency graph from %s' % dep_path) try: # This contains dependencies between records, not sync steps self.model_dependency_graph = json.loads(open(dep_path).read()) @@ -88,15 +95,17 @@ class PlanetStackObserver: try: backend_path = Config().observer_pl_dependency_graph + logger.info('Loading backend dependency graph from %s' % backend_path) # This contains dependencies between backend records self.backend_dependency_graph = json.loads(open(backend_path).read()) except Exception,e: + logger.info('Backend dependency graph not loaded') # We can work without a backend graph self.backend_dependency_graph = {} provides_dict = {} for s in self.sync_steps: - self.step_lookup[s.__name__] = s + self.step_lookup[s.__name__] = s for m in s.provides: try: provides_dict[m.__name__].append(s.__name__) @@ -113,7 +122,8 @@ class PlanetStackObserver: for dest in provides_dict[m]: # no deps, pass try: - step_graph[source].append(dest) + if (dest not in step_graph[source]): + step_graph[source].append(dest) except: step_graph[source]=[dest] except KeyError: @@ -148,7 +158,7 @@ class PlanetStackObserver: dependency_graph = step_graph - self.ordered_steps = toposort(dependency_graph, self.sync_steps) + self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps)) print "Order of steps=",self.ordered_steps self.load_run_times() @@ -165,7 +175,7 @@ class PlanetStackObserver: self.last_run_times[step.__name__]=time.time() def check_schedule(self, step): - time_since_last_run = time.time() - self.last_run_times[step.__name__] + time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0) try: if (time_since_last_run < step.requested_interval): raise StepNotReady @@ -183,71 +193,101 @@ class PlanetStackObserver: self.last_run_times[e]=0 - def save_run_times(self): run_times = json.dumps(self.last_run_times) open('/tmp/observer_run_times','w').write(run_times) def check_class_dependency(self, step, failed_steps): + step.dependenices = [] + for obj in step.provides: + step.dependenices.extend(self.model_dependency_graph.get(obj.__name__, [])) for failed_step in failed_steps: - step.dependencies = self.model_dependency_graph.get(step.provides[0].__name__, []) if (failed_step in step.dependencies): raise StepNotReady - def run(self): - if not self.driver.enabled or not self.driver.has_openstack: - return - while True: - try: - logger.info('Waiting for event') - tBeforeWait = time.time() - self.wait_for_event(timeout=300) - logger.info('Observer woke up') + def run_steps(self): + try: + logger.info('Observer run steps') - # Set of whole steps that failed - failed_steps = [] + # Set of whole steps that failed + failed_steps = [] - # Set of individual objects within steps that failed - failed_step_objects = [] + # Set of individual objects within steps that failed + failed_step_objects = set() - for S in self.ordered_steps: - step = self.step_lookup[S] - start_time=time.time() + for S in self.ordered_steps: + step = self.step_lookup[S] + start_time=time.time() + + sync_step = step(driver=self.driver) + sync_step.__name__ = step.__name__ + sync_step.dependencies = [] + try: + mlist = sync_step.provides - sync_step = step(driver=self.driver) - sync_step.__name__ = step.__name__ - #sync_step.dependencies = self.dependencies[sync_step.name] - sync_step.debug_mode = debug_mode + for m in mlist: + sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) + except KeyError: + pass + sync_step.debug_mode = debug_mode - should_run = False + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.__name__) + failed_steps.append(sync_step) + except: + failed_steps.append(sync_step) + + if (should_run): try: - # Various checks that decide whether - # this step runs or not - self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed - self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour - should_run = True - except StepNotReady: - logging.info('Step not ready: %s'%sync_step.__name__) - failed_steps.append(sync_step) + duration=time.time() - start_time + + logger.info('Executing step %s' % sync_step.__name__) + + # ********* This is the actual sync step + #import pdb + #pdb.set_trace() + failed_objects = sync_step(failed=list(failed_step_objects)) + + + self.check_duration(sync_step, duration) + if failed_objects: + failed_step_objects.update(failed_objects) + self.update_run_time(sync_step) except: - failed_steps.append(sync_step) + failed_steps.append(S) + self.save_run_times() + except: + logger.log_exc("Exception in observer run loop") + traceback.print_exc() - if (should_run): - try: - duration=time.time() - start_time - - # ********* This is the actual sync step - failed_objects = sync_step(failed=failed_step_objects) - - - self.check_duration(sync_step, duration) - if failed_objects: - failed_step_objects.extend(failed_objects) - self.update_run_time(sync_step) - except: - failed_steps.append(S) - self.save_run_times() - except: - logger.log_exc("Exception in observer run loop") - traceback.print_exc() + def run(self): + try: + logger.info('Observer start run loop') + if not self.driver.enabled: + return + if (self.driver_kind=="openstack") and (not self.driver.has_openstack): + return + + while True: + try: + self.wait_for_event(timeout=30) + except: + logger.log_exc("Exception in observer wait for event") + traceback.print_exc() + + try: + self.run_steps() + except: + logger.log_exc("Exception in observer run steps") + traceback.print_exc() + except: + logger.log_exc("Exception in observer run loop") + traceback.print_exc()