ovs.stream: Drop Stream.get_name() since clients can use 'name' directly.
[sliver-openvswitch.git] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18
19 import ovs.json
20 import ovs.poller
21 import ovs.reconnect
22 import ovs.stream
23 import ovs.timeval
24
25 EOF = -1
26
27 class Message(object):
28     T_REQUEST = 0               # Request.
29     T_NOTIFY = 1                # Notification.
30     T_REPLY = 2                 # Successful reply.
31     T_ERROR = 3                 # Error reply.
32
33     __types = {T_REQUEST: "request",
34                T_NOTIFY: "notification",
35                T_REPLY: "reply",
36                T_ERROR: "error"}
37
38     def __init__(self, type_, method, params, result, error, id):
39         self.type = type_
40         self.method = method
41         self.params = params
42         self.result = result
43         self.error = error
44         self.id = id
45
46     _next_id = 0
47     @staticmethod
48     def _create_id():
49         this_id = Message._next_id
50         Message._next_id += 1
51         return this_id
52
53     @staticmethod
54     def create_request(method, params):
55         return Message(Message.T_REQUEST, method, params, None, None,
56                        Message._create_id())
57
58     @staticmethod
59     def create_notify(method, params):
60         return Message(Message.T_NOTIFY, method, params, None, None,
61                        None)
62
63     @staticmethod
64     def create_reply(result, id):
65         return Message(Message.T_REPLY, None, None, result, None, id)
66
67     @staticmethod
68     def create_error(error, id):
69         return Message(Message.T_ERROR, None, None, None, error, id)
70
71     @staticmethod
72     def type_to_string(type_):
73         return Message.__types[type_]
74
75     @staticmethod
76     def __validate_arg(value, name, must_have):
77         if (value is not None) == (must_have != 0):
78             return None
79         else:
80             type_name = Message.type_to_string(self.type)
81             if must_have:
82                 verb = "must"
83             else:
84                 verb = "must not"
85             return "%s %s have \"%s\"" % (type_name, verb, name)
86
87     def is_valid(self):
88         if self.params is not None and type(self.params) != list:
89             return "\"params\" must be JSON array"
90
91         pattern = {Message.T_REQUEST: 0x11001,
92                    Message.T_NOTIFY:  0x11000,
93                    Message.T_REPLY:   0x00101,
94                    Message.T_ERROR:   0x00011}.get(self.type)
95         if pattern is None:
96             return "invalid JSON-RPC message type %s" % self.type
97
98         return (
99             Message.__validate_arg(self.method, "method", pattern & 0x10000) or
100             Message.__validate_arg(self.params, "params", pattern & 0x1000) or
101             Message.__validate_arg(self.result, "result", pattern & 0x100) or
102             Message.__validate_arg(self.error, "error", pattern & 0x10) or
103             Message.__validate_arg(self.id, "id", pattern & 0x1))
104
105     @staticmethod
106     def from_json(json):
107         if type(json) != dict:
108             return "message is not a JSON object"
109
110         # Make a copy to avoid modifying the caller's dict.
111         json = dict(json)
112
113         if "method" in json:
114             method = json.pop("method")
115             if type(method) not in [str, unicode]:
116                 return "method is not a JSON string"
117         else:
118             method = None
119
120         params = json.pop("params", None)
121         result = json.pop("result", None)
122         error = json.pop("error", None)
123         id = json.pop("id", None)
124         if len(json):
125             return "message has unexpected member \"%s\"" % json.popitem()[0]
126
127         if result is not None:
128             msg_type = Message.T_REPLY
129         elif error is not None:
130             msg_type = Message.T_ERROR
131         elif id is not None:
132             msg_type = Message.T_REQUEST
133         else:
134             msg_type = Message.T_NOTIFY
135         
136         msg = Message(msg_type, method, params, result, error, id)
137         validation_error = msg.is_valid()
138         if validation_error is not None:
139             return validation_error
140         else:
141             return msg
142
143     def to_json(self):
144         json = {}
145
146         if self.method is not None:
147             json["method"] = self.method
148
149         if self.params is not None:
150             json["params"] = self.params
151
152         if self.result is not None or self.type == Message.T_ERROR:
153             json["result"] = self.result
154
155         if self.error is not None or self.type == Message.T_REPLY:
156             json["error"] = self.error
157
158         if self.id is not None or self.type == Message.T_NOTIFY:
159             json["id"] = self.id
160
161         return json
162
163     def __str__(self):
164         s = [Message.type_to_string(self.type)]
165         if self.method is not None:
166             s.append("method=\"%s\"" % self.method)
167         if self.params is not None:
168             s.append("params=" + ovs.json.to_string(self.params))
169         if self.error is not None:
170             s.append("error=" + ovs.json.to_string(self.error))
171         if self.id is not None:
172             s.append("id=" + ovs.json.to_string(self.id))
173         return ", ".join(s)
174
175 class Connection(object):
176     def __init__(self, stream):
177         self.name = stream.name
178         self.stream = stream
179         self.status = 0
180         self.input = ""
181         self.output = ""
182         self.parser = None
183
184     def close(self):
185         self.stream.close()
186         self.stream = None
187
188     def run(self):
189         if self.status:
190             return
191
192         while len(self.output):
193             retval = self.stream.send(self.output)
194             if retval >= 0:
195                 self.output = self.output[retval:]
196             else:
197                 if retval != -errno.EAGAIN:
198                     logging.warn("%s: send error: %s" % (self.name,
199                                                          os.strerror(-retval)))
200                     self.error(-retval)
201                 break
202
203     def wait(self, poller):
204         if not self.status:
205             self.stream.run_wait(poller)
206             if len(self.output):
207                 self.stream.send_wait()
208
209     def get_status(self):
210         return self.status
211
212     def get_backlog(self):
213         if self.status != 0:
214             return 0
215         else:
216             return len(self.output)
217
218     def __log_msg(self, title, msg):
219         logging.debug("%s: %s %s" % (self.name, title, msg))
220
221     def send(self, msg):
222         if self.status:
223             return self.status
224
225         self.__log_msg("send", msg)
226
227         was_empty = len(self.output) == 0
228         self.output += ovs.json.to_string(msg.to_json())
229         if was_empty:
230             self.run()
231         return self.status
232
233     def send_block(self, msg):
234         error = self.send(msg)
235         if error:
236             return error
237
238         while True:
239             self.run()
240             if not self.get_backlog() or self.get_status():
241                 return self.status
242
243             poller = ovs.poller.Poller()
244             self.wait(poller)
245             poller.block()
246
247     def recv(self):
248         if self.status:
249             return self.status, None
250
251         while True:
252             if not self.input:
253                 error, data = self.stream.recv(4096)
254                 if error:
255                     if error == errno.EAGAIN:
256                         return error, None
257                     else:
258                         # XXX rate-limit
259                         logging.warning("%s: receive error: %s"
260                                         % (self.name, os.strerror(error)))
261                         self.error(error)
262                         return self.status, None
263                 elif not data:
264                     self.error(EOF)
265                     return EOF, None
266                 else:
267                     self.input += data
268             else:
269                 if self.parser is None:
270                     self.parser = ovs.json.Parser()
271                 self.input = self.input[self.parser.feed(self.input):]
272                 if self.parser.is_done():
273                     msg = self.__process_msg()
274                     if msg:
275                         return 0, msg
276                     else:
277                         return self.status, None
278
279     def recv_block(self):
280         while True:
281             error, msg = self.recv()
282             if error != errno.EAGAIN:
283                 return error, msg
284
285             self.run()
286
287             poller = ovs.poller.Poller()
288             self.wait(poller)
289             self.recv_wait(poller)
290             poller.block()
291     
292     def transact_block(self, request):
293         id = request.id
294
295         error = self.send(request)
296         reply = None
297         while not error:
298             error, reply = self.recv_block()
299             if reply and reply.type == Message.T_REPLY and reply.id == id:
300                 break
301         return error, reply
302
303     def __process_msg(self):
304         json = self.parser.finish()
305         self.parser = None
306         if type(json) in [str, unicode]:
307             # XXX rate-limit
308             logging.warning("%s: error parsing stream: %s" % (self.name, json))
309             self.error(errno.EPROTO)
310             return
311
312         msg = Message.from_json(json)
313         if not isinstance(msg, Message):
314             # XXX rate-limit
315             logging.warning("%s: received bad JSON-RPC message: %s"
316                             % (self.name, msg))
317             self.error(errno.EPROTO)
318             return
319
320         self.__log_msg("received", msg)
321         return msg
322         
323     def recv_wait(self, poller):
324         if self.status or self.input:
325             poller.immediate_wake()
326         else:
327             self.stream.recv_wait(poller)
328
329     def error(self, error):
330         if self.status == 0:
331             self.status = error
332             self.stream.close()
333             self.output = ""
334             
335 class Session(object):
336     """A JSON-RPC session with reconnection."""
337
338     def __init__(self, reconnect, rpc):
339         self.reconnect = reconnect
340         self.rpc = rpc
341         self.stream = None
342         self.pstream = None
343         self.seqno = 0
344
345     @staticmethod
346     def open(name):
347         """Creates and returns a Session that maintains a JSON-RPC session to
348         'name', which should be a string acceptable to ovs.stream.Stream or
349         ovs.stream.PassiveStream's initializer.
350         
351         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
352         session connects and reconnects, with back-off, to 'name'.
353         
354         If 'name' is a passive connection method, e.g. "ptcp:", the new session
355         listens for connections to 'name'.  It maintains at most one connection
356         at any given time.  Any new connection causes the previous one (if any)
357         to be dropped."""
358         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
359         reconnect.set_name(name)
360         reconnect.enable(ovs.timeval.msec())
361
362         if ovs.stream.PassiveStream.is_valid_name(name):
363             self.reconnect.set_passive(True, ovs.timeval.msec())
364
365         return Session(reconnect, None)
366
367     @staticmethod
368     def open_unreliably(jsonrpc):
369         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
370         reconnect.set_quiet(True)
371         reconnect.set_name(jsonrpc.name)
372         reconnect.set_max_tries(0)
373         reconnect.connected(ovs.timeval.msec())
374         return Session(reconnect, jsonrpc)
375
376     def close(self):
377         if self.rpc is not None:
378             self.rpc.close()
379             self.rpc = None
380         if self.stream is not None:
381             self.stream.close()
382             self.stream = None
383         if self.pstream is not None:
384             self.pstream.close()
385             self.pstream = None
386
387     def __disconnect(self):
388         if self.rpc is not None:
389             self.rpc.error(EOF)
390             self.rpc.close()
391             self.rpc = None
392             self.seqno += 1
393         elif self.stream is not None:
394             self.stream.close()
395             self.stream = None
396             self.seqno += 1
397     
398     def __connect(self):
399         self.__disconnect()
400
401         name = self.reconnect.get_name()
402         if not self.reconnect.is_passive():
403             error, self.stream = ovs.stream.Stream.open(name)
404             if not error:
405                 self.reconnect.connecting(ovs.timeval.msec())
406             else:
407                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
408         elif self.pstream is not None:
409             error, self.pstream = ovs.stream.PassiveStream.open(name)
410             if not error:
411                 self.reconnect.listening(ovs.timeval.msec())
412             else:
413                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
414
415         self.seqno += 1
416
417     def run(self):
418         if self.pstream is not None:
419             error, stream = self.pstream.accept()
420             if error == 0:
421                 if self.rpc or self.stream:
422                     # XXX rate-limit
423                     logging.info("%s: new connection replacing active "
424                                  "connection" % self.reconnect.get_name())
425                     self.__disconnect()
426                 self.reconnect.connected(ovs.timeval.msec())
427                 self.rpc = Connection(stream)
428             elif error != errno.EAGAIN:
429                 self.reconnect.listen_error(ovs.timeval.msec(), error)
430                 self.pstream.close()
431                 self.pstream = None
432
433         if self.rpc:
434             self.rpc.run()
435             error = self.rpc.get_status()
436             if error != 0:
437                 self.reconnect.disconnected(ovs.timeval.msec(), error)
438                 self.__disconnect()
439         elif self.stream is not None:
440             self.stream.run()
441             error = self.stream.connect()
442             if error == 0:
443                 self.reconnect.connected(ovs.timeval.msec())
444                 self.rpc = Connection(self.stream)
445                 self.stream = None
446             elif error != errno.EAGAIN:
447                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
448                 self.stream.close()
449                 self.stream = None
450
451         action = self.reconnect.run(ovs.timeval.msec())
452         if action == ovs.reconnect.CONNECT:
453             self.__connect()
454         elif action == ovs.reconnect.DISCONNECT:
455             self.reconnect.disconnected(ovs.timeval.msec(), 0)
456             self.__disconnect()
457         elif action == ovs.reconnect.PROBE:
458             if self.rpc:
459                 request = Message.create_request("echo", [])
460                 request.id = "echo"
461                 self.rpc.send(request)
462         else:
463             assert action == None
464
465     def wait(self, poller):
466         if self.rpc is not None:
467             self.rpc.wait(poller)
468         elif self.stream is not None:
469             self.stream.run_wait(poller)
470             self.stream.connect_wait(poller)
471         if self.pstream is not None:
472             self.pstream.wait(poller)
473         self.reconnect.wait(poller, ovs.timeval.msec())
474
475     def get_backlog(self):
476         if self.rpc is not None:
477             return self.rpc.get_backlog()
478         else:
479             return 0
480
481     def get_name(self):
482         return self.reconnect.get_name()
483
484     def send(self, msg):
485         if self.rpc is not None:
486             return self.rpc.send(msg)
487         else:
488             return errno.ENOTCONN
489
490     def recv(self):
491         if self.rpc is not None:
492             error, msg = self.rpc.recv()
493             if not error:
494                 self.reconnect.received(ovs.timeval.msec())
495                 if msg.type == Message.T_REQUEST and msg.method == "echo":
496                     # Echo request.  Send reply.
497                     self.send(Message.create_reply(msg.params, msg.id))
498                 elif msg.type == Message.T_REPLY and msg.id == "echo":
499                     # It's a reply to our echo request.  Suppress it.
500                     pass
501                 else:
502                     return msg
503         return None
504
505     def recv_wait(self, poller):
506         if self.rpc is not None:
507             self.rpc.recv_wait(poller)
508
509     def is_alive(self):
510         if self.rpc is not None or self.stream is not None:
511             return True
512         else:
513             max_tries = self.reconnect.get_max_tries()
514             return max_tries is None or max_tries > 0
515     
516     def is_connected(self):
517         return self.rpc is not None
518
519     def get_seqno(self):
520         return self.seqno
521
522     def force_reconnect(self):
523         self.reconnect.force_reconnect(ovs.timeval.msec())