import httplib
from xml.dom import minidom
from types import StringTypes, ListType
+from lxml import etree
+from StringIO import StringIO
-class Rspec:
+from sfa.util.sfalogging import sfa_logger
+
+class RSpec:
def __init__(self, xml = None, xsd = None, NSURL = None):
'''
self.dict = {}
self.schemaDict = {}
self.NSURL = NSURL
- if xml:
+ if xml:
if type(xml) == file:
self.parseFile(xml)
- if type(xml) == str:
+ if type(xml) in StringTypes:
self.parseString(xml)
self.dict = self.toDict()
if xsd:
if nodeDom.hasChildNodes():
for child in nodeDom.childNodes:
childName = self._getName(child)
- # skip null children
- if not childName:
- continue
- # initialize the possible array of children
- if not node[elementName].has_key(childName):
- node[elementName][childName] = []
- # if child node has text child nodes
- # append the children to the array as strings
- if child.hasChildNodes() and isinstance(child.childNodes[0], minidom.Text):
- for nextchild in child.childNodes:
+
+ # skip null children
+ if not childName: continue
+
+ # initialize the possible array of children
+ if not node[elementName].has_key(childName): node[elementName][childName] = []
+
+ if isinstance(child, minidom.Text):
+ # add if data is not empty
+ if child.data.strip():
+ node[elementName][childName].append(nextchild.data)
+ elif child.hasChildNodes() and isinstance(child.childNodes[0], minidom.Text):
+ for nextchild in child.childNodes:
node[elementName][childName].append(nextchild.data)
- # convert element child node to dict
- else:
+ else:
childdict = self.toDict(child)
for value in childdict.values():
node[elementName][childName].append(value)
- #node[childName].append(self.toDict(child))
+
return node
return self.rootNode.toprettyxml()
+ def __removeWhitespaceNodes(self, parent):
+ for child in list(parent.childNodes):
+ if child.nodeType == minidom.Node.TEXT_NODE and child.data.strip() == '':
+ parent.removeChild(child)
+ else:
+ self.__removeWhitespaceNodes(child)
+
def parseFile(self, filename):
"""
read a local xml file and store it as a dom object.
"""
dom = minidom.parse(filename)
+ self.__removeWhitespaceNodes(dom)
self.rootNode = dom.childNodes[0]
"""
read an xml string and store it as a dom object.
"""
- xml = xml.replace('\n', '').replace('\t', '').strip()
dom = minidom.parseString(xml)
+ self.__removeWhitespaceNodes(dom)
self.rootNode = dom.childNodes[0]
def _parseXSD(self, xsdURI):
"""
- Download XSD from URL, or if file, read local xsd file and set schemaDict
+ Download XSD from URL, or if file, read local xsd file and set
+ schemaDict.
+
+ Since the schema definiton is a global namespace shared by and
+ agreed upon by others, this should probably be a URL. Check
+ for URL, download xsd, parse, or if local file, use that.
"""
- # Since the schema definiton is a global namespace shared by and agreed upon by
- # others, this should probably be a URL. Check for URL, download xsd, parse, or
- # if local file, use local file.
schemaDom = None
if xsdURI.startswith("http"):
try:
except Exception, e:
# logging.debug("%s: web file not found" % xsdURI)
# logging.debug("Using local file %s" % self.xsd")
- print e
- print "Can't find %s on the web. Continuing." % xsdURI
+ sfa_logger().log_exc("rspec.parseXSD: can't find %s on the web. Continuing." % xsdURI)
if not schemaDom:
if os.path.exists(xsdURI):
# logging.debug("using local copy.")
- print "Using local %s" % xsdURI
+ sfa_logger().debug("rspec.parseXSD: Using local %s" % xsdURI)
schemaDom = minidom.parse(xsdURI)
else:
- raise Exception("Can't find xsd locally")
+ raise Exception("rspec.parseXSD: can't find xsd locally")
self.schemaDict = self.toDict(schemaDom.childNodes[0])
"""
Convert a dictionary into a dom object and store it.
"""
- self.rootNode = self.dict2dom(rdict, include_doc)
+ self.rootNode = self.dict2dom(rdict, include_doc).childNodes[0]
def getDictsByTagName(self, tagname, dom = None):
self.filter(tagname, attribute, blacklist, whitelist, child)
+ def merge(self, rspecs, tagname, dom=None):
+ """
+ Merge this rspec with the requested rspec based on the specified
+ starting tag name. The start tag (and all of its children) will be merged
+ """
+ tempdict = {}
+ if not dom:
+ dom = self.rootNode
+
+ whitelist = []
+ blacklist = []
+
+ if dom.localName in [tagname] and dom.attributes.has_key(attribute):
+ if whitelist and dom.attributes.get(attribute).value not in whitelist:
+ dom.parentNode.removeChild(dom)
+ if blacklist and dom.attributes.get(attribute).value in blacklist:
+ dom.parentNode.removeChild(dom)
+
+ if dom.hasChildNodes():
+ for child in dom.childNodes:
+ self.filter(tagname, attribute, blacklist, whitelist, child)
+
def validateDicts(self):
types = {
'EInt' : int,
-class RecordSpec(Rspec):
+class RecordSpec(RSpec):
root_tag = 'record'
def parseDict(self, rdict, include_doc = False):
record_dict = rdict
if not len(rdict.keys()) == 1:
record_dict = {self.root_tag : rdict}
- return Rspec.dict2dom(self, record_dict, include_doc)
+ return RSpec.dict2dom(self, record_dict, include_doc)
# vim:ts=4:expandtab