From 13c7f114712b9b49abc1c46357dc309afdbe7fb1 Mon Sep 17 00:00:00 2001 From: Sapan Bhatia Date: Mon, 2 Sep 2013 14:19:35 -0400 Subject: [PATCH] Class and object dependencies, schedules --- dmdot | 49 ++++++++ planetstack.deps | 47 ++++++++ planetstack/observer/event_loop.py | 135 ++++++++++++++++------ planetstack/observer/openstacksyncstep.py | 12 +- planetstack/observer/syncstep.py | 25 +++- 5 files changed, 217 insertions(+), 51 deletions(-) create mode 100755 dmdot create mode 100644 planetstack.deps diff --git a/dmdot b/dmdot new file mode 100755 index 0000000..2d95e9d --- /dev/null +++ b/dmdot @@ -0,0 +1,49 @@ +#!/usr/bin/python + +import os +import pdb +import sys +import json + +sys.path.append('.') + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings") + +from django.db.models.fields.related import ForeignKey +from core.models import * + +try: + output = sys.args[1] +except: + output = '-json' + +g = globals() +model_classes = [] +class_names = [] +for c in g.values(): + if type(c)==type(PlCoreBase): + model_classes.append(c) + class_names.append(c.__name__) + + +if (output=='-dot'): + print "digraph plstack {"; + for c in model_classes: + fields = c._meta.fields + for f in fields: + if type(f)==ForeignKey and f.name.title() in class_names: + print '\t"%s"->"%s";'%(c.__name__,f.name.title()) + print "}\n"; +elif (output=='-json'): + d = {} + for c in model_classes: + fields = c._meta.fields + for f in fields: + if type(f)==ForeignKey and f.name.title() in class_names: + try: + d[c.__name__].append(f.name.title()) + except KeyError: + d[c.__name__]=[f.name.title()] + print json.dumps(d,indent=4) + + diff --git a/planetstack.deps b/planetstack.deps new file mode 100644 index 0000000..6eae1fc --- /dev/null +++ b/planetstack.deps @@ -0,0 +1,47 @@ +{ + "Node": [ + "Site", + "Deployment" + ], + "Slice": [ + "Site" + ], + "ReservedResource": [ + "Sliver" + ], + "SliceMembership": [ + "User", + "Slice", + "Role" + ], + "NetworkSlice": [ + "Network", + "Slice" + ], + "Tag": [ + "Project" + ], + "User": [ + "Site" + ], + "SliceTag": [ + "Slice" + ], + "Reservation": [ + "Slice" + ], + "NetworkSliver": [ + "Network", + "Sliver" + ], + "SitePrivilege": [ + "User", + "Site", + "Role" + ], + "Sliver": [ + "Image", + "Slice", + "Node" + ] +} diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py index 4b11504..b565a15 100644 --- a/planetstack/observer/event_loop.py +++ b/planetstack/observer/event_loop.py @@ -12,9 +12,13 @@ from openstack.manager import OpenStackManager from util.logger import Logger, logging, logger #from timeout import timeout +debug_mode = False logger = Logger(logfile='observer.log', level=logging.INFO) +class StepNotReady(Exception): + pass + def toposort(g, steps): reverse = {} @@ -54,23 +58,23 @@ def toposort(g, steps): class PlanetStackObserver: sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps'] - def __init__(self): - self.manager = OpenStackManager() - # The Condition object that gets signalled by Feefie events + def __init__(self): + self.manager = OpenStackManager() + # The Condition object that gets signalled by Feefie events self.load_sync_steps() - self.event_cond = threading.Condition() + self.event_cond = threading.Condition() self.load_enacted() - def wait_for_event(self, timeout): - self.event_cond.acquire() - self.event_cond.wait(timeout) - self.event_cond.release() - - def wake_up(self): - logger.info('Wake up routine called. Event cond %r'%self.event_cond) - self.event_cond.acquire() - self.event_cond.notify() - self.event_cond.release() + def wait_for_event(self, timeout): + self.event_cond.acquire() + self.event_cond.wait(timeout) + self.event_cond.release() + + def wake_up(self): + logger.info('Wake up routine called. Event cond %r'%self.event_cond) + self.event_cond.acquire() + self.event_cond.notify() + self.event_cond.release() def load_sync_steps(self): dep_path = Config().pl_dependency_path @@ -100,6 +104,7 @@ class PlanetStackObserver: try: dest = provides_dict[m] except KeyError: + pass # no deps, pass step_graph[source]=dest @@ -121,6 +126,7 @@ class PlanetStackObserver: dest = backend_dict[m] except KeyError: # no deps, pass + pass step_graph[source]=dest except KeyError: @@ -130,32 +136,85 @@ class PlanetStackObserver: dependency_graph = step_graph self.ordered_steps = toposort(dependency_graph, steps) - + self.last_run_times={} + for e in self.ordered_steps: + self.last_run_times[e.name]=0 + + def check_duration(self): + try: + if (duration > S.deadline): + logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration)) + except AttributeError: + # S doesn't have a deadline + pass - def run(self): - if not self.manager.enabled or not self.manager.has_openstack: - return + def update_run_time(self, step): + self.last_run_times[step.name]=time.time() - - while True: - try: - start_time=time.time() - - logger.info('Waiting for event') - tBeforeWait = time.time() - self.wait_for_event(timeout=300) + def check_schedule(self, step): + time_since_last_run = time.time() - self.last_run_times[step.name] + try: + if (time_since_last_run < step.requested_interval): + raise StepNotReady + except AttributeError: + logger.info('Step %s does not have requested_interval set'%step.name) + raise StepNotReady + + def check_class_dependency(self, step, failed_steps): + for failed_step in failed_steps: + if (failed_step in self.dependency_graph[step.name]): + raise StepNotReady + + def run(self): + if not self.manager.enabled or not self.manager.has_openstack: + return + + while True: + try: + logger.info('Waiting for event') + tBeforeWait = time.time() + self.wait_for_event(timeout=300) + logger.info('Observer woke up') + + # Set of whole steps that failed + failed_steps = [] + + # Set of individual objects within steps that failed + failed_step_objects = [] for S in self.ordered_steps: + start_time=time.time() + sync_step = S() - sync_step() - - # Enforce 5 minutes between wakeups - tSleep = 300 - (time.time() - tBeforeWait) - if tSleep > 0: - logger.info('Sleeping for %d seconds' % tSleep) - time.sleep(tSleep) - - logger.info('Observer woke up') - except: - logger.log_exc("Exception in observer run loop") - traceback.print_exc() + sync_step.dependencies = self.dependencies[sync_step.name] + sync_step.debug_mode = debug_mode + + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.name) + failed_steps.add(sync_step) + except: + failed_steps.add(sync_step) + + if (should_run): + try: + duration=time.time() - start_time + + # ********* This is the actual sync step + failed_objects = sync_step(failed=failed_step_objects) + + + check_deadline(sync_step, duration) + failed_step_objects.extend(failed_objects) + self.update_run_time(sync_step) + except: + failed_steps.add(S) + except: + logger.log_exc("Exception in observer run loop") + traceback.print_exc() diff --git a/planetstack/observer/openstacksyncstep.py b/planetstack/observer/openstacksyncstep.py index 7bfe9f4..3ce3c68 100644 --- a/planetstack/observer/openstacksyncstep.py +++ b/planetstack/observer/openstacksyncstep.py @@ -10,17 +10,7 @@ class OpenStackSyncStep: super(SyncStep,self).__init__(**args) return - def call(self): - pending = self.fetch_pending() - failed = [] - for o in pending: - if (not self.depends_on(o, failed)): - try: - self.sync_record(o) - o.enacted = datetime.now() # Is this the same timezone? XXX - o.save(update_fields=['enacted']) - except: - failed.append(o) + def __call__(self): diff --git a/planetstack/observer/syncstep.py b/planetstack/observer/syncstep.py index b206106..f3eb4ba 100644 --- a/planetstack/observer/syncstep.py +++ b/planetstack/observer/syncstep.py @@ -2,6 +2,9 @@ import os import base64 from planetstack.config import Config +class FailedDependency(Exception): + pass + class SyncStep: """ A PlanetStack Sync step. @@ -24,6 +27,7 @@ class SyncStep: name -- Name of the step provides -- PlanetStack models sync'd by this step """ + dependencies = [] try: self.soft_deadline = int(self.get_prop('soft_deadline_seconds')) except: @@ -33,9 +37,26 @@ class SyncStep: def fetch_pending(self): return Sliver.objects.filter(ip=None) + + def check_dependencies(self, obj): + for dep in dependencies: + peer_object = getattr(obj, dep.name.lowercase()) + if (peer_object.pk==dep.pk): + raise DependencyFailed - def call(self): - return True + def call(self, failed=failed_objects): + pending = self.fetch_pending() + failed = [] + for o in pending: + if (not self.depends_on(o, failed)): + try: + check_dependencies(o) # Raises exception if failed + self.sync_record(o) + o.enacted = datetime.now() # Is this the same timezone? XXX + o.save(update_fields=['enacted']) + except: + failed.append(o) + return failed def __call__(self): return self.call() -- 2.43.0