923b94f151f5b6e8c6b9dbb3a0d11e1136f2d200
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
6 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
7 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
8 #from sfa.util.record import *
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
12
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
15 from sfa.rspecs.version_manager import VersionManager
16 from sfa.rspecs.rspec import RSpec
17
18 import sfa.plc.peers as peers
19 from sfa.plc.api import SfaAPI
20 from sfa.plc.aggregate import Aggregate
21 from sfa.plc.slices import Slices
22 #unused?
23 #from sfa.plc.network import *
24 #from sfa.plc.slices import *
25
26 def GetVersion(api):
27
28     version_manager = VersionManager()
29     ad_rspec_versions = []
30     request_rspec_versions = []
31     for rspec_version in version_manager.versions:
32         if rspec_version.content_type in ['*', 'ad']:
33             ad_rspec_versions.append(rspec_version.to_dict())
34         if rspec_version.content_type in ['*', 'request']:
35             request_rspec_versions.append(rspec_version.to_dict()) 
36     default_rspec_version = version_manager.get_version("sfa 1").to_dict()
37     xrn=Xrn(api.hrn)
38     version_more = {'interface':'aggregate',
39                     'testbed':'myplc',
40                     'hrn':xrn.get_hrn(),
41                     'request_rspec_versions': request_rspec_versions,
42                     'ad_rspec_versions': ad_rspec_versions,
43                     'default_ad_rspec': default_rspec_version
44                     }
45     return version_core(version_more)
46
47 def __get_registry_objects(slice_xrn, creds, users):
48     """
49
50     """
51     hrn, _ = urn_to_hrn(slice_xrn)
52
53     hrn_auth = get_authority(hrn)
54
55     # Build up objects that an SFA registry would return if SFA
56     # could contact the slice's registry directly
57     reg_objects = None
58
59     if users:
60         # dont allow special characters in the site login base
61         #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
62         #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
63         slicename = hrn_to_pl_slicename(hrn)
64         login_base = slicename.split('_')[0]
65         reg_objects = {}
66         site = {}
67         site['site_id'] = 0
68         site['name'] = 'geni.%s' % login_base 
69         site['enabled'] = True
70         site['max_slices'] = 100
71
72         # Note:
73         # Is it okay if this login base is the same as one already at this myplc site?
74         # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
75         site['login_base'] = login_base
76         site['abbreviated_name'] = login_base
77         site['max_slivers'] = 1000
78         reg_objects['site'] = site
79
80         slice = {}
81         
82         # get_expiration always returns a normalized datetime - no need to utcparse
83         extime = Credential(string=creds[0]).get_expiration()
84         # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
85         if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
86             extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
87         slice['expires'] = int(time.mktime(extime.timetuple()))
88         slice['hrn'] = hrn
89         slice['name'] = hrn_to_pl_slicename(hrn)
90         slice['url'] = hrn
91         slice['description'] = hrn
92         slice['pointer'] = 0
93         reg_objects['slice_record'] = slice
94
95         reg_objects['users'] = {}
96         for user in users:
97             user['key_ids'] = []
98             hrn, _ = urn_to_hrn(user['urn'])
99             user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
100             user['first_name'] = hrn
101             user['last_name'] = hrn
102             reg_objects['users'][user['email']] = user
103
104         return reg_objects
105
106 def __get_hostnames(nodes):
107     hostnames = []
108     for node in nodes:
109         hostnames.append(node.hostname)
110     return hostnames
111
112 def SliverStatus(api, slice_xrn, creds, call_id):
113     if Callids().already_handled(call_id): return {}
114
115     (hrn, _) = urn_to_hrn(slice_xrn)
116     # find out where this slice is currently running
117     slicename = hrn_to_pl_slicename(hrn)
118     
119     slices = api.plshell.GetSlices(api.plauth, [slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
120     if len(slices) == 0:        
121         raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
122     slice = slices[0]
123     
124     # report about the local nodes only
125     nodes = api.plshell.GetNodes(api.plauth, {'node_id':slice['node_ids'],'peer_id':None},
126                                  ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
127     site_ids = [node['site_id'] for node in nodes]
128
129     result = {}
130     top_level_status = 'unknown'
131     if nodes:
132         top_level_status = 'ready'
133     slice_urn = Xrn(slice_xrn, 'slice').get_urn()
134     result['geni_urn'] = slice_urn
135     result['pl_login'] = slice['name']
136     result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
137     
138     resources = []
139     for node in nodes:
140         res = {}
141         res['pl_hostname'] = node['hostname']
142         res['pl_boot_state'] = node['boot_state']
143         res['pl_last_contact'] = node['last_contact']
144         if node['last_contact'] is not None:
145             res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
146         sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
147         res['geni_urn'] = sliver_id
148         if node['boot_state'] == 'boot':
149             res['geni_status'] = 'ready'
150         else:
151             res['geni_status'] = 'failed'
152             top_level_status = 'failed' 
153             
154         res['geni_error'] = ''
155
156         resources.append(res)
157         
158     result['geni_status'] = top_level_status
159     result['geni_resources'] = resources
160     return result
161
162 def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id):
163     """
164     Create the sliver[s] (slice) at this aggregate.    
165     Verify HRN and initialize the slice record in PLC if necessary.
166     """
167     if Callids().already_handled(call_id): return ""
168
169     aggregate = Aggregate(api)
170     slices = Slices(api)
171     (hrn, _) = urn_to_hrn(slice_xrn)
172     peer = slices.get_peer(hrn)
173     sfa_peer = slices.get_sfa_peer(hrn)
174     slice_record=None    
175     if users:
176         slice_record = users[0].get('slice_record', {})
177
178     # parse rspec
179     rspec = RSpec(rspec_string)
180     requested_attributes = rspec.version.get_slice_attributes()
181     
182     # ensure site record exists
183     site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
184     # ensure slice record exists
185     slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
186     # ensure person records exists
187     persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
188     # ensure slice attributes exists
189     slices.verify_slice_attributes(slice, requested_attributes)
190     
191     # add/remove slice from nodes
192     requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()]
193     slices.verify_slice_nodes(slice, requested_slivers, peer) 
194
195     # hanlde MyPLC peer association.
196     # only used by plc and ple.
197     slices.handle_peer(site, slice, persons, peer)
198     
199     return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
200
201
202 def RenewSliver(api, xrn, creds, expiration_time, call_id):
203     if Callids().already_handled(call_id): return True
204     (hrn, _) = urn_to_hrn(xrn)
205     slicename = hrn_to_pl_slicename(hrn)
206     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
207     if not slices:
208         raise RecordNotFound(hrn)
209     slice = slices[0]
210     requested_time = utcparse(expiration_time)
211     record = {'expires': int(time.mktime(requested_time.timetuple()))}
212     try:
213         api.plshell.UpdateSlice(api.plauth, slice['slice_id'], record)
214         return True
215     except:
216         return False
217
218 def start_slice(api, xrn, creds):
219     (hrn, _) = urn_to_hrn(xrn)
220     slicename = hrn_to_pl_slicename(hrn)
221     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
222     if not slices:
223         raise RecordNotFound(hrn)
224     slice_id = slices[0]['slice_id']
225     slice_tags = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
226     # just remove the tag if it exists
227     if slice_tags:
228         api.plshell.DeleteSliceTag(api.plauth, slice_tags[0]['slice_tag_id'])
229
230     return 1
231  
232 def stop_slice(api, xrn, creds):
233     hrn, _ = urn_to_hrn(xrn)
234     slicename = hrn_to_pl_slicename(hrn)
235     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
236     if not slices:
237         raise RecordNotFound(hrn)
238     slice_id = slices[0]['slice_id']
239     slice_tags = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'tagname': 'enabled'})
240     if not slice_tags:
241         api.plshell.AddSliceTag(api.plauth, slice_id, 'enabled', '0')
242     elif slice_tags[0]['value'] != "0":
243         tag_id = slice_tags[0]['slice_tag_id']
244         api.plshell.UpdateSliceTag(api.plauth, tag_id, '0')
245     return 1
246
247 def reset_slice(api, xrn):
248     # XX not implemented at this interface
249     return 1
250
251 def DeleteSliver(api, xrn, creds, call_id):
252     if Callids().already_handled(call_id): return ""
253     (hrn, _) = urn_to_hrn(xrn)
254     slicename = hrn_to_pl_slicename(hrn)
255     slices = api.plshell.GetSlices(api.plauth, {'name': slicename})
256     if not slices:
257         return 1
258     slice = slices[0]
259
260     # determine if this is a peer slice
261     peer = peers.get_peer(api, hrn)
262     try:
263         if peer:
264             api.plshell.UnBindObjectFromPeer(api.plauth, 'slice', slice['slice_id'], peer)
265         api.plshell.DeleteSliceFromNodes(api.plauth, slicename, slice['node_ids'])
266     finally:
267         if peer:
268             api.plshell.BindObjectToPeer(api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
269     return 1
270
271 # xxx Thierry : caching at the aggregate level sounds wrong...
272 #caching=True
273 caching=False
274 def ListSlices(api, creds, call_id):
275     if Callids().already_handled(call_id): return []
276     # look in cache first
277     if caching and api.cache:
278         slices = api.cache.get('slices')
279         if slices:
280             return slices
281
282     # get data from db 
283     slices = api.plshell.GetSlices(api.plauth, {'peer_id': None}, ['name'])
284     slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
285     slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
286
287     # cache the result
288     if caching and api.cache:
289         api.cache.add('slices', slice_urns) 
290
291     return slice_urns
292     
293 def ListResources(api, creds, options, call_id):
294     if Callids().already_handled(call_id): return ""
295     # get slice's hrn from options
296     xrn = options.get('geni_slice_urn', None)
297     (hrn, _) = urn_to_hrn(xrn)
298
299     version_manager = VersionManager()
300     # get the rspec's return format from options
301     rspec_version = version_manager.get_version(options.get('rspec_version'))
302     version_string = "rspec_%s" % (rspec_version.to_string())
303
304     #panos adding the info option to the caching key (can be improved)
305     if options.get('info'):
306         version_string = version_string + "_"+options.get('info', 'default')
307
308     # look in cache first
309     if caching and api.cache and not xrn:
310         rspec = api.cache.get(version_string)
311         if rspec:
312             api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
313             return rspec 
314
315     #panos: passing user-defined options
316     #print "manager options = ",options
317     aggregate = Aggregate(api, options)
318     rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
319
320     # cache the result
321     if caching and api.cache and not xrn:
322         api.cache.add(version_string, rspec)
323
324     return rspec
325
326
327 def get_ticket(api, xrn, creds, rspec, users):
328
329 #unused
330 #    reg_objects = __get_registry_objects(xrn, creds, users)
331
332     (slice_hrn, _) = urn_to_hrn(xrn)
333 #unused
334 #    slices = Slices(api)
335 #    peer = slices.get_peer(slice_hrn)
336 #    sfa_peer = slices.get_sfa_peer(slice_hrn)
337
338     # get the slice record
339     registry = api.registries[api.hrn]
340     credential = api.getCredential()
341     records = registry.Resolve(xrn, credential)
342
343     # similar to CreateSliver, we must verify that the required records exist
344     # at this aggregate before we can issue a ticket
345 #Error (E1121, get_ticket): Too many positional arguments for function call
346 #unused anyway
347 #    site_id, remote_site_id = slices.verify_site(registry, credential, slice_hrn,
348 #                                                 peer, sfa_peer, reg_objects)
349 #Error (E1121, get_ticket): Too many positional arguments for function call
350 #unused anyway
351 #    slice = slices.verify_slice(registry, credential, slice_hrn, site_id,
352 #                                remote_site_id, peer, sfa_peer, reg_objects)
353
354     # make sure we get a local slice record
355     record = None
356     for tmp_record in records:
357         if tmp_record['type'] == 'slice' and \
358            not tmp_record['peer_authority']:
359 #Error (E0602, get_ticket): Undefined variable 'SliceRecord'
360             record = SliceRecord(dict=tmp_record)
361     if not record:
362         raise RecordNotFound(slice_hrn)
363
364     # get sliver info
365     slivers = Slices(api).get_slivers(slice_hrn)
366     if not slivers:
367         raise SliverDoesNotExist(slice_hrn)
368
369     # get initscripts
370     initscripts = []
371     data = {
372         'timestamp': int(time.time()),
373         'initscripts': initscripts,
374         'slivers': slivers
375     }
376
377     # create the ticket
378     object_gid = record.get_gid_object()
379     new_ticket = SfaTicket(subject = object_gid.get_subject())
380     new_ticket.set_gid_caller(api.auth.client_gid)
381     new_ticket.set_gid_object(object_gid)
382     new_ticket.set_issuer(key=api.key, subject=api.hrn)
383     new_ticket.set_pubkey(object_gid.get_pubkey())
384     new_ticket.set_attributes(data)
385     new_ticket.set_rspec(rspec)
386     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
387     new_ticket.encode()
388     new_ticket.sign()
389
390     return new_ticket.save_to_string(save_parents=True)
391
392
393
394 def main():
395     """
396     rspec = ListResources(api, "plc.princeton.sapan", None, 'pl_test_sapan')
397     #rspec = ListResources(api, "plc.princeton.coblitz", None, 'pl_test_coblitz')
398     #rspec = ListResources(api, "plc.pl.sirius", None, 'pl_test_sirius')
399     print rspec
400     """
401     api = SfaAPI()
402     f = open(sys.argv[1])
403     xml = f.read()
404     f.close()
405 #Error (E1120, main): No value passed for parameter 'users' in function call
406 #Error (E1120, main): No value passed for parameter 'call_id' in function call
407     CreateSliver(api, "plc.princeton.sapan", xml, 'CreateSliver_sapan')
408
409 if __name__ == "__main__":
410     main()