3 from datetime import datetime
4 from dateutil import tz
5 from time import strftime, gmtime
7 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
8 from sfa.util.sfalogging import logger
10 from sfa.storage.alchemy import dbsession
11 from sfa.storage.model import RegRecord, RegUser
13 from sfa.trust.credential import Credential
16 from sfa.managers.driver import Driver
17 from sfa.rspecs.version_manager import VersionManager
18 from sfa.rspecs.rspec import RSpec
20 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf
21 from sfa.planetlab.plxrn import slicename_to_hrn, hrn_to_pl_slicename, \
25 ## thierry: everything that is API-related (i.e. handling incoming requests)
27 # SlabDriver should be really only about talking to the senslab testbed
30 from sfa.senslab.OARrestapi import OARrestapi
31 from sfa.senslab.LDAPapi import LDAPapi
33 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab
34 from sfa.senslab.slabaggregate import SlabAggregate
35 from sfa.senslab.slabslices import SlabSlices
42 # this inheritance scheme is so that the driver object can receive
43 # GetNodes or GetSites sorts of calls directly
44 # and thus minimize the differences in the managers with the pl version
45 class SlabDriver(Driver):
47 def __init__(self, config):
48 Driver.__init__ (self, config)
50 self.hrn = config.SFA_INTERFACE_HRN
52 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
54 self.oar = OARrestapi()
56 self.time_format = "%Y-%m-%d %H:%M:%S"
57 self.db = SlabDB(config,debug = True)
61 def sliver_status(self, slice_urn, slice_hrn):
62 """Receive a status request for slice named urn/hrn
63 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
64 shall return a structure as described in
65 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
66 NT : not sure if we should implement this or not, but used by sface.
70 #First get the slice with the slice hrn
71 sl = self.GetSlices(slice_filter = slice_hrn, \
72 slice_filter_type = 'slice_hrn')
74 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
76 top_level_status = 'unknown'
77 nodes_in_slice = sl['node_ids']
79 if len(nodes_in_slice) is 0:
80 raise SliverDoesNotExist("No slivers allocated ")
82 top_level_status = 'ready'
84 logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
85 %s \r\n " %(slice_urn, slice_hrn, sl))
87 if sl['oar_job_id'] is not -1:
88 #A job is running on Senslab for this slice
89 # report about the local nodes that are in the slice only
91 nodes_all = self.GetNodes({'hostname':nodes_in_slice},
92 ['node_id', 'hostname','site','boot_state'])
93 nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
97 result['geni_urn'] = slice_urn
98 result['pl_login'] = sl['job_user'] #For compatibility
101 timestamp = float(sl['startTime']) + float(sl['walltime'])
102 result['pl_expires'] = strftime(self.time_format, \
103 gmtime(float(timestamp)))
104 #result['slab_expires'] = strftime(self.time_format,\
105 #gmtime(float(timestamp)))
108 for node in nodeall_byhostname:
110 #res['slab_hostname'] = node['hostname']
111 #res['slab_boot_state'] = node['boot_state']
113 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
114 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
115 res['pl_last_contact'] = strftime(self.time_format, \
116 gmtime(float(timestamp)))
117 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
118 nodeall_byhostname[node]['node_id'])
119 res['geni_urn'] = sliver_id
120 if nodeall_byhostname[node]['boot_state'] == 'Alive':
122 res['geni_status'] = 'ready'
124 res['geni_status'] = 'failed'
125 top_level_status = 'failed'
127 res['geni_error'] = ''
129 resources.append(res)
131 result['geni_status'] = top_level_status
132 result['geni_resources'] = resources
133 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
138 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
140 logger.debug("SLABDRIVER.PY \tcreate_sliver ")
141 aggregate = SlabAggregate(self)
143 slices = SlabSlices(self)
144 peer = slices.get_peer(slice_hrn)
145 sfa_peer = slices.get_sfa_peer(slice_hrn)
148 if not isinstance(creds, list):
152 slice_record = users[0].get('slice_record', {})
155 rspec = RSpec(rspec_string)
156 logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \
160 # ensure site record exists?
161 # ensure slice record exists
162 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
163 sfa_peer, options=options)
164 requested_attributes = rspec.version.get_slice_attributes()
166 if requested_attributes:
167 for attrib_dict in requested_attributes:
168 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] \
170 sfa_slice.update({'timeslot':attrib_dict['timeslot']})
171 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
173 # ensure person records exists
174 persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
175 sfa_peer, options=options)
177 # ensure slice attributes exists?
180 # add/remove slice from nodes
182 requested_slivers = [node.get('component_name') \
183 for node in rspec.version.get_nodes_with_slivers()]
184 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
185 requested_slivers %s " %(requested_slivers))
187 nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
190 requested_leases = []
192 for lease in rspec.version.get_leases():
194 if not lease.get('lease_id'):
195 requested_lease['hostname'] = \
196 xrn_to_hostname(lease.get('component_id').strip())
197 requested_lease['start_time'] = lease.get('start_time')
198 requested_lease['duration'] = lease.get('duration')
200 kept_leases.append(int(lease['lease_id']))
201 if requested_lease.get('hostname'):
202 requested_leases.append(requested_lease)
204 leases = slices.verify_slice_leases(sfa_slice, \
205 requested_leases, kept_leases, peer)
207 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
210 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
212 sfa_slice = self.GetSlices(slice_filter = slice_hrn, \
213 slice_filter_type = 'slice_hrn')
214 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
218 slices = SlabSlices(self)
219 # determine if this is a peer slice
221 peer = slices.get_peer(slice_hrn)
224 self.UnBindObjectFromPeer('slice', \
225 sfa_slice['record_id_slice'], peer)
226 self.DeleteSliceFromNodes(sfa_slice)
229 self.BindObjectToPeer('slice', sfa_slice['slice_id'], \
230 peer, sfa_slice['peer_slice_id'])
def AddSlice(self, slice_record):
    """Insert a new slice row into the senslab database and commit it.

    A SliceSenslab object is built from the 'slice_hrn',
    'record_id_slice', 'record_id_user' and 'peer_authority' entries of
    slice_record (a missing key raises KeyError), added to slab_dbsession
    and committed straight away.

    NOTE(review): no return statement is visible for this method, yet
    register() stores its result as the record pointer -- confirm what
    callers expect.
    """
    slab_slice = SliceSenslab(
        slice_hrn=slice_record['slice_hrn'],
        record_id_slice=slice_record['record_id_slice'],
        record_id_user=slice_record['record_id_user'],
        peer_authority=slice_record['peer_authority'])
    # The project logger is called with an already formatted message.
    logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s slab_slice %s"
                 % (slice_record, slab_slice))
    slab_dbsession.add(slab_slice)
    slab_dbsession.commit()
245 # first 2 args are None in case of resource discovery
246 def list_resources (self, slice_urn, slice_hrn, creds, options):
247 #cached_requested = options.get('cached', True)
249 version_manager = VersionManager()
250 # get the rspec's return format from options
252 version_manager.get_version(options.get('geni_rspec_version'))
253 version_string = "rspec_%s" % (rspec_version)
255 #panos adding the info option to the caching key (can be improved)
256 if options.get('info'):
257 version_string = version_string + "_" + \
258 options.get('info', 'default')
260 # look in cache first
261 #if cached_requested and self.cache and not slice_hrn:
262 #rspec = self.cache.get(version_string)
264 #logger.debug("SlabDriver.ListResources: \
265 #returning cached advertisement")
268 #panos: passing user-defined options
269 logger.debug("SLABDRIVER \tlist_resources rspec " )
270 aggregate = SlabAggregate(self)
271 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
272 options.update({'origin_hrn':origin_hrn})
273 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
274 version=rspec_version, options=options)
277 #if self.cache and not slice_hrn:
278 #logger.debug("Slab.ListResources: stores advertisement in cache")
279 #self.cache.add(version_string, rspec)
284 def list_slices (self, creds, options):
285 # look in cache first
287 #slices = self.cache.get('slices')
289 #logger.debug("PlDriver.list_slices returns from cache")
293 logger.debug("SLABDRIVER.PY \tlist_slices")
294 slices = self.GetSlices()
295 slice_hrns = [slicename_to_hrn(self.hrn, slab_slice['slice_hrn']) \
296 for slab_slice in slices]
297 slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
298 for slice_hrn in slice_hrns]
302 #logger.debug ("SlabDriver.list_slices stores value in cache")
303 #self.cache.add('slices', slice_urns)
307 #No site or node register supported
308 def register (self, sfa_record, hrn, pub_key):
309 record_type = sfa_record['type']
310 slab_record = self.sfa_fields_to_slab_fields(record_type, hrn, \
314 if record_type == 'slice':
315 acceptable_fields = ['url', 'instantiation', 'name', 'description']
316 for key in slab_record.keys():
317 if key not in acceptable_fields:
319 logger.debug("SLABDRIVER.PY register")
320 slices = self.GetSlices(slice_filter =slab_record['hrn'], \
321 slice_filter_type = 'slice_hrn')
323 pointer = self.AddSlice(slab_record)
325 pointer = slices[0]['slice_id']
327 elif record_type == 'user':
328 persons = self.GetPersons([sfa_record])
329 #persons = self.GetPersons([sfa_record['hrn']])
331 pointer = self.AddPerson(dict(sfa_record))
334 pointer = persons[0]['person_id']
336 #Does this make sense to senslab ?
337 #if 'enabled' in sfa_record and sfa_record['enabled']:
338 #self.UpdatePerson(pointer, \
339 #{'enabled': sfa_record['enabled']})
341 #TODO register Change this AddPersonToSite stuff 05/07/2012 SA
342 # add this person to the site only if
343 # she is being added for the first
344 # time by sfa and doesnt already exist in plc
345 if not persons or not persons[0]['site_ids']:
346 login_base = get_leaf(sfa_record['authority'])
347 self.AddPersonToSite(pointer, login_base)
349 # What roles should this user have?
350 #TODO : DElete this AddRoleToPerson 04/07/2012 SA
351 #Function prototype is :
352 #AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email)
353 #what's the pointer doing here?
354 self.AddRoleToPerson('user', pointer)
357 self.AddPersonKey(pointer, {'key_type' : 'ssh', \
360 #No node adding outside OAR
364 #No site or node record update allowed
365 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
366 pointer = old_sfa_record['pointer']
367 old_sfa_record_type = old_sfa_record['type']
369 # new_key implemented for users only
370 if new_key and old_sfa_record_type not in [ 'user' ]:
371 raise UnknownSfaType(old_sfa_record_type)
373 #if (type == "authority"):
374 #self.shell.UpdateSite(pointer, new_sfa_record)
376 if old_sfa_record_type == "slice":
377 slab_record = self.sfa_fields_to_slab_fields(old_sfa_record_type, \
379 if 'name' in slab_record:
380 slab_record.pop('name')
381 #Prototype should be UpdateSlice(self,
382 #auth, slice_id_or_name, slice_fields)
383 #Senslab cannot update slice since slice = job
384 #so we must delete and create another job
385 self.UpdateSlice(pointer, slab_record)
387 elif old_sfa_record_type == "user":
389 all_fields = new_sfa_record
390 for key in all_fields.keys():
391 if key in ['first_name', 'last_name', 'title', 'email',
392 'password', 'phone', 'url', 'bio', 'accepted_aup',
394 update_fields[key] = all_fields[key]
395 self.UpdatePerson(pointer, update_fields)
398 # must check this key against the previous one if it exists
399 persons = self.GetPersons([pointer], ['key_ids'])
401 keys = person['key_ids']
402 keys = self.GetKeys(person['key_ids'])
404 # Delete all stale keys
407 if new_key != key['key']:
408 self.DeleteKey(key['key_id'])
412 self.AddPersonKey(pointer, {'key_type': 'ssh', \
419 def remove (self, sfa_record):
420 sfa_record_type = sfa_record['type']
421 hrn = sfa_record['hrn']
422 record_id = sfa_record['record_id']
423 if sfa_record_type == 'user':
425 #get user from senslab ldap
426 person = self.GetPersons(sfa_record)
427 #No registering at a given site in Senslab.
428 #Once registered to the LDAP, all senslab sites are
431 #Mark account as disabled in ldap
432 self.DeletePerson(sfa_record)
433 elif sfa_record_type == 'slice':
434 if self.GetSlices(slice_filter = hrn, \
435 slice_filter_type = 'slice_hrn'):
436 self.DeleteSlice(sfa_record_type)
438 #elif type == 'authority':
439 #if self.GetSites(pointer):
440 #self.DeleteSite(pointer)
446 #TODO clean GetPeers. 05/07/12SA
447 def GetPeers (self, auth = None, peer_filter=None, return_fields_list=None):
449 existing_records = {}
450 existing_hrns_by_types = {}
451 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
452 return_field %s " %(auth , peer_filter, return_fields_list))
453 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
454 for record in all_records:
455 existing_records[(record.hrn, record.type)] = record
456 if record.type not in existing_hrns_by_types:
457 existing_hrns_by_types[record.type] = [record.hrn]
458 logger.debug("SLABDRIVER \tGetPeer\t NOT IN \
459 existing_hrns_by_types %s " %( existing_hrns_by_types))
462 logger.debug("SLABDRIVER \tGetPeer\t \INNN type %s hrn %s " \
463 %(record.type,record.hrn))
464 existing_hrns_by_types[record.type].append(record.hrn)
467 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
468 %( existing_hrns_by_types))
473 records_list.append(existing_records[(peer_filter,'authority')])
475 for hrn in existing_hrns_by_types['authority']:
476 records_list.append(existing_records[(hrn,'authority')])
478 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
484 return_records = records_list
485 if not peer_filter and not return_fields_list:
489 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
491 return return_records
494 #TODO : Handling OR request in make_ldap_filters_from_records
495 #instead of the for loop
496 #over the records' list
497 def GetPersons(self, person_filter=None, return_fields_list=None):
499 person_filter should be a list of dictionnaries when not set to None.
500 Returns a list of users whose accounts are enabled found in ldap.
503 logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
506 if person_filter and isinstance(person_filter, list):
507 #If we are looking for a list of users (list of dict records)
508 #Usually the list contains only one user record
509 for searched_attributes in person_filter:
511 #Get only enabled user accounts in senslab LDAP :
512 #add a filter for make_ldap_filters_from_record
513 person = self.ldap.LdapFindUser(searched_attributes, \
514 is_user_enabled=True)
515 person_list.append(person)
518 #Get only enabled user accounts in senslab LDAP :
519 #add a filter for make_ldap_filters_from_record
520 person_list = self.ldap.LdapFindUser(is_user_enabled=True)
def GetTimezone(self):
    """Ask OAR for its current time and timezone.

    Sends the "GET_timezone" request through self.oar.parser and returns
    the (server_timestamp, server_tz) pair it answers with. The answer is
    unpacked, so anything other than a two-item sequence raises.
    """
    server_timestamp, server_tz = \
        self.oar.parser.SendRequest("GET_timezone")
    return server_timestamp, server_tz
def DeleteJobs(self, job_id, slice_hrn):
    """Ask OAR to delete the job attached to a slice.

    Nothing is done when job_id is empty or equal to -1. Otherwise a
    'DELETE_jobs_id' request is posted on behalf of the user whose login
    is the last component of slice_hrn without its "_slice" suffix.

    NOTE(review): the early return, the creation of reqdict, the trailing
    arguments of the POST call and the final return were not visible in
    the listing this block was restored from. They are reconstructed from
    the visible syntax and from the 'POST_job' call in
    LaunchExperimentOnOAR -- confirm against the pristine file.
    """
    # '==' rather than 'is': identity of int objects is an implementation
    # detail and must not decide whether a job gets deleted.
    if not job_id or job_id == -1:
        return

    # str.rstrip("_slice") strips any trailing run of the characters
    # '_', 's', 'l', 'i', 'c', 'e' ("alice_slice" would become "a"), so the
    # suffix is removed explicitly instead.
    suffix = "_slice"
    username = slice_hrn.split(".")[-1]
    if username.endswith(suffix):
        username = username[:-len(suffix)]

    reqdict = {}
    reqdict['method'] = "delete"
    reqdict['strval'] = str(job_id)

    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',
                                              reqdict, username)
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s "
                 % (job_id, answer))
    return answer
546 ##TODO : Unused GetJobsId ? SA 05/07/12
547 #def GetJobsId(self, job_id, username = None ):
549 #Details about a specific job.
550 #Includes details about submission time, jot type, state, events,
551 #owner, assigned ressources, walltime etc...
555 #node_list_k = 'assigned_network_address'
556 ##Get job info from OAR
557 #job_info = self.oar.parser.SendRequest(req, job_id, username)
559 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
561 #if job_info['state'] == 'Terminated':
562 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
565 #if job_info['state'] == 'Error':
566 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
571 #logger.error("SLABDRIVER \tGetJobsId KeyError")
574 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
576 ##Replaces the previous entry
577 ##"assigned_network_address" / "reserved_resources"
579 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
580 #del job_info[node_list_k]
581 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
def GetJobsResources(self, job_id, username = None):
    """Return the nodes attached to an OAR job.

    Sends "GET_jobs_id_resources" for job_id (and username) through
    self.oar.parser, converts the answer with
    __get_hostnames_from_oar_node_ids and returns it wrapped as
    {'node_ids': <converted list>}.

    NOTE(review): the assignment of hostname_list and the return
    statement were not visible in the listing this block was restored
    from; they are reconstructed from the use of hostname_list below and
    from GetSlices, which feeds the result to dict.update() -- confirm.
    """
    req = "GET_jobs_id_resources"

    #Get job resources list from OAR
    node_id_list = self.oar.parser.SendRequest(req, job_id, username)
    # One-item tuple so the message formats whatever type OAR answers with.
    logger.debug("SLABDRIVER \t GetJobsResources %s " % (node_id_list,))

    hostname_list = self.__get_hostnames_from_oar_node_ids(node_id_list)

    #Replaces the previous entry "assigned_network_address" /
    #"reserved_resources"
    job_info = {'node_ids': hostname_list}
    return job_info
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Return the hostnames of the nodes listed in a job description.

    job_info[node_list_name] is read as a list of keys into the testbed
    node records (fetched with self.GetNodes() and keyed on 'hostname').
    The 'hostname' of every matching record is collected in order.

    A KeyError (unknown node, or node_list_name missing from job_info) is
    logged and the hostnames gathered so far are returned.

    NOTE(review): the try/except KeyError frame was not visible in the
    listing this block was restored from; it is inferred from the
    KEYERROR log call -- confirm against the pristine file.
    """
    #Get the list of the testbed nodes records and make a
    #dictionary keyed on the hostname out of it
    node_dict = dict((node['hostname'], node) for node in self.GetNodes())

    reserved_node_hostname_list = []
    try:
        for node_key in job_info[node_list_name]:
            # The list starts empty: assigning to list[index] raised
            # IndexError on the very first node, so append instead.
            reserved_node_hostname_list.append(
                node_dict[node_key]['hostname'])

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes "
                     "reserved_node_hostname_list %s"
                     % (reserved_node_hostname_list,))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )

    return reserved_node_hostname_list
