Merge branch 'master' of ssh://git.planet-lab.org/git/plstackapi
author Scott Baker <smbaker@gmail.com>
Thu, 4 Sep 2014 00:24:03 +0000 (17:24 -0700)
committer Scott Baker <smbaker@gmail.com>
Thu, 4 Sep 2014 00:24:03 +0000 (17:24 -0700)
23 files changed:
planetstack/core/admin.py
planetstack/core/models/plcorebase.py
planetstack/core/models/slice.py
planetstack/core/models/user.py
planetstack/ec2_observer/event_loop.py
planetstack/ec2_observer/syncstep.py
planetstack/ec2_observer/toposort.py
planetstack/model_policies.py [new file with mode: 0644]
planetstack/model_policies/model_policy_Network.py [new file with mode: 0644]
planetstack/model_policies/model_policy_Slice.py [new file with mode: 0644]
planetstack/model_policies/model_policy_User.py [new file with mode: 0644]
planetstack/model_policy.py [new file with mode: 0644]
planetstack/openstack_observer/event_loop.py
planetstack/openstack_observer/steps/sync_network_deployments.py
planetstack/openstack_observer/steps/sync_networks.py
planetstack/openstack_observer/steps/sync_sites.py
planetstack/openstack_observer/steps/sync_slice_deployments.py
planetstack/openstack_observer/steps/sync_slices.py
planetstack/openstack_observer/steps/sync_sliver_ips.py [deleted file]
planetstack/openstack_observer/steps/sync_user_deployments.py
planetstack/openstack_observer/steps/sync_users.py
planetstack/openstack_observer/syncstep.py
planetstack/templates/admin/core/slice/change_form.html

index fad99a0..d472851 100644 (file)
@@ -738,8 +738,8 @@ class SliceAdmin(PlanetStackBaseAdmin):
     fieldList = ['backend_status_text', 'site', 'name', 'serviceClass', 'enabled','description', 'service', 'slice_url', 'max_slivers']
     fieldsets = [('Slice Details', {'fields': fieldList, 'classes':['suit-tab suit-tab-general']}),]
     readonly_fields = ('backend_status_text', )
-    list_display = ('backend_status_icon', 'slicename', 'site','serviceClass', 'slice_url', 'max_slivers')
-    list_display_links = ('backend_status_icon', 'slicename', )
+    list_display = ('backend_status_icon', 'name', 'site','serviceClass', 'slice_url', 'max_slivers')
+    list_display_links = ('backend_status_icon', 'name', )
     inlines = [SlicePrivilegeInline,SliverInline, TagInline, ReservationInline,SliceNetworkInline]
 
     user_readonly_fields = fieldList
@@ -762,20 +762,19 @@ class SliceAdmin(PlanetStackBaseAdmin):
             for deployment in flavor.deployments.all():
                 deployment_flavors.append( (deployment.id, flavor.id, flavor.name) )
 
-        sites = {}
-        for site in Site.objects.all():\r
-            sites[site.id] = site.login_base
-
+        site_login_bases = []
+        for site in Site.objects.all():
+            site_login_bases.append((site.id, site.login_base)) 
+        
         context["deployment_nodes"] = deployment_nodes
         context["deployment_flavors"] = deployment_flavors
-        context["sites"] = sites
-
+        context["site_login_bases"] = site_login_bases
         return super(SliceAdmin, self).render_change_form(request, context, add, change, form_url, obj)
 
     def formfield_for_foreignkey(self, db_field, request, **kwargs):
         if db_field.name == 'site':
             kwargs['queryset'] = Site.select_by_user(request.user)
-            kwargs['widget'] = forms.Select(attrs={'onChange': "update_slice_name(this, $($(this).closest('div')[0]).find('.field-name input')[0].id)"})
+            kwargs['widget'] = forms.Select(attrs={'onChange': "update_slice_prefix(this, $($(this).closest('fieldset')[0]).find('.field-name input')[0].id)"})
 
         return super(SliceAdmin, self).formfield_for_foreignkey(db_field, request, **kwargs)
 
index 446245b..4ac60b3 100644 (file)
@@ -5,6 +5,7 @@ from django.db import models
 from django.forms.models import model_to_dict
 from django.core.urlresolvers import reverse
 from django.forms.models import model_to_dict
+import model_policies
 
 try:
     # This is a no-op if observer_disabled is set to 1 in the config file
index e474560..0ecc99d 100644 (file)
@@ -11,6 +11,7 @@ from core.models import Tag
 from django.contrib.contenttypes import generic
 from core.models import Service
 from core.models import Deployment
+from django.core.exceptions import ValidationError
 
 # Create your models here.
 
@@ -38,6 +39,11 @@ class Slice(PlCoreBase):
         return "%s_%s" % (self.site.login_base, self.name)
 
     def save(self, *args, **kwds):
+        
+        site = Site.objects.get(id=self.site.id)
+        if not self.name.startswith(site.login_base):
+            raise ValidationError('slice name must begin with %s' % site.login_base)
+        
         if self.serviceClass is None:
             # We allowed None=True for serviceClass because Django evolution
             # will fail unless it is allowed. But, we really don't want it to
index 6b5b061..ac87c2a 100644 (file)
@@ -10,6 +10,7 @@ from timezones.fields import TimeZoneField
 from operator import itemgetter, attrgetter
 from django.core.mail import EmailMultiAlternatives
 from core.middleware import get_request
+import model_policies
 
 # Create your models here.
 class UserManager(BaseUserManager):
