4 from datetime import datetime
5 from dateutil import tz
6 from time import strftime,gmtime
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
16 from sfa.trust.credential import Credential
17 from sfa.trust.gid import GID
19 from sfa.managers.driver import Driver
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
23 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
24 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
26 ## thierry: everything that is API-related (i.e. handling incoming requests)
28 # SlabDriver should be really only about talking to the senslab testbed
31 from sfa.senslab.OARrestapi import OARrestapi
32 from sfa.senslab.LDAPapi import LDAPapi
34 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
35 from sfa.senslab.slabaggregate import SlabAggregate
36 from sfa.senslab.slabslices import SlabSlices
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
44 class SlabDriver(Driver):
def __init__(self, config):
    """Set up the driver from the SFA configuration.

    Reads the interface HRN and the registry root authority from *config*
    and creates the OAR REST client, the LDAP client and the Senslab
    database wrapper that the other methods rely on.
    """
    Driver.__init__(self, config)
    self.hrn = config.SFA_INTERFACE_HRN
    self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
    self.oar = OARrestapi()
    # GetPersons() goes through self.ldap, so it must exist on every driver.
    self.ldap = LDAPapi()
    self.time_format = "%Y-%m-%d %H:%M:%S"
    self.db = SlabDB(config)
def sliver_status(self, slice_urn, slice_hrn):
    """Receive a status request for slice named urn/hrn
    urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
    shall return a structure as described in
    http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
    NT : not sure if we should implement this or not, but used by sface.

    Raises SliverDoesNotExist when the slice is unknown or has no nodes.
    Returns the status dictionary, or None when the slice record carries
    no OAR job (oar_job_id == -1).
    """
    # First get the slice with the slice hrn
    sl = self.GetSlices(slice_filter=slice_hrn,
                        slice_filter_type='slice_hrn')
    if not sl:
        raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))

    nodes_in_slice = sl['node_ids']
    if not nodes_in_slice:
        raise SliverDoesNotExist("No slivers allocated ")

    logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s "
                 "sl %s \r\n " % (slice_urn, slice_hrn, sl))

    # -1 marks "no job"; compare by value, identity of ints is not reliable.
    if sl['oar_job_id'] == -1:
        return None

    # A job is running on Senslab for this slice:
    # report about the local nodes that are in the slice only
    nodes_all = self.GetNodes({'hostname': nodes_in_slice},
                              ['node_id', 'hostname', 'site', 'boot_state'])
    nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])

    timestamp = float(sl['startTime']) + float(sl['walltime'])
    expires = strftime(self.time_format, gmtime(timestamp))

    result = {}
    # nodes_in_slice is known to be non-empty here, so start from 'ready'
    # and downgrade as soon as one node is not alive.
    top_level_status = 'ready'
    result['geni_urn'] = slice_urn
    result['pl_login'] = sl['job_user']  # For compatibility
    result['pl_expires'] = expires

    resources = []
    for hostname in nodes_in_slice:
        node = nodeall_byhostname[hostname]
        res = {}
        res['pl_hostname'] = node['hostname']
        res['pl_boot_state'] = node['boot_state']
        res['pl_last_contact'] = expires
        res['geni_urn'] = urn_to_sliver_id(slice_urn, sl['record_id_slice'],
                                           node['node_id'])
        if node['boot_state'] == 'Alive':
            res['geni_status'] = 'ready'
        else:
            res['geni_status'] = 'failed'
            top_level_status = 'failed'
        res['geni_error'] = ''
        resources.append(res)

    result['geni_status'] = top_level_status
    result['geni_resources'] = resources
    logger.debug("Slabdriver - sliver_status resources %s" % (resources))
    return result
def create_sliver(self, slice_urn, slice_hrn, creds, rspec_string, users,
                  options):
    """Create or update the slivers of a slice from a request RSpec.

    Makes sure the slice and its users exist (through SlabSlices), copies
    a 'timeslot' attribute found in the RSpec into the slice record, asks
    SlabSlices to reconcile the slice's nodes with the requested slivers
    and returns the slice RSpec built by SlabAggregate.
    """
    aggregate = SlabAggregate(self)
    slices = SlabSlices(self)
    peer = slices.get_peer(slice_hrn)
    sfa_peer = slices.get_sfa_peer(slice_hrn)

    # NOTE(review): creds is normalised to a list but not used further in
    # this method.
    if not isinstance(creds, list):
        creds = [creds]

    # users may be empty; only read the slice record when there is one.
    slice_record = None
    if users:
        slice_record = users[0].get('slice_record', {})

    rspec = RSpec(rspec_string)
    logger.debug("SLABDRIVER.PY create_sliver rspec.version %s "
                 % (rspec.version))

    # ensure site record exists?
    # ensure slice record exists
    slice_rec = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer,
                                    options=options)
    requested_attributes = rspec.version.get_slice_attributes()
    for attrib_dict in requested_attributes or []:
        if attrib_dict.get('timeslot') is not None:
            slice_rec.update({'timeslot': attrib_dict['timeslot']})
    logger.debug("SLABDRIVER.PY create_sliver slice %s " % (slice_rec))

    # ensure person records exists
    slices.verify_persons(slice_hrn, slice_rec, users, peer, sfa_peer,
                          options=options)

    # add/remove slice from nodes
    requested_slivers = [node.get('component_name')
                         for node in rspec.version.get_nodes_with_slivers()]
    logger.debug("SLABDRIVER.PY create_sliver requested_slivers %s "
                 % (requested_slivers))
    slices.verify_slice_nodes(slice_rec, requested_slivers, peer)

    return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
