4 from datetime import datetime
6 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
7 from sfa.util.sfalogging import logger
8 from sfa.storage.alchemy import dbsession
9 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
10 from sqlalchemy.orm import joinedload
13 from sfa.managers.driver import Driver
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
17 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
20 ## thierry: everything that is API-related (i.e. handling incoming requests)
22 # SlabDriver should be really only about talking to the senslab testbed
25 from sfa.senslab.OARrestapi import OARrestapi
26 from sfa.senslab.LDAPapi import LDAPapi
28 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
31 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
33 from sfa.senslab.slabslices import SlabSlices
38 # this inheritance scheme is so that the driver object can receive
39 # GetNodes or GetSites sorts of calls directly
40 # and thus minimize the differences in the managers with the pl version
43 class SlabTestbedAPI():
45 def __init__(self, config):
46 self.oar = OARrestapi()
48 self.time_format = "%Y-%m-%d %H:%M:%S"
49 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
50 self.grain = 600 # 10 mins lease
55 #TODO clean GetPeers. 05/07/12SA
57 def GetPeers ( auth = None, peer_filter=None, return_fields_list=None):
60 existing_hrns_by_types = {}
61 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
62 return_field %s " %(auth , peer_filter, return_fields_list))
63 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
65 for record in all_records:
66 existing_records[(record.hrn, record.type)] = record
67 if record.type not in existing_hrns_by_types:
68 existing_hrns_by_types[record.type] = [record.hrn]
70 existing_hrns_by_types[record.type].append(record.hrn)
73 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
74 %( existing_hrns_by_types))
79 records_list.append(existing_records[(peer_filter,'authority')])
81 for hrn in existing_hrns_by_types['authority']:
82 records_list.append(existing_records[(hrn,'authority')])
84 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
90 return_records = records_list
91 if not peer_filter and not return_fields_list:
95 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
101 #TODO : Handling OR request in make_ldap_filters_from_records
102 #instead of the for loop
103 #over the records' list
def GetPersons(self, person_filter=None):
    """Look up enabled user accounts in the senslab LDAP.

    person_filter -- None, or a list of dictionaries, each one holding
        the attributes of a user to search for (usually a single entry).

    With a non-empty list, every entry is searched separately through
    self.ldap.LdapFindUser(..., is_user_enabled=True) and the truthy
    answers are collected; None is returned when nothing matched.
    In every other case the answer of one unfiltered
    LdapFindUser(is_user_enabled=True) call is returned unchanged.
    """
    # NOTE(review): the list initialisation, the if/else keywords and the
    # final return were not visible in the reviewed listing and have been
    # reconstructed -- confirm against the repository copy.
    logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
                                                %(person_filter))

    if not (person_filter and isinstance(person_filter, list)):
        # No usable filter: fetch all the enabled accounts in one request.
        return self.ldap.LdapFindUser(is_user_enabled=True)

    #TODO : Handling OR request in make_ldap_filters_from_records
    #instead of looping over the records' list
    person_list = []
    for searched_attributes in person_filter:
        person = self.ldap.LdapFindUser(searched_attributes,
                                        is_user_enabled=True)
        if person:
            person_list.append(person)

    # Truthiness instead of the former "len(person_list) is 0", which
    # compared an int by identity.
    return person_list or None
def GetTimezone(self):
    """Ask the OAR server for its current time and timezone.

    Sends the "GET_timezone" request through self.oar.parser and returns
    the (server_timestamp, server_tz) pair found in the answer.
    Flagged as unused by its author (SA 16/11/12).
    """
    oar_answer = self.oar.parser.SendRequest("GET_timezone")
    # The answer must unpack into exactly two values.
    server_timestamp, server_tz = oar_answer
    return server_timestamp, server_tz
def DeleteJobs(self, job_id, username):
    """Ask OAR to delete one job on behalf of a user.

    job_id -- OAR identifier of the job. Nothing is sent when it is
        falsy (None, 0, empty) or equal to -1.
    username -- login the delete request is issued for.

    Returns the answer of self.oar.POSTRequestToOARRestAPI, or None when
    no request was sent.
    """
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s username %s " %(job_id, username))
    # Compare by value: the former "job_id is -1" tested object identity,
    # which only holds thanks to an interpreter-specific integer cache.
    if not job_id or job_id == -1:
        return None

    #username = slice_hrn.split(".")[-1].rstrip("_slice")
    reqdict = {'method': "delete", 'strval': str(job_id)}

    # NOTE(review): the argument continuation of this call and the final
    # return were not visible in the reviewed listing; "reqdict, username"
    # and "return answer" are reconstructed -- confirm against the
    # repository copy.
    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',
                                              reqdict, username)
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
                            username %s" %(job_id, answer, username))
    return answer
