First input in handling leases.
authorSandrine Avakian <sandrine.avakian@inria.fr>
Fri, 29 Jun 2012 12:41:12 +0000 (14:41 +0200)
committerSandrine Avakian <sandrine.avakian@inria.fr>
Fri, 29 Jun 2012 12:41:12 +0000 (14:41 +0200)
Still dealing with bugs from the OAR migration.

sfa/senslab/OARrestapi.py
sfa/senslab/slabaggregate.py
sfa/senslab/slabdriver.py
sfa/senslab/slabslices.py

index 19b60c2..1b099bc 100644 (file)
@@ -133,9 +133,13 @@ class OARrestapi:
             #raise ServerError("Failed to parse Server Response:" + answer)
 
 
-#def AddNodeNetworkAddr(self,tuplelist,value):
-        #tuplelist.append(('hostname',str(value)))
-        
+
+def AddOarNodeId(tuplelist, value):
+    """ Adds Oar internal node id to the nodes attributes """
+    
+    tuplelist.append(('oar_id', int(value)))
+
+       
 def AddNodeNetworkAddr(dictnode, value):
     #Inserts new key. The value associated is a tuple list
     node_id = value
@@ -186,7 +190,7 @@ class OARGETParser:
         'posx': AddPosX,
         'posy': AddPosY,
         'state':AddBootState,
-        #'id' : AddNodeId,
+        'id' : AddOarNodeId,
         }
         
  
@@ -324,11 +328,18 @@ class OARGETParser:
 
     def ParseJobsIdResources(self):
         """ BROKEN since oar 2.5
-        Parses the json produced by the request /oarapi/jobs/id.json.
+        Parses the json produced by the request 
+        /oarapi/jobs/id/resources.json.
+        Returns a list of oar node ids that are scheduled for the 
+        given job id.
         
         """
+        job_resources = []
+        for resource in self.raw_json['items']:
+            job_resources.append(resource['id'])
+            
         logger.debug("OARESTAPI \tParseJobsIdResources %s" %(self.raw_json))
-        return self.raw_json
+        return job_resources
             
     def ParseResources(self) :
         """ Parses the json produced by a get_resources request on oar."""
@@ -342,14 +353,24 @@ class OARGETParser:
         """  Returns an array containing the list of the reserved nodes """
     
         #resources are listed inside the 'items' list from the json
-        nodes = [] 
+        reservation_list = [] 
         print "ParseReservedNodes_%s" %(self.raw_json['items'])
