4 from datetime import datetime
5 from dateutil import tz
6 from time import strftime,gmtime
8 from sfa.util.faults import SliverDoesNotExist, UnknownSfaType
9 from sfa.util.sfalogging import logger
11 from sfa.storage.alchemy import dbsession
12 from sfa.storage.model import RegRecord, RegUser
14 from sfa.trust.credential import Credential
17 from sfa.managers.driver import Driver
18 from sfa.rspecs.version_manager import VersionManager
19 from sfa.rspecs.rspec import RSpec
21 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf
22 from sfa.planetlab.plxrn import slicename_to_hrn, hrn_to_pl_slicename, \
## thierry: everything that is API-related (i.e. handling incoming requests)
## is taken care of in the managers; SlabDriver should really only be about
## talking to the senslab testbed
31 from sfa.senslab.OARrestapi import OARrestapi
32 from sfa.senslab.LDAPapi import LDAPapi
34 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
35 from sfa.senslab.slabaggregate import SlabAggregate
36 from sfa.senslab.slabslices import SlabSlices
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
class SlabDriver(Driver):
    """SFA driver for the Senslab testbed.

    Talks to the OAR batch scheduler (through OARrestapi) for nodes and
    jobs, and to the senslab slice table (through SlabDB) for slices.
    """

    def __init__(self, config):
        Driver.__init__ (self, config)
        # hrn of this SFA interface (e.g. "senslab")
        self.hrn = config.SFA_INTERFACE_HRN
        self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
        # REST client to the OAR batch scheduler
        self.oar = OARrestapi()
        # date/time format used when talking to OAR and rendering dates
        self.time_format = "%Y-%m-%d %H:%M:%S"
        # senslab-specific slice database
        self.db = SlabDB(config,debug = True)
        # NOTE(review): self.ldap is used elsewhere (GetPersons, GetLeases)
        # but no initialization is visible in this excerpt -- confirm.
    def sliver_status(self,slice_urn,slice_hrn):
        """Receive a status request for slice named urn/hrn
        urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
        shall return a structure as described in
        http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
        NT : not sure if we should implement this or not, but used by sface.
        """
        #First get the slice with the slice hrn
        sl = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
        # NOTE(review): the guard around this raise (presumably "if not sl:")
        # is not in the visible lines -- confirm against version control.
        raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))

        top_level_status = 'unknown'
        nodes_in_slice = sl['node_ids']
        # NOTE(review): "is 0" is an identity test that only works for CPython
        # cached small ints -- "== 0" intended.
        if len(nodes_in_slice) is 0:
            raise SliverDoesNotExist("No slivers allocated ")
        # at least one node is attached to the slice
        top_level_status = 'ready'

        logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
 %s \r\n " %(slice_urn,slice_hrn,sl) )

        # NOTE(review): "is not -1" is an identity test on an int -- "!= -1"
        # intended.
        if sl['oar_job_id'] is not -1:
            #A job is running on Senslab for this slice
            # report about the local nodes that are in the slice only
            nodes_all = self.GetNodes({'hostname':nodes_in_slice},
                            ['node_id', 'hostname','site','boot_state'])
            # index node records by hostname for direct lookups below
            nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])

            # NOTE(review): "result" is used before any visible initialization
            # (a "result = {}" line is presumably missing from this excerpt).
            result['geni_urn'] = slice_urn
            result['pl_login'] = sl['job_user'] #For compatibility

            # job expiry = start time + walltime (seconds since epoch)
            timestamp = float(sl['startTime']) + float(sl['walltime'])
            result['pl_expires'] = strftime(self.time_format, \
                                            gmtime(float(timestamp)))
            #result['slab_expires'] = strftime(self.time_format,\
            #gmtime(float(timestamp)))

            # NOTE(review): "resources" and the per-node "res" dict are used
            # below without visible initialization in this excerpt.
            for node in nodeall_byhostname:
                #res['slab_hostname'] = node['hostname']
                #res['slab_boot_state'] = node['boot_state']

                res['pl_hostname'] = nodeall_byhostname[node]['hostname']
                res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
                res['pl_last_contact'] = strftime(self.time_format, \
                                                gmtime(float(timestamp)))
                # sliver id is derived from the slice urn + slice record id
                # + node id
                sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
                                        nodeall_byhostname[node]['node_id'])
                res['geni_urn'] = sliver_id
                if nodeall_byhostname[node]['boot_state'] == 'Alive':
                    res['geni_status'] = 'ready'
                # NOTE(review): an "else:" presumably precedes this failure
                # branch in the full source.
                res['geni_status'] = 'failed'
                top_level_status = 'failed'

                res['geni_error'] = ''

                resources.append(res)

            result['geni_status'] = top_level_status
            result['geni_resources'] = resources
            print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
            # NOTE(review): the final "return result" is not in the visible
            # lines.
    def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
        """Allocate senslab resources to the slice described by the request
        rspec: verify slice/person records, attach/detach nodes and leases,
        then return the resulting manifest rspec.
        """
        logger.debug("SLABDRIVER.PY \tcreate_sliver ")
        aggregate = SlabAggregate(self)

        slices = SlabSlices(self)
        peer = slices.get_peer(slice_hrn)
        sfa_peer = slices.get_sfa_peer(slice_hrn)

        # normalize credentials to a list
        if not isinstance(creds, list):
        # NOTE(review): the body of this guard (presumably "creds = [creds]")
        # is not in the visible lines.

        # NOTE(review): presumably guarded by "if users:" in the full source.
        slice_record = users[0].get('slice_record', {})

        rspec = RSpec(rspec_string)
        logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " %(rspec.version))

        # ensure site record exists?
        # ensure slice record exists
        sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
        requested_attributes = rspec.version.get_slice_attributes()

        # propagate a requested timeslot (reservation window) onto the slice
        if requested_attributes:
            for attrib_dict in requested_attributes:
                if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
                    sfa_slice.update({'timeslot':attrib_dict['timeslot']})
        print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... slice %s " %(sfa_slice)

        # ensure person records exists
        persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, sfa_peer, options=options)

        # ensure slice attributes exists?

        # add/remove slice from nodes
        requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
        logger.debug("SLADRIVER \tcreate_sliver requested_slivers requested_slivers %s " %(requested_slivers))

        nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer)

        # split leases between freshly requested ones and existing (kept) ones
        requested_leases = []
        # NOTE(review): "kept_leases = []" and the per-iteration
        # "requested_lease = {}" initializations are not in the visible lines.
        for lease in rspec.version.get_leases():
            if not lease.get('lease_id'):
                requested_lease['hostname'] = xrn_to_hostname(lease.get('component_id').strip())
                requested_lease['t_from'] = lease.get('t_from')
                requested_lease['t_until'] = lease.get('t_until')
            # NOTE(review): presumably inside an "else:" branch in the full
            # source -- a lease that already has an id is kept.
            kept_leases.append(int(lease['lease_id']))
            if requested_lease.get('hostname'):
                requested_leases.append(requested_lease)

        leases = slices.verify_slice_leases(sfa_slice, requested_leases, kept_leases, peer)

        # hand the manifest back to the caller
        return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
    def delete_sliver (self, slice_urn, slice_hrn, creds, options):
        """Delete the sliver: kill the OAR job attached to the slice and
        handle peer (un)binding when the slice belongs to a peer."""
        sfa_slice = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
        logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice))

        slices = SlabSlices(self)
        # determine if this is a peer slice

        peer = slices.get_peer(slice_hrn)
        # NOTE(review): the "if peer:" / try-finally scaffolding around the
        # unbind / rebind calls is not in the visible lines -- confirm.
        self.UnBindObjectFromPeer('slice', sfa_slice['record_id_slice'], peer)
        # stop the experiment: delete the OAR job(s) for this slice
        self.DeleteSliceFromNodes(sfa_slice)
        self.BindObjectToPeer('slice', sfa_slice['slice_id'], peer, sfa_slice['peer_slice_id'])
