4 from datetime import datetime
5 from dateutil import tz
6 from time import strftime,gmtime
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
16 from sfa.trust.credential import Credential
17 from sfa.trust.gid import GID
19 from sfa.managers.driver import Driver
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
23 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
24 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
26 ## thierry: everything that is API-related (i.e. handling incoming requests)
28 # SlabDriver should be really only about talking to the senslab testbed
31 from sfa.senslab.OARrestapi import OARrestapi
32 from sfa.senslab.LDAPapi import LDAPapi
34 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
35 from sfa.senslab.slabaggregate import SlabAggregate
36 from sfa.senslab.slabslices import SlabSlices
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
# SlabDriver: SFA driver for the senslab testbed, derived from the generic Driver base class.
44 class SlabDriver(Driver):
# Constructor: initialises the Driver base, then caches configuration values and backend handles.
46 def __init__(self, config):
47 Driver.__init__ (self, config)
# HRN of this SFA interface, taken from the configuration object.
49 self.hrn = config.SFA_INTERFACE_HRN
# Root authority of the registry, taken from the configuration object.
51 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
# Client used for every OAR request in this class (self.oar.parser.SendRequest / POSTRequestToOARRestAPI).
53 self.oar = OARrestapi()
# NOTE(review): GetPersons reads self.ldap, but no assignment is visible in this listing
# (source line 54 is absent) - confirm it is initialised here.
# strftime/strptime pattern shared by sliver_status and LaunchExperimentOnOAR.
55 self.time_format = "%Y-%m-%d %H:%M:%S"
# Handle on the senslab database; update_job() is called on it elsewhere in this class.
56 self.db = SlabDB(config,debug = True)
# Build a SliverStatus-style structure for one slice.
# The slice is looked up by HRN through GetSlices; one entry per node of the
# slice record's 'node_ids' is then assembled into result['geni_resources'].
# Raises SliverDoesNotExist when the slice has no nodes (and, presumably, when
# the slice itself is not found - the guard for that raise is not visible here).
# NOTE(review): the initialisation of `result`/`resources`/`res`, the loop header
# over nodes and the return statement are absent from this listing.
60 def sliver_status(self,slice_urn,slice_hrn):
61 """Receive a status request for slice named urn/hrn
62 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
63 shall return a structure as described in
64 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
65 NT : not sure if we should implement this or not, but used by sface.
69 #First get the slice with the slice hrn
70 sl = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
# NOTE(review): the condition guarding this raise is not visible (source line 71 absent).
72 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
74 nodes_in_slice = sl['node_ids']
# NOTE(review): `is 0` is an identity test on an int; `== 0` (or `not nodes_in_slice`) is the reliable spelling.
75 if len(nodes_in_slice) is 0:
76 raise SliverDoesNotExist("No slivers allocated ")
78 logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
79 %s \r\n " %(slice_urn,slice_hrn,sl) )
# -1 is the value GetSlices/DeleteSliceFromNodes store when no OAR job is attached.
# NOTE(review): `is not -1` is an identity test on an int; `!= -1` is the reliable spelling.
81 if sl['oar_job_id'] is not -1:
82 #A job is running on Senslab for this slice
83 # report about the local nodes that are in the slice only
# Fetch only the nodes whose hostname is in the slice, restricted to four fields.
85 nodes_all = self.GetNodes({'hostname':nodes_in_slice},
86 ['node_id', 'hostname','site','boot_state'])
# Index the node records by hostname for the per-node lookups below.
87 nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
91 top_level_status = 'unknown'
93 top_level_status = 'ready'
94 result['geni_urn'] = slice_urn
95 result['pl_login'] = sl['job_user'] #For compatibility
# Expiry = job start time + walltime, rendered with self.time_format via gmtime.
98 timestamp = float(sl['startTime']) + float(sl['walltime'])
99 result['pl_expires'] = strftime(self.time_format, \
100 gmtime(float(timestamp)))
101 #result['slab_expires'] = strftime(self.time_format,\
102 #gmtime(float(timestamp)))
107 #res['slab_hostname'] = node['hostname']
108 #res['slab_boot_state'] = node['boot_state']
110 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
111 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
# NOTE(review): 'pl_last_contact' is filled with the expiry timestamp computed above, not a contact time.
112 res['pl_last_contact'] = strftime(self.time_format, \
113 gmtime(float(timestamp)))
114 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
115 nodeall_byhostname[node]['node_id'])
116 res['geni_urn'] = sliver_id
# 'Alive' nodes are reported ready; presumably the missing else branch handles the
# two 'failed' assignments below, which also downgrade the top-level status.
117 if nodeall_byhostname[node]['boot_state'] == 'Alive':
119 res['geni_status'] = 'ready'
121 res['geni_status'] = 'failed'
122 top_level_status = 'failed'
124 res['geni_error'] = ''
126 resources.append(res)
128 result['geni_status'] = top_level_status
129 result['geni_resources'] = resources
# NOTE(review): no `import sys` is visible in this listing - confirm it exists.
130 print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
# Create (or update) the sliver described by an RSpec for the given slice.
# Uses SlabSlices to verify the slice record, its persons and its nodes, and
# returns the RSpec produced by SlabAggregate.get_rspec for slice_urn.
134 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
135 print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
136 aggregate = SlabAggregate(self)
138 slices = SlabSlices(self)
139 peer = slices.get_peer(slice_hrn)
140 sfa_peer = slices.get_sfa_peer(slice_hrn)
# NOTE(review): the body of this `if` is not visible; presumably it wraps a single credential in a list.
143 if not isinstance(creds, list):
# NOTE(review): users[0] raises IndexError on an empty users list unless a guard sits on a missing line.
147 slice_record = users[0].get('slice_record', {})
150 rspec = RSpec(rspec_string)
151 print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ============================rspec.version %s " %(rspec.version)
154 # ensure site record exists?
155 # ensure slice record exists
156 slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
157 requested_attributes = rspec.version.get_slice_attributes()
# Copy a non-None 'timeslot' attribute from the RSpec onto the slice record;
# LaunchExperimentOnOAR reads slice_dict['timeslot'].
159 if requested_attributes:
160 for attrib_dict in requested_attributes:
161 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
162 slice.update({'timeslot':attrib_dict['timeslot']})
163 print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... slice %s " %(slice)
164 # ensure person records exists
165 persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
166 # ensure slice attributes exists?
169 # add/remove slice from nodes
170 print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... "
# Component names of every node for which the RSpec requests a sliver.
172 requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
173 print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
174 nodes = slices.verify_slice_nodes(slice, requested_slivers, peer)
# The result is rendered with the same RSpec version the request used.
177 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
# Delete the sliver of a slice: looks the slice up by HRN, then calls
# DeleteSliceFromNodes, surrounded by peer unbind/bind calls.
# NOTE(review): the branching around the peer calls (if/try/finally) and the
# return statement are not visible in this listing.
180 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
182 slice = self.GetSlices(slice_filter= slice_hrn, slice_filter_type = 'slice_hrn')
183 print>>sys.stderr, "\r\n \r\n \t\t SLABDRIVER.PY delete_sliver slice %s" %(slice)
187 slices = SlabSlices(self)
188 # determine if this is a peer slice
189 # xxx I wonder if this would not need to use PlSlices.get_peer instead
190 # in which case plc.peers could be deprecated as this here
191 # is the only/last call to this last method in plc.peers
192 peer = slices.get_peer(slice_hrn)
# NOTE(review): UnBindObjectFromPeer / BindObjectToPeer are not defined in the visible part of this class.
195 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
196 self.DeleteSliceFromNodes(slice)
# NOTE(review): this call reads 'slice_id' and 'peer_slice_id' while the unbind above
# reads 'record_id_slice'; confirm the slice record really carries these keys.
199 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
def AddSlice(self, slice_record):
    """Insert a new slice row into the senslab database.

    slice_record must provide the keys 'slice_hrn', 'record_id_slice',
    'record_id_user' and 'peer_authority'. They are copied onto a new
    SliceSenslab object, which is added to slab_dbsession and committed.

    Returns None.
    Raises KeyError if one of the four keys is missing from slice_record.
    """
    slab_slice = SliceSenslab(
        slice_hrn=slice_record['slice_hrn'],
        record_id_slice=slice_record['record_id_slice'],
        record_id_user=slice_record['record_id_user'],
        peer_authority=slice_record['peer_authority'])
    # Trace through the module logger (already used across this file)
    # instead of the Python-2-only "print >>sys.stderr" statement.
    logger.debug("SLABDRIVER.PY AddSlice slice_record %s slab_slice %s"
                 % (slice_record, slab_slice))
    slab_dbsession.add(slab_slice)
    slab_dbsession.commit()
210 # first 2 args are None in case of resource discovery
# Produce the RSpec for a slice (or the advertisement when no slice is given)
# through SlabAggregate.get_rspec. All cache handling is commented out.
# NOTE(review): the return statement is not visible in this listing.
211 def list_resources (self, slice_urn, slice_hrn, creds, options):
212 #cached_requested = options.get('cached', True)
214 version_manager = VersionManager()
215 # get the rspec's return format from options
216 rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
# version_string is only a cache key; with the cache code commented out it is not used further in the visible lines.
217 version_string = "rspec_%s" % (rspec_version)
219 #panos adding the info option to the caching key (can be improved)
220 if options.get('info'):
221 version_string = version_string + "_"+options.get('info', 'default')
223 # look in cache first
224 #if cached_requested and self.cache and not slice_hrn:
225 #rspec = self.cache.get(version_string)
227 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
230 #panos: passing user-defined options
232 aggregate = SlabAggregate(self)
# The caller's HRN is read from the first credential and passed on through options.
# NOTE(review): creds[0] assumes a non-empty credential list.
233 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
234 options.update({'origin_hrn':origin_hrn})
# NOTE(review): this call continues on a line absent from this listing (source line 236).
235 rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
237 print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec "
239 #if self.cache and not slice_hrn:
240 #logger.debug("Slab.ListResources: stores advertisement in cache")
241 #self.cache.add(version_string, rspec)
# Build the list of slice URNs known to this testbed from GetSlices().
# Cache handling is commented out.
# NOTE(review): the return statement is not visible in this listing.
246 def list_slices (self, creds, options):
247 # look in cache first
249 #slices = self.cache.get('slices')
251 #logger.debug("PlDriver.list_slices returns from cache")
255 print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
# Without a filter GetSlices returns the raw result of query(SliceSenslab).all().
256 slices = self.GetSlices()
# NOTE(review): slice['slice_hrn'] subscripts those query results; confirm SliceSenslab objects support item access.
257 slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
258 slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
262 #logger.debug ("SlabDriver.list_slices stores value in cache")
263 #self.cache.add('slices', slice_urns)
267 #No site or node register supported
# Register an SFA record with the testbed and compute a `pointer` for it.
# The visible lines handle slices (GetSlices / AddSlice) and persons
# (GetPersons / AddPerson, plus site, role and key bookkeeping).
# NOTE(review): the type tests selecting each branch and the return statement
# are not visible in this listing.
268 def register (self, sfa_record, hrn, pub_key):
# NOTE(review): `type` shadows the builtin inside this method.
269 type = sfa_record['type']
270 slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
274 acceptable_fields=['url', 'instantiation', 'name', 'description']
275 for key in slab_record.keys():
# NOTE(review): the body of this `if` is not visible (presumably drops the unaccepted key).
276 if key not in acceptable_fields:
278 print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
279 slices = self.GetSlices(slice_filter =slab_record['hrn'], slice_filter_type = 'slice_hrn')
# AddSlice has no visible return statement, so `pointer` is presumably None on this path - confirm.
281 pointer = self.AddSlice(slab_record)
# NOTE(review): sliver_status subscripts the same filtered GetSlices result by key
# (sl['node_ids']); indexing it with [0] here looks inconsistent - confirm.
283 pointer = slices[0]['slice_id']
286 persons = self.GetPersons([sfa_record])
287 #persons = self.GetPersons([sfa_record['hrn']])
# NOTE(review): AddPerson, AddPersonToSite, AddRoleToPerson and AddPersonKey are not defined in the visible part of this class.
289 pointer = self.AddPerson(dict(sfa_record))
292 pointer = persons[0]['person_id']
294 #Does this make sense to senslab ?
295 #if 'enabled' in sfa_record and sfa_record['enabled']:
296 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
298 # add this person to the site only if she is being added for the first
299 # time by sfa and doesont already exist in plc
300 if not persons or not persons[0]['site_ids']:
# NOTE(review): get_leaf is not among the imports visible in this listing.
301 login_base = get_leaf(sfa_record['authority'])
302 self.AddPersonToSite(pointer, login_base)
304 # What roles should this user have?
305 self.AddRoleToPerson('user', pointer)
308 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
310 #No node adding outside OAR
314 #No site or node record update allowed
# Update the testbed-side object behind an SFA record (slice or user) and,
# for users, replace the stored ssh key when new_key is given.
# NOTE(review): the type tests selecting each branch, several loop headers and
# the return statement are not visible in this listing.
315 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
316 pointer = old_sfa_record['pointer']
# NOTE(review): `type` shadows the builtin inside this method.
317 type = old_sfa_record['type']
319 # new_key implemented for users only
320 if new_key and type not in [ 'user' ]:
# NOTE(review): UnknownSfaType is not among the faults imported in the visible import lines.
321 raise UnknownSfaType(type)
323 #if (type == "authority"):
324 #self.shell.UpdateSite(pointer, new_sfa_record)
327 slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
# 'name' is removed from the converted record before it is handed to UpdateSlice.
328 if 'name' in slab_record:
329 slab_record.pop('name')
# NOTE(review): UpdateSlice, UpdatePerson, GetKeys and DeleteKey are not defined in the visible part of this class.
330 self.UpdateSlice(pointer, slab_record)
# Only the listed person fields are copied into update_fields.
# NOTE(review): the initialisation of update_fields and the end of the list literal are on missing lines.
334 all_fields = new_sfa_record
335 for key in all_fields.keys():
336 if key in ['first_name', 'last_name', 'title', 'email',
337 'password', 'phone', 'url', 'bio', 'accepted_aup',
339 update_fields[key] = all_fields[key]
340 self.UpdatePerson(pointer, update_fields)
343 # must check this key against the previous one if it exists
344 persons = self.GetPersons([pointer], ['key_ids'])
# The first assignment to `keys` is immediately overwritten by the GetKeys result.
346 keys = person['key_ids']
347 keys = self.GetKeys(person['key_ids'])
349 # Delete all stale keys
# Every stored key whose value differs from new_key is deleted.
352 if new_key != key['key']:
353 self.DeleteKey(key['key_id'])
357 self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
# Remove the testbed-side object behind an SFA record: persons through
# DeletePerson, slices through DeleteSlice.
# NOTE(review): the first type test (presumably for 'user') and the return
# statement are not visible in this listing.
363 def remove (self, sfa_record):
# NOTE(review): `type` shadows the builtin; record_id is not used in the visible lines.
364 type=sfa_record['type']
365 hrn=sfa_record['hrn']
366 record_id= sfa_record['record_id']
# username is the last dot-separated component of the HRN.
368 username = hrn.split(".")[len(hrn.split(".")) -1]
# NOTE(review): GetPersons only filters when handed a list; sfa_record is passed
# bare here, so this call appears to take GetPersons' unfiltered branch - confirm.
370 persons = self.GetPersons(sfa_record)
371 #persons = self.GetPersons(username)
372 # only delete this person if he has site ids. if he doesnt, it probably means
373 # he was just removed from a site, not actually deleted
374 if persons and persons[0]['site_ids']:
# NOTE(review): DeletePerson and DeleteSlice are not defined in the visible part of this class.
375 self.DeletePerson(username)
376 elif type == 'slice':
377 if self.GetSlices(slice_filter = hrn, slice_filter_type = 'slice_hrn'):
378 self.DeleteSlice(hrn)
380 #elif type == 'authority':
381 #if self.GetSites(pointer):
382 #self.DeleteSite(pointer)
# Return registry records whose type matches '%authority%'.
# Records are indexed by (hrn, type) and their HRNs grouped per type; the
# result is either the record matching peer_filter or all 'authority' records.
# NOTE(review): the initialisation of records_list, the else branches and the
# handling of return_fields_list are on lines absent from this listing.
386 def GetPeers (self,auth = None, peer_filter=None, return_fields_list=None):
388 existing_records = {}
389 existing_hrns_by_types= {}
390 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields_list)
# SQL LIKE match, so any record type containing 'authority' is selected.
391 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
392 for record in all_records:
393 existing_records[(record.hrn,record.type)] = record
# First HRN of a type starts a new list; presumably a missing else branch appends later ones (see the append below).
394 if record.type not in existing_hrns_by_types:
395 existing_hrns_by_types[record.type] = [record.hrn]
396 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
399 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN type %s hrn %s " %( record.type,record.hrn )
400 existing_hrns_by_types[record.type].append(record.hrn)
401 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
402 #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
404 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types %s " %( existing_hrns_by_types)
# NOTE(review): existing_hrns_by_types['authority'] raises KeyError when no record has
# exactly the type 'authority', unless a guard sits on a missing line - confirm.
408 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types['authority+sa'] %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
410 records_list.append(existing_records[(peer_filter,'authority')])
412 for hrn in existing_hrns_by_types['authority']:
413 records_list.append(existing_records[(hrn,'authority')])
415 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers records_list %s " %(records_list)
420 return_records = records_list
# NOTE(review): the body of this `if` is not visible.
421 if not peer_filter and not return_fields_list:
425 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers return_records %s " %(return_records)
426 return return_records
429 #TODO : Handling OR request in make_ldap_filters_from_records instead of the for loop
430 #over the records' list
def GetPersons(self, person_filter=None, return_fields_list=None):
    """Look users up in LDAP.

    person_filter should be a list of dictionaries when not set to None;
    each entry is passed to self.ldap.LdapFindUser and the results are
    collected in order. Any other value (None, an empty list, a non-list)
    triggers a single unfiltered LdapFindUser() call whose result is
    returned as is. return_fields_list is accepted but not used.

    Returns a list of users found.
    """
    print >>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)

    filtered_lookup = bool(person_filter) and isinstance(person_filter, list)
    if not filtered_lookup:
        return self.ldap.LdapFindUser()

    # Usually the list holds a single user record.
    return [self.ldap.LdapFindUser(one_filter) for one_filter in person_filter]
def GetTimezone(self):
    """Query OAR with the "GET_timezone" request.

    Returns the two values of the reply as a (timestamp, timezone) tuple.
    """
    oar_timestamp, oar_tz = self.oar.parser.SendRequest("GET_timezone")
    return oar_timestamp, oar_tz
def DeleteJobs(self, job_id, slice_hrn):
    """Ask OAR to delete job job_id on behalf of the owner of slice_hrn.

    The OAR login is the last dot-separated component of slice_hrn with a
    trailing "_slice" suffix removed (e.g. "senslab.nturro_slice" gives
    "nturro"). Nothing is sent when job_id is empty or -1, the value this
    class stores when no OAR job is attached to a slice.

    Returns None.
    """
    if not job_id or job_id == -1:
        return

    leaf = slice_hrn.split(".")[-1]
    suffix = "_slice"
    # str.rstrip("_slice") strips any trailing run of the characters
    # '_', 's', 'l', 'i', 'c', 'e' (turning "alice_slice" into "a"), so
    # the suffix is removed explicitly instead.
    if leaf.endswith(suffix):
        username = leaf[:-len(suffix)]
    else:
        username = leaf

    reqdict = {'method': "delete", 'strval': str(job_id)}
    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', reqdict,
                                              username)
    logger.debug("SLABDRIVER \t DeleteJobs jobid %s answer %s "
                 % (job_id, answer))
# Fetch the OAR description of one job and replace its
# 'assigned_network_address' entry with a 'node_ids' entry.
# NOTE(review): the assignment of `req`, the try/except structure around the
# state checks, their return values and the final return are on lines absent
# from this listing. GetSlices treats an empty result as "job terminated".
467 def GetJobsId(self, job_id, username = None ):
469 Details about a specific job.
470 Includes details about submission time, jot type, state, events,
471 owner, assigned ressources, walltime etc...
475 node_list_k = 'assigned_network_address'
476 #Get job info from OAR
477 job_info = self.oar.parser.SendRequest(req, job_id, username)
479 logger.debug("SLABDRIVER \t GetJobs %s " %(job_info))
481 if job_info['state'] == 'Terminated':
482 logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
485 if job_info['state'] == 'Error':
486 logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
491 logger.error("SLABDRIVER \tGetJobsId KeyError")
494 parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
495 #Replaces the previous entry "assigned_network_address" / "reserved_resources"
# NOTE(review): the visible return of get_info_on_reserved_nodes is a list of
# hostnames, so indexing it with the string node_list_k would raise TypeError - confirm.
497 job_info.update({'node_ids':parsed_job_info[node_list_k]})
498 del job_info[node_list_k]
499 logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
def GetJobsResources(self, job_id, return_fields_list=None, username=None):
    """Fetch the resources of OAR job job_id.

    Sends the "GET_jobs_id_resources" request, then replaces the reply's
    'reserved_resources' entry with a 'node_ids' entry holding the
    hostnames computed by get_info_on_reserved_nodes.
    return_fields_list is accepted but not used.

    Returns the modified job description.
    """
    req = "GET_jobs_id_resources"
    node_list_k = 'reserved_resources'

    # Get job info from OAR
    job_info = self.oar.parser.SendRequest(req, job_id, username)
    logger.debug("SLABDRIVER \t GetJobsResources %s " % (job_info,))

    # get_info_on_reserved_nodes hands back a plain list of hostnames;
    # indexing that list with node_list_k (a string) raised TypeError, so
    # the list is stored directly.
    reserved_hostnames = self.get_info_on_reserved_nodes(job_info,
                                                         node_list_k)
    job_info.update({'node_ids': reserved_hostnames})
    del job_info[node_list_k]
    return job_info
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Map the nodes listed in a job description to testbed hostnames.

    job_info[node_list_name] is read as a sequence of keys that are looked
    up in a hostname-keyed dictionary built from self.GetNodes().

    Returns the list of 'hostname' values found. When job_info has no
    node_list_name entry, or a listed node is unknown to the testbed, the
    KeyError is logged and the hostnames collected so far are returned.
    """
    # Testbed node records, keyed on their hostname.
    node_dict = dict((node['hostname'], node) for node in self.GetNodes())

    reserved_node_hostname_list = []
    try:
        for reserved_name in job_info[node_list_name]:
            # Appending is required here: assigning to
            # reserved_node_hostname_list[index] on the initially empty
            # list raised IndexError for the very first node.
            reserved_node_hostname_list.append(
                node_dict[reserved_name]['hostname'])
        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes "
                     "reserved_node_hostname_list %s"
                     % (reserved_node_hostname_list,))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR ")

    return reserved_node_hostname_list