index a579a94..1f15a8e 100644 (file)
@@ -6,6 +6,7 @@ import traceback
 import commands
 import threading
 import json
+import pdb
 
 from datetime import datetime
 from collections import defaultdict
@@ -16,26 +17,42 @@ from openstack.driver import OpenStackDriver
 from util.logger import Logger, logging, logger
 #from timeout import timeout
 from planetstack.config import Config
-from ec2_observer.steps import *
+from observer.steps import *
 from syncstep import SyncStep
 from toposort import toposort
-from ec2_observer.error_mapper import *
+from observer.error_mapper import *
 
 debug_mode = False
 
 logger = Logger(level=logging.INFO)
 
 class StepNotReady(Exception):
-    pass
+       pass
 
 class NoOpDriver:
-    def __init__(self):
-         self.enabled = True
+       def __init__(self):
+                self.enabled = True
+                self.dependency_graph = None
+
+STEP_STATUS_WORKING=1
+STEP_STATUS_OK=2
+STEP_STATUS_KO=3
+
+def invert_graph(g):
+       ig = {}
+       for k,v in g.items():
+               for v0 in v:
+                       try:
+                               ig[v0].append(k)
+                       except:
+                               ig=[k]
+       return ig
 
 class PlanetStackObserver:
        #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]
        sync_steps = []
 
+       
        def __init__(self):
                # The Condition object that gets signalled by Feefie events
                self.step_lookup = {}
@@ -98,7 +115,7 @@ class PlanetStackObserver:
                        # This contains dependencies between backend records
                        self.backend_dependency_graph = json.loads(open(backend_path).read())
                except Exception,e:
-                       logger.info('Backend dependency graph not loaded: %s'%str(e))
+                       logger.info('Backend dependency graph not loaded')
                        # We can work without a backend graph
                        self.backend_dependency_graph = {}
 
@@ -111,7 +128,6 @@ class PlanetStackObserver:
                                except KeyError:
                                        provides_dict[m.__name__]=[s.__name__]
 
-                               
                step_graph = {}
                for k,v in self.model_dependency_graph.iteritems():
                        try:
@@ -155,9 +171,10 @@ class PlanetStackObserver:
                                        pass
                                        # no dependencies, pass
 
-               dependency_graph = step_graph
+               self.dependency_graph = step_graph
+               self.deletion_dependency_graph = invert_graph(step_graph)
 
-               self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps))
+               self.ordered_steps = toposort(self.dependency_graph, map(lambda s:s.__name__,self.sync_steps))
                print "Order of steps=",self.ordered_steps
                self.load_run_times()
                
@@ -205,7 +222,6 @@ class PlanetStackObserver:
                                self.last_deletion_run_times[e]=0
 
 
-
        def save_run_times(self):
                run_times = json.dumps(self.last_run_times)
                open('/tmp/observer_run_times','w').write(run_times)
@@ -221,16 +237,129 @@ class PlanetStackObserver:
                        if (failed_step in step.dependencies):
                                raise StepNotReady
 
+       def sync(self, S, deletion):
+               step = self.step_lookup[S]
+               start_time=time.time()
+               
+               dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
+
+               # Wait for step dependencies to be met
+               try:
+                       deps = self.dependency_graph[S]
+                       has_deps = True
+               except KeyError:
+                       has_deps = False
+
+               if (has_deps):
+                       for d in deps:
+                               cond = self.step_conditions[d]
+                               cond.acquire()
+                               if (self.step_status[d] is STEP_STATUS_WORKING):
+                                       cond.wait()
+                               cond.release()
+                       go = self.step_status[d] == STEP_STATUS_OK
+               else:
+                       go = True
+
+               if (not go):
+                       self.failed_steps.append(sync_step)
+                       my_status = STEP_STATUS_KO
+               else:
+                       sync_step = step(driver=self.driver,error_map=self.error_mapper)
+                       sync_step.__name__ = step.__name__
+                       sync_step.dependencies = []
+                       try:
+                               mlist = sync_step.provides
+                               
+                               for m in mlist:
+                                       sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
+                       except KeyError:
+                               pass
+                       sync_step.debug_mode = debug_mode
+
+                       should_run = False
+                       try:
+                               # Various checks that decide whether
+                               # this step runs or not
+                               self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed
+                               self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
+                               should_run = True
+                       except StepNotReady:
+                               logging.info('Step not ready: %s'%sync_step.__name__)
+                               self.failed_steps.append(sync_step)
+                               my_status = STEP_STATUS_KO
+                       except Exception,e:
+                               logging.error('%r',e)
+                               logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
+                               self.failed_steps.append(sync_step)
+                               my_status = STEP_STATUS_KO
+
+                       if (should_run):
+                               try:
+                                       duration=time.time() - start_time
+
+                                       logger.info('Executing step %s' % sync_step.__name__)
+
+                                       failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
+
+                                       self.check_duration(sync_step, duration)
+
+                                       if failed_objects:
+                                               self.failed_step_objects.update(failed_objects)
+
+                                       my_status = STEP_STATUS_OK
+                                       self.update_run_time(sync_step,deletion)
+                               except Exception,e:
+                                       logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
+                                       logger.log_exc(e)
+                                       self.failed_steps.append(S)
+                                       my_status = STEP_STATUS_KO
+                       else:
+                               my_status = STEP_STATUS_OK
+               
+               try:
+                       my_cond = self.step_conditions[S]
+                       my_cond.acquire()
+                       self.step_status[S]=my_status
+                       my_cond.notify_all()
+                       my_cond.release()
+               except KeyError,e:
+                       logging.info('Step %r is a leaf')
+                       pass
+
        def run(self):
                if not self.driver.enabled:
                        return
