import subprocess
import sys

from datetime import datetime
from time import strftime, gmtime

from dateutil import tz

from sfa.util.faults import MissingSfaInfo, SliverDoesNotExist, UnknownSfaType
from sfa.util.sfalogging import logger
from sfa.util.defaultdict import defaultdict
from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf

from sfa.storage.record import Record
from sfa.storage.alchemy import dbsession
from sfa.storage.model import RegRecord

from sfa.trust.credential import Credential
from sfa.trust.gid import GID

from sfa.managers.driver import Driver
from sfa.rspecs.version_manager import VersionManager
from sfa.rspecs.rspec import RSpec

from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
26 ## thierry: everything that is API-related (i.e. handling incoming requests)
28 # SlabDriver should be really only about talking to the senslab testbed
31 from sfa.senslab.OARrestapi import OARrestapi
32 from sfa.senslab.LDAPapi import LDAPapi
34 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
35 from sfa.senslab.slabaggregate import SlabAggregate
36 from sfa.senslab.slabslices import SlabSlices
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
# this inheritance scheme is so that the driver object can receive
# GetNodes or GetSites sorts of calls directly
# and thus minimize the differences in the managers with the pl version
class SlabDriver(Driver):
    """SFA driver for the Senslab testbed.

    Bridges SFA manager calls (register/update/list_resources/...) to the
    testbed backends: the OAR batch scheduler (REST API), the senslab
    postgres database and the registry database.
    """

    def __init__(self, config):
        """Build the driver from the SFA configuration object.

        :param config: SFA config; provides the interface hrn, the registry
            root authority and the database settings.
        """
        Driver.__init__ (self, config)
        self.hrn = config.SFA_INTERFACE_HRN
        self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
        # REST client towards the OAR scheduler.
        self.oar = OARrestapi()
        # Timestamp format used for OAR reservations and expiry fields.
        self.time_format = "%Y-%m-%d %H:%M:%S"
        # Senslab-specific postgres wrapper.
        self.db = SlabDB(config,debug = True)
        # NOTE(review): GetPersons() reads self.ldap, which is never
        # initialised in this copy (presumably `self.ldap = LDAPapi()`
        # was lost in extraction) — verify against upstream.
    def sliver_status(self,slice_urn,slice_hrn):
        """Receive a status request for slice named urn/hrn
        urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
        shall return a structure as described in
        http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
        NT : not sure if we should implement this or not, but used by sface.
        """
        #First get the slice with the slice hrn
        sl = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
        # NOTE(review): the guard that should precede this raise (presumably
        # `if len(sl) is 0:`) is missing from this copy; as written the
        # method would always raise here — restore from upstream.
        raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
        nodes_in_slice = sl['node_ids']
        # NOTE(review): `is 0` compares identity, not equality; it happens to
        # work for small ints in CPython but `== 0` is the safe form.
        if len(nodes_in_slice) is 0:
            raise SliverDoesNotExist("No slivers allocated ")
        logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
                     %s \r\n " %(slice_urn,slice_hrn,sl) )
        # Same identity-comparison caveat for `is not -1`.
        if sl['oar_job_id'] is not -1:
            #A job is running on Senslab for this slice
            # report about the local nodes that are in the slice only
            nodes_all = self.GetNodes({'hostname':nodes_in_slice},
                            ['node_id', 'hostname','site','boot_state'])
            nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
            # NOTE(review): initialisation of `result`/`resources` and the
            # per-node loop header (`for node in nodes_in_slice:`) are missing
            # from this copy — the block below is incomplete.
            top_level_status = 'unknown'
            top_level_status = 'ready'
            result['geni_urn'] = slice_urn
            result['pl_login'] = sl['job_user'] #For compatibility
            # Expiry is job start time plus walltime, formatted in UTC.
            timestamp = float(sl['startTime']) + float(sl['walltime'])
            result['pl_expires'] = strftime(self.time_format, \
                                            gmtime(float(timestamp)))
            #result['slab_expires'] = strftime(self.time_format,\
                                            #gmtime(float(timestamp)))

            #res['slab_hostname'] = node['hostname']
            #res['slab_boot_state'] = node['boot_state']
            res['pl_hostname'] = nodeall_byhostname[node]['hostname']
            res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
            res['pl_last_contact'] = strftime(self.time_format, \
                                            gmtime(float(timestamp)))
            sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
                                            nodeall_byhostname[node]['node_id'])
            res['geni_urn'] = sliver_id
            if nodeall_byhostname[node]['boot_state'] == 'Alive':
                res['geni_status'] = 'ready'
            # NOTE(review): `else:` header missing before the failure branch.
            res['geni_status'] = 'failed'
            top_level_status = 'failed'

            res['geni_error'] = ''

            resources.append(res)

            result['geni_status'] = top_level_status
            result['geni_resources'] = resources
            print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
            # NOTE(review): trailing `return result` missing in this copy.
    def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
        """Create/update the slivers of *slice_urn* from a request RSpec.

        Verifies the slice, its users and its node allocation against the
        request, then returns the resulting manifest RSpec.

        :param creds: caller credentials (list expected).
        :param rspec_string: request RSpec as XML text.
        :param users: list of user dicts; users[0] may carry 'slice_record'.
        :returns: manifest RSpec for the slice.
        """
        print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
        aggregate = SlabAggregate(self)

        slices = SlabSlices(self)
        peer = slices.get_peer(slice_hrn)
        sfa_peer = slices.get_sfa_peer(slice_hrn)

        if not isinstance(creds, list):
            # NOTE(review): guard body missing in this copy
            # (presumably `creds = [creds]`).

        # NOTE(review): presumably wrapped in an `if users:` guard upstream.
        slice_record = users[0].get('slice_record', {})

        rspec = RSpec(rspec_string)
        print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ============================rspec.version %s " %(rspec.version)

        # ensure site record exists?
        # ensure slice record exists
        slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
        requested_attributes = rspec.version.get_slice_attributes()

        # Propagate a requested reservation timeslot onto the slice record.
        if requested_attributes:
            for attrib_dict in requested_attributes:
                if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
                    slice.update({'timeslot':attrib_dict['timeslot']})
        print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... slice %s " %(slice)
        # ensure person records exists
        persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
        # ensure slice attributes exists?

        # add/remove slice from nodes
        print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... "

        requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
        print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
        nodes = slices.verify_slice_nodes(slice, requested_slivers, peer)

        return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
    def delete_sliver (self, slice_urn, slice_hrn, creds, options):
        """Tear down the sliver of *slice_hrn*: delete its OAR job and
        handle the peer (un)binding of the slice record."""
        slice = self.GetSlices(slice_filter= slice_hrn, slice_filter_type = 'slice_hrn')
        print>>sys.stderr, "\r\n \r\n \t\t SLABDRIVER.PY delete_sliver slice %s" %(slice)
        # NOTE(review): an early-return guard for a missing slice
        # (`if not slice: return ...`) appears to be lost in this copy.
        slices = SlabSlices(self)
        # determine if this is a peer slice
        # xxx I wonder if this would not need to use PlSlices.get_peer instead
        # in which case plc.peers could be deprecated as this here
        # is the only/last call to this last method in plc.peers
        peer = slices.get_peer(slice_hrn)
        # NOTE(review): presumably wrapped upstream in `if peer:` guards plus
        # try/finally; those lines are missing, so the calls below run
        # unconditionally in this copy.
        self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
        self.DeleteSliceFromNodes(slice)
        self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
