last plc-dependent code moved to PlDriver
authorThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Wed, 14 Dec 2011 16:59:27 +0000 (17:59 +0100)
committerThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Wed, 14 Dec 2011 16:59:27 +0000 (17:59 +0100)
cache reviewed (more locally handled in slicemanager and pldriver)
2 new settings to activate caching at the AM or SM in config
manager classes constructor now takes config in arg

12 files changed:
config/default_config.xml
sfa/generic/__init__.py
sfa/managers/aggregate_manager.py
sfa/managers/aggregate_manager_eucalyptus.py
sfa/managers/aggregate_manager_max.py
sfa/managers/driver.py
sfa/managers/managerwrapper.py
sfa/managers/registry_manager.py
sfa/managers/slice_manager.py
sfa/plc/peers.py
sfa/plc/pldriver.py
sfa/util/cache.py

index e074865..9156cb3 100644 (file)
@@ -128,6 +128,14 @@ Thierry Parmentelat
          <value>12347</value>
          <description>The port where the slice manager is to be found.</description>
        </variable>
          <value>12347</value>
          <description>The port where the slice manager is to be found.</description>
        </variable>
+
+       <variable id="caching" type="boolean">
+         <name>Cache advertisement rspec</name>
+         <value>false</value>
+         <description>Enable caching of the global advertisement, as
+         returned by ListResources without a slice argument. </description>
+         </variable>
+
       </variablelist>
     </category>
 
       </variablelist>
     </category>
 
@@ -145,12 +153,6 @@ Thierry Parmentelat
          aggregate manager.</description>
        </variable>
 
          aggregate manager.</description>
        </variable>
 
-    <variable id="rspec_schema" type="string">
-      <name>RSpec Schema</name>
-      <value>/etc/sfa/pl.rng</value>
-      <description>The path to the default schema</description>
-    </variable>
-
        <variable id="host" type="hostname">
          <name>Hostname</name>
          <value>localhost</value>
        <variable id="host" type="hostname">
          <name>Hostname</name>
          <value>localhost</value>
@@ -164,6 +166,13 @@ Thierry Parmentelat
          <description>The port where the aggregate is to be found.</description>
        </variable>
 
          <description>The port where the aggregate is to be found.</description>
        </variable>
 
+       <variable id="caching" type="boolean">
+         <name>Cache advertisement rspec</name>
+         <value>true</value>
+         <description>Enable caching of the global advertisement, as
+         returned by ListResources without a slice argument. </description>
+         </variable>
+
       </variablelist>
 
     </category>
       </variablelist>
 
     </category>
index 98248be..3c3855d 100644 (file)
@@ -60,15 +60,16 @@ class Generic:
         if not 'interface' in kwargs:
             logger.critical("Generic.make_api: no interface found")
         api = self.api_class()(*args, **kwargs)
         if not 'interface' in kwargs:
             logger.critical("Generic.make_api: no interface found")
         api = self.api_class()(*args, **kwargs)
-        manager = self.make_manager(api.interface)
+        manager_class_or_module = self.make_manager(api.interface)
         driver = self.make_driver (api.config, api.interface)
         ### arrange stuff together
         # add a manager wrapper
         driver = self.make_driver (api.config, api.interface)
         ### arrange stuff together
         # add a manager wrapper
-        manager_wrap = ManagerWrapper(manager,api.interface)
+        manager_wrap = ManagerWrapper(manager_class_or_module,api.interface,api.config)
         api.manager=manager_wrap
         # insert driver in manager
         api.manager=manager_wrap
         # insert driver in manager
-        logger.debug("Setting manager.driver, manager=%s"%manager)
-        manager.driver=driver
+        logger.debug("Setting manager.driver, manager=%s"%manager_class_or_module)
+        # xxx this should go into the object and not the class !?!
+        manager_class_or_module.driver=driver
         # add it in api as well for convenience
         api.driver=driver
         return api
         # add it in api as well for convenience
         api.driver=driver
         return api
index 995e694..8955920 100644 (file)
@@ -1,32 +1,10 @@
-import time
-import sys
-
-from sfa.util.sfalogging import logger
-from sfa.util.faults import RecordNotFound, SliverDoesNotExist
-from sfa.util.xrn import Xrn, get_authority, hrn_to_urn, urn_to_hrn, urn_to_sliver_id
-from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
 from sfa.util.version import version_core
 from sfa.util.version import version_core
-from sfa.util.sfatime import utcparse
+from sfa.util.xrn import Xrn
 from sfa.util.callids import Callids
 
 from sfa.util.callids import Callids
 
-from sfa.trust.sfaticket import SfaTicket
-from sfa.trust.credential import Credential
-
-from sfa.rspecs.version_manager import VersionManager
-from sfa.rspecs.rspec import RSpec
-
-from sfa.server.sfaapi import SfaApi
-
-import sfa.plc.peers as peers
-from sfa.plc.plaggregate import PlAggregate
-from sfa.plc.plslices import PlSlices
-
 class AggregateManager:
 
 class AggregateManager:
 
