Removed custom timeslot and duration to make a reservation on OAR.
authorSandrine Avakian <sandrine.avakian@inria.fr>
Wed, 1 Aug 2012 14:55:08 +0000 (16:55 +0200)
committerSandrine Avakian <sandrine.avakian@inria.fr>
Wed, 1 Aug 2012 14:55:08 +0000 (16:55 +0200)
Now using SFA format with leases to make reservation.
Refactored code: Shrinked LaunchExperimentOnOAR and created new functions
to create the request, process the walltime, configure the experiment
and launch senslab experiment using the java lib.
Launching an experiment asap still possible, although this is not supposed
to be used in SFA.

sfa/rspecs/versions/slabv1.py
sfa/senslab/slabdriver.py
sfa/senslab/slabslices.py

index f02bad3..bba4484 100644 (file)
@@ -97,18 +97,16 @@ class Slabv1(RSpecVersion):
     def get_slice_attributes(self, network=None):
         
         slice_attributes = []
-        slot = self.get_slice_timeslot()
+
         nodes_with_slivers = self.get_nodes_with_slivers()
-        slice_attributes.append({'timeslot':slot})
-        #slice_attributes.append({'name': 'timeslot', 'value' : slot})
-        print>>sys.stderr, "\r\n \r\n \r\n \t\t SLABV1.PY get_slice_attributes -----------------nodes_with_slivers %s "%(nodes_with_slivers)
+
         # TODO: default sliver attributes in the PG rspec?
         default_ns_prefix = self.namespaces['default']
         for node in nodes_with_slivers:
             sliver_attributes = self.get_sliver_attributes(node['component_id'],node, network)
             for sliver_attribute in sliver_attributes:
-                name=str(sliver_attribute[0])
-                text =str(sliver_attribute[1])
+                name = str(sliver_attribute[0])
+                text = str(sliver_attribute[1])
                 attribs = sliver_attribute[2]
                 # we currently only suppor the <initscript> and <flack> attributes
                 #if  'info' in name:
@@ -123,8 +121,7 @@ class Slabv1(RSpecVersion):
                     attribute = {'name': 'initscript', 'value': value, 'node_id': node}
                     slice_attributes.append(attribute)
           
-                    
-        print>>sys.stderr, "\r\n \r\n \r\n \t\t SLABV1.PY get_slice_attributes ----------------- slice_attributes %s "%(slice_attributes)
+
         return slice_attributes
 
     def attributes_list(self, elem):
index dc2cc5c..8e49b04 100644 (file)
@@ -135,7 +135,6 @@ class SlabDriver(Driver):
         
     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
                                                              users, options):
-        logger.debug("SLABDRIVER.PY \tcreate_sliver ")
         aggregate = SlabAggregate(self)
         
         slices = SlabSlices(self)
@@ -160,19 +159,13 @@ class SlabDriver(Driver):
         sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
                                                     sfa_peer, options=options)
         requested_attributes = rspec.version.get_slice_attributes()
-        
-        if requested_attributes:
-            for attrib_dict in requested_attributes:
-                if 'timeslot' in attrib_dict and attrib_dict['timeslot'] \
-                                                                is not None:
-                    sfa_slice.update({'timeslot':attrib_dict['timeslot']})
+
         logger.debug("SLABDRIVER.PY create_sliver slice %s " %(sfa_slice))
         
         # ensure person records exists
         persons = slices.verify_persons(slice_hrn, sfa_slice, users, peer, \
                                                     sfa_peer, options=options)
         
-        # ensure slice attributes exists?
 
         
         # add/remove slice from nodes 
@@ -186,22 +179,48 @@ class SlabDriver(Driver):
         nodes = slices.verify_slice_nodes(sfa_slice, requested_slivers, peer) 
         
         # add/remove leases
-        requested_leases = []
+        requested_lease_list = []
         kept_leases = []
         for lease in rspec.version.get_leases():
-            requested_lease = {}
+            single_requested_lease = {}
+            logger.debug("SLABDRIVER.PY \tcreate_sliver lease %s " %(lease))
             if not lease.get('lease_id'):
-                requested_lease['hostname'] = \
+                single_requested_lease['hostname'] = \
                             slab_xrn_to_hostname(lease.get('component_id').strip())
-                requested_lease['start_time'] = lease.get('start_time')
-                requested_lease['duration'] = lease.get('duration')
+                single_requested_lease['start_time'] = lease.get('start_time')
+                single_requested_lease['duration'] = lease.get('duration')
             else:
                 kept_leases.append(int(lease['lease_id']))