163 ##TODO : Unused GetJobsId ? SA 05/07/12
164 #def GetJobsId(self, job_id, username = None ):
166 #Details about a specific job.
167 #Includes details about submission time, jot type, state, events,
168 #owner, assigned ressources, walltime etc...
172 #node_list_k = 'assigned_network_address'
173 ##Get job info from OAR
174 #job_info = self.oar.parser.SendRequest(req, job_id, username)
176 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
178 #if job_info['state'] == 'Terminated':
179 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
182 #if job_info['state'] == 'Error':
183 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
188 #logger.error("SLABDRIVER \tGetJobsId KeyError")
191 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
193 ##Replaces the previous entry
194 ##"assigned_network_address" / "reserved_resources"
196 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
197 #del job_info[node_list_k]
198 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
202 def GetJobsResources(self, job_id, username = None):
203 #job_resources=['reserved_resources', 'assigned_resources',\
204 #'job_id', 'job_uri', 'assigned_nodes',\
206 #assigned_res = ['resource_id', 'resource_uri']
207 #assigned_n = ['node', 'node_uri']
209 req = "GET_jobs_id_resources"
212 #Get job resources list from OAR
213 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
214 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
217 self.__get_hostnames_from_oar_node_ids(node_id_list)
220 #Replaces the previous entry "assigned_network_address" /
221 #"reserved_resources"
223 job_info = {'node_ids': hostname_list}
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Translate the node names listed in a job into testbed hostnames.

    job_info -- dictionary describing a job.
    node_list_name -- key of job_info under which the list of node names
        is stored.

    Every name is looked up in the node records returned by
    self.GetNodes(), indexed on their 'hostname' field. Returns the list
    of hostnames found. On the first KeyError (unknown node name, or
    node_list_name missing from job_info) an error is logged and the
    hostnames gathered so far are returned.
    """
    #Get the list of the testbed nodes records and make a
    #dictionary keyed on the hostname out of it
    node_list_dict = self.GetNodes()
    node_dict = dict((node['hostname'], node) for node in node_list_dict)

    reserved_node_hostname_list = []
    try:
        for node_name in job_info[node_list_name]:
            # The former code assigned by index into this still-empty
            # list, which raised IndexError on the very first node;
            # grow the list instead.
            reserved_node_hostname_list.append(
                node_dict[node_name]['hostname'])

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
                    reserved_node_hostname_list %s" \
                    %(reserved_node_hostname_list))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )

    return reserved_node_hostname_list
def GetNodesCurrentlyInUse(self):
    """Return OAR's answer to the "GET_running_jobs" request.

    According to the original author, this is the list of all the nodes
    already involved in an OAR job.
    """
    oar_parser = self.oar.parser
    return oar_parser.SendRequest("GET_running_jobs")
def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
    """Map OAR resource ids to the hostnames of the matching nodes.

    resource_id_list -- iterable of OAR node ids. Entries equal to the
        string "Undefined" are skipped, because jobs requested "asap"
        do not have defined resources.

    Returns the list of hostnames, in the order of resource_id_list.
    Raises KeyError when an id is not the 'oar_id' of any node returned
    by self.GetNodes().
    """
    #Put the full node list into a dictionary keyed by oar node id
    oar_id_node_dict = dict((node['oar_id'], node)
                            for node in self.GetNodes())

    # Compare by value: the former 'is not "Undefined"' tested object
    # identity and let through any equal string that was a distinct
    # object, which then failed the dictionary lookup.
    return [oar_id_node_dict[resource_id]['hostname']
            for resource_id in resource_id_list
            if resource_id != "Undefined"]
276 def GetReservedNodes(self, username = None):
277 #Get the nodes in use and the reserved nodes
278 reservation_dict_list = \
279 self.oar.parser.SendRequest("GET_reserved_nodes", \
283 for resa in reservation_dict_list:
284 logger.debug ("GetReservedNodes resa %s"%(resa))
285 #dict list of hostnames and their site
286 resa['reserved_nodes'] = \
287 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
289 #del resa['resource_ids']
290 return reservation_dict_list
292 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
294 node_filter_dict : dictionnary of lists
297 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
298 node_dict_list = node_dict_by_id.values()
299 logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \
300 return_fields_list %s "%(node_filter_dict, return_fields_list))
301 #No filtering needed return the list directly
302 if not (node_filter_dict or return_fields_list):
303 return node_dict_list
305 return_node_list = []
307 for filter_key in node_filter_dict:
309 #Filter the node_dict_list by each value contained in the
310 #list node_filter_dict[filter_key]
311 for value in node_filter_dict[filter_key]:
312 for node in node_dict_list:
313 if node[filter_key] == value:
314 if return_fields_list :
316 for k in return_fields_list:
318 return_node_list.append(tmp)
320 return_node_list.append(node)
322 logger.log_exc("GetNodes KeyError")
326 return return_node_list
328 def AddSlice(slice_record, user_record):
329 """Add slice to the sfa tables. Called by verify_slice
330 during lease/sliver creation.
333 sfa_record = RegSlice(hrn=slice_record['slice_hrn'],
334 gid=slice_record['gid'],
335 pointer=slice_record['slice_id'],
336 authority=slice_record['authority'])
338 logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
339 %(sfa_record, user_record))
340 sfa_record.just_created()
341 dbsession.add(sfa_record)
343 #Update the reg-researcher dependance table
344 sfa_record.reg_researchers = [user_record]
347 #Update the senslab table with the new slice
348 #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \
349 #record_id_slice = sfa_record.record_id , \
350 #record_id_user = slice_record['record_id_user'], \
351 #peer_authority = slice_record['peer_authority'])
353 #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \
354 #slab_slice %s sfa_record %s" \
355 #%(slice_record,slab_slice, sfa_record))
356 #slab_dbsession.add(slab_slice)
357 #slab_dbsession.commit()
360 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
361 site_dict = self.oar.parser.SendRequest("GET_sites")
362 #site_dict : dict where the key is the sit ename
363 return_site_list = []
364 if not ( site_filter_name_list or return_fields_list):
365 return_site_list = site_dict.values()
366 return return_site_list
368 for site_filter_name in site_filter_name_list:
369 if site_filter_name in site_dict:
370 if return_fields_list:
371 for field in return_fields_list:
374 tmp[field] = site_dict[site_filter_name][field]
376 logger.error("GetSites KeyError %s "%(field))
378 return_site_list.append(tmp)
380 return_site_list.append( site_dict[site_filter_name])
383 return return_site_list
389 #TODO : Check rights to delete person
390 def DeletePerson(self, person_record):
391 """ Disable an existing account in senslab LDAP.
392 Users and techs can only delete themselves. PIs can only
393 delete themselves and other non-PIs at their sites.
394 ins can delete anyone.
395 Returns 1 if successful, faults otherwise.
399 #Disable user account in senslab LDAP
400 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
401 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
404 #TODO Check DeleteSlice, check rights 05/07/2012 SA
def DeleteSlice(self, slice_record):
    """Delete a slice by killing the OAR job(s) attached to it.

    slice_record -- slice dictionary, handed unchanged to
        self.DeleteSliceFromNodes.

    The PLC-style permission rules (users delete only slices they belong
    to, PIs those of their sites, admins any slice) and the "returns 1"
    contract quoted from the PLC API are not enforced by the code below:
    it only delegates the job deletion and logs a warning.
    """
    self.DeleteSliceFromNodes(slice_record)
    warning_text = "SLABDRIVER DeleteSlice %s "%(slice_record)
    logger.warning(warning_text)
423 def __add_person_to_db(user_dict):
425 check_if_exists = dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
427 if not check_if_exists:
428 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
429 _________________________________________________________________________\
431 hrn = user_dict['hrn']
432 user_record = RegUser(hrn=hrn , pointer= '-1', authority=get_authority(hrn), \
433 email=user_dict['email'], gid = None)
434 user_record.reg_keys = [RegKey(user_dict['pkey'])]
435 user_record.just_created()
436 dbsession.add (user_record)
440 #TODO AddPerson 04/07/2012 SA
441 #def AddPerson(self, auth, person_fields=None):
442 def AddPerson(self, record):#TODO fixing 28/08//2012 SA
443 """Adds a new account. Any fields specified in records are used,
444 otherwise defaults are used.
445 Accounts are disabled by default. To enable an account,
447 Returns the new person_id (> 0) if successful, faults otherwise.
451 ret = self.ldap.LdapAddUser(record)
453 record['hrn'] = self.root_auth + '.' + ret['uid']
454 logger.debug("SLABDRIVER AddPerson return code %s record %s \r\n "%(ret,record))
455 self.__add_person_to_db(record)
458 #TODO AddPersonToSite 04/07/2012 SA
def AddPersonToSite (self, auth, person_id_or_email,
                     site_id_or_login_base=None):
    """Placeholder for the PLC API call adding a person to a site.

    The PLC contract (add the person to the site, no error if already a
    member, primary site left unchanged, 1 returned on success) is not
    implemented here: all the arguments are ignored and only a warning
    is logged.
    """
    logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
471 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
    """Placeholder for the PLC API call granting a role to a person.

    The PLC contract (PIs grant only the tech and user roles within their
    sites, admins grant any role, 1 returned on success) is not
    implemented here: all the arguments are ignored and only a warning
    is logged.
    """
    logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
483 #TODO AddPersonKey 04/07/2012 SA
def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
    """Placeholder for the PLC API call adding a key to an account.

    The PLC contract (non-admins modify only their own keys, the new
    key_id is returned on success) is not implemented here: all the
    arguments are ignored and only a warning is logged.
    """
    logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
def DeleteLeases(self, leases_id_list, slice_hrn ):
    """Delete every lease (OAR job) of a list.

    leases_id_list -- iterable of job ids; each one is handed to
        self.DeleteJobs, in order.
    slice_hrn -- passed to DeleteJobs as its second argument.
    """
    debug_text = "SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
            \r\n " %(leases_id_list, slice_hrn)
    logger.debug(debug_text)
    for lease_id in leases_id_list:
        self.DeleteJobs(lease_id, slice_hrn)
