Global replace of Nicira Networks.
[sliver-openvswitch.git] / ovsdb / ovsdbmonitor / OVEFetch.py
1 # Copyright (c) 2011 Nicira, Inc.
2 # Copyright (c) 2010 Citrix Systems, Inc.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 from OVEStandard import *
17 from OVEConfig import *
18 from OVELogger import *
19 import ovs.json
20
21 # This sequence installs the qt4reactor before twisted gets a chance to install its reactor
22 import qt4reactor
23 globalApp = QtGui.QApplication([])
24 qt4reactor.install()
25
26 try:
27     from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
28     from twisted.internet import defer, protocol, reactor
29     from twisted.application import reactors
30 except Exception, e:
31     print('+++ Python Twisted Conch module is required\n')
32     raise
33
34 class OVEFetchUserAuth(userauth.SSHUserAuthClient):
35     def __init__(self, fetch, *params):
36         userauth.SSHUserAuthClient.__init__(self, *params)
37         self.fetch = fetch
38         self.authFails = 0
39     
40     def getPassword(self):
41         return defer.succeed(self.fetch.config()['password'])
42
43     def ssh_USERAUTH_FAILURE(self, packet):
44         if self.authFails > 0: # We normally get one so ignore.  Real failures send these repeatedly
45             OVELog('Authentication failure for '+self.fetch.config()['address'])
46         self.authFails += 1
47         userauth.SSHUserAuthClient.ssh_USERAUTH_FAILURE(self, packet)
48
49 class OVEFetchConnection(connection.SSHConnection, QtCore.QObject):
50     def __init__(self, fetch, *params):
51         connection.SSHConnection.__init__(self, *params)
52         QtCore.QObject.__init__(self)
53         self.fetch = fetch
54         self._channel = None
55         self._oldChannels = []
56         
57     def serviceStarted(self):
58         self.emit(QtCore.SIGNAL('connectionService(QObject)'), self)
59
60     def serviceStopped(self):
61         self.emit(QtCore.SIGNAL('connectionService(QObject)'), None)
62
63     def execCommand(self, requester, ref, command, commandType):
64         if self._channel is not None:
65             # Don't delete old channels immediately in case they're e.g. going to time out with a failure
66             self._oldChannels.append(self._channel)
67             if len(self._oldChannels) > 90:
68                 # For 30 second timeouts at 1 second refresh interval and three windows open on a single host, need 90 channels
69                 del self._oldChannels[1]
70         self._channel = OVECommandChannel(self.fetch, requester, ref, command, commandType, 2**16, 2**15, self)
71         self.openChannel(self._channel)
72
73     def connectionLost(self, reason):
74         if self._channel is not None:
75             self._channel.connectionLost(reason)
76
77 class OVEFetchTransport(transport.SSHClientTransport, QtCore.QObject):
78     def __init__(self, fetch, *params):
79         # There is no __init__ method for this class
80         # transport.SSHClientTransport.__init__(self, *params)
81         
82         QtCore.QObject.__init__(self)
83         self.fetch = fetch
84         self._connection = None
85         self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)
86         
87     def verifyHostKey(self, hostKey, fingerprint):
88         return defer.succeed(1)
89
90     def connectionSecure(self):
91         self._connection = OVEFetchConnection(self.fetch)
92         QtCore.QObject.connect(self._connection, QtCore.SIGNAL('connectionService(QObject)'), self.fetch.xon_connectionService)
93         self.requestService(
94             OVEFetchUserAuth(self.fetch, self.fetch.config().get('username', 'root'),
95                 self._connection))
96
97     def connectionLost(self, reason):
98         if self._connection is not None:
99             self._connection.connectionLost(reason)
100
101 class OVEFetchWrapper:
102     def __init__(self, contents):
103         self.contents = contents
104
105 class OVECommandChannel(channel.SSHChannel, QtCore.QObject):
106     name = 'session'
107     MSEC_TIMEOUT=10000
108     STATUS_CONNECTION_LOST = 100001
109     STATUS_TIMEOUT = 100002
110     END_MARKER='END-MARKER'
111     END_MARKER_RE=re.compile(r'^END-MARKER$', re.MULTILINE)
112     
113     def __init__(self, fetch, requester, ref, command, commandType, *params):
114         channel.SSHChannel.__init__(self, *params)
115         QtCore.QObject.__init__(self)        
116         self.fetch = fetch
117         self.requester = requester
118         self.ref = ref
119         self.command = command
120         self.commandType= commandType
121         self._data = ''
122         self._extData = ''
123         self._jsonValues = None
124         self._timerId = None
125         self._status = None
126         self.connect(self, QtCore.SIGNAL('channelData(QObject, int, QString)'), self.fetch.xon_channelData)
127         self.connect(self, QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.fetch.xon_channelExtData)
128         self.connect(self, QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.fetch.xon_channelSuccess)
129         self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)
130         
131     def openFailed(self, reason):
132         if self._timerId is not None:
133             self.killTimer(self._timerId)
134         self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
135             'Open failed:'+str(reason), '', '')
136
137     def channelOpen(self, ignoredData):
138         try:
139             nsCommand = common.NS(str(self.command))
140             self._timerId = self.startTimer(self.MSEC_TIMEOUT)
141             self.conn.sendRequest(self, 'exec', nsCommand, wantReply=1)
142         except Exception, e:
143             self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
144                 'Open failed:'+str(e), self._data, self._extData)
145             
146     def dataReceived(self, data):
147         self._data += data
148         if OVEConfig.Inst().logTraffic:
149             self.emit(QtCore.SIGNAL('channelData(QObject, int, QString)'), self.requester, self.ref, data)
150         self.testIfDone()
151         
152     def extDataReceived(self, extData):
153         self._extData += extData
154         if OVEConfig.Inst().logTraffic:
155             self.emit(QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.requester, self.ref, extData)
156
157     def request_exit_status(self, data):
158         # We can get the exit status before the data, so delay calling sendResult until we get both
159         self._status = struct.unpack('>L', data)[0]
160         self.testIfDone()
161         
162     def testIfDone(self):
163         if self._status is not None:
164             if self._status != 0:
165                 self.sendResult() # Failed, so send what we have
166             elif len(self._data) > 0:
167                 # Status == success and we have some data
168                 if self.commandType == 'JSON':
169                     try:
170                         # Decode the JSON data, to confirm that we have all of the data
171                         self._jsonValues = ovs.json.from_string(str(self._data)) # FIXME: Should handle unicode
172                         self.sendResult()
173                     except:
174                         pass # Wait for more data
175                 elif self.commandType == 'framed':
176                     match = self.END_MARKER_RE.search(self._data)
177                     if match:
178                         self._data = self._data[:match.start()] # Remove end marker
179                         self.sendResult()
180                 else:
181                     OVELog('Bad command type')
182
183     def sendResult(self):
184         if self._timerId is not None:
185             self.killTimer(self._timerId)
186         if self.commandType == 'JSON' and self._status == 0 and self._jsonValues is not None:
187             self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(OVEFetchWrapper(self._jsonValues)))
188         elif self.commandType != 'JSON' and self._status == 0:
189             self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(None))
190         else:
191             self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref, 'Remote command failed (rc='+str(self._status)+')', self._data, self._extData)
192         if self._status != self.STATUS_CONNECTION_LOST:
193             try:
194                 self.loseConnection()
195             except Exception, e:
196                 OVELog('OVECommandChannel.sendResult loseConnection error: '+str(e))
197
198     def connectionLost(self, reason):
199         self._extData += '+++ Connection lost'
200         self._status = self.STATUS_CONNECTION_LOST
201         self.sendResult()
202
203     def timerEvent(self, event):
204         if event.timerId() == self._timerId:
205             self._extData += '+++ Timeout'
206             self._status = self.STATUS_TIMEOUT
207             self.sendResult()
208         else:
209             QtCore.QObject.timerEvent(self, event)
210
211 class OVEFetchEvent(QtCore.QEvent):
212     TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
213     def __init__(self, ref, data):
214         QtCore.QEvent.__init__(self, self.TYPE)
215         self.ref = ref
216         self.data = data
217
218 class OVEFetchFailEvent(QtCore.QEvent):
219     TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
220     def __init__(self, ref, message):
221         QtCore.QEvent.__init__(self, self.TYPE)
222         self.ref = ref
223         self.message = str(message)
224
225 class OVEFetch(QtCore.QObject):
226     instances = {}
227     SEC_TIMEOUT = 10.0
228     
229     def __init__(self, uuid):
230         QtCore.QObject.__init__(self)
231         self._hostUuid = uuid
232         self._config = None
233         self._transport = None
234         self._connection = None
235         self._commandQueue = []
236         self._timerRef = 0
237         self.refs = {}
238         self.messages = {}
239         self.values = {}
240         self.connect(OVEConfig.Inst(), QtCore.SIGNAL("configUpdated()"), self.xon_configUpdated)
241         
242     @classmethod
243     def Inst(cls, uuid):
244         if uuid not in cls.instances:
245             cls.instances[uuid] = OVEFetch(uuid)
246         return cls.instances[uuid]
247
248     @classmethod
249     def startReactor(cls):
250         reactor.runReturn()
251
252     def xon_configUpdated(self):
253         self._config = None
254         self.resetTransport()
255         
256     def xon_connectionService(self, connection):
257         self._connection = connection
258         if self._connection is not None:
259             OVELog('SSH connection to '+self.config()['address'] +' established')
260             for command in self._commandQueue:
261                 # OVELog('Unqueueing '+str(command))
262                 self.execCommand2(*command)
263             self._commandQueue = []
264
265     def xon_channelData(self, requester, ref, data):
266         if OVEConfig.Inst().logTraffic:
267             OVELog('Channel data received: '+str(data))
268
269     def xon_channelExtData(self, requester, ref, data):
270         if OVEConfig.Inst().logTraffic:
271             OVELog('+++ Channel extData (stderr) received: '+str(data))
272
273     def xon_channelFailure(self, requester, ref, message, data, extData):
274         if OVEConfig.Inst().logTraffic:
275             OVELog('+++ Channel failure: '+str(message))
276             OVELog("Closing SSH session due to failure")
277
278         errMessage = message
279         if len(data) > 0:
280             errMessage += '\n+++ Failed command output: '+data
281         if len(extData) > 0:
282             errMessage += '\n+++ Failed command output (stderr): '+extData
283
284         self.refs[requester] = ref # For PySide workaround
285         self.messages[requester] = errMessage # For PySide workaround
286         event = OVEFetchFailEvent(ref, errMessage)
287         QtCore.QCoreApplication.postEvent(requester, event)
288         self.resetTransport()
289         
290     def xon_channelSuccess(self, requester, ref, data, extData, jsonValueVariant):
291         jsonValues = jsonValueVariant.toPyObject()
292         if OVEConfig.Inst().logTraffic:
293             OVELog('--- Channel success')
294         try:
295             if jsonValues is not None:
296                 values = jsonValues.contents
297             else:
298                 values = str(data)
299
300             self.refs[requester] = ref # For PySide workaround
301             self.values[requester] = values # For PySide workaround
302             event = OVEFetchEvent(ref, values)
303             QtCore.QCoreApplication.postEvent(requester, event)
304         except Exception, e:
305             message = ('+++ Failed to decode JSON reply: '+str(e))
306             if len(data) > 0: message += "\n++++++ Data (stdout): "+str(data)
307             if len(extData) > 0: message += '\n++++++ Error (stderr): '+str(extData)
308             self.refs[requester] = ref # For PySide workaround
309             self.messages[requester] = message # For PySide workaround
310             event = OVEFetchFailEvent(ref, message)
311             QtCore.QCoreApplication.postEvent(requester, event)
312
313     # Use for workaround only
314     def snoopRef(self, requester):
315         return self.refs.get(requester, None)
316
317     # Use for workaround only
318     def snoopValues(self, requester):
319         return self.values.get(requester, None)
320
321     # Use for workaround only
322     def snoopMessage(self, requester):
323         return self.messages.get(requester, None)
324
325     def config(self):
326         if self._config is None:
327             self._config = OVEConfig.Inst().hostFromUuid(self._hostUuid)
328
329         return self._config
330     
331     def resetTransport(self):
332         if OVEConfig.Inst().logTraffic:
333             OVELog('Transport reset for '+self.config()['address'])
334         del self._connection
335         del self._transport
336         self._connection = None
337         self._transport = None
338         
339     def transportErrback(self, failure, requester, ref, address):
340         self._timerRef += 1 # Prevent timeout handling
341         self.resetTransport()
342         message = 'Failure connecting to '+address+': '+failure.getErrorMessage()
343         self.refs[requester] = ref # For PySide workaround
344         self.messages[requester] = message # For PySide workaround
345         event = OVEFetchFailEvent(ref, message)
346         QtCore.QCoreApplication.postEvent(requester, event)        
347         
348     def transportTimeout(self, timerRef, requester, ref, address):
349         if self._timerRef == timerRef and self._transport is not None and self._connection is None:
350             message = 'Connection attempt to ' +address+' timed out'
351             self.refs[requester] = ref # For PySide workaround
352             self.messages[requester] = message # For PySide workaround
353             event = OVEFetchFailEvent(ref, message)
354             QtCore.QCoreApplication.postEvent(requester, event)        
355             self.resetTransport()
356
357     def execCommand(self, requester, ref, command, commandType):
358         if OVEConfig.Inst().logTraffic:
359             hostName = (self.config() or {}).get('address', '<Address not set>')
360             OVELog(str(QtCore.QTime.currentTime().toString())+' '+hostName+': Executing '+command)
361         if self._transport is None:
362             self._connection = None
363             self._commandQueue.append((requester, ref, command, commandType))
364             config = self.config()
365             creator = protocol.ClientCreator(reactor, OVEFetchTransport, self)
366             self._transport = creator.connectTCP(config['address'], config.get('port', 22), timeout = self.SEC_TIMEOUT)
367             self._transport.addErrback(self.transportErrback, requester, ref, config['address'])
368             self._timerRef += 1
369             # Set this timer slightly longer than the twisted.conch timeout, as transportErrback can cancel
370             # the timeout and prevent double handling
371             # lambda timerRef = self._timerRef: takes a copy of self._timerRef
372             QtCore.QTimer.singleShot(int((1+self.SEC_TIMEOUT) * 1000), lambda timerRef = self._timerRef: self.transportTimeout(timerRef, requester, ref, config['address']))
373         else:
374             self.execCommand2(requester, ref, command, commandType)
375
376     def execCommand2(self, requester, ref, command, commandType):
377         if self._connection is None:
378             self._commandQueue.append((requester, ref, command, commandType))
379         else:
380             self._connection.execCommand(requester, ref, command, commandType)
381
382     def getTable(self, requester, tableName, ref = QtCore.QObject()):
383         command = '/usr/bin/ovsdb-client transact '+self.config()['connectTarget']+' \'["Open_vSwitch", {"op":"select","table":"'+tableName+'", "where":[]}]\''
384
385         self.execCommand(requester, ref, command, 'JSON')
386         
387     def execCommandFramed(self, requester, ref, command):
388         self.execCommand(requester, ref, command + ' && echo ' + OVECommandChannel.END_MARKER, 'framed')