+++ /dev/null
-### $Id$
-### $URL$
-
-import sys
-import pprint
-import os
-import httplib
-from xml.dom import minidom
-from types import StringTypes, ListType
-
-class Rspec():
-
- def __init__(self, xml = None, xsd = None, NSURL = None):
- '''
- Class to manipulate RSpecs. Reads and parses rspec xml into python dicts
- and reads python dicts and writes rspec xml
-
- self.xsd = # Schema. Can be local or remote file.
- self.NSURL = # If schema is remote, Name Space URL to query (full path minus filename)
- self.rootNode = # root of the DOM
- self.dict = # dict of the RSpec.
- self.schemaDict = {} # dict of the Schema
- '''
-
- self.xsd = xsd
- self.rootNode = None
- self.dict = {}
- self.schemaDict = {}
- self.NSURL = NSURL
- if xml:
- if type(xml) == file:
- self.parseFile(xml)
- if type(xml) == str:
- self.parseString(xml)
- self.dict = self.toDict()
- if xsd:
- self._parseXSD(self.NSURL + self.xsd)
-
-
- def _getText(self, nodelist):
- rc = ""
- for node in nodelist:
- if node.nodeType == node.TEXT_NODE:
- rc = rc + node.data
- return rc
-
- # The rspec is comprised of 2 parts, and 1 reference:
- # attributes/elements describe individual resources
- # complexTypes are used to describe a set of attributes/elements
- # complexTypes can include a reference to other complexTypes.
-
-
- def _getName(self, node):
- '''Gets name of node. If tag has no name, then return tag's localName'''
- name = None
- if not node.nodeName.startswith("#"):
- if node.localName:
- name = node.localName
- elif node.attributes.has_key("name"):
- name = node.attributes.get("name").value
- return name
-
-
- # Attribute. {name : nameofattribute, {items: values})
- def _attributeDict(self, attributeDom):
- '''Traverse single attribute node. Create a dict {attributename : {name: value,}]}'''
- node = {} # parsed dict
- for attr in attributeDom.attributes.keys():
- node[attr] = attributeDom.attributes.get(attr).value
- return node
-
-
- def toDict(self, nodeDom = None):
- """
- convert this rspec to a dict and return it.
- """
- node = {}
- if not nodeDom:
- nodeDom = self.rootNode
-
- elementName = nodeDom.nodeName
- if elementName and not elementName.startswith("#"):
- # attributes have tags and values. get {tag: value}, else {type: value}
- node[elementName] = self._attributeDict(nodeDom)
- # resolve the child nodes.
- if nodeDom.hasChildNodes():
- for child in nodeDom.childNodes:
- childName = self._getName(child)
- # skip null children
- if not childName:
- continue
- # initialize the possible array of children
- if not node[elementName].has_key(childName):
- node[elementName][childName] = []
- # if child node has text child nodes
- # append the children to the array as strings
- if child.hasChildNodes() and isinstance(child.childNodes[0], minidom.Text):
- for nextchild in child.childNodes:
- node[elementName][childName].append(nextchild.data)
- # convert element child node to dict
- else:
- childdict = self.toDict(child)
- for value in childdict.values():
- node[elementName][childName].append(value)
- #node[childName].append(self.toDict(child))
- return node
-
-
- def toxml(self):
- """
- convert this rspec to an xml string and return it.
- """
- return self.rootNode.toxml()
-
-
- def toprettyxml(self):
- """
- print this rspec in xml in a pretty format.
- """
- return self.rootNode.toprettyxml()
-
-
- def parseFile(self, filename):
- """
- read a local xml file and store it as a dom object.
- """
- dom = minidom.parse(filename)
- self.rootNode = dom.childNodes[0]
-
-
- def parseString(self, xml):
- """
- read an xml string and store it as a dom object.
- """
- xml = xml.replace('\n', '').replace('\t', '').strip()
- dom = minidom.parseString(xml)
- self.rootNode = dom.childNodes[0]
-
-
- def _httpGetXSD(self, xsdURI):
- # split the URI into relevant parts
- host = xsdURI.split("/")[2]
- if xsdURI.startswith("https"):
- conn = httplib.HTTPSConnection(host,
- httplib.HTTPSConnection.default_port)
- elif xsdURI.startswith("http"):
- conn = httplib.HTTPConnection(host,
- httplib.HTTPConnection.default_port)
- conn.request("GET", xsdURI)
- # If we can't download the schema, raise an exception
- r1 = conn.getresponse()
- if r1.status != 200:
- raise Exception
- return r1.read().replace('\n', '').replace('\t', '').strip()
-
-
- def _parseXSD(self, xsdURI):
- """
- Download XSD from URL, or if file, read local xsd file and set schemaDict
- """
- # Since the schema definiton is a global namespace shared by and agreed upon by
- # others, this should probably be a URL. Check for URL, download xsd, parse, or
- # if local file, use local file.
- schemaDom = None
- if xsdURI.startswith("http"):
- try:
- schemaDom = minidom.parseString(self._httpGetXSD(xsdURI))
- except Exception, e:
- # logging.debug("%s: web file not found" % xsdURI)
- # logging.debug("Using local file %s" % self.xsd")
- print e
- print "Can't find %s on the web. Continuing." % xsdURI
- if not schemaDom:
- if os.path.exists(xsdURI):
- # logging.debug("using local copy.")
- print "Using local %s" % xsdURI
- schemaDom = minidom.parse(xsdURI)
- else:
- raise Exception("Can't find xsd locally")
- self.schemaDict = self.toDict(schemaDom.childNodes[0])
-
-
- def dict2dom(self, rdict, include_doc = False):
- """
- convert a dict object into a dom object.
- """
-
- def elementNode(tagname, rd):
- element = minidom.Element(tagname)
- for key in rd.keys():
- if isinstance(rd[key], StringTypes) or isinstance(rd[key], int):
- element.setAttribute(key, str(rd[key]))
- elif isinstance(rd[key], dict):
- child = elementNode(key, rd[key])
- element.appendChild(child)
- elif isinstance(rd[key], list):
- for item in rd[key]:
- if isinstance(item, dict):
- child = elementNode(key, item)
- element.appendChild(child)
- elif isinstance(item, StringTypes) or isinstance(item, int):
- child = minidom.Element(key)
- text = minidom.Text()
- text.data = item
- child.appendChild(text)
- element.appendChild(child)
- return element
-
- # Minidom does not allow documents to have more then one
- # child, but elements may have many children. Because of
- # this, the document's root node will be the first key/value
- # pair in the dictionary.
- node = elementNode(rdict.keys()[0], rdict.values()[0])
- if include_doc:
- rootNode = minidom.Document()
- rootNode.appendChild(node)
- else:
- rootNode = node
- return rootNode
-
-
- def parseDict(self, rdict, include_doc = True):
- """
- Convert a dictionary into a dom object and store it.
- """
- self.rootNode = self.dict2dom(rdict, include_doc)
-
-
- def getDictsByTagName(self, tagname, dom = None):
- """
- Search the dom for all elements with the specified tagname
- and return them as a list of dicts
- """
- if not dom:
- dom = self.rootNode
- dicts = []
- doms = dom.getElementsByTagName(tagname)
- dictlist = [self.toDict(d) for d in doms]
- for item in dictlist:
- for value in item.values():
- dicts.append(value)
- return dicts
-
- def getDictByTagNameValue(self, tagname, value, dom = None):
- """
- Search the dom for the first element with the specified tagname
- and value and return it as a dict.
- """
- tempdict = {}
- if not dom:
- dom = self.rootNode
- dicts = self.getDictsByTagName(tagname, dom)
-
- for rdict in dicts:
- if rdict.has_key('name') and rdict['name'] in [value]:
- return rdict
-
- return tempdict
-
-
- def filter(self, tagname, attribute, blacklist = [], whitelist = [], dom = None):
- """
- Removes all elements where:
- 1. tagname matches the element tag
- 2. attribute matches the element attribte
- 3. attribute value is in valuelist
- """
-
- tempdict = {}
- if not dom:
- dom = self.rootNode
-
- if dom.localName in [tagname] and dom.attributes.has_key(attribute):
- if whitelist and dom.attributes.get(attribute).value not in whitelist:
- dom.parentNode.removeChild(dom)
- if blacklist and dom.attributes.get(attribute).value in blacklist:
- dom.parentNode.removeChild(dom)
-
- if dom.hasChildNodes():
- for child in dom.childNodes:
- self.filter(tagname, attribute, blacklist, whitelist, child)
-
-
- def validateDicts(self):
- types = {
- 'EInt' : int,
- 'EString' : str,
- 'EByteArray' : list,
- 'EBoolean' : bool,
- 'EFloat' : float,
- 'EDate' : date}
-
-
- def pprint(self, r = None, depth = 0):
- """
- Pretty print the dict
- """
- line = ""
- if r == None: r = self.dict
- # Set the dept
- for tab in range(0,depth): line += " "
- # check if it's nested
- if type(r) == dict:
- for i in r.keys():
- print line + "%s:" % i
- self.pprint(r[i], depth + 1)
- elif type(r) in (tuple, list):
- for j in r: self.pprint(j, depth + 1)
- # not nested so just print.
- else:
- print line + "%s" % r
-
-
-
-class RecordSpec(Rspec):
-
- root_tag = 'record'
- def parseDict(self, rdict, include_doc = False):
- """
- Convert a dictionary into a dom object and store it.
- """
- self.rootNode = self.dict2dom(rdict, include_doc)
-
- def dict2dom(self, rdict, include_doc = False):
- record_dict = rdict
- if not len(rdict.keys()) == 1:
- record_dict = {self.root_tag : rdict}
- return Rspec.dict2dom(self, record_dict, include_doc)
-
-
-# vim:ts=4:expandtab
-