The Observer observes PlanetStack not OpenStack
[plstackapi.git] / planetstack / openstack / observer.py
1 import time
2 import traceback
3 import commands
4 import threading
5
6 from datetime import datetime
7 from collections import defaultdict
8 from core.models import *
9 from django.db.models import F, Q
10 from openstack.manager import OpenStackManager
11 from util.logger import Logger, logging, logger
12 #from timeout import timeout
13
14
# Module-wide logger for the observer, configured at INFO level with
# logfile='observer.log'. NOTE: this rebinds the ``logger`` name imported
# from util.logger above, so that import is effectively unused.
logger = Logger(level=logging.INFO, logfile='observer.log')
16
class PlanetStackObserver:
    """
    Keep OpenStack in step with the PlanetStack database.

    Each pass of run() pushes PlanetStack records that are pending (never
    enacted, or updated since they were last enacted) to OpenStack through an
    OpenStackManager. It also removes keystone tenants and nova instances that
    no longer have a matching PlanetStack record. After each pass the observer
    waits at least SYNC_INTERVAL seconds before starting the next one.
    """

    # Minimum pause, in seconds, between the end of one sync pass and the
    # start of the next. An early wake_up() does not shorten it.
    SYNC_INTERVAL = 300

    def __init__(self):
        self.manager = OpenStackManager()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()

    @staticmethod
    def _pending_q():
        """
        Return a Q selecting rows that still need to be pushed to OpenStack:
        rows never enacted (enacted is NULL) or updated since last enacted.
        A fresh Q is built on every call.
        """
        return Q(enacted__lt=F('updated')) | Q(enacted=None)

    @staticmethod
    def _prune_roles(role_ids, allowed_role_ids):
        """
        Remove, in place, every entry of ``role_ids`` that is not in
        ``allowed_role_ids``, and return ``role_ids``.

        The list is rebuilt in a single slice assignment. Popping entries
        while iterating over the same list skips the element that follows
        each removal.
        """
        role_ids[:] = [role_id for role_id in role_ids
                       if role_id in allowed_role_ids]
        return role_ids

    def wait_for_event(self, timeout):
        """
        Block until wake_up() notifies the condition or ``timeout`` seconds
        pass.

        A notification sent while nobody is waiting is not remembered.
        """
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Notify the condition that wait_for_event() blocks on."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def _run_step(self, label, step):
        """
        Run one sync step and log how long it took.

        Exceptions are logged and printed instead of raised, so a failing
        step does not stop the remaining steps of the pass.
        """
        logger.info('Calling %s' % label)
        step_start = time.time()
        try:
            step()
        except Exception:
            logger.log_exc("Exception in %s" % step.__name__)
            traceback.print_exc()
        # Measure from the start of this step, not from the start of the pass.
        logger.info('%s took %f seconds'
                    % (label.capitalize(), time.time() - step_start))

    def run(self):
        """
        Run sync passes forever.

        Returns immediately if the manager is disabled or has no OpenStack
        connection. Any exception escaping a pass is logged and the loop
        continues. KeyboardInterrupt/SystemExit are not caught, so the loop
        can still be stopped.
        """
        if not self.manager.enabled or not self.manager.has_openstack:
            return

        # sync_roles() is intentionally not part of a pass.
        steps = [
            ('sync tenants', self.sync_tenants),
            ('sync users', self.sync_users),
            ('sync tenant roles', self.sync_user_tenant_roles),
            ('sync slivers', self.sync_slivers),
            ('sync sliver ips', self.sync_sliver_ips),
            ('sync networks', self.sync_networks),
            ('sync network slivers', self.sync_network_slivers),
            ('sync external routes', self.sync_external_routes),
        ]

        while True:
            try:
                logger.info('Observer run loop')
                for label, step in steps:
                    self._run_step(label, step)

                logger.info('Waiting for event')
                wait_start = time.time()
                self.wait_for_event(timeout=self.SYNC_INTERVAL)

                # Enforce SYNC_INTERVAL seconds between wakeups, even if an
                # event arrived early.
                remaining = self.SYNC_INTERVAL - (time.time() - wait_start)
                if remaining > 0:
                    logger.info('Sleeping for %d seconds' % remaining)
                    time.sleep(remaining)

                logger.info('Observer woken up')
            except Exception:
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()

    def sync_roles(self):
        """
        Save every PlanetStack role whose role_type is not already a keystone
        role name.

        Deleting keystone roles that are absent from PlanetStack is
        intentionally disabled. If it is re-enabled, the 'admin' role must be
        skipped.
        """
        keystone_roles = self.manager.driver.shell.keystone.roles.findall()
        keystone_role_names = set(kr.name for kr in keystone_roles)
        for role in Role.objects.all():
            if role.role_type in keystone_role_names:
                continue
            try:
                self.manager.save_role(role)
                logger.info("save role: %s" % (role))
            except Exception:
                logger.log_exc("save role failed: %s" % role)
                traceback.print_exc()

    def sync_tenants(self):
        """
        Save all sites and slices where enacted < updated or enacted is None.

        Then delete every keystone tenant, other than the system tenants,
        whose name matches neither the login_base of an enacted site nor the
        name of an enacted slice.
        """
        for site in Site.objects.filter(self._pending_q()):
            try:
                self.manager.save_site(site)
                logger.info("saved site %s" % site)
            except Exception:
                logger.log_exc("save site failed: %s" % site)

        for pending_slice in Slice.objects.filter(self._pending_q()):
            try:
                self.manager.init_caller(pending_slice.creator,
                                         pending_slice.creator.site.login_base)
                self.manager.save_slice(pending_slice)
                logger.info("saved slice %s" % pending_slice)
            except Exception:
                logger.log_exc("save slice failed: %s" % pending_slice)

        # Sites and slices with an enacted time were synced before, so a
        # keystone tenant with one of these names is still wanted.
        known_tenant_names = set(
            site.login_base for site in Site.objects.filter(enacted__isnull=False))
        known_tenant_names.update(
            s.name for s in Slice.objects.filter(enacted__isnull=False))

        system_tenants = ('admin', 'service')
        for tenant in self.manager.driver.shell.keystone.tenants.findall():
            if tenant.name in system_tenants or tenant.name in known_tenant_names:
                continue
            try:
                self.manager.driver.delete_tenant(tenant.id)
                logger.info("deleted tenant: %s" % (tenant))
            except Exception:
                logger.log_exc("delete tenant failed: %s" % tenant)

    def sync_users(self):
        """
        Save all users where enacted < updated or enacted is None.

        Keystone users that are neither system users nor the kuser_id of an
        enacted user are reported. Deleting them is currently disabled.
        """
        for user in User.objects.filter(self._pending_q()):
            try:
                self.manager.save_user(user)
                logger.info("saved user: %s" % (user))
            except Exception:
                logger.log_exc("save user failed: %s" %user)

        # Users with an enacted time were synced before.
        known_kuser_ids = set(
            u.kuser_id for u in User.objects.filter(enacted__isnull=False))

        system_users = ('admin', 'nova', 'quantum', 'glance', 'cinder',
                        'swift', 'service')
        for kuser in self.manager.driver.shell.keystone.users.findall():
            if kuser.name in system_users or kuser.id in known_kuser_ids:
                continue
            # Deletion is disabled (was: self.manager.driver.delete_user(kuser.id)),
            # so report the orphan instead of claiming it was deleted.
            logger.info("orphaned keystone user (deletion disabled): %s" % kuser)

    def sync_user_tenant_roles(self):
        """
        Save all site privileges and slice memberships where
        enacted < updated or enacted is None.

        Then compute, for each keystone role metadata entry, which roles it
        should keep, and log a prune message for it. The metadata is only
        changed in memory; writing it back is currently disabled.
        """
        for site_priv in SitePrivilege.objects.filter(self._pending_q()):
            try:
                self.manager.save_site_privilege(site_priv)
                logger.info("saved site privilege: %s" % (site_priv))
            except Exception:
                logger.log_exc("save site privilege failed: %s " % site_priv)

        for slice_memb in SliceMembership.objects.filter(self._pending_q()):
            try:
                self.manager.save_slice_membership(slice_memb)
                logger.info("saved slice membership: %s" % (slice_memb))
            except Exception:
                logger.log_exc("save slice membership failed: %s" % slice_memb)

        # (kuser_id, tenant_id) -> roles granted by enacted privileges/memberships.
        user_tenant_roles = defaultdict(list)
        for site_priv in SitePrivilege.objects.filter(enacted__isnull=False):
            key = (site_priv.user.kuser_id, site_priv.site.tenant_id)
            user_tenant_roles[key].append(site_priv.role.role)
        for slice_memb in SliceMembership.objects.filter(enacted__isnull=False):
            key = (slice_memb.user.kuser_id, slice_memb.slice.tenant_id)
            user_tenant_roles[key].append(slice_memb.role.role)

        # Some roles are not stored in PlanetStack but must be preserved:
        # 1. a user's role at their home site;
        # 2. a user's role at a slice they are associated with (user.slices).
        preserved_roles = {}
        for user in User.objects.all():
            tenant_ids = [s['tenant_id'] for s in user.slices.values()]
            tenant_ids.append(user.site.tenant_id)
            preserved_roles[user.kuser_id] = tenant_ids

        # User tenant roles live in keystone's metadata table.
        for metadata in self.manager.driver.shell.keystone_db.get_metadata():
            # Skip admin roles.
            if metadata.user_id == self.manager.driver.admin_user.id:
                continue
            # Skip preserved tenant ids.
            if metadata.tenant_id in preserved_roles.get(metadata.user_id, ()):
                continue

            allowed_role_ids = user_tenant_roles.get(
                (metadata.user_id, metadata.tenant_id), [])
            if allowed_role_ids:
                # Drop keystone roles PlanetStack no longer grants here.
                self._prune_roles(metadata.data.get('roles', []), allowed_role_ids)
            else:
                # The user has no roles at this tenant.
                metadata.data['roles'] = []
            # Persisting is disabled (was: session.add(metadata)).
            logger.info("pruning metadata for %s at %s"
                        % (metadata.user_id, metadata.tenant_id))

    def sync_slivers(self):
        """
        Save all slivers that have a creator and where enacted < updated or
        enacted is None.

        Then destroy every nova instance whose uuid is not the instance_id of
        an enacted sliver.
        """
        for sliver in Sliver.objects.filter(self._pending_q()):
            if not sliver.creator:
                continue
            try:
                # Act as the sliver's creator within its slice.
                self.manager.init_caller(sliver.creator, sliver.slice.name)
                self.manager.save_sliver(sliver)
                logger.info("saved sliver: %s" % (sliver))
            except Exception:
                logger.log_exc("save sliver failed: %s" % sliver)

        # Slivers with an enacted time were synced before.
        known_instance_ids = set(
            s.instance_id for s in Sliver.objects.filter(enacted__isnull=False))

        nova_db = self.manager.driver.shell.nova_db
        for instance in nova_db.instance_get_all(nova_db.ctx):
            if instance.uuid in known_instance_ids:
                continue
            try:
                # Run the delete in the instance's own tenant when it can be
                # found, otherwise fall back to the admin default.
                try:
                    tenant = self.manager.driver.shell.keystone.tenants.find(
                        id=instance.project_id)
                    tenant_name = tenant.name
                except Exception:
                    tenant_name = None
                    logger.info("exception while retrieving tenant %s. Deleting instance using root tenant." % instance.project_id)
                self.manager.init_admin(tenant=tenant_name)
                self.manager.driver.destroy_instance(instance.uuid)
                logger.info("destroyed sliver: %s" % (instance.uuid))
            except Exception:
                logger.log_exc("destroy sliver failed: %s" % instance)

    def sync_sliver_ips(self):
        """
        Fill in the ip of every sliver that has none.

        The ip is the first address nova reports for the server under a
        network label equal to the slice name. A failure on one sliver is
        logged and does not stop the others.
        """
        for sliver in Sliver.objects.filter(ip=None):
            try:
                self.manager.init_admin(tenant=sliver.slice.name)
                servers = self.manager.driver.shell.nova.servers.findall(
                    id=sliver.instance_id)
                if not servers:
                    continue
                ips = servers[0].addresses.get(sliver.slice.name, [])
                if not ips:
                    continue
                sliver.ip = ips[0]['addr']
                sliver.save()
                logger.info("saved sliver ip: %s %s" % (sliver, ips[0]))
            except Exception:
                logger.log_exc("save sliver ip failed: %s" % sliver)

    def sync_external_routes(self):
        """Add the driver's external routes to every quantum subnet."""
        routes = self.manager.driver.get_external_routes()
        subnets = self.manager.driver.shell.quantum.list_subnets()['subnets']
        for subnet in subnets:
            try:
                self.manager.driver.add_external_route(subnet, routes)
            except Exception:
                logger.log_exc("failed to add external route for subnet %s" % subnet)

    def sync_network_slivers(self):
        """
        Create a NetworkSliver for every quantum port owned by "compute:nova"
        that is not yet recorded.

        Each new record links the port to the PlanetStack network and sliver
        it belongs to, using the port's first fixed ip.
        """
        known_port_ids = set(ns.port_id for ns in NetworkSliver.objects.all())
        # Later duplicates win, as with repeated dict assignment.
        networks_by_id = dict((n.network_id, n) for n in Network.objects.all())
        slivers_by_instance_id = dict((s.instance_id, s) for s in Sliver.objects.all())

        for port in self.manager.driver.shell.quantum.list_ports()["ports"]:
            if port["id"] in known_port_ids:
                logger.info("already accounted for port %s" % port["id"])
                continue

            if port["device_owner"] != "compute:nova":
                # Only ports that connect to instances are of interest.
                continue

            network = networks_by_id.get(port['network_id'])
            if not network:
                continue

            sliver = slivers_by_instance_id.get(port['device_id'])
            if not sliver:
                logger.info("no sliver for port %s device_id %s"
                            % (port["id"], port['device_id']))
                continue

            if network.template.sharedNetworkId is not None:
                # With a shared network template, several Network objects map
                # to the same quantum network. Pick the one owned by this
                # sliver's slice (the last match, as before).
                owned = [candidate for candidate in network.template.network_set.all()
                         if candidate.owner == sliver.slice]
                if not owned:
                    logger.info("failed to find the correct network for a shared template for port %s network %s"
                                % (port["id"], port["network_id"]))
                    continue
                network = owned[-1]
                logger.info("found network %s" % network)

            if not port["fixed_ips"]:
                logger.info("port %s has no fixed_ips" % port["id"])
                continue

            ns = NetworkSliver(network=network,
                               sliver=sliver,
                               ip=port["fixed_ips"][0]["ip_address"],
                               port_id=port["id"])
            ns.save()

    def sync_networks(self):
        """
        Save all networks whose owner has a creator and where
        enacted < updated or enacted is None.
        """
        for network in Network.objects.filter(self._pending_q()):
            if not (network.owner and network.owner.creator):
                continue
            try:
                # Act as the owning slice's creator within that slice.
                self.manager.init_caller(network.owner.creator, network.owner.name)
                self.manager.save_network(network)
                logger.info("saved network: %s" % (network))
            except Exception:
                logger.log_exc("save network failed: %s" % network)

        # TODO: delete enacted Network objects whose quantum network no longer
        #       exists (need to write self.manager.driver.shell.quantum_db)
479