1 from datetime import datetime
3 from sfa.util.sfalogging import logger
5 from sfa.storage.alchemy import dbsession
6 from sqlalchemy.orm import joinedload
7 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
8 from sfa.iotlab.iotlabpostgres import iotlab_dbsession, IotlabXP
10 from sfa.iotlab.OARrestapi import OARrestapi
11 from sfa.iotlab.LDAPapi import LDAPapi
13 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
15 from sfa.trust.certificate import Keypair, convert_public_key
16 from sfa.trust.gid import create_uuid
17 from sfa.trust.hierarchy import Hierarchy
19 from sfa.iotlab.iotlabaggregate import iotlab_xrn_object
21 class IotlabTestbedAPI():
22 """ Class enabled to use LDAP and OAR api calls. """
24 def __init__(self, config):
25 """Creates an instance of OARrestapi and LDAPapi which will be used to
26 issue calls to OAR or LDAP methods.
27 Set the time format and the testbed granularity used for OAR
28 reservation and leases.
30 :param config: configuration object from sfa.util.config
31 :type config: Config object
33 self.oar = OARrestapi()
35 self.time_format = "%Y-%m-%d %H:%M:%S"
36 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
37 self.grain = 1 # 10 mins lease minimum
38 #import logging, logging.handlers
39 #from sfa.util.sfalogging import _SfaLogger
40 #sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', \
46 def GetMinExperimentDurationInSec():
50 def GetPeers (peer_filter=None ):
51 """ Gathers registered authorities in SFA DB and looks for specific peer
52 if peer_filter is specified.
53 :param peer_filter: name of the site authority looked for.
54 :type peer_filter: string
55 :return: list of records.
60 existing_hrns_by_types = {}
61 logger.debug("IOTLABDRIVER \tGetPeers peer_filter %s, \
63 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
65 for record in all_records:
66 existing_records[(record.hrn, record.type)] = record
67 if record.type not in existing_hrns_by_types:
68 existing_hrns_by_types[record.type] = [record.hrn]
70 existing_hrns_by_types[record.type].append(record.hrn)
73 logger.debug("IOTLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
74 %( existing_hrns_by_types))
79 records_list.append(existing_records[(peer_filter,'authority')])
81 for hrn in existing_hrns_by_types['authority']:
82 records_list.append(existing_records[(hrn,'authority')])
84 logger.debug("IOTLABDRIVER \tGetPeer \trecords_list %s " \
90 return_records = records_list
91 logger.debug("IOTLABDRIVER \tGetPeer return_records %s " \
97 #TODO : Handling OR request in make_ldap_filters_from_records
98 #instead of the for loop
99 #over the records' list
100 def GetPersons(self, person_filter=None):
102 Get the enabled users and their properties from Iotlab LDAP.
103 If a filter is specified, looks for the user whose properties match
104 the filter, otherwise returns the whole enabled users'list.
105 :param person_filter: Must be a list of dictionnaries
106 with users properties when not set to None.
107 :param person_filter: list of dict
108 :return:Returns a list of users whose accounts are enabled
110 :rtype: list of dicts
113 logger.debug("IOTLABDRIVER \tGetPersons person_filter %s" \
116 if person_filter and isinstance(person_filter, list):
117 #If we are looking for a list of users (list of dict records)
118 #Usually the list contains only one user record
119 for searched_attributes in person_filter:
121 #Get only enabled user accounts in iotlab LDAP :
122 #add a filter for make_ldap_filters_from_record
123 person = self.ldap.LdapFindUser(searched_attributes, \
124 is_user_enabled=True)
125 #If a person was found, append it to the list
127 person_list.append(person)
129 #If the list is empty, return None
130 if len(person_list) is 0:
134 #Get only enabled user accounts in iotlab LDAP :
135 #add a filter for make_ldap_filters_from_record
136 person_list = self.ldap.LdapFindUser(is_user_enabled=True)
141 #def GetTimezone(self):
142 #""" Returns the OAR server time and timezone.
143 #Unused SA 30/05/13"""
144 #server_timestamp, server_tz = self.oar.parser.\
145 #SendRequest("GET_timezone")
146 #return server_timestamp, server_tz
148 def DeleteJobs(self, job_id, username):
150 """ Deletes the job with the specified job_id and username on OAR by
151 posting a delete request to OAR.
153 :param job_id: job id in OAR.
154 :param username: user's iotlab login in LDAP.
156 :type username: string
158 :return: dictionary with the job id and if delete has been successful
162 logger.debug("IOTLABDRIVER \tDeleteJobs jobid %s username %s "\
164 if not job_id or job_id is -1:
168 reqdict['method'] = "delete"
169 reqdict['strval'] = str(job_id)
172 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
174 if answer['status'] == 'Delete request registered':
175 ret = {job_id : True }
177 ret = {job_id :False }
178 logger.debug("IOTLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
179 username %s" %(job_id, answer, username))
184 ##TODO : Unused GetJobsId ? SA 05/07/12
185 #def GetJobsId(self, job_id, username = None ):
187 #Details about a specific job.
188 #Includes details about submission time, jot type, state, events,
189 #owner, assigned ressources, walltime etc...
193 #node_list_k = 'assigned_network_address'
194 ##Get job info from OAR
195 #job_info = self.oar.parser.SendRequest(req, job_id, username)
197 #logger.debug("IOTLABDRIVER \t GetJobsId %s " %(job_info))
199 #if job_info['state'] == 'Terminated':
200 #logger.debug("IOTLABDRIVER \t GetJobsId job %s TERMINATED"\
203 #if job_info['state'] == 'Error':
204 #logger.debug("IOTLABDRIVER \t GetJobsId ERROR message %s "\
209 #logger.error("IOTLABDRIVER \tGetJobsId KeyError")
212 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
214 ##Replaces the previous entry
215 ##"assigned_network_address" / "reserved_resources"
217 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
218 #del job_info[node_list_k]
219 #logger.debug(" \r\nIOTLABDRIVER \t GetJobsId job_info %s " %(job_info))
223 def GetJobsResources(self, job_id, username = None):
224 """ Gets the list of nodes associated with the job_id and username
226 Transforms the iotlab hostnames to the corresponding
228 Rertuns dict key :'node_ids' , value : hostnames list
229 :param username: user's LDAP login
230 :paran job_id: job's OAR identifier.
231 :type username: string
232 :type job_id: integer
234 :return: dicionary with nodes' hostnames belonging to the job.
238 req = "GET_jobs_id_resources"
241 #Get job resources list from OAR
242 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
243 logger.debug("IOTLABDRIVER \t GetJobsResources %s " %(node_id_list))
246 self.__get_hostnames_from_oar_node_ids(node_id_list)
249 #Replaces the previous entry "assigned_network_address" /
250 #"reserved_resources" with "node_ids"
251 job_info = {'node_ids': hostname_list}
256 #def get_info_on_reserved_nodes(self, job_info, node_list_name):
258 #..warning:unused SA 23/05/13
260 ##Get the list of the testbed nodes records and make a
261 ##dictionnary keyed on the hostname out of it
262 #node_list_dict = self.GetNodes()
263 ##node_hostname_list = []
264 #node_hostname_list = [node['hostname'] for node in node_list_dict]
265 ##for node in node_list_dict:
266 ##node_hostname_list.append(node['hostname'])
267 #node_dict = dict(zip(node_hostname_list, node_list_dict))
269 #reserved_node_hostname_list = []
270 #for index in range(len(job_info[node_list_name])):
271 ##job_info[node_list_name][k] =
272 #reserved_node_hostname_list[index] = \
273 #node_dict[job_info[node_list_name][index]]['hostname']
275 #logger.debug("IOTLABDRIVER \t get_info_on_reserved_nodes \
276 #reserved_node_hostname_list %s" \
277 #%(reserved_node_hostname_list))
279 #logger.error("IOTLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
281 #return reserved_node_hostname_list
283 def GetNodesCurrentlyInUse(self):
284 """Returns a list of all the nodes already involved in an oar running
286 :rtype: list of nodes hostnames.
288 return self.oar.parser.SendRequest("GET_running_jobs")
290 def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
291 """Get the hostnames of the nodes from their OAR identifiers.
292 Get the list of nodes dict using GetNodes and find the hostname
293 associated with the identifier.
294 :param resource_id_list: list of nodes identifiers
295 :returns: list of node hostnames.
297 full_nodes_dict_list = self.GetNodes()
298 #Put the full node list into a dictionary keyed by oar node id
299 oar_id_node_dict = {}
300 for node in full_nodes_dict_list:
301 oar_id_node_dict[node['oar_id']] = node
304 for resource_id in resource_id_list:
305 #Because jobs requested "asap" do not have defined resources
306 if resource_id is not "Undefined":
307 hostname_list.append(\
308 oar_id_node_dict[resource_id]['hostname'])
310 #hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
313 def GetReservedNodes(self, username = None):
314 """ Get list of leases. Get the leases for the username if specified,
315 otherwise get all the leases. Finds the nodes hostnames for each
317 :param username: user's LDAP login
318 :type username: string
319 :return: list of reservations dict
323 #Get the nodes in use and the reserved nodes
324 reservation_dict_list = \
325 self.oar.parser.SendRequest("GET_reserved_nodes", \
329 for resa in reservation_dict_list:
330 logger.debug ("GetReservedNodes resa %s"%(resa))
331 #dict list of hostnames and their site
332 resa['reserved_nodes'] = \
333 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
335 #del resa['resource_ids']
336 return reservation_dict_list
338 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
341 Make a list of iotlab nodes and their properties from information
342 given by OAR. Search for specific nodes if some filters are specified.
343 Nodes properties returned if no return_fields_list given:
344 'hrn','archi','mobile','hostname','site','boot_state','node_id',
345 'radio','posx','posy','oar_id','posz'.
347 :param node_filter_dict: dictionnary of lists with node properties
348 :type node_filter_dict: dict
349 :param return_fields_list: list of specific fields the user wants to be
351 :type return_fields_list: list
352 :return: list of dictionaries with node properties
356 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
357 node_dict_list = node_dict_by_id.values()
358 logger.debug (" IOTLABDRIVER GetNodes node_filter_dict %s \
359 return_fields_list %s "%(node_filter_dict, return_fields_list))
360 #No filtering needed return the list directly
361 if not (node_filter_dict or return_fields_list):
362 return node_dict_list
364 return_node_list = []
366 for filter_key in node_filter_dict:
368 #Filter the node_dict_list by each value contained in the
369 #list node_filter_dict[filter_key]
370 for value in node_filter_dict[filter_key]:
371 for node in node_dict_list:
372 if node[filter_key] == value:
373 if return_fields_list :
375 for k in return_fields_list:
377 return_node_list.append(tmp)
379 return_node_list.append(node)
381 logger.log_exc("GetNodes KeyError")
385 return return_node_list
390 def AddSlice(slice_record, user_record):
391 """Add slice to the local iotlab sfa tables if the slice comes
392 from a federated site and is not yet in the iotlab sfa DB,
393 although the user has already a LDAP login.
394 Called by verify_slice during lease/sliver creation.
395 :param slice_record: record of slice, must contain hrn, gid, slice_id
396 and authority of the slice.
397 :type slice_record: dictionary
398 :param user_record: record of the user
399 :type user_record: RegUser
402 sfa_record = RegSlice(hrn=slice_record['hrn'],
403 gid=slice_record['gid'],
404 pointer=slice_record['slice_id'],
405 authority=slice_record['authority'])
406 logger.debug("IOTLABDRIVER.PY AddSlice sfa_record %s user_record %s"
407 % (sfa_record, user_record))
408 sfa_record.just_created()
409 dbsession.add(sfa_record)
411 #Update the reg-researcher dependance table
412 sfa_record.reg_researchers = [user_record]
418 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
419 site_dict = self.oar.parser.SendRequest("GET_sites")
420 #site_dict : dict where the key is the sit ename
421 return_site_list = []
422 if not ( site_filter_name_list or return_fields_list):
423 return_site_list = site_dict.values()
424 return return_site_list
426 for site_filter_name in site_filter_name_list:
427 if site_filter_name in site_dict:
428 if return_fields_list:
429 for field in return_fields_list:
432 tmp[field] = site_dict[site_filter_name][field]
434 logger.error("GetSites KeyError %s "%(field))
436 return_site_list.append(tmp)
438 return_site_list.append( site_dict[site_filter_name])
441 return return_site_list
447 #TODO : Check rights to delete person
448 def DeletePerson(self, person_record):
449 """ Disable an existing account in iotlab LDAP.
450 Users and techs can only delete themselves. PIs can only
451 delete themselves and other non-PIs at their sites.
452 ins can delete anyone.
453 :param person_record: user's record
454 :type person_record: dict
455 :return: True if successful, False otherwise.
459 #Disable user account in iotlab LDAP
460 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
461 logger.warning("IOTLABDRIVER DeletePerson %s " %(person_record))
465 def DeleteSlice(self, slice_record):
466 """ Deletes the specified slice and kills the jobs associated with
467 the slice if any, using DeleteSliceFromNodes.
469 :return: True if all the jobs in the slice have been deleted,
470 or the list of jobs that could not be deleted otherwise.
471 :rtype: list or boolean
474 ret = self.DeleteSliceFromNodes(slice_record)
477 if False in ret[job_id]:
478 if delete_failed is None:
480 delete_failed.append(job_id)
482 logger.info("IOTLABDRIVER DeleteSlice %s answer %s"%(slice_record, \
484 return delete_failed or True
487 def __add_person_to_db(user_dict):
489 Add a federated user straight to db when the user issues a lease
490 request with iotlab nodes and that he has not registered with iotlab
491 yet (that is he does not have a LDAP entry yet).
492 Uses parts of the routines in SlabImport when importing user from LDAP.
493 Called by AddPerson, right after LdapAddUser.
494 :param user_dict: Must contain email, hrn and pkey to get a GID
495 and be added to the SFA db.
496 :type user_dict: dict
500 dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
502 if not check_if_exists:
503 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
505 hrn = user_dict['hrn']
506 person_urn = hrn_to_urn(hrn, 'user')
507 pubkey = user_dict['pkey']
509 pkey = convert_public_key(pubkey)
511 #key not good. create another pkey
512 logger.warn('__add_person_to_db: unable to convert public \
514 pkey = Keypair(create=True)
517 if pubkey is not None and pkey is not None :
518 hierarchy = Hierarchy()
519 person_gid = hierarchy.create_gid(person_urn, create_uuid(), \
521 if user_dict['email']:
522 logger.debug("__add_person_to_db \r\n \r\n \
523 SLAB IMPORTER PERSON EMAIL OK email %s "\
524 %(user_dict['email']))
525 person_gid.set_email(user_dict['email'])
527 user_record = RegUser(hrn=hrn , pointer= '-1', \
528 authority=get_authority(hrn), \
529 email=user_dict['email'], gid = person_gid)
530 user_record.reg_keys = [RegKey(user_dict['pkey'])]
531 user_record.just_created()
532 dbsession.add (user_record)
537 def AddPerson(self, record):
538 """Adds a new account. Any fields specified in records are used,
539 otherwise defaults are used. Creates an appropriate login by calling
541 :param record: dictionary with the sfa user's properties.
542 :return: The uid of the added person if sucessful, otherwise returns
543 the error message from LDAP.
544 :rtype: interger or string
546 ret = self.ldap.LdapAddUser(record)
548 if ret['bool'] is True:
549 record['hrn'] = self.root_auth + '.' + ret['uid']
550 logger.debug("IOTLABDRIVER AddPerson return code %s record %s \r\n "\
552 self.__add_person_to_db(record)
555 return ret['message']
559 #TODO AddPersonKey 04/07/2012 SA
560 def AddPersonKey(self, person_uid, old_attributes_dict, new_key_dict):
561 """Adds a new key to the specified account. Adds the key to the
562 iotlab ldap, provided that the person_uid is valid.
563 Non-admins can only modify their own keys.
565 :param person_uid: user's iotlab login in LDAP
566 :param old_attributes_dict: dict with the user's old sshPublicKey
567 :param new_key_dict:dict with the user's new sshPublicKey
568 :type person_uid: string
572 :return: True if the key has been modified, False otherwise.
575 ret = self.ldap.LdapModify(person_uid, old_attributes_dict, \
577 logger.warning("IOTLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
580 def DeleteLeases(self, leases_id_list, slice_hrn ):
582 Deletes several leases, based on their job ids and the slice
583 they are associated with. Uses DeleteJobs to delete the jobs
584 on OAR. Note that one slice can contain multiple jobs, and in this case
585 all the jobs in the leases_id_list MUST belong to ONE slice,
586 since there is only one slice hrn provided here.
587 :param leases_id_list: list of job ids that belong to the slice whose
588 slice hrn is provided.
589 :param slice_hrn: the slice hrn .
590 ..warning: Does not have a return value since there was no easy
591 way to handle failure when dealing with multiple job delete. Plus,
592 there was no easy way to report it to the user.
594 logger.debug("IOTLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
595 \r\n " %(leases_id_list, slice_hrn))
596 for job_id in leases_id_list:
597 self.DeleteJobs(job_id, slice_hrn)
602 def _process_walltime(duration):
603 """ Calculates the walltime in seconds from the duration in H:M:S
604 specified in the RSpec.
608 # Fixing the walltime by adding a few delays.
609 # First put the walltime in seconds oarAdditionalDelay = 20;
610 # additional delay for /bin/sleep command to
611 # take in account prologue and epilogue scripts execution
612 # int walltimeAdditionalDelay = 240; additional delay
613 #for prologue/epilogue execution = $SERVER_PROLOGUE_EPILOGUE_TIMEOUT
615 # Put the duration in seconds first
616 #desired_walltime = duration * 60
617 desired_walltime = duration
618 total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
619 sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12
621 #Put the walltime back in str form
623 walltime.append(str(total_walltime / 3600))
624 total_walltime = total_walltime - 3600 * int(walltime[0])
625 #Get the remaining minutes
626 walltime.append(str(total_walltime / 60))
627 total_walltime = total_walltime - 60 * int(walltime[1])
629 walltime.append(str(total_walltime))
632 logger.log_exc(" __process_walltime duration null")
634 return walltime, sleep_walltime
637 def _create_job_structure_request_for_OAR(lease_dict):
638 """ Creates the structure needed for a correct POST on OAR.
639 Makes the timestamp transformation into the appropriate format.
640 Sends the POST request to create the job with the resources in
649 reqdict['workdir'] = '/tmp'
650 reqdict['resource'] = "{network_address in ("
652 for node in lease_dict['added_nodes']:
653 logger.debug("\r\n \r\n OARrestapi \t \
654 __create_job_structure_request_for_OAR node %s" %(node))
656 # Get the ID of the node
658 reqdict['resource'] += "'" + nodeid + "', "
659 nodeid_list.append(nodeid)
661 custom_length = len(reqdict['resource'])- 2
662 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
663 ")}/nodes=" + str(len(nodeid_list))
666 walltime, sleep_walltime = \
667 IotlabTestbedAPI._process_walltime(\
668 int(lease_dict['lease_duration']))
671 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
672 ":" + str(walltime[1]) + ":" + str(walltime[2])
673 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
675 #In case of a scheduled experiment (not immediate)
676 #To run an XP immediately, don't specify date and time in RSpec
677 #They will be set to None.
678 if lease_dict['lease_start_time'] is not '0':
679 #Readable time accepted by OAR
680 start_time = datetime.fromtimestamp( \
681 int(lease_dict['lease_start_time'])).\
682 strftime(lease_dict['time_format'])
683 reqdict['reservation'] = start_time
684 #If there is not start time, Immediate XP. No need to add special
688 reqdict['type'] = "deploy"
689 reqdict['directory'] = ""
690 reqdict['name'] = "SFA_" + lease_dict['slice_user']
695 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
696 lease_start_time, lease_duration, slice_user=None):
699 Create a job request structure based on the information provided
700 and post the job on OAR.
701 :param added_nodes: list of nodes that belong to the described lease.
702 :param slice_name: the slice hrn associated to the lease.
703 :param lease_start_time: timestamp of the lease startting time.
704 :param lease_duration: lease durationin minutes
708 lease_dict['lease_start_time'] = lease_start_time
709 lease_dict['lease_duration'] = lease_duration
710 lease_dict['added_nodes'] = added_nodes
711 lease_dict['slice_name'] = slice_name
712 lease_dict['slice_user'] = slice_user
713 lease_dict['grain'] = self.GetLeaseGranularity()
714 lease_dict['time_format'] = self.time_format
717 logger.debug("IOTLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
718 \r\n " %(slice_user))
719 #Create the request for OAR
720 reqdict = self._create_job_structure_request_for_OAR(lease_dict)
721 # first step : start the OAR job and update the job
722 logger.debug("IOTLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
725 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
727 logger.debug("IOTLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
731 logger.log_exc("IOTLABDRIVER \tLaunchExperimentOnOAR \
732 Impossible to create job %s " %(answer))
739 logger.debug("IOTLABDRIVER \tLaunchExperimentOnOAR jobid %s \
740 added_nodes %s slice_user %s" %(jobid, added_nodes, \
747 def AddLeases(self, hostname_list, slice_record, \
748 lease_start_time, lease_duration):
750 """Creates a job in OAR corresponding to the information provided
751 as parameters. Adds the job id and the slice hrn in the iotlab
752 database so that we are able to know which slice has which nodes.
754 :param hostname_list: list of nodes' OAR hostnames.
755 :param slice_record: sfa slice record, must contain login and hrn.
756 :param lease_start_time: starting time , unix timestamp format
757 :param lease_duration: duration in minutes
759 :type hostname_list: list
760 :type slice_record: dict
761 :type lease_start_time: integer
762 :type lease_duration: integer
765 logger.debug("IOTLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
766 slice_record %s lease_start_time %s lease_duration %s "\
767 %( hostname_list, slice_record , lease_start_time, \
770 #tmp = slice_record['reg-researchers'][0].split(".")
771 username = slice_record['login']
772 #username = tmp[(len(tmp)-1)]
773 job_id = self.LaunchExperimentOnOAR(hostname_list, \
774 slice_record['hrn'], \
775 lease_start_time, lease_duration, \
778 datetime.fromtimestamp(int(lease_start_time)).\
779 strftime(self.time_format)
780 end_time = lease_start_time + lease_duration
783 logger.debug("IOTLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL \
784 %s %s %s "%(slice_record['hrn'], job_id, end_time))
787 logger.debug("IOTLABDRIVER \r\n \r\n \t AddLeases %s %s %s " \
788 %(type(slice_record['hrn']), type(job_id), type(end_time)))
790 iotlab_ex_row = IotlabXP(slice_hrn = slice_record['hrn'], \
791 job_id = job_id, end_time= end_time)
793 logger.debug("IOTLABDRIVER \r\n \r\n \t AddLeases iotlab_ex_row %s" \
795 iotlab_dbsession.add(iotlab_ex_row)
796 iotlab_dbsession.commit()
798 logger.debug("IOTLABDRIVER \t AddLeases hostname_list start_time %s " \
804 #Delete the jobs from job_iotlab table
805 def DeleteSliceFromNodes(self, slice_record):
806 """ Deletes all the running or scheduled jobs of a given slice
808 :param slice_record: record of the slice
809 :type slice_record: dict
811 :return: dict of the jobs'deletion status. Success= True, Failure=
812 False, for each job id.
815 logger.debug("IOTLABDRIVER \t DeleteSliceFromNodese %s " %(slice_record))
817 if isinstance(slice_record['oar_job_id'], list):
819 for job_id in slice_record['oar_job_id']:
820 ret = self.DeleteJobs(job_id, slice_record['user'])
822 oar_bool_answer.update(ret)
825 oar_bool_answer = [self.DeleteJobs(slice_record['oar_job_id'], \
826 slice_record['user'])]
828 return oar_bool_answer
832 def GetLeaseGranularity(self):
833 """ Returns the granularity of an experiment in the Iotlab testbed.
834 OAR uses seconds for experiments duration , the granulaity is also
836 Experiments which last less than 10 min (600 sec) are invalid"""
841 def update_jobs_in_iotlabdb( job_oar_list, jobs_psql):
842 """ Cleans the iotlab db by deleting expired and cancelled jobs.
843 Compares the list of job ids given by OAR with the job ids that
844 are already in the database, deletes the jobs that are no longer in
846 :param job_oar_list: list of job ids coming from OAR
847 :type job_oar_list: list
848 :param job_psql: list of job ids cfrom the database.
851 #Turn the list into a set
852 set_jobs_psql = set(jobs_psql)
854 kept_jobs = set(job_oar_list).intersection(set_jobs_psql)
855 logger.debug ( "\r\n \t\ update_jobs_in_iotlabdb jobs_psql %s \r\n \t \
856 job_oar_list %s kept_jobs %s "%(set_jobs_psql, job_oar_list, kept_jobs))
857 deleted_jobs = set_jobs_psql.difference(kept_jobs)
858 deleted_jobs = list(deleted_jobs)
859 if len(deleted_jobs) > 0:
860 iotlab_dbsession.query(IotlabXP).filter(IotlabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch')
861 iotlab_dbsession.commit()
867 def GetLeases(self, lease_filter_dict=None, login=None):
868 """ Get the list of leases from OAR with complete information
869 about which slice owns which jobs and nodes.
871 -Fetch all the jobs from OAR (running, waiting..)
872 complete the reservation information with slice hrn
873 found in iotlab_xp table. If not available in the table,
874 assume it is a iotlab slice.
875 -Updates the iotlab table, deleting jobs when necessary.
876 :return: reservation_list, list of dictionaries with 'lease_id',
877 'reserved_nodes','slice_id', 'state', 'user', 'component_id_list',
878 'slice_hrn', 'resource_ids', 't_from', 't_until'
882 unfiltered_reservation_list = self.GetReservedNodes(login)
884 reservation_list = []
885 #Find the slice associated with this user iotlab ldap uid
886 logger.debug(" IOTLABDRIVER.PY \tGetLeases login %s\
887 unfiltered_reservation_list %s " %(login, unfiltered_reservation_list))
888 #Create user dict first to avoid looking several times for
889 #the same user in LDAP SA 27/07/12
892 jobs_psql_query = iotlab_dbsession.query(IotlabXP).all()
893 jobs_psql_dict = dict([(row.job_id, row.__dict__ ) for row in jobs_psql_query ])
894 #jobs_psql_dict = jobs_psql_dict)
895 logger.debug("IOTLABDRIVER \tGetLeases jobs_psql_dict %s"\
897 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
901 for resa in unfiltered_reservation_list:
902 logger.debug("IOTLABDRIVER \tGetLeases USER %s"\
904 #Construct list of jobs (runing, waiting..) in oar
905 job_oar_list.append(resa['lease_id'])
906 #If there is information on the job in SLAB DB ]
907 #(slice used and job id)
908 if resa['lease_id'] in jobs_psql_dict:
909 job_info = jobs_psql_dict[resa['lease_id']]
910 logger.debug("IOTLABDRIVER \tGetLeases job_info %s"\
912 resa['slice_hrn'] = job_info['slice_hrn']
913 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
915 #otherwise, assume it is a iotlab slice:
917 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ \
918 resa['user'] +"_slice" , 'slice')
919 resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
921 resa['component_id_list'] = []
922 #Transform the hostnames into urns (component ids)
923 for node in resa['reserved_nodes']:
925 iotlab_xrn = iotlab_xrn_object(self.root_auth, node)
926 resa['component_id_list'].append(iotlab_xrn.urn)
928 if lease_filter_dict:
929 logger.debug("IOTLABDRIVER \tGetLeases resa_ %s \
930 \r\n leasefilter %s" %(resa, lease_filter_dict))
932 if lease_filter_dict['name'] == resa['slice_hrn']:
933 reservation_list.append(resa)
935 if lease_filter_dict is None:
936 reservation_list = unfiltered_reservation_list
939 self.update_jobs_in_iotlabdb(job_oar_list, jobs_psql_id_list)
941 logger.debug(" IOTLABDRIVER.PY \tGetLeases reservation_list %s"\
943 return reservation_list
948 #TODO FUNCTIONS SECTION 04/07/2012 SA
950 ##TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
953 #def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
954 #""" This method is a hopefully temporary hack to let the sfa correctly
955 #detach the objects it creates from a remote peer object. This is
956 #needed so that the sfa federation link can work in parallel with
957 #RefreshPeer, as RefreshPeer depends on remote objects being correctly
960 #auth : struct, API authentication structure
961 #AuthMethod : string, Authentication method to use
962 #object_type : string, Object type, among 'site','person','slice',
964 #object_id : int, object_id
965 #shortname : string, peer shortname
969 #logger.warning("IOTLABDRIVER \tUnBindObjectFromPeer EMPTY-\
973 ##TODO Is BindObjectToPeer still necessary ? Currently does nothing
975 #|| Commented out 28/05/13 SA
976 #def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
977 #remote_object_id=None):
978 #"""This method is a hopefully temporary hack to let the sfa correctly
979 #attach the objects it creates to a remote peer object. This is needed
980 #so that the sfa federation link can work in parallel with RefreshPeer,
981 #as RefreshPeer depends on remote objects being correctly marked.
983 #shortname : string, peer shortname
984 #remote_object_id : int, remote object_id, set to 0 if unknown
988 #logger.warning("IOTLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
991 ##TODO UpdateSlice 04/07/2012 SA || Commented out 28/05/13 SA
992 ##Funciton should delete and create another job since oin iotlab slice=job
993 #def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
994 #"""Updates the parameters of an existing slice with the values in
996 #Users may only update slices of which they are members.
997 #PIs may update any of the slices at their sites, or any slices of
998 #which they are members. Admins may update any slice.
999 #Only PIs and admins may update max_nodes. Slices cannot be renewed
1000 #(by updating the expires parameter) more than 8 weeks into the future.
1001 #Returns 1 if successful, faults otherwise.
1005 #logger.warning("IOTLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
1008 #Unused SA 30/05/13, we only update the user's key or we delete it.
1009 ##TODO UpdatePerson 04/07/2012 SA
1010 #def UpdatePerson(self, iotlab_hrn, federated_hrn, person_fields=None):
1011 #"""Updates a person. Only the fields specified in person_fields
1012 #are updated, all other fields are left untouched.
1013 #Users and techs can only update themselves. PIs can only update
1014 #themselves and other non-PIs at their sites.
1015 #Returns 1 if successful, faults otherwise.
1019 ##new_row = FederatedToIotlab(iotlab_hrn, federated_hrn)
1020 ##iotlab_dbsession.add(new_row)
1021 ##iotlab_dbsession.commit()
1023 #logger.debug("IOTLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
1027 def GetKeys(key_filter=None):
1028 """Returns a dict of dict based on the key string. Each dict entry
1029 contains the key id, the ssh key, the user's email and the
1031 If key_filter is specified and is an array of key identifiers,
1032 only keys matching the filter will be returned.
1034 Admin may query all keys. Non-admins may only query their own keys.
1037 :return: dict with ssh key as key and dicts as value.
1040 if key_filter is None:
1041 keys = dbsession.query(RegKey).options(joinedload('reg_user')).all()
1043 keys = dbsession.query(RegKey).options(joinedload('reg_user')).filter(RegKey.key.in_(key_filter)).all()
1047 key_dict[key.key] = {'key_id': key.key_id, 'key': key.key, \
1048 'email': key.reg_user.email, 'hrn':key.reg_user.hrn}
1050 #ldap_rslt = self.ldap.LdapSearch({'enabled']=True})
1051 #user_by_email = dict((user[1]['mail'][0], user[1]['sshPublicKey']) \
1052 #for user in ldap_rslt)
1054 logger.debug("IOTLABDRIVER GetKeys -key_dict %s \r\n " %(key_dict))
1058 def DeleteKey(self, user_record, key_string):
1059 """ Deletes a key in the LDAP entry of the specified user.
1060 Removes the key_string from the user's key list and updates the LDAP
1061 user's entry with the new key attributes.
1062 :param key_string: The ssh key to remove
1063 :param user_record: User's record
1064 :type key_string: string
1065 :type user_record: dict
1066 :return: True if sucessful, False if not.
1070 all_user_keys = user_record['keys']
1071 all_user_keys.remove(key_string)
1072 new_attributes = {'sshPublicKey':all_user_keys}
1073 ret = self.ldap.LdapModifyUser(user_record, new_attributes)
1074 logger.debug("IOTLABDRIVER DeleteKey %s- "%(ret))
1081 def _sql_get_slice_info( slice_filter ):
1083 Get the slice record based on the slice hrn. Fetch the record of the
1084 user associated with the slice by usingjoinedload based on t
1085 he reg_researcher relationship.
1086 :param slice_filter: the slice hrn we are looking for
1087 :type slice_filter: string
1088 :return: the slice record enhanced with the user's information if the
1089 slice was found, None it wasn't.
1090 :rtype: dict or None.
1092 #DO NOT USE RegSlice - reg_researchers to get the hrn
1093 #of the user otherwise will mess up the RegRecord in
1094 #Resolve, don't know why - SA 08/08/2012
1096 #Only one entry for one user = one slice in iotlab_xp table
1097 #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
1098 raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
1099 #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
1101 #load_reg_researcher
1102 #raw_slicerec.reg_researchers
1103 raw_slicerec = raw_slicerec.__dict__
1104 logger.debug(" IOTLABDRIVER \t get_slice_info slice_filter %s \
1105 raw_slicerec %s"%(slice_filter, raw_slicerec))
1106 slicerec = raw_slicerec
1107 #only one researcher per slice so take the first one
1108 #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
1109 #del slicerec['reg_researchers']['_sa_instance_state']
1116 def _sql_get_slice_info_from_user(slice_filter ):
1118 Get the slice record based on the user recordid by using a joinedload
1119 on the relationship reg_slices_as_researcher. Format the sql record
1120 into a dict with the mandatory fields for user and slice.
1121 :return: dict with slice record and user record if the record was found
1122 based on the user's id, None if not..
1123 :rtype:dict or None..
1125 #slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
1126 raw_slicerec = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(record_id = slice_filter).first()
1127 #raw_slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
1128 #Put it in correct order
1129 user_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'email', 'pointer']
1130 slice_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'pointer']
1132 #raw_slicerec.reg_slices_as_researcher
1133 raw_slicerec = raw_slicerec.__dict__
1136 dict([(k, raw_slicerec['reg_slices_as_researcher'][0].__dict__[k]) \
1137 for k in slice_needed_fields])
1138 slicerec['reg_researchers'] = dict([(k, raw_slicerec[k]) \
1139 for k in user_needed_fields])
1140 #TODO Handle multiple slices for one user SA 10/12/12
1141 #for now only take the first slice record associated to the rec user
1142 ##slicerec = raw_slicerec['reg_slices_as_researcher'][0].__dict__
1143 #del raw_slicerec['reg_slices_as_researcher']
1144 #slicerec['reg_researchers'] = raw_slicerec
1145 ##del slicerec['_sa_instance_state']
1152 def _get_slice_records(self, slice_filter = None, \
1153 slice_filter_type = None):
1155 Get the slice record depending on the slice filter and its type.
1156 :param slice_filter: Can be either the slice hrn or the user's record
1158 :type slice_filter: string
1159 :param slice_filter_type: describes the slice filter type used, can be
1160 slice_hrn or record_id_user
1162 :return: the slice record
1164 ..seealso:_sql_get_slice_info_from_user
1165 ..seealso: _sql_get_slice_info
1168 #Get list of slices based on the slice hrn
1169 if slice_filter_type == 'slice_hrn':
1171 #if get_authority(slice_filter) == self.root_auth:
1172 #login = slice_filter.split(".")[1].split("_")[0]
1174 slicerec = self._sql_get_slice_info(slice_filter)
1176 if slicerec is None:
1180 #Get slice based on user id
1181 if slice_filter_type == 'record_id_user':
1183 slicerec = self._sql_get_slice_info_from_user(slice_filter)
1186 fixed_slicerec_dict = slicerec
1187 #At this point if there is no login it means
1188 #record_id_user filter has been used for filtering
1190 ##If theslice record is from iotlab
1191 #if fixed_slicerec_dict['peer_authority'] is None:
1192 #login = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
1193 #return login, fixed_slicerec_dict
1194 return fixed_slicerec_dict
1198 def GetSlices(self, slice_filter = None, slice_filter_type = None, \
1200 """ Get the slice records from the iotlab db and add lease information
1203 :param slice_filter: can be the slice hrn or slice record id in the db
1204 depending on the slice_filter_type.
1205 :param slice_filter_type: defines the type of the filtering used, Can be
1206 either 'slice_hrn' or "record_id'.
1207 :type slice_filter: string
1208 :type slice_filter_type: string
1209 :return: a slice dict if slice_filter and slice_filter_type
1210 are specified and a matching entry is found in the db. The result
1211 is put into a list.Or a list of slice dictionnaries if no filters are
1217 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
1218 return_slicerec_dictlist = []
1220 #First try to get information on the slice based on the filter provided
1221 if slice_filter_type in authorized_filter_types_list:
1222 fixed_slicerec_dict = \
1223 self._get_slice_records(slice_filter, slice_filter_type)
1224 slice_hrn = fixed_slicerec_dict['hrn']
1226 logger.debug(" IOTLABDRIVER \tGetSlices login %s \
1227 slice record %s slice_filter %s \
1228 slice_filter_type %s " %(login, \
1229 fixed_slicerec_dict, slice_filter, \
1233 #Now we have the slice record fixed_slicerec_dict, get the
1234 #jobs associated to this slice
1237 leases_list = self.GetLeases(login = login)
1238 #If no job is running or no job scheduled
1239 #return only the slice record
1240 if leases_list == [] and fixed_slicerec_dict:
1241 return_slicerec_dictlist.append(fixed_slicerec_dict)
1243 #If several jobs for one slice , put the slice record into
1244 # each lease information dict
1247 for lease in leases_list :
1249 logger.debug("IOTLABDRIVER.PY \tGetSlices slice_filter %s \
1250 \ lease['slice_hrn'] %s" \
1251 %(slice_filter, lease['slice_hrn']))
1252 if lease['slice_hrn'] == slice_hrn:
1253 slicerec_dict['slice_hrn'] = lease['slice_hrn']
1254 slicerec_dict['hrn'] = lease['slice_hrn']
1255 slicerec_dict['user'] = lease['user']
1256 slicerec_dict['oar_job_id'] = lease['lease_id']
1257 slicerec_dict.update({'list_node_ids':{'hostname':lease['reserved_nodes']}})
1258 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1260 #Update lease dict with the slice record
1261 if fixed_slicerec_dict:
1262 fixed_slicerec_dict['oar_job_id'] = []
1263 fixed_slicerec_dict['oar_job_id'].append(slicerec_dict['oar_job_id'])
1264 slicerec_dict.update(fixed_slicerec_dict)
1265 #slicerec_dict.update({'hrn':\
1266 #str(fixed_slicerec_dict['slice_hrn'])})
1268 return_slicerec_dictlist.append(slicerec_dict)
1269 logger.debug("IOTLABDRIVER.PY \tGetSlices \
1270 OHOHOHOH %s" %(return_slicerec_dictlist ))
1272 logger.debug("IOTLABDRIVER.PY \tGetSlices \
1273 slicerec_dict %s return_slicerec_dictlist %s \
1274 lease['reserved_nodes'] \
1275 %s" %(slicerec_dict, return_slicerec_dictlist, \
1276 lease['reserved_nodes'] ))
1278 logger.debug("IOTLABDRIVER.PY \tGetSlices RETURN \
1279 return_slicerec_dictlist %s" \
1280 %(return_slicerec_dictlist))
1282 return return_slicerec_dictlist
1286 #Get all slices from the iotlab sfa database ,
1287 #put them in dict format
1288 #query_slice_list = dbsession.query(RegRecord).all()
1289 query_slice_list = dbsession.query(RegSlice).options(joinedload('reg_researchers')).all()
1291 for record in query_slice_list:
1292 tmp = record.__dict__
1293 tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
1294 #del tmp['reg_researchers']['_sa_instance_state']
1295 return_slicerec_dictlist.append(tmp)
1296 #return_slicerec_dictlist.append(record.__dict__)
1298 #Get all the jobs reserved nodes
1299 leases_list = self.GetReservedNodes()
1302 for fixed_slicerec_dict in return_slicerec_dictlist:
1304 #Check if the slice belongs to a iotlab user
1305 if fixed_slicerec_dict['peer_authority'] is None:
1306 owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
1309 for lease in leases_list:
1310 if owner == lease['user']:
1311 slicerec_dict['oar_job_id'] = lease['lease_id']
1313 #for reserved_node in lease['reserved_nodes']:
1314 logger.debug("IOTLABDRIVER.PY \tGetSlices lease %s "\
1317 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1318 slicerec_dict.update({'list_node_ids':{'hostname':lease['reserved_nodes']}})
1319 slicerec_dict.update(fixed_slicerec_dict)
1320 #slicerec_dict.update({'hrn':\
1321 #str(fixed_slicerec_dict['slice_hrn'])})
1322 #return_slicerec_dictlist.append(slicerec_dict)
1323 fixed_slicerec_dict.update(slicerec_dict)
1325 logger.debug("IOTLABDRIVER.PY \tGetSlices RETURN \
1326 return_slicerec_dictlist %s \slice_filter %s " \
1327 %(return_slicerec_dictlist, slice_filter))
1329 return return_slicerec_dictlist
1333 #Update slice unused, therefore sfa_fields_to_iotlab_fields unused
1336 #def sfa_fields_to_iotlab_fields(sfa_type, hrn, record):
1341 ##for field in record:
1342 ## iotlab_record[field] = record[field]
1344 #if sfa_type == "slice":
1345 ##instantion used in get_slivers ?
1346 #if not "instantiation" in iotlab_record:
1347 #iotlab_record["instantiation"] = "iotlab-instantiated"
1348 ##iotlab_record["hrn"] = hrn_to_pl_slicename(hrn)
1349 ##Unused hrn_to_pl_slicename because Slab's hrn already
1350 ##in the appropriate form SA 23/07/12
1351 #iotlab_record["hrn"] = hrn
1352 #logger.debug("IOTLABDRIVER.PY sfa_fields_to_iotlab_fields \
1353 #iotlab_record %s " %(iotlab_record['hrn']))
1354 #if "url" in record:
1355 #iotlab_record["url"] = record["url"]
1356 #if "description" in record:
1357 #iotlab_record["description"] = record["description"]
1358 #if "expires" in record:
1359 #iotlab_record["expires"] = int(record["expires"])
1361 ##nodes added by OAR only and then imported to SFA
1362 ##elif type == "node":
1363 ##if not "hostname" in iotlab_record:
1364 ##if not "hostname" in record:
1365 ##raise MissingSfaInfo("hostname")
1366 ##iotlab_record["hostname"] = record["hostname"]
1367 ##if not "model" in iotlab_record:
1368 ##iotlab_record["model"] = "geni"
1370 ##One authority only
1371 ##elif type == "authority":
1372 ##iotlab_record["login_base"] = hrn_to_iotlab_login_base(hrn)
1374 ##if not "name" in iotlab_record:
1375 ##iotlab_record["name"] = hrn
1377 ##if not "abbreviated_name" in iotlab_record:
1378 ##iotlab_record["abbreviated_name"] = hrn
1380 ##if not "enabled" in iotlab_record:
1381 ##iotlab_record["enabled"] = True
1383 ##if not "is_public" in iotlab_record:
1384 ##iotlab_record["is_public"] = True
1386 #return iotlab_record