2 # -*- coding: utf-8 -*-
5 from nepi.core.design import ExperimentDescription, FactoriesProvider
6 from nepi.core.execute import ExperimentController
7 from nepi.util import proxy
8 from nepi.util.constants import DeploymentConfiguration as DC, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP
9 from nepi.testbeds.planetlab import util as plutil
10 from optparse import OptionParser
# Experiment driver for the "planetlab" testbed. The class attributes below
# hold the slice, PLC host, SSH key and credentials used by the methods that
# build the experiment description.
23 class PlanetLabMulticastOverlay:
24 testbed_id = "planetlab"
25 slicename = "inria_nepi"
26 plchost = "www.planet-lab.eu"
# NOTE(review): as shown, the id_rsa_planetlab path is the only argument
# passed to os.environ.get(), i.e. it is used as the variable name to look up.
# It reads like a fallback value, so the line naming the environment variable
# may be missing from this view -- confirm.
27 plkey = os.environ.get(
29 "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
# PlanetLab credentials are read from the PL_USER / PL_PASS environment
# variables (None when unset).
30 pluser = os.environ.get("PL_USER")
31 plpass = os.environ.get("PL_PASS")
# Base port derived from the process id: 2000 + (pid % 1000) * 13.
34 port_base = 2000 + (os.getpid() % 1000) * 13
# Creates a fresh temporary directory for this instance and advances the
# class-wide port_base by 100. (The enclosing "def" line is not visible here.)
37 self.root_dir = tempfile.mkdtemp()
38 self.__class__.port_base = self.__class__.port_base + 100
# Removes the temporary directory. Two rmtree calls are visible; the control
# flow around them is not shown in this view.
42 shutil.rmtree(self.root_dir)
46 shutil.rmtree(self.root_dir)
def make_experiment_desc(self):
    """Create the experiment description and its two testbed descriptions.

    A "planetlab" testbed is configured from the class-level slice name,
    PLC host, SSH key and credentials. A "netns" testbed is configured to
    run as a daemon with sudo, using a ``netns`` sub-directory of
    ``self.root_dir`` as its root directory (the sub-directory is created
    here with ``os.mkdir``).

    Returns:
        A ``(pl_desc, netns_desc, exp_desc)`` tuple.
    """
    exp_desc = ExperimentDescription()

    # PlanetLab testbed.
    pl_provider = FactoriesProvider(self.testbed_id)
    pl_desc = exp_desc.add_testbed_description(pl_provider)
    pl_desc.set_attribute_value("homeDirectory", self.root_dir)
    pl_desc.set_attribute_value("slice", self.slicename)
    pl_desc.set_attribute_value("sliceSSHKey", self.plkey)
    # The credentials are the class-level values read from PL_USER/PL_PASS;
    # the names previously used here were never bound in this method.
    pl_desc.set_attribute_value("authUser", self.pluser)
    pl_desc.set_attribute_value("authPass", self.plpass)
    pl_desc.set_attribute_value("plcHost", self.plchost)
    pl_desc.set_attribute_value("tapPortBase", self.port_base)
    pl_desc.set_attribute_value("p2pDeployment", True)
    pl_desc.set_attribute_value("dedicatedSlice", True)
    pl_desc.set_attribute_value("plLogLevel", "INFO")

    # netns testbed, run as a daemon rooted in its own sub-directory.
    netns_provider = FactoriesProvider("netns")
    netns_desc = exp_desc.add_testbed_description(netns_provider)
    netns_desc.set_attribute_value("homeDirectory", self.root_dir)
    netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
    netns_root_dir = os.path.join(self.root_dir, "netns")
    os.mkdir(netns_root_dir)
    netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, netns_root_dir)
    netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
    netns_desc.set_attribute_value(DC.USE_SUDO, True)

    return pl_desc, netns_desc, exp_desc
