%define name sfa
%define version 1.1
-%define taglevel 1
+%define taglevel 2
%define release %{taglevel}%{?pldistro:.%{pldistro}}%{?date:.%{date}}
%global python_sitearch %( python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(1)" )
[ "$1" -ge "1" ] && service sfa-cm restart || :
%changelog
+* Mon Nov 07 2011 Thierry Parmentelat <thierry.parmentelat@sophia.inria.fr> - sfa-1.1-2
+- checkpoint tag: use SFA_GENERIC_FLAVOUR instead of SFA_*_TYPE
+- improvements in the pgv2 rspecs
+- driver separated from api
+- code starts moving around where it belongs
+- sfascan caches getversion across invocations
+- vini topology extracted as a config file
+
* Fri Oct 28 2011 Thierry Parmentelat <thierry.parmentelat@sophia.inria.fr> - sfa-1.1-1
- first support for protogeni rspecs is working
- vini no longer needs a specific manager
def registry_manager_class (self) :
return sfa.managers.registry_manager
def slicemgr_manager_class (self) :
- return sfa.managers.slice_manager
+ return sfa.managers.slice_manager.SliceManager
def aggregate_manager_class (self) :
- return sfa.managers.aggregate_manager
+ return sfa.managers.aggregate_manager.AggregateManager
# driver class for server-side services, talk to the whole testbed
def driver_class (self):
from sfa.plc.aggregate import Aggregate
from sfa.plc.slices import Slices
-def GetVersion(api):
+class AggregateManager:
- version_manager = VersionManager()
- ad_rspec_versions = []
- request_rspec_versions = []
- for rspec_version in version_manager.versions:
- if rspec_version.content_type in ['*', 'ad']:
- ad_rspec_versions.append(rspec_version.to_dict())
- if rspec_version.content_type in ['*', 'request']:
- request_rspec_versions.append(rspec_version.to_dict())
- default_rspec_version = version_manager.get_version("sfa 1").to_dict()
- xrn=Xrn(api.hrn)
- version_more = {'interface':'aggregate',
- 'testbed':'myplc',
- 'hrn':xrn.get_hrn(),
- 'request_rspec_versions': request_rspec_versions,
- 'ad_rspec_versions': ad_rspec_versions,
- 'default_ad_rspec': default_rspec_version
- }
- return version_core(version_more)
-
-def __get_registry_objects(slice_xrn, creds, users):
- """
-
- """
- hrn, _ = urn_to_hrn(slice_xrn)
-
- hrn_auth = get_authority(hrn)
-
- # Build up objects that an SFA registry would return if SFA
- # could contact the slice's registry directly
- reg_objects = None
-
- if users:
- # dont allow special characters in the site login base
- #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
- #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
+ def __init__ (self):
+ # xxx Thierry : caching at the aggregate level sounds wrong...
+ #self.caching=True
+ self.caching=False
+
+ def GetVersion(self, api):
+
+ version_manager = VersionManager()
+ ad_rspec_versions = []
+ request_rspec_versions = []
+ for rspec_version in version_manager.versions:
+ if rspec_version.content_type in ['*', 'ad']:
+ ad_rspec_versions.append(rspec_version.to_dict())
+ if rspec_version.content_type in ['*', 'request']:
+ request_rspec_versions.append(rspec_version.to_dict())
+ default_rspec_version = version_manager.get_version("sfa 1").to_dict()
+ xrn=Xrn(api.hrn)
+ version_more = {'interface':'aggregate',
+ 'testbed':'myplc',
+ 'hrn':xrn.get_hrn(),
+ 'request_rspec_versions': request_rspec_versions,
+ 'ad_rspec_versions': ad_rspec_versions,
+ 'default_ad_rspec': default_rspec_version
+ }
+ return version_core(version_more)
+
+ def _get_registry_objects(self, slice_xrn, creds, users):
+ """
+
+ """
+ hrn, _ = urn_to_hrn(slice_xrn)
+
+ hrn_auth = get_authority(hrn)
+
+ # Build up objects that an SFA registry would return if SFA
+ # could contact the slice's registry directly
+ reg_objects = None
+
+ if users:
+ # dont allow special characters in the site login base
+ #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
+ #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
+ slicename = hrn_to_pl_slicename(hrn)
+ login_base = slicename.split('_')[0]
+ reg_objects = {}
+ site = {}
+ site['site_id'] = 0
+ site['name'] = 'geni.%s' % login_base
+ site['enabled'] = True
+ site['max_slices'] = 100
+
+ # Note:
+ # Is it okay if this login base is the same as one already at this myplc site?
+ # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps?
+ site['login_base'] = login_base
+ site['abbreviated_name'] = login_base
+ site['max_slivers'] = 1000
+ reg_objects['site'] = site
+
+ slice = {}
+
+ # get_expiration always returns a normalized datetime - no need to utcparse
+ extime = Credential(string=creds[0]).get_expiration()
+ # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
+ if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
+ extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
+ slice['expires'] = int(time.mktime(extime.timetuple()))
+ slice['hrn'] = hrn
+ slice['name'] = hrn_to_pl_slicename(hrn)
+ slice['url'] = hrn
+ slice['description'] = hrn
+ slice['pointer'] = 0
+ reg_objects['slice_record'] = slice
+
+ reg_objects['users'] = {}
+ for user in users:
+ user['key_ids'] = []
+ hrn, _ = urn_to_hrn(user['urn'])
+ user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
+ user['first_name'] = hrn
+ user['last_name'] = hrn
+ reg_objects['users'][user['email']] = user
+
+ return reg_objects
+
+ def SliverStatus(self, api, slice_xrn, creds, call_id):
+ if Callids().already_handled(call_id): return {}
+
+ (hrn, _) = urn_to_hrn(slice_xrn)
+ # find out where this slice is currently running
slicename = hrn_to_pl_slicename(hrn)
- login_base = slicename.split('_')[0]
- reg_objects = {}
- site = {}
- site['site_id'] = 0
- site['name'] = 'geni.%s' % login_base
- site['enabled'] = True
- site['max_slices'] = 100
-
- # Note:
- # Is it okay if this login base is the same as one already at this myplc site?
- # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps?
- site['login_base'] = login_base
- site['abbreviated_name'] = login_base
- site['max_slivers'] = 1000
- reg_objects['site'] = site
-
- slice = {}
- # get_expiration always returns a normalized datetime - no need to utcparse
- extime = Credential(string=creds[0]).get_expiration()
- # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
- if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
- extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
- slice['expires'] = int(time.mktime(extime.timetuple()))
- slice['hrn'] = hrn
- slice['name'] = hrn_to_pl_slicename(hrn)
- slice['url'] = hrn
- slice['description'] = hrn
- slice['pointer'] = 0
- reg_objects['slice_record'] = slice
-
- reg_objects['users'] = {}
- for user in users:
- user['key_ids'] = []
- hrn, _ = urn_to_hrn(user['urn'])
- user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
- user['first_name'] = hrn
- user['last_name'] = hrn
- reg_objects['users'][user['email']] = user
-
- return reg_objects
-
-def __get_hostnames(nodes):
- hostnames = []
- for node in nodes:
- hostnames.append(node.hostname)
- return hostnames
-
-def SliverStatus(api, slice_xrn, creds, call_id):
- if Callids().already_handled(call_id): return {}
-
- (hrn, _) = urn_to_hrn(slice_xrn)
- # find out where this slice is currently running
- slicename = hrn_to_pl_slicename(hrn)
-
- slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
- if len(slices) == 0:
- raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
- slice = slices[0]
-
- # report about the local nodes only
- nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
- ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
- site_ids = [node['site_id'] for node in nodes]
-
- result = {}
- top_level_status = 'unknown'
- if nodes:
- top_level_status = 'ready'
- slice_urn = Xrn(slice_xrn, 'slice').get_urn()
- result['geni_urn'] = slice_urn
- result['pl_login'] = slice['name']
- result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
-
- resources = []
- for node in nodes:
- res = {}
- res['pl_hostname'] = node['hostname']
- res['pl_boot_state'] = node['boot_state']
- res['pl_last_contact'] = node['last_contact']
- if node['last_contact'] is not None:
- res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
- sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id'])
- res['geni_urn'] = sliver_id
- if node['boot_state'] == 'boot':
- res['geni_status'] = 'ready'
- else:
- res['geni_status'] = 'failed'
- top_level_status = 'failed'
+ slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
+ if len(slices) == 0:
+ raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
+ slice = slices[0]
+
+ # report about the local nodes only
+ nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
+ ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
+ site_ids = [node['site_id'] for node in nodes]
+
+ result = {}
+ top_level_status = 'unknown'
+ if nodes:
+ top_level_status = 'ready'
+ slice_urn = Xrn(slice_xrn, 'slice').get_urn()
+ result['geni_urn'] = slice_urn
+ result['pl_login'] = slice['name']
+ result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
+
+ resources = []
+ for node in nodes:
+ res = {}
+ res['pl_hostname'] = node['hostname']
+ res['pl_boot_state'] = node['boot_state']
+ res['pl_last_contact'] = node['last_contact']
+ if node['last_contact'] is not None:
+ res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
+ sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id'])
+ res['geni_urn'] = sliver_id
+ if node['boot_state'] == 'boot':
+ res['geni_status'] = 'ready'
+ else:
+ res['geni_status'] = 'failed'
+ top_level_status = 'failed'
+
+ res['geni_error'] = ''
+
+ resources.append(res)
- res['geni_error'] = ''
-
- resources.append(res)
+ result['geni_status'] = top_level_status
+ result['geni_resources'] = resources
+ return result
+
+ def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id):
+ """
+ Create the sliver[s] (slice) at this aggregate.
+ Verify HRN and initialize the slice record in PLC if necessary.
+ """
+ if Callids().already_handled(call_id): return ""
+
+ aggregate = Aggregate(api)
+ slices = Slices(api)
+ (hrn, _) = urn_to_hrn(slice_xrn)
+ peer = slices.get_peer(hrn)
+ sfa_peer = slices.get_sfa_peer(hrn)
+ slice_record=None
+ if users:
+ slice_record = users[0].get('slice_record', {})
+
+ # parse rspec
+ rspec = RSpec(rspec_string)
+ requested_attributes = rspec.version.get_slice_attributes()
- result['geni_status'] = top_level_status
- result['geni_resources'] = resources
- return result
-
-def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id):
- """
- Create the sliver[s] (slice) at this aggregate.
- Verify HRN and initialize the slice record in PLC if necessary.
- """
- if Callids().already_handled(call_id): return ""
-
- aggregate = Aggregate(api)
- slices = Slices(api)
- (hrn, _) = urn_to_hrn(slice_xrn)
- peer = slices.get_peer(hrn)
- sfa_peer = slices.get_sfa_peer(hrn)
- slice_record=None
- if users:
- slice_record = users[0].get('slice_record', {})
-
- # parse rspec
- rspec = RSpec(rspec_string)
- requested_attributes = rspec.version.get_slice_attributes()
-
- # ensure site record exists
- site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
- # ensure slice record exists
- slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
- # ensure person records exists
- persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
- # ensure slice attributes exists
- slices.verify_slice_attributes(slice, requested_attributes)
-
- # add/remove slice from nodes
- requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()]
- slices.verify_slice_nodes(slice, requested_slivers, peer)
-
- aggregate.prepare_nodes({'hostname': requested_slivers})
- aggregate.prepare_interfaces({'node_id': aggregate.nodes.keys()})
- slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
-
- # handle MyPLC peer association.
- # only used by plc and ple.
- slices.handle_peer(site, slice, persons, peer)
+ # ensure site record exists
+ site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+ # ensure slice record exists
+ slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+ # ensure person records exists
+ persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+ # ensure slice attributes exists
+ slices.verify_slice_attributes(slice, requested_attributes)
+
+ # add/remove slice from nodes
+ requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()]
+ slices.verify_slice_nodes(slice, requested_slivers, peer)
+
+ aggregate.prepare_nodes({'hostname': requested_slivers})
+ aggregate.prepare_interfaces({'node_id': aggregate.nodes.keys()})
+ slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
+
+ # handle MyPLC peer association.
+ # only used by plc and ple.
+ slices.handle_peer(site, slice, persons, peer)
+
+ return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
+
+
+ def RenewSliver(self, api, xrn, creds, expiration_time, call_id):
+ if Callids().already_handled(call_id): return True
+ (hrn, _) = urn_to_hrn(xrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
+ if not slices:
+ raise RecordNotFound(hrn)
+ slice = slices[0]
+ requested_time = utcparse(expiration_time)
+ record = {'expires': int(time.mktime(requested_time.timetuple()))}
+ try:
+ api.driver.UpdateSlice(slice['slice_id'], record)
+ return True
+ except:
+ return False
+
+ def start_slice(self, api, xrn, creds):
+ (hrn, _) = urn_to_hrn(xrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
+ if not slices:
+ raise RecordNotFound(hrn)
+ slice_id = slices[0]['slice_id']
+ slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
+ # just remove the tag if it exists
+ if slice_tags:
+ api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
- return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
-
-
-def RenewSliver(api, xrn, creds, expiration_time, call_id):
- if Callids().already_handled(call_id): return True
- (hrn, _) = urn_to_hrn(xrn)
- slicename = hrn_to_pl_slicename(hrn)
- slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
- if not slices:
- raise RecordNotFound(hrn)
- slice = slices[0]
- requested_time = utcparse(expiration_time)
- record = {'expires': int(time.mktime(requested_time.timetuple()))}
- try:
- api.driver.UpdateSlice(slice['slice_id'], record)
- return True
- except:
- return False
-
-def start_slice(api, xrn, creds):
- (hrn, _) = urn_to_hrn(xrn)
- slicename = hrn_to_pl_slicename(hrn)
- slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
- if not slices:
- raise RecordNotFound(hrn)
- slice_id = slices[0]['slice_id']
- slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
- # just remove the tag if it exists
- if slice_tags:
- api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
-
- return 1
-
-def stop_slice(api, xrn, creds):
- hrn, _ = urn_to_hrn(xrn)
- slicename = hrn_to_pl_slicename(hrn)
- slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
- if not slices:
- raise RecordNotFound(hrn)
- slice_id = slices[0]['slice_id']
- slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
- if not slice_tags:
- api.driver.AddSliceTag(slice_id, 'enabled', '0')
- elif slice_tags[0]['value'] != "0":
- tag_id = slice_tags[0]['slice_tag_id']
- api.driver.UpdateSliceTag(tag_id, '0')
- return 1
-
-def reset_slice(api, xrn):
- # XX not implemented at this interface
- return 1
-
-def DeleteSliver(api, xrn, creds, call_id):
- if Callids().already_handled(call_id): return ""
- (hrn, _) = urn_to_hrn(xrn)
- slicename = hrn_to_pl_slicename(hrn)
- slices = api.driver.GetSlices({'name': slicename})
- if not slices:
return 1
- slice = slices[0]
-
- # determine if this is a peer slice
- peer = peers.get_peer(api, hrn)
- try:
- if peer:
- api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
- api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
- finally:
- if peer:
- api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
- return 1
-
-# xxx Thierry : caching at the aggregate level sounds wrong...
-#caching=True
-caching=False
-def ListSlices(api, creds, call_id):
- if Callids().already_handled(call_id): return []
- # look in cache first
- if caching and api.cache:
- slices = api.cache.get('slices')
- if slices:
- return slices
-
- # get data from db
- slices = api.driver.GetSlices({'peer_id': None}, ['name'])
- slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
- slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
-
- # cache the result
- if caching and api.cache:
- api.cache.add('slices', slice_urns)
-
- return slice_urns
+
+ def stop_slice(self, api, xrn, creds):
+ hrn, _ = urn_to_hrn(xrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
+ if not slices:
+ raise RecordNotFound(hrn)
+ slice_id = slices[0]['slice_id']
+ slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
+ if not slice_tags:
+ api.driver.AddSliceTag(slice_id, 'enabled', '0')
+ elif slice_tags[0]['value'] != "0":
+ tag_id = slice_tags[0]['slice_tag_id']
+ api.driver.UpdateSliceTag(tag_id, '0')
+ return 1
-def ListResources(api, creds, options, call_id):
- if Callids().already_handled(call_id): return ""
- # get slice's hrn from options
- xrn = options.get('geni_slice_urn', None)
- (hrn, _) = urn_to_hrn(xrn)
-
- version_manager = VersionManager()
- # get the rspec's return format from options
- rspec_version = version_manager.get_version(options.get('rspec_version'))
- version_string = "rspec_%s" % (rspec_version.to_string())
-
- #panos adding the info option to the caching key (can be improved)
- if options.get('info'):
- version_string = version_string + "_"+options.get('info', 'default')
-
- # look in cache first
- if caching and api.cache and not xrn:
- rspec = api.cache.get(version_string)
- if rspec:
- api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
- return rspec
-
- #panos: passing user-defined options
- #print "manager options = ",options
- aggregate = Aggregate(api, options)
- rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
-
- # cache the result
- if caching and api.cache and not xrn:
- api.cache.add(version_string, rspec)
-
- return rspec
-
-
-def get_ticket(api, xrn, creds, rspec, users):
-
- (slice_hrn, _) = urn_to_hrn(xrn)
- slices = Slices(api)
- peer = slices.get_peer(slice_hrn)
- sfa_peer = slices.get_sfa_peer(slice_hrn)
-
- # get the slice record
- credential = api.getCredential()
- interface = api.registries[api.hrn]
- registry = api.server_proxy(interface, credential)
- records = registry.Resolve(xrn, credential)
-
- # make sure we get a local slice record
- record = None
- for tmp_record in records:
- if tmp_record['type'] == 'slice' and \
- not tmp_record['peer_authority']:
-#Error (E0602, get_ticket): Undefined variable 'SliceRecord'
- record = SliceRecord(dict=tmp_record)
- if not record:
- raise RecordNotFound(slice_hrn)
-
- # similar to CreateSliver, we must verify that the required records exist
- # at this aggregate before we can issue a ticket
- # parse rspec
- rspec = RSpec(rspec_string)
- requested_attributes = rspec.version.get_slice_attributes()
-
- # ensure site record exists
- site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
- # ensure slice record exists
- slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
- # ensure person records exists
- persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
- # ensure slice attributes exists
- slices.verify_slice_attributes(slice, requested_attributes)
-
- # get sliver info
- slivers = slices.get_slivers(slice_hrn)
-
- if not slivers:
- raise SliverDoesNotExist(slice_hrn)
-
- # get initscripts
- initscripts = []
- data = {
- 'timestamp': int(time.time()),
- 'initscripts': initscripts,
- 'slivers': slivers
- }
-
- # create the ticket
- object_gid = record.get_gid_object()
- new_ticket = SfaTicket(subject = object_gid.get_subject())
- new_ticket.set_gid_caller(api.auth.client_gid)
- new_ticket.set_gid_object(object_gid)
- new_ticket.set_issuer(key=api.key, subject=api.hrn)
- new_ticket.set_pubkey(object_gid.get_pubkey())
- new_ticket.set_attributes(data)
- new_ticket.set_rspec(rspec)
- #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
- new_ticket.encode()
- new_ticket.sign()
-
- return new_ticket.save_to_string(save_parents=True)
-
-
-
-#def main():
-# """
-# rspec = ListResources(api, "plc.princeton.sapan", None, 'pl_test_sapan')
-# #rspec = ListResources(api, "plc.princeton.coblitz", None, 'pl_test_coblitz')
-# #rspec = ListResources(api, "plc.pl.sirius", None, 'pl_test_sirius')
-# print rspec
-# """
-# api = PlcSfaApi()
-# f = open(sys.argv[1])
-# xml = f.read()
-# f.close()
-##Error (E1120, main): No value passed for parameter 'users' in function call
-##Error (E1120, main): No value passed for parameter 'call_id' in function call
-# CreateSliver(api, "plc.princeton.sapan", xml, 'CreateSliver_sapan')
-#
-#if __name__ == "__main__":
-# main()
+ def reset_slice(self, api, xrn):
+ # XX not implemented at this interface
+ return 1
+
+ def DeleteSliver(self, api, xrn, creds, call_id):
+ if Callids().already_handled(call_id): return ""
+ (hrn, _) = urn_to_hrn(xrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ slices = api.driver.GetSlices({'name': slicename})
+ if not slices:
+ return 1
+ slice = slices[0]
+
+ # determine if this is a peer slice
+ peer = peers.get_peer(api, hrn)
+ try:
+ if peer:
+ api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
+ api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
+ finally:
+ if peer:
+ api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
+ return 1
+
+ def ListSlices(self, api, creds, call_id):
+ if Callids().already_handled(call_id): return []
+ # look in cache first
+ if self.caching and api.cache:
+ slices = api.cache.get('slices')
+ if slices:
+ return slices
+
+ # get data from db
+ slices = api.driver.GetSlices({'peer_id': None}, ['name'])
+ slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
+ slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
+
+ # cache the result
+ if self.caching and api.cache:
+ api.cache.add('slices', slice_urns)
+
+ return slice_urns
+
+ def ListResources(self, api, creds, options, call_id):
+ if Callids().already_handled(call_id): return ""
+ # get slice's hrn from options
+ xrn = options.get('geni_slice_urn', None)
+ (hrn, _) = urn_to_hrn(xrn)
+
+ version_manager = VersionManager()
+ # get the rspec's return format from options
+ rspec_version = version_manager.get_version(options.get('rspec_version'))
+ version_string = "rspec_%s" % (rspec_version.to_string())
+
+ #panos adding the info option to the caching key (can be improved)
+ if options.get('info'):
+ version_string = version_string + "_"+options.get('info', 'default')
+
+ # look in cache first
+ if self.caching and api.cache and not xrn:
+ rspec = api.cache.get(version_string)
+ if rspec:
+ api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
+ return rspec
+
+ #panos: passing user-defined options
+ #print "manager options = ",options
+ aggregate = Aggregate(api, options)
+ rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
+
+ # cache the result
+ if self.caching and api.cache and not xrn:
+ api.cache.add(version_string, rspec)
+
+ return rspec
+
+
+ def get_ticket(self, api, xrn, creds, rspec, users):
+
+ (slice_hrn, _) = urn_to_hrn(xrn)
+ slices = Slices(api)
+ peer = slices.get_peer(slice_hrn)
+ sfa_peer = slices.get_sfa_peer(slice_hrn)
+
+ # get the slice record
+ credential = api.getCredential()
+ interface = api.registries[api.hrn]
+ registry = api.server_proxy(interface, credential)
+ records = registry.Resolve(xrn, credential)
+
+ # make sure we get a local slice record
+ record = None
+ for tmp_record in records:
+ if tmp_record['type'] == 'slice' and \
+ not tmp_record['peer_authority']:
+ #Error (E0602, get_ticket): Undefined variable 'SliceRecord'
+ record = SliceRecord(dict=tmp_record)
+ if not record:
+ raise RecordNotFound(slice_hrn)
+
+ # similar to CreateSliver, we must verify that the required records exist
+ # at this aggregate before we can issue a ticket
+ # parse rspec
+        rspec = RSpec(rspec)
+ requested_attributes = rspec.version.get_slice_attributes()
+
+        # ensure site record exists
+        # FIXME(review): 'hrn' and 'slice_record' are not defined in this scope --
+        # this method binds 'slice_hrn' and 'record'; confirm intended names before release
+        site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+ # ensure slice record exists
+ slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+ # ensure person records exists
+ persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+ # ensure slice attributes exists
+ slices.verify_slice_attributes(slice, requested_attributes)
+
+ # get sliver info
+ slivers = slices.get_slivers(slice_hrn)
+
+ if not slivers:
+ raise SliverDoesNotExist(slice_hrn)
+
+ # get initscripts
+ initscripts = []
+ data = {
+ 'timestamp': int(time.time()),
+ 'initscripts': initscripts,
+ 'slivers': slivers
+ }
+
+ # create the ticket
+ object_gid = record.get_gid_object()
+ new_ticket = SfaTicket(subject = object_gid.get_subject())
+ new_ticket.set_gid_caller(api.auth.client_gid)
+ new_ticket.set_gid_object(object_gid)
+ new_ticket.set_issuer(key=api.key, subject=api.hrn)
+ new_ticket.set_pubkey(object_gid.get_pubkey())
+ new_ticket.set_attributes(data)
+ new_ticket.set_rspec(rspec)
+ #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
+ new_ticket.encode()
+ new_ticket.sign()
+
+ return new_ticket.save_to_string(save_parents=True)
from lxml import etree as ET
from sqlobject import *
-from sfa.util.faults import *
+from sfa.util.faults import InvalidRSpec, RecordNotFound, SliverDoesNotExist
from sfa.util.xrn import urn_to_hrn, Xrn
from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
from sfa.util.callids import Callids
-from sfa.util.sfalogging import logger
+#comes with its own logging
+#from sfa.util.sfalogging import logger
from sfa.util.version import version_core
from sfa.trust.credential import Credential
from sfa.server.sfaapi import SfaApi
from sfa.plc.aggregate import Aggregate
-from sfa.plc.slices import *
-from sfa.rspecs.sfa_rspec import sfa_rspec_version
-
-
-##
-# The data structure used to represent a cloud.
-# It contains the cloud name, its ip address, image information,
-# key pairs, and clusters information.
-#
-cloud = {}
-
-##
-# The location of the RelaxNG schema.
-#
-EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
+from sfa.plc.slices import Slice, Slices
+# not sure what this used to be nor where it is now defined
+#from sfa.rspecs.sfa_rspec import sfa_rspec_version
##
# Meta data of an instance.
(self.image_id, self.kernel_id, self.ramdisk_id,
self.inst_type, self.key_pair))
- # XXX The return statement is for testing. REMOVE in production
- #return
-
try:
reservation = botoConn.run_instances(self.image_id,
kernel_id = self.kernel_id,
#slice_index = DatabaseIndex('slice_hrn')
instances = MultipleJoin('EucaInstance')
-##
-# Initialize the aggregate manager by reading a configuration file.
-#
-def init_server():
- logger = logging.getLogger('EucaAggregate')
- fileHandler = logging.FileHandler('/var/log/euca.log')
- fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
- logger.addHandler(fileHandler)
- fileHandler.setLevel(logging.DEBUG)
- logger.setLevel(logging.DEBUG)
-
- configParser = ConfigParser()
- configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
- if len(configParser.sections()) < 1:
- logger.error('No cloud defined in the config file')
- raise Exception('Cannot find cloud definition in configuration file.')
-
- # Only read the first section.
- cloudSec = configParser.sections()[0]
- cloud['name'] = cloudSec
- cloud['access_key'] = configParser.get(cloudSec, 'access_key')
- cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
- cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
- cloudURL = cloud['cloud_url']
- if cloudURL.find('https://') >= 0:
- cloudURL = cloudURL.replace('https://', '')
- elif cloudURL.find('http://') >= 0:
- cloudURL = cloudURL.replace('http://', '')
- (cloud['ip'], parts) = cloudURL.split(':')
-
- # Create image bundles
- images = getEucaConnection().get_all_images()
- cloud['images'] = images
- cloud['imageBundles'] = {}
- for i in images:
- if i.type != 'machine' or i.kernel_id is None: continue
- name = os.path.dirname(i.location)
- detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
- cloud['imageBundles'][name] = detail
-
- # Initialize sqlite3 database and tables.
- dbPath = '/etc/sfa/db'
- dbName = 'euca_aggregate.db'
-
- if not os.path.isdir(dbPath):
- logger.info('%s not found. Creating directory ...' % dbPath)
- os.mkdir(dbPath)
-
- conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
- sqlhub.processConnection = conn
- Slice.createTable(ifNotExists=True)
- EucaInstance.createTable(ifNotExists=True)
- Meta.createTable(ifNotExists=True)
-
- # Start the update process to keep track of the meta data
- # about Eucalyptus instance.
- Process(target=updateMeta).start()
-
- # Make sure the schema exists.
- if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
- err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA
- logger.error(err)
- raise Exception(err)
-
-##
-# Creates a connection to Eucalytpus. This function is inspired by
-# the make_connection() in Euca2ools.
-#
-# @return A connection object or None
-#
-def getEucaConnection():
- global cloud
- accessKey = cloud['access_key']
- secretKey = cloud['secret_key']
- eucaURL = cloud['cloud_url']
- useSSL = False
- srvPath = '/'
- eucaPort = 8773
- logger = logging.getLogger('EucaAggregate')
-
- if not accessKey or not secretKey or not eucaURL:
- logger.error('Please set ALL of the required environment ' \
- 'variables by sourcing the eucarc file.')
- return None
-
- # Split the url into parts
- if eucaURL.find('https://') >= 0:
- useSSL = True
- eucaURL = eucaURL.replace('https://', '')
- elif eucaURL.find('http://') >= 0:
- useSSL = False
- eucaURL = eucaURL.replace('http://', '')
- (eucaHost, parts) = eucaURL.split(':')
- if len(parts) > 1:
- parts = parts.split('/')
- eucaPort = int(parts[0])
- parts = parts[1:]
- srvPath = '/'.join(parts)
-
- return boto.connect_ec2(aws_access_key_id=accessKey,
- aws_secret_access_key=secretKey,
- is_secure=useSSL,
- region=RegionInfo(None, 'eucalyptus', eucaHost),
- port=eucaPort,
- path=srvPath)
-
-##
-# Returns a string of keys that belong to the users of the given slice.
-# @param sliceHRN The hunman readable name of the slice.
-# @return sting()
-#
-# This method is no longer needed because the user keys are passed into
-# CreateSliver
-#
-def getKeysForSlice(api, sliceHRN):
- logger = logging.getLogger('EucaAggregate')
- cred = api.getCredential()
- registry = api.registries[api.hrn]
- keys = []
-
- # Get the slice record
- records = registry.Resolve(sliceHRN, cred)
- if not records:
- logging.warn('Cannot find any record for slice %s' % sliceHRN)
- return []
-
- # Find who can log into this slice
- persons = records[0]['persons']
-
- # Extract the keys from persons records
- for p in persons:
- sliceUser = registry.Resolve(p, cred)
- userKeys = sliceUser[0]['keys']
- keys += userKeys
-
- return '\n'.join(keys)
-
##
# A class that builds the RSpec for Eucalyptus.
#
return clusterList
-def ListResources(api, creds, options, call_id):
- if Callids().already_handled(call_id): return ""
- global cloud
- # get slice's hrn from options
- xrn = options.get('geni_slice_urn', '')
- hrn, type = urn_to_hrn(xrn)
- logger = logging.getLogger('EucaAggregate')
-
- # get hrn of the original caller
- origin_hrn = options.get('origin_hrn', None)
- if not origin_hrn:
- origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
-
- conn = getEucaConnection()
-
- if not conn:
- logger.error('Cannot create a connection to Eucalyptus')
- return 'Cannot create a connection to Eucalyptus'
-
- try:
- # Zones
- zones = conn.get_all_zones(['verbose'])
- p = ZoneResultParser(zones)
- clusters = p.parse()
- cloud['clusters'] = clusters
-
- # Images
- images = conn.get_all_images()
- cloud['images'] = images
- cloud['imageBundles'] = {}
+class AggregateManagerEucalyptus:
+
+ # The data structure used to represent a cloud.
+ # It contains the cloud name, its ip address, image information,
+ # key pairs, and clusters information.
+ cloud = {}
+
+ # The location of the RelaxNG schema.
+ EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
+
+ _inited=False
+
+ # the init_server mechanism has vanished
+ def __init__ (self):
+ if AggregateManagerEucalyptus._inited: return
+ AggregateManagerEucalyptus.init_server()
+ # mark initialization done, otherwise init_server() (and its
+ # updateMeta subprocess) would be re-run on every instantiation
+ AggregateManagerEucalyptus._inited=True
+
+ # Initialize the aggregate manager by reading a configuration file.
+ @staticmethod
+ def init_server():
+ logger = logging.getLogger('EucaAggregate')
+ fileHandler = logging.FileHandler('/var/log/euca.log')
+ fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+ logger.addHandler(fileHandler)
+ fileHandler.setLevel(logging.DEBUG)
+ logger.setLevel(logging.DEBUG)
+
+ configParser = ConfigParser()
+ configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
+ if len(configParser.sections()) < 1:
+ logger.error('No cloud defined in the config file')
+ raise Exception('Cannot find cloud definition in configuration file.')
+
+ # Only read the first section.
+ cloudSec = configParser.sections()[0]
+ AggregateManagerEucalyptus.cloud['name'] = cloudSec
+ AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
+ AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
+ AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
+ cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
+ if cloudURL.find('https://') >= 0:
+ cloudURL = cloudURL.replace('https://', '')
+ elif cloudURL.find('http://') >= 0:
+ cloudURL = cloudURL.replace('http://', '')
+ (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
+
+ # Create image bundles
+ # init_server is a staticmethod: no 'self' here, use the class-qualified call
+ images = AggregateManagerEucalyptus.getEucaConnection().get_all_images()
+ AggregateManagerEucalyptus.cloud['images'] = images
+ AggregateManagerEucalyptus.cloud['imageBundles'] = {}
for i in images:
if i.type != 'machine' or i.kernel_id is None: continue
name = os.path.dirname(i.location)
detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
- cloud['imageBundles'][name] = detail
-
- # Key Pairs
- keyPairs = conn.get_all_key_pairs()
- cloud['keypairs'] = keyPairs
-
- if hrn:
- instanceId = []
- instances = []
-
- # Get the instances that belong to the given slice from sqlite3
- # XXX use getOne() in production because the slice's hrn is supposed
- # to be unique. For testing, uniqueness is turned off in the db.
- # If the slice isn't found in the database, create a record for the
- # slice.
- matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
- if matchedSlices:
- theSlice = matchedSlices[-1]
- else:
- theSlice = Slice(slice_hrn = hrn)
- for instance in theSlice.instances:
- instanceId.append(instance.instance_id)
-
- # Get the information about those instances using their ids.
- if len(instanceId) > 0:
- reservations = conn.get_all_instances(instanceId)
- else:
- reservations = []
+ AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
+
+ # Initialize sqlite3 database and tables.
+ dbPath = '/etc/sfa/db'
+ dbName = 'euca_aggregate.db'
+
+ if not os.path.isdir(dbPath):
+ logger.info('%s not found. Creating directory ...' % dbPath)
+ os.mkdir(dbPath)
+
+ conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
+ sqlhub.processConnection = conn
+ Slice.createTable(ifNotExists=True)
+ EucaInstance.createTable(ifNotExists=True)
+ Meta.createTable(ifNotExists=True)
+
+ # Start the update process to keep track of the meta data
+ # about Eucalyptus instance.
+ Process(target=AggregateManagerEucalyptus.updateMeta).start()
+
+ # Make sure the schema exists.
+ if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
+ err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
+ logger.error(err)
+ raise Exception(err)
+
+ #
+ # A separate process that will update the meta data.
+ #
+ @staticmethod
+ def updateMeta():
+ logger = logging.getLogger('EucaMeta')
+ fileHandler = logging.FileHandler('/var/log/euca_meta.log')
+ fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+ logger.addHandler(fileHandler)
+ fileHandler.setLevel(logging.DEBUG)
+ logger.setLevel(logging.DEBUG)
+
+ while True:
+ sleep(30)
+
+ # Get IDs of the instances that don't have IPs yet.
+ dbResults = Meta.select(
+ AND(Meta.q.pri_addr == None,
+ Meta.q.state != 'deleted')
+ )
+ dbResults = list(dbResults)
+ logger.debug('[update process] dbResults: %s' % dbResults)
+ instids = []
+ for r in dbResults:
+ if not r.instance:
+ continue
+ instids.append(r.instance.instance_id)
+ logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
+
+ # Get instance information from Eucalyptus
+ # updateMeta is a staticmethod: no 'self' here, use the class-qualified call
+ conn = AggregateManagerEucalyptus.getEucaConnection()
+ vmInstances = []
+ reservations = conn.get_all_instances(instids)
for reservation in reservations:
- for instance in reservation.instances:
- instances.append(instance)
-
- # Construct a dictionary for the EucaRSpecBuilder
- instancesDict = {}
- for instance in instances:
- instList = instancesDict.setdefault(instance.instance_type, [])
- instInfoDict = {}
-
- instInfoDict['id'] = instance.id
- instInfoDict['public_dns'] = instance.public_dns_name
- instInfoDict['state'] = instance.state
- instInfoDict['key'] = instance.key_name
-
- instList.append(instInfoDict)
- cloud['instances'] = instancesDict
-
- except EC2ResponseError, ec2RespErr:
- errTree = ET.fromstring(ec2RespErr.body)
- errMsgE = errTree.find('.//Message')
- logger.error(errMsgE.text)
-
- rspec = EucaRSpecBuilder(cloud).toXML()
-
- # Remove the instances records so next time they won't
- # show up.
- if 'instances' in cloud:
- del cloud['instances']
-
- return rspec
-
-"""
-Hook called via 'sfi.py create'
-"""
-def CreateSliver(api, slice_xrn, creds, xml, users, call_id):
- if Callids().already_handled(call_id): return ""
-
- global cloud
- logger = logging.getLogger('EucaAggregate')
- logger.debug("In CreateSliver")
-
- aggregate = Aggregate(api)
- slices = Slices(api)
- (hrn, type) = urn_to_hrn(slice_xrn)
- peer = slices.get_peer(hrn)
- sfa_peer = slices.get_sfa_peer(hrn)
- slice_record=None
- if users:
- slice_record = users[0].get('slice_record', {})
-
- conn = getEucaConnection()
- if not conn:
- logger.error('Cannot create a connection to Eucalyptus')
- return ""
-
- # Validate RSpec
- schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA)
- rspecValidator = ET.RelaxNG(schemaXML)
- rspecXML = ET.XML(xml)
- for network in rspecXML.iterfind("./network"):
- if network.get('name') != cloud['name']:
- # Throw away everything except my own RSpec
- # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
- network.getparent().remove(network)
- if not rspecValidator(rspecXML):
- error = rspecValidator.error_log.last_error
- message = '%s (line %s)' % (error.message, error.line)
- raise InvalidRSpec(message)
-
+ vmInstances += reservation.instances
+
+ # Check the IPs
+ instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
+ for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
+ logger.debug('[update process] IP dict: %s' % str(instIPs))
+
+ # Update the local DB
+ for ipData in instIPs:
+ dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
+ if not dbInst:
+ logger.info('[update process] Could not find %s in DB' % ipData['id'])
+ continue
+ dbInst.meta.pri_addr = ipData['pri_addr']
+ dbInst.meta.pub_addr = ipData['pub_addr']
+ dbInst.meta.state = 'running'
+
+ # staticmethod context (no 'self'); also the method is named dumpInstanceInfo
+ AggregateManagerEucalyptus.dumpInstanceInfo()
+
+ ##
+ # Creates a connection to Eucalyptus. This function is inspired by
+ # the make_connection() in Euca2ools.
+ #
+ # It takes no instance state and is invoked from staticmethods
+ # (init_server, updateMeta) as well as instance methods, so it is
+ # a staticmethod itself.
+ #
+ # @return A connection object or None
+ #
+ @staticmethod
+ def getEucaConnection():
+ accessKey = AggregateManagerEucalyptus.cloud['access_key']
+ secretKey = AggregateManagerEucalyptus.cloud['secret_key']
+ eucaURL = AggregateManagerEucalyptus.cloud['cloud_url']
+ useSSL = False
+ srvPath = '/'
+ eucaPort = 8773
+ logger = logging.getLogger('EucaAggregate')
+
+ if not accessKey or not secretKey or not eucaURL:
+ logger.error('Please set ALL of the required environment ' \
+ 'variables by sourcing the eucarc file.')
+ return None
+
+ # Split the url into parts
+ if eucaURL.find('https://') >= 0:
+ useSSL = True
+ eucaURL = eucaURL.replace('https://', '')
+ elif eucaURL.find('http://') >= 0:
+ useSSL = False
+ eucaURL = eucaURL.replace('http://', '')
+ (eucaHost, parts) = eucaURL.split(':')
+ if len(parts) > 1:
+ parts = parts.split('/')
+ eucaPort = int(parts[0])
+ parts = parts[1:]
+ srvPath = '/'.join(parts)
+
+ return boto.connect_ec2(aws_access_key_id=accessKey,
+ aws_secret_access_key=secretKey,
+ is_secure=useSSL,
+ region=RegionInfo(None, 'eucalyptus', eucaHost),
+ port=eucaPort,
+ path=srvPath)
+
+ # instance method: body below uses self.getEucaConnection()
+ def ListResources(self, api, creds, options, call_id):
+ if Callids().already_handled(call_id): return ""
+ # get slice's hrn from options
+ xrn = options.get('geni_slice_urn', '')
+ hrn, type = urn_to_hrn(xrn)
+ logger = logging.getLogger('EucaAggregate')
+
+ # get hrn of the original caller
+ origin_hrn = options.get('origin_hrn', None)
+ if not origin_hrn:
+ origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
+
+ conn = self.getEucaConnection()
+
+ if not conn:
+ logger.error('Cannot create a connection to Eucalyptus')
+ return 'Cannot create a connection to Eucalyptus'
+
+ try:
+ # Zones
+ zones = conn.get_all_zones(['verbose'])
+ p = ZoneResultParser(zones)
+ clusters = p.parse()
+ AggregateManagerEucalyptus.cloud['clusters'] = clusters
+
+ # Images
+ images = conn.get_all_images()
+ AggregateManagerEucalyptus.cloud['images'] = images
+ AggregateManagerEucalyptus.cloud['imageBundles'] = {}
+ for i in images:
+ if i.type != 'machine' or i.kernel_id is None: continue
+ name = os.path.dirname(i.location)
+ detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
+ AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
+
+ # Key Pairs
+ keyPairs = conn.get_all_key_pairs()
+ AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
+
+ if hrn:
+ instanceId = []
+ instances = []
+
+ # Get the instances that belong to the given slice from sqlite3
+ # XXX use getOne() in production because the slice's hrn is supposed
+ # to be unique. For testing, uniqueness is turned off in the db.
+ # If the slice isn't found in the database, create a record for the
+ # slice.
+ matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
+ if matchedSlices:
+ theSlice = matchedSlices[-1]
+ else:
+ theSlice = Slice(slice_hrn = hrn)
+ for instance in theSlice.instances:
+ instanceId.append(instance.instance_id)
+
+ # Get the information about those instances using their ids.
+ if len(instanceId) > 0:
+ reservations = conn.get_all_instances(instanceId)
+ else:
+ reservations = []
+ for reservation in reservations:
+ for instance in reservation.instances:
+ instances.append(instance)
+
+ # Construct a dictionary for the EucaRSpecBuilder
+ instancesDict = {}
+ for instance in instances:
+ instList = instancesDict.setdefault(instance.instance_type, [])
+ instInfoDict = {}
+
+ instInfoDict['id'] = instance.id
+ instInfoDict['public_dns'] = instance.public_dns_name
+ instInfoDict['state'] = instance.state
+ instInfoDict['key'] = instance.key_name
+
+ instList.append(instInfoDict)
+ AggregateManagerEucalyptus.cloud['instances'] = instancesDict
+
+ except EC2ResponseError, ec2RespErr:
+ errTree = ET.fromstring(ec2RespErr.body)
+ errMsgE = errTree.find('.//Message')
+ logger.error(errMsgE.text)
+
+ rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
+
+ # Remove the instances records so next time they won't
+ # show up.
+ if 'instances' in AggregateManagerEucalyptus.cloud:
+ del AggregateManagerEucalyptus.cloud['instances']
+
+ return rspec
+
"""
- Create the sliver[s] (slice) at this aggregate.
- Verify HRN and initialize the slice record in PLC if necessary.
+ Hook called via 'sfi.py create'
"""
-
- # ensure site record exists
- site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
- # ensure slice record exists
- slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
- # ensure person records exists
- persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
-
- # Get the slice from db or create one.
- s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
- if s is None:
- s = Slice(slice_hrn = hrn)
-
- # Process any changes in existing instance allocation
- pendingRmInst = []
- for sliceInst in s.instances:
- pendingRmInst.append(sliceInst.instance_id)
- existingInstGroup = rspecXML.findall(".//euca_instances")
- for instGroup in existingInstGroup:
- for existingInst in instGroup:
- if existingInst.get('id') in pendingRmInst:
- pendingRmInst.remove(existingInst.get('id'))
- for inst in pendingRmInst:
- dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
- if dbInst.meta.state != 'deleted':
- logger.debug('Instance %s will be terminated' % inst)
- # Terminate instances one at a time for robustness
- conn.terminate_instances([inst])
- # Only change the state but do not remove the entry from the DB.
- dbInst.meta.state = 'deleted'
- #dbInst.destroySelf()
-
- # Process new instance requests
- requests = rspecXML.findall(".//request")
- if requests:
- # Get all the public keys associate with slice.
- keys = []
- for user in users:
- keys += user['keys']
- logger.debug("Keys: %s" % user['keys'])
- pubKeys = '\n'.join(keys)
- logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
- for req in requests:
- vmTypeElement = req.getparent()
- instType = vmTypeElement.get('name')
- numInst = int(req.find('instances').text)
-
- bundleName = req.find('bundle').text
- if not cloud['imageBundles'][bundleName]:
- logger.error('Cannot find bundle %s' % bundleName)
- bundleInfo = cloud['imageBundles'][bundleName]
- instKernel = bundleInfo['kernelID']
- instDiskImg = bundleInfo['imageID']
- instRamDisk = bundleInfo['ramdiskID']
- instKey = None
-
- # Create the instances
- for i in range(0, numInst):
- eucaInst = EucaInstance(slice = s,
- kernel_id = instKernel,
- image_id = instDiskImg,
- ramdisk_id = instRamDisk,
- key_pair = instKey,
- inst_type = instType,
- meta = Meta(start_time=datetime.datetime.now()))
- eucaInst.reserveInstance(conn, pubKeys)
-
- # xxx - should return altered rspec
- # with enough data for the client to understand what's happened
- return xml
-
-##
-# Return information on the IP addresses bound to each slice's instances
-#
-def dumpInstanceInfo():
- logger = logging.getLogger('EucaMeta')
- outdir = "/var/www/html/euca/"
- outfile = outdir + "instances.txt"
-
- try:
- os.makedirs(outdir)
- except OSError, e:
- if e.errno != errno.EEXIST:
- raise
-
- dbResults = Meta.select(
- AND(Meta.q.pri_addr != None,
- Meta.q.state == 'running')
- )
- dbResults = list(dbResults)
- f = open(outfile, "w")
- for r in dbResults:
- instId = r.instance.instance_id
- ipaddr = r.pri_addr
- hrn = r.instance.slice.slice_hrn
- logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
- f.write("%s %s %s\n" % (instId, ipaddr, hrn))
- f.close()
-
-##
-# A separate process that will update the meta data.
-#
-def updateMeta():
- logger = logging.getLogger('EucaMeta')
- fileHandler = logging.FileHandler('/var/log/euca_meta.log')
- fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
- logger.addHandler(fileHandler)
- fileHandler.setLevel(logging.DEBUG)
- logger.setLevel(logging.DEBUG)
-
- while True:
- sleep(30)
-
- # Get IDs of the instances that don't have IPs yet.
+ # instance method: body below uses self.getEucaConnection()
+ def CreateSliver(self, api, slice_xrn, creds, xml, users, call_id):
+ if Callids().already_handled(call_id): return ""
+
+ logger = logging.getLogger('EucaAggregate')
+ logger.debug("In CreateSliver")
+
+ aggregate = Aggregate(api)
+ slices = Slices(api)
+ (hrn, type) = urn_to_hrn(slice_xrn)
+ peer = slices.get_peer(hrn)
+ sfa_peer = slices.get_sfa_peer(hrn)
+ slice_record=None
+ if users:
+ slice_record = users[0].get('slice_record', {})
+
+ conn = self.getEucaConnection()
+ if not conn:
+ logger.error('Cannot create a connection to Eucalyptus')
+ return ""
+
+ # Validate RSpec
+ schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
+ rspecValidator = ET.RelaxNG(schemaXML)
+ rspecXML = ET.XML(xml)
+ for network in rspecXML.iterfind("./network"):
+ if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
+ # Throw away everything except my own RSpec
+ # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
+ network.getparent().remove(network)
+ if not rspecValidator(rspecXML):
+ error = rspecValidator.error_log.last_error
+ message = '%s (line %s)' % (error.message, error.line)
+ raise InvalidRSpec(message)
+
+ """
+ Create the sliver[s] (slice) at this aggregate.
+ Verify HRN and initialize the slice record in PLC if necessary.
+ """
+
+ # ensure site record exists
+ site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+ # ensure slice record exists
+ slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+ # ensure person records exists
+ persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+
+ # Get the slice from db or create one.
+ s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
+ if s is None:
+ s = Slice(slice_hrn = hrn)
+
+ # Process any changes in existing instance allocation
+ pendingRmInst = []
+ for sliceInst in s.instances:
+ pendingRmInst.append(sliceInst.instance_id)
+ existingInstGroup = rspecXML.findall(".//euca_instances")
+ for instGroup in existingInstGroup:
+ for existingInst in instGroup:
+ if existingInst.get('id') in pendingRmInst:
+ pendingRmInst.remove(existingInst.get('id'))
+ for inst in pendingRmInst:
+ dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
+ if dbInst.meta.state != 'deleted':
+ logger.debug('Instance %s will be terminated' % inst)
+ # Terminate instances one at a time for robustness
+ conn.terminate_instances([inst])
+ # Only change the state but do not remove the entry from the DB.
+ dbInst.meta.state = 'deleted'
+ #dbInst.destroySelf()
+
+ # Process new instance requests
+ requests = rspecXML.findall(".//request")
+ if requests:
+ # Get all the public keys associate with slice.
+ keys = []
+ for user in users:
+ keys += user['keys']
+ logger.debug("Keys: %s" % user['keys'])
+ pubKeys = '\n'.join(keys)
+ logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
+ for req in requests:
+ vmTypeElement = req.getparent()
+ instType = vmTypeElement.get('name')
+ numInst = int(req.find('instances').text)
+
+ bundleName = req.find('bundle').text
+ if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
+ logger.error('Cannot find bundle %s' % bundleName)
+ bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
+ instKernel = bundleInfo['kernelID']
+ instDiskImg = bundleInfo['imageID']
+ instRamDisk = bundleInfo['ramdiskID']
+ instKey = None
+
+ # Create the instances
+ for i in range(0, numInst):
+ eucaInst = EucaInstance(slice = s,
+ kernel_id = instKernel,
+ image_id = instDiskImg,
+ ramdisk_id = instRamDisk,
+ key_pair = instKey,
+ inst_type = instType,
+ meta = Meta(start_time=datetime.datetime.now()))
+ eucaInst.reserveInstance(conn, pubKeys)
+
+ # xxx - should return altered rspec
+ # with enough data for the client to understand what's happened
+ return xml
+
+ ##
+ # Return information on the IP addresses bound to each slice's instances
+ #
+ # Uses no instance state and is invoked class-qualified from the
+ # updateMeta staticmethod, so it is a staticmethod itself.
+ #
+ @staticmethod
+ def dumpInstanceInfo():
+ logger = logging.getLogger('EucaMeta')
+ outdir = "/var/www/html/euca/"
+ outfile = outdir + "instances.txt"
+
+ try:
+ os.makedirs(outdir)
+ except OSError, e:
+ if e.errno != errno.EEXIST:
+ raise
+
dbResults = Meta.select(
- AND(Meta.q.pri_addr == None,
- Meta.q.state != 'deleted')
- )
+ AND(Meta.q.pri_addr != None,
+ Meta.q.state == 'running')
+ )
dbResults = list(dbResults)
- logger.debug('[update process] dbResults: %s' % dbResults)
- instids = []
+ f = open(outfile, "w")
for r in dbResults:
- if not r.instance:
- continue
- instids.append(r.instance.instance_id)
- logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
-
- # Get instance information from Eucalyptus
- conn = getEucaConnection()
- vmInstances = []
- reservations = conn.get_all_instances(instids)
- for reservation in reservations:
- vmInstances += reservation.instances
-
- # Check the IPs
- instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
- for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
- logger.debug('[update process] IP dict: %s' % str(instIPs))
-
- # Update the local DB
- for ipData in instIPs:
- dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
- if not dbInst:
- logger.info('[update process] Could not find %s in DB' % ipData['id'])
- continue
- dbInst.meta.pri_addr = ipData['pri_addr']
- dbInst.meta.pub_addr = ipData['pub_addr']
- dbInst.meta.state = 'running'
-
- dumpInstanceInfo()
-
-def GetVersion(api):
- xrn=Xrn(api.hrn)
- request_rspec_versions = [dict(sfa_rspec_version)]
- ad_rspec_versions = [dict(sfa_rspec_version)]
- version_more = {'interface':'aggregate',
- 'testbed':'myplc',
- 'hrn':xrn.get_hrn(),
- 'request_rspec_versions': request_rspec_versions,
- 'ad_rspec_versions': ad_rspec_versions,
- 'default_ad_rspec': dict(sfa_rspec_version)
- }
- return version_core(version_more)
-
-#def main():
-# init_server()
-#
-# #theRSpec = None
-# #with open(sys.argv[1]) as xml:
-# # theRSpec = xml.read()
-# #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest')
-#
-# #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca')
-# #print rspec
-#
-# server_key_file = '/var/lib/sfa/authorities/server.key'
-# server_cert_file = '/var/lib/sfa/authorities/server.cert'
-# api = PlcSfaApi(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate')
-# print getKeysForSlice(api, 'gc.gc.test1')
-#
-#if __name__ == "__main__":
-# main()
+ instId = r.instance.instance_id
+ ipaddr = r.pri_addr
+ hrn = r.instance.slice.slice_hrn
+ logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
+ f.write("%s %s %s\n" % (instId, ipaddr, hrn))
+ f.close()
+
+ # instance method, consistent with the other manager entry points
+ def GetVersion(self, api):
+ # NOTE(review): sfa_rspec_version's import is commented out at the top
+ # of this file ("not sure what this used to be") -- this method will
+ # raise NameError until that import is restored or replaced
+ # (e.g. with VersionManager as in the plc aggregate manager).
+ xrn=Xrn(api.hrn)
+ request_rspec_versions = [dict(sfa_rspec_version)]
+ ad_rspec_versions = [dict(sfa_rspec_version)]
+ version_more = {'interface':'aggregate',
+ 'testbed':'myplc',
+ 'hrn':xrn.get_hrn(),
+ 'request_rspec_versions': request_rspec_versions,
+ 'ad_rspec_versions': ad_rspec_versions,
+ 'default_ad_rspec': dict(sfa_rspec_version)
+ }
+ return version_core(version_more)
-import os\r
-import time\r
-import re\r
-\r
-from sfa.util.faults import *\r
-from sfa.util.sfalogging import logger\r
-from sfa.util.config import Config\r
-from sfa.util.sfatime import utcparse\r
-from sfa.util.callids import Callids\r
-from sfa.util.version import version_core\r
-from sfa.util.xrn import urn_to_hrn, hrn_to_urn, get_authority, Xrn\r
-from sfa.util.plxrn import hrn_to_pl_slicename\r
-\r
-from sfa.server.sfaapi import SfaApi\r
-from sfa.server.registry import Registries\r
-from sfa.rspecs.rspec_version import RSpecVersion\r
-from sfa.rspecs.sfa_rspec import sfa_rspec_version\r
-from sfa.rspecs.rspec_parser import parse_rspec\r
-\r
-from sfa.managers.aggregate_manager import __get_registry_objects, ListSlices\r
-\r
-from sfa.plc.slices import Slices\r
-\r
-\r
-RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec"\r
-\r
-# execute shell command and return both exit code and text output\r
-def shell_execute(cmd, timeout):\r
- pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r')\r
- pipe = os.popen(cmd + ' 2>&1', 'r')\r
- text = ''\r
- while timeout:\r
- line = pipe.read()\r
- text += line\r
- time.sleep(1)\r
- timeout = timeout-1\r
- code = pipe.close()\r
- if code is None: code = 0\r
- if text[-1:] == '\n': text = text[:-1]\r
- return code, text\r
-\r
-"""\r
- call AM API client with command like in the following example:\r
- cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \\r
- net.geni.aggregate.client.examples.CreateSliceNetworkClient \\r
- ./repo https://geni:8443/axis2/services/AggregateGENI \\r
- ... params ...\r
-"""\r
-\r
-def call_am_apiclient(client_app, params, timeout):\r
- (client_path, am_url) = Config().get_max_aggrMgr_info()\r
- sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + client_app + " ./repo " + am_url + " " + ' '.join(params)\r
- ret = shell_execute(sys_cmd, timeout)\r
- logger.debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret))\r
- return ret\r
-\r
-# save request RSpec xml content to a tmp file\r
-def save_rspec_to_file(rspec):\r
- path = RSPEC_TMP_FILE_PREFIX + "_" + time.strftime('%Y%m%dT%H:%M:%S', time.gmtime(time.time())) +".xml"\r
- file = open(path, "w")\r
- file.write(rspec)\r
- file.close()\r
- return path\r
-\r
-# get stripped down slice id/name plc.maxpl.xislice1 --> maxpl_xislice1\r
-def get_plc_slice_id(cred, xrn):\r
- (hrn, type) = urn_to_hrn(xrn)\r
- slice_id = hrn.find(':')\r
- sep = '.'\r
- if hrn.find(':') != -1:\r
- sep=':'\r
- elif hrn.find('+') != -1:\r
- sep='+'\r
- else:\r
- sep='.'\r
- slice_id = hrn.split(sep)[-2] + '_' + hrn.split(sep)[-1]\r
- return slice_id\r
-\r
-# extract xml \r
-def get_xml_by_tag(text, tag):\r
- indx1 = text.find('<'+tag)\r
- indx2 = text.find('/'+tag+'>')\r
- xml = None\r
- if indx1!=-1 and indx2>indx1:\r
- xml = text[indx1:indx2+len(tag)+2]\r
- return xml\r
-\r
-def prepare_slice(api, slice_xrn, creds, users):\r
- reg_objects = __get_registry_objects(slice_xrn, creds, users)\r
- (hrn, type) = urn_to_hrn(slice_xrn)\r
- slices = Slices(api)\r
- peer = slices.get_peer(hrn)\r
- sfa_peer = slices.get_sfa_peer(hrn)\r
- slice_record=None\r
- if users:\r
- slice_record = users[0].get('slice_record', {})\r
- registry = api.registries[api.hrn]\r
- credential = api.getCredential()\r
- # ensure site record exists\r
- site = slices.verify_site(hrn, slice_record, peer, sfa_peer)\r
- # ensure slice record exists\r
- slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)\r
- # ensure person records exists\r
- persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)\r
-\r
-def parse_resources(text, slice_xrn):\r
- resources = []\r
- urn = hrn_to_urn(slice_xrn, 'sliver')\r
- plc_slice = re.search("Slice Status => ([^\n]+)", text)\r
- if plc_slice.group(1) != 'NONE':\r
- res = {}\r
- res['geni_urn'] = urn + '_plc_slice'\r
- res['geni_error'] = ''\r
- res['geni_status'] = 'unknown'\r
- if plc_slice.group(1) == 'CREATED':\r
- res['geni_status'] = 'ready'\r
- resources.append(res)\r
- vlans = re.findall("GRI => ([^\n]+)\n\t Status => ([^\n]+)", text)\r
- for vlan in vlans:\r
- res = {}\r
- res['geni_error'] = ''\r
- res['geni_urn'] = urn + '_vlan_' + vlan[0]\r
- if vlan[1] == 'ACTIVE':\r
- res['geni_status'] = 'ready'\r
- elif vlan[1] == 'FAILED':\r
- res['geni_status'] = 'failed'\r
- else:\r
- res['geni_status'] = 'configuring'\r
- resources.append(res)\r
- return resources\r
-\r
-def slice_status(api, slice_xrn, creds):\r
- urn = hrn_to_urn(slice_xrn, 'slice')\r
- result = {}\r
- top_level_status = 'unknown'\r
- slice_id = get_plc_slice_id(creds, urn)\r
- (ret, output) = call_am_apiclient("QuerySliceNetworkClient", [slice_id,], 5)\r
- # parse output into rspec XML\r
- if output.find("Unkown Rspec:") > 0:\r
- top_level_staus = 'failed'\r
- result['geni_resources'] = ''\r
- else:\r
- has_failure = 0\r
- all_active = 0\r
- if output.find("Status => FAILED") > 0:\r
- top_level_staus = 'failed'\r
- elif ( output.find("Status => ACCEPTED") > 0 or output.find("Status => PENDING") > 0\r
- or output.find("Status => INSETUP") > 0 or output.find("Status => INCREATE") > 0\r
- ):\r
- top_level_status = 'configuring'\r
- else:\r
- top_level_status = 'ready'\r
- result['geni_resources'] = parse_resources(output, slice_xrn)\r
- result['geni_urn'] = urn\r
- result['geni_status'] = top_level_status\r
- return result\r
-\r
-def create_slice(api, xrn, cred, rspec, users):\r
- indx1 = rspec.find("<RSpec")\r
- indx2 = rspec.find("</RSpec>")\r
- if indx1 > -1 and indx2 > indx1:\r
- rspec = rspec[indx1+len("<RSpec type=\"SFA\">"):indx2-1]\r
- rspec_path = save_rspec_to_file(rspec)\r
- prepare_slice(api, xrn, cred, users)\r
- slice_id = get_plc_slice_id(cred, xrn)\r
- sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" +slice_id+ "/g\" " + rspec_path + ";sed -i \"s/:rspec=[^:'<\\\" ]*/:rspec=" +slice_id+ "/g\" " + rspec_path\r
- ret = shell_execute(sys_cmd, 1)\r
- sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" + rspec_path + "/g\""\r
- ret = shell_execute(sys_cmd, 1)\r
- (ret, output) = call_am_apiclient("CreateSliceNetworkClient", [rspec_path,], 3)\r
- # parse output ?\r
- rspec = "<RSpec type=\"SFA\"> Done! </RSpec>"\r
- return True\r
-\r
-def delete_slice(api, xrn, cred):\r
- slice_id = get_plc_slice_id(cred, xrn)\r
- (ret, output) = call_am_apiclient("DeleteSliceNetworkClient", [slice_id,], 3)\r
- # parse output ?\r
- return 1\r
-\r
-\r
-def get_rspec(api, cred, slice_urn):\r
- logger.debug("#### called max-get_rspec")\r
- #geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1\r
- if slice_urn == None:\r
- (ret, output) = call_am_apiclient("GetResourceTopology", ['all', '\"\"'], 5)\r
- else:\r
- slice_id = get_plc_slice_id(cred, slice_urn)\r
- (ret, output) = call_am_apiclient("GetResourceTopology", ['all', slice_id,], 5)\r
- # parse output into rspec XML\r
- if output.find("No resouce found") > 0:\r
- rspec = "<RSpec type=\"SFA\"> <Fault>No resource found</Fault> </RSpec>"\r
- else:\r
- comp_rspec = get_xml_by_tag(output, 'computeResource')\r
- logger.debug("#### computeResource %s" % comp_rspec)\r
- topo_rspec = get_xml_by_tag(output, 'topology')\r
- logger.debug("#### topology %s" % topo_rspec)\r
- rspec = "<RSpec type=\"SFA\"> <network name=\"" + Config().get_interface_hrn() + "\">";\r
- if comp_rspec != None:\r
- rspec = rspec + get_xml_by_tag(output, 'computeResource')\r
- if topo_rspec != None:\r
- rspec = rspec + get_xml_by_tag(output, 'topology')\r
- rspec = rspec + "</network> </RSpec>"\r
- return (rspec)\r
-\r
-def start_slice(api, xrn, cred):\r
- # service not supported\r
- return None\r
-\r
-def stop_slice(api, xrn, cred):\r
- # service not supported\r
- return None\r
-\r
-def reset_slices(api, xrn):\r
- # service not supported\r
- return None\r
-\r
-"""\r
- GENI AM API Methods\r
-"""\r
-\r
-def GetVersion(api):\r
- xrn=Xrn(api.hrn)\r
- request_rspec_versions = [dict(sfa_rspec_version)]\r
- ad_rspec_versions = [dict(sfa_rspec_version)]\r
- #TODO: MAX-AM specific\r
- version_more = {'interface':'aggregate',\r
- 'testbed':'myplc',\r
- 'hrn':xrn.get_hrn(),\r
- 'request_rspec_versions': request_rspec_versions,\r
- 'ad_rspec_versions': ad_rspec_versions,\r
- 'default_ad_rspec': dict(sfa_rspec_version)\r
- }\r
- return version_core(version_more)\r
-\r
-def SliverStatus(api, slice_xrn, creds, call_id):\r
- if Callids().already_handled(call_id): return {}\r
- return slice_status(api, slice_xrn, creds)\r
-\r
-def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id):\r
- if Callids().already_handled(call_id): return ""\r
- #TODO: create real CreateSliver response rspec\r
- ret = create_slice(api, slice_xrn, creds, rspec_string, users)\r
- if ret:\r
- return get_rspec(api, creds, slice_xrn)\r
- else:\r
- return "<?xml version=\"1.0\" ?> <RSpec type=\"SFA\"> Error! </RSpec>"\r
-\r
-def DeleteSliver(api, xrn, creds, call_id):\r
- if Callids().already_handled(call_id): return ""\r
- return delete_slice(api, xrn, creds)\r
-\r
-# no caching\r
-def ListResources(api, creds, options,call_id):\r
- if Callids().already_handled(call_id): return ""\r
- # version_string = "rspec_%s" % (rspec_version.get_version_name())\r
- slice_urn = options.get('geni_slice_urn')\r
- return get_rspec(api, creds, slice_urn)\r
-\r
-def fetch_context(slice_hrn, user_hrn, contexts):\r
- """\r
- Returns the request context required by sfatables. At some point, this mechanism should be changed\r
- to refer to "contexts", which is the information that sfatables is requesting. But for now, we just\r
- return the basic information needed in a dict.\r
- """\r
- base_context = {'sfa':{'user':{'hrn':user_hrn}}}\r
- return base_context\r
- api = SfaApi()\r
- create_slice(api, "plc.maxpl.test000", None, rspec_xml, None)\r
-\r
+import os
+import time
+import re
+
+#from sfa.util.faults import *
+from sfa.util.sfalogging import logger
+from sfa.util.config import Config
+from sfa.util.callids import Callids
+from sfa.util.version import version_core
+from sfa.util.xrn import urn_to_hrn, hrn_to_urn, Xrn
+
+# xxx the sfa.rspecs module is dead - this symbol is now undefined
+#from sfa.rspecs.sfa_rspec import sfa_rspec_version
+
+from sfa.managers.aggregate_manager import AggregateManager
+
+from sfa.plc.slices import Slices
+
+class AggregateManagerMax (AggregateManager):
+    """MAX testbed aggregate manager: drives the java AM API client shipped
+    with the MAX aggregate (see call_am_apiclient below)."""
+
+    # prefix for the tmp files request rspecs are written to
+    RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec"
+
+    # execute shell command and return both exit code and text output
+    def shell_execute(self, cmd, timeout):
+        """Run *cmd* in a shell, polling its output for up to *timeout*
+        seconds (one read per second).
+
+        Returns (exit_code, text); exit_code is 0 on success and text has
+        a single trailing newline stripped.
+        """
+        # group the command so the 2>&1 redirection covers the whole pipeline;
+        # the original opened a second, duplicate pipe here, leaking the first
+        pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r')
+        text = ''
+        while timeout:
+            line = pipe.read()
+            text += line
+            time.sleep(1)
+            timeout = timeout-1
+        # popen.close() returns None when the command exited with status 0
+        code = pipe.close()
+        if code is None: code = 0
+        if text[-1:] == '\n': text = text[:-1]
+        return code, text
+
+
+    def call_am_apiclient(self, client_app, params, timeout):
+        """
+        call AM API client with command like in the following example:
+        cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \
+        net.geni.aggregate.client.examples.CreateSliceNetworkClient \
+        ./repo https://geni:8443/axis2/services/AggregateGENI \
+        ... params ...
+
+        Returns the (exit_code, text) pair produced by shell_execute.
+        """
+        # client install path and service URL come from the site configuration
+        (client_path, am_url) = Config().get_max_aggrMgr_info()
+        sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + client_app + " ./repo " + am_url + " " + ' '.join(params)
+        ret = self.shell_execute(sys_cmd, timeout)
+        logger.debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret))
+        return ret
+
+    # save request RSpec xml content to a tmp file
+    def save_rspec_to_file(self, rspec):
+        """Write *rspec* to a timestamped tmp file and return its path."""
+        path = AggregateManagerMax.RSPEC_TMP_FILE_PREFIX + "_" + \
+            time.strftime('%Y%m%dT%H:%M:%S', time.gmtime(time.time())) +".xml"
+        # 'with' guarantees the file is closed even if write() raises,
+        # and avoids shadowing the 'file' builtin
+        with open(path, "w") as f:
+            f.write(rspec)
+        return path
+
+    # get stripped down slice id/name plc.maxpl.xislice1 --> maxpl_xislice1
+    def get_plc_slice_id(self, cred, xrn):
+        """Derive the PLC slice id from *xrn*: the hrn's last two fields
+        joined with '_' (the separator is whichever of ':', '+', '.' the
+        hrn actually uses)."""
+        (hrn, type) = urn_to_hrn(xrn)
+        # the original also had two dead assignments here
+        # (slice_id = hrn.find(':') and sep = '.') -- removed
+        if hrn.find(':') != -1:
+            sep = ':'
+        elif hrn.find('+') != -1:
+            sep = '+'
+        else:
+            sep = '.'
+        return hrn.split(sep)[-2] + '_' + hrn.split(sep)[-1]
+
+    # extract xml
+    def get_xml_by_tag(self, text, tag):
+        """Return the '<tag ...>...</tag>' fragment of *text*, or None when
+        the open/close markers are absent or out of order."""
+        start = text.find('<' + tag)
+        end = text.find('/' + tag + '>')
+        # only a well-ordered pair of markers yields a fragment
+        if start == -1 or end <= start:
+            return None
+        return text[start:end + len(tag) + 2]
+
+    def prepare_slice(self, api, slice_xrn, creds, users):
+        """Ensure the PLC site, slice and person records exist before the
+        slice network is created.
+
+        NOTE(review): relies on self._get_registry_objects, which is not
+        defined in this class -- presumably inherited from AggregateManager;
+        confirm.  reg_objects, registry, credential, site, slice and persons
+        are bound but never read: the verify_* calls are kept for their
+        side effects only.
+        """
+        reg_objects = self._get_registry_objects(slice_xrn, creds, users)
+        (hrn, type) = urn_to_hrn(slice_xrn)
+        slices = Slices(api)
+        peer = slices.get_peer(hrn)
+        sfa_peer = slices.get_sfa_peer(hrn)
+        slice_record=None
+        if users:
+            slice_record = users[0].get('slice_record', {})
+        registry = api.registries[api.hrn]
+        credential = api.getCredential()
+        # ensure site record exists
+        site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+        # ensure slice record exists
+        slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+        # ensure person records exists
+        persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+
+    def parse_resources(self, text, slice_xrn):
+        """Parse the AM client output *text* into a list of geni resource
+        dicts (one for the plc slice, one per VLAN)."""
+        resources = []
+        urn = hrn_to_urn(slice_xrn, 'sliver')
+        plc_slice = re.search("Slice Status => ([^\n]+)", text)
+        # re.search returns None when the marker is absent; the original
+        # dereferenced it unconditionally and crashed on such output
+        if plc_slice and plc_slice.group(1) != 'NONE':
+            res = {}
+            res['geni_urn'] = urn + '_plc_slice'
+            res['geni_error'] = ''
+            res['geni_status'] = 'unknown'
+            if plc_slice.group(1) == 'CREATED':
+                res['geni_status'] = 'ready'
+            resources.append(res)
+        vlans = re.findall("GRI => ([^\n]+)\n\t Status => ([^\n]+)", text)
+        for vlan in vlans:
+            res = {}
+            res['geni_error'] = ''
+            res['geni_urn'] = urn + '_vlan_' + vlan[0]
+            if vlan[1] == 'ACTIVE':
+                res['geni_status'] = 'ready'
+            elif vlan[1] == 'FAILED':
+                res['geni_status'] = 'failed'
+            else:
+                res['geni_status'] = 'configuring'
+            resources.append(res)
+        return resources
+
+    def slice_status(self, api, slice_xrn, creds):
+        """Query the AM client for the slice's network status and return a
+        geni status dict (geni_urn / geni_status / geni_resources).
+
+        Fixes the 'top_level_staus' typos: the 'failed' state was assigned
+        to a throwaway name, so the result silently fell through to
+        'unknown' (or 'ready').  Also drops the unused has_failure /
+        all_active locals.
+        """
+        urn = hrn_to_urn(slice_xrn, 'slice')
+        result = {}
+        top_level_status = 'unknown'
+        slice_id = self.get_plc_slice_id(creds, urn)
+        (ret, output) = self.call_am_apiclient("QuerySliceNetworkClient", [slice_id,], 5)
+        # parse output into rspec XML
+        if output.find("Unkown Rspec:") > 0:
+            top_level_status = 'failed'
+            result['geni_resources'] = ''
+        else:
+            if output.find("Status => FAILED") > 0:
+                top_level_status = 'failed'
+            elif ( output.find("Status => ACCEPTED") > 0 or output.find("Status => PENDING") > 0
+                 or output.find("Status => INSETUP") > 0 or output.find("Status => INCREATE") > 0
+                 ):
+                top_level_status = 'configuring'
+            else:
+                top_level_status = 'ready'
+            result['geni_resources'] = self.parse_resources(output, slice_xrn)
+        result['geni_urn'] = urn
+        result['geni_status'] = top_level_status
+        return result
+
+    def create_slice(self, api, xrn, cred, rspec, users):
+        """Strip the RSpec wrapper, prepare the PLC records, rewrite the
+        rspec ids in the tmp file and invoke CreateSliceNetworkClient.
+        Always returns True.
+
+        NOTE(review): the second sed command below lacks a target file, so
+        sed reads stdin and the edit is a no-op -- confirm intent.  The
+        final 'rspec' assignment is unused (the method returns True
+        regardless), and the client output is never parsed.
+        """
+        indx1 = rspec.find("<RSpec")
+        indx2 = rspec.find("</RSpec>")
+        if indx1 > -1 and indx2 > indx1:
+            rspec = rspec[indx1+len("<RSpec type=\"SFA\">"):indx2-1]
+        rspec_path = self.save_rspec_to_file(rspec)
+        self.prepare_slice(api, xrn, cred, users)
+        slice_id = self.get_plc_slice_id(cred, xrn)
+        # rewrite rspec ids inside the tmp file to the derived slice id
+        sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" +slice_id+ "/g\" " + rspec_path + ";sed -i \"s/:rspec=[^:'<\\\" ]*/:rspec=" +slice_id+ "/g\" " + rspec_path
+        ret = self.shell_execute(sys_cmd, 1)
+        sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" + rspec_path + "/g\""
+        ret = self.shell_execute(sys_cmd, 1)
+        (ret, output) = self.call_am_apiclient("CreateSliceNetworkClient", [rspec_path,], 3)
+        # parse output ?
+        rspec = "<RSpec type=\"SFA\"> Done! </RSpec>"
+        return True
+
+    def delete_slice(self, api, xrn, cred):
+        """Ask the AM client to delete the slice's network; returns 1.
+
+        NOTE(review): the client output is not parsed -- success is assumed.
+        """
+        slice_id = self.get_plc_slice_id(cred, xrn)
+        (ret, output) = self.call_am_apiclient("DeleteSliceNetworkClient", [slice_id,], 3)
+        # parse output ?
+        return 1
+
+
+    def get_rspec(self, api, cred, slice_urn):
+        """Fetch the resource topology from the AM client and wrap it in an
+        SFA RSpec string.
+
+        With slice_urn == None the whole testbed topology is returned,
+        otherwise only the named slice's resources.
+        """
+        logger.debug("#### called max-get_rspec")
+        #geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1
+        if slice_urn == None:
+            (ret, output) = self.call_am_apiclient("GetResourceTopology", ['all', '\"\"'], 5)
+        else:
+            slice_id = self.get_plc_slice_id(cred, slice_urn)
+            (ret, output) = self.call_am_apiclient("GetResourceTopology", ['all', slice_id,], 5)
+        # parse output into rspec XML
+        if output.find("No resouce found") > 0:
+            rspec = "<RSpec type=\"SFA\"> <Fault>No resource found</Fault> </RSpec>"
+        else:
+            comp_rspec = self.get_xml_by_tag(output, 'computeResource')
+            logger.debug("#### computeResource %s" % comp_rspec)
+            topo_rspec = self.get_xml_by_tag(output, 'topology')
+            logger.debug("#### topology %s" % topo_rspec)
+            rspec = "<RSpec type=\"SFA\"> <network name=\"" + Config().get_interface_hrn() + "\">"
+            # reuse the fragments extracted above instead of re-scanning output
+            if comp_rspec != None:
+                rspec = rspec + comp_rspec
+            if topo_rspec != None:
+                rspec = rspec + topo_rspec
+            rspec = rspec + "</network> </RSpec>"
+        return (rspec)
+
+    def start_slice(self, api, xrn, cred):
+        """Not supported by the MAX aggregate; always returns None."""
+        # service not supported
+        return None
+
+    def stop_slice(self, api, xrn, cred):
+        """Not supported by the MAX aggregate; always returns None."""
+        # service not supported
+        return None
+
+    def reset_slices(self, api, xrn):
+        """Not supported by the MAX aggregate; always returns None."""
+        # service not supported
+        return None
+
+    ### GENI AM API Methods
+
+    def GetVersion(self, api):
+        """GENI GetVersion for the MAX aggregate.
+
+        NOTE(review): sfa_rspec_version is undefined in this module -- its
+        import is commented out above ("the sfa.rspecs module is dead"),
+        so this method raises NameError when called.  Needs porting to
+        VersionManager like the other managers.
+        """
+        xrn=Xrn(api.hrn)
+        request_rspec_versions = [dict(sfa_rspec_version)]
+        ad_rspec_versions = [dict(sfa_rspec_version)]
+        #TODO: MAX-AM specific
+        version_more = {'interface':'aggregate',
+                        'testbed':'myplc',
+                        'hrn':xrn.get_hrn(),
+                        'request_rspec_versions': request_rspec_versions,
+                        'ad_rspec_versions': ad_rspec_versions,
+                        'default_ad_rspec': dict(sfa_rspec_version)
+                        }
+        return version_core(version_more)
+
+    def SliverStatus(self, api, slice_xrn, creds, call_id):
+        """GENI SliverStatus: dedup by call_id, then delegate to slice_status."""
+        if Callids().already_handled(call_id): return {}
+        return self.slice_status(api, slice_xrn, creds)
+
+    def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id):
+        """GENI CreateSliver: create the slice network, then return the
+        slice's rspec (or an error rspec on failure)."""
+        if Callids().already_handled(call_id): return ""
+        #TODO: create real CreateSliver response rspec
+        ret = self.create_slice(api, slice_xrn, creds, rspec_string, users)
+        if ret:
+            return self.get_rspec(api, creds, slice_xrn)
+        else:
+            return "<?xml version=\"1.0\" ?> <RSpec type=\"SFA\"> Error! </RSpec>"
+
+    def DeleteSliver(self, api, xrn, creds, call_id):
+        """GENI DeleteSliver: dedup by call_id, then delegate to delete_slice."""
+        if Callids().already_handled(call_id): return ""
+        return self.delete_slice(api, xrn, creds)
+
+    # no caching
+    def ListResources(self, api, creds, options,call_id):
+        """GENI ListResources: rspec for options['geni_slice_urn'], or the
+        whole testbed when that key is absent."""
+        if Callids().already_handled(call_id): return ""
+        # version_string = "rspec_%s" % (rspec_version.get_version_name())
+        slice_urn = options.get('geni_slice_urn')
+        return self.get_rspec(api, creds, slice_urn)
+
+    def fetch_context(self, slice_hrn, user_hrn, contexts):
+        """
+        Returns the request context required by sfatables. At some point, this mechanism should be changed
+        to refer to "contexts", which is the information that sfatables is requesting. But for now, we just
+        return the basic information needed in a dict.
+        """
+        # only the caller's hrn is exposed; slice_hrn and contexts are unused
+        base_context = {'sfa':{'user':{'hrn':user_hrn}}}
+        return base_context
+
+++ /dev/null
-import sys
-
-import socket
-import struct
-
-#The following is not essential
-#from soaplib.wsgi_soap import SimpleWSGISoapApp
-#from soaplib.serializers.primitive import *
-#from soaplib.serializers.clazz import *
-
-from sfa.util.faults import *
-from sfa.util.xrn import urn_to_hrn
-from sfa.server.registry import Registries
-from sfa.util.config import Config
-from sfa.plc.nodes import *
-from sfa.util.callids import Callids
-
-# Message IDs for all the SFA light calls
-# This will be used by the aggrMgr controller
-SFA_GET_RESOURCES = 101
-SFA_CREATE_SLICE = 102
-SFA_START_SLICE = 103
-SFA_STOP_SLICE = 104
-SFA_DELETE_SLICE = 105
-SFA_GET_SLICES = 106
-SFA_RESET_SLICES = 107
-
-DEBUG = 1
-
-def print_buffer(buf):
- for i in range(0,len(buf)):
- print('%x' % buf[i])
-
-def extract(sock):
- # Shud we first obtain the message length?
- # msg_len = socket.ntohs(sock.recv(2))
- msg = ""
-
- while (1):
- try:
- chunk = sock.recv(1)
- except socket.error, message:
- if 'timed out' in message:
- break
- else:
- sys.exit("Socket error: " + message)
-
- if len(chunk) == 0:
- break
- msg += chunk
-
- print 'Done extracting %d bytes of response from aggrMgr' % len(msg)
- return msg
-
-def connect(server, port):
- '''Connect to the Aggregate Manager module'''
- sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM )
- sock.connect ( ( server, port) )
- sock.settimeout(1)
- if DEBUG: print 'Connected!'
- return sock
-
-def connect_aggrMgr():
- (aggr_mgr_ip, aggr_mgr_port) = Config().get_openflow_aggrMgr_info()
- if DEBUG: print """Connecting to port %d of %s""" % (aggr_mgr_port, aggr_mgr_ip)
- return connect(aggr_mgr_ip, aggr_mgr_port)
-
-def generate_slide_id(cred, hrn):
- if cred == None:
- cred = ""
- if hrn == None:
- hrn = ""
- #return cred + '_' + hrn
- return str(hrn)
-
-def msg_aggrMgr(cred, hrn, msg_id):
- slice_id = generate_slide_id(cred, hrn)
-
- msg = struct.pack('> B%ds' % len(slice_id), msg_id, slice_id)
- buf = struct.pack('> H', len(msg)+2) + msg
-
- try:
- aggrMgr_sock = connect_aggrMgr()
- aggrMgr_sock.send(buf)
- aggrMgr_sock.close()
- return 1
- except socket.error, message:
- print "Socket error"
- except IOerror, message:
- print "IO error"
- return 0
-
-def start_slice(cred, xrn):
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received start_slice call"
- return msg_aggrMgr(SFA_START_SLICE)
-
-def stop_slice(cred, xrn):
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received stop_slice call"
- return msg_aggrMgr(SFA_STOP_SLICE)
-
-def DeleteSliver(cred, xrn, call_id):
- if Callids().already_handled(call_id): return ""
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received DeleteSliver call"
- return msg_aggrMgr(SFA_DELETE_SLICE)
-
-def reset_slices(cred, xrn):
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received reset_slices call"
- return msg_aggrMgr(SFA_RESET_SLICES)
-
-### Thierry: xxx this should ahve api as a first arg - probably outdated
-def CreateSliver(cred, xrn, rspec, call_id):
- if Callids().already_handled(call_id): return ""
-
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received CreateSliver call"
- slice_id = generate_slide_id(cred, hrn)
-
- msg = struct.pack('> B%ds%ds' % (len(slice_id)+1, len(rspec)), SFA_CREATE_SLICE, slice_id, rspec)
- buf = struct.pack('> H', len(msg)+2) + msg
-
- try:
- aggrMgr_sock = connect_aggrMgr()
- aggrMgr_sock.send(buf)
- if DEBUG: print "Sent %d bytes and closing connection" % len(buf)
- aggrMgr_sock.close()
-
- if DEBUG: print "----------------"
- return rspec
- except socket.error, message:
- print "Socket error"
- except IOerror, message:
- print "IO error"
- return ""
-
-# Thierry : xxx this would need to handle call_id like the other AMs but is outdated...
-def ListResources(cred, xrn=None):
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received ListResources call"
- slice_id = generate_slide_id(cred, hrn)
-
- msg = struct.pack('> B%ds' % len(slice_id), SFA_GET_RESOURCES, slice_id)
- buf = struct.pack('> H', len(msg)+2) + msg
-
- try:
- aggrMgr_sock = connect_aggrMgr()
- aggrMgr_sock.send(buf)
- resource_list = extract(aggrMgr_sock);
- aggrMgr_sock.close()
-
- if DEBUG: print "----------------"
- return resource_list
- except socket.error, message:
- print "Socket error"
- except IOerror, message:
- print "IO error"
- return None
-
-"""
-Returns the request context required by sfatables. At some point, this mechanism should be changed
-to refer to "contexts", which is the information that sfatables is requesting. But for now, we just
-return the basic information needed in a dict.
-"""
-def fetch_context(slice_hrn, user_hrn, contexts):
- base_context = {'sfa':{'user':{'hrn':user_hrn}}}
- return base_context
-
-def main():
- r = RSpec()
- r.parseFile(sys.argv[1])
- rspec = r.toDict()
- CreateSliver(None,'plc',rspec,'call-id-plc')
-
-if __name__ == "__main__":
- main()
-from sfa.util.faults import SfaNotImplemented
+from types import ModuleType, ClassType
+
+from sfa.util.faults import SfaNotImplemented, SfaAPIError
from sfa.util.sfalogging import logger
####################
the standard AttributeError
"""
def __init__(self, manager, interface):
- self.manager = manager
+ if isinstance (manager, ModuleType):
+ # old-fashioned module implementation
+ self.manager = manager
+ elif isinstance (manager, ClassType):
+ # create an instance; we don't pass the api in argument as it is passed
+ # to the actual method calls anyway
+ self.manager = manager()
+ else:
+ raise SfaAPIError,"Argument to ManagerWrapper must be a module or class"
self.interface = interface
def __getattr__(self, method):
from sfa.rspecs.rspec import RSpec
from sfa.client.client_helper import sfa_to_pg_users_arg
-def _call_id_supported(api, server):
- """
- Returns true if server support the optional call_id arg, false otherwise.
- """
- server_version = api.get_cached_server_version(server)
-
- if 'sfa' in server_version:
- code_tag = server_version['code_tag']
- code_tag_parts = code_tag.split("-")
-
- version_parts = code_tag_parts[0].split(".")
- major, minor = version_parts[0:2]
- rev = code_tag_parts[1]
- if int(major) > 1:
- if int(minor) > 0 or int(rev) > 20:
- return True
- return False
-
-# we have specialized xmlrpclib.ServerProxy to remember the input url
-# OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
-def get_serverproxy_url (server):
- try:
- return server.get_url()
- except:
- logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
- return server._ServerProxy__host + server._ServerProxy__handler
-
-def GetVersion(api):
- # peers explicitly in aggregates.xml
- peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
- if peername != api.hrn])
- version_manager = VersionManager()
- ad_rspec_versions = []
- request_rspec_versions = []
- for rspec_version in version_manager.versions:
- if rspec_version.content_type in ['*', 'ad']:
- ad_rspec_versions.append(rspec_version.to_dict())
- if rspec_version.content_type in ['*', 'request']:
- request_rspec_versions.append(rspec_version.to_dict())
- default_rspec_version = version_manager.get_version("sfa 1").to_dict()
- xrn=Xrn(api.hrn, 'authority+sa')
- version_more = {'interface':'slicemgr',
- 'hrn' : xrn.get_hrn(),
- 'urn' : xrn.get_urn(),
- 'peers': peers,
- 'request_rspec_versions': request_rspec_versions,
- 'ad_rspec_versions': ad_rspec_versions,
- 'default_ad_rspec': default_rspec_version
+class SliceManager:
+    """Slice manager: fans GENI AM calls out to the federated aggregates."""
+    def __init__ (self):
+        # when True, no-slice ListResources answers may be served from
+        # api.cache; the commented line records the off setting
+        # self.caching=False
+        self.caching=True
+
+
+    def _call_id_supported(self, api, server):
+        """
+        Returns true if server support the optional call_id arg, false otherwise.
+        """
+        # cached GetVersion answer of the remote server
+        server_version = api.get_cached_server_version(server)
+
+        if 'sfa' in server_version:
+            code_tag = server_version['code_tag']
+            code_tag_parts = code_tag.split("-")
+
+            # NOTE(review): assumes code_tag looks like "M.m[.x]-rev"; a tag
+            # without '-' or with non-numeric parts raises here -- confirm
+            version_parts = code_tag_parts[0].split(".")
+            major, minor = version_parts[0:2]
+            rev = code_tag_parts[1]
+            if int(major) > 1:
+                if int(minor) > 0 or int(rev) > 20:
+                    return True
+        return False
+
+    # we have specialized xmlrpclib.ServerProxy to remember the input url
+    # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
+    def get_serverproxy_url (self, server):
+        """Best-effort URL of *server*: its get_url() when available,
+        otherwise reassembled from xmlrpclib.ServerProxy internals."""
+        try:
+            return server.get_url()
+        except Exception:
+            # was a bare 'except:', which would also swallow
+            # KeyboardInterrupt/SystemExit
+            logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
+            return server._ServerProxy__host + server._ServerProxy__handler
+
+ def GetVersion(self, api):
+ # peers explicitly in aggregates.xml
+ peers =dict ([ (peername,self.get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
+ if peername != api.hrn])
+ version_manager = VersionManager()
+ ad_rspec_versions = []
+ request_rspec_versions = []
+ for rspec_version in version_manager.versions:
+ if rspec_version.content_type in ['*', 'ad']:
+ ad_rspec_versions.append(rspec_version.to_dict())
+ if rspec_version.content_type in ['*', 'request']:
+ request_rspec_versions.append(rspec_version.to_dict())
+ default_rspec_version = version_manager.get_version("sfa 1").to_dict()
+ xrn=Xrn(api.hrn, 'authority+sa')
+ version_more = {'interface':'slicemgr',
+ 'hrn' : xrn.get_hrn(),
+ 'urn' : xrn.get_urn(),
+ 'peers': peers,
+ 'request_rspec_versions': request_rspec_versions,
+ 'ad_rspec_versions': ad_rspec_versions,
+ 'default_ad_rspec': default_rspec_version
}
- sm_version=version_core(version_more)
- # local aggregate if present needs to have localhost resolved
- if api.hrn in api.aggregates:
- local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
- sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
- return sm_version
-
-def drop_slicemgr_stats(rspec):
- try:
- stats_elements = rspec.xml.xpath('//statistics')
- for node in stats_elements:
- node.getparent().remove(node)
- except Exception, e:
- logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
-
-def add_slicemgr_stat(rspec, callname, aggname, elapsed, status):
- try:
- stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
- if stats_tags:
- stats_tag = stats_tags[0]
- else:
- stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname)
-
- etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
- except Exception, e:
- logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e)))
-
-def ListResources(api, creds, options, call_id):
- version_manager = VersionManager()
- def _ListResources(aggregate, server, credential, opts, call_id):
-
- my_opts = copy(opts)
- args = [credential, my_opts]
- tStart = time.time()
+ sm_version=version_core(version_more)
+ # local aggregate if present needs to have localhost resolved
+ if api.hrn in api.aggregates:
+ local_am_url=self.get_serverproxy_url(api.aggregates[api.hrn])
+ sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
+ return sm_version
+
+ def drop_slicemgr_stats(self, rspec):
try:
- if _call_id_supported(api, server):
- args.append(call_id)
- version = api.get_cached_server_version(server)
- # force ProtoGENI aggregates to give us a v2 RSpec
- if 'sfa' not in version.keys():
- my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict()
- rspec = server.ListResources(*args)
- return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
+ stats_elements = rspec.xml.xpath('//statistics')
+ for node in stats_elements:
+ node.getparent().remove(node)
except Exception, e:
- api.logger.log_exc("ListResources failed at %s" %(server.url))
- return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
-
- if Callids().already_handled(call_id): return ""
-
- # get slice's hrn from options
- xrn = options.get('geni_slice_urn', '')
- (hrn, type) = urn_to_hrn(xrn)
- if 'geni_compressed' in options:
- del(options['geni_compressed'])
-
- # get the rspec's return format from options
- rspec_version = version_manager.get_version(options.get('rspec_version'))
- version_string = "rspec_%s" % (rspec_version.to_string())
-
- # look in cache first
- if caching and api.cache and not xrn:
- rspec = api.cache.get(version_string)
- if rspec:
- return rspec
-
- # get the callers hrn
- valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
-
- # attempt to use delegated credential first
- cred = api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
- threads = ThreadManager()
- for aggregate in api.aggregates:
- # prevent infinite loop. Dont send request back to caller
- # unless the caller is the aggregate's SM
- if caller_hrn == aggregate and aggregate != api.hrn:
- continue
-
- # get the rspec from the aggregate
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- threads.run(_ListResources, aggregate, server, [cred], options, call_id)
-
-
- results = threads.get_results()
- rspec_version = version_manager.get_version(options.get('rspec_version'))
- if xrn:
- result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
- else:
- result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad')
- rspec = RSpec(version=result_version)
- for result in results:
- add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"])
- if result["status"]=="success":
- try:
- rspec.version.merge(result["rspec"])
- except:
- api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
-
- # cache the result
- if caching and api.cache and not xrn:
- api.cache.add(version_string, rspec.toxml())
-
- return rspec.toxml()
-
-
-def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
-
- version_manager = VersionManager()
- def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
- tStart = time.time()
+ logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
+
+ def add_slicemgr_stat(self, rspec, callname, aggname, elapsed, status):
try:
- # Need to call GetVersion at an aggregate to determine the supported
- # rspec type/format beofre calling CreateSliver at an Aggregate.
+ stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
+ if stats_tags:
+ stats_tag = stats_tags[0]
+ else:
+ stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname)
+
+ etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
+ except Exception, e:
+ logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e)))
+
+    def ListResources(self, api, creds, options, call_id):
+        """Aggregate ListResources across all known aggregates.
+
+        Fans out (via ThreadManager) to every aggregate, merges the returned
+        rspecs into one document of the requested version, and records
+        per-aggregate timing/status through add_slicemgr_stat.  When
+        self.caching is enabled and no slice urn is given, the answer is
+        served from / stored into api.cache.
+        """
+        version_manager = VersionManager()
+        # per-aggregate worker, run in a thread; always returns a result dict
+        def _ListResources(aggregate, server, credential, opts, call_id):
+
+            my_opts = copy(opts)
+            args = [credential, my_opts]
+            tStart = time.time()
+            try:
+                if self._call_id_supported(api, server):
+                    args.append(call_id)
+                version = api.get_cached_server_version(server)
+                # force ProtoGENI aggregates to give us a v2 RSpec
+                if 'sfa' not in version.keys():
+                    my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict()
+                rspec = server.ListResources(*args)
+                return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
+            except Exception, e:
+                api.logger.log_exc("ListResources failed at %s" %(server.url))
+                return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
+
+        if Callids().already_handled(call_id): return ""
+
+        # get slice's hrn from options
+        xrn = options.get('geni_slice_urn', '')
+        (hrn, type) = urn_to_hrn(xrn)
+        if 'geni_compressed' in options:
+            del(options['geni_compressed'])
+
+        # get the rspec's return format from options
+        rspec_version = version_manager.get_version(options.get('rspec_version'))
+        version_string = "rspec_%s" % (rspec_version.to_string())
+
+        # look in cache first
+        if self.caching and api.cache and not xrn:
+            rspec = api.cache.get(version_string)
+            if rspec:
+                return rspec
+
+        # get the callers hrn
+        valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
+        caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+
+        # attempt to use delegated credential first
+        cred = api.getDelegatedCredential(creds)
+        if not cred:
+            cred = api.getCredential()
+        threads = ThreadManager()
+        for aggregate in api.aggregates:
+            # prevent infinite loop. Dont send request back to caller
+            # unless the caller is the aggregate's SM
+            if caller_hrn == aggregate and aggregate != api.hrn:
+                continue
+
+            # get the rspec from the aggregate
+            interface = api.aggregates[aggregate]
+            server = api.server_proxy(interface, cred)
+            threads.run(_ListResources, aggregate, server, [cred], options, call_id)
+
+
+        results = threads.get_results()
+        rspec_version = version_manager.get_version(options.get('rspec_version'))
+        # manifest when a slice was named, otherwise an advertisement
+        if xrn:
+            result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
+        else:
+            result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad')
+        rspec = RSpec(version=result_version)
+        for result in results:
+            self.add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"])
+            if result["status"]=="success":
+                try:
+                    rspec.version.merge(result["rspec"])
+                except:
+                    api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
+
+        # cache the result
+        if self.caching and api.cache and not xrn:
+            api.cache.add(version_string, rspec.toxml())
+
+        return rspec.toxml()
+
+
+ def CreateSliver(self, api, xrn, creds, rspec_str, users, call_id):
+
+ version_manager = VersionManager()
+ def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
+ tStart = time.time()
+ try:
+ # Need to call GetVersion at an aggregate to determine the supported
+                # rspec type/format before calling CreateSliver at an Aggregate.
+ server_version = api.get_cached_server_version(server)
+ requested_users = users
+ if 'sfa' not in server_version and 'geni_api' in server_version:
+                    # sfa aggregates support both sfa and pg rspecs, no need to convert
+ # if aggregate supports sfa rspecs. otherwise convert to pg rspec
+ rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request'))
+ filter = {'component_manager_id': server_version['urn']}
+ rspec.filter(filter)
+ rspec = rspec.toxml()
+ requested_users = sfa_to_pg_users_arg(users)
+ args = [xrn, credential, rspec, requested_users]
+ if self._call_id_supported(api, server):
+ args.append(call_id)
+ rspec = server.CreateSliver(*args)
+ return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
+ except:
+ logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
+ return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
+
+ if Callids().already_handled(call_id): return ""
+ # Validate the RSpec against PlanetLab's schema --disabled for now
+ # The schema used here needs to aggregate the PL and VINI schemas
+ # schema = "/var/www/html/schemas/pl.rng"
+ rspec = RSpec(rspec_str)
+ # schema = None
+ # if schema:
+ # rspec.validate(schema)
+
+ # if there is a <statistics> section, the aggregates don't care about it,
+ # so delete it.
+ self.drop_slicemgr_stats(rspec)
+
+ # attempt to use delegated credential first
+ cred = api.getDelegatedCredential(creds)
+ if not cred:
+ cred = api.getCredential()
+
+ # get the callers hrn
+ hrn, type = urn_to_hrn(xrn)
+ valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
+ caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+ threads = ThreadManager()
+ for aggregate in api.aggregates:
+ # prevent infinite loop. Dont send request back to caller
+ # unless the caller is the aggregate's SM
+ if caller_hrn == aggregate and aggregate != api.hrn:
+ continue
+ interface = api.aggregates[aggregate]
+ server = api.server_proxy(interface, cred)
+ # Just send entire RSpec to each aggregate
+ threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id)
+
+ results = threads.get_results()
+ manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
+ result_rspec = RSpec(version=manifest_version)
+ for result in results:
+ self.add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"])
+ if result["status"]=="success":
+ try:
+ result_rspec.version.merge(result["rspec"])
+ except:
+ api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
+ return result_rspec.toxml()
+
+ def RenewSliver(self, api, xrn, creds, expiration_time, call_id):
+ def _RenewSliver(server, xrn, creds, expiration_time, call_id):
server_version = api.get_cached_server_version(server)
- requested_users = users
- if 'sfa' not in server_version and 'geni_api' in server_version:
- # sfa aggregtes support both sfa and pg rspecs, no need to convert
- # if aggregate supports sfa rspecs. otherwise convert to pg rspec
- rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request'))
- filter = {'component_manager_id': server_version['urn']}
- rspec.filter(filter)
- rspec = rspec.toxml()
- requested_users = sfa_to_pg_users_arg(users)
- args = [xrn, credential, rspec, requested_users]
- if _call_id_supported(api, server):
+ args = [xrn, creds, expiration_time, call_id]
+ if self._call_id_supported(api, server):
args.append(call_id)
- rspec = server.CreateSliver(*args)
- return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
- except:
- logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
- return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
-
- if Callids().already_handled(call_id): return ""
- # Validate the RSpec against PlanetLab's schema --disabled for now
- # The schema used here needs to aggregate the PL and VINI schemas
- # schema = "/var/www/html/schemas/pl.rng"
- rspec = RSpec(rspec_str)
-# schema = None
-# if schema:
-# rspec.validate(schema)
-
- # if there is a <statistics> section, the aggregates don't care about it,
- # so delete it.
- drop_slicemgr_stats(rspec)
-
- # attempt to use delegated credential first
- cred = api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
-
- # get the callers hrn
- hrn, type = urn_to_hrn(xrn)
- valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
- threads = ThreadManager()
- for aggregate in api.aggregates:
- # prevent infinite loop. Dont send request back to caller
- # unless the caller is the aggregate's SM
- if caller_hrn == aggregate and aggregate != api.hrn:
- continue
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- # Just send entire RSpec to each aggregate
- threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id)
+ return server.RenewSliver(*args)
+
+ if Callids().already_handled(call_id): return True
+
+ (hrn, type) = urn_to_hrn(xrn)
+ # get the callers hrn
+ valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
+ caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+
+ # attempt to use delegated credential first
+ cred = api.getDelegatedCredential(creds)
+ if not cred:
+ cred = api.getCredential()
+ threads = ThreadManager()
+ for aggregate in api.aggregates:
+ # prevent infinite loop. Dont send request back to caller
+ # unless the caller is the aggregate's SM
+ if caller_hrn == aggregate and aggregate != api.hrn:
+ continue
+ interface = api.aggregates[aggregate]
+ server = api.server_proxy(interface, cred)
+ threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id)
+ # 'and' the results
+ return reduce (lambda x,y: x and y, threads.get_results() , True)
+
+ def DeleteSliver(self, api, xrn, creds, call_id):
+ def _DeleteSliver(server, xrn, creds, call_id):
+ server_version = api.get_cached_server_version(server)
+ args = [xrn, creds]
+ if self._call_id_supported(api, server):
+ args.append(call_id)
+ return server.DeleteSliver(*args)
+
+ if Callids().already_handled(call_id): return ""
+ (hrn, type) = urn_to_hrn(xrn)
+ # get the callers hrn
+ valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
+ caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+
+ # attempt to use delegated credential first
+ cred = api.getDelegatedCredential(creds)
+ if not cred:
+ cred = api.getCredential()
+ threads = ThreadManager()
+ for aggregate in api.aggregates:
+ # prevent infinite loop. Dont send request back to caller
+ # unless the caller is the aggregate's SM
+ if caller_hrn == aggregate and aggregate != api.hrn:
+ continue
+ interface = api.aggregates[aggregate]
+ server = api.server_proxy(interface, cred)
+ threads.run(_DeleteSliver, server, xrn, [cred], call_id)
+ threads.get_results()
+ return 1
+
+
+ # first draft at a merging SliverStatus
+ def SliverStatus(self, api, slice_xrn, creds, call_id):
+ def _SliverStatus(server, xrn, creds, call_id):
+ server_version = api.get_cached_server_version(server)
+ args = [xrn, creds]
+ if self._call_id_supported(api, server):
+ args.append(call_id)
+ return server.SliverStatus(*args)
+
+ if Callids().already_handled(call_id): return {}
+ # attempt to use delegated credential first
+ cred = api.getDelegatedCredential(creds)
+ if not cred:
+ cred = api.getCredential()
+ threads = ThreadManager()
+ for aggregate in api.aggregates:
+ interface = api.aggregates[aggregate]
+ server = api.server_proxy(interface, cred)
+ threads.run (_SliverStatus, server, slice_xrn, [cred], call_id)
+ results = threads.get_results()
+
+ # get rid of any void result - e.g. when call_id was hit where by convention we return {}
+ results = [ result for result in results if result and result['geni_resources']]
+
+ # do not try to combine if there's no result
+ if not results : return {}
+
+ # otherwise let's merge stuff
+ overall = {}
+
+ # mmh, it is expected that all results carry the same urn
+ overall['geni_urn'] = results[0]['geni_urn']
+ overall['pl_login'] = results[0]['pl_login']
+ # append all geni_resources
+ overall['geni_resources'] = \
+ reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
+ overall['status'] = 'unknown'
+ if overall['geni_resources']:
+ overall['status'] = 'ready'
+
+ return overall
+
+ def ListSlices(self, api, creds, call_id):
+ def _ListSlices(server, creds, call_id):
+ server_version = api.get_cached_server_version(server)
+ args = [creds]
+ if self._call_id_supported(api, server):
+ args.append(call_id)
+ return server.ListSlices(*args)
+
+ if Callids().already_handled(call_id): return []
+
+ # look in cache first
+ if self.caching and api.cache:
+ slices = api.cache.get('slices')
+ if slices:
+ return slices
+
+ # get the callers hrn
+ valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
+ caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+
+ # attempt to use delegated credential first
+ cred= api.getDelegatedCredential(creds)
+ if not cred:
+ cred = api.getCredential()
+ threads = ThreadManager()
+ # fetch from aggregates
+ for aggregate in api.aggregates:
+ # prevent infinite loop. Dont send request back to caller
+ # unless the caller is the aggregate's SM
+ if caller_hrn == aggregate and aggregate != api.hrn:
+ continue
+ interface = api.aggregates[aggregate]
+ server = api.server_proxy(interface, cred)
+ threads.run(_ListSlices, server, [cred], call_id)
+
+        # combine results
+ results = threads.get_results()
+ slices = []
+ for result in results:
+ slices.extend(result)
+
+ # cache the result
+ if self.caching and api.cache:
+ api.cache.add('slices', slices)
+
+ return slices
+
+
+ def get_ticket(self, api, xrn, creds, rspec, users):
+ slice_hrn, type = urn_to_hrn(xrn)
+ # get the netspecs contained within the clients rspec
+ aggregate_rspecs = {}
+ tree= etree.parse(StringIO(rspec))
+ elements = tree.findall('./network')
+ for element in elements:
+ aggregate_hrn = element.values()[0]
+ aggregate_rspecs[aggregate_hrn] = rspec
+
+ # get the callers hrn
+ valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
+ caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+
+ # attempt to use delegated credential first
+ cred = api.getDelegatedCredential(creds)
+ if not cred:
+ cred = api.getCredential()
+ threads = ThreadManager()
+ for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
+ # prevent infinite loop. Dont send request back to caller
+ # unless the caller is the aggregate's SM
+ if caller_hrn == aggregate and aggregate != api.hrn:
+ continue
- results = threads.get_results()
- manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
- result_rspec = RSpec(version=manifest_version)
- for result in results:
- add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"])
- if result["status"]=="success":
- try:
- result_rspec.version.merge(result["rspec"])
- except:
- api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
- return result_rspec.toxml()
-
-def RenewSliver(api, xrn, creds, expiration_time, call_id):
- def _RenewSliver(server, xrn, creds, expiration_time, call_id):
- server_version = api.get_cached_server_version(server)
- args = [xrn, creds, expiration_time, call_id]
- if _call_id_supported(api, server):
- args.append(call_id)
- return server.RenewSliver(*args)
-
- if Callids().already_handled(call_id): return True
-
- (hrn, type) = urn_to_hrn(xrn)
- # get the callers hrn
- valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
-
- # attempt to use delegated credential first
- cred = api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
- threads = ThreadManager()
- for aggregate in api.aggregates:
- # prevent infinite loop. Dont send request back to caller
- # unless the caller is the aggregate's SM
- if caller_hrn == aggregate and aggregate != api.hrn:
- continue
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id)
- # 'and' the results
- return reduce (lambda x,y: x and y, threads.get_results() , True)
-
-def DeleteSliver(api, xrn, creds, call_id):
- def _DeleteSliver(server, xrn, creds, call_id):
- server_version = api.get_cached_server_version(server)
- args = [xrn, creds]
- if _call_id_supported(api, server):
- args.append(call_id)
- return server.DeleteSliver(*args)
-
- if Callids().already_handled(call_id): return ""
- (hrn, type) = urn_to_hrn(xrn)
- # get the callers hrn
- valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
-
- # attempt to use delegated credential first
- cred = api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
- threads = ThreadManager()
- for aggregate in api.aggregates:
- # prevent infinite loop. Dont send request back to caller
- # unless the caller is the aggregate's SM
- if caller_hrn == aggregate and aggregate != api.hrn:
- continue
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- threads.run(_DeleteSliver, server, xrn, [cred], call_id)
- threads.get_results()
- return 1
-
-
-# first draft at a merging SliverStatus
-def SliverStatus(api, slice_xrn, creds, call_id):
- def _SliverStatus(server, xrn, creds, call_id):
- server_version = api.get_cached_server_version(server)
- args = [xrn, creds]
- if _call_id_supported(api, server):
- args.append(call_id)
- return server.SliverStatus(*args)
-
- if Callids().already_handled(call_id): return {}
- # attempt to use delegated credential first
- cred = api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
- threads = ThreadManager()
- for aggregate in api.aggregates:
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- threads.run (_SliverStatus, server, slice_xrn, [cred], call_id)
- results = threads.get_results()
-
- # get rid of any void result - e.g. when call_id was hit where by convention we return {}
- results = [ result for result in results if result and result['geni_resources']]
-
- # do not try to combine if there's no result
- if not results : return {}
-
- # otherwise let's merge stuff
- overall = {}
-
- # mmh, it is expected that all results carry the same urn
- overall['geni_urn'] = results[0]['geni_urn']
- overall['pl_login'] = results[0]['pl_login']
- # append all geni_resources
- overall['geni_resources'] = \
- reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
- overall['status'] = 'unknown'
- if overall['geni_resources']:
- overall['status'] = 'ready'
-
- return overall
-
-caching=True
-#caching=False
-def ListSlices(api, creds, call_id):
- def _ListSlices(server, creds, call_id):
- server_version = api.get_cached_server_version(server)
- args = [creds]
- if _call_id_supported(api, server):
- args.append(call_id)
- return server.ListSlices(*args)
-
- if Callids().already_handled(call_id): return []
-
- # look in cache first
- if caching and api.cache:
- slices = api.cache.get('slices')
- if slices:
- return slices
-
- # get the callers hrn
- valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
-
- # attempt to use delegated credential first
- cred= api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
- threads = ThreadManager()
- # fetch from aggregates
- for aggregate in api.aggregates:
- # prevent infinite loop. Dont send request back to caller
- # unless the caller is the aggregate's SM
- if caller_hrn == aggregate and aggregate != api.hrn:
- continue
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- threads.run(_ListSlices, server, [cred], call_id)
-
- # combime results
- results = threads.get_results()
- slices = []
- for result in results:
- slices.extend(result)
-
- # cache the result
- if caching and api.cache:
- api.cache.add('slices', slices)
-
- return slices
-
-
-def get_ticket(api, xrn, creds, rspec, users):
- slice_hrn, type = urn_to_hrn(xrn)
- # get the netspecs contained within the clients rspec
- aggregate_rspecs = {}
- tree= etree.parse(StringIO(rspec))
- elements = tree.findall('./network')
- for element in elements:
- aggregate_hrn = element.values()[0]
- aggregate_rspecs[aggregate_hrn] = rspec
-
- # get the callers hrn
- valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
-
- # attempt to use delegated credential first
- cred = api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
- threads = ThreadManager()
- for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
- # prevent infinite loop. Dont send request back to caller
- # unless the caller is the aggregate's SM
- if caller_hrn == aggregate and aggregate != api.hrn:
- continue
+ interface = api.aggregates[aggregate]
+ server = api.server_proxy(interface, cred)
+ threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users)
+
+ results = threads.get_results()
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users)
-
- results = threads.get_results()
-
- # gather information from each ticket
- rspec = None
- initscripts = []
- slivers = []
- object_gid = None
- for result in results:
- agg_ticket = SfaTicket(string=result)
- attrs = agg_ticket.get_attributes()
- if not object_gid:
- object_gid = agg_ticket.get_gid_object()
- if not rspec:
- rspec = RSpec(agg_ticket.get_rspec())
- else:
- rspec.version.merge(agg_ticket.get_rspec())
- initscripts.extend(attrs.get('initscripts', []))
- slivers.extend(attrs.get('slivers', []))
-
- # merge info
- attributes = {'initscripts': initscripts,
- 'slivers': slivers}
-
- # create a new ticket
- ticket = SfaTicket(subject = slice_hrn)
- ticket.set_gid_caller(api.auth.client_gid)
- ticket.set_issuer(key=api.key, subject=api.hrn)
- ticket.set_gid_object(object_gid)
- ticket.set_pubkey(object_gid.get_pubkey())
- #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
- ticket.set_attributes(attributes)
- ticket.set_rspec(rspec.toxml())
- ticket.encode()
- ticket.sign()
- return ticket.save_to_string(save_parents=True)
-
-def start_slice(api, xrn, creds):
- hrn, type = urn_to_hrn(xrn)
-
- # get the callers hrn
- valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
-
- # attempt to use delegated credential first
- cred = api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
- threads = ThreadManager()
- for aggregate in api.aggregates:
- # prevent infinite loop. Dont send request back to caller
- # unless the caller is the aggregate's SM
- if caller_hrn == aggregate and aggregate != api.hrn:
- continue
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- threads.run(server.Start, xrn, cred)
- threads.get_results()
- return 1
-
-def stop_slice(api, xrn, creds):
- hrn, type = urn_to_hrn(xrn)
-
- # get the callers hrn
- valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
-
- # attempt to use delegated credential first
- cred = api.getDelegatedCredential(creds)
- if not cred:
- cred = api.getCredential()
- threads = ThreadManager()
- for aggregate in api.aggregates:
- # prevent infinite loop. Dont send request back to caller
- # unless the caller is the aggregate's SM
- if caller_hrn == aggregate and aggregate != api.hrn:
- continue
- interface = api.aggregates[aggregate]
- server = api.server_proxy(interface, cred)
- threads.run(server.Stop, xrn, cred)
- threads.get_results()
- return 1
-
-def reset_slice(api, xrn):
- """
- Not implemented
- """
- return 1
-
-def shutdown(api, xrn, creds):
- """
- Not implemented
- """
- return 1
-
-def status(api, xrn, creds):
- """
- Not implemented
- """
- return 1
-
-# this is plain broken
-#def main():
-# r = RSpec()
-# r.parseFile(sys.argv[1])
-# rspec = r.toDict()
-# CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice')
-
-if __name__ == "__main__":
- main()
+ # gather information from each ticket
+ rspec = None
+ initscripts = []
+ slivers = []
+ object_gid = None
+ for result in results:
+ agg_ticket = SfaTicket(string=result)
+ attrs = agg_ticket.get_attributes()
+ if not object_gid:
+ object_gid = agg_ticket.get_gid_object()
+ if not rspec:
+ rspec = RSpec(agg_ticket.get_rspec())
+ else:
+ rspec.version.merge(agg_ticket.get_rspec())
+ initscripts.extend(attrs.get('initscripts', []))
+ slivers.extend(attrs.get('slivers', []))
+
+ # merge info
+ attributes = {'initscripts': initscripts,
+ 'slivers': slivers}
+
+ # create a new ticket
+ ticket = SfaTicket(subject = slice_hrn)
+ ticket.set_gid_caller(api.auth.client_gid)
+ ticket.set_issuer(key=api.key, subject=api.hrn)
+ ticket.set_gid_object(object_gid)
+ ticket.set_pubkey(object_gid.get_pubkey())
+ #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
+ ticket.set_attributes(attributes)
+ ticket.set_rspec(rspec.toxml())
+ ticket.encode()
+ ticket.sign()
+ return ticket.save_to_string(save_parents=True)
+
+ def start_slice(self, api, xrn, creds):
+ hrn, type = urn_to_hrn(xrn)
+
+ # get the callers hrn
+ valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
+ caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+
+ # attempt to use delegated credential first
+ cred = api.getDelegatedCredential(creds)
+ if not cred:
+ cred = api.getCredential()
+ threads = ThreadManager()
+ for aggregate in api.aggregates:
+ # prevent infinite loop. Dont send request back to caller
+ # unless the caller is the aggregate's SM
+ if caller_hrn == aggregate and aggregate != api.hrn:
+ continue
+ interface = api.aggregates[aggregate]
+ server = api.server_proxy(interface, cred)
+ threads.run(server.Start, xrn, cred)
+ threads.get_results()
+ return 1
+
+ def stop_slice(self, api, xrn, creds):
+ hrn, type = urn_to_hrn(xrn)
+
+ # get the callers hrn
+ valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
+ caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+
+ # attempt to use delegated credential first
+ cred = api.getDelegatedCredential(creds)
+ if not cred:
+ cred = api.getCredential()
+ threads = ThreadManager()
+ for aggregate in api.aggregates:
+ # prevent infinite loop. Dont send request back to caller
+ # unless the caller is the aggregate's SM
+ if caller_hrn == aggregate and aggregate != api.hrn:
+ continue
+ interface = api.aggregates[aggregate]
+ server = api.server_proxy(interface, cred)
+ threads.run(server.Stop, xrn, cred)
+ threads.get_results()
+ return 1
+
+ def reset_slice(self, api, xrn):
+ """
+ Not implemented
+ """
+ return 1
+
+ def shutdown(self, api, xrn, creds):
+ """
+ Not implemented
+ """
+ return 1
+
+ def status(self, api, xrn, creds):
+ """
+ Not implemented
+ """
+ return 1
#!/usr/bin/python
from sfa.util.xrn import hrn_to_urn, urn_to_hrn
-from sfa.util.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename
+from sfa.util.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, urn_to_sliver_id
from sfa.rspecs.rspec import RSpec
+from sfa.rspecs.elements.hardware_type import HardwareType
from sfa.rspecs.elements.link import Link
+from sfa.rspecs.elements.login import Login
from sfa.rspecs.elements.interface import Interface
-
+from sfa.rspecs.elements.services import Services
+from sfa.rspecs.elements.pltag import PLTag
from sfa.util.topology import Topology
from sfa.rspecs.version_manager import VersionManager
from sfa.plc.vlink import get_tc_rate
class Aggregate:
api = None
- sites = {}
- nodes = {}
- interfaces = {}
- links = {}
- node_tags = {}
- pl_initscripts = {}
- prepared=False
#panos new user options variable
user_options = {}
self.api = api
self.user_options = user_options
- def prepare_sites(self, filter={}, force=False):
- if not self.sites or force:
- for site in self.api.driver.GetSites(filter):
- self.sites[site['site_id']] = site
-
- def prepare_nodes(self, filter={}, force=False):
- if not self.nodes or force:
- filter.update({'peer_id': None})
- nodes = self.api.driver.GetNodes(filter)
- site_ids = []
- interface_ids = []
- tag_ids = []
- for node in nodes:
- site_ids.append(node['site_id'])
- interface_ids.extend(node['interface_ids'])
- tag_ids.extend(node['node_tag_ids'])
- self.prepare_sites({'site_id': site_ids})
- self.prepare_interfaces({'interface_id': interface_ids})
- self.prepare_node_tags({'node_tag_id': tag_ids})
- for node in nodes:
- # add site/interface info to nodes.
- # assumes that sites, interfaces and tags have already been prepared.
- site = self.sites[node['site_id']]
- interfaces = [self.interfaces[interface_id] for interface_id in node['interface_ids']]
- tags = [self.node_tags[tag_id] for tag_id in node['node_tag_ids']]
- node['network'] = self.api.hrn
- node['network_urn'] = hrn_to_urn(self.api.hrn, 'authority+am')
- node['urn'] = hostname_to_urn(self.api.hrn, site['login_base'], node['hostname'])
- node['site_urn'] = hrn_to_urn(PlXrn.site_hrn(self.api.hrn, site['login_base']), 'authority+sa')
- node['site'] = site
- node['interfaces'] = interfaces
- node['tags'] = tags
- self.nodes[node['node_id']] = node
-
- def prepare_interfaces(self, filter={}, force=False):
- if not self.interfaces or force:
- for interface in self.api.driver.GetInterfaces(filter):
- self.interfaces[interface['interface_id']] = interface
-
- def prepare_links(self, filter={}, force=False):
- # we're aobut to deprecate sfa_aggregate_type, need to get this right
- # with the generic framework
- if not self.links or force:
- if not self.api.config.SFA_AGGREGATE_TYPE.lower() == 'vini':
- return
-
- topology = Topology()
- for (site_id1, site_id2) in topology:
- link = Link()
- if not site_id1 in self.sites or site_id2 not in self.sites:
+ def get_sites(self, filter={}):
+ sites = {}
+ for site in self.api.driver.GetSites(filter):
+ sites[site['site_id']] = site
+ return sites
+
+ def get_interfaces(self, filter={}):
+ interfaces = {}
+ for interface in self.api.driver.GetInterfaces(filter):
+ iface = Interface()
+ iface['interface_id'] = interface['interface_id']
+ iface['node_id'] = interface['node_id']
+ iface['ipv4'] = interface['ip']
+ iface['bwlimit'] = interface['bwlimit']
+ interfaces[iface['interface_id']] = iface
+ return interfaces
+
+ def get_links(self, filter={}):
+
+ if not self.api.config.SFA_AGGREGATE_TYPE.lower() == 'vini':
+ return
+
+ topology = Topology()
+ links = {}
+ for (site_id1, site_id2) in topology:
+ link = Link()
+ if not site_id1 in self.sites or site_id2 not in self.sites:
+ continue
+ site1 = self.sites[site_id1]
+ site2 = self.sites[site_id2]
+ # get hrns
+ site1_hrn = self.api.hrn + '.' + site1['login_base']
+ site2_hrn = self.api.hrn + '.' + site2['login_base']
+ # get the first node
+ node1 = self.nodes[site1['node_ids'][0]]
+ node2 = self.nodes[site2['node_ids'][0]]
+
+ # set interfaces
+ # just get first interface of the first node
+ if1_xrn = PlXrn(auth=self.api.hrn, interface='node%s:eth0' % (node1['node_id']))
+ if1_ipv4 = self.interfaces[node1['interface_ids'][0]]['ip']
+ if2_xrn = PlXrn(auth=self.api.hrn, interface='node%s:eth0' % (node2['node_id']))
+ if2_ipv4 = self.interfaces[node2['interface_ids'][0]]['ip']
+
+ if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
+ if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
+
+ # set link
+ link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
+ link['interface1'] = if1
+ link['interface2'] = if2
+ link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
+ link['component_id'] = PlXrn(auth=self.api.hrn, interface=link['component_name']).get_urn()
+ link['component_manager_id'] = hrn_to_urn(self.api.hrn, 'authority+am')
+ links[link['component_name']] = link
+
+ return links
+
+ def get_node_tags(self, filter={}):
+ node_tags = {}
+ for node_tag in self.api.driver.GetNodeTags(filter):
+ node_tags[node_tag['node_tag_id']] = node_tag
+ return node_tags
+
+ def get_pl_initscripts(self, filter={}):
+ pl_initscripts = {}
+ filter.update({'enabled': True})
+ for initscript in self.api.driver.GetInitScripts(filter):
+ pl_initscripts[initscript['initscript_id']] = initscript
+ return pl_initscripts
+
+
+ def get_slice_and_slivers(self, slice_xrn):
+ """
+ Returns a dict of slivers keyed on the sliver's node_id
+ """
+ slivers = {}
+ slice = None
+ if not slice_xrn:
+ return (slice, slivers)
+ slice_urn = hrn_to_urn(slice_xrn)
+ slice_hrn, _ = urn_to_hrn(slice_xrn)
+ slice_name = hrn_to_pl_slicename(slice_hrn)
+ slices = self.api.driver.GetSlices(slice_name)
+ if not slices:
+ return (slice, slivers)
+ slice = slices[0]
+
+ # sort slivers by node id
+ for node_id in slice['node_ids']:
+ sliver = Sliver({'sliver_id': urn_to_sliver_id(slice_urn, slice['slice_id'], node_id),
+ 'name': 'plab-vserver',
+ 'tags': []})
+ slivers[node_id]= sliver
+
+ # sort sliver attributes by node id
+ tags = self.api.driver.GetSliceTags({'slice_tag_id': slice['slice_tag_ids']})
+ for tag in tags:
+ # most likely a default/global sliver attribute (node_id == None)
+ if tag['node_id'] not in slivers:
+ sliver = Sliver({'sliver_id': urn_to_sliver_id(slice_urn, slice['slice_id'], ""),
+ 'name': 'plab-vserver',
+ 'tags': []})
+ slivers[tag['node_id']] = sliver
+ slivers[tag['node_id']]['tags'].append(tag)
+
+ return (slice, slivers)
+
+ def get_nodes(self, slice=None):
+ filter = {}
+ if slice and 'node_ids' in slice and slice['node_ids']:
+ filter['node_id'] = slice['node_ids']
+
+ filter.update({'peer_id': None})
+ nodes = self.api.driver.GetNodes(filter)
+
+ site_ids = []
+ interface_ids = []
+ tag_ids = []
+ for node in nodes:
+ site_ids.append(node['site_id'])
+ interface_ids.extend(node['interface_ids'])
+ tag_ids.extend(node['node_tag_ids'])
+
+ # get sites
+ sites_dict = self.get_sites({'site_id': site_ids})
+ # get interfaces
+ interfaces = self.get_interfaces({'interface_id':interface_ids})
+ # get slivers
+ slivers = self.get_slivers(slice)
+ # get tags
+ node_tags = self.get_node_tags({'node_id': node_ids})
+ # get initscripts
+ pl_initscripts = self.get_pl_initscripts()
+
+ rspec_nodes = []
+ for node in nodes:
+ # skip whitelisted nodes
+ if node['slice_ids_whitelist']:
+ if not slice or slice['slice_id'] not in node['slice_ids_whitelist']:
continue
- site1 = self.sites[site_id1]
- site2 = self.sites[site_id2]
- # get hrns
- site1_hrn = self.api.hrn + '.' + site1['login_base']
- site2_hrn = self.api.hrn + '.' + site2['login_base']
- # get the first node
- node1 = self.nodes[site1['node_ids'][0]]
- node2 = self.nodes[site2['node_ids'][0]]
-
- # set interfaces
- # just get first interface of the first node
- if1_xrn = PlXrn(auth=self.api.hrn, interface='node%s:eth0' % (node1['node_id']))
- if1_ipv4 = self.interfaces[node1['interface_ids'][0]]['ip']
- if2_xrn = PlXrn(auth=self.api.hrn, interface='node%s:eth0' % (node2['node_id']))
- if2_ipv4 = self.interfaces[node2['interface_ids'][0]]['ip']
-
- if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} )
- if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} )
-
- # set link
- link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'})
- link['interface1'] = if1
- link['interface2'] = if2
- link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base'])
- link['component_id'] = PlXrn(auth=self.api.hrn, interface=link['component_name']).get_urn()
- link['component_manager_id'] = hrn_to_urn(self.api.hrn, 'authority+am')
- self.links[link['component_name']] = link
-
-
- def prepare_node_tags(self, filter={}, force=False):
- if not self.node_tags or force:
- for node_tag in self.api.driver.GetNodeTags(filter):
- self.node_tags[node_tag['node_tag_id']] = node_tag
-
- def prepare_pl_initscripts(self, filter={}, force=False):
- if not self.pl_initscripts or force:
- filter.update({'enabled': True})
- for initscript in self.api.driver.GetInitScripts(filter):
- self.pl_initscripts[initscript['initscript_id']] = initscript
-
- def prepare(self, slice = None, force=False):
- if not self.prepared or force or slice:
- if not slice:
- self.prepare_sites(force=force)
- self.prepare_interfaces(force=force)
- self.prepare_node_tags(force=force)
- self.prepare_nodes(force=force)
- self.prepare_links(force=force)
- self.prepare_pl_initscripts(force=force)
- else:
- self.prepare_sites({'site_id': slice['site_id']})
- self.prepare_interfaces({'node_id': slice['node_ids']})
- self.prepare_node_tags({'node_id': slice['node_ids']})
- self.prepare_nodes({'node_id': slice['node_ids']})
- self.prepare_links({'slice_id': slice['slice_id']})
- self.prepare_pl_initscripts()
- self.prepared = True
-
+            # resolve the node's site up front: 'site' is needed below for
+            # component_id and authority_id (was referenced before assignment)
+            site = sites_dict[node['site_id']]
+            rspec_node = Node()
+            rspec_node['component_id'] = hostname_to_urn(self.api.hrn, site['login_base'], node['hostname'])
+            rspec_node['component_name'] = node['hostname']
+            rspec_node['component_manager_id'] = self.api.hrn
+            rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.api.hrn, site['login_base']), 'authority+sa')
+            rspec_node['boot_state'] = node['boot_state']
+            rspec_node['exclusive'] = 'False'
+            rspec_node['hardware_types'].append(HardwareType({'name': 'plab-vserver'}))
+            # only doing this because protogeni rspec needs
+            # to advertise available initscripts
+            rspec_node['pl_initscripts'] = pl_initscripts
+            # add site/interface info to nodes.
+            # assumes that sites, interfaces and tags have already been prepared.
+            location = Location({'longitude': site['longitude'], 'latitude': site['latitude']})
+            rspec_node['location'] = location
+            rspec_node['interfaces'] = []
+            for if_id in node['interface_ids']:
+                interface = Interface(interfaces[if_id])
+                interface['ipv4'] = interface['ip']
+                rspec_node['interfaces'].append(interface)
+            tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids']]
+            rspec_node['tags'] = tags
+            if node['node_id'] in slivers:
+                # add sliver info
+                sliver = slivers[node['node_id']]
+                rspec_node['sliver_id'] = sliver['sliver_id']
+                rspec_node['client_id'] = node['hostname']
+                rspec_node['slivers'] = [slivers[node['node_id']]]
+
+                # slivers always provide the ssh service
+                # ('port' must be a string key, not a bare name)
+                login = Login({'authentication': 'ssh-keys', 'hostname': node['hostname'], 'port': '22'})
+                service = Services({'login': login})
+                rspec_node['services'].append(service)
+            rspec_nodes.append(rspec_node)
+        return rspec_nodes
+
+
def get_rspec(self, slice_xrn=None, version = None):
+
version_manager = VersionManager()
version = version_manager.get_version(version)
if not slice_xrn:
rspec_version = version_manager._get_version(version.type, version.version, 'ad')
else:
rspec_version = version_manager._get_version(version.type, version.version, 'manifest')
-
- rspec = RSpec(version=rspec_version, user_options=self.user_options)
- # get slice details if specified
- slice = None
- if slice_xrn:
- slice_hrn, _ = urn_to_hrn(slice_xrn)
- slice_name = hrn_to_pl_slicename(slice_hrn)
- slices = self.api.driver.GetSlices(slice_name)
- if slices:
- slice = slices[0]
- self.prepare(slice=slice)
- else:
- self.prepare()
-
- # filter out nodes with a whitelist:
- valid_nodes = []
- for node in self.nodes.values():
- # only doing this because protogeni rspec needs
- # to advertise available initscripts
- node['pl_initscripts'] = self.pl_initscripts
-
- if slice and node['node_id'] in slice['node_ids']:
- valid_nodes.append(node)
- elif slice and slice['slice_id'] in node['slice_ids_whitelist']:
- valid_nodes.append(node)
- elif not slice and not node['slice_ids_whitelist']:
- valid_nodes.append(node)
-
- rspec.version.add_nodes(valid_nodes)
- rspec.version.add_interfaces(self.interfaces.values())
- rspec.version.add_links(self.links.values())
-
- # add slivers
- if slice_xrn and slice:
- slivers = []
- tags = self.api.driver.GetSliceTags(slice['slice_tag_ids'])
-
- # add default tags
- for tag in tags:
- # if tag isn't bound to a node then it applies to all slivers
- # and belongs in the <sliver_defaults> tag
- if not tag['node_id']:
- rspec.version.add_default_sliver_attribute(tag['tagname'], tag['value'], self.api.hrn)
- if tag['tagname'] == 'topo_rspec' and tag['node_id']:
- node = self.nodes[tag['node_id']]
- value = eval(tag['value'])
- for (id, realip, bw, lvip, rvip, vnet) in value:
- bps = get_tc_rate(bw)
- remote = self.nodes[id]
- site1 = self.sites[node['site_id']]
- site2 = self.sites[remote['site_id']]
- link1_name = '%s:%s' % (site1['login_base'], site2['login_base'])
- link2_name = '%s:%s' % (site2['login_base'], site1['login_base'])
- p_link = None
- if link1_name in self.links:
- link = self.links[link1_name]
- elif link2_name in self.links:
- link = self.links[link2_name]
- v_link = Link()
-
- link.capacity = bps
- for node_id in slice['node_ids']:
- try:
- sliver = {}
- sliver['hostname'] = self.nodes[node_id]['hostname']
- sliver['node_id'] = node_id
- sliver['slice_id'] = slice['slice_id']
- sliver['tags'] = []
- slivers.append(sliver)
-
- # add tags for this node only
- for tag in tags:
- if tag['node_id'] and (tag['node_id'] == node_id):
- sliver['tags'].append(tag)
- except:
- self.api.logger.log_exc('unable to add sliver %s to node %s' % (slice['name'], node_id))
- rspec.version.add_slivers(slivers, sliver_urn=slice_xrn)
+ slice, slivers = self.get_slice_and_slivers(slice_xrn)
+ rspec = RSpec(version=rspec_version, user_options=self.user_options)
+ rspec.version.add_nodes(self.get_nodes(slice, slivers))
+ rspec.version.add_links(self.get_links(slice))
+
+        # add sliver defaults: tags not bound to a node are collected by
+        # get_slice_and_slivers() on a sliver stored under the None key;
+        # slice tags carry 'tagname'/'value' (not 'name')
+        default_sliver_attribs = slivers.get(None, {}).get('tags', [])
+        for sliver_attrib in default_sliver_attribs:
+            rspec.version.add_default_sliver_attribute(sliver_attrib['tagname'], sliver_attrib['value'])
+
return rspec.toxml()
+
+
'component_id': None,
'component_name': None,
'component_manager_id': None,
+ 'client_id': None,
+ 'sliver_id': None,
'authority_id': None,
'exclusive': None,
'location': None,
'bw_limit': None,
'boot_state': None,
'slivers': [],
- 'hardware_type': [],
- 'disk_image': [],
+ 'hardware_types': [],
+ 'disk_images': [],
'interfaces': [],
+ 'services': [],
'tags': [],
+ 'pl_initscripts': [],
}
class Sliver(Element):
fields = {
+ 'sliver_id': None,
+ 'client_id': None,
'name': None,
'tags': [],
- 'slice_id': None,
-
}
+
+from lxml import etree
+from sfa.util.plxrn import PlXrn
+from sfa.util.xrn import Xrn
+from sfa.rspecs.elements.node import Node
+from sfa.rspecs.elements.sliver import Sliver
+from sfa.rspecs.elements.network import Network
+from sfa.rspecs.elements.location import Location
+from sfa.rspecs.elements.hardware_type import HardwareType
+from sfa.rspecs.elements.disk_image import DiskImage
+from sfa.rspecs.elements.interface import Interface
+from sfa.rspecs.elements.bwlimit import BWlimit
+from sfa.rspecs.elements.pl_tag import PLTag
+from sfa.rspecs.rspec_elements import RSpecElement, RSpecElements
+from sfa.rspecs.elements.versions.pgv2Service import PGv2Service
+
+class PGv2Node:
+ elements = {
+ 'node': RSpecElement(RSpecElements.NODE, '//default:node | //node'),
+ 'sliver': RSpecElement(RSpecElements.SLIVER, './default:sliver_type | ./sliver_type'),
+ 'interface': RSpecElement(RSpecElements.INTERFACE, './default:interface | ./interface'),
+ 'location': RSpecElement(RSpecElements.LOCATION, './default:location | ./location'),
+ 'hardware_type': RSpecElement(RSpecElements.HARDWARE_TYPE, './default:hardware_type | ./hardware_type'),
+ 'available': RSpecElement(RSpecElements.AVAILABLE, './default:available | ./available'),
+ }
+
+    @staticmethod
+    def add_nodes(xml, nodes):
+        node_elems = []
+        for node in nodes:
+            node_elem = etree.SubElement(xml, 'node')
+            node_elems.append(node_elem)
+            if node.get('component_manager_id'):
+                node_elem.set('component_manager_id', node['component_manager_id'])
+            if node.get('component_id'):
+                node_elem.set('component_id', node['component_id'])
+                component_name = Xrn(node['component_id']).get_leaf()
+                node_elem.set('component_name', component_name)
+            if node.get('client_id'):
+                node_elem.set('client_id', node['client_id'])
+            if node.get('sliver_id'):
+                node_elem.set('sliver_id', node['sliver_id'])
+            if node.get('exclusive'):
+                node_elem.set('exclusive', node['exclusive'])
+            # the Node element field is 'hardware_types' (plural)
+            hardware_types = node.get('hardware_types', [])
+            for hardware_type in hardware_types:
+                hw_type_elem = etree.SubElement(node_elem, 'hardware_type')
+                if hardware_type.get('name'):
+                    hw_type_elem.set('name', hardware_type['name'])
+            if node.get('boot_state', '').lower() == 'boot':
+                available_elem = etree.SubElement(node_elem, 'available', now='True')
+            else:
+                available_elem = etree.SubElement(node_elem, 'available', now='False')
+
+            if node.get('services'):
+                # PGv2Service is the name this module imports
+                PGv2Service.add_services(node_elem, node.get('services'))
+
+            slivers = node.get('slivers', [])
+            pl_initscripts = node.get('pl_initscripts', {})
+            for sliver in slivers:
+                sliver_elem = etree.SubElement(node_elem, 'sliver_type')
+                if sliver.get('name'):
+                    sliver_elem.set('name', sliver['name'])
+                if sliver.get('client_id'):
+                    sliver_elem.set('client_id', sliver['client_id'])
+                for pl_initscript in pl_initscripts.values():
+                    etree.SubElement(sliver_elem, '{%s}initscript' % xml.namespaces['planetlab'], \
+                                     name=pl_initscript['name'])
+            location = node.get('location')
+            # only add location if set and both long and lat are not null
+            if location and location.get('longitude') and location.get('latitude'):
+                location_elem = etree.SubElement(node_elem, 'location', country=location['country'],
+                                                 latitude=location['latitude'], longitude=location['longitude'])
+        return node_elems
+
+    @staticmethod
+    def get_nodes(xml):
+        nodes = []
+        node_elems = xml.xpath(PGv2Node.elements['node'].path)
+        for node_elem in node_elems:
+            node = Node(node_elem.attrib, node_elem)
+            nodes.append(node)
+            if 'component_id' in node_elem.attrib:
+                node['authority_id'] = Xrn(node_elem.attrib['component_id']).get_authority_urn()
+
+            # set hardware type
+            # NOTE: lxml's Element.xpath() accepts 'namespaces' only as a
+            # keyword argument; passing it positionally raises TypeError
+            node['hardware_types'] = []
+            hardware_type_elems = node_elem.xpath(PGv2Node.elements['hardware_type'].path, namespaces=xml.namespaces)
+            for hardware_type_elem in hardware_type_elems:
+                node['hardware_types'].append(HardwareType(hardware_type_elem.attrib, hardware_type_elem))
+
+            # set location
+            location_elems = node_elem.xpath(PGv2Node.elements['location'].path, namespaces=xml.namespaces)
+            if len(location_elems) > 0:
+                node['location'] = Location(location_elems[0].attrib, location_elems[0])
+
+            # set services
+            services_elems = node_elem.xpath(PGv2Service.elements['services'].path, namespaces=xml.namespaces)
+            node['services'] = []
+            for services_elem in services_elems:
+                # the services element has no useful info of its own; collect
+                # its child elements (login/install/execute)
+                for child in services_elem.iterchildren():
+                    # TODO(review): wrap children in their typed element
+                    # classes once those are available here
+                    node['services'].append(child)
+
+            # set interfaces
+            interface_elems = node_elem.xpath(PGv2Node.elements['interface'].path, namespaces=xml.namespaces)
+            node['interfaces'] = []
+            for interface_elem in interface_elems:
+                node['interfaces'].append(Interface(interface_elem.attrib, interface_elem))
+
+            # set available
+            available = node_elem.xpath(PGv2Node.elements['available'].path, namespaces=xml.namespaces)
+            if len(available) > 0:
+                if available[0].attrib.get('now', '').lower() == 'true':
+                    node['boot_state'] = 'boot'
+                else:
+                    node['boot_state'] = 'disabled'
+
+            # set the slivers
+            sliver_elems = node_elem.xpath(PGv2Node.elements['sliver'].path, namespaces=xml.namespaces)
+            node['slivers'] = []
+            for sliver_elem in sliver_elems:
+                node['slivers'].append(Sliver(sliver_elem.attrib, sliver_elem))
+
+        return nodes
+
+
+ @staticmethod
+ def add_slivers(xml, slivers):
+ pass
+
+ @staticmethod
+ def get_nodes_with_slivers(xml):
+ nodes = PGv2Node.get_nodes(xml)
+ nodes_with_slivers = [node for node in nodes if node['slivers']]
+ return nodes_with_slivers
+
+if __name__ == '__main__':
+ from sfa.rspecs.rspec import RSpec
+ import pdb
+ r = RSpec('/tmp/emulab.rspec')
+ r2 = RSpec(version = 'ProtoGENI')
+ nodes = PGv2Node.get_nodes(r.xml)
+ PGv2Node.add_nodes(r2.xml.root, nodes)
+ #pdb.set_trace()
+
+
elements = {
'services': RSpecElement(RSpecElements.SERVICES, '//default:services | //services'),
'install': RSpecElement(RspecElements.INSTALL, './default:install | ./install'),
- 'execute': RSpecElement(RspecElements.INSTALL, './default:execute | ./execute'),
- 'login': RSpecElement(RspecElements.INSTALL, './default:login | ./login'),
+ 'execute': RSpecElement(RspecElements.EXECUTE, './default:execute | ./execute'),
+ 'login': RSpecElement(RspecElements.LOGIN, './default:login | ./login'),
}
@staticmethod
from sfa.rspecs.elements.pl_tag import PLTag
from sfa.rspecs.rspec_elements import RSpecElement, RSpecElements
from sfa.rspecs.elements.versions.sfav1Network import SFAv1Network
+from sfa.rspecs.elements.versions.pgv2Services import PGv2Services
class SFAv1Node:
for field in Location.fields:
if field in node['location'] and node['location'][field]:
location_elem.set(field, node['location'][field])
-
if 'interfaces' in node and node['interfaces']:
i = 0
for interface in node['interfaces']:
if 'bw_unallocated' in node and node['bw_unallocated']:
bw_unallocated = etree.SubElement(node_elem, 'bw_unallocated', units='kbps').text = str(int(node['bw_unallocated'])/1000)
+ if node.get('services'):
+ PGv2Services.add_services(node_elem, node.get('services'))
+
if 'tags' in node:
for tag in node['tags']:
# expose this hard wired list of tags, plus the ones that are marked 'sfa' in their category
if tag['name'] in ['fcdistro', 'arch']:
tag_element = etree.SubElement(node_elem, tag['name']).text=tag['value']
- if 'slivers' in node:
+ if node.get('slivers'):
for sliver in node['slivers']:
sliver_elem = etree.SubElement(node_elem, 'sliver')
- if 'name' in sliver and sliver['name']:
- sliver_elem.set('name', sliver['name'])
+ if sliver.get('sliver_id'):
+ sliver_id_leaf = Xrn(sliver.get('sliver_id')).get_leaf()
+ sliver_id_parts = sliver_id_leaf.split(':')
+ name = sliver_id_parts[0]
+ sliver_elem.set('name', name)
@staticmethod
def add_slivers(xml, slivers):
node = Node(node_elem.attrib, node_elem)
if 'site_id' in node_elem.attrib:
node['authority_id'] = node_elem.attrib['site_id']
-
+ if 'authority_id' in node_elem.attrib:
+ node['authority_id'] = node_elem.attrib['authority_id']
+
# set the location
location_elems = node_elem.xpath(SFAv1Node.elements['location'].path, xml.namespaces)
if len(location_elems) > 0:
+
+from lxml import etree
+
+from sfa.rspecs.elements.sliver import Sliver
+
+from sfa.util.xrn import Xrn
+from sfa.util.plxrn import PlXrn
+class SFAv1Sliver:
+
+    @staticmethod
+    def add_slivers(xml, slivers):
+        for sliver in slivers:
+            sliver_elem = etree.SubElement(xml, 'sliver')
+            if sliver.get('component_id'):
+                name_full = Xrn(sliver.get('component_id')).get_leaf()
+                # split(':') returns a list; take the leading component as the
+                # sliver name (Element.set requires a string value)
+                name = name_full.split(':')[0]
+                sliver_elem.set('name', name)
+
# recognized top level rspec elements
RSpecElements = Enum(
+ AVAILABLE='AVAILABLE',
BWLIMIT='BWLIMIT',
EXECUTE='EXECUTE',
NETWORK='NETWORK',
COMPONENT_MANAGER='COMPONENT_MANAGER',
+ HARDWARE_TYPE='HARDWARE_TYPE',
INSTALL='INSTALL',
INTERFACE='INTERFACE',
INTERFACE_REF='INTERFACE_REF',
from sfa.server.interface import Interfaces, Interface
from sfa.util.config import Config
+# this truly is a server-side object
class Aggregate(SfaServer):
##
def __init__(self, ip, port, key_file, cert_file):
SfaServer.__init__(self, ip, port, key_file, cert_file,'aggregate')
-##
+#
# Aggregates is a dictionary of aggregate connections keyed on the aggregate hrn
-
+# as such it's more of a client-side thing for aggregate servers to reach their peers
+#
class Aggregates(Interfaces):
default_dict = {'aggregates': {'aggregate': [Interfaces.default_fields]}}
from sfa.server.interface import Interfaces, Interface
from sfa.util.config import Config
-##
+#
# Registry is a SfaServer that serves registry and slice operations at PLC.
+# this truly is a server-side object
+#
class Registry(SfaServer):
##
# Create a new registry object.
def __init__(self, ip, port, key_file, cert_file):
SfaServer.__init__(self, ip, port, key_file, cert_file,'registry')
-##
-# Registries is a dictionary of registry connections keyed on the registry
-# hrn
-
+#
+# Registries is a dictionary of registry connections keyed on the registry hrn
+# as such it's more of a client-side thing for registry servers to reach their peers
+#
class Registries(Interfaces):
default_dict = {'registries': {'registry': [Interfaces.default_fields]}}
# it hard for us to write xpath queries for the default namespace because lxml
# wont understand a None prefix. We will just associate the default namespace
# with a key named 'default'.
- self.namespaces['default'] = self.namespaces[None]
+ self.namespaces['default'] = self.namespaces.pop(None)
+
else:
self.namespaces['default'] = 'default'
-
# set schema
for key in self.root.attrib.keys():
if key.endswith('schemaLocation'):