import logging
import weakref
-def match_tags(box, all_tags, exact_tags):
- """ returns True if box has required tags """
- tall = set(all_tags)
- texact = set(exact_tags)
-
- if texact and box.connections == texact:
- return True
-
- if tall and tall.issubset(box.connections):
- return True
-
- return False
-
-def find_boxes(box, all_tags = None, exact_tags = None, max_depth = 1):
- """ Look for the connected boxes with the required tags, doing breath-first
- search, until max_depth ( max_depth = None will traverse the entire graph ).
- """
- if not all_tags and not exact_tags:
- msg = "No matching criteria for resources."
- raise RuntimeError(msg)
-
- queue = set()
- # enqueue (depth, box)
- queue.add((0, box))
-
- traversed = set()
- traversed.add(box)
-
- depth = 0
-
- result = set()
-
- while len(q) > 0:
- (depth, a) = queue.pop()
- if match_tags(a, all_tags, exact_tags):
- result.add(a)
-
- if not max_depth or depth <= max_depth:
- depth += 1
- for b in sorted(a.connections):
- if b not in traversed:
- traversed.add(b)
- queue.add((depth, b))
-
- return result
-
class Resource(object):
- def __init__(self, box, ec):
- self._box = weakref.ref(box)
+ def __init__(self, ec, guid):
+ self._guid = guid
self._ec = weakref.ref(ec)
# Logging
loglevel = "debug"
self._logger = logging.getLogger("neco.execution.Resource.%s" %
- self.box.guid)
+ self.guid)
self._logger.setLevel(getattr(logging, loglevel.upper()))
@property
- def box(self):
- return self._box()
+ def guid(self):
+ return self._guid()
@property
def ec(self):
return self._ec()
- def find_resources(self, all_tags = None, exact_tags = None,
- max_depth = 1):
- resources = set()
-
- boxes = find_boxes(self.box, all_tags, exact_tags, max_depth)
- for b in boxes:
- r = self.ec.resource(b.guid)
- resources.add(r)
-
- return resources
-
-class ResourceResolver(object):
- def __init__(self):
- pass
-
import subprocess
class LinuxNode(Resource):
- def __init__(self, box, ec):
- super(LinuxNode, self).__init__(box, ec)
+ def __init__(self, ec, guif):
+ super(LinuxNode, self).__init__(ec, guid)
self.ip = None
self.host = None
self.user = None
# Logging
loglevel = "debug"
self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\
- self.box.guid)
+ self.guid)
self._logger.setLevel(getattr(logging, loglevel.upper()))
# For ssh connections we use the ControlMaster option which
--- /dev/null
+import logging
+import os
+import sys
+import threading
+import uuid
+
+class NS3Wrapper(object):
+ def __init__(self, homedir = None):
+ super(NS3Wrapper, self).__init__()
+ self._ns3 = None
+ self._uuid = self.make_uuid()
+ self._homedir = homedir or os.path.join("/tmp", self._uuid)
+ self._simulation_thread = None
+ self._condition = None
+
+ self._started = False
+ self._stopped = False
+
+ # holds reference to all ns-3 objects in the simulation
+ self._resources = dict()
+
+ # create home dir (where all simulation related files will end up)
+ home = os.path.normpath(self.homedir)
+ if not os.path.exists(home):
+ os.makedirs(home, 0755)
+
+ # Logging
+ loglevel = os.environ.get("NS3LOGLEVEL", "debug")
+ self._logger = logging.getLogger("ns3wrapper.%s" % self.uuid)
+ self._logger.setLevel(getattr(logging, loglevel.upper()))
+ hdlr = logging.FileHandler(os.path.join(self.homedir, "ns3wrapper.log"))
+ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+ hdlr.setFormatter(formatter)
+ self._logger.addHandler(hdlr)
+
+ # Load ns-3 shared libraries and import modules
+ self._load_ns3_module()
+
+ @property
+ def ns3(self):
+ return self._ns3
+
+ @property
+ def homedir(self):
+ return self._homedir
+
+ @property
+ def uuid(self):
+ return self._uuid
+
+ @property
+ def logger(self):
+ return self._logger
+
+ def make_uuid(self):
+ return "uuid%s" % uuid.uuid4()
+
+ def singleton(self, clazzname):
+ uuid = "uuid%s"%clazzname
+
+ if not uuid in self._resources:
+ if not hasattr(self.ns3, clazzname):
+ msg = "Type %s not supported" % (typeid)
+ self.logger.error(msg)
+
+ clazz = getattr(self.ns3, clazzname)
+ typeid = "ns3::%s" % clazzname
+ self._resources[uuid] = (clazz, typeid)
+
+ return uuid
+
+ def get_trace(self, trace, offset = None, nbytes = None ):
+ pass
+
+ def is_running(self):
+ return self._started and not self._stopped
+
+ def get_resource(self, uuid):
+ (resource, typeid) = self._resources.get(uuid)
+ return resource
+
+ def get_typeid(self, uuid):
+ (resource, typeid) = self._resources.get(uuid)
+ return typeid
+
+ def create(self, clazzname, *args):
+ if not hasattr(self.ns3, clazzname):
+ msg = "Type %s not supported" % (clazzname)
+ self.logger.error(msg)
+
+ clazz = getattr(self.ns3, clazzname)
+ #typeid = clazz.GetInstanceTypeId().GetName()
+ typeid = "ns3::%s" % clazzname
+
+ realargs = [self.get_resource(arg) if \
+ str(arg).startswith("uuid") else arg for arg in args]
+
+ resource = clazz(*realargs)
+
+ uuid = self.make_uuid()
+ self._resources[uuid] = (resource, typeid)
+ return uuid
+
+ def set(self, uuid, name, value):
+ resource = self.get_resource(uuid)
+
+ if hasattr(resource, name):
+ setattr(resource, name, value)
+ else:
+ self._set_ns3_attr(uuid, name, value)
+
+ def get(self, name, uuid = None):
+ resource = self.get_resource(uuid)
+
+ value = None
+ if hasattr(resource, name):
+ value = getattr(resource, name)
+ else:
+ value = self._get_ns3_attr(uuid, name)
+
+ return value
+
+ def invoke(self, uuid, operation, *args):
+ resource = self.get_resource(uuid)
+ typeid = self.get_typeid(uuid)
+ method = getattr(resource, operation)
+
+ realargs = [self.get_resource(arg) if \
+ str(arg).startswith("uuid") else arg for arg in args]
+
+ result = method(*realargs)
+
+ if not result:
+ return None
+
+ uuid = self.make_uuid()
+ self._resources[uuid] = (result, typeid)
+
+ return uuid
+
+ def start(self):
+ self._condition = threading.Condition()
+ self._simulator_thread = threading.Thread(
+ target = self._simulator_run,
+ args = [self._condition])
+ self._simulator_thread.setDaemon(True)
+ self._simulator_thread.start()
+ self._started = True
+
+ def stop(self, time = None):
+ if not self.ns3:
+ return
+
+ if time is None:
+ self.ns3.Simulator.Stop()
+ else:
+ self.ns3.Simulator.Stop(self.ns3.Time(time))
+ self._stopped = True
+
+ def shutdown(self):
+ if self.ns3:
+ if not self.ns3.Simulator.IsFinished():
+ self.stop()
+
+ # TODO!!!! SHOULD WAIT UNTIL THE THREAD FINISHES
+ if self._simulator_thread:
+ self._simulator_thread.join()
+
+ self.ns3.Simulator.Destroy()
+
+ self._resources.clear()
+
+ self._ns3 = None
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ def _simulator_run(self, condition):
+ # Run simulation
+ self.ns3.Simulator.Run()
+ # Signal condition on simulation end to notify waiting threads
+ condition.acquire()
+ condition.notifyAll()
+ condition.release()
+
+ def _schedule_event(self, condition, func, *args):
+ """ Schedules event on running simulation, and wait until
+ event is executed"""
+
+ def execute_event(contextId, condition, has_event_occurred, func, *args):
+ try:
+ func(*args)
+ finally:
+ # flag event occured
+ has_event_occurred[0] = True
+ # notify condition indicating attribute was set
+ condition.acquire()
+ condition.notifyAll()
+ condition.release()
+
+ # contextId is defined as general context
+ contextId = long(0xffffffff)
+
+ # delay 0 means that the event is expected to execute inmediately
+ delay = self.ns3.Seconds(0)
+
+ # flag to indicate that the event occured
+ # because bool is an inmutable object in python, in order to create a
+ # bool flag, a list is used as wrapper
+ has_event_occurred = [False]
+ condition.acquire()
+ try:
+ if not self.ns3.Simulator.IsFinished():
+ self.ns3.Simulator.ScheduleWithContext(contextId, delay, execute_event,
+ condition, has_event_occurred, func, *args)
+ while not has_event_occurred[0] and not self.ns3.Simulator.IsFinished():
+ condition.wait()
+ finally:
+ condition.release()
+
+ def _set_ns3_attr(self, uuid, name, value):
+ resource = self.get_resource(uuid)
+ ns3_value = self._to_ns3_value(uuid, name, value)
+
+ def set_attr(resource, name, ns3_value):
+ resource.SetAttribute(name, ns3_value)
+
+ if self._is_running:
+ # schedule the event in the Simulator
+ self._schedule_event(self._condition, set_attr, resource,
+ name, ns3_value)
+ else:
+ set_attr(resource, name, ns3_value)
+
+ def _get_ns3_attr(self, uuid, name):
+ resource = self.get_resource(uuid)
+ ns3_value = self._create_ns3_value(uuid, name)
+
+ def get_attr(resource, name, ns3_value):
+ resource.GetAttribute(name, ns3_value)
+
+ if self._is_running:
+ # schedule the event in the Simulator
+ self._schedule_event(self._condition, get_attr, resource,
+ name, ns3_value)
+ else:
+ get_attr(resource, name, ns3_value)
+
+ return self._from_ns3_value(uuid, name, ns3_value)
+
+ def _create_ns3_value(self, uuid, name):
+ typeid = get_typeid(uuid)
+ TypeId = self.ns3.TypeId()
+ tid = TypeId.LookupByName(typeid)
+ info = TypeId.AttributeInformation()
+ if not tid.LookupAttributeByName(name, info):
+ msg = "TypeId %s has no attribute %s" % (typeid, name)
+ self.logger.error(msg)
+
+ checker = info.checker
+ ns3_value = checker.Create()
+ return ns3_value
+
+ def _from_ns3_value(self, uuid, name, ns3_value):
+ typeid = get_typeid(uuid)
+ TypeId = self.ns3.TypeId()
+ tid = TypeId.LookupByName(typeid)
+ info = TypeId.AttributeInformation()
+ if not tid.LookupAttributeByName(name, info):
+ msg = "TypeId %s has no attribute %s" % (typeid, name)
+ self.logger.error(msg)
+
+ checker = info.checker
+ value = ns3_value.SerializeToString(checker)
+
+ type_name = checker.GetValueTypeName()
+ if type_name in ["ns3::UintegerValue", "ns3::IntegerValue"]:
+ return int(value)
+ if type_name == "ns3::DoubleValue":
+ return float(value)
+ if type_name == "ns3::BooleanValue":
+ return value == "true"
+
+ return value
+
+ def _to_ns3_value(self, uuid, name, value):
+ typeid = get_typeid(uuid)
+ TypeId = self.ns3.TypeId()
+ typeid = TypeId.LookupByName(typeid)
+ info = TypeId.AttributeInformation()
+ if not tid.LookupAttributeByName(name, info):
+ msg = "TypeId %s has no attribute %s" % (typeid, name)
+ self.logger.error(msg)
+
+ str_value = str(value)
+ if isinstance(value, bool):
+ str_value = str_value.lower()
+
+ checker = info.checker
+ ns3_value = checker.Create()
+ ns3_value.DeserializeFromString(str_value, checker)
+ return ns3_value
+
+ def _load_ns3_module(self):
+ if self.ns3:
+ return
+
+ import ctypes
+ import imp
+ import re
+ import pkgutil
+
+ bindings = os.environ.get("NS3BINDINGS")
+ libdir = os.environ.get("NS3LIBRARIES")
+
+ # Load the ns-3 modules shared libraries
+ if libdir:
+ files = os.listdir(libdir)
+ regex = re.compile("(.*\.so)$")
+ libs = [m.group(1) for filename in files for m in [regex.search(filename)] if m]
+
+ libscp = list(libs)
+ while len(libs) > 0:
+ for lib in libs:
+ libfile = os.path.join(libdir, lib)
+ try:
+ ctypes.CDLL(libfile, ctypes.RTLD_GLOBAL)
+ libs.remove(lib)
+ except:
+ pass
+
+ # if did not load any libraries in the last iteration break
+ # to prevent infinit loop
+ if len(libscp) == len(libs):
+ raise RuntimeError("Imposible to load shared libraries %s" % str(libs))
+ libscp = list(libs)
+
+ # import the python bindings for the ns-3 modules
+ if bindings:
+ sys.path.append(bindings)
+
+ # create a module to add all ns3 classes
+ ns3mod = imp.new_module("ns3")
+ sys.modules["ns3"] = ns3mod
+
+ # retrieve all ns3 classes and add them to the ns3 module
+ import ns
+ for importer, modname, ispkg in pkgutil.iter_modules(ns.__path__):
+ fullmodname = "ns.%s" % modname
+ module = __import__(fullmodname, globals(), locals(), ['*'])
+
+ # netanim.Config singleton overrides ns3::Config
+ if modname in ['netanim']:
+ continue
+
+ for sattr in dir(module):
+ if not sattr.startswith("_"):
+ attr = getattr(module, sattr)
+ setattr(ns3mod, sattr, attr)
+
+ self._ns3 = ns3mod
+
--- /dev/null
+
+def match_tags(box, all_tags, exact_tags):
+ """ returns True if box has required tags """
+ tall = set(all_tags)
+ texact = set(exact_tags)
+
+ if texact and box.connections == texact:
+ return True
+
+ if tall and tall.issubset(box.connections):
+ return True
+
+ return False
+
+def find_boxes(box, all_tags = None, exact_tags = None, max_depth = 1):
+ """ Look for the connected boxes with the required tags, doing breath-first
+ search, until max_depth ( max_depth = None will traverse the entire graph ).
+ """
+ if not all_tags and not exact_tags:
+ msg = "No matching criteria for resources."
+ raise RuntimeError(msg)
+
+ queue = set()
+ # enqueue (depth, box)
+ queue.add((0, box))
+
+ traversed = set()
+ traversed.add(box)
+
+ depth = 0
+
+ result = set()
+
+ while len(q) > 0:
+ (depth, a) = queue.pop()
+ if match_tags(a, all_tags, exact_tags):
+ result.add(a)
+
+ if not max_depth or depth <= max_depth:
+ depth += 1
+ for b in sorted(a.connections):
+ if b not in traversed:
+ traversed.add(b)
+ queue.add((depth, b))
+
+ return result
#!/usr/bin/env python
from neco.resources.linux.node import LinuxNode
-from neco.design.box import Box
from neco.util.sshfuncs import RUNNING, FINISHED
import os.path
class DummyEC(object):
pass
-class LinuxBoxTestCase(unittest.TestCase):
+class LinuxNodeTestCase(unittest.TestCase):
def setUp(self):
host = 'nepi2.pl.sophia.inria.fr'
user = 'inria_nepi'
self.home = '${HOME}/test-app'
def create_node(self, host, user):
- box = Box()
ec = DummyEC()
- node = LinuxNode(box, ec)
+ node = LinuxNode(ec, 1)
node.host = host
node.user = user
--- /dev/null
+#!/usr/bin/env python
+# Test based on ns-3 csma/examples/csma-ping.cc file
+#
+# Network topology
+#
+# n0 n1 n2 n3
+# | | | |
+# -----------------
+#
+# node n0 sends IGMP traffic to node n3
+
+
+from neco.resources.ns3.ns3wrapper import NS3Wrapper
+
+import os.path
+import time
+import tempfile
+import unittest
+
+class NS3WrapperTest(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def test_csma_ping(self):
+ wrapper = NS3Wrapper()
+
+ ### create 4 nodes
+ # NodeContainer c;
+ c = wrapper.create("NodeContainer")
+
+ # c.Create (4);
+ wrapper.invoke(c, "Create", 4)
+
+ ### connect the nodes to a shared channel
+ # CsmaHelper csma;
+ csma = wrapper.create("CsmaHelper")
+
+ # csma.SetChannelAttribute ("DataRate", DataRateValue (DataRate (5000000)));
+ dr = wrapper.create("DataRate", 5000000)
+ drv = wrapper.create("DataRateValue", dr)
+ wrapper.invoke(csma, "SetChannelAttribute", "DataRate", drv)
+
+ # csma.SetChannelAttribute ("Delay", TimeValue (MilliSeconds (2)));
+ ms = wrapper.create("MilliSeconds", 2)
+ delay = wrapper.create("TimeValue", ms)
+ wrapper.invoke(csma, "SetChannelAttribute", "Delay", delay)
+
+ # csma.SetDeviceAttribute ("EncapsulationMode", StringValue ("Llc"));
+ encap = wrapper.create("StringValue", "Llc")
+ wrapper.invoke(csma, "SetDeviceAttribute", "EncapsulationMode", encap)
+
+ # NetDeviceContainer devs = csma.Install (c);
+ devs = wrapper.invoke(csma, "Install", c)
+
+ ### add IP stack to all nodes
+ # InternetStackHelper ipStack;
+ ipStack = wrapper.create("InternetStackHelper")
+
+ # ipStack.Install (c);
+ wrapper.invoke(ipStack, "Install", c)
+
+ ### assign ip addresses
+ #Ipv4AddressHelper ip;
+ ip = wrapper.create("Ipv4AddressHelper")
+
+ # ip.SetBase ("192.168.1.0", "255.255.255.0");
+ ip4 = wrapper.create("Ipv4Address", "192.168.1.0")
+ mask4 = wrapper.create("Ipv4Mask", "255.255.255.0")
+ wrapper.invoke(ip, "SetBase", ip4, mask4)
+
+ # Ipv4InterfaceContainer addresses = ip.Assign (devs);
+ addresses = wrapper.invoke(ip, "Assign", devs)
+
+ ### Create source
+ config = wrapper.singleton("Config")
+
+ # Config::SetDefault ("ns3::Ipv4RawSocketImpl::Protocol", StringValue ("2"));
+ proto = wrapper.create("StringValue", "2")
+ wrapper.invoke(config, "SetDefault", "ns3::Ipv4RawSocketImpl::Protocol", proto)
+
+ # InetSocketAddress dst = InetSocketAddress (addresses.GetAddress (3));
+ addr3 = wrapper.invoke(addresses, "GetAddress", 3)
+ dst = wrapper.create("InetSocketAddress", addr3)
+
+ # OnOffHelper onoff = OnOffHelper ("ns3::Ipv4RawSocketFactory", dst);
+ onoff = wrapper.create("OnOffHelper", "ns3::Ipv4RawSocketFactory", dst)
+
+ # onoff.SetAttribute ("OnTime", RandomVariableValue (ConstantVariable (1.0)));
+ cv1 = wrapper.create("ConstantVariable", 1.0)
+ rand1 = wrapper.create("RandomVariableValue", cv1)
+ wrapper.invoke(onoff, "SetAttribute", "OnTime", rand1)
+
+ # onoff.SetAttribute ("OffTime", RandomVariableValue (ConstantVariable (0.0)));
+ cv2 = wrapper.create("ConstantVariable", 0.0)
+ rand2 = wrapper.create("RandomVariableValue", cv2)
+ wrapper.invoke(onoff, "SetAttribute", "OffTime", rand2)
+
+ # onoff.SetAttribute ("DataRate", DataRateValue (DataRate (15000)));
+ dr2 = wrapper.create("DataRate", 15000)
+ drv2 = wrapper.create("DataRateValue", dr2)
+ wrapper.invoke(onoff, "SetAttribute", "DataRate", drv2)
+
+ # onoff.SetAttribute ("PacketSize", UintegerValue (1200));
+ uiv = wrapper.create("UintegerValue", 1200)
+ wrapper.invoke(onoff, "SetAttribute", "PacketSize", uiv)
+
+ # ApplicationContainer apps = onoff.Install (c.Get (0));
+ n1 = wrapper.invoke(c, "Get", 0)
+ apps = wrapper.invoke(onoff, "Install", n1)
+
+ # apps.Start (Seconds (1.0));
+ s1 = wrapper.create("Seconds", 1.0)
+ wrapper.invoke(apps, "Start", s1)
+
+ # apps.Stop (Seconds (10.0));
+ s2 = wrapper.create("Seconds", 10.0)
+ wrapper.invoke(apps, "Stop", s2)
+
+ ### create sink
+ # PacketSinkHelper sink = PacketSinkHelper ("ns3::Ipv4RawSocketFactory", dst);
+ sink = wrapper.create("PacketSinkHelper", "ns3::Ipv4RawSocketFactory", dst)
+
+ # apps = sink.Install (c.Get (3));
+ n3 = wrapper.invoke(c, "Get", 3)
+ apps = wrapper.invoke (sink, "Install", n3)
+
+ # apps.Start (Seconds (0.0));
+ s3 = wrapper.create ("Seconds", 0.0)
+ wrapper.invoke (apps, "Start", s3)
+
+ # apps.Stop (Seconds (11.0));
+ s4 = wrapper.create ("Seconds", 11.0)
+ wrapper.invoke (apps, "Stop", s4)
+
+ ### create pinger
+ #V4PingHelper ping = V4PingHelper (addresses.GetAddress (2));
+ addr2 = wrapper.invoke(addresses, "GetAddress", 2)
+ ping = wrapper.create("V4PingHelper", addr2)
+
+ #NodeContainer pingers;
+ pingers = wrapper.create("NodeContainer")
+
+ #pingers.Add (c.Get (0));
+ n0 = wrapper.invoke(c, "Get", 0)
+ wrapper.invoke(pingers, "Add", n0)
+
+ #pingers.Add (c.Get (1));
+ n1 = wrapper.invoke(c, "Get", 1)
+ wrapper.invoke(pingers, "Add", n1)
+
+ #pingers.Add (c.Get (3));
+ n3 = wrapper.invoke(c, "Get", 3)
+ wrapper.invoke(pingers, "Add", n3)
+
+ #apps = ping.Install (pingers);
+ apps = wrapper.invoke(ping, "Install", pingers)
+
+ #apps.Start (Seconds (2.0));
+ s5 = wrapper.create ("Seconds", 2.0)
+ wrapper.invoke (apps, "Start", s5)
+
+ #apps.Stop (Seconds (5.0));
+ s6 = wrapper.create ("Seconds", 5.0)
+ wrapper.invoke (apps, "Stop", s6)
+
+ def SinkRx(packet, address):
+ print packet
+
+ def PingRtt(context, rtt):
+ print context, rtt
+
+ ### configure tracing
+ #csma.EnablePcapAll ("csma-ping", false);
+ wrapper.invoke(csma, "EnablePcapAll", "csma-ping", False)
+
+ # No binging for callback
+ #Config::ConnectWithoutContext ("/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx",
+ # MakeCallback (&SinkRx));
+ #cb = wrapper.create("MakeCallback", SinkRx)
+ #wrapper.invoke(config, "ConnectWithoutContext",
+ # "/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx", cb)
+
+ # Config::Connect ("/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt",
+ # MakeCallback (&PingRtt));
+ #cb2 = wrapper.create("MakeCallback", PingRtt)
+ #wrapper.invoke(config, "ConnectWithoutContext",
+ # "/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt",
+ # cb2)
+
+ # Packet::EnablePrinting ();
+ packet = wrapper.singleton("Packet")
+ wrapper.invoke(packet, "EnablePrinting")
+
+ ### run Simulation
+ # Simulator::Run ();
+ simulator = wrapper.singleton("Simulator")
+ wrapper.invoke(simulator, "Run")
+
+ # Simulator::Destroy ();
+ wrapper.invoke(simulator, "Destroy")
+
+if __name__ == '__main__':
+ unittest.main()
+