SlabDriver cleaning mainly.
[sfa.git] / sfa / senslab / slabdriver.py
1 import sys
2 import subprocess
3
4 from datetime import datetime
5 from dateutil import tz 
6 from time import strftime,gmtime
7
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
11
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
15
16 from sfa.trust.credential import Credential
17 from sfa.trust.gid import GID
18
19 from sfa.managers.driver import Driver
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
22
23 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
24 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
25
26 ## thierry: everything that is API-related (i.e. handling incoming requests) 
27 # is taken care of 
28 # SlabDriver should be really only about talking to the senslab testbed
29
30
31 from sfa.senslab.OARrestapi import  OARrestapi
32 from sfa.senslab.LDAPapi import LDAPapi
33
34 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
35 from sfa.senslab.slabaggregate import SlabAggregate
36 from sfa.senslab.slabslices import SlabSlices
37
38
39
40 # thierry : note
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
44 class SlabDriver(Driver):
45
46     def __init__(self, config):
47         Driver.__init__ (self, config)
48         self.config=config
49         self.hrn = config.SFA_INTERFACE_HRN
50
51         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
52
53         self.oar = OARrestapi()
54         self.ldap = LDAPapi()
55         self.time_format = "%Y-%m-%d %H:%M:%S"
56         self.db = SlabDB(config)
57         self.cache=None
58         
59     
60     def sliver_status(self,slice_urn,slice_hrn):
61         """Receive a status request for slice named urn/hrn 
62         urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
63         shall return a structure as described in
64         http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
65         NT : not sure if we should implement this or not, but used by sface.
66         
67         """
68         
69         #First get the slice with the slice hrn
70         sl = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
71         if len(sl) is 0:
72             raise SliverDoesNotExist("%s  slice_hrn" % (slice_hrn))
73         
74         nodes_in_slice = sl['node_ids']
75         if len(nodes_in_slice) is 0:
76             raise SliverDoesNotExist("No slivers allocated ") 
77         
78         logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
79                              %s \r\n " %(slice_urn,slice_hrn,sl) )
80                              
81         if sl['oar_job_id'] is not -1:
82             #A job is running on Senslab for this slice
83             # report about the local nodes that are in the slice only
84             
85             nodes_all = self.GetNodes({'hostname':nodes_in_slice},
86                             ['node_id', 'hostname','site','boot_state'])
87             nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
88             
89
90             result = {}
91             top_level_status = 'unknown'
92             if nodes:
93                 top_level_status = 'ready'
94             result['geni_urn'] = slice_urn
95             result['pl_login'] = sl['job_user'] #For compatibility
96
97             
98             timestamp = float(sl['startTime']) + float(sl['walltime']) 
99             result['pl_expires'] = strftime(self.time_format, \
100                                                     gmtime(float(timestamp)))
101             #result['slab_expires'] = strftime(self.time_format,\
102                                                      #gmtime(float(timestamp)))
103             
104             resources = []
105             for node in nodes:
106                 res = {}
107                 #res['slab_hostname'] = node['hostname']
108                 #res['slab_boot_state'] = node['boot_state']
109                 
110                 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
111                 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
112                 res['pl_last_contact'] = strftime(self.time_format, \
113                                                     gmtime(float(timestamp)))
114                 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
115                                             nodeall_byhostname[node]['node_id']) 
116                 res['geni_urn'] = sliver_id 
117                 if nodeall_byhostname[node]['boot_state'] == 'Alive':
118
119                     res['geni_status'] = 'ready'
120                 else:
121                     res['geni_status'] = 'failed'
122                     top_level_status = 'failed' 
123                     
124                 res['geni_error'] = ''
125         
126                 resources.append(res)
127                 
128             result['geni_status'] = top_level_status
129             result['geni_resources'] = resources 
130             print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
131             return result        
132         
133         
134     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
135         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
136         aggregate = SlabAggregate(self)
137         
138         slices = SlabSlices(self)
139         peer = slices.get_peer(slice_hrn)
140         sfa_peer = slices.get_sfa_peer(slice_hrn)
141         slice_record=None 
142  
143         if not isinstance(creds, list):
144             creds = [creds]
145     
146         if users:
147             slice_record = users[0].get('slice_record', {})
148     
149         # parse rspec
150         rspec = RSpec(rspec_string)
151         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ============================rspec.version %s " %(rspec.version)
152         
153         
154         # ensure site record exists?
155         # ensure slice record exists
156         slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
157         requested_attributes = rspec.version.get_slice_attributes()
158         
159         if requested_attributes:
160             for attrib_dict in requested_attributes:
161                 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
162                     slice.update({'timeslot':attrib_dict['timeslot']})
163         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... slice %s " %(slice)
164         # ensure person records exists
165         persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
166         # ensure slice attributes exists?
167
168         
169         # add/remove slice from nodes 
170         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... " 
171        
172         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
173         print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
174         nodes = slices.verify_slice_nodes(slice, requested_slivers, peer) 
175     
176         
177         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
178         
179         
180     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
181         
182         slice = self.GetSlices(slice_filter= slice_hrn, slice_filter_type = 'slice_hrn')
183         print>>sys.stderr, "\r\n \r\n \t\t  SLABDRIVER.PY delete_sliver slice %s" %(slice)
184         if not slice:
185             return 1
186        
187         slices = SlabSlices(self)
188         # determine if this is a peer slice
189         # xxx I wonder if this would not need to use PlSlices.get_peer instead 
190         # in which case plc.peers could be deprecated as this here
191         # is the only/last call to this last method in plc.peers
192         peer = slices.get_peer(slice_hrn)
193         try:
194             if peer:
195                 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
196             self.DeleteSliceFromNodes(slice)
197         finally:
198             if peer:
199                 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
200         return 1
201             
202             
203     def AddSlice(self, slice_record):
204         slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'],  record_id_slice= slice_record['record_id_slice'] , record_id_user= slice_record['record_id_user'], peer_authority = slice_record['peer_authority'])
205         print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,slab_slice)
206         slab_dbsession.add(slab_slice)
207         slab_dbsession.commit()
208         return
209         
210     # first 2 args are None in case of resource discovery
211     def list_resources (self, slice_urn, slice_hrn, creds, options):
212         #cached_requested = options.get('cached', True) 
213     
214         version_manager = VersionManager()
215         # get the rspec's return format from options
216         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
217         version_string = "rspec_%s" % (rspec_version)
218     
219         #panos adding the info option to the caching key (can be improved)
220         if options.get('info'):
221             version_string = version_string + "_"+options.get('info', 'default')
222     
223         # look in cache first
224         #if cached_requested and self.cache and not slice_hrn:
225             #rspec = self.cache.get(version_string)
226             #if rspec:
227                 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
228                 #return rspec 
229     
230         #panos: passing user-defined options
231
232         aggregate = SlabAggregate(self)
233         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
234         options.update({'origin_hrn':origin_hrn})
235         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
236                                      options=options)
237         print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec " 
238         # cache the result
239         #if self.cache and not slice_hrn:
240             #logger.debug("Slab.ListResources: stores advertisement in cache")
241             #self.cache.add(version_string, rspec)
242     
243         return rspec
244         
245         
246     def list_slices (self, creds, options):
247         # look in cache first
248         #if self.cache:
249             #slices = self.cache.get('slices')
250             #if slices:
251                 #logger.debug("PlDriver.list_slices returns from cache")
252                 #return slices
253     
254         # get data from db 
255         print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
256         slices = self.GetSlices()
257         slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
258         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
259     
260         # cache the result
261         #if self.cache:
262             #logger.debug ("SlabDriver.list_slices stores value in cache")
263             #self.cache.add('slices', slice_urns) 
264     
265         return slice_urns
266     
267     #No site or node register supported
268     def register (self, sfa_record, hrn, pub_key):
269         type = sfa_record['type']
270         slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
271     
272
273         if type == 'slice':
274             acceptable_fields=['url', 'instantiation', 'name', 'description']
275             for key in slab_record.keys():
276                 if key not in acceptable_fields:
277                     slab_record.pop(key) 
278             print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
279             slices = self.GetSlices(slice_filter =slab_record['hrn'], slice_filter_type = 'slice_hrn')
280             if not slices:
281                     pointer = self.AddSlice(slab_record)
282             else:
283                     pointer = slices[0]['slice_id']
284     
285         elif type == 'user':
286             persons = self.GetPersons([sfa_record['hrn']])
287             if not persons:
288                 pointer = self.AddPerson(dict(sfa_record))
289                 #add in LDAP 
290             else:
291                 pointer = persons[0]['person_id']
292                 
293             #Does this make sense to senslab ?
294             #if 'enabled' in sfa_record and sfa_record['enabled']:
295                 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
296                 
297             # add this person to the site only if she is being added for the first
298             # time by sfa and doesont already exist in plc
299             if not persons or not persons[0]['site_ids']:
300                 login_base = get_leaf(sfa_record['authority'])
301                 self.AddPersonToSite(pointer, login_base)
302     
303             # What roles should this user have?
304             self.AddRoleToPerson('user', pointer)
305             # Add the user's key
306             if pub_key:
307                 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
308                 
309         #No node adding outside OAR
310
311         return pointer
312             
313     #No site or node record update allowed       
314     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
315         pointer = old_sfa_record['pointer']
316         type = old_sfa_record['type']
317
318         # new_key implemented for users only
319         if new_key and type not in [ 'user' ]:
320             raise UnknownSfaType(type)
321         
322         #if (type == "authority"):
323             #self.shell.UpdateSite(pointer, new_sfa_record)
324     
325         if type == "slice":
326             slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
327             if 'name' in slab_record:
328                 slab_record.pop('name')
329                 self.UpdateSlice(pointer, slab_record)
330     
331         elif type == "user":
332             update_fields = {}
333             all_fields = new_sfa_record
334             for key in all_fields.keys():
335                 if key in ['first_name', 'last_name', 'title', 'email',
336                            'password', 'phone', 'url', 'bio', 'accepted_aup',
337                            'enabled']:
338                     update_fields[key] = all_fields[key]
339             self.UpdatePerson(pointer, update_fields)
340     
341             if new_key:
342                 # must check this key against the previous one if it exists
343                 persons = self.GetPersons([pointer], ['key_ids'])
344                 person = persons[0]
345                 keys = person['key_ids']
346                 keys = self.GetKeys(person['key_ids'])
347                 
348                 # Delete all stale keys
349                 key_exists = False
350                 for key in keys:
351                     if new_key != key['key']:
352                         self.DeleteKey(key['key_id'])
353                     else:
354                         key_exists = True
355                 if not key_exists:
356                     self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
357
358
359         return True
360         
361
362     def remove (self, sfa_record):
363         type=sfa_record['type']
364         hrn=sfa_record['hrn']
365         record_id= sfa_record['record_id']
366         if type == 'user':
367             username = hrn.split(".")[len(hrn.split(".")) -1]
368             #get user in ldap
369             persons = self.GetPersons(username)
370             # only delete this person if he has site ids. if he doesnt, it probably means
371             # he was just removed from a site, not actually deleted
372             if persons and persons[0]['site_ids']:
373                 self.DeletePerson(username)
374         elif type == 'slice':
375             if self.GetSlices(slice_filter = hrn, slice_filter_type = 'slice_hrn'):
376                 self.DeleteSlice(hrn)
377
378         #elif type == 'authority':
379             #if self.GetSites(pointer):
380                 #self.DeleteSite(pointer)
381
382         return True
383             
384     def GetPeers (self,auth = None, peer_filter=None, return_fields_list=None):
385
386         existing_records = {}
387         existing_hrns_by_types= {}
388         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields_list)
389         all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
390         for record in all_records:
391             existing_records[(record.hrn,record.type)] = record
392             if record.type not in existing_hrns_by_types:
393                 existing_hrns_by_types[record.type] = [record.hrn]
394                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
395             else:
396                 
397                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN  type %s hrn %s " %( record.type,record.hrn )
398                 existing_hrns_by_types[record.type].append(record.hrn)
399                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
400                 #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
401                         
402         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers        existing_hrns_by_types %s " %( existing_hrns_by_types)
403         records_list= [] 
404       
405         try: 
406             print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  existing_hrns_by_types['authority+sa']  %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
407             if peer_filter:
408                records_list.append(existing_records[(peer_filter,'authority')])
409             else :
410                 for hrn in existing_hrns_by_types['authority']:
411                     records_list.append(existing_records[(hrn,'authority')])
412                     
413             print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  records_list  %s " %(records_list)
414                 
415         except:
416                 pass
417                 
418         return_records = records_list
419         if not peer_filter and not return_fields_list:
420             return records_list
421
422        
423         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers   return_records %s " %(return_records)
424         return return_records
425         
426      
427     #TODO  : Handling OR request in make_ldap_filters_from_records instead of the for loop 
428     #over the records' list
429     def GetPersons(self, person_filter=None, return_fields_list=None):
430         """
431         person_filter should be a list of dictionnaries when not set to None.
432         Returns a list of users found.
433        
434         """
435         print>>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)
436         person_list = []
437         if person_filter and isinstance(person_filter,list):
438         #If we are looking for a list of users (list of dict records)
439         #Usually the list contains only one user record
440             for f in person_filter:
441                 person = self.ldap.LdapFindUser(f)
442                 person_list.append(person)
443           
444         else:
445               person_list  = self.ldap.LdapFindUser()  
446                     
447         return person_list
448  
449
450     def GetTimezone(self):
451         server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
452         return server_timestamp,server_tz
453     
454
455     def DeleteJobs(self, job_id, slice_hrn):
456         if not job_id:
457             return
458         username  = slice_hrn.split(".")[-1].rstrip("_slice")
459         reqdict = {}
460         reqdict['method'] = "delete"
461         reqdict['strval'] = str(job_id)
462         answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
463         print>>sys.stderr, "\r\n \r\n  jobid  DeleteJobs %s "  %(answer)
464         
465     def GetJobsId(self, job_id, username = None ):
466         """
467         Details about a specific job. 
468         Includes details about submission time, jot type, state, events, 
469         owner, assigned ressources, walltime etc...
470             
471         """
472         req = "GET_jobs_id"
473         node_list_k = 'assigned_network_address'
474         #Get job info from OAR    
475         job_info = self.oar.parser.SendRequest(req, job_id, username)
476         
477         logger.debug("SLABDRIVER \t GetJobs  %s " %(job_info))
478         try:
479             if job_info['state'] == 'Terminated':
480                 logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
481                                                             %(job_id))
482                 return None
483             if job_info['state'] == 'Error':
484                 logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
485                                                             %(job_info))
486                 return None
487                                                             
488         except KeyError:
489             logger.error("SLABDRIVER \tGetJobsId KeyError")
490             return None 
491         
492         parsed_job_info  = self.get_info_on_reserved_nodes(job_info,node_list_k)
493         #Replaces the previous entry "assigned_network_address" / "reserved_resources"
494         #with "node_ids"
495         job_info.update({'node_ids':parsed_job_info[node_list_k]})
496         del job_info[node_list_k]
497         logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
498         return job_info
499
500         
501     def GetJobsResources(self,job_id, return_fields_list=None, username = None):
502         #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
503         #'api_timestamp']
504         #assigned_res = ['resource_id', 'resource_uri']
505         #assigned_n = ['node', 'node_uri']
506
507         req = "GET_jobs_id_resources"
508         node_list_k = 'reserved_resources' 
509                
510         #Get job info from OAR    
511         job_info = self.oar.parser.SendRequest(req, job_id, username)
512         logger.debug("SLABDRIVER \t GetJobsResources  %s " %(job_info))
513         
514         parsed_job_info  = self.get_info_on_reserved_nodes(job_info,node_list_k)
515         #Replaces the previous entry "assigned_network_address" / "reserved_resources"
516         #with "node_ids"
517         job_info.update({'node_ids':parsed_job_info[node_list_k]})
518         del job_info[node_list_k]
519         return job_info
520
521             
522     def get_info_on_reserved_nodes(self,job_info,node_list_name):
523         #Get the list of the testbed nodes records and make a 
524         #dictionnary keyed on the hostname out of it
525         node_list_dict = self.GetNodes() 
526         #node_hostname_list = []
527         node_hostname_list = [node['hostname'] for node in node_list_dict] 
528         #for node in node_list_dict:
529             #node_hostname_list.append(node['hostname'])
530         node_dict = dict(zip(node_hostname_list,node_list_dict))
531         try :
532             reserved_node_hostname_list = []
533             for index in range(len(job_info[node_list_name])):
534                #job_info[node_list_name][k] = 
535                 reserved_node_hostname_list[index] = \
536                             node_dict[job_info[node_list_name][index]]['hostname']
537                             
538             logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
539                         reserved_node_hostname_list %s" \
540                         %(reserved_node_hostname_list))
541         except KeyError:
542             logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
543             
544         return reserved_node_hostname_list  
545               
546     def GetReservedNodes(self):
547         # this function returns a list of all the nodes already involved in an oar job
548        #jobs=self.oar.parser.SendRequest("GET_reserved_nodes") 
549        jobs=self.oar.parser.SendRequest("GET_jobs_details") 
550        nodes=[]
551        for j in jobs :
552           nodes=j['assigned_network_address']+nodes
553        return nodes
554      
555     def GetNodes(self,node_filter_dict = None, return_fields_list = None):
556         """
557         node_filter_dict : dictionnary of lists
558         
559         """
560         node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
561         node_dict_list = node_dict_by_id.values()
562         
563         #No  filtering needed return the list directly
564         if not (node_filter_dict or return_fields_list):
565             return node_dict_list
566         
567         return_node_list = []
568         if node_filter_dict:
569             for filter_key in node_filter_dict:
570                 try:
571                     #Filter the node_dict_list by each value contained in the 
572                     #list node_filter_dict[filter_key]
573                     for value in node_filter_dict[filter_key]:
574                         for node in node_dict_list:
575                             if node[filter_key] == value:
576                                 if return_fields_list :
577                                    tmp = {}
578                                    for k in return_fields_list:
579                                         tmp[k] = node[k]     
580                                    return_node_list.append(tmp)
581                                 else:
582                                    return_node_list.append(node)
583                 except KeyError:
584                     logger.log_exc("GetNodes KeyError")
585                     return
586
587
588         return return_node_list
589     
590   
591     def GetSites(self, site_filter_name = None, return_fields_list = None):
592         site_dict = self.oar.parser.SendRequest("GET_sites")
593         #site_dict : dict where the key is the sit ename
594         return_site_list = []
595         if not ( site_filter_name or return_fields_list):
596                 return_site_list = site_dict.values()
597                 return return_site_list
598         
599         if site_filter_name in site_dict:
600             if return_fields_list:
601                 for field in return_fields_list:
602                     tmp = {}
603                     Create 
604                     try:
605                         tmp[field] = site_dict[site_filter_name][field]
606                     except KeyError:
607                         logger.error("GetSites KeyError %s "%(field))
608                         return None
609                 return_site_list.append(tmp)
610             else:
611                 return_site_list.append( site_dict[site_filter_name])
612             
613
614         return return_site_list
615         
616
617     def GetSlices(self, slice_filter = None, slice_filter_type = None, \
618                                             return_fields_list=None):
619         return_slice_list = []
620         slicerec  = {}
621         rec = {}
622         authorized_filter_types_list = ['slice_hrn', 'record_id_user']
623         print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices authorized_filter_types_list %s" %(authorized_filter_types_list)
624         if slice_filter_type in authorized_filter_types_list:
625             if slice_filter_type == 'slice_hrn':
626                 slicerec = slab_dbsession.query(SliceSenslab).\
627                                     filter_by(slice_hrn = slice_filter).first()
628                                         
629             if slice_filter_type == 'record_id_user':
630                 slicerec = slab_dbsession.query(SliceSenslab).\
631                                 filter_by(record_id_user = slice_filter).first()
632                 
633             if slicerec:
634                 rec = slicerec.dump_sqlalchemyobj_to_dict()
635                 print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices rec %s" %(rec)
636                 #Get login 
637                 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
638                 logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"\
639                                                                 %(login,rec))
640                 if slicerec.oar_job_id is not -1:
641                     #Check with OAR the status of the job if a job id is in 
642                     #the slice record
643                     rslt = self.GetJobsId(slicerec.oar_job_id,username = login)
644                     if rslt :
645                         rec.update(rslt)
646                         rec.update({'hrn':str(rec['slice_hrn'])})
647                         #If GetJobsResources is empty, this means the job is now in the 'Terminated' state
648                         #Update the slice record
649                     else :
650                         self.db.update_job(slice_filter, job_id = -1)
651                         rec['oar_job_id'] = -1
652                         rec.update({'hrn':str(rec['slice_hrn'])})
653             
654                 try:
655                     rec['node_ids'] = rec['node_list']
656                 except KeyError:
657                     pass
658                 
659                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  rec  %s" %(rec)
660                               
661             return rec
662                 
663                 
664         else:
665             return_slice_list = slab_dbsession.query(SliceSenslab).all()
666
667         print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  slices %s slice_filter %s " %(return_slice_list,slice_filter)
668         
669         #if return_fields_list:
670             #return_slice_list  = parse_filter(sliceslist, slice_filter,'slice', return_fields_list)
671         
672         
673                     
674         return return_slice_list
675         
676
677         
678     
679     def testbed_name (self): return "senslab2" 
680          
681     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
682     def aggregate_version (self):
683         version_manager = VersionManager()
684         ad_rspec_versions = []
685         request_rspec_versions = []
686         for rspec_version in version_manager.versions:
687             if rspec_version.content_type in ['*', 'ad']:
688                 ad_rspec_versions.append(rspec_version.to_dict())
689             if rspec_version.content_type in ['*', 'request']:
690                 request_rspec_versions.append(rspec_version.to_dict()) 
691         return {
692             'testbed':self.testbed_name(),
693             'geni_request_rspec_versions': request_rspec_versions,
694             'geni_ad_rspec_versions': ad_rspec_versions,
695             }
696           
697           
698           
699           
700           
701           
702     ##
703     # Convert SFA fields to PLC fields for use when registering up updating
704     # registry record in the PLC database
705     #
706     # @param type type of record (user, slice, ...)
707     # @param hrn human readable name
708     # @param sfa_fields dictionary of SFA fields
709     # @param slab_fields dictionary of PLC fields (output)
710
711     def sfa_fields_to_slab_fields(self, type, hrn, record):
712
713         def convert_ints(tmpdict, int_fields):
714             for field in int_fields:
715                 if field in tmpdict:
716                     tmpdict[field] = int(tmpdict[field])
717
718         slab_record = {}
719         #for field in record:
720         #    slab_record[field] = record[field]
721  
722         if type == "slice":
723             #instantion used in get_slivers ? 
724             if not "instantiation" in slab_record:
725                 slab_record["instantiation"] = "senslab-instantiated"
726             slab_record["hrn"] = hrn_to_pl_slicename(hrn)
727             print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
728             if "url" in record:
729                slab_record["url"] = record["url"]
730             if "description" in record:
731                 slab_record["description"] = record["description"]
732             if "expires" in record:
733                 slab_record["expires"] = int(record["expires"])
734                 
735         #nodes added by OAR only and then imported to SFA
736         #elif type == "node":
737             #if not "hostname" in slab_record:
738                 #if not "hostname" in record:
739                     #raise MissingSfaInfo("hostname")
740                 #slab_record["hostname"] = record["hostname"]
741             #if not "model" in slab_record:
742                 #slab_record["model"] = "geni"
743                 
744         #One authority only 
745         #elif type == "authority":
746             #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
747
748             #if not "name" in slab_record:
749                 #slab_record["name"] = hrn
750
751             #if not "abbreviated_name" in slab_record:
752                 #slab_record["abbreviated_name"] = hrn
753
754             #if not "enabled" in slab_record:
755                 #slab_record["enabled"] = True
756
757             #if not "is_public" in slab_record:
758                 #slab_record["is_public"] = True
759
760         return slab_record
761
762                    
763     def LaunchExperimentOnOAR(self,  slice_dict, added_nodes, slice_user=None):
764        
765         site_list = []
766         nodeid_list =[]
767         resource = ""
768         reqdict = {}
769         slice_name = slice_dict['name']
770         try:
771             slot = slice_dict['timeslot'] 
772             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR slot %s   " %(slot)
773         except KeyError:
774             #Running on default parameters
775             #XP immediate , 10 mins
776             slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min 
777             
778             
779         reqdict['property'] ="network_address in ("
780         for node in added_nodes:
781             #Get the ID of the node : remove the root auth and put the site in a separate list
782             s=node.split(".")
783             # NT: it's not clear for me if the nodenames will have the senslab prefix
784             # so lets take the last part only, for now.
785             lastpart=s[-1]
786             #if s[0] == self.root_auth :
787             # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
788             s=lastpart.split("_")
789             nodeid=s[-1]
790             reqdict['property'] += "'"+ nodeid +"', "
791             nodeid_list.append(nodeid)
792             #site_list.append( l[0] )
793             
794             
795         reqdict['property'] =  reqdict['property'][0: len( reqdict['property'])-2] +")"
796         reqdict['resource'] ="network_address="+ str(len(nodeid_list))
797         
798         if slot['duration']:
799             walltime = slot['duration'].split(":")
800             # Fixing the walltime by adding a few delays. First put the walltime in seconds
801             # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
802             # take in account  prologue and epilogue scripts execution
803             # int walltimeAdditionalDelay = 120;  additional delay
804
805             desired_walltime =  int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
806             total_walltime = desired_walltime + 140 #+2 min 20
807             sleep_walltime = desired_walltime + 20 #+20 sec
808             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s  total_walltime %s sleep_walltime %s  " %(desired_walltime,total_walltime,sleep_walltime)
809             #Put the walltime back in str form
810             #First get the hours
811             walltime[0] = str(total_walltime / 3600)
812             total_walltime = total_walltime - 3600 * int(walltime[0])
813             #Get the remaining minutes
814             walltime[1] = str(total_walltime / 60)
815             total_walltime =  total_walltime - 60 * int(walltime[1])
816             #Get the seconds
817             walltime[2] = str(total_walltime)
818             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  walltime %s " %(walltime)
819
820             reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2]) 
821             reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
822         else:
823             reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
824             reqdict['script_path'] = "/bin/sleep 620" #+20 sec    
825         #In case of a scheduled experiment (not immediate)
826         #To run an XP immediately, don't specify date and time in RSpec 
827         #They will be set to None.
828         if slot['date'] and slot['start_time']:
829             if slot['timezone'] is '' or slot['timezone'] is None:
830                 #assume it is server timezone
831                 server_timestamp,server_tz = self.GetTimezone()
832                 from_zone=tz.gettz(server_tz) 
833                 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  timezone not specified  server_tz %s from_zone  %s" %(server_tz,from_zone) 
834             else:
835                 #Get zone of the user from the reservation time given in the rspec
836                 from_zone = tz.gettz(slot['timezone'])  
837                    
838             date = str(slot['date'])  + " " + str(slot['start_time'])
839             user_datetime = datetime.datetime.strptime(date, self.time_format)
840             user_datetime = user_datetime.replace(tzinfo = from_zone)
841             
842             #Convert to UTC zone
843             to_zone = tz.tzutc()
844             utc_date = user_datetime.astimezone(to_zone)
845             #Readable time accpeted by OAR
846             reqdict['reservation']= utc_date.strftime(self.time_format)
847         
848             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  reqdict['reservation'] %s " %(reqdict['reservation'])
849             
850         else:
851             # Immediate XP
852             # reservations are performed in the oar server timebase, so :
853             # 1- we get the server time(in UTC tz )/server timezone
854             # 2- convert the server UTC time in its timezone
855             # 3- add a custom delay to this time
856             # 4- convert this time to a readable form and it for the reservation request.
857             server_timestamp,server_tz = self.GetTimezone()
858             s_tz=tz.gettz(server_tz)
859             UTC_zone = tz.gettz("UTC")
860             #weird... datetime.fromtimestamp should work since we do from datetime import datetime
861             utc_server= datetime.datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
862             server_localtime=utc_server.astimezone(s_tz)
863     
864             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
865             readable_time = server_localtime.strftime(self.time_format)
866
867             print >>sys.stderr,"  \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s  " %(readable_time ,server_timestamp)
868             reqdict['reservation'] = readable_time
869         
870
871         reqdict['type'] = "deploy" 
872         reqdict['directory']= ""
873         reqdict['name']= "TestSandrine"
874        
875          
876         # first step : start the OAR job and update the job 
877         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR reqdict   %s \r\n site_list   %s"  %(reqdict,site_list)   
878        
879         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
880         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid   %s "  %(answer)
881         try:       
882             jobid = answer['id']
883         except KeyError:
884              print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job  %s "  %( answer)
885              return
886         
887         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user)
888         self.db.update_job( slice_name, jobid ,added_nodes)
889         
890           
891         # second step : configure the experiment
892         # we need to store the nodes in a yaml (well...) file like this :
893         # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
894         f=open('/tmp/sfa/'+str(jobid)+'.json','w')
895         f.write('[')
896         f.write(str(added_nodes[0].strip('node')))
897         for node in added_nodes[1:len(added_nodes)] :
898             f.write(','+node.strip('node'))
899         f.write(']')
900         f.close()
901         
902         # third step : call the senslab-experiment wrapper
903         #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
904         javacmdline="/usr/bin/java"
905         jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
906         #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
907         output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
908
909         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns   %s "  %(output)
910         return 
911                  
912  
913     #Delete the jobs and updates the job id in the senslab table
914     #to set it to -1  
915     #Does not clear the node list 
916     def DeleteSliceFromNodes(self, slice_record):
917          # Get user information
918        
919         self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
920         self.db.update_job(slice_record['hrn'], job_id = -1)
921         return   
922     
923  
924
925  
926             
927     def augment_records_with_testbed_info (self, sfa_records):
928         return self.fill_record_info (sfa_records)
929     
930     def fill_record_info(self, record_list):
931         """
932         Given a SFA record, fill in the senslab specific and SFA specific
933         fields in the record. 
934         """
935                     
936         logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
937         if not isinstance(record_list, list):
938             record_list = [record_list]
939             
940         try:
941             for record in record_list:
942                 #If the record is a SFA slice record, then add information 
943                 #about the user of this slice. This kind of information is in the 
944                 #Senslab's DB.
945                 if str(record['type']) == 'slice':
946                     #Get slab slice record.
947                     recslice = self.GetSlices(slice_filter = \
948                                                 str(record['hrn']),\
949                                                 slice_filter_type = 'slice_hrn')
950                     recuser = dbsession.query(RegRecord).filter_by(record_id = \
951                                             recslice['record_id_user']).first()
952                     logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
953                                                 rec %s \r\n \r\n" %(recslice)) 
954                     record.update({'PI':[recuser.hrn],
955                             'researcher': [recuser.hrn],
956                             'name':record['hrn'], 
957                             'oar_job_id':recslice['oar_job_id'],
958                             'node_ids': [],
959                             'person_ids':[recslice['record_id_user']],
960                             'geni_urn':'',  #For client_helper.py compatibility
961                             'keys':'',  #For client_helper.py compatibility
962                             'key_ids':''})  #For client_helper.py compatibility
963                     
964                 elif str(record['type']) == 'user':
965                     #The record is a SFA user record.
966                     #Get the information about his slice from Senslab's DB
967                     #and add it to the user record.
968                     recslice = self.GetSlices(slice_filter = \
969                                             record['record_id'],\
970                                             slice_filter_type = 'record_id_user')
971                                             
972                     logger.debug( "SLABDRIVER.PY \t fill_record_info user \
973                                                 rec %s \r\n \r\n" %(recslice)) 
974                     #Append slice record in records list, 
975                     #therefore fetches user and slice info again(one more loop)
976                     #Will update PIs and researcher for the slice
977                     recuser = dbsession.query(RegRecord).filter_by(record_id = \
978                                                  recslice['record_id_user']).first()
979                     recslice.update({'PI':[recuser.hrn],
980                     'researcher': [recuser.hrn],
981                     'name':record['hrn'], 
982                     'oar_job_id':recslice['oar_job_id'],
983                     'node_ids': [],
984                     'person_ids':[recslice['record_id_user']]})
985
986                     #GetPersons takes [] as filters 
987                     user_slab = self.GetPersons([{'hrn':recuser.hrn}])
988                     
989     
990                     recslice.update({'type':'slice','hrn':recslice['slice_hrn']})
991                     record.update(user_slab[0])
992                     #For client_helper.py compatibility
993                     record.update( { 'geni_urn':'',
994                     'keys':'',
995                     'key_ids':'' })                
996                     record_list.append(recslice)
997                     
998                     logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
999                                 INFO TO USER records %s" %(record_list)) 
1000                         
1001
1002         except TypeError,e:
1003             logger.log_exc("SLABDRIVER \t fill_record_info  EXCEPTION %s" %(e))
1004         
1005         return
1006         
1007         #self.fill_record_slab_info(records)
1008         ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)     
1009         #self.fill_record_sfa_info(records)
1010         #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1011         
1012         
1013
1014     
1015         
1016     #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1017         ## get a list of the HRNs tht are members of the old and new records
1018         #if oldRecord:
1019             #oldList = oldRecord.get(listName, [])
1020         #else:
1021             #oldList = []     
1022         #newList = record.get(listName, [])
1023
1024         ## if the lists are the same, then we don't have to update anything
1025         #if (oldList == newList):
1026             #return
1027
1028         ## build a list of the new person ids, by looking up each person to get
1029         ## their pointer
1030         #newIdList = []
1031         #table = SfaTable()
1032         #records = table.find({'type': 'user', 'hrn': newList})
1033         #for rec in records:
1034             #newIdList.append(rec['pointer'])
1035
1036         ## build a list of the old person ids from the person_ids field 
1037         #if oldRecord:
1038             #oldIdList = oldRecord.get("person_ids", [])
1039             #containerId = oldRecord.get_pointer()
1040         #else:
1041             ## if oldRecord==None, then we are doing a Register, instead of an
1042             ## update.
1043             #oldIdList = []
1044             #containerId = record.get_pointer()
1045
1046     ## add people who are in the new list, but not the oldList
1047         #for personId in newIdList:
1048             #if not (personId in oldIdList):
1049                 #addFunc(self.plauth, personId, containerId)
1050
1051         ## remove people who are in the old list, but not the new list
1052         #for personId in oldIdList:
1053             #if not (personId in newIdList):
1054                 #delFunc(self.plauth, personId, containerId)
1055
1056     #def update_membership(self, oldRecord, record):
1057         #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1058         #if record.type == "slice":
1059             #self.update_membership_list(oldRecord, record, 'researcher',
1060                                         #self.users.AddPersonToSlice,
1061                                         #self.users.DeletePersonFromSlice)
1062         #elif record.type == "authority":
1063             ## xxx TODO
1064             #pass
1065
1066 ### thierry
1067 # I don't think you plan on running a component manager at this point
1068 # let me clean up the mess of ComponentAPI that is deprecated anyways