506 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
507 lease_start_time, lease_duration, slice_user=None):
509 lease_dict['lease_start_time'] = lease_start_time
510 lease_dict['lease_duration'] = lease_duration
511 lease_dict['added_nodes'] = added_nodes
512 lease_dict['slice_name'] = slice_name
513 lease_dict['slice_user'] = slice_user
514 lease_dict['grain'] = self.GetLeaseGranularity()
515 lease_dict['time_format'] = self.time_format
518 def __create_job_structure_request_for_OAR(lease_dict):
519 """ Creates the structure needed for a correct POST on OAR.
520 Makes the timestamp transformation into the appropriate format.
521 Sends the POST request to create the job with the resources in
530 reqdict['workdir'] = '/tmp'
531 reqdict['resource'] = "{network_address in ("
533 for node in lease_dict['added_nodes']:
534 logger.debug("\r\n \r\n OARrestapi \t \
535 __create_job_structure_request_for_OAR node %s" %(node))
537 # Get the ID of the node
539 reqdict['resource'] += "'" + nodeid + "', "
540 nodeid_list.append(nodeid)
542 custom_length = len(reqdict['resource'])- 2
543 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
544 ")}/nodes=" + str(len(nodeid_list))
546 def __process_walltime(duration):
547 """ Calculates the walltime in seconds from the duration in H:M:S
548 specified in the RSpec.
552 # Fixing the walltime by adding a few delays.
553 # First put the walltime in seconds oarAdditionalDelay = 20;
554 # additional delay for /bin/sleep command to
555 # take in account prologue and epilogue scripts execution
556 # int walltimeAdditionalDelay = 240; additional delay
557 desired_walltime = duration
558 total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
559 sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12
561 #Put the walltime back in str form
563 walltime.append(str(total_walltime / 3600))
564 total_walltime = total_walltime - 3600 * int(walltime[0])
565 #Get the remaining minutes
566 walltime.append(str(total_walltime / 60))
567 total_walltime = total_walltime - 60 * int(walltime[1])
569 walltime.append(str(total_walltime))
572 logger.log_exc(" __process_walltime duration null")
574 return walltime, sleep_walltime
577 walltime, sleep_walltime = \
578 __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
581 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
582 ":" + str(walltime[1]) + ":" + str(walltime[2])
583 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
585 #In case of a scheduled experiment (not immediate)
586 #To run an XP immediately, don't specify date and time in RSpec
587 #They will be set to None.
588 if lease_dict['lease_start_time'] is not '0':
589 #Readable time accepted by OAR
590 start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
591 strftime(lease_dict['time_format'])
592 reqdict['reservation'] = start_time
593 #If there is not start time, Immediate XP. No need to add special
597 reqdict['type'] = "deploy"
598 reqdict['directory'] = ""
599 reqdict['name'] = "SFA_" + lease_dict['slice_user']
603 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
604 \r\n " %(slice_user))
605 #Create the request for OAR
606 reqdict = __create_job_structure_request_for_OAR(lease_dict)
607 # first step : start the OAR job and update the job
608 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
611 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
613 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
617 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
618 Impossible to create job %s " %(answer))
622 def __configure_experiment(jobid, added_nodes):
623 # second step : configure the experiment
624 # we need to store the nodes in a yaml (well...) file like this :
625 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
626 tmp_dir = '/tmp/sfa/'
627 if not os.path.exists(tmp_dir):
629 job_file = open(tmp_dir + str(jobid) + '.json', 'w')
631 job_file.write(str(added_nodes[0].strip('node')))
632 for node in added_nodes[1:len(added_nodes)] :
633 job_file.write(', '+ node.strip('node'))
638 def __launch_senslab_experiment(jobid):
639 # third step : call the senslab-experiment wrapper
640 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
641 # "+str(jobid)+" "+slice_user
642 javacmdline = "/usr/bin/java"
644 "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
646 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
647 slice_user],stdout=subprocess.PIPE).communicate()[0]
649 logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
656 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
657 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
660 #__configure_experiment(jobid, added_nodes)
661 #__launch_senslab_experiment(jobid)
666 def AddLeases(self, hostname_list, slice_record, \
667 lease_start_time, lease_duration):
668 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
669 slice_record %s lease_start_time %s lease_duration %s "\
670 %( hostname_list, slice_record , lease_start_time, \
673 #tmp = slice_record['reg-researchers'][0].split(".")
674 username = slice_record['login']
675 #username = tmp[(len(tmp)-1)]
676 job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'], \
677 lease_start_time, lease_duration, username)
678 start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
679 end_time = lease_start_time + lease_duration
681 import logging, logging.handlers
682 from sfa.util.sfalogging import _SfaLogger
683 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_record['hrn'], job_id, end_time))
684 sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG)
685 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_record['hrn']), type(job_id), type(end_time)))
687 slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], \
688 job_id = job_id, end_time= end_time)
690 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" \
692 slab_dbsession.add(slab_ex_row)
693 slab_dbsession.commit()
695 logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
700 #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
    """Delete the OAR job(s) recorded for a slice.

    slice_record -- dictionary holding 'oar_job_id' (one job id, or a
        list of job ids) and 'user' (handed to self.DeleteJobs as the
        username).
    """
    logger.debug("SLABDRIVER \t DeleteSliceFromNodese %s " %(slice_record))
    job_ids = slice_record['oar_job_id']
    if not isinstance(job_ids, list):
        # A lone job id is handled like a one-element list.
        job_ids = [job_ids]
    for job_id in job_ids:
        self.DeleteJobs(job_id, slice_record['user'])
