4 from datetime import datetime
6 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
7 from sfa.util.sfalogging import logger
8 from sfa.storage.alchemy import dbsession
9 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
10 from sqlalchemy.orm import joinedload
12 from sfa.trust.certificate import Keypair, convert_public_key
13 from sfa.trust.gid import create_uuid
14 from sfa.trust.hierarchy import Hierarchy
16 from sfa.managers.driver import Driver
17 from sfa.rspecs.version_manager import VersionManager
18 from sfa.rspecs.rspec import RSpec
20 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
23 ## thierry: everything that is API-related (i.e. handling incoming requests)
25 # SlabDriver should be really only about talking to the senslab testbed
28 from sfa.senslab.OARrestapi import OARrestapi
29 from sfa.senslab.LDAPapi import LDAPapi
31 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
34 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
36 from sfa.senslab.slabslices import SlabSlices
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
46 class SlabTestbedAPI():
48 def __init__(self, config):
49 self.oar = OARrestapi()
51 self.time_format = "%Y-%m-%d %H:%M:%S"
52 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
53 self.grain = 600 # 10 mins lease
58 #TODO clean GetPeers. 05/07/12SA
60 def GetPeers ( auth = None, peer_filter=None, return_fields_list=None):
63 existing_hrns_by_types = {}
64 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
65 return_field %s " %(auth , peer_filter, return_fields_list))
66 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
68 for record in all_records:
69 existing_records[(record.hrn, record.type)] = record
70 if record.type not in existing_hrns_by_types:
71 existing_hrns_by_types[record.type] = [record.hrn]
73 existing_hrns_by_types[record.type].append(record.hrn)
76 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
77 %( existing_hrns_by_types))
82 records_list.append(existing_records[(peer_filter,'authority')])
84 for hrn in existing_hrns_by_types['authority']:
85 records_list.append(existing_records[(hrn,'authority')])
87 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
93 return_records = records_list
94 if not peer_filter and not return_fields_list:
98 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
100 return return_records
104 #TODO : Handling OR request in make_ldap_filters_from_records
105 #instead of the for loop
106 #over the records' list
107 def GetPersons(self, person_filter=None):
109 person_filter should be a list of dictionnaries when not set to None.
110 Returns a list of users whose accounts are enabled found in ldap.
113 logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
116 if person_filter and isinstance(person_filter, list):
117 #If we are looking for a list of users (list of dict records)
118 #Usually the list contains only one user record
119 for searched_attributes in person_filter:
121 #Get only enabled user accounts in senslab LDAP :
122 #add a filter for make_ldap_filters_from_record
123 person = self.ldap.LdapFindUser(searched_attributes, \
124 is_user_enabled=True)
125 #If a person was found, append it to the list
127 person_list.append(person)
129 #If the list is empty, return None
130 if len(person_list) is 0:
134 #Get only enabled user accounts in senslab LDAP :
135 #add a filter for make_ldap_filters_from_record
136 person_list = self.ldap.LdapFindUser(is_user_enabled=True)
def GetTimezone(self):
    """Ask the OAR server for its current time and timezone.

    Sends the "GET_timezone" request through the OAR parser and unpacks
    the two-element answer.
    Marked as unused (SA 16/11/12).

    Returns a (server_timestamp, server_tz) tuple.
    """
    oar_answer = self.oar.parser.SendRequest("GET_timezone")
    # The answer must unpack into exactly two values.
    timestamp, timezone = oar_answer
    return timestamp, timezone
