3 from sqlalchemy import create_engine, and_
4 from sqlalchemy.orm import sessionmaker
6 from sfa.util.config import Config
7 from sfa.util.sfalogging import logger
9 from sqlalchemy import Column, Integer, String, DateTime
10 from sqlalchemy import Table, Column, MetaData, join, ForeignKey
11 import sfa.storage.model as model
13 from sqlalchemy.ext.declarative import declarative_base
14 from sqlalchemy.orm import relationship, backref
17 from sqlalchemy.dialects import postgresql
19 from sqlalchemy import MetaData, Table
20 from sqlalchemy.exc import NoSuchTableError
22 from sqlalchemy import String
# Schema for the senslab-specific slice table: column name -> SQL type/constraint.
# Used when the table has to be (re)created.
slice_table = {
    'record_id_user': 'integer PRIMARY KEY references X ON DELETE CASCADE ON UPDATE CASCADE',
    'oar_job_id': 'integer DEFAULT -1',
    'record_id_slice': 'integer',
    'slice_hrn': 'text NOT NULL',
}

# Registry of every senslab-specific table, keyed by table name.
tablenames_dict = {'slice_senslab': slice_table}
31 ##############################
35 SlabBase = declarative_base()
# ORM model for the 'slice_senslab' table: binds a slice (slice_hrn) to its
# OAR job id(s) plus bookkeeping record ids from the SFA registry.
class SliceSenslab (SlabBase):
__tablename__ = 'slice_senslab'
#record_id_user = Column(Integer, primary_key=True)
# Multiple primary key aka composite primary key
# so that we can have several job id for a given slice hrn
slice_hrn = Column(String,primary_key=True)
oar_job_id = Column( Integer, primary_key=True)
# authority hrn when the slice comes from a federated peer -- presumably;
# NOTE(review): confirm semantics against callers setting peer_authority
peer_authority = Column( String,nullable = True)
# record ids of the matching slice / user records in the SFA registry
record_id_slice = Column(Integer)
record_id_user = Column(Integer)
#oar_job_id = Column( Integer,default = -1)
# PostgreSQL ARRAY of strings -- presumably the node ids/hostnames reserved
# by the OAR job; NOTE(review): confirm element format with add_job callers
node_list = Column(postgresql.ARRAY(String), nullable =True)
def __init__(self, slice_hrn=None, oar_job_id=None, record_id_slice=None,
             record_id_user=None, peer_authority=None, node_list=None):
    """
    Build a slice row.

    :param slice_hrn: hrn of the slice (part of the composite primary key)
    :param oar_job_id: OAR job id bound to the slice (other half of the key)
    :param record_id_slice: registry record id of the slice
    :param record_id_user: registry record id of the slice's user
    :param peer_authority: peer authority hrn, if any
    :param node_list: reserved nodes; new optional parameter with a default,
        so the old call signature keeps working
    """
    self.record_id_slice = record_id_slice
    # fix: the original assigned self.slice_hrn twice; once is enough
    self.slice_hrn = slice_hrn
    self.oar_job_id = oar_job_id
    self.record_id_user = record_id_user
    self.peer_authority = peer_authority
    # backward-compatible addition: lets callers (e.g. SlabDB.add_job) set
    # the reserved node list at construction time
    self.node_list = node_list
def __repr__(self):
    """Debug-friendly representation of the row (used in log messages)."""
    # NOTE(review): only the body was visible in the excerpt; the `def` line
    # and the trailing return were reconstructed -- the statement references
    # `self`, so it can only live inside a method. Confirm against history.
    result="<Record id user =%s, slice hrn=%s, oar_job id=%s,Record id slice =%s node_list =%s peer_authority =%s"% \
        (self.record_id_user, self.slice_hrn, self.oar_job_id, self.record_id_slice, self.node_list, self.peer_authority)
    return result
def dump_sqlalchemyobj_to_dict(self):
    """
    Return the row's columns as a plain dictionary.

    NOTE(review): 'record_id' duplicates 'record_id_slice' -- kept as-is for
    backward compatibility with existing consumers of the dict.
    """
    # fixes: renamed the local from `dict` (which shadowed the builtin) and
    # added the missing return -- the original fell off the end, returning
    # None despite the method's name.
    record = {'slice_hrn': self.slice_hrn,
              'peer_authority': self.peer_authority,
              'record_id': self.record_id_slice,
              'record_id_user': self.record_id_user,
              'oar_job_id': self.oar_job_id,
              'record_id_slice': self.record_id_slice,
              'node_list': self.node_list}
    return record
