4 from datetime import datetime
5 from dateutil import tz
6 from time import strftime,gmtime
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
16 from sfa.trust.credential import Credential
17 from sfa.trust.gid import GID
19 from sfa.managers.driver import Driver
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
23 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
24 from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
26 ## thierry: everything that is API-related (i.e. handling incoming requests)
28 # SlabDriver should be really only about talking to the senslab testbed
31 from sfa.senslab.OARrestapi import OARrestapi
32 from sfa.senslab.LDAPapi import LDAPapi
34 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
35 from sfa.senslab.slabaggregate import SlabAggregate
36 from sfa.senslab.slabslices import SlabSlices
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
44 class SlabDriver(Driver):
46 def __init__(self, config):
47 Driver.__init__ (self, config)
49 self.hrn = config.SFA_INTERFACE_HRN
51 self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
53 self.oar = OARrestapi()
55 self.time_format = "%Y-%m-%d %H:%M:%S"
56 self.db = SlabDB(config,debug = True)
60 def sliver_status(self,slice_urn,slice_hrn):
61 """Receive a status request for slice named urn/hrn
62 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
63 shall return a structure as described in
64 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
65 NT : not sure if we should implement this or not, but used by sface.
69 #First get the slice with the slice hrn
70 sl = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
72 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
74 nodes_in_slice = sl['node_ids']
75 if len(nodes_in_slice) is 0:
76 raise SliverDoesNotExist("No slivers allocated ")
78 logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
79 %s \r\n " %(slice_urn,slice_hrn,sl) )
81 if sl['oar_job_id'] is not -1:
82 #A job is running on Senslab for this slice
83 # report about the local nodes that are in the slice only
85 nodes_all = self.GetNodes({'hostname':nodes_in_slice},
86 ['node_id', 'hostname','site','boot_state'])
87 nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
91 top_level_status = 'unknown'
93 top_level_status = 'ready'
94 result['geni_urn'] = slice_urn
95 result['pl_login'] = sl['job_user'] #For compatibility
98 timestamp = float(sl['startTime']) + float(sl['walltime'])
99 result['pl_expires'] = strftime(self.time_format, \
100 gmtime(float(timestamp)))
101 #result['slab_expires'] = strftime(self.time_format,\
102 #gmtime(float(timestamp)))
107 #res['slab_hostname'] = node['hostname']
108 #res['slab_boot_state'] = node['boot_state']
110 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
111 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
112 res['pl_last_contact'] = strftime(self.time_format, \
113 gmtime(float(timestamp)))
114 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
115 nodeall_byhostname[node]['node_id'])
116 res['geni_urn'] = sliver_id
117 if nodeall_byhostname[node]['boot_state'] == 'Alive':
119 res['geni_status'] = 'ready'
121 res['geni_status'] = 'failed'
122 top_level_status = 'failed'
124 res['geni_error'] = ''
126 resources.append(res)
128 result['geni_status'] = top_level_status
129 result['geni_resources'] = resources
130 print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
134 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
135 print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
136 aggregate = SlabAggregate(self)
138 slices = SlabSlices(self)
139 peer = slices.get_peer(slice_hrn)
140 sfa_peer = slices.get_sfa_peer(slice_hrn)
143 if not isinstance(creds, list):
147 slice_record = users[0].get('slice_record', {})
150 rspec = RSpec(rspec_string)
151 print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ============================rspec.version %s " %(rspec.version)
154 # ensure site record exists?
155 # ensure slice record exists
156 slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
157 requested_attributes = rspec.version.get_slice_attributes()
159 if requested_attributes:
160 for attrib_dict in requested_attributes:
161 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
162 slice.update({'timeslot':attrib_dict['timeslot']})
163 print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... slice %s " %(slice)
164 # ensure person records exists
165 persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
166 # ensure slice attributes exists?
169 # add/remove slice from nodes
170 print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... "
172 requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
173 print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
174 nodes = slices.verify_slice_nodes(slice, requested_slivers, peer)
177 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
180 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
182 slice = self.GetSlices(slice_filter= slice_hrn, slice_filter_type = 'slice_hrn')
183 print>>sys.stderr, "\r\n \r\n \t\t SLABDRIVER.PY delete_sliver slice %s" %(slice)
187 slices = SlabSlices(self)
188 # determine if this is a peer slice
189 # xxx I wonder if this would not need to use PlSlices.get_peer instead
190 # in which case plc.peers could be deprecated as this here
191 # is the only/last call to this last method in plc.peers
192 peer = slices.get_peer(slice_hrn)
195 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
196 self.DeleteSliceFromNodes(slice)
199 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
def AddSlice(self, slice_record):
    """Store a new slice row in the senslab database.

    Builds a SliceSenslab object from the 'slice_hrn', 'record_id_slice',
    'record_id_user' and 'peer_authority' entries of slice_record, adds it
    to slab_dbsession and commits. A missing entry raises KeyError.
    Nothing is returned.
    """
    # Same four lookups, in the same order, as the keyword arguments of
    # the SliceSenslab constructor.
    column_names = ('slice_hrn', 'record_id_slice',
                    'record_id_user', 'peer_authority')
    columns = dict((name, slice_record[name]) for name in column_names)
    slab_slice = SliceSenslab(**columns)
    print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,slab_slice)
    slab_dbsession.add(slab_slice)
    slab_dbsession.commit()
210 # first 2 args are None in case of resource discovery
211 def list_resources (self, slice_urn, slice_hrn, creds, options):
212 #cached_requested = options.get('cached', True)
214 version_manager = VersionManager()
215 # get the rspec's return format from options
216 rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
217 version_string = "rspec_%s" % (rspec_version)
219 #panos adding the info option to the caching key (can be improved)
220 if options.get('info'):
221 version_string = version_string + "_"+options.get('info', 'default')
223 # look in cache first
224 #if cached_requested and self.cache and not slice_hrn:
225 #rspec = self.cache.get(version_string)
227 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
230 #panos: passing user-defined options
232 aggregate = SlabAggregate(self)
233 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
234 options.update({'origin_hrn':origin_hrn})
235 rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
237 print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec "
239 #if self.cache and not slice_hrn:
240 #logger.debug("Slab.ListResources: stores advertisement in cache")
241 #self.cache.add(version_string, rspec)
def list_slices (self, creds, options):
    """Return the URNs of all the slices known to this driver.

    creds and options are accepted for interface compatibility and are
    not used here. Each record returned by GetSlices() is turned into an
    hrn with slicename_to_hrn(self.hrn, ...) and then into a slice URN.
    Result caching through self.cache is currently disabled.
    """
    print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
    slices = self.GetSlices()
    # First pass: hrn of every slice, second pass: matching URN.
    slice_hrns = []
    for slice_rec in slices:
        slice_hrns.append(slicename_to_hrn(self.hrn, slice_rec['slice_hrn']))
    slice_urns = []
    for one_hrn in slice_hrns:
        slice_urns.append(hrn_to_urn(one_hrn, 'slice'))
    return slice_urns
267 #No site or node register supported
268 def register (self, sfa_record, hrn, pub_key):
269 type = sfa_record['type']
270 slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
274 acceptable_fields=['url', 'instantiation', 'name', 'description']
275 for key in slab_record.keys():
276 if key not in acceptable_fields:
278 print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
279 slices = self.GetSlices(slice_filter =slab_record['hrn'], slice_filter_type = 'slice_hrn')
281 pointer = self.AddSlice(slab_record)
283 pointer = slices[0]['slice_id']
286 persons = self.GetPersons([sfa_record])
287 #persons = self.GetPersons([sfa_record['hrn']])
289 pointer = self.AddPerson(dict(sfa_record))
292 pointer = persons[0]['person_id']
294 #Does this make sense to senslab ?
295 #if 'enabled' in sfa_record and sfa_record['enabled']:
296 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
298 # add this person to the site only if she is being added for the first
299 # time by sfa and doesont already exist in plc
300 if not persons or not persons[0]['site_ids']:
301 login_base = get_leaf(sfa_record['authority'])
302 self.AddPersonToSite(pointer, login_base)
304 # What roles should this user have?
305 self.AddRoleToPerson('user', pointer)
308 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
310 #No node adding outside OAR
314 #No site or node record update allowed
315 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
316 pointer = old_sfa_record['pointer']
317 type = old_sfa_record['type']
319 # new_key implemented for users only
320 if new_key and type not in [ 'user' ]:
321 raise UnknownSfaType(type)
323 #if (type == "authority"):
324 #self.shell.UpdateSite(pointer, new_sfa_record)
327 slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
328 if 'name' in slab_record:
329 slab_record.pop('name')
330 self.UpdateSlice(pointer, slab_record)
334 all_fields = new_sfa_record
335 for key in all_fields.keys():
336 if key in ['first_name', 'last_name', 'title', 'email',
337 'password', 'phone', 'url', 'bio', 'accepted_aup',
339 update_fields[key] = all_fields[key]
340 self.UpdatePerson(pointer, update_fields)
343 # must check this key against the previous one if it exists
344 persons = self.GetPersons([pointer], ['key_ids'])
346 keys = person['key_ids']
347 keys = self.GetKeys(person['key_ids'])
349 # Delete all stale keys
352 if new_key != key['key']:
353 self.DeleteKey(key['key_id'])
357 self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
363 def remove (self, sfa_record):
364 type=sfa_record['type']
365 hrn=sfa_record['hrn']
366 record_id= sfa_record['record_id']
368 username = hrn.split(".")[len(hrn.split(".")) -1]
370 persons = self.GetPersons(sfa_record)
371 #persons = self.GetPersons(username)
372 # only delete this person if he has site ids. if he doesnt, it probably means
373 # he was just removed from a site, not actually deleted
374 if persons and persons[0]['site_ids']:
375 self.DeletePerson(username)
376 elif type == 'slice':
377 if self.GetSlices(slice_filter = hrn, slice_filter_type = 'slice_hrn'):
378 self.DeleteSlice(hrn)
380 #elif type == 'authority':
381 #if self.GetSites(pointer):
382 #self.DeleteSite(pointer)
386 def GetPeers (self,auth = None, peer_filter=None, return_fields_list=None):
388 existing_records = {}
389 existing_hrns_by_types= {}
390 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields_list)
391 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
392 for record in all_records:
393 existing_records[(record.hrn,record.type)] = record
394 if record.type not in existing_hrns_by_types:
395 existing_hrns_by_types[record.type] = [record.hrn]
396 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
399 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN type %s hrn %s " %( record.type,record.hrn )
400 existing_hrns_by_types[record.type].append(record.hrn)
401 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
402 #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
404 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types %s " %( existing_hrns_by_types)
408 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types['authority+sa'] %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
410 records_list.append(existing_records[(peer_filter,'authority')])
412 for hrn in existing_hrns_by_types['authority']:
413 records_list.append(existing_records[(hrn,'authority')])
415 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers records_list %s " %(records_list)
420 return_records = records_list
421 if not peer_filter and not return_fields_list:
425 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers return_records %s " %(return_records)
426 return return_records
429 #TODO : Handling OR request in make_ldap_filters_from_records instead of the for loop
430 #over the records' list
431 def GetPersons(self, person_filter=None, return_fields_list=None):
433 person_filter should be a list of dictionnaries when not set to None.
434 Returns a list of users found.
437 print>>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)
439 if person_filter and isinstance(person_filter,list):
440 #If we are looking for a list of users (list of dict records)
441 #Usually the list contains only one user record
442 for f in person_filter:
443 person = self.ldap.LdapFindUser(f)
444 person_list.append(person)
447 person_list = self.ldap.LdapFindUser()
def GetTimezone(self):
    """Query OAR for its timestamp and timezone.

    Sends the "GET_timezone" request through self.oar.parser and returns
    the two values it yields as a (server_timestamp, server_tz) tuple.
    """
    timestamp, zone_name = self.oar.parser.SendRequest("GET_timezone")
    return (timestamp, zone_name)
def DeleteJobs(self, job_id, slice_hrn):
    """Ask OAR to delete the job job_id on behalf of the slice owner.

    job_id    -- identifier of the OAR job; nothing is done when it is falsy.
    slice_hrn -- hrn of the slice; the user name is its last dotted
                 component with a trailing "_slice" removed.

    Posts a 'DELETE_jobs_id' request to the OAR REST API and logs the
    answer. Returns None.
    """
    if not job_id:
        # No job to delete.
        return

    # str.rstrip("_slice") removes any trailing run of the characters
    # '_', 's', 'l', 'i', 'c', 'e' (so "alice_slice" would become "a");
    # only the literal suffix must be dropped.
    username = slice_hrn.split(".")[-1]
    suffix = "_slice"
    if username.endswith(suffix):
        username = username[:-len(suffix)]

    reqdict = {}
    reqdict['method'] = "delete"
    reqdict['strval'] = str(job_id)
    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id',reqdict,username)
    logger.debug("\r\n \r\n jobid DeleteJobs %s " %(answer))
467 def GetJobsId(self, job_id, username = None ):
469 Details about a specific job.
470 Includes details about submission time, jot type, state, events,
471 owner, assigned ressources, walltime etc...
475 node_list_k = 'assigned_network_address'
476 #Get job info from OAR
477 job_info = self.oar.parser.SendRequest(req, job_id, username)
479 logger.debug("SLABDRIVER \t GetJobs %s " %(job_info))
481 if job_info['state'] == 'Terminated':
482 logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
485 if job_info['state'] == 'Error':
486 logger.debug("SLABDRIVER \t GetJobsId ERROR message %s "\
491 logger.error("SLABDRIVER \tGetJobsId KeyError")
494 parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
495 #Replaces the previous entry "assigned_network_address" / "reserved_resources"
497 job_info.update({'node_ids':parsed_job_info[node_list_k]})
498 del job_info[node_list_k]
499 logger.debug(" \r\nSLABDRIVER \t GetJobsId job_info %s " %(job_info))
def GetJobsResources(self,job_id, username = None):
    """Fetch from OAR the resources attached to a job.

    job_id   -- identifier of the OAR job.
    username -- user on whose behalf the request is sent (optional).

    Sends "GET_jobs_id_resources" through self.oar.parser, then replaces
    the 'reserved_resources' entry of the answer with a 'node_ids' entry
    holding the hostnames computed by get_info_on_reserved_nodes.
    Returns the updated job description.
    """
    req = "GET_jobs_id_resources"
    node_list_k = 'reserved_resources'

    #Get job info from OAR
    job_info = self.oar.parser.SendRequest(req, job_id, username)
    logger.debug("SLABDRIVER \t GetJobsResources %s " %(job_info))

    # get_info_on_reserved_nodes returns a plain list of hostnames, so it
    # is stored as is; indexing it with node_list_k raised TypeError.
    reserved_hostnames = self.get_info_on_reserved_nodes(job_info,node_list_k)
    #Replaces the previous entry "reserved_resources" with "node_ids"
    job_info.update({'node_ids':reserved_hostnames})
    del job_info[node_list_k]
    return job_info
def get_info_on_reserved_nodes(self,job_info,node_list_name):
    """List the hostnames of the testbed nodes reserved by a job.

    job_info       -- job description; job_info[node_list_name] holds the
                      names of the reserved nodes.
    node_list_name -- key of that node list inside job_info.

    Each reserved name is looked up among the records returned by
    GetNodes(), keyed on their 'hostname'. A KeyError (node list missing
    from job_info, or a name unknown to the testbed) is logged and the
    hostnames collected so far are returned.

    NOTE(review): GetJobsId and GetJobsResources index the returned list
    with node_list_name -- confirm which return shape is intended.
    """
    #Get the list of the testbed nodes records and make a
    #dictionary keyed on the hostname out of it
    node_dict = dict((node['hostname'], node) for node in self.GetNodes())

    reserved_node_hostname_list = []
    try:
        # Append rather than assign by index: the list starts empty, so
        # reserved_node_hostname_list[index] = ... raised IndexError.
        for reserved_name in job_info[node_list_name]:
            reserved_node_hostname_list.append(
                node_dict[reserved_name]['hostname'])

        logger.debug("SLABDRIVER \t get_info_on_reserved_nodes reserved_node_hostname_list %s" \
                     %(reserved_node_hostname_list))
    except KeyError:
        logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )

    return reserved_node_hostname_list
548 def GetReservedNodes(self):
549 # this function returns a list of all the nodes already involved in an oar job
562 return self.oar.parser.SendRequest("GET_reserved_nodes")
565 def GetNodes(self,node_filter_dict = None, return_fields_list = None):
567 node_filter_dict : dictionnary of lists
570 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
571 node_dict_list = node_dict_by_id.values()
573 #No filtering needed return the list directly
574 if not (node_filter_dict or return_fields_list):
575 return node_dict_list
577 return_node_list = []
579 for filter_key in node_filter_dict:
581 #Filter the node_dict_list by each value contained in the
582 #list node_filter_dict[filter_key]
583 for value in node_filter_dict[filter_key]:
584 for node in node_dict_list:
585 if node[filter_key] == value:
586 if return_fields_list :
588 for k in return_fields_list:
590 return_node_list.append(tmp)
592 return_node_list.append(node)
594 logger.log_exc("GetNodes KeyError")
598 return return_node_list
601 def GetSites(self, site_filter_name = None, return_fields_list = None):
602 site_dict = self.oar.parser.SendRequest("GET_sites")
603 #site_dict : dict where the key is the sit ename
604 return_site_list = []
605 if not ( site_filter_name or return_fields_list):
606 return_site_list = site_dict.values()
607 return return_site_list
609 if site_filter_name in site_dict:
610 if return_fields_list:
611 for field in return_fields_list:
615 tmp[field] = site_dict[site_filter_name][field]
617 logger.error("GetSites KeyError %s "%(field))
619 return_site_list.append(tmp)
621 return_site_list.append( site_dict[site_filter_name])
624 return return_site_list
627 def GetSlices(self, slice_filter = None, slice_filter_type = None, \
628 return_fields_list=None):
629 return_slice_list = []
632 authorized_filter_types_list = ['slice_hrn', 'record_id_user']
633 print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices authorized_filter_types_list %s" %(authorized_filter_types_list)
634 if slice_filter_type in authorized_filter_types_list:
635 if slice_filter_type == 'slice_hrn':
636 slicerec = slab_dbsession.query(SliceSenslab).\
637 filter_by(slice_hrn = slice_filter).first()
639 if slice_filter_type == 'record_id_user':
640 slicerec = slab_dbsession.query(SliceSenslab).\
641 filter_by(record_id_user = slice_filter).first()
644 rec = slicerec.dump_sqlalchemyobj_to_dict()
645 print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices rec %s" %(rec)
647 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
648 logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"\
650 if slicerec.oar_job_id is not -1:
651 #Check with OAR the status of the job if a job id is in
653 #rslt = self.GetJobsResources(slicerec.oar_job_id,username = login)
654 rslt = self.GetJobsId(slicerec.oar_job_id,username = login)
657 rec.update({'hrn':str(rec['slice_hrn'])})
658 #If GetJobsResources is empty, this means the job is now in the 'Terminated' state
659 #Update the slice record
661 self.db.update_job(slice_filter, job_id = -1)
662 rec['oar_job_id'] = -1
663 rec.update({'hrn':str(rec['slice_hrn'])})
666 rec['node_ids'] = rec['node_list']
670 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices rec %s" %(rec)
676 return_slice_list = slab_dbsession.query(SliceSenslab).all()
678 print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(return_slice_list,slice_filter)
680 #if return_fields_list:
681 #return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields_list)
685 return return_slice_list
690 def testbed_name (self): return "senslab2"
# 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """Describe the testbed and the rspec versions it handles.

    Walks the versions known to a fresh VersionManager: those whose
    content_type is '*' or 'ad' are listed as advertisement versions,
    those whose content_type is '*' or 'request' as request versions
    (each as the dict given by to_dict()). Returns a dict with the keys
    'testbed', 'geni_request_rspec_versions' and 'geni_ad_rspec_versions'.
    """
    advertisement_versions = []
    request_versions = []
    for candidate in VersionManager().versions:
        if candidate.content_type in ('*', 'ad'):
            advertisement_versions.append(candidate.to_dict())
        if candidate.content_type in ('*', 'request'):
            request_versions.append(candidate.to_dict())

    version_info = {}
    version_info['testbed'] = self.testbed_name()
    version_info['geni_request_rspec_versions'] = request_versions
    version_info['geni_ad_rspec_versions'] = advertisement_versions
    return version_info
714 # Convert SFA fields to PLC fields for use when registering up updating
715 # registry record in the PLC database
717 # @param type type of record (user, slice, ...)
718 # @param hrn human readable name
719 # @param sfa_fields dictionary of SFA fields
720 # @param slab_fields dictionary of PLC fields (output)
722 def sfa_fields_to_slab_fields(self, type, hrn, record):
724 def convert_ints(tmpdict, int_fields):
725 for field in int_fields:
727 tmpdict[field] = int(tmpdict[field])
730 #for field in record:
731 # slab_record[field] = record[field]
734 #instantion used in get_slivers ?
735 if not "instantiation" in slab_record:
736 slab_record["instantiation"] = "senslab-instantiated"
737 slab_record["hrn"] = hrn_to_pl_slicename(hrn)
738 print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
740 slab_record["url"] = record["url"]
741 if "description" in record:
742 slab_record["description"] = record["description"]
743 if "expires" in record:
744 slab_record["expires"] = int(record["expires"])
746 #nodes added by OAR only and then imported to SFA
747 #elif type == "node":
748 #if not "hostname" in slab_record:
749 #if not "hostname" in record:
750 #raise MissingSfaInfo("hostname")
751 #slab_record["hostname"] = record["hostname"]
752 #if not "model" in slab_record:
753 #slab_record["model"] = "geni"
756 #elif type == "authority":
757 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
759 #if not "name" in slab_record:
760 #slab_record["name"] = hrn
762 #if not "abbreviated_name" in slab_record:
763 #slab_record["abbreviated_name"] = hrn
765 #if not "enabled" in slab_record:
766 #slab_record["enabled"] = True
768 #if not "is_public" in slab_record:
769 #slab_record["is_public"] = True
774 def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
780 slice_name = slice_dict['name']
782 slot = slice_dict['timeslot']
783 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slot %s" %(slot))
785 #Running on default parameters
786 #XP immediate , 10 mins
787 slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min
789 reqdict['workdir']= '/tmp'
790 reqdict['resource'] ="{network_address in ("
791 #reqdict['property'] ="network_address in ("
792 for node in added_nodes:
793 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR node %s" %(node)
795 #Get the ID of the node : remove the root auth and put the site in a separate list
797 # NT: it's not clear for me if the nodenames will have the senslab prefix
798 # so lets take the last part only, for now.
800 #if s[0] == self.root_auth :
801 # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
802 #s=lastpart.split("_")
805 reqdict['resource'] += "'"+ nodeid +"', "
806 nodeid_list.append(nodeid)
809 reqdict['resource'] = reqdict['resource'][0: len( reqdict['resource'])-2] +")}/nodes=" + str(len(nodeid_list))
811 walltime = slot['duration'].split(":")
812 # Fixing the walltime by adding a few delays. First put the walltime in seconds
813 # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
814 # take in account prologue and epilogue scripts execution
815 # int walltimeAdditionalDelay = 120; additional delay
817 desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
818 total_walltime = desired_walltime + 140 #+2 min 20
819 sleep_walltime = desired_walltime + 20 #+20 sec
820 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s total_walltime %s sleep_walltime %s " %(desired_walltime,total_walltime,sleep_walltime)
821 #Put the walltime back in str form
823 walltime[0] = str(total_walltime / 3600)
824 total_walltime = total_walltime - 3600 * int(walltime[0])
825 #Get the remaining minutes
826 walltime[1] = str(total_walltime / 60)
827 total_walltime = total_walltime - 60 * int(walltime[1])
829 walltime[2] = str(total_walltime)
830 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR walltime %s " %(walltime)
832 reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2])
833 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
835 reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
836 reqdict['script_path'] = "/bin/sleep 620" #+20 sec
837 #In case of a scheduled experiment (not immediate)
838 #To run an XP immediately, don't specify date and time in RSpec
839 #They will be set to None.
840 server_timestamp,server_tz = self.GetTimezone()
841 if slot['date'] and slot['start_time']:
842 if slot['timezone'] is '' or slot['timezone'] is None:
843 #assume it is server timezone
844 from_zone=tz.gettz(server_tz)
845 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR timezone not specified server_tz %s from_zone %s" %(server_tz,from_zone)
847 #Get zone of the user from the reservation time given in the rspec
848 from_zone = tz.gettz(slot['timezone'])
850 date = str(slot['date']) + " " + str(slot['start_time'])
851 user_datetime = datetime.strptime(date, self.time_format)
852 user_datetime = user_datetime.replace(tzinfo = from_zone)
854 #Convert to server zone
855 #to_zone = tz.tzutc()
856 to_zone = tz.gettz(server_tz)
857 reservation_date = user_datetime.astimezone(to_zone)
858 #Readable time accpeted by OAR
859 reqdict['reservation']= reservation_date.strftime(self.time_format)
861 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation'])
865 # reservations are performed in the oar server timebase, so :
866 # 1- we get the server time(in UTC tz )/server timezone
867 # 2- convert the server UTC time in its timezone
868 # 3- add a custom delay to this time
869 # 4- convert this time to a readable form and it for the reservation request.
870 server_timestamp,server_tz = self.GetTimezone()
871 s_tz=tz.gettz(server_tz)
872 UTC_zone = tz.gettz("UTC")
873 #weird... datetime.fromtimestamp should work since we do from datetime import datetime
874 utc_server= datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
875 server_localtime=utc_server.astimezone(s_tz)
877 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
878 readable_time = server_localtime.strftime(self.time_format)
880 print >>sys.stderr," \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s " %(readable_time ,server_timestamp)
881 reqdict['reservation'] = readable_time
884 reqdict['type'] = "deploy"
885 reqdict['directory']= ""
886 reqdict['name']= "TestSandrine"
889 # first step : start the OAR job and update the job
890 logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s \r\n site_list %s" %(reqdict,site_list) )
892 answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
893 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s " %(answer)
897 print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job %s " %( answer)
900 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user)
901 self.db.update_job( slice_name, jobid ,added_nodes)
904 # second step : configure the experiment
905 # we need to store the nodes in a yaml (well...) file like this :
906 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
907 f=open('/tmp/sfa/'+str(jobid)+'.json','w')
909 f.write(str(added_nodes[0].strip('node')))
910 for node in added_nodes[1:len(added_nodes)] :
911 f.write(','+node.strip('node'))
915 # third step : call the senslab-experiment wrapper
916 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
917 javacmdline="/usr/bin/java"
918 jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
919 #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
920 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
922 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns %s " %(output)
#Deletes the slice's OAR job and resets the job id in the senslab table.
#Does not clear the node list
def DeleteSliceFromNodes(self, slice_record):
    """Cancel the OAR job of a slice and mark the slice as job-less.

    slice_record must provide 'oar_job_id' and 'hrn'. The job is deleted
    through DeleteJobs, then the slice's job id is set to -1 in the
    senslab database with self.db.update_job.
    """
    job_id = slice_record['oar_job_id']
    slice_hrn = slice_record['hrn']
    self.DeleteJobs(job_id, slice_hrn)
    # -1 is the value used for "no job" on the slice record.
    self.db.update_job(slice_hrn, job_id = -1)
def augment_records_with_testbed_info (self, sfa_records):
    """Pass sfa_records to fill_record_info and return what it returns."""
    augmented_records = self.fill_record_info(sfa_records)
    return augmented_records
943 def fill_record_info(self, record_list):
945 Given a SFA record, fill in the senslab specific and SFA specific
946 fields in the record.
949 logger.debug("SLABDRIVER \tfill_record_info records %s " %(record_list))
950 if not isinstance(record_list, list):
951 record_list = [record_list]
954 for record in record_list:
955 #If the record is a SFA slice record, then add information
956 #about the user of this slice. This kind of information is in the
958 if str(record['type']) == 'slice':
959 #Get slab slice record.
960 recslice = self.GetSlices(slice_filter = \
962 slice_filter_type = 'slice_hrn')
963 recuser = dbsession.query(RegRecord).filter_by(record_id = \
964 recslice['record_id_user']).first()
965 logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \
966 rec %s \r\n \r\n" %(recslice))
967 record.update({'PI':[recuser.hrn],
968 'researcher': [recuser.hrn],
969 'name':record['hrn'],
970 'oar_job_id':recslice['oar_job_id'],
972 'person_ids':[recslice['record_id_user']],
973 'geni_urn':'', #For client_helper.py compatibility
974 'keys':'', #For client_helper.py compatibility
975 'key_ids':''}) #For client_helper.py compatibility
977 elif str(record['type']) == 'user':
978 #The record is a SFA user record.
979 #Get the information about his slice from Senslab's DB
980 #and add it to the user record.
981 recslice = self.GetSlices(slice_filter = \
982 record['record_id'],\
983 slice_filter_type = 'record_id_user')
985 logger.debug( "SLABDRIVER.PY \t fill_record_info user \
986 rec %s \r\n \r\n" %(recslice))
987 #Append slice record in records list,
988 #therefore fetches user and slice info again(one more loop)
989 #Will update PIs and researcher for the slice
990 recuser = dbsession.query(RegRecord).filter_by(record_id = \
991 recslice['record_id_user']).first()
992 recslice.update({'PI':[recuser.hrn],
993 'researcher': [recuser.hrn],
994 'name':record['hrn'],
995 'oar_job_id':recslice['oar_job_id'],
997 'person_ids':[recslice['record_id_user']]})
999 #GetPersons takes [] as filters
1000 #user_slab = self.GetPersons([{'hrn':recuser.hrn}])
1001 user_slab = self.GetPersons([record])
1003 recslice.update({'type':'slice','hrn':recslice['slice_hrn']})
1004 record.update(user_slab[0])
1005 #For client_helper.py compatibility
1006 record.update( { 'geni_urn':'',
1009 record_list.append(recslice)
1011 logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\
1012 INFO TO USER records %s" %(record_list))
1016 logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s" %(e))
1020 #self.fill_record_slab_info(records)
1021 ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)
1022 #self.fill_record_sfa_info(records)
1023 #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
1029 #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
1030 ## get a list of the HRNs tht are members of the old and new records
1032 #oldList = oldRecord.get(listName, [])
1035 #newList = record.get(listName, [])
1037 ## if the lists are the same, then we don't have to update anything
1038 #if (oldList == newList):
1041 ## build a list of the new person ids, by looking up each person to get
1045 #records = table.find({'type': 'user', 'hrn': newList})
1046 #for rec in records:
1047 #newIdList.append(rec['pointer'])
1049 ## build a list of the old person ids from the person_ids field
1051 #oldIdList = oldRecord.get("person_ids", [])
1052 #containerId = oldRecord.get_pointer()
1054 ## if oldRecord==None, then we are doing a Register, instead of an
1057 #containerId = record.get_pointer()
1059 ## add people who are in the new list, but not the oldList
1060 #for personId in newIdList:
1061 #if not (personId in oldIdList):
1062 #addFunc(self.plauth, personId, containerId)
1064 ## remove people who are in the old list, but not the new list
1065 #for personId in oldIdList:
1066 #if not (personId in newIdList):
1067 #delFunc(self.plauth, personId, containerId)
1069 #def update_membership(self, oldRecord, record):
1070 #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1071 #if record.type == "slice":
1072 #self.update_membership_list(oldRecord, record, 'researcher',
1073 #self.users.AddPersonToSlice,
1074 #self.users.DeletePersonFromSlice)
1075 #elif record.type == "authority":
1080 # I don't think you plan on running a component manager at this point
1081 # let me clean up the mess of ComponentAPI that is deprecated anyways