# Collect the network addresses assigned to the jobs OAR reports through
# the "GET_jobs_details" request.
# NOTE(review): the initialisation of `nodes`, the body of the `total == 0`
# test, the loop header binding `j` and the return statement are on lines
# absent from this listing.
548 def GetReservedNodes(self):
549 # this function returns a list of all the nodes already involved in an oar job
550 #jobs=self.oar.parser.SendRequest("GET_reserved_nodes")
551 jobs=self.oar.parser.SendRequest("GET_jobs_details")
553 if jobs['total'] == 0:
# Each job's assigned addresses are prepended to the accumulated list.
557 nodes=j['assigned_network_address']+nodes
# Return node records from OAR ("GET_resources_full"), optionally filtered.
# With neither argument set, every record is returned. Otherwise each key of
# node_filter_dict maps to a list of accepted values, and matching nodes are
# returned either whole or reduced to return_fields_list.
# NOTE(review): the try/except structure, the initialisation of `tmp` and the
# line copying each field into it are absent from this listing.
# NOTE(review): iterating node_filter_dict fails when it is None while
# return_fields_list is set, unless a guard sits on a missing line - confirm.
560 def GetNodes(self,node_filter_dict = None, return_fields_list = None):
562 node_filter_dict : dictionnary of lists
# The parser reply is a dict; only its values (the node records) are used.
565 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
566 node_dict_list = node_dict_by_id.values()
568 #No filtering needed return the list directly
569 if not (node_filter_dict or return_fields_list):
570 return node_dict_list
572 return_node_list = []
574 for filter_key in node_filter_dict:
576 #Filter the node_dict_list by each value contained in the
577 #list node_filter_dict[filter_key]
578 for value in node_filter_dict[filter_key]:
579 for node in node_dict_list:
580 if node[filter_key] == value:
581 if return_fields_list :
583 for k in return_fields_list:
585 return_node_list.append(tmp)
587 return_node_list.append(node)
589 logger.log_exc("GetNodes KeyError")
593 return return_node_list
# Return site records from OAR ("GET_sites"), optionally for one site name.
# With neither argument set, every site record is returned. Otherwise the
# record stored under site_filter_name is returned, whole or reduced to
# return_fields_list.
# NOTE(review): the initialisation of `tmp`, the try/except around the field
# copy and the else branch are on lines absent from this listing.
596 def GetSites(self, site_filter_name = None, return_fields_list = None):
597 site_dict = self.oar.parser.SendRequest("GET_sites")
598 #site_dict : dict where the key is the sit ename
599 return_site_list = []
600 if not ( site_filter_name or return_fields_list):
601 return_site_list = site_dict.values()
602 return return_site_list
# An unknown (or None) site name leaves return_site_list empty.
604 if site_filter_name in site_dict:
605 if return_fields_list:
606 for field in return_fields_list:
610 tmp[field] = site_dict[site_filter_name][field]
612 logger.error("GetSites KeyError %s "%(field))
614 return_site_list.append(tmp)
616 return_site_list.append( site_dict[site_filter_name])
619 return return_site_list
# Look slices up in the senslab database.
# With slice_filter_type 'slice_hrn' or 'record_id_user', the first matching
# SliceSenslab row is dumped to a dict and refreshed against OAR when it
# carries a job id. The unfiltered query(...).all() result is presumably the
# fallback for any other filter type.
# NOTE(review): several branch headers (if slicerec / if rslt / else) and the
# return of the single-record path are on lines absent from this listing.
# return_fields_list is only referenced in commented-out code.
622 def GetSlices(self, slice_filter = None, slice_filter_type = None, \
623 return_fields_list=None):
624 return_slice_list = []
627 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
628 print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices authorized_filter_types_list %s" %(authorized_filter_types_list)
629 if slice_filter_type in authorized_filter_types_list:
630 if slice_filter_type == 'slice_hrn':
631 slicerec = slab_dbsession.query(SliceSenslab).\
632 filter_by(slice_hrn = slice_filter).first()
634 if slice_filter_type == 'record_id_user':
635 slicerec = slab_dbsession.query(SliceSenslab).\
636 filter_by(record_id_user = slice_filter).first()
639 rec = slicerec.dump_sqlalchemyobj_to_dict()
640 print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices rec %s" %(rec)
# The OAR login is the second dot-separated component of the HRN, cut at its first underscore.
# NOTE(review): this assumes at least two HRN components and a login without underscores.
642 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
643 logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"\
# NOTE(review): `is not -1` is an identity test on an int; `!= -1` is the reliable spelling.
645 if slicerec.oar_job_id is not -1:
646 #Check with OAR the status of the job if a job id is in
648 rslt = self.GetJobsId(slicerec.oar_job_id,username = login)
651 rec.update({'hrn':str(rec['slice_hrn'])})
652 #If GetJobsResources is empty, this means the job is now in the 'Terminated' state
653 #Update the slice record
# NOTE(review): update_job receives slice_filter, which is a user record id rather
# than a slice HRN when slice_filter_type is 'record_id_user' - confirm.
655 self.db.update_job(slice_filter, job_id = -1)
656 rec['oar_job_id'] = -1
657 rec.update({'hrn':str(rec['slice_hrn'])})
# 'node_ids' mirrors the stored 'node_list'; sliver_status reads it.
660 rec['node_ids'] = rec['node_list']
664 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices rec %s" %(rec)
670 return_slice_list = slab_dbsession.query(SliceSenslab).all()
672 print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(return_slice_list,slice_filter)
674 #if return_fields_list:
675 #return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields_list)
679 return return_slice_list
684 def testbed_name (self): return "senslab2"
686 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version(self):
    """Describe this aggregate for the version call.

    Returns a dict with the testbed name plus the two lists
    'geni_request_rspec_versions' and 'geni_ad_rspec_versions'. A version
    whose content_type is '*' is listed in both.
    """
    ad_versions = []
    request_versions = []
    for version in VersionManager().versions:
        content_type = version.content_type
        wildcard = content_type == '*'
        if wildcard or content_type == 'ad':
            ad_versions.append(version.to_dict())
        if wildcard or content_type == 'request':
            request_versions.append(version.to_dict())

    description = {}
    description['testbed'] = self.testbed_name()
    description['geni_request_rspec_versions'] = request_versions
    description['geni_ad_rspec_versions'] = ad_versions
    return description