711 def GetLeaseGranularity(self):
712 """ Returns the granularity of Senslab testbed.
713 OAR returns seconds for experiments duration.
715 Experiments which last less than 10 min are invalid"""
def update_jobs_in_slabdb( job_oar_list, jobs_psql):
    """Purge from the senslab database the jobs OAR no longer knows.

    job_oar_list -- ids of the jobs currently known to OAR.
    jobs_psql -- ids of the jobs stored in the SenslabXP table.

    Every id present in jobs_psql but absent from job_oar_list is deleted
    from SenslabXP through slab_dbsession, and the session is committed.
    The database is not touched when there is no such id.
    """
    jobs_psql = set(jobs_psql)
    kept_jobs = set(job_oar_list).intersection(jobs_psql)
    # "\\ " spells out the backslash that the former invalid escape "\ "
    # produced, so the logged text is unchanged while the literal stops
    # triggering an invalid-escape warning.
    logger.debug ( "\r\n \t\\ update_jobs_in_slabdb jobs_psql %s \r\n \t \
        job_oar_list %s kept_jobs %s "%(jobs_psql, job_oar_list, kept_jobs))
    deleted_jobs = list(jobs_psql.difference(kept_jobs))
    if deleted_jobs:
        slab_dbsession.query(SenslabXP).filter(SenslabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch')
        slab_dbsession.commit()
740 def GetLeases(self, lease_filter_dict=None, login=None):
743 unfiltered_reservation_list = self.GetReservedNodes(login)
745 reservation_list = []
746 #Find the slice associated with this user senslab ldap uid
747 logger.debug(" SLABDRIVER.PY \tGetLeases login %s\
748 unfiltered_reservation_list %s " %(login, unfiltered_reservation_list))
749 #Create user dict first to avoid looking several times for
750 #the same user in LDAP SA 27/07/12
754 jobs_psql_query = slab_dbsession.query(SenslabXP).all()
755 jobs_psql_dict = [ (row.job_id, row.__dict__ )for row in jobs_psql_query ]
756 jobs_psql_dict = dict(jobs_psql_dict)
757 logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
759 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
763 for resa in unfiltered_reservation_list:
764 logger.debug("SLABDRIVER \tGetLeases USER %s"\
766 #Cosntruct list of jobs (runing, waiting..) in oar
767 job_oar_list.append(resa['lease_id'])
768 #If there is information on the job in SLAB DB (slice used and job id)
769 if resa['lease_id'] in jobs_psql_dict:
770 job_info = jobs_psql_dict[resa['lease_id']]
771 logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
773 resa['slice_hrn'] = job_info['slice_hrn']
774 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
776 #Assume it is a senslab slice:
778 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice')
779 #if resa['user'] not in resa_user_dict:
780 #logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
781 #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
783 #ldap_info = ldap_info[0][1]
784 ##Get the backref :relationship table reg-researchers
785 #user = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(email = \
786 #ldap_info['mail'][0])
789 #user = user.__dict__
790 #slice_info = user['reg_slices_as_researcher'][0].__dict__
791 ##Separated in case user not in database :
792 ##record_id not defined SA 17/07//12
794 ##query_slice_info = slab_dbsession.query(SenslabXP).filter_by(record_id_user = user.record_id)
795 ##if query_slice_info:
796 ##slice_info = query_slice_info.first()
800 #resa_user_dict[resa['user']] = {}
801 #resa_user_dict[resa['user']]['ldap_info'] = user
802 #resa_user_dict[resa['user']]['slice_info'] = slice_info
804 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
805 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
807 resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
809 resa['component_id_list'] = []
810 #Transform the hostnames into urns (component ids)
811 for node in resa['reserved_nodes']:
812 #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
813 #self.root_auth, node['hostname']))
814 slab_xrn = slab_xrn_object(self.root_auth, node)
815 resa['component_id_list'].append(slab_xrn.urn)
817 if lease_filter_dict:
818 logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\
819 %(resa,lease_filter_dict))
821 if lease_filter_dict['name'] == resa['slice_hrn']:
822 reservation_list.append(resa)
824 if lease_filter_dict is None:
825 reservation_list = unfiltered_reservation_list
827 #del unfiltered_reservation_list[unfiltered_reservation_list.index(resa)]
830 self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)
832 #for resa in unfiltered_reservation_list:
836 #if resa['user'] in resa_user_dict:
837 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
838 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
840 ##resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
841 #resa['component_id_list'] = []
842 ##Transform the hostnames into urns (component ids)
843 #for node in resa['reserved_nodes']:
844 ##resa['component_id_list'].append(hostname_to_urn(self.hrn, \
845 ##self.root_auth, node['hostname']))
846 #slab_xrn = slab_xrn_object(self.root_auth, node)
847 #resa['component_id_list'].append(slab_xrn.urn)
849 ##Filter the reservation list if necessary
850 ##Returns all the leases associated with a given slice
851 #if lease_filter_dict:
852 #logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
853 #%(lease_filter_dict))
854 #for resa in unfiltered_reservation_list:
855 #if lease_filter_dict['name'] == resa['slice_hrn']:
856 #reservation_list.append(resa)
858 #reservation_list = unfiltered_reservation_list
860 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
862 return reservation_list
867 #TODO FUNCTIONS SECTION 04/07/2012 SA
869 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
872 def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
873 """ This method is a hopefully temporary hack to let the sfa correctly
874 detach the objects it creates from a remote peer object. This is
875 needed so that the sfa federation link can work in parallel with
876 RefreshPeer, as RefreshPeer depends on remote objects being correctly
879 auth : struct, API authentication structure
880 AuthMethod : string, Authentication method to use
881 object_type : string, Object type, among 'site','person','slice',
883 object_id : int, object_id
884 shortname : string, peer shortname
888 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
892 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
def BindObjectToPeer(self, auth, object_type, object_id, shortname=None,
                     remote_object_id=None):
    """Placeholder for the PLC API hack attaching an object to a peer.

    In the PLC API this call marks an object created by sfa as belonging
    to a remote peer, so that the federation link can work in parallel
    with RefreshPeer.

    shortname -- string, peer shortname
    remote_object_id -- int, remote object_id, set to 0 if unknown

    Not implemented here: all the arguments are ignored and only a
    warning is logged.
    """
    logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
909 #TODO UpdateSlice 04/07/2012 SA
910 #Function should delete and create another job since in senslab slice=job
def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
    """Placeholder for the PLC API call updating a slice.

    The PLC contract (members update their own slices, PIs those of
    their sites, admins any slice; only PIs and admins change max_nodes;
    no renewal more than 8 weeks ahead; 1 returned on success) is not
    implemented here: all the arguments are ignored and only a warning
    is logged.
    """
    logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
926 #TODO UpdatePerson 04/07/2012 SA
def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
    """Placeholder for the PLC API call updating a person.

    The PLC contract (only the fields given in person_fields change;
    users and techs update themselves, PIs themselves and the non-PIs of
    their sites; 1 returned on success) is not implemented here: all the
    arguments are ignored and only a debug message is logged.
    """
    # Former attempt at recording the slab/federated hrn pair, kept
    # disabled:
    #new_row = FederatedToSenslab(slab_hrn, federated_hrn)
    #slab_dbsession.add(new_row)
    #slab_dbsession.commit()
    logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
943 #TODO GetKeys 04/07/2012 SA
def GetKeys(self, auth, key_filter=None, return_fields=None):
    """Placeholder for the PLC API call listing keys.

    The PLC contract (an array of key structs, optionally restricted by
    key_filter and trimmed to return_fields; admins query all keys,
    non-admins only their own) is not implemented here: all the
    arguments are ignored and only a warning is logged.
    """
    logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
958 #TODO DeleteKey 04/07/2012 SA
def DeleteKey(self, key_id):
    """Placeholder for the PLC API call deleting a key.

    The PLC contract (non-admins delete only their own keys, 1 returned
    on success) is not implemented here: key_id is ignored and only a
    warning is logged.
    """
    logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
973 def _sql_get_slice_info( slice_filter ):
974 #DO NOT USE RegSlice - reg_researchers to get the hrn
975 #of the user otherwise will mess up the RegRecord in
976 #Resolve, don't know why - SA 08/08/2012
978 #Only one entry for one user = one slice in slab_xp table
979 #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
980 raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
981 #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
984 #raw_slicerec.reg_researchers
985 raw_slicerec = raw_slicerec.__dict__
986 logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s \
987 raw_slicerec %s"%(slice_filter, raw_slicerec))
988 slicerec = raw_slicerec
989 #only one researcher per slice so take the first one
990 #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
991 #del slicerec['reg_researchers']['_sa_instance_state']
@staticmethod
def _sql_get_slice_info_from_user(slice_filter):
    """Build a slice record starting from the registry entry of a user.

    The RegUser row is fetched with ``reg_slices_as_researcher`` eagerly
    loaded; only the first slice of that list is used.

    :param slice_filter: ``record_id`` of the user to look up.
    :returns: a dict with the ``slice_needed_fields`` of the user's first
        slice, plus a ``'reg_researchers'`` entry holding the
        ``user_needed_fields`` of the user. None when the user is unknown
        or is researcher of no slice.
    """
    user_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                          'classtype', 'authority', 'gid', 'record_id',
                          'date_created', 'type', 'email', 'pointer']
    slice_needed_fields = ['peer_authority', 'hrn', 'last_updated',
                           'classtype', 'authority', 'gid', 'record_id',
                           'date_created', 'type', 'pointer']

    raw_userrec = dbsession.query(RegUser).options(
        joinedload('reg_slices_as_researcher')).filter_by(
            record_id=slice_filter).first()
    if not raw_userrec:
        return None

    user_fields = raw_userrec.__dict__
    user_slices = user_fields['reg_slices_as_researcher']
    if not user_slices:
        #A user without any slice has no slice record to report; avoid
        #the IndexError the [0] below would raise.
        return None

    #TODO Handle multiple slices for one user SA 10/12/12
    #for now only take the first slice record associated to the rec user
    first_slice_fields = user_slices[0].__dict__
    slicerec = dict((field, first_slice_fields[field])
                    for field in slice_needed_fields)
    slicerec['reg_researchers'] = dict((field, user_fields[field])
                                       for field in user_needed_fields)
    return slicerec
def _get_slice_records(self, slice_filter = None, \
                slice_filter_type = None):
    """Fetch one slice record according to the kind of filter given.

    :param slice_filter: slice hrn when slice_filter_type is 'slice_hrn',
        user record_id when it is 'record_id_user'.
    :param slice_filter_type: 'slice_hrn' or 'record_id_user'.
    :returns: the slice record dict, or None when nothing matches or when
        the filter type is not one of the two supported values.
    """
    #Start from "not found" so that an unsupported filter type yields
    #None instead of an unbound local variable.
    slicerec = None

    if slice_filter_type == 'slice_hrn':
        #Get the slice based on the slice hrn
        slicerec = self._sql_get_slice_info(slice_filter)
    elif slice_filter_type == 'record_id_user':
        #Get the slice based on the user id
        slicerec = self._sql_get_slice_info_from_user(slice_filter)

    if not slicerec:
        return None
    return slicerec
def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None):
    """Get the slice records from the slab db.

    With a supported filter type ('slice_hrn' or 'record_id_user'), the
    matching slice record is fetched and combined with the leases returned
    by GetLeases(login): one dict is produced per lease whose 'slice_hrn'
    equals slice_filter (only for the 'slice_hrn' filter type), and the
    bare slice record is returned when there is no lease at all.

    Without a supported filter type, every slice of the registry is
    returned; a slice with no peer authority is completed with the job id
    and nodes of the reserved-nodes entries whose user matches the login
    derived from the slice hrn.

    :param slice_filter: slice hrn or user record_id, depending on
        slice_filter_type.
    :param slice_filter_type: 'slice_hrn', 'record_id_user' or None.
    :param login: passed through to GetLeases in the filtered case.
    :returns: list of slice dictionaries (possibly empty).
    """
    authorized_filter_types_list = ['slice_hrn', 'record_id_user']
    return_slicerec_dictlist = []

    #First try to get information on the slice based on the filter provided
    if slice_filter_type in authorized_filter_types_list:
        fixed_slicerec_dict = \
            self._get_slice_records(slice_filter, slice_filter_type)
        logger.debug(" SLABDRIVER \tGetSlices login %s "
                     "slice record %s slice_filter %s slice_filter_type %s "
                     % (login, fixed_slicerec_dict, slice_filter,
                        slice_filter_type))

        #Now we have the slice record fixed_slicerec_dict, get the
        #jobs associated to this slice
        leases_list = self.GetLeases(login = login)

        #If no job is running or no job scheduled
        #return only the slice record
        if leases_list == [] and fixed_slicerec_dict:
            return_slicerec_dictlist.append(fixed_slicerec_dict)

        #If several jobs for one slice , put the slice record into
        # each lease information dict
        for lease in leases_list:
            slicerec_dict = {}
            logger.debug("SLABDRIVER.PY \tGetSlices slice_filter %s "
                         "\\ lease['slice_hrn'] %s"
                         % (slice_filter, lease['slice_hrn']))
            if slice_filter_type == 'slice_hrn' and \
                    lease['slice_hrn'] == slice_filter:
                reserved_list = lease['reserved_nodes']
                slicerec_dict['slice_hrn'] = lease['slice_hrn']
                slicerec_dict['hrn'] = lease['slice_hrn']
                slicerec_dict['user'] = lease['user']
                slicerec_dict['oar_job_id'] = lease['lease_id']
                slicerec_dict.update(
                    {'list_node_ids': {'hostname': reserved_list}})
                slicerec_dict.update({'node_ids': lease['reserved_nodes']})

                #Update lease dict with the slice record. The registry
                #record wins on common keys and carries the job id as a
                #one-element list.
                if fixed_slicerec_dict:
                    fixed_slicerec_dict['oar_job_id'] = []
                    fixed_slicerec_dict['oar_job_id'].append(
                        slicerec_dict['oar_job_id'])
                    slicerec_dict.update(fixed_slicerec_dict)

                return_slicerec_dictlist.append(slicerec_dict)
                logger.debug("SLABDRIVER.PY \tGetSlices "
                             "OHOHOHOH %s" % (return_slicerec_dictlist,))

            logger.debug("SLABDRIVER.PY \tGetSlices "
                         "slicerec_dict %s return_slicerec_dictlist %s "
                         "lease['reserved_nodes'] "
                         "%s" % (slicerec_dict, return_slicerec_dictlist,
                                 lease['reserved_nodes']))

        logger.debug("SLABDRIVER.PY \tGetSlices RETURN "
                     "return_slicerec_dictlist %s"
                     % (return_slicerec_dictlist,))

        return return_slicerec_dictlist

    #No usable filter: get all slices from the senslab sfa database,
    #put them in dict format
    query_slice_list = dbsession.query(RegSlice).options(
        joinedload('reg_researchers')).all()
    for record in query_slice_list:
        tmp = record.__dict__
        #only one researcher per slice so take the first one
        tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
        return_slicerec_dictlist.append(tmp)

    #Get all the jobs reserved nodes
    leases_list = self.GetReservedNodes()

    for fixed_slicerec_dict in return_slicerec_dictlist:
        slicerec_dict = {}
        #Check if the slice belongs to a senslab user: only then can a
        #login be derived from the hrn ("<auth>.<login>_slice").
        if fixed_slicerec_dict['peer_authority'] is None:
            owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
        else:
            #Never reuse the owner computed for a previous slice.
            owner = None

        for lease in leases_list:
            if owner == lease['user']:
                slicerec_dict['oar_job_id'] = lease['lease_id']
                logger.debug("SLABDRIVER.PY \tGetSlices lease %s "
                             % (lease,))
                reserved_list = lease['reserved_nodes']
                slicerec_dict.update({'node_ids': lease['reserved_nodes']})
                slicerec_dict.update(
                    {'list_node_ids': {'hostname': reserved_list}})
                slicerec_dict.update(fixed_slicerec_dict)
                #The record kept in the returned list is updated in place.
                fixed_slicerec_dict.update(slicerec_dict)

    logger.debug("SLABDRIVER.PY \tGetSlices RETURN "
                 "return_slicerec_dictlist %s \\slice_filter %s "
                 % (return_slicerec_dictlist, slice_filter))

    return return_slicerec_dictlist
# Convert SFA fields to Senslab fields for use when registering or updating
# a registry record
#
# @param sfa_type type of record (user, slice, ...)
# @param hrn human readable name
# @param record dictionary of SFA fields
# @return dictionary of Senslab fields built from the record
@staticmethod
def sfa_fields_to_slab_fields(sfa_type, hrn, record):
    """Build the Senslab view of an SFA record.

    Only slice records are converted; for any other type an empty dict is
    returned (nodes are added by OAR only and then imported to SFA).

    :param sfa_type: type of the record ('slice', 'user', ...).
    :param hrn: human readable name of the record.
    :param record: dictionary of SFA fields.
    :returns: new dict; for a slice it holds 'instantiation', 'hrn' and,
        when present in record, 'url', 'description' and 'expires' (the
        latter converted with int()).
    """
    slab_record = {}

    if sfa_type == "slice":
        #instantion used in get_slivers ?
        if "instantiation" not in slab_record:
            slab_record["instantiation"] = "senslab-instantiated"
        #Unused hrn_to_pl_slicename because Slab's hrn already
        #in the appropriate form SA 23/07/12
        slab_record["hrn"] = hrn
        logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields "
                     "slab_record %s " % (slab_record['hrn'],))
        #Optional fields are copied only when the SFA record has them.
        for field in ("url", "description"):
            if field in record:
                slab_record[field] = record[field]
        if "expires" in record:
            slab_record["expires"] = int(record["expires"])

    return slab_record
def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
    """ Transforms unix timestamp into valid OAR date format.

    Used in case of a scheduled experiment (not immediate). To run an XP
    immediately, no date and time are specified in the RSpec and the
    timestamp is empty.

    :param xp_utc_timestamp: unix timestamp (anything int() accepts), or
        an empty/None value.
    :returns: the date formatted with self.time_format, or None when no
        timestamp is given.
    """
    #Nothing to convert for an immediate experiment.
    if not xp_utc_timestamp:
        return None

    #transform the xp_utc_timestamp into server readable time
    seconds_since_epoch = int(xp_utc_timestamp)
    server_date = datetime.fromtimestamp(seconds_since_epoch)
    return server_date.strftime(self.time_format)
1281 class SlabDriver(Driver):
1282 """ Senslab Driver class inherited from Driver generic class.
1284 Contains methods compliant with the SFA standard and the testbed
1285 infrastructure (calls to LDAP and OAR).
def __init__(self, config):
    """Initialise the driver from the SFA configuration object.

    :param config: SFA configuration; its SFA_INTERFACE_HRN attribute is
        read here and the object is handed to SlabDB and SlabTestbedAPI.
    """
    Driver.__init__ (self, config)
    self.config = config
    #hrn of this SFA interface, taken from the configuration
    self.hrn = config.SFA_INTERFACE_HRN
    self.db = SlabDB(config, debug = False)
    #Object the driver methods use for every testbed-side call
    self.slab_api = SlabTestbedAPI(config)
