7372f92a75ea3dad73b742b1e043c07228e93a69
[nepi.git] / examples / multicast_overlay.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import getpass
5 from nepi.core.design import ExperimentDescription, FactoriesProvider
6 from nepi.core.execute import ExperimentController
7 from nepi.util import proxy
8 from nepi.util.constants import DeploymentConfiguration as DC, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP
9 from nepi.testbeds.planetlab import util as plutil
10 from optparse import OptionParser
11 import os
12 import sys
13 import shutil
14 import tempfile
15 import time
16 import struct
17 import socket
18 import operator
19 import ipaddr
20 import gzip
21 import random
22
23 class PlanetLabMulticastOverlay:
24     testbed_id = "planetlab"
25     slicename = "inria_nepi"
26     plchost = "www.planet-lab.eu"
27     plkey = os.environ.get(
28             "PL_SSH_KEY",
29             "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
30     pluser = os.environ.get("PL_USER")
31     plpass = os.environ.get("PL_PASS")
32     vnet = "192.168.3.0"
33     
34     port_base = 2000 + (os.getpid() % 1000) * 13
35     
36     def setUp(self):
37         self.root_dir = tempfile.mkdtemp()
38         self.__class__.port_base = self.__class__.port_base + 100
39
40     def tearDown(self):
41         try:
42             shutil.rmtree(self.root_dir)
43         except:
44             # retry
45             time.sleep(0.1)
46             shutil.rmtree(self.root_dir)
47
48     def make_experiment_desc(self):
49         testbed_id = self.testbed_id
50         slicename = self.slicename
51         plchost = self.plchost
52         pl_ssh_key = self.plkey
53         pl_user = self.pluser
54         pl_pwd = self.plpass
55
56         exp_desc = ExperimentDescription()
57         pl_provider = FactoriesProvider(testbed_id)
58         pl_desc = exp_desc.add_testbed_description(pl_provider)
59         pl_desc.set_attribute_value("homeDirectory", self.root_dir)
60         pl_desc.set_attribute_value("slice", slicename)
61         pl_desc.set_attribute_value("sliceSSHKey", pl_ssh_key)
62         pl_desc.set_attribute_value("authUser", pl_user)
63         pl_desc.set_attribute_value("authPass", pl_pwd)
64         pl_desc.set_attribute_value("plcHost", plchost)
65         pl_desc.set_attribute_value("tapPortBase", self.port_base)
66         pl_desc.set_attribute_value("p2pDeployment", True)
67         pl_desc.set_attribute_value("dedicatedSlice", True)
68         pl_desc.set_attribute_value("plLogLevel", "INFO")
69    
70         netns_provider = FactoriesProvider("netns")
71         netns_desc = exp_desc.add_testbed_description(netns_provider)
72         netns_desc.set_attribute_value("homeDirectory", self.root_dir)
73         netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
74         netns_root_dir = os.path.join(self.root_dir, "netns")
75         os.mkdir(netns_root_dir)
76         netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, netns_root_dir)
77         netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
78         netns_desc.set_attribute_value(DC.USE_SUDO, True)
79
80         return pl_desc, netns_desc, exp_desc
81     
82
83     def make_pl_tapnode(self, pl, ip, inet = None, label = None, hostname = None, routes = None, mcast = False, mcastrouter = False):
84         if not isinstance(ip, list):
85             ips = [ip]
86         else:
87             ips = ip
88         node1 = pl.create("Node")
89         if label: 
90             node1.set_attribute_value("label", label)
91         if hostname: 
92             node1.set_attribute_value("hostname", hostname)
93         iface1 = pl.create("NodeInterface")
94         if label:
95             iface1.set_attribute_value("label", label+"iface")
96         tap1 = []
97         tap1ip = []
98         for i,ip in enumerate(ips):
99             _tap1 = pl.create("TapInterface")
100             _tap1.set_attribute_value("multicast", True)
101             _tap1.enable_trace("pcap") # for error output
102             if label:
103                 _tap1.set_attribute_value("label", label+"tap"+(str(i+1) if i else ""))
104         
105             _tap1ip = _tap1.add_address()
106             _tap1ip.set_attribute_value("Address", ip)
107             _tap1ip.set_attribute_value("NetPrefix", 32)
108             _tap1ip.set_attribute_value("Broadcast", False)
109         
110             node1.connector("devs").connect(_tap1.connector("node"))
111             
112             tap1.append(_tap1)
113             tap1ip.append(_tap1ip)
114             
115         inet = inet or pl.create("Internet")
116         node1.connector("devs").connect(iface1.connector("node"))
117         iface1.connector("inet").connect(inet.connector("devs"))
118         
119         for destip, destprefix, nexthop in routes:
120             r1 = node1.add_route()
121             r1.set_attribute_value("Destination", destip)
122             r1.set_attribute_value("NetPrefix", destprefix)
123             r1.set_attribute_value("NextHop", nexthop)
124         
125         if mcast:
126             fwd = pl.create("MulticastForwarder")
127             fwd.enable_trace("stderr")
128             fwd.connector("node").connect(node1.connector("apps"))
129             if mcastrouter:
130                 mrt = pl.create("MulticastRouter")
131                 mrt.connector("fwd").connect(fwd.connector("router"))
132                 mrt.enable_trace("stderr")
133                 
134         return node1, iface1, tap1, tap1ip, inet
135     
136     def add_vlc_base(self, pl, node):
137         app = pl.create("Application")
138         app.set_attribute_value("rpmFusion", True)
139         app.set_attribute_value("depends", "vlc")
140         app.set_attribute_value("command", "vlc --version")
141         app.enable_trace("stdout")
142         app.enable_trace("stderr")
143         node.connector("apps").connect(app.connector("node"))
144         return app
145     
146     def add_vlc_restreamer(self, pl, node):
147         hostname = node.get_attribute_value("hostname")
148         app = self.add_vlc_base(pl, node)
149         app.set_attribute_value("label","vlc_restreamer_%d" % (node.guid,))
150         app.set_attribute_value("command",
151             "vlc -vvv -I dummy"
152             " udp://@239.255.12.42"
153             " --sout '#rtp{port=6060,sdp=rtsp://"+hostname+":8080/test.sdp}'")
154         return app
155     
156     def add_vlc_dumper(self, pl, node):
157         app = self.add_vlc_base(pl, node)
158         app.set_attribute_value("label","vlc_dumper_%d" % (node.guid,))
159         app.set_attribute_value("command",
160             "vlc -vvv -I dummy"
161             " udp://@239.255.12.42"
162             " --sout output")
163         app.enable_trace("output")
164         return app
165     
166     def add_vlc_source(self, netns, node, iflabel):
167         app = netns_desc.create("Application")
168         app.set_attribute_value("user", os.getlogin())
169         app.set_attribute_value("label","vlc_source_%d" % (node.guid,))
170         app.set_attribute_value("command",
171             "vlc -vvv -I dummy "
172             +os.path.basename(self.movie_source)
173             +"--miface-addr {#[%s].addr[0].[Address]#} " % (iflabel,)
174             +"--sout '#udp{dst=239.255.12.42,ttl=64}'")
175         app.connector("node").connect(node.connector("apps"))
176         return app
177     
178     def add_net_monitor(self, pl, node):
179         app = pl.create("Application")
180         app.set_attribute_value("label","network_monitor_%d" % (node.guid,))
181         app.set_attribute_value("command", 
182             r"""head -n 2 /proc/net/dev ; while true ; do cat /proc/net/dev | sed -r 's/.*/'"$(date -R)"': \0/' | grep eth0 ; sleep 1 ; done""")
183         app.enable_trace("stdout")
184         node.connector("apps").connect(app.connector("node"))
185         return app
186     
187     def make_ns_in_pl(self, pl, exp, node1, iface1, root):
188         ns3_testbed_id = "ns3"
189         
190         # Add NS3 support in node1
191         plnepi = pl.create("NepiDependency")
192         plns3 = pl.create("NS3Dependency")
193         plnepi.connector("node").connect(node1.connector("deps"))
194         plns3.connector("node").connect(node1.connector("deps"))
195
196         # Create NS3 testbed running in node1
197         ns3_provider = FactoriesProvider(ns3_testbed_id)
198         ns3_desc = exp.add_testbed_description(ns3_provider)
199         ns3_desc.set_attribute_value("rootDirectory", root)
200         ns3_desc.set_attribute_value("SimulatorImplementationType", "ns3::RealtimeSimulatorImpl")
201         ns3_desc.set_attribute_value("ChecksumEnabled", True)
202         ns3_desc.set_attribute_value(DC.DEPLOYMENT_HOST, "{#[%s].addr[0].[Address]#}" % (
203             iface1.get_attribute_value("label"),))
204         ns3_desc.set_attribute_value(DC.DEPLOYMENT_USER, 
205             pl.get_attribute_value("slice"))
206         ns3_desc.set_attribute_value(DC.DEPLOYMENT_KEY, 
207             pl.get_attribute_value("sliceSSHKey"))
208         ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
209         ns3_desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
210         ns3_desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
211             "{#[%s].[%s]#}" % (
212                 node1.get_attribute_value("label"),
213                 ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,))
214         ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
215         
216         return ns3_desc
217     
218     def make_netns_node(self, netns_desc):
219         node = netns_desc.create("Node")
220         node.set_attribute_value("forward_X11", True)
221         return node
222
223     def make_pl_netns_connection(self, pl_desc, pl_node, netns_desc,
224             netns_node, netns_iface_label, vnet):
225         base = struct.unpack('!L',socket.inet_aton(vnet))[0]
226         netns_addr = socket.inet_ntoa(struct.pack('!L',(base | 1)))
227         pl_addr = socket.inet_ntoa(struct.pack('!L',(base | 2)))
228
229         pl_tap = pl_desc.create("TunInterface")
230         pl_tap.set_attribute_value("multicast", True) 
231         #pl_tap.set_attribute_value("tun_cipher", "PLAIN") 
232         #pl_tap.enable_trace("pcap")
233         #pl_tap.enable_trace("packets")
234         addr = pl_tap.add_address()
235         adrr.set_attribute_value("Address", pl_addr)
236         addr.set_attribute_value("NetPrefix", 32)
237         addr.set_attribute_value("Broadcast", False)
238         pl_node.connector("devs").connect(pl_tap.connector("node"))
239         
240         netns_tap = netns_desc.create("TunNodeInterface")
241         netns_tap.set_attribute_value("label", netns_iface_label)
242         netns_tap.set_attribute_value("up", True)
243         netns_tap.set_attribute_value("mtu", 1448)
244         addr = netns_tap.add_address()
245         adrr.set_attribute_value("Address", netns_addr)
246         addr.set_attribute_value("NetPrefix", 32)
247         addr.set_attribute_value("Broadcast", False)
248         route = netns_node.add_route()
249         route.set_attribute_value("Destination", vnet)
250         r1.set_attribute_value("NetPrefix", 24)
251         r1.set_attribute_value("NextHop", pl_addr)
252         netns_node.connector("devs").connect(netns_tap.connector("node"))
253
254         netns_tunchannel = netns_desc.create("TunChannel")
255         #netns_tunchannel.set_attribute_value("tun_cipher", "PLAIN") 
256         netns_tunchannel.connector("->fd").connect(netns_tap.connector("fd->"))
257         pl_tap.connector("tcp").connect(netns_tunchannel.connector("tcp"))
258
259     def make_pl_overlay(self, numnodes, num_wifi):
260         ns3_testbed_id = "ns3"
261         
262         pl, netns, exp = self.make_experiment_desc()
263         # We'll make a distribution spanning tree using prefix matching as a distance
264         api = plutil.getAPI(self.pluser, self.plpass)
265         nodes = plutil.getNodes(api, numnodes, operatingSystem = 'f12')
266         root = min(nodes, key=operator.attrgetter('hostname'))
267         links = list(plutil.getSpanningTree(nodes, root=root))
268       
269         for node in nodes:
270             node.vif_ips = set()
271             node.children = []
272             node.childips = set()
273         
274         # Build an explicit tree
275         for slave, master in links:
276             master.children.append(slave)
277         
278         # We have to assign IPs and routes.
279         # The IP will be assigned sequentially, depth-first.
280         # This will result in rather compact routing rules
281         nextip = [128-numnodes]
282         def traverse(traverse, node, parent=None, base=struct.unpack('!L',socket.inet_aton(self.vnet))[0]):
283             if nextip[0] >= 254:
284                 raise RuntimeError, "Too many IPs to assign!"
285             
286             node.vif_addr = base | (nextip[0])
287             nips = 1+len(node.children) # one vif per child, plus one for the parent
288             nextip[0] += nips
289             
290             for i in xrange(nips):
291                 node.vif_ips.add(node.vif_addr+i)
292
293             if parent:
294                 parent.childips.update(node.vif_ips)
295
296             for i,child in enumerate(node.children):
297                 traverse(traverse, child, node, base)
298                 
299             if parent:
300                 parent.childips.update(node.childips)
301                 
302         traverse(traverse, root)
303         
304         def printtree(printtree, node, indent=''):
305             print indent, '-', socket.inet_ntoa(struct.pack('!L',node.vif_addr)), node.country, node.city, node.site
306             for child in node.children:
307                 childips = map(ipaddr.IPAddress, child.childips)
308                 childnets = ipaddr.collapse_address_list(childips)
309                 cip = ipaddr.IPAddress(child.vif_addr)
310                 for cnet in childnets:
311                     print indent, '|- R', cnet, '->', cip
312                 printtree(printtree, child, indent+' | ')
313         printtree(printtree, root)
314
315         inet = pl.create("Internet")
316         
317         def maketree(maketree, node, parent=None, parentIp=None):
318             routes = []
319             ctaps = []
320             for i,child in enumerate(node.children):
321                 childips = map(ipaddr.IPAddress, child.childips)
322                 childnets = ipaddr.collapse_address_list(childips)
323                 cip = ipaddr.IPAddress(child.vif_addr)
324                 pip = ipaddr.IPAddress(node.vif_addr+1+i)
325                 for cnet in childnets:
326                     routes.append((cnet.ip.exploded, cnet.prefixlen, cip.exploded))
327                 ctaps.append( maketree(maketree, child, node, pip) )
328             if parentIp:
329                 routes.append((self.vnet,24,parentIp))
330             
331             if not parent:
332                 label = "root"
333             else:
334                 label = None
335             ips = [ ipaddr.IPAddress(node.vif_addr+i) for i in xrange(1+len(node.children)) ]
336             node1, iface1, tap1, tap1ip, _ = self.make_pl_tapnode(pl, ips, inet, 
337                 hostname = node.hostname,
338                 routes = routes,
339                 mcastrouter = bool(node.children),
340                 mcast = True,
341                 label = label )
342             
343             for tap, ctap in zip(tap1[1:], ctaps):
344                 tap.connector("udp").connect(ctap.connector("udp"))
345             
346             self.add_net_monitor(pl, node1)
347             self.add_vlc_restreamer(pl, node1)
348             if random.random() < 0.1 and parent:
349                 self.add_vlc_dumper(pl, node1)
350             
351             return tap1[0]
352         maketree(maketree, root)
353
354         # create a netns node and connect it to the root pl node
355         pl_root = exp_desc.get_element_by_label("root")
356         netns_source = self.make_netns_node(netns)
357         iflabel = "source-iface"
358         self.make_pl_netns_connection(pl_desc, pl_root, netns_desc, 
359                 netns_source, iflabel, self.vnet)
360         self.add_vlc_source(netns, netns_n, iflabel)
361  
362         xml = exp.to_xml()
363         test_dir = "./results"
364
365         try:
366             controller = ExperimentController(xml, self.root_dir)
367             controller.start()
368             
369             print >>sys.stderr, "Press CTRL-C to shut down"
370             try:
371                 while True:
372                     time.sleep(10)
373             except KeyboardInterrupt:
374                 pass
375             
376             # download results
377             for testbed_guid, guids in controller.traces_info().iteritems():
378                 for guid, traces in guids.iteritems():
379                     for name, data in traces.iteritems():
380                         path = data["filepath"]
381                         
382                         if not path:
383                             continue
384                         
385                         print >>sys.stderr, "Downloading trace", path
386                         
387                         filepath = os.path.join(test_dir, path)
388                         
389                         try:
390                             trace = controller.trace(guid, name)
391                         except:
392                             traceback.print_exc(file=sys.stderr)
393                             continue
394                         try:
395                             if not os.path.exists(os.path.dirname(filepath)):
396                                 os.makedirs(os.path.dirname(filepath))
397                         except:
398                             traceback.print_exc(file=sys.stderr)
399                         
400                         try:
401                             if len(trace) >= 2**20:
402                                 # Bigger than 1M, compress
403                                 tracefile = gzip.GzipFile(filepath+".gz", "wb")
404                             else:
405                                 tracefile = open(filepath,"wb")
406                             try:
407                                 tracefile.write(trace)
408                             finally:
409                                 tracefile.close()
410                         except:
411                             traceback.print_exc(file=sys.stderr)
412         finally:
413             try:
414                 controller.stop()
415             except:
416                 import traceback
417                 traceback.print_exc()
418             try:
419                 controller.shutdown()
420             except:
421                 import traceback
422                 traceback.print_exc()
423
424
425 if __name__ == '__main__':
426     usage = "usage: %prog -n number_sta -m movie -u user"
427     parser = OptionParser(usage=usage)
428     parser.add_option("-u", "--user", dest="user", help="Valid linux system user (not root).", type="str")
429     parser.add_option("-m", "--movie", dest="movie", help="Path to movie file to play", type="str")
430     parser.add_option("-n", "--nsta", dest="nsta", help="Number of wifi stations", type="int")
431     parser.add_option("-N", "--nodes", dest="nsta", help="Number of overlay nodes", type="int")
432     parser.add_option("-a", "--base_addr", dest="base_addr", help="Base address segment for the experiment", type="str")
433     parser.add_option("-s", "--slicename", dest="slicename", help="PlanetLab slice", type="str")
434     (options, args) = parser.parse_args()
435     if not options.movie:
436         parser.error("Missing 'movie' option.")
437     if options.user == 'root':
438         parser.error("Missing or invalid 'user' option.")
439     if options.nsta and options.nsta > 8:
440         parser.error("Try a number of stations under 9.")
441
442     exp = PlanetLabMulticastOverlay()
443     exp.movie_source = options.movie
444     try:
445         exp.setUp()
446         exp.make_pl_overlay(50, 8)
447     finally:
448         exp.tearDown()
449