import os
import subprocess
from datetime import datetime

from sqlalchemy.orm import joinedload

from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
from sfa.util.sfalogging import logger
from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
from sfa.storage.alchemy import dbsession
from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
from sfa.trust.certificate import Keypair, convert_public_key
from sfa.trust.gid import create_uuid
from sfa.trust.hierarchy import Hierarchy
from sfa.managers.driver import Driver
from sfa.rspecs.version_manager import VersionManager
from sfa.rspecs.rspec import RSpec

## thierry: everything that is API-related (i.e. handling incoming requests)
# SlabDriver should be really only about talking to the senslab testbed
from sfa.senslab.OARrestapi import OARrestapi
from sfa.senslab.LDAPapi import LDAPapi
from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
    slab_xrn_object
from sfa.senslab.slabslices import SlabSlices
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
46 class SlabTestbedAPI():
def __init__(self, config):
    """Create the OAR and LDAP clients and set the lease defaults.

    :param config: SFA configuration object; only its
        ``SFA_REGISTRY_ROOT_AUTH`` attribute is read here.
    """
    self.oar = OARrestapi()
    # GetPersons, DeletePerson and AddPerson all go through self.ldap,
    # so it has to exist as soon as the object is built.
    self.ldap = LDAPapi()
    self.time_format = "%Y-%m-%d %H:%M:%S"
    self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
    self.grain = 600  # 10 mins lease
58 #TODO clean GetPeers. 05/07/12SA
@staticmethod
def GetPeers(auth=None, peer_filter=None, return_fields_list=None):
    """Return the authority records stored in the SFA registry.

    Declared static because the signature takes no ``self``; this way
    both ``SlabTestbedAPI.GetPeers(...)`` and ``instance.GetPeers(...)``
    bind the arguments as written.

    :param auth: not used for the lookup, only logged.
    :param peer_filter: hrn of one authority to look up; when falsy every
        record whose type is exactly 'authority' is returned.
    :param return_fields_list: only logged, no field filtering is done.
    :returns: list of RegRecord objects; empty when nothing matches.
    """
    logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, "
                 "return_field %s "
                 % (auth, peer_filter, return_fields_list))
    all_records = dbsession.query(RegRecord).filter(
        RegRecord.type.like('%authority%')).all()

    # Index the records by (hrn, type) and group the hrns by record type.
    existing_records = {}
    existing_hrns_by_types = {}
    for record in all_records:
        existing_records[(record.hrn, record.type)] = record
        existing_hrns_by_types.setdefault(record.type, []).append(record.hrn)

    logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "
                 % (existing_hrns_by_types,))

    records_list = []
    try:
        if peer_filter:
            records_list.append(existing_records[(peer_filter, 'authority')])
        else:
            for hrn in existing_hrns_by_types['authority']:
                records_list.append(existing_records[(hrn, 'authority')])
    except KeyError:
        # Unknown peer or no authority at all: answer with what we have.
        pass
    else:
        logger.debug("SLABDRIVER \tGetPeer \trecords_list %s "
                     % (records_list,))

    logger.debug("SLABDRIVER \tGetPeer return_records %s "
                 % (records_list,))
    return records_list
104 #TODO : Handling OR request in make_ldap_filters_from_records
105 #instead of the for loop
106 #over the records' list
def GetPersons(self, person_filter=None):
    """Look up enabled user accounts in the senslab LDAP.

    :param person_filter: list of dictionaries of LDAP search attributes
        (usually a single one); one lookup is made per dictionary. Any
        other value, None included, lists every enabled account.
    :returns: with a filter list, the list of users found or None when
        nobody matched; otherwise whatever
        ``self.ldap.LdapFindUser(is_user_enabled=True)`` returns.
    """
    logger.debug("SLABDRIVER \tGetPersons person_filter %s"
                 % (person_filter,))

    if not (person_filter and isinstance(person_filter, list)):
        # No usable filter: get all the enabled accounts.
        return self.ldap.LdapFindUser(is_user_enabled=True)

    person_list = []
    for searched_attributes in person_filter:
        # is_user_enabled restricts the LDAP filter to enabled accounts.
        person = self.ldap.LdapFindUser(searched_attributes,
                                        is_user_enabled=True)
        if person:
            person_list.append(person)

    # Callers expect None, not an empty list, when nobody was found.
    return person_list or None
def GetTimezone(self):
    """Get the OAR server time and timezone.

    Unused SA 16/11/12

    :returns: the (server_timestamp, server_tz) pair unpacked from the
        answer to the "GET_timezone" request.
    """
    answer = self.oar.parser.SendRequest("GET_timezone")
    server_timestamp, server_tz = answer
    return server_timestamp, server_tz
def DeleteJobs(self, job_id, username):
    """Ask OAR to delete one job.

    :param job_id: OAR job identifier; a falsy value or -1 means there is
        no job to delete and nothing is sent.
    :param username: login passed along with the OAR request.
    :returns: the answer of the OAR REST API, or None when no request
        was sent.
    """
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s username %s "
                 % (job_id, username))
    # Compare by value: identity tests on ints only work by accident.
    if not job_id or job_id == -1:
        return None

    reqdict = {'method': "delete", 'strval': str(job_id)}
    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',
                                              reqdict, username)
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s "
                 "username %s" % (job_id, answer, username))
    return answer