def delete_sliver(self, slice_urn, slice_hrn, creds, options):
    """Stop the OAR job of a slice.

    Looks the slice up by hrn; when it does not exist there is nothing to
    delete. For a peer slice the record is unbound from the peer while the
    job is removed and bound again afterwards, even if the removal fails.
    Returns 1.
    """
    slice_rec = self.GetSlices(slice_filter=slice_hrn,
                               slice_filter_type='slice_hrn')
    logger.debug("SLABDRIVER.PY delete_sliver slice %s" % (slice_rec))
    if not slice_rec:
        return 1

    slices = SlabSlices(self)
    # determine if this is a peer slice
    # xxx I wonder if this would not need to use PlSlices.get_peer instead
    # in which case plc.peers could be deprecated as this here
    # is the only/last call to this last method in plc.peers
    peer = slices.get_peer(slice_hrn)
    try:
        if peer:
            self.UnBindObjectFromPeer('slice', slice_rec['record_id_slice'],
                                      peer)
        self.DeleteSliceFromNodes(slice_rec)
    finally:
        if peer:
            # NOTE(review): 'slice_id' / 'peer_slice_id' are PlanetLab
            # style keys, the unbind above uses 'record_id_slice' --
            # confirm the slice record really carries them.
            self.BindObjectToPeer('slice', slice_rec['slice_id'], peer,
                                  slice_rec['peer_slice_id'])
    return 1
def AddSlice(self, slice_record):
    """Insert a new row for *slice_record* in the Senslab slice table.

    Reads 'slice_hrn', 'record_id_slice', 'record_id_user' and
    'peer_authority' from slice_record and commits the new SliceSenslab
    object. Returns None.
    """
    slab_slice = SliceSenslab(
        slice_hrn=slice_record['slice_hrn'],
        record_id_slice=slice_record['record_id_slice'],
        record_id_user=slice_record['record_id_user'],
        peer_authority=slice_record['peer_authority'])
    logger.debug("SLABDRIVER.PY AddSlice slice_record %s slab_slice %s"
                 % (slice_record, slab_slice))
    slab_dbsession.add(slab_slice)
    slab_dbsession.commit()
# first 2 args are None in case of resource discovery
def list_resources(self, slice_urn, slice_hrn, creds, options):
    """Return the RSpec describing the testbed or one slice.

    The RSpec version comes from options['geni_rspec_version']. The hrn of
    the caller found in the first credential is stored in
    options['origin_hrn'] (the caller's dict is modified) before the
    options are handed to SlabAggregate.get_rspec().
    Caching of advertisements is currently disabled.
    """
    version_manager = VersionManager()
    # get the rspec's return format from options
    rspec_version = version_manager.get_version(
        options.get('geni_rspec_version'))

    # panos: passing user-defined options
    aggregate = SlabAggregate(self)
    origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
    options.update({'origin_hrn': origin_hrn})
    rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
                                options=options)
    logger.debug("SLABDRIVER list_resources rspec built")
    return rspec
def list_slices(self, creds, options):
    """Return the URNs of all the slices known to the Senslab database.

    creds and options are accepted for interface compatibility and are
    not used. Caching of the result is currently disabled.
    """
    logger.debug("SLABDRIVER.PY list_slices")
    slices = self.GetSlices()
    slice_hrns = [slicename_to_hrn(self.hrn, slice_rec['slice_hrn'])
                  for slice_rec in slices]
    slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
    return slice_urns
