8 from xml.dom import minidom
9 from types import StringTypes, ListType
13 def __init__(self, xml = None, xsd = None, NSURL = None):
15 Class to manipulate RSpecs. Reads and parses rspec xml into python dicts
16 and reads python dicts and writes rspec xml
18 self.xsd = # Schema. Can be local or remote file.
19 self.NSURL = # If schema is remote, Name Space URL to query (full path minus filename)
20 self.rootNode = # root of the DOM
21 self.dict = # dict of the RSpec.
22 self.schemaDict = {} # dict of the Schema
33 if type(xml) in StringTypes:
35 self.dict = self.toDict()
37 self._parseXSD(self.NSURL + self.xsd)
40 def _getText(self, nodelist):
43 if node.nodeType == node.TEXT_NODE:
47 # The rspec is comprised of 2 parts, and 1 reference:
48 # attributes/elements describe individual resources
49 # complexTypes are used to describe a set of attributes/elements
50 # complexTypes can include a reference to other complexTypes.
53 def _getName(self, node):
54 '''Gets name of node. If tag has no name, then return tag's localName'''
56 if not node.nodeName.startswith("#"):
59 elif node.attributes.has_key("name"):
60 name = node.attributes.get("name").value
64 # Attribute. {name : nameofattribute, {items: values}}
65 def _attributeDict(self, attributeDom):
66 '''Traverse single attribute node. Create a dict {attributename : {name: value,}}'''
67 node = {} # parsed dict
68 for attr in attributeDom.attributes.keys():
69 node[attr] = attributeDom.attributes.get(attr).value
73 def appendToDictOrCreate(self, dict, key, value):
74 if (dict.has_key(key)):
75 dict[key].append(value)
80 def toGenDict(self, nodeDom=None, parentdict=None, siblingdict={}, parent=None):
82 convert an XML to a nested dict:
83 * Non-terminal nodes (elements with string children and attributes) are simple dictionaries
84 * Terminal nodes (the rest) are nested dictionaries
90 curNodeName = nodeDom.localName
92 if (nodeDom.hasChildNodes()):
94 for attribute in nodeDom.attributes.keys():
95 childdict = self.appendToDictOrCreate(childdict, attribute, nodeDom.getAttribute(attribute))
96 for child in nodeDom.childNodes[:-1]:
98 siblingdict = self.appendToDictOrCreate(siblingdict, curNodeName, child.nodeValue)
100 childdict = self.toGenDict(child, None, childdict, curNodeName)
102 child = nodeDom.childNodes[-1]
103 if (child.nodeValue):
104 siblingdict = self.appendToDictOrCreate(siblingdict, curNodeName, child.nodeValue)
106 siblingdict = self.appendToDictOrCreate(siblingdict, curNodeName, childdict)
108 siblingdict = self.toGenDict(child, siblingdict, childdict, curNodeName)
111 for attribute in nodeDom.attributes.keys():
112 childdict = self.appendToDictOrCreate(childdict, attribute, nodeDom.getAttribute(attribute))
114 self.appendToDictOrCreate(siblingdict, curNodeName, childdict)
116 if (parentdict is not None):
117 parentdict = self.appendToDictOrCreate(parentdict, parent, siblingdict)
124 def toDict(self, nodeDom = None):
126 convert this rspec to a dict and return it.
130 nodeDom = self.rootNode
132 elementName = nodeDom.nodeName
133 if elementName and not elementName.startswith("#"):
134 # attributes have tags and values. get {tag: value}, else {type: value}
135 node[elementName] = self._attributeDict(nodeDom)
136 # resolve the child nodes.
137 if nodeDom.hasChildNodes():
138 for child in nodeDom.childNodes:
139 childName = self._getName(child)
142 if not childName: continue
144 # initialize the possible array of children
145 if not node[elementName].has_key(childName): node[elementName][childName] = []
147 if isinstance(child, minidom.Text):
148 # add if data is not empty
149 if child.data.strip():
150 node[elementName][childName].append(nextchild.data)
151 elif child.hasChildNodes() and isinstance(child.childNodes[0], minidom.Text):
152 for nextchild in child.childNodes:
153 node[elementName][childName].append(nextchild.data)
155 childdict = self.toDict(child)
156 for value in childdict.values():
157 node[elementName][childName].append(value)
164 convert this rspec to an xml string and return it.
166 return self.rootNode.toxml()
169 def toprettyxml(self):
171 print this rspec in xml in a pretty format.
173 return self.rootNode.toprettyxml()
176 def __removeWhitespaceNodes(self, parent):
177 for child in list(parent.childNodes):
178 if child.nodeType == minidom.Node.TEXT_NODE and child.data.strip() == '':
179 parent.removeChild(child)
181 self.__removeWhitespaceNodes(child)
183 def parseFile(self, filename):
185 read a local xml file and store it as a dom object.
187 dom = minidom.parse(filename)
188 self.__removeWhitespaceNodes(dom)
189 self.rootNode = dom.childNodes[0]
192 def parseString(self, xml):
194 read an xml string and store it as a dom object.
196 dom = minidom.parseString(xml)
197 self.__removeWhitespaceNodes(dom)
198 self.rootNode = dom.childNodes[0]
201 def _httpGetXSD(self, xsdURI):
202 # split the URI into relevant parts
203 host = xsdURI.split("/")[2]
204 if xsdURI.startswith("https"):
205 conn = httplib.HTTPSConnection(host,
206 httplib.HTTPSConnection.default_port)
207 elif xsdURI.startswith("http"):
208 conn = httplib.HTTPConnection(host,
209 httplib.HTTPConnection.default_port)
210 conn.request("GET", xsdURI)
211 # If we can't download the schema, raise an exception
212 r1 = conn.getresponse()
215 return r1.read().replace('\n', '').replace('\t', '').strip()
218 def _parseXSD(self, xsdURI):
220 Download XSD from URL, or if file, read local xsd file and set
223 Since the schema definition is a global namespace shared by and
224 agreed upon by others, this should probably be a URL. Check
225 for URL, download xsd, parse, or if local file, use that.
228 if xsdURI.startswith("http"):
230 schemaDom = minidom.parseString(self._httpGetXSD(xsdURI))
232 # logging.debug("%s: web file not found" % xsdURI)
233 # logging.debug("Using local file %s" % self.xsd")
235 print "Can't find %s on the web. Continuing." % xsdURI
237 if os.path.exists(xsdURI):
238 # logging.debug("using local copy.")
239 print "Using local %s" % xsdURI
240 schemaDom = minidom.parse(xsdURI)
242 raise Exception("Can't find xsd locally")
243 self.schemaDict = self.toDict(schemaDom.childNodes[0])
246 def dict2dom(self, rdict, include_doc = False):
248 convert a dict object into a dom object.
251 def elementNode(tagname, rd):
252 element = minidom.Element(tagname)
253 for key in rd.keys():
254 if isinstance(rd[key], StringTypes) or isinstance(rd[key], int):
255 element.setAttribute(key, str(rd[key]))
256 elif isinstance(rd[key], dict):
257 child = elementNode(key, rd[key])
258 element.appendChild(child)
259 elif isinstance(rd[key], list):
261 if isinstance(item, dict):
262 child = elementNode(key, item)
263 element.appendChild(child)
264 elif isinstance(item, StringTypes) or isinstance(item, int):
265 child = minidom.Element(key)
266 text = minidom.Text()
268 child.appendChild(text)
269 element.appendChild(child)
272 # Minidom does not allow documents to have more than one
273 # child, but elements may have many children. Because of
274 # this, the document's root node will be the first key/value
275 # pair in the dictionary.
276 node = elementNode(rdict.keys()[0], rdict.values()[0])
278 rootNode = minidom.Document()
279 rootNode.appendChild(node)
285 def parseDict(self, rdict, include_doc = True):
287 Convert a dictionary into a dom object and store it.
289 self.rootNode = self.dict2dom(rdict, include_doc).childNodes[0]
292 def getDictsByTagName(self, tagname, dom = None):
294 Search the dom for all elements with the specified tagname
295 and return them as a list of dicts
300 doms = dom.getElementsByTagName(tagname)
301 dictlist = [self.toDict(d) for d in doms]
302 for item in dictlist:
303 for value in item.values():
307 def getDictByTagNameValue(self, tagname, value, dom = None):
309 Search the dom for the first element with the specified tagname
310 and value and return it as a dict.
315 dicts = self.getDictsByTagName(tagname, dom)
318 if rdict.has_key('name') and rdict['name'] in [value]:
324 def filter(self, tagname, attribute, blacklist = [], whitelist = [], dom = None):
326 Removes all elements where:
327 1. tagname matches the element tag
328 2. attribute matches the element attribute
329 3. attribute value is in valuelist
336 if dom.localName in [tagname] and dom.attributes.has_key(attribute):
337 if whitelist and dom.attributes.get(attribute).value not in whitelist:
338 dom.parentNode.removeChild(dom)
339 if blacklist and dom.attributes.get(attribute).value in blacklist:
340 dom.parentNode.removeChild(dom)
342 if dom.hasChildNodes():
343 for child in dom.childNodes:
344 self.filter(tagname, attribute, blacklist, whitelist, child)
347 def merge(self, rspecs, tagname, dom=None):
349 Merge this rspec with the requested rspec based on the specified
350 starting tag name. The start tag (and all of its children) will be merged
359 if dom.localName in [tagname] and dom.attributes.has_key(attribute):
360 if whitelist and dom.attributes.get(attribute).value not in whitelist:
361 dom.parentNode.removeChild(dom)
362 if blacklist and dom.attributes.get(attribute).value in blacklist:
363 dom.parentNode.removeChild(dom)
365 if dom.hasChildNodes():
366 for child in dom.childNodes:
367 self.filter(tagname, attribute, blacklist, whitelist, child)
369 def validateDicts(self):
379 def pprint(self, r = None, depth = 0):
381 Pretty print the dict
384 if r == None: r = self.dict
386 for tab in range(0,depth): line += " "
387 # check if it's nested
390 print line + "%s:" % i
391 self.pprint(r[i], depth + 1)
392 elif type(r) in (tuple, list):
393 for j in r: self.pprint(j, depth + 1)
394 # not nested so just print.
396 print line + "%s" % r
400 class RecordSpec(RSpec):
403 def parseDict(self, rdict, include_doc = False):
405 Convert a dictionary into a dom object and store it.
407 self.rootNode = self.dict2dom(rdict, include_doc)
409 def dict2dom(self, rdict, include_doc = False):
411 if not len(rdict.keys()) == 1:
412 record_dict = {self.root_tag : rdict}
413 return RSpec.dict2dom(self, record_dict, include_doc)