X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fsenslab%2Fslabdriver.py;h=41ba8c327fa8ed6c2b00609c7730c6db64abf39a;hb=d550a4bd30a06f830f8f60a4065ef2097118e09b;hp=3f06ce2e87ba2d866c37f6104fcf6eed2e1ae273;hpb=1c5813e5c42012fc6e644f2d032d19fa91f6f3da;p=sfa.git diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index 3f06ce2e..41ba8c32 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -1,15 +1,13 @@ import subprocess +import os from datetime import datetime -from dateutil import tz -from time import strftime, gmtime from sfa.util.faults import SliverDoesNotExist, UnknownSfaType from sfa.util.sfalogging import logger - from sfa.storage.alchemy import dbsession -from sfa.storage.model import RegRecord, RegUser - +from sfa.storage.model import RegRecord, RegUser, RegSlice +from sqlalchemy.orm import joinedload from sfa.trust.credential import Credential @@ -17,7 +15,7 @@ from sfa.managers.driver import Driver from sfa.rspecs.version_manager import VersionManager from sfa.rspecs.rspec import RSpec -from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf +from sfa.util.xrn import Xrn, hrn_to_urn, get_authority ## thierry: everything that is API-related (i.e. handling incoming requests) @@ -28,31 +26,37 @@ from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf from sfa.senslab.OARrestapi import OARrestapi from sfa.senslab.LDAPapi import LDAPapi -from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab -from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, slab_xrn_object +from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SenslabXP + + +from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, \ + slab_xrn_object from sfa.senslab.slabslices import SlabSlices - - # thierry : note # this inheritance scheme is so that the driver object can receive # GetNodes or GetSites sorts of calls directly # and thus minimize the differences in the managers with the pl version -class SlabDriver(Driver): + + +class SlabDriver(Driver): + """ Senslab Driver class inherited from Driver generic class. + + Contains methods compliant with the SFA standard and the testbed + infrastructure (calls to LDAP and OAR). + """ def __init__(self, config): Driver.__init__ (self, config) self.config = config self.hrn = config.SFA_INTERFACE_HRN - self.root_auth = config.SFA_REGISTRY_ROOT_AUTH - self.oar = OARrestapi() self.ldap = LDAPapi() self.time_format = "%Y-%m-%d %H:%M:%S" - self.db = SlabDB(config,debug = True) + self.db = SlabDB(config, debug = False) self.cache = None @@ -66,56 +70,84 @@ class SlabDriver(Driver): """ #First get the slice with the slice hrn - sl = self.GetSlices(slice_filter = slice_hrn, \ + slice_list = self.GetSlices(slice_filter = slice_hrn, \ slice_filter_type = 'slice_hrn') - if len(sl) is 0: - raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn)) - - top_level_status = 'unknown' - nodes_in_slice = sl['node_ids'] - if len(nodes_in_slice) is 0: - raise SliverDoesNotExist("No slivers allocated ") - else: - top_level_status = 'ready' + if len(slice_list) is 0: + raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn)) - logger.debug("Slabdriver - sliver_status Sliver status urn %s hrn %s sl\ - %s \r\n " %(slice_urn, slice_hrn, sl)) - - if sl['oar_job_id'] is not -1: - #A job is running on Senslab for this slice - # report about the local nodes that are in the slice only - - nodes_all = self.GetNodes({'hostname':nodes_in_slice}, - ['node_id', 'hostname','site','boot_state']) - nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all]) + #Slice has the same slice hrn for each slice in the slice/lease list + #So fetch the info on the user once + one_slice = slice_list[0] + #recuser = dbsession.query(RegRecord).filter_by(record_id = \ + #one_slice['record_id_user']).first() + + #Make a list of all the nodes hostnames in use for this slice + slice_nodes_list = [] + for single_slice in slice_list: + for node in single_slice['node_ids']: + slice_nodes_list.append(node['hostname']) + #Get all the corresponding nodes details + nodes_all = self.GetNodes({'hostname':slice_nodes_list}, + ['node_id', 'hostname','site','boot_state']) + nodeall_byhostname = dict([(one_node['hostname'], one_node) \ + for one_node in nodes_all]) + + + + for single_slice in slice_list: + #For compatibility + top_level_status = 'empty' result = {} + result.fromkeys(\ + ['geni_urn','pl_login','geni_status','geni_resources'], None) + result['pl_login'] = one_slice['reg_researchers']['hrn'] + logger.debug("Slabdriver - sliver_status Sliver status \ + urn %s hrn %s single_slice %s \r\n " \ + %(slice_urn, slice_hrn, single_slice)) + try: + nodes_in_slice = single_slice['node_ids'] + except KeyError: + #No job in the slice + result['geni_status'] = top_level_status + result['geni_resources'] = [] + return result + + top_level_status = 'ready' + + #A job is running on Senslab for this slice + # report about the local nodes that are in the slice only + result['geni_urn'] = slice_urn - result['pl_login'] = sl['job_user'] #For compatibility + - timestamp = float(sl['startTime']) + float(sl['walltime']) - result['pl_expires'] = strftime(self.time_format, \ - gmtime(float(timestamp))) + #timestamp = float(sl['startTime']) + float(sl['walltime']) + #result['pl_expires'] = strftime(self.time_format, \ + #gmtime(float(timestamp))) #result['slab_expires'] = strftime(self.time_format,\ - #gmtime(float(timestamp))) + #gmtime(float(timestamp))) resources = [] - for node in nodeall_byhostname: + for node in single_slice['node_ids']: res = {} #res['slab_hostname'] = node['hostname'] #res['slab_boot_state'] = node['boot_state'] - res['pl_hostname'] = nodeall_byhostname[node]['hostname'] - res['pl_boot_state'] = nodeall_byhostname[node]['boot_state'] - res['pl_last_contact'] = strftime(self.time_format, \ - gmtime(float(timestamp))) - sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], \ - nodeall_byhostname[node]['node_id']) + res['pl_hostname'] = node['hostname'] + res['pl_boot_state'] = \ + nodeall_byhostname[node['hostname']]['boot_state'] + #res['pl_last_contact'] = strftime(self.time_format, \ + #gmtime(float(timestamp))) + sliver_id = Xrn(slice_urn, type='slice', \ + id=nodeall_byhostname[node['hostname']]['node_id'], \ + authority=self.hrn).urn + res['geni_urn'] = sliver_id - if nodeall_byhostname[node]['boot_state'] == 'Alive': + node_name = node['hostname'] + if nodeall_byhostname[node_name]['boot_state'] == 'Alive': res['geni_status'] = 'ready' else: @@ -129,13 +161,15 @@ class SlabDriver(Driver): result['geni_status'] = top_level_status result['geni_resources'] = resources logger.debug("SLABDRIVER \tsliver_statusresources %s res %s "\ - %(resources,res)) + %(resources,res)) return result - - + + def get_user(self, hrn): + return dbsession.query(RegRecord).filter_by(hrn = hrn).first() + + def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \ users, options): - logger.debug("SLABDRIVER.PY \tcreate_sliver ") aggregate = SlabAggregate(self) slices = SlabSlices(self) @@ -147,97 +181,173 @@ class SlabDriver(Driver): creds = [creds] if users: - slice_record = users[0].get('slice_record', {}) - + slice_record = users[0].get('slice_record', {}) + logger.debug("SLABDRIVER.PY \t ===============create_sliver \t\ + creds %s \r\n \r\n users %s" \ + %(creds, users)) + slice_record['user'] = {'keys':users[0]['keys'], \ + 'email':users[0]['email'], \ + 'hrn':slice_record['reg-researchers'][0]} # parse rspec rspec = RSpec(rspec_string) - logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \ - %(rspec.version)) - - + logger.debug("SLABDRIVER.PY \t create_sliver \trspec.version \ + %s slice_record %s users %s" \ + %(rspec.version,slice_record, users)) + + # ensure site record exists? # ensure slice record exists + #Removed options to verify_slice SA 14/08/12 sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \ - sfa_peer, options=options) - requested_attributes = rspec.version.get_slice_attributes() - - if requested_attributes: - for attrib_dict in requested_attributes: - if 'timeslot' in attrib_dict and attrib_dict['timeslot'] \ - is not None: - sfa_slice.update({'timeslot':attrib_dict['timeslot']}) - logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice)) - + sfa_peer) + # ensure person records exists - persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \ - sfa_peer, options=options) - - # ensure slice attributes exists? + #verify_persons returns added persons but since the return value + #is not used + slices.verify_persons(slice_hrn, sfa_slice, users, peer, \ + sfa_peer, options=options) + #requested_attributes returned by rspec.version.get_slice_attributes() + #unused, removed SA 13/08/12 + rspec.version.get_slice_attributes() + + logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice)) - # add/remove slice from nodes - requested_slivers = [node.get('component_name') \ - for node in rspec.version.get_nodes_with_slivers()] + requested_slivers = [node.get('component_id') \ + for node in rspec.version.get_nodes_with_slivers()\ + if node.get('authority_id') is self.root_auth] + l = [ node for node in rspec.version.get_nodes_with_slivers() ] logger.debug("SLADRIVER \tcreate_sliver requested_slivers \ - requested_slivers %s " %(requested_slivers)) - - nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer) + requested_slivers %s listnodes %s" \ + %(requested_slivers,l)) + #verify_slice_nodes returns nodes, but unused here. Removed SA 13/08/12. + #slices.verify_slice_nodes(sfa_slice, requested_slivers, peer) # add/remove leases - requested_leases = [] - kept_leases = [] + requested_lease_list = [] + + logger.debug("SLABDRIVER.PY \tcreate_sliver AVANTLEASE " ) + rspec_requested_leases = rspec.version.get_leases() for lease in rspec.version.get_leases(): - requested_lease = {} + single_requested_lease = {} + logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease)) + if not lease.get('lease_id'): - requested_lease['hostname'] = \ - slab_xrn_to_hostname(lease.get('component_id').strip()) - requested_lease['start_time'] = lease.get('start_time') - requested_lease['duration'] = lease.get('duration') - else: - kept_leases.append(int(lease['lease_id'])) - if requested_lease.get('hostname'): - requested_leases.append(requested_lease) + if get_authority(lease['component_id']) == self.root_auth: + single_requested_lease['hostname'] = \ + slab_xrn_to_hostname(\ + lease.get('component_id').strip()) + single_requested_lease['start_time'] = \ + lease.get('start_time') + single_requested_lease['duration'] = lease.get('duration') + + requested_lease_list.append(single_requested_lease) + + logger.debug("SLABDRIVER.PY \tcreate_sliver APRESLEASE" ) + #dCreate dict of leases by start_time, regrouping nodes reserved + #at the same + #time, for the same amount of time = one job on OAR + requested_job_dict = {} + for lease in requested_lease_list: + + #In case it is an asap experiment start_time is empty + if lease['start_time'] == '': + lease['start_time'] = '0' - leases = slices.verify_slice_leases(sfa_slice, \ - requested_leases, kept_leases, peer) + if lease['start_time'] not in requested_job_dict: + if isinstance(lease['hostname'], str): + lease['hostname'] = [lease['hostname']] + + requested_job_dict[lease['start_time']] = lease + + else : + job_lease = requested_job_dict[lease['start_time']] + if lease['duration'] == job_lease['duration'] : + job_lease['hostname'].append(lease['hostname']) + + + + + logger.debug("SLABDRIVER.PY \tcreate_sliver requested_job_dict %s "\ + %(requested_job_dict)) + #verify_slice_leases returns the leases , but the return value is unused + #here. Removed SA 13/08/12 + slices.verify_slice_leases(sfa_slice, \ + requested_job_dict, peer) - return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version) + return aggregate.get_rspec(slice_xrn=slice_urn, login=sfa_slice['login'],version=rspec.version) def delete_sliver (self, slice_urn, slice_hrn, creds, options): - sfa_slice = self.GetSlices(slice_filter = slice_hrn, \ + sfa_slice_list = self.GetSlices(slice_filter = slice_hrn, \ slice_filter_type = 'slice_hrn') - logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice)) - if not sfa_slice: + + if not sfa_slice_list: return 1 - - slices = SlabSlices(self) - # determine if this is a peer slice - - peer = slices.get_peer(slice_hrn) - try: - if peer: - self.UnBindObjectFromPeer('slice', \ - sfa_slice['record_id_slice'], peer) - self.DeleteSliceFromNodes(sfa_slice) - finally: - if peer: - self.BindObjectToPeer('slice', sfa_slice['slice_id'], \ + + #Delete all in the slice + for sfa_slice in sfa_slice_list: + + + logger.debug("SLABDRIVER.PY delete_sliver slice %s" %(sfa_slice)) + slices = SlabSlices(self) + # determine if this is a peer slice + + peer = slices.get_peer(slice_hrn) + #TODO delete_sliver SA : UnBindObjectFromPeer should be + #used when there is another + #senslab testbed, which is not the case 14/08/12 . + + logger.debug("SLABDRIVER.PY delete_sliver peer %s" %(peer)) + try: + if peer: + self.UnBindObjectFromPeer('slice', \ + sfa_slice['record_id_slice'], \ + peer, None) + self.DeleteSliceFromNodes(sfa_slice) + finally: + if peer: + self.BindObjectToPeer('slice', \ + sfa_slice['record_id_slice'], \ peer, sfa_slice['peer_slice_id']) - return 1 + return 1 - def AddSlice(self, slice_record): - slab_slice = SliceSenslab( slice_hrn = slice_record['slice_hrn'], \ - record_id_slice= slice_record['record_id_slice'] , \ - record_id_user= slice_record['record_id_user'], \ - peer_authority = slice_record['peer_authority']) - logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s slab_slice %s" \ - %(slice_record,slab_slice)) - slab_dbsession.add(slab_slice) - slab_dbsession.commit() + def AddSlice(self, slice_record, user_record): + """Add slice to the sfa tables and senslab table only if the user + already exists in senslab database(user already registered in LDAP). + There is no way to separate adding the slice to the tesbed + and then importing it from the testbed to SFA because of + senslab's architecture. Therefore, sfa tables are updated here. + """ + + sfa_record = RegSlice(hrn=slice_record['slice_hrn'], + gid=slice_record['gid'], + pointer=slice_record['slice_id'], + authority=slice_record['authority']) + + logger.debug("SLABDRIVER.PY AddSlice sfa_record %s user_record %s" \ + %(sfa_record, user_record)) + sfa_record.just_created() + dbsession.add(sfa_record) + dbsession.commit() + #Update the reg-researcher dependance table + sfa_record.reg_researchers = [user_record] + dbsession.commit() + + #Update the senslab table with the new slice + #slab_slice = SenslabXP( slice_hrn = slice_record['slice_hrn'], \ + #record_id_slice = sfa_record.record_id , \ + #record_id_user = slice_record['record_id_user'], \ + #peer_authority = slice_record['peer_authority']) + + #logger.debug("SLABDRIVER.PY \tAddSlice slice_record %s \ + #slab_slice %s sfa_record %s" \ + #%(slice_record,slab_slice, sfa_record)) + #slab_dbsession.add(slab_slice) + #slab_dbsession.commit() return # first 2 args are None in case of resource discovery @@ -254,6 +364,14 @@ class SlabDriver(Driver): if options.get('info'): version_string = version_string + "_" + \ options.get('info', 'default') + + # Adding the list_leases option to the caching key + if options.get('list_leases'): + version_string = version_string + "_"+options.get('list_leases', 'default') + + # Adding geni_available to caching key + if options.get('geni_available'): + version_string = version_string + "_" + str(options.get('geni_available')) # look in cache first #if cached_requested and self.cache and not slice_hrn: @@ -265,8 +383,8 @@ class SlabDriver(Driver): #panos: passing user-defined options aggregate = SlabAggregate(self) - origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() - options.update({'origin_hrn':origin_hrn}) + #origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() + #options.update({'origin_hrn':origin_hrn}) rspec = aggregate.get_rspec(slice_xrn=slice_urn, \ version=rspec_version, options=options) @@ -290,9 +408,8 @@ class SlabDriver(Driver): slices = self.GetSlices() logger.debug("SLABDRIVER.PY \tlist_slices hrn %s \r\n \r\n" %(slices)) - slice_hrns = [slab_slice['slice_hrn'] for slab_slice in slices] - #slice_hrns = [slicename_to_hrn(self.hrn, slab_slice['slice_hrn']) \ - #for slab_slice in slices] + slice_hrns = [slab_slice['hrn'] for slab_slice in slices] + slice_urns = [hrn_to_urn(slice_hrn, 'slice') \ for slice_hrn in slice_hrns] @@ -303,65 +420,23 @@ class SlabDriver(Driver): return slice_urns - #No site or node register supported + def register (self, sfa_record, hrn, pub_key): - record_type = sfa_record['type'] - slab_record = self.sfa_fields_to_slab_fields(record_type, hrn, \ - sfa_record) - - - if record_type == 'slice': - acceptable_fields = ['url', 'instantiation', 'name', 'description'] - for key in slab_record.keys(): - if key not in acceptable_fields: - slab_record.pop(key) - logger.debug("SLABDRIVER.PY register") - slices = self.GetSlices(slice_filter =slab_record['hrn'], \ - slice_filter_type = 'slice_hrn') - if not slices: - pointer = self.AddSlice(slab_record) - else: - pointer = slices[0]['slice_id'] - - elif record_type == 'user': - persons = self.GetPersons([sfa_record]) - #persons = self.GetPersons([sfa_record['hrn']]) - if not persons: - pointer = self.AddPerson(dict(sfa_record)) - #add in LDAP - else: - pointer = persons[0]['person_id'] - - #Does this make sense to senslab ? - #if 'enabled' in sfa_record and sfa_record['enabled']: - #self.UpdatePerson(pointer, \ - #{'enabled': sfa_record['enabled']}) - - #TODO register Change this AddPersonToSite stuff 05/07/2012 SA - # add this person to the site only if - # she is being added for the first - # time by sfa and doesnt already exist in plc - if not persons or not persons[0]['site_ids']: - login_base = get_leaf(sfa_record['authority']) - self.AddPersonToSite(pointer, login_base) - - # What roles should this user have? - #TODO : DElete this AddRoleToPerson 04/07/2012 SA - #Function prototype is : - #AddRoleToPerson(self, auth, role_id_or_name, person_id_or_email) - #what's the pointer doing here? - self.AddRoleToPerson('user', pointer) - # Add the user's key - if pub_key: - self.AddPersonKey(pointer, {'key_type' : 'ssh', \ - 'key' : pub_key}) - - #No node adding outside OAR - - return pointer + """ + Adding new user, slice, node or site should not be handled + by SFA. + + Adding nodes = OAR + Adding users = LDAP Senslab + Adding slice = Import from LDAP users + Adding site = OAR + """ + return -1 - #No site or node record update allowed + def update (self, old_sfa_record, new_sfa_record, hrn, new_key): + """No site or node record update allowed in Senslab.""" + pointer = old_sfa_record['pointer'] old_sfa_record_type = old_sfa_record['type'] @@ -418,7 +493,6 @@ class SlabDriver(Driver): def remove (self, sfa_record): sfa_record_type = sfa_record['type'] hrn = sfa_record['hrn'] - record_id = sfa_record['record_id'] if sfa_record_type == 'user': #get user from senslab ldap @@ -432,7 +506,7 @@ class SlabDriver(Driver): elif sfa_record_type == 'slice': if self.GetSlices(slice_filter = hrn, \ slice_filter_type = 'slice_hrn'): - self.DeleteSlice(sfa_record_type) + self.DeleteSlice(sfa_record) #elif type == 'authority': #if self.GetSites(pointer): @@ -450,16 +524,12 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER \tGetPeers auth = %s, peer_filter %s, \ return_field %s " %(auth , peer_filter, return_fields_list)) all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all() + for record in all_records: existing_records[(record.hrn, record.type)] = record if record.type not in existing_hrns_by_types: existing_hrns_by_types[record.type] = [record.hrn] - logger.debug("SLABDRIVER \tGetPeer\t NOT IN \ - existing_hrns_by_types %s " %( existing_hrns_by_types)) else: - - logger.debug("SLABDRIVER \tGetPeer\t \INNN type %s hrn %s " \ - %(record.type,record.hrn)) existing_hrns_by_types[record.type].append(record.hrn) @@ -477,7 +547,7 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER \tGetPeer \trecords_list %s " \ %(records_list)) - except: + except KeyError: pass return_records = records_list @@ -493,7 +563,7 @@ class SlabDriver(Driver): #TODO : Handling OR request in make_ldap_filters_from_records #instead of the for loop #over the records' list - def GetPersons(self, person_filter=None, return_fields_list=None): + def GetPersons(self, person_filter=None): """ person_filter should be a list of dictionnaries when not set to None. Returns a list of users whose accounts are enabled found in ldap. @@ -511,7 +581,13 @@ class SlabDriver(Driver): #add a filter for make_ldap_filters_from_record person = self.ldap.LdapFindUser(searched_attributes, \ is_user_enabled=True) - person_list.append(person) + #If a person was found, append it to the list + if person: + person_list.append(person) + + #If the list is empty, return None + if len(person_list) is 0: + person_list = None else: #Get only enabled user accounts in senslab LDAP : @@ -521,6 +597,8 @@ class SlabDriver(Driver): return person_list def GetTimezone(self): + """ Get the OAR servier time and timezone. + Unused SA 16/11/12""" server_timestamp, server_tz = self.oar.parser.\ SendRequest("GET_timezone") return server_timestamp, server_tz @@ -534,10 +612,11 @@ class SlabDriver(Driver): reqdict['method'] = "delete" reqdict['strval'] = str(job_id) + answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \ reqdict,username) - logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s username %s" \ - %(job_id,answer, username)) + logger.debug("SLABDRIVER \tDeleteJobs jobid %s \r\n answer %s \ + username %s" %(job_id,answer, username)) return answer @@ -589,7 +668,7 @@ class SlabDriver(Driver): #assigned_n = ['node', 'node_uri'] req = "GET_jobs_id_resources" - node_list_k = 'reserved_resources' + #Get job resources list from OAR node_id_list = self.oar.parser.SendRequest(req, job_id, username) @@ -598,8 +677,7 @@ class SlabDriver(Driver): hostname_list = \ self.__get_hostnames_from_oar_node_ids(node_id_list) - #parsed_job_info = self.get_info_on_reserved_nodes(job_info, \ - #node_list_k) + #Replaces the previous entry "assigned_network_address" / #"reserved_resources" #with "node_ids" @@ -643,24 +721,24 @@ class SlabDriver(Driver): for node in full_nodes_dict_list: oar_id_node_dict[node['oar_id']] = node - logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\ - oar_id_node_dict %s" %(oar_id_node_dict)) - hostname_list = [] + #logger.debug("SLABDRIVER \t __get_hostnames_from_oar_node_ids\ + #oar_id_node_dict %s" %(oar_id_node_dict)) + hostname_dict_list = [] for resource_id in resource_id_list: #Because jobs requested "asap" do not have defined resources if resource_id is not "Undefined": - hostname_dict_list.append({'hostname' : \ - oar_id_node_dict[resource_id]['hostname'], - 'site_id' : oar_id_node_dict[resource_id]['site']}) + hostname_dict_list.append(\ + oar_id_node_dict[resource_id]['hostname']) #hostname_list.append(oar_id_node_dict[resource_id]['hostname']) return hostname_dict_list - def GetReservedNodes(self): + def GetReservedNodes(self,username = None): #Get the nodes in use and the reserved nodes reservation_dict_list = \ - self.oar.parser.SendRequest("GET_reserved_nodes") + self.oar.parser.SendRequest("GET_reserved_nodes", \ + username = username) for resa in reservation_dict_list: @@ -679,7 +757,8 @@ class SlabDriver(Driver): """ node_dict_by_id = self.oar.parser.SendRequest("GET_resources_full") node_dict_list = node_dict_by_id.values() - + logger.debug (" SLABDRIVER GetNodes node_filter_dict %s \ + return_fields_list %s "%(node_filter_dict, return_fields_list)) #No filtering needed return the list directly if not (node_filter_dict or return_fields_list): return node_dict_list @@ -732,10 +811,91 @@ class SlabDriver(Driver): return return_site_list - #warning return_fields_list paramr emoved (Not used) - def GetSlices(self, slice_filter = None, slice_filter_type = None): - #def GetSlices(self, slice_filter = None, slice_filter_type = None, \ - #return_fields_list = None): + + + def _sql_get_slice_info( self, slice_filter ): + #DO NOT USE RegSlice - reg_researchers to get the hrn + #of the user otherwise will mess up the RegRecord in + #Resolve, don't know why - SA 08/08/2012 + + #Only one entry for one user = one slice in slab_xp table + #slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first() + raw_slicerec = dbsession.query(RegSlice).options(joinedload('reg_researchers')).filter_by(hrn = slice_filter).first() + #raw_slicerec = dbsession.query(RegRecord).filter_by(hrn = slice_filter).first() + if raw_slicerec: + #load_reg_researcher + #raw_slicerec.reg_researchers + raw_slicerec = raw_slicerec.__dict__ + logger.debug(" SLABDRIVER \t get_slice_info slice_filter %s raw_slicerec %s"%(slice_filter,raw_slicerec)) + slicerec = raw_slicerec + #only one researcher per slice so take the first one + #slicerec['reg_researchers'] = raw_slicerec['reg_researchers'] + #del slicerec['reg_researchers']['_sa_instance_state'] + return slicerec + + else : + return None + + + def _sql_get_slice_info_from_user( self, slice_filter ): + #slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first() + raw_slicerec = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(record_id = slice_filter).first() + #raw_slicerec = dbsession.query(RegRecord).filter_by(record_id = slice_filter).first() + #Put it in correct order + user_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'email', 'pointer'] + slice_needed_fields = ['peer_authority', 'hrn', 'last_updated', 'classtype', 'authority', 'gid', 'record_id', 'date_created', 'type', 'pointer'] + if raw_slicerec: + #raw_slicerec.reg_slices_as_researcher + raw_slicerec = raw_slicerec.__dict__ + slicerec = {} + slicerec = dict([(k,raw_slicerec['reg_slices_as_researcher'][0].__dict__[k]) for k in slice_needed_fields]) + slicerec['reg_researchers'] = dict([(k, raw_slicerec[k]) for k in user_needed_fields]) + #TODO Handle multiple slices for one user SA 10/12/12 + #for now only take the first slice record associated to the rec user + ##slicerec = raw_slicerec['reg_slices_as_researcher'][0].__dict__ + #del raw_slicerec['reg_slices_as_researcher'] + #slicerec['reg_researchers'] = raw_slicerec + ##del slicerec['_sa_instance_state'] + + return slicerec + + else: + return None + + def _get_slice_records(self, slice_filter = None, \ + slice_filter_type = None): + + #login = None + + #Get list of slices based on the slice hrn + if slice_filter_type == 'slice_hrn': + + #if get_authority(slice_filter) == self.root_auth: + #login = slice_filter.split(".")[1].split("_")[0] + + slicerec = self._sql_get_slice_info(slice_filter) + + if slicerec is None: + return None + #return login, None + + #Get slice based on user id + if slice_filter_type == 'record_id_user': + + slicerec = self._sql_get_slice_info_from_user(slice_filter) + + if slicerec: + fixed_slicerec_dict = slicerec + #At this point if the there is no login it means + #record_id_user filter has been used for filtering + #if login is None : + ##If theslice record is from senslab + #if fixed_slicerec_dict['peer_authority'] is None: + #login = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0] + #return login, fixed_slicerec_dict + return fixed_slicerec_dict + + def GetSlices(self, slice_filter = None, slice_filter_type = None, login=None): """ Get the slice records from the slab db. Returns a slice ditc if slice_filter and slice_filter_type are specified. @@ -743,76 +903,113 @@ class SlabDriver(Driver): specified. """ - return_slice_list = [] - slicerec = {} - slicerec_dict = {} + #login = None authorized_filter_types_list = ['slice_hrn', 'record_id_user'] - logger.debug("SLABDRIVER \tGetSlices authorized_filter_types_list %s"\ - %(authorized_filter_types_list)) + return_slicerec_dictlist = [] + + #First try to get information on the slice based on the filter provided if slice_filter_type in authorized_filter_types_list: - if slice_filter_type == 'slice_hrn': - slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first() - - if slice_filter_type == 'record_id_user': - slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first() - - if slicerec: - #warning pylint OK - slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict() - logger.debug("SLABDRIVER \tGetSlices slicerec_dict %s" \ - %(slicerec_dict)) - #Get login - login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0] - logger.debug("\r\n SLABDRIVER \tGetSlices login %s \ - slice record %s" \ - %(login, slicerec_dict)) - if slicerec_dict['oar_job_id'] is not -1: - #Check with OAR the status of the job if a job id is in - #the slice record - rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \ - username = login) - - if rslt : - slicerec_dict.update(rslt) - slicerec_dict.update({'hrn':\ - str(slicerec_dict['slice_hrn'])}) - #If GetJobsResources is empty, this means the job is - #now in the 'Terminated' state - #Update the slice record - else : - self.db.update_job(slice_filter, job_id = -1) - slicerec_dict['oar_job_id'] = -1 - slicerec_dict.\ - update({'hrn':str(slicerec_dict['slice_hrn'])}) + fixed_slicerec_dict = \ + self._get_slice_records(slice_filter, slice_filter_type) + #login, fixed_slicerec_dict = \ + #self._get_slice_records(slice_filter, slice_filter_type) + logger.debug(" SLABDRIVER \tGetSlices login %s \ + slice record %s slice_filter %s slice_filter_type %s "\ + %(login, fixed_slicerec_dict,slice_filter, slice_filter_type)) + - try: - slicerec_dict['node_ids'] = slicerec_dict['node_list'] - except KeyError: - pass + #Now we have the slice record fixed_slicerec_dict, get the + #jobs associated to this slice + #leases_list = self.GetReservedNodes(username = login) + leases_list = self.GetLeases(login = login) + #If no job is running or no job scheduled + #return only the slice record + if leases_list == [] and fixed_slicerec_dict: + return_slicerec_dictlist.append(fixed_slicerec_dict) - logger.debug("SLABDRIVER.PY \tGetSlices slicerec_dict %s"\ - %(slicerec_dict)) - - return slicerec_dict + #If several jobs for one slice , put the slice record into + # each lease information dict + for lease in leases_list : + slicerec_dict = {} + + reserved_list = lease['reserved_nodes'] + + slicerec_dict['oar_job_id']= lease['lease_id'] + slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}}) + slicerec_dict.update({'node_ids':lease['reserved_nodes']}) + + #Update lease dict with the slice record + if fixed_slicerec_dict: + fixed_slicerec_dict['oar_job_id'] = [] + fixed_slicerec_dict['oar_job_id'].append(slicerec_dict['oar_job_id']) + slicerec_dict.update(fixed_slicerec_dict) + #slicerec_dict.update({'hrn':\ + #str(fixed_slicerec_dict['slice_hrn'])}) + + + return_slicerec_dictlist.append(slicerec_dict) + logger.debug("SLABDRIVER.PY \tGetSlices \ + slicerec_dict %s return_slicerec_dictlist %s \ + lease['reserved_nodes'] \ + %s" %(slicerec_dict, return_slicerec_dictlist, \ + lease['reserved_nodes'] )) + + logger.debug("SLABDRIVER.PY \tGetSlices RETURN \ + return_slicerec_dictlist %s" \ + %(return_slicerec_dictlist)) + + return return_slicerec_dictlist else: - slice_list = slab_dbsession.query(SliceSenslab).all() - return_slice_list = [] - for record in slice_list: - return_slice_list.append(record.dump_sqlalchemyobj_to_dict()) - - logger.debug("SLABDRIVER.PY \tGetSlices slices %s \ - slice_filter %s " %(return_slice_list, slice_filter)) - - #if return_fields_list: - #return_slice_list = parse_filter(sliceslist, \ - #slice_filter,'slice', return_fields_list) - - return return_slice_list - + #Get all slices from the senslab sfa database , + #put them in dict format + #query_slice_list = dbsession.query(RegRecord).all() + query_slice_list = dbsession.query(RegSlice).options(joinedload('reg_researchers')).all() + #query_slice_list = dbsession.query(RegRecord).filter_by(type='slice').all() + #query_slice_list = slab_dbsession.query(SenslabXP).all() + return_slicerec_dictlist = [] + for record in query_slice_list: + tmp = record.__dict__ + tmp['reg_researchers'] = tmp['reg_researchers'][0].__dict__ + #del tmp['reg_researchers']['_sa_instance_state'] + return_slicerec_dictlist.append(tmp) + #return_slicerec_dictlist.append(record.__dict__) + + #Get all the jobs reserved nodes + leases_list = self.GetReservedNodes() + + for fixed_slicerec_dict in return_slicerec_dictlist: + slicerec_dict = {} + #Check if the slice belongs to a senslab user + if fixed_slicerec_dict['peer_authority'] is None: + owner = fixed_slicerec_dict['hrn'].split(".")[1].split("_")[0] + else: + owner = None + for lease in leases_list: + if owner == lease['user']: + slicerec_dict['oar_job_id'] = lease['lease_id'] + + #for reserved_node in lease['reserved_nodes']: + logger.debug("SLABDRIVER.PY \tGetSlices lease %s "\ + %(lease )) + + reserved_list = lease['reserved_nodes'] + + slicerec_dict.update({'node_ids':lease['reserved_nodes']}) + slicerec_dict.update({'list_node_ids':{'hostname':reserved_list}}) + slicerec_dict.update(fixed_slicerec_dict) + #slicerec_dict.update({'hrn':\ + #str(fixed_slicerec_dict['slice_hrn'])}) + #return_slicerec_dictlist.append(slicerec_dict) + fixed_slicerec_dict.update(slicerec_dict) + + logger.debug("SLABDRIVER.PY \tGetSlices RETURN \ + return_slicerec_dictlist %s \slice_filter %s " \ + %(return_slicerec_dictlist, slice_filter)) + return return_slicerec_dictlist def testbed_name (self): return self.hrn @@ -833,10 +1030,7 @@ class SlabDriver(Driver): 'geni_ad_rspec_versions': ad_rspec_versions, } - - - - + ## # Convert SFA fields to PLC fields for use when registering up updating @@ -849,10 +1043,6 @@ class SlabDriver(Driver): def sfa_fields_to_slab_fields(self, sfa_type, hrn, record): - def convert_ints(tmpdict, int_fields): - for field in int_fields: - if field in tmpdict: - tmpdict[field] = int(tmpdict[field]) slab_record = {} #for field in record: @@ -863,7 +1053,8 @@ class SlabDriver(Driver): if not "instantiation" in slab_record: slab_record["instantiation"] = "senslab-instantiated" #slab_record["hrn"] = hrn_to_pl_slicename(hrn) - #Unused hrn_to_pl_slicename because Slab's hrn already in the appropriate form SA 23/07/12 + #Unused hrn_to_pl_slicename because Slab's hrn already + #in the appropriate form SA 23/07/12 slab_record["hrn"] = hrn logger.debug("SLABDRIVER.PY sfa_fields_to_slab_fields \ slab_record %s " %(slab_record['hrn'])) @@ -919,151 +1110,114 @@ class SlabDriver(Driver): else: return None - - def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None): - """ Creates the structure needed for a correct POST on OAR. - Makes the timestamp transformation into the appropriate format. - Sends the POST request to create the job with the resources in - added_nodes. - """ - site_list = [] - nodeid_list = [] - resource = "" - reqdict = {} - slice_name = slice_dict['name'] - try: - slot = slice_dict['timeslot'] - logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \ - slot %s" %(slot)) - except KeyError: - #Running on default parameters - #XP immediate , 10 mins - slot = { 'date':None, 'start_time':None, - 'timezone':None, 'duration':None }#10 min - - reqdict['workdir'] = '/tmp' - reqdict['resource'] = "{network_address in (" - - for node in added_nodes: - logger.debug("OARrestapi \tLaunchExperimentOnOAR \ - node %s" %(node)) - - #Get the ID of the node : remove the root auth and put - # the site in a separate list. - # NT: it's not clear for me if the nodenames will have the senslab - #prefix so lets take the last part only, for now. - - # Again here it's not clear if nodes will be prefixed with _, - #lets split and tanke the last part for now. - #s=lastpart.split("_") - - nodeid = node - reqdict['resource'] += "'" + nodeid + "', " - nodeid_list.append(nodeid) - - custom_length = len(reqdict['resource'])- 2 - reqdict['resource'] = reqdict['resource'][0:custom_length] + \ - ")}/nodes=" + str(len(nodeid_list)) - - def __process_walltime(duration=None): - """ Calculates the walltime in seconds from the duration in H:M:S - specified in the RSpec. - - """ - if duration: - walltime = duration.split(":") - # Fixing the walltime by adding a few delays. First put the walltime - # in seconds oarAdditionalDelay = 20; additional delay for - # /bin/sleep command to - # take in account prologue and epilogue scripts execution - # int walltimeAdditionalDelay = 120; additional delay - - desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\ - int(walltime[2]) - total_walltime = desired_walltime + 140 #+2 min 20 - sleep_walltime = desired_walltime + 20 #+20 sec - logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\ - total_walltime %s sleep_walltime %s "\ - %(desired_walltime, total_walltime, \ - sleep_walltime)) - #Put the walltime back in str form - #First get the hours - walltime[0] = str(total_walltime / 3600) - total_walltime = total_walltime - 3600 * int(walltime[0]) - #Get the remaining minutes - walltime[1] = str(total_walltime / 60) - total_walltime = total_walltime - 60 * int(walltime[1]) - #Get the seconds - walltime[2] = str(total_walltime) - logger.debug("SLABDRIVER \t__process_walltime walltime %s "\ - %(walltime)) - else: - #automatically set 10min +2 min 20 - walltime[0] = '0' - walltime[1] = '12' - walltime[2] = '20' - sleep_walltime = '620' - - return walltime, sleep_walltime - - #if slot['duration']: - walltime, sleep_walltime = __process_walltime(duration = \ - slot['duration']) - #else: - #walltime, sleep_walltime = self.__process_walltime(duration = None) - - reqdict['resource'] += ",walltime=" + str(walltime[0]) + \ - ":" + str(walltime[1]) + ":" + str(walltime[2]) - reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime) - - - - #In case of a scheduled experiment (not immediate) - #To run an XP immediately, don't specify date and time in RSpec - #They will be set to None. - server_timestamp, server_tz = self.GetTimezone() - if slot['date'] and slot['start_time']: - if slot['timezone'] is '' or slot['timezone'] is None: - #assume it is server timezone - from_zone = tz.gettz(server_tz) - logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \ - not specified server_tz %s from_zone %s" \ - %(server_tz, from_zone)) - else: - #Get zone of the user from the reservation time given - #in the rspec - from_zone = tz.gettz(slot['timezone']) - - date = str(slot['date']) + " " + str(slot['start_time']) - user_datetime = datetime.strptime(date, self.time_format) - user_datetime = user_datetime.replace(tzinfo = from_zone) + + + + def LaunchExperimentOnOAR(self, added_nodes, slice_name, \ + lease_start_time, lease_duration, slice_user=None): + lease_dict = {} + lease_dict['lease_start_time'] = lease_start_time + lease_dict['lease_duration'] = lease_duration + lease_dict['added_nodes'] = added_nodes + lease_dict['slice_name'] = slice_name + lease_dict['slice_user'] = slice_user + lease_dict['grain'] = self.GetLeaseGranularity() + lease_dict['time_format'] = self.time_format + + + def __create_job_structure_request_for_OAR(lease_dict): + """ Creates the structure needed for a correct POST on OAR. + Makes the timestamp transformation into the appropriate format. + Sends the POST request to create the job with the resources in + added_nodes. - #Convert to server zone + """ - to_zone = tz.gettz(server_tz) - reservation_date = user_datetime.astimezone(to_zone) - #Readable time accpeted by OAR - reqdict['reservation'] = reservation_date.strftime(self.time_format) - - logger.debug("SLABDRIVER \tLaunchExperimentOnOAR \ - reqdict['reservation'] %s " %(reqdict['reservation'])) + nodeid_list = [] + reqdict = {} + - else: - # Immediate XP. Not need to add special parameters. - # normally not used in SFA - - pass - + reqdict['workdir'] = '/tmp' + reqdict['resource'] = "{network_address in (" + + for node in lease_dict['added_nodes']: + logger.debug("\r\n \r\n OARrestapi \t \ + __create_job_structure_request_for_OAR node %s" %(node)) + + # Get the ID of the node + nodeid = node + reqdict['resource'] += "'" + nodeid + "', " + nodeid_list.append(nodeid) + + custom_length = len(reqdict['resource'])- 2 + reqdict['resource'] = reqdict['resource'][0:custom_length] + \ + ")}/nodes=" + str(len(nodeid_list)) + + def __process_walltime(duration): + """ Calculates the walltime in seconds from the duration in H:M:S + specified in the RSpec. + + """ + if duration: + # Fixing the walltime by adding a few delays. + # First put the walltime in seconds oarAdditionalDelay = 20; + # additional delay for /bin/sleep command to + # take in account prologue and epilogue scripts execution + # int walltimeAdditionalDelay = 240; additional delay + desired_walltime = duration + total_walltime = desired_walltime + 240 #+4 min Update SA 23/10/12 + sleep_walltime = desired_walltime # 0 sec added Update SA 23/10/12 + walltime = [] + #Put the walltime back in str form + #First get the hours + walltime.append(str(total_walltime / 3600)) + total_walltime = total_walltime - 3600 * int(walltime[0]) + #Get the remaining minutes + walltime.append(str(total_walltime / 60)) + total_walltime = total_walltime - 60 * int(walltime[1]) + #Get the seconds + walltime.append(str(total_walltime)) + + else: + logger.log_exc(" __process_walltime duration null") + + return walltime, sleep_walltime + - reqdict['type'] = "deploy" - reqdict['directory'] = "" - reqdict['name'] = "TestSandrine" - - - # first step : start the OAR job and update the job + walltime, sleep_walltime = \ + __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain']) + + + reqdict['resource'] += ",walltime=" + str(walltime[0]) + \ + ":" + str(walltime[1]) + ":" + str(walltime[2]) + reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime) + + #In case of a scheduled experiment (not immediate) + #To run an XP immediately, don't specify date and time in RSpec + #They will be set to None. + if lease_dict['lease_start_time'] is not '0': + #Readable time accepted by OAR + start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\ + strftime(lease_dict['time_format']) + reqdict['reservation'] = start_time + #If there is not start time, Immediate XP. No need to add special + # OAR parameters + + + reqdict['type'] = "deploy" + reqdict['directory'] = "" + reqdict['name'] = "SFA_" + lease_dict['slice_user'] + + return reqdict + + logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slice_user %s\ + \r\n " %(slice_user)) + #Create the request for OAR + reqdict = __create_job_structure_request_for_OAR(lease_dict) + # first step : start the OAR job and update the job logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\ - \r\n site_list %s" %(reqdict, site_list)) + \r\n " %(reqdict)) answer = self.oar.POSTRequestToOARRestAPI('POST_job', \ reqdict, slice_user) @@ -1073,94 +1227,234 @@ class SlabDriver(Driver): except KeyError: logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR \ Impossible to create job %s " %(answer)) - return + return None - logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \ - added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user)) - self.db.update_job( slice_name, jobid, added_nodes) - - # second step : configure the experiment - # we need to store the nodes in a yaml (well...) file like this : - # [1,56,23,14,45,75] with name /tmp/sfa.json - job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w') - job_file.write('[') - job_file.write(str(added_nodes[0].strip('node'))) - for node in added_nodes[1:len(added_nodes)] : - job_file.write(', '+ node.strip('node')) - job_file.write(']') - job_file.close() - - # third step : call the senslab-experiment wrapper - #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar - # "+str(jobid)+" "+slice_user - javacmdline = "/usr/bin/java" - jarname = \ - "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar" - #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \ - #str(jobid), slice_user]) - output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \ - slice_user],stdout=subprocess.PIPE).communicate()[0] - - logger.debug("SLABDRIVER \tLaunchExperimentOnOAR wrapper returns%s " \ - %(output)) - return - - - #Delete the jobs and updates the job id in the senslab table - #to set it to -1 - #Does not clear the node list + def __configure_experiment(jobid, added_nodes): + # second step : configure the experiment + # we need to store the nodes in a yaml (well...) file like this : + # [1,56,23,14,45,75] with name /tmp/sfa.json + tmp_dir = '/tmp/sfa/' + if not os.path.exists(tmp_dir): + os.makedirs(tmp_dir) + job_file = open(tmp_dir + str(jobid) + '.json', 'w') + job_file.write('[') + job_file.write(str(added_nodes[0].strip('node'))) + for node in added_nodes[1:len(added_nodes)] : + job_file.write(', '+ node.strip('node')) + job_file.write(']') + job_file.close() + return + + def __launch_senslab_experiment(jobid): + # third step : call the senslab-experiment wrapper + #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar + # "+str(jobid)+" "+slice_user + javacmdline = "/usr/bin/java" + jarname = \ + "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar" + + output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \ + slice_user],stdout=subprocess.PIPE).communicate()[0] + + logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \ + %(output)) + return + + + + if jobid : + logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \ + added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user)) + + + __configure_experiment(jobid, added_nodes) + __launch_senslab_experiment(jobid) + + return jobid + + + def AddLeases(self, hostname_list, slice_record, \ + lease_start_time, lease_duration): + logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s \ + slice_record %s lease_start_time %s lease_duration %s "\ + %( hostname_list, slice_record , lease_start_time, \ + lease_duration)) + + #tmp = slice_record['reg-researchers'][0].split(".") + username = slice_record['login'] + #username = tmp[(len(tmp)-1)] + job_id = self.LaunchExperimentOnOAR(hostname_list, slice_record['hrn'], \ + lease_start_time, lease_duration, username) + start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format) + end_time = lease_start_time + lease_duration + + import logging, logging.handlers + from sfa.util.sfalogging import _SfaLogger + logger.debug("SLABDRIVER \r\n \r\n \t AddLeases TURN ON LOGGING SQL %s %s %s "%(slice_record['hrn'], job_id, end_time)) + sql_logger = _SfaLogger(loggername = 'sqlalchemy.engine', level=logging.DEBUG) + logger.debug("SLABDRIVER \r\n \r\n \t AddLeases %s %s %s " %(type(slice_record['hrn']), type(job_id), type(end_time))) + slab_ex_row = SenslabXP(slice_hrn = slice_record['hrn'], job_id = job_id,end_time= end_time) + logger.debug("SLABDRIVER \r\n \r\n \t AddLeases slab_ex_row %s" %(slab_ex_row)) + slab_dbsession.add(slab_ex_row) + slab_dbsession.commit() + + logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time)) + + return + + + #Delete the jobs from job_senslab table def DeleteSliceFromNodes(self, slice_record): - # Get user information - - self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn']) - self.db.update_job(slice_record['hrn'], job_id = -1) + for job_id in slice_record['oar_job_id']: + self.DeleteJobs(job_id, slice_record['hrn']) return def GetLeaseGranularity(self): """ Returns the granularity of Senslab testbed. + OAR returns seconds for experiments duration. Defined in seconds. """ grain = 60 return grain - def GetLeases(self, lease_filter_dict=None, return_fields_list=None): - unfiltered_reservation_list = self.GetReservedNodes() + def update_jobs_in_slabdb(self, job_oar_list, jobs_psql): + #Get all the entries in slab_xp table + + + jobs_psql = set(jobs_psql) + kept_jobs = set(job_oar_list).intersection(jobs_psql) + logger.debug ( "\r\n \t\tt update_jobs_in_slabdb jobs_psql %s \r\n \t job_oar_list %s \ + kept_jobs %s " %(jobs_psql,job_oar_list,kept_jobs)) + deleted_jobs = set(jobs_psql).difference(kept_jobs) + deleted_jobs = list(deleted_jobs) + if len(deleted_jobs) > 0: + slab_dbsession.query(SenslabXP).filter(SenslabXP.job_id.in_(deleted_jobs)).delete(synchronize_session='fetch') + slab_dbsession.commit() + + return + + + + def GetLeases(self, lease_filter_dict=None, login=None): + + + unfiltered_reservation_list = self.GetReservedNodes(login) + reservation_list = [] #Find the slice associated with this user senslab ldap uid - logger.debug(" SLABDRIVER.PY \tGetLeases ") + logger.debug(" SLABDRIVER.PY \tGetLeases login %s unfiltered_reservation_list %s " %(login ,unfiltered_reservation_list)) + #Create user dict first to avoid looking several times for + #the same user in LDAP SA 27/07/12 + resa_user_dict = {} + job_oar_list = [] + + jobs_psql_query = slab_dbsession.query(SenslabXP).all() + jobs_psql_dict = [ (row.job_id, row.__dict__ )for row in jobs_psql_query ] + jobs_psql_dict = dict(jobs_psql_dict) + logger.debug("SLABDRIVER \tGetLeases jobs_psql_dict %s"\ + %(jobs_psql_dict)) + jobs_psql_id_list = [ row.job_id for row in jobs_psql_query ] + + + for resa in unfiltered_reservation_list: - ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')') - ldap_info = ldap_info[0][1] - - user = dbsession.query(RegUser).filter_by(email = \ - ldap_info['mail'][0]).first() - #Separated in case user not in database : record_id not defined SA 17/07//12 - query_slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id) - if query_slice_info: - slice_info = query_slice_info.first() + logger.debug("SLABDRIVER \tGetLeases USER %s"\ + %(resa['user'])) + #Cosntruct list of jobs (runing, waiting..) in oar + job_oar_list.append(resa['lease_id']) + #If there is information on the job in SLAB DB (slice used and job id) + if resa['lease_id'] in jobs_psql_dict: + job_info = jobs_psql_dict[resa['lease_id']] + logger.debug("SLABDRIVER \tGetLeases resa_user_dict %s"\ + %(resa_user_dict)) + resa['slice_hrn'] = job_info['slice_hrn'] + resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice') - #Put the slice_urn - resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice') - resa['component_id_list'] = [] - #Transform the hostnames into urns (component ids) - for node in resa['reserved_nodes']: - #resa['component_id_list'].append(hostname_to_urn(self.hrn, \ - #self.root_auth, node['hostname'])) - slab_xrn = slab_xrn_object(self.root_auth, node['hostname']) - resa['component_id_list'].append(slab_xrn.urn) - - #Filter the reservation list if necessary - #Returns all the leases associated with a given slice - if lease_filter_dict: - logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\ - %(lease_filter_dict)) - for resa in unfiltered_reservation_list: - if lease_filter_dict['name'] == resa['slice_id']: - reservation_list.append(resa) - else: + #Assume it is a senslab slice: + else: + resa['slice_id'] = hrn_to_urn(self.root_auth+'.'+ resa['user'] +"_slice" , 'slice') + #if resa['user'] not in resa_user_dict: + #logger.debug("SLABDRIVER \tGetLeases userNOTIN ") + #ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')') + #if ldap_info: + #ldap_info = ldap_info[0][1] + ##Get the backref :relationship table reg-researchers + #user = dbsession.query(RegUser).options(joinedload('reg_slices_as_researcher')).filter_by(email = \ + #ldap_info['mail'][0]) + #if user: + #user = user.first() + #user = user.__dict__ + #slice_info = user['reg_slices_as_researcher'][0].__dict__ + ##Separated in case user not in database : + ##record_id not defined SA 17/07//12 + + ##query_slice_info = slab_dbsession.query(SenslabXP).filter_by(record_id_user = user.record_id) + ##if query_slice_info: + ##slice_info = query_slice_info.first() + ##else: + ##slice_info = None + + #resa_user_dict[resa['user']] = {} + #resa_user_dict[resa['user']]['ldap_info'] = user + #resa_user_dict[resa['user']]['slice_info'] = slice_info + + #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn'] + #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice') + + + resa['component_id_list'] = [] + resa['hrn'] = Xrn(resa['slice_id']).get_hrn() + #Transform the hostnames into urns (component ids) + for node in resa['reserved_nodes']: + #resa['component_id_list'].append(hostname_to_urn(self.hrn, \ + #self.root_auth, node['hostname'])) + slab_xrn = slab_xrn_object(self.root_auth, node) + resa['component_id_list'].append(slab_xrn.urn) + + if lease_filter_dict: + logger.debug("SLABDRIVER \tGetLeases resa_ %s \r\n leasefilter %s"\ + %(resa,lease_filter_dict)) + + if lease_filter_dict['name'] == resa['hrn']: + reservation_list.append(resa) + + if lease_filter_dict is None: reservation_list = unfiltered_reservation_list + #else: + #del unfiltered_reservation_list[unfiltered_reservation_list.index(resa)] + + + self.update_jobs_in_slabdb(job_oar_list, jobs_psql_id_list) + + #for resa in unfiltered_reservation_list: + + + ##Put the slice_urn + #if resa['user'] in resa_user_dict: + #resa['slice_hrn'] = resa_user_dict[resa['user']]['slice_info']['hrn'] + #resa['slice_id'] = hrn_to_urn(resa['slice_hrn'], 'slice') + ##Put the slice_urn + ##resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice') + #resa['component_id_list'] = [] + ##Transform the hostnames into urns (component ids) + #for node in resa['reserved_nodes']: + ##resa['component_id_list'].append(hostname_to_urn(self.hrn, \ + ##self.root_auth, node['hostname'])) + #slab_xrn = slab_xrn_object(self.root_auth, node) + #resa['component_id_list'].append(slab_xrn.urn) + + ##Filter the reservation list if necessary + ##Returns all the leases associated with a given slice + #if lease_filter_dict: + #logger.debug("SLABDRIVER \tGetLeases lease_filter_dict %s"\ + #%(lease_filter_dict)) + #for resa in unfiltered_reservation_list: + #if lease_filter_dict['name'] == resa['slice_hrn']: + #reservation_list.append(resa) + #else: + #reservation_list = unfiltered_reservation_list logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"\ %(reservation_list)) @@ -1185,52 +1479,91 @@ class SlabDriver(Driver): #about the user of this slice. This kind of #information is in the Senslab's DB. if str(record['type']) == 'slice': + if 'reg_researchers' in record and isinstance(record['reg_researchers'],list) : + record['reg_researchers'] = record['reg_researchers'][0].__dict__ + record.update({'PI':[record['reg_researchers']['hrn']], + 'researcher': [record['reg_researchers']['hrn']], + 'name':record['hrn'], + 'oar_job_id':[], + 'node_ids': [], + 'person_ids':[record['reg_researchers']['record_id']], + 'geni_urn':'', #For client_helper.py compatibility + 'keys':'', #For client_helper.py compatibility + 'key_ids':''}) #For client_helper.py compatibility + + #Get slab slice record. - recslice = self.GetSlices(slice_filter = \ + recslice_list = self.GetSlices(slice_filter = \ str(record['hrn']),\ slice_filter_type = 'slice_hrn') - recuser = dbsession.query(RegRecord).filter_by(record_id = \ - recslice['record_id_user']).first() - logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \ - rec %s \r\n \r\n" %(recslice)) - record.update({'PI':[recuser.hrn], - 'researcher': [recuser.hrn], - 'name':record['hrn'], - 'oar_job_id':recslice['oar_job_id'], - 'node_ids': [], - 'person_ids':[recslice['record_id_user']], - 'geni_urn':'', #For client_helper.py compatibility - 'keys':'', #For client_helper.py compatibility - 'key_ids':''}) #For client_helper.py compatibility - elif str(record['type']) == 'user': + #recuser = recslice_list[0]['reg_researchers'] + ##recuser = dbsession.query(RegRecord).filter_by(record_id = \ + ##recslice_list[0]['record_id_user']).first() + + #record.update({'PI':[recuser['hrn']], + #'researcher': [recuser['hrn']], + #'name':record['hrn'], + #'oar_job_id':[], + #'node_ids': [], + #'person_ids':[recslice_list[0]['reg_researchers']['record_id']], + #'geni_urn':'', #For client_helper.py compatibility + #'keys':'', #For client_helper.py compatibility + #'key_ids':''}) #For client_helper.py compatibility + logger.debug("SLABDRIVER \tfill_record_info TYPE SLICE RECUSER record['hrn'] %s ecord['oar_job_id'] %s " %(record['hrn'],record['oar_job_id'])) + try: + for rec in recslice_list: + logger.debug("SLABDRIVER\r\n \t \t fill_record_info oar_job_id %s " %(rec['oar_job_id'])) + #record['oar_job_id'].append(rec['oar_job_id']) + #del record['_sa_instance_state'] + del record['reg_researchers'] + record['node_ids'] = [ self.root_auth + hostname for hostname in rec['node_ids']] + except KeyError: + pass + + logger.debug( "SLABDRIVER.PY \t fill_record_info SLICE \ + recslice_list %s \r\n \t RECORD %s \r\n \r\n" %(recslice_list,record)) + if str(record['type']) == 'user': #The record is a SFA user record. #Get the information about his slice from Senslab's DB #and add it to the user record. - recslice = self.GetSlices(\ + recslice_list = self.GetSlices(\ slice_filter = record['record_id'],\ slice_filter_type = 'record_id_user') - logger.debug( "SLABDRIVER.PY \t fill_record_info user \ - rec %s \r\n \r\n" %(recslice)) + logger.debug( "SLABDRIVER.PY \t fill_record_info TYPE USER \ + recslice_list %s \r\n \t RECORD %s \r\n" %(recslice_list , record)) #Append slice record in records list, #therefore fetches user and slice info again(one more loop) #Will update PIs and researcher for the slice - recuser = dbsession.query(RegRecord).filter_by(record_id = \ - recslice['record_id_user']).first() - recslice.update({'PI':[recuser.hrn], - 'researcher': [recuser.hrn], - 'name':record['hrn'], - 'oar_job_id':recslice['oar_job_id'], - 'node_ids': [], - 'person_ids':[recslice['record_id_user']]}) + #recuser = dbsession.query(RegRecord).filter_by(record_id = \ + #recslice_list[0]['record_id_user']).first() + recuser = recslice_list[0]['reg_researchers'] + logger.debug( "SLABDRIVER.PY \t fill_record_info USER \ + recuser %s \r\n \r\n" %(recuser)) + recslice = {} + recslice = recslice_list[0] + recslice.update({'PI':[recuser['hrn']], + 'researcher': [recuser['hrn']], + 'name':record['hrn'], + 'node_ids': [], + 'oar_job_id': [], + 'person_ids':[recuser['record_id']]}) + try: + for rec in recslice_list: + recslice['oar_job_id'].append(rec['oar_job_id']) + except KeyError: + pass + + recslice.update({'type':'slice', \ + 'hrn':recslice_list[0]['hrn']}) + #GetPersons takes [] as filters #user_slab = self.GetPersons([{'hrn':recuser.hrn}]) user_slab = self.GetPersons([record]) - recslice.update({'type':'slice', \ - 'hrn':recslice['slice_hrn']}) + record.update(user_slab[0]) #For client_helper.py compatibility record.update( { 'geni_urn':'', @@ -1240,17 +1573,20 @@ class SlabDriver(Driver): logger.debug("SLABDRIVER.PY \tfill_record_info ADDING SLICE\ INFO TO USER records %s" %(record_list)) - + + logger.debug("SLABDRIVER.PY \tfill_record_info END \ + record %s \r\n \r\n " %(record)) except TypeError, error: logger.log_exc("SLABDRIVER \t fill_record_info EXCEPTION %s"\ %(error)) - + #logger.debug("SLABDRIVER.PY \t fill_record_info ENDENDEND ") + return #self.fill_record_slab_info(records) - - + + @@ -1370,7 +1706,7 @@ class SlabDriver(Driver): return #TODO UpdatePerson 04/07/2012 SA - def UpdatePerson(self, auth, person_id_or_email, person_fields=None): + def UpdatePerson(self, slab_hrn, federated_hrn, person_fields=None): """Updates a person. Only the fields specified in person_fields are updated, all other fields are left untouched. Users and techs can only update themselves. PIs can only update @@ -1379,7 +1715,11 @@ class SlabDriver(Driver): FROM PLC API DOC """ - logger.warning("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ") + #new_row = FederatedToSenslab(slab_hrn, federated_hrn) + #slab_dbsession.add(new_row) + #slab_dbsession.commit() + + logger.debug("SLABDRIVER UpdatePerson EMPTY - DO NOTHING \r\n ") return #TODO GetKeys 04/07/2012 SA @@ -1439,13 +1779,29 @@ class SlabDriver(Driver): """ self.DeleteSliceFromNodes(slice_record) - self.db.update_job(slice_record['hrn'], job_id = -1, nodes = []) logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record)) return + def __add_person_to_db(self, user_dict): + + check_if_exists = dbsession.query(RegUser).filter_by(email = user_dict['email']).first() + #user doesn't exists + if not check_if_exists: + logger.debug("__add_person_to_db \t Adding %s \r\n \r\n \ + _________________________________________________________________________\ + " %(user_dict['hrn'])) + user_record = RegUser(hrn =user_dict['hrn'] , pointer= '-1', authority=get_authority(hrn), \ + email= user_dict['email'], gid = None) + user_record.reg_keys = [RegKey(user_dict['pkey'])] + user_record.just_created() + dbsession.add (user_record) + dbsession.commit() + return + #TODO AddPerson 04/07/2012 SA - def AddPerson(self, auth, person_fields=None): - """Adds a new account. Any fields specified in person_fields are used, + #def AddPerson(self, auth, person_fields=None): + def AddPerson(self, record):#TODO fixing 28/08//2012 SA + """Adds a new account. Any fields specified in records are used, otherwise defaults are used. Accounts are disabled by default. To enable an account, use UpdatePerson(). @@ -1453,8 +1809,10 @@ class SlabDriver(Driver): FROM PLC API DOC """ - logger.warning("SLABDRIVER AddPerson EMPTY - DO NOTHING \r\n ") - return + ret = self.ldap.LdapAddUser(record) + logger.debug("SLABDRIVER AddPerson return code %s \r\n "%(ret)) + self.__add_person_to_db(record) + return ret['uid'] #TODO AddPersonToSite 04/07/2012 SA def AddPersonToSite (self, auth, person_id_or_email, \ @@ -1491,3 +1849,12 @@ class SlabDriver(Driver): """ logger.warning("SLABDRIVER AddPersonKey EMPTY - DO NOTHING \r\n ") return + + def DeleteLeases(self, leases_id_list, slice_hrn ): + logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \ + \r\n " %(leases_id_list, slice_hrn)) + for job_id in leases_id_list: + self.DeleteJobs(job_id, slice_hrn) + + + return