From: Sandrine Avakian Date: Mon, 2 Jul 2012 12:09:39 +0000 (+0200) Subject: Added functionnal lease support. X-Git-Tag: sfa-2.1-24~3^2~144 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=5cb768041b504556c5f4bc6bba9ae6a44bf852b2;p=sfa.git Added functionnal lease support. --- diff --git a/sfa/senslab/OARrestapi.py b/sfa/senslab/OARrestapi.py index 1b099bc5..5d3fd7af 100644 --- a/sfa/senslab/OARrestapi.py +++ b/sfa/senslab/OARrestapi.py @@ -1,5 +1,5 @@ #import sys -import httplib +from httplib import HTTPConnection, HTTPException import json #import datetime #from time import gmtime, strftime @@ -63,13 +63,14 @@ class OARrestapi: #seems that it does not work if we don't add this headers['content-length'] = '0' - conn = httplib.HTTPConnection(self.oarserver['ip'], \ + conn = HTTPConnection(self.oarserver['ip'], \ self.oarserver['port']) conn.request("GET", self.oarserver['uri'], data, headers) resp = ( conn.getresponse()).read() conn.close() - except NotConnected: - logger.log_exc("GET_OAR_SRVR : Could not reach OARserver") + except HTTPException, error : + logger.log_exc("GET_OAR_SRVR : Problem with OAR server : %s " \ + %(error)) #raise ServerError("GET_OAR_SRVR : Could not reach OARserver") try: js_dict = json.loads(resp) @@ -110,7 +111,7 @@ class OARrestapi: 'content-length':str(len(data))} try : - conn = httplib.HTTPConnection(self.oarserver['ip'], \ + conn = HTTPConnection(self.oarserver['ip'], \ self.oarserver['port']) conn.request("POST", self.oarserver['uri'], data, headers) resp = (conn.getresponse()).read() @@ -214,48 +215,8 @@ class OARGETParser: self.SendRequest("GET_version") - #def AddNodeNetworkAddr(self,tuplelist,value): - #tuplelist.append(('hostname',str(value))) - - #def AddNodeNetworkAddr(self,dictnode,value): - ##Inserts new key. The value associated is a tuple list - #node_id = value - - #dictnode[node_id] = [('node_id',node_id),('hostname',node_id) ] - - #return node_id - - #def AddNodeSite(self,tuplelist,value): - #tuplelist.append(('site',str(value))) - - - #def AddNodeRadio(self,tuplelist,value): - #tuplelist.append(('radio',str(value))) - - - #def AddMobility(self,tuplelist,value): - #if value : - #tuplelist.append(('mobile',int(value))) - - - #def AddPosX(self,tuplelist,value): - #tuplelist.append(('posx',value)) - - - #def AddPosY(self,tuplelist,value): - #tuplelist.append(('posy',value)) - - #def AddBootState(self,tuplelist,value): - #tuplelist.append(('boot_state',str(value))) - - ##Insert a new node into the dictnode dictionary - #def AddNodeId(self,dictnode,value): - ##Inserts new key. The value associated is a tuple list - #node_id = int(value) - - #dictnode[node_id] = [('node_id',node_id) ] - #return node_id + def ParseVersion(self) : #print self.raw_json @@ -370,6 +331,8 @@ class OARGETParser: job['user'] = json_element['owner'] logger.debug("ParseReservedNodes________job %s" %(job)) reservation_list.append(job) + #reset dict + job = {} return reservation_list def ParseRunningJobs(self): diff --git a/sfa/senslab/slabaggregate.py b/sfa/senslab/slabaggregate.py index 5effcbef..5fc796c2 100644 --- a/sfa/senslab/slabaggregate.py +++ b/sfa/senslab/slabaggregate.py @@ -1,15 +1,11 @@ -# import modules used here -- sys is a very standard one -import sys -import httplib -import json +#import httplib +#import json +import time - -#from sfa.senslab.OARrestapi import * - -from sfa.util.config import Config +#from sfa.util.config import Config from sfa.util.xrn import hrn_to_urn, urn_to_hrn, urn_to_sliver_id -from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename +from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, slicename_to_hrn from sfa.rspecs.rspec import RSpec from sfa.rspecs.elements.location import Location @@ -18,13 +14,17 @@ from sfa.rspecs.elements.node import Node #from sfa.rspecs.elements.login import Login #from sfa.rspecs.elements.services import Services from sfa.rspecs.elements.sliver import Sliver - +from sfa.rspecs.elements.lease import Lease from sfa.rspecs.version_manager import VersionManager -from sfa.util.sfatime import datetime_to_epoch +#from sfa.util.sfatime import datetime_to_epoch + -def hostname_to_hrn(root_auth,login_base,hostname): - return PlXrn(auth=root_auth,hostname=login_base + '_' +hostname).get_hrn() +from sfa.util.sfalogging import logger + + +def hostname_to_hrn(root_auth, login_base, hostname): + return PlXrn(auth=root_auth, hostname=login_base + '_' +hostname).get_hrn() class SlabAggregate: @@ -39,7 +39,7 @@ class SlabAggregate: user_options = {} - def __init__(self ,driver): + def __init__(self, driver): self.driver = driver def get_slice_and_slivers(self, slice_xrn): @@ -47,36 +47,40 @@ class SlabAggregate: Returns a dict of slivers keyed on the sliver's node_id """ slivers = {} - slice = None + sfa_slice = None if not slice_xrn: - return (slice, slivers) + return (sfa_slice, slivers) slice_urn = hrn_to_urn(slice_xrn, 'slice') slice_hrn, _ = urn_to_hrn(slice_xrn) slice_name = slice_hrn - print >>sys.stderr,"\r\n \r\n \t\t_____________ Slabaggregate api get_slice_and_slivers " - slices = self.driver.GetSlices(slice_filter= str(slice_name), slice_filter_type = 'slice_hrn') - print >>sys.stderr,"\r\n \r\n \t\t_____________ Slabaggregate api get_slice_and_slivers slices %s " %(slices) + + slices = self.driver.GetSlices(slice_filter= str(slice_name), \ + slice_filter_type = 'slice_hrn') + logger.debug("Slabaggregate api \tget_slice_and_slivers slices %s " \ + %(slices)) if not slices: - return (slice, slivers) - if isinstance(slice, list): - slice = slices[0] + return (sfa_slice, slivers) + if isinstance(sfa_slice, list): + sfa_slice = slices[0] else: - slice = slices + sfa_slice = slices # sort slivers by node id , if there is a job #and therfore, node allocated to this slice - if slice['oar_job_id'] is not -1: + if sfa_slice['oar_job_id'] is not -1: try: - for node_id in slice['node_ids']: + for node_id in sfa_slice['node_ids']: #node_id = self.driver.root_auth + '.' + node_id - sliver = Sliver({'sliver_id': urn_to_sliver_id(slice_urn, slice['record_id_slice'], node_id), - 'name': slice['slice_hrn'], + sliver = Sliver({'sliver_id': urn_to_sliver_id(slice_urn, \ + sfa_slice['record_id_slice'], node_id), + 'name': sfa_slice['slice_hrn'], 'type': 'slab-node', 'tags': []}) slivers[node_id] = sliver except KeyError: - print>>sys.stderr, " \r\n \t\t get_slice_and_slivers KeyError " + logger.log_exc("SLABAGGREGATE \t \ + get_slice_and_slivers KeyError ") ## sort sliver attributes by node id ##tags = self.driver.GetSliceTags({'slice_tag_id': slice['slice_tag_ids']}) ##for tag in tags: @@ -87,21 +91,24 @@ class SlabAggregate: ##'tags': []}) ##slivers[tag['node_id']] = sliver ##slivers[tag['node_id']]['tags'].append(tag) - print >>sys.stderr,"\r\n \r\n \t\t_____________ Slabaggregate api get_slice_and_slivers slivers %s " %(slivers) - return (slice, slivers) + logger.debug("SLABAGGREGATE api get_slice_and_slivers slivers %s "\ + %(slivers)) + return (sfa_slice, slivers) - def get_nodes(self, slice=None,slivers=[], options={}): + def get_nodes(self, slices=None, slivers=[], options={}): # NT: the semantic of this function is not clear to me : # if slice is not defined, then all the nodes should be returned - # if slice is defined, we should return only the nodes that are part of this slice + # if slice is defined, we should return only the nodes that + # are part of this slice # but what is the role of the slivers parameter ? # So i assume that slice['node_ids'] will be the same as slivers for us - filter = {} + #filter_dict = {} tags_filter = {} - # Commenting this part since all nodes should be returned, even if a slice is provided + # Commenting this part since all nodes should be returned, + # even if a slice is provided #if slice : # if 'node_ids' in slice and slice['node_ids']: # #first case, a non empty slice was provided @@ -151,11 +158,16 @@ class SlabAggregate: # xxx how to retrieve site['login_base'] #site_id=node['site_id'] #site=sites_dict[site_id] - rspec_node['component_id'] = hostname_to_urn(self.driver.root_auth, node['site'], node['hostname']) + rspec_node['component_id'] = \ + hostname_to_urn(self.driver.root_auth, \ + node['site'], node['hostname']) rspec_node['component_name'] = node['hostname'] - rspec_node['component_manager_id'] = hrn_to_urn(self.driver.root_auth, 'authority+sa') + rspec_node['component_manager_id'] = \ + hrn_to_urn(self.driver.root_auth, 'authority+sa') #rspec_node['component_manager_id'] = Xrn(self.driver.root_auth, 'authority+sa').get_urn() - rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.root_auth, node['site']), 'authority+sa') + rspec_node['authority_id'] = \ + hrn_to_urn(PlXrn.site_hrn(self.driver.root_auth, \ + node['site']), 'authority+sa') # do not include boot state ( element) in the manifest rspec #if not slice: @@ -177,10 +189,11 @@ class SlabAggregate: try: if node['posx'] and node['posy']: - location = Location({'longitude':node['posx'], 'latitude': node['posy']}) + location = Location({'longitude':node['posx'], \ + 'latitude': node['posy']}) rspec_node['location'] = location except KeyError: - pass + pass #rspec_node['interfaces'] = [] #if_count=0 #for if_id in node['interface_ids']: @@ -211,24 +224,70 @@ class SlabAggregate: return (rspec_nodes) + def get_leases(self, slice_record = None, options = {}): + + now = int(time.time()) + lease_filter = {'clip': now } + #if slice_record: + #lease_filter.update({'name': slice_record['name']}) + return_fields = ['lease_id', 'hostname', 'site_id', \ + 'name', 't_from', 't_until'] + #leases = self.driver.GetLeases(lease_filter) + leases = self.driver.GetLeases() + site_ids = [] + rspec_leases = [] + for lease in leases: + #as many leases as there are nodes in the job + for node in lease['reserved_nodes']: + rspec_lease = Lease() + rspec_lease['lease_id'] = lease['lease_id'] + site = node['site_id'] + rspec_lease['component_id'] = hostname_to_urn(self.driver.hrn, \ + site, node['hostname']) + rspec_lease['slice_id'] = lease['slice_id'] + rspec_lease['t_from'] = lease['t_from'] + rspec_lease['t_until'] = lease['t_until'] + rspec_leases.append(rspec_lease) + return rspec_leases + + #rspec_leases = [] + #for lease in leases: + + #rspec_lease = Lease() + + ## xxx how to retrieve site['login_base'] + + #rspec_lease['lease_id'] = lease['lease_id'] + #rspec_lease['component_id'] = hostname_to_urn(self.driver.hrn, \ + #site['login_base'], lease['hostname']) + #slice_hrn = slicename_to_hrn(self.driver.hrn, lease['name']) + #slice_urn = hrn_to_urn(slice_hrn, 'slice') + #rspec_lease['slice_id'] = slice_urn + #rspec_lease['t_from'] = lease['t_from'] + #rspec_lease['t_until'] = lease['t_until'] + #rspec_leases.append(rspec_lease) + #return rspec_leases #from plc/aggregate.py def get_rspec(self, slice_xrn=None, version = None, options={}): rspec = None - version_manager = VersionManager() - - version = version_manager.get_version(version) - print>>sys.stderr, " \r\n SlabAggregate \t\t get_rspec ************** version %s version.type %s version.version %s options %s \r\n" %(version,version.type,version.version,options) + version_manager = VersionManager() + version = version_manager.get_version(version) + logger.debug("SlabAggregate \t get_rspec ***version %s \ + version.type %s version.version %s options %s \r\n" \ + %(version,version.type,version.version,options)) - if not slice_xrn: - rspec_version = version_manager._get_version(version.type, version.version, 'ad') + if not slice_xrn: + rspec_version = version_manager._get_version(version.type, \ + version.version, 'ad') else: - rspec_version = version_manager._get_version(version.type, version.version, 'manifest') + rspec_version = version_manager._get_version(version.type, \ + version.version, 'manifest') - slice, slivers = self.get_slice_and_slivers(slice_xrn) - #at this point sliver my be {} if no senslab job is running for this user/slice. + slices, slivers = self.get_slice_and_slivers(slice_xrn) + #at this point sliver may be empty if no senslab job is running for this user/slice. rspec = RSpec(version=rspec_version, user_options=options) @@ -236,18 +295,22 @@ class SlabAggregate: #rspec.xml.set('expires', datetime_to_epoch(slice['expires'])) # add sliver defaults #nodes, links = self.get_nodes(slice, slivers) - nodes = self.get_nodes(slice,slivers) - print>>sys.stderr, " \r\n SlabAggregate \t\t get_rspec ************** options %s rspec_version %s version_manager %s rspec.version %s \r\n" %(options, rspec_version,version_manager, rspec.version) - rspec.version.add_nodes(nodes) - - - default_sliver = slivers.get(None, []) - if default_sliver: - default_sliver_attribs = default_sliver.get('tags', []) - print>>sys.stderr, " \r\n SlabAggregate \t\t get_rspec ************** default_sliver_attribs %s \r\n" %(default_sliver_attribs) - for attrib in default_sliver_attribs: - print>>sys.stderr, " \r\n SlabAggregate \t\t get_rspec ************** attrib %s \r\n" %(attrib) - logger.info(attrib) - rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value']) + if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'leases': + nodes = self.get_nodes(slices, slivers) + rspec.version.add_nodes(nodes) + default_sliver = slivers.get(None, []) + if default_sliver: + default_sliver_attribs = default_sliver.get('tags', []) + logger.debug("SlabAggregate \tget_rspec **** \ + default_sliver_attribs %s \r\n" %(default_sliver_attribs)) + for attrib in default_sliver_attribs: + logger.debug("SlabAggregate \tget_rspec ******* attrib %s \r\n"\ + %(attrib)) + + rspec.version.add_default_sliver_attribute(attrib['tagname'], \ + attrib['value']) + if options.get('list_leases') : + leases = self.get_leases(slices) + rspec.version.add_leases(leases) return rspec.toxml() diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index dd6a8406..4de71726 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -229,13 +229,13 @@ class SlabDriver(Driver): #return rspec #panos: passing user-defined options - + logger.debug("SLABDRIVER \tlist_resources rspec " ) aggregate = SlabAggregate(self) origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() options.update({'origin_hrn':origin_hrn}) rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, options=options) - print>>sys.stderr, " \r\n \r\n \t SLABDRIVER list_resources rspec " + # cache the result #if self.cache and not slice_hrn: #logger.debug("Slab.ListResources: stores advertisement in cache") @@ -518,9 +518,10 @@ class SlabDriver(Driver): self.__get_hostnames_from_oar_node_ids(node_id_list) #parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k) - #Replaces the previous entry "assigned_network_address" / "reserved_resources" + #Replaces the previous entry "assigned_network_address" / + #"reserved_resources" #with "node_ids" - job_info = {'node_ids':hostname_list} + job_info = {'node_ids': hostname_list} return job_info @@ -539,7 +540,7 @@ class SlabDriver(Driver): for index in range(len(job_info[node_list_name])): #job_info[node_list_name][k] = reserved_node_hostname_list[index] = \ - node_dict[job_info[node_list_name][index]]['hostname'] + node_dict[job_info[node_list_name][index]]['hostname'] logger.debug("SLABDRIVER \t get_info_on_reserved_nodes \ reserved_node_hostname_list %s" \ @@ -561,21 +562,27 @@ class SlabDriver(Driver): oar_id_node_dict[node['oar_id']] = node hostname_list = [] + hostname_dict_list = [] for resource_id in resource_id_list: - hostname_list.append(oar_id_node_dict[resource_id]['hostname']) - return hostname_list + hostname_dict_list.append({'hostname' : \ + oar_id_node_dict[resource_id]['hostname'], + 'site_id' : oar_id_node_dict[resource_id]['site']}) + + #hostname_list.append(oar_id_node_dict[resource_id]['hostname']) + return hostname_dict_list def GetReservedNodes(self): #Get the nodes in use and the reserved nodes reservation_dict_list = self.oar.parser.SendRequest("GET_reserved_nodes") - oar_node_id_dict = self.__get_oar_node_ids() for resa in reservation_dict_list: logger.debug ("GetReservedNodes resa %s"%(resa)) - resa['reserved_nodes_hostnames'] = \ + #dict list of hostnames and their site + resa['reserved_nodes'] = \ self.__get_hostnames_from_oar_node_ids(resa['resource_ids']) - del resa['resource_ids'] + + #del resa['resource_ids'] return reservation_dict_list def GetNodes(self,node_filter_dict = None, return_fields_list = None): @@ -614,39 +621,41 @@ class SlabDriver(Driver): return return_node_list - def GetSites(self, site_filter_name = None, return_fields_list = None): + def GetSites(self, site_filter_name_list = None, return_fields_list = None): site_dict = self.oar.parser.SendRequest("GET_sites") #site_dict : dict where the key is the sit ename return_site_list = [] - if not ( site_filter_name or return_fields_list): + if not ( site_filter_name_list or return_fields_list): return_site_list = site_dict.values() return return_site_list - if site_filter_name in site_dict: - if return_fields_list: - for field in return_fields_list: - tmp = {} - Create - try: - tmp[field] = site_dict[site_filter_name][field] - except KeyError: - logger.error("GetSites KeyError %s "%(field)) - return None - return_site_list.append(tmp) - else: - return_site_list.append( site_dict[site_filter_name]) + for site_filter_name in site_filter_name_list: + if site_filter_name in site_dict: + if return_fields_list: + for field in return_fields_list: + tmp = {} + Create + try: + tmp[field] = site_dict[site_filter_name][field] + except KeyError: + logger.error("GetSites KeyError %s "%(field)) + return None + return_site_list.append(tmp) + else: + return_site_list.append( site_dict[site_filter_name]) return return_site_list def GetSlices(self, slice_filter = None, slice_filter_type = None, \ - return_fields_list=None): + return_fields_list = None): return_slice_list = [] slicerec = {} rec = {} authorized_filter_types_list = ['slice_hrn', 'record_id_user'] - print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices authorized_filter_types_list %s" %(authorized_filter_types_list) + logger.debug("SLABDRIVER \tGetSlices authorized_filter_types_list %s"\ + %(authorized_filter_types_list)) if slice_filter_type in authorized_filter_types_list: if slice_filter_type == 'slice_hrn': slicerec = slab_dbsession.query(SliceSenslab).\ @@ -661,17 +670,19 @@ class SlabDriver(Driver): print>>sys.stderr,"\r\n SLABDRIVER \tGetSlices rec %s" %(rec) #Get login login = slicerec.slice_hrn.split(".")[1].split("_")[0] - logger.debug("\r\n SLABDRIVER \tGetSlices login %s slice record %s"\ - %(login,rec)) + logger.debug("\r\n SLABDRIVER \tGetSlices login %s \ + slice record %s" %(login,rec)) if slicerec.oar_job_id is not -1: #Check with OAR the status of the job if a job id is in #the slice record - #rslt = self.GetJobsResources(slicerec.oar_job_id,username = login) - rslt = self.GetJobsId(slicerec.oar_job_id,username = login) + rslt = self.GetJobsResources(slicerec.oar_job_id, \ + username = login) + if rslt : rec.update(rslt) rec.update({'hrn':str(rec['slice_hrn'])}) - #If GetJobsResources is empty, this means the job is now in the 'Terminated' state + #If GetJobsResources is empty, this means the job is + #now in the 'Terminated' state #Update the slice record else : self.db.update_job(slice_filter, job_id = -1) @@ -996,18 +1007,24 @@ class SlabDriver(Driver): def GetLeases(self, lease_filter=None, return_fields_list=None): reservation_list = self.GetReservedNodes() #Find the slice associated with this user senslab ldap uid + logger.debug(" SLABDRIVER.PY \tGetLeases ") for resa in reservation_list: ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')') - user = dbsession.query(RegUser).filter_by(email = ldap_info['mail']).first() + ldap_info = ldap_info[0][1] + + user = dbsession.query(RegUser).filter_by(email = ldap_info['mail'][0]).first() + slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id).first() #Put the slice_urn resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice') resa['component_id_list'] = [] #Transform the hostnames into urns (component ids) - for hostname in resa['reserved_nodes_hostnames']: - resa['component_id_list'].append(hostname_to_urn(self.hrn, self.root_auth, hostname)) + for node in resa['reserved_nodes']: + resa['component_id_list'].append(hostname_to_urn(self.hrn, \ + self.root_auth, node['hostname'])) - return resa + logger.debug(" SLABDRIVER.PY \tGetLeases reservation_list %s"%(reservation_list)) + return reservation_list def augment_records_with_testbed_info (self, sfa_records): return self.fill_record_info (sfa_records)