3 from datetime import datetime
4 from dateutil import tz
5 from time import strftime, gmtime
7 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
8 from sfa.util.sfalogging import logger
10 from sfa.storage.alchemy import dbsession
11 from sfa.storage.model import RegRecord, RegUser
13 from sfa.trust.credential import Credential
16 from sfa.managers.driver import Driver
17 from sfa.rspecs.version_manager import VersionManager
18 from sfa.rspecs.rspec import RSpec
20 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf
21 from sfa.planetlab.plxrn import slicename_to_hrn, hrn_to_pl_slicename, \
25 ## thierry: everything that is API-related (i.e. handling incoming requests)
27 # SlabDriver should be really only about talking to the senslab testbed
30 from sfa.senslab.OARrestapi import OARrestapi
31 from sfa.senslab.LDAPapi import LDAPapi
33 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab
34 from sfa.senslab.slabaggregate import SlabAggregate
35 from sfa.senslab.slabslices import SlabSlices
def __process_walltime(duration=None):
    """ Calculates the OAR walltime and the sleep duration from the duration
    in H:M:S specified in the RSpec.

    Two delays are added to the requested duration:
    - 140 seconds (2 min 20) to get the walltime of the job,
    - 20 seconds to get the argument given to the /bin/sleep command,
    so that the prologue and epilogue scripts execution is taken into
    account.

    duration : string formatted as "H:M:S", or None to use the default
    experiment length (10 minutes).

    Returns a tuple (walltime, sleep_walltime). walltime is the list of
    strings [hours, minutes, seconds]. sleep_walltime is a number of
    seconds: an int when it is computed from duration, the string '620'
    when the default values are used.

    """
    if not duration:
        #automatically set 10min +2 min 20
        # NOTE(review): the default walltime list is rebuilt from the
        # comment above (10 min + 2 min 20 -> 0:12:20) - confirm.
        return ['0', '12', '20'], '620'

    walltime = duration.split(":")
    #First put the walltime in seconds
    desired_walltime = int(walltime[0]) * 3600 + int(walltime[1]) * 60 + \
                                                        int(walltime[2])
    total_walltime = desired_walltime + 140 #+2 min 20
    sleep_walltime = desired_walltime + 20 #+20 sec
    logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s "
                 "total_walltime %s sleep_walltime %s " \
                 %(desired_walltime, total_walltime, sleep_walltime))

    #Put the walltime back in str form.
    #divmod keeps the values integral on Python 2 and Python 3 alike,
    #a plain "/" would yield floats such as '0.2' on Python 3.
    hours, remaining_seconds = divmod(total_walltime, 3600)
    minutes, seconds = divmod(remaining_seconds, 60)
    walltime[0] = str(hours)
    walltime[1] = str(minutes)
    walltime[2] = str(seconds)
    logger.debug("SLABDRIVER \t__process_walltime walltime %s " \
                 %(walltime,))
    return walltime, sleep_walltime