148 def DeleteJobs(self, job_id, username):
149 logger.debug("SLABDRIVER \tDeleteJobs jobid %s username %s " %(job_id, username))
150 if not job_id or job_id is -1:
152 #username = slice_hrn.split(".")[-1].rstrip("_slice")
154 reqdict['method'] = "delete"
155 reqdict['strval'] = str(job_id)
158 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
160 logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
161 username %s" %(job_id, answer, username))
166 ##TODO : Unused GetJobsId ? SA 05/07/12
167 #def GetJobsId(self, job_id, username = None ):
169 #Details about a specific job.
170 #Includes details about submission time, jot type, state, events,
171 #owner, assigned ressources, walltime etc...
175 #node_list_k = 'assigned_network_address'
176 ##Get job info from OAR
177 #job_info = self.oar.parser.SendRequest(req, job_id, username)
179 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
181 #if job_info['state'] == 'Terminated':
182 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
185 #if job_info['state'] == 'Error':
186 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
191 #logger.error("SLABDRIVER \tGetJobsId KeyError")
194 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
196 ##Replaces the previous entry
197 ##"assigned_network_address" / "reserved_resources"
199 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
200 #del job_info[node_list_k]
201 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
205 def GetJobsResources(self, job_id, username = None):
206 #job_resources=['reserved_resources', 'assigned_resources',\
207 #'job_id', 'job_uri', 'assigned_nodes',\
209 #assigned_res = ['resource_id', 'resource_uri']
210 #assigned_n = ['node', 'node_uri']
212 req = "GET_jobs_id_resources"
215 #Get job resources list from OAR
216 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
217 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
220 self.__get_hostnames_from_oar_node_ids(node_id_list)
223 #Replaces the previous entry "assigned_network_address" /
224 #"reserved_resources"
226 job_info = {'node_ids': hostname_list}
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Return the hostnames of the nodes listed in a job description.

    job_info       : dict describing a job; job_info[node_list_name]
                     is iterated as a list of node hostnames.
    node_list_name : key of job_info under which that list is stored.

    Each listed hostname is looked up in the records returned by
    self.GetNodes() (keyed on their 'hostname' field). If the key is
    missing from job_info, or a hostname is unknown to the testbed, the
    error is logged and the hostnames collected so far are returned.
    """
    #Index the testbed node records by hostname
    node_dict = dict((node['hostname'], node) for node in self.GetNodes())

    reserved_node_hostname_list = []
    try:
        for reserved_hostname in job_info[node_list_name]:
            #append: assigning by index into an empty list raises
            #IndexError on the very first element
            reserved_node_hostname_list.append(
                node_dict[reserved_hostname]['hostname'])

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes "
                     "reserved_node_hostname_list %s"
                     % (reserved_node_hostname_list,))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR ")

    return reserved_node_hostname_list
def GetNodesCurrentlyInUse(self):
    """Fetch the nodes currently involved in an OAR job.

    Forwards the "GET_running_jobs" request to the OAR parser and
    returns its answer unchanged.
    """
    oar_parser = self.oar.parser
    nodes_in_use = oar_parser.SendRequest("GET_running_jobs")
    return nodes_in_use
def __get_hostnames_from_oar_node_ids(self, resource_id_list):
    """Translate a list of OAR resource ids into node hostnames.

    resource_id_list : iterable of OAR node ids; entries equal to the
                       string "Undefined" are skipped, because jobs
                       requested "asap" do not have defined resources.

    Returns the list of the 'hostname' fields of the matching node
    records from self.GetNodes(), in the order of resource_id_list.
    Raises KeyError if an id is not among the 'oar_id' values of the
    testbed nodes.
    """
    #Put the full node list into a dictionary keyed by oar node id
    oar_id_node_dict = dict((node['oar_id'], node)
                            for node in self.GetNodes())

    #Compare by value: an identity test ("is not") against a string
    #literal lets an equal but distinct "Undefined" string through.
    return [oar_id_node_dict[resource_id]['hostname']
            for resource_id in resource_id_list
            if resource_id != "Undefined"]
279 def GetReservedNodes(self, username = None):
280 #Get the nodes in use and the reserved nodes
281 reservation_dict_list = \
282 self.oar.parser.SendRequest("GET_reserved_nodes", \
286 for resa in reservation_dict_list:
287 logger.debug ("GetReservedNodes resa %s"%(resa))
288 #dict list of hostnames and their site
289 resa['reserved_nodes'] = \
290 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
292 #del resa['resource_ids']
293 return reservation_dict_list
295 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
297 node_filter_dict : dictionnary of lists
300 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
301 node_dict_list = node_dict_by_id.values()
302 logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \
303 return_fields_list %s "%(node_filter_dict, return_fields_list))
304 #No filtering needed return the list directly
305 if not (node_filter_dict or return_fields_list):
306 return node_dict_list
308 return_node_list = []
310 for filter_key in node_filter_dict:
312 #Filter the node_dict_list by each value contained in the
313 #list node_filter_dict[filter_key]
314 for value in node_filter_dict[filter_key]:
315 for node in node_dict_list:
316 if node[filter_key] == value:
317 if return_fields_list :
319 for k in return_fields_list:
321 return_node_list.append(tmp)
323 return_node_list.append(node)
325 logger.log_exc("GetNodes KeyError")
329 return return_node_list
331 def AddSlice(slice_record, user_record):
332 """Add slice to the sfa tables. Called by verify_slice
333 during lease/sliver creation.
336 sfa_record = RegSlice(hrn=slice_record['hrn'],
337 gid=slice_record['gid'],
338 pointer=slice_record['slice_id'],
339 authority=slice_record['authority'])
341 logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
342 %(sfa_record, user_record))
343 sfa_record.just_created()
344 dbsession.add(sfa_record)
346 #Update the reg-researcher dependance table
347 sfa_record.reg_researchers = [user_record]
350 #Update the senslab table with the new slice
351 #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \
352 #record_id_slice = sfa_record.record_id , \
353 #record_id_user = slice_record['record_id_user'], \
354 #peer_authority = slice_record['peer_authority'])
356 #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \
357 #slab_slice %s sfa_record %s" \
358 #%(slice_record,slab_slice, sfa_record))
359 #slab_dbsession.add(slab_slice)
360 #slab_dbsession.commit()
363 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
364 site_dict = self.oar.parser.SendRequest("GET_sites")
365 #site_dict : dict where the key is the sit ename
366 return_site_list = []
367 if not ( site_filter_name_list or return_fields_list):
368 return_site_list = site_dict.values()
369 return return_site_list
371 for site_filter_name in site_filter_name_list:
372 if site_filter_name in site_dict:
373 if return_fields_list:
374 for field in return_fields_list:
377 tmp[field] = site_dict[site_filter_name][field]
379 logger.error("GetSites KeyError %s "%(field))
381 return_site_list.append(tmp)
383 return_site_list.append( site_dict[site_filter_name])
386 return return_site_list
392 #TODO : Check rights to delete person
393 def DeletePerson(self, person_record):
394 """ Disable an existing account in senslab LDAP.
395 Users and techs can only delete themselves. PIs can only
396 delete themselves and other non-PIs at their sites.
397 ins can delete anyone.
398 Returns 1 if successful, faults otherwise.
402 #Disable user account in senslab LDAP
403 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
404 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
407 #TODO Check DeleteSlice, check rights 05/07/2012 SA
408 def DeleteSlice(self, slice_record):
409 """ Deletes the specified slice.
410 Senslab : Kill the job associated with the slice if there is one
411 using DeleteSliceFromNodes.
412 Updates the slice record in slab db to remove the slice nodes.
414 Users may only delete slices of which they are members. PIs may
415 delete any of the slices at their sites, or any slices of which
416 they are members. Admins may delete any slice.
417 Returns 1 if successful, faults otherwise.
421 self.DeleteSliceFromNodes(slice_record)
422 logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
426 def __add_person_to_db(user_dict):
428 check_if_exists = dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
430 if not check_if_exists:
431 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
432 _________________________________________________________________________\
434 hrn = user_dict['hrn']
435 person_urn = hrn_to_urn(hrn, 'user')
436 pubkey = user_dict['pkey']
438 pkey = convert_public_key(pubkey)
440 #key not good. create another pkey
441 self.logger.warn('__add_person_to_db: unable to convert public \
443 pkey = Keypair(create=True)
446 if pubkey is not None and pkey is not None :
447 hierarchy = Hierarchy()
448 person_gid = hierarchy.create_gid(person_urn, create_uuid(), pkey)
449 if user_dict['email']:
450 logger.debug("__add_person_to_db \r\n \r\n SLAB IMPORTER PERSON EMAIL OK email %s " %(user_dict['email']))
451 person_gid.set_email(user_dict['email'])
453 user_record = RegUser(hrn=hrn , pointer= '-1', authority=get_authority(hrn), \
454 email=user_dict['email'], gid = person_gid)
455 user_record.reg_keys = [RegKey(user_dict['pkey'])]
456 user_record.just_created()
457 dbsession.add (user_record)
461 #TODO AddPerson 04/07/2012 SA
462 #def AddPerson(self, auth, person_fields=None):
463 def AddPerson(self, record):#TODO fixing 28/08//2012 SA
464 """Adds a new account. Any fields specified in records are used,
465 otherwise defaults are used.
466 Accounts are disabled by default. To enable an account,
468 Returns the new person_id (> 0) if successful, faults otherwise.
472 ret = self.ldap.LdapAddUser(record)
474 record['hrn'] = self.root_auth + '.' + ret['uid']
475 logger.debug("SLABDRIVER AddPerson return code %s record %s \r\n "%(ret,record))
476 self.__add_person_to_db(record)
479 #TODO AddPersonToSite 04/07/2012 SA
def AddPersonToSite(self, auth, person_id_or_email,
                    site_id_or_login_base=None):
    """Placeholder for adding a person to a site.

    Not implemented for senslab: the arguments are ignored, a warning
    is logged and nothing else happens.

    Returns None.
    """
    logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
    return None
492 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
    """Placeholder for granting a role to a person.

    Not implemented for senslab: the arguments are ignored, a warning
    is logged and nothing else happens.

    Returns None.
    """
    logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
    return None
504 #TODO AddPersonKey 04/07/2012 SA
def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
    """Placeholder for adding a key to an account.

    Not implemented for senslab: the arguments are ignored, a warning
    is logged and nothing else happens.

    Returns None.
    """
    logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
    return None
def DeleteLeases(self, leases_id_list, slice_hrn):
    """Delete every job whose id is listed in leases_id_list.

    leases_id_list : iterable of job ids, each handed to DeleteJobs.
    slice_hrn      : passed as the second argument of DeleteJobs,
                     which names that parameter `username`.

    Returns None.
    """
    #Adjacent literals instead of a backslash continuation inside the
    #string, which embedded the source indentation in the log message.
    logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s "
                 "\r\n " % (leases_id_list, slice_hrn))
    for job_id in leases_id_list:
        self.DeleteJobs(job_id, slice_hrn)
527 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
528 lease_start_time, lease_duration, slice_user=None):
530 lease_dict['lease_start_time'] = lease_start_time
531 lease_dict['lease_duration'] = lease_duration
532 lease_dict['added_nodes'] = added_nodes
533 lease_dict['slice_name'] = slice_name
534 lease_dict['slice_user'] = slice_user
535 lease_dict['grain'] = self.GetLeaseGranularity()
536 lease_dict['time_format'] = self.time_format
539 def __create_job_structure_request_for_OAR(lease_dict):
540 """ Creates the structure needed for a correct POST on OAR.
541 Makes the timestamp transformation into the appropriate format.
542 Sends the POST request to create the job with the resources in
551 reqdict['workdir'] = '/tmp'
552 reqdict['resource'] = "{network_address in ("
554 for node in lease_dict['added_nodes']:
555 logger.debug("\r\n \r\n OARrestapi \t \
556 __create_job_structure_request_for_OAR node %s" %(node))
558 # Get the ID of the node
560 reqdict['resource'] += "'" + nodeid + "', "
561 nodeid_list.append(nodeid)
563 custom_length = len(reqdict['resource'])- 2
564 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
565 ")}/nodes=" + str(len(nodeid_list))
567 def __process_walltime(duration):
568 """ Calculates the walltime in seconds from the duration in H:M:S
569 specified in the RSpec.
573 # Fixing the walltime by adding a few delays.
574 # First put the walltime in seconds oarAdditionalDelay = 20;
575 # additional delay for /bin/sleep command to
576 # take in account prologue and epilogue scripts execution
577 # int walltimeAdditionalDelay = 240; additional delay
578 desired_walltime = duration
579 total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
580 sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12
582 #Put the walltime back in str form
584 walltime.append(str(total_walltime / 3600))
585 total_walltime = total_walltime - 3600 * int(walltime[0])
586 #Get the remaining minutes
587 walltime.append(str(total_walltime / 60))
588 total_walltime = total_walltime - 60 * int(walltime[1])
590 walltime.append(str(total_walltime))
593 logger.log_exc(" __process_walltime duration null")
595 return walltime, sleep_walltime
598 walltime, sleep_walltime = \
599 __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
602 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
603 ":" + str(walltime[1]) + ":" + str(walltime[2])
604 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
606 #In case of a scheduled experiment (not immediate)
607 #To run an XP immediately, don't specify date and time in RSpec
608 #They will be set to None.
609 if lease_dict['lease_start_time'] is not '0':
610 #Readable time accepted by OAR
611 start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
612 strftime(lease_dict['time_format'])
613 reqdict['reservation'] = start_time
614 #If there is not start time, Immediate XP. No need to add special
618 reqdict['type'] = "deploy"
619 reqdict['directory'] = ""
620 reqdict['name'] = "SFA_" + lease_dict['slice_user']
624 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
625 \r\n " %(slice_user))
626 #Create the request for OAR
627 reqdict = __create_job_structure_request_for_OAR(lease_dict)
628 # first step : start the OAR job and update the job
629 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
632 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
634 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
638 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
639 Impossible to create job %s " %(answer))
646 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
647 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
653 def AddLeases(self, hostname_list, slice_record, \
654 lease_start_time, lease_duration):
655 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
656 slice_record %s lease_start_time %s lease_duration %s "\
657 %( hostname_list, slice_record , lease_start_time, \
660 #tmp = slice_record['reg-researchers'][0].split(".")
661 username = slice_record['login']
662 #username = tmp[(len(tmp)-1)]
663 job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'], \
664 lease_start_time, lease_duration, username)
665 start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
666 end_time = lease_start_time + lease_duration
668 import logging, logging.handlers
669 from sfa.util.sfalogging import _SfaLogger
670 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_record['hrn'], job_id, end_time))
671 sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG)
672 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_record['hrn']), type(job_id), type(end_time)))
674 slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], \
675 job_id = job_id, end_time= end_time)
677 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" \
679 slab_dbsession.add(slab_ex_row)
680 slab_dbsession.commit()
682 logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
687 #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
    """Delete the OAR job(s) attached to a slice record.

    slice_record : dict read through its 'oar_job_id' key (a single job
                   id or a list of job ids) and its 'user' key, which is
                   passed to DeleteJobs along with each job id.

    Returns None.
    """
    logger.debug("SLABDRIVER \t DeleteSliceFromNodese %s " %(slice_record))
    oar_job_ids = slice_record['oar_job_id']
    #Treat a lone job id as a one-element list so both cases share a loop
    if not isinstance(oar_job_ids, list):
        oar_job_ids = [oar_job_ids]
    for oar_job_id in oar_job_ids:
        self.DeleteJobs(oar_job_id, slice_record['user'])
698 def GetLeaseGranularity(self):
699 """ Returns the granularity of Senslab testbed.
700 OAR returns seconds for experiments duration.
702 Experiments which last less than 10 min are invalid"""
709 def update_jobs_in_slabdb( job_oar_list, jobs_psql):
710 #Get all the entries in slab_xp table
713 jobs_psql = set(jobs_psql)
714 kept_jobs = set(job_oar_list).intersection(jobs_psql)
715 logger.debug ( "\r\n \t\ update_jobs_in_slabdb jobs_psql %s \r\n \t \
716 job_oar_list %s kept_jobs %s "%(jobs_psql, job_oar_list, kept_jobs))
717 deleted_jobs = set(jobs_psql).difference(kept_jobs)
718 deleted_jobs = list(deleted_jobs)
719 if len(deleted_jobs) > 0:
720 slab_dbsession.query(SenslabXP).filter(SenslabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch')
721 slab_dbsession.commit()
727 def GetLeases(self, lease_filter_dict=None, login=None):
730 unfiltered_reservation_list = self.GetReservedNodes(login)
732 reservation_list = []
733 #Find the slice associated with this user senslab ldap uid
734 logger.debug(" SLABDRIVER.PY \tGetLeases login %s\
735 unfiltered_reservation_list %s " %(login, unfiltered_reservation_list))
736 #Create user dict first to avoid looking several times for
737 #the same user in LDAP SA 27/07/12
741 jobs_psql_query = slab_dbsession.query(SenslabXP).all()
742 jobs_psql_dict = [ (row.job_id, row.__dict__ )for row in jobs_psql_query ]
743 jobs_psql_dict = dict(jobs_psql_dict)
744 logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
746 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
750 for resa in unfiltered_reservation_list:
751 logger.debug("SLABDRIVER \tGetLeases USER %s"\
753 #Cosntruct list of jobs (runing, waiting..) in oar
754 job_oar_list.append(resa['lease_id'])
755 #If there is information on the job in SLAB DB (slice used and job id)
756 if resa['lease_id'] in jobs_psql_dict:
757 job_info = jobs_psql_dict[resa['lease_id']]
758 logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
760 resa['slice_hrn'] = job_info['slice_hrn']
761 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
763 #Assume it is a senslab slice:
765 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice')
766 #if resa['user'] not in resa_user_dict:
767 #logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
768 #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
770 #ldap_info = ldap_info[0][1]
771 ##Get the backref :relationship table reg-researchers
772 #user = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(email = \
773 #ldap_info['mail'][0])
776 #user = user.__dict__
777 #slice_info = user['reg_slices_as_researcher'][0].__dict__
778 ##Separated in case user not in database :
779 ##record_id not defined SA 17/07//12
781 ##query_slice_info = slab_dbsession.query(SenslabXP).filter_by(record_id_user = user.record_id)
782 ##if query_slice_info:
783 ##slice_info = query_slice_info.first()
787 #resa_user_dict[resa['user']] = {}
788 #resa_user_dict[resa['user']]['ldap_info'] = user
789 #resa_user_dict[resa['user']]['slice_info'] = slice_info
791 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
792 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
794 resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
796 resa['component_id_list'] = []
797 #Transform the hostnames into urns (component ids)
798 for node in resa['reserved_nodes']:
799 #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
800 #self.root_auth, node['hostname']))
801 slab_xrn = slab_xrn_object(self.root_auth, node)
802 resa['component_id_list'].append(slab_xrn.urn)
804 if lease_filter_dict:
805 logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\
806 %(resa,lease_filter_dict))
808 if lease_filter_dict['name'] == resa['slice_hrn']:
809 reservation_list.append(resa)
811 if lease_filter_dict is None:
812 reservation_list = unfiltered_reservation_list
814 #del unfiltered_reservation_list[unfiltered_reservation_list.index(resa)]
817 self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)
819 #for resa in unfiltered_reservation_list:
823 #if resa['user'] in resa_user_dict:
824 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
825 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
827 ##resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
828 #resa['component_id_list'] = []
829 ##Transform the hostnames into urns (component ids)
830 #for node in resa['reserved_nodes']:
831 ##resa['component_id_list'].append(hostname_to_urn(self.hrn, \
832 ##self.root_auth, node['hostname']))
833 #slab_xrn = slab_xrn_object(self.root_auth, node)
834 #resa['component_id_list'].append(slab_xrn.urn)
836 ##Filter the reservation list if necessary
837 ##Returns all the leases associated with a given slice
838 #if lease_filter_dict:
839 #logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
840 #%(lease_filter_dict))
841 #for resa in unfiltered_reservation_list:
842 #if lease_filter_dict['name'] == resa['slice_hrn']:
843 #reservation_list.append(resa)
845 #reservation_list = unfiltered_reservation_list
847 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
849 return reservation_list
854 #TODO FUNCTIONS SECTION 04/07/2012 SA
856 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
859 def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
860 """ This method is a hopefully temporary hack to let the sfa correctly
861 detach the objects it creates from a remote peer object. This is
862 needed so that the sfa federation link can work in parallel with
863 RefreshPeer, as RefreshPeer depends on remote objects being correctly
866 auth : struct, API authentication structure
867 AuthMethod : string, Authentication method to use
868 object_type : string, Object type, among 'site','person','slice',
870 object_id : int, object_id
871 shortname : string, peer shortname
875 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
879 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
881 def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
882 remote_object_id=None):
883 """This method is a hopefully temporary hack to let the sfa correctly
884 attach the objects it creates to a remote peer object. This is needed
885 so that the sfa federation link can work in parallel with RefreshPeer,
886 as RefreshPeer depends on remote objects being correctly marked.
888 shortname : string, peer shortname
889 remote_object_id : int, remote object_id, set to 0 if unknown
893 logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
896 #TODO UpdateSlice 04/07/2012 SA
897 #Function should delete and create another job since in senslab slice=job
898 def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
899 """Updates the parameters of an existing slice with the values in
901 Users may only update slices of which they are members.
902 PIs may update any of the slices at their sites, or any slices of
903 which they are members. Admins may update any slice.
904 Only PIs and admins may update max_nodes. Slices cannot be renewed
905 (by updating the expires parameter) more than 8 weeks into the future.
906 Returns 1 if successful, faults otherwise.
910 logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
913 #TODO UpdatePerson 04/07/2012 SA
914 def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
915 """Updates a person. Only the fields specified in person_fields
916 are updated, all other fields are left untouched.
917 Users and techs can only update themselves. PIs can only update
918 themselves and other non-PIs at their sites.
919 Returns 1 if successful, faults otherwise.
923 #new_row = FederatedToSenslab(slab_hrn, federated_hrn)
924 #slab_dbsession.add(new_row)
925 #slab_dbsession.commit()
927 logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
930 #TODO GetKeys 04/07/2012 SA
931 def GetKeys(self, auth, key_filter=None, return_fields=None):
932 """Returns an array of structs containing details about keys.
933 If key_filter is specified and is an array of key identifiers,
934 or a struct of key attributes, only keys matching the filter
935 will be returned. If return_fields is specified, only the
936 specified details will be returned.
938 Admin may query all keys. Non-admins may only query their own keys.
942 logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
945 #TODO DeleteKey 04/07/2012 SA
946 def DeleteKey(self, key_id):
948 Non-admins may only delete their own keys.
949 Returns 1 if successful, faults otherwise.
953 logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
960 def _sql_get_slice_info( slice_filter ):
961 #DO NOT USE RegSlice - reg_researchers to get the hrn
962 #of the user otherwise will mess up the RegRecord in
963 #Resolve, don't know why - SA 08/08/2012
965 #Only one entry for one user = one slice in slab_xp table
966 #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
967 raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
968 #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
971 #raw_slicerec.reg_researchers
972 raw_slicerec = raw_slicerec.__dict__
973 logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s \
974 raw_slicerec %s"%(slice_filter, raw_slicerec))
975 slicerec = raw_slicerec
976 #only one researcher per slice so take the first one
977 #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
978 #del slicerec['reg_researchers']['_sa_instance_state']
@staticmethod
def _sql_get_slice_info_from_user(slice_filter):
    """Build a slice record from the registry entry of one user.

    Looks up the RegUser whose ``record_id`` equals ``slice_filter``, with
    its ``reg_slices_as_researcher`` relationship eagerly loaded, and
    flattens the first slice attached to that user into a plain dict.

    :param slice_filter: ``record_id`` of the user in the SFA registry.
    :returns: a dict holding the slice columns named in
        ``slice_needed_fields`` plus a ``'reg_researchers'`` entry, itself
        a dict of the user columns named in ``user_needed_fields``;
        ``None`` when no user matches or the user has no slice.
    """
    # Columns copied out of the ORM objects, so that the SQLAlchemy
    # bookkeeping attributes do not end up in the returned dicts.
    user_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                          'classtype', 'authority', 'gid', 'record_id',
                          'date_created', 'type', 'email', 'pointer']
    slice_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                           'classtype', 'authority', 'gid', 'record_id',
                           'date_created', 'type', 'pointer']

    user_row = dbsession.query(RegUser).options(
        joinedload('reg_slices_as_researcher')).filter_by(
            record_id=slice_filter).first()
    if user_row is None:
        return None

    raw_slicerec = user_row.__dict__
    researcher_slices = raw_slicerec['reg_slices_as_researcher']
    if not researcher_slices:
        # A user without any slice has nothing to report.
        return None

    #TODO Handle multiple slices for one user SA 10/12/12
    #for now only take the first slice record associated to the rec user
    first_slice = researcher_slices[0].__dict__
    slicerec = dict((k, first_slice[k]) for k in slice_needed_fields)
    slicerec['reg_researchers'] = dict((k, raw_slicerec[k])
                                       for k in user_needed_fields)
    return slicerec
