Merge branch 'master' into senslab2
[sfa.git] / sfa / senslab / slabdriver.py
1 import sys
2 import subprocess
3
4 from datetime import datetime
5 from dateutil import tz 
6 from time import strftime,gmtime
7
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
11
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
15
16
17 from sfa.trust.certificate import *
18 from sfa.trust.credential import *
19 from sfa.trust.gid import GID
20
21 from sfa.managers.driver import Driver
22 from sfa.rspecs.version_manager import VersionManager
23 from sfa.rspecs.rspec import RSpec
24
25 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
26 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
27
28 ## thierry: everything that is API-related (i.e. handling incoming requests) 
29 # is taken care of 
30 # SlabDriver should be really only about talking to the senslab testbed
31
32 ## thierry : please avoid wildcard imports :)
33 from sfa.senslab.OARrestapi import  OARrestapi
34 from sfa.senslab.LDAPapi import LDAPapi
35
36 from sfa.senslab.parsing import parse_filter
37 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
38 from sfa.senslab.slabaggregate import SlabAggregate
39 from sfa.senslab.slabslices import SlabSlices
40
41 def list_to_dict(recs, key):
42     """
43     convert a list of dictionaries into a dictionary keyed on the 
44     specified dictionary key 
45     """
46
47     keys = [rec[key] for rec in recs]
48     return dict(zip(keys, recs))
49
50 # thierry : note
51 # this inheritance scheme is so that the driver object can receive
52 # GetNodes or GetSites sorts of calls directly
53 # and thus minimize the differences in the managers with the pl version
54 class SlabDriver(Driver):
55
56     def __init__(self, config):
57         Driver.__init__ (self, config)
58         self.config=config
59         self.hrn = config.SFA_INTERFACE_HRN
60     
61         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
62
63         
64         print >>sys.stderr, "\r\n_____________ SFA SENSLAB DRIVER \r\n" 
65
66         self.oar = OARrestapi()
67         self.ldap = LDAPapi()
68         self.time_format = "%Y-%m-%d %H:%M:%S"
69         self.db = SlabDB(config)
70         self.cache=None
71         
72
73     def sliver_status(self,slice_urn,slice_hrn):
74         # receive a status request for slice named urn/hrn urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
75         # shall return a structure as described in
76         # http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
77         # NT : not sure if we should implement this or not, but used by sface.
78         
79
80         sl = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
81         if len(sl) is 0:
82             raise SliverDoesNotExist("%s  slice_hrn" % (slice_hrn))
83
84         print >>sys.stderr, "\r\n \r\n_____________ Sliver status urn %s hrn %s sl %s \r\n " %(slice_urn,slice_hrn,sl)
85         if sl['oar_job_id'] is not -1:
86     
87             # report about the local nodes only
88             nodes_all = self.GetNodes({'hostname':sl['node_ids']},
89                             ['node_id', 'hostname','site','boot_state'])
90             nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
91             nodes = sl['node_ids']
92             if len(nodes) is 0:
93                 raise SliverDoesNotExist("No slivers allocated ") 
94                     
95
96             result = {}
97             top_level_status = 'unknown'
98             if nodes:
99                 top_level_status = 'ready'
100             result['geni_urn'] = slice_urn
101             result['pl_login'] = sl['job_user']
102             #result['slab_login'] = sl['job_user']
103             
104             timestamp = float(sl['startTime']) + float(sl['walltime']) 
105             result['pl_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
106             #result['slab_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
107             
108             resources = []
109             for node in nodes:
110                 res = {}
111                 #res['slab_hostname'] = node['hostname']
112                 #res['slab_boot_state'] = node['boot_state']
113                 
114                 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
115                 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
116                 res['pl_last_contact'] = strftime(self.time_format, gmtime(float(timestamp)))
117                 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'],nodeall_byhostname[node]['node_id'] ) 
118                 res['geni_urn'] = sliver_id 
119                 if nodeall_byhostname[node]['boot_state'] == 'Alive':
120
121                     res['geni_status'] = 'ready'
122                 else:
123                     res['geni_status'] = 'failed'
124                     top_level_status = 'failed' 
125                     
126                 res['geni_error'] = ''
127         
128                 resources.append(res)
129                 
130             result['geni_status'] = top_level_status
131             result['geni_resources'] = resources 
132             print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
133             return result        
134         
135         
136     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
137         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
138         aggregate = SlabAggregate(self)
139         
140         slices = SlabSlices(self)
141         peer = slices.get_peer(slice_hrn)
142         sfa_peer = slices.get_sfa_peer(slice_hrn)
143         slice_record=None 
144  
145         if not isinstance(creds, list):
146             creds = [creds]
147     
148         if users:
149             slice_record = users[0].get('slice_record', {})
150     
151         # parse rspec
152         rspec = RSpec(rspec_string)
153         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ============================rspec.version %s " %(rspec.version)
154         
155         
156         # ensure site record exists?
157         # ensure slice record exists
158         slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
159         requested_attributes = rspec.version.get_slice_attributes()
160         
161         if requested_attributes:
162             for attrib_dict in requested_attributes:
163                 if 'timeslot' in attrib_dict:
164                     slice.update({'timeslot':attrib_dict['timeslot']})
165         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... slice %s " %(slice)
166         # ensure person records exists
167         persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
168         # ensure slice attributes exists?
169
170         
171         # add/remove slice from nodes 
172         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... " 
173        
174         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
175         print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
176         nodes = slices.verify_slice_nodes(slice, requested_slivers, peer) 
177     
178         
179         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
180         
181         
182     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
183         
184         slice = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
185         print>>sys.stderr, "\r\n \r\n \t\t  SLABDRIVER.PY delete_sliver slice %s" %(slice)
186         if not slice:
187             return 1
188        
189         slices = SlabSlices(self)
190         # determine if this is a peer slice
191         # xxx I wonder if this would not need to use PlSlices.get_peer instead 
192         # in which case plc.peers could be deprecated as this here
193         # is the only/last call to this last method in plc.peers
194         peer = slices.get_peer(slice_hrn)
195         try:
196             if peer:
197                 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
198             self.DeleteSliceFromNodes(slice)
199         finally:
200             if peer:
201                 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
202         return 1
203             
204  
205     # first 2 args are None in case of resource discovery
206     def list_resources (self, slice_urn, slice_hrn, creds, options):
207         #cached_requested = options.get('cached', True) 
208     
209         version_manager = VersionManager()
210         # get the rspec's return format from options
211         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
212         version_string = "rspec_%s" % (rspec_version)
213     
214         #panos adding the info option to the caching key (can be improved)
215         if options.get('info'):
216             version_string = version_string + "_"+options.get('info', 'default')
217     
218         # look in cache first
219         #if cached_requested and self.cache and not slice_hrn:
220             #rspec = self.cache.get(version_string)
221             #if rspec:
222                 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
223                 #return rspec 
224     
225         #panos: passing user-defined options
226
227         aggregate = SlabAggregate(self)
228         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
229         #print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources origin_hrn %s" %(origin_hrn)
230         options.update({'origin_hrn':origin_hrn})
231         #print>>sys.stderr, " \r\n \r\n \t SLABDRIVER  list_resources options %s" %(options)
232         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
233                                      options=options)
234         print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec " 
235         # cache the result
236         #if self.cache and not slice_hrn:
237             #logger.debug("Slab.ListResources: stores advertisement in cache")
238             #self.cache.add(version_string, rspec)
239     
240         return rspec
241         
242         
243     def list_slices (self, creds, options):
244         # look in cache first
245         #if self.cache:
246             #slices = self.cache.get('slices')
247             #if slices:
248                 #logger.debug("PlDriver.list_slices returns from cache")
249                 #return slices
250     
251         # get data from db 
252         print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
253         slices = self.GetSlices()
254         slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
255         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
256     
257         # cache the result
258         #if self.cache:
259             #logger.debug ("SlabDriver.list_slices stores value in cache")
260             #self.cache.add('slices', slice_urns) 
261     
262         return slice_urns
263     
264     #No site or node register supported
265     def register (self, sfa_record, hrn, pub_key):
266         type = sfa_record['type']
267         slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
268     
269
270         if type == 'slice':
271             acceptable_fields=['url', 'instantiation', 'name', 'description']
272             for key in slab_record.keys():
273                 if key not in acceptable_fields:
274                     slab_record.pop(key) 
275             print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
276             slices = self.GetSlices(slice_filter =slab_record['hrn'], filter_type = 'slice_hrn')
277             if not slices:
278                     pointer = self.AddSlice(slab_record)
279             else:
280                     pointer = slices[0]['slice_id']
281     
282         elif type == 'user':
283             persons = self.GetPersons([sfa_record['hrn']])
284             if not persons:
285                 pointer = self.AddPerson(dict(sfa_record))
286                 #add in LDAP 
287             else:
288                 pointer = persons[0]['person_id']
289                 
290             #Does this make sense to senslab ?
291             #if 'enabled' in sfa_record and sfa_record['enabled']:
292                 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
293                 
294             # add this person to the site only if she is being added for the first
295             # time by sfa and doesont already exist in plc
296             if not persons or not persons[0]['site_ids']:
297                 login_base = get_leaf(sfa_record['authority'])
298                 self.AddPersonToSite(pointer, login_base)
299     
300             # What roles should this user have?
301             self.AddRoleToPerson('user', pointer)
302             # Add the user's key
303             if pub_key:
304                 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
305                 
306         #No node adding outside OAR
307
308         return pointer
309             
310     #No site or node record update allowed       
311     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
312         pointer = old_sfa_record['pointer']
313         type = old_sfa_record['type']
314
315         # new_key implemented for users only
316         if new_key and type not in [ 'user' ]:
317             raise UnknownSfaType(type)
318         
319         #if (type == "authority"):
320             #self.shell.UpdateSite(pointer, new_sfa_record)
321     
322         if type == "slice":
323             slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
324             if 'name' in slab_record:
325                 slab_record.pop('name')
326                 self.UpdateSlice(pointer, slab_record)
327     
328         elif type == "user":
329             update_fields = {}
330             all_fields = new_sfa_record
331             for key in all_fields.keys():
332                 if key in ['first_name', 'last_name', 'title', 'email',
333                            'password', 'phone', 'url', 'bio', 'accepted_aup',
334                            'enabled']:
335                     update_fields[key] = all_fields[key]
336             self.UpdatePerson(pointer, update_fields)
337     
338             if new_key:
339                 # must check this key against the previous one if it exists
340                 persons = self.GetPersons([pointer], ['key_ids'])
341                 person = persons[0]
342                 keys = person['key_ids']
343                 keys = self.GetKeys(person['key_ids'])
344                 
345                 # Delete all stale keys
346                 key_exists = False
347                 for key in keys:
348                     if new_key != key['key']:
349                         self.DeleteKey(key['key_id'])
350                     else:
351                         key_exists = True
352                 if not key_exists:
353                     self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
354
355
356         return True
357         
358
359     def remove (self, sfa_record):
360         type=sfa_record['type']
361         hrn=sfa_record['hrn']
362         record_id= sfa_record['record_id']
363         if type == 'user':
364             username = hrn.split(".")[len(hrn.split(".")) -1]
365             #get user in ldap
366             persons = self.GetPersons(username)
367             # only delete this person if he has site ids. if he doesnt, it probably means
368             # he was just removed from a site, not actually deleted
369             if persons and persons[0]['site_ids']:
370                 self.DeletePerson(username)
371         elif type == 'slice':
372             if self.GetSlices(slice_filter = hrn, filter_type = 'slice_hrn'):
373                 self.DeleteSlice(hrn)
374
375         #elif type == 'authority':
376             #if self.GetSites(pointer):
377                 #self.DeleteSite(pointer)
378
379         return True
380             
381     def GetPeers (self,auth = None, peer_filter=None, return_fields=None):
382
383         existing_records = {}
384         existing_hrns_by_types= {}
385         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields)
386         all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
387         for record in all_records:
388             existing_records[record.hrn] = record
389             if record.type not in existing_hrns_by_types:
390                 existing_hrns_by_types[record.type] = [record.hrn]
391                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
392             else:
393                 
394                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN  type %s hrn %s " %( record.type,record.hrn )
395                 existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
396                         
397         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers        existing_hrns_by_types %s " %( existing_hrns_by_types)
398         records_list= [] 
399       
400         try:
401             for hrn in existing_hrns_by_types['authority+sa']:
402                 records_list.append(existing_records[hrn])
403                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  records_list  %s " %(records_list)
404                 
405         except:
406                 pass
407
408         if not peer_filter and not return_fields:
409             return records_list
410         return_records = parse_filter(records_list,peer_filter, 'peers', return_fields) 
411  
412         return return_records
413         
414      
415             
416     def GetPersons(self, person_filter=None, return_fields=None):
417         
418         person_list = self.ldap.ldapFind({'authority': self.root_auth })
419         
420         #check = False
421         #if person_filter and isinstance(person_filter, dict):
422             #for k in  person_filter.keys():
423                 #if k in person_list[0].keys():
424                     #check = True
425                     
426         return_person_list = parse_filter(person_list,person_filter ,'persons', return_fields)
427         if return_person_list:
428             print>>sys.stderr, " \r\n GetPersons person_filter %s return_fields %s  " %(person_filter,return_fields)
429             return return_person_list
430
431     def GetTimezone(self):
432         server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
433         return server_timestamp,server_tz
434     
435
436     def DeleteJobs(self, job_id, slice_hrn):
437         if not job_id:
438             return
439         username  = slice_hrn.split(".")[-1].rstrip("_slice")
440         reqdict = {}
441         reqdict['method'] = "delete"
442         reqdict['strval'] = str(job_id)
443         answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
444         print>>sys.stderr, "\r\n \r\n  jobid  DeleteJobs %s "  %(answer)
445         
446                 
447     def GetJobs(self,job_id= None, resources=True,return_fields=None, username = None):
448         #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
449         #'api_timestamp']
450         #assigned_res = ['resource_id', 'resource_uri']
451         #assigned_n = ['node', 'node_uri']
452      
453         if job_id and resources is False:
454             req = "GET_jobs_id"
455             node_list_k = 'assigned_network_address'
456            
457         if job_id and resources :
458             req = "GET_jobs_id_resources"
459             node_list_k = 'reserved_resources' 
460                
461         #Get job info from OAR    
462         job_info = self.oar.parser.SendRequest(req, job_id, username)
463         print>>sys.stderr, "\r\n \r\n \t\t GetJobs  %s " %(job_info)
464         
465         if 'state' in job_info :
466             if job_info['state'] == 'Terminated':
467                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs TERMINELEBOUSIN "
468                 return None
469             if job_info['state'] == 'Error':
470                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs ERROR message %s " %(job_info)
471                 return None
472         
473         #Get a dict of nodes . Key :hostname of the node
474         node_list = self.GetNodes() 
475         node_hostname_list = []
476         for node in node_list:
477             node_hostname_list.append(node['hostname'])
478         node_dict = dict(zip(node_hostname_list,node_list))
479         try :
480             liste =job_info[node_list_k] 
481             #print>>sys.stderr, "\r\n \r\n \t\t GetJobs resources  job_info liste%s" %(liste)
482             for k in range(len(liste)):
483                job_info[node_list_k][k] = node_dict[job_info[node_list_k][k]]['hostname']
484             
485             #print>>sys.stderr, "\r\n \r\n \t\t YYYYYYYYYYYYGetJobs resources  job_info %s" %(job_info)  
486             #Replaces the previous entry "assigned_network_address" / "reserved_resources"
487             #with "node_ids"
488             job_info.update({'node_ids':job_info[node_list_k]})
489             del job_info[node_list_k]
490             return job_info
491             
492         except KeyError:
493             print>>sys.stderr, "\r\n \r\n \t\t GetJobs KEYERROR " 
494             
495     def GetReservedNodes(self):
496         # this function returns a list of all the nodes already involved in an oar job
497
498        jobs=self.oar.parser.SendRequest("GET_jobs_details") 
499        nodes=[]
500        for j in jobs :
501           nodes=j['assigned_network_address']+nodes
502        return nodes
503      
504     def GetNodes(self,node_filter= None, return_fields=None):
505         node_dict =self.oar.parser.SendRequest("GET_resources_full")
506
507         return_node_list = []
508         if not (node_filter or return_fields):
509                 return_node_list = node_dict.values()
510                 return return_node_list
511     
512         return_node_list= parse_filter(node_dict.values(),node_filter ,'node', return_fields)
513         return return_node_list
514     
515   
516     def GetSites(self, site_filter = None, return_fields=None):
517         site_dict =self.oar.parser.SendRequest("GET_sites")
518         return_site_list = []
519         if not ( site_filter or return_fields):
520                 return_site_list = site_dict.values()
521                 return return_site_list
522     
523         return_site_list = parse_filter(site_dict.values(), site_filter,'site', return_fields)
524         return return_site_list
525         
526
527     def GetSlices(self,slice_filter = None, filter_type = None, return_fields=None):
528         return_slice_list = []
529         slicerec  = {}
530         rec = {}
531         ftypes = ['slice_hrn', 'record_id_user']
532         if filter_type and filter_type in ftypes:
533             if filter_type == 'slice_hrn':
534                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()    
535             if filter_type == 'record_id_user':
536                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
537                 
538             if slicerec:
539                 rec = slicerec.dumpquerytodict()
540                 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
541                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY slicerec GetSlices   %s " %(slicerec)
542                 if slicerec.oar_job_id is not -1:
543                     rslt = self.GetJobs( slicerec.oar_job_id, resources=False, username = login )
544                     #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  GetJobs  %s " %(rslt)     
545                     if rslt :
546                         rec.update(rslt)
547                         rec.update({'hrn':str(rec['slice_hrn'])})
548                         #If GetJobs is empty, this means the job is now in the 'Terminated' state
549                         #Update the slice record
550                     else :
551                         self.db.update_job(slice_filter, job_id = -1)
552                         rec['oar_job_id'] = -1
553                         rec.update({'hrn':str(rec['slice_hrn'])})
554             
555                 try:
556                     rec['node_ids'] = rec['node_list']
557                 except KeyError:
558                     pass
559                 
560                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  rec  %s" %(rec)
561                               
562             return rec
563                 
564                 
565         else:
566             return_slice_list = slab_dbsession.query(SliceSenslab).all()
567
568         print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  slices %s slice_filter %s " %(return_slice_list,slice_filter)
569         
570         #if return_fields:
571             #return_slice_list  = parse_filter(sliceslist, slice_filter,'slice', return_fields)
572         
573         
574                     
575         return return_slice_list
576         
577
578         
579     
580     def testbed_name (self): return "senslab2" 
581          
582     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
583     def aggregate_version (self):
584         version_manager = VersionManager()
585         ad_rspec_versions = []
586         request_rspec_versions = []
587         for rspec_version in version_manager.versions:
588             if rspec_version.content_type in ['*', 'ad']:
589                 ad_rspec_versions.append(rspec_version.to_dict())
590             if rspec_version.content_type in ['*', 'request']:
591                 request_rspec_versions.append(rspec_version.to_dict()) 
592         return {
593             'testbed':self.testbed_name(),
594             'geni_request_rspec_versions': request_rspec_versions,
595             'geni_ad_rspec_versions': ad_rspec_versions,
596             }
597           
598           
599           
600           
601           
602           
603     ##
604     # Convert SFA fields to PLC fields for use when registering up updating
605     # registry record in the PLC database
606     #
607     # @param type type of record (user, slice, ...)
608     # @param hrn human readable name
609     # @param sfa_fields dictionary of SFA fields
610     # @param slab_fields dictionary of PLC fields (output)
611
612     def sfa_fields_to_slab_fields(self, type, hrn, record):
613
614         def convert_ints(tmpdict, int_fields):
615             for field in int_fields:
616                 if field in tmpdict:
617                     tmpdict[field] = int(tmpdict[field])
618
619         slab_record = {}
620         #for field in record:
621         #    slab_record[field] = record[field]
622  
623         if type == "slice":
624             #instantion used in get_slivers ? 
625             if not "instantiation" in slab_record:
626                 slab_record["instantiation"] = "senslab-instantiated"
627             slab_record["hrn"] = hrn_to_pl_slicename(hrn)
628             print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
629             if "url" in record:
630                slab_record["url"] = record["url"]
631             if "description" in record:
632                 slab_record["description"] = record["description"]
633             if "expires" in record:
634                 slab_record["expires"] = int(record["expires"])
635                 
636         #nodes added by OAR only and then imported to SFA
637         #elif type == "node":
638             #if not "hostname" in slab_record:
639                 #if not "hostname" in record:
640                     #raise MissingSfaInfo("hostname")
641                 #slab_record["hostname"] = record["hostname"]
642             #if not "model" in slab_record:
643                 #slab_record["model"] = "geni"
644                 
645         #One authority only 
646         #elif type == "authority":
647             #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
648
649             #if not "name" in slab_record:
650                 #slab_record["name"] = hrn
651
652             #if not "abbreviated_name" in slab_record:
653                 #slab_record["abbreviated_name"] = hrn
654
655             #if not "enabled" in slab_record:
656                 #slab_record["enabled"] = True
657
658             #if not "is_public" in slab_record:
659                 #slab_record["is_public"] = True
660
661         return slab_record
662
663                    
664     def AddSliceToNodes(self,  slice_dict, added_nodes, slice_user=None):
665        
666         site_list = []
667         nodeid_list =[]
668         resource = ""
669         reqdict = {}
670         slice_name = slice_dict['name']
671         try:
672             slot = slice_dict['timeslot'] 
673            
674         except KeyError:
675             slot = { 'time':None, 'duration':'00:10:00' }#10 min 
676             reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
677             reqdict['script_path'] = "/bin/sleep 620" #+20 sec
678             
679         reqdict['property'] ="network_address in ("
680         for node in added_nodes:
681             #Get the ID of the node : remove the root auth and put the site in a separate list
682             s=node.split(".")
683             # NT: it's not clear for me if the nodenames will have the senslab prefix
684             # so lets take the last part only, for now.
685             lastpart=s[-1]
686             #if s[0] == self.root_auth :
687             # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
688             s=lastpart.split("_")
689             nodeid=s[-1]
690             reqdict['property'] += "'"+ nodeid +"', "
691             nodeid_list.append(nodeid)
692             #site_list.append( l[0] )
693             
694             
695         reqdict['property'] =  reqdict['property'][0: len( reqdict['property'])-2] +")"
696         reqdict['resource'] ="network_address="+ str(len(nodeid_list))
697         
698
699         #In case of a scheduled experiment
700         if slot['time']:
701             
702             walltime = slot['duration'].split(":")
703             # Fixing the walltime by adding a few delays. First put the walltime in seconds
704             # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
705             # take in account  prologue and epilogue scripts execution
706             # int walltimeAdditionalDelay = 120;  additional delay
707
708             desired_walltime =  int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
709             total_walltime = desired_walltime + 140 #+2 min 20
710             sleep_walltime = desired_walltime + 20 #+20 sec
711             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes desired_walltime %s  total_walltime %s sleep_walltime %s  " %(desired_walltime,total_walltime,sleep_walltime)
712             #Put the walltime back in str form
713             #First get the hours
714             walltime[0] = str(total_walltime / 3600)
715             total_walltime = total_walltime - 3600 * int(walltime[0])
716             #Get the remaining minutes
717             walltime[1] = str(total_walltime / 60)
718             total_walltime =  total_walltime - 60 * int(walltime[1])
719             #Get the seconds
720             walltime[2] = str(total_walltime)
721             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes  walltime %s " %(walltime)
722
723             reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2]) 
724             reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
725             
726             #Get the reservation time
727             parse_time = slot['time'].split(" ")
728             #If timezone not specified, assume it is server timezone
729             if len(parse_time) == 2:
730               server_timestamp,server_tz = self.GetTimezone()
731               from_zone=tz.gettz(server_tz) 
732               date = ' '.join(parse_time)
733             else:    
734                 date = ' '.join(parse_time[:-1])
735                  #Get zone of the user from the reservation time given in the rspec
736                 from_zone = tz.gettz(parse_time[2])
737
738             user_datetime = datetime.datetime.strptime(date, self.time_format)
739             user_datetime = user_datetime.replace(tzinfo = from_zone)
740             
741             #Convert to UTC zone
742             to_zone = tz.tzutc()
743             utc_date = user_datetime.astimezone(to_zone)
744             #Readable time accpeted by OAR
745             reqdict['reservation']= utc_date.strftime(self.time_format)
746         
747             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes  reqdict['reservation'] %s " %(reqdict['reservation'])
748             
749         else:
750             # Immediate XP
751             # reservations are performed in the oar server timebase, so :
752             # 1- we get the server time(in UTC tz )/server timezone
753             # 2- convert the server UTC time in its timezone
754             # 3- add a custom delay to this time
755             # 4- convert this time to a readable form and it for the reservation request.
756             server_timestamp,server_tz = self.GetTimezone()
757             s_tz=tz.gettz(server_tz)
758             UTC_zone = tz.gettz("UTC")
759             #weird... datetime.fromtimestamp should work since we do from datetime import datetime
760             utc_server= datetime.datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
761             server_localtime=utc_server.astimezone(s_tz)
762     
763             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
764             readable_time = server_localtime.strftime(self.time_format)
765
766             print >>sys.stderr,"  \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s  " %(readable_time ,server_timestamp)
767             reqdict['reservation'] = readable_time
768         
769
770         reqdict['type'] = "deploy" 
771         reqdict['directory']= ""
772         reqdict['name']= "TestSandrine"
773        
774          
775         # first step : start the OAR job and update the job 
776         print>>sys.stderr, "\r\n \r\n AddSliceToNodes reqdict   %s \r\n site_list   %s"  %(reqdict,site_list)   
777        
778         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
779         print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid   %s "  %(answer)
780         try:       
781             jobid = answer['id']
782         except KeyError:
783              print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job  %s "  %( answer)
784              return
785         
786         print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user)
787         self.db.update_job( slice_name, jobid ,added_nodes)
788         
789           
790         # second step : configure the experiment
791         # we need to store the nodes in a yaml (well...) file like this :
792         # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
793         f=open('/tmp/sfa/'+str(jobid)+'.json','w')
794         f.write('[')
795         f.write(str(added_nodes[0].strip('node')))
796         for node in added_nodes[1:len(added_nodes)] :
797             f.write(','+node.strip('node'))
798         f.write(']')
799         f.close()
800         
801         # third step : call the senslab-experiment wrapper
802         #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
803         javacmdline="/usr/bin/java"
804         jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
805         #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
806         output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
807
808         print>>sys.stderr, "\r\n \r\n AddSliceToNodes wrapper returns   %s "  %(output)
809         return 
810                  
811  
812     #Delete the jobs and updates the job id in the senslab table
813     #to set it to -1  
814     #Does not clear the node list 
815     def DeleteSliceFromNodes(self, slice_record):
816          # Get user information
817        
818         self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
819         self.db.update_job(slice_record['hrn'], job_id = -1)
820         return   
821     
822  
823
824     def fill_record_sfa_info(self, records):
825
826         def startswith(prefix, values):
827             return [value for value in values if value.startswith(prefix)]
828
829         # get person ids
830         person_ids = []
831         site_ids = []
832         for record in records:
833             person_ids.extend(record.get("person_ids", []))
834             site_ids.extend(record.get("site_ids", [])) 
835             if 'site_id' in record:
836                 site_ids.append(record['site_id']) 
837                 
838         #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___person_ids %s \r\n \t\t site_ids %s " %(person_ids, site_ids)
839         
840         # get all pis from the sites we've encountered
841         # and store them in a dictionary keyed on site_id 
842         site_pis = {}
843         if site_ids:
844             pi_filter = {'|roles': ['pi'], '|site_ids': site_ids} 
845             pi_list = self.GetPersons( pi_filter, ['person_id', 'site_ids'])
846             #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___ GetPersons ['person_id', 'site_ids'] pi_ilist %s" %(pi_list)
847
848             for pi in pi_list:
849                 # we will need the pi's hrns also
850                 person_ids.append(pi['person_id'])
851                 
852                 # we also need to keep track of the sites these pis
853                 # belong to
854                 for site_id in pi['site_ids']:
855                     if site_id in site_pis:
856                         site_pis[site_id].append(pi)
857                     else:
858                         site_pis[site_id] = [pi]
859                  
860         # get sfa records for all records associated with these records.   
861         # we'll replace pl ids (person_ids) with hrns from the sfa records
862         # we obtain
863         
864         # get the sfa records
865         #table = SfaTable()
866         existing_records = {}
867         all_records = dbsession.query(RegRecord).all()
868         for record in all_records:
869             existing_records[(record.type,record.pointer)] = record
870             
871         print >>sys.stderr, " \r\r\n SLABDRIVER fill_record_sfa_info existing_records %s "  %(existing_records)
872         person_list, persons = [], {}
873         #person_list = table.find({'type': 'user', 'pointer': person_ids})
874         try:
875             for p_id in person_ids:
876                 person_list.append( existing_records.get(('user',p_id)))
877         except KeyError:
878             print >>sys.stderr, " \r\r\n SLABDRIVER fill_record_sfa_info ERRRRRRRRRROR"
879                  
880         # create a hrns keyed on the sfa record's pointer.
881         # Its possible for  multiple records to have the same pointer so
882         # the dict's value will be a list of hrns.
883         persons = defaultdict(list)
884         for person in person_list:
885             persons[person['pointer']].append(person)
886
887         # get the pl records
888         slab_person_list, slab_persons = [], {}
889         slab_person_list = self.GetPersons(person_ids, ['person_id', 'roles'])
890         slab_persons = list_to_dict(slab_person_list, 'person_id')
891         #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___  _list %s \r\n \t\t SenslabUsers.GetPersons ['person_id', 'roles'] slab_persons %s \r\n records %s" %(slab_person_list, slab_persons,records) 
892         # fill sfa info
893         
894         for record in records:
895             # skip records with no pl info (top level authorities)
896             #Sandrine 24 oct 11 2 lines
897             #if record['pointer'] == -1:
898                 #continue 
899             sfa_info = {}
900             type = record['type']
901             if (type == "slice"):
902                 # all slice users are researchers
903                 #record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')  ? besoin ou pas ?
904                 record['PI'] = []
905                 record['researcher'] = []
906                 for person_id in record.get('person_ids', []):
907                          #Sandrine 24 oct 11 line
908                 #for person_id in record['person_ids']:
909                     hrns = [person['hrn'] for person in persons[person_id]]
910                     record['researcher'].extend(hrns)                
911
912                 # pis at the slice's site
913                 slab_pis = site_pis[record['site_id']]
914                 pi_ids = [pi['person_id'] for pi in slab_pis]
915                 for person_id in pi_ids:
916                     hrns = [person['hrn'] for person in persons[person_id]]
917                     record['PI'].extend(hrns)
918                 record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')
919                 record['geni_creator'] = record['PI'] 
920                 
921             elif (type == "authority"):
922                 record['PI'] = []
923                 record['operator'] = []
924                 record['owner'] = []
925                 for pointer in record['person_ids']:
926                     if pointer not in persons or pointer not in slab_persons:
927                         # this means there is not sfa or pl record for this user
928                         continue   
929                     hrns = [person['hrn'] for person in persons[pointer]] 
930                     roles = slab_persons[pointer]['roles']   
931                     if 'pi' in roles:
932                         record['PI'].extend(hrns)
933                     if 'tech' in roles:
934                         record['operator'].extend(hrns)
935                     if 'admin' in roles:
936                         record['owner'].extend(hrns)
937                     # xxx TODO: OrganizationName
938             elif (type == "node"):
939                 sfa_info['dns'] = record.get("hostname", "")
940                 # xxx TODO: URI, LatLong, IP, DNS
941     
942             elif (type == "user"):
943                  sfa_info['email'] = record.get("email", "")
944                  sfa_info['geni_urn'] = hrn_to_urn(record['hrn'], 'user')
945                  sfa_info['geni_certificate'] = record['gid'] 
946                 # xxx TODO: PostalAddress, Phone
947                 
948             #print>>sys.stderr, "\r\n \r\rn \t\t \t <<<<<<<<<<<<<<<<<<<<<<<<  fill_record_sfa_info sfa_info %s  \r\n record %s : "%(sfa_info,record)  
949             record.update(sfa_info)
950             
951     def augment_records_with_testbed_info (self, sfa_records):
952         return self.fill_record_info (sfa_records)
953     
954     def fill_record_info(self, records):
955         """
956         Given a SFA record, fill in the senslab specific and SFA specific
957         fields in the record. 
958         """
959                     
960         print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s  " %(records)
961         if not isinstance(records, list):
962             records = [records]
963
964         parkour = records 
965         try:
966             for record in parkour:
967                     
968                 if str(record['type']) == 'slice':
969                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY  fill_record_info \t \t record %s" %(record)
970                     #sfatable = SfaTable()
971                     
972                     #existing_records_by_id = {}
973                     #all_records = dbsession.query(RegRecord).all()
974                     #for rec in all_records:
975                         #existing_records_by_id[rec.record_id] = rec
976                     #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY  fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']])
977                         
978                     #recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])}) 
979                     #recslice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = str(record['hrn'])).first()
980                     recslice = self.GetSlices(slice_filter =  str(record['hrn']), filter_type = 'slice_hrn')
981                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice)
982                     #if isinstance(recslice,list) and len(recslice) == 1:
983                         #recslice = recslice[0]
984                    
985                     recuser = dbsession.query(RegRecord).filter_by(record_id = recslice['record_id_user']).first()
986                     #existing_records_by_id[recslice['record_id_user']]
987                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser)
988                     
989           
990                     record.update({'PI':[recuser.hrn],
991                     'researcher': [recuser.hrn],
992                     'name':record['hrn'], 
993                     'oar_job_id':recslice['oar_job_id'],
994                     'node_ids': [],
995                     'person_ids':[recslice['record_id_user']]})
996                     
997                 elif str(record['type']) == 'user':
998                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info USEEEEEEEEEERDESU!" 
999
1000                     rec = self.GetSlices(slice_filter = record['record_id'], filter_type = 'record_id_user')
1001                     #Append record in records list, therfore fetches user and slice info again(one more loop)
1002                     #Will update PIs and researcher for the slice
1003
1004                     rec.update({'type':'slice','hrn':rec['slice_hrn']})
1005                     records.append(rec)
1006                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info ADDING SLIC EINFO rec %s" %(rec) 
1007                     
1008             print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info OKrecords %s" %(records) 
1009         except TypeError:
1010             print >>sys.stderr, "\r\n \t\t SLABDRIVER fill_record_info  EXCEPTION RECORDS : %s" %(records)      
1011             return
1012         
1013         #self.fill_record_slab_info(records)
1014         ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)     
1015         #self.fill_record_sfa_info(records)
1016         #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1017         
1018         
1019
1020     
1021         
1022     #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1023         ## get a list of the HRNs tht are members of the old and new records
1024         #if oldRecord:
1025             #oldList = oldRecord.get(listName, [])
1026         #else:
1027             #oldList = []     
1028         #newList = record.get(listName, [])
1029
1030         ## if the lists are the same, then we don't have to update anything
1031         #if (oldList == newList):
1032             #return
1033
1034         ## build a list of the new person ids, by looking up each person to get
1035         ## their pointer
1036         #newIdList = []
1037         #table = SfaTable()
1038         #records = table.find({'type': 'user', 'hrn': newList})
1039         #for rec in records:
1040             #newIdList.append(rec['pointer'])
1041
1042         ## build a list of the old person ids from the person_ids field 
1043         #if oldRecord:
1044             #oldIdList = oldRecord.get("person_ids", [])
1045             #containerId = oldRecord.get_pointer()
1046         #else:
1047             ## if oldRecord==None, then we are doing a Register, instead of an
1048             ## update.
1049             #oldIdList = []
1050             #containerId = record.get_pointer()
1051
1052     ## add people who are in the new list, but not the oldList
1053         #for personId in newIdList:
1054             #if not (personId in oldIdList):
1055                 #addFunc(self.plauth, personId, containerId)
1056
1057         ## remove people who are in the old list, but not the new list
1058         #for personId in oldIdList:
1059             #if not (personId in newIdList):
1060                 #delFunc(self.plauth, personId, containerId)
1061
1062     #def update_membership(self, oldRecord, record):
1063         #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1064         #if record.type == "slice":
1065             #self.update_membership_list(oldRecord, record, 'researcher',
1066                                         #self.users.AddPersonToSlice,
1067                                         #self.users.DeletePersonFromSlice)
1068         #elif record.type == "authority":
1069             ## xxx TODO
1070             #pass
1071
1072 ### thierry
1073 # I don't think you plan on running a component manager at this point
1074 # let me clean up the mess of ComponentAPI that is deprecated anyways