4 from datetime import datetime
6 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
7 from sfa.util.sfalogging import logger
8 from sfa.storage.alchemy import dbsession
9 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
10 from sqlalchemy.orm import joinedload
13 from sfa.managers.driver import Driver
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
17 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
20 ## thierry: everything that is API-related (i.e. handling incoming requests)
22 # SlabDriver should be really only about talking to the senslab testbed
25 from sfa.senslab.OARrestapi import OARrestapi
26 from sfa.senslab.LDAPapi import LDAPapi
28 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
31 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
33 from sfa.senslab.slabslices import SlabSlices
38 # this inheritance scheme is so that the driver object can receive
39 # GetNodes or GetSites sorts of calls directly
40 # and thus minimize the differences in the managers with the pl version
43 class SlabTestbedAPI():
45 def __init__(self, config):
46 self.oar = OARrestapi()
48 self.time_format = "%Y-%m-%d %H:%M:%S"
49 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
50 self.grain = 600 # 10 mins lease
55 #TODO clean GetPeers. 05/07/12SA
57 def GetPeers ( auth = None, peer_filter=None, return_fields_list=None):
60 existing_hrns_by_types = {}
61 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
62 return_field %s " %(auth , peer_filter, return_fields_list))
63 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
65 for record in all_records:
66 existing_records[(record.hrn, record.type)] = record
67 if record.type not in existing_hrns_by_types:
68 existing_hrns_by_types[record.type] = [record.hrn]
70 existing_hrns_by_types[record.type].append(record.hrn)
73 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
74 %( existing_hrns_by_types))
79 records_list.append(existing_records[(peer_filter,'authority')])
81 for hrn in existing_hrns_by_types['authority']:
82 records_list.append(existing_records[(hrn,'authority')])
84 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
90 return_records = records_list
91 if not peer_filter and not return_fields_list:
95 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
101 #TODO : Handling OR request in make_ldap_filters_from_records
102 #instead of the for loop
103 #over the records' list
def GetPersons(self, person_filter=None):
    """Look up enabled user accounts in the senslab LDAP.

    person_filter should be a list of dictionaries when not set to None;
    each dictionary holds the attributes of one user to search for
    (usually the list contains a single user record).

    Returns the list of users found when a filter list is given, or None
    if no user matched. Without a usable filter, returns the result of
    LdapFindUser for all enabled accounts.
    """
    # One-element tuple so the formatting also works if a tuple is passed
    logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
                                                % (person_filter,))

    if person_filter and isinstance(person_filter, list):
        # Get only enabled user accounts in senslab LDAP: one lookup per
        # searched record.
        lookups = [self.ldap.LdapFindUser(searched_attributes,
                                          is_user_enabled=True)
                   for searched_attributes in person_filter]
        # Keep only the lookups that actually found a person
        person_list = [person for person in lookups if person]
        # If the list is empty, return None
        if not person_list:
            person_list = None
    else:
        # No filter: fetch every enabled account
        person_list = self.ldap.LdapFindUser(is_user_enabled=True)

    return person_list
def GetTimezone(self):
    """Ask the OAR server for its current time and timezone.

    Sends the "GET_timezone" request through the OAR parser and returns
    the resulting (timestamp, timezone) pair.
    Marked as unused (SA 16/11/12).
    """
    timezone_answer = self.oar.parser.SendRequest("GET_timezone")
    # The answer is expected to unpack into exactly two values
    oar_timestamp, oar_tz = timezone_answer
    return oar_timestamp, oar_tz
def DeleteJobs(self, job_id, username):
    """Ask OAR to delete a job on behalf of a user.

    job_id   -- identifier of the OAR job; a falsy value or -1 means
                there is nothing to delete.
    username -- user name forwarded to the OAR REST API with the request.

    Returns the answer of the 'DELETE_jobs_id' request, or None when no
    request was sent.
    """
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s username %s " %(job_id, username))
    # Compare by value: identity checks against int literals are not
    # guaranteed to work.
    if not job_id or job_id == -1:
        return

    reqdict = {'method': "delete", 'strval': str(job_id)}

    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
                                                reqdict, username)
    logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
                            username %s" %(job_id, answer, username))
    return answer
