08625eaf1c143670c80580d04410f1bde3e939d6
[nepi.git] / examples / linux / ccn / ccncat_extended_ring_topo.py
1 #!/usr/bin/env python
2
3 #
4 #    NEPI, a framework to manage network experiments
5 #    Copyright (C) 2013 INRIA
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU General Public License as published by
9 #    the Free Software Foundation, either version 3 of the License, or
10 #    (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU General Public License for more details.
16 #
17 #    You should have received a copy of the GNU General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
21
22 #
23 # CCN topology:
24 #
25 #                h2
26 #                0 
27 #  content   l1 / \ l2         ccncat
28 #  b1          /l5 \           b2
29 #  0 ----- h1 0 --- 0 h3 ------ 0
30 #              \   / 
31 #            l4 \ / l3
32 #                0
33 #                h4
34 # Experiment:
35 # - t0 : b2 retrives video published in b1
36 # - t1 : l1 goes down
37 # - t2 : l2 goes down
38 # - t3 : l5 goes up
39 #
40
41 from nepi.execution.ec import ExperimentController, ECState 
42 from nepi.execution.resource import ResourceState, ResourceAction, \
43         populate_factory
44 from nepi.execution.trace import TraceAttr
45
46 import subprocess
47 from optparse import OptionParser, SUPPRESS_HELP
48
49 import os
50 import time
51
52 def add_node(ec, host, user, ssh_key = None):
53     node = ec.register_resource("LinuxNode")
54     ec.set(node, "hostname", host)
55     ec.set(node, "username", user)
56     ec.set(node, "identity", ssh_key)
57     #ec.set(node, "cleanHome", True)
58     ec.set(node, "cleanProcesses", True)
59     return node
60
61 def add_ccnd(ec, node):
62     ccnd = ec.register_resource("LinuxCCND")
63     ec.set(ccnd, "debug", 7)
64     ec.register_connection(ccnd, node)
65     return ccnd
66
67 def add_ccnr(ec, ccnd):
68     ccnr = ec.register_resource("LinuxCCNR")
69     ec.register_connection(ccnr, ccnd)
70     return ccnr
71
72 def add_fib_entry(ec, ccnd, peer_host):
73     entry = ec.register_resource("LinuxFIBEntry")
74     ec.set(entry, "host", peer_host)
75     ec.register_connection(entry, ccnd)
76     return entry
77
78 def add_content(ec, ccnr, content_name, content):
79     co = ec.register_resource("LinuxCCNContent")
80     ec.set(co, "contentName", content_name)
81     ec.set(co, "content", content)
82     ec.register_connection(co, ccnr)
83     return co
84
85 def add_stream(ec, ccnd, content_name):
86     # ccnx v7.2 issue 101007 
87     command = "ccnpeek %(content_name)s; ccncat %(content_name)s" % {
88             "content_name" : content_name}
89
90     app = ec.register_resource("LinuxCCNApplication")
91     ec.set(app, "command", command)
92     ec.register_connection(app, ccnd)
93
94     return app
95
96 def add_collector(ec, trace_name, store_dir):
97     collector = ec.register_resource("Collector")
98     ec.set(collector, "traceName", trace_name)
99     ec.set(collector, "storeDir", store_dir)
100
101     return collector
102
103 def get_options():
104     pl_slice = os.environ.get("PL_SLICE")
105
106     # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
107     # id_rsa_planetlab exists 
108     default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])
109     default_key = default_key if os.path.exists(default_key) else None
110     pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
111
112     usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results"
113
114     parser = OptionParser(usage=usage)
115     parser.add_option("-s", "--pl-user", dest="pl_user", 
116             help="PlanetLab slicename", default = pl_slice, type="str")
117     parser.add_option("-m", "--movie", dest="movie", 
118             help="Stream movie", type="str")
119     parser.add_option("-e", "--exp-id", dest="exp_id", 
120             help="Label to identify experiment", type="str")
121     parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", 
122             help="Path to private SSH key to be used for connection", 
123             default = pl_ssh_key, type="str")
124     parser.add_option("-r", "--results", dest="results", default = "/tmp",  
125             help="Path to directory where to store results", type="str") 
126
127     (options, args) = parser.parse_args()
128
129     if not options.movie:
130         parser.error("movie is a required argument")
131
132     return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key,
133             options.results)
134
135 if __name__ == '__main__':
136     content_name = "ccnx:/test/VIDEO"
137     
138     ( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options()
139
140     # Search for available RMs
141     populate_factory()
142     
143     ec = ExperimentController(exp_id = exp_id)
144     
145     # hosts in Europe
146     #host1 = "planetlab2.u-strasbg.fr"
147     #host2 = "planet1.servers.ua.pt"
148     #host3 = "planetlab1.cs.uoi.gr"
149     #host4 = "planetlab1.aston.ac.uk"
150     #host5 = "planetlab2.willab.fi"
151     #host6 = "planetlab-1.fokus.fraunhofer.de"
152
153     # host in the US
154     host1 = "planetlab4.wail.wisc.edu"
155     host2 = "planetlab2.cs.columbia.edu"
156     host3 = "ricepl-2.cs.rice.edu"
157     host4 = "node1.planetlab.albany.edu"
158     host5 = "earth.cs.brown.edu"
159     host6 = "planetlab2.engr.uconn.edu"
160
161     # describe nodes in the central ring
162     ring_hosts = [host1, host2, host3, host4]
163     ccnds = dict()
164
165     for i in xrange(len(ring_hosts)):
166         host = ring_hosts[i]
167         node = add_node(ec, host, pl_user, pl_ssh_key)
168         ccnd = add_ccnd(ec, node)
169         ccnr = add_ccnr(ec, ccnd)
170         ccnds[host] = ccnd
171     
172     ## Add ccn ring links
173     # l1 : h1 - h2 , h2 - h1
174     l1u = add_fib_entry(ec, ccnds[host1], host2)
175     l1d = add_fib_entry(ec, ccnds[host2], host1)
176
177     # l2 : h2 - h3 , h3 - h2
178     l2u = add_fib_entry(ec, ccnds[host2], host3)
179     l2d = add_fib_entry(ec, ccnds[host3], host2)
180
181     # l3 : h3 - h4 , h4 - h3
182     l3u = add_fib_entry(ec, ccnds[host3], host4)
183     l3d = add_fib_entry(ec, ccnds[host4], host3)
184
185     # l4 : h4 - h1 , h1 - h4
186     l4u = add_fib_entry(ec, ccnds[host4], host1)
187     l4d = add_fib_entry(ec, ccnds[host1], host4)
188
189     # l5 : h1 - h3 , h3 - h1
190     l5u = add_fib_entry(ec, ccnds[host1], host3)
191     l5d = add_fib_entry(ec, ccnds[host3], host1)
192     
193     # border node 1
194     bnode1 = add_node(ec, host5, pl_user, pl_ssh_key)
195     ccndb1 = add_ccnd(ec, bnode1)
196     ccnrb1 = add_ccnr(ec, ccndb1)
197     ccnds[host5] = ccndb1
198     co = add_content(ec, ccnrb1, content_name, movie)
199
200     # border node 2
201     bnode2 = add_node(ec, host6, pl_user, pl_ssh_key)
202     ccndb2 = add_ccnd(ec, bnode2)
203     ccnrb2 = add_ccnr(ec, ccndb2)
204     ccnds[host6] = ccndb2
205     app = add_stream(ec, ccndb2, content_name)
206  
207     # connect border nodes
208     add_fib_entry(ec, ccndb1, host1)
209     add_fib_entry(ec, ccnds[host1], host5)
210
211     add_fib_entry(ec, ccndb2, host3)
212     add_fib_entry(ec, ccnds[host3], host6)
213
214     # Put down l5 10s after transfer started
215     ec.register_condition(l5u, ResourceAction.STOP, 
216             app, ResourceState.STARTED, time = "10s")
217     ec.register_condition(l5d, ResourceAction.STOP, 
218             app, ResourceState.STARTED, time = "10s")
219  
220     # Register a collector to automatically collect traces
221     collector = add_collector(ec, "stderr", results_dir)
222     for ccnd in ccnds.values():
223         ec.register_connection(collector, ccnd)
224
225     # deploy all ResourceManagers
226     ec.deploy()
227
228     # Wait until ccncat has started retrieving the content
229     ec.wait_started([app])
230
231     rvideo_path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
232     command = 'tail -f %s' % rvideo_path
233
234     # pulling the content of the video received
235     # on b2, to stream it locally
236     proc1 = subprocess.Popen(['ssh',
237         '-o', 'StrictHostKeyChecking=no',
238         '-l', pl_user, host6,
239         command],
240         stdout = subprocess.PIPE, 
241         stderr = subprocess.PIPE)
242     
243     proc2 = subprocess.Popen(['vlc', 
244         '--ffmpeg-threads=1',
245         '--sub-filter', 'marq', 
246         '--marq-marquee', 
247         '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org', 
248         '--marq-position=8', 
249         '--no-video-title-show', '-'], 
250         stdin=proc1.stdout, 
251         stdout=subprocess.PIPE, 
252         stderr=subprocess.PIPE)
253
254     (stdout, stderr) = proc2.communicate()
255
256     # shutdown the experiment controller
257     ec.shutdown()
258