Some emails from LDAP are missing = set field to None.
[sfa.git] / sfa / senslab / slabpostgres.py
1 #import psycopg2
2 #import psycopg2.extensions
3 #psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
4 ## UNICODEARRAY not exported yet
5 #psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)
6 from sfa.util.config import Config
7 #from sfa.storage.table import SfaTable
8 from sfa.util.sfalogging import logger
9 # allow to run sfa2wsdl if this is missing (for mac)
10 import sys
11 #try: import pgdb
12 #except: print >> sys.stderr, "WARNING, could not import pgdb"
13 from sqlalchemy import create_engine
14 from sqlalchemy.orm import sessionmaker
15
16 from sqlalchemy import Column, Integer, String, DateTime
17 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
18 import sfa.storage.model as model
19
20 from sqlalchemy.ext.declarative import declarative_base
21 from sqlalchemy.orm import relationship, backref
22
23
24 from sqlalchemy import MetaData, Table
25 from sqlalchemy.exc import NoSuchTableError
26
27 #Dict holding the columns names of the table as keys
28 #and their type, used for creation of the table
29 slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
30
31 #Dict with all the specific senslab tables
32 tablenames_dict = {'slice_senslab': slice_table}
33
34 ##############################
35
36
37
38 SlabBase = declarative_base()
39
40
41
42
43 class SlabSliceDB (SlabBase):
44     __tablename__ = 'slice_senslab' 
45     record_id_user = Column(Integer, primary_key=True)
46     oar_job_id = Column( Integer,default = -1)
47     record_id_slice = Column(Integer)
48     slice_hrn = Column(String,nullable = False)
49     
50     def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None):
51         if record_id_slice: 
52             self.record_id_slice = record_id_slice
53         if slice_hrn:
54             self.slice_hrn = slice_hrn
55         if oar_job_id:
56             self.oar_job_id = oar_job_id
57         if slice_hrn:
58             self.slice_hrn = slice_hrn 
59         if record_id_user: 
60             self.record_id_user= record_id_user
61             
62     def __repr__(self):
63         result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s" % \
64                 (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice)
65         result += ">"
66         return result
67           
68             
69
70           
71 class SlabDB:
72     def __init__(self,config):
73         #self.config = Config()
74         #self.connection = None
75         self.sl_base = SlabBase
76         #self.init_create_query()
77           
78         dbname="slab_sfa"
79         # will be created lazily on-demand
80         self.slab_session = None
81         # the former PostgreSQL.py used the psycopg2 directly and was doing
82         #self.connection.set_client_encoding("UNICODE")
83         # it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
84         # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
85         # we indeed have /var/lib/pgsql/data/postgresql.conf where
86         # this setting is unset, it might be an angle to tweak that if need be
87         # try a unix socket first - omitting the hostname does the trick
88         unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
89             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
90         print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
91         # the TCP fallback method
92         tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
93             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
94         for url in [ unix_url, tcp_url ] :
95             try:
96                 self.slab_engine = create_engine (url,echo_pool=True,echo=True)
97                 self.check()
98                 self.url=url
99                 return
100             except:
101                 pass
102         self.slab_engine=None
103         raise Exception,"Could not connect to database"
104
105     def check (self):
106         self.slab_engine.execute ("select 1").scalar()
107
108
109     def session (self):
110         if self.slab_session is None:
111             Session=sessionmaker ()
112             self.slab_session=Session(bind=self.slab_engine)
113         return self.slab_session
114         
115         
116     def cursor(self):
117         if self.connection is None:
118             # (Re)initialize database connection
119             if psycopg2:
120                 try:
121                     # Try UNIX socket first                    
122                     self.connection = psycopg2.connect(user = 'sfa',
123                                                        password = 'sfa',
124                                                        database = 'sfa')
125                     #self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
126                                                        #password = self.config.SFA_PLC_DB_PASSWORD,
127                                                        #database = self.config.SFA_PLC_DB_NAME)
128                 except psycopg2.OperationalError:
129                     # Fall back on TCP
130                     self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
131                                                        password = self.config.SFA_PLC_DB_PASSWORD,
132                                                        database = self.config.SFA_PLC_DB_NAME,
133                                                        host = self.config.SFA_PLC_DB_HOST,
134                                                        port = self.config.SFA_PLC_DB_PORT)
135                 self.connection.set_client_encoding("UNICODE")
136             else:
137                 self.connection = pgdb.connect(user = self.config.SFA_PLC_DB_USER,
138                                                password = self.config.SFA_PLC_DB_PASSWORD,
139                                                host = "%s:%d" % (self.config.SFA_PLC_DB_HOST, self.config.SFA_PLC_DB_PORT),
140                                                database = self.config.SFA_PLC_DB_NAME)
141
142         return self.connection.cursor()
143         
144     #Close connection to database
145     def close(self):
146         if self.connection is not None:
147             self.connection.close()
148             self.connection = None
149             
150     def selectall(self, query,  hashref = True, key_field = None):
151         """
152         Return each row as a dictionary keyed on field name (like DBI
153         selectrow_hashref()). If key_field is specified, return rows
154         as a dictionary keyed on the specified field (like DBI
155         selectall_hashref()).
156
157         """
158         cursor = self.cursor()
159         cursor.execute(query)
160         rows = cursor.fetchall()
161         cursor.close()
162         self.connection.commit()
163
164         if hashref or key_field is not None:
165             # Return each row as a dictionary keyed on field name
166             # (like DBI selectrow_hashref()).
167             labels = [column[0] for column in cursor.description]
168             rows = [dict(zip(labels, row)) for row in rows]
169
170         if key_field is not None and key_field in labels:
171             # Return rows as a dictionary keyed on the specified field
172             # (like DBI selectall_hashref()).
173             return dict([(row[key_field], row) for row in rows])
174         else:
175             return rows
176         
177         
178     def exists(self, tablename):
179         """
180         Checks if the table specified as tablename exists.
181     
182         """
183         
184        
185         try:
186             metadata = MetaData (bind=self.slab_engine)
187             table=Table (tablename, metadata, autoload=True)
188            
189             return True
190         except NoSuchTableError:
191             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
192             return False
193         ##mark = self.cursor()
194         #sql = "SELECT * from pg_tables"
195         ##mark.execute(sql)
196         ##rows = mark.fetchall()
197         ##mark.close()
198         ##labels = [column[0] for column in mark.description]
199         ##rows = [dict(zip(labels, row)) for row in rows]
200         #rows = self.selectall(sql)
201         #rows = filter(lambda row: row['tablename'].startswith(tablename), rows)
202         #if rows:
203             #return True
204         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE ! tablename %s " %(tablename)
205         #return False
206     
207     def createtable(self, tablename ):
208         """
209         Creates the specifed table. Uses the global dictionnary holding the tablenames and
210         the table schema.
211     
212         """
213         #metadata = MetaData (bind=engine)
214         #table=Table (tablename, metadata, autoload=True)
215         #records = Table ( 'records', SlabBase.metadata,autoload=True )
216         #records = Table ( 'records', SlabBase.metadata,Column ('record_id', Integer, primary_key=True), )
217         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
218         #table.create(bind =engine)
219         SlabBase.metadata.create_all(slab_engine)
220         return
221     
222
223
224
225     def insert(self, table, columns,values):
226         """
227         Inserts data (values) into the columns of the specified table. 
228     
229         """
230         mark = self.cursor()
231         statement = "INSERT INTO " + table + \
232                     "(" + ",".join(columns) + ") " + \
233                     "VALUES(" + ", ".join(values) + ");"
234
235         mark.execute(statement) 
236         self.connection.commit()
237         mark.close()
238         self.close()
239         return
240     
241     #def insert_slab_slice(self, person_rec):
242         #"""
243         #Inserts information about a user and his slice into the slice table. 
244     
245         #"""
246         #sfatable = SfaTable()
247         #keys = slice_table.keys()
248         
249         ##returns a list of records from the sfa table (dicts)
250         ##the filters specified will return only one matching record, into a list of dicts
251         ##Finds the slice associated with the user (Senslabs slices  hrns contains the user hrn)
252
253         #userrecord = sfatable.find({'hrn': person_rec['hrn'], 'type':'user'})
254         #slicerec =  sfatable.find({'hrn': person_rec['hrn']+'_slice', 'type':'slice'})
255         #if slicerec :
256             #if (isinstance (userrecord, list)):
257                 #userrecord = userrecord[0]
258             #if (isinstance (slicerec, list)):
259                 #slicerec = slicerec[0]
260                 
261             #oar_dflt_jobid = -1
262             #values = [ str(oar_dflt_jobid), ' \''+ str(slicerec['hrn']) + '\'', str(userrecord['record_id']), str( slicerec['record_id'])]
263     
264             #self.insert('slice_senslab', keys, values)
265         #else :
266             #logger.error("Trying to import a not senslab slice")
267         #return
268         
269         
270     def update(self, table, column_names, values, whereclause, valueclause):
271         """
272         Updates a record in a given table. 
273     
274         """
275         #Creates the values string for the update SQL command
276         vclause = valueclause
277         if len(column_names) is not len(values):
278             return
279         else:
280             valueslist = []
281             valuesdict = dict(zip(column_names,values))
282             for k in valuesdict.keys():
283                 valuesdict[k] = str(valuesdict[k])
284                 #v = ' \''+ str(k) + '\''+ '='+' \''+ valuesdict[k]+'\''
285                 v = str(k) + '=' + valuesdict[k]
286                 valueslist.append(v)
287         if isinstance(vclause,str):
288             vclause = '\''+ vclause + '\''
289         statement = "UPDATE %s SET %s WHERE %s = %s" % \
290                     (table, ", ".join(valueslist), whereclause, vclause)
291         print>>sys.stderr,"\r\n \r\n SLABPOSTGRES.PY update statement %s valuesdict %s valueslist %s" %(statement,valuesdict,valueslist)
292         mark = self.cursor()
293         mark.execute(statement) 
294         self.connection.commit()
295         mark.close()
296         self.close()
297
298         return
299
300     #def update_senslab_slice(self, slice_rec):
301         #sfatable = SfaTable()
302         #hrn = str(slice_rec['hrn']) 
303         #userhrn = hrn.rstrip('_slice')
304         #userrecord = sfatable.find({'hrn': userhrn, 'type':'user'})
305         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY  update_senslab_slice : userrecord  %s slice_rec %s userhrn %s" %( userrecord, slice_rec, userhrn)
306         #if (isinstance (userrecord, list)):
307                 #userrecord = userrecord[0]
308         #columns = [ 'record_id_user', 'oar_job_id']
309         #values = [slice_rec['record_id_user'],slice_rec['oar_job_id']]
310         #self.update('slice',columns, values,'record_id_slice', slice_rec['record_id_slice'])
311         #return 
312         
313        
314     def find(self, tablename,record_filter = None, columns=None):  
315         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find :  record_filter %s %s columns %s %s" %( record_filter , type(record_filter),columns , type(columns))
316         if not columns:
317             columns = "*"
318         else:
319             columns = ",".join(columns)
320         sql = "SELECT %s FROM %s WHERE True " % (columns, tablename)
321         
322         #if isinstance(record_filter, (list, tuple, set)):
323             #ints = filter(lambda x: isinstance(x, (int, long)), record_filter)
324             #strs = filter(lambda x: isinstance(x, StringTypes), record_filter)
325             #record_filter = Filter(SfaRecord.all_fields, {'record_id': ints, 'hrn': strs})
326             #sql += "AND (%s) %s " % record_filter.sql("OR") 
327         #elif isinstance(record_filter, dict):
328             #record_filter = Filter(SfaRecord.all_fields, record_filter)        
329             #sql += " AND (%s) %s" % record_filter.sql("AND")
330         #elif isinstance(record_filter, StringTypes):
331             #record_filter = Filter(SfaRecord.all_fields, {'hrn':[record_filter]})    
332             #sql += " AND (%s) %s" % record_filter.sql("AND")
333         #elif isinstance(record_filter, int):
334             #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})    
335             #sql += " AND (%s) %s" % record_filter.sql("AND")
336        
337         if isinstance(record_filter, dict):
338             for k in record_filter.keys():
339                 #sql += "AND "+' \''+ str(k) + '\''+ '='+' \''+ str(record_filter[k])+'\''
340                 #sql += "AND "+ str(k) + '=' + str(record_filter[k])
341                 sql += "AND "+ str(k) +'='+' \''+ str(record_filter[k])+'\''
342         elif isinstance(record_filter, str):
343             sql += "AND slice_hrn ="+ ' \''+record_filter+'\''
344
345         #elif isinstance(record_filter, int):
346             #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})    
347             #sql += " AND (%s) %s" % record_filter.sql("AND")
348         sql +=  ";"
349         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : sql %s record_filter  %s %s" %(sql, record_filter , type(record_filter))
350         results = self.selectall(sql)
351         if isinstance(results, dict):
352             results = [results]
353         return results
354        
355
356
357 from sfa.util.config import Config
358
359 slab_alchemy= SlabDB(Config())
360 slab_engine=slab_alchemy.slab_engine
361 slab_dbsession=slab_alchemy.session()