3 from sqlalchemy import create_engine, and_
4 from sqlalchemy.orm import sessionmaker
6 from sfa.util.config import Config
7 from sfa.util.sfalogging import logger
9 from sqlalchemy import Column, Integer, String, DateTime
10 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
11 import sfa.storage.model as model
12 from sfa.storage.model import RegSlice
13 from sqlalchemy.ext.declarative import declarative_base
14 from sqlalchemy.orm import relationship, backref
17 from sqlalchemy.dialects import postgresql
19 from sqlalchemy import MetaData, Table
20 from sqlalchemy.exc import NoSuchTableError
22 from sqlalchemy import String
23 from sfa.storage.alchemy import dbsession
# Columns of the slice_senslab table, keyed by column name; each value is
# the SQL type/constraint clause used when the table is created.
slice_table = {
    'record_id_user': 'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE',
    'oar_job_id': 'integer DEFAULT -1',
    'record_id_slice': 'integer',
    'slice_hrn': 'text NOT NULL',
}

# Every senslab-specific table, keyed by its table name.
tablenames_dict = {'slice_senslab': slice_table}
##############################
# Declarative base shared by all senslab ORM mappings below; its metadata
# is what createtable() passes to create_all().
SlabBase = declarative_base()
class SliceSenslab (SlabBase):
    """ORM mapping for the slice_senslab table: one row per slice,
    keyed by the slice hrn, holding the SFA record ids of the slice and
    of one of its users, plus an optional peer authority."""
    __tablename__ = 'slice_senslab'
    #record_id_user = Column(Integer, primary_key=True)
    # The slice hrn is the primary key of the table.
    slice_hrn = Column(String,primary_key=True)
    peer_authority = Column( String,nullable = True)
    record_id_slice = Column(Integer)
    record_id_user = Column(Integer)
    #oar_job_id = Column( Integer,default = -1)
    #node_list = Column(postgresql.ARRAY(String), nullable =True)

    def __init__ (self, slice_hrn =None, record_id_slice=None, record_id_user= None,peer_authority=None):
        """Store the provided identifiers on the new row; every field
        defaults to None so partially-filled rows can be created."""
        self.record_id_slice = record_id_slice
        self.slice_hrn = slice_hrn
        self.record_id_user= record_id_user
        self.peer_authority = peer_authority

    # NOTE(review): the enclosing `def __repr__(self):` line is not visible
    # in this chunk; the two lines below build its result string.
        result="<Record id user =%s, slice hrn=%s, Record id slice =%s ,peer_authority =%s"% \
        (self.record_id_user, self.slice_hrn, self.record_id_slice, self.peer_authority)

    def dump_sqlalchemyobj_to_dict(self):
        """Return the row's fields as a plain dict.
        NOTE(review): the return statement is not visible in this chunk;
        presumably `return dict` follows."""
        # NOTE(review): `dict` shadows the builtin, and 'record_id'
        # duplicates the value of 'record_id_slice'.
        dict = {'slice_hrn':self.slice_hrn,
        'peer_authority':self.peer_authority,
        'record_id':self.record_id_slice,
        'record_id_user':self.record_id_user,
        'record_id_slice':self.record_id_slice, }
