Merge Master in geni-v3 conflict resolution
[sfa.git] / sfa / senslab / slabpostgres.py
index ac9f994..ca21776 100644 (file)
-import psycopg2
-import psycopg2.extensions
-psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
-# UNICODEARRAY not exported yet
-psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
 from sfa.util.config import Config
-from sfa.util.table import SfaTable
 from sfa.util.sfalogging import logger
-# allow to run sfa2wsdl if this is missing (for mac)
-import sys
-try: import pgdb
-except: print >> sys.stderr, "WARNING, could not import pgdb"
+
+from sqlalchemy import Column, Integer, String
+from sqlalchemy import Table, MetaData
+from sqlalchemy.ext.declarative import declarative_base
+
+from sqlalchemy.dialects import postgresql
+
+from sqlalchemy.exc import NoSuchTableError
+
 
 #Dict holding the columns names of the table as keys
 #and their type, used for creation of the table
-slice_table = {'record_id_user':'integer PRIMARY KEY references sfa ON DELETE CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
+slice_table = {'record_id_user': 'integer PRIMARY KEY references X ON DELETE \
+CASCADE ON UPDATE CASCADE','oar_job_id':'integer DEFAULT -1',  \
+'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'}
 
 #Dict with all the specific senslab tables
-tablenames_dict = {'slice': slice_table}
+tablenames_dict = {'slab_xp': slice_table}
 
-class SlabDB:
-    def __init__(self):
-        self.config = Config()
-        self.connection = None
 
-    def cursor(self):
-        if self.connection is None:
-            # (Re)initialize database connection
-            if psycopg2:
-                try:
-                    # Try UNIX socket first                    
-                    self.connection = psycopg2.connect(user = 'sfa',
-                                                       password = 'sfa',
-                                                       database = 'sfa')
-                    #self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
-                                                       #password = self.config.SFA_PLC_DB_PASSWORD,
-                                                       #database = self.config.SFA_PLC_DB_NAME)
-                except psycopg2.OperationalError:
-                    # Fall back on TCP
-                    self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER,
-                                                       password = self.config.SFA_PLC_DB_PASSWORD,
-                                                       database = self.config.SFA_PLC_DB_NAME,
-                                                       host = self.config.SFA_PLC_DB_HOST,
-                                                       port = self.config.SFA_PLC_DB_PORT)
-                self.connection.set_client_encoding("UNICODE")
-            else:
-                self.connection = pgdb.connect(user = self.config.SFA_PLC_DB_USER,
-                                               password = self.config.SFA_PLC_DB_PASSWORD,
-                                               host = "%s:%d" % (self.config.SFA_PLC_DB_HOST, self.config.SFA_PLC_DB_PORT),
-                                               database = self.config.SFA_PLC_DB_NAME)
+SlabBase = declarative_base()
+
 
-        return self.connection.cursor()
         
-    #Close connection to database
-    def close(self):
-        if self.connection is not None:
-            self.connection.close()
-            self.connection = None
-            
-    def selectall(self, query,  hashref = True, key_field = None):
-        """
-        Return each row as a dictionary keyed on field name (like DBI
-        selectrow_hashref()). If key_field is specified, return rows
-        as a dictionary keyed on the specified field (like DBI
-        selectall_hashref()).
+class SenslabXP (SlabBase):
+    """ SQL alchemy class to manipulate slice_senslab table in 
+    slab_sfa database.
+    
+    """
+    __tablename__ = 'slab_xp' 
 
-        """
-        cursor = self.cursor()
-        cursor.execute(query)
-        rows = cursor.fetchall()
-        cursor.close()
-        self.connection.commit()
 
-        if hashref or key_field is not None:
-            # Return each row as a dictionary keyed on field name
-            # (like DBI selectrow_hashref()).
-            labels = [column[0] for column in cursor.description]
-            rows = [dict(zip(labels, row)) for row in rows]
+    slice_hrn = Column(String)
+    job_id = Column(Integer, primary_key = True)
+    end_time = Column(Integer, nullable = False)
 