-            if requested_lease.get('hostname'):
-                requested_leases.append(requested_lease)
+            if single_requested_lease.get('hostname'):
+                requested_lease_list.append(single_requested_lease)
+                
+        #dCreate dict of leases by start_time, regrouping nodes reserved at the same
+        #time, for the same amount of time = one job on OAR
+        requested_job_dict = {}
+        for lease in requested_lease_list:
+            
+            #In case it is an asap experiment start_time is empty
+            if lease['start_time'] == '':
+                lease['start_time'] = '0' 
+                
+            if lease['start_time'] not in requested_job_dict:
+                if isinstance(lease['hostname'], str):
+                    lease['hostname'] =  [lease['hostname']]
+                    
+                requested_job_dict[lease['start_time']] = lease
+                
+            else :
+                job_lease = requested_job_dict[lease['start_time']]
+                if lease['duration'] == job_lease['duration'] :
+                    job_lease['hostname'].append(lease['hostname'])
+                    
+          
                 
+                        
+        logger.debug("SLABDRIVER.PY \tcreate_sliver  requested_job_dict %s " %(requested_job_dict))    
+                   
         leases = slices.verify_slice_leases(sfa_slice, \
-                                    requested_leases, kept_leases, peer)
+                                    requested_job_dict, kept_leases, peer)
         
         return aggregate.get_rspec(slice_xrn=slice_urn, version=rspec.version)
         
@@ -920,151 +939,114 @@ class SlabDriver(Driver):
             
         else:
             return None
-                               
-    def LaunchExperimentOnOAR(self, slice_dict, added_nodes, slice_user=None):
-        """ Creates the structure needed for a correct POST on OAR.
-        Makes the timestamp transformation into the appropriate format.
-        Sends the POST request to create the job with the resources in 
-        added_nodes.
         
-        """
-        site_list = []
-        nodeid_list = []
-        resource = ""
-        reqdict = {}
-        slice_name = slice_dict['name']
-        try:
-            slot = slice_dict['timeslot'] 
-            logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR \
-                                                    slot %s" %(slot))
-        except KeyError:
-            #Running on default parameters
-            #XP immediate , 10 mins
-            slot = {    'date':None, 'start_time':None,
-                        'timezone':None, 'duration':None }#10 min 
-        
-        reqdict['workdir'] = '/tmp'   
-        reqdict['resource'] = "{network_address in ("   
-
-        for node in added_nodes: 
-            logger.debug("OARrestapi \tLaunchExperimentOnOAR \
-                                                            node %s" %(node))
-
-            #Get the ID of the node : remove the root auth and put 
-            # the site in a separate list.
-            # NT: it's not clear for me if the nodenames will have the senslab 
-            #prefix so lets take the last part only, for now.
-
-            # Again here it's not clear if nodes will be prefixed with <site>_, 
-            #lets split and tanke the last part for now.
-            #s=lastpart.split("_")
-
-            nodeid = node
-            reqdict['resource'] += "'" + nodeid + "', "
-            nodeid_list.append(nodeid)
-
-        custom_length = len(reqdict['resource'])- 2
-        reqdict['resource'] = reqdict['resource'][0:custom_length] + \
-                                            ")}/nodes=" + str(len(nodeid_list))
-                                            
-        def __process_walltime(duration=None):
-            """ Calculates the walltime in seconds from the duration in H:M:S
-                specified in the RSpec.
-                
-            """
-            if duration:
-                walltime = duration.split(":")
-                # Fixing the walltime by adding a few delays. First put the walltime 
-                # in seconds oarAdditionalDelay = 20; additional delay for 
-                # /bin/sleep command to
-                # take in account  prologue and epilogue scripts execution
-                # int walltimeAdditionalDelay = 120;  additional delay
-        
-                desired_walltime = int(walltime[0])*3600 + int(walltime[1]) * 60 +\
-                                                                    int(walltime[2])
-                total_walltime = desired_walltime + 140 #+2 min 20
-                sleep_walltime = desired_walltime + 20 #+20 sec
-                logger.debug("SLABDRIVER \t__process_walltime desired_walltime %s\
-                                        total_walltime %s sleep_walltime %s  "\
-                                            %(desired_walltime, total_walltime, \
-                                                            sleep_walltime))
-                #Put the walltime back in str form
-                #First get the hours
-                walltime[0] = str(total_walltime / 3600)
-                total_walltime = total_walltime - 3600 * int(walltime[0])
-                #Get the remaining minutes
-                walltime[1] = str(total_walltime / 60)
-                total_walltime = total_walltime - 60 * int(walltime[1])
-                #Get the seconds
-                walltime[2] = str(total_walltime)
-                logger.debug("SLABDRIVER \t__process_walltime walltime %s "\
-                                                %(walltime))
-            else:
-                #automatically set 10min  +2 min 20
-                walltime[0] = '0'
-                walltime[1] = '12' 
-                walltime[2] = '20'
-                sleep_walltime = '620'
-                
-            return walltime, sleep_walltime
-                
-        #if slot['duration']:
-        walltime, sleep_walltime = __process_walltime(duration = \
-                                                            slot['duration'])
-        #else: 
-            #walltime, sleep_walltime = self.__process_walltime(duration = None)
+   
+
+             
+    def LaunchExperimentOnOAR(self, added_nodes, slice_name, \
+                        lease_start_time, lease_duration, slice_user=None):
+        lease_dict = {}
+        lease_dict['lease_start_time'] = lease_start_time
+        lease_dict['lease_duration'] = lease_duration
+        lease_dict['added_nodes'] = added_nodes
+        lease_dict['slice_name'] = slice_name
+        lease_dict['slice_user'] = slice_user
+        lease_dict['grain'] = self.GetLeaseGranularity()
+        lease_dict['time_format'] = self.time_format
+        
+        def __create_job_structure_request_for_OAR(lease_dict):
+            """ Creates the structure needed for a correct POST on OAR.
+            Makes the timestamp transformation into the appropriate format.
+            Sends the POST request to create the job with the resources in 
+            added_nodes.
             
-        reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
-                            ":" + str(walltime[1]) + ":" + str(walltime[2])
-        reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
-       
-                
-                
-        #In case of a scheduled experiment (not immediate)
-        #To run an XP immediately, don't specify date and time in RSpec 
-        #They will be set to None. 
-        server_timestamp, server_tz = self.GetTimezone()
-        if slot['date'] and slot['start_time']:
-            if slot['timezone'] is '' or slot['timezone'] is None:
-                #assume it is server timezone
-                from_zone = tz.gettz(server_tz) 
-                logger.warning("SLABDRIVER \tLaunchExperimentOnOAR  timezone \
-                not specified  server_tz %s from_zone  %s" \
-                %(server_tz, from_zone)) 
-            else:
-                #Get zone of the user from the reservation time given 
-                #in the rspec
-                from_zone = tz.gettz(slot['timezone'])  
-                   
-            date = str(slot['date']) + " " + str(slot['start_time'])
-            user_datetime = datetime.strptime(date, self.time_format)
-            user_datetime = user_datetime.replace(tzinfo = from_zone)
+            """
+    
+            site_list = []
+            nodeid_list = []
+            resource = ""
+            reqdict = {}
+    
             
