From f26026cdb822f1ef80ee34b0a02401774f9faae0 Mon Sep 17 00:00:00 2001 From: Sandrine Avakian Date: Thu, 15 Mar 2012 17:17:27 +0100 Subject: [PATCH] Fixed oarjob creation and update. --- sfa/importer/slabimporter.py | 8 ++-- sfa/senslab/slabdriver.py | 60 +++++++++++++++++------------- sfa/senslab/slabpostgres.py | 72 +++++++++++++++++++++++++++++++----- sfa/senslab/slabslices.py | 21 +++++------ 4 files changed, 111 insertions(+), 50 deletions(-) diff --git a/sfa/importer/slabimporter.py b/sfa/importer/slabimporter.py index 166a7c21..cc6554ed 100644 --- a/sfa/importer/slabimporter.py +++ b/sfa/importer/slabimporter.py @@ -9,7 +9,7 @@ from sfa.util.plxrn import PlXrn, slicename_to_hrn, email_to_hrn, hrn_to_pl_slic from sfa.senslab.LDAPapi import LDAPapi from sfa.senslab.slabdriver import SlabDriver -from sfa.senslab.slabpostgres import SlabSliceDB, slab_dbsession +from sfa.senslab.slabpostgres import SliceSenslab, slab_dbsession from sfa.trust.certificate import Keypair,convert_public_key from sfa.trust.gid import create_uuid @@ -66,6 +66,8 @@ class SlabImporter: if not slabdriver.db.exists('slice_senslab'): slabdriver.db.createtable('slice_senslab') + + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES CREATETABLE YAAAAAAAAAAY" ######## retrieve all existing SFA objects all_records = dbsession.query(RegRecord).all() @@ -267,7 +269,7 @@ class SlabImporter: #Get it sl_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(slice_hrn)).all() - slab_slice = SlabSliceDB( slice_hrn = slice_hrn, record_id_slice=sl_rec[0].record_id, record_id_user= user_record.record_id) + slab_slice = SliceSenslab( slice_hrn = slice_hrn, record_id_slice=sl_rec[0].record_id, record_id_user= user_record.record_id) print>>sys.stderr, "\r\n \r\n SLAB IMPORTER SLICE IMPORT NOTslice_record %s \r\n slab_slice %s" %(sl_rec,slab_slice) slab_dbsession.add(slab_slice) slab_dbsession.commit() @@ -311,4 +313,4 @@ class SlabImporter: dbsession.commit() - \ No newline at end of file + diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index 337c5de8..5864b79a 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -4,9 +4,7 @@ import datetime from time import gmtime, strftime from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist -#from sfa.util.sfatime import datetime_to_string from sfa.util.sfalogging import logger -#from sfa.storage.table import SfaTable from sfa.util.defaultdict import defaultdict from sfa.storage.record import Record @@ -32,9 +30,9 @@ from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicenam ## thierry : please avoid wildcard imports :) from sfa.senslab.OARrestapi import OARrestapi from sfa.senslab.LDAPapi import LDAPapi -#from sfa.senslab.SenslabImportUsers import SenslabImportUsers + from sfa.senslab.parsing import parse_filter -from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SlabSliceDB +from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab from sfa.senslab.slabaggregate import SlabAggregate from sfa.senslab.slabslices import SlabSlices @@ -403,17 +401,21 @@ class SlabDriver(Driver): existing_records = {} existing_hrns_by_types= {} - all_records = dbsession.query(RegRecord).all + print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields) + all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all() for record in all_records: existing_records[record.hrn] = record if record.type not in existing_hrns_by_types: existing_hrns_by_types[record.type] = [record.hrn] + print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types) else: + + print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN type %s hrn %s " %( record.type,record.hrn ) existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))}) print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types %s " %( existing_hrns_by_types) - return_records = [] - #records_list = table.findObjects({'type':'authority+sa'}) + records_list= [] + try: for hrn in existing_hrns_by_types['authority+sa']: records_list.append(existing_records[hrn]) @@ -549,20 +551,22 @@ class SlabDriver(Driver): def GetSlices(self,slice_filter = None, return_fields=None): #sliceslist = self.db.find('slice_senslab',columns = ['oar_job_id', 'slice_hrn', 'record_id_slice','record_id_user'], record_filter=slice_filter) - sliceslist = slab_dbsession.query(SlabSliceDB).all() + #sliceslist = slab_dbsession.query(SliceSenslab).all() + return_slice_list = self.db.find('slice',slice_filter) #sliceslist = slices_records.order_by("record_id_slice").all() - print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(sliceslist,slice_filter) - - return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields) + print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(return_slice_list,slice_filter) + + if return_fields: + return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields) if return_slice_list: for sl in return_slice_list: #login = sl['slice_hrn'].split(".")[1].split("_")[0] - login = sl.slice_hrn.split(".")[1].split("_")[0] + login = sl['slice_hrn'].split(".")[1].split("_")[0] print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices sl %s " %(sl) - if sl.oar_job_id is not -1: - rslt = self.GetJobs( sl.oar_job_id,resources=False, username = login ) + if sl['oar_job_id'] is not -1: + rslt = self.GetJobs( sl['oar_job_id'],resources=False, username = login ) print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices GetJobs %s " %(rslt) if rslt : sl.update(rslt) @@ -707,8 +711,10 @@ class SlabDriver(Driver): #OAR = OARrestapi() answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user) print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid %s " %(answer) - self.db.update('slice',['oar_job_id'], [answer['id']], 'slice_hrn', slice_name) - + #self.db.update('slice',['oar_job_id'], [answer['id']], 'slice_hrn', slice_name) + + + self.db.update_job( answer['id'], slice_name) jobid=answer['id'] print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user) # second step : configure the experiment @@ -783,7 +789,7 @@ class SlabDriver(Driver): # get the sfa records #table = SfaTable() existing_records = {} - all_records = dbsession.query(RegRecord).all + all_records = dbsession.query(RegRecord).all() for record in all_records: existing_records[(record.type,record.pointer)] = record @@ -875,7 +881,7 @@ class SlabDriver(Driver): Given a SFA record, fill in the senslab specific and SFA specific fields in the record. """ - print >>sys.stderr, "\r\n \t\t BEFORE fill_record_info %s" %(records) + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s" %(records) if not isinstance(records, list): records = [records] @@ -884,26 +890,28 @@ class SlabDriver(Driver): for record in parkour: if str(record['type']) == 'slice': - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info record %s" %(record) + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t \t record %s" %(record) #sfatable = SfaTable() existing_records_by_id = {} - all_records = dbsession.query(RegRecord).all + all_records = dbsession.query(RegRecord).all() for rec in all_records: existing_records_by_id[rec.record_id] = rec - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info existing_records_by_id %s" %(existing_records_by_id) + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']]) - recslice = self.db.find('slice',str(record['hrn'])) + recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])}) + + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice) if isinstance(recslice,list) and len(recslice) == 1: recslice = recslice[0] #recuser = sfatable.find( recslice['record_id_user'], ['hrn']) - recuser = existing_records_by_id[recslice['record_id_user']]['hrn'] - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info %s" %(recuser) + recuser = existing_records_by_id[recslice['record_id_user']] + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser) if isinstance(recuser,list) and len(recuser) == 1: recuser = recuser[0] - record.update({'PI':[recuser['hrn']], - 'researcher': [recuser['hrn']], + record.update({'PI':[recuser.hrn], + 'researcher': [recuser.hrn], 'name':record['hrn'], 'oar_job_id':recslice['oar_job_id'], 'node_ids': [], diff --git a/sfa/senslab/slabpostgres.py b/sfa/senslab/slabpostgres.py index fc561cd8..6176a033 100644 --- a/sfa/senslab/slabpostgres.py +++ b/sfa/senslab/slabpostgres.py @@ -1,6 +1,6 @@ import sys -from sqlalchemy import create_engine +from sqlalchemy import create_engine, and_ from sqlalchemy.orm import sessionmaker from sfa.util.config import Config @@ -33,7 +33,7 @@ SlabBase = declarative_base() -class SlabSliceDB (SlabBase): +class SliceSenslab (SlabBase): __tablename__ = 'slice_senslab' record_id_user = Column(Integer, primary_key=True) oar_job_id = Column( Integer,default = -1) @@ -59,7 +59,20 @@ class SlabSliceDB (SlabBase): return result - +#class PeerSenslab(SlabBase): + #__tablename__ = 'peer_senslab' + #peername = Column(String, nullable = False) + #peerid = Column( Integer,primary_key=True) + + #def __init__ (self,peername = None ): + #if peername: + #self.peername = peername + + + #def __repr__(self): + #result=">sys.stderr, " \r\n \r\n \t SLABPOSTGRES update_job slice_rec %s"%(slice_rec) + slice_rec.oar_job_id = job_id + slab_dbsession.commit() def find (self, name = None, filter_dict = None): - if filter_dict: - filter_statement = "and_(SlabSliceDB." + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_dict %s"%(filter_dict) + + #Filter_by can not handle more than one argument, hence these functions + def filter_id_user(query, user_id): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_id_user" + return query.filter_by(record_id_user = user_id) + + def filter_job(query, job): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job " + return query.filter_by(oar_job_id = job) + + def filer_id_slice (query, id_slice): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filer_id_slice" + return query.filter_by(record_id_slice = id_slice) + + def filter_slice_hrn(query, hrn): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_slice_hrn" + return query.filter_by(slice_hrn = hrn) + + + extended_filter = {'record_id_user': filter_id_user, + 'oar_job_id':filter_job, + 'record_id_slice': filer_id_slice, + 'slice_hrn': filter_slice_hrn} + + Q = slab_dbsession.query(SliceSenslab) + + if filter_dict is not None: for k in filter_dict: - filter_statement += str(k)+ "==" + str(filter_dict[l]) - filter_statement +=')' - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_statement %s"%(filter_statement) - slab_dbsession.query(SlabSliceDB).filter(filter_statement) + try: + newQ= extended_filter[k](Q, filter_dict[k]) + Q = newQ + except KeyError: + print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!" + print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.all()) + reclist = [] + for rec in Q.all(): + reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))) + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find reclist %s" %(reclist) + return reclist @@ -158,4 +210,4 @@ from sfa.util.config import Config slab_alchemy= SlabDB(Config()) slab_engine=slab_alchemy.slab_engine -slab_dbsession=slab_alchemy.session() \ No newline at end of file +slab_dbsession=slab_alchemy.session() diff --git a/sfa/senslab/slabslices.py b/sfa/senslab/slabslices.py index ccc97b33..ed35c7da 100644 --- a/sfa/senslab/slabslices.py +++ b/sfa/senslab/slabslices.py @@ -135,23 +135,21 @@ class SlabSlices: def get_peer(self, xrn): hrn, type = urn_to_hrn(xrn) - # Becaues of myplc federation, we first need to determine if this - # slice belongs to out local plc or a myplc peer. We will assume it - # is a local site, unless we find out otherwise + #Does this slice belong to a local site or a peer senslab site? peer = None - print>>sys.stderr, " \r\n \r\n \t slices.py get_peer slice_authority " + # get this slice's authority (site) slice_authority = get_authority(hrn) - + # get this site's authority (sfa root authority or sub authority) site_authority = get_authority(slice_authority).lower() - + print>>sys.stderr, " \r\n \r\n \t slices.py get_peer slice_authority %s site_authority %s " %(slice_authority, site_authority) # check if we are already peered with this site_authority, if so - peers = self.driver.GetPeers({'hrn':site_authority}) - print>>sys.stderr, " \r\n \r\n \t slices.py get_peer slice_authority %s site_authority %s" %(slice_authority,site_authority) + peers = self.driver.GetPeers({}) + print>>sys.stderr, " \r\n \r\n \t slices.py get_peer peers %s " %(peers) for peer_record in peers: - names = [name.lower() for name in peer_record.values() if isinstance(name, StringTypes)] - if site_authority in names: + + if site_authority == peer_record.hrn: peer = peer_record print>>sys.stderr, " \r\n \r\n \t slices.py get_peer peer %s " %(peer) return peer @@ -296,7 +294,8 @@ class SlabSlices: #parts = hrn_to_pl_slicename(slice_hrn).split("_") login_base = slice_hrn.split(".")[0] slicename = slice_hrn - slices = self.driver.GetSlices([slicename]) + slices = self.driver.GetSlices(slice_filter={'slice_hrn': slicename}) + #slices = self.driver.GetSlices([slicename]) print>>sys.stderr, " \r\n \r\rn Slices.py verify_slice slicename %s slices %s slice_record %s"%(slicename ,slices, slice_record) if not slices: slice = {'name': slicename, -- 2.43.0