#No site or node register supported
def register(self, sfa_record, hrn, pub_key):
    """Create the testbed-side object for a new SFA record.

    Slices are added to the Senslab slice table unless they already exist;
    users are added through AddPerson, attached to their site, given the
    'user' role and, when pub_key is set, an ssh key.
    Returns the pointer to the testbed object, or None for record types
    that are not handled (nodes are only ever added by OAR).
    """
    # Local import: get_leaf is not among this module's top-level imports.
    from sfa.util.xrn import get_leaf

    record_type = sfa_record['type']
    slab_record = self.sfa_fields_to_slab_fields(record_type, hrn,
                                                 sfa_record)
    pointer = None

    if record_type == 'slice':
        # Read the hrn before trimming the record: 'hrn' is not one of the
        # acceptable fields and is removed by the loop below.
        slice_hrn = slab_record['hrn']
        acceptable_fields = ['url', 'instantiation', 'name', 'description']
        for key in list(slab_record.keys()):
            if key not in acceptable_fields:
                slab_record.pop(key)
        logger.debug("SLABDRIVER.PY register")
        slices = self.GetSlices(slice_filter=slice_hrn,
                                slice_filter_type='slice_hrn')
        if not slices:
            # NOTE(review): AddSlice reads 'slice_hrn', 'record_id_slice',
            # ... which the trimmed record does not contain -- confirm.
            pointer = self.AddSlice(slab_record)
        else:
            # NOTE(review): GetSlices returns a dict for a filtered
            # lookup; this PlanetLab-style indexing needs confirming.
            pointer = slices[0]['slice_id']

    elif record_type == 'user':
        persons = self.GetPersons([sfa_record['hrn']])
        if not persons:
            pointer = self.AddPerson(dict(sfa_record))
        else:
            pointer = persons[0]['person_id']

        #Does this make sense to senslab ?
        #if 'enabled' in sfa_record and sfa_record['enabled']:
            #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})

        # add this person to the site only if she is being added for the
        # first time by sfa and does not already exist in plc
        if not persons or not persons[0]['site_ids']:
            login_base = get_leaf(sfa_record['authority'])
            self.AddPersonToSite(pointer, login_base)

        # What roles should this user have?
        self.AddRoleToPerson('user', pointer)
        # Add the user's key
        if pub_key:
            self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': pub_key})

    #No node adding outside OAR
    return pointer
#No site or node record update allowed
def update(self, old_sfa_record, new_sfa_record, hrn, new_key):
    """Propagate a modified SFA record to the testbed.

    Slices are updated through UpdateSlice (without the 'name' field),
    users through UpdatePerson with the whitelisted fields only. When
    new_key is given (users only) every other key of the person is
    deleted and new_key is added unless already present.
    Raises UnknownSfaType when new_key is given for a non-user record.
    Returns True.
    """
    # Local import: UnknownSfaType is not among the top-level imports.
    from sfa.util.faults import UnknownSfaType

    pointer = old_sfa_record['pointer']
    record_type = old_sfa_record['type']

    # new_key implemented for users only
    if new_key and record_type not in ['user']:
        raise UnknownSfaType(record_type)

    #if (type == "authority"):
        #self.shell.UpdateSite(pointer, new_sfa_record)

    if record_type == "slice":
        slab_record = self.sfa_fields_to_slab_fields(record_type, hrn,
                                                     new_sfa_record)
        slab_record.pop('name', None)
        self.UpdateSlice(pointer, slab_record)

    elif record_type == "user":
        updatable = ['first_name', 'last_name', 'title', 'email',
                     'password', 'phone', 'url', 'bio', 'accepted_aup',
                     'enabled']
        update_fields = dict((key, new_sfa_record[key])
                             for key in new_sfa_record if key in updatable)
        self.UpdatePerson(pointer, update_fields)

        if new_key:
            # must check this key against the previous one if it exists
            persons = self.GetPersons([pointer], ['key_ids'])
            person = persons[0]
            keys = self.GetKeys(person['key_ids'])

            # Delete all stale keys
            key_exists = False
            for key in keys:
                if new_key != key['key']:
                    self.DeleteKey(key['key_id'])
                else:
                    key_exists = True
            if not key_exists:
                self.AddPersonKey(pointer,
                                  {'key_type': 'ssh', 'key': new_key})

    return True
def remove(self, sfa_record):
    """Delete the testbed object that backs an SFA record.

    Users are looked up by the last component of their hrn and deleted
    only when they still belong to a site; slices are deleted when they
    exist in the Senslab database. Other record types are ignored.
    Returns True.
    """
    record_type = sfa_record['type']
    hrn = sfa_record['hrn']

    if record_type == 'user':
        username = hrn.split(".")[-1]
        # NOTE(review): GetPersons only filters on a list of dicts; with a
        # plain string it returns every user -- confirm intended filter.
        persons = self.GetPersons(username)
        # only delete this person if he has site ids. if he doesnt, it
        # probably means he was just removed from a site, not actually
        # deleted
        if persons and persons[0]['site_ids']:
            self.DeletePerson(username)
    elif record_type == 'slice':
        if self.GetSlices(slice_filter=hrn, slice_filter_type='slice_hrn'):
            self.DeleteSlice(hrn)

    #elif type == 'authority':
        #if self.GetSites(pointer):
            #self.DeleteSite(pointer)

    return True