# Builds a PlanetLab "Node" with a "NodeInterface" attached to an "Internet"
# element, plus one "TapInterface" per address being iterated, optional
# routes and multicast forwarder/router elements.
# Returns the tuple (node1, iface1, tap1, tap1ip, inet).
# NOTE(review): several lines of this method are not visible in this view
# (the body of the isinstance check, where `ips`, `tap1` and `tap1ip` are
# initialised, and the conditions guarding the label/hostname/routes/multicast
# sections); the comments below only describe what is shown.
83 def make_pl_tapnode(self, pl, ip, inet = None, label = None, hostname = None, routes = None, mcast = False, mcastrouter = False):
# `ip` may be something other than a list; the handling of that case is in
# lines not shown here.
84 if not isinstance(ip, list):
88 node1 = pl.create("Node")
90 node1.set_attribute_value("label", label)
92 node1.set_attribute_value("hostname", hostname)
93 iface1 = pl.create("NodeInterface")
95 iface1.set_attribute_value("label", label+"iface")
# One multicast-enabled tap interface per address, each with pcap tracing.
98 for i,ip in enumerate(ips):
99 _tap1 = pl.create("TapInterface")
100 _tap1.set_attribute_value("multicast", True)
101 _tap1.enable_trace("pcap") # for error output
# The first tap is labelled "<label>tap"; later ones get a 1-based index
# suffix ("<label>tap2", "<label>tap3", ...).
103 _tap1.set_attribute_value("label", label+"tap"+(str(i+1) if i else ""))
# Each tap gets a single /32 address with broadcast disabled.
105 _tap1ip = _tap1.add_address()
106 _tap1ip.set_attribute_value("Address", ip)
107 _tap1ip.set_attribute_value("NetPrefix", 32)
108 _tap1ip.set_attribute_value("Broadcast", False)
110 node1.connector("devs").connect(_tap1.connector("node"))
113 tap1ip.append(_tap1ip)
# Reuse the caller's Internet element when given, otherwise create one.
115 inet = inet or pl.create("Internet")
116 node1.connector("devs").connect(iface1.connector("node"))
117 iface1.connector("inet").connect(inet.connector("devs"))
# Each route is a (destination, prefix length, next hop) triple.
119 for destip, destprefix, nexthop in routes:
120 r1 = node1.add_route()
121 r1.set_attribute_value("Destination", destip)
122 r1.set_attribute_value("NetPrefix", destprefix)
123 r1.set_attribute_value("NextHop", nexthop)
# Multicast forwarder attached to the node's apps, with stderr tracing.
# Presumably guarded by the `mcast` parameter -- condition not visible.
126 fwd = pl.create("MulticastForwarder")
127 fwd.enable_trace("stderr")
128 fwd.connector("node").connect(node1.connector("apps"))
# Multicast router connected to the forwarder, with stderr tracing.
# Presumably guarded by the `mcastrouter` parameter -- condition not visible.
130 mrt = pl.create("MulticastRouter")
131 mrt.connector("fwd").connect(fwd.connector("router"))
132 mrt.enable_trace("stderr")
134 return node1, iface1, tap1, tap1ip, inet
def add_vlc_base(self, pl, node):
    """Create a VLC-capable "Application" on *node* and return it.

    The application depends on the ``vlc`` package (with rpmFusion
    enabled), runs ``vlc --version`` by default and traces both stdout and
    stderr. Callers override the label and command on the returned
    element.

    Args:
        pl: testbed description used to create the application.
        node: node element the application is attached to.

    Returns:
        The created application element.
    """
    app = pl.create("Application")
    app.set_attribute_value("rpmFusion", True)
    app.set_attribute_value("depends", "vlc")
    app.set_attribute_value("command", "vlc --version")
    app.enable_trace("stdout")
    app.enable_trace("stderr")
    node.connector("apps").connect(app.connector("node"))
    # The restreamer/dumper builders configure the element they get back
    # from this method, so it has to be returned.
    return app
