4 from datetime import datetime
6 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
7 from sfa.util.sfalogging import logger
8 from sfa.storage.alchemy import dbsession
9 from sfa.storage.model import RegRecord, RegUser, RegSlice, RegKey
10 from sqlalchemy.orm import joinedload
13 from sfa.managers.driver import Driver
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
17 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
20 ## thierry: everything that is API-related (i.e. handling incoming requests)
22 # SlabDriver should be really only about talking to the senslab testbed
25 from sfa.senslab.OARrestapi import OARrestapi
26 from sfa.senslab.LDAPapi import LDAPapi
28 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
31 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
33 from sfa.senslab.slabslices import SlabSlices
38 # this inheritance scheme is so that the driver object can receive
39 # GetNodes or GetSites sorts of calls directly
40 # and thus minimize the differences in the managers with the pl version
43 class SlabTestbedAPI():
45 def __init__(self, config):
46 self.oar = OARrestapi()
48 self.time_format = "%Y-%m-%d %H:%M:%S"
49 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
50 self.grain = 600 # 10 mins lease
55 #TODO clean GetPeers. 05/07/12SA
57 def GetPeers ( auth = None, peer_filter=None, return_fields_list=None):
60 existing_hrns_by_types = {}
61 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
62 return_field %s " %(auth , peer_filter, return_fields_list))
63 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
65 for record in all_records:
66 existing_records[(record.hrn, record.type)] = record
67 if record.type not in existing_hrns_by_types:
68 existing_hrns_by_types[record.type] = [record.hrn]
70 existing_hrns_by_types[record.type].append(record.hrn)
73 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
74 %( existing_hrns_by_types))
79 records_list.append(existing_records[(peer_filter,'authority')])
81 for hrn in existing_hrns_by_types['authority']:
82 records_list.append(existing_records[(hrn,'authority')])
84 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
90 return_records = records_list
91 if not peer_filter and not return_fields_list:
95 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
101 #TODO : Handling OR request in make_ldap_filters_from_records
102 #instead of the for loop
103 #over the records' list
104 def GetPersons(self, person_filter=None):
106 person_filter should be a list of dictionnaries when not set to None.
107 Returns a list of users whose accounts are enabled found in ldap.
110 logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
113 if person_filter and isinstance(person_filter, list):
114 #If we are looking for a list of users (list of dict records)
115 #Usually the list contains only one user record
116 for searched_attributes in person_filter:
118 #Get only enabled user accounts in senslab LDAP :
119 #add a filter for make_ldap_filters_from_record
120 person = self.ldap.LdapFindUser(searched_attributes, \
121 is_user_enabled=True)
122 #If a person was found, append it to the list
124 person_list.append(person)
126 #If the list is empty, return None
127 if len(person_list) is 0:
131 #Get only enabled user accounts in senslab LDAP :
132 #add a filter for make_ldap_filters_from_record
133 person_list = self.ldap.LdapFindUser(is_user_enabled=True)
137 def GetTimezone(self):
138 """ Get the OAR servier time and timezone.
139 Unused SA 16/11/12"""
140 server_timestamp, server_tz = self.oar.parser.\
141 SendRequest("GET_timezone")
142 return server_timestamp, server_tz
145 def DeleteJobs(self, job_id, slice_hrn):
146 if not job_id or job_id is -1:
148 username = slice_hrn.split(".")[-1].rstrip("_slice")
150 reqdict['method'] = "delete"
151 reqdict['strval'] = str(job_id)
154 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
156 logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
157 username %s" %(job_id, answer, username))
162 ##TODO : Unused GetJobsId ? SA 05/07/12
163 #def GetJobsId(self, job_id, username = None ):
165 #Details about a specific job.
166 #Includes details about submission time, jot type, state, events,
167 #owner, assigned ressources, walltime etc...
171 #node_list_k = 'assigned_network_address'
172 ##Get job info from OAR
173 #job_info = self.oar.parser.SendRequest(req, job_id, username)
175 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
177 #if job_info['state'] == 'Terminated':
178 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
181 #if job_info['state'] == 'Error':
182 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
187 #logger.error("SLABDRIVER \tGetJobsId KeyError")
190 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
192 ##Replaces the previous entry
193 ##"assigned_network_address" / "reserved_resources"
195 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
196 #del job_info[node_list_k]
197 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
201 def GetJobsResources(self, job_id, username = None):
202 #job_resources=['reserved_resources', 'assigned_resources',\
203 #'job_id', 'job_uri', 'assigned_nodes',\
205 #assigned_res = ['resource_id', 'resource_uri']
206 #assigned_n = ['node', 'node_uri']
208 req = "GET_jobs_id_resources"
211 #Get job resources list from OAR
212 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
213 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
216 self.__get_hostnames_from_oar_node_ids(node_id_list)
219 #Replaces the previous entry "assigned_network_address" /
220 #"reserved_resources"
222 job_info = {'node_ids': hostname_list}
227 def get_info_on_reserved_nodes(self, job_info, node_list_name):
228 #Get the list of the testbed nodes records and make a
229 #dictionnary keyed on the hostname out of it
230 node_list_dict = self.GetNodes()
231 #node_hostname_list = []
232 node_hostname_list = [node['hostname'] for node in node_list_dict]
233 #for node in node_list_dict:
234 #node_hostname_list.append(node['hostname'])
235 node_dict = dict(zip(node_hostname_list, node_list_dict))
237 reserved_node_hostname_list = []
238 for index in range(len(job_info[node_list_name])):
239 #job_info[node_list_name][k] =
240 reserved_node_hostname_list[index] = \
241 node_dict[job_info[node_list_name][index]]['hostname']
243 logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
244 reserved_node_hostname_list %s" \
245 %(reserved_node_hostname_list))
247 logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
249 return reserved_node_hostname_list
251 def GetNodesCurrentlyInUse(self):
252 """Returns a list of all the nodes already involved in an oar job"""
253 return self.oar.parser.SendRequest("GET_running_jobs")
255 def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
256 full_nodes_dict_list = self.GetNodes()
257 #Put the full node list into a dictionary keyed by oar node id
258 oar_id_node_dict = {}
259 for node in full_nodes_dict_list:
260 oar_id_node_dict[node['oar_id']] = node
262 #logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\
263 #oar_id_node_dict %s" %(oar_id_node_dict))
265 hostname_dict_list = []
266 for resource_id in resource_id_list:
267 #Because jobs requested "asap" do not have defined resources
268 if resource_id is not "Undefined":
269 hostname_dict_list.append(\
270 oar_id_node_dict[resource_id]['hostname'])
272 #hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
273 return hostname_dict_list
275 def GetReservedNodes(self, username = None):
276 #Get the nodes in use and the reserved nodes
277 reservation_dict_list = \
278 self.oar.parser.SendRequest("GET_reserved_nodes", \
282 for resa in reservation_dict_list:
283 logger.debug ("GetReservedNodes resa %s"%(resa))
284 #dict list of hostnames and their site
285 resa['reserved_nodes'] = \
286 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
288 #del resa['resource_ids']
289 return reservation_dict_list
291 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
293 node_filter_dict : dictionnary of lists
296 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
297 node_dict_list = node_dict_by_id.values()
298 logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \
299 return_fields_list %s "%(node_filter_dict, return_fields_list))
300 #No filtering needed return the list directly
301 if not (node_filter_dict or return_fields_list):
302 return node_dict_list
304 return_node_list = []
306 for filter_key in node_filter_dict:
308 #Filter the node_dict_list by each value contained in the
309 #list node_filter_dict[filter_key]
310 for value in node_filter_dict[filter_key]:
311 for node in node_dict_list:
312 if node[filter_key] == value:
313 if return_fields_list :
315 for k in return_fields_list:
317 return_node_list.append(tmp)
319 return_node_list.append(node)
321 logger.log_exc("GetNodes KeyError")
325 return return_node_list
327 def AddSlice(slice_record, user_record):
328 """Add slice to the sfa tables. Called by verify_slice
329 during lease/sliver creation.
332 sfa_record = RegSlice(hrn=slice_record['slice_hrn'],
333 gid=slice_record['gid'],
334 pointer=slice_record['slice_id'],
335 authority=slice_record['authority'])
337 logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
338 %(sfa_record, user_record))
339 sfa_record.just_created()
340 dbsession.add(sfa_record)
342 #Update the reg-researcher dependance table
343 sfa_record.reg_researchers = [user_record]
346 #Update the senslab table with the new slice
347 #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \
348 #record_id_slice = sfa_record.record_id , \
349 #record_id_user = slice_record['record_id_user'], \
350 #peer_authority = slice_record['peer_authority'])
352 #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \
353 #slab_slice %s sfa_record %s" \
354 #%(slice_record,slab_slice, sfa_record))
355 #slab_dbsession.add(slab_slice)
356 #slab_dbsession.commit()
359 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
360 site_dict = self.oar.parser.SendRequest("GET_sites")
361 #site_dict : dict where the key is the sit ename
362 return_site_list = []
363 if not ( site_filter_name_list or return_fields_list):
364 return_site_list = site_dict.values()
365 return return_site_list
367 for site_filter_name in site_filter_name_list:
368 if site_filter_name in site_dict:
369 if return_fields_list:
370 for field in return_fields_list:
373 tmp[field] = site_dict[site_filter_name][field]
375 logger.error("GetSites KeyError %s "%(field))
377 return_site_list.append(tmp)
379 return_site_list.append( site_dict[site_filter_name])
382 return return_site_list
388 #TODO : Check rights to delete person
389 def DeletePerson(self, person_record):
390 """ Disable an existing account in senslab LDAP.
391 Users and techs can only delete themselves. PIs can only
392 delete themselves and other non-PIs at their sites.
393 ins can delete anyone.
394 Returns 1 if successful, faults otherwise.
398 #Disable user account in senslab LDAP
399 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
400 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
403 #TODO Check DeleteSlice, check rights 05/07/2012 SA
404 def DeleteSlice(self, slice_record):
405 """ Deletes the specified slice.
406 Senslab : Kill the job associated with the slice if there is one
407 using DeleteSliceFromNodes.
408 Updates the slice record in slab db to remove the slice nodes.
410 Users may only delete slices of which they are members. PIs may
411 delete any of the slices at their sites, or any slices of which
412 they are members. Admins may delete any slice.
413 Returns 1 if successful, faults otherwise.
417 self.DeleteSliceFromNodes(slice_record)
418 logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
422 def __add_person_to_db(self, user_dict):
424 check_if_exists = dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
426 if not check_if_exists:
427 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
428 _________________________________________________________________________\
429 " %(user_dict['hrn']))
430 hrn = user_dict['hrn']
431 user_record = RegUser(hrn=hrn , pointer= '-1', authority=get_authority(hrn), \
432 email=user_dict['email'], gid = None)
433 user_record.reg_keys = [RegKey(user_dict['pkey'])]
434 user_record.just_created()
435 dbsession.add (user_record)
439 #TODO AddPerson 04/07/2012 SA
440 #def AddPerson(self, auth, person_fields=None):
441 def AddPerson(self, record):#TODO fixing 28/08//2012 SA
442 """Adds a new account. Any fields specified in records are used,
443 otherwise defaults are used.
444 Accounts are disabled by default. To enable an account,
446 Returns the new person_id (> 0) if successful, faults otherwise.
450 ret = self.ldap.LdapAddUser(record)
451 logger.debug("SLABDRIVER AddPerson return code %s \r\n "%(ret))
452 self.__add_person_to_db(record)
455 #TODO AddPersonToSite 04/07/2012 SA
456 def AddPersonToSite (self, auth, person_id_or_email, \
457 site_id_or_login_base=None):
458 """ Adds the specified person to the specified site. If the person is
459 already a member of the site, no errors are returned. Does not change
460 the person's primary site.
461 Returns 1 if successful, faults otherwise.
465 logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
468 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
469 def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
470 """Grants the specified role to the person.
471 PIs can only grant the tech and user roles to users and techs at their
472 sites. Admins can grant any role to any user.
473 Returns 1 if successful, faults otherwise.
477 logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
480 #TODO AddPersonKey 04/07/2012 SA
481 def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
482 """Adds a new key to the specified account.
483 Non-admins can only modify their own keys.
484 Returns the new key_id (> 0) if successful, faults otherwise.
488 logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
491 def DeleteLeases(self, leases_id_list, slice_hrn ):
492 logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
493 \r\n " %(leases_id_list, slice_hrn))
494 for job_id in leases_id_list:
495 self.DeleteJobs(job_id, slice_hrn)
503 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
504 lease_start_time, lease_duration, slice_user=None):
506 lease_dict['lease_start_time'] = lease_start_time
507 lease_dict['lease_duration'] = lease_duration
508 lease_dict['added_nodes'] = added_nodes
509 lease_dict['slice_name'] = slice_name
510 lease_dict['slice_user'] = slice_user
511 lease_dict['grain'] = self.GetLeaseGranularity()
512 lease_dict['time_format'] = self.time_format
515 def __create_job_structure_request_for_OAR(lease_dict):
516 """ Creates the structure needed for a correct POST on OAR.
517 Makes the timestamp transformation into the appropriate format.
518 Sends the POST request to create the job with the resources in
527 reqdict['workdir'] = '/tmp'
528 reqdict['resource'] = "{network_address in ("
530 for node in lease_dict['added_nodes']:
531 logger.debug("\r\n \r\n OARrestapi \t \
532 __create_job_structure_request_for_OAR node %s" %(node))
534 # Get the ID of the node
536 reqdict['resource'] += "'" + nodeid + "', "
537 nodeid_list.append(nodeid)
539 custom_length = len(reqdict['resource'])- 2
540 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
541 ")}/nodes=" + str(len(nodeid_list))
543 def __process_walltime(duration):
544 """ Calculates the walltime in seconds from the duration in H:M:S
545 specified in the RSpec.
549 # Fixing the walltime by adding a few delays.
550 # First put the walltime in seconds oarAdditionalDelay = 20;
551 # additional delay for /bin/sleep command to
552 # take in account prologue and epilogue scripts execution
553 # int walltimeAdditionalDelay = 240; additional delay
554 desired_walltime = duration
555 total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
556 sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12
558 #Put the walltime back in str form
560 walltime.append(str(total_walltime / 3600))
561 total_walltime = total_walltime - 3600 * int(walltime[0])
562 #Get the remaining minutes
563 walltime.append(str(total_walltime / 60))
564 total_walltime = total_walltime - 60 * int(walltime[1])
566 walltime.append(str(total_walltime))
569 logger.log_exc(" __process_walltime duration null")
571 return walltime, sleep_walltime
574 walltime, sleep_walltime = \
575 __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
578 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
579 ":" + str(walltime[1]) + ":" + str(walltime[2])
580 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
582 #In case of a scheduled experiment (not immediate)
583 #To run an XP immediately, don't specify date and time in RSpec
584 #They will be set to None.
585 if lease_dict['lease_start_time'] is not '0':
586 #Readable time accepted by OAR
587 start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
588 strftime(lease_dict['time_format'])
589 reqdict['reservation'] = start_time
590 #If there is not start time, Immediate XP. No need to add special
594 reqdict['type'] = "deploy"
595 reqdict['directory'] = ""
596 reqdict['name'] = "SFA_" + lease_dict['slice_user']
600 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
601 \r\n " %(slice_user))
602 #Create the request for OAR
603 reqdict = __create_job_structure_request_for_OAR(lease_dict)
604 # first step : start the OAR job and update the job
605 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
608 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
610 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
614 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
615 Impossible to create job %s " %(answer))
619 def __configure_experiment(jobid, added_nodes):
620 # second step : configure the experiment
621 # we need to store the nodes in a yaml (well...) file like this :
622 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
623 tmp_dir = '/tmp/sfa/'
624 if not os.path.exists(tmp_dir):
626 job_file = open(tmp_dir + str(jobid) + '.json', 'w')
628 job_file.write(str(added_nodes[0].strip('node')))
629 for node in added_nodes[1:len(added_nodes)] :
630 job_file.write(', '+ node.strip('node'))
635 def __launch_senslab_experiment(jobid):
636 # third step : call the senslab-experiment wrapper
637 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
638 # "+str(jobid)+" "+slice_user
639 javacmdline = "/usr/bin/java"
641 "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
643 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
644 slice_user],stdout=subprocess.PIPE).communicate()[0]
646 logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
653 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
654 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
657 __configure_experiment(jobid, added_nodes)
658 __launch_senslab_experiment(jobid)
663 def AddLeases(self, hostname_list, slice_record, \
664 lease_start_time, lease_duration):
665 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
666 slice_record %s lease_start_time %s lease_duration %s "\
667 %( hostname_list, slice_record , lease_start_time, \
670 #tmp = slice_record['reg-researchers'][0].split(".")
671 username = slice_record['login']
672 #username = tmp[(len(tmp)-1)]
673 job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'], \
674 lease_start_time, lease_duration, username)
675 start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
676 end_time = lease_start_time + lease_duration
678 import logging, logging.handlers
679 from sfa.util.sfalogging import _SfaLogger
680 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_record['hrn'], job_id, end_time))
681 sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG)
682 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_record['hrn']), type(job_id), type(end_time)))
684 slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], \
685 job_id = job_id, end_time= end_time)
687 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" \
689 slab_dbsession.add(slab_ex_row)
690 slab_dbsession.commit()
692 logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
697 #Delete the jobs from job_senslab table
698 def DeleteSliceFromNodes(self, slice_record):
699 for job_id in slice_record['oar_job_id']:
700 self.DeleteJobs(job_id, slice_record['hrn'])
704 def GetLeaseGranularity(self):
705 """ Returns the granularity of Senslab testbed.
706 OAR returns seconds for experiments duration.
708 Experiments which last less than 10 min are invalid"""
715 def update_jobs_in_slabdb( job_oar_list, jobs_psql):
716 #Get all the entries in slab_xp table
719 jobs_psql = set(jobs_psql)
720 kept_jobs = set(job_oar_list).intersection(jobs_psql)
721 logger.debug ( "\r\n \t\ update_jobs_in_slabdb jobs_psql %s \r\n \t \
722 job_oar_list %s kept_jobs %s "%(jobs_psql, job_oar_list, kept_jobs))
723 deleted_jobs = set(jobs_psql).difference(kept_jobs)
724 deleted_jobs = list(deleted_jobs)
725 if len(deleted_jobs) > 0:
726 slab_dbsession.query(SenslabXP).filter(SenslabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch')
727 slab_dbsession.commit()
733 def GetLeases(self, lease_filter_dict=None, login=None):
736 unfiltered_reservation_list = self.GetReservedNodes(login)
738 reservation_list = []
739 #Find the slice associated with this user senslab ldap uid
740 logger.debug(" SLABDRIVER.PY \tGetLeases login %s\
741 unfiltered_reservation_list %s " %(login, unfiltered_reservation_list))
742 #Create user dict first to avoid looking several times for
743 #the same user in LDAP SA 27/07/12
747 jobs_psql_query = slab_dbsession.query(SenslabXP).all()
748 jobs_psql_dict = [ (row.job_id, row.__dict__ )for row in jobs_psql_query ]
749 jobs_psql_dict = dict(jobs_psql_dict)
750 logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
752 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
756 for resa in unfiltered_reservation_list:
757 logger.debug("SLABDRIVER \tGetLeases USER %s"\
759 #Cosntruct list of jobs (runing, waiting..) in oar
760 job_oar_list.append(resa['lease_id'])
761 #If there is information on the job in SLAB DB (slice used and job id)
762 if resa['lease_id'] in jobs_psql_dict:
763 job_info = jobs_psql_dict[resa['lease_id']]
764 logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
766 resa['slice_hrn'] = job_info['slice_hrn']
767 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
769 #Assume it is a senslab slice:
771 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice')
772 #if resa['user'] not in resa_user_dict:
773 #logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
774 #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
776 #ldap_info = ldap_info[0][1]
777 ##Get the backref :relationship table reg-researchers
778 #user = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(email = \
779 #ldap_info['mail'][0])
782 #user = user.__dict__
783 #slice_info = user['reg_slices_as_researcher'][0].__dict__
784 ##Separated in case user not in database :
785 ##record_id not defined SA 17/07//12
787 ##query_slice_info = slab_dbsession.query(SenslabXP).filter_by(record_id_user = user.record_id)
788 ##if query_slice_info:
789 ##slice_info = query_slice_info.first()
793 #resa_user_dict[resa['user']] = {}
794 #resa_user_dict[resa['user']]['ldap_info'] = user
795 #resa_user_dict[resa['user']]['slice_info'] = slice_info
797 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
798 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
801 resa['component_id_list'] = []
802 resa['hrn'] = Xrn(resa['slice_id']).get_hrn()
803 #Transform the hostnames into urns (component ids)
804 for node in resa['reserved_nodes']:
805 #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
806 #self.root_auth, node['hostname']))
807 slab_xrn = slab_xrn_object(self.root_auth, node)
808 resa['component_id_list'].append(slab_xrn.urn)
810 if lease_filter_dict:
811 logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\
812 %(resa,lease_filter_dict))
814 if lease_filter_dict['name'] == resa['hrn']:
815 reservation_list.append(resa)
817 if lease_filter_dict is None:
818 reservation_list = unfiltered_reservation_list
820 #del unfiltered_reservation_list[unfiltered_reservation_list.index(resa)]
823 self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)
825 #for resa in unfiltered_reservation_list:
829 #if resa['user'] in resa_user_dict:
830 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
831 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
833 ##resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
834 #resa['component_id_list'] = []
835 ##Transform the hostnames into urns (component ids)
836 #for node in resa['reserved_nodes']:
837 ##resa['component_id_list'].append(hostname_to_urn(self.hrn, \
838 ##self.root_auth, node['hostname']))
839 #slab_xrn = slab_xrn_object(self.root_auth, node)
840 #resa['component_id_list'].append(slab_xrn.urn)
842 ##Filter the reservation list if necessary
843 ##Returns all the leases associated with a given slice
844 #if lease_filter_dict:
845 #logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
846 #%(lease_filter_dict))
847 #for resa in unfiltered_reservation_list:
848 #if lease_filter_dict['name'] == resa['slice_hrn']:
849 #reservation_list.append(resa)
851 #reservation_list = unfiltered_reservation_list
853 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
855 return reservation_list
860 #TODO FUNCTIONS SECTION 04/07/2012 SA
862 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
865 def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
866 """ This method is a hopefully temporary hack to let the sfa correctly
867 detach the objects it creates from a remote peer object. This is
868 needed so that the sfa federation link can work in parallel with
869 RefreshPeer, as RefreshPeer depends on remote objects being correctly
872 auth : struct, API authentication structure
873 AuthMethod : string, Authentication method to use
874 object_type : string, Object type, among 'site','person','slice',
876 object_id : int, object_id
877 shortname : string, peer shortname
881 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
885 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
887 def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
888 remote_object_id=None):
889 """This method is a hopefully temporary hack to let the sfa correctly
890 attach the objects it creates to a remote peer object. This is needed
891 so that the sfa federation link can work in parallel with RefreshPeer,
892 as RefreshPeer depends on remote objects being correctly marked.
894 shortname : string, peer shortname
895 remote_object_id : int, remote object_id, set to 0 if unknown
899 logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
902 #TODO UpdateSlice 04/07/2012 SA
903 #Funciton should delete and create another job since oin senslab slice=job
904 def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
905 """Updates the parameters of an existing slice with the values in
907 Users may only update slices of which they are members.
908 PIs may update any of the slices at their sites, or any slices of
909 which they are members. Admins may update any slice.
910 Only PIs and admins may update max_nodes. Slices cannot be renewed
911 (by updating the expires parameter) more than 8 weeks into the future.
912 Returns 1 if successful, faults otherwise.
916 logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
919 #TODO UpdatePerson 04/07/2012 SA
920 def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
921 """Updates a person. Only the fields specified in person_fields
922 are updated, all other fields are left untouched.
923 Users and techs can only update themselves. PIs can only update
924 themselves and other non-PIs at their sites.
925 Returns 1 if successful, faults otherwise.
929 #new_row = FederatedToSenslab(slab_hrn, federated_hrn)
930 #slab_dbsession.add(new_row)
931 #slab_dbsession.commit()
933 logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
936 #TODO GetKeys 04/07/2012 SA
937 def GetKeys(self, auth, key_filter=None, return_fields=None):
938 """Returns an array of structs containing details about keys.
939 If key_filter is specified and is an array of key identifiers,
940 or a struct of key attributes, only keys matching the filter
941 will be returned. If return_fields is specified, only the
942 specified details will be returned.
944 Admin may query all keys. Non-admins may only query their own keys.
948 logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
951 #TODO DeleteKey 04/07/2012 SA
952 def DeleteKey(self, key_id):
954 Non-admins may only delete their own keys.
955 Returns 1 if successful, faults otherwise.
959 logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
966 def _sql_get_slice_info( slice_filter ):
967 #DO NOT USE RegSlice - reg_researchers to get the hrn
968 #of the user otherwise will mess up the RegRecord in
969 #Resolve, don't know why - SA 08/08/2012
971 #Only one entry for one user = one slice in slab_xp table
972 #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
973 raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
974 #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
977 #raw_slicerec.reg_researchers
978 raw_slicerec = raw_slicerec.__dict__
979 logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s \
980 raw_slicerec %s"%(slice_filter, raw_slicerec))
981 slicerec = raw_slicerec
982 #only one researcher per slice so take the first one
983 #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
984 #del slicerec['reg_researchers']['_sa_instance_state']
991 def _sql_get_slice_info_from_user(slice_filter ):
992 #slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
993 raw_slicerec = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(record_id = slice_filter).first()
994 #raw_slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
995 #Put it in correct order
996 user_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'email', 'pointer']
997 slice_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'pointer']
999 #raw_slicerec.reg_slices_as_researcher
1000 raw_slicerec = raw_slicerec.__dict__
1003 dict([(k, raw_slicerec['reg_slices_as_researcher'][0].__dict__[k]) \
1004 for k in slice_needed_fields])
1005 slicerec['reg_researchers'] = dict([(k, raw_slicerec[k]) \
1006 for k in user_needed_fields])
1007 #TODO Handle multiple slices for one user SA 10/12/12
1008 #for now only take the first slice record associated to the rec user
1009 ##slicerec = raw_slicerec['reg_slices_as_researcher'][0].__dict__
1010 #del raw_slicerec['reg_slices_as_researcher']
1011 #slicerec['reg_researchers'] = raw_slicerec
1012 ##del slicerec['_sa_instance_state']
1019 def _get_slice_records(self, slice_filter = None, \
1020 slice_filter_type = None):
1024 #Get list of slices based on the slice hrn
1025 if slice_filter_type == 'slice_hrn':
1027 #if get_authority(slice_filter) == self.root_auth:
1028 #login = slice_filter.split(".")[1].split("_")[0]
1030 slicerec = self._sql_get_slice_info(slice_filter)
1032 if slicerec is None:
1036 #Get slice based on user id
1037 if slice_filter_type == 'record_id_user':
1039 slicerec = self._sql_get_slice_info_from_user(slice_filter)
1042 fixed_slicerec_dict = slicerec
1043 #At this point if the there is no login it means
1044 #record_id_user filter has been used for filtering
1046 ##If theslice record is from senslab
1047 #if fixed_slicerec_dict['peer_authority'] is None:
1048 #login = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
1049 #return login, fixed_slicerec_dict
1050 return fixed_slicerec_dict
1054 def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None):
1055 """ Get the slice records from the slab db.
1056 Returns a slice ditc if slice_filter and slice_filter_type
1058 Returns a list of slice dictionnaries if there are no filters
1063 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
1064 return_slicerec_dictlist = []
1066 #First try to get information on the slice based on the filter provided
1067 if slice_filter_type in authorized_filter_types_list:
1068 fixed_slicerec_dict = \
1069 self._get_slice_records(slice_filter, slice_filter_type)
1070 #login, fixed_slicerec_dict = \
1071 #self._get_slice_records(slice_filter, slice_filter_type)
1072 logger.debug(" SLABDRIVER \tGetSlices login %s \
1073 slice record %s slice_filter %s slice_filter_type %s "\
1074 %(login, fixed_slicerec_dict,slice_filter, slice_filter_type))
1077 #Now we have the slice record fixed_slicerec_dict, get the
1078 #jobs associated to this slice
1079 #leases_list = self.GetReservedNodes(username = login)
1080 leases_list = self.GetLeases(login = login)
1081 #If no job is running or no job scheduled
1082 #return only the slice record
1083 if leases_list == [] and fixed_slicerec_dict:
1084 return_slicerec_dictlist.append(fixed_slicerec_dict)
1086 #If several jobs for one slice , put the slice record into
1087 # each lease information dict
1088 for lease in leases_list :
1091 reserved_list = lease['reserved_nodes']
1093 slicerec_dict['oar_job_id'] = lease['lease_id']
1094 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
1095 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1097 #Update lease dict with the slice record
1098 if fixed_slicerec_dict:
1099 fixed_slicerec_dict['oar_job_id'] = []
1100 fixed_slicerec_dict['oar_job_id'].append(slicerec_dict['oar_job_id'])
1101 slicerec_dict.update(fixed_slicerec_dict)
1102 #slicerec_dict.update({'hrn':\
1103 #str(fixed_slicerec_dict['slice_hrn'])})
1106 return_slicerec_dictlist.append(slicerec_dict)
1107 logger.debug("SLABDRIVER.PY \tGetSlices \
1108 slicerec_dict %s return_slicerec_dictlist %s \
1109 lease['reserved_nodes'] \
1110 %s" %(slicerec_dict, return_slicerec_dictlist, \
1111 lease['reserved_nodes'] ))
1113 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
1114 return_slicerec_dictlist %s" \
1115 %(return_slicerec_dictlist))
1117 return return_slicerec_dictlist
1121 #Get all slices from the senslab sfa database ,
1122 #put them in dict format
1123 #query_slice_list = dbsession.query(RegRecord).all()
1124 query_slice_list = dbsession.query(RegSlice).options(joinedload('reg_researchers')).all()
1125 #query_slice_list = dbsession.query(RegRecord).filter_by(type='slice').all()
1126 #query_slice_list = slab_dbsession.query(SenslabXP).all()
1127 return_slicerec_dictlist = []
1128 for record in query_slice_list:
1129 tmp = record.__dict__
1130 tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
1131 #del tmp['reg_researchers']['_sa_instance_state']
1132 return_slicerec_dictlist.append(tmp)
1133 #return_slicerec_dictlist.append(record.__dict__)
1135 #Get all the jobs reserved nodes
1136 leases_list = self.GetReservedNodes()
1139 for fixed_slicerec_dict in return_slicerec_dictlist:
1141 #Check if the slice belongs to a senslab user
1142 if fixed_slicerec_dict['peer_authority'] is None:
1143 owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
1146 for lease in leases_list:
1147 if owner == lease['user']:
1148 slicerec_dict['oar_job_id'] = lease['lease_id']
1150 #for reserved_node in lease['reserved_nodes']:
1151 logger.debug("SLABDRIVER.PY \tGetSlices lease %s "\
1154 reserved_list = lease['reserved_nodes']
1156 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1157 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
1158 slicerec_dict.update(fixed_slicerec_dict)
1159 #slicerec_dict.update({'hrn':\
1160 #str(fixed_slicerec_dict['slice_hrn'])})
1161 #return_slicerec_dictlist.append(slicerec_dict)
1162 fixed_slicerec_dict.update(slicerec_dict)
1164 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
1165 return_slicerec_dictlist %s \slice_filter %s " \
1166 %(return_slicerec_dictlist, slice_filter))
1168 return return_slicerec_dictlist
1174 # Convert SFA fields to PLC fields for use when registering up updating
1175 # registry record in the PLC database
1177 # @param type type of record (user, slice, ...)
1178 # @param hrn human readable name
1179 # @param sfa_fields dictionary of SFA fields
1180 # @param slab_fields dictionary of PLC fields (output)
1182 def sfa_fields_to_slab_fields(sfa_type, hrn, record):
1186 #for field in record:
1187 # slab_record[field] = record[field]
1189 if sfa_type == "slice":
1190 #instantion used in get_slivers ?
1191 if not "instantiation" in slab_record:
1192 slab_record["instantiation"] = "senslab-instantiated"
1193 #slab_record["hrn"] = hrn_to_pl_slicename(hrn)
1194 #Unused hrn_to_pl_slicename because Slab's hrn already
1195 #in the appropriate form SA 23/07/12
1196 slab_record["hrn"] = hrn
1197 logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
1198 slab_record %s " %(slab_record['hrn']))
1200 slab_record["url"] = record["url"]
1201 if "description" in record:
1202 slab_record["description"] = record["description"]
1203 if "expires" in record:
1204 slab_record["expires"] = int(record["expires"])
1206 #nodes added by OAR only and then imported to SFA
1207 #elif type == "node":
1208 #if not "hostname" in slab_record:
1209 #if not "hostname" in record:
1210 #raise MissingSfaInfo("hostname")
1211 #slab_record["hostname"] = record["hostname"]
1212 #if not "model" in slab_record:
1213 #slab_record["model"] = "geni"
1216 #elif type == "authority":
1217 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
1219 #if not "name" in slab_record:
1220 #slab_record["name"] = hrn
1222 #if not "abbreviated_name" in slab_record:
1223 #slab_record["abbreviated_name"] = hrn
1225 #if not "enabled" in slab_record:
1226 #slab_record["enabled"] = True
1228 #if not "is_public" in slab_record:
1229 #slab_record["is_public"] = True
1236 def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
1237 """ Transforms unix timestamp into valid OAR date format """
1239 #Used in case of a scheduled experiment (not immediate)
1240 #To run an XP immediately, don't specify date and time in RSpec
1241 #They will be set to None.
1242 if xp_utc_timestamp:
1243 #transform the xp_utc_timestamp into server readable time
1244 xp_server_readable_date = datetime.fromtimestamp(int(\
1245 xp_utc_timestamp)).strftime(self.time_format)
1247 return xp_server_readable_date
1265 class SlabDriver(Driver):
1266 """ Senslab Driver class inherited from Driver generic class.
1268 Contains methods compliant with the SFA standard and the testbed
1269 infrastructure (calls to LDAP and OAR).
1271 def __init__(self, config):
1272 Driver.__init__ (self, config)
1273 self.config = config
1274 self.hrn = config.SFA_INTERFACE_HRN
1276 self.db = SlabDB(config, debug = False)
1277 self.slab_api = SlabTestbedAPI(config)
1280 def augment_records_with_testbed_info (self, record_list ):
1281 """ Adds specific testbed info to the records. """
1282 return self.fill_record_info (record_list)
1284 def fill_record_info(self, record_list):
1286 Given a SFA record, fill in the senslab specific and SFA specific
1287 fields in the record.
1290 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
1291 if not isinstance(record_list, list):
1292 record_list = [record_list]
1295 for record in record_list:
1296 #If the record is a SFA slice record, then add information
1297 #about the user of this slice. This kind of
1298 #information is in the Senslab's DB.
1299 if str(record['type']) == 'slice':
1300 if 'reg_researchers' in record and \
1301 isinstance(record['reg_researchers'], list) :
1302 record['reg_researchers'] = record['reg_researchers'][0].__dict__
1303 record.update({'PI':[record['reg_researchers']['hrn']],
1304 'researcher': [record['reg_researchers']['hrn']],
1305 'name':record['hrn'],
1308 'person_ids':[record['reg_researchers']['record_id']],
1309 'geni_urn':'', #For client_helper.py compatibility
1310 'keys':'', #For client_helper.py compatibility
1311 'key_ids':''}) #For client_helper.py compatibility
1314 #Get slab slice record.
1315 recslice_list = self.slab_api.GetSlices(slice_filter = \
1316 str(record['hrn']),\
1317 slice_filter_type = 'slice_hrn')
1320 logger.debug("SLABDRIVER \tfill_record_info \
1321 TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id']\
1322 %s " %(record['hrn'], record['oar_job_id']))
1324 for rec in recslice_list:
1325 logger.debug("SLABDRIVER\r\n \t fill_record_info oar_job_id %s " %(rec['oar_job_id']))
1326 del record['reg_researchers']
1327 record['node_ids'] = [ self.slab_api.root_auth + hostname for hostname in rec['node_ids']]
1331 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1332 recslice_list %s \r\n \t RECORD %s \r\n \
1333 \r\n" %(recslice_list, record))
1334 if str(record['type']) == 'user':
1335 #The record is a SFA user record.
1336 #Get the information about his slice from Senslab's DB
1337 #and add it to the user record.
1338 recslice_list = self.slab_api.GetSlices(\
1339 slice_filter = record['record_id'],\
1340 slice_filter_type = 'record_id_user')
1342 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
1343 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
1344 #Append slice record in records list,
1345 #therefore fetches user and slice info again(one more loop)
1346 #Will update PIs and researcher for the slice
1347 #recuser = dbsession.query(RegRecord).filter_by(record_id = \
1348 #recslice_list[0]['record_id_user']).first()
1349 recuser = recslice_list[0]['reg_researchers']
1350 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
1351 recuser %s \r\n \r\n" %(recuser))
1353 recslice = recslice_list[0]
1354 recslice.update({'PI':[recuser['hrn']],
1355 'researcher': [recuser['hrn']],
1356 'name':record['hrn'],
1359 'person_ids':[recuser['record_id']]})
1361 for rec in recslice_list:
1362 recslice['oar_job_id'].append(rec['oar_job_id'])
1366 recslice.update({'type':'slice', \
1367 'hrn':recslice_list[0]['hrn']})
1370 #GetPersons takes [] as filters
1371 user_slab = self.slab_api.GetPersons([record])
1374 record.update(user_slab[0])
1375 #For client_helper.py compatibility
1376 record.update( { 'geni_urn':'',
1379 record_list.append(recslice)
1381 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1382 INFO TO USER records %s" %(record_list))
1384 logger.debug("SLABDRIVER.PY \tfill_record_info END \
1385 record %s \r\n \r\n " %(record))
1387 except TypeError, error:
1388 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
1390 #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ")
1395 def sliver_status(self, slice_urn, slice_hrn):
1396 """Receive a status request for slice named urn/hrn
1397 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
1398 shall return a structure as described in
1399 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
1400 NT : not sure if we should implement this or not, but used by sface.
1404 #First get the slice with the slice hrn
1405 slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
1406 slice_filter_type = 'slice_hrn')
1408 if len(slice_list) is 0:
1409 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
1411 #Used for fetching the user info witch comes along the slice info
1412 one_slice = slice_list[0]
1415 #Make a list of all the nodes hostnames in use for this slice
1416 slice_nodes_list = []
1417 #for single_slice in slice_list:
1418 #for node in single_slice['node_ids']:
1419 #slice_nodes_list.append(node['hostname'])
1420 for node in one_slice:
1421 slice_nodes_list.append(node['hostname'])
1423 #Get all the corresponding nodes details
1424 nodes_all = self.slab_api.GetNodes({'hostname':slice_nodes_list},
1425 ['node_id', 'hostname','site','boot_state'])
1426 nodeall_byhostname = dict([(one_node['hostname'], one_node) \
1427 for one_node in nodes_all])
1431 for single_slice in slice_list:
1434 top_level_status = 'empty'
1437 ['geni_urn','pl_login','geni_status','geni_resources'], None)
1438 result['pl_login'] = one_slice['reg_researchers']['hrn']
1439 logger.debug("Slabdriver - sliver_status Sliver status \
1440 urn %s hrn %s single_slice %s \r\n " \
1441 %(slice_urn, slice_hrn, single_slice))
1443 if 'node_ids' not in single_slice:
1444 #No job in the slice
1445 result['geni_status'] = top_level_status
1446 result['geni_resources'] = []
1449 top_level_status = 'ready'
1451 #A job is running on Senslab for this slice
1452 # report about the local nodes that are in the slice only
1454 result['geni_urn'] = slice_urn
1458 #timestamp = float(sl['startTime']) + float(sl['walltime'])
1459 #result['pl_expires'] = strftime(self.time_format, \
1460 #gmtime(float(timestamp)))
1461 #result['slab_expires'] = strftime(self.time_format,\
1462 #gmtime(float(timestamp)))
1465 for node in single_slice['node_ids']:
1467 #res['slab_hostname'] = node['hostname']
1468 #res['slab_boot_state'] = node['boot_state']
1470 res['pl_hostname'] = node['hostname']
1471 res['pl_boot_state'] = \
1472 nodeall_byhostname[node['hostname']]['boot_state']
1473 #res['pl_last_contact'] = strftime(self.time_format, \
1474 #gmtime(float(timestamp)))
1475 sliver_id = Xrn(slice_urn, type='slice', \
1476 id=nodeall_byhostname[node['hostname']]['node_id'], \
1477 authority=self.hrn).urn
1479 res['geni_urn'] = sliver_id
1480 node_name = node['hostname']
1481 if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
1483 res['geni_status'] = 'ready'
1485 res['geni_status'] = 'failed'
1486 top_level_status = 'failed'
1488 res['geni_error'] = ''
1490 resources.append(res)
1492 result['geni_status'] = top_level_status
1493 result['geni_resources'] = resources
1494 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
1499 def get_user_record( hrn):
1500 """ Returns the user record based on the hrn from the SFA DB """
1501 return dbsession.query(RegRecord).filter_by(hrn = hrn).first()
1504 def testbed_name (self):
1507 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
1508 def aggregate_version (self):
1509 version_manager = VersionManager()
1510 ad_rspec_versions = []
1511 request_rspec_versions = []
1512 for rspec_version in version_manager.versions:
1513 if rspec_version.content_type in ['*', 'ad']:
1514 ad_rspec_versions.append(rspec_version.to_dict())
1515 if rspec_version.content_type in ['*', 'request']:
1516 request_rspec_versions.append(rspec_version.to_dict())
1518 'testbed':self.testbed_name(),
1519 'geni_request_rspec_versions': request_rspec_versions,
1520 'geni_ad_rspec_versions': ad_rspec_versions,
1524 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
1526 aggregate = SlabAggregate(self)
1528 slices = SlabSlices(self)
1529 peer = slices.get_peer(slice_hrn)
1530 sfa_peer = slices.get_sfa_peer(slice_hrn)
1533 if not isinstance(creds, list):
1537 slice_record = users[0].get('slice_record', {})
1538 logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
1539 creds %s \r\n \r\n users %s" \
1541 slice_record['user'] = {'keys':users[0]['keys'], \
1542 'email':users[0]['email'], \
1543 'hrn':slice_record['reg-researchers'][0]}
1545 rspec = RSpec(rspec_string)
1546 logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
1547 %s slice_record %s users %s" \
1548 %(rspec.version,slice_record, users))
1551 # ensure site record exists?
1552 # ensure slice record exists
1553 #Removed options to verify_slice SA 14/08/12
1554 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
1557 # ensure person records exists
1558 #verify_persons returns added persons but since the return value
1560 slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
1561 sfa_peer, options=options)
1562 #requested_attributes returned by rspec.version.get_slice_attributes()
1563 #unused, removed SA 13/08/12
1564 rspec.version.get_slice_attributes()
1566 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
1568 # add/remove slice from nodes
1570 requested_slivers = [node.get('component_id') \
1571 for node in rspec.version.get_nodes_with_slivers()\
1572 if node.get('authority_id') is self.slab_api.root_auth]
1573 l = [ node for node in rspec.version.get_nodes_with_slivers() ]
1574 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
1575 requested_slivers %s listnodes %s" \
1576 %(requested_slivers,l))
1577 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
1578 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
1581 requested_lease_list = []
1585 for lease in rspec.version.get_leases():
1586 single_requested_lease = {}
1587 logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
1589 if not lease.get('lease_id'):
1590 if get_authority(lease['component_id']) == self.slab_api.root_auth:
1591 single_requested_lease['hostname'] = \
1592 slab_xrn_to_hostname(\
1593 lease.get('component_id').strip())
1594 single_requested_lease['start_time'] = \
1595 lease.get('start_time')
1596 single_requested_lease['duration'] = lease.get('duration')
1597 #Check the experiment's duration is valid before adding
1598 #the lease to the requested leases list
1599 duration_in_seconds = \
1600 int(single_requested_lease['duration'])*60
1601 if duration_in_seconds > self.slab_api.GetLeaseGranularity():
1602 requested_lease_list.append(single_requested_lease)
1604 #Create dict of leases by start_time, regrouping nodes reserved
1606 #time, for the same amount of time = one job on OAR
1607 requested_job_dict = {}
1608 for lease in requested_lease_list:
1610 #In case it is an asap experiment start_time is empty
1611 if lease['start_time'] == '':
1612 lease['start_time'] = '0'
1614 if lease['start_time'] not in requested_job_dict:
1615 if isinstance(lease['hostname'], str):
1616 lease['hostname'] = [lease['hostname']]
1618 requested_job_dict[lease['start_time']] = lease
1621 job_lease = requested_job_dict[lease['start_time']]
1622 if lease['duration'] == job_lease['duration'] :
1623 job_lease['hostname'].append(lease['hostname'])
1628 logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\
1629 %(requested_job_dict))
1630 #verify_slice_leases returns the leases , but the return value is unused
1631 #here. Removed SA 13/08/12
1632 slices.verify_slice_leases(sfa_slice, \
1633 requested_job_dict, peer)
1635 return aggregate.get_rspec(slice_xrn=slice_urn, \
1636 login=sfa_slice['login'], version=rspec.version)
1639 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
1641 sfa_slice_list = self.slab_api.GetSlices(slice_filter = slice_hrn, \
1642 slice_filter_type = 'slice_hrn')
1644 if not sfa_slice_list:
1647 #Delete all in the slice
1648 for sfa_slice in sfa_slice_list:
1651 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
1652 slices = SlabSlices(self)
1653 # determine if this is a peer slice
1655 peer = slices.get_peer(slice_hrn)
1656 #TODO delete_sliver SA : UnBindObjectFromPeer should be
1657 #used when there is another
1658 #senslab testbed, which is not the case 14/08/12 .
1660 logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
1663 self.slab_api.UnBindObjectFromPeer('slice', \
1664 sfa_slice['record_id_slice'], \
1666 self.slab_api.DeleteSliceFromNodes(sfa_slice)
1669 self.slab_api.BindObjectToPeer('slice', \
1670 sfa_slice['record_id_slice'], \
1671 peer, sfa_slice['peer_slice_id'])
1675 # first 2 args are None in case of resource discovery
1676 def list_resources (self, slice_urn, slice_hrn, creds, options):
1677 #cached_requested = options.get('cached', True)
1679 version_manager = VersionManager()
1680 # get the rspec's return format from options
1682 version_manager.get_version(options.get('geni_rspec_version'))
1683 version_string = "rspec_%s" % (rspec_version)
1685 #panos adding the info option to the caching key (can be improved)
1686 if options.get('info'):
1687 version_string = version_string + "_" + \
1688 options.get('info', 'default')
1690 # Adding the list_leases option to the caching key
1691 if options.get('list_leases'):
1692 version_string = version_string + "_"+options.get('list_leases', 'default')
1694 # Adding geni_available to caching key
1695 if options.get('geni_available'):
1696 version_string = version_string + "_" + str(options.get('geni_available'))
1698 # look in cache first
1699 #if cached_requested and self.cache and not slice_hrn:
1700 #rspec = self.cache.get(version_string)
1702 #logger.debug("SlabDriver.ListResources: \
1703 #returning cached advertisement")
1706 #panos: passing user-defined options
1707 aggregate = SlabAggregate(self)
1708 #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
1709 #options.update({'origin_hrn':origin_hrn})
1710 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
1711 version=rspec_version, options=options)
1714 #if self.cache and not slice_hrn:
1715 #logger.debug("Slab.ListResources: stores advertisement in cache")
1716 #self.cache.add(version_string, rspec)
1721 def list_slices (self, creds, options):
1722 # look in cache first
1724 #slices = self.cache.get('slices')
1726 #logger.debug("PlDriver.list_slices returns from cache")
1731 slices = self.slab_api.GetSlices()
1732 logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
1733 slice_hrns = [slab_slice['hrn'] for slab_slice in slices]
1735 slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
1736 for slice_hrn in slice_hrns]
1740 #logger.debug ("SlabDriver.list_slices stores value in cache")
1741 #self.cache.add('slices', slice_urns)
1746 def register (self, sfa_record, hrn, pub_key):
1748 Adding new user, slice, node or site should not be handled
1752 Adding users = LDAP Senslab
1753 Adding slice = Import from LDAP users
1759 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
1760 """No site or node record update allowed in Senslab."""
1762 pointer = old_sfa_record['pointer']
1763 old_sfa_record_type = old_sfa_record['type']
1765 # new_key implemented for users only
1766 if new_key and old_sfa_record_type not in [ 'user' ]:
1767 raise UnknownSfaType(old_sfa_record_type)
1769 #if (type == "authority"):
1770 #self.shell.UpdateSite(pointer, new_sfa_record)
1772 if old_sfa_record_type == "slice":
1773 slab_record = self.slab_api.sfa_fields_to_slab_fields(old_sfa_record_type, \
1774 hrn, new_sfa_record)
1775 if 'name' in slab_record:
1776 slab_record.pop('name')
1777 #Prototype should be UpdateSlice(self,
1778 #auth, slice_id_or_name, slice_fields)
1779 #Senslab cannot update slice since slice = job
1780 #so we must delete and create another job
1781 self.slab_api.UpdateSlice(pointer, slab_record)
1783 elif old_sfa_record_type == "user":
1785 all_fields = new_sfa_record
1786 for key in all_fields.keys():
1787 if key in ['first_name', 'last_name', 'title', 'email',
1788 'password', 'phone', 'url', 'bio', 'accepted_aup',
1790 update_fields[key] = all_fields[key]
1791 self.slab_api.UpdatePerson(pointer, update_fields)
1794 # must check this key against the previous one if it exists
1795 persons = self.slab_api.GetPersons(['key_ids'])
1797 keys = person['key_ids']
1798 keys = self.slab_api.GetKeys(person['key_ids'])
1800 # Delete all stale keys
1803 if new_key != key['key']:
1804 self.slab_api.DeleteKey(key['key_id'])
1808 self.slab_api.AddPersonKey(pointer, {'key_type': 'ssh', \
1815 def remove (self, sfa_record):
1816 sfa_record_type = sfa_record['type']
1817 hrn = sfa_record['hrn']
1818 if sfa_record_type == 'user':
1820 #get user from senslab ldap
1821 person = self.slab_api.GetPersons(sfa_record)
1822 #No registering at a given site in Senslab.
1823 #Once registered to the LDAP, all senslab sites are
1826 #Mark account as disabled in ldap
1827 self.slab_api.DeletePerson(sfa_record)
1828 elif sfa_record_type == 'slice':
1829 if self.slab_api.GetSlices(slice_filter = hrn, \
1830 slice_filter_type = 'slice_hrn'):
1831 self.slab_api.DeleteSlice(sfa_record)
1833 #elif type == 'authority':
1834 #if self.GetSites(pointer):
1835 #self.DeleteSite(pointer)