1 from datetime import datetime
3 from sfa.util.sfalogging import logger
5 from sfa.storage.alchemy import dbsession
6 from sqlalchemy.orm import joinedload
7 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
8 from sfa.senslab.slabpostgres import slab_dbsession, SenslabXP
10 from sfa.senslab.OARrestapi import OARrestapi
11 from sfa.senslab.LDAPapi import LDAPapi
13 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
15 from sfa.trust.certificate import Keypair, convert_public_key
16 from sfa.trust.gid import create_uuid
17 from sfa.trust.hierarchy import Hierarchy
19 from sfa.senslab.slabaggregate import slab_xrn_object
21 class SlabTestbedAPI():
22 """ Class enabled to use LDAP and OAR api calls. """
24 def __init__(self, config):
25 """Creates an instance of OARrestapi and LDAPapi which will be used to
26 issue calls to OAR or LDAP methods.
27 Set the time format and the testbed granularity used for OAR
28 reservation and leases.
30 :param config: configuration object from sfa.util.config
31 :type config: Config object
33 self.oar = OARrestapi()
35 self.time_format = "%Y-%m-%d %H:%M:%S"
36 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
37 self.grain = 1 # 10 mins lease minimum
38 #import logging, logging.handlers
39 #from sfa.util.sfalogging import _SfaLogger
40 #sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', \
46 def GetMinExperimentDurationInSec():
50 def GetPeers (peer_filter=None ):
51 """ Gathers registered authorities in SFA DB and looks for specific peer
52 if peer_filter is specified.
53 :param peer_filter: name of the site authority looked for.
54 :type peer_filter: string
55 :return: list of records.
60 existing_hrns_by_types = {}
61 logger.debug("SLABDRIVER \tGetPeers peer_filter %s, \
63 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
65 for record in all_records:
66 existing_records[(record.hrn, record.type)] = record
67 if record.type not in existing_hrns_by_types:
68 existing_hrns_by_types[record.type] = [record.hrn]
70 existing_hrns_by_types[record.type].append(record.hrn)
73 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
74 %( existing_hrns_by_types))
79 records_list.append(existing_records[(peer_filter,'authority')])
81 for hrn in existing_hrns_by_types['authority']:
82 records_list.append(existing_records[(hrn,'authority')])
84 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
90 return_records = records_list
91 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
97 #TODO : Handling OR request in make_ldap_filters_from_records
98 #instead of the for loop
99 #over the records' list
def GetPersons(self, person_filter=None):
    """
    Get the enabled users and their properties from Senslab LDAP.

    If a filter is specified, looks up each filter entry with
    ``self.ldap.LdapFindUser`` (enabled accounts only) and collects the
    entries for which something was found. Otherwise returns whatever
    ``LdapFindUser`` yields for all enabled accounts.

    :param person_filter: list of dictionaries with user properties, or
        None to get every enabled user.
    :type person_filter: list of dict
    :return: the matching users, or None when a filter was given and
        nobody matched it.
    :rtype: list of dicts or None
    """
    logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
                                                %(person_filter))

    if not (person_filter and isinstance(person_filter, list)):
        # No usable filter: ask LDAP for every enabled account.
        return self.ldap.LdapFindUser(is_user_enabled=True)

    # Usually the list contains only one user record.
    person_list = []
    for searched_attributes in person_filter:
        # Restrict the search to enabled user accounts.
        person = self.ldap.LdapFindUser(searched_attributes, \
                                        is_user_enabled=True)
        if person:
            person_list.append(person)

    # Emptiness is tested by truthiness; the previous "len(...) is 0"
    # compared an int by identity, which only works by accident.
    if not person_list:
        return None
    return person_list
141 #def GetTimezone(self):
142 #""" Returns the OAR server time and timezone.
143 #Unused SA 30/05/13"""
144 #server_timestamp, server_tz = self.oar.parser.\
145 #SendRequest("GET_timezone")
146 #return server_timestamp, server_tz
def DeleteJobs(self, job_id, username):
    """ Deletes the job with the specified job_id and username on OAR by
    posting a delete request to OAR.

    :param job_id: job id in OAR.
    :param username: user's senslab login in LDAP.
    :type username: string

    :return: dictionary keyed on the job id whose value is True when OAR
        answered 'Delete request registered' and False otherwise. Returns
        None without contacting OAR when job_id is empty or -1.
    :rtype: dict or None
    """
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s username %s "\
                            %(job_id, username))
    # Value comparison: "job_id is -1" relied on CPython's small-int cache.
    if not job_id or job_id == -1:
        return None

    reqdict = {'method': "delete", 'strval': str(job_id)}

    # NOTE(review): the request dict and the username are assumed to be the
    # remaining arguments of POSTRequestToOARRestAPI -- confirm against
    # OARrestapi.
    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
                                                reqdict, username)
    ret = {job_id: answer['status'] == 'Delete request registered'}
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
                            username %s" %(job_id, answer, username))
    return ret