# Adds a VLC application (built by add_vlc_base) to `node`, labelled
# "vlc_restreamer_<node guid>", that reads the multicast stream at
# udp://@239.255.12.42 and re-serves it over RTP (port 6060) with an RTSP SDP
# URL on the node's own hostname, port 8080.
# NOTE(review): the first fragment of the command string is not visible in
# this view; as shown the command starts with " udp://@239.255.12.42".
146 def add_vlc_restreamer(self, pl, node):
147 hostname = node.get_attribute_value("hostname")
148 app = self.add_vlc_base(pl, node)
149 app.set_attribute_value("label","vlc_restreamer_%d" % (node.guid,))
150 app.set_attribute_value("command",
152 " udp://@239.255.12.42"
153 " --sout '#rtp{port=6060,sdp=rtsp://"+hostname+":8080/test.sdp}'")
# Adds a VLC application (built by add_vlc_base) to `node`, labelled
# "vlc_dumper_<node guid>", that reads the multicast stream at
# udp://@239.255.12.42 and has its "output" trace enabled.
# NOTE(review): parts of the command string (before and after the udp:// URL)
# are not visible in this view.
156 def add_vlc_dumper(self, pl, node):
157 app = self.add_vlc_base(pl, node)
158 app.set_attribute_value("label","vlc_dumper_%d" % (node.guid,))
159 app.set_attribute_value("command",
161 " udp://@239.255.12.42"
163 app.enable_trace("output")
# Adds an application to `node`, labelled "vlc_source_<node guid>" and run as
# the current login user, whose command streams self.movie_source (by base
# name) to the multicast group 239.255.12.42 with TTL 64, bound to the address
# of the interface labelled `iflabel`.
# NOTE(review): `netns_desc` is neither a parameter nor a local of this
# method -- the parameter is named `netns`; confirm which was intended.
# NOTE(review): the leading fragment of the command string is not visible in
# this view, and "--miface-addr" is concatenated directly after the file name
# with no separating space shown.
166 def add_vlc_source(self, netns, node, iflabel):
167 app = netns_desc.create("Application")
168 app.set_attribute_value("user", os.getlogin())
169 app.set_attribute_value("label","vlc_source_%d" % (node.guid,))
170 app.set_attribute_value("command",
172 +os.path.basename(self.movie_source)
173 +"--miface-addr {#[%s].addr[0].[Address]#} " % (iflabel,)
174 +"--sout '#udp{dst=239.255.12.42,ttl=64}'")
175 app.connector("node").connect(node.connector("apps"))
def add_net_monitor(self, pl, node):
    """Attach a network-counter logging application to *node*.

    The application is labelled ``network_monitor_<node guid>``. Its shell
    command prints the two header lines of ``/proc/net/dev`` and then,
    once per second, the ``eth0`` line prefixed with the current date.
    Standard output is traced.
    """
    monitor = pl.create("Application")
    settings = (
        ("label", "network_monitor_%d" % (node.guid,)),
        ("command",
         r"""head -n 2 /proc/net/dev ; while true ; do cat /proc/net/dev | sed -r 's/.*/'"$(date -R)"': \0/' | grep eth0 ; sleep 1 ; done"""),
    )
    for attribute, value in settings:
        monitor.set_attribute_value(attribute, value)
    monitor.enable_trace("stdout")

    # Hook the application into the node's application connector.
    node_apps = node.connector("apps")
    node_apps.connect(monitor.connector("node"))
# Declares the NEPI and NS3 dependencies on PlanetLab node `node1` and adds an
# "ns3" testbed description to experiment `exp`, configured to be deployed as
# a daemon over SSH on the address of interface `iface1`, using the PlanetLab
# slice name as user and the slice SSH key as key.
# NOTE(review): one line of the DEPLOYMENT_ENVIRONMENT_SETUP call and whatever
# follows the last visible statement (e.g. a return) are not shown in this
# view.
187 def make_ns_in_pl(self, pl, exp, node1, iface1, root):
188 ns3_testbed_id = "ns3"
190 # Add NS3 support in node1
191 plnepi = pl.create("NepiDependency")
192 plns3 = pl.create("NS3Dependency")
193 plnepi.connector("node").connect(node1.connector("deps"))
194 plns3.connector("node").connect(node1.connector("deps"))
196 # Create NS3 testbed running in node1
197 ns3_provider = FactoriesProvider(ns3_testbed_id)
198 ns3_desc = exp.add_testbed_description(ns3_provider)
199 ns3_desc.set_attribute_value("rootDirectory", root)
# Real-time simulator implementation with checksums enabled.
200 ns3_desc.set_attribute_value("SimulatorImplementationType", "ns3::RealtimeSimulatorImpl")
201 ns3_desc.set_attribute_value("ChecksumEnabled", True)
# The deployment host is given as a "{#[<label>].addr[0].[Address]#}"
# placeholder built from iface1's label rather than as a literal address.
202 ns3_desc.set_attribute_value(DC.DEPLOYMENT_HOST, "{#[%s].addr[0].[Address]#}" % (
203 iface1.get_attribute_value("label"),))
204 ns3_desc.set_attribute_value(DC.DEPLOYMENT_USER,
205 pl.get_attribute_value("slice"))
206 ns3_desc.set_attribute_value(DC.DEPLOYMENT_KEY,
207 pl.get_attribute_value("sliceSSHKey"))
208 ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
209 ns3_desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
# NOTE(review): the line between the next two (presumably a format string
# taking node1's label and ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP) is not
# visible; as shown the parentheses do not balance.
210 ns3_desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
212 node1.get_attribute_value("label"),
213 ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,))
214 ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
def make_netns_node(self, netns_desc):
    """Create a netns "Node" with X11 forwarding enabled and return it.

    Args:
        netns_desc: netns testbed description used to create the node.

    Returns:
        The created node element.
    """
    node = netns_desc.create("Node")
    node.set_attribute_value("forward_X11", True)
    # make_pl_overlay wires interfaces and applications onto the node it
    # gets back from this method, so the element has to be returned.
    return node