def GetNodesCurrentlyInUse(self):
    """Return OAR's answer to the "GET_running_jobs" request.

    The answer of self.oar.parser.SendRequest is handed back untouched;
    the original docstring describes it as the list of all the nodes
    already involved in an OAR job.
    """
    return self.oar.parser.SendRequest("GET_running_jobs")
def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
    """Translate OAR resource ids into hostname/site dictionaries.

    The full node list is fetched with self.GetNodes() and indexed on
    each node's 'oar_id'. For every id of resource_id_list, in order, a
    dict {'hostname': ..., 'site_id': ...} is built from the matching
    node's 'hostname' and 'site' entries. An id with no matching node
    raises KeyError.
    """
    #Put the full node list into a dictionary keyed by oar node id
    oar_id_node_dict = dict((node['oar_id'], node)
                            for node in self.GetNodes())

    logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids "
                 "oar_id_node_dict %s" % (oar_id_node_dict,))

    hostname_dict_list = []
    for resource_id in resource_id_list:
        node = oar_id_node_dict[resource_id]
        hostname_dict_list.append({'hostname': node['hostname'],
                                   'site_id': node['site']})
    return hostname_dict_list
def GetReservedNodes(self):
    """Return OAR's reservations, each completed with its node hostnames.

    The reservations come from the "GET_reserved_nodes" request. Every
    reservation dict is modified in place: a 'reserved_nodes' entry is
    added, holding the list of hostname/site dicts computed from its
    'resource_ids' entry (which is kept).
    """
    #Get the nodes in use and the reserved nodes
    reservation_dict_list = \
        self.oar.parser.SendRequest("GET_reserved_nodes")

    for resa in reservation_dict_list:
        logger.debug("GetReservedNodes resa %s" % (resa,))
        #dict list of hostnames and their site
        # NOTE: the helper fetches the whole node list again on each call.
        resa['reserved_nodes'] = \
            self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])

    return reservation_dict_list
