2 #import psycopg2.extensions
3 #psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
4 ## UNICODEARRAY not exported yet
5 #psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)
6 from sfa.util.config import Config
7 #from sfa.storage.table import SfaTable
8 from sfa.util.sfalogging import logger
9 # allow to run sfa2wsdl if this is missing (for mac)
12 #except: print >> sys.stderr, "WARNING, could not import pgdb"
13 from sqlalchemy import create_engine
14 from sqlalchemy.orm import sessionmaker
16 from sqlalchemy import Column, Integer, String, DateTime
17 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
18 import sfa.storage.model as model
20 from sqlalchemy.ext.declarative import declarative_base
21 from sqlalchemy.orm import relationship, backref
24 from sqlalchemy import MetaData, Table
25 from sqlalchemy.exc import NoSuchTableError
27 #Dict holding the columns names of the table as keys
28 #and their type, used for creation of the table
29 slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1', 'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
31 #Dict with all the specific senslab tables
32 tablenames_dict = {'slice_senslab': slice_table}
34 ##############################
38 SlabBase = declarative_base()
43 class SlabSliceDB (SlabBase):
44 __tablename__ = 'slice_senslab'
45 record_id_user = Column(Integer, primary_key=True)
46 oar_job_id = Column( Integer,default = -1)
47 record_id_slice = Column(Integer)
48 slice_hrn = Column(String,nullable = False)
50 def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None):
52 self.record_id_slice = record_id_slice
54 self.slice_hrn = slice_hrn
56 self.oar_job_id = oar_job_id
58 self.slice_hrn = slice_hrn
60 self.record_id_user= record_id_user
63 result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s" % \
64 (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice)
72 def __init__(self,config):
73 #self.config = Config()
74 #self.connection = None
75 self.sl_base = SlabBase
76 #self.init_create_query()
79 # will be created lazily on-demand
80 self.slab_session = None
81 # the former PostgreSQL.py used the psycopg2 directly and was doing
82 #self.connection.set_client_encoding("UNICODE")
83 # it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
84 # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
85 # we indeed have /var/lib/pgsql/data/postgresql.conf where
86 # this setting is unset, it might be an angle to tweak that if need be
87 # try a unix socket first - omitting the hostname does the trick
88 unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
89 (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
90 print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
91 # the TCP fallback method
92 tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
93 (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
94 for url in [ unix_url, tcp_url ] :
96 self.slab_engine = create_engine (url,echo_pool=True,echo=True)
102 self.slab_engine=None
103 raise Exception,"Could not connect to database"
106 self.slab_engine.execute ("select 1").scalar()
110 if self.slab_session is None:
111 Session=sessionmaker ()
112 self.slab_session=Session(bind=self.slab_engine)
113 return self.slab_session
117 if self.connection is None:
118 # (Re)initialize database connection
121 # Try UNIX socket first
122 self.connection = psycopg2.connect(user = 'sfa',
125 #self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
126 #password = self.config.SFA_PLC_DB_PASSWORD,
127 #database = self.config.SFA_PLC_DB_NAME)
128 except psycopg2.OperationalError:
130 self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
131 password = self.config.SFA_PLC_DB_PASSWORD,
132 database = self.config.SFA_PLC_DB_NAME,
133 host = self.config.SFA_PLC_DB_HOST,
134 port = self.config.SFA_PLC_DB_PORT)
135 self.connection.set_client_encoding("UNICODE")
137 self.connection = pgdb.connect(user = self.config.SFA_PLC_DB_USER,
138 password = self.config.SFA_PLC_DB_PASSWORD,
139 host = "%s:%d" % (self.config.SFA_PLC_DB_HOST, self.config.SFA_PLC_DB_PORT),
140 database = self.config.SFA_PLC_DB_NAME)
142 return self.connection.cursor()
144 #Close connection to database
146 if self.connection is not None:
147 self.connection.close()
148 self.connection = None
150 def selectall(self, query, hashref = True, key_field = None):
152 Return each row as a dictionary keyed on field name (like DBI
153 selectrow_hashref()). If key_field is specified, return rows
154 as a dictionary keyed on the specified field (like DBI
155 selectall_hashref()).
158 cursor = self.cursor()
159 cursor.execute(query)
160 rows = cursor.fetchall()
162 self.connection.commit()
164 if hashref or key_field is not None:
165 # Return each row as a dictionary keyed on field name
166 # (like DBI selectrow_hashref()).
167 labels = [column[0] for column in cursor.description]
168 rows = [dict(zip(labels, row)) for row in rows]
170 if key_field is not None and key_field in labels:
171 # Return rows as a dictionary keyed on the specified field
172 # (like DBI selectall_hashref()).
173 return dict([(row[key_field], row) for row in rows])
178 def exists(self, tablename):
180 Checks if the table specified as tablename exists.
186 metadata = MetaData (bind=self.slab_engine)
187 table=Table (tablename, metadata, autoload=True)
190 except NoSuchTableError:
191 print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
193 ##mark = self.cursor()
194 #sql = "SELECT * from pg_tables"
196 ##rows = mark.fetchall()
198 ##labels = [column[0] for column in mark.description]
199 ##rows = [dict(zip(labels, row)) for row in rows]
200 #rows = self.selectall(sql)
201 #rows = filter(lambda row: row['tablename'].startswith(tablename), rows)
204 #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE ! tablename %s " %(tablename)
207 def createtable(self, tablename ):
209 Creates the specifed table. Uses the global dictionnary holding the tablenames and
213 #metadata = MetaData (bind=engine)
214 #table=Table (tablename, metadata, autoload=True)
215 #records = Table ( 'records', SlabBase.metadata,autoload=True )
216 #records = Table ( 'records', SlabBase.metadata,Column ('record_id', Integer, primary_key=True), )
217 print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
218 #table.create(bind =engine)
219 SlabBase.metadata.create_all(slab_engine)
225 def insert(self, table, columns,values):
227 Inserts data (values) into the columns of the specified table.
231 statement = "INSERT INTO " + table + \
232 "(" + ",".join(columns) + ") " + \
233 "VALUES(" + ", ".join(values) + ");"
235 mark.execute(statement)
236 self.connection.commit()
241 #def insert_slab_slice(self, person_rec):
243 #Inserts information about a user and his slice into the slice table.
246 #sfatable = SfaTable()
247 #keys = slice_table.keys()
249 ##returns a list of records from the sfa table (dicts)
250 ##the filters specified will return only one matching record, into a list of dicts
251 ##Finds the slice associated with the user (Senslabs slices hrns contains the user hrn)
253 #userrecord = sfatable.find({'hrn': person_rec['hrn'], 'type':'user'})
254 #slicerec = sfatable.find({'hrn': person_rec['hrn']+'_slice', 'type':'slice'})
256 #if (isinstance (userrecord, list)):
257 #userrecord = userrecord[0]
258 #if (isinstance (slicerec, list)):
259 #slicerec = slicerec[0]
262 #values = [ str(oar_dflt_jobid), ' \''+ str(slicerec['hrn']) + '\'', str(userrecord['record_id']), str( slicerec['record_id'])]
264 #self.insert('slice_senslab', keys, values)
266 #logger.error("Trying to import a not senslab slice")
270 def update(self, table, column_names, values, whereclause, valueclause):
272 Updates a record in a given table.
275 #Creates the values string for the update SQL command
276 vclause = valueclause
277 if len(column_names) is not len(values):
281 valuesdict = dict(zip(column_names,values))
282 for k in valuesdict.keys():
283 valuesdict[k] = str(valuesdict[k])
284 #v = ' \''+ str(k) + '\''+ '='+' \''+ valuesdict[k]+'\''
285 v = str(k) + '=' + valuesdict[k]
287 if isinstance(vclause,str):
288 vclause = '\''+ vclause + '\''
289 statement = "UPDATE %s SET %s WHERE %s = %s" % \
290 (table, ", ".join(valueslist), whereclause, vclause)
291 print>>sys.stderr,"\r\n \r\n SLABPOSTGRES.PY update statement %s valuesdict %s valueslist %s" %(statement,valuesdict,valueslist)
293 mark.execute(statement)
294 self.connection.commit()
300 #def update_senslab_slice(self, slice_rec):
301 #sfatable = SfaTable()
302 #hrn = str(slice_rec['hrn'])
303 #userhrn = hrn.rstrip('_slice')
304 #userrecord = sfatable.find({'hrn': userhrn, 'type':'user'})
305 #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY update_senslab_slice : userrecord %s slice_rec %s userhrn %s" %( userrecord, slice_rec, userhrn)
306 #if (isinstance (userrecord, list)):
307 #userrecord = userrecord[0]
308 #columns = [ 'record_id_user', 'oar_job_id']
309 #values = [slice_rec['record_id_user'],slice_rec['oar_job_id']]
310 #self.update('slice',columns, values,'record_id_slice', slice_rec['record_id_slice'])
314 def find(self, tablename,record_filter = None, columns=None):
315 print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : record_filter %s %s columns %s %s" %( record_filter , type(record_filter),columns , type(columns))
319 columns = ",".join(columns)
320 sql = "SELECT %s FROM %s WHERE True " % (columns, tablename)
322 #if isinstance(record_filter, (list, tuple, set)):
323 #ints = filter(lambda x: isinstance(x, (int, long)), record_filter)
324 #strs = filter(lambda x: isinstance(x, StringTypes), record_filter)
325 #record_filter = Filter(SfaRecord.all_fields, {'record_id': ints, 'hrn': strs})
326 #sql += "AND (%s) %s " % record_filter.sql("OR")
327 #elif isinstance(record_filter, dict):
328 #record_filter = Filter(SfaRecord.all_fields, record_filter)
329 #sql += " AND (%s) %s" % record_filter.sql("AND")
330 #elif isinstance(record_filter, StringTypes):
331 #record_filter = Filter(SfaRecord.all_fields, {'hrn':[record_filter]})
332 #sql += " AND (%s) %s" % record_filter.sql("AND")
333 #elif isinstance(record_filter, int):
334 #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})
335 #sql += " AND (%s) %s" % record_filter.sql("AND")
337 if isinstance(record_filter, dict):
338 for k in record_filter.keys():
339 #sql += "AND "+' \''+ str(k) + '\''+ '='+' \''+ str(record_filter[k])+'\''
340 #sql += "AND "+ str(k) + '=' + str(record_filter[k])
341 sql += "AND "+ str(k) +'='+' \''+ str(record_filter[k])+'\''
342 elif isinstance(record_filter, str):
343 sql += "AND slice_hrn ="+ ' \''+record_filter+'\''
345 #elif isinstance(record_filter, int):
346 #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})
347 #sql += " AND (%s) %s" % record_filter.sql("AND")
349 print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : sql %s record_filter %s %s" %(sql, record_filter , type(record_filter))
350 results = self.selectall(sql)
351 if isinstance(results, dict):
357 from sfa.util.config import Config
359 slab_alchemy= SlabDB(Config())
360 slab_engine=slab_alchemy.slab_engine
361 slab_dbsession=slab_alchemy.session()