62a33c6ba87e503fc30bdfd27accc7d7497c8736
[nepi.git] / examples / ccnx / planetlab_ccnx_multicast.py
1 #!/usr/bin/env python
2
3 from nepi.core.design import ExperimentDescription, FactoriesProvider
4 from nepi.core.execute import ExperimentController
5 from nepi.util.constants import ApplicationStatus as AS
6 import ipaddr
7 import math
8 from optparse import OptionParser, SUPPRESS_HELP
9 import os
10 import signal
11 import string
12 import subprocess
13 import tempfile
14 import time
15
16 # Trak SIGTERM, and set global termination flag instead of dying
17 TERMINATE = []
18 def _finalize(sig,frame):
19     global TERMINATE
20     TERMINATE.append(None)
21 signal.signal(signal.SIGTERM, _finalize)
22 signal.signal(signal.SIGINT, _finalize)
23
def create_slice_desc(slicename, plc_host, pl_user, pl_pwd, pl_ssh_key,
        port_base, root_dir, exp_desc):
    """Register a PlanetLab testbed in ``exp_desc`` and configure it.

    The testbed description is obtained from a "planetlab" factories
    provider, populated with the slice settings passed in, and returned.
    """
    provider = FactoriesProvider("planetlab")
    testbed = exp_desc.add_testbed_description(provider)

    # (attribute name, value) pairs, applied in this exact order
    settings = (
        ("homeDirectory", root_dir),
        ("slice", slicename),
        ("sliceSSHKey", pl_ssh_key),
        ("authUser", pl_user),
        ("authPass", pl_pwd),
        ("plcHost", plc_host),
        ("tapPortBase", port_base),
        # Kills all running processes before starting the experiment
        ("dedicatedSlice", True),
        ("plLogLevel", "DEBUG"),
    )
    for attr_name, attr_value in settings:
        testbed.set_attribute_value(attr_name, attr_value)

    return testbed
39  
def create_node(hostname, pl_inet, slice_desc):
    """Create a Node box for ``hostname`` attached to the Internet box.

    The node uses ``hostname`` both as its "hostname" attribute and as its
    label. A NodeInterface box is created along with it and wired between
    the node ("devs"/"node" connectors) and ``pl_inet`` ("inet"/"devs").
    Returns the node box.
    """
    node_box = slice_desc.create("Node")
    for attr_name in ("hostname", "label"):
        node_box.set_attribute_value(attr_name, hostname)

    iface_box = slice_desc.create("NodeInterface")

    # Interface <-> Internet
    iface_inet = iface_box.connector("inet")
    iface_inet.connect(pl_inet.connector("devs"))

    # Node <-> interface
    node_devs = node_box.connector("devs")
    node_devs.connect(iface_box.connector("node"))

    return node_box
48
def create_tunnel(node, peer, pl_nodes, slice_desc, subnet):
    """Create a UDP-connected pair of TUN interfaces between two nodes.

    ``node`` and ``peer`` are keys into ``pl_nodes``. One TunInterface box
    is attached to each node, labelled "tun_<node><peer>" on the ``node``
    side and "tun_<peer><node>" on the ``peer`` side, and the two are
    joined through their "udp" connectors. The first two host addresses
    of ``subnet`` are assigned to the ``node`` side and the ``peer`` side
    respectively, both with the subnet's prefix length.
    """
    # Resolve both node boxes up front so an unknown key fails before any
    # box is created.
    endpoints = [
        (pl_nodes[node], "tun_%s%s" % (node, peer)),
        (pl_nodes[peer], "tun_%s%s" % (peer, node)),
    ]

    tuns = []
    for owner, label in endpoints:
        tun = slice_desc.create("TunInterface")
        tun.set_attribute_value("label", label)
        owner.connector("devs").connect(tun.connector("node"))
        tuns.append(tun)

    near_tun, far_tun = tuns
    near_tun.connector("udp").connect(far_tun.connector("udp"))

    # Hand out consecutive host addresses of the subnet, local side first
    hosts = subnet.iterhosts()
    for tun in tuns:
        host_addr = next(hosts)
        ip_box = tun.add_address()
        ip_box.set_attribute_value("Address", host_addr.exploded)
        ip_box.set_attribute_value("NetPrefix", subnet.prefixlen)
