Merge branch 'master' into senslab2
[sfa.git] / sfa / managers / aggregate_manager_slab.py
1 import datetime
2 import time
3 import sys
4
5 from sfa.util.sfalogging import logger
6 from sfa.util.faults import RecordNotFound, SliverDoesNotExist
7 from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn, urn_to_sliver_id
8 from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename
9 from sfa.util.version import version_core
10 from sfa.util.sfatime import utcparse
11 from sfa.util.callids import Callids
12
13 from sfa.trust.sfaticket import SfaTicket
14 from sfa.trust.credential import Credential
15 from sfa.rspecs.version_manager import VersionManager
16 from sfa.rspecs.rspec import RSpec
17
18 from sfa.server.sfaapi import SfaApi
19 from sfa.senslab.slabaggregate import SlabAggregate
20 import sfa.plc.peers as peers
21
22 from sfa.senslab.slices import SlabSlices
23
24 class AggregateManager:
25     def __init__ (self, config): pass
26     # essentially a union of the core version, the generic version (this code) and
27     # whatever the driver needs to expose
28     
29     
30     def GetVersion(self, api, options):
31
32         xrn=Xrn(api.hrn)
33         version = version_core()
34         version_generic = {'interface':'aggregate',
35                             'sfa': 2,
36                             'geni_api': 2,
37                             'hrn':xrn.get_hrn(),
38                             'urn':xrn.get_urn(),
39                             }
40         version.update(version_generic)
41         testbed_version = self.driver.aggregate_version()
42         version.update(testbed_version)
43         return version
44      
45     #def GetVersion(self, api, options={}):
46     
47         #version_manager = VersionManager()
48         #ad_rspec_versions = []
49         #request_rspec_versions = []
50         #for rspec_version in version_manager.versions:
51             #if rspec_version.content_type in ['*', 'ad']:
52                 #ad_rspec_versions.append(rspec_version.to_dict())
53             #if rspec_version.content_type in ['*', 'request']:
54                 #request_rspec_versions.append(rspec_version.to_dict()) 
55         #xrn=Xrn(api.hrn)
56         #version_more = {'interface':'aggregate',
57                         #'sfa': 2,
58                         #'geni_api': api.config.SFA_AGGREGATE_API_VERSION,
59                         #'testbed':'myplc',
60                         #'hrn':xrn.get_hrn(),
61                         #'geni_request_rspec_versions': request_rspec_versions,
62                         #'geni_ad_rspec_versions': ad_rspec_versions,
63                         #}
64         #return version_core(version_more)
65     
66     def _get_registry_objects(self, slice_xrn, creds, users):
67         """
68     
69         """
70         hrn, _ = urn_to_hrn(slice_xrn)
71     
72         hrn_auth = get_authority(hrn)
73     
74         # Build up objects that an SFA registry would return if SFA
75         # could contact the slice's registry directly
76         reg_objects = None
77     
78         if users:
79             # dont allow special characters in the site login base
80             #only_alphanumeric = re.compile('[^a-zA-Z0-9]+')
81             #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower()
82             slicename = hrn_to_pl_slicename(hrn)
83             login_base = slicename.split('_')[0]
84             reg_objects = {}
85             site = {}
86             site['site_id'] = 0
87             site['name'] = 'geni.%s' % login_base 
88             site['enabled'] = True
89             site['max_slices'] = 100
90     
91             # Note:
92             # Is it okay if this login base is the same as one already at this myplc site?
93             # Do we need uniqueness?  Should use hrn_auth instead of just the leaf perhaps?
94             site['login_base'] = login_base
95             site['abbreviated_name'] = login_base
96             site['max_slivers'] = 1000
97             reg_objects['site'] = site
98     
99             slice = {}
100             
101             # get_expiration always returns a normalized datetime - no need to utcparse
102             extime = Credential(string=creds[0]).get_expiration()
103             # If the expiration time is > 60 days from now, set the expiration time to 60 days from now
104             if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60):
105                 extime = datetime.datetime.utcnow() + datetime.timedelta(days=60)
106             slice['expires'] = int(time.mktime(extime.timetuple()))
107             slice['hrn'] = hrn
108             slice['name'] = hrn_to_pl_slicename(hrn)
109             slice['url'] = hrn
110             slice['description'] = hrn
111             slice['pointer'] = 0
112             reg_objects['slice_record'] = slice
113     
114             reg_objects['users'] = {}
115             for user in users:
116                 user['key_ids'] = []
117                 hrn, _ = urn_to_hrn(user['urn'])
118                 user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net"
119                 user['first_name'] = hrn
120                 user['last_name'] = hrn
121                 reg_objects['users'][user['email']] = user
122     
123             return reg_objects
124     
125     def SliverStatus(self, api, slice_xrn, creds, options={}):
126         call_id = options.get('call_id')
127         if Callids().already_handled(call_id): return {}
128     
129         (hrn, _) = urn_to_hrn(slice_xrn)
130         # find out where this slice is currently running
131         slicename = hrn_to_pl_slicename(hrn)
132         
133         slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires'])
134         if len(slices) == 0:        
135             raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
136         slice = slices[0]
137         
138         # report about the local nodes only
139         nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None},
140                                      ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact'])
141         site_ids = [node['site_id'] for node in nodes]
142     
143         result = {}
144         top_level_status = 'unknown'
145         if nodes:
146             top_level_status = 'ready'
147         slice_urn = Xrn(slice_xrn, 'slice').get_urn()
148         result['geni_urn'] = slice_urn
149         result['pl_login'] = slice['name']
150         result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
151         
152         resources = []
153         for node in nodes:
154             res = {}
155             res['pl_hostname'] = node['hostname']
156             res['pl_boot_state'] = node['boot_state']
157             res['pl_last_contact'] = node['last_contact']
158             if node['last_contact'] is not None:
159                 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
160             sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) 
161             res['geni_urn'] = sliver_id
162             if node['boot_state'] == 'boot':
163                 res['geni_status'] = 'ready'
164             else:
165                 res['geni_status'] = 'failed'
166                 top_level_status = 'failed' 
167                 
168             res['geni_error'] = ''
169     
170             resources.append(res)
171             
172         result['geni_status'] = top_level_status
173         result['geni_resources'] = resources
174         return result
175     
176     def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options={}):
177         """
178         Create the sliver[s] (slice) at this aggregate.    
179         Verify HRN and initialize the slice record in PLC if necessary.
180         """
181         call_id = options.get('call_id')
182         if Callids().already_handled(call_id): return ""
183         aggregate = SlabAggregate(api)
184         #aggregate = Aggregate(api)
185         slices = SlabSlices(api)
186         (hrn, _) = urn_to_hrn(slice_xrn)
187         peer = slices.get_peer(hrn)
188         sfa_peer = slices.get_sfa_peer(hrn)
189         slice_record=None    
190         if users:
191             slice_record = users[0].get('slice_record', {})
192             print >>sys.stderr, " \r\n \t AGGREGATESLAB.PY Slice slice_record : ", slice_record
193     
194         # parse rspec
195         rspec = RSpec(rspec_string)
196         requested_attributes = rspec.version.get_slice_attributes()
197         
198         # ensure site record exists
199         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
200         # ensure slice record exists
201         print>>sys.stderr, " \r\n \t AGGREGATESLAB.PY Slice users : ", users
202         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
203         print >>sys.stderr, " \r\n \t AGGREGATESLAB.PY Slice slice : ", slice
204         # ensure person records exists
205         persons = slices.verify_persons(hrn, slice, users)
206         #persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
207         # ensure slice attributes exists
208         #slices.verify_slice_attributes(slice, requested_attributes)
209         
210         # add/remove slice from nodes
211         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
212         print >>sys.stderr, " \r\n \t AGGREGATESLAB.PY Slice requested_slivers : ", requested_slivers
213         slices.verify_slice_nodes(slice, requested_slivers, peer) 
214    
215         # add/remove links links 
216         #slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate)
217     
218         # handle MyPLC peer association.
219         # only used by plc and ple.
220         slices.handle_peer(site, slice, persons, peer)
221         
222         return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version)
223     
224     
225     def RenewSliver(self, api, xrn, creds, expiration_time, options={}):
226         call_id = options.get('call_id')
227         if Callids().already_handled(call_id): return True
228         (hrn, _) = urn_to_hrn(xrn)
229         slicename = hrn_to_pl_slicename(hrn)
230         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
231         if not slices:
232             raise RecordNotFound(hrn)
233         slice = slices[0]
234         requested_time = utcparse(expiration_time)
235         record = {'expires': int(time.mktime(requested_time.timetuple()))}
236         try:
237             api.driver.UpdateSlice(slice['slice_id'], record)
238             return True
239         except:
240             return False
241     
242     def start_slice(self, api, xrn, creds):
243         (hrn, _) = urn_to_hrn(xrn)
244         slicename = hrn_to_pl_slicename(hrn)
245         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
246         if not slices:
247             raise RecordNotFound(hrn)
248         slice_id = slices[0]['slice_id']
249         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id'])
250         # just remove the tag if it exists
251         if slice_tags:
252             api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id'])
253     
254         return 1
255      
256     def stop_slice(self, api, xrn, creds):
257         hrn, _ = urn_to_hrn(xrn)
258         slicename = hrn_to_pl_slicename(hrn)
259         slices = api.driver.GetSlices({'name': slicename}, ['slice_id'])
260         if not slices:
261             raise RecordNotFound(hrn)
262         slice_id = slices[0]['slice_id']
263         slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
264         if not slice_tags:
265             api.driver.AddSliceTag(slice_id, 'enabled', '0')
266         elif slice_tags[0]['value'] != "0":
267             tag_id = slice_tags[0]['slice_tag_id']
268             api.driver.UpdateSliceTag(tag_id, '0')
269         return 1
270     
271     def reset_slice(self, api, xrn):
272         # XX not implemented at this interface
273         return 1
274     
275     def DeleteSliver(self, api, xrn, creds, options={}):
276         call_id = options.get('call_id')
277         if Callids().already_handled(call_id): return ""
278         (hrn, _) = urn_to_hrn(xrn)
279         slicename = hrn_to_pl_slicename(hrn)
280         slices = api.driver.GetSlices({'name': slicename})
281         if not slices:
282             return 1
283         slice = slices[0]
284     
285         # determine if this is a peer slice
286         peer = peers.get_peer(api, hrn)
287         try:
288             if peer:
289                 api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer)
290             api.driver.DeleteSliceFromNodes(slicename, slice['node_ids'])
291         finally:
292             if peer:
293                 api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
294         return 1
295     
296     def ListSlices(self, api, creds, options={}):
297         call_id = options.get('call_id')
298         if Callids().already_handled(call_id): return []
299         # look in cache first
300         #if self.caching and api.cache:
301             #slices = api.cache.get('slices')
302             #if slices:
303                 #return slices
304     
305         # get data from db 
306         slices = api.driver.GetSlices({'peer_id': None}, ['name'])
307         slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices]
308         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
309     
310         # cache the result
311         #if self.caching and api.cache:
312             #api.cache.add('slices', slice_urns) 
313     
314         return slice_urns
315         
316     def ListResources(self, api, creds, options={}):
317         call_id = options.get('call_id')
318         if Callids().already_handled(call_id): return ""
319         # get slice's hrn from options
320         xrn = options.get('geni_slice_urn', None)
321         cached = options.get('cached', True) 
322         (hrn, _) = urn_to_hrn(xrn)
323     
324         version_manager = VersionManager()
325         # get the rspec's return format from options
326         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
327         version_string = "rspec_%s" % (rspec_version)
328     
329         #panos adding the info option to the caching key (can be improved)
330         if options.get('info'):
331             version_string = version_string + "_"+options.get('info', 'default')
332     
333         # look in cache first
334         #if self.cache and api.cache and not xrn and cached:
335             #rspec = api.cache.get(version_string)
336             #if rspec:
337                 #api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn)
338                 #return rspec 
339     
340         #panos: passing user-defined options
341         #print "manager options = ",options
342         aggregate = SlabAggregate(api)
343         #aggregate = Aggregate(api)
344         rspec =  aggregate.get_rspec(slice_xrn=xrn, version=rspec_version, options=options)
345     
346         # cache the result
347         #if self.caching and api.cache and not xrn:
348             #api.cache.add(version_string, rspec)
349     
350         return rspec
351     
352     
353     def GetTicket(self, api, xrn, creds, rspec, users, options={}):
354     
355         (slice_hrn, _) = urn_to_hrn(xrn)
356         slices = SlabSlices(api)
357         peer = slices.get_peer(slice_hrn)
358         sfa_peer = slices.get_sfa_peer(slice_hrn)
359     
360         # get the slice record
361         credential = api.getCredential()
362         interface = api.registries[api.hrn]
363         registry = api.server_proxy(interface, credential)
364         records = registry.Resolve(xrn, credential)
365     
366         # make sure we get a local slice record
367         record = None
368         for tmp_record in records:
369             if tmp_record['type'] == 'slice' and \
370                not tmp_record['peer_authority']:
371     #Error (E0602, GetTicket): Undefined variable 'SliceRecord'
372                 record = SliceRecord(dict=tmp_record)
373         if not record:
374             raise RecordNotFound(slice_hrn)
375         
376         # similar to CreateSliver, we must verify that the required records exist
377         # at this aggregate before we can issue a ticket
378         # parse rspec
379         rspec = RSpec(rspec_string)
380         requested_attributes = rspec.version.get_slice_attributes()
381     
382         # ensure site record exists
383         site = slices.verify_site(hrn, slice_record, peer, sfa_peer)
384         # ensure slice record exists
385         slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer)
386         # ensure person records exists
387         persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer)
388         # ensure slice attributes exists
389         slices.verify_slice_attributes(slice, requested_attributes)
390         
391         # get sliver info
392         slivers = slices.get_slivers(slice_hrn)
393     
394         if not slivers:
395             raise SliverDoesNotExist(slice_hrn)
396     
397         # get initscripts
398         initscripts = []
399         data = {
400             'timestamp': int(time.time()),
401             'initscripts': initscripts,
402             'slivers': slivers
403         }
404     
405         # create the ticket
406         object_gid = record.get_gid_object()
407         new_ticket = SfaTicket(subject = object_gid.get_subject())
408         new_ticket.set_gid_caller(api.auth.client_gid)
409         new_ticket.set_gid_object(object_gid)
410         new_ticket.set_issuer(key=api.key, subject=api.hrn)
411         new_ticket.set_pubkey(object_gid.get_pubkey())
412         new_ticket.set_attributes(data)
413         new_ticket.set_rspec(rspec)
414         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
415         new_ticket.encode()
416         new_ticket.sign()
417     
418         return new_ticket.save_to_string(save_parents=True)