def slicemgr_manager_class (self) :
return sfa.managers.slice_manager.SliceManager
def aggregate_manager_class (self) :
- return sfa.managers.aggregate_manager
+ return sfa.managers.aggregate_manager.AggregateManager
# driver class for server-side services, talk to the whole testbed
def driver_class (self):
from sfa.plc.aggregate import Aggregate
from sfa.plc.slices import Slices
-def GetVersion(api):
+class AggregateManager:
- version_manager = VersionManager()
- ad_rspec_versions = []
- request_rspec_versions = []
- for rspec_version in version_manager.versions:
- if rspec_version.content_type in ['*', 'ad']:
- ad_rspec_versions.append(rspec_version.to_dict())
- if rspec_version.content_type in ['*', 'request']:
- request_rspec_versions.append(rspec_version.to_dict())
- default_rspec_version = version_manager.get_version("sfa 1").to_dict()
- xrn=Xrn(api.hrn)
- version_more = {'interface':'aggregate',
- 'testbed':'myplc',
- 'hrn':xrn.get_hrn(),
- 'request_rspec_versions': request_rspec_versions,
- 'ad_rspec_versions': ad_rspec_versions,
- 'default_ad_rspec': default_rspec_version
- }
- return version_core(version_more)
-
-def __get_registry_objects(slice_xrn, creds, users):
- """
-
- """
- hrn, _ = urn_to_hrn(slice_xrn)
-
- hrn_auth = get_authority(hrn)
-
- # Build up objects that an SFA registry would return if SFA
- # could contact the slice's registry directly
- reg_objects = None
-
- if users:
- # dont allow special characters in the site login base
- #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
- #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
+ def __init__ (self):
+ # xxx Thierry : caching at the aggregate level sounds wrong...
+ #self.caching=True
+ self.caching=False
+
+ def GetVersion(self, api):
+
+ version_manager = VersionManager()
+ ad_rspec_versions = []
+ request_rspec_versions = []
+ for rspec_version in version_manager.versions:
+ if rspec_version.content_type in ['*', 'ad']:
+ ad_rspec_versions.append(rspec_version.to_dict())
+ if rspec_version.content_type in ['*', 'request']:
+ request_rspec_versions.append(rspec_version.to_dict())
+ default_rspec_version = version_manager.get_version("sfa 1").to_dict()
+ xrn=Xrn(api.hrn)
+ version_more = {'interface':'aggregate',
+ 'testbed':'myplc',
+ 'hrn':xrn.get_hrn(),
+ 'request_rspec_versions': request_rspec_versions,
+ 'ad_rspec_versions': ad_rspec_versions,
+ 'default_ad_rspec': default_rspec_version
+ }
+ return version_core(version_more)
+
+ def _get_registry_objects(self, slice_xrn, creds, users):
+ """
+
+ """
+ hrn, _ = urn_to_hrn(slice_xrn)
+
+ hrn_auth = get_authority(hrn)
+
+ # Build up objects that an SFA registry would return if SFA
+ # could contact the slice's registry directly
+ reg_objects = None
+
+ if users:
+ # dont allow special characters in the site login base
+ #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
+ #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
+ slicename = hrn_to_pl_slicename(hrn)
+ login_base = slicename.split('_')[0]
+ reg_objects = {}
+ site = {}
+ site['site_id'] = 0
+ site['name'] = 'geni.%s' % login_base
+ site['enabled'] = True
+ site['max_slices'] = 100
+
+ # Note:
+ # Is it okay if this login base is the same as one already at this myplc site?
+ # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps?
+ site['login_base'] = login_base
+ site['abbreviated_name'] = login_base
+ site['max_slivers'] = 1000
+ reg_objects['site'] = site
+
+ slice = {}
+
+ # get_expiration always returns a normalized datetime - no need to utcparse
+ extime = Credential(string=creds[0]).get_expiration()
+ # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
+ if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
+ extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
+ slice['expires'] = int(time.mktime(extime.timetuple()))
+ slice['hrn'] = hrn
+ slice['name'] = hrn_to_pl_slicename(hrn)
+ slice['url'] = hrn
+ slice['description'] = hrn
+ slice['pointer'] = 0
+ reg_objects['slice_record'] = slice
+
+ reg_objects['users'] = {}
+ for user in users:
+ user['key_ids'] = []
+ hrn, _ = urn_to_hrn(user['urn'])
+ user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
+ user['first_name'] = hrn
+ user['last_name'] = hrn
+ reg_objects['users'][user['email']] = user
+
+ return reg_objects
+
+ def SliverStatus(self, api, slice_xrn, creds, call_id):
+ if Callids().already_handled(call_id): return {}
+
+ (hrn, _) = urn_to_hrn(slice_xrn)
+ # find out where this slice is currently running
slicename = hrn_to_pl_slicename(hrn)
- login_base = slicename.split('_')[0]
- reg_objects = {}
- site = {}
- site['site_id'] = 0
- site['name'] = 'geni.%s' % login_base
- site['enabled'] = True
- site['max_slices'] = 100
-
- # Note:
- # Is it okay if this login base is the same as one already at this myplc site?
- # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps?
- site['login_base'] = login_base
- site['abbreviated_name'] = login_base
- site['max_slivers'] = 1000
- reg_objects['site'] = site
-
- slice = {}
- # get_expiration always returns a normalized datetime - no need to utcparse
- extime = Credential(string=creds[0]).get_expiration()
- # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
- if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
- extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
- slice['expires'] = int(time.mktime(extime.timetuple()))
- slice['hrn'] = hrn
- slice['name'] = hrn_to_pl_slicename(hrn)
- slice['url'] = hrn
- slice['description'] = hrn
- slice['pointer'] = 0
- reg_objects['slice_record'] = slice
-
- reg_objects['users'] = {}
- for user in users:
- user['key_ids'] = []
- hrn, _ = urn_to_hrn(user['urn'])
- user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
- user['first_name'] = hrn
- user['last_name'] = hrn
- reg_objects['users'][user['email']] = user
-
- return reg_objects
-
-def __get_hostnames(nodes):
- hostnames = []
- for node in nodes:
- hostnames.append(node.hostname)
- return hostnames
-
-def SliverStatus(api, slice_xrn, creds, call_id):
- if Callids().already_handled(call_id): return {}
-
- (hrn, _) = urn_to_hrn(slice_xrn)
- # find out where this slice is currently running
- slicename = hrn_to_pl_slicename(hrn)
-
- slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
- if len(slices) == 0:
- raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
- slice = slices[0]
-
- # report about the local nodes only
- nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
- ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
- site_ids = [node['site_id'] for node in nodes]
-
- result = {}
- top_level_status = 'unknown'
- if nodes:
- top_level_status = 'ready'
- slice_urn = Xrn(slice_xrn, 'slice').get_urn()
- result['geni_urn'] = slice_urn
- result['pl_login'] = slice['name']
- result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
-
- resources = []
- for node in nodes:
- res = {}
- res['pl_hostname'] = node['hostname']
- res['pl_boot_state'] = node['boot_state']
- res['pl_last_contact'] = node['last_contact']
- if node['last_contact'] is not None:
- res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
- sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id'])
- res['geni_urn'] = sliver_id
- if node['boot_state'] == 'boot':
- res['geni_status'] = 'ready'
- else:
- res['geni_status'] = 'failed'
- top_level_status = 'failed'
+ slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
+ if len(slices) == 0:
+ raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
+ slice = slices[0]
+
+ # report about the local nodes only
+ nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
+ ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
+ site_ids = [node['site_id'] for node in nodes]
+
+ result = {}
+ top_level_status = 'unknown'
+ if nodes:
+ top_level_status = 'ready'
+ slice_urn = Xrn(slice_xrn, 'slice').get_urn()
+ result['geni_urn'] = slice_urn
+ result['pl_login'] = slice['name']
+ result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
+
+ resources = []
+ for node in nodes:
+ res = {}
+ res['pl_hostname'] = node['hostname']
+ res['pl_boot_state'] = node['boot_state']
+ res['pl_last_contact'] = node['last_contact']
+ if node['last_contact'] is not None:
+ res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
+ sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id'])
+ res['geni_urn'] = sliver_id
+ if node['boot_state'] == 'boot':
+ res['geni_status'] = 'ready'
+ else:
+ res['geni_status'] = 'failed'
+ top_level_status = 'failed'
+
+ res['geni_error'] = ''
+
+ resources.append(res)
- res['geni_error'] = ''
-
- resources.append(res)
+ result['geni_status'] = top_level_status
+ result['geni_resources'] = resources
+ return result
+
+ def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id):
+ """
+ Create the sliver[s] (slice) at this aggregate.
+ Verify HRN and initialize the slice record in PLC if necessary.
+ """
+ if Callids().already_handled(call_id): return ""
+
+ aggregate = Aggregate(api)
+ slices = Slices(api)
+ (hrn, _) = urn_to_hrn(slice_xrn)
+ peer = slices.get_peer(hrn)
+ sfa_peer = slices.get_sfa_peer(hrn)
+ slice_record=None
+ if users:
+ slice_record = users[0].get('slice_record', {})
+
+ # parse rspec
+ rspec = RSpec(rspec_string)
+ requested_attributes = rspec.version.get_slice_attributes()
- result['geni_status'] = top_level_status
- result['geni_resources'] = resources
- return result
-
-def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id):
- """
- Create the sliver[s] (slice) at this aggregate.
- Verify HRN and initialize the slice record in PLC if necessary.
- """
- if Callids().already_handled(call_id): return ""
-
- aggregate = Aggregate(api)
- slices = Slices(api)
- (hrn, _) = urn_to_hrn(slice_xrn)
- peer = slices.get_peer(hrn)
- sfa_peer = slices.get_sfa_peer(hrn)
- slice_record=None
- if users:
- slice_record = users[0].get('slice_record', {})
-
- # parse rspec
- rspec = RSpec(rspec_string)
- requested_attributes = rspec.version.get_slice_attributes()
-
- # ensure site record exists
- site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
- # ensure slice record exists
- slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
- # ensure person records exists
- persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
- # ensure slice attributes exists
- slices.verify_slice_attributes(slice, requested_attributes)
-
- # add/remove slice from nodes
- requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()]
- slices.verify_slice_nodes(slice, requested_slivers, peer)
-
- aggregate.prepare_nodes({'hostname': requested_slivers})
- aggregate.prepare_interfaces({'node_id': aggregate.nodes.keys()})
- slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
-
- # handle MyPLC peer association.
- # only used by plc and ple.
- slices.handle_peer(site, slice, persons, peer)
+ # ensure site record exists
+ site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+ # ensure slice record exists
+ slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+ # ensure person records exists
+ persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+ # ensure slice attributes exists
+ slices.verify_slice_attributes(slice, requested_attributes)
+
+ # add/remove slice from nodes
+ requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()]
+ slices.verify_slice_nodes(slice, requested_slivers, peer)
+
+ aggregate.prepare_nodes({'hostname': requested_slivers})
+ aggregate.prepare_interfaces({'node_id': aggregate.nodes.keys()})
+ slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
+
+ # handle MyPLC peer association.
+ # only used by plc and ple.
+ slices.handle_peer(site, slice, persons, peer)
+
+ return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
+
+
+ def RenewSliver(self, api, xrn, creds, expiration_time, call_id):
+ if Callids().already_handled(call_id): return True
+ (hrn, _) = urn_to_hrn(xrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
+ if not slices:
+ raise RecordNotFound(hrn)
+ slice = slices[0]
+ requested_time = utcparse(expiration_time)
+ record = {'expires': int(time.mktime(requested_time.timetuple()))}
+ try:
+ api.driver.UpdateSlice(slice['slice_id'], record)
+ return True
+ except:
+ return False
+
+ def start_slice(self, api, xrn, creds):
+ (hrn, _) = urn_to_hrn(xrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
+ if not slices:
+ raise RecordNotFound(hrn)
+ slice_id = slices[0]['slice_id']
+ slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
+ # just remove the tag if it exists
+ if slice_tags:
+ api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
- return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
-
-
-def RenewSliver(api, xrn, creds, expiration_time, call_id):
- if Callids().already_handled(call_id): return True
- (hrn, _) = urn_to_hrn(xrn)
- slicename = hrn_to_pl_slicename(hrn)
- slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
- if not slices:
- raise RecordNotFound(hrn)
- slice = slices[0]
- requested_time = utcparse(expiration_time)
- record = {'expires': int(time.mktime(requested_time.timetuple()))}
- try:
- api.driver.UpdateSlice(slice['slice_id'], record)
- return True
- except:
- return False
-
-def start_slice(api, xrn, creds):
- (hrn, _) = urn_to_hrn(xrn)
- slicename = hrn_to_pl_slicename(hrn)
- slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
- if not slices:
- raise RecordNotFound(hrn)
- slice_id = slices[0]['slice_id']
- slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
- # just remove the tag if it exists
- if slice_tags:
- api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
-
- return 1
-
-def stop_slice(api, xrn, creds):
- hrn, _ = urn_to_hrn(xrn)
- slicename = hrn_to_pl_slicename(hrn)
- slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
- if not slices:
- raise RecordNotFound(hrn)
- slice_id = slices[0]['slice_id']
- slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
- if not slice_tags:
- api.driver.AddSliceTag(slice_id, 'enabled', '0')
- elif slice_tags[0]['value'] != "0":
- tag_id = slice_tags[0]['slice_tag_id']
- api.driver.UpdateSliceTag(tag_id, '0')
- return 1
-
-def reset_slice(api, xrn):
- # XX not implemented at this interface
- return 1
-
-def DeleteSliver(api, xrn, creds, call_id):
- if Callids().already_handled(call_id): return ""
- (hrn, _) = urn_to_hrn(xrn)
- slicename = hrn_to_pl_slicename(hrn)
- slices = api.driver.GetSlices({'name': slicename})
- if not slices:
return 1
- slice = slices[0]
-
- # determine if this is a peer slice
- peer = peers.get_peer(api, hrn)
- try:
- if peer:
- api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
- api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
- finally:
- if peer:
- api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
- return 1
-
-# xxx Thierry : caching at the aggregate level sounds wrong...
-#caching=True
-caching=False
-def ListSlices(api, creds, call_id):
- if Callids().already_handled(call_id): return []
- # look in cache first
- if caching and api.cache:
- slices = api.cache.get('slices')
- if slices:
- return slices
-
- # get data from db
- slices = api.driver.GetSlices({'peer_id': None}, ['name'])
- slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
- slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
-
- # cache the result
- if caching and api.cache:
- api.cache.add('slices', slice_urns)
-
- return slice_urns
+
+ def stop_slice(self, api, xrn, creds):
+ hrn, _ = urn_to_hrn(xrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
+ if not slices:
+ raise RecordNotFound(hrn)
+ slice_id = slices[0]['slice_id']
+ slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
+ if not slice_tags:
+ api.driver.AddSliceTag(slice_id, 'enabled', '0')
+ elif slice_tags[0]['value'] != "0":
+ tag_id = slice_tags[0]['slice_tag_id']
+ api.driver.UpdateSliceTag(tag_id, '0')
+ return 1
-def ListResources(api, creds, options, call_id):
- if Callids().already_handled(call_id): return ""
- # get slice's hrn from options
- xrn = options.get('geni_slice_urn', None)
- (hrn, _) = urn_to_hrn(xrn)
-
- version_manager = VersionManager()
- # get the rspec's return format from options
- rspec_version = version_manager.get_version(options.get('rspec_version'))
- version_string = "rspec_%s" % (rspec_version.to_string())
-
- #panos adding the info option to the caching key (can be improved)
- if options.get('info'):
- version_string = version_string + "_"+options.get('info', 'default')
-
- # look in cache first
- if caching and api.cache and not xrn:
- rspec = api.cache.get(version_string)
- if rspec:
- api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
- return rspec
-
- #panos: passing user-defined options
- #print "manager options = ",options
- aggregate = Aggregate(api, options)
- rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
-
- # cache the result
- if caching and api.cache and not xrn:
- api.cache.add(version_string, rspec)
-
- return rspec
-
-
-def get_ticket(api, xrn, creds, rspec, users):
-
- (slice_hrn, _) = urn_to_hrn(xrn)
- slices = Slices(api)
- peer = slices.get_peer(slice_hrn)
- sfa_peer = slices.get_sfa_peer(slice_hrn)
-
- # get the slice record
- credential = api.getCredential()
- interface = api.registries[api.hrn]
- registry = api.server_proxy(interface, credential)
- records = registry.Resolve(xrn, credential)
-
- # make sure we get a local slice record
- record = None
- for tmp_record in records:
- if tmp_record['type'] == 'slice' and \
- not tmp_record['peer_authority']:
-#Error (E0602, get_ticket): Undefined variable 'SliceRecord'
- record = SliceRecord(dict=tmp_record)
- if not record:
- raise RecordNotFound(slice_hrn)
-
- # similar to CreateSliver, we must verify that the required records exist
- # at this aggregate before we can issue a ticket
- # parse rspec
- rspec = RSpec(rspec_string)
- requested_attributes = rspec.version.get_slice_attributes()
-
- # ensure site record exists
- site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
- # ensure slice record exists
- slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
- # ensure person records exists
- persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
- # ensure slice attributes exists
- slices.verify_slice_attributes(slice, requested_attributes)
-
- # get sliver info
- slivers = slices.get_slivers(slice_hrn)
-
- if not slivers:
- raise SliverDoesNotExist(slice_hrn)
-
- # get initscripts
- initscripts = []
- data = {
- 'timestamp': int(time.time()),
- 'initscripts': initscripts,
- 'slivers': slivers
- }
-
- # create the ticket
- object_gid = record.get_gid_object()
- new_ticket = SfaTicket(subject = object_gid.get_subject())
- new_ticket.set_gid_caller(api.auth.client_gid)
- new_ticket.set_gid_object(object_gid)
- new_ticket.set_issuer(key=api.key, subject=api.hrn)
- new_ticket.set_pubkey(object_gid.get_pubkey())
- new_ticket.set_attributes(data)
- new_ticket.set_rspec(rspec)
- #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
- new_ticket.encode()
- new_ticket.sign()
-
- return new_ticket.save_to_string(save_parents=True)
-
-
-
-#def main():
-# """
-# rspec = ListResources(api, "plc.princeton.sapan", None, 'pl_test_sapan')
-# #rspec = ListResources(api, "plc.princeton.coblitz", None, 'pl_test_coblitz')
-# #rspec = ListResources(api, "plc.pl.sirius", None, 'pl_test_sirius')
-# print rspec
-# """
-# api = PlcSfaApi()
-# f = open(sys.argv[1])
-# xml = f.read()
-# f.close()
-##Error (E1120, main): No value passed for parameter 'users' in function call
-##Error (E1120, main): No value passed for parameter 'call_id' in function call
-# CreateSliver(api, "plc.princeton.sapan", xml, 'CreateSliver_sapan')
-#
-#if __name__ == "__main__":
-# main()
+ def reset_slice(self, api, xrn):
+ # XX not implemented at this interface
+ return 1
+
+ def DeleteSliver(self, api, xrn, creds, call_id):
+ if Callids().already_handled(call_id): return ""
+ (hrn, _) = urn_to_hrn(xrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ slices = api.driver.GetSlices({'name': slicename})
+ if not slices:
+ return 1
+ slice = slices[0]
+
+ # determine if this is a peer slice
+ peer = peers.get_peer(api, hrn)
+ try:
+ if peer:
+ api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
+ api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
+ finally:
+ if peer:
+ api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
+ return 1
+
+ def ListSlices(self, api, creds, call_id):
+ if Callids().already_handled(call_id): return []
+ # look in cache first
+ if self.caching and api.cache:
+ slices = api.cache.get('slices')
+ if slices:
+ return slices
+
+ # get data from db
+ slices = api.driver.GetSlices({'peer_id': None}, ['name'])
+ slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
+ slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
+
+ # cache the result
+ if self.caching and api.cache:
+ api.cache.add('slices', slice_urns)
+
+ return slice_urns
+
+ def ListResources(self, api, creds, options, call_id):
+ if Callids().already_handled(call_id): return ""
+ # get slice's hrn from options
+ xrn = options.get('geni_slice_urn', None)
+ (hrn, _) = urn_to_hrn(xrn)
+
+ version_manager = VersionManager()
+ # get the rspec's return format from options
+ rspec_version = version_manager.get_version(options.get('rspec_version'))
+ version_string = "rspec_%s" % (rspec_version.to_string())
+
+ #panos adding the info option to the caching key (can be improved)
+ if options.get('info'):
+ version_string = version_string + "_"+options.get('info', 'default')
+
+ # look in cache first
+ if self.caching and api.cache and not xrn:
+ rspec = api.cache.get(version_string)
+ if rspec:
+ api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
+ return rspec
+
+ #panos: passing user-defined options
+ #print "manager options = ",options
+ aggregate = Aggregate(api, options)
+ rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
+
+ # cache the result
+ if self.caching and api.cache and not xrn:
+ api.cache.add(version_string, rspec)
+
+ return rspec
+
+
+ def get_ticket(self, api, xrn, creds, rspec, users):
+
+ (slice_hrn, _) = urn_to_hrn(xrn)
+ slices = Slices(api)
+ peer = slices.get_peer(slice_hrn)
+ sfa_peer = slices.get_sfa_peer(slice_hrn)
+
+ # get the slice record
+ credential = api.getCredential()
+ interface = api.registries[api.hrn]
+ registry = api.server_proxy(interface, credential)
+ records = registry.Resolve(xrn, credential)
+
+ # make sure we get a local slice record
+ record = None
+ for tmp_record in records:
+ if tmp_record['type'] == 'slice' and \
+ not tmp_record['peer_authority']:
+ #Error (E0602, get_ticket): Undefined variable 'SliceRecord'
+ record = SliceRecord(dict=tmp_record)
+ if not record:
+ raise RecordNotFound(slice_hrn)
+
+ # similar to CreateSliver, we must verify that the required records exist
+ # at this aggregate before we can issue a ticket
+ # parse rspec
+ rspec = RSpec(rspec_string)
+ requested_attributes = rspec.version.get_slice_attributes()
+
+ # ensure site record exists
+ site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+ # ensure slice record exists
+ slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+ # ensure person records exists
+ persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+ # ensure slice attributes exists
+ slices.verify_slice_attributes(slice, requested_attributes)
+
+ # get sliver info
+ slivers = slices.get_slivers(slice_hrn)
+
+ if not slivers:
+ raise SliverDoesNotExist(slice_hrn)
+
+ # get initscripts
+ initscripts = []
+ data = {
+ 'timestamp': int(time.time()),
+ 'initscripts': initscripts,
+ 'slivers': slivers
+ }
+
+ # create the ticket
+ object_gid = record.get_gid_object()
+ new_ticket = SfaTicket(subject = object_gid.get_subject())
+ new_ticket.set_gid_caller(api.auth.client_gid)
+ new_ticket.set_gid_object(object_gid)
+ new_ticket.set_issuer(key=api.key, subject=api.hrn)
+ new_ticket.set_pubkey(object_gid.get_pubkey())
+ new_ticket.set_attributes(data)
+ new_ticket.set_rspec(rspec)
+ #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
+ new_ticket.encode()
+ new_ticket.sign()
+
+ return new_ticket.save_to_string(save_parents=True)
from lxml import etree as ET
from sqlobject import *
-from sfa.util.faults import *
+from sfa.util.faults import InvalidRSpec, RecordNotFound, SliverDoesNotExist
from sfa.util.xrn import urn_to_hrn, Xrn
from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn
from sfa.util.callids import Callids
-from sfa.util.sfalogging import logger
+#comes with its own logging
+#from sfa.util.sfalogging import logger
from sfa.util.version import version_core
from sfa.trust.credential import Credential
from sfa.server.sfaapi import SfaApi
from sfa.plc.aggregate import Aggregate
-from sfa.plc.slices import *
-from sfa.rspecs.sfa_rspec import sfa_rspec_version
-
-
-##
-# The data structure used to represent a cloud.
-# It contains the cloud name, its ip address, image information,
-# key pairs, and clusters information.
-#
-cloud = {}
-
-##
-# The location of the RelaxNG schema.
-#
-EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
+from sfa.plc.slices import Slice, Slices
+# not sure what this used to be nor where it is now defined
+#from sfa.rspecs.sfa_rspec import sfa_rspec_version
##
# Meta data of an instance.
(self.image_id, self.kernel_id, self.ramdisk_id,
self.inst_type, self.key_pair))
- # XXX The return statement is for testing. REMOVE in production
- #return
-
try:
reservation = botoConn.run_instances(self.image_id,
kernel_id = self.kernel_id,
#slice_index = DatabaseIndex('slice_hrn')
instances = MultipleJoin('EucaInstance')
-##
-# Initialize the aggregate manager by reading a configuration file.
-#
-def init_server():
- logger = logging.getLogger('EucaAggregate')
- fileHandler = logging.FileHandler('/var/log/euca.log')
- fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
- logger.addHandler(fileHandler)
- fileHandler.setLevel(logging.DEBUG)
- logger.setLevel(logging.DEBUG)
-
- configParser = ConfigParser()
- configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
- if len(configParser.sections()) < 1:
- logger.error('No cloud defined in the config file')
- raise Exception('Cannot find cloud definition in configuration file.')
-
- # Only read the first section.
- cloudSec = configParser.sections()[0]
- cloud['name'] = cloudSec
- cloud['access_key'] = configParser.get(cloudSec, 'access_key')
- cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
- cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
- cloudURL = cloud['cloud_url']
- if cloudURL.find('https://') >= 0:
- cloudURL = cloudURL.replace('https://', '')
- elif cloudURL.find('http://') >= 0:
- cloudURL = cloudURL.replace('http://', '')
- (cloud['ip'], parts) = cloudURL.split(':')
-
- # Create image bundles
- images = getEucaConnection().get_all_images()
- cloud['images'] = images
- cloud['imageBundles'] = {}
- for i in images:
- if i.type != 'machine' or i.kernel_id is None: continue
- name = os.path.dirname(i.location)
- detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
- cloud['imageBundles'][name] = detail
-
- # Initialize sqlite3 database and tables.
- dbPath = '/etc/sfa/db'
- dbName = 'euca_aggregate.db'
-
- if not os.path.isdir(dbPath):
- logger.info('%s not found. Creating directory ...' % dbPath)
- os.mkdir(dbPath)
-
- conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
- sqlhub.processConnection = conn
- Slice.createTable(ifNotExists=True)
- EucaInstance.createTable(ifNotExists=True)
- Meta.createTable(ifNotExists=True)
-
- # Start the update process to keep track of the meta data
- # about Eucalyptus instance.
- Process(target=updateMeta).start()
-
- # Make sure the schema exists.
- if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA):
- err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA
- logger.error(err)
- raise Exception(err)
-
-##
-# Creates a connection to Eucalytpus. This function is inspired by
-# the make_connection() in Euca2ools.
-#
-# @return A connection object or None
-#
-def getEucaConnection():
- global cloud
- accessKey = cloud['access_key']
- secretKey = cloud['secret_key']
- eucaURL = cloud['cloud_url']
- useSSL = False
- srvPath = '/'
- eucaPort = 8773
- logger = logging.getLogger('EucaAggregate')
-
- if not accessKey or not secretKey or not eucaURL:
- logger.error('Please set ALL of the required environment ' \
- 'variables by sourcing the eucarc file.')
- return None
-
- # Split the url into parts
- if eucaURL.find('https://') >= 0:
- useSSL = True
- eucaURL = eucaURL.replace('https://', '')
- elif eucaURL.find('http://') >= 0:
- useSSL = False
- eucaURL = eucaURL.replace('http://', '')
- (eucaHost, parts) = eucaURL.split(':')
- if len(parts) > 1:
- parts = parts.split('/')
- eucaPort = int(parts[0])
- parts = parts[1:]
- srvPath = '/'.join(parts)
-
- return boto.connect_ec2(aws_access_key_id=accessKey,
- aws_secret_access_key=secretKey,
- is_secure=useSSL,
- region=RegionInfo(None, 'eucalyptus', eucaHost),
- port=eucaPort,
- path=srvPath)
-
-##
-# Returns a string of keys that belong to the users of the given slice.
-# @param sliceHRN The hunman readable name of the slice.
-# @return sting()
-#
-# This method is no longer needed because the user keys are passed into
-# CreateSliver
-#
-def getKeysForSlice(api, sliceHRN):
- logger = logging.getLogger('EucaAggregate')
- cred = api.getCredential()
- registry = api.registries[api.hrn]
- keys = []
-
- # Get the slice record
- records = registry.Resolve(sliceHRN, cred)
- if not records:
- logging.warn('Cannot find any record for slice %s' % sliceHRN)
- return []
-
- # Find who can log into this slice
- persons = records[0]['persons']
-
- # Extract the keys from persons records
- for p in persons:
- sliceUser = registry.Resolve(p, cred)
- userKeys = sliceUser[0]['keys']
- keys += userKeys
-
- return '\n'.join(keys)
-
##
# A class that builds the RSpec for Eucalyptus.
#
return clusterList
-def ListResources(api, creds, options, call_id):
- if Callids().already_handled(call_id): return ""
- global cloud
- # get slice's hrn from options
- xrn = options.get('geni_slice_urn', '')
- hrn, type = urn_to_hrn(xrn)
- logger = logging.getLogger('EucaAggregate')
-
- # get hrn of the original caller
- origin_hrn = options.get('origin_hrn', None)
- if not origin_hrn:
- origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
-
- conn = getEucaConnection()
-
- if not conn:
- logger.error('Cannot create a connection to Eucalyptus')
- return 'Cannot create a connection to Eucalyptus'
-
- try:
- # Zones
- zones = conn.get_all_zones(['verbose'])
- p = ZoneResultParser(zones)
- clusters = p.parse()
- cloud['clusters'] = clusters
-
- # Images
- images = conn.get_all_images()
- cloud['images'] = images
- cloud['imageBundles'] = {}
+class AggregateManagerEucalyptus:
+
+    # The data structure used to represent a cloud.
+    # It contains the cloud name, its ip address, image information,
+    # key pairs, and clusters information.
+    cloud = {}
+
+    # The location of the RelaxNG schema.
+    EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng'
+
+    # one-shot guard so the (heavy) init_server() runs once per process
+    _inited=False
+
+    # the init_server mechanism has vanished
+    def __init__ (self):
+        if AggregateManagerEucalyptus._inited: return
+        AggregateManagerEucalyptus.init_server()
+        # record that the one-time initialization has been done; without
+        # this the guard above never fires and init_server() would re-run
+        # (re-adding log handlers, re-spawning the update process) on
+        # every instantiation
+        AggregateManagerEucalyptus._inited = True
+
+    # Initialize the aggregate manager by reading a configuration file.
+    @staticmethod
+    def init_server():
+        logger = logging.getLogger('EucaAggregate')
+        fileHandler = logging.FileHandler('/var/log/euca.log')
+        fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+        logger.addHandler(fileHandler)
+        fileHandler.setLevel(logging.DEBUG)
+        logger.setLevel(logging.DEBUG)
+
+        configParser = ConfigParser()
+        configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf'])
+        if len(configParser.sections()) < 1:
+            logger.error('No cloud defined in the config file')
+            raise Exception('Cannot find cloud definition in configuration file.')
+
+        # Only read the first section.
+        cloudSec = configParser.sections()[0]
+        AggregateManagerEucalyptus.cloud['name'] = cloudSec
+        AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key')
+        AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key')
+        AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url')
+        cloudURL = AggregateManagerEucalyptus.cloud['cloud_url']
+        if cloudURL.find('https://') >= 0:
+            cloudURL = cloudURL.replace('https://', '')
+        elif cloudURL.find('http://') >= 0:
+            cloudURL = cloudURL.replace('http://', '')
+        (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':')
+
+        # Create image bundles
+        # init_server() is a @staticmethod - there is no 'self' in scope,
+        # so go through the class (was: self.getEucaConnection())
+        images = AggregateManagerEucalyptus.getEucaConnection().get_all_images()
+        AggregateManagerEucalyptus.cloud['images'] = images
+        AggregateManagerEucalyptus.cloud['imageBundles'] = {}
for i in images:
if i.type != 'machine' or i.kernel_id is None: continue
name = os.path.dirname(i.location)
detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
- cloud['imageBundles'][name] = detail
-
- # Key Pairs
- keyPairs = conn.get_all_key_pairs()
- cloud['keypairs'] = keyPairs
-
- if hrn:
- instanceId = []
- instances = []
-
- # Get the instances that belong to the given slice from sqlite3
- # XXX use getOne() in production because the slice's hrn is supposed
- # to be unique. For testing, uniqueness is turned off in the db.
- # If the slice isn't found in the database, create a record for the
- # slice.
- matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
- if matchedSlices:
- theSlice = matchedSlices[-1]
- else:
- theSlice = Slice(slice_hrn = hrn)
- for instance in theSlice.instances:
- instanceId.append(instance.instance_id)
-
- # Get the information about those instances using their ids.
- if len(instanceId) > 0:
- reservations = conn.get_all_instances(instanceId)
- else:
- reservations = []
+ AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
+
+ # Initialize sqlite3 database and tables.
+ dbPath = '/etc/sfa/db'
+ dbName = 'euca_aggregate.db'
+
+ if not os.path.isdir(dbPath):
+ logger.info('%s not found. Creating directory ...' % dbPath)
+ os.mkdir(dbPath)
+
+ conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName))
+ sqlhub.processConnection = conn
+ Slice.createTable(ifNotExists=True)
+ EucaInstance.createTable(ifNotExists=True)
+ Meta.createTable(ifNotExists=True)
+
+ # Start the update process to keep track of the meta data
+ # about Eucalyptus instance.
+ Process(target=AggregateManagerEucalyptus.updateMeta).start()
+
+ # Make sure the schema exists.
+ if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA):
+ err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA
+ logger.error(err)
+ raise Exception(err)
+
+    #
+    # A separate process that will update the meta data.
+    #
+    @staticmethod
+    def updateMeta():
+        logger = logging.getLogger('EucaMeta')
+        fileHandler = logging.FileHandler('/var/log/euca_meta.log')
+        fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+        logger.addHandler(fileHandler)
+        fileHandler.setLevel(logging.DEBUG)
+        logger.setLevel(logging.DEBUG)
+
+        while True:
+            sleep(30)
+
+            # Get IDs of the instances that don't have IPs yet.
+            dbResults = Meta.select(
+                AND(Meta.q.pri_addr == None,
+                    Meta.q.state != 'deleted')
+            )
+            dbResults = list(dbResults)
+            logger.debug('[update process] dbResults: %s' % dbResults)
+            instids = []
+            for r in dbResults:
+                if not r.instance:
+                    continue
+                instids.append(r.instance.instance_id)
+            logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
+
+            # Get instance information from Eucalyptus
+            # updateMeta() is a @staticmethod - there is no 'self' in scope,
+            # so go through the class (was: self.getEucaConnection())
+            conn = AggregateManagerEucalyptus.getEucaConnection()
+            vmInstances = []
+            reservations = conn.get_all_instances(instids)
for reservation in reservations:
- for instance in reservation.instances:
- instances.append(instance)
-
- # Construct a dictionary for the EucaRSpecBuilder
- instancesDict = {}
- for instance in instances:
- instList = instancesDict.setdefault(instance.instance_type, [])
- instInfoDict = {}
-
- instInfoDict['id'] = instance.id
- instInfoDict['public_dns'] = instance.public_dns_name
- instInfoDict['state'] = instance.state
- instInfoDict['key'] = instance.key_name
-
- instList.append(instInfoDict)
- cloud['instances'] = instancesDict
-
- except EC2ResponseError, ec2RespErr:
- errTree = ET.fromstring(ec2RespErr.body)
- errMsgE = errTree.find('.//Message')
- logger.error(errMsgE.text)
-
- rspec = EucaRSpecBuilder(cloud).toXML()
-
- # Remove the instances records so next time they won't
- # show up.
- if 'instances' in cloud:
- del cloud['instances']
-
- return rspec
-
-"""
-Hook called via 'sfi.py create'
-"""
-def CreateSliver(api, slice_xrn, creds, xml, users, call_id):
- if Callids().already_handled(call_id): return ""
-
- global cloud
- logger = logging.getLogger('EucaAggregate')
- logger.debug("In CreateSliver")
-
- aggregate = Aggregate(api)
- slices = Slices(api)
- (hrn, type) = urn_to_hrn(slice_xrn)
- peer = slices.get_peer(hrn)
- sfa_peer = slices.get_sfa_peer(hrn)
- slice_record=None
- if users:
- slice_record = users[0].get('slice_record', {})
-
- conn = getEucaConnection()
- if not conn:
- logger.error('Cannot create a connection to Eucalyptus')
- return ""
-
- # Validate RSpec
- schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA)
- rspecValidator = ET.RelaxNG(schemaXML)
- rspecXML = ET.XML(xml)
- for network in rspecXML.iterfind("./network"):
- if network.get('name') != cloud['name']:
- # Throw away everything except my own RSpec
- # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
- network.getparent().remove(network)
- if not rspecValidator(rspecXML):
- error = rspecValidator.error_log.last_error
- message = '%s (line %s)' % (error.message, error.line)
- raise InvalidRSpec(message)
-
+                vmInstances += reservation.instances
+
+            # Check the IPs
+            instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
+                        for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
+            logger.debug('[update process] IP dict: %s' % str(instIPs))
+
+            # Update the local DB
+            for ipData in instIPs:
+                dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
+                if not dbInst:
+                    logger.info('[update process] Could not find %s in DB' % ipData['id'])
+                    continue
+                dbInst.meta.pri_addr = ipData['pri_addr']
+                dbInst.meta.pub_addr = ipData['pub_addr']
+                dbInst.meta.state = 'running'
+
+            # updateMeta() is a @staticmethod - there is no 'self' in scope,
+            # and the original spelling 'dumpinstanceInfo' does not match the
+            # method name 'dumpInstanceInfo' (was: self.dumpinstanceInfo())
+            AggregateManagerEucalyptus.dumpInstanceInfo()
+
+    ##
+    # Creates a connection to Eucalytpus. This function is inspired by
+    # the make_connection() in Euca2ools.
+    #
+    # @return A connection object or None
+    #
+    # Declared @staticmethod: it takes no 'self' parameter and is invoked
+    # both from the static init_server()/updateMeta() and through instances;
+    # without the decorator either call style fails under Python 2.
+    @staticmethod
+    def getEucaConnection():
+        accessKey = AggregateManagerEucalyptus.cloud['access_key']
+        secretKey = AggregateManagerEucalyptus.cloud['secret_key']
+        eucaURL = AggregateManagerEucalyptus.cloud['cloud_url']
+        useSSL = False
+        srvPath = '/'
+        eucaPort = 8773
+        logger = logging.getLogger('EucaAggregate')
+
+        if not accessKey or not secretKey or not eucaURL:
+            logger.error('Please set ALL of the required environment ' \
+                         'variables by sourcing the eucarc file.')
+            return None
+
+        # Split the url into parts
+        # NOTE(review): assumes the URL contains a ':' after the scheme is
+        # stripped (e.g. host:8773/path) - a bare hostname would raise
+        # ValueError here; confirm against deployed configs.
+        if eucaURL.find('https://') >= 0:
+            useSSL = True
+            eucaURL = eucaURL.replace('https://', '')
+        elif eucaURL.find('http://') >= 0:
+            useSSL = False
+            eucaURL = eucaURL.replace('http://', '')
+        (eucaHost, parts) = eucaURL.split(':')
+        if len(parts) > 1:
+            parts = parts.split('/')
+            eucaPort = int(parts[0])
+            parts = parts[1:]
+            srvPath = '/'.join(parts)
+
+        return boto.connect_ec2(aws_access_key_id=accessKey,
+                                aws_secret_access_key=secretKey,
+                                is_secure=useSSL,
+                                region=RegionInfo(None, 'eucalyptus', eucaHost),
+                                port=eucaPort,
+                                path=srvPath)
+
+    # Instance method: 'self' added so the framework's bound-method call
+    # works (otherwise 'api' would receive the instance and the
+    # self.getEucaConnection() call below would raise NameError).
+    # Matches the 'def GetVersion(self, api)' convention of the other
+    # manager classes in this changeset.
+    def ListResources(self, api, creds, options, call_id):
+        if Callids().already_handled(call_id): return ""
+        # get slice's hrn from options
+        xrn = options.get('geni_slice_urn', '')
+        hrn, type = urn_to_hrn(xrn)
+        logger = logging.getLogger('EucaAggregate')
+
+        # get hrn of the original caller
+        origin_hrn = options.get('origin_hrn', None)
+        if not origin_hrn:
+            origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
+
+        conn = self.getEucaConnection()
+
+        if not conn:
+            logger.error('Cannot create a connection to Eucalyptus')
+            return 'Cannot create a connection to Eucalyptus'
+
+        try:
+            # Zones
+            zones = conn.get_all_zones(['verbose'])
+            p = ZoneResultParser(zones)
+            clusters = p.parse()
+            AggregateManagerEucalyptus.cloud['clusters'] = clusters
+
+            # Images
+            images = conn.get_all_images()
+            AggregateManagerEucalyptus.cloud['images'] = images
+            AggregateManagerEucalyptus.cloud['imageBundles'] = {}
+            for i in images:
+                if i.type != 'machine' or i.kernel_id is None: continue
+                name = os.path.dirname(i.location)
+                detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id}
+                AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail
+
+            # Key Pairs
+            keyPairs = conn.get_all_key_pairs()
+            AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs
+
+            if hrn:
+                instanceId = []
+                instances = []
+
+                # Get the instances that belong to the given slice from sqlite3
+                # XXX use getOne() in production because the slice's hrn is supposed
+                # to be unique. For testing, uniqueness is turned off in the db.
+                # If the slice isn't found in the database, create a record for the
+                # slice.
+                matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn))
+                if matchedSlices:
+                    theSlice = matchedSlices[-1]
+                else:
+                    theSlice = Slice(slice_hrn = hrn)
+                for instance in theSlice.instances:
+                    instanceId.append(instance.instance_id)
+
+                # Get the information about those instances using their ids.
+                if len(instanceId) > 0:
+                    reservations = conn.get_all_instances(instanceId)
+                else:
+                    reservations = []
+                for reservation in reservations:
+                    for instance in reservation.instances:
+                        instances.append(instance)
+
+                # Construct a dictionary for the EucaRSpecBuilder
+                instancesDict = {}
+                for instance in instances:
+                    instList = instancesDict.setdefault(instance.instance_type, [])
+                    instInfoDict = {}
+
+                    instInfoDict['id'] = instance.id
+                    instInfoDict['public_dns'] = instance.public_dns_name
+                    instInfoDict['state'] = instance.state
+                    instInfoDict['key'] = instance.key_name
+
+                    instList.append(instInfoDict)
+                AggregateManagerEucalyptus.cloud['instances'] = instancesDict
+
+        except EC2ResponseError, ec2RespErr:
+            errTree = ET.fromstring(ec2RespErr.body)
+            errMsgE = errTree.find('.//Message')
+            logger.error(errMsgE.text)
+
+        rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML()
+
+        # Remove the instances records so next time they won't
+        # show up.
+        if 'instances' in AggregateManagerEucalyptus.cloud:
+            del AggregateManagerEucalyptus.cloud['instances']
+
+        return rspec
+
"""
- Create the sliver[s] (slice) at this aggregate.
- Verify HRN and initialize the slice record in PLC if necessary.
+ Hook called via 'sfi.py create'
"""
-
- # ensure site record exists
- site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
- # ensure slice record exists
- slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
- # ensure person records exists
- persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
-
- # Get the slice from db or create one.
- s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
- if s is None:
- s = Slice(slice_hrn = hrn)
-
- # Process any changes in existing instance allocation
- pendingRmInst = []
- for sliceInst in s.instances:
- pendingRmInst.append(sliceInst.instance_id)
- existingInstGroup = rspecXML.findall(".//euca_instances")
- for instGroup in existingInstGroup:
- for existingInst in instGroup:
- if existingInst.get('id') in pendingRmInst:
- pendingRmInst.remove(existingInst.get('id'))
- for inst in pendingRmInst:
- dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
- if dbInst.meta.state != 'deleted':
- logger.debug('Instance %s will be terminated' % inst)
- # Terminate instances one at a time for robustness
- conn.terminate_instances([inst])
- # Only change the state but do not remove the entry from the DB.
- dbInst.meta.state = 'deleted'
- #dbInst.destroySelf()
-
- # Process new instance requests
- requests = rspecXML.findall(".//request")
- if requests:
- # Get all the public keys associate with slice.
- keys = []
- for user in users:
- keys += user['keys']
- logger.debug("Keys: %s" % user['keys'])
- pubKeys = '\n'.join(keys)
- logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
- for req in requests:
- vmTypeElement = req.getparent()
- instType = vmTypeElement.get('name')
- numInst = int(req.find('instances').text)
-
- bundleName = req.find('bundle').text
- if not cloud['imageBundles'][bundleName]:
- logger.error('Cannot find bundle %s' % bundleName)
- bundleInfo = cloud['imageBundles'][bundleName]
- instKernel = bundleInfo['kernelID']
- instDiskImg = bundleInfo['imageID']
- instRamDisk = bundleInfo['ramdiskID']
- instKey = None
-
- # Create the instances
- for i in range(0, numInst):
- eucaInst = EucaInstance(slice = s,
- kernel_id = instKernel,
- image_id = instDiskImg,
- ramdisk_id = instRamDisk,
- key_pair = instKey,
- inst_type = instType,
- meta = Meta(start_time=datetime.datetime.now()))
- eucaInst.reserveInstance(conn, pubKeys)
-
- # xxx - should return altered rspec
- # with enough data for the client to understand what's happened
- return xml
-
-##
-# Return information on the IP addresses bound to each slice's instances
-#
-def dumpInstanceInfo():
- logger = logging.getLogger('EucaMeta')
- outdir = "/var/www/html/euca/"
- outfile = outdir + "instances.txt"
-
- try:
- os.makedirs(outdir)
- except OSError, e:
- if e.errno != errno.EEXIST:
- raise
-
- dbResults = Meta.select(
- AND(Meta.q.pri_addr != None,
- Meta.q.state == 'running')
- )
- dbResults = list(dbResults)
- f = open(outfile, "w")
- for r in dbResults:
- instId = r.instance.instance_id
- ipaddr = r.pri_addr
- hrn = r.instance.slice.slice_hrn
- logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
- f.write("%s %s %s\n" % (instId, ipaddr, hrn))
- f.close()
-
-##
-# A separate process that will update the meta data.
-#
-def updateMeta():
- logger = logging.getLogger('EucaMeta')
- fileHandler = logging.FileHandler('/var/log/euca_meta.log')
- fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
- logger.addHandler(fileHandler)
- fileHandler.setLevel(logging.DEBUG)
- logger.setLevel(logging.DEBUG)
-
- while True:
- sleep(30)
-
- # Get IDs of the instances that don't have IPs yet.
+    # Instance method: 'self' added so the framework's bound-method call
+    # works (otherwise 'slice_xrn' would shift into 'api' and the
+    # self.getEucaConnection() call below would raise NameError).
+    def CreateSliver(self, api, slice_xrn, creds, xml, users, call_id):
+        if Callids().already_handled(call_id): return ""
+
+        logger = logging.getLogger('EucaAggregate')
+        logger.debug("In CreateSliver")
+
+        aggregate = Aggregate(api)
+        slices = Slices(api)
+        (hrn, type) = urn_to_hrn(slice_xrn)
+        peer = slices.get_peer(hrn)
+        sfa_peer = slices.get_sfa_peer(hrn)
+        slice_record=None
+        if users:
+            slice_record = users[0].get('slice_record', {})
+
+        conn = self.getEucaConnection()
+        if not conn:
+            logger.error('Cannot create a connection to Eucalyptus')
+            return ""
+
+        # Validate RSpec
+        schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA)
+        rspecValidator = ET.RelaxNG(schemaXML)
+        rspecXML = ET.XML(xml)
+        for network in rspecXML.iterfind("./network"):
+            if network.get('name') != AggregateManagerEucalyptus.cloud['name']:
+                # Throw away everything except my own RSpec
+                # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id'))
+                network.getparent().remove(network)
+        if not rspecValidator(rspecXML):
+            error = rspecValidator.error_log.last_error
+            message = '%s (line %s)' % (error.message, error.line)
+            raise InvalidRSpec(message)
+
+        """
+        Create the sliver[s] (slice) at this aggregate.
+        Verify HRN and initialize the slice record in PLC if necessary.
+        """
+
+        # ensure site record exists
+        site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+        # ensure slice record exists
+        slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+        # ensure person records exists
+        persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+
+        # Get the slice from db or create one.
+        s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None)
+        if s is None:
+            s = Slice(slice_hrn = hrn)
+
+        # Process any changes in existing instance allocation
+        pendingRmInst = []
+        for sliceInst in s.instances:
+            pendingRmInst.append(sliceInst.instance_id)
+        existingInstGroup = rspecXML.findall(".//euca_instances")
+        for instGroup in existingInstGroup:
+            for existingInst in instGroup:
+                if existingInst.get('id') in pendingRmInst:
+                    pendingRmInst.remove(existingInst.get('id'))
+        for inst in pendingRmInst:
+            dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None)
+            if dbInst.meta.state != 'deleted':
+                logger.debug('Instance %s will be terminated' % inst)
+                # Terminate instances one at a time for robustness
+                conn.terminate_instances([inst])
+                # Only change the state but do not remove the entry from the DB.
+                dbInst.meta.state = 'deleted'
+                #dbInst.destroySelf()
+
+        # Process new instance requests
+        requests = rspecXML.findall(".//request")
+        if requests:
+            # Get all the public keys associate with slice.
+            keys = []
+            for user in users:
+                keys += user['keys']
+                logger.debug("Keys: %s" % user['keys'])
+            pubKeys = '\n'.join(keys)
+            logger.debug('Passing the following keys to the instance:\n%s' % pubKeys)
+            for req in requests:
+                vmTypeElement = req.getparent()
+                instType = vmTypeElement.get('name')
+                numInst = int(req.find('instances').text)
+
+                bundleName = req.find('bundle').text
+                if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]:
+                    logger.error('Cannot find bundle %s' % bundleName)
+                bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]
+                instKernel = bundleInfo['kernelID']
+                instDiskImg = bundleInfo['imageID']
+                instRamDisk = bundleInfo['ramdiskID']
+                instKey = None
+
+                # Create the instances
+                for i in range(0, numInst):
+                    eucaInst = EucaInstance(slice = s,
+                                            kernel_id = instKernel,
+                                            image_id = instDiskImg,
+                                            ramdisk_id = instRamDisk,
+                                            key_pair = instKey,
+                                            inst_type = instType,
+                                            meta = Meta(start_time=datetime.datetime.now()))
+                    eucaInst.reserveInstance(conn, pubKeys)
+
+        # xxx - should return altered rspec
+        # with enough data for the client to understand what's happened
+        return xml
+
+    ##
+    # Return information on the IP addresses bound to each slice's instances
+    #
+    # Declared @staticmethod: it takes no 'self' parameter and is invoked
+    # from the static updateMeta() loop.
+    @staticmethod
+    def dumpInstanceInfo():
+        logger = logging.getLogger('EucaMeta')
+        outdir = "/var/www/html/euca/"
+        outfile = outdir + "instances.txt"
+
+        try:
+            os.makedirs(outdir)
+        except OSError, e:
+            if e.errno != errno.EEXIST:
+                raise
+
dbResults = Meta.select(
- AND(Meta.q.pri_addr == None,
- Meta.q.state != 'deleted')
- )
+ AND(Meta.q.pri_addr != None,
+ Meta.q.state == 'running')
+ )
dbResults = list(dbResults)
- logger.debug('[update process] dbResults: %s' % dbResults)
- instids = []
+ f = open(outfile, "w")
for r in dbResults:
- if not r.instance:
- continue
- instids.append(r.instance.instance_id)
- logger.debug('[update process] Instance Id: %s' % ', '.join(instids))
-
- # Get instance information from Eucalyptus
- conn = getEucaConnection()
- vmInstances = []
- reservations = conn.get_all_instances(instids)
- for reservation in reservations:
- vmInstances += reservation.instances
-
- # Check the IPs
- instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name}
- for i in vmInstances if i.private_dns_name != '0.0.0.0' ]
- logger.debug('[update process] IP dict: %s' % str(instIPs))
-
- # Update the local DB
- for ipData in instIPs:
- dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None)
- if not dbInst:
- logger.info('[update process] Could not find %s in DB' % ipData['id'])
- continue
- dbInst.meta.pri_addr = ipData['pri_addr']
- dbInst.meta.pub_addr = ipData['pub_addr']
- dbInst.meta.state = 'running'
-
- dumpInstanceInfo()
-
-def GetVersion(api):
- xrn=Xrn(api.hrn)
- request_rspec_versions = [dict(sfa_rspec_version)]
- ad_rspec_versions = [dict(sfa_rspec_version)]
- version_more = {'interface':'aggregate',
- 'testbed':'myplc',
- 'hrn':xrn.get_hrn(),
- 'request_rspec_versions': request_rspec_versions,
- 'ad_rspec_versions': ad_rspec_versions,
- 'default_ad_rspec': dict(sfa_rspec_version)
- }
- return version_core(version_more)
-
-#def main():
-# init_server()
-#
-# #theRSpec = None
-# #with open(sys.argv[1]) as xml:
-# # theRSpec = xml.read()
-# #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest')
-#
-# #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca')
-# #print rspec
-#
-# server_key_file = '/var/lib/sfa/authorities/server.key'
-# server_cert_file = '/var/lib/sfa/authorities/server.cert'
-# api = PlcSfaApi(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate')
-# print getKeysForSlice(api, 'gc.gc.test1')
-#
-#if __name__ == "__main__":
-# main()
+ instId = r.instance.instance_id
+ ipaddr = r.pri_addr
+ hrn = r.instance.slice.slice_hrn
+ logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn))
+ f.write("%s %s %s\n" % (instId, ipaddr, hrn))
+ f.close()
+
+    # Instance method: 'self' added so the framework's bound-method call
+    # works, matching 'def GetVersion(self, api)' on the other manager
+    # classes in this changeset.
+    def GetVersion(self, api):
+        xrn=Xrn(api.hrn)
+        # NOTE(review): the 'sfa_rspec_version' import is commented out at
+        # the top of this file ("not sure what this used to be nor where it
+        # is now defined"), so this method raises NameError until that
+        # import is restored - confirm before relying on GetVersion here.
+        request_rspec_versions = [dict(sfa_rspec_version)]
+        ad_rspec_versions = [dict(sfa_rspec_version)]
+        version_more = {'interface':'aggregate',
+                        'testbed':'myplc',
+                        'hrn':xrn.get_hrn(),
+                        'request_rspec_versions': request_rspec_versions,
+                        'ad_rspec_versions': ad_rspec_versions,
+                        'default_ad_rspec': dict(sfa_rspec_version)
+                        }
+        return version_core(version_more)
-import os\r
-import time\r
-import re\r
-\r
-from sfa.util.faults import *\r
-from sfa.util.sfalogging import logger\r
-from sfa.util.config import Config\r
-from sfa.util.sfatime import utcparse\r
-from sfa.util.callids import Callids\r
-from sfa.util.version import version_core\r
-from sfa.util.xrn import urn_to_hrn, hrn_to_urn, get_authority, Xrn\r
-from sfa.util.plxrn import hrn_to_pl_slicename\r
-\r
-from sfa.server.sfaapi import SfaApi\r
-from sfa.server.registry import Registries\r
-from sfa.rspecs.rspec_version import RSpecVersion\r
-from sfa.rspecs.sfa_rspec import sfa_rspec_version\r
-from sfa.rspecs.rspec_parser import parse_rspec\r
-\r
-from sfa.managers.aggregate_manager import __get_registry_objects, ListSlices\r
-\r
-from sfa.plc.slices import Slices\r
-\r
-\r
-RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec"\r
-\r
-# execute shell command and return both exit code and text output\r
-def shell_execute(cmd, timeout):\r
- pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r')\r
- pipe = os.popen(cmd + ' 2>&1', 'r')\r
- text = ''\r
- while timeout:\r
- line = pipe.read()\r
- text += line\r
- time.sleep(1)\r
- timeout = timeout-1\r
- code = pipe.close()\r
- if code is None: code = 0\r
- if text[-1:] == '\n': text = text[:-1]\r
- return code, text\r
-\r
-"""\r
- call AM API client with command like in the following example:\r
- cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \\r
- net.geni.aggregate.client.examples.CreateSliceNetworkClient \\r
- ./repo https://geni:8443/axis2/services/AggregateGENI \\r
- ... params ...\r
-"""\r
-\r
-def call_am_apiclient(client_app, params, timeout):\r
- (client_path, am_url) = Config().get_max_aggrMgr_info()\r
- sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + client_app + " ./repo " + am_url + " " + ' '.join(params)\r
- ret = shell_execute(sys_cmd, timeout)\r
- logger.debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret))\r
- return ret\r
-\r
-# save request RSpec xml content to a tmp file\r
-def save_rspec_to_file(rspec):\r
- path = RSPEC_TMP_FILE_PREFIX + "_" + time.strftime('%Y%m%dT%H:%M:%S', time.gmtime(time.time())) +".xml"\r
- file = open(path, "w")\r
- file.write(rspec)\r
- file.close()\r
- return path\r
-\r
-# get stripped down slice id/name plc.maxpl.xislice1 --> maxpl_xislice1\r
-def get_plc_slice_id(cred, xrn):\r
- (hrn, type) = urn_to_hrn(xrn)\r
- slice_id = hrn.find(':')\r
- sep = '.'\r
- if hrn.find(':') != -1:\r
- sep=':'\r
- elif hrn.find('+') != -1:\r
- sep='+'\r
- else:\r
- sep='.'\r
- slice_id = hrn.split(sep)[-2] + '_' + hrn.split(sep)[-1]\r
- return slice_id\r
-\r
-# extract xml \r
-def get_xml_by_tag(text, tag):\r
- indx1 = text.find('<'+tag)\r
- indx2 = text.find('/'+tag+'>')\r
- xml = None\r
- if indx1!=-1 and indx2>indx1:\r
- xml = text[indx1:indx2+len(tag)+2]\r
- return xml\r
-\r
-def prepare_slice(api, slice_xrn, creds, users):\r
- reg_objects = __get_registry_objects(slice_xrn, creds, users)\r
- (hrn, type) = urn_to_hrn(slice_xrn)\r
- slices = Slices(api)\r
- peer = slices.get_peer(hrn)\r
- sfa_peer = slices.get_sfa_peer(hrn)\r
- slice_record=None\r
- if users:\r
- slice_record = users[0].get('slice_record', {})\r
- registry = api.registries[api.hrn]\r
- credential = api.getCredential()\r
- # ensure site record exists\r
- site = slices.verify_site(hrn, slice_record, peer, sfa_peer)\r
- # ensure slice record exists\r
- slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)\r
- # ensure person records exists\r
- persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)\r
-\r
-def parse_resources(text, slice_xrn):\r
- resources = []\r
- urn = hrn_to_urn(slice_xrn, 'sliver')\r
- plc_slice = re.search("Slice Status => ([^\n]+)", text)\r
- if plc_slice.group(1) != 'NONE':\r
- res = {}\r
- res['geni_urn'] = urn + '_plc_slice'\r
- res['geni_error'] = ''\r
- res['geni_status'] = 'unknown'\r
- if plc_slice.group(1) == 'CREATED':\r
- res['geni_status'] = 'ready'\r
- resources.append(res)\r
- vlans = re.findall("GRI => ([^\n]+)\n\t Status => ([^\n]+)", text)\r
- for vlan in vlans:\r
- res = {}\r
- res['geni_error'] = ''\r
- res['geni_urn'] = urn + '_vlan_' + vlan[0]\r
- if vlan[1] == 'ACTIVE':\r
- res['geni_status'] = 'ready'\r
- elif vlan[1] == 'FAILED':\r
- res['geni_status'] = 'failed'\r
- else:\r
- res['geni_status'] = 'configuring'\r
- resources.append(res)\r
- return resources\r
-\r
-def slice_status(api, slice_xrn, creds):\r
- urn = hrn_to_urn(slice_xrn, 'slice')\r
- result = {}\r
- top_level_status = 'unknown'\r
- slice_id = get_plc_slice_id(creds, urn)\r
- (ret, output) = call_am_apiclient("QuerySliceNetworkClient", [slice_id,], 5)\r
- # parse output into rspec XML\r
- if output.find("Unkown Rspec:") > 0:\r
- top_level_staus = 'failed'\r
- result['geni_resources'] = ''\r
- else:\r
- has_failure = 0\r
- all_active = 0\r
- if output.find("Status => FAILED") > 0:\r
- top_level_staus = 'failed'\r
- elif ( output.find("Status => ACCEPTED") > 0 or output.find("Status => PENDING") > 0\r
- or output.find("Status => INSETUP") > 0 or output.find("Status => INCREATE") > 0\r
- ):\r
- top_level_status = 'configuring'\r
- else:\r
- top_level_status = 'ready'\r
- result['geni_resources'] = parse_resources(output, slice_xrn)\r
- result['geni_urn'] = urn\r
- result['geni_status'] = top_level_status\r
- return result\r
-\r
-def create_slice(api, xrn, cred, rspec, users):\r
- indx1 = rspec.find("<RSpec")\r
- indx2 = rspec.find("</RSpec>")\r
- if indx1 > -1 and indx2 > indx1:\r
- rspec = rspec[indx1+len("<RSpec type=\"SFA\">"):indx2-1]\r
- rspec_path = save_rspec_to_file(rspec)\r
- prepare_slice(api, xrn, cred, users)\r
- slice_id = get_plc_slice_id(cred, xrn)\r
- sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" +slice_id+ "/g\" " + rspec_path + ";sed -i \"s/:rspec=[^:'<\\\" ]*/:rspec=" +slice_id+ "/g\" " + rspec_path\r
- ret = shell_execute(sys_cmd, 1)\r
- sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" + rspec_path + "/g\""\r
- ret = shell_execute(sys_cmd, 1)\r
- (ret, output) = call_am_apiclient("CreateSliceNetworkClient", [rspec_path,], 3)\r
- # parse output ?\r
- rspec = "<RSpec type=\"SFA\"> Done! </RSpec>"\r
- return True\r
-\r
-def delete_slice(api, xrn, cred):\r
- slice_id = get_plc_slice_id(cred, xrn)\r
- (ret, output) = call_am_apiclient("DeleteSliceNetworkClient", [slice_id,], 3)\r
- # parse output ?\r
- return 1\r
-\r
-\r
-def get_rspec(api, cred, slice_urn):\r
- logger.debug("#### called max-get_rspec")\r
- #geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1\r
- if slice_urn == None:\r
- (ret, output) = call_am_apiclient("GetResourceTopology", ['all', '\"\"'], 5)\r
- else:\r
- slice_id = get_plc_slice_id(cred, slice_urn)\r
- (ret, output) = call_am_apiclient("GetResourceTopology", ['all', slice_id,], 5)\r
- # parse output into rspec XML\r
- if output.find("No resouce found") > 0:\r
- rspec = "<RSpec type=\"SFA\"> <Fault>No resource found</Fault> </RSpec>"\r
- else:\r
- comp_rspec = get_xml_by_tag(output, 'computeResource')\r
- logger.debug("#### computeResource %s" % comp_rspec)\r
- topo_rspec = get_xml_by_tag(output, 'topology')\r
- logger.debug("#### topology %s" % topo_rspec)\r
- rspec = "<RSpec type=\"SFA\"> <network name=\"" + Config().get_interface_hrn() + "\">";\r
- if comp_rspec != None:\r
- rspec = rspec + get_xml_by_tag(output, 'computeResource')\r
- if topo_rspec != None:\r
- rspec = rspec + get_xml_by_tag(output, 'topology')\r
- rspec = rspec + "</network> </RSpec>"\r
- return (rspec)\r
-\r
-def start_slice(api, xrn, cred):\r
- # service not supported\r
- return None\r
-\r
-def stop_slice(api, xrn, cred):\r
- # service not supported\r
- return None\r
-\r
-def reset_slices(api, xrn):\r
- # service not supported\r
- return None\r
-\r
-"""\r
- GENI AM API Methods\r
-"""\r
-\r
-def GetVersion(api):\r
- xrn=Xrn(api.hrn)\r
- request_rspec_versions = [dict(sfa_rspec_version)]\r
- ad_rspec_versions = [dict(sfa_rspec_version)]\r
- #TODO: MAX-AM specific\r
- version_more = {'interface':'aggregate',\r
- 'testbed':'myplc',\r
- 'hrn':xrn.get_hrn(),\r
- 'request_rspec_versions': request_rspec_versions,\r
- 'ad_rspec_versions': ad_rspec_versions,\r
- 'default_ad_rspec': dict(sfa_rspec_version)\r
- }\r
- return version_core(version_more)\r
-\r
-def SliverStatus(api, slice_xrn, creds, call_id):\r
- if Callids().already_handled(call_id): return {}\r
- return slice_status(api, slice_xrn, creds)\r
-\r
-def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id):\r
- if Callids().already_handled(call_id): return ""\r
- #TODO: create real CreateSliver response rspec\r
- ret = create_slice(api, slice_xrn, creds, rspec_string, users)\r
- if ret:\r
- return get_rspec(api, creds, slice_xrn)\r
- else:\r
- return "<?xml version=\"1.0\" ?> <RSpec type=\"SFA\"> Error! </RSpec>"\r
-\r
-def DeleteSliver(api, xrn, creds, call_id):\r
- if Callids().already_handled(call_id): return ""\r
- return delete_slice(api, xrn, creds)\r
-\r
-# no caching\r
-def ListResources(api, creds, options,call_id):\r
- if Callids().already_handled(call_id): return ""\r
- # version_string = "rspec_%s" % (rspec_version.get_version_name())\r
- slice_urn = options.get('geni_slice_urn')\r
- return get_rspec(api, creds, slice_urn)\r
-\r
-def fetch_context(slice_hrn, user_hrn, contexts):\r
- """\r
- Returns the request context required by sfatables. At some point, this mechanism should be changed\r
- to refer to "contexts", which is the information that sfatables is requesting. But for now, we just\r
- return the basic information needed in a dict.\r
- """\r
- base_context = {'sfa':{'user':{'hrn':user_hrn}}}\r
- return base_context\r
- api = SfaApi()\r
- create_slice(api, "plc.maxpl.test000", None, rspec_xml, None)\r
-\r
+import os
+import time
+import re
+
+#from sfa.util.faults import *
+from sfa.util.sfalogging import logger
+from sfa.util.config import Config
+from sfa.util.callids import Callids
+from sfa.util.version import version_core
+from sfa.util.xrn import urn_to_hrn, hrn_to_urn, Xrn
+
+# xxx the sfa.rspecs module is dead - this symbol is now undefined
+#from sfa.rspecs.sfa_rspec import sfa_rspec_version
+
+from sfa.managers.aggregate_manager import AggregateManager
+
+from sfa.plc.slices import Slices
+
+class AggregateManagerMax (AggregateManager):
+
+ RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec"
+
+ # execute shell command and return both exit code and text output
+ def shell_execute(self, cmd, timeout):
+        # open the command once, merging stderr into stdout
+        pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r')
+ text = ''
+ while timeout:
+ line = pipe.read()
+ text += line
+ time.sleep(1)
+ timeout = timeout-1
+ code = pipe.close()
+ if code is None: code = 0
+ if text[-1:] == '\n': text = text[:-1]
+ return code, text
+
+
+ def call_am_apiclient(self, client_app, params, timeout):
+ """
+ call AM API client with command like in the following example:
+ cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \
+ net.geni.aggregate.client.examples.CreateSliceNetworkClient \
+ ./repo https://geni:8443/axis2/services/AggregateGENI \
+ ... params ...
+ """
+ (client_path, am_url) = Config().get_max_aggrMgr_info()
+ sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + client_app + " ./repo " + am_url + " " + ' '.join(params)
+ ret = self.shell_execute(sys_cmd, timeout)
+ logger.debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret))
+ return ret
+
+ # save request RSpec xml content to a tmp file
+ def save_rspec_to_file(self, rspec):
+ path = AggregateManagerMax.RSPEC_TMP_FILE_PREFIX + "_" + \
+ time.strftime('%Y%m%dT%H:%M:%S', time.gmtime(time.time())) +".xml"
+ file = open(path, "w")
+ file.write(rspec)
+ file.close()
+ return path
+
+ # get stripped down slice id/name plc.maxpl.xislice1 --> maxpl_xislice1
+ def get_plc_slice_id(self, cred, xrn):
+ (hrn, type) = urn_to_hrn(xrn)
+ slice_id = hrn.find(':')
+ sep = '.'
+ if hrn.find(':') != -1:
+ sep=':'
+ elif hrn.find('+') != -1:
+ sep='+'
+ else:
+ sep='.'
+ slice_id = hrn.split(sep)[-2] + '_' + hrn.split(sep)[-1]
+ return slice_id
+
+ # extract xml
+ def get_xml_by_tag(self, text, tag):
+ indx1 = text.find('<'+tag)
+ indx2 = text.find('/'+tag+'>')
+ xml = None
+ if indx1!=-1 and indx2>indx1:
+ xml = text[indx1:indx2+len(tag)+2]
+ return xml
+
+ def prepare_slice(self, api, slice_xrn, creds, users):
+ reg_objects = self._get_registry_objects(slice_xrn, creds, users)
+ (hrn, type) = urn_to_hrn(slice_xrn)
+ slices = Slices(api)
+ peer = slices.get_peer(hrn)
+ sfa_peer = slices.get_sfa_peer(hrn)
+ slice_record=None
+ if users:
+ slice_record = users[0].get('slice_record', {})
+ registry = api.registries[api.hrn]
+ credential = api.getCredential()
+ # ensure site record exists
+ site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
+ # ensure slice record exists
+ slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
+ # ensure person records exists
+ persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
+
+ def parse_resources(self, text, slice_xrn):
+ resources = []
+ urn = hrn_to_urn(slice_xrn, 'sliver')
+ plc_slice = re.search("Slice Status => ([^\n]+)", text)
+ if plc_slice.group(1) != 'NONE':
+ res = {}
+ res['geni_urn'] = urn + '_plc_slice'
+ res['geni_error'] = ''
+ res['geni_status'] = 'unknown'
+ if plc_slice.group(1) == 'CREATED':
+ res['geni_status'] = 'ready'
+ resources.append(res)
+ vlans = re.findall("GRI => ([^\n]+)\n\t Status => ([^\n]+)", text)
+ for vlan in vlans:
+ res = {}
+ res['geni_error'] = ''
+ res['geni_urn'] = urn + '_vlan_' + vlan[0]
+ if vlan[1] == 'ACTIVE':
+ res['geni_status'] = 'ready'
+ elif vlan[1] == 'FAILED':
+ res['geni_status'] = 'failed'
+ else:
+ res['geni_status'] = 'configuring'
+ resources.append(res)
+ return resources
+
+ def slice_status(self, api, slice_xrn, creds):
+ urn = hrn_to_urn(slice_xrn, 'slice')
+ result = {}
+ top_level_status = 'unknown'
+ slice_id = self.get_plc_slice_id(creds, urn)
+ (ret, output) = self.call_am_apiclient("QuerySliceNetworkClient", [slice_id,], 5)
+ # parse output into rspec XML
+ if output.find("Unkown Rspec:") > 0:
+            top_level_status = 'failed'
+ result['geni_resources'] = ''
+ else:
+ has_failure = 0
+ all_active = 0
+ if output.find("Status => FAILED") > 0:
+                top_level_status = 'failed'
+ elif ( output.find("Status => ACCEPTED") > 0 or output.find("Status => PENDING") > 0
+ or output.find("Status => INSETUP") > 0 or output.find("Status => INCREATE") > 0
+ ):
+ top_level_status = 'configuring'
+ else:
+ top_level_status = 'ready'
+ result['geni_resources'] = self.parse_resources(output, slice_xrn)
+ result['geni_urn'] = urn
+ result['geni_status'] = top_level_status
+ return result
+
+ def create_slice(self, api, xrn, cred, rspec, users):
+ indx1 = rspec.find("<RSpec")
+ indx2 = rspec.find("</RSpec>")
+ if indx1 > -1 and indx2 > indx1:
+ rspec = rspec[indx1+len("<RSpec type=\"SFA\">"):indx2-1]
+ rspec_path = self.save_rspec_to_file(rspec)
+ self.prepare_slice(api, xrn, cred, users)
+ slice_id = self.get_plc_slice_id(cred, xrn)
+ sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" +slice_id+ "/g\" " + rspec_path + ";sed -i \"s/:rspec=[^:'<\\\" ]*/:rspec=" +slice_id+ "/g\" " + rspec_path
+ ret = self.shell_execute(sys_cmd, 1)
+ sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" + rspec_path + "/g\""
+ ret = self.shell_execute(sys_cmd, 1)
+ (ret, output) = self.call_am_apiclient("CreateSliceNetworkClient", [rspec_path,], 3)
+ # parse output ?
+ rspec = "<RSpec type=\"SFA\"> Done! </RSpec>"
+ return True
+
+ def delete_slice(self, api, xrn, cred):
+ slice_id = self.get_plc_slice_id(cred, xrn)
+ (ret, output) = self.call_am_apiclient("DeleteSliceNetworkClient", [slice_id,], 3)
+ # parse output ?
+ return 1
+
+
+ def get_rspec(self, api, cred, slice_urn):
+ logger.debug("#### called max-get_rspec")
+ #geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1
+ if slice_urn == None:
+ (ret, output) = self.call_am_apiclient("GetResourceTopology", ['all', '\"\"'], 5)
+ else:
+ slice_id = self.get_plc_slice_id(cred, slice_urn)
+ (ret, output) = self.call_am_apiclient("GetResourceTopology", ['all', slice_id,], 5)
+ # parse output into rspec XML
+ if output.find("No resouce found") > 0:
+ rspec = "<RSpec type=\"SFA\"> <Fault>No resource found</Fault> </RSpec>"
+ else:
+ comp_rspec = self.get_xml_by_tag(output, 'computeResource')
+ logger.debug("#### computeResource %s" % comp_rspec)
+ topo_rspec = self.get_xml_by_tag(output, 'topology')
+ logger.debug("#### topology %s" % topo_rspec)
+ rspec = "<RSpec type=\"SFA\"> <network name=\"" + Config().get_interface_hrn() + "\">"
+ if comp_rspec != None:
+ rspec = rspec + self.get_xml_by_tag(output, 'computeResource')
+ if topo_rspec != None:
+ rspec = rspec + self.get_xml_by_tag(output, 'topology')
+ rspec = rspec + "</network> </RSpec>"
+ return (rspec)
+
+ def start_slice(self, api, xrn, cred):
+ # service not supported
+ return None
+
+ def stop_slice(self, api, xrn, cred):
+ # service not supported
+ return None
+
+ def reset_slices(self, api, xrn):
+ # service not supported
+ return None
+
+ ### GENI AM API Methods
+
+ def GetVersion(self, api):
+ xrn=Xrn(api.hrn)
+ request_rspec_versions = [dict(sfa_rspec_version)]
+ ad_rspec_versions = [dict(sfa_rspec_version)]
+ #TODO: MAX-AM specific
+ version_more = {'interface':'aggregate',
+ 'testbed':'myplc',
+ 'hrn':xrn.get_hrn(),
+ 'request_rspec_versions': request_rspec_versions,
+ 'ad_rspec_versions': ad_rspec_versions,
+ 'default_ad_rspec': dict(sfa_rspec_version)
+ }
+ return version_core(version_more)
+
+ def SliverStatus(self, api, slice_xrn, creds, call_id):
+ if Callids().already_handled(call_id): return {}
+ return self.slice_status(api, slice_xrn, creds)
+
+ def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id):
+ if Callids().already_handled(call_id): return ""
+ #TODO: create real CreateSliver response rspec
+ ret = self.create_slice(api, slice_xrn, creds, rspec_string, users)
+ if ret:
+ return self.get_rspec(api, creds, slice_xrn)
+ else:
+ return "<?xml version=\"1.0\" ?> <RSpec type=\"SFA\"> Error! </RSpec>"
+
+ def DeleteSliver(self, api, xrn, creds, call_id):
+ if Callids().already_handled(call_id): return ""
+ return self.delete_slice(api, xrn, creds)
+
+ # no caching
+ def ListResources(self, api, creds, options,call_id):
+ if Callids().already_handled(call_id): return ""
+ # version_string = "rspec_%s" % (rspec_version.get_version_name())
+ slice_urn = options.get('geni_slice_urn')
+ return self.get_rspec(api, creds, slice_urn)
+
+ def fetch_context(self, slice_hrn, user_hrn, contexts):
+ """
+ Returns the request context required by sfatables. At some point, this mechanism should be changed
+ to refer to "contexts", which is the information that sfatables is requesting. But for now, we just
+ return the basic information needed in a dict.
+ """
+ base_context = {'sfa':{'user':{'hrn':user_hrn}}}
+ return base_context
+
+++ /dev/null
-import sys
-
-import socket
-import struct
-
-#The following is not essential
-#from soaplib.wsgi_soap import SimpleWSGISoapApp
-#from soaplib.serializers.primitive import *
-#from soaplib.serializers.clazz import *
-
-from sfa.util.faults import *
-from sfa.util.xrn import urn_to_hrn
-from sfa.server.registry import Registries
-from sfa.util.config import Config
-from sfa.plc.nodes import *
-from sfa.util.callids import Callids
-
-# Message IDs for all the SFA light calls
-# This will be used by the aggrMgr controller
-SFA_GET_RESOURCES = 101
-SFA_CREATE_SLICE = 102
-SFA_START_SLICE = 103
-SFA_STOP_SLICE = 104
-SFA_DELETE_SLICE = 105
-SFA_GET_SLICES = 106
-SFA_RESET_SLICES = 107
-
-DEBUG = 1
-
-def print_buffer(buf):
- for i in range(0,len(buf)):
- print('%x' % buf[i])
-
-def extract(sock):
- # Shud we first obtain the message length?
- # msg_len = socket.ntohs(sock.recv(2))
- msg = ""
-
- while (1):
- try:
- chunk = sock.recv(1)
- except socket.error, message:
- if 'timed out' in message:
- break
- else:
- sys.exit("Socket error: " + message)
-
- if len(chunk) == 0:
- break
- msg += chunk
-
- print 'Done extracting %d bytes of response from aggrMgr' % len(msg)
- return msg
-
-def connect(server, port):
- '''Connect to the Aggregate Manager module'''
- sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM )
- sock.connect ( ( server, port) )
- sock.settimeout(1)
- if DEBUG: print 'Connected!'
- return sock
-
-def connect_aggrMgr():
- (aggr_mgr_ip, aggr_mgr_port) = Config().get_openflow_aggrMgr_info()
- if DEBUG: print """Connecting to port %d of %s""" % (aggr_mgr_port, aggr_mgr_ip)
- return connect(aggr_mgr_ip, aggr_mgr_port)
-
-def generate_slide_id(cred, hrn):
- if cred == None:
- cred = ""
- if hrn == None:
- hrn = ""
- #return cred + '_' + hrn
- return str(hrn)
-
-def msg_aggrMgr(cred, hrn, msg_id):
- slice_id = generate_slide_id(cred, hrn)
-
- msg = struct.pack('> B%ds' % len(slice_id), msg_id, slice_id)
- buf = struct.pack('> H', len(msg)+2) + msg
-
- try:
- aggrMgr_sock = connect_aggrMgr()
- aggrMgr_sock.send(buf)
- aggrMgr_sock.close()
- return 1
- except socket.error, message:
- print "Socket error"
- except IOerror, message:
- print "IO error"
- return 0
-
-def start_slice(cred, xrn):
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received start_slice call"
- return msg_aggrMgr(SFA_START_SLICE)
-
-def stop_slice(cred, xrn):
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received stop_slice call"
- return msg_aggrMgr(SFA_STOP_SLICE)
-
-def DeleteSliver(cred, xrn, call_id):
- if Callids().already_handled(call_id): return ""
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received DeleteSliver call"
- return msg_aggrMgr(SFA_DELETE_SLICE)
-
-def reset_slices(cred, xrn):
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received reset_slices call"
- return msg_aggrMgr(SFA_RESET_SLICES)
-
-### Thierry: xxx this should ahve api as a first arg - probably outdated
-def CreateSliver(cred, xrn, rspec, call_id):
- if Callids().already_handled(call_id): return ""
-
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received CreateSliver call"
- slice_id = generate_slide_id(cred, hrn)
-
- msg = struct.pack('> B%ds%ds' % (len(slice_id)+1, len(rspec)), SFA_CREATE_SLICE, slice_id, rspec)
- buf = struct.pack('> H', len(msg)+2) + msg
-
- try:
- aggrMgr_sock = connect_aggrMgr()
- aggrMgr_sock.send(buf)
- if DEBUG: print "Sent %d bytes and closing connection" % len(buf)
- aggrMgr_sock.close()
-
- if DEBUG: print "----------------"
- return rspec
- except socket.error, message:
- print "Socket error"
- except IOerror, message:
- print "IO error"
- return ""
-
-# Thierry : xxx this would need to handle call_id like the other AMs but is outdated...
-def ListResources(cred, xrn=None):
- hrn = urn_to_hrn(xrn)[0]
- if DEBUG: print "Received ListResources call"
- slice_id = generate_slide_id(cred, hrn)
-
- msg = struct.pack('> B%ds' % len(slice_id), SFA_GET_RESOURCES, slice_id)
- buf = struct.pack('> H', len(msg)+2) + msg
-
- try:
- aggrMgr_sock = connect_aggrMgr()
- aggrMgr_sock.send(buf)
- resource_list = extract(aggrMgr_sock);
- aggrMgr_sock.close()
-
- if DEBUG: print "----------------"
- return resource_list
- except socket.error, message:
- print "Socket error"
- except IOerror, message:
- print "IO error"
- return None
-
-"""
-Returns the request context required by sfatables. At some point, this mechanism should be changed
-to refer to "contexts", which is the information that sfatables is requesting. But for now, we just
-return the basic information needed in a dict.
-"""
-def fetch_context(slice_hrn, user_hrn, contexts):
- base_context = {'sfa':{'user':{'hrn':user_hrn}}}
- return base_context
-
-def main():
- r = RSpec()
- r.parseFile(sys.argv[1])
- rspec = r.toDict()
- CreateSliver(None,'plc',rspec,'call-id-plc')
-
-if __name__ == "__main__":
- main()
from sfa.server.interface import Interfaces, Interface
from sfa.util.config import Config
+# this truly is a server-side object
class Aggregate(SfaServer):
##
def __init__(self, ip, port, key_file, cert_file):
SfaServer.__init__(self, ip, port, key_file, cert_file,'aggregate')
-##
+#
# Aggregates is a dictionary of aggregate connections keyed on the aggregate hrn
-
+# as such it's more of a client-side thing for aggregate servers to reach their peers
+#
class Aggregates(Interfaces):
default_dict = {'aggregates': {'aggregate': [Interfaces.default_fields]}}
from sfa.server.interface import Interfaces, Interface
from sfa.util.config import Config
-##
+#
# Registry is a SfaServer that serves registry and slice operations at PLC.
+# this truly is a server-side object
+#
class Registry(SfaServer):
##
# Create a new registry object.
def __init__(self, ip, port, key_file, cert_file):
SfaServer.__init__(self, ip, port, key_file, cert_file,'registry')
-##
-# Registries is a dictionary of registry connections keyed on the registry
-# hrn
-
+#
+# Registries is a dictionary of registry connections keyed on the registry hrn
+# as such it's more of a client-side thing for registry servers to reach their peers
+#
class Registries(Interfaces):
default_dict = {'registries': {'registry': [Interfaces.default_fields]}}