import re
import xmlrpclib
from lxml import etree
from PyQt4.QtXml import QDomDocument
from sface.xmlwidget import XmlWindow, DomModel, XmlView, XmlDelegate, ElemNodeDelegate, TextNodeDelegate, CommentNodeDelegate
from PyQt4.QtCore import *
from PyQt4.QtGui import *

# Non-greedy, DOTALL patterns that cut individual XML-RPC documents out of a
# larger text dump. The tag names match the root tags that parseMethodCall()
# and parseMethodResponse() require.
_METHOD_CALL_RE = re.compile(r'<methodCall>.*?</methodCall>', re.DOTALL)
_METHOD_RESPONSE_RE = re.compile(
    r'<methodResponse>.*?</methodResponse>', re.DOTALL)


class XmlrpcReader(object):
    """Extract XML-RPC calls and responses from a raw text dump.

    Attributes:
        rawOutput: the text most recently handed to store(), or None.
        responses: dicts produced by parseMethodResponse() during the last
            extractXml() run.
        calls: one dict per extracted call, holding "request", optionally
            "reply", plus the keys produced by the two parse methods.
        xml: concatenation of all extracted fragments from the last
            extractXml() run.
    """

    def __init__(self):
        self.rawOutput = None
        self.responses = []
        self.calls = []
        self.xml = ""

    def store(self, rawOutput):
        """Remember rawOutput for the next extractXml() run."""
        self.rawOutput = rawOutput

    def parseMethodCall(self, mc):
        """Parse a <methodCall> document.

        Returns an empty dict if mc is not parseable XML or its root is not a
        non-empty <methodCall>. Otherwise returns a dict with "args" (the
        unmarshalled parameters, each converted with str()) and "methodName";
        both are empty if unmarshalling fails.
        """
        mc = str(mc)
        request = {}

        try:
            tree = etree.fromstring(mc)
        except etree.XMLSyntaxError:
            return request

        if tree.tag != "methodCall" or len(tree) == 0:
            return request

        try:
            (pythonParams, methodName) = xmlrpclib.loads(etree.tostring(tree))
        except Exception:
            # Best effort: unmarshalling failed, so report the call with no
            # name and no arguments rather than dropping it.
            pythonParams = []
            methodName = ""

        request["args"] = [str(x) for x in pythonParams]
        request["methodName"] = methodName
        return request

    def parseMethodResponse(self, mr):
        """Parse a <methodResponse> document.

        Returns a dict whose "kind" is:
          * "unknown"  - mr is not parseable XML, or its root is not a
                         non-empty <methodResponse>;
          * "fault"    - the response carries a <fault>; each struct member
                         of the first fault is copied in as name -> text,
                         e.g. faultCode: "102",
                         faultString: "Register: Missing authority...";
          * "executed" - anything else.
        """
        mr = str(mr)  # PyQt supplies a QByteArray; make it a string
        response = {"kind": "unknown"}

        try:
            tree = etree.fromstring(mr)
        except etree.XMLSyntaxError as e:
            print("failed to parse XML response %s" % str(e))
            return response

        if tree.tag != "methodResponse" or len(tree) == 0:
            return response

        faults = tree.xpath("//methodResponse/fault")
        if faults:
            # Only the first fault is reported.
            response["kind"] = "fault"
            for member in faults[0].xpath("value/struct/member"):
                names = member.xpath("name")
                values = member.xpath("value")
                # The member's value is the text of the first typed child
                # of <value> (e.g. <int> or <string>).
                if names and values and len(values[0]) > 0:
                    response[names[0].text] = values[0][0].text
            return response

        # Whatever didn't fault must have succeeded? "success" really isn't
        # quite the right word. The best we can say is that the call was
        # executed. The actual op may or may not have succeeded.
        response["kind"] = "executed"
        return response

    def extractXml(self):
        """Extract every call/response pair from the stored raw output.

        Requests and replies are paired by position. Each request appends one
        dict to self.calls; self.responses is rebuilt from scratch. Returns
        (and stores in self.xml) the concatenation of all extracted
        fragments. Note that this concatenation has several top-level
        elements, so it is not itself a well-formed XML document.

        NOTE(review): self.calls is not cleared here, so calls accumulate
        across runs - confirm that callers pass incremental output.
        """
        # Nothing stored yet: behave as if the output were empty.
        raw = "" if self.rawOutput is None else self.rawOutput

        requests = _METHOD_CALL_RE.findall(raw)
        replies = _METHOD_RESPONSE_RE.findall(raw)

        # cleaning: turn literal backslash-n sequences into newlines and drop
        # the "body:" separators embedded in the replies
        requests = [x.replace('\\n', '\n') for x in requests]
        replies = [
            x.replace('\\n', '\n')
             .replace("'\nbody: '", '')
             .replace("\"\nbody: '", '')
            for x in replies
        ]

        self.responses = []
        fragments = []

        for index, requestXml in enumerate(requests):
            fragments.append(requestXml)
            callDict = {"request": requestXml}

            # we could have less responses than calls, so guard the lookup
            if index < len(replies):
                replyXml = replies[index]
                fragments.append(replyXml)

                # parse the response to extract fault information
                responseDict = self.parseMethodResponse(replyXml)
                self.responses.append(responseDict)

                callDict["reply"] = replyXml
                callDict.update(responseDict)

            callDict.update(self.parseMethodCall(requestXml))
            self.calls.append(callDict)

        # just in case somehow we ended up with more responses than calls
        for replyXml in replies[len(requests):]:
            fragments.append(replyXml)
            self.responses.append(self.parseMethodResponse(replyXml))

        self.xml = "".join(fragments)
        return self.xml

    def stats(self):
        """Placeholder for statistics (round-trip time, size); does nothing."""
        pass


