-###########################################################################
-# Copyright (C) 2011 by
-# <savakian@sfa2.grenoble.senslab.info>
-#
-# Copyright: See COPYING file that comes with this distribution
-#
-###########################################################################
-import psycopg2
-import psycopg2.extensions
-psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
-# UNICODEARRAY not exported yet
-psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)
-from sfa.util.config import Config
-from sfa.util.table import SfaTable
-# allow to run sfa2wsdl if this is missing (for mac)
import sys
-try: import pgdb
-except: print >> sys.stderr, "WARNING, could not import pgdb"
-slice_table = {'oar_job_id':'integer DEFAULT -1', 'record_id_user':'integer PRIMARY KEY references sfa ON DELETE CASCADE ON UPDATE CASCADE', 'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
-tablenames_dict = {'slice': slice_table}
+from sqlalchemy import create_engine, and_
+from sqlalchemy.orm import sessionmaker
-class SlabDB:
- def __init__(self):
- self.config = Config()
- self.debug = False
+from sfa.util.config import Config
+from sfa.util.sfalogging import logger
- self.connection = None
+from sqlalchemy import Column, Integer, String, DateTime
+from sqlalchemy import Table, Column, MetaData, join, ForeignKey
+import sfa.storage.model as model
- #@handle_exception
- def cursor(self):
- if self.connection is None:
- # (Re)initialize database connection
- if psycopg2:
- try:
- # Try UNIX socket first
- self.connection = psycopg2.connect(user = 'sfa',
- password = 'sfa',
- database = 'sfa')
- #self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
- #password = self.config.SFA_PLC_DB_PASSWORD,
- #database = self.config.SFA_PLC_DB_NAME)
- except psycopg2.OperationalError:
- # Fall back on TCP
- self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
- password = self.config.SFA_PLC_DB_PASSWORD,
- database = self.config.SFA_PLC_DB_NAME,
- host = self.config.SFA_PLC_DB_HOST,
- port = self.config.SFA_PLC_DB_PORT)
- self.connection.set_client_encoding("UNICODE")
- else:
- self.connection = pgdb.connect(user = self.config.SFA_PLC_DB_USER,
- password = self.config.SFA_PLC_DB_PASSWORD,
- host = "%s:%d" % (self.config.SFA_PLC_DB_HOST, self.config.SFA_PLC_DB_PORT),
- database = self.config.SFA_PLC_DB_NAME)
-
- (self.rowcount, self.description, self.lastrowid) = \
- (None, None, None)
-
- return self.connection.cursor()
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, backref
+
+
+from sqlalchemy.dialects import postgresql
+
+from sqlalchemy import MetaData, Table
+from sqlalchemy.exc import NoSuchTableError
+
+from sqlalchemy import String
+
# Schema of the slice_senslab table: maps each column name to the SQL
# fragment used when creating the table by hand (see tablenames_dict).
# NOTE(review): 'references X' looks like a placeholder left over from the
# old schema ('references sfa') - confirm the intended referenced table.
slice_table = {
    'record_id_user': 'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE',
    'oar_job_id': 'integer DEFAULT -1',
    'record_id_slice': 'integer',
    'slice_hrn': 'text NOT NULL',
}