def GetPeers(self, auth=None, peer_filter=None, return_fields_list=None):
    """Return the registry records of the known authorities.

    Every RegRecord whose type contains 'authority' is fetched. With
    peer_filter (an hrn) only the matching record of type 'authority' is
    returned, otherwise all records of that exact type. auth and
    return_fields_list are only logged; no field filtering is applied.
    Returns a list, empty when nothing matches.
    """
    logger.debug("SLABDRIVER GetPeers auth = %s, peer_filter %s, "
                 "return_field %s " % (auth, peer_filter, return_fields_list))
    all_records = dbsession.query(RegRecord).filter(
        RegRecord.type.like('%authority%')).all()

    existing_records = {}
    existing_hrns_by_types = {}
    for record in all_records:
        existing_records[(record.hrn, record.type)] = record
        existing_hrns_by_types.setdefault(record.type, []).append(record.hrn)
    logger.debug("SLABDRIVER GetPeers existing_hrns_by_types %s "
                 % (existing_hrns_by_types))

    records_list = []
    try:
        if peer_filter:
            records_list.append(existing_records[(peer_filter, 'authority')])
        else:
            for hrn in existing_hrns_by_types['authority']:
                records_list.append(existing_records[(hrn, 'authority')])
    except KeyError:
        # Unknown peer, or no record of type 'authority' at all.
        pass

    logger.debug("SLABDRIVER GetPeers return_records %s " % (records_list))
    return records_list
#TODO : Handling OR request in make_ldap_filters_from_records instead of
#the for loop over the records' list
def GetPersons(self, person_filter=None, return_fields_list=None):
    """
    person_filter should be a list of dictionnaries when not set to None.
    Returns a list of users found.

    Each filter of the list is looked up separately in LDAP; anything
    that is not a non-empty list triggers an unfiltered LDAP search.
    return_fields_list is accepted but not used.
    """
    logger.debug("SLABDRIVER GetPersons person_filter %s" % (person_filter))
    if person_filter and isinstance(person_filter, list):
        #If we are looking for a list of users (list of dict records)
        #Usually the list contains only one user record
        return [self.ldap.LdapFindUser(one_filter)
                for one_filter in person_filter]
    return self.ldap.LdapFindUser()
def GetTimezone(self):
    """Ask OAR for its clock: returns (server_timestamp, server_tz)."""
    timestamp, zone_name = self.oar.parser.SendRequest("GET_timezone")
    return (timestamp, zone_name)
def DeleteJobs(self, job_id, slice_hrn):
    """Ask OAR to delete job *job_id* on behalf of the slice owner.

    The OAR login is the last component of slice_hrn without its "_slice"
    suffix. Nothing is sent when job_id is None or -1, the value slice
    records use for "no job".
    Returns the answer of the OAR REST API, or None when no request was
    made.
    """
    if job_id is None or job_id == -1:
        return None

    username = slice_hrn.split(".")[-1]
    # str.rstrip("_slice") strips any trailing run of the characters
    # '_', 's', 'l', 'i', 'c', 'e' and would turn "alice_slice" into "a";
    # remove the literal suffix instead.
    suffix = "_slice"
    if username.endswith(suffix):
        username = username[:-len(suffix)]

    reqdict = {'method': "delete", 'strval': str(job_id)}
    return self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', reqdict,
                                            username)
def GetJobsId(self, job_id, username=None):
    """
    Details about a specific job.
    Includes details about submission time, jot type, state, events,
    owner, assigned ressources, walltime etc...

    Returns the job dictionary sent by OAR, in which the
    'assigned_network_address' entry is replaced by 'node_ids' (list of
    hostnames). Returns None when the job is in the 'Terminated' or
    'Error' state or when OAR's answer has no 'state'.
    """
    req = "GET_jobs_id"
    node_list_k = 'assigned_network_address'
    #Get job info from OAR
    job_info = self.oar.parser.SendRequest(req, job_id, username)
    logger.debug("SLABDRIVER \t GetJobs %s " % (job_info))

    try:
        state = job_info['state']
    except KeyError:
        logger.error("SLABDRIVER \tGetJobsId KeyError")
        return None
    if state == 'Terminated':
        logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED" % (job_id))
        return None
    if state == 'Error':
        logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "
                     % (job_info))
        return None

    # get_info_on_reserved_nodes returns the list of hostnames itself, so
    # it is stored directly (it cannot be indexed with node_list_k).
    reserved_hostnames = self.get_info_on_reserved_nodes(job_info,
                                                         node_list_k)
    #Replaces the previous entry "assigned_network_address" /
    #"reserved_resources" with "node_ids"
    job_info.update({'node_ids': reserved_hostnames})
    del job_info[node_list_k]
    logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " % (job_info))
    return job_info
def GetJobsResources(self, job_id, return_fields_list=None, username=None):
    """Return the resources OAR reports for job *job_id*.

    The 'reserved_resources' entry of OAR's answer is replaced by
    'node_ids', the list of hostnames computed by
    get_info_on_reserved_nodes(). return_fields_list is accepted but not
    used.
    """
    req = "GET_jobs_id_resources"
    node_list_k = 'reserved_resources'

    #Get job info from OAR
    job_info = self.oar.parser.SendRequest(req, job_id, username)
    logger.debug("SLABDRIVER \t GetJobsResources %s " % (job_info))

    # The helper returns the hostname list itself; store it as is.
    reserved_hostnames = self.get_info_on_reserved_nodes(job_info,
                                                         node_list_k)
    #Replaces the previous entry "assigned_network_address" /
    #"reserved_resources" with "node_ids"
    job_info.update({'node_ids': reserved_hostnames})
    del job_info[node_list_k]
    return job_info