184 ##TODO : Unused GetJobsId ? SA 05/07/12
185 #def GetJobsId(self, job_id, username = None ):
187 #Details about a specific job.
188 #Includes details about submission time, jot type, state, events,
189 #owner, assigned ressources, walltime etc...
193 #node_list_k = 'assigned_network_address'
194 ##Get job info from OAR
195 #job_info = self.oar.parser.SendRequest(req, job_id, username)
197 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
199 #if job_info['state'] == 'Terminated':
200 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
203 #if job_info['state'] == 'Error':
204 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
209 #logger.error("SLABDRIVER \tGetJobsId KeyError")
212 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
214 ##Replaces the previous entry
215 ##"assigned_network_address" / "reserved_resources"
217 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
218 #del job_info[node_list_k]
219 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
223 def GetJobsResources(self, job_id, username = None):
224 """ Gets the list of nodes associated with the job_id and username
226 Transforms the senslab hostnames to the corresponding
228 Rertuns dict key :'node_ids' , value : hostnames list
229 :param username: user's LDAP login
230 :paran job_id: job's OAR identifier.
231 :type username: string
232 :type job_id: integer
234 :return: dicionary with nodes' hostnames belonging to the job.
238 req = "GET_jobs_id_resources"
241 #Get job resources list from OAR
242 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
243 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
246 self.__get_hostnames_from_oar_node_ids(node_id_list)
249 #Replaces the previous entry "assigned_network_address" /
250 #"reserved_resources" with "node_ids"
251 job_info = {'node_ids': hostname_list}
256 #def get_info_on_reserved_nodes(self, job_info, node_list_name):
258 #..warning:unused SA 23/05/13
260 ##Get the list of the testbed nodes records and make a
261 ##dictionnary keyed on the hostname out of it
262 #node_list_dict = self.GetNodes()
263 ##node_hostname_list = []
264 #node_hostname_list = [node['hostname'] for node in node_list_dict]
265 ##for node in node_list_dict:
266 ##node_hostname_list.append(node['hostname'])
267 #node_dict = dict(zip(node_hostname_list, node_list_dict))
269 #reserved_node_hostname_list = []
270 #for index in range(len(job_info[node_list_name])):
271 ##job_info[node_list_name][k] =
272 #reserved_node_hostname_list[index] = \
273 #node_dict[job_info[node_list_name][index]]['hostname']
275 #logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
276 #reserved_node_hostname_list %s" \
277 #%(reserved_node_hostname_list))
279 #logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
281 #return reserved_node_hostname_list
283 def GetNodesCurrentlyInUse(self):
284 """Returns a list of all the nodes already involved in an oar running
286 :rtype: list of nodes hostnames.
288 return self.oar.parser.SendRequest("GET_running_jobs")
def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
    """Get the hostnames of the nodes from their OAR identifiers.

    Gets the list of node dicts using GetNodes and finds the hostname
    associated with each identifier. Identifiers equal to "Undefined"
    are skipped.

    :param resource_id_list: list of nodes identifiers
    :returns: list of node hostnames.
    :raises KeyError: if an identifier other than "Undefined" is not
        among the 'oar_id' values returned by GetNodes.
    """
    # Index the full node list by OAR node id for direct lookup.
    oar_id_node_dict = {}
    for node in self.GetNodes():
        oar_id_node_dict[node['oar_id']] = node

    hostname_list = []
    for resource_id in resource_id_list:
        # Jobs requested "asap" do not have defined resources yet.
        # Compare by value: an identity test against a string literal
        # depends on interning and lets equal strings slip through.
        if resource_id != "Undefined":
            hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
    return hostname_list
