3 from datetime import datetime
4 from dateutil import tz
5 from time import strftime, gmtime
7 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
8 from sfa.util.sfalogging import logger
10 from sfa.storage.alchemy import dbsession
11 from sfa.storage.model import RegRecord, RegUser
13 from sfa.trust.credential import Credential
16 from sfa.managers.driver import Driver
17 from sfa.rspecs.version_manager import VersionManager
18 from sfa.rspecs.rspec import RSpec
20 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf
21 from sfa.planetlab.plxrn import slicename_to_hrn, hrn_to_pl_slicename, \
25 ## thierry: everything that is API-related (i.e. handling incoming requests)
27 # SlabDriver should be really only about talking to the senslab testbed
30 from sfa.senslab.OARrestapi import OARrestapi
31 from sfa.senslab.LDAPapi import LDAPapi
33 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab
34 from sfa.senslab.slabaggregate import SlabAggregate
35 from sfa.senslab.slabslices import SlabSlices
38 def __process_walltime(duration=None):
39 """ Calculates the walltime in seconds from the duration in H:M:S
40 specified in the RSpec.
44 walltime = duration.split(":")
45 # Fixing the walltime by adding a few delays. First put the walltime
46 # in seconds oarAdditionalDelay = 20; additional delay for
47 # /bin/sleep command to
48 # take in account prologue and epilogue scripts execution
49 # int walltimeAdditionalDelay = 120; additional delay
51 desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
53 total_walltime = desired_walltime + 140 #+2 min 20
54 sleep_walltime = desired_walltime + 20 #+20 sec
55 logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
56 total_walltime %s sleep_walltime %s "\
57 %(desired_walltime, total_walltime, \
59 #Put the walltime back in str form
61 walltime[0] = str(total_walltime / 3600)
62 total_walltime = total_walltime - 3600 * int(walltime[0])
63 #Get the remaining minutes
64 walltime[1] = str(total_walltime / 60)
65 total_walltime = total_walltime - 60 * int(walltime[1])
67 walltime[2] = str(total_walltime)
68 logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
71 #automatically set 10min +2 min 20
75 sleep_walltime = '620'
77 return walltime, sleep_walltime
82 # this inheritance scheme is so that the driver object can receive
83 # GetNodes or GetSites sorts of calls directly
84 # and thus minimize the differences in the managers with the pl version
85 class SlabDriver(Driver):
87 def __init__(self, config):
88 Driver.__init__ (self, config)
90 self.hrn = config.SFA_INTERFACE_HRN
92 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
94 self.oar = OARrestapi()
96 self.time_format = "%Y-%m-%d %H:%M:%S"
97 self.db = SlabDB(config,debug = True)
101 def sliver_status(self, slice_urn, slice_hrn):
102 """Receive a status request for slice named urn/hrn
103 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
104 shall return a structure as described in
105 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
106 NT : not sure if we should implement this or not, but used by sface.
110 #First get the slice with the slice hrn
111 sl = self.GetSlices(slice_filter = slice_hrn, \
112 slice_filter_type = 'slice_hrn')
114 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
116 top_level_status = 'unknown'
117 nodes_in_slice = sl['node_ids']
119 if len(nodes_in_slice) is 0:
120 raise SliverDoesNotExist("No slivers allocated ")
122 top_level_status = 'ready'
124 logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
125 %s \r\n " %(slice_urn, slice_hrn, sl))
127 if sl['oar_job_id'] is not -1:
128 #A job is running on Senslab for this slice
129 # report about the local nodes that are in the slice only
131 nodes_all = self.GetNodes({'hostname':nodes_in_slice},
132 ['node_id', 'hostname','site','boot_state'])
133 nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
137 result['geni_urn'] = slice_urn
138 result['pl_login'] = sl['job_user'] #For compatibility
141 timestamp = float(sl['startTime']) + float(sl['walltime'])
142 result['pl_expires'] = strftime(self.time_format, \
143 gmtime(float(timestamp)))
144 #result['slab_expires'] = strftime(self.time_format,\
145 #gmtime(float(timestamp)))
148 for node in nodeall_byhostname:
150 #res['slab_hostname'] = node['hostname']
151 #res['slab_boot_state'] = node['boot_state']
153 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
154 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
155 res['pl_last_contact'] = strftime(self.time_format, \
156 gmtime(float(timestamp)))
157 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
158 nodeall_byhostname[node]['node_id'])
159 res['geni_urn'] = sliver_id
160 if nodeall_byhostname[node]['boot_state'] == 'Alive':
162 res['geni_status'] = 'ready'
164 res['geni_status'] = 'failed'
165 top_level_status = 'failed'
167 res['geni_error'] = ''
169 resources.append(res)
171 result['geni_status'] = top_level_status
172 result['geni_resources'] = resources
173 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
178 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
180 logger.debug("SLABDRIVER.PY \tcreate_sliver ")
181 aggregate = SlabAggregate(self)
183 slices = SlabSlices(self)
184 peer = slices.get_peer(slice_hrn)
185 sfa_peer = slices.get_sfa_peer(slice_hrn)
188 if not isinstance(creds, list):
192 slice_record = users[0].get('slice_record', {})
195 rspec = RSpec(rspec_string)
196 logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \
200 # ensure site record exists?
201 # ensure slice record exists
202 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
203 sfa_peer, options=options)
204 requested_attributes = rspec.version.get_slice_attributes()
206 if requested_attributes:
207 for attrib_dict in requested_attributes:
208 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] \
210 sfa_slice.update({'timeslot':attrib_dict['timeslot']})
211 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
213 # ensure person records exists
214 persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
215 sfa_peer, options=options)
217 # ensure slice attributes exists?
220 # add/remove slice from nodes
222 requested_slivers = [node.get('component_name') \
223 for node in rspec.version.get_nodes_with_slivers()]
224 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
225 requested_slivers %s " %(requested_slivers))
227 nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
230 requested_leases = []
232 for lease in rspec.version.get_leases():
234 if not lease.get('lease_id'):
235 requested_lease['hostname'] = \
236 xrn_to_hostname(lease.get('component_id').strip())
237 requested_lease['t_from'] = lease.get('t_from')
238 requested_lease['t_until'] = lease.get('t_until')
240 kept_leases.append(int(lease['lease_id']))
241 if requested_lease.get('hostname'):
242 requested_leases.append(requested_lease)
244 leases = slices.verify_slice_leases(sfa_slice, \
245 requested_leases, kept_leases, peer)
247 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
250 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
252 sfa_slice = self.GetSlices(slice_filter = slice_hrn, \
253 slice_filter_type = 'slice_hrn')
254 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
258 slices = SlabSlices(self)
259 # determine if this is a peer slice
261 peer = slices.get_peer(slice_hrn)
264 self.UnBindObjectFromPeer('slice', \
265 sfa_slice['record_id_slice'], peer)
266 self.DeleteSliceFromNodes(sfa_slice)
269 self.BindObjectToPeer('slice', sfa_slice['slice_id'], \
270 peer, sfa_slice['peer_slice_id'])
274 def AddSlice(self, slice_record):
275 slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'], \
276 record_id_slice= slice_record['record_id_slice'] , \
277 record_id_user= slice_record['record_id_user'], \
278 peer_authority = slice_record['peer_authority'])
279 logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s slab_slice %s" \
280 %(slice_record,slab_slice))
281 slab_dbsession.add(slab_slice)
282 slab_dbsession.commit()
285 # first 2 args are None in case of resource discovery
286 def list_resources (self, slice_urn, slice_hrn, creds, options):
287 #cached_requested = options.get('cached', True)
289 version_manager = VersionManager()
290 # get the rspec's return format from options
292 version_manager.get_version(options.get('geni_rspec_version'))
293 version_string = "rspec_%s" % (rspec_version)
295 #panos adding the info option to the caching key (can be improved)
296 if options.get('info'):
297 version_string = version_string + "_" + \
298 options.get('info', 'default')
300 # look in cache first
301 #if cached_requested and self.cache and not slice_hrn:
302 #rspec = self.cache.get(version_string)
304 #logger.debug("SlabDriver.ListResources: \
305 #returning cached advertisement")
308 #panos: passing user-defined options
309 logger.debug("SLABDRIVER \tlist_resources rspec " )
310 aggregate = SlabAggregate(self)
311 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
312 options.update({'origin_hrn':origin_hrn})
313 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
314 version=rspec_version, options=options)
317 #if self.cache and not slice_hrn:
318 #logger.debug("Slab.ListResources: stores advertisement in cache")
319 #self.cache.add(version_string, rspec)
324 def list_slices (self, creds, options):
325 # look in cache first
327 #slices = self.cache.get('slices')
329 #logger.debug("PlDriver.list_slices returns from cache")
333 logger.debug("SLABDRIVER.PY \tlist_slices")
334 slices = self.GetSlices()
335 slice_hrns = [slicename_to_hrn(self.hrn, slab_slice['slice_hrn']) \
336 for slab_slice in slices]
337 slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
338 for slice_hrn in slice_hrns]
342 #logger.debug ("SlabDriver.list_slices stores value in cache")
343 #self.cache.add('slices', slice_urns)
347 #No site or node register supported
348 def register (self, sfa_record, hrn, pub_key):
349 record_type = sfa_record['type']
350 slab_record = self.sfa_fields_to_slab_fields(record_type, hrn, \
354 if record_type == 'slice':
355 acceptable_fields = ['url', 'instantiation', 'name', 'description']
356 for key in slab_record.keys():
357 if key not in acceptable_fields:
359 logger.debug("SLABDRIVER.PY register")
360 slices = self.GetSlices(slice_filter =slab_record['hrn'], \
361 slice_filter_type = 'slice_hrn')
363 pointer = self.AddSlice(slab_record)
365 pointer = slices[0]['slice_id']
367 elif record_type == 'user':
368 persons = self.GetPersons([sfa_record])
369 #persons = self.GetPersons([sfa_record['hrn']])
371 pointer = self.AddPerson(dict(sfa_record))
374 pointer = persons[0]['person_id']
376 #Does this make sense to senslab ?
377 #if 'enabled' in sfa_record and sfa_record['enabled']:
378 #self.UpdatePerson(pointer, \
379 #{'enabled': sfa_record['enabled']})
381 #TODO register Change this AddPersonToSite stuff 05/07/2012 SA
382 # add this person to the site only if
383 # she is being added for the first
384 # time by sfa and doesnt already exist in plc
385 if not persons or not persons[0]['site_ids']:
386 login_base = get_leaf(sfa_record['authority'])
387 self.AddPersonToSite(pointer, login_base)
389 # What roles should this user have?
390 #TODO : DElete this AddRoleToPerson 04/07/2012 SA
391 #Function prototype is :
392 #AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email)
393 #what's the pointer doing here?
394 self.AddRoleToPerson('user', pointer)
397 self.AddPersonKey(pointer, {'key_type' : 'ssh', \
400 #No node adding outside OAR
404 #No site or node record update allowed
405 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
406 pointer = old_sfa_record['pointer']
407 old_sfa_record_type = old_sfa_record['type']
409 # new_key implemented for users only
410 if new_key and old_sfa_record_type not in [ 'user' ]:
411 raise UnknownSfaType(old_sfa_record_type)
413 #if (type == "authority"):
414 #self.shell.UpdateSite(pointer, new_sfa_record)
416 if old_sfa_record_type == "slice":
417 slab_record = self.sfa_fields_to_slab_fields(old_sfa_record_type, \
419 if 'name' in slab_record:
420 slab_record.pop('name')
421 #Prototype should be UpdateSlice(self,
422 #auth, slice_id_or_name, slice_fields)
423 #Senslab cannot update slice since slice = job
424 #so we must delete and create another job
425 self.UpdateSlice(pointer, slab_record)
427 elif old_sfa_record_type == "user":
429 all_fields = new_sfa_record
430 for key in all_fields.keys():
431 if key in ['first_name', 'last_name', 'title', 'email',
432 'password', 'phone', 'url', 'bio', 'accepted_aup',
434 update_fields[key] = all_fields[key]
435 self.UpdatePerson(pointer, update_fields)
438 # must check this key against the previous one if it exists
439 persons = self.GetPersons([pointer], ['key_ids'])
441 keys = person['key_ids']
442 keys = self.GetKeys(person['key_ids'])
444 # Delete all stale keys
447 if new_key != key['key']:
448 self.DeleteKey(key['key_id'])
452 self.AddPersonKey(pointer, {'key_type': 'ssh', \
459 def remove (self, sfa_record):
460 sfa_record_type = sfa_record['type']
461 hrn = sfa_record['hrn']
462 record_id = sfa_record['record_id']
463 if sfa_record_type == 'user':
465 #get user from senslab ldap
466 person = self.GetPersons(sfa_record)
467 #No registering at a given site in Senslab.
468 #Once registered to the LDAP, all senslab sites are
471 #Mark account as disabled in ldap
472 self.DeletePerson(sfa_record)
473 elif sfa_record_type == 'slice':
474 if self.GetSlices(slice_filter = hrn, \
475 slice_filter_type = 'slice_hrn'):
476 self.DeleteSlice(sfa_record_type)
478 #elif type == 'authority':
479 #if self.GetSites(pointer):
480 #self.DeleteSite(pointer)
486 #TODO clean GetPeers. 05/07/12SA
487 def GetPeers (self, auth = None, peer_filter=None, return_fields_list=None):
489 existing_records = {}
490 existing_hrns_by_types = {}
491 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
492 return_field %s " %(auth , peer_filter, return_fields_list))
493 all_records = dbsession.query(RegRecord).\
494 filter(RegRecord.type.like('%authority%')).all()
495 for record in all_records:
496 existing_records[(record.hrn, record.type)] = record
497 if record.type not in existing_hrns_by_types:
498 existing_hrns_by_types[record.type] = [record.hrn]
499 logger.debug("SLABDRIVER \tGetPeer\t NOT IN \
500 existing_hrns_by_types %s " %( existing_hrns_by_types))
503 logger.debug("SLABDRIVER \tGetPeer\t \INNN type %s hrn %s " \
504 %(record.type,record.hrn))
505 existing_hrns_by_types[record.type].append(record.hrn)
508 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
509 %( existing_hrns_by_types))
514 records_list.append(existing_records[(peer_filter,'authority')])
516 for hrn in existing_hrns_by_types['authority']:
517 records_list.append(existing_records[(hrn,'authority')])
519 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
525 return_records = records_list
526 if not peer_filter and not return_fields_list:
530 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
532 return return_records
535 #TODO : Handling OR request in make_ldap_filters_from_records
536 #instead of the for loop
537 #over the records' list
538 def GetPersons(self, person_filter=None, return_fields_list=None):
540 person_filter should be a list of dictionnaries when not set to None.
541 Returns a list of users whose accounts are enabled found in ldap.
544 logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
547 if person_filter and isinstance(person_filter, list):
548 #If we are looking for a list of users (list of dict records)
549 #Usually the list contains only one user record
550 for searched_attributes in person_filter:
552 #Get only enabled user accounts in senslab LDAP :
553 #add a filter for make_ldap_filters_from_record
554 person = self.ldap.LdapFindUser(searched_attributes, \
555 is_user_enabled=True)
556 person_list.append(person)
559 #Get only enabled user accounts in senslab LDAP :
560 #add a filter for make_ldap_filters_from_record
561 person_list = self.ldap.LdapFindUser(is_user_enabled=True)
565 def GetTimezone(self):
566 server_timestamp, server_tz = self.oar.parser.\
567 SendRequest("GET_timezone")
568 return server_timestamp, server_tz
571 def DeleteJobs(self, job_id, slice_hrn):
572 if not job_id or job_id is -1:
574 username = slice_hrn.split(".")[-1].rstrip("_slice")
576 reqdict['method'] = "delete"
577 reqdict['strval'] = str(job_id)
579 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
581 logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s " \
587 ##TODO : Unused GetJobsId ? SA 05/07/12
588 #def GetJobsId(self, job_id, username = None ):
590 #Details about a specific job.
591 #Includes details about submission time, jot type, state, events,
592 #owner, assigned ressources, walltime etc...
596 #node_list_k = 'assigned_network_address'
597 ##Get job info from OAR
598 #job_info = self.oar.parser.SendRequest(req, job_id, username)
600 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
602 #if job_info['state'] == 'Terminated':
603 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
606 #if job_info['state'] == 'Error':
607 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
612 #logger.error("SLABDRIVER \tGetJobsId KeyError")
615 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
617 ##Replaces the previous entry
618 ##"assigned_network_address" / "reserved_resources"
620 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
621 #del job_info[node_list_k]
622 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
626 def GetJobsResources(self, job_id, username = None):
627 #job_resources=['reserved_resources', 'assigned_resources',\
628 #'job_id', 'job_uri', 'assigned_nodes',\
630 #assigned_res = ['resource_id', 'resource_uri']
631 #assigned_n = ['node', 'node_uri']
633 req = "GET_jobs_id_resources"
634 node_list_k = 'reserved_resources'
636 #Get job resources list from OAR
637 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
638 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
641 self.__get_hostnames_from_oar_node_ids(node_id_list)
643 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
645 #Replaces the previous entry "assigned_network_address" /
646 #"reserved_resources"
648 job_info = {'node_ids': hostname_list}
653 def get_info_on_reserved_nodes(self, job_info, node_list_name):
654 #Get the list of the testbed nodes records and make a
655 #dictionnary keyed on the hostname out of it
656 node_list_dict = self.GetNodes()
657 #node_hostname_list = []
658 node_hostname_list = [node['hostname'] for node in node_list_dict]
659 #for node in node_list_dict:
660 #node_hostname_list.append(node['hostname'])
661 node_dict = dict(zip(node_hostname_list, node_list_dict))
663 reserved_node_hostname_list = []
664 for index in range(len(job_info[node_list_name])):
665 #job_info[node_list_name][k] =
666 reserved_node_hostname_list[index] = \
667 node_dict[job_info[node_list_name][index]]['hostname']
669 logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
670 reserved_node_hostname_list %s" \
671 %(reserved_node_hostname_list))
673 logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
675 return reserved_node_hostname_list
677 def GetNodesCurrentlyInUse(self):
678 """Returns a list of all the nodes already involved in an oar job"""
679 return self.oar.parser.SendRequest("GET_running_jobs")
681 def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
682 full_nodes_dict_list = self.GetNodes()
683 #Put the full node list into a dictionary keyed by oar node id
684 oar_id_node_dict = {}
685 for node in full_nodes_dict_list:
686 oar_id_node_dict[node['oar_id']] = node
688 logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\
689 oar_id_node_dict %s" %(oar_id_node_dict))
691 hostname_dict_list = []
692 for resource_id in resource_id_list:
693 hostname_dict_list.append({'hostname' : \
694 oar_id_node_dict[resource_id]['hostname'],
695 'site_id' : oar_id_node_dict[resource_id]['site']})
697 #hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
698 return hostname_dict_list
700 def GetReservedNodes(self):
701 #Get the nodes in use and the reserved nodes
702 reservation_dict_list = \
703 self.oar.parser.SendRequest("GET_reserved_nodes")
706 for resa in reservation_dict_list:
707 logger.debug ("GetReservedNodes resa %s"%(resa))
708 #dict list of hostnames and their site
709 resa['reserved_nodes'] = \
710 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
712 #del resa['resource_ids']
713 return reservation_dict_list
715 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
717 node_filter_dict : dictionnary of lists
720 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
721 node_dict_list = node_dict_by_id.values()
723 #No filtering needed return the list directly
724 if not (node_filter_dict or return_fields_list):
725 return node_dict_list
727 return_node_list = []
729 for filter_key in node_filter_dict:
731 #Filter the node_dict_list by each value contained in the
732 #list node_filter_dict[filter_key]
733 for value in node_filter_dict[filter_key]:
734 for node in node_dict_list:
735 if node[filter_key] == value:
736 if return_fields_list :
738 for k in return_fields_list:
740 return_node_list.append(tmp)
742 return_node_list.append(node)
744 logger.log_exc("GetNodes KeyError")
748 return return_node_list
751 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
752 site_dict = self.oar.parser.SendRequest("GET_sites")
753 #site_dict : dict where the key is the sit ename
754 return_site_list = []
755 if not ( site_filter_name_list or return_fields_list):
756 return_site_list = site_dict.values()
757 return return_site_list
759 for site_filter_name in site_filter_name_list:
760 if site_filter_name in site_dict:
761 if return_fields_list:
762 for field in return_fields_list:
765 tmp[field] = site_dict[site_filter_name][field]
767 logger.error("GetSites KeyError %s "%(field))
769 return_site_list.append(tmp)
771 return_site_list.append( site_dict[site_filter_name])
774 return return_site_list
775 #warning return_fields_list paramr emoved (Not used)
776 def GetSlices(self, slice_filter = None, slice_filter_type = None):
777 #def GetSlices(self, slice_filter = None, slice_filter_type = None, \
778 #return_fields_list = None):
779 """ Get the slice records from the slab db.
780 Returns a slice ditc if slice_filter and slice_filter_type
782 Returns a list of slice dictionnaries if there are no filters
786 return_slice_list = []
789 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
790 logger.debug("SLABDRIVER \tGetSlices authorized_filter_types_list %s"\
791 %(authorized_filter_types_list))
792 if slice_filter_type in authorized_filter_types_list:
793 if slice_filter_type == 'slice_hrn':
794 slicerec = slab_dbsession.query(SliceSenslab).\
795 filter_by(slice_hrn = slice_filter).first()
797 if slice_filter_type == 'record_id_user':
798 slicerec = slab_dbsession.query(SliceSenslab).\
799 filter_by(record_id_user = slice_filter).first()
803 slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
804 logger.debug("SLABDRIVER \tGetSlices slicerec_dict %s" \
807 login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
808 logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
810 %(login, slicerec_dict))
811 if slicerec_dict['oar_job_id'] is not -1:
812 #Check with OAR the status of the job if a job id is in
814 rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \
818 slicerec_dict.update(rslt)
819 slicerec_dict.update({'hrn':\
820 str(slicerec_dict['slice_hrn'])})
821 #If GetJobsResources is empty, this means the job is
822 #now in the 'Terminated' state
823 #Update the slice record
825 self.db.update_job(slice_filter, job_id = -1)
826 slicerec_dict['oar_job_id'] = -1
828 update({'hrn':str(slicerec_dict['slice_hrn'])})
831 slicerec_dict['node_ids'] = slicerec_dict['node_list']
835 logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s"\
842 return_slice_list = slab_dbsession.query(SliceSenslab).all()
844 logger.debug("SLABDRIVER.PY \tGetSlices slices %s \
845 slice_filter %s " %(return_slice_list, slice_filter))
847 #if return_fields_list:
848 #return_slice_list = parse_filter(sliceslist, \
849 #slice_filter,'slice', return_fields_list)
851 return return_slice_list
857 def testbed_name (self): return self.hrn
859 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
860 def aggregate_version (self):
861 version_manager = VersionManager()
862 ad_rspec_versions = []
863 request_rspec_versions = []
864 for rspec_version in version_manager.versions:
865 if rspec_version.content_type in ['*', 'ad']:
866 ad_rspec_versions.append(rspec_version.to_dict())
867 if rspec_version.content_type in ['*', 'request']:
868 request_rspec_versions.append(rspec_version.to_dict())
870 'testbed':self.testbed_name(),
871 'geni_request_rspec_versions': request_rspec_versions,
872 'geni_ad_rspec_versions': ad_rspec_versions,
881 # Convert SFA fields to PLC fields for use when registering up updating
882 # registry record in the PLC database
884 # @param type type of record (user, slice, ...)
885 # @param hrn human readable name
886 # @param sfa_fields dictionary of SFA fields
887 # @param slab_fields dictionary of PLC fields (output)
889 def sfa_fields_to_slab_fields(self, sfa_type, hrn, record):
891 def convert_ints(tmpdict, int_fields):
892 for field in int_fields:
894 tmpdict[field] = int(tmpdict[field])
897 #for field in record:
898 # slab_record[field] = record[field]
900 if sfa_type == "slice":
901 #instantion used in get_slivers ?
902 if not "instantiation" in slab_record:
903 slab_record["instantiation"] = "senslab-instantiated"
904 slab_record["hrn"] = hrn_to_pl_slicename(hrn)
905 logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
906 slab_record %s hrn_to_pl_slicename(hrn) hrn %s " \
907 %(slab_record['hrn'], hrn))
909 slab_record["url"] = record["url"]
910 if "description" in record:
911 slab_record["description"] = record["description"]
912 if "expires" in record:
913 slab_record["expires"] = int(record["expires"])
915 #nodes added by OAR only and then imported to SFA
916 #elif type == "node":
917 #if not "hostname" in slab_record:
918 #if not "hostname" in record:
919 #raise MissingSfaInfo("hostname")
920 #slab_record["hostname"] = record["hostname"]
921 #if not "model" in slab_record:
922 #slab_record["model"] = "geni"
925 #elif type == "authority":
926 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
928 #if not "name" in slab_record:
929 #slab_record["name"] = hrn
931 #if not "abbreviated_name" in slab_record:
932 #slab_record["abbreviated_name"] = hrn
934 #if not "enabled" in slab_record:
935 #slab_record["enabled"] = True
937 #if not "is_public" in slab_record:
938 #slab_record["is_public"] = True
945 def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
946 """ Transforms unix timestamp into valid OAR date format """
948 #Used in case of a scheduled experiment (not immediate)
949 #To run an XP immediately, don't specify date and time in RSpec
950 #They will be set to None.
952 #transform the xp_utc_timestamp into server readable time
953 xp_server_readable_date = datetime.fromtimestamp(int(\
954 xp_utc_timestamp)).strftime(self.time_format)
956 return xp_server_readable_date
961 def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
962 """ Creates the structure needed for a correct POST on OAR.
963 Makes the timestamp transformation into the appropriate format.
964 Sends the POST request to create the job with the resources in
972 slice_name = slice_dict['name']
974 slot = slice_dict['timeslot']
975 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
978 #Running on default parameters
979 #XP immediate , 10 mins
980 slot = { 'date':None, 'start_time':None,
981 'timezone':None, 'duration':None }#10 min
983 reqdict['workdir'] = '/tmp'
984 reqdict['resource'] = "{network_address in ("
986 for node in added_nodes:
987 logger.debug("OARrestapi \tLaunchExperimentOnOAR \
990 #Get the ID of the node : remove the root auth and put
991 # the site in a separate list.
992 # NT: it's not clear for me if the nodenames will have the senslab
993 #prefix so lets take the last part only, for now.
995 # Again here it's not clear if nodes will be prefixed with <site>_,
996 #lets split and tanke the last part for now.
997 #s=lastpart.split("_")
1000 reqdict['resource'] += "'" + nodeid + "', "
1001 nodeid_list.append(nodeid)
1003 custom_length = len(reqdict['resource'])- 2
1004 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
1005 ")}/nodes=" + str(len(nodeid_list))
1007 #if slot['duration']:
1008 walltime, sleep_walltime = __process_walltime(duration = \
1011 #walltime, sleep_walltime = self.__process_walltime(duration = None)
1013 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
1014 ":" + str(walltime[1]) + ":" + str(walltime[2])
1015 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
1019 #In case of a scheduled experiment (not immediate)
1020 #To run an XP immediately, don't specify date and time in RSpec
1021 #They will be set to None.
1022 server_timestamp, server_tz = self.GetTimezone()
1023 if slot['date'] and slot['start_time']:
1024 if slot['timezone'] is '' or slot['timezone'] is None:
1025 #assume it is server timezone
1026 from_zone = tz.gettz(server_tz)
1027 logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
1028 not specified server_tz %s from_zone %s" \
1029 %(server_tz, from_zone))
1031 #Get zone of the user from the reservation time given
1033 from_zone = tz.gettz(slot['timezone'])
1035 date = str(slot['date']) + " " + str(slot['start_time'])
1036 user_datetime = datetime.strptime(date, self.time_format)
1037 user_datetime = user_datetime.replace(tzinfo = from_zone)
1039 #Convert to server zone
1041 to_zone = tz.gettz(server_tz)
1042 reservation_date = user_datetime.astimezone(to_zone)
1043 #Readable time accpeted by OAR
1044 reqdict['reservation'] = reservation_date.strftime(self.time_format)
1046 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR \
1047 reqdict['reservation'] %s " %(reqdict['reservation']))
1050 # Immediate XP. Not need to add special parameters.
1051 # normally not used in SFA
1056 reqdict['type'] = "deploy"
1057 reqdict['directory'] = ""
1058 reqdict['name'] = "TestSandrine"
1061 # first step : start the OAR job and update the job
1062 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
1063 \r\n site_list %s" %(reqdict, site_list))
1065 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
1066 reqdict, slice_user)
1067 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
1069 jobid = answer['id']
1071 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
1072 Impossible to create job %s " %(answer))
1075 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
1076 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
1077 self.db.update_job( slice_name, jobid, added_nodes)
1080 # second step : configure the experiment
1081 # we need to store the nodes in a yaml (well...) file like this :
1082 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
1083 job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
1085 job_file.write(str(added_nodes[0].strip('node')))
1086 for node in added_nodes[1:len(added_nodes)] :
1087 job_file.write(', '+ node.strip('node'))
1091 # third step : call the senslab-experiment wrapper
1092 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
1093 # "+str(jobid)+" "+slice_user
1094 javacmdline = "/usr/bin/java"
1096 "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
1097 #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
1098 #str(jobid), slice_user])
1099 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
1100 slice_user],stdout=subprocess.PIPE).communicate()[0]
1102 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR wrapper returns%s " \
1107 #Delete the jobs and updates the job id in the senslab table
1109 #Does not clear the node list
1110 def DeleteSliceFromNodes(self, slice_record):
1111 # Get user information
1113 self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
1114 self.db.update_job(slice_record['hrn'], job_id = -1)
1120 def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
1121 unfiltered_reservation_list = self.GetReservedNodes()
1122 reservation_list = []
1123 #Find the slice associated with this user senslab ldap uid
1124 logger.debug(" SLABDRIVER.PY \tGetLeases ")
1125 for resa in unfiltered_reservation_list:
1126 ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
1127 ldap_info = ldap_info[0][1]
1129 user = dbsession.query(RegUser).filter_by(email = \
1130 ldap_info['mail'][0]).first()
1132 slice_info = slab_dbsession.query(SliceSenslab).\
1133 filter_by(record_id_user = user.record_id).first()
1135 resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
1136 resa['component_id_list'] = []
1137 #Transform the hostnames into urns (component ids)
1138 for node in resa['reserved_nodes']:
1139 resa['component_id_list'].append(hostname_to_urn(self.hrn, \
1140 self.root_auth, node['hostname']))
1143 #Filter the reservation list if necessary
1144 #Returns all the leases associated with a given slice
1145 if lease_filter_dict:
1146 logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
1147 %(lease_filter_dict))
1148 for resa in unfiltered_reservation_list:
1149 if lease_filter_dict['name'] == resa['slice_id']:
1150 reservation_list.append(resa)
1152 reservation_list = unfiltered_reservation_list
1154 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
1155 %(reservation_list))
1156 return reservation_list
1158 def augment_records_with_testbed_info (self, sfa_records):
1159 return self.fill_record_info (sfa_records)
1161 def fill_record_info(self, record_list):
1163 Given a SFA record, fill in the senslab specific and SFA specific
1164 fields in the record.
1167 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
1168 if not isinstance(record_list, list):
1169 record_list = [record_list]
1172 for record in record_list:
1173 #If the record is a SFA slice record, then add information
1174 #about the user of this slice. This kind of
1175 #information is in the Senslab's DB.
1176 if str(record['type']) == 'slice':
1177 #Get slab slice record.
1178 recslice = self.GetSlices(slice_filter = \
1179 str(record['hrn']),\
1180 slice_filter_type = 'slice_hrn')
1181 recuser = dbsession.query(RegRecord).filter_by(record_id = \
1182 recslice['record_id_user']).first()
1183 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1184 rec %s \r\n \r\n" %(recslice))
1185 record.update({'PI':[recuser.hrn],
1186 'researcher': [recuser.hrn],
1187 'name':record['hrn'],
1188 'oar_job_id':recslice['oar_job_id'],
1190 'person_ids':[recslice['record_id_user']],
1191 'geni_urn':'', #For client_helper.py compatibility
1192 'keys':'', #For client_helper.py compatibility
1193 'key_ids':''}) #For client_helper.py compatibility
1195 elif str(record['type']) == 'user':
1196 #The record is a SFA user record.
1197 #Get the information about his slice from Senslab's DB
1198 #and add it to the user record.
1199 recslice = self.GetSlices(\
1200 slice_filter = record['record_id'],\
1201 slice_filter_type = 'record_id_user')
1203 logger.debug( "SLABDRIVER.PY \t fill_record_info user \
1204 rec %s \r\n \r\n" %(recslice))
1205 #Append slice record in records list,
1206 #therefore fetches user and slice info again(one more loop)
1207 #Will update PIs and researcher for the slice
1208 recuser = dbsession.query(RegRecord).filter_by(record_id = \
1209 recslice['record_id_user']).\
1211 recslice.update({'PI':[recuser.hrn],
1212 'researcher': [recuser.hrn],
1213 'name':record['hrn'],
1214 'oar_job_id':recslice['oar_job_id'],
1216 'person_ids':[recslice['record_id_user']]})
1218 #GetPersons takes [] as filters
1219 #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
1220 user_slab = self.GetPersons([record])
1222 recslice.update({'type':'slice', \
1223 'hrn':recslice['slice_hrn']})
1224 record.update(user_slab[0])
1225 #For client_helper.py compatibility
1226 record.update( { 'geni_urn':'',
1229 record_list.append(recslice)
1231 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1232 INFO TO USER records %s" %(record_list))
1235 except TypeError, error:
1236 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
1241 #self.fill_record_slab_info(records)
1247 #TODO Update membership? update_membership_list SA 05/07/12
1248 #def update_membership_list(self, oldRecord, record, listName, addFunc, \
1250 ## get a list of the HRNs tht are members of the old and new records
1252 #oldList = oldRecord.get(listName, [])
1255 #newList = record.get(listName, [])
1257 ## if the lists are the same, then we don't have to update anything
1258 #if (oldList == newList):
1261 ## build a list of the new person ids, by looking up each person to get
1265 #records = table.find({'type': 'user', 'hrn': newList})
1266 #for rec in records:
1267 #newIdList.append(rec['pointer'])
1269 ## build a list of the old person ids from the person_ids field
1271 #oldIdList = oldRecord.get("person_ids", [])
1272 #containerId = oldRecord.get_pointer()
1274 ## if oldRecord==None, then we are doing a Register, instead of an
1277 #containerId = record.get_pointer()
1279 ## add people who are in the new list, but not the oldList
1280 #for personId in newIdList:
1281 #if not (personId in oldIdList):
1282 #addFunc(self.plauth, personId, containerId)
1284 ## remove people who are in the old list, but not the new list
1285 #for personId in oldIdList:
1286 #if not (personId in newIdList):
1287 #delFunc(self.plauth, personId, containerId)
1289 #def update_membership(self, oldRecord, record):
1291 #if record.type == "slice":
1292 #self.update_membership_list(oldRecord, record, 'researcher',
1293 #self.users.AddPersonToSlice,
1294 #self.users.DeletePersonFromSlice)
1295 #elif record.type == "authority":
1300 # I don't think you plan on running a component manager at this point
1301 # let me clean up the mess of ComponentAPI that is deprecated anyways
1304 #TODO FUNCTIONS SECTION 04/07/2012 SA
1306 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
1308 def UnBindObjectFromPeer(self, auth, object_type, object_id, shortname):
1309 """ This method is a hopefully temporary hack to let the sfa correctly
1310 detach the objects it creates from a remote peer object. This is
1311 needed so that the sfa federation link can work in parallel with
1312 RefreshPeer, as RefreshPeer depends on remote objects being correctly
1315 auth : struct, API authentication structure
1316 AuthMethod : string, Authentication method to use
1317 object_type : string, Object type, among 'site','person','slice',
1319 object_id : int, object_id
1320 shortname : string, peer shortname
1324 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
1328 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
1330 def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
1331 remote_object_id=None):
1332 """This method is a hopefully temporary hack to let the sfa correctly
1333 attach the objects it creates to a remote peer object. This is needed
1334 so that the sfa federation link can work in parallel with RefreshPeer,
1335 as RefreshPeer depends on remote objects being correctly marked.
1337 shortname : string, peer shortname
1338 remote_object_id : int, remote object_id, set to 0 if unknown
1342 logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
1345 #TODO UpdateSlice 04/07/2012 SA
1346 #Funciton should delete and create another job since oin senslab slice=job
1347 def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
1348 """Updates the parameters of an existing slice with the values in
1350 Users may only update slices of which they are members.
1351 PIs may update any of the slices at their sites, or any slices of
1352 which they are members. Admins may update any slice.
1353 Only PIs and admins may update max_nodes. Slices cannot be renewed
1354 (by updating the expires parameter) more than 8 weeks into the future.
1355 Returns 1 if successful, faults otherwise.
1359 logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
1362 #TODO UpdatePerson 04/07/2012 SA
1363 def UpdatePerson(self, auth, person_id_or_email, person_fields=None):
1364 """Updates a person. Only the fields specified in person_fields
1365 are updated, all other fields are left untouched.
1366 Users and techs can only update themselves. PIs can only update
1367 themselves and other non-PIs at their sites.
1368 Returns 1 if successful, faults otherwise.
1372 logger.warning("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
1375 #TODO GetKeys 04/07/2012 SA
1376 def GetKeys(self, auth, key_filter=None, return_fields=None):
1377 """Returns an array of structs containing details about keys.
1378 If key_filter is specified and is an array of key identifiers,
1379 or a struct of key attributes, only keys matching the filter
1380 will be returned. If return_fields is specified, only the
1381 specified details will be returned.
1383 Admin may query all keys. Non-admins may only query their own keys.
1387 logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
1390 #TODO DeleteKey 04/07/2012 SA
1391 def DeleteKey(self, auth, key_id):
1393 Non-admins may only delete their own keys.
1394 Returns 1 if successful, faults otherwise.
1398 logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
1402 #TODO : Check rights to delete person
1403 def DeletePerson(self, auth, person_record):
1404 """ Disable an existing account in senslab LDAP.
1405 Users and techs can only delete themselves. PIs can only
1406 delete themselves and other non-PIs at their sites.
1407 ins can delete anyone.
1408 Returns 1 if successful, faults otherwise.
1412 #Disable user account in senslab LDAP
1413 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
1414 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
1417 #TODO Check DeleteSlice, check rights 05/07/2012 SA
1418 def DeleteSlice(self, auth, slice_record):
1419 """ Deletes the specified slice.
1420 Senslab : Kill the job associated with the slice if there is one
1421 using DeleteSliceFromNodes.
1422 Updates the slice record in slab db to remove the slice nodes.
1424 Users may only delete slices of which they are members. PIs may
1425 delete any of the slices at their sites, or any slices of which
1426 they are members. Admins may delete any slice.
1427 Returns 1 if successful, faults otherwise.
1431 self.DeleteSliceFromNodes(slice_record)
1432 self.db.update_job(slice_record['hrn'], job_id = -1, nodes = [])
1433 logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
1436 #TODO AddPerson 04/07/2012 SA
1437 def AddPerson(self, auth, person_fields=None):
1438 """Adds a new account. Any fields specified in person_fields are used,
1439 otherwise defaults are used.
1440 Accounts are disabled by default. To enable an account,
1442 Returns the new person_id (> 0) if successful, faults otherwise.
1446 logger.warning("SLABDRIVER AddPerson EMPTY - DO NOTHING \r\n ")
1449 #TODO AddPersonToSite 04/07/2012 SA
1450 def AddPersonToSite (self, auth, person_id_or_email, \
1451 site_id_or_login_base=None):
1452 """ Adds the specified person to the specified site. If the person is
1453 already a member of the site, no errors are returned. Does not change
1454 the person's primary site.
1455 Returns 1 if successful, faults otherwise.
1459 logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
1462 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
1463 def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
1464 """Grants the specified role to the person.
1465 PIs can only grant the tech and user roles to users and techs at their
1466 sites. Admins can grant any role to any user.
1467 Returns 1 if successful, faults otherwise.
1471 logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
1474 #TODO AddPersonKey 04/07/2012 SA
1475 def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
1476 """Adds a new key to the specified account.
1477 Non-admins can only modify their own keys.
1478 Returns the new key_id (> 0) if successful, faults otherwise.
1482 logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")