Modified database slice_senslab table.
[sfa.git] / sfa / senslab / slabpostgres.py
1 import sys
2
3 from sqlalchemy import create_engine, and_
4 from sqlalchemy.orm import sessionmaker
5
6 from sfa.util.config import Config
7 from sfa.util.sfalogging import logger
8
9 from sqlalchemy import Column, Integer, String, DateTime
10 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
11 import sfa.storage.model as model
12
13 from sqlalchemy.ext.declarative import declarative_base
14 from sqlalchemy.orm import relationship, backref
15
16
17 from sqlalchemy.dialects import postgresql
18
19 from sqlalchemy import MetaData, Table
20 from sqlalchemy.exc import NoSuchTableError
21
22 from sqlalchemy import String
23
24 #Dict holding the columns names of the table as keys
25 #and their type, used for creation of the table
26 slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
27
28 #Dict with all the specific senslab tables
29 tablenames_dict = {'slice_senslab': slice_table}
30
31 ##############################
32
33
34
35 SlabBase = declarative_base()
36
37
38
39
40 class SliceSenslab (SlabBase):
41     __tablename__ = 'slice_senslab' 
42     #record_id_user = Column(Integer, primary_key=True)
43     # Multiple primary key aka composite primary key
44     # so that we can have several job id for a given slice hrn
45     slice_hrn = Column(String,primary_key=True)
46     oar_job_id = Column( Integer, primary_key=True)
47     peer_authority = Column( String,nullable = True)
48     record_id_slice = Column(Integer)    
49     record_id_user = Column(Integer) 
50
51     #oar_job_id = Column( Integer,default = -1)
52     node_list = Column(postgresql.ARRAY(String), nullable =True)
53     
54     def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None,peer_authority=None):
55         self.node_list = []
56         if record_id_slice: 
57             self.record_id_slice = record_id_slice
58         if slice_hrn:
59             self.slice_hrn = slice_hrn
60         if oar_job_id:
61             self.oar_job_id = oar_job_id
62         if slice_hrn:
63             self.slice_hrn = slice_hrn 
64         if record_id_user: 
65             self.record_id_user= record_id_user
66         if peer_authority:
67             self.peer_authority = peer_authority
68             
69             
70     def __repr__(self):
71         result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s  node_list =%s peer_authority =%s"% \
72                 (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list, self.peer_authority)
73         result += ">"
74         return result
75           
76     def dump_sqlalchemyobj_to_dict(self):
77         dict = {'slice_hrn':self.slice_hrn,
78         'peer_authority':self.peer_authority,
79         'record_id':self.record_id_slice, 
80         'record_id_user':self.record_id_user,
81         'oar_job_id':self.oar_job_id, 
82         'record_id_slice':self.record_id_slice, 
83          'node_list':self.node_list}
84         return dict       
85 #class PeerSenslab(SlabBase):
86     #__tablename__ = 'peer_senslab' 
87     #peername = Column(String, nullable = False)
88     #peerid = Column( Integer,primary_key=True)
89     
90     #def __init__ (self,peername = None ):
91         #if peername:
92             #self.peername = peername
93             
94             
95       #def __repr__(self):
96         #result="<Peer id  =%s, Peer name =%s" % (self.peerid, self.peername)
97         #result += ">"
98         #return result
99           
100 class SlabDB:
101     def __init__(self,config, debug = False):
102         self.sl_base = SlabBase
103         dbname="slab_sfa"
104         if debug == True :
105             l_echo_pool = True
106             l_echo=True 
107         else :
108             l_echo_pool = False
109             l_echo = False 
110         # will be created lazily on-demand
111         self.slab_session = None
112         # the former PostgreSQL.py used the psycopg2 directly and was doing
113         #self.connection.set_client_encoding("UNICODE")
114         # it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
115         # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
116         # we indeed have /var/lib/pgsql/data/postgresql.conf where
117         # this setting is unset, it might be an angle to tweak that if need be
118         # try a unix socket first - omitting the hostname does the trick
119         unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
120             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
121         print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
122         # the TCP fallback method
123         tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
124             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
125         for url in [ unix_url, tcp_url ] :
126             try:
127                 self.slab_engine = create_engine (url,echo_pool = l_echo_pool, echo = l_echo)
128                 self.check()
129                 self.url=url
130                 return
131             except:
132                 pass
133         self.slab_engine=None
134         raise Exception,"Could not connect to database"
135     
136     
137     
138     def check (self):
139         self.slab_engine.execute ("select 1").scalar()
140         
141         
142         
143     def session (self):
144         if self.slab_session is None:
145             Session=sessionmaker ()
146             self.slab_session=Session(bind=self.slab_engine)
147         return self.slab_session
148         
149         
150    
151         
152     #Close connection to database
153     def close(self):
154         if self.connection is not None:
155             self.connection.close()
156             self.connection = None
157             
158    
159         
160         
161     def exists(self, tablename):
162         """
163         Checks if the table specified as tablename exists.
164     
165         """
166        
167         try:
168             metadata = MetaData (bind=self.slab_engine)
169             table=Table (tablename, metadata, autoload=True)
170            
171             return True
172         except NoSuchTableError:
173             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
174             return False
175        
176     
177     def createtable(self, tablename ):
178         """
179         Creates the specifed table. Uses the global dictionnary holding the tablenames and
180         the table schema.
181     
182         """
183
184         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
185         SlabBase.metadata.create_all(slab_engine)
186         return
187     
188     def add_job (self, hrn, job_id, nodes = None ):
189         slice_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(hrn)).first()
190         if slice_rec : 
191             user_record = slice_rec.reg_researchers
192             slab_slice = SliceSenslab(slice_hrn = hrn, oar_job_id = job_id, \
193                 record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id, nodes_list = nodes)
194             logger.debug("============SLABPOSTGRES \t add_job slab_slice %s" %(slab_slice))
195             slab_dbsession.add(slab_slice)
196             slab_dbsession.commit()
197      
198         
199     def delete_job (self, hrn, job_id):
200         slab_slice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).filter_by(oar_job_id =job_id).first()
201         slab_dbsession.delete(slab_slice)
202         slab_dbsession.commit()
203         
204     #Updates the job_id and the nodes list 
205     #The nodes list is never erased.
206     def update_job(self, hrn, job_id, nodes = None ):
207         
208         if job_id == -1:
209             #Delete the job in DB
210             self.delete_job(hrn, job_id)
211         else :
212             self.add_job(hrn, job_id, nodes)
213         #slice_rec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).first()
214         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES  update_job slice_rec %s"%(slice_rec)
215         #if job_id is not None:
216             #slice_rec.oar_job_id = job_id
217         #if nodes is not None :
218             #slice_rec.node_list = nodes
219         #slab_dbsession.commit()
220
221     def find (self, name = None, filter_dict = None):
222         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_dict %s"%(filter_dict)
223
224         #Filter_by can not handle more than one argument, hence these functions
225         def filter_id_user(query, user_id):
226             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_id_user"
227             return query.filter_by(record_id_user = user_id)
228         
229         def filter_job(query, job):
230             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job "
231             return query.filter_by(oar_job_id = job)
232         
233         def filer_id_slice (query, id_slice):
234             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filer_id_slice"
235             return query.filter_by(record_id_slice = id_slice)
236         
237         def filter_slice_hrn(query, hrn):
238             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_slice_hrn"
239             return query.filter_by(slice_hrn = hrn)
240         
241         
242         extended_filter = {'record_id_user': filter_id_user,
243          'oar_job_id':filter_job,
244          'record_id_slice': filer_id_slice,
245          'slice_hrn': filter_slice_hrn}
246          
247         Q = slab_dbsession.query(SliceSenslab) 
248         
249         if filter_dict is not None:
250             for k in filter_dict:
251                 try:
252                   newQ= extended_filter[k](Q, filter_dict[k])
253                   Q = newQ
254                 except KeyError:
255                     print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!"
256         print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first())
257         rec = Q.first()
258         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  rec %s" %(rec)
259         return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))
260         #reclist = []
261         ##for rec in Q.all():
262             #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])))
263         #return reclist
264         
265        
266
267
268 from sfa.util.config import Config
269
270 slab_alchemy= SlabDB(Config())
271 slab_engine=slab_alchemy.slab_engine
272 slab_dbsession=slab_alchemy.session()