Class and object dependencies, schedules
authorSapan Bhatia <gwsapan@gmail.com>
Mon, 2 Sep 2013 18:19:35 +0000 (14:19 -0400)
committerSapan Bhatia <gwsapan@gmail.com>
Mon, 2 Sep 2013 18:19:35 +0000 (14:19 -0400)
dmdot [new file with mode: 0755]
planetstack.deps [new file with mode: 0644]
planetstack/observer/event_loop.py
planetstack/observer/openstacksyncstep.py
planetstack/observer/syncstep.py

diff --git a/dmdot b/dmdot
new file mode 100755 (executable)
index 0000000..2d95e9d
--- /dev/null
+++ b/dmdot
@@ -0,0 +1,49 @@
+#!/usr/bin/python
+
+import os
+import pdb
+import sys
+import json
+
+sys.path.append('.')
+
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
+
+from django.db.models.fields.related import ForeignKey
+from core.models import *
+
+try:
+       output = sys.args[1]
+except:
+       output = '-json'
+
+g = globals()
+model_classes = []
+class_names = []
+for c in g.values():
+       if type(c)==type(PlCoreBase):
+               model_classes.append(c)
+               class_names.append(c.__name__)
+
+
+if (output=='-dot'):
+       print "digraph plstack {";
+       for c in model_classes:
+               fields = c._meta.fields
+               for f in fields:
+                       if type(f)==ForeignKey and f.name.title() in class_names:
+                               print '\t"%s"->"%s";'%(c.__name__,f.name.title())
+       print "}\n";
+elif (output=='-json'):
+       d = {}
+       for c in model_classes:
+               fields = c._meta.fields
+               for f in fields:
+                       if type(f)==ForeignKey and f.name.title() in class_names:
+                               try:
+                                       d[c.__name__].append(f.name.title())
+                               except KeyError:
+                                       d[c.__name__]=[f.name.title()]
+       print json.dumps(d,indent=4)
+       
+       
diff --git a/planetstack.deps b/planetstack.deps
new file mode 100644 (file)
index 0000000..6eae1fc
--- /dev/null
@@ -0,0 +1,47 @@
+{
+    "Node": [
+        "Site", 
+        "Deployment"
+    ], 
+    "Slice": [
+        "Site"
+    ], 
+    "ReservedResource": [
+        "Sliver"
+    ], 
+    "SliceMembership": [
+        "User", 
+        "Slice", 
+        "Role"
+    ], 
+    "NetworkSlice": [
+        "Network", 
+        "Slice"
+    ], 
+    "Tag": [
+        "Project"
+    ], 
+    "User": [
+        "Site"
+    ], 
+    "SliceTag": [
+        "Slice"
+    ], 
+    "Reservation": [
+        "Slice"
+    ], 
+    "NetworkSliver": [
+        "Network", 
+        "Sliver"
+    ], 
+    "SitePrivilege": [
+        "User", 
+        "Site", 
+        "Role"
+    ], 
+    "Sliver": [
+        "Image", 
+        "Slice", 
+        "Node"
+    ]
+}
index 4b11504..b565a15 100644 (file)
@@ -12,9 +12,13 @@ from openstack.manager import OpenStackManager
 from util.logger import Logger, logging, logger
 #from timeout import timeout
 
+debug_mode = False
 
 logger = Logger(logfile='observer.log', level=logging.INFO)
 
+class StepNotReady(Exception):
+       pass
+
 def toposort(g, steps):
        reverse = {}
 
@@ -54,23 +58,23 @@ def toposort(g, steps):
 class PlanetStackObserver:
        sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps']
 
-    def __init__(self):
-        self.manager = OpenStackManager()
-        # The Condition object that gets signalled by Feefie events
+       def __init__(self):
+               self.manager = OpenStackManager()
+               # The Condition object that gets signalled by Feefie events
                self.load_sync_steps()
-        self.event_cond = threading.Condition()
+               self.event_cond = threading.Condition()
                self.load_enacted()
 
-    def wait_for_event(self, timeout):
-        self.event_cond.acquire()
-        self.event_cond.wait(timeout)
-        self.event_cond.release()
-        
-    def wake_up(self):
-        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
-        self.event_cond.acquire()
-        self.event_cond.notify()
-        self.event_cond.release()
+       def wait_for_event(self, timeout):
+               self.event_cond.acquire()
+               self.event_cond.wait(timeout)
+               self.event_cond.release()
+               
+       def wake_up(self):
+               logger.info('Wake up routine called. Event cond %r'%self.event_cond)
+               self.event_cond.acquire()
+               self.event_cond.notify()
+               self.event_cond.release()
 
        def load_sync_steps(self):
                dep_path = Config().pl_dependency_path
@@ -100,6 +104,7 @@ class PlanetStackObserver:
                                        try:
                                                dest = provides_dict[m]
                                        except KeyError:
+                                               pass
                                                # no deps, pass
                                        step_graph[source]=dest
                                        
@@ -121,6 +126,7 @@ class PlanetStackObserver:
                                                        dest = backend_dict[m]
                                                except KeyError:
                                                        # no deps, pass
+                                                       pass
                                                step_graph[source]=dest
                                                
                                except KeyError:
@@ -130,32 +136,85 @@ class PlanetStackObserver:
                dependency_graph = step_graph
 
                self.ordered_steps = toposort(dependency_graph, steps)
