Sync refactored into abstract steps
[plstackapi.git] / planetstack / observer / event_loop.py
1 import time
2 import traceback
3 import commands
4 import threading
5 import json
6
7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 from openstack.manager import OpenStackManager
12 from util.logger import Logger, logging, logger
13 #from timeout import timeout
14
15
16 logger = Logger(logfile='observer.log', level=logging.INFO)
17
18 def toposort(g, steps):
19         reverse = {}
20
21         for k,v in g.items():
22                 for rk in v:
23                         try:
24                                 reverse[rk].append(k)
25                         except:
26                                 reverse[rk]=k
27
28         sources = []
29         for k,v in g.items():
30                 if not reverse.has_key(k):
31                         sources.append(k)
32
33
34         for k,v in reverse.iteritems():
35                 if (not v):
36                         sources.append(k)
37
38         order = []
39         marked = []
40         while sources:
41                 n = sources.pop()
42                 try:
43                         for m in g[n]:
44                                 if m not in marked:
45                                         sources.append(m)
46                                         marked.append(m)
47                 except KeyError:
48                         pass
49                 if (n in steps):
50                         order.append(n)
51
52         return order
53
54 class PlanetStackObserver:
55         sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps']
56
57     def __init__(self):
58         self.manager = OpenStackManager()
59         # The Condition object that gets signalled by Feefie events
60                 self.load_sync_steps()
61         self.event_cond = threading.Condition()
62                 self.load_enacted()
63
64     def wait_for_event(self, timeout):
65         self.event_cond.acquire()
66         self.event_cond.wait(timeout)
67         self.event_cond.release()
68         
69     def wake_up(self):
70         logger.info('Wake up routine called. Event cond %r'%self.event_cond)
71         self.event_cond.acquire()
72         self.event_cond.notify()
73         self.event_cond.release()
74
75         def load_sync_steps(self):
76                 dep_path = Config().pl_dependency_path
77                 try:
78                         # This contains dependencies between records, not sync steps
79                         self.model_dependency_graph = json.loads(open(dep_path).read())
80                 except Exception,e:
81                         raise e
82
83                 backend_path = Config().backend_dependency_path
84                 try:
85                         # This contains dependencies between backend records
86                         self.backend_dependency_graph = json.loads(open(backend_path).read())
87                 except Exception,e:
88                         raise e
89
90                 provides_dict = {}
91                 for s in sync_steps:
92                         for m in s.provides:
93                                 provides_dict[m]=s.__name__
94                                 
95                 step_graph = {}
96                 for k,v in model_dependency_graph.iteritems():
97                         try:
98                                 source = provides_dict[k]
99                                 for m in v:
100                                         try:
101                                                 dest = provides_dict[m]
102                                         except KeyError:
103                                                 # no deps, pass
104                                         step_graph[source]=dest
105                                         
106                         except KeyError:
107                                 pass
108                                 # no dependencies, pass
109                 
110                 if (backend_dependency_graph):
111                         backend_dict = {}
112                         for s in sync_steps:
113                                 for m in s.serves:
114                                         backend_dict[m]=s.__name__
115                                         
116                         for k,v in backend_dependency_graph.iteritems():
117                                 try:
118                                         source = backend_dict[k]
119                                         for m in v:
120                                                 try:
121                                                         dest = backend_dict[m]
122                                                 except KeyError:
123                                                         # no deps, pass
124                                                 step_graph[source]=dest
125                                                 
126                                 except KeyError:
127                                         pass
128                                         # no dependencies, pass
129
130                 dependency_graph = step_graph
131
132                 self.ordered_steps = toposort(dependency_graph, steps)
133                 
134
135     def run(self):
136         if not self.manager.enabled or not self.manager.has_openstack:
137             return
138
139                 
140         while True:
141             try:
142                 start_time=time.time()
143                 
144                 logger.info('Waiting for event')
145                 tBeforeWait = time.time()
146                 self.wait_for_event(timeout=300)
147
148                                 for S in self.ordered_steps:
149                                         sync_step = S()
150                                         sync_step()
151
152                 # Enforce 5 minutes between wakeups
153                 tSleep = 300 - (time.time() - tBeforeWait)
154                 if tSleep > 0:
155                     logger.info('Sleeping for %d seconds' % tSleep)
156                     time.sleep(tSleep)
157
158                 logger.info('Observer woke up')
159             except:
160                 logger.log_exc("Exception in observer run loop")
161                 traceback.print_exc()