163 ##TODO : Unused GetJobsId ? SA 05/07/12
164 #def GetJobsId(self, job_id, username = None ):
166 #Details about a specific job.
167 #Includes details about submission time, jot type, state, events,
168 #owner, assigned ressources, walltime etc...
172 #node_list_k = 'assigned_network_address'
173 ##Get job info from OAR
174 #job_info = self.oar.parser.SendRequest(req, job_id, username)
176 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
178 #if job_info['state'] == 'Terminated':
179 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
182 #if job_info['state'] == 'Error':
183 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
188 #logger.error("SLABDRIVER \tGetJobsId KeyError")
191 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
193 ##Replaces the previous entry
194 ##"assigned_network_address" / "reserved_resources"
196 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
197 #del job_info[node_list_k]
198 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
def GetJobsResources(self, job_id, username = None):
    """Return the hostnames of the nodes attached to an OAR job.

    Sends "GET_jobs_id_resources" for job_id (on behalf of username) and
    converts the returned OAR node ids into hostnames.

    Returns a dictionary {'node_ids': [hostname, ...]}.
    """
    #Get job resources list from OAR
    oar_node_ids = self.oar.parser.SendRequest("GET_jobs_id_resources", \
                                                job_id, username)
    logger.debug("SLABDRIVER \t GetJobsResources %s " %(oar_node_ids))

    #The OAR ids are replaced by the matching hostnames under 'node_ids'
    return {'node_ids': \
                self.__get_hostnames_from_oar_node_ids(oar_node_ids)}
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Return the hostnames of the nodes reserved by a job.

    job_info       -- dictionary describing the job.
    node_list_name -- key of job_info under which the list of reserved
                      node hostnames is stored.

    Each entry of job_info[node_list_name] is looked up in the testbed
    node records (from GetNodes, keyed on hostname). If a key is missing,
    the error is logged and the hostnames collected so far are returned.
    """
    #Get the list of the testbed nodes records and make a
    #dictionary keyed on the hostname out of it
    node_dict = dict((node['hostname'], node) for node in self.GetNodes())

    reserved_node_hostname_list = []
    try:
        for reserved_node in job_info[node_list_name]:
            # append, not index assignment: the list starts out empty
            reserved_node_hostname_list.append(
                node_dict[reserved_node]['hostname'])

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
                    reserved_node_hostname_list %s" \
                    %(reserved_node_hostname_list))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )

    return reserved_node_hostname_list
def GetNodesCurrentlyInUse(self):
    """Return the nodes that are already involved in an OAR job.

    The result is whatever the "GET_running_jobs" request yields.
    """
    oar_parser = self.oar.parser
    return oar_parser.SendRequest("GET_running_jobs")
def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
    """Translate a list of OAR node ids into the matching hostnames.

    resource_id_list -- iterable of OAR node ids; entries equal to the
                        string "Undefined" are skipped.

    Returns the list of hostnames, in the order of resource_id_list.
    Raises KeyError if an id is not among the nodes returned by GetNodes.
    """
    #Put the full node list into a dictionary keyed by oar node id
    oar_id_node_dict = dict((node['oar_id'], node) \
                                for node in self.GetNodes())

    #Because jobs requested "asap" do not have defined resources.
    #Compare by value: two equal strings need not be the same object.
    return [oar_id_node_dict[resource_id]['hostname'] \
                for resource_id in resource_id_list \
                if resource_id != "Undefined"]
def GetReservedNodes(self, username = None):
    """Return the OAR reservations, each completed with its hostnames.

    Sends "GET_reserved_nodes" (optionally for a given username) and, for
    every reservation dictionary returned, adds a 'reserved_nodes' entry
    holding the hostnames that match its 'resource_ids'. The original
    'resource_ids' entry is kept.
    """
    #Get the nodes in use and the reserved nodes
    oar_parser = self.oar.parser
    reservations = oar_parser.SendRequest("GET_reserved_nodes", \
                                            username = username)

    for reservation in reservations:
        logger.debug ("GetReservedNodes resa %s"%(reservation))
        #list of the hostnames reserved by this job
        oar_ids = reservation['resource_ids']
        reservation['reserved_nodes'] = \
            self.__get_hostnames_from_oar_node_ids(oar_ids)

    return reservations
292 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
294 node_filter_dict : dictionnary of lists
297 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
298 node_dict_list = node_dict_by_id.values()
299 logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \
300 return_fields_list %s "%(node_filter_dict, return_fields_list))
301 #No filtering needed return the list directly
302 if not (node_filter_dict or return_fields_list):
303 return node_dict_list
305 return_node_list = []
307 for filter_key in node_filter_dict:
309 #Filter the node_dict_list by each value contained in the
310 #list node_filter_dict[filter_key]
311 for value in node_filter_dict[filter_key]:
312 for node in node_dict_list:
313 if node[filter_key] == value:
314 if return_fields_list :
316 for k in return_fields_list:
318 return_node_list.append(tmp)
320 return_node_list.append(node)
322 logger.log_exc("GetNodes KeyError")
326 return return_node_list
328 def AddSlice(slice_record, user_record):
329 """Add slice to the sfa tables. Called by verify_slice
330 during lease/sliver creation.
333 sfa_record = RegSlice(hrn=slice_record['slice_hrn'],
334 gid=slice_record['gid'],
335 pointer=slice_record['slice_id'],
336 authority=slice_record['authority'])
338 logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
339 %(sfa_record, user_record))
340 sfa_record.just_created()
341 dbsession.add(sfa_record)
343 #Update the reg-researcher dependance table
344 sfa_record.reg_researchers = [user_record]
347 #Update the senslab table with the new slice
348 #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \
349 #record_id_slice = sfa_record.record_id , \
350 #record_id_user = slice_record['record_id_user'], \
351 #peer_authority = slice_record['peer_authority'])
353 #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \
354 #slab_slice %s sfa_record %s" \
355 #%(slice_record,slab_slice, sfa_record))
356 #slab_dbsession.add(slab_slice)
357 #slab_dbsession.commit()
360 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
361 site_dict = self.oar.parser.SendRequest("GET_sites")
362 #site_dict : dict where the key is the sit ename
363 return_site_list = []
364 if not ( site_filter_name_list or return_fields_list):
365 return_site_list = site_dict.values()
366 return return_site_list
368 for site_filter_name in site_filter_name_list:
369 if site_filter_name in site_dict:
370 if return_fields_list:
371 for field in return_fields_list:
374 tmp[field] = site_dict[site_filter_name][field]
376 logger.error("GetSites KeyError %s "%(field))
378 return_site_list.append(tmp)
380 return_site_list.append( site_dict[site_filter_name])
383 return return_site_list
389 #TODO : Check rights to delete person
390 def DeletePerson(self, person_record):
391 """ Disable an existing account in senslab LDAP.
392 Users and techs can only delete themselves. PIs can only
393 delete themselves and other non-PIs at their sites.
394 ins can delete anyone.
395 Returns 1 if successful, faults otherwise.
399 #Disable user account in senslab LDAP
400 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
401 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
404 #TODO Check DeleteSlice, check rights 05/07/2012 SA
405 def DeleteSlice(self, slice_record):
406 """ Deletes the specified slice.
407 Senslab : Kill the job associated with the slice if there is one
408 using DeleteSliceFromNodes.
409 Updates the slice record in slab db to remove the slice nodes.
411 Users may only delete slices of which they are members. PIs may
412 delete any of the slices at their sites, or any slices of which
413 they are members. Admins may delete any slice.
414 Returns 1 if successful, faults otherwise.
418 self.DeleteSliceFromNodes(slice_record)
419 logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
423 def __add_person_to_db(self, user_dict):
425 check_if_exists = dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
427 if not check_if_exists:
428 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
429 _________________________________________________________________________\
430 " %(user_dict['hrn']))
431 hrn = user_dict['hrn']
432 user_record = RegUser(hrn=hrn , pointer= '-1', authority=get_authority(hrn), \
433 email=user_dict['email'], gid = None)
434 user_record.reg_keys = [RegKey(user_dict['pkey'])]
435 user_record.just_created()
436 dbsession.add (user_record)
440 #TODO AddPerson 04/07/2012 SA
441 #def AddPerson(self, auth, person_fields=None):
442 def AddPerson(self, record):#TODO fixing 28/08//2012 SA
443 """Adds a new account. Any fields specified in records are used,
444 otherwise defaults are used.
445 Accounts are disabled by default. To enable an account,
447 Returns the new person_id (> 0) if successful, faults otherwise.
451 ret = self.ldap.LdapAddUser(record)
452 logger.debug("SLABDRIVER AddPerson return code %s \r\n "%(ret))
453 self.__add_person_to_db(record)
456 #TODO AddPersonToSite 04/07/2012 SA
457 def AddPersonToSite (self, auth, person_id_or_email, \
458 site_id_or_login_base=None):
459 """ Adds the specified person to the specified site. If the person is
460 already a member of the site, no errors are returned. Does not change
461 the person's primary site.
462 Returns 1 if successful, faults otherwise.
466 logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
469 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
470 def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
471 """Grants the specified role to the person.
472 PIs can only grant the tech and user roles to users and techs at their
473 sites. Admins can grant any role to any user.
474 Returns 1 if successful, faults otherwise.
478 logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
481 #TODO AddPersonKey 04/07/2012 SA
482 def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
483 """Adds a new key to the specified account.
484 Non-admins can only modify their own keys.
485 Returns the new key_id (> 0) if successful, faults otherwise.
489 logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
def DeleteLeases(self, leases_id_list, slice_hrn ):
    """Delete every OAR job whose id is listed in leases_id_list.

    Each id is handed to DeleteJobs together with slice_hrn, which
    DeleteJobs receives as its username argument.
    """
    logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
            \r\n " %(leases_id_list, slice_hrn))
    for lease_id in leases_id_list:
        self.DeleteJobs(lease_id, slice_hrn)