-    def __init__ (self):
-        # xxx Thierry : caching at the aggregate level sounds wrong...
-        self.caching=True
-        #self.caching=False
+    def __init__ (self, config): pass
     
     # essentially a union of the core version, the generic version (this code) and
     # whatever the driver needs to expose
     
     # essentially a union of the core version, the generic version (this code) and
     # whatever the driver needs to expose
@@ -45,17 +23,37 @@ class AggregateManager:
         version.update(testbed_version)
         return version
     
         version.update(testbed_version)
         return version
     
-    def SliverStatus (self, api, slice_xrn, creds, options):
+    def ListSlices(self, api, creds, options):
+        call_id = options.get('call_id')
+        if Callids().already_handled(call_id): return []
+        return self.driver.list_slices (creds, options)
+
+    def ListResources(self, api, creds, options):
+        call_id = options.get('call_id')
+        if Callids().already_handled(call_id): return ""
+
+        # get slice's hrn from options
+        slice_xrn = options.get('geni_slice_urn', None)
+        # pass None if no slice is specified
+        if not slice_xrn:
+            slice_hrn, slice_urn = None, None
+        else:
+            xrn = Xrn(slice_xrn)
+            slice_urn=xrn.get_urn()
+            slice_hrn=xrn.get_hrn()
+
+        return self.driver.list_resources (slice_urn, slice_hrn, creds, options)
+    
+    def SliverStatus (self, api, xrn, creds, options):
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return {}
     
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return {}
     
-        xrn = Xrn(slice_xrn)
+        xrn = Xrn(xrn)
         slice_urn=xrn.get_urn()
         slice_hrn=xrn.get_hrn()
         slice_urn=xrn.get_urn()
         slice_hrn=xrn.get_hrn()
-
         return self.driver.sliver_status (slice_urn, slice_hrn)
     
         return self.driver.sliver_status (slice_urn, slice_hrn)
     
-    def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options):
+    def CreateSliver(self, api, xrn, creds, rspec_string, users, options):
         """
         Create the sliver[s] (slice) at this aggregate.    
         Verify HRN and initialize the slice record in PLC if necessary.
         """
         Create the sliver[s] (slice) at this aggregate.    
         Verify HRN and initialize the slice record in PLC if necessary.
@@ -63,195 +61,54 @@ class AggregateManager:
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return ""
     
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return ""
     
-        xrn = Xrn(slice_xrn)
+        xrn = Xrn(xrn)
         slice_urn=xrn.get_urn()
         slice_hrn=xrn.get_hrn()
 
         return self.driver.create_sliver (slice_urn, slice_hrn, creds, rspec_string, users, options)
     
         slice_urn=xrn.get_urn()
         slice_hrn=xrn.get_hrn()
 
         return self.driver.create_sliver (slice_urn, slice_hrn, creds, rspec_string, users, options)
     
+    def DeleteSliver(self, api, xrn, creds, options):
+        call_id = options.get('call_id')
+        if Callids().already_handled(call_id): return True
+
+        xrn = Xrn(xrn)
+        slice_urn=xrn.get_urn()
+        slice_hrn=xrn.get_hrn()
+        return self.driver.delete_sliver (slice_urn, slice_hrn, creds, options)
+
     def RenewSliver(self, api, xrn, creds, expiration_time, options):
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return True
         
     def RenewSliver(self, api, xrn, creds, expiration_time, options):
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return True
         
-        xrn = Xrn(slice_xrn)
+        xrn = Xrn(xrn)
         slice_urn=xrn.get_urn()
         slice_hrn=xrn.get_hrn()
         slice_urn=xrn.get_urn()
         slice_hrn=xrn.get_hrn()
-
         return self.driver.renew_sliver (slice_urn, slice_hrn, creds, expiration_time, options)
     
         return self.driver.renew_sliver (slice_urn, slice_hrn, creds, expiration_time, options)
     
+    ### these methods could use an options extension for at least call_id
     def start_slice(self, api, xrn, creds):
     def start_slice(self, api, xrn, creds):
-        (hrn, _) = urn_to_hrn(xrn)
-        slicename = hrn_to_pl_slicename(hrn)
-        slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
-        if not slices:
-            raise RecordNotFound(hrn)
-        slice_id = slices[0]['slice_id']
-        slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
-        # just remove the tag if it exists
-        if slice_tags:
-            api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
-    
-        return 1
+        xrn = Xrn(xrn)
+        slice_urn=xrn.get_urn()
+        slice_hrn=xrn.get_hrn()
+        return self.driver.start_slice (slice_urn, slice_hrn, creds)
      
     def stop_slice(self, api, xrn, creds):
      
     def stop_slice(self, api, xrn, creds):
-        hrn, _ = urn_to_hrn(xrn)
-        slicename = hrn_to_pl_slicename(hrn)
-        slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
-        if not slices:
-            raise RecordNotFound(hrn)
-        slice_id = slices[0]['slice_id']
-        slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
-        if not slice_tags:
-            api.driver.AddSliceTag(slice_id, 'enabled', '0')
-        elif slice_tags[0]['value'] != "0":
-            tag_id = slice_tags[0]['slice_tag_id']
-            api.driver.UpdateSliceTag(tag_id, '0')
-        return 1
-    
+        xrn = Xrn(xrn)
+        slice_urn=xrn.get_urn()
+        slice_hrn=xrn.get_hrn()
+        return self.driver.stop_slice (slice_urn, slice_hrn, creds)
+
     def reset_slice(self, api, xrn):
     def reset_slice(self, api, xrn):