313 def GetReservedNodes(self, username = None):
314 """ Get list of leases. Get the leases for the username if specified,
315 otherwise get all the leases. Finds the nodes hostnames for each
317 :param username: user's LDAP login
318 :type username: string
319 :return: list of reservations dict
323 #Get the nodes in use and the reserved nodes
324 reservation_dict_list = \
325 self.oar.parser.SendRequest("GET_reserved_nodes", \
329 for resa in reservation_dict_list:
330 logger.debug ("GetReservedNodes resa %s"%(resa))
331 #dict list of hostnames and their site
332 resa['reserved_nodes'] = \
333 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
335 #del resa['resource_ids']
336 return reservation_dict_list
338 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
341 Make a list of senslab nodes and their properties from information
342 given by OAR. Search for specific nodes if some filters are specified.
343 Nodes properties returned if no return_fields_list given:
344 'hrn','archi','mobile','hostname','site','boot_state','node_id',
345 'radio','posx','posy','oar_id','posz'.
347 :param node_filter_dict: dictionnary of lists with node properties
348 :type node_filter_dict: dict
349 :param return_fields_list: list of specific fields the user wants to be
351 :type return_fields_list: list
352 :return: list of dictionaries with node properties
356 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
357 node_dict_list = node_dict_by_id.values()
358 logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \
359 return_fields_list %s "%(node_filter_dict, return_fields_list))
360 #No filtering needed return the list directly
361 if not (node_filter_dict or return_fields_list):
362 return node_dict_list
364 return_node_list = []
366 for filter_key in node_filter_dict:
368 #Filter the node_dict_list by each value contained in the
369 #list node_filter_dict[filter_key]
370 for value in node_filter_dict[filter_key]:
371 for node in node_dict_list:
372 if node[filter_key] == value:
373 if return_fields_list :
375 for k in return_fields_list:
377 return_node_list.append(tmp)
379 return_node_list.append(node)
381 logger.log_exc("GetNodes KeyError")
385 return return_node_list
390 def AddSlice(slice_record, user_record):
391 """Add slice to the local senslab sfa tables if the slice comes
392 from a federated site and is not yet in the senslab sfa DB,
393 although the user has already a LDAP login.
394 Called by verify_slice during lease/sliver creation.
395 :param slice_record: record of slice, must contain hrn, gid, slice_id
396 and authority of the slice.
397 :type slice_record: dictionary
398 :param user_record: record of the user
399 :type user_record: RegUser
402 sfa_record = RegSlice(hrn=slice_record['hrn'],
403 gid=slice_record['gid'],
404 pointer=slice_record['slice_id'],
405 authority=slice_record['authority'])
407 logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
408 %(sfa_record, user_record))
409 sfa_record.just_created()
410 dbsession.add(sfa_record)
412 #Update the reg-researcher dependance table
413 sfa_record.reg_researchers = [user_record]
418 #GetSites unused, SA 27/05/13
419 #def GetSites(self, site_filter_name_list = None, return_fields_list = None):
420 #site_dict = self.oar.parser.SendRequest("GET_sites")
421 ##site_dict : dict where the key is the sit ename
422 #return_site_list = []
423 #if not ( site_filter_name_list or return_fields_list):
424 #return_site_list = site_dict.values()
425 #return return_site_list
427 #for site_filter_name in site_filter_name_list:
428 #if site_filter_name in site_dict:
429 #if return_fields_list:
430 #for field in return_fields_list:
433 #tmp[field] = site_dict[site_filter_name][field]
435 #logger.error("GetSites KeyError %s "%(field))
437 #return_site_list.append(tmp)
439 #return_site_list.append( site_dict[site_filter_name])
442 #return return_site_list
448 #TODO : Check rights to delete person
449 def DeletePerson(self, person_record):
450 """ Disable an existing account in senslab LDAP.
451 Users and techs can only delete themselves. PIs can only
452 delete themselves and other non-PIs at their sites.
453 ins can delete anyone.
454 :param person_record: user's record
455 :type person_record: dict
456 :return: True if successful, False otherwise.
460 #Disable user account in senslab LDAP
461 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
462 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
def DeleteSlice(self, slice_record):
    """ Deletes the specified slice and kills the jobs associated with
    the slice if any, using DeleteSliceFromNodes.

    :param slice_record: record of the slice, passed unchanged to
        DeleteSliceFromNodes.
    :return: True if all the jobs in the slice have been deleted,
        or the list of jobs that could not be deleted otherwise.
    :rtype: list or boolean
    """
    ret = self.DeleteSliceFromNodes(slice_record)

    # DeleteSliceFromNodes yields either one {job_id: bool} dict or a list
    # of such dicts; flatten both shapes into a single mapping.
    if isinstance(ret, dict):
        status_by_job = ret
    else:
        status_by_job = {}
        for partial in ret or []:
            if partial:
                status_by_job.update(partial)

    # The per-job status is a plain boolean, so test it directly; the old
    # "False in ret[job_id]" raised TypeError on a bool.
    delete_failed = None
    for job_id in status_by_job:
        if not status_by_job[job_id]:
            if delete_failed is None:
                delete_failed = []
            delete_failed.append(job_id)

    logger.info("SLABDRIVER DeleteSlice %s answer %s"%(slice_record, \
                delete_failed))
    return delete_failed or True
488 def __add_person_to_db(user_dict):
490 Add a federated user straight to db when the user issues a lease
491 request with senslab nodes and that he has not registered with senslab
492 yet (that is he does not have a LDAP entry yet).
493 Uses parts of the routines in SlabImport when importing user from LDAP.
494 Called by AddPerson, right after LdapAddUser.
495 :param user_dict: Must contain email, hrn and pkey to get a GID
496 and be added to the SFA db.
497 :type user_dict: dict
501 dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
503 if not check_if_exists:
504 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
506 hrn = user_dict['hrn']
507 person_urn = hrn_to_urn(hrn, 'user')
508 pubkey = user_dict['pkey']
510 pkey = convert_public_key(pubkey)
512 #key not good. create another pkey
513 logger.warn('__add_person_to_db: unable to convert public \
515 pkey = Keypair(create=True)
518 if pubkey is not None and pkey is not None :
519 hierarchy = Hierarchy()
520 person_gid = hierarchy.create_gid(person_urn, create_uuid(), \
522 if user_dict['email']:
523 logger.debug("__add_person_to_db \r\n \r\n \
524 SLAB IMPORTER PERSON EMAIL OK email %s "\
525 %(user_dict['email']))
526 person_gid.set_email(user_dict['email'])
528 user_record = RegUser(hrn=hrn , pointer= '-1', \
529 authority=get_authority(hrn), \
530 email=user_dict['email'], gid = person_gid)
531 user_record.reg_keys = [RegKey(user_dict['pkey'])]
532 user_record.just_created()
533 dbsession.add (user_record)
538 def AddPerson(self, record):
539 """Adds a new account. Any fields specified in records are used,
540 otherwise defaults are used. Creates an appropriate login by calling
542 :param record: dictionary with the sfa user's properties.
543 :return: The uid of the added person if sucessful, otherwise returns
544 the error message from LDAP.
545 :rtype: interger or string
547 ret = self.ldap.LdapAddUser(record)
549 if ret['bool'] is True:
550 record['hrn'] = self.root_auth + '.' + ret['uid']
551 logger.debug("SLABDRIVER AddPerson return code %s record %s \r\n "\
553 self.__add_person_to_db(record)
556 return ret['message']
560 #TODO AddPersonKey 04/07/2012 SA
561 def AddPersonKey(self, person_uid, old_attributes_dict, new_key_dict):
562 """Adds a new key to the specified account. Adds the key to the
563 senslab ldap, provided that the person_uid is valid.
564 Non-admins can only modify their own keys.
566 :param person_uid: user's senslab login in LDAP
567 :param old_attributes_dict: dict with the user's old sshPublicKey
568 :param new_key_dict:dict with the user's new sshPublicKey
569 :type person_uid: string
573 :return: True if the key has been modified, False otherwise.
576 ret = self.ldap.LdapModify(person_uid, old_attributes_dict, \
578 logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
581 def DeleteLeases(self, leases_id_list, slice_hrn ):
583 Deletes several leases, based on their job ids and the slice
584 they are associated with. Uses DeleteJobs to delete the jobs
585 on OAR. Note that one slice can contain multiple jobs, and in this case
586 all the jobs in the leases_id_list MUST belong to ONE slice,
587 since there is only one slice hrn provided here.
588 :param leases_id_list: list of job ids that belong to the slice whose
589 slice hrn is provided.
590 :param slice_hrn: the slice hrn .
591 ..warning: Does not have a return value since there was no easy
592 way to handle failure when dealing with multiple job delete. Plus,
593 there was no easy way to report it to the user.
595 logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
596 \r\n " %(leases_id_list, slice_hrn))
597 for job_id in leases_id_list:
598 self.DeleteJobs(job_id, slice_hrn)
def _process_walltime(duration):
    """ Calculates the OAR walltime from the lease duration.

    The duration (in minutes) is converted to seconds; 240 seconds are
    added to the walltime to take in account prologue and epilogue
    scripts execution, while the sleep time is left at the requested
    duration.

    :param duration: lease duration in minutes, expected to be a
        non-zero integer.
    :return: tuple (walltime, sleep_walltime) where walltime is the list
        of strings [hours, minutes, seconds] and sleep_walltime is the
        requested duration in seconds.
    :raises ValueError: if duration is null (0 or None); previously this
        path hit the return statement with unassigned locals.
    """
    if not duration:
        logger.log_exc(" __process_walltime duration null")
        raise ValueError("lease duration must be a non-zero number of "
                         "minutes")

    # Put the duration in seconds first
    desired_walltime = duration * 60

    total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
    sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12

    # Split into hours / minutes / seconds with integer arithmetic so the
    # fields stay whole numbers whatever the interpreter's "/" semantics.
    hours, remainder = divmod(total_walltime, 3600)
    minutes, seconds = divmod(remainder, 60)

    #Put the walltime back in str form
    walltime = [str(hours), str(minutes), str(seconds)]

    return walltime, sleep_walltime
def _create_job_structure_request_for_OAR(lease_dict):
    """ Creates the structure needed for a correct POST on OAR.

    Builds the resource selection string from the lease nodes, appends
    the walltime computed by SlabTestbedAPI._process_walltime and, for a
    scheduled experiment, converts the start timestamp into the time
    format given in the lease.

    :param lease_dict: must contain 'added_nodes', 'lease_duration',
        'lease_start_time', 'time_format' and 'slice_user'.
    :type lease_dict: dict
    :return: the request dictionary with the keys 'workdir', 'resource',
        'script_path', 'type', 'directory', 'name' and, when a start
        time is set, 'reservation'.
    :rtype: dict
    """
    nodeid_list = []
    for node in lease_dict['added_nodes']:
        logger.debug("\r\n \r\n OARrestapi \t \
        __create_job_structure_request_for_OAR node %s" %(node))
        # NOTE(review): the node entry is assumed to be usable directly as
        # the OAR network_address -- confirm against the callers.
        nodeid_list.append(node)

    reqdict = {}
    reqdict['workdir'] = '/tmp'
    # Join the quoted ids rather than trimming a trailing ", " by hand,
    # which mangled the expression when the node list was empty.
    quoted_ids = ", ".join(["'" + nodeid + "'" for nodeid in nodeid_list])
    reqdict['resource'] = "{network_address in (" + quoted_ids + \
                            ")}/nodes=" + str(len(nodeid_list))

    walltime, sleep_walltime = \
                SlabTestbedAPI._process_walltime(\
                                    int(lease_dict['lease_duration']))

    reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
                        ":" + str(walltime[1]) + ":" + str(walltime[2])
    reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)

    #In case of a scheduled experiment (not immediate)
    #To run an XP immediately, don't specify date and time in RSpec
    #They will be set to None.
    # Compare by value and accept both the integer and the string form of
    # the timestamp; the identity test against '0' depended on interning.
    start_timestamp = lease_dict['lease_start_time']
    if start_timestamp is not None and str(start_timestamp) != '0':
        #Readable time accepted by OAR
        reqdict['reservation'] = datetime.fromtimestamp( \
                                    int(start_timestamp)).\
                                    strftime(lease_dict['time_format'])
    #If there is no start time, the XP is immediate and no reservation
    #entry is added.

    reqdict['type'] = "deploy"
    reqdict['directory'] = ""
    reqdict['name'] = "SFA_" + lease_dict['slice_user']

    return reqdict
694 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
695 lease_start_time, lease_duration, slice_user=None):
698 Create a job request structure based on the information provided
699 and post the job on OAR.
700 :param added_nodes: list of nodes that belong to the described lease.
701 :param slice_name: the slice hrn associated to the lease.
702 :param lease_start_time: timestamp of the lease startting time.
703 :param lease_duration: lease durationin minutes
707 lease_dict['lease_start_time'] = lease_start_time
708 lease_dict['lease_duration'] = lease_duration
709 lease_dict['added_nodes'] = added_nodes
710 lease_dict['slice_name'] = slice_name
711 lease_dict['slice_user'] = slice_user
712 lease_dict['grain'] = self.GetLeaseGranularity()
713 lease_dict['time_format'] = self.time_format
716 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
717 \r\n " %(slice_user))
718 #Create the request for OAR
719 reqdict = self._create_job_structure_request_for_OAR(lease_dict)
720 # first step : start the OAR job and update the job
721 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
724 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
726 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
730 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
731 Impossible to create job %s " %(answer))
738 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
739 added_nodes %s slice_user %s" %(jobid, added_nodes, \
746 def AddLeases(self, hostname_list, slice_record, \
747 lease_start_time, lease_duration):
749 """Creates a job in OAR corresponding to the information provided
750 as parameters. Adds the job id and the slice hrn in the senslab
751 database so that we are able to know which slice has which nodes.
753 :param hostname_list: list of nodes' OAR hostnames.
754 :param slice_record: sfa slice record, must contain login and hrn.
755 :param lease_start_time: starting time , unix timestamp format
756 :param lease_duration: duration in minutes
758 :type hostname_list: list
759 :type slice_record: dict
760 :type lease_start_time: integer
761 :type lease_duration: integer
764 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
765 slice_record %s lease_start_time %s lease_duration %s "\
766 %( hostname_list, slice_record , lease_start_time, \
769 #tmp = slice_record['reg-researchers'][0].split(".")
770 username = slice_record['login']
771 #username = tmp[(len(tmp)-1)]
772 job_id = self.LaunchExperimentOnOAR(hostname_list, \
773 slice_record['hrn'], \
774 lease_start_time, lease_duration, \
777 datetime.fromtimestamp(int(lease_start_time)).\
778 strftime(self.time_format)
779 end_time = lease_start_time + lease_duration
782 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL \
783 %s %s %s "%(slice_record['hrn'], job_id, end_time))
786 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " \
787 %(type(slice_record['hrn']), type(job_id), type(end_time)))
789 slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], \
790 job_id = job_id, end_time= end_time)
792 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" \
794 slab_dbsession.add(slab_ex_row)
795 slab_dbsession.commit()
797 logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " \
803 #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
    """ Deletes all the running or scheduled jobs of a given slice
    by calling DeleteJobs for each job id.

    :param slice_record: record of the slice; 'oar_job_id' holds either
        one job id or a list of job ids, 'user' the login handed to
        DeleteJobs.
    :type slice_record: dict

    :return: dict of the jobs'deletion status. Success= True, Failure=
        False, for each job id.
    :rtype: dict
    """
    logger.debug("SLABDRIVER \t DeleteSliceFromNodese %s " %(slice_record))

    job_ids = slice_record['oar_job_id']
    if not isinstance(job_ids, list):
        # Treat a single job like a one-element list so both cases return
        # the same dict shape, as documented.
        job_ids = [job_ids]

    oar_bool_answer = {}
    for job_id in job_ids:
        ret = self.DeleteJobs(job_id, slice_record['user'])
        # Guard against an empty result before merging, so update() is
        # never handed None.
        if ret:
            oar_bool_answer.update(ret)

    return oar_bool_answer