+
                if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
                        return
 
                while True:
                        try:
                                error_map_file = getattr(Config(), "error_map_path", "/opt/planetstack/error_map.txt")
-                               error_mapper = ErrorMapper(error_map_file)
+                               self.error_mapper = ErrorMapper(error_map_file)
+
+                               # Set of whole steps that failed
+                               self.failed_steps = []
+
+                               # Set of individual objects within steps that failed
+                               self.failed_step_objects = set()
+
+                               # Set up conditions and step status
+                               # This is needed for steps to run in parallel
+                               # while obeying dependencies.
+
+                               providers = set()
+                               for v in self.dependency_graph.values():
+                                       if (v):
+                                               providers.update(v)
+
+                               self.step_conditions = {}
+                               self.step_status = {}
+                               for p in list(providers):
+                                       self.step_conditions[p] = threading.Condition()
+                                       self.step_status[p] = STEP_STATUS_WORKING
+
 
                                logger.info('Waiting for event')
                                tBeforeWait = time.time()
@@ -238,68 +367,25 @@ class PlanetStackObserver:
                                logger.info('Observer woke up')
 
                                # Two passes. One for sync, the other for deletion.
-                               for deletion in (False,True):
+                               for deletion in [False,True]:
+                                       threads = []
                                        logger.info('Deletion=%r...'%deletion)
-                                       # Set of whole steps that failed
-                                       failed_steps = []
+                                       schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)
 
-                                       # Set of individual objects within steps that failed
-                                       failed_step_objects = set()
+                                       for S in schedule:
+                                               thread = threading.Thread(target=self.sync, args=(S, deletion))
 
-                                       ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps)
+                                               logger.info('Deletion=%r...'%deletion)
+                                               threads.append(thread)
 
-                                       for S in ordered_steps:
-                                               step = self.step_lookup[S]
-                                               start_time=time.time()
-                                               
-                                               sync_step = step(driver=self.driver,error_map=error_mapper)
-                                               sync_step.__name__ = step.__name__
-                                               sync_step.dependencies = []
-                                               try:
-                                                       mlist = sync_step.provides
-                                                       
-                                                       for m in mlist:
-                                                               sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
-                                               except KeyError:
-                                                       pass
-                                               sync_step.debug_mode = debug_mode
+                                       # Start threads 
+                                       for t in threads:
+                                               t.start()
+
+                                       # Wait for all threads to finish before continuing with the run loop
+                                       for t in threads:
+                                               t.join()
 
-                                               should_run = False
-                                               try:
-                                                       # Various checks that decide whether
-                                                       # this step runs or not
-                                                       self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
-                                                       self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
-                                                       should_run = True
-                                               except StepNotReady:
-                                                       logging.info('Step not ready: %s'%sync_step.__name__)
-                                                       failed_steps.append(sync_step)
-                                               except Exception,e:
-                                                       logging.error('%r',e)
-                                                       logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
-                                                       failed_steps.append(sync_step)
-
-                                               if (should_run):
-                                                       try:
-                                                               duration=time.time() - start_time
-
-                                                               logger.info('Executing step %s' % sync_step.__name__)
-
-                                                               # ********* This is the actual sync step
-                                                               #import pdb
-                                                               #pdb.set_trace()
-                                                               failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion)
-
-
-                                                               self.check_duration(sync_step, duration)
-                                                               if failed_objects:
-                                                                       failed_step_objects.update(failed_objects)
-
-                                                               self.update_run_time(sync_step,deletion)
-                                                       except Exception,e:
-                                                               logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
-                                                               logger.log_exc(e)
-                                                               failed_steps.append(S)
                                self.save_run_times()
                        except Exception, e:
                                logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
index d5f7523..31fec04 100644 (file)
@@ -86,7 +86,8 @@ class SyncStep:
                 except:
                     o.backend_status = str(e)
 
-                o.save(update_fields=['backend_status'])
+                if (o.pk):
+                    o.save(update_fields=['backend_status'])
 
                 logger.log_exc("sync step failed!")
                 failed.append(o)
index a2c9389..e771325 100644 (file)
@@ -10,6 +10,32 @@ import pdb
 from datetime import datetime
 from collections import defaultdict
 
+# Assumes that there are no empty dependencies
+# in the graph. E.g. Foo -> []
+def dfs(graph, visit):
+       nodes = graph.keys()
+       edge_nodes = set()
+
+       for n in nodes:
+               edge_nodes|=set(graph[n])
+
+       sinks = list(edge_nodes - set(nodes))
+       sources = list(set(nodes) - edge_nodes)
+       
+       nodes.extend(sinks)
+
+       visited = set(sources)
+       stack = sources
+       while stack:
+               current = stack.pop()
+               visit(current)
+               for node in graph[current]:
+                       if node not in visited:
+                               stack.append(node)
+                               visited.add(node)
+
+       return sources
+
 # Topological sort
 # Notes:
 # - Uses a stack instead of recursion