166 ##TODO : Unused GetJobsId ? SA 05/07/12
167 #def GetJobsId(self, job_id, username = None ):
169 #Details about a specific job.
170 #Includes details about submission time, jot type, state, events,
171 #owner, assigned ressources, walltime etc...
175 #node_list_k = 'assigned_network_address'
176 ##Get job info from OAR
177 #job_info = self.oar.parser.SendRequest(req, job_id, username)
179 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
181 #if job_info['state'] == 'Terminated':
182 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
185 #if job_info['state'] == 'Error':
186 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
191 #logger.error("SLABDRIVER \tGetJobsId KeyError")
194 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
196 ##Replaces the previous entry
197 ##"assigned_network_address" / "reserved_resources"
199 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
200 #del job_info[node_list_k]
201 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
def GetJobsResources(self, job_id, username = None):
    """Return the hostnames of the nodes attached to an OAR job.

    :param job_id: OAR job identifier.
    :param username: login forwarded to the OAR request.
    :returns: dict with a single 'node_ids' key holding the hostname list.
    """
    # Resource ids of the job, as known by OAR
    oar_resource_ids = self.oar.parser.SendRequest("GET_jobs_id_resources",
                                                   job_id, username)
    logger.debug("SLABDRIVER \t GetJobsResources %s " %(oar_resource_ids))

    # 'node_ids' replaces the OAR entries "assigned_network_address" /
    # "reserved_resources"
    return {'node_ids':
            self.__get_hostnames_from_oar_node_ids(oar_resource_ids)}
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Translate the node names stored in a job record into hostnames.

    :param job_info: job dictionary.
    :param node_list_name: key of ``job_info`` holding the list of node
        names, each expected to be a hostname known by GetNodes().
    :returns: list of hostnames. When a name (or ``node_list_name``
        itself) is unknown the error is logged and the hostnames
        collected so far are returned.
    """
    # Testbed node records keyed on their hostname
    node_dict = dict((node['hostname'], node) for node in self.GetNodes())

    reserved_node_hostname_list = []
    try:
        for node_name in job_info[node_list_name]:
            # append: assigning by index into an empty list raises
            # IndexError on the very first element.
            reserved_node_hostname_list.append(
                node_dict[node_name]['hostname'])

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes "
                     "reserved_node_hostname_list %s"
                     % (reserved_node_hostname_list,))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR ")

    return reserved_node_hostname_list
def GetNodesCurrentlyInUse(self):
    """Return what OAR answers to "GET_running_jobs", i.e. the nodes
    already involved in an OAR job."""
    nodes_in_use = self.oar.parser.SendRequest("GET_running_jobs")
    return nodes_in_use
def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
    """Map OAR resource ids to the hostnames of the matching nodes.

    :param resource_id_list: iterable of OAR node ids; the placeholder
        string "Undefined" is skipped.
    :returns: list of hostnames, in the order of ``resource_id_list``.
    :raises KeyError: when an id is not among the nodes from GetNodes().
    """
    # Full node list indexed by OAR node id
    oar_id_node_dict = dict((node['oar_id'], node)
                            for node in self.GetNodes())

    # Jobs requested "asap" do not have defined resources yet. The test
    # is by value: two equal strings are not guaranteed to be one object.
    return [oar_id_node_dict[resource_id]['hostname']
            for resource_id in resource_id_list
            if resource_id != "Undefined"]
def GetReservedNodes(self, username = None):
    """Return the OAR reservations, each completed with its hostnames.

    :param username: login forwarded to the "GET_reserved_nodes" request.
    :returns: the list of reservation dicts answered by OAR; each one
        gains a 'reserved_nodes' key built from its 'resource_ids'.
    """
    # Nodes in use and reserved nodes
    reservations = self.oar.parser.SendRequest("GET_reserved_nodes",
                                               username = username)

    for reservation in reservations:
        logger.debug ("GetReservedNodes resa %s"%(reservation))
        hostnames = self.__get_hostnames_from_oar_node_ids(
            reservation['resource_ids'])
        reservation['reserved_nodes'] = hostnames

    return reservations
def GetNodes(self, node_filter_dict = None, return_fields_list = None):
    """Return the testbed node records, optionally filtered and trimmed.

    :param node_filter_dict: dictionary of lists: a node is selected once
        for every (key, value) pair it matches, so a node matching
        several pairs appears several times.
    :param return_fields_list: when given, each returned record only
        holds these keys; it also applies when no filter is given.
    :returns: list of node dicts, or None when a filter key or requested
        field is missing from a node record (the error is logged).
    """
    node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
    # list(): keep returning a real list when values() is a view.
    node_dict_list = list(node_dict_by_id.values())
    logger.debug(" SLABDRIVER GetNodes node_filter_dict %s "
                 "return_fields_list %s "
                 % (node_filter_dict, return_fields_list))

    # No filtering needed, return the list directly
    if not (node_filter_dict or return_fields_list):
        return node_dict_list

    def _project(node):
        """Keep only the requested fields of one node record."""
        if not return_fields_list:
            return node
        return dict((key, node[key]) for key in return_fields_list)

    try:
        if not node_filter_dict:
            # Only a field selection was asked for: apply it everywhere.
            return [_project(node) for node in node_dict_list]

        return_node_list = []
        for filter_key in node_filter_dict:
            # Filter the node list by each value of this key
            for value in node_filter_dict[filter_key]:
                for node in node_dict_list:
                    if node[filter_key] == value:
                        return_node_list.append(_project(node))
    except KeyError:
        logger.log_exc("GetNodes KeyError")
        return None

    return return_node_list
@staticmethod
def AddSlice(slice_record, user_record):
    """Add a slice to the sfa tables. Called by verify_slice during
    lease/sliver creation.

    Declared static because the signature takes no ``self``.

    :param slice_record: dict providing 'hrn', 'gid', 'slice_id' and
        'authority' for the new RegSlice.
    :param user_record: registry user record set as the only researcher
        of the slice.
    """
    sfa_record = RegSlice(hrn=slice_record['hrn'],
                          gid=slice_record['gid'],
                          pointer=slice_record['slice_id'],
                          authority=slice_record['authority'])

    logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s"
                 % (sfa_record, user_record))
    sfa_record.just_created()
    dbsession.add(sfa_record)
    dbsession.commit()

    # Update the reg-researcher dependence table
    sfa_record.reg_researchers = [user_record]
    dbsession.commit()

    # NOTE: the update of the senslab slab_xp table with the new slice is
    # disabled (it only existed as commented-out code).
def GetSites(self, site_filter_name_list = None, return_fields_list = None):
    """Return the site records known by OAR.

    :param site_filter_name_list: names of the wanted sites; unknown
        names are ignored. When empty or None every site is considered.
    :param return_fields_list: when given, each returned record only
        holds these keys.
    :returns: list of site dicts, or None when a requested field is
        missing from a selected site (the error is logged).
    """
    # site_dict : dict where the key is the site name
    site_dict = self.oar.parser.SendRequest("GET_sites")

    if not (site_filter_name_list or return_fields_list):
        return list(site_dict.values())

    # A field selection without a site filter applies to all the sites.
    site_names = site_filter_name_list or list(site_dict)

    return_site_list = []
    for site_name in site_names:
        if site_name not in site_dict:
            continue
        site = site_dict[site_name]
        if not return_fields_list:
            return_site_list.append(site)
            continue

        # One dict per site, filled with all the requested fields.
        tmp = {}
        for field in return_fields_list:
            try:
                tmp[field] = site[field]
            except KeyError:
                logger.error("GetSites KeyError %s " % (field,))
                return None
        return_site_list.append(tmp)

    return return_site_list
392 #TODO : Check rights to delete person
def DeletePerson(self, person_record):
    """Disable an existing account in senslab LDAP.

    The rights described by the PLC API documentation (users and techs
    can only delete themselves, PIs themselves and other non-PIs at their
    sites, admins anyone) are not checked here.

    :param person_record: record handed to LdapMarkUserAsDeleted.
    :returns: the value returned by ``self.ldap.LdapMarkUserAsDeleted``.
    """
    # Disable user account in senslab LDAP
    ldap_answer = self.ldap.LdapMarkUserAsDeleted(person_record)
    logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
    return ldap_answer
407 #TODO Check DeleteSlice, check rights 05/07/2012 SA
def DeleteSlice(self, slice_record):
    """Delete the specified slice.

    Senslab : kills the job(s) associated with the slice, if any, through
    DeleteSliceFromNodes. The membership / PI / admin rights described by
    the PLC API documentation are not checked here.

    :param slice_record: slice record handed to DeleteSliceFromNodes.
    :returns: None
    """
    self.DeleteSliceFromNodes(slice_record)
    logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
    return None
@staticmethod
def __add_person_to_db(user_dict):
    """Create the registry user record of a person, unless a user with
    the same email already exists.

    Declared static because the signature takes no ``self``.

    :param user_dict: dict providing 'email', 'hrn' and 'pkey' (the
        public key of the user).
    """
    check_if_exists = dbsession.query(RegUser).filter_by(
        email=user_dict['email']).first()
    if check_if_exists:
        return

    logger.debug("__add_person_to_db \t Adding %s \r\n \r\n "
                 "_________________________________________________________________________"
                 % (user_dict,))
    hrn = user_dict['hrn']
    person_urn = hrn_to_urn(hrn, 'user')
    pubkey = user_dict['pkey']
    try:
        pkey = convert_public_key(pubkey)
    except Exception:
        # Key not good: create another pkey. There is no instance here,
        # so the module-level logger is the only one available.
        logger.warning('__add_person_to_db: unable to convert public '
                       'key for %s' % (hrn,))
        pkey = Keypair(create=True)

    # NOTE(review): without a public key no gid is generated and the
    # record is stored with gid=None instead of failing on an unbound
    # name - confirm this is acceptable for RegUser.
    person_gid = None
    if pubkey is not None and pkey is not None:
        hierarchy = Hierarchy()
        person_gid = hierarchy.create_gid(person_urn, create_uuid(), pkey)
        if user_dict['email']:
            logger.debug("__add_person_to_db \r\n \r\n SLAB IMPORTER "
                         "PERSON EMAIL OK email %s "
                         % (user_dict['email'],))
            person_gid.set_email(user_dict['email'])

    user_record = RegUser(hrn=hrn, pointer='-1',
                          authority=get_authority(hrn),
                          email=user_dict['email'], gid=person_gid)
    user_record.reg_keys = [RegKey(user_dict['pkey'])]
    user_record.just_created()
    dbsession.add(user_record)
    dbsession.commit()
461 #TODO AddPerson 04/07/2012 SA
462 #def AddPerson(self, auth, person_fields=None):
def AddPerson(self, record):#TODO fixing 28/08//2012 SA
    """Add a new account in LDAP and in the registry database.

    The fields given in ``record`` are handed to LdapAddUser; ``record``
    is modified in place (its 'hrn' is set from the root authority and
    the uid allocated by LDAP) before being stored in the registry.

    :param record: dict describing the new user.
    :returns: the LDAP uid of the new account.
    """
    ldap_answer = self.ldap.LdapAddUser(record)
    uid = ldap_answer['uid']

    record['hrn'] = self.root_auth + '.' + uid
    logger.debug("SLABDRIVER AddPerson return code %s record %s \r\n "%(ldap_answer,record))
    self.__add_person_to_db(record)
    return uid
479 #TODO AddPersonToSite 04/07/2012 SA
def AddPersonToSite (self, auth, person_id_or_email, \
                     site_id_or_login_base=None):
    """Placeholder for the PLC API call adding a person to a site.

    Not implemented for senslab: the arguments are ignored, a warning is
    logged and None is returned.
    """
    logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
    return None
492 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
    """Placeholder for the PLC API call granting a role to a person.

    Not implemented for senslab: the arguments are ignored, a warning is
    logged and None is returned.
    """
    logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
    return None
504 #TODO AddPersonKey 04/07/2012 SA
def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
    """Placeholder for the PLC API call adding a key to an account.

    Not implemented for senslab: the arguments are ignored, a warning is
    logged and None is returned.
    """
    logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
    return None
def DeleteLeases(self, leases_id_list, slice_hrn ):
    """Delete every OAR job of a list of lease ids.

    :param leases_id_list: iterable of OAR job ids.
    :param slice_hrn: passed to DeleteJobs as its ``username`` argument.
    :returns: None
    """
    logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s "
                 "\r\n " %(leases_id_list, slice_hrn))
    for lease_id in leases_id_list:
        self.DeleteJobs(lease_id, slice_hrn)
    return None
def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
                          lease_start_time, lease_duration, slice_user=None):
    """Submit an OAR job reserving ``added_nodes`` for a slice.

    :param added_nodes: list of node hostnames to reserve.
    :param slice_name: hrn of the slice (only stored in the lease dict).
    :param lease_start_time: start timestamp in seconds; '0' (or 0) means
        an immediate experiment and no reservation date is sent to OAR.
    :param lease_duration: duration of the lease, in units of the lease
        granularity returned by GetLeaseGranularity().
    :param slice_user: login submitting the job; also used to build the
        job name "SFA_<login>".
    :returns: the OAR job id, or None when OAR did not create the job.
    :raises ValueError: when the lease duration is null.
    """
    lease_dict = {
        'lease_start_time': lease_start_time,
        'lease_duration': lease_duration,
        'added_nodes': added_nodes,
        'slice_name': slice_name,
        'slice_user': slice_user,
        'grain': self.GetLeaseGranularity(),
        'time_format': self.time_format,
    }

    def _process_walltime(duration):
        """Turn a duration in seconds into the OAR walltime
        [hours, minutes, seconds] (as strings) and the sleep time.
        """
        if not duration:
            logger.error(" __process_walltime duration null")
            raise ValueError("lease duration must not be null")

        # The walltime gets 4 extra minutes to take in account the
        # prologue and epilogue scripts execution (Update SA 23/10/12);
        # the /bin/sleep command itself gets no extra delay.
        total_walltime = duration + 240
        sleep_walltime = duration

        # divmod keeps everything integral whatever the Python version
        hours, remainder = divmod(total_walltime, 3600)
        minutes, seconds = divmod(remainder, 60)
        return [str(hours), str(minutes), str(seconds)], sleep_walltime

    def _create_job_structure_request_for_OAR(lease_dict):
        """Create the structure needed for a correct POST on OAR and make
        the timestamp transformation into the appropriate format.
        """
        reqdict = {'workdir': '/tmp'}

        nodeid_list = []
        for node in lease_dict['added_nodes']:
            logger.debug("\r\n \r\n OARrestapi \t "
                         "__create_job_structure_request_for_OAR node %s"
                         % (node,))
            # The hostname is used as the id of the node
            nodeid_list.append(node)

        reqdict['resource'] = (
            "{network_address in ("
            + ", ".join("'" + nodeid + "'" for nodeid in nodeid_list)
            + ")}/nodes=" + str(len(nodeid_list)))

        walltime, sleep_walltime = _process_walltime(
            int(lease_dict['lease_duration']) * lease_dict['grain'])

        reqdict['resource'] += ",walltime=" + ":".join(walltime)
        reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)

        # In case of a scheduled experiment (not immediate) give OAR a
        # readable start time. Immediate experiment: no reservation.
        # The test is by value, string identity is not reliable.
        start = lease_dict['lease_start_time']
        if start is not None and str(start) != '0':
            reqdict['reservation'] = datetime.fromtimestamp(
                int(start)).strftime(lease_dict['time_format'])

        reqdict['type'] = "deploy"
        reqdict['directory'] = ""
        reqdict['name'] = "SFA_" + lease_dict['slice_user']
        return reqdict

    logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s "
                 "\r\n " % (slice_user,))
    # Create the request for OAR
    reqdict = _create_job_structure_request_for_OAR(lease_dict)
    # first step : start the OAR job and update the job
    logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s "
                 "\r\n " % (reqdict,))

    answer = self.oar.POSTRequestToOARRestAPI('POST_job',
                                              reqdict, slice_user)
    logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " % (answer,))
    try:
        jobid = answer['id']
    except KeyError:
        logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR "
                       "Impossible to create job %s " % (answer,))
        return None

    def _configure_experiment(jobid, added_nodes):
        """Second step (currently not called): store the node numbers in
        /tmp/sfa/<jobid>.json as a list like [1, 56, 23].
        """
        tmp_dir = '/tmp/sfa/'
        if not os.path.exists(tmp_dir):
            os.makedirs(tmp_dir)
        with open(tmp_dir + str(jobid) + '.json', 'w') as job_file:
            job_file.write(
                '[' + ', '.join(node.strip('node') for node in added_nodes)
                + ']')

    def _launch_senslab_experiment(jobid):
        """Third step (currently not called): call the
        senslab-experiment java wrapper for the job.
        """
        javacmdline = "/usr/bin/java"
        jarname = \
            "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
        output = subprocess.Popen(
            [javacmdline, "-jar", jarname, str(jobid), slice_user],
            stdout=subprocess.PIPE).communicate()[0]
        logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s "
                     % (output,))

    if jobid:
        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s "
                     "added_nodes %s slice_user %s"
                     % (jobid, added_nodes, slice_user))
        # Steps two and three are disabled for now:
        #_configure_experiment(jobid, added_nodes)
        #_launch_senslab_experiment(jobid)

    return jobid
def AddLeases(self, hostname_list, slice_record, \
              lease_start_time, lease_duration):
    """Create the OAR job of a lease and record it in the senslab db.

    :param hostname_list: hostnames of the nodes to reserve.
    :param slice_record: dict providing the 'login' submitting the job
        and the 'hrn' of the slice.
    :param lease_start_time: start timestamp, in seconds.
    :param lease_duration: duration handed to LaunchExperimentOnOAR.
    :returns: None. Nothing is stored when OAR did not create the job.
    """
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s "
                 "slice_record %s lease_start_time %s lease_duration %s "
                 % (hostname_list, slice_record, lease_start_time,
                    lease_duration))

    username = slice_record['login']
    job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'],
                                        lease_start_time, lease_duration,
                                        username)
    if job_id is None:
        # No job, hence no row: a SenslabXP entry without job id would
        # not describe any experiment.
        logger.error("SLABDRIVER \t AddLeases no OAR job created for %s "
                     % (slice_record['hrn'],))
        return

    start_time = datetime.fromtimestamp(
        int(lease_start_time)).strftime(self.time_format)
    # Same int() coercion as for start_time, so that string inputs are
    # added and not concatenated.
    # NOTE(review): LaunchExperimentOnOAR multiplies the duration by the
    # lease granularity before sending it to OAR; here it is added as is
    # - confirm the unit expected in end_time.
    end_time = int(lease_start_time) + int(lease_duration)

    import logging
    from sfa.util.sfalogging import _SfaLogger
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL "
                 "%s %s %s " % (slice_record['hrn'], job_id, end_time))
    # NOTE(review): debugging aid kept as is - sets up a DEBUG logger on
    # 'sqlalchemy.engine' at every call.
    _SfaLogger(loggername='sqlalchemy.engine', level=logging.DEBUG)
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s "
                 % (type(slice_record['hrn']), type(job_id),
                    type(end_time)))

    slab_ex_row = SenslabXP(slice_hrn=slice_record['hrn'],
                            job_id=job_id, end_time=end_time)

    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s"
                 % (slab_ex_row,))
    slab_dbsession.add(slab_ex_row)
    slab_dbsession.commit()

    logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s "
                 % (start_time,))
721 #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
    """Delete the OAR job(s) attached to a slice.

    :param slice_record: dict providing 'oar_job_id' (one job id or a
        list of job ids) and 'user', the login handed to DeleteJobs.
    :returns: None
    """
    logger.debug("SLABDRIVER \t DeleteSliceFromNodese %s " %(slice_record))
    job_ids = slice_record['oar_job_id']
    if not isinstance(job_ids, list):
        # A single job id is handled like a one-element list
        job_ids = [job_ids]
    for job_id in job_ids:
        self.DeleteJobs(job_id, slice_record['user'])
    return None
def GetLeaseGranularity(self):
    """Return the lease granularity of the Senslab testbed, in seconds.

    OAR returns seconds for experiments duration. Experiments which last
    less than 10 min are invalid.
    """
    granularity = self.grain
    return granularity
@staticmethod
def update_jobs_in_slabdb( job_oar_list, jobs_psql):
    """Remove from the slab_xp table the jobs OAR does not list.

    Declared static because the signature takes no ``self`` while callers
    use ``self.update_jobs_in_slabdb(oar_jobs, db_jobs)``.

    :param job_oar_list: job ids currently listed by OAR.
    :param jobs_psql: job ids currently stored in the slab_xp table.
    """
    jobs_psql = set(jobs_psql)
    kept_jobs = set(job_oar_list).intersection(jobs_psql)
    logger.debug("\r\n \t update_jobs_in_slabdb jobs_psql %s \r\n \t "
                 "job_oar_list %s kept_jobs %s "
                 % (jobs_psql, job_oar_list, kept_jobs))

    # Stored jobs that OAR does not list any more
    deleted_jobs = list(jobs_psql.difference(kept_jobs))
    if deleted_jobs:
        slab_dbsession.query(SenslabXP).filter(
            SenslabXP.job_id.in_(deleted_jobs)).delete(
                synchronize_session='fetch')
        slab_dbsession.commit()
def GetLeases(self, lease_filter_dict=None, login=None):
    """Return the OAR reservations, completed with slice information.

    Each reservation gains 'slice_id' (urn), 'slice_hrn' and
    'component_id_list' (urns of its reserved nodes). The slice comes
    from the slab_xp table when the job is recorded there; otherwise a
    senslab slice named "<root_auth>.<user>_slice" is assumed.

    :param lease_filter_dict: optional dict whose 'name' is the slice hrn
        whose leases are wanted; None or an empty dict means no filter.
    :param login: login handed to GetReservedNodes.
    :returns: list of reservation dicts.
    :raises KeyError: when a non-empty filter has no 'name' key.
    """
    unfiltered_reservation_list = self.GetReservedNodes(login)
    logger.debug(" SLABDRIVER.PY \tGetLeases login %s "
                 "unfiltered_reservation_list %s "
                 % (login, unfiltered_reservation_list))

    # Jobs recorded in the senslab database, keyed on their job id
    jobs_psql_query = slab_dbsession.query(SenslabXP).all()
    jobs_psql_dict = dict((row.job_id, row.__dict__)
                          for row in jobs_psql_query)
    logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"
                 % (jobs_psql_dict,))
    jobs_psql_id_list = [row.job_id for row in jobs_psql_query]

    job_oar_list = []
    reservation_list = []
    for resa in unfiltered_reservation_list:
        logger.debug("SLABDRIVER \tGetLeases USER %s" % (resa['user'],))
        # Construct the list of jobs (running, waiting..) in oar
        job_oar_list.append(resa['lease_id'])

        if resa['lease_id'] in jobs_psql_dict:
            # The database knows which slice uses this job
            job_info = jobs_psql_dict[resa['lease_id']]
            logger.debug("SLABDRIVER \tGetLeases job_info %s"
                         % (job_info,))
            resa['slice_id'] = hrn_to_urn(job_info['slice_hrn'], 'slice')
        else:
            # Assume it is a senslab slice
            resa['slice_id'] = hrn_to_urn(
                self.root_auth + '.' + resa['user'] + "_slice", 'slice')
        resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()

        # Transform the hostnames into urns (component ids)
        resa['component_id_list'] = [
            slab_xrn_object(self.root_auth, node).urn
            for node in resa['reserved_nodes']]

        if lease_filter_dict:
            logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n "
                         "leasefilter %s" % (resa, lease_filter_dict))
            if lease_filter_dict['name'] == resa['slice_hrn']:
                reservation_list.append(resa)

    # Same test as inside the loop, so that an empty filter dict means
    # "no filter" instead of "no result".
    if not lease_filter_dict:
        reservation_list = unfiltered_reservation_list

    # Only prune the database when the OAR list was not restricted:
    # presumably a login limits it to that user's jobs, and pruning
    # against it would drop the other users' rows - TODO confirm.
    if login is None:
        self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)

    logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"
                 % (reservation_list,))
    return reservation_list
888 #TODO FUNCTIONS SECTION 04/07/2012 SA
890 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
@staticmethod
def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
    """Placeholder for the PLC API hack detaching an object created by
    the sfa from a remote peer object.

    Declared static because the signature takes no ``self``. Not
    implemented for senslab: the arguments are ignored, a warning is
    logged and None is returned.

    :param auth: struct, API authentication structure
    :param object_type: string, object type ('site', 'person', 'slice'...)
    :param object_id: int, object_id
    :param shortname: string, peer shortname
    """
    logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY - "
                   "DO NOTHING \r\n ")
    return None
913 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
                     remote_object_id=None):
    """Placeholder for the PLC API hack attaching an object created by
    the sfa to a remote peer object.

    Not implemented for senslab: the arguments are ignored, a warning is
    logged and None is returned.

    :param shortname: string, peer shortname
    :param remote_object_id: int, remote object_id, set to 0 if unknown
    """
    logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
    return None
930 #TODO UpdateSlice 04/07/2012 SA
#Function should delete and create another job since in senslab slice=job
def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
    """Placeholder for the PLC API call updating the parameters of an
    existing slice.

    Not implemented for senslab: the arguments are ignored, a warning is
    logged and None is returned.
    """
    logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
    return None
947 #TODO UpdatePerson 04/07/2012 SA
def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
    """Placeholder for the PLC API call updating a person.

    Not implemented for senslab: the arguments are ignored, a debug
    message is logged and None is returned. Storing the
    (slab_hrn, federated_hrn) pair in the senslab database is disabled.
    """
    logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
    return None
964 #TODO GetKeys 04/07/2012 SA
def GetKeys(self, auth, key_filter=None, return_fields=None):
    """Placeholder for the PLC API call returning details about keys.

    Not implemented for senslab: the arguments are ignored, a warning is
    logged and None is returned.
    """
    logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
    return None
979 #TODO DeleteKey 04/07/2012 SA
def DeleteKey(self, key_id):
    """Placeholder for the PLC API call deleting a key.

    Not implemented for senslab: ``key_id`` is ignored, a warning is
    logged and None is returned.
    """
    logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
    return None
@staticmethod
def _sql_get_slice_info( slice_filter ):
    """Fetch a slice record, researchers included, from its hrn.

    Declared static because the signature takes no ``self`` while callers
    use ``self._sql_get_slice_info(hrn)``.

    :param slice_filter: hrn of the slice.
    :returns: the attribute dict (``__dict__``) of the RegSlice found,
        or None when no slice has this hrn.
    """
    #DO NOT USE RegSlice - reg_researchers to get the hrn
    #of the user otherwise will mess up the RegRecord in
    #Resolve, don't know why - SA 08/08/2012
    raw_slicerec = dbsession.query(RegSlice).options(
        joinedload('reg_researchers')).filter_by(hrn=slice_filter).first()
    if not raw_slicerec:
        return None

    slicerec = raw_slicerec.__dict__
    logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s "
                 "raw_slicerec %s" % (slice_filter, slicerec))
    return slicerec
@staticmethod
def _sql_get_slice_info_from_user(slice_filter ):
    """Fetch the slice record of a user from the user's record id.

    Declared static because the signature takes no ``self`` while callers
    use ``self._sql_get_slice_info_from_user(record_id)``.

    :param slice_filter: record_id of the user.
    :returns: dict of the slice fields plus a 'reg_researchers' entry
        holding the user fields; None when the user is unknown or is
        not researcher of any slice.
    """
    raw_slicerec = dbsession.query(RegUser).options(
        joinedload('reg_slices_as_researcher')).filter_by(
            record_id=slice_filter).first()
    if not raw_slicerec:
        return None

    user_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                          'classtype', 'authority', 'gid', 'record_id',
                          'date_created', 'type', 'email', 'pointer']
    slice_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                           'classtype', 'authority', 'gid', 'record_id',
                           'date_created', 'type', 'pointer']

    raw_slicerec = raw_slicerec.__dict__
    user_slices = raw_slicerec['reg_slices_as_researcher']
    if not user_slices:
        # A user without slice has no slice record to give back
        return None

    #TODO Handle multiple slices for one user SA 10/12/12
    #for now only take the first slice record associated to the rec user
    first_slice = user_slices[0].__dict__
    slicerec = dict((key, first_slice[key]) for key in slice_needed_fields)
    slicerec['reg_researchers'] = dict((key, raw_slicerec[key])
                                       for key in user_needed_fields)
    return slicerec
def _get_slice_records(self, slice_filter=None, slice_filter_type=None):
    """Look up one slice record using the given filter.

    :param slice_filter: a slice hrn when slice_filter_type is
        'slice_hrn', a user record_id when it is 'record_id_user'.
    :param slice_filter_type: 'slice_hrn' or 'record_id_user'.
    :returns: the slice record dict, or None when nothing matches or
        the filter type is not one of the two supported values.
    """
    # Start from "nothing found" so an unsupported filter type yields
    # None instead of an unbound local.
    slicerec = None

    if slice_filter_type == 'slice_hrn':
        # Get the slice based on the slice hrn.
        slicerec = self._sql_get_slice_info(slice_filter)
    elif slice_filter_type == 'record_id_user':
        # Get the slice based on the user id.
        slicerec = self._sql_get_slice_info_from_user(slice_filter)

    if not slicerec:
        return None
    return slicerec
def GetSlices(self, slice_filter=None, slice_filter_type=None, login=None):
    """Get slice records, enriched with their lease (job) information.

    With a supported slice_filter_type ('slice_hrn' or
    'record_id_user') the matching slice record is looked up and the
    leases returned by GetLeases(login=login) are examined:

    * no lease at all: the list holds the bare slice record (if found);
    * otherwise, only for the 'slice_hrn' filter type: one dict per
      lease whose 'slice_hrn' equals slice_filter, combining the lease
      data ('slice_hrn', 'hrn', 'user', 'oar_job_id', 'node_ids',
      'list_node_ids') with the slice record.

    Without a supported filter type, every RegSlice of the SFA
    database is returned; slices of local users (peer_authority is
    None) are matched against GetReservedNodes() by owner login.

    :returns: a list of slice dicts in every case (possibly empty).
    """
    authorized_filter_types = ('slice_hrn', 'record_id_user')

    if slice_filter_type in authorized_filter_types:
        return_slicerec_dictlist = []
        # First get information on the slice based on the filter.
        fixed_slicerec_dict = \
            self._get_slice_records(slice_filter, slice_filter_type)
        logger.debug(" SLABDRIVER \tGetSlices login %s "
                     "slice record %s slice_filter %s slice_filter_type %s "
                     % (login, fixed_slicerec_dict, slice_filter,
                        slice_filter_type))

        # Now get the jobs associated to this slice.
        leases_list = self.GetLeases(login=login)

        # If no job is running or scheduled, return only the slice record.
        if not leases_list and fixed_slicerec_dict:
            return_slicerec_dictlist.append(fixed_slicerec_dict)

        # If there are several jobs for one slice, put the slice record
        # into each lease information dict.
        for lease in leases_list:
            slicerec_dict = {}
            logger.debug("SLABDRIVER.PY \tGetSlices slice_filter %s "
                         "\\ lease['slice_hrn'] %s"
                         % (slice_filter, lease['slice_hrn']))
            if slice_filter_type == 'slice_hrn' and \
                    lease['slice_hrn'] == slice_filter:
                reserved_list = lease['reserved_nodes']
                slicerec_dict['slice_hrn'] = lease['slice_hrn']
                slicerec_dict['hrn'] = lease['slice_hrn']
                slicerec_dict['user'] = lease['user']
                slicerec_dict['oar_job_id'] = lease['lease_id']
                slicerec_dict['list_node_ids'] = {'hostname': reserved_list}
                slicerec_dict['node_ids'] = reserved_list

                # Update the lease dict with the slice record; the record
                # carries the job id as a one-element list, which then
                # replaces the scalar id set just above.
                if fixed_slicerec_dict:
                    fixed_slicerec_dict['oar_job_id'] = \
                        [slicerec_dict['oar_job_id']]
                    slicerec_dict.update(fixed_slicerec_dict)

                return_slicerec_dictlist.append(slicerec_dict)
                logger.debug("SLABDRIVER.PY \tGetSlices "
                             "OHOHOHOH %s" % (return_slicerec_dictlist))

            logger.debug("SLABDRIVER.PY \tGetSlices "
                         "slicerec_dict %s return_slicerec_dictlist %s "
                         "lease['reserved_nodes'] "
                         "%s" % (slicerec_dict, return_slicerec_dictlist,
                                 lease['reserved_nodes']))

        logger.debug("SLABDRIVER.PY \tGetSlices RETURN "
                     "return_slicerec_dictlist %s"
                     % (return_slicerec_dictlist))
        return return_slicerec_dictlist

    # No usable filter: get all slices from the senslab sfa database and
    # put them in dict format.
    query_slice_list = dbsession.query(RegSlice)\
        .options(joinedload('reg_researchers')).all()
    return_slicerec_dictlist = []
    for record in query_slice_list:
        # Work on copies: rewriting 'reg_researchers' directly inside
        # record.__dict__ would overwrite the relationship attribute on
        # the session-tracked instance itself.
        slice_fields = dict(record.__dict__)
        researchers = slice_fields['reg_researchers']
        # Only one researcher per slice is kept; a slice without any
        # researcher gets an empty dict instead of raising IndexError.
        if researchers:
            slice_fields['reg_researchers'] = dict(researchers[0].__dict__)
        else:
            slice_fields['reg_researchers'] = {}
        return_slicerec_dictlist.append(slice_fields)

    # Get all the jobs' reserved nodes.
    leases_list = self.GetReservedNodes()

    for fixed_slicerec_dict in return_slicerec_dictlist:
        slicerec_dict = {}
        # Only slices of senslab users (no peer authority) have an owner
        # login, derived from the hrn: <auth>.<login>_<suffix>.
        if fixed_slicerec_dict['peer_authority'] is None:
            owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
        else:
            owner = None

        for lease in leases_list:
            if owner == lease['user']:
                slicerec_dict['oar_job_id'] = lease['lease_id']
                logger.debug("SLABDRIVER.PY \tGetSlices lease %s "
                             % (lease))
                reserved_list = lease['reserved_nodes']
                slicerec_dict['node_ids'] = reserved_list
                slicerec_dict['list_node_ids'] = {'hostname': reserved_list}
                slicerec_dict.update(fixed_slicerec_dict)
                fixed_slicerec_dict.update(slicerec_dict)

    logger.debug("SLABDRIVER.PY \tGetSlices RETURN "
                 "return_slicerec_dictlist %s \\slice_filter %s "
                 % (return_slicerec_dictlist, slice_filter))
    return return_slicerec_dictlist
# Convert SFA fields to PLC fields for use when registering or updating
# a registry record in the PLC database
1214 # @param type type of record (user, slice, ...)
1215 # @param hrn human readable name
1216 # @param sfa_fields dictionary of SFA fields
1217 # @param slab_fields dictionary of PLC fields (output)
@staticmethod
def sfa_fields_to_slab_fields(sfa_type, hrn, record):
    """Convert an SFA record into the fields used on the senslab side.

    Only records of type "slice" are converted; any other type yields
    an empty dict (nodes are added by OAR and then imported into SFA,
    and the authority conversion is disabled).

    :param sfa_type: type of record ("slice", "user", ...).
    :param hrn: human readable name of the record.
    :param record: dict of SFA fields.
    :returns: dict of senslab fields. For a slice it always holds
        "instantiation" and "hrn", plus "url", "description" and
        "expires" (converted to int) when present in record.
    """
    slab_record = {}

    if sfa_type == "slice":
        # instantiation used in get_slivers ?
        if "instantiation" not in slab_record:
            slab_record["instantiation"] = "senslab-instantiated"
        # hrn_to_pl_slicename is not needed: the senslab hrn is already
        # in the appropriate form (SA 23/07/12).
        slab_record["hrn"] = hrn
        logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields "
                     "slab_record %s " % (slab_record['hrn']))
        # Optional fields are copied only when the SFA record has them.
        if "url" in record:
            slab_record["url"] = record["url"]
        if "description" in record:
            slab_record["description"] = record["description"]
        if "expires" in record:
            slab_record["expires"] = int(record["expires"])

    return slab_record
def __transforms_timestamp_into_date(self, xp_utc_timestamp=None):
    """Format a unix timestamp as a date string using self.time_format.

    Used for scheduled (not immediate) experiments. For an immediate
    experiment no date is given in the RSpec and the timestamp is
    empty; in that case None is returned.

    :param xp_utc_timestamp: unix timestamp (anything int() accepts).
    :returns: the formatted date string, or None for a falsy timestamp.
    """
    if not xp_utc_timestamp:
        return None

    # Transform the timestamp into a server readable time.
    experiment_start = datetime.fromtimestamp(int(xp_utc_timestamp))
    return experiment_start.strftime(self.time_format)
1302 class SlabDriver(Driver):
1303 """ Senslab Driver class inherited from Driver generic class.
1305 Contains methods compliant with the SFA standard and the testbed
1306 infrastructure (calls to LDAP and OAR).
def __init__(self, config):
    """Set up the driver from the SFA configuration.

    Stores the configuration and the interface hrn
    (config.SFA_INTERFACE_HRN), then creates the SlabDB handle
    (debug disabled) and the SlabTestbedAPI object used to talk to
    the testbed.
    """
    Driver.__init__(self, config)
    self.config = config
    self.hrn = config.SFA_INTERFACE_HRN

    slab_database = SlabDB(config, debug=False)
    self.db = slab_database
    testbed_api = SlabTestbedAPI(config)
    self.slab_api = testbed_api