def make_pl_netns_connection(self, pl_desc, pl_node, netns_desc,
        netns_node, netns_iface_label, vnet):
    """Connect a PlanetLab node and a netns node through a TUN tunnel.

    Two addresses are derived from *vnet* by OR-ing its 32-bit value with
    1 (netns end) and 2 (PlanetLab end). A multicast "TunInterface" is
    added to *pl_node*, a "TunNodeInterface" (label *netns_iface_label*,
    up, MTU 1448) is added to *netns_node*, and the netns node gets a
    route to ``vnet/24`` via the PlanetLab end. The two interfaces are
    joined through a netns "TunChannel" over its "tcp" connector.

    Args:
        pl_desc: PlanetLab testbed description.
        pl_node: PlanetLab node element that receives the TUN interface.
        netns_desc: netns testbed description.
        netns_node: netns node element that receives the TUN interface.
        netns_iface_label: label given to the netns TUN interface.
        vnet: dotted-quad base address of the virtual network.
    """
    def add_host_address(iface, address):
        # Both tunnel ends get a single /32 address with broadcast off.
        addr = iface.add_address()
        addr.set_attribute_value("Address", address)
        addr.set_attribute_value("NetPrefix", 32)
        addr.set_attribute_value("Broadcast", False)
        return addr

    base = struct.unpack('!L', socket.inet_aton(vnet))[0]
    netns_addr = socket.inet_ntoa(struct.pack('!L', (base | 1)))
    pl_addr = socket.inet_ntoa(struct.pack('!L', (base | 2)))

    # PlanetLab end of the tunnel.
    pl_tap = pl_desc.create("TunInterface")
    pl_tap.set_attribute_value("multicast", True)
    #pl_tap.set_attribute_value("tun_cipher", "PLAIN")
    #pl_tap.enable_trace("pcap")
    #pl_tap.enable_trace("packets")
    add_host_address(pl_tap, pl_addr)
    pl_node.connector("devs").connect(pl_tap.connector("node"))

    # netns end of the tunnel.
    netns_tap = netns_desc.create("TunNodeInterface")
    netns_tap.set_attribute_value("label", netns_iface_label)
    netns_tap.set_attribute_value("up", True)
    netns_tap.set_attribute_value("mtu", 1448)
    add_host_address(netns_tap, netns_addr)

    # Send the whole virtual network through the PlanetLab end.
    route = netns_node.add_route()
    route.set_attribute_value("Destination", vnet)
    route.set_attribute_value("NetPrefix", 24)
    route.set_attribute_value("NextHop", pl_addr)
    netns_node.connector("devs").connect(netns_tap.connector("node"))

    # The channel bridges the netns interface's fd to the PlanetLab
    # interface's tcp endpoint.
    netns_tunchannel = netns_desc.create("TunChannel")
    #netns_tunchannel.set_attribute_value("tun_cipher", "PLAIN")
    netns_tunchannel.connector("->fd").connect(netns_tap.connector("fd->"))
    pl_tap.connector("tcp").connect(netns_tunchannel.connector("tcp"))
