Merge branch 'master' into senslab2
[sfa.git] / sfa / senslab / slabpostgres.py
1 import sys
2
3 from sqlalchemy import create_engine, and_
4 from sqlalchemy.orm import sessionmaker
5
6 from sfa.util.config import Config
7 from sfa.util.sfalogging import logger
8
9 from sqlalchemy import Column, Integer, String, DateTime
10 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
11 import sfa.storage.model as model
12
13 from sqlalchemy.ext.declarative import declarative_base
14 from sqlalchemy.orm import relationship, backref
15
16
17 from sqlalchemy.dialects import postgresql
18
19 from sqlalchemy import MetaData, Table
20 from sqlalchemy.exc import NoSuchTableError
21
22 from sqlalchemy import String
23
24 #Dict holding the columns names of the table as keys
25 #and their type, used for creation of the table
26 slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
27
28 #Dict with all the specific senslab tables
29 tablenames_dict = {'slice_senslab': slice_table}
30
31 ##############################
32
33
34
35 SlabBase = declarative_base()
36
37
38
39
40 class SliceSenslab (SlabBase):
41     __tablename__ = 'slice_senslab' 
42     #record_id_user = Column(Integer, primary_key=True)
43     slice_hrn = Column(String,primary_key=True)
44     peer_authority = Column( String,nullable = True)
45     record_id_slice = Column(Integer)    
46     record_id_user = Column(Integer)
47     oar_job_id = Column( Integer,default = -1)
48     node_list = Column(postgresql.ARRAY(String), nullable =True)
49     
50     def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None,peer_authority=None):
51         self.node_list = []
52         if record_id_slice: 
53             self.record_id_slice = record_id_slice
54         if slice_hrn:
55             self.slice_hrn = slice_hrn
56         if oar_job_id:
57             self.oar_job_id = oar_job_id
58         if slice_hrn:
59             self.slice_hrn = slice_hrn 
60         if record_id_user: 
61             self.record_id_user= record_id_user
62         if peer_authority:
63             self.peer_authority = peer_authority
64             
65             
66     def __repr__(self):
67         result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s  node_list =%s peer_authority =%s"% \
68                 (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list, self.peer_authority)
69         result += ">"
70         return result
71           
72     def dumpquerytodict(self):
73         dict = {'slice_hrn':self.slice_hrn,
74         'peer_authority':self.peer_authority,
75         'record_id':self.record_id_slice, 
76         'record_id_user':self.record_id_user,
77         'oar_job_id':self.oar_job_id, 
78         'record_id_slice':self.record_id_slice, 
79          'node_list':self.node_list}
80         return dict       
81 #class PeerSenslab(SlabBase):
82     #__tablename__ = 'peer_senslab' 
83     #peername = Column(String, nullable = False)
84     #peerid = Column( Integer,primary_key=True)
85     
86     #def __init__ (self,peername = None ):
87         #if peername:
88             #self.peername = peername
89             
90             
91       #def __repr__(self):
92         #result="<Peer id  =%s, Peer name =%s" % (self.peerid, self.peername)
93         #result += ">"
94         #return result
95           
96 class SlabDB:
97     def __init__(self,config):
98         self.sl_base = SlabBase
99         dbname="slab_sfa"
100         # will be created lazily on-demand
101         self.slab_session = None
102         # the former PostgreSQL.py used the psycopg2 directly and was doing
103         #self.connection.set_client_encoding("UNICODE")
104         # it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
105         # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
106         # we indeed have /var/lib/pgsql/data/postgresql.conf where
107         # this setting is unset, it might be an angle to tweak that if need be
108         # try a unix socket first - omitting the hostname does the trick
109         unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
110             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
111         print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
112         # the TCP fallback method
113         tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
114             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
115         for url in [ unix_url, tcp_url ] :
116             try:
117                 self.slab_engine = create_engine (url,echo_pool=True,echo=True)
118                 self.check()
119                 self.url=url
120                 return
121             except:
122                 pass
123         self.slab_engine=None
124         raise Exception,"Could not connect to database"
125     
126     
127     
128     def check (self):
129         self.slab_engine.execute ("select 1").scalar()
130         
131         
132         
133     def session (self):
134         if self.slab_session is None:
135             Session=sessionmaker ()
136             self.slab_session=Session(bind=self.slab_engine)
137         return self.slab_session
138         
139         
140    
141         
142     #Close connection to database
143     def close(self):
144         if self.connection is not None:
145             self.connection.close()
146             self.connection = None
147             
148    
149         
150         
151     def exists(self, tablename):
152         """
153         Checks if the table specified as tablename exists.
154     
155         """
156        
157         try:
158             metadata = MetaData (bind=self.slab_engine)
159             table=Table (tablename, metadata, autoload=True)
160            
161             return True
162         except NoSuchTableError:
163             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
164             return False
165        
166     
167     def createtable(self, tablename ):
168         """
169         Creates the specifed table. Uses the global dictionnary holding the tablenames and
170         the table schema.
171     
172         """
173
174         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
175         SlabBase.metadata.create_all(slab_engine)
176         return
177     
178     #Updates the job_id and the nodes list 
179     #The nodes list is never erased.
180     def update_job(self, hrn, job_id= None, nodes = None ):
181         slice_rec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).first()
182         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES  update_job slice_rec %s"%(slice_rec)
183         if job_id is not None:
184             slice_rec.oar_job_id = job_id
185         if nodes is not None :
186             slice_rec.node_list = nodes
187         slab_dbsession.commit()
188
189     def find (self, name = None, filter_dict = None):
190         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_dict %s"%(filter_dict)
191
192         #Filter_by can not handle more than one argument, hence these functions
193         def filter_id_user(query, user_id):
194             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_id_user"
195             return query.filter_by(record_id_user = user_id)
196         
197         def filter_job(query, job):
198             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job "
199             return query.filter_by(oar_job_id = job)
200         
201         def filer_id_slice (query, id_slice):
202             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filer_id_slice"
203             return query.filter_by(record_id_slice = id_slice)
204         
205         def filter_slice_hrn(query, hrn):
206             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_slice_hrn"
207             return query.filter_by(slice_hrn = hrn)
208         
209         
210         extended_filter = {'record_id_user': filter_id_user,
211          'oar_job_id':filter_job,
212          'record_id_slice': filer_id_slice,
213          'slice_hrn': filter_slice_hrn}
214          
215         Q = slab_dbsession.query(SliceSenslab) 
216         
217         if filter_dict is not None:
218             for k in filter_dict:
219                 try:
220                   newQ= extended_filter[k](Q, filter_dict[k])
221                   Q = newQ
222                 except KeyError:
223                     print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!"
224         print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first())
225         rec = Q.first()
226         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  rec %s" %(rec)
227         return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))
228         #reclist = []
229         ##for rec in Q.all():
230             #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])))
231         #return reclist
232         
233        
234
235
236 from sfa.util.config import Config
237
238 slab_alchemy= SlabDB(Config())
239 slab_engine=slab_alchemy.slab_engine
240 slab_dbsession=slab_alchemy.session()