1 from datetime import datetime
3 from sfa.util.sfalogging import logger
5 from sfa.storage.alchemy import dbsession
6 from sqlalchemy.orm import joinedload
7 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
8 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
10 from sfa.senslab.OARrestapi import OARrestapi
11 from sfa.senslab.LDAPapi import LDAPapi
13 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
15 from sfa.trust.certificate import Keypair, convert_public_key
16 from sfa.trust.gid import create_uuid
17 from sfa.trust.hierarchy import Hierarchy
20 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
23 class SlabTestbedAPI():
25 def __init__(self, config):
26 self.oar = OARrestapi()
28 self.time_format = "%Y-%m-%d %H:%M:%S"
29 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
30 self.grain = 600 # 10 mins lease
35 #TODO clean GetPeers. 05/07/12SA
37 def GetPeers ( auth = None, peer_filter=None ):
38 """ Gathers registered authorities in SFA DB and looks for specific peer
39 if peer_filter is specified.
40 :returns list of records.
45 existing_hrns_by_types = {}
46 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
47 " %(auth , peer_filter))
48 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
50 for record in all_records:
51 existing_records[(record.hrn, record.type)] = record
52 if record.type not in existing_hrns_by_types:
53 existing_hrns_by_types[record.type] = [record.hrn]
55 existing_hrns_by_types[record.type].append(record.hrn)
58 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
59 %( existing_hrns_by_types))
64 records_list.append(existing_records[(peer_filter,'authority')])
66 for hrn in existing_hrns_by_types['authority']:
67 records_list.append(existing_records[(hrn,'authority')])
69 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
75 return_records = records_list
80 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
86 #TODO : Handling OR request in make_ldap_filters_from_records
87 #instead of the for loop
88 #over the records' list
89 def GetPersons(self, person_filter=None):
91 person_filter should be a list of dictionnaries when not set to None.
92 Returns a list of users whose accounts are enabled found in ldap.
95 logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
98 if person_filter and isinstance(person_filter, list):
99 #If we are looking for a list of users (list of dict records)
100 #Usually the list contains only one user record
101 for searched_attributes in person_filter:
103 #Get only enabled user accounts in senslab LDAP :
104 #add a filter for make_ldap_filters_from_record
105 person = self.ldap.LdapFindUser(searched_attributes, \
106 is_user_enabled=True)
107 #If a person was found, append it to the list
109 person_list.append(person)
111 #If the list is empty, return None
112 if len(person_list) is 0:
116 #Get only enabled user accounts in senslab LDAP :
117 #add a filter for make_ldap_filters_from_record
118 person_list = self.ldap.LdapFindUser(is_user_enabled=True)
125 def GetTimezone(self):
126 """ Get the OAR servier time and timezone.
127 Unused SA 16/11/12"""
128 server_timestamp, server_tz = self.oar.parser.\
129 SendRequest("GET_timezone")
130 return server_timestamp, server_tz
135 def DeleteJobs(self, job_id, username):
137 """Delete a job on OAR given its job id and the username assoaciated.
138 Posts a delete request to OAR."""
139 logger.debug("SLABDRIVER \tDeleteJobs jobid %s username %s " %(job_id, username))
140 if not job_id or job_id is -1:
144 reqdict['method'] = "delete"
145 reqdict['strval'] = str(job_id)
148 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
150 logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
151 username %s" %(job_id, answer, username))
156 ##TODO : Unused GetJobsId ? SA 05/07/12
157 #def GetJobsId(self, job_id, username = None ):
159 #Details about a specific job.
160 #Includes details about submission time, jot type, state, events,
161 #owner, assigned ressources, walltime etc...
165 #node_list_k = 'assigned_network_address'
166 ##Get job info from OAR
167 #job_info = self.oar.parser.SendRequest(req, job_id, username)
169 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
171 #if job_info['state'] == 'Terminated':
172 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
175 #if job_info['state'] == 'Error':
176 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
181 #logger.error("SLABDRIVER \tGetJobsId KeyError")
184 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
186 ##Replaces the previous entry
187 ##"assigned_network_address" / "reserved_resources"
189 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
190 #del job_info[node_list_k]
191 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
195 def GetJobsResources(self, job_id, username = None):
196 """ Gets the list of nodes associated with the job_id.
197 Transforms the senslab hostnames to the corresponding
199 Rertuns dict key :'node_ids' , value : hostnames list """
201 req = "GET_jobs_id_resources"
204 #Get job resources list from OAR
205 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
206 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
209 self.__get_hostnames_from_oar_node_ids(node_id_list)
212 #Replaces the previous entry "assigned_network_address" /
213 #"reserved_resources" with "node_ids"
214 job_info = {'node_ids': hostname_list}
219 def get_info_on_reserved_nodes(self, job_info, node_list_name):
220 #Get the list of the testbed nodes records and make a
221 #dictionnary keyed on the hostname out of it
222 node_list_dict = self.GetNodes()
223 #node_hostname_list = []
224 node_hostname_list = [node['hostname'] for node in node_list_dict]
225 #for node in node_list_dict:
226 #node_hostname_list.append(node['hostname'])
227 node_dict = dict(zip(node_hostname_list, node_list_dict))
229 reserved_node_hostname_list = []
230 for index in range(len(job_info[node_list_name])):
231 #job_info[node_list_name][k] =
232 reserved_node_hostname_list[index] = \
233 node_dict[job_info[node_list_name][index]]['hostname']
235 logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
236 reserved_node_hostname_list %s" \
237 %(reserved_node_hostname_list))
239 logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
241 return reserved_node_hostname_list
243 def GetNodesCurrentlyInUse(self):
244 """Returns a list of all the nodes already involved in an oar job"""
245 return self.oar.parser.SendRequest("GET_running_jobs")
247 def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
248 full_nodes_dict_list = self.GetNodes()
249 #Put the full node list into a dictionary keyed by oar node id
250 oar_id_node_dict = {}
251 for node in full_nodes_dict_list:
252 oar_id_node_dict[node['oar_id']] = node
254 #logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\
255 #oar_id_node_dict %s" %(oar_id_node_dict))
257 hostname_dict_list = []
258 for resource_id in resource_id_list:
259 #Because jobs requested "asap" do not have defined resources
260 if resource_id is not "Undefined":
261 hostname_dict_list.append(\
262 oar_id_node_dict[resource_id]['hostname'])
264 #hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
265 return hostname_dict_list
267 def GetReservedNodes(self, username = None):
268 #Get the nodes in use and the reserved nodes
269 reservation_dict_list = \
270 self.oar.parser.SendRequest("GET_reserved_nodes", \
274 for resa in reservation_dict_list:
275 logger.debug ("GetReservedNodes resa %s"%(resa))
276 #dict list of hostnames and their site
277 resa['reserved_nodes'] = \
278 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
280 #del resa['resource_ids']
281 return reservation_dict_list
283 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
285 node_filter_dict : dictionnary of lists
288 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
289 node_dict_list = node_dict_by_id.values()
290 logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \
291 return_fields_list %s "%(node_filter_dict, return_fields_list))
292 #No filtering needed return the list directly
293 if not (node_filter_dict or return_fields_list):
294 return node_dict_list
296 return_node_list = []
298 for filter_key in node_filter_dict:
300 #Filter the node_dict_list by each value contained in the
301 #list node_filter_dict[filter_key]
302 for value in node_filter_dict[filter_key]:
303 for node in node_dict_list:
304 if node[filter_key] == value:
305 if return_fields_list :
307 for k in return_fields_list:
309 return_node_list.append(tmp)
311 return_node_list.append(node)
313 logger.log_exc("GetNodes KeyError")
317 return return_node_list
322 def AddSlice(slice_record, user_record):
323 """Add slice to the sfa tables. Called by verify_slice
324 during lease/sliver creation.
327 sfa_record = RegSlice(hrn=slice_record['hrn'],
328 gid=slice_record['gid'],
329 pointer=slice_record['slice_id'],
330 authority=slice_record['authority'])
332 logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
333 %(sfa_record, user_record))
334 sfa_record.just_created()
335 dbsession.add(sfa_record)
337 #Update the reg-researcher dependance table
338 sfa_record.reg_researchers = [user_record]
341 #Update the senslab table with the new slice
342 #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \
343 #record_id_slice = sfa_record.record_id , \
344 #record_id_user = slice_record['record_id_user'], \
345 #peer_authority = slice_record['peer_authority'])
347 #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \
348 #slab_slice %s sfa_record %s" \
349 #%(slice_record,slab_slice, sfa_record))
350 #slab_dbsession.add(slab_slice)
351 #slab_dbsession.commit()
354 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
355 site_dict = self.oar.parser.SendRequest("GET_sites")
356 #site_dict : dict where the key is the sit ename
357 return_site_list = []
358 if not ( site_filter_name_list or return_fields_list):
359 return_site_list = site_dict.values()
360 return return_site_list
362 for site_filter_name in site_filter_name_list:
363 if site_filter_name in site_dict:
364 if return_fields_list:
365 for field in return_fields_list:
368 tmp[field] = site_dict[site_filter_name][field]
370 logger.error("GetSites KeyError %s "%(field))
372 return_site_list.append(tmp)
374 return_site_list.append( site_dict[site_filter_name])
377 return return_site_list
383 #TODO : Check rights to delete person
384 def DeletePerson(self, person_record):
385 """ Disable an existing account in senslab LDAP.
386 Users and techs can only delete themselves. PIs can only
387 delete themselves and other non-PIs at their sites.
388 ins can delete anyone.
389 Returns 1 if successful, faults otherwise.
393 #Disable user account in senslab LDAP
394 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
395 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
398 #TODO Check DeleteSlice, check rights 05/07/2012 SA
399 def DeleteSlice(self, slice_record):
400 """ Deletes the specified slice.
401 Senslab : Kill the job associated with the slice if there is one
402 using DeleteSliceFromNodes.
403 Updates the slice record in slab db to remove the slice nodes.
405 Users may only delete slices of which they are members. PIs may
406 delete any of the slices at their sites, or any slices of which
407 they are members. Admins may delete any slice.
408 Returns 1 if successful, faults otherwise.
412 self.DeleteSliceFromNodes(slice_record)
413 logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
417 def __add_person_to_db(user_dict):
419 check_if_exists = dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
421 if not check_if_exists:
422 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
423 _________________________________________________________________________\
425 hrn = user_dict['hrn']
426 person_urn = hrn_to_urn(hrn, 'user')
427 pubkey = user_dict['pkey']
429 pkey = convert_public_key(pubkey)
431 #key not good. create another pkey
432 logger.warn('__add_person_to_db: unable to convert public \
434 pkey = Keypair(create=True)
437 if pubkey is not None and pkey is not None :
438 hierarchy = Hierarchy()
439 person_gid = hierarchy.create_gid(person_urn, create_uuid(), pkey)
440 if user_dict['email']:
441 logger.debug("__add_person_to_db \r\n \r\n SLAB IMPORTER PERSON EMAIL OK email %s " %(user_dict['email']))
442 person_gid.set_email(user_dict['email'])
444 user_record = RegUser(hrn=hrn , pointer= '-1', authority=get_authority(hrn), \
445 email=user_dict['email'], gid = person_gid)
446 user_record.reg_keys = [RegKey(user_dict['pkey'])]
447 user_record.just_created()
448 dbsession.add (user_record)
452 #TODO AddPerson 04/07/2012 SA
453 #def AddPerson(self, auth, person_fields=None):
454 def AddPerson(self, record):#TODO fixing 28/08//2012 SA
455 """Adds a new account. Any fields specified in records are used,
456 otherwise defaults are used.
457 Accounts are disabled by default. To enable an account,
459 Returns the new person_id (> 0) if successful, faults otherwise.
463 ret = self.ldap.LdapAddUser(record)
465 record['hrn'] = self.root_auth + '.' + ret['uid']
466 logger.debug("SLABDRIVER AddPerson return code %s record %s \r\n "\
468 self.__add_person_to_db(record)
471 #TODO AddPersonToSite 04/07/2012 SA
472 def AddPersonToSite (self, auth, person_id_or_email, \
473 site_id_or_login_base=None):
474 """ Adds the specified person to the specified site. If the person is
475 already a member of the site, no errors are returned. Does not change
476 the person's primary site.
477 Returns 1 if successful, faults otherwise.
481 logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
484 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
485 def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
486 """Grants the specified role to the person.
487 PIs can only grant the tech and user roles to users and techs at their
488 sites. Admins can grant any role to any user.
489 Returns 1 if successful, faults otherwise.
493 logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
496 #TODO AddPersonKey 04/07/2012 SA
497 def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
498 """Adds a new key to the specified account.
499 Non-admins can only modify their own keys.
500 Returns the new key_id (> 0) if successful, faults otherwise.
504 logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
507 def DeleteLeases(self, leases_id_list, slice_hrn ):
508 logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
509 \r\n " %(leases_id_list, slice_hrn))
510 for job_id in leases_id_list:
511 self.DeleteJobs(job_id, slice_hrn)
517 def _process_walltime(duration):
518 """ Calculates the walltime in seconds from the duration in H:M:S
519 specified in the RSpec.
523 # Fixing the walltime by adding a few delays.
524 # First put the walltime in seconds oarAdditionalDelay = 20;
525 # additional delay for /bin/sleep command to
526 # take in account prologue and epilogue scripts execution
527 # int walltimeAdditionalDelay = 240; additional delay
528 desired_walltime = duration
529 total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
530 sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12
532 #Put the walltime back in str form
534 walltime.append(str(total_walltime / 3600))
535 total_walltime = total_walltime - 3600 * int(walltime[0])
536 #Get the remaining minutes
537 walltime.append(str(total_walltime / 60))
538 total_walltime = total_walltime - 60 * int(walltime[1])
540 walltime.append(str(total_walltime))
543 logger.log_exc(" __process_walltime duration null")
545 return walltime, sleep_walltime
548 def _create_job_structure_request_for_OAR(lease_dict):
549 """ Creates the structure needed for a correct POST on OAR.
550 Makes the timestamp transformation into the appropriate format.
551 Sends the POST request to create the job with the resources in
560 reqdict['workdir'] = '/tmp'
561 reqdict['resource'] = "{network_address in ("
563 for node in lease_dict['added_nodes']:
564 logger.debug("\r\n \r\n OARrestapi \t \
565 __create_job_structure_request_for_OAR node %s" %(node))
567 # Get the ID of the node
569 reqdict['resource'] += "'" + nodeid + "', "
570 nodeid_list.append(nodeid)
572 custom_length = len(reqdict['resource'])- 2
573 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
574 ")}/nodes=" + str(len(nodeid_list))
577 walltime, sleep_walltime = \
578 SlabTestbedAPI._process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
581 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
582 ":" + str(walltime[1]) + ":" + str(walltime[2])
583 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
585 #In case of a scheduled experiment (not immediate)
586 #To run an XP immediately, don't specify date and time in RSpec
587 #They will be set to None.
588 if lease_dict['lease_start_time'] is not '0':
589 #Readable time accepted by OAR
590 start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
591 strftime(lease_dict['time_format'])
592 reqdict['reservation'] = start_time
593 #If there is not start time, Immediate XP. No need to add special
597 reqdict['type'] = "deploy"
598 reqdict['directory'] = ""
599 reqdict['name'] = "SFA_" + lease_dict['slice_user']
604 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
605 lease_start_time, lease_duration, slice_user=None):
607 lease_dict['lease_start_time'] = lease_start_time
608 lease_dict['lease_duration'] = lease_duration
609 lease_dict['added_nodes'] = added_nodes
610 lease_dict['slice_name'] = slice_name
611 lease_dict['slice_user'] = slice_user
612 lease_dict['grain'] = self.GetLeaseGranularity()
613 lease_dict['time_format'] = self.time_format
616 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
617 \r\n " %(slice_user))
618 #Create the request for OAR
619 reqdict = self._create_job_structure_request_for_OAR(lease_dict)
620 # first step : start the OAR job and update the job
621 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
624 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
626 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
630 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
631 Impossible to create job %s " %(answer))
638 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
639 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
645 def AddLeases(self, hostname_list, slice_record, \
646 lease_start_time, lease_duration):
647 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
648 slice_record %s lease_start_time %s lease_duration %s "\
649 %( hostname_list, slice_record , lease_start_time, \
652 #tmp = slice_record['reg-researchers'][0].split(".")
653 username = slice_record['login']
654 #username = tmp[(len(tmp)-1)]
655 job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'], \
656 lease_start_time, lease_duration, username)
657 start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
658 end_time = lease_start_time + lease_duration
660 import logging, logging.handlers
661 from sfa.util.sfalogging import _SfaLogger
662 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_record['hrn'], job_id, end_time))
663 sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG)
664 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_record['hrn']), type(job_id), type(end_time)))
666 slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], \
667 job_id = job_id, end_time= end_time)
669 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" \
671 slab_dbsession.add(slab_ex_row)
672 slab_dbsession.commit()
674 logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
679 #Delete the jobs from job_senslab table
680 def DeleteSliceFromNodes(self, slice_record):
681 logger.debug("SLABDRIVER \t DeleteSliceFromNodese %s " %(slice_record))
682 if isinstance(slice_record['oar_job_id'], list):
683 for job_id in slice_record['oar_job_id']:
684 self.DeleteJobs(job_id, slice_record['user'])
686 self.DeleteJobs(slice_record['oar_job_id'], slice_record['user'])
690 def GetLeaseGranularity(self):
691 """ Returns the granularity of Senslab testbed.
692 OAR returns seconds for experiments duration.
694 Experiments which last less than 10 min are invalid"""
701 def update_jobs_in_slabdb( job_oar_list, jobs_psql):
702 #Get all the entries in slab_xp table
704 set_jobs_psql = set(jobs_psql)
706 kept_jobs = set(job_oar_list).intersection(set_jobs_psql)
707 logger.debug ( "\r\n \t\ update_jobs_in_slabdb jobs_psql %s \r\n \t \
708 job_oar_list %s kept_jobs %s "%(set_jobs_psql, job_oar_list, kept_jobs))
709 deleted_jobs = set_jobs_psql.difference(kept_jobs)
710 deleted_jobs = list(deleted_jobs)
711 if len(deleted_jobs) > 0:
712 slab_dbsession.query(SenslabXP).filter(SenslabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch')
713 slab_dbsession.commit()
719 def GetLeases(self, lease_filter_dict=None, login=None):
722 unfiltered_reservation_list = self.GetReservedNodes(login)
724 reservation_list = []
725 #Find the slice associated with this user senslab ldap uid
726 logger.debug(" SLABDRIVER.PY \tGetLeases login %s\
727 unfiltered_reservation_list %s " %(login, unfiltered_reservation_list))
728 #Create user dict first to avoid looking several times for
729 #the same user in LDAP SA 27/07/12
733 jobs_psql_query = slab_dbsession.query(SenslabXP).all()
734 jobs_psql_dict = dict( [ (row.job_id, row.__dict__ )for row in jobs_psql_query ])
735 #jobs_psql_dict = jobs_psql_dict)
736 logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
738 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
742 for resa in unfiltered_reservation_list:
743 logger.debug("SLABDRIVER \tGetLeases USER %s"\
745 #Cosntruct list of jobs (runing, waiting..) in oar
746 job_oar_list.append(resa['lease_id'])
747 #If there is information on the job in SLAB DB (slice used and job id)
748 if resa['lease_id'] in jobs_psql_dict:
749 job_info = jobs_psql_dict[resa['lease_id']]
750 logger.debug("SLABDRIVER \tGetLeases job_info %s"\
752 resa['slice_hrn'] = job_info['slice_hrn']
753 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
755 #Assume it is a senslab slice:
757 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice')
758 resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
760 resa['component_id_list'] = []
761 #Transform the hostnames into urns (component ids)
762 for node in resa['reserved_nodes']:
763 #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
764 #self.root_auth, node['hostname']))
765 slab_xrn = slab_xrn_object(self.root_auth, node)
766 resa['component_id_list'].append(slab_xrn.urn)
768 if lease_filter_dict:
769 logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\
770 %(resa,lease_filter_dict))
772 if lease_filter_dict['name'] == resa['slice_hrn']:
773 reservation_list.append(resa)
775 if lease_filter_dict is None:
776 reservation_list = unfiltered_reservation_list
778 #del unfiltered_reservation_list[unfiltered_reservation_list.index(resa)]
781 self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)
783 #for resa in unfiltered_reservation_list:
787 #if resa['user'] in resa_user_dict:
788 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
789 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
791 ##resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
792 #resa['component_id_list'] = []
793 ##Transform the hostnames into urns (component ids)
794 #for node in resa['reserved_nodes']:
795 ##resa['component_id_list'].append(hostname_to_urn(self.hrn, \
796 ##self.root_auth, node['hostname']))
797 #slab_xrn = slab_xrn_object(self.root_auth, node)
798 #resa['component_id_list'].append(slab_xrn.urn)
800 ##Filter the reservation list if necessary
801 ##Returns all the leases associated with a given slice
802 #if lease_filter_dict:
803 #logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
804 #%(lease_filter_dict))
805 #for resa in unfiltered_reservation_list:
806 #if lease_filter_dict['name'] == resa['slice_hrn']:
807 #reservation_list.append(resa)
809 #reservation_list = unfiltered_reservation_list
811 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
813 return reservation_list
818 #TODO FUNCTIONS SECTION 04/07/2012 SA
820 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
823 def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
824 """ This method is a hopefully temporary hack to let the sfa correctly
825 detach the objects it creates from a remote peer object. This is
826 needed so that the sfa federation link can work in parallel with
827 RefreshPeer, as RefreshPeer depends on remote objects being correctly
830 auth : struct, API authentication structure
831 AuthMethod : string, Authentication method to use
832 object_type : string, Object type, among 'site','person','slice',
834 object_id : int, object_id
835 shortname : string, peer shortname
839 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
843 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
845 def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
846 remote_object_id=None):
847 """This method is a hopefully temporary hack to let the sfa correctly
848 attach the objects it creates to a remote peer object. This is needed
849 so that the sfa federation link can work in parallel with RefreshPeer,
850 as RefreshPeer depends on remote objects being correctly marked.
852 shortname : string, peer shortname
853 remote_object_id : int, remote object_id, set to 0 if unknown
857 logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
860 #TODO UpdateSlice 04/07/2012 SA
861 #Funciton should delete and create another job since oin senslab slice=job
862 def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
863 """Updates the parameters of an existing slice with the values in
865 Users may only update slices of which they are members.
866 PIs may update any of the slices at their sites, or any slices of
867 which they are members. Admins may update any slice.
868 Only PIs and admins may update max_nodes. Slices cannot be renewed
869 (by updating the expires parameter) more than 8 weeks into the future.
870 Returns 1 if successful, faults otherwise.
874 logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
877 #TODO UpdatePerson 04/07/2012 SA
878 def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
879 """Updates a person. Only the fields specified in person_fields
880 are updated, all other fields are left untouched.
881 Users and techs can only update themselves. PIs can only update
882 themselves and other non-PIs at their sites.
883 Returns 1 if successful, faults otherwise.
887 #new_row = FederatedToSenslab(slab_hrn, federated_hrn)
888 #slab_dbsession.add(new_row)
889 #slab_dbsession.commit()
891 logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
894 #TODO GetKeys 04/07/2012 SA
895 def GetKeys(self, auth, key_filter=None, return_fields=None):
896 """Returns an array of structs containing details about keys.
897 If key_filter is specified and is an array of key identifiers,
898 or a struct of key attributes, only keys matching the filter
899 will be returned. If return_fields is specified, only the
900 specified details will be returned.
902 Admin may query all keys. Non-admins may only query their own keys.
906 logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
909 #TODO DeleteKey 04/07/2012 SA
910 def DeleteKey(self, key_id):
912 Non-admins may only delete their own keys.
913 Returns 1 if successful, faults otherwise.
917 logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
924 def _sql_get_slice_info( slice_filter ):
925 #DO NOT USE RegSlice - reg_researchers to get the hrn
926 #of the user otherwise will mess up the RegRecord in
927 #Resolve, don't know why - SA 08/08/2012
929 #Only one entry for one user = one slice in slab_xp table
930 #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
931 raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
932 #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
935 #raw_slicerec.reg_researchers
936 raw_slicerec = raw_slicerec.__dict__
937 logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s \
938 raw_slicerec %s"%(slice_filter, raw_slicerec))
939 slicerec = raw_slicerec
940 #only one researcher per slice so take the first one
941 #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
942 #del slicerec['reg_researchers']['_sa_instance_state']
949 def _sql_get_slice_info_from_user(slice_filter ):
950 #slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
951 raw_slicerec = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(record_id = slice_filter).first()
952 #raw_slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
953 #Put it in correct order
954 user_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'email', 'pointer']
955 slice_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'pointer']
957 #raw_slicerec.reg_slices_as_researcher
958 raw_slicerec = raw_slicerec.__dict__
961 dict([(k, raw_slicerec['reg_slices_as_researcher'][0].__dict__[k]) \
962 for k in slice_needed_fields])
963 slicerec['reg_researchers'] = dict([(k, raw_slicerec[k]) \
964 for k in user_needed_fields])
965 #TODO Handle multiple slices for one user SA 10/12/12
966 #for now only take the first slice record associated to the rec user
967 ##slicerec = raw_slicerec['reg_slices_as_researcher'][0].__dict__
968 #del raw_slicerec['reg_slices_as_researcher']
969 #slicerec['reg_researchers'] = raw_slicerec
970 ##del slicerec['_sa_instance_state']
977 def _get_slice_records(self, slice_filter = None, \
978 slice_filter_type = None):
982 #Get list of slices based on the slice hrn
983 if slice_filter_type == 'slice_hrn':
985 #if get_authority(slice_filter) == self.root_auth:
986 #login = slice_filter.split(".")[1].split("_")[0]
988 slicerec = self._sql_get_slice_info(slice_filter)
994 #Get slice based on user id
995 if slice_filter_type == 'record_id_user':
997 slicerec = self._sql_get_slice_info_from_user(slice_filter)
1000 fixed_slicerec_dict = slicerec
1001 #At this point if there is no login it means
1002 #record_id_user filter has been used for filtering
1004 ##If theslice record is from senslab
1005 #if fixed_slicerec_dict['peer_authority'] is None:
1006 #login = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
1007 #return login, fixed_slicerec_dict
1008 return fixed_slicerec_dict
1012 def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None):
1013 """ Get the slice records from the slab db.
1014 Returns a slice ditc if slice_filter and slice_filter_type
1016 Returns a list of slice dictionnaries if there are no filters
1021 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
1022 return_slicerec_dictlist = []
1024 #First try to get information on the slice based on the filter provided
1025 if slice_filter_type in authorized_filter_types_list:
1026 fixed_slicerec_dict = \
1027 self._get_slice_records(slice_filter, slice_filter_type)
1028 slice_hrn = fixed_slicerec_dict['hrn']
1029 #login, fixed_slicerec_dict = \
1030 #self._get_slice_records(slice_filter, slice_filter_type)
1031 logger.debug(" SLABDRIVER \tGetSlices login %s \
1032 slice record %s slice_filter %s slice_filter_type %s "\
1033 %(login, fixed_slicerec_dict,slice_filter, slice_filter_type))
1036 #Now we have the slice record fixed_slicerec_dict, get the
1037 #jobs associated to this slice
1038 #leases_list = self.GetReservedNodes(username = login)
1039 leases_list = self.GetLeases(login = login)
1040 #If no job is running or no job scheduled
1041 #return only the slice record
1042 if leases_list == [] and fixed_slicerec_dict:
1043 return_slicerec_dictlist.append(fixed_slicerec_dict)
1045 #If several jobs for one slice , put the slice record into
1046 # each lease information dict
1049 for lease in leases_list :
1051 logger.debug("SLABDRIVER.PY \tGetSlices slice_filter %s \
1052 \ lease['slice_hrn'] %s" \
1053 %(slice_filter, lease['slice_hrn']))
1054 if lease['slice_hrn'] == slice_hrn:
1055 reserved_list = lease['reserved_nodes']
1056 slicerec_dict['slice_hrn'] = lease['slice_hrn']
1057 slicerec_dict['hrn'] = lease['slice_hrn']
1058 slicerec_dict['user'] = lease['user']
1059 slicerec_dict['oar_job_id'] = lease['lease_id']
1060 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
1061 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1063 #Update lease dict with the slice record
1064 if fixed_slicerec_dict:
1065 fixed_slicerec_dict['oar_job_id'] = []
1066 fixed_slicerec_dict['oar_job_id'].append(slicerec_dict['oar_job_id'])
1067 slicerec_dict.update(fixed_slicerec_dict)
1068 #slicerec_dict.update({'hrn':\
1069 #str(fixed_slicerec_dict['slice_hrn'])})
1071 return_slicerec_dictlist.append(slicerec_dict)
1072 logger.debug("SLABDRIVER.PY \tGetSlices \
1073 OHOHOHOH %s" %(return_slicerec_dictlist ))
1075 logger.debug("SLABDRIVER.PY \tGetSlices \
1076 slicerec_dict %s return_slicerec_dictlist %s \
1077 lease['reserved_nodes'] \
1078 %s" %(slicerec_dict, return_slicerec_dictlist, \
1079 lease['reserved_nodes'] ))
1081 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
1082 return_slicerec_dictlist %s" \
1083 %(return_slicerec_dictlist))
1085 return return_slicerec_dictlist
1089 #Get all slices from the senslab sfa database ,
1090 #put them in dict format
1091 #query_slice_list = dbsession.query(RegRecord).all()
1092 query_slice_list = dbsession.query(RegSlice).options(joinedload('reg_researchers')).all()
1093 #query_slice_list = dbsession.query(RegRecord).filter_by(type='slice').all()
1094 #query_slice_list = slab_dbsession.query(SenslabXP).all()
1095 return_slicerec_dictlist = []
1096 for record in query_slice_list:
1097 tmp = record.__dict__
1098 tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
1099 #del tmp['reg_researchers']['_sa_instance_state']
1100 return_slicerec_dictlist.append(tmp)
1101 #return_slicerec_dictlist.append(record.__dict__)
1103 #Get all the jobs reserved nodes
1104 leases_list = self.GetReservedNodes()
1107 for fixed_slicerec_dict in return_slicerec_dictlist:
1109 #Check if the slice belongs to a senslab user
1110 if fixed_slicerec_dict['peer_authority'] is None:
1111 owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
1114 for lease in leases_list:
1115 if owner == lease['user']:
1116 slicerec_dict['oar_job_id'] = lease['lease_id']
1118 #for reserved_node in lease['reserved_nodes']:
1119 logger.debug("SLABDRIVER.PY \tGetSlices lease %s "\
1122 reserved_list = lease['reserved_nodes']
1124 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1125 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
1126 slicerec_dict.update(fixed_slicerec_dict)
1127 #slicerec_dict.update({'hrn':\
1128 #str(fixed_slicerec_dict['slice_hrn'])})
1129 #return_slicerec_dictlist.append(slicerec_dict)
1130 fixed_slicerec_dict.update(slicerec_dict)
1132 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
1133 return_slicerec_dictlist %s \slice_filter %s " \
1134 %(return_slicerec_dictlist, slice_filter))
1136 return return_slicerec_dictlist
1142 # Convert SFA fields to PLC fields for use when registering up updating
1143 # registry record in the PLC database
1145 # @param type type of record (user, slice, ...)
1146 # @param hrn human readable name
1147 # @param sfa_fields dictionary of SFA fields
1148 # @param slab_fields dictionary of PLC fields (output)
1150 def sfa_fields_to_slab_fields(sfa_type, hrn, record):
1154 #for field in record:
1155 # slab_record[field] = record[field]
1157 if sfa_type == "slice":
1158 #instantion used in get_slivers ?
1159 if not "instantiation" in slab_record:
1160 slab_record["instantiation"] = "senslab-instantiated"
1161 #slab_record["hrn"] = hrn_to_pl_slicename(hrn)
1162 #Unused hrn_to_pl_slicename because Slab's hrn already
1163 #in the appropriate form SA 23/07/12
1164 slab_record["hrn"] = hrn
1165 logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
1166 slab_record %s " %(slab_record['hrn']))
1168 slab_record["url"] = record["url"]
1169 if "description" in record:
1170 slab_record["description"] = record["description"]
1171 if "expires" in record:
1172 slab_record["expires"] = int(record["expires"])
1174 #nodes added by OAR only and then imported to SFA
1175 #elif type == "node":
1176 #if not "hostname" in slab_record:
1177 #if not "hostname" in record:
1178 #raise MissingSfaInfo("hostname")
1179 #slab_record["hostname"] = record["hostname"]
1180 #if not "model" in slab_record:
1181 #slab_record["model"] = "geni"
1184 #elif type == "authority":
1185 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
1187 #if not "name" in slab_record:
1188 #slab_record["name"] = hrn
1190 #if not "abbreviated_name" in slab_record:
1191 #slab_record["abbreviated_name"] = hrn
1193 #if not "enabled" in slab_record:
1194 #slab_record["enabled"] = True
1196 #if not "is_public" in slab_record:
1197 #slab_record["is_public"] = True
1204 def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
1205 """ Transforms unix timestamp into valid OAR date format """
1207 #Used in case of a scheduled experiment (not immediate)
1208 #To run an XP immediately, don't specify date and time in RSpec
1209 #They will be set to None.
1210 if xp_utc_timestamp:
1211 #transform the xp_utc_timestamp into server readable time
1212 xp_server_readable_date = datetime.fromtimestamp(int(\
1213 xp_utc_timestamp)).strftime(self.time_format)
1215 return xp_server_readable_date