def augment_records_with_testbed_info (self, record_list ):
    """ Completes the records with testbed data.

    Thin entry point: the work is delegated to fill_record_info and its
    result is handed back unchanged.
    """
    augmented_records = self.fill_record_info(record_list)
    return augmented_records
def fill_record_info(self, record_list):
    """
    Given a SFA record, fill in the senslab specific and SFA specific
    fields in the record.

    Slice records get the researcher information (PI, researcher,
    person_ids) and the nodes of their jobs. User records are completed
    with the data returned by GetPersons, and the record of the user's
    slice is appended to the list, so it is visited later by the same loop.

    A TypeError raised while filling the records is logged, not propagated.

    :param record_list: a record dict or a list of record dicts; the dicts
        are modified in place.
    :returns: the list of records (the input wrapped in a list when a
        single record was given).
    """
    logger.debug("SLABDRIVER \tfill_record_info records %s "
                 % (record_list,))
    if not isinstance(record_list, list):
        record_list = [record_list]

    try:
        #NOTE: record_list grows while it is iterated (slice records are
        #appended for users); this is intended.
        for record in record_list:
            #If the record is a SFA slice record, then add information
            #about the user of this slice. This kind of
            #information is in the Senslab's DB.
            if str(record['type']) == 'slice':
                if 'reg_researchers' in record and \
                        isinstance(record['reg_researchers'], list):
                    researcher = record['reg_researchers'][0].__dict__
                    record['reg_researchers'] = researcher
                    #NOTE(review): the 'oar_job_id' and 'node_ids'
                    #defaults are inferred from the reads further down -
                    #confirm against the original file.
                    record.update({
                        'PI': [researcher['hrn']],
                        'researcher': [researcher['hrn']],
                        'name': record['hrn'],
                        'oar_job_id': [],
                        'node_ids': [],
                        'person_ids': [researcher['record_id']],
                        #The three entries below are there for
                        #client_helper.py compatibility
                        'geni_urn': '',
                        'keys': '',
                        'key_ids': ''})

                #Get slab slice record.
                recslice_list = self.slab_api.GetSlices(
                    slice_filter=str(record['hrn']),
                    slice_filter_type='slice_hrn')

                #.get: a debug trace must not fail on a record that was
                #not given the defaults above.
                logger.debug("SLABDRIVER \tfill_record_info "
                             "TYPE SLICE RECUSER record['hrn'] %s "
                             "ecord['oar_job_id']%s "
                             % (record['hrn'], record.get('oar_job_id')))
                try:
                    for rec in recslice_list:
                        logger.debug("SLABDRIVER\r\n \t fill_record_info "
                                     "oar_job_id %s "
                                     % (rec['oar_job_id'],))
                        #On a second pass this del raises KeyError, which
                        #ends the loop: only the first job's nodes are
                        #reported.
                        del record['reg_researchers']
                        record['node_ids'] = [
                            self.slab_api.root_auth + hostname
                            for hostname in rec['node_ids']]
                except KeyError:
                    pass

                logger.debug("SLABDRIVER.PY \t fill_record_info SLICE "
                             "recslice_list %s \r\n \t RECORD %s \r\n "
                             "\r\n" % (recslice_list, record))

            if str(record['type']) == 'user':
                #The record is a SFA user record.
                #Get the information about his slice from Senslab's DB
                #and add it to the user record.
                recslice_list = self.slab_api.GetSlices(
                    slice_filter=record['record_id'],
                    slice_filter_type='record_id_user')

                logger.debug("SLABDRIVER.PY \t fill_record_info TYPE USER "
                             "recslice_list %s \r\n \t RECORD %s \r\n"
                             % (recslice_list, record))

                #Append slice record in records list,
                #therefore fetches user and slice info again(one more loop)
                #Will update PIs and researcher for the slice
                recuser = recslice_list[0]['reg_researchers']
                logger.debug("SLABDRIVER.PY \t fill_record_info USER "
                             "recuser %s \r\n \r\n" % (recuser,))

                recslice = recslice_list[0]
                recslice.update({
                    'PI': [recuser['hrn']],
                    'researcher': [recuser['hrn']],
                    'name': record['hrn'],
                    'node_ids': [],
                    'oar_job_id': [],
                    'person_ids': [recuser['record_id']]})
                try:
                    for rec in recslice_list:
                        recslice['oar_job_id'].append(rec['oar_job_id'])
                except KeyError:
                    pass

                recslice.update({'type': 'slice',
                                 'hrn': recslice_list[0]['hrn']})

                #GetPersons takes [] as filters
                user_slab = self.slab_api.GetPersons([record])

                record.update(user_slab[0])
                #For client_helper.py compatibility
                record.update({'geni_urn': '',
                               'keys': '',
                               'key_ids': ''})
                record_list.append(recslice)

                logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE"
                             "INFO TO USER records %s" % (record_list,))

            logger.debug("SLABDRIVER.PY \tfill_record_info END "
                         "record %s \r\n \r\n " % (record,))

    except TypeError as error:
        logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"
                       % (error,))

    return record_list
