From 24836f18c96de50f93cacb2be40f311f66e4876a Mon Sep 17 00:00:00 2001 From: Sapan Bhatia Date: Tue, 27 Aug 2013 10:16:05 -0400 Subject: [PATCH] Sync refactored into abstract steps --- planetstack/core/models/plcorebase.py | 1 - planetstack/observer/event_loop.py | 161 ++++++ planetstack/observer/openstacksyncstep.py | 27 + planetstack/observer/steps/__init__.py | 10 + .../observer/steps/sync_external_routes.py | 14 + .../observer/steps/sync_network_slivers.py | 73 +++ planetstack/observer/steps/sync_networks.py | 50 ++ .../observer/steps/sync_site_privileges.py | 11 + planetstack/observer/steps/sync_sites.py | 26 + .../observer/steps/sync_slice_memberships.py | 12 + planetstack/observer/steps/sync_slices.py | 55 ++ planetstack/observer/steps/sync_sliver_ips.py | 22 + planetstack/observer/steps/sync_slivers.py | 26 + planetstack/observer/steps/sync_users.py | 32 ++ planetstack/observer/syncstep.py | 41 ++ planetstack/observer/toposort.py | 48 ++ planetstack/openstack/observer.py | 479 ------------------ 17 files changed, 608 insertions(+), 480 deletions(-) create mode 100644 planetstack/observer/event_loop.py create mode 100644 planetstack/observer/openstacksyncstep.py create mode 100644 planetstack/observer/steps/__init__.py create mode 100644 planetstack/observer/steps/sync_external_routes.py create mode 100644 planetstack/observer/steps/sync_network_slivers.py create mode 100644 planetstack/observer/steps/sync_networks.py create mode 100644 planetstack/observer/steps/sync_site_privileges.py create mode 100644 planetstack/observer/steps/sync_sites.py create mode 100644 planetstack/observer/steps/sync_slice_memberships.py create mode 100644 planetstack/observer/steps/sync_slices.py create mode 100644 planetstack/observer/steps/sync_sliver_ips.py create mode 100644 planetstack/observer/steps/sync_slivers.py create mode 100644 planetstack/observer/steps/sync_users.py create mode 100644 planetstack/observer/syncstep.py create mode 100755 planetstack/observer/toposort.py delete mode 100644 planetstack/openstack/observer.py diff --git a/planetstack/core/models/plcorebase.py b/planetstack/core/models/plcorebase.py index 30d4df3..00dc2d0 100644 --- a/planetstack/core/models/plcorebase.py +++ b/planetstack/core/models/plcorebase.py @@ -8,7 +8,6 @@ class PlCoreBase(models.Model): created = models.DateTimeField(auto_now_add=True) updated = models.DateTimeField(auto_now=True) - enacted = models.DateTimeField(null=True, default=None) class Meta: abstract = True diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py new file mode 100644 index 0000000..4b11504 --- /dev/null +++ b/planetstack/observer/event_loop.py @@ -0,0 +1,161 @@ +import time +import traceback +import commands +import threading +import json + +from datetime import datetime +from collections import defaultdict +from core.models import * +from django.db.models import F, Q +from openstack.manager import OpenStackManager +from util.logger import Logger, logging, logger +#from timeout import timeout + + +logger = Logger(logfile='observer.log', level=logging.INFO) + +def toposort(g, steps): + reverse = {} + + for k,v in g.items(): + for rk in v: + try: + reverse[rk].append(k) + except: + reverse[rk]=k + + sources = [] + for k,v in g.items(): + if not reverse.has_key(k): + sources.append(k) + + + for k,v in reverse.iteritems(): + if (not v): + sources.append(k) + + order = [] + marked = [] + while sources: + n = sources.pop() + try: + for m in g[n]: + if m not in marked: + sources.append(m) + marked.append(m) + except KeyError: + pass + if (n in steps): + order.append(n) + + return order + +class PlanetStackObserver: + sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps'] + + def __init__(self): + self.manager = OpenStackManager() + # The Condition object that gets signalled by Feefie events + self.load_sync_steps() + self.event_cond = threading.Condition() + self.load_enacted() + + def wait_for_event(self, timeout): + self.event_cond.acquire() + self.event_cond.wait(timeout) + self.event_cond.release() + + def wake_up(self): + logger.info('Wake up routine called. Event cond %r'%self.event_cond) + self.event_cond.acquire() + self.event_cond.notify() + self.event_cond.release() + + def load_sync_steps(self): + dep_path = Config().pl_dependency_path + try: + # This contains dependencies between records, not sync steps + self.model_dependency_graph = json.loads(open(dep_path).read()) + except Exception,e: + raise e + + backend_path = Config().backend_dependency_path + try: + # This contains dependencies between backend records + self.backend_dependency_graph = json.loads(open(backend_path).read()) + except Exception,e: + raise e + + provides_dict = {} + for s in sync_steps: + for m in s.provides: + provides_dict[m]=s.__name__ + + step_graph = {} + for k,v in model_dependency_graph.iteritems(): + try: + source = provides_dict[k] + for m in v: + try: + dest = provides_dict[m] + except KeyError: + # no deps, pass + step_graph[source]=dest + + except KeyError: + pass + # no dependencies, pass + + if (backend_dependency_graph): + backend_dict = {} + for s in sync_steps: + for m in s.serves: + backend_dict[m]=s.__name__ + + for k,v in backend_dependency_graph.iteritems(): + try: + source = backend_dict[k] + for m in v: + try: + dest = backend_dict[m] + except KeyError: + # no deps, pass + step_graph[source]=dest + + except KeyError: + pass + # no dependencies, pass + + dependency_graph = step_graph + + self.ordered_steps = toposort(dependency_graph, steps) + + + def run(self): + if not self.manager.enabled or not self.manager.has_openstack: + return + + + while True: + try: + start_time=time.time() + + logger.info('Waiting for event') + tBeforeWait = time.time() + self.wait_for_event(timeout=300) + + for S in self.ordered_steps: + sync_step = S() + sync_step() + + # Enforce 5 minutes between wakeups + tSleep = 300 - (time.time() - tBeforeWait) + if tSleep > 0: + logger.info('Sleeping for %d seconds' % tSleep) + time.sleep(tSleep) + + logger.info('Observer woke up') + except: + logger.log_exc("Exception in observer run loop") + traceback.print_exc() diff --git a/planetstack/observer/openstacksyncstep.py b/planetstack/observer/openstacksyncstep.py new file mode 100644 index 0000000..7bfe9f4 --- /dev/null +++ b/planetstack/observer/openstacksyncstep.py @@ -0,0 +1,27 @@ +import os +import base64 +from syncstep import SyncStep + +class OpenStackSyncStep: + """ PlanetStack Sync step for copying data to OpenStack + """ + + def __init__(self, **args): + super(SyncStep,self).__init__(**args) + return + + def call(self): + pending = self.fetch_pending() + failed = [] + for o in pending: + if (not self.depends_on(o, failed)): + try: + self.sync_record(o) + o.enacted = datetime.now() # Is this the same timezone? XXX + o.save(update_fields=['enacted']) + except: + failed.append(o) + + + def __call__(self): + return self.call() diff --git a/planetstack/observer/steps/__init__.py b/planetstack/observer/steps/__init__.py new file mode 100644 index 0000000..a239587 --- /dev/null +++ b/planetstack/observer/steps/__init__.py @@ -0,0 +1,10 @@ +from .syncexternalroutes import SyncExternalRoutes +from .syncnetworkslivers import SyncNetworkSlivers +from .syncnetworks import SyncNetworks +from .syncsiteprivileges import SyncSitePrivileges +from .syncsites import SyncSites +from .syncslicememberships import SyncSliceMemberships +from .syncslices import SyncSlices +from .syncsliverips import SyncSliverIps +from .syncslivers import SyncSlivers +from .syncusers import SyncUsers diff --git a/planetstack/observer/steps/sync_external_routes.py b/planetstack/observer/steps/sync_external_routes.py new file mode 100644 index 0000000..ba4f939 --- /dev/null +++ b/planetstack/observer/steps/sync_external_routes.py @@ -0,0 +1,14 @@ +import os +import base64 +from planetstack.config import Config + +class SyncExternalRoutes(SyncStep): + # XXX what does this provide? + def call(self): + routes = self.manager.driver.get_external_routes() + subnets = self.manager.driver.shell.quantum.list_subnets()['subnets'] + for subnet in subnets: + try: + self.manager.driver.add_external_route(subnet, routes) + except: + logger.log_exc("failed to add external route for subnet %s" % subnet) diff --git a/planetstack/observer/steps/sync_network_slivers.py b/planetstack/observer/steps/sync_network_slivers.py new file mode 100644 index 0000000..414a260 --- /dev/null +++ b/planetstack/observer/steps/sync_network_slivers.py @@ -0,0 +1,73 @@ +import os +import base64 +from planetstack.config import Config + +class SyncNetworkSlivers(OpenStackSyncStep): + slow=True + provides=[NetworkSliver] + + def call(self): + networkSlivers = NetworkSliver.objects.all() + networkSlivers_by_id = {} + networkSlivers_by_port = {} + for networkSliver in networkSlivers: + networkSlivers_by_id[networkSliver.id] = networkSliver + networkSlivers_by_port[networkSliver.port_id] = networkSliver + + networks = Network.objects.all() + networks_by_id = {} + for network in networks: + networks_by_id[network.network_id] = network + + slivers = Sliver.objects.all() + slivers_by_instance_id = {} + for sliver in slivers: + slivers_by_instance_id[sliver.instance_id] = sliver + + ports = self.manager.driver.shell.quantum.list_ports()["ports"] + for port in ports: + if port["id"] in networkSlivers_by_port: + # we already have it + print "already accounted for port", port["id"] + continue + + if port["device_owner"] != "compute:nova": + # we only want the ports that connect to instances + continue + + network = networks_by_id.get(port['network_id'], None) + if not network: + #print "no network for port", port["id"], "network", port["network_id"] + continue + + sliver = slivers_by_instance_id.get(port['device_id'], None) + if not sliver: + print "no sliver for port", port["id"], "device_id", port['device_id'] + continue + + if network.template.sharedNetworkId is not None: + # If it's a shared network template, then more than one network + # object maps to the quantum network. We have to do a whole bunch + # of extra work to find the right one. + networks = network.template.network_set.all() + network = None + for candidate_network in networks: + if (candidate_network.owner == sliver.slice): + print "found network", candidate_network + network = candidate_network + + if not network: + print "failed to find the correct network for a shared template for port", port["id"], "network", port["network_id"] + continue + + if not port["fixed_ips"]: + print "port", port["id"], "has no fixed_ips" + continue + +# print "XXX", port + + ns = NetworkSliver(network=network, + sliver=sliver, + ip=port["fixed_ips"][0]["ip_address"], + port_id=port["id"]) + ns.save() diff --git a/planetstack/observer/steps/sync_networks.py b/planetstack/observer/steps/sync_networks.py new file mode 100644 index 0000000..7ae7dc2 --- /dev/null +++ b/planetstack/observer/steps/sync_networks.py @@ -0,0 +1,50 @@ +import os +import base64 +from planetstack.config import Config + +class SyncNetworks(OpenStackSyncStep): + provides=[Network] + + def save_network(self, network): + if not network.network_id: + if network.template.sharedNetworkName: + network.network_id = network.template.sharedNetworkId + (network.subnet_id, network.subnet) = self.driver.get_network_subnet(network.network_id) + else: + network_name = network.name + + # create network + os_network = self.driver.create_network(network_name, shared=True) + network.network_id = os_network['id'] + + # create router + router = self.driver.create_router(network_name) + network.router_id = router['id'] + + # create subnet + next_subnet = self.get_next_subnet() + cidr = str(next_subnet.cidr) + ip_version = next_subnet.version + start = str(next_subnet[2]) + end = str(next_subnet[-2]) + subnet = self.driver.create_subnet(name=network_name, + network_id = network.network_id, + cidr_ip = cidr, + ip_version = ip_version, + start = start, + end = end) + network.subnet = cidr + network.subnet_id = subnet['id'] + + @require_enabled + def sync_record(self, site): + if network.owner and network.owner.creator: + try: + # update manager context + self.driver.init_caller(network.owner.creator, network.owner.name) + self.save_network(network) + logger.info("saved network: %s" % (network)) + except Exception,e: + logger.log_exc("save network failed: %s" % network) + raise e + diff --git a/planetstack/observer/steps/sync_site_privileges.py b/planetstack/observer/steps/sync_site_privileges.py new file mode 100644 index 0000000..8f8f8ac --- /dev/null +++ b/planetstack/observer/steps/sync_site_privileges.py @@ -0,0 +1,11 @@ +import os +import base64 +from planetstack.config import Config + +class SyncSitePrivileges(OpenStackSyncStep): + provides=[SitePrivilege] + def sync_record(self, user): + if site_priv.user.kuser_id and site_priv.site.tenant_id: + self.driver.add_user_role(site_priv.user.kuser_id, + site_priv.site.tenant_id, + site_priv.role.role_type) diff --git a/planetstack/observer/steps/sync_sites.py b/planetstack/observer/steps/sync_sites.py new file mode 100644 index 0000000..5d7cc30 --- /dev/null +++ b/planetstack/observer/steps/sync_sites.py @@ -0,0 +1,26 @@ +import os +import base64 +from planetstack.config import Config + +class SyncSites(OpenStackSyncStep): + provides=[Site] + def sync_record(self, site): + save_site = False + if not site.tenant_id: + tenant = self.driver.create_tenant(tenant_name=site.login_base, + description=site.name, + enabled=site.enabled) + site.tenant_id = tenant.id + save_site = True + # XXX - What's caller? + # self.driver.add_user_role(self.caller.kuser_id, tenant.id, 'admin') + + # update the record + if site.id and site.tenant_id: + self.driver.update_tenant(site.tenant_id, + description=site.name, + enabled=site.enabled) + + if (save_site): + site.save() # + diff --git a/planetstack/observer/steps/sync_slice_memberships.py b/planetstack/observer/steps/sync_slice_memberships.py new file mode 100644 index 0000000..a0c83eb --- /dev/null +++ b/planetstack/observer/steps/sync_slice_memberships.py @@ -0,0 +1,12 @@ +import os +import base64 +from planetstack.config import Config + +class SyncSliceMemberships(OpenStackSyncStep): + provides=[SliceMembership] + def sync_record(self, user): + if slice_memb.user.kuser_id and slice_memb.slice.tenant_id: + self.driver.add_user_role(slice_memb.user.kuser_id, + slice_memb.slice.tenant_id, + slice_memb.role.role_type) + diff --git a/planetstack/observer/steps/sync_slices.py b/planetstack/observer/steps/sync_slices.py new file mode 100644 index 0000000..736fde6 --- /dev/null +++ b/planetstack/observer/steps/sync_slices.py @@ -0,0 +1,55 @@ +import os +import base64 +from planetstack.config import Config + +class SyncSlices(OpenStackSyncStep): + provides=[Slice] + def sync_record(self, slice): + if not slice.tenant_id: + nova_fields = {'tenant_name': slice.name, + 'description': slice.description, + 'enabled': slice.enabled} + tenant = self.driver.create_tenant(**nova_fields) + slice.tenant_id = tenant.id + + # XXX give caller an admin role at the tenant they've created + self.driver.add_user_role(self.caller.kuser_id, tenant.id, 'admin') + + # refresh credentials using this tenant + self.driver.shell.connect(username=self.driver.shell.keystone.username, + password=self.driver.shell.keystone.password, + tenant=tenant.name) + + # create network + network = self.driver.create_network(slice.name) + slice.network_id = network['id'] + + # create router + router = self.driver.create_router(slice.name) + slice.router_id = router['id'] + + # create subnet + next_subnet = self.get_next_subnet() + cidr = str(next_subnet.cidr) + ip_version = next_subnet.version + start = str(next_subnet[2]) + end = str(next_subnet[-2]) + subnet = self.driver.create_subnet(name=slice.name, + network_id = network['id'], + cidr_ip = cidr, + ip_version = ip_version, + start = start, + end = end) + slice.subnet_id = subnet['id'] + # add subnet as interface to slice's router + self.driver.add_router_interface(router['id'], subnet['id']) + # add external route + self.driver.add_external_route(subnet) + + + if slice.id and slice.tenant_id: + self.driver.update_tenant(slice.tenant_id, + description=slice.description, + enabled=slice.enabled) + + slice.save() diff --git a/planetstack/observer/steps/sync_sliver_ips.py b/planetstack/observer/steps/sync_sliver_ips.py new file mode 100644 index 0000000..4421ca5 --- /dev/null +++ b/planetstack/observer/steps/sync_sliver_ips.py @@ -0,0 +1,22 @@ +import os +import base64 +from planetstack.config import Config + +class SyncSliverIps(OpenStackSyncStep): + provides=[Sliver] + def fetch_pending(self): + slivers = Sliver.objects.filter(ip=None) + return slivers + + def sync_record(self, sliver): + self.manager.init_admin(tenant=sliver.slice.name) + servers = self.manager.driver.shell.nova.servers.findall(id=sliver.instance_id) + if not servers: + continue + server = servers[0] + ips = server.addresses.get(sliver.slice.name, []) + if not ips: + continue + sliver.ip = ips[0]['addr'] + sliver.save() + logger.info("saved sliver ip: %s %s" % (sliver, ips[0])) diff --git a/planetstack/observer/steps/sync_slivers.py b/planetstack/observer/steps/sync_slivers.py new file mode 100644 index 0000000..a8ef822 --- /dev/null +++ b/planetstack/observer/steps/sync_slivers.py @@ -0,0 +1,26 @@ +import os +import base64 +from planetstack.config import Config + +class SyncSlivers(OpenStackSyncStep): + provides=[Sliver] + def sync_record(self, slice): + if not sliver.instance_id: + nics = self.get_requested_networks(sliver.slice) + file("/tmp/scott-manager","a").write("slice: %s\nreq: %s\n" % (str(sliver.slice.name), str(nics))) + slice_memberships = SliceMembership.objects.filter(slice=sliver.slice) + pubkeys = [sm.user.public_key for sm in slice_memberships if sm.user.public_key] + pubkeys.append(sliver.creator.public_key) + instance = self.driver.spawn_instance(name=sliver.name, + key_name = sliver.creator.keyname, + image_id = sliver.image.image_id, + hostname = sliver.node.name, + pubkeys = pubkeys, + nics = nics ) + sliver.instance_id = instance.id + sliver.instance_name = getattr(instance, 'OS-EXT-SRV-ATTR:instance_name') + + if sliver.instance_id and ("numberCores" in sliver.changed_fields): + self.driver.update_instance_metadata(sliver.instance_id, {"cpu_cores": str(sliver.numberCores)}) + + sliver.save() diff --git a/planetstack/observer/steps/sync_users.py b/planetstack/observer/steps/sync_users.py new file mode 100644 index 0000000..af1bc30 --- /dev/null +++ b/planetstack/observer/steps/sync_users.py @@ -0,0 +1,32 @@ +import os +import base64 +from planetstack.config import Config + +class SyncUsers(OpenStackSyncStep): + provides=[User] + def sync_record(self, user): + name = user.email[:user.email.find('@')] + user_fields = {'name': name, + 'email': user.email, + 'password': hashlib.md5(user.password).hexdigest()[:6], + 'enabled': True} + if not user.kuser_id: + keystone_user = self.driver.create_user(**user_fields) + user.kuser_id = keystone_user.id + else: + self.driver.update_user(user.kuser_id, user_fields) + + if user.site: + self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'user') + if user.is_admin: + self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'admin') + else: + # may have admin role so attempt to remove it + self.driver.delete_user_role(user.kuser_id, user.site.tenant_id, 'admin') + + if user.public_key: + self.init_caller(user, user.site.login_base) + self.save_key(user.public_key, user.keyname) + self.init_admin() + + user.save() diff --git a/planetstack/observer/syncstep.py b/planetstack/observer/syncstep.py new file mode 100644 index 0000000..b206106 --- /dev/null +++ b/planetstack/observer/syncstep.py @@ -0,0 +1,41 @@ +import os +import base64 +from planetstack.config import Config + +class SyncStep: + """ A PlanetStack Sync step. + + Attributes: + psmodel Model name the step synchronizes + dependencies list of names of models that must be synchronized first if the current model depends on them + """ + slow=False + def get_prop(prop): + try: + sync_config_dir = Config().sync_config_dir + except: + sync_config_dir = '/etc/planetstack/sync' + prop_config_path = '/'.join(sync_config_dir,self.name,prop) + return open(prop_config_path).read().rstrip() + + def __init__(self, **args): + """Initialize a sync step + Keyword arguments: + name -- Name of the step + provides -- PlanetStack models sync'd by this step + """ + try: + self.soft_deadline = int(self.get_prop('soft_deadline_seconds')) + except: + self.soft_deadline = 5 # 5 seconds + + return + + def fetch_pending(self): + return Sliver.objects.filter(ip=None) + + def call(self): + return True + + def __call__(self): + return self.call() diff --git a/planetstack/observer/toposort.py b/planetstack/observer/toposort.py new file mode 100755 index 0000000..34bf6f5 --- /dev/null +++ b/planetstack/observer/toposort.py @@ -0,0 +1,48 @@ +#!/usr/bin/python + +import time +import traceback +import commands +import threading +import json + +from datetime import datetime +from collections import defaultdict + +def toposort(g, steps): + reverse = {} + + for k,v in g.items(): + for rk in v: + try: + reverse[rk].append(k) + except: + reverse[rk]=k + + sources = [] + for k,v in g.items(): + if not reverse.has_key(k): + sources.append(k) + + + for k,v in reverse.iteritems(): + if (not v): + sources.append(k) + + order = [] + marked = [] + while sources: + n = sources.pop() + try: + for m in g[n]: + if m not in marked: + sources.append(m) + marked.append(m) + except KeyError: + pass + if (n in steps): + order.append(n) + + return order + +print toposort({'a':'b','b':'c','c':'d','d':'c'},['d','c','b','a']) diff --git a/planetstack/openstack/observer.py b/planetstack/openstack/observer.py deleted file mode 100644 index 977aa86..0000000 --- a/planetstack/openstack/observer.py +++ /dev/null @@ -1,479 +0,0 @@ -import time -import traceback -import commands -import threading - -from datetime import datetime -from collections import defaultdict -from core.models import * -from django.db.models import F, Q -from openstack.manager import OpenStackManager -from util.logger import Logger, logging, logger -#from timeout import timeout - - -logger = Logger(logfile='observer.log', level=logging.INFO) - -class PlanetStackObserver: - - def __init__(self): - self.manager = OpenStackManager() - # The Condition object that gets signalled by Feefie events - self.event_cond = threading.Condition() - - def wait_for_event(self, timeout): - self.event_cond.acquire() - self.event_cond.wait(timeout) - self.event_cond.release() - - def wake_up(self): - logger.info('Wake up routine called. Event cond %r'%self.event_cond) - self.event_cond.acquire() - self.event_cond.notify() - self.event_cond.release() - - def run(self): - if not self.manager.enabled or not self.manager.has_openstack: - return - while True: - try: - start_time=time.time() - logger.info('Observer run loop') - #self.sync_roles() - - logger.info('Calling sync tenants') - try: - self.sync_tenants() - except: - logger.log_exc("Exception in sync_tenants") - traceback.print_exc() - finish_time = time.time() - logger.info('Sync tenants took %f seconds'%(finish_time-start_time)) - - logger.info('Calling sync users') - try: - self.sync_users() - except: - logger.log_exc("Exception in sync_users") - traceback.print_exc() - finish_time = time.time() - logger.info('Sync users took %f seconds'%(finish_time-start_time)) - - logger.info('Calling sync tenant roles') - try: - self.sync_user_tenant_roles() - except: - logger.log_exc("Exception in sync_users") - traceback.print_exc() - - logger.info('Calling sync slivers') - try: - self.sync_slivers() - except: - logger.log_exc("Exception in sync slivers") - traceback.print_exc() - finish_time = time.time() - logger.info('Sync slivers took %f seconds'%(finish_time-start_time)) - - logger.info('Calling sync sliver ips') - try: - self.sync_sliver_ips() - except: - logger.log_exc("Exception in sync_sliver_ips") - traceback.print_exc() - finish_time = time.time() - logger.info('Sync sliver ips took %f seconds'%(finish_time-start_time)) - - logger.info('Calling sync networks') - try: - self.sync_networks() - except: - logger.log_exc("Exception in sync_networks") - traceback.print_exc() - finish_time = time.time() - logger.info('Sync networks took %f seconds'%(finish_time-start_time)) - - logger.info('Calling sync network slivers') - try: - self.sync_network_slivers() - except: - logger.log_exc("Exception in sync_network_slivers") - traceback.print_exc() - finish_time = time.time() - logger.info('Sync network sliver ips took %f seconds'%(finish_time-start_time)) - - logger.info('Calling sync external routes') - try: - self.sync_external_routes() - except: - logger.log_exc("Exception in sync_external_routes") - traceback.print_exc() - finish_time = time.time() - logger.info('Sync external routes took %f seconds'%(finish_time-start_time)) - - logger.info('Waiting for event') - tBeforeWait = time.time() - self.wait_for_event(timeout=300) - - # Enforce 5 minutes between wakeups - tSleep = 300 - (time.time() - tBeforeWait) - if tSleep > 0: - logger.info('Sleeping for %d seconds' % tSleep) - time.sleep(tSleep) - - logger.info('Observer woken up') - except: - logger.log_exc("Exception in observer run loop") - traceback.print_exc() - - def sync_roles(self): - """ - save all role that don't already exist in keystone. Remove keystone roles that - don't exist in planetstack - """ - # sync all roles that don't already in keystone - keystone_roles = self.manager.driver.shell.keystone.roles.findall() - keystone_role_names = [kr.name for kr in keystone_roles] - pending_roles = Role.objects.all() - pending_role_names = [r.role_type for r in pending_roles] - for role in pending_roles: - if role.role_type not in keystone_role_names: - try: - self.manager.save_role(role) - logger.info("save role: %s" % (role)) - except: - logger.log_exc("save role failed: %s" % role) - traceback.print_exc() - - # don't delete roles for now - """ - # delete keystone roles that don't exist in planetstack - for keystone_role in keystone_roles: - if keystone_role.name == 'admin': - continue - if keystone_role.name not in pending_role_names: - try: - self.manager.driver.delete_role({id: keystone_role.id}) - except: - traceback.print_exc() - """ - - def sync_tenants(self): - """ - Save all sites and sliceswhere enacted < updated or enacted == None. - Remove sites and slices that no don't exist in openstack db if they - have an enacted time (enacted != None). - """ - # get all sites that need to be synced (enacted < updated or enacted is None) - pending_sites = Site.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None)) - for site in pending_sites: - try: - self.manager.save_site(site) - logger.info("saved site %s" % site) - except: - logger.log_exc("save site failed: %s" % site) - - # get all slices that need to be synced (enacted < updated or enacted is None) - pending_slices = Slice.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None)) - for slice in pending_slices: - try: - self.manager.init_caller(slice.creator, slice.creator.site.login_base) - self.manager.save_slice(slice) - logger.info("saved slice %s" % slice) - except: - logger.log_exc("save slice failed: %s" % slice) - - # get all sites that where enacted != null. We can assume these sites - # have previously been synced and need to be checed for deletion. - sites = Site.objects.filter(enacted__isnull=False) - site_dict = {} - for site in sites: - site_dict[site.login_base] = site - - # get all slices that where enacted != null. We can assume these slices - # have previously been synced and need to be checed for deletion. - slices = Slice.objects.filter(enacted__isnull=False) - slice_dict = {} - for slice in slices: - slice_dict[slice.name] = slice - - # delete keystone tenants that don't have a site record - tenants = self.manager.driver.shell.keystone.tenants.findall() - system_tenants = ['admin','service'] - for tenant in tenants: - if tenant.name in system_tenants: - continue - if tenant.name not in site_dict and tenant.name not in slice_dict: - try: - self.manager.driver.delete_tenant(tenant.id) - logger.info("deleted tenant: %s" % (tenant)) - except: - logger.log_exc("delete tenant failed: %s" % tenant) - - - def sync_users(self): - """ - save all users where enacted < updated or enacted == None. Remove users that - no don't exist in openstack db if they have an enacted time (enacted != None). - """ - # get all users that need to be synced (enacted < updated or enacted is None) - pending_users = User.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None)) - for user in pending_users: - try: - self.manager.save_user(user) - logger.info("saved user: %s" % (user)) - except: - logger.log_exc("save user failed: %s" %user) - - # get all users that where enacted != null. We can assume these users - # have previously been synced and need to be checed for deletion. - users = User.objects.filter(enacted__isnull=False) - user_dict = {} - for user in users: - user_dict[user.kuser_id] = user - - # delete keystone users that don't have a user record - system_users = ['admin', 'nova', 'quantum', 'glance', 'cinder', 'swift', 'service'] - users = self.manager.driver.shell.keystone.users.findall() - for user in users: - if user.name in system_users: - continue - if user.id not in user_dict: - try: - #self.manager.driver.delete_user(user.id) - logger.info("deleted user: %s" % user) - except: - logger.log_exc("delete user failed: %s" % user) - - - def sync_user_tenant_roles(self): - """ - Save all site privileges and slice memberships wheree enacted < updated or - enacted == None. Remove ones that don't exist in openstack db if they have - an enacted time (enacted != None). - """ - # sync site privileges - pending_site_privileges = SitePrivilege.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None)) - for site_priv in pending_site_privileges: - try: - self.manager.save_site_privilege(site_priv) - logger.info("saved site privilege: %s" % (site_priv)) - except: logger.log_exc("save site privilege failed: %s " % site_priv) - - # sync slice memberships - pending_slice_memberships = SliceMembership.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None)) - for slice_memb in pending_slice_memberships: - try: - self.manager.save_slice_membership(slice_memb) - logger.info("saved slice membership: %s" % (slice_memb)) - except: logger.log_exc("save slice membership failed: %s" % slice_memb) - - # get all site privileges and slice memberships that have been enacted - user_tenant_roles = defaultdict(list) - for site_priv in SitePrivilege.objects.filter(enacted__isnull=False): - user_tenant_roles[(site_priv.user.kuser_id, site_priv.site.tenant_id)].append(site_priv.role.role) - for slice_memb in SliceMembership.objects.filter(enacted__isnull=False): - user_tenant_roles[(slice_memb.user.kuser_id, slice_memb.slice.tenant_id)].append(slice_memb.role.role) - - # Some user tenant role aren't stored in planetstack but they must be preserved. - # Role that fall in this category are - # 1. Never remove a user's role that their home site - # 2. Never remove a user's role at a slice they've created. - # Keep track of all roles that must be preserved. - users = User.objects.all() - preserved_roles = {} - for user in users: - tenant_ids = [s['tenant_id'] for s in user.slices.values()] - tenant_ids.append(user.site.tenant_id) - preserved_roles[user.kuser_id] = tenant_ids - - - # begin removing user tenant roles from keystone. This is stored in the - # Metadata table. - for metadata in self.manager.driver.shell.keystone_db.get_metadata(): - # skip admin roles - if metadata.user_id == self.manager.driver.admin_user.id: - continue - # skip preserved tenant ids - if metadata.user_id in preserved_roles and \ - metadata.tenant_id in preserved_roles[metadata.user_id]: - continue - # get roles for user at this tenant - user_tenant_role_ids = user_tenant_roles.get((metadata.user_id, metadata.tenant_id), []) - - if user_tenant_role_ids: - # The user has roles at the tenant. Check if roles need to - # be updated. - user_keystone_role_ids = metadata.data.get('roles', []) - for role_id in user_keystone_role_ids: - if role_id not in user_tenant_role_ids: - user_keystone_role_ids.pop(user_keystone_role_ids.index(role_id)) - else: - # The user has no roles at this tenant. - metadata.data['roles'] = [] - #session.add(metadata) - logger.info("pruning metadata for %s at %s" % (metadata.user_id, metadata.tenant_id)) - - def sync_slivers(self): - """ - save all slivers where enacted < updated or enacted == None. Remove slivers that - no don't exist in openstack db if they have an enacted time (enacted != None). - """ - # get all users that need to be synced (enacted < updated or enacted is None) - pending_slivers = Sliver.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None)) - for sliver in pending_slivers: - if sliver.creator: - try: - # update manager context - self.manager.init_caller(sliver.creator, sliver.slice.name) - self.manager.save_sliver(sliver) - logger.info("saved sliver: %s" % (sliver)) - except: - logger.log_exc("save sliver failed: %s" % sliver) - - # get all slivers where enacted != null. We can assume these users - # have previously been synced and need to be checed for deletion. - slivers = Sliver.objects.filter(enacted__isnull=False) - sliver_dict = {} - for sliver in slivers: - sliver_dict[sliver.instance_id] = sliver - - # delete sliver that don't have a sliver record - ctx = self.manager.driver.shell.nova_db.ctx - instances = self.manager.driver.shell.nova_db.instance_get_all(ctx) - for instance in instances: - if instance.uuid not in sliver_dict: - try: - # lookup tenant and update context - try: - tenant = self.manager.driver.shell.keystone.tenants.find(id=instance.project_id) - tenant_name = tenant.name - except: - tenant_name = None - logger.info("exception while retrieving tenant %s. Deleting instance using root tenant." % instance.project_id) - self.manager.init_admin(tenant=tenant_name) - self.manager.driver.destroy_instance(instance.uuid) - logger.info("destroyed sliver: %s" % (instance.uuid)) - except: - logger.log_exc("destroy sliver failed: %s" % instance) - - - def sync_sliver_ips(self): - # fill in null ip addresses - slivers = Sliver.objects.filter(ip=None) - for sliver in slivers: - # update connection - self.manager.init_admin(tenant=sliver.slice.name) - servers = self.manager.driver.shell.nova.servers.findall(id=sliver.instance_id) - if not servers: - continue - server = servers[0] - ips = server.addresses.get(sliver.slice.name, []) - if not ips: - continue - sliver.ip = ips[0]['addr'] - sliver.save() - logger.info("saved sliver ip: %s %s" % (sliver, ips[0])) - - def sync_external_routes(self): - routes = self.manager.driver.get_external_routes() - subnets = self.manager.driver.shell.quantum.list_subnets()['subnets'] - for subnet in subnets: - try: - self.manager.driver.add_external_route(subnet, routes) - except: - logger.log_exc("failed to add external route for subnet %s" % subnet) - - def sync_network_slivers(self): - networkSlivers = NetworkSliver.objects.all() - networkSlivers_by_id = {} - networkSlivers_by_port = {} - for networkSliver in networkSlivers: - networkSlivers_by_id[networkSliver.id] = networkSliver - networkSlivers_by_port[networkSliver.port_id] = networkSliver - - networks = Network.objects.all() - networks_by_id = {} - for network in networks: - networks_by_id[network.network_id] = network - - slivers = Sliver.objects.all() - slivers_by_instance_id = {} - for sliver in slivers: - slivers_by_instance_id[sliver.instance_id] = sliver - - ports = self.manager.driver.shell.quantum.list_ports()["ports"] - for port in ports: - if port["id"] in networkSlivers_by_port: - # we already have it - print "already accounted for port", port["id"] - continue - - if port["device_owner"] != "compute:nova": - # we only want the ports that connect to instances - continue - - network = networks_by_id.get(port['network_id'], None) - if not network: - #print "no network for port", port["id"], "network", port["network_id"] - continue - - sliver = slivers_by_instance_id.get(port['device_id'], None) - if not sliver: - print "no sliver for port", port["id"], "device_id", port['device_id'] - continue - - if network.template.sharedNetworkId is not None: - # If it's a shared network template, then more than one network - # object maps to the quantum network. We have to do a whole bunch - # of extra work to find the right one. - networks = network.template.network_set.all() - network = None - for candidate_network in networks: - if (candidate_network.owner == sliver.slice): - print "found network", candidate_network - network = candidate_network - - if not network: - print "failed to find the correct network for a shared template for port", port["id"], "network", port["network_id"] - continue - - if not port["fixed_ips"]: - print "port", port["id"], "has no fixed_ips" - continue - -# print "XXX", port - - ns = NetworkSliver(network=network, - sliver=sliver, - ip=port["fixed_ips"][0]["ip_address"], - port_id=port["id"]) - ns.save() - - def sync_networks(self): - """ - save all networks where enacted < updated or enacted == None. Remove networks that - no don't exist in openstack db if they have an enacted time (enacted != None). - """ - # get all users that need to be synced (enacted < updated or enacted is None) - pending_networks = Network.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None)) - for network in pending_networks: - if network.owner and network.owner.creator: - try: - # update manager context - self.manager.init_caller(network.owner.creator, network.owner.name) - self.manager.save_network(network) - logger.info("saved network: %s" % (network)) - except: - logger.log_exc("save network failed: %s" % network) - - # get all networks where enacted != null. We can assume these users - # have previously been synced and need to be checed for deletion. - networks = Network.objects.filter(enacted__isnull=False) - network_dict = {} - for network in networks: - network_dict[network.network_id] = network - - # TODO: delete Network objects if quantum network doesn't exist - # (need to write self.manager.driver.shell.quantum_db) - -- 2.43.0