class JobSenslab (SlabBase):
    """ORM mapping for the job_senslab table: one row per OAR job,
    linked to its slice through slice_hrn."""
    __tablename__ = 'job_senslab'
    #record_id_user = Column(Integer, primary_key=True)
    # Multiple primary key aka composite primary key
    # so that we can have several job id for a given slice hrn
    # NOTE(review): only oar_job_id is actually marked primary_key below;
    # slice_hrn is a plain ForeignKey column -- confirm the composite-key
    # comment still matches the schema.
    slice_hrn = Column(String,ForeignKey('slice_senslab.slice_hrn'))
    oar_job_id = Column( Integer, primary_key=True)
    record_id_slice = Column(Integer)
    record_id_user = Column(Integer)
    #oar_job_id = Column( Integer,default = -1)
    # List of nodes reserved by this job (nullable array of strings).
    node_list = Column(postgresql.ARRAY(String), nullable =True)
    # ORM relationship: job rows expose .slice_complete and slice rows get
    # a .job_senslab backref, ordered by slice_hrn.
    slice_complete = relationship("SliceSenslab", backref=backref('job_senslab', order_by=slice_hrn))

    def __init__ (self, slice_hrn =None, oar_job_id=None, record_id_slice=None, record_id_user= None):
        """Store the provided identifiers; all fields default to None."""
        self.record_id_slice = record_id_slice
        self.slice_hrn = slice_hrn
        self.oar_job_id = oar_job_id
        self.record_id_user= record_id_user

    # NOTE(review): the enclosing `def __repr__(self):` line is not visible
    # in this chunk; the two lines below build its result string.
        result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s node_list =%s "% \
        (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list)

    def dump_sqlalchemyobj_to_dict(self):
        """Return the row's fields as a plain dict.
        NOTE(review): the return statement is not visible in this chunk."""
        # NOTE(review): `dict` shadows the builtin name.
        dict = {'slice_hrn':self.slice_hrn,
        'record_id_user':self.record_id_user,
        'oar_job_id':self.oar_job_id,
        'record_id_slice':self.record_id_slice,
        'node_list':self.node_list}