219 def AddSlice(self, slice_record):
220 slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'], record_id_slice= slice_record['record_id_slice'] , record_id_user= slice_record['record_id_user'], peer_authority = slice_record['peer_authority'])
221 print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,slab_slice)
222 slab_dbsession.add(slab_slice)
223 slab_dbsession.commit()
226 # first 2 args are None in case of resource discovery
    def list_resources (self, slice_urn, slice_hrn, creds, options):
        """Return an rspec (advertisement, or manifest when slice_urn /
        slice_hrn are given) in the version requested through options."""
        #cached_requested = options.get('cached', True)

        version_manager = VersionManager()
        # get the rspec's return format from options
        rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
        version_string = "rspec_%s" % (rspec_version)

        #panos adding the info option to the caching key (can be improved)
        if options.get('info'):
            version_string = version_string + "_"+options.get('info', 'default')

        # look in cache first
        #if cached_requested and self.cache and not slice_hrn:
            #rspec = self.cache.get(version_string)
            #logger.debug("SlabDriver.ListResources: returning cached advertisement")

        #panos: passing user-defined options
        logger.debug("SLABDRIVER \tlist_resources rspec " )
        aggregate = SlabAggregate(self)
        # tag the request with the hrn of the caller, taken from the first
        # credential
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
        options.update({'origin_hrn':origin_hrn})
        # NOTE(review): the remaining keyword arguments of this call and the
        # final return statement are not in the visible lines.
        rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,

        #if self.cache and not slice_hrn:
            #logger.debug("Slab.ListResources: stores advertisement in cache")
            #self.cache.add(version_string, rspec)
    def list_slices (self, creds, options):
        """Return the urns of all the slices known to this testbed."""
        # look in cache first
        #slices = self.cache.get('slices')
        #logger.debug("PlDriver.list_slices returns from cache")

        print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
        slices = self.GetSlices()
        # NOTE(review): slice['slice_hrn'] already looks like an hrn; feeding
        # it to slicename_to_hrn is suspicious -- confirm intended input.
        slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
        slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]

        #logger.debug ("SlabDriver.list_slices stores value in cache")
        #self.cache.add('slices', slice_urns)
        # NOTE(review): the final "return slice_urns" is not in the visible
        # lines.
283 #No site or node register supported
    def register (self, sfa_record, hrn, pub_key):
        """Register a new slice or user in the testbed.
        Site and node registration is not supported here (OAR owns nodes).
        Returns the testbed-side pointer of the created object.
        """
        type = sfa_record['type']
        slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)

        # NOTE(review): the "if type == 'slice':" branch header is not in the
        # visible lines of this excerpt.
        # keep only the fields the testbed accepts for a slice
        acceptable_fields=['url', 'instantiation', 'name', 'description']
        for key in slab_record.keys():
            if key not in acceptable_fields:
            # NOTE(review): the deletion statement (presumably
            # "slab_record.pop(key)") is not in the visible lines.
        print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
        slices = self.GetSlices(slice_filter =slab_record['hrn'], slice_filter_type = 'slice_hrn')
        # create the slice when it does not exist yet, otherwise reuse the
        # existing one
        # NOTE(review): the "if not slices:" / "else:" scaffolding around
        # these two assignments is not in the visible lines.
        pointer = self.AddSlice(slab_record)
        pointer = slices[0]['slice_id']

        # NOTE(review): presumably "elif type == 'user':" from here on.
        persons = self.GetPersons([sfa_record])
        #persons = self.GetPersons([sfa_record['hrn']])
        pointer = self.AddPerson(dict(sfa_record))
        pointer = persons[0]['person_id']

        #Does this make sense to senslab ?
        #if 'enabled' in sfa_record and sfa_record['enabled']:
            #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})

        # add this person to the site only if she is being added for the first
        # time by sfa and doesont already exist in plc
        if not persons or not persons[0]['site_ids']:
            login_base = get_leaf(sfa_record['authority'])
            self.AddPersonToSite(pointer, login_base)

        # What roles should this user have?
        #TODO : DElete this AddRoleToPerson 04/07/2012 SA
        #Function prototype is :
        #AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email)
        #what's the pointer doing here?
        self.AddRoleToPerson('user', pointer)
        # register the user's public ssh key
        self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
        # NOTE(review): the final "return pointer" is not in the visible
        # lines.
