Sync refactored into abstract steps
[plstackapi.git] / planetstack / observer / event_loop.py
diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py
new file mode 100644 (file)
index 0000000..4b11504
--- /dev/null
@@ -0,0 +1,161 @@
+import time
+import traceback
+import commands
+import threading
+import json
+
+from datetime import datetime
+from collections import defaultdict
+from core.models import *
+from django.db.models import F, Q
+from openstack.manager import OpenStackManager
+from util.logger import Logger, logging, logger
+#from timeout import timeout
+
+
+logger = Logger(logfile='observer.log', level=logging.INFO)
+
+def toposort(g, steps):
+       reverse = {}
+
+       for k,v in g.items():
+               for rk in v:
+                       try:
+                               reverse[rk].append(k)
+                       except:
+                               reverse[rk]=k
+
+       sources = []
+       for k,v in g.items():
+               if not reverse.has_key(k):
+                       sources.append(k)
+
+
+       for k,v in reverse.iteritems():
+               if (not v):
+                       sources.append(k)
+
+       order = []
+       marked = []
+       while sources:
+               n = sources.pop()
+               try:
+                       for m in g[n]:
+                               if m not in marked:
+                                       sources.append(m)
+                                       marked.append(m)
+               except KeyError:
+                       pass
+               if (n in steps):
+                       order.append(n)
+
+       return order
+
+class PlanetStackObserver:
+       sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps']
+
+    def __init__(self):
+        self.manager = OpenStackManager()
+        # The Condition object that gets signalled by Feefie events
+               self.load_sync_steps()
+        self.event_cond = threading.Condition()
+               self.load_enacted()
+
+    def wait_for_event(self, timeout):
+        self.event_cond.acquire()
+        self.event_cond.wait(timeout)
+        self.event_cond.release()
+        
+    def wake_up(self):
+        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
+        self.event_cond.acquire()
+        self.event_cond.notify()
+        self.event_cond.release()
+
+       def load_sync_steps(self):
+               dep_path = Config().pl_dependency_path
+               try:
+                       # This contains dependencies between records, not sync steps
+                       self.model_dependency_graph = json.loads(open(dep_path).read())
+               except Exception,e:
+                       raise e
+
+               backend_path = Config().backend_dependency_path
+               try:
+                       # This contains dependencies between backend records
+                       self.backend_dependency_graph = json.loads(open(backend_path).read())
+               except Exception,e:
+                       raise e
+
+               provides_dict = {}
+               for s in sync_steps:
+                       for m in s.provides:
+                               provides_dict[m]=s.__name__
+                               
+               step_graph = {}
+               for k,v in model_dependency_graph.iteritems():
+                       try:
+                               source = provides_dict[k]
+                               for m in v:
+                                       try:
+                                               dest = provides_dict[m]
+                                       except KeyError:
+                                               # no deps, pass
+                                       step_graph[source]=dest
+                                       
+                       except KeyError:
+                               pass
+                               # no dependencies, pass
+               
+               if (backend_dependency_graph):
+                       backend_dict = {}
+                       for s in sync_steps:
+                               for m in s.serves:
+                                       backend_dict[m]=s.__name__
+                                       
+                       for k,v in backend_dependency_graph.iteritems():
+                               try:
+                                       source = backend_dict[k]
+                                       for m in v:
+                                               try:
+                                                       dest = backend_dict[m]
+                                               except KeyError:
+                                                       # no deps, pass
+                                               step_graph[source]=dest
+                                               
+                               except KeyError:
+                                       pass
+                                       # no dependencies, pass
+
+               dependency_graph = step_graph
+
+               self.ordered_steps = toposort(dependency_graph, steps)
+               
+
+    def run(self):
+        if not self.manager.enabled or not self.manager.has_openstack:
+            return
+
+               
+        while True:
+            try:
+                start_time=time.time()
+                
+                logger.info('Waiting for event')
+                tBeforeWait = time.time()
+                self.wait_for_event(timeout=300)
+
+                               for S in self.ordered_steps:
+                                       sync_step = S()
+                                       sync_step()
+
+                # Enforce 5 minutes between wakeups
+                tSleep = 300 - (time.time() - tBeforeWait)
+                if tSleep > 0:
+                    logger.info('Sleeping for %d seconds' % tSleep)
+                    time.sleep(tSleep)
+
+                logger.info('Observer woke up')
+            except:
+                logger.log_exc("Exception in observer run loop")
+                traceback.print_exc()