def _get_slice_records(self, slice_filter=None, slice_filter_type=None):
    """Fetch one slice record from the registry using the given filter.

    :param slice_filter: the slice hrn when ``slice_filter_type`` is
        'slice_hrn', the user's record_id when it is 'record_id_user'.
    :param slice_filter_type: 'slice_hrn' or 'record_id_user'.
    :returns: the slice record dict produced by ``_sql_get_slice_info`` or
        ``_sql_get_slice_info_from_user``; ``None`` when nothing matches
        or when the filter type is not one of the two supported values.
    """
    if slice_filter_type == 'slice_hrn':
        #Get the slice based on the slice hrn
        slicerec = self._sql_get_slice_info(slice_filter)
    elif slice_filter_type == 'record_id_user':
        #Get slice based on user id
        slicerec = self._sql_get_slice_info_from_user(slice_filter)
    else:
        # Unsupported filter type: report "no record" explicitly instead
        # of reading a local that was never assigned.
        slicerec = None

    if not slicerec:
        return None
    return slicerec
def GetSlices(self, slice_filter=None, slice_filter_type=None, login=None):
    """Get slice records from the SFA registry, decorated with lease info.

    When ``slice_filter_type`` is 'slice_hrn' or 'record_id_user', the
    matching slice record is fetched through ``_get_slice_records`` and
    combined with the leases returned by ``GetLeases(login=login)``.
    Otherwise every RegSlice of the registry is returned, each updated
    with the reserved nodes of the leases whose user matches the slice
    owner.

    :param slice_filter: slice hrn or user record_id, according to
        ``slice_filter_type``.
    :param slice_filter_type: 'slice_hrn', 'record_id_user' or None.
    :param login: handed to ``GetLeases`` in the filtered case.
    :returns: a list of slice dicts (possibly empty).
    """
    authorized_filter_types_list = ['slice_hrn', 'record_id_user']
    return_slicerec_dictlist = []

    #First try to get information on the slice based on the filter provided
    if slice_filter_type in authorized_filter_types_list:
        fixed_slicerec_dict = \
            self._get_slice_records(slice_filter, slice_filter_type)
        logger.debug(" SLABDRIVER \tGetSlices login %s "
                     "slice record %s slice_filter %s slice_filter_type %s "
                     % (login, fixed_slicerec_dict, slice_filter,
                        slice_filter_type))

        #Now we have the slice record fixed_slicerec_dict, get the
        #jobs associated to this slice
        leases_list = self.GetLeases(login=login)

        #If no job is running or no job scheduled
        #return only the slice record
        if leases_list == [] and fixed_slicerec_dict:
            return_slicerec_dictlist.append(fixed_slicerec_dict)

        #If several jobs for one slice , put the slice record into
        # each lease information dict
        for lease in leases_list:
            slicerec_dict = {}
            logger.debug("SLABDRIVER.PY \tGetSlices slice_filter %s "
                         "lease['slice_hrn'] %s"
                         % (slice_filter, lease['slice_hrn']))
            if slice_filter_type == 'slice_hrn' and \
                    lease['slice_hrn'] == slice_filter:
                reserved_list = lease['reserved_nodes']
                slicerec_dict['slice_hrn'] = lease['slice_hrn']
                slicerec_dict['hrn'] = lease['slice_hrn']
                slicerec_dict['user'] = lease['user']
                slicerec_dict['oar_job_id'] = lease['lease_id']
                slicerec_dict['list_node_ids'] = {'hostname': reserved_list}
                slicerec_dict['node_ids'] = lease['reserved_nodes']

                #Update lease dict with the slice record
                if fixed_slicerec_dict:
                    # The registry record carries the job id as a
                    # one-element list; it then overrides the scalar
                    # stored just above when merged into the lease dict.
                    fixed_slicerec_dict['oar_job_id'] = \
                        [slicerec_dict['oar_job_id']]
                    slicerec_dict.update(fixed_slicerec_dict)

                return_slicerec_dictlist.append(slicerec_dict)
                logger.debug("SLABDRIVER.PY \tGetSlices "
                             "OHOHOHOH %s" % (return_slicerec_dictlist,))

            logger.debug("SLABDRIVER.PY \tGetSlices "
                         "slicerec_dict %s return_slicerec_dictlist %s "
                         "lease['reserved_nodes'] "
                         "%s" % (slicerec_dict, return_slicerec_dictlist,
                                 lease['reserved_nodes']))

        logger.debug("SLABDRIVER.PY \tGetSlices RETURN "
                     "return_slicerec_dictlist %s"
                     % (return_slicerec_dictlist,))
        return return_slicerec_dictlist

    #Get all slices from the senslab sfa database ,
    #put them in dict format
    query_slice_list = dbsession.query(RegSlice).options(
        joinedload('reg_researchers')).all()
    return_slicerec_dictlist = []
    for record in query_slice_list:
        # Work on a copy: writing into the instance's own __dict__ would
        # replace the ORM relationship attribute with a plain dict.
        tmp = dict(record.__dict__)
        researchers = tmp['reg_researchers']
        # Only the first researcher is kept; a slice without researcher
        # gets an empty dict instead of raising IndexError.
        tmp['reg_researchers'] = \
            researchers[0].__dict__ if researchers else {}
        return_slicerec_dictlist.append(tmp)

    #Get all the jobs reserved nodes
    leases_list = self.GetReservedNodes()

    for fixed_slicerec_dict in return_slicerec_dictlist:
        slicerec_dict = {}
        #Check if the slice belongs to a senslab user
        if fixed_slicerec_dict['peer_authority'] is None:
            # The owner is the part of the second hrn component that
            # precedes the first underscore.
            owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
        else:
            owner = None

        for lease in leases_list:
            if owner == lease['user']:
                slicerec_dict['oar_job_id'] = lease['lease_id']
                logger.debug("SLABDRIVER.PY \tGetSlices lease %s "
                             % (lease,))
                reserved_list = lease['reserved_nodes']
                slicerec_dict.update({'node_ids': lease['reserved_nodes']})
                slicerec_dict.update(
                    {'list_node_ids': {'hostname': reserved_list}})
                # Merge order matters: values already on the slice record
                # win over the ones taken from this lease, then the
                # combined result is written back on the slice record.
                slicerec_dict.update(fixed_slicerec_dict)
                fixed_slicerec_dict.update(slicerec_dict)

    logger.debug("SLABDRIVER.PY \tGetSlices RETURN "
                 "return_slicerec_dictlist %s slice_filter %s "
                 % (return_slicerec_dictlist, slice_filter))

    return return_slicerec_dictlist
