From: Thierry Parmentelat Date: Wed, 14 Dec 2011 16:59:27 +0000 (+0100) Subject: last plc-dependent code moved to PlDriver X-Git-Tag: sfa-2.0-5~1 X-Git-Url: http://git.onelab.eu/?p=sfa.git;a=commitdiff_plain;h=b6de9285a970eccb9658a0078c49f76fb11af7cd last plc-dependent code moved to PlDriver cache reviewed (more locally handled in slicemanager and pldriver) 2 new settings to activate caching at the AM or SM in config manager classes constructor now takes config in arg --- diff --git a/config/default_config.xml b/config/default_config.xml index e074865e..9156cb32 100644 --- a/config/default_config.xml +++ b/config/default_config.xml @@ -128,6 +128,14 @@ Thierry Parmentelat 12347 The port where the slice manager is to be found. + + + Cache advertisement rspec + false + Enable caching of the global advertisement, as + returned by ListResources without a slice argument. + + @@ -145,12 +153,6 @@ Thierry Parmentelat aggregate manager. - - RSpec Schema - /etc/sfa/pl.rng - The path to the default schema - - Hostname localhost @@ -164,6 +166,13 @@ Thierry Parmentelat The port where the aggregate is to be found. + + Cache advertisement rspec + true + Enable caching of the global advertisement, as + returned by ListResources without a slice argument. + + diff --git a/sfa/generic/__init__.py b/sfa/generic/__init__.py index 98248be2..3c3855dc 100644 --- a/sfa/generic/__init__.py +++ b/sfa/generic/__init__.py @@ -60,15 +60,16 @@ class Generic: if not 'interface' in kwargs: logger.critical("Generic.make_api: no interface found") api = self.api_class()(*args, **kwargs) - manager = self.make_manager(api.interface) + manager_class_or_module = self.make_manager(api.interface) driver = self.make_driver (api.config, api.interface) ### arrange stuff together # add a manager wrapper - manager_wrap = ManagerWrapper(manager,api.interface) + manager_wrap = ManagerWrapper(manager_class_or_module,api.interface,api.config) api.manager=manager_wrap # insert driver in manager - logger.debug("Setting manager.driver, manager=%s"%manager) - manager.driver=driver + logger.debug("Setting manager.driver, manager=%s"%manager_class_or_module) + # xxx this should go into the object and not the class !?! + manager_class_or_module.driver=driver # add it in api as well for convenience api.driver=driver return api diff --git a/sfa/managers/aggregate_manager.py b/sfa/managers/aggregate_manager.py index 995e694a..89559204 100644 --- a/sfa/managers/aggregate_manager.py +++ b/sfa/managers/aggregate_manager.py @@ -1,32 +1,10 @@ -import time -import sys - -from sfa.util.sfalogging import logger -from sfa.util.faults import RecordNotFound, SliverDoesNotExist -from sfa.util.xrn import Xrn, get_authority, hrn_to_urn, urn_to_hrn, urn_to_sliver_id -from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename from sfa.util.version import version_core -from sfa.util.sfatime import utcparse +from sfa.util.xrn import Xrn from sfa.util.callids import Callids -from sfa.trust.sfaticket import SfaTicket -from sfa.trust.credential import Credential - -from sfa.rspecs.version_manager import VersionManager -from sfa.rspecs.rspec import RSpec - -from sfa.server.sfaapi import SfaApi - -import sfa.plc.peers as peers -from sfa.plc.plaggregate import PlAggregate -from sfa.plc.plslices import PlSlices - class AggregateManager: - def __init__ (self): - # xxx Thierry : caching at the aggregate level sounds wrong... - self.caching=True - #self.caching=False + def __init__ (self, config): pass # essentially a union of the core version, the generic version (this code) and # whatever the driver needs to expose @@ -45,17 +23,37 @@ class AggregateManager: version.update(testbed_version) return version - def SliverStatus (self, api, slice_xrn, creds, options): + def ListSlices(self, api, creds, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): return [] + return self.driver.list_slices (creds, options) + + def ListResources(self, api, creds, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): return "" + + # get slice's hrn from options + slice_xrn = options.get('geni_slice_urn', None) + # pass None if no slice is specified + if not slice_xrn: + slice_hrn, slice_urn = None, None + else: + xrn = Xrn(slice_xrn) + slice_urn=xrn.get_urn() + slice_hrn=xrn.get_hrn() + + return self.driver.list_resources (slice_urn, slice_hrn, creds, options) + + def SliverStatus (self, api, xrn, creds, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return {} - xrn = Xrn(slice_xrn) + xrn = Xrn(xrn) slice_urn=xrn.get_urn() slice_hrn=xrn.get_hrn() - return self.driver.sliver_status (slice_urn, slice_hrn) - def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options): + def CreateSliver(self, api, xrn, creds, rspec_string, users, options): """ Create the sliver[s] (slice) at this aggregate. Verify HRN and initialize the slice record in PLC if necessary. @@ -63,195 +61,54 @@ class AggregateManager: call_id = options.get('call_id') if Callids().already_handled(call_id): return "" - xrn = Xrn(slice_xrn) + xrn = Xrn(xrn) slice_urn=xrn.get_urn() slice_hrn=xrn.get_hrn() return self.driver.create_sliver (slice_urn, slice_hrn, creds, rspec_string, users, options) + def DeleteSliver(self, api, xrn, creds, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): return True + + xrn = Xrn(xrn) + slice_urn=xrn.get_urn() + slice_hrn=xrn.get_hrn() + return self.driver.delete_sliver (slice_urn, slice_hrn, creds, options) + def RenewSliver(self, api, xrn, creds, expiration_time, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return True - xrn = Xrn(slice_xrn) + xrn = Xrn(xrn) slice_urn=xrn.get_urn() slice_hrn=xrn.get_hrn() - return self.driver.renew_sliver (slice_urn, slice_hrn, creds, expiration_time, options) + ### these methods could use an options extension for at least call_id def start_slice(self, api, xrn, creds): - (hrn, _) = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.driver.GetSlices({'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(hrn) - slice_id = slices[0]['slice_id'] - slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id']) - # just remove the tag if it exists - if slice_tags: - api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id']) - - return 1 + xrn = Xrn(xrn) + slice_urn=xrn.get_urn() + slice_hrn=xrn.get_hrn() + return self.driver.start_slice (slice_urn, slice_hrn, creds) def stop_slice(self, api, xrn, creds): - hrn, _ = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.driver.GetSlices({'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(hrn) - slice_id = slices[0]['slice_id'] - slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}) - if not slice_tags: - api.driver.AddSliceTag(slice_id, 'enabled', '0') - elif slice_tags[0]['value'] != "0": - tag_id = slice_tags[0]['slice_tag_id'] - api.driver.UpdateSliceTag(tag_id, '0') - return 1 - + xrn = Xrn(xrn) + slice_urn=xrn.get_urn() + slice_hrn=xrn.get_hrn() + return self.driver.stop_slice (slice_urn, slice_hrn, creds) + def reset_slice(self, api, xrn): - # XX not implemented at this interface - return 1 - - def DeleteSliver(self, api, xrn, creds, options): - call_id = options.get('call_id') - if Callids().already_handled(call_id): return "" - (hrn, _) = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.driver.GetSlices({'name': slicename}) - if not slices: - return 1 - slice = slices[0] - - # determine if this is a peer slice - peer = peers.get_peer(api, hrn) - try: - if peer: - api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer) - api.driver.DeleteSliceFromNodes(slicename, slice['node_ids']) - finally: - if peer: - api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id']) - return 1 - - def ListSlices(self, api, creds, options): - call_id = options.get('call_id') - if Callids().already_handled(call_id): return [] - # look in cache first - if self.caching and api.cache: - slices = api.cache.get('slices') - if slices: - return slices - - # get data from db - slices = api.driver.GetSlices({'peer_id': None}, ['name']) - slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices] - slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns] - - # cache the result - if self.caching and api.cache: - api.cache.add('slices', slice_urns) - - return slice_urns - - def ListResources(self, api, creds, options): - call_id = options.get('call_id') - if Callids().already_handled(call_id): return "" - # get slice's hrn from options - xrn = options.get('geni_slice_urn', None) - cached = options.get('cached', True) - (hrn, _) = urn_to_hrn(xrn) - - version_manager = VersionManager() - # get the rspec's return format from options - rspec_version = version_manager.get_version(options.get('geni_rspec_version')) - version_string = "rspec_%s" % (rspec_version) - - #panos adding the info option to the caching key (can be improved) - if options.get('info'): - version_string = version_string + "_"+options.get('info', 'default') - - # look in cache first - if self.caching and api.cache and not xrn and cached: - rspec = api.cache.get(version_string) - if rspec: - api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn) - return rspec - - #panos: passing user-defined options - #print "manager options = ",options - aggregate = PlAggregate(self.driver) - rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version, options=options) - - # cache the result - if self.caching and api.cache and not xrn: - api.cache.add(version_string, rspec) - - return rspec - - + xrn = Xrn(xrn) + slice_urn=xrn.get_urn() + slice_hrn=xrn.get_hrn() + return self.driver.reset_slice (slice_urn, slice_hrn) + def GetTicket(self, api, xrn, creds, rspec, users, options): - (slice_hrn, _) = urn_to_hrn(xrn) - slices = PlSlices(self.driver) - peer = slices.get_peer(slice_hrn) - sfa_peer = slices.get_sfa_peer(slice_hrn) - - # get the slice record - credential = api.getCredential() - interface = api.registries[api.hrn] - registry = api.server_proxy(interface, credential) - records = registry.Resolve(xrn, credential) - - # make sure we get a local slice record - record = None - for tmp_record in records: - if tmp_record['type'] == 'slice' and \ - not tmp_record['peer_authority']: - #Error (E0602, GetTicket): Undefined variable 'SliceRecord' - record = SliceRecord(dict=tmp_record) - if not record: - raise RecordNotFound(slice_hrn) - - # similar to CreateSliver, we must verify that the required records exist - # at this aggregate before we can issue a ticket - # parse rspec - rspec = RSpec(rspec_string) - requested_attributes = rspec.version.get_slice_attributes() - - # ensure site record exists - site = slices.verify_site(hrn, slice_record, peer, sfa_peer) - # ensure slice record exists - slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) - # ensure person records exists - persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) - # ensure slice attributes exists - slices.verify_slice_attributes(slice, requested_attributes) - - # get sliver info - slivers = slices.get_slivers(slice_hrn) - - if not slivers: - raise SliverDoesNotExist(slice_hrn) - - # get initscripts - initscripts = [] - data = { - 'timestamp': int(time.time()), - 'initscripts': initscripts, - 'slivers': slivers - } - - # create the ticket - object_gid = record.get_gid_object() - new_ticket = SfaTicket(subject = object_gid.get_subject()) - new_ticket.set_gid_caller(api.auth.client_gid) - new_ticket.set_gid_object(object_gid) - new_ticket.set_issuer(key=api.key, subject=api.hrn) - new_ticket.set_pubkey(object_gid.get_pubkey()) - new_ticket.set_attributes(data) - new_ticket.set_rspec(rspec) - #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) - new_ticket.encode() - new_ticket.sign() - - return new_ticket.save_to_string(save_parents=True) + xrn = Xrn(xrn) + slice_urn=xrn.get_urn() + slice_hrn=xrn.get_hrn() + + return self.driver.get_ticket (slice_urn, slice_hrn, creds, rspec, options) + diff --git a/sfa/managers/aggregate_manager_eucalyptus.py b/sfa/managers/aggregate_manager_eucalyptus.py index aa803acc..552f5446 100644 --- a/sfa/managers/aggregate_manager_eucalyptus.py +++ b/sfa/managers/aggregate_manager_eucalyptus.py @@ -286,7 +286,7 @@ class AggregateManagerEucalyptus: _inited=False # the init_server mechanism has vanished - def __init__ (self): + def __init__ (self, config): if AggregateManagerEucalyptus._inited: return AggregateManagerEucalyptus.init_server() diff --git a/sfa/managers/aggregate_manager_max.py b/sfa/managers/aggregate_manager_max.py index 7001fb6b..03005228 100644 --- a/sfa/managers/aggregate_manager_max.py +++ b/sfa/managers/aggregate_manager_max.py @@ -18,6 +18,9 @@ from sfa.plc.plslices import PlSlices class AggregateManagerMax (AggregateManager): + def __init__ (self, config): + pass + RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec" # execute shell command and return both exit code and text output diff --git a/sfa/managers/driver.py b/sfa/managers/driver.py index ae2b6248..f48964f1 100644 --- a/sfa/managers/driver.py +++ b/sfa/managers/driver.py @@ -71,6 +71,16 @@ class Driver: # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory def aggregate_version (self): return {} + # the answer to ListSlices, a list of slice urns + def list_slices (self, creds, options): + return [] + + # answer to ListResources + # first 2 args are None in case of resource discovery + # expected : rspec (xml string) + def list_resources (self, slice_urn, slice_hrn, creds, options): + return "dummy Driver.list_resources needs to be redefined" + # the answer to SliverStatus on a given slice def sliver_status (self, slice_urn, slice_hrn): return {} @@ -80,9 +90,27 @@ class Driver: def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options): return "dummy Driver.create_sliver needs to be redefined" + # the answer to DeleteSliver on a given slice + def delete_sliver (self, slice_urn, slice_hrn, creds, options): + return "dummy Driver.delete_sliver needs to be redefined" + # the answer to RenewSliver # expected to return a boolean to indicate success def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options): return False - - + + # the answer to start_slice/stop_slice + # 1 means success, otherwise raise exception + def start_slice (self, slice_urn, slice_xrn, creds): + return 1 + def stop_slice (self, slice_urn, slice_xrn, creds): + return 1 + # somehow this one does not have creds - not implemented in PL anyways + def reset_slice (self, slice_urn, slice_xrn, creds): + return 1 + + # the answer to GetTicket + # expected is a ticket, i.e. a certificate, as a string + def get_ticket (self, slice_urn, slice_xrn, creds, rspec, options): + return "dummy Driver.get_ticket needs to be redefined" + diff --git a/sfa/managers/managerwrapper.py b/sfa/managers/managerwrapper.py index 86907e79..58a0527b 100644 --- a/sfa/managers/managerwrapper.py +++ b/sfa/managers/managerwrapper.py @@ -15,14 +15,14 @@ class ManagerWrapper: is not implemented by a libarary and will generally be more helpful than the standard AttributeError """ - def __init__(self, manager, interface): + def __init__(self, manager, interface, config): if isinstance (manager, ModuleType): # old-fashioned module implementation self.manager = manager elif isinstance (manager, ClassType): # create an instance; we don't pass the api in argument as it is passed # to the actual method calls anyway - self.manager = manager() + self.manager = manager(config) else: raise SfaAPIError,"Argument to ManagerWrapper must be a module or class" self.interface = interface diff --git a/sfa/managers/registry_manager.py b/sfa/managers/registry_manager.py index 0659431d..d29aafef 100644 --- a/sfa/managers/registry_manager.py +++ b/sfa/managers/registry_manager.py @@ -23,7 +23,7 @@ from sfa.storage.table import SfaTable class RegistryManager: - def __init__ (self): pass + def __init__ (self, config): pass # The GENI GetVersion call def GetVersion(self, api, options): diff --git a/sfa/managers/slice_manager.py b/sfa/managers/slice_manager.py index ee082c71..be6cc7e2 100644 --- a/sfa/managers/slice_manager.py +++ b/sfa/managers/slice_manager.py @@ -12,18 +12,28 @@ from sfa.util.sfalogging import logger from sfa.util.xrn import Xrn, urn_to_hrn from sfa.util.version import version_core from sfa.util.callids import Callids +from sfa.util.cache import Cache + from sfa.server.threadmanager import ThreadManager + from sfa.rspecs.rspec_converter import RSpecConverter from sfa.rspecs.version_manager import VersionManager from sfa.rspecs.rspec import RSpec + from sfa.client.client_helper import sfa_to_pg_users_arg from sfa.client.return_value import ReturnValue class SliceManager: - def __init__ (self): - # xxx todo should be configurable - # self.caching=False - self.caching=True + + # the cache instance is a class member so it survives across incoming requests + cache = None + + def __init__ (self, config): + self.cache=None + if config.SFA_SM_CACHING: + if SliceManager.cache is None: + SliceManager.cache = Cache() + self.cache = SliceManager.cache def GetVersion(self, api, options): # peers explicitly in aggregates.xml @@ -121,9 +131,10 @@ class SliceManager: version_string = "rspec_%s" % (rspec_version) # look in cache first - if self.caching and api.cache and not xrn: - rspec = api.cache.get(version_string) + if self.cache and not xrn: + rspec = self.cache.get(version_string) if rspec: + api.logger.debug("SliceManager.ListResources returns cached advertisement") return rspec # get the callers hrn @@ -164,8 +175,9 @@ class SliceManager: api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") # cache the result - if self.caching and api.cache and not xrn: - api.cache.add(version_string, rspec.toxml()) + if self.cache and not xrn: + api.logger.debug("SliceManager.ListResources caches advertisement") + self.cache.add(version_string, rspec.toxml()) return rspec.toxml() @@ -346,9 +358,11 @@ class SliceManager: return server.ListSlices(creds, options) # look in cache first - if self.caching and api.cache: - slices = api.cache.get('slices') + # xxx is this really frequent enough that it is worth being cached ? + if self.cache: + slices = self.cache.get('slices') if slices: + api.logger.debug("SliceManager.ListSlices returns from cache") return slices # get the callers hrn @@ -377,8 +391,9 @@ class SliceManager: slices.extend(result) # cache the result - if self.caching and api.cache: - api.cache.add('slices', slices) + if self.cache: + api.logger.debug("SliceManager.ListSlices caches value") + self.cache.add('slices', slices) return slices diff --git a/sfa/plc/peers.py b/sfa/plc/peers.py index f9f80db0..c256c7b5 100644 --- a/sfa/plc/peers.py +++ b/sfa/plc/peers.py @@ -1,7 +1,7 @@ from sfa.util.xrn import get_authority from types import StringTypes -def get_peer(api, hrn): +def get_peer(pldriver, hrn): # Because of myplc native federation, we first need to determine if this # slice belongs to out local plc or a myplc peer. We will assume it # is a local site, unless we find out otherwise @@ -13,7 +13,7 @@ def get_peer(api, hrn): # get this site's authority (sfa root authority or sub authority) site_authority = get_authority(slice_authority).lower() # check if we are already peered with this site_authority, if so - peers = api.driver.GetPeers( {}, ['peer_id', 'peername', 'shortname', 'hrn_root']) + peers = pldriver.GetPeers( {}, ['peer_id', 'peername', 'shortname', 'hrn_root']) for peer_record in peers: names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)] if site_authority in names: @@ -22,14 +22,14 @@ def get_peer(api, hrn): return peer -def get_sfa_peer(api, hrn): - # return the authority for this hrn or None if we are the authority - sfa_peer = None - slice_authority = get_authority(hrn) - site_authority = get_authority(slice_authority) - - if site_authority != api.hrn: - sfa_peer = site_authority - - return sfa_peer +#def get_sfa_peer(pldriver, hrn): +# # return the authority for this hrn or None if we are the authority +# sfa_peer = None +# slice_authority = get_authority(hrn) +# site_authority = get_authority(slice_authority) +# +# if site_authority != pldriver.hrn: +# sfa_peer = site_authority +# +# return sfa_peer diff --git a/sfa/plc/pldriver.py b/sfa/plc/pldriver.py index d892e25e..5ce3b710 100644 --- a/sfa/plc/pldriver.py +++ b/sfa/plc/pldriver.py @@ -1,14 +1,21 @@ +import time import datetime # -from sfa.util.faults import MissingSfaInfo, UnknownSfaType +from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \ + RecordNotFound, SfaNotImplemented, SliverDoesNotExist + from sfa.util.sfalogging import logger from sfa.util.defaultdict import defaultdict -from sfa.util.xrn import hrn_to_urn, get_leaf -from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename, hrn_to_pl_login_base +from sfa.util.sfatime import utcparse +from sfa.util.xrn import hrn_to_urn, get_leaf, urn_to_sliver_id +from sfa.util.cache import Cache # one would think the driver should not need to mess with the SFA db, but.. from sfa.storage.table import SfaTable +# used to be used in get_ticket +#from sfa.trust.sfaticket import SfaTicket + from sfa.rspecs.version_manager import VersionManager from sfa.rspecs.rspec import RSpec @@ -16,10 +23,11 @@ from sfa.rspecs.rspec import RSpec from sfa.managers.driver import Driver from sfa.plc.plshell import PlShell - import sfa.plc.peers as peers from sfa.plc.plaggregate import PlAggregate from sfa.plc.plslices import PlSlices +from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename, hrn_to_pl_login_base + def list_to_dict(recs, key): """ @@ -40,9 +48,17 @@ def list_to_dict(recs, key): # class PlDriver (Driver, PlShell): + # the cache instance is a class member so it survives across incoming requests + cache = None + def __init__ (self, config): PlShell.__init__ (self, config) Driver.__init__ (self, config) + self.cache=None + if config.SFA_AGGREGATE_CACHING: + if PlDriver.cache is None: + PlDriver.cache = Cache() + self.cache = PlDriver.cache ######################################## ########## registry oriented @@ -105,9 +121,9 @@ class PlDriver (Driver, PlShell): elif type == 'node': login_base = hrn_to_pl_login_base(sfa_record['authority']) - nodes = api.driver.GetNodes([pl_record['hostname']]) + nodes = self.GetNodes([pl_record['hostname']]) if not nodes: - pointer = api.driver.AddNode(login_base, pl_record) + pointer = self.AddNode(login_base, pl_record) else: pointer = nodes[0]['node_id'] @@ -392,7 +408,6 @@ class PlDriver (Driver, PlShell): return records - # aggregates is basically api.aggregates def fill_record_sfa_info(self, records): def startswith(prefix, values): @@ -543,13 +558,66 @@ class PlDriver (Driver, PlShell): 'geni_ad_rspec_versions': ad_rspec_versions, } + def list_slices (self, creds, options): + # look in cache first + if self.cache: + slices = self.cache.get('slices') + if slices: + logger.debug("PlDriver.list_slices returns from cache") + return slices + + # get data from db + slices = self.GetSlices({'peer_id': None}, ['name']) + slice_hrns = [slicename_to_hrn(self.hrn, slice['name']) for slice in slices] + slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns] + + # cache the result + if self.cache: + logger.debug ("PlDriver.list_slices stores value in cache") + self.cache.add('slices', slice_urns) + + return slice_urns + + # first 2 args are None in case of resource discovery + def list_resources (self, slice_urn, slice_hrn, creds, options): + cached_requested = options.get('cached', True) + + version_manager = VersionManager() + # get the rspec's return format from options + rspec_version = version_manager.get_version(options.get('geni_rspec_version')) + version_string = "rspec_%s" % (rspec_version) + + #panos adding the info option to the caching key (can be improved) + if options.get('info'): + version_string = version_string + "_"+options.get('info', 'default') + + # look in cache first + if cached_requested and self.cache and not slice_hrn: + rspec = self.cache.get(version_string) + if rspec: + logger.debug("PlDriver.ListResources: returning cached advertisement") + return rspec + + #panos: passing user-defined options + #print "manager options = ",options + aggregate = PlAggregate(self) + rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, + options=options) + + # cache the result + if self.cache and not slice_hrn: + logger.debug("PlDriver.ListResources: stores advertisement in cache") + self.cache.add(version_string, rspec) + + return rspec + def sliver_status (self, slice_urn, slice_hrn): # find out where this slice is currently running slicename = hrn_to_pl_slicename(slice_hrn) slices = self.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires']) if len(slices) == 0: - raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename)) + raise SliverDoesNotExist("%s (used %s as slicename internally)" % (slice_hrn, slicename)) slice = slices[0] # report about the local nodes only @@ -625,17 +693,138 @@ class PlDriver (Driver, PlShell): return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version) + def delete_sliver (self, slice_urn, slice_hrn, creds, options): + slicename = hrn_to_pl_slicename(slice_hrn) + slices = self.GetSlices({'name': slicename}) + if not slices: + return 1 + slice = slices[0] + + # determine if this is a peer slice + # xxx I wonder if this would not need to use PlSlices.get_peer instead + # in which case plc.peers could be deprecated as this here + # is the only/last call to this last method in plc.peers + peer = peers.get_peer(self, slice_hrn) + try: + if peer: + self.UnBindObjectFromPeer('slice', slice['slice_id'], peer) + self.DeleteSliceFromNodes(slicename, slice['node_ids']) + finally: + if peer: + self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id']) + return 1 + def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options): slicename = hrn_to_pl_slicename(slice_hrn) - slices = self.driver.GetSlices({'name': slicename}, ['slice_id']) + slices = self.GetSlices({'name': slicename}, ['slice_id']) if not slices: raise RecordNotFound(slice_hrn) slice = slices[0] requested_time = utcparse(expiration_time) record = {'expires': int(time.mktime(requested_time.timetuple()))} try: - self.driver.UpdateSlice(slice['slice_id'], record) + self.UpdateSlice(slice['slice_id'], record) return True except: return False + # remove the 'enabled' tag + def start_slice (self, slice_urn, slice_hrn, creds): + slicename = hrn_to_pl_slicename(slice_hrn) + slices = self.GetSlices({'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(slice_hrn) + slice_id = slices[0]['slice_id'] + slice_tags = self.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id']) + # just remove the tag if it exists + if slice_tags: + self.DeleteSliceTag(slice_tags[0]['slice_tag_id']) + return 1 + + # set the 'enabled' tag to 0 + def stop_slice (self, slice_urn, slice_hrn, creds): + slicename = hrn_to_pl_slicename(slice_hrn) + slices = self.GetSlices({'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(slice_hrn) + slice_id = slices[0]['slice_id'] + slice_tags = self.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}) + if not slice_tags: + self.AddSliceTag(slice_id, 'enabled', '0') + elif slice_tags[0]['value'] != "0": + tag_id = slice_tags[0]['slice_tag_id'] + self.UpdateSliceTag(tag_id, '0') + return 1 + + def reset_slice (self, slice_urn, slice_hrn, creds): + raise SfaNotImplemented ("reset_slice not available at this interface") + + # xxx this code is quite old and has not run for ages + # it is obviously totally broken and needs a rewrite + def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options): + raise SfaNotImplemented,"PlDriver.get_ticket needs a rewrite" +# please keep this code for future reference +# slices = PlSlices(self) +# peer = slices.get_peer(slice_hrn) +# sfa_peer = slices.get_sfa_peer(slice_hrn) +# +# # get the slice record +# credential = api.getCredential() +# interface = api.registries[api.hrn] +# registry = api.server_proxy(interface, credential) +# records = registry.Resolve(xrn, credential) +# +# # make sure we get a local slice record +# record = None +# for tmp_record in records: +# if tmp_record['type'] == 'slice' and \ +# not tmp_record['peer_authority']: +# #Error (E0602, GetTicket): Undefined variable 'SliceRecord' +# slice_record = SliceRecord(dict=tmp_record) +# if not record: +# raise RecordNotFound(slice_hrn) +# +# # similar to CreateSliver, we must verify that the required records exist +# # at this aggregate before we can issue a ticket +# # parse rspec +# rspec = RSpec(rspec_string) +# requested_attributes = rspec.version.get_slice_attributes() +# +# # ensure site record exists +# site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer) +# # ensure slice record exists +# slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer) +# # ensure person records exists +# # xxx users is undefined in this context +# persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer) +# # ensure slice attributes exists +# slices.verify_slice_attributes(slice, requested_attributes) +# +# # get sliver info +# slivers = slices.get_slivers(slice_hrn) +# +# if not slivers: +# raise SliverDoesNotExist(slice_hrn) +# +# # get initscripts +# initscripts = [] +# data = { +# 'timestamp': int(time.time()), +# 'initscripts': initscripts, +# 'slivers': slivers +# } +# +# # create the ticket +# object_gid = record.get_gid_object() +# new_ticket = SfaTicket(subject = object_gid.get_subject()) +# new_ticket.set_gid_caller(api.auth.client_gid) +# new_ticket.set_gid_object(object_gid) +# new_ticket.set_issuer(key=api.key, subject=self.hrn) +# new_ticket.set_pubkey(object_gid.get_pubkey()) +# new_ticket.set_attributes(data) +# new_ticket.set_rspec(rspec) +# #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) +# new_ticket.encode() +# new_ticket.sign() +# +# return new_ticket.save_to_string(save_parents=True) diff --git a/sfa/util/cache.py b/sfa/util/cache.py index a2ded4a4..ee4716c6 100644 --- a/sfa/util/cache.py +++ b/sfa/util/cache.py @@ -8,7 +8,7 @@ import pickle from datetime import datetime # maximum lifetime of cached data (in seconds) -MAX_CACHE_TTL = 60 * 60 +DEFAULT_CACHE_TTL = 60 * 60 class CacheData: @@ -17,7 +17,7 @@ class CacheData: expires = None lock = None - def __init__(self, data, ttl = MAX_CACHE_TTL): + def __init__(self, data, ttl = DEFAULT_CACHE_TTL): self.lock = threading.RLock() self.data = data self.renew(ttl) @@ -31,11 +31,11 @@ class CacheData: def get_expires_date(self): return str(datetime.fromtimestamp(self.expires)) - def renew(self, ttl = MAX_CACHE_TTL): + def renew(self, ttl = DEFAULT_CACHE_TTL): self.created = time.time() self.expires = self.created + ttl - def set_data(self, data, renew=True, ttl = MAX_CACHE_TTL): + def set_data(self, data, renew=True, ttl = DEFAULT_CACHE_TTL): with self.lock: self.data = data if renew: @@ -73,7 +73,7 @@ class Cache: if filename: self.load_from_file(filename) - def add(self, key, value, ttl = MAX_CACHE_TTL): + def add(self, key, value, ttl = DEFAULT_CACHE_TTL): with self.lock: if self.cache.has_key(key): self.cache[key].set_data(value, ttl=ttl)