Bugfix in AddSlicetoNodes.
[sfa.git] / sfa / senslab / slabdriver.py
1 import sys
2 import subprocess
3
4 from datetime import datetime
5 from dateutil import tz 
6 from time import strftime,gmtime
7
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
11
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
15
16
17 from sfa.trust.certificate import *
18 from sfa.trust.credential import *
19 from sfa.trust.gid import GID
20
21 from sfa.managers.driver import Driver
22 from sfa.rspecs.version_manager import VersionManager
23 from sfa.rspecs.rspec import RSpec
24
25 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
26 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
27
28 ## thierry: everything that is API-related (i.e. handling incoming requests) 
29 # is taken care of 
30 # SlabDriver should be really only about talking to the senslab testbed
31
32 ## thierry : please avoid wildcard imports :)
33 from sfa.senslab.OARrestapi import  OARrestapi
34 from sfa.senslab.LDAPapi import LDAPapi
35
36 from sfa.senslab.parsing import parse_filter
37 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
38 from sfa.senslab.slabaggregate import SlabAggregate
39 from sfa.senslab.slabslices import SlabSlices
40
41 def list_to_dict(recs, key):
42     """
43     convert a list of dictionaries into a dictionary keyed on the 
44     specified dictionary key 
45     """
46
47     keys = [rec[key] for rec in recs]
48     return dict(zip(keys, recs))
49
50 # thierry : note
51 # this inheritance scheme is so that the driver object can receive
52 # GetNodes or GetSites sorts of calls directly
53 # and thus minimize the differences in the managers with the pl version
54 class SlabDriver(Driver):
55
56     def __init__(self, config):
57         Driver.__init__ (self, config)
58         self.config=config
59         self.hrn = config.SFA_INTERFACE_HRN
60     
61         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
62
63         
64         print >>sys.stderr, "\r\n_____________ SFA SENSLAB DRIVER \r\n" 
65
66         self.oar = OARrestapi()
67         self.ldap = LDAPapi()
68         self.time_format = "%Y-%m-%d %H:%M:%S"
69         self.db = SlabDB(config)
70         self.cache=None
71         
72
73     def sliver_status(self,slice_urn,slice_hrn):
74         # receive a status request for slice named urn/hrn urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
75         # shall return a structure as described in
76         # http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
77         # NT : not sure if we should implement this or not, but used by sface.
78         
79
80         sl = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
81         if len(sl) is 0:
82             raise SliverDoesNotExist("%s  slice_hrn" % (slice_hrn))
83
84         print >>sys.stderr, "\r\n \r\n_____________ Sliver status urn %s hrn %s sl %s \r\n " %(slice_urn,slice_hrn,sl)
85         if sl['oar_job_id'] is not -1:
86     
87             # report about the local nodes only
88             nodes_all = self.GetNodes({'hostname':sl['node_ids']},
89                             ['node_id', 'hostname','site','boot_state'])
90             nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
91             nodes = sl['node_ids']
92             if len(nodes) is 0:
93                 raise SliverDoesNotExist("No slivers allocated ") 
94                     
95
96             result = {}
97             top_level_status = 'unknown'
98             if nodes:
99                 top_level_status = 'ready'
100             result['geni_urn'] = slice_urn
101             result['pl_login'] = sl['job_user']
102             #result['slab_login'] = sl['job_user']
103             
104             timestamp = float(sl['startTime']) + float(sl['walltime']) 
105             result['pl_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
106             #result['slab_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
107             
108             resources = []
109             for node in nodes:
110                 res = {}
111                 #res['slab_hostname'] = node['hostname']
112                 #res['slab_boot_state'] = node['boot_state']
113                 
114                 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
115                 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
116                 res['pl_last_contact'] = strftime(self.time_format, gmtime(float(timestamp)))
117                 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'],nodeall_byhostname[node]['node_id'] ) 
118                 res['geni_urn'] = sliver_id 
119                 if nodeall_byhostname[node]['boot_state'] == 'Alive':
120
121                     res['geni_status'] = 'ready'
122                 else:
123                     res['geni_status'] = 'failed'
124                     top_level_status = 'failed' 
125                     
126                 res['geni_error'] = ''
127         
128                 resources.append(res)
129                 
130             result['geni_status'] = top_level_status
131             result['geni_resources'] = resources 
132             print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
133             return result        
134         
135         
136     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
137         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
138         aggregate = SlabAggregate(self)
139         
140         slices = SlabSlices(self)
141         peer = slices.get_peer(slice_hrn)
142         sfa_peer = slices.get_sfa_peer(slice_hrn)
143         slice_record=None 
144  
145         if not isinstance(creds, list):
146             creds = [creds]
147     
148         if users:
149             slice_record = users[0].get('slice_record', {})
150     
151         # parse rspec
152         rspec = RSpec(rspec_string)
153         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ============================rspec.version %s " %(rspec.version)
154         
155         
156         # ensure site record exists?
157         # ensure slice record exists
158         slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
159         requested_attributes = rspec.version.get_slice_attributes()
160         
161         if requested_attributes:
162             for attrib_dict in requested_attributes:
163                 if 'timeslot' in attrib_dict:
164                     slice.update({'timeslot':attrib_dict['timeslot']})
165         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... slice %s " %(slice)
166         # ensure person records exists
167         persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
168         # ensure slice attributes exists?
169
170         
171         # add/remove slice from nodes 
172         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... " 
173        
174         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
175         print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
176         nodes = slices.verify_slice_nodes(slice, requested_slivers, peer) 
177     
178         
179         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
180         
181         
182     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
183         
184         slice = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
185         print>>sys.stderr, "\r\n \r\n \t\t  SLABDRIVER.PY delete_sliver slice %s" %(slice)
186         if not slice:
187             return 1
188        
189         slices = SlabSlices(self)
190         # determine if this is a peer slice
191         # xxx I wonder if this would not need to use PlSlices.get_peer instead 
192         # in which case plc.peers could be deprecated as this here
193         # is the only/last call to this last method in plc.peers
194         peer = slices.get_peer(slice_hrn)
195         try:
196             if peer:
197                 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
198             self.DeleteSliceFromNodes(slice)
199         finally:
200             if peer:
201                 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
202         return 1
203             
204  
205     # first 2 args are None in case of resource discovery
206     def list_resources (self, slice_urn, slice_hrn, creds, options):
207         #cached_requested = options.get('cached', True) 
208     
209         version_manager = VersionManager()
210         # get the rspec's return format from options
211         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
212         version_string = "rspec_%s" % (rspec_version)
213     
214         #panos adding the info option to the caching key (can be improved)
215         if options.get('info'):
216             version_string = version_string + "_"+options.get('info', 'default')
217     
218         # look in cache first
219         #if cached_requested and self.cache and not slice_hrn:
220             #rspec = self.cache.get(version_string)
221             #if rspec:
222                 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
223                 #return rspec 
224     
225         #panos: passing user-defined options
226
227         aggregate = SlabAggregate(self)
228         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
229         #print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources origin_hrn %s" %(origin_hrn)
230         options.update({'origin_hrn':origin_hrn})
231         #print>>sys.stderr, " \r\n \r\n \t SLABDRIVER  list_resources options %s" %(options)
232         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
233                                      options=options)
234         print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec " 
235         # cache the result
236         #if self.cache and not slice_hrn:
237             #logger.debug("Slab.ListResources: stores advertisement in cache")
238             #self.cache.add(version_string, rspec)
239     
240         return rspec
241         
242         
243     def list_slices (self, creds, options):
244         # look in cache first
245         #if self.cache:
246             #slices = self.cache.get('slices')
247             #if slices:
248                 #logger.debug("PlDriver.list_slices returns from cache")
249                 #return slices
250     
251         # get data from db 
252         print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
253         slices = self.GetSlices()
254         slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
255         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
256     
257         # cache the result
258         #if self.cache:
259             #logger.debug ("SlabDriver.list_slices stores value in cache")
260             #self.cache.add('slices', slice_urns) 
261     
262         return slice_urns
263     
264     #No site or node register supported
265     def register (self, sfa_record, hrn, pub_key):
266         type = sfa_record['type']
267         slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
268     
269
270         if type == 'slice':
271             acceptable_fields=['url', 'instantiation', 'name', 'description']
272             for key in slab_record.keys():
273                 if key not in acceptable_fields:
274                     slab_record.pop(key) 
275             print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
276             slices = self.GetSlices(slice_filter =slab_record['hrn'], filter_type = 'slice_hrn')
277             if not slices:
278                     pointer = self.AddSlice(slab_record)
279             else:
280                     pointer = slices[0]['slice_id']
281     
282         elif type == 'user':
283             persons = self.GetPersons([sfa_record['hrn']])
284             if not persons:
285                 pointer = self.AddPerson(dict(sfa_record))
286                 #add in LDAP 
287             else:
288                 pointer = persons[0]['person_id']
289                 
290             #Does this make sense to senslab ?
291             #if 'enabled' in sfa_record and sfa_record['enabled']:
292                 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
293                 
294             # add this person to the site only if she is being added for the first
295             # time by sfa and doesont already exist in plc
296             if not persons or not persons[0]['site_ids']:
297                 login_base = get_leaf(sfa_record['authority'])
298                 self.AddPersonToSite(pointer, login_base)
299     
300             # What roles should this user have?
301             self.AddRoleToPerson('user', pointer)
302             # Add the user's key
303             if pub_key:
304                 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
305                 
306         #No node adding outside OAR
307
308         return pointer
309             
310     #No site or node record update allowed       
311     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
312         pointer = old_sfa_record['pointer']
313         type = old_sfa_record['type']
314
315         # new_key implemented for users only
316         if new_key and type not in [ 'user' ]:
317             raise UnknownSfaType(type)
318         
319         #if (type == "authority"):
320             #self.shell.UpdateSite(pointer, new_sfa_record)
321     
322         if type == "slice":
323             slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
324             if 'name' in slab_record:
325                 slab_record.pop('name')
326                 self.UpdateSlice(pointer, slab_record)
327     
328         elif type == "user":
329             update_fields = {}
330             all_fields = new_sfa_record
331             for key in all_fields.keys():
332                 if key in ['first_name', 'last_name', 'title', 'email',
333                            'password', 'phone', 'url', 'bio', 'accepted_aup',
334                            'enabled']:
335                     update_fields[key] = all_fields[key]
336             self.UpdatePerson(pointer, update_fields)
337     
338             if new_key:
339                 # must check this key against the previous one if it exists
340                 persons = self.GetPersons([pointer], ['key_ids'])
341                 person = persons[0]
342                 keys = person['key_ids']
343                 keys = self.GetKeys(person['key_ids'])
344                 
345                 # Delete all stale keys
346                 key_exists = False
347                 for key in keys:
348                     if new_key != key['key']:
349                         self.DeleteKey(key['key_id'])
350                     else:
351                         key_exists = True
352                 if not key_exists:
353                     self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
354
355
356         return True
357         
358
359     def remove (self, sfa_record):
360         type=sfa_record['type']
361         hrn=sfa_record['hrn']
362         record_id= sfa_record['record_id']
363         if type == 'user':
364             username = hrn.split(".")[len(hrn.split(".")) -1]
365             #get user in ldap
366             persons = self.GetPersons(username)
367             # only delete this person if he has site ids. if he doesnt, it probably means
368             # he was just removed from a site, not actually deleted
369             if persons and persons[0]['site_ids']:
370                 self.DeletePerson(username)
371         elif type == 'slice':
372             if self.GetSlices(slice_filter = hrn, filter_type = 'slice_hrn'):
373                 self.DeleteSlice(hrn)
374
375         #elif type == 'authority':
376             #if self.GetSites(pointer):
377                 #self.DeleteSite(pointer)
378
379         return True
380             
381     def GetPeers (self,auth = None, peer_filter=None, return_fields=None):
382
383         existing_records = {}
384         existing_hrns_by_types= {}
385         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields)
386         all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
387         for record in all_records:
388             existing_records[record.hrn] = record
389             if record.type not in existing_hrns_by_types:
390                 existing_hrns_by_types[record.type] = [record.hrn]
391                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
392             else:
393                 
394                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN  type %s hrn %s " %( record.type,record.hrn )
395                 existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
396                         
397         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers        existing_hrns_by_types %s " %( existing_hrns_by_types)
398         records_list= [] 
399       
400         try:
401             for hrn in existing_hrns_by_types['authority+sa']:
402                 records_list.append(existing_records[hrn])
403                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  records_list  %s " %(records_list)
404                 
405         except:
406                 pass
407
408         if not peer_filter and not return_fields:
409             return records_list
410         return_records = parse_filter(records_list,peer_filter, 'peers', return_fields) 
411  
412         return return_records
413         
414      
415             
416     def GetPersons(self, person_filter=None, return_fields=None):
417         
418         person_list = self.ldap.ldapFind({'authority': self.root_auth })
419         
420         #check = False
421         #if person_filter and isinstance(person_filter, dict):
422             #for k in  person_filter.keys():
423                 #if k in person_list[0].keys():
424                     #check = True
425                     
426         return_person_list = parse_filter(person_list,person_filter ,'persons', return_fields)
427         if return_person_list:
428             print>>sys.stderr, " \r\n GetPersons person_filter %s return_fields %s  " %(person_filter,return_fields)
429             return return_person_list
430
431     def GetTimezone(self):
432         server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
433         return server_timestamp,server_tz
434     
435
436     def DeleteJobs(self, job_id, slice_hrn):
437         if not job_id:
438             return
439         username  = slice_hrn.split(".")[-1].rstrip("_slice")
440         reqdict = {}
441         reqdict['method'] = "delete"
442         reqdict['strval'] = str(job_id)
443         answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
444         print>>sys.stderr, "\r\n \r\n  jobid  DeleteJobs %s "  %(answer)
445         
446                 
447     def GetJobs(self,job_id= None, resources=True,return_fields=None, username = None):
448         #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
449         #'api_timestamp']
450         #assigned_res = ['resource_id', 'resource_uri']
451         #assigned_n = ['node', 'node_uri']
452      
453         if job_id and resources is False:
454             req = "GET_jobs_id"
455             node_list_k = 'assigned_network_address'
456            
457         if job_id and resources :
458             req = "GET_jobs_id_resources"
459             node_list_k = 'reserved_resources' 
460                
461         #Get job info from OAR    
462         job_info = self.oar.parser.SendRequest(req, job_id, username)
463         print>>sys.stderr, "\r\n \r\n \t\t GetJobs  %s " %(job_info)
464         
465         if 'state' in job_info :
466             if job_info['state'] == 'Terminated':
467                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs TERMINELEBOUSIN "
468                 return None
469             if job_info['state'] == 'Error':
470                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs ERROR message %s " %(job_info)
471                 return None
472         
473         #Get a dict of nodes . Key :hostname of the node
474         node_list = self.GetNodes() 
475         node_hostname_list = []
476         for node in node_list:
477             node_hostname_list.append(node['hostname'])
478         node_dict = dict(zip(node_hostname_list,node_list))
479         try :
480             liste =job_info[node_list_k] 
481             #print>>sys.stderr, "\r\n \r\n \t\t GetJobs resources  job_info liste%s" %(liste)
482             for k in range(len(liste)):
483                job_info[node_list_k][k] = node_dict[job_info[node_list_k][k]]['hostname']
484             
485             #print>>sys.stderr, "\r\n \r\n \t\t YYYYYYYYYYYYGetJobs resources  job_info %s" %(job_info)  
486             #Replaces the previous entry "assigned_network_address" / "reserved_resources"
487             #with "node_ids"
488             job_info.update({'node_ids':job_info[node_list_k]})
489             del job_info[node_list_k]
490             return job_info
491             
492         except KeyError:
493             print>>sys.stderr, "\r\n \r\n \t\t GetJobs KEYERROR " 
494             
495     def GetReservedNodes(self):
496         # this function returns a list of all the nodes already involved in an oar job
497
498        jobs=self.oar.parser.SendRequest("GET_jobs_details") 
499        nodes=[]
500        for j in jobs :
501           nodes=j['assigned_network_address']+nodes
502        return nodes
503      
504     def GetNodes(self,node_filter= None, return_fields=None):
505         node_dict =self.oar.parser.SendRequest("GET_resources_full")
506
507         return_node_list = []
508         if not (node_filter or return_fields):
509                 return_node_list = node_dict.values()
510                 return return_node_list
511     
512         return_node_list= parse_filter(node_dict.values(),node_filter ,'node', return_fields)
513         return return_node_list
514     
515   
516     def GetSites(self, site_filter = None, return_fields=None):
517         site_dict =self.oar.parser.SendRequest("GET_sites")
518         return_site_list = []
519         if not ( site_filter or return_fields):
520                 return_site_list = site_dict.values()
521                 return return_site_list
522     
523         return_site_list = parse_filter(site_dict.values(), site_filter,'site', return_fields)
524         return return_site_list
525         
526
527     def GetSlices(self,slice_filter = None, filter_type = None, return_fields=None):
528         return_slice_list = []
529         slicerec  = {}
530         rec = {}
531         ftypes = ['slice_hrn', 'record_id_user']
532         if filter_type and filter_type in ftypes:
533             if filter_type == 'slice_hrn':
534                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()    
535             if filter_type == 'record_id_user':
536                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
537                 
538             if slicerec:
539                 rec = slicerec.dumpquerytodict()
540                 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
541                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY slicerec GetSlices   %s " %(slicerec)
542                 if slicerec.oar_job_id is not -1:
543                     rslt = self.GetJobs( slicerec.oar_job_id, resources=False, username = login )
544                     #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  GetJobs  %s " %(rslt)     
545                     if rslt :
546                         rec.update(rslt)
547                         rec.update({'hrn':str(rec['slice_hrn'])})
548                         #If GetJobs is empty, this means the job is now in the 'Terminated' state
549                         #Update the slice record
550                     else :
551                         self.db.update_job(slice_filter, job_id = -1)
552                         rec['oar_job_id'] = -1
553                         rec.update({'hrn':str(rec['slice_hrn'])})
554             
555                 try:
556                     rec['node_ids'] = rec['node_list']
557                 except KeyError:
558                     pass
559                 
560                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  rec  %s" %(rec)
561                               
562             return rec
563                 
564                 
565         else:
566             return_slice_list = slab_dbsession.query(SliceSenslab).all()
567
568         print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  slices %s slice_filter %s " %(return_slice_list,slice_filter)
569         
570         #if return_fields:
571             #return_slice_list  = parse_filter(sliceslist, slice_filter,'slice', return_fields)
572         
573         
574                     
575         return return_slice_list
576         
577
578         
579     
580     def testbed_name (self): return "senslab2" 
581          
582     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
583     def aggregate_version (self):
584         version_manager = VersionManager()
585         ad_rspec_versions = []
586         request_rspec_versions = []
587         for rspec_version in version_manager.versions:
588             if rspec_version.content_type in ['*', 'ad']:
589                 ad_rspec_versions.append(rspec_version.to_dict())
590             if rspec_version.content_type in ['*', 'request']:
591                 request_rspec_versions.append(rspec_version.to_dict()) 
592         return {
593             'testbed':self.testbed_name(),
594             'geni_request_rspec_versions': request_rspec_versions,
595             'geni_ad_rspec_versions': ad_rspec_versions,
596             }
597           
598           
599           
600           
601           
602           
603     ##
604     # Convert SFA fields to PLC fields for use when registering up updating
605     # registry record in the PLC database
606     #
607     # @param type type of record (user, slice, ...)
608     # @param hrn human readable name
609     # @param sfa_fields dictionary of SFA fields
610     # @param slab_fields dictionary of PLC fields (output)
611
612     def sfa_fields_to_slab_fields(self, type, hrn, record):
613
614         def convert_ints(tmpdict, int_fields):
615             for field in int_fields:
616                 if field in tmpdict:
617                     tmpdict[field] = int(tmpdict[field])
618
619         slab_record = {}
620         #for field in record:
621         #    slab_record[field] = record[field]
622  
623         if type == "slice":
624             #instantion used in get_slivers ? 
625             if not "instantiation" in slab_record:
626                 slab_record["instantiation"] = "senslab-instantiated"
627             slab_record["hrn"] = hrn_to_pl_slicename(hrn)
628             print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
629             if "url" in record:
630                slab_record["url"] = record["url"]
631             if "description" in record:
632                 slab_record["description"] = record["description"]
633             if "expires" in record:
634                 slab_record["expires"] = int(record["expires"])
635                 
636         #nodes added by OAR only and then imported to SFA
637         #elif type == "node":
638             #if not "hostname" in slab_record:
639                 #if not "hostname" in record:
640                     #raise MissingSfaInfo("hostname")
641                 #slab_record["hostname"] = record["hostname"]
642             #if not "model" in slab_record:
643                 #slab_record["model"] = "geni"
644                 
645         #One authority only 
646         #elif type == "authority":
647             #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
648
649             #if not "name" in slab_record:
650                 #slab_record["name"] = hrn
651
652             #if not "abbreviated_name" in slab_record:
653                 #slab_record["abbreviated_name"] = hrn
654
655             #if not "enabled" in slab_record:
656                 #slab_record["enabled"] = True
657
658             #if not "is_public" in slab_record:
659                 #slab_record["is_public"] = True
660
661         return slab_record
662
663                    
664     def AddSliceToNodes(self,  slice_dict, added_nodes, slice_user=None):
665        
666         site_list = []
667         nodeid_list =[]
668         resource = ""
669         reqdict = {}
670         slice_name = slice_dict['name']
671         try:
672             slot = slice_dict['timeslot'] 
673             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes slot %s   " %(slot)
674         except KeyError:
675             #Running on default parameters
676             #XP immediate , 10 mins
677             slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min 
678             
679             
680         reqdict['property'] ="network_address in ("
681         for node in added_nodes:
682             #Get the ID of the node : remove the root auth and put the site in a separate list
683             s=node.split(".")
684             # NT: it's not clear for me if the nodenames will have the senslab prefix
685             # so lets take the last part only, for now.
686             lastpart=s[-1]
687             #if s[0] == self.root_auth :
688             # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
689             s=lastpart.split("_")
690             nodeid=s[-1]
691             reqdict['property'] += "'"+ nodeid +"', "
692             nodeid_list.append(nodeid)
693             #site_list.append( l[0] )
694             
695             
696         reqdict['property'] =  reqdict['property'][0: len( reqdict['property'])-2] +")"
697         reqdict['resource'] ="network_address="+ str(len(nodeid_list))
698         
699         if slot['duration']:
700             walltime = slot['duration'].split(":")
701             # Fixing the walltime by adding a few delays. First put the walltime in seconds
702             # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
703             # take in account  prologue and epilogue scripts execution
704             # int walltimeAdditionalDelay = 120;  additional delay
705
706             desired_walltime =  int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
707             total_walltime = desired_walltime + 140 #+2 min 20
708             sleep_walltime = desired_walltime + 20 #+20 sec
709             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes desired_walltime %s  total_walltime %s sleep_walltime %s  " %(desired_walltime,total_walltime,sleep_walltime)
710             #Put the walltime back in str form
711             #First get the hours
712             walltime[0] = str(total_walltime / 3600)
713             total_walltime = total_walltime - 3600 * int(walltime[0])
714             #Get the remaining minutes
715             walltime[1] = str(total_walltime / 60)
716             total_walltime =  total_walltime - 60 * int(walltime[1])
717             #Get the seconds
718             walltime[2] = str(total_walltime)
719             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes  walltime %s " %(walltime)
720
721             reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2]) 
722             reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
723         else:
724             reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
725             reqdict['script_path'] = "/bin/sleep 620" #+20 sec    
726         #In case of a scheduled experiment (not immediate)
727         #To run an XP immediately, don't specify date and time in RSpec 
728         #They will be set to None.
729         if slot['date'] and slot['start_time']:
730             if slot['timezone'] is '' or slot['timezone'] is None:
731                 #assume it is server timezone
732                 server_timestamp,server_tz = self.GetTimezone()
733                 from_zone=tz.gettz(server_tz) 
734                 print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes  timezone not specified  server_tz %s from_zone  %s" %(server_tz,from_zone) 
735             else:
736                 #Get zone of the user from the reservation time given in the rspec
737                 from_zone = tz.gettz(slot['timezone'])  
738                    
739             date = str(slot['date'])  + " " + str(slot['start_time'])
740             user_datetime = datetime.datetime.strptime(date, self.time_format)
741             user_datetime = user_datetime.replace(tzinfo = from_zone)
742             
743             #Convert to UTC zone
744             to_zone = tz.tzutc()
745             utc_date = user_datetime.astimezone(to_zone)
746             #Readable time accpeted by OAR
747             reqdict['reservation']= utc_date.strftime(self.time_format)
748         
749             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes  reqdict['reservation'] %s " %(reqdict['reservation'])
750             
751         else:
752             # Immediate XP
753             # reservations are performed in the oar server timebase, so :
754             # 1- we get the server time(in UTC tz )/server timezone
755             # 2- convert the server UTC time in its timezone
756             # 3- add a custom delay to this time
757             # 4- convert this time to a readable form and it for the reservation request.
758             server_timestamp,server_tz = self.GetTimezone()
759             s_tz=tz.gettz(server_tz)
760             UTC_zone = tz.gettz("UTC")
761             #weird... datetime.fromtimestamp should work since we do from datetime import datetime
762             utc_server= datetime.datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
763             server_localtime=utc_server.astimezone(s_tz)
764     
765             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
766             readable_time = server_localtime.strftime(self.time_format)
767
768             print >>sys.stderr,"  \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s  " %(readable_time ,server_timestamp)
769             reqdict['reservation'] = readable_time
770         
771
772         reqdict['type'] = "deploy" 
773         reqdict['directory']= ""
774         reqdict['name']= "TestSandrine"
775        
776          
777         # first step : start the OAR job and update the job 
778         print>>sys.stderr, "\r\n \r\n AddSliceToNodes reqdict   %s \r\n site_list   %s"  %(reqdict,site_list)   
779        
780         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
781         print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid   %s "  %(answer)
782         try:       
783             jobid = answer['id']
784         except KeyError:
785              print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job  %s "  %( answer)
786              return
787         
788         print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user)
789         self.db.update_job( slice_name, jobid ,added_nodes)
790         
791           
792         # second step : configure the experiment
793         # we need to store the nodes in a yaml (well...) file like this :
794         # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
795         f=open('/tmp/sfa/'+str(jobid)+'.json','w')
796         f.write('[')
797         f.write(str(added_nodes[0].strip('node')))
798         for node in added_nodes[1:len(added_nodes)] :
799             f.write(','+node.strip('node'))
800         f.write(']')
801         f.close()
802         
803         # third step : call the senslab-experiment wrapper
804         #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
805         javacmdline="/usr/bin/java"
806         jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
807         #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
808         output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
809
810         print>>sys.stderr, "\r\n \r\n AddSliceToNodes wrapper returns   %s "  %(output)
811         return 
812                  
813  
814     #Delete the jobs and updates the job id in the senslab table
815     #to set it to -1  
816     #Does not clear the node list 
817     def DeleteSliceFromNodes(self, slice_record):
818          # Get user information
819        
820         self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
821         self.db.update_job(slice_record['hrn'], job_id = -1)
822         return   
823     
824  
825
826     def fill_record_sfa_info(self, records):
827
828         def startswith(prefix, values):
829             return [value for value in values if value.startswith(prefix)]
830
831         # get person ids
832         person_ids = []
833         site_ids = []
834         for record in records:
835             person_ids.extend(record.get("person_ids", []))
836             site_ids.extend(record.get("site_ids", [])) 
837             if 'site_id' in record:
838                 site_ids.append(record['site_id']) 
839                 
840         #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___person_ids %s \r\n \t\t site_ids %s " %(person_ids, site_ids)
841         
842         # get all pis from the sites we've encountered
843         # and store them in a dictionary keyed on site_id 
844         site_pis = {}
845         if site_ids:
846             pi_filter = {'|roles': ['pi'], '|site_ids': site_ids} 
847             pi_list = self.GetPersons( pi_filter, ['person_id', 'site_ids'])
848             #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___ GetPersons ['person_id', 'site_ids'] pi_ilist %s" %(pi_list)
849
850             for pi in pi_list:
851                 # we will need the pi's hrns also
852                 person_ids.append(pi['person_id'])
853                 
854                 # we also need to keep track of the sites these pis
855                 # belong to
856                 for site_id in pi['site_ids']:
857                     if site_id in site_pis:
858                         site_pis[site_id].append(pi)
859                     else:
860                         site_pis[site_id] = [pi]
861                  
862         # get sfa records for all records associated with these records.   
863         # we'll replace pl ids (person_ids) with hrns from the sfa records
864         # we obtain
865         
866         # get the sfa records
867         #table = SfaTable()
868         existing_records = {}
869         all_records = dbsession.query(RegRecord).all()
870         for record in all_records:
871             existing_records[(record.type,record.pointer)] = record
872             
873         print >>sys.stderr, " \r\r\n SLABDRIVER fill_record_sfa_info existing_records %s "  %(existing_records)
874         person_list, persons = [], {}
875         #person_list = table.find({'type': 'user', 'pointer': person_ids})
876         try:
877             for p_id in person_ids:
878                 person_list.append( existing_records.get(('user',p_id)))
879         except KeyError:
880             print >>sys.stderr, " \r\r\n SLABDRIVER fill_record_sfa_info ERRRRRRRRRROR"
881                  
882         # create a hrns keyed on the sfa record's pointer.
883         # Its possible for  multiple records to have the same pointer so
884         # the dict's value will be a list of hrns.
885         persons = defaultdict(list)
886         for person in person_list:
887             persons[person['pointer']].append(person)
888
889         # get the pl records
890         slab_person_list, slab_persons = [], {}
891         slab_person_list = self.GetPersons(person_ids, ['person_id', 'roles'])
892         slab_persons = list_to_dict(slab_person_list, 'person_id')
893         #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___  _list %s \r\n \t\t SenslabUsers.GetPersons ['person_id', 'roles'] slab_persons %s \r\n records %s" %(slab_person_list, slab_persons,records) 
894         # fill sfa info
895         
896         for record in records:
897             # skip records with no pl info (top level authorities)
898             #Sandrine 24 oct 11 2 lines
899             #if record['pointer'] == -1:
900                 #continue 
901             sfa_info = {}
902             type = record['type']
903             if (type == "slice"):
904                 # all slice users are researchers
905                 #record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')  ? besoin ou pas ?
906                 record['PI'] = []
907                 record['researcher'] = []
908                 for person_id in record.get('person_ids', []):
909                          #Sandrine 24 oct 11 line
910                 #for person_id in record['person_ids']:
911                     hrns = [person['hrn'] for person in persons[person_id]]
912                     record['researcher'].extend(hrns)                
913
914                 # pis at the slice's site
915                 slab_pis = site_pis[record['site_id']]
916                 pi_ids = [pi['person_id'] for pi in slab_pis]
917                 for person_id in pi_ids:
918                     hrns = [person['hrn'] for person in persons[person_id]]
919                     record['PI'].extend(hrns)
920                 record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')
921                 record['geni_creator'] = record['PI'] 
922                 
923             elif (type == "authority"):
924                 record['PI'] = []
925                 record['operator'] = []
926                 record['owner'] = []
927                 for pointer in record['person_ids']:
928                     if pointer not in persons or pointer not in slab_persons:
929                         # this means there is not sfa or pl record for this user
930                         continue   
931                     hrns = [person['hrn'] for person in persons[pointer]] 
932                     roles = slab_persons[pointer]['roles']   
933                     if 'pi' in roles:
934                         record['PI'].extend(hrns)
935                     if 'tech' in roles:
936                         record['operator'].extend(hrns)
937                     if 'admin' in roles:
938                         record['owner'].extend(hrns)
939                     # xxx TODO: OrganizationName
940             elif (type == "node"):
941                 sfa_info['dns'] = record.get("hostname", "")
942                 # xxx TODO: URI, LatLong, IP, DNS
943     
944             elif (type == "user"):
945                  sfa_info['email'] = record.get("email", "")
946                  sfa_info['geni_urn'] = hrn_to_urn(record['hrn'], 'user')
947                  sfa_info['geni_certificate'] = record['gid'] 
948                 # xxx TODO: PostalAddress, Phone
949                 
950             #print>>sys.stderr, "\r\n \r\rn \t\t \t <<<<<<<<<<<<<<<<<<<<<<<<  fill_record_sfa_info sfa_info %s  \r\n record %s : "%(sfa_info,record)  
951             record.update(sfa_info)
952             
953     def augment_records_with_testbed_info (self, sfa_records):
954         return self.fill_record_info (sfa_records)
955     
956     def fill_record_info(self, records):
957         """
958         Given a SFA record, fill in the senslab specific and SFA specific
959         fields in the record. 
960         """
961                     
962         print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s  " %(records)
963         if not isinstance(records, list):
964             records = [records]
965
966         parkour = records 
967         try:
968             for record in parkour:
969                     
970                 if str(record['type']) == 'slice':
971                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY  fill_record_info \t \t record %s" %(record)
972                     #sfatable = SfaTable()
973                     
974                     #existing_records_by_id = {}
975                     #all_records = dbsession.query(RegRecord).all()
976                     #for rec in all_records:
977                         #existing_records_by_id[rec.record_id] = rec
978                     #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY  fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']])
979                         
980                     #recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])}) 
981                     #recslice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = str(record['hrn'])).first()
982                     recslice = self.GetSlices(slice_filter =  str(record['hrn']), filter_type = 'slice_hrn')
983                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice)
984                     #if isinstance(recslice,list) and len(recslice) == 1:
985                         #recslice = recslice[0]
986                    
987                     recuser = dbsession.query(RegRecord).filter_by(record_id = recslice['record_id_user']).first()
988                     #existing_records_by_id[recslice['record_id_user']]
989                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser)
990                     
991           
992                     record.update({'PI':[recuser.hrn],
993                     'researcher': [recuser.hrn],
994                     'name':record['hrn'], 
995                     'oar_job_id':recslice['oar_job_id'],
996                     'node_ids': [],
997                     'person_ids':[recslice['record_id_user']]})
998                     
999                 elif str(record['type']) == 'user':
1000                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info USEEEEEEEEEERDESU!" 
1001
1002                     rec = self.GetSlices(slice_filter = record['record_id'], filter_type = 'record_id_user')
1003                     #Append record in records list, therfore fetches user and slice info again(one more loop)
1004                     #Will update PIs and researcher for the slice
1005
1006                     rec.update({'type':'slice','hrn':rec['slice_hrn']})
1007                     records.append(rec)
1008                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info ADDING SLIC EINFO rec %s" %(rec) 
1009                     
1010             print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info OKrecords %s" %(records) 
1011         except TypeError:
1012             print >>sys.stderr, "\r\n \t\t SLABDRIVER fill_record_info  EXCEPTION RECORDS : %s" %(records)      
1013             return
1014         
1015         #self.fill_record_slab_info(records)
1016         ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)     
1017         #self.fill_record_sfa_info(records)
1018         #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1019         
1020         
1021
1022     
1023         
1024     #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1025         ## get a list of the HRNs tht are members of the old and new records
1026         #if oldRecord:
1027             #oldList = oldRecord.get(listName, [])
1028         #else:
1029             #oldList = []     
1030         #newList = record.get(listName, [])
1031
1032         ## if the lists are the same, then we don't have to update anything
1033         #if (oldList == newList):
1034             #return
1035
1036         ## build a list of the new person ids, by looking up each person to get
1037         ## their pointer
1038         #newIdList = []
1039         #table = SfaTable()
1040         #records = table.find({'type': 'user', 'hrn': newList})
1041         #for rec in records:
1042             #newIdList.append(rec['pointer'])
1043
1044         ## build a list of the old person ids from the person_ids field 
1045         #if oldRecord:
1046             #oldIdList = oldRecord.get("person_ids", [])
1047             #containerId = oldRecord.get_pointer()
1048         #else:
1049             ## if oldRecord==None, then we are doing a Register, instead of an
1050             ## update.
1051             #oldIdList = []
1052             #containerId = record.get_pointer()
1053
1054     ## add people who are in the new list, but not the oldList
1055         #for personId in newIdList:
1056             #if not (personId in oldIdList):
1057                 #addFunc(self.plauth, personId, containerId)
1058
1059         ## remove people who are in the old list, but not the new list
1060         #for personId in oldIdList:
1061             #if not (personId in newIdList):
1062                 #delFunc(self.plauth, personId, containerId)
1063
1064     #def update_membership(self, oldRecord, record):
1065         #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1066         #if record.type == "slice":
1067             #self.update_membership_list(oldRecord, record, 'researcher',
1068                                         #self.users.AddPersonToSlice,
1069                                         #self.users.DeletePersonFromSlice)
1070         #elif record.type == "authority":
1071             ## xxx TODO
1072             #pass
1073
1074 ### thierry
1075 # I don't think you plan on running a component manager at this point
1076 # let me clean up the mess of ComponentAPI that is deprecated anyways