Add new table JobSenslab so that one slice can
[sfa.git] / sfa / senslab / slabdriver.py
index 444f158..e7c95d7 100644 (file)
@@ -28,7 +28,7 @@ from sfa.util.xrn import hrn_to_urn, urn_to_sliver_id, get_leaf
 from sfa.senslab.OARrestapi import  OARrestapi
 from sfa.senslab.LDAPapi import LDAPapi
 
-from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab
+from sfa.senslab.slabpostgres import SlabDB, slab_dbsession, SliceSenslab, JobSenslab
 from sfa.senslab.slabaggregate import SlabAggregate, slab_xrn_to_hostname, slab_xrn_object
 from sfa.senslab.slabslices import SlabSlices
 
@@ -135,6 +135,84 @@ class SlabDriver(Driver):
             return result        
         
         
+    def synchronize_oar_and_slice_table(self, slice_hrn = None): 
+        #Get list of leases
+        oar_leases_list = self.GetReservedNodes()
+       
+        logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table \r\n \r\n : oar_leases_list %s\r\n" %( oar_leases_list))
+        #Get list of slices/leases . multiple entry per user depending on number of jobs
+        #At this point we don't have the slice_hrn so that's why
+        #we are calling Getslices, which holds a field with slice_hrn
+        
+        if slice_hrn :
+            sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
+            self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, sfa_slices_list)
+        else :
+            sfa_slices_list = self.GetSlices()
+                     
+            sfa_slices_dict_by_slice_hrn = {}
+            for sfa_slice in sfa_slices_list:
+                if sfa_slice['slice_hrn'] not in sfa_slices_dict_by_slice_hrn:
+                    sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']] = []
+                    sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice)
+                else :
+                    sfa_slices_dict_by_slice_hrn[sfa_slice['slice_hrn']].append(sfa_slice)
+            
+            for slice_hrn in sfa_slices_dict_by_slice_hrn:
+                list_slices_sfa = sfa_slices_dict_by_slice_hrn[slice_hrn] 
+                if slice_hrn =='senslab2.avakian_slice':
+                    logger.debug("SLABDRIVER \tsynchronize_oar_and_slice_table slice_hrn %s list_slices_sfa %s\r\n \r\n" %( slice_hrn,list_slices_sfa))
+                self.synchronize_oar_and_slice_table_for_slice_hrn(slice_hrn, oar_leases_list, list_slices_sfa)
+            
+        return 
+            
+         
+    def synchronize_oar_and_slice_table_for_slice_hrn(self,slice_hrn, oar_leases_list, sfa_slices_list):
+        
+        #Get list of slices/leases . multiple entry per user depending on number of jobs
+        #sfa_slices_list = self.GetSlices(slice_filter = slice_hrn, slice_filter_type = 'slice_hrn')
+        sfa_slices_dict = {}
+        oar_leases_dict = {}
+        login = slice_hrn.split(".")[1].split("_")[0]
+        
+        #Create dictionnaries based on the tuple user login/ job id
+        #for the leases list and the slices list
+
+        for sl in sfa_slices_list:
+            if sl['oar_job_id'] != [] :
+                for oar_jobid in sl['oar_job_id']:
+                    if (login, oar_jobid) not in sfa_slices_dict:
+                        sfa_slices_dict[(login,oar_jobid)] = sl
+        
+        for lease in oar_leases_list:
+            if (lease['user'], lease['lease_id']) not in oar_leases_dict:
+               oar_leases_dict[(lease['user'], lease['lease_id'])] = lease
+        
+        #Find missing entries in the sfa slices list dict by comparing
+        #the keys in both dictionnaries
+        #Add the missing entries in the slice sneslab table
+
+        for lease in oar_leases_dict :
+            logger.debug(" =============SLABDRIVER \t\t\  synchronize_oar_and_slice_table_for_slice_hrn oar_leases_list %s \r\n \t\t\t SFA_SLICES_DICT %s \r\n \r\n LOGIN %s \r\n " %( oar_leases_list,sfa_slices_dict,login))
+            if lease not in sfa_slices_dict and login == lease[0]:
+                
+                #if lease in GetReservedNodes not in GetSlices update the db
+                #First get the list of nodes hostnames for this job
+                oar_reserved_nodes_listdict = oar_leases_dict[lease]['reserved_nodes']
+                oar_reserved_nodes_list = []
+                for node_dict in oar_reserved_nodes_listdict:
+                    oar_reserved_nodes_list.append(node_dict['hostname'])
+                #And update the db with slice hrn, job id and node list 
+                self.db.add_job(slice_hrn, lease[1], oar_reserved_nodes_list)
+                
+        for lease in  sfa_slices_dict:
+            #Job is now terminated or in Error, either way ot is not going to run again
+            #Remove it from the db
+            if lease not in  oar_leases_dict:
+               self.db.delete_job( slice_hrn, lease[1])
+                  
+        return     
+             
     def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, \
                                                              users, options):
         aggregate = SlabAggregate(self)