-        # XX not implemented at this interface
-        return 1
-    
-    def DeleteSliver(self, api, xrn, creds, options):
-        call_id = options.get('call_id')
-        if Callids().already_handled(call_id): return ""
-        (hrn, _) = urn_to_hrn(xrn)
-        slicename = hrn_to_pl_slicename(hrn)
-        slices = api.driver.GetSlices({'name': slicename})
-        if not slices:
-            return 1
-        slice = slices[0]
-    
-        # determine if this is a peer slice
-        peer = peers.get_peer(api, hrn)
-        try:
-            if peer:
-                api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
-            api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
-        finally:
-            if peer:
-                api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
-        return 1
-    
-    def ListSlices(self, api, creds, options):
-        call_id = options.get('call_id')
-        if Callids().already_handled(call_id): return []
-        # look in cache first
-        if self.caching and api.cache:
-            slices = api.cache.get('slices')
-            if slices:
-                return slices
-    
-        # get data from db 
-        slices = api.driver.GetSlices({'peer_id': None}, ['name'])
-        slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
-        slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
-    
-        # cache the result
-        if self.caching and api.cache:
-            api.cache.add('slices', slice_urns) 
-    
-        return slice_urns
-        
-    def ListResources(self, api, creds, options):
-        call_id = options.get('call_id')
-        if Callids().already_handled(call_id): return ""
-        # get slice's hrn from options
-        xrn = options.get('geni_slice_urn', None)
-        cached = options.get('cached', True) 
-        (hrn, _) = urn_to_hrn(xrn)
-    
-        version_manager = VersionManager()
-        # get the rspec's return format from options
-        rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
-        version_string = "rspec_%s" % (rspec_version)
-    
-        #panos adding the info option to the caching key (can be improved)
-        if options.get('info'):
-            version_string = version_string + "_"+options.get('info', 'default')
-    
-        # look in cache first
-        if self.caching and api.cache and not xrn and cached:
-            rspec = api.cache.get(version_string)
-            if rspec:
-                api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
-                return rspec 
-    
-        #panos: passing user-defined options
-        #print "manager options = ",options
-        aggregate = PlAggregate(self.driver)
-        rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version, options=options)
-    
-        # cache the result
-        if self.caching and api.cache and not xrn:
-            api.cache.add(version_string, rspec)
-    
-        return rspec
-    
-    
+        xrn = Xrn(xrn)
+        slice_urn=xrn.get_urn()
+        slice_hrn=xrn.get_hrn()
+        return self.driver.reset_slice (slice_urn, slice_hrn)
+
     def GetTicket(self, api, xrn, creds, rspec, users, options):
     
     def GetTicket(self, api, xrn, creds, rspec, users, options):
     
-        (slice_hrn, _) = urn_to_hrn(xrn)
-        slices = PlSlices(self.driver)
-        peer = slices.get_peer(slice_hrn)
-        sfa_peer = slices.get_sfa_peer(slice_hrn)
-    
-        # get the slice record
-        credential = api.getCredential()
-        interface = api.registries[api.hrn]
-        registry = api.server_proxy(interface, credential)
-        records = registry.Resolve(xrn, credential)
-    
-        # make sure we get a local slice record
-        record = None
-        for tmp_record in records:
-            if tmp_record['type'] == 'slice' and \
-               not tmp_record['peer_authority']:
-    #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
-                record = SliceRecord(dict=tmp_record)
-        if not record:
-            raise RecordNotFound(slice_hrn)
-        
-        # similar to CreateSliver, we must verify that the required records exist
-        # at this aggregate before we can issue a ticket
-        # parse rspec
-        rspec = RSpec(rspec_string)
-        requested_attributes = rspec.version.get_slice_attributes()
-    
-        # ensure site record exists
-        site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
-        # ensure slice record exists
-        slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
-        # ensure person records exists
-        persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
-        # ensure slice attributes exists
-        slices.verify_slice_attributes(slice, requested_attributes)
-        
-        # get sliver info
-        slivers = slices.get_slivers(slice_hrn)
-    
-        if not slivers:
-            raise SliverDoesNotExist(slice_hrn)
-    
-        # get initscripts
-        initscripts = []
-        data = {
-            'timestamp': int(time.time()),
-            'initscripts': initscripts,
-            'slivers': slivers
-        }
-    
-        # create the ticket
-        object_gid = record.get_gid_object()
-        new_ticket = SfaTicket(subject = object_gid.get_subject())
-        new_ticket.set_gid_caller(api.auth.client_gid)
-        new_ticket.set_gid_object(object_gid)
-        new_ticket.set_issuer(key=api.key, subject=api.hrn)
-        new_ticket.set_pubkey(object_gid.get_pubkey())
-        new_ticket.set_attributes(data)
-        new_ticket.set_rspec(rspec)
-        #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
-        new_ticket.encode()
-        new_ticket.sign()
-    
-        return new_ticket.save_to_string(save_parents=True)
+        xrn = Xrn(xrn)
+        slice_urn=xrn.get_urn()
+        slice_hrn=xrn.get_hrn()
+
+        return self.driver.get_ticket (slice_urn, slice_hrn, creds, rspec, options)
+
index aa803ac..552f544 100644 (file)
@@ -286,7 +286,7 @@ class AggregateManagerEucalyptus:
     _inited=False
 
     # the init_server mechanism has vanished
     _inited=False
 
     # the init_server mechanism has vanished