85 #class PeerSenslab(SlabBase):
86 #__tablename__ = 'peer_senslab'
87 #peername = Column(String, nullable = False)
88 #peerid = Column( Integer,primary_key=True)
90 #def __init__ (self,peername = None ):
92 #self.peername = peername
96 #result="<Peer id =%s, Peer name =%s" % (self.peerid, self.peername)
# Constructor: records the declarative base, defers session creation, and
# opens a SQLAlchemy engine to the senslab database -- unix socket first,
# then a TCP fallback.
# NOTE(review): `dbname`, `l_echo_pool`, `l_echo` and `sys` are not defined
# anywhere visible in this excerpt; they presumably come from elided setup
# lines -- confirm against the full file.
# NOTE(review): python-2-only syntax below (`print >>`, `raise Exception,`).
def __init__(self,config, debug = False):
self.sl_base = SlabBase
# will be created lazily on-demand
self.slab_session = None
# the former PostgreSQL.py used the psycopg2 directly and was doing
#self.connection.set_client_encoding("UNICODE")
# it's unclear how to achieve this in sqlalchemy, nor if it's needed at all
# http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
# we indeed have /var/lib/pgsql/data/postgresql.conf where
# this setting is unset, it might be an angle to tweak that if need be
# try a unix socket first - omitting the hostname does the trick
unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"%\
(config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_PORT,dbname)
print >>sys.stderr, " \r\n \r\n SLAPOSTGRES INIT unix_url %s" %(unix_url)
# the TCP fallback method
tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"%\
(config.SFA_DB_USER,config.SFA_DB_PASSWORD,config.SFA_DB_HOST,config.SFA_DB_PORT,dbname)
for url in [ unix_url, tcp_url ] :
self.slab_engine = create_engine (url,echo_pool = l_echo_pool, echo = l_echo)
# NOTE(review): the loop/try structure appears truncated here -- the two
# lines below look like the error path taken when neither URL works.
self.slab_engine=None
raise Exception,"Could not connect to database"
# NOTE(review): presumably the body of a connectivity-check helper (the
# enclosing `def` line is not visible in this excerpt) -- issues a trivial
# "select 1" to verify the engine can actually reach the database.
self.slab_engine.execute ("select 1").scalar()
def session(self):
    """
    Lazily create (first call only) and return the cached SQLAlchemy
    session bound to self.slab_engine.

    NOTE(review): the `def` line was not visible in this excerpt; the method
    name is inferred from the module-level call `slab_alchemy.session()`
    below -- confirm against the full file.
    """
    if self.slab_session is None:
        Session = sessionmaker()
        self.slab_session = Session(bind=self.slab_engine)
    return self.slab_session
#Close connection to database
# NOTE(review): this looks like the body of a close()/disconnect() method --
# the enclosing `def` line is not visible in this excerpt. `self.connection`
# is never set in the visible __init__ (which uses self.slab_engine);
# confirm which attribute this should really close.
if self.connection is not None:
self.connection.close()
self.connection = None
def exists(self, tablename):
    """
    Checks if the table specified as tablename exists.

    :param tablename: name of the table to probe for
    :returns: True when the table can be reflected from the database,
        False otherwise.
    """
    # fixes: the docstring text was unquoted, the `except` had no matching
    # `try`, and both return statements were missing in the original.
    try:
        # reflecting the table raises NoSuchTableError when it is absent
        metadata = MetaData (bind=self.slab_engine)
        table=Table (tablename, metadata, autoload=True)
        return True
    except NoSuchTableError:
        print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES EXISTS NOPE! tablename %s " %(tablename)
        return False
def createtable(self, tablename ):
    """
    Creates the specified table, using the SlabBase declarative metadata
    (which already registers every model deriving from SlabBase).

    NOTE(review): create_all() creates *all* missing SlabBase tables, not
    just *tablename* -- the parameter is unused but kept for interface
    compatibility with existing callers.
    """
    # fix: the docstring text was unquoted (raw prose) in the original.
    # relies on the module-level `slab_engine` global defined at file bottom
    print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES createtable SlabBase.metadata.sorted_tables %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine)
    SlabBase.metadata.create_all(slab_engine)
