From 8d99050a6c824aa79f3df2a9e85574ded644a2a7 Mon Sep 17 00:00:00 2001 From: Sandrine Avakian Date: Tue, 29 Nov 2011 15:50:41 +0100 Subject: [PATCH] Added support for new slice table for Senslab. --- sfa/generic/slab.py | 4 +- sfa/senslab/slab-import.py | 16 ++- sfa/senslab/slabpostgres.py | 194 ++++++++++++++++++++++++++++++++++++ 3 files changed, 207 insertions(+), 7 deletions(-) create mode 100644 sfa/senslab/slabpostgres.py diff --git a/sfa/generic/slab.py b/sfa/generic/slab.py index 0d1df18c..5c9f780c 100644 --- a/sfa/generic/slab.py +++ b/sfa/generic/slab.py @@ -4,7 +4,7 @@ import sfa.server.sfaapi import sfa.senslab.slabdriver import sfa.managers.registry_manager_slab import sfa.managers.slice_manager -import sfa.managers.aggregate_manager +import sfa.managers.aggregate_manager_slab class slab (Generic): @@ -18,7 +18,7 @@ class slab (Generic): def slicemgr_manager_class (self) : return sfa.managers.slice_manager.SliceManager def aggregate_manager_class (self) : - return sfa.managers.aggregate_manager.AggregateManager + return sfa.managers.aggregate_manager_slab.AggregateManager # driver class for server-side services, talk to the whole testbed def driver_class (self): diff --git a/sfa/senslab/slab-import.py b/sfa/senslab/slab-import.py index 9699f4b5..fe1c3342 100644 --- a/sfa/senslab/slab-import.py +++ b/sfa/senslab/slab-import.py @@ -11,6 +11,7 @@ import time from sfa.senslab.OARrestapi import OARapi from sfa.senslab.LDAPapi import LDAPapi from sfa.senslab.slabdriver import SlabDriver +from sfa.senslab.slabpostgres import SlabDB from sfa.util.config import Config from sfa.util.xrn import hrn_to_urn, get_authority,Xrn,get_leaf from sfa.util.table import SfaTable @@ -24,6 +25,7 @@ config = Config() TrustedR = TrustedRoots(Config.get_trustedroots_dir(config)) AuthHierarchy = Hierarchy() table = SfaTable() +db = SlabDB() if not table.exists(): table.create() @@ -151,17 +153,21 @@ def import_slice(person): extime = datetime.datetime.utcnow() slice_record['date_created'] = int(time.mktime(extime.timetuple())) - + #special slice table for Senslab, to store nodes info (OAR) existing_records = table.find({'hrn': slice_record['hrn'], 'type': 'slice'}) if not existing_records: print>>sys.stderr, " \r\n \t slab-import : slice record %s inserted" %(slice_record['hrn']) table.insert(slice_record) + #table.insert_slice(person) + db.insert_slice(person) + else: print>>sys.stderr, " \r\n \t slab-import : slice record %s updated" %(slice_record['hrn']) existing_record = existing_records[0] slice_record['record_id'] = existing_record['record_id'] - table.update(slice_record) + table.update(slice_record) + db.update_slice(slice_record) def delete_record( hrn, type): # delete the record @@ -179,7 +185,9 @@ def hostname_to_hrn(root_auth,hostname): def main(): - + if not db.exists('slice'): + db.createtable('slice') + if not config.SFA_REGISTRY_ENABLED: sys.exit(0) root_auth = config.SFA_REGISTRY_ROOT_AUTH @@ -224,9 +232,7 @@ def main(): #print "\r\n NODES8DICT ",nodes_dict ldap_person_list = Driver.GetPersons() - - # import node records for node in nodes_dict: hrn = hostname_to_hrn( root_auth, node['hostname']) diff --git a/sfa/senslab/slabpostgres.py b/sfa/senslab/slabpostgres.py new file mode 100644 index 00000000..19f9a81a --- /dev/null +++ b/sfa/senslab/slabpostgres.py @@ -0,0 +1,194 @@ +########################################################################### +# Copyright (C) 2011 by +# +# +# Copyright: See COPYING file that comes with this distribution +# +########################################################################### +import psycopg2 +import psycopg2.extensions +psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) +# UNICODEARRAY not exported yet +psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY) +from sfa.util.config import Config +from sfa.util.table import SfaTable +# allow to run sfa2wsdl if this is missing (for mac) +import sys +try: import pgdb +except: print >> sys.stderr, "WARNING, could not import pgdb" + +slice_table = {'oar_job_id':'integer DEFAULT -1', 'record_id_user':'integer PRIMARY KEY references sfa ON DELETE CASCADE ON UPDATE CASCADE', 'record_id_slice':'integer', 'slice_hrn':'text NOT NULL'} +tablenames_dict = {'slice': slice_table} + +class SlabDB: + def __init__(self): + self.config = Config() + self.debug = False + + self.connection = None + + #@handle_exception + def cursor(self): + if self.connection is None: + # (Re)initialize database connection + if psycopg2: + try: + # Try UNIX socket first + self.connection = psycopg2.connect(user = 'sfa', + password = 'sfa', + database = 'sfa') + #self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER, + #password = self.config.SFA_PLC_DB_PASSWORD, + #database = self.config.SFA_PLC_DB_NAME) + except psycopg2.OperationalError: + # Fall back on TCP + self.connection = psycopg2.connect(user = self.config.SFA_PLC_DB_USER, + password = self.config.SFA_PLC_DB_PASSWORD, + database = self.config.SFA_PLC_DB_NAME, + host = self.config.SFA_PLC_DB_HOST, + port = self.config.SFA_PLC_DB_PORT) + self.connection.set_client_encoding("UNICODE") + else: + self.connection = pgdb.connect(user = self.config.SFA_PLC_DB_USER, + password = self.config.SFA_PLC_DB_PASSWORD, + host = "%s:%d" % (self.config.SFA_PLC_DB_HOST, self.config.SFA_PLC_DB_PORT), + database = self.config.SFA_PLC_DB_NAME) + + (self.rowcount, self.description, self.lastrowid) = \ + (None, None, None) + + return self.connection.cursor() + + def close(self): + if self.connection is not None: + self.connection.close() + self.connection = None + + def exists(self, tablename): + mark = self.cursor() + sql = "SELECT * from pg_tables" + mark.execute(sql) + rows = mark.fetchall() + mark.close() + labels = [column[0] for column in mark.description] + rows = [dict(zip(labels, row)) for row in rows] + + rows = filter(lambda row: row['tablename'].startswith(tablename), rows) + if rows: + return True + return False + + def createtable(self, tablename ): + mark = self.cursor() + tablelist =[] + T = tablenames_dict[tablename] + for k in T.keys(): + tmp = str(k) +' ' + T[k] + tablelist.append(tmp) + end = ",".join(tablelist) + + statement = "CREATE TABLE " + tablename + " ("+ end +");" + + #template = "CREATE INDEX %s_%s_idx ON %s (%s);" + #indexes = [template % ( self.tablename, field, self.tablename, field) \ + #for field in ['hrn', 'type', 'authority', 'peer_authority', 'pointer']] + # IF EXISTS doenst exist in postgres < 8.2 + try: + mark.execute('DROP TABLE IF EXISTS ' + tablename +';') + except: + try: + mark.execute('DROP TABLE' + tablename +';') + except: + pass + + mark.execute(statement) + #for index in indexes: + #self.db.do(index) + self.connection.commit() + mark.close() + #self.connection.close() + self.close() + return + + + def findRecords(self,table, column, operator, string): + mark = self.cursor() + + statement = 'SELECT * FROM ' + table + ' WHERE ' + column + ' ' + operator + ' ' + ' \'' + string +'\'' + mark.execute(statement) + record = mark.fetchall() + mark.close() + self.connection.close() + return record + + + def insert(self, table, columns,values): + mark = self.cursor() + statement = "INSERT INTO " + table + \ + "(" + ",".join(columns) + ") " + \ + "VALUES(" + ", ".join(values) + ");" + + #statement = 'INSERT INTO ' + table + ' (' + columns + ') VALUES (' + values + ')' + print>>sys.stderr, " \r\n insert statement", statement + mark.execute(statement) + self.connection.commit() + mark.close() + #self.connection.close() + self.close() + return + + def insert_slice(self, person_rec): + sfatable = SfaTable() + keys = slice_table.keys() + + #returns a list of records (dicts) + #the filters specified will return only one matching record, into a list of dicts + + userrecord = sfatable.find({'hrn': person_rec['hrn'], 'type':'user'}) + + slicerec = sfatable.find({'hrn': person_rec['hrn']+'_slice', 'type':'slice'}) + if (isinstance (userrecord, list)): + userrecord = userrecord[0] + if (isinstance (slicerec, list)): + slicerec = slicerec[0] + + values = [ '-1', ' \''+ str(slicerec['hrn']) + '\'', str(userrecord['record_id']), str( slicerec['record_id'])] + + self.insert('slice', keys, values) + return + + def update(self, table, column_names, values, whereclause, valueclause): + + #Creates the values string for the update SQL command + if len(column_names) is not len(values): + return + else: + valueslist = [] + valuesdict = dict(zip(column_names,values)) + for k in valuesdict.keys(): + valuesdict[k] = str(valuesdict[k]) + v = ' \''+ str(k) + '\''+ '='+' \''+ valuesdict[k]+'\'' + valueslist.append(v) + + statement = "UPDATE %s SET %s WHERE %s = %s" % \ + (table, ", ".join(valueslist), whereclause, valueclause) + print >>sys.stderr, "\r\n \r\n \t SLABPOSTGRES.PY UPDATE statement ", statement + mark = self.cursor() + mark.execute(statement) + self.connection.commit() + mark.close() + self.close() + #self.connection.close() + return + + def update_slice(self, slice_rec): + sfatable = SfaTable() + userhrn = slice_rec['hrn'].strip('_slice') + userrecords = sfatable.find({'hrn': userhrn, 'type':'user'}) + columns = [ 'record_user_id', 'oar_job_id'] + values = [slice_rec['record_user_id'],slice_rec['oar_job_id']] + self.update('slice',columns, values,'record_slice_id', slice_rec['record_slice_id']) + return + + + \ No newline at end of file -- 2.43.0