import re
from lxml import etree
from PyQt4.QtXml import QDomDocument
from sface.xmlwidget import XmlWindow, DomModel, XmlView, XmlDelegate, ElemNodeDelegate, TextNodeDelegate, CommentNodeDelegate
from PyQt4.QtCore import *
from PyQt4.QtGui import *
class XmlrpcReader():
def __init__(self):
self.rawOutput = None
self.responses = []
self.calls = []
def store(self, rawOutput):
self.rawOutput = rawOutput
def paramString(self, item):
"""
Convert an xmlrpc into a human-readable string
Is there an easier way?
"""
if item.tag == "string":
return item.text
elif item.tag == "array":
arrayValues = item.xpath("data/value")
if arrayValues:
children = list(arrayValues[0])
if children:
return self.paramString(children[0]) + ", ..."
else:
return "[]"
else:
return "[]"
elif item.tag == "struct":
dict = {}
members = item.xpath("member")
for member in members:
names = member.xpath("name")
values = member.xpath("value")
if (names) and (values):
name = names[0]
value = values[0]
if len(list(value))>0:
data = list(value)[0]
dict[name.text] = self.paramString(list(value)[0])
return str(dict)
else:
return "unknown"
def parseMethodCall(self, mc):
mc = str(mc)
request = {}
try:
tree = etree.fromstring(mc)
except etree.XMLSyntaxError, e:
return request
if tree.tag != "methodCall" or (len(list(tree))==0):
return request
methodNames = tree.xpath("//methodCall/methodName")
if len(methodNames) == 0:
return request
request["args"] = []
params = tree.xpath("//methodCall/params/param/value")
for param in params:
children = list(param)
if children:
request["args"].append(self.paramString(children[0]))
request["methodName"] = methodNames[0].text
return request
def parseMethodResponse(self, mr):
mr = str(mr) # PyQT supplies a QByteArray; make it a string
response = {"kind": "unknown"}
try:
tree = etree.fromstring(mr)
except etree.XMLSyntaxError, e:
print "failed to parse XML response", str(e)
#file("badparse.xml","w").write(mr)
return response
if tree.tag != "methodResponse" or (len(list(tree))==0):
return response
# a fault should look like:
# kind: "fault"
# faultCode: "102"
# faultString: "Register: Missing authority..."
faults = tree.xpath("//methodResponse/fault")
for fault in faults:
response["kind"] = "fault"
structs = fault.xpath("value/struct")
for struct in structs:
members = struct.xpath("member")
for member in members:
names = member.xpath("name")
values = member.xpath("value")
if (names) and (values):
name = names[0]
value = values[0]
if len(list(value))>0:
data = list(value)[0]
response[name.text] = data.text
# once we have the first fault, return
return response
# whatever didn't fault must have succeeded?
# "success" really isn't quite the right word. The best we can say is
# that the call was executed. The actual op may or may not have
# succeeded.
response["kind"] = "executed"
return response
def extractXml(self):
pttrnAsk = '.*?'
pttrnAns = '.*?'
requests = re.compile(pttrnAsk, re.DOTALL).findall(self.rawOutput)
replies = re.compile(pttrnAns, re.DOTALL).findall(self.rawOutput)
# cleaning
requests = [ x.replace('\\n','\n') for x in requests ]
replies = [ x.replace('\\n','\n').replace("'\nbody: '", '').replace("\"\nbody: '", '') for x in replies ]
# A well-formed XML document must have one, and only one, top-level element
self.responses = []
self.xml = ""
for requestXml in requests:
self.xml += requestXml
callDict = {"request": requestXml}
# we could have less responses than calls, so guard the pop
if replies:
replyXml = replies.pop(0)
self.xml += replyXml
# parse the response to extract fault information
responseDict = self.parseMethodResponse(replyXml)
self.responses.append(responseDict)
requestDict = self.parseMethodCall(requestXml)
callDict["reply"] = replyXml
callDict.update(responseDict)
callDict.update(requestDict)
self.calls.append(callDict)
# just in case somehow we ended up with more responses than calls
while replies:
replyXml = replies.pop()
self.xml += replyXml
self.responses.append(self.parseMethodResponse(replyXml))
return self.xml
def stats(self):
# statistics: round-trip time, size of the com
pass
class XmlrpcTracker(QWidget):
def __init__(self, parent=None):
QWidget.__init__(self, parent=parent)
self.reader = XmlrpcReader()
labelCalls = QLabel("Calls: (from most recent to least recent)")
self.callTable = QTableWidget(self)
self.callTable.setSelectionBehavior(QAbstractItemView.SelectRows)
self.callTable.setSelectionMode(QAbstractItemView.SingleSelection)
self.callTable.setMaximumHeight(140)
labelArgs = QLabel("Params:")
self.argsTable = QTableWidget(self)
self.argsTable.setMaximumHeight(140)
labelXml = QLabel("XMLRPC Contents")
self.xmlView = XmlView(self)
self.delegate = XmlDelegate(self)
self.delegate.insertNodeDelegate('element', ElemNodeDelegate())
self.delegate.insertNodeDelegate('text', TextNodeDelegate())
self.delegate.insertNodeDelegate('comment', CommentNodeDelegate())
self.xmlView.setItemDelegate(self.delegate)
self.layout = QVBoxLayout()
self.layout.addWidget(labelCalls)
self.layout.addWidget(self.callTable)
self.layout.addWidget(labelArgs)
self.layout.addWidget(self.argsTable)
self.layout.addWidget(labelXml)
self.layout.addWidget(self.xmlView)
self.setLayout(self.layout)
self.connect(self.callTable, SIGNAL("itemSelectionChanged ()"), self.onCallSelect)
def getAndPrint(self, rawOutput):
self.reader.store(rawOutput)
self.reader.extractXml()
self.updateCallTable()
def updateCallTable(self):
self.callTable.clear()
self.callTable.setColumnCount(2)
self.callTable.setHorizontalHeaderLabels(["name", "status"])
calls = self.reader.calls
self.callTable.setRowCount(len(calls))
row = 0
reverse_calls = list(enumerate(calls))
reverse_calls.reverse()
for (index,call) in reverse_calls:
item = QTableWidgetItem(call.get("methodName", "unknown"))
item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
item.setData(Qt.UserRole, index)
self.callTable.setItem(row, 0, item)
item = QTableWidgetItem(call.get("kind", "unknown"))
item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
item.setData(Qt.UserRole, index)
self.callTable.setItem(row, 1, item)
row = row + 1
self.callTable.resizeColumnsToContents()
def updateArgsTable(self, args):
self.argsTable.clear()
self.argsTable.setColumnCount(1)
self.argsTable.setHorizontalHeaderLabels(["param"])
self.argsTable.horizontalHeader().hide()
self.argsTable.setHorizontalScrollMode(QAbstractItemView.ScrollPerPixel)
self.argsTable.setRowCount(len(args))
row = 0
for arg in args:
if len(arg)>255:
arg = arg[:255] + "..."
item = QTableWidgetItem(arg)
item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
self.argsTable.setItem(row, 0, item)
row = row + 1
self.argsTable.resizeColumnsToContents()
def onCallSelect(self):
selItems = self.callTable.selectedItems()
if (len(selItems) <= 0):
return
row = selItems[0].data(Qt.UserRole).toInt()[0]
calls = self.reader.calls
if len(calls)<=row:
return
call = calls[row]
xml = "" + call.get("request","") + call.get("reply", "") + ""
#xml = call.get("request","") + call.get("reply", "")
self.displayXml(xml)
self.updateArgsTable(call.get("args",[]))
def displayXml(self, xml):
self.document = QDomDocument("XMLRPC Tracker")
self.document.setContent(xml)
self.model = DomModel(self.document, self)
self.xmlView.setModel(self.model)
# There's one global tracker for XMLRPCs. Use init_tracker() to create it, and
# get_tracker() to get it.
glo_tracker = None
def init_tracker(parent):
global glo_tracker
glo_tracker = XmlrpcTracker(parent)
return glo_tracker
def get_tracker():
global glo_tracker
return glo_tracker