82 # this inheritance scheme is so that the driver object can receive
83 # GetNodes or GetSites sorts of calls directly
84 # and thus minimize the differences in the managers with the pl version
85 class SlabDriver(Driver):
87 def __init__(self, config):
88 Driver.__init__ (self, config)
90 self.hrn = config.SFA_INTERFACE_HRN
92 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
94 self.oar = OARrestapi()
96 self.time_format = "%Y-%m-%d %H:%M:%S"
97 self.db = SlabDB(config,debug = True)
101 def sliver_status(self, slice_urn, slice_hrn):
102 """Receive a status request for slice named urn/hrn
103 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
104 shall return a structure as described in
105 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
106 NT : not sure if we should implement this or not, but used by sface.
110 #First get the slice with the slice hrn
111 sl = self.GetSlices(slice_filter = slice_hrn, \
112 slice_filter_type = 'slice_hrn')
114 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
116 top_level_status = 'unknown'
117 nodes_in_slice = sl['node_ids']
119 if len(nodes_in_slice) is 0:
120 raise SliverDoesNotExist("No slivers allocated ")
122 top_level_status = 'ready'
124 logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
125 %s \r\n " %(slice_urn, slice_hrn, sl))
127 if sl['oar_job_id'] is not -1:
128 #A job is running on Senslab for this slice
129 # report about the local nodes that are in the slice only
131 nodes_all = self.GetNodes({'hostname':nodes_in_slice},
132 ['node_id', 'hostname','site','boot_state'])
133 nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
137 result['geni_urn'] = slice_urn
138 result['pl_login'] = sl['job_user'] #For compatibility
141 timestamp = float(sl['startTime']) + float(sl['walltime'])
142 result['pl_expires'] = strftime(self.time_format, \
143 gmtime(float(timestamp)))
144 #result['slab_expires'] = strftime(self.time_format,\
145 #gmtime(float(timestamp)))
148 for node in nodeall_byhostname:
150 #res['slab_hostname'] = node['hostname']
151 #res['slab_boot_state'] = node['boot_state']
153 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
154 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
155 res['pl_last_contact'] = strftime(self.time_format, \
156 gmtime(float(timestamp)))
157 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
158 nodeall_byhostname[node]['node_id'])
159 res['geni_urn'] = sliver_id
160 if nodeall_byhostname[node]['boot_state'] == 'Alive':
162 res['geni_status'] = 'ready'
164 res['geni_status'] = 'failed'
165 top_level_status = 'failed'
167 res['geni_error'] = ''
169 resources.append(res)
171 result['geni_status'] = top_level_status
172 result['geni_resources'] = resources
173 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
178 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
180 logger.debug("SLABDRIVER.PY \tcreate_sliver ")
181 aggregate = SlabAggregate(self)
183 slices = SlabSlices(self)
184 peer = slices.get_peer(slice_hrn)
185 sfa_peer = slices.get_sfa_peer(slice_hrn)
188 if not isinstance(creds, list):
192 slice_record = users[0].get('slice_record', {})
195 rspec = RSpec(rspec_string)
196 logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \
200 # ensure site record exists?
201 # ensure slice record exists
202 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
203 sfa_peer, options=options)
204 requested_attributes = rspec.version.get_slice_attributes()
206 if requested_attributes:
207 for attrib_dict in requested_attributes:
208 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] \
210 sfa_slice.update({'timeslot':attrib_dict['timeslot']})
211 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
213 # ensure person records exists
214 persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
215 sfa_peer, options=options)
217 # ensure slice attributes exists?
220 # add/remove slice from nodes
222 requested_slivers = [node.get('component_name') \
223 for node in rspec.version.get_nodes_with_slivers()]
224 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
225 requested_slivers %s " %(requested_slivers))
227 nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
230 requested_leases = []
232 for lease in rspec.version.get_leases():
234 if not lease.get('lease_id'):
235 requested_lease['hostname'] = \
236 xrn_to_hostname(lease.get('component_id').strip())
237 requested_lease['t_from'] = lease.get('t_from')
238 requested_lease['t_until'] = lease.get('t_until')
240 kept_leases.append(int(lease['lease_id']))
241 if requested_lease.get('hostname'):
242 requested_leases.append(requested_lease)
244 leases = slices.verify_slice_leases(sfa_slice, \
245 requested_leases, kept_leases, peer)
247 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
250 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
252 sfa_slice = self.GetSlices(slice_filter = slice_hrn, \
253 slice_filter_type = 'slice_hrn')
254 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
258 slices = SlabSlices(self)
259 # determine if this is a peer slice
261 peer = slices.get_peer(slice_hrn)
264 self.UnBindObjectFromPeer('slice', \
265 sfa_slice['record_id_slice'], peer)
266 self.DeleteSliceFromNodes(sfa_slice)
269 self.BindObjectToPeer('slice', sfa_slice['slice_id'], \
270 peer, sfa_slice['peer_slice_id'])
def AddSlice(self, slice_record):
    """ Stores a new slice in the senslab database.

    slice_record : dictionary providing the 'slice_hrn',
    'record_id_slice', 'record_id_user' and 'peer_authority' values of
    the slice to add. A KeyError is raised if one of them is missing.

    A SliceSenslab object is built from these values, added to
    slab_dbsession and committed.

    """
    # NOTE(review): register() stores the result of this call in a pointer
    # but nothing is returned by the visible code - confirm what callers
    # expect.
    slab_slice = SliceSenslab(
        slice_hrn=slice_record['slice_hrn'],
        record_id_slice=slice_record['record_id_slice'],
        record_id_user=slice_record['record_id_user'],
        peer_authority=slice_record['peer_authority'])
    logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s slab_slice %s" \
                 %(slice_record, slab_slice))
    slab_dbsession.add(slab_slice)
    slab_dbsession.commit()
285 # first 2 args are None in case of resource discovery
286 def list_resources (self, slice_urn, slice_hrn, creds, options):
287 #cached_requested = options.get('cached', True)
289 version_manager = VersionManager()
290 # get the rspec's return format from options
292 version_manager.get_version(options.get('geni_rspec_version'))
293 version_string = "rspec_%s" % (rspec_version)
295 #panos adding the info option to the caching key (can be improved)
296 if options.get('info'):
297 version_string = version_string + "_" + \
298 options.get('info', 'default')
300 # look in cache first
301 #if cached_requested and self.cache and not slice_hrn:
302 #rspec = self.cache.get(version_string)
304 #logger.debug("SlabDriver.ListResources: \
305 #returning cached advertisement")
308 #panos: passing user-defined options
309 logger.debug("SLABDRIVER \tlist_resources rspec " )
310 aggregate = SlabAggregate(self)
311 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
312 options.update({'origin_hrn':origin_hrn})
313 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
314 version=rspec_version, options=options)
317 #if self.cache and not slice_hrn:
318 #logger.debug("Slab.ListResources: stores advertisement in cache")
319 #self.cache.add(version_string, rspec)
def list_slices (self, creds, options):
    """ Returns the list of the urns of the slices returned by GetSlices
    called without any filter.

    Each slice 'slice_hrn' is turned into a hrn with slicename_to_hrn
    (using this driver hrn) and then into a slice urn with hrn_to_urn.
    creds and options are not used. Caching of the result is not
    implemented.

    """
    logger.debug("SLABDRIVER.PY \tlist_slices")
    slices = self.GetSlices()
    slice_hrns = [slicename_to_hrn(self.hrn, slab_slice['slice_hrn']) \
                  for slab_slice in slices]
    slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
                  for slice_hrn in slice_hrns]
    # NOTE(review): the return statement is not visible in the reviewed
    # text, returning the urns list is assumed - confirm.
    return slice_urns
347 #No site or node register supported
348 def register (self, sfa_record, hrn, pub_key):
349 record_type = sfa_record['type']
350 slab_record = self.sfa_fields_to_slab_fields(record_type, hrn, \
354 if record_type == 'slice':
355 acceptable_fields = ['url', 'instantiation', 'name', 'description']
356 for key in slab_record.keys():
357 if key not in acceptable_fields:
359 logger.debug("SLABDRIVER.PY register")
360 slices = self.GetSlices(slice_filter =slab_record['hrn'], \
361 slice_filter_type = 'slice_hrn')
363 pointer = self.AddSlice(slab_record)
365 pointer = slices[0]['slice_id']
367 elif record_type == 'user':
368 persons = self.GetPersons([sfa_record])
369 #persons = self.GetPersons([sfa_record['hrn']])
371 pointer = self.AddPerson(dict(sfa_record))
374 pointer = persons[0]['person_id']
376 #Does this make sense to senslab ?
377 #if 'enabled' in sfa_record and sfa_record['enabled']:
378 #self.UpdatePerson(pointer, \
379 #{'enabled': sfa_record['enabled']})
381 #TODO register Change this AddPersonToSite stuff 05/07/2012 SA
382 # add this person to the site only if
383 # she is being added for the first
384 # time by sfa and doesnt already exist in plc
385 if not persons or not persons[0]['site_ids']:
386 login_base = get_leaf(sfa_record['authority'])
387 self.AddPersonToSite(pointer, login_base)
389 # What roles should this user have?
390 #TODO : DElete this AddRoleToPerson 04/07/2012 SA
391 #Function prototype is :
392 #AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email)
393 #what's the pointer doing here?
394 self.AddRoleToPerson('user', pointer)
397 self.AddPersonKey(pointer, {'key_type' : 'ssh', \
400 #No node adding outside OAR
404 #No site or node record update allowed
405 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
406 pointer = old_sfa_record['pointer']
407 old_sfa_record_type = old_sfa_record['type']
409 # new_key implemented for users only
410 if new_key and old_sfa_record_type not in [ 'user' ]:
411 raise UnknownSfaType(old_sfa_record_type)
413 #if (type == "authority"):
414 #self.shell.UpdateSite(pointer, new_sfa_record)
416 if old_sfa_record_type == "slice":
417 slab_record = self.sfa_fields_to_slab_fields(old_sfa_record_type, \
419 if 'name' in slab_record:
420 slab_record.pop('name')
421 #Prototype should be UpdateSlice(self,
422 #auth, slice_id_or_name, slice_fields)
423 #Senslab cannot update slice since slice = job
424 #so we must delete and create another job
425 self.UpdateSlice(pointer, slab_record)
427 elif old_sfa_record_type == "user":
429 all_fields = new_sfa_record
430 for key in all_fields.keys():
431 if key in ['first_name', 'last_name', 'title', 'email',
432 'password', 'phone', 'url', 'bio', 'accepted_aup',
434 update_fields[key] = all_fields[key]
435 self.UpdatePerson(pointer, update_fields)
438 # must check this key against the previous one if it exists
439 persons = self.GetPersons([pointer], ['key_ids'])
441 keys = person['key_ids']
442 keys = self.GetKeys(person['key_ids'])
444 # Delete all stale keys
447 if new_key != key['key']:
448 self.DeleteKey(key['key_id'])
452 self.AddPersonKey(pointer, {'key_type': 'ssh', \
def remove (self, sfa_record):
    """ Removes the testbed object matching a SFA record.

    sfa_record : dictionary with at least the 'type' and 'hrn' keys.

    - 'user' record : the user is looked for with GetPersons and deleted
      with DeletePerson only if he has site ids. If he doesn't, it
      probably means he was just removed from a site, not actually
      deleted.
    - 'slice' record : the slice is deleted with DeleteSlice if GetSlices
      finds it using its hrn.
    Records of any other type are left untouched (no site or node record
    removal).

    """
    sfa_record_type = sfa_record['type']
    hrn = sfa_record['hrn']
    if sfa_record_type == 'user':
        # GetPersons only filters on a list of records (see register()),
        # given a bare dictionary it would return every enabled user.
        persons = self.GetPersons([sfa_record])
        if persons and persons[0]['site_ids']:
            #TODO : delete person in LDAP
            self.DeletePerson(sfa_record)
    elif sfa_record_type == 'slice':
        if self.GetSlices(slice_filter = hrn, \
                          slice_filter_type = 'slice_hrn'):
            self.DeleteSlice(hrn)
    # NOTE(review): the return statement is not visible in the reviewed
    # text, True is assumed - confirm.
    return True
485 #TODO clean GetPeers. 05/07/12SA
486 def GetPeers (self, auth = None, peer_filter=None, return_fields_list=None):
488 existing_records = {}
489 existing_hrns_by_types = {}
490 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
491 return_field %s " %(auth , peer_filter, return_fields_list))
492 all_records = dbsession.query(RegRecord).\
493 filter(RegRecord.type.like('%authority%')).all()
494 for record in all_records:
495 existing_records[(record.hrn, record.type)] = record
496 if record.type not in existing_hrns_by_types:
497 existing_hrns_by_types[record.type] = [record.hrn]
498 logger.debug("SLABDRIVER \tGetPeer\t NOT IN \
499 existing_hrns_by_types %s " %( existing_hrns_by_types))
502 logger.debug("SLABDRIVER \tGetPeer\t \INNN type %s hrn %s " \
503 %(record.type,record.hrn))
504 existing_hrns_by_types[record.type].append(record.hrn)
507 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
508 %( existing_hrns_by_types))
513 records_list.append(existing_records[(peer_filter,'authority')])
515 for hrn in existing_hrns_by_types['authority']:
516 records_list.append(existing_records[(hrn,'authority')])
518 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
524 return_records = records_list
525 if not peer_filter and not return_fields_list:
529 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
531 return return_records
534 #TODO : Handling OR request in make_ldap_filters_from_records
535 #instead of the for loop
536 #over the records' list
def GetPersons(self, person_filter=None, return_fields_list=None):
    """ Looks for users whose accounts are enabled in the senslab LDAP.

    person_filter : should be a list of dictionaries (user records) when
    not set to None. Usually the list contains only one user record.
    return_fields_list : not used.

    When person_filter is a non empty list, LdapFindUser is called once
    per record with is_user_enabled set to True and the list of the
    results is returned, in the same order.
    In any other case (None, empty list, not a list) the result of
    LdapFindUser called with the enabled filter only is returned.

    """
    logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
                 %(person_filter,))
    if not (person_filter and isinstance(person_filter, list)):
        #Get only enabled user accounts in senslab LDAP
        return self.ldap.LdapFindUser(is_user_enabled=True)

    #We are looking for a list of users (list of dict records)
    return [self.ldap.LdapFindUser(searched_attributes, \
                                   is_user_enabled=True) \
            for searched_attributes in person_filter]
def GetTimezone(self):
    """ Sends the "GET_timezone" request through the OAR parser.

    Returns the (server_timestamp, server_tz) tuple unpacked from the
    answer to this request.

    """
    server_timestamp, server_tz = \
        self.oar.parser.SendRequest("GET_timezone")
    return server_timestamp, server_tz
def DeleteJobs(self, job_id, slice_hrn):
    """ Asks OAR to delete the job job_id.

    job_id : id of the OAR job. Nothing is sent and None is returned if
    it is empty or equal to -1, the value stored in the senslab table
    when a slice has no job.
    slice_hrn : hrn of the slice owning the job. The user name given to
    OAR is the last part of the hrn without its "_slice" suffix.

    Returns the answer to the 'DELETE_jobs_id' request.

    """
    # NOTE(review): two lines are missing from the reviewed text before
    # the username computation, a guard of this kind is assumed - confirm.
    if not job_id or job_id == -1:
        return None

    suffix = "_slice"
    username = slice_hrn.split(".")[-1]
    #Remove the suffix only. rstrip("_slice") strips any trailing
    #character found in "_slice": "alice_slice" would become "a".
    if username.endswith(suffix):
        username = username[:-len(suffix)]

    reqdict = {'method': "delete", 'strval': str(job_id)}
    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
                                              reqdict, username)
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s " \
                 %(job_id, answer))
    return answer
583 ##TODO : Unused GetJobsId ? SA 05/07/12
584 #def GetJobsId(self, job_id, username = None ):
586 #Details about a specific job.
587 #Includes details about submission time, jot type, state, events,
588 #owner, assigned ressources, walltime etc...
592 #node_list_k = 'assigned_network_address'
593 ##Get job info from OAR
594 #job_info = self.oar.parser.SendRequest(req, job_id, username)
596 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
598 #if job_info['state'] == 'Terminated':
599 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
602 #if job_info['state'] == 'Error':
603 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
608 #logger.error("SLABDRIVER \tGetJobsId KeyError")
611 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
613 ##Replaces the previous entry
614 ##"assigned_network_address" / "reserved_resources"
616 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
617 #del job_info[node_list_k]
618 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
def GetJobsResources(self, job_id, username = None):
    """ Gets the nodes used by the OAR job job_id.

    The "GET_jobs_id_resources" request is sent through the OAR parser
    with job_id and username. Its answer is handled as a list of OAR
    node ids and converted by __get_hostnames_from_oar_node_ids.

    Returns a dictionary whose 'node_ids' key holds the result of this
    conversion (list of dictionaries with the 'hostname' and 'site_id'
    of every node).

    """
    req = "GET_jobs_id_resources"
    #Get job resources list from OAR
    node_id_list = self.oar.parser.SendRequest(req, job_id, username)
    logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list,))

    # NOTE(review): the assignment of hostname_list and the return
    # statement are not visible in the reviewed text - confirm.
    hostname_list = \
        self.__get_hostnames_from_oar_node_ids(node_id_list)
    return {'node_ids': hostname_list}
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """ Gets the hostnames of the nodes listed in a job description.

    job_info : dictionary describing a job.
    node_list_name : key of job_info holding the list of the hostnames
    of the nodes of the job.

    Every hostname is looked for in the full list of the testbed nodes
    (GetNodes). If node_list_name is not in job_info or if a hostname is
    unknown, the KeyError is logged and the hostnames collected so far
    are returned.

    Returns the list of the hostnames found.

    """
    #Get the list of the testbed nodes records and make a
    #dictionary keyed on the hostname out of it
    node_dict = dict((node['hostname'], node) for node in self.GetNodes())

    #The list is filled with append: assigning to an index of an empty
    #list raises an IndexError.
    reserved_node_hostname_list = []
    try:
        for hostname in job_info[node_list_name]:
            reserved_node_hostname_list.append(
                node_dict[hostname]['hostname'])
        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes "
                     "reserved_node_hostname_list %s" \
                     %(reserved_node_hostname_list,))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )

    return reserved_node_hostname_list
def GetNodesCurrentlyInUse(self):
    """ Returns a list of all the nodes already involved in an oar job.

    The list is the answer to the "GET_running_jobs" request sent through
    the OAR parser, returned as is.

    """
    running_jobs_nodes = self.oar.parser.SendRequest("GET_running_jobs")
    return running_jobs_nodes
def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
    """ Converts a list of OAR node ids into hostnames and sites.

    resource_id_list : list of values matching the 'oar_id' of the nodes
    returned by GetNodes.

    Returns a list of dictionaries, one per id and in the same order,
    with the keys 'hostname' and 'site_id' (the latter is the 'site'
    value of the node record).
    A KeyError is raised if an id does not match any node.

    """
    full_nodes_dict_list = self.GetNodes()
    #Put the full node list into a dictionary keyed by oar node id
    oar_id_node_dict = dict((node['oar_id'], node) \
                            for node in full_nodes_dict_list)
    logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids "
                 "oar_id_node_dict %s" %(oar_id_node_dict,))

    return [{'hostname': oar_id_node_dict[resource_id]['hostname'],
             'site_id': oar_id_node_dict[resource_id]['site']} \
            for resource_id in resource_id_list]
def GetReservedNodes(self):
    """ Gets the reservations known by OAR (nodes in use and reserved
    nodes).

    The answer to the "GET_reserved_nodes" request is handled as a list
    of dictionaries. A 'reserved_nodes' key is added to each of them: the
    list of the hostnames and sites matching the OAR ids found under its
    'resource_ids' key. The 'resource_ids' key is kept.

    Returns the updated list of reservations.

    """
    reservation_dict_list = \
        self.oar.parser.SendRequest("GET_reserved_nodes")

    for resa in reservation_dict_list:
        logger.debug ("GetReservedNodes resa %s"%(resa,))
        #dict list of hostnames and their site
        resa['reserved_nodes'] = \
            self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])

    return reservation_dict_list
def GetNodes(self, node_filter_dict = None, return_fields_list = None):
    """ Gets the records of the testbed nodes from OAR.

    node_filter_dict : dictionary of lists. Each key is the name of a
    field of the node records, each value is the list of the accepted
    values for this field. A node is added to the result once for every
    (key, value) pair it matches.
    return_fields_list : list of the fields to keep in the returned
    records. The complete records are returned when it is not set.

    Returns the list of all the node records ("GET_resources_full"
    request) when there is no filter and no fields list, the list of the
    selected nodes otherwise.
    Returns None if a filter key or a requested field is missing from a
    node record, the KeyError is logged.

    """
    node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
    #Always a real list, values() is only a view on Python 3
    node_dict_list = list(node_dict_by_id.values())

    #No filtering needed return the list directly
    if not (node_filter_dict or return_fields_list):
        return node_dict_list

    def keep_requested_fields(node):
        """ Restricts a node record to return_fields_list, if any. """
        if not return_fields_list:
            return node
        return dict((field, node[field]) for field in return_fields_list)

    try:
        if not node_filter_dict:
            #Only a fields list was given: it used to be ignored and an
            #empty list was returned. Apply it to every node.
            return [keep_requested_fields(node) for node in node_dict_list]

        return_node_list = []
        for filter_key in node_filter_dict:
            #Filter the node_dict_list by each value contained in the
            #list node_filter_dict[filter_key]
            for value in node_filter_dict[filter_key]:
                for node in node_dict_list:
                    if node[filter_key] == value:
                        return_node_list.append(
                            keep_requested_fields(node))
    except KeyError:
        logger.log_exc("GetNodes KeyError")
        # NOTE(review): what follows the log call is not visible in the
        # reviewed text, giving up and returning None is assumed - confirm.
        return None

    return return_node_list
def GetSites(self, site_filter_name_list = None, return_fields_list = None):
    """ Gets the records of the testbed sites from OAR.

    site_filter_name_list : list of the names of the wanted sites. Names
    which do not match any site are ignored. All the sites are selected
    when it is not set.
    return_fields_list : list of the fields to keep in the returned
    records. The complete records are returned when it is not set.

    Returns the list of the selected site records.
    Returns None if a requested field is missing from a site record, the
    KeyError is logged.

    """
    #site_dict : dict where the key is the site name
    site_dict = self.oar.parser.SendRequest("GET_sites")

    if not (site_filter_name_list or return_fields_list):
        return list(site_dict.values())

    if site_filter_name_list:
        site_names = [site_name for site_name in site_filter_name_list \
                      if site_name in site_dict]
    else:
        #Only a fields list was given, iterating over the None filter
        #would raise a TypeError: select every site instead.
        site_names = list(site_dict)

    return_site_list = []
    for site_name in site_names:
        site = site_dict[site_name]
        if not return_fields_list:
            return_site_list.append(site)
            continue
        #One dictionary per site, holding all the requested fields
        tmp = {}
        for field in return_fields_list:
            try:
                tmp[field] = site[field]
            except KeyError:
                logger.error("GetSites KeyError %s "%(field))
                # NOTE(review): what follows the log call is not visible
                # in the reviewed text, returning None is assumed - confirm.
                return None
        return_site_list.append(tmp)

    return return_site_list
#Warning: the return_fields_list parameter was removed (not used)
772 def GetSlices(self, slice_filter = None, slice_filter_type = None):
773 #def GetSlices(self, slice_filter = None, slice_filter_type = None, \
774 #return_fields_list = None):
775 """ Get the slice records from the slab db.
776 Returns a slice ditc if slice_filter and slice_filter_type
778 Returns a list of slice dictionnaries if there are no filters
782 return_slice_list = []
785 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
786 logger.debug("SLABDRIVER \tGetSlices authorized_filter_types_list %s"\
787 %(authorized_filter_types_list))
788 if slice_filter_type in authorized_filter_types_list:
789 if slice_filter_type == 'slice_hrn':
790 slicerec = slab_dbsession.query(SliceSenslab).\
791 filter_by(slice_hrn = slice_filter).first()
793 if slice_filter_type == 'record_id_user':
794 slicerec = slab_dbsession.query(SliceSenslab).\
795 filter_by(record_id_user = slice_filter).first()
799 slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
800 logger.debug("SLABDRIVER \tGetSlices slicerec_dict %s" \
803 login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
804 logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
806 %(login, slicerec_dict))
807 if slicerec_dict['oar_job_id'] is not -1:
808 #Check with OAR the status of the job if a job id is in
810 rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \
814 slicerec_dict.update(rslt)
815 slicerec_dict.update({'hrn':\
816 str(slicerec_dict['slice_hrn'])})
817 #If GetJobsResources is empty, this means the job is
818 #now in the 'Terminated' state
819 #Update the slice record
821 self.db.update_job(slice_filter, job_id = -1)
822 slicerec_dict['oar_job_id'] = -1
824 update({'hrn':str(slicerec_dict['slice_hrn'])})
827 slicerec_dict['node_ids'] = slicerec_dict['node_list']
831 logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s"\
838 return_slice_list = slab_dbsession.query(SliceSenslab).all()
840 logger.debug("SLABDRIVER.PY \tGetSlices slices %s \
841 slice_filter %s " %(return_slice_list, slice_filter))
843 #if return_fields_list:
844 #return_slice_list = parse_filter(sliceslist, \
845 #slice_filter,'slice', return_fields_list)
847 return return_slice_list
853 def testbed_name (self): return self.hrn
855 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """ Describes the RSpec versions supported by this aggregate.

    The versions known by the VersionManager are sorted according to
    their content_type: 'ad' versions are advertisement versions,
    'request' versions are request versions and '*' versions belong to
    both lists.

    Returns a dictionary with the keys 'testbed' (see testbed_name),
    'geni_request_rspec_versions' and 'geni_ad_rspec_versions'. The last
    two are mandatory and hold the lists of the versions in dict form
    (to_dict).

    """
    version_manager = VersionManager()
    ad_rspec_versions = []
    request_rspec_versions = []
    for rspec_version in version_manager.versions:
        content_type = rspec_version.content_type
        if content_type in ['*', 'ad']:
            ad_rspec_versions.append(rspec_version.to_dict())
        if content_type in ['*', 'request']:
            request_rspec_versions.append(rspec_version.to_dict())
    return {
        'testbed': self.testbed_name(),
        'geni_request_rspec_versions': request_rspec_versions,
        'geni_ad_rspec_versions': ad_rspec_versions,
    }
