from datetime import datetime

from sqlalchemy.orm import joinedload

from sfa.util.sfalogging import logger
from sfa.storage.alchemy import dbsession
from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
from sfa.senslab.OARrestapi import OARrestapi
from sfa.senslab.LDAPapi import LDAPapi
from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
from sfa.trust.certificate import Keypair, convert_public_key
from sfa.trust.gid import create_uuid
from sfa.trust.hierarchy import Hierarchy
from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
    slab_xrn_object
23 class SlabTestbedAPI():
def __init__(self, config):
    """Create the OAR and LDAP clients and store the testbed settings.

    config -- object exposing SFA_REGISTRY_ROOT_AUTH, kept as the root
    authority used to build hrns.
    """
    self.oar = OARrestapi()
    # The person-related methods of this class read self.ldap, so it has
    # to be bound here alongside the OAR client.
    self.ldap = LDAPapi()
    self.time_format = "%Y-%m-%d %H:%M:%S"
    self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
    self.grain = 600  # 10 mins lease
35 #TODO clean GetPeers. 05/07/12SA
@staticmethod
def GetPeers(auth=None, peer_filter=None, return_fields_list=None):
    """Return the authority records known to the SFA registry.

    auth -- unused, only logged.
    peer_filter -- hrn of a single authority; when set, only the record
        stored under (peer_filter, 'authority') is returned.
    return_fields_list -- only logged; no field projection is done.

    Returns a list of records; the list is empty when nothing matches.
    """
    logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, "
                 "return_field %s "
                 % (auth, peer_filter, return_fields_list))
    all_records = dbsession.query(RegRecord).filter(
        RegRecord.type.like('%authority%')).all()

    # Index the records by (hrn, type) and group the hrns by type.
    existing_records = {}
    existing_hrns_by_types = {}
    for record in all_records:
        existing_records[(record.hrn, record.type)] = record
        existing_hrns_by_types.setdefault(record.type, []).append(record.hrn)

    logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "
                 % (existing_hrns_by_types))

    records_list = []
    try:
        if peer_filter:
            records_list.append(existing_records[(peer_filter, 'authority')])
        else:
            for hrn in existing_hrns_by_types['authority']:
                records_list.append(existing_records[(hrn, 'authority')])

        logger.debug("SLABDRIVER \tGetPeer \trecords_list %s "
                     % (records_list))
    except KeyError:
        # No matching authority: keep whatever was collected so far.
        pass

    if not peer_filter and not return_fields_list:
        return records_list

    return_records = records_list
    logger.debug("SLABDRIVER \tGetPeer return_records %s "
                 % (return_records))
    return return_records
81 #TODO : Handling OR request in make_ldap_filters_from_records
82 #instead of the for loop
83 #over the records' list
def GetPersons(self, person_filter=None):
    """Look up enabled user accounts in the Senslab LDAP.

    person_filter -- None, or a list of dictionaries of searched
        attributes (usually a single record).

    With a filter list, each entry is searched separately and the users
    found are returned as a list, or None when nobody was found.
    Without a usable filter, the result of an unfiltered LdapFindUser
    call restricted to enabled accounts is returned as is.
    """
    logger.debug("SLABDRIVER \tGetPersons person_filter %s"
                 % (person_filter))

    if not (person_filter and isinstance(person_filter, list)):
        # Get only enabled user accounts in senslab LDAP.
        return self.ldap.LdapFindUser(is_user_enabled=True)

    person_list = []
    for searched_attributes in person_filter:
        person = self.ldap.LdapFindUser(searched_attributes,
                                        is_user_enabled=True)
        if person:
            person_list.append(person)

    # Emptiness is tested by truthiness, not by identity with the int 0.
    return person_list or None
def GetTimezone(self):
    """Ask OAR for its "GET_timezone" answer and return it as a
    (server_timestamp, server_tz) pair.
    Unused SA 16/11/12"""
    oar_parser = self.oar.parser
    server_timestamp, server_tz = oar_parser.SendRequest("GET_timezone")
    return server_timestamp, server_tz
def DeleteJobs(self, job_id, username):
    """Delete a job on OAR given its job id and the associated username.

    Posts a 'DELETE_jobs_id' request to OAR and returns its answer.
    Nothing is sent (and None is returned) when job_id is falsy or -1.
    """
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s username %s "
                 % (job_id, username))
    # Compare by value: identity with the literal -1 is not reliable.
    if not job_id or job_id == -1:
        return None

    reqdict = {'method': "delete", 'strval': str(job_id)}
    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',
                                              reqdict, username)
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s "
                 "username %s" % (job_id, answer, username))
    return answer