def get_info_on_reserved_nodes(self, job_info, node_list_name):
    """Return the hostnames of the nodes listed in a job description.

    job_info[node_list_name] is expected to hold node names; each one is
    looked up among the testbed nodes returned by GetNodes(). When the
    list or one of its nodes is unknown the error is logged and the
    hostnames collected so far are returned.
    """
    #Get the list of the testbed nodes records and make a
    #dictionnary keyed on the hostname out of it
    node_list_dict = self.GetNodes()
    node_dict = dict((node['hostname'], node) for node in node_list_dict)

    reserved_node_hostname_list = []
    try:
        for reserved in job_info[node_list_name]:
            # append: assigning by index into the empty list would raise
            # IndexError on the very first node.
            reserved_node_hostname_list.append(
                node_dict[reserved]['hostname'])
        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes "
                     "reserved_node_hostname_list %s"
                     % (reserved_node_hostname_list))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR ")

    return reserved_node_hostname_list
def GetReservedNodes(self):
    """Return the list of all the nodes already involved in an OAR job.

    The 'assigned_network_address' lists of the jobs reported by OAR are
    concatenated, the nodes of later jobs coming first.
    """
    jobs = self.oar.parser.SendRequest("GET_jobs_details")
    nodes = []
    for job in jobs:
        nodes = job['assigned_network_address'] + nodes
    return nodes
def GetNodes(self, node_filter_dict=None, return_fields_list=None):
    """Return the node records known to OAR, optionally filtered.

    node_filter_dict : dictionnary of lists. For every key, the nodes
        whose field equals one of the listed values are selected.
    return_fields_list : when set, each returned record only holds these
        fields; it is applied with or without a filter.

    Without any argument the full records are returned. When a filter key
    or a requested field is missing from a node record the error is
    logged and an empty list is returned.
    """
    node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
    node_dict_list = list(node_dict_by_id.values())

    #No filtering needed return the list directly
    if not (node_filter_dict or return_fields_list):
        return node_dict_list

    try:
        if node_filter_dict:
            #Filter the node_dict_list by each value contained in the
            #list node_filter_dict[filter_key]
            selected = [node
                        for filter_key in node_filter_dict
                        for value in node_filter_dict[filter_key]
                        for node in node_dict_list
                        if node[filter_key] == value]
        else:
            selected = node_dict_list

        if not return_fields_list:
            return selected
        return [dict((k, node[k]) for k in return_fields_list)
                for node in selected]
    except KeyError:
        logger.log_exc("GetNodes KeyError")
        return []
def GetSites(self, site_filter_name=None, return_fields_list=None):
    """Return the site records known to OAR, optionally filtered.

    site_filter_name : name of the only site to return.
    return_fields_list : when set, each returned record only holds these
        fields.

    Returns a list of site dictionaries, empty when the requested site
    is unknown or when a requested field is missing (the missing field
    is logged).
    """
    #site_dict : dict where the key is the site name
    site_dict = self.oar.parser.SendRequest("GET_sites")
    if not (site_filter_name or return_fields_list):
        return list(site_dict.values())

    if site_filter_name:
        if site_filter_name not in site_dict:
            return []
        selected = [site_dict[site_filter_name]]
    else:
        selected = list(site_dict.values())

    if not return_fields_list:
        return selected

    return_site_list = []
    for site in selected:
        # One dictionary per site, holding every requested field.
        tmp = {}
        for field in return_fields_list:
            try:
                tmp[field] = site[field]
            except KeyError:
                logger.error("GetSites KeyError %s " % (field))
                return []
        return_site_list.append(tmp)
    return return_site_list
def GetSlices(self, slice_filter=None, slice_filter_type=None,
              return_fields_list=None):
    """Look slices up in the Senslab database.

    With slice_filter_type 'slice_hrn' or 'record_id_user' the first
    slice matching slice_filter is returned as a dictionary, empty when
    nothing matches. When the slice has an OAR job, the job details from
    GetJobsId() are merged in; if OAR no longer reports the job, the
    stored job id is reset to -1. In both cases 'hrn' is set from
    'slice_hrn'. 'node_ids' mirrors 'node_list' when that key exists.

    With any other filter type every SliceSenslab row is returned as a
    list. return_fields_list is accepted but not used.
    """
    authorized_filter_types_list = ['slice_hrn', 'record_id_user']
    if slice_filter_type not in authorized_filter_types_list:
        return_slice_list = slab_dbsession.query(SliceSenslab).all()
        logger.debug("SLABDRIVER.PY GetSlices slices %s slice_filter %s "
                     % (return_slice_list, slice_filter))
        return return_slice_list

    query = slab_dbsession.query(SliceSenslab)
    if slice_filter_type == 'slice_hrn':
        slicerec = query.filter_by(slice_hrn=slice_filter).first()
    else:
        slicerec = query.filter_by(record_id_user=slice_filter).first()
    if not slicerec:
        return {}

    rec = slicerec.dump_sqlalchemyobj_to_dict()
    #Get login
    login = slicerec.slice_hrn.split(".")[1].split("_")[0]
    logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"
                 % (login, rec))

    # -1 marks "no job"; compare by value, not identity.
    if slicerec.oar_job_id != -1:
        #Check with OAR the status of the job if a job id is in
        #the slice record
        rslt = self.GetJobsId(slicerec.oar_job_id, username=login)
        if rslt:
            rec.update(rslt)
        else:
            #No job details: the job is over. Update the slice record,
            #always by slice hrn (slice_filter may be a user record id).
            self.db.update_job(slicerec.slice_hrn, job_id=-1)
            rec['oar_job_id'] = -1
        rec.update({'hrn': str(rec['slice_hrn'])})

    if 'node_list' in rec:
        rec['node_ids'] = rec['node_list']
    return rec
