2 import psycopg2.extensions
3 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
4 # UNICODEARRAY not exported yet
5 psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)
6 from sfa.util.config import Config
7 from sfa.storage.table import SfaTable
8 from sfa.util.sfalogging import logger
9 # allow to run sfa2wsdl if this is missing (for mac)
12 except: print >> sys.stderr, "WARNING, could not import pgdb"
15 from sqlalchemy import Column, Integer, String, DateTime
16 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
17 from sfa.storage.model import Base
18 from sqlalchemy.ext.declarative import declarative_base
19 from sqlalchemy.orm import relationship, backref
21 from sfa.storage.alchemy import dbsession, engine
22 from sqlalchemy import MetaData, Table
23 from sqlalchemy.exc import NoSuchTableError
# Column layout of the senslab slice table: each key is a column name and
# each value is the SQL type/constraint fragment for that column.
# The "X" in the record_id_user definition is a placeholder for the name
# of the referenced table.
slice_table = {
    'record_id_user': 'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE',
    'oar_job_id': 'integer DEFAULT -1',
    'record_id_slice': 'integer',
    'slice_hrn': 'text NOT NULL',
}

# Every senslab-specific table, keyed by table name.
tablenames_dict = {
    'slice_senslab': slice_table,
}
32 ##############################
# Declarative base for the senslab tables. It reuses the metadata of the
# SFA model Base and is bound to the shared engine.
SlabBase = declarative_base(metadata=Base.metadata, bind=engine)


class SlabSliceDB(SlabBase):
    """ORM mapping of one row of the 'slice_senslab' table.

    Columns:
        record_id_user  -- primary key, foreign key to records.record_id
        oar_job_id      -- integer, column default -1
        record_id_slice -- integer
        slice_hrn       -- string, NOT NULL
    """
    __tablename__ = 'slice_senslab'
    record_id_user = Column(Integer, ForeignKey("records.record_id"), primary_key=True)
    oar_job_id = Column(Integer, default=-1)
    record_id_slice = Column(Integer)
    slice_hrn = Column(String, nullable=False)

    def __init__(self, slice_hrn=None, oar_job_id=None, record_id_slice=None, record_id_user=None):
        """Set the attributes for which a value was supplied.

        NOTE(review): an argument is only assigned when it is truthy, so
        an omitted oar_job_id presumably falls back to the column default
        -- confirm this is the intended behaviour for a job id of 0.
        """
        if record_id_slice:
            self.record_id_slice = record_id_slice
        # slice_hrn used to be assigned twice; once is enough.
        if slice_hrn:
            self.slice_hrn = slice_hrn
        if oar_job_id:
            self.oar_job_id = oar_job_id
        if record_id_user:
            self.record_id_user = record_id_user
65 self.config = Config()
66 self.connection = None
67 self.init_create_query()
def init_create_query(self):
    """Point the slice table's foreign key at the SFA table.

    Replaces the "X" placeholder in the module-level
    ``slice_table['record_id_user']`` definition with the table name
    exposed by ``SfaTable().tablename``, then traces the resulting
    dictionary on stderr. ``slice_table`` is modified in place.
    """
    sfatable = SfaTable()
    slice_table['record_id_user'] = \
        slice_table['record_id_user'].replace("X", sfatable.tablename)
    # The trace was written as "print sys.stderr, '...%s', slice_table":
    # without ">>" and "%" it printed the stream object and an unformatted
    # "%s" on stdout instead of tracing on stderr.
    sys.stderr.write(" \r\n \r\n slice_table %s \n" % (slice_table,))
75 if self.connection is None:
76 # (Re)initialize database connection
79 # Try UNIX socket first
80 self.connection = psycopg2.connect(user = 'sfa',
83 #self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
84 #password = self.config.SFA_PLC_DB_PASSWORD,
85 #database = self.config.SFA_PLC_DB_NAME)
86 except psycopg2.OperationalError:
88 self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
89 password = self.config.SFA_PLC_DB_PASSWORD,
90 database = self.config.SFA_PLC_DB_NAME,
91 host = self.config.SFA_PLC_DB_HOST,
92 port = self.config.SFA_PLC_DB_PORT)
93 self.connection.set_client_encoding("UNICODE")
95 self.connection = pgdb.connect(user = self.config.SFA_PLC_DB_USER,
96 password = self.config.SFA_PLC_DB_PASSWORD,
97 host = "%s:%d" % (self.config.SFA_PLC_DB_HOST, self.config.SFA_PLC_DB_PORT),
98 database = self.config.SFA_PLC_DB_NAME)
100 return self.connection.cursor()
102 #Close connection to database
104 if self.connection is not None:
105 self.connection.close()
106 self.connection = None
def selectall(self, query, hashref=True, key_field=None):
    """Execute *query* and return all of its rows.

    query     -- SQL text passed unchanged to the cursor.
    hashref   -- when true (default), each row is returned as a dictionary
                 keyed on field name (like DBI selectrow_hashref()).
    key_field -- when given and present among the result columns, the rows
                 are returned as a dictionary keyed on that field (like
                 DBI selectall_hashref()); rows sharing a key overwrite
                 each other.

    With hashref false and no key_field, the rows are returned exactly as
    fetchall() produced them. The connection is committed after the fetch.
    """
    cursor = self.cursor()
    try:
        cursor.execute(query)
        rows = cursor.fetchall()
        # Read the column metadata while the cursor is still open.
        description = cursor.description
    finally:
        # Release the cursor even when execute()/fetchall() raises.
        cursor.close()

    self.connection.commit()

    if not hashref and key_field is None:
        return rows

    labels = [column[0] for column in description]
    rows = [dict(zip(labels, row)) for row in rows]

    if key_field is not None and key_field in labels:
        return dict((row[key_field], row) for row in rows)
    return rows
