From: Sandrine Avakian
Date: Fri, 29 Jun 2012 12:41:12 +0000 (+0200)
Subject: First input in handling leases.
X-Git-Tag: sfa-2.1-24~3^2~145
X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=072119885b0ceb41cc639731e0062fc1e3edfe3a;p=sfa.git

First input in handling leases.
Still dealing with bugs from the OAR migration.
---

diff --git a/sfa/senslab/OARrestapi.py b/sfa/senslab/OARrestapi.py
index 19b60c2f..1b099bc5 100644
--- a/sfa/senslab/OARrestapi.py
+++ b/sfa/senslab/OARrestapi.py
@@ -133,9 +133,13 @@ class OARrestapi:
             #raise ServerError("Failed to parse Server Response:" + answer)
 
 
-#def AddNodeNetworkAddr(self,tuplelist,value):
-    #tuplelist.append(('hostname',str(value)))
-    
+
+def AddOarNodeId(tuplelist, value):
+    """ Adds the OAR internal node id to the node's attributes """
+
+    tuplelist.append(('oar_id', int(value)))
+
+
 def AddNodeNetworkAddr(dictnode, value):
     #Inserts new key. The value associated is a tuple list
     node_id = value
@@ -186,7 +190,7 @@ class OARGETParser:
             'posx': AddPosX,
             'posy': AddPosY,
             'state':AddBootState,
-            #'id' : AddNodeId,
+            'id' : AddOarNodeId,
             }
 
@@ -324,11 +328,18 @@ class OARGETParser:
     def ParseJobsIdResources(self):
         """ BROKEN since oar 2.5
-        Parses the json produced by the request /oarapi/jobs/id.json.
+        Parses the json produced by the request
+        /oarapi/jobs/id/resources.json.
+        Returns a list of oar node ids that are scheduled for the
+        given job id.
 
         """
+        job_resources = []
+        for resource in self.raw_json['items']:
+            job_resources.append(resource['id'])
+
         logger.debug("OARESTAPI \tParseJobsIdResources %s" %(self.raw_json))
-        return self.raw_json
+        return job_resources
 
     def ParseResources(self) :
         """ Parses the json produced by a get_resources request on oar."""
@@ -342,14 +353,24 @@ class OARGETParser:
         """ Returns an array containing the list of the reserved nodes """
 
         #resources are listed inside the 'items' list from the json
-        nodes = []
+        reservation_list = []
         print "ParseReservedNodes_%s" %(self.raw_json['items'])
-        for job in self.raw_json['items']:
-            for node in job['nodes']:
-                print "ParseReservedNodes________node %s" %(node)
-                logger.debug("ParseReservedNodes________node %s" %(node))
-                nodes.append(node['network_address'])
-        return nodes
+        #Parse resources info
+        for json_element in self.raw_json['items']:
+            job = {}
+            job['t_from'] = json_element['scheduled_start']
+            #Get resources id list for the job
+            job['resource_ids'] = \
+                [ node_dict['id'] for node_dict in json_element['resources'] ]
+
+            job['state'] = json_element['state']
+            job['lease_id'] = json_element['id']
+            job['t_until'] = json_element['scheduled_start'] + \
+                json_element['walltime']
+            job['user'] = json_element['owner']
+            logger.debug("ParseReservedNodes________job %s" %(job))
+            reservation_list.append(job)
+        return reservation_list
 
     def ParseRunningJobs(self):
         """ Gets the list of nodes currently in use from the attributes of the
@@ -418,7 +439,6 @@ class OARGETParser:
             # dictionary is empty and/or a new node has to be inserted
             node_id = self.resources_fulljson_dict['network_address'](\
                 self.node_dictlist, dictline['network_address'])
-            #node_id = self.resources_fulljson_dict['network_address'](self,self.node_dictlist, dictline['network_address'])
             for k in keys:
                 if k in dictline:
                     if k == 'network_address':
@@ -426,8 +446,7 @@ class OARGETParser:
                         self.resources_fulljson_dict[k](\
                             self.node_dictlist[node_id], dictline[k])
 
-                        #self.resources_fulljson_dict[k](self,self.node_dictlist[node_id], dictline[k])
-
+
             #The last property has been inserted in the property tuple list,
             #reset node_id
             #Turn the property tuple list (=dict value) into a dictionary
@@ -465,9 +484,9 @@ class OARGETParser:
                 node = self.node_dictlist[node_id]
                 node.update({'hrn':self.hostname_to_hrn(self.interface_hrn, \
                     node['site'],node['hostname'])})
-                #node['hrn'] = self.hostname_to_hrn(self.interface_hrn, node['site_login_base'],node['hostname'])
+
                 self.node_dictlist.update({node_id:node})
-                #if node_id is 1:
+
                 if node['site'] not in self.site_dict:
                     self.site_dict[node['site']] = {
                         'site':node['site'],
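
Note on the ParseReservedNodes() rewrite above: each OAR job entry is now
turned into a lease-style dictionary. A minimal standalone sketch of that
transformation, assuming the OAR 2.5 JSON field names shown in the hunk
(the sample values below are hypothetical):

    def parse_reserved_nodes(raw_json):
        """Map OAR job JSON entries to lease dictionaries."""
        reservation_list = []
        for json_element in raw_json['items']:
            # one fresh dict per job, so list entries do not alias each other
            job = {'t_from': json_element['scheduled_start'],
                   't_until': json_element['scheduled_start'] +
                              json_element['walltime'],
                   'resource_ids': [r['id'] for r in json_element['resources']],
                   'state': json_element['state'],
                   'lease_id': json_element['id'],
                   'user': json_element['owner']}
            reservation_list.append(job)
        return reservation_list

    # Hypothetical sample:
    # parse_reserved_nodes({'items': [{'scheduled_start': 1340968800,
    #                                  'walltime': 600,
    #                                  'resources': [{'id': 1}, {'id': 5}],
    #                                  'state': 'Waiting', 'id': 42,
    #                                  'owner': 'savakian'}]})
    # -> [{'t_from': 1340968800, 't_until': 1340969400,
    #      'resource_ids': [1, 5], 'state': 'Waiting',
    #      'lease_id': 42, 'user': 'savakian'}]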
diff --git a/sfa/senslab/slabaggregate.py b/sfa/senslab/slabaggregate.py
index 379e0c1f..5effcbef 100644
--- a/sfa/senslab/slabaggregate.py
+++ b/sfa/senslab/slabaggregate.py
@@ -35,7 +35,7 @@ class SlabAggregate:
 
     links = {}
     node_tags = {}
-    prepared=False
+    prepared = False
 
     user_options = {}
 
@@ -61,7 +61,7 @@ class SlabAggregate:
         if isinstance(slice, list):
             slice = slices[0]
         else:
-            slice =slices
+            slice = slices
 
         # sort slivers by node id , if there is a job
         #and therfore, node allocated to this slice
@@ -74,7 +74,7 @@ class SlabAggregate:
                                 'name': slice['slice_hrn'],
                                 'type': 'slab-node',
                                 'tags': []})
-                    slivers[node_id]= sliver
+                    slivers[node_id] = sliver
             except KeyError:
                 print>>sys.stderr, " \r\n \t\t get_slice_and_slivers KeyError "
         ## sort sliver attributes by node id
@@ -140,7 +140,7 @@ class SlabAggregate:
 
         #node_tags = self.get_node_tags(tags_filter)
 
-        reserved_nodes=self.driver.GetReservedNodes()
+        reserved_nodes = self.driver.GetNodesCurrentlyInUse()
         rspec_nodes = []
         for node in nodes:
             # skip whitelisted nodes
@@ -211,6 +211,7 @@ class SlabAggregate:
 
         return (rspec_nodes)
 
+
     #from plc/aggregate.py
     def get_rspec(self, slice_xrn=None, version = None, options={}):
 
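
For context, get_slice_and_slivers() above keeps one sliver record per
reserved node, keyed by node id. A rough sketch of that bookkeeping; the
sliver fields are taken from the hunk, the input shape is an assumption:

    def slivers_by_node(slice_record):
        """Build the node_id -> sliver map used when annotating RSpec nodes."""
        slivers = {}
        for node_id in slice_record.get('node_ids', []):
            slivers[node_id] = {'name': slice_record['slice_hrn'],
                                'type': 'slab-node',
                                'tags': []}
        return slivers

    # slivers_by_node({'slice_hrn': 'senslab2.myslice', 'node_ids': [3, 7]})
    # -> {3: {'name': 'senslab2.myslice', 'type': 'slab-node', 'tags': []},
    #     7: {'name': 'senslab2.myslice', 'type': 'slab-node', 'tags': []}}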
diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py
index e3201136..dd6a8406 100644
--- a/sfa/senslab/slabdriver.py
+++ b/sfa/senslab/slabdriver.py
@@ -11,7 +11,7 @@ from sfa.util.defaultdict import defaultdict
 
 from sfa.storage.record import Record
 from sfa.storage.alchemy import dbsession
-from sfa.storage.model import RegRecord
+from sfa.storage.model import RegRecord, RegUser
 
 from sfa.trust.credential import Credential
 from sfa.trust.gid import GID
@@ -21,7 +21,8 @@ from sfa.rspecs.version_manager import VersionManager
 from sfa.rspecs.rspec import RSpec
 
 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
-from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
+from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, \
+    hrn_to_pl_slicename, hostname_to_urn
 
 ## thierry: everything that is API-related (i.e. handling incoming requests)
 # is taken care of
@@ -45,7 +46,7 @@ class SlabDriver(Driver):
 
     def __init__(self, config):
         Driver.__init__ (self, config)
-        self.config=config
+        self.config = config
         self.hrn = config.SFA_INTERFACE_HRN
 
         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
@@ -476,7 +477,7 @@ class SlabDriver(Driver):
 
         #Get job info from OAR
         job_info = self.oar.parser.SendRequest(req, job_id, username)
-        logger.debug("SLABDRIVER \t GetJobs %s " %(job_info))
+        logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
         try:
             if job_info['state'] == 'Terminated':
                 logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
@@ -509,15 +510,18 @@ class SlabDriver(Driver):
         req = "GET_jobs_id_resources"
         node_list_k = 'reserved_resources'
 
-        #Get job info from OAR
-        job_info = self.oar.parser.SendRequest(req, job_id, username)
-        logger.debug("SLABDRIVER \t GetJobsResources %s " %(job_info))
+        #Get job resources list from OAR
+        node_id_list = self.oar.parser.SendRequest(req, job_id, username)
+        logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
 
-        parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
+        hostname_list = \
+            self.__get_hostnames_from_oar_node_ids(node_id_list)
+
+        #parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
         #Replaces the previous entry "assigned_network_address" / "reserved_resources"
         #with "node_ids"
-        job_info.update({'node_ids':parsed_job_info[node_list_k]})
-        del job_info[node_list_k]
+        job_info = {'node_ids':hostname_list}
+
         return job_info
 
 
@@ -544,11 +548,35 @@ class SlabDriver(Driver):
             logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
         return reserved_node_hostname_list
 
-
+
+    def GetNodesCurrentlyInUse(self):
+        """Returns a list of all the nodes already involved in an oar job"""
+        return self.oar.parser.SendRequest("GET_running_jobs")
+
+    def __get_hostnames_from_oar_node_ids(self, resource_id_list):
+        full_nodes_dict_list = self.GetNodes()
+        #Put the full node list into a dictionary keyed by oar node id
+        oar_id_node_dict = {}
+        for node in full_nodes_dict_list:
+            oar_id_node_dict[node['oar_id']] = node
+
+        hostname_list = []
+        for resource_id in resource_id_list:
+            hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
+        return hostname_list
+
     def GetReservedNodes(self):
-        # this function returns a list of all the nodes already involved in an oar job
-        return self.oar.parser.SendRequest("GET_running_jobs")
-
+        #Get the nodes in use and the reserved nodes
+        reservation_dict_list = \
+            self.oar.parser.SendRequest("GET_reserved_nodes")
+
+        for resa in reservation_dict_list:
+            logger.debug ("GetReservedNodes resa %s"%(resa))
+            resa['reserved_nodes_hostnames'] = \
+                self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
+            del resa['resource_ids']
+        return reservation_dict_list
 
     def GetNodes(self,node_filter_dict = None, return_fields_list = None):
         """
 
@@ -655,7 +683,7 @@ class SlabDriver(Driver):
             except KeyError:
                 pass
 
-        #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices rec %s" %(rec)
+        logger.debug("SLABDRIVER.PY GetSlices rec %s" %(rec))
 
         return rec
 
@@ -675,7 +703,7 @@ class SlabDriver(Driver):
 
 
 
-    def testbed_name (self): return "senslab2"
+    def testbed_name (self): return self.hrn
 
     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
     def aggregate_version (self):
@@ -758,9 +786,71 @@ class SlabDriver(Driver):
 
         return slab_record
 
-
+
+    def __process_walltime(self, duration=None):
+        """ Turns the duration (H:M:S) specified in the RSpec into an OAR
+        walltime with safety delays added, plus the matching sleep time
+        in seconds.
+
+        """
+        if duration:
+            walltime = duration.split(":")
+            # Fixing the walltime by adding a few delays. First put the
+            # walltime in seconds. oarAdditionalDelay = 20: additional delay
+            # for the /bin/sleep command, to take into account prologue and
+            # epilogue script execution.
+            # int walltimeAdditionalDelay = 120; additional delay
+
+            desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
+                int(walltime[2])
+            total_walltime = desired_walltime + 140 #+2 min 20
+            sleep_walltime = desired_walltime + 20 #+20 sec
+            logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
+                total_walltime %s sleep_walltime %s "\
+                %(desired_walltime, total_walltime, \
+                sleep_walltime))
+            #Put the walltime back in str form
+            #First get the hours
+            walltime[0] = str(total_walltime / 3600)
+            total_walltime = total_walltime - 3600 * int(walltime[0])
+            #Get the remaining minutes
+            walltime[1] = str(total_walltime / 60)
+            total_walltime = total_walltime - 60 * int(walltime[1])
+            #Get the seconds
+            walltime[2] = str(total_walltime)
+            logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
+                %(walltime))
+        else:
+            #automatically set 10 min + 2 min 20
+            walltime = ['0', '12', '20']
+            sleep_walltime = 620
+
+        return walltime, sleep_walltime
+
+
+    def __transforms_timestamp_into_date(self, xp_utc_timestamp = None):
+        """ Transforms unix timestamp into valid OAR date format """
+
+        #Used in case of a scheduled experiment (not immediate)
+        #To run an XP immediately, don't specify date and time in RSpec
+        #They will be set to None.
+        if xp_utc_timestamp:
+            #transform the xp_utc_timestamp into server readable time
+            xp_server_readable_date = datetime.fromtimestamp(int(\
+                xp_utc_timestamp)).strftime(self.time_format)
+
+            return xp_server_readable_date
+
+        else:
+            return None
+
     def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
-
+        """ Creates the structure needed for a correct POST on OAR.
+        Makes the timestamp transformation into the appropriate format.
+        Sends the POST request to create the job with the resources in
+        added_nodes.
+
+        """
         site_list = []
         nodeid_list =[]
         resource = ""
@@ -768,60 +858,50 @@ class SlabDriver(Driver):
         slice_name = slice_dict['name']
         try:
             slot = slice_dict['timeslot']
-            logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slot %s" %(slot))
+            logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
+                slot %s" %(slot))
         except KeyError:
             #Running on default parameters
             #XP immediate , 10 mins
-            slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min
+            slot = { 'date':None, 'start_time':None,
+                'timezone':None, 'duration':None }#10 min
 
         reqdict['workdir']= '/tmp'
         reqdict['resource'] ="{network_address in ("
-        #reqdict['property'] ="network_address in ("
+
         for node in added_nodes:
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR node %s" %(node)
-
-            #Get the ID of the node : remove the root auth and put the site in a separate list
-            #s=node.split(".")
-            # NT: it's not clear for me if the nodenames will have the senslab prefix
-            # so lets take the last part only, for now.
-            #lastpart=s[-1]
-            #if s[0] == self.root_auth :
-            # Again here it's not clear if nodes will be prefixed with _, lets split and tanke the last part for now.
+            logger.debug("OARrestapi \tLaunchExperimentOnOAR \
+                node %s" %(node))
+
+            #Get the ID of the node: remove the root auth and put
+            # the site in a separate list.
+            # NT: it's not clear to me if the nodenames will have the senslab
+            #prefix, so let's take the last part only, for now.
+
+            # Again here it's not clear if nodes will be prefixed with _,
+            #let's split and take the last part for now.
             #s=lastpart.split("_")
-            #nodeid=s[-1]
+
             nodeid = node
             reqdict['resource'] += "'"+ nodeid +"', "
             nodeid_list.append(nodeid)
-
-        reqdict['resource'] = reqdict['resource'][0: len( reqdict['resource'])-2] +")}/nodes=" + str(len(nodeid_list))
-        if slot['duration']:
-            walltime = slot['duration'].split(":")
-            # Fixing the walltime by adding a few delays. First put the walltime in seconds
-            # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
-            # take in account prologue and epilogue scripts execution
-            # int walltimeAdditionalDelay = 120; additional delay
-
-            desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
-            total_walltime = desired_walltime + 140 #+2 min 20
-            sleep_walltime = desired_walltime + 20 #+20 sec
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s total_walltime %s sleep_walltime %s " %(desired_walltime,total_walltime,sleep_walltime)
-            #Put the walltime back in str form
-            #First get the hours
-            walltime[0] = str(total_walltime / 3600)
-            total_walltime = total_walltime - 3600 * int(walltime[0])
-            #Get the remaining minutes
-            walltime[1] = str(total_walltime / 60)
-            total_walltime = total_walltime - 60 * int(walltime[1])
-            #Get the seconds
-            walltime[2] = str(total_walltime)
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR walltime %s " %(walltime)
-
-            reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2])
-            reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
-        else:
-            reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
-            reqdict['script_path'] = "/bin/sleep 620" #+20 sec
+
+        custom_length = len(reqdict['resource']) - 2
+        reqdict['resource'] = reqdict['resource'][0:custom_length] + \
+            ")}/nodes=" + str(len(nodeid_list))
+
+        #if slot['duration']:
+        walltime, sleep_walltime = self.__process_walltime(duration = \
+            slot['duration'])
+        #else:
+            #walltime, sleep_walltime = self.__process_walltime(duration = None)
+
+        reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
+            ":" + str(walltime[1]) + ":" + str(walltime[2])
+        reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
+
+
+
         #In case of a scheduled experiment (not immediate)
         #To run an XP immediately, don't specify date and time in RSpec
         #They will be set to None.
@@ -830,9 +910,12 @@ class SlabDriver(Driver):
             if slot['timezone'] is '' or slot['timezone'] is None:
                 #assume it is server timezone
                 from_zone=tz.gettz(server_tz)
-                print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR timezone not specified server_tz %s from_zone %s" %(server_tz,from_zone)
+                logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
+                    not specified server_tz %s from_zone %s" \
+                    %(server_tz,from_zone))
             else:
-                #Get zone of the user from the reservation time given in the rspec
+                #Get zone of the user from the reservation time given
+                #in the rspec
                 from_zone = tz.gettz(slot['timezone'])
 
             date = str(slot['date']) + " " + str(slot['start_time'])
@@ -840,33 +923,19 @@ class SlabDriver(Driver):
             user_datetime = user_datetime.replace(tzinfo = from_zone)
 
             #Convert to server zone
-            #to_zone = tz.tzutc()
+
             to_zone = tz.gettz(server_tz)
             reservation_date = user_datetime.astimezone(to_zone)
             #Readable time accpeted by OAR
             reqdict['reservation']= reservation_date.strftime(self.time_format)
 
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation'])
+            logger.debug("SLABDRIVER \tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation']))
 
         else:
-            # Immediate XP
-            # reservations are performed in the oar server timebase, so :
-            # 1- we get the server time(in UTC tz )/server timezone
-            # 2- convert the server UTC time in its timezone
-            # 3- add a custom delay to this time
-            # 4- convert this time to a readable form and it for the reservation request.
-            server_timestamp,server_tz = self.GetTimezone()
-            s_tz=tz.gettz(server_tz)
-            UTC_zone = tz.gettz("UTC")
-            #weird... datetime.fromtimestamp should work since we do from datetime import datetime
-            utc_server= datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
-            server_localtime=utc_server.astimezone(s_tz)
-
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
-            readable_time = server_localtime.strftime(self.time_format)
-
-            print >>sys.stderr," \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s " %(readable_time ,server_timestamp)
-            reqdict['reservation'] = readable_time
+            # Immediate XP. No need to add special parameters.
+            # Normally not used in SFA.
+
+            pass
 
         reqdict['type'] = "deploy"
 
@@ -878,14 +947,14 @@ class SlabDriver(Driver):
 
         logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s \r\n site_list %s" %(reqdict,site_list) )
 
         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
-        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s " %(answer)
+        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
         try:
             jobid = answer['id']
         except KeyError:
-            print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job %s " %( answer)
+            logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR Impossible to create job %s " %(answer))
             return
 
-        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user)
+        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user))
         self.db.update_job( slice_name, jobid ,added_nodes)
 
@@ -923,7 +992,22 @@ class SlabDriver(Driver):
 
 
-
+
+    def GetLeases(self, lease_filter=None, return_fields_list=None):
+        reservation_list = self.GetReservedNodes()
+        #Find the slice associated with this user senslab ldap uid
+        for resa in reservation_list:
+            ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
+            user = dbsession.query(RegUser).filter_by(email = ldap_info['mail']).first()
+            slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id).first()
+            #Put the slice_urn
+            resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
+            resa['component_id_list'] = []
+            #Transform the hostnames into urns (component ids)
+            for hostname in resa['reserved_nodes_hostnames']:
+                resa['component_id_list'].append(hostname_to_urn(self.hrn, self.root_auth, hostname))
+
+        return reservation_list
 
     def augment_records_with_testbed_info (self, sfa_records):
         return self.fill_record_info (sfa_records)
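
The walltime handling that LaunchExperimentOnOAR() now delegates to
__process_walltime() is plain base-60/3600 arithmetic. A standalone sketch
of that calculation (floor division written //, matching the Python 2
integer division used in the driver):

    def process_walltime(duration):
        """'H:M:S' -> (OAR walltime as [H, M, S] strings, sleep time in s).

        Adds the two safety margins described in the diff: +140 s (2 min 20)
        on the job walltime for prologue/epilogue, +20 s on the /bin/sleep.
        """
        hours, minutes, seconds = [int(part) for part in duration.split(":")]
        desired = hours * 3600 + minutes * 60 + seconds
        total = desired + 140
        sleep_walltime = desired + 20
        walltime = [str(total // 3600),
                    str((total % 3600) // 60),
                    str(total % 60)]
        return walltime, sleep_walltime

    # process_walltime("0:10:00") -> (['0', '12', '20'], 620)
    # i.e. a 10 min request becomes a 12 min 20 s OAR job around a 620 s sleep,
    # which also matches the defaults used when no duration is given.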
diff --git a/sfa/senslab/slabslices.py b/sfa/senslab/slabslices.py
index 261c58c6..f23f96df 100644
--- a/sfa/senslab/slabslices.py
+++ b/sfa/senslab/slabslices.py
@@ -49,6 +49,7 @@ class SlabSlices:
 
         # Get user information
+        #TODO
         alchemy_person = dbsession.query(RegRecord).filter_by(record_id = slice['record_id_user']).first()
 
         slivers = []
@@ -167,7 +168,8 @@ class SlabSlices:
             try:
                 slot = slice['timeslot']
                 self.driver.LaunchExperimentOnOAR(slice, added_nodes, username)
-            except KeyError:
+            except KeyError:
+                logger.log_exc("SLABSLICES \tVERIFY_SLICE_NODES KeyError slice %s " %(slice))
                 pass
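
Taken together, GetReservedNodes() and the new GetLeases() aim to hand back
lease dictionaries roughly shaped as below. This is a sketch only: the URNs
and values are hypothetical and depend on the LDAP and slice lookups
succeeding.

    lease = {
        'lease_id': 42,                      # OAR job id
        'user': 'savakian',                  # LDAP uid of the job owner
        'state': 'Waiting',
        't_from': 1340968800,                # scheduled_start
        't_until': 1340969400,               # scheduled_start + walltime
        'reserved_nodes_hostnames': ['node1.senslab.info'],
        'slice_id': 'urn:publicid:IDN+senslab2+slice+myslice',
        'component_id_list': ['urn:publicid:IDN+senslab2+node+node1'],
    }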