another step of moving stuff around where it belongs
[sfa.git] / sfa / managers / aggregate_manager.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
6 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
7 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
8 from sfa.util.version import version_core
9 from sfa.util.sfatime import utcparse
10 from sfa.util.callids import Callids
11
12 from sfa.trust.sfaticket import SfaTicket
13 from sfa.trust.credential import Credential
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
16
17 from sfa.server.sfaapi import SfaApi
18
19 import sfa.plc.peers as peers
20 from sfa.plc.aggregate import Aggregate
21 from sfa.plc.slices import Slices
22
23 def GetVersion(api):
24
25     version_manager = VersionManager()
26     ad_rspec_versions = []
27     request_rspec_versions = []
28     for rspec_version in version_manager.versions:
29         if rspec_version.content_type in ['*', 'ad']:
30             ad_rspec_versions.append(rspec_version.to_dict())
31         if rspec_version.content_type in ['*', 'request']:
32             request_rspec_versions.append(rspec_version.to_dict()) 
33     default_rspec_version = version_manager.get_version("sfa 1").to_dict()
34     xrn=Xrn(api.hrn)
35     version_more = {'interface':'aggregate',
36                     'testbed':'myplc',
37                     'hrn':xrn.get_hrn(),
38                     'request_rspec_versions': request_rspec_versions,
39                     'ad_rspec_versions': ad_rspec_versions,
40                     'default_ad_rspec': default_rspec_version
41                     }
42     return version_core(version_more)
43
44 def __get_registry_objects(slice_xrn, creds, users):
45     """
46
47     """
48     hrn, _ = urn_to_hrn(slice_xrn)
49
50     hrn_auth = get_authority(hrn)
51
52     # Build up objects that an SFA registry would return if SFA
53     # could contact the slice's registry directly
54     reg_objects = None
55
56     if users:
57         # dont allow special characters in the site login base
58         #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
59         #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
60         slicename = hrn_to_pl_slicename(hrn)
61         login_base = slicename.split('_')[0]
62         reg_objects = {}
63         site = {}
64         site['site_id'] = 0
65         site['name'] = 'geni.%s' % login_base 
66         site['enabled'] = True
67         site['max_slices'] = 100
68
69         # Note:
70         # Is it okay if this login base is the same as one already at this myplc site?
71         # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
72         site['login_base'] = login_base
73         site['abbreviated_name'] = login_base
74         site['max_slivers'] = 1000
75         reg_objects['site'] = site
76
77         slice = {}
78         
79         # get_expiration always returns a normalized datetime - no need to utcparse
80         extime = Credential(string=creds[0]).get_expiration()
81         # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
82         if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
83             extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
84         slice['expires'] = int(time.mktime(extime.timetuple()))
85         slice['hrn'] = hrn
86         slice['name'] = hrn_to_pl_slicename(hrn)
87         slice['url'] = hrn
88         slice['description'] = hrn
89         slice['pointer'] = 0
90         reg_objects['slice_record'] = slice
91
92         reg_objects['users'] = {}
93         for user in users:
94             user['key_ids'] = []
95             hrn, _ = urn_to_hrn(user['urn'])
96             user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
97             user['first_name'] = hrn
98             user['last_name'] = hrn
99             reg_objects['users'][user['email']] = user
100
101         return reg_objects
102
103 def __get_hostnames(nodes):
104     hostnames = []
105     for node in nodes:
106         hostnames.append(node.hostname)
107     return hostnames
108
109 def SliverStatus(api, slice_xrn, creds, call_id):
110     if Callids().already_handled(call_id): return {}
111
112     (hrn, _) = urn_to_hrn(slice_xrn)
113     # find out where this slice is currently running
114     slicename = hrn_to_pl_slicename(hrn)
115     
116     slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
117     if len(slices) == 0:        
118         raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
119     slice = slices[0]
120     
121     # report about the local nodes only
122     nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
123                                  ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
124     site_ids = [node['site_id'] for node in nodes]
125
126     result = {}
127     top_level_status = 'unknown'
128     if nodes:
129         top_level_status = 'ready'
130     slice_urn = Xrn(slice_xrn, 'slice').get_urn()
131     result['geni_urn'] = slice_urn
132     result['pl_login'] = slice['name']
133     result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
134     
135     resources = []
136     for node in nodes:
137         res = {}
138         res['pl_hostname'] = node['hostname']
139         res['pl_boot_state'] = node['boot_state']
140         res['pl_last_contact'] = node['last_contact']
141         if node['last_contact'] is not None:
142             res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
143         sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
144         res['geni_urn'] = sliver_id
145         if node['boot_state'] == 'boot':
146             res['geni_status'] = 'ready'
147         else:
148             res['geni_status'] = 'failed'
149             top_level_status = 'failed' 
150             
151         res['geni_error'] = ''
152
153         resources.append(res)
154         
155     result['geni_status'] = top_level_status
156     result['geni_resources'] = resources
157     return result
158
159 def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id):
160     """
161     Create the sliver[s] (slice) at this aggregate.    
162     Verify HRN and initialize the slice record in PLC if necessary.
163     """
164     if Callids().already_handled(call_id): return ""
165
166     aggregate = Aggregate(api)
167     slices = Slices(api)
168     (hrn, _) = urn_to_hrn(slice_xrn)
169     peer = slices.get_peer(hrn)
170     sfa_peer = slices.get_sfa_peer(hrn)
171     slice_record=None    
172     if users:
173         slice_record = users[0].get('slice_record', {})
174
175     # parse rspec
176     rspec = RSpec(rspec_string)
177     requested_attributes = rspec.version.get_slice_attributes()
178     
179     # ensure site record exists
180     site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
181     # ensure slice record exists
182     slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
183     # ensure person records exists
184     persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
185     # ensure slice attributes exists
186     slices.verify_slice_attributes(slice, requested_attributes)
187     
188     # add/remove slice from nodes
189     requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()]
190     slices.verify_slice_nodes(slice, requested_slivers, peer) 
191
192     aggregate.prepare_nodes({'hostname': requested_slivers})
193     aggregate.prepare_interfaces({'node_id': aggregate.nodes.keys()})    
194     slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
195
196     # handle MyPLC peer association.
197     # only used by plc and ple.
198     slices.handle_peer(site, slice, persons, peer)
199     
200     return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
201
202
203 def RenewSliver(api, xrn, creds, expiration_time, call_id):
204     if Callids().already_handled(call_id): return True
205     (hrn, _) = urn_to_hrn(xrn)
206     slicename = hrn_to_pl_slicename(hrn)
207     slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
208     if not slices:
209         raise RecordNotFound(hrn)
210     slice = slices[0]
211     requested_time = utcparse(expiration_time)
212     record = {'expires': int(time.mktime(requested_time.timetuple()))}
213     try:
214         api.driver.UpdateSlice(slice['slice_id'], record)
215         return True
216     except:
217         return False
218
219 def start_slice(api, xrn, creds):
220     (hrn, _) = urn_to_hrn(xrn)
221     slicename = hrn_to_pl_slicename(hrn)
222     slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
223     if not slices:
224         raise RecordNotFound(hrn)
225     slice_id = slices[0]['slice_id']
226     slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
227     # just remove the tag if it exists
228     if slice_tags:
229         api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
230
231     return 1
232  
233 def stop_slice(api, xrn, creds):
234     hrn, _ = urn_to_hrn(xrn)
235     slicename = hrn_to_pl_slicename(hrn)
236     slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
237     if not slices:
238         raise RecordNotFound(hrn)
239     slice_id = slices[0]['slice_id']
240     slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
241     if not slice_tags:
242         api.driver.AddSliceTag(slice_id, 'enabled', '0')
243     elif slice_tags[0]['value'] != "0":
244         tag_id = slice_tags[0]['slice_tag_id']
245         api.driver.UpdateSliceTag(tag_id, '0')
246     return 1
247
248 def reset_slice(api, xrn):
249     # XX not implemented at this interface
250     return 1
251
252 def DeleteSliver(api, xrn, creds, call_id):
253     if Callids().already_handled(call_id): return ""
254     (hrn, _) = urn_to_hrn(xrn)
255     slicename = hrn_to_pl_slicename(hrn)
256     slices = api.driver.GetSlices({'name': slicename})
257     if not slices:
258         return 1
259     slice = slices[0]
260
261     # determine if this is a peer slice
262     peer = peers.get_peer(api, hrn)
263     try:
264         if peer:
265             api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
266         api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
267     finally:
268         if peer:
269             api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
270     return 1
271
272 # xxx Thierry : caching at the aggregate level sounds wrong...
273 #caching=True
274 caching=False
275 def ListSlices(api, creds, call_id):
276     if Callids().already_handled(call_id): return []
277     # look in cache first
278     if caching and api.cache:
279         slices = api.cache.get('slices')
280         if slices:
281             return slices
282
283     # get data from db 
284     slices = api.driver.GetSlices({'peer_id': None}, ['name'])
285     slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
286     slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
287
288     # cache the result
289     if caching and api.cache:
290         api.cache.add('slices', slice_urns) 
291
292     return slice_urns
293     
294 def ListResources(api, creds, options, call_id):
295     if Callids().already_handled(call_id): return ""
296     # get slice's hrn from options
297     xrn = options.get('geni_slice_urn', None)
298     (hrn, _) = urn_to_hrn(xrn)
299
300     version_manager = VersionManager()
301     # get the rspec's return format from options
302     rspec_version = version_manager.get_version(options.get('rspec_version'))
303     version_string = "rspec_%s" % (rspec_version.to_string())
304
305     #panos adding the info option to the caching key (can be improved)
306     if options.get('info'):
307         version_string = version_string + "_"+options.get('info', 'default')
308
309     # look in cache first
310     if caching and api.cache and not xrn:
311         rspec = api.cache.get(version_string)
312         if rspec:
313             api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
314             return rspec 
315
316     #panos: passing user-defined options
317     #print "manager options = ",options
318     aggregate = Aggregate(api, options)
319     rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
320
321     # cache the result
322     if caching and api.cache and not xrn:
323         api.cache.add(version_string, rspec)
324
325     return rspec
326
327
328 def get_ticket(api, xrn, creds, rspec, users):
329
330     (slice_hrn, _) = urn_to_hrn(xrn)
331     slices = Slices(api)
332     peer = slices.get_peer(slice_hrn)
333     sfa_peer = slices.get_sfa_peer(slice_hrn)
334
335     # get the slice record
336     credential = api.getCredential()
337     interface = api.registries[api.hrn]
338     registry = api.server_proxy(interface, credential)
339     records = registry.Resolve(xrn, credential)
340
341     # make sure we get a local slice record
342     record = None
343     for tmp_record in records:
344         if tmp_record['type'] == 'slice' and \
345            not tmp_record['peer_authority']:
346 #Error (E0602, get_ticket): Undefined variable 'SliceRecord'
347             record = SliceRecord(dict=tmp_record)
348     if not record:
349         raise RecordNotFound(slice_hrn)
350     
351     # similar to CreateSliver, we must verify that the required records exist
352     # at this aggregate before we can issue a ticket
353     # parse rspec
354     rspec = RSpec(rspec_string)
355     requested_attributes = rspec.version.get_slice_attributes()
356
357     # ensure site record exists
358     site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
359     # ensure slice record exists
360     slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
361     # ensure person records exists
362     persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
363     # ensure slice attributes exists
364     slices.verify_slice_attributes(slice, requested_attributes)
365     
366     # get sliver info
367     slivers = slices.get_slivers(slice_hrn)
368
369     if not slivers:
370         raise SliverDoesNotExist(slice_hrn)
371
372     # get initscripts
373     initscripts = []
374     data = {
375         'timestamp': int(time.time()),
376         'initscripts': initscripts,
377         'slivers': slivers
378     }
379
380     # create the ticket
381     object_gid = record.get_gid_object()
382     new_ticket = SfaTicket(subject = object_gid.get_subject())
383     new_ticket.set_gid_caller(api.auth.client_gid)
384     new_ticket.set_gid_object(object_gid)
385     new_ticket.set_issuer(key=api.key, subject=api.hrn)
386     new_ticket.set_pubkey(object_gid.get_pubkey())
387     new_ticket.set_attributes(data)
388     new_ticket.set_rspec(rspec)
389     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
390     new_ticket.encode()
391     new_ticket.sign()
392
393     return new_ticket.save_to_string(save_parents=True)
394
395
396
397 #def main():
398 #    """
399 #    rspec = ListResources(api, "plc.princeton.sapan", None, 'pl_test_sapan')
400 #    #rspec = ListResources(api, "plc.princeton.coblitz", None, 'pl_test_coblitz')
401 #    #rspec = ListResources(api, "plc.pl.sirius", None, 'pl_test_sirius')
402 #    print rspec
403 #    """
404 #    api = PlcSfaApi()
405 #    f = open(sys.argv[1])
406 #    xml = f.read()
407 #    f.close()
408 ##Error (E1120, main): No value passed for parameter 'users' in function call
409 ##Error (E1120, main): No value passed for parameter 'call_id' in function call
410 #    CreateSliver(api, "plc.princeton.sapan", xml, 'CreateSliver_sapan')
411 #
412 #if __name__ == "__main__":
413 #    main()