-        if key_field is not None and key_field in labels:
-            # Return rows as a dictionary keyed on the specified field
-            # (like DBI selectall_hashref()).
-            return dict([(row[key_field], row) for row in rows])
-        else:
-            return rows
-        
-        
-    def exists(self, tablename):
-        """
-        Checks if the table specified as tablename exists.
-    
-        """
-        #mark = self.cursor()
-        sql = "SELECT * from pg_tables"
-        #mark.execute(sql)
-        #rows = mark.fetchall()
-        #mark.close()
-        #labels = [column[0] for column in mark.description]
-        #rows = [dict(zip(labels, row)) for row in rows]
-        rows = self.selectall(sql)
-        rows = filter(lambda row: row['tablename'].startswith(tablename), rows)
-        if rows:
-            return True
-        return False
+
+    #oar_job_id = Column( Integer,default = -1)
+    #node_list = Column(postgresql.ARRAY(String), nullable =True)
     
-    def createtable(self, tablename ):
+    def __init__ (self, slice_hrn =None, job_id=None,  end_time=None):
         """
-        Creates the specifed table. Uses the global dictionnary holding the tablenames and
-        the table schema.
-    
+        Defines a row of the slice_senslab table
         """
-        mark = self.cursor()
-        tablelist =[]
-        if tablename not in tablenames_dict:
-            logger.error("Tablename unknown - creation failed")
-            return
+        if slice_hrn:
+            self.slice_hrn = slice_hrn
+        if job_id :
+            self.job_id = job_id
+        if end_time:
+            self.end_time = end_time
             
-        T  = tablenames_dict[tablename]
-        
-        for k in T.keys(): 
-            tmp = str(k) +' ' + T[k]
-            tablelist.append(tmp)
             
-        end_of_statement = ",".join(tablelist)
-        
-        statement = "CREATE TABLE " + tablename + " ("+ end_of_statement +");"
-     
-        #template = "CREATE INDEX %s_%s_idx ON %s (%s);"
-        #indexes = [template % ( self.tablename, field, self.tablename, field) \
-                    #for field in ['hrn', 'type', 'authority', 'peer_authority', 'pointer']]
-        # IF EXISTS doenst exist in postgres < 8.2
-        try:
-            mark.execute('DROP TABLE IF EXISTS ' + tablename +';')
-        except:
+    def __repr__(self):
+        """Prints the SQLAlchemy record to the format defined
+        by the function.
+        """
+        result = "<slab_xp : slice_hrn = %s , job_id %s end_time = %s" \
+            %(self.slice_hrn, self.job_id, self.end_time)
+        result += ">"
+        return result
+          
+   
+          
+class SlabDB:
+    """ SQL Alchemy connection class.
+    From alchemy.py
+    """
+    def __init__(self, config, debug = False):
+        self.sl_base = SlabBase
+        dbname = "slab_sfa"
+        if debug == True :
+            l_echo_pool = True
+            l_echo = True 
+        else :
+            l_echo_pool = False
+            l_echo = False 
+
+        self.slab_session = None
+        # the former PostgreSQL.py used the psycopg2 directly and was doing
+        #self.connection.set_client_encoding("UNICODE")
+        # it's unclear how to achieve this in sqlalchemy, nor if it's needed 
+        # at all
+        # http://www.sqlalchemy.org/docs/dialects/postgresql.html#unicode
+        # we indeed have /var/lib/pgsql/data/postgresql.conf where
+        # this setting is unset, it might be an angle to tweak that if need be
+        # try a unix socket first - omitting the hostname does the trick
+        unix_url = "postgresql+psycopg2://%s:%s@:%s/%s"% \
+            (config.SFA_DB_USER, config.SFA_DB_PASSWORD, \
+                                    config.SFA_DB_PORT, dbname)
+
+        # the TCP fallback method
+        tcp_url = "postgresql+psycopg2://%s:%s@%s:%s/%s"% \
+            (config.SFA_DB_USER, config.SFA_DB_PASSWORD, config.SFA_DB_HOST, \
+                                    config.SFA_DB_PORT, dbname)
+        for url in [ unix_url, tcp_url ] :
             try:
-                mark.execute('DROP TABLE' + tablename +';')
+                self.slab_engine = create_engine (url, echo_pool = \
+                                            l_echo_pool, echo = l_echo)
+                self.check()
+                self.url = url
+                return
             except:
                 pass
-            
-        mark.execute(statement)
-        #for index in indexes:
-            #self.db.do(index)
-        self.connection.commit()
-        mark.close()
-        self.close()
-        return
+        self.slab_engine = None
+        raise Exception, "Could not connect to database"
     
-
-
-
-    def insert(self, table, columns,values):
-        """
-        Inserts data (values) into the columns of the specified table. 
     
-        """
-        mark = self.cursor()
-        statement = "INSERT INTO " + table + \
-                    "(" + ",".join(columns) + ") " + \
-                    "VALUES(" + ", ".join(values) + ");"
-
-        mark.execute(statement) 
-        self.connection.commit()
-        mark.close()
-        self.close()
-        return
     
-    def insert_slab_slice(self, person_rec):
+    def check (self):
+        """ Cehck if a table exists by trying a selection
+        on the table. 
+        
         """
-        Inserts information about a user and his slice into the slice table. 
-    
+        self.slab_engine.execute ("select 1").scalar()
+        
+        
+    def session (self):
         """
-        sfatable = SfaTable()
-        keys = slice_table.keys()
+        Creates a SQLalchemy session. Once the session object is created
+        it should be used throughout the code for all the operations on 
+        tables for this given database.
+        
+        """ 
+        if self.slab_session is None:
+            Session = sessionmaker()
+            self.slab_session = Session(bind = self.slab_engine)
+        return self.slab_session
+        
+    def close_session(self): 
+        """
+        Closes connection to database. 
+        
+        """
+        if self.slab_session is None: return
+        self.slab_session.close()
+        self.slab_session = None   
         
-        #returns a list of records from the sfa table (dicts)
-        #the filters specified will return only one matching record, into a list of dicts
-        #Finds the slice associated with the user (Senslabs slices  hrns contains the user hrn)
 
-        userrecord = sfatable.find({'hrn': person_rec['hrn'], 'type':'user'})
-        slicerec =  sfatable.find({'hrn': person_rec['hrn']+'_slice', 'type':'slice'})
-        if slicerec :
-            if (isinstance (userrecord, list)):
-                userrecord = userrecord[0]
-            if (isinstance (slicerec, list)):
-                slicerec = slicerec[0]
-                
-            oar_dflt_jobid = -1
-            values = [ str(oar_dflt_jobid), ' \''+ str(slicerec['hrn']) + '\'', str(userrecord['record_id']), str( slicerec['record_id'])]
+    def exists(self, tablename):
+        """
+        Checks if the table specified as tablename exists.
     
-            self.insert('slice', keys, values)
-        else :
-            logger.error("Trying to import a not senslab slice")
-        return
-        
+        """
+       
+        try:
+            metadata = MetaData (bind=self.slab_engine)
+            table = Table (tablename, metadata, autoload=True)
+            return True
         
-    def update(self, table, column_names, values, whereclause, valueclause):
+        except NoSuchTableError:
+            logger.log_exc("SLABPOSTGRES tablename %s does not exists" \
+                            %(tablename))
+            return False
+       
+    
+    def createtable(self):
         """
-        Updates a record in a given table. 
+        Creates all the table sof the engine.  
+        Uses the global dictionnary holding the tablenames and the table schema.
     
         """