203 def AddSlice(self, slice_record):
204 slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'], record_id_slice= slice_record['record_id_slice'] , record_id_user= slice_record['record_id_user'], peer_authority = slice_record['peer_authority'])
205 print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,slab_slice)
206 slab_dbsession.add(slab_slice)
207 slab_dbsession.commit()
    # first 2 args are None in case of resource discovery
    def list_resources (self, slice_urn, slice_hrn, creds, options):
        """Return an RSpec: an advertisement when slice_urn/slice_hrn are
        None (resource discovery), a manifest otherwise.

        The RSpec format is taken from options['geni_rspec_version'].
        """
        #cached_requested = options.get('cached', True)

        version_manager = VersionManager()
        # get the rspec's return format from options
        rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
        version_string = "rspec_%s" % (rspec_version)

        #panos adding the info option to the caching key (can be improved)
        if options.get('info'):
            version_string = version_string + "_"+options.get('info', 'default')

        # look in cache first
        #if cached_requested and self.cache and not slice_hrn:
            #rspec = self.cache.get(version_string)
            #logger.debug("SlabDriver.ListResources: returning cached advertisement")

        #panos: passing user-defined options
        aggregate = SlabAggregate(self)
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
        options.update({'origin_hrn':origin_hrn})
        # NOTE(review): this call is truncated in this copy — the final
        # argument(s) (presumably `options=options)`) and the closing paren
        # are missing, leaving the statement syntactically open.
        rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
        print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec "
        #if self.cache and not slice_hrn:
            #logger.debug("Slab.ListResources: stores advertisement in cache")
            #self.cache.add(version_string, rspec)
        # NOTE(review): trailing `return rspec.toxml()` missing in this copy.
    def list_slices (self, creds, options):
        """Build the list of URNs of all senslab slices known locally."""
        # look in cache first
        #slices = self.cache.get('slices')
        #logger.debug("PlDriver.list_slices returns from cache")

        print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
        slices = self.GetSlices()
        slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
        slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]

        #logger.debug ("SlabDriver.list_slices stores value in cache")
        #self.cache.add('slices', slice_urns)
        # NOTE(review): trailing `return slice_urns` missing in this copy.
    #No site or node register supported
    def register (self, sfa_record, hrn, pub_key):
        """Register a new slice or user on the testbed.

        :param sfa_record: SFA record dict (its 'type' selects the branch).
        :param hrn: human readable name of the new object.
        :param pub_key: ssh public key to attach when registering a user.
        """
        type = sfa_record['type']
        slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)

        # NOTE(review): the `if type == 'slice':` branch header is missing
        # from this copy; the block below only applies to slices.
        # Keep only the fields the senslab DB accepts for a slice.
        acceptable_fields=['url', 'instantiation', 'name', 'description']
        for key in slab_record.keys():
            if key not in acceptable_fields:
                # NOTE(review): loop body missing
                # (presumably `del slab_record[key]`).
        print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
        slices = self.GetSlices(slice_filter =slab_record['hrn'], slice_filter_type = 'slice_hrn')
        # NOTE(review): `if not slices:` / `else:` headers missing around the
        # next two lines (create the slice only when it does not exist).
        pointer = self.AddSlice(slab_record)
        pointer = slices[0]['slice_id']

        # NOTE(review): the `elif type == 'user':` branch header is missing.
        persons = self.GetPersons([sfa_record])
        #persons = self.GetPersons([sfa_record['hrn']])
        # NOTE(review): `if not persons:` / `else:` headers missing around
        # the next two lines.
        pointer = self.AddPerson(dict(sfa_record))
        pointer = persons[0]['person_id']

        #Does this make sense to senslab ?
        #if 'enabled' in sfa_record and sfa_record['enabled']:
            #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})

        # add this person to the site only if she is being added for the first
        # time by sfa and doesn't already exist in plc
        if not persons or not persons[0]['site_ids']:
            login_base = get_leaf(sfa_record['authority'])
            self.AddPersonToSite(pointer, login_base)

        # What roles should this user have?
        self.AddRoleToPerson('user', pointer)
        # NOTE(review): an `if pub_key:` guard is presumably missing here.
        self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})

        #No node adding outside OAR
        # NOTE(review): trailing `return pointer` missing in this copy.
    #No site or node record update allowed
    def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
        """Update a slice or user record (sites and nodes are read-only).

        :param new_key: replacement ssh key, only valid for user records.
        :raises UnknownSfaType: when a key update targets a non-user record.
        """
        pointer = old_sfa_record['pointer']
        type = old_sfa_record['type']

        # new_key implemented for users only
        if new_key and type not in [ 'user' ]:
            raise UnknownSfaType(type)

        #if (type == "authority"):
            #self.shell.UpdateSite(pointer, new_sfa_record)

        # NOTE(review): `if type == "slice":` branch header missing here.
        slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
        if 'name' in slab_record:
            slab_record.pop('name')
        self.UpdateSlice(pointer, slab_record)

        # NOTE(review): `elif type == "user":` header and the
        # `update_fields = {}` initialisation are missing before this block.
        all_fields = new_sfa_record
        for key in all_fields.keys():
            if key in ['first_name', 'last_name', 'title', 'email',
                       'password', 'phone', 'url', 'bio', 'accepted_aup',
                       # NOTE(review): the field list is truncated here
                       # (closing bracket and colon lost in this copy).
                update_fields[key] = all_fields[key]
        self.UpdatePerson(pointer, update_fields)

        # NOTE(review): an `if new_key:` guard and the loop headers of the
        # key-rotation logic are missing around the lines below.
        # must check this key against the previous one if it exists
        persons = self.GetPersons([pointer], ['key_ids'])
        keys = person['key_ids']
        keys = self.GetKeys(person['key_ids'])

        # Delete all stale keys
        if new_key != key['key']:
            self.DeleteKey(key['key_id'])
        self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
        # NOTE(review): trailing `return True` missing in this copy.
    def remove (self, sfa_record):
        """Remove a user or a slice from the testbed.

        Authority (site) removal is intentionally not supported.
        """
        type=sfa_record['type']
        hrn=sfa_record['hrn']
        record_id= sfa_record['record_id']
        # NOTE(review): the `if type == 'user':` branch header is missing
        # before this block.
        # Leaf of the hrn is the testbed login.
        username = hrn.split(".")[len(hrn.split(".")) -1]
        persons = self.GetPersons(sfa_record)
        #persons = self.GetPersons(username)
        # only delete this person if he has site ids. if he doesn't, it probably means
        # he was just removed from a site, not actually deleted
        if persons and persons[0]['site_ids']:
            self.DeletePerson(username)
        elif type == 'slice':
            if self.GetSlices(slice_filter = hrn, slice_filter_type = 'slice_hrn'):
                self.DeleteSlice(hrn)

        #elif type == 'authority':
            #if self.GetSites(pointer):
                #self.DeleteSite(pointer)
        # NOTE(review): trailing `return True` missing in this copy.
    def GetPeers (self,auth = None, peer_filter=None, return_fields_list=None):
        """Return registry records of peer authorities.

        Queries the registry DB for records whose type matches 'authority',
        optionally narrowed down by *peer_filter* (an authority hrn).

        NOTE(review): several lines are missing from this copy
        (`records_list = []` initialisation, `if peer_filter:`/`else:`
        headers); see inline notes before trusting the control flow.
        """
        existing_records = {}
        existing_hrns_by_types= {}
        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields_list)
        all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
        # Index records by (hrn, type) and group hrns per record type.
        for record in all_records:
            existing_records[(record.hrn,record.type)] = record
            if record.type not in existing_hrns_by_types:
                existing_hrns_by_types[record.type] = [record.hrn]
                print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
            # NOTE(review): `else:` header missing before the append branch.
            print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN type %s hrn %s " %( record.type,record.hrn )
            existing_hrns_by_types[record.type].append(record.hrn)
            print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
        #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})

        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types %s " %( existing_hrns_by_types)
        # NOTE(review): `records_list = []` initialisation and the
        # `if peer_filter:` / `else:` headers are missing below.
        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types['authority+sa'] %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
        records_list.append(existing_records[(peer_filter,'authority')])
        for hrn in existing_hrns_by_types['authority']:
            records_list.append(existing_records[(hrn,'authority')])
        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers records_list %s " %(records_list)

        return_records = records_list
        if not peer_filter and not return_fields_list:
            # NOTE(review): body missing (presumably `return records_list`).
        print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers return_records %s " %(return_records)
        return return_records
    #TODO : Handling OR request in make_ldap_filters_from_records instead of the for loop
    #over the records' list
    def GetPersons(self, person_filter=None, return_fields_list=None):
        """
        person_filter should be a list of dictionnaries when not set to None.
        Returns a list of users found.
        """
        print>>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)
        # NOTE(review): `person_list = []` initialisation missing in
        # this copy.
        if person_filter and isinstance(person_filter,list):
            #If we are looking for a list of users (list of dict records)
            #Usually the list contains only one user record
            for f in person_filter:
                person = self.ldap.LdapFindUser(f)
                person_list.append(person)
        # NOTE(review): `else:` header missing — the unfiltered lookup below
        # would otherwise overwrite the filtered results.
        person_list = self.ldap.LdapFindUser()
        # NOTE(review): trailing `return person_list` missing in this copy.