-            #Convert to server zone
+            reqdict['workdir'] = '/tmp'   
+            reqdict['resource'] = "{network_address in ("   
+    
+            for node in lease_dict['added_nodes']: 
+                logger.debug("\r\n \r\n OARrestapi \t __create_job_structure_request_for_OAR \
+                                                                node %s" %(node))
+    
+                # Get the ID of the node 
+                nodeid = node
+                reqdict['resource'] += "'" + nodeid + "', "
+                nodeid_list.append(nodeid)
+    
+            custom_length = len(reqdict['resource'])- 2
+            reqdict['resource'] = reqdict['resource'][0:custom_length] + \
+                                                ")}/nodes=" + str(len(nodeid_list))
+    
+            def __process_walltime(duration):
+                """ Calculates the walltime in seconds from the duration in H:M:S
+                    specified in the RSpec.
+                    
+                """
+                if duration:
+                    # Fixing the walltime by adding a few delays. 
+                    # First put the walltime in seconds oarAdditionalDelay = 20;
+                    #  additional delay for /bin/sleep command to
+                    # take in account  prologue and epilogue scripts execution
+                    # int walltimeAdditionalDelay = 120;  additional delay
+                    desired_walltime = duration 
+                    total_walltime = desired_walltime + 140#+2 min 20
+                    sleep_walltime = desired_walltime + 20 #+20 sec
+                    walltime = []
+                    #Put the walltime back in str form
+                    #First get the hours
+                    walltime.append(str(total_walltime / 3600))
+                    total_walltime = total_walltime - 3600 * int(walltime[0])
+                    #Get the remaining minutes
+                    walltime.append(str(total_walltime / 60))
+                    total_walltime = total_walltime - 60 * int(walltime[1])
+                    #Get the seconds
+                    walltime.append(str(total_walltime))
+    
+                else:
+                    logger.log_exc(" __process_walltime duration null")
+                    
+                return walltime, sleep_walltime
+                    
 