504 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
505 lease_start_time, lease_duration, slice_user=None):
507 lease_dict['lease_start_time'] = lease_start_time
508 lease_dict['lease_duration'] = lease_duration
509 lease_dict['added_nodes'] = added_nodes
510 lease_dict['slice_name'] = slice_name
511 lease_dict['slice_user'] = slice_user
512 lease_dict['grain'] = self.GetLeaseGranularity()
513 lease_dict['time_format'] = self.time_format
516 def __create_job_structure_request_for_OAR(lease_dict):
517 """ Creates the structure needed for a correct POST on OAR.
518 Makes the timestamp transformation into the appropriate format.
519 Sends the POST request to create the job with the resources in
528 reqdict['workdir'] = '/tmp'
529 reqdict['resource'] = "{network_address in ("
531 for node in lease_dict['added_nodes']:
532 logger.debug("\r\n \r\n OARrestapi \t \
533 __create_job_structure_request_for_OAR node %s" %(node))
535 # Get the ID of the node
537 reqdict['resource'] += "'" + nodeid + "', "
538 nodeid_list.append(nodeid)
540 custom_length = len(reqdict['resource'])- 2
541 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
542 ")}/nodes=" + str(len(nodeid_list))
544 def __process_walltime(duration):
545 """ Calculates the walltime in seconds from the duration in H:M:S
546 specified in the RSpec.
550 # Fixing the walltime by adding a few delays.
551 # First put the walltime in seconds oarAdditionalDelay = 20;
552 # additional delay for /bin/sleep command to
553 # take in account prologue and epilogue scripts execution
554 # int walltimeAdditionalDelay = 240; additional delay
555 desired_walltime = duration
556 total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
557 sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12
559 #Put the walltime back in str form
561 walltime.append(str(total_walltime / 3600))
562 total_walltime = total_walltime - 3600 * int(walltime[0])
563 #Get the remaining minutes
564 walltime.append(str(total_walltime / 60))
565 total_walltime = total_walltime - 60 * int(walltime[1])
567 walltime.append(str(total_walltime))
570 logger.log_exc(" __process_walltime duration null")
572 return walltime, sleep_walltime
575 walltime, sleep_walltime = \
576 __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
579 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
580 ":" + str(walltime[1]) + ":" + str(walltime[2])
581 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
583 #In case of a scheduled experiment (not immediate)
584 #To run an XP immediately, don't specify date and time in RSpec
585 #They will be set to None.
586 if lease_dict['lease_start_time'] is not '0':
587 #Readable time accepted by OAR
588 start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
589 strftime(lease_dict['time_format'])
590 reqdict['reservation'] = start_time
591 #If there is not start time, Immediate XP. No need to add special
595 reqdict['type'] = "deploy"
596 reqdict['directory'] = ""
597 reqdict['name'] = "SFA_" + lease_dict['slice_user']
601 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
602 \r\n " %(slice_user))
603 #Create the request for OAR
604 reqdict = __create_job_structure_request_for_OAR(lease_dict)
605 # first step : start the OAR job and update the job
606 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
609 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
611 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
615 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
616 Impossible to create job %s " %(answer))
620 def __configure_experiment(jobid, added_nodes):
621 # second step : configure the experiment
622 # we need to store the nodes in a yaml (well...) file like this :
623 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
624 tmp_dir = '/tmp/sfa/'
625 if not os.path.exists(tmp_dir):
627 job_file = open(tmp_dir + str(jobid) + '.json', 'w')
629 job_file.write(str(added_nodes[0].strip('node')))
630 for node in added_nodes[1:len(added_nodes)] :
631 job_file.write(', '+ node.strip('node'))
636 def __launch_senslab_experiment(jobid):
637 # third step : call the senslab-experiment wrapper
638 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
639 # "+str(jobid)+" "+slice_user
640 javacmdline = "/usr/bin/java"
642 "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
644 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
645 slice_user],stdout=subprocess.PIPE).communicate()[0]
647 logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
654 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
655 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
658 #__configure_experiment(jobid, added_nodes)
659 #__launch_senslab_experiment(jobid)
664 def AddLeases(self, hostname_list, slice_record, \
665 lease_start_time, lease_duration):
666 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
667 slice_record %s lease_start_time %s lease_duration %s "\
668 %( hostname_list, slice_record , lease_start_time, \
671 #tmp = slice_record['reg-researchers'][0].split(".")
672 username = slice_record['login']
673 #username = tmp[(len(tmp)-1)]
674 job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'], \
675 lease_start_time, lease_duration, username)
676 start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
677 end_time = lease_start_time + lease_duration
679 import logging, logging.handlers
680 from sfa.util.sfalogging import _SfaLogger
681 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_record['hrn'], job_id, end_time))
682 sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG)
683 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_record['hrn']), type(job_id), type(end_time)))
685 slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], \
686 job_id = job_id, end_time= end_time)
688 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" \
690 slab_dbsession.add(slab_ex_row)
691 slab_dbsession.commit()
693 logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
698 #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
    """Delete the OAR job(s) referenced by a slice record.

    slice_record['oar_job_id'] may be a single job id or a list of job
    ids; each one is passed to DeleteJobs with slice_record['user'].
    """
    logger.debug("SLABDRIVER \t DeleteSliceFromNodese %s " %(slice_record))
    oar_job_ids = slice_record['oar_job_id']
    #Treat a lone job id like a one-element list
    if not isinstance(oar_job_ids, list):
        oar_job_ids = [oar_job_ids]
    for oar_job_id in oar_job_ids:
        self.DeleteJobs(oar_job_id, slice_record['user'])
709 def GetLeaseGranularity(self):
710 """ Returns the granularity of Senslab testbed.
711 OAR returns seconds for experiments duration.
713 Experiments which last less than 10 min are invalid"""
def update_jobs_in_slabdb( job_oar_list, jobs_psql):
    """Remove from the SenslabXP table the jobs OAR no longer knows.

    job_oar_list -- job ids currently listed by OAR.
    jobs_psql    -- job ids currently stored in the SenslabXP table.

    Ids present in jobs_psql but absent from job_oar_list are deleted
    from the table and the session is committed. Nothing is written when
    there is no such id.
    """
    jobs_psql = set(jobs_psql)
    #Jobs known on both sides are kept
    kept_jobs = set(job_oar_list).intersection(jobs_psql)
    logger.debug ( "\r\n \t\ update_jobs_in_slabdb jobs_psql %s \r\n \t \
        job_oar_list %s kept_jobs %s "%(jobs_psql, job_oar_list, kept_jobs))
    #Whatever is left in the database only has to go
    deleted_jobs = list(set(jobs_psql).difference(kept_jobs))
    if not deleted_jobs:
        return
    stale_rows = slab_dbsession.query(SenslabXP).filter(\
                                SenslabXP.job_id.in_(deleted_jobs))
    stale_rows.delete(synchronize_session='fetch')
    slab_dbsession.commit()