452 def GetTimezone(self):
453 server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone")
454 return server_timestamp,server_tz
457 def DeleteJobs(self, job_id, slice_hrn):
460 username = slice_hrn.split(".")[-1].rstrip("_slice")
462 reqdict['method'] = "delete"
463 reqdict['strval'] = str(job_id)
464 answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
465 print>>sys.stderr, "\r\n \r\n jobid DeleteJobs %s " %(answer)
    def GetJobsId(self, job_id, username = None ):
        """
        Details about a specific job.
        Includes details about submission time, jot type, state, events,
        owner, assigned ressources, walltime etc...
        """
        # NOTE(review): the `req = "GET_jobs_id"` assignment is missing in
        # this copy; `req` is undefined below.
        node_list_k = 'assigned_network_address'
        #Get job info from OAR
        job_info = self.oar.parser.SendRequest(req, job_id, username)

        logger.debug("SLABDRIVER \t GetJobs %s " %(job_info))
        # NOTE(review): upstream wraps the state checks below in
        # try/except KeyError; several continuation lines are lost here.
        if job_info['state'] == 'Terminated':
            logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
            # NOTE(review): continuation of this logger call is missing.
        if job_info['state'] == 'Error':
            logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
            # NOTE(review): continuation of this logger call is missing.
        # Orphan except-body from the lost try/except KeyError.
        logger.error("SLABDRIVER \tGetJobsId KeyError")

        parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
        #Replaces the previous entry "assigned_network_address" / "reserved_resources"
        # with a uniform 'node_ids' entry.
        # NOTE(review): get_info_on_reserved_nodes (as visible in this file)
        # returns a hostname list, so indexing it with node_list_k here
        # looks inconsistent — verify against upstream.
        job_info.update({'node_ids':parsed_job_info[node_list_k]})
        del job_info[node_list_k]
        logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
        # NOTE(review): trailing `return job_info` missing in this copy.
    def GetJobsResources(self,job_id, return_fields_list=None, username = None):
        """Fetch the resources reserved by OAR job *job_id* and normalise
        the reserved-resources entry into 'node_ids'."""
        #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
        #assigned_res = ['resource_id', 'resource_uri']
        #assigned_n = ['node', 'node_uri']

        req = "GET_jobs_id_resources"
        node_list_k = 'reserved_resources'

        #Get job info from OAR
        job_info = self.oar.parser.SendRequest(req, job_id, username)
        logger.debug("SLABDRIVER \t GetJobsResources %s " %(job_info))

        parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
        #Replaces the previous entry "assigned_network_address" / "reserved_resources"
        # with a uniform 'node_ids' entry.
        # NOTE(review): get_info_on_reserved_nodes (as visible in this file)
        # returns a hostname list, so indexing it with node_list_k here
        # looks inconsistent — verify against upstream.
        job_info.update({'node_ids':parsed_job_info[node_list_k]})
        del job_info[node_list_k]
        # NOTE(review): trailing `return job_info` missing in this copy.