-    def __init__ (self):
+    def __init__ (self, config):
         if AggregateManagerEucalyptus._inited: return
         AggregateManagerEucalyptus.init_server()
 
         if AggregateManagerEucalyptus._inited: return
         AggregateManagerEucalyptus.init_server()
 
index 7001fb6..0300522 100644 (file)
@@ -18,6 +18,9 @@ from sfa.plc.plslices import PlSlices
 
 class AggregateManagerMax (AggregateManager):
 
 
 class AggregateManagerMax (AggregateManager):
 
+    def __init__ (self, config):
+        pass
+
     RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec"
     
     # execute shell command and return both exit code and text output
     RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec"
     
     # execute shell command and return both exit code and text output
index ae2b624..f48964f 100644 (file)
@@ -71,6 +71,16 @@ class Driver:
     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
     def aggregate_version (self): return {}
 
     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
     def aggregate_version (self): return {}
 
+    # the answer to ListSlices, a list of slice urns
+    def list_slices (self, creds, options):
+        return []
+
+    # answer to ListResources
+    # first 2 args are None in case of resource discovery
+    # expected : rspec (xml string)
+    def list_resources (self, slice_urn, slice_hrn, creds, options):
+        return "dummy Driver.list_resources needs to be redefined"
+
     # the answer to SliverStatus on a given slice
     def sliver_status (self, slice_urn, slice_hrn): return {}
 
     # the answer to SliverStatus on a given slice
     def sliver_status (self, slice_urn, slice_hrn): return {}
 
@@ -80,9 +90,27 @@ class Driver:
     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
         return "dummy Driver.create_sliver needs to be redefined"
 
     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
         return "dummy Driver.create_sliver needs to be redefined"
 
+    # the answer to DeleteSliver on a given slice
+    def delete_sliver (self, slice_urn, slice_hrn, creds, options):
+        return "dummy Driver.delete_sliver needs to be redefined"
+
     # the answer to RenewSliver
     # expected to return a boolean to indicate success
     def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
         return False
     # the answer to RenewSliver
     # expected to return a boolean to indicate success
     def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
         return False
-    
-    
+
+    # the answer to start_slice/stop_slice
+    # 1 means success, otherwise raise exception
+    def start_slice (self, slice_urn, slice_xrn, creds):
+        return 1
+    def stop_slice (self, slice_urn, slice_xrn, creds):
+        return 1
+    # somehow this one does not have creds - not implemented in PL anyways
+    def reset_slice (self, slice_urn, slice_xrn, creds):
+        return 1
+
+    # the answer to GetTicket
+    # expected is a ticket, i.e. a certificate, as a string
+    def get_ticket (self, slice_urn, slice_xrn, creds, rspec, options):
+        return "dummy Driver.get_ticket needs to be redefined"
+
index 86907e7..58a0527 100644 (file)
@@ -15,14 +15,14 @@ class ManagerWrapper:
     is not implemented by a libarary and will generally be more helpful than
     the standard AttributeError         
     """
     is not implemented by a libarary and will generally be more helpful than
     the standard AttributeError         
     """
-    def __init__(self, manager, interface):
+    def __init__(self, manager, interface, config):
         if isinstance (manager, ModuleType):
             # old-fashioned module implementation
             self.manager = manager
         elif isinstance (manager, ClassType):
             # create an instance; we don't pass the api in argument as it is passed 
             # to the actual method calls anyway
         if isinstance (manager, ModuleType):
             # old-fashioned module implementation
             self.manager = manager
         elif isinstance (manager, ClassType):
             # create an instance; we don't pass the api in argument as it is passed 
             # to the actual method calls anyway
-            self.manager = manager()
+            self.manager = manager(config)
         else:
             raise SfaAPIError,"Argument to ManagerWrapper must be a module or class"
         self.interface = interface
         else:
             raise SfaAPIError,"Argument to ManagerWrapper must be a module or class"
         self.interface = interface
index 0659431..d29aafe 100644 (file)
@@ -23,7 +23,7 @@ from sfa.storage.table import SfaTable
 
 class RegistryManager:
 
 
 class RegistryManager:
 
-    def __init__ (self): pass
+    def __init__ (self, config): pass
 
     # The GENI GetVersion call
     def GetVersion(self, api, options):
 
     # The GENI GetVersion call
     def GetVersion(self, api, options):