@@ -155,7 +233,7 @@ class SlabDriver(Driver):
         logger.debug("SLABDRIVER.PY \tcreate_sliver \trspec.version %s " \
                                                             %(rspec.version))
         
-        
+        self.synchronize_oar_and_slice_table(slice_hrn)
         # ensure site record exists?
         # ensure slice record exists
         sfa_slice = slices.verify_slice(slice_hrn, slice_record, peer, \
@@ -557,6 +635,7 @@ class SlabDriver(Driver):
         reqdict['method'] = "delete"
         reqdict['strval'] = str(job_id)
        
+        self.db.delete_job(slice_hrn, job_id)
         answer = self.oar.POSTRequestToOARRestAPI('DELETE_jobs_id', \
                                                     reqdict,username)
         logger.debug("SLABDRIVER \tDeleteJobs jobid  %s \r\n answer %s username %s"  \
@@ -775,71 +854,96 @@ class SlabDriver(Driver):
         if slice_filter_type in authorized_filter_types_list:
             #Get list of slices based on the slice hrn
             if slice_filter_type == 'slice_hrn':
-                #There can be several jobs running for one slices 
                 login = slice_filter.split(".")[1].split("_")[0] 
                 
                 #DO NOT USE RegSlice - reg_researchers to get the hrn of the user
                 #otherwise will mess up the RegRecord in Resolve, don't know
                 #why - SA 08/08/2012
                 
-                slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).all()
+                #Only one entry for one user  = one slice in slice_senslab table
+                slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first()
                 
-            #Get list of slices base on user id                             
+            #Get slice based on user id                             
             if slice_filter_type == 'record_id_user':
-                slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).all()
+                slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first()
                 
-            if slicerec is []:
+            if slicerec is None:
                 return []
             
-            slicerec_dictlist = []
-            for record in slicerec:
-                slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict())
-                if login is None :
-                    login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0] 
-
-        
-              
+            #slicerec_dictlist = []
+            slicerec_dict = slicerec.dump_sqlalchemyobj_to_dict()
+            if login is None :
+                login = slicerec_dict['slice_hrn'].split(".")[1].split("_")[0] 
+
+            #for record in slicerec:
+                #slicerec_dictlist.append(record.dump_sqlalchemyobj_to_dict())
+                #if login is None :
+                    #login = slicerec_dictlist[0]['slice_hrn'].split(".")[1].split("_")[0] 
+
+            #One slice can have multiple jobs
+            sqljob_list = slab_dbsession.query(JobSenslab).filter_by( slice_hrn=slicerec_dict['slice_hrn']).all() 
+            job_list = []
+            for job in sqljob_list:
+                job_list.append(job.dump_sqlalchemyobj_to_dict())
+                
             logger.debug("\r\n SLABDRIVER \tGetSlices login %s \
                                             slice record %s" \
-                                            %(login, slicerec_dictlist))
-            for slicerec_dict in slicerec_dictlist :                           
-                if slicerec_dict['oar_job_id'] is not -1:
-                    #Check with OAR the status of the job if a job id is in 
-                    #the slice record 
-                    rslt = self.GetJobsResources(slicerec_dict['oar_job_id'], \
-                                                            username = login)
-                    logger.debug("SLABDRIVER.PY  \tGetSlices  rslt fromn  GetJobsResources %s"\
-                                                            %(rslt))
-                    if rslt :
-                        slicerec_dict.update(rslt)
-                        slicerec_dict.update({'hrn':\
-                                            str(slicerec_dict['slice_hrn'])})
-                    #If GetJobsResources is empty, this means the job is 
-                    #now in the 'Terminated' state
-                    #Update the slice record
-                    else :
-                        self.db.update_job(slice_filter, job_id = -1)
-                        slicerec_dict['oar_job_id'] = -1
-                        slicerec_dict.\
-                                update({'hrn':str(slicerec_dict['slice_hrn'])})
+                                            %(login, slicerec_dict))
             