diff --git a/planetstack/model_policies.py b/planetstack/model_policies.py
new file mode 100644 (file)
index 0000000..3663c27
--- /dev/null
@@ -0,0 +1,8 @@
+from django.core.signals import post_save
+from django.dispatch import receiver
+import pdb
+
+@receiver(post_save)
+def post_save_handler(sender, **kwargs):
+       pdb.set_trace()
+    print("Request finished!")
diff --git a/planetstack/model_policies/model_policy_Network.py b/planetstack/model_policies/model_policy_Network.py
new file mode 100644 (file)
index 0000000..0511bee
--- /dev/null
@@ -0,0 +1,21 @@
+from core.models import *
+
+def handle(network):
+       # network deployments are not visible to users. We must ensure
+       # networks are deployed at all deployments available to their slices.
+       slice_deployments = SliceDeployments.objects.all()
+       slice_deploy_lookup = defaultdict(list)
+       for slice_deployment in slice_deployments:
+               slice_deploy_lookup[slice_deployment.slice].append(slice_deployment.deployment)
+
+       network_deployments = NetworkDeployments.objects.all()
+       network_deploy_lookup = defaultdict(list)
+       for network_deployment in network_deployments:
+               network_deploy_lookup[network_deployment.network].append(network_deployment.deployment)
+
+       expected_deployments = slice_deploy_lookup[network.owner]
+       for expected_deployment in expected_deployments:
+               if network not in network_deploy_lookup or \
+                 expected_deployment not in network_deploy_lookup[network]:
+                       nd = NetworkDeployments(network=network, deployment=expected_deployment)
+                       nd.save()
diff --git a/planetstack/model_policies/model_policy_Slice.py b/planetstack/model_policies/model_policy_Slice.py
new file mode 100644 (file)
index 0000000..5d66903
--- /dev/null
@@ -0,0 +1,23 @@
+from core.models import *
+
+def handle(slice):
+       site_deployments = SiteDeployments.objects.all()
+       site_deploy_lookup = defaultdict(list)
+       for site_deployment in site_deployments:
+               site_deploy_lookup[site_deployment.site].append(site_deployment.deployment)
+       
+       slice_deployments = SliceDeployments.objects.all()
+       slice_deploy_lookup = defaultdict(list)
+       for slice_deployment in slice_deployments:
+               slice_deploy_lookup[slice_deployment.slice].append(slice_deployment.deployment)
+       
+       all_deployments = Deployment.objects.all() 
+       # slices are added to all deployments for now
+       expected_deployments = all_deployments
+       #expected_deployments = site_deploy_lookup[slice.site]
+       for expected_deployment in expected_deployments:
+               if slice not in slice_deploy_lookup or \
+                  expected_deployment not in slice_deploy_lookup[slice]:
+                       sd = SliceDeployments(slice=slice, deployment=expected_deployment)
+                       sd.save()
+
diff --git a/planetstack/model_policies/model_policy_User.py b/planetstack/model_policies/model_policy_User.py
new file mode 100644 (file)
index 0000000..1b1895e
--- /dev/null
@@ -0,0 +1,30 @@
+from core.models import *
+
+def handle(user):
+       deployments = Deployment.objects.all()
+       site_deployments = SiteDeployments.objects.all()
+       site_deploy_lookup = defaultdict(list)
+       for site_deployment in site_deployments:
+               site_deploy_lookup[site_deployment.site].append(site_deployment.deployment)
+
+       user_deploy_lookup = defaultdict(list)
+       for user_deployment in UserDeployments.objects.all():
+               user_deploy_lookup[user_deployment.user].append(user_deployment.deployment)
+   
+       all_deployments = Deployment.objects.filter() 
+       if user.is_admin:
+               # admins should have an account at all deployments
+               expected_deployments = deployments
+       else:
+               # normal users should have an account at their site's deployments
+               #expected_deployments = site_deploy_lookup[user.site]
+               # users are added to all deployments for now
+               expected_deployments = deployments        
+
+       for expected_deployment in expected_deployments:
+               if not user in user_deploy_lookup or \
+                 expected_deployment not in user_deploy_lookup[user]: 
+                       # add new record
+                       ud = UserDeployments(user=user, deployment=expected_deployment)
+                       ud.save()    
+
diff --git a/planetstack/model_policy.py b/planetstack/model_policy.py
new file mode 100644 (file)
index 0000000..38caf01
--- /dev/null
@@ -0,0 +1,14 @@
+from django.db.models.signals import post_save
+from django.dispatch import receiver
+import pdb
+from model_policies import *
+
+@receiver(post_save)
+def post_save_handler(sender, **kwargs):
+       sender_name = sender.__name__
+       policy_name = 'model_policy_%s'%sender_name
+       try:
+               policy_handler = globals[policy_name]
+               policy_handler(sender)
+       except:
+               pass
index 9f4658e..1f15a8e 100644 (file)
@@ -6,6 +6,7 @@ import traceback
 import commands
 import threading
 import json
+import pdb
 
 from datetime import datetime
 from collections import defaultdict
@@ -31,11 +32,27 @@ class StepNotReady(Exception):
 class NoOpDriver:
        def __init__(self):
                 self.enabled = True
+                self.dependency_graph = None
+
+STEP_STATUS_WORKING=1
+STEP_STATUS_OK=2
+STEP_STATUS_KO=3
+
+def invert_graph(g):
+       ig = {}
+       for k,v in g.items():
+               for v0 in v:
+                       try:
+                               ig[v0].append(k)
+                       except:
+                               ig=[k]
+       return ig
 
 class PlanetStackObserver:
        #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]
        sync_steps = []
 
+       
        def __init__(self):
                # The Condition object that gets signalled by Feefie events
                self.step_lookup = {}
@@ -111,7 +128,6 @@ class PlanetStackObserver:
                                except KeyError:
                                        provides_dict[m.__name__]=[s.__name__]
 
-                               
                step_graph = {}
                for k,v in self.model_dependency_graph.iteritems():
                        try:
@@ -155,9 +171,10 @@ class PlanetStackObserver:
                                        pass
                                        # no dependencies, pass
 
-               dependency_graph = step_graph
+               self.dependency_graph = step_graph
+               self.deletion_dependency_graph = invert_graph(step_graph)
 
-               self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps))
+               self.ordered_steps = toposort(self.dependency_graph, map(lambda s:s.__name__,self.sync_steps))
                print "Order of steps=",self.ordered_steps
                self.load_run_times()
                
@@ -205,7 +222,6 @@ class PlanetStackObserver:
                                self.last_deletion_run_times[e]=0
 
 
-
        def save_run_times(self):
                run_times = json.dumps(self.last_run_times)
                open('/tmp/observer_run_times','w').write(run_times)
@@ -221,16 +237,129 @@ class PlanetStackObserver:
                        if (failed_step in step.dependencies):
                                raise StepNotReady
 
+       def sync(self, S, deletion):
+               """Run sync step S once; used as the target of one worker thread.
+
+               S is the step's name (a key of self.step_lookup) and deletion
+               selects the deletion pass. The method waits for the steps S
+               depends on, runs the step if all of them ended OK and the
+               class/schedule checks pass, and finally publishes its own status
+               so that steps waiting on S can proceed.
+               """
+               step = self.step_lookup[S]
+               start_time=time.time()
+
+               # NOTE(review): both passes wait on self.dependency_graph; the
+               # deletion pass presumably should follow
+               # self.deletion_dependency_graph -- TODO confirm that run()
+               # registers conditions for those steps before switching.
+               deps = self.dependency_graph.get(S) or []
+
+               # Wait for step dependencies to be met. A missing or empty entry
+               # means there is nothing to wait for.
+               go = True
+               for d in deps:
+                       cond = self.step_conditions[d]
+                       with cond:
+                               # Re-check the status after every wake-up instead of
+                               # testing it once.
+                               while self.step_status[d] == STEP_STATUS_WORKING:
+                                       cond.wait()
+                               # Every dependency must have ended OK, not just the last.
+                               if self.step_status[d] != STEP_STATUS_OK:
+                                       go = False
+
+               if (not go):
+                       # The step object is never created on this path, so the
+                       # failure is recorded under the step's name.
+                       self.failed_steps.append(S)
+                       my_status = STEP_STATUS_KO
+               else:
+                       sync_step = step(driver=self.driver,error_map=self.error_mapper)
+                       sync_step.__name__ = step.__name__
+                       sync_step.dependencies = []
+                       try:
+                               mlist = sync_step.provides
+
+                               for m in mlist:
+                                       sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
+                       except KeyError:
+                               pass
+                       sync_step.debug_mode = debug_mode
+
+                       should_run = False
+                       try:
+                               # Various checks that decide whether
+                               # this step runs or not
+                               self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed
+                               self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
+                               should_run = True
+                       except StepNotReady:
+                               logging.info('Step not ready: %s'%sync_step.__name__)
+                               self.failed_steps.append(sync_step)
+                               my_status = STEP_STATUS_KO
+                       except Exception,e:
+                               logging.error('%r',e)
+                               logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
+                               self.failed_steps.append(sync_step)
+                               my_status = STEP_STATUS_KO
+
+                       if (should_run):
+                               try:
+                                       # NOTE(review): duration is taken before the step is
+                                       # executed, so check_duration() does not see the
+                                       # step's own run time -- TODO confirm intent.
+                                       duration=time.time() - start_time
+
+                                       logger.info('Executing step %s' % sync_step.__name__)
+
+                                       failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
+
+                                       self.check_duration(sync_step, duration)
+
+                                       if failed_objects:
+                                               self.failed_step_objects.update(failed_objects)
+
+                                       my_status = STEP_STATUS_OK
+                                       self.update_run_time(sync_step,deletion)
+                               except Exception,e:
+                                       logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
+                                       logger.log_exc(e)
+                                       self.failed_steps.append(S)
+                                       my_status = STEP_STATUS_KO
+                       else:
+                               # NOTE(review): a step skipped by the checks above is
+                               # reported as OK to its dependents even though it was
+                               # added to failed_steps -- TODO confirm this is intended.
+                               my_status = STEP_STATUS_OK
+
+               try:
+                       my_cond = self.step_conditions[S]
+               except KeyError:
+                       # No condition is registered under S, so there is nobody
+                       # to notify.
+                       logging.info('Step %r is a leaf', S)
+                       return
+
+               # Publish the final status and wake every thread waiting on S.
+               with my_cond:
+                       self.step_status[S]=my_status
+                       my_cond.notify_all()
+
        def run(self):
                if not self.driver.enabled:
                        return
+
                if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
                        return
 
                while True:
                        try:
                                error_map_file = getattr(Config(), "error_map_path", "/opt/planetstack/error_map.txt")
-                               error_mapper = ErrorMapper(error_map_file)
+                               self.error_mapper = ErrorMapper(error_map_file)
+
+                               # Set of whole steps that failed
+                               self.failed_steps = []
+
+                               # Set of individual objects within steps that failed
+                               self.failed_step_objects = set()
+
+                               # Set up conditions and step status
+                               # This is needed for steps to run in parallel
+                               # while obeying dependencies.
+
+                               providers = set()
+                               for v in self.dependency_graph.values():
+                                       if (v):
+                                               providers.update(v)
+
+                               self.step_conditions = {}
+                               self.step_status = {}
+                               for p in list(providers):
+                                       self.step_conditions[p] = threading.Condition()
+                                       self.step_status[p] = STEP_STATUS_WORKING
+
 
                                logger.info('Waiting for event')
                                tBeforeWait = time.time()
