2248ff15d74e969e9cf0d8509a147b29f8e8f715
[sfa.git] / sfa / senslab / slabdriver.py
1 import os
2
3 from datetime import datetime
4
5 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
6 from sfa.util.sfalogging import logger
7 from sfa.storage.alchemy import dbsession
8 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
9 from sqlalchemy.orm import joinedload
10
11 from sfa.trust.certificate import Keypair, convert_public_key
12 from sfa.trust.gid import create_uuid
13 from sfa.trust.hierarchy import Hierarchy
14
15 from sfa.managers.driver import Driver
16 from sfa.rspecs.version_manager import VersionManager
17 from sfa.rspecs.rspec import RSpec
18
19 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
20
21 from sfa.senslab.OARrestapi import  OARrestapi
22 from sfa.senslab.LDAPapi import LDAPapi
23
24 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
25                                                      
26                                                                 
27 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
28                                                             slab_xrn_object
29 from sfa.senslab.slabslices import SlabSlices
30
31
32 from sfa.senslab.slabapi import SlabTestbedAPI
33
34
35      
36 class SlabDriver(Driver):
37     """ Senslab Driver class inherited from Driver generic class.
38     
39     Contains methods compliant with the SFA standard and the testbed
40     infrastructure (calls to LDAP and OAR).
41     """
42     def __init__(self, config):
43         Driver.__init__ (self, config)
44         self.config = config
45         self.hrn = config.SFA_INTERFACE_HRN
46
47         self.db = SlabDB(config, debug = False)
48         self.slab_api = SlabTestbedAPI(config)
49         self.cache = None
50         
51     def augment_records_with_testbed_info (self, record_list ):
52         """ Adds specific testbed info to the records. """
53         return self.fill_record_info (record_list)
54     
55     def fill_record_info(self, record_list):
56         """
57         Given a SFA record, fill in the senslab specific and SFA specific
58         fields in the record. 
59         """
60                     
61         logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
62         if not isinstance(record_list, list):
63             record_list = [record_list]
64             
65         try:
66             for record in record_list:
67                 #If the record is a SFA slice record, then add information 
68                 #about the user of this slice. This kind of 
69                 #information is in the Senslab's DB.
70                 if str(record['type']) == 'slice':
71                     if 'reg_researchers' in record and \
72                     isinstance(record['reg_researchers'], list) :
73                         record['reg_researchers'] = record['reg_researchers'][0].__dict__
74                         record.update({'PI':[record['reg_researchers']['hrn']],
75                                 'researcher': [record['reg_researchers']['hrn']],
76                                 'name':record['hrn'], 
77                                 'oar_job_id':[],
78                                 'node_ids': [],
79                                 'person_ids':[record['reg_researchers']['record_id']],
80                                 'geni_urn':'',  #For client_helper.py compatibility
81                                 'keys':'',  #For client_helper.py compatibility
82                                 'key_ids':''})  #For client_helper.py compatibility
83                         
84                         
85                     #Get slab slice record and oar job id if any.
86                     recslice_list = self.slab_api.GetSlices(slice_filter = \
87                                                 str(record['hrn']),\
88                                                 slice_filter_type = 'slice_hrn')
89                     
90                    
91                     logger.debug("SLABDRIVER \tfill_record_info \
92                         TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id']\
93                          %s " %(record['hrn'], record['oar_job_id']))
94                     del record['reg_researchers']
95                     try:
96                         for rec in recslice_list: 
97                             logger.debug("SLABDRIVER\r\n  \t  fill_record_info oar_job_id %s " %(rec['oar_job_id']))
98                             
99                             record['node_ids'] = [ self.slab_api.root_auth + hostname for hostname in rec['node_ids']]
100                     except KeyError:
101                         pass
102                         
103                     
104                     logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
105                                     recslice_list  %s \r\n \t RECORD %s \r\n \
106                                     \r\n" %(recslice_list, record)) 
107                                     
108                 if str(record['type']) == 'user':
109                     #The record is a SFA user record.
110                     #Get the information about his slice from Senslab's DB
111                     #and add it to the user record.
112                     recslice_list = self.slab_api.GetSlices(\
113                             slice_filter = record['record_id'],\
114                             slice_filter_type = 'record_id_user')
115                                             
116                     logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
117                                                 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record)) 
118                     #Append slice record in records list, 
119                     #therefore fetches user and slice info again(one more loop)
120                     #Will update PIs and researcher for the slice
121                    
122                     recuser = recslice_list[0]['reg_researchers']
123                     logger.debug( "SLABDRIVER.PY \t fill_record_info USER  \
124                                                 recuser %s \r\n \r\n" %(recuser)) 
125                     recslice = {}
126                     recslice = recslice_list[0]
127                     recslice.update({'PI':[recuser['hrn']],
128                         'researcher': [recuser['hrn']],
129                         'name':record['hrn'], 
130                         'node_ids': [],
131                         'oar_job_id': [],
132                         'person_ids':[recuser['record_id']]}) 
133                     try:
134                         for rec in recslice_list:
135                             recslice['oar_job_id'].append(rec['oar_job_id'])
136                     except KeyError:
137                         pass
138                             
139                     recslice.update({'type':'slice', \
140                                                 'hrn':recslice_list[0]['hrn']})
141
142
143                     #GetPersons takes [] as filters 
144                     user_slab = self.slab_api.GetPersons([record])
145     
146                     
147                     record.update(user_slab[0])
148                     #For client_helper.py compatibility
149                     record.update( { 'geni_urn':'',
150                     'keys':'',
151                     'key_ids':'' })                
152                     record_list.append(recslice)
153                     
154                     logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
155                                 INFO TO USER records %s" %(record_list)) 
156                   
157
158         except TypeError, error:
159             logger.log_exc("SLABDRIVER \t fill_record_info  EXCEPTION %s"\
160                                                                      %(error))
161                               
162         return
163                     
164                     
165     def sliver_status(self, slice_urn, slice_hrn):
166         """Receive a status request for slice named urn/hrn 
167         urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
168         shall return a structure as described in
169         http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
170         NT : not sure if we should implement this or not, but used by sface.
171         
172         """
173         
174         #First get the slice with the slice hrn
175         slice_list =  self.slab_api.GetSlices(slice_filter = slice_hrn, \
176                                     slice_filter_type = 'slice_hrn')
177         
178         if len(slice_list) is 0:
179             raise SliverDoesNotExist("%s  slice_hrn" % (slice_hrn))
180         
181         #Used for fetching the user info witch comes along the slice info 
182         one_slice = slice_list[0] 
183
184         
185         #Make a list of all the nodes hostnames  in use for this slice
186         slice_nodes_list = []
187         #for single_slice in slice_list:
188             #for node in single_slice['node_ids']:
189                 #slice_nodes_list.append(node['hostname'])
190         for node in one_slice:
191             slice_nodes_list.append(node['hostname'])
192             
193         #Get all the corresponding nodes details    
194         nodes_all = self.slab_api.GetNodes({'hostname':slice_nodes_list},
195                                 ['node_id', 'hostname','site','boot_state'])
196         nodeall_byhostname = dict([(one_node['hostname'], one_node) \
197                                             for one_node in nodes_all])  
198           
199           
200           
201         for single_slice in slice_list:
202
203               #For compatibility
204             top_level_status = 'empty' 
205             result = {}
206             result.fromkeys(\
207                 ['geni_urn','pl_login','geni_status','geni_resources'], None)
208             result['pl_login'] = one_slice['reg_researchers']['hrn']
209             logger.debug("Slabdriver - sliver_status Sliver status \
210                                         urn %s hrn %s single_slice  %s \r\n " \
211                                         %(slice_urn, slice_hrn, single_slice))
212                                         
213             if 'node_ids' not in single_slice:
214                 #No job in the slice
215                 result['geni_status'] = top_level_status
216                 result['geni_resources'] = [] 
217                 return result
218            
219             top_level_status = 'ready' 
220
221             #A job is running on Senslab for this slice
222             # report about the local nodes that are in the slice only
223          
224             result['geni_urn'] = slice_urn
225
226             resources = []
227             for node in single_slice['node_ids']:
228                 res = {}
229                 #res['slab_hostname'] = node['hostname']
230                 #res['slab_boot_state'] = node['boot_state']
231                 
232                 res['pl_hostname'] = node['hostname']
233                 res['pl_boot_state'] = \
234                             nodeall_byhostname[node['hostname']]['boot_state']
235                 #res['pl_last_contact'] = strftime(self.time_format, \
236                                                     #gmtime(float(timestamp)))
237                 sliver_id =  Xrn(slice_urn, type='slice', \
238                         id=nodeall_byhostname[node['hostname']]['node_id'], \
239                         authority=self.hrn).urn
240     
241                 res['geni_urn'] = sliver_id 
242                 node_name  = node['hostname']
243                 if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
244
245                     res['geni_status'] = 'ready'
246                 else:
247                     res['geni_status'] = 'failed'
248                     top_level_status = 'failed' 
249                     
250                 res['geni_error'] = ''
251         
252                 resources.append(res)
253                 
254             result['geni_status'] = top_level_status
255             result['geni_resources'] = resources 
256             logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
257                                                     %(resources,res))
258             return result  
259                 
260     @staticmethod                
261     def get_user_record( hrn):        
262         """ Returns the user record based on the hrn from the SFA DB """
263         return dbsession.query(RegRecord).filter_by(hrn = hrn).first() 
264          
265      
266     def testbed_name (self): 
267         return self.hrn
268          
269     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
270     def aggregate_version (self):
271         version_manager = VersionManager()
272         ad_rspec_versions = []
273         request_rspec_versions = []
274         for rspec_version in version_manager.versions:
275             if rspec_version.content_type in ['*', 'ad']:
276                 ad_rspec_versions.append(rspec_version.to_dict())
277             if rspec_version.content_type in ['*', 'request']:
278                 request_rspec_versions.append(rspec_version.to_dict()) 
279         return {
280             'testbed':self.testbed_name(),
281             'geni_request_rspec_versions': request_rspec_versions,
282             'geni_ad_rspec_versions': ad_rspec_versions,
283             }  
284                
285                
286     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
287                                                              users, options):
288         """  """
289         aggregate = SlabAggregate(self)
290         
291         slices = SlabSlices(self)
292         peer = slices.get_peer(slice_hrn)
293         sfa_peer = slices.get_sfa_peer(slice_hrn)
294         slice_record = None 
295  
296         if not isinstance(creds, list):
297             creds = [creds]
298     
299         if users:
300             slice_record = users[0].get('slice_record', {}) 
301             logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
302                                         creds %s \r\n \r\n users %s" \
303                                         %(creds, users))
304             slice_record['user'] = {'keys':users[0]['keys'], \
305                                     'email':users[0]['email'], \
306                                     'hrn':slice_record['reg-researchers'][0]}
307         # parse rspec
308         rspec = RSpec(rspec_string)
309         logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
310                                         %s slice_record %s users %s" \
311                                         %(rspec.version,slice_record, users))
312                                             
313
314         # ensure site record exists?
315         # ensure slice record exists
316         #Removed options to verify_slice SA 14/08/12
317         sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
318                                                     sfa_peer)
319                                                     
320         # ensure person records exists
321         #verify_persons returns added persons but since the return value
322         #is not used 
323         slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
324                                                     sfa_peer, options=options)                                           
325         #requested_attributes returned by rspec.version.get_slice_attributes() 
326         #unused, removed SA 13/08/12
327         rspec.version.get_slice_attributes()
328
329         logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
330
331         # add/remove slice from nodes 
332        
333         requested_slivers = [node.get('component_id') \
334                             for node in rspec.version.get_nodes_with_slivers()\
335                             if node.get('authority_id') is self.slab_api.root_auth]
336         l = [ node for node in rspec.version.get_nodes_with_slivers() ]
337         logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
338                                     requested_slivers %s  listnodes %s" \
339                                     %(requested_slivers,l))
340         #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
341         #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer) 
342         
343         # add/remove leases
344         requested_lease_list = []
345
346
347
348         for lease in rspec.version.get_leases():
349             single_requested_lease = {}
350             logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
351             
352             if not lease.get('lease_id'):
353                 if get_authority(lease['component_id']) == self.slab_api.root_auth:
354                     single_requested_lease['hostname'] = \
355                                         slab_xrn_to_hostname(\
356                                         lease.get('component_id').strip())
357                     single_requested_lease['start_time'] = \
358                                                         lease.get('start_time')
359                     single_requested_lease['duration'] = lease.get('duration')
360                     #Check the experiment's duration is valid before adding
361                     #the lease to the requested leases list
362                     duration_in_seconds = \
363                             int(single_requested_lease['duration'])*60
364                     if duration_in_seconds > self.slab_api.GetLeaseGranularity():
365                         requested_lease_list.append(single_requested_lease)
366                      
367         #Create dict of leases by start_time, regrouping nodes reserved
368         #at the same
369         #time, for the same amount of time = one job on OAR
370         requested_job_dict = {}
371         for lease in requested_lease_list:
372             
373             #In case it is an asap experiment start_time is empty
374             if lease['start_time'] == '':
375                 lease['start_time'] = '0' 
376                 
377             if lease['start_time'] not in requested_job_dict:
378                 if isinstance(lease['hostname'], str):
379                     lease['hostname'] =  [lease['hostname']]
380                     
381                 requested_job_dict[lease['start_time']] = lease
382                 
383             else :
384                 job_lease = requested_job_dict[lease['start_time']]
385                 if lease['duration'] == job_lease['duration'] :
386                     job_lease['hostname'].append(lease['hostname'])
387                     
388           
389                 
390                         
391         logger.debug("SLABDRIVER.PY \tcreate_sliver  requested_job_dict %s "\
392                                                      %(requested_job_dict))    
393         #verify_slice_leases returns the leases , but the return value is unused
394         #here. Removed SA 13/08/12           
395         slices.verify_slice_leases(sfa_slice, \
396                                     requested_job_dict, peer)
397         
398         return aggregate.get_rspec(slice_xrn=slice_urn, \
399                 login=sfa_slice['login'], version=rspec.version)
400         
401         
402     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
403         
404         sfa_slice_list  = self.slab_api.GetSlices(slice_filter = slice_hrn, \
405                                             slice_filter_type = 'slice_hrn')
406         
407         if not sfa_slice_list:
408             return 1
409         
410         #Delete all in the slice
411         for sfa_slice in sfa_slice_list:
412
413         
414             logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
415             slices = SlabSlices(self)
416             # determine if this is a peer slice
417         
418             peer = slices.get_peer(slice_hrn) 
419             #TODO delete_sliver SA : UnBindObjectFromPeer should be 
420             #used when there is another 
421             #senslab testbed, which is not the case 14/08/12 . 
422             
423             logger.debug("SLABDRIVER.PY delete_sliver peer %s \r\n \t sfa_slice %s " %(peer, sfa_slice))
424             try:
425                 #if peer:
426                     #self.slab_api.UnBindObjectFromPeer('slice', \
427                                             #sfa_slice['record_id_slice'], \
428                                             #peer, None)
429                 self.slab_api.DeleteSliceFromNodes(sfa_slice)
430                 return True
431             except :
432                 return False
433             #finally:
434                 #if peer:
435                     #self.slab_api.BindObjectToPeer('slice', \
436                                             #sfa_slice['record_id_slice'], \
437                                             #peer, sfa_slice['peer_slice_id'])
438             #return 1
439                 
440     
441     # first 2 args are None in case of resource discovery
442     def list_resources (self, slice_urn, slice_hrn, creds, options):
443         #cached_requested = options.get('cached', True) 
444     
445         version_manager = VersionManager()
446         # get the rspec's return format from options
447         rspec_version = \
448                 version_manager.get_version(options.get('geni_rspec_version'))
449         version_string = "rspec_%s" % (rspec_version)
450     
451         #panos adding the info option to the caching key (can be improved)
452         if options.get('info'):
453             version_string = version_string + "_" + \
454                                         options.get('info', 'default')
455                                         
456         # Adding the list_leases option to the caching key
457         if options.get('list_leases'):
458             version_string = version_string + "_"+options.get('list_leases', 'default')
459             
460         # Adding geni_available to caching key
461         if options.get('geni_available'):
462             version_string = version_string + "_" + str(options.get('geni_available'))
463     
464         # look in cache first
465         #if cached_requested and self.cache and not slice_hrn:
466             #rspec = self.cache.get(version_string)
467             #if rspec:
468                 #logger.debug("SlabDriver.ListResources: \
469                                     #returning cached advertisement")
470                 #return rspec 
471     
472         #panos: passing user-defined options
473         aggregate = SlabAggregate(self)
474         #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
475         #options.update({'origin_hrn':origin_hrn})
476         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, \
477                                         version=rspec_version, options=options)
478        
479         # cache the result
480         #if self.cache and not slice_hrn:
481             #logger.debug("Slab.ListResources: stores advertisement in cache")
482             #self.cache.add(version_string, rspec)
483     
484         return rspec
485         
486         
487     def list_slices (self, creds, options):
488         # look in cache first
489         #if self.cache:
490             #slices = self.cache.get('slices')
491             #if slices:
492                 #logger.debug("PlDriver.list_slices returns from cache")
493                 #return slices
494     
495         # get data from db 
496
497         slices = self.slab_api.GetSlices()        
498         logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))        
499         slice_hrns = [slab_slice['hrn'] for slab_slice in slices]
500
501         slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
502                                                 for slice_hrn in slice_hrns]
503
504         # cache the result
505         #if self.cache:
506             #logger.debug ("SlabDriver.list_slices stores value in cache")
507             #self.cache.add('slices', slice_urns) 
508     
509         return slice_urns
510     
511    
512     def register (self, sfa_record, hrn, pub_key):
513         """ 
514         Adding new user, slice, node or site should not be handled
515         by SFA.
516         
517         Adding nodes = OAR
518         Adding users = LDAP Senslab
519         Adding slice = Import from LDAP users
520         Adding site = OAR
521         """
522         return -1
523             
524       
525     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
526         """No site or node record update allowed in Senslab."""
527         
528         pointer = old_sfa_record['pointer']
529         old_sfa_record_type = old_sfa_record['type']
530
531         # new_key implemented for users only
532         if new_key and old_sfa_record_type not in [ 'user' ]:
533             raise UnknownSfaType(old_sfa_record_type)
534         
535         #if (type == "authority"):
536             #self.shell.UpdateSite(pointer, new_sfa_record)
537     
538         if old_sfa_record_type == "slice":
539             slab_record = self.slab_api.sfa_fields_to_slab_fields(old_sfa_record_type, \
540                                                 hrn, new_sfa_record)
541             if 'name' in slab_record:
542                 slab_record.pop('name')
543                 #Prototype should be UpdateSlice(self,
544                 #auth, slice_id_or_name, slice_fields)
545                 #Senslab cannot update slice since slice = job
546                 #so we must delete and create another job
547                 self.slab_api.UpdateSlice(pointer, slab_record)
548     
549         elif old_sfa_record_type == "user":
550             update_fields = {}
551             all_fields = new_sfa_record
552             for key in all_fields.keys():
553                 if key in ['first_name', 'last_name', 'title', 'email',
554                            'password', 'phone', 'url', 'bio', 'accepted_aup',
555                            'enabled']:
556                     update_fields[key] = all_fields[key]
557             self.slab_api.UpdatePerson(pointer, update_fields)
558     
559             if new_key:
560                 # must check this key against the previous one if it exists
561                 persons = self.slab_api.GetPersons(['key_ids'])
562                 person = persons[0]
563                 keys = person['key_ids']
564                 keys = self.slab_api.GetKeys(person['key_ids'])
565                 
566                 # Delete all stale keys
567                 key_exists = False
568                 for key in keys:
569                     if new_key != key['key']:
570                         self.slab_api.DeleteKey(key['key_id'])
571                     else:
572                         key_exists = True
573                 if not key_exists:
574                     self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh', \
575                                                     'key': new_key})
576
577
578         return True
579         
580
581     def remove (self, sfa_record):
582         sfa_record_type = sfa_record['type']
583         hrn = sfa_record['hrn']
584         if sfa_record_type == 'user':
585
586             #get user from senslab ldap  
587             person = self.slab_api.GetPersons(sfa_record)
588             #No registering at a given site in Senslab.
589             #Once registered to the LDAP, all senslab sites are
590             #accesible.
591             if person :
592                 #Mark account as disabled in ldap
593                 self.slab_api.DeletePerson(sfa_record)
594         elif sfa_record_type == 'slice':
595             if self.slab_api.GetSlices(slice_filter = hrn, \
596                                     slice_filter_type = 'slice_hrn'):
597                 self.slab_api.DeleteSlice(sfa_record)
598
599         #elif type == 'authority':
600             #if self.GetSites(pointer):
601                 #self.DeleteSite(pointer)
602
603         return True
604             
605