-        for job in  self.raw_json['items']:
-            for node in job['nodes']:
-                print "ParseReservedNodes________node %s" %(node)
-                logger.debug("ParseReservedNodes________node %s" %(node))  
-                nodes.append(node['network_address'])
-        return nodes
+        job = {}
+        #Parse resources info
+        for json_element in  self.raw_json['items']:
+            job['t_from'] = json_element['scheduled_start']
+            #Get resources id list for the job
+            job['resource_ids'] = \
+                [ node_dict['id'] for node_dict in json_element['resources'] ]
+           
+            job['state'] = json_element['state'] 
+            job['lease_id'] = json_element['id'] 
+            job['t_until'] = json_element['scheduled_start'] + \
+                                                    json_element['walltime']
+            job['user'] = json_element['owner']
+            logger.debug("ParseReservedNodes________job %s" %(job))  
+            reservation_list.append(job)
+        return reservation_list
     
     def ParseRunningJobs(self): 
         """ Gets the list of nodes currently in use from the attributes of the
@@ -418,7 +439,6 @@ class OARGETParser:
             # dictionary is empty and/or a new node has to be inserted  
             node_id = self.resources_fulljson_dict['network_address'](\
                                 self.node_dictlist, dictline['network_address']) 
-            #node_id = self.resources_fulljson_dict['network_address'](self,self.node_dictlist, dictline['network_address'])
             for k in keys:
                 if k in dictline:
                     if k == 'network_address':
@@ -426,8 +446,7 @@ class OARGETParser:
                  
                     self.resources_fulljson_dict[k](\
                                     self.node_dictlist[node_id], dictline[k])
-                    #self.resources_fulljson_dict[k](self,self.node_dictlist[node_id], dictline[k])
-            
+
             #The last property has been inserted in the property tuple list, 
             #reset node_id 
             #Turn the property tuple list (=dict value) into a dictionary
@@ -465,9 +484,9 @@ class OARGETParser:
             node  = self.node_dictlist[node_id]
             node.update({'hrn':self.hostname_to_hrn(self.interface_hrn, \
                                             node['site'],node['hostname'])})
-            #node['hrn'] = self.hostname_to_hrn(self.interface_hrn, node['site_login_base'],node['hostname'])
+
             self.node_dictlist.update({node_id:node})
-            #if node_id is 1:
+
             if node['site'] not in self.site_dict:
                 self.site_dict[node['site']] = {
                     'site':node['site'],
index 379e0c1..5effcbe 100644 (file)
@@ -35,7 +35,7 @@ class SlabAggregate:
     links = {}
     node_tags = {}
     
-    prepared=False
+    prepared = False
 
     user_options = {}
     
@@ -61,7 +61,7 @@ class SlabAggregate:
         if isinstance(slice, list):
             slice = slices[0]
         else:
-           slice =slices
+           slice = slices
 
         # sort slivers by node id , if there is a job
         #and therfore, node allocated to this slice
@@ -74,7 +74,7 @@ class SlabAggregate:
                                     'name': slice['slice_hrn'],
                                     'type': 'slab-node', 
                                     'tags': []})
-                    slivers[node_id]= sliver
+                    slivers[node_id] = sliver
             except KeyError:
                     print>>sys.stderr, " \r\n \t\t get_slice_and_slivers KeyError "
         ## sort sliver attributes by node id    
@@ -140,7 +140,7 @@ class SlabAggregate:
         #node_tags = self.get_node_tags(tags_filter)
        
 
-        reserved_nodes=self.driver.GetReservedNodes()
+        reserved_nodes = self.driver.GetNodesCurrentlyInUse()
         rspec_nodes = []
         for node in nodes:
             # skip whitelisted nodes
@@ -211,6 +211,7 @@ class SlabAggregate:
         
         return (rspec_nodes)       
 
+        
 #from plc/aggregate.py 
     def get_rspec(self, slice_xrn=None, version = None, options={}):
 
index e320113..dd6a840 100644 (file)
@@ -11,7 +11,7 @@ from sfa.util.defaultdict import defaultdict
 
 from sfa.storage.record import Record
 from sfa.storage.alchemy import dbsession
-from sfa.storage.model import RegRecord
+from sfa.storage.model import RegRecord, RegUser
 
 from sfa.trust.credential import Credential
 from sfa.trust.gid import GID
@@ -21,7 +21,8 @@ from sfa.rspecs.version_manager import VersionManager
 from sfa.rspecs.rspec import RSpec
 
 from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id
-from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicename
+from sfa.planetlab.plxrn import slicename_to_hrn, hostname_to_hrn, \
+                                        hrn_to_pl_slicename, hostname_to_urn
 
 ## thierry: everything that is API-related (i.e. handling incoming requests) 
 # is taken care of 
@@ -45,7 +46,7 @@ class SlabDriver(Driver):
 
     def __init__(self, config):
         Driver.__init__ (self, config)
-        self.config=config
+        self.config = config
         self.hrn = config.SFA_INTERFACE_HRN
 
         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
@@ -476,7 +477,7 @@ class SlabDriver(Driver):
         #Get job info from OAR    
         job_info = self.oar.parser.SendRequest(req, job_id, username)
 
-        logger.debug("SLABDRIVER \t GetJobs  %s " %(job_info))
+        logger.debug("SLABDRIVER \t GetJobsId  %s " %(job_info))
         try:
             if job_info['state'] == 'Terminated':
                 logger.debug("SLABDRIVER \t GetJobsId job %s TERMINATED"\
@@ -509,15 +510,18 @@ class SlabDriver(Driver):
         req = "GET_jobs_id_resources"
         node_list_k = 'reserved_resources' 
                
-        #Get job info from OAR    
-        job_info = self.oar.parser.SendRequest(req, job_id, username)
-        logger.debug("SLABDRIVER \t GetJobsResources  %s " %(job_info))
+        #Get job resources list from OAR    
+        node_id_list = self.oar.parser.SendRequest(req, job_id, username)
+        logger.debug("SLABDRIVER \t GetJobsResources  %s " %(node_id_list))
         
-        parsed_job_info  = self.get_info_on_reserved_nodes(job_info,node_list_k)
+        hostname_list = \
+            self.__get_hostnames_from_oar_node_ids(node_id_list)
+        
+        #parsed_job_info  = self.get_info_on_reserved_nodes(job_info,node_list_k)
         #Replaces the previous entry "assigned_network_address" / "reserved_resources"
         #with "node_ids"
-        job_info.update({'node_ids':parsed_job_info[node_list_k]})
-        del job_info[node_list_k]
+        job_info = {'node_ids':hostname_list}
+
         return job_info
 
             
@@ -544,11 +548,35 @@ class SlabDriver(Driver):
             logger.error("SLABDRIVER \t get_info_on_reserved_nodes KEYERROR " )
             
         return reserved_node_hostname_list  
-              
+            
+    def GetNodesCurrentlyInUse(self):
+        """Returns a list of all the nodes already involved in an oar job"""
+        return self.oar.parser.SendRequest("GET_running_jobs") 
+    
+    def __get_hostnames_from_oar_node_ids(self, resource_id_list ):
+        full_nodes_dict_list = self.GetNodes()
+        #Put the full node list into a dictionary keyed by oar node id
+        oar_id_node_dict = {}
+        for node in full_nodes_dict_list:
+            oar_id_node_dict[node['oar_id']] = node
+        
+        hostname_list = []
+        for resource_id in resource_id_list:
+            hostname_list.append(oar_id_node_dict[resource_id]['hostname'])
+        return  hostname_list
+        
     def GetReservedNodes(self):
-        # this function returns a list of all the nodes already involved in an oar job
-       return self.oar.parser.SendRequest("GET_running_jobs") 
-
+        #Get the nodes in use and the reserved nodes
+        reservation_dict_list = self.oar.parser.SendRequest("GET_reserved_nodes")
+        
+        oar_node_id_dict = self.__get_oar_node_ids()
+        
+        for resa in reservation_dict_list:
+            logger.debug ("GetReservedNodes resa %s"%(resa))
+            resa['reserved_nodes_hostnames'] = \
+                self.__get_hostnames_from_oar_node_ids(resa['resource_ids'])
+            del resa['resource_ids']
+        return reservation_dict_list
      
     def GetNodes(self,node_filter_dict = None, return_fields_list = None):
         """