def augment_records_with_testbed_info(self, record_list):
    """Add testbed specific info to the records.

    Thin wrapper that delegates to fill_record_info and returns
    whatever it returns.
    """
    filled = self.fill_record_info(record_list)
    return filled
def fill_record_info(self, record_list):
    """Fill senslab and SFA specific fields into the given record(s).

    Records are updated in place.

    * slice records: when 'reg_researchers' is a list, it is replaced
      by the attribute dict of the first researcher and the
      'PI', 'researcher', 'name', 'oar_job_id', 'node_ids',
      'person_ids', 'geni_urn', 'keys' and 'key_ids' fields are set;
      node ids are then filled from the slice's lease, if any.
    * user records: the user's slice is fetched, turned into a slice
      record and appended to record_list (when record_list is the
      caller's list, the caller sees the extra entry); the user record
      itself is completed with the GetPersons result.

    :param record_list: a record dict or a list of record dicts.
    :returns: None. A TypeError raised while processing is logged and
        swallowed.
    """
    logger.debug("SLABDRIVER \tfill_record_info records %s "
                 % (record_list))
    if not isinstance(record_list, list):
        record_list = [record_list]

    # Bound up front so the final log works for an empty list.
    record = None
    try:
        # NOTE: record_list grows while it is walked: the slice records
        # appended for users are themselves visited later in this loop.
        for record in record_list:
            record_type = str(record['type'])

            if record_type == 'slice':
                # Add information about the user of this slice.
                if 'reg_researchers' in record and \
                        isinstance(record['reg_researchers'], list):
                    record['reg_researchers'] = \
                        record['reg_researchers'][0].__dict__
                    researcher = record['reg_researchers']
                    record.update({
                        'PI': [researcher['hrn']],
                        'researcher': [researcher['hrn']],
                        'name': record['hrn'],
                        'oar_job_id': [],
                        'node_ids': [],
                        'person_ids': [researcher['record_id']],
                        # The three below: client_helper.py compatibility
                        'geni_urn': '',
                        'keys': '',
                        'key_ids': ''})

                # Get slab slice record.
                recslice_list = self.slab_api.GetSlices(
                    slice_filter=str(record['hrn']),
                    slice_filter_type='slice_hrn')

                logger.debug("SLABDRIVER \tfill_record_info "
                             "TYPE SLICE RECUSER record['hrn'] %s "
                             "ecord['oar_job_id'] %s "
                             % (record['hrn'], record.get('oar_job_id')))
                try:
                    # Deliberately stops at the first KeyError: a bare
                    # slice record (no lease) has no 'oar_job_id', and
                    # once 'reg_researchers' is deleted the second pass
                    # ends the loop, so only the first lease is used.
                    for rec in recslice_list:
                        logger.debug("SLABDRIVER\r\n \t fill_record_info "
                                     "oar_job_id %s " % (rec['oar_job_id']))
                        del record['reg_researchers']
                        record['node_ids'] = \
                            [self.slab_api.root_auth + hostname
                             for hostname in rec['node_ids']]
                except KeyError:
                    pass

                logger.debug("SLABDRIVER.PY \t fill_record_info SLICE "
                             "recslice_list %s \r\n \t RECORD %s \r\n "
                             "\r\n" % (recslice_list, record))

            if record_type == 'user':
                # Get the information about the user's slice and add it
                # to the record list.
                recslice_list = self.slab_api.GetSlices(
                    slice_filter=record['record_id'],
                    slice_filter_type='record_id_user')

                logger.debug("SLABDRIVER.PY \t fill_record_info TYPE USER "
                             "recslice_list %s \r\n \t RECORD %s \r\n"
                             % (recslice_list, record))

                recslice = None
                if recslice_list:
                    recslice = recslice_list[0]
                    recuser = recslice['reg_researchers']
                    logger.debug("SLABDRIVER.PY \t fill_record_info USER "
                                 "recuser %s \r\n \r\n" % (recuser))
                    # Collect the job ids BEFORE resetting the field on
                    # recslice, otherwise the fresh list would end up
                    # appended to itself (recslice is recslice_list[0]).
                    job_ids = [rec['oar_job_id'] for rec in recslice_list
                               if 'oar_job_id' in rec]
                    recslice.update({
                        'PI': [recuser['hrn']],
                        'researcher': [recuser['hrn']],
                        'name': record['hrn'],
                        'node_ids': [],
                        'oar_job_id': job_ids,
                        'person_ids': [recuser['record_id']]})
                    recslice.update({'type': 'slice',
                                     'hrn': recslice_list[0]['hrn']})

                # GetPersons takes [] as filters.
                user_slab = self.slab_api.GetPersons([record])
                if user_slab:
                    record.update(user_slab[0])
                # For client_helper.py compatibility.
                record.update({'geni_urn': '',
                               'keys': '',
                               'key_ids': ''})

                if recslice is not None:
                    record_list.append(recslice)
                    logger.debug("SLABDRIVER.PY \tfill_record_info "
                                 "ADDING SLICE"
                                 "INFO TO USER records %s" % (record_list))

        logger.debug("SLABDRIVER.PY \tfill_record_info END "
                     "record %s \r\n \r\n " % (record))

    except TypeError as error:
        logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"
                       % (error))

    return None