831 def GetLeaseGranularity(self):
832 """ Returns the granularity of an experiment in the Senslab testbed.
833 OAR uses seconds for experiments duration , the granulaity is also
835 Experiments which last less than 10 min (600 sec) are invalid"""
840 def update_jobs_in_slabdb( job_oar_list, jobs_psql):
841 """ Cleans the slab db by deleting expired and cancelled jobs.
842 Compares the list of job ids given by OAR with the job ids that
843 are already in the database, deletes the jobs that are no longer in
845 :param job_oar_list: list of job ids coming from OAR
846 :type job_oar_list: list
847 :param job_psql: list of job ids cfrom the database.
850 #Turn the list into a set
851 set_jobs_psql = set(jobs_psql)
853 kept_jobs = set(job_oar_list).intersection(set_jobs_psql)
854 logger.debug ( "\r\n \t\ update_jobs_in_slabdb jobs_psql %s \r\n \t \
855 job_oar_list %s kept_jobs %s "%(set_jobs_psql, job_oar_list, kept_jobs))
856 deleted_jobs = set_jobs_psql.difference(kept_jobs)
857 deleted_jobs = list(deleted_jobs)
858 if len(deleted_jobs) > 0:
859 slab_dbsession.query(SenslabXP).filter(SenslabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch')
860 slab_dbsession.commit()
866 def GetLeases(self, lease_filter_dict=None, login=None):
867 """ Get the list of leases from OAR with complete information
868 about which slice owns which jobs and nodes.
870 -Fetch all the jobs from OAR (running, waiting..)
871 complete the reservation information with slice hrn
872 found in slabxp table. If not available in the table,
873 assume it is a senslab slice.
874 -Updates the slab table, deleting jobs when necessary.
875 :return: reservation_list, list of dictionaries with 'lease_id',
876 'reserved_nodes','slice_id', 'state', 'user', 'component_id_list',
877 'slice_hrn', 'resource_ids', 't_from', 't_until'
881 unfiltered_reservation_list = self.GetReservedNodes(login)
883 reservation_list = []
884 #Find the slice associated with this user senslab ldap uid
885 logger.debug(" SLABDRIVER.PY \tGetLeases login %s\
886 unfiltered_reservation_list %s " %(login, unfiltered_reservation_list))
887 #Create user dict first to avoid looking several times for
888 #the same user in LDAP SA 27/07/12
891 jobs_psql_query = slab_dbsession.query(SenslabXP).all()
892 jobs_psql_dict = dict([(row.job_id, row.__dict__ ) for row in jobs_psql_query ])
893 #jobs_psql_dict = jobs_psql_dict)
894 logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
896 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
900 for resa in unfiltered_reservation_list:
901 logger.debug("SLABDRIVER \tGetLeases USER %s"\
903 #Construct list of jobs (runing, waiting..) in oar
904 job_oar_list.append(resa['lease_id'])
905 #If there is information on the job in SLAB DB ]
906 #(slice used and job id)
907 if resa['lease_id'] in jobs_psql_dict:
908 job_info = jobs_psql_dict[resa['lease_id']]
909 logger.debug("SLABDRIVER \tGetLeases job_info %s"\
911 resa['slice_hrn'] = job_info['slice_hrn']
912 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
914 #otherwise, assume it is a senslab slice:
916 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ \
917 resa['user'] +"_slice" , 'slice')
918 resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
920 resa['component_id_list'] = []
921 #Transform the hostnames into urns (component ids)
922 for node in resa['reserved_nodes']:
924 slab_xrn = slab_xrn_object(self.root_auth, node)
925 resa['component_id_list'].append(slab_xrn.urn)
927 if lease_filter_dict:
928 logger.debug("SLABDRIVER \tGetLeases resa_ %s \
929 \r\n leasefilter %s" %(resa, lease_filter_dict))
931 if lease_filter_dict['name'] == resa['slice_hrn']:
932 reservation_list.append(resa)
934 if lease_filter_dict is None:
935 reservation_list = unfiltered_reservation_list
938 self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)
940 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
942 return reservation_list
947 #TODO FUNCTIONS SECTION 04/07/2012 SA
949 ##TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
952 #def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
953 #""" This method is a hopefully temporary hack to let the sfa correctly
954 #detach the objects it creates from a remote peer object. This is
955 #needed so that the sfa federation link can work in parallel with
956 #RefreshPeer, as RefreshPeer depends on remote objects being correctly
959 #auth : struct, API authentication structure
960 #AuthMethod : string, Authentication method to use
961 #object_type : string, Object type, among 'site','person','slice',
963 #object_id : int, object_id
964 #shortname : string, peer shortname
968 #logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
972 ##TODO Is BindObjectToPeer still necessary ? Currently does nothing
974 #|| Commented out 28/05/13 SA
975 #def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
976 #remote_object_id=None):
977 #"""This method is a hopefully temporary hack to let the sfa correctly
978 #attach the objects it creates to a remote peer object. This is needed
979 #so that the sfa federation link can work in parallel with RefreshPeer,
980 #as RefreshPeer depends on remote objects being correctly marked.
982 #shortname : string, peer shortname
983 #remote_object_id : int, remote object_id, set to 0 if unknown
987 #logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
990 ##TODO UpdateSlice 04/07/2012 SA || Commented out 28/05/13 SA
991 ##Funciton should delete and create another job since oin senslab slice=job
992 #def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
993 #"""Updates the parameters of an existing slice with the values in
995 #Users may only update slices of which they are members.
996 #PIs may update any of the slices at their sites, or any slices of
997 #which they are members. Admins may update any slice.
998 #Only PIs and admins may update max_nodes. Slices cannot be renewed
999 #(by updating the expires parameter) more than 8 weeks into the future.
1000 #Returns 1 if successful, faults otherwise.
1004 #logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
1007 #Unused SA 30/05/13, we only update the user's key or we delete it.
1008 ##TODO UpdatePerson 04/07/2012 SA
1009 #def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
1010 #"""Updates a person. Only the fields specified in person_fields
1011 #are updated, all other fields are left untouched.
1012 #Users and techs can only update themselves. PIs can only update
1013 #themselves and other non-PIs at their sites.
1014 #Returns 1 if successful, faults otherwise.
1018 ##new_row = FederatedToSenslab(slab_hrn, federated_hrn)
1019 ##slab_dbsession.add(new_row)
1020 ##slab_dbsession.commit()
1022 #logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
def GetKeys(key_filter=None):
    """Return the ssh keys registered in the SFA db, indexed by key string.

    Each entry of the returned dict maps an ssh key to a dict holding the
    key id, the ssh key itself, and the email and hrn of the user the key
    belongs to. No access control is applied here.

    :param key_filter: ssh key strings to look for. When None, all the
        keys found in the db are returned.
    :type key_filter: list of strings or None
    :return: dict with ssh key as key and dicts as value.
    :rtype: dict
    """
    #Load the key owner in the same query since its email and hrn
    #are needed for every key.
    query = dbsession.query(RegKey).options(joinedload('reg_user'))
    if key_filter is not None:
        #Only keep the keys whose key string is in the filter
        query = query.filter(RegKey.key.in_(key_filter))
    keys = query.all()

    key_dict = {}
    for key in keys:
        key_dict[key.key] = {'key_id': key.key_id, 'key': key.key,
                             'email': key.reg_user.email,
                             'hrn': key.reg_user.hrn}

    logger.debug("SLABDRIVER GetKeys -key_dict %s \r\n " %(key_dict))
    return key_dict