330 #No node adding outside OAR
334 #No site or node record update allowed
    def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
        """Update an existing slice or user record.
        Site/node updates are not allowed; new_key only makes sense for
        user records.
        """
        pointer = old_sfa_record['pointer']
        type = old_sfa_record['type']

        # new_key implemented for users only
        if new_key and type not in [ 'user' ]:
            raise UnknownSfaType(type)

        #if (type == "authority"):
            #self.shell.UpdateSite(pointer, new_sfa_record)

        # NOTE(review): the "if type == 'slice':" branch header is not in the
        # visible lines of this excerpt.
        slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
        if 'name' in slab_record:
            slab_record.pop('name')
            #Prototype should be UpdateSlice(self,
            #auth, slice_id_or_name, slice_fields)
            #Senslab cannot update slice since slice = job
            #so we must delete and create another job
            self.UpdateSlice(pointer, slab_record)

        # NOTE(review): presumably "elif type == 'user':" from here on, with
        # "update_fields = {}" initialized before the loop.
        all_fields = new_sfa_record
        for key in all_fields.keys():
            # copy over only the person fields the testbed knows about
            if key in ['first_name', 'last_name', 'title', 'email',
                       'password', 'phone', 'url', 'bio', 'accepted_aup',
            # NOTE(review): the tail of this field list (and its closing
            # bracket) is not in the visible lines.
                update_fields[key] = all_fields[key]
        self.UpdatePerson(pointer, update_fields)

        # must check this key against the previous one if it exists
        persons = self.GetPersons([pointer], ['key_ids'])
        # NOTE(review): "person" is used without a visible assignment
        # (presumably "person = persons[0]").
        keys = person['key_ids']
        keys = self.GetKeys(person['key_ids'])

        # Delete all stale keys
        # NOTE(review): presumably inside "for key in keys:".
        if new_key != key['key']:
            self.DeleteKey(key['key_id'])
        # finally register the fresh key
        self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
        # NOTE(review): the final "return True" is not in the visible lines.
    def remove (self, sfa_record):
        """Delete a user or slice from the testbed.
        Site / node removal is not supported."""
        type = sfa_record['type']
        hrn = sfa_record['hrn']
        record_id= sfa_record['record_id']
        # NOTE(review): the "if type == 'user':" branch header is not in the
        # visible lines of this excerpt.
        # the username is the leaf of the hrn
        username = hrn.split(".")[len(hrn.split(".")) -1]
        persons = self.GetPersons(sfa_record)
        #persons = self.GetPersons(username)
        # only delete this person if he has site ids. if he doesnt, it probably means
        # he was just removed from a site, not actually deleted
        if persons and persons[0]['site_ids']:
            #TODO : delete person in LDAP
            self.DeletePerson(username)
        elif type == 'slice':
            if self.GetSlices(slice_filter = hrn, slice_filter_type = 'slice_hrn'):
                self.DeleteSlice(hrn)

        #elif type == 'authority':
            #if self.GetSites(pointer):
                #self.DeleteSite(pointer)
        # NOTE(review): the final "return True" is not in the visible lines.
    def GetPeers (self,auth = None, peer_filter=None, return_fields_list=None):
        """Return registry records of peer authorities, optionally
        restricted to the hrn given in peer_filter."""
        existing_records = {}
        existing_hrns_by_types= {}
        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields_list)
        # every registry record whose type contains "authority"
        all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
        # index records by (hrn, type) and group hrns per type
        for record in all_records:
            existing_records[(record.hrn,record.type)] = record
            if record.type not in existing_hrns_by_types:
                existing_hrns_by_types[record.type] = [record.hrn]
                print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
            # NOTE(review): presumably an "else:" branch appends to the
            # already-existing list.
            print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN type %s hrn %s " %( record.type,record.hrn )
            existing_hrns_by_types[record.type].append(record.hrn)
            print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
            #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})

        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types %s " %( existing_hrns_by_types)
        # NOTE(review): "records_list = []" and the "if peer_filter:" /
        # "else:" scaffolding are not in the visible lines.
        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types['authority+sa'] %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
        records_list.append(existing_records[(peer_filter,'authority')])
        for hrn in existing_hrns_by_types['authority']:
            records_list.append(existing_records[(hrn,'authority')])

        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers records_list %s " %(records_list)

        return_records = records_list
        if not peer_filter and not return_fields_list:
        # NOTE(review): the body of this branch (presumably
        # "return records_list") is not in the visible lines.

        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers return_records %s " %(return_records)
        return return_records
454 #TODO : Handling OR request in make_ldap_filters_from_records instead of the for loop
455 #over the records' list
    def GetPersons(self, person_filter=None, return_fields_list=None):
        """
        person_filter should be a list of dictionnaries when not set to None.
        Returns a list of users found.
        """
        print>>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)
        # NOTE(review): "person_list = []" is not in the visible lines.
        if person_filter and isinstance(person_filter,list):
            #If we are looking for a list of users (list of dict records)
            #Usually the list contains only one user record
            for f in person_filter:
                # one LDAP lookup per requested record
                person = self.ldap.LdapFindUser(f)
                person_list.append(person)

        # NOTE(review): presumably inside an "else:" branch -- with no
        # filter, return every LDAP user.
        person_list = self.ldap.LdapFindUser()
        # NOTE(review): the final "return person_list" is not in the visible
        # lines.