def sliver_status(self, slice_urn, slice_hrn):
    """Answer a status request for the slice named slice_urn / slice_hrn.

    Example: urn:publicid:IDN+senslab+nturro_slice,
    hrn senslab.nturro_slice. The structure returned is meant to follow
    http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
    (NT: not sure this must be implemented, but sface uses it).

    Only the first record returned by GetSlices is reported.

    :returns: dict with 'geni_urn', 'pl_login', 'geni_status' and
        'geni_resources'. Without nodes the status is 'empty' and the
        resource list is empty; otherwise it is 'ready', or 'failed'
        as soon as one node's boot_state is not 'Alive'.
    :raises SliverDoesNotExist: when no slice matches slice_hrn.
    """
    # First get the slice with the slice hrn.
    slice_list = self.slab_api.GetSlices(slice_filter=slice_hrn,
                                         slice_filter_type='slice_hrn')
    if not slice_list:
        raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))

    # Also used for fetching the user info which comes along the slice.
    one_slice = slice_list[0]

    # For compatibility: every expected key exists from the start.
    result = dict.fromkeys(
        ['geni_urn', 'pl_login', 'geni_status', 'geni_resources'], None)
    result['pl_login'] = one_slice['reg_researchers']['hrn']
    logger.debug("Slabdriver - sliver_status Sliver status "
                 "urn %s hrn %s single_slice %s \r\n "
                 % (slice_urn, slice_hrn, one_slice))

    if 'node_ids' not in one_slice:
        # No job in the slice.
        result['geni_status'] = 'empty'
        result['geni_resources'] = []
        return result

    # GetSlices stores the reserved hostnames themselves in 'node_ids';
    # dict entries carrying a 'hostname' key are accepted as well.
    slice_nodes_list = []
    for node in one_slice['node_ids']:
        if isinstance(node, dict):
            slice_nodes_list.append(node['hostname'])
        else:
            slice_nodes_list.append(node)

    # Get all the corresponding nodes details.
    nodes_all = self.slab_api.GetNodes(
        {'hostname': slice_nodes_list},
        ['node_id', 'hostname', 'site', 'boot_state'])
    nodeall_byhostname = dict((one_node['hostname'], one_node)
                              for one_node in nodes_all)

    # A job is running on Senslab for this slice: report about the
    # local nodes that are in the slice only.
    top_level_status = 'ready'
    result['geni_urn'] = slice_urn

    resources = []
    res = None
    for node_name in slice_nodes_list:
        node_details = nodeall_byhostname[node_name]
        res = {}
        res['pl_hostname'] = node_name
        res['pl_boot_state'] = node_details['boot_state']
        res['geni_urn'] = Xrn(slice_urn, type='slice',
                              id=node_details['node_id'],
                              authority=self.hrn).urn
        if node_details['boot_state'] == 'Alive':
            res['geni_status'] = 'ready'
        else:
            res['geni_status'] = 'failed'
            top_level_status = 'failed'
        res['geni_error'] = ''
        resources.append(res)

    result['geni_status'] = top_level_status
    result['geni_resources'] = resources
    logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "
                 % (resources, res))
    return result