877 # Convert SFA fields to PLC fields for use when registering up updating
878 # registry record in the PLC database
880 # @param type type of record (user, slice, ...)
881 # @param hrn human readable name
882 # @param sfa_fields dictionary of SFA fields
883 # @param slab_fields dictionary of PLC fields (output)
885 def sfa_fields_to_slab_fields(self, sfa_type, hrn, record):
887 def convert_ints(tmpdict, int_fields):
888 for field in int_fields:
890 tmpdict[field] = int(tmpdict[field])
893 #for field in record:
894 # slab_record[field] = record[field]
896 if sfa_type == "slice":
897 #instantion used in get_slivers ?
898 if not "instantiation" in slab_record:
899 slab_record["instantiation"] = "senslab-instantiated"
900 slab_record["hrn"] = hrn_to_pl_slicename(hrn)
901 logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
902 slab_record %s hrn_to_pl_slicename(hrn) hrn %s " \
903 %(slab_record['hrn'], hrn))
905 slab_record["url"] = record["url"]
906 if "description" in record:
907 slab_record["description"] = record["description"]
908 if "expires" in record:
909 slab_record["expires"] = int(record["expires"])
911 #nodes added by OAR only and then imported to SFA
912 #elif type == "node":
913 #if not "hostname" in slab_record:
914 #if not "hostname" in record:
915 #raise MissingSfaInfo("hostname")
916 #slab_record["hostname"] = record["hostname"]
917 #if not "model" in slab_record:
918 #slab_record["model"] = "geni"
921 #elif type == "authority":
922 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
924 #if not "name" in slab_record:
925 #slab_record["name"] = hrn
927 #if not "abbreviated_name" in slab_record:
928 #slab_record["abbreviated_name"] = hrn
930 #if not "enabled" in slab_record:
931 #slab_record["enabled"] = True
933 #if not "is_public" in slab_record:
934 #slab_record["is_public"] = True
def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
    """ Transforms unix timestamp into valid OAR date format.

    xp_utc_timestamp : unix timestamp of the beginning of the experiment
    (number or string of digits). It is only set in case of a scheduled
    experiment: to run an XP immediately, date and time are not specified
    in the RSpec and the timestamp is None.

    Returns the date formatted with self.time_format, None if no
    timestamp is given. datetime.fromtimestamp is called without any
    timezone, so the date is expressed in the local time of the machine
    running this code.

    """
    # NOTE(review): the test on the timestamp is not visible in the
    # reviewed text, it is assumed from the None default - confirm.
    if not xp_utc_timestamp:
        return None

    #transform the xp_utc_timestamp into server readable time
    xp_datetime = datetime.fromtimestamp(int(xp_utc_timestamp))
    return xp_datetime.strftime(self.time_format)