477 def GetTimezone(self):
478 server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
479 return server_timestamp,server_tz
482 def DeleteJobs(self, job_id, slice_hrn):
485 username = slice_hrn.split(".")[-1].rstrip("_slice")
487 reqdict['method'] = "delete"
488 reqdict['strval'] = str(job_id)
489 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
490 print>>sys.stderr, "\r\n \r\n jobid DeleteJobs %s " %(answer)
    def GetJobsId(self, job_id, username = None ):
        """
        Details about a specific job.
        Includes details about submission time, jot type, state, events,
        owner, assigned ressources, walltime etc...
        """
        # NOTE(review): the request-name assignment (presumably
        # req = "GET_jobs_id") is not in the visible lines.
        node_list_k = 'assigned_network_address'
        #Get job info from OAR
        job_info = self.oar.parser.SendRequest(req, job_id, username)

        logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
        # NOTE(review): presumably wrapped in a try/except KeyError -- see
        # the logger.error below.
        if job_info['state'] == 'Terminated':
            logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
        # NOTE(review): continuation of this debug call and the early return
        # are not in the visible lines.
        if job_info['state'] == 'Error':
            logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
        # NOTE(review): continuation and early return are not in the visible
        # lines; the next line is presumably the except handler body.
        logger.error("SLABDRIVER \tGetJobsId KeyError")

        # resolve the OAR hostnames into testbed node hostnames
        parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
        #Replaces the previous entry "assigned_network_address" / "reserved_resources"
        job_info.update({'node_ids':parsed_job_info[node_list_k]})
        del job_info[node_list_k]
        logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
        # NOTE(review): the final "return job_info" is not in the visible
        # lines.
    def GetJobsResources(self,job_id, username = None):
        """Return the hostnames of the nodes reserved by OAR job job_id,
        as a {'node_ids': hostname_list} dict."""
        #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
        #assigned_res = ['resource_id', 'resource_uri']
        #assigned_n = ['node', 'node_uri']

        req = "GET_jobs_id_resources"
        node_list_k = 'reserved_resources'

        #Get job resources list from OAR
        node_id_list = self.oar.parser.SendRequest(req, job_id, username)
        logger.debug("SLABDRIVER \t GetJobsResources  %s " %(node_id_list))

        # NOTE(review): the assignment target of this call (presumably
        # "hostname_list = ...") is not in the visible lines.
        self.__get_hostnames_from_oar_node_ids(node_id_list)

        #parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
        #Replaces the previous entry "assigned_network_address" /
        #"reserved_resources"
        job_info = {'node_ids': hostname_list}
        # NOTE(review): the final "return job_info" is not in the visible
        # lines.
    def get_info_on_reserved_nodes(self,job_info,node_list_name):
        """Map the OAR hostnames stored under job_info[node_list_name] onto
        the hostnames of the matching testbed node records."""
        #Get the list of the testbed nodes records and make a
        #dictionnary keyed on the hostname out of it
        node_list_dict = self.GetNodes()
        #node_hostname_list = []
        node_hostname_list = [node['hostname'] for node in node_list_dict]
        #for node in node_list_dict:
            #node_hostname_list.append(node['hostname'])
        node_dict = dict(zip(node_hostname_list,node_list_dict))
        # NOTE(review): a "try:" presumably opens here -- see the KeyError
        # handler below.
        reserved_node_hostname_list = []
        for index in range(len(job_info[node_list_name])):
            #job_info[node_list_name][k] =
            # NOTE(review): assigning by index into an empty list raises
            # IndexError -- .append() was probably intended; confirm.
            reserved_node_hostname_list[index] = \
                node_dict[job_info[node_list_name][index]]['hostname']

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
 reserved_node_hostname_list %s" \
            %(reserved_node_hostname_list))
        # NOTE(review): presumably "except KeyError:".
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )

        return reserved_node_hostname_list