index ee082c7..be6cc7e 100644 (file)
@@ -12,18 +12,28 @@ from sfa.util.sfalogging import logger
 from sfa.util.xrn import Xrn, urn_to_hrn
 from sfa.util.version import version_core
 from sfa.util.callids import Callids
 from sfa.util.xrn import Xrn, urn_to_hrn
 from sfa.util.version import version_core
 from sfa.util.callids import Callids
+from sfa.util.cache import Cache
+
 from sfa.server.threadmanager import ThreadManager
 from sfa.server.threadmanager import ThreadManager
+
 from sfa.rspecs.rspec_converter import RSpecConverter
 from sfa.rspecs.version_manager import VersionManager
 from sfa.rspecs.rspec import RSpec 
 from sfa.rspecs.rspec_converter import RSpecConverter
 from sfa.rspecs.version_manager import VersionManager
 from sfa.rspecs.rspec import RSpec 
+
 from sfa.client.client_helper import sfa_to_pg_users_arg
 from sfa.client.return_value import ReturnValue
 
 class SliceManager:
 from sfa.client.client_helper import sfa_to_pg_users_arg
 from sfa.client.return_value import ReturnValue
 
 class SliceManager:
-    def __init__ (self):
-        # xxx todo should be configurable 
-        # self.caching=False
-        self.caching=True
+
+    # the cache instance is a class member so it survives across incoming requests
+    cache = None
+
+    def __init__ (self, config):
+        self.cache=None
+        if config.SFA_SM_CACHING:
+            if SliceManager.cache is None:
+                SliceManager.cache = Cache()
+            self.cache = SliceManager.cache
         
     def GetVersion(self, api, options):
         # peers explicitly in aggregates.xml
         
     def GetVersion(self, api, options):
         # peers explicitly in aggregates.xml
@@ -121,9 +131,10 @@ class SliceManager:
         version_string = "rspec_%s" % (rspec_version)
     
         # look in cache first
         version_string = "rspec_%s" % (rspec_version)
     
         # look in cache first
-        if self.caching and api.cache and not xrn:
-            rspec =  api.cache.get(version_string)
+        if self.cache and not xrn:
+            rspec =  self.cache.get(version_string)
             if rspec:
             if rspec:
+                api.logger.debug("SliceManager.ListResources returns cached advertisement")
                 return rspec
     
         # get the callers hrn
                 return rspec
     
         # get the callers hrn
@@ -164,8 +175,9 @@ class SliceManager:
                     api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
     
         # cache the result
                     api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
     
         # cache the result
-        if self.caching and api.cache and not xrn:
-            api.cache.add(version_string, rspec.toxml())
+        if self.cache and not xrn:
+            api.logger.debug("SliceManager.ListResources caches advertisement")
+            self.cache.add(version_string, rspec.toxml())
     
         return rspec.toxml()
 
     
         return rspec.toxml()
 
@@ -346,9 +358,11 @@ class SliceManager:
             return server.ListSlices(creds, options)
 
         # look in cache first
             return server.ListSlices(creds, options)
 
         # look in cache first
-        if self.caching and api.cache:
-            slices = api.cache.get('slices')
+        # xxx is this really frequent enough that it is worth being cached ?
+        if self.cache:
+            slices = self.cache.get('slices')
             if slices:
             if slices:
+                api.logger.debug("SliceManager.ListSlices returns from cache")
                 return slices
     
         # get the callers hrn
                 return slices
     
         # get the callers hrn
@@ -377,8 +391,9 @@ class SliceManager:
             slices.extend(result)
     
         # cache the result
             slices.extend(result)
     
         # cache the result
-        if self.caching and api.cache:
-            api.cache.add('slices', slices)
+        if self.cache:
+            api.logger.debug("SliceManager.ListSlices caches value")
+            self.cache.add('slices', slices)
     
         return slices
     
     
         return slices
     
index f9f80db..c256c7b 100644 (file)
@@ -1,7 +1,7 @@
 from sfa.util.xrn import get_authority
 from types import StringTypes
 
 from sfa.util.xrn import get_authority
 from types import StringTypes
 
-def get_peer(api, hrn):
+def get_peer(pldriver, hrn):
     # Because of myplc native federation,  we first need to determine if this
     # slice belongs to out local plc or a myplc peer. We will assume it
     # is a local site, unless we find out otherwise
     # Because of myplc native federation,  we first need to determine if this
     # slice belongs to out local plc or a myplc peer. We will assume it
     # is a local site, unless we find out otherwise
@@ -13,7 +13,7 @@ def get_peer(api, hrn):
     # get this site's authority (sfa root authority or sub authority)
     site_authority = get_authority(slice_authority).lower()
     # check if we are already peered with this site_authority, if so
     # get this site's authority (sfa root authority or sub authority)
     site_authority = get_authority(slice_authority).lower()
     # check if we are already peered with this site_authority, if so
-    peers = api.driver.GetPeers( {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
+    peers = pldriver.GetPeers( {}, ['peer_id', 'peername', 'shortname', 'hrn_root'])
     for peer_record in peers:
         names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
         if site_authority in names:
     for peer_record in peers:
         names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)]
         if site_authority in names:
@@ -22,14 +22,14 @@ def get_peer(api, hrn):
     return peer
 
 
     return peer
 
 
-def get_sfa_peer(api, hrn):
-    # return the authority for this hrn or None if we are the authority
-    sfa_peer = None
-    slice_authority = get_authority(hrn)
-    site_authority = get_authority(slice_authority)
-
-    if site_authority != api.hrn:
-        sfa_peer = site_authority
-
-    return sfa_peer
+#def get_sfa_peer(pldriver, hrn):
+#    # return the authority for this hrn or None if we are the authority
+#    sfa_peer = None
+#    slice_authority = get_authority(hrn)
+#    site_authority = get_authority(slice_authority)
+#
+#    if site_authority != pldriver.hrn:
+#        sfa_peer = site_authority
+#
+#    return sfa_peer
 
 
index d892e25..5ce3b71 100644 (file)
@@ -1,14 +1,21 @@
+import time
 import datetime
 #
 import datetime
 #
-from sfa.util.faults import MissingSfaInfo, UnknownSfaType
+from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
+    RecordNotFound, SfaNotImplemented, SliverDoesNotExist
+
 from sfa.util.sfalogging import logger
 from sfa.util.defaultdict import defaultdict
 from sfa.util.sfalogging import logger
 from sfa.util.defaultdict import defaultdict
-from sfa.util.xrn import hrn_to_urn, get_leaf
-from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename, hrn_to_pl_login_base
+from sfa.util.sfatime import utcparse
+from sfa.util.xrn import hrn_to_urn, get_leaf, urn_to_sliver_id
+from sfa.util.cache import Cache
 
 # one would think the driver should not need to mess with the SFA db, but..
 from sfa.storage.table import SfaTable
 
 
 # one would think the driver should not need to mess with the SFA db, but..
 from sfa.storage.table import SfaTable
 
+# used to be used in get_ticket
+#from sfa.trust.sfaticket import SfaTicket
+
 from sfa.rspecs.version_manager import VersionManager
 from sfa.rspecs.rspec import RSpec
 
 from sfa.rspecs.version_manager import VersionManager
 from sfa.rspecs.rspec import RSpec
 
@@ -16,10 +23,11 @@ from sfa.rspecs.rspec import RSpec
 from sfa.managers.driver import Driver
 
 from sfa.plc.plshell import PlShell
 from sfa.managers.driver import Driver
 
 from sfa.plc.plshell import PlShell
-
 import sfa.plc.peers as peers
 from sfa.plc.plaggregate import PlAggregate
 from sfa.plc.plslices import PlSlices
 import sfa.plc.peers as peers
 from sfa.plc.plaggregate import PlAggregate
 from sfa.plc.plslices import PlSlices
+from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename, hrn_to_pl_login_base
+
 
 def list_to_dict(recs, key):
     """
 
 def list_to_dict(recs, key):
     """
@@ -40,9 +48,17 @@ def list_to_dict(recs, key):
 # 
 class PlDriver (Driver, PlShell):
 
 # 
 class PlDriver (Driver, PlShell):
 
+    # the cache instance is a class member so it survives across incoming requests
+    cache = None
+
     def __init__ (self, config):
         PlShell.__init__ (self, config)
         Driver.__init__ (self, config)
     def __init__ (self, config):
         PlShell.__init__ (self, config)
         Driver.__init__ (self, config)
+        self.cache=None
+        if config.SFA_AGGREGATE_CACHING:
+            if PlDriver.cache is None:
+                PlDriver.cache = Cache()
+            self.cache = PlDriver.cache
  
     ########################################
     ########## registry oriented
  
     ########################################
     ########## registry oriented
@@ -105,9 +121,9 @@ class PlDriver (Driver, PlShell):
 
         elif type == 'node':
             login_base = hrn_to_pl_login_base(sfa_record['authority'])
 
         elif type == 'node':
             login_base = hrn_to_pl_login_base(sfa_record['authority'])
-            nodes = api.driver.GetNodes([pl_record['hostname']])
+            nodes = self.GetNodes([pl_record['hostname']])
             if not nodes:
             if not nodes:
-                pointer = api.driver.AddNode(login_base, pl_record)
+                pointer = self.AddNode(login_base, pl_record)
             else:
                 pointer = nodes[0]['node_id']
     
             else:
                 pointer = nodes[0]['node_id']
     
@@ -392,7 +408,6 @@ class PlDriver (Driver, PlShell):
             
         return records   
 
             
         return records   
 
-    # aggregates is basically api.aggregates
     def fill_record_sfa_info(self, records):
 
         def startswith(prefix, values):
     def fill_record_sfa_info(self, records):
 
         def startswith(prefix, values):
@@ -543,13 +558,66 @@ class PlDriver (Driver, PlShell):
             'geni_ad_rspec_versions': ad_rspec_versions,
             }
 
             'geni_ad_rspec_versions': ad_rspec_versions,
             }
 