@staticmethod
def get_user_record(hrn):
    """Return the first SFA DB record whose hrn matches, or None.

    Declared static because the function takes no instance argument.
    The lookup queries RegRecord and filters on hrn only, not on the
    record type.
    """
    return dbsession.query(RegRecord).filter_by(hrn=hrn).first()
1541 def testbed_name (self):
1544 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version(self):
    """Describe this aggregate: testbed name and supported RSpec versions.

    Every version known to VersionManager is listed under
    'geni_ad_rspec_versions' when its content type is '*' or 'ad', and
    under 'geni_request_rspec_versions' when it is '*' or 'request'
    (both keys are mandatory).
    """
    advertised = []
    requestable = []
    for version in VersionManager().versions:
        content_type = version.content_type
        if content_type in ['*', 'ad']:
            advertised.append(version.to_dict())
        if content_type in ['*', 'request']:
            requestable.append(version.to_dict())

    version_info = {}
    version_info['testbed'] = self.testbed_name()
    version_info['geni_request_rspec_versions'] = requestable
    version_info['geni_ad_rspec_versions'] = advertised
    return version_info
def create_sliver(self, slice_urn, slice_hrn, creds, rspec_string,
                  users, options):
    """Create the sliver described by rspec_string for the given slice.

    Steps: verify the slice and person records through SlabSlices,
    parse the RSpec, keep the requested leases that belong to this
    testbed and last longer than the lease granularity, group them by
    start time into jobs and hand them to verify_slice_leases.

    :param slice_urn: urn of the slice.
    :param slice_hrn: hrn of the slice.
    :param creds: credential or list of credentials.
    :param rspec_string: request RSpec.
    :param users: list of user dicts; the first one provides
        'slice_record', 'keys' and 'email'.
    :param options: passed through to verify_persons.
    :returns: the RSpec produced by SlabAggregate.get_rspec for the
        slice, in the version of the request.
    """
    aggregate = SlabAggregate(self)
    slices = SlabSlices(self)
    peer = slices.get_peer(slice_hrn)
    sfa_peer = slices.get_sfa_peer(slice_hrn)
    slice_record = None

    if not isinstance(creds, list):
        creds = [creds]

    if users:
        slice_record = users[0].get('slice_record', {})
        logger.debug("SLABDRIVER.PY \t ===============create_sliver \t"
                     "creds %s \r\n \r\n users %s" % (creds, users))
        slice_record['user'] = {'keys': users[0]['keys'],
                                'email': users[0]['email'],
                                'hrn': slice_record['reg-researchers'][0]}

    # Parse the rspec.
    rspec = RSpec(rspec_string)
    logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version "
                 "%s slice_record %s users %s"
                 % (rspec.version, slice_record, users))

    # Ensure the slice record exists (options removed, SA 14/08/12).
    sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer,
                                    sfa_peer)

    # Ensure person records exist; the added persons returned by
    # verify_persons are not used.
    slices.verify_persons(slice_hrn, sfa_slice, users, peer,
                          sfa_peer, options=options)
    # The requested attributes returned here are unused (SA 13/08/12).
    rspec.version.get_slice_attributes()

    logger.debug("SLABDRIVER.PY create_sliver slice %s " % (sfa_slice))

    # Nodes requested on this testbed. The authority is compared by
    # value: identity ('is') between two equal strings is not reliable.
    nodes_with_slivers = list(rspec.version.get_nodes_with_slivers())
    requested_slivers = [
        node.get('component_id') for node in nodes_with_slivers
        if node.get('authority_id') == self.slab_api.root_auth]
    logger.debug("SLADRIVER \tcreate_sliver requested_slivers "
                 "requested_slivers %s listnodes %s"
                 % (requested_slivers, nodes_with_slivers))

    # Collect the new leases (no lease_id yet) of this testbed.
    requested_lease_list = []
    for lease in rspec.version.get_leases():
        logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " % (lease))
        if lease.get('lease_id'):
            continue
        if get_authority(lease['component_id']) != self.slab_api.root_auth:
            continue

        single_requested_lease = {
            'hostname': slab_xrn_to_hostname(
                lease.get('component_id').strip()),
            'start_time': lease.get('start_time'),
            'duration': lease.get('duration')}
        # Check the experiment's duration is valid before adding the
        # lease to the requested leases list.
        duration_in_seconds = int(single_requested_lease['duration']) * 60
        if duration_in_seconds > self.slab_api.GetLeaseGranularity():
            requested_lease_list.append(single_requested_lease)

    # Create a dict of leases by start_time, regrouping nodes reserved
    # at the same time for the same amount of time = one job on OAR.
    requested_job_dict = {}
    for lease in requested_lease_list:
        # In case it is an asap experiment start_time is empty.
        if lease['start_time'] == '':
            lease['start_time'] = '0'

        if lease['start_time'] not in requested_job_dict:
            # The job's hostname entry must be a list whatever string
            # type the hostname is, since later leases are appended.
            if not isinstance(lease['hostname'], list):
                lease['hostname'] = [lease['hostname']]
            requested_job_dict[lease['start_time']] = lease
        else:
            job_lease = requested_job_dict[lease['start_time']]
            if lease['duration'] == job_lease['duration']:
                job_lease['hostname'].append(lease['hostname'])

    logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "
                 % (requested_job_dict))
    # verify_slice_leases returns the leases, unused here (SA 13/08/12).
    slices.verify_slice_leases(sfa_slice, requested_job_dict, peer)

    return aggregate.get_rspec(slice_xrn=slice_urn,
                               login=sfa_slice['login'],
                               version=rspec.version)