-               
+               self.last_run_times={}
+               for e in self.ordered_steps:
+                       self.last_run_times[e.name]=0
+
+       def check_duration(self):
+               try:
+                       if (duration > S.deadline):
+                               logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration))
+               except AttributeError:
+                       # S doesn't have a deadline
+                       pass
 
-    def run(self):
-        if not self.manager.enabled or not self.manager.has_openstack:
-            return
+       def update_run_time(self, step):
+               self.last_run_times[step.name]=time.time()
 
-               
-        while True:
-            try:
-                start_time=time.time()
-                
-                logger.info('Waiting for event')
-                tBeforeWait = time.time()
-                self.wait_for_event(timeout=300)
+       def check_schedule(self, step):
+               time_since_last_run = time.time() - self.last_run_times[step.name]
+               try:
+                       if (time_since_last_run < step.requested_interval):
+                               raise StepNotReady
+               except AttributeError:
+                       logger.info('Step %s does not have requested_interval set'%step.name)
+                       raise StepNotReady
+       
+       def check_class_dependency(self, step, failed_steps):
+               for failed_step in failed_steps:
+                       if (failed_step in self.dependency_graph[step.name]):
+                               raise StepNotReady
+
+       def run(self):
+               if not self.manager.enabled or not self.manager.has_openstack:
+                       return
+
+               while True:
+                       try:
+                               logger.info('Waiting for event')
+                               tBeforeWait = time.time()
+                               self.wait_for_event(timeout=300)
+                               logger.info('Observer woke up')
+
+                               # Set of whole steps that failed
+                               failed_steps = []
+
+                               # Set of individual objects within steps that failed
+                               failed_step_objects = []
 
                                for S in self.ordered_steps:
+                                       start_time=time.time()
+                                       
                                        sync_step = S()
-                                       sync_step()
-
-                # Enforce 5 minutes between wakeups
-                tSleep = 300 - (time.time() - tBeforeWait)
-                if tSleep > 0:
-                    logger.info('Sleeping for %d seconds' % tSleep)
-                    time.sleep(tSleep)
-
-                logger.info('Observer woke up')
-            except:
-                logger.log_exc("Exception in observer run loop")
-                traceback.print_exc()
+                                       sync_step.dependencies = self.dependencies[sync_step.name]
+                                       sync_step.debug_mode = debug_mode
+
+                                       should_run = False
+                                       try:
+                                               # Various checks that decide whether
+                                               # this step runs or not
+                                               self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
+                                               self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
+                                               should_run = True
+                                       except StepNotReady:
+                                               logging.info('Step not ready: %s'%sync_step.name)
+                                               failed_steps.add(sync_step)
+                                       except:
+                                               failed_steps.add(sync_step)
+
+                                       if (should_run):
+                                               try:
+                                                       duration=time.time() - start_time
+
+                                                       # ********* This is the actual sync step
+                                                       failed_objects = sync_step(failed=failed_step_objects)
+
+
+                                                       check_deadline(sync_step, duration)
+                                                       failed_step_objects.extend(failed_objects)
+                                                       self.update_run_time(sync_step)
+                                               except:
+                                                       failed_steps.add(S)
+                       except:
+                               logger.log_exc("Exception in observer run loop")
+                               traceback.print_exc()
index 7bfe9f4..3ce3c68 100644 (file)
@@ -10,17 +10,7 @@ class OpenStackSyncStep:
                super(SyncStep,self).__init__(**args)
                return
 
-       def call(self):
-               pending = self.fetch_pending()
-               failed = []
-               for o in pending:
-                       if (not self.depends_on(o, failed)):
-                               try:
-                                       self.sync_record(o)
-                                       o.enacted = datetime.now() # Is this the same timezone? XXX
-                                       o.save(update_fields=['enacted'])
-                               except:
-                                       failed.append(o)
+       
 
 
        def __call__(self):
index b206106..f3eb4ba 100644 (file)
@@ -2,6 +2,9 @@ import os
 import base64
 from planetstack.config import Config
 
+class FailedDependency(Exception):
+       pass
+
 class SyncStep:
        """ A PlanetStack Sync step. 
 
@@ -24,6 +27,7 @@ class SyncStep:
                                name -- Name of the step
                                provides -- PlanetStack models sync'd by this step
                """
+               dependencies = []
                try:
                        self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
                except:
@@ -33,9 +37,26 @@ class SyncStep:
 
        def fetch_pending(self):
                return Sliver.objects.filter(ip=None)
+       
+       def check_dependencies(self, obj):
+               for dep in dependencies:
+                       peer_object = getattr(obj, dep.name.lowercase())
+                       if (peer_object.pk==dep.pk):
+                               raise DependencyFailed
 
-       def call(self):
-               return True
+       def call(self, failed=failed_objects):
+               pending = self.fetch_pending()
+               failed = []
+               for o in pending:
+                       if (not self.depends_on(o, failed)):
+                               try:
+                                       check_dependencies(o) # Raises exception if failed                                      
+                                       self.sync_record(o)
+                                       o.enacted = datetime.now() # Is this the same timezone? XXX
+                                       o.save(update_fields=['enacted'])
+                               except:
+                                       failed.append(o)
+               return failed
 
        def __call__(self):
                return self.call()