Merge branch 'master' into senslab2
[sfa.git] / sfa / senslab / slabdriver.py
1 import sys
2 import subprocess
3
4 from datetime import datetime
5 from dateutil import tz 
6 from time import strftime,gmtime
7
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
11
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
15
16
17 from sfa.trust.certificate import *
18 from sfa.trust.credential import *
19 from sfa.trust.gid import GID
20
21 from sfa.managers.driver import Driver
22 from sfa.rspecs.version_manager import VersionManager
23 from sfa.rspecs.rspec import RSpec
24
25 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
26 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
27
28 ## thierry: everything that is API-related (i.e. handling incoming requests) 
29 # is taken care of 
30 # SlabDriver should be really only about talking to the senslab testbed
31
32 ## thierry : please avoid wildcard imports :)
33 from sfa.senslab.OARrestapi import  OARrestapi
34 from sfa.senslab.LDAPapi import LDAPapi
35
36 from sfa.senslab.parsing import parse_filter
37 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
38 from sfa.senslab.slabaggregate import SlabAggregate
39 from sfa.senslab.slabslices import SlabSlices
40
41 def list_to_dict(recs, key):
42     """
43     convert a list of dictionaries into a dictionary keyed on the 
44     specified dictionary key 
45     """
46
47     keys = [rec[key] for rec in recs]
48     return dict(zip(keys, recs))
49
50 # thierry : note
51 # this inheritance scheme is so that the driver object can receive
52 # GetNodes or GetSites sorts of calls directly
53 # and thus minimize the differences in the managers with the pl version
54 class SlabDriver(Driver):
55
56     def __init__(self, config):
57         Driver.__init__ (self, config)
58         self.config=config
59         self.hrn = config.SFA_INTERFACE_HRN
60     
61         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
62
63         
64         print >>sys.stderr, "\r\n_____________ SFA SENSLAB DRIVER \r\n" 
65
66         self.oar = OARrestapi()
67         self.ldap = LDAPapi()
68         self.time_format = "%Y-%m-%d %H:%M:%S"
69         self.db = SlabDB(config)
70         self.cache=None
71         
72     
73     def sliver_status(self,slice_urn,slice_hrn):
74         # receive a status request for slice named urn/hrn urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
75         # shall return a structure as described in
76         # http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
77         # NT : not sure if we should implement this or not, but used by sface.
78         
79
80         sl = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
81         if len(sl) is 0:
82             raise SliverDoesNotExist("%s  slice_hrn" % (slice_hrn))
83
84         print >>sys.stderr, "\r\n \r\n_____________ Sliver status urn %s hrn %s sl %s \r\n " %(slice_urn,slice_hrn,sl)
85         if sl['oar_job_id'] is not -1:
86     
87             # report about the local nodes only
88             nodes_all = self.GetNodes({'hostname':sl['node_ids']},
89                             ['node_id', 'hostname','site','boot_state'])
90             nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
91             nodes = sl['node_ids']
92             if len(nodes) is 0:
93                 raise SliverDoesNotExist("No slivers allocated ") 
94                     
95
96             result = {}
97             top_level_status = 'unknown'
98             if nodes:
99                 top_level_status = 'ready'
100             result['geni_urn'] = slice_urn
101             result['pl_login'] = sl['job_user']
102             #result['slab_login'] = sl['job_user']
103             
104             timestamp = float(sl['startTime']) + float(sl['walltime']) 
105             result['pl_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
106             #result['slab_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
107             
108             resources = []
109             for node in nodes:
110                 res = {}
111                 #res['slab_hostname'] = node['hostname']
112                 #res['slab_boot_state'] = node['boot_state']
113                 
114                 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
115                 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
116                 res['pl_last_contact'] = strftime(self.time_format, gmtime(float(timestamp)))
117                 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'],nodeall_byhostname[node]['node_id'] ) 
118                 res['geni_urn'] = sliver_id 
119                 if nodeall_byhostname[node]['boot_state'] == 'Alive':
120
121                     res['geni_status'] = 'ready'
122                 else:
123                     res['geni_status'] = 'failed'
124                     top_level_status = 'failed' 
125                     
126                 res['geni_error'] = ''
127         
128                 resources.append(res)
129                 
130             result['geni_status'] = top_level_status
131             result['geni_resources'] = resources 
132             print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
133             return result        
134         
135         
136     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
137         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
138         aggregate = SlabAggregate(self)
139         
140         slices = SlabSlices(self)
141         peer = slices.get_peer(slice_hrn)
142         sfa_peer = slices.get_sfa_peer(slice_hrn)
143         slice_record=None 
144  
145         if not isinstance(creds, list):
146             creds = [creds]
147     
148         if users:
149             slice_record = users[0].get('slice_record', {})
150     
151         # parse rspec
152         rspec = RSpec(rspec_string)
153         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ============================rspec.version %s " %(rspec.version)
154         
155         
156         # ensure site record exists?
157         # ensure slice record exists
158         slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
159         requested_attributes = rspec.version.get_slice_attributes()
160         
161         if requested_attributes:
162             for attrib_dict in requested_attributes:
163                 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
164                     slice.update({'timeslot':attrib_dict['timeslot']})
165         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... slice %s " %(slice)
166         # ensure person records exists
167         persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
168         # ensure slice attributes exists?
169
170         
171         # add/remove slice from nodes 
172         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... " 
173        
174         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
175         print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
176         nodes = slices.verify_slice_nodes(slice, requested_slivers, peer) 
177     
178         
179         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
180         
181         
182     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
183         
184         slice = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
185         print>>sys.stderr, "\r\n \r\n \t\t  SLABDRIVER.PY delete_sliver slice %s" %(slice)
186         if not slice:
187             return 1
188        
189         slices = SlabSlices(self)
190         # determine if this is a peer slice
191         # xxx I wonder if this would not need to use PlSlices.get_peer instead 
192         # in which case plc.peers could be deprecated as this here
193         # is the only/last call to this last method in plc.peers
194         peer = slices.get_peer(slice_hrn)
195         try:
196             if peer:
197                 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
198             self.DeleteSliceFromNodes(slice)
199         finally:
200             if peer:
201                 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
202         return 1
203             
204             
205     def AddSlice(self, slice_record):
206         slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'],  record_id_slice= slice_record['record_id_slice'] , record_id_user= slice_record['record_id_user'], peer_authority = slice_record['peer_authority'])
207         print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,slab_slice)
208         slab_dbsession.add(slab_slice)
209         slab_dbsession.commit()
210         return
211         
212     # first 2 args are None in case of resource discovery
213     def list_resources (self, slice_urn, slice_hrn, creds, options):
214         #cached_requested = options.get('cached', True) 
215     
216         version_manager = VersionManager()
217         # get the rspec's return format from options
218         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
219         version_string = "rspec_%s" % (rspec_version)
220     
221         #panos adding the info option to the caching key (can be improved)
222         if options.get('info'):
223             version_string = version_string + "_"+options.get('info', 'default')
224     
225         # look in cache first
226         #if cached_requested and self.cache and not slice_hrn:
227             #rspec = self.cache.get(version_string)
228             #if rspec:
229                 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
230                 #return rspec 
231     
232         #panos: passing user-defined options
233
234         aggregate = SlabAggregate(self)
235         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
236         options.update({'origin_hrn':origin_hrn})
237         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
238                                      options=options)
239         print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec " 
240         # cache the result
241         #if self.cache and not slice_hrn:
242             #logger.debug("Slab.ListResources: stores advertisement in cache")
243             #self.cache.add(version_string, rspec)
244     
245         return rspec
246         
247         
248     def list_slices (self, creds, options):
249         # look in cache first
250         #if self.cache:
251             #slices = self.cache.get('slices')
252             #if slices:
253                 #logger.debug("PlDriver.list_slices returns from cache")
254                 #return slices
255     
256         # get data from db 
257         print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
258         slices = self.GetSlices()
259         slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
260         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
261     
262         # cache the result
263         #if self.cache:
264             #logger.debug ("SlabDriver.list_slices stores value in cache")
265             #self.cache.add('slices', slice_urns) 
266     
267         return slice_urns
268     
269     #No site or node register supported
270     def register (self, sfa_record, hrn, pub_key):
271         type = sfa_record['type']
272         slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
273     
274
275         if type == 'slice':
276             acceptable_fields=['url', 'instantiation', 'name', 'description']
277             for key in slab_record.keys():
278                 if key not in acceptable_fields:
279                     slab_record.pop(key) 
280             print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
281             slices = self.GetSlices(slice_filter =slab_record['hrn'], filter_type = 'slice_hrn')
282             if not slices:
283                     pointer = self.AddSlice(slab_record)
284             else:
285                     pointer = slices[0]['slice_id']
286     
287         elif type == 'user':
288             persons = self.GetPersons([sfa_record['hrn']])
289             if not persons:
290                 pointer = self.AddPerson(dict(sfa_record))
291                 #add in LDAP 
292             else:
293                 pointer = persons[0]['person_id']
294                 
295             #Does this make sense to senslab ?
296             #if 'enabled' in sfa_record and sfa_record['enabled']:
297                 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
298                 
299             # add this person to the site only if she is being added for the first
300             # time by sfa and doesont already exist in plc
301             if not persons or not persons[0]['site_ids']:
302                 login_base = get_leaf(sfa_record['authority'])
303                 self.AddPersonToSite(pointer, login_base)
304     
305             # What roles should this user have?
306             self.AddRoleToPerson('user', pointer)
307             # Add the user's key
308             if pub_key:
309                 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
310                 
311         #No node adding outside OAR
312
313         return pointer
314             
315     #No site or node record update allowed       
316     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
317         pointer = old_sfa_record['pointer']
318         type = old_sfa_record['type']
319
320         # new_key implemented for users only
321         if new_key and type not in [ 'user' ]:
322             raise UnknownSfaType(type)
323         
324         #if (type == "authority"):
325             #self.shell.UpdateSite(pointer, new_sfa_record)
326     
327         if type == "slice":
328             slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
329             if 'name' in slab_record:
330                 slab_record.pop('name')
331                 self.UpdateSlice(pointer, slab_record)
332     
333         elif type == "user":
334             update_fields = {}
335             all_fields = new_sfa_record
336             for key in all_fields.keys():
337                 if key in ['first_name', 'last_name', 'title', 'email',
338                            'password', 'phone', 'url', 'bio', 'accepted_aup',
339                            'enabled']:
340                     update_fields[key] = all_fields[key]
341             self.UpdatePerson(pointer, update_fields)
342     
343             if new_key:
344                 # must check this key against the previous one if it exists
345                 persons = self.GetPersons([pointer], ['key_ids'])
346                 person = persons[0]
347                 keys = person['key_ids']
348                 keys = self.GetKeys(person['key_ids'])
349                 
350                 # Delete all stale keys
351                 key_exists = False
352                 for key in keys:
353                     if new_key != key['key']:
354                         self.DeleteKey(key['key_id'])
355                     else:
356                         key_exists = True
357                 if not key_exists:
358                     self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
359
360
361         return True
362         
363
364     def remove (self, sfa_record):
365         type=sfa_record['type']
366         hrn=sfa_record['hrn']
367         record_id= sfa_record['record_id']
368         if type == 'user':
369             username = hrn.split(".")[len(hrn.split(".")) -1]
370             #get user in ldap
371             persons = self.GetPersons(username)
372             # only delete this person if he has site ids. if he doesnt, it probably means
373             # he was just removed from a site, not actually deleted
374             if persons and persons[0]['site_ids']:
375                 self.DeletePerson(username)
376         elif type == 'slice':
377             if self.GetSlices(slice_filter = hrn, filter_type = 'slice_hrn'):
378                 self.DeleteSlice(hrn)
379
380         #elif type == 'authority':
381             #if self.GetSites(pointer):
382                 #self.DeleteSite(pointer)
383
384         return True
385             
386     def GetPeers (self,auth = None, peer_filter=None, return_fields=None):
387
388         existing_records = {}
389         existing_hrns_by_types= {}
390         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields)
391         all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
392         for record in all_records:
393             existing_records[(record.hrn,record.type)] = record
394             if record.type not in existing_hrns_by_types:
395                 existing_hrns_by_types[record.type] = [record.hrn]
396                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
397             else:
398                 
399                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN  type %s hrn %s " %( record.type,record.hrn )
400                 existing_hrns_by_types[record.type].append(record.hrn)
401                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
402                 #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
403                         
404         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers        existing_hrns_by_types %s " %( existing_hrns_by_types)
405         records_list= [] 
406       
407         try: 
408             print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  existing_hrns_by_types['authority+sa']  %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
409             if peer_filter:
410                records_list.append(existing_records[(peer_filter,'authority')])
411             else :
412                 for hrn in existing_hrns_by_types['authority']:
413                     records_list.append(existing_records[(hrn,'authority')])
414                     
415             print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  records_list  %s " %(records_list)
416                 
417         except:
418                 pass
419                 
420         return_records = records_list
421         if not peer_filter and not return_fields:
422             return records_list
423         #return_records = parse_filter(records_list,peer_filter, 'peers', return_fields) 
424        
425         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers   return_records %s " %(return_records)
426         return return_records
427         
428      
429     #TODO  : Handling OR request in make_ldap_filters_from_records instead of the for loop 
430     #over the records' list
431     def GetPersons(self, person_filter=None, return_fields=None):
432         """
433         person_filter should be a list of dictionnaries when not set to None.
434         Returns a list of users found.
435        
436         """
437         print>>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)
438         person_list = []
439         if person_filter and isinstance(person_filter,list):
440         #If we are looking for a list of users (list of dict records)
441         #Usually the list contains only one user record
442             for f in person_filter:
443                 person = self.ldap.ldapSearch(f)
444                 person_list.append(person)
445           
446         else:
447               person_list  = self.ldap.ldapSearch()  
448                     
449         return person_list
450             #person_list = self.ldap.ldapFindHrn({'authority': self.root_auth })
451         ##check = False
452         ##if person_filter and isinstance(person_filter, dict):
453             ##for k in  person_filter.keys():
454                 ##if k in person_list[0].keys():
455                     ##check = True
456                     
457         #return_person_list = parse_filter(person_list,person_filter ,'persons', return_fields)
458         #if return_person_list:
459             #print>>sys.stderr, " \r\n GetPersons person_filter %s return_fields %s  " %(person_filter,return_fields)
460             #return return_person_list
461
462     def GetTimezone(self):
463         server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
464         return server_timestamp,server_tz
465     
466
467     def DeleteJobs(self, job_id, slice_hrn):
468         if not job_id:
469             return
470         username  = slice_hrn.split(".")[-1].rstrip("_slice")
471         reqdict = {}
472         reqdict['method'] = "delete"
473         reqdict['strval'] = str(job_id)
474         answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
475         print>>sys.stderr, "\r\n \r\n  jobid  DeleteJobs %s "  %(answer)
476         
477                 
478     def GetJobs(self,job_id= None, resources=True,return_fields=None, username = None):
479         #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
480         #'api_timestamp']
481         #assigned_res = ['resource_id', 'resource_uri']
482         #assigned_n = ['node', 'node_uri']
483      
484         if job_id and resources is False:
485             req = "GET_jobs_id"
486             node_list_k = 'assigned_network_address'
487            
488         if job_id and resources :
489             req = "GET_jobs_id_resources"
490             node_list_k = 'reserved_resources' 
491                
492         #Get job info from OAR    
493         job_info = self.oar.parser.SendRequest(req, job_id, username)
494         print>>sys.stderr, "\r\n \r\n \t\t GetJobs  %s " %(job_info)
495         
496         if 'state' in job_info :
497             if job_info['state'] == 'Terminated':
498                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs TERMINELEBOUSIN "
499                 return None
500             if job_info['state'] == 'Error':
501                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs ERROR message %s " %(job_info)
502                 return None
503         
504         #Get a dict of nodes . Key :hostname of the node
505         node_list = self.GetNodes() 
506         node_hostname_list = []
507         for node in node_list:
508             node_hostname_list.append(node['hostname'])
509         node_dict = dict(zip(node_hostname_list,node_list))
510         try :
511             liste =job_info[node_list_k] 
512             for k in range(len(liste)):
513                job_info[node_list_k][k] = node_dict[job_info[node_list_k][k]]['hostname']
514             
515             #Replaces the previous entry "assigned_network_address" / "reserved_resources"
516             #with "node_ids"
517             job_info.update({'node_ids':job_info[node_list_k]})
518             del job_info[node_list_k]
519             return job_info
520             
521         except KeyError:
522             print>>sys.stderr, "\r\n \r\n \t\t GetJobs KEYERROR " 
523             
524     def GetReservedNodes(self):
525         # this function returns a list of all the nodes already involved in an oar job
526
527        jobs=self.oar.parser.SendRequest("GET_jobs_details") 
528        nodes=[]
529        for j in jobs :
530           nodes=j['assigned_network_address']+nodes
531        return nodes
532      
533     def GetNodes(self,node_filter= None, return_fields=None):
534         node_dict =self.oar.parser.SendRequest("GET_resources_full")
535
536         return_node_list = []
537         if not (node_filter or return_fields):
538                 return_node_list = node_dict.values()
539                 return return_node_list
540     
541         return_node_list= parse_filter(node_dict.values(),node_filter ,'node', return_fields)
542         return return_node_list
543     
544   
545     def GetSites(self, site_filter = None, return_fields=None):
546         site_dict =self.oar.parser.SendRequest("GET_sites")
547         return_site_list = []
548         if not ( site_filter or return_fields):
549                 return_site_list = site_dict.values()
550                 return return_site_list
551     
552         return_site_list = parse_filter(site_dict.values(), site_filter,'site', return_fields)
553         return return_site_list
554         
555
556     def GetSlices(self,slice_filter = None, filter_type = None, return_fields=None):
557         return_slice_list = []
558         slicerec  = {}
559         rec = {}
560         ftypes = ['slice_hrn', 'record_id_user']
561         if filter_type and filter_type in ftypes:
562             if filter_type == 'slice_hrn':
563                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()    
564             if filter_type == 'record_id_user':
565                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
566                 
567             if slicerec:
568                 rec = slicerec.dumpquerytodict()
569                 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
570                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY slicerec GetSlices   %s " %(slicerec)
571                 if slicerec.oar_job_id is not -1:
572                     rslt = self.GetJobs( slicerec.oar_job_id, resources=False, username = login )
573                     #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  GetJobs  %s " %(rslt)     
574                     if rslt :
575                         rec.update(rslt)
576                         rec.update({'hrn':str(rec['slice_hrn'])})
577                         #If GetJobs is empty, this means the job is now in the 'Terminated' state
578                         #Update the slice record
579                     else :
580                         self.db.update_job(slice_filter, job_id = -1)
581                         rec['oar_job_id'] = -1
582                         rec.update({'hrn':str(rec['slice_hrn'])})
583             
584                 try:
585                     rec['node_ids'] = rec['node_list']
586                 except KeyError:
587                     pass
588                 
589                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  rec  %s" %(rec)
590                               
591             return rec
592                 
593                 
594         else:
595             return_slice_list = slab_dbsession.query(SliceSenslab).all()
596
597         print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  slices %s slice_filter %s " %(return_slice_list,slice_filter)
598         
599         #if return_fields:
600             #return_slice_list  = parse_filter(sliceslist, slice_filter,'slice', return_fields)
601         
602         
603                     
604         return return_slice_list
605         
606
607         
608     
609     def testbed_name (self): return "senslab2" 
610          
611     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
612     def aggregate_version (self):
613         version_manager = VersionManager()
614         ad_rspec_versions = []
615         request_rspec_versions = []
616         for rspec_version in version_manager.versions:
617             if rspec_version.content_type in ['*', 'ad']:
618                 ad_rspec_versions.append(rspec_version.to_dict())
619             if rspec_version.content_type in ['*', 'request']:
620                 request_rspec_versions.append(rspec_version.to_dict()) 
621         return {
622             'testbed':self.testbed_name(),
623             'geni_request_rspec_versions': request_rspec_versions,
624             'geni_ad_rspec_versions': ad_rspec_versions,
625             }
626           
627           
628           
629           
630           
631           
632     ##
633     # Convert SFA fields to PLC fields for use when registering up updating
634     # registry record in the PLC database
635     #
636     # @param type type of record (user, slice, ...)
637     # @param hrn human readable name
638     # @param sfa_fields dictionary of SFA fields
639     # @param slab_fields dictionary of PLC fields (output)
640
641     def sfa_fields_to_slab_fields(self, type, hrn, record):
642
643         def convert_ints(tmpdict, int_fields):
644             for field in int_fields:
645                 if field in tmpdict:
646                     tmpdict[field] = int(tmpdict[field])
647
648         slab_record = {}
649         #for field in record:
650         #    slab_record[field] = record[field]
651  
652         if type == "slice":
653             #instantion used in get_slivers ? 
654             if not "instantiation" in slab_record:
655                 slab_record["instantiation"] = "senslab-instantiated"
656             slab_record["hrn"] = hrn_to_pl_slicename(hrn)
657             print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
658             if "url" in record:
659                slab_record["url"] = record["url"]
660             if "description" in record:
661                 slab_record["description"] = record["description"]
662             if "expires" in record:
663                 slab_record["expires"] = int(record["expires"])
664                 
665         #nodes added by OAR only and then imported to SFA
666         #elif type == "node":
667             #if not "hostname" in slab_record:
668                 #if not "hostname" in record:
669                     #raise MissingSfaInfo("hostname")
670                 #slab_record["hostname"] = record["hostname"]
671             #if not "model" in slab_record:
672                 #slab_record["model"] = "geni"
673                 
674         #One authority only 
675         #elif type == "authority":
676             #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
677
678             #if not "name" in slab_record:
679                 #slab_record["name"] = hrn
680
681             #if not "abbreviated_name" in slab_record:
682                 #slab_record["abbreviated_name"] = hrn
683
684             #if not "enabled" in slab_record:
685                 #slab_record["enabled"] = True
686
687             #if not "is_public" in slab_record:
688                 #slab_record["is_public"] = True
689
690         return slab_record
691
692                    
693     def LaunchExperimentOnOAR(self,  slice_dict, added_nodes, slice_user=None):
694        
695         site_list = []
696         nodeid_list =[]
697         resource = ""
698         reqdict = {}
699         slice_name = slice_dict['name']
700         try:
701             slot = slice_dict['timeslot'] 
702             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR slot %s   " %(slot)
703         except KeyError:
704             #Running on default parameters
705             #XP immediate , 10 mins
706             slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min 
707             
708             
709         reqdict['property'] ="network_address in ("
710         for node in added_nodes:
711             #Get the ID of the node : remove the root auth and put the site in a separate list
712             s=node.split(".")
713             # NT: it's not clear for me if the nodenames will have the senslab prefix
714             # so lets take the last part only, for now.
715             lastpart=s[-1]
716             #if s[0] == self.root_auth :
717             # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
718             s=lastpart.split("_")
719             nodeid=s[-1]
720             reqdict['property'] += "'"+ nodeid +"', "
721             nodeid_list.append(nodeid)
722             #site_list.append( l[0] )
723             
724             
725         reqdict['property'] =  reqdict['property'][0: len( reqdict['property'])-2] +")"
726         reqdict['resource'] ="network_address="+ str(len(nodeid_list))
727         
728         if slot['duration']:
729             walltime = slot['duration'].split(":")
730             # Fixing the walltime by adding a few delays. First put the walltime in seconds
731             # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
732             # take in account  prologue and epilogue scripts execution
733             # int walltimeAdditionalDelay = 120;  additional delay
734
735             desired_walltime =  int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
736             total_walltime = desired_walltime + 140 #+2 min 20
737             sleep_walltime = desired_walltime + 20 #+20 sec
738             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s  total_walltime %s sleep_walltime %s  " %(desired_walltime,total_walltime,sleep_walltime)
739             #Put the walltime back in str form
740             #First get the hours
741             walltime[0] = str(total_walltime / 3600)
742             total_walltime = total_walltime - 3600 * int(walltime[0])
743             #Get the remaining minutes
744             walltime[1] = str(total_walltime / 60)
745             total_walltime =  total_walltime - 60 * int(walltime[1])
746             #Get the seconds
747             walltime[2] = str(total_walltime)
748             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  walltime %s " %(walltime)
749
750             reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2]) 
751             reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
752         else:
753             reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
754             reqdict['script_path'] = "/bin/sleep 620" #+20 sec    
755         #In case of a scheduled experiment (not immediate)
756         #To run an XP immediately, don't specify date and time in RSpec 
757         #They will be set to None.
758         if slot['date'] and slot['start_time']:
759             if slot['timezone'] is '' or slot['timezone'] is None:
760                 #assume it is server timezone
761                 server_timestamp,server_tz = self.GetTimezone()
762                 from_zone=tz.gettz(server_tz) 
763                 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  timezone not specified  server_tz %s from_zone  %s" %(server_tz,from_zone) 
764             else:
765                 #Get zone of the user from the reservation time given in the rspec
766                 from_zone = tz.gettz(slot['timezone'])  
767                    
768             date = str(slot['date'])  + " " + str(slot['start_time'])
769             user_datetime = datetime.datetime.strptime(date, self.time_format)
770             user_datetime = user_datetime.replace(tzinfo = from_zone)
771             
772             #Convert to UTC zone
773             to_zone = tz.tzutc()
774             utc_date = user_datetime.astimezone(to_zone)
775             #Readable time accpeted by OAR
776             reqdict['reservation']= utc_date.strftime(self.time_format)
777         
778             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  reqdict['reservation'] %s " %(reqdict['reservation'])
779             
780         else:
781             # Immediate XP
782             # reservations are performed in the oar server timebase, so :
783             # 1- we get the server time(in UTC tz )/server timezone
784             # 2- convert the server UTC time in its timezone
785             # 3- add a custom delay to this time
786             # 4- convert this time to a readable form and it for the reservation request.
787             server_timestamp,server_tz = self.GetTimezone()
788             s_tz=tz.gettz(server_tz)
789             UTC_zone = tz.gettz("UTC")
790             #weird... datetime.fromtimestamp should work since we do from datetime import datetime
791             utc_server= datetime.datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
792             server_localtime=utc_server.astimezone(s_tz)
793     
794             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
795             readable_time = server_localtime.strftime(self.time_format)
796
797             print >>sys.stderr,"  \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s  " %(readable_time ,server_timestamp)
798             reqdict['reservation'] = readable_time
799         
800
801         reqdict['type'] = "deploy" 
802         reqdict['directory']= ""
803         reqdict['name']= "TestSandrine"
804        
805          
806         # first step : start the OAR job and update the job 
807         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR reqdict   %s \r\n site_list   %s"  %(reqdict,site_list)   
808        
809         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
810         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid   %s "  %(answer)
811         try:       
812             jobid = answer['id']
813         except KeyError:
814              print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job  %s "  %( answer)
815              return
816         
817         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user)
818         self.db.update_job( slice_name, jobid ,added_nodes)
819         
820           
821         # second step : configure the experiment
822         # we need to store the nodes in a yaml (well...) file like this :
823         # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
824         f=open('/tmp/sfa/'+str(jobid)+'.json','w')
825         f.write('[')
826         f.write(str(added_nodes[0].strip('node')))
827         for node in added_nodes[1:len(added_nodes)] :
828             f.write(','+node.strip('node'))
829         f.write(']')
830         f.close()
831         
832         # third step : call the senslab-experiment wrapper
833         #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
834         javacmdline="/usr/bin/java"
835         jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
836         #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
837         output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
838
839         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns   %s "  %(output)
840         return 
841                  
842  
843     #Delete the jobs and updates the job id in the senslab table
844     #to set it to -1  
845     #Does not clear the node list 
846     def DeleteSliceFromNodes(self, slice_record):
847          # Get user information
848        
849         self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
850         self.db.update_job(slice_record['hrn'], job_id = -1)
851         return   
852     
853  
854
855  
856             
857     def augment_records_with_testbed_info (self, sfa_records):
858         return self.fill_record_info (sfa_records)
859     
860     def fill_record_info(self, records):
861         """
862         Given a SFA record, fill in the senslab specific and SFA specific
863         fields in the record. 
864         """
865                     
866         print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s  " %(records)
867         if not isinstance(records, list):
868             records = [records]
869
870         parkour = records 
871         try:
872             for record in parkour:
873                     
874                 if str(record['type']) == 'slice':
875                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY  fill_record_info \t \t record %s" %(record)
876                     #sfatable = SfaTable()
877                     
878                     #existing_records_by_id = {}
879                     #all_records = dbsession.query(RegRecord).all()
880                     #for rec in all_records:
881                         #existing_records_by_id[rec.record_id] = rec
882                     #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY  fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']])
883                         
884                     #recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])}) 
885                     #recslice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = str(record['hrn'])).first()
886                     recslice = self.GetSlices(slice_filter =  str(record['hrn']), filter_type = 'slice_hrn')
887                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice)
888                     #if isinstance(recslice,list) and len(recslice) == 1:
889                         #recslice = recslice[0]
890                     
891                     recuser = dbsession.query(RegRecord).filter_by(record_id = recslice['record_id_user']).first()
892                     #existing_records_by_id[recslice['record_id_user']]
893                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser)
894                     
895             
896                     record.update({'PI':[recuser.hrn],
897                     'researcher': [recuser.hrn],
898                     'name':record['hrn'], 
899                     'oar_job_id':recslice['oar_job_id'],
900                     'node_ids': [],
901                     'person_ids':[recslice['record_id_user']],
902                     'geni_urn':'',  #For client_helper.py compatibility
903                     'keys':'',  #For client_helper.py compatibility
904                     'key_ids':''})  #For client_helper.py compatibility
905                     
906                 elif str(record['type']) == 'user':
907                     #Add the data about slice
908                     rec = self.GetSlices(slice_filter = record['record_id'], filter_type = 'record_id_user')
909                     print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info USEEEEEEEEEERDESU!  rec %s \r\n \t rec['record_id_user'] %s " %(rec,rec['record_id_user']) 
910                     #Append record in records list, therfore fetches user and slice info again(one more loop)
911                     #Will update PIs and researcher for the slice
912                     recuser = dbsession.query(RegRecord).filter_by(record_id = rec['record_id_user']).first()
913                     rec.update({'PI':[recuser.hrn],
914                     'researcher': [recuser.hrn],
915                     'name':record['hrn'], 
916                     'oar_job_id':rec['oar_job_id'],
917                     'node_ids': [],
918                     'person_ids':[rec['record_id_user']]})
919                     #retourne une liste 100512
920                     
921                     #GetPersons takes [] as filters 
922                     user_slab = self.GetPersons([{'hrn':recuser.hrn}])
923                     
924
925                     rec.update({'type':'slice','hrn':rec['slice_hrn']})
926                     record.update(user_slab[0])
927                     #For client_helper.py compatibility
928                     record.update( { 'geni_urn':'',
929                     'keys':'',
930                     'key_ids':'' })                
931                     records.append(rec)
932                     
933                     print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info ADDING SLICEINFO TO USER records %s" %(records) 
934                     
935             print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info OKrecords %s" %(records) 
936         except TypeError:
937             print >>sys.stderr, "\r\n \t\t SLABDRIVER fill_record_info  EXCEPTION RECORDS : %s" %(records)      
938         return
939         
940         #self.fill_record_slab_info(records)
941         ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)     
942         #self.fill_record_sfa_info(records)
943         #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
944         
945         
946
947     
948         
949     #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
950         ## get a list of the HRNs tht are members of the old and new records
951         #if oldRecord:
952             #oldList = oldRecord.get(listName, [])
953         #else:
954             #oldList = []     
955         #newList = record.get(listName, [])
956
957         ## if the lists are the same, then we don't have to update anything
958         #if (oldList == newList):
959             #return
960
961         ## build a list of the new person ids, by looking up each person to get
962         ## their pointer
963         #newIdList = []
964         #table = SfaTable()
965         #records = table.find({'type': 'user', 'hrn': newList})
966         #for rec in records:
967             #newIdList.append(rec['pointer'])
968
969         ## build a list of the old person ids from the person_ids field 
970         #if oldRecord:
971             #oldIdList = oldRecord.get("person_ids", [])
972             #containerId = oldRecord.get_pointer()
973         #else:
974             ## if oldRecord==None, then we are doing a Register, instead of an
975             ## update.
976             #oldIdList = []
977             #containerId = record.get_pointer()
978
979     ## add people who are in the new list, but not the oldList
980         #for personId in newIdList:
981             #if not (personId in oldIdList):
982                 #addFunc(self.plauth, personId, containerId)
983
984         ## remove people who are in the old list, but not the new list
985         #for personId in oldIdList:
986             #if not (personId in newIdList):
987                 #delFunc(self.plauth, personId, containerId)
988
989     #def update_membership(self, oldRecord, record):
990         #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
991         #if record.type == "slice":
992             #self.update_membership_list(oldRecord, record, 'researcher',
993                                         #self.users.AddPersonToSlice,
994                                         #self.users.DeletePersonFromSlice)
995         #elif record.type == "authority":
996             ## xxx TODO
997             #pass
998
999 ### thierry
1000 # I don't think you plan on running a component manager at this point
1001 # let me clean up the mess of ComponentAPI that is deprecated anyways