def sliver_status(self, slice_urn, slice_hrn):
    """Receive a status request for slice named urn/hrn
    urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
    shall return a structure as described in
    http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
    NT : not sure if we should implement this or not, but used by sface.

    Only the first record returned by GetSlices is reported.

    :param slice_urn: urn of the slice, copied into 'geni_urn'.
    :param slice_hrn: hrn used to look the slice up.
    :returns: dict with 'pl_login', 'geni_status' and 'geni_resources';
        'geni_urn' is added when the slice has a 'node_ids' entry.
        'geni_status' is 'empty' without nodes, 'ready' when every node is
        'Alive', 'failed' otherwise.
    :raises SliverDoesNotExist: when no slice matches slice_hrn.
    """
    def _hostname_of(node):
        #node_ids entries are hostname strings (see GetSlices); the dict
        #form with a 'hostname' key is still accepted.
        if isinstance(node, dict):
            return node['hostname']
        return node

    #First get the slice with the slice hrn
    slice_list = self.slab_api.GetSlices(slice_filter=slice_hrn,
                                         slice_filter_type='slice_hrn')
    if not slice_list:
        raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))

    #Used for fetching the user info witch comes along the slice info
    one_slice = slice_list[0]

    #For compatibility
    top_level_status = 'empty'
    result = {}
    result['pl_login'] = one_slice['reg_researchers']['hrn']
    logger.debug("Slabdriver - sliver_status Sliver status "
                 "urn %s hrn %s single_slice %s \r\n "
                 % (slice_urn, slice_hrn, one_slice))

    if 'node_ids' not in one_slice:
        #No job in the slice
        result['geni_status'] = top_level_status
        result['geni_resources'] = []
        return result

    #Make a list of all the nodes hostnames in use for this slice
    slice_nodes_list = [_hostname_of(node)
                        for node in one_slice['node_ids']]

    #Get all the corresponding nodes details
    nodes_all = self.slab_api.GetNodes(
        {'hostname': slice_nodes_list},
        ['node_id', 'hostname', 'site', 'boot_state'])
    nodeall_byhostname = dict((one_node['hostname'], one_node)
                              for one_node in nodes_all)

    #A job is running on Senslab for this slice
    # report about the local nodes that are in the slice only
    top_level_status = 'ready'
    result['geni_urn'] = slice_urn

    resources = []
    res = {}
    for node_name in slice_nodes_list:
        node_details = nodeall_byhostname[node_name]
        res = {}
        res['pl_hostname'] = node_name
        res['pl_boot_state'] = node_details['boot_state']
        sliver_id = Xrn(slice_urn, type='slice',
                        id=node_details['node_id'],
                        authority=self.hrn).urn
        res['geni_urn'] = sliver_id
        if node_details['boot_state'] == 'Alive':
            res['geni_status'] = 'ready'
        else:
            #One dead node is enough to mark the whole sliver as failed.
            res['geni_status'] = 'failed'
            top_level_status = 'failed'
        res['geni_error'] = ''
        resources.append(res)

    result['geni_status'] = top_level_status
    result['geni_resources'] = resources
    logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "
                 % (resources, res))
    return result