@@ -655,7 +683,7 @@ class SlabDriver(Driver):
                 except KeyError:
                     pass
                 
-                #print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  rec  %s" %(rec)
+                logger.debug("SLABDRIVER.PY  GetSlices  rec  %s" %(rec))
                               
             return rec
                 
@@ -675,7 +703,7 @@ class SlabDriver(Driver):
 
         
     
-    def testbed_name (self): return "senslab2" 
+    def testbed_name (self): return self.hrn
          
     # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory
     def aggregate_version (self):
@@ -758,9 +786,71 @@ class SlabDriver(Driver):
 
         return slab_record
 
-                   
+    def __process_walltime(self,duration=None):
+        """ Calculates the walltime in seconds from the duration in H:M:S
+         specified in the RSpec.
+         
+        """
+        if duration:
+            walltime = duration.split(":")
+            # Fixing the walltime by adding a few delays. First put the walltime 
+            # in seconds oarAdditionalDelay = 20; additional delay for 
+            # /bin/sleep command to
+            # take in account  prologue and epilogue scripts execution
+            # int walltimeAdditionalDelay = 120;  additional delay
+
+            desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
+                                                                int(walltime[2])
+            total_walltime = desired_walltime + 140 #+2 min 20
+            sleep_walltime = desired_walltime + 20 #+20 sec
+            logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
+                                    total_walltime %s sleep_walltime %s  "\
+                                     %(desired_walltime, total_walltime, \
+                                                        sleep_walltime))
+            #Put the walltime back in str form
+            #First get the hours
+            walltime[0] = str(total_walltime / 3600)
+            total_walltime = total_walltime - 3600 * int(walltime[0])
+            #Get the remaining minutes
+            walltime[1] = str(total_walltime / 60)
+            total_walltime = total_walltime - 60 * int(walltime[1])
+            #Get the seconds
+            walltime[2] = str(total_walltime)
+            logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
+                                         %(walltime))
+        else:
+            #automatically set 10min  +2 min 20
+            walltime[0] = '0'
+            walltime[1] = '12' 
+            walltime[2] = '20'
+            sleep_walltime = '620'
+            
+        return walltime, sleep_walltime
+
+            
+    def __transforms_timestamp_into_date(xp_utc_timestamp = None):
+        """ Transforms unix timestamp into valid OAR date format """
+        
+        #Used in case of a scheduled experiment (not immediate)
+        #To run an XP immediately, don't specify date and time in RSpec 
+        #They will be set to None. 
+        if xp_utc_timestamp:
+            #transform the xp_utc_timestamp into server readable time  
+            xp_server_readable_date = datetime.fromtimestamp(int(\
+                                xp_utc_timestamp)).strftime(self.time_format)
+
+            return xp_server_readable_date
+            
+        else:
+           return None
+                               
     def LaunchExperimentOnOAR(self,  slice_dict, added_nodes, slice_user=None):