def DeleteKey(self, user_record, key_string):
    """Delete a key in the LDAP entry of the specified user.

    Removes key_string from the user's key list and updates the LDAP
    user's entry with the remaining keys.

    :param user_record: User's record. Its 'keys' list is modified in
        place when the key is found.
    :param key_string: The ssh key to remove
    :type user_record: dict
    :type key_string: string
    :return: True if successful, False if not. False is also returned,
        without contacting LDAP, when key_string is not one of the
        user's keys.
    :rtype: boolean
    """
    all_user_keys = user_record['keys']
    try:
        all_user_keys.remove(key_string)
    except ValueError:
        #The key does not belong to this user: nothing to update in LDAP
        return False

    new_attributes = {'sshPublicKey': all_user_keys}
    ret = self.ldap.LdapModifyUser(user_record, new_attributes)
    logger.debug("SLABDRIVER DeleteKey %s- "%(ret))
    #NOTE(review): assumes LdapModifyUser returns a dict carrying its
    #outcome under the 'bool' key - confirm against LDAPapi.
    return ret['bool']
def _sql_get_slice_info( slice_filter ):
    """Get the slice record based on the slice hrn.

    The records of the users associated with the slice are fetched in the
    same query, using a joinedload on the reg_researchers relationship.

    :param slice_filter: the slice hrn we are looking for
    :type slice_filter: string
    :return: the slice record enhanced with the user's information if the
        slice was found, None if it wasn't.
    :rtype: dict or None.
    """
    #NOTE SA 08/08/2012: getting the user's hrn through RegSlice and
    #reg_researchers was reported to mess up the RegRecord in Resolve,
    #cause unknown.
    slice_query = \
        dbsession.query(RegSlice).options(joinedload('reg_researchers'))
    raw_slicerec = slice_query.filter_by(hrn = slice_filter).first()
    if not raw_slicerec:
        #No slice with this hrn in the db
        return None

    #The record handed back is the attribute dict of the db object itself
    slicerec = raw_slicerec.__dict__
    logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s \
                    raw_slicerec %s"%(slice_filter, slicerec))
    return slicerec