# Builds and runs the overlay experiment: picks PlanetLab nodes, arranges them
# in a spanning tree, assigns virtual addresses and routes, creates the
# PlanetLab elements for every tree node, attaches a netns source node to the
# root, runs the experiment and finally downloads the traces into ./results.
# NOTE(review): many lines of this method are not visible in this view; the
# comments below describe only what is shown. `num_wifi` and `ns3_testbed_id`
# are not used in any visible line.
259 def make_pl_overlay(self, numnodes, num_wifi):
260 ns3_testbed_id = "ns3"
262 pl, netns, exp = self.make_experiment_desc()
263 # We'll make a distribution spanning tree using prefix matching as a distance
# Ask for `numnodes` nodes running 'f12'; the tree root is the node whose
# hostname sorts first.
264 api = plutil.getAPI(self.pluser, self.plpass)
265 nodes = plutil.getNodes(api, numnodes, operatingSystem = 'f12')
266 root = min(nodes, key=operator.attrgetter('hostname'))
267 links = list(plutil.getSpanningTree(nodes, root=root))
# Per-node bookkeeping (the surrounding loop is not visible here).
272 node.childips = set()
274 # Build an explicit tree
275 for slave, master in links:
276 master.children.append(slave)
278 # We have to assign IPs and routes.
279 # The IP will be assigned sequentially, depth-first.
280 # This will result in rather compact routing rules
# The counter is kept in a one-element list and read as nextip[0] inside the
# nested function below.
281 nextip = [128-numnodes]
# traverse receives itself as its first argument; `base` defaults to the
# 32-bit integer value of self.vnet.
282 def traverse(traverse, node, parent=None, base=struct.unpack('!L',socket.inet_aton(self.vnet))[0]):
# The condition guarding this error is not visible in this view.
284 raise RuntimeError, "Too many IPs to assign!"
286 node.vif_addr = base | (nextip[0])
287 nips = 1+len(node.children) # one vif per child, plus one for the parent
290 for i in xrange(nips):
291 node.vif_ips.add(node.vif_addr+i)
# The node's own addresses, and after the recursion those of its descendants,
# are merged into the parent's childips.
294 parent.childips.update(node.vif_ips)
296 for i,child in enumerate(node.children):
297 traverse(traverse, child, node, base)
300 parent.childips.update(node.childips)
302 traverse(traverse, root)
# Prints the tree: one line per node with its address and location, plus one
# "R <network> -> <child address>" line per collapsed child network.
304 def printtree(printtree, node, indent=''):
305 print indent, '-', socket.inet_ntoa(struct.pack('!L',node.vif_addr)), node.country, node.city, node.site
306 for child in node.children:
307 childips = map(ipaddr.IPAddress, child.childips)
308 childnets = ipaddr.collapse_address_list(childips)
309 cip = ipaddr.IPAddress(child.vif_addr)
310 for cnet in childnets:
311 print indent, '|- R', cnet, '->', cip
312 printtree(printtree, child, indent+' | ')
313 printtree(printtree, root)
# A single Internet element is shared by all PlanetLab nodes.
315 inet = pl.create("Internet")
# Creates the PlanetLab elements for `node` after recursing into its children.
# `routes` and `ctaps` are initialised in lines not visible here.
317 def maketree(maketree, node, parent=None, parentIp=None):
# For each child: route its collapsed networks via the child's address, and
# recurse passing node.vif_addr+1+i as the child's parentIp.
320 for i,child in enumerate(node.children):
321 childips = map(ipaddr.IPAddress, child.childips)
322 childnets = ipaddr.collapse_address_list(childips)
323 cip = ipaddr.IPAddress(child.vif_addr)
324 pip = ipaddr.IPAddress(node.vif_addr+1+i)
325 for cnet in childnets:
326 routes.append((cnet.ip.exploded, cnet.prefixlen, cip.exploded))
327 ctaps.append( maketree(maketree, child, node, pip) )
# Route for the whole vnet/24 via the parent (its guard is not visible).
329 routes.append((self.vnet,24,parentIp))
# One address per child plus one, starting at node.vif_addr.
335 ips = [ ipaddr.IPAddress(node.vif_addr+i) for i in xrange(1+len(node.children)) ]
336 node1, iface1, tap1, tap1ip, _ = self.make_pl_tapnode(pl, ips, inet,
337 hostname = node.hostname,
339 mcastrouter = bool(node.children),
# Taps after the first are paired, in order, with the children's taps and
# connected through their "udp" connectors.
343 for tap, ctap in zip(tap1[1:], ctaps):
344 tap.connector("udp").connect(ctap.connector("udp"))
346 self.add_net_monitor(pl, node1)
347 self.add_vlc_restreamer(pl, node1)
# Roughly one in ten non-root nodes also gets a dumper application.
348 if random.random() < 0.1 and parent:
349 self.add_vlc_dumper(pl, node1)
352 maketree(maketree, root)
354 # create a netns node and connect it to the root pl node
# NOTE(review): `exp_desc`, `pl_desc`, `netns_desc` and `netns_n` are not
# bound in any visible line of this method; the locals unpacked at the top are
# `exp`, `pl` and `netns`, and the netns node is `netns_source` -- confirm.
355 pl_root = exp_desc.get_element_by_label("root")
356 netns_source = self.make_netns_node(netns)
357 iflabel = "source-iface"
358 self.make_pl_netns_connection(pl_desc, pl_root, netns_desc,
359 netns_source, iflabel, self.vnet)
360 self.add_vlc_source(netns, netns_n, iflabel)
# Directory the downloaded traces are written under.
363 test_dir = "./results"
# NOTE(review): `xml` is assigned in a line not visible here.
366 controller = ExperimentController(xml, self.root_dir)
369 print >>sys.stderr, "Press CTRL-C to shut down"
373 except KeyboardInterrupt:
# Walk every trace reported by the controller (testbed -> element guid ->
# trace name) and save each one under test_dir at the trace's "filepath".
377 for testbed_guid, guids in controller.traces_info().iteritems():
378 for guid, traces in guids.iteritems():
379 for name, data in traces.iteritems():
380 path = data["filepath"]
385 print >>sys.stderr, "Downloading trace", path
387 filepath = os.path.join(test_dir, path)
390 trace = controller.trace(guid, name)
392 traceback.print_exc(file=sys.stderr)
395 if not os.path.exists(os.path.dirname(filepath)):
396 os.makedirs(os.path.dirname(filepath))
398 traceback.print_exc(file=sys.stderr)
# Traces of 2**20 bytes or more are written gzip-compressed with a ".gz"
# suffix; smaller ones are written as-is.
401 if len(trace) >= 2**20:
402 # Bigger than 1M, compress
403 tracefile = gzip.GzipFile(filepath+".gz", "wb")
405 tracefile = open(filepath,"wb")
407 tracefile.write(trace)
411 traceback.print_exc(file=sys.stderr)
417 traceback.print_exc()
419 controller.shutdown()
422 traceback.print_exc()
if __name__ == '__main__':
    # Command-line entry point: parse the options, validate them and run
    # the overlay experiment.
    usage = "usage: %prog -n number_sta -m movie -u user"
    parser = OptionParser(usage=usage)
    parser.add_option("-u", "--user", dest="user", help="Valid linux system user (not root).", type="str")
    parser.add_option("-m", "--movie", dest="movie", help="Path to movie file to play", type="str")
    parser.add_option("-n", "--nsta", dest="nsta", help="Number of wifi stations", type="int")
    # -N needs its own destination: sharing "nsta" with -n made "-N 50"
    # overwrite the station count and trip the station limit check below.
    parser.add_option("-N", "--nodes", dest="nodes", help="Number of overlay nodes", type="int")
    parser.add_option("-a", "--base_addr", dest="base_addr", help="Base address segment for the experiment", type="str")
    parser.add_option("-s", "--slicename", dest="slicename", help="PlanetLab slice", type="str")
    (options, args) = parser.parse_args()
    if not options.movie:
        parser.error("Missing 'movie' option.")
    if options.user == 'root':
        parser.error("Missing or invalid 'user' option.")
    if options.nsta and options.nsta > 8:
        parser.error("Try a number of stations under 9.")

    exp = PlanetLabMulticastOverlay()
    exp.movie_source = options.movie
    # Fall back to the previously hard-coded sizes (50 overlay nodes,
    # 8 stations) when the corresponding option is not given.
    exp.make_pl_overlay(options.nodes or 50, options.nsta or 8)