679 def testbed_name (self): return "senslab2"
# 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version(self):
    """Describe the testbed and the RSpec versions it supports.

    Returns a dictionary with the testbed name and, for the advertisement
    and request content types, the dictionaries of the matching RSpec
    versions ('*' versions count for both).
    """
    version_manager = VersionManager()
    ad_rspec_versions = [
        rspec_version.to_dict()
        for rspec_version in version_manager.versions
        if rspec_version.content_type in ['*', 'ad']]
    request_rspec_versions = [
        rspec_version.to_dict()
        for rspec_version in version_manager.versions
        if rspec_version.content_type in ['*', 'request']]
    return {
        'testbed': self.testbed_name(),
        'geni_request_rspec_versions': request_rspec_versions,
        'geni_ad_rspec_versions': ad_rspec_versions,
    }
# Convert SFA fields to Senslab fields for use when registering or updating
# a registry record in the testbed database
def sfa_fields_to_slab_fields(self, type, hrn, record):
    """Build the testbed-side record for an SFA record.

    type   -- type of record (user, slice, ...)
    hrn    -- human readable name
    record -- dictionary of SFA fields

    Only slices are converted: the result then holds 'instantiation',
    'hrn' (the slice name derived from hrn) and, when present in record,
    'url', 'description' and 'expires' (as an int). For every other type
    an empty dictionary is returned; nodes are added by OAR only and then
    imported to SFA.
    """
    slab_record = {}

    if type == "slice":
        #instantion used in get_slivers ?
        slab_record["instantiation"] = "senslab-instantiated"
        slab_record["hrn"] = hrn_to_pl_slicename(hrn)
        logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields slab_record "
                     "%s hrn_to_pl_slicename(hrn) hrn %s "
                     % (slab_record['hrn'], hrn))
        # Optional fields are only copied when the SFA record has them.
        if "url" in record:
            slab_record["url"] = record["url"]
        if "description" in record:
            slab_record["description"] = record["description"]
        if "expires" in record:
            slab_record["expires"] = int(record["expires"])

    return slab_record
