Add ovsdbmonitor GUI tool by Andy Southgate, contributed by Citrix.
[sliver-openvswitch.git] / ovsdb / ovsdbmonitor / OVEFetch.py
diff --git a/ovsdb/ovsdbmonitor/OVEFetch.py b/ovsdb/ovsdbmonitor/OVEFetch.py
new file mode 100644 (file)
index 0000000..9dd1118
--- /dev/null
@@ -0,0 +1,386 @@
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from OVEStandard import *
+from OVEConfig import *
+from OVELogger import *
+
+# This sequence installs the qt4reactor before twisted gets a chance to install its reactor
+import qt4reactor
+globalApp = QtGui.QApplication([])
+qt4reactor.install()
+
+try:
+    from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
+    from twisted.internet import defer, protocol, reactor
+    from twisted.application import reactors
+except Exception, e:
+    print('+++ Python Twisted Conch module is required\n')
+    raise
+
+class OVEFetchUserAuth(userauth.SSHUserAuthClient):
+    def __init__(self, fetch, *params):
+        userauth.SSHUserAuthClient.__init__(self, *params)
+        self.fetch = fetch
+        self.authFails = 0
+    
+    def getPassword(self):
+        return defer.succeed(self.fetch.config()['password'])
+
+    def ssh_USERAUTH_FAILURE(self, packet):
+        if self.authFails > 0: # We normally get one so ignore.  Real failures send these repeatedly
+            OVELog('Authentication failure for '+self.fetch.config()['address'])
+        self.authFails += 1
+        userauth.SSHUserAuthClient.ssh_USERAUTH_FAILURE(self, packet)
+
+class OVEFetchConnection(connection.SSHConnection, QtCore.QObject):
+    def __init__(self, fetch, *params):
+        connection.SSHConnection.__init__(self, *params)
+        QtCore.QObject.__init__(self)
+        self.fetch = fetch
+        self._channel = None
+        self._oldChannels = []
+        
+    def serviceStarted(self):
+        self.emit(QtCore.SIGNAL('connectionService(QObject)'), self)
+
+    def serviceStopped(self):
+        self.emit(QtCore.SIGNAL('connectionService(QObject)'), None)
+
+    def execCommand(self, requester, ref, command, commandType):
+        if self._channel is not None:
+            # Don't delete old channels immediately in case they're e.g. going to time out with a failure
+            self._oldChannels.append(self._channel)
+            if len(self._oldChannels) > 90:
+                # For 30 second timeouts at 1 second refresh interval and three windows open on a single host, need 90 channels
+                del self._oldChannels[1]
+        self._channel = OVECommandChannel(self.fetch, requester, ref, command, commandType, 2**16, 2**15, self)
+        self.openChannel(self._channel)
+
+    def connectionLost(self, reason):
+        if self._channel is not None:
+            self._channel.connectionLost(reason)
+
+class OVEFetchTransport(transport.SSHClientTransport, QtCore.QObject):
+    def __init__(self, fetch, *params):
+        # There is no __init__ method for this class
+        # transport.SSHClientTransport.__init__(self, *params)
+        
+        QtCore.QObject.__init__(self)
+        self.fetch = fetch
+        self._connection = None
+        self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)
+        
+    def verifyHostKey(self, hostKey, fingerprint):
+        return defer.succeed(1)
+
+    def connectionSecure(self):
+        self._connection = OVEFetchConnection(self.fetch)
+        QtCore.QObject.connect(self._connection, QtCore.SIGNAL('connectionService(QObject)'), self.fetch.xon_connectionService)
+        self.requestService(
+            OVEFetchUserAuth(self.fetch, self.fetch.config().get('username', 'root'),
+                self._connection))
+
+    def connectionLost(self, reason):
+        if self._connection is not None:
+            self._connection.connectionLost(reason)
+
+class OVEFetchWrapper:
+    def __init__(self, contents):
+        self.contents = contents
+
+class OVECommandChannel(channel.SSHChannel, QtCore.QObject):
+    name = 'session'
+    MSEC_TIMEOUT=10000
+    STATUS_CONNECTION_LOST = 100001
+    STATUS_TIMEOUT = 100002
+    END_MARKER='END-MARKER'
+    END_MARKER_RE=re.compile(r'^END-MARKER$', re.MULTILINE)
+    
+    def __init__(self, fetch, requester, ref, command, commandType, *params):
+        channel.SSHChannel.__init__(self, *params)
+        QtCore.QObject.__init__(self)        
+        self.fetch = fetch
+        self.requester = requester
+        self.ref = ref
+        self.command = command
+        self.commandType= commandType
+        self._data = ''
+        self._extData = ''
+        self._jsonValues = None
+        self._timerId = None
+        self._status = None
+        self.connect(self, QtCore.SIGNAL('channelData(QObject, int, QString)'), self.fetch.xon_channelData)
+        self.connect(self, QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.fetch.xon_channelExtData)
+        self.connect(self, QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.fetch.xon_channelSuccess)
+        self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)
+        
+    def openFailed(self, reason):
+        if self._timerId is not None:
+            self.killTimer(self._timerId)
+        self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
+            'Open failed:'+str(reason), '', '')
+
+    def channelOpen(self, ignoredData):
+        try:
+            nsCommand = common.NS(str(self.command))
+            self._timerId = self.startTimer(self.MSEC_TIMEOUT)
+            self.conn.sendRequest(self, 'exec', nsCommand, wantReply=1)
+        except Exception, e:
+            self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
+                'Open failed:'+str(e), self._data, self._extData)
+            
+    def dataReceived(self, data):
+        self._data += data
+        if OVEConfig.Inst().logTraffic:
+            self.emit(QtCore.SIGNAL('channelData(QObject, int, QString)'), self.requester, self.ref, data)
+        self.testIfDone()
+        
+    def extDataReceived(self, extData):
+        self._extData += extData
+        if OVEConfig.Inst().logTraffic:
+            self.emit(QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.requester, self.ref, extData)
+
+    def request_exit_status(self, data):
+        # We can get the exit status before the data, so delay calling sendResult until we get both
+        self._status = struct.unpack('>L', data)[0]
+        self.testIfDone()
+        
+    def testIfDone(self):
+        if self._status is not None:
+            if self._status != 0:
+                self.sendResult() # Failed, so send what we have
+            elif len(self._data) > 0:
+                # Status == success and we have some data
+                if self.commandType == 'JSON':
+                    try:
+                        # Decode the JSON data, to confirm that we have all of the data
+                        self._jsonValues = json.read(str(self._data)) # FIXME: Should handle unicode
+                        self.sendResult()
+                    except:
+                        pass # Wait for more data
+                elif self.commandType == 'framed':
+                    match = self.END_MARKER_RE.search(self._data)
+                    if match:
+                        self._data = self._data[:match.start()] # Remove end marker
+                        self.sendResult()
+                else:
+                    OVELog('Bad command type')
+
+    def sendResult(self):
+        if self._timerId is not None:
+            self.killTimer(self._timerId)
+        if self.commandType == 'JSON' and self._status == 0 and self._jsonValues is not None:
+            self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(OVEFetchWrapper(self._jsonValues)))
+        elif self.commandType != 'JSON' and self._status == 0:
+            self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(None))
+        else:
+            self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref, 'Remote command failed (rc='+str(self._status)+')', self._data, self._extData)
+        if self._status != self.STATUS_CONNECTION_LOST:
+            try:
+                self.loseConnection()
+            except Exception, e:
+                OVELog('OVECommandChannel.sendResult loseConnection error: '+str(e))
+
+    def connectionLost(self, reason):
+        self._extData += '+++ Connection lost'
+        self._status = self.STATUS_CONNECTION_LOST
+        self.sendResult()
+
+    def timerEvent(self, event):
+        if event.timerId() == self._timerId:
+            self._extData += '+++ Timeout'
+            self._status = self.STATUS_TIMEOUT
+            self.sendResult()
+        else:
+            QtCore.QObject.timerEvent(self, event)
+
+class OVEFetchEvent(QtCore.QEvent):
+    TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
+    def __init__(self, ref, data):
+        QtCore.QEvent.__init__(self, self.TYPE)
+        self.ref = ref
+        self.data = data
+
+class OVEFetchFailEvent(QtCore.QEvent):
+    TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
+    def __init__(self, ref, message):
+        QtCore.QEvent.__init__(self, self.TYPE)
+        self.ref = ref
+        self.message = str(message)
+
+class OVEFetch(QtCore.QObject):
+    instances = {}
+    SEC_TIMEOUT = 10.0
+    
+    def __init__(self, uuid):
+        QtCore.QObject.__init__(self)
+        self._hostUuid = uuid
+        self._config = None
+        self._transport = None
+        self._connection = None
+        self._commandQueue = []
+        self._timerRef = 0
+        self.refs = {}
+        self.messages = {}
+        self.values = {}
+        self.connect(OVEConfig.Inst(), QtCore.SIGNAL("configUpdated()"), self.xon_configUpdated)
+        
+    @classmethod
+    def Inst(cls, uuid):
+        if uuid not in cls.instances:
+            cls.instances[uuid] = OVEFetch(uuid)
+        return cls.instances[uuid]
+
+    @classmethod
+    def startReactor(cls):
+        reactor.runReturn()
+
+    def xon_configUpdated(self):
+        self._config = None
+        self.resetTransport()
+        
+    def xon_connectionService(self, connection):
+        self._connection = connection
+        if self._connection is not None:
+            OVELog('SSH connection to '+self.config()['address'] +' established')
+            for command in self._commandQueue:
+                # OVELog('Unqueueing '+str(command))
+                self.execCommand2(*command)
+            self._commandQueue = []
+
+    def xon_channelData(self, requester, ref, data):
+        if OVEConfig.Inst().logTraffic:
+            OVELog('Channel data received: '+str(data))
+
+    def xon_channelExtData(self, requester, ref, data):
+        if OVEConfig.Inst().logTraffic:
+            OVELog('+++ Channel extData (stderr) received: '+str(data))
+
+    def xon_channelFailure(self, requester, ref, message, data, extData):
+        if OVEConfig.Inst().logTraffic:
+            OVELog('+++ Channel failure: '+str(message))
+            OVELog("Closing SSH session due to failure")
+
+        errMessage = message
+        if len(data) > 0:
+            errMessage += '\n+++ Failed command output: '+data
+        if len(extData) > 0:
+            errMessage += '\n+++ Failed command output (stderr): '+extData
+
+        self.refs[requester] = ref # For PySide workaround
+        self.messages[requester] = errMessage # For PySide workaround
+        event = OVEFetchFailEvent(ref, errMessage)
+        QtCore.QCoreApplication.postEvent(requester, event)
+        self.resetTransport()
+        
+    def xon_channelSuccess(self, requester, ref, data, extData, jsonValueVariant):
+        jsonValues = jsonValueVariant.toPyObject()
+        if OVEConfig.Inst().logTraffic:
+            OVELog('--- Channel success')
+        try:
+            if jsonValues is not None:
+                values = jsonValues.contents
+            else:
+                values = str(data)
+
+            self.refs[requester] = ref # For PySide workaround
+            self.values[requester] = values # For PySide workaround
+            event = OVEFetchEvent(ref, values)
+            QtCore.QCoreApplication.postEvent(requester, event)
+        except Exception, e:
+            message = ('+++ Failed to decode JSON reply: '+str(e))
+            if len(data) > 0: message += "\n++++++ Data (stdout): "+str(data)
+            if len(extData) > 0: message += '\n++++++ Error (stderr): '+str(extData)
+            self.refs[requester] = ref # For PySide workaround
+            self.messages[requester] = message # For PySide workaround
+            event = OVEFetchFailEvent(ref, message)
+            QtCore.QCoreApplication.postEvent(requester, event)
+
+    # Use for workaround only
+    def snoopRef(self, requester):
+        return self.refs.get(requester, None)
+
+    # Use for workaround only
+    def snoopValues(self, requester):
+        return self.values.get(requester, None)
+
+    # Use for workaround only
+    def snoopMessage(self, requester):
+        return self.messages.get(requester, None)
+
+    def config(self):
+        if self._config is None:
+            self._config = OVEConfig.Inst().hostFromUuid(self._hostUuid)
+
+        return self._config
+    
+    def resetTransport(self):
+        if OVEConfig.Inst().logTraffic:
+            OVELog('Transport reset for '+self.config()['address'])
+        del self._connection
+        del self._transport
+        self._connection = None
+        self._transport = None
+        
+    def transportErrback(self, failure, requester, ref, address):
+        self._timerRef += 1 # Prevent timeout handling
+        self.resetTransport()
+        message = 'Failure connecting to '+address+': '+failure.getErrorMessage()
+        self.refs[requester] = ref # For PySide workaround
+        self.messages[requester] = message # For PySide workaround
+        event = OVEFetchFailEvent(ref, message)
+        QtCore.QCoreApplication.postEvent(requester, event)        
+        
+    def transportTimeout(self, timerRef, requester, ref, address):
+        if self._timerRef == timerRef and self._transport is not None and self._connection is None:
+            message = 'Connection attempt to ' +address+' timed out'
+            self.refs[requester] = ref # For PySide workaround
+            self.messages[requester] = message # For PySide workaround
+            event = OVEFetchFailEvent(ref, message)
+            QtCore.QCoreApplication.postEvent(requester, event)        
+            self.resetTransport()
+
+    def execCommand(self, requester, ref, command, commandType):
+        if OVEConfig.Inst().logTraffic:
+            hostName = (self.config() or {}).get('address', '<Address not set>')
+            OVELog(str(QtCore.QTime.currentTime().toString())+' '+hostName+': Executing '+command)
+        if self._transport is None:
+            self._connection = None
+            self._commandQueue.append((requester, ref, command, commandType))
+            config = self.config()
+            creator = protocol.ClientCreator(reactor, OVEFetchTransport, self)
+            self._transport = creator.connectTCP(config['address'], config.get('port', 22), timeout = self.SEC_TIMEOUT)
+            self._transport.addErrback(self.transportErrback, requester, ref, config['address'])
+            self._timerRef += 1
+            # Set this timer slightly longer than the twisted.conch timeout, as transportErrback can cancel
+            # the timeout and prevent double handling
+            # lambda timerRef = self._timerRef: takes a copy of self._timerRef
+            QtCore.QTimer.singleShot(int((1+self.SEC_TIMEOUT) * 1000), lambda timerRef = self._timerRef: self.transportTimeout(timerRef, requester, ref, config['address']))
+        else:
+            self.execCommand2(requester, ref, command, commandType)
+
+    def execCommand2(self, requester, ref, command, commandType):
+        if self._connection is None:
+            self._commandQueue.append((requester, ref, command, commandType))
+        else:
+            self._connection.execCommand(requester, ref, command, commandType)
+
+    def getTable(self, requester, tableName, ref = QtCore.QObject()):
+        command = '/usr/bin/ovsdb-client transact '+self.config()['connectTarget']+' \'["Open_vSwitch", {"op":"select","table":"'+tableName+'", "where":[]}]\''
+
+        self.execCommand(requester, ref, command, 'JSON')
+        
+    def execCommandFramed(self, requester, ref, command):
+        self.execCommand(requester, ref, command + ' && echo ' + OVECommandChannel.END_MARKER, 'framed')