1177 # Convert SFA fields to PLC fields for use when registering up updating
1178 # registry record in the PLC database
1180 # @param type type of record (user, slice, ...)
1181 # @param hrn human readable name
1182 # @param sfa_fields dictionary of SFA fields
1183 # @param slab_fields dictionary of PLC fields (output)
@staticmethod
def sfa_fields_to_slab_fields(sfa_type, hrn, record):
    """Convert the fields of an SFA record into a Senslab record dict.

    Only records of type "slice" produce any field; for every other type
    an empty dict is returned.

    :param sfa_type: type of record (user, slice, ...).
    :param hrn: human readable name of the record.
    :param record: dict of SFA fields.
    :returns: dict of Senslab fields built from ``record``.
    """
    slab_record = {}

    if sfa_type == "slice":
        #instantion used in get_slivers ?
        if "instantiation" not in slab_record:
            slab_record["instantiation"] = "senslab-instantiated"
        #Unused hrn_to_pl_slicename because Slab's hrn already
        #in the appropriate form SA 23/07/12
        slab_record["hrn"] = hrn
        logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields "
                     "slab_record %s " % (slab_record['hrn'],))
        # Optional fields are copied only when the SFA record has them.
        if "url" in record:
            slab_record["url"] = record["url"]
        if "description" in record:
            slab_record["description"] = record["description"]
        if "expires" in record:
            slab_record["expires"] = int(record["expires"])

    #nodes added by OAR only and then imported to SFA

    return slab_record
