X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Fns3%2Fns3server.py;h=8b3cd79a6a28ee7b0a8be68a9c66283958c83700;hb=6285ca51026efb69642eea9dfc7c480e722d84a9;hp=f538259af89eab701da612c95d5686b84f6277cf;hpb=fdbe8428159db491e9498449443be4d093cd9b8a;p=nepi.git diff --git a/src/nepi/resources/ns3/ns3server.py b/src/nepi/resources/ns3/ns3server.py index f538259a..8b3cd79a 100644 --- a/src/nepi/resources/ns3/ns3server.py +++ b/src/nepi/resources/ns3/ns3server.py @@ -3,9 +3,8 @@ # Copyright (C) 2014 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -20,74 +19,95 @@ import base64 import cPickle import errno +import logging +import os import socket +import sys + from optparse import OptionParser, SUPPRESS_HELP from ns3wrapper import NS3Wrapper class NS3WrapperMessage: CREATE = "CREATE" + FACTORY = "FACTORY" INVOKE = "INVOKE" SET = "SET" GET = "GET" - TRACE = "TRACE" + FLUSH = "FLUSH" START = "START" STOP = "STOP" SHUTDOWN = "SHUTDOWN" -def handle_message(ns3_wrapper, msg, args): - if msg == NS3WrapperMessage.SHUTDOWN: +def handle_message(ns3_wrapper, msg_type, args, kwargs): + if msg_type == NS3WrapperMessage.SHUTDOWN: ns3_wrapper.shutdown() + return "BYEBYE" - if msg == NS3WrapperMessage.STOP: - time = None - if args: - time = args[0] + if msg_type == NS3WrapperMessage.STOP: + time = kwargs.get("time") ns3_wrapper.stop(time=time) + return "STOPPED" - if msg == NS3WrapperMessage.START: + if msg_type == NS3WrapperMessage.START: ns3_wrapper.start() + return "STARTED" - if msg == NS3WrapperMessage.CREATE: + if msg_type == NS3WrapperMessage.CREATE: clazzname = args.pop(0) - uuid = ns3_wrapper.create(clazzname, *args) - return uuid + return ns3_wrapper.create(clazzname, *args) + + if msg_type == NS3WrapperMessage.FACTORY: + type_name = args.pop(0) - if msg == NS3WrapperMessage.INVOKE: + return ns3_wrapper.factory(type_name, **kwargs) + + if msg_type == NS3WrapperMessage.INVOKE: uuid = args.pop(0) operation = args.pop(0) - - uuid = ns3_wrapper.invoke(uuid, operation, *args) - return uuid + + return ns3_wrapper.invoke(uuid, operation, *args, **kwargs) - if msg == NS3WrapperMessage.GET: + if msg_type == NS3WrapperMessage.GET: uuid = args.pop(0) name = args.pop(0) - value = ns3_wrapper.get(uuid, name) - return value - - if msg == NS3WrapperMessage.SET: + return ns3_wrapper.get(uuid, name) + + if msg_type == NS3WrapperMessage.SET: uuid = args.pop(0) name = args.pop(0) value = args.pop(0) - value = ns3_wrapper.set(uuid, name, value) - return value - - if msg == NS3WrapperMessage.TRACE: - return "NOT IMPLEMENTED" + return ns3_wrapper.set(uuid, name, value) + + if msg_type == NS3WrapperMessage.FLUSH: + # Forces flushing output and error streams. + # NS-3 output will stay unflushed until the program exits or + # explicit invocation flush is done + sys.stdout.flush() + sys.stderr.flush() + + ns3_wrapper.logger.debug("FLUSHED") + + return "FLUSHED" -def create_socket(socket_name): +def open_socket(socket_name): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(socket_name) return sock +def close_socket(sock): + try: + sock.close() + except: + pass + def recv_msg(conn): msg = [] chunk = '' @@ -95,7 +115,7 @@ def recv_msg(conn): while '\n' not in chunk: try: chunk = conn.recv(1024) - except (OSError, socket.error), e: + except (OSError, socket.error) as e: if e[0] != errno.EINTR: raise # Ignore eintr errors @@ -107,27 +127,32 @@ def recv_msg(conn): # empty chunk = EOF break - msg = ''.join(msg).split('\n')[0] + msg = ''.join(msg).strip() + + # The message is formatted as follows: + # MESSAGE_TYPE|args|kwargs + # + # where MESSAGE_TYPE, args and kwargs are pickld and enoded in base64 - # The message might have arguments that will be appended - # as a '|' separated list after the message identifier - def decode(arg): - arg = base64.b64decode(arg).rstrip() - return cPickle.loads(arg) + def decode(item): + item = base64.b64decode(item).rstrip() + return cPickle.loads(item) - dargs = map(decode, msg.split("|")) + decoded = map(decode, msg.split("|")) # decoded message - dmsg = dargs.pop(0) + dmsg_type = decoded.pop(0) + dargs = list(decoded.pop(0)) # transforming touple into list + dkwargs = decoded.pop(0) - return (dmsg, dargs) + return (dmsg_type, dargs, dkwargs) def send_reply(conn, reply): encoded = base64.b64encode(cPickle.dumps(reply)) conn.send("%s\n" % encoded) def get_options(): - usage = ("usage: %prog -S ") + usage = ("usage: %prog -S -L -D -v ") parser = OptionParser(usage = usage) @@ -135,15 +160,41 @@ def get_options(): help = "Name for the unix socket used to interact with this process", default = "tap.sock", type="str") + parser.add_option("-L", "--ns-log", dest="ns_log", + help = "NS_LOG environmental variable to be set", + default = "", type="str") + + parser.add_option("-D", "--enable-dump", dest="enable_dump", + help = "Enable dumping the remote executed ns-3 commands to a script " + "in order to later reproduce and debug the experiment", + action = "store_true", + default = False) + + parser.add_option("-v", "--verbose", + help="Print debug output", + action="store_true", + dest="verbose", default=False) + (options, args) = parser.parse_args() - return options.socket_name + return (options.socket_name, options.verbose, options.ns_log, + options.enable_dump) + +def run_server(socket_name, level = logging.INFO, ns_log = None, + enable_dump = False): + + # Sets NS_LOG environmental variable for NS debugging + if ns_log: + os.environ["NS_LOG"] = ns_log + + ###### ns-3 wrapper instantiation -def run_server(socket_name): - ns3_wrapper = NS3Wrapper() + ns3_wrapper = NS3Wrapper(loglevel=level, enable_dump = enable_dump) + + ns3_wrapper.logger.info("STARTING...") # create unix socket to receive instructions - sock = create_socket(socket_name) + sock = open_socket(socket_name) sock.listen(0) # wait for messages to arrive and process them @@ -151,33 +202,57 @@ def run_server(socket_name): while not stop: conn, addr = sock.accept() - conn.settimeout(5) + conn.settimeout(30) try: - (msg, args) = recv_msg(conn) - except socket.timeout, e: + (msg_type, args, kwargs) = recv_msg(conn) + except socket.timeout as e: # Ingore time-out + close_socket(conn) continue - if not msg: + if not msg_type: # Ignore - connection lost - break - - ns3_wrapper.logger.debug("Message received %s args %s" % ( msg, str(args))) + close_socket(conn) + continue - if msg == NS3WrapperMessage.SHUTDOWN: + if msg_type == NS3WrapperMessage.SHUTDOWN: stop = True - - reply = handle_message(ns3_wrapper, msg, args) + + try: + reply = handle_message(ns3_wrapper, msg_type, args, kwargs) + except: + import traceback + err = traceback.format_exc() + ns3_wrapper.logger.error(err) + close_socket(conn) + raise try: send_reply(conn, reply) except socket.error: - break + import traceback + err = traceback.format_exc() + ns3_wrapper.logger.error(err) + close_socket(conn) + raise + + close_socket(conn) + + close_socket(sock) + + ns3_wrapper.logger.info("EXITING...") if __name__ == '__main__': - socket_name = get_options() + (socket_name, verbose, ns_log, enable_dump) = get_options() + + ## configure logging + FORMAT = "%(asctime)s %(name)s %(levelname)-4s %(message)s" + level = logging.DEBUG if verbose else logging.INFO + + logging.basicConfig(format = FORMAT, level = level) - run_server(socket_name) + ## Run the server + run_server(socket_name, level, ns_log, enable_dump)