-       
+        """ Creates the structure needed for a correct POST on OAR.
+        Makes the timestamp transformation into the appropriate format.
+        Sends the POST request to create the job with the resources in 
+        added_nodes.
+        
+        """
         site_list = []
         nodeid_list =[]
         resource = ""
@@ -768,60 +858,50 @@ class SlabDriver(Driver):
         slice_name = slice_dict['name']
         try:
             slot = slice_dict['timeslot'] 
-            logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR slot %s" %(slot))
+            logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
+                                                    slot %s" %(slot))
         except KeyError:
             #Running on default parameters
             #XP immediate , 10 mins
-            slot = {'date':None,'start_time':None, 'timezone':None,'duration':None }#10 min 
+            slot = {    'date':None, 'start_time':None,
+                        'timezone':None, 'duration':None }#10 min 
         
         reqdict['workdir']= '/tmp'   
         reqdict['resource'] ="{network_address in ("   
-        #reqdict['property'] ="network_address in ("
+
         for node in added_nodes: 
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  node %s" %(node)
-
-            #Get the ID of the node : remove the root auth and put the site in a separate list
-            #s=node.split(".")
-            # NT: it's not clear for me if the nodenames will have the senslab prefix
-            # so lets take the last part only, for now.
-            #lastpart=s[-1]
-            #if s[0] == self.root_auth :
-            # Again here it's not clear if nodes will be prefixed with <site>_, lets split and tanke the last part for now.
+            logger.debug("OARrestapi \tLaunchExperimentOnOAR \
+                                                            node %s" %(node))
+
+            #Get the ID of the node : remove the root auth and put 
+            # the site in a separate list.
+            # NT: it's not clear for me if the nodenames will have the senslab 
+            #prefix so lets take the last part only, for now.
+
+            # Again here it's not clear if nodes will be prefixed with <site>_, 
+            #lets split and tanke the last part for now.
             #s=lastpart.split("_")