def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
    """Submit an OAR job for a slice and start the Senslab experiment.

    slice_dict  -- slice record; 'name' is read, as well as the optional
                   'timeslot' dictionary with the keys 'date',
                   'start_time', 'timezone' and 'duration' ("H:M:S").
    added_nodes -- names of the nodes to reserve.
    slice_user  -- login the OAR request is made for.

    Steps: build the OAR request (node property, walltime, reservation
    date), post it, store the job id in the Senslab database, write the
    node ids to /tmp/sfa/<jobid>.json and run the experiment wrapper jar.
    Returns None. When OAR's answer holds no job id the problem is logged
    and nothing else is done.
    """
    # Local import: subprocess is not among the visible top-level imports.
    import subprocess

    slice_name = slice_dict['name']
    try:
        slot = slice_dict['timeslot']
        logger.debug("LaunchExperimentOnOAR slot %s " % (slot))
    except KeyError:
        #Running on default parameters
        #XP immediate , 10 mins
        slot = {'date': None, 'start_time': None, 'timezone': None,
                'duration': None}

    # NT: it's not clear if the node names will have the senslab prefix or
    # a <site>_ prefix, so only the last "."- then "_"-separated part of
    # each name is kept as node id, for now.
    nodeid_list = [node.split(".")[-1].split("_")[-1]
                   for node in added_nodes]

    reqdict = {}
    reqdict['property'] = "network_address in (%s)" % ", ".join(
        ["'" + nodeid + "'" for nodeid in nodeid_list])
    reqdict['resource'] = "network_address=" + str(len(nodeid_list))

    if slot['duration']:
        walltime = slot['duration'].split(":")
        # Fixing the walltime by adding a few delays, computed in seconds:
        # 20 s for the /bin/sleep command to take in account prologue and
        # epilogue scripts execution, plus 120 s of additional delay.
        desired_walltime = (int(walltime[0]) * 3600
                            + int(walltime[1]) * 60 + int(walltime[2]))
        total_walltime = desired_walltime + 140  # +2 min 20
        sleep_walltime = desired_walltime + 20  # +20 sec
        logger.debug("LaunchExperimentOnOAR desired_walltime %s "
                     "total_walltime %s sleep_walltime %s "
                     % (desired_walltime, total_walltime, sleep_walltime))
        #Put the walltime back in str form; divmod keeps the arithmetic
        #integral whatever the division semantics of the interpreter.
        hours, remainder = divmod(total_walltime, 3600)
        minutes, seconds = divmod(remainder, 60)
        reqdict['resource'] += ",walltime=%d:%d:%d" % (hours, minutes,
                                                       seconds)
        reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
    else:
        reqdict['resource'] += ",walltime=0:12:20"  # 10 min +2 min 20
        reqdict['script_path'] = "/bin/sleep 620"  # 10 min +20 sec

    #In case of a scheduled experiment (not immediate)
    #To run an XP immediately, don't specify date and time in RSpec
    #They will be set to None.
    if slot['date'] and slot['start_time']:
        if not slot['timezone']:
            #assume it is server timezone
            server_timestamp, server_tz = self.GetTimezone()
            from_zone = tz.gettz(server_tz)
            logger.debug("LaunchExperimentOnOAR timezone not specified "
                         "server_tz %s from_zone %s" % (server_tz, from_zone))
        else:
            #Get zone of the user from the reservation time given in
            #the rspec
            from_zone = tz.gettz(slot['timezone'])

        date = str(slot['date']) + " " + str(slot['start_time'])
        # 'datetime' is the class here (from datetime import datetime).
        user_datetime = datetime.strptime(date, self.time_format)
        user_datetime = user_datetime.replace(tzinfo=from_zone)
        #Convert to UTC zone
        # NOTE(review): the immediate branch below sends server local
        # time while this one sends UTC -- confirm what OAR expects.
        utc_date = user_datetime.astimezone(tz.gettz("UTC"))
        #Readable time accepted by OAR
        reqdict['reservation'] = utc_date.strftime(self.time_format)
    else:
        # Immediate XP
        # reservations are performed in the oar server timebase, so :
        # 1- we get the server time(in UTC tz )/server timezone
        # 2- convert the server UTC time in its timezone
        # 3- add a custom delay to this time
        # 4- convert this time to a readable form and use it for the
        #    reservation request.
        server_timestamp, server_tz = self.GetTimezone()
        utc_server = datetime.fromtimestamp(float(server_timestamp) + 20,
                                            tz.gettz("UTC"))
        server_localtime = utc_server.astimezone(tz.gettz(server_tz))
        reqdict['reservation'] = server_localtime.strftime(self.time_format)
    logger.debug("LaunchExperimentOnOAR reqdict['reservation'] %s "
                 % (reqdict['reservation']))

    reqdict['type'] = "deploy"
    reqdict['directory'] = ""
    reqdict['name'] = "TestSandrine"

    # first step : start the OAR job and update the job
    logger.debug("LaunchExperimentOnOAR reqdict %s " % (reqdict))
    answer = self.oar.POSTRequestToOARRestAPI('POST_job', reqdict,
                                              slice_user)
    logger.debug("LaunchExperimentOnOAR jobid %s " % (answer))
    try:
        jobid = answer['id']
    except (KeyError, TypeError):
        logger.error("AddSliceTonode Impossible to create job %s "
                     % (answer))
        return

    logger.debug("LaunchExperimentOnOAR jobid %s added_nodes %s "
                 "slice_user %s" % (jobid, added_nodes, slice_user))
    self.db.update_job(slice_name, jobid, added_nodes)

    # second step : configure the experiment
    # we need to store the nodes in a yaml (well...) file like this :
    # [1,56,23,14,45,75] with name /tmp/sfa/<jobid>.json
    # NOTE(review): strip('node') removes the characters n/o/d/e from both
    # ends, not the literal prefix -- fine for names such as "node12".
    node_numbers = ",".join([str(node.strip('node'))
                             for node in added_nodes])
    # 'with' closes the file even if the write fails.
    with open('/tmp/sfa/' + str(jobid) + '.json', 'w') as f:
        f.write('[' + node_numbers + ']')

    # third step : call the senslab-experiment wrapper
    javacmdline = "/usr/bin/java"
    jarname = "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
    output = subprocess.Popen(
        [javacmdline, "-jar", jarname, str(jobid), slice_user],
        stdout=subprocess.PIPE).communicate()[0]
    logger.debug("LaunchExperimentOnOAR wrapper returns %s " % (output))
    return
#Delete the jobs and updates the job id in the senslab table
#Does not clear the node list
def DeleteSliceFromNodes(self, slice_record):
    """Cancel the slice's OAR job and mark the slice as having no job.

    Reads 'oar_job_id' and 'hrn' from slice_record; the job id stored in
    the Senslab table is set to -1 afterwards.
    """
    running_job = slice_record['oar_job_id']
    owner_hrn = slice_record['hrn']
    self.DeleteJobs(running_job, owner_hrn)
    self.db.update_job(slice_record['hrn'], job_id=-1)