def delete_sliver(self, slice_urn, slice_hrn, creds, options):
    """Delete everything reserved for the slice named slice_hrn.

    Each record returned by GetSlices for the slice is passed to
    DeleteSliceFromNodes.

    :returns: 1 when the slice has nothing to delete, True when every
        record was handled, False as soon as one deletion raises (the
        exception is logged).
    """
    sfa_slice_list = self.slab_api.GetSlices(slice_filter=slice_hrn,
                                             slice_filter_type='slice_hrn')
    if not sfa_slice_list:
        return 1

    # Determine if this is a peer slice. The answer does not depend on
    # the record, so it is computed once.
    # TODO delete_sliver SA: UnBindObjectFromPeer / BindObjectToPeer
    # should be used when there is another senslab testbed, which is
    # not the case (14/08/12).
    slices = SlabSlices(self)
    peer = slices.get_peer(slice_hrn)

    # Delete all in the slice.
    for sfa_slice in sfa_slice_list:
        logger.debug("SLABDRIVER.PY delete_sliver slice %s" % (sfa_slice))
        logger.debug("SLABDRIVER.PY delete_sliver peer %s \r\n \t "
                     "sfa_slice %s " % (peer, sfa_slice))
        try:
            self.slab_api.DeleteSliceFromNodes(sfa_slice)
        except Exception as error:
            logger.log_exc("SLABDRIVER.PY delete_sliver EXCEPTION %s"
                           % (error))
            return False

    return True