+            #Several jobs for one slice   
+            slicerec_dict['oar_job_id'] = []
+            for job in job_list :                          
+                #if slicerec_dict['oar_job_id'] is not -1:
+                #Check with OAR the status of the job if a job id is in 
+                #the slice record 
+                
+                rslt = self.GetJobsResources(job['oar_job_id'], \
+                                                        username = login)
+                logger.debug("SLABDRIVER.PY  \tGetSlices  rslt fromn  GetJobsResources %s"\
+                                                        %(rslt))
+                if rslt :
+                    slicerec_dict['oar_job_id'].append(job['oar_job_id'])
+                    slicerec_dict.update(rslt)
+                    slicerec_dict.update({'hrn':\
+                                        str(slicerec_dict['slice_hrn'])})
+                #If GetJobsResources is empty, this means the job is 
+                #now in the 'Terminated' state
+                #Update the slice record
+                else :
+                    self.db.delete_job(slice_filter, job['oar_job_id'])
+                    slicerec_dict.\
+                            update({'hrn':str(slicerec_dict['slice_hrn'])})
+        
                 try:
-                    slicerec_dict['node_ids'] = slicerec_dict['node_list']
+                    slicerec_dict['node_ids'] = job['node_list']
                 except KeyError:
                     pass
-                
-                logger.debug("SLABDRIVER.PY  \tGetSlices  RETURN slicerec_dictlist  %s"\
-                                                            %(slicerec_dictlist))
-                              
-            return slicerec_dictlist
-                
+            
+            logger.debug("SLABDRIVER.PY  \tGetSlices  RETURN slicerec_dict  %s"\
+                                                        %(slicerec_dict))
+                            
+            return [slicerec_dict]
+            
                 
         else:
             slice_list = slab_dbsession.query(SliceSenslab).all()
+            sqljob_list = slab_dbsession.query(JobSenslab).all()
+            
+            job_list = []
+            for job in sqljob_list:
+                job_list.append(job.dump_sqlalchemyobj_to_dict())
+                
             return_slice_list = []
             for record in slice_list:
                 return_slice_list.append(record.dump_sqlalchemyobj_to_dict())
+                
+            for slicerec_dict in return_slice_list:
+                slicerec_dict['oar_job_id'] = []
+                for job in job_list:
+                    if slicerec_dict['slice_hrn'] in job:
+                        slicerec_dict['oar_job_id'].append(job['oar_job_id'])
+                
             logger.debug("SLABDRIVER.PY  \tGetSlices RETURN slices %s \
                         slice_filter %s " %(return_slice_list, slice_filter))
         
@@ -1111,7 +1215,7 @@ class SlabDriver(Driver):
         if jobid :
             logger.debug("SLABDRIVER \tLaunchExperimentOnOAR jobid %s \
                     added_nodes %s slice_user %s" %(jobid, added_nodes, slice_user))
-            self.db.update_job( slice_name, jobid, added_nodes)
+            self.db.add_job( slice_name, jobid, added_nodes)
         
             __configure_experiment(jobid, added_nodes)
             __launch_senslab_experiment(jobid) 
@@ -1140,7 +1244,7 @@ class SlabDriver(Driver):
          # Get user information
        
         self.DeleteJobs(slice_record['oar_job_id'], slice_record['hrn'])
-        self.db.update_job(slice_record['hrn'], job_id = -1)
+
         return   
     
  
@@ -1153,6 +1257,10 @@ class SlabDriver(Driver):
     
     def GetLeases(self, lease_filter_dict=None, return_fields_list=None):
         unfiltered_reservation_list = self.GetReservedNodes()
+        
+        ##Synchronize slice_table of sfa senslab db
+        #self.synchronize_oar_and_slice_table(unfiltered_reservation_list)
+        
         reservation_list = []
         #Find the slice associated with this user senslab ldap uid
         logger.debug(" SLABDRIVER.PY \tGetLeases ")
@@ -1174,6 +1282,7 @@ class SlabDriver(Driver):
                     slice_info = query_slice_info.first()
                 else:
                     slice_info = None
+                    
                 resa_user_dict[resa['user']] = {}
                 resa_user_dict[resa['user']]['ldap_info'] = user
                 resa_user_dict[resa['user']]['slice_info'] = slice_info
@@ -1315,8 +1424,8 @@ class SlabDriver(Driver):
         return
         
         #self.fill_record_slab_info(records)
-       
-       
+    
+    
         
 
     
@@ -1505,7 +1614,6 @@ class SlabDriver(Driver):
         
         """
         self.DeleteSliceFromNodes(slice_record)
-        self.db.update_job(slice_record['hrn'], job_id = -1)
         logger.warning("SLABDRIVER DeleteSlice %s "%(slice_record))
         return