5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
7 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
8 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
15 from sfa.rspecs.version_manager import VersionManager
16 from sfa.rspecs.rspec import RSpec
18 from sfa.server.sfaapi import SfaApi
20 import sfa.plc.peers as peers
21 from sfa.plc.aggregate import Aggregate
22 from sfa.plc.slices import Slices
24 class AggregateManager:
27 # xxx Thierry : caching at the aggregate level sounds wrong...
31 def GetVersion(self, api, options={}):
33 version_manager = VersionManager()
34 ad_rspec_versions = []
35 request_rspec_versions = []
36 for rspec_version in version_manager.versions:
37 if rspec_version.content_type in ['*', 'ad']:
38 ad_rspec_versions.append(rspec_version.to_dict())
39 if rspec_version.content_type in ['*', 'request']:
40 request_rspec_versions.append(rspec_version.to_dict())
42 version_more = {'interface':'aggregate',
44 'geni_api': api.config.SFA_AGGREGATE_API_VERSION,
47 'geni_request_rspec_versions': request_rspec_versions,
48 'geni_ad_rspec_versions': ad_rspec_versions,
50 return version_core(version_more)
52 def _get_registry_objects(self, slice_xrn, creds, users):
56 hrn, _ = urn_to_hrn(slice_xrn)
58 hrn_auth = get_authority(hrn)
60 # Build up objects that an SFA registry would return if SFA
61 # could contact the slice's registry directly
65 # dont allow special characters in the site login base
66 #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
67 #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
68 slicename = hrn_to_pl_slicename(hrn)
69 login_base = slicename.split('_')[0]
73 site['name'] = 'geni.%s' % login_base
74 site['enabled'] = True
75 site['max_slices'] = 100
78 # Is it okay if this login base is the same as one already at this myplc site?
79 # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps?
80 site['login_base'] = login_base
81 site['abbreviated_name'] = login_base
82 site['max_slivers'] = 1000
83 reg_objects['site'] = site
87 # get_expiration always returns a normalized datetime - no need to utcparse
88 extime = Credential(string=creds[0]).get_expiration()
89 # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
90 if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
91 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
92 slice['expires'] = int(time.mktime(extime.timetuple()))
94 slice['name'] = hrn_to_pl_slicename(hrn)
96 slice['description'] = hrn
98 reg_objects['slice_record'] = slice
100 reg_objects['users'] = {}
103 hrn, _ = urn_to_hrn(user['urn'])
104 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
105 user['first_name'] = hrn
106 user['last_name'] = hrn
107 reg_objects['users'][user['email']] = user
111 def SliverStatus(self, api, slice_xrn, creds, options={}):
112 call_id = options.get('call_id')
113 if Callids().already_handled(call_id): return {}
115 (hrn, _) = urn_to_hrn(slice_xrn)
116 # find out where this slice is currently running
117 slicename = hrn_to_pl_slicename(hrn)
119 slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
121 raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
124 # report about the local nodes only
125 nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
126 ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
127 site_ids = [node['site_id'] for node in nodes]
130 top_level_status = 'unknown'
132 top_level_status = 'ready'
133 slice_urn = Xrn(slice_xrn, 'slice').get_urn()
134 result['geni_urn'] = slice_urn
135 result['pl_login'] = slice['name']
136 result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
141 res['pl_hostname'] = node['hostname']
142 res['pl_boot_state'] = node['boot_state']
143 res['pl_last_contact'] = node['last_contact']
144 if node['last_contact'] is not None:
145 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
146 sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id'])
147 res['geni_urn'] = sliver_id
148 if node['boot_state'] == 'boot':
149 res['geni_status'] = 'ready'
151 res['geni_status'] = 'failed'
152 top_level_status = 'failed'
154 res['geni_error'] = ''
156 resources.append(res)
158 result['geni_status'] = top_level_status
159 result['geni_resources'] = resources
162 def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options={}):
164 Create the sliver[s] (slice) at this aggregate.
165 Verify HRN and initialize the slice record in PLC if necessary.
167 call_id = options.get('call_id')
168 if Callids().already_handled(call_id): return ""
170 aggregate = Aggregate(api)
172 (hrn, _) = urn_to_hrn(slice_xrn)
173 peer = slices.get_peer(hrn)
174 sfa_peer = slices.get_sfa_peer(hrn)
177 slice_record = users[0].get('slice_record', {})
180 rspec = RSpec(rspec_string)
181 requested_attributes = rspec.version.get_slice_attributes()
183 # ensure site record exists
184 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
185 # ensure slice record exists
186 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
187 # ensure person records exists
188 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
189 # ensure slice attributes exists
190 slices.verify_slice_attributes(slice, requested_attributes)
192 # add/remove slice from nodes
193 requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
194 slices.verify_slice_nodes(slice, requested_slivers, peer)
196 # add/remove links links
197 slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
199 # handle MyPLC peer association.
200 # only used by plc and ple.
201 slices.handle_peer(site, slice, persons, peer)
203 return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
206 def RenewSliver(self, api, xrn, creds, expiration_time, options={}):
207 call_id = options.get('call_id')
208 if Callids().already_handled(call_id): return True
209 (hrn, _) = urn_to_hrn(xrn)
210 slicename = hrn_to_pl_slicename(hrn)
211 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
213 raise RecordNotFound(hrn)
215 requested_time = utcparse(expiration_time)
216 record = {'expires': int(time.mktime(requested_time.timetuple()))}
218 api.driver.UpdateSlice(slice['slice_id'], record)
223 def start_slice(self, api, xrn, creds):
224 (hrn, _) = urn_to_hrn(xrn)
225 slicename = hrn_to_pl_slicename(hrn)
226 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
228 raise RecordNotFound(hrn)
229 slice_id = slices[0]['slice_id']
230 slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
231 # just remove the tag if it exists
233 api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
237 def stop_slice(self, api, xrn, creds):
238 hrn, _ = urn_to_hrn(xrn)
239 slicename = hrn_to_pl_slicename(hrn)
240 slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
242 raise RecordNotFound(hrn)
243 slice_id = slices[0]['slice_id']
244 slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
246 api.driver.AddSliceTag(slice_id, 'enabled', '0')
247 elif slice_tags[0]['value'] != "0":
248 tag_id = slice_tags[0]['slice_tag_id']
249 api.driver.UpdateSliceTag(tag_id, '0')
252 def reset_slice(self, api, xrn):
253 # XX not implemented at this interface
256 def DeleteSliver(self, api, xrn, creds, options={}):
257 call_id = options.get('call_id')
258 if Callids().already_handled(call_id): return ""
259 (hrn, _) = urn_to_hrn(xrn)
260 slicename = hrn_to_pl_slicename(hrn)
261 slices = api.driver.GetSlices({'name': slicename})
266 # determine if this is a peer slice
267 peer = peers.get_peer(api, hrn)
270 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
271 api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
274 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
277 def ListSlices(self, api, creds, options={}):
278 call_id = options.get('call_id')
279 if Callids().already_handled(call_id): return []
280 # look in cache first
281 if self.caching and api.cache:
282 slices = api.cache.get('slices')
287 slices = api.driver.GetSlices({'peer_id': None}, ['name'])
288 slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
289 slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
292 if self.caching and api.cache:
293 api.cache.add('slices', slice_urns)
297 def ListResources(self, api, creds, options={}):
298 call_id = options.get('call_id')
299 if Callids().already_handled(call_id): return ""
300 # get slice's hrn from options
301 xrn = options.get('geni_slice_urn', None)
302 cached = options.get('cached', True)
303 (hrn, _) = urn_to_hrn(xrn)
305 version_manager = VersionManager()
306 # get the rspec's return format from options
307 rspec_version = version_manager.get_version(options.get('rspec_version'))
308 version_string = "rspec_%s" % (rspec_version.to_string())
310 #panos adding the info option to the caching key (can be improved)
311 if options.get('info'):
312 version_string = version_string + "_"+options.get('info', 'default')
314 # look in cache first
315 if self.caching and api.cache and not xrn and cached:
316 rspec = api.cache.get(version_string)
318 api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
321 #panos: passing user-defined options
322 #print "manager options = ",options
323 aggregate = Aggregate(api)
324 rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version, options=options)
327 if self.caching and api.cache and not xrn:
328 api.cache.add(version_string, rspec)
333 def GetTicket(self, api, xrn, creds, rspec, users, options={}):
335 (slice_hrn, _) = urn_to_hrn(xrn)
337 peer = slices.get_peer(slice_hrn)
338 sfa_peer = slices.get_sfa_peer(slice_hrn)
340 # get the slice record
341 credential = api.getCredential()
342 interface = api.registries[api.hrn]
343 registry = api.server_proxy(interface, credential)
344 records = registry.Resolve(xrn, credential)
346 # make sure we get a local slice record
348 for tmp_record in records:
349 if tmp_record['type'] == 'slice' and \
350 not tmp_record['peer_authority']:
351 #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
352 record = SliceRecord(dict=tmp_record)
354 raise RecordNotFound(slice_hrn)
356 # similar to CreateSliver, we must verify that the required records exist
357 # at this aggregate before we can issue a ticket
359 rspec = RSpec(rspec_string)
360 requested_attributes = rspec.version.get_slice_attributes()
362 # ensure site record exists
363 site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
364 # ensure slice record exists
365 slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
366 # ensure person records exists
367 persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
368 # ensure slice attributes exists
369 slices.verify_slice_attributes(slice, requested_attributes)
372 slivers = slices.get_slivers(slice_hrn)
375 raise SliverDoesNotExist(slice_hrn)
380 'timestamp': int(time.time()),
381 'initscripts': initscripts,
386 object_gid = record.get_gid_object()
387 new_ticket = SfaTicket(subject = object_gid.get_subject())
388 new_ticket.set_gid_caller(api.auth.client_gid)
389 new_ticket.set_gid_object(object_gid)
390 new_ticket.set_issuer(key=api.key, subject=api.hrn)
391 new_ticket.set_pubkey(object_gid.get_pubkey())
392 new_ticket.set_attributes(data)
393 new_ticket.set_rspec(rspec)
394 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
398 return new_ticket.save_to_string(save_parents=True)