import codecs
import httplib
import os
import pprint
import sys
from datetime import date
from StringIO import StringIO
from types import StringTypes, ListType
from xml.dom import minidom

from lxml import etree

from sfa.util.sfalogging import info_logger


class RSpec:
    """Parse RSpec XML into Python dicts and turn dicts back into XML.

    Attributes:
        xsd: schema file name; can be a local or a remote file.
        NSURL: if the schema is remote, the namespace URL to query
            (full path minus the file name).
        rootNode: root of the DOM.
        dict: dict form of the RSpec.
        schemaDict: dict form of the schema.
    """

    def __init__(self, xml = None, xsd = None, NSURL = None):
        """Optionally load an RSpec and a schema.

        xml   -- an XML string, or an open file / file-like object
        xsd   -- schema file name (appended to NSURL when NSURL is given)
        NSURL -- namespace URL prefix of a remote schema, or None for a
                 local schema file
        """
        self.xsd = xsd
        self.rootNode = None
        self.dict = {}
        self.schemaDict = {}
        self.NSURL = NSURL
        if xml:
            if isinstance(xml, StringTypes):
                self.parseString(xml)
            else:
                # minidom.parse() accepts any open file-like object, not
                # only the builtin file type.
                self.parseFile(xml)
            self.dict = self.toDict()
        if xsd:
            # NSURL is None for a purely local schema; concatenating None
            # with a string would raise TypeError.
            self._parseXSD((self.NSURL or "") + self.xsd)

    def _getText(self, nodelist):
        """Return the concatenated data of every text node in nodelist."""
        return "".join([node.data for node in nodelist
                        if node.nodeType == node.TEXT_NODE])

    # The rspec is comprised of 2 parts, and 1 reference:
    # attributes/elements describe individual resources
    # complexTypes are used to describe a set of attributes/elements
    # complexTypes can include a reference to other complexTypes.

    def _getName(self, node):
        """Return the node's localName, falling back to its "name" attribute.

        Returns None for nodes whose nodeName starts with "#" (text,
        comment, document, ...) and for nodes that have neither.
        """
        if node.nodeName.startswith("#"):
            return None
        if node.localName:
            return node.localName
        # NamedNodeMap.get() is used instead of `in`: the Python 2 minidom
        # NamedNodeMap is not a reliable container for membership tests.
        name_attr = node.attributes.get("name")
        if name_attr is not None:
            return name_attr.value
        return None

    # Attribute.  {name : nameofattribute, {items: values})
    def _attributeDict(self, attributeDom):
        """Return the attributes of a single node as {attribute name: value}."""
        node = {}  # parsed dict
        attributes = attributeDom.attributes
        for attr in attributes.keys():
            node[attr] = attributes.get(attr).value
        return node

    def appendToDictOrCreate(self, dict, key, value):
        """Append value to the list stored under key, creating it if needed.

        The dict is modified in place and also returned.
        """
        dict.setdefault(key, []).append(value)
        return dict

    def toGenDict(self, nodeDom=None, parentdict=None, siblingdict=None, parent=None):
        """Convert an XML subtree into nested dicts of lists.

        Every value is accumulated in a list keyed by the element's
        localName (or by the attribute name for attributes), using
        appendToDictOrCreate().

        nodeDom     -- node to convert (defaults to self.rootNode)
        parentdict  -- when not None, the result is appended to it under
                       the key `parent` and parentdict is returned
        siblingdict -- dict that collects the entries produced at this
                       level; a fresh dict is used when omitted
        parent      -- key used when appending into parentdict
        """
        # A literal {} default would be shared (and mutated) across calls,
        # leaking entries from one conversion into the next.
        if siblingdict is None:
            siblingdict = {}
        if nodeDom is None:
            nodeDom = self.rootNode

        curNodeName = nodeDom.localName

        # The node's own attributes seed the dict that will also receive
        # its element children.
        childdict = {}
        for attribute in nodeDom.attributes.keys():
            childdict = self.appendToDictOrCreate(childdict, attribute, nodeDom.getAttribute(attribute))

        if nodeDom.hasChildNodes():
            # All children but the last: values are recorded directly,
            # elements are folded into childdict.
            for child in nodeDom.childNodes[:-1]:
                if child.nodeValue:
                    siblingdict = self.appendToDictOrCreate(siblingdict, curNodeName, child.nodeValue)
                else:
                    childdict = self.toGenDict(child, None, childdict, curNodeName)

            # The last child is also responsible for attaching the
            # accumulated childdict to siblingdict.
            child = nodeDom.childNodes[-1]
            if child.nodeValue:
                siblingdict = self.appendToDictOrCreate(siblingdict, curNodeName, child.nodeValue)
                if childdict:
                    siblingdict = self.appendToDictOrCreate(siblingdict, curNodeName, childdict)
            else:
                siblingdict = self.toGenDict(child, siblingdict, childdict, curNodeName)
        else:
            self.appendToDictOrCreate(siblingdict, curNodeName, childdict)

        if parentdict is not None:
            parentdict = self.appendToDictOrCreate(parentdict, parent, siblingdict)
            return parentdict
        return siblingdict

    def toDict(self, nodeDom = None):
        """Convert this rspec (or the given node) to a dict and return it.

        The result is {nodeName: {attribute: value, childName: [values]}}.
        Nodes whose nodeName starts with "#" yield an empty dict.
        """
        node = {}
        if nodeDom is None:
            nodeDom = self.rootNode

        elementName = nodeDom.nodeName
        if not elementName or elementName.startswith("#"):
            return node

        # attributes have tags and values.  get {tag: value}, else {type: value}
        entry = self._attributeDict(nodeDom)
        node[elementName] = entry

        # resolve the child nodes.
        for child in nodeDom.childNodes:
            childName = self._getName(child)
            # skip null children
            if not childName:
                continue

            # initialize the possible array of children
            values = entry.setdefault(childName, [])

            if isinstance(child, minidom.Text):
                # add if data is not empty
                if child.data.strip():
                    # The original appended `nextchild.data`, an undefined
                    # (or stale) name; the text belongs to `child`.
                    values.append(child.data)
            elif child.hasChildNodes() and isinstance(child.childNodes[0], minidom.Text):
                for nextchild in child.childNodes:
                    values.append(nextchild.data)
            else:
                for value in self.toDict(child).values():
                    values.append(value)

        return node

    def toxml(self):
        """Convert this rspec to an xml string and return it."""
        return self.rootNode.toxml()

    def toprettyxml(self):
        """Return this rspec as a pretty-printed xml string."""
        return self.rootNode.toprettyxml()

    def __removeWhitespaceNodes(self, parent):
        """Recursively drop whitespace-only text nodes below parent."""
        # Iterate over a copy: removeChild() mutates parent.childNodes.
        for child in list(parent.childNodes):
            if child.nodeType == minidom.Node.TEXT_NODE and child.data.strip() == '':
                parent.removeChild(child)
            else:
                self.__removeWhitespaceNodes(child)

    def parseFile(self, filename):
        """Read a local xml file (name or open file object) and store its DOM."""
        dom = minidom.parse(filename)
        self.__removeWhitespaceNodes(dom)
        # documentElement rather than childNodes[0]: the first child of a
        # document may be a comment, processing instruction or DOCTYPE.
        self.rootNode = dom.documentElement

    def parseString(self, xml):
        """Read an xml string and store it as a dom object."""
        info_logger.debug("RSPEC.PY parseString xml %s" % (xml))
        # Only unicode objects need encoding; calling encode() on a byte
        # string makes Python 2 decode it as ASCII first, which fails on
        # any non-ASCII content.
        if isinstance(xml, unicode):
            xml = xml.encode("utf-8")
        dom = minidom.parseString(xml)
        self.__removeWhitespaceNodes(dom)
        self.rootNode = dom.documentElement

    def _httpGetXSD(self, xsdURI):
        """Download the schema at xsdURI and return its text.

        Newlines and tabs are removed from the returned text.
        Raises Exception for an unsupported scheme or a non-200 response.
        """
        # split the URI into relevant parts
        host = xsdURI.split("/")[2]
        # No explicit port is passed: httplib then honours a "host:port"
        # authority and falls back to the scheme's default port otherwise.
        if xsdURI.startswith("https"):
            conn = httplib.HTTPSConnection(host)
        elif xsdURI.startswith("http"):
            conn = httplib.HTTPConnection(host)
        else:
            raise Exception("rspec._httpGetXSD: unsupported URI scheme in %s" % xsdURI)
        try:
            conn.request("GET", xsdURI)
            # If we can't download the schema, raise an exception
            r1 = conn.getresponse()
            if r1.status != 200:
                raise Exception("rspec._httpGetXSD: got HTTP status %s for %s" % (r1.status, xsdURI))
            return r1.read().replace('\n', '').replace('\t', '').strip()
        finally:
            conn.close()

    def _parseXSD(self, xsdURI):
        """Load the schema at xsdURI and set schemaDict.

        Since the schema definition is a global namespace shared by and
        agreed upon by others, this should probably be a URL.  If xsdURI
        starts with "http" it is downloaded; if that fails, or if it is
        not a URL, xsdURI is read as a local file path.

        Raises Exception when the schema cannot be found locally either.
        """
        schemaDom = None
        if xsdURI.startswith("http"):
            try:
                schemaDom = minidom.parseString(self._httpGetXSD(xsdURI))
            except Exception:
                # Best effort: log and fall through to the local lookup.
                info_logger.log_exc("rspec.parseXSD: can't find %s on the web. Continuing." % xsdURI)
        if schemaDom is None:
            if os.path.exists(xsdURI):
                info_logger.debug("rspec.parseXSD: Using local %s" % xsdURI)
                schemaDom = minidom.parse(xsdURI)
            else:
                raise Exception("rspec.parseXSD: can't find xsd locally")
        # Same normalisation as parseFile/parseString: indentation text
        # nodes would otherwise be mistaken for element content by toDict().
        self.__removeWhitespaceNodes(schemaDom)
        self.schemaDict = self.toDict(schemaDom.documentElement)

    def dict2dom(self, rdict, include_doc = False):
        """Convert a dict object into a dom object.

        String and int values become attributes, dict values become child
        elements, and list values become one child element per item.
        Returns a Document when include_doc is true, else the root Element.
        """
        scalar_types = StringTypes + (int,)

        def elementNode(tagname, rd):
            element = minidom.Element(tagname)
            for key, value in rd.items():
                if isinstance(value, scalar_types):
                    element.setAttribute(key, unicode(value))
                elif isinstance(value, dict):
                    element.appendChild(elementNode(key, value))
                elif isinstance(value, list):
                    for item in value:
                        if isinstance(item, dict):
                            element.appendChild(elementNode(key, item))
                        elif isinstance(item, scalar_types):
                            child = minidom.Element(key)
                            text = minidom.Text()
                            # Text data must be a string for toxml() to
                            # serialise it; ints are converted, strings
                            # are stored untouched.
                            if isinstance(item, StringTypes):
                                text.data = item
                            else:
                                text.data = unicode(item)
                            child.appendChild(text)
                            element.appendChild(child)
            return element

        # Minidom does not allow documents to have more then one
        # child, but elements may have many children. Because of
        # this, the document's root node will be the first key/value
        # pair in the dictionary.
        root_tag, root_value = list(rdict.items())[0]
        node = elementNode(root_tag, root_value)
        if include_doc:
            rootNode = minidom.Document()
            rootNode.appendChild(node)
        else:
            rootNode = node
        return rootNode

    def parseDict(self, rdict, include_doc = True):
        """Convert a dictionary into a dom object and store its root element."""
        dom = self.dict2dom(rdict, include_doc)
        if dom.nodeType == minidom.Node.DOCUMENT_NODE:
            self.rootNode = dom.documentElement
        else:
            # dict2dom() already returned the root element; taking its
            # first child (as childNodes[0] did) would discard the root.
            self.rootNode = dom

    def getDictsByTagName(self, tagname, dom = None):
        """Return every element named tagname under dom as a list of dicts."""
        if dom is None:
            dom = self.rootNode
        dicts = []
        for element in dom.getElementsByTagName(tagname):
            dicts.extend(self.toDict(element).values())
        return dicts

    def getDictByTagNameValue(self, tagname, value, dom = None):
        """Return the first tagname element whose 'name' equals value.

        Returns an empty dict when nothing matches.
        """
        if dom is None:
            dom = self.rootNode
        for rdict in self.getDictsByTagName(tagname, dom):
            if 'name' in rdict and rdict['name'] == value:
                return rdict
        return {}

    def filter(self, tagname, attribute, blacklist = None, whitelist = None, dom = None):
        """Remove elements from the DOM based on an attribute value.

        An element is removed when all of the following hold:
           1. tagname matches the element tag
           2. the element has the given attribute
           3. the attribute value is in blacklist, or whitelist is
              non-empty and the value is not in it

        The DOM is modified in place; nothing is returned.
        """
        if dom is None:
            dom = self.rootNode
        blacklist = blacklist or []
        whitelist = whitelist or []

        if dom.localName == tagname and dom.attributes is not None:
            attr = dom.attributes.get(attribute)
            if attr is not None:
                rejected = ((whitelist and attr.value not in whitelist) or
                            (blacklist and attr.value in blacklist))
                # Remove at most once: after removeChild() the node has
                # no parent left to remove it from.
                if rejected and dom.parentNode is not None:
                    dom.parentNode.removeChild(dom)
                    return

        # Iterate over a copy: recursive calls remove children from the
        # live childNodes list, which would make the loop skip siblings.
        for child in list(dom.childNodes):
            self.filter(tagname, attribute, blacklist, whitelist, child)

    def merge(self, rspecs, tagname, dom=None):
        """Merge this rspec with the requested rspecs, starting at tagname.

        Not implemented.  The previous body was a copy of filter() that
        referenced an undefined name and raised NameError on any DOM with
        child nodes; it never merged anything.

        Raises NotImplementedError.
        """
        raise NotImplementedError("RSpec.merge is not implemented")

    def validateDicts(self):
        """Build the schema-type to Python-type map.

        No validation is performed yet; the method returns None.
        """
        types = {
            'EInt' : int,
            'EString' : str,
            'EByteArray' : list,
            'EBoolean' : bool,
            'EFloat' : float,
            'EDate' : date}

    def pprint(self, r = None, depth = 0):
        """Pretty print the dict (self.dict when r is None) to stdout."""
        if r is None:
            r = self.dict

        # Set the depth
        line = " " * depth

        # check if it's nested
        if isinstance(r, dict):
            for i in r.keys():
                print(line + "%s:" % i)
                self.pprint(r[i], depth + 1)
        elif isinstance(r, (tuple, list)):
            for j in r:
                self.pprint(j, depth + 1)
        # not nested so just print.
        else:
            print(line + "%s" % r)


class RecordSpec(RSpec):
    """RSpec variant whose dicts are wrapped in a single root_tag element."""

    root_tag = 'record'

    def parseDict(self, rdict, include_doc = False):
        """Convert a dictionary into a dom object and store it."""
        self.rootNode = self.dict2dom(rdict, include_doc)

    def dict2dom(self, rdict, include_doc = False):
        """Convert rdict to a dom, wrapping it in root_tag when needed.

        A dict with exactly one key is used as is; its key is the root tag.
        """
        record_dict = rdict
        if len(rdict) != 1:
            record_dict = {self.root_tag : rdict}
        return RSpec.dict2dom(self, record_dict, include_doc)

# vim:ts=4:expandtab