738 def GetLeases(self, lease_filter_dict=None, login=None):
741 unfiltered_reservation_list = self.GetReservedNodes(login)
743 reservation_list = []
744 #Find the slice associated with this user senslab ldap uid
745 logger.debug(" SLABDRIVER.PY \tGetLeases login %s\
746 unfiltered_reservation_list %s " %(login, unfiltered_reservation_list))
747 #Create user dict first to avoid looking several times for
748 #the same user in LDAP SA 27/07/12
752 jobs_psql_query = slab_dbsession.query(SenslabXP).all()
753 jobs_psql_dict = [ (row.job_id, row.__dict__ )for row in jobs_psql_query ]
754 jobs_psql_dict = dict(jobs_psql_dict)
755 logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
757 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
761 for resa in unfiltered_reservation_list:
762 logger.debug("SLABDRIVER \tGetLeases USER %s"\
764 #Cosntruct list of jobs (runing, waiting..) in oar
765 job_oar_list.append(resa['lease_id'])
766 #If there is information on the job in SLAB DB (slice used and job id)
767 if resa['lease_id'] in jobs_psql_dict:
768 job_info = jobs_psql_dict[resa['lease_id']]
769 logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
771 resa['slice_hrn'] = job_info['slice_hrn']
772 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
774 #Assume it is a senslab slice:
776 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice')
777 #if resa['user'] not in resa_user_dict:
778 #logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
779 #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
781 #ldap_info = ldap_info[0][1]
782 ##Get the backref :relationship table reg-researchers
783 #user = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(email = \
784 #ldap_info['mail'][0])
787 #user = user.__dict__
788 #slice_info = user['reg_slices_as_researcher'][0].__dict__
789 ##Separated in case user not in database :
790 ##record_id not defined SA 17/07//12
792 ##query_slice_info = slab_dbsession.query(SenslabXP).filter_by(record_id_user = user.record_id)
793 ##if query_slice_info:
794 ##slice_info = query_slice_info.first()
798 #resa_user_dict[resa['user']] = {}
799 #resa_user_dict[resa['user']]['ldap_info'] = user
800 #resa_user_dict[resa['user']]['slice_info'] = slice_info
802 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
803 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
805 resa['slice_hrn'] = Xrn(resa['slice_id']).get_hrn()
807 resa['component_id_list'] = []
808 #Transform the hostnames into urns (component ids)
809 for node in resa['reserved_nodes']:
810 #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
811 #self.root_auth, node['hostname']))
812 slab_xrn = slab_xrn_object(self.root_auth, node)
813 resa['component_id_list'].append(slab_xrn.urn)
815 if lease_filter_dict:
816 logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\
817 %(resa,lease_filter_dict))
819 if lease_filter_dict['name'] == resa['slice_hrn']:
820 reservation_list.append(resa)
822 if lease_filter_dict is None:
823 reservation_list = unfiltered_reservation_list
825 #del unfiltered_reservation_list[unfiltered_reservation_list.index(resa)]
828 self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)
830 #for resa in unfiltered_reservation_list:
834 #if resa['user'] in resa_user_dict:
835 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
836 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
838 ##resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
839 #resa['component_id_list'] = []
840 ##Transform the hostnames into urns (component ids)
841 #for node in resa['reserved_nodes']:
842 ##resa['component_id_list'].append(hostname_to_urn(self.hrn, \
843 ##self.root_auth, node['hostname']))
844 #slab_xrn = slab_xrn_object(self.root_auth, node)
845 #resa['component_id_list'].append(slab_xrn.urn)
847 ##Filter the reservation list if necessary
848 ##Returns all the leases associated with a given slice
849 #if lease_filter_dict:
850 #logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
851 #%(lease_filter_dict))
852 #for resa in unfiltered_reservation_list:
853 #if lease_filter_dict['name'] == resa['slice_hrn']:
854 #reservation_list.append(resa)
856 #reservation_list = unfiltered_reservation_list
858 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
860 return reservation_list
865 #TODO FUNCTIONS SECTION 04/07/2012 SA
867 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
870 def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
871 """ This method is a hopefully temporary hack to let the sfa correctly
872 detach the objects it creates from a remote peer object. This is
873 needed so that the sfa federation link can work in parallel with
874 RefreshPeer, as RefreshPeer depends on remote objects being correctly
877 auth : struct, API authentication structure
878 AuthMethod : string, Authentication method to use
879 object_type : string, Object type, among 'site','person','slice',
881 object_id : int, object_id
882 shortname : string, peer shortname
886 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
890 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
892 def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
893 remote_object_id=None):
894 """This method is a hopefully temporary hack to let the sfa correctly
895 attach the objects it creates to a remote peer object. This is needed
896 so that the sfa federation link can work in parallel with RefreshPeer,
897 as RefreshPeer depends on remote objects being correctly marked.
899 shortname : string, peer shortname
900 remote_object_id : int, remote object_id, set to 0 if unknown
904 logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
907 #TODO UpdateSlice 04/07/2012 SA
#Function should delete and create another job since in senslab slice=job
909 def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
910 """Updates the parameters of an existing slice with the values in
912 Users may only update slices of which they are members.
913 PIs may update any of the slices at their sites, or any slices of
914 which they are members. Admins may update any slice.
915 Only PIs and admins may update max_nodes. Slices cannot be renewed
916 (by updating the expires parameter) more than 8 weeks into the future.
917 Returns 1 if successful, faults otherwise.
921 logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
924 #TODO UpdatePerson 04/07/2012 SA
925 def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
926 """Updates a person. Only the fields specified in person_fields
927 are updated, all other fields are left untouched.
928 Users and techs can only update themselves. PIs can only update
929 themselves and other non-PIs at their sites.
930 Returns 1 if successful, faults otherwise.
934 #new_row = FederatedToSenslab(slab_hrn, federated_hrn)
935 #slab_dbsession.add(new_row)
936 #slab_dbsession.commit()
938 logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
941 #TODO GetKeys 04/07/2012 SA
942 def GetKeys(self, auth, key_filter=None, return_fields=None):
943 """Returns an array of structs containing details about keys.
944 If key_filter is specified and is an array of key identifiers,
945 or a struct of key attributes, only keys matching the filter
946 will be returned. If return_fields is specified, only the
947 specified details will be returned.
949 Admin may query all keys. Non-admins may only query their own keys.
953 logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
956 #TODO DeleteKey 04/07/2012 SA
957 def DeleteKey(self, key_id):
959 Non-admins may only delete their own keys.
960 Returns 1 if successful, faults otherwise.
964 logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
971 def _sql_get_slice_info( slice_filter ):
972 #DO NOT USE RegSlice - reg_researchers to get the hrn
973 #of the user otherwise will mess up the RegRecord in
974 #Resolve, don't know why - SA 08/08/2012
976 #Only one entry for one user = one slice in slab_xp table
977 #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
978 raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
979 #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
982 #raw_slicerec.reg_researchers
983 raw_slicerec = raw_slicerec.__dict__
984 logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s \
985 raw_slicerec %s"%(slice_filter, raw_slicerec))
986 slicerec = raw_slicerec
987 #only one researcher per slice so take the first one
988 #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
989 #del slicerec['reg_researchers']['_sa_instance_state']
def _sql_get_slice_info_from_user(slice_filter ):
    """Build a slice record starting from the registry id of one of its users.

    NOTE(review): there is no ``self`` parameter although the method is
    called as ``self._sql_get_slice_info_from_user(...)``; presumably it is
    declared as a static method on a line above this block -- confirm.

    :param slice_filter: ``record_id`` of the user (RegUser).
    :returns: a new dict holding the selected fields of the user's first
        slice, plus a ``reg_researchers`` entry holding the selected fields
        of the user; None when the user is unknown or has no slice.
    """
    #Fields copied out of the user record and out of the slice record
    user_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'email', 'pointer']
    slice_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'pointer']

    raw_slicerec = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(record_id = slice_filter).first()
    if raw_slicerec is None:
        return None

    user_attributes = raw_slicerec.__dict__
    user_slices = user_attributes['reg_slices_as_researcher']
    #A user without any slice has no record to build: report "not found"
    #instead of failing on user_slices[0].
    if not user_slices:
        return None

    #TODO Handle multiple slices for one user SA 10/12/12
    #for now only take the first slice record associated to the rec user
    first_slice_attributes = user_slices[0].__dict__
    slicerec = dict((k, first_slice_attributes[k]) \
                        for k in slice_needed_fields)
    slicerec['reg_researchers'] = dict((k, user_attributes[k]) \
                        for k in user_needed_fields)
    return slicerec
def _get_slice_records(self, slice_filter = None, \
                slice_filter_type = None):
    """Look up a single slice record using the requested kind of filter.

    :param slice_filter: the slice hrn when slice_filter_type is
        'slice_hrn', the user's record_id when it is 'record_id_user'.
    :param slice_filter_type: 'slice_hrn' or 'record_id_user'.
    :returns: the slice record dict, or None when nothing matches or when
        the filter type is not one of the two supported values (in the
        visible original that last case left the result unassigned).
    """
    if slice_filter_type == 'slice_hrn':
        #Get the slice based on the slice hrn
        slicerec = self._sql_get_slice_info(slice_filter)
    elif slice_filter_type == 'record_id_user':
        #Get slice based on user id
        slicerec = self._sql_get_slice_info_from_user(slice_filter)
    else:
        slicerec = None

    if not slicerec:
        return None
    return slicerec
# GetSlices -- slice records from the SFA registry, combined with the jobs
# (leases) currently known to the testbed.
#
# NOTE(review): this view of the file has lost its indentation and some
# short lines (slicerec_dict, for instance, is used below but never
# assigned in the visible text), so the nesting described in these comments
# is inferred -- confirm against the real file.
#
# slice_filter, slice_filter_type: only the 'slice_hrn' and
#   'record_id_user' filter types are recognised.
# login: forwarded to self.GetLeases() on the filtered path.
# Both visible return statements hand back return_slicerec_dictlist, a list
# of dicts.
1059 def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None):
1060 """ Get the slice records from the slab db.
1061 Returns a slice ditc if slice_filter and slice_filter_type
1063 Returns a list of slice dictionnaries if there are no filters
1068 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
1069 return_slicerec_dictlist = []
# ---- Filtered path: one slice, looked up by hrn or by user record id ----
1071 #First try to get information on the slice based on the filter provided
1072 if slice_filter_type in authorized_filter_types_list:
1073 fixed_slicerec_dict = \
1074 self._get_slice_records(slice_filter, slice_filter_type)
1075 #login, fixed_slicerec_dict = \
1076 #self._get_slice_records(slice_filter, slice_filter_type)
1077 logger.debug(" SLABDRIVER \tGetSlices login %s \
1078 slice record %s slice_filter %s slice_filter_type %s "\
1079 %(login, fixed_slicerec_dict,slice_filter, slice_filter_type))
1082 #Now we have the slice record fixed_slicerec_dict, get the
1083 #jobs associated to this slice
1084 #leases_list = self.GetReservedNodes(username = login)
1085 leases_list = self.GetLeases(login = login)
1086 #If no job is running or no job scheduled
1087 #return only the slice record
1088 if leases_list == [] and fixed_slicerec_dict:
1089 return_slicerec_dictlist.append(fixed_slicerec_dict)
1091 #If several jobs for one slice , put the slice record into
1092 # each lease information dict
# NOTE(review): a lease is only matched when the filter type is 'slice_hrn'
# (see the condition below); with 'record_id_user' and a non-empty lease
# list nothing visible here appends to the result -- confirm intended.
1095 for lease in leases_list :
1097 logger.debug("SLABDRIVER.PY \tGetSlices slice_filter %s \
1098 \ lease['slice_hrn'] %s" \
1099 %(slice_filter, lease['slice_hrn']))
1100 if slice_filter_type =='slice_hrn' and lease['slice_hrn'] == slice_filter:
1101 reserved_list = lease['reserved_nodes']
1102 slicerec_dict['slice_hrn'] = lease['slice_hrn']
1103 slicerec_dict['hrn'] = lease['slice_hrn']
1104 slicerec_dict['user'] = lease['user']
1105 slicerec_dict['oar_job_id'] = lease['lease_id']
1106 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
1107 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
# NOTE(review): 'oar_job_id' on the shared slice record is reset to a fresh
# one-element list for every matching lease, and the update() that follows
# then replaces the scalar lease id stored just above with that list.
1109 #Update lease dict with the slice record
1110 if fixed_slicerec_dict:
1111 fixed_slicerec_dict['oar_job_id'] = []
1112 fixed_slicerec_dict['oar_job_id'].append(slicerec_dict['oar_job_id'])
1113 slicerec_dict.update(fixed_slicerec_dict)
1114 #slicerec_dict.update({'hrn':\
1115 #str(fixed_slicerec_dict['slice_hrn'])})
1117 return_slicerec_dictlist.append(slicerec_dict)
1118 logger.debug("SLABDRIVER.PY \tGetSlices \
1119 OHOHOHOH %s" %(return_slicerec_dictlist ))
1121 logger.debug("SLABDRIVER.PY \tGetSlices \
1122 slicerec_dict %s return_slicerec_dictlist %s \
1123 lease['reserved_nodes'] \
1124 %s" %(slicerec_dict, return_slicerec_dictlist, \
1125 lease['reserved_nodes'] ))
1127 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
1128 return_slicerec_dictlist %s" \
1129 %(return_slicerec_dictlist))
1131 return return_slicerec_dictlist
# ---- Unfiltered path (presumably the else branch of the test above) ----
1135 #Get all slices from the senslab sfa database ,
1136 #put them in dict format
1137 #query_slice_list = dbsession.query(RegRecord).all()
1138 query_slice_list = dbsession.query(RegSlice).options(joinedload('reg_researchers')).all()
1139 #query_slice_list = dbsession.query(RegRecord).filter_by(type='slice').all()
1140 #query_slice_list = slab_dbsession.query(SenslabXP).all()
1141 return_slicerec_dictlist = []
# NOTE(review): tmp is the record's own __dict__, not a copy, so replacing
# tmp['reg_researchers'] rewrites that attribute on the queried object.
# The [0] also assumes every slice has at least one researcher (IndexError
# otherwise).
1142 for record in query_slice_list:
1143 tmp = record.__dict__
1144 tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
1145 #del tmp['reg_researchers']['_sa_instance_state']
1146 return_slicerec_dictlist.append(tmp)
1147 #return_slicerec_dictlist.append(record.__dict__)
1149 #Get all the jobs reserved nodes
1150 leases_list = self.GetReservedNodes()
# For local slices the owner login is taken from the hrn: second
# dot-separated component, text before the first underscore.
# NOTE(review): owner is only assigned when peer_authority is None; whether
# the lease loop below is nested under that test cannot be told from this
# view -- if it is not, owner may be stale or unbound for peer slices.
1153 for fixed_slicerec_dict in return_slicerec_dictlist:
1155 #Check if the slice belongs to a senslab user
1156 if fixed_slicerec_dict['peer_authority'] is None:
1157 owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
1160 for lease in leases_list:
1161 if owner == lease['user']:
1162 slicerec_dict['oar_job_id'] = lease['lease_id']
1164 #for reserved_node in lease['reserved_nodes']:
1165 logger.debug("SLABDRIVER.PY \tGetSlices lease %s "\
1168 reserved_list = lease['reserved_nodes']
1170 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1171 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
1172 slicerec_dict.update(fixed_slicerec_dict)
1173 #slicerec_dict.update({'hrn':\
1174 #str(fixed_slicerec_dict['slice_hrn'])})
1175 #return_slicerec_dictlist.append(slicerec_dict)
1176 fixed_slicerec_dict.update(slicerec_dict)
1178 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
1179 return_slicerec_dictlist %s \slice_filter %s " \
1180 %(return_slicerec_dictlist, slice_filter))
1182 return return_slicerec_dictlist
1188 # Convert SFA fields to PLC fields for use when registering or updating
1189 # registry record in the PLC database
1191 # @param sfa_type type of record (user, slice, ...)
1192 # @param hrn human readable name
1193 # @param record dictionary of SFA fields
1194 # @param slab_fields dictionary of PLC fields (output)
def sfa_fields_to_slab_fields(sfa_type, hrn, record):
    """Translate an SFA record into the fields Senslab keeps for it.

    Only slice records are translated; for any other type the result is an
    empty dict (nodes are added by OAR only and then imported to SFA).

    NOTE(review): there is no ``self`` parameter although the method is
    called through ``self.slab_api``; presumably it is declared as a static
    method on a line above this block -- confirm.

    :param sfa_type: type of the record ('slice', 'user', ...).
    :param hrn: human readable name of the record.
    :param record: dictionary of SFA fields.
    :returns: a new dictionary of Senslab fields.
    """
    slab_record = {}

    if sfa_type == "slice":
        #instantion used in get_slivers ?
        if "instantiation" not in slab_record:
            slab_record["instantiation"] = "senslab-instantiated"
        #No hrn_to_pl_slicename conversion: Slab's hrn is already
        #in the appropriate form SA 23/07/12
        slab_record["hrn"] = hrn
        logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
                    slab_record %s " %(slab_record['hrn']))
        #Optional fields are copied only when the SFA record carries them;
        #"url" is now guarded like "description" and "expires".
        for optional_field in ("url", "description"):
            if optional_field in record:
                slab_record[optional_field] = record[optional_field]
        if "expires" in record:
            slab_record["expires"] = int(record["expires"])

    return slab_record
def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
    """ Transforms unix timestamp into valid OAR date format

    A missing (None, empty or zero) timestamp means the experiment is to
    run immediately: the RSpec then carries no date and None is returned.
    Otherwise the timestamp is rendered with self.time_format.
    """
    #Immediate experiment: nothing to convert
    if not xp_utc_timestamp:
        return None

    #Scheduled experiment: turn the timestamp into server readable time
    scheduled_start = datetime.fromtimestamp(int(xp_utc_timestamp))
    return scheduled_start.strftime(self.time_format)
# SFA driver for the Senslab testbed; testbed calls go through the
# SlabTestbedAPI instance created in __init__ (self.slab_api).
1279 class SlabDriver(Driver):
1280 """ Senslab Driver class inherited from Driver generic class.
1282 Contains methods compliant with the SFA standard and the testbed
1283 infrastructure (calls to LDAP and OAR).
# Constructor: initialises the generic Driver, then keeps the configuration,
# this interface's hrn (config.SFA_INTERFACE_HRN), a SlabDB handle and a
# SlabTestbedAPI built from the same config.
1285 def __init__(self, config):
1286 Driver.__init__ (self, config)
1287 self.config = config
1288 self.hrn = config.SFA_INTERFACE_HRN
1290 self.db = SlabDB(config, debug = False)
1291 self.slab_api = SlabTestbedAPI(config)
def augment_records_with_testbed_info (self, record_list ):
    """ Adds specific testbed info to the records.

    Thin wrapper: the work is done by fill_record_info, whose return value
    is handed back unchanged.
    """
    augmented_records = self.fill_record_info(record_list)
    return augmented_records
# fill_record_info -- add Senslab and SFA specific fields to SFA records,
# in place.
#
# record_list: one record or a list of records; a single record is wrapped
#   into a list first. Only records whose 'type' is 'slice' or 'user' are
#   touched.
#
# NOTE(review): this view has lost its indentation and some short lines
# (the try: matching the except below, some dict entries, the return), so
# the nesting is inferred -- confirm against the real file.
# NOTE(review): "except TypeError, error:" is Python 2 only syntax.
1298 def fill_record_info(self, record_list):
1300 Given a SFA record, fill in the senslab specific and SFA specific
1301 fields in the record.
1304 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
1305 if not isinstance(record_list, list):
1306 record_list = [record_list]
1309 for record in record_list:
1310 #If the record is a SFA slice record, then add information
1311 #about the user of this slice. This kind of
1312 #information is in the Senslab's DB.
1313 if str(record['type']) == 'slice':
# When reg_researchers is still a list of objects, only the first
# researcher is kept, as its attribute dict.
# NOTE(review): [0] raises IndexError on an empty list, which the TypeError
# handler below does not cover.
1314 if 'reg_researchers' in record and \
1315 isinstance(record['reg_researchers'], list) :
1316 record['reg_researchers'] = record['reg_researchers'][0].__dict__
1317 record.update({'PI':[record['reg_researchers']['hrn']],
1318 'researcher': [record['reg_researchers']['hrn']],
1319 'name':record['hrn'],
1322 'person_ids':[record['reg_researchers']['record_id']],
1323 'geni_urn':'', #For client_helper.py compatibility
1324 'keys':'', #For client_helper.py compatibility
1325 'key_ids':''}) #For client_helper.py compatibility
1328 #Get slab slice record.
1329 recslice_list = self.slab_api.GetSlices(slice_filter = \
1330 str(record['hrn']),\
1331 slice_filter_type = 'slice_hrn')
1334 logger.debug("SLABDRIVER \tfill_record_info \
1335 TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id']\
1336 %s " %(record['hrn'], record['oar_job_id']))
# NOTE(review): if the del below is inside this loop, a second iteration
# fails with KeyError because 'reg_researchers' is already gone; node_ids
# is also overwritten on each pass, so only the last entry survives.
1338 for rec in recslice_list:
1339 logger.debug("SLABDRIVER\r\n \t fill_record_info oar_job_id %s " %(rec['oar_job_id']))
1340 del record['reg_researchers']
1341 record['node_ids'] = [ self.slab_api.root_auth + hostname for hostname in rec['node_ids']]
1345 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1346 recslice_list %s \r\n \t RECORD %s \r\n \
1347 \r\n" %(recslice_list, record))
1348 if str(record['type']) == 'user':
1349 #The record is a SFA user record.
1350 #Get the information about his slice from Senslab's DB
1351 #and add it to the user record.
1352 recslice_list = self.slab_api.GetSlices(\
1353 slice_filter = record['record_id'],\
1354 slice_filter_type = 'record_id_user')
1356 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
1357 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
1358 #Append slice record in records list,
1359 #therefore fetches user and slice info again(one more loop)
1360 #Will update PIs and researcher for the slice
1361 #recuser = dbsession.query(RegRecord).filter_by(record_id = \
1362 #recslice_list[0]['record_id_user']).first()
# NOTE(review): recslice_list[0] assumes GetSlices returned at least one
# entry; an empty list raises IndexError here.
1363 recuser = recslice_list[0]['reg_researchers']
1364 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
1365 recuser %s \r\n \r\n" %(recuser))
1367 recslice = recslice_list[0]
1368 recslice.update({'PI':[recuser['hrn']],
1369 'researcher': [recuser['hrn']],
1370 'name':record['hrn'],
1373 'person_ids':[recuser['record_id']]})
# recslice is recslice_list[0] itself, so the first pass appends that
# entry's own job id to its own 'oar_job_id' value.
1375 for rec in recslice_list:
1376 recslice['oar_job_id'].append(rec['oar_job_id'])
1380 recslice.update({'type':'slice', \
1381 'hrn':recslice_list[0]['hrn']})
1384 #GetPersons takes [] as filters
1385 user_slab = self.slab_api.GetPersons([record])
1388 record.update(user_slab[0])
1389 #For client_helper.py compatibility
1390 record.update( { 'geni_urn':'',
# The slice record is appended to the list being iterated, so the same
# loop processes it later (see the "one more loop" comment above).
1393 record_list.append(recslice)
1395 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1396 INFO TO USER records %s" %(record_list))
1398 logger.debug("SLABDRIVER.PY \tfill_record_info END \
1399 record %s \r\n \r\n " %(record))
# Only TypeError is caught and logged; other exceptions propagate.
1401 except TypeError, error:
1402 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
1404 #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ")
# sliver_status -- build the status structure for the slice slice_hrn
# (see the GENI AM API v2 link in the docstring).
#
# slice_urn: stored as result['geni_urn'] and used to build each sliver urn.
# slice_hrn: used to look the slice up through self.slab_api.GetSlices().
# Raises SliverDoesNotExist when GetSlices returns nothing.
#
# NOTE(review): this view has lost its indentation and some short lines
# (result, resources and res are used but never initialised in the visible
# text, and no return is visible) -- confirm against the real file.
1409 def sliver_status(self, slice_urn, slice_hrn):
1410 """Receive a status request for slice named urn/hrn
1411 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
1412 shall return a structure as described in
1413 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
1414 NT : not sure if we should implement this or not, but used by sface.
1418 #First get the slice with the slice hrn
1419 slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
1420 slice_filter_type = 'slice_hrn')
# NOTE(review): "is 0" tests identity, not equality; it only works because
# CPython caches small integers. "== 0" or "not slice_list" is the safe form.
1422 if len(slice_list) is 0:
1423 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
1425 #Used for fetching the user info witch comes along the slice info
1426 one_slice = slice_list[0]
1429 #Make a list of all the nodes hostnames in use for this slice
1430 slice_nodes_list = []
1431 #for single_slice in slice_list:
1432 #for node in single_slice['node_ids']:
1433 #slice_nodes_list.append(node['hostname'])
# NOTE(review): one_slice is indexed like a dict further down
# (one_slice['reg_researchers']), so this loop walks its keys and
# node['hostname'] would index a string -- looks like it should walk
# one_slice['node_ids'] as in the commented-out version above; confirm.
1434 for node in one_slice:
1435 slice_nodes_list.append(node['hostname'])
1437 #Get all the corresponding nodes details
1438 nodes_all = self.slab_api.GetNodes({'hostname':slice_nodes_list},
1439 ['node_id', 'hostname','site','boot_state'])
1440 nodeall_byhostname = dict([(one_node['hostname'], one_node) \
1441 for one_node in nodes_all])
# One status structure is built per entry of slice_list; the login always
# comes from the first entry (one_slice).
1445 for single_slice in slice_list:
1448 top_level_status = 'empty'
1451 ['geni_urn','pl_login','geni_status','geni_resources'], None)
1452 result['pl_login'] = one_slice['reg_researchers']['hrn']
1453 logger.debug("Slabdriver - sliver_status Sliver status \
1454 urn %s hrn %s single_slice %s \r\n " \
1455 %(slice_urn, slice_hrn, single_slice))
# Without 'node_ids' the slice has no job: status stays 'empty' and the
# resource list is empty.
1457 if 'node_ids' not in single_slice:
1458 #No job in the slice
1459 result['geni_status'] = top_level_status
1460 result['geni_resources'] = []
1463 top_level_status = 'ready'
1465 #A job is running on Senslab for this slice
1466 # report about the local nodes that are in the slice only
1468 result['geni_urn'] = slice_urn
1472 #timestamp = float(sl['startTime']) + float(sl['walltime'])
1473 #result['pl_expires'] = strftime(self.time_format, \
1474 #gmtime(float(timestamp)))
1475 #result['slab_expires'] = strftime(self.time_format,\
1476 #gmtime(float(timestamp)))
# Per node: 'ready' when its boot_state is 'Alive', otherwise 'failed',
# which also turns the top level status to 'failed'.
# NOTE(review): node is indexed with 'hostname' here, so node_ids is
# expected to hold dicts -- confirm against what GetSlices stores there.
1479 for node in single_slice['node_ids']:
1481 #res['slab_hostname'] = node['hostname']
1482 #res['slab_boot_state'] = node['boot_state']
1484 res['pl_hostname'] = node['hostname']
1485 res['pl_boot_state'] = \
1486 nodeall_byhostname[node['hostname']]['boot_state']
1487 #res['pl_last_contact'] = strftime(self.time_format, \
1488 #gmtime(float(timestamp)))
1489 sliver_id = Xrn(slice_urn, type='slice', \
1490 id=nodeall_byhostname[node['hostname']]['node_id'], \
1491 authority=self.hrn).urn
1493 res['geni_urn'] = sliver_id
1494 node_name = node['hostname']
1495 if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
1497 res['geni_status'] = 'ready'
1499 res['geni_status'] = 'failed'
1500 top_level_status = 'failed'
1502 res['geni_error'] = ''
1504 resources.append(res)
1506 result['geni_status'] = top_level_status
1507 result['geni_resources'] = resources
1508 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
def get_user_record( hrn):
    """ Returns the user record based on the hrn from the SFA DB

    The lookup is done on RegRecord by hrn only; the first match is
    returned, or None when there is none.
    """
    records_with_hrn = dbsession.query(RegRecord).filter_by(hrn = hrn)
    return records_with_hrn.first()
