4 from datetime import datetime
6 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
7 from sfa.util.sfalogging import logger
8 from sfa.storage.alchemy import dbsession
9 from sfa.storage.model import RegRecord, RegUser, RegSlice
10 from sqlalchemy.orm import joinedload
11 from sfa.trust.credential import Credential
14 from sfa.managers.driver import Driver
15 from sfa.rspecs.version_manager import VersionManager
16 from sfa.rspecs.rspec import RSpec
18 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
21 ## thierry: everything that is API-related (i.e. handling incoming requests)
23 # SlabDriver should be really only about talking to the senslab testbed
26 from sfa.senslab.OARrestapi import OARrestapi
27 from sfa.senslab.LDAPapi import LDAPapi
29 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
32 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
34 from sfa.senslab.slabslices import SlabSlices
39 # this inheritance scheme is so that the driver object can receive
40 # GetNodes or GetSites sorts of calls directly
41 # and thus minimize the differences in the managers with the pl version
45 class SlabDriver(Driver):
46 """ Senslab Driver class inherited from Driver generic class.
48 Contains methods compliant with the SFA standard and the testbed
49 infrastructure (calls to LDAP and OAR).
51 def __init__(self, config):
52 Driver.__init__ (self, config)
54 self.hrn = config.SFA_INTERFACE_HRN
55 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
56 self.oar = OARrestapi()
58 self.time_format = "%Y-%m-%d %H:%M:%S"
59 self.db = SlabDB(config, debug = False)
63 def sliver_status(self, slice_urn, slice_hrn):
64 """Receive a status request for slice named urn/hrn
65 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
66 shall return a structure as described in
67 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
68 NT : not sure if we should implement this or not, but used by sface.
72 #First get the slice with the slice hrn
73 slice_list = self.GetSlices(slice_filter = slice_hrn, \
74 slice_filter_type = 'slice_hrn')
76 if len(slice_list) is 0:
77 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
79 #Slice has the same slice hrn for each slice in the slice/lease list
80 #So fetch the info on the user once
81 one_slice = slice_list[0]
82 #recuser = dbsession.query(RegRecord).filter_by(record_id = \
83 #one_slice['record_id_user']).first()
85 #Make a list of all the nodes hostnames in use for this slice
87 for single_slice in slice_list:
88 for node in single_slice['node_ids']:
89 slice_nodes_list.append(node['hostname'])
91 #Get all the corresponding nodes details
92 nodes_all = self.GetNodes({'hostname':slice_nodes_list},
93 ['node_id', 'hostname','site','boot_state'])
94 nodeall_byhostname = dict([(one_node['hostname'], one_node) \
95 for one_node in nodes_all])
99 for single_slice in slice_list:
102 top_level_status = 'empty'
105 ['geni_urn','pl_login','geni_status','geni_resources'], None)
106 result['pl_login'] = one_slice['reg_researchers']['hrn']
107 logger.debug("Slabdriver - sliver_status Sliver status \
108 urn %s hrn %s single_slice %s \r\n " \
109 %(slice_urn, slice_hrn, single_slice))
111 nodes_in_slice = single_slice['node_ids']
114 result['geni_status'] = top_level_status
115 result['geni_resources'] = []
118 top_level_status = 'ready'
120 #A job is running on Senslab for this slice
121 # report about the local nodes that are in the slice only
123 result['geni_urn'] = slice_urn
127 #timestamp = float(sl['startTime']) + float(sl['walltime'])
128 #result['pl_expires'] = strftime(self.time_format, \
129 #gmtime(float(timestamp)))
130 #result['slab_expires'] = strftime(self.time_format,\
131 #gmtime(float(timestamp)))
134 for node in single_slice['node_ids']:
136 #res['slab_hostname'] = node['hostname']
137 #res['slab_boot_state'] = node['boot_state']
139 res['pl_hostname'] = node['hostname']
140 res['pl_boot_state'] = \
141 nodeall_byhostname[node['hostname']]['boot_state']
142 #res['pl_last_contact'] = strftime(self.time_format, \
143 #gmtime(float(timestamp)))
144 sliver_id = Xrn(slice_urn, type='slice', \
145 id=nodeall_byhostname[node['hostname']]['node_id'], \
146 authority=self.hrn).urn
148 res['geni_urn'] = sliver_id
149 node_name = node['hostname']
150 if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
152 res['geni_status'] = 'ready'
154 res['geni_status'] = 'failed'
155 top_level_status = 'failed'
157 res['geni_error'] = ''
159 resources.append(res)
161 result['geni_status'] = top_level_status
162 result['geni_resources'] = resources
163 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
167 def get_user(self, hrn):
168 return dbsession.query(RegRecord).filter_by(hrn = hrn).first()
171 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
173 aggregate = SlabAggregate(self)
175 slices = SlabSlices(self)
176 peer = slices.get_peer(slice_hrn)
177 sfa_peer = slices.get_sfa_peer(slice_hrn)
180 if not isinstance(creds, list):
184 slice_record = users[0].get('slice_record', {})
185 logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
186 creds %s \r\n \r\n users %s" \
188 slice_record['user'] = {'keys':users[0]['keys'], \
189 'email':users[0]['email'], \
190 'hrn':slice_record['reg-researchers'][0]}
192 rspec = RSpec(rspec_string)
193 logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
194 %s slice_record %s users %s" \
195 %(rspec.version,slice_record, users))
198 # ensure site record exists?
199 # ensure slice record exists
200 #Removed options to verify_slice SA 14/08/12
201 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
204 # ensure person records exists
205 #verify_persons returns added persons but since the return value
207 slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
208 sfa_peer, options=options)
209 #requested_attributes returned by rspec.version.get_slice_attributes()
210 #unused, removed SA 13/08/12
211 rspec.version.get_slice_attributes()
213 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
215 # add/remove slice from nodes
217 requested_slivers = [node.get('component_id') \
218 for node in rspec.version.get_nodes_with_slivers()\
219 if node.get('authority_id') is self.root_auth]
220 l = [ node for node in rspec.version.get_nodes_with_slivers() ]
221 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
222 requested_slivers %s listnodes %s" \
223 %(requested_slivers,l))
224 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
225 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
228 requested_lease_list = []
230 logger.debug("SLABDRIVER.PY \tcreate_sliver AVANTLEASE " )
231 rspec_requested_leases = rspec.version.get_leases()
232 for lease in rspec.version.get_leases():
233 single_requested_lease = {}
234 logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
236 if not lease.get('lease_id'):
237 if get_authority(lease['component_id']) == self.root_auth:
238 single_requested_lease['hostname'] = \
239 slab_xrn_to_hostname(\
240 lease.get('component_id').strip())
241 single_requested_lease['start_time'] = \
242 lease.get('start_time')
243 single_requested_lease['duration'] = lease.get('duration')
245 requested_lease_list.append(single_requested_lease)
247 logger.debug("SLABDRIVER.PY \tcreate_sliver APRESLEASE" )
248 #dCreate dict of leases by start_time, regrouping nodes reserved
250 #time, for the same amount of time = one job on OAR
251 requested_job_dict = {}
252 for lease in requested_lease_list:
254 #In case it is an asap experiment start_time is empty
255 if lease['start_time'] == '':
256 lease['start_time'] = '0'
258 if lease['start_time'] not in requested_job_dict:
259 if isinstance(lease['hostname'], str):
260 lease['hostname'] = [lease['hostname']]
262 requested_job_dict[lease['start_time']] = lease
265 job_lease = requested_job_dict[lease['start_time']]
266 if lease['duration'] == job_lease['duration'] :
267 job_lease['hostname'].append(lease['hostname'])
272 logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\
273 %(requested_job_dict))
274 #verify_slice_leases returns the leases , but the return value is unused
275 #here. Removed SA 13/08/12
276 slices.verify_slice_leases(sfa_slice, \
277 requested_job_dict, peer)
279 return aggregate.get_rspec(slice_xrn=slice_urn, login=sfa_slice['login'],version=rspec.version)
282 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
284 sfa_slice_list = self.GetSlices(slice_filter = slice_hrn, \
285 slice_filter_type = 'slice_hrn')
287 if not sfa_slice_list:
290 #Delete all in the slice
291 for sfa_slice in sfa_slice_list:
294 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
295 slices = SlabSlices(self)
296 # determine if this is a peer slice
298 peer = slices.get_peer(slice_hrn)
299 #TODO delete_sliver SA : UnBindObjectFromPeer should be
300 #used when there is another
301 #senslab testbed, which is not the case 14/08/12 .
303 logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
306 self.UnBindObjectFromPeer('slice', \
307 sfa_slice['record_id_slice'], \
309 self.DeleteSliceFromNodes(sfa_slice)
312 self.BindObjectToPeer('slice', \
313 sfa_slice['record_id_slice'], \
314 peer, sfa_slice['peer_slice_id'])
318 def AddSlice(self, slice_record, user_record):
319 """Add slice to the sfa tables and senslab table only if the user
320 already exists in senslab database(user already registered in LDAP).
321 There is no way to separate adding the slice to the tesbed
322 and then importing it from the testbed to SFA because of
323 senslab's architecture. Therefore, sfa tables are updated here.
326 sfa_record = RegSlice(hrn=slice_record['slice_hrn'],
327 gid=slice_record['gid'],
328 pointer=slice_record['slice_id'],
329 authority=slice_record['authority'])
331 logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
332 %(sfa_record, user_record))
333 sfa_record.just_created()
334 dbsession.add(sfa_record)
336 #Update the reg-researcher dependance table
337 sfa_record.reg_researchers = [user_record]
340 #Update the senslab table with the new slice
341 #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \
342 #record_id_slice = sfa_record.record_id , \
343 #record_id_user = slice_record['record_id_user'], \
344 #peer_authority = slice_record['peer_authority'])
346 #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \
347 #slab_slice %s sfa_record %s" \
348 #%(slice_record,slab_slice, sfa_record))
349 #slab_dbsession.add(slab_slice)
350 #slab_dbsession.commit()
353 # first 2 args are None in case of resource discovery
354 def list_resources (self, slice_urn, slice_hrn, creds, options):
355 #cached_requested = options.get('cached', True)
357 version_manager = VersionManager()
358 # get the rspec's return format from options
360 version_manager.get_version(options.get('geni_rspec_version'))
361 version_string = "rspec_%s" % (rspec_version)
363 #panos adding the info option to the caching key (can be improved)
364 if options.get('info'):
365 version_string = version_string + "_" + \
366 options.get('info', 'default')
368 # Adding the list_leases option to the caching key
369 if options.get('list_leases'):
370 version_string = version_string + "_"+options.get('list_leases', 'default')
372 # Adding geni_available to caching key
373 if options.get('geni_available'):
374 version_string = version_string + "_" + str(options.get('geni_available'))
376 # look in cache first
377 #if cached_requested and self.cache and not slice_hrn:
378 #rspec = self.cache.get(version_string)
380 #logger.debug("SlabDriver.ListResources: \
381 #returning cached advertisement")
384 #panos: passing user-defined options
385 aggregate = SlabAggregate(self)
386 #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
387 #options.update({'origin_hrn':origin_hrn})
388 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
389 version=rspec_version, options=options)
392 #if self.cache and not slice_hrn:
393 #logger.debug("Slab.ListResources: stores advertisement in cache")
394 #self.cache.add(version_string, rspec)
399 def list_slices (self, creds, options):
400 # look in cache first
402 #slices = self.cache.get('slices')
404 #logger.debug("PlDriver.list_slices returns from cache")
409 slices = self.GetSlices()
410 logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
411 slice_hrns = [slab_slice['hrn'] for slab_slice in slices]
413 slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
414 for slice_hrn in slice_hrns]
418 #logger.debug ("SlabDriver.list_slices stores value in cache")
419 #self.cache.add('slices', slice_urns)
424 def register (self, sfa_record, hrn, pub_key):
426 Adding new user, slice, node or site should not be handled
430 Adding users = LDAP Senslab
431 Adding slice = Import from LDAP users
437 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
438 """No site or node record update allowed in Senslab."""
440 pointer = old_sfa_record['pointer']
441 old_sfa_record_type = old_sfa_record['type']
443 # new_key implemented for users only
444 if new_key and old_sfa_record_type not in [ 'user' ]:
445 raise UnknownSfaType(old_sfa_record_type)
447 #if (type == "authority"):
448 #self.shell.UpdateSite(pointer, new_sfa_record)
450 if old_sfa_record_type == "slice":
451 slab_record = self.sfa_fields_to_slab_fields(old_sfa_record_type, \
453 if 'name' in slab_record:
454 slab_record.pop('name')
455 #Prototype should be UpdateSlice(self,
456 #auth, slice_id_or_name, slice_fields)
457 #Senslab cannot update slice since slice = job
458 #so we must delete and create another job
459 self.UpdateSlice(pointer, slab_record)
461 elif old_sfa_record_type == "user":
463 all_fields = new_sfa_record
464 for key in all_fields.keys():
465 if key in ['first_name', 'last_name', 'title', 'email',
466 'password', 'phone', 'url', 'bio', 'accepted_aup',
468 update_fields[key] = all_fields[key]
469 self.UpdatePerson(pointer, update_fields)
472 # must check this key against the previous one if it exists
473 persons = self.GetPersons([pointer], ['key_ids'])
475 keys = person['key_ids']
476 keys = self.GetKeys(person['key_ids'])
478 # Delete all stale keys
481 if new_key != key['key']:
482 self.DeleteKey(key['key_id'])
486 self.AddPersonKey(pointer, {'key_type': 'ssh', \
493 def remove (self, sfa_record):
494 sfa_record_type = sfa_record['type']
495 hrn = sfa_record['hrn']
496 if sfa_record_type == 'user':
498 #get user from senslab ldap
499 person = self.GetPersons(sfa_record)
500 #No registering at a given site in Senslab.
501 #Once registered to the LDAP, all senslab sites are
504 #Mark account as disabled in ldap
505 self.DeletePerson(sfa_record)
506 elif sfa_record_type == 'slice':
507 if self.GetSlices(slice_filter = hrn, \
508 slice_filter_type = 'slice_hrn'):
509 self.DeleteSlice(sfa_record)
511 #elif type == 'authority':
512 #if self.GetSites(pointer):
513 #self.DeleteSite(pointer)
519 #TODO clean GetPeers. 05/07/12SA
520 def GetPeers (self, auth = None, peer_filter=None, return_fields_list=None):
522 existing_records = {}
523 existing_hrns_by_types = {}
524 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
525 return_field %s " %(auth , peer_filter, return_fields_list))
526 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
528 for record in all_records:
529 existing_records[(record.hrn, record.type)] = record
530 if record.type not in existing_hrns_by_types:
531 existing_hrns_by_types[record.type] = [record.hrn]
533 existing_hrns_by_types[record.type].append(record.hrn)
536 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
537 %( existing_hrns_by_types))
542 records_list.append(existing_records[(peer_filter,'authority')])
544 for hrn in existing_hrns_by_types['authority']:
545 records_list.append(existing_records[(hrn,'authority')])
547 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
553 return_records = records_list
554 if not peer_filter and not return_fields_list:
558 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
560 return return_records
563 #TODO : Handling OR request in make_ldap_filters_from_records
564 #instead of the for loop
565 #over the records' list
566 def GetPersons(self, person_filter=None):
568 person_filter should be a list of dictionnaries when not set to None.
569 Returns a list of users whose accounts are enabled found in ldap.
572 logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
575 if person_filter and isinstance(person_filter, list):
576 #If we are looking for a list of users (list of dict records)
577 #Usually the list contains only one user record
578 for searched_attributes in person_filter:
580 #Get only enabled user accounts in senslab LDAP :
581 #add a filter for make_ldap_filters_from_record
582 person = self.ldap.LdapFindUser(searched_attributes, \
583 is_user_enabled=True)
584 #If a person was found, append it to the list
586 person_list.append(person)
588 #If the list is empty, return None
589 if len(person_list) is 0:
593 #Get only enabled user accounts in senslab LDAP :
594 #add a filter for make_ldap_filters_from_record
595 person_list = self.ldap.LdapFindUser(is_user_enabled=True)
599 def GetTimezone(self):
600 """ Get the OAR servier time and timezone.
601 Unused SA 16/11/12"""
602 server_timestamp, server_tz = self.oar.parser.\
603 SendRequest("GET_timezone")
604 return server_timestamp, server_tz
607 def DeleteJobs(self, job_id, slice_hrn):
608 if not job_id or job_id is -1:
610 username = slice_hrn.split(".")[-1].rstrip("_slice")
612 reqdict['method'] = "delete"
613 reqdict['strval'] = str(job_id)
616 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
618 logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
619 username %s" %(job_id,answer, username))
624 ##TODO : Unused GetJobsId ? SA 05/07/12
625 #def GetJobsId(self, job_id, username = None ):
627 #Details about a specific job.
628 #Includes details about submission time, jot type, state, events,
629 #owner, assigned ressources, walltime etc...
633 #node_list_k = 'assigned_network_address'
634 ##Get job info from OAR
635 #job_info = self.oar.parser.SendRequest(req, job_id, username)
637 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
639 #if job_info['state'] == 'Terminated':
640 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
643 #if job_info['state'] == 'Error':
644 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
649 #logger.error("SLABDRIVER \tGetJobsId KeyError")
652 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
654 ##Replaces the previous entry
655 ##"assigned_network_address" / "reserved_resources"
657 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
658 #del job_info[node_list_k]
659 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
663 def GetJobsResources(self, job_id, username = None):
664 #job_resources=['reserved_resources', 'assigned_resources',\
665 #'job_id', 'job_uri', 'assigned_nodes',\
667 #assigned_res = ['resource_id', 'resource_uri']
668 #assigned_n = ['node', 'node_uri']
670 req = "GET_jobs_id_resources"
673 #Get job resources list from OAR
674 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
675 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
678 self.__get_hostnames_from_oar_node_ids(node_id_list)
681 #Replaces the previous entry "assigned_network_address" /
682 #"reserved_resources"
684 job_info = {'node_ids': hostname_list}
689 def get_info_on_reserved_nodes(self, job_info, node_list_name):
690 #Get the list of the testbed nodes records and make a
691 #dictionnary keyed on the hostname out of it
692 node_list_dict = self.GetNodes()
693 #node_hostname_list = []
694 node_hostname_list = [node['hostname'] for node in node_list_dict]
695 #for node in node_list_dict:
696 #node_hostname_list.append(node['hostname'])
697 node_dict = dict(zip(node_hostname_list, node_list_dict))
699 reserved_node_hostname_list = []
700 for index in range(len(job_info[node_list_name])):
701 #job_info[node_list_name][k] =
702 reserved_node_hostname_list[index] = \
703 node_dict[job_info[node_list_name][index]]['hostname']
705 logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
706 reserved_node_hostname_list %s" \
707 %(reserved_node_hostname_list))
709 logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
711 return reserved_node_hostname_list
713 def GetNodesCurrentlyInUse(self):
714 """Returns a list of all the nodes already involved in an oar job"""
715 return self.oar.parser.SendRequest("GET_running_jobs")
717 def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
718 full_nodes_dict_list = self.GetNodes()
719 #Put the full node list into a dictionary keyed by oar node id
720 oar_id_node_dict = {}
721 for node in full_nodes_dict_list:
722 oar_id_node_dict[node['oar_id']] = node
724 #logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\
725 #oar_id_node_dict %s" %(oar_id_node_dict))
727 hostname_dict_list = []
728 for resource_id in resource_id_list:
729 #Because jobs requested "asap" do not have defined resources
730 if resource_id is not "Undefined":
731 hostname_dict_list.append(\
732 oar_id_node_dict[resource_id]['hostname'])
734 #hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
735 return hostname_dict_list
737 def GetReservedNodes(self,username = None):
738 #Get the nodes in use and the reserved nodes
739 reservation_dict_list = \
740 self.oar.parser.SendRequest("GET_reserved_nodes", \
744 for resa in reservation_dict_list:
745 logger.debug ("GetReservedNodes resa %s"%(resa))
746 #dict list of hostnames and their site
747 resa['reserved_nodes'] = \
748 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
750 #del resa['resource_ids']
751 return reservation_dict_list
753 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
755 node_filter_dict : dictionnary of lists
758 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
759 node_dict_list = node_dict_by_id.values()
760 logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \
761 return_fields_list %s "%(node_filter_dict, return_fields_list))
762 #No filtering needed return the list directly
763 if not (node_filter_dict or return_fields_list):
764 return node_dict_list
766 return_node_list = []
768 for filter_key in node_filter_dict:
770 #Filter the node_dict_list by each value contained in the
771 #list node_filter_dict[filter_key]
772 for value in node_filter_dict[filter_key]:
773 for node in node_dict_list:
774 if node[filter_key] == value:
775 if return_fields_list :
777 for k in return_fields_list:
779 return_node_list.append(tmp)
781 return_node_list.append(node)
783 logger.log_exc("GetNodes KeyError")
787 return return_node_list
790 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
791 site_dict = self.oar.parser.SendRequest("GET_sites")
792 #site_dict : dict where the key is the sit ename
793 return_site_list = []
794 if not ( site_filter_name_list or return_fields_list):
795 return_site_list = site_dict.values()
796 return return_site_list
798 for site_filter_name in site_filter_name_list:
799 if site_filter_name in site_dict:
800 if return_fields_list:
801 for field in return_fields_list:
804 tmp[field] = site_dict[site_filter_name][field]
806 logger.error("GetSites KeyError %s "%(field))
808 return_site_list.append(tmp)
810 return_site_list.append( site_dict[site_filter_name])
813 return return_site_list
816 def _sql_get_slice_info( self, slice_filter ):
817 #DO NOT USE RegSlice - reg_researchers to get the hrn
818 #of the user otherwise will mess up the RegRecord in
819 #Resolve, don't know why - SA 08/08/2012
821 #Only one entry for one user = one slice in slab_xp table
822 #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
823 raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
824 #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
827 #raw_slicerec.reg_researchers
828 raw_slicerec = raw_slicerec.__dict__
829 logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s raw_slicerec %s"%(slice_filter,raw_slicerec))
830 slicerec = raw_slicerec
831 #only one researcher per slice so take the first one
832 #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
833 #del slicerec['reg_researchers']['_sa_instance_state']
840 def _sql_get_slice_info_from_user( self, slice_filter ):
841 #slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
842 raw_slicerec = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(record_id = slice_filter).first()
843 #raw_slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
844 #Put it in correct order
845 user_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'email', 'pointer']
846 slice_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'pointer']
848 #raw_slicerec.reg_slices_as_researcher
849 raw_slicerec = raw_slicerec.__dict__
851 slicerec = dict([(k,raw_slicerec['reg_slices_as_researcher'][0].__dict__[k]) for k in slice_needed_fields])
852 slicerec['reg_researchers'] = dict([(k, raw_slicerec[k]) for k in user_needed_fields])
853 #TODO Handle multiple slices for one user SA 10/12/12
854 #for now only take the first slice record associated to the rec user
855 ##slicerec = raw_slicerec['reg_slices_as_researcher'][0].__dict__
856 #del raw_slicerec['reg_slices_as_researcher']
857 #slicerec['reg_researchers'] = raw_slicerec
858 ##del slicerec['_sa_instance_state']
865 def _get_slice_records(self, slice_filter = None, \
866 slice_filter_type = None):
870 #Get list of slices based on the slice hrn
871 if slice_filter_type == 'slice_hrn':
873 #if get_authority(slice_filter) == self.root_auth:
874 #login = slice_filter.split(".")[1].split("_")[0]
876 slicerec = self._sql_get_slice_info(slice_filter)
882 #Get slice based on user id
883 if slice_filter_type == 'record_id_user':
885 slicerec = self._sql_get_slice_info_from_user(slice_filter)
888 fixed_slicerec_dict = slicerec
889 #At this point if the there is no login it means
890 #record_id_user filter has been used for filtering
892 ##If theslice record is from senslab
893 #if fixed_slicerec_dict['peer_authority'] is None:
894 #login = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
895 #return login, fixed_slicerec_dict
896 return fixed_slicerec_dict
898 def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None):
899 """ Get the slice records from the slab db.
900 Returns a slice ditc if slice_filter and slice_filter_type
902 Returns a list of slice dictionnaries if there are no filters
907 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
908 return_slicerec_dictlist = []
910 #First try to get information on the slice based on the filter provided
911 if slice_filter_type in authorized_filter_types_list:
912 fixed_slicerec_dict = \
913 self._get_slice_records(slice_filter, slice_filter_type)
914 #login, fixed_slicerec_dict = \
915 #self._get_slice_records(slice_filter, slice_filter_type)
916 logger.debug(" SLABDRIVER \tGetSlices login %s \
917 slice record %s slice_filter %s slice_filter_type %s "\
918 %(login, fixed_slicerec_dict,slice_filter, slice_filter_type))
921 #Now we have the slice record fixed_slicerec_dict, get the
922 #jobs associated to this slice
923 #leases_list = self.GetReservedNodes(username = login)
924 leases_list = self.GetLeases(login = login)
925 #If no job is running or no job scheduled
926 #return only the slice record
927 if leases_list == [] and fixed_slicerec_dict:
928 return_slicerec_dictlist.append(fixed_slicerec_dict)
930 #If several jobs for one slice , put the slice record into
931 # each lease information dict
932 for lease in leases_list :
935 reserved_list = lease['reserved_nodes']
937 slicerec_dict['oar_job_id']= lease['lease_id']
938 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
939 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
941 #Update lease dict with the slice record
942 if fixed_slicerec_dict:
943 fixed_slicerec_dict['oar_job_id'] = []
944 fixed_slicerec_dict['oar_job_id'].append(slicerec_dict['oar_job_id'])
945 slicerec_dict.update(fixed_slicerec_dict)
946 #slicerec_dict.update({'hrn':\
947 #str(fixed_slicerec_dict['slice_hrn'])})
950 return_slicerec_dictlist.append(slicerec_dict)
951 logger.debug("SLABDRIVER.PY \tGetSlices \
952 slicerec_dict %s return_slicerec_dictlist %s \
953 lease['reserved_nodes'] \
954 %s" %(slicerec_dict, return_slicerec_dictlist, \
955 lease['reserved_nodes'] ))
957 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
958 return_slicerec_dictlist %s" \
959 %(return_slicerec_dictlist))
961 return return_slicerec_dictlist
965 #Get all slices from the senslab sfa database ,
966 #put them in dict format
967 #query_slice_list = dbsession.query(RegRecord).all()
968 query_slice_list = dbsession.query(RegSlice).options(joinedload('reg_researchers')).all()
969 #query_slice_list = dbsession.query(RegRecord).filter_by(type='slice').all()
970 #query_slice_list = slab_dbsession.query(SenslabXP).all()
971 return_slicerec_dictlist = []
972 for record in query_slice_list:
973 tmp = record.__dict__
974 tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
975 #del tmp['reg_researchers']['_sa_instance_state']
976 return_slicerec_dictlist.append(tmp)
977 #return_slicerec_dictlist.append(record.__dict__)
979 #Get all the jobs reserved nodes
980 leases_list = self.GetReservedNodes()
983 for fixed_slicerec_dict in return_slicerec_dictlist:
985 #Check if the slice belongs to a senslab user
986 if fixed_slicerec_dict['peer_authority'] is None:
987 owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
990 for lease in leases_list:
991 if owner == lease['user']:
992 slicerec_dict['oar_job_id'] = lease['lease_id']
994 #for reserved_node in lease['reserved_nodes']:
995 logger.debug("SLABDRIVER.PY \tGetSlices lease %s "\
998 reserved_list = lease['reserved_nodes']
1000 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1001 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
1002 slicerec_dict.update(fixed_slicerec_dict)
1003 #slicerec_dict.update({'hrn':\
1004 #str(fixed_slicerec_dict['slice_hrn'])})
1005 #return_slicerec_dictlist.append(slicerec_dict)
1006 fixed_slicerec_dict.update(slicerec_dict)
1008 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
1009 return_slicerec_dictlist %s \slice_filter %s " \
1010 %(return_slicerec_dictlist, slice_filter))
1012 return return_slicerec_dictlist
1015 def testbed_name (self): return self.hrn
1017 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
1018 def aggregate_version (self):
1019 version_manager = VersionManager()
1020 ad_rspec_versions = []
1021 request_rspec_versions = []
1022 for rspec_version in version_manager.versions:
1023 if rspec_version.content_type in ['*', 'ad']:
1024 ad_rspec_versions.append(rspec_version.to_dict())
1025 if rspec_version.content_type in ['*', 'request']:
1026 request_rspec_versions.append(rspec_version.to_dict())
1028 'testbed':self.testbed_name(),
1029 'geni_request_rspec_versions': request_rspec_versions,
1030 'geni_ad_rspec_versions': ad_rspec_versions,
1036 # Convert SFA fields to PLC fields for use when registering up updating
1037 # registry record in the PLC database
1039 # @param type type of record (user, slice, ...)
1040 # @param hrn human readable name
1041 # @param sfa_fields dictionary of SFA fields
1042 # @param slab_fields dictionary of PLC fields (output)
1044 def sfa_fields_to_slab_fields(self, sfa_type, hrn, record):
1048 #for field in record:
1049 # slab_record[field] = record[field]
1051 if sfa_type == "slice":
1052 #instantion used in get_slivers ?
1053 if not "instantiation" in slab_record:
1054 slab_record["instantiation"] = "senslab-instantiated"
1055 #slab_record["hrn"] = hrn_to_pl_slicename(hrn)
1056 #Unused hrn_to_pl_slicename because Slab's hrn already
1057 #in the appropriate form SA 23/07/12
1058 slab_record["hrn"] = hrn
1059 logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
1060 slab_record %s " %(slab_record['hrn']))
1062 slab_record["url"] = record["url"]
1063 if "description" in record:
1064 slab_record["description"] = record["description"]
1065 if "expires" in record:
1066 slab_record["expires"] = int(record["expires"])
1068 #nodes added by OAR only and then imported to SFA
1069 #elif type == "node":
1070 #if not "hostname" in slab_record:
1071 #if not "hostname" in record:
1072 #raise MissingSfaInfo("hostname")
1073 #slab_record["hostname"] = record["hostname"]
1074 #if not "model" in slab_record:
1075 #slab_record["model"] = "geni"
1078 #elif type == "authority":
1079 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
1081 #if not "name" in slab_record:
1082 #slab_record["name"] = hrn
1084 #if not "abbreviated_name" in slab_record:
1085 #slab_record["abbreviated_name"] = hrn
1087 #if not "enabled" in slab_record:
1088 #slab_record["enabled"] = True
1090 #if not "is_public" in slab_record:
1091 #slab_record["is_public"] = True
1098 def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
1099 """ Transforms unix timestamp into valid OAR date format """
1101 #Used in case of a scheduled experiment (not immediate)
1102 #To run an XP immediately, don't specify date and time in RSpec
1103 #They will be set to None.
1104 if xp_utc_timestamp:
1105 #transform the xp_utc_timestamp into server readable time
1106 xp_server_readable_date = datetime.fromtimestamp(int(\
1107 xp_utc_timestamp)).strftime(self.time_format)
1109 return xp_server_readable_date
1117 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
1118 lease_start_time, lease_duration, slice_user=None):
1120 lease_dict['lease_start_time'] = lease_start_time
1121 lease_dict['lease_duration'] = lease_duration
1122 lease_dict['added_nodes'] = added_nodes
1123 lease_dict['slice_name'] = slice_name
1124 lease_dict['slice_user'] = slice_user
1125 lease_dict['grain'] = self.GetLeaseGranularity()
1126 lease_dict['time_format'] = self.time_format
1129 def __create_job_structure_request_for_OAR(lease_dict):
1130 """ Creates the structure needed for a correct POST on OAR.
1131 Makes the timestamp transformation into the appropriate format.
1132 Sends the POST request to create the job with the resources in
1141 reqdict['workdir'] = '/tmp'
1142 reqdict['resource'] = "{network_address in ("
1144 for node in lease_dict['added_nodes']:
1145 logger.debug("\r\n \r\n OARrestapi \t \
1146 __create_job_structure_request_for_OAR node %s" %(node))
1148 # Get the ID of the node
1150 reqdict['resource'] += "'" + nodeid + "', "
1151 nodeid_list.append(nodeid)
1153 custom_length = len(reqdict['resource'])- 2
1154 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
1155 ")}/nodes=" + str(len(nodeid_list))
1157 def __process_walltime(duration):
1158 """ Calculates the walltime in seconds from the duration in H:M:S
1159 specified in the RSpec.
1163 # Fixing the walltime by adding a few delays.
1164 # First put the walltime in seconds oarAdditionalDelay = 20;
1165 # additional delay for /bin/sleep command to
1166 # take in account prologue and epilogue scripts execution
1167 # int walltimeAdditionalDelay = 240; additional delay
1168 desired_walltime = duration
1169 total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
1170 sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12
1172 #Put the walltime back in str form
1173 #First get the hours
1174 walltime.append(str(total_walltime / 3600))
1175 total_walltime = total_walltime - 3600 * int(walltime[0])
1176 #Get the remaining minutes
1177 walltime.append(str(total_walltime / 60))
1178 total_walltime = total_walltime - 60 * int(walltime[1])
1180 walltime.append(str(total_walltime))
1183 logger.log_exc(" __process_walltime duration null")
1185 return walltime, sleep_walltime
1188 walltime, sleep_walltime = \
1189 __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
1192 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
1193 ":" + str(walltime[1]) + ":" + str(walltime[2])
1194 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
1196 #In case of a scheduled experiment (not immediate)
1197 #To run an XP immediately, don't specify date and time in RSpec
1198 #They will be set to None.
1199 if lease_dict['lease_start_time'] is not '0':
1200 #Readable time accepted by OAR
1201 start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
1202 strftime(lease_dict['time_format'])
1203 reqdict['reservation'] = start_time
1204 #If there is not start time, Immediate XP. No need to add special
1208 reqdict['type'] = "deploy"
1209 reqdict['directory'] = ""
1210 reqdict['name'] = "SFA_" + lease_dict['slice_user']
1214 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
1215 \r\n " %(slice_user))
1216 #Create the request for OAR
1217 reqdict = __create_job_structure_request_for_OAR(lease_dict)
1218 # first step : start the OAR job and update the job
1219 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
1222 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
1223 reqdict, slice_user)
1224 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
1226 jobid = answer['id']
1228 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
1229 Impossible to create job %s " %(answer))
1233 def __configure_experiment(jobid, added_nodes):
1234 # second step : configure the experiment
1235 # we need to store the nodes in a yaml (well...) file like this :
1236 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
1237 tmp_dir = '/tmp/sfa/'
1238 if not os.path.exists(tmp_dir):
1239 os.makedirs(tmp_dir)
1240 job_file = open(tmp_dir + str(jobid) + '.json', 'w')
1242 job_file.write(str(added_nodes[0].strip('node')))
1243 for node in added_nodes[1:len(added_nodes)] :
1244 job_file.write(', '+ node.strip('node'))
1249 def __launch_senslab_experiment(jobid):
1250 # third step : call the senslab-experiment wrapper
1251 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
1252 # "+str(jobid)+" "+slice_user
1253 javacmdline = "/usr/bin/java"
1255 "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
1257 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
1258 slice_user],stdout=subprocess.PIPE).communicate()[0]
1260 logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
1267 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
1268 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
1271 __configure_experiment(jobid, added_nodes)
1272 __launch_senslab_experiment(jobid)
1277 def AddLeases(self, hostname_list, slice_record, \
1278 lease_start_time, lease_duration):
1279 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
1280 slice_record %s lease_start_time %s lease_duration %s "\
1281 %( hostname_list, slice_record , lease_start_time, \
1284 #tmp = slice_record['reg-researchers'][0].split(".")
1285 username = slice_record['login']
1286 #username = tmp[(len(tmp)-1)]
1287 job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'], \
1288 lease_start_time, lease_duration, username)
1289 start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
1290 end_time = lease_start_time + lease_duration
1292 import logging, logging.handlers
1293 from sfa.util.sfalogging import _SfaLogger
1294 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_record['hrn'], job_id, end_time))
1295 sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG)
1296 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_record['hrn']), type(job_id), type(end_time)))
1297 slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], job_id = job_id,end_time= end_time)
1298 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" %(slab_ex_row))
1299 slab_dbsession.add(slab_ex_row)
1300 slab_dbsession.commit()
1302 logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
1307 #Delete the jobs from job_senslab table
1308 def DeleteSliceFromNodes(self, slice_record):
1309 for job_id in slice_record['oar_job_id']:
1310 self.DeleteJobs(job_id, slice_record['hrn'])
1314 def GetLeaseGranularity(self):
1315 """ Returns the granularity of Senslab testbed.
1316 OAR returns seconds for experiments duration.
1317 Defined in seconds. """
1322 def update_jobs_in_slabdb(self, job_oar_list, jobs_psql):
1323 #Get all the entries in slab_xp table
1326 jobs_psql = set(jobs_psql)
1327 kept_jobs = set(job_oar_list).intersection(jobs_psql)
1328 logger.debug ( "\r\n \t\tt update_jobs_in_slabdb jobs_psql %s \r\n \t job_oar_list %s \
1329 kept_jobs %s " %(jobs_psql,job_oar_list,kept_jobs))
1330 deleted_jobs = set(jobs_psql).difference(kept_jobs)
1331 deleted_jobs = list(deleted_jobs)
1332 if len(deleted_jobs) > 0:
1333 slab_dbsession.query(SenslabXP).filter(SenslabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch')
1334 slab_dbsession.commit()
1340 def GetLeases(self, lease_filter_dict=None, login=None):
1343 unfiltered_reservation_list = self.GetReservedNodes(login)
1345 reservation_list = []
1346 #Find the slice associated with this user senslab ldap uid
1347 logger.debug(" SLABDRIVER.PY \tGetLeases login %s unfiltered_reservation_list %s " %(login ,unfiltered_reservation_list))
1348 #Create user dict first to avoid looking several times for
1349 #the same user in LDAP SA 27/07/12
1353 jobs_psql_query = slab_dbsession.query(SenslabXP).all()
1354 jobs_psql_dict = [ (row.job_id, row.__dict__ )for row in jobs_psql_query ]
1355 jobs_psql_dict = dict(jobs_psql_dict)
1356 logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
1358 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
1362 for resa in unfiltered_reservation_list:
1363 logger.debug("SLABDRIVER \tGetLeases USER %s"\
1365 #Cosntruct list of jobs (runing, waiting..) in oar
1366 job_oar_list.append(resa['lease_id'])
1367 #If there is information on the job in SLAB DB (slice used and job id)
1368 if resa['lease_id'] in jobs_psql_dict:
1369 job_info = jobs_psql_dict[resa['lease_id']]
1370 logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
1372 resa['slice_hrn'] = job_info['slice_hrn']
1373 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
1375 #Assume it is a senslab slice:
1377 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice')
1378 #if resa['user'] not in resa_user_dict:
1379 #logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
1380 #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
1382 #ldap_info = ldap_info[0][1]
1383 ##Get the backref :relationship table reg-researchers
1384 #user = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(email = \
1385 #ldap_info['mail'][0])
1387 #user = user.first()
1388 #user = user.__dict__
1389 #slice_info = user['reg_slices_as_researcher'][0].__dict__
1390 ##Separated in case user not in database :
1391 ##record_id not defined SA 17/07//12
1393 ##query_slice_info = slab_dbsession.query(SenslabXP).filter_by(record_id_user = user.record_id)
1394 ##if query_slice_info:
1395 ##slice_info = query_slice_info.first()
1399 #resa_user_dict[resa['user']] = {}
1400 #resa_user_dict[resa['user']]['ldap_info'] = user
1401 #resa_user_dict[resa['user']]['slice_info'] = slice_info
1403 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
1404 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
1407 resa['component_id_list'] = []
1408 resa['hrn'] = Xrn(resa['slice_id']).get_hrn()
1409 #Transform the hostnames into urns (component ids)
1410 for node in resa['reserved_nodes']:
1411 #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
1412 #self.root_auth, node['hostname']))
1413 slab_xrn = slab_xrn_object(self.root_auth, node)
1414 resa['component_id_list'].append(slab_xrn.urn)
1416 if lease_filter_dict:
1417 logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\
1418 %(resa,lease_filter_dict))
1420 if lease_filter_dict['name'] == resa['hrn']:
1421 reservation_list.append(resa)
1423 if lease_filter_dict is None:
1424 reservation_list = unfiltered_reservation_list
1426 #del unfiltered_reservation_list[unfiltered_reservation_list.index(resa)]
1429 self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)
1431 #for resa in unfiltered_reservation_list:
1435 #if resa['user'] in resa_user_dict:
1436 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
1437 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
1439 ##resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
1440 #resa['component_id_list'] = []
1441 ##Transform the hostnames into urns (component ids)
1442 #for node in resa['reserved_nodes']:
1443 ##resa['component_id_list'].append(hostname_to_urn(self.hrn, \
1444 ##self.root_auth, node['hostname']))
1445 #slab_xrn = slab_xrn_object(self.root_auth, node)
1446 #resa['component_id_list'].append(slab_xrn.urn)
1448 ##Filter the reservation list if necessary
1449 ##Returns all the leases associated with a given slice
1450 #if lease_filter_dict:
1451 #logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
1452 #%(lease_filter_dict))
1453 #for resa in unfiltered_reservation_list:
1454 #if lease_filter_dict['name'] == resa['slice_hrn']:
1455 #reservation_list.append(resa)
1457 #reservation_list = unfiltered_reservation_list
1459 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
1460 %(reservation_list))
1461 return reservation_list
1463 def augment_records_with_testbed_info (self, sfa_records):
1464 return self.fill_record_info (sfa_records)
1466 def fill_record_info(self, record_list):
1468 Given a SFA record, fill in the senslab specific and SFA specific
1469 fields in the record.
1472 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
1473 if not isinstance(record_list, list):
1474 record_list = [record_list]
1477 for record in record_list:
1478 #If the record is a SFA slice record, then add information
1479 #about the user of this slice. This kind of
1480 #information is in the Senslab's DB.
1481 if str(record['type']) == 'slice':
1482 if 'reg_researchers' in record and isinstance(record['reg_researchers'],list) :
1483 record['reg_researchers'] = record['reg_researchers'][0].__dict__
1484 record.update({'PI':[record['reg_researchers']['hrn']],
1485 'researcher': [record['reg_researchers']['hrn']],
1486 'name':record['hrn'],
1489 'person_ids':[record['reg_researchers']['record_id']],
1490 'geni_urn':'', #For client_helper.py compatibility
1491 'keys':'', #For client_helper.py compatibility
1492 'key_ids':''}) #For client_helper.py compatibility
1495 #Get slab slice record.
1496 recslice_list = self.GetSlices(slice_filter = \
1497 str(record['hrn']),\
1498 slice_filter_type = 'slice_hrn')
1500 #recuser = recslice_list[0]['reg_researchers']
1501 ##recuser = dbsession.query(RegRecord).filter_by(record_id = \
1502 ##recslice_list[0]['record_id_user']).first()
1504 #record.update({'PI':[recuser['hrn']],
1505 #'researcher': [recuser['hrn']],
1506 #'name':record['hrn'],
1509 #'person_ids':[recslice_list[0]['reg_researchers']['record_id']],
1510 #'geni_urn':'', #For client_helper.py compatibility
1511 #'keys':'', #For client_helper.py compatibility
1512 #'key_ids':''}) #For client_helper.py compatibility
1513 logger.debug("SLABDRIVER \tfill_record_info TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id'] %s " %(record['hrn'],record['oar_job_id']))
1515 for rec in recslice_list:
1516 logger.debug("SLABDRIVER\r\n \t \t fill_record_info oar_job_id %s " %(rec['oar_job_id']))
1517 #record['oar_job_id'].append(rec['oar_job_id'])
1518 #del record['_sa_instance_state']
1519 del record['reg_researchers']
1520 record['node_ids'] = [ self.root_auth + hostname for hostname in rec['node_ids']]
1524 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1525 recslice_list %s \r\n \t RECORD %s \r\n \r\n" %(recslice_list,record))
1526 if str(record['type']) == 'user':
1527 #The record is a SFA user record.
1528 #Get the information about his slice from Senslab's DB
1529 #and add it to the user record.
1530 recslice_list = self.GetSlices(\
1531 slice_filter = record['record_id'],\
1532 slice_filter_type = 'record_id_user')
1534 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
1535 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
1536 #Append slice record in records list,
1537 #therefore fetches user and slice info again(one more loop)
1538 #Will update PIs and researcher for the slice
1539 #recuser = dbsession.query(RegRecord).filter_by(record_id = \
1540 #recslice_list[0]['record_id_user']).first()
1541 recuser = recslice_list[0]['reg_researchers']
1542 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
1543 recuser %s \r\n \r\n" %(recuser))
1545 recslice = recslice_list[0]
1546 recslice.update({'PI':[recuser['hrn']],
1547 'researcher': [recuser['hrn']],
1548 'name':record['hrn'],
1551 'person_ids':[recuser['record_id']]})
1553 for rec in recslice_list:
1554 recslice['oar_job_id'].append(rec['oar_job_id'])
1558 recslice.update({'type':'slice', \
1559 'hrn':recslice_list[0]['hrn']})
1562 #GetPersons takes [] as filters
1563 #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
1564 user_slab = self.GetPersons([record])
1567 record.update(user_slab[0])
1568 #For client_helper.py compatibility
1569 record.update( { 'geni_urn':'',
1572 record_list.append(recslice)
1574 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1575 INFO TO USER records %s" %(record_list))
1577 logger.debug("SLABDRIVER.PY \tfill_record_info END \
1578 record %s \r\n \r\n " %(record))
1580 except TypeError, error:
1581 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
1583 #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ")
1587 #self.fill_record_slab_info(records)
1593 #TODO Update membership? update_membership_list SA 05/07/12
1594 #def update_membership_list(self, oldRecord, record, listName, addFunc, \
1596 ## get a list of the HRNs tht are members of the old and new records
1598 #oldList = oldRecord.get(listName, [])
1601 #newList = record.get(listName, [])
1603 ## if the lists are the same, then we don't have to update anything
1604 #if (oldList == newList):
1607 ## build a list of the new person ids, by looking up each person to get
1611 #records = table.find({'type': 'user', 'hrn': newList})
1612 #for rec in records:
1613 #newIdList.append(rec['pointer'])
1615 ## build a list of the old person ids from the person_ids field
1617 #oldIdList = oldRecord.get("person_ids", [])
1618 #containerId = oldRecord.get_pointer()
1620 ## if oldRecord==None, then we are doing a Register, instead of an
1623 #containerId = record.get_pointer()
1625 ## add people who are in the new list, but not the oldList
1626 #for personId in newIdList:
1627 #if not (personId in oldIdList):
1628 #addFunc(self.plauth, personId, containerId)
1630 ## remove people who are in the old list, but not the new list
1631 #for personId in oldIdList:
1632 #if not (personId in newIdList):
1633 #delFunc(self.plauth, personId, containerId)
1635 #def update_membership(self, oldRecord, record):
1637 #if record.type == "slice":
1638 #self.update_membership_list(oldRecord, record, 'researcher',
1639 #self.users.AddPersonToSlice,
1640 #self.users.DeletePersonFromSlice)
1641 #elif record.type == "authority":
1646 # I don't think you plan on running a component manager at this point
1647 # let me clean up the mess of ComponentAPI that is deprecated anyways
1650 #TODO FUNCTIONS SECTION 04/07/2012 SA
1652 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
1654 def UnBindObjectFromPeer(self, auth, object_type, object_id, shortname):
1655 """ This method is a hopefully temporary hack to let the sfa correctly
1656 detach the objects it creates from a remote peer object. This is
1657 needed so that the sfa federation link can work in parallel with
1658 RefreshPeer, as RefreshPeer depends on remote objects being correctly
1661 auth : struct, API authentication structure
1662 AuthMethod : string, Authentication method to use
1663 object_type : string, Object type, among 'site','person','slice',
1665 object_id : int, object_id
1666 shortname : string, peer shortname
1670 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
1674 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
1676 def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
1677 remote_object_id=None):
1678 """This method is a hopefully temporary hack to let the sfa correctly
1679 attach the objects it creates to a remote peer object. This is needed
1680 so that the sfa federation link can work in parallel with RefreshPeer,
1681 as RefreshPeer depends on remote objects being correctly marked.
1683 shortname : string, peer shortname
1684 remote_object_id : int, remote object_id, set to 0 if unknown
1688 logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
1691 #TODO UpdateSlice 04/07/2012 SA
1692 #Funciton should delete and create another job since oin senslab slice=job
1693 def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
1694 """Updates the parameters of an existing slice with the values in
1696 Users may only update slices of which they are members.
1697 PIs may update any of the slices at their sites, or any slices of
1698 which they are members. Admins may update any slice.
1699 Only PIs and admins may update max_nodes. Slices cannot be renewed
1700 (by updating the expires parameter) more than 8 weeks into the future.
1701 Returns 1 if successful, faults otherwise.
1705 logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
1708 #TODO UpdatePerson 04/07/2012 SA
1709 def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
1710 """Updates a person. Only the fields specified in person_fields
1711 are updated, all other fields are left untouched.
1712 Users and techs can only update themselves. PIs can only update
1713 themselves and other non-PIs at their sites.
1714 Returns 1 if successful, faults otherwise.
1718 #new_row = FederatedToSenslab(slab_hrn, federated_hrn)
1719 #slab_dbsession.add(new_row)
1720 #slab_dbsession.commit()
1722 logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
1725 #TODO GetKeys 04/07/2012 SA
1726 def GetKeys(self, auth, key_filter=None, return_fields=None):
1727 """Returns an array of structs containing details about keys.
1728 If key_filter is specified and is an array of key identifiers,
1729 or a struct of key attributes, only keys matching the filter
1730 will be returned. If return_fields is specified, only the
1731 specified details will be returned.
1733 Admin may query all keys. Non-admins may only query their own keys.
1737 logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
1740 #TODO DeleteKey 04/07/2012 SA
1741 def DeleteKey(self, auth, key_id):
1743 Non-admins may only delete their own keys.
1744 Returns 1 if successful, faults otherwise.
1748 logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
1752 #TODO : Check rights to delete person
1753 def DeletePerson(self, auth, person_record):
1754 """ Disable an existing account in senslab LDAP.
1755 Users and techs can only delete themselves. PIs can only
1756 delete themselves and other non-PIs at their sites.
1757 ins can delete anyone.
1758 Returns 1 if successful, faults otherwise.
1762 #Disable user account in senslab LDAP
1763 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
1764 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
1767 #TODO Check DeleteSlice, check rights 05/07/2012 SA
1768 def DeleteSlice(self, auth, slice_record):
1769 """ Deletes the specified slice.
1770 Senslab : Kill the job associated with the slice if there is one
1771 using DeleteSliceFromNodes.
1772 Updates the slice record in slab db to remove the slice nodes.
1774 Users may only delete slices of which they are members. PIs may
1775 delete any of the slices at their sites, or any slices of which
1776 they are members. Admins may delete any slice.
1777 Returns 1 if successful, faults otherwise.
1781 self.DeleteSliceFromNodes(slice_record)
1782 logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
1785 def __add_person_to_db(self, user_dict):
1787 check_if_exists = dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
1788 #user doesn't exists
1789 if not check_if_exists:
1790 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
1791 _________________________________________________________________________\
1792 " %(user_dict['hrn']))
1793 user_record = RegUser(hrn =user_dict['hrn'] , pointer= '-1', authority=get_authority(hrn), \
1794 email= user_dict['email'], gid = None)
1795 user_record.reg_keys = [RegKey(user_dict['pkey'])]
1796 user_record.just_created()
1797 dbsession.add (user_record)
1801 #TODO AddPerson 04/07/2012 SA
1802 #def AddPerson(self, auth, person_fields=None):
1803 def AddPerson(self, record):#TODO fixing 28/08//2012 SA
1804 """Adds a new account. Any fields specified in records are used,
1805 otherwise defaults are used.
1806 Accounts are disabled by default. To enable an account,
1808 Returns the new person_id (> 0) if successful, faults otherwise.
1812 ret = self.ldap.LdapAddUser(record)
1813 logger.debug("SLABDRIVER AddPerson return code %s \r\n "%(ret))
1814 self.__add_person_to_db(record)
1817 #TODO AddPersonToSite 04/07/2012 SA
1818 def AddPersonToSite (self, auth, person_id_or_email, \
1819 site_id_or_login_base=None):
1820 """ Adds the specified person to the specified site. If the person is
1821 already a member of the site, no errors are returned. Does not change
1822 the person's primary site.
1823 Returns 1 if successful, faults otherwise.
1827 logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
1830 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
1831 def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
1832 """Grants the specified role to the person.
1833 PIs can only grant the tech and user roles to users and techs at their
1834 sites. Admins can grant any role to any user.
1835 Returns 1 if successful, faults otherwise.
1839 logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
1842 #TODO AddPersonKey 04/07/2012 SA
1843 def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
1844 """Adds a new key to the specified account.
1845 Non-admins can only modify their own keys.
1846 Returns the new key_id (> 0) if successful, faults otherwise.
1850 logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
1853 def DeleteLeases(self, leases_id_list, slice_hrn ):
1854 logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
1855 \r\n " %(leases_id_list, slice_hrn))
1856 for job_id in leases_id_list:
1857 self.DeleteJobs(job_id, slice_hrn)