524 def get_info_on_reserved_nodes(self,job_info,node_list_name):
525 #Get the list of the testbed nodes records and make a
526 #dictionnary keyed on the hostname out of it
527 node_list_dict = self.GetNodes()
528 #node_hostname_list = []
529 node_hostname_list = [node['hostname'] for node in node_list_dict]
530 #for node in node_list_dict:
531 #node_hostname_list.append(node['hostname'])
532 node_dict = dict(zip(node_hostname_list,node_list_dict))
534 reserved_node_hostname_list = []
535 for index in range(len(job_info[node_list_name])):
536 #job_info[node_list_name][k] =
537 reserved_node_hostname_list[index] = \
538 node_dict[job_info[node_list_name][index]]['hostname']
540 logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \
541 reserved_node_hostname_list %s" \
542 %(reserved_node_hostname_list))
544 logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
546 return reserved_node_hostname_list
548 def GetReservedNodes(self):
549 # this function returns a list of all the nodes already involved in an oar job
550 return self.oar.parser.SendRequest("GET_reserved_nodes")
    def GetNodes(self,node_filter_dict = None, return_fields_list = None):
        """
        node_filter_dict : dictionnary of lists
        return_fields_list : list of node fields to keep in the result.
        Returns the list of node records from OAR, optionally filtered.
        """
        node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
        node_dict_list = node_dict_by_id.values()

        #No filtering needed return the list directly
        if not (node_filter_dict or return_fields_list):
            return node_dict_list

        return_node_list = []
        for filter_key in node_filter_dict:
            #Filter the node_dict_list by each value contained in the
            #list node_filter_dict[filter_key]
            for value in node_filter_dict[filter_key]:
                for node in node_dict_list:
                    if node[filter_key] == value:
                        if return_fields_list :
                            # NOTE(review): `tmp = {}` initialisation and the
                            # try/except around the copy loop are missing
                            # from this copy.
                            for k in return_fields_list:
                                # NOTE(review): loop body missing
                                # (presumably `tmp[k] = node[k]`).
                            return_node_list.append(tmp)
                        # NOTE(review): `else:` header missing here.
                        return_node_list.append(node)
        # Orphan except-body from the lost try/except.
        logger.log_exc("GetNodes KeyError")

        return return_node_list
    def GetSites(self, site_filter_name = None, return_fields_list = None):
        """Return site records from OAR, optionally restricted to one site
        name and/or a subset of fields."""
        site_dict = self.oar.parser.SendRequest("GET_sites")
        #site_dict : dict where the key is the site name
        return_site_list = []
        if not ( site_filter_name or return_fields_list):
            return_site_list = site_dict.values()
            return return_site_list

        if site_filter_name in site_dict:
            if return_fields_list:
                # NOTE(review): `tmp = {}` initialisation and the try/except
                # around the field copy are missing from this copy.
                for field in return_fields_list:
                    tmp[field] = site_dict[site_filter_name][field]
                    # Orphan except-body from the lost try/except KeyError.
                    logger.error("GetSites KeyError %s "%(field))
                return_site_list.append(tmp)
            # NOTE(review): `else:` header missing before this append.
            return_site_list.append( site_dict[site_filter_name])

        return return_site_list
    def GetSlices(self, slice_filter = None, slice_filter_type = None, \
                  return_fields_list=None):
        """Return senslab slice record(s) from the local postgres DB.

        With a filter ('slice_hrn' or 'record_id_user') a single enriched
        record is looked up; otherwise all SliceSenslab rows are returned.
        """
        return_slice_list = []
        authorized_filter_types_list = ['slice_hrn', 'record_id_user']
        print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices authorized_filter_types_list %s" %(authorized_filter_types_list)
        if slice_filter_type in authorized_filter_types_list:
            if slice_filter_type == 'slice_hrn':
                slicerec = slab_dbsession.query(SliceSenslab).\
                                    filter_by(slice_hrn = slice_filter).first()

            if slice_filter_type == 'record_id_user':
                slicerec = slab_dbsession.query(SliceSenslab).\
                                    filter_by(record_id_user = slice_filter).first()

            # NOTE(review): an `if slicerec:` guard appears to be missing
            # before this block (slicerec may be None).
            rec = slicerec.dump_sqlalchemyobj_to_dict()
            print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices rec %s" %(rec)
            #Get login from hrn (second part of the hrn, minus the "_slice" suffix)
            login = slicerec.slice_hrn.split(".")[1].split("_")[0]
            logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"\
            # NOTE(review): continuation of this logger call is missing.
            # `is not -1` relies on CPython small-int identity; `!=` is safer.
            if slicerec.oar_job_id is not -1:
                #Check with OAR the status of the job if a job id is in
                #the slice record
                rslt = self.GetJobsId(slicerec.oar_job_id,username = login)
                # NOTE(review): `if rslt:` / `else:` structure is missing
                # around the updates below.
                rec.update({'hrn':str(rec['slice_hrn'])})
                #If GetJobsResources is empty, this means the job is now in the 'Terminated' state
                #Update the slice record
                self.db.update_job(slice_filter, job_id = -1)
                rec['oar_job_id'] = -1
                rec.update({'hrn':str(rec['slice_hrn'])})
            # NOTE(review): `else:` branch header missing here.
            rec['node_ids'] = rec['node_list']
            #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices rec %s" %(rec)
            # NOTE(review): `return rec` appears to be missing at the end of
            # this branch.

        # NOTE(review): `else:` header missing before the unfiltered query.
        return_slice_list = slab_dbsession.query(SliceSenslab).all()

        print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(return_slice_list,slice_filter)

        #if return_fields_list:
            #return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields_list)
        return return_slice_list
