X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fsenslab%2Fslabpostgres.py;h=c3270248198fe8ba120de2eafe59e102d59f8d87;hb=e71c5ab5bf1aaa5428df793d5c5e8e602994be5d;hp=84c1a77096d579c1695a0d62e0e120fe9fb2c249;hpb=3d51e29695f79b143974f5cf7b2e104d89626ba4;p=sfa.git diff --git a/sfa/senslab/slabpostgres.py b/sfa/senslab/slabpostgres.py index 84c1a770..c3270248 100644 --- a/sfa/senslab/slabpostgres.py +++ b/sfa/senslab/slabpostgres.py @@ -1,61 +1,149 @@ -import psycopg2 -import psycopg2.extensions -psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) -# UNICODEARRAY not exported yet -psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY) +import sys + +from sqlalchemy import create_engine, and_ +from sqlalchemy.orm import sessionmaker + from sfa.util.config import Config -from sfa.storage.table import SfaTable from sfa.util.sfalogging import logger -# allow to run sfa2wsdl if this is missing (for mac) -import sys -try: import pgdb -except: print >> sys.stderr, "WARNING, could not import pgdb" + +from sqlalchemy import Column, Integer, String, DateTime +from sqlalchemy import Table, Column, MetaData, join, ForeignKey +import sfa.storage.model as model + +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship, backref + + +from sqlalchemy.dialects import postgresql + +from sqlalchemy import MetaData, Table +from sqlalchemy.exc import NoSuchTableError + +from sqlalchemy import String #Dict holding the columns names of the table as keys #and their type, used for creation of the table slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1', 'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'} #Dict with all the specific senslab tables -tablenames_dict = {'slice': slice_table} +tablenames_dict = {'slice_senslab': slice_table} + +############################## + + +SlabBase = declarative_base() + + + + +class SliceSenslab (SlabBase): + __tablename__ = 'slice_senslab' + #record_id_user = Column(Integer, primary_key=True) + slice_hrn = Column(String,primary_key=True) + peer_authority = Column( String,nullable = True) + record_id_slice = Column(Integer) + record_id_user = Column(Integer) + oar_job_id = Column( Integer,default = -1) + node_list = Column(postgresql.ARRAY(String), nullable =True) + + def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None,peer_authority=None): + self.node_list = [] + if record_id_slice: + self.record_id_slice = record_id_slice + if slice_hrn: + self.slice_hrn = slice_hrn + if oar_job_id: + self.oar_job_id = oar_job_id + if slice_hrn: + self.slice_hrn = slice_hrn + if record_id_user: + self.record_id_user= record_id_user + if peer_authority: + self.peer_authority = peer_authority + + + def __repr__(self): + result=">sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url) + # the TCP fallback method + tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\ + (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname) + for url in [ unix_url, tcp_url ] : + try: + self.slab_engine = create_engine (url,echo_pool = l_echo_pool, echo = l_echo) + self.check() + self.url=url + return + except: + pass + self.slab_engine=None + raise Exception,"Could not connect to database" + + + + def check (self): + self.slab_engine.execute ("select 1").scalar() - def init_create_query(self): - sfatable = SfaTable() - slice_table['record_id_user'] = slice_table['record_id_user'].replace("X",sfatable.tablename) - print sys.stderr, " \r\n \r\n slice_table %s ",slice_table - def cursor(self): - if self.connection is None: - # (Re)initialize database connection - if psycopg2: - try: - # Try UNIX socket first - self.connection = psycopg2.connect(user = 'sfa', - password = 'sfa', - database = 'sfa') - #self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER, - #password = self.config.SFA_PLC_DB_PASSWORD, - #database = self.config.SFA_PLC_DB_NAME) - except psycopg2.OperationalError: - # Fall back on TCP - self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER, - password = self.config.SFA_PLC_DB_PASSWORD, - database = self.config.SFA_PLC_DB_NAME, - host = self.config.SFA_PLC_DB_HOST, - port = self.config.SFA_PLC_DB_PORT) - self.connection.set_client_encoding("UNICODE") - else: - self.connection = pgdb.connect(user = self.config.SFA_PLC_DB_USER, - password = self.config.SFA_PLC_DB_PASSWORD, - host = "%s:%d" % (self.config.SFA_PLC_DB_HOST, self.config.SFA_PLC_DB_PORT), - database = self.config.SFA_PLC_DB_NAME) - - return self.connection.cursor() + + def session (self): + if self.slab_session is None: + Session=sessionmaker () + self.slab_session=Session(bind=self.slab_engine) + return self.slab_session + + + #Close connection to database def close(self): @@ -63,32 +151,7 @@ class SlabDB: self.connection.close() self.connection = None - def selectall(self, query, hashref = True, key_field = None): - """ - Return each row as a dictionary keyed on field name (like DBI - selectrow_hashref()). If key_field is specified, return rows - as a dictionary keyed on the specified field (like DBI - selectall_hashref()). - - """ - cursor = self.cursor() - cursor.execute(query) - rows = cursor.fetchall() - cursor.close() - self.connection.commit() - - if hashref or key_field is not None: - # Return each row as a dictionary keyed on field name - # (like DBI selectrow_hashref()). - labels = [column[0] for column in cursor.description] - rows = [dict(zip(labels, row)) for row in rows] - - if key_field is not None and key_field in labels: - # Return rows as a dictionary keyed on the specified field - # (like DBI selectall_hashref()). - return dict([(row[key_field], row) for row in rows]) - else: - return rows + def exists(self, tablename): @@ -96,18 +159,16 @@ class SlabDB: Checks if the table specified as tablename exists. """ - #mark = self.cursor() - sql = "SELECT * from pg_tables" - #mark.execute(sql) - #rows = mark.fetchall() - #mark.close() - #labels = [column[0] for column in mark.description] - #rows = [dict(zip(labels, row)) for row in rows] - rows = self.selectall(sql) - rows = filter(lambda row: row['tablename'].startswith(tablename), rows) - if rows: + + try: + metadata = MetaData (bind=self.slab_engine) + table=Table (tablename, metadata, autoload=True) + return True - return False + except NoSuchTableError: + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename) + return False + def createtable(self, tablename ): """ @@ -115,172 +176,71 @@ class SlabDB: the table schema. """ - mark = self.cursor() - tablelist =[] - if tablename not in tablenames_dict: - logger.error("Tablename unknown - creation failed") - return - - T = tablenames_dict[tablename] - - for k in T.keys(): - tmp = str(k) +' ' + T[k] - tablelist.append(tmp) - - end_of_statement = ",".join(tablelist) - - statement = "CREATE TABLE " + tablename + " ("+ end_of_statement +");" - - #template = "CREATE INDEX %s_%s_idx ON %s (%s);" - #indexes = [template % ( self.tablename, field, self.tablename, field) \ - #for field in ['hrn', 'type', 'authority', 'peer_authority', 'pointer']] - # IF EXISTS doenst exist in postgres < 8.2 - try: - mark.execute('DROP TABLE IF EXISTS ' + tablename +';') - except: - try: - mark.execute('DROP TABLE' + tablename +';') - except: - pass - - mark.execute(statement) - #for index in indexes: - #self.db.do(index) - self.connection.commit() - mark.close() - self.close() + + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine) + SlabBase.metadata.create_all(slab_engine) return + #Updates the job_id and the nodes list + #The nodes list is never erased. + def update_job(self, hrn, job_id= None, nodes = None ): + slice_rec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).first() + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES update_job slice_rec %s"%(slice_rec) + if job_id is not None: + slice_rec.oar_job_id = job_id + if nodes is not None : + slice_rec.node_list = nodes + slab_dbsession.commit() + def find (self, name = None, filter_dict = None): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_dict %s"%(filter_dict) - - def insert(self, table, columns,values): - """ - Inserts data (values) into the columns of the specified table. - - """ - mark = self.cursor() - statement = "INSERT INTO " + table + \ - "(" + ",".join(columns) + ") " + \ - "VALUES(" + ", ".join(values) + ");" - - mark.execute(statement) - self.connection.commit() - mark.close() - self.close() - return - - def insert_slab_slice(self, person_rec): - """ - Inserts information about a user and his slice into the slice table. - - """ - sfatable = SfaTable() - keys = slice_table.keys() + #Filter_by can not handle more than one argument, hence these functions + def filter_id_user(query, user_id): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_id_user" + return query.filter_by(record_id_user = user_id) - #returns a list of records from the sfa table (dicts) - #the filters specified will return only one matching record, into a list of dicts - #Finds the slice associated with the user (Senslabs slices hrns contains the user hrn) - - userrecord = sfatable.find({'hrn': person_rec['hrn'], 'type':'user'}) - slicerec = sfatable.find({'hrn': person_rec['hrn']+'_slice', 'type':'slice'}) - if slicerec : - if (isinstance (userrecord, list)): - userrecord = userrecord[0] - if (isinstance (slicerec, list)): - slicerec = slicerec[0] - - oar_dflt_jobid = -1 - values = [ str(oar_dflt_jobid), ' \''+ str(slicerec['hrn']) + '\'', str(userrecord['record_id']), str( slicerec['record_id'])] - - self.insert('slice', keys, values) - else : - logger.error("Trying to import a not senslab slice") - return + def filter_job(query, job): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job " + return query.filter_by(oar_job_id = job) + def filer_id_slice (query, id_slice): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filer_id_slice" + return query.filter_by(record_id_slice = id_slice) - def update(self, table, column_names, values, whereclause, valueclause): - """ - Updates a record in a given table. - - """ - #Creates the values string for the update SQL command - vclause = valueclause - if len(column_names) is not len(values): - return - else: - valueslist = [] - valuesdict = dict(zip(column_names,values)) - for k in valuesdict.keys(): - valuesdict[k] = str(valuesdict[k]) - #v = ' \''+ str(k) + '\''+ '='+' \''+ valuesdict[k]+'\'' - v = str(k) + '=' + valuesdict[k] - valueslist.append(v) - if isinstance(vclause,str): - vclause = '\''+ vclause + '\'' - statement = "UPDATE %s SET %s WHERE %s = %s" % \ - (table, ", ".join(valueslist), whereclause, vclause) - print>>sys.stderr,"\r\n \r\n SLABPOSTGRES.PY update statement %s valuesdict %s valueslist %s" %(statement,valuesdict,valueslist) - mark = self.cursor() - mark.execute(statement) - self.connection.commit() - mark.close() - self.close() - - return - - def update_senslab_slice(self, slice_rec): - sfatable = SfaTable() - hrn = str(slice_rec['hrn']) - userhrn = hrn.rstrip('_slice') - userrecord = sfatable.find({'hrn': userhrn, 'type':'user'}) - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY update_senslab_slice : userrecord %s slice_rec %s userhrn %s" %( userrecord, slice_rec, userhrn) - if (isinstance (userrecord, list)): - userrecord = userrecord[0] - columns = [ 'record_id_user', 'oar_job_id'] - values = [slice_rec['record_id_user'],slice_rec['oar_job_id']] - self.update('slice',columns, values,'record_id_slice', slice_rec['record_id_slice']) - return + def filter_slice_hrn(query, hrn): + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_slice_hrn" + return query.filter_by(slice_hrn = hrn) - - def find(self, tablename,record_filter = None, columns=None): - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : record_filter %s %s columns %s %s" %( record_filter , type(record_filter),columns , type(columns)) - if not columns: - columns = "*" - else: - columns = ",".join(columns) - sql = "SELECT %s FROM %s WHERE True " % (columns, tablename) - #if isinstance(record_filter, (list, tuple, set)): - #ints = filter(lambda x: isinstance(x, (int, long)), record_filter) - #strs = filter(lambda x: isinstance(x, StringTypes), record_filter) - #record_filter = Filter(SfaRecord.all_fields, {'record_id': ints, 'hrn': strs}) - #sql += "AND (%s) %s " % record_filter.sql("OR") - #elif isinstance(record_filter, dict): - #record_filter = Filter(SfaRecord.all_fields, record_filter) - #sql += " AND (%s) %s" % record_filter.sql("AND") - #elif isinstance(record_filter, StringTypes): - #record_filter = Filter(SfaRecord.all_fields, {'hrn':[record_filter]}) - #sql += " AND (%s) %s" % record_filter.sql("AND") - #elif isinstance(record_filter, int): - #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]}) - #sql += " AND (%s) %s" % record_filter.sql("AND") + extended_filter = {'record_id_user': filter_id_user, + 'oar_job_id':filter_job, + 'record_id_slice': filer_id_slice, + 'slice_hrn': filter_slice_hrn} + + Q = slab_dbsession.query(SliceSenslab) + + if filter_dict is not None: + for k in filter_dict: + try: + newQ= extended_filter[k](Q, filter_dict[k]) + Q = newQ + except KeyError: + print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!" + print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first()) + rec = Q.first() + print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find rec %s" %(rec) + return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])) + #reclist = [] + ##for rec in Q.all(): + #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))) + #return reclist + - if isinstance(record_filter, dict): - for k in record_filter.keys(): - #sql += "AND "+' \''+ str(k) + '\''+ '='+' \''+ str(record_filter[k])+'\'' - #sql += "AND "+ str(k) + '=' + str(record_filter[k]) - sql += "AND "+ str(k) +'='+' \''+ str(record_filter[k])+'\'' - elif isinstance(record_filter, str): - sql += "AND slice_hrn ="+ ' \''+record_filter+'\'' - #elif isinstance(record_filter, int): - #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]}) - #sql += " AND (%s) %s" % record_filter.sql("AND") - sql += ";" - print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : sql %s record_filter %s %s" %(sql, record_filter , type(record_filter)) - results = self.selectall(sql) - if isinstance(results, dict): - results = [results] - return results - + +from sfa.util.config import Config + +slab_alchemy= SlabDB(Config()) +slab_engine=slab_alchemy.slab_engine +slab_dbsession=slab_alchemy.session()