674 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
676 node_filter_dict : dictionnary of lists
679 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
680 node_dict_list = node_dict_by_id.values()
682 #No filtering needed return the list directly
683 if not (node_filter_dict or return_fields_list):
684 return node_dict_list
686 return_node_list = []
688 for filter_key in node_filter_dict:
690 #Filter the node_dict_list by each value contained in the
691 #list node_filter_dict[filter_key]
692 for value in node_filter_dict[filter_key]:
693 for node in node_dict_list:
694 if node[filter_key] == value:
695 if return_fields_list :
697 for k in return_fields_list:
699 return_node_list.append(tmp)
701 return_node_list.append(node)
703 logger.log_exc("GetNodes KeyError")
707 return return_node_list
710 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
711 site_dict = self.oar.parser.SendRequest("GET_sites")
712 #site_dict : dict where the key is the sit ename
713 return_site_list = []
714 if not ( site_filter_name_list or return_fields_list):
715 return_site_list = site_dict.values()
716 return return_site_list
718 for site_filter_name in site_filter_name_list:
719 if site_filter_name in site_dict:
720 if return_fields_list:
721 for field in return_fields_list:
724 tmp[field] = site_dict[site_filter_name][field]
726 logger.error("GetSites KeyError %s "%(field))
728 return_site_list.append(tmp)
730 return_site_list.append( site_dict[site_filter_name])
733 return return_site_list
# Warning: the return_fields_list parameter was removed (it was not used).
735 def GetSlices(self, slice_filter = None, slice_filter_type = None):
736 #def GetSlices(self, slice_filter = None, slice_filter_type = None, \
737 #return_fields_list = None):
738 """ Get the slice records from the slab db.
739 Returns a slice ditc if slice_filter and slice_filter_type
741 Returns a list of slice dictionnaries if there are no filters
745 return_slice_list = []
748 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
749 logger.debug("SLABDRIVER \tGetSlices authorized_filter_types_list %s"\
750 %(authorized_filter_types_list))
751 if slice_filter_type in authorized_filter_types_list:
752 if slice_filter_type == 'slice_hrn':
753 slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
755 if slice_filter_type == 'record_id_user':
756 slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
760 slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
761 logger.debug("SLABDRIVER \tGetSlices slicerec_dict %s" \
764 login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
765 logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
767 %(login, slicerec_dict))
768 if slicerec_dict['oar_job_id'] is not -1:
769 #Check with OAR the status of the job if a job id is in
771 rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \
775 slicerec_dict.update(rslt)
776 slicerec_dict.update({'hrn':\
777 str(slicerec_dict['slice_hrn'])})
778 #If GetJobsResources is empty, this means the job is
779 #now in the 'Terminated' state
780 #Update the slice record
782 self.db.update_job(slice_filter, job_id = -1)
783 slicerec_dict['oar_job_id'] = -1
785 update({'hrn':str(slicerec_dict['slice_hrn'])})
788 slicerec_dict['node_ids'] = slicerec_dict['node_list']
792 logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s"\
799 return_slice_list = slab_dbsession.query(SliceSenslab).all()
801 logger.debug("SLABDRIVER.PY \tGetSlices slices %s \
802 slice_filter %s " %(return_slice_list, slice_filter))
804 #if return_fields_list:
805 #return_slice_list = parse_filter(sliceslist, \
806 #slice_filter,'slice', return_fields_list)
808 return return_slice_list
814 def testbed_name (self): return self.hrn
816 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """Describe this aggregate and the RSpec versions it supports.

    Every version known to VersionManager is sorted by content_type:
    'ad' versions go to the advertisement list, 'request' versions to the
    request list, and '*' versions to both. The result is a dict with the
    keys 'testbed', 'geni_request_rspec_versions' and
    'geni_ad_rspec_versions'.

    NOTE(review): the "return {" and closing brace around the dict
    entries were not visible in the listing this block was restored from
    -- confirm that no further keys are expected.
    """
    version_manager = VersionManager()
    ad_rspec_versions = []
    request_rspec_versions = []
    for rspec_version in version_manager.versions:
        # Two independent tests: a '*' version belongs to both lists.
        if rspec_version.content_type in ['*', 'ad']:
            ad_rspec_versions.append(rspec_version.to_dict())
        if rspec_version.content_type in ['*', 'request']:
            request_rspec_versions.append(rspec_version.to_dict())
    return {
        'testbed':self.testbed_name(),
        'geni_request_rspec_versions': request_rspec_versions,
        'geni_ad_rspec_versions': ad_rspec_versions,
        }
# Convert SFA fields to PLC fields for use when registering or updating
# a registry record in the PLC database
841 # @param type type of record (user, slice, ...)
842 # @param hrn human readable name
843 # @param sfa_fields dictionary of SFA fields
844 # @param slab_fields dictionary of PLC fields (output)
846 def sfa_fields_to_slab_fields(self, sfa_type, hrn, record):
848 def convert_ints(tmpdict, int_fields):
849 for field in int_fields:
851 tmpdict[field] = int(tmpdict[field])
854 #for field in record:
855 # slab_record[field] = record[field]
857 if sfa_type == "slice":
858 #instantion used in get_slivers ?
859 if not "instantiation" in slab_record:
860 slab_record["instantiation"] = "senslab-instantiated"
861 slab_record["hrn"] = hrn_to_pl_slicename(hrn)
862 logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
863 slab_record %s hrn_to_pl_slicename(hrn) hrn %s " \
864 %(slab_record['hrn'], hrn))
866 slab_record["url"] = record["url"]
867 if "description" in record:
868 slab_record["description"] = record["description"]
869 if "expires" in record:
870 slab_record["expires"] = int(record["expires"])
872 #nodes added by OAR only and then imported to SFA
873 #elif type == "node":
874 #if not "hostname" in slab_record:
875 #if not "hostname" in record:
876 #raise MissingSfaInfo("hostname")
877 #slab_record["hostname"] = record["hostname"]
878 #if not "model" in slab_record:
879 #slab_record["model"] = "geni"
882 #elif type == "authority":
883 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
885 #if not "name" in slab_record:
886 #slab_record["name"] = hrn
888 #if not "abbreviated_name" in slab_record:
889 #slab_record["abbreviated_name"] = hrn
891 #if not "enabled" in slab_record:
892 #slab_record["enabled"] = True
894 #if not "is_public" in slab_record:
895 #slab_record["is_public"] = True
902 def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
903 """ Transforms unix timestamp into valid OAR date format """
905 #Used in case of a scheduled experiment (not immediate)
906 #To run an XP immediately, don't specify date and time in RSpec
907 #They will be set to None.
909 #transform the xp_utc_timestamp into server readable time
910 xp_server_readable_date = datetime.fromtimestamp(int(\
911 xp_utc_timestamp)).strftime(self.time_format)
913 return xp_server_readable_date
918 def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
919 """ Creates the structure needed for a correct POST on OAR.
920 Makes the timestamp transformation into the appropriate format.
921 Sends the POST request to create the job with the resources in
929 slice_name = slice_dict['name']
931 slot = slice_dict['timeslot']
932 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
935 #Running on default parameters
936 #XP immediate , 10 mins
937 slot = { 'date':None, 'start_time':None,
938 'timezone':None, 'duration':None }#10 min
940 reqdict['workdir'] = '/tmp'
941 reqdict['resource'] = "{network_address in ("
943 for node in added_nodes:
944 logger.debug("OARrestapi \tLaunchExperimentOnOAR \
947 #Get the ID of the node : remove the root auth and put
948 # the site in a separate list.
949 # NT: it's not clear for me if the nodenames will have the senslab
950 #prefix so lets take the last part only, for now.
952 # Again here it's not clear if nodes will be prefixed with <site>_,
953 #lets split and tanke the last part for now.
954 #s=lastpart.split("_")
957 reqdict['resource'] += "'" + nodeid + "', "
958 nodeid_list.append(nodeid)
960 custom_length = len(reqdict['resource'])- 2
961 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
962 ")}/nodes=" + str(len(nodeid_list))
964 def __process_walltime(duration=None):
965 """ Calculates the walltime in seconds from the duration in H:M:S
966 specified in the RSpec.
970 walltime = duration.split(":")
971 # Fixing the walltime by adding a few delays. First put the walltime
972 # in seconds oarAdditionalDelay = 20; additional delay for
973 # /bin/sleep command to
974 # take in account prologue and epilogue scripts execution
975 # int walltimeAdditionalDelay = 120; additional delay
977 desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
979 total_walltime = desired_walltime + 140 #+2 min 20
980 sleep_walltime = desired_walltime + 20 #+20 sec
981 logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
982 total_walltime %s sleep_walltime %s "\
983 %(desired_walltime, total_walltime, \
985 #Put the walltime back in str form
987 walltime[0] = str(total_walltime / 3600)
988 total_walltime = total_walltime - 3600 * int(walltime[0])
989 #Get the remaining minutes
990 walltime[1] = str(total_walltime / 60)
991 total_walltime = total_walltime - 60 * int(walltime[1])
993 walltime[2] = str(total_walltime)
994 logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
997 #automatically set 10min +2 min 20
1001 sleep_walltime = '620'
1003 return walltime, sleep_walltime
1005 #if slot['duration']:
1006 walltime, sleep_walltime = __process_walltime(duration = \
1009 #walltime, sleep_walltime = self.__process_walltime(duration = None)
1011 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
1012 ":" + str(walltime[1]) + ":" + str(walltime[2])
1013 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
1017 #In case of a scheduled experiment (not immediate)
1018 #To run an XP immediately, don't specify date and time in RSpec
1019 #They will be set to None.
1020 server_timestamp, server_tz = self.GetTimezone()
1021 if slot['date'] and slot['start_time']:
1022 if slot['timezone'] is '' or slot['timezone'] is None:
1023 #assume it is server timezone
1024 from_zone = tz.gettz(server_tz)
1025 logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
1026 not specified server_tz %s from_zone %s" \
1027 %(server_tz, from_zone))
1029 #Get zone of the user from the reservation time given
1031 from_zone = tz.gettz(slot['timezone'])
1033 date = str(slot['date']) + " " + str(slot['start_time'])
1034 user_datetime = datetime.strptime(date, self.time_format)
1035 user_datetime = user_datetime.replace(tzinfo = from_zone)
1037 #Convert to server zone
1039 to_zone = tz.gettz(server_tz)
1040 reservation_date = user_datetime.astimezone(to_zone)
1041 #Readable time accpeted by OAR
1042 reqdict['reservation'] = reservation_date.strftime(self.time_format)
1044 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR \
1045 reqdict['reservation'] %s " %(reqdict['reservation']))
1048 # Immediate XP. Not need to add special parameters.
1049 # normally not used in SFA
1054 reqdict['type'] = "deploy"
1055 reqdict['directory'] = ""
1056 reqdict['name'] = "TestSandrine"
1059 # first step : start the OAR job and update the job
1060 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
1061 \r\n site_list %s" %(reqdict, site_list))
1063 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
1064 reqdict, slice_user)
1065 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
1067 jobid = answer['id']
1069 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
1070 Impossible to create job %s " %(answer))
1073 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
1074 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
1075 self.db.update_job( slice_name, jobid, added_nodes)
1078 # second step : configure the experiment
1079 # we need to store the nodes in a yaml (well...) file like this :
1080 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
1081 job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
1083 job_file.write(str(added_nodes[0].strip('node')))
1084 for node in added_nodes[1:len(added_nodes)] :
1085 job_file.write(', '+ node.strip('node'))
1089 # third step : call the senslab-experiment wrapper
1090 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
1091 # "+str(jobid)+" "+slice_user
1092 javacmdline = "/usr/bin/java"
1094 "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
1095 #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
1096 #str(jobid), slice_user])
1097 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
1098 slice_user],stdout=subprocess.PIPE).communicate()[0]
1100 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR wrapper returns%s " \
1105 #Delete the jobs and updates the job id in the senslab table
1107 #Does not clear the node list
def DeleteSliceFromNodes(self, slice_record):
    """Delete the OAR job(s) of a slice and reset its job id.

    slice_record : dict, must provide the 'oar_job_id' and 'hrn' keys.

    The job id stored for the slice in the senslab table is set to -1.
    The node list of the slice is left untouched.
    """
    oar_job_id = slice_record['oar_job_id']
    slice_hrn = slice_record['hrn']
    self.DeleteJobs(oar_job_id, slice_hrn)
    # -1 is the value stored once the slice has no job any more.
    self.db.update_job(slice_hrn, job_id = -1)
1116 def GetLeaseGranularity(self):
1117 """ Returns the granularity of Senslab testbed.
1118 Defined in seconds. """
def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
    """Return the reservations known to OAR as leases.

    Every reservation returned by GetReservedNodes() is augmented in place
    with two keys:
      'slice_id' : urn of the slice owned by the user of the reservation,
      'component_id_list' : urns of the reserved nodes.

    A reservation whose user cannot be mapped to a slice (no LDAP entry,
    no matching SFA user or no senslab slice) is logged and left out of
    the result instead of aborting the whole listing.

    lease_filter_dict : dict or None. When given, only the reservations
        whose 'slice_id' equals lease_filter_dict['name'] are returned.
    return_fields_list : accepted for interface compatibility, not used.

    Returns the list of augmented reservation dicts.
    """
    logger.debug(" SLABDRIVER.PY \tGetLeases ")

    # uid -> slice urn (None when unresolved). A user may hold several
    # reservations: look each uid up in LDAP and in the databases only once.
    slice_urn_by_uid = {}
    resolved_reservations = []
    for resa in self.GetReservedNodes():
        uid = resa['user']
        if uid not in slice_urn_by_uid:
            slice_urn_by_uid[uid] = self._get_slice_urn_from_ldap_uid(uid)
        slice_urn = slice_urn_by_uid[uid]
        if slice_urn is None:
            logger.warning("SLABDRIVER \tGetLeases no slice found for \
user %s, reservation ignored" % (uid,))
            continue

        resa['slice_id'] = slice_urn
        # Transform the hostnames into urns (component ids)
        resa['component_id_list'] = [
            hostname_to_urn(self.hrn, self.root_auth, node['hostname'])
            for node in resa['reserved_nodes']]
        resolved_reservations.append(resa)

    # Filter the reservation list if necessary:
    # keep only the leases associated with the requested slice.
    if lease_filter_dict:
        logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"
                     % (lease_filter_dict,))
        wanted_slice = lease_filter_dict['name']
        reservation_list = [resa for resa in resolved_reservations
                            if resa['slice_id'] == wanted_slice]
    else:
        reservation_list = resolved_reservations

    logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"
                 % (reservation_list,))
    return reservation_list

def _get_slice_urn_from_ldap_uid(self, uid):
    """Return the urn of the slice owned by LDAP user uid, or None.

    Follows uid -> LDAP 'mail' attribute -> RegUser with that email ->
    SliceSenslab row of that user. None is returned as soon as one of
    these lookups yields nothing.
    """
    ldap_result = self.ldap.LdapSearch('(uid=' + uid + ')')
    if not ldap_result:
        return None
    # First match; element [1] holds the attributes of the entry.
    ldap_attributes = ldap_result[0][1]

    user = dbsession.query(RegUser).filter_by(
        email = ldap_attributes['mail'][0]).first()
    if user is None:
        return None

    slice_info = slab_dbsession.query(SliceSenslab).filter_by(
        record_id_user = user.record_id).first()
    if slice_info is None:
        return None

    return hrn_to_urn(slice_info.slice_hrn, 'slice')
def augment_records_with_testbed_info(self, sfa_records):
    """Add the testbed specific information to the given SFA records.

    Thin wrapper: hands sfa_records over to fill_record_info() and
    returns its result unchanged.
    """
    augmented_records = self.fill_record_info(sfa_records)
    return augmented_records
1163 def fill_record_info(self, record_list):
1165 Given a SFA record, fill in the senslab specific and SFA specific
1166 fields in the record.
1169 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
1170 if not isinstance(record_list, list):
1171 record_list = [record_list]
1174 for record in record_list:
1175 #If the record is a SFA slice record, then add information
1176 #about the user of this slice. This kind of
1177 #information is in the Senslab's DB.
1178 if str(record['type']) == 'slice':
1179 #Get slab slice record.
1180 recslice = self.GetSlices(slice_filter = \
1181 str(record['hrn']),\
1182 slice_filter_type = 'slice_hrn')
1183 recuser = dbsession.query(RegRecord).filter_by(record_id = \
1184 recslice['record_id_user']).first()
1185 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1186 rec %s \r\n \r\n" %(recslice))
1187 record.update({'PI':[recuser.hrn],
1188 'researcher': [recuser.hrn],
1189 'name':record['hrn'],
1190 'oar_job_id':recslice['oar_job_id'],
1192 'person_ids':[recslice['record_id_user']],
1193 'geni_urn':'', #For client_helper.py compatibility
1194 'keys':'', #For client_helper.py compatibility
1195 'key_ids':''}) #For client_helper.py compatibility
1197 elif str(record['type']) == 'user':
1198 #The record is a SFA user record.
1199 #Get the information about his slice from Senslab's DB
1200 #and add it to the user record.
1201 recslice = self.GetSlices(\
1202 slice_filter = record['record_id'],\
1203 slice_filter_type = 'record_id_user')
1205 logger.debug( "SLABDRIVER.PY \t fill_record_info user \
1206 rec %s \r\n \r\n" %(recslice))
1207 #Append slice record in records list,
1208 #therefore fetches user and slice info again(one more loop)
1209 #Will update PIs and researcher for the slice
1210 recuser = dbsession.query(RegRecord).filter_by(record_id = \
1211 recslice['record_id_user']).first()
1212 recslice.update({'PI':[recuser.hrn],
1213 'researcher': [recuser.hrn],
1214 'name':record['hrn'],
1215 'oar_job_id':recslice['oar_job_id'],
1217 'person_ids':[recslice['record_id_user']]})
1219 #GetPersons takes [] as filters
1220 #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
1221 user_slab = self.GetPersons([record])
1223 recslice.update({'type':'slice', \
1224 'hrn':recslice['slice_hrn']})
1225 record.update(user_slab[0])
1226 #For client_helper.py compatibility
1227 record.update( { 'geni_urn':'',
1230 record_list.append(recslice)
1232 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1233 INFO TO USER records %s" %(record_list))
1236 except TypeError, error:
1237 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
1242 #self.fill_record_slab_info(records)
1248 #TODO Update membership? update_membership_list SA 05/07/12
1249 #def update_membership_list(self, oldRecord, record, listName, addFunc, \
## get a list of the HRNs that are members of the old and new records
1253 #oldList = oldRecord.get(listName, [])
1256 #newList = record.get(listName, [])
1258 ## if the lists are the same, then we don't have to update anything
1259 #if (oldList == newList):
1262 ## build a list of the new person ids, by looking up each person to get
1266 #records = table.find({'type': 'user', 'hrn': newList})
1267 #for rec in records:
1268 #newIdList.append(rec['pointer'])
1270 ## build a list of the old person ids from the person_ids field
1272 #oldIdList = oldRecord.get("person_ids", [])
1273 #containerId = oldRecord.get_pointer()
1275 ## if oldRecord==None, then we are doing a Register, instead of an
1278 #containerId = record.get_pointer()
1280 ## add people who are in the new list, but not the oldList
1281 #for personId in newIdList:
1282 #if not (personId in oldIdList):
1283 #addFunc(self.plauth, personId, containerId)
1285 ## remove people who are in the old list, but not the new list
1286 #for personId in oldIdList:
1287 #if not (personId in newIdList):
1288 #delFunc(self.plauth, personId, containerId)
1290 #def update_membership(self, oldRecord, record):
1292 #if record.type == "slice":
1293 #self.update_membership_list(oldRecord, record, 'researcher',
1294 #self.users.AddPersonToSlice,
1295 #self.users.DeletePersonFromSlice)
1296 #elif record.type == "authority":
1301 # I don't think you plan on running a component manager at this point
1302 # let me clean up the mess of ComponentAPI that is deprecated anyways
1305 #TODO FUNCTIONS SECTION 04/07/2012 SA
1307 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
1309 def UnBindObjectFromPeer(self, auth, object_type, object_id, shortname):
1310 """ This method is a hopefully temporary hack to let the sfa correctly
1311 detach the objects it creates from a remote peer object. This is
1312 needed so that the sfa federation link can work in parallel with
1313 RefreshPeer, as RefreshPeer depends on remote objects being correctly
1316 auth : struct, API authentication structure
1317 AuthMethod : string, Authentication method to use
1318 object_type : string, Object type, among 'site','person','slice',
1320 object_id : int, object_id
1321 shortname : string, peer shortname
1325 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
1329 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
def BindObjectToPeer(self, auth, object_type, object_id, shortname=None,
                     remote_object_id=None):
    """Placeholder for attaching a local object to a remote peer object.

    Meant as a temporary hack so that the sfa federation link can work in
    parallel with RefreshPeer, which depends on remote objects being
    correctly marked. Not implemented for senslab: every argument is
    ignored and only a warning is logged.

    auth : API authentication structure (unused)
    object_type : object type (unused)
    object_id : object id (unused)
    shortname : string, peer shortname (unused)
    remote_object_id : int, remote object_id, 0 if unknown (unused)
    """
    message = "SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1346 #TODO UpdateSlice 04/07/2012 SA
#Function should delete and create another job since in senslab slice=job
def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
    """Placeholder for updating the parameters of an existing slice.

    The PLC API counterpart updates the slice with the values found in
    slice_fields, subject to membership/PI/admin rights. Not implemented
    for senslab: every argument is ignored and only a warning is logged.

    auth : API authentication structure (unused)
    slice_id_or_name : identifier or name of the slice (unused)
    slice_fields : values to update (unused)
    """
    message = "SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1363 #TODO UpdatePerson 04/07/2012 SA
def UpdatePerson(self, auth, person_id_or_email, person_fields=None):
    """Placeholder for updating a person.

    The PLC API counterpart updates only the fields given in
    person_fields and leaves the others untouched. Not implemented for
    senslab: every argument is ignored and only a warning is logged.

    auth : API authentication structure (unused)
    person_id_or_email : identifier or email of the person (unused)
    person_fields : fields to update (unused)
    """
    message = "SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1376 #TODO GetKeys 04/07/2012 SA
def GetKeys(self, auth, key_filter=None, return_fields=None):
    """Placeholder for querying details about keys.

    The PLC API counterpart returns an array of structs describing the
    keys matching key_filter, restricted to return_fields when given.
    Not implemented for senslab: every argument is ignored and only a
    warning is logged.

    auth : API authentication structure (unused)
    key_filter : key identifiers or struct of key attributes (unused)
    return_fields : details to return (unused)
    """
    message = "SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1391 #TODO DeleteKey 04/07/2012 SA
def DeleteKey(self, auth, key_id):
    """Placeholder for deleting a key.

    Not implemented for senslab: every argument is ignored and only a
    warning is logged.

    auth : API authentication structure (unused)
    key_id : identifier of the key (unused)
    """
    message = "SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1403 #TODO : Check rights to delete person
def DeletePerson(self, auth, person_record):
    """Disable an existing account in senslab LDAP.

    The account is not removed: it is marked as deleted through
    LdapMarkUserAsDeleted(). No access-rights check is done here, so
    auth is currently unused (in the PLC API users and techs can only
    delete themselves, PIs can delete themselves and other non-PIs at
    their sites, admins can delete anyone).

    auth : API authentication structure (unused)
    person_record : record of the user whose account must be disabled

    Returns whatever LdapMarkUserAsDeleted() returns, so that the caller
    can tell whether the LDAP update worked.
    """
    #Disable user account in senslab LDAP
    ret = self.ldap.LdapMarkUserAsDeleted(person_record)
    logger.warning("SLABDRIVER DeletePerson %s " % (person_record,))
    return ret
1418 #TODO Check DeleteSlice, check rights 05/07/2012 SA
def DeleteSlice(self, auth, slice_record):
    """Delete the specified slice.

    Senslab: the job associated with the slice, if any, is killed through
    DeleteSliceFromNodes(); the slice entry of the slab db is then updated
    with job id -1 and an empty node list.
    No access-rights check is done here: auth is currently unused.

    auth : API authentication structure (unused)
    slice_record : dict describing the slice; its 'hrn' key is read here
    """
    self.DeleteSliceFromNodes(slice_record)
    slice_hrn = slice_record['hrn']
    # DeleteSliceFromNodes keeps the node list: clear it here.
    self.db.update_job(slice_hrn, job_id = -1, nodes = [])
    logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
1437 #TODO AddPerson 04/07/2012 SA
def AddPerson(self, auth, person_fields=None):
    """Placeholder for adding a new account.

    The PLC API counterpart creates an account from person_fields, using
    defaults for missing fields. Not implemented for senslab: every
    argument is ignored and only a warning is logged.

    auth : API authentication structure (unused)
    person_fields : fields of the new account (unused)
    """
    message = "SLABDRIVER AddPerson EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1450 #TODO AddPersonToSite 04/07/2012 SA
def AddPersonToSite(self, auth, person_id_or_email,
                    site_id_or_login_base=None):
    """Placeholder for adding a person to a site.

    Not implemented for senslab: every argument is ignored and only a
    warning is logged.

    auth : API authentication structure (unused)
    person_id_or_email : identifier or email of the person (unused)
    site_id_or_login_base : identifier or login base of the site (unused)
    """
    message = "SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1463 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
    """Placeholder for granting a role to a person.

    Not implemented for senslab: every argument is ignored and only a
    warning is logged.

    auth : API authentication structure (unused)
    role_id_or_name : identifier or name of the role (unused)
    person_id_or_email : identifier or email of the person (unused)
    """
    message = "SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1475 #TODO AddPersonKey 04/07/2012 SA
1476 def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
1477 """Adds a new key to the specified account.
1478 Non-admins can only modify their own keys.
1479 Returns the new key_id (> 0) if successful, faults otherwise.
1483 logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")