151 ##TODO : Unused GetJobsId ? SA 05/07/12
152 #def GetJobsId(self, job_id, username = None ):
154 #Details about a specific job.
155 #Includes details about submission time, jot type, state, events,
156 #owner, assigned ressources, walltime etc...
160 #node_list_k = 'assigned_network_address'
161 ##Get job info from OAR
162 #job_info = self.oar.parser.SendRequest(req, job_id, username)
164 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
166 #if job_info['state'] == 'Terminated':
167 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
170 #if job_info['state'] == 'Error':
171 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
176 #logger.error("SLABDRIVER \tGetJobsId KeyError")
179 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
181 ##Replaces the previous entry
182 ##"assigned_network_address" / "reserved_resources"
184 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
185 #del job_info[node_list_k]
186 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
def GetJobsResources(self, job_id, username=None):
    """Get the list of nodes associated with the job_id.

    The OAR resource ids of the job are translated into hostnames.
    Returns a dict with a single key 'node_ids' whose value is the
    hostname list.
    """
    req = "GET_jobs_id_resources"

    # Get job resources list from OAR
    node_id_list = self.oar.parser.SendRequest(req, job_id, username)
    logger.debug("SLABDRIVER \t GetJobsResources %s " % (node_id_list))

    hostname_list = self.__get_hostnames_from_oar_node_ids(node_id_list)

    # Replaces the previous entry "assigned_network_address" /
    # "reserved_resources" with "node_ids"
    return {'node_ids': hostname_list}
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Return the hostnames of the nodes listed in job_info[node_list_name].

    Each listed name is looked up in the testbed node records (keyed by
    hostname). On a KeyError the error is logged and the hostnames
    collected so far are returned.
    """
    # Dictionary of the testbed node records keyed on the hostname.
    node_dict = {node['hostname']: node for node in self.GetNodes()}

    reserved_node_hostname_list = []
    try:
        for reserved_name in job_info[node_list_name]:
            # Append: assigning by index into an empty list would raise.
            reserved_node_hostname_list.append(
                node_dict[reserved_name]['hostname'])

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes "
                     "reserved_node_hostname_list %s"
                     % (reserved_node_hostname_list))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR ")

    return reserved_node_hostname_list
def GetNodesCurrentlyInUse(self):
    """Return OAR's answer to "GET_running_jobs", i.e. the nodes already
    involved in an oar job."""
    running = self.oar.parser.SendRequest("GET_running_jobs")
    return running
def __get_hostnames_from_oar_node_ids(self, resource_id_list):
    """Translate a list of OAR resource ids into node hostnames.

    Entries equal to "Undefined" are skipped, because jobs requested
    "asap" do not have defined resources. Raises KeyError for an id that
    is not among the nodes returned by GetNodes().
    """
    # Full node list keyed by oar node id.
    oar_id_node_dict = {node['oar_id']: node for node in self.GetNodes()}

    # Compare by value: identity with a string literal is unreliable.
    return [oar_id_node_dict[resource_id]['hostname']
            for resource_id in resource_id_list
            if resource_id != "Undefined"]
def GetReservedNodes(self, username=None):
    """Return OAR's reservation list, each entry completed with its hostnames.

    Every reservation dict gets a 'reserved_nodes' entry computed from
    its 'resource_ids'; 'resource_ids' itself is left in place.
    """
    # Get the nodes in use and the reserved nodes
    reservation_dict_list = self.oar.parser.SendRequest(
        "GET_reserved_nodes", username=username)

    for resa in reservation_dict_list:
        logger.debug("GetReservedNodes resa %s" % (resa))
        resa['reserved_nodes'] = \
            self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])

    return reservation_dict_list
def GetNodes(self, node_filter_dict=None, return_fields_list=None):
    """Return the testbed node records, optionally filtered and projected.

    node_filter_dict -- dictionary of lists: for each key, a node is
        kept once per listed value that equals node[key].
    return_fields_list -- when set, each returned node is reduced to a
        new dict holding only these fields.

    Returns a list of node dicts, or None when a filter key or a
    requested field is missing from a node (the error is logged).
    """
    node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
    node_dict_list = list(node_dict_by_id.values())
    logger.debug(" SLABDRIVER GetNodes node_filter_dict %s "
                 "return_fields_list %s "
                 % (node_filter_dict, return_fields_list))
    # No filtering needed: return the list directly
    if not (node_filter_dict or return_fields_list):
        return node_dict_list

    def _project(node):
        # Keep only the requested fields, or the whole node record.
        if return_fields_list:
            return {k: node[k] for k in return_fields_list}
        return node

    try:
        if not node_filter_dict:
            # Fields requested without a filter: project every node
            # instead of returning an empty list.
            return [_project(node) for node in node_dict_list]

        return_node_list = []
        for filter_key in node_filter_dict:
            # Filter the node list by each value in node_filter_dict[key]
            for value in node_filter_dict[filter_key]:
                for node in node_dict_list:
                    if node[filter_key] == value:
                        return_node_list.append(_project(node))
    except KeyError:
        logger.log_exc("GetNodes KeyError")
        return None

    return return_node_list
@staticmethod
def AddSlice(slice_record, user_record):
    """Add slice to the sfa tables. Called by verify_slice
    during lease/sliver creation.

    slice_record -- dict providing 'hrn', 'gid', 'slice_id' and
        'authority'.
    user_record -- record stored as the only researcher of the slice.
    """
    sfa_record = RegSlice(hrn=slice_record['hrn'],
                          gid=slice_record['gid'],
                          pointer=slice_record['slice_id'],
                          authority=slice_record['authority'])

    logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s"
                 % (sfa_record, user_record))
    sfa_record.just_created()
    dbsession.add(sfa_record)
    # Persist the new slice, as done for the slab_dbsession rows
    # elsewhere in this class.
    dbsession.commit()

    # Update the reg-researcher dependance table
    sfa_record.reg_researchers = [user_record]
    dbsession.commit()
def GetSites(self, site_filter_name_list=None, return_fields_list=None):
    """Return the site records known to OAR.

    site_filter_name_list -- names of the sites to return; unknown names
        are ignored. None means every site.
    return_fields_list -- when set, each site is reduced to a new dict
        holding only these fields.

    Returns a list of site dicts, or None when a requested field is
    missing from a site (the error is logged).
    """
    # site_dict : dict where the key is the site name
    site_dict = self.oar.parser.SendRequest("GET_sites")
    if not (site_filter_name_list or return_fields_list):
        return list(site_dict.values())

    if site_filter_name_list is None:
        # Only fields were requested: apply them to every site rather
        # than iterating over None.
        site_filter_name_list = list(site_dict)

    return_site_list = []
    for site_filter_name in site_filter_name_list:
        if site_filter_name not in site_dict:
            continue
        site = site_dict[site_filter_name]
        if not return_fields_list:
            return_site_list.append(site)
            continue

        # One dict per site, filled with all the requested fields.
        tmp = {}
        for field in return_fields_list:
            try:
                tmp[field] = site[field]
            except KeyError:
                logger.error("GetSites KeyError %s " % (field))
                return None
        return_site_list.append(tmp)

    return return_site_list
378 #TODO : Check rights to delete person
def DeletePerson(self, person_record):
    """Disable an existing account in senslab LDAP.

    PLC API semantics, which are not enforced here (see the TODO on
    rights checking): users and techs can only delete themselves, PIs
    can only delete themselves and other non-PIs at their sites, admins
    can delete anyone.

    Returns the result of LdapMarkUserAsDeleted for person_record.
    """
    # Disable user account in senslab LDAP
    ret = self.ldap.LdapMarkUserAsDeleted(person_record)
    logger.warning("SLABDRIVER DeletePerson %s " % (person_record))
    return ret
393 #TODO Check DeleteSlice, check rights 05/07/2012 SA
def DeleteSlice(self, slice_record):
    """Delete the specified slice.

    Senslab: the job(s) associated with the slice are killed through
    DeleteSliceFromNodes, then a warning is logged.

    PLC API semantics, which are not enforced here: users may only
    delete slices of which they are members, PIs may delete any of the
    slices at their sites or any slices of which they are members,
    admins may delete any slice.
    """
    self.DeleteSliceFromNodes(slice_record)
    message = "SLABDRIVER DeleteSlice %s " % (slice_record)
    logger.warning(message)
@staticmethod
def __add_person_to_db(user_dict):
    """Create the RegUser record of a person unless one already exists.

    user_dict -- dict providing 'email', 'hrn' and 'pkey'.

    Nothing is done when a RegUser with the same email is already
    registered.
    """
    check_if_exists = dbsession.query(RegUser).filter_by(
        email=user_dict['email']).first()
    # Only add users that don't exist yet
    if check_if_exists:
        return

    logger.debug("__add_person_to_db \t Adding %s \r\n \r\n "
                 "_________________________________________________________________________"
                 " " % (user_dict))
    hrn = user_dict['hrn']
    person_urn = hrn_to_urn(hrn, 'user')
    pubkey = user_dict['pkey']
    try:
        pkey = convert_public_key(pubkey)
    except TypeError:
        # NOTE(review): exception type assumed to be TypeError - confirm
        # against convert_public_key.
        # key not good. create another pkey
        logger.warn('__add_person_to_db: unable to convert public '
                    'key for %s' % (hrn))
        pkey = Keypair(create=True)

    # Stays None when no gid can be built, so the record creation below
    # never hits an unbound name.
    person_gid = None
    if pubkey is not None and pkey is not None:
        hierarchy = Hierarchy()
        person_gid = hierarchy.create_gid(person_urn, create_uuid(), pkey)
        if user_dict['email']:
            logger.debug("__add_person_to_db \r\n \r\n SLAB IMPORTER PERSON EMAIL OK email %s " % (user_dict['email']))
            person_gid.set_email(user_dict['email'])

    user_record = RegUser(hrn=hrn, pointer='-1',
                          authority=get_authority(hrn),
                          email=user_dict['email'], gid=person_gid)
    user_record.reg_keys = [RegKey(user_dict['pkey'])]
    user_record.just_created()
    dbsession.add(user_record)
    dbsession.commit()
447 #TODO AddPerson 04/07/2012 SA
448 #def AddPerson(self, auth, person_fields=None):
def AddPerson(self, record):  # TODO fixing 28/08//2012 SA
    """Add a new account in LDAP and register it in the SFA database.

    record -- dict of the fields of the new user; its 'hrn' entry is
        set here to "<root_auth>.<uid>" from the uid returned by LDAP.

    Returns the uid of the new account.
    NOTE(review): the returned value is reconstructed from the docstring
    contract ("returns the new person id") - confirm against callers.
    """
    ret = self.ldap.LdapAddUser(record)

    record['hrn'] = self.root_auth + '.' + ret['uid']
    logger.debug("SLABDRIVER AddPerson return code %s record %s \r\n "
                 % (ret, record))
    self.__add_person_to_db(record)
    return ret['uid']
466 #TODO AddPersonToSite 04/07/2012 SA
def AddPersonToSite(self, auth, person_id_or_email,
                    site_id_or_login_base=None):
    """Placeholder: does nothing except log a warning.

    PLC API semantics (not implemented): adds the specified person to
    the specified site; if the person is already a member of the site,
    no errors are returned; does not change the person's primary site.
    """
    message = "SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n "
    logger.warning(message)
479 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
    """Placeholder: does nothing except log a warning.

    PLC API semantics (not implemented): grants the specified role to
    the person; PIs can only grant the tech and user roles to users and
    techs at their sites, admins can grant any role to any user.
    """
    message = "SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n "
    logger.warning(message)
491 #TODO AddPersonKey 04/07/2012 SA
def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
    """Placeholder: does nothing except log a warning.

    PLC API semantics (not implemented): adds a new key to the specified
    account; non-admins can only modify their own keys.
    """
    message = "SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n "
    logger.warning(message)
def DeleteLeases(self, leases_id_list, slice_hrn):
    """Delete every OAR job listed in leases_id_list.

    slice_hrn is passed to DeleteJobs as the username of each job.
    """
    logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s "
                 "\r\n " % (leases_id_list, slice_hrn))
    for lease_id in leases_id_list:
        self.DeleteJobs(lease_id, slice_hrn)
@staticmethod
def _process_walltime(duration):
    """Compute the OAR walltime for an experiment of `duration` seconds.

    240 seconds (4 min, update SA 23/10/12) are added to the requested
    duration to take the prologue and epilogue scripts into account; no
    delay is added to the /bin/sleep duration.

    Returns (walltime, sleep_walltime): walltime is the list
    [hours, minutes, seconds] as strings, sleep_walltime is `duration`.
    Raises ValueError when duration is null, since no walltime can be
    built in that case.
    """
    if not duration:
        logger.log_exc(" __process_walltime duration null")
        raise ValueError("lease duration is null")

    sleep_walltime = duration
    total_walltime = duration + 240

    # Put the walltime back in H, M, S form. divmod keeps integer
    # results for integer input under both Python 2 and Python 3.
    hours, remainder = divmod(total_walltime, 3600)
    minutes, seconds = divmod(remainder, 60)
    walltime = [str(hours), str(minutes), str(seconds)]

    return walltime, sleep_walltime
@staticmethod
def _create_job_structure_request_for_OAR(lease_dict):
    """Build the request dictionary needed for a job POST on OAR.

    lease_dict -- dict providing 'added_nodes', 'lease_duration' (in
        grains), 'grain', 'lease_start_time', 'time_format' and
        'slice_user'.

    The start timestamp is turned into the readable format OAR accepts.
    Returns the request dict; nothing is sent from here.
    """
    nodeid_list = []
    for node in lease_dict['added_nodes']:
        logger.debug("\r\n \r\n OARrestapi \t "
                     "__create_job_structure_request_for_OAR node %s"
                     % (node))
        # The node name is used as its ID
        nodeid_list.append(node)

    walltime, sleep_walltime = SlabTestbedAPI._process_walltime(
        int(lease_dict['lease_duration']) * lease_dict['grain'])

    reqdict = {}
    reqdict['workdir'] = '/tmp'
    # join() also gives a well-formed (empty) list when no node is given.
    reqdict['resource'] = "{network_address in (" + \
        ", ".join("'" + nodeid + "'" for nodeid in nodeid_list) + \
        ")}/nodes=" + str(len(nodeid_list))
    reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
        ":" + str(walltime[1]) + ":" + str(walltime[2])
    reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)

    # In case of a scheduled experiment (not immediate).
    # To run an XP immediately, don't specify date and time in RSpec.
    # The start time is compared by value; None also means "immediate".
    lease_start_time = lease_dict['lease_start_time']
    if lease_start_time is not None and str(lease_start_time) != '0':
        # Readable time accepted by OAR
        reqdict['reservation'] = datetime.fromtimestamp(
            int(lease_start_time)).strftime(lease_dict['time_format'])
    # If there is no start time: immediate XP, no special OAR parameter.

    reqdict['type'] = "deploy"
    reqdict['directory'] = ""
    reqdict['name'] = "SFA_" + lease_dict['slice_user']

    return reqdict
def LaunchExperimentOnOAR(self, added_nodes, slice_name,
                          lease_start_time, lease_duration, slice_user=None):
    """Create the OAR job reserving added_nodes and return its job id.

    The lease description is turned into an OAR request and posted as
    'POST_job' on behalf of slice_user. Returns the 'id' field of OAR's
    answer, or None when the answer carries no id.
    """
    lease_dict = {
        'lease_start_time': lease_start_time,
        'lease_duration': lease_duration,
        'added_nodes': added_nodes,
        'slice_name': slice_name,
        'slice_user': slice_user,
        'grain': self.GetLeaseGranularity(),
        'time_format': self.time_format,
    }

    logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s "
                 "\r\n " % (slice_user))
    # Create the request for OAR
    reqdict = self._create_job_structure_request_for_OAR(lease_dict)
    # first step : start the OAR job and update the job
    logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s "
                 "\r\n " % (reqdict))

    answer = self.oar.POSTRequestToOARRestAPI('POST_job',
                                              reqdict, slice_user)
    logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " % (answer))
    try:
        jobid = answer['id']
    except (KeyError, TypeError):
        # No id in the answer, or no usable answer at all.
        logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR "
                       "Impossible to create job %s " % (answer))
        return None

    if jobid:
        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s "
                     "added_nodes %s slice_user %s"
                     % (jobid, added_nodes, slice_user))

    return jobid
def AddLeases(self, hostname_list, slice_record,
              lease_start_time, lease_duration):
    """Launch an OAR job for the slice and record it in the slab_xp table.

    slice_record must provide 'login' (the user the job is launched
    for) and 'hrn'. The stored row holds the slice hrn, the job id and
    lease_start_time + lease_duration as end time.
    NOTE(review): end_time adds the two arguments as received - confirm
    they are numbers expressed in the same unit.
    """
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s "
                 "slice_record %s lease_start_time %s lease_duration %s "
                 % (hostname_list, slice_record, lease_start_time,
                    lease_duration))

    slice_hrn = slice_record['hrn']
    username = slice_record['login']
    job_id = self.LaunchExperimentOnOAR(hostname_list, slice_hrn,
                                        lease_start_time, lease_duration,
                                        username)
    start_time = datetime.fromtimestamp(
        int(lease_start_time)).strftime(self.time_format)
    end_time = lease_start_time + lease_duration

    # Debugging aid kept from the original code: turns on SQL logging.
    import logging, logging.handlers
    from sfa.util.sfalogging import _SfaLogger
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s " % (slice_hrn, job_id, end_time))
    sql_logger = _SfaLogger(loggername='sqlalchemy.engine',
                            level=logging.DEBUG)
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " % (type(slice_hrn), type(job_id), type(end_time)))

    slab_ex_row = SenslabXP(slice_hrn=slice_hrn,
                            job_id=job_id, end_time=end_time)

    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s"
                 % (slab_ex_row))
    slab_dbsession.add(slab_ex_row)
    slab_dbsession.commit()

    logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " % (start_time))
674 #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
    """Delete the OAR job(s) of a slice.

    slice_record['oar_job_id'] is either one job id or a list of job
    ids; each one is deleted on behalf of slice_record['user'].
    """
    logger.debug("SLABDRIVER \t DeleteSliceFromNodese %s " % (slice_record))
    job_ids = slice_record['oar_job_id']
    if not isinstance(job_ids, list):
        job_ids = [job_ids]
    for job_id in job_ids:
        self.DeleteJobs(job_id, slice_record['user'])
def GetLeaseGranularity(self):
    """Return the granularity of the Senslab testbed, in seconds.

    OAR returns seconds for experiments duration. The value is the
    grain set in the constructor (600 s, i.e. 10 min).
    Experiments which last less than 10 min are invalid.
    """
    return self.grain
@staticmethod
def update_jobs_in_slabdb(job_oar_list, jobs_psql):
    """Remove from the slab_xp table the jobs that OAR no longer lists.

    job_oar_list -- ids of the jobs currently known to OAR.
    jobs_psql -- ids of the jobs stored in the slab_xp table.
    """
    jobs_psql = set(jobs_psql)
    kept_jobs = set(job_oar_list).intersection(jobs_psql)
    # "\\ " yields the same text as the former invalid "\ " escape.
    logger.debug("\r\n \t\\ update_jobs_in_slabdb jobs_psql %s \r\n \t "
                 "job_oar_list %s kept_jobs %s "
                 % (jobs_psql, job_oar_list, kept_jobs))

    deleted_jobs = list(jobs_psql.difference(kept_jobs))
    if deleted_jobs:
        slab_dbsession.query(SenslabXP).filter(
            SenslabXP.job_id.in_(deleted_jobs)).delete(
                synchronize_session='fetch')
        slab_dbsession.commit()
def GetLeases(self, lease_filter_dict=None, login=None):
    """Return the OAR reservations completed with slice information.

    lease_filter_dict -- when set, only the reservations whose slice hrn
        equals lease_filter_dict['name'] are returned; None returns all.
    login -- user name handed to GetReservedNodes.

    Every reservation gets 'slice_id', 'slice_hrn' and
    'component_id_list' entries. The slice comes from the slab_xp table
    when the job is recorded there; otherwise it is assumed to be the
    senslab slice "<root_auth>.<user>_slice". The slab_xp table is then
    purged from the jobs that OAR no longer lists.
    """
    unfiltered_reservation_list = self.GetReservedNodes(login)

    reservation_list = []
    logger.debug(" SLABDRIVER.PY \tGetLeases login %s "
                 "unfiltered_reservation_list %s "
                 % (login, unfiltered_reservation_list))
    # Left over from a former per-user LDAP cache; only logged below.
    resa_user_dict = {}
    job_oar_list = []

    jobs_psql_query = slab_dbsession.query(SenslabXP).all()
    jobs_psql_dict = dict((row.job_id, row.__dict__)
                          for row in jobs_psql_query)
    logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"
                 % (jobs_psql_dict))
    jobs_psql_id_list = [row.job_id for row in jobs_psql_query]

    for resa in unfiltered_reservation_list:
        logger.debug("SLABDRIVER \tGetLeases USER %s"
                     % (resa['user']))
        # Construct list of jobs (running, waiting..) in oar
        job_oar_list.append(resa['lease_id'])
        # If there is information on the job in SLAB DB
        # (slice used and job id)
        if resa['lease_id'] in jobs_psql_dict:
            job_info = jobs_psql_dict[resa['lease_id']]
            logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"
                         % (resa_user_dict))
            resa['slice_hrn'] = job_info['slice_hrn']
            resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
        else:
            # Assume it is a senslab slice
            resa['slice_id'] = hrn_to_urn(
                self.root_auth + '.' + resa['user'] + "_slice", 'slice')

        resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()

        # Transform the hostnames into urns (component ids)
        resa['component_id_list'] = []
        for node in resa['reserved_nodes']:
            slab_xrn = slab_xrn_object(self.root_auth, node)
            resa['component_id_list'].append(slab_xrn.urn)

        if lease_filter_dict:
            logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"
                         % (resa, lease_filter_dict))

            if lease_filter_dict['name'] == resa['slice_hrn']:
                reservation_list.append(resa)

    if lease_filter_dict is None:
        reservation_list = unfiltered_reservation_list

    self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)

    logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"
                 % (reservation_list))
    return reservation_list
841 #TODO FUNCTIONS SECTION 04/07/2012 SA
843 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
@staticmethod
def UnBindObjectFromPeer(auth, object_type, object_id, shortname):
    """Placeholder: does nothing except log a warning.

    PLC API semantics (not implemented): hopefully temporary hack to let
    the sfa correctly detach the objects it creates from a remote peer
    object, so that the sfa federation link can work in parallel with
    RefreshPeer.

    auth -- struct, API authentication structure
    object_type -- string, object type ('site', 'person', 'slice', ...)
    object_id -- int, object_id
    shortname -- string, peer shortname
    """
    message = "SLABDRIVER \tUnBindObjectFromPeer EMPTY- DO NOTHING \r\n "
    logger.warning(message)
866 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
def BindObjectToPeer(self, auth, object_type, object_id, shortname=None,
                     remote_object_id=None):
    """Placeholder: does nothing except log a warning.

    PLC API semantics (not implemented): hopefully temporary hack to let
    the sfa correctly attach the objects it creates to a remote peer
    object, so that the sfa federation link can work in parallel with
    RefreshPeer, as RefreshPeer depends on remote objects being
    correctly marked.

    shortname -- string, peer shortname
    remote_object_id -- int, remote object_id, set to 0 if unknown
    """
    message = "SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n "
    logger.warning(message)
883 #TODO UpdateSlice 04/07/2012 SA
884 #Function should delete and create another job since in senslab slice=job
def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
    """Placeholder: does nothing except log a warning.

    PLC API semantics (not implemented): updates the parameters of an
    existing slice. Users may only update slices of which they are
    members. PIs may update any of the slices at their sites, or any
    slices of which they are members. Admins may update any slice.
    Only PIs and admins may update max_nodes. Slices cannot be renewed
    (by updating the expires parameter) more than 8 weeks into the
    future.
    """
    message = "SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n "
    logger.warning(message)
900 #TODO UpdatePerson 04/07/2012 SA
def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
    """Placeholder: does nothing except log a debug message.

    PLC API semantics (not implemented): updates a person; only the
    fields specified in person_fields are updated, all other fields are
    left untouched. Users and techs can only update themselves. PIs can
    only update themselves and other non-PIs at their sites.
    """
    message = "SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n "
    logger.debug(message)
917 #TODO GetKeys 04/07/2012 SA
def GetKeys(self, auth, key_filter=None, return_fields=None):
    """Placeholder: does nothing except log a warning.

    PLC API semantics (not implemented): returns an array of structs
    containing details about keys. If key_filter is specified and is an
    array of key identifiers, or a struct of key attributes, only keys
    matching the filter will be returned. If return_fields is specified,
    only the specified details will be returned.

    Admin may query all keys. Non-admins may only query their own keys.
    """
    message = "SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n "
    logger.warning(message)
932 #TODO DeleteKey 04/07/2012 SA
def DeleteKey(self, key_id):
    """Placeholder: does nothing except log a warning.

    PLC API semantics (not implemented): deletes a key; non-admins may
    only delete their own keys.
    """
    message = "SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n "
    logger.warning(message)
@staticmethod
def _sql_get_slice_info(slice_filter):
    """Return the registry record of the slice whose hrn is slice_filter.

    The RegSlice row is loaded together with its 'reg_researchers'
    relationship and returned as the instance's attribute dictionary.
    Returns None when no slice has this hrn.
    """
    # DO NOT USE RegSlice - reg_researchers to get the hrn
    # of the user otherwise will mess up the RegRecord in
    # Resolve, don't know why - SA 08/08/2012
    raw_slicerec = dbsession.query(RegSlice).options(
        joinedload('reg_researchers')).filter_by(hrn=slice_filter).first()
    if not raw_slicerec:
        return None

    slicerec = raw_slicerec.__dict__
    logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s "
                 "raw_slicerec %s" % (slice_filter, slicerec))
    return slicerec
@staticmethod
def _sql_get_slice_info_from_user(slice_filter):
    """Return the record of the first slice of the user slice_filter.

    slice_filter -- record_id of the RegUser.

    The result holds the needed slice fields plus a 'reg_researchers'
    entry with the needed user fields. Returns None when the user is
    unknown or is not a researcher of any slice.
    """
    raw_slicerec = dbsession.query(RegUser).options(
        joinedload('reg_slices_as_researcher')).filter_by(
            record_id=slice_filter).first()
    user_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                          'classtype', 'authority', 'gid', 'record_id',
                          'date_created', 'type', 'email', 'pointer']
    slice_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                           'classtype', 'authority', 'gid', 'record_id',
                           'date_created', 'type', 'pointer']
    if not raw_slicerec:
        return None

    user_fields = raw_slicerec.__dict__
    user_slices = user_fields['reg_slices_as_researcher']
    if not user_slices:
        # No slice for this user: same answer as for an unknown user.
        return None

    # TODO Handle multiple slices for one user SA 10/12/12
    # for now only take the first slice record associated to the user
    first_slice = user_slices[0].__dict__
    slicerec = dict((k, first_slice[k]) for k in slice_needed_fields)
    slicerec['reg_researchers'] = dict((k, user_fields[k])
                                       for k in user_needed_fields)
    return slicerec
def _get_slice_records(self, slice_filter=None,
                       slice_filter_type=None):
    """Return the slice record matching the filter, or None.

    slice_filter_type -- 'slice_hrn' (slice_filter is a slice hrn) or
        'record_id_user' (slice_filter is a user record id). Any other
        value yields None.
    """
    slicerec = None

    # Get the slice based on the slice hrn
    if slice_filter_type == 'slice_hrn':
        slicerec = self._sql_get_slice_info(slice_filter)
        if slicerec is None:
            return None

    # Get the slice based on the user id
    if slice_filter_type == 'record_id_user':
        slicerec = self._sql_get_slice_info_from_user(slice_filter)

    if slicerec:
        return slicerec
    return None
def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None):
    """Get slice records from the registry, completed with lease data.

    When slice_filter_type is 'slice_hrn' or 'record_id_user', the slice
    record matching slice_filter is fetched with _get_slice_records and
    combined with the leases returned by GetLeases(login=login): one dict
    is returned per lease whose 'slice_hrn' equals the hrn of the slice,
    or the bare slice record when no lease matches.

    With any other filter type, every RegSlice record is returned, each
    one completed with the reserved-nodes entry (GetReservedNodes) whose
    'user' equals the login derived from the slice hrn.

    :param slice_filter: slice hrn or user record id, depending on
        slice_filter_type.
    :param slice_filter_type: 'slice_hrn', 'record_id_user' or None.
    :param login: forwarded to GetLeases in the filtered case.
    :returns: list of slice record dicts; empty when a filter was given
        but no slice record matches it.
    """
    authorized_filter_types_list = ['slice_hrn', 'record_id_user']
    return_slicerec_dictlist = []

    #First try to get information on the slice based on the filter provided
    if slice_filter_type in authorized_filter_types_list:
        fixed_slicerec_dict = \
            self._get_slice_records(slice_filter, slice_filter_type)
        logger.debug(" SLABDRIVER \tGetSlices login %s slice record %s "
                     "slice_filter %s slice_filter_type %s "
                     % (login, fixed_slicerec_dict, slice_filter,
                        slice_filter_type))

        #No record matches the filter: there is nothing to subscript
        if not fixed_slicerec_dict:
            return return_slicerec_dictlist
        slice_hrn = fixed_slicerec_dict['hrn']

        #Now we have the slice record fixed_slicerec_dict, get the
        #jobs associated to this slice
        leases_list = self.GetLeases(login = login)

        #If several jobs for one slice, put the slice record into
        #each lease information dict
        for lease in leases_list:
            logger.debug("SLABDRIVER.PY \tGetSlices slice_filter %s "
                         "lease['slice_hrn'] %s"
                         % (slice_filter, lease['slice_hrn']))
            if lease['slice_hrn'] != slice_hrn:
                continue

            reserved_list = lease['reserved_nodes']
            slicerec_dict = {
                'slice_hrn': lease['slice_hrn'],
                'hrn': lease['slice_hrn'],
                'user': lease['user'],
                'list_node_ids': {'hostname': reserved_list},
                'node_ids': reserved_list,
            }
            #The slice record carries the job id as a one-element list,
            #which is also what each returned lease dict ends up holding
            fixed_slicerec_dict['oar_job_id'] = [lease['lease_id']]
            slicerec_dict.update(fixed_slicerec_dict)
            return_slicerec_dictlist.append(slicerec_dict)
            logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s "
                         "lease['reserved_nodes'] %s"
                         % (slicerec_dict, reserved_list))

        #If no job is running or scheduled for this slice, return only
        #the slice record. This also covers leases that all belong to
        #other slices.
        if not return_slicerec_dictlist:
            return_slicerec_dictlist.append(fixed_slicerec_dict)

        logger.debug("SLABDRIVER.PY \tGetSlices RETURN "
                     "return_slicerec_dictlist %s"
                     % (return_slicerec_dictlist))
        return return_slicerec_dictlist

    #No usable filter: get all slices from the senslab sfa database and
    #put them in dict format
    query_slice_list = dbsession.query(RegSlice).options(
        joinedload('reg_researchers')).all()
    for record in query_slice_list:
        #Work on a copy: writing into record.__dict__ would overwrite the
        #mapped relationship on the live ORM instance
        slice_dict = dict(record.__dict__)
        researchers = slice_dict['reg_researchers']
        #Only the first researcher is kept; a slice without researcher
        #gets an empty dict instead of raising IndexError
        if researchers:
            slice_dict['reg_researchers'] = researchers[0].__dict__
        else:
            slice_dict['reg_researchers'] = {}
        return_slicerec_dictlist.append(slice_dict)

    #Get all the jobs reserved nodes
    leases_list = self.GetReservedNodes()

    for fixed_slicerec_dict in return_slicerec_dictlist:
        #Only slices belonging to a senslab user (no peer authority) have
        #a login that can be derived from the hrn
        owner = None
        if fixed_slicerec_dict['peer_authority'] is None:
            owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]

        for lease in leases_list:
            if owner != lease['user']:
                continue
            logger.debug("SLABDRIVER.PY \tGetSlices lease %s " % (lease,))
            reserved_list = lease['reserved_nodes']
            #Values already present on the record win, so the first
            #matching lease is the one that is kept.
            #NOTE(review): confirm that ignoring later leases is intended
            fixed_slicerec_dict.setdefault('oar_job_id', lease['lease_id'])
            fixed_slicerec_dict.setdefault('node_ids', reserved_list)
            fixed_slicerec_dict.setdefault('list_node_ids',
                                           {'hostname': reserved_list})

    logger.debug("SLABDRIVER.PY \tGetSlices RETURN "
                 "return_slicerec_dictlist %s slice_filter %s "
                 % (return_slicerec_dictlist, slice_filter))

    return return_slicerec_dictlist
# Convert SFA fields to PLC fields for use when registering or updating
# a registry record in the PLC database
# @param sfa_type type of record (user, slice, ...)
# @param hrn human readable name
# @param record dictionary of SFA fields
# @param slab_fields dictionary of PLC fields (output)
def sfa_fields_to_slab_fields(sfa_type, hrn, record):
    """Convert the fields of an SFA record into a senslab record dict.

    Only records of type "slice" are converted; any other type gives an
    empty dict.

    :param sfa_type: type of the record ("slice", "user", ...).
    :param hrn: human readable name, stored under the 'hrn' key.
    :param record: dict of SFA fields; 'url', 'description' and 'expires'
        are copied when present ('expires' is converted with int()).
    :returns: dict of senslab fields.
    """
    slab_record = {}

    if sfa_type == "slice":
        #instantiation used in get_slivers ?
        slab_record["instantiation"] = "senslab-instantiated"
        #Unused hrn_to_pl_slicename because Slab's hrn already
        #in the appropriate form SA 23/07/12
        slab_record["hrn"] = hrn
        logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields "
                     "slab_record %s " % (slab_record['hrn'],))

        #Optional fields: copy only what the SFA record actually provides
        for field in ("url", "description"):
            if field in record:
                slab_record[field] = record[field]
        if "expires" in record:
            slab_record["expires"] = int(record["expires"])

    #nodes added by OAR only and then imported to SFA, so node records
    #are not converted here

    return slab_record
def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
    """Turn a unix timestamp into a date string in valid OAR format.

    :param xp_utc_timestamp: start time of a scheduled experiment, as
        anything int() accepts. Left empty for an immediate experiment.
    :returns: the date formatted with self.time_format, or None when no
        timestamp was given.
    """
    #An immediate experiment carries no date and time in its RSpec, so
    #there is nothing to convert
    if not xp_utc_timestamp:
        return None

    #Scheduled experiment: express the timestamp as server readable time
    start_seconds = int(xp_utc_timestamp)
    start_date = datetime.fromtimestamp(start_seconds)
    return start_date.strftime(self.time_format)