1715 # first 2 args are None in case of resource discovery
def list_resources(self, slice_urn, slice_hrn, creds, options):
    """Return the RSpec of the testbed, or of one slice.

    slice_urn and slice_hrn are None in case of resource discovery.
    The RSpec version comes from options['geni_rspec_version'] and the
    options are handed over to SlabAggregate.get_rspec.

    A cache key is still derived from the version and from the 'info',
    'list_leases' and 'geni_available' options, but caching itself is
    currently disabled, so the key is not used.
    """
    rspec_version = VersionManager().get_version(
        options.get('geni_rspec_version'))

    # Cache key (kept for when the cache lookup is re-enabled).
    version_string = "rspec_%s" % (rspec_version)
    # panos: adding the info option to the caching key (can be improved)
    if options.get('info'):
        version_string += "_" + options.get('info', 'default')
    # Adding the list_leases option to the caching key.
    if options.get('list_leases'):
        version_string += "_" + options.get('list_leases', 'default')
    # Adding geni_available to the caching key.
    if options.get('geni_available'):
        version_string += "_" + str(options.get('geni_available'))

    # panos: passing user-defined options.
    slab_aggregate = SlabAggregate(self)
    return slab_aggregate.get_rspec(slice_xrn=slice_urn,
                                    version=rspec_version,
                                    options=options)