577 def GetNodesCurrentlyInUse(self):
578 """Returns a list of all the nodes already involved in an oar job"""
579 return self.oar.parser.SendRequest("GET_running_jobs")
    def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
        """Translate a list of OAR resource ids into a list of
        {'hostname': ..., 'site_id': ...} dicts."""
        full_nodes_dict_list = self.GetNodes()
        #Put the full node list into a dictionary keyed by oar node id
        oar_id_node_dict = {}
        for node in full_nodes_dict_list:
            oar_id_node_dict[node['oar_id']] = node

        logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\
 oar_id_node_dict %s" %(oar_id_node_dict))

        hostname_dict_list = []
        for resource_id in resource_id_list:
            # a KeyError here would mean OAR reported a resource id unknown
            # to GetNodes
            hostname_dict_list.append({'hostname' : \
                    oar_id_node_dict[resource_id]['hostname'],
                    'site_id' : oar_id_node_dict[resource_id]['site']})

        #hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
        return hostname_dict_list
    def GetReservedNodes(self):
        """Return OAR's reservation list, with each reservation's
        'resource_ids' resolved into a 'reserved_nodes' list of
        hostname/site dicts."""
        #Get the nodes in use and the reserved nodes
        reservation_dict_list = self.oar.parser.SendRequest("GET_reserved_nodes")
        for resa in reservation_dict_list:
            logger.debug ("GetReservedNodes resa %s"%(resa))
            #dict list of hostnames and their site
            resa['reserved_nodes'] = \
                self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])

        #del resa['resource_ids']
        return reservation_dict_list
    def GetNodes(self,node_filter_dict = None, return_fields_list = None):
        """
        node_filter_dict : dictionnary of lists
        Returns the OAR node records, optionally filtered by the given
        field values and restricted to the fields in return_fields_list.
        """
        node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
        node_dict_list = node_dict_by_id.values()

        #No filtering needed return the list directly
        if not (node_filter_dict or return_fields_list):
            return node_dict_list

        return_node_list = []
        # NOTE(review): presumably guarded by "if node_filter_dict:".
        for filter_key in node_filter_dict:
            #Filter the node_dict_list by each value contained in the
            #list node_filter_dict[filter_key]
            for value in node_filter_dict[filter_key]:
                for node in node_dict_list:
                    if node[filter_key] == value:
                        if return_fields_list :
                            # NOTE(review): "tmp = {}", the "try:" and the
                            # per-field copy into tmp are not in the visible
                            # lines.
                            for k in return_fields_list:
                                return_node_list.append(tmp)
                        # NOTE(review): presumably an "else:" -- append the
                        # whole node record.
                        return_node_list.append(node)
                        # NOTE(review): presumably "except KeyError:".
                        logger.log_exc("GetNodes KeyError")

        return return_node_list
    def GetSites(self, site_filter_name_list = None, return_fields_list = None):
        """Return the OAR site records, optionally restricted to the sites
        named in site_filter_name_list and to the fields listed in
        return_fields_list."""
        site_dict = self.oar.parser.SendRequest("GET_sites")
        #site_dict : dict where the key is the sit ename
        return_site_list = []
        # no filtering requested: hand back every site record
        if not ( site_filter_name_list or return_fields_list):
            return_site_list = site_dict.values()
            return return_site_list

        for site_filter_name in site_filter_name_list:
            if site_filter_name in site_dict:
                if return_fields_list:
                    # copy over only the requested fields
                    for field in return_fields_list:
                        # NOTE(review): "tmp = {}" and the "try:" are not in
                        # the visible lines.
                        tmp[field] = site_dict[site_filter_name][field]
                        # NOTE(review): presumably "except KeyError:".
                        logger.error("GetSites KeyError %s "%(field))
                    return_site_list.append(tmp)
                # NOTE(review): presumably an "else:" -- append the whole
                # site record.
                return_site_list.append( site_dict[site_filter_name])

        return return_site_list
    def GetSlices(self, slice_filter = None, slice_filter_type = None, \
                  return_fields_list = None):
        """Look slices up in the senslab slice table.

        slice_filter / slice_filter_type select one slice, either by hrn
        ('slice_hrn') or by owner record id ('record_id_user'); with no
        filter every row of the slice table is returned.
        """
        return_slice_list = []
        authorized_filter_types_list = ['slice_hrn', 'record_id_user']
        logger.debug("SLABDRIVER \tGetSlices authorized_filter_types_list %s"\
            %(authorized_filter_types_list))
        if slice_filter_type in authorized_filter_types_list:
            if slice_filter_type == 'slice_hrn':
                slicerec = slab_dbsession.query(SliceSenslab).\
                    filter_by(slice_hrn = slice_filter).first()

            if slice_filter_type == 'record_id_user':
                slicerec = slab_dbsession.query(SliceSenslab).\
                    filter_by(record_id_user = slice_filter).first()

            # NOTE(review): presumably guarded by "if slicerec:".
            slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict() #warning pylint OK
            # NOTE(review): the continuation of this debug call is not in
            # the visible lines.
            logger.debug("SLABDRIVER \tGetSlices slicerec_dict %s" \
            # login is the middle part of the hrn, without the _slice suffix
            login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0]
            logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
            %(login, slicerec_dict))
            # NOTE(review): "is not -1" is an identity test -- "!= -1"
            # intended.
            if slicerec_dict['oar_job_id'] is not -1:
                #Check with OAR the status of the job if a job id is in
                # NOTE(review): this call's continuation and the "if rslt:"
                # guard are not in the visible lines.
                rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \
                slicerec_dict.update(rslt)
                slicerec_dict.update({'hrn':\
                    str(slicerec_dict['slice_hrn'])})
                #If GetJobsResources is empty, this means the job is
                #now in the 'Terminated' state
                #Update the slice record
                # NOTE(review): presumably inside an "else:" branch -- mark
                # the job as finished in the slice table.
                self.db.update_job(slice_filter, job_id = -1)
                slicerec_dict['oar_job_id'] = -1
                slicerec_dict.update({'hrn':str(slicerec_dict['slice_hrn'])})

            # expose the stored node list under the generic 'node_ids' key
            slicerec_dict['node_ids'] = slicerec_dict['node_list']

            # NOTE(review): continuation and "return slicerec_dict" are not
            # in the visible lines.
            logger.debug("SLABDRIVER.PY  GetSlices  slicerec_dict  %s"\

        # NOTE(review): presumably in an "else:" branch -- no filter given,
        # return every row of the slice table.
        return_slice_list = slab_dbsession.query(SliceSenslab).all()

        print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices  slices %s slice_filter %s " %(return_slice_list,slice_filter)

        #if return_fields_list:
            #return_slice_list  = parse_filter(sliceslist, slice_filter,'slice', return_fields_list)

        return return_slice_list
747 def testbed_name (self): return self.hrn
749 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
    def aggregate_version (self):
        """Advertise the rspec versions this aggregate accepts (request)
        and produces (ad), as required by the GENI AM API."""
        version_manager = VersionManager()
        ad_rspec_versions = []
        request_rspec_versions = []
        # sort every known rspec version into the ad and/or request lists
        for rspec_version in version_manager.versions:
            if rspec_version.content_type in ['*', 'ad']:
                ad_rspec_versions.append(rspec_version.to_dict())
            if rspec_version.content_type in ['*', 'request']:
                request_rspec_versions.append(rspec_version.to_dict())
        # NOTE(review): the "return {" opening of this dict literal is not
        # in the visible lines.
            'testbed':self.testbed_name(),
            'geni_request_rspec_versions': request_rspec_versions,
            'geni_ad_rspec_versions': ad_rspec_versions,
771 # Convert SFA fields to PLC fields for use when registering up updating
772 # registry record in the PLC database
774 # @param type type of record (user, slice, ...)
775 # @param hrn human readable name
776 # @param sfa_fields dictionary of SFA fields
777 # @param slab_fields dictionary of PLC fields (output)
    def sfa_fields_to_slab_fields(self, type, hrn, record):
        """Convert the fields of an SFA record of the given type into the
        testbed ("slab") record fields used by register/update."""

        def convert_ints(tmpdict, int_fields):
            # coerce the listed fields to int when present
            for field in int_fields:
                # NOTE(review): presumably guarded by
                # "if field in tmpdict:".
                tmpdict[field] = int(tmpdict[field])

        # NOTE(review): "slab_record = {}" and the "if type == 'slice':"
        # header are not in the visible lines.
        #for field in record:
        #    slab_record[field] = record[field]

        #instantion used in get_slivers ?
        if not "instantiation" in slab_record:
            slab_record["instantiation"] = "senslab-instantiated"
        # NOTE(review): despite its name, hrn_to_pl_slicename returns a
        # slicename; it is stored under "hrn" here -- confirm intended.
        slab_record["hrn"] = hrn_to_pl_slicename(hrn)
        print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
        # NOTE(review): presumably guarded by 'if "url" in record:'.
        slab_record["url"] = record["url"]
        if "description" in record:
            slab_record["description"] = record["description"]
        if "expires" in record:
            slab_record["expires"] = int(record["expires"])

        #nodes added by OAR only and then imported to SFA
        #elif type == "node":
            #if not "hostname" in slab_record:
                #if not "hostname" in record:
                    #raise MissingSfaInfo("hostname")
                #slab_record["hostname"] = record["hostname"]
            #if not "model" in slab_record:
                #slab_record["model"] = "geni"

        #elif type == "authority":
            #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
            #if not "name" in slab_record:
                #slab_record["name"] = hrn
            #if not "abbreviated_name" in slab_record:
                #slab_record["abbreviated_name"] = hrn
            #if not "enabled" in slab_record:
                #slab_record["enabled"] = True
            #if not "is_public" in slab_record:
                #slab_record["is_public"] = True
        # NOTE(review): the final "return slab_record" is not in the visible
        # lines.
    def __process_walltime(self,duration=None):
        """ Calculates the walltime in seconds from the duration in H:M:S
        specified in the RSpec.
        """
        # NOTE(review): the "if duration:" guard is not in the visible lines;
        # the tail of this method supplies a 10 min default instead.
        walltime = duration.split(":")
        # Fixing the walltime by adding a few delays. First put the walltime
        # in seconds oarAdditionalDelay = 20; additional delay for
        # /bin/sleep command to
        # take in account prologue and epilogue scripts execution
        # int walltimeAdditionalDelay = 120; additional delay
        # NOTE(review): the seconds term of this sum (presumably
        # int(walltime[2])) is not in the visible lines.
        desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
        total_walltime = desired_walltime + 140 #+2 min 20
        sleep_walltime = desired_walltime + 20 #+20 sec
        # NOTE(review): the last continuation of this debug call is not in
        # the visible lines.
        logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
 total_walltime %s sleep_walltime %s "\
            %(desired_walltime, total_walltime, \
        #Put the walltime back in str form
        # hours first
        walltime[0] = str(total_walltime / 3600)
        total_walltime = total_walltime - 3600 * int(walltime[0])
        #Get the remaining minutes
        walltime[1] = str(total_walltime / 60)
        total_walltime = total_walltime - 60 * int(walltime[1])
        # what remains is the seconds
        walltime[2] = str(total_walltime)
        # NOTE(review): continuation missing; an "else:" branch presumably
        # follows for the no-duration case.
        logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
        #automatically set 10min +2 min 20
        # NOTE(review): the default "walltime = [...]" assignment is not in
        # the visible lines.
        sleep_walltime = '620'

        return walltime, sleep_walltime
872 def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
873 """ Transforms unix timestamp into valid OAR date format """
875 #Used in case of a scheduled experiment (not immediate)
876 #To run an XP immediately, don't specify date and time in RSpec
877 #They will be set to None.
879 #transform the xp_utc_timestamp into server readable time
880 xp_server_readable_date = datetime.fromtimestamp(int(\
881 xp_utc_timestamp)).strftime(self.time_format)
883 return xp_server_readable_date
    def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
        """ Creates the structure needed for a correct POST on OAR.
        Makes the timestamp transformation into the appropriate format.
        Sends the POST request to create the job with the resources in
        added_nodes.
        """
        # NOTE(review): "reqdict = {}", "nodeid_list = []", "site_list = []"
        # and several try/except blocks are not in the visible lines.
        slice_name = slice_dict['name']
        # NOTE(review): presumably inside a "try:" -- the default slot below
        # is the KeyError fallback.
        slot = slice_dict['timeslot']
        # NOTE(review): continuation of this debug call is not in the
        # visible lines.
        logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
        #Running on default parameters
        #XP immediate , 10 mins
        slot = { 'date':None, 'start_time':None,
                 'timezone':None, 'duration':None }#10 min

        reqdict['workdir']= '/tmp'
        # OAR resource expression: {network_address in ('n1','n2',...)}
        reqdict['resource'] ="{network_address in ("

        for node in added_nodes:
            # NOTE(review): continuation of this debug call is not in the
            # visible lines.
            logger.debug("OARrestapi \tLaunchExperimentOnOAR \
            #Get the ID of the node : remove the root auth and put
            # the site in a separate list.
            # NT: it's not clear for me if the nodenames will have the senslab
            #prefix so lets take the last part only, for now.

            # Again here it's not clear if nodes will be prefixed with <site>_,
            #lets split and tanke the last part for now.
            #s=lastpart.split("_")
            # NOTE(review): the "nodeid = ..." derivation is not in the
            # visible lines.
            reqdict['resource'] += "'"+ nodeid +"', "
            nodeid_list.append(nodeid)

        # drop the trailing ", " and close the resource expression
        custom_length = len(reqdict['resource'])- 2
        reqdict['resource'] = reqdict['resource'][0:custom_length] + \
            ")}/nodes=" + str(len(nodeid_list))

        #if slot['duration']:
        # NOTE(review): the continuation of this call (presumably
        # slot['duration']) is not in the visible lines.
        walltime, sleep_walltime = self.__process_walltime(duration = \
        #walltime, sleep_walltime = self.__process_walltime(duration = None)

        reqdict['resource']+= ",walltime=" + str(walltime[0]) + \
            ":" + str(walltime[1]) + ":" + str(walltime[2])
        # the job just sleeps for the reservation's duration
        reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)

        #In case of a scheduled experiment (not immediate)
        #To run an XP immediately, don't specify date and time in RSpec
        #They will be set to None.
        server_timestamp,server_tz = self.GetTimezone()
        if slot['date'] and slot['start_time']:
            # NOTE(review): "is ''" is an identity test -- "== ''" intended
            # for the empty-string case.
            if slot['timezone'] is '' or slot['timezone'] is None:
                #assume it is server timezone
                from_zone=tz.gettz(server_tz)
                logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
 not specified  server_tz %s from_zone  %s" \
                %(server_tz,from_zone))
            # NOTE(review): presumably an "else:" branch.
            #Get zone of the user from the reservation time given
            from_zone = tz.gettz(slot['timezone'])

            date = str(slot['date']) + " " + str(slot['start_time'])
            user_datetime = datetime.strptime(date, self.time_format)
            user_datetime = user_datetime.replace(tzinfo = from_zone)

            #Convert to server zone
            to_zone = tz.gettz(server_tz)
            reservation_date = user_datetime.astimezone(to_zone)
            #Readable time accpeted by OAR
            reqdict['reservation']= reservation_date.strftime(self.time_format)

            logger.debug("SLABDRIVER \tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation']))

        # NOTE(review): presumably an "else:" branch.
        # Immediate XP. Not need to add special parameters.
        # normally not used in SFA

        reqdict['type'] = "deploy"
        reqdict['directory']= ""
        reqdict['name']= "TestSandrine"

        # first step : start the OAR job and update the job
        logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s \r\n site_list %s"  %(reqdict,site_list)  )

        answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid   %s " %(answer))
        # NOTE(review): "jobid = answer['id']" and its try/except are not in
        # the visible lines; the log_exc below is presumably the handler.
        logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR Impossible to create job  %s "  %(answer))

        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user))
        # remember the running job in the senslab slice table
        self.db.update_job( slice_name, jobid ,added_nodes)

        # second step : configure the experiment
        # we need to store the nodes in a yaml (well...) file like this :
        # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
        f=open('/tmp/sfa/'+str(jobid)+'.json','w')
        # NOTE(review): the '[' opening-bracket write is not in the visible
        # lines; also str.strip('node') strips any of the chars n,o,d,e,
        # not the 'node' prefix -- confirm intended.
        f.write(str(added_nodes[0].strip('node')))
        for node in added_nodes[1:len(added_nodes)] :
            f.write(','+node.strip('node'))
        # NOTE(review): the closing ']' write and f.close() are not in the
        # visible lines.

        # third step : call the senslab-experiment wrapper
        #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
        javacmdline="/usr/bin/java"
        jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
        #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
        output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]

        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns %s " %(output)
        # NOTE(review): the final return is not in the visible lines.