957 def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
958 """ Creates the structure needed for a correct POST on OAR.
959 Makes the timestamp transformation into the appropriate format.
960 Sends the POST request to create the job with the resources in
968 slice_name = slice_dict['name']
970 slot = slice_dict['timeslot']
971 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
974 #Running on default parameters
975 #XP immediate , 10 mins
976 slot = { 'date':None, 'start_time':None,
977 'timezone':None, 'duration':None }#10 min
979 reqdict['workdir'] = '/tmp'
980 reqdict['resource'] = "{network_address in ("
982 for node in added_nodes:
983 logger.debug("OARrestapi \tLaunchExperimentOnOAR \
986 #Get the ID of the node : remove the root auth and put
987 # the site in a separate list.
988 # NT: it's not clear for me if the nodenames will have the senslab
989 #prefix so lets take the last part only, for now.
991 # Again here it's not clear if nodes will be prefixed with <site>_,
992 #lets split and tanke the last part for now.
993 #s=lastpart.split("_")
996 reqdict['resource'] += "'" + nodeid + "', "
997 nodeid_list.append(nodeid)
999 custom_length = len(reqdict['resource'])- 2
1000 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
1001 ")}/nodes=" + str(len(nodeid_list))
1003 #if slot['duration']:
1004 walltime, sleep_walltime = __process_walltime(duration = \
1007 #walltime, sleep_walltime = self.__process_walltime(duration = None)
1009 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
1010 ":" + str(walltime[1]) + ":" + str(walltime[2])
1011 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
1015 #In case of a scheduled experiment (not immediate)
1016 #To run an XP immediately, don't specify date and time in RSpec
1017 #They will be set to None.
1018 server_timestamp, server_tz = self.GetTimezone()
1019 if slot['date'] and slot['start_time']:
1020 if slot['timezone'] is '' or slot['timezone'] is None:
1021 #assume it is server timezone
1022 from_zone = tz.gettz(server_tz)
1023 logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
1024 not specified server_tz %s from_zone %s" \
1025 %(server_tz, from_zone))
1027 #Get zone of the user from the reservation time given
1029 from_zone = tz.gettz(slot['timezone'])
1031 date = str(slot['date']) + " " + str(slot['start_time'])
1032 user_datetime = datetime.strptime(date, self.time_format)
1033 user_datetime = user_datetime.replace(tzinfo = from_zone)
1035 #Convert to server zone
1037 to_zone = tz.gettz(server_tz)
1038 reservation_date = user_datetime.astimezone(to_zone)
1039 #Readable time accpeted by OAR
1040 reqdict['reservation'] = reservation_date.strftime(self.time_format)
1042 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR \
1043 reqdict['reservation'] %s " %(reqdict['reservation']))
1046 # Immediate XP. Not need to add special parameters.
1047 # normally not used in SFA
1052 reqdict['type'] = "deploy"
1053 reqdict['directory'] = ""
1054 reqdict['name'] = "TestSandrine"
1057 # first step : start the OAR job and update the job
1058 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
1059 \r\n site_list %s" %(reqdict, site_list))
1061 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
1062 reqdict, slice_user)
1063 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
1065 jobid = answer['id']
1067 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
1068 Impossible to create job %s " %(answer))
1071 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
1072 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
1073 self.db.update_job( slice_name, jobid, added_nodes)
1076 # second step : configure the experiment
1077 # we need to store the nodes in a yaml (well...) file like this :
1078 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
1079 job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
1081 job_file.write(str(added_nodes[0].strip('node')))
1082 for node in added_nodes[1:len(added_nodes)] :
1083 job_file.write(', '+ node.strip('node'))
1087 # third step : call the senslab-experiment wrapper
1088 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
1089 # "+str(jobid)+" "+slice_user
1090 javacmdline = "/usr/bin/java"
1092 "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
1093 #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
1094 #str(jobid), slice_user])
1095 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
1096 slice_user],stdout=subprocess.PIPE).communicate()[0]
1098 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR wrapper returns%s " \
1103 #Delete the jobs and updates the job id in the senslab table
1105 #Does not clear the node list
def DeleteSliceFromNodes(self, slice_record):
    """Delete the slice's OAR job and reset the job id stored for it.

    DeleteJobs is called with the record's 'oar_job_id' and 'hrn', then
    self.db.update_job stores -1 as the job id of that hrn. No node list
    is handed to update_job here.

    :param slice_record: mapping providing the 'oar_job_id' and 'hrn' keys.
    """
    oar_job_id = slice_record['oar_job_id']
    slice_hrn = slice_record['hrn']

    self.DeleteJobs(oar_job_id, slice_hrn)
    # -1 is the value stored once the slice has no job left.
    self.db.update_job(slice_hrn, job_id=-1)
def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
    """List the reservations, each tagged with its slice and node urns.

    Every reservation returned by GetReservedNodes is completed in place
    with two keys:
      - 'slice_id': urn of the senslab slice whose record_id_user is the
        registry user having the mail address found in LDAP for the
        reservation's 'user' uid;
      - 'component_id_list': urns built from the 'hostname' of every
        entry of 'reserved_nodes'.

    :param lease_filter_dict: optional dict; when it is truthy only the
        reservations whose 'slice_id' equals lease_filter_dict['name']
        are returned, otherwise all of them are.
    :param return_fields_list: accepted but not used by this method.
    :returns: list of reservation dicts.
    """
    all_leases = self.GetReservedNodes()
    logger.debug(" SLABDRIVER.PY \tGetLeases ")

    for lease in all_leases:
        # Find the slice associated with this user senslab ldap uid.
        ldap_filter = '(uid=' + lease['user'] + ')'
        ldap_attributes = self.ldap.LdapSearch(ldap_filter)[0][1]

        reg_user = dbsession.query(RegUser).filter_by(
            email=ldap_attributes['mail'][0]).first()
        slab_slice = slab_dbsession.query(SliceSenslab).filter_by(
            record_id_user=reg_user.record_id).first()

        lease['slice_id'] = hrn_to_urn(slab_slice.slice_hrn, 'slice')
        # Transform the hostnames into urns (component ids).
        lease['component_id_list'] = [
            hostname_to_urn(self.hrn, self.root_auth, node['hostname'])
            for node in lease['reserved_nodes']]

    if not lease_filter_dict:
        selected_leases = all_leases
    else:
        # Keep only the leases associated with the requested slice.
        logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"
                     % (lease_filter_dict))
        selected_leases = [
            lease for lease in all_leases
            if lease_filter_dict['name'] == lease['slice_id']]

    logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"
                 % (selected_leases))
    return selected_leases
def augment_records_with_testbed_info (self, sfa_records):
    """Delegate to fill_record_info and hand back whatever it returns.

    :param sfa_records: passed unchanged to fill_record_info.
    """
    augmented = self.fill_record_info(sfa_records)
    return augmented
def fill_record_info(self, record_list):
    """
    Given a SFA record, fill in the senslab specific and SFA specific
    fields in the record.

    A single record is first wrapped into a one-element list. Each record
    is then completed in place according to its 'type':
      - 'slice': the senslab slice is looked up by hrn and the record
        receives the owner's hrn as 'PI' and 'researcher', plus 'name',
        'oar_job_id', 'person_ids' and empty client_helper.py
        compatibility fields;
      - 'user': the senslab slice owned by the user is looked up, turned
        into a slice record and appended to record_list, and the user
        record is updated with the first entry returned by GetPersons.

    A TypeError raised while processing is logged and stops the
    processing of the remaining records (presumably this covers a slice
    lookup that returns nothing -- confirm against GetSlices).

    NOTE(review): the extract this block was rebuilt from lacked several
    short lines. The 'try:' line, the 'node_ids' entries, the '.first()'
    call of the user branch, the 'keys'/'key_ids' entries and the final
    return statement are reconstructed; confirm them against the
    original file before merging.

    :param record_list: a record dict or a list of record dicts.
    :returns: the list of processed records (see the note above).
    """
    logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
    if not isinstance(record_list, list):
        record_list = [record_list]

    try:
        # record_list may grow while it is walked: the slice record
        # appended by the 'user' branch is itself visited by a later
        # iteration, which fills in its PI/researcher fields.
        for record in record_list:
            record_type = str(record['type'])

            if record_type == 'slice':
                # The record is a SFA slice record: add information
                # about the user of this slice, taken from Senslab's DB.
                recslice = self.GetSlices(
                    slice_filter=str(record['hrn']),
                    slice_filter_type='slice_hrn')
                recuser = dbsession.query(RegRecord).filter_by(
                    record_id=recslice['record_id_user']).first()
                logger.debug("SLABDRIVER.PY \t fill_record_info SLICE "
                             "rec %s \r\n \r\n" %(recslice))
                record.update({
                    'PI': [recuser.hrn],
                    'researcher': [recuser.hrn],
                    'name': record['hrn'],
                    'oar_job_id': recslice['oar_job_id'],
                    'node_ids': [],  # NOTE(review): reconstructed entry
                    'person_ids': [recslice['record_id_user']],
                    # The three fields below exist for client_helper.py
                    # compatibility.
                    'geni_urn': '',
                    'keys': '',
                    'key_ids': ''})

            elif record_type == 'user':
                # The record is a SFA user record: get the information
                # about his slice from Senslab's DB and add it.
                recslice = self.GetSlices(
                    slice_filter=record['record_id'],
                    slice_filter_type='record_id_user')
                logger.debug("SLABDRIVER.PY \t fill_record_info user "
                             "rec %s \r\n \r\n" %(recslice))

                recuser = dbsession.query(RegRecord).filter_by(
                    record_id=recslice['record_id_user']).first()
                recslice.update({
                    'PI': [recuser.hrn],
                    'researcher': [recuser.hrn],
                    'name': record['hrn'],
                    'oar_job_id': recslice['oar_job_id'],
                    'node_ids': [],  # NOTE(review): reconstructed entry
                    'person_ids': [recslice['record_id_user']]})

                # GetPersons takes [] as filters
                user_slab = self.GetPersons([record])

                recslice.update({'type': 'slice',
                                 'hrn': recslice['slice_hrn']})
                record.update(user_slab[0])
                # For client_helper.py compatibility
                record.update({'geni_urn': '',
                               'keys': '',
                               'key_ids': ''})

                # Appending makes the loop come back to this slice record.
                record_list.append(recslice)
                logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE "
                             "INFO TO USER records %s" %(record_list))

    except TypeError as error:
        logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"
                       %(error))

    # NOTE(review): the original return statement was not visible; the
    # list is handed back because augment_records_with_testbed_info
    # returns this method's result.
    return record_list
1237 #self.fill_record_slab_info(records)
1243 #TODO Update membership? update_membership_list SA 05/07/12
1244 #def update_membership_list(self, oldRecord, record, listName, addFunc, \
1246 ## get a list of the HRNs tht are members of the old and new records
1248 #oldList = oldRecord.get(listName, [])
1251 #newList = record.get(listName, [])
1253 ## if the lists are the same, then we don't have to update anything
1254 #if (oldList == newList):
1257 ## build a list of the new person ids, by looking up each person to get
1261 #records = table.find({'type': 'user', 'hrn': newList})
1262 #for rec in records:
1263 #newIdList.append(rec['pointer'])
1265 ## build a list of the old person ids from the person_ids field
1267 #oldIdList = oldRecord.get("person_ids", [])
1268 #containerId = oldRecord.get_pointer()
1270 ## if oldRecord==None, then we are doing a Register, instead of an
1273 #containerId = record.get_pointer()
1275 ## add people who are in the new list, but not the oldList
1276 #for personId in newIdList:
1277 #if not (personId in oldIdList):
1278 #addFunc(self.plauth, personId, containerId)
1280 ## remove people who are in the old list, but not the new list
1281 #for personId in oldIdList:
1282 #if not (personId in newIdList):
1283 #delFunc(self.plauth, personId, containerId)
1285 #def update_membership(self, oldRecord, record):
1287 #if record.type == "slice":
1288 #self.update_membership_list(oldRecord, record, 'researcher',
1289 #self.users.AddPersonToSlice,
1290 #self.users.DeletePersonFromSlice)
1291 #elif record.type == "authority":
1296 # I don't think you plan on running a component manager at this point
1297 # let me clean up the mess of ComponentAPI that is deprecated anyways
1300 #TODO FUNCTIONS SECTION 04/07/2012 SA
1302 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
def UnBindObjectFromPeer(self, auth, object_type, object_id, shortname):
    """Placeholder for the PLC-style UnBindObjectFromPeer call.

    In the PLC API this is a (hopefully temporary) hack letting the sfa
    detach the objects it creates from a remote peer object, so that the
    sfa federation link can work in parallel with RefreshPeer.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.

    :param auth: struct, API authentication structure.
    :param object_type: string, object type ('site', 'person', 'slice'...).
    :param object_id: int, object_id.
    :param shortname: string, peer shortname.
    """
    # The message is kept on one literal: continuing a string with a
    # backslash inside the quotes embeds the next line's indentation in
    # the logged text.
    logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY - "
                   "DO NOTHING \r\n ")
1324 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
def BindObjectToPeer(self, auth, object_type, object_id, shortname=None,
                     remote_object_id=None):
    """Placeholder for the PLC-style BindObjectToPeer call.

    In the PLC API this is a (hopefully temporary) hack letting the sfa
    attach the objects it creates to a remote peer object, so that the
    sfa federation link can work in parallel with RefreshPeer, which
    depends on remote objects being correctly marked.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.

    :param shortname: string, peer shortname.
    :param remote_object_id: int, remote object_id, 0 if unknown.
    """
    message = "SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1341 #TODO UpdateSlice 04/07/2012 SA
1342 #Function should delete and create another job since in senslab slice=job
def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
    """Placeholder for the PLC-style UpdateSlice call.

    PLC API contract, quoted for reference: updates the parameters of an
    existing slice. Users may only update slices of which they are
    members; PIs may update any slice at their sites or of which they are
    members; admins may update any slice. Only PIs and admins may update
    max_nodes, and a slice cannot be renewed more than 8 weeks into the
    future. Returns 1 if successful, faults otherwise.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1358 #TODO UpdatePerson 04/07/2012 SA
