4 from datetime import datetime
6 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
7 from sfa.util.sfalogging import logger
8 from sfa.storage.alchemy import dbsession
9 from sfa.storage.model import RegRecord, RegUser, RegSlice
10 from sqlalchemy.orm import joinedload
13 from sfa.managers.driver import Driver
14 from sfa.rspecs.version_manager import VersionManager
15 from sfa.rspecs.rspec import RSpec
17 from sfa.util.xrn import Xrn, hrn_to_urn, get_authority
20 ## thierry: everything that is API-related (i.e. handling incoming requests)
22 # SlabDriver should be really only about talking to the senslab testbed
25 from sfa.senslab.OARrestapi import OARrestapi
26 from sfa.senslab.LDAPapi import LDAPapi
28 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP
31 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \
33 from sfa.senslab.slabslices import SlabSlices
38 # this inheritance scheme is so that the driver object can receive
39 # GetNodes or GetSites sorts of calls directly
40 # and thus minimize the differences in the managers with the pl version
44 class SlabDriver(Driver):
45 """ Senslab Driver class inherited from Driver generic class.
47 Contains methods compliant with the SFA standard and the testbed
48 infrastructure (calls to LDAP and OAR).
    def __init__(self, config):
        """Build the senslab driver from the SFA configuration.

        Stores the interface hrn and registry root authority taken from
        *config*, and creates the OAR REST client and the senslab
        database wrapper used by the other methods.
        """
        Driver.__init__ (self, config)
        self.hrn = config.SFA_INTERFACE_HRN
        self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
        self.oar = OARrestapi()
        # date format expected by the OAR scheduler when submitting jobs
        self.time_format = "%Y-%m-%d %H:%M:%S"
        self.db = SlabDB(config, debug = False)
    def sliver_status(self, slice_urn, slice_hrn):
        """Receive a status request for slice named urn/hrn
        urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
        shall return a structure as described in
        http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
        NT : not sure if we should implement this or not, but used by sface.
        """
        #First get the slice with the slice hrn
        slice_list = self.GetSlices(slice_filter = slice_hrn, \
                                    slice_filter_type = 'slice_hrn')

        # NOTE(review): `is 0` tests object identity, not equality; this
        # should be `len(slice_list) == 0` (or `if not slice_list`).
        if len(slice_list) is 0:
            raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))

        #Slice has the same slice hrn for each slice in the slice/lease list
        #So fetch the info on the user once
        one_slice = slice_list[0]
        #recuser = dbsession.query(RegRecord).filter_by(record_id = \
        #one_slice['record_id_user']).first()

        #Make a list of all the nodes hostnames in use for this slice
        for single_slice in slice_list:
            for node in single_slice['node_ids']:
                slice_nodes_list.append(node['hostname'])

        #Get all the corresponding nodes details
        nodes_all = self.GetNodes({'hostname':slice_nodes_list},
                                  ['node_id', 'hostname','site','boot_state'])
        # index node records by hostname for the lookups below
        nodeall_byhostname = dict([(one_node['hostname'], one_node) \
                                   for one_node in nodes_all])

        for single_slice in slice_list:
            # overall sliver state; downgraded to 'failed' below if any
            # node of the slice is not alive
            top_level_status = 'empty'
            ['geni_urn','pl_login','geni_status','geni_resources'], None)
            result['pl_login'] = one_slice['reg_researchers']['hrn']
            logger.debug("Slabdriver - sliver_status Sliver status \
                            urn %s hrn %s single_slice %s \r\n " \
                            %(slice_urn, slice_hrn, single_slice))
            nodes_in_slice = single_slice['node_ids']
            result['geni_status'] = top_level_status
            result['geni_resources'] = []
            top_level_status = 'ready'

            #A job is running on Senslab for this slice
            # report about the local nodes that are in the slice only
            result['geni_urn'] = slice_urn

            #timestamp = float(sl['startTime']) + float(sl['walltime'])
            #result['pl_expires'] = strftime(self.time_format, \
            #gmtime(float(timestamp)))
            #result['slab_expires'] = strftime(self.time_format,\
            #gmtime(float(timestamp)))

            # build one per-node resource entry for the GENI status reply
            for node in single_slice['node_ids']:
                #res['slab_hostname'] = node['hostname']
                #res['slab_boot_state'] = node['boot_state']
                res['pl_hostname'] = node['hostname']
                res['pl_boot_state'] = \
                        nodeall_byhostname[node['hostname']]['boot_state']
                #res['pl_last_contact'] = strftime(self.time_format, \
                #gmtime(float(timestamp)))
                sliver_id = Xrn(slice_urn, type='slice', \
                        id=nodeall_byhostname[node['hostname']]['node_id'], \
                        authority=self.hrn).urn
                res['geni_urn'] = sliver_id
                node_name = node['hostname']
                if nodeall_byhostname[node_name]['boot_state'] == 'Alive':
                    res['geni_status'] = 'ready'
                res['geni_status'] = 'failed'
                top_level_status = 'failed'
                res['geni_error'] = ''
                resources.append(res)

            result['geni_status'] = top_level_status
            result['geni_resources'] = resources
            logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\
166 def get_user(self, hrn):
167 return dbsession.query(RegRecord).filter_by(hrn = hrn).first()
    def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
        """Instantiate (or update) the sliver described by *rspec_string*.

        Verifies the slice and person records, then translates the
        requested nodes and leases from the rspec into OAR jobs, and
        finally returns the resulting manifest rspec.
        """
        aggregate = SlabAggregate(self)
        slices = SlabSlices(self)
        peer = slices.get_peer(slice_hrn)
        sfa_peer = slices.get_sfa_peer(slice_hrn)

        if not isinstance(creds, list):
        slice_record = users[0].get('slice_record', {})
        logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\
                                        creds %s \r\n \r\n users %s" \
        slice_record['user'] = {'keys':users[0]['keys'], \
                                'email':users[0]['email'], \
                                'hrn':slice_record['reg-researchers'][0]}

        rspec = RSpec(rspec_string)
        logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \
                                        %s slice_record %s users %s" \
                                        %(rspec.version,slice_record, users))

        # ensure site record exists?
        # ensure slice record exists
        #Removed options to verify_slice SA 14/08/12
        sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \

        # ensure person records exists
        #verify_persons returns added persons but since the return value
        slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
                                                    sfa_peer, options=options)
        #requested_attributes returned by rspec.version.get_slice_attributes()
        #unused, removed SA 13/08/12
        rspec.version.get_slice_attributes()

        logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))

        # add/remove slice from nodes
        # NOTE(review): `is self.root_auth` compares identity on strings;
        # should be `== self.root_auth`, otherwise the filter silently
        # drops every node.
        requested_slivers = [node.get('component_id') \
                            for node in rspec.version.get_nodes_with_slivers()\
                            if node.get('authority_id') is self.root_auth]
        l = [ node for node in rspec.version.get_nodes_with_slivers() ]
        logger.debug("SLADRIVER \tcreate_sliver requested_slivers \
                                    requested_slivers %s listnodes %s" \
                                    %(requested_slivers,l))
        #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12.
        #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)

        requested_lease_list = []
        logger.debug("SLABDRIVER.PY \tcreate_sliver AVANTLEASE " )
        rspec_requested_leases = rspec.version.get_leases()
        for lease in rspec.version.get_leases():
            single_requested_lease = {}
            logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))

            # a lease without a lease_id is a new reservation request
            if not lease.get('lease_id'):
                if get_authority(lease['component_id']) == self.root_auth:
                    single_requested_lease['hostname'] = \
                                        slab_xrn_to_hostname(\
                                        lease.get('component_id').strip())
                    single_requested_lease['start_time'] = \
                                                        lease.get('start_time')
                    single_requested_lease['duration'] = lease.get('duration')
                    #Check the experiment's duration is valid before adding
                    #the lease to the requested leases list
                    duration_in_seconds = \
                            int(single_requested_lease['duration'])*60
                    if duration_in_seconds > self.GetLeaseGranularity():
                        requested_lease_list.append(single_requested_lease)

        #Create dict of leases by start_time, regrouping nodes reserved
        #time, for the same amount of time = one job on OAR
        requested_job_dict = {}
        for lease in requested_lease_list:

            #In case it is an asap experiment start_time is empty
            if lease['start_time'] == '':
                lease['start_time'] = '0'

            if lease['start_time'] not in requested_job_dict:
                if isinstance(lease['hostname'], str):
                    lease['hostname'] = [lease['hostname']]
                requested_job_dict[lease['start_time']] = lease

                job_lease = requested_job_dict[lease['start_time']]
                if lease['duration'] == job_lease['duration'] :
                    job_lease['hostname'].append(lease['hostname'])

        logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\
                                                     %(requested_job_dict))
        #verify_slice_leases returns the leases , but the return value is unused
        #here. Removed SA 13/08/12
        slices.verify_slice_leases(sfa_slice, \
                                    requested_job_dict, peer)

        return aggregate.get_rspec(slice_xrn=slice_urn, login=sfa_slice['login'],version=rspec.version)
    def delete_sliver (self, slice_urn, slice_hrn, creds, options):
        """Delete every sliver (OAR job) belonging to slice *slice_hrn*."""
        sfa_slice_list = self.GetSlices(slice_filter = slice_hrn, \
                                        slice_filter_type = 'slice_hrn')

        if not sfa_slice_list:

        #Delete all in the slice
        for sfa_slice in sfa_slice_list:

            logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))
            slices = SlabSlices(self)
            # determine if this is a peer slice

            peer = slices.get_peer(slice_hrn)
            #TODO delete_sliver SA : UnBindObjectFromPeer should be
            #used when there is another
            #senslab testbed, which is not the case 14/08/12 .

            logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer))
            # unbind from the peer before deleting, rebind afterwards
            self.UnBindObjectFromPeer('slice', \
                                    sfa_slice['record_id_slice'], \
            self.DeleteSliceFromNodes(sfa_slice)
            self.BindObjectToPeer('slice', \
                                    sfa_slice['record_id_slice'], \
                                    peer, sfa_slice['peer_slice_id'])
    def AddSlice(self, slice_record, user_record):
        """Add slice to the sfa tables and senslab table only if the user
        already exists in senslab database (user already registered in LDAP).
        There is no way to separate adding the slice to the testbed
        and then importing it from the testbed to SFA because of
        senslab's architecture. Therefore, sfa tables are updated here.
        """
        sfa_record = RegSlice(hrn=slice_record['slice_hrn'],
                              gid=slice_record['gid'],
                              pointer=slice_record['slice_id'],
                              authority=slice_record['authority'])

        logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \
                                                    %(sfa_record, user_record))
        sfa_record.just_created()
        dbsession.add(sfa_record)

        #Update the reg-researcher dependency table
        sfa_record.reg_researchers = [user_record]

        #Update the senslab table with the new slice
        #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \
        #record_id_slice = sfa_record.record_id , \
        #record_id_user = slice_record['record_id_user'], \
        #peer_authority = slice_record['peer_authority'])

        #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \
        #slab_slice %s sfa_record %s" \
        #%(slice_record,slab_slice, sfa_record))
        #slab_dbsession.add(slab_slice)
        #slab_dbsession.commit()
355 # first 2 args are None in case of resource discovery
    def list_resources (self, slice_urn, slice_hrn, creds, options):
        """Return an advertisement (or manifest) rspec for this testbed.

        The first two arguments are None in the resource-discovery case;
        *options* selects the rspec version and the optional lease /
        availability information to include.
        """
        #cached_requested = options.get('cached', True)

        version_manager = VersionManager()
        # get the rspec's return format from options
        version_manager.get_version(options.get('geni_rspec_version'))
        version_string = "rspec_%s" % (rspec_version)

        #panos adding the info option to the caching key (can be improved)
        if options.get('info'):
            version_string = version_string + "_" + \
                                        options.get('info', 'default')

        # Adding the list_leases option to the caching key
        if options.get('list_leases'):
            version_string = version_string + "_"+options.get('list_leases', 'default')

        # Adding geni_available to caching key
        if options.get('geni_available'):
            version_string = version_string + "_" + str(options.get('geni_available'))

        # look in cache first
        #if cached_requested and self.cache and not slice_hrn:
        #rspec = self.cache.get(version_string)
        #logger.debug("SlabDriver.ListResources: \
        #returning cached advertisement")

        #panos: passing user-defined options
        aggregate = SlabAggregate(self)
        #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
        #options.update({'origin_hrn':origin_hrn})
        rspec =  aggregate.get_rspec(slice_xrn=slice_urn, \
                                        version=rspec_version, options=options)

        #if self.cache and not slice_hrn:
        #logger.debug("Slab.ListResources: stores advertisement in cache")
        #self.cache.add(version_string, rspec)
    def list_slices (self, creds, options):
        """Return the urns of all slices known to the senslab registry."""
        # look in cache first
        #slices = self.cache.get('slices')
        #logger.debug("PlDriver.list_slices returns from cache")

        slices = self.GetSlices()
        logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices))
        slice_hrns = [slab_slice['hrn'] for slab_slice in slices]

        # convert each hrn to a slice urn for the reply
        slice_urns = [hrn_to_urn(slice_hrn, 'slice') \
                                                for slice_hrn in slice_hrns]

        #logger.debug ("SlabDriver.list_slices stores value in cache")
        #self.cache.add('slices', slice_urns)
    def register (self, sfa_record, hrn, pub_key):
        """Registration is mostly delegated to the testbed on senslab.

        Adding new user, slice, node or site should not be handled
        directly here:
        Adding users = LDAP Senslab
        Adding slice = Import from LDAP users
        """
    def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
        """No site or node record update allowed in Senslab.

        Updates a slice (delete/recreate the OAR job) or a user record
        (selected personal fields plus the ssh key) to match
        *new_sfa_record*.  Raises UnknownSfaType when *new_key* is given
        for a record type other than 'user'.
        """
        pointer = old_sfa_record['pointer']
        old_sfa_record_type = old_sfa_record['type']

        # new_key implemented for users only
        if new_key and old_sfa_record_type not in [ 'user' ]:
            raise UnknownSfaType(old_sfa_record_type)

        #if (type == "authority"):
        #self.shell.UpdateSite(pointer, new_sfa_record)

        if old_sfa_record_type == "slice":
            slab_record = self.sfa_fields_to_slab_fields(old_sfa_record_type, \
            if 'name' in slab_record:
                slab_record.pop('name')
                #Prototype should be UpdateSlice(self,
                #auth, slice_id_or_name, slice_fields)
                #Senslab cannot update slice since slice = job
                #so we must delete and create another job
                self.UpdateSlice(pointer, slab_record)

        elif old_sfa_record_type == "user":
            all_fields = new_sfa_record
            # copy only the updatable personal fields
            for key in all_fields.keys():
                if key in ['first_name', 'last_name', 'title', 'email',
                           'password', 'phone', 'url', 'bio', 'accepted_aup',
                    update_fields[key] = all_fields[key]
            self.UpdatePerson(pointer, update_fields)

            # must check this key against the previous one if it exists
            persons = self.GetPersons([pointer], ['key_ids'])
            keys = person['key_ids']
            keys = self.GetKeys(person['key_ids'])

            # Delete all stale keys
            if new_key != key['key']:
                self.DeleteKey(key['key_id'])
                self.AddPersonKey(pointer, {'key_type': 'ssh', \
    def remove (self, sfa_record):
        """Remove the testbed object behind *sfa_record*.

        Users are disabled in the LDAP; slices are deleted when a
        matching slice record exists.  Authorities are not handled.
        """
        sfa_record_type = sfa_record['type']
        hrn = sfa_record['hrn']
        if sfa_record_type == 'user':

            #get user from senslab ldap
            person = self.GetPersons(sfa_record)
            #No registering at a given site in Senslab.
            #Once registered to the LDAP, all senslab sites are

            #Mark account as disabled in ldap
            self.DeletePerson(sfa_record)
        elif sfa_record_type == 'slice':
            if self.GetSlices(slice_filter = hrn, \
                                    slice_filter_type = 'slice_hrn'):
                self.DeleteSlice(sfa_record)

        #elif type == 'authority':
        #if self.GetSites(pointer):
        #self.DeleteSite(pointer)
521 #TODO clean GetPeers. 05/07/12SA
    def GetPeers (self, auth = None, peer_filter=None, return_fields_list=None):
        """Return authority records from the SFA registry database.

        When *peer_filter* is given, only the matching authority is
        returned; otherwise all authority records are listed.
        """
        existing_records = {}
        existing_hrns_by_types = {}
        logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \
                    return_field %s " %(auth , peer_filter, return_fields_list))
        all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()

        # index records by (hrn, type) and group hrns per record type
        for record in all_records:
            existing_records[(record.hrn, record.type)] = record
            if record.type not in existing_hrns_by_types:
                existing_hrns_by_types[record.type] = [record.hrn]
                existing_hrns_by_types[record.type].append(record.hrn)

        logger.debug("SLABDRIVER \tGetPeer\texisting_hrns_by_types %s "\
                                             %( existing_hrns_by_types))
            records_list.append(existing_records[(peer_filter,'authority')])
            for hrn in existing_hrns_by_types['authority']:
                records_list.append(existing_records[(hrn,'authority')])

        logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \
            return_records = records_list
        if not peer_filter and not return_fields_list:

        logger.debug("SLABDRIVER \tGetPeer return_records %s " \
        return return_records
565 #TODO : Handling OR request in make_ldap_filters_from_records
566 #instead of the for loop
567 #over the records' list
    def GetPersons(self, person_filter=None):
        """
        person_filter should be a list of dictionnaries when not set to None.
        Returns a list of users whose accounts are enabled found in ldap.
        """
        logger.debug("SLABDRIVER \tGetPersons person_filter %s" \

        if person_filter and isinstance(person_filter, list):
            #If we are looking for a list of users (list of dict records)
            #Usually the list contains only one user record
            for searched_attributes in person_filter:

                #Get only enabled user accounts in senslab LDAP :
                #add a filter for make_ldap_filters_from_record
                person = self.ldap.LdapFindUser(searched_attributes, \
                                is_user_enabled=True)
                #If a person was found, append it to the list
                person_list.append(person)

            #If the list is empty, return None
            # NOTE(review): `is 0` is an identity test; should be
            # `len(person_list) == 0` (or `if not person_list`).
            if len(person_list) is 0:

            #Get only enabled user accounts in senslab LDAP :
            #add a filter for make_ldap_filters_from_record
            person_list = self.ldap.LdapFindUser(is_user_enabled=True)
601 def GetTimezone(self):
602 """ Get the OAR servier time and timezone.
603 Unused SA 16/11/12"""
604 server_timestamp, server_tz = self.oar.parser.\
605 SendRequest("GET_timezone")
606 return server_timestamp, server_tz
    def DeleteJobs(self, job_id, slice_hrn):
        """Ask OAR to delete job *job_id*, on behalf of the slice's owner.

        The OAR login is derived from the last hrn component of
        *slice_hrn*.  No-op when *job_id* is missing or -1.
        """
        # NOTE(review): `is -1` is an identity test on an int literal;
        # should be `job_id == -1`.
        if not job_id or job_id is -1:
        # NOTE(review): rstrip("_slice") strips any trailing run of the
        # characters {_, s, l, i, c, e}, not the literal suffix; a login
        # ending in one of those letters gets over-truncated.  Prefer
        # removing the "_slice" suffix explicitly.
        username  = slice_hrn.split(".")[-1].rstrip("_slice")
        reqdict['method'] = "delete"
        reqdict['strval'] = str(job_id)

        answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
        logger.debug("SLABDRIVER \tDeleteJobs jobid  %s \r\n answer %s \
                                username %s" %(job_id,answer, username))
626 ##TODO : Unused GetJobsId ? SA 05/07/12
627 #def GetJobsId(self, job_id, username = None ):
629 #Details about a specific job.
630 #Includes details about submission time, jot type, state, events,
631 #owner, assigned ressources, walltime etc...
635 #node_list_k = 'assigned_network_address'
636 ##Get job info from OAR
637 #job_info = self.oar.parser.SendRequest(req, job_id, username)
639 #logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
641 #if job_info['state'] == 'Terminated':
642 #logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
645 #if job_info['state'] == 'Error':
646 #logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
651 #logger.error("SLABDRIVER \tGetJobsId KeyError")
654 #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \
656 ##Replaces the previous entry
657 ##"assigned_network_address" / "reserved_resources"
659 #job_info.update({'node_ids':parsed_job_info[node_list_k]})
660 #del job_info[node_list_k]
661 #logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
    def GetJobsResources(self, job_id, username = None):
        """Return the hostnames of the nodes assigned to OAR job *job_id*.

        Queries OAR for the job's resource ids and maps them to node
        hostnames; the result dict uses the key 'node_ids'.
        """
        #job_resources=['reserved_resources', 'assigned_resources',\
        #'job_id', 'job_uri', 'assigned_nodes',\
        #assigned_res = ['resource_id', 'resource_uri']
        #assigned_n = ['node', 'node_uri']

        req = "GET_jobs_id_resources"

        #Get job resources list from OAR
        node_id_list = self.oar.parser.SendRequest(req, job_id, username)
        logger.debug("SLABDRIVER \t GetJobsResources  %s " %(node_id_list))

        # translate OAR resource ids into node hostnames
        self.__get_hostnames_from_oar_node_ids(node_id_list)

        #Replaces the previous entry "assigned_network_address" /
        #"reserved_resources"
        job_info = {'node_ids': hostname_list}
    def get_info_on_reserved_nodes(self, job_info, node_list_name):
        """Map the node names listed under *node_list_name* in *job_info*
        to testbed hostnames, using the full node list from OAR.
        """
        #Get the list of the testbed nodes records and make a
        #dictionnary keyed on the hostname out of it
        node_list_dict = self.GetNodes()
        #node_hostname_list = []
        node_hostname_list = [node['hostname'] for node in node_list_dict]
        #for node in node_list_dict:
        #node_hostname_list.append(node['hostname'])
        node_dict = dict(zip(node_hostname_list, node_list_dict))

        reserved_node_hostname_list = []
        for index in range(len(job_info[node_list_name])):
            #job_info[node_list_name][k] =
            # NOTE(review): index-assignment into an empty list raises
            # IndexError on the first iteration; this should append
            # instead of assigning reserved_node_hostname_list[index].
            reserved_node_hostname_list[index] = \
                    node_dict[job_info[node_list_name][index]]['hostname']

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
                        reserved_node_hostname_list %s" \
                        %(reserved_node_hostname_list))
            logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )

        return reserved_node_hostname_list
715 def GetNodesCurrentlyInUse(self):
716 """Returns a list of all the nodes already involved in an oar job"""
717 return self.oar.parser.SendRequest("GET_running_jobs")
719 def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
720 full_nodes_dict_list = self.GetNodes()
721 #Put the full node list into a dictionary keyed by oar node id
722 oar_id_node_dict = {}
723 for node in full_nodes_dict_list:
724 oar_id_node_dict[node['oar_id']] = node
726 #logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\
727 #oar_id_node_dict %s" %(oar_id_node_dict))
729 hostname_dict_list = []
730 for resource_id in resource_id_list:
731 #Because jobs requested "asap" do not have defined resources
732 if resource_id is not "Undefined":
733 hostname_dict_list.append(\
734 oar_id_node_dict[resource_id]['hostname'])
736 #hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
737 return hostname_dict_list
    def GetReservedNodes(self,username = None):
        """Return the OAR reservations, each annotated with the hostnames
        of its reserved nodes (key 'reserved_nodes').
        """
        #Get the nodes in use and the reserved nodes
        reservation_dict_list = \
                        self.oar.parser.SendRequest("GET_reserved_nodes", \

        for resa in reservation_dict_list:
            logger.debug ("GetReservedNodes resa %s"%(resa))
            #dict list of hostnames and their site
            resa['reserved_nodes'] = \
                self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])

        #del resa['resource_ids']
        return reservation_dict_list
    def GetNodes(self, node_filter_dict = None, return_fields_list = None):
        """
        node_filter_dict : dictionnary of lists
        Returns the OAR node records, optionally filtered by the values
        in *node_filter_dict* and restricted to *return_fields_list*.
        """
        node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
        node_dict_list = node_dict_by_id.values()
        logger.debug (" SLABDRIVER GetNodes  node_filter_dict %s \
            return_fields_list %s "%(node_filter_dict, return_fields_list))
        #No filtering needed return the list directly
        if not (node_filter_dict or return_fields_list):
            return node_dict_list

        return_node_list = []
        for filter_key in node_filter_dict:
            #Filter the node_dict_list by each value contained in the
            #list node_filter_dict[filter_key]
            for value in node_filter_dict[filter_key]:
                for node in node_dict_list:
                    if node[filter_key] == value:
                        if return_fields_list :
                            for k in return_fields_list:
                            return_node_list.append(tmp)
                            return_node_list.append(node)
                            logger.log_exc("GetNodes KeyError")

        return return_node_list
    def GetSites(self, site_filter_name_list = None, return_fields_list = None):
        """Return site records from OAR, optionally filtered by name and
        restricted to *return_fields_list*.
        """
        site_dict = self.oar.parser.SendRequest("GET_sites")
        #site_dict : dict where the key is the site name
        return_site_list = []
        if not ( site_filter_name_list or return_fields_list):
            return_site_list = site_dict.values()
            return return_site_list

        for site_filter_name in site_filter_name_list:
            if site_filter_name in site_dict:
                if return_fields_list:
                    for field in return_fields_list:
                            tmp[field] = site_dict[site_filter_name][field]
                            logger.error("GetSites KeyError %s "%(field))
                    return_site_list.append(tmp)
                    return_site_list.append( site_dict[site_filter_name])

        return return_site_list
    def _sql_get_slice_info( self, slice_filter ):
        """Fetch the RegSlice whose hrn equals *slice_filter*, with its
        researchers eagerly loaded, and return it as a plain dict.
        """
        #DO NOT USE RegSlice - reg_researchers to get the hrn
        #of the user otherwise will mess up the RegRecord in
        #Resolve, don't know why - SA 08/08/2012

        #Only one entry for one user  = one slice in slab_xp table
        #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
        raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first()
        #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first()
            #raw_slicerec.reg_researchers
            raw_slicerec = raw_slicerec.__dict__
            logger.debug(" SLABDRIVER \t  get_slice_info slice_filter %s  raw_slicerec %s"%(slice_filter,raw_slicerec))
            slicerec = raw_slicerec
            #only one researcher per slice so take the first one
            #slicerec['reg_researchers'] = raw_slicerec['reg_researchers']
            #del slicerec['reg_researchers']['_sa_instance_state']
    def _sql_get_slice_info_from_user( self, slice_filter ):
        """Fetch the slice of the RegUser whose record_id equals
        *slice_filter*; the researcher record is nested under the
        'reg_researchers' key of the returned dict.
        """
        #slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
        raw_slicerec = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(record_id = slice_filter).first()
        #raw_slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first()
        #Put it in correct order
        user_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'email', 'pointer']
        slice_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'pointer']
            #raw_slicerec.reg_slices_as_researcher
            raw_slicerec = raw_slicerec.__dict__
            # project the slice and user records onto the field lists above
            slicerec = dict([(k,raw_slicerec['reg_slices_as_researcher'][0].__dict__[k]) for k in slice_needed_fields])
            slicerec['reg_researchers'] = dict([(k, raw_slicerec[k]) for k in user_needed_fields])
            #TODO Handle multiple slices for one user SA 10/12/12
            #for now only take the first slice record associated to the rec user
            ##slicerec  = raw_slicerec['reg_slices_as_researcher'][0].__dict__
            #del raw_slicerec['reg_slices_as_researcher']
            #slicerec['reg_researchers'] = raw_slicerec
            ##del slicerec['_sa_instance_state']
    def _get_slice_records(self, slice_filter = None, \
                    slice_filter_type = None):
        """Dispatch a slice lookup to the hrn-based or user-id-based SQL
        helper depending on *slice_filter_type*.
        """
        #Get list of slices based on the slice hrn
        if slice_filter_type == 'slice_hrn':

            #if get_authority(slice_filter) == self.root_auth:
            #login = slice_filter.split(".")[1].split("_")[0]

            slicerec = self._sql_get_slice_info(slice_filter)

        #Get slice based on user id
        if slice_filter_type == 'record_id_user':

            slicerec = self._sql_get_slice_info_from_user(slice_filter)

            fixed_slicerec_dict = slicerec
            #At this point if there is no login it means
            #record_id_user filter has been used for filtering
            ##If the slice record is from senslab
            #if fixed_slicerec_dict['peer_authority'] is None:
            #login = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]
            #return login, fixed_slicerec_dict
            return fixed_slicerec_dict
    def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None):
        """ Get the slice records from the slab db.
        Returns a slice dict if slice_filter and slice_filter_type
        are given.
        Returns a list of slice dictionnaries if there are no filters
        specified.
        """
        authorized_filter_types_list = ['slice_hrn', 'record_id_user']
        return_slicerec_dictlist = []

        #First try to get information on the slice based on the filter provided
        if slice_filter_type in authorized_filter_types_list:
            fixed_slicerec_dict = \
                            self._get_slice_records(slice_filter, slice_filter_type)
            #login, fixed_slicerec_dict = \
                            #self._get_slice_records(slice_filter, slice_filter_type)
            logger.debug(" SLABDRIVER \tGetSlices login %s \
                            slice record %s slice_filter %s slice_filter_type %s "\
                            %(login, fixed_slicerec_dict,slice_filter, slice_filter_type))

            #Now we have the slice record fixed_slicerec_dict, get the
            #jobs associated to this slice
            #leases_list = self.GetReservedNodes(username = login)
            leases_list = self.GetLeases(login = login)
            #If no job is running or no job scheduled
            #return only the slice record
            if leases_list == [] and fixed_slicerec_dict:
                return_slicerec_dictlist.append(fixed_slicerec_dict)

            #If several jobs for one slice , put the slice record into
            # each lease information dict
            for lease in leases_list :

                reserved_list = lease['reserved_nodes']

                slicerec_dict['oar_job_id']= lease['lease_id']
                slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
                slicerec_dict.update({'node_ids':lease['reserved_nodes']})

                #Update lease dict with the slice record
                if fixed_slicerec_dict:
                    fixed_slicerec_dict['oar_job_id'] = []
                    fixed_slicerec_dict['oar_job_id'].append(slicerec_dict['oar_job_id'])
                    slicerec_dict.update(fixed_slicerec_dict)
                    #slicerec_dict.update({'hrn':\
                                    #str(fixed_slicerec_dict['slice_hrn'])})

                return_slicerec_dictlist.append(slicerec_dict)
                logger.debug("SLABDRIVER.PY \tGetSlices  \
                        slicerec_dict %s return_slicerec_dictlist %s \
                        lease['reserved_nodes'] \
                        %s" %(slicerec_dict, return_slicerec_dictlist, \
                        lease['reserved_nodes'] ))

            logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
                        return_slicerec_dictlist  %s" \
                        %(return_slicerec_dictlist))

            return return_slicerec_dictlist


            #Get all slices from the senslab sfa database ,
            #put them in dict format
            #query_slice_list = dbsession.query(RegRecord).all()
            query_slice_list = dbsession.query(RegSlice).options(joinedload('reg_researchers')).all()
            #query_slice_list = dbsession.query(RegRecord).filter_by(type='slice').all()
            #query_slice_list = slab_dbsession.query(SenslabXP).all()
            return_slicerec_dictlist = []
            for record in query_slice_list:
                tmp = record.__dict__
                tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__
                #del tmp['reg_researchers']['_sa_instance_state']
                return_slicerec_dictlist.append(tmp)
                #return_slicerec_dictlist.append(record.__dict__)

            #Get all the jobs reserved nodes
            leases_list = self.GetReservedNodes()

            for fixed_slicerec_dict in return_slicerec_dictlist:

                #Check if the slice belongs to a senslab user
                if fixed_slicerec_dict['peer_authority'] is None:
                    owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0]

                    for lease in leases_list:
                        if owner == lease['user']:
                            slicerec_dict['oar_job_id'] = lease['lease_id']

                            #for reserved_node in lease['reserved_nodes']:
                            logger.debug("SLABDRIVER.PY \tGetSlices lease %s "\

                            reserved_list = lease['reserved_nodes']

                            slicerec_dict.update({'node_ids':lease['reserved_nodes']})
                            slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}})
                            slicerec_dict.update(fixed_slicerec_dict)
                            #slicerec_dict.update({'hrn':\
                                    #str(fixed_slicerec_dict['slice_hrn'])})
                            #return_slicerec_dictlist.append(slicerec_dict)
                            fixed_slicerec_dict.update(slicerec_dict)

            logger.debug("SLABDRIVER.PY \tGetSlices RETURN \
                        return_slicerec_dictlist %s \slice_filter %s " \
                        %(return_slicerec_dictlist, slice_filter))

        return return_slicerec_dictlist
1017 def testbed_name (self): return self.hrn
1019 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
    def aggregate_version (self):
        """Return the aggregate's GetVersion information.

        Splits the rspec versions known to the VersionManager into the
        mandatory 'geni_request_rspec_versions' and
        'geni_ad_rspec_versions' lists.
        """
        version_manager = VersionManager()
        ad_rspec_versions = []
        request_rspec_versions = []
        # a version with content_type '*' serves both purposes
        for rspec_version in version_manager.versions:
            if rspec_version.content_type in ['*', 'ad']:
                ad_rspec_versions.append(rspec_version.to_dict())
            if rspec_version.content_type in ['*', 'request']:
                request_rspec_versions.append(rspec_version.to_dict())
                'testbed':self.testbed_name(),
                'geni_request_rspec_versions': request_rspec_versions,
                'geni_ad_rspec_versions': ad_rspec_versions,
1038 # Convert SFA fields to PLC fields for use when registering up updating
1039 # registry record in the PLC database
1041 # @param type type of record (user, slice, ...)
1042 # @param hrn human readable name
1043 # @param sfa_fields dictionary of SFA fields
1044 # @param slab_fields dictionary of PLC fields (output)
1046 def sfa_fields_to_slab_fields(self, sfa_type, hrn, record):
1050 #for field in record:
1051 # slab_record[field] = record[field]
1053 if sfa_type == "slice":
1054 #instantion used in get_slivers ?
1055 if not "instantiation" in slab_record:
1056 slab_record["instantiation"] = "senslab-instantiated"
1057 #slab_record["hrn"] = hrn_to_pl_slicename(hrn)
1058 #Unused hrn_to_pl_slicename because Slab's hrn already
1059 #in the appropriate form SA 23/07/12
1060 slab_record["hrn"] = hrn
1061 logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \
1062 slab_record %s " %(slab_record['hrn']))
1064 slab_record["url"] = record["url"]
1065 if "description" in record:
1066 slab_record["description"] = record["description"]
1067 if "expires" in record:
1068 slab_record["expires"] = int(record["expires"])
1070 #nodes added by OAR only and then imported to SFA
1071 #elif type == "node":
1072 #if not "hostname" in slab_record:
1073 #if not "hostname" in record:
1074 #raise MissingSfaInfo("hostname")
1075 #slab_record["hostname"] = record["hostname"]
1076 #if not "model" in slab_record:
1077 #slab_record["model"] = "geni"
1080 #elif type == "authority":
1081 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
1083 #if not "name" in slab_record:
1084 #slab_record["name"] = hrn
1086 #if not "abbreviated_name" in slab_record:
1087 #slab_record["abbreviated_name"] = hrn
1089 #if not "enabled" in slab_record:
1090 #slab_record["enabled"] = True
1092 #if not "is_public" in slab_record:
1093 #slab_record["is_public"] = True
1100 def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
1101 """ Transforms unix timestamp into valid OAR date format """
1103 #Used in case of a scheduled experiment (not immediate)
1104 #To run an XP immediately, don't specify date and time in RSpec
1105 #They will be set to None.
1106 if xp_utc_timestamp:
1107 #transform the xp_utc_timestamp into server readable time
1108 xp_server_readable_date = datetime.fromtimestamp(int(\
1109 xp_utc_timestamp)).strftime(self.time_format)
1111 return xp_server_readable_date
1119 def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
1120 lease_start_time, lease_duration, slice_user=None):
# Book the given nodes on OAR for this slice: build the lease description,
# POST the job to the OAR REST API, then write the node list to a JSON file
# and call the senslab experiment wrapper.
# NOTE(review): the extracted listing has gaps (e.g. the creation of
# lease_dict and reqdict, the node-id lookup, the else branches); code is
# kept byte-identical below.
1122 lease_dict['lease_start_time'] = lease_start_time
1123 lease_dict['lease_duration'] = lease_duration
1124 lease_dict['added_nodes'] = added_nodes
1125 lease_dict['slice_name'] = slice_name
1126 lease_dict['slice_user'] = slice_user
1127 lease_dict['grain'] = self.GetLeaseGranularity()
1128 lease_dict['time_format'] = self.time_format
1131 def __create_job_structure_request_for_OAR(lease_dict):
1132 """ Creates the structure needed for a correct POST on OAR.
1133 Makes the timestamp transformation into the appropriate format.
1134 Sends the POST request to create the job with the resources in
1143 reqdict['workdir'] = '/tmp'
1144 reqdict['resource'] = "{network_address in ("
1146 for node in lease_dict['added_nodes']:
1147 logger.debug("\r\n \r\n OARrestapi \t \
1148 __create_job_structure_request_for_OAR node %s" %(node))
1150 # Get the ID of the node
1152 reqdict['resource'] += "'" + nodeid + "', "
1153 nodeid_list.append(nodeid)
# Trim the trailing ", " then close the OAR resource expression.
1155 custom_length = len(reqdict['resource'])- 2
1156 reqdict['resource'] = reqdict['resource'][0:custom_length] + \
1157 ")}/nodes=" + str(len(nodeid_list))
1159 def __process_walltime(duration):
1160 """ Calculates the walltime in seconds from the duration in H:M:S
1161 specified in the RSpec.
1165 # Fixing the walltime by adding a few delays.
1166 # First put the walltime in seconds oarAdditionalDelay = 20;
1167 # additional delay for /bin/sleep command to
1168 # take in account prologue and epilogue scripts execution
1169 # int walltimeAdditionalDelay = 240; additional delay
1170 desired_walltime = duration
1171 total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12
1172 sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12
1174 #Put the walltime back in str form
1175 #First get the hours
1176 walltime.append(str(total_walltime / 3600))
1177 total_walltime = total_walltime - 3600 * int(walltime[0])
1178 #Get the remaining minutes
1179 walltime.append(str(total_walltime / 60))
1180 total_walltime = total_walltime - 60 * int(walltime[1])
# The leftover is the seconds component.
1182 walltime.append(str(total_walltime))
1185 logger.log_exc(" __process_walltime duration null")
1187 return walltime, sleep_walltime
# lease_duration is expressed in grain units; convert to seconds first.
1190 walltime, sleep_walltime = \
1191 __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
1194 reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
1195 ":" + str(walltime[1]) + ":" + str(walltime[2])
1196 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
1198 #In case of a scheduled experiment (not immediate)
1199 #To run an XP immediately, don't specify date and time in RSpec
1200 #They will be set to None.
# NOTE(review): `is not '0'` is an identity comparison against a str
# literal — it only works by CPython interning accident; should be != '0'.
1201 if lease_dict['lease_start_time'] is not '0':
1202 #Readable time accepted by OAR
1203 start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
1204 strftime(lease_dict['time_format'])
1205 reqdict['reservation'] = start_time
1206 #If there is not start time, Immediate XP. No need to add special
1210 reqdict['type'] = "deploy"
1211 reqdict['directory'] = ""
1212 reqdict['name'] = "SFA_" + lease_dict['slice_user']
1216 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\
1217 \r\n " %(slice_user))
1218 #Create the request for OAR
1219 reqdict = __create_job_structure_request_for_OAR(lease_dict)
1220 # first step : start the OAR job and update the job
1221 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
1224 answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
1225 reqdict, slice_user)
1226 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
# OAR returns the new job id under 'id' on success.
1228 jobid = answer['id']
1230 logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \
1231 Impossible to create job %s " %(answer))
1235 def __configure_experiment(jobid, added_nodes):
1236 # second step : configure the experiment
1237 # we need to store the nodes in a yaml (well...) file like this :
1238 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
1239 tmp_dir = '/tmp/sfa/'
1240 if not os.path.exists(tmp_dir):
1241 os.makedirs(tmp_dir)
1242 job_file = open(tmp_dir + str(jobid) + '.json', 'w')
# Write comma-separated numeric ids ("node" stripped from hostnames).
1244 job_file.write(str(added_nodes[0].strip('node')))
1245 for node in added_nodes[1:len(added_nodes)] :
1246 job_file.write(', '+ node.strip('node'))
1251 def __launch_senslab_experiment(jobid):
1252 # third step : call the senslab-experiment wrapper
1253 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar
1254 # "+str(jobid)+" "+slice_user
1255 javacmdline = "/usr/bin/java"
1257 "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
1259 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
1260 slice_user],stdout=subprocess.PIPE).communicate()[0]
1262 logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
1269 logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
1270 added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
1273 __configure_experiment(jobid, added_nodes)
1274 __launch_senslab_experiment(jobid)
1279 def AddLeases(self, hostname_list, slice_record, \
1280 lease_start_time, lease_duration):
# Book hostname_list on OAR for the slice, then record the resulting OAR
# job in the slab_xp table (slice hrn, job id, end time).
# NOTE(review): the extracted listing has gaps (e.g. the tail of the first
# logger call); code kept byte-identical below.
1281 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \
1282 slice_record %s lease_start_time %s lease_duration %s "\
1283 %( hostname_list, slice_record , lease_start_time, \
1286 #tmp = slice_record['reg-researchers'][0].split(".")
1287 username = slice_record['login']
1288 #username = tmp[(len(tmp)-1)]
1289 job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'], \
1290 lease_start_time, lease_duration, username)
# start_time is only a human-readable string for the debug log below;
# end_time (seconds since epoch) is what gets persisted.
1291 start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
1292 end_time = lease_start_time + lease_duration
# NOTE(review): in-method imports and the dedicated sqlalchemy logger look
# like leftover debugging aids — candidates for removal.
1294 import logging, logging.handlers
1295 from sfa.util.sfalogging import _SfaLogger
1296 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_record['hrn'], job_id, end_time))
1297 sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG)
1298 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_record['hrn']), type(job_id), type(end_time)))
# Persist the lease so GetLeases can map OAR jobs back to slice hrns.
1299 slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], job_id = job_id,end_time= end_time)
1300 logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" %(slab_ex_row))
1301 slab_dbsession.add(slab_ex_row)
1302 slab_dbsession.commit()
1304 logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
1309 #Delete the jobs from job_senslab table
1310 def DeleteSliceFromNodes(self, slice_record):
1311 for job_id in slice_record['oar_job_id']:
1312 self.DeleteJobs(job_id, slice_record['hrn'])
1316 def GetLeaseGranularity(self):
1317 """ Returns the granularity of Senslab testbed.
1318 OAR returns seconds for experiments duration.
1320 Experiments which last less than 10 min are invalid"""
1325 def update_jobs_in_slabdb(self, job_oar_list, jobs_psql):
# Synchronise the slab_xp table with OAR: any job id present in the DB
# (jobs_psql) but no longer known to OAR (job_oar_list) is deleted.
1326 #Get all the entries in slab_xp table
1329 jobs_psql = set(jobs_psql)
1330 kept_jobs = set(job_oar_list).intersection(jobs_psql)
1331 logger.debug ( "\r\n \t\tt update_jobs_in_slabdb jobs_psql %s \r\n \t job_oar_list %s \
1332 kept_jobs %s " %(jobs_psql,job_oar_list,kept_jobs))
# Jobs in the DB but not kept have finished or been cancelled on OAR.
1333 deleted_jobs = set(jobs_psql).difference(kept_jobs)
1334 deleted_jobs = list(deleted_jobs)
1335 if len(deleted_jobs) > 0:
# synchronize_session='fetch' keeps the in-memory session consistent
# with this bulk delete.
1336 slab_dbsession.query(SenslabXP).filter(SenslabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch')
1337 slab_dbsession.commit()
1343 def GetLeases(self, lease_filter_dict=None, login=None):
# Return the leases (OAR reservations) visible to this driver, each
# augmented with the slice hrn/urn and component ids of reserved nodes.
# NOTE(review): the extracted listing has gaps (e.g. the initialisation
# of job_oar_list and several logger-call tails); code kept byte-identical.
1346 unfiltered_reservation_list = self.GetReservedNodes(login)
1348 reservation_list = []
1349 #Find the slice associated with this user senslab ldap uid
1350 logger.debug(" SLABDRIVER.PY \tGetLeases login %s unfiltered_reservation_list %s " %(login ,unfiltered_reservation_list))
1351 #Create user dict first to avoid looking several times for
1352 #the same user in LDAP SA 27/07/12
# Map job_id -> slab_xp row dict for quick lookup per reservation.
1356 jobs_psql_query = slab_dbsession.query(SenslabXP).all()
1357 jobs_psql_dict = [ (row.job_id, row.__dict__ )for row in jobs_psql_query ]
1358 jobs_psql_dict = dict(jobs_psql_dict)
1359 logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\
1361 jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ]
1365 for resa in unfiltered_reservation_list:
1366 logger.debug("SLABDRIVER \tGetLeases USER %s"\
# Construct the list of jobs (running, waiting..) currently in OAR.
1368 #Cosntruct list of jobs (runing, waiting..) in oar
1369 job_oar_list.append(resa['lease_id'])
1370 #If there is information on the job in SLAB DB (slice used and job id)
1371 if resa['lease_id'] in jobs_psql_dict:
1372 job_info = jobs_psql_dict[resa['lease_id']]
1373 logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\
1375 resa['slice_hrn'] = job_info['slice_hrn']
1376 resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
# Otherwise assume it is a senslab slice named <root_auth>.<user>_slice.
1378 #Assume it is a senslab slice:
1380 resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice')
1381 #if resa['user'] not in resa_user_dict:
1382 #logger.debug("SLABDRIVER \tGetLeases userNOTIN ")
1383 #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
1385 #ldap_info = ldap_info[0][1]
1386 ##Get the backref :relationship table reg-researchers
1387 #user = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(email = \
1388 #ldap_info['mail'][0])
1390 #user = user.first()
1391 #user = user.__dict__
1392 #slice_info = user['reg_slices_as_researcher'][0].__dict__
1393 ##Separated in case user not in database :
1394 ##record_id not defined SA 17/07//12
1396 ##query_slice_info = slab_dbsession.query(SenslabXP).filter_by(record_id_user = user.record_id)
1397 ##if query_slice_info:
1398 ##slice_info = query_slice_info.first()
1402 #resa_user_dict[resa['user']] = {}
1403 #resa_user_dict[resa['user']]['ldap_info'] = user
1404 #resa_user_dict[resa['user']]['slice_info'] = slice_info
1406 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
1407 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
1410 resa['component_id_list'] = []
1411 resa['hrn'] = Xrn(resa['slice_id']).get_hrn()
1412 #Transform the hostnames into urns (component ids)
1413 for node in resa['reserved_nodes']:
1414 #resa['component_id_list'].append(hostname_to_urn(self.hrn, \
1415 #self.root_auth, node['hostname']))
1416 slab_xrn = slab_xrn_object(self.root_auth, node)
1417 resa['component_id_list'].append(slab_xrn.urn)
# Keep only the leases matching the requested slice name, if any filter.
1419 if lease_filter_dict:
1420 logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\
1421 %(resa,lease_filter_dict))
1423 if lease_filter_dict['name'] == resa['hrn']:
1424 reservation_list.append(resa)
1426 if lease_filter_dict is None:
1427 reservation_list = unfiltered_reservation_list
1429 #del unfiltered_reservation_list[unfiltered_reservation_list.index(resa)]
# Purge DB entries for jobs that no longer exist on the OAR side.
1432 self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list)
1434 #for resa in unfiltered_reservation_list:
1438 #if resa['user'] in resa_user_dict:
1439 #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn']
1440 #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice')
1442 ##resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
1443 #resa['component_id_list'] = []
1444 ##Transform the hostnames into urns (component ids)
1445 #for node in resa['reserved_nodes']:
1446 ##resa['component_id_list'].append(hostname_to_urn(self.hrn, \
1447 ##self.root_auth, node['hostname']))
1448 #slab_xrn = slab_xrn_object(self.root_auth, node)
1449 #resa['component_id_list'].append(slab_xrn.urn)
1451 ##Filter the reservation list if necessary
1452 ##Returns all the leases associated with a given slice
1453 #if lease_filter_dict:
1454 #logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\
1455 #%(lease_filter_dict))
1456 #for resa in unfiltered_reservation_list:
1457 #if lease_filter_dict['name'] == resa['slice_hrn']:
1458 #reservation_list.append(resa)
1460 #reservation_list = unfiltered_reservation_list
1462 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\
1463 %(reservation_list))
1464 return reservation_list
1466 def augment_records_with_testbed_info (self, sfa_records):
1467 return self.fill_record_info (sfa_records)
1469 def fill_record_info(self, record_list):
# NOTE(review): the opening triple-quote of this docstring (orig. line
# 1470) is missing from the extracted listing; code kept byte-identical.
1471 Given a SFA record, fill in the senslab specific and SFA specific
1472 fields in the record.
1475 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
# Accept a single record as well as a list of records.
1476 if not isinstance(record_list, list):
1477 record_list = [record_list]
1480 for record in record_list:
1481 #If the record is a SFA slice record, then add information
1482 #about the user of this slice. This kind of
1483 #information is in the Senslab's DB.
1484 if str(record['type']) == 'slice':
1485 if 'reg_researchers' in record and isinstance(record['reg_researchers'],list) :
1486 record['reg_researchers'] = record['reg_researchers'][0].__dict__
1487 record.update({'PI':[record['reg_researchers']['hrn']],
1488 'researcher': [record['reg_researchers']['hrn']],
1489 'name':record['hrn'],
1492 'person_ids':[record['reg_researchers']['record_id']],
1493 'geni_urn':'', #For client_helper.py compatibility
1494 'keys':'', #For client_helper.py compatibility
1495 'key_ids':''}) #For client_helper.py compatibility
1498 #Get slab slice record.
1499 recslice_list = self.GetSlices(slice_filter = \
1500 str(record['hrn']),\
1501 slice_filter_type = 'slice_hrn')
1503 #recuser = recslice_list[0]['reg_researchers']
1504 ##recuser = dbsession.query(RegRecord).filter_by(record_id = \
1505 ##recslice_list[0]['record_id_user']).first()
1507 #record.update({'PI':[recuser['hrn']],
1508 #'researcher': [recuser['hrn']],
1509 #'name':record['hrn'],
1512 #'person_ids':[recslice_list[0]['reg_researchers']['record_id']],
1513 #'geni_urn':'', #For client_helper.py compatibility
1514 #'keys':'', #For client_helper.py compatibility
1515 #'key_ids':''}) #For client_helper.py compatibility
1516 logger.debug("SLABDRIVER \tfill_record_info TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id'] %s " %(record['hrn'],record['oar_job_id']))
# Copy node info from the slab slice record; note rec leaks out of the
# loop, so only the last slab record's node_ids are kept.
1518 for rec in recslice_list:
1519 logger.debug("SLABDRIVER\r\n \t \t fill_record_info oar_job_id %s " %(rec['oar_job_id']))
1520 #record['oar_job_id'].append(rec['oar_job_id'])
1521 #del record['_sa_instance_state']
1522 del record['reg_researchers']
1523 record['node_ids'] = [ self.root_auth + hostname for hostname in rec['node_ids']]
1527 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
1528 recslice_list %s \r\n \t RECORD %s \r\n \r\n" %(recslice_list,record))
1529 if str(record['type']) == 'user':
1530 #The record is a SFA user record.
1531 #Get the information about his slice from Senslab's DB
1532 #and add it to the user record.
1533 recslice_list = self.GetSlices(\
1534 slice_filter = record['record_id'],\
1535 slice_filter_type = 'record_id_user')
1537 logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \
1538 recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record))
1539 #Append slice record in records list,
1540 #therefore fetches user and slice info again(one more loop)
1541 #Will update PIs and researcher for the slice
1542 #recuser = dbsession.query(RegRecord).filter_by(record_id = \
1543 #recslice_list[0]['record_id_user']).first()
1544 recuser = recslice_list[0]['reg_researchers']
1545 logger.debug( "SLABDRIVER.PY \t fill_record_info USER \
1546 recuser %s \r\n \r\n" %(recuser))
1548 recslice = recslice_list[0]
1549 recslice.update({'PI':[recuser['hrn']],
1550 'researcher': [recuser['hrn']],
1551 'name':record['hrn'],
1554 'person_ids':[recuser['record_id']]})
# Collect every OAR job id attached to the user's slices.
1556 for rec in recslice_list:
1557 recslice['oar_job_id'].append(rec['oar_job_id'])
1561 recslice.update({'type':'slice', \
1562 'hrn':recslice_list[0]['hrn']})
1565 #GetPersons takes [] as filters
1566 #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
1567 user_slab = self.GetPersons([record])
1570 record.update(user_slab[0])
1571 #For client_helper.py compatibility
1572 record.update( { 'geni_urn':'',
# NOTE(review): appending while iterating record_list — the appended
# slice record is then visited by this same loop (apparently intended,
# per the comment above, but worth confirming).
1575 record_list.append(recslice)
1577 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1578 INFO TO USER records %s" %(record_list))
1580 logger.debug("SLABDRIVER.PY \tfill_record_info END \
1581 record %s \r\n \r\n " %(record))
# Python 2 except syntax; the whole loop body is guarded by a try whose
# opening line is missing from the extracted listing.
1583 except TypeError, error:
1584 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\
1586 #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ")
1590 #self.fill_record_slab_info(records)
1596 #TODO Update membership? update_membership_list SA 05/07/12
1597 #def update_membership_list(self, oldRecord, record, listName, addFunc, \
1599 ## get a list of the HRNs that are members of the old and new records
1601 #oldList = oldRecord.get(listName, [])
1604 #newList = record.get(listName, [])
1606 ## if the lists are the same, then we don't have to update anything
1607 #if (oldList == newList):
1610 ## build a list of the new person ids, by looking up each person to get
1614 #records = table.find({'type': 'user', 'hrn': newList})
1615 #for rec in records:
1616 #newIdList.append(rec['pointer'])
1618 ## build a list of the old person ids from the person_ids field
1620 #oldIdList = oldRecord.get("person_ids", [])
1621 #containerId = oldRecord.get_pointer()
1623 ## if oldRecord==None, then we are doing a Register, instead of an
1626 #containerId = record.get_pointer()
1628 ## add people who are in the new list, but not the oldList
1629 #for personId in newIdList:
1630 #if not (personId in oldIdList):
1631 #addFunc(self.plauth, personId, containerId)
1633 ## remove people who are in the old list, but not the new list
1634 #for personId in oldIdList:
1635 #if not (personId in newIdList):
1636 #delFunc(self.plauth, personId, containerId)
1638 #def update_membership(self, oldRecord, record):
1640 #if record.type == "slice":
1641 #self.update_membership_list(oldRecord, record, 'researcher',
1642 #self.users.AddPersonToSlice,
1643 #self.users.DeletePersonFromSlice)
1644 #elif record.type == "authority":
1649 # I don't think you plan on running a component manager at this point
1650 # let me clean up the mess of ComponentAPI that is deprecated anyways
1653 #TODO FUNCTIONS SECTION 04/07/2012 SA
1655 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
1657 def UnBindObjectFromPeer(self, auth, object_type, object_id, shortname):
1658 """ This method is a hopefully temporary hack to let the sfa correctly
1659 detach the objects it creates from a remote peer object. This is
1660 needed so that the sfa federation link can work in parallel with
1661 RefreshPeer, as RefreshPeer depends on remote objects being correctly
1664 auth : struct, API authentication structure
1665 AuthMethod : string, Authentication method to use
1666 object_type : string, Object type, among 'site','person','slice',
1668 object_id : int, object_id
1669 shortname : string, peer shortname
# Stub: only logs a warning; no senslab-side action is taken.
1673 logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
1677 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
1679 def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
1680 remote_object_id=None):
1681 """This method is a hopefully temporary hack to let the sfa correctly
1682 attach the objects it creates to a remote peer object. This is needed
1683 so that the sfa federation link can work in parallel with RefreshPeer,
1684 as RefreshPeer depends on remote objects being correctly marked.
1686 shortname : string, peer shortname
1687 remote_object_id : int, remote object_id, set to 0 if unknown
# Stub: only logs a warning; no senslab-side action is taken.
1691 logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
1694 #TODO UpdateSlice 04/07/2012 SA
# Function should delete and create another job since on senslab slice=job.
1695 #Funciton should delete and create another job since oin senslab slice=job
1696 def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
1697 """Updates the parameters of an existing slice with the values in
1699 Users may only update slices of which they are members.
1700 PIs may update any of the slices at their sites, or any slices of
1701 which they are members. Admins may update any slice.
1702 Only PIs and admins may update max_nodes. Slices cannot be renewed
1703 (by updating the expires parameter) more than 8 weeks into the future.
1704 Returns 1 if successful, faults otherwise.
# Stub: only logs a warning; no senslab-side action is taken.
1708 logger.warning("SLABDRIVER UpdateSlice EMPTY - DO NOTHING \r\n ")
1711 #TODO UpdatePerson 04/07/2012 SA
1712 def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None):
1713 """Updates a person. Only the fields specified in person_fields
1714 are updated, all other fields are left untouched.
1715 Users and techs can only update themselves. PIs can only update
1716 themselves and other non-PIs at their sites.
1717 Returns 1 if successful, faults otherwise.
# Stub: the FederatedToSenslab mapping below was never enabled.
1721 #new_row = FederatedToSenslab(slab_hrn, federated_hrn)
1722 #slab_dbsession.add(new_row)
1723 #slab_dbsession.commit()
1725 logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ")
1728 #TODO GetKeys 04/07/2012 SA
1729 def GetKeys(self, auth, key_filter=None, return_fields=None):
1730 """Returns an array of structs containing details about keys.
1731 If key_filter is specified and is an array of key identifiers,
1732 or a struct of key attributes, only keys matching the filter
1733 will be returned. If return_fields is specified, only the
1734 specified details will be returned.
1736 Admin may query all keys. Non-admins may only query their own keys.
# Stub: only logs a warning; no senslab-side action is taken.
1740 logger.warning("SLABDRIVER GetKeys EMPTY - DO NOTHING \r\n ")
1743 #TODO DeleteKey 04/07/2012 SA
1744 def DeleteKey(self, auth, key_id):
1746 Non-admins may only delete their own keys.
1747 Returns 1 if successful, faults otherwise.
# Stub: only logs a warning; no senslab-side action is taken.
1751 logger.warning("SLABDRIVER DeleteKey EMPTY - DO NOTHING \r\n ")
1755 #TODO : Check rights to delete person
1756 def DeletePerson(self, auth, person_record):
1757 """ Disable an existing account in senslab LDAP.
1758 Users and techs can only delete themselves. PIs can only
1759 delete themselves and other non-PIs at their sites.
1760 Admins can delete anyone.
1761 Returns 1 if successful, faults otherwise.
1765 #Disable user account in senslab LDAP
1766 ret = self.ldap.LdapMarkUserAsDeleted(person_record)
1767 logger.warning("SLABDRIVER DeletePerson %s " %(person_record))
# NOTE(review): the `return ret` (orig. line 1768) is missing from the
# extracted listing.
1770 #TODO Check DeleteSlice, check rights 05/07/2012 SA
1771 def DeleteSlice(self, auth, slice_record):
1772 """ Deletes the specified slice.
1773 Senslab : Kill the job associated with the slice if there is one
1774 using DeleteSliceFromNodes.
1775 Updates the slice record in slab db to remove the slice nodes.
1777 Users may only delete slices of which they are members. PIs may
1778 delete any of the slices at their sites, or any slices of which
1779 they are members. Admins may delete any slice.
1780 Returns 1 if successful, faults otherwise.
# Kill the OAR job(s) of the slice; the logged message doubles as a trace.
1784 self.DeleteSliceFromNodes(slice_record)
1785 logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
1788 def __add_person_to_db(self, user_dict):
1790 check_if_exists = dbsession.query(RegUser).filter_by(email = user_dict['email']).first()
1791 #user doesn't exists
1792 if not check_if_exists:
1793 logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \
1794 _________________________________________________________________________\
1795 " %(user_dict['hrn']))
1796 user_record = RegUser(hrn =user_dict['hrn'] , pointer= '-1', authority=get_authority(hrn), \
1797 email= user_dict['email'], gid = None)
1798 user_record.reg_keys = [RegKey(user_dict['pkey'])]
1799 user_record.just_created()
1800 dbsession.add (user_record)
1804 #TODO AddPerson 04/07/2012 SA
1805 #def AddPerson(self, auth, person_fields=None):
1806 def AddPerson(self, record):#TODO fixing 28/08//2012 SA
1807 """Adds a new account. Any fields specified in records are used,
1808 otherwise defaults are used.
1809 Accounts are disabled by default. To enable an account,
1811 Returns the new person_id (> 0) if successful, faults otherwise.
# Create the account in LDAP first, then mirror it in the registry DB.
1815 ret = self.ldap.LdapAddUser(record)
1816 logger.debug("SLABDRIVER AddPerson return code %s \r\n "%(ret))
1817 self.__add_person_to_db(record)
# NOTE(review): the `return ret` (orig. line ~1818) is missing from the
# extracted listing.
1820 #TODO AddPersonToSite 04/07/2012 SA
1821 def AddPersonToSite (self, auth, person_id_or_email, \
1822 site_id_or_login_base=None):
1823 """ Adds the specified person to the specified site. If the person is
1824 already a member of the site, no errors are returned. Does not change
1825 the person's primary site.
1826 Returns 1 if successful, faults otherwise.
# Stub: only logs a warning; no senslab-side action is taken.
1830 logger.warning("SLABDRIVER AddPersonToSite EMPTY - DO NOTHING \r\n ")
1833 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
1834 def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
1835 """Grants the specified role to the person.
1836 PIs can only grant the tech and user roles to users and techs at their
1837 sites. Admins can grant any role to any user.
1838 Returns 1 if successful, faults otherwise.
# Stub: only logs a warning; no senslab-side action is taken.
1842 logger.warning("SLABDRIVER AddRoleToPerson EMPTY - DO NOTHING \r\n ")
1845 #TODO AddPersonKey 04/07/2012 SA
1846 def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
1847 """Adds a new key to the specified account.
1848 Non-admins can only modify their own keys.
1849 Returns the new key_id (> 0) if successful, faults otherwise.
# Stub: only logs a warning; no senslab-side action is taken.
1853 logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ")
# Delete every OAR job listed in leases_id_list for the given slice hrn.
1856 def DeleteLeases(self, leases_id_list, slice_hrn ):
1857 logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \
1858 \r\n " %(leases_id_list, slice_hrn))
1859 for job_id in leases_id_list:
1860 self.DeleteJobs(job_id, slice_hrn)