2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.core.attributes import Attribute
7 from nepi.util.constants import TIME_NOW
15 class TunChannel(object):
17 # These get initialized when the channel is configured
18 self.external_addr = None
20 # These get initialized when the channel is configured
21 # They're part of the TUN standard attribute set
25 # These get initialized when the channel is connected to its peer
26 self.peer_proto = None
30 # These get initialized when the channel is connected to its iface
31 self.tun_socket = None
33 # same as peer proto, but for execute-time standard attribute lookups
39 self._terminate = [] # terminate signaller
40 self._connected = threading.Event()
41 self._forwarder_thread = None
43 # Generate an initial random cryptographic key to use for tunnelling
44 # Upon connection, both endpoints will agree on a common one based on
46 self.tun_key = ( ''.join(map(chr, [
49 for r in (random.SystemRandom(),) ])
50 ).encode("base64").strip() )
54 return "%s<ip:%s/%s %s%s>" % (
55 self.__class__.__name__,
56 self.address, self.netprefix,
57 " up" if self.up else " down",
58 " snat" if self.snat else "",
62 if not self.udp and self.listen and not self._forwarder_thread:
63 if self.listen or (self.peer_addr and self.peer_port and self.peer_proto):
67 if not self._forwarder_thread:
71 if self._forwarder_thread:
75 if self._forwarder_thread:
76 self._connected.wait()
79 if self._forwarder_thread:
80 if not self._terminate:
81 self._terminate.append(None)
82 self._forwarder_thread.join()
85 # Launch forwarder thread with a weak reference
86 # to self, so that we don't create any strong cycles
87 # and automatic refcounting works as expected
88 self._forwarder_thread = threading.Thread(
90 args = (weakref.ref(self),) )
91 self._forwarder_thread.start()
94 def _forwarder(weak_self):
97 # grab strong reference
102 peer_port = self.peer_port
103 peer_addr = self.peer_addr
104 peer_proto= self.peer_proto
106 local_port = self.tun_port
107 local_addr = self.tun_addr
108 local_proto = self.tun_proto
110 if local_proto != peer_proto:
111 raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
113 udp = local_proto == 'udp'
116 if (udp or not listen) and (not peer_port or not peer_addr):
117 raise RuntimeError, "Misconfigured peer for: %s" % (self,)
119 if (udp or listen) and (not local_port or not local_addr):
120 raise RuntimeError, "Misconfigured TUN: %s" % (self,)
122 TERMINATE = self._terminate
123 cipher_key = self.tun_key
124 tun = self.tun_socket
127 raise RuntimeError, "Unconnected TUN channel %s" % (self,)
131 if remaining_args and not remaining_args[0].startswith('-'):
132 rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
133 rsock.bind((local_addr,local_port))
134 rsock.connect((peer_addr,peer_port))
135 remote = os.fdopen(rsock.fileno(), 'r+b', 0)
137 # accept tcp connections
138 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
139 lsock.bind((local_addr,local_port))
141 rsock,raddr = lsock.accept()
142 remote = os.fdopen(rsock.fileno(), 'r+b', 0)
144 # connect to tcp server
145 rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
148 rsock.connect((peer_addr,peer_port))
151 # wait a while, retry
154 rsock.connect((peer_addr,peer_port))
155 remote = os.fdopen(rsock.fileno(), 'r+b', 0)
157 # notify that we're ready
158 self._connected.set()
160 # drop strong reference
163 tunchannel.tun_fwd(tun, remote,
166 cipher_key = cipher_key,
168 TERMINATE = TERMINATE,
169 stderr = open("/dev/null","w") # silence logging
176 class TestbedController(testbed_impl.TestbedController):
178 'ns3::Nepi::TunChannel' : TunChannel,
181 def __init__(self, testbed_version):
182 super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
184 self._home_directory = None
185 self._traces = dict()
186 self._simulator_thread = None
187 self._condition = None
190 self.TunChannel = TunChannel
193 def home_directory(self):
194 return self._home_directory
201 self._home_directory = self._attributes.\
202 get_attribute_value("homeDirectory")
203 self._ns3 = self._load_ns3_module()
206 home = os.path.normpath(self.home_directory)
207 if not os.path.exists(home):
208 os.makedirs(home, 0755)
210 super(TestbedController, self).do_setup()
213 super(TestbedController, self).start()
214 self._condition = threading.Condition()
215 self._simulator_thread = threading.Thread(target = self._simulator_run,
216 args = [self._condition])
217 self._simulator_thread.start()
219 def set(self, guid, name, value, time = TIME_NOW):
220 super(TestbedController, self).set(guid, name, value, time)
221 # TODO: take on account schedule time for the task
222 factory_id = self._create[guid]
223 factory = self._factories[factory_id]
224 if factory.box_attributes.is_attribute_design_only(name) or \
225 factory.box_attributes.is_attribute_invisible(name):
227 element = self._elements[guid]
228 if factory_id in self.LOCAL_FACTORIES:
229 setattr(element, name, value)
231 ns3_value = self._to_ns3_value(guid, name, value)
232 element.SetAttribute(name, ns3_value)
234 def get(self, guid, name, time = TIME_NOW):
235 value = super(TestbedController, self).get(guid, name, time)
236 # TODO: take on account schedule time for the task
237 factory_id = self._create[guid]
238 factory = self._factories[factory_id]
239 if factory.box_attributes.is_attribute_design_only(name) or \
240 factory.box_attributes.is_attribute_invisible(name):
242 if factory_id in self.LOCAL_FACTORIES:
243 return getattr(element, name)
244 TypeId = self.ns3.TypeId()
245 typeid = TypeId.LookupByName(factory_id)
246 info = TypeId.AttributeInfo()
247 if not typeid or not typeid.LookupAttributeByName(name, info):
248 raise AttributeError("Invalid attribute %s for element type %d" % \
250 checker = info.checker
251 ns3_value = checker.Create()
252 element = self._elements[guid]
253 element.GetAttribute(name, ns3_value)
254 value = ns3_value.SerializeToString(checker)
255 attr_type = factory.box_attributes.get_attribute_type(name)
256 if attr_type == Attribute.INTEGER:
258 if attr_type == Attribute.DOUBLE:
260 if attr_type == Attribute.BOOL:
261 return value == "true"
264 def action(self, time, guid, action):
265 raise NotImplementedError
267 def trace_filename(self, guid, trace_id):
268 # TODO: Need to be defined inside a home!!!! with and experiment id_code
269 filename = self._traces[guid][trace_id]
270 return os.path.join(self.home_directory, filename)
272 def follow_trace(self, guid, trace_id, filename):
273 if guid not in self._traces:
274 self._traces[guid] = dict()
275 self._traces[guid][trace_id] = filename
278 for element in self._elements.values():
281 def _simulator_run(self, condition):
283 self.ns3.Simulator.Run()
284 # Signal condition on simulation end to notify waiting threads
286 condition.notifyAll()
289 def _schedule_event(self, condition, func, *args):
290 """Schedules event on running experiment"""
291 def execute_event(condition, has_event_occurred, func, *args):
295 has_event_occurred[0] = True
296 # notify condition indicating attribute was set
298 condition.notifyAll()
301 # contextId is defined as general context
302 contextId = long(0xffffffff)
303 # delay 0 means that the event is expected to execute inmediately
304 delay = self.ns3.Seconds(0)
305 # flag to indicate that the event occured
306 # because bool is an inmutable object in python, in order to create a
307 # bool flag, a list is used as wrapper
308 has_event_occurred = [False]
310 if not self.ns3.Simulator.IsFinished():
311 self.ns3.Simulator.ScheduleWithContext(contextId, delay, execute_event,
312 condition, has_event_occurred, func, *args)
313 while not has_event_occurred[0] and not self.ns3.Simulator.IsFinished():
316 if not has_event_occurred[0]:
317 raise RuntimeError('Event could not be scheduled : %s %s ' \
318 % (repr(func), repr(args)))
320 def _to_ns3_value(self, guid, name, value):
321 factory_id = self._create[guid]
322 TypeId = self.ns3.TypeId()
323 typeid = TypeId.LookupByName(factory_id)
324 info = TypeId.AttributeInfo()
325 if not typeid.LookupAttributeByName(name, info):
326 raise RuntimeError("Attribute %s doesn't belong to element %s" \
327 % (name, factory_id))
328 str_value = str(value)
329 if isinstance(value, bool):
330 str_value = str_value.lower()
331 checker = info.checker
332 ns3_value = checker.Create()
333 ns3_value.DeserializeFromString(str_value, checker)
336 def _load_ns3_module(self):
340 simu_impl_type = self._attributes.get_attribute_value(
341 "SimulatorImplementationType")
342 checksum = self._attributes.get_attribute_value("ChecksumEnabled")
344 bindings = os.environ["NEPI_NS3BINDINGS"] \
345 if "NEPI_NS3BINDINGS" in os.environ else None
346 libfile = os.environ["NEPI_NS3LIBRARY"] \
347 if "NEPI_NS3LIBRARY" in os.environ else None
350 ctypes.CDLL(libfile, ctypes.RTLD_GLOBAL)
352 path = [ os.path.dirname(__file__) ] + sys.path
354 path = [ bindings ] + path
357 module = imp.find_module ('ns3', path)
358 mod = imp.load_module ('ns3', *module)
360 # In some environments, ns3 per-se does not exist,
361 # only the low-level _ns3
362 module = imp.find_module ('_ns3', path)
363 mod = imp.load_module ('_ns3', *module)
364 sys.modules["ns3"] = mod # install it as ns3 too
366 # When using _ns3, we have to make sure we destroy
367 # the simulator when the process finishes
369 atexit.register(mod.Simulator.Destroy)
372 value = mod.StringValue(simu_impl_type)
373 mod.GlobalValue.Bind ("SimulatorImplementationType", value)
375 value = mod.BooleanValue(checksum)
376 mod.GlobalValue.Bind ("ChecksumEnabled", value)
379 def _get_construct_parameters(self, guid):
380 params = self._get_parameters(guid)
381 construct_params = dict()
382 factory_id = self._create[guid]
383 TypeId = self.ns3.TypeId()
384 typeid = TypeId.LookupByName(factory_id)
385 for name, value in params.iteritems():
386 info = self.ns3.TypeId.AttributeInfo()
387 found = typeid.LookupAttributeByName(name, info)
389 (info.flags & TypeId.ATTR_CONSTRUCT == TypeId.ATTR_CONSTRUCT):
390 construct_params[name] = value
391 return construct_params