def __transforms_timestamp_into_date(self, xp_utc_timestamp=None):
    """ Transforms unix timestamp into valid OAR date format.

    :param xp_utc_timestamp: unix timestamp (int, or anything ``int()``
        accepts) of the experiment start. Left empty/None for an
        experiment that must run immediately.
    :returns: the date formatted with ``self.time_format`` in the local
        time of the server, or ``None`` when no timestamp is given.
    """
    #Used in case of a scheduled experiment (not immediate)
    #To run an XP immediately, don't specify date and time in RSpec
    #They will be set to None.
    if not xp_utc_timestamp:
        return None

    #transform the xp_utc_timestamp into server readable time
    xp_server_readable_date = datetime.fromtimestamp(
        int(xp_utc_timestamp)).strftime(self.time_format)
    return xp_server_readable_date
class SlabDriver(Driver):
    """ Senslab Driver class inherited from Driver generic class.

    Contains methods compliant with the SFA standard and the testbed
    infrastructure (calls to LDAP and OAR).
    """

    def __init__(self, config):
        """Set up the driver from the SFA configuration object.

        :param config: SFA configuration; ``SFA_INTERFACE_HRN`` is read
            from it and it is handed to the Senslab database wrapper and
            to the testbed API object.
        """
        Driver.__init__(self, config)
        self.config = config
        self.hrn = config.SFA_INTERFACE_HRN

        self.db = SlabDB(config, debug=False)
        # Every testbed-side call goes through this object.
        self.slab_api = SlabTestbedAPI(config)
