4 from datetime import datetime
5 from dateutil import tz
6 from time import strftime,gmtime
8 from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist
9 from sfa.util.sfalogging import logger
10 from sfa.util.defaultdict import defaultdict
12 from sfa.storage.record import Record
13 from sfa.storage.alchemy import dbsession
14 from sfa.storage.model import RegRecord
16 from sfa.trust.credential import Credential
17 from sfa.trust.gid import GID
19 from sfa.managers.driver import Driver
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
23 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
24 from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
26 ## thierry: everything that is API-related (i.e. handling incoming requests)
28 # SlabDriver should be really only about talking to the senslab testbed
31 from sfa.senslab.OARrestapi import OARrestapi
32 from sfa.senslab.LDAPapi import LDAPapi
34 from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab
35 from sfa.senslab.slabaggregate import SlabAggregate
36 from sfa.senslab.slabslices import SlabSlices
41 # this inheritance scheme is so that the driver object can receive
42 # GetNodes or GetSites sorts of calls directly
43 # and thus minimize the differences in the managers with the pl version
44 class SlabDriver(Driver):
def __init__(self, config):
    """Set up the senslab driver from the SFA configuration.

    Reads the interface hrn and the registry root authority from
    ``config`` and creates the clients used by the other methods: the OAR
    REST client (``self.oar``), the LDAP client (``self.ldap``) and the
    senslab database handle (``self.db``).

    :param config: SFA configuration object; must expose
        ``SFA_INTERFACE_HRN`` and ``SFA_REGISTRY_ROOT_AUTH``.
    """
    Driver.__init__ (self, config)
    self.hrn = config.SFA_INTERFACE_HRN
    self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
    self.oar = OARrestapi()
    # GetPersons() calls self.ldap.LdapFindUser(); without this attribute
    # every user lookup fails with AttributeError.
    # NOTE(review): assumes LDAPapi() takes no constructor argument, like
    # OARrestapi() above - confirm against sfa.senslab.LDAPapi.
    self.ldap = LDAPapi()
    # Format shared by every timestamp sent to or shown from OAR.
    self.time_format = "%Y-%m-%d %H:%M:%S"
    self.db = SlabDB(config)