676 def testbed_name (self): return "senslab2"
678 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
679 def aggregate_version (self):
680 version_manager = VersionManager()
681 ad_rspec_versions = []
682 request_rspec_versions = []
683 for rspec_version in version_manager.versions:
684 if rspec_version.content_type in ['*', 'ad']:
685 ad_rspec_versions.append(rspec_version.to_dict())
686 if rspec_version.content_type in ['*', 'request']:
687 request_rspec_versions.append(rspec_version.to_dict())
689 'testbed':self.testbed_name(),
690 'geni_request_rspec_versions': request_rspec_versions,
691 'geni_ad_rspec_versions': ad_rspec_versions,
# Convert SFA fields to PLC fields for use when registering or updating
701 # registry record in the PLC database
703 # @param type type of record (user, slice, ...)
704 # @param hrn human readable name
705 # @param sfa_fields dictionary of SFA fields
706 # @param slab_fields dictionary of PLC fields (output)
    def sfa_fields_to_slab_fields(self, type, hrn, record):
        """Translate an SFA record dict into senslab/PLC-style fields.

        :param type: record type ('user', 'slice', ...).
        :param hrn: human readable name of the record.
        :param record: SFA field dict to convert.
        """
        def convert_ints(tmpdict, int_fields):
            # Coerce the listed fields to int when present.
            for field in int_fields:
                # NOTE(review): an `if field in tmpdict:` guard appears to
                # be missing in this copy.
                tmpdict[field] = int(tmpdict[field])

        # NOTE(review): the `slab_record = {}` initialisation and the
        # `if type == "slice":` branch header are missing below.
        #for field in record:
        #    slab_record[field] = record[field]

        #instantion used in get_slivers ?
        if not "instantiation" in slab_record:
            slab_record["instantiation"] = "senslab-instantiated"
        slab_record["hrn"] = hrn_to_pl_slicename(hrn)
        print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
        # NOTE(review): an `if "url" in record:` guard appears to be missing
        # before this line.
        slab_record["url"] = record["url"]
        if "description" in record:
            slab_record["description"] = record["description"]
        if "expires" in record:
            slab_record["expires"] = int(record["expires"])

        #nodes added by OAR only and then imported to SFA
        #elif type == "node":
            #if not "hostname" in slab_record:
                #if not "hostname" in record:
                    #raise MissingSfaInfo("hostname")
                #slab_record["hostname"] = record["hostname"]
            #if not "model" in slab_record:
                #slab_record["model"] = "geni"

        #elif type == "authority":
            #slab_record["login_base"] = hrn_to_slab_login_base(hrn)

            #if not "name" in slab_record:
                #slab_record["name"] = hrn

            #if not "abbreviated_name" in slab_record:
                #slab_record["abbreviated_name"] = hrn

            #if not "enabled" in slab_record:
                #slab_record["enabled"] = True

            #if not "is_public" in slab_record:
                #slab_record["is_public"] = True
        # NOTE(review): trailing `return slab_record` missing in this copy.
    def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
        """Submit an OAR job reserving *added_nodes* for the slice, then
        store the node list for the wrapper and invoke the senslab
        experiment wrapper.

        NOTE(review): this copy has lost many lines (initialisations of
        reqdict/nodeid_list/site_list, several if/else and try/except
        headers); inline notes mark the gaps — compare with upstream.

        :param slice_dict: slice info; 'name' and optionally 'timeslot'.
        :param added_nodes: hostnames of the nodes to reserve.
        :param slice_user: OAR login used for the REST calls.
        """
        slice_name = slice_dict['name']
        # NOTE(review): `try:` header missing (KeyError fallback below).
        slot = slice_dict['timeslot']
        print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR slot %s " %(slot)
        # NOTE(review): `except KeyError:` header missing here.
        #Running on default parameters
        #XP immediate , 10 mins
        slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min

        # NOTE(review): `reqdict = {}`, `nodeid_list = []` and
        # `site_list = []` initialisations are missing in this copy.
        # Build the OAR "property" SQL-ish clause listing the node ids.
        reqdict['property'] ="network_address in ("
        for node in added_nodes:
            #Get the ID of the node : remove the root auth and put the site in a separate list
            # NT: it's not clear for me if the nodenames will have the senslab prefix
            # so lets take the last part only, for now.
            # NOTE(review): the `s = node.split(".")` / `lastpart = s[-1]`
            # lines are missing in this copy.
            #if s[0] == self.root_auth :
            # Again here it's not clear if nodes will be prefixed with <site>_, lets split and take the last part for now.
            s=lastpart.split("_")
            # NOTE(review): `nodeid = s[-1]` assignment missing here.
            reqdict['property'] += "'"+ nodeid +"', "
            nodeid_list.append(nodeid)
            #site_list.append( l[0] )

        # Drop the trailing ", " and close the property clause.
        reqdict['property'] = reqdict['property'][0: len( reqdict['property'])-2] +")"
        reqdict['resource'] ="network_address="+ str(len(nodeid_list))

        # NOTE(review): an `if slot['duration']:` guard appears to be
        # missing before the walltime computation.
        walltime = slot['duration'].split(":")
        # Fixing the walltime by adding a few delays. First put the walltime in seconds
        # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
        # take in account prologue and epilogue scripts execution
        # int walltimeAdditionalDelay = 120; additional delay
        desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
        total_walltime = desired_walltime + 140 #+2 min 20
        sleep_walltime = desired_walltime + 20 #+20 sec
        print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s total_walltime %s sleep_walltime %s " %(desired_walltime,total_walltime,sleep_walltime)
        #Put the walltime back in str form
        #Get the hours
        walltime[0] = str(total_walltime / 3600)
        total_walltime = total_walltime - 3600 * int(walltime[0])
        #Get the remaining minutes
        walltime[1] = str(total_walltime / 60)
        total_walltime = total_walltime - 60 * int(walltime[1])
        #Get the seconds
        walltime[2] = str(total_walltime)
        print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR walltime %s " %(walltime)

        reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2])
        reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
        # NOTE(review): `else:` header missing (default 10-minute branch).
        reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
        reqdict['script_path'] = "/bin/sleep 620" #+20 sec

        #In case of a scheduled experiment (not immediate)
        #To run an XP immediately, don't specify date and time in RSpec
        #They will be set to None.
        if slot['date'] and slot['start_time']:
            # `is ''` compares identity, not equality; `== ''` is safer.
            if slot['timezone'] is '' or slot['timezone'] is None:
                #assume it is server timezone
                server_timestamp,server_tz = self.GetTimezone()
                from_zone=tz.gettz(server_tz)
                print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR timezone not specified server_tz %s from_zone %s" %(server_tz,from_zone)
            # NOTE(review): `else:` header missing here.
            #Get zone of the user from the reservation time given in the rspec
            from_zone = tz.gettz(slot['timezone'])

            date = str(slot['date']) + " " + str(slot['start_time'])
            user_datetime = datetime.strptime(date, self.time_format)
            user_datetime = user_datetime.replace(tzinfo = from_zone)
            # NOTE(review): `to_zone = tz.tzutc()`-style assignment missing.
            utc_date = user_datetime.astimezone(to_zone)
            #Readable time accepted by OAR
            reqdict['reservation']= utc_date.strftime(self.time_format)

            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation'])

        # NOTE(review): `else:` header missing (immediate-reservation branch).
        # reservations are performed in the oar server timebase, so :
        # 1- we get the server time(in UTC tz )/server timezone
        # 2- convert the server UTC time in its timezone
        # 3- add a custom delay to this time
        # 4- convert this time to a readable form and it for the reservation request.
        server_timestamp,server_tz = self.GetTimezone()
        s_tz=tz.gettz(server_tz)
        UTC_zone = tz.gettz("UTC")
        #weird... datetime.fromtimestamp should work since we do from datetime import datetime
        utc_server= datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
        server_localtime=utc_server.astimezone(s_tz)

        print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
        readable_time = server_localtime.strftime(self.time_format)

        print >>sys.stderr," \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s " %(readable_time ,server_timestamp)
        reqdict['reservation'] = readable_time

        reqdict['type'] = "deploy"
        reqdict['directory']= ""
        reqdict['name']= "TestSandrine"

        # first step : start the OAR job and update the job
        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR reqdict %s \r\n site_list %s" %(reqdict,site_list)

        answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s " %(answer)
        # NOTE(review): `try: jobid = answer['id'] / except KeyError:` lines
        # are missing around the failure report below.
        print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job %s " %( answer)
        # NOTE(review): an early `return` on failure appears to be missing.

        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user)
        self.db.update_job( slice_name, jobid ,added_nodes)

        # second step : configure the experiment
        # we need to store the nodes in a yaml (well...) file like this :
        # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
        f=open('/tmp/sfa/'+str(jobid)+'.json','w')
        # NOTE(review): the opening `f.write('[')` is missing in this copy.
        f.write(str(added_nodes[0].strip('node')))
        for node in added_nodes[1:len(added_nodes)] :
            f.write(','+node.strip('node'))
        # NOTE(review): `f.write(']')` and `f.close()` are missing here —
        # without them the file stays unterminated/unflushed.

        # third step : call the senslab-experiment wrapper
        #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
        javacmdline="/usr/bin/java"
        jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
        #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
        output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]

        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns %s " %(output)
        # NOTE(review): trailing `return` missing in this copy.
    #Delete the jobs and updates the job id in the senslab table
    #Does not clear the node list
    def DeleteSliceFromNodes(self, slice_record):
        """Cancel the slice's OAR job and reset its job id to -1 in the
        senslab DB; the stored node list is left untouched."""
        # Get user information
        self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
        self.db.update_job(slice_record['hrn'], job_id = -1)