def augment_records_with_testbed_info(self, record_list):
    """ Adds specific testbed info to the records.

    Thin wrapper: hands ``record_list`` to ``fill_record_info`` and
    returns whatever that method returns.
    """
    return self.fill_record_info(record_list)
1287 def fill_record_info(self, record_list):
1289 Given a SFA record, fill in the senslab specific and SFA specific
1290 fields in the record.
1293 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
1294 if not isinstance(record_list, list):
1295 record_list = [record_list]
1298 for record in record_list:
1299 #If the record is a SFA slice record, then add information
1300 #about the user of this slice. This kind of
1301 #information is in the Senslab's DB.
1302 if str(record['type']) == 'slice':
1303 if 'reg_researchers' in record and \
1304 isinstance(record['reg_researchers'], list) :
1305 record['reg_researchers'] = record['reg_researchers'][0].__dict__
1306 record.update({'PI':[record['reg_researchers']['hrn']],
1307 'researcher': [record['reg_researchers']['hrn']],
1308 'name':record['hrn'],
1311 'person_ids':[record['reg_researchers']['record_id']],
1312 'geni_urn':'', #For client_helper.py compatibility
1313 'keys':'', #For client_helper.py compatibility
1314 'key_ids':''}) #For client_helper.py compatibility
1317 #Get slab slice record.
1318 recslice_list = self.slab_api.GetSlices(slice_filter = \
1319 str(record['hrn']),\
1320 slice_filter_type = 'slice_hrn')
1323 logger.debug("SLABDRIVER \tfill_record_info \
1324 TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id']\
1325 %s " %(record['hrn'], record['oar_job_id']))
1327 for rec in recslice_list:
1328 logger.debug("SLABDRIVER\r\n \t fill_record_info oar_job_id %s " %(rec['oar_job_id']))
1329 del record['reg_researchers']
1330 record['node_ids'] = [ self.slab_api.root_auth + hostname for hostname in rec['node_ids']]
1334 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1335 recslice_list %s \r\n \t RECORD %s \r\n \
1336 \r\n" %(recslice_list, record))
1337 if str(record['type']) == 'user':
1338 #The record is a SFA user record.
1339 #Get the information about his slice from Senslab's DB
1340 #and add it to the user record.
1341 recslice_list = self.slab_api.GetSlices(\
1342 slice_filter = record['record_id'],\
1343 slice_filter_type = 'record_id_user')
1345 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
1346 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
1347 #Append slice record in records list,
1348 #therefore fetches user and slice info again(one more loop)
1349 #Will update PIs and researcher for the slice
1350 #recuser = dbsession.query(RegRecord).filter_by(record_id = \
1351 #recslice_list[0]['record_id_user']).first()
1352 recuser = recslice_list[0]['reg_researchers']
1353 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
1354 recuser %s \r\n \r\n" %(recuser))
1356 recslice = recslice_list[0]
1357 recslice.update({'PI':[recuser['hrn']],
1358 'researcher': [recuser['hrn']],
1359 'name':record['hrn'],
1362 'person_ids':[recuser['record_id']]})
1364 for rec in recslice_list:
1365 recslice['oar_job_id'].append(rec['oar_job_id'])
1369 recslice.update({'type':'slice', \
1370 'hrn':recslice_list[0]['hrn']})
1373 #GetPersons takes [] as filters
1374 user_slab = self.slab_api.GetPersons([record])
1377 record.update(user_slab[0])
1378 #For client_helper.py compatibility
1379 record.update( { 'geni_urn':'',
1382 record_list.append(recslice)
1384 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1385 INFO TO USER records %s" %(record_list))
1387 logger.debug("SLABDRIVER.PY \tfill_record_info END \
1388 record %s \r\n \r\n " %(record))
1390 except TypeError, error:
1391 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
1393 #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ")
def sliver_status(self, slice_urn, slice_hrn):
    """Receive a status request for slice named urn/hrn
    urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
    shall return a structure as described in
    http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
    NT : not sure if we should implement this or not, but used by sface.

    :param slice_urn: urn of the slice, echoed back as 'geni_urn'.
    :param slice_hrn: hrn of the slice, used to look the slice up.
    :returns: dict with the keys 'geni_urn', 'pl_login', 'geni_status'
        and 'geni_resources'.
    :raises SliverDoesNotExist: when no slice matches ``slice_hrn``.
    """
    def _hostname(node):
        # GetSlices stores lease['reserved_nodes'] in 'node_ids' as-is,
        # while this method used to index every entry as a dict with a
        # 'hostname' key. Accept both shapes.
        # NOTE(review): entries are presumably hostname strings - confirm.
        if isinstance(node, dict):
            return node['hostname']
        return node

    #First get the slice with the slice hrn
    slice_list = self.slab_api.GetSlices(slice_filter=slice_hrn,
                                         slice_filter_type='slice_hrn')

    if not slice_list:
        raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))

    #Used for fetching the user info witch comes along the slice info
    one_slice = slice_list[0]

    #Make a list of all the nodes hostnames in use for this slice
    slice_nodes_list = [_hostname(node)
                        for any_slice in slice_list
                        for node in any_slice.get('node_ids', [])]

    #Get all the corresponding nodes details
    nodes_all = self.slab_api.GetNodes(
        {'hostname': slice_nodes_list},
        ['node_id', 'hostname', 'site', 'boot_state'])
    nodeall_byhostname = dict((one_node['hostname'], one_node)
                              for one_node in nodes_all)

    # NOTE(review): the status is built from the first slice record only;
    # confirm this is intended when several leases exist for the slice.
    single_slice = one_slice

    top_level_status = 'empty'
    result = dict.fromkeys(
        ['geni_urn', 'pl_login', 'geni_status', 'geni_resources'], None)
    result['pl_login'] = one_slice['reg_researchers']['hrn']
    logger.debug("Slabdriver - sliver_status Sliver status "
                 "urn %s hrn %s single_slice %s \r\n "
                 % (slice_urn, slice_hrn, single_slice))

    if 'node_ids' not in single_slice:
        #No job in the slice
        result['geni_status'] = top_level_status
        result['geni_resources'] = []
        return result

    top_level_status = 'ready'

    #A job is running on Senslab for this slice
    # report about the local nodes that are in the slice only
    result['geni_urn'] = slice_urn

    resources = []
    res = None  # stays None in the final log when the slice has no node
    for node in single_slice['node_ids']:
        node_name = _hostname(node)
        node_info = nodeall_byhostname[node_name]

        res = {}
        res['pl_hostname'] = node_name
        res['pl_boot_state'] = node_info['boot_state']
        sliver_id = Xrn(slice_urn, type='slice',
                        id=node_info['node_id'],
                        authority=self.hrn).urn
        res['geni_urn'] = sliver_id

        if node_info['boot_state'] == 'Alive':
            res['geni_status'] = 'ready'
        else:
            # One node that is not alive marks the whole sliver as failed.
            res['geni_status'] = 'failed'
            top_level_status = 'failed'

        res['geni_error'] = ''
        resources.append(res)

    result['geni_status'] = top_level_status
    result['geni_resources'] = resources
    logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "
                 % (resources, res))
    return result