-            to_zone = tz.gettz(server_tz)
-            reservation_date = user_datetime.astimezone(to_zone)
-            #Readable time accpeted by OAR
-            reqdict['reservation'] = reservation_date.strftime(self.time_format)
-        
-            logger.debug("SLABDRIVER \tLaunchExperimentOnOAR \
-                        reqdict['reservation'] %s " %(reqdict['reservation']))
-            
-        else:
-            # Immediate XP. Not need to add special parameters.
-            # normally not used in SFA
-       
-            pass
+            walltime, sleep_walltime = \
+                        __process_walltime(int(lease_dict['lease_duration'])*lease_dict['grain'])
+    
+    
+            reqdict['resource'] += ",walltime=" + str(walltime[0]) + \
+                                ":" + str(walltime[1]) + ":" + str(walltime[2])
+            reqdict['script_path'] = "/bin/sleep " + str(sleep_walltime)
+    
+            #In case of a scheduled experiment (not immediate)
+            #To run an XP immediately, don't specify date and time in RSpec 
+            #They will be set to None.
+            if lease_dict['lease_start_time'] is not '0':
+                #Readable time accepted by OAR
+                start_time = datetime.fromtimestamp(int(lease_dict['lease_start_time'])).\
+                                                        strftime(lease_dict['time_format'])
+                reqdict['reservation'] = start_time
+            #If there is not start time, Immediate XP. No need to add special 
+            # OAR parameters
+    
+    
+            reqdict['type'] = "deploy" 
+            reqdict['directory'] = ""
+            reqdict['name'] = "SFA_" + lease_dict['slice_user']
+    
+            return reqdict
         
-
-        reqdict['type'] = "deploy" 
-        reqdict['directory'] = ""
-        reqdict['name'] = "TestSandrine"
-       
-         
-        # first step : start the OAR job and update the job 
+                                   
+        #Create the request for OAR
+        reqdict = __create_job_structure_request_for_OAR(lease_dict)
+         # first step : start the OAR job and update the job 
         logger.debug("SLABDRIVER.PY \tLaunchExperimentOnOAR reqdict %s\
-                             \r\n site_list   %s"  %(reqdict, site_list))  
+                             \r\n "  %(reqdict))  
        
         answer = self.oar.POSTRequestToOARRestAPI('POST_job', \
                                                             reqdict, slice_user)
@@ -1076,38 +1058,63 @@ class SlabDriver(Driver):
                                 Impossible to create job  %s "  %(answer))
             return
         
-        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
-                added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
-        self.db.update_job( slice_name, jobid, added_nodes)
         