60 def sliver_status(self,slice_urn,slice_hrn):
# Answer a SliverStatus request for one slice.
# The slice is looked up by hrn with GetSlices; its 'node_ids' are then
# resolved with GetNodes and reported in `result` under the keys
# 'geni_urn', 'pl_login', 'pl_expires', 'geni_status' and
# 'geni_resources' (one `res` dict per node).
# Raises SliverDoesNotExist (see the two raise statements below).
61 """Receive a status request for slice named urn/hrn
62 urn:publicid:IDN+senslab+nturro_slice hrn senslab.nturro_slice
63 shall return a structure as described in
64 http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus
65 NT : not sure if we should implement this or not, but used by sface.
69 #First get the slice with the slice hrn
70 sl = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
72 raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn))
74 nodes_in_slice = sl['node_ids']
# NOTE(review): `is 0` is an identity test on an int; `== 0` (or
# `not nodes_in_slice`) states the emptiness check reliably.
75 if len(nodes_in_slice) is 0:
76 raise SliverDoesNotExist("No slivers allocated ")
78 logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\
79 %s \r\n " %(slice_urn,slice_hrn,sl) )
# NOTE(review): `is not -1` is also an identity test; `!= -1` is the
# value comparison.
81 if sl['oar_job_id'] is not -1:
82 #A job is running on Senslab for this slice
83 # report about the local nodes that are in the slice only
85 nodes_all = self.GetNodes({'hostname':nodes_in_slice},
86 ['node_id', 'hostname','site','boot_state'])
# Index the node dicts by hostname for the per-node lookups below.
87 nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all])
91 top_level_status = 'unknown'
93 top_level_status = 'ready'
94 result['geni_urn'] = slice_urn
95 result['pl_login'] = sl['job_user'] #For compatibility
# Expiry = job start time + walltime, both read from the slice record.
98 timestamp = float(sl['startTime']) + float(sl['walltime'])
99 result['pl_expires'] = strftime(self.time_format, \
100 gmtime(float(timestamp)))
101 #result['slab_expires'] = strftime(self.time_format,\
102 #gmtime(float(timestamp)))
107 #res['slab_hostname'] = node['hostname']
108 #res['slab_boot_state'] = node['boot_state']
110 res['pl_hostname'] = nodeall_byhostname[node]['hostname']
111 res['pl_boot_state'] = nodeall_byhostname[node]['boot_state']
# NOTE(review): 'pl_last_contact' is filled with the same expiry
# timestamp as 'pl_expires' - confirm this is intended.
112 res['pl_last_contact'] = strftime(self.time_format, \
113 gmtime(float(timestamp)))
114 sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \
115 nodeall_byhostname[node]['node_id'])
116 res['geni_urn'] = sliver_id
# A node whose boot_state is 'Alive' is 'ready'; the other status set
# below is 'failed', which also becomes the top-level status.
117 if nodeall_byhostname[node]['boot_state'] == 'Alive':
119 res['geni_status'] = 'ready'
121 res['geni_status'] = 'failed'
122 top_level_status = 'failed'
124 res['geni_error'] = ''
126 resources.append(res)
128 result['geni_status'] = top_level_status
129 result['geni_resources'] = resources
130 print >>sys.stderr, "\r\n \r\n_____________ Sliver status resources %s res %s \r\n " %(resources,res)
134 def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options):
# Handle a CreateSliver request.
# Parses `rspec_string` into an RSpec, then uses the SlabSlices helpers
# to verify the slice record, the persons and the requested nodes, and
# finally returns the aggregate's rspec for `slice_urn` in the same
# rspec version as the request.
135 print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver "
136 aggregate = SlabAggregate(self)
138 slices = SlabSlices(self)
139 peer = slices.get_peer(slice_hrn)
140 sfa_peer = slices.get_sfa_peer(slice_hrn)
143 if not isinstance(creds, list):
# The first user entry may carry the slice record; default is an empty dict.
147 slice_record = users[0].get('slice_record', {})
150 rspec = RSpec(rspec_string)
151 print>>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ============================rspec.version %s " %(rspec.version)
154 # ensure site record exists?
155 # ensure slice record exists
156 slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options)
157 requested_attributes = rspec.version.get_slice_attributes()
# A non-None 'timeslot' attribute from the rspec is copied into the slice
# record (LaunchExperimentOnOAR reads slice_dict['timeslot']).
159 if requested_attributes:
160 for attrib_dict in requested_attributes:
161 if 'timeslot' in attrib_dict and attrib_dict['timeslot'] is not None:
162 slice.update({'timeslot':attrib_dict['timeslot']})
163 print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... slice %s " %(slice)
164 # ensure person records exists
165 persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options)
166 # ensure slice attributes exists?
169 # add/remove slice from nodes
170 print >>sys.stderr, "\r\n \r\n \t=============================== SLABDRIVER.PY create_sliver ..... "
# Requested slivers are named by the 'component_name' of each rspec node
# that has slivers.
172 requested_slivers = [node.get('component_name') for node in rspec.version.get_nodes_with_slivers()]
173 print >>sys.stderr, "\r\n \r\n \t=============================== ........... requested_slivers ============================requested_slivers %s " %(requested_slivers)
174 nodes = slices.verify_slice_nodes(slice, requested_slivers, peer)
177 return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
180 def delete_sliver (self, slice_urn, slice_hrn, creds, options):
# Handle a DeleteSliver request for the slice named `slice_hrn`.
# The slice record is fetched with GetSlices and its OAR job is released
# through DeleteSliceFromNodes; the peer binding is removed before that
# call (UnBindObjectFromPeer) and restored after it (BindObjectToPeer).
# NOTE(review): the unbind call reads slice['record_id_slice'] while the
# re-bind call reads slice['slice_id'] and slice['peer_slice_id'] -
# confirm the record returned by GetSlices carries all three keys.
182 slice = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn')
183 print>>sys.stderr, "\r\n \r\n \t\t SLABDRIVER.PY delete_sliver slice %s" %(slice)
187 slices = SlabSlices(self)
188 # determine if this is a peer slice
189 # xxx I wonder if this would not need to use PlSlices.get_peer instead
190 # in which case plc.peers could be deprecated as this here
191 # is the only/last call to this last method in plc.peers
192 peer = slices.get_peer(slice_hrn)
195 self.UnBindObjectFromPeer('slice', slice['record_id_slice'], peer)
196 self.DeleteSliceFromNodes(slice)
199 self.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id'])
def AddSlice(self, slice_record):
    """Insert a new slice row into the senslab database and commit it.

    :param slice_record: dict providing 'slice_hrn', 'record_id_slice',
        'record_id_user' and 'peer_authority'; a missing key raises
        KeyError.
    """
    new_row = SliceSenslab(
        slice_hrn=slice_record['slice_hrn'],
        record_id_slice=slice_record['record_id_slice'],
        record_id_user=slice_record['record_id_user'],
        peer_authority=slice_record['peer_authority'])
    print>>sys.stderr, "\r\n \r\n \t\t\t =======SLABDRIVER.PY AddSlice slice_record %s slab_slice %s" %(slice_record,new_row)
    # Stage the row, then make it durable in the same call.
    slab_dbsession.add(new_row)
    slab_dbsession.commit()
210 # first 2 args are None in case of resource discovery
211 def list_resources (self, slice_urn, slice_hrn, creds, options):
# Handle a ListResources request.
# Resolves the rspec version from options['geni_rspec_version'], stores
# the caller's hrn (taken from the gid of the first credential) in
# options['origin_hrn'], and asks SlabAggregate for the rspec of
# `slice_urn` in that version.
# `version_string` is the cache key; the cache lookups and stores that
# would use it are commented out below.
212 #cached_requested = options.get('cached', True)
214 version_manager = VersionManager()
215 # get the rspec's return format from options
216 rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
217 version_string = "rspec_%s" % (rspec_version)
219 #panos adding the info option to the caching key (can be improved)
220 if options.get('info'):
221 version_string = version_string + "_"+options.get('info', 'default')
223 # look in cache first
224 #if cached_requested and self.cache and not slice_hrn:
225 #rspec = self.cache.get(version_string)
227 #logger.debug("SlabDriver.ListResources: returning cached advertisement")
230 #panos: passing user-defined options
232 aggregate = SlabAggregate(self)
# creds[0] is parsed as a Credential; its caller gid gives the hrn.
233 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
234 options.update({'origin_hrn':origin_hrn})
235 rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version,
237 print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec "
239 #if self.cache and not slice_hrn:
240 #logger.debug("Slab.ListResources: stores advertisement in cache")
241 #self.cache.add(version_string, rspec)
246 def list_slices (self, creds, options):
# Handle a ListSlices request.
# Fetches every slice with GetSlices(), turns each record's 'slice_hrn'
# into an hrn under self.hrn (slicename_to_hrn) and then into a slice
# urn (hrn_to_urn). The cache lookups are commented out.
# NOTE(review): each element is indexed as slice['slice_hrn'], while
# GetSlices() without a filter returns the raw query rows - confirm
# those rows support item access.
247 # look in cache first
249 #slices = self.cache.get('slices')
251 #logger.debug("PlDriver.list_slices returns from cache")
255 print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY list_slices"
256 slices = self.GetSlices()
257 slice_hrns = [slicename_to_hrn(self.hrn, slice['slice_hrn']) for slice in slices]
258 slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns]
262 #logger.debug ("SlabDriver.list_slices stores value in cache")
263 #self.cache.add('slices', slice_urns)
267 #No site or node register supported
268 def register (self, sfa_record, hrn, pub_key):
# Create the testbed-side object for an SFA record and compute `pointer`.
# Slices: the record is converted with sfa_fields_to_slab_fields and the
# slice is searched by hrn; `pointer` comes either from AddSlice or from
# the 'slice_id' of the first slice found.
# Users: the person is searched with GetPersons; `pointer` comes either
# from AddPerson or from the 'person_id' of the first person found. The
# person is then attached to a site, given the 'user' role and the ssh
# key `pub_key`.
269 type = sfa_record['type']
270 slab_record = self.sfa_fields_to_slab_fields(type, hrn, sfa_record)
# Whitelist of slice fields; keys of slab_record outside it are handled
# by the loop below.
274 acceptable_fields=['url', 'instantiation', 'name', 'description']
275 for key in slab_record.keys():
276 if key not in acceptable_fields:
278 print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register"
279 slices = self.GetSlices(slice_filter =slab_record['hrn'], filter_type = 'slice_hrn')
281 pointer = self.AddSlice(slab_record)
283 pointer = slices[0]['slice_id']
286 persons = self.GetPersons([sfa_record['hrn']])
288 pointer = self.AddPerson(dict(sfa_record))
291 pointer = persons[0]['person_id']
293 #Does this make sense to senslab ?
294 #if 'enabled' in sfa_record and sfa_record['enabled']:
295 #self.UpdatePerson(pointer, {'enabled': sfa_record['enabled']})
297 # add this person to the site only if she is being added for the first
298 # time by sfa and doesont already exist in plc
# NOTE(review): get_leaf is not among the names imported at the top of
# this file - confirm it is in scope.
299 if not persons or not persons[0]['site_ids']:
300 login_base = get_leaf(sfa_record['authority'])
301 self.AddPersonToSite(pointer, login_base)
303 # What roles should this user have?
304 self.AddRoleToPerson('user', pointer)
307 self.AddPersonKey(pointer, {'key_type' : 'ssh', 'key' : pub_key})
309 #No node adding outside OAR
313 #No site or node record update allowed
314 def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
# Apply an updated SFA record to the testbed object identified by
# old_sfa_record['pointer'].
# A `new_key` is only accepted for 'user' records; any other type raises
# UnknownSfaType.
# Slices: the new record is converted with sfa_fields_to_slab_fields,
# its 'name' entry is dropped and UpdateSlice is called.
# Users: whitelisted person fields are copied into `update_fields` for
# UpdatePerson. For a key change, stored keys that differ from `new_key`
# are deleted and AddPersonKey is called with the new key.
# NOTE(review): UnknownSfaType is not in this file's
# `from sfa.util.faults import ...` line - confirm it is in scope.
315 pointer = old_sfa_record['pointer']
316 type = old_sfa_record['type']
318 # new_key implemented for users only
319 if new_key and type not in [ 'user' ]:
320 raise UnknownSfaType(type)
322 #if (type == "authority"):
323 #self.shell.UpdateSite(pointer, new_sfa_record)
326 slab_record=self.sfa_fields_to_slab_fields(type, hrn, new_sfa_record)
327 if 'name' in slab_record:
328 slab_record.pop('name')
329 self.UpdateSlice(pointer, slab_record)
333 all_fields = new_sfa_record
334 for key in all_fields.keys():
335 if key in ['first_name', 'last_name', 'title', 'email',
336 'password', 'phone', 'url', 'bio', 'accepted_aup',
338 update_fields[key] = all_fields[key]
339 self.UpdatePerson(pointer, update_fields)
342 # must check this key against the previous one if it exists
343 persons = self.GetPersons([pointer], ['key_ids'])
# NOTE(review): the first assignment to `keys` is immediately overwritten
# by the GetKeys result.
345 keys = person['key_ids']
346 keys = self.GetKeys(person['key_ids'])
348 # Delete all stale keys
351 if new_key != key['key']:
352 self.DeleteKey(key['key_id'])
356 self.AddPersonKey(pointer, {'key_type': 'ssh', 'key': new_key})
362 def remove (self, sfa_record):
# Delete the testbed object behind an SFA record.
# Users: the login is the last dot-separated component of the hrn; the
# person is deleted only when GetPersons returns a first entry with a
# non-empty 'site_ids'.
# Slices: DeleteSlice(hrn) is called when GetSlices finds the slice.
363 type=sfa_record['type']
364 hrn=sfa_record['hrn']
365 record_id= sfa_record['record_id']
# Equivalent to hrn.split(".")[-1].
367 username = hrn.split(".")[len(hrn.split(".")) -1]
# NOTE(review): GetPersons only applies its filter when it receives a
# list; `username` is a plain string here - confirm the lookup is really
# restricted to this user.
369 persons = self.GetPersons(username)
370 # only delete this person if he has site ids. if he doesnt, it probably means
371 # he was just removed from a site, not actually deleted
372 if persons and persons[0]['site_ids']:
373 self.DeletePerson(username)
374 elif type == 'slice':
375 if self.GetSlices(slice_filter = hrn, filter_type = 'slice_hrn'):
376 self.DeleteSlice(hrn)
378 #elif type == 'authority':
379 #if self.GetSites(pointer):
380 #self.DeleteSite(pointer)
384 def GetPeers (self,auth = None, peer_filter=None, return_fields_list=None):
# Return registry records of type 'authority'.
# Every RegRecord whose type is LIKE '%authority%' is loaded and indexed
# twice: `existing_records` maps (hrn, type) to the record, and
# `existing_hrns_by_types` maps each type to the list of its hrns.
# `records_list` then receives either the record whose hrn is
# `peer_filter` or every record of type 'authority'.
# Lookups with the literal 'authority' key raise KeyError when no such
# record exists.
386 existing_records = {}
387 existing_hrns_by_types= {}
388 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields_list)
389 all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all()
390 for record in all_records:
391 existing_records[(record.hrn,record.type)] = record
# First hrn seen for a type starts the list; later ones are appended.
392 if record.type not in existing_hrns_by_types:
393 existing_hrns_by_types[record.type] = [record.hrn]
394 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types)
397 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN type %s hrn %s " %( record.type,record.hrn )
398 existing_hrns_by_types[record.type].append(record.hrn)
399 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN existing_hrns_by_types %s " %( existing_hrns_by_types)
400 #existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))})
402 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types %s " %( existing_hrns_by_types)
406 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types['authority+sa'] %s \t\t existing_records %s " %(existing_hrns_by_types['authority'],existing_records)
408 records_list.append(existing_records[(peer_filter,'authority')])
410 for hrn in existing_hrns_by_types['authority']:
411 records_list.append(existing_records[(hrn,'authority')])
413 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers records_list %s " %(records_list)
418 return_records = records_list
419 if not peer_filter and not return_fields_list:
423 print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers return_records %s " %(return_records)
424 return return_records
427 #TODO : Handling OR request in make_ldap_filters_from_records instead of the for loop
428 #over the records' list
429 def GetPersons(self, person_filter=None, return_fields_list=None):
# Look users up in LDAP through self.ldap.
# With a non-empty list as `person_filter`, LdapFindUser is called once
# per element and each answer is appended to `person_list`. The other
# lookup below calls LdapFindUser() without argument.
# `return_fields_list` is not used by the statements below.
431 person_filter should be a list of dictionnaries when not set to None.
432 Returns a list of users found.
435 print>>sys.stderr, "\r\n \r\n \t\t\t GetPersons person_filter %s" %(person_filter)
437 if person_filter and isinstance(person_filter,list):
438 #If we are looking for a list of users (list of dict records)
439 #Usually the list contains only one user record
440 for f in person_filter:
441 person = self.ldap.LdapFindUser(f)
442 person_list.append(person)
445 person_list = self.ldap.LdapFindUser()
def GetTimezone(self):
    """Ask OAR for its current time and timezone.

    Sends the "GET_timezone" request through the OAR parser and returns
    the two-element answer as a ``(server_timestamp, server_tz)`` tuple.
    """
    timestamp, zone_name = self.oar.parser.SendRequest("GET_timezone")
    return (timestamp, zone_name)
def DeleteJobs(self, job_id, slice_hrn):
    """Ask OAR to delete job ``job_id`` on behalf of the slice owner.

    The OAR user name is the last dot-separated component of
    ``slice_hrn`` with its "_slice" suffix removed
    (e.g. "senslab.nturro_slice" -> "nturro").

    :param job_id: OAR job identifier; sent to OAR as a string.
    :param slice_hrn: hrn of the slice owning the job.
    """
    suffix = "_slice"
    leaf = slice_hrn.split(".")[-1]
    # str.rstrip("_slice") strips any trailing run of the characters
    # '_', 's', 'l', 'i', 'c', 'e' (so "miles_slice" became "m"); remove
    # the literal suffix instead.
    if leaf.endswith(suffix):
        username = leaf[:-len(suffix)]
    else:
        username = leaf

    reqdict = {'method': "delete", 'strval': str(job_id)}
    answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', reqdict, username)
    # Same text and trailing newline as the former print-to-stderr
    # statement, written so that it runs under both Python 2 and 3.
    sys.stderr.write("\r\n \r\n jobid DeleteJobs %s " % (answer,) + "\n")
466 def GetJobs(self,job_id= None, resources=True,return_fields_list=None, username = None):
# Fetch information about OAR job `job_id` and normalise its node list.
# `resources` selects where the nodes are read from: with resources True
# the request is "GET_jobs_id_resources" and the nodes come from
# 'reserved_resources'; with resources False they come from
# 'assigned_network_address'.
# In the answer, that list is moved under the key 'node_ids'.
# `return_fields_list` is not used by the statements below.
467 #job_resources=['reserved_resources', 'assigned_resources','job_id', 'job_uri', 'assigned_nodes',\
469 #assigned_res = ['resource_id', 'resource_uri']
470 #assigned_n = ['node', 'node_uri']
472 if job_id and resources is False:
474 node_list_k = 'assigned_network_address'
476 if job_id and resources :
477 req = "GET_jobs_id_resources"
478 node_list_k = 'reserved_resources'
480 #Get job info from OAR
481 job_info = self.oar.parser.SendRequest(req, job_id, username)
482 print>>sys.stderr, "\r\n \r\n \t\t GetJobs %s " %(job_info)
# Jobs in state 'Terminated' or 'Error' are singled out here.
484 if 'state' in job_info :
485 if job_info['state'] == 'Terminated':
486 print>>sys.stderr, "\r\n \r\n \t\t GetJobs TERMINELEBOUSIN "
488 if job_info['state'] == 'Error':
489 print>>sys.stderr, "\r\n \r\n \t\t GetJobs ERROR message %s " %(job_info)
492 #Get a dict of nodes . Key :hostname of the node
493 node_list = self.GetNodes()
494 node_hostname_list = []
495 for node in node_list:
496 node_hostname_list.append(node['hostname'])
497 node_dict = dict(zip(node_hostname_list,node_list))
# Each entry of the job's node list is looked up in node_dict (keyed by
# hostname) and replaced by that node's 'hostname'; an entry that is not
# a known hostname raises KeyError.
499 liste =job_info[node_list_k]
500 for k in range(len(liste)):
501 job_info[node_list_k][k] = node_dict[job_info[node_list_k][k]]['hostname']
503 #Replaces the previous entry "assigned_network_address" / "reserved_resources"
505 job_info.update({'node_ids':job_info[node_list_k]})
506 del job_info[node_list_k]
510 print>>sys.stderr, "\r\n \r\n \t\t GetJobs KEYERROR "
512 def GetReservedNodes(self):
# Collect the 'assigned_network_address' entries of the OAR jobs returned
# by the "GET_jobs_details" request; each job's entries are put in front
# of the ones gathered so far.
# NOTE(review): `nodes` and `j` must be bound before the concatenation
# below - confirm the initial value of `nodes` and what `j` iterates over.
513 # this function returns a list of all the nodes already involved in an oar job
514 #jobs=self.oar.parser.SendRequest("GET_reserved_nodes")
515 jobs=self.oar.parser.SendRequest("GET_jobs_details")
518 nodes=j['assigned_network_address']+nodes
521 def GetNodes(self,node_filter_dict = None, return_fields_list = None):
# Return node dicts from OAR ("GET_resources_full"), optionally filtered.
# node_filter_dict maps a node field to the list of accepted values;
# return_fields_list names the fields looped over for each matching node.
# With neither argument the full list of node dicts is returned as is.
# A KeyError during filtering is logged with logger.log_exc.
# NOTE(review): when only return_fields_list is given, the loop
# `for filter_key in node_filter_dict` iterates over None - confirm that
# callers always pass a filter together with a field list.
523 node_filter_dict : dictionnary of lists
526 node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full")
527 node_dict_list = node_dict_by_id.values()
529 #No filtering needed return the list directly
530 if not (node_filter_dict or return_fields_list):
531 return node_dict_list
533 return_node_list = []
535 for filter_key in node_filter_dict:
537 #Filter the node_dict_list by each value contained in the
538 #list node_filter_dict[filter_key]
539 for value in node_filter_dict[filter_key]:
540 for node in node_dict_list:
541 if node[filter_key] == value:
542 if return_fields_list :
544 for k in return_fields_list:
546 return_node_list.append(tmp)
548 return_node_list.append(node)
550 logger.log_exc("GetNodes KeyError")
554 return return_node_list
557 def GetSites(self, site_filter_name = None, return_fields_list = None):
# Return site dicts from OAR ("GET_sites").
# With no argument every site is returned. Otherwise, when
# site_filter_name is a key of site_dict, the result receives either a
# dict of the requested fields (`tmp`) or the whole site dict.
# A requested field missing from the site is reported with logger.error.
558 site_dict = self.oar.parser.SendRequest("GET_sites")
559 #site_dict : dict where the key is the sit ename
560 return_site_list = []
561 if not ( site_filter_name or return_fields_list):
562 return_site_list = site_dict.values()
563 return return_site_list
565 if site_filter_name in site_dict:
566 if return_fields_list:
567 for field in return_fields_list:
571 tmp[field] = site_dict[site_filter_name][field]
573 logger.error("GetSites KeyError %s "%(field))
575 return_site_list.append(tmp)
577 return_site_list.append( site_dict[site_filter_name])
580 return return_site_list
583 def GetSlices(self,slice_filter = None, filter_type = None, return_fields_list=None):
# Look slices up in the senslab database.
# With filter_type 'slice_hrn' or 'record_id_user', the first matching
# SliceSenslab row is dumped to a dict (`rec`) and completed: 'hrn'
# mirrors 'slice_hrn', 'node_ids' mirrors 'node_list', and when the row
# has an OAR job, GetJobs is queried for it. When that job is gone the
# job id is reset to -1 both in the database and in `rec`.
# The query with .all() near the end fetches every SliceSenslab row.
# `return_fields_list` is only referenced by commented-out code.
584 return_slice_list = []
587 ftypes = ['slice_hrn', 'record_id_user']
588 if filter_type and filter_type in ftypes:
589 if filter_type == 'slice_hrn':
590 slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
591 if filter_type == 'record_id_user':
592 slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
595 rec = slicerec.dumpquerytodict()
# Login = text before the first '_' in the second dotted component of the
# hrn. NOTE(review): assumes hrns shaped like <authority>.<login>_slice.
596 login = slicerec.slice_hrn.split(".")[1].split("_")[0]
597 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY slicerec GetSlices %s " %(slicerec)
# NOTE(review): `is not -1` is an identity test on an int; `!= -1` is the
# value comparison.
598 if slicerec.oar_job_id is not -1:
599 rslt = self.GetJobs( slicerec.oar_job_id, resources=False, username = login )
600 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices GetJobs %s " %(rslt)
603 rec.update({'hrn':str(rec['slice_hrn'])})
604 #If GetJobs is empty, this means the job is now in the 'Terminated' state
605 #Update the slice record
607 self.db.update_job(slice_filter, job_id = -1)
608 rec['oar_job_id'] = -1
609 rec.update({'hrn':str(rec['slice_hrn'])})
612 rec['node_ids'] = rec['node_list']
616 #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices rec %s" %(rec)
622 return_slice_list = slab_dbsession.query(SliceSenslab).all()
624 print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(return_slice_list,slice_filter)
626 #if return_fields_list:
627 #return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields_list)
631 return return_slice_list
636 def testbed_name (self): return "senslab2"
638 # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
    """Describe this aggregate: testbed name and supported rspec versions.

    Every version known to VersionManager is sorted by its content type:
    'ad' versions are advertised, 'request' versions are accepted in
    requests, and '*' versions count for both.

    :returns: dict with the keys 'testbed',
        'geni_request_rspec_versions' and 'geni_ad_rspec_versions'.
    """
    advertised = []
    requestable = []
    for candidate in VersionManager().versions:
        if candidate.content_type in ['*', 'ad']:
            advertised.append(candidate.to_dict())
        if candidate.content_type in ['*', 'request']:
            requestable.append(candidate.to_dict())
    return {
        'testbed':self.testbed_name(),
        'geni_request_rspec_versions': requestable,
        'geni_ad_rspec_versions': advertised,
        }
660 # Convert SFA fields to PLC fields for use when registering or updating
661 # registry record in the PLC database
663 # @param type type of record (user, slice, ...)
664 # @param hrn human readable name
665 # @param sfa_fields dictionary of SFA fields
666 # @param slab_fields dictionary of PLC fields (output)
668 def sfa_fields_to_slab_fields(self, type, hrn, record):
# Build the senslab-side dict (`slab_record`) for an SFA record.
# The statements below default 'instantiation' to "senslab-instantiated",
# set 'hrn' from hrn_to_pl_slicename(hrn), and copy 'url', 'description'
# and 'expires' (cast to int) from `record`.
# convert_ints is a local helper that casts the listed fields of a dict
# to int in place.
670 def convert_ints(tmpdict, int_fields):
671 for field in int_fields:
673 tmpdict[field] = int(tmpdict[field])
676 #for field in record:
677 # slab_record[field] = record[field]
680 #instantion used in get_slivers ?
681 if not "instantiation" in slab_record:
682 slab_record["instantiation"] = "senslab-instantiated"
683 slab_record["hrn"] = hrn_to_pl_slicename(hrn)
684 print >>sys.stderr, "\r\n \r\n \t SLABDRIVER.PY sfa_fields_to_slab_fields slab_record %s hrn_to_pl_slicename(hrn) hrn %s " %(slab_record['hrn'], hrn)
686 slab_record["url"] = record["url"]
687 if "description" in record:
688 slab_record["description"] = record["description"]
689 if "expires" in record:
690 slab_record["expires"] = int(record["expires"])
692 #nodes added by OAR only and then imported to SFA
693 #elif type == "node":
694 #if not "hostname" in slab_record:
695 #if not "hostname" in record:
696 #raise MissingSfaInfo("hostname")
697 #slab_record["hostname"] = record["hostname"]
698 #if not "model" in slab_record:
699 #slab_record["model"] = "geni"
702 #elif type == "authority":
703 #slab_record["login_base"] = hrn_to_slab_login_base(hrn)
705 #if not "name" in slab_record:
706 #slab_record["name"] = hrn
708 #if not "abbreviated_name" in slab_record:
709 #slab_record["abbreviated_name"] = hrn
711 #if not "enabled" in slab_record:
712 #slab_record["enabled"] = True
714 #if not "is_public" in slab_record:
715 #slab_record["is_public"] = True
720 def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
# Submit an OAR job for a slice and start the senslab experiment wrapper.
# Steps carried out below:
#  1. build `reqdict` ('property', 'resource', 'script_path',
#     'reservation', 'type', 'directory', 'name') from the node names in
#     `added_nodes` and the slice's 'timeslot';
#  2. POST it to OAR ('POST_job') as `slice_user`;
#  3. record the job id and nodes with self.db.update_job;
#  4. write the node numbers to /tmp/sfa/<jobid>.json;
#  5. run the Java wrapper jar with the job id and the user.
# Walltime: a requested duration h:m:s gets +140 s for the OAR walltime
# and +20 s for the /bin/sleep script; the fixed alternative is a
# 00:12:20 walltime with "/bin/sleep 620".
726 slice_name = slice_dict['name']
728 slot = slice_dict['timeslot']
729 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR slot %s " %(slot)
731 #Running on default parameters
732 #XP immediate , 10 mins
733 slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min
# 'property' becomes an SQL-like "network_address in ('id1', 'id2')" list.
736 reqdict['property'] ="network_address in ("
737 for node in added_nodes:
738 #Get the ID of the node : remove the root auth and put the site in a separate list
740 # NT: it's not clear for me if the nodenames will have the senslab prefix
741 # so lets take the last part only, for now.
743 #if s[0] == self.root_auth :
744 # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
745 s=lastpart.split("_")
747 reqdict['property'] += "'"+ nodeid +"', "
748 nodeid_list.append(nodeid)
749 #site_list.append( l[0] )
# Drop the trailing ", " left by the loop and close the parenthesis.
752 reqdict['property'] = reqdict['property'][0: len( reqdict['property'])-2] +")"
753 reqdict['resource'] ="network_address="+ str(len(nodeid_list))
756 walltime = slot['duration'].split(":")
757 # Fixing the walltime by adding a few delays. First put the walltime in seconds
758 # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
759 # take in account prologue and epilogue scripts execution
760 # int walltimeAdditionalDelay = 120; additional delay
762 desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
763 total_walltime = desired_walltime + 140 #+2 min 20
764 sleep_walltime = desired_walltime + 20 #+20 sec
765 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s total_walltime %s sleep_walltime %s " %(desired_walltime,total_walltime,sleep_walltime)
766 #Put the walltime back in str form
# NOTE(review): '/' on two ints truncates only under Python 2; '//' is
# the explicit floor division.
768 walltime[0] = str(total_walltime / 3600)
769 total_walltime = total_walltime - 3600 * int(walltime[0])
770 #Get the remaining minutes
771 walltime[1] = str(total_walltime / 60)
772 total_walltime = total_walltime - 60 * int(walltime[1])
774 walltime[2] = str(total_walltime)
775 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR walltime %s " %(walltime)
777 reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2])
778 reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
# NOTE(review): str(00) is "0", so this walltime reads "0:12:20".
780 reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
781 reqdict['script_path'] = "/bin/sleep 620" #+20 sec
782 #In case of a scheduled experiment (not immediate)
783 #To run an XP immediately, don't specify date and time in RSpec
784 #They will be set to None.
785 if slot['date'] and slot['start_time']:
# NOTE(review): `is ''` is an identity test on a string; `== ''` (or
# `not slot['timezone']`) is the value test.
786 if slot['timezone'] is '' or slot['timezone'] is None:
787 #assume it is server timezone
788 server_timestamp,server_tz = self.GetTimezone()
789 from_zone=tz.gettz(server_tz)
790 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR timezone not specified server_tz %s from_zone %s" %(server_tz,from_zone)
792 #Get zone of the user from the reservation time given in the rspec
793 from_zone = tz.gettz(slot['timezone'])
795 date = str(slot['date']) + " " + str(slot['start_time'])
# NOTE(review): this file does `from datetime import datetime`, so
# `datetime.datetime` is an attribute lookup on the class - confirm this
# call works; `datetime.strptime(...)` is the usual spelling with that
# import.
796 user_datetime = datetime.datetime.strptime(date, self.time_format)
797 user_datetime = user_datetime.replace(tzinfo = from_zone)
801 utc_date = user_datetime.astimezone(to_zone)
802 #Readable time accpeted by OAR
803 reqdict['reservation']= utc_date.strftime(self.time_format)
805 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation'])
809 # reservations are performed in the oar server timebase, so :
810 # 1- we get the server time(in UTC tz )/server timezone
811 # 2- convert the server UTC time in its timezone
812 # 3- add a custom delay to this time
813 # 4- convert this time to a readable form and it for the reservation request.
814 server_timestamp,server_tz = self.GetTimezone()
815 s_tz=tz.gettz(server_tz)
816 UTC_zone = tz.gettz("UTC")
817 #weird... datetime.fromtimestamp should work since we do from datetime import datetime
# The custom delay is 20 seconds added to the server timestamp.
818 utc_server= datetime.datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
819 server_localtime=utc_server.astimezone(s_tz)
821 print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
822 readable_time = server_localtime.strftime(self.time_format)
824 print >>sys.stderr," \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s " %(readable_time ,server_timestamp)
825 reqdict['reservation'] = readable_time
828 reqdict['type'] = "deploy"
829 reqdict['directory']= ""
# NOTE(review): the OAR job name is hard-coded.
830 reqdict['name']= "TestSandrine"
833 # first step : start the OAR job and update the job
# NOTE(review): `site_list` is only assigned in a commented-out line
# above - confirm it is bound before this debug output.
834 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR reqdict %s \r\n site_list %s" %(reqdict,site_list)
836 answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
837 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s " %(answer)
841 print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job %s " %( answer)
844 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user)
845 self.db.update_job( slice_name, jobid ,added_nodes)
848 # second step : configure the experiment
849 # we need to store the nodes in a yaml (well...) file like this :
850 # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
# NOTE(review): make sure this file handle is closed before the wrapper
# below is launched.
851 f=open('/tmp/sfa/'+str(jobid)+'.json','w')
# NOTE(review): str.strip('node') removes any of the characters 'n', 'o',
# 'd', 'e' from both ends, not the literal prefix "node".
853 f.write(str(added_nodes[0].strip('node')))
854 for node in added_nodes[1:len(added_nodes)] :
855 f.write(','+node.strip('node'))
859 # third step : call the senslab-experiment wrapper
860 #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar "+str(jobid)+" "+slice_user
861 javacmdline="/usr/bin/java"
862 jarname="/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
863 #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", str(jobid), slice_user])
# The wrapper runs as an argument list (no shell); its stdout is captured.
864 output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), slice_user],stdout=subprocess.PIPE).communicate()[0]
866 print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR wrapper returns %s " %(output)
870 #Delete the jobs and updates the job id in the senslab table
872 #Does not clear the node list
def DeleteSliceFromNodes(self, slice_record):
    """Cancel the slice's OAR job and forget its job id.

    Deletes the job named by ``slice_record['oar_job_id']`` through
    DeleteJobs, then resets the job id stored for the slice to -1. The
    slice's node list is left as it is.

    :param slice_record: dict providing 'oar_job_id' and 'hrn'.
    """
    job_id = slice_record['oar_job_id']
    owner_hrn = slice_record['hrn']
    self.DeleteJobs(job_id, owner_hrn)
    # -1 is the "no job" marker tested elsewhere in this driver.
    self.db.update_job(slice_record['hrn'], job_id = -1)
def augment_records_with_testbed_info (self, sfa_records):
    """Complete SFA records with testbed data.

    Thin alias kept for the generic driver interface: the work is done
    by fill_record_info, whose result is returned unchanged.
    """
    completed = self.fill_record_info (sfa_records)
    return completed
887 def fill_record_info(self, records):
# Complete SFA records with senslab data, updating the dicts in place.
# Slice records: the slice is fetched with GetSlices, its owner with a
# RegRecord query on 'record_id_user', and the record gains 'PI',
# 'researcher', 'name', 'oar_job_id' and 'person_ids', plus empty
# 'geni_urn' / 'keys' / 'key_ids' placeholders.
# User records: the user's slice is fetched by 'record_id_user',
# annotated the same way and retyped as a 'slice' record (`rec`); the
# user record itself is updated with the first entry returned by
# GetPersons.
889 Given a SFA record, fill in the senslab specific and SFA specific
890 fields in the record.
893 print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s " %(records)
894 if not isinstance(records, list):
899 for record in parkour:
901 if str(record['type']) == 'slice':
902 #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t \t record %s" %(record)
903 #sfatable = SfaTable()
905 #existing_records_by_id = {}
906 #all_records = dbsession.query(RegRecord).all()
907 #for rec in all_records:
908 #existing_records_by_id[rec.record_id] = rec
909 #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']])
911 #recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])})
912 #recslice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = str(record['hrn'])).first()
913 recslice = self.GetSlices(slice_filter = str(record['hrn']), filter_type = 'slice_hrn')
914 #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice)
915 #if isinstance(recslice,list) and len(recslice) == 1:
916 #recslice = recslice[0]
# The slice owner's registry record supplies the hrn used as both PI and
# researcher.
918 recuser = dbsession.query(RegRecord).filter_by(record_id = recslice['record_id_user']).first()
919 #existing_records_by_id[recslice['record_id_user']]
920 #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser)
923 record.update({'PI':[recuser.hrn],
924 'researcher': [recuser.hrn],
925 'name':record['hrn'],
926 'oar_job_id':recslice['oar_job_id'],
928 'person_ids':[recslice['record_id_user']],
929 'geni_urn':'', #For client_helper.py compatibility
930 'keys':'', #For client_helper.py compatibility
931 'key_ids':''}) #For client_helper.py compatibility
933 elif str(record['type']) == 'user':
934 #Add the data about slice
935 rec = self.GetSlices(slice_filter = record['record_id'], filter_type = 'record_id_user')
936 print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info USEEEEEEEEEERDESU! rec %s \r\n \t rec['record_id_user'] %s " %(rec,rec['record_id_user'])
937 #Append record in records list, therfore fetches user and slice info again(one more loop)
938 #Will update PIs and researcher for the slice
939 recuser = dbsession.query(RegRecord).filter_by(record_id = rec['record_id_user']).first()
940 rec.update({'PI':[recuser.hrn],
941 'researcher': [recuser.hrn],
942 'name':record['hrn'],
943 'oar_job_id':rec['oar_job_id'],
945 'person_ids':[rec['record_id_user']]})
946 #retourne une liste 100512
948 #GetPersons takes [] as filters
949 user_slab = self.GetPersons([{'hrn':recuser.hrn}])
# `rec` is relabelled as a slice record keyed by its slice hrn.
952 rec.update({'type':'slice','hrn':rec['slice_hrn']})
953 record.update(user_slab[0])
954 #For client_helper.py compatibility
955 record.update( { 'geni_urn':'',
960 print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info ADDING SLICEINFO TO USER records %s" %(records)
962 print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info OKrecords %s" %(records)
964 print >>sys.stderr, "\r\n \t\t SLABDRIVER fill_record_info EXCEPTION RECORDS : %s" %(records)
967 #self.fill_record_slab_info(records)
968 ##print >>sys.stderr, "\r\n \t\t after fill_record_slab_info %s" %(records)
969 #self.fill_record_sfa_info(records)
970 #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info"
976 #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc):
977 ## get a list of the HRNs tht are members of the old and new records
979 #oldList = oldRecord.get(listName, [])
982 #newList = record.get(listName, [])
984 ## if the lists are the same, then we don't have to update anything
985 #if (oldList == newList):
988 ## build a list of the new person ids, by looking up each person to get
992 #records = table.find({'type': 'user', 'hrn': newList})
994 #newIdList.append(rec['pointer'])
996 ## build a list of the old person ids from the person_ids field
998 #oldIdList = oldRecord.get("person_ids", [])
999 #containerId = oldRecord.get_pointer()
1001 ## if oldRecord==None, then we are doing a Register, instead of an
1004 #containerId = record.get_pointer()
1006 ## add people who are in the new list, but not the oldList
1007 #for personId in newIdList:
1008 #if not (personId in oldIdList):
1009 #addFunc(self.plauth, personId, containerId)
1011 ## remove people who are in the old list, but not the new list
1012 #for personId in oldIdList:
1013 #if not (personId in newIdList):
1014 #delFunc(self.plauth, personId, containerId)
1016 #def update_membership(self, oldRecord, record):
1017 #print >>sys.stderr, " \r\n \r\n ***SLABDRIVER.PY update_membership record ", record
1018 #if record.type == "slice":
1019 #self.update_membership_list(oldRecord, record, 'researcher',
1020 #self.users.AddPersonToSlice,
1021 #self.users.DeletePersonFromSlice)
1022 #elif record.type == "authority":
1027 # I don't think you plan on running a component manager at this point
1028 # let me clean up the mess of ComponentAPI that is deprecated anyways