Minor modifications in timeslot element.
[sfa.git] / sfa / senslab / slabdriver.py
1 import sys
2 import subprocess
3
4 from datetime import datetime
5 from dateutil import tz 
6 from time import strftime,gmtime
7
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
11
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
15
16
17 from sfa.trust.certificate import *
18 from sfa.trust.credential import *
19 from sfa.trust.gid import GID
20
21 from sfa.managers.driver import Driver
22 from sfa.rspecs.version_manager import VersionManager
23 from sfa.rspecs.rspec import RSpec
24
25 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
26 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
27
28 ## thierry: everything that is API-related (i.e. handling incoming requests) 
29 # is taken care of 
30 # SlabDriver should be really only about talking to the senslab testbed
31
32 ## thierry : please avoid wildcard imports :)
33 from sfa.senslab.OARrestapi import  OARrestapi
34 from sfa.senslab.LDAPapi import LDAPapi
35
36 from sfa.senslab.parsing import parse_filter
37 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
38 from sfa.senslab.slabaggregate import SlabAggregate
39 from sfa.senslab.slabslices import SlabSlices
40
41 def list_to_dict(recs, key):
42     """
43     convert a list of dictionaries into a dictionary keyed on the 
44     specified dictionary key 
45     """
46
47     keys = [rec[key] for rec in recs]
48     return dict(zip(keys, recs))
49
50 # thierry : note
51 # this inheritance scheme is so that the driver object can receive
52 # GetNodes or GetSites sorts of calls directly
53 # and thus minimize the differences in the managers with the pl version
54 class SlabDriver(Driver):
55
56     def __init__(self, config):
57         Driver.__init__ (self, config)
58         self.config=config
59         self.hrn = config.SFA_INTERFACE_HRN
60     
61         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
62
63         
64         print >>sys.stderr, "\r\n_____________ SFA SENSLAB DRIVER \r\n" 
65
66         self.oar = OARrestapi()
67         self.ldap = LDAPapi()
68         self.time_format = "%Y-%m-%d %H:%M:%S"
69         self.db = SlabDB(config)
70         self.cache=None
71         
72
73     def sliver_status(self,slice_urn,slice_hrn):
74         # receive a status request for slice named urn/hrn urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
75         # shall return a structure as described in
76         # http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
77         # NT : not sure if we should implement this or not, but used by sface.
78         
79
80         sl = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
81         if len(sl) is 0:
82             raise SliverDoesNotExist("%s  slice_hrn" % (slice_hrn))
83
84         print >>sys.stderr, "\r\n \r\n_____________ Sliver status urn %s hrn %s sl %s \r\n " %(slice_urn,slice_hrn,sl)
85         if sl['oar_job_id'] is not -1:
86     
87             # report about the local nodes only
88             nodes_all = self.GetNodes({'hostname':sl['node_ids']},
89                             ['node_id', 'hostname','site','boot_state'])
90             nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
91             nodes = sl['node_ids']
92             if len(nodes) is 0:
93                 raise SliverDoesNotExist("No slivers allocated ") 
94                     
95
96             result = {}
97             top_level_status = 'unknown'
98             if nodes:
99                 top_level_status = 'ready'
100             result['geni_urn'] = slice_urn
101             result['pl_login'] = sl['job_user']
102             #result['slab_login'] = sl['job_user']
103             
104             timestamp = float(sl['startTime']) + float(sl['walltime']) 
105             result['pl_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
106             #result['slab_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
107             
108             resources = []
109             for node in nodes:
110                 res = {}
111                 #res['slab_hostname'] = node['hostname']
112                 #res['slab_boot_state'] = node['boot_state']
113                 
114                 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
115                 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
116                 res['pl_last_contact'] = strftime(self.time_format, gmtime(float(timestamp)))
117                 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'],nodeall_byhostname[node]['node_id'] ) 
118                 res['geni_urn'] = sliver_id 
119                 if nodeall_byhostname[node]['boot_state'] == 'Alive':
120
121                     res['geni_status'] = 'ready'
122                 else:
123                     res['geni_status'] = 'failed'
124                     top_level_status = 'failed' 
125                     
126                 res['geni_error'] = ''
127         
128                 resources.append(res)
129                 
130             result['geni_status'] = top_level_status
131             result['geni_resources'] = resources 
132             print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
133             return result        
134         
135         
136     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
137         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
138         aggregate = SlabAggregate(self)
139         
140         slices = SlabSlices(self)
141         peer = slices.get_peer(slice_hrn)
142         sfa_peer = slices.get_sfa_peer(slice_hrn)
143         slice_record=None 
144  
145         if not isinstance(creds, list):
146             creds = [creds]
147     
148         if users:
149             slice_record = users[0].get('slice_record', {})
150     
151         # parse rspec
152         rspec = RSpec(rspec_string)
153         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ============================rspec.version %s " %(rspec.version)
154         
155         
156         # ensure site record exists?
157         # ensure slice record exists
158         slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
159         requested_attributes = rspec.version.get_slice_attributes()
160         
161         if requested_attributes:
162             for attrib_dict in requested_attributes:
163                 if 'timeslot' in attrib_dict:
164                     slice.update({'timeslot':attrib_dict['timeslot']})
165         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... slice %s " %(slice)
166         # ensure person records exists
167         persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
168         # ensure slice attributes exists?
169
170         
171         # add/remove slice from nodes 
172         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... " 
173        
174         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
175         print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
176         nodes = slices.verify_slice_nodes(slice, requested_slivers, peer) 
177     
178         
179         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
180         
181         
182     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
183         
184         slice = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
185         print>>sys.stderr, "\r\n \r\n \t\t  SLABDRIVER.PY delete_sliver slice %s" %(slice)
186         if not slice:
187             return 1
188        
189         slices = SlabSlices(self)
190         # determine if this is a peer slice
191         # xxx I wonder if this would not need to use PlSlices.get_peer instead 
192         # in which case plc.peers could be deprecated as this here
193         # is the only/last call to this last method in plc.peers
194         peer = slices.get_peer(slice_hrn)
195         try:
196             if peer:
197                 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
198             self.DeleteSliceFromNodes(slice)
199         finally:
200             if peer:
201                 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
202         return 1
203             
204  
205     # first 2 args are None in case of resource discovery
206     def list_resources (self, slice_urn, slice_hrn, creds, options):
207         #cached_requested = options.get('cached', True) 
208     
209         version_manager = VersionManager()
210         # get the rspec's return format from options
211         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
212         version_string = "rspec_%s" % (rspec_version)
213     
214         #panos adding the info option to the caching key (can be improved)
215         if options.get('info'):
216             version_string = version_string + "_"+options.get('info', 'default')
217     
218         # look in cache first
219         #if cached_requested and self.cache and not slice_hrn:
220             #rspec = self.cache.get(version_string)
221             #if rspec:
222                 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
223                 #return rspec 
224     
225         #panos: passing user-defined options
226
227         aggregate = SlabAggregate(self)
228         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
229         #print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources origin_hrn %s" %(origin_hrn)
230         options.update({'origin_hrn':origin_hrn})
231         #print>>sys.stderr, " \r\n \r\n \t SLABDRIVER  list_resources options %s" %(options)
232         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
233                                      options=options)
234         print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec " 
235         # cache the result
236         #if self.cache and not slice_hrn:
237             #logger.debug("Slab.ListResources: stores advertisement in cache")
238             #self.cache.add(version_string, rspec)
239     
240         return rspec
241         
242         
243     def list_slices (self, creds, options):
244         # look in cache first
245         #if self.cache:
246             #slices = self.cache.get('slices')
247             #if slices:
248                 #logger.debug("PlDriver.list_slices returns from cache")
249                 #return slices
250     
251         # get data from db 
252         print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
253         slices = self.GetSlices()
254         slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
255         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
256     
257         # cache the result
258         #if self.cache:
259             #logger.debug ("SlabDriver.list_slices stores value in cache")
260             #self.cache.add('slices', slice_urns) 
261     
262         return slice_urns
263     
264     #No site or node register supported
265     def register (self, sfa_record, hrn, pub_key):
266         type = sfa_record['type']
267         slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
268     
269
270         if type == 'slice':
271             acceptable_fields=['url', 'instantiation', 'name', 'description']
272             for key in slab_record.keys():
273                 if key not in acceptable_fields:
274                     slab_record.pop(key) 
275             print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
276             slices = self.GetSlices(slice_filter =slab_record['hrn'], filter_type = 'slice_hrn')
277             if not slices:
278                     pointer = self.AddSlice(slab_record)
279             else:
280                     pointer = slices[0]['slice_id']
281     
282         elif type == 'user':
283             persons = self.GetPersons([sfa_record['hrn']])
284             if not persons:
285                 pointer = self.AddPerson(dict(sfa_record))
286                 #add in LDAP 
287             else:
288                 pointer = persons[0]['person_id']
289                 
290             #Does this make sense to senslab ?
291             #if 'enabled' in sfa_record and sfa_record['enabled']:
292                 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
293                 
294             # add this person to the site only if she is being added for the first
295             # time by sfa and doesont already exist in plc
296             if not persons or not persons[0]['site_ids']:
297                 login_base = get_leaf(sfa_record['authority'])
298                 self.AddPersonToSite(pointer, login_base)
299     
300             # What roles should this user have?
301             self.AddRoleToPerson('user', pointer)
302             # Add the user's key
303             if pub_key:
304                 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
305                 
306         #No node adding outside OAR
307
308         return pointer
309             
310     #No site or node record update allowed       
311     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
312         pointer = old_sfa_record['pointer']
313         type = old_sfa_record['type']
314
315         # new_key implemented for users only
316         if new_key and type not in [ 'user' ]:
317             raise UnknownSfaType(type)
318         
319         #if (type == "authority"):
320             #self.shell.UpdateSite(pointer, new_sfa_record)
321     
322         if type == "slice":
323             slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
324             if 'name' in slab_record:
325                 slab_record.pop('name')
326                 self.UpdateSlice(pointer, slab_record)
327     
328         elif type == "user":
329             update_fields = {}
330             all_fields = new_sfa_record
331             for key in all_fields.keys():
332                 if key in ['first_name', 'last_name', 'title', 'email',
333                            'password', 'phone', 'url', 'bio', 'accepted_aup',
334                            'enabled']:
335                     update_fields[key] = all_fields[key]
336             self.UpdatePerson(pointer, update_fields)
337     
338             if new_key:
339                 # must check this key against the previous one if it exists
340                 persons = self.GetPersons([pointer], ['key_ids'])
341                 person = persons[0]
342                 keys = person['key_ids']
343                 keys = self.GetKeys(person['key_ids'])
344                 
345                 # Delete all stale keys
346                 key_exists = False
347                 for key in keys:
348                     if new_key != key['key']:
349                         self.DeleteKey(key['key_id'])
350                     else:
351                         key_exists = True
352                 if not key_exists:
353                     self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
354
355
356         return True
357         
358
359     def remove (self, sfa_record):
360         type=sfa_record['type']
361         hrn=sfa_record['hrn']
362         record_id= sfa_record['record_id']
363         if type == 'user':
364             username = hrn.split(".")[len(hrn.split(".")) -1]
365             #get user in ldap
366             persons = self.GetPersons(username)
367             # only delete this person if he has site ids. if he doesnt, it probably means
368             # he was just removed from a site, not actually deleted
369             if persons and persons[0]['site_ids']:
370                 self.DeletePerson(username)
371         elif type == 'slice':
372             if self.GetSlices(slice_filter = hrn, filter_type = 'slice_hrn'):
373                 self.DeleteSlice(hrn)
374
375         #elif type == 'authority':
376             #if self.GetSites(pointer):
377                 #self.DeleteSite(pointer)
378
379         return True
380             
381     def GetPeers (self,auth = None, peer_filter=None, return_fields=None):
382
383         existing_records = {}
384         existing_hrns_by_types= {}
385         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields)
386         all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
387         for record in all_records:
388             existing_records[record.hrn] = record
389             if record.type not in existing_hrns_by_types:
390                 existing_hrns_by_types[record.type] = [record.hrn]
391                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
392             else:
393                 
394                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN  type %s hrn %s " %( record.type,record.hrn )
395                 existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
396                         
397         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers        existing_hrns_by_types %s " %( existing_hrns_by_types)
398         records_list= [] 
399       
400         try:
401             for hrn in existing_hrns_by_types['authority+sa']:
402                 records_list.append(existing_records[hrn])
403                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  records_list  %s " %(records_list)
404                 
405         except:
406                 pass
407
408         if not peer_filter and not return_fields:
409             return records_list
410         return_records = parse_filter(records_list,peer_filter, 'peers', return_fields) 
411  
412         return return_records
413         
414      
415             
416     def GetPersons(self, person_filter=None, return_fields=None):
417         
418         person_list = self.ldap.ldapFind({'authority': self.root_auth })
419         
420         #check = False
421         #if person_filter and isinstance(person_filter, dict):
422             #for k in  person_filter.keys():
423                 #if k in person_list[0].keys():
424                     #check = True
425                     
426         return_person_list = parse_filter(person_list,person_filter ,'persons', return_fields)
427         if return_person_list:
428             print>>sys.stderr, " \r\n GetPersons person_filter %s return_fields %s  " %(person_filter,return_fields)
429             return return_person_list
430
431     def GetTimezone(self):
432         server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
433         return server_timestamp,server_tz
434     
435
436     def DeleteJobs(self, job_id, slice_hrn):
437         if not job_id:
438             return
439         username  = slice_hrn.split(".")[-1].rstrip("_slice")
440         reqdict = {}
441         reqdict['method'] = "delete"
442         reqdict['strval'] = str(job_id)
443         answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
444         print>>sys.stderr, "\r\n \r\n  jobid  DeleteJobs %s "  %(answer)
445         
446                 
447     def GetJobs(self,job_id= None, resources=True,return_fields=None, username = None):
448         #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
449         #'api_timestamp']
450         #assigned_res = ['resource_id', 'resource_uri']
451         #assigned_n = ['node', 'node_uri']
452      
453         if job_id and resources is False:
454             req = "GET_jobs_id"
455             node_list_k = 'assigned_network_address'
456            
457         if job_id and resources :
458             req = "GET_jobs_id_resources"
459             node_list_k = 'reserved_resources' 
460                
461         #Get job info from OAR    
462         job_info = self.oar.parser.SendRequest(req, job_id, username)
463         print>>sys.stderr, "\r\n \r\n \t\t GetJobs  %s " %(job_info)
464         
465         if 'state' in job_info :
466             if job_info['state'] == 'Terminated':
467                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs TERMINELEBOUSIN "
468                 return None
469             if job_info['state'] == 'Error':
470                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs ERROR message %s " %(job_info)
471                 return None
472         
473         #Get a dict of nodes . Key :hostname of the node
474         node_list = self.GetNodes() 
475         node_hostname_list = []
476         for node in node_list:
477             node_hostname_list.append(node['hostname'])
478         node_dict = dict(zip(node_hostname_list,node_list))
479         try :
480             liste =job_info[node_list_k] 
481             #print>>sys.stderr, "\r\n \r\n \t\t GetJobs resources  job_info liste%s" %(liste)
482             for k in range(len(liste)):
483                job_info[node_list_k][k] = node_dict[job_info[node_list_k][k]]['hostname']
484             
485             #print>>sys.stderr, "\r\n \r\n \t\t YYYYYYYYYYYYGetJobs resources  job_info %s" %(job_info)  
486             #Replaces the previous entry "assigned_network_address" / "reserved_resources"
487             #with "node_ids"
488             job_info.update({'node_ids':job_info[node_list_k]})
489             del job_info[node_list_k]
490             return job_info
491             
492         except KeyError:
493             print>>sys.stderr, "\r\n \r\n \t\t GetJobs KEYERROR " 
494             
495     def GetReservedNodes(self):
496         # this function returns a list of all the nodes already involved in an oar job
497
498        jobs=self.oar.parser.SendRequest("GET_jobs_details") 
499        nodes=[]
500        for j in jobs :
501           nodes=j['assigned_network_address']+nodes
502        return nodes
503      
504     def GetNodes(self,node_filter= None, return_fields=None):
505         node_dict =self.oar.parser.SendRequest("GET_resources_full")
506
507         return_node_list = []
508         if not (node_filter or return_fields):
509                 return_node_list = node_dict.values()
510                 return return_node_list
511     
512         return_node_list= parse_filter(node_dict.values(),node_filter ,'node', return_fields)
513         return return_node_list
514     
515   
516     def GetSites(self, site_filter = None, return_fields=None):
517         site_dict =self.oar.parser.SendRequest("GET_sites")
518         return_site_list = []
519         if not ( site_filter or return_fields):
520                 return_site_list = site_dict.values()
521                 return return_site_list
522     
523         return_site_list = parse_filter(site_dict.values(), site_filter,'site', return_fields)
524         return return_site_list
525         
526
527     def GetSlices(self,slice_filter = None, filter_type = None, return_fields=None):
528         return_slice_list = []
529         slicerec  = {}
530         rec = {}
531         ftypes = ['slice_hrn', 'record_id_user']
532         if filter_type and filter_type in ftypes:
533             if filter_type == 'slice_hrn':
534                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()    
535             if filter_type == 'record_id_user':
536                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
537                 
538             if slicerec:
539                 rec = slicerec.dumpquerytodict()
540                 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
541                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY slicerec GetSlices   %s " %(slicerec)
542                 if slicerec.oar_job_id is not -1:
543                     rslt = self.GetJobs( slicerec.oar_job_id, resources=False, username = login )
544                     #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  GetJobs  %s " %(rslt)     
545                     if rslt :
546                         rec.update(rslt)
547                         rec.update({'hrn':str(rec['slice_hrn'])})
548                         #If GetJobs is empty, this means the job is now in the 'Terminated' state
549                         #Update the slice record
550                     else :
551                         self.db.update_job(slice_filter, job_id = -1)
552                         rec['oar_job_id'] = -1
553                         rec.update({'hrn':str(rec['slice_hrn'])})
554             
555                 try:
556                     rec['node_ids'] = rec['node_list']
557                 except KeyError:
558                     pass
559                 
560                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  rec  %s" %(rec)
561                               
562             return rec
563                 
564                 
565         else:
566             return_slice_list = slab_dbsession.query(SliceSenslab).all()
567
568         print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  slices %s slice_filter %s " %(return_slice_list,slice_filter)
569         
570         #if return_fields:
571             #return_slice_list  = parse_filter(sliceslist, slice_filter,'slice', return_fields)
572         
573         
574                     
575         return return_slice_list
576         
577
578         
579     
580     def testbed_name (self): return "senslab2" 
581          
582     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
583     def aggregate_version (self):
584         version_manager = VersionManager()
585         ad_rspec_versions = []
586         request_rspec_versions = []
587         for rspec_version in version_manager.versions:
588             if rspec_version.content_type in ['*', 'ad']:
589                 ad_rspec_versions.append(rspec_version.to_dict())
590             if rspec_version.content_type in ['*', 'request']:
591                 request_rspec_versions.append(rspec_version.to_dict()) 
592         return {
593             'testbed':self.testbed_name(),
594             'geni_request_rspec_versions': request_rspec_versions,
595             'geni_ad_rspec_versions': ad_rspec_versions,
596             }
597           
598           
599           
600           
601           
602           
603     ##
604     # Convert SFA fields to PLC fields for use when registering up updating
605     # registry record in the PLC database
606     #
607     # @param type type of record (user, slice, ...)
608     # @param hrn human readable name
609     # @param sfa_fields dictionary of SFA fields
610     # @param slab_fields dictionary of PLC fields (output)
611
612     def sfa_fields_to_slab_fields(self, type, hrn, record):
613
614         def convert_ints(tmpdict, int_fields):
615             for field in int_fields:
616                 if field in tmpdict:
617                     tmpdict[field] = int(tmpdict[field])
618
619         slab_record = {}
620         #for field in record:
621         #    slab_record[field] = record[field]
622  
623         if type == "slice":
624             #instantion used in get_slivers ? 
625             if not "instantiation" in slab_record:
626                 slab_record["instantiation"] = "senslab-instantiated"
627             slab_record["hrn"] = hrn_to_pl_slicename(hrn)
628             print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
629             if "url" in record:
630                slab_record["url"] = record["url"]
631             if "description" in record:
632                 slab_record["description"] = record["description"]
633             if "expires" in record:
634                 slab_record["expires"] = int(record["expires"])
635                 
636         #nodes added by OAR only and then imported to SFA
637         #elif type == "node":
638             #if not "hostname" in slab_record:
639                 #if not "hostname" in record:
640                     #raise MissingSfaInfo("hostname")
641                 #slab_record["hostname"] = record["hostname"]
642             #if not "model" in slab_record:
643                 #slab_record["model"] = "geni"
644                 
645         #One authority only 
646         #elif type == "authority":
647             #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
648
649             #if not "name" in slab_record:
650                 #slab_record["name"] = hrn
651
652             #if not "abbreviated_name" in slab_record:
653                 #slab_record["abbreviated_name"] = hrn
654
655             #if not "enabled" in slab_record:
656                 #slab_record["enabled"] = True
657
658             #if not "is_public" in slab_record:
659                 #slab_record["is_public"] = True
660
661         return slab_record
662
663                    
664     def AddSliceToNodes(self,  slice_dict, added_nodes, slice_user=None):
665        
666         site_list = []
667         nodeid_list =[]
668         resource = ""
669         reqdict = {}
670         slice_name = slice_dict['name']
671         try:
672             slot = slice_dict['timeslot'] 
673             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes slot %s   " %(slot)
674         except KeyError:
675             #Running on default parameters
676             #XP immediate , 10 mins
677             slot = {'date':None,'start_time':None, 'timezone':None,'duration':'00:10:00' }#10 min 
678             reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
679             reqdict['script_path'] = "/bin/sleep 620" #+20 sec
680             
681         reqdict['property'] ="network_address in ("
682         for node in added_nodes:
683             #Get the ID of the node : remove the root auth and put the site in a separate list
684             s=node.split(".")
685             # NT: it's not clear for me if the nodenames will have the senslab prefix
686             # so lets take the last part only, for now.
687             lastpart=s[-1]
688             #if s[0] == self.root_auth :
689             # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
690             s=lastpart.split("_")
691             nodeid=s[-1]
692             reqdict['property'] += "'"+ nodeid +"', "
693             nodeid_list.append(nodeid)
694             #site_list.append( l[0] )
695             
696             
697         reqdict['property'] =  reqdict['property'][0: len( reqdict['property'])-2] +")"
698         reqdict['resource'] ="network_address="+ str(len(nodeid_list))
699         
700         if slot['duration']:
701             walltime = slot['duration'].split(":")
702             # Fixing the walltime by adding a few delays. First put the walltime in seconds
703             # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
704             # take in account  prologue and epilogue scripts execution
705             # int walltimeAdditionalDelay = 120;  additional delay
706
707             desired_walltime =  int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
708             total_walltime = desired_walltime + 140 #+2 min 20
709             sleep_walltime = desired_walltime + 20 #+20 sec
710             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes desired_walltime %s  total_walltime %s sleep_walltime %s  " %(desired_walltime,total_walltime,sleep_walltime)
711             #Put the walltime back in str form
712             #First get the hours
713             walltime[0] = str(total_walltime / 3600)
714             total_walltime = total_walltime - 3600 * int(walltime[0])
715             #Get the remaining minutes
716             walltime[1] = str(total_walltime / 60)
717             total_walltime =  total_walltime - 60 * int(walltime[1])
718             #Get the seconds
719             walltime[2] = str(total_walltime)
720             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes  walltime %s " %(walltime)
721
722             reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2]) 
723             reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
724             
725         #In case of a scheduled experiment (not immediate)
726         #To run an XP immediately, don't specify date and time in RSpec 
727         #They will be set to None.
728         if slot['date'] and slot['start_time']:
729             if slot['timezone'] is '' or slot['timezone'] is None:
730                 #assume it is server timezone
731                 server_timestamp,server_tz = self.GetTimezone()
732                 from_zone=tz.gettz(server_tz) 
733                 print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes  timezone not specified  server_tz %s from_zone  %s" %(server_tz,from_zone) 
734             else:
735                 #Get zone of the user from the reservation time given in the rspec
736                 from_zone = tz.gettz(slot['timezone'])  
737                    
738             date = str(slot['date'])  + " " + str(slot['start_time'])
739             user_datetime = datetime.datetime.strptime(date, self.time_format)
740             user_datetime = user_datetime.replace(tzinfo = from_zone)
741             
742             #Convert to UTC zone
743             to_zone = tz.tzutc()
744             utc_date = user_datetime.astimezone(to_zone)
745             #Readable time accpeted by OAR
746             reqdict['reservation']= utc_date.strftime(self.time_format)
747         
748             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes  reqdict['reservation'] %s " %(reqdict['reservation'])
749             
750         else:
751             # Immediate XP
752             # reservations are performed in the oar server timebase, so :
753             # 1- we get the server time(in UTC tz )/server timezone
754             # 2- convert the server UTC time in its timezone
755             # 3- add a custom delay to this time
756             # 4- convert this time to a readable form and it for the reservation request.
757             server_timestamp,server_tz = self.GetTimezone()
758             s_tz=tz.gettz(server_tz)
759             UTC_zone = tz.gettz("UTC")
760             #weird... datetime.fromtimestamp should work since we do from datetime import datetime
761             utc_server= datetime.datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
762             server_localtime=utc_server.astimezone(s_tz)
763     
764             print>>sys.stderr, "\r\n \r\n \t\tAddSliceToNodes server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
765             readable_time = server_localtime.strftime(self.time_format)
766
767             print >>sys.stderr,"  \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s  " %(readable_time ,server_timestamp)
768             reqdict['reservation'] = readable_time
769         
770
771         reqdict['type'] = "deploy" 
772         reqdict['directory']= ""
773         reqdict['name']= "TestSandrine"
774        
775          
776         # first step : start the OAR job and update the job 
777         print>>sys.stderr, "\r\n \r\n AddSliceToNodes reqdict   %s \r\n site_list   %s"  %(reqdict,site_list)   
778        
779         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
780         print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid   %s "  %(answer)
781         try:       
782             jobid = answer['id']
783         except KeyError:
784              print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job  %s "  %( answer)
785              return
786         
787         print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user)
788         self.db.update_job( slice_name, jobid ,added_nodes)
789         
790           
791         # second step : configure the experiment
792         # we need to store the nodes in a yaml (well...) file like this :
793         # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
794         f=open('/tmp/sfa/'+str(jobid)+'.json','w')
795         f.write('[')
796         f.write(str(added_nodes[0].strip('node')))
797         for node in added_nodes[1:len(added_nodes)] :
798             f.write(','+node.strip('node'))
799         f.write(']')
800         f.close()
801         
802         # third step : call the senslab-experiment wrapper
803         #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
804         javacmdline="/usr/bin/java"
805         jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
806         #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
807         output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
808
809         print>>sys.stderr, "\r\n \r\n AddSliceToNodes wrapper returns   %s "  %(output)
810         return 
811                  
812  
813     #Delete the jobs and updates the job id in the senslab table
814     #to set it to -1  
815     #Does not clear the node list 
816     def DeleteSliceFromNodes(self, slice_record):
817          # Get user information
818        
819         self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
820         self.db.update_job(slice_record['hrn'], job_id = -1)
821         return   
822     
823  
824
825     def fill_record_sfa_info(self, records):
826
827         def startswith(prefix, values):
828             return [value for value in values if value.startswith(prefix)]
829
830         # get person ids
831         person_ids = []
832         site_ids = []
833         for record in records:
834             person_ids.extend(record.get("person_ids", []))
835             site_ids.extend(record.get("site_ids", [])) 
836             if 'site_id' in record:
837                 site_ids.append(record['site_id']) 
838                 
839         #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___person_ids %s \r\n \t\t site_ids %s " %(person_ids, site_ids)
840         
841         # get all pis from the sites we've encountered
842         # and store them in a dictionary keyed on site_id 
843         site_pis = {}
844         if site_ids:
845             pi_filter = {'|roles': ['pi'], '|site_ids': site_ids} 
846             pi_list = self.GetPersons( pi_filter, ['person_id', 'site_ids'])
847             #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___ GetPersons ['person_id', 'site_ids'] pi_ilist %s" %(pi_list)
848
849             for pi in pi_list:
850                 # we will need the pi's hrns also
851                 person_ids.append(pi['person_id'])
852                 
853                 # we also need to keep track of the sites these pis
854                 # belong to
855                 for site_id in pi['site_ids']:
856                     if site_id in site_pis:
857                         site_pis[site_id].append(pi)
858                     else:
859                         site_pis[site_id] = [pi]
860                  
861         # get sfa records for all records associated with these records.   
862         # we'll replace pl ids (person_ids) with hrns from the sfa records
863         # we obtain
864         
865         # get the sfa records
866         #table = SfaTable()
867         existing_records = {}
868         all_records = dbsession.query(RegRecord).all()
869         for record in all_records:
870             existing_records[(record.type,record.pointer)] = record
871             
872         print >>sys.stderr, " \r\r\n SLABDRIVER fill_record_sfa_info existing_records %s "  %(existing_records)
873         person_list, persons = [], {}
874         #person_list = table.find({'type': 'user', 'pointer': person_ids})
875         try:
876             for p_id in person_ids:
877                 person_list.append( existing_records.get(('user',p_id)))
878         except KeyError:
879             print >>sys.stderr, " \r\r\n SLABDRIVER fill_record_sfa_info ERRRRRRRRRROR"
880                  
881         # create a hrns keyed on the sfa record's pointer.
882         # Its possible for  multiple records to have the same pointer so
883         # the dict's value will be a list of hrns.
884         persons = defaultdict(list)
885         for person in person_list:
886             persons[person['pointer']].append(person)
887
888         # get the pl records
889         slab_person_list, slab_persons = [], {}
890         slab_person_list = self.GetPersons(person_ids, ['person_id', 'roles'])
891         slab_persons = list_to_dict(slab_person_list, 'person_id')
892         #print>>sys.stderr, "\r\n \r\n _fill_record_sfa_info ___  _list %s \r\n \t\t SenslabUsers.GetPersons ['person_id', 'roles'] slab_persons %s \r\n records %s" %(slab_person_list, slab_persons,records) 
893         # fill sfa info
894         
895         for record in records:
896             # skip records with no pl info (top level authorities)
897             #Sandrine 24 oct 11 2 lines
898             #if record['pointer'] == -1:
899                 #continue 
900             sfa_info = {}
901             type = record['type']
902             if (type == "slice"):
903                 # all slice users are researchers
904                 #record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')  ? besoin ou pas ?
905                 record['PI'] = []
906                 record['researcher'] = []
907                 for person_id in record.get('person_ids', []):
908                          #Sandrine 24 oct 11 line
909                 #for person_id in record['person_ids']:
910                     hrns = [person['hrn'] for person in persons[person_id]]
911                     record['researcher'].extend(hrns)                
912
913                 # pis at the slice's site
914                 slab_pis = site_pis[record['site_id']]
915                 pi_ids = [pi['person_id'] for pi in slab_pis]
916                 for person_id in pi_ids:
917                     hrns = [person['hrn'] for person in persons[person_id]]
918                     record['PI'].extend(hrns)
919                 record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')
920                 record['geni_creator'] = record['PI'] 
921                 
922             elif (type == "authority"):
923                 record['PI'] = []
924                 record['operator'] = []
925                 record['owner'] = []
926                 for pointer in record['person_ids']:
927                     if pointer not in persons or pointer not in slab_persons:
928                         # this means there is not sfa or pl record for this user
929                         continue   
930                     hrns = [person['hrn'] for person in persons[pointer]] 
931                     roles = slab_persons[pointer]['roles']   
932                     if 'pi' in roles:
933                         record['PI'].extend(hrns)
934                     if 'tech' in roles:
935                         record['operator'].extend(hrns)
936                     if 'admin' in roles:
937                         record['owner'].extend(hrns)
938                     # xxx TODO: OrganizationName
939             elif (type == "node"):
940                 sfa_info['dns'] = record.get("hostname", "")
941                 # xxx TODO: URI, LatLong, IP, DNS
942     
943             elif (type == "user"):
944                  sfa_info['email'] = record.get("email", "")
945                  sfa_info['geni_urn'] = hrn_to_urn(record['hrn'], 'user')
946                  sfa_info['geni_certificate'] = record['gid'] 
947                 # xxx TODO: PostalAddress, Phone
948                 
949             #print>>sys.stderr, "\r\n \r\rn \t\t \t <<<<<<<<<<<<<<<<<<<<<<<<  fill_record_sfa_info sfa_info %s  \r\n record %s : "%(sfa_info,record)  
950             record.update(sfa_info)
951             
952     def augment_records_with_testbed_info (self, sfa_records):
953         return self.fill_record_info (sfa_records)
954     
955     def fill_record_info(self, records):
956         """
957         Given a SFA record, fill in the senslab specific and SFA specific
958         fields in the record. 
959         """
960                     
961         print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s  " %(records)
962         if not isinstance(records, list):
963             records = [records]
964
965         parkour = records 
966         try:
967             for record in parkour:
968                     
969                 if str(record['type']) == 'slice':
970                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY  fill_record_info \t \t record %s" %(record)
971                     #sfatable = SfaTable()
972                     
973                     #existing_records_by_id = {}
974                     #all_records = dbsession.query(RegRecord).all()
975                     #for rec in all_records:
976                         #existing_records_by_id[rec.record_id] = rec
977                     #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY  fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']])
978                         
979                     #recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])}) 
980                     #recslice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = str(record['hrn'])).first()
981                     recslice = self.GetSlices(slice_filter =  str(record['hrn']), filter_type = 'slice_hrn')
982                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice)
983                     #if isinstance(recslice,list) and len(recslice) == 1:
984                         #recslice = recslice[0]
985                    
986                     recuser = dbsession.query(RegRecord).filter_by(record_id = recslice['record_id_user']).first()
987                     #existing_records_by_id[recslice['record_id_user']]
988                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser)
989                     
990           
991                     record.update({'PI':[recuser.hrn],
992                     'researcher': [recuser.hrn],
993                     'name':record['hrn'], 
994                     'oar_job_id':recslice['oar_job_id'],
995                     'node_ids': [],
996                     'person_ids':[recslice['record_id_user']]})
997                     
998                 elif str(record['type']) == 'user':
999                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info USEEEEEEEEEERDESU!" 
1000
1001                     rec = self.GetSlices(slice_filter = record['record_id'], filter_type = 'record_id_user')
1002                     #Append record in records list, therfore fetches user and slice info again(one more loop)
1003                     #Will update PIs and researcher for the slice
1004
1005                     rec.update({'type':'slice','hrn':rec['slice_hrn']})
1006                     records.append(rec)
1007                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info ADDING SLIC EINFO rec %s" %(rec) 
1008                     
1009             print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info OKrecords %s" %(records) 
1010         except TypeError:
1011             print >>sys.stderr, "\r\n \t\t SLABDRIVER fill_record_info  EXCEPTION RECORDS : %s" %(records)      
1012             return
1013         
1014         #self.fill_record_slab_info(records)
1015         ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)     
1016         #self.fill_record_sfa_info(records)
1017         #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1018         
1019         
1020
1021     
1022         
1023     #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1024         ## get a list of the HRNs tht are members of the old and new records
1025         #if oldRecord:
1026             #oldList = oldRecord.get(listName, [])
1027         #else:
1028             #oldList = []     
1029         #newList = record.get(listName, [])
1030
1031         ## if the lists are the same, then we don't have to update anything
1032         #if (oldList == newList):
1033             #return
1034
1035         ## build a list of the new person ids, by looking up each person to get
1036         ## their pointer
1037         #newIdList = []
1038         #table = SfaTable()
1039         #records = table.find({'type': 'user', 'hrn': newList})
1040         #for rec in records:
1041             #newIdList.append(rec['pointer'])
1042
1043         ## build a list of the old person ids from the person_ids field 
1044         #if oldRecord:
1045             #oldIdList = oldRecord.get("person_ids", [])
1046             #containerId = oldRecord.get_pointer()
1047         #else:
1048             ## if oldRecord==None, then we are doing a Register, instead of an
1049             ## update.
1050             #oldIdList = []
1051             #containerId = record.get_pointer()
1052
1053     ## add people who are in the new list, but not the oldList
1054         #for personId in newIdList:
1055             #if not (personId in oldIdList):
1056                 #addFunc(self.plauth, personId, containerId)
1057
1058         ## remove people who are in the old list, but not the new list
1059         #for personId in oldIdList:
1060             #if not (personId in newIdList):
1061                 #delFunc(self.plauth, personId, containerId)
1062
1063     #def update_membership(self, oldRecord, record):
1064         #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1065         #if record.type == "slice":
1066             #self.update_membership_list(oldRecord, record, 'researcher',
1067                                         #self.users.AddPersonToSlice,
1068                                         #self.users.DeletePersonFromSlice)
1069         #elif record.type == "authority":
1070             ## xxx TODO
1071             #pass
1072
1073 ### thierry
1074 # I don't think you plan on running a component manager at this point
1075 # let me clean up the mess of ComponentAPI that is deprecated anyways