import sys
import pprint
import os
+import httplib
from xml.dom import minidom
+from types import StringTypes, ListType
class Rspec():
- def __init__(self, xml = None, xsd = None):
- self.xsd = xsd # schema
- self.rootNode = None # root of the dom
- self.dict = {} # dict of the rspec.
+ def __init__(self, xml = None, xsd = None, NSURL = None):
+ '''
+ Class to manipulate RSpecs. Reads and parses rspec xml into python dicts
+ and reads python dicts and writes rspec xml
+
+ self.xsd = # Schema. Can be local or remote file.
+ self.NSURL = # If schema is remote, Name Space URL to query (full path minus filename)
+ self.rootNode = # root of the DOM
+ self.dict = # dict of the RSpec.
+ self.schemaDict = {} # dict of the Schema
+ '''
+
+ self.xsd = xsd
+ self.rootNode = None
+ self.dict = {}
+ self.schemaDict = {}
+ self.NSURL = NSURL
if xml:
if type(xml) == file:
self.parseFile(xml)
if type(xml) == str:
self.parseString(xml)
self.dict = self.toDict()
-
+ if xsd:
+ self._parseXSD(self.NSURL + self.xsd)
+
+
def _getText(self, nodelist):
rc = ""
for node in nodelist:
if elementName and not elementName.startswith("#"):
# attributes have tags and values. get {tag: value}, else {type: value}
node[elementName] = self._attributeDict(nodeDom)
- #node.update(self._attributeDict(nodeDom))
# resolve the child nodes.
if nodeDom.hasChildNodes():
for child in nodeDom.childNodes:
childName = self._getName(child)
+ # skip null children
if not childName:
continue
+ # initialize the possible array of children
if not node[elementName].has_key(childName):
- node[elementName][childName] = []
- #node[childName] = []
- childdict = self.toDict(child)
- for value in childdict.values():
- node[elementName][childName].append(value)
+ node[elementName][childName] = []
+ # if child node has text child nodes
+ # append the children to the array as strings
+ if child.hasChildNodes() and isinstance(child.childNodes[0], minidom.Text):
+ for nextchild in child.childNodes:
+ node[elementName][childName].append(nextchild.data)
+ # convert element child node to dict
+ else:
+ childdict = self.toDict(child)
+ for value in childdict.values():
+ node[elementName][childName].append(value)
#node[childName].append(self.toDict(child))
return node
"""
dom = minidom.parse(filename)
self.rootNode = dom.childNodes[0]
-
-
+
+
def parseString(self, xml):
"""
read an xml string and store it as a dom object.
self.rootNode = dom.childNodes[0]
+ def _httpGetXSD(self, xsdURI):
+ # split the URI into relevant parts
+ host = xsdURI.split("/")[2]
+ if xsdURI.startswith("https"):
+ conn = httplib.HTTPSConnection(host,
+ httplib.HTTPSConnection.default_port)
+ elif xsdURI.startswith("http"):
+ conn = httplib.HTTPConnection(host,
+ httplib.HTTPConnection.default_port)
+ conn.request("GET", xsdURI)
+ # If we can't download the schema, raise an exception
+ r1 = conn.getresponse()
+ if r1.status != 200:
+ raise Exception
+ return r1.read().replace('\n', '').replace('\t', '').strip()
+
+
+ def _parseXSD(self, xsdURI):
+ """
+ Download XSD from URL, or if file, read local xsd file and set schemaDict
+ """
+ # Since the schema definiton is a global namespace shared by and agreed upon by
+ # others, this should probably be a URL. Check for URL, download xsd, parse, or
+ # if local file, use local file.
+ schemaDom = None
+ if xsdURI.startswith("http"):
+ try:
+ schemaDom = minidom.parseString(self._httpGetXSD(xsdURI))
+ except Exception, e:
+ # logging.debug("%s: web file not found" % xsdURI)
+ # logging.debug("Using local file %s" % self.xsd")
+ print e
+ print "Can't find %s on the web. Continuing." % xsdURI
+ if not schemaDom:
+ if os.path.exists(xsdURI):
+ # logging.debug("using local copy.")
+ print "Using local %s" % xsdURI
+ schemaDom = minidom.parse(xsdURI)
+ else:
+ raise Exception("Can't find xsd locally")
+ self.schemaDict = self.toDict(schemaDom.childNodes[0])
+
+
def dict2dom(self, rdict, include_doc = False):
"""
convert a dict object into a dom object.
"""
def elementNode(tagname, rd):
- element = minidom.Element(tagname)
+ element = minidom.Element(tagname)
for key in rd.keys():
- if isinstance(rd[key], StringTypes):
- element.setAttribute(key, rd[key])
+ if isinstance(rd[key], StringTypes) or isinstance(rd[key], int):
+ element.setAttribute(key, str(rd[key]))
elif isinstance(rd[key], dict):
child = elementNode(key, rd[key])
element.appendChild(child)
elif isinstance(rd[key], list):
for item in rd[key]:
- child = elementNode(key, item)
- element.appendChild(child)
+ if isinstance(item, dict):
+ child = elementNode(key, item)
+ element.appendChild(child)
+ elif isinstance(item, StringTypes) or isinstance(item, int):
+ child = minidom.Element(key)
+ text = minidom.Text()
+ text.data = item
+ child.appendChild(text)
+ element.appendChild(child)
return element
-
+
+ # Minidom does not allow documents to have more then one
+ # child, but elements may have many children. Because of
+ # this, the document's root node will be the first key/value
+ # pair in the dictionary.
node = elementNode(rdict.keys()[0], rdict.values()[0])
if include_doc:
rootNode = minidom.Document()
return rdict
return tempdict
+
+
+ def filter(self, tagname, attribute, blacklist = [], whitelist = [], dom = None):
+ """
+ Removes all elements where:
+ 1. tagname matches the element tag
+ 2. attribute matches the element attribte
+ 3. attribute value is in valuelist
+ """
+
+ tempdict = {}
+ if not dom:
+ dom = self.rootNode
+
+ if dom.localName in [tagname] and dom.attributes.has_key(attribute):
+ if whitelist and dom.attributes.get(attribute).value not in whitelist:
+ dom.parentNode.removeChild(dom)
+ if blacklist and dom.attributes.get(attribute).value in blacklist:
+ dom.parentNode.removeChild(dom)
+
+ if dom.hasChildNodes():
+ for child in dom.childNodes:
+ self.filter(tagname, attribute, blacklist, whitelist, child)
+
+
+ def validateDicts(self):
+ types = {
+ 'EInt' : int,
+ 'EString' : str,
+ 'EByteArray' : list,
+ 'EBoolean' : bool,
+ 'EFloat' : float,
+ 'EDate' : date}
+
+
+
+class RecordSpec(Rspec):
+
+ def parseDict(self, rdict, include_doc = False):
+ """
+ Convert a dictionary into a dom object and store it.
+ """
+ self.rootNode = self.dict2dom(rdict, include_doc)
+
+ def dict2dom(self, rdict, include_doc = False):
+ record_dict = {'': rdict}
+ return Rspec.dict2dom(self, record_dict, include_doc)
+
# vim:ts=4:expandtab