1024 #Delete the jobs and updates the job id in the senslab table
1026 #Does not clear the node list
1027 def DeleteSliceFromNodes(self, slice_record):
1028 # Get user information
1030 self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
1031 self.db.update_job(slice_record['hrn'], job_id = -1)
1037 def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
1038 unfiltered_reservation_list = self.GetReservedNodes()
1039 reservation_list = []
1040 #Find the slice associated with this user senslab ldap uid
1041 logger.debug(" SLABDRIVER.PY \tGetLeases ")
1042 for resa in unfiltered_reservation_list:
1043 ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
1044 ldap_info = ldap_info[0][1]
1046 user = dbsession.query(RegUser).filter_by(email = ldap_info['mail'][0]).first()
1048 slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id).first()
1050 resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
1051 resa['component_id_list'] = []
1052 #Transform the hostnames into urns (component ids)
1053 for node in resa['reserved_nodes']:
1054 resa['component_id_list'].append(hostname_to_urn(self.hrn, \
1055 self.root_auth, node['hostname']))
1058 #Filter the reservation list if necessary
1059 #Returns all the leases associated with a given slice
1060 if lease_filter_dict:
1061 logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"%(lease_filter_dict))
1062 for resa in unfiltered_reservation_list:
1063 if lease_filter_dict['name'] == resa['slice_id']:
1064 reservation_list.append(resa)
1066 reservation_list = unfiltered_reservation_list
1068 logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"%(reservation_list))
1069 return reservation_list
1071 def augment_records_with_testbed_info (self, sfa_records):
1072 return self.fill_record_info (sfa_records)
    def fill_record_info(self, record_list):
        """
        Given a SFA record (or list of records), fill in the senslab
        specific and SFA specific fields of each record, in place.

        For a 'slice' record: fetch the senslab slice and its owner from
        the registry and add PI/researcher/oar_job_id (plus empty
        geni_urn/keys/key_ids for client_helper.py compatibility).
        For a 'user' record: fetch the user's slice from senslab's DB,
        merge the person info into the record and append the slice
        record to record_list (which causes one more loop iteration).

        :param record_list: one record dict or a list of record dicts.

        NOTE(review): this excerpt is truncated -- the original wraps
        the loop below in a try block whose except clause binds 'e'
        (see the log_exc call near the end); those lines are not
        visible here.
        """
        logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
        # Accept a single record as well as a list.
        if not isinstance(record_list, list):
            record_list = [record_list]

        for record in record_list:
            #If the record is a SFA slice record, then add information
            #about the user of this slice. This kind of information is in the
            #senslab slice table.
            if str(record['type']) == 'slice':
                #Get slab slice record.
                recslice = self.GetSlices(slice_filter = \
                                            str(record['hrn']),\
                                            slice_filter_type = 'slice_hrn')
                recuser = dbsession.query(RegRecord).filter_by(record_id = \
                                            recslice['record_id_user']).first()
                logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
                                            rec %s \r\n \r\n" %(recslice))
                record.update({'PI':[recuser.hrn],
                            'researcher': [recuser.hrn],
                            'name':record['hrn'],
                            'oar_job_id':recslice['oar_job_id'],
                            'person_ids':[recslice['record_id_user']],
                            'geni_urn':'', #For client_helper.py compatibility
                            'keys':'', #For client_helper.py compatibility
                            'key_ids':''}) #For client_helper.py compatibility

            elif str(record['type']) == 'user':
                #The record is a SFA user record.
                #Get the information about his slice from Senslab's DB
                #and add it to the user record.
                recslice = self.GetSlices(slice_filter = \
                                            record['record_id'],\
                                            slice_filter_type = 'record_id_user')
                logger.debug( "SLABDRIVER.PY \t fill_record_info user \
                                            rec %s \r\n \r\n" %(recslice))
                #Append slice record in records list,
                #therefore fetches user and slice info again(one more loop)
                #Will update PIs and researcher for the slice
                recuser = dbsession.query(RegRecord).filter_by(record_id = \
                                            recslice['record_id_user']).first()
                recslice.update({'PI':[recuser.hrn],
                            'researcher': [recuser.hrn],
                            'name':record['hrn'],
                            'oar_job_id':recslice['oar_job_id'],
                            'person_ids':[recslice['record_id_user']]})

                #GetPersons takes [] as filters
                #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
                user_slab = self.GetPersons([record])

                recslice.update({'type':'slice','hrn':recslice['slice_hrn']})
                record.update(user_slab[0])
                #For client_helper.py compatibility
                record.update( { 'geni_urn':'',
                # NOTE(review): the remainder of this dict literal
                # ('keys', 'key_ids' entries) is missing from this
                # excerpt.
                record_list.append(recslice)

                logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
                                INFO TO USER records %s" %(record_list))

        # NOTE(review): the except clause binding 'e' is not visible in
        # this excerpt.
        logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s" %(e))

        #self.fill_record_slab_info(records)
        ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)
        #self.fill_record_sfa_info(records)
        #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1160 #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1161 ## get a list of the HRNs that are members of the old and new records
1163 #oldList = oldRecord.get(listName, [])
1166 #newList = record.get(listName, [])
1168 ## if the lists are the same, then we don't have to update anything
1169 #if (oldList == newList):
1172 ## build a list of the new person ids, by looking up each person to get
1176 #records = table.find({'type': 'user', 'hrn': newList})
1177 #for rec in records:
1178 #newIdList.append(rec['pointer'])
1180 ## build a list of the old person ids from the person_ids field
1182 #oldIdList = oldRecord.get("person_ids", [])
1183 #containerId = oldRecord.get_pointer()
1185 ## if oldRecord==None, then we are doing a Register, instead of an
1188 #containerId = record.get_pointer()
1190 ## add people who are in the new list, but not the oldList
1191 #for personId in newIdList:
1192 #if not (personId in oldIdList):
1193 #addFunc(self.plauth, personId, containerId)
1195 ## remove people who are in the old list, but not the new list
1196 #for personId in oldIdList:
1197 #if not (personId in newIdList):
1198 #delFunc(self.plauth, personId, containerId)
1200 #def update_membership(self, oldRecord, record):
1201 #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1202 #if record.type == "slice":
1203 #self.update_membership_list(oldRecord, record, 'researcher',
1204 #self.users.AddPersonToSlice,
1205 #self.users.DeletePersonFromSlice)
1206 #elif record.type == "authority":
1211 # I don't think you plan on running a component manager at this point
1212 # let me clean up the mess of ComponentAPI that is deprecated anyways
1215 #TODO FUNCTIONS SECTION 04/07/2012 SA
1217 #TODO : Is UnBindObjectFromPeer still necessary ? Currently does nothing
    def UnBindObjectFromPeer(self, auth, object_type, object_id, shortname):
        """ This method is a hopefully temporary hack to let the sfa correctly
        detach the objects it creates from a remote peer object. This is
        needed so that the sfa federation link can work in parallel with
        RefreshPeer, as RefreshPeer depends on remote objects being correctly
        marked.

        auth : struct, API authentication structure
        AuthMethod : string, Authentication method to use
        object_type : string, Object type, among 'site','person','slice',
            'key'
        object_id : int, object_id
        shortname : string, peer shortname

        Currently a no-op on senslab: only logs a warning.
        """
        # NOTE(review): the warning string continues on a line that is
        # missing from this excerpt.
        logger.warning("SLABDRIVER \tUnBindObjectFromPeer EMPTY-\
1239 #TODO Is BindObjectToPeer still necessary ? Currently does nothing
    def BindObjectToPeer(self, auth, object_type, object_id, shortname=None, \
        remote_object_id=None):
        """This method is a hopefully temporary hack to let the sfa correctly
        attach the objects it creates to a remote peer object. This is needed
        so that the sfa federation link can work in parallel with RefreshPeer,
        as RefreshPeer depends on remote objects being correctly marked.

        shortname : string, peer shortname
        remote_object_id : int, remote object_id, set to 0 if unknown

        Currently a no-op on senslab: only logs a warning.
        """
        logger.warning("SLABDRIVER \tBindObjectToPeer EMPTY - DO NOTHING \r\n ")
1256 #TODO UpdateSlice 04/07/2012 SA
1257 #Function should delete and create another job since in senslab slice=job
    def UpdateSlice(self, auth, slice_id_or_name, slice_fields=None):
        """Updates the parameters of an existing slice with the values in
        slice_fields.
        Users may only update slices of which they are members.
        PIs may update any of the slices at their sites, or any slices of
        which they are members. Admins may update any slice.
        Only PIs and admins may update max_nodes. Slices cannot be renewed
        (by updating the expires parameter) more than 8 weeks into the future.
        Returns 1 if successful, faults otherwise.

        TODO: not implemented yet -- on senslab a slice maps to an OAR
        job, so an update should delete the current job and submit a
        new one.
        """
1272 #TODO UpdatePerson 04/07/2012 SA
    def UpdatePerson(self, auth, person_id_or_email, person_fields=None):
        """Updates a person. Only the fields specified in person_fields
        are updated, all other fields are left untouched.
        Users and techs can only update themselves. PIs can only update
        themselves and other non-PIs at their sites.
        Returns 1 if successful, faults otherwise.

        TODO: not implemented on senslab yet.
        """
1284 #TODO GetKeys 04/07/2012 SA
    def GetKeys(self, auth, key_filter=None, return_fields=None):
        """Returns an array of structs containing details about keys.
        If key_filter is specified and is an array of key identifiers,
        or a struct of key attributes, only keys matching the filter
        will be returned. If return_fields is specified, only the
        specified details will be returned.

        Admin may query all keys. Non-admins may only query their own keys.

        TODO: not implemented on senslab yet.
        """
1299 #TODO DeleteKey 04/07/2012 SA
    def DeleteKey(self, auth, key_id):
        """Deletes the specified key.
        Non-admins may only delete their own keys.
        Returns 1 if successful, faults otherwise.

        TODO: not implemented on senslab yet.
        """
1310 #TODO DeletePerson 04/07/2012 SA
    def DeletePerson(self, auth, person_id_or_email):
        """ Mark an existing account as deleted.
        Users and techs can only delete themselves. PIs can only
        delete themselves and other non-PIs at their sites.
        Admins can delete anyone.
        Returns 1 if successful, faults otherwise.

        TODO: not implemented on senslab yet.
        """
1322 #TODO DeleteSlice 04/07/2012 SA
    def DeleteSlice(self, auth, slice_id_or_name):
        """ Deletes the specified slice.
        Users may only delete slices of which they are members. PIs may
        delete any of the slices at their sites, or any slices of which
        they are members. Admins may delete any slice.
        Returns 1 if successful, faults otherwise.

        TODO: not implemented on senslab yet.
        """
1335 #TODO AddPerson 04/07/2012 SA
    def AddPerson(self, auth, person_fields=None):
        """Adds a new account. Any fields specified in person_fields are used,
        otherwise defaults are used.
        Accounts are disabled by default. To enable an account,
        use UpdatePerson.
        Returns the new person_id (> 0) if successful, faults otherwise.

        TODO: not implemented on senslab yet.
        """
1347 #TODO AddPersonToSite 04/07/2012 SA
    def AddPersonToSite (self, auth, person_id_or_email, \
        site_id_or_login_base=None):
        """ Adds the specified person to the specified site. If the person is
        already a member of the site, no errors are returned. Does not change
        the person's primary site.
        Returns 1 if successful, faults otherwise.

        TODO: not implemented on senslab yet.
        """
1359 #TODO AddRoleToPerson : Not sure if needed in senslab 04/07/2012 SA
    def AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email):
        """Grants the specified role to the person.
        PIs can only grant the tech and user roles to users and techs at their
        sites. Admins can grant any role to any user.
        Returns 1 if successful, faults otherwise.

        TODO: probably not needed on senslab; not implemented yet.
        """
1371 #TODO AddPersonKey 04/07/2012 SA
1372 def AddPersonKey(self, auth, person_id_or_email, key_fields=None):
1373 """Adds a new key to the specified account.
1374 Non-admins can only modify their own keys.
1375 Returns the new key_id (> 0) if successful, faults otherwise.