# Name of the testbed; aggregate_version() stores the value returned here
# under the 'testbed' key.
# NOTE(review): the body of this method is not visible in this view.
1518 def testbed_name (self):
# aggregate_version -- collect the RSpec versions this aggregate supports.
# Every version known to VersionManager is sorted by content_type: '*' and
# 'ad' go to the advertisement list, '*' and 'request' to the request list,
# so a '*' version lands in both. Each entry is the version's to_dict().
# NOTE(review): the lines opening and closing the dict built at the end
# (and returning it) are not visible in this view.
1521 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
1522 def aggregate_version (self):
1523 version_manager = VersionManager()
1524 ad_rspec_versions = []
1525 request_rspec_versions = []
1526 for rspec_version in version_manager.versions:
1527 if rspec_version.content_type in ['*', 'ad']:
1528 ad_rspec_versions.append(rspec_version.to_dict())
1529 if rspec_version.content_type in ['*', 'request']:
1530 request_rspec_versions.append(rspec_version.to_dict())
1532 'testbed':self.testbed_name(),
1533 'geni_request_rspec_versions': request_rspec_versions,
1534 'geni_ad_rspec_versions': ad_rspec_versions,
# create_sliver -- handle a sliver creation request for slice_hrn from a
# request RSpec: verify the slice and its users, group the requested
# leases into jobs, then return the RSpec built by SlabAggregate.
#
# slice_urn, slice_hrn: identify the slice.
# creds: credentials; the visible test suggests a single credential is
#   wrapped into a list.
# rspec_string: request RSpec, parsed with RSpec().
# users, options: used in the body; the signature line declaring them is
#   not visible in this view.
#
# NOTE(review): this view has lost its indentation and some short lines,
# so the nesting is inferred -- confirm against the real file.
1538 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
1540 aggregate = SlabAggregate(self)
1542 slices = SlabSlices(self)
1543 peer = slices.get_peer(slice_hrn)
1544 sfa_peer = slices.get_sfa_peer(slice_hrn)
1547 if not isinstance(creds, list):
1551 slice_record = users[0].get('slice_record', {})
1552 logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
1553 creds %s \r\n \r\n users %s" \
# The slice record's 'user' entry is built from the first user only.
# NOTE(review): the key used here is 'reg-researchers' (hyphen) while the
# rest of this file uses 'reg_researchers' -- confirm which one the
# caller-supplied slice_record really carries.
1555 slice_record['user'] = {'keys':users[0]['keys'], \
1556 'email':users[0]['email'], \
1557 'hrn':slice_record['reg-researchers'][0]}
1559 rspec = RSpec(rspec_string)
1560 logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
1561 %s slice_record %s users %s" \
1562 %(rspec.version,slice_record, users))
1565 # ensure site record exists?
1566 # ensure slice record exists
1567 #Removed options to verify_slice SA 14/08/12
1568 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
1571 # ensure person records exists
1572 #verify_persons returns added persons but since the return value
1574 slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
1575 sfa_peer, options=options)
1576 #requested_attributes returned by rspec.version.get_slice_attributes()
1577 #unused, removed SA 13/08/12
1578 rspec.version.get_slice_attributes()
1580 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
1582 # add/remove slice from nodes
# NOTE(review): "is" compares object identity; for two equal strings held
# in different objects the test is False, so requested_slivers may come out
# empty. It is only logged in the visible code. "==" is presumably meant.
1584 requested_slivers = [node.get('component_id') \
1585 for node in rspec.version.get_nodes_with_slivers()\
1586 if node.get('authority_id') is self.slab_api.root_auth]
1587 l = [ node for node in rspec.version.get_nodes_with_slivers() ]
1588 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
1589 requested_slivers %s listnodes %s" \
1590 %(requested_slivers,l))
1591 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
1592 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
1595 requested_lease_list = []
# Keep only new leases (no lease_id yet) on nodes of this authority whose
# duration, converted to seconds by *60 (so presumably given in minutes),
# is strictly greater than the value returned by GetLeaseGranularity().
1599 for lease in rspec.version.get_leases():
1600 single_requested_lease = {}
1601 logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
1603 if not lease.get('lease_id'):
1604 if get_authority(lease['component_id']) == self.slab_api.root_auth:
1605 single_requested_lease['hostname'] = \
1606 slab_xrn_to_hostname(\
1607 lease.get('component_id').strip())
1608 single_requested_lease['start_time'] = \
1609 lease.get('start_time')
1610 single_requested_lease['duration'] = lease.get('duration')
1611 #Check the experiment's duration is valid before adding
1612 #the lease to the requested leases list
1613 duration_in_seconds = \
1614 int(single_requested_lease['duration'])*60
1615 if duration_in_seconds > self.slab_api.GetLeaseGranularity():
1616 requested_lease_list.append(single_requested_lease)
1618 #Create dict of leases by start_time, regrouping nodes reserved
1620 #time, for the same amount of time = one job on OAR
1621 requested_job_dict = {}
# The first lease seen for a start time becomes the job entry, with its
# hostname turned into a list; later leases with the same start time and
# the same duration only add their hostname to that list.
1622 for lease in requested_lease_list:
1624 #In case it is an asap experiment start_time is empty
1625 if lease['start_time'] == '':
1626 lease['start_time'] = '0'
1628 if lease['start_time'] not in requested_job_dict:
1629 if isinstance(lease['hostname'], str):
1630 lease['hostname'] = [lease['hostname']]
1632 requested_job_dict[lease['start_time']] = lease
1635 job_lease = requested_job_dict[lease['start_time']]
# NOTE(review): nothing visible handles a lease that shares the start time
# but has a different duration -- it appears to be dropped; confirm.
1636 if lease['duration'] == job_lease['duration'] :
1637 job_lease['hostname'].append(lease['hostname'])
1642 logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\
1643 %(requested_job_dict))
1644 #verify_slice_leases returns the leases , but the return value is unused
1645 #here. Removed SA 13/08/12
1646 slices.verify_slice_leases(sfa_slice, \
1647 requested_job_dict, peer)
1649 return aggregate.get_rspec(slice_xrn=slice_urn, \
1650 login=sfa_slice['login'], version=rspec.version)
# delete_sliver -- remove the slice from the nodes it holds.
# Looks the slice up by hrn through self.slab_api.GetSlices() and calls
# self.slab_api.DeleteSliceFromNodes() for every entry returned.
# creds and options are not used in the visible lines.
# NOTE(review): what happens when no slice is found, and the value
# returned, are on lines not visible in this view.
1653 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
1655 sfa_slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
1656 slice_filter_type = 'slice_hrn')
1658 if not sfa_slice_list:
1661 #Delete all in the slice
1662 for sfa_slice in sfa_slice_list:
1665 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
1666 slices = SlabSlices(self)
1667 # determine if this is a peer slice
1669 peer = slices.get_peer(slice_hrn)
1670 #TODO delete_sliver SA : UnBindObjectFromPeer should be
1671 #used when there is another
1672 #senslab testbed, which is not the case 14/08/12 .
1674 logger.debug("SLABDRIVER.PY delete_sliver peer %s \r\n \t sfa_slice %s " %(peer, sfa_slice))
1677 #self.slab_api.UnBindObjectFromPeer('slice', \
1678 #sfa_slice['record_id_slice'], \
# peer is computed and logged above but, in the visible lines, not used for
# the deletion itself (the peer bind/unbind calls are commented out).
1680 self.slab_api.DeleteSliceFromNodes(sfa_slice)
1686 #self.slab_api.BindObjectToPeer('slice', \
1687 #sfa_slice['record_id_slice'], \
1688 #peer, sfa_slice['peer_slice_id'])
# list_resources -- build the RSpec for a slice, or for resource discovery
# when no slice is given, by delegating to SlabAggregate.get_rspec().
# options: 'geni_rspec_version' selects the RSpec version; 'info',
#   'list_leases' and 'geni_available' only feed version_string.
# creds and slice_hrn are not used in the visible lines.
# NOTE(review): the caching code is commented out, so version_string is
# built but not used in the visible lines. The assignment of rspec_version
# and the return statement are on lines not visible in this view.
1692 # first 2 args are None in case of resource discovery
1693 def list_resources (self, slice_urn, slice_hrn, creds, options):
1694 #cached_requested = options.get('cached', True)
1696 version_manager = VersionManager()
1697 # get the rspec's return format from options
1699 version_manager.get_version(options.get('geni_rspec_version'))
1700 version_string = "rspec_%s" % (rspec_version)
1702 #panos adding the info option to the caching key (can be improved)
1703 if options.get('info'):
1704 version_string = version_string + "_" + \
1705 options.get('info', 'default')
1707 # Adding the list_leases option to the caching key
1708 if options.get('list_leases'):
1709 version_string = version_string + "_"+options.get('list_leases', 'default')
1711 # Adding geni_available to caching key
1712 if options.get('geni_available'):
1713 version_string = version_string + "_" + str(options.get('geni_available'))
1715 # look in cache first
1716 #if cached_requested and self.cache and not slice_hrn:
1717 #rspec = self.cache.get(version_string)
1719 #logger.debug("SlabDriver.ListResources: \
1720 #returning cached advertisement")
1723 #panos: passing user-defined options
1724 aggregate = SlabAggregate(self)
1725 #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
1726 #options.update({'origin_hrn':origin_hrn})
1727 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
1728 version=rspec_version, options=options)
1731 #if self.cache and not slice_hrn:
1732 #logger.debug("Slab.ListResources: stores advertisement in cache")
1733 #self.cache.add(version_string, rspec)
# list_slices -- list the slices known to the testbed as slice urns.
# Takes every record returned by self.slab_api.GetSlices() (no filter),
# reads its 'hrn' and converts it with hrn_to_urn(hrn, 'slice').
# creds and options are not used in the visible lines; the cache code is
# commented out.
# NOTE(review): the return statement is not visible in this view;
# presumably slice_urns is returned -- confirm.
1738 def list_slices (self, creds, options):
1739 # look in cache first
1741 #slices = self.cache.get('slices')
1743 #logger.debug("PlDriver.list_slices returns from cache")
1748 slices = self.slab_api.GetSlices()
1749 logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
1750 slice_hrns = [slab_slice['hrn'] for slab_slice in slices]
1752 slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
1753 for slice_hrn in slice_hrns]
1757 #logger.debug ("SlabDriver.list_slices stores value in cache")
1758 #self.cache.add('slices', slice_urns)
# register -- registry hook for adding a record.
# Per the docstring text below, users are added through the Senslab LDAP
# and slices are imported from the LDAP users.
# NOTE(review): no executable statement of this method is visible in this
# view, so its return value cannot be documented from here.
1763 def register (self, sfa_record, hrn, pub_key):
1765 Adding new user, slice, node or site should not be handled
1769 Adding users = LDAP Senslab
1770 Adding slice = Import from LDAP users
# update -- apply a modified SFA record to the testbed.
# old_sfa_record: current record; its 'pointer' and 'type' drive the update.
# new_sfa_record: record holding the new field values.
# hrn: passed to sfa_fields_to_slab_fields for slice records.
# new_key: new key; only accepted for 'user' records, otherwise
#   UnknownSfaType is raised.
# NOTE(review): this view has lost its indentation and some short lines
# (update_fields and person are used but never assigned in the visible
# text; no return is visible) -- confirm against the real file.
1776 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
1777 """No site or node record update allowed in Senslab."""
1779 pointer = old_sfa_record['pointer']
1780 old_sfa_record_type = old_sfa_record['type']
1782 # new_key implemented for users only
1783 if new_key and old_sfa_record_type not in [ 'user' ]:
1784 raise UnknownSfaType(old_sfa_record_type)
1786 #if (type == "authority"):
1787 #self.shell.UpdateSite(pointer, new_sfa_record)
# Slice: translate the new record into Senslab fields, drop 'name' if
# present, then hand the result to UpdateSlice.
1789 if old_sfa_record_type == "slice":
1790 slab_record = self.slab_api.sfa_fields_to_slab_fields(old_sfa_record_type, \
1791 hrn, new_sfa_record)
1792 if 'name' in slab_record:
1793 slab_record.pop('name')
1794 #Prototype should be UpdateSlice(self,
1795 #auth, slice_id_or_name, slice_fields)
1796 #Senslab cannot update slice since slice = job
1797 #so we must delete and create another job
1798 self.slab_api.UpdateSlice(pointer, slab_record)
# User: only the whitelisted fields of the new record are forwarded to
# UpdatePerson.
1800 elif old_sfa_record_type == "user":
1802 all_fields = new_sfa_record
1803 for key in all_fields.keys():
1804 if key in ['first_name', 'last_name', 'title', 'email',
1805 'password', 'phone', 'url', 'bio', 'accepted_aup',
1807 update_fields[key] = all_fields[key]
1808 self.slab_api.UpdatePerson(pointer, update_fields)
# Key handling: existing keys that differ from new_key are deleted; the
# visible lines then end with a call to AddPersonKey.
# NOTE(review): GetPersons is given ['key_ids'] here, whereas
# fill_record_info passes it a list of records as filter -- confirm which
# argument this call is meant to fill.
1811 # must check this key against the previous one if it exists
1812 persons = self.slab_api.GetPersons(['key_ids'])
1814 keys = person['key_ids']
1815 keys = self.slab_api.GetKeys(person['key_ids'])
1817 # Delete all stale keys
1820 if new_key != key['key']:
1821 self.slab_api.DeleteKey(key['key_id'])
1825 self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh', \
# remove -- delete the testbed side of an SFA record.
# sfa_record: record to remove; its 'type' selects the action and its 'hrn'
#   is used to look a slice up.
#   'user'  -> the person is fetched with GetPersons, then DeletePerson is
#              called (marks the account as disabled in LDAP, per the
#              comment below).
#   'slice' -> DeleteSlice is called only when GetSlices finds the slice.
# NOTE(review): this view may end before the method does and short lines
# are missing; the return value is not visible here.
1832 def remove (self, sfa_record):
1833 sfa_record_type = sfa_record['type']
1834 hrn = sfa_record['hrn']
1835 if sfa_record_type == 'user':
1837 #get user from senslab ldap
1838 person = self.slab_api.GetPersons(sfa_record)
1839 #No registering at a given site in Senslab.
1840 #Once registered to the LDAP, all senslab sites are
1843 #Mark account as disabled in ldap
1844 self.slab_api.DeletePerson(sfa_record)
1845 elif sfa_record_type == 'slice':
1846 if self.slab_api.GetSlices(slice_filter = hrn, \
1847 slice_filter_type = 'slice_hrn'):
1848 self.slab_api.DeleteSlice(sfa_record)
1850 #elif type == 'authority':
1851 #if self.GetSites(pointer):
1852 #self.DeleteSite(pointer)