@staticmethod
def get_user_record(hrn):
    """ Returns the user record based on the hrn from the SFA DB.

    :param hrn: human readable name to look for.
    :returns: the first RegRecord whose ``hrn`` matches, or ``None``.
    """
    return dbsession.query(RegRecord).filter_by(hrn=hrn).first()
1507 def testbed_name (self):
1510 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version(self):
    """Describe this aggregate: testbed name and supported rspec versions.

    :returns: dict with the keys 'testbed', 'geni_request_rspec_versions'
        and 'geni_ad_rspec_versions'; the last two are lists of the
        ``to_dict()`` form of the matching rspec versions.
    """
    version_manager = VersionManager()
    ad_rspec_versions = []
    request_rspec_versions = []
    for rspec_version in version_manager.versions:
        # A '*' content type counts for both lists.
        if rspec_version.content_type in ['*', 'ad']:
            ad_rspec_versions.append(rspec_version.to_dict())
        if rspec_version.content_type in ['*', 'request']:
            request_rspec_versions.append(rspec_version.to_dict())
    return {
        'testbed': self.testbed_name(),
        'geni_request_rspec_versions': request_rspec_versions,
        'geni_ad_rspec_versions': ad_rspec_versions,
    }
def create_sliver(self, slice_urn, slice_hrn, creds, rspec_string,
                  users, options):
    """Create the sliver described by ``rspec_string`` for a slice.

    Verifies the slice and its persons through SlabSlices, turns the
    leases requested in the rspec into jobs (leases sharing a start time
    and a duration become a single job), hands them to
    ``verify_slice_leases`` and returns the resulting rspec.

    :param slice_urn: urn of the slice.
    :param slice_hrn: hrn of the slice.
    :param creds: a credential or a list of credentials.
    :param rspec_string: the request rspec, as a string.
    :param users: list of user dicts; the first one provides the slice
        record, the keys and the email.
    :param options: passed to ``verify_persons``.
    :returns: whatever ``SlabAggregate.get_rspec`` returns for the slice.
    """
    aggregate = SlabAggregate(self)

    slices = SlabSlices(self)
    peer = slices.get_peer(slice_hrn)
    sfa_peer = slices.get_sfa_peer(slice_hrn)
    slice_record = None

    if not isinstance(creds, list):
        creds = [creds]

    if users:
        slice_record = users[0].get('slice_record', {})
        logger.debug("SLABDRIVER.PY \t ===============create_sliver \t"
                     "creds %s \r\n \r\n users %s"
                     % (creds, users))
        slice_record['user'] = {'keys': users[0]['keys'],
                                'email': users[0]['email'],
                                'hrn': slice_record['reg-researchers'][0]}

    rspec = RSpec(rspec_string)
    logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version "
                 "%s slice_record %s users %s"
                 % (rspec.version, slice_record, users))

    # ensure slice record exists
    #Removed options to verify_slice SA 14/08/12
    sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer,
                                    sfa_peer)

    # ensure person records exists
    #verify_persons returns added persons but the return value is unused
    slices.verify_persons(slice_hrn, sfa_slice, users, peer,
                          sfa_peer, options=options)
    #requested_attributes returned by rspec.version.get_slice_attributes()
    #unused, removed SA 13/08/12
    rspec.version.get_slice_attributes()

    logger.debug("SLABDRIVER.PY create_sliver slice %s " % (sfa_slice,))

    # Nodes of the request that belong to this testbed. Strings are
    # compared by value: an identity test is almost never true here.
    nodes_with_slivers = list(rspec.version.get_nodes_with_slivers())
    requested_slivers = [
        node.get('component_id') for node in nodes_with_slivers
        if node.get('authority_id') == self.slab_api.root_auth]
    logger.debug("SLADRIVER \tcreate_sliver requested_slivers "
                 "requested_slivers %s listnodes %s"
                 % (requested_slivers, nodes_with_slivers))
    #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.

    requested_lease_list = []
    for lease in rspec.version.get_leases():
        single_requested_lease = {}
        logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " % (lease,))

        # Only leases that do not exist yet and target this testbed.
        if lease.get('lease_id'):
            continue
        if get_authority(lease['component_id']) != self.slab_api.root_auth:
            continue

        single_requested_lease['hostname'] = \
            slab_xrn_to_hostname(lease.get('component_id').strip())
        single_requested_lease['start_time'] = lease.get('start_time')
        single_requested_lease['duration'] = lease.get('duration')
        #Check the experiment's duration is valid before adding
        #the lease to the requested leases list
        duration_in_seconds = int(single_requested_lease['duration']) * 60
        if duration_in_seconds > self.slab_api.GetLeaseGranularity():
            requested_lease_list.append(single_requested_lease)

    #Create dict of leases by start_time, regrouping nodes reserved
    #at the same time, for the same amount of time = one job on OAR
    requested_job_dict = {}
    for lease in requested_lease_list:
        #In case it is an asap experiment start_time is empty
        if lease['start_time'] == '':
            lease['start_time'] = '0'

        if lease['start_time'] not in requested_job_dict:
            # The first lease of a job holds the list of hostnames;
            # wrap whatever scalar type the hostname has.
            if not isinstance(lease['hostname'], list):
                lease['hostname'] = [lease['hostname']]
            requested_job_dict[lease['start_time']] = lease
        else:
            job_lease = requested_job_dict[lease['start_time']]
            if lease['duration'] == job_lease['duration']:
                job_lease['hostname'].append(lease['hostname'])

    logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "
                 % (requested_job_dict,))
    #verify_slice_leases returns the leases , but the return value is unused
    #here. Removed SA 13/08/12
    slices.verify_slice_leases(sfa_slice, requested_job_dict, peer)

    return aggregate.get_rspec(slice_xrn=slice_urn,
                               login=sfa_slice['login'],
                               version=rspec.version)