708 # Convert SFA fields to PLC fields for use when registering up updating
709 # registry record in the PLC database
711 # @param type type of record (user, slice, ...)
712 # @param hrn human readable name
713 # @param sfa_fields dictionary of SFA fields
714 # @param slab_fields dictionary of PLC fields (output)
# Convert an SFA record into the field names used on the senslab side.
# The visible lines only fill slice-related fields ('instantiation', 'hrn',
# 'url', 'description', 'expires').
# NOTE(review): the initialisation of slab_record, the type test selecting the
# slice branch, the guard before the 'url' copy and the return statement are
# on lines absent from this listing.
716 def sfa_fields_to_slab_fields(self, type, hrn, record):
# Nested helper casting the listed fields of tmpdict to int in place.
# It is not called anywhere in the visible lines.
718 def convert_ints(tmpdict, int_fields):
719 for field in int_fields:
721 tmpdict[field] = int(tmpdict[field])
724 #for field in record:
725 # slab_record[field] = record[field]
728 #instantion used in get_slivers ?
729 if not "instantiation" in slab_record:
730 slab_record["instantiation"] = "senslab-instantiated"
# The 'hrn' entry holds the result of hrn_to_pl_slicename(hrn), not the HRN itself.
731 slab_record["hrn"] = hrn_to_pl_slicename(hrn)
732 print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
# NOTE(review): the guard for this copy is not visible; unguarded, record["url"] raises KeyError when absent.
734 slab_record["url"] = record["url"]
735 if "description" in record:
736 slab_record["description"] = record["description"]
737 if "expires" in record:
738 slab_record["expires"] = int(record["expires"])
740 #nodes added by OAR only and then imported to SFA
741 #elif type == "node":
742 #if not "hostname" in slab_record:
743 #if not "hostname" in record:
744 #raise MissingSfaInfo("hostname")
745 #slab_record["hostname"] = record["hostname"]
746 #if not "model" in slab_record:
747 #slab_record["model"] = "geni"
750 #elif type == "authority":
751 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
753 #if not "name" in slab_record:
754 #slab_record["name"] = hrn
756 #if not "abbreviated_name" in slab_record:
757 #slab_record["abbreviated_name"] = hrn
759 #if not "enabled" in slab_record:
760 #slab_record["enabled"] = True
762 #if not "is_public" in slab_record:
763 #slab_record["is_public"] = True
# Submit an OAR job for a slice and start the senslab experiment wrapper.
# Visible steps: build the OAR request (node property filter, resource count,
# walltime, reservation date), POST it, store the job id with
# self.db.update_job, write the node ids to /tmp/sfa/<jobid>.json and run the
# Java wrapper.
# NOTE(review): many short lines (try/except/else headers, the initialisation
# of reqdict, nodeid_list, site_list, lastpart, nodeid, to_zone and jobid, and
# the closing of the file) are absent from this listing.
768 def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
774 slice_name = slice_dict['name']
776 slot = slice_dict['timeslot']
777 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR slot %s " %(slot)
779 #Running on default parameters
780 #XP immediate , 10 mins
# Fallback slot with every field None; presumably used when slice_dict has no 'timeslot'.
781 slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min
# Build an OAR property filter of the form: network_address in ('id1', 'id2', ...)
784 reqdict['property'] ="network_address in ("
785 for node in added_nodes:
786 #Get the ID of the node : remove the root auth and put the site in a separate list
788 # NT: it's not clear for me if the nodenames will have the senslab prefix
789 # so lets take the last part only, for now.
791 #if s[0] == self.root_auth :
792 # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
793 s=lastpart.split("_")
795 reqdict['property'] += "'"+ nodeid +"', "
796 nodeid_list.append(nodeid)
797 #site_list.append( l[0] )
# Drop the trailing ", " left by the loop and close the parenthesis.
800 reqdict['property'] = reqdict['property'][0: len( reqdict['property'])-2] +")"
801 reqdict['resource'] ="network_address="+ str(len(nodeid_list))
# slot['duration'] is read as "H:M:S".
804 walltime = slot['duration'].split(":")
805 # Fixing the walltime by adding a few delays. First put the walltime in seconds
806 # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
807 # take in account prologue and epilogue scripts execution
808 # int walltimeAdditionalDelay = 120; additional delay
810 desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
811 total_walltime = desired_walltime + 140 #+2 min 20
812 sleep_walltime = desired_walltime + 20 #+20 sec
813 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s total_walltime %s sleep_walltime %s " %(desired_walltime,total_walltime,sleep_walltime)
814 #Put the walltime back in str form
# NOTE(review): `/` on ints is integer division only under Python 2 (the print
# statements show this file targets it); `//` would make the intent explicit.
816 walltime[0] = str(total_walltime / 3600)
817 total_walltime = total_walltime - 3600 * int(walltime[0])
818 #Get the remaining minutes
819 walltime[1] = str(total_walltime / 60)
820 total_walltime = total_walltime - 60 * int(walltime[1])
822 walltime[2] = str(total_walltime)
823 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR walltime %s " %(walltime)
825 reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2])
826 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
# Default walltime, presumably used when no duration was given: str(00) is "0", so this appends ",walltime=0:12:20".
828 reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
829 reqdict['script_path'] = "/bin/sleep 620" #+20 sec
830 #In case of a scheduled experiment (not immediate)
831 #To run an XP immediately, don't specify date and time in RSpec
832 #They will be set to None.
833 if slot['date'] and slot['start_time']:
# NOTE(review): `is ''` is an identity test on a string; `== ''` (or `not slot['timezone']`) is the reliable spelling.
834 if slot['timezone'] is '' or slot['timezone'] is None:
835 #assume it is server timezone
836 server_timestamp,server_tz = self.GetTimezone()
837 from_zone=tz.gettz(server_tz)
838 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR timezone not specified server_tz %s from_zone %s" %(server_tz,from_zone)
840 #Get zone of the user from the reservation time given in the rspec
841 from_zone = tz.gettz(slot['timezone'])
# The user's date and start time are parsed with self.time_format and tagged with from_zone.
843 date = str(slot['date']) + " " + str(slot['start_time'])
844 user_datetime = datetime.strptime(date, self.time_format)
845 user_datetime = user_datetime.replace(tzinfo = from_zone)
# NOTE(review): to_zone is assigned on a line absent from this listing.
849 utc_date = user_datetime.astimezone(to_zone)
850 #Readable time accpeted by OAR
851 reqdict['reservation']= utc_date.strftime(self.time_format)
853 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation'])
857 # reservations are performed in the oar server timebase, so :
858 # 1- we get the server time(in UTC tz )/server timezone
859 # 2- convert the server UTC time in its timezone
860 # 3- add a custom delay to this time
861 # 4- convert this time to a readable form and it for the reservation request.
862 server_timestamp,server_tz = self.GetTimezone()
863 s_tz=tz.gettz(server_tz)
864 UTC_zone = tz.gettz("UTC")
865 #weird... datetime.fromtimestamp should work since we do from datetime import datetime
# The reservation time is the server timestamp plus 20 seconds.
866 utc_server= datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
867 server_localtime=utc_server.astimezone(s_tz)
869 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
870 readable_time = server_localtime.strftime(self.time_format)
872 print >>sys.stderr," \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s " %(readable_time ,server_timestamp)
873 reqdict['reservation'] = readable_time
# NOTE(review): the job name below is a hard-coded literal.
876 reqdict['type'] = "deploy"
877 reqdict['directory']= ""
878 reqdict['name']= "TestSandrine"
881 # first step : start the OAR job and update the job
882 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR reqdict %s \r\n site_list %s" %(reqdict,site_list)
884 answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
885 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s " %(answer)
889 print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job %s " %( answer)
892 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user)
893 self.db.update_job( slice_name, jobid ,added_nodes)
896 # second step : configure the experiment
897 # we need to store the nodes in a yaml (well...) file like this :
898 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
# NOTE(review): no close() is visible for this file handle; the path is /tmp/sfa/<jobid>.json.
899 f=open('/tmp/sfa/'+str(jobid)+'.json','w')
# NOTE(review): strip('node') removes any leading/trailing run of the characters
# 'n', 'o', 'd', 'e', not the literal prefix "node" - confirm node names cannot be mangled.
901 f.write(str(added_nodes[0].strip('node')))
902 for node in added_nodes[1:len(added_nodes)] :
903 f.write(','+node.strip('node'))
907 # third step : call the senslab-experiment wrapper
908 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
909 javacmdline="/usr/bin/java"
910 jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
911 #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
# The wrapper is started with an argument list (no shell) and its stdout is captured.
# NOTE(review): no `import subprocess` is visible in this listing - confirm it exists.
912 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
914 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns %s " %(output)
918 #Delete the jobs and updates the job id in the senslab table
920 #Does not clear the node list
def DeleteSliceFromNodes(self, slice_record):
    """Cancel the OAR job of a slice and reset its stored job id.

    slice_record must carry the keys 'oar_job_id' and 'hrn'. The job is
    deleted through DeleteJobs, then the database entry for the slice is
    updated with job_id = -1. The node list of the slice is not cleared.
    """
    oar_job_id = slice_record['oar_job_id']
    slice_hrn = slice_record['hrn']
    self.DeleteJobs(oar_job_id, slice_hrn)
    self.db.update_job(slice_record['hrn'], job_id=-1)