class XmlrpcTracker(QWidget):
    """Widget listing the tracked XML-RPC calls.

    Shows a table of calls (most recent first), the parameters of the
    selected call, and a tree view of its request/reply XML. The selected
    call can be exported to a file.
    """

    def __init__(self, parent=None):
        QWidget.__init__(self, parent=parent)

        self.reader = XmlrpcReader()

        labelCalls = QLabel("Calls: (from most recent to least recent)")
        self.callTable = QTableWidget(self)
        self.callTable.setSelectionBehavior(QAbstractItemView.SelectRows)
        self.callTable.setSelectionMode(QAbstractItemView.SingleSelection)
        self.callTable.setMaximumHeight(140)

        labelArgs = QLabel("Params:")
        self.argsTable = QTableWidget(self)
        self.argsTable.setMaximumHeight(140)

        labelXml = QLabel("XMLRPC Contents")
        self.xmlView = XmlView(self)

        self.delegate = XmlDelegate(self)
        self.delegate.insertNodeDelegate('element', ElemNodeDelegate())
        self.delegate.insertNodeDelegate('text', TextNodeDelegate())
        self.delegate.insertNodeDelegate('comment', CommentNodeDelegate())
        self.xmlView.setItemDelegate(self.delegate)

        exportButton = QPushButton("&Export Call")

        layoutButtons = QHBoxLayout()
        layoutButtons.addWidget(exportButton)
        layoutButtons.addStretch()

        # NOTE: this attribute shadows QWidget.layout(); the name is kept
        # because code outside this file may read it.
        self.layout = QVBoxLayout()
        self.layout.addWidget(labelCalls)
        self.layout.addWidget(self.callTable)
        self.layout.addWidget(labelArgs)
        self.layout.addWidget(self.argsTable)
        self.layout.addWidget(labelXml)
        self.layout.addWidget(self.xmlView)
        self.layout.addLayout(layoutButtons)
        self.setLayout(self.layout)

        self.connect(self.callTable, SIGNAL("itemSelectionChanged ()"),
                     self.onCallSelect)
        self.connect(exportButton, SIGNAL("clicked()"), self.onExportClicked)

    def getAndPrint(self, rawOutput):
        """Feed rawOutput to the reader and refresh the call table."""
        self.reader.store(rawOutput)
        self.reader.extractXml()
        self.updateCallTable()

    def updateCallTable(self):
        """Rebuild the call table from self.reader.calls, newest call first.

        Every cell stores the call's index in self.reader.calls under
        Qt.UserRole so that getSelectedCall() can map a row back to a call.
        """
        self.callTable.clear()
        self.callTable.setColumnCount(3)
        self.callTable.setHorizontalHeaderLabels(
            ["name", "status", "faultString"])
        self.callTable.setHorizontalScrollMode(QAbstractItemView.ScrollPerPixel)

        calls = self.reader.calls
        self.callTable.setRowCount(len(calls))

        newestFirst = reversed(list(enumerate(calls)))
        for row, (index, call) in enumerate(newestFirst):
            texts = (
                call.get("methodName", "unknown"),
                call.get("kind", "unknown"),
                call.get("faultString", ""),
            )
            for column, text in enumerate(texts):
                # A fault member with an empty value is stored as None;
                # show it as an empty cell.
                item = QTableWidgetItem("" if text is None else text)
                item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
                item.setData(Qt.UserRole, index)
                self.callTable.setItem(row, column, item)

        self.callTable.resizeColumnsToContents()

    def updateArgsTable(self, args):
        """Show args, one per row, truncating each to 255 characters."""
        self.argsTable.clear()
        self.argsTable.setColumnCount(1)
        self.argsTable.setHorizontalHeaderLabels(["param"])
        self.argsTable.horizontalHeader().hide()
        self.argsTable.setHorizontalScrollMode(QAbstractItemView.ScrollPerPixel)
        self.argsTable.setRowCount(len(args))

        for row, arg in enumerate(args):
            if len(arg) > 255:
                arg = arg[:255] + "..."
            item = QTableWidgetItem(arg)
            item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
            self.argsTable.setItem(row, 0, item)

        self.argsTable.resizeColumnsToContents()

    def getSelectedCall(self):
        """Return the call dict for the selected row, or None."""
        selItems = self.callTable.selectedItems()
        if len(selItems) <= 0:
            return None

        # The Qt.UserRole data is the index into self.reader.calls.
        row = selItems[0].data(Qt.UserRole).toInt()[0]
        calls = self.reader.calls
        if len(calls) <= row:
            return None
        return calls[row]

    def onCallSelect(self):
        """Display the XML and parameters of the newly selected call."""
        call = self.getSelectedCall()
        if not call:
            return

        # A well-formed XML document must have one, and only one, top-level
        # element, so wrap the request and the reply in a single root.
        xml = ("<debug>" + call.get("request", "") + call.get("reply", "")
               + "</debug>")
        self.displayXml(xml)
        self.updateArgsTable(call.get("args", []))

    def onExportClicked(self):
        """Write the selected call's request and reply to a chosen file."""
        call = self.getSelectedCall()
        if not call:
            return

        filename = QFileDialog.getSaveFileName(self, 'Save File', '.')
        # The dialog may hand back a QString; convert it to a native string
        # before testing it and passing it to open().
        filename = unicode(filename)
        if not filename:
            # The user cancelled the dialog.
            return

        with open(filename, "w") as f:
            f.write(call.get("request", "") + "\n")
            f.write(call.get("reply", "") + "\n")

    def displayXml(self, xml):
        """Parse xml into a DOM document and show it in the XML view."""
        self.document = QDomDocument("XMLRPC Tracker")
        self.document.setContent(xml)
        self.model = DomModel(self.document, self)
        self.xmlView.setModel(self.model)


# There's one global tracker for XMLRPCs. Use init_tracker() to create it, and
# get_tracker() to get it.

glo_tracker = None


def init_tracker(parent):
    """Create the global tracker as a child of parent and return it."""
    global glo_tracker
    glo_tracker = XmlrpcTracker(parent)
    return glo_tracker


def get_tracker():
    """Return the global tracker, or None if init_tracker() was not called."""
    return glo_tracker