X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=python%2Fovs%2Fjsonrpc.py;h=cf08131327b77bd316e6cb462f90f8beb6581ce7;hb=f26ddb5b5fc9f04cf91823e045b41a3ab421330d;hp=906e93c6823680f9b078d2d7594cfd3fd1afbc0f;hpb=8758e8a373338e409d7f2863ee91e01060f35628;p=sliver-openvswitch.git diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index 906e93c68..cf0813132 100644 --- a/python/ovs/jsonrpc.py +++ b/python/ovs/jsonrpc.py @@ -1,4 +1,4 @@ -# Copyright (c) 2010, 2011 Nicira Networks +# Copyright (c) 2010, 2011, 2012 Nicira, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,6 @@ # limitations under the License. import errno -import logging import os import ovs.json @@ -21,8 +20,12 @@ import ovs.poller import ovs.reconnect import ovs.stream import ovs.timeval +import ovs.util +import ovs.vlog + +EOF = ovs.util.EOF +vlog = ovs.vlog.Vlog("jsonrpc") -EOF = -1 class Message(object): T_REQUEST = 0 # Request. @@ -34,7 +37,6 @@ class Message(object): T_NOTIFY: "notification", T_REPLY: "reply", T_ERROR: "error"} - __next_id = 0 def __init__(self, type_, method, params, result, error, id): self.type = type_ @@ -45,6 +47,7 @@ class Message(object): self.id = id _next_id = 0 + @staticmethod def _create_id(): this_id = Message._next_id @@ -73,8 +76,7 @@ class Message(object): def type_to_string(type_): return Message.__types[type_] - @staticmethod - def __validate_arg(value, name, must_have): + def __validate_arg(self, value, name, must_have): if (value is not None) == (must_have != 0): return None else: @@ -97,11 +99,11 @@ class Message(object): return "invalid JSON-RPC message type %s" % self.type return ( - Message.__validate_arg(self.method, "method", pattern & 0x10000) or - Message.__validate_arg(self.params, "params", pattern & 0x1000) or - Message.__validate_arg(self.result, "result", pattern & 0x100) or - Message.__validate_arg(self.error, "error", pattern & 0x10) or - Message.__validate_arg(self.id, "id", pattern & 0x1)) + self.__validate_arg(self.method, "method", pattern & 0x10000) or + self.__validate_arg(self.params, "params", pattern & 0x1000) or + self.__validate_arg(self.result, "result", pattern & 0x100) or + self.__validate_arg(self.error, "error", pattern & 0x10) or + self.__validate_arg(self.id, "id", pattern & 0x1)) @staticmethod def from_json(json): @@ -121,7 +123,7 @@ class Message(object): params = json.pop("params", None) result = json.pop("result", None) error = json.pop("error", None) - id = json.pop("id", None) + id_ = json.pop("id", None) if len(json): return "message has unexpected member \"%s\"" % json.popitem()[0] @@ -129,12 +131,12 @@ class Message(object): msg_type = Message.T_REPLY elif error is not None: msg_type = Message.T_ERROR - elif id is not None: + elif id_ is not None: msg_type = Message.T_REQUEST else: msg_type = Message.T_NOTIFY - - msg = Message(msg_type, method, params, result, error, id) + + msg = Message(msg_type, method, params, result, error, id_) validation_error = msg.is_valid() if validation_error is not None: return validation_error @@ -167,15 +169,18 @@ class Message(object): s.append("method=\"%s\"" % self.method) if self.params is not None: s.append("params=" + ovs.json.to_string(self.params)) + if self.result is not None: + s.append("result=" + ovs.json.to_string(self.result)) if self.error is not None: s.append("error=" + ovs.json.to_string(self.error)) if self.id is not None: s.append("id=" + ovs.json.to_string(self.id)) return ", ".join(s) + class Connection(object): def __init__(self, stream): - self.name = stream.get_name() + self.name = stream.name self.stream = stream self.status = 0 self.input = "" @@ -196,8 +201,8 @@ class Connection(object): self.output = self.output[retval:] else: if retval != -errno.EAGAIN: - logging.warn("%s: send error: %s" % (self.name, - os.strerror(-retval))) + vlog.warn("%s: send error: %s" % + (self.name, os.strerror(-retval))) self.error(-retval) break @@ -216,11 +221,8 @@ class Connection(object): else: return len(self.output) - def get_name(self): - return self.name - def __log_msg(self, title, msg): - logging.debug("%s: %s %s" % (self.name, title, msg)) + vlog.dbg("%s: %s %s" % (self.name, title, msg)) def send(self, msg): if self.status: @@ -253,18 +255,18 @@ class Connection(object): return self.status, None while True: - if len(self.input) == 0: + if not self.input: error, data = self.stream.recv(4096) if error: if error == errno.EAGAIN: return error, None else: # XXX rate-limit - logging.warning("%s: receive error: %s" - % (self.name, os.strerror(error))) + vlog.warn("%s: receive error: %s" + % (self.name, os.strerror(error))) self.error(error) return self.status, None - elif len(data) == 0: + elif not data: self.error(EOF) return EOF, None else: @@ -292,15 +294,18 @@ class Connection(object): self.wait(poller) self.recv_wait(poller) poller.block() - + def transact_block(self, request): - id = request.id + id_ = request.id error = self.send(request) reply = None while not error: error, reply = self.recv_block() - if reply and reply.type == Message.T_REPLY and reply.id == id: + if (reply + and (reply.type == Message.T_REPLY + or reply.type == Message.T_ERROR) + and reply.id == id_): break return error, reply @@ -309,23 +314,23 @@ class Connection(object): self.parser = None if type(json) in [str, unicode]: # XXX rate-limit - logging.warning("%s: error parsing stream: %s" % (self.name, json)) + vlog.warn("%s: error parsing stream: %s" % (self.name, json)) self.error(errno.EPROTO) return msg = Message.from_json(json) if not isinstance(msg, Message): # XXX rate-limit - logging.warning("%s: received bad JSON-RPC message: %s" - % (self.name, msg)) + vlog.warn("%s: received bad JSON-RPC message: %s" + % (self.name, msg)) self.error(errno.EPROTO) return self.__log_msg("received", msg) return msg - + def recv_wait(self, poller): - if self.status or len(self.input) > 0: + if self.status or self.input: poller.immediate_wake() else: self.stream.recv_wait(poller) @@ -335,7 +340,8 @@ class Connection(object): self.status = error self.stream.close() self.output = "" - + + class Session(object): """A JSON-RPC session with reconnection.""" @@ -351,10 +357,10 @@ class Session(object): """Creates and returns a Session that maintains a JSON-RPC session to 'name', which should be a string acceptable to ovs.stream.Stream or ovs.stream.PassiveStream's initializer. - + If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new session connects and reconnects, with back-off, to 'name'. - + If 'name' is a passive connection method, e.g. "ptcp:", the new session listens for connections to 'name'. It maintains at most one connection at any given time. Any new connection causes the previous one (if any) @@ -364,7 +370,10 @@ class Session(object): reconnect.enable(ovs.timeval.msec()) if ovs.stream.PassiveStream.is_valid_name(name): - self.reconnect.set_passive(True, ovs.timeval.msec()) + reconnect.set_passive(True, ovs.timeval.msec()) + + if ovs.stream.stream_or_pstream_needs_probes(name): + reconnect.set_probe_interval(0) return Session(reconnect, None) @@ -372,7 +381,7 @@ class Session(object): def open_unreliably(jsonrpc): reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec()) reconnect.set_quiet(True) - reconnect.set_name(jsonrpc.get_name()) + reconnect.set_name(jsonrpc.name) reconnect.set_max_tries(0) reconnect.connected(ovs.timeval.msec()) return Session(reconnect, jsonrpc) @@ -398,7 +407,7 @@ class Session(object): self.stream.close() self.stream = None self.seqno += 1 - + def __connect(self): self.__disconnect() @@ -424,8 +433,8 @@ class Session(object): if error == 0: if self.rpc or self.stream: # XXX rate-limit - logging.info("%s: new connection replacing active " - "connection" % self.reconnect.get_name()) + vlog.info("%s: new connection replacing active " + "connection" % self.reconnect.get_name()) self.__disconnect() self.reconnect.connected(ovs.timeval.msec()) self.rpc = Connection(stream) @@ -516,7 +525,7 @@ class Session(object): else: max_tries = self.reconnect.get_max_tries() return max_tries is None or max_tries > 0 - + def is_connected(self): return self.rpc is not None