Merge branch 'master' into senslab2
[sfa.git] / sfa / senslab / slabpostgres.py
1 import sys
2
3 from sqlalchemy import create_engine, and_
4 from sqlalchemy.orm import sessionmaker
5
6 from sfa.util.config import Config
7 from sfa.util.sfalogging import logger
8
9 from sqlalchemy import Column, Integer, String, DateTime
10 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
11 import sfa.storage.model as model
12
13 from sqlalchemy.ext.declarative import declarative_base
14 from sqlalchemy.orm import relationship, backref
15
16
17 from sqlalchemy.dialects import postgresql
18
19 from sqlalchemy import MetaData, Table
20 from sqlalchemy.exc import NoSuchTableError
21
22 from sqlalchemy import String
23
24 #Dict holding the columns names of the table as keys
25 #and their type, used for creation of the table
26 slice_table = {'record_id_user':'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
27
28 #Dict with all the specific senslab tables
29 tablenames_dict = {'slice_senslab': slice_table}
30
31 ##############################
32
33
34
35 SlabBase = declarative_base()
36
37
38
39
40 class SliceSenslab (SlabBase):
41     __tablename__ = 'slice_senslab' 
42     record_id_user = Column(Integer, primary_key=True)
43     oar_job_id = Column( Integer,default = -1)
44     record_id_slice = Column(Integer)
45     slice_hrn = Column(String,nullable = False)
46     node_list = Column(postgresql.ARRAY(String), nullable =True)
47     
48     def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None):
49         self.node_list = []
50         if record_id_slice: 
51             self.record_id_slice = record_id_slice
52         if slice_hrn:
53             self.slice_hrn = slice_hrn
54         if oar_job_id:
55             self.oar_job_id = oar_job_id
56         if slice_hrn:
57             self.slice_hrn = slice_hrn 
58         if record_id_user: 
59             self.record_id_user= record_id_user
60             
61             
62     def __repr__(self):
63         result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s  node_list =%s" % \
64                 (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list)
65         result += ">"
66         return result
67           
68     def dumpquerytodict(self):
69         dict = {'slice_hrn':self.slice_hrn,
70         'record_id':self.record_id_slice, 
71         'record_id_user':self.record_id_user,
72         'oar_job_id':self.oar_job_id, 
73          'record_id_slice':self.record_id_slice, 
74          'slice_hrn':self.slice_hrn,
75          'node_list':self.node_list}
76         return dict       
77 #class PeerSenslab(SlabBase):
78     #__tablename__ = 'peer_senslab' 
79     #peername = Column(String, nullable = False)
80     #peerid = Column( Integer,primary_key=True)
81     
82     #def __init__ (self,peername = None ):
83         #if peername:
84             #self.peername = peername
85             
86             
87       #def __repr__(self):
88         #result="<Peer id  =%s, Peer name =%s" % (self.peerid, self.peername)
89         #result += ">"
90         #return result
91           
92 class SlabDB:
93     def __init__(self,config):
94         self.sl_base = SlabBase
95         dbname="slab_sfa"
96         # will be created lazily on-demand
97         self.slab_session = None
98         # the former PostgreSQL.py used the psycopg2 directly and was doing
99         #self.connection.set_client_encoding("UNICODE")
100         # it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
101         # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
102         # we indeed have /var/lib/pgsql/data/postgresql.conf where
103         # this setting is unset, it might be an angle to tweak that if need be
104         # try a unix socket first - omitting the hostname does the trick
105         unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
106             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
107         print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
108         # the TCP fallback method
109         tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
110             (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
111         for url in [ unix_url, tcp_url ] :
112             try:
113                 self.slab_engine = create_engine (url,echo_pool=True,echo=True)
114                 self.check()
115                 self.url=url
116                 return
117             except:
118                 pass
119         self.slab_engine=None
120         raise Exception,"Could not connect to database"
121     
122     
123     
124     def check (self):
125         self.slab_engine.execute ("select 1").scalar()
126         
127         
128         
129     def session (self):
130         if self.slab_session is None:
131             Session=sessionmaker ()
132             self.slab_session=Session(bind=self.slab_engine)
133         return self.slab_session
134         
135         
136    
137         
138     #Close connection to database
139     def close(self):
140         if self.connection is not None:
141             self.connection.close()
142             self.connection = None
143             
144    
145         
146         
147     def exists(self, tablename):
148         """
149         Checks if the table specified as tablename exists.
150     
151         """
152        
153         try:
154             metadata = MetaData (bind=self.slab_engine)
155             table=Table (tablename, metadata, autoload=True)
156            
157             return True
158         except NoSuchTableError:
159             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
160             return False
161        
162     
163     def createtable(self, tablename ):
164         """
165         Creates the specifed table. Uses the global dictionnary holding the tablenames and
166         the table schema.
167     
168         """
169
170         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
171         SlabBase.metadata.create_all(slab_engine)
172         return
173     
174     #Updates the job_id and the nodes list 
175     def update_job(self, hrn, job_id= None, nodes = None ):
176         slice_rec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).first()
177         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES  update_job slice_rec %s"%(slice_rec)
178         if job_id is not None:
179             slice_rec.oar_job_id = job_id
180         if nodes is not None :
181             slice_rec.node_list = nodes
182         slab_dbsession.commit()
183
184     def find (self, name = None, filter_dict = None):
185         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_dict %s"%(filter_dict)
186
187         #Filter_by can not handle more than one argument, hence these functions
188         def filter_id_user(query, user_id):
189             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_id_user"
190             return query.filter_by(record_id_user = user_id)
191         
192         def filter_job(query, job):
193             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job "
194             return query.filter_by(oar_job_id = job)
195         
196         def filer_id_slice (query, id_slice):
197             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filer_id_slice"
198             return query.filter_by(record_id_slice = id_slice)
199         
200         def filter_slice_hrn(query, hrn):
201             print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  filter_slice_hrn"
202             return query.filter_by(slice_hrn = hrn)
203         
204         
205         extended_filter = {'record_id_user': filter_id_user,
206          'oar_job_id':filter_job,
207          'record_id_slice': filer_id_slice,
208          'slice_hrn': filter_slice_hrn}
209          
210         Q = slab_dbsession.query(SliceSenslab) 
211         
212         if filter_dict is not None:
213             for k in filter_dict:
214                 try:
215                   newQ= extended_filter[k](Q, filter_dict[k])
216                   Q = newQ
217                 except KeyError:
218                     print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!"
219         print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first())
220         rec = Q.first()
221         print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  rec %s" %(rec)
222         return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))
223         #reclist = []
224         ##for rec in Q.all():
225             #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])))
226         #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find  reclist %s" %(reclist)
227         #return reclist
228         
229        
230
231
232 from sfa.util.config import Config
233
234 slab_alchemy= SlabDB(Config())
235 slab_engine=slab_alchemy.slab_engine
236 slab_dbsession=slab_alchemy.session()