def list_slices(self, creds, options):
    """Return the urns of all the slices known to the testbed API.

    The slices come from GetSlices() without filter; each slice hrn is
    converted with hrn_to_urn(hrn, 'slice'). The cache lookup and
    storage are currently disabled.
    """
    all_slices = self.slab_api.GetSlices()
    logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n"
                 % (all_slices))

    slice_urns = []
    for slab_slice in all_slices:
        slice_urns.append(hrn_to_urn(slab_slice['hrn'], 'slice'))
    return slice_urns
def register(self, sfa_record, hrn, pub_key):
    """Refuse to register anything: SFA does not create testbed objects.

    Adding a new user, slice, node or site should not be handled
    by SFA:

    * adding users = LDAP Senslab
    * adding slices = import from LDAP users

    NOTE(review): the original return statement is not visible here;
    -1 is assumed as the "not handled" marker -- confirm.
    """
    not_handled = -1
    return not_handled
def update(self, old_sfa_record, new_sfa_record, hrn, new_key):
    """Update a slice or user record. No site or node update in Senslab.

    :param old_sfa_record: current record; 'pointer' and 'type' are read.
    :param new_sfa_record: dict with the new field values.
    :param hrn: hrn of the record.
    :param new_key: new public key, supported for users only.
    :returns: True.
    :raises UnknownSfaType: when new_key is given for a non-user record.
    """
    pointer = old_sfa_record['pointer']
    old_sfa_record_type = old_sfa_record['type']

    # new_key implemented for users only.
    if new_key and old_sfa_record_type not in ['user']:
        raise UnknownSfaType(old_sfa_record_type)

    if old_sfa_record_type == "slice":
        slab_record = self.slab_api.sfa_fields_to_slab_fields(
            old_sfa_record_type, hrn, new_sfa_record)
        # NOTE(review): UpdateSlice is assumed to be reached only when a
        # 'name' field had to be dropped -- confirm the intended nesting.
        # Senslab cannot really update a slice since slice = job: the
        # job must be deleted and another one created.
        if 'name' in slab_record:
            slab_record.pop('name')
            self.slab_api.UpdateSlice(pointer, slab_record)

    elif old_sfa_record_type == "user":
        updatable_fields = ('first_name', 'last_name', 'title', 'email',
                            'password', 'phone', 'url', 'bio',
                            'accepted_aup', 'enabled')
        update_fields = dict((key, new_sfa_record[key])
                             for key in new_sfa_record.keys()
                             if key in updatable_fields)
        self.slab_api.UpdatePerson(pointer, update_fields)

        if new_key:
            # Must check this key against the previous one if it exists.
            persons = self.slab_api.GetPersons(['key_ids']) or []
            if persons:
                # GetKeys may return nothing at all; treat that as
                # "no existing key".
                keys = self.slab_api.GetKeys(persons[0]['key_ids']) or []
            else:
                keys = []

            # Delete all stale keys.
            key_exists = False
            for key in keys:
                if new_key != key['key']:
                    self.slab_api.DeleteKey(key['key_id'])
                else:
                    key_exists = True
            if not key_exists:
                self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh',
                                                     'key': new_key})

    return True
def remove(self, sfa_record):
    """Remove the testbed object behind a user or slice record.

    * user: looked up with GetPersons; when found, DeletePerson is
      called (marks the account as disabled in LDAP). There is no
      registering at a given site in Senslab: once registered in the
      LDAP, all senslab sites are accessible.
    * slice: DeleteSlice is called when GetSlices finds the slice.

    Other record types (e.g. authority) are ignored.

    :returns: True.
    """
    record_kind = sfa_record['type']
    record_hrn = sfa_record['hrn']

    if record_kind == 'user':
        # Get the user from the senslab LDAP.
        ldap_person = self.slab_api.GetPersons(sfa_record)
        if ldap_person:
            self.slab_api.DeletePerson(sfa_record)
        return True

    if record_kind == 'slice':
        existing_slices = self.slab_api.GetSlices(
            slice_filter=record_hrn, slice_filter_type='slice_hrn')
        if existing_slices:
            self.slab_api.DeleteSlice(sfa_record)

    return True