924 def augment_records_with_testbed_info (self, sfa_records):
925 return self.fill_record_info (sfa_records)
    def fill_record_info(self, record_list):
        """
        Given a SFA record, fill in the senslab specific and SFA specific
        fields in the record.
        """
        logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
        if not isinstance(record_list, list):
            record_list = [record_list]

        # NOTE(review): a `try:` header is missing (see log_exc below).
        # NOTE(review): the loop appends to record_list while iterating it
        # (see the 'user' branch) — mutation during iteration; upstream
        # relies on this to process the appended slice record.
        for record in record_list:
            #If the record is a SFA slice record, then add information
            #about the user of this slice. This kind of information is in the
            #Senslab's DB.
            if str(record['type']) == 'slice':
                #Get slab slice record.
                # NOTE(review): the value of the first argument is missing in
                # this copy (presumably `str(record['hrn'])`).
                recslice = self.GetSlices(slice_filter = \
                                            slice_filter_type = 'slice_hrn')
                recuser = dbsession.query(RegRecord).filter_by(record_id = \
                                            recslice['record_id_user']).first()
                logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
                                            rec %s \r\n \r\n" %(recslice))
                record.update({'PI':[recuser.hrn],
                            'researcher': [recuser.hrn],
                            'name':record['hrn'],
                            'oar_job_id':recslice['oar_job_id'],
                            # NOTE(review): a 'node_ids' entry line is
                            # missing from this copy.
                            'person_ids':[recslice['record_id_user']],
                            'geni_urn':'', #For client_helper.py compatibility
                            'keys':'', #For client_helper.py compatibility
                            'key_ids':''}) #For client_helper.py compatibility

            elif str(record['type']) == 'user':
                #The record is a SFA user record.
                #Get the information about his slice from Senslab's DB
                #and add it to the user record.
                recslice = self.GetSlices(slice_filter = \
                                        record['record_id'],\
                                        slice_filter_type = 'record_id_user')
                logger.debug( "SLABDRIVER.PY \t fill_record_info user \
                                        rec %s \r\n \r\n" %(recslice))
                #Append slice record in records list,
                #therefore fetches user and slice info again(one more loop)
                #Will update PIs and researcher for the slice
                recuser = dbsession.query(RegRecord).filter_by(record_id = \
                                        recslice['record_id_user']).first()
                recslice.update({'PI':[recuser.hrn],
                            'researcher': [recuser.hrn],
                            'name':record['hrn'],
                            'oar_job_id':recslice['oar_job_id'],
                            # NOTE(review): a 'node_ids' entry line is
                            # missing from this copy.
                            'person_ids':[recslice['record_id_user']]})

                #GetPersons takes [] as filters
                #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
                user_slab = self.GetPersons([record])

                recslice.update({'type':'slice','hrn':recslice['slice_hrn']})
                record.update(user_slab[0])
                #For client_helper.py compatibility
                # NOTE(review): the remaining entries of this dict ('keys',
                # 'key_ids') and its closing brace are missing in this copy.
                record.update( { 'geni_urn':'',
                record_list.append(recslice)

                logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
                                INFO TO USER records %s" %(record_list))
        # NOTE(review): an `except TypeError, e:` header is missing before
        # this orphan handler body.
        logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s" %(e))
        # NOTE(review): trailing `return record_list` missing in this copy.

        #self.fill_record_slab_info(records)
        ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)
        #self.fill_record_sfa_info(records)
        #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1013 #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1014 ## get a list of the HRNs tht are members of the old and new records
1016 #oldList = oldRecord.get(listName, [])
1019 #newList = record.get(listName, [])
1021 ## if the lists are the same, then we don't have to update anything
1022 #if (oldList == newList):
1025 ## build a list of the new person ids, by looking up each person to get
1029 #records = table.find({'type': 'user', 'hrn': newList})
1030 #for rec in records:
1031 #newIdList.append(rec['pointer'])
1033 ## build a list of the old person ids from the person_ids field
1035 #oldIdList = oldRecord.get("person_ids", [])
1036 #containerId = oldRecord.get_pointer()
1038 ## if oldRecord==None, then we are doing a Register, instead of an
1041 #containerId = record.get_pointer()
1043 ## add people who are in the new list, but not the oldList
1044 #for personId in newIdList:
1045 #if not (personId in oldIdList):
1046 #addFunc(self.plauth, personId, containerId)
1048 ## remove people who are in the old list, but not the new list
1049 #for personId in oldIdList:
1050 #if not (personId in newIdList):
1051 #delFunc(self.plauth, personId, containerId)
1053 #def update_membership(self, oldRecord, record):
1054 #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1055 #if record.type == "slice":
1056 #self.update_membership_list(oldRecord, record, 'researcher',
1057 #self.users.AddPersonToSlice,
1058 #self.users.DeletePersonFromSlice)
1059 #elif record.type == "authority":
1064 # I don't think you plan on running a component manager at this point
1065 # let me clean up the mess of ComponentAPI that is deprecated anyways