cecb78f5b9de854f38fe5729c48a5531d6fb14f0
[nepi.git] / examples / ccnx / planetlab_ccnx_multicast.py
1 #!/usr/bin/env python
2
3 ##
4 ## Experiment topology:
5 ## 
6 ##  ccncatchunks                                ccnsendchunks
7 ##       |                                            |
8 ##       .->  node1 -- .. -- nodei -- .. -- nodeN   <-.
9 ##    
10 ##
11 ##  - Nodes are connected through an overlay network over the Intenet
12 ##  - On each node runs a CCNx daemon
13 ##  - Static multicast entries are added to the CCNx FIB on each node to communicate them in series.
14 ##    (Nodes only have FIB entries to at most two nodes)
15 ##
16
17
18 from nepi.core.design import ExperimentDescription, FactoriesProvider
19 from nepi.core.execute import ExperimentController
20 from nepi.util.constants import ApplicationStatus as AS
21 import ipaddr
22 import math
23 from optparse import OptionParser, SUPPRESS_HELP
24 import os
25 import signal
26 import string
27 import subprocess
28 import tempfile
29 import time
30
31 # Trak SIGTERM, and set global termination flag instead of dying
32 TERMINATE = []
33 def _finalize(sig,frame):
34     global TERMINATE
35     TERMINATE.append(None)
36 signal.signal(signal.SIGTERM, _finalize)
37 signal.signal(signal.SIGINT, _finalize)
38
39 def create_slice_desc(slicename, plc_host, pl_user, pl_pwd, pl_ssh_key, 
40         port_base, root_dir, exp_desc):
41     pl_provider = FactoriesProvider("planetlab")
42     slice_desc = exp_desc.add_testbed_description(pl_provider)
43     slice_desc.set_attribute_value("homeDirectory", root_dir)
44     slice_desc.set_attribute_value("slice", slicename)
45     slice_desc.set_attribute_value("sliceSSHKey", pl_ssh_key)
46     slice_desc.set_attribute_value("authUser", pl_user)
47     slice_desc.set_attribute_value("authPass", pl_pwd)
48     slice_desc.set_attribute_value("plcHost", plc_host)
49     slice_desc.set_attribute_value("tapPortBase", port_base)
50     # Kills all running processes before starting the experiment
51     slice_desc.set_attribute_value("cleanProc", True)
52     # NOTICE: Setting 'cleanHome' to 'True' will erase all previous
53     # folders in the sliver Home directory, including result files!
54     #slice_desc.set_attribute_value("cleanHome", True)
55     slice_desc.set_attribute_value("plLogLevel", "DEBUG")
56     return slice_desc
57  
58 def create_node(hostname, pl_inet, slice_desc):
59     pl_node = slice_desc.create("Node")
60     pl_node.set_attribute_value("hostname", hostname)
61     pl_node.set_attribute_value("label", hostname)
62     pl_iface = slice_desc.create("NodeInterface")
63     pl_iface.connector("inet").connect(pl_inet.connector("devs"))
64     pl_node.connector("devs").connect(pl_iface.connector("node"))
65     return pl_node
66
67 def create_tunnel(node, peer, pl_nodes, slice_desc, subnet):
68     pl_node = pl_nodes[node]
69     pl_peer = pl_nodes[peer]
70
71     pl_tun = slice_desc.create("TunInterface")
72     pl_tun.set_attribute_value("label", "tun_%s%s" % (node, peer))
73     pl_node.connector("devs").connect(pl_tun.connector("node"))
74
75     pl_tunpeer = slice_desc.create("TunInterface")
76     pl_tunpeer.set_attribute_value("label", "tun_%s%s" % (peer, node))
77     pl_peer.connector("devs").connect(pl_tunpeer.connector("node"))
78
79     pl_tun.connector("udp").connect(pl_tunpeer.connector("udp"))
80     
81     iterhosts = subnet.iterhosts()
82     addr = iterhosts.next()
83     ip = pl_tun.add_address()
84     ip.set_attribute_value("Address", addr.exploded)
85     ip.set_attribute_value("NetPrefix", subnet.prefixlen)
86
87     peeraddr = iterhosts.next()
88     peerip = pl_tunpeer.add_address()
89     peerip.set_attribute_value("Address", peeraddr.exploded)
90     peerip.set_attribute_value("NetPrefix", subnet.prefixlen)
91
92 def create_ccnd(pl_node, port, hostname, routes, slice_desc):
93     pl_app = slice_desc.create("CCNxDaemon")
94     # We use a wildcard to replace the TUN IP address of the node during runtime
95     routes = "|".join(map(lambda route: "udp 224.0.23.170 %d 3 1 {#[tun_%s%s].addr[0].[Address]#}" \
96             % (route[1], hostname, route[0]), routes))
97     # Add multicast ccn routes 
98     pl_app.set_attribute_value("ccnRoutes", routes)
99     # Use a specific port to bind the CCNx daemon
100     if port:
101         pl_app.set_attribute_value("ccnLocalPort", port)
102     pl_app.enable_trace("stdout")
103     pl_app.enable_trace("stderr")
104     pl_app.connector("node").connect(pl_node.connector("apps"))
105
106 def create_ccnsendchunks(pl_node, port, slice_desc):
107     pl_app = slice_desc.create("Application")
108     path_to_video = os.path.join(os.path.dirname(os.path.abspath(__file__)),
109         "../big_buck_bunny_240p_mpeg4_lq.ts")
110     pl_app.set_attribute_value("stdin", path_to_video)
111
112     command = "ccnsendchunks ccnx:/VIDEO"
113     if port:
114         command = "CCN_LOCAL_PORT=%d %s " % (port, command)
115     pl_app.set_attribute_value("command", command)
116
117     pl_app.enable_trace("stdout")
118     pl_app.enable_trace("stderr")
119     pl_app.connector("node").connect(pl_node.connector("apps"))
120     return pl_app
121
122 def exec_ccncatchunks(slicename, port, hostname):
123     print "Starting Vlc streamming ..."
124
125     command = 'PATH=$PATH:$(ls | egrep nepi-ccnd- | head -1)/bin;'
126     if port:
127         command += "CCN_LOCAL_PORT=%d " % port
128     command += ' ccncatchunks2 ccnx:/VIDEO'
129
130     login = "%s@%s" % (slicename, hostname)
131     proc1 = subprocess.Popen(['ssh', login, command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell = False)
132     proc2 = subprocess.Popen(['vlc', 
133         '--sub-filter', 'marq', 
134         '--marq-marquee', 
135         '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org', 
136         '--marq-position=8', 
137         '--no-video-title-show',  '-'], 
138         stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
139     return proc2
140
141 def create_ed(hostnames, vsys_vnet, slicename, plc_host, pl_user, pl_pwd, pl_ssh_key, 
142         port_base, root_dir, port):
143
144     # Create the experiment description object
145     exp_desc = ExperimentDescription()
146
147     # Create the slice description object
148     slice_desc = create_slice_desc(slicename, plc_host, pl_user, pl_pwd, pl_ssh_key, 
149         port_base, root_dir, exp_desc)
150     
151     # Create the Internet box object
152     pl_inet = slice_desc.create("Internet")
153     
154     # Create the Node boxes
155     pl_nodes = dict()
156     ccn_routes = dict()
157     prev_hostname = None
158     mport = port
159     for hostname in hostnames:
160         pl_node = create_node(hostname, pl_inet, slice_desc)
161         pl_nodes[hostname] = pl_node
162
163         ccn_routes[hostname] = list()
164         if prev_hostname:
165             ccn_routes[hostname].append((prev_hostname, mport))
166             ccn_routes[prev_hostname].append((hostname, mport))
167             mport +=1
168         prev_hostname = hostname
169
170     # Get the base network segment (slice vsys_vnet) to assign all the IP addresses
171     # to the virtual interfaces
172     base = ipaddr.IPNetwork(vsys_vnet)
173
174     # Calculate the number of virtual networks required to connect all the nodes 
175     # with all other nodes as the binomial coeficient C(n, 2), with n = #nodes
176     n = len(hostnames)
177     c = n * (n-1) / 2
178
179     # Validate that we can get 'c' /30 subnetworks
180     if c > math.pow(2, (30 - base.prefixlen)):
181         raise RuntimeError("Insufficient address segment %s for experiment", vsys_vnet)
182             
183     # Create the subnetwors iterator 
184     iter_sub = base.iter_subnets(new_prefix=30)
185
186     # Create tunnels between nodes
187     for i, node in enumerate(hostnames):
188         peers = hostnames[i+1:]
189         for peer in peers:
190             subnet = iter_sub.next()
191             create_tunnel(node, peer, pl_nodes, slice_desc, subnet)
192
193     # Create ccnd daemons in all nodes
194     for hostname, pl_node in pl_nodes.iteritems():
195         routes = ccn_routes[hostname]
196         create_ccnd(pl_node, port, hostname, routes, slice_desc)
197
198     # Create a ccnsendchunks application box in the first node
199     hostname = hostnames[0]
200     pl_node = pl_nodes[hostname]
201     pl_app = create_ccnsendchunks(pl_node, port, slice_desc)
202
203     return exp_desc, pl_nodes, hostname, pl_app
204
205 def run(hostnames, vsys_vnet, slicename, plc_host, pl_user, pl_pwd, pl_ssh_key, 
206         port_base, root_dir, port):
207
208     exp_desc, pl_nodes, hostname, pl_app = create_ed(hostnames, vsys_vnet, 
209             slicename, plc_host, pl_user, pl_pwd, pl_ssh_key, port_base, 
210             root_dir, port)
211
212     xml = exp_desc.to_xml()
213     controller = ExperimentController(xml, root_dir)
214     controller.start()
215     
216     while not TERMINATE and controller.status(pl_app.guid) == AS.STATUS_NOT_STARTED:
217         time.sleep(0.5)
218
219     proc = None
220     if not TERMINATE:
221         hostname = hostnames[-1]
222         proc = exec_ccncatchunks(slicename, port, hostname)
223
224     while not TERMINATE and proc and proc.poll() is None:
225         time.sleep(0.5)
226     
227     if proc:
228         if proc.poll() < 1:
229            err = proc.stderr.read()
230            print "ERROR ", err
231         else:   
232            out = proc.stdout.read()
233            print "OUTPUT ", out
234
235     controller.stop()
236     controller.shutdown()
237
238 if __name__ == '__main__':
239     root_dir = tempfile.mkdtemp()
240     slicename = os.environ.get("PL_SLICE")
241     pl_host = os.environ.get("PL_HOST", "www.planet-lab.eu")
242     port_base = 2000 + (os.getpid() % 1000) * 13
243     pl_ssh_key = os.environ.get(
244         "PL_SSH_KEY",
245         "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
246     pl_user = os.environ.get('PL_USER')
247     pl_pwd = os.environ.get('PL_PASS')
248     pl_vsys_vnet = os.environ.get('PL_VSYS_NET')
249     pl_hostnames = os.environ.get('PL_HOSTNAMES')
250     default_hostnames = ['openlab02.pl.sophia.inria.fr',
251                  'ple4.ipv6.lip6.fr',
252                  'planetlab2.di.unito.it',
253                  'merkur.planetlab.haw-hamburg.de',
254                  'planetlab1.cs.uit.no',
255                  'planetlab3.cs.st-andrews.ac.uk',
256                  'planetlab2.cs.uoi.gr',
257                  'planetlab3.xeno.cl.cam.ac.uk',
258                  'planet2.inf.tu-dresden.de',
259                  'planetlab2.csg.uzh.ch',
260                  'planetlab2.upm.ro',
261                  'planetlab-um00.di.uminho.pt',
262                  'planetlabpc2.upf.edu',
263                  'planet2.elte.hu',
264                  'planetlab2.esprit-tn.com' ]
265     ccn_local_port = os.environ.get('CCN_LOCAL_PORT', 49695)
266
267     usage = "usage: %prog -s <pl_slice> -H <pl_host> -k <ssh_key> -u <pl_user> -p <pl_password> -v <vsys_vnet> -N <host_names> -c <node_count> -P <ccn-local-port>"
268
269     parser = OptionParser(usage=usage)
270     parser.add_option("-s", "--slicename", dest="slicename", 
271             help="PlanetLab slicename", default=slicename, type="str")
272     parser.add_option("-H", "--pl-host", dest="pl_host", 
273             help="PlanetLab site (e.g. www.planet-lab.eu)", 
274             default=pl_host, type="str")
275     parser.add_option("-k", "--ssh-key", dest="pl_ssh_key", 
276             help="Path to private ssh key used for PlanetLab authentication", 
277             default=pl_ssh_key, type="str")
278     parser.add_option("-u", "--pl-user", dest="pl_user", 
279             help="PlanetLab account user (i.e. Registration email address)", 
280             default=pl_user, type="str")
281     parser.add_option("-p", "--pl-pwd", dest="pl_pwd", 
282             help="PlanetLab account password", default=pl_pwd, type="str")
283     parser.add_option("-v", "--vsys-vnet", dest="vsys_vnet", 
284             help="Value of the vsys_vnet tag addigned to your slice. (e.g. 192.168.3.0/16)", 
285             default=pl_vsys_vnet, type="str")
286     parser.add_option("-N", "--host-names", dest="hostnames", 
287             help="Comma separated list of PlanetLab hostnames to use", 
288             default=pl_hostnames, type="str")
289     parser.add_option("-c", "--node-count", dest="node_count", 
290             help="Number of nodes to use", 
291             default=5, type="str")
292     parser.add_option("-P", "--ccn-local-port", dest="port", 
293             help="Port to bind the CCNx daemon", 
294             default=ccn_local_port, type="int")
295
296     (options, args) = parser.parse_args()
297
298     hostnames = map(string.strip, options.hostnames.split(",")) if options.hostnames else default_hostnames
299     if options.node_count > 0 and options.node_count < len(hostnames):
300        hostnames = hostnames[0:options.node_count]
301     vsys_vnet = options.vsys_vnet
302     slicename = options.slicename
303     pl_host = options.pl_host
304     pl_user= options.pl_user
305     pl_pwd = options.pl_pwd
306     pl_ssh_key = options.pl_ssh_key
307     port = options.port
308
309     run(hostnames, vsys_vnet, slicename, pl_host, pl_user, pl_pwd, pl_ssh_key, 
310             port_base, root_dir, port)
311