def augment_records_with_testbed_info(self, sfa_records):
    """Delegate to fill_record_info and hand back whatever it returns."""
    augmented_records = self.fill_record_info(sfa_records)
    return augmented_records
# Enrich SFA records with senslab-side information.
# A slice record receives its owner's HRN (as 'PI' and 'researcher'), its OAR
# job id and related fields. A user record receives the LDAP data found by
# GetPersons, and a synthesised slice record for that user is appended to
# record_list.
# NOTE(review): the try/except structure (only its logging line is visible),
# some dictionary entries and the return statement are on lines absent from
# this listing.
935 def fill_record_info(self, record_list):
937 Given a SFA record, fill in the senslab specific and SFA specific
938 fields in the record.
941 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
# A single record is accepted and wrapped into a list.
942 if not isinstance(record_list, list):
943 record_list = [record_list]
# NOTE(review): record_list is appended to inside this loop (user branch), so
# the appended slice records are visited by the same loop.
946 for record in record_list:
947 #If the record is a SFA slice record, then add information
948 #about the user of this slice. This kind of information is in the
950 if str(record['type']) == 'slice':
951 #Get slab slice record.
952 recslice = self.GetSlices(slice_filter = \
954 slice_filter_type = 'slice_hrn')
# The slice owner is the registry record whose record_id equals the slice's record_id_user.
955 recuser = dbsession.query(RegRecord).filter_by(record_id = \
956 recslice['record_id_user']).first()
957 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
958 rec %s \r\n \r\n" %(recslice))
959 record.update({'PI':[recuser.hrn],
960 'researcher': [recuser.hrn],
961 'name':record['hrn'],
962 'oar_job_id':recslice['oar_job_id'],
964 'person_ids':[recslice['record_id_user']],
965 'geni_urn':'', #For client_helper.py compatibility
966 'keys':'', #For client_helper.py compatibility
967 'key_ids':''}) #For client_helper.py compatibility
969 elif str(record['type']) == 'user':
970 #The record is a SFA user record.
971 #Get the information about his slice from Senslab's DB
972 #and add it to the user record.
973 recslice = self.GetSlices(slice_filter = \
974 record['record_id'],\
975 slice_filter_type = 'record_id_user')
977 logger.debug( "SLABDRIVER.PY \t fill_record_info user \
978 rec %s \r\n \r\n" %(recslice))
979 #Append slice record in records list,
980 #therefore fetches user and slice info again(one more loop)
981 #Will update PIs and researcher for the slice
982 recuser = dbsession.query(RegRecord).filter_by(record_id = \
983 recslice['record_id_user']).first()
984 recslice.update({'PI':[recuser.hrn],
985 'researcher': [recuser.hrn],
986 'name':record['hrn'],
987 'oar_job_id':recslice['oar_job_id'],
989 'person_ids':[recslice['record_id_user']]})
991 #GetPersons takes [] as filters
992 #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
993 user_slab = self.GetPersons([record])
# The senslab slice dict is tagged as an SFA 'slice' record before being appended below.
995 recslice.update({'type':'slice','hrn':recslice['slice_hrn']})
# NOTE(review): user_slab[0] assumes GetPersons returned at least one entry.
996 record.update(user_slab[0])
997 #For client_helper.py compatibility
998 record.update( { 'geni_urn':'',
1001 record_list.append(recslice)
1003 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1004 INFO TO USER records %s" %(record_list))
# NOTE(review): the except clause binding `e` is on a line absent from this listing.
1008 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s" %(e))
1012 #self.fill_record_slab_info(records)
1013 ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)
1014 #self.fill_record_sfa_info(records)
1015 #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1021 #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1022 ## get a list of the HRNs tht are members of the old and new records
1024 #oldList = oldRecord.get(listName, [])
1027 #newList = record.get(listName, [])
1029 ## if the lists are the same, then we don't have to update anything
1030 #if (oldList == newList):
1033 ## build a list of the new person ids, by looking up each person to get
1037 #records = table.find({'type': 'user', 'hrn': newList})
1038 #for rec in records:
1039 #newIdList.append(rec['pointer'])
1041 ## build a list of the old person ids from the person_ids field
1043 #oldIdList = oldRecord.get("person_ids", [])
1044 #containerId = oldRecord.get_pointer()
1046 ## if oldRecord==None, then we are doing a Register, instead of an
1049 #containerId = record.get_pointer()
1051 ## add people who are in the new list, but not the oldList
1052 #for personId in newIdList:
1053 #if not (personId in oldIdList):
1054 #addFunc(self.plauth, personId, containerId)
1056 ## remove people who are in the old list, but not the new list
1057 #for personId in oldIdList:
1058 #if not (personId in newIdList):
1059 #delFunc(self.plauth, personId, containerId)
1061 #def update_membership(self, oldRecord, record):
1062 #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1063 #if record.type == "slice":
1064 #self.update_membership_list(oldRecord, record, 'researcher',
1065 #self.users.AddPersonToSlice,
1066 #self.users.DeletePersonFromSlice)
1067 #elif record.type == "authority":
1072 # I don't think you plan on running a component manager at this point
1073 # let me clean up the mess of ComponentAPI that is deprecated anyways