import sys from sqlalchemy import create_engine, and_ from sqlalchemy.orm import sessionmaker from sfa.util.config import Config from sfa.util.sfalogging import logger from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Table, Column, MetaData, join, ForeignKey import sfa.storage.model as model from sfa.storage.model import RegSlice from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship, backref from sqlalchemy.dialects import postgresql from sqlalchemy import MetaData, Table from sqlalchemy.exc import NoSuchTableError from sqlalchemy import String from sfa.storage.alchemy import dbsession #Dict holding the columns names of the table as keys #and their type, used for creation of the table slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1', 'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'} #Dict with all the specific senslab tables tablenames_dict = {'slice_senslab': slice_table} ############################## SlabBase = declarative_base() class SliceSenslab (SlabBase): __tablename__ = 'slice_senslab' #record_id_user = Column(Integer, primary_key=True) slice_hrn = Column(String,primary_key=True) peer_authority = Column( String,nullable = True) record_id_slice = Column(Integer) record_id_user = Column(Integer) #oar_job_id = Column( Integer,default = -1) #node_list = Column(postgresql.ARRAY(String), nullable =True) def __init__ (self, slice_hrn =None, record_id_slice=None, record_id_user= None,peer_authority=None): if record_id_slice: self.record_id_slice = record_id_slice if slice_hrn: self.slice_hrn = slice_hrn if record_id_user: self.record_id_user= record_id_user if peer_authority: self.peer_authority = peer_authority def __repr__(self): result=">sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url) # the TCP fallback method tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\ 
(config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname) for url in [ unix_url, tcp_url ] : try: self.slab_engine = create_engine (url,echo_pool = l_echo_pool, echo = l_echo) self.check() self.url=url return except: pass self.slab_engine=None raise Exception,"Could not connect to database" def check (self): self.slab_engine.execute ("select 1").scalar() def session (self): if self.slab_session is None: Session=sessionmaker () self.slab_session=Session(bind=self.slab_engine) return self.slab_session #Close connection to database def close(self): if self.connection is not None: self.connection.close() self.connection = None def exists(self, tablename): """ Checks if the table specified as tablename exists. """ try: metadata = MetaData (bind=self.slab_engine) table=Table (tablename, metadata, autoload=True) return True except NoSuchTableError: print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename) return False def createtable(self, tablename ): """ Creates the specifed table. Uses the global dictionnary holding the tablenames and the table schema. 
""" print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine) SlabBase.metadata.create_all(slab_engine) return def add_job (self, hrn, job_id, nodes = None ): job_row = slab_dbsession.query(JobSenslab).filter_by(oar_job_id=job_id).first() if job_row is None: slice_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(hrn)).first() if slice_rec : user_record = slice_rec.reg_researchers slab_slice = JobSenslab(slice_hrn = hrn, oar_job_id = job_id, \ record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id) #slab_slice = SliceSenslab(slice_hrn = hrn, oar_job_id = job_id, \ #record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id) logger.debug("============SLABPOSTGRES \t add_job slab_slice %s" %(slab_slice)) slab_dbsession.add(slab_slice) slab_slice.node_list = nodes slab_dbsession.commit() else: return def delete_job (self, hrn, job_id): #slab_slice = slab_dbsession.query(JobSenslab).filter_by(slice_hrn = hrn).filter_by(oar_job_id =job_id).delete() #slab_dbsession.delete(slab_slice) slab_dbsession.commit() #Updates the job_id and the nodes list #The nodes list is never erased. 
def find(self, name=None, filter_dict=None):
    """
    Return the first slice_senslab row matching filter_dict, as a dict.

    NOTE(review): this function takes `self` and sits next to other
    `self`-taking functions, so it presumably belongs to the SlabDB
    class, whose header is not visible in this part of the file.
    Re-indent it under that class when the file layout is restored.

    :param name: unused; kept so existing callers keep working.
    :param filter_dict: optional mapping of column name to required
        value. Recognised keys are 'record_id_user', 'oar_job_id',
        'record_id_slice' and 'slice_hrn'. Every recognised key adds
        one equality filter to the query; unrecognised keys are logged
        and ignored, as before.
    :returns: a dict with the keys 'record_id_user', 'oar_job_id',
        'record_id_slice' and 'slice_hrn', or None when no row matches
        (the previous code raised AttributeError in that case).
    """
    columns = ('record_id_user', 'oar_job_id',
               'record_id_slice', 'slice_hrn')

    logger.debug("SLABPOSTGRES \t find filter_dict %s" % (filter_dict,))

    query = slab_dbsession.query(SliceSenslab)
    if filter_dict is not None:
        for key in filter_dict:
            if key not in columns:
                # Best effort, as before: an unknown key does not abort
                # the lookup, it is simply not used as a filter.
                logger.debug("SLABPOSTGRES \t find ignoring unknown "
                             "filter key %s" % (key,))
                continue
            # NOTE(review): the SliceSenslab model declared earlier in
            # this file has its oar_job_id column commented out, so
            # filtering on 'oar_job_id' is expected to be rejected by
            # SQLAlchemy until that column is restored -- confirm.
            query = query.filter_by(**{key: filter_dict[key]})

    # Only the first matching row is returned, even if several match.
    rec = query.first()
    logger.debug("SLABPOSTGRES \t find rec %s" % (rec,))
    if rec is None:
        return None

    # getattr with a default keeps the result shape stable even when the
    # mapped class lacks one of the columns (see the oar_job_id note).
    return dict((column, getattr(rec, column, None)) for column in columns)


# Module-level singletons, created as a side effect of importing this
# module: a SlabDB built from the SFA configuration, its engine, and the
# session object returned by SlabDB.session().
from sfa.util.config import Config
slab_alchemy = SlabDB(Config())
slab_engine = slab_alchemy.slab_engine
slab_dbsession = slab_alchemy.session()