trimmed useless imports, unstarred all imports
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
6 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
7 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
8 from sfa.util.version import version_core
9 from sfa.util.sfatime import utcparse
10 from sfa.util.callids import Callids
11
12 from sfa.trust.sfaticket import SfaTicket
13 from sfa.trust.credential import Credential
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
16
17 import sfa.plc.peers as peers
18 from sfa.plc.api import SfaAPI
19 from sfa.plc.aggregate import Aggregate
20 from sfa.plc.slices import Slices
21
22 def GetVersion(api):
23
24     version_manager = VersionManager()
25     ad_rspec_versions = []
26     request_rspec_versions = []
27     for rspec_version in version_manager.versions:
28         if rspec_version.content_type in ['*', 'ad']:
29             ad_rspec_versions.append(rspec_version.to_dict())
30         if rspec_version.content_type in ['*', 'request']:
31             request_rspec_versions.append(rspec_version.to_dict()) 
32     default_rspec_version = version_manager.get_version("sfa 1").to_dict()
33     xrn=Xrn(api.hrn)
34     version_more = {'interface':'aggregate',
35                     'testbed':'myplc',
36                     'hrn':xrn.get_hrn(),
37                     'request_rspec_versions': request_rspec_versions,
38                     'ad_rspec_versions': ad_rspec_versions,
39                     'default_ad_rspec': default_rspec_version
40                     }
41     return version_core(version_more)
42
43 def __get_registry_objects(slice_xrn, creds, users):
44     """
45
46     """
47     hrn, _ = urn_to_hrn(slice_xrn)
48
49     hrn_auth = get_authority(hrn)
50
51     # Build up objects that an SFA registry would return if SFA
52     # could contact the slice's registry directly
53     reg_objects = None
54
55     if users:
56         # dont allow special characters in the site login base
57         #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
58         #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
59         slicename = hrn_to_pl_slicename(hrn)
60         login_base = slicename.split('_')[0]
61         reg_objects = {}
62         site = {}
63         site['site_id'] = 0
64         site['name'] = 'geni.%s' % login_base 
65         site['enabled'] = True
66         site['max_slices'] = 100
67
68         # Note:
69         # Is it okay if this login base is the same as one already at this myplc site?
70         # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
71         site['login_base'] = login_base
72         site['abbreviated_name'] = login_base
73         site['max_slivers'] = 1000
74         reg_objects['site'] = site
75
76         slice = {}
77         
78         # get_expiration always returns a normalized datetime - no need to utcparse
79         extime = Credential(string=creds[0]).get_expiration()
80         # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
81         if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
82             extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
83         slice['expires'] = int(time.mktime(extime.timetuple()))
84         slice['hrn'] = hrn
85         slice['name'] = hrn_to_pl_slicename(hrn)
86         slice['url'] = hrn
87         slice['description'] = hrn
88         slice['pointer'] = 0
89         reg_objects['slice_record'] = slice
90
91         reg_objects['users'] = {}
92         for user in users:
93             user['key_ids'] = []
94             hrn, _ = urn_to_hrn(user['urn'])
95             user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
96             user['first_name'] = hrn
97             user['last_name'] = hrn
98             reg_objects['users'][user['email']] = user
99
100         return reg_objects
101
102 def __get_hostnames(nodes):
103     hostnames = []
104     for node in nodes:
105         hostnames.append(node.hostname)
106     return hostnames
107
108 def SliverStatus(api, slice_xrn, creds, call_id):
109     if Callids().already_handled(call_id): return {}
110
111     (hrn, _) = urn_to_hrn(slice_xrn)
112     # find out where this slice is currently running
113     slicename = hrn_to_pl_slicename(hrn)
114     
115     slices = api.plshell.GetSlices(api.plauth, [slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
116     if len(slices) == 0:        
117         raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
118     slice = slices[0]
119     
120     # report about the local nodes only
121     nodes = api.plshell.GetNodes(api.plauth, {'node_id':slice['node_ids'],'peer_id':None},
122                                  ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
123     site_ids = [node['site_id'] for node in nodes]
124
125     result = {}
126     top_level_status = 'unknown'
127     if nodes:
128         top_level_status = 'ready'
129     slice_urn = Xrn(slice_xrn, 'slice').get_urn()
130     result['geni_urn'] = slice_urn
131     result['pl_login'] = slice['name']
132     result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
133     
134     resources = []
135     for node in nodes:
136         res = {}
137         res['pl_hostname'] = node['hostname']
138         res['pl_boot_state'] = node['boot_state']
139         res['pl_last_contact'] = node['last_contact']
140         if node['last_contact'] is not None:
141             res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
142         sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
143         res['geni_urn'] = sliver_id
144         if node['boot_state'] == 'boot':
145             res['geni_status'] = 'ready'
146         else:
147             res['geni_status'] = 'failed'
148             top_level_status = 'failed' 
149             
150         res['geni_error'] = ''
151
152         resources.append(res)
153         
154     result['geni_status'] = top_level_status
155     result['geni_resources'] = resources
156     return result
157
158 def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id):
159     """
160     Create the sliver[s] (slice) at this aggregate.    
161     Verify HRN and initialize the slice record in PLC if necessary.
162     """
163     if Callids().already_handled(call_id): return ""
164
165     aggregate = Aggregate(api)
166     slices = Slices(api)
167     (hrn, _) = urn_to_hrn(slice_xrn)
168     peer = slices.get_peer(hrn)
169     sfa_peer = slices.get_sfa_peer(hrn)
170     slice_record=None    
171     if users:
172         slice_record = users[0].get('slice_record', {})
173
174     # parse rspec
175     rspec = RSpec(rspec_string)
176     requested_attributes = rspec.version.get_slice_attributes()
177     
178     # ensure site record exists
179     site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
180     # ensure slice record exists
181     slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
182     # ensure person records exists
183     persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
184     # ensure slice attributes exists
185     slices.verify_slice_attributes(slice, requested_attributes)
186     
187     # add/remove slice from nodes
188     requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()]
189     slices.verify_slice_nodes(slice, requested_slivers, peer) 
190
191     # hanlde MyPLC peer association.
192     # only used by plc and ple.
193     slices.handle_peer(site, slice, persons, peer)
194     
195     return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
196
197
198 def RenewSliver(api, xrn, creds, expiration_time, call_id):
199     if Callids().already_handled(call_id): return True
200     (hrn, _) = urn_to_hrn(xrn)
201     slicename = hrn_to_pl_slicename(hrn)
202     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
203     if not slices:
204         raise RecordNotFound(hrn)
205     slice = slices[0]
206     requested_time = utcparse(expiration_time)
207     record = {'expires': int(time.mktime(requested_time.timetuple()))}
208     try:
209         api.plshell.UpdateSlice(api.plauth, slice['slice_id'], record)
210         return True
211     except:
212         return False
213
214 def start_slice(api, xrn, creds):
215     (hrn, _) = urn_to_hrn(xrn)
216     slicename = hrn_to_pl_slicename(hrn)
217     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
218     if not slices:
219         raise RecordNotFound(hrn)
220     slice_id = slices[0]['slice_id']
221     slice_tags = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
222     # just remove the tag if it exists
223     if slice_tags:
224         api.plshell.DeleteSliceTag(api.plauth, slice_tags[0]['slice_tag_id'])
225
226     return 1
227  
228 def stop_slice(api, xrn, creds):
229     hrn, _ = urn_to_hrn(xrn)
230     slicename = hrn_to_pl_slicename(hrn)
231     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
232     if not slices:
233         raise RecordNotFound(hrn)
234     slice_id = slices[0]['slice_id']
235     slice_tags = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'tagname': 'enabled'})
236     if not slice_tags:
237         api.plshell.AddSliceTag(api.plauth, slice_id, 'enabled', '0')
238     elif slice_tags[0]['value'] != "0":
239         tag_id = slice_tags[0]['slice_tag_id']
240         api.plshell.UpdateSliceTag(api.plauth, tag_id, '0')
241     return 1
242
243 def reset_slice(api, xrn):
244     # XX not implemented at this interface
245     return 1
246
247 def DeleteSliver(api, xrn, creds, call_id):
248     if Callids().already_handled(call_id): return ""
249     (hrn, _) = urn_to_hrn(xrn)
250     slicename = hrn_to_pl_slicename(hrn)
251     slices = api.plshell.GetSlices(api.plauth, {'name': slicename})
252     if not slices:
253         return 1
254     slice = slices[0]
255
256     # determine if this is a peer slice
257     peer = peers.get_peer(api, hrn)
258     try:
259         if peer:
260             api.plshell.UnBindObjectFromPeer(api.plauth, 'slice', slice['slice_id'], peer)
261         api.plshell.DeleteSliceFromNodes(api.plauth, slicename, slice['node_ids'])
262     finally:
263         if peer:
264             api.plshell.BindObjectToPeer(api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
265     return 1
266
267 # xxx Thierry : caching at the aggregate level sounds wrong...
268 #caching=True
269 caching=False
270 def ListSlices(api, creds, call_id):
271     if Callids().already_handled(call_id): return []
272     # look in cache first
273     if caching and api.cache:
274         slices = api.cache.get('slices')
275         if slices:
276             return slices
277
278     # get data from db 
279     slices = api.plshell.GetSlices(api.plauth, {'peer_id': None}, ['name'])
280     slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
281     slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
282
283     # cache the result
284     if caching and api.cache:
285         api.cache.add('slices', slice_urns) 
286
287     return slice_urns
288     
289 def ListResources(api, creds, options, call_id):
290     if Callids().already_handled(call_id): return ""
291     # get slice's hrn from options
292     xrn = options.get('geni_slice_urn', None)
293     (hrn, _) = urn_to_hrn(xrn)
294
295     version_manager = VersionManager()
296     # get the rspec's return format from options
297     rspec_version = version_manager.get_version(options.get('rspec_version'))
298     version_string = "rspec_%s" % (rspec_version.to_string())
299
300     #panos adding the info option to the caching key (can be improved)
301     if options.get('info'):
302         version_string = version_string + "_"+options.get('info', 'default')
303
304     # look in cache first
305     if caching and api.cache and not xrn:
306         rspec = api.cache.get(version_string)
307         if rspec:
308             api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
309             return rspec 
310
311     #panos: passing user-defined options
312     #print "manager options = ",options
313     aggregate = Aggregate(api, options)
314     rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
315
316     # cache the result
317     if caching and api.cache and not xrn:
318         api.cache.add(version_string, rspec)
319
320     return rspec
321
322
323 def get_ticket(api, xrn, creds, rspec, users):
324
325 #unused
326 #    reg_objects = __get_registry_objects(xrn, creds, users)
327
328     (slice_hrn, _) = urn_to_hrn(xrn)
329 #unused
330 #    slices = Slices(api)
331 #    peer = slices.get_peer(slice_hrn)
332 #    sfa_peer = slices.get_sfa_peer(slice_hrn)
333
334     # get the slice record
335     registry = api.registries[api.hrn]
336     credential = api.getCredential()
337     records = registry.Resolve(xrn, credential)
338
339     # similar to CreateSliver, we must verify that the required records exist
340     # at this aggregate before we can issue a ticket
341 #Error (E1121, get_ticket): Too many positional arguments for function call
342 #unused anyway
343 #    site_id, remote_site_id = slices.verify_site(registry, credential, slice_hrn,
344 #                                                 peer, sfa_peer, reg_objects)
345 #Error (E1121, get_ticket): Too many positional arguments for function call
346 #unused anyway
347 #    slice = slices.verify_slice(registry, credential, slice_hrn, site_id,
348 #                                remote_site_id, peer, sfa_peer, reg_objects)
349
350     # make sure we get a local slice record
351     record = None
352     for tmp_record in records:
353         if tmp_record['type'] == 'slice' and \
354            not tmp_record['peer_authority']:
355 #Error (E0602, get_ticket): Undefined variable 'SliceRecord'
356             record = SliceRecord(dict=tmp_record)
357     if not record:
358         raise RecordNotFound(slice_hrn)
359
360     # get sliver info
361     slivers = Slices(api).get_slivers(slice_hrn)
362     if not slivers:
363         raise SliverDoesNotExist(slice_hrn)
364
365     # get initscripts
366     initscripts = []
367     data = {
368         'timestamp': int(time.time()),
369         'initscripts': initscripts,
370         'slivers': slivers
371     }
372
373     # create the ticket
374     object_gid = record.get_gid_object()
375     new_ticket = SfaTicket(subject = object_gid.get_subject())
376     new_ticket.set_gid_caller(api.auth.client_gid)
377     new_ticket.set_gid_object(object_gid)
378     new_ticket.set_issuer(key=api.key, subject=api.hrn)
379     new_ticket.set_pubkey(object_gid.get_pubkey())
380     new_ticket.set_attributes(data)
381     new_ticket.set_rspec(rspec)
382     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
383     new_ticket.encode()
384     new_ticket.sign()
385
386     return new_ticket.save_to_string(save_parents=True)
387
388
389
390 def main():
391     """
392     rspec = ListResources(api, "plc.princeton.sapan", None, 'pl_test_sapan')
393     #rspec = ListResources(api, "plc.princeton.coblitz", None, 'pl_test_coblitz')
394     #rspec = ListResources(api, "plc.pl.sirius", None, 'pl_test_sirius')
395     print rspec
396     """
397     api = SfaAPI()
398     f = open(sys.argv[1])
399     xml = f.read()
400     f.close()
401 #Error (E1120, main): No value passed for parameter 'users' in function call
402 #Error (E1120, main): No value passed for parameter 'call_id' in function call
403     CreateSliver(api, "plc.princeton.sapan", xml, 'CreateSliver_sapan')
404
405 if __name__ == "__main__":
406     main()