X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=planetstack%2Fobserver%2Fevent_loop.py;fp=planetstack%2Fobserver%2Fevent_loop.py;h=b565a1530b984358d4a842e2f29cb13ce5948789;hb=13c7f114712b9b49abc1c46357dc309afdbe7fb1;hp=4b1150403a90e43fbb869bc6979c3af016f5bde0;hpb=24836f18c96de50f93cacb2be40f311f66e4876a;p=plstackapi.git diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py index 4b11504..b565a15 100644 --- a/planetstack/observer/event_loop.py +++ b/planetstack/observer/event_loop.py @@ -12,9 +12,13 @@ from openstack.manager import OpenStackManager from util.logger import Logger, logging, logger #from timeout import timeout +debug_mode = False logger = Logger(logfile='observer.log', level=logging.INFO) +class StepNotReady(Exception): + pass + def toposort(g, steps): reverse = {} @@ -54,23 +58,23 @@ def toposort(g, steps): class PlanetStackObserver: sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps'] - def __init__(self): - self.manager = OpenStackManager() - # The Condition object that gets signalled by Feefie events + def __init__(self): + self.manager = OpenStackManager() + # The Condition object that gets signalled by Feefie events self.load_sync_steps() - self.event_cond = threading.Condition() + self.event_cond = threading.Condition() self.load_enacted() - def wait_for_event(self, timeout): - self.event_cond.acquire() - self.event_cond.wait(timeout) - self.event_cond.release() - - def wake_up(self): - logger.info('Wake up routine called. Event cond %r'%self.event_cond) - self.event_cond.acquire() - self.event_cond.notify() - self.event_cond.release() + def wait_for_event(self, timeout): + self.event_cond.acquire() + self.event_cond.wait(timeout) + self.event_cond.release() + + def wake_up(self): + logger.info('Wake up routine called. Event cond %r'%self.event_cond) + self.event_cond.acquire() + self.event_cond.notify() + self.event_cond.release() def load_sync_steps(self): dep_path = Config().pl_dependency_path @@ -100,6 +104,7 @@ class PlanetStackObserver: try: dest = provides_dict[m] except KeyError: + pass # no deps, pass step_graph[source]=dest @@ -121,6 +126,7 @@ class PlanetStackObserver: dest = backend_dict[m] except KeyError: # no deps, pass + pass step_graph[source]=dest except KeyError: @@ -130,32 +136,85 @@ class PlanetStackObserver: dependency_graph = step_graph self.ordered_steps = toposort(dependency_graph, steps) - + self.last_run_times={} + for e in self.ordered_steps: + self.last_run_times[e.name]=0 + + def check_duration(self): + try: + if (duration > S.deadline): + logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration)) + except AttributeError: + # S doesn't have a deadline + pass - def run(self): - if not self.manager.enabled or not self.manager.has_openstack: - return + def update_run_time(self, step): + self.last_run_times[step.name]=time.time() - - while True: - try: - start_time=time.time() - - logger.info('Waiting for event') - tBeforeWait = time.time() - self.wait_for_event(timeout=300) + def check_schedule(self, step): + time_since_last_run = time.time() - self.last_run_times[step.name] + try: + if (time_since_last_run < step.requested_interval): + raise StepNotReady + except AttributeError: + logger.info('Step %s does not have requested_interval set'%step.name) + raise StepNotReady + + def check_class_dependency(self, step, failed_steps): + for failed_step in failed_steps: + if (failed_step in self.dependency_graph[step.name]): + raise StepNotReady + + def run(self): + if not self.manager.enabled or not self.manager.has_openstack: + return + + while True: + try: + logger.info('Waiting for event') + tBeforeWait = time.time() + self.wait_for_event(timeout=300) + logger.info('Observer woke up') + + # Set of whole steps that failed + failed_steps = [] + + # Set of individual objects within steps that failed + failed_step_objects = [] for S in self.ordered_steps: + start_time=time.time() + sync_step = S() - sync_step() - - # Enforce 5 minutes between wakeups - tSleep = 300 - (time.time() - tBeforeWait) - if tSleep > 0: - logger.info('Sleeping for %d seconds' % tSleep) - time.sleep(tSleep) - - logger.info('Observer woke up') - except: - logger.log_exc("Exception in observer run loop") - traceback.print_exc() + sync_step.dependencies = self.dependencies[sync_step.name] + sync_step.debug_mode = debug_mode + + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.name) + failed_steps.add(sync_step) + except: + failed_steps.add(sync_step) + + if (should_run): + try: + duration=time.time() - start_time + + # ********* This is the actual sync step + failed_objects = sync_step(failed=failed_step_objects) + + + check_deadline(sync_step, duration) + failed_step_objects.extend(failed_objects) + self.update_run_time(sync_step) + except: + failed_steps.add(S) + except: + logger.log_exc("Exception in observer run loop") + traceback.print_exc()