-            #nodeid=s[-1]
+
             nodeid = node
             reqdict['resource'] += "'"+ nodeid +"', "
             nodeid_list.append(nodeid)
 
-
-        reqdict['resource'] =  reqdict['resource'][0: len( reqdict['resource'])-2] +")}/nodes=" + str(len(nodeid_list))
-        if slot['duration']:
-            walltime = slot['duration'].split(":")
-            # Fixing the walltime by adding a few delays. First put the walltime in seconds
-            # oarAdditionalDelay = 20; additional delay for /bin/sleep command to
-            # take in account  prologue and epilogue scripts execution
-            # int walltimeAdditionalDelay = 120;  additional delay
-
-            desired_walltime =  int(walltime[0])*3600 + int(walltime[1]) * 60 + int(walltime[2])
-            total_walltime = desired_walltime + 140 #+2 min 20
-            sleep_walltime = desired_walltime + 20 #+20 sec
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR desired_walltime %s  total_walltime %s sleep_walltime %s  " %(desired_walltime,total_walltime,sleep_walltime)
-            #Put the walltime back in str form
-            #First get the hours
-            walltime[0] = str(total_walltime / 3600)
-            total_walltime = total_walltime - 3600 * int(walltime[0])
-            #Get the remaining minutes
-            walltime[1] = str(total_walltime / 60)
-            total_walltime =  total_walltime - 60 * int(walltime[1])
-            #Get the seconds
-            walltime[2] = str(total_walltime)
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  walltime %s " %(walltime)
-
-            reqdict['resource']+= ",walltime=" + str(walltime[0]) + ":" + str(walltime[1]) + ":" + str(walltime[2]) 
-            reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
-        else:
-            reqdict['resource']+= ",walltime=" + str(00) + ":" + str(12) + ":" + str(20) #+2 min 20
-            reqdict['script_path'] = "/bin/sleep 620" #+20 sec    
+        custom_length = len(reqdict['resource'])- 2
+        reqdict['resource'] = reqdict['resource'][0:custom_length] + \
+                                            ")}/nodes=" + str(len(nodeid_list))
+        
+        #if slot['duration']:
+        walltime, sleep_walltime = self.__process_walltime(duration = \
+                                                            slot['duration'])
+        #else: 
+            #walltime, sleep_walltime = self.__process_walltime(duration = None)
+            
+        reqdict['resource']+= ",walltime=" + str(walltime[0]) + \
+                            ":" + str(walltime[1]) + ":" + str(walltime[2])
+        reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
+       
+                
+                
         #In case of a scheduled experiment (not immediate)
         #To run an XP immediately, don't specify date and time in RSpec 
         #They will be set to None. 
@@ -830,9 +910,12 @@ class SlabDriver(Driver):
             if slot['timezone'] is '' or slot['timezone'] is None:
                 #assume it is server timezone
                 from_zone=tz.gettz(server_tz) 
-                print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  timezone not specified  server_tz %s from_zone  %s" %(server_tz,from_zone) 
+                logger.warning("SLABDRIVER \tLaunchExperimentOnOAR  timezone \
+                not specified  server_tz %s from_zone  %s" \
+                %(server_tz,from_zone)) 
             else:
-                #Get zone of the user from the reservation time given in the rspec
+                #Get zone of the user from the reservation time given 
+                #in the rspec
                 from_zone = tz.gettz(slot['timezone'])  
                    
             date = str(slot['date'])  + " " + str(slot['start_time'])
@@ -840,33 +923,19 @@ class SlabDriver(Driver):
             user_datetime = user_datetime.replace(tzinfo = from_zone)
             
             #Convert to server zone
-            #to_zone = tz.tzutc()
+
             to_zone = tz.gettz(server_tz)
             reservation_date = user_datetime.astimezone(to_zone)
             #Readable time accpeted by OAR
             reqdict['reservation']= reservation_date.strftime(self.time_format)
         
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR  reqdict['reservation'] %s " %(reqdict['reservation'])
+            logger.debug("SLABDRIVER \tLaunchExperimentOnOAR  reqdict['reservation'] %s " %(reqdict['reservation']))
             
         else:
