5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
7 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
8 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
15 from sfa.rspecs.version_manager import VersionManager
16 from sfa.rspecs.rspec import RSpec
18 from sfa.server.sfaapi import SfaApi
20 import sfa.plc.peers as peers
21 from sfa.plc.aggregate import Aggregate
22 from sfa.plc.slices import Slices
24 class AggregateManager:
27 # xxx Thierry : caching at the aggregate level sounds wrong...
31 def GetVersion(self, api, options={}):
33 version_manager = VersionManager()
34 ad_rspec_versions = []
35 request_rspec_versions = []
36 for rspec_version in version_manager.versions:
37 if rspec_version.content_type in ['*', 'ad']:
38 ad_rspec_versions.append(rspec_version.to_dict())
39 if rspec_version.content_type in ['*', 'request']:
40 request_rspec_versions.append(rspec_version.to_dict())
41 default_rspec_version = version_manager.get_version("sfa 1").to_dict()
43 version_more = {'interface':'aggregate',
46 'request_rspec_versions': request_rspec_versions,
47 'ad_rspec_versions': ad_rspec_versions,
48 'default_ad_rspec': default_rspec_version
50 return version_core(version_more)
52 def _get_registry_objects(self, slice_xrn, creds, users):
56 hrn, _ = urn_to_hrn(slice_xrn)
58 hrn_auth = get_authority(hrn)
60 # Build up objects that an SFA registry would return if SFA
61 # could contact the slice's registry directly
65 # dont allow special characters in the site login base
66 #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
67 #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
68 slicename = hrn_to_pl_slicename(hrn)
69 login_base = slicename.split('_')[0]
73 site['name'] = 'geni.%s' % login_base
74 site['enabled'] = True
75 site['max_slices'] = 100
78 # Is it okay if this login base is the same as one already at this myplc site?
79 # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps?
80 site['login_base'] = login_base
81 site['abbreviated_name'] = login_base
82 site['max_slivers'] = 1000
83 reg_objects['site'] = site
87 # get_expiration always returns a normalized datetime - no need to utcparse
88 extime = Credential(string=creds[0]).get_expiration()
89 # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
90 if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
91 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
92 slice['expires'] = int(time.mktime(extime.timetuple()))
94 slice['name'] = hrn_to_pl_slicename(hrn)
96 slice['description'] = hrn
98 reg_objects['slice_record'] = slice
100 reg_objects['users'] = {}
103 hrn, _ = urn_to_hrn(user['urn'])
104 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
105 user['first_name'] = hrn
106 user['last_name'] = hrn
107 reg_objects['users'][user['email']] = user
111 def SliverStatus(self, api, slice_xrn, creds, options={}):
112 call_id = options.get('call_id')
113 if Callids().already_handled(call_id): return {}
115 (hrn, _) = urn_to_hrn(slice_xrn)
116 # find out where this slice is currently running
117 slicename = hrn_to_pl_slicename(hrn)
119 slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
121 raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
124 # report about the local nodes only
125 nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
126 ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
127 site_ids = [node['site_id'] for node in nodes]
130 top_level_status = 'unknown'
132 top_level_status = 'ready'
133 slice_urn = Xrn(slice_xrn, 'slice').get_urn()
134 result['geni_urn'] = slice_urn
135 result['pl_login'] = slice['name']
136 result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
141 res['pl_hostname'] = node['hostname']
142 res['pl_boot_state'] = node['boot_state']
143 res['pl_last_contact'] = node['last_contact']
144 if node['last_contact'] is not None:
145 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
146 sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id'])
147 res['geni_urn'] = sliver_id
148 if node['boot_state'] == 'boot':
149 res['geni_status'] = 'ready'
151 res['geni_status'] = 'failed'
152 top_level_status = 'failed'
154 res['geni_error'] = ''
156 resources.append(res)
158 result['geni_status'] = top_level_status
159 result['geni_resources'] = resources
162 def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options={}):
164 Create the sliver[s] (slice) at this aggregate.
165 Verify HRN and initialize the slice record in PLC if necessary.
167 call_id = options.get('call_id')
168 if Callids().already_handled(call_id): return ""
170 aggregate = Aggregate(api)
172 (hrn, _) = urn_to_hrn(slice_xrn)
173 peer = slices.get_peer(hrn)
174 sfa_peer = slices.get_sfa_peer(hrn)
177 slice_record = users[0].get('slice_record', {})
180 rspec = RSpec(rspec_string)
181 requested_attributes = rspec.version.get_slice_attributes()
183 # ensure site record exists
184 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
185 # ensure slice record exists
186 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
187 # ensure person records exists
188 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
189 # ensure slice attributes exists
190 slices.verify_slice_attributes(slice, requested_attributes)
192 # add/remove slice from nodes
193 requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
194 slices.verify_slice_nodes(slice, requested_slivers, peer)
196 # add/remove links links
197 slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
199 # handle MyPLC peer association.
200 # only used by plc and ple.
201 slices.handle_peer(site, slice, persons, peer)
203 return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
206 def RenewSliver(self, api, xrn, creds, expiration_time, options={}):
207 call_id = options.get('call_id')
208 if Callids().already_handled(call_id): return True
209 (hrn, _) = urn_to_hrn(xrn)
210 slicename = hrn_to_pl_slicename(hrn)
211 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
213 raise RecordNotFound(hrn)
215 requested_time = utcparse(expiration_time)
216 record = {'expires': int(time.mktime(requested_time.timetuple()))}
218 api.driver.UpdateSlice(slice['slice_id'], record)
223 def start_slice(self, api, xrn, creds):
224 (hrn, _) = urn_to_hrn(xrn)
225 slicename = hrn_to_pl_slicename(hrn)
226 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
228 raise RecordNotFound(hrn)
229 slice_id = slices[0]['slice_id']
230 slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
231 # just remove the tag if it exists
233 api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
237 def stop_slice(self, api, xrn, creds):
238 hrn, _ = urn_to_hrn(xrn)
239 slicename = hrn_to_pl_slicename(hrn)
240 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
242 raise RecordNotFound(hrn)
243 slice_id = slices[0]['slice_id']
244 slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
246 api.driver.AddSliceTag(slice_id, 'enabled', '0')
247 elif slice_tags[0]['value'] != "0":
248 tag_id = slice_tags[0]['slice_tag_id']
249 api.driver.UpdateSliceTag(tag_id, '0')
252 def reset_slice(self, api, xrn):
253 # XX not implemented at this interface
256 def DeleteSliver(self, api, xrn, creds, options={}):
257 call_id = options.get('call_id')
258 if Callids().already_handled(call_id): return ""
259 (hrn, _) = urn_to_hrn(xrn)
260 slicename = hrn_to_pl_slicename(hrn)
261 slices = api.driver.GetSlices({'name': slicename})
266 # determine if this is a peer slice
267 peer = peers.get_peer(api, hrn)
270 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
271 api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
274 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
277 def ListSlices(self, api, creds, options={}):
278 call_id = options.get('call_id')
279 if Callids().already_handled(call_id): return []
280 # look in cache first
281 if self.caching and api.cache:
282 slices = api.cache.get('slices')
287 slices = api.driver.GetSlices({'peer_id': None}, ['name'])
288 slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
289 slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
292 if self.caching and api.cache:
293 api.cache.add('slices', slice_urns)
297 def ListResources(self, api, creds, options={}):
298 call_id = options.get('call_id')
299 if Callids().already_handled(call_id): return ""
300 # get slice's hrn from options
301 xrn = options.get('geni_slice_urn', None)
302 (hrn, _) = urn_to_hrn(xrn)
304 version_manager = VersionManager()
305 # get the rspec's return format from options
306 rspec_version = version_manager.get_version(options.get('rspec_version'))
307 version_string = "rspec_%s" % (rspec_version.to_string())
309 #panos adding the info option to the caching key (can be improved)
310 if options.get('info'):
311 version_string = version_string + "_"+options.get('info', 'default')
313 # look in cache first
314 if self.caching and api.cache and not xrn:
315 rspec = api.cache.get(version_string)
317 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
320 #panos: passing user-defined options
321 #print "manager options = ",options
322 aggregate = Aggregate(api, options)
323 rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
326 if self.caching and api.cache and not xrn:
327 api.cache.add(version_string, rspec)
332 def GetTicket(self, api, xrn, creds, rspec, users, options={}):
334 (slice_hrn, _) = urn_to_hrn(xrn)
336 peer = slices.get_peer(slice_hrn)
337 sfa_peer = slices.get_sfa_peer(slice_hrn)
339 # get the slice record
340 credential = api.getCredential()
341 interface = api.registries[api.hrn]
342 registry = api.server_proxy(interface, credential)
343 records = registry.Resolve(xrn, credential)
345 # make sure we get a local slice record
347 for tmp_record in records:
348 if tmp_record['type'] == 'slice' and \
349 not tmp_record['peer_authority']:
350 #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
351 record = SliceRecord(dict=tmp_record)
353 raise RecordNotFound(slice_hrn)
355 # similar to CreateSliver, we must verify that the required records exist
356 # at this aggregate before we can issue a ticket
358 rspec = RSpec(rspec_string)
359 requested_attributes = rspec.version.get_slice_attributes()
361 # ensure site record exists
362 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
363 # ensure slice record exists
364 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
365 # ensure person records exists
366 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
367 # ensure slice attributes exists
368 slices.verify_slice_attributes(slice, requested_attributes)
371 slivers = slices.get_slivers(slice_hrn)
374 raise SliverDoesNotExist(slice_hrn)
379 'timestamp': int(time.time()),
380 'initscripts': initscripts,
385 object_gid = record.get_gid_object()
386 new_ticket = SfaTicket(subject = object_gid.get_subject())
387 new_ticket.set_gid_caller(api.auth.client_gid)
388 new_ticket.set_gid_object(object_gid)
389 new_ticket.set_issuer(key=api.key, subject=api.hrn)
390 new_ticket.set_pubkey(object_gid.get_pubkey())
391 new_ticket.set_attributes(data)
392 new_ticket.set_rspec(rspec)
393 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
397 return new_ticket.save_to_string(save_parents=True)