Cleaning.
[sfa.git] / sfa / senslab / slabdriver.py
1 import sys
2 import subprocess
3
4 from datetime import datetime
5 from dateutil import tz 
6 from time import strftime,gmtime
7
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
11
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
15
16
17 from sfa.trust.certificate import *
18 from sfa.trust.credential import *
19 from sfa.trust.gid import GID
20
21 from sfa.managers.driver import Driver
22 from sfa.rspecs.version_manager import VersionManager
23 from sfa.rspecs.rspec import RSpec
24
25 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
26 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
27
28 ## thierry: everything that is API-related (i.e. handling incoming requests) 
29 # is taken care of 
30 # SlabDriver should be really only about talking to the senslab testbed
31
32 ## thierry : please avoid wildcard imports :)
33 from sfa.senslab.OARrestapi import  OARrestapi
34 from sfa.senslab.LDAPapi import LDAPapi
35
36 from sfa.senslab.parsing import parse_filter
37 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
38 from sfa.senslab.slabaggregate import SlabAggregate
39 from sfa.senslab.slabslices import SlabSlices
40
41 def list_to_dict(recs, key):
42     """
43     convert a list of dictionaries into a dictionary keyed on the 
44     specified dictionary key 
45     """
46
47     keys = [rec[key] for rec in recs]
48     return dict(zip(keys, recs))
49
50 # thierry : note
51 # this inheritance scheme is so that the driver object can receive
52 # GetNodes or GetSites sorts of calls directly
53 # and thus minimize the differences in the managers with the pl version
54 class SlabDriver(Driver):
55
56     def __init__(self, config):
57         Driver.__init__ (self, config)
58         self.config=config
59         self.hrn = config.SFA_INTERFACE_HRN
60     
61         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
62
63         
64         print >>sys.stderr, "\r\n_____________ SFA SENSLAB DRIVER \r\n" 
65
66         self.oar = OARrestapi()
67         self.ldap = LDAPapi()
68         self.time_format = "%Y-%m-%d %H:%M:%S"
69         self.db = SlabDB(config)
70         self.cache=None
71         
72     
73     def sliver_status(self,slice_urn,slice_hrn):
74         # receive a status request for slice named urn/hrn urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
75         # shall return a structure as described in
76         # http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
77         # NT : not sure if we should implement this or not, but used by sface.
78         
79
80         sl = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
81         if len(sl) is 0:
82             raise SliverDoesNotExist("%s  slice_hrn" % (slice_hrn))
83
84         print >>sys.stderr, "\r\n \r\n_____________ Sliver status urn %s hrn %s sl %s \r\n " %(slice_urn,slice_hrn,sl)
85         if sl['oar_job_id'] is not -1:
86     
87             # report about the local nodes only
88             nodes_all = self.GetNodes({'hostname':sl['node_ids']},
89                             ['node_id', 'hostname','site','boot_state'])
90             nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
91             nodes = sl['node_ids']
92             if len(nodes) is 0:
93                 raise SliverDoesNotExist("No slivers allocated ") 
94                     
95
96             result = {}
97             top_level_status = 'unknown'
98             if nodes:
99                 top_level_status = 'ready'
100             result['geni_urn'] = slice_urn
101             result['pl_login'] = sl['job_user']
102             #result['slab_login'] = sl['job_user']
103             
104             timestamp = float(sl['startTime']) + float(sl['walltime']) 
105             result['pl_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
106             #result['slab_expires'] = strftime(self.time_format, gmtime(float(timestamp)))
107             
108             resources = []
109             for node in nodes:
110                 res = {}
111                 #res['slab_hostname'] = node['hostname']
112                 #res['slab_boot_state'] = node['boot_state']
113                 
114                 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
115                 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
116                 res['pl_last_contact'] = strftime(self.time_format, gmtime(float(timestamp)))
117                 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'],nodeall_byhostname[node]['node_id'] ) 
118                 res['geni_urn'] = sliver_id 
119                 if nodeall_byhostname[node]['boot_state'] == 'Alive':
120
121                     res['geni_status'] = 'ready'
122                 else:
123                     res['geni_status'] = 'failed'
124                     top_level_status = 'failed' 
125                     
126                 res['geni_error'] = ''
127         
128                 resources.append(res)
129                 
130             result['geni_status'] = top_level_status
131             result['geni_resources'] = resources 
132             print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
133             return result        
134         
135         
136     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
137         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
138         aggregate = SlabAggregate(self)
139         
140         slices = SlabSlices(self)
141         peer = slices.get_peer(slice_hrn)
142         sfa_peer = slices.get_sfa_peer(slice_hrn)
143         slice_record=None 
144  
145         if not isinstance(creds, list):
146             creds = [creds]
147     
148         if users:
149             slice_record = users[0].get('slice_record', {})
150     
151         # parse rspec
152         rspec = RSpec(rspec_string)
153         print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ============================rspec.version %s " %(rspec.version)
154         
155         
156         # ensure site record exists?
157         # ensure slice record exists
158         slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
159         requested_attributes = rspec.version.get_slice_attributes()
160         
161         if requested_attributes:
162             for attrib_dict in requested_attributes:
163                 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
164                     slice.update({'timeslot':attrib_dict['timeslot']})
165         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... slice %s " %(slice)
166         # ensure person records exists
167         persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
168         # ensure slice attributes exists?
169
170         
171         # add/remove slice from nodes 
172         print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver  ..... " 
173        
174         requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
175         print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
176         nodes = slices.verify_slice_nodes(slice, requested_slivers, peer) 
177     
178         
179         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
180         
181         
182     def delete_sliver (self, slice_urn, slice_hrn, creds, options):
183         
184         slice = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
185         print>>sys.stderr, "\r\n \r\n \t\t  SLABDRIVER.PY delete_sliver slice %s" %(slice)
186         if not slice:
187             return 1
188        
189         slices = SlabSlices(self)
190         # determine if this is a peer slice
191         # xxx I wonder if this would not need to use PlSlices.get_peer instead 
192         # in which case plc.peers could be deprecated as this here
193         # is the only/last call to this last method in plc.peers
194         peer = slices.get_peer(slice_hrn)
195         try:
196             if peer:
197                 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
198             self.DeleteSliceFromNodes(slice)
199         finally:
200             if peer:
201                 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
202         return 1
203             
204             
205     def AddSlice(self, slice_record):
206         slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'],  record_id_slice= slice_record['record_id_slice'] , record_id_user= slice_record['record_id_user'], peer_authority = slice_record['peer_authority'])
207         print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,slab_slice)
208         slab_dbsession.add(slab_slice)
209         slab_dbsession.commit()
210         return
211         
212     # first 2 args are None in case of resource discovery
213     def list_resources (self, slice_urn, slice_hrn, creds, options):
214         #cached_requested = options.get('cached', True) 
215     
216         version_manager = VersionManager()
217         # get the rspec's return format from options
218         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
219         version_string = "rspec_%s" % (rspec_version)
220     
221         #panos adding the info option to the caching key (can be improved)
222         if options.get('info'):
223             version_string = version_string + "_"+options.get('info', 'default')
224     
225         # look in cache first
226         #if cached_requested and self.cache and not slice_hrn:
227             #rspec = self.cache.get(version_string)
228             #if rspec:
229                 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
230                 #return rspec 
231     
232         #panos: passing user-defined options
233
234         aggregate = SlabAggregate(self)
235         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
236         options.update({'origin_hrn':origin_hrn})
237         rspec =  aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, 
238                                      options=options)
239         print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec " 
240         # cache the result
241         #if self.cache and not slice_hrn:
242             #logger.debug("Slab.ListResources: stores advertisement in cache")
243             #self.cache.add(version_string, rspec)
244     
245         return rspec
246         
247         
248     def list_slices (self, creds, options):
249         # look in cache first
250         #if self.cache:
251             #slices = self.cache.get('slices')
252             #if slices:
253                 #logger.debug("PlDriver.list_slices returns from cache")
254                 #return slices
255     
256         # get data from db 
257         print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
258         slices = self.GetSlices()
259         slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
260         slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
261     
262         # cache the result
263         #if self.cache:
264             #logger.debug ("SlabDriver.list_slices stores value in cache")
265             #self.cache.add('slices', slice_urns) 
266     
267         return slice_urns
268     
269     #No site or node register supported
270     def register (self, sfa_record, hrn, pub_key):
271         type = sfa_record['type']
272         slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
273     
274
275         if type == 'slice':
276             acceptable_fields=['url', 'instantiation', 'name', 'description']
277             for key in slab_record.keys():
278                 if key not in acceptable_fields:
279                     slab_record.pop(key) 
280             print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
281             slices = self.GetSlices(slice_filter =slab_record['hrn'], filter_type = 'slice_hrn')
282             if not slices:
283                     pointer = self.AddSlice(slab_record)
284             else:
285                     pointer = slices[0]['slice_id']
286     
287         elif type == 'user':
288             persons = self.GetPersons([sfa_record['hrn']])
289             if not persons:
290                 pointer = self.AddPerson(dict(sfa_record))
291                 #add in LDAP 
292             else:
293                 pointer = persons[0]['person_id']
294                 
295             #Does this make sense to senslab ?
296             #if 'enabled' in sfa_record and sfa_record['enabled']:
297                 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
298                 
299             # add this person to the site only if she is being added for the first
300             # time by sfa and doesont already exist in plc
301             if not persons or not persons[0]['site_ids']:
302                 login_base = get_leaf(sfa_record['authority'])
303                 self.AddPersonToSite(pointer, login_base)
304     
305             # What roles should this user have?
306             self.AddRoleToPerson('user', pointer)
307             # Add the user's key
308             if pub_key:
309                 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
310                 
311         #No node adding outside OAR
312
313         return pointer
314             
315     #No site or node record update allowed       
316     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
317         pointer = old_sfa_record['pointer']
318         type = old_sfa_record['type']
319
320         # new_key implemented for users only
321         if new_key and type not in [ 'user' ]:
322             raise UnknownSfaType(type)
323         
324         #if (type == "authority"):
325             #self.shell.UpdateSite(pointer, new_sfa_record)
326     
327         if type == "slice":
328             slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
329             if 'name' in slab_record:
330                 slab_record.pop('name')
331                 self.UpdateSlice(pointer, slab_record)
332     
333         elif type == "user":
334             update_fields = {}
335             all_fields = new_sfa_record
336             for key in all_fields.keys():
337                 if key in ['first_name', 'last_name', 'title', 'email',
338                            'password', 'phone', 'url', 'bio', 'accepted_aup',
339                            'enabled']:
340                     update_fields[key] = all_fields[key]
341             self.UpdatePerson(pointer, update_fields)
342     
343             if new_key:
344                 # must check this key against the previous one if it exists
345                 persons = self.GetPersons([pointer], ['key_ids'])
346                 person = persons[0]
347                 keys = person['key_ids']
348                 keys = self.GetKeys(person['key_ids'])
349                 
350                 # Delete all stale keys
351                 key_exists = False
352                 for key in keys:
353                     if new_key != key['key']:
354                         self.DeleteKey(key['key_id'])
355                     else:
356                         key_exists = True
357                 if not key_exists:
358                     self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
359
360
361         return True
362         
363
364     def remove (self, sfa_record):
365         type=sfa_record['type']
366         hrn=sfa_record['hrn']
367         record_id= sfa_record['record_id']
368         if type == 'user':
369             username = hrn.split(".")[len(hrn.split(".")) -1]
370             #get user in ldap
371             persons = self.GetPersons(username)
372             # only delete this person if he has site ids. if he doesnt, it probably means
373             # he was just removed from a site, not actually deleted
374             if persons and persons[0]['site_ids']:
375                 self.DeletePerson(username)
376         elif type == 'slice':
377             if self.GetSlices(slice_filter = hrn, filter_type = 'slice_hrn'):
378                 self.DeleteSlice(hrn)
379
380         #elif type == 'authority':
381             #if self.GetSites(pointer):
382                 #self.DeleteSite(pointer)
383
384         return True
385             
386     def GetPeers (self,auth = None, peer_filter=None, return_fields=None):
387
388         existing_records = {}
389         existing_hrns_by_types= {}
390         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields)
391         all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
392         for record in all_records:
393             existing_records[(record.hrn,record.type)] = record
394             if record.type not in existing_hrns_by_types:
395                 existing_hrns_by_types[record.type] = [record.hrn]
396                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
397             else:
398                 
399                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN  type %s hrn %s " %( record.type,record.hrn )
400                 existing_hrns_by_types[record.type].append(record.hrn)
401                 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
402                 #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
403                         
404         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers        existing_hrns_by_types %s " %( existing_hrns_by_types)
405         records_list= [] 
406       
407         try: 
408             print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  existing_hrns_by_types['authority+sa']  %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
409             if peer_filter:
410                records_list.append(existing_records[(peer_filter,'authority')])
411             else :
412                 for hrn in existing_hrns_by_types['authority']:
413                     records_list.append(existing_records[(hrn,'authority')])
414                     
415             print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers  records_list  %s " %(records_list)
416                 
417         except:
418                 pass
419                 
420         return_records = records_list
421         if not peer_filter and not return_fields:
422             return records_list
423         #return_records = parse_filter(records_list,peer_filter, 'peers', return_fields) 
424        
425         print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers   return_records %s " %(return_records)
426         return return_records
427         
428      
429             
430     def GetPersons(self, person_filter=None, return_fields=None):
431         
432         #if isinstance(person_filter,list):
433             #for f in person_filter:
434                 #person = self.ldap.ldapSearch(f)
435         #if isinstance(person_filter,dict):    
436         person_list = self.ldap.ldapFindHrn({'authority': self.root_auth })
437         
438         #check = False
439         #if person_filter and isinstance(person_filter, dict):
440             #for k in  person_filter.keys():
441                 #if k in person_list[0].keys():
442                     #check = True
443                     
444         return_person_list = parse_filter(person_list,person_filter ,'persons', return_fields)
445         if return_person_list:
446             print>>sys.stderr, " \r\n GetPersons person_filter %s return_fields %s  " %(person_filter,return_fields)
447             return return_person_list
448
449     def GetTimezone(self):
450         server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
451         return server_timestamp,server_tz
452     
453
454     def DeleteJobs(self, job_id, slice_hrn):
455         if not job_id:
456             return
457         username  = slice_hrn.split(".")[-1].rstrip("_slice")
458         reqdict = {}
459         reqdict['method'] = "delete"
460         reqdict['strval'] = str(job_id)
461         answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
462         print>>sys.stderr, "\r\n \r\n  jobid  DeleteJobs %s "  %(answer)
463         
464                 
465     def GetJobs(self,job_id= None, resources=True,return_fields=None, username = None):
466         #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
467         #'api_timestamp']
468         #assigned_res = ['resource_id', 'resource_uri']
469         #assigned_n = ['node', 'node_uri']
470      
471         if job_id and resources is False:
472             req = "GET_jobs_id"
473             node_list_k = 'assigned_network_address'
474            
475         if job_id and resources :
476             req = "GET_jobs_id_resources"
477             node_list_k = 'reserved_resources' 
478                
479         #Get job info from OAR    
480         job_info = self.oar.parser.SendRequest(req, job_id, username)
481         print>>sys.stderr, "\r\n \r\n \t\t GetJobs  %s " %(job_info)
482         
483         if 'state' in job_info :
484             if job_info['state'] == 'Terminated':
485                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs TERMINELEBOUSIN "
486                 return None
487             if job_info['state'] == 'Error':
488                 print>>sys.stderr, "\r\n \r\n \t\t GetJobs ERROR message %s " %(job_info)
489                 return None
490         
491         #Get a dict of nodes . Key :hostname of the node
492         node_list = self.GetNodes() 
493         node_hostname_list = []
494         for node in node_list:
495             node_hostname_list.append(node['hostname'])
496         node_dict = dict(zip(node_hostname_list,node_list))
497         try :
498             liste =job_info[node_list_k] 
499             for k in range(len(liste)):
500                job_info[node_list_k][k] = node_dict[job_info[node_list_k][k]]['hostname']
501             
502             #Replaces the previous entry "assigned_network_address" / "reserved_resources"
503             #with "node_ids"
504             job_info.update({'node_ids':job_info[node_list_k]})
505             del job_info[node_list_k]
506             return job_info
507             
508         except KeyError:
509             print>>sys.stderr, "\r\n \r\n \t\t GetJobs KEYERROR " 
510             
511     def GetReservedNodes(self):
512         # this function returns a list of all the nodes already involved in an oar job
513
514        jobs=self.oar.parser.SendRequest("GET_jobs_details") 
515        nodes=[]
516        for j in jobs :
517           nodes=j['assigned_network_address']+nodes
518        return nodes
519      
520     def GetNodes(self,node_filter= None, return_fields=None):
521         node_dict =self.oar.parser.SendRequest("GET_resources_full")
522
523         return_node_list = []
524         if not (node_filter or return_fields):
525                 return_node_list = node_dict.values()
526                 return return_node_list
527     
528         return_node_list= parse_filter(node_dict.values(),node_filter ,'node', return_fields)
529         return return_node_list
530     
531   
532     def GetSites(self, site_filter = None, return_fields=None):
533         site_dict =self.oar.parser.SendRequest("GET_sites")
534         return_site_list = []
535         if not ( site_filter or return_fields):
536                 return_site_list = site_dict.values()
537                 return return_site_list
538     
539         return_site_list = parse_filter(site_dict.values(), site_filter,'site', return_fields)
540         return return_site_list
541         
542
543     def GetSlices(self,slice_filter = None, filter_type = None, return_fields=None):
544         return_slice_list = []
545         slicerec  = {}
546         rec = {}
547         ftypes = ['slice_hrn', 'record_id_user']
548         if filter_type and filter_type in ftypes:
549             if filter_type == 'slice_hrn':
550                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()    
551             if filter_type == 'record_id_user':
552                 slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
553                 
554             if slicerec:
555                 rec = slicerec.dumpquerytodict()
556                 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
557                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY slicerec GetSlices   %s " %(slicerec)
558                 if slicerec.oar_job_id is not -1:
559                     rslt = self.GetJobs( slicerec.oar_job_id, resources=False, username = login )
560                     #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  GetJobs  %s " %(rslt)     
561                     if rslt :
562                         rec.update(rslt)
563                         rec.update({'hrn':str(rec['slice_hrn'])})
564                         #If GetJobs is empty, this means the job is now in the 'Terminated' state
565                         #Update the slice record
566                     else :
567                         self.db.update_job(slice_filter, job_id = -1)
568                         rec['oar_job_id'] = -1
569                         rec.update({'hrn':str(rec['slice_hrn'])})
570             
571                 try:
572                     rec['node_ids'] = rec['node_list']
573                 except KeyError:
574                     pass
575                 
576                 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  rec  %s" %(rec)
577                               
578             return rec
579                 
580                 
581         else:
582             return_slice_list = slab_dbsession.query(SliceSenslab).all()
583
584         print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  slices %s slice_filter %s " %(return_slice_list,slice_filter)
585         
586         #if return_fields:
587             #return_slice_list  = parse_filter(sliceslist, slice_filter,'slice', return_fields)
588         
589         
590                     
591         return return_slice_list
592         
593
594         
595     
596     def testbed_name (self): return "senslab2" 
597          
598     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
599     def aggregate_version (self):
600         version_manager = VersionManager()
601         ad_rspec_versions = []
602         request_rspec_versions = []
603         for rspec_version in version_manager.versions:
604             if rspec_version.content_type in ['*', 'ad']:
605                 ad_rspec_versions.append(rspec_version.to_dict())
606             if rspec_version.content_type in ['*', 'request']:
607                 request_rspec_versions.append(rspec_version.to_dict()) 
608         return {
609             'testbed':self.testbed_name(),
610             'geni_request_rspec_versions': request_rspec_versions,
611             'geni_ad_rspec_versions': ad_rspec_versions,
612             }
613           
614           
615           
616           
617           
618           
619     ##
620     # Convert SFA fields to PLC fields for use when registering up updating
621     # registry record in the PLC database
622     #
623     # @param type type of record (user, slice, ...)
624     # @param hrn human readable name
625     # @param sfa_fields dictionary of SFA fields
626     # @param slab_fields dictionary of PLC fields (output)
627
628     def sfa_fields_to_slab_fields(self, type, hrn, record):
629
630         def convert_ints(tmpdict, int_fields):
631             for field in int_fields:
632                 if field in tmpdict:
633                     tmpdict[field] = int(tmpdict[field])
634
635         slab_record = {}
636         #for field in record:
637         #    slab_record[field] = record[field]
638  
639         if type == "slice":
640             #instantion used in get_slivers ? 
641             if not "instantiation" in slab_record:
642                 slab_record["instantiation"] = "senslab-instantiated"
643             slab_record["hrn"] = hrn_to_pl_slicename(hrn)
644             print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
645             if "url" in record:
646                slab_record["url"] = record["url"]
647             if "description" in record:
648                 slab_record["description"] = record["description"]
649             if "expires" in record:
650                 slab_record["expires"] = int(record["expires"])
651                 
652         #nodes added by OAR only and then imported to SFA
653         #elif type == "node":
654             #if not "hostname" in slab_record:
655                 #if not "hostname" in record:
656                     #raise MissingSfaInfo("hostname")
657                 #slab_record["hostname"] = record["hostname"]
658             #if not "model" in slab_record:
659                 #slab_record["model"] = "geni"
660                 
661         #One authority only 
662         #elif type == "authority":
663             #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
664
665             #if not "name" in slab_record:
666                 #slab_record["name"] = hrn
667
668             #if not "abbreviated_name" in slab_record:
669                 #slab_record["abbreviated_name"] = hrn
670
671             #if not "enabled" in slab_record:
672                 #slab_record["enabled"] = True
673
674             #if not "is_public" in slab_record:
675                 #slab_record["is_public"] = True
676
677         return slab_record
678
679                    
680     def LaunchExperimentOnOAR(self,  slice_dict, added_nodes, slice_user=None):
681        
682         site_list = []
683         nodeid_list =[]
684         resource = ""
685         reqdict = {}
686         slice_name = slice_dict['name']
687         try:
688             slot = slice_dict['timeslot'] 
689             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR slot %s   " %(slot)
690         except KeyError:
691             #Running on default parameters
692             #XP immediate , 10 mins
693             slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min 
694             
695             
696         reqdict['property'] ="network_address in ("
697         for node in added_nodes:
698             #Get the ID of the node : remove the root auth and put the site in a separate list
699             s=node.split(".")
700             # NT: it's not clear for me if the nodenames will have the senslab prefix
701             # so lets take the last part only, for now.
702             lastpart=s[-1]
703             #if s[0] == self.root_auth :
704             # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
705             s=lastpart.split("_")
706             nodeid=s[-1]
707             reqdict['property'] += "'"+ nodeid +"', "
708             nodeid_list.append(nodeid)
709             #site_list.append( l[0] )
710             
711             
712         reqdict['property'] =  reqdict['property'][0: len( reqdict['property'])-2] +")"
713         reqdict['resource'] ="network_address="+ str(len(nodeid_list))
714         
715         if slot['duration']:
716             walltime = slot['duration'].split(":")
717             # Fixing the walltime by adding a few delays. First put the walltime in seconds
718             # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
719             # take in account  prologue and epilogue scripts execution
720             # int walltimeAdditionalDelay = 120;  additional delay
721
722             desired_walltime =  int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
723             total_walltime = desired_walltime + 140 #+2 min 20
724             sleep_walltime = desired_walltime + 20 #+20 sec
725             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s  total_walltime %s sleep_walltime %s  " %(desired_walltime,total_walltime,sleep_walltime)
726             #Put the walltime back in str form
727             #First get the hours
728             walltime[0] = str(total_walltime / 3600)
729             total_walltime = total_walltime - 3600 * int(walltime[0])
730             #Get the remaining minutes
731             walltime[1] = str(total_walltime / 60)
732             total_walltime =  total_walltime - 60 * int(walltime[1])
733             #Get the seconds
734             walltime[2] = str(total_walltime)
735             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  walltime %s " %(walltime)
736
737             reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2]) 
738             reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
739         else:
740             reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
741             reqdict['script_path'] = "/bin/sleep 620" #+20 sec    
742         #In case of a scheduled experiment (not immediate)
743         #To run an XP immediately, don't specify date and time in RSpec 
744         #They will be set to None.
745         if slot['date'] and slot['start_time']:
746             if slot['timezone'] is '' or slot['timezone'] is None:
747                 #assume it is server timezone
748                 server_timestamp,server_tz = self.GetTimezone()
749                 from_zone=tz.gettz(server_tz) 
750                 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  timezone not specified  server_tz %s from_zone  %s" %(server_tz,from_zone) 
751             else:
752                 #Get zone of the user from the reservation time given in the rspec
753                 from_zone = tz.gettz(slot['timezone'])  
754                    
755             date = str(slot['date'])  + " " + str(slot['start_time'])
756             user_datetime = datetime.datetime.strptime(date, self.time_format)
757             user_datetime = user_datetime.replace(tzinfo = from_zone)
758             
759             #Convert to UTC zone
760             to_zone = tz.tzutc()
761             utc_date = user_datetime.astimezone(to_zone)
762             #Readable time accpeted by OAR
763             reqdict['reservation']= utc_date.strftime(self.time_format)
764         
765             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  reqdict['reservation'] %s " %(reqdict['reservation'])
766             
767         else:
768             # Immediate XP
769             # reservations are performed in the oar server timebase, so :
770             # 1- we get the server time(in UTC tz )/server timezone
771             # 2- convert the server UTC time in its timezone
772             # 3- add a custom delay to this time
773             # 4- convert this time to a readable form and it for the reservation request.
774             server_timestamp,server_tz = self.GetTimezone()
775             s_tz=tz.gettz(server_tz)
776             UTC_zone = tz.gettz("UTC")
777             #weird... datetime.fromtimestamp should work since we do from datetime import datetime
778             utc_server= datetime.datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
779             server_localtime=utc_server.astimezone(s_tz)
780     
781             print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
782             readable_time = server_localtime.strftime(self.time_format)
783
784             print >>sys.stderr,"  \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s  " %(readable_time ,server_timestamp)
785             reqdict['reservation'] = readable_time
786         
787
788         reqdict['type'] = "deploy" 
789         reqdict['directory']= ""
790         reqdict['name']= "TestSandrine"
791        
792          
793         # first step : start the OAR job and update the job 
794         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR reqdict   %s \r\n site_list   %s"  %(reqdict,site_list)   
795        
796         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
797         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid   %s "  %(answer)
798         try:       
799             jobid = answer['id']
800         except KeyError:
801              print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job  %s "  %( answer)
802              return
803         
804         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user)
805         self.db.update_job( slice_name, jobid ,added_nodes)
806         
807           
808         # second step : configure the experiment
809         # we need to store the nodes in a yaml (well...) file like this :
810         # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
811         f=open('/tmp/sfa/'+str(jobid)+'.json','w')
812         f.write('[')
813         f.write(str(added_nodes[0].strip('node')))
814         for node in added_nodes[1:len(added_nodes)] :
815             f.write(','+node.strip('node'))
816         f.write(']')
817         f.close()
818         
819         # third step : call the senslab-experiment wrapper
820         #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
821         javacmdline="/usr/bin/java"
822         jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
823         #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
824         output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
825
826         print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns   %s "  %(output)
827         return 
828                  
829  
830     #Delete the jobs and updates the job id in the senslab table
831     #to set it to -1  
832     #Does not clear the node list 
833     def DeleteSliceFromNodes(self, slice_record):
834          # Get user information
835        
836         self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
837         self.db.update_job(slice_record['hrn'], job_id = -1)
838         return   
839     
840  
841
842  
843             
844     def augment_records_with_testbed_info (self, sfa_records):
845         return self.fill_record_info (sfa_records)
846     
847     def fill_record_info(self, records):
848         """
849         Given a SFA record, fill in the senslab specific and SFA specific
850         fields in the record. 
851         """
852                     
853         print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s  " %(records)
854         if not isinstance(records, list):
855             records = [records]
856
857         parkour = records 
858         try:
859             for record in parkour:
860                     
861                 if str(record['type']) == 'slice':
862                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY  fill_record_info \t \t record %s" %(record)
863                     #sfatable = SfaTable()
864                     
865                     #existing_records_by_id = {}
866                     #all_records = dbsession.query(RegRecord).all()
867                     #for rec in all_records:
868                         #existing_records_by_id[rec.record_id] = rec
869                     #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY  fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']])
870                         
871                     #recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])}) 
872                     #recslice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = str(record['hrn'])).first()
873                     recslice = self.GetSlices(slice_filter =  str(record['hrn']), filter_type = 'slice_hrn')
874                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice)
875                     #if isinstance(recslice,list) and len(recslice) == 1:
876                         #recslice = recslice[0]
877                     
878                     recuser = dbsession.query(RegRecord).filter_by(record_id = recslice['record_id_user']).first()
879                     #existing_records_by_id[recslice['record_id_user']]
880                     #print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser)
881                     
882             
883                     record.update({'PI':[recuser.hrn],
884                     'researcher': [recuser.hrn],
885                     'name':record['hrn'], 
886                     'oar_job_id':recslice['oar_job_id'],
887                     'node_ids': [],
888                     'person_ids':[recslice['record_id_user']],
889                     'geni_urn':'',  #For client_helper.py compatibility
890                     'keys':'',  #For client_helper.py compatibility
891                     'key_ids':''})  #For client_helper.py compatibility
892                     
893                 elif str(record['type']) == 'user':
894                     #Add the data about slice
895                     rec = self.GetSlices(slice_filter = record['record_id'], filter_type = 'record_id_user')
896                     print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info USEEEEEEEEEERDESU!  rec %s \r\n \t rec['record_id_user'] %s " %(rec,rec['record_id_user']) 
897                     #Append record in records list, therfore fetches user and slice info again(one more loop)
898                     #Will update PIs and researcher for the slice
899                     recuser = dbsession.query(RegRecord).filter_by(record_id = rec['record_id_user']).first()
900                     rec.update({'PI':[recuser.hrn],
901                     'researcher': [recuser.hrn],
902                     'name':record['hrn'], 
903                     'oar_job_id':rec['oar_job_id'],
904                     'node_ids': [],
905                     'person_ids':[rec['record_id_user']]})
906                     #retourne une liste 100512
907                     
908                     user_slab = self.GetPersons({'hrn':recuser.hrn})
909                     
910
911                     rec.update({'type':'slice','hrn':rec['slice_hrn']})
912                     record.update(user_slab[0])
913                     #For client_helper.py compatibility
914                     record.update( { 'geni_urn':'',
915                     'keys':'',
916                     'key_ids':'' })                
917                     records.append(rec)
918                     
919                     print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info ADDING SLICEINFO TO USER records %s" %(records) 
920                     
921             print >>sys.stderr, "\r\n \t\t  SLABDRIVER.PY fill_record_info OKrecords %s" %(records) 
922         except TypeError:
923             print >>sys.stderr, "\r\n \t\t SLABDRIVER fill_record_info  EXCEPTION RECORDS : %s" %(records)      
924         return
925         
926         #self.fill_record_slab_info(records)
927         ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)     
928         #self.fill_record_sfa_info(records)
929         #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
930         
931         
932
933     
934         
935     #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
936         ## get a list of the HRNs tht are members of the old and new records
937         #if oldRecord:
938             #oldList = oldRecord.get(listName, [])
939         #else:
940             #oldList = []     
941         #newList = record.get(listName, [])
942
943         ## if the lists are the same, then we don't have to update anything
944         #if (oldList == newList):
945             #return
946
947         ## build a list of the new person ids, by looking up each person to get
948         ## their pointer
949         #newIdList = []
950         #table = SfaTable()
951         #records = table.find({'type': 'user', 'hrn': newList})
952         #for rec in records:
953             #newIdList.append(rec['pointer'])
954
955         ## build a list of the old person ids from the person_ids field 
956         #if oldRecord:
957             #oldIdList = oldRecord.get("person_ids", [])
958             #containerId = oldRecord.get_pointer()
959         #else:
960             ## if oldRecord==None, then we are doing a Register, instead of an
961             ## update.
962             #oldIdList = []
963             #containerId = record.get_pointer()
964
965     ## add people who are in the new list, but not the oldList
966         #for personId in newIdList:
967             #if not (personId in oldIdList):
968                 #addFunc(self.plauth, personId, containerId)
969
970         ## remove people who are in the old list, but not the new list
971         #for personId in oldIdList:
972             #if not (personId in newIdList):
973                 #delFunc(self.plauth, personId, containerId)
974
975     #def update_membership(self, oldRecord, record):
976         #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
977         #if record.type == "slice":
978             #self.update_membership_list(oldRecord, record, 'researcher',
979                                         #self.users.AddPersonToSlice,
980                                         #self.users.DeletePersonFromSlice)
981         #elif record.type == "authority":
982             ## xxx TODO
983             #pass
984
985 ### thierry
986 # I don't think you plan on running a component manager at this point
987 # let me clean up the mess of ComponentAPI that is deprecated anyways