# All the Senslab-specific tables, keyed by table name.
tablenames_dict = {'slice_senslab': slice_table}
+
+##############################
+
+
+
# Declarative base shared by every Senslab-specific ORM table (e.g.
# SliceSenslab); SlabBase.metadata is what drives table creation in
# SlabDB.createtable.
SlabBase = declarative_base()
+
+
+
+
class SliceSenslab(SlabBase):
    """ORM mapping for the slice_senslab table.

    Associates a user record id (primary key) with a slice record id,
    the slice hrn, the current OAR job id and the slice's node list.
    """
    __tablename__ = 'slice_senslab'
    # id of the user's record in the sfa table (one slice per user)
    record_id_user = Column(Integer, primary_key=True)
    # -1 means "no OAR job currently associated with this slice"
    oar_job_id = Column(Integer, default=-1)
    record_id_slice = Column(Integer)
    slice_hrn = Column(String, nullable=False)
    node_list = Column(postgresql.ARRAY(String), nullable=True)

    def __init__(self, slice_hrn=None, oar_job_id=None,
                 record_id_slice=None, record_id_user=None):
        """Populate only the fields whose argument is truthy.

        node_list always starts empty.
        """
        self.node_list = []
        if record_id_slice:
            self.record_id_slice = record_id_slice
        # original code assigned slice_hrn twice; once is enough
        if slice_hrn:
            self.slice_hrn = slice_hrn
        if oar_job_id:
            self.oar_job_id = oar_job_id
        if record_id_user:
            self.record_id_user = record_id_user

    def __repr__(self):
        """Debug representation showing all mapped columns."""
        result = "<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s node_list =%s" % \
            (self.record_id_user, self.slice_hrn, self.oar_job_id,
             self.record_id_slice, self.node_list)
        result += ">"
        return result

    def dumpquerytodict(self):
        """Return the row as a plain dict.

        record_id_slice is exposed under both 'record_id' and
        'record_id_slice' (callers may rely on either key).
        """
        # avoid shadowing the builtin 'dict'; the original literal also
        # repeated the 'slice_hrn' key - same resulting keys, written once
        return {'slice_hrn': self.slice_hrn,
                'record_id': self.record_id_slice,
                'record_id_user': self.record_id_user,
                'oar_job_id': self.oar_job_id,
                'record_id_slice': self.record_id_slice,
                'node_list': self.node_list}