def exists(self, tablename):
    """Check whether the table named *tablename* exists.

    The check is done by asking SQLAlchemy to reflect the table through
    the module-level ``engine``.

    Returns True when the reflection succeeds and False when SQLAlchemy
    raises NoSuchTableError (a trace is then written to stderr).
    """
    try:
        # Only the success or failure of the reflection matters; the
        # reflected Table object itself is not needed.
        Table(tablename, MetaData(bind=engine), autoload=True)
    except NoSuchTableError:
        sys.stderr.write(
            " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s \n" % (tablename,))
        return False
    return True
def createtable(self, tablename):
    """Create the tables registered on ``SlabBase.metadata``.

    tablename -- kept for backward compatibility; it is not used. The
                 call below targets the whole metadata rather than a
                 single table, and ``SlabBase`` is declared on the SFA
                 ``Base.metadata``, so this is not limited to the senslab
                 table.

    Progress traces are written to stderr before and after the creation.
    """
    sys.stderr.write(" \r\n \r\n \t SLABPOSTGRES CREATETABLE \n")
    SlabBase.metadata.create_all(engine)
    sys.stderr.write(" \r\n \r\n \t SLABPOSTGRES CREATETABLE YAAAAAAAAAAY\n")
def insert(self, table, columns, values):
    """Insert one row into *table* and commit.

    table   -- name of the target table.
    columns -- iterable of column names (strings).
    values  -- iterable of SQL literals (strings), one per column.

    NOTE(review): the statement is assembled by string concatenation, so
    every item of *values* must already be a valid, correctly quoted SQL
    literal. Never pass untrusted text here without escaping it first.

    Any exception raised by the database driver is propagated after the
    transaction has been rolled back.
    """
    statement = "INSERT INTO " + table + \
        "(" + ",".join(columns) + ") " + \
        "VALUES(" + ", ".join(values) + ");"

    mark = self.cursor()
    try:
        mark.execute(statement)
        self.connection.commit()
    except Exception:
        # Without a rollback the connection stays in a failed transaction
        # and every later statement on it is rejected.
        self.connection.rollback()
        raise
    finally:
        mark.close()
def insert_slab_slice(self, person_rec):
    """Insert information about a user and his slice into the slice table.

    person_rec -- record of the user; only person_rec['hrn'] is read.

    The user record and the slice record are looked up in the SFA table.
    The slice hrn is the user hrn followed by '_slice'. When no such
    slice (or no such user) is found, an error is logged and nothing is
    inserted.
    """
    sfatable = SfaTable()
    user_hrn = person_rec['hrn']

    # Each lookup is filtered so that at most one record should match.
    # The result may come back wrapped in a list.
    userrecord = sfatable.find({'hrn': user_hrn, 'type': 'user'})
    slicerec = sfatable.find({'hrn': user_hrn + '_slice', 'type': 'slice'})

    if not slicerec:
        logger.error("Trying to import a not senslab slice")
        return
    if not userrecord:
        # Previously an empty result crashed with IndexError below.
        logger.error("No user record found for %s" % (user_hrn,))
        return

    if isinstance(userrecord, list):
        userrecord = userrecord[0]
    if isinstance(slicerec, list):
        slicerec = slicerec[0]

    # No OAR job yet; -1 is also the column default of oar_job_id.
    oar_dflt_jobid = -1

    # Name the columns explicitly, in the same order as the values. The
    # key order of the slice_table dictionary is not guaranteed to match.
    columns = ['oar_job_id', 'slice_hrn', 'record_id_user', 'record_id_slice']
    values = [
        str(oar_dflt_jobid),
        # Quote the hrn as an SQL string literal, doubling embedded quotes.
        "'" + str(slicerec['hrn']).replace("'", "''") + "'",
        str(userrecord['record_id']),
        str(slicerec['record_id']),
    ]

    self.insert('slice_senslab', columns, values)
