Merge branch 'senslab2' of ssh://git.f-lab.fr/git/sfa into senslab2
[sfa.git] / sfa / senslab / slabdriver.py
1 import sys
2 import subprocess
3
4 from datetime import datetime
5 from dateutil import tz 
6 from time import strftime,gmtime
7
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
11
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
15
16 from sfa.trust.credential import Credential
17 from sfa.trust.gid import GID
18
19 from sfa.managers.driver import Driver
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
22
23 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
24 from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
25
26 ## thierry: everything that is API-related (i.e. handling incoming requests) 
27 # is taken care of 
28 # SlabDriver should be really only about talking to the senslab testbed
29
30
31 from sfa.senslab.OARrestapi import  OARrestapi
32 from sfa.senslab.LDAPapi import LDAPapi
33
34 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
35 from sfa.senslab.slabaggregate import SlabAggregate
36 from sfa.senslab.slabslices import SlabSlices
37
38
39
40 # thierry : note
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
44 class SlabDriver(Driver):
45
46     def __init__(self, config):
47         Driver.__init__ (self, config)
48         self.config=config
49         self.hrn = config.SFA_INTERFACE_HRN
50
51         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
52
53         self.oar = OARrestapi()
54         self.ldap = LDAPapi()
55         self.time_format = "%Y-%m-%d %H:%M:%S"
56         self.db = SlabDB(config,debug = True)
57         self.cache=None
58         
59     
60     def sliver_status(self,slice_urn,slice_hrn):
61         """Receive a status request for slice named urn/hrn 
62         urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
63         shall return a structure as described in
64         http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
65         NT : not sure if we should implement this or not, but used by sface.
66         
67         """
68         
69         #First get the slice with the slice hrn
70         sl = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
71         if len(sl) is 0:
72             raise SliverDoesNotExist("%s  slice_hrn" % (slice_hrn))
73         
74         nodes_in_slice = sl['node_ids']
75         if len(nodes_in_slice) is 0:
76             raise SliverDoesNotExist("No slivers allocated ") 
77         
78         logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
79                              %s \r\n " %(slice_urn,slice_hrn,sl) )
80                              
81         if sl['oar_job_id'] is not -1:
82             #A job is running on Senslab for this slice
83             # report about the local nodes that are in the slice only
84             
85             nodes_all = self.GetNodes({'hostname':nodes_in_slice},
86                             ['node_id', 'hostname','site','boot_state'])
87             nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
88             
89
90             result = {}
91             top_level_status = 'unknown'
92             if nodes:
93                 top_level_status = 'ready'
94             result['geni_urn'] = slice_urn
95             result['pl_login'] = sl['job_user'] #For compatibility
96
97             
98             timestamp = float(sl['startTime']) + float(sl['walltime']) 
99             result['pl_expires'] = strftime(self.time_format, \
100                                                     gmtime(float(timestamp)))
101             #result['slab_expires'] = strftime(self.time_format,\
102                                                      #gmtime(float(timestamp)))
103             
104             resources = []
105             for node in nodes:
106                 res = {}
107                 #res['slab_hostname'] = node['hostname']
108                 #res['slab_boot_state'] = node['boot_state']
109                 
110                 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
111                 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
112                 res['pl_last_contact'] = strftime(self.time_format, \
113                                                     gmtime(float(timestamp)))
114                 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
115                                             nodeall_byhostname[node]['node_id']) 
116                 res['geni_urn'] = sliver_id 
117                 if nodeall_byhostname[node]['boot_state'] == 'Alive':
118
119                     res['geni_status'] = 'ready'
120                 else:
121                     res['geni_status'] = 'failed'
122                     top_level_status = 'failed' 
123                     
124                 res['geni_error'] = ''
125         
126                 resources.append(res)
127                 
128             result['geni_status'] = top_level_status
129             result['geni_resources'] = resources 
130             print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
131             return result        
132         
133         
134     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
135         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
136         aggregate = SlabAggregate(self)
137         
138         slices = SlabSlices(self)
139         peer = slices.get_peer(slice_hrn)
140         sfa_peer = slices.get_sfa_peer(slice_hrn)
141         slice_record=None 
142  
143         if not isinstance(creds, list):
144             creds = [creds]
145     
146         if users:
147             slice_record = users[0].get('slice_record', {})
148     
149         # parse rspec
150         rspec = RSpec(rspec_string)
151         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ============================rspec.version %s " %(rspec.version)
152         
153         
154         # ensure site record exists?
155         # ensure slice record exists
156         slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
157         requested_attributes = rspec.version.get_slice_attributes()
158         
159         if requested_attributes:
160             for attrib_dict in requested_attributes:
161                 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
162                     slice.update({'timeslot':attrib_dict['timeslot']})
163         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... slice %s " %(slice)
164         # ensure person records exists
165         persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
166         # ensure slice attributes exists?
167
168         
169         # add/remove slice from nodes 
170         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... " 
171        
172         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
173         print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
174         nodes = slices.verify_slice_nodes(slice, requested_slivers, peer) 
175     
176         
177         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
178         
179         
180     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
181         
182         slice = self.GetSlices(slice_filter= slice_hrn, slice_filter_type = 'slice_hrn')
183         print>>sys.stderr, "\r\n \r\n \t\t  SLABDRIVER.PY delete_sliver slice %s" %(slice)
184         if not slice:
185             return 1
186        
187         slices = SlabSlices(self)
188         # determine if this is a peer slice
189         # xxx I wonder if this would not need to use PlSlices.get_peer instead 
190         # in which case plc.peers could be deprecated as this here
191         # is the only/last call to this last method in plc.peers
192         peer = slices.get_peer(slice_hrn)
193         try:
194             if peer:
195                 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
196             self.DeleteSliceFromNodes(slice)
197         finally:
198             if peer:
199                 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
200         return 1
201             
202             
203     def AddSlice(self, slice_record):
204         slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'],  record_id_slice= slice_record['record_id_slice'] , record_id_user= slice_record['record_id_user'], peer_authority = slice_record['peer_authority'])
205         print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,slab_slice)
206         slab_dbsession.add(slab_slice)
207         slab_dbsession.commit()
208         return
209         
210     # first 2 args are None in case of resource discovery
211     def list_resources (self, slice_urn, slice_hrn, creds, options):
212         #cached_requested = options.get('cached', True) 
213     
214         version_manager = VersionManager()
215         # get the rspec's return format from options
216         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
217         version_string = "rspec_%s" % (rspec_version)
218     
219         #panos adding the info option to the caching key (can be improved)
220         if options.get('info'):
221             version_string = version_string + "_"+options.get('info', 'default')
222     
223         # look in cache first
224         #if cached_requested and self.cache and not slice_hrn:
225             #rspec = self.cache.get(version_string)
226             #if rspec:
227                 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
228                 #return rspec 
229     
230         #panos: passing user-defined options
231
232         aggregate = SlabAggregate(self)
233         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
234         options.update({'origin_hrn':origin_hrn})
235         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
236                                      options=options)
237         print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec " 
238         # cache the result
239         #if self.cache and not slice_hrn:
240             #logger.debug("Slab.ListResources: stores advertisement in cache")
241             #self.cache.add(version_string, rspec)
242     
243         return rspec
244         
245         
246     def list_slices (self, creds, options):
247         # look in cache first
248         #if self.cache:
249             #slices = self.cache.get('slices')
250             #if slices:
251                 #logger.debug("PlDriver.list_slices returns from cache")
252                 #return slices
253     
254         # get data from db 
255         print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
256         slices = self.GetSlices()
257         slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
258         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
259     
260         # cache the result
261         #if self.cache:
262             #logger.debug ("SlabDriver.list_slices stores value in cache")
263             #self.cache.add('slices', slice_urns) 
264     
265         return slice_urns
266     
267     #No site or node register supported
268     def register (self, sfa_record, hrn, pub_key):
269         type = sfa_record['type']
270         slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
271     
272
273         if type == 'slice':
274             acceptable_fields=['url', 'instantiation', 'name', 'description']
275             for key in slab_record.keys():
276                 if key not in acceptable_fields:
277                     slab_record.pop(key) 
278             print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
279             slices = self.GetSlices(slice_filter =slab_record['hrn'], slice_filter_type = 'slice_hrn')
280             if not slices:
281                     pointer = self.AddSlice(slab_record)
282             else:
283                     pointer = slices[0]['slice_id']
284     
285         elif type == 'user':  
286             persons = self.GetPersons([sfa_record])
287             #persons = self.GetPersons([sfa_record['hrn']])
288             if not persons:
289                 pointer = self.AddPerson(dict(sfa_record))
290                 #add in LDAP 
291             else:
292                 pointer = persons[0]['person_id']
293                 
294             #Does this make sense to senslab ?
295             #if 'enabled' in sfa_record and sfa_record['enabled']:
296                 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
297                 
298             # add this person to the site only if she is being added for the first
299             # time by sfa and doesont already exist in plc
300             if not persons or not persons[0]['site_ids']:
301                 login_base = get_leaf(sfa_record['authority'])
302                 self.AddPersonToSite(pointer, login_base)
303     
304             # What roles should this user have?
305             self.AddRoleToPerson('user', pointer)
306             # Add the user's key
307             if pub_key:
308                 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
309                 
310         #No node adding outside OAR
311
312         return pointer
313             
314     #No site or node record update allowed       
315     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
316         pointer = old_sfa_record['pointer']
317         type = old_sfa_record['type']
318
319         # new_key implemented for users only
320         if new_key and type not in [ 'user' ]:
321             raise UnknownSfaType(type)
322         
323         #if (type == "authority"):
324             #self.shell.UpdateSite(pointer, new_sfa_record)
325     
326         if type == "slice":
327             slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
328             if 'name' in slab_record:
329                 slab_record.pop('name')
330                 self.UpdateSlice(pointer, slab_record)
331     
332         elif type == "user":
333             update_fields = {}
334             all_fields = new_sfa_record
335             for key in all_fields.keys():
336                 if key in ['first_name', 'last_name', 'title', 'email',
337                            'password', 'phone', 'url', 'bio', 'accepted_aup',
338                            'enabled']:
339                     update_fields[key] = all_fields[key]
340             self.UpdatePerson(pointer, update_fields)
341     
342             if new_key:
343                 # must check this key against the previous one if it exists
344                 persons = self.GetPersons([pointer], ['key_ids'])
345                 person = persons[0]
346                 keys = person['key_ids']
347                 keys = self.GetKeys(person['key_ids'])
348                 
349                 # Delete all stale keys
350                 key_exists = False
351                 for key in keys:
352                     if new_key != key['key']:
353                         self.DeleteKey(key['key_id'])
354                     else:
355                         key_exists = True
356                 if not key_exists:
357                     self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
358
359
360         return True
361         
362
363     def remove (self, sfa_record):
364         type=sfa_record['type']
365         hrn=sfa_record['hrn']
366         record_id= sfa_record['record_id']
367         if type == 'user':
368             username = hrn.split(".")[len(hrn.split(".")) -1]
369             #get user in ldap  
370             persons = self.GetPersons(sfa_record)
371             #persons = self.GetPersons(username)
372             # only delete this person if he has site ids. if he doesnt, it probably means
373             # he was just removed from a site, not actually deleted
374             if persons and persons[0]['site_ids']:
375                 self.DeletePerson(username)
376         elif type == 'slice':
377             if self.GetSlices(slice_filter = hrn, slice_filter_type = 'slice_hrn'):
378                 self.DeleteSlice(hrn)
379
380         #elif type == 'authority':
381             #if self.GetSites(pointer):
382                 #self.DeleteSite(pointer)
383
384         return True
385             
386     def GetPeers (self,auth = None, peer_filter=None, return_fields_list=None):
387
388         existing_records = {}
389         existing_hrns_by_types= {}
390         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields_list)
391         all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
392         for record in all_records:
393             existing_records[(record.hrn,record.type)] = record
394             if record.type not in existing_hrns_by_types:
395                 existing_hrns_by_types[record.type] = [record.hrn]
396                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
397             else:
398                 
399                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN  type %s hrn %s " %( record.type,record.hrn )
400                 existing_hrns_by_types[record.type].append(record.hrn)
401                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
402                 #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
403                         
404         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers        existing_hrns_by_types %s " %( existing_hrns_by_types)
405         records_list= [] 
406       
407         try: 
408             print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  existing_hrns_by_types['authority+sa']  %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
409             if peer_filter:
410                records_list.append(existing_records[(peer_filter,'authority')])
411             else :
412                 for hrn in existing_hrns_by_types['authority']:
413                     records_list.append(existing_records[(hrn,'authority')])
414                     
415             print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  records_list  %s " %(records_list)
416                 
417         except:
418                 pass
419                 
420         return_records = records_list
421         if not peer_filter and not return_fields_list:
422             return records_list
423
424        
425         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers   return_records %s " %(return_records)
426         return return_records
427         
428      
429     #TODO  : Handling OR request in make_ldap_filters_from_records instead of the for loop 
430     #over the records' list
431     def GetPersons(self, person_filter=None, return_fields_list=None):
432         """
433         person_filter should be a list of dictionnaries when not set to None.
434         Returns a list of users found.
435        
436         """
437         print>>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)
438         person_list = []
439         if person_filter and isinstance(person_filter,list):
440         #If we are looking for a list of users (list of dict records)
441         #Usually the list contains only one user record
442             for f in person_filter:
443                 person = self.ldap.LdapFindUser(f)
444                 person_list.append(person)
445           
446         else:
447               person_list  = self.ldap.LdapFindUser()  
448                     
449         return person_list
450  
451
452     def GetTimezone(self):
453         server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
454         return server_timestamp,server_tz
455     
456
457     def DeleteJobs(self, job_id, slice_hrn):
458         if not job_id:
459             return
460         username  = slice_hrn.split(".")[-1].rstrip("_slice")
461         reqdict = {}
462         reqdict['method'] = "delete"
463         reqdict['strval'] = str(job_id)
464         answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
465         print>>sys.stderr, "\r\n \r\n  jobid  DeleteJobs %s "  %(answer)
466         
467     def GetJobsId(self, job_id, username = None ):
468         """
469         Details about a specific job. 
470         Includes details about submission time, jot type, state, events, 
471         owner, assigned ressources, walltime etc...
472             
473         """
474         req = "GET_jobs_id"
475         node_list_k = 'assigned_network_address'
476         #Get job info from OAR    
477         job_info = self.oar.parser.SendRequest(req, job_id, username)
478
479         logger.debug("SLABDRIVER \t GetJobs  %s " %(job_info))
480         try:
481             if job_info['state'] == 'Terminated':
482                 logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
483                                                             %(job_id))
484                 return None
485             if job_info['state'] == 'Error':
486                 logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
487                                                             %(job_info))
488                 return None
489                                                             
490         except KeyError:
491             logger.error("SLABDRIVER \tGetJobsId KeyError")
492             return None 
493         
494         parsed_job_info  = self.get_info_on_reserved_nodes(job_info,node_list_k)
495         #Replaces the previous entry "assigned_network_address" / "reserved_resources"
496         #with "node_ids"
497         job_info.update({'node_ids':parsed_job_info[node_list_k]})
498         del job_info[node_list_k]
499         logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
500         return job_info
501
502         
503     def GetJobsResources(self,job_id, username = None):
504         #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
505         #'api_timestamp']
506         #assigned_res = ['resource_id', 'resource_uri']
507         #assigned_n = ['node', 'node_uri']
508
509         req = "GET_jobs_id_resources"
510         node_list_k = 'reserved_resources' 
511                
512         #Get job info from OAR    
513         job_info = self.oar.parser.SendRequest(req, job_id, username)
514         logger.debug("SLABDRIVER \t GetJobsResources  %s " %(job_info))
515         
516         parsed_job_info  = self.get_info_on_reserved_nodes(job_info,node_list_k)
517         #Replaces the previous entry "assigned_network_address" / "reserved_resources"
518         #with "node_ids"
519         job_info.update({'node_ids':parsed_job_info[node_list_k]})
520         del job_info[node_list_k]
521         return job_info
522
523             
524     def get_info_on_reserved_nodes(self,job_info,node_list_name):
525         #Get the list of the testbed nodes records and make a 
526         #dictionnary keyed on the hostname out of it
527         node_list_dict = self.GetNodes() 
528         #node_hostname_list = []
529         node_hostname_list = [node['hostname'] for node in node_list_dict] 
530         #for node in node_list_dict:
531             #node_hostname_list.append(node['hostname'])
532         node_dict = dict(zip(node_hostname_list,node_list_dict))
533         try :
534             reserved_node_hostname_list = []
535             for index in range(len(job_info[node_list_name])):
536                #job_info[node_list_name][k] = 
537                 reserved_node_hostname_list[index] = \
538                             node_dict[job_info[node_list_name][index]]['hostname']
539                             
540             logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
541                         reserved_node_hostname_list %s" \
542                         %(reserved_node_hostname_list))
543         except KeyError:
544             logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
545             
546         return reserved_node_hostname_list  
547               
548     def GetReservedNodes(self):
549         # this function returns a list of all the nodes already involved in an oar job
550
551
552
553
554
555
556
557       
558
559
560
561
562        return self.oar.parser.SendRequest("GET_reserved_nodes") 
563
564      
565     def GetNodes(self,node_filter_dict = None, return_fields_list = None):
566         """
567         node_filter_dict : dictionnary of lists
568         
569         """
570         node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
571         node_dict_list = node_dict_by_id.values()
572         
573         #No  filtering needed return the list directly
574         if not (node_filter_dict or return_fields_list):
575             return node_dict_list
576         
577         return_node_list = []
578         if node_filter_dict:
579             for filter_key in node_filter_dict:
580                 try:
581                     #Filter the node_dict_list by each value contained in the 
582                     #list node_filter_dict[filter_key]
583                     for value in node_filter_dict[filter_key]:
584                         for node in node_dict_list:
585                             if node[filter_key] == value:
586                                 if return_fields_list :
587                                    tmp = {}
588                                    for k in return_fields_list:
589                                         tmp[k] = node[k]     
590                                    return_node_list.append(tmp)
591                                 else:
592                                    return_node_list.append(node)
593                 except KeyError:
594                     logger.log_exc("GetNodes KeyError")
595                     return
596
597
598         return return_node_list
599     
600   
601     def GetSites(self, site_filter_name = None, return_fields_list = None):
602         site_dict = self.oar.parser.SendRequest("GET_sites")
603         #site_dict : dict where the key is the sit ename
604         return_site_list = []
605         if not ( site_filter_name or return_fields_list):
606                 return_site_list = site_dict.values()
607                 return return_site_list
608         
609         if site_filter_name in site_dict:
610             if return_fields_list:
611                 for field in return_fields_list:
612                     tmp = {}
613                     Create 
614                     try:
615                         tmp[field] = site_dict[site_filter_name][field]
616                     except KeyError:
617                         logger.error("GetSites KeyError %s "%(field))
618                         return None
619                 return_site_list.append(tmp)
620             else:
621                 return_site_list.append( site_dict[site_filter_name])
622             
623
624         return return_site_list
625         
626
627     def GetSlices(self, slice_filter = None, slice_filter_type = None, \
628                                             return_fields_list=None):
629         return_slice_list = []
630         slicerec  = {}
631         rec = {}
632         authorized_filter_types_list = ['slice_hrn', 'record_id_user']
633         print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices authorized_filter_types_list %s" %(authorized_filter_types_list)
634         if slice_filter_type in authorized_filter_types_list:
635             if slice_filter_type == 'slice_hrn':
636                 slicerec = slab_dbsession.query(SliceSenslab).\
637                                     filter_by(slice_hrn = slice_filter).first()
638                                         
639             if slice_filter_type == 'record_id_user':
640                 slicerec = slab_dbsession.query(SliceSenslab).\
641                                 filter_by(record_id_user = slice_filter).first()
642                 
643             if slicerec:
644                 rec = slicerec.dump_sqlalchemyobj_to_dict()
645                 print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices rec %s" %(rec)
646                 #Get login 
647                 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
648                 logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"\
649                                                                 %(login,rec))
650                 if slicerec.oar_job_id is not -1:
651                     #Check with OAR the status of the job if a job id is in 
652                     #the slice record 
653                     #rslt = self.GetJobsResources(slicerec.oar_job_id,username = login)
654                     rslt = self.GetJobsId(slicerec.oar_job_id,username = login)
655                     if rslt :
656                         rec.update(rslt)
657                         rec.update({'hrn':str(rec['slice_hrn'])})
658                         #If GetJobsResources is empty, this means the job is now in the 'Terminated' state
659                         #Update the slice record
660                     else :
661                         self.db.update_job(slice_filter, job_id = -1)
662                         rec['oar_job_id'] = -1
663                         rec.update({'hrn':str(rec['slice_hrn'])})
664             
665                 try:
666                     rec['node_ids'] = rec['node_list']
667                 except KeyError:
668                     pass
669                 
670                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  rec  %s" %(rec)
671                               
672             return rec
673                 
674                 
675         else:
676             return_slice_list = slab_dbsession.query(SliceSenslab).all()
677
678         print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  slices %s slice_filter %s " %(return_slice_list,slice_filter)
679         
680         #if return_fields_list:
681             #return_slice_list  = parse_filter(sliceslist, slice_filter,'slice', return_fields_list)
682         
683         
684                     
685         return return_slice_list
686         
687
688         
689     
690     def testbed_name (self): return "senslab2" 
691          
692     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
693     def aggregate_version (self):
694         version_manager = VersionManager()
695         ad_rspec_versions = []
696         request_rspec_versions = []
697         for rspec_version in version_manager.versions:
698             if rspec_version.content_type in ['*', 'ad']:
699                 ad_rspec_versions.append(rspec_version.to_dict())
700             if rspec_version.content_type in ['*', 'request']:
701                 request_rspec_versions.append(rspec_version.to_dict()) 
702         return {
703             'testbed':self.testbed_name(),
704             'geni_request_rspec_versions': request_rspec_versions,
705             'geni_ad_rspec_versions': ad_rspec_versions,
706             }
707           
708           
709           
710           
711           
712           
713     ##
714     # Convert SFA fields to PLC fields for use when registering up updating
715     # registry record in the PLC database
716     #
717     # @param type type of record (user, slice, ...)
718     # @param hrn human readable name
719     # @param sfa_fields dictionary of SFA fields
720     # @param slab_fields dictionary of PLC fields (output)
721
722     def sfa_fields_to_slab_fields(self, type, hrn, record):
723
724         def convert_ints(tmpdict, int_fields):
725             for field in int_fields:
726                 if field in tmpdict:
727                     tmpdict[field] = int(tmpdict[field])
728
729         slab_record = {}
730         #for field in record:
731         #    slab_record[field] = record[field]
732  
733         if type == "slice":
734             #instantion used in get_slivers ? 
735             if not "instantiation" in slab_record:
736                 slab_record["instantiation"] = "senslab-instantiated"
737             slab_record["hrn"] = hrn_to_pl_slicename(hrn)
738             print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
739             if "url" in record:
740                slab_record["url"] = record["url"]
741             if "description" in record:
742                 slab_record["description"] = record["description"]
743             if "expires" in record:
744                 slab_record["expires"] = int(record["expires"])
745                 
746         #nodes added by OAR only and then imported to SFA
747         #elif type == "node":
748             #if not "hostname" in slab_record:
749                 #if not "hostname" in record:
750                     #raise MissingSfaInfo("hostname")
751                 #slab_record["hostname"] = record["hostname"]
752             #if not "model" in slab_record:
753                 #slab_record["model"] = "geni"
754                 
755         #One authority only 
756         #elif type == "authority":
757             #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
758
759             #if not "name" in slab_record:
760                 #slab_record["name"] = hrn
761
762             #if not "abbreviated_name" in slab_record:
763                 #slab_record["abbreviated_name"] = hrn
764
765             #if not "enabled" in slab_record:
766                 #slab_record["enabled"] = True
767
768             #if not "is_public" in slab_record:
769                 #slab_record["is_public"] = True
770
771         return slab_record
772
773                    
774     def LaunchExperimentOnOAR(self,  slice_dict, added_nodes, slice_user=None):
775        
776         site_list = []
777         nodeid_list =[]
778         resource = ""
779         reqdict = {}
780         slice_name = slice_dict['name']
781         try:
782             slot = slice_dict['timeslot'] 
783             logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slot %s" %(slot))
784         except KeyError:
785             #Running on default parameters
786             #XP immediate , 10 mins
787             slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min 
788         
789         reqdict['workdir']= '/tmp'   
790         reqdict['resource'] ="{network_address in ("   
791         #reqdict['property'] ="network_address in ("
792         for node in added_nodes: 
793             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  node %s" %(node)
794
795             #Get the ID of the node : remove the root auth and put the site in a separate list
796             #s=node.split(".")
797             # NT: it's not clear for me if the nodenames will have the senslab prefix
798             # so lets take the last part only, for now.
799             #lastpart=s[-1]
800             #if s[0] == self.root_auth :
801             # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
802             #s=lastpart.split("_")
803             #nodeid=s[-1]
804             nodeid = node
805             reqdict['resource'] += "'"+ nodeid +"', "
806             nodeid_list.append(nodeid)
807
808
809         reqdict['resource'] =  reqdict['resource'][0: len( reqdict['resource'])-2] +")}/nodes=" + str(len(nodeid_list))
810         if slot['duration']:
811             walltime = slot['duration'].split(":")
812             # Fixing the walltime by adding a few delays. First put the walltime in seconds
813             # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
814             # take in account  prologue and epilogue scripts execution
815             # int walltimeAdditionalDelay = 120;  additional delay
816
817             desired_walltime =  int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
818             total_walltime = desired_walltime + 140 #+2 min 20
819             sleep_walltime = desired_walltime + 20 #+20 sec
820             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s  total_walltime %s sleep_walltime %s  " %(desired_walltime,total_walltime,sleep_walltime)
821             #Put the walltime back in str form
822             #First get the hours
823             walltime[0] = str(total_walltime / 3600)
824             total_walltime = total_walltime - 3600 * int(walltime[0])
825             #Get the remaining minutes
826             walltime[1] = str(total_walltime / 60)
827             total_walltime =  total_walltime - 60 * int(walltime[1])
828             #Get the seconds
829             walltime[2] = str(total_walltime)
830             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  walltime %s " %(walltime)
831
832             reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2]) 
833             reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
834         else:
835             reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
836             reqdict['script_path'] = "/bin/sleep 620" #+20 sec    
837         #In case of a scheduled experiment (not immediate)
838         #To run an XP immediately, don't specify date and time in RSpec 
839         #They will be set to None. 
840         server_timestamp,server_tz = self.GetTimezone()
841         if slot['date'] and slot['start_time']:
842             if slot['timezone'] is '' or slot['timezone'] is None:
843                 #assume it is server timezone
844                 from_zone=tz.gettz(server_tz) 
845                 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  timezone not specified  server_tz %s from_zone  %s" %(server_tz,from_zone) 
846             else:
847                 #Get zone of the user from the reservation time given in the rspec
848                 from_zone = tz.gettz(slot['timezone'])  
849                    
850             date = str(slot['date'])  + " " + str(slot['start_time'])
851             user_datetime = datetime.strptime(date, self.time_format)
852             user_datetime = user_datetime.replace(tzinfo = from_zone)
853             
854             #Convert to server zone
855             #to_zone = tz.tzutc()
856             to_zone = tz.gettz(server_tz)
857             reservation_date = user_datetime.astimezone(to_zone)
858             #Readable time accpeted by OAR
859             reqdict['reservation']= reservation_date.strftime(self.time_format)
860         
861             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  reqdict['reservation'] %s " %(reqdict['reservation'])
862             
863         else:
864             # Immediate XP
865             # reservations are performed in the oar server timebase, so :
866             # 1- we get the server time(in UTC tz )/server timezone
867             # 2- convert the server UTC time in its timezone
868             # 3- add a custom delay to this time
869             # 4- convert this time to a readable form and it for the reservation request.
870             server_timestamp,server_tz = self.GetTimezone()
871             s_tz=tz.gettz(server_tz)
872             UTC_zone = tz.gettz("UTC")
873             #weird... datetime.fromtimestamp should work since we do from datetime import datetime
874             utc_server= datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
875             server_localtime=utc_server.astimezone(s_tz)
876     
877             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
878             readable_time = server_localtime.strftime(self.time_format)
879
880             print >>sys.stderr,"  \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s  " %(readable_time ,server_timestamp)
881             reqdict['reservation'] = readable_time
882         
883
884         reqdict['type'] = "deploy" 
885         reqdict['directory']= ""
886         reqdict['name']= "TestSandrine"
887        
888          
889         # first step : start the OAR job and update the job 
890         logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict   %s \r\n site_list   %s"  %(reqdict,site_list) )  
891        
892         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
893         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid   %s "  %(answer)
894         try:       
895             jobid = answer['id']
896         except KeyError:
897              print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job  %s "  %( answer)
898              return
899         
900         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user)
901         self.db.update_job( slice_name, jobid ,added_nodes)
902         
903           
904         # second step : configure the experiment
905         # we need to store the nodes in a yaml (well...) file like this :
906         # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
907         f=open('/tmp/sfa/'+str(jobid)+'.json','w')
908         f.write('[')
909         f.write(str(added_nodes[0].strip('node')))
910         for node in added_nodes[1:len(added_nodes)] :
911             f.write(','+node.strip('node'))
912         f.write(']')
913         f.close()
914         
915         # third step : call the senslab-experiment wrapper
916         #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
917         javacmdline="/usr/bin/java"
918         jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
919         #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
920         output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
921
922         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns   %s "  %(output)
923         return 
924                  
925  
926     #Delete the jobs and updates the job id in the senslab table
927     #to set it to -1  
928     #Does not clear the node list 
929     def DeleteSliceFromNodes(self, slice_record):
930          # Get user information
931        
932         self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
933         self.db.update_job(slice_record['hrn'], job_id = -1)
934         return   
935     
936  
937
938  
939             
940     def augment_records_with_testbed_info (self, sfa_records):
941         return self.fill_record_info (sfa_records)
942     
943     def fill_record_info(self, record_list):
944         """
945         Given a SFA record, fill in the senslab specific and SFA specific
946         fields in the record. 
947         """
948                     
949         logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
950         if not isinstance(record_list, list):
951             record_list = [record_list]
952             
953         try:
954             for record in record_list:
955                 #If the record is a SFA slice record, then add information 
956                 #about the user of this slice. This kind of information is in the 
957                 #Senslab's DB.
958                 if str(record['type']) == 'slice':
959                     #Get slab slice record.
960                     recslice = self.GetSlices(slice_filter = \
961                                                 str(record['hrn']),\
962                                                 slice_filter_type = 'slice_hrn')
963                     recuser = dbsession.query(RegRecord).filter_by(record_id = \
964                                             recslice['record_id_user']).first()
965                     logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
966                                                 rec %s \r\n \r\n" %(recslice)) 
967                     record.update({'PI':[recuser.hrn],
968                             'researcher': [recuser.hrn],
969                             'name':record['hrn'], 
970                             'oar_job_id':recslice['oar_job_id'],
971                             'node_ids': [],
972                             'person_ids':[recslice['record_id_user']],
973                             'geni_urn':'',  #For client_helper.py compatibility
974                             'keys':'',  #For client_helper.py compatibility
975                             'key_ids':''})  #For client_helper.py compatibility
976                     
977                 elif str(record['type']) == 'user':
978                     #The record is a SFA user record.
979                     #Get the information about his slice from Senslab's DB
980                     #and add it to the user record.
981                     recslice = self.GetSlices(slice_filter = \
982                                             record['record_id'],\
983                                             slice_filter_type = 'record_id_user')
984                                             
985                     logger.debug( "SLABDRIVER.PY \t fill_record_info user \
986                                                 rec %s \r\n \r\n" %(recslice)) 
987                     #Append slice record in records list, 
988                     #therefore fetches user and slice info again(one more loop)
989                     #Will update PIs and researcher for the slice
990                     recuser = dbsession.query(RegRecord).filter_by(record_id = \
991                                                  recslice['record_id_user']).first()
992                     recslice.update({'PI':[recuser.hrn],
993                     'researcher': [recuser.hrn],
994                     'name':record['hrn'], 
995                     'oar_job_id':recslice['oar_job_id'],
996                     'node_ids': [],
997                     'person_ids':[recslice['record_id_user']]})
998
999                     #GetPersons takes [] as filters 
1000                     #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
1001                     user_slab = self.GetPersons([record])
1002     
1003                     recslice.update({'type':'slice','hrn':recslice['slice_hrn']})
1004                     record.update(user_slab[0])
1005                     #For client_helper.py compatibility
1006                     record.update( { 'geni_urn':'',
1007                     'keys':'',
1008                     'key_ids':'' })                
1009                     record_list.append(recslice)
1010                     
1011                     logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1012                                 INFO TO USER records %s" %(record_list)) 
1013                         
1014
1015         except TypeError,e:
1016             logger.log_exc("SLABDRIVER \t fill_record_info  EXCEPTION %s" %(e))
1017         
1018         return
1019         
1020         #self.fill_record_slab_info(records)
1021         ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)     
1022         #self.fill_record_sfa_info(records)
1023         #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1024         
1025         
1026
1027     
1028         
1029     #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1030         ## get a list of the HRNs tht are members of the old and new records
1031         #if oldRecord:
1032             #oldList = oldRecord.get(listName, [])
1033         #else:
1034             #oldList = []     
1035         #newList = record.get(listName, [])
1036
1037         ## if the lists are the same, then we don't have to update anything
1038         #if (oldList == newList):
1039             #return
1040
1041         ## build a list of the new person ids, by looking up each person to get
1042         ## their pointer
1043         #newIdList = []
1044         #table = SfaTable()
1045         #records = table.find({'type': 'user', 'hrn': newList})
1046         #for rec in records:
1047             #newIdList.append(rec['pointer'])
1048
1049         ## build a list of the old person ids from the person_ids field 
1050         #if oldRecord:
1051             #oldIdList = oldRecord.get("person_ids", [])
1052             #containerId = oldRecord.get_pointer()
1053         #else:
1054             ## if oldRecord==None, then we are doing a Register, instead of an
1055             ## update.
1056             #oldIdList = []
1057             #containerId = record.get_pointer()
1058
1059     ## add people who are in the new list, but not the oldList
1060         #for personId in newIdList:
1061             #if not (personId in oldIdList):
1062                 #addFunc(self.plauth, personId, containerId)
1063
1064         ## remove people who are in the old list, but not the new list
1065         #for personId in oldIdList:
1066             #if not (personId in newIdList):
1067                 #delFunc(self.plauth, personId, containerId)
1068
1069     #def update_membership(self, oldRecord, record):
1070         #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1071         #if record.type == "slice":
1072             #self.update_membership_list(oldRecord, record, 'researcher',
1073                                         #self.users.AddPersonToSlice,
1074                                         #self.users.DeletePersonFromSlice)
1075         #elif record.type == "authority":
1076             ## xxx TODO
1077             #pass
1078
1079 ### thierry
1080 # I don't think you plan on running a component manager at this point
1081 # let me clean up the mess of ComponentAPI that is deprecated anyways