From: Tony Mack
Date: Tue, 14 Nov 2006 21:52:00 +0000 (+0000)
Subject: - initial checkin of db upgrade scripts
X-Git-Tag: pycurl-7_13_1~309
X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=fcc284695ea36850ee7b69de22a9f49988fc27bc;p=plcapi.git

- initial checkin of db upgrade scripts
---

diff --git a/tools/dzombie.py b/tools/dzombie.py
new file mode 100755
index 0000000..6ccc58a
--- /dev/null
+++ b/tools/dzombie.py
@@ -0,0 +1,119 @@
+#!/usr/bin/python
+#
+# Tool that removes zombie records from database tables
+#
+import sys
+import os
+import getopt
+import pgdb
+from pprint import pprint
+
+schema_file = None
+config_file = "/etc/planetlab/plc_config"
+config = {}
+execfile(config_file, config)
+
+def usage():
+    print "Usage: %s SCHEMA_FILE" % sys.argv[0]
+    sys.exit(1)
+
+try:
+    schema_file = sys.argv[1]
+except IndexError:
+    print "Error: too few arguments"
+    usage()
+
+# every foreign key exists as a primary key in another table;
+# represent all foreign keys as
+# { 'table.foreign_key': 'table.primary_key' }
+foreign_keys = {}
+foreign_keys_ordered = []
+zombie_keys = {}
+
+# parse the schema for foreign keys
+try:
+    file = open(schema_file, 'r')
+    index = 0
+    lines = file.readlines()
+    while index < len(lines):
+        line = lines[index].strip()
+        # find all created objects
+        if line.startswith("CREATE"):
+            item_type = line.split(" ")[1].strip()
+            item_name = line.split(" ")[2].strip()
+            if item_type.upper() in ['TABLE']:
+                while index < len(lines):
+                    index = index + 1
+                    nextline = lines[index].strip()
+                    # strip sql comments
+                    if nextline.find("--") > -1:
+                        nextline = nextline[0:nextline.index("--")].replace(',', '')
+                    if nextline.upper().find("REFERENCES") > -1:
+                        nextline_parts = nextline.split(" ")
+                        foreign_key_name = nextline_parts[0].strip()
+                        foreign_key_table = nextline_parts[nextline_parts.index("REFERENCES") + 1].strip()
+                        foreign_key = item_name + "." + foreign_key_name
+                        primary_key = foreign_key_table + "." + foreign_key_name
+                        foreign_keys[foreign_key] = primary_key
+                        foreign_keys_ordered.append(foreign_key)
+                    elif nextline.find(";") >= 0:
+                        break
+        index = index + 1
+except:
+    raise
+
+db = pgdb.connect(user = config['PLC_DB_USER'],
+                  database = config['PLC_DB_NAME'])
+cursor = db.cursor()
+try:
+    for foreign_key in foreign_keys_ordered:
+        primary_key = foreign_keys[foreign_key]
+        sql = "SELECT DISTINCT %s FROM %s"
+
+        # get all foreign keys in this table
+        foreign_key_parts = foreign_key.split(".")
+
+        # do not delete from primary tables
+        if foreign_key_parts[0] in ['addresses', 'boot_states', 'conf_files', \
+           'keys', 'messages', 'nodegroups', 'nodenetworks', 'nodes', 'pcus', 'peers', \
+           'persons', 'roles', 'sessions', 'sites', 'slices']:
+            #print "skipping table %s" % foreign_key_parts[0]
+            continue
+
+        cursor.execute(sql % (foreign_key_parts[1], foreign_key_parts[0]))
+        foreign_rows = cursor.fetchall()
+
+        # get all the primary keys from this foreign key's primary table;
+        # the foreign key name may not always match the primary key name,
+        # so rename those cases
+        primary_key_parts = primary_key.split(".")
+        if primary_key_parts[1] == 'creator_person_id':
+            primary_key_parts[1] = 'person_id'
+        elif primary_key_parts[1] == 'min_role_id':
+            primary_key_parts[1] = 'role_id'
+        sql = sql % (primary_key_parts[1], primary_key_parts[0])
+
+        # determine which primary records are deleted
+        desc = os.popen('psql planetlab4 postgres -c "\d %s;"' % primary_key_parts[0])
+        result = desc.readlines()
+        if primary_key_parts[0] in ['slices']:
+            sql = sql + " where name not like '%_deleted'"
+        elif filter(lambda line: line.find("deleted") > -1, result):
+            sql = sql + " where deleted = false"
+
+        cursor.execute(sql)
+        primary_key_rows = cursor.fetchall()
+
+        # if a foreign key is not present in the primary key query, the record
+        # either does not exist or is marked as deleted; null foreign keys are
+        # ignored and not considered zombies
+        zombie_keys_func = lambda key: key not in primary_key_rows and not key == [None]
+        zombie_keys_list = [zombie_key[0] for zombie_key in filter(zombie_keys_func, foreign_rows)]
+        print zombie_keys_list
+        # delete these zombie records
+        if zombie_keys_list:
+            print " -> Deleting %d zombie record(s) from %s after checking %s" % \
+                  (len(zombie_keys_list), foreign_key_parts[0], primary_key_parts[0])
+            # str() of a 1-tuple leaves a trailing comma, which is not valid sql
+            in_clause = str(tuple(zombie_keys_list)).replace(",)", ")")
+            sql_delete = 'DELETE FROM %s WHERE %s IN %s' % \
+                         (foreign_key_parts[0], foreign_key_parts[1], in_clause)
+            cursor.execute(sql_delete)
+            db.commit()
+        #zombie_keys[foreign_key] = zombie_keys_list
+    print "done"
+except pgdb.DatabaseError:
+    raise
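
A minimal sketch (not part of the commit) of the zombie test dzombie.py
applies, run on toy rows; the slice_node -> slices pairing here is purely
illustrative:

    # rows as pgdb's fetchall() returns them: a list of single-field lists
    foreign_rows = [[1], [2], [3], [None]]   # e.g. slice_node.slice_id values
    primary_key_rows = [[1], [3]]            # live slices.slice_id values
    # a key is a zombie if no live primary row carries it; nulls are ignored
    zombies = [row[0] for row in foreign_rows
               if row not in primary_key_rows and row != [None]]
    assert zombies == [2]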
diff --git a/tools/plcdb.3-4.conf b/tools/plcdb.3-4.conf
new file mode 100644
index 0000000..fa16866
--- /dev/null
+++ b/tools/plcdb.3-4.conf
@@ -0,0 +1,60 @@
+# configuration file that describes the differences between the old (3)
+# and new (4) schemas; each entry follows the pattern
+#
+# new_table_name = (field:old_table.field[:required_join_table.join_using:...]), ...
+
+DB_VERSION_PREVIOUS = '3'
+
+DB_VERSION_NEW = '4'
+
+sites = '(site_id:sites.site_id), (login_base:sites.login_base), (name:sites.name), (abbreviated_name:sites.abbreviated_name), (deleted:sites.deleted), (is_public:sites.is_public), (latitude:sites.latitude), (longitude:sites.longitude), (url:sites.url), (date_created:sites.date_created)'
+
+persons = '(person_id:persons.person_id), (email:persons.email), (first_name:persons.first_name), (last_name:persons.last_name), (deleted:persons.deleted), (enabled:persons.enabled), (password:persons.password), (verification_key:persons.verification_key), (verification_expires:persons.verification_expires), (title:persons.title), (phone:persons.phone), (url:persons.url), (bio:persons.bio)'
+
+person_site = 'person_id:person_site.person_id, site_id:person_site.site_id, is_primary:person_site.is_primary'
+
+address_types = 'address_type_id:address_types.address_type_id, name:address_types.name'
+
+addresses = 'address_id:addresses.address_id, line1:addresses.line1, line2:addresses.line2, line3:addresses.line3, city:addresses.city, state:addresses.state, postalcode:addresses.postalcode, country:addresses.country'
+
+address_address_type = 'address_id:addresses.address_id, address_type_id:addresses.address_type_id'
+
+key_types = 'key_type:key_types.key_type'
+
+keys = 'key_id:keys.key_id, key_type:keys.key_type, key:keys.key, is_blacklisted:keys.is_blacklisted'
+
+person_key = 'person_id:person_keys.person_id, key_id:person_keys.key_id'
+
+roles = 'role_id:roles.role_id, name:roles.name'
+
+person_role = 'person_id:person_roles.person_id, role_id:person_roles.role_id'
+
+boot_states = 'boot_state:node_bootstates.boot_state'
+
+nodes = 'node_id:nodes.node_id, hostname:nodes.hostname, site_id:sites.site_id:nodegroup_nodes.node_id:sites.nodegroup_id, boot_state:nodes.boot_state, deleted:nodes.deleted, model:nodes.model, boot_nonce:nodes.boot_nonce, version:nodes.version, ssh_rsa_key:nodes.ssh_rsa_key, key:nodes.key, date_created:nodes.date_created'
+
+nodegroups = 'nodegroup_id:nodegroups.nodegroup_id, name:nodegroups.name, description:nodegroups.description'
+
+nodegroup_node = 'nodegroup_id:nodegroup_nodes.nodegroup_id, node_id:nodegroup_nodes.node_id'
+
+conf_files = 'conf_file_id:conf_file.conf_file_id, enabled:conf_file.enabled, source:conf_file.source, dest:conf_file.dest, file_permissions:conf_file.file_permissions, file_owner:conf_file.file_owner, file_group:conf_file.file_group, preinstall_cmd:conf_file.preinstall_cmd, postinstall_cmd:conf_file.postinstall_cmd, error_cmd:conf_file.error_cmd, ignore_cmd_errors:conf_file.ignore_cmd_errors, always_update:conf_file.always_update'
+
+conf_file_node = 'conf_file_id:conf_assoc.conf_file_id, node_id:conf_assoc.node_id'
+
+conf_file_nodegroup = 'conf_file_id:conf_assoc.conf_file_id, nodegroup_id:conf_assoc.nodegroup_id'
+
+nodenetworks = 'nodenetwork_id:nodenetworks.nodenetwork_id, node_id:node_nodenetworks.node_id:node_nodenetworks.nodenetwork_id, is_primary:node_nodenetworks.is_primary:node_nodenetworks.nodenetwork_id, type:nodenetworks.type, method:nodenetworks.method, ip:nodenetworks.ip, mac:nodenetworks.mac, gateway:nodenetworks.gateway, network:nodenetworks.network, broadcast:nodenetworks.broadcast, netmask:nodenetworks.netmask, dns1:nodenetworks.dns1, dns2:nodenetworks.dns2, bwlimit:nodenetworks.bwlimit, hostname:nodenetworks.hostname'
+
+pcus = 'pcu_id:pcu.pcu_id, site_id:pcu.site_id, hostname:pcu.hostname, ip:pcu.ip, protocol:pcu.protocol, username:pcu.username, password:pcu.password, model:pcu.model, notes:pcu.notes'
+
+pcu_node = 'pcu_id:pcu_ports.pcu_id, node_id:pcu_ports.node_id, port:pcu_ports.port_number'
+
+slices = 'slice_id:dslice03_slices.slice_id, site_id:dslice03_slices.site_id, name:dslice03_slices.name, instantiation:dslice03_states.name:dslice03_states.state_id, url:dslice03_slices.url, description:dslice03_slices.description, max_nodes:dslice03_siteinfo.max_slices:dslice03_siteinfo.site_id, creator_person_id:dslice03_slices.creator_person_id, created:dslice03_slices.created, expires:dslice03_slices.expires, is_deleted:dslice03_slices.is_deleted'
+
+slice_node = 'slice_id:dslice03_slicenode.slice_id, node_id:dslice03_slicenode.node_id'
+
+slice_person = 'slice_id:dslice03_sliceuser.slice_id, person_id:dslice03_sliceuser.person_id'
+
+slice_attribute_types = 'attribute_type_id:dslice03_attributetypes.type_id, name:dslice03_attributetypes.name, description:dslice03_attributetypes.description, min_role_id:dslice03_attributetypes.min_role_id'
+
+slice_attribute = 'slice_attribute_id:dslice03_sliceattribute.attribute_id, slice_id:dslice03_sliceattribute.slice_id, attribute_type_id:dslice03_attributes.type_id:dslice03_attributes.attribute_id, value:dslice03_attributes.value1:dslice03_attributes.attribute_id'
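
For illustration, a sketch (not part of the commit) of how upgrade-db.py
below consumes one of these entries; the pcu_node mapping is used as the
sample input:

    # each mapping is new_field:old_table.old_field, optionally followed by
    # :join_table.join_column directives that become INNER JOIN ... USING (...)
    entry = 'pcu_id:pcu_ports.pcu_id, node_id:pcu_ports.node_id, port:pcu_ports.port_number'
    new_fields, old_fields, joins = [], [], set()
    for field in entry.replace('(', '').replace(')', '').split(','):
        parts = field.strip().split(':')
        new_fields.append(parts[0])
        old_fields.append(parts[1])
        joins.update(set(parts[2:]))
    # the old data is then selected with
    #   SELECT DISTINCT pcu_ports.pcu_id, pcu_ports.node_id, pcu_ports.port_number
    #   FROM pcu_ports
    # and loaded into the new pcu_node table via COPY ... FROM stdin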
diff --git a/tools/upgrade-db.py b/tools/upgrade-db.py
new file mode 100755
index 0000000..d7ceaa5
--- /dev/null
+++ b/tools/upgrade-db.py
@@ -0,0 +1,414 @@
+#!/usr/bin/python
+#
+# Tool for upgrading a db based on its db version
+#
+import sys
+import os
+import getopt
+import pgdb
+
+config = {}
+config_file = "/etc/planetlab/plc_config"
+execfile(config_file, config)
+upgrade_config_file = "plcdb.3-4.conf"
+schema_file = "planetlab4.sql"
+temp_dir = "/tmp"
+
+
+def usage():
+    print "Usage: %s [OPTION] UPGRADE_CONFIG_FILE" % sys.argv[0]
+    print "Options:"
+    print "  -s, --schema=FILE     Upgraded Database Schema"
+    print "  -d, --temp-dir=DIR    Temp Directory"
+    print "  --help                This message"
+    sys.exit(1)
+
+try:
+    (opts, argv) = getopt.getopt(sys.argv[1:],
+                                 "s:d:",
+                                 ["schema=",
+                                  "temp-dir=",
+                                  "help"])
+except getopt.GetoptError, err:
+    print "Error: ", err.msg
+    usage()
+
+for (opt, optval) in opts:
+    if opt == "-s" or opt == "--schema":
+        schema_file = optval
+    elif opt == "-d" or opt == "--temp-dir":
+        temp_dir = optval
+    elif opt == "--help":
+        usage()
+try:
+    upgrade_config_file = argv[0]
+except IndexError:
+    print "Error: too few arguments"
+    usage()
+
+database = config['PLC_DB_NAME']
+archived_database = database + "_archived"
+schema = {}
+inserts = []
+schema_items_ordered = []
+temp_tables = {}
+
+
+# load conf file for this upgrade
+try:
+    upgrade_config = {}
+    execfile(upgrade_config_file, upgrade_config)
+    upgrade_config.pop('__builtins__')
+    db_version_previous = upgrade_config['DB_VERSION_PREVIOUS']
+    db_version_new = upgrade_config['DB_VERSION_NEW']
+
+except IOError, fault:
+    print "ERROR: upgrade config file (%s) not found. Exiting" % \
+          (fault)
+    sys.exit(1)
+except KeyError, fault:
+    print "ERROR: %s not set in upgrade config (%s). Exiting" % \
+          (fault, upgrade_config_file)
+    sys.exit(1)
+
+
+def connect():
+    db = pgdb.connect(user = config['PLC_DB_USER'],
+                      database = config['PLC_DB_NAME'])
+    return db
+
+def archive_db():
+
+    print "STATUS: archiving old database"
+    archive_db = "psql template1 postgres -qc " \
+                 " 'ALTER DATABASE %s RENAME TO %s;';" \
+                 " createdb -U postgres %s > /dev/null; " % \
+                 (database, archived_database, database)
+    exit_status = os.system(archive_db)
+    if exit_status:
+        print "ERROR: unable to archive database. Upgrade failed"
+        sys.exit(1)
+    print "STATUS: %s has been archived; it is now named %s" % (database, archived_database)
+
+
+def encode_utf8(inputfile_name, outputfile_name):
+    # rewrite an iso-8859-1 encoded file in utf8
+    try:
+        inputfile = open(inputfile_name, 'r')
+        outputfile = open(outputfile_name, 'w')
+        for line in inputfile:
+            outputfile.write(unicode(line, 'iso-8859-1').encode('utf8'))
+        inputfile.close()
+        outputfile.close()
+    except:
+        print 'error encoding file'
+        raise
+
+def create_item_from_schema(item_name):
+
+    try:
+        (type, body_list) = schema[item_name]
+        exit_status = os.system('psql %s %s -qc "%s" > /dev/null 2>&1' % \
+                                (config['PLC_DB_NAME'], config['PLC_DB_USER'], "".join(body_list)))
+        if exit_status:
+            raise Exception
+    except KeyError:
+        print "ERROR: cannot create %s. definition not found in %s" % \
+              (item_name, schema_file)
+        return False
+    except Exception, fault:
+        print 'ERROR: create %s failed. Check schema.' % item_name
+        sys.exit(1)
+
+def fix_row(row, table_name, table_fields):
+
+    if table_name in ['nodenetworks']:
+        # convert str bwlimit to bps int
+        bwlimit_index = table_fields.index('bwlimit')
+        if isinstance(row[bwlimit_index], int):
+            pass
+        elif row[bwlimit_index].find('mbit') > -1:
+            row[bwlimit_index] = int(row[bwlimit_index].split('mbit')[0]) \
+                                 * 1000000
+        elif row[bwlimit_index].find('kbit') > -1:
+            row[bwlimit_index] = int(row[bwlimit_index].split('kbit')[0]) \
+                                 * 1000
+    elif table_name in ['slice_attribute']:
+        # modify some invalid foreign keys
+        attribute_type_index = table_fields.index('attribute_type_id')
+        if row[attribute_type_index] == 10004:
+            row[attribute_type_index] = 10016
+        elif row[attribute_type_index] == 10006:
+            row[attribute_type_index] = 10017
+    elif table_name in ['slice_attribute_types']:
+        type_id_index = table_fields.index('attribute_type_id')
+        if row[type_id_index] in [10004, 10006]:
+            return None
+    return row
+
+def fix_table(table, table_name, table_fields):
+    if table_name in ['slice_attribute_types']:
+        # remove duplicate/redundant primary keys
+        type_id_index = table_fields.index('attribute_type_id')
+        for row in table:
+            if row[type_id_index] in [10004, 10006]:
+                table.remove(row)
+    return table
+
+def remove_temp_tables():
+    # remove the temp files generated for each table
+    try:
+        for temp_table in temp_tables:
+            os.remove(temp_tables[temp_table])
+    except:
+        raise
+
+def generate_temp_table(table_name, db):
+    cursor = db.cursor()
+    try:
+        # get upgrade directions
+        table_def = upgrade_config[table_name].replace('(', '').replace(')', '').split(',')
+        table_fields, old_fields, required_joins = [], [], set()
+        for field in table_def:
+            field_parts = field.strip().split(':')
+            table_fields.append(field_parts[0])
+            old_fields.append(field_parts[1])
+            if field_parts[2:]:
+                required_joins.update(set(field_parts[2:]))
+
+        # get indices of fields that cannot be null
+        (type, body_list) = schema[table_name]
+        not_null_indices = []
+        for field in table_fields:
+            for body_line in body_list:
+                if body_line.find(field) > -1 and \
+                   body_line.upper().find("NOT NULL") > -1:
+                    not_null_indices.append(table_fields.index(field))
+
+        # get index of primary key
+        primary_key_indices = []
+        for body_line in body_list:
+            if body_line.find("PRIMARY KEY") > -1:
+                primary_key = body_line
+                for field in table_fields:
+                    if primary_key.find(field) > -1:
+                        primary_key_indices.append(table_fields.index(field))
+                        break
+
+        # get old data
+        get_old_data = "SELECT DISTINCT %s FROM %s" % \
+                       (", ".join(old_fields), old_fields[0].split(".")[0])
+        for join in required_joins:
+            get_old_data = get_old_data + " INNER JOIN %s USING (%s) " % \
+                           (join.split('.')[0], join.split('.')[1])
+
+        cursor.execute(get_old_data)
+        rows = cursor.fetchall()
+
+        # write data to a temp file
+        temp_file_name = '%s/%s.tmp' % (temp_dir, table_name)
+        temp_file = open(temp_file_name, 'w')
+        for row in rows:
+            # attempt to make any necessary fixes to the data
+            row = fix_row(row, table_name, table_fields)
+            # do not attempt to write null rows
+            if row == None:
+                continue
+            # do not attempt to write rows with null primary keys
+            if filter(lambda x: row[x] == None, primary_key_indices):
+                continue
+            for i in range(len(row)):
+                # convert nulls into something pg can understand
+                if row[i] == None:
+                    if i in not_null_indices:
+                        # XX doesn't work if the column is int type
+                        row[i] = ""
+                    else:
+                        row[i] = "\N"
+                if isinstance(row[i], int) or isinstance(row[i], float):
+                    row[i] = str(row[i])
+                # escape whatever can mess up the data format
+                if isinstance(row[i], str):
+                    row[i] = row[i].replace('\t', '\\t')
+                    row[i] = row[i].replace('\n', '\\n')
+                    row[i] = row[i].replace('\r', '\\r')
+            data_row = "\t".join(row)
+            temp_file.write(data_row + "\n")
+        temp_file.write("\.\n")
+        temp_file.close()
+        temp_tables[table_name] = temp_file_name
+
+    except KeyError:
+        #print "WARNING: cannot upgrade %s. upgrade def not found. skipping" % \
+        #      (table_name)
+        return False
+    except IndexError, fault:
+        print "ERROR: error found in upgrade config file. " \
+              "Check the %s configuration. Aborting" % \
+              (table_name)
+        sys.exit(1)
+    except:
+        print "ERROR: configuration for %s doesn't match the db schema. " \
+              "Aborting" % (table_name)
+        try:
+            db.rollback()
+        except:
+            pass
+        raise
+
+
+# connect to the current db
+db = connect()
+cursor = db.cursor()
+
+# determine the current db version
+try:
+    cursor.execute("SELECT relname from pg_class where relname = 'plc_db_version'")
+    rows = cursor.fetchall()
+    if not rows:
+        print "WARNING: current db has no version. Unable to validate config file."
+    else:
+        cursor.execute("SELECT version FROM plc_db_version")
+        rows = cursor.fetchall()
+
+        if rows[0][0] == db_version_new:
+            print "STATUS: Versions are the same. No upgrade necessary."
+            sys.exit()
+        elif not rows[0][0] == db_version_previous:
+            print "STATUS: DB_VERSION_PREVIOUS in config file (%s) does not" \
+                  " match current db version %s" % (upgrade_config_file, rows[0][0])
+            sys.exit()
+        else:
+            print "STATUS: attempting upgrade from %s to %s" % \
+                  (db_version_previous, db_version_new)
+
+    # check db encoding
+    sql = " SELECT pg_catalog.pg_encoding_to_char(d.encoding)" \
+          " FROM pg_catalog.pg_database d" \
+          " LEFT JOIN pg_catalog.pg_user u ON d.datdba = u.usesysid" \
+          " WHERE d.datname = '%s' " % config['PLC_DB_NAME']
+    cursor.execute(sql)
+    rows = cursor.fetchall()
+    if rows[0][0] not in ['UTF8']:
+        print "WARNING: db encoding is not utf8. Must convert"
+        db.close()
+        # generate db dump
+        dump_file = '%s/dump.sql' % (temp_dir)
+        dump_file_encoded = dump_file + ".utf8"
+        dump_cmd = 'pg_dump -i %s -U %s -f %s > /dev/null 2>&1' % \
+                   (config['PLC_DB_NAME'], config['PLC_DB_USER'], dump_file)
+        print dump_cmd
+        if os.system(dump_cmd):
+            print "ERROR: during db dump. Exiting."
+            sys.exit(1)
+        # encode dump to utf8
+        print "STATUS: encoding database dump"
+        encode_utf8(dump_file, dump_file_encoded)
+        # archive original db
+        archive_db()
+        # create a utf8 database and upload encoded data
+        recreate_cmd = 'createdb -U %s -E UTF8 %s > /dev/null 2>&1; ' \
+                       'psql -a -U %s %s < %s > /dev/null 2>&1;' % \
+                       (config['PLC_DB_USER'], config['PLC_DB_NAME'], \
+                        config['PLC_DB_USER'], config['PLC_DB_NAME'], dump_file_encoded)
+        print "STATUS: recreating database as utf8"
+        if os.system(recreate_cmd):
+            print "ERROR: database encoding failed. Aborting"
+            sys.exit(1)
+
+        os.remove(dump_file_encoded)
+        os.remove(dump_file)
+except:
+    raise
+
+
+# parse the schema the user wishes to upgrade to
+try:
+    file = open(schema_file, 'r')
+    index = 0
+    lines = file.readlines()
+    while index < len(lines):
+        line = lines[index]
+        # find all created objects
+        if line.startswith("CREATE"):
+            line_parts = line.split(" ")
+            item_type = line_parts[1]
+            item_name = line_parts[2]
+            schema_items_ordered.append(item_name)
+            if item_type in ['INDEX']:
+                schema[item_name] = (item_type, line)
+
+            # functions, tables and views span multiple lines;
+            # handle them differently than indexes
+            elif item_type in ['AGGREGATE', 'TABLE', 'VIEW']:
+                fields = [line]
+                while index < len(lines):
+                    index = index + 1
+                    nextline = lines[index]
+                    fields.append(nextline)
+                    if nextline.find(";") >= 0:
+                        break
+                schema[item_name] = (item_type, fields)
+            else:
+                print "ERROR: unknown type %s" % item_type
+        elif line.startswith("INSERT"):
+            inserts.append(line)
+        index = index + 1
+
+except:
+    raise
+
+print "STATUS: generating temp tables"
+# generate all temp tables
+for key in schema_items_ordered:
+    (type, body_list) = schema[key]
+    if type == 'TABLE':
+        generate_temp_table(key, db)
+
+# disconnect from the current database and archive it
+cursor.close()
+db.close()
+
+print "STATUS: archiving database"
+archive_db()
+
+
+print "STATUS: upgrading database"
+# attempt to create and load all items from the schema into the new db
+try:
+    for key in schema_items_ordered:
+        (type, body_list) = schema[key]
+        create_item_from_schema(key)
+        if type == 'TABLE':
+            if upgrade_config.has_key(key):
+                table_def = upgrade_config[key].replace('(', '').replace(')', '').split(',')
+                table_fields = [field.strip().split(':')[0] for field in table_def]
+                insert_cmd = "psql %s %s -c " \
+                             " 'COPY %s (%s) FROM stdin;' < %s " % \
+                             (database, config['PLC_DB_USER'], key, ", ".join(table_fields), temp_tables[key])
+                exit_status = os.system(insert_cmd)
+                if exit_status:
+                    print "ERROR: upgrade %s failed" % key
+                    raise
+            else:
+                # check if there are any insert stmts in the schema for this table
+                print "WARNING: %s has no temp data file. Unable to populate with old data" % key
+                for insert_stmt in inserts:
+                    if insert_stmt.find(key) > -1:
+                        insert_cmd = 'psql %s postgres -qc "%s;" > /dev/null 2>&1' % \
+                                     (database, insert_stmt)
+                        os.system(insert_cmd)
+except:
+    print "ERROR: failed to populate db. Unarchiving original database and aborting"
+    undo_command = "dropdb -U postgres %s; psql template1 postgres -qc" \
+                   " 'ALTER DATABASE %s RENAME TO %s;' > /dev/null" % \
+                   (database, archived_database, database)
+    os.system(undo_command)
+    remove_temp_tables()
+    sys.exit(1)
+
+remove_temp_tables()
+
+print "upgrade complete"
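
Typical invocations, assuming the defaults the scripts read: db settings in
/etc/planetlab/plc_config and the upgraded schema in planetlab4.sql (both
paths are taken from the code above, not verified here):

    # upgrade the live database from version 3 to 4,
    # staging per-table temp files under /tmp (the default)
    python upgrade-db.py -s planetlab4.sql -d /tmp plcdb.3-4.conf

    # then sweep rows whose foreign keys no longer resolve
    python dzombie.py planetlab4.sql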