-        #Creates the values string for the update SQL command
-        vclause = valueclause
-        if len(column_names) is not len(values):
-            return
-        else:
-            valueslist = []
-            valuesdict = dict(zip(column_names,values))
-            for k in valuesdict.keys():
-                valuesdict[k] = str(valuesdict[k])
-                #v = ' \''+ str(k) + '\''+ '='+' \''+ valuesdict[k]+'\''
-                v = str(k) + '=' + valuesdict[k]
-                valueslist.append(v)
-        if isinstance(vclause,str):
-            vclause = '\''+ vclause + '\''
-        statement = "UPDATE %s SET %s WHERE %s = %s" % \
-                    (table, ", ".join(valueslist), whereclause, vclause)
-        print>>sys.stderr,"\r\n \r\n SLABPOSTGRES.PY update statement %s valuesdict %s valueslist %s" %(statement,valuesdict,valueslist)
-        mark = self.cursor()
-        mark.execute(statement) 
-        self.connection.commit()
-        mark.close()
-        self.close()
 
+        logger.debug("SLABPOSTGRES createtable SlabBase.metadata.sorted_tables \
+            %s \r\n engine %s" %(SlabBase.metadata.sorted_tables , slab_engine))
+        SlabBase.metadata.create_all(slab_engine)
         return
+    
 
-    def update_senslab_slice(self, slice_rec):
-        sfatable = SfaTable()
-        userhrn = slice_rec['hrn'].strip('_slice')
-        userrecord = sfatable.find({'hrn': userhrn, 'type':'user'})
-        if (isinstance (userrecord, list)):
-                userrecord = userrecord[0]
-        columns = [ 'record_user_id', 'oar_job_id']
-        values = [slice_rec['record_user_id'],slice_rec['oar_job_id']]
-        self.update('slice',columns, values,'record_slice_id', slice_rec['record_slice_id'])
-        return 
-        
-       
-    def find(self, tablename,record_filter = None, columns=None):
-        if not columns:
-            columns = "*"
-        else:
-            columns = ",".join(columns)
-        sql = "SELECT %s FROM %s WHERE True " % (columns, tablename)
-        
-        #if isinstance(record_filter, (list, tuple, set)):
-            #ints = filter(lambda x: isinstance(x, (int, long)), record_filter)
-            #strs = filter(lambda x: isinstance(x, StringTypes), record_filter)
-            #record_filter = Filter(SfaRecord.all_fields, {'record_id': ints, 'hrn': strs})
-            #sql += "AND (%s) %s " % record_filter.sql("OR") 
-        #elif isinstance(record_filter, dict):
-            #record_filter = Filter(SfaRecord.all_fields, record_filter)        
-            #sql += " AND (%s) %s" % record_filter.sql("AND")
-        #elif isinstance(record_filter, StringTypes):
-            #record_filter = Filter(SfaRecord.all_fields, {'hrn':[record_filter]})    
-            #sql += " AND (%s) %s" % record_filter.sql("AND")
-        #elif isinstance(record_filter, int):
-            #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})    
-            #sql += " AND (%s) %s" % record_filter.sql("AND")
-       
-        if isinstance(record_filter, dict):
-            for k in record_filter.keys():
-                sql += "AND "+' \''+ str(k) + '\''+ '='+' \''+ str(record_filter[k])+'\''
-            
-        elif isinstance(record_filter, str):
-            sql += "AND slice_hrn ="+ ' \''+record_filter+'\''
 
-        #elif isinstance(record_filter, int):
-            #record_filter = Filter(SfaRecord.all_fields, {'record_id':[record_filter]})    
-            #sql += " AND (%s) %s" % record_filter.sql("AND")
-        sql +=  ";"
-        print>>sys.stderr, " \r\n \r\n \t SLABPOSTGRES.PY find : sql %s record_filter  %s %s" %(sql, record_filter , type(record_filter))
-        results = self.selectall(sql)
-        if isinstance(results, dict):
-            results = [results]
-        return results
-       
+slab_alchemy = SlabDB(Config())
+slab_engine = slab_alchemy.slab_engine
+slab_dbsession = slab_alchemy.session()