73
def create_ccnd(pl_node, hostname, routes, slice_desc):
    """Create a CCNxDaemon box on ``pl_node`` with its multicast routes.

    ``routes`` is a sequence of entries indexed as (peer hostname, port).
    Each entry yields one route string; the strings are joined with "|"
    and stored in the "ccnroutes" attribute. stdout and stderr traces are
    enabled on the daemon.
    """
    daemon = slice_desc.create("CCNxDaemon")

    # We use a wildcard to replace the TUN IP address of the node during runtime
    route_fmt = "udp 224.0.23.170 %d 3 1 {#[tun_%s%s].addr[0].[Address]#}"
    route_specs = [route_fmt % (entry[1], hostname, entry[0])
                   for entry in routes]

    # Add multicast ccn routes
    daemon.set_attribute_value("ccnroutes", "|".join(route_specs))

    for trace_name in ("stdout", "stderr"):
        daemon.enable_trace(trace_name)

    daemon.connector("node").connect(pl_node.connector("apps"))
84
def create_ccnsendchunks(pl_node, slice_desc):
    """Create the Application box that runs ``ccnsendchunks ccnx:/VIDEO``.

    The application's stdin is set to the sample video file located one
    directory above this script. stdout and stderr traces are enabled and
    the box is attached to ``pl_node``. Returns the application box.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    video_path = os.path.join(script_dir,
        "../big_buck_bunny_240p_mpeg4_lq.ts")

    sender = slice_desc.create("Application")
    sender.set_attribute_value("stdin", video_path)
    sender.set_attribute_value("command", "ccnsendchunks ccnx:/VIDEO")

    for trace_name in ("stdout", "stderr"):
        sender.enable_trace(trace_name)

    sender.connector("node").connect(pl_node.connector("apps"))
    return sender
95
def exec_ccncatchunks(slicename, hostname):
    """Stream ccnx:/VIDEO from a remote node into a local VLC player.

    Runs ``ccncatchunks2 ccnx:/VIDEO`` on ``hostname`` over ssh (logging
    in as ``slicename``) and pipes its stdout into ``vlc -``.

    Returns the ``subprocess.Popen`` object of the VLC process, whose
    stdout and stderr are pipes the caller can read.

    Raises OSError if the ``ssh`` or ``vlc`` executable cannot be started.
    """
    print("Starting Vlc streamming ...")
    login = "%s@%s" % (slicename, hostname)
    command = 'PATH=$PATH:$(ls | egrep nepi-ccnd- | head -1)/bin; ccncatchunks2 ccnx:/VIDEO'

    # The ssh process object is not returned, so nobody could ever drain a
    # stderr pipe; send stderr to the null device so ssh cannot block on a
    # full pipe.
    with open(os.devnull, "w") as devnull:
        proc1 = subprocess.Popen(['ssh', login, command],
                stdout=subprocess.PIPE, stderr=devnull, shell=False)

    try:
        proc2 = subprocess.Popen(['vlc', '-'], stdin=proc1.stdout,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError:
        # Do not leave the remote command running without a consumer
        proc1.terminate()
        proc1.wait()
        raise
    finally:
        # Drop the parent's copy of the pipe's read end so that ssh gets
        # SIGPIPE when vlc exits, instead of writing into a pipe that is
        # never read.
        proc1.stdout.close()

    return proc2
103
def create_ed(hostnames, vsys_vnet, slicename, plc_host, pl_user, pl_pwd, pl_ssh_key,
        port_base, root_dir):
    """Build the experiment description for the CCNx multicast example.

    One Node box is created per entry of ``hostnames``. Every pair of
    nodes is joined by a TUN tunnel that gets its own /30 subnet carved
    out of ``vsys_vnet``. CCN routes are declared only between consecutive
    hostnames, each such link using its own port (starting at 49695).
    Every node runs a ccnd daemon, and the first node additionally runs a
    ccnsendchunks application.

    Returns a tuple ``(exp_desc, pl_nodes, hostname, pl_app)`` where
    ``pl_nodes`` maps hostname to Node box, ``hostname`` is the first
    hostname and ``pl_app`` is the ccnsendchunks application box.

    Raises ValueError if ``hostnames`` is empty and RuntimeError if
    ``vsys_vnet`` cannot provide enough /30 subnets.
    """
    if not hostnames:
        raise ValueError("At least one hostname is required")

    # Create the experiment description object
    exp_desc = ExperimentDescription()

    # Create the slice description object
    slice_desc = create_slice_desc(slicename, plc_host, pl_user, pl_pwd, pl_ssh_key,
        port_base, root_dir, exp_desc)

    # Create the Internet box object
    pl_inet = slice_desc.create("Internet")

    # Create the Node boxes
    pl_nodes = dict()
    ccn_routes = dict()
    prev_hostname = None
    port = 49695
    for hostname in hostnames:
        pl_node = create_node(hostname, pl_inet, slice_desc)
        pl_nodes[hostname] = pl_node

        ccn_routes[hostname] = list()
        if prev_hostname:
            # Both ends of a link between consecutive hosts share one port
            ccn_routes[hostname].append((prev_hostname, port))
            ccn_routes[prev_hostname].append((hostname, port))
            port += 1
        prev_hostname = hostname

    # Get the base network segment (slice vsys_vnet) to assign all the IP addresses
    # to the virtual interfaces
    base = ipaddr.IPNetwork(vsys_vnet)

    # Calculate the number of virtual networks required to connect all the nodes
    # with all other nodes as the binomial coefficient C(n, 2), with n = #nodes.
    # n * (n - 1) / 2 is the same value in pure integer arithmetic and is
    # also defined (as 0) for a single node.
    n = len(hostnames)
    c = n * (n - 1) // 2

    # Validate that we can get 'c' /30 subnetworks
    spare_bits = 30 - base.prefixlen
    available = 2 ** spare_bits if spare_bits >= 0 else 0
    if c > available:
        raise RuntimeError(
            "Insufficient address segment %s for experiment" % (vsys_vnet,))

    # Create the subnetworks iterator
    iter_sub = base.iter_subnets(new_prefix=30)

    # Create tunnels between nodes
    for i, node in enumerate(hostnames):
        peers = hostnames[i+1:]
        for peer in peers:
            subnet = next(iter_sub)
            create_tunnel(node, peer, pl_nodes, slice_desc, subnet)

    # Create ccnd daemons in all nodes
    for hostname, pl_node in pl_nodes.items():
        routes = ccn_routes[hostname]
        create_ccnd(pl_node, hostname, routes, slice_desc)

    # Create a ccnsendchunks application box in the first node
    hostname = hostnames[0]
    pl_node = pl_nodes[hostname]
    pl_app = create_ccnsendchunks(pl_node, slice_desc)

    return exp_desc, pl_nodes, hostname, pl_app
167
def run(hostnames, vsys_vnet, slicename, plc_host, pl_user, pl_pwd, pl_ssh_key,
        port_base, root_dir):
    """Deploy the experiment and play the video stream from the last node.

    Builds the experiment description, starts an ExperimentController with
    it, waits until the ccnsendchunks application has left the
    NOT_STARTED state, then launches the local ssh/vlc pipeline against
    the last hostname and waits for the player to finish.

    Both waits are abandoned as soon as the global TERMINATE flag is set.
    The controller is stopped and shut down on every exit path after it
    has been started.
    """
    exp_desc, _pl_nodes, _first_hostname, pl_app = create_ed(hostnames, vsys_vnet,
            slicename, plc_host, pl_user, pl_pwd, pl_ssh_key, port_base,
            root_dir)

    xml = exp_desc.to_xml()
    controller = ExperimentController(xml, root_dir)
    controller.start()

    try:
        while not TERMINATE and controller.status(pl_app.guid) == AS.STATUS_NOT_STARTED:
            time.sleep(0.5)

        proc = None
        if not TERMINATE:
            # The stream is consumed from the last node of the list
            proc = exec_ccncatchunks(slicename, hostnames[-1])

        while not TERMINATE and proc and proc.poll() is None:
            time.sleep(0.5)

        if proc:
            if proc.poll() is None:
                # Termination was requested while the player is still
                # running: stop it, otherwise reading its pipes would block
                # until it exits on its own.
                proc.terminate()
            out, err = proc.communicate()
            # A non-zero return code (including death by signal) is a failure
            if proc.returncode != 0:
                print("ERROR  %s" % (err,))
            else:
                print("OUTPUT  %s" % (out,))
    finally:
        # Always release the testbed resources, even after an error
        controller.stop()
        controller.shutdown()
200
if __name__ == '__main__':
    # Script entry point: defaults come from PL_* environment variables
    # and can be overridden with command line options.
    root_dir = tempfile.mkdtemp()
    slicename = os.environ.get("PL_SLICE")
    pl_host = os.environ.get("PL_HOST", "www.planet-lab.eu")
    # Derive the port base from the pid so different runs get different bases
    port_base = 2000 + (os.getpid() % 1000) * 13
    # expanduser() also works when the HOME variable is not set
    pl_ssh_key = os.environ.get(
        "PL_SSH_KEY",
        os.path.join(os.path.expanduser("~"), ".ssh", "id_rsa_planetlab"))
    pl_user = os.environ.get('PL_USER')
    pl_pwd = os.environ.get('PL_PASS')
    pl_vsys_vnet = os.environ.get('PL_VSYS_NET')
    pl_hostnames = os.environ.get('PL_HOSTNAMES')
    default_hostnames = ['openlab02.pl.sophia.inria.fr',
                 'ple4.ipv6.lip6.fr',
                 'planetlab2.di.unito.it',
                 'merkur.planetlab.haw-hamburg.de',
                 'planetlab1.cs.uit.no',
                 'planetlab3.cs.st-andrews.ac.uk',
                 'planetlab2.cs.uoi.gr',
                 'planetlab3.xeno.cl.cam.ac.uk',
                 'planet2.inf.tu-dresden.de',
                 'planetlab2.csg.uzh.ch',
                 'planetlab2.upm.ro',
                 'planetlab-um00.di.uminho.pt',
                 'planetlabpc2.upf.edu',
                 'planet2.elte.hu',
                 'planetlab2.esprit-tn.com' ]

    usage = "usage: %prog -s <pl_slice> -H <pl_host> -k <ssh_key> -u <pl_user> -p <pl_password> -v <vsys_vnet> -N <host_names> -c <node_count>"

    parser = OptionParser(usage=usage)
    parser.add_option("-s", "--slicename", dest="slicename",
            help="PlanetLab slicename", default=slicename, type="str")
    parser.add_option("-H", "--pl-host", dest="pl_host",
            help="PlanetLab site (e.g. www.planet-lab.eu)",
            default=pl_host, type="str")
    parser.add_option("-k", "--ssh-key", dest="pl_ssh_key",
            help="Path to private ssh key used for PlanetLab authentication",
            default=pl_ssh_key, type="str")
    parser.add_option("-u", "--pl-user", dest="pl_user",
            help="PlanetLab account user (i.e. Registration email address)",
            default=pl_user, type="str")
    parser.add_option("-p", "--pl-pwd", dest="pl_pwd",
            help="PlanetLab account password", default=pl_pwd, type="str")
    parser.add_option("-v", "--vsys-vnet", dest="vsys_vnet",
            help="Value of the vsys_vnet tag addigned to your slice. (e.g. 192.168.3.0/16)",
            default=pl_vsys_vnet, type="str")
    parser.add_option("-N", "--host-names", dest="hostnames",
            help="Comma separated list of PlanetLab hostnames to use",
            default=pl_hostnames, type="str")
    # The count is parsed as an integer so that it can be compared with,
    # and used to slice, the host list.
    parser.add_option("-c", "--node-count", dest="node_count",
            help="Number of nodes to use",
            default=5, type="int")
    (options, args) = parser.parse_args()

    if options.hostnames:
        # Ignore surrounding blanks and empty entries (e.g. a trailing comma)
        hostnames = [name.strip() for name in options.hostnames.split(",")
                     if name.strip()]
    else:
        hostnames = default_hostnames
    if 0 < options.node_count < len(hostnames):
        hostnames = hostnames[:options.node_count]
    vsys_vnet = options.vsys_vnet
    slicename = options.slicename
    pl_host = options.pl_host
    pl_user = options.pl_user
    pl_pwd = options.pl_pwd
    pl_ssh_key = options.pl_ssh_key

    run(hostnames, vsys_vnet, slicename, pl_host, pl_user, pl_pwd, pl_ssh_key,
            port_base, root_dir)
268