-          
-        # second step : configure the experiment
-        # we need to store the nodes in a yaml (well...) file like this :
-        # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
-        job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
-        job_file.write('[')
-        job_file.write(str(added_nodes[0].strip('node')))
-        for node in added_nodes[1:len(added_nodes)] :
-            job_file.write(', '+ node.strip('node'))
-        job_file.write(']')
-        job_file.close()
-        
-        # third step : call the senslab-experiment wrapper
-        #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar 
-        # "+str(jobid)+" "+slice_user
-        javacmdline = "/usr/bin/java"
-        jarname = \
-            "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
-        #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
-                                                    #str(jobid), slice_user])
-        output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
-                            slice_user],stdout=subprocess.PIPE).communicate()[0]
-
-        logger.debug("SLABDRIVER \tLaunchExperimentOnOAR wrapper returns%s " \
-                                                                 %(output))
-        return 
-                 
+        def __configure_experiment(jobid, added_nodes):
+            # second step : configure the experiment
+            # we need to store the nodes in a yaml (well...) file like this :
+            # [1,56,23,14,45,75] with name /tmp/sfa<jobid>.json
+            job_file = open('/tmp/sfa/'+ str(jobid) + '.json', 'w')
+            job_file.write('[')
+            job_file.write(str(added_nodes[0].strip('node')))
+            for node in added_nodes[1:len(added_nodes)] :
+                job_file.write(', '+ node.strip('node'))
+            job_file.write(']')
+            job_file.close()
+            return 
+        
+        def __launch_senslab_experiment(jobid):   
+            # third step : call the senslab-experiment wrapper
+            #command= "java -jar target/sfa-1.0-jar-with-dependencies.jar 
+            # "+str(jobid)+" "+slice_user
+            javacmdline = "/usr/bin/java"
+            jarname = \
+                "/opt/senslabexperimentwrapper/sfa-1.0-jar-with-dependencies.jar"
+            #ret=subprocess.check_output(["/usr/bin/java", "-jar", ", \
+                                                        #str(jobid), slice_user])
+            output = subprocess.Popen([javacmdline, "-jar", jarname, str(jobid), \
+                                slice_user],stdout=subprocess.PIPE).communicate()[0]
+    
+            logger.debug("SLABDRIVER \t __configure_experiment wrapper returns%s " \
+                                                                    %(output))
+            return 
+        
+        
+        
+        if jobid :
+            logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
+                    added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
+            self.db.update_job( slice_name, jobid, added_nodes)
+        
+            __configure_experiment(jobid, added_nodes)
+            __launch_senslab_experiment(jobid) 
+            
+        return
+        
+    def AddLeases(self, hostname_list, slice_record, lease_start_time, lease_duration):
+        logger.debug("SLABDRIVER \r\n \r\n \t AddLeases hostname_list %s  \
+                slice_record %s lease_start_time %s lease_duration %s  "\
+                 %( hostname_list, slice_record , lease_start_time, \
+                 lease_duration))
+
+        tmp = slice_record['PI'][0].split(".")
+        username = tmp[(len(tmp)-1)]
+        self.LaunchExperimentOnOAR(hostname_list, slice_record['name'], lease_start_time, lease_duration, username)
+        start_time = datetime.fromtimestamp(int(lease_start_time)).strftime(self.time_format)
+        logger.debug("SLABDRIVER \t AddLeases hostname_list start_time %s " %(start_time))
+        
+        return
+    
+    
     #Delete the jobs and updates the job id in the senslab table
     #to set it to -1  
     #Does not clear the node list 
@@ -1525,4 +1532,4 @@ class SlabDriver(Driver):
             self.DeleteJobs(job_id, slice_hrn)
         
         logger.debug("SLABDRIVER DeleteLeases leases_id_list %s slice_hrn %s \r\n " %(leases_id_list, slice_hrn))
-        return 
\ No newline at end of file
+        return 
index 9428ffb..ec6d072 100644 (file)
@@ -139,13 +139,15 @@ class SlabSlices:
         return sfa_peer
         
         
-    def verify_slice_leases(self, sfa_slice, requested_leases, kept_leases, \
+    def verify_slice_leases(self, sfa_slice, requested_jobs_dict, kept_leases, \
         peer):
-        logger.debug("SLABSLICES \tverify_slice_leases requested_leases %s kept_leases %s sfa_slice%s peer%s" %(requested_leases, kept_leases,sfa_slice,peer) )
+
+       
+        #First get the list of current leases from OAR  
         leases = self.driver.GetLeases({'name':sfa_slice['name']}, ['lease_id'])
-        grain = self.driver.GetLeaseGranularity()
         if leases : 
             current_leases = [lease['lease_id'] for lease in leases]
+            #Deleted leases are the ones with lease id not declared in the Rspec
             deleted_leases = list(set(current_leases).difference(kept_leases))
     
             try:
@@ -154,15 +156,21 @@ class SlabSlices:
                     #TODO :UnBindObjectFromPeer Quick and dirty auth='senslab2 SA 27/07/12
                     self.driver.UnBindObjectFromPeer('senslab2', 'slice', \
                                     sfa_slice['record_id_slice'], peer.hrn)
-                deleted = self.driver.DeleteLeases(deleted_leases, sfa_slice['name'])
-                for lease in requested_leases:
-                    added = self.driver.AddLeases(lease['hostname'], \
-                            sfa_slice['name'], int(lease['start_time']), \
-                            int(lease['duration']))
+                
+                deleted = self.driver.DeleteLeases(deleted_leases, \
+                                        sfa_slice['name'])
+               
             #TODO : catch other exception?
             except KeyError: 
                 logger.log_exc('Failed to add/remove slice leases')
-
+                
+        #Add new leases        
+        for start_time in requested_jobs_dict:
+            job = requested_jobs_dict[start_time]
+            added = self.driver.AddLeases(job['hostname'], \
+                        sfa_slice, int(job['start_time']), \
+                        int(job['duration']))
+                        
         return leases
 
     def verify_slice_nodes(self, sfa_slice, requested_slivers, peer):