1642 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
1644 sfa_slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
1645 slice_filter_type = 'slice_hrn')
1647 if not sfa_slice_list:
1650 #Delete all in the slice
1651 for sfa_slice in sfa_slice_list:
1654 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
1655 slices = SlabSlices(self)
1656 # determine if this is a peer slice
1658 peer = slices.get_peer(slice_hrn)
1659 #TODO delete_sliver SA : UnBindObjectFromPeer should be
1660 #used when there is another
1661 #senslab testbed, which is not the case 14/08/12 .
1663 logger.debug("SLABDRIVER.PY delete_sliver peer %s \r\n \t sfa_slice %s " %(peer, sfa_slice))
1666 #self.slab_api.UnBindObjectFromPeer('slice', \
1667 #sfa_slice['record_id_slice'], \
1669 self.slab_api.DeleteSliceFromNodes(sfa_slice)
1675 #self.slab_api.BindObjectToPeer('slice', \
1676 #sfa_slice['record_id_slice'], \
1677 #peer, sfa_slice['peer_slice_id'])
1681 # first 2 args are None in case of resource discovery
def list_resources(self, slice_urn, slice_hrn, creds, options):
    """Return the rspec of the testbed, or of one slice.

    :param slice_urn: urn of the slice; None in case of resource
        discovery.
    :param slice_hrn: hrn of the slice; None in case of resource
        discovery.
    :param creds: not used by this method.
    :param options: dict of options; 'geni_rspec_version' selects the
        rspec format and the whole dict is passed on to the aggregate.
    :returns: what ``SlabAggregate.get_rspec`` returns.
    """
    version_manager = VersionManager()
    # get the rspec's return format from options
    rspec_version = \
        version_manager.get_version(options.get('geni_rspec_version'))

    # Caching key. The cache lookup and store are currently disabled, so
    # the key is built but not used.
    version_string = "rspec_%s" % (rspec_version)

    #panos adding the info option to the caching key (can be improved)
    if options.get('info'):
        version_string = version_string + "_" + \
            options.get('info', 'default')

    # Adding the list_leases option to the caching key
    if options.get('list_leases'):
        version_string = version_string + "_" + \
            options.get('list_leases', 'default')

    # Adding geni_available to caching key
    if options.get('geni_available'):
        version_string = version_string + "_" + \
            str(options.get('geni_available'))

    #panos: passing user-defined options
    aggregate = SlabAggregate(self)
    rspec = aggregate.get_rspec(slice_xrn=slice_urn,
                                version=rspec_version, options=options)

    return rspec
def list_slices(self, creds, options):
    """List the urns of all the slices known to the testbed API.

    :param creds: not used by this method.
    :param options: not used by this method.
    :returns: list of slice urns, one per record returned by
        ``GetSlices()`` called without filter.
    """
    # The cache lookup and store are currently disabled.
    slices = self.slab_api.GetSlices()
    logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" % (slices,))

    slice_urns = [hrn_to_urn(slab_slice['hrn'], 'slice')
                  for slab_slice in slices]
    return slice_urns
1752 def register (self, sfa_record, hrn, pub_key):
1754 Adding new user, slice, node or site should not be handled
1758 Adding users = LDAP Senslab
1759 Adding slice = Import from LDAP users
1765 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
1766 """No site or node record update allowed in Senslab."""
1768 pointer = old_sfa_record['pointer']
1769 old_sfa_record_type = old_sfa_record['type']
1771 # new_key implemented for users only
1772 if new_key and old_sfa_record_type not in [ 'user' ]:
1773 raise UnknownSfaType(old_sfa_record_type)
1775 #if (type == "authority"):
1776 #self.shell.UpdateSite(pointer, new_sfa_record)
1778 if old_sfa_record_type == "slice":
1779 slab_record = self.slab_api.sfa_fields_to_slab_fields(old_sfa_record_type, \
1780 hrn, new_sfa_record)
1781 if 'name' in slab_record:
1782 slab_record.pop('name')
1783 #Prototype should be UpdateSlice(self,
1784 #auth, slice_id_or_name, slice_fields)
1785 #Senslab cannot update slice since slice = job
1786 #so we must delete and create another job
1787 self.slab_api.UpdateSlice(pointer, slab_record)
1789 elif old_sfa_record_type == "user":
1791 all_fields = new_sfa_record
1792 for key in all_fields.keys():
1793 if key in ['first_name', 'last_name', 'title', 'email',
1794 'password', 'phone', 'url', 'bio', 'accepted_aup',
1796 update_fields[key] = all_fields[key]
1797 self.slab_api.UpdatePerson(pointer, update_fields)
1800 # must check this key against the previous one if it exists
1801 persons = self.slab_api.GetPersons(['key_ids'])
1803 keys = person['key_ids']
1804 keys = self.slab_api.GetKeys(person['key_ids'])
1806 # Delete all stale keys
1809 if new_key != key['key']:
1810 self.slab_api.DeleteKey(key['key_id'])
1814 self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh', \
1821 def remove (self, sfa_record):
1822 sfa_record_type = sfa_record['type']
1823 hrn = sfa_record['hrn']
1824 if sfa_record_type == 'user':
1826 #get user from senslab ldap
1827 person = self.slab_api.GetPersons(sfa_record)
1828 #No registering at a given site in Senslab.
1829 #Once registered to the LDAP, all senslab sites are
1832 #Mark account as disabled in ldap
1833 self.slab_api.DeletePerson(sfa_record)
1834 elif sfa_record_type == 'slice':
1835 if self.slab_api.GetSlices(slice_filter = hrn, \
1836 slice_filter_type = 'slice_hrn'):
1837 self.slab_api.DeleteSlice(sfa_record)
1839 #elif type == 'authority':
1840 #if self.GetSites(pointer):
1841 #self.DeleteSite(pointer)