@@ -238,68 +367,25 @@ class PlanetStackObserver:
                                logger.info('Observer woke up')
 
                                # Two passes. One for sync, the other for deletion.
-                               for deletion in (False,True):
+                               for deletion in [False,True]:
+                                       threads = []
                                        logger.info('Deletion=%r...'%deletion)
-                                       # Set of whole steps that failed
-                                       failed_steps = []
+                                       schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)
 
-                                       # Set of individual objects within steps that failed
-                                       failed_step_objects = set()
+                                       for S in schedule:
+                                               thread = threading.Thread(target=self.sync, args=(S, deletion))
 
-                                       ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps)
+                                               logger.info('Deletion=%r...'%deletion)
+                                               threads.append(thread)
 
-                                       for S in ordered_steps:
-                                               step = self.step_lookup[S]
-                                               start_time=time.time()
-                                               
-                                               sync_step = step(driver=self.driver,error_map=error_mapper)
-                                               sync_step.__name__ = step.__name__
-                                               sync_step.dependencies = []
-                                               try:
-                                                       mlist = sync_step.provides
-                                                       
-                                                       for m in mlist:
-                                                               sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
-                                               except KeyError:
-                                                       pass
-                                               sync_step.debug_mode = debug_mode
+                                       # Start threads 
+                                       for t in threads:
+                                               t.start()
+
+                                       # Wait for all threads to finish before continuing with the run loop
+                                       for t in threads:
+                                               t.join()
 
-                                               should_run = False
-                                               try:
-                                                       # Various checks that decide whether
-                                                       # this step runs or not
-                                                       self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
-                                                       self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
-                                                       should_run = True
-                                               except StepNotReady:
-                                                       logging.info('Step not ready: %s'%sync_step.__name__)
-                                                       failed_steps.append(sync_step)
-                                               except Exception,e:
-                                                       logging.error('%r',e)
-                                                       logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
-                                                       failed_steps.append(sync_step)
-
-                                               if (should_run):
-                                                       try:
-                                                               duration=time.time() - start_time
-
-                                                               logger.info('Executing step %s' % sync_step.__name__)
-
-                                                               # ********* This is the actual sync step
-                                                               #import pdb
-                                                               #pdb.set_trace()
-                                                               failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion)
-
-
-                                                               self.check_duration(sync_step, duration)
-                                                               if failed_objects:
-                                                                       failed_step_objects.update(failed_objects)
-
-                                                               self.update_run_time(sync_step,deletion)
-                                                       except Exception,e:
-                                                               logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
-                                                               logger.log_exc(e)
-                                                               failed_steps.append(S)
                                self.save_run_times()
                        except Exception, e:
                                logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
index 367440d..8eecb3b 100644 (file)
@@ -20,25 +20,6 @@ class SyncNetworkDeployments(OpenStackSyncStep):
         if (deleted):
             return NetworkDeployments.deleted_objects.all()
         else:
-            # network deployments are not visible to users. We must ensure
-            # networks are deployed at all deploymets available to their slices.
-            slice_deployments = SliceDeployments.objects.all()
-            slice_deploy_lookup = defaultdict(list)
-            for slice_deployment in slice_deployments:
-                slice_deploy_lookup[slice_deployment.slice].append(slice_deployment.deployment)
-
-            network_deployments = NetworkDeployments.objects.all()
-            network_deploy_lookup = defaultdict(list)
-            for network_deployment in network_deployments:
-                network_deploy_lookup[network_deployment.network].append(network_deployment.deployment)
-
-            for network in Network.objects.filter():
-                expected_deployments = slice_deploy_lookup[network.owner]
-                for expected_deployment in expected_deployments:
-                    if network not in network_deploy_lookup or \
-                      expected_deployment not in network_deploy_lookup[network]:
-                        nd = NetworkDeployments(network=network, deployment=expected_deployment)
-                        nd.save()
             return NetworkDeployments.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
 
     def get_next_subnet(self, deployment=None):
index 84257f7..5174fe6 100644 (file)
@@ -5,6 +5,7 @@ from planetstack.config import Config
 from observer.openstacksyncstep import OpenStackSyncStep
 from core.models.network import *
 from util.logger import Logger, logging
+from observer.steps.sync_network_deployments import *
 
 logger = Logger(level=logging.INFO)
 
@@ -16,10 +17,10 @@ class SyncNetworks(OpenStackSyncStep):
         network.save()
 
     def delete_record(self, network):
-        network_deployment_deleter = NetworkDeploymentDeleter()
+        network_deployment_deleter = SyncNetworkDeployments().delete_record
         for network_deployment in NetworkDeployments.objects.filter(network=network):
             try:
-                network_deployment_deleter(network_deployment.id)    
+                network_deployment_deleter(network_deployment)    
             except Exeption,e:
                 logger.log_exc("Failed to delete network deployment %s" % network_deployment)
                 raise e
index e7c645e..c560a6a 100644 (file)
@@ -4,6 +4,7 @@ from django.db.models import F, Q
 from planetstack.config import Config
 from observer.openstacksyncstep import OpenStackSyncStep
 from core.models.site import Site
+from observer.steps.sync_site_deployments import *
 
 class SyncSites(OpenStackSyncStep):
     provides=[Site]
@@ -14,6 +15,6 @@ class SyncSites(OpenStackSyncStep):
 
     def delete_record(self, site):
         site_deployments = SiteDeployments.objects.filter(site=site)