+#class PeerSenslab(SlabBase):
+ #__tablename__ = 'peer_senslab'
+ #peername = Column(String, nullable = False)
+ #peerid = Column( Integer,primary_key=True)
+
+ #def __init__ (self,peername = None ):
+ #if peername:
+ #self.peername = peername
+
+
+ #def __repr__(self):
+ #result="<Peer id =%s, Peer name =%s" % (self.peerid, self.peername)
+ #result += ">"
+ #return result
+
class SlabDB:
    """Access helper for the Senslab-specific database (slab_sfa).

    Wraps an SQLAlchemy engine plus a lazily-created session, and offers
    helpers to check/create the Senslab tables and to query/update the
    slice_senslab table.
    """

    def __init__(self, config):
        """Connect to the slab_sfa database.

        Tries a unix socket first (omitting the hostname does the trick),
        then falls back on TCP.

        :param config: sfa Config object providing the SFA_DB_* settings
        :raises Exception: when neither connection method works
        """
        self.sl_base = SlabBase
        dbname = "slab_sfa"
        # the session is created lazily, on demand - see session()
        self.slab_session = None
        # The former PostgreSQL.py used psycopg2 directly and was doing
        #   self.connection.set_client_encoding("UNICODE")
        # it's unclear how to achieve this in sqlalchemy, nor if it's needed
        # at all:
        # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
        # we indeed have /var/lib/pgsql/data/postgresql.conf where this
        # setting is unset, it might be an angle to tweak that if need be.
        unix_url = "postgresql+psycopg2://%s:%s@:%s/%s" % \
            (config.SFA_DB_USER, config.SFA_DB_PASSWORD,
             config.SFA_DB_PORT, dbname)
        # the TCP fallback method
        tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s" % \
            (config.SFA_DB_USER, config.SFA_DB_PASSWORD,
             config.SFA_DB_HOST, config.SFA_DB_PORT, dbname)
        for url in [unix_url, tcp_url]:
            try:
                self.slab_engine = create_engine(url, echo_pool=True, echo=True)
                self.check()
                self.url = url
                return
            except Exception:
                # log the failure instead of silently swallowing it,
                # then try the next url
                logger.warning("SlabDB: could not connect with %s" % url)
        self.slab_engine = None
        raise Exception("Could not connect to database")

    def check(self):
        """Cheap liveness probe: run 'select 1' on the engine.

        Raises if the database is unreachable.
        """
        self.slab_engine.execute("select 1").scalar()

    def session(self):
        """Return the session bound to our engine, creating it on first use."""
        if self.slab_session is None:
            Session = sessionmaker()
            self.slab_session = Session(bind=self.slab_engine)
        return self.slab_session

    def close(self):
        """Close the open session, if any (it will be re-created on demand).

        The previous implementation closed self.connection, which no longer
        exists with the SQLAlchemy engine - it would have raised
        AttributeError.
        """
        if self.slab_session is not None:
            self.slab_session.close()
            self.slab_session = None

    def exists(self, tablename):
        """Return True if the table named tablename exists in the database."""
        try:
            metadata = MetaData(bind=self.slab_engine)
            Table(tablename, metadata, autoload=True)
            return True
        except NoSuchTableError:
            logger.debug("SlabDB.exists: no table %s" % tablename)
            return False

    def createtable(self, tablename):
        """Create all the tables declared on SlabBase (e.g. slice_senslab).

        :param tablename: kept for backward compatibility; creation is
            actually driven by SlabBase.metadata, not by tablenames_dict.
        """
        logger.debug("SlabDB.createtable: sorted_tables %s" %
                     (SlabBase.metadata.sorted_tables))
        # use our own engine rather than the module-level global, which is
        # only defined once the module bottom has run
        SlabBase.metadata.create_all(self.slab_engine)
        return

    def update_job(self, hrn, job_id=None, nodes=None):
        """Update the oar job id and/or node list of the slice with hrn.

        :param hrn: slice_hrn of the row to update
        :param job_id: new oar_job_id, left unchanged when None
        :param nodes: new node_list, left unchanged when None

        NOTE(review): assumes a matching row exists - a missing row makes
        .first() return None and this raises AttributeError; confirm callers
        guarantee existence.
        """
        slice_rec = self.session().query(SliceSenslab) \
            .filter_by(slice_hrn=hrn).first()
        logger.debug("SlabDB.update_job: slice_rec %s" % (slice_rec))
        if job_id is not None:
            slice_rec.oar_job_id = job_id
        if nodes is not None:
            slice_rec.node_list = nodes
        self.session().commit()

    def find(self, name=None, filter_dict=None):
        """Return the first slice_senslab row matching filter_dict, as a dict.

        :param name: unused, kept for backward compatibility
        :param filter_dict: maps a column name to the value it must equal;
            supported keys are record_id_user, oar_job_id, record_id_slice
            and slice_hrn. Unknown keys are ignored (with a warning).
        :returns: dict with keys record_id_user, oar_job_id,
            record_id_slice and slice_hrn, or None when nothing matches.
        """
        # filter_by cannot handle more than one argument, hence the
        # per-column dispatch table below (replaces four near-identical
        # nested functions)
        column_filters = {
            'record_id_user': lambda q, v: q.filter_by(record_id_user=v),
            'oar_job_id': lambda q, v: q.filter_by(oar_job_id=v),
            'record_id_slice': lambda q, v: q.filter_by(record_id_slice=v),
            'slice_hrn': lambda q, v: q.filter_by(slice_hrn=v),
        }
        query = self.session().query(SliceSenslab)
        if filter_dict is not None:
            for key in filter_dict:
                try:
                    query = column_filters[key](query, filter_dict[key])
                except KeyError:
                    logger.warning("SlabDB.find: unsupported filter %s" % key)
        rec = query.first()
        logger.debug("SlabDB.find: rec %s" % (rec))
        # the original dict(zip(...)) crashed with AttributeError when no
        # row matched; return None explicitly instead
        if rec is None:
            return None
        return {'record_id_user': rec.record_id_user,
                'oar_job_id': rec.oar_job_id,
                'record_id_slice': rec.record_id_slice,
                'slice_hrn': rec.slice_hrn}
-
\ No newline at end of file
+
+
# Module-level singleton: importing this module opens the database
# connection as a side effect; slab_engine and slab_dbsession are the
# globals the rest of the code base uses.
# (Config is already imported at the top of this file - the duplicate
# import that used to live here has been dropped.)
slab_alchemy = SlabDB(Config())
slab_engine = slab_alchemy.slab_engine
slab_dbsession = slab_alchemy.session()