+    def list_slices (self, creds, options):
+        # look in cache first
+        if self.cache:
+            slices = self.cache.get('slices')
+            if slices:
+                logger.debug("PlDriver.list_slices returns from cache")
+                return slices
+    
+        # get data from db 
+        slices = self.GetSlices({'peer_id': None}, ['name'])
+        slice_hrns = [slicename_to_hrn(self.hrn, slice['name']) for slice in slices]
+        slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
+    
+        # cache the result
+        if self.cache:
+            logger.debug ("PlDriver.list_slices stores value in cache")
+            self.cache.add('slices', slice_urns) 
+    
+        return slice_urns
+        
+    # first 2 args are None in case of resource discovery
+    def list_resources (self, slice_urn, slice_hrn, creds, options):
+        cached_requested = options.get('cached', True) 
+    
+        version_manager = VersionManager()
+        # get the rspec's return format from options
+        rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
+        version_string = "rspec_%s" % (rspec_version)
+    
+        #panos adding the info option to the caching key (can be improved)
+        if options.get('info'):
+            version_string = version_string + "_"+options.get('info', 'default')
+    
+        # look in cache first
+        if cached_requested and self.cache and not slice_hrn:
+            rspec = self.cache.get(version_string)
+            if rspec:
+                logger.debug("PlDriver.ListResources: returning cached advertisement")
+                return rspec 
+    
+        #panos: passing user-defined options
+        #print "manager options = ",options
+        aggregate = PlAggregate(self)
+        rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
+                                     options=options)
+    
+        # cache the result
+        if self.cache and not slice_hrn:
+            logger.debug("PlDriver.ListResources: stores advertisement in cache")
+            self.cache.add(version_string, rspec)
+    
+        return rspec
+    
     def sliver_status (self, slice_urn, slice_hrn):
         # find out where this slice is currently running
         slicename = hrn_to_pl_slicename(slice_hrn)
         
         slices = self.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
         if len(slices) == 0:        
     def sliver_status (self, slice_urn, slice_hrn):
         # find out where this slice is currently running
         slicename = hrn_to_pl_slicename(slice_hrn)
         
         slices = self.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
         if len(slices) == 0:        
-            raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
+            raise SliverDoesNotExist("%s (used %s as slicename internally)" % (slice_hrn, slicename))
         slice = slices[0]
         
         # report about the local nodes only
         slice = slices[0]
         
         # report about the local nodes only
@@ -625,17 +693,138 @@ class PlDriver (Driver, PlShell):
         
         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
 
         
         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
 
+    def delete_sliver (self, slice_urn, slice_hrn, creds, options):
+        slicename = hrn_to_pl_slicename(slice_hrn)
+        slices = self.GetSlices({'name': slicename})
+        if not slices:
+            return 1
+        slice = slices[0]
+    
+        # determine if this is a peer slice
+        # xxx I wonder if this would not need to use PlSlices.get_peer instead 
+        # in which case plc.peers could be deprecated as this here
+        # is the only/last call to this last method in plc.peers
+        peer = peers.get_peer(self, slice_hrn)
+        try:
+            if peer:
+                self.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
+            self.DeleteSliceFromNodes(slicename, slice['node_ids'])
+        finally:
+            if peer:
+                self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
+        return 1
+    
     def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
         slicename = hrn_to_pl_slicename(slice_hrn)
     def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options):
         slicename = hrn_to_pl_slicename(slice_hrn)
-        slices = self.driver.GetSlices({'name': slicename}, ['slice_id'])
+        slices = self.GetSlices({'name': slicename}, ['slice_id'])
         if not slices:
             raise RecordNotFound(slice_hrn)
         slice = slices[0]
         requested_time = utcparse(expiration_time)
         record = {'expires': int(time.mktime(requested_time.timetuple()))}
         try:
         if not slices:
             raise RecordNotFound(slice_hrn)
         slice = slices[0]
         requested_time = utcparse(expiration_time)
         record = {'expires': int(time.mktime(requested_time.timetuple()))}
         try:
-            self.driver.UpdateSlice(slice['slice_id'], record)
+            self.UpdateSlice(slice['slice_id'], record)
             return True
         except:
             return False
 
             return True
         except:
             return False
 