-        site_deployment_deleter = SiteDeploymentDeleter()
+        site_deployment_deleter = SyncSiteDeployments().delete_record
         for site_deployment in site_deployments:
-            site_deployment_deleter(site_deployment.id)
+            site_deployment_deleter(site_deployment)
index 24b7459..fff2e04 100644 (file)
@@ -20,30 +20,6 @@ class SyncSliceDeployments(OpenStackSyncStep):
         if (deleted):
             return SliceDeployments.deleted_objects.all()
         else:
-            # slice deployments are not visible to users. We must ensure
-            # slices are deployed at all deploymets available to their site.
-            site_deployments = SiteDeployments.objects.all()
-            site_deploy_lookup = defaultdict(list)
-            for site_deployment in site_deployments:
-                site_deploy_lookup[site_deployment.site].append(site_deployment.deployment)
-            
-            slice_deployments = SliceDeployments.objects.all()
-            slice_deploy_lookup = defaultdict(list)
-            for slice_deployment in slice_deployments:
-                slice_deploy_lookup[slice_deployment.slice].append(slice_deployment.deployment)
-            
-            all_deployments = Deployment.objects.all() 
-            for slice in Slice.objects.all():
-                # slices are added to all deployments for now
-                expected_deployments = all_deployments
-                #expected_deployments = site_deploy_lookup[slice.site]
-                for expected_deployment in expected_deployments:
-                    if slice not in slice_deploy_lookup or \
-                       expected_deployment not in slice_deploy_lookup[slice]:
-                        sd = SliceDeployments(slice=slice, deployment=expected_deployment)
-                        sd.save()
-
-            # now we can return all slice deployments that need to be enacted   
             return SliceDeployments.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
 
     def get_next_subnet(self, deployment=None):
index c0b8abe..a6073b6 100644 (file)
@@ -6,6 +6,7 @@ from planetstack.config import Config
 from observer.openstacksyncstep import OpenStackSyncStep
 from core.models.slice import Slice, SliceDeployments
 from util.logger import Logger, logging
+from observer.steps.sync_slice_deployments import *
 
 logger = Logger(level=logging.INFO)
 
@@ -20,10 +21,10 @@ class SyncSlices(OpenStackSyncStep):
             slice_deployment.save()    
 
     def delete_record(self, slice):
-        slice_deployment_deleter = SliceDeploymentDeleter()
+        slice_deployment_deleter = SyncSliceDeployments().delete_record
         for slice_deployment in SliceDeployments.objects.filter(slice=slice):
             try:
-                slice_deployment_deleter(slice_deployment.id)
+                slice_deployment_deleter(slice_deployment)
             except Exception,e:
                 logger.log_exc("Failed to delete slice_deployment %s" % slice_deployment) 
                 raise e
diff --git a/planetstack/openstack_observer/steps/sync_sliver_ips.py b/planetstack/openstack_observer/steps/sync_sliver_ips.py
deleted file mode 100644 (file)
index d723da5..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-import os
-import base64
-from django.db.models import F, Q
-from planetstack.config import Config
-from observer.openstacksyncstep import OpenStackSyncStep
-from core.models.sliver import Sliver
-from util.logger import Logger, logging
-
-logger = Logger(level=logging.INFO)
-
-class SyncSliverIps(OpenStackSyncStep):
-    provides=[Sliver]
-    requested_interval=0
-
-    def fetch_pending(self, deleted):
-        return [] # XXX smbaker - disabling this sync_step, since sliver.ip is obsoleted by sync_network_slivers()
-
-        # Not supported yet
-        if (deleted):
-            return []
-        slivers = Sliver.objects.filter(ip=None)
-        return slivers
-
-    def sync_record(self, sliver):
-        driver = self.driver.client_driver(tenant=sliver.slice.name,
-                                           deployment=sliver.node.deployment.name)
-        servers = driver.shell.nova.servers.findall(id=sliver.instance_id)
-        if not servers:
-            return
-        server = servers[0]
-
-        # First try to grab the dedicated public address
-        # NOTE: "ext-net" is hardcoded here.
-        ip = None
-        ext_net_addrs = server.addresses.get("ext-net")\r
-        if ext_net_addrs:\r
-            ip = ext_net_addrs[0]["addr"]\r
-\r
-        # If there was no public address, then grab the first address in the\r
-        # list.\r
-        if not ip:\r
-            if server.addresses:\r
-                addrs = server.addresses.values()[0]\r
-                if addrs:\r
-                    ip = addrs[0]["addr"]
-
-        if ip and ip!=sliver.ip:
-            sliver.ip = ip
-            sliver.save()
-            logger.info("saved sliver ip: %s %s" % (sliver, ip))
index 5d6ce2d..42aae56 100644 (file)
@@ -20,42 +20,7 @@ class SyncUserDeployments(OpenStackSyncStep):
 
         if (deleted):
             return UserDeployments.deleted_objects.all()
-
-        # user deployments are not visible to users. We must ensure
-        # user are deployed at all deploymets available to their sites.
         else:
-            deployments = Deployment.objects.all()
-            site_deployments = SiteDeployments.objects.all()
-            site_deploy_lookup = defaultdict(list)
-            for site_deployment in site_deployments:
-                site_deploy_lookup[site_deployment.site].append(site_deployment.deployment)
-
-            user_deploy_lookup = defaultdict(list)
-            for user_deployment in UserDeployments.objects.all():
-                user_deploy_lookup[user_deployment.user].append(user_deployment.deployment)
-           
-            all_deployments = Deployment.objects.filter() 
-            for user in User.objects.all():
-                if user.is_admin:
-                    # admins should have an account at all deployments
-                    expected_deployments = deployments
-                else:
-                    # normal users should have an account at their site's deployments
-                    #expected_deployments = site_deploy_lookup[user.site]
-                    # users are added to all deployments for now
-                    expected_deployments = deployments        
-                for expected_deployment in expected_deployments:
-                    if not user in user_deploy_lookup or \
-                      expected_deployment not in user_deploy_lookup[user]: 
-                        # add new record
-                        ud = UserDeployments(user=user, deployment=expected_deployment)
-                        ud.save()
-                        #user_deployments.append(ud)
-                    #else:
-                    #    # update existing record
-                    #    ud = UserDeployments.objects.get(user=user, deployment=expected_deployment)
-                    #    user_deployments.append(ud)
-
             return UserDeployments.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None)) 
 
     def sync_record(self, user_deployment):
index 2852b73..a22c213 100644 (file)
@@ -6,6 +6,7 @@ from planetstack.config import Config
 from observer.openstacksyncstep import OpenStackSyncStep
 from core.models.user import User
 from core.models.userdeployments import  UserDeployments
+from observer.steps.sync_user_deployments import SyncUserDeployments
 
 class SyncUsers(OpenStackSyncStep):
     provides=[User]
@@ -18,6 +19,6 @@ class SyncUsers(OpenStackSyncStep):
             user_deployment.save()
 
     def delete_record(self, user):
-        user_deployment_deleter = UserDeploymentDeleter()
+        user_deployment_deleter = SyncUserDeployments().delete_record
         for user_deployment in UserDeployments.objects.filter(user=user):
-            user_deployment_deleter(user_deployment.id)
+            user_deployment_deleter(user_deployment)
index c77c8d5..ad148b5 100644 (file)
@@ -4,6 +4,7 @@ from datetime import datetime
 from planetstack.config import Config
 from util.logger import Logger, logging
 from observer.steps import *
+from django.db.models import F, Q
 
 logger = Logger(level=logging.INFO)
 
@@ -48,7 +49,7 @@ class SyncStep:
         # Steps should override it if they have their own logic
         # for figuring out what objects are outstanding.
         main_obj = self.provides[0]
-        if (not deleted):
+        if (not deletion):
             objs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
         else:
             objs = main_obj.deleted_objects.all()
@@ -59,10 +60,16 @@ class SyncStep:
     def check_dependencies(self, obj, failed):
         for dep in self.dependencies:
             peer_name = dep[0].lower() + dep[1:]    # django names are camelCased with the first letter lower
-            peer_object = getattr(obj, peer_name)
+            try:
+                peer_object = getattr(obj, peer_name)
+            except:
+                peer_object = None
+
             if (peer_object and peer_object.pk==failed.pk and type(peer_object)==type(failed)):
-                raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed  %s:%s" % (obj.__class__.__name__, str(obj.pk\\r
-), peer_object.__class__.__name__, str(peer_object.pk), failed.__class__.__name__, str(failed.pk)))
+                if (obj.backend_status!=peer_object.backend_status):
+                    obj.backend_status = peer_object.backend_status
+                    obj.save(update_fields=['backend_status'])
+                raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed  %s:%s" % (obj.__class__.__name__, str(obj.pk), peer_object.__class__.__name__, str(peer_object.pk), failed.__class__.__name__, str(failed.pk)))
 
     def call(self, failed=[], deletion=False):
         pending = self.fetch_pending(deletion)
@@ -79,14 +86,22 @@ class SyncStep:
                     o.backend_status = "OK"
                     o.save(update_fields=['enacted'])
             except Exception,e:
+                logger.log_exc("sync step failed!")
+                str_e = '%r'%e
                 try:
-                    o.backend_status = self.error_map.map(str(e))
+                    o.backend_status = self.error_map.map(str_e)
                 except:
-                    o.backend_status = str(e)
+                    o.backend_status = str_e
 
-                o.save(update_fields=['backend_status'])
+                # TOFIX:
+                # DatabaseError: value too long for type character varying(140)
+                if (o.pk):
+                    try:
+                        o.save(update_fields=['backend_status'])
+                    except:
+                        print "Could not update backend status field!"
+                        pass
 
-                logger.log_exc("sync step failed!")
                 failed.append(o)
 
         return failed
index 9fb9758..1084004 100644 (file)
@@ -14,13 +14,12 @@ deployment_flavors = [
 {% endfor %}
 ];
 
-sites = [
-{% for s in sites %}
-  [{{ s.0 }}, {{ s.1 }}],
+site_login_bases = [
+{% for s in site_login_bases %}
+  [{{ s.0 }}, "{{ s.1 }}"],
 {% endfor %}
 ];
 
-
 function update_nodes(deployment_select, node_select_id) {
     deployment_id = $(deployment_select).val();
     html = "<option value=''>---------</option>\n";
@@ -63,8 +62,10 @@ function sliver_deployment_changed(deployment_select) {
 function update_slice_prefix(site_select, slice_name_id) {
     site_id = $(site_select).val();
     slice_prefix="";
-    if (site_id in sites) {
-        slice_prefix=sites[site_id]+"_";
+    for (i in site_login_bases) {
+        if (site_login_bases[i][0] == site_id) {
+            slice_prefix=site_login_bases[i][1]+"_";
+        }
     }
     $("#"+slice_name_id).val(slice_prefix); 
 }