X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fns3%2Fns3client.py;h=1dd0c5a73ba18ef7db0e6065bb3cb4182d77d91e;hb=258959a9047e0a84d2b1889863d65808413162ca;hp=972e8005906be63c77c2c200e8b4d166cddbeefc;hpb=e0dd7383a1241e77ba32efa6e6a3cec573b3757d;p=nepi.git diff --git a/src/nepi/resources/linux/ns3/ns3client.py b/src/nepi/resources/linux/ns3/ns3client.py index 972e8005..1dd0c5a7 100644 --- a/src/nepi/resources/linux/ns3/ns3client.py +++ b/src/nepi/resources/linux/ns3/ns3client.py @@ -20,74 +20,99 @@ import base64 import cPickle import errno +import os import socket +import time +import weakref +import threading + from optparse import OptionParser, SUPPRESS_HELP from nepi.resources.ns3.ns3client import NS3Client from nepi.resources.ns3.ns3server import NS3WrapperMessage class LinuxNS3Client(NS3Client): - def __init__(self, socket_name): + def __init__(self, simulation): super(LinuxNS3Client, self).__init__() - self._socket_name = socket_name + self._simulation = weakref.ref(simulation) + self._socket_lock = threading.Lock() @property - def socket_name(self): - return self._socket_name - - def send_msg(self, msg, *args): - args = list(args) - - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.connect(self.socket_name) - - args.insert(0, msg) - def encode(arg): - arg = cPickle.dumps(arg) - return base64.b64encode(arg) - - encoded = "|".join(map(encode, args)) - sock.send("%s\n" % encoded) - - reply = sock.recv(1024) - return cPickle.loads(base64.b64decode(reply)) - - def create(self, clazzname, *args): - args = list(args) - args.insert(0, clazzname) - - return self.send_msg(NS3WrapperMessage.CREATE, *args) - - def invoke(self, uuid, operation, *args): - args = list(args) - args.insert(0, operation) - args.insert(0, uuid) - - return self.send_msg(NS3WrapperMessage.INVOKE, *args) - - def set(self, uuid, name, value): - args = [uuid, name, value] - - return self.send_msg(NS3WrapperMessage.SET, *args) - - def get(self, uuid, name): - args = [uuid, name] - - return self.send_msg(NS3WrapperMessage.GET, *args) - - def trace(self, *args): - return self.send_msg(NS3WrapperMessage.TRACE, *args) - - def start(self): - return self.send_msg(NS3WrapperMessage.START, []) - - def stop(self, time = None): - args = None - if time: - args = [time] - - return self.send_msg(NS3WrapperMessage.STOP, *args) - - def shutdown(self): - return self.send_msg(NS3WrapperMessage.SHUTDOWN, []) + def simulation(self): + return self._simulation() + + def send_msg(self, msg_type, *args, **kwargs): + msg = [msg_type, args, kwargs] + + def encode(item): + item = cPickle.dumps(item) + return base64.b64encode(item) + + encoded = "|".join(map(encode, msg)) + + with self._socket_lock: + if self.simulation.node.get("hostname") in ['localhost', '127.0.0.1']: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(self.simulation.remote_socket) + sock.send("%s\n" % encoded) + reply = sock.recv(1024) + sock.close() + else: + command = ( "python -c 'import socket;" + "sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM);" + "sock.connect(\"%(socket_addr)s\");" + "msg = \"%(encoded_message)s\\n\";" + "sock.send(msg);" + "reply = sock.recv(1024);" + "sock.close();" + "print reply'") % { + "encoded_message": encoded, + "socket_addr": self.simulation.remote_socket, + } + + (reply, err), proc = self.simulation.node.execute(command, + with_lock = True) + + if (err and proc.poll()) or reply.strip() == "": + msg = (" Couldn't connect to remote socket %s - REPLY: %s " + "- ERROR: %s ") % ( + self.simulation.remote_socket, reply, err) + self.simulation.error(msg, reply, err) + raise RuntimeError(msg) + + reply = cPickle.loads(base64.b64decode(reply)) + + return reply + + def create(self, *args, **kwargs): + return self.send_msg(NS3WrapperMessage.CREATE, *args, **kwargs) + + def factory(self, *args, **kwargs): + return self.send_msg(NS3WrapperMessage.FACTORY, *args, **kwargs) + + def invoke(self, *args, **kwargs): + return self.send_msg(NS3WrapperMessage.INVOKE, *args, **kwargs) + + def set(self, *args, **kwargs): + return self.send_msg(NS3WrapperMessage.SET, *args, **kwargs) + + def get(self, *args, **kwargs): + return self.send_msg(NS3WrapperMessage.GET, *args, **kwargs) + + def flush(self, *args, **kwargs): + return self.send_msg(NS3WrapperMessage.FLUSH, *args, **kwargs) + + def start(self, *args, **kwargs): + return self.send_msg(NS3WrapperMessage.START, *args, **kwargs) + + def stop(self, *args, **kwargs): + return self.send_msg(NS3WrapperMessage.STOP, *args, **kwargs) + + def shutdown(self, *args, **kwargs): + try: + return self.send_msg(NS3WrapperMessage.SHUTDOWN, *args, **kwargs) + except: + pass + + return None