def UpdatePerson(self, auth, person_id_or_email, person_fields=None):
    """Placeholder for the PLC-style UpdatePerson call.

    PLC API contract, quoted for reference: only the fields given in
    person_fields are updated, every other field is left untouched.
    Users and techs can only update themselves; PIs can only update
    themselves and other non-PIs at their sites. Returns 1 if
    successful, faults otherwise.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1371 #TODO GetKeys 04/07/2012 SA
def GetKeys(self, auth, key_filter=None, return_fields=None):
    """Placeholder for the PLC-style GetKeys call.

    PLC API contract, quoted for reference: returns an array of structs
    describing keys. When key_filter is an array of key identifiers or a
    struct of key attributes, only matching keys are returned; when
    return_fields is given, only those details are returned. Admins may
    query all keys, non-admins only their own.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1386 #TODO DeleteKey 04/07/2012 SA
def DeleteKey(self, auth, key_id):
    """Placeholder for the PLC-style DeleteKey call.

    PLC API contract, quoted for reference: non-admins may only delete
    their own keys. Returns 1 if successful, faults otherwise.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n "
    logger.warning(message)
def DeletePerson(self, auth, person_record):
    """Disable an existing account in senslab LDAP.

    PLC API contract, quoted for reference: users and techs can only
    delete themselves, PIs can only delete themselves and other non-PIs
    at their sites, admins can delete anyone. None of these permission
    rules is checked here and auth is not used.

    :param auth: unused.
    :param person_record: passed unchanged to
        self.ldap.LdapMarkUserAsDeleted.
    :returns: whatever LdapMarkUserAsDeleted returned.
        NOTE(review): the original return statement was not visible in
        the reviewed extract; confirm no caller relies on None.
    """
    # Disable user account in senslab LDAP
    ret = self.ldap.LdapMarkUserAsDeleted(person_record)
    # The former message claimed "EMPTY - DO NOTHING" although the LDAP
    # call above is performed; report the outcome of that call instead.
    logger.warning("SLABDRIVER DeletePerson LdapMarkUserAsDeleted "
                   "returned %s \r\n " % (ret,))
    return ret
1413 #TODO DeleteSlice 04/07/2012 SA
def DeleteSlice(self, auth, slice_id_or_name):
    """Placeholder for the PLC-style DeleteSlice call.

    PLC API contract, quoted for reference: deletes the specified slice.
    Users may only delete slices of which they are members; PIs may
    delete any slice at their sites or of which they are members; admins
    may delete any slice. Returns 1 if successful, faults otherwise.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER DeleteSlice EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1426 #TODO AddPerson 04/07/2012 SA
