5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
7 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
8 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
16 from sfa.rspecs.version_manager import VersionManager
17 from sfa.rspecs.rspec import RSpec
19 from sfa.server.sfaapi import SfaApi
21 import sfa.plc.peers as peers
22 from sfa.plc.aggregate import Aggregate
23 from sfa.plc.slices import Slices
25 class AggregateManager:
# Aggregate-side manager: groups the operations defined below (GetVersion,
# SliverStatus, CreateSliver, RenewSliver, start_slice/stop_slice/reset_slice,
# DeleteSliver, ListSlices, ListResources, GetTicket).
# NOTE(review): ListSlices and ListResources read self.caching, but its
# assignment is not visible in this listing (the embedded numbering skips
# 26-27 and 29-31, presumably the constructor) -- confirm where it is set.
28 # xxx Thierry : caching at the aggregate level sounds wrong...
32 def GetVersion(self, api, options={}):
# Describe this aggregate interface and return the description through
# version_core().
#   api     -- only api.config.SFA_AGGREGATE_API_VERSION is read in the
#              lines visible here
#   options -- not used in the lines visible here
# NOTE(review): `options={}` is a mutable default argument (one dict shared by
# every call); safe only as long as nothing ever mutates it.
# NOTE(review): the embedded numbering skips 33, 42, 44, 46-47 and 50, so some
# statements / entries of version_more (and its closing brace) are not
# visible in this listing.
34 version_manager = VersionManager()
35 ad_rspec_versions = []
36 request_rspec_versions = []
# Sort every known rspec version into the advertisement list and/or the
# request list; a content_type of '*' puts it in both.
37 for rspec_version in version_manager.versions:
38 if rspec_version.content_type in ['*', 'ad']:
39 ad_rspec_versions.append(rspec_version.to_dict())
40 if rspec_version.content_type in ['*', 'request']:
41 request_rspec_versions.append(rspec_version.to_dict())
# Interface-specific fields handed to version_core().
43 version_more = {'interface':'aggregate',
45 'geni_api': api.config.SFA_AGGREGATE_API_VERSION,
48 'geni_request_rspec_versions': request_rspec_versions,
49 'geni_ad_rspec_versions': ad_rspec_versions,
51 return version_core(version_more)
53 def _get_registry_objects(self, slice_xrn, creds, users):
# Fabricate the site, slice and user entries that are stored in reg_objects,
# using only the slice xrn, the first credential and the supplied users.
#   slice_xrn -- passed to urn_to_hrn() to obtain the slice hrn
#   creds     -- only creds[0] is read (for the expiration time); an empty
#                sequence would raise IndexError
#   users     -- presumably what the `user` loop below iterates over (the
#                loop header is not visible here) -- confirm
# NOTE(review): the statements creating reg_objects, site and slice, the user
# loop header and the return statement are not visible in this listing (the
# embedded numbering has gaps), so the return value is not documented here.
57 hrn, _ = urn_to_hrn(slice_xrn)
59 #hrn_auth = get_authority(hrn)
61 # Build up objects that an SFA registry would return if SFA
62 # could contact the slice's registry directly
66 # don't allow special characters in the site login base
67 #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
68 #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
69 slicename = hrn_to_pl_slicename(hrn)
# login_base is whatever precedes the first '_' in the slice name.
70 login_base = slicename.split('_')[0]
74 site['name'] = 'geni.%s' % login_base
75 site['enabled'] = True
76 site['max_slices'] = 100
79 # Is it okay if this login base is the same as one already at this myplc site?
80 # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps?
81 site['login_base'] = login_base
82 site['abbreviated_name'] = login_base
83 site['max_slivers'] = 1000
84 reg_objects['site'] = site
88 # get_expiration always returns a normalized datetime - no need to utcparse
89 extime = Credential(string=creds[0]).get_expiration()
90 # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
91 if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
92 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
# NOTE(review): time.mktime() interprets the struct_time as *local* time,
# whereas extime is compared against utcnow() above; on a host whose timezone
# is not UTC the stored value is presumably off by the UTC offset
# (calendar.timegm() may be what is intended) -- confirm.
93 slice['expires'] = int(time.mktime(extime.timetuple()))
# NOTE(review): the local name `slice` shadows the builtin of the same name.
95 slice['name'] = hrn_to_pl_slicename(hrn)
97 slice['description'] = hrn
99 reg_objects['slice_record'] = slice
101 reg_objects['users'] = {}
# From here on `hrn` is rebound to the user's hrn; the slice hrn is no longer
# available under that name.
104 hrn, _ = urn_to_hrn(user['urn'])
# Synthesized e-mail address; it is also the key under which the user is
# stored in reg_objects['users'].
105 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
106 user['first_name'] = hrn
107 user['last_name'] = hrn
108 reg_objects['users'][user['email']] = user
112 def SliverStatus(self, api, slice_xrn, creds, options={}):
# Report the status of a slice and of its slivers on the local nodes.
#   slice_xrn -- slice identifier; converted to an hrn and then to the
#                slice name used for the driver lookup
#   creds     -- not used in the lines visible here
#   options   -- only 'call_id' is read; a call_id that Callids() reports as
#                already handled short-circuits with {}
# Fills result with 'geni_urn', 'pl_login', 'pl_expires', 'geni_status' and
# 'geni_resources'; each entry of resources carries 'pl_hostname',
# 'pl_boot_state', 'pl_last_contact', 'geni_urn', 'geni_status' and
# 'geni_error'.
# NOTE(review): the bindings of slice, result, resources and res, several
# if/else/for headers and the return statement are not visible in this
# listing (the embedded numbering has gaps) -- confirm against the full file.
# NOTE(review): `options={}` is a mutable default argument.
113 call_id = options.get('call_id')
114 if Callids().already_handled(call_id): return {}
116 (hrn, _) = urn_to_hrn(slice_xrn)
117 # find out where this slice is currently running
118 slicename = hrn_to_pl_slicename(hrn)
120 slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
# NOTE(review): the guard for this raise (embedded line 121) is not visible;
# presumably it fires when the lookup above returned nothing.
122 raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
125 # report about the local nodes only
126 nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
127 ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
# NOTE(review): site_ids is not read again in the lines visible here.
128 site_ids = [node['site_id'] for node in nodes]
# Overall status starts as 'unknown', becomes 'ready' (condition not visible
# here) and is downgraded to 'failed' further down.
131 top_level_status = 'unknown'
133 top_level_status = 'ready'
134 slice_urn = Xrn(slice_xrn, 'slice').get_urn()
135 result['geni_urn'] = slice_urn
136 result['pl_login'] = slice['name']
# NOTE(review): datetime.fromtimestamp() renders the timestamp in the host's
# local timezone (same for pl_last_contact below).
137 result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
142 res['pl_hostname'] = node['hostname']
143 res['pl_boot_state'] = node['boot_state']
# Raw last_contact value is stored first, then replaced by a readable string
# when it is not None.
144 res['pl_last_contact'] = node['last_contact']
145 if node['last_contact'] is not None:
146 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
147 sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id'])
148 res['geni_urn'] = sliver_id
# A node whose boot_state is 'boot' is reported as 'ready'; the 'failed'
# assignments below are presumably the else branch (header not visible).
149 if node['boot_state'] == 'boot':
150 res['geni_status'] = 'ready'
152 res['geni_status'] = 'failed'
153 top_level_status = 'failed'
155 res['geni_error'] = ''
157 resources.append(res)
159 result['geni_status'] = top_level_status
160 result['geni_resources'] = resources
163 def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options={}):
# Parameters, as used in the lines visible here:
#   slice_xrn    -- slice identifier; converted to an hrn for the verify_* calls
#   creds        -- not used in the lines visible here
#   rspec_string -- request rspec, parsed with RSpec()
#   users        -- users[0] is read for an optional 'slice_record' entry, so
#                   an empty sequence would raise IndexError; the whole
#                   sequence is passed to verify_persons()
#   options      -- only 'call_id' is read; a call_id that Callids() reports
#                   as already handled short-circuits with ""
# Returns aggregate.get_rspec() for the slice, in the version of the request.
# NOTE(review): `slices` is used below but its binding is not visible in this
# listing (the embedded numbering skips 172) -- confirm.
# NOTE(review): this method reads self.driver while the other methods use
# api.driver; whether self.driver is ever set is not visible here.
# NOTE(review): `options={}` is a mutable default argument.
165 Create the sliver[s] (slice) at this aggregate.
166 Verify HRN and initialize the slice record in PLC if necessary.
168 call_id = options.get('call_id')
169 if Callids().already_handled(call_id): return ""
171 aggregate = Aggregate(self.driver)
173 (hrn, _) = urn_to_hrn(slice_xrn)
174 peer = slices.get_peer(hrn)
175 sfa_peer = slices.get_sfa_peer(hrn)
# Falls back to an empty record when the first user carries no 'slice_record'.
178 slice_record = users[0].get('slice_record', {})
181 rspec = RSpec(rspec_string)
182 requested_attributes = rspec.version.get_slice_attributes()
184 # ensure site record exists
185 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
186 # ensure slice record exists
187 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
188 # ensure person records exist
189 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
190 # ensure slice attributes exist
191 slices.verify_slice_attributes(slice, requested_attributes)
193 # add/remove slice from nodes
194 requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
195 slices.verify_slice_nodes(slice, requested_slivers, peer)
197 # add/remove links
198 slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
200 # handle MyPLC peer association.
201 # only used by plc and ple.
202 slices.handle_peer(site, slice, persons, peer)
204 return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
207 def RenewSliver(self, api, xrn, creds, expiration_time, options={}):
# Set a new expiration time on the slice named by xrn.
#   xrn             -- slice identifier; converted to hrn, then slice name
#   creds           -- not used in the lines visible here
#   expiration_time -- parsed with utcparse()
#   options         -- only 'call_id' is read; a call_id that Callids()
#                      reports as already handled short-circuits with True
# Raises RecordNotFound(hrn) -- presumably when the lookup returns no slice;
# the guard (embedded line 213) is not visible in this listing.
# NOTE(review): the binding of `slice` (embedded line 215) and the return
# statement are not visible either.
# NOTE(review): `options={}` is a mutable default argument.
208 call_id = options.get('call_id')
209 if Callids().already_handled(call_id): return True
210 (hrn, _) = urn_to_hrn(xrn)
211 slicename = hrn_to_pl_slicename(hrn)
212 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
214 raise RecordNotFound(hrn)
216 requested_time = utcparse(expiration_time)
# NOTE(review): time.mktime() treats the struct_time as local time; if
# utcparse() yields a UTC datetime (as its name suggests) the stored value is
# shifted by the host's UTC offset -- calendar.timegm() may be intended. Confirm.
217 record = {'expires': int(time.mktime(requested_time.timetuple()))}
219 api.driver.UpdateSlice(slice['slice_id'], record)
224 def start_slice(self, api, xrn, creds):
# Start a slice by deleting its 'enabled' slice tag.
#   xrn   -- slice identifier; converted to hrn, then slice name
#   creds -- not used in the lines visible here
# Raises RecordNotFound(hrn) -- presumably when no slice matches; the guard
# (embedded line 228) is not visible in this listing.
# NOTE(review): the guard around DeleteSliceTag (embedded line 233) and the
# return statement are not visible either.
225 (hrn, _) = urn_to_hrn(xrn)
226 slicename = hrn_to_pl_slicename(hrn)
227 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
229 raise RecordNotFound(hrn)
230 slice_id = slices[0]['slice_id']
231 slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
232 # just remove the tag if it exists
234 api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
238 def stop_slice(self, api, xrn, creds):
# Stop a slice by making sure its 'enabled' slice tag carries the value '0'.
#   xrn   -- slice identifier; converted to hrn, then slice name
#   creds -- not used in the lines visible here
# Raises RecordNotFound(hrn) -- presumably when no slice matches; the guard
# (embedded line 243) is not visible in this listing.
239 hrn, _ = urn_to_hrn(xrn)
240 slicename = hrn_to_pl_slicename(hrn)
241 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
243 raise RecordNotFound(hrn)
244 slice_id = slices[0]['slice_id']
245 slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
# The tag is added when missing (the `if` header, embedded line 246, is
# presumably `if not slice_tags:` -- not visible here); an existing tag is
# only rewritten when its value is not already "0".
247 api.driver.AddSliceTag(slice_id, 'enabled', '0')
248 elif slice_tags[0]['value'] != "0":
249 tag_id = slice_tags[0]['slice_tag_id']
250 api.driver.UpdateSliceTag(tag_id, '0')
253 def reset_slice(self, api, xrn):
# Placeholder: per the note below, resetting a slice is not implemented here.
# Unlike start_slice/stop_slice this signature takes no creds parameter.
# NOTE(review): the body (embedded line 255) is not visible in this listing.
254 # XX not implemented at this interface
257 def DeleteSliver(self, api, xrn, creds, options={}):
# Remove the slice named by xrn from its nodes.
#   xrn     -- slice identifier; converted to hrn, then slice name
#   creds   -- not used in the lines visible here
#   options -- only 'call_id' is read; a call_id that Callids() reports as
#              already handled short-circuits with ""
# NOTE(review): the handling of an empty lookup result, the binding of
# `slice`, the conditions (and any try/finally) around the peer calls and the
# return statement are not visible in this listing (embedded numbering skips
# 263-266, 269-270, 273-274 and 276) -- confirm against the full file.
# NOTE(review): `options={}` is a mutable default argument.
258 call_id = options.get('call_id')
259 if Callids().already_handled(call_id): return ""
260 (hrn, _) = urn_to_hrn(xrn)
261 slicename = hrn_to_pl_slicename(hrn)
262 slices = api.driver.GetSlices({'name': slicename})
267 # determine if this is a peer slice
268 peer = peers.get_peer(api, hrn)
# The slice is unbound from the peer, removed from all of its nodes, then
# bound to the peer again under its peer_slice_id (presumably each peer call
# is conditional on `peer` being set -- guards not visible).
271 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
272 api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
275 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
278 def ListSlices(self, api, creds, options={}):
# Build the list of slice urns for the slices whose peer_id is None.
#   creds   -- not used in the lines visible here
#   options -- only 'call_id' is read; a call_id that Callids() reports as
#              already handled short-circuits with []
# The urn list is stored in api.cache under the key 'slices' when
# self.caching and api.cache are both truthy.
# NOTE(review): what happens on a cache hit and the return statement are not
# visible in this listing (embedded numbering skips 284-287, 291-292, 295+).
# NOTE(review): `options={}` is a mutable default argument.
279 call_id = options.get('call_id')
280 if Callids().already_handled(call_id): return []
281 # look in cache first
282 if self.caching and api.cache:
283 slices = api.cache.get('slices')
288 slices = api.driver.GetSlices({'peer_id': None}, ['name'])
# Slice name -> hrn under this api's hrn -> urn of type 'slice'.
289 slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
290 slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
293 if self.caching and api.cache:
294 api.cache.add('slices', slice_urns)
298 def ListResources(self, api, creds, options={}):
# Produce an rspec through Aggregate.get_rspec(), for one slice when
# options['geni_slice_urn'] is given.
#   creds   -- not used in the lines visible here
#   options -- keys read: 'call_id', 'geni_slice_urn', 'cached' (default
#              True), 'geni_rspec_version', 'info'; the whole dict is also
#              forwarded to get_rspec()
# The cache is consulted and filled only when no slice urn was given; the
# cache key is "rspec_<version>" plus "_<info>" when 'info' is set.
# NOTE(review): the handling of a cache hit and the return statements are not
# visible in this listing (embedded numbering has gaps after 317, 319, 329).
# NOTE(review): `options={}` is a mutable default argument.
299 call_id = options.get('call_id')
300 if Callids().already_handled(call_id): return ""
301 # get slice's hrn from options
302 xrn = options.get('geni_slice_urn', None)
303 cached = options.get('cached', True)
# NOTE(review): urn_to_hrn() is called even when xrn is None; its behaviour
# for None is not visible here -- confirm.
304 (hrn, _) = urn_to_hrn(xrn)
306 version_manager = VersionManager()
307 # get the rspec's return format from options
308 rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
309 version_string = "rspec_%s" % (rspec_version)
311 #panos adding the info option to the caching key (can be improved)
# The 'default' fallback below can never apply: this branch only runs when
# options.get('info') is truthy.
312 if options.get('info'):
313 version_string = version_string + "_"+options.get('info', 'default')
315 # look in cache first
316 if self.caching and api.cache and not xrn and cached:
317 rspec = api.cache.get(version_string)
319 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
322 #panos: passing user-defined options
323 #print "manager options = ",options
# NOTE(review): reads self.driver, whereas most other methods use api.driver.
324 aggregate = Aggregate(self.driver)
325 rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version, options=options)
# Unlike the lookup above, storing does not depend on the 'cached' option.
328 if self.caching and api.cache and not xrn:
329 api.cache.add(version_string, rspec)
334 def GetTicket(self, api, xrn, creds, rspec, users, options={}):
# Build an SfaTicket for the slice named by xrn and return it serialized with
# save_to_string(save_parents=True).
#   xrn     -- slice identifier; converted to slice_hrn and resolved through
#              the registry
#   creds   -- not used in the lines visible here
#   rspec   -- request rspec (see the rspec_string note below)
#   users   -- passed to verify_persons()
#   options -- not used in the lines visible here
# Raises RecordNotFound(slice_hrn) and SliverDoesNotExist(slice_hrn); the
# guards for both are not visible in this listing.
# NOTE(review): several names used below are not bound anywhere in the lines
# visible here: SliceRecord (not among the visible imports; already flagged
# by the E0602 comment), rspec_string, hrn, slice_record, slices, initscripts
# and data. Some may be bound on lines missing from this listing (the
# embedded numbering has gaps); rspec_string/hrn/slice_record look like
# leftovers from CreateSliver -- confirm against the full file.
# NOTE(review): `options={}` is a mutable default argument.
336 (slice_hrn, _) = urn_to_hrn(xrn)
338 peer = slices.get_peer(slice_hrn)
339 sfa_peer = slices.get_sfa_peer(slice_hrn)
341 # get the slice record
# Resolve xrn through the registry registered under this api's own hrn.
342 credential = api.getCredential()
343 interface = api.registries[api.hrn]
344 registry = api.server_proxy(interface, credential)
345 records = registry.Resolve(xrn, credential)
347 # make sure we get a local slice record
# A record qualifies when its type is 'slice' and peer_authority is falsy.
349 for tmp_record in records:
350 if tmp_record['type'] == 'slice' and \
351 not tmp_record['peer_authority']:
352 #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
353 record = SliceRecord(dict=tmp_record)
355 raise RecordNotFound(slice_hrn)
357 # similar to CreateSliver, we must verify that the required records exist
358 # at this aggregate before we can issue a ticket
# The `rspec` parameter is rebound here to the parsed RSpec object.
360 rspec = RSpec(rspec_string)
361 requested_attributes = rspec.version.get_slice_attributes()
363 # ensure site record exists
364 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
365 # ensure slice record exists
366 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
367 # ensure person records exist
368 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
369 # ensure slice attributes exist
370 slices.verify_slice_attributes(slice, requested_attributes)
373 slivers = slices.get_slivers(slice_hrn)
376 raise SliverDoesNotExist(slice_hrn)
# Entries of the ticket attribute dict (presumably `data`, passed to
# set_attributes() below; its opening line is not visible).
381 'timestamp': int(time.time()),
382 'initscripts': initscripts,
# The ticket's subject and public key are taken from the slice record's gid;
# the caller gid comes from api.auth.client_gid, and the issuer is set from
# api.key and api.hrn.
387 object_gid = record.get_gid_object()
388 new_ticket = SfaTicket(subject = object_gid.get_subject())
389 new_ticket.set_gid_caller(api.auth.client_gid)
390 new_ticket.set_gid_object(object_gid)
391 new_ticket.set_issuer(key=api.key, subject=api.hrn)
392 new_ticket.set_pubkey(object_gid.get_pubkey())
393 new_ticket.set_attributes(data)
394 new_ticket.set_rspec(rspec)
395 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
399 return new_ticket.save_to_string(save_parents=True)