120 #class PeerSenslab(SlabBase):
121 #__tablename__ = 'peer_senslab'
122 #peername = Column(String, nullable = False)
123 #peerid = Column( Integer,primary_key=True)
125 #def __init__ (self,peername = None ):
127 #self.peername = peername
131 #result="<Peer id =%s, Peer name =%s" % (self.peerid, self.peername)
    def __init__(self,config, debug = False):
        """Build the SQLAlchemy engine for the senslab database from the
        SFA configuration, trying a unix socket first and TCP second.
        NOTE(review): several original lines are missing from this chunk --
        in particular the definitions of `dbname`, `l_echo` and
        `l_echo_pool` used below, and the try/except wrapped around the
        engine creation loop.
        """
        self.sl_base = SlabBase
        # will be created lazily on-demand
        self.slab_session = None
        # the former PostgreSQL.py used the psycopg2 directly and was doing
        #self.connection.set_client_encoding("UNICODE")
        # it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
        # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
        # we indeed have /var/lib/pgsql/data/postgresql.conf where
        # this setting is unset, it might be an angle to tweak that if need be
        # try a unix socket first - omitting the hostname does the trick
        unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
            (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
        print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
        # the TCP fallback method
        tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
            (config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
        for url in [ unix_url, tcp_url ] :
            # NOTE(review): the try:/except around this call is not visible here.
            self.slab_engine = create_engine (url,echo_pool = l_echo_pool, echo = l_echo)
        # NOTE(review): the two lines below were presumably inside the
        # missing except-branch (give up if neither URL worked).
        self.slab_engine=None
        raise Exception,"Could not connect to database"
        # NOTE(review): this sanity-check query probably belongs to a
        # separate method whose def line is missing from this chunk.
        self.slab_engine.execute ("select 1").scalar()
        # NOTE(review): the enclosing `def session(self):` line is not
        # visible in this chunk; this body lazily creates a session bound
        # to the engine on first use and caches it on the instance.
        if self.slab_session is None:
            Session=sessionmaker ()
            self.slab_session=Session(bind=self.slab_engine)
        return self.slab_session
        #Close connection to database
        # NOTE(review): the enclosing method's def line is not visible in
        # this chunk; also, self.connection is never assigned anywhere in
        # the visible code -- verify where it is set.
        if self.connection is not None:
            self.connection.close()
            self.connection = None
    def exists(self, tablename):
        """
        Checks if the table specified as tablename exists.
        """
        # NOTE(review): the try: paired with the except below, and the
        # return statements (presumably True/False), are not visible in
        # this chunk. Table reflection (autoload=True) raises
        # NoSuchTableError when the table is absent.
        metadata = MetaData (bind=self.slab_engine)
        table=Table (tablename, metadata, autoload=True)
        except NoSuchTableError:
            print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
    def createtable(self, tablename ):
        """
        Creates the specifed table. Uses the global dictionnary holding the tablenames and
        """
        # NOTE(review): the rest of the docstring and several lines are
        # missing from this chunk. Also `slab_engine` below is the
        # module-level global defined at the bottom of the file, not
        # self.slab_engine -- confirm that is intended.
        print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
        SlabBase.metadata.create_all(slab_engine)
    def add_job (self, hrn, job_id, nodes = None ):
        """Record an OAR job for the slice named hrn: look up the SFA
        slice record and its first researcher, insert a JobSenslab row
        with their record ids, attach the node list and commit."""
        job_row = slab_dbsession.query(JobSenslab).filter_by(oar_job_id=job_id).first()
        # NOTE(review): the line after the query above is missing from this
        # chunk; job_row is unused as shown -- possibly an
        # `if job_row is None:` guard lived there.
        slice_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(hrn)).first()
        user_record = slice_rec.reg_researchers
        slab_slice = JobSenslab(slice_hrn = hrn, oar_job_id = job_id, \
            record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
        #slab_slice = SliceSenslab(slice_hrn = hrn, oar_job_id = job_id, \
        #record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
        logger.debug("============SLABPOSTGRES \t add_job slab_slice %s" %(slab_slice))
        slab_dbsession.add(slab_slice)
        # node_list is set after add(); the pending row is flushed on commit.
        slab_slice.node_list = nodes
        slab_dbsession.commit()
    def delete_job (self, hrn, job_id):
        """Delete the job_senslab row matching both the slice hrn and the
        OAR job id, then commit.
        NOTE(review): one original line before the query is not visible in
        this chunk."""
        slab_dbsession.query(JobSenslab).filter_by(slice_hrn = hrn).filter_by(oar_job_id =job_id).delete()
        #slab_dbsession.delete(slab_slice)
        slab_dbsession.commit()
247 #Updates the job_id and the nodes list
248 #The nodes list is never erased.
    def find (self, name = None, filter_dict = None):
        """Query the SliceSenslab table, narrowing it with each key of
        filter_dict through the matching helper below, and return the
        first match as a plain dict.
        NOTE(review): `name` is unused in the visible code, and several
        lines are missing from this chunk (notably the one binding `rec`,
        presumably `rec = Q.first()`, and whatever propagates `newQ` back
        into `Q`)."""
        print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_dict %s"%(filter_dict)
        #Filter_by can not handle more than one argument, hence these functions
        def filter_id_user(query, user_id):
            # Restrict to rows owned by this user record id.
            print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_id_user"
            return query.filter_by(record_id_user = user_id)
        def filter_job(query, job):
            # Restrict to rows for this OAR job id.
            print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job "
            return query.filter_by(oar_job_id = job)
        # NOTE(review): name is misspelled ("filer" for "filter") but is
        # only referenced through the dispatch table below.
        def filer_id_slice (query, id_slice):
            # Restrict to rows for this slice record id.
            print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filer_id_slice"
            return query.filter_by(record_id_slice = id_slice)
        def filter_slice_hrn(query, hrn):
            # Restrict to rows for this slice hrn.
            print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_slice_hrn"
            return query.filter_by(slice_hrn = hrn)
        # Dispatch table: filter_dict key -> helper applying that filter.
        extended_filter = {'record_id_user': filter_id_user,
        'oar_job_id':filter_job,
        'record_id_slice': filer_id_slice,
        'slice_hrn': filter_slice_hrn}
        Q = slab_dbsession.query(SliceSenslab)
        if filter_dict is not None:
            for k in filter_dict:
                newQ= extended_filter[k](Q, filter_dict[k])
        print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!"
        print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first())
        # NOTE(review): `rec` is bound on a line not visible in this chunk.
        print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find rec %s" %(rec)
        return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))
        ##for rec in Q.all():
        #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])))
# Module-level singletons shared by the rest of the senslab code: one
# SlabDB helper built from the SFA configuration, plus shortcuts to its
# engine and (lazily created) session.
# Config is already imported at the top of this file, so the redundant
# `from sfa.util.config import Config` that used to live here was dropped.
slab_alchemy= SlabDB(Config())
slab_engine=slab_alchemy.slab_engine
slab_dbsession=slab_alchemy.session()