4db5b8988d3bc0ecdc10f39830ab18e8d2b160d3
[sfa.git] / sfa / senslab / slabpostgres.py
1 import sys
2
3 from sqlalchemy import create_engine, and_
4 from sqlalchemy.orm import sessionmaker
5
6 from sfa.util.config import Config
7 from sfa.util.sfalogging import logger
8
9 from sqlalchemy import Column, Integer, String, DateTime
10 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
11 import sfa.storage.model as model
12 from sfa.storage.model import RegSlice
13 from sqlalchemy.ext.declarative import declarative_base
14 from sqlalchemy.orm import relationship, backref
15
16
17 from sqlalchemy.dialects import postgresql
18
19 from sqlalchemy import MetaData, Table
20 from sqlalchemy.exc import NoSuchTableError
21
22 from sqlalchemy import String
23 from sfa.storage.alchemy import dbsession
24
25 #Dict holding the columns names of the table as keys
26 #and their type, used for creation of the table
27 slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
28
29 #Dict with all the specific senslab tables
30 tablenames_dict = {'slice_senslab': slice_table}
31
32 ##############################
33
34
35
36 SlabBase = declarative_base()
37
38
39
40
41 class SliceSenslab (SlabBase):
42     __tablename__ = 'slice_senslab' 
43     #record_id_user = Column(Integer, primary_key=True)
44
45     slice_hrn = Column(String,primary_key=True)
46     peer_authority = Column( String,nullable = True)
47     record_id_slice = Column(Integer)    
48     record_id_user = Column(Integer) 
49
50     #oar_job_id = Column( Integer,default = -1)
51     #node_list = Column(postgresql.ARRAY(String), nullable =True)
52     
53     def __init__ (self, slice_hrn =None, record_id_slice=None, record_id_user= None,peer_authority=None):
54         if record_id_slice: 
55             self.record_id_slice = record_id_slice
56         if slice_hrn:
57             self.slice_hrn = slice_hrn
58         if record_id_user: 
59             self.record_id_user= record_id_user
60         if peer_authority:
61             self.peer_authority = peer_authority
62             
63             
64     def __repr__(self):
65         result="<Record id user =%s, slice hrn=%s, Record id slice =%s ,peer_authority =%s"% \
66                 (self.record_id_user, self.slice_hrn, self.record_id_slice, self.peer_authority)
67         result += ">"
68         return result
69           
70     def dump_sqlalchemyobj_to_dict(self):
71         dict = {'slice_hrn':self.slice_hrn,
72         'peer_authority':self.peer_authority,
73         'record_id':self.record_id_slice, 
74         'record_id_user':self.record_id_user,
75         'record_id_slice':self.record_id_slice, }
76         return dict 
77           
78           
79 class JobSenslab (SlabBase):
80     __tablename__ = 'job_senslab' 
81     #record_id_user = Column(Integer, primary_key=True)
82     # Multiple primary key aka composite primary key
83     # so that we can have several job id for a given slice hrn
84     slice_hrn = Column(String,ForeignKey('slice_senslab.slice_hrn'))
85     oar_job_id = Column( Integer, primary_key=True)
86     record_id_slice = Column(Integer)    
87     record_id_user = Column(Integer) 
88     
89     #oar_job_id = Column( Integer,default = -1)
90     node_list = Column(postgresql.ARRAY(String), nullable =True)
91     
92     slice_complete = relationship("SliceSenslab", backref=backref('job_senslab', order_by=slice_hrn))
93     
94     def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None):
95         self.node_list = []
96         if record_id_slice: 
97             self.record_id_slice = record_id_slice
98         if slice_hrn:
99             self.slice_hrn = slice_hrn
100         if oar_job_id:
101             self.oar_job_id = oar_job_id
102         if record_id_user: 
103             self.record_id_user= record_id_user
104            
105             
106     def __repr__(self):
107         result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s  node_list =%s "% \
108                 (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list)
109         result += ">"
110         return result
111           
112     def dump_sqlalchemyobj_to_dict(self):
113         dict = {'slice_hrn':self.slice_hrn,
114         'record_id_user':self.record_id_user,
115         'oar_job_id':self.oar_job_id, 
116         'record_id_slice':self.record_id_slice, 
117          'node_list':self.node_list}
118         return dict       
119
120 #class PeerSenslab(SlabBase):
121     #__tablename__ = 'peer_senslab' 
122     #peername = Column(String, nullable = False)
123     #peerid = Column( Integer,primary_key=True)
124     
125     #def __init__ (self,peername = None ):
126         #if peername:
127             #self.peername = peername
128             
129             
130       #def __repr__(self):
131         #result="<Peer id  =%s, Peer name =%s" % (self.peerid, self.peername)
132         #result += ">"
133         #return result
134           
135 class SlabDB:
136     def __init__(self,config, debug = False):
137         self.sl_base = SlabBase
138         dbname="slab_sfa"
139         if debug == True :
140             l_echo_pool = True
141             l_echo=True 
142         else :
143             l_echo_pool = False
144             l_echo = False 
145         # will be created lazily on-demand
146         self.slab_session = None
147         # the former PostgreSQL.py used the psycopg2 directly and was doing
148         #self.connection.set_client_encoding("UNICODE")
149         # it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
150         # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
151         # we indeed have /var/lib/pgsql/data/postgresql.conf where
152         # this setting is unset, it might be an angle to tweak that if need be
153         # try a unix socket first - omitting the hostname does the trick
154         unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
155             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
156         print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
157         # the TCP fallback method
158         tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
159             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
160         for url in [ unix_url, tcp_url ] :
161             try:
162                 self.slab_engine = create_engine (url,echo_pool = l_echo_pool, echo = l_echo)
163                 self.check()
164                 self.url=url
165                 return
166             except:
167                 pass
168         self.slab_engine=None
169         raise Exception,"Could not connect to database"
170     
171     
172     
173     def check (self):
174         self.slab_engine.execute ("select 1").scalar()
175         
176         
177         
178     def session (self):
179         if self.slab_session is None:
180             Session=sessionmaker ()
181             self.slab_session=Session(bind=self.slab_engine)
182         return self.slab_session
183         
184         
185    
186         
187     #Close connection to database
188     def close(self):
189         if self.connection is not None:
190             self.connection.close()
191             self.connection = None
192             
193    
194         
195         
196     def exists(self, tablename):
197         """
198         Checks if the table specified as tablename exists.
199     
200         """
201        
202         try:
203             metadata = MetaData (bind=self.slab_engine)
204             table=Table (tablename, metadata, autoload=True)
205            
206             return True
207         except NoSuchTableError:
208             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
209             return False
210        
211     
212     def createtable(self, tablename ):
213         """
214         Creates the specifed table. Uses the global dictionnary holding the tablenames and
215         the table schema.
216     
217         """
218
219         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
220         SlabBase.metadata.create_all(slab_engine)
221         return
222     
223     def add_job (self, hrn, job_id, nodes = None ):
224         job_row = slab_dbsession.query(JobSenslab).filter_by(oar_job_id=job_id).first()
225         if job_row is None:
226             slice_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(hrn)).first()
227             if slice_rec : 
228                 user_record = slice_rec.reg_researchers   
229                 slab_slice = JobSenslab(slice_hrn = hrn, oar_job_id = job_id, \
230                     record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
231                 #slab_slice = SliceSenslab(slice_hrn = hrn, oar_job_id = job_id, \
232                     #record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
233                 logger.debug("============SLABPOSTGRES \t add_job slab_slice %s" %(slab_slice))
234                 slab_dbsession.add(slab_slice)
235                 slab_slice.node_list = nodes
236                 slab_dbsession.commit()
237         else:
238             return
239      
240         
241     def delete_job (self, hrn, job_id):
242         #slab_slice = 
243         slab_dbsession.query(JobSenslab).filter_by(slice_hrn = hrn).filter_by(oar_job_id =job_id).delete()
244         #slab_dbsession.delete(slab_slice)
245         slab_dbsession.commit()
246         
247     #Updates the job_id and the nodes list 
248     #The nodes list is never erased.
249
250
251
252     def find (self, name = None, filter_dict = None):
253         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_dict %s"%(filter_dict)
254
255         #Filter_by can not handle more than one argument, hence these functions
256         def filter_id_user(query, user_id):
257             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_id_user"
258             return query.filter_by(record_id_user = user_id)
259         
260         def filter_job(query, job):
261             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job "
262             return query.filter_by(oar_job_id = job)
263         
264         def filer_id_slice (query, id_slice):
265             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filer_id_slice"
266             return query.filter_by(record_id_slice = id_slice)
267         
268         def filter_slice_hrn(query, hrn):
269             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_slice_hrn"
270             return query.filter_by(slice_hrn = hrn)
271         
272         
273         extended_filter = {'record_id_user': filter_id_user,
274          'oar_job_id':filter_job,
275          'record_id_slice': filer_id_slice,
276          'slice_hrn': filter_slice_hrn}
277          
278         Q = slab_dbsession.query(SliceSenslab) 
279         
280         if filter_dict is not None:
281             for k in filter_dict:
282                 try:
283                   newQ= extended_filter[k](Q, filter_dict[k])
284                   Q = newQ
285                 except KeyError:
286                     print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!"
287         print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first())
288         rec = Q.first()
289         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  rec %s" %(rec)
290         return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))
291         #reclist = []
292         ##for rec in Q.all():
293             #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])))
294         #return reclist
295         
296        
297
298
299 from sfa.util.config import Config
300
301 slab_alchemy= SlabDB(Config())
302 slab_engine=slab_alchemy.slab_engine
303 slab_dbsession=slab_alchemy.session()