4 from datetime import datetime
6 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
7 from sfa.util.sfalogging import logger
8 from sfa.storage.alchemy import dbsession
9 from sfa.storage.model import RegRecord, RegUser, RegSlice
10 from sqlalchemy.orm import joinedload
13 from sfa.managers.driver import Driver
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
17 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
20 ## thierry: everything that is API-related (i.e. handling incoming requests)
22 # SlabDriver should be really only about talking to the senslab testbed
25 from sfa.senslab.OARrestapi import OARrestapi
26 from sfa.senslab.LDAPapi import LDAPapi
28 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
31 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
33 from sfa.senslab.slabslices import SlabSlices
38 # this inheritance scheme is so that the driver object can receive
39 # GetNodes or GetSites sorts of calls directly
40 # and thus minimize the differences in the managers with the pl version
44 class SlabDriver(Driver):
45 """ Senslab Driver class inherited from Driver generic class.
47 Contains methods compliant with the SFA standard and the testbed
48 infrastructure (calls to LDAP and OAR).
50 def __init__(self, config):
51 Driver.__init__ (self, config)
53 self.hrn = config.SFA_INTERFACE_HRN
54 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
55 self.oar = OARrestapi()
57 self.time_format = "%Y-%m-%d %H:%M:%S"
58 self.db = SlabDB(config, debug = False)
59 self.grain = 600 # 10 mins lease
63 def sliver_status(self, slice_urn, slice_hrn):
64 """Receive a status request for slice named urn/hrn
65 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
66 shall return a structure as described in
67 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
68 NT : not sure if we should implement this or not, but used by sface.
72 #First get the slice with the slice hrn
73 slice_list = self.GetSlices(slice_filter = slice_hrn, \
74 slice_filter_type = 'slice_hrn')
76 if len(slice_list) is 0:
77 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
79 #Used for fetching the user info witch comes along the slice info
80 one_slice = slice_list[0]
83 #Make a list of all the nodes hostnames in use for this slice
85 #for single_slice in slice_list:
86 #for node in single_slice['node_ids']:
87 #slice_nodes_list.append(node['hostname'])
88 for node in one_slice:
89 slice_nodes_list.append(node['hostname'])
91 #Get all the corresponding nodes details
92 nodes_all = self.GetNodes({'hostname':slice_nodes_list},
93 ['node_id', 'hostname','site','boot_state'])
94 nodeall_byhostname = dict([(one_node['hostname'], one_node) \
95 for one_node in nodes_all])
99 for single_slice in slice_list:
102 top_level_status = 'empty'
105 ['geni_urn','pl_login','geni_status','geni_resources'], None)
106 result['pl_login'] = one_slice['reg_researchers']['hrn']
107 logger.debug("Slabdriver - sliver_status Sliver status \
108 urn %s hrn %s single_slice %s \r\n " \
109 %(slice_urn, slice_hrn, single_slice))
111 if 'node_ids' not in single_slice:
113 result['geni_status'] = top_level_status
114 result['geni_resources'] = []
117 top_level_status = 'ready'
119 #A job is running on Senslab for this slice
120 # report about the local nodes that are in the slice only
122 result['geni_urn'] = slice_urn
126 #timestamp = float(sl['startTime']) + float(sl['walltime'])
127 #result['pl_expires'] = strftime(self.time_format, \
128 #gmtime(float(timestamp)))
129 #result['slab_expires'] = strftime(self.time_format,\
130 #gmtime(float(timestamp)))
133 for node in single_slice['node_ids']:
135 #res['slab_hostname'] = node['hostname']
136 #res['slab_boot_state'] = node['boot_state']
138 res['pl_hostname'] = node['hostname']
139 res['pl_boot_state'] = \
140 nodeall_byhostname[node['hostname']]['boot_state']
141 #res['pl_last_contact'] = strftime(self.time_format, \
142 #gmtime(float(timestamp)))
143 sliver_id = Xrn(slice_urn, type='slice', \
144 id=nodeall_byhostname[node['hostname']]['node_id'], \
145 authority=self.hrn).urn
147 res['geni_urn'] = sliver_id
148 node_name = node['hostname']
149 if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
151 res['geni_status'] = 'ready'
153 res['geni_status'] = 'failed'
154 top_level_status = 'failed'
156 res['geni_error'] = ''
158 resources.append(res)
160 result['geni_status'] = top_level_status
161 result['geni_resources'] = resources
162 logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
168 #def get_user(self, hrn):
169 return dbsession.query(RegRecord).filter_by(hrn = hrn).first()
172 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
174 aggregate = SlabAggregate(self)
176 slices = SlabSlices(self)
177 peer = slices.get_peer(slice_hrn)
178 sfa_peer = slices.get_sfa_peer(slice_hrn)
181 if not isinstance(creds, list):
185 slice_record = users[0].get('slice_record', {})
186 logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
187 creds %s \r\n \r\n users %s" \
189 slice_record['user'] = {'keys':users[0]['keys'], \
190 'email':users[0]['email'], \
191 'hrn':slice_record['reg-researchers'][0]}
193 rspec = RSpec(rspec_string)
194 logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
195 %s slice_record %s users %s" \
196 %(rspec.version,slice_record, users))
199 # ensure site record exists?
200 # ensure slice record exists
201 #Removed options to verify_slice SA 14/08/12
202 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
205 # ensure person records exists
206 #verify_persons returns added persons but since the return value
208 slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
209 sfa_peer, options=options)
210 #requested_attributes returned by rspec.version.get_slice_attributes()
211 #unused, removed SA 13/08/12
212 rspec.version.get_slice_attributes()
214 logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
216 # add/remove slice from nodes
218 requested_slivers = [node.get('component_id') \
219 for node in rspec.version.get_nodes_with_slivers()\
220 if node.get('authority_id') is self.root_auth]
221 l = [ node for node in rspec.version.get_nodes_with_slivers() ]
222 logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
223 requested_slivers %s listnodes %s" \
224 %(requested_slivers,l))
225 #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
226 #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)
229 requested_lease_list = []
231 logger.debug("SLABDRIVER.PY \tcreate_sliver AVANTLEASE " )
233 for lease in rspec.version.get_leases():
234 single_requested_lease = {}
235 logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
237 if not lease.get('lease_id'):
238 if get_authority(lease['component_id']) == self.root_auth:
239 single_requested_lease['hostname'] = \
240 slab_xrn_to_hostname(\
241 lease.get('component_id').strip())
242 single_requested_lease['start_time'] = \
243 lease.get('start_time')
244 single_requested_lease['duration'] = lease.get('duration')
245 #Check the experiment's duration is valid before adding
246 #the lease to the requested leases list
247 duration_in_seconds = \
248 int(single_requested_lease['duration'])*60
249 if duration_in_seconds > self.GetLeaseGranularity():
250 requested_lease_list.append(single_requested_lease)
252 #Create dict of leases by start_time, regrouping nodes reserved
254 #time, for the same amount of time = one job on OAR
255 requested_job_dict = {}
256 for lease in requested_lease_list:
258 #In case it is an asap experiment start_time is empty
259 if lease['start_time'] == '':
260 lease['start_time'] = '0'
262 if lease['start_time'] not in requested_job_dict:
263 if isinstance(lease['hostname'], str):
264 lease['hostname'] = [lease['hostname']]
266 requested_job_dict[lease['start_time']] = lease
269 job_lease = requested_job_dict[lease['start_time']]
270 if lease['duration'] == job_lease['duration'] :
271 job_lease['hostname'].append(lease['hostname'])
276 logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\
277 %(requested_job_dict))
278 #verify_slice_leases returns the leases , but the return value is unused
279 #here. Removed SA 13/08/12
280 slices.verify_slice_leases(sfa_slice, \
281 requested_job_dict, peer)
283 return aggregate.get_rspec(slice_xrn=slice_urn, login=sfa_slice['login'], version=rspec.version)
286 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
288 sfa_slice_list = self.GetSlices(slice_filter = slice_hrn, \
289 slice_filter_type = 'slice_hrn')
291 if not sfa_slice_list:
294 #Delete all in the slice
295 for sfa_slice in sfa_slice_list:
298 logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
299 slices = SlabSlices(self)
300 # determine if this is a peer slice
302 peer = slices.get_peer(slice_hrn)
303 #TODO delete_sliver SA : UnBindObjectFromPeer should be
304 #used when there is another
305 #senslab testbed, which is not the case 14/08/12 .
307 logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
310 self.UnBindObjectFromPeer('slice', \
311 sfa_slice['record_id_slice'], \
313 self.DeleteSliceFromNodes(sfa_slice)
316 self.BindObjectToPeer('slice', \
317 sfa_slice['record_id_slice'], \
318 peer, sfa_slice['peer_slice_id'])
322 def AddSlice(slice_record, user_record):
323 """Add slice to the sfa tables. Called by verify_slice
324 during lease/sliver creation.
327 sfa_record = RegSlice(hrn=slice_record['slice_hrn'],
328 gid=slice_record['gid'],
329 pointer=slice_record['slice_id'],
330 authority=slice_record['authority'])
332 logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
333 %(sfa_record, user_record))
334 sfa_record.just_created()
335 dbsession.add(sfa_record)
337 #Update the reg-researcher dependance table
338 sfa_record.reg_researchers = [user_record]
341 #Update the senslab table with the new slice
342 #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \
343 #record_id_slice = sfa_record.record_id , \
344 #record_id_user = slice_record['record_id_user'], \
345 #peer_authority = slice_record['peer_authority'])
347 #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \
348 #slab_slice %s sfa_record %s" \
349 #%(slice_record,slab_slice, sfa_record))
350 #slab_dbsession.add(slab_slice)
351 #slab_dbsession.commit()
354 # first 2 args are None in case of resource discovery
355 def list_resources (self, slice_urn, slice_hrn, creds, options):
356 #cached_requested = options.get('cached', True)
358 version_manager = VersionManager()
359 # get the rspec's return format from options
361 version_manager.get_version(options.get('geni_rspec_version'))
362 version_string = "rspec_%s" % (rspec_version)
364 #panos adding the info option to the caching key (can be improved)
365 if options.get('info'):
366 version_string = version_string + "_" + \
367 options.get('info', 'default')
369 # Adding the list_leases option to the caching key
370 if options.get('list_leases'):
371 version_string = version_string + "_"+options.get('list_leases', 'default')
373 # Adding geni_available to caching key
374 if options.get('geni_available'):
375 version_string = version_string + "_" + str(options.get('geni_available'))
377 # look in cache first
378 #if cached_requested and self.cache and not slice_hrn:
379 #rspec = self.cache.get(version_string)
381 #logger.debug("SlabDriver.ListResources: \
382 #returning cached advertisement")
385 #panos: passing user-defined options
386 aggregate = SlabAggregate(self)
387 #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
388 #options.update({'origin_hrn':origin_hrn})
389 rspec = aggregate.get_rspec(slice_xrn=slice_urn, \
390 version=rspec_version, options=options)
393 #if self.cache and not slice_hrn:
394 #logger.debug("Slab.ListResources: stores advertisement in cache")
395 #self.cache.add(version_string, rspec)
400 def list_slices (self, creds, options):
401 # look in cache first
403 #slices = self.cache.get('slices')
405 #logger.debug("PlDriver.list_slices returns from cache")
410 slices = self.GetSlices()
411 logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
412 slice_hrns = [slab_slice['hrn'] for slab_slice in slices]
414 slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
415 for slice_hrn in slice_hrns]
419 #logger.debug ("SlabDriver.list_slices stores value in cache")
420 #self.cache.add('slices', slice_urns)
425 def register (self, sfa_record, hrn, pub_key):
427 Adding new user, slice, node or site should not be handled
431 Adding users = LDAP Senslab
432 Adding slice = Import from LDAP users
438 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
439 """No site or node record update allowed in Senslab."""
441 pointer = old_sfa_record['pointer']
442 old_sfa_record_type = old_sfa_record['type']
444 # new_key implemented for users only
445 if new_key and old_sfa_record_type not in [ 'user' ]:
446 raise UnknownSfaType(old_sfa_record_type)
448 #if (type == "authority"):
449 #self.shell.UpdateSite(pointer, new_sfa_record)
451 if old_sfa_record_type == "slice":
452 slab_record = self.sfa_fields_to_slab_fields(old_sfa_record_type, \
454 if 'name' in slab_record:
455 slab_record.pop('name')
456 #Prototype should be UpdateSlice(self,
457 #auth, slice_id_or_name, slice_fields)
458 #Senslab cannot update slice since slice = job
459 #so we must delete and create another job
460 self.UpdateSlice(pointer, slab_record)
462 elif old_sfa_record_type == "user":
464 all_fields = new_sfa_record
465 for key in all_fields.keys():
466 if key in ['first_name', 'last_name', 'title', 'email',
467 'password', 'phone', 'url', 'bio', 'accepted_aup',
469 update_fields[key] = all_fields[key]
470 self.UpdatePerson(pointer, update_fields)
473 # must check this key against the previous one if it exists
474 persons = self.GetPersons(['key_ids'])
476 keys = person['key_ids']
477 keys = self.GetKeys(person['key_ids'])
479 # Delete all stale keys
482 if new_key != key['key']:
483 self.DeleteKey(key['key_id'])
487 self.AddPersonKey(pointer, {'key_type': 'ssh', \
494 def remove (self, sfa_record):
495 sfa_record_type = sfa_record['type']
496 hrn = sfa_record['hrn']
497 if sfa_record_type == 'user':
499 #get user from senslab ldap
500 person = self.GetPersons(sfa_record)
501 #No registering at a given site in Senslab.
502 #Once registered to the LDAP, all senslab sites are
505 #Mark account as disabled in ldap
506 self.DeletePerson(sfa_record)
507 elif sfa_record_type == 'slice':
508 if self.GetSlices(slice_filter = hrn, \
509 slice_filter_type = 'slice_hrn'):
510 self.DeleteSlice(sfa_record)
512 #elif type == 'authority':
513 #if self.GetSites(pointer):
514 #self.DeleteSite(pointer)
520 #TODO clean GetPeers. 05/07/12SA
522 def GetPeers ( auth = None, peer_filter=None, return_fields_list=None):
524 existing_records = {}
525 existing_hrns_by_types = {}
526 logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
527 return_field %s " %(auth , peer_filter, return_fields_list))
528 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
530 for record in all_records:
531 existing_records[(record.hrn, record.type)] = record
532 if record.type not in existing_hrns_by_types:
533 existing_hrns_by_types[record.type] = [record.hrn]
535 existing_hrns_by_types[record.type].append(record.hrn)
538 logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
539 %( existing_hrns_by_types))
544 records_list.append(existing_records[(peer_filter,'authority')])
546 for hrn in existing_hrns_by_types['authority']:
547 records_list.append(existing_records[(hrn,'authority')])
549 logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
555 return_records = records_list
556 if not peer_filter and not return_fields_list:
560 logger.debug("SLABDRIVER \tGetPeer return_records %s " \
562 return return_records
565 #TODO : Handling OR request in make_ldap_filters_from_records
566 #instead of the for loop
567 #over the records' list
568 def GetPersons(self, person_filter=None):
570 person_filter should be a list of dictionnaries when not set to None.
571 Returns a list of users whose accounts are enabled found in ldap.
574 logger.debug("SLABDRIVER \tGetPersons person_filter %s" \
577 if person_filter and isinstance(person_filter, list):
578 #If we are looking for a list of users (list of dict records)
579 #Usually the list contains only one user record
580 for searched_attributes in person_filter:
582 #Get only enabled user accounts in senslab LDAP :
583 #add a filter for make_ldap_filters_from_record
584 person = self.ldap.LdapFindUser(searched_attributes, \
585 is_user_enabled=True)
586 #If a person was found, append it to the list
588 person_list.append(person)
590 #If the list is empty, return None
591 if len(person_list) is 0:
595 #Get only enabled user accounts in senslab LDAP :
596 #add a filter for make_ldap_filters_from_record
597 person_list = self.ldap.LdapFindUser(is_user_enabled=True)
def GetTimezone(self):
    """Ask OAR for its current time and timezone.

    Sends the "GET_timezone" request through the OAR request parser and
    hands back the two values it yields, in the same order.
    Flagged as unused by SA on 16/11/12.

    :returns: tuple (server_timestamp, server_tz) exactly as produced by
        the OAR request parser.
    """
    oar_answer = self.oar.parser.SendRequest("GET_timezone")
    # The answer must unpack into exactly two items: timestamp, timezone.
    oar_timestamp, oar_timezone = oar_answer
    return oar_timestamp, oar_timezone
609 def DeleteJobs(self, job_id, slice_hrn):
610 if not job_id or job_id is -1:
612 username = slice_hrn.split(".")[-1].rstrip("_slice")
614 reqdict['method'] = "delete"
615 reqdict['strval'] = str(job_id)
618 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
620 logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \
621 username %s" %(job_id, answer, username))
626 ##TODO : Unused GetJobsId ? SA 05/07/12
627 #def GetJobsId(self, job_id, username = None ):
629 #Details about a specific job.
630 #Includes details about submission time, jot type, state, events,
631 #owner, assigned ressources, walltime etc...
635 #node_list_k = 'assigned_network_address'
636 ##Get job info from OAR
637 #job_info = self.oar.parser.SendRequest(req, job_id, username)
639 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
641 #if job_info['state'] == 'Terminated':
642 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
645 #if job_info['state'] == 'Error':
646 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
651 #logger.error("SLABDRIVER \tGetJobsId KeyError")
654 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
656 ##Replaces the previous entry
657 ##"assigned_network_address" / "reserved_resources"
659 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
660 #del job_info[node_list_k]
661 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
665 def GetJobsResources(self, job_id, username = None):
666 #job_resources=['reserved_resources', 'assigned_resources',\
667 #'job_id', 'job_uri', 'assigned_nodes',\
669 #assigned_res = ['resource_id', 'resource_uri']
670 #assigned_n = ['node', 'node_uri']
672 req = "GET_jobs_id_resources"
675 #Get job resources list from OAR
676 node_id_list = self.oar.parser.SendRequest(req, job_id, username)
677 logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
680 self.__get_hostnames_from_oar_node_ids(node_id_list)
683 #Replaces the previous entry "assigned_network_address" /
684 #"reserved_resources"
686 job_info = {'node_ids': hostname_list}
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Return the hostnames of the nodes listed in a job description.

    The full node list is fetched with self.GetNodes() and indexed by
    hostname; every entry of job_info[node_list_name] is then looked up
    in that index and the matching record's hostname is collected.

    :param job_info: dict describing a job; job_info[node_list_name] is
        the sequence of node identifiers to resolve (they are used as
        keys of the hostname index, so presumably hostnames - confirm
        against the caller).
    :param node_list_name: key of job_info holding the node list.
    :returns: list of hostnames, in the order of the job's node list.
        When a key is missing (node_list_name absent from job_info, or a
        node unknown to the testbed) an error is logged and the
        hostnames collected so far are returned.
    """
    # Index the testbed node records by hostname for direct lookups.
    node_dict = dict((node['hostname'], node) for node in self.GetNodes())

    reserved_node_hostname_list = []
    try:
        for reserved_node in job_info[node_list_name]:
            # append() rather than assignment by index: the list starts
            # empty, so indexing into it would raise IndexError.
            reserved_node_hostname_list.append(
                node_dict[reserved_node]['hostname'])

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes "
                     "reserved_node_hostname_list %s",
                     reserved_node_hostname_list)
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )

    return reserved_node_hostname_list
