Add ovsdbmonitor GUI tool by Andy Southgate, contributed by Citrix.
[sliver-openvswitch.git] / ovsdb / ovsdbmonitor / OVEFetch.py
1 # Copyright (c) 2010 Citrix Systems, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from OVEStandard import *
16 from OVEConfig import *
17 from OVELogger import *
18
19 # This sequence installs the qt4reactor before twisted gets a chance to install its reactor
20 import qt4reactor
21 globalApp = QtGui.QApplication([])
22 qt4reactor.install()
23
24 try:
25     from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
26     from twisted.internet import defer, protocol, reactor
27     from twisted.application import reactors
28 except Exception, e:
29     print('+++ Python Twisted Conch module is required\n')
30     raise
31
32 class OVEFetchUserAuth(userauth.SSHUserAuthClient):
33     def __init__(self, fetch, *params):
34         userauth.SSHUserAuthClient.__init__(self, *params)
35         self.fetch = fetch
36         self.authFails = 0
37     
38     def getPassword(self):
39         return defer.succeed(self.fetch.config()['password'])
40
41     def ssh_USERAUTH_FAILURE(self, packet):
42         if self.authFails > 0: # We normally get one so ignore.  Real failures send these repeatedly
43             OVELog('Authentication failure for '+self.fetch.config()['address'])
44         self.authFails += 1
45         userauth.SSHUserAuthClient.ssh_USERAUTH_FAILURE(self, packet)
46
47 class OVEFetchConnection(connection.SSHConnection, QtCore.QObject):
48     def __init__(self, fetch, *params):
49         connection.SSHConnection.__init__(self, *params)
50         QtCore.QObject.__init__(self)
51         self.fetch = fetch
52         self._channel = None
53         self._oldChannels = []
54         
55     def serviceStarted(self):
56         self.emit(QtCore.SIGNAL('connectionService(QObject)'), self)
57
58     def serviceStopped(self):
59         self.emit(QtCore.SIGNAL('connectionService(QObject)'), None)
60
61     def execCommand(self, requester, ref, command, commandType):
62         if self._channel is not None:
63             # Don't delete old channels immediately in case they're e.g. going to time out with a failure
64             self._oldChannels.append(self._channel)
65             if len(self._oldChannels) > 90:
66                 # For 30 second timeouts at 1 second refresh interval and three windows open on a single host, need 90 channels
67                 del self._oldChannels[1]
68         self._channel = OVECommandChannel(self.fetch, requester, ref, command, commandType, 2**16, 2**15, self)
69         self.openChannel(self._channel)
70
71     def connectionLost(self, reason):
72         if self._channel is not None:
73             self._channel.connectionLost(reason)
74
75 class OVEFetchTransport(transport.SSHClientTransport, QtCore.QObject):
76     def __init__(self, fetch, *params):
77         # There is no __init__ method for this class
78         # transport.SSHClientTransport.__init__(self, *params)
79         
80         QtCore.QObject.__init__(self)
81         self.fetch = fetch
82         self._connection = None
83         self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)
84         
85     def verifyHostKey(self, hostKey, fingerprint):
86         return defer.succeed(1)
87
88     def connectionSecure(self):
89         self._connection = OVEFetchConnection(self.fetch)
90         QtCore.QObject.connect(self._connection, QtCore.SIGNAL('connectionService(QObject)'), self.fetch.xon_connectionService)
91         self.requestService(
92             OVEFetchUserAuth(self.fetch, self.fetch.config().get('username', 'root'),
93                 self._connection))
94
95     def connectionLost(self, reason):
96         if self._connection is not None:
97             self._connection.connectionLost(reason)
98
99 class OVEFetchWrapper:
100     def __init__(self, contents):
101         self.contents = contents
102
103 class OVECommandChannel(channel.SSHChannel, QtCore.QObject):
104     name = 'session'
105     MSEC_TIMEOUT=10000
106     STATUS_CONNECTION_LOST = 100001
107     STATUS_TIMEOUT = 100002
108     END_MARKER='END-MARKER'
109     END_MARKER_RE=re.compile(r'^END-MARKER$', re.MULTILINE)
110     
111     def __init__(self, fetch, requester, ref, command, commandType, *params):
112         channel.SSHChannel.__init__(self, *params)
113         QtCore.QObject.__init__(self)        
114         self.fetch = fetch
115         self.requester = requester
116         self.ref = ref
117         self.command = command
118         self.commandType= commandType
119         self._data = ''
120         self._extData = ''
121         self._jsonValues = None
122         self._timerId = None
123         self._status = None
124         self.connect(self, QtCore.SIGNAL('channelData(QObject, int, QString)'), self.fetch.xon_channelData)
125         self.connect(self, QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.fetch.xon_channelExtData)
126         self.connect(self, QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.fetch.xon_channelSuccess)
127         self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)
128         
129     def openFailed(self, reason):
130         if self._timerId is not None:
131             self.killTimer(self._timerId)
132         self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
133             'Open failed:'+str(reason), '', '')
134
135     def channelOpen(self, ignoredData):
136         try:
137             nsCommand = common.NS(str(self.command))
138             self._timerId = self.startTimer(self.MSEC_TIMEOUT)
139             self.conn.sendRequest(self, 'exec', nsCommand, wantReply=1)
140         except Exception, e:
141             self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
142                 'Open failed:'+str(e), self._data, self._extData)
143             
144     def dataReceived(self, data):
145         self._data += data
146         if OVEConfig.Inst().logTraffic:
147             self.emit(QtCore.SIGNAL('channelData(QObject, int, QString)'), self.requester, self.ref, data)
148         self.testIfDone()
149         
150     def extDataReceived(self, extData):
151         self._extData += extData
152         if OVEConfig.Inst().logTraffic:
153             self.emit(QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.requester, self.ref, extData)
154
155     def request_exit_status(self, data):
156         # We can get the exit status before the data, so delay calling sendResult until we get both
157         self._status = struct.unpack('>L', data)[0]
158         self.testIfDone()
159         
160     def testIfDone(self):
161         if self._status is not None:
162             if self._status != 0:
163                 self.sendResult() # Failed, so send what we have
164             elif len(self._data) > 0:
165                 # Status == success and we have some data
166                 if self.commandType == 'JSON':
167                     try:
168                         # Decode the JSON data, to confirm that we have all of the data
169                         self._jsonValues = json.read(str(self._data)) # FIXME: Should handle unicode
170                         self.sendResult()
171                     except:
172                         pass # Wait for more data
173                 elif self.commandType == 'framed':
174                     match = self.END_MARKER_RE.search(self._data)
175                     if match:
176                         self._data = self._data[:match.start()] # Remove end marker
177                         self.sendResult()
178                 else:
179                     OVELog('Bad command type')
180
181     def sendResult(self):
182         if self._timerId is not None:
183             self.killTimer(self._timerId)
184         if self.commandType == 'JSON' and self._status == 0 and self._jsonValues is not None:
185             self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(OVEFetchWrapper(self._jsonValues)))
186         elif self.commandType != 'JSON' and self._status == 0:
187             self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(None))
188         else:
189             self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref, 'Remote command failed (rc='+str(self._status)+')', self._data, self._extData)
190         if self._status != self.STATUS_CONNECTION_LOST:
191             try:
192                 self.loseConnection()
193             except Exception, e:
194                 OVELog('OVECommandChannel.sendResult loseConnection error: '+str(e))
195
196     def connectionLost(self, reason):
197         self._extData += '+++ Connection lost'
198         self._status = self.STATUS_CONNECTION_LOST
199         self.sendResult()
200
201     def timerEvent(self, event):
202         if event.timerId() == self._timerId:
203             self._extData += '+++ Timeout'
204             self._status = self.STATUS_TIMEOUT
205             self.sendResult()
206         else:
207             QtCore.QObject.timerEvent(self, event)
208
209 class OVEFetchEvent(QtCore.QEvent):
210     TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
211     def __init__(self, ref, data):
212         QtCore.QEvent.__init__(self, self.TYPE)
213         self.ref = ref
214         self.data = data
215
216 class OVEFetchFailEvent(QtCore.QEvent):
217     TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
218     def __init__(self, ref, message):
219         QtCore.QEvent.__init__(self, self.TYPE)
220         self.ref = ref
221         self.message = str(message)
222
223 class OVEFetch(QtCore.QObject):
224     instances = {}
225     SEC_TIMEOUT = 10.0
226     
227     def __init__(self, uuid):
228         QtCore.QObject.__init__(self)
229         self._hostUuid = uuid
230         self._config = None
231         self._transport = None
232         self._connection = None
233         self._commandQueue = []
234         self._timerRef = 0
235         self.refs = {}
236         self.messages = {}
237         self.values = {}
238         self.connect(OVEConfig.Inst(), QtCore.SIGNAL("configUpdated()"), self.xon_configUpdated)
239         
240     @classmethod
241     def Inst(cls, uuid):
242         if uuid not in cls.instances:
243             cls.instances[uuid] = OVEFetch(uuid)
244         return cls.instances[uuid]
245
246     @classmethod
247     def startReactor(cls):
248         reactor.runReturn()
249
250     def xon_configUpdated(self):
251         self._config = None
252         self.resetTransport()
253         
254     def xon_connectionService(self, connection):
255         self._connection = connection
256         if self._connection is not None:
257             OVELog('SSH connection to '+self.config()['address'] +' established')
258             for command in self._commandQueue:
259                 # OVELog('Unqueueing '+str(command))
260                 self.execCommand2(*command)
261             self._commandQueue = []
262
263     def xon_channelData(self, requester, ref, data):
264         if OVEConfig.Inst().logTraffic:
265             OVELog('Channel data received: '+str(data))
266
267     def xon_channelExtData(self, requester, ref, data):
268         if OVEConfig.Inst().logTraffic:
269             OVELog('+++ Channel extData (stderr) received: '+str(data))
270
271     def xon_channelFailure(self, requester, ref, message, data, extData):
272         if OVEConfig.Inst().logTraffic:
273             OVELog('+++ Channel failure: '+str(message))
274             OVELog("Closing SSH session due to failure")
275
276         errMessage = message
277         if len(data) > 0:
278             errMessage += '\n+++ Failed command output: '+data
279         if len(extData) > 0:
280             errMessage += '\n+++ Failed command output (stderr): '+extData
281
282         self.refs[requester] = ref # For PySide workaround
283         self.messages[requester] = errMessage # For PySide workaround
284         event = OVEFetchFailEvent(ref, errMessage)
285         QtCore.QCoreApplication.postEvent(requester, event)
286         self.resetTransport()
287         
288     def xon_channelSuccess(self, requester, ref, data, extData, jsonValueVariant):
289         jsonValues = jsonValueVariant.toPyObject()
290         if OVEConfig.Inst().logTraffic:
291             OVELog('--- Channel success')
292         try:
293             if jsonValues is not None:
294                 values = jsonValues.contents
295             else:
296                 values = str(data)
297
298             self.refs[requester] = ref # For PySide workaround
299             self.values[requester] = values # For PySide workaround
300             event = OVEFetchEvent(ref, values)
301             QtCore.QCoreApplication.postEvent(requester, event)
302         except Exception, e:
303             message = ('+++ Failed to decode JSON reply: '+str(e))
304             if len(data) > 0: message += "\n++++++ Data (stdout): "+str(data)
305             if len(extData) > 0: message += '\n++++++ Error (stderr): '+str(extData)
306             self.refs[requester] = ref # For PySide workaround
307             self.messages[requester] = message # For PySide workaround
308             event = OVEFetchFailEvent(ref, message)
309             QtCore.QCoreApplication.postEvent(requester, event)
310
311     # Use for workaround only
312     def snoopRef(self, requester):
313         return self.refs.get(requester, None)
314
315     # Use for workaround only
316     def snoopValues(self, requester):
317         return self.values.get(requester, None)
318
319     # Use for workaround only
320     def snoopMessage(self, requester):
321         return self.messages.get(requester, None)
322
323     def config(self):
324         if self._config is None:
325             self._config = OVEConfig.Inst().hostFromUuid(self._hostUuid)
326
327         return self._config
328     
329     def resetTransport(self):
330         if OVEConfig.Inst().logTraffic:
331             OVELog('Transport reset for '+self.config()['address'])
332         del self._connection
333         del self._transport
334         self._connection = None
335         self._transport = None
336         
337     def transportErrback(self, failure, requester, ref, address):
338         self._timerRef += 1 # Prevent timeout handling
339         self.resetTransport()
340         message = 'Failure connecting to '+address+': '+failure.getErrorMessage()
341         self.refs[requester] = ref # For PySide workaround
342         self.messages[requester] = message # For PySide workaround
343         event = OVEFetchFailEvent(ref, message)
344         QtCore.QCoreApplication.postEvent(requester, event)        
345         
346     def transportTimeout(self, timerRef, requester, ref, address):
347         if self._timerRef == timerRef and self._transport is not None and self._connection is None:
348             message = 'Connection attempt to ' +address+' timed out'
349             self.refs[requester] = ref # For PySide workaround
350             self.messages[requester] = message # For PySide workaround
351             event = OVEFetchFailEvent(ref, message)
352             QtCore.QCoreApplication.postEvent(requester, event)        
353             self.resetTransport()
354
355     def execCommand(self, requester, ref, command, commandType):
356         if OVEConfig.Inst().logTraffic:
357             hostName = (self.config() or {}).get('address', '<Address not set>')
358             OVELog(str(QtCore.QTime.currentTime().toString())+' '+hostName+': Executing '+command)
359         if self._transport is None:
360             self._connection = None
361             self._commandQueue.append((requester, ref, command, commandType))
362             config = self.config()
363             creator = protocol.ClientCreator(reactor, OVEFetchTransport, self)
364             self._transport = creator.connectTCP(config['address'], config.get('port', 22), timeout = self.SEC_TIMEOUT)
365             self._transport.addErrback(self.transportErrback, requester, ref, config['address'])
366             self._timerRef += 1
367             # Set this timer slightly longer than the twisted.conch timeout, as transportErrback can cancel
368             # the timeout and prevent double handling
369             # lambda timerRef = self._timerRef: takes a copy of self._timerRef
370             QtCore.QTimer.singleShot(int((1+self.SEC_TIMEOUT) * 1000), lambda timerRef = self._timerRef: self.transportTimeout(timerRef, requester, ref, config['address']))
371         else:
372             self.execCommand2(requester, ref, command, commandType)
373
374     def execCommand2(self, requester, ref, command, commandType):
375         if self._connection is None:
376             self._commandQueue.append((requester, ref, command, commandType))
377         else:
378             self._connection.execCommand(requester, ref, command, commandType)
379
380     def getTable(self, requester, tableName, ref = QtCore.QObject()):
381         command = '/usr/bin/ovsdb-client transact '+self.config()['connectTarget']+' \'["Open_vSwitch", {"op":"select","table":"'+tableName+'", "where":[]}]\''
382
383         self.execCommand(requester, ref, command, 'JSON')
384         
385     def execCommandFramed(self, requester, ref, command):
386         self.execCommand(requester, ref, command + ' && echo ' + OVECommandChannel.END_MARKER, 'framed')