def _sql_get_slice_info_from_user(slice_filter ):
    """Get the slice record based on the user's record id.

    The user is looked up by record_id with a joinedload on the
    reg_slices_as_researcher relationship, then the sql records are
    formatted into a dict with the mandatory fields for user and slice.

    :param slice_filter: record id of the user.
    :return: dict with the slice record, and the user record under the
        'reg_researchers' key, if the user was found and has at least
        one slice. None if not.
    :rtype: dict or None.
    """
    #Fields copied from the user record and from the slice record
    user_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                          'classtype', 'authority', 'gid', 'record_id',
                          'date_created', 'type', 'email', 'pointer']
    slice_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                           'classtype', 'authority', 'gid', 'record_id',
                           'date_created', 'type', 'pointer']

    user_query = dbsession.query(RegUser).options(
        joinedload('reg_slices_as_researcher'))
    raw_userrec = user_query.filter_by(record_id = slice_filter).first()
    if not raw_userrec:
        #No user with this record id in the db
        return None

    user_fields = raw_userrec.__dict__
    user_slices = user_fields['reg_slices_as_researcher']
    if not user_slices:
        #The user exists but is not a researcher of any slice
        return None

    #TODO Handle multiple slices for one user SA 10/12/12
    #for now only take the first slice record associated to the rec user
    slice_fields = user_slices[0].__dict__
    slicerec = dict((k, slice_fields[k]) for k in slice_needed_fields)
    slicerec['reg_researchers'] = \
        dict((k, user_fields[k]) for k in user_needed_fields)
    return slicerec