def update(self, table, column_names, values, whereclause, valueclause):
    """Update the rows of *table* matching ``whereclause = valueclause``.

    table        -- name of the table to update.
    column_names -- names of the columns to set.
    values       -- new values, one per column; each is converted with
                    str() and placed in the statement as is, so string
                    values must already be quoted SQL literals.
    whereclause  -- name of the column used in the WHERE condition.
    valueclause  -- value that column is compared to; a str is quoted
                    (embedded single quotes are doubled), anything else
                    is formatted as is.

    Nothing is executed when column_names and values differ in length.
    Any exception raised by the database driver is propagated after the
    transaction has been rolled back.
    """
    # Compare the lengths by value: "is not" tests object identity, which
    # is only reliable for the small integers CPython happens to cache.
    if len(column_names) != len(values):
        sys.stderr.write(
            "\r\n \r\n SLABPOSTGRES.PY update : %s column names for %s values, nothing done\n"
            % (len(column_names), len(values)))
        return

    # Creates the values string for the update SQL command
    valuesdict = dict((k, str(v)) for k, v in zip(column_names, values))
    valueslist = [str(k) + '=' + v for k, v in valuesdict.items()]

    vclause = valueclause
    if isinstance(vclause, str):
        vclause = "'" + vclause.replace("'", "''") + "'"

    statement = "UPDATE %s SET %s WHERE %s = %s" % \
        (table, ", ".join(valueslist), whereclause, vclause)
    sys.stderr.write(
        "\r\n \r\n SLABPOSTGRES.PY update statement %s valuesdict %s valueslist %s\n"
        % (statement, valuesdict, valueslist))

    mark = self.cursor()
    try:
        mark.execute(statement)
        self.connection.commit()
    except Exception:
        # Keep the connection usable after a failed statement.
        self.connection.rollback()
        raise
    finally:
        mark.close()
def update_senslab_slice(self, slice_rec):
    """Update the user id and OAR job id stored for a slice.

    slice_rec -- dictionary providing 'hrn', 'record_id_user',
                 'oar_job_id' and 'record_id_slice'.

    The row whose record_id_slice equals slice_rec['record_id_slice'] is
    updated in the slice_senslab table. The matching user record is
    looked up in the SFA table only to be traced on stderr.
    """
    sfatable = SfaTable()
    hrn = str(slice_rec['hrn'])

    # Drop the '_slice' suffix to get the user hrn. str.rstrip('_slice')
    # must not be used here: it strips any trailing run of the characters
    # '_', 's', 'l', 'i', 'c', 'e', so it also eats the end of the user
    # name ('alice_slice' would become 'a').
    suffix = '_slice'
    if hrn.endswith(suffix):
        userhrn = hrn[:-len(suffix)]
    else:
        userhrn = hrn

    userrecord = sfatable.find({'hrn': userhrn, 'type': 'user'})
    sys.stderr.write(
        " \r\n \r\n \t SLABPOSTGRES.PY update_senslab_slice : userrecord %s slice_rec %s userhrn %s\n"
        % (userrecord, slice_rec, userhrn))

    columns = ['record_id_user', 'oar_job_id']
    values = [slice_rec['record_id_user'], slice_rec['oar_job_id']]
    # These columns belong to the 'slice_senslab' table, the name used
    # everywhere else in this module; the former target 'slice' did not
    # match it.
    self.update('slice_senslab', columns, values,
                'record_id_slice', slice_rec['record_id_slice'])
305 def find(self, tablename,record_filter = None, columns=None):
306 print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : record_filter %s %s columns %s %s" %( record_filter , type(record_filter),columns , type(columns))
310 columns = ",".join(columns)
311 sql = "SELECT %s FROM %s WHERE True " % (columns, tablename)
313 #if isinstance(record_filter, (list, tuple, set)):
314 #ints = filter(lambda x: isinstance(x, (int, long)), record_filter)
315 #strs = filter(lambda x: isinstance(x, StringTypes), record_filter)
316 #record_filter = Filter(SfaRecord.all_fields, {'record_id': ints, 'hrn': strs})
317 #sql += "AND (%s) %s " % record_filter.sql("OR")
318 #elif isinstance(record_filter, dict):
319 #record_filter = Filter(SfaRecord.all_fields, record_filter)
320 #sql += " AND (%s) %s" % record_filter.sql("AND")
321 #elif isinstance(record_filter, StringTypes):
322 #record_filter = Filter(SfaRecord.all_fields, {'hrn':[record_filter]})
323 #sql += " AND (%s) %s" % record_filter.sql("AND")
324 #elif isinstance(record_filter, int):
325 #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})
326 #sql += " AND (%s) %s" % record_filter.sql("AND")
328 if isinstance(record_filter, dict):
329 for k in record_filter.keys():
330 #sql += "AND "+' \''+ str(k) + '\''+ '='+' \''+ str(record_filter[k])+'\''
331 #sql += "AND "+ str(k) + '=' + str(record_filter[k])
332 sql += "AND "+ str(k) +'='+' \''+ str(record_filter[k])+'\''
333 elif isinstance(record_filter, str):
334 sql += "AND slice_hrn ="+ ' \''+record_filter+'\''
336 #elif isinstance(record_filter, int):
337 #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})
338 #sql += " AND (%s) %s" % record_filter.sql("AND")
340 print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : sql %s record_filter %s %s" %(sql, record_filter , type(record_filter))
341 results = self.selectall(sql)
342 if isinstance(results, dict):