-            # Immediate XP
-            # reservations are performed in the oar server timebase, so :
-            # 1- we get the server time(in UTC tz )/server timezone
-            # 2- convert the server UTC time in its timezone
-            # 3- add a custom delay to this time
-            # 4- convert this time to a readable form and it for the reservation request.
-            server_timestamp,server_tz = self.GetTimezone()
-            s_tz=tz.gettz(server_tz)
-            UTC_zone = tz.gettz("UTC")
-            #weird... datetime.fromtimestamp should work since we do from datetime import datetime
-            utc_server= datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone)
-            server_localtime=utc_server.astimezone(s_tz)
-    
-            print>>sys.stderr, "\r\n \r\n \t\tLaunchExperimentOnOAR server_timestamp %s server_tz %s slice_name %s added_nodes %s username %s reqdict %s " %(server_timestamp,server_tz,slice_name,added_nodes,slice_user, reqdict )
-            readable_time = server_localtime.strftime(self.time_format)
-
-            print >>sys.stderr,"  \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s  " %(readable_time ,server_timestamp)
-            reqdict['reservation'] = readable_time
+            # Immediate XP. Not need to add special parameters.
+            # normally not used in SFA
+       
+            pass
         
 
         reqdict['type'] = "deploy" 
@@ -878,14 +947,14 @@ class SlabDriver(Driver):
         logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict   %s \r\n site_list   %s"  %(reqdict,site_list) )  
        
         answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user)
-        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid   %s "  %(answer)
+        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid   %s "  %(answer))
         try:       
             jobid = answer['id']
         except KeyError:
-             print>>sys.stderr, "\r\n AddSliceTonode Impossible to create job  %s "  %( answer)
+             logger.log_exc("SLABDRIVER \tLaunchExperimentOnOAR Impossible to create job  %s "  %(answer))
              return
         
-        print>>sys.stderr, "\r\n \r\n LaunchExperimentOnOAR jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user)
+        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid    %s added_nodes  %s slice_user %s"  %(jobid,added_nodes,slice_user))
         self.db.update_job( slice_name, jobid ,added_nodes)
         
           
@@ -923,7 +992,22 @@ class SlabDriver(Driver):
     
  
 
+    
+    def GetLeases(self, lease_filter=None, return_fields_list=None):
+        reservation_list = self.GetReservedNodes()
+        #Find the slice associated with this user senslab ldap uid
+        for resa in reservation_list:
+            ldap_info = self.ldap.LdapSearch('(uid='+resa['user']+')')
+            user = dbsession.query(RegUser).filter_by(email = ldap_info['mail']).first()
+            slice_info = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = user.record_id).first()
+            #Put the slice_urn 
+            resa['slice_id'] = hrn_to_urn(slice_info.slice_hrn, 'slice')
+            resa['component_id_list'] = []
+            #Transform the hostnames into urns (component ids)
+            for hostname in resa['reserved_nodes_hostnames']:
+                resa['component_id_list'].append(hostname_to_urn(self.hrn,  self.root_auth, hostname))
+
+        return resa
             
     def augment_records_with_testbed_info (self, sfa_records):
         return self.fill_record_info (sfa_records)
index 261c58c..f23f96d 100644 (file)
@@ -49,6 +49,7 @@ class SlabSlices:
  
 
         # Get user information
+        #TODO
         alchemy_person = dbsession.query(RegRecord).filter_by(record_id = slice['record_id_user']).first()
 
         slivers = []
@@ -167,7 +168,8 @@ class SlabSlices:
             try:
                 slot = slice['timeslot']
                 self.driver.LaunchExperimentOnOAR(slice, added_nodes, username)
-            except KeyError:
+            except KeyError:  
+                logger.log_exc("SLABSLICES \tVERIFY_SLICE_NODES KeyError slice %s  " %(slice))
                 pass