def _get_slice_records(self, slice_filter = None, \
                slice_filter_type = None):
    """Get the slice record depending on the slice filter and its type.

    :param slice_filter: Can be either the slice hrn or the user's record
        id.
    :type slice_filter: string
    :param slice_filter_type: describes the slice filter type used, can be
        slice_hrn or record_id_user
    :type slice_filter_type: string
    :return: the slice record, or None if no record matched or if the
        filter type is not one of the two supported ones.
    :rtype: dict or None

    ..seealso:: _sql_get_slice_info_from_user
    ..seealso:: _sql_get_slice_info
    """
    #Stays None when the filter type is unknown, so that the caller gets
    #the same answer as for a slice which was not found.
    slicerec = None

    if slice_filter_type == 'slice_hrn':
        #Get the slice based on the slice hrn
        slicerec = self._sql_get_slice_info(slice_filter)
    elif slice_filter_type == 'record_id_user':
        #Get the slice based on the user's record id
        slicerec = self._sql_get_slice_info_from_user(slice_filter)

    if not slicerec:
        return None
    return slicerec
def GetSlices(self, slice_filter = None, slice_filter_type = None, \
                                                        login=None):
    """Get the slice records from the slab db and add lease information
    if any.

    :param slice_filter: can be the slice hrn or the user's record id in
        the db depending on the slice_filter_type.
    :param slice_filter_type: defines the type of the filtering used, can
        be either 'slice_hrn' or 'record_id_user'.
    :param login: passed on to GetLeases to select the leases looked at
        when a filter is used.
    :type slice_filter: string
    :type slice_filter_type: string
    :return: when slice_filter_type is one of the supported types, a list
        with one dict per lease of the matching slice (or only the slice
        record when it has no lease), empty if no slice matches. With any
        other slice_filter_type, the list of all the slice records, those
        of local users being completed with their lease information.
    :rtype: list
    """
    authorized_filter_types_list = ['slice_hrn', 'record_id_user']
    return_slicerec_dictlist = []

    #First try to get information on the slice based on the filter provided
    if slice_filter_type in authorized_filter_types_list:
        fixed_slicerec_dict = \
            self._get_slice_records(slice_filter, slice_filter_type)
        #The slice was not found in the sfa db: nothing to report, and
        #the record can not be subscripted below.
        if not fixed_slicerec_dict:
            return return_slicerec_dictlist

        slice_hrn = fixed_slicerec_dict['hrn']

        logger.debug(" SLABDRIVER \tGetSlices login %s \
                        slice record %s slice_filter %s \
                        slice_filter_type %s " %(login, \
                        fixed_slicerec_dict, slice_filter, \
                        slice_filter_type))

        #Now we have the slice record fixed_slicerec_dict, get the
        #jobs associated to this slice
        leases_list = self.GetLeases(login = login)
        #If no job is running or no job scheduled
        #return only the slice record
        if leases_list == []:
            return_slicerec_dictlist.append(fixed_slicerec_dict)

        #If several jobs for one slice , put the slice record into
        # each lease information dict
        for lease in leases_list:
            slicerec_dict = {}
            logger.debug("SLABDRIVER.PY \tGetSlices slice_filter %s \
                    lease['slice_hrn'] %s" \
                    %(slice_filter, lease['slice_hrn']))
            if lease['slice_hrn'] == slice_hrn:
                slicerec_dict['slice_hrn'] = lease['slice_hrn']
                slicerec_dict['hrn'] = lease['slice_hrn']
                slicerec_dict['user'] = lease['user']
                slicerec_dict.update(
                    {'list_node_ids': {'hostname': lease['reserved_nodes']}})
                slicerec_dict.update({'node_ids': lease['reserved_nodes']})

                #Update lease dict with the slice record. The job id is
                #stored as a one element list, the slice record fields
                #override the ones taken from the lease.
                fixed_slicerec_dict['oar_job_id'] = [lease['lease_id']]
                slicerec_dict.update(fixed_slicerec_dict)

                return_slicerec_dictlist.append(slicerec_dict)
                logger.debug("SLABDRIVER.PY \tGetSlices \
                    OHOHOHOH %s" %(return_slicerec_dictlist ))

            logger.debug("SLABDRIVER.PY \tGetSlices \
                    slicerec_dict %s return_slicerec_dictlist %s \
                    lease['reserved_nodes'] \
                    %s" %(slicerec_dict, return_slicerec_dictlist, \
                    lease['reserved_nodes'] ))

        logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
                    return_slicerec_dictlist %s" \
                    %(return_slicerec_dictlist))

        return return_slicerec_dictlist

    #No usable filter: get all slices from the senslab sfa database,
    #put them in dict format
    query_slice_list = \
        dbsession.query(RegSlice).options(joinedload('reg_researchers')).all()

    for record in query_slice_list:
        tmp = record.__dict__
        #only one researcher per slice so take the first one
        tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
        return_slicerec_dictlist.append(tmp)

    #Get all the jobs reserved nodes
    leases_list = self.GetReservedNodes()

    for fixed_slicerec_dict in return_slicerec_dictlist:
        slicerec_dict = {}
        #Check if the slice belongs to a senslab user: the owner's login
        #is then the first part of the slice name in the hrn.
        if fixed_slicerec_dict['peer_authority'] is None:
            owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
        else:
            owner = None

        for lease in leases_list:
            if owner == lease['user']:
                slicerec_dict['oar_job_id'] = lease['lease_id']

                logger.debug("SLABDRIVER.PY \tGetSlices lease %s "\
                             %(lease,))

                slicerec_dict.update({'node_ids': lease['reserved_nodes']})
                slicerec_dict.update(
                    {'list_node_ids': {'hostname': lease['reserved_nodes']}})
                #Values already present in the slice record take
                #precedence over the ones of this lease
                slicerec_dict.update(fixed_slicerec_dict)
                fixed_slicerec_dict.update(slicerec_dict)

    logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
                    return_slicerec_dictlist %s slice_filter %s " \
                    %(return_slicerec_dictlist, slice_filter))

    return return_slicerec_dictlist
