4 from datetime import datetime
5 from dateutil import tz
6 from time import strftime,gmtime
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord, RegUser
16 from sfa.trust.credential import Credential
17 from sfa.trust.gid import GID
19 from sfa.managers.driver import Driver
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
23 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
24 from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, \
25 hrn_to_pl_slicename, hostname_to_urn
27 ## thierry: everything that is API-related (i.e. handling incoming requests)
29 # SlabDriver should be really only about talking to the senslab testbed
32 from sfa.senslab.OARrestapi import OARrestapi
33 from sfa.senslab.LDAPapi import LDAPapi
35 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
36 from sfa.senslab.slabaggregate import SlabAggregate
37 from sfa.senslab.slabslices import SlabSlices
42 # this inheritance scheme is so that the driver object can receive
43 # GetNodes or GetSites sorts of calls directly
44 # and thus minimize the differences in the managers with the pl version
45 class SlabDriver(Driver):
47 def __init__(self, config):
48 Driver.__init__ (self, config)
50 self.hrn = config.SFA_INTERFACE_HRN
52 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
54 self.oar = OARrestapi()
56 self.time_format = "%Y-%m-%d %H:%M:%S"
57 self.db = SlabDB(config,debug = True)
61 def sliver_status(self,slice_urn,slice_hrn):
62 """Receive a status request for slice named urn/hrn
63 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
64 shall return a structure as described in
65 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
66 NT : not sure if we should implement this or not, but used by sface.
70 #First get the slice with the slice hrn
71 sl = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
73 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
75 nodes_in_slice = sl['node_ids']
76 if len(nodes_in_slice) is 0:
77 raise SliverDoesNotExist("No slivers allocated ")
79 logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
80 %s \r\n " %(slice_urn,slice_hrn,sl) )
82 if sl['oar_job_id'] is not -1:
83 #A job is running on Senslab for this slice
84 # report about the local nodes that are in the slice only
86 nodes_all = self.GetNodes({'hostname':nodes_in_slice},
87 ['node_id', 'hostname','site','boot_state'])
88 nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
92 top_level_status = 'unknown'
94 top_level_status = 'ready'
95 result['geni_urn'] = slice_urn
96 result['pl_login'] = sl['job_user'] #For compatibility
99 timestamp = float(sl['startTime']) + float(sl['walltime'])
100 result['pl_expires'] = strftime(self.time_format, \
101 gmtime(float(timestamp)))
102 #result['slab_expires'] = strftime(self.time_format,\
103 #gmtime(float(timestamp)))
108 #res['slab_hostname'] = node['hostname']
109 #res['slab_boot_state'] = node['boot_state']
111 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
112 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
113 res['pl_last_contact'] = strftime(self.time_format, \
114 gmtime(float(timestamp)))
115 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
116 nodeall_byhostname[node]['node_id'])
117 res['geni_urn'] = sliver_id
118 if nodeall_byhostname[node]['boot_state'] == 'Alive':
120 res['geni_status'] = 'ready'
122 res['geni_status'] = 'failed'
123 top_level_status = 'failed'
125 res['geni_error'] = ''
127 resources.append(res)
129 result['geni_status'] = top_level_status
130 result['geni_resources'] = resources
131 print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
135 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
136 print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
137 aggregate = SlabAggregate(self)
139 slices = SlabSlices(self)
140 peer = slices.get_peer(slice_hrn)
141 sfa_peer = slices.get_sfa_peer(slice_hrn)
144 if not isinstance(creds, list):
148 slice_record = users[0].get('slice_record', {})
151 rspec = RSpec(rspec_string)
152 print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ============================rspec.version %s " %(rspec.version)
155 # ensure site record exists?
156 # ensure slice record exists
157 slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
158 requested_attributes = rspec.version.get_slice_attributes()
160 if requested_attributes:
161 for attrib_dict in requested_attributes:
162 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
163 slice.update({'timeslot':attrib_dict['timeslot']})
164 print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... slice %s " %(slice)
165 # ensure person records exists
166 persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
167 # ensure slice attributes exists?
170 # add/remove slice from nodes
171 print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... "
173 requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
174 print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
175 nodes = slices.verify_slice_nodes(slice, requested_slivers, peer)
178 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
181 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
183 slice = self.GetSlices(slice_filter= slice_hrn, slice_filter_type = 'slice_hrn')
184 print>>sys.stderr, "\r\n \r\n \t\t SLABDRIVER.PY delete_sliver slice %s" %(slice)
188 slices = SlabSlices(self)
189 # determine if this is a peer slice
190 # xxx I wonder if this would not need to use PlSlices.get_peer instead
191 # in which case plc.peers could be deprecated as this here
192 # is the only/last call to this last method in plc.peers
193 peer = slices.get_peer(slice_hrn)
196 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
197 self.DeleteSliceFromNodes(slice)
200 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
204 def AddSlice(self, slice_record):
205 slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'], record_id_slice= slice_record['record_id_slice'] , record_id_user= slice_record['record_id_user'], peer_authority = slice_record['peer_authority'])
206 print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,slab_slice)
207 slab_dbsession.add(slab_slice)
208 slab_dbsession.commit()
211 # first 2 args are None in case of resource discovery
212 def list_resources (self, slice_urn, slice_hrn, creds, options):
213 #cached_requested = options.get('cached', True)
215 version_manager = VersionManager()
216 # get the rspec's return format from options
217 rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
218 version_string = "rspec_%s" % (rspec_version)
220 #panos adding the info option to the caching key (can be improved)
221 if options.get('info'):
222 version_string = version_string + "_"+options.get('info', 'default')
224 # look in cache first
225 #if cached_requested and self.cache and not slice_hrn:
226 #rspec = self.cache.get(version_string)
228 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
231 #panos: passing user-defined options
233 aggregate = SlabAggregate(self)
234 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
235 options.update({'origin_hrn':origin_hrn})
236 rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
238 print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec "
240 #if self.cache and not slice_hrn:
241 #logger.debug("Slab.ListResources: stores advertisement in cache")
242 #self.cache.add(version_string, rspec)
247 def list_slices (self, creds, options):
248 # look in cache first
250 #slices = self.cache.get('slices')
252 #logger.debug("PlDriver.list_slices returns from cache")
256 print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
257 slices = self.GetSlices()
258 slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
259 slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
263 #logger.debug ("SlabDriver.list_slices stores value in cache")
264 #self.cache.add('slices', slice_urns)
268 #No site or node register supported
269 def register (self, sfa_record, hrn, pub_key):
270 type = sfa_record['type']
271 slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
275 acceptable_fields=['url', 'instantiation', 'name', 'description']
276 for key in slab_record.keys():
277 if key not in acceptable_fields:
279 print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
280 slices = self.GetSlices(slice_filter =slab_record['hrn'], slice_filter_type = 'slice_hrn')
282 pointer = self.AddSlice(slab_record)
284 pointer = slices[0]['slice_id']
287 persons = self.GetPersons([sfa_record])
288 #persons = self.GetPersons([sfa_record['hrn']])
290 pointer = self.AddPerson(dict(sfa_record))
293 pointer = persons[0]['person_id']
295 #Does this make sense to senslab ?
296 #if 'enabled' in sfa_record and sfa_record['enabled']:
297 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
299 # add this person to the site only if she is being added for the first
300 # time by sfa and doesont already exist in plc
301 if not persons or not persons[0]['site_ids']:
302 login_base = get_leaf(sfa_record['authority'])
303 self.AddPersonToSite(pointer, login_base)
305 # What roles should this user have?
306 self.AddRoleToPerson('user', pointer)
309 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
311 #No node adding outside OAR
315 #No site or node record update allowed
316 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
317 pointer = old_sfa_record['pointer']
318 type = old_sfa_record['type']
320 # new_key implemented for users only
321 if new_key and type not in [ 'user' ]:
322 raise UnknownSfaType(type)
324 #if (type == "authority"):
325 #self.shell.UpdateSite(pointer, new_sfa_record)
328 slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
329 if 'name' in slab_record:
330 slab_record.pop('name')
331 self.UpdateSlice(pointer, slab_record)
335 all_fields = new_sfa_record
336 for key in all_fields.keys():
337 if key in ['first_name', 'last_name', 'title', 'email',
338 'password', 'phone', 'url', 'bio', 'accepted_aup',
340 update_fields[key] = all_fields[key]
341 self.UpdatePerson(pointer, update_fields)
344 # must check this key against the previous one if it exists
345 persons = self.GetPersons([pointer], ['key_ids'])
347 keys = person['key_ids']
348 keys = self.GetKeys(person['key_ids'])
350 # Delete all stale keys
353 if new_key != key['key']:
354 self.DeleteKey(key['key_id'])
358 self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
364 def remove (self, sfa_record):
365 type=sfa_record['type']
366 hrn=sfa_record['hrn']
367 record_id= sfa_record['record_id']
369 username = hrn.split(".")[len(hrn.split(".")) -1]
371 persons = self.GetPersons(sfa_record)
372 #persons = self.GetPersons(username)
373 # only delete this person if he has site ids. if he doesnt, it probably means
374 # he was just removed from a site, not actually deleted
375 if persons and persons[0]['site_ids']:
376 self.DeletePerson(username)
377 elif type == 'slice':
378 if self.GetSlices(slice_filter = hrn, slice_filter_type = 'slice_hrn'):
379 self.DeleteSlice(hrn)
381 #elif type == 'authority':
382 #if self.GetSites(pointer):
383 #self.DeleteSite(pointer)
387 def GetPeers (self,auth = None, peer_filter=None, return_fields_list=None):
389 existing_records = {}
390 existing_hrns_by_types= {}
391 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields_list)
392 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
393 for record in all_records:
394 existing_records[(record.hrn,record.type)] = record
395 if record.type not in existing_hrns_by_types:
396 existing_hrns_by_types[record.type] = [record.hrn]
397 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
400 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN type %s hrn %s " %( record.type,record.hrn )
401 existing_hrns_by_types[record.type].append(record.hrn)
402 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
403 #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
405 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types %s " %( existing_hrns_by_types)
409 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types['authority+sa'] %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
411 records_list.append(existing_records[(peer_filter,'authority')])
413 for hrn in existing_hrns_by_types['authority']:
414 records_list.append(existing_records[(hrn,'authority')])
416 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers records_list %s " %(records_list)
421 return_records = records_list
422 if not peer_filter and not return_fields_list:
426 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers return_records %s " %(return_records)
427 return return_records
430 #TODO : Handling OR request in make_ldap_filters_from_records instead of the for loop
431 #over the records' list
432 def GetPersons(self, person_filter=None, return_fields_list=None):
434 person_filter should be a list of dictionnaries when not set to None.
435 Returns a list of users found.
438 print>>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)
440 if person_filter and isinstance(person_filter,list):
441 #If we are looking for a list of users (list of dict records)
442 #Usually the list contains only one user record
443 for f in person_filter:
444 person = self.ldap.LdapFindUser(f)
445 person_list.append(person)
448 person_list = self.ldap.LdapFindUser()
453 def GetTimezone(self):
454 server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
455 return server_timestamp,server_tz
458 def DeleteJobs(self, job_id, slice_hrn):
461 username = slice_hrn.split(".")[-1].rstrip("_slice")
463 reqdict['method'] = "delete"
464 reqdict['strval'] = str(job_id)
465 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
466 print>>sys.stderr, "\r\n \r\n jobid DeleteJobs %s " %(answer)
468 def GetJobsId(self, job_id, username = None ):
470 Details about a specific job.
471 Includes details about submission time, jot type, state, events,
472 owner, assigned ressources, walltime etc...
476 node_list_k = 'assigned_network_address'
477 #Get job info from OAR
478 job_info = self.oar.parser.SendRequest(req, job_id, username)
480 logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
482 if job_info['state'] == 'Terminated':
483 logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
486 if job_info['state'] == 'Error':
487 logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
492 logger.error("SLABDRIVER \tGetJobsId KeyError")
495 parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
496 #Replaces the previous entry "assigned_network_address" / "reserved_resources"
498 job_info.update({'node_ids':parsed_job_info[node_list_k]})
499 del job_info[node_list_k]
500 logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
504 def GetJobsResources(self,job_id, username = None):
505 #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
507 #assigned_res = ['resource_id', 'resource_uri']
508 #assigned_n = ['node', 'node_uri']
510 req = "GET_jobs_id_resources"
511 node_list_k = 'reserved_resources'
513 #Get job resources list from OAR
514 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
515 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
518 self.__get_hostnames_from_oar_node_ids(node_id_list)
520 #parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
521 #Replaces the previous entry "assigned_network_address" / "reserved_resources"
523 job_info = {'node_ids':hostname_list}
528 def get_info_on_reserved_nodes(self,job_info,node_list_name):
529 #Get the list of the testbed nodes records and make a
530 #dictionnary keyed on the hostname out of it
531 node_list_dict = self.GetNodes()
532 #node_hostname_list = []
533 node_hostname_list = [node['hostname'] for node in node_list_dict]
534 #for node in node_list_dict:
535 #node_hostname_list.append(node['hostname'])
536 node_dict = dict(zip(node_hostname_list,node_list_dict))
538 reserved_node_hostname_list = []
539 for index in range(len(job_info[node_list_name])):
540 #job_info[node_list_name][k] =
541 reserved_node_hostname_list[index] = \
542 node_dict[job_info[node_list_name][index]]['hostname']
544 logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
545 reserved_node_hostname_list %s" \
546 %(reserved_node_hostname_list))
548 logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
550 return reserved_node_hostname_list
552 def GetNodesCurrentlyInUse(self):
553 """Returns a list of all the nodes already involved in an oar job"""
554 return self.oar.parser.SendRequest("GET_running_jobs")
556 def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
557 full_nodes_dict_list = self.GetNodes()
558 #Put the full node list into a dictionary keyed by oar node id
559 oar_id_node_dict = {}
560 for node in full_nodes_dict_list:
561 oar_id_node_dict[node['oar_id']] = node
564 for resource_id in resource_id_list:
565 hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
568 def GetReservedNodes(self):
569 #Get the nodes in use and the reserved nodes
570 reservation_dict_list = self.oar.parser.SendRequest("GET_reserved_nodes")
572 oar_node_id_dict = self.__get_oar_node_ids()
574 for resa in reservation_dict_list:
575 logger.debug ("GetReservedNodes resa %s"%(resa))
576 resa['reserved_nodes_hostnames'] = \
577 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
578 del resa['resource_ids']
579 return reservation_dict_list
581 def GetNodes(self,node_filter_dict = None, return_fields_list = None):
583 node_filter_dict : dictionnary of lists
586 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
587 node_dict_list = node_dict_by_id.values()
589 #No filtering needed return the list directly
590 if not (node_filter_dict or return_fields_list):
591 return node_dict_list
593 return_node_list = []
595 for filter_key in node_filter_dict:
597 #Filter the node_dict_list by each value contained in the
598 #list node_filter_dict[filter_key]
599 for value in node_filter_dict[filter_key]:
600 for node in node_dict_list:
601 if node[filter_key] == value:
602 if return_fields_list :
604 for k in return_fields_list:
606 return_node_list.append(tmp)
608 return_node_list.append(node)
610 logger.log_exc("GetNodes KeyError")
614 return return_node_list
617 def GetSites(self, site_filter_name = None, return_fields_list = None):
618 site_dict = self.oar.parser.SendRequest("GET_sites")
619 #site_dict : dict where the key is the sit ename
620 return_site_list = []
621 if not ( site_filter_name or return_fields_list):
622 return_site_list = site_dict.values()
623 return return_site_list
625 if site_filter_name in site_dict:
626 if return_fields_list:
627 for field in return_fields_list:
631 tmp[field] = site_dict[site_filter_name][field]
633 logger.error("GetSites KeyError %s "%(field))
635 return_site_list.append(tmp)
637 return_site_list.append( site_dict[site_filter_name])
640 return return_site_list
643 def GetSlices(self, slice_filter = None, slice_filter_type = None, \
644 return_fields_list=None):
645 return_slice_list = []
648 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
649 print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices authorized_filter_types_list %s" %(authorized_filter_types_list)
650 if slice_filter_type in authorized_filter_types_list:
651 if slice_filter_type == 'slice_hrn':
652 slicerec = slab_dbsession.query(SliceSenslab).\
653 filter_by(slice_hrn = slice_filter).first()
655 if slice_filter_type == 'record_id_user':
656 slicerec = slab_dbsession.query(SliceSenslab).\
657 filter_by(record_id_user = slice_filter).first()
660 rec = slicerec.dump_sqlalchemyobj_to_dict()
661 print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices rec %s" %(rec)
663 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
664 logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"\
666 if slicerec.oar_job_id is not -1:
667 #Check with OAR the status of the job if a job id is in
669 #rslt = self.GetJobsResources(slicerec.oar_job_id,username = login)
670 rslt = self.GetJobsId(slicerec.oar_job_id,username = login)
673 rec.update({'hrn':str(rec['slice_hrn'])})
674 #If GetJobsResources is empty, this means the job is now in the 'Terminated' state
675 #Update the slice record
677 self.db.update_job(slice_filter, job_id = -1)
678 rec['oar_job_id'] = -1
679 rec.update({'hrn':str(rec['slice_hrn'])})
682 rec['node_ids'] = rec['node_list']
686 logger.debug("SLABDRIVER.PY GetSlices rec %s" %(rec))
692 return_slice_list = slab_dbsession.query(SliceSenslab).all()
694 print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(return_slice_list,slice_filter)
696 #if return_fields_list:
697 #return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields_list)
701 return return_slice_list
706 def testbed_name (self): return self.hrn
708 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
709 def aggregate_version (self):
710 version_manager = VersionManager()
711 ad_rspec_versions = []
712 request_rspec_versions = []
713 for rspec_version in version_manager.versions:
714 if rspec_version.content_type in ['*', 'ad']:
715 ad_rspec_versions.append(rspec_version.to_dict())
716 if rspec_version.content_type in ['*', 'request']:
717 request_rspec_versions.append(rspec_version.to_dict())
719 'testbed':self.testbed_name(),
720 'geni_request_rspec_versions': request_rspec_versions,
721 'geni_ad_rspec_versions': ad_rspec_versions,
730 # Convert SFA fields to PLC fields for use when registering up updating
731 # registry record in the PLC database
733 # @param type type of record (user, slice, ...)
734 # @param hrn human readable name
735 # @param sfa_fields dictionary of SFA fields
736 # @param slab_fields dictionary of PLC fields (output)
738 def sfa_fields_to_slab_fields(self, type, hrn, record):
740 def convert_ints(tmpdict, int_fields):
741 for field in int_fields:
743 tmpdict[field] = int(tmpdict[field])
746 #for field in record:
747 # slab_record[field] = record[field]
750 #instantion used in get_slivers ?
751 if not "instantiation" in slab_record:
752 slab_record["instantiation"] = "senslab-instantiated"
753 slab_record["hrn"] = hrn_to_pl_slicename(hrn)
754 print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
756 slab_record["url"] = record["url"]
757 if "description" in record:
758 slab_record["description"] = record["description"]
759 if "expires" in record:
760 slab_record["expires"] = int(record["expires"])
762 #nodes added by OAR only and then imported to SFA
763 #elif type == "node":
764 #if not "hostname" in slab_record:
765 #if not "hostname" in record:
766 #raise MissingSfaInfo("hostname")
767 #slab_record["hostname"] = record["hostname"]
768 #if not "model" in slab_record:
769 #slab_record["model"] = "geni"
772 #elif type == "authority":
773 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
775 #if not "name" in slab_record:
776 #slab_record["name"] = hrn
778 #if not "abbreviated_name" in slab_record:
779 #slab_record["abbreviated_name"] = hrn
781 #if not "enabled" in slab_record:
782 #slab_record["enabled"] = True
784 #if not "is_public" in slab_record:
785 #slab_record["is_public"] = True
789 def __process_walltime(self,duration=None):
790 """ Calculates the walltime in seconds from the duration in H:M:S
791 specified in the RSpec.
795 walltime = duration.split(":")
796 # Fixing the walltime by adding a few delays. First put the walltime
797 # in seconds oarAdditionalDelay = 20; additional delay for
798 # /bin/sleep command to
799 # take in account prologue and epilogue scripts execution
800 # int walltimeAdditionalDelay = 120; additional delay
802 desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
804 total_walltime = desired_walltime + 140 #+2 min 20
805 sleep_walltime = desired_walltime + 20 #+20 sec
806 logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
807 total_walltime %s sleep_walltime %s "\
808 %(desired_walltime, total_walltime, \
810 #Put the walltime back in str form
812 walltime[0] = str(total_walltime / 3600)
813 total_walltime = total_walltime - 3600 * int(walltime[0])
814 #Get the remaining minutes
815 walltime[1] = str(total_walltime / 60)
816 total_walltime = total_walltime - 60 * int(walltime[1])
818 walltime[2] = str(total_walltime)
819 logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
822 #automatically set 10min +2 min 20
826 sleep_walltime = '620'
828 return walltime, sleep_walltime
831 def __transforms_timestamp_into_date(xp_utc_timestamp = None):
832 """ Transforms unix timestamp into valid OAR date format """
834 #Used in case of a scheduled experiment (not immediate)
835 #To run an XP immediately, don't specify date and time in RSpec
836 #They will be set to None.
838 #transform the xp_utc_timestamp into server readable time
839 xp_server_readable_date = datetime.fromtimestamp(int(\
840 xp_utc_timestamp)).strftime(self.time_format)
842 return xp_server_readable_date
847 def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
848 """ Creates the structure needed for a correct POST on OAR.
849 Makes the timestamp transformation into the appropriate format.
850 Sends the POST request to create the job with the resources in
858 slice_name = slice_dict['name']
860 slot = slice_dict['timeslot']
861 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
864 #Running on default parameters
865 #XP immediate , 10 mins
866 slot = { 'date':None, 'start_time':None,
867 'timezone':None, 'duration':None }#10 min
869 reqdict['workdir']= '/tmp'
870 reqdict['resource'] ="{network_address in ("
872 for node in added_nodes:
873 logger.debug("OARrestapi \tLaunchExperimentOnOAR \
876 #Get the ID of the node : remove the root auth and put
877 # the site in a separate list.
878 # NT: it's not clear for me if the nodenames will have the senslab
879 #prefix so lets take the last part only, for now.
881 # Again here it's not clear if nodes will be prefixed with <site>_,
882 #lets split and tanke the last part for now.
883 #s=lastpart.split("_")
886 reqdict['resource'] += "'"+ nodeid +"', "
887 nodeid_list.append(nodeid)
889 custom_length = len(reqdict['resource'])- 2
890 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
891 ")}/nodes=" + str(len(nodeid_list))
893 #if slot['duration']:
894 walltime, sleep_walltime = self.__process_walltime(duration = \
897 #walltime, sleep_walltime = self.__process_walltime(duration = None)
899 reqdict['resource']+= ",walltime=" + str(walltime[0]) + \
900 ":" + str(walltime[1]) + ":" + str(walltime[2])
901 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
905 #In case of a scheduled experiment (not immediate)
906 #To run an XP immediately, don't specify date and time in RSpec
907 #They will be set to None.
908 server_timestamp,server_tz = self.GetTimezone()
909 if slot['date'] and slot['start_time']:
910 if slot['timezone'] is '' or slot['timezone'] is None:
911 #assume it is server timezone
912 from_zone=tz.gettz(server_tz)
913 logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
914 not specified server_tz %s from_zone %s" \
915 %(server_tz,from_zone))
917 #Get zone of the user from the reservation time given
919 from_zone = tz.gettz(slot['timezone'])
921 date = str(slot['date']) + " " + str(slot['start_time'])
922 user_datetime = datetime.strptime(date, self.time_format)
923 user_datetime = user_datetime.replace(tzinfo = from_zone)
925 #Convert to server zone
927 to_zone = tz.gettz(server_tz)
928 reservation_date = user_datetime.astimezone(to_zone)
929 #Readable time accpeted by OAR
930 reqdict['reservation']= reservation_date.strftime(self.time_format)
932 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation']))
935 # Immediate XP. Not need to add special parameters.
936 # normally not used in SFA
941 reqdict['type'] = "deploy"
942 reqdict['directory']= ""
943 reqdict['name']= "TestSandrine"
946 # first step : start the OAR job and update the job
947 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s \r\n site_list %s" %(reqdict,site_list) )
949 answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
950 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
954 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR Impossible to create job %s " %(answer))
957 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user))
958 self.db.update_job( slice_name, jobid ,added_nodes)
961 # second step : configure the experiment
962 # we need to store the nodes in a yaml (well...) file like this :
963 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
964 f=open('/tmp/sfa/'+str(jobid)+'.json','w')
966 f.write(str(added_nodes[0].strip('node')))
967 for node in added_nodes[1:len(added_nodes)] :
968 f.write(','+node.strip('node'))
972 # third step : call the senslab-experiment wrapper
973 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
974 javacmdline="/usr/bin/java"
975 jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
976 #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
977 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
979 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns %s " %(output)
983 #Delete the jobs and updates the job id in the senslab table
985 #Does not clear the node list
986 def DeleteSliceFromNodes(self, slice_record):
987 # Get user information
989 self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
990 self.db.update_job(slice_record['hrn'], job_id = -1)
996 def GetLeases(self, lease_filter=None, return_fields_list=None):
997 reservation_list = self.GetReservedNodes()
998 #Find the slice associated with this user senslab ldap uid
999 for resa in reservation_list:
1000 ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
1001 user = dbsession.query(RegUser).filter_by(email = ldap_info['mail']).first()
1002 slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id).first()
1004 resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
1005 resa['component_id_list'] = []
1006 #Transform the hostnames into urns (component ids)
1007 for hostname in resa['reserved_nodes_hostnames']:
1008 resa['component_id_list'].append(hostname_to_urn(self.hrn, self.root_auth, hostname))
1012 def augment_records_with_testbed_info (self, sfa_records):
1013 return self.fill_record_info (sfa_records)
1015 def fill_record_info(self, record_list):
1017 Given a SFA record, fill in the senslab specific and SFA specific
1018 fields in the record.
1021 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
1022 if not isinstance(record_list, list):
1023 record_list = [record_list]
1026 for record in record_list:
1027 #If the record is a SFA slice record, then add information
1028 #about the user of this slice. This kind of information is in the
1030 if str(record['type']) == 'slice':
1031 #Get slab slice record.
1032 recslice = self.GetSlices(slice_filter = \
1033 str(record['hrn']),\
1034 slice_filter_type = 'slice_hrn')
1035 recuser = dbsession.query(RegRecord).filter_by(record_id = \
1036 recslice['record_id_user']).first()
1037 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1038 rec %s \r\n \r\n" %(recslice))
1039 record.update({'PI':[recuser.hrn],
1040 'researcher': [recuser.hrn],
1041 'name':record['hrn'],
1042 'oar_job_id':recslice['oar_job_id'],
1044 'person_ids':[recslice['record_id_user']],
1045 'geni_urn':'', #For client_helper.py compatibility
1046 'keys':'', #For client_helper.py compatibility
1047 'key_ids':''}) #For client_helper.py compatibility
1049 elif str(record['type']) == 'user':
1050 #The record is a SFA user record.
1051 #Get the information about his slice from Senslab's DB
1052 #and add it to the user record.
1053 recslice = self.GetSlices(slice_filter = \
1054 record['record_id'],\
1055 slice_filter_type = 'record_id_user')
1057 logger.debug( "SLABDRIVER.PY \t fill_record_info user \
1058 rec %s \r\n \r\n" %(recslice))
1059 #Append slice record in records list,
1060 #therefore fetches user and slice info again(one more loop)
1061 #Will update PIs and researcher for the slice
1062 recuser = dbsession.query(RegRecord).filter_by(record_id = \
1063 recslice['record_id_user']).first()
1064 recslice.update({'PI':[recuser.hrn],
1065 'researcher': [recuser.hrn],
1066 'name':record['hrn'],
1067 'oar_job_id':recslice['oar_job_id'],
1069 'person_ids':[recslice['record_id_user']]})
1071 #GetPersons takes [] as filters
1072 #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
1073 user_slab = self.GetPersons([record])
1075 recslice.update({'type':'slice','hrn':recslice['slice_hrn']})
1076 record.update(user_slab[0])
1077 #For client_helper.py compatibility
1078 record.update( { 'geni_urn':'',
1081 record_list.append(recslice)
1083 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1084 INFO TO USER records %s" %(record_list))
1088 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s" %(e))
1092 #self.fill_record_slab_info(records)
1093 ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)
1094 #self.fill_record_sfa_info(records)
1095 #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1101 #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1102 ## get a list of the HRNs tht are members of the old and new records
1104 #oldList = oldRecord.get(listName, [])
1107 #newList = record.get(listName, [])
1109 ## if the lists are the same, then we don't have to update anything
1110 #if (oldList == newList):
1113 ## build a list of the new person ids, by looking up each person to get
1117 #records = table.find({'type': 'user', 'hrn': newList})
1118 #for rec in records:
1119 #newIdList.append(rec['pointer'])
1121 ## build a list of the old person ids from the person_ids field
1123 #oldIdList = oldRecord.get("person_ids", [])
1124 #containerId = oldRecord.get_pointer()
1126 ## if oldRecord==None, then we are doing a Register, instead of an
1129 #containerId = record.get_pointer()
1131 ## add people who are in the new list, but not the oldList
1132 #for personId in newIdList:
1133 #if not (personId in oldIdList):
1134 #addFunc(self.plauth, personId, containerId)
1136 ## remove people who are in the old list, but not the new list
1137 #for personId in oldIdList:
1138 #if not (personId in newIdList):
1139 #delFunc(self.plauth, personId, containerId)
1141 #def update_membership(self, oldRecord, record):
1142 #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1143 #if record.type == "slice":
1144 #self.update_membership_list(oldRecord, record, 'researcher',
1145 #self.users.AddPersonToSlice,
1146 #self.users.DeletePersonFromSlice)
1147 #elif record.type == "authority":
1152 # I don't think you plan on running a component manager at this point
1153 # let me clean up the mess of ComponentAPI that is deprecated anyways