@staticmethod
def get_user_record(hrn):
    """ Returns the user record based on the hrn from the SFA DB.

    :param hrn: hrn to look up in the registry.
    :returns: the first RegRecord whose hrn matches, or None when there
        is none.
    """
    #Declared static: the function takes no instance argument, so it can
    #be called on the class as well as on a driver instance.
    return dbsession.query(RegRecord).filter_by(hrn=hrn).first()
1520 def testbed_name (self):
# 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """Describe the RSpec versions this aggregate supports.

    :returns: dict with 'testbed' (see testbed_name) and the two lists
        'geni_request_rspec_versions' and 'geni_ad_rspec_versions', each
        holding the to_dict() form of the versions known to the
        VersionManager. A version whose content_type is '*' appears in
        both lists.
    """
    version_manager = VersionManager()
    ad_rspec_versions = []
    request_rspec_versions = []
    for rspec_version in version_manager.versions:
        if rspec_version.content_type in ['*', 'ad']:
            ad_rspec_versions.append(rspec_version.to_dict())
        if rspec_version.content_type in ['*', 'request']:
            request_rspec_versions.append(rspec_version.to_dict())
    return {
        'testbed': self.testbed_name(),
        'geni_request_rspec_versions': request_rspec_versions,
        'geni_ad_rspec_versions': ad_rspec_versions,
    }
def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
                                                        users, options):
    """Create the leases requested in an RSpec for a slice.

    The slice and its users are verified through SlabSlices, then the
    leases of the RSpec that belong to this testbed are grouped by start
    time (one group = one job on OAR) and handed to verify_slice_leases.

    :param slice_urn: urn of the slice, used to build the answer.
    :param slice_hrn: hrn of the slice.
    :param creds: credential or list of credentials.
    :param rspec_string: request RSpec.
    :param users: list of user dicts; the first one provides the slice
        record, the keys and the email.
    :param options: options forwarded to verify_persons.
    :returns: the RSpec built by SlabAggregate.get_rspec for the slice.
    """
    aggregate = SlabAggregate(self)

    slices = SlabSlices(self)
    peer = slices.get_peer(slice_hrn)
    sfa_peer = slices.get_sfa_peer(slice_hrn)
    slice_record = None

    if not isinstance(creds, list):
        creds = [creds]

    if users:
        slice_record = users[0].get('slice_record', {})
        logger.debug("SLABDRIVER.PY \t ===============create_sliver \t"
                     "creds %s \r\n \r\n users %s"
                     % (creds, users))
        slice_record['user'] = {'keys': users[0]['keys'],
                                'email': users[0]['email'],
                                'hrn': slice_record['reg-researchers'][0]}
    # parse rspec
    rspec = RSpec(rspec_string)
    logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version "
                 "%s slice_record %s users %s"
                 % (rspec.version, slice_record, users))

    # ensure site record exists?
    # ensure slice record exists
    #Removed options to verify_slice SA 14/08/12
    sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer,
                                    sfa_peer)

    # ensure person records exists
    #verify_persons returns added persons but the return value is not used
    slices.verify_persons(slice_hrn, sfa_slice, users, peer,
                          sfa_peer, options=options)
    #requested_attributes returned by rspec.version.get_slice_attributes()
    #unused, removed SA 13/08/12
    rspec.version.get_slice_attributes()

    logger.debug("SLABDRIVER.PY create_sliver slice %s " % (sfa_slice,))

    # add/remove slice from nodes
    #Only used for the trace below. Strings are compared by value: the
    #identity test used before could never be relied on.
    nodes_with_slivers = list(rspec.version.get_nodes_with_slivers())
    requested_slivers = [
        node.get('component_id') for node in nodes_with_slivers
        if node.get('authority_id') == self.slab_api.root_auth]
    logger.debug("SLADRIVER \tcreate_sliver requested_slivers "
                 "requested_slivers %s listnodes %s"
                 % (requested_slivers, nodes_with_slivers))

    # add/remove leases
    requested_lease_list = []
    for lease in rspec.version.get_leases():
        single_requested_lease = {}
        logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " % (lease,))

        #Leases that already have an id exist on the testbed; leases of
        #other authorities are not ours to create.
        if lease.get('lease_id'):
            continue
        if get_authority(lease['component_id']) != self.slab_api.root_auth:
            continue

        single_requested_lease['hostname'] = \
            slab_xrn_to_hostname(lease.get('component_id').strip())
        single_requested_lease['start_time'] = lease.get('start_time')
        single_requested_lease['duration'] = lease.get('duration')
        #Check the experiment's duration is valid before adding
        #the lease to the requested leases list
        duration_in_seconds = int(single_requested_lease['duration']) * 60
        if duration_in_seconds > self.slab_api.GetLeaseGranularity():
            requested_lease_list.append(single_requested_lease)

    #Create dict of leases by start_time, regrouping nodes reserved
    #at the same time, for the same amount of time = one job on OAR
    requested_job_dict = {}
    for lease in requested_lease_list:

        #In case it is an asap experiment start_time is empty
        if lease['start_time'] == '':
            lease['start_time'] = '0'

        if lease['start_time'] not in requested_job_dict:
            #The first lease of a job carries the list of hostnames.
            #Wrap anything that is not already a list, so that unicode
            #hostnames are handled like plain strings.
            if not isinstance(lease['hostname'], list):
                lease['hostname'] = [lease['hostname']]
            requested_job_dict[lease['start_time']] = lease
        else:
            job_lease = requested_job_dict[lease['start_time']]
            if lease['duration'] == job_lease['duration']:
                job_lease['hostname'].append(lease['hostname'])

    logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "
                 % (requested_job_dict,))
    #verify_slice_leases returns the leases , but the return value is unused
    #here. Removed SA 13/08/12
    slices.verify_slice_leases(sfa_slice, requested_job_dict, peer)

    return aggregate.get_rspec(slice_xrn=slice_urn,
                               login=sfa_slice['login'],
                               version=rspec.version)
def delete_sliver (self, slice_urn, slice_hrn, creds, options):
    """Delete the jobs of a slice on the testbed.

    :param slice_urn: urn of the slice (unused here).
    :param slice_hrn: hrn used to look the slice up.
    :param creds: credentials (unused here).
    :param options: options (unused here).
    :returns: 1 when the slice is unknown, True when
        DeleteSliceFromNodes succeeded, False when it raised.
    """
    sfa_slice_list = self.slab_api.GetSlices(slice_filter=slice_hrn,
                                             slice_filter_type='slice_hrn')

    if not sfa_slice_list:
        return 1

    #Delete all in the slice
    #NOTE(review): the loop returns after the first record, so only the
    #first job is deleted - confirm this is intended.
    for sfa_slice in sfa_slice_list:
        logger.debug("SLABDRIVER.PY delete_sliver slice %s" % (sfa_slice,))
        slices = SlabSlices(self)
        # determine if this is a peer slice
        peer = slices.get_peer(slice_hrn)
        #TODO delete_sliver SA : UnBindObjectFromPeer should be
        #used when there is another
        #senslab testbed, which is not the case 14/08/12 .

        logger.debug("SLABDRIVER.PY delete_sliver peer %s \r\n \t "
                     "sfa_slice %s " % (peer, sfa_slice))
        try:
            self.slab_api.DeleteSliceFromNodes(sfa_slice)
            return True
        except Exception:
            #Best effort: report the failure to the caller, but leave a
            #trace of what went wrong instead of swallowing it.
            logger.log_exc("SLABDRIVER.PY delete_sliver failed for %s"
                           % (slice_hrn,))
            return False
# first 2 args are None in case of resource discovery
def list_resources (self, slice_urn, slice_hrn, creds, options):
    """Return the RSpec of the testbed, or of a slice when one is given.

    :param slice_urn: urn of the slice, None for resource discovery.
    :param slice_hrn: hrn of the slice, None for resource discovery.
    :param creds: credentials (unused here).
    :param options: dict; 'geni_rspec_version' selects the RSpec format,
        and the whole dict is forwarded to SlabAggregate.get_rspec.
    :returns: the RSpec built by SlabAggregate.get_rspec.
    """
    version_manager = VersionManager()
    # get the rspec's return format from options
    rspec_version = \
        version_manager.get_version(options.get('geni_rspec_version'))

    #Caching key. The cache itself is currently disabled, so the key is
    #only computed, never used.
    version_string = "rspec_%s" % (rspec_version)

    #panos adding the info option to the caching key (can be improved)
    if options.get('info'):
        version_string = version_string + "_" + \
            options.get('info', 'default')

    # Adding the list_leases option to the caching key
    if options.get('list_leases'):
        version_string = version_string + "_" + \
            options.get('list_leases', 'default')

    # Adding geni_available to caching key
    if options.get('geni_available'):
        version_string = version_string + "_" + \
            str(options.get('geni_available'))

    #panos: passing user-defined options
    aggregate = SlabAggregate(self)
    rspec = aggregate.get_rspec(slice_xrn=slice_urn,
                                version=rspec_version, options=options)

    return rspec
def list_slices (self, creds, options):
    """List the urns of all the slices known to the testbed.

    :param creds: credentials (unused here).
    :param options: options (unused here).
    :returns: list of slice urns, one per record returned by GetSlices().
    """
    # get data from db (the cache lookup is disabled)
    slices = self.slab_api.GetSlices()
    logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n"
                 % (slices,))

    slice_urns = [hrn_to_urn(slab_slice['hrn'], 'slice')
                  for slab_slice in slices]
    return slice_urns
def register (self, sfa_record, hrn, pub_key):
    """
    Adding new user, slice, node or site should not be handled
    by SFA.

    Adding users = LDAP Senslab
    Adding slice = Import from LDAP users

    Nothing is registered on the testbed.

    :returns: -1.
        NOTE(review): -1 is assumed to be the "no testbed pointer" value
        the registry expects - confirm against the original file.
    """
    return -1
def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
    """No site or node record update allowed in Senslab.

    Slice records are converted with sfa_fields_to_slab_fields; user
    records have their updatable fields sent to UpdatePerson and, when a
    new key is given, every other key is deleted and the new key is added
    unless it is already there.

    :param old_sfa_record: current record; 'pointer' and 'type' are read.
    :param new_sfa_record: dict of new field values.
    :param hrn: hrn of the record.
    :param new_key: new ssh key, for user records only.
    :returns: True.
    :raises UnknownSfaType: when new_key is given for a non-user record.
    """
    pointer = old_sfa_record['pointer']
    old_sfa_record_type = old_sfa_record['type']

    # new_key implemented for users only
    if new_key and old_sfa_record_type not in ['user']:
        raise UnknownSfaType(old_sfa_record_type)

    if old_sfa_record_type == "slice":
        slab_record = self.slab_api.sfa_fields_to_slab_fields(
            old_sfa_record_type, hrn, new_sfa_record)
        #NOTE(review): UpdateSlice is assumed to be nested under the
        #'name' test - confirm against the original file.
        if 'name' in slab_record:
            slab_record.pop('name')
            #Prototype should be UpdateSlice(self,
            #auth, slice_id_or_name, slice_fields)
            #Senslab cannot update slice since slice = job
            #so we must delete and create another job
            self.slab_api.UpdateSlice(pointer, slab_record)

    elif old_sfa_record_type == "user":
        updatable_fields = ('first_name', 'last_name', 'title', 'email',
                            'password', 'phone', 'url', 'bio',
                            'accepted_aup', 'enabled')
        update_fields = dict((key, new_sfa_record[key])
                             for key in new_sfa_record
                             if key in updatable_fields)
        self.slab_api.UpdatePerson(pointer, update_fields)

        if new_key:
            # must check this key against the previous one if it exists
            persons = self.slab_api.GetPersons(['key_ids'])
            person = persons[0]
            keys = self.slab_api.GetKeys(person['key_ids'])

            # Delete all stale keys
            key_exists = False
            for key in keys:
                if new_key != key['key']:
                    self.slab_api.DeleteKey(key['key_id'])
                else:
                    key_exists = True
            if not key_exists:
                self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh',
                                                     'key': new_key})

    return True
def remove (self, sfa_record):
    """Remove a user or a slice from the testbed.

    A user is disabled in LDAP (DeletePerson) when GetPersons finds it; a
    slice is deleted (DeleteSlice) when GetSlices finds it by hrn. Other
    record types are left alone.

    :param sfa_record: record dict; 'type' and 'hrn' are read.
    :returns: True.
    """
    sfa_record_type = sfa_record['type']
    hrn = sfa_record['hrn']
    if sfa_record_type == 'user':
        #get user from senslab ldap
        person = self.slab_api.GetPersons(sfa_record)
        #No registering at a given site in Senslab.
        #Once registered to the LDAP, all senslab sites are accessible.
        #NOTE(review): the guard on `person` is assumed - confirm against
        #the original file.
        if person:
            #Mark account as disabled in ldap
            self.slab_api.DeletePerson(sfa_record)
    elif sfa_record_type == 'slice':
        if self.slab_api.GetSlices(slice_filter=hrn,
                                   slice_filter_type='slice_hrn'):
            self.slab_api.DeleteSlice(sfa_record)

    return True