def GetNodesCurrentlyInUse(self):
    """Return OAR's answer to the "GET_running_jobs" request.

    According to the original note, this is the list of all the nodes
    already involved in an OAR job.
    """
    oar_request_parser = self.oar.parser
    return oar_request_parser.SendRequest("GET_running_jobs")
def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
    """Translate a list of OAR node ids into the matching hostnames.

    The full node list is fetched with self.GetNodes() and indexed by
    each record's 'oar_id'; every id of resource_id_list is then mapped
    to the 'hostname' of the corresponding record.

    :param resource_id_list: iterable of OAR node ids. Entries equal to
        the string "Undefined" are skipped, because jobs requested
        "asap" do not have defined resources.
    :returns: list of hostnames, in the order of resource_id_list.
    :raises KeyError: if an id is not the 'oar_id' of any known node.
    """
    # Put the full node list into a dictionary keyed by oar node id.
    oar_id_node_dict = dict((node['oar_id'], node)
                            for node in self.GetNodes())

    # Compare by value: an identity test ("is not") against a string
    # literal only matches when both happen to be the same object.
    return [oar_id_node_dict[resource_id]['hostname']
            for resource_id in resource_id_list
            if resource_id != "Undefined"]
739 def GetReservedNodes(self, username = None):
740 #Get the nodes in use and the reserved nodes
741 reservation_dict_list = \
742 self.oar.parser.SendRequest("GET_reserved_nodes", \
746 for resa in reservation_dict_list:
747 logger.debug ("GetReservedNodes resa %s"%(resa))
748 #dict list of hostnames and their site
749 resa['reserved_nodes'] = \
750 self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
752 #del resa['resource_ids']
753 return reservation_dict_list
755 def GetNodes(self, node_filter_dict = None, return_fields_list = None):
757 node_filter_dict : dictionnary of lists
760 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
761 node_dict_list = node_dict_by_id.values()
762 logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \
763 return_fields_list %s "%(node_filter_dict, return_fields_list))
764 #No filtering needed return the list directly
765 if not (node_filter_dict or return_fields_list):
766 return node_dict_list
768 return_node_list = []
770 for filter_key in node_filter_dict:
772 #Filter the node_dict_list by each value contained in the
773 #list node_filter_dict[filter_key]
774 for value in node_filter_dict[filter_key]:
775 for node in node_dict_list:
776 if node[filter_key] == value:
777 if return_fields_list :
779 for k in return_fields_list:
781 return_node_list.append(tmp)
783 return_node_list.append(node)
785 logger.log_exc("GetNodes KeyError")
789 return return_node_list
792 def GetSites(self, site_filter_name_list = None, return_fields_list = None):
793 site_dict = self.oar.parser.SendRequest("GET_sites")
794 #site_dict : dict where the key is the sit ename
795 return_site_list = []
796 if not ( site_filter_name_list or return_fields_list):
797 return_site_list = site_dict.values()
798 return return_site_list
800 for site_filter_name in site_filter_name_list:
801 if site_filter_name in site_dict:
802 if return_fields_list:
803 for field in return_fields_list:
806 tmp[field] = site_dict[site_filter_name][field]
808 logger.error("GetSites KeyError %s "%(field))
810 return_site_list.append(tmp)
812 return_site_list.append( site_dict[site_filter_name])
815 return return_site_list
818 def _sql_get_slice_info( slice_filter ):
819 #DO NOT USE RegSlice - reg_researchers to get the hrn
820 #of the user otherwise will mess up the RegRecord in
821 #Resolve, don't know why - SA 08/08/2012
823 #Only one entry for one user = one slice in slab_xp table
824 #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
825 raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
826 #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
829 #raw_slicerec.reg_researchers
830 raw_slicerec = raw_slicerec.__dict__
831 logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s \
832 raw_slicerec %s"%(slice_filter, raw_slicerec))
833 slicerec = raw_slicerec
834 #only one researcher per slice so take the first one
835 #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
836 #del slicerec['reg_researchers']['_sa_instance_state']
843 def _sql_get_slice_info_from_user(slice_filter ):
844 #slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
845 raw_slicerec = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(record_id = slice_filter).first()
846 #raw_slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
847 #Put it in correct order
848 user_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'email', 'pointer']
849 slice_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'pointer']
851 #raw_slicerec.reg_slices_as_researcher
852 raw_slicerec = raw_slicerec.__dict__
855 dict([(k, raw_slicerec['reg_slices_as_researcher'][0].__dict__[k]) \
856 for k in slice_needed_fields])
857 slicerec['reg_researchers'] = dict([(k, raw_slicerec[k]) \
858 for k in user_needed_fields])
859 #TODO Handle multiple slices for one user SA 10/12/12
860 #for now only take the first slice record associated to the rec user
861 ##slicerec = raw_slicerec['reg_slices_as_researcher'][0].__dict__
862 #del raw_slicerec['reg_slices_as_researcher']
863 #slicerec['reg_researchers'] = raw_slicerec
864 ##del slicerec['_sa_instance_state']
871 def _get_slice_records(self, slice_filter = None, \
872 slice_filter_type = None):
876 #Get list of slices based on the slice hrn
877 if slice_filter_type == 'slice_hrn':
879 #if get_authority(slice_filter) == self.root_auth:
880 #login = slice_filter.split(".")[1].split("_")[0]
882 slicerec = self._sql_get_slice_info(slice_filter)
888 #Get slice based on user id
889 if slice_filter_type == 'record_id_user':
891 slicerec = self._sql_get_slice_info_from_user(slice_filter)
894 fixed_slicerec_dict = slicerec
895 #At this point if the there is no login it means
896 #record_id_user filter has been used for filtering
898 ##If theslice record is from senslab
899 #if fixed_slicerec_dict['peer_authority'] is None:
900 #login = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
901 #return login, fixed_slicerec_dict
902 return fixed_slicerec_dict
904 def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None):
905 """ Get the slice records from the slab db.
906 Returns a slice ditc if slice_filter and slice_filter_type
908 Returns a list of slice dictionnaries if there are no filters
913 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
914 return_slicerec_dictlist = []
916 #First try to get information on the slice based on the filter provided
917 if slice_filter_type in authorized_filter_types_list:
918 fixed_slicerec_dict = \
919 self._get_slice_records(slice_filter, slice_filter_type)
920 #login, fixed_slicerec_dict = \
921 #self._get_slice_records(slice_filter, slice_filter_type)
922 logger.debug(" SLABDRIVER \tGetSlices login %s \
923 slice record %s slice_filter %s slice_filter_type %s "\
924 %(login, fixed_slicerec_dict,slice_filter, slice_filter_type))
927 #Now we have the slice record fixed_slicerec_dict, get the
928 #jobs associated to this slice
929 #leases_list = self.GetReservedNodes(username = login)
930 leases_list = self.GetLeases(login = login)
931 #If no job is running or no job scheduled
932 #return only the slice record
933 if leases_list == [] and fixed_slicerec_dict:
934 return_slicerec_dictlist.append(fixed_slicerec_dict)
936 #If several jobs for one slice , put the slice record into
937 # each lease information dict
938 for lease in leases_list :
941 reserved_list = lease['reserved_nodes']
943 slicerec_dict['oar_job_id'] = lease['lease_id']
944 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
945 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
947 #Update lease dict with the slice record
948 if fixed_slicerec_dict:
949 fixed_slicerec_dict['oar_job_id'] = []
950 fixed_slicerec_dict['oar_job_id'].append(slicerec_dict['oar_job_id'])
951 slicerec_dict.update(fixed_slicerec_dict)
952 #slicerec_dict.update({'hrn':\
953 #str(fixed_slicerec_dict['slice_hrn'])})
956 return_slicerec_dictlist.append(slicerec_dict)
957 logger.debug("SLABDRIVER.PY \tGetSlices \
958 slicerec_dict %s return_slicerec_dictlist %s \
959 lease['reserved_nodes'] \
960 %s" %(slicerec_dict, return_slicerec_dictlist, \
961 lease['reserved_nodes'] ))
963 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
964 return_slicerec_dictlist %s" \
965 %(return_slicerec_dictlist))
967 return return_slicerec_dictlist
971 #Get all slices from the senslab sfa database ,
972 #put them in dict format
973 #query_slice_list = dbsession.query(RegRecord).all()
974 query_slice_list = dbsession.query(RegSlice).options(joinedload('reg_researchers')).all()
975 #query_slice_list = dbsession.query(RegRecord).filter_by(type='slice').all()
976 #query_slice_list = slab_dbsession.query(SenslabXP).all()
977 return_slicerec_dictlist = []
978 for record in query_slice_list:
979 tmp = record.__dict__
980 tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
981 #del tmp['reg_researchers']['_sa_instance_state']
982 return_slicerec_dictlist.append(tmp)
983 #return_slicerec_dictlist.append(record.__dict__)
985 #Get all the jobs reserved nodes
986 leases_list = self.GetReservedNodes()
989 for fixed_slicerec_dict in return_slicerec_dictlist:
991 #Check if the slice belongs to a senslab user
992 if fixed_slicerec_dict['peer_authority'] is None:
993 owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
996 for lease in leases_list:
997 if owner == lease['user']:
998 slicerec_dict['oar_job_id'] = lease['lease_id']
1000 #for reserved_node in lease['reserved_nodes']:
1001 logger.debug("SLABDRIVER.PY \tGetSlices lease %s "\
1004 reserved_list = lease['reserved_nodes']
1006 slicerec_dict.update({'node_ids':lease['reserved_nodes']})
1007 slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
1008 slicerec_dict.update(fixed_slicerec_dict)
1009 #slicerec_dict.update({'hrn':\
1010 #str(fixed_slicerec_dict['slice_hrn'])})
1011 #return_slicerec_dictlist.append(slicerec_dict)
1012 fixed_slicerec_dict.update(slicerec_dict)
1014 logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
1015 return_slicerec_dictlist %s \slice_filter %s " \
1016 %(return_slicerec_dictlist, slice_filter))
1018 return return_slicerec_dictlist
1021 def testbed_name (self):
1024 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """Describe this aggregate: testbed name and supported RSpec versions.

    Every version known to the VersionManager is sorted by content type:
    'ad' versions go to the advertisement list, 'request' versions to
    the request list, and '*' versions to both.

    :returns: dict with the keys 'testbed',
        'geni_request_rspec_versions' and 'geni_ad_rspec_versions'.
    """
    known_versions = list(VersionManager().versions)

    advertised = [version.to_dict() for version in known_versions
                  if version.content_type in ['*', 'ad']]
    requestable = [version.to_dict() for version in known_versions
                   if version.content_type in ['*', 'request']]

    description = {}
    description['testbed'] = self.testbed_name()
    description['geni_request_rspec_versions'] = requestable
    description['geni_ad_rspec_versions'] = advertised
    return description
# Convert SFA fields to PLC fields for use when registering or updating
# a registry record in the PLC database
1046 # @param type type of record (user, slice, ...)
1047 # @param hrn human readable name
1048 # @param sfa_fields dictionary of SFA fields
1049 # @param slab_fields dictionary of PLC fields (output)
def sfa_fields_to_slab_fields(sfa_type, hrn, record):
    """Build the senslab-side record matching an SFA registry record.

    Only slice records are translated; any other sfa_type yields an empty
    dict (nodes are added by OAR only and then imported to SFA, and
    authority records are not handled).

    NOTE(review): the function takes no self although it sits among the
    driver methods -- presumably it is declared static on a line outside
    this block; confirm before moving it.

    :param sfa_type: type of the record ('slice', 'user', ...)
    :param hrn: human readable name, stored unchanged as the record 'hrn'
        (the senslab hrn is already in the appropriate form)
    :param record: dict of SFA fields; 'url', 'description' and 'expires'
        are copied when present, 'expires' being converted to int
    :returns: dict of senslab fields
    """
    slab_record = {}
    if sfa_type != "slice":
        return slab_record

    # The record starts out empty, so the instantiation default always applies.
    slab_record["instantiation"] = "senslab-instantiated"
    slab_record["hrn"] = hrn
    logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
                    slab_record %s " %(slab_record['hrn']))

    for optional_field in ("url", "description"):
        if optional_field in record:
            slab_record[optional_field] = record[optional_field]
    if "expires" in record:
        slab_record["expires"] = int(record["expires"])

    return slab_record
def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
    """Turn a unix timestamp into a date string in self.time_format.

    Only scheduled (not immediate) experiments carry a timestamp: when no
    date and time are given in the RSpec the timestamp is None and nothing
    is converted, so the method yields None.

    :param xp_utc_timestamp: unix timestamp (int or numeric string), or a
        false value for an immediate experiment
    :returns: the formatted date string, or None without a timestamp
    """
    if xp_utc_timestamp:
        # fromtimestamp() renders the instant in the server's local time.
        moment = datetime.fromtimestamp(int(xp_utc_timestamp))
        return moment.strftime(self.time_format)
def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
                    lease_start_time, lease_duration, slice_user=None):
    """Reserve added_nodes through OAR and start the senslab experiment.

    Three steps: build the job request and POST it to OAR, write the
    reserved node ids to /tmp/sfa/<jobid>.json, then run the senslab
    experiment wrapper (a jar) for that job.

    :param added_nodes: list of node hostnames to reserve
    :param slice_name: name of the slice the job is created for
    :param lease_start_time: unix timestamp of the start; the string '0'
        requests an immediate experiment
    :param lease_duration: duration in lease grains; multiplied by
        GetLeaseGranularity() to obtain seconds
    :param slice_user: login the job is posted as, also used to build the
        OAR job name "SFA_<slice_user>"
    :returns: the OAR job id, or None when OAR's answer holds no 'id'
    :raises ValueError: when the resulting duration is null
    """
    lease_dict = {}
    lease_dict['lease_start_time'] = lease_start_time
    lease_dict['lease_duration'] = lease_duration
    lease_dict['added_nodes'] = added_nodes
    lease_dict['slice_name'] = slice_name
    lease_dict['slice_user'] = slice_user
    lease_dict['grain'] = self.GetLeaseGranularity()
    lease_dict['time_format'] = self.time_format

    def _process_walltime(duration):
        """Return ([hours, minutes, seconds] as strings, sleep seconds)."""
        if not duration:
            # Previously this fell through to an unbound-variable error;
            # fail with an explicit message instead.
            logger.log_exc(" __process_walltime duration null")
            raise ValueError("lease duration must not be null, got %r"
                             % (duration,))
        # The OAR walltime is the requested time plus 240 s (4 min) to
        # take prologue and epilogue script execution into account; the
        # /bin/sleep command itself gets no additional delay.
        total_walltime = duration + 240
        hours, remainder = divmod(total_walltime, 3600)
        minutes, seconds = divmod(remainder, 60)
        return [str(hours), str(minutes), str(seconds)], duration

    def _create_job_structure_request_for_OAR(lease_dict):
        """Create the structure needed for a correct POST on OAR.

        Builds the resource selection, the walltime and, for a scheduled
        experiment, the reservation date in the format OAR accepts.
        """
        nodeid_list = []
        for node in lease_dict['added_nodes']:
            logger.debug("\r\n \r\n OARrestapi \t \
            __create_job_structure_request_for_OAR node %s" %(node))
            # NOTE(review): the node name is used directly as the OAR
            # network address -- confirm against the OAR setup.
            nodeid_list.append(node)

        reqdict = {}
        reqdict['workdir'] = '/tmp'
        # Joining avoids cutting into the prefix when the list is empty.
        quoted_nodes = ", ".join(["'" + nodeid + "'" for nodeid in nodeid_list])
        reqdict['resource'] = "{network_address in (" + quoted_nodes + \
                                    ")}/nodes=" + str(len(nodeid_list))

        walltime, sleep_walltime = _process_walltime( \
                int(lease_dict['lease_duration']) * lease_dict['grain'])

        reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
                            ":" + str(walltime[1]) + ":" + str(walltime[2])
        reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)

        # Scheduled experiment: pass the start date to OAR. An immediate
        # experiment ('0') needs no special OAR parameter. Compare by value:
        # identity against a string literal is implementation-dependent.
        if lease_dict['lease_start_time'] != '0':
            start_time = datetime.fromtimestamp( \
                int(lease_dict['lease_start_time'])).strftime( \
                lease_dict['time_format'])
            reqdict['reservation'] = start_time

        reqdict['type'] = "deploy"
        reqdict['directory'] = ""
        reqdict['name'] = "SFA_" + lease_dict['slice_user']
        return reqdict

    def _configure_experiment(jobid, added_nodes):
        """Store the reserved node ids in /tmp/sfa/<jobid>.json.

        The file holds a bracketed list such as [1, 56, 23].
        """
        tmp_dir = '/tmp/sfa/'
        if not os.path.exists(tmp_dir):
            os.makedirs(tmp_dir)
        # NOTE(review): strip('node') removes any of the characters
        # n, o, d, e from both ends, not just a 'node' prefix.
        node_ids = [node.strip('node') for node in added_nodes]
        # The context manager closes the file even if the write fails.
        with open(tmp_dir + str(jobid) + '.json', 'w') as job_file:
            job_file.write('[' + ', '.join(node_ids) + ']')

    def _launch_senslab_experiment(jobid):
        """Call the senslab-experiment wrapper jar for this job."""
        javacmdline = "/usr/bin/java"
        jarname = \
            "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
        output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
                            slice_user],stdout=subprocess.PIPE).communicate()[0]
        logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
                                                                %(output))

    logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
                             \r\n " %(slice_user))
    # First step: create the request, start the OAR job.
    reqdict = _create_job_structure_request_for_OAR(lease_dict)
    logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
                             \r\n " %(reqdict))

    answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
                                                reqdict, slice_user)
    logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
    try:
        jobid = answer['id']
    except KeyError:
        logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
                            Impossible to create job %s " %(answer))
        return None

    if jobid:
        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
                added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
        # Second and third steps: configure, then launch the experiment.
        _configure_experiment(jobid, added_nodes)
        _launch_senslab_experiment(jobid)

    return jobid
def AddLeases(self, hostname_list, slice_record, \
                                lease_start_time, lease_duration):
    """Launch an OAR job for the slice and record it in the senslab DB.

    The job is posted with the slice's 'login' as user; a SenslabXP row
    (slice hrn, job id, end time) is then added and committed. SQL logging
    of the 'sqlalchemy.engine' logger is switched to DEBUG on the way.

    :param hostname_list: hostnames of the nodes to reserve
    :param slice_record: slice record; its 'login' and 'hrn' are read
    :param lease_start_time: unix timestamp of the lease start
    :param lease_duration: lease duration as handed to
        LaunchExperimentOnOAR
    """
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
            slice_record %s lease_start_time %s lease_duration %s "\
             %( hostname_list, slice_record , lease_start_time, \
             lease_duration))

    username = slice_record['login']
    slice_hrn = slice_record['hrn']
    job_id = self.LaunchExperimentOnOAR(hostname_list, slice_hrn, \
                            lease_start_time, lease_duration, username)
    lease_start = datetime.fromtimestamp(int(lease_start_time))
    start_time = lease_start.strftime(self.time_format)
    end_time = lease_start_time + lease_duration

    import logging
    import logging.handlers
    from sfa.util.sfalogging import _SfaLogger
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_hrn, job_id, end_time))
    # Instantiated for its effect on the sqlalchemy engine logger.
    sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG)
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_hrn), type(job_id), type(end_time)))

    slab_ex_row = SenslabXP(slice_hrn = slice_hrn, \
            job_id = job_id, end_time= end_time)
    logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" \
            %(slab_ex_row))
    slab_dbsession.add(slab_ex_row)
    slab_dbsession.commit()

    logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
1318 #Delete the jobs from job_senslab table
def DeleteSliceFromNodes(self, slice_record):
    """Delete every OAR job attached to the slice.

    Calls DeleteJobs once per id found in slice_record['oar_job_id'],
    passing the slice hrn along with each job id.
    """
    for oar_job in slice_record['oar_job_id']:
        owner_hrn = slice_record['hrn']
        self.DeleteJobs(oar_job, owner_hrn)
1325 def GetLeaseGranularity(self):
1326 """ Returns the granularity of Senslab testbed.
1327 OAR returns seconds for experiments duration.
1329 Experiments which last less than 10 min are invalid"""
def update_jobs_in_slabdb( job_oar_list, jobs_psql):
    """Remove from the senslab DB the jobs OAR no longer knows about.

    Any job id present in jobs_psql but absent from job_oar_list has its
    SenslabXP row deleted, and the deletion is committed.

    NOTE(review): no self parameter -- presumably declared static on a
    line outside this block; confirm.

    :param job_oar_list: job ids currently known to OAR
    :param jobs_psql: job ids currently stored in the senslab DB
    """
    jobs_psql = set(jobs_psql)
    kept_jobs = set(job_oar_list).intersection(jobs_psql)
    logger.debug ( "\r\n \t\\ update_jobs_in_slabdb jobs_psql %s \r\n \t \
        job_oar_list %s kept_jobs %s "%(jobs_psql, job_oar_list, kept_jobs))
    stale_jobs = list(jobs_psql.difference(kept_jobs))
    if stale_jobs:
        stale_rows = slab_dbsession.query(SenslabXP).filter( \
                                    SenslabXP.job_id.in_(stale_jobs))
        stale_rows.delete(synchronize_session='fetch')
        slab_dbsession.commit()
def GetLeases(self, lease_filter_dict=None, login=None):
    """List the OAR reservations, annotated with slice information.

    Each reservation returned by GetReservedNodes(login) is completed
    with 'slice_id', 'hrn' and 'component_id_list' (and 'slice_hrn' when
    the job is recorded in the senslab DB). A job unknown to the senslab
    DB is assumed to belong to the senslab slice "<root_auth>.<user>_slice".
    Finally the senslab DB is purged of jobs OAR no longer reports.

    :param lease_filter_dict: optional filter; when it is given and
        non-empty only the reservations whose slice hrn equals
        lease_filter_dict['name'] are returned. None or an empty dict
        returns every reservation.
    :param login: passed through to GetReservedNodes
    :returns: list of reservation dicts
    """
    unfiltered_reservation_list = self.GetReservedNodes(login)

    reservation_list = []
    logger.debug(" SLABDRIVER.PY \tGetLeases login %s\
        unfiltered_reservation_list %s " %(login, unfiltered_reservation_list))
    resa_user_dict = {}
    job_oar_list = []

    # Index the senslab DB rows by job id for the lookups below.
    jobs_psql_query = slab_dbsession.query(SenslabXP).all()
    jobs_psql_dict = dict((row.job_id, row.__dict__) \
                                        for row in jobs_psql_query)
    logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
                                        %(jobs_psql_dict))
    jobs_psql_id_list = [row.job_id for row in jobs_psql_query]

    for resa in unfiltered_reservation_list:
        logger.debug("SLABDRIVER \tGetLeases USER %s"\
                                        %(resa['user']))
        # Construct the list of jobs (running, waiting...) known to OAR.
        job_oar_list.append(resa['lease_id'])
        if resa['lease_id'] in jobs_psql_dict:
            # The senslab DB knows which slice this job was created for.
            job_info = jobs_psql_dict[resa['lease_id']]
            logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
                                        %(resa_user_dict))
            resa['slice_hrn'] = job_info['slice_hrn']
            resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
        else:
            # Assume it is a senslab slice.
            resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice')

        resa['component_id_list'] = []
        resa['hrn'] = Xrn(resa['slice_id']).get_hrn()
        # Transform the hostnames into urns (component ids).
        for node in resa['reserved_nodes']:
            slab_xrn = slab_xrn_object(self.root_auth, node)
            resa['component_id_list'].append(slab_xrn.urn)

        if lease_filter_dict:
            logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\
                                    %(resa,lease_filter_dict))
            if lease_filter_dict['name'] == resa['hrn']:
                reservation_list.append(resa)

    # An empty filter dict is treated like no filter at all; it used to
    # yield an empty result because only None was recognised.
    if not lease_filter_dict:
        reservation_list = unfiltered_reservation_list

    self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)

    logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
                                                %(reservation_list))
    return reservation_list
def augment_records_with_testbed_info (self, sfa_records):
    """Complete SFA records with testbed data; delegates to fill_record_info."""
    augmented_records = self.fill_record_info(sfa_records)
    return augmented_records
1481 def fill_record_info(self, record_list):
1483 Given a SFA record, fill in the senslab specific and SFA specific
1484 fields in the record.
1487 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
1488 if not isinstance(record_list, list):
1489 record_list = [record_list]
1492 for record in record_list:
1493 #If the record is a SFA slice record, then add information
1494 #about the user of this slice. This kind of
1495 #information is in the Senslab's DB.
1496 if str(record['type']) == 'slice':
1497 if 'reg_researchers' in record and \
1498 isinstance(record['reg_researchers'], list) :
1499 record['reg_researchers'] = record['reg_researchers'][0].__dict__
1500 record.update({'PI':[record['reg_researchers']['hrn']],
1501 'researcher': [record['reg_researchers']['hrn']],
1502 'name':record['hrn'],
1505 'person_ids':[record['reg_researchers']['record_id']],
1506 'geni_urn':'', #For client_helper.py compatibility
1507 'keys':'', #For client_helper.py compatibility
1508 'key_ids':''}) #For client_helper.py compatibility
1511 #Get slab slice record.
1512 recslice_list = self.GetSlices(slice_filter = \
1513 str(record['hrn']),\
1514 slice_filter_type = 'slice_hrn')
1516 #recuser = recslice_list[0]['reg_researchers']
1517 ##recuser = dbsession.query(RegRecord).filter_by(record_id = \
1518 ##recslice_list[0]['record_id_user']).first()
1520 #record.update({'PI':[recuser['hrn']],
1521 #'researcher': [recuser['hrn']],
1522 #'name':record['hrn'],
1525 #'person_ids':[recslice_list[0]['reg_researchers']['record_id']],
1526 #'geni_urn':'', #For client_helper.py compatibility
1527 #'keys':'', #For client_helper.py compatibility
1528 #'key_ids':''}) #For client_helper.py compatibility
1529 logger.debug("SLABDRIVER \tfill_record_info \
1530 TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id']\
1531 %s " %(record['hrn'], record['oar_job_id']))
1533 for rec in recslice_list:
1534 logger.debug("SLABDRIVER\r\n \t \t fill_record_info oar_job_id %s " %(rec['oar_job_id']))
1535 #record['oar_job_id'].append(rec['oar_job_id'])
1536 #del record['_sa_instance_state']
1537 del record['reg_researchers']
1538 record['node_ids'] = [ self.root_auth + hostname for hostname in rec['node_ids']]
1542 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1543 recslice_list %s \r\n \t RECORD %s \r\n \
1544 \r\n" %(recslice_list, record))
1545 if str(record['type']) == 'user':
1546 #The record is a SFA user record.
1547 #Get the information about his slice from Senslab's DB
1548 #and add it to the user record.
1549 recslice_list = self.GetSlices(\
1550 slice_filter = record['record_id'],\
1551 slice_filter_type = 'record_id_user')
1553 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
1554 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
1555 #Append slice record in records list,
1556 #therefore fetches user and slice info again(one more loop)
1557 #Will update PIs and researcher for the slice
1558 #recuser = dbsession.query(RegRecord).filter_by(record_id = \
1559 #recslice_list[0]['record_id_user']).first()
1560 recuser = recslice_list[0]['reg_researchers']
1561 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
1562 recuser %s \r\n \r\n" %(recuser))
1564 recslice = recslice_list[0]
1565 recslice.update({'PI':[recuser['hrn']],
1566 'researcher': [recuser['hrn']],
1567 'name':record['hrn'],
1570 'person_ids':[recuser['record_id']]})
1572 for rec in recslice_list:
1573 recslice['oar_job_id'].append(rec['oar_job_id'])
1577 recslice.update({'type':'slice', \
1578 'hrn':recslice_list[0]['hrn']})
1581 #GetPersons takes [] as filters
1582 #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
1583 user_slab = self.GetPersons([record])
1586 record.update(user_slab[0])
1587 #For client_helper.py compatibility
1588 record.update( { 'geni_urn':'',
1591 record_list.append(recslice)
1593 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1594 INFO TO USER records %s" %(record_list))
1596 logger.debug("SLABDRIVER.PY \tfill_record_info END \
1597 record %s \r\n \r\n " %(record))
1599 except TypeError, error:
1600 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
1602 #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ")
1606 #self.fill_record_slab_info(records)
1612 #TODO Update membership? update_membership_list SA 05/07/12
1613 #def update_membership_list(self, oldRecord, record, listName, addFunc, \
1615 ## get a list of the HRNs tht are members of the old and new records
1617 #oldList = oldRecord.get(listName, [])
1620 #newList = record.get(listName, [])
1622 ## if the lists are the same, then we don't have to update anything
1623 #if (oldList == newList):
1626 ## build a list of the new person ids, by looking up each person to get
1630 #records = table.find({'type': 'user', 'hrn': newList})
1631 #for rec in records:
1632 #newIdList.append(rec['pointer'])
1634 ## build a list of the old person ids from the person_ids field
1636 #oldIdList = oldRecord.get("person_ids", [])
1637 #containerId = oldRecord.get_pointer()
1639 ## if oldRecord==None, then we are doing a Register, instead of an
1642 #containerId = record.get_pointer()
1644 ## add people who are in the new list, but not the oldList
1645 #for personId in newIdList:
1646 #if not (personId in oldIdList):
1647 #addFunc(self.plauth, personId, containerId)
1649 ## remove people who are in the old list, but not the new list
1650 #for personId in oldIdList:
1651 #if not (personId in newIdList):
1652 #delFunc(self.plauth, personId, containerId)
1654 #def update_membership(self, oldRecord, record):
1656 #if record.type == "slice":
1657 #self.update_membership_list(oldRecord, record, 'researcher',
1658 #self.users.AddPersonToSlice,
1659 #self.users.DeletePersonFromSlice)
1660 #elif record.type == "authority":
1665 # I don't think you plan on running a component manager at this point
1666 # let me clean up the mess of ComponentAPI that is deprecated anyways
1669 #TODO FUNCTIONS SECTION 04/07/2012 SA
1671 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
def UnBindObjectFromPeer( auth, object_type, object_id, shortname):
    """Placeholder for detaching an object from a remote peer object.

    Meant as a (hopefully temporary) hack so that the sfa federation link
    can work in parallel with RefreshPeer, which depends on remote objects
    being correctly marked. Currently it only logs a warning.

    NOTE(review): no self parameter -- presumably declared static on a
    line outside this block; confirm.

    :param auth: struct, API authentication structure
    :param object_type: string, object type ('site', 'person', 'slice', ...)
    :param object_id: int, object id
    :param shortname: string, peer shortname
    """
    logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
                    DO NOTHING \r\n ")
1694 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
                                                remote_object_id=None):
    """Placeholder for attaching an object to a remote peer object.

    Meant as a (hopefully temporary) hack so that the sfa federation link
    can work in parallel with RefreshPeer, which depends on remote objects
    being correctly marked. Currently it only logs a warning.

    :param auth: API authentication structure
    :param object_type: string, object type
    :param object_id: int, object id
    :param shortname: string, peer shortname
    :param remote_object_id: int, remote object id, 0 if unknown
    """
    logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
1711 #TODO UpdateSlice 04/07/2012 SA
#Function should delete and create another job since in senslab slice=job
def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
    """Placeholder for updating the parameters of an existing slice.

    The PLC API this mirrors lets users update slices they are members
    of, PIs any slice at their sites, and admins any slice; only PIs and
    admins may update max_nodes, and slices cannot be renewed more than
    8 weeks into the future. None of this is implemented here: the method
    only logs a warning.

    :param auth: API authentication structure
    :param slice_id_or_name: slice to update
    :param slice_fields: new values for the slice parameters
    """
    logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
1728 #TODO UpdatePerson 04/07/2012 SA
def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
    """Placeholder for updating a person.

    In the PLC API this mirrors, only the fields given in person_fields
    are updated; users and techs can only update themselves, PIs
    themselves and other non-PIs at their sites. Nothing is updated here:
    the method only logs a debug message.

    :param slab_hrn: senslab hrn of the person
    :param federated_hrn: federated hrn of the person
    :param person_fields: fields to update
    """
    logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
1745 #TODO GetKeys 04/07/2012 SA
def GetKeys(self, auth, key_filter=None, return_fields=None):
    """Placeholder for listing key details.

    In the PLC API this mirrors, key_filter (an array of key identifiers
    or a struct of key attributes) restricts the keys returned and
    return_fields restricts the details; admins may query all keys,
    non-admins only their own. Nothing is looked up here: the method only
    logs a warning.

    :param auth: API authentication structure
    :param key_filter: optional key filter
    :param return_fields: optional list of details to return
    """
    logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
1760 #TODO DeleteKey 04/07/2012 SA
def DeleteKey(self, key_id):
    """Placeholder for deleting a key.

    In the PLC API this mirrors, non-admins may only delete their own
    keys. Nothing is deleted here: the method only logs a warning.

    :param key_id: identifier of the key to delete
    """
    logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
1772 #TODO : Check rights to delete person
def DeletePerson(self, person_record):
    """Disable an existing account in the senslab LDAP.

    The account is marked as deleted through
    self.ldap.LdapMarkUserAsDeleted and the record is logged. The access
    rules of the PLC API (users and techs delete themselves, PIs other
    non-PIs at their sites, admins anyone) are not checked in this method.

    :param person_record: record of the person to disable
    :returns: whatever LdapMarkUserAsDeleted returns
    """
    ret = self.ldap.LdapMarkUserAsDeleted(person_record)
    logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
    return ret
1787 #TODO Check DeleteSlice, check rights 05/07/2012 SA
def DeleteSlice(self, slice_record):
    """Delete the specified slice.

    Senslab: kills the jobs associated with the slice, if any, through
    DeleteSliceFromNodes, then logs the record. The access rules of the
    PLC API (members, PIs, admins) are not checked in this method.

    :param slice_record: slice record; its 'oar_job_id' and 'hrn' are
        read by DeleteSliceFromNodes
    """
    self.DeleteSliceFromNodes(slice_record)
    logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
def __add_person_to_db(self, user_dict):
    """Register the user in the SFA registry DB unless already present.

    A user is looked up by e-mail; when none exists a RegUser record is
    created with the user's hrn, authority, e-mail and public key, then
    added to the session and committed.

    :param user_dict: dict with the keys 'hrn', 'email' and 'pkey'
    """
    # Imported locally: the import line visible at the top of this file
    # only brings in RegRecord, RegUser and RegSlice from this module.
    from sfa.storage.model import RegKey

    check_if_exists = dbsession.query(RegUser).filter_by( \
                                    email = user_dict['email']).first()
    # The user does not exist yet.
    if not check_if_exists:
        logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
        _________________________________________________________________________\
        " %(user_dict['hrn']))
        # The authority is derived from the user's own hrn; the bare name
        # 'hrn' used here before is not defined in this scope.
        user_record = RegUser(hrn = user_dict['hrn'], pointer = '-1', \
                            authority = get_authority(user_dict['hrn']), \
                            email = user_dict['email'], gid = None)
        user_record.reg_keys = [RegKey(user_dict['pkey'])]
        user_record.just_created()
        dbsession.add (user_record)
        # Persist the new record (same add-then-commit sequence AddLeases
        # uses on the senslab session).
        dbsession.commit()
1821 #TODO AddPerson 04/07/2012 SA
1822 #def AddPerson(self, auth, person_fields=None):
def AddPerson(self, record):#TODO fixing 28/08//2012 SA
    """Add a new account to the senslab LDAP and to the registry DB.

    The fields given in record are handed to self.ldap.LdapAddUser, the
    return code is logged, and the user is registered in the DB through
    __add_person_to_db.

    NOTE(review): the statement returning the result is not visible in
    this excerpt; 'ret' is kept under that name for it -- confirm what
    the caller expects (the PLC API doc speaks of the new person_id).

    :param record: dict describing the person to add
    """
    ret = self.ldap.LdapAddUser(record)
    logger.debug("SLABDRIVER AddPerson return code %s \r\n "%(ret))
    self.__add_person_to_db(record)
1837 #TODO AddPersonToSite 04/07/2012 SA
def AddPersonToSite (self, auth, person_id_or_email, \
                                            site_id_or_login_base=None):
    """Placeholder for adding a person to a site.

    In the PLC API this mirrors, adding a person who is already a member
    is not an error and the person's primary site is left unchanged.
    Nothing is done here: the method only logs a warning.

    :param auth: API authentication structure
    :param person_id_or_email: person to add
    :param site_id_or_login_base: site to add the person to
    """
    logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
1850 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
    """Placeholder for granting a role to a person.

    In the PLC API this mirrors, PIs can only grant the tech and user
    roles to users and techs at their sites, admins any role to any user.
    Nothing is granted here: the method only logs a warning.

    :param auth: API authentication structure
    :param role_id_or_name: role to grant
    :param person_id_or_email: person receiving the role
    """
    logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
1862 #TODO AddPersonKey 04/07/2012 SA
def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
    """Placeholder for adding a key to an account.

    In the PLC API this mirrors, non-admins can only modify their own
    keys. No key is added here: the method only logs a warning.

    :param auth: API authentication structure
    :param person_id_or_email: account receiving the key
    :param key_fields: description of the key
    """
    logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
def DeleteLeases(self, leases_id_list, slice_hrn ):
    """Delete the given leases, i.e. the OAR jobs behind them.

    Logs the request, then calls DeleteJobs once per lease id together
    with the slice hrn.

    :param leases_id_list: ids of the leases (OAR jobs) to delete
    :param slice_hrn: hrn of the slice owning the leases
    """
    logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
            \r\n " %(leases_id_list, slice_hrn))
    for lease_id in leases_id_list:
        self.DeleteJobs(lease_id, slice_hrn)