def AddPerson(self, auth, person_fields=None):
    """Placeholder for the PLC-style AddPerson call.

    PLC API contract, quoted for reference: adds a new account using the
    fields given in person_fields and defaults for the others. Accounts
    are disabled by default. Returns the new person_id (> 0) if
    successful, faults otherwise.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER AddPerson EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1439 #TODO AddPersonToSite 04/07/2012 SA
def AddPersonToSite (self, auth, person_id_or_email,
                     site_id_or_login_base=None):
    """Placeholder for the PLC-style AddPersonToSite call.

    PLC API contract, quoted for reference: adds the specified person to
    the specified site; no error is returned when the person is already
    a member, and the person's primary site is not changed. Returns 1 if
    successful, faults otherwise.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1452 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
    """Placeholder for the PLC-style AddRoleToPerson call.

    PLC API contract, quoted for reference: grants the specified role to
    the person. PIs can only grant the tech and user roles to users and
    techs at their sites; admins can grant any role to any user.
    Returns 1 if successful, faults otherwise.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n "
    logger.warning(message)
1464 #TODO AddPersonKey 04/07/2012 SA
def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
    """Placeholder for the PLC-style AddPersonKey call.

    PLC API contract, quoted for reference: adds a new key to the
    specified account. Non-admins can only modify their own keys.
    Returns the new key_id (> 0) if successful, faults otherwise.

    Not implemented for senslab: the method only logs a warning and none
    of its arguments is used.
    """
    message = "SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n "
    logger.warning(message)