def augment_records_with_testbed_info(self, sfa_records):
    """Delegate to fill_record_info() and hand back whatever it returns."""
    filled = self.fill_record_info(sfa_records)
    return filled
def fill_record_info(self, record_list):
    """
    Given a SFA record, fill in the senslab specific and SFA specific
    fields in the record.

    record_list may be a single record or a list. Slice records receive
    the hrn of the slice's user (as PI and researcher), the OAR job id
    and the user's record id. User records are completed with their LDAP
    entry, and the record of the user's slice is appended to the list.
    Records whose slice or user cannot be found are left untouched.
    Returns the list of records.
    """
    logger.debug("SLABDRIVER \tfill_record_info records %s " % (record_list))
    if not isinstance(record_list, list):
        record_list = [record_list]

    try:
        # The list may grow while it is walked: slice records appended for
        # users are visited in turn (one more loop).
        for record in record_list:
            record_type = str(record['type'])
            #If the record is a SFA slice record, then add information
            #about the user of this slice. This kind of information is in
            #the Senslab's DB.
            if record_type == 'slice':
                #Get slab slice record.
                recslice = self.GetSlices(slice_filter=str(record['hrn']),
                                          slice_filter_type='slice_hrn')
                if not recslice:
                    continue
                recuser = dbsession.query(RegRecord).filter_by(
                    record_id=recslice['record_id_user']).first()
                if recuser is None:
                    continue
                logger.debug("SLABDRIVER.PY \t fill_record_info SLICE "
                             "rec %s \r\n \r\n" % (recslice))
                record.update({
                    'PI': [recuser.hrn],
                    'researcher': [recuser.hrn],
                    'name': record['hrn'],
                    'oar_job_id': recslice['oar_job_id'],
                    'node_ids': [],
                    'person_ids': [recslice['record_id_user']],
                    'geni_urn': '',  #For client_helper.py compatibility
                    'keys': '',  #For client_helper.py compatibility
                    'key_ids': ''})  #For client_helper.py compatibility

            elif record_type == 'user':
                #The record is a SFA user record.
                #Get the information about his slice from Senslab's DB
                #and add it to the user record.
                recslice = self.GetSlices(
                    slice_filter=record['record_id'],
                    slice_filter_type='record_id_user')
                if not recslice:
                    continue
                logger.debug("SLABDRIVER.PY \t fill_record_info user "
                             "rec %s \r\n \r\n" % (recslice))
                recuser = dbsession.query(RegRecord).filter_by(
                    record_id=recslice['record_id_user']).first()
                if recuser is None:
                    continue
                recslice.update({
                    'PI': [recuser.hrn],
                    'researcher': [recuser.hrn],
                    'name': record['hrn'],
                    'oar_job_id': recslice['oar_job_id'],
                    'node_ids': [],
                    'person_ids': [recslice['record_id_user']]})

                #GetPersons takes [] as filters
                user_slab = self.GetPersons([{'hrn': recuser.hrn}])

                recslice.update({'type': 'slice',
                                 'hrn': recslice['slice_hrn']})
                record.update(user_slab[0])
                #For client_helper.py compatibility
                record.update({'geni_urn': '',
                               'keys': '',
                               'key_ids': ''})
                #Append slice record in records list,
                #therefore fetches user and slice info again(one more loop)
                #Will update PIs and researcher for the slice
                record_list.append(recslice)

                logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE "
                             "INFO TO USER records %s" % (record_list))

    except TypeError as e:
        logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s" % (e))

    return record_list
1016 #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1017 ## get a list of the HRNs tht are members of the old and new records
1019 #oldList = oldRecord.get(listName, [])
1022 #newList = record.get(listName, [])
1024 ## if the lists are the same, then we don't have to update anything
1025 #if (oldList == newList):
1028 ## build a list of the new person ids, by looking up each person to get
1032 #records = table.find({'type': 'user', 'hrn': newList})
1033 #for rec in records:
1034 #newIdList.append(rec['pointer'])
1036 ## build a list of the old person ids from the person_ids field
1038 #oldIdList = oldRecord.get("person_ids", [])
1039 #containerId = oldRecord.get_pointer()
1041 ## if oldRecord==None, then we are doing a Register, instead of an
1044 #containerId = record.get_pointer()
1046 ## add people who are in the new list, but not the oldList
1047 #for personId in newIdList:
1048 #if not (personId in oldIdList):
1049 #addFunc(self.plauth, personId, containerId)
1051 ## remove people who are in the old list, but not the new list
1052 #for personId in oldIdList:
1053 #if not (personId in newIdList):
1054 #delFunc(self.plauth, personId, containerId)
1056 #def update_membership(self, oldRecord, record):
1057 #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1058 #if record.type == "slice":
1059 #self.update_membership_list(oldRecord, record, 'researcher',
1060 #self.users.AddPersonToSlice,
1061 #self.users.DeletePersonFromSlice)
1062 #elif record.type == "authority":
1067 # I don't think you plan on running a component manager at this point
1068 # let me clean up the mess of ComponentAPI that is deprecated anyways