X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Fns3%2Fns3server.py;h=8b3cd79a6a28ee7b0a8be68a9c66283958c83700;hb=6285ca51026efb69642eea9dfc7c480e722d84a9;hp=becf60570f41ec6f87821331c39fce4db9dab5bd;hpb=68adac66099b08e3daae7a84b29af0f7c69ee955;p=nepi.git diff --git a/src/nepi/resources/ns3/ns3server.py b/src/nepi/resources/ns3/ns3server.py index becf6057..8b3cd79a 100644 --- a/src/nepi/resources/ns3/ns3server.py +++ b/src/nepi/resources/ns3/ns3server.py @@ -3,9 +3,8 @@ # Copyright (C) 2014 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -35,89 +34,80 @@ class NS3WrapperMessage: INVOKE = "INVOKE" SET = "SET" GET = "GET" - TRACE = "TRACE" + FLUSH = "FLUSH" START = "START" STOP = "STOP" SHUTDOWN = "SHUTDOWN" -def handle_message(ns3_wrapper, msg, args): - if msg == NS3WrapperMessage.SHUTDOWN: +def handle_message(ns3_wrapper, msg_type, args, kwargs): + if msg_type == NS3WrapperMessage.SHUTDOWN: ns3_wrapper.shutdown() - - ns3_wrapper.logger.debug("SHUTDOWN") - + return "BYEBYE" - if msg == NS3WrapperMessage.STOP: - time = None - if args: - time = args[0] - - ns3_wrapper.logger.debug("STOP time=%s" % str(time)) + if msg_type == NS3WrapperMessage.STOP: + time = kwargs.get("time") ns3_wrapper.stop(time=time) - return "STOPPED" - if msg == NS3WrapperMessage.START: - ns3_wrapper.logger.debug("START") + return "STOPPED" + if msg_type == NS3WrapperMessage.START: ns3_wrapper.start() + return "STARTED" - if msg == NS3WrapperMessage.CREATE: + if msg_type == NS3WrapperMessage.CREATE: clazzname = args.pop(0) - ns3_wrapper.logger.debug("CREATE %s %s" % (clazzname, str(args))) - - uuid = ns3_wrapper.create(clazzname, *args) - return uuid - - if msg == NS3WrapperMessage.FACTORY: + return ns3_wrapper.create(clazzname, *args) + + if msg_type == NS3WrapperMessage.FACTORY: type_name = args.pop(0) - kwargs = args.pop(0) - - ns3_wrapper.logger.debug("FACTORY %s %s" % (type_name, str(kwargs))) - uuid = ns3_wrapper.factory(type_name, **kwargs) - return uuid - - if msg == NS3WrapperMessage.INVOKE: + return ns3_wrapper.factory(type_name, **kwargs) + + if msg_type == NS3WrapperMessage.INVOKE: uuid = args.pop(0) operation = args.pop(0) - - ns3_wrapper.logger.debug("INVOKE %s %s %s" % (uuid, operation, str(args))) - - uuid = ns3_wrapper.invoke(uuid, operation, *args) - return uuid + + return ns3_wrapper.invoke(uuid, operation, *args, **kwargs) - if msg == NS3WrapperMessage.GET: + if msg_type == NS3WrapperMessage.GET: uuid = args.pop(0) name = args.pop(0) - ns3_wrapper.logger.debug("GET %s %s" % (uuid, name)) - - value = ns3_wrapper.get(uuid, name) - return value - - if msg == NS3WrapperMessage.SET: + return ns3_wrapper.get(uuid, name) + + if msg_type == NS3WrapperMessage.SET: uuid = args.pop(0) name = args.pop(0) value = args.pop(0) - ns3_wrapper.logger.debug("SET %s %s" % (uuid, name, str(value))) + return ns3_wrapper.set(uuid, name, value) - value = ns3_wrapper.set(uuid, name, value) - return value - - if msg == NS3WrapperMessage.TRACE: - ns3_wrapper.logger.debug("TRACE") - return "NOT IMPLEMENTED" + if msg_type == NS3WrapperMessage.FLUSH: + # Forces flushing output and error streams. + # NS-3 output will stay unflushed until the program exits or + # explicit invocation flush is done + sys.stdout.flush() + sys.stderr.flush() + + ns3_wrapper.logger.debug("FLUSHED") + + return "FLUSHED" -def create_socket(socket_name): +def open_socket(socket_name): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(socket_name) return sock +def close_socket(sock): + try: + sock.close() + except: + pass + def recv_msg(conn): msg = [] chunk = '' @@ -125,7 +115,7 @@ def recv_msg(conn): while '\n' not in chunk: try: chunk = conn.recv(1024) - except (OSError, socket.error), e: + except (OSError, socket.error) as e: if e[0] != errno.EINTR: raise # Ignore eintr errors @@ -137,27 +127,32 @@ def recv_msg(conn): # empty chunk = EOF break - msg = ''.join(msg).split('\n')[0] + msg = ''.join(msg).strip() - # The message might have arguments that will be appended - # as a '|' separated list after the message identifier - def decode(arg): - arg = base64.b64decode(arg).rstrip() - return cPickle.loads(arg) + # The message is formatted as follows: + # MESSAGE_TYPE|args|kwargs + # + # where MESSAGE_TYPE, args and kwargs are pickld and enoded in base64 - dargs = map(decode, msg.split("|")) + def decode(item): + item = base64.b64decode(item).rstrip() + return cPickle.loads(item) + + decoded = map(decode, msg.split("|")) # decoded message - dmsg = dargs.pop(0) + dmsg_type = decoded.pop(0) + dargs = list(decoded.pop(0)) # transforming touple into list + dkwargs = decoded.pop(0) - return (dmsg, dargs) + return (dmsg_type, dargs, dkwargs) def send_reply(conn, reply): encoded = base64.b64encode(cPickle.dumps(reply)) conn.send("%s\n" % encoded) def get_options(): - usage = ("usage: %prog -S -L -H -v ") + usage = ("usage: %prog -S -L -D -v ") parser = OptionParser(usage = usage) @@ -169,9 +164,11 @@ def get_options(): help = "NS_LOG environmental variable to be set", default = "", type="str") - parser.add_option("-H", "--homedir", dest="homedir", - help = "Home directory where to store results", - default = "", type="str") + parser.add_option("-D", "--enable-dump", dest="enable_dump", + help = "Enable dumping the remote executed ns-3 commands to a script " + "in order to later reproduce and debug the experiment", + action = "store_true", + default = False) parser.add_option("-v", "--verbose", help="Print debug output", @@ -180,11 +177,11 @@ def get_options(): (options, args) = parser.parse_args() - return (options.socket_name, options.homedir, options.verbose, - options.ns_log) + return (options.socket_name, options.verbose, options.ns_log, + options.enable_dump) -def run_server(socket_name, homedir = None, level = logging.INFO, - ns_log = None): +def run_server(socket_name, level = logging.INFO, ns_log = None, + enable_dump = False): # Sets NS_LOG environmental variable for NS debugging if ns_log: @@ -192,12 +189,12 @@ def run_server(socket_name, homedir = None, level = logging.INFO, ###### ns-3 wrapper instantiation - ns3_wrapper = NS3Wrapper(homedir = homedir, loglevel=level) + ns3_wrapper = NS3Wrapper(loglevel=level, enable_dump = enable_dump) ns3_wrapper.logger.info("STARTING...") # create unix socket to receive instructions - sock = create_socket(socket_name) + sock = open_socket(socket_name) sock.listen(0) # wait for messages to arrive and process them @@ -205,39 +202,50 @@ def run_server(socket_name, homedir = None, level = logging.INFO, while not stop: conn, addr = sock.accept() - conn.settimeout(5) + conn.settimeout(30) try: - (msg, args) = recv_msg(conn) - except socket.timeout, e: + (msg_type, args, kwargs) = recv_msg(conn) + except socket.timeout as e: # Ingore time-out + close_socket(conn) continue - if not msg: + if not msg_type: # Ignore - connection lost - break + close_socket(conn) + continue - if msg == NS3WrapperMessage.SHUTDOWN: + if msg_type == NS3WrapperMessage.SHUTDOWN: stop = True try: - reply = handle_message(ns3_wrapper, msg, args) + reply = handle_message(ns3_wrapper, msg_type, args, kwargs) except: import traceback err = traceback.format_exc() ns3_wrapper.logger.error(err) + close_socket(conn) raise try: send_reply(conn, reply) except socket.error: - break + import traceback + err = traceback.format_exc() + ns3_wrapper.logger.error(err) + close_socket(conn) + raise + close_socket(conn) + + close_socket(sock) + ns3_wrapper.logger.info("EXITING...") if __name__ == '__main__': - (socket_name, homedir, verbose, ns_log) = get_options() + (socket_name, verbose, ns_log, enable_dump) = get_options() ## configure logging FORMAT = "%(asctime)s %(name)s %(levelname)-4s %(message)s" @@ -245,12 +253,6 @@ if __name__ == '__main__': logging.basicConfig(format = FORMAT, level = level) - # Make sure to send DEBUG messages to stdout instead of stderr - root = logging.getLogger() - handler = logging.StreamHandler(sys.stdout) - handler.setLevel(logging.DEBUG) - root.addHandler(handler) - ## Run the server - run_server(socket_name, homedir, level, ns_log) + run_server(socket_name, level, ns_log, enable_dump)