#raise ServerError("Failed to parse Server Response:" + answer)
-#def AddNodeNetworkAddr(self,tuplelist,value):
- #tuplelist.append(('hostname',str(value)))
-
+
+def AddOarNodeId(tuplelist, value):
+ """ Adds Oar internal node id to the nodes attributes """
+
+ tuplelist.append(('oar_id', int(value)))
+
+
def AddNodeNetworkAddr(dictnode, value):
#Inserts new key. The value associated is a tuple list
node_id = value
'posx': AddPosX,
'posy': AddPosY,
'state':AddBootState,
- #'id' : AddNodeId,
+ 'id' : AddOarNodeId,
}
def ParseJobsIdResources(self):
""" BROKEN since oar 2.5
- Parses the json produced by the request /oarapi/jobs/id.json.
+ Parses the json produced by the request
+ /oarapi/jobs/id/resources.json.
+ Returns a list of oar node ids that are scheduled for the
+ given job id.
"""
+ job_resources = []
+ for resource in self.raw_json['items']:
+ job_resources.append(resource['id'])
+
logger.debug("OARESTAPI \tParseJobsIdResources %s" %(self.raw_json))
- return self.raw_json
+ return job_resources
def ParseResources(self) :
""" Parses the json produced by a get_resources request on oar."""
""" Returns an array containing the list of the reserved nodes """
#resources are listed inside the 'items' list from the json
- nodes = []
+ reservation_list = []
print "ParseReservedNodes_%s" %(self.raw_json['items'])
- for job in self.raw_json['items']:
- for node in job['nodes']:
- print "ParseReservedNodes________node %s" %(node)
- logger.debug("ParseReservedNodes________node %s" %(node))
- nodes.append(node['network_address'])
- return nodes
+ job = {}
+ #Parse resources info
+ for json_element in self.raw_json['items']:
+ job['t_from'] = json_element['scheduled_start']
+ #Get resources id list for the job
+ job['resource_ids'] = \
+ [ node_dict['id'] for node_dict in json_element['resources'] ]
+
+ job['state'] = json_element['state']
+ job['lease_id'] = json_element['id']
+ job['t_until'] = json_element['scheduled_start'] + \
+ json_element['walltime']
+ job['user'] = json_element['owner']
+ logger.debug("ParseReservedNodes________job %s" %(job))
+ reservation_list.append(job)
+ return reservation_list
def ParseRunningJobs(self):
""" Gets the list of nodes currently in use from the attributes of the
# dictionary is empty and/or a new node has to be inserted
node_id = self.resources_fulljson_dict['network_address'](\
self.node_dictlist, dictline['network_address'])
- #node_id = self.resources_fulljson_dict['network_address'](self,self.node_dictlist, dictline['network_address'])
for k in keys:
if k in dictline:
if k == 'network_address':
self.resources_fulljson_dict[k](\
self.node_dictlist[node_id], dictline[k])
- #self.resources_fulljson_dict[k](self,self.node_dictlist[node_id], dictline[k])
-
+
#The last property has been inserted in the property tuple list,
#reset node_id
#Turn the property tuple list (=dict value) into a dictionary
node = self.node_dictlist[node_id]
node.update({'hrn':self.hostname_to_hrn(self.interface_hrn, \
node['site'],node['hostname'])})
- #node['hrn'] = self.hostname_to_hrn(self.interface_hrn, node['site_login_base'],node['hostname'])
+
self.node_dictlist.update({node_id:node})
- #if node_id is 1:
+
if node['site'] not in self.site_dict:
self.site_dict[node['site']] = {
'site':node['site'],
from sfa.storage.record import Record
from sfa.storage.alchemy import dbsession
-from sfa.storage.model import RegRecord
+from sfa.storage.model import RegRecord, RegUser
from sfa.trust.credential import Credential
from sfa.trust.gid import GID
from sfa.rspecs.rspec import RSpec
from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
-from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
+from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, \
+ hrn_to_pl_slicename, hostname_to_urn
## thierry: everything that is API-related (i.e. handling incoming requests)
# is taken care of
def __init__(self, config):
Driver.__init__ (self, config)
- self.config=config
+ self.config = config
self.hrn = config.SFA_INTERFACE_HRN
self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
#Get job info from OAR
job_info = self.oar.parser.SendRequest(req, job_id, username)
- logger.debug("SLABDRIVER \t GetJobs %s " %(job_info))
+ logger.debug("SLABDRIVER \t GetJobsId %s " %(job_info))
try:
if job_info['state'] == 'Terminated':
logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
req = "GET_jobs_id_resources"
node_list_k = 'reserved_resources'
- #Get job info from OAR
- job_info = self.oar.parser.SendRequest(req, job_id, username)
- logger.debug("SLABDRIVER \t GetJobsResources %s " %(job_info))
+ #Get job resources list from OAR
+ node_id_list = self.oar.parser.SendRequest(req, job_id, username)
+ logger.debug("SLABDRIVER \t GetJobsResources %s " %(node_id_list))
- parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
+ hostname_list = \
+ self.__get_hostnames_from_oar_node_ids(node_id_list)
+
+ #parsed_job_info = self.get_info_on_reserved_nodes(job_info,node_list_k)
#Replaces the previous entry "assigned_network_address" / "reserved_resources"
#with "node_ids"
- job_info.update({'node_ids':parsed_job_info[node_list_k]})
- del job_info[node_list_k]
+ job_info = {'node_ids':hostname_list}
+
return job_info
logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
return reserved_node_hostname_list
-
+
+ def GetNodesCurrentlyInUse(self):
+ """Returns a list of all the nodes already involved in an oar job"""
+ return self.oar.parser.SendRequest("GET_running_jobs")
+
+ def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
+ full_nodes_dict_list = self.GetNodes()
+ #Put the full node list into a dictionary keyed by oar node id
+ oar_id_node_dict = {}
+ for node in full_nodes_dict_list:
+ oar_id_node_dict[node['oar_id']] = node
+
+ hostname_list = []
+ for resource_id in resource_id_list:
+ hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
+ return hostname_list
+
def GetReservedNodes(self):
- # this function returns a list of all the nodes already involved in an oar job
- return self.oar.parser.SendRequest("GET_running_jobs")
-
+ #Get the nodes in use and the reserved nodes
+ reservation_dict_list = self.oar.parser.SendRequest("GET_reserved_nodes")
+
+ oar_node_id_dict = self.__get_oar_node_ids()
+
+ for resa in reservation_dict_list:
+ logger.debug ("GetReservedNodes resa %s"%(resa))
+ resa['reserved_nodes_hostnames'] = \
+ self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
+ del resa['resource_ids']
+ return reservation_dict_list
def GetNodes(self,node_filter_dict = None, return_fields_list = None):
"""
except KeyError:
pass
- #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices rec %s" %(rec)
+ logger.debug("SLABDRIVER.PY GetSlices rec %s" %(rec))
return rec
- def testbed_name (self): return "senslab2"
+ def testbed_name (self): return self.hrn
# 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
def aggregate_version (self):
return slab_record
-
+ def __process_walltime(self,duration=None):
+ """ Calculates the walltime in seconds from the duration in H:M:S
+ specified in the RSpec.
+
+ """
+ if duration:
+ walltime = duration.split(":")
+ # Fixing the walltime by adding a few delays. First put the walltime
+ # in seconds oarAdditionalDelay = 20; additional delay for
+ # /bin/sleep command to
+ # take in account prologue and epilogue scripts execution
+ # int walltimeAdditionalDelay = 120; additional delay
+
+ desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
+ int(walltime[2])
+ total_walltime = desired_walltime + 140 #+2 min 20
+ sleep_walltime = desired_walltime + 20 #+20 sec
+ logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
+ total_walltime %s sleep_walltime %s "\
+ %(desired_walltime, total_walltime, \
+ sleep_walltime))
+ #Put the walltime back in str form
+ #First get the hours
+ walltime[0] = str(total_walltime / 3600)
+ total_walltime = total_walltime - 3600 * int(walltime[0])
+ #Get the remaining minutes
+ walltime[1] = str(total_walltime / 60)
+ total_walltime = total_walltime - 60 * int(walltime[1])
+ #Get the seconds
+ walltime[2] = str(total_walltime)
+ logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
+ %(walltime))
+ else:
+ #automatically set 10min +2 min 20
+ walltime[0] = '0'
+ walltime[1] = '12'
+ walltime[2] = '20'
+ sleep_walltime = '620'
+
+ return walltime, sleep_walltime
+
+
+ def __transforms_timestamp_into_date(xp_utc_timestamp = None):
+ """ Transforms unix timestamp into valid OAR date format """
+
+ #Used in case of a scheduled experiment (not immediate)
+ #To run an XP immediately, don't specify date and time in RSpec
+ #They will be set to None.
+ if xp_utc_timestamp:
+ #transform the xp_utc_timestamp into server readable time
+ xp_server_readable_date = datetime.fromtimestamp(int(\
+ xp_utc_timestamp)).strftime(self.time_format)
+
+ return xp_server_readable_date
+
+ else:
+ return None
+
def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
-
+ """ Creates the structure needed for a correct POST on OAR.
+ Makes the timestamp transformation into the appropriate format.
+ Sends the POST request to create the job with the resources in
+ added_nodes.
+
+ """
site_list = []
nodeid_list =[]
resource = ""
slice_name = slice_dict['name']
try:
slot = slice_dict['timeslot']
- logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slot %s" %(slot))
+ logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
+ slot %s" %(slot))
except KeyError:
#Running on default parameters
#XP immediate , 10 mins
- slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min
+ slot = { 'date':None, 'start_time':None,
+ 'timezone':None, 'duration':None }#10 min
reqdict['workdir']= '/tmp'
reqdict['resource'] ="{network_address in ("
- #reqdict['property'] ="network_address in ("
+
for node in added_nodes:
- print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR node %s" %(node)
-
- #Get the ID of the node : remove the root auth and put the site in a separate list
- #s=node.split(".")
- # NT: it's not clear for me if the nodenames will have the senslab prefix
- # so lets take the last part only, for now.
- #lastpart=s[-1]
- #if s[0] == self.root_auth :
- # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
+ logger.debug("OARrestapi \tLaunchExperimentOnOAR \
+ node %s" %(node))
+
+ #Get the ID of the node : remove the root auth and put
+ # the site in a separate list.
+ # NT: it's not clear for me if the nodenames will have the senslab
+ #prefix so lets take the last part only, for now.
+
+ # Again here it's not clear if nodes will be prefixed with <site>_,
+ #lets split and tanke the last part for now.
#s=lastpart.split("_")
- #nodeid=s[-1]
+
nodeid = node
reqdict['resource'] += "'"+ nodeid +"', "
nodeid_list.append(nodeid)
-
- reqdict['resource'] = reqdict['resource'][0: len( reqdict['resource'])-2] +")}/nodes=" + str(len(nodeid_list))
- if slot['duration']:
- walltime = slot['duration'].split(":")
- # Fixing the walltime by adding a few delays. First put the walltime in seconds
- # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
- # take in account prologue and epilogue scripts execution
- # int walltimeAdditionalDelay = 120; additional delay
-
- desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
- total_walltime = desired_walltime + 140 #+2 min 20
- sleep_walltime = desired_walltime + 20 #+20 sec
- print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s total_walltime %s sleep_walltime %s " %(desired_walltime,total_walltime,sleep_walltime)
- #Put the walltime back in str form
- #First get the hours
- walltime[0] = str(total_walltime / 3600)
- total_walltime = total_walltime - 3600 * int(walltime[0])
- #Get the remaining minutes
- walltime[1] = str(total_walltime / 60)
- total_walltime = total_walltime - 60 * int(walltime[1])
- #Get the seconds
- walltime[2] = str(total_walltime)
- print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR walltime %s " %(walltime)
-
- reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2])
- reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
- else:
- reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
- reqdict['script_path'] = "/bin/sleep 620" #+20 sec
+ custom_length = len(reqdict['resource'])- 2
+ reqdict['resource'] = reqdict['resource'][0:custom_length] + \
+ ")}/nodes=" + str(len(nodeid_list))
+
+ #if slot['duration']:
+ walltime, sleep_walltime = self.__process_walltime(duration = \
+ slot['duration'])
+ #else:
+ #walltime, sleep_walltime = self.__process_walltime(duration = None)
+
+ reqdict['resource']+= ",walltime=" + str(walltime[0]) + \
+ ":" + str(walltime[1]) + ":" + str(walltime[2])
+ reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
+
+
+
#In case of a scheduled experiment (not immediate)
#To run an XP immediately, don't specify date and time in RSpec
#They will be set to None.
if slot['timezone'] is '' or slot['timezone'] is None:
#assume it is server timezone
from_zone=tz.gettz(server_tz)
- print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR timezone not specified server_tz %s from_zone %s" %(server_tz,from_zone)
+ logger.warning("SLABDRIVER \tLaunchExperimentOnOAR timezone \
+ not specified server_tz %s from_zone %s" \
+ %(server_tz,from_zone))
else:
- #Get zone of the user from the reservation time given in the rspec
+ #Get zone of the user from the reservation time given
+ #in the rspec
from_zone = tz.gettz(slot['timezone'])
date = str(slot['date']) + " " + str(slot['start_time'])
user_datetime = user_datetime.replace(tzinfo = from_zone)
#Convert to server zone
- #to_zone = tz.tzutc()
+
to_zone = tz.gettz(server_tz)
reservation_date = user_datetime.astimezone(to_zone)
#Readable time accpeted by OAR
reqdict['reservation']= reservation_date.strftime(self.time_format)
- print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation'])
+ logger.debug("SLABDRIVER \tLaunchExperimentOnOAR reqdict['reservation'] %s " %(reqdict['reservation']))
else:
- # Immediate XP
- # reservations are performed in the oar server timebase, so :
- # 1- we get the server time(in UTC tz )/server timezone
- # 2- convert the server UTC time in its timezone
- # 3- add a custom delay to this time
- # 4- convert this time to a readable form and it for the reservation request.
- server_timestamp,server_tz = self.GetTimezone()
- s_tz=tz.gettz(server_tz)
- UTC_zone = tz.gettz("UTC")
- #weird... datetime.fromtimestamp should work since we do from datetime import datetime
- utc_server= datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
- server_localtime=utc_server.astimezone(s_tz)
-
- print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
- readable_time = server_localtime.strftime(self.time_format)
-
- print >>sys.stderr," \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s " %(readable_time ,server_timestamp)
- reqdict['reservation'] = readable_time
+ # Immediate XP. Not need to add special parameters.
+ # normally not used in SFA
+
+ pass
reqdict['type'] = "deploy"
logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s \r\n site_list %s" %(reqdict,site_list) )
answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
- print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s " %(answer)
+ logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s " %(answer))
try:
jobid = answer['id']
except KeyError:
- print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job %s " %( answer)
+ logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR Impossible to create job %s " %(answer))
return
- print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user)
+ logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user))
self.db.update_job( slice_name, jobid ,added_nodes)
-
+
+ def GetLeases(self, lease_filter=None, return_fields_list=None):
+ reservation_list = self.GetReservedNodes()
+ #Find the slice associated with this user senslab ldap uid
+ for resa in reservation_list:
+ ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
+ user = dbsession.query(RegUser).filter_by(email = ldap_info['mail']).first()
+ slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id).first()
+ #Put the slice_urn
+ resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
+ resa['component_id_list'] = []
+ #Transform the hostnames into urns (component ids)
+ for hostname in resa['reserved_nodes_hostnames']:
+ resa['component_id_list'].append(hostname_to_urn(self.hrn, self.root_auth, hostname))
+
+ return resa
def augment_records_with_testbed_info (self, sfa_records):
return self.fill_record_info (sfa_records)