def add_job (self, hrn, job_id, nodes = None ):
    """
    Insert a SliceSenslab row binding OAR job *job_id* (and the optional
    *nodes* list) to the slice named *hrn*, pulling the slice and user
    record ids from the global registry dbsession.

    NOTE(review): assumes the slice exists in the registry and has at least
    one researcher -- a None slice_rec or empty reg_researchers would raise
    here; confirm with callers.
    """
    slice_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(hrn)).first()
    user_record = slice_rec.reg_researchers
    slab_slice = SliceSenslab(slice_hrn = hrn, oar_job_id = job_id, \
        record_id_slice=slice_rec.record_id, record_id_user= user_record[0].record_id)
    # BUG FIX: the original passed nodes_list=nodes, a keyword that
    # SliceSenslab.__init__ does not accept (the column is node_list),
    # raising TypeError. Set the attribute directly instead.
    slab_slice.node_list = nodes
    logger.debug("============SLABPOSTGRES \t add_job slab_slice %s" %(slab_slice))
    slab_dbsession.add(slab_slice)
    slab_dbsession.commit()
def delete_job (self, hrn, job_id):
    """Remove the slice row identified by (slice_hrn, oar_job_id), then commit."""
    # narrow down to the single row matching both halves of the composite key
    matching = slab_dbsession.query(SliceSenslab)
    matching = matching.filter_by(slice_hrn = hrn)
    matching = matching.filter_by(oar_job_id = job_id)
    doomed_row = matching.first()
    slab_dbsession.delete(doomed_row)
    slab_dbsession.commit()
204 #Updates the job_id and the nodes list
205 #The nodes list is never erased.
def update_job(self, hrn, job_id, nodes = None ):
    """Refresh the row for (hrn, job_id): drop the stale entry, then re-insert it with *nodes*."""
    # delete-then-add rather than update-in-place, so the node list is
    # rebuilt from scratch along with the job id
    self.delete_job(hrn, job_id)
    self.add_job(hrn, job_id, nodes)
213 #slice_rec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = hrn).first()
214 #print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES update_job slice_rec %s"%(slice_rec)
215 #if job_id is not None:
216 #slice_rec.oar_job_id = job_id
217 #if nodes is not None :
218 #slice_rec.node_list = nodes
219 #slab_dbsession.commit()
# Look up a slice row by the criteria in filter_dict and return selected
# columns as a plain dict.
# NOTE(review): `name` is unused in the visible code; `rec` (used at the
# end) is never assigned here and `newQ` is assigned but never read -- the
# method body appears truncated in this excerpt; confirm against the full
# file before touching the logic.
def find (self, name = None, filter_dict = None):
print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_dict %s"%(filter_dict)

#Filter_by can not handle more than one argument, hence these functions
# each helper applies exactly one filter_by() criterion to the query
def filter_id_user(query, user_id):
print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_id_user"
return query.filter_by(record_id_user = user_id)

def filter_job(query, job):
print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_job "
return query.filter_by(oar_job_id = job)

# NOTE(review): name is a typo for filter_id_slice; internal-only, and the
# extended_filter table below uses the same spelling, so it still works
def filer_id_slice (query, id_slice):
print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filer_id_slice"
return query.filter_by(record_id_slice = id_slice)

def filter_slice_hrn(query, hrn):
print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find filter_slice_hrn"
return query.filter_by(slice_hrn = hrn)

# dispatch table: filter_dict key -> helper applying that criterion
extended_filter = {'record_id_user': filter_id_user,
'oar_job_id':filter_job,
'record_id_slice': filer_id_slice,
'slice_hrn': filter_slice_hrn}

Q = slab_dbsession.query(SliceSenslab)

if filter_dict is not None:
for k in filter_dict:
# NOTE(review): each iteration filters the ORIGINAL Q, so only the
# last key effectively applies -- presumably intended to chain on newQ
newQ= extended_filter[k](Q, filter_dict[k])

print>>sys.stderr, "\r\n \t\t FFFFFFFFFFFFFFFFUUUUUUUUFUFUFU!!!!!!!!"
print>>sys.stderr, " HEEEEEEEEEEEEY %s " %(Q.first())

print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES find rec %s" %(rec)
return dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn]))
261 ##for rec in Q.all():
262 #reclist.append(dict(zip(['record_id_user','oar_job_id', 'record_id_slice','slice_hrn'],[rec.record_id_user,rec.oar_job_id,rec.record_id_slice, rec.slice_hrn])))
# Module-level bootstrap: one shared SlabDB instance plus its engine and
# session, imported by the rest of the senslab driver (and used as globals
# by the methods above).
# NOTE(review): Config is already imported at the top of the file; this
# re-import is redundant but harmless.
from sfa.util.config import Config

slab_alchemy= SlabDB(Config())
slab_engine=slab_alchemy.slab_engine
slab_dbsession=slab_alchemy.session()