Merge branch 'master' into senslab2
[sfa.git] / sfa / senslab / slabpostgres.py
1 import sys
2
3 from sqlalchemy import create_engine, and_
4 from sqlalchemy.orm import sessionmaker
5
6 from sfa.util.config import Config
7 from sfa.util.sfalogging import logger
8
9 from sqlalchemy import Column, Integer, String, DateTime
10 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
11 import sfa.storage.model as model
12
13 from sqlalchemy.ext.declarative import declarative_base
14 from sqlalchemy.orm import relationship, backref
15
16
17 from sqlalchemy.dialects import postgresql
18
19 from sqlalchemy import MetaData, Table
20 from sqlalchemy.exc import NoSuchTableError
21
22 from sqlalchemy import String
23
24 #Dict holding the columns names of the table as keys
25 #and their type, used for creation of the table
26 slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
27
28 #Dict with all the specific senslab tables
29 tablenames_dict = {'slice_senslab': slice_table}
30
31 ##############################
32
33
34
35 SlabBase = declarative_base()
36
37
38
39
40 class SliceSenslab (SlabBase):
41     __tablename__ = 'slice_senslab' 
42     #record_id_user = Column(Integer, primary_key=True)
43     slice_hrn = Column(String,primary_key=True)
44     peer_authority = Column( String,nullable = True)
45     record_id_slice = Column(Integer)    
46     record_id_user = Column(Integer)
47     oar_job_id = Column( Integer,default = -1)
48     node_list = Column(postgresql.ARRAY(String), nullable =True)
49     
50     def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None,peer_authority=None):
51         self.node_list = []
52         if record_id_slice: 
53             self.record_id_slice = record_id_slice
54         if slice_hrn:
55             self.slice_hrn = slice_hrn
56         if oar_job_id:
57             self.oar_job_id = oar_job_id
58         if slice_hrn:
59             self.slice_hrn = slice_hrn 
60         if record_id_user: 
61             self.record_id_user= record_id_user
62         if peer_authority:
63             self.peer_authority = peer_authority
64             
65             
66     def __repr__(self):
67         result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s  node_list =%s peer_authority =%s"% \
68                 (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list, self.peer_authority)
69         result += ">"
70         return result
71           
72     def dump_sqlalchemyobj_to_dict(self):
73         dict = {'slice_hrn':self.slice_hrn,
74         'peer_authority':self.peer_authority,
75         'record_id':self.record_id_slice, 
76         'record_id_user':self.record_id_user,
77         'oar_job_id':self.oar_job_id, 
78         'record_id_slice':self.record_id_slice, 
79          'node_list':self.node_list}
80         return dict       
81 #class PeerSenslab(SlabBase):
82     #__tablename__ = 'peer_senslab' 
83     #peername = Column(String, nullable = False)
84     #peerid = Column( Integer,primary_key=True)
85     
86     #def __init__ (self,peername = None ):
87         #if peername:
88             #self.peername = peername
89             
90             
91       #def __repr__(self):
92         #result="<Peer id  =%s, Peer name =%s" % (self.peerid, self.peername)
93         #result += ">"
94         #return result
95           
96 class SlabDB:
97     def __init__(self,config, debug = False):
98         self.sl_base = SlabBase
99         dbname="slab_sfa"
100         if debug == True :
101             l_echo_pool = True
102             l_echo=True 
103         else :
104             l_echo_pool = False
105             l_echo = False 
106         # will be created lazily on-demand
107         self.slab_session = None
108         # the former PostgreSQL.py used the psycopg2 directly and was doing
109         #self.connection.set_client_encoding("UNICODE")
110         # it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
111         # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
112         # we indeed have /var/lib/pgsql/data/postgresql.conf where
113         # this setting is unset, it might be an angle to tweak that if need be
114         # try a unix socket first - omitting the hostname does the trick
115         unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
116             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
117         print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
118         # the TCP fallback method
119         tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
120             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
121         for url in [ unix_url, tcp_url ] :
122             try:
123                 self.slab_engine = create_engine (url,echo_pool = l_echo_pool, echo = l_echo)
124                 self.check()
125                 self.url=url
126                 return
127             except:
128                 pass
129         self.slab_engine=None
130         raise Exception,"Could not connect to database"
131     
132     
133     
134     def check (self):
135         self.slab_engine.execute ("select 1").scalar()
136         
137         
138         
139     def session (self):
140         if self.slab_session is None:
141             Session=sessionmaker ()
142             self.slab_session=Session(bind=self.slab_engine)
143         return self.slab_session
144         
145         
146    
147         
148     #Close connection to database
149     def close(self):
150         if self.connection is not None:
151             self.connection.close()
152             self.connection = None
153             
154    
155         
156         
157     def exists(self, tablename):
158         """
159         Checks if the table specified as tablename exists.
160     
161         """
162        
163         try:
164             metadata = MetaData (bind=self.slab_engine)
165             table=Table (tablename, metadata, autoload=True)
166            
167             return True
168         except NoSuchTableError:
169             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
170             return False
171        
172     
173     def createtable(self, tablename ):
174         """
175         Creates the specifed table. Uses the global dictionnary holding the tablenames and
176         the table schema.
177     
178         """
179
180         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
181         SlabBase.metadata.create_all(slab_engine)
182         return
183     
184     #Updates the job_id and the nodes list 
185     #The nodes list is never erased.
186     def update_job(self, hrn, job_id= None, nodes = None ):
187         slice_rec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).first()
188         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES  update_job slice_rec %s"%(slice_rec)
189         if job_id is not None:
190             slice_rec.oar_job_id = job_id
191         if nodes is not None :
192             slice_rec.node_list = nodes
193         slab_dbsession.commit()
194
195     def find (self, name = None, filter_dict = None):
196         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_dict %s"%(filter_dict)
197
198         #Filter_by can not handle more than one argument, hence these functions
199         def filter_id_user(query, user_id):
200             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_id_user"
201             return query.filter_by(record_id_user = user_id)
202         
203         def filter_job(query, job):
204             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job "
205             return query.filter_by(oar_job_id = job)
206         
207         def filer_id_slice (query, id_slice):
208             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filer_id_slice"
209             return query.filter_by(record_id_slice = id_slice)
210         
211         def filter_slice_hrn(query, hrn):
212             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_slice_hrn"
213             return query.filter_by(slice_hrn = hrn)
214         
215         
216         extended_filter = {'record_id_user': filter_id_user,
217          'oar_job_id':filter_job,
218          'record_id_slice': filer_id_slice,
219          'slice_hrn': filter_slice_hrn}
220          
221         Q = slab_dbsession.query(SliceSenslab) 
222         
223         if filter_dict is not None:
224             for k in filter_dict:
225                 try:
226                   newQ= extended_filter[k](Q, filter_dict[k])
227                   Q = newQ
228                 except KeyError:
229                     print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!"
230         print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first())
231         rec = Q.first()
232         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  rec %s" %(rec)
233         return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))
234         #reclist = []
235         ##for rec in Q.all():
236             #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])))
237         #return reclist
238         
239        
240
241
242 from sfa.util.config import Config
243
244 slab_alchemy= SlabDB(Config())
245 slab_engine=slab_alchemy.slab_engine
246 slab_dbsession=slab_alchemy.session()