Thrash commit to use SQLalchemy and create senslab_slice table.
[sfa.git] / sfa / senslab / slabpostgres.py
1 import psycopg2
2 import psycopg2.extensions
3 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
4 # UNICODEARRAY not exported yet
5 psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)
6 from sfa.util.config import Config
7 from sfa.storage.table import SfaTable
8 from sfa.util.sfalogging import logger
9 # allow to run sfa2wsdl if this is missing (for mac)
10 import sys
11 try: import pgdb
12 except: print >> sys.stderr, "WARNING, could not import pgdb"
13
14
15 from sqlalchemy import Column, Integer, String, DateTime
16 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
17 from sfa.storage.model import Base
18 from sqlalchemy.ext.declarative import declarative_base
19 from sqlalchemy.orm import relationship, backref
20
21 from sfa.storage.alchemy import dbsession, engine 
22 from sqlalchemy import MetaData, Table
23 from sqlalchemy.exc import NoSuchTableError
24
25 #Dict holding the columns names of the table as keys
26 #and their type, used for creation of the table
27 slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
28
29 #Dict with all the specific senslab tables
30 tablenames_dict = {'slice_senslab': slice_table}
31
32 ##############################
33
34
35
36 SlabBase = declarative_base(metadata= Base.metadata, bind=engine)
37
38
39
40
41 class SlabSliceDB (SlabBase):
42     __tablename__ = 'slice_senslab'
43     record_id_user = Column(Integer, ForeignKey("records.record_id"), primary_key=True)
44     oar_job_id = Column( Integer,default = -1)
45     record_id_slice = Column(Integer)
46     slice_hrn = Column(String,nullable = False)
47     
48     def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None):
49         if record_id_slice: 
50             self.record_id_slice = record_id_slice
51         if slice_hrn:
52             self.slice_hrn = slice_hrn
53         if oar_job_id:
54             self.oar_job_id = oar_job_id
55         if slice_hrn:
56             self.slice_hrn = slice_hrn 
57         if record_id_user: 
58             self.record_id_user= record_id_user
59             
60             
61
62           
63 class SlabDB:
64     def __init__(self):
65         self.config = Config()
66         self.connection = None
67         self.init_create_query()
68         
69     def init_create_query(self):
70         sfatable = SfaTable()
71         slice_table['record_id_user'] =  slice_table['record_id_user'].replace("X",sfatable.tablename)
72         print sys.stderr, " \r\n \r\n slice_table %s ",slice_table 
73         
74     def cursor(self):
75         if self.connection is None:
76             # (Re)initialize database connection
77             if psycopg2:
78                 try:
79                     # Try UNIX socket first                    
80                     self.connection = psycopg2.connect(user = 'sfa',
81                                                        password = 'sfa',
82                                                        database = 'sfa')
83                     #self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
84                                                        #password = self.config.SFA_PLC_DB_PASSWORD,
85                                                        #database = self.config.SFA_PLC_DB_NAME)
86                 except psycopg2.OperationalError:
87                     # Fall back on TCP
88                     self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
89                                                        password = self.config.SFA_PLC_DB_PASSWORD,
90                                                        database = self.config.SFA_PLC_DB_NAME,
91                                                        host = self.config.SFA_PLC_DB_HOST,
92                                                        port = self.config.SFA_PLC_DB_PORT)
93                 self.connection.set_client_encoding("UNICODE")
94             else:
95                 self.connection = pgdb.connect(user = self.config.SFA_PLC_DB_USER,
96                                                password = self.config.SFA_PLC_DB_PASSWORD,
97                                                host = "%s:%d" % (self.config.SFA_PLC_DB_HOST, self.config.SFA_PLC_DB_PORT),
98                                                database = self.config.SFA_PLC_DB_NAME)
99
100         return self.connection.cursor()
101         
102     #Close connection to database
103     def close(self):
104         if self.connection is not None:
105             self.connection.close()
106             self.connection = None
107             
108     def selectall(self, query,  hashref = True, key_field = None):
109         """
110         Return each row as a dictionary keyed on field name (like DBI
111         selectrow_hashref()). If key_field is specified, return rows
112         as a dictionary keyed on the specified field (like DBI
113         selectall_hashref()).
114
115         """
116         cursor = self.cursor()
117         cursor.execute(query)
118         rows = cursor.fetchall()
119         cursor.close()
120         self.connection.commit()
121
122         if hashref or key_field is not None:
123             # Return each row as a dictionary keyed on field name
124             # (like DBI selectrow_hashref()).
125             labels = [column[0] for column in cursor.description]
126             rows = [dict(zip(labels, row)) for row in rows]
127
128         if key_field is not None and key_field in labels:
129             # Return rows as a dictionary keyed on the specified field
130             # (like DBI selectall_hashref()).
131             return dict([(row[key_field], row) for row in rows])
132         else:
133             return rows
134         
135         
136     def exists(self, tablename):
137         """
138         Checks if the table specified as tablename exists.
139     
140         """
141         
142        
143         try:
144             metadata = MetaData (bind=engine)
145             table=Table (tablename, metadata, autoload=True)
146             return True
147         except NoSuchTableError:
148             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
149             return False
150         ##mark = self.cursor()
151         #sql = "SELECT * from pg_tables"
152         ##mark.execute(sql)
153         ##rows = mark.fetchall()
154         ##mark.close()
155         ##labels = [column[0] for column in mark.description]
156         ##rows = [dict(zip(labels, row)) for row in rows]
157         #rows = self.selectall(sql)
158         #rows = filter(lambda row: row['tablename'].startswith(tablename), rows)
159         #if rows:
160             #return True
161         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE ! tablename %s " %(tablename)
162         #return False
163     
164     def createtable(self, tablename ):
165         """
166         Creates the specifed table. Uses the global dictionnary holding the tablenames and
167         the table schema.
168     
169         """
170         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES CREATETABLE " 
171         SlabBase.metadata.create_all(engine)
172         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES CREATETABLE  YAAAAAAAAAAY" 
173         #mark = self.cursor()
174         #tablelist =[]
175         #if tablename not in tablenames_dict:
176             #logger.error("Tablename unknown - creation failed")
177             #return
178             
179         #T  = tablenames_dict[tablename]
180         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY T %s" %(T)
181         #for k in T.keys(): 
182             #tmp = str(k) +' ' + T[k]
183             #tablelist.append(tmp)
184             
185         #end_of_statement = ",".join(tablelist)
186         
187         #statement = "CREATE TABLE " + tablename + " ("+ end_of_statement +");"
188         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY statement  %s" %(statement)
189         ##template = "CREATE INDEX %s_%s_idx ON %s (%s);"
190         ##indexes = [template % ( self.tablename, field, self.tablename, field) \
191                     ##for field in ['hrn', 'type', 'authority', 'peer_authority', 'pointer']]
192         ##IF EXISTS doenst exist in postgres < 8.2
193         #try: 
194             #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY AVANT LE DROP IF EXISTS"
195             #mark.execute('DROP TABLE IF EXISTS ' + tablename +';')
196             
197         #except:
198             #try:
199                 #mark.execute('DROP TABLE' + tablename +';')
200             #except:
201                 #pass
202         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY AVANT EXECUTE statement ",statement     
203         #mark.execute(statement)
204         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY OUEEEEEEEEEEEEEEEEEEEEEE "   
205         ##for index in indexes:
206             ##self.db.do(index)
207         #self.connection.commit()
208         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY COMMIT DE OUFGUEDIN "  
209         #mark.close()
210         #self.close()
211         return
212     
213
214
215
216     def insert(self, table, columns,values):
217         """
218         Inserts data (values) into the columns of the specified table. 
219     
220         """
221         mark = self.cursor()
222         statement = "INSERT INTO " + table + \
223                     "(" + ",".join(columns) + ") " + \
224                     "VALUES(" + ", ".join(values) + ");"
225
226         mark.execute(statement) 
227         self.connection.commit()
228         mark.close()
229         self.close()
230         return
231     
232     def insert_slab_slice(self, person_rec):
233         """
234         Inserts information about a user and his slice into the slice table. 
235     
236         """
237         sfatable = SfaTable()
238         keys = slice_table.keys()
239         
240         #returns a list of records from the sfa table (dicts)
241         #the filters specified will return only one matching record, into a list of dicts
242         #Finds the slice associated with the user (Senslabs slices  hrns contains the user hrn)
243
244         userrecord = sfatable.find({'hrn': person_rec['hrn'], 'type':'user'})
245         slicerec =  sfatable.find({'hrn': person_rec['hrn']+'_slice', 'type':'slice'})
246         if slicerec :
247             if (isinstance (userrecord, list)):
248                 userrecord = userrecord[0]
249             if (isinstance (slicerec, list)):
250                 slicerec = slicerec[0]
251                 
252             oar_dflt_jobid = -1
253             values = [ str(oar_dflt_jobid), ' \''+ str(slicerec['hrn']) + '\'', str(userrecord['record_id']), str( slicerec['record_id'])]
254     
255             self.insert('slice_senslab', keys, values)
256         else :
257             logger.error("Trying to import a not senslab slice")
258         return
259         
260         
261     def update(self, table, column_names, values, whereclause, valueclause):
262         """
263         Updates a record in a given table. 
264     
265         """
266         #Creates the values string for the update SQL command
267         vclause = valueclause
268         if len(column_names) is not len(values):
269             return
270         else:
271             valueslist = []
272             valuesdict = dict(zip(column_names,values))
273             for k in valuesdict.keys():
274                 valuesdict[k] = str(valuesdict[k])
275                 #v = ' \''+ str(k) + '\''+ '='+' \''+ valuesdict[k]+'\''
276                 v = str(k) + '=' + valuesdict[k]
277                 valueslist.append(v)
278         if isinstance(vclause,str):
279             vclause = '\''+ vclause + '\''
280         statement = "UPDATE %s SET %s WHERE %s = %s" % \
281                     (table, ", ".join(valueslist), whereclause, vclause)
282         print>>sys.stderr,"\r\n \r\n SLABPOSTGRES.PY update statement %s valuesdict %s valueslist %s" %(statement,valuesdict,valueslist)
283         mark = self.cursor()
284         mark.execute(statement) 
285         self.connection.commit()
286         mark.close()
287         self.close()
288
289         return
290
291     def update_senslab_slice(self, slice_rec):
292         sfatable = SfaTable()
293         hrn = str(slice_rec['hrn']) 
294         userhrn = hrn.rstrip('_slice')
295         userrecord = sfatable.find({'hrn': userhrn, 'type':'user'})
296         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY  update_senslab_slice : userrecord  %s slice_rec %s userhrn %s" %( userrecord, slice_rec, userhrn)
297         if (isinstance (userrecord, list)):
298                 userrecord = userrecord[0]
299         columns = [ 'record_id_user', 'oar_job_id']
300         values = [slice_rec['record_id_user'],slice_rec['oar_job_id']]
301         self.update('slice',columns, values,'record_id_slice', slice_rec['record_id_slice'])
302         return 
303         
304        
305     def find(self, tablename,record_filter = None, columns=None):  
306         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find :  record_filter %s %s columns %s %s" %( record_filter , type(record_filter),columns , type(columns))
307         if not columns:
308             columns = "*"
309         else:
310             columns = ",".join(columns)
311         sql = "SELECT %s FROM %s WHERE True " % (columns, tablename)
312         
313         #if isinstance(record_filter, (list, tuple, set)):
314             #ints = filter(lambda x: isinstance(x, (int, long)), record_filter)
315             #strs = filter(lambda x: isinstance(x, StringTypes), record_filter)
316             #record_filter = Filter(SfaRecord.all_fields, {'record_id': ints, 'hrn': strs})
317             #sql += "AND (%s) %s " % record_filter.sql("OR") 
318         #elif isinstance(record_filter, dict):
319             #record_filter = Filter(SfaRecord.all_fields, record_filter)        
320             #sql += " AND (%s) %s" % record_filter.sql("AND")
321         #elif isinstance(record_filter, StringTypes):
322             #record_filter = Filter(SfaRecord.all_fields, {'hrn':[record_filter]})    
323             #sql += " AND (%s) %s" % record_filter.sql("AND")
324         #elif isinstance(record_filter, int):
325             #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})    
326             #sql += " AND (%s) %s" % record_filter.sql("AND")
327        
328         if isinstance(record_filter, dict):
329             for k in record_filter.keys():
330                 #sql += "AND "+' \''+ str(k) + '\''+ '='+' \''+ str(record_filter[k])+'\''
331                 #sql += "AND "+ str(k) + '=' + str(record_filter[k])
332                 sql += "AND "+ str(k) +'='+' \''+ str(record_filter[k])+'\''
333         elif isinstance(record_filter, str):
334             sql += "AND slice_hrn ="+ ' \''+record_filter+'\''
335
336         #elif isinstance(record_filter, int):
337             #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})    
338             #sql += " AND (%s) %s" % record_filter.sql("AND")
339         sql +=  ";"
340         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : sql %s record_filter  %s %s" %(sql, record_filter , type(record_filter))
341         results = self.selectall(sql)
342         if isinstance(results, dict):
343             results = [results]
344         return results
345