4 # NEPI, a framework to manage network experiments
5 # Copyright (C) 2013 INRIA
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
27 # content l1 / \ l2 ccncat
29 # 0 ----- h1 0 --- 0 h3 ------ 0
35 # - t0 : b2 retrives video published in b1
41 from nepi.execution.ec import ExperimentController, ECState
42 from nepi.execution.resource import ResourceState, ResourceAction, \
44 from nepi.execution.trace import TraceAttr
47 from optparse import OptionParser, SUPPRESS_HELP
52 def add_node(ec, host, user, ssh_key = None):
53 node = ec.register_resource("LinuxNode")
54 ec.set(node, "hostname", host)
55 ec.set(node, "username", user)
56 ec.set(node, "identity", ssh_key)
57 ec.set(node, "cleanHome", True)
58 ec.set(node, "cleanProcesses", True)
61 def add_ccnd(ec, node):
62 ccnd = ec.register_resource("LinuxCCND")
63 ec.set(ccnd, "debug", 7)
64 ec.register_connection(ccnd, node)
67 def add_ccnr(ec, ccnd):
68 ccnr = ec.register_resource("LinuxCCNR")
69 ec.register_connection(ccnr, ccnd)
72 def add_fib_entry(ec, ccnd, peer_host):
73 entry = ec.register_resource("LinuxFIBEntry")
74 ec.set(entry, "host", peer_host)
75 ec.register_connection(entry, ccnd)
78 def add_content(ec, ccnr, content_name, content):
79 co = ec.register_resource("LinuxCCNContent")
80 ec.set(co, "contentName", content_name)
81 ec.set(co, "content", content)
82 ec.register_connection(co, ccnr)
85 def add_stream(ec, ccnd, content_name):
86 # ccnx v7.2 issue 101007
87 command = "ccnpeek %(content_name)s; ccncat %(content_name)s" % {
88 "content_name" : content_name}
90 app = ec.register_resource("LinuxCCNApplication")
91 ec.set(app, "command", command)
92 ec.register_connection(app, ccnd)
96 def add_collector(ec, trace_name, store_dir):
97 collector = ec.register_resource("Collector")
98 ec.set(collector, "traceName", trace_name)
99 ec.set(collector, "storeDir", store_dir)
104 pl_slice = os.environ.get("PL_SLICE")
106 # We use a specific SSH private key for PL if the PL_SSHKEY is specified or the
107 # id_rsa_planetlab exists
108 default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])
109 default_key = default_key if os.path.exists(default_key) else None
110 pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
112 usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results>"
114 parser = OptionParser(usage=usage)
115 parser.add_option("-s", "--pl-user", dest="pl_user",
116 help="PlanetLab slicename", default = pl_slice, type="str")
117 parser.add_option("-m", "--movie", dest="movie",
118 help="Stream movie", type="str")
119 parser.add_option("-e", "--exp-id", dest="exp_id",
120 help="Label to identify experiment", type="str")
121 parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key",
122 help="Path to private SSH key to be used for connection",
123 default = pl_ssh_key, type="str")
124 parser.add_option("-r", "--results", dest="results", default = "/tmp",
125 help="Path to directory where to store results", type="str")
127 (options, args) = parser.parse_args()
129 if not options.movie:
130 parser.error("movie is a required argument")
132 return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key,
135 if __name__ == '__main__':
136 content_name = "ccnx:/test/VIDEO"
138 ( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options()
140 # Search for available RMs
143 ec = ExperimentController(exp_id = exp_id)
146 #host1 = "planetlab2.u-strasbg.fr"
147 #host2 = "planet1.servers.ua.pt"
148 #host3 = "planetlab1.cs.uoi.gr"
149 #host4 = "planetlab1.aston.ac.uk"
150 #host5 = "planetlab2.willab.fi"
151 #host6 = "planetlab-1.fokus.fraunhofer.de"
154 host1 = "planetlab4.wail.wisc.edu"
155 host2 = "planetlab2.cs.columbia.edu"
156 host3 = "ricepl-2.cs.rice.edu"
157 host4 = "node1.planetlab.albany.edu"
158 host5 = "earth.cs.brown.edu"
159 host6 = "planetlab2.engr.uconn.edu"
161 # describe nodes in the central ring
162 ring_hosts = [host1, host2, host3, host4]
165 for i in xrange(len(ring_hosts)):
167 node = add_node(ec, host, pl_user, pl_ssh_key)
168 ccnd = add_ccnd(ec, node)
169 ccnr = add_ccnr(ec, ccnd)
172 ## Add ccn ring links
173 # l1 : h1 - h2 , h2 - h1
174 l1u = add_fib_entry(ec, ccnds[host1], host2)
175 l1d = add_fib_entry(ec, ccnds[host2], host1)
177 # l2 : h2 - h3 , h3 - h2
178 l2u = add_fib_entry(ec, ccnds[host2], host3)
179 l2d = add_fib_entry(ec, ccnds[host3], host2)
181 # l3 : h3 - h4 , h4 - h3
182 l3u = add_fib_entry(ec, ccnds[host3], host4)
183 l3d = add_fib_entry(ec, ccnds[host4], host3)
185 # l4 : h4 - h1 , h1 - h4
186 l4u = add_fib_entry(ec, ccnds[host4], host1)
187 l4d = add_fib_entry(ec, ccnds[host1], host4)
189 # l5 : h1 - h3 , h3 - h1
190 l5u = add_fib_entry(ec, ccnds[host1], host3)
191 l5d = add_fib_entry(ec, ccnds[host3], host1)
194 bnode1 = add_node(ec, host5, pl_user, pl_ssh_key)
195 ccndb1 = add_ccnd(ec, bnode1)
196 ccnrb1 = add_ccnr(ec, ccndb1)
197 ccnds[host5] = ccndb1
198 co = add_content(ec, ccnrb1, content_name, movie)
201 bnode2 = add_node(ec, host6, pl_user, pl_ssh_key)
202 ccndb2 = add_ccnd(ec, bnode2)
203 ccnrb2 = add_ccnr(ec, ccndb2)
204 ccnds[host6] = ccndb2
205 app = add_stream(ec, ccndb2, content_name)
207 # connect border nodes
208 add_fib_entry(ec, ccndb1, host1)
209 add_fib_entry(ec, ccnds[host1], host5)
211 add_fib_entry(ec, ccndb2, host3)
212 add_fib_entry(ec, ccnds[host3], host6)
214 # Put down l5 10s after transfer started
215 ec.register_condition(l5u, ResourceAction.STOP,
216 app, ResourceState.STARTED, time = "10s")
217 ec.register_condition(l5d, ResourceAction.STOP,
218 app, ResourceState.STARTED, time = "10s")
220 # Register a collector to automatically collect traces
221 collector = add_collector(ec, "stderr", results_dir)
222 for ccnd in ccnds.values():
223 ec.register_connection(collector, ccnd)
225 # deploy all ResourceManagers
228 # Wait until ccncat has started retrieving the content
229 ec.wait_started([app])
231 rvideo_path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
232 command = 'tail -f %s' % rvideo_path
234 # pulling the content of the video received
235 # on b2, to stream it locally
236 proc1 = subprocess.Popen(['ssh',
237 '-o', 'StrictHostKeyChecking=no',
238 '-l', pl_user, host6,
240 stdout = subprocess.PIPE,
241 stderr = subprocess.PIPE)
243 proc2 = subprocess.Popen(['vlc',
244 '--ffmpeg-threads=1',
245 '--sub-filter', 'marq',
247 '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org',
249 '--no-video-title-show', '-'],
251 stdout=subprocess.PIPE,
252 stderr=subprocess.PIPE)
254 (stdout, stderr) = proc2.communicate()
256 # shutdown the experiment controller