From 9f64fca59a82e96c4ce4b40684b76d2f97aa12e2 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Wed, 20 Feb 2013 14:47:46 +0100 Subject: [PATCH] adding first version of ns3wrapper --- src/neco/execution/resource.py | 71 +----- src/neco/resources/linux/node.py | 6 +- src/neco/resources/ns3/ns3wrapper.py | 361 +++++++++++++++++++++++++++ src/neco/util/rmatcher.py | 46 ++++ test/resources/linux/node.py | 6 +- test/resources/ns3/ns3wrapper.py | 204 +++++++++++++++ 6 files changed, 621 insertions(+), 73 deletions(-) create mode 100644 src/neco/resources/ns3/ns3wrapper.py create mode 100644 src/neco/util/rmatcher.py create mode 100644 test/resources/ns3/ns3wrapper.py diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index 30294d39..bbba113e 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -1,84 +1,23 @@ import logging import weakref -def match_tags(box, all_tags, exact_tags): - """ returns True if box has required tags """ - tall = set(all_tags) - texact = set(exact_tags) - - if texact and box.connections == texact: - return True - - if tall and tall.issubset(box.connections): - return True - - return False - -def find_boxes(box, all_tags = None, exact_tags = None, max_depth = 1): - """ Look for the connected boxes with the required tags, doing breath-first - search, until max_depth ( max_depth = None will traverse the entire graph ). - """ - if not all_tags and not exact_tags: - msg = "No matching criteria for resources." - raise RuntimeError(msg) - - queue = set() - # enqueue (depth, box) - queue.add((0, box)) - - traversed = set() - traversed.add(box) - - depth = 0 - - result = set() - - while len(q) > 0: - (depth, a) = queue.pop() - if match_tags(a, all_tags, exact_tags): - result.add(a) - - if not max_depth or depth <= max_depth: - depth += 1 - for b in sorted(a.connections): - if b not in traversed: - traversed.add(b) - queue.add((depth, b)) - - return result - class Resource(object): - def __init__(self, box, ec): - self._box = weakref.ref(box) + def __init__(self, ec, guid): + self._guid = guid self._ec = weakref.ref(ec) # Logging loglevel = "debug" self._logger = logging.getLogger("neco.execution.Resource.%s" % - self.box.guid) + self.guid) self._logger.setLevel(getattr(logging, loglevel.upper())) @property - def box(self): - return self._box() + def guid(self): + return self._guid() @property def ec(self): return self._ec() - def find_resources(self, all_tags = None, exact_tags = None, - max_depth = 1): - resources = set() - - boxes = find_boxes(self.box, all_tags, exact_tags, max_depth) - for b in boxes: - r = self.ec.resource(b.guid) - resources.add(r) - - return resources - -class ResourceResolver(object): - def __init__(self): - pass - diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index 0ec3c16b..e00562f0 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -8,8 +8,8 @@ import os.path import subprocess class LinuxNode(Resource): - def __init__(self, box, ec): - super(LinuxNode, self).__init__(box, ec) + def __init__(self, ec, guif): + super(LinuxNode, self).__init__(ec, guid) self.ip = None self.host = None self.user = None @@ -24,7 +24,7 @@ class LinuxNode(Resource): # Logging loglevel = "debug" self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\ - self.box.guid) + self.guid) self._logger.setLevel(getattr(logging, loglevel.upper())) # For ssh connections we use the ControlMaster option which diff --git a/src/neco/resources/ns3/ns3wrapper.py b/src/neco/resources/ns3/ns3wrapper.py new file mode 100644 index 00000000..a908c1bb --- /dev/null +++ b/src/neco/resources/ns3/ns3wrapper.py @@ -0,0 +1,361 @@ +import logging +import os +import sys +import threading +import uuid + +class NS3Wrapper(object): + def __init__(self, homedir = None): + super(NS3Wrapper, self).__init__() + self._ns3 = None + self._uuid = self.make_uuid() + self._homedir = homedir or os.path.join("/tmp", self._uuid) + self._simulation_thread = None + self._condition = None + + self._started = False + self._stopped = False + + # holds reference to all ns-3 objects in the simulation + self._resources = dict() + + # create home dir (where all simulation related files will end up) + home = os.path.normpath(self.homedir) + if not os.path.exists(home): + os.makedirs(home, 0755) + + # Logging + loglevel = os.environ.get("NS3LOGLEVEL", "debug") + self._logger = logging.getLogger("ns3wrapper.%s" % self.uuid) + self._logger.setLevel(getattr(logging, loglevel.upper())) + hdlr = logging.FileHandler(os.path.join(self.homedir, "ns3wrapper.log")) + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + hdlr.setFormatter(formatter) + self._logger.addHandler(hdlr) + + # Load ns-3 shared libraries and import modules + self._load_ns3_module() + + @property + def ns3(self): + return self._ns3 + + @property + def homedir(self): + return self._homedir + + @property + def uuid(self): + return self._uuid + + @property + def logger(self): + return self._logger + + def make_uuid(self): + return "uuid%s" % uuid.uuid4() + + def singleton(self, clazzname): + uuid = "uuid%s"%clazzname + + if not uuid in self._resources: + if not hasattr(self.ns3, clazzname): + msg = "Type %s not supported" % (typeid) + self.logger.error(msg) + + clazz = getattr(self.ns3, clazzname) + typeid = "ns3::%s" % clazzname + self._resources[uuid] = (clazz, typeid) + + return uuid + + def get_trace(self, trace, offset = None, nbytes = None ): + pass + + def is_running(self): + return self._started and not self._stopped + + def get_resource(self, uuid): + (resource, typeid) = self._resources.get(uuid) + return resource + + def get_typeid(self, uuid): + (resource, typeid) = self._resources.get(uuid) + return typeid + + def create(self, clazzname, *args): + if not hasattr(self.ns3, clazzname): + msg = "Type %s not supported" % (clazzname) + self.logger.error(msg) + + clazz = getattr(self.ns3, clazzname) + #typeid = clazz.GetInstanceTypeId().GetName() + typeid = "ns3::%s" % clazzname + + realargs = [self.get_resource(arg) if \ + str(arg).startswith("uuid") else arg for arg in args] + + resource = clazz(*realargs) + + uuid = self.make_uuid() + self._resources[uuid] = (resource, typeid) + return uuid + + def set(self, uuid, name, value): + resource = self.get_resource(uuid) + + if hasattr(resource, name): + setattr(resource, name, value) + else: + self._set_ns3_attr(uuid, name, value) + + def get(self, name, uuid = None): + resource = self.get_resource(uuid) + + value = None + if hasattr(resource, name): + value = getattr(resource, name) + else: + value = self._get_ns3_attr(uuid, name) + + return value + + def invoke(self, uuid, operation, *args): + resource = self.get_resource(uuid) + typeid = self.get_typeid(uuid) + method = getattr(resource, operation) + + realargs = [self.get_resource(arg) if \ + str(arg).startswith("uuid") else arg for arg in args] + + result = method(*realargs) + + if not result: + return None + + uuid = self.make_uuid() + self._resources[uuid] = (result, typeid) + + return uuid + + def start(self): + self._condition = threading.Condition() + self._simulator_thread = threading.Thread( + target = self._simulator_run, + args = [self._condition]) + self._simulator_thread.setDaemon(True) + self._simulator_thread.start() + self._started = True + + def stop(self, time = None): + if not self.ns3: + return + + if time is None: + self.ns3.Simulator.Stop() + else: + self.ns3.Simulator.Stop(self.ns3.Time(time)) + self._stopped = True + + def shutdown(self): + if self.ns3: + if not self.ns3.Simulator.IsFinished(): + self.stop() + + # TODO!!!! SHOULD WAIT UNTIL THE THREAD FINISHES + if self._simulator_thread: + self._simulator_thread.join() + + self.ns3.Simulator.Destroy() + + self._resources.clear() + + self._ns3 = None + sys.stdout.flush() + sys.stderr.flush() + + def _simulator_run(self, condition): + # Run simulation + self.ns3.Simulator.Run() + # Signal condition on simulation end to notify waiting threads + condition.acquire() + condition.notifyAll() + condition.release() + + def _schedule_event(self, condition, func, *args): + """ Schedules event on running simulation, and wait until + event is executed""" + + def execute_event(contextId, condition, has_event_occurred, func, *args): + try: + func(*args) + finally: + # flag event occured + has_event_occurred[0] = True + # notify condition indicating attribute was set + condition.acquire() + condition.notifyAll() + condition.release() + + # contextId is defined as general context + contextId = long(0xffffffff) + + # delay 0 means that the event is expected to execute inmediately + delay = self.ns3.Seconds(0) + + # flag to indicate that the event occured + # because bool is an inmutable object in python, in order to create a + # bool flag, a list is used as wrapper + has_event_occurred = [False] + condition.acquire() + try: + if not self.ns3.Simulator.IsFinished(): + self.ns3.Simulator.ScheduleWithContext(contextId, delay, execute_event, + condition, has_event_occurred, func, *args) + while not has_event_occurred[0] and not self.ns3.Simulator.IsFinished(): + condition.wait() + finally: + condition.release() + + def _set_ns3_attr(self, uuid, name, value): + resource = self.get_resource(uuid) + ns3_value = self._to_ns3_value(uuid, name, value) + + def set_attr(resource, name, ns3_value): + resource.SetAttribute(name, ns3_value) + + if self._is_running: + # schedule the event in the Simulator + self._schedule_event(self._condition, set_attr, resource, + name, ns3_value) + else: + set_attr(resource, name, ns3_value) + + def _get_ns3_attr(self, uuid, name): + resource = self.get_resource(uuid) + ns3_value = self._create_ns3_value(uuid, name) + + def get_attr(resource, name, ns3_value): + resource.GetAttribute(name, ns3_value) + + if self._is_running: + # schedule the event in the Simulator + self._schedule_event(self._condition, get_attr, resource, + name, ns3_value) + else: + get_attr(resource, name, ns3_value) + + return self._from_ns3_value(uuid, name, ns3_value) + + def _create_ns3_value(self, uuid, name): + typeid = get_typeid(uuid) + TypeId = self.ns3.TypeId() + tid = TypeId.LookupByName(typeid) + info = TypeId.AttributeInformation() + if not tid.LookupAttributeByName(name, info): + msg = "TypeId %s has no attribute %s" % (typeid, name) + self.logger.error(msg) + + checker = info.checker + ns3_value = checker.Create() + return ns3_value + + def _from_ns3_value(self, uuid, name, ns3_value): + typeid = get_typeid(uuid) + TypeId = self.ns3.TypeId() + tid = TypeId.LookupByName(typeid) + info = TypeId.AttributeInformation() + if not tid.LookupAttributeByName(name, info): + msg = "TypeId %s has no attribute %s" % (typeid, name) + self.logger.error(msg) + + checker = info.checker + value = ns3_value.SerializeToString(checker) + + type_name = checker.GetValueTypeName() + if type_name in ["ns3::UintegerValue", "ns3::IntegerValue"]: + return int(value) + if type_name == "ns3::DoubleValue": + return float(value) + if type_name == "ns3::BooleanValue": + return value == "true" + + return value + + def _to_ns3_value(self, uuid, name, value): + typeid = get_typeid(uuid) + TypeId = self.ns3.TypeId() + typeid = TypeId.LookupByName(typeid) + info = TypeId.AttributeInformation() + if not tid.LookupAttributeByName(name, info): + msg = "TypeId %s has no attribute %s" % (typeid, name) + self.logger.error(msg) + + str_value = str(value) + if isinstance(value, bool): + str_value = str_value.lower() + + checker = info.checker + ns3_value = checker.Create() + ns3_value.DeserializeFromString(str_value, checker) + return ns3_value + + def _load_ns3_module(self): + if self.ns3: + return + + import ctypes + import imp + import re + import pkgutil + + bindings = os.environ.get("NS3BINDINGS") + libdir = os.environ.get("NS3LIBRARIES") + + # Load the ns-3 modules shared libraries + if libdir: + files = os.listdir(libdir) + regex = re.compile("(.*\.so)$") + libs = [m.group(1) for filename in files for m in [regex.search(filename)] if m] + + libscp = list(libs) + while len(libs) > 0: + for lib in libs: + libfile = os.path.join(libdir, lib) + try: + ctypes.CDLL(libfile, ctypes.RTLD_GLOBAL) + libs.remove(lib) + except: + pass + + # if did not load any libraries in the last iteration break + # to prevent infinit loop + if len(libscp) == len(libs): + raise RuntimeError("Imposible to load shared libraries %s" % str(libs)) + libscp = list(libs) + + # import the python bindings for the ns-3 modules + if bindings: + sys.path.append(bindings) + + # create a module to add all ns3 classes + ns3mod = imp.new_module("ns3") + sys.modules["ns3"] = ns3mod + + # retrieve all ns3 classes and add them to the ns3 module + import ns + for importer, modname, ispkg in pkgutil.iter_modules(ns.__path__): + fullmodname = "ns.%s" % modname + module = __import__(fullmodname, globals(), locals(), ['*']) + + # netanim.Config singleton overrides ns3::Config + if modname in ['netanim']: + continue + + for sattr in dir(module): + if not sattr.startswith("_"): + attr = getattr(module, sattr) + setattr(ns3mod, sattr, attr) + + self._ns3 = ns3mod + diff --git a/src/neco/util/rmatcher.py b/src/neco/util/rmatcher.py new file mode 100644 index 00000000..08563f69 --- /dev/null +++ b/src/neco/util/rmatcher.py @@ -0,0 +1,46 @@ + +def match_tags(box, all_tags, exact_tags): + """ returns True if box has required tags """ + tall = set(all_tags) + texact = set(exact_tags) + + if texact and box.connections == texact: + return True + + if tall and tall.issubset(box.connections): + return True + + return False + +def find_boxes(box, all_tags = None, exact_tags = None, max_depth = 1): + """ Look for the connected boxes with the required tags, doing breath-first + search, until max_depth ( max_depth = None will traverse the entire graph ). + """ + if not all_tags and not exact_tags: + msg = "No matching criteria for resources." + raise RuntimeError(msg) + + queue = set() + # enqueue (depth, box) + queue.add((0, box)) + + traversed = set() + traversed.add(box) + + depth = 0 + + result = set() + + while len(q) > 0: + (depth, a) = queue.pop() + if match_tags(a, all_tags, exact_tags): + result.add(a) + + if not max_depth or depth <= max_depth: + depth += 1 + for b in sorted(a.connections): + if b not in traversed: + traversed.add(b) + queue.add((depth, b)) + + return result diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index d3d94704..c3563e36 100755 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -1,6 +1,5 @@ #!/usr/bin/env python from neco.resources.linux.node import LinuxNode -from neco.design.box import Box from neco.util.sshfuncs import RUNNING, FINISHED import os.path @@ -12,7 +11,7 @@ import unittest class DummyEC(object): pass -class LinuxBoxTestCase(unittest.TestCase): +class LinuxNodeTestCase(unittest.TestCase): def setUp(self): host = 'nepi2.pl.sophia.inria.fr' user = 'inria_nepi' @@ -26,10 +25,9 @@ class LinuxBoxTestCase(unittest.TestCase): self.home = '${HOME}/test-app' def create_node(self, host, user): - box = Box() ec = DummyEC() - node = LinuxNode(box, ec) + node = LinuxNode(ec, 1) node.host = host node.user = user diff --git a/test/resources/ns3/ns3wrapper.py b/test/resources/ns3/ns3wrapper.py new file mode 100644 index 00000000..de9faa83 --- /dev/null +++ b/test/resources/ns3/ns3wrapper.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python +# Test based on ns-3 csma/examples/csma-ping.cc file +# +# Network topology +# +# n0 n1 n2 n3 +# | | | | +# ----------------- +# +# node n0 sends IGMP traffic to node n3 + + +from neco.resources.ns3.ns3wrapper import NS3Wrapper + +import os.path +import time +import tempfile +import unittest + +class NS3WrapperTest(unittest.TestCase): + def setUp(self): + pass + + def test_csma_ping(self): + wrapper = NS3Wrapper() + + ### create 4 nodes + # NodeContainer c; + c = wrapper.create("NodeContainer") + + # c.Create (4); + wrapper.invoke(c, "Create", 4) + + ### connect the nodes to a shared channel + # CsmaHelper csma; + csma = wrapper.create("CsmaHelper") + + # csma.SetChannelAttribute ("DataRate", DataRateValue (DataRate (5000000))); + dr = wrapper.create("DataRate", 5000000) + drv = wrapper.create("DataRateValue", dr) + wrapper.invoke(csma, "SetChannelAttribute", "DataRate", drv) + + # csma.SetChannelAttribute ("Delay", TimeValue (MilliSeconds (2))); + ms = wrapper.create("MilliSeconds", 2) + delay = wrapper.create("TimeValue", ms) + wrapper.invoke(csma, "SetChannelAttribute", "Delay", delay) + + # csma.SetDeviceAttribute ("EncapsulationMode", StringValue ("Llc")); + encap = wrapper.create("StringValue", "Llc") + wrapper.invoke(csma, "SetDeviceAttribute", "EncapsulationMode", encap) + + # NetDeviceContainer devs = csma.Install (c); + devs = wrapper.invoke(csma, "Install", c) + + ### add IP stack to all nodes + # InternetStackHelper ipStack; + ipStack = wrapper.create("InternetStackHelper") + + # ipStack.Install (c); + wrapper.invoke(ipStack, "Install", c) + + ### assign ip addresses + #Ipv4AddressHelper ip; + ip = wrapper.create("Ipv4AddressHelper") + + # ip.SetBase ("192.168.1.0", "255.255.255.0"); + ip4 = wrapper.create("Ipv4Address", "192.168.1.0") + mask4 = wrapper.create("Ipv4Mask", "255.255.255.0") + wrapper.invoke(ip, "SetBase", ip4, mask4) + + # Ipv4InterfaceContainer addresses = ip.Assign (devs); + addresses = wrapper.invoke(ip, "Assign", devs) + + ### Create source + config = wrapper.singleton("Config") + + # Config::SetDefault ("ns3::Ipv4RawSocketImpl::Protocol", StringValue ("2")); + proto = wrapper.create("StringValue", "2") + wrapper.invoke(config, "SetDefault", "ns3::Ipv4RawSocketImpl::Protocol", proto) + + # InetSocketAddress dst = InetSocketAddress (addresses.GetAddress (3)); + addr3 = wrapper.invoke(addresses, "GetAddress", 3) + dst = wrapper.create("InetSocketAddress", addr3) + + # OnOffHelper onoff = OnOffHelper ("ns3::Ipv4RawSocketFactory", dst); + onoff = wrapper.create("OnOffHelper", "ns3::Ipv4RawSocketFactory", dst) + + # onoff.SetAttribute ("OnTime", RandomVariableValue (ConstantVariable (1.0))); + cv1 = wrapper.create("ConstantVariable", 1.0) + rand1 = wrapper.create("RandomVariableValue", cv1) + wrapper.invoke(onoff, "SetAttribute", "OnTime", rand1) + + # onoff.SetAttribute ("OffTime", RandomVariableValue (ConstantVariable (0.0))); + cv2 = wrapper.create("ConstantVariable", 0.0) + rand2 = wrapper.create("RandomVariableValue", cv2) + wrapper.invoke(onoff, "SetAttribute", "OffTime", rand2) + + # onoff.SetAttribute ("DataRate", DataRateValue (DataRate (15000))); + dr2 = wrapper.create("DataRate", 15000) + drv2 = wrapper.create("DataRateValue", dr2) + wrapper.invoke(onoff, "SetAttribute", "DataRate", drv2) + + # onoff.SetAttribute ("PacketSize", UintegerValue (1200)); + uiv = wrapper.create("UintegerValue", 1200) + wrapper.invoke(onoff, "SetAttribute", "PacketSize", uiv) + + # ApplicationContainer apps = onoff.Install (c.Get (0)); + n1 = wrapper.invoke(c, "Get", 0) + apps = wrapper.invoke(onoff, "Install", n1) + + # apps.Start (Seconds (1.0)); + s1 = wrapper.create("Seconds", 1.0) + wrapper.invoke(apps, "Start", s1) + + # apps.Stop (Seconds (10.0)); + s2 = wrapper.create("Seconds", 10.0) + wrapper.invoke(apps, "Stop", s2) + + ### create sink + # PacketSinkHelper sink = PacketSinkHelper ("ns3::Ipv4RawSocketFactory", dst); + sink = wrapper.create("PacketSinkHelper", "ns3::Ipv4RawSocketFactory", dst) + + # apps = sink.Install (c.Get (3)); + n3 = wrapper.invoke(c, "Get", 3) + apps = wrapper.invoke (sink, "Install", n3) + + # apps.Start (Seconds (0.0)); + s3 = wrapper.create ("Seconds", 0.0) + wrapper.invoke (apps, "Start", s3) + + # apps.Stop (Seconds (11.0)); + s4 = wrapper.create ("Seconds", 11.0) + wrapper.invoke (apps, "Stop", s4) + + ### create pinger + #V4PingHelper ping = V4PingHelper (addresses.GetAddress (2)); + addr2 = wrapper.invoke(addresses, "GetAddress", 2) + ping = wrapper.create("V4PingHelper", addr2) + + #NodeContainer pingers; + pingers = wrapper.create("NodeContainer") + + #pingers.Add (c.Get (0)); + n0 = wrapper.invoke(c, "Get", 0) + wrapper.invoke(pingers, "Add", n0) + + #pingers.Add (c.Get (1)); + n1 = wrapper.invoke(c, "Get", 1) + wrapper.invoke(pingers, "Add", n1) + + #pingers.Add (c.Get (3)); + n3 = wrapper.invoke(c, "Get", 3) + wrapper.invoke(pingers, "Add", n3) + + #apps = ping.Install (pingers); + apps = wrapper.invoke(ping, "Install", pingers) + + #apps.Start (Seconds (2.0)); + s5 = wrapper.create ("Seconds", 2.0) + wrapper.invoke (apps, "Start", s5) + + #apps.Stop (Seconds (5.0)); + s6 = wrapper.create ("Seconds", 5.0) + wrapper.invoke (apps, "Stop", s6) + + def SinkRx(packet, address): + print packet + + def PingRtt(context, rtt): + print context, rtt + + ### configure tracing + #csma.EnablePcapAll ("csma-ping", false); + wrapper.invoke(csma, "EnablePcapAll", "csma-ping", False) + + # No binging for callback + #Config::ConnectWithoutContext ("/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx", + # MakeCallback (&SinkRx)); + #cb = wrapper.create("MakeCallback", SinkRx) + #wrapper.invoke(config, "ConnectWithoutContext", + # "/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx", cb) + + # Config::Connect ("/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt", + # MakeCallback (&PingRtt)); + #cb2 = wrapper.create("MakeCallback", PingRtt) + #wrapper.invoke(config, "ConnectWithoutContext", + # "/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt", + # cb2) + + # Packet::EnablePrinting (); + packet = wrapper.singleton("Packet") + wrapper.invoke(packet, "EnablePrinting") + + ### run Simulation + # Simulator::Run (); + simulator = wrapper.singleton("Simulator") + wrapper.invoke(simulator, "Run") + + # Simulator::Destroy (); + wrapper.invoke(simulator, "Destroy") + +if __name__ == '__main__': + unittest.main() + -- 2.43.0