f723f490bac78ecfebaac9dcbdc1a32d61db978a
[sfa.git] / sfa / managers / aggregate_manager_pl.py
1
2
3 import datetime
4 import time
5 import traceback
6 import sys
7 import re
8 from types import StringTypes
9
10 from sfa.util.faults import *
11 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
12 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename, hostname_to_urn
13 from sfa.util.rspec import *
14 from sfa.util.specdict import *
15 from sfa.util.record import SfaRecord
16 from sfa.util.policy import Policy
17 from sfa.util.record import *
18 from sfa.util.sfaticket import SfaTicket
19 from sfa.plc.slices import Slices
20 from sfa.trust.credential import Credential
21 import sfa.plc.peers as peers
22 from sfa.plc.network import *
23 from sfa.plc.api import SfaAPI
24 from sfa.plc.aggregate import Aggregate
25 from sfa.plc.slices import *
26 from sfa.util.version import version_core
27 from sfa.rspecs.rspec_version import RSpecVersion
28 from sfa.rspecs.sfa_rspec import sfa_rspec_version
29 from sfa.rspecs.pg_rspec import pg_rspec_ad_version, pg_rspec_request_version
30 from sfa.rspecs.rspec_parser import parse_rspec 
31 from sfa.util.sfatime import utcparse
32 from sfa.util.callids import Callids
33
34 def GetVersion(api):
35     xrn=Xrn(api.hrn)
36     request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)]
37     ad_rspec_versions = [dict(pg_rspec_ad_version), dict(sfa_rspec_version)]
38     version_more = {'interface':'aggregate',
39                     'testbed':'myplc',
40                     'hrn':xrn.get_hrn(),
41                     'request_rspec_versions': request_rspec_versions,
42                     'ad_rspec_versions': ad_rspec_versions,
43                     'default_ad_rspec': dict(sfa_rspec_version)
44                     }
45     return version_core(version_more)
46
47 def __get_registry_objects(slice_xrn, creds, users):
48     """
49
50     """
51     hrn, type = urn_to_hrn(slice_xrn)
52
53     hrn_auth = get_authority(hrn)
54
55     # Build up objects that an SFA registry would return if SFA
56     # could contact the slice's registry directly
57     reg_objects = None
58
59     if users:
60         # dont allow special characters in the site login base
61         #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
62         #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
63         slicename = hrn_to_pl_slicename(hrn)
64         login_base = slicename.split('_')[0]
65         reg_objects = {}
66         site = {}
67         site['site_id'] = 0
68         site['name'] = 'geni.%s' % login_base 
69         site['enabled'] = True
70         site['max_slices'] = 100
71
72         # Note:
73         # Is it okay if this login base is the same as one already at this myplc site?
74         # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
75         site['login_base'] = login_base
76         site['abbreviated_name'] = login_base
77         site['max_slivers'] = 1000
78         reg_objects['site'] = site
79
80         slice = {}
81         
82         # get_expiration always returns a normalized datetime - no need to utcparse
83         extime = Credential(string=creds[0]).get_expiration()
84         # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
85         if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
86             extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
87         slice['expires'] = int(time.mktime(extime.timetuple()))
88         slice['hrn'] = hrn
89         slice['name'] = hrn_to_pl_slicename(hrn)
90         slice['url'] = hrn
91         slice['description'] = hrn
92         slice['pointer'] = 0
93         reg_objects['slice_record'] = slice
94
95         reg_objects['users'] = {}
96         for user in users:
97             user['key_ids'] = []
98             hrn, _ = urn_to_hrn(user['urn'])
99             user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
100             user['first_name'] = hrn
101             user['last_name'] = hrn
102             reg_objects['users'][user['email']] = user
103
104         return reg_objects
105
106 def __get_hostnames(nodes):
107     hostnames = []
108     for node in nodes:
109         hostnames.append(node.hostname)
110     return hostnames
111
112 def SliverStatus(api, slice_xrn, creds, call_id):
113     if Callids().already_handled(call_id): return {}
114
115     (hrn, type) = urn_to_hrn(slice_xrn)
116     # find out where this slice is currently running
117     slicename = hrn_to_pl_slicename(hrn)
118     
119     slices = api.plshell.GetSlices(api.plauth, [slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
120     if len(slices) == 0:        
121         raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
122     slice = slices[0]
123     
124     # report about the local nodes only
125     nodes = api.plshell.GetNodes(api.plauth, {'node_id':slice['node_ids'],'peer_id':None},
126                                  ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
127     site_ids = [node['site_id'] for node in nodes]
128     sites = api.plshell.GetSites(api.plauth, site_ids, ['site_id', 'login_base'])
129     sites_dict = dict ( [ (site['site_id'],site['login_base'] ) for site in sites ] )
130
131     result = {}
132     top_level_status = 'unknown'
133     if nodes:
134         top_level_status = 'ready'
135     slice_urn = Xrn(slice_xrn, 'slice').get_urn()
136     result['geni_urn'] = slice_urn
137     result['pl_login'] = slice['name']
138     result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
139     
140     resources = []
141     for node in nodes:
142         res = {}
143         res['pl_hostname'] = node['hostname']
144         res['pl_boot_state'] = node['boot_state']
145         res['pl_last_contact'] = node['last_contact']
146         if node['last_contact'] is not None:
147             res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
148         sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
149         res['geni_urn'] = sliver_id
150         if node['boot_state'] == 'boot':
151             res['geni_status'] = 'ready'
152         else:
153             res['geni_status'] = 'failed'
154             top_level_staus = 'failed' 
155             
156         res['geni_error'] = ''
157
158         resources.append(res)
159         
160     result['geni_status'] = top_level_status
161     result['geni_resources'] = resources
162     return result
163
164 def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id):
165     """
166     Create the sliver[s] (slice) at this aggregate.    
167     Verify HRN and initialize the slice record in PLC if necessary.
168     """
169     if Callids().already_handled(call_id): return ""
170
171     aggregate = Aggregate(api)
172     slices = Slices(api)
173     (hrn, type) = urn_to_hrn(slice_xrn)
174     peer = slices.get_peer(hrn)
175     sfa_peer = slices.get_sfa_peer(hrn)
176     slice_record=None    
177     if users:
178         slice_record = users[0].get('slice_record', {})
179
180     # parse rspec
181     rspec = parse_rspec(rspec_string)
182     requested_attributes = rspec.get_slice_attributes()
183     
184     # ensure site record exists
185     site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
186     # ensure slice record exists
187     slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
188     # ensure person records exists
189     persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
190     # ensure slice attributes exists
191     slices.verify_slice_attributes(slice, requested_attributes)
192     
193     # add/remove slice from nodes
194     requested_slivers = [str(host) for host in rspec.get_nodes_with_slivers()]
195     slices.verify_slice_nodes(slice, requested_slivers, peer) 
196
197     # hanlde MyPLC peer association.
198     # only used by plc and ple.
199     slices.handle_peer(site, slice, persons, peer)
200     
201     return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
202
203
204 def RenewSliver(api, xrn, creds, expiration_time, call_id):
205     if Callids().already_handled(call_id): return True
206     (hrn, type) = urn_to_hrn(xrn)
207     slicename = hrn_to_pl_slicename(hrn)
208     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
209     if not slices:
210         raise RecordNotFound(hrn)
211     slice = slices[0]
212     requested_time = utcparse(expiration_time)
213     record = {'expires': int(time.mktime(requested_time.timetuple()))}
214     try:
215         api.plshell.UpdateSlice(api.plauth, slice['slice_id'], record)
216         return True
217     except:
218         return False
219
220 def start_slice(api, xrn, creds):
221     hrn, type = urn_to_hrn(xrn)
222     slicename = hrn_to_pl_slicename(hrn)
223     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
224     if not slices:
225         raise RecordNotFound(hrn)
226     slice_id = slices[0]['slice_id']
227     slice_tags = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
228     # just remove the tag if it exists
229     if slice_tags:
230         api.plshell.DeleteSliceTag(api.plauth, slice_tags[0]['slice_tag_id'])
231
232     return 1
233  
234 def stop_slice(api, xrn, creds):
235     hrn, type = urn_to_hrn(xrn)
236     slicename = hrn_to_pl_slicename(hrn)
237     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
238     if not slices:
239         raise RecordNotFound(hrn)
240     slice_id = slices[0]['slice_id']
241     slice_tags = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'tagname': 'enabled'})
242     if not slice_tags:
243         api.plshell.AddSliceTag(api.plauth, slice_id, 'enabled', '0')
244     elif slice_tags[0]['value'] != "0":
245         tag_id = attributes[0]['slice_tag_id']
246         api.plshell.UpdateSliceTag(api.plauth, tag_id, '0')
247     return 1
248
249 def reset_slice(api, xrn):
250     # XX not implemented at this interface
251     return 1
252
253 def DeleteSliver(api, xrn, creds, call_id):
254     if Callids().already_handled(call_id): return ""
255     (hrn, type) = urn_to_hrn(xrn)
256     slicename = hrn_to_pl_slicename(hrn)
257     slices = api.plshell.GetSlices(api.plauth, {'name': slicename})
258     if not slices:
259         return 1
260     slice = slices[0]
261
262     # determine if this is a peer slice
263     peer = peers.get_peer(api, hrn)
264     try:
265         if peer:
266             api.plshell.UnBindObjectFromPeer(api.plauth, 'slice', slice['slice_id'], peer)
267         api.plshell.DeleteSliceFromNodes(api.plauth, slicename, slice['node_ids'])
268     finally:
269         if peer:
270             api.plshell.BindObjectToPeer(api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id'])
271     return 1
272
273 # xxx Thierry : caching at the aggregate level sounds wrong...
274 #caching=True
275 caching=False
276 def ListSlices(api, creds, call_id):
277     if Callids().already_handled(call_id): return []
278     # look in cache first
279     if caching and api.cache:
280         slices = api.cache.get('slices')
281         if slices:
282             return slices
283
284     # get data from db 
285     slices = api.plshell.GetSlices(api.plauth, {'peer_id': None}, ['name'])
286     slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
287     slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
288
289     # cache the result
290     if caching and api.cache:
291         api.cache.add('slices', slice_urns) 
292
293     return slice_urns
294     
295 def ListResources(api, creds, options,call_id):
296     if Callids().already_handled(call_id): return ""
297     # get slice's hrn from options
298     xrn = options.get('geni_slice_urn', '')
299     (hrn, type) = urn_to_hrn(xrn)
300
301     # get the rspec's return format from options
302     rspec_version = RSpecVersion(options.get('rspec_version'))
303     version_string = "rspec_%s" % (rspec_version.get_version_name())
304
305     #panos adding the info option to the caching key (can be improved)
306     if options.get('info'):
307         version_string = version_string + "_"+options.get('info', 'default')
308
309     # look in cache first
310     if caching and api.cache and not xrn:
311         rspec = api.cache.get(version_string)
312         if rspec:
313             api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
314             return rspec 
315
316     #aggregate = Aggregate(api)
317     #panos: passing user-defined options
318     #print "manager options = ",options
319     aggregate = Aggregate(api, options)
320
321     rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version)
322
323     # cache the result
324     if caching and api.cache and not xrn:
325         api.cache.add(version_string, rspec)
326
327     return rspec
328
329
330 def get_ticket(api, xrn, creds, rspec, users):
331
332     reg_objects = __get_registry_objects(xrn, creds, users)
333
334     slice_hrn, type = urn_to_hrn(xrn)
335     slices = Slices(api)
336     peer = slices.get_peer(slice_hrn)
337     sfa_peer = slices.get_sfa_peer(slice_hrn)
338
339     # get the slice record
340     registry = api.registries[api.hrn]
341     credential = api.getCredential()
342     records = registry.Resolve(xrn, credential)
343
344     # similar to CreateSliver, we must verify that the required records exist
345     # at this aggregate before we can issue a ticket
346     site_id, remote_site_id = slices.verify_site(registry, credential, slice_hrn,
347                                                  peer, sfa_peer, reg_objects)
348     slice = slices.verify_slice(registry, credential, slice_hrn, site_id,
349                                 remote_site_id, peer, sfa_peer, reg_objects)
350
351     # make sure we get a local slice record
352     record = None
353     for tmp_record in records:
354         if tmp_record['type'] == 'slice' and \
355            not tmp_record['peer_authority']:
356             record = SliceRecord(dict=tmp_record)
357     if not record:
358         raise RecordNotFound(slice_hrn)
359
360     # get sliver info
361     slivers = Slices(api).get_slivers(slice_hrn)
362     if not slivers:
363         raise SliverDoesNotExist(slice_hrn)
364
365     # get initscripts
366     initscripts = []
367     data = {
368         'timestamp': int(time.time()),
369         'initscripts': initscripts,
370         'slivers': slivers
371     }
372
373     # create the ticket
374     object_gid = record.get_gid_object()
375     new_ticket = SfaTicket(subject = object_gid.get_subject())
376     new_ticket.set_gid_caller(api.auth.client_gid)
377     new_ticket.set_gid_object(object_gid)
378     new_ticket.set_issuer(key=api.key, subject=api.hrn)
379     new_ticket.set_pubkey(object_gid.get_pubkey())
380     new_ticket.set_attributes(data)
381     new_ticket.set_rspec(rspec)
382     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
383     new_ticket.encode()
384     new_ticket.sign()
385
386     return new_ticket.save_to_string(save_parents=True)
387
388
389
390 def main():
391     api = SfaAPI()
392     """
393     rspec = ListResources(api, "plc.princeton.sapan", None, 'pl_test_sapan')
394     #rspec = ListResources(api, "plc.princeton.coblitz", None, 'pl_test_coblitz')
395     #rspec = ListResources(api, "plc.pl.sirius", None, 'pl_test_sirius')
396     print rspec
397     """
398     f = open(sys.argv[1])
399     xml = f.read()
400     f.close()
401     CreateSliver(api, "plc.princeton.sapan", xml, 'CreateSliver_sapan')
402
403 if __name__ == "__main__":
404     main()