+    # remove the 'enabled' tag 
+    def start_slice (self, slice_urn, slice_hrn, creds):
+        slicename = hrn_to_pl_slicename(slice_hrn)
+        slices = self.GetSlices({'name': slicename}, ['slice_id'])
+        if not slices:
+            raise RecordNotFound(slice_hrn)
+        slice_id = slices[0]['slice_id']
+        slice_tags = self.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
+        # just remove the tag if it exists
+        if slice_tags:
+            self.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
+        return 1
+
+    # set the 'enabled' tag to 0
+    def stop_slice (self, slice_urn, slice_hrn, creds):
+        slicename = hrn_to_pl_slicename(slice_hrn)
+        slices = self.GetSlices({'name': slicename}, ['slice_id'])
+        if not slices:
+            raise RecordNotFound(slice_hrn)
+        slice_id = slices[0]['slice_id']
+        slice_tags = self.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
+        if not slice_tags:
+            self.AddSliceTag(slice_id, 'enabled', '0')
+        elif slice_tags[0]['value'] != "0":
+            tag_id = slice_tags[0]['slice_tag_id']
+            self.UpdateSliceTag(tag_id, '0')
+        return 1
+    
+    def reset_slice (self, slice_urn, slice_hrn, creds):
+        raise SfaNotImplemented ("reset_slice not available at this interface")
+    
+    # xxx this code is quite old and has not run for ages
+    # it is obviously totally broken and needs a rewrite
+    def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options):
+        raise SfaNotImplemented,"PlDriver.get_ticket needs a rewrite"
+# please keep this code for future reference
+#        slices = PlSlices(self)
+#        peer = slices.get_peer(slice_hrn)
+#        sfa_peer = slices.get_sfa_peer(slice_hrn)
+#    
+#        # get the slice record
+#        credential = api.getCredential()
+#        interface = api.registries[api.hrn]
+#        registry = api.server_proxy(interface, credential)
+#        records = registry.Resolve(xrn, credential)
+#    
+#        # make sure we get a local slice record
+#        record = None
+#        for tmp_record in records:
+#            if tmp_record['type'] == 'slice' and \
+#               not tmp_record['peer_authority']:
+#    #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
+#                slice_record = SliceRecord(dict=tmp_record)
+#        if not record:
+#            raise RecordNotFound(slice_hrn)
+#        
+#        # similar to CreateSliver, we must verify that the required records exist
+#        # at this aggregate before we can issue a ticket
+#        # parse rspec
+#        rspec = RSpec(rspec_string)
+#        requested_attributes = rspec.version.get_slice_attributes()
+#    
+#        # ensure site record exists
+#        site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer)
+#        # ensure slice record exists
+#        slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer)
+#        # ensure person records exists
+#    # xxx users is undefined in this context
+#        persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer)
+#        # ensure slice attributes exists
+#        slices.verify_slice_attributes(slice, requested_attributes)
+#        
+#        # get sliver info
+#        slivers = slices.get_slivers(slice_hrn)
+#    
+#        if not slivers:
+#            raise SliverDoesNotExist(slice_hrn)
+#    
+#        # get initscripts
+#        initscripts = []
+#        data = {
+#            'timestamp': int(time.time()),
+#            'initscripts': initscripts,
+#            'slivers': slivers
+#        }
+#    
+#        # create the ticket
+#        object_gid = record.get_gid_object()
+#        new_ticket = SfaTicket(subject = object_gid.get_subject())
+#        new_ticket.set_gid_caller(api.auth.client_gid)
+#        new_ticket.set_gid_object(object_gid)
+#        new_ticket.set_issuer(key=api.key, subject=self.hrn)
+#        new_ticket.set_pubkey(object_gid.get_pubkey())
+#        new_ticket.set_attributes(data)
+#        new_ticket.set_rspec(rspec)
+#        #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
+#        new_ticket.encode()
+#        new_ticket.sign()
+#    
+#        return new_ticket.save_to_string(save_parents=True)
index a2ded4a..ee4716c 100644 (file)
@@ -8,7 +8,7 @@ import pickle
 from datetime import datetime
 
 # maximum lifetime of cached data (in seconds) 
 from datetime import datetime
 
 # maximum lifetime of cached data (in seconds) 
-MAX_CACHE_TTL = 60 * 60
+DEFAULT_CACHE_TTL = 60 * 60
 
 class CacheData:
 
 
 class CacheData:
 
@@ -17,7 +17,7 @@ class CacheData:
     expires = None
     lock = None
 
     expires = None
     lock = None
 
-    def __init__(self, data, ttl = MAX_CACHE_TTL):
+    def __init__(self, data, ttl = DEFAULT_CACHE_TTL):
         self.lock = threading.RLock()
         self.data = data
         self.renew(ttl)
         self.lock = threading.RLock()
         self.data = data
         self.renew(ttl)
@@ -31,11 +31,11 @@ class CacheData:
     def get_expires_date(self):
         return str(datetime.fromtimestamp(self.expires))
 
     def get_expires_date(self):
         return str(datetime.fromtimestamp(self.expires))
 
-    def renew(self, ttl = MAX_CACHE_TTL):
+    def renew(self, ttl = DEFAULT_CACHE_TTL):
         self.created = time.time()
         self.expires = self.created + ttl   
        
         self.created = time.time()
         self.expires = self.created + ttl   
        
-    def set_data(self, data, renew=True, ttl = MAX_CACHE_TTL):
+    def set_data(self, data, renew=True, ttl = DEFAULT_CACHE_TTL):
         with self.lock: 
             self.data = data
             if renew:
         with self.lock: 
             self.data = data
             if renew:
@@ -73,7 +73,7 @@ class Cache:
         if filename:
             self.load_from_file(filename)
    
         if filename:
             self.load_from_file(filename)
    
-    def add(self, key, value, ttl = MAX_CACHE_TTL):
+    def add(self, key, value, ttl = DEFAULT_CACHE_TTL):
         with self.lock:
             if self.cache.has_key(key):
                 self.cache[key].set_data(value, ttl=ttl)
         with self.lock:
             if self.cache.has_key(key):
                 self.cache[key].set_data(value, ttl=ttl)