1332 #Update slice unused, therefore sfa_fields_to_slab_fields unused
1335 #def sfa_fields_to_slab_fields(sfa_type, hrn, record):
1340 ##for field in record:
1341 ## slab_record[field] = record[field]
1343 #if sfa_type == "slice":
1344 ##instantion used in get_slivers ?
1345 #if not "instantiation" in slab_record:
1346 #slab_record["instantiation"] = "senslab-instantiated"
1347 ##slab_record["hrn"] = hrn_to_pl_slicename(hrn)
1348 ##Unused hrn_to_pl_slicename because Slab's hrn already
1349 ##in the appropriate form SA 23/07/12
1350 #slab_record["hrn"] = hrn
1351 #logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
1352 #slab_record %s " %(slab_record['hrn']))
1353 #if "url" in record:
1354 #slab_record["url"] = record["url"]
1355 #if "description" in record:
1356 #slab_record["description"] = record["description"]
1357 #if "expires" in record:
1358 #slab_record["expires"] = int(record["expires"])
1360 ##nodes added by OAR only and then imported to SFA
1361 ##elif type == "node":
1362 ##if not "hostname" in slab_record:
1363 ##if not "hostname" in record:
1364 ##raise MissingSfaInfo("hostname")
1365 ##slab_record["hostname"] = record["hostname"]
1366 ##if not "model" in slab_record:
1367 ##slab_record["model"] = "geni"
1369 ##One authority only
1370 ##elif type == "authority":
1371 ##slab_record["login_base"] = hrn_to_slab_login_base(hrn)
1373 ##if not "name" in slab_record:
1374 ##slab_record["name"] = hrn
1376 ##if not "abbreviated_name" in slab_record:
1377 ##slab_record["abbreviated_name"] = hrn
1379 ##if not "enabled" in slab_record:
1380 ##slab_record["enabled"] = True
1382 ##if not "is_public" in slab_record:
1383 ##slab_record["is_public"] = True