4 # NEPI, a framework to manage network experiments
5 # Copyright (C) 2013 INRIA
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License version 2 as
9 # published by the Free Software Foundation;
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
26 # content l1 / \ l2 ccncat
28 # 0 ----- h1 0 --- 0 h3 ------ 0
# - t0 : b2 retrieves video published in b1
40 from nepi.execution.ec import ExperimentController, ECState
41 from nepi.execution.resource import ResourceState, ResourceAction
42 from nepi.execution.trace import TraceAttr
45 from optparse import OptionParser, SUPPRESS_HELP
def add_node(ec, host, user, ssh_key = None):
    """Register a "linuxNode" resource for *host* and return its identifier.

    ec      -- experiment controller used to register and configure resources
    host    -- value stored in the node's "hostname" attribute
    user    -- value stored in the node's "username" attribute
    ssh_key -- value stored in the node's "identity" attribute (may be None)

    Returns whatever ec.register_resource() returned for the new node, so
    that callers can connect other resources to it.
    """
    node = ec.register_resource("linuxNode")
    ec.set(node, "hostname", host)
    ec.set(node, "username", user)
    ec.set(node, "identity", ssh_key)
    # Both clean-up flags are always enabled for nodes created by this script.
    ec.set(node, "cleanExperiment", True)
    ec.set(node, "cleanProcesses", True)
    return node
def add_ccnd(ec, node):
    """Register a "linux::CCND" resource connected to *node* and return it.

    ec   -- experiment controller used to register and configure resources
    node -- identifier of the node resource the daemon is connected to

    Returns the identifier produced by ec.register_resource().
    """
    ccnd = ec.register_resource("linux::CCND")
    ec.set(ccnd, "debug", 7)
    ec.register_connection(ccnd, node)
    return ccnd
def add_ccnr(ec, ccnd):
    """Register a "linux::CCNR" resource connected to *ccnd* and return it.

    ec   -- experiment controller used to register resources
    ccnd -- identifier of the CCND resource the new resource is connected to

    Returns the identifier produced by ec.register_resource().
    """
    ccnr = ec.register_resource("linux::CCNR")
    ec.register_connection(ccnr, ccnd)
    return ccnr
def add_fib_entry(ec, ccnd, peer_host):
    """Register a "linux::FIBEntry" on *ccnd* pointing at *peer_host*.

    ec        -- experiment controller used to register and configure resources
    ccnd      -- identifier of the CCND resource the entry is connected to
    peer_host -- value stored in the entry's "host" attribute

    Returns the identifier produced by ec.register_resource(); callers need
    it to attach conditions (e.g. a scheduled STOP) to the entry.
    """
    entry = ec.register_resource("linux::FIBEntry")
    ec.set(entry, "host", peer_host)
    ec.register_connection(entry, ccnd)
    return entry
def add_content(ec, ccnr, content_name, content):
    """Register a "linux::CCNContent" resource connected to *ccnr*.

    ec           -- experiment controller used to register and configure resources
    ccnr         -- identifier of the CCNR resource the content is connected to
    content_name -- value stored in the "contentName" attribute
    content      -- value stored in the "content" attribute

    Returns the identifier produced by ec.register_resource().
    """
    co = ec.register_resource("linux::CCNContent")
    ec.set(co, "contentName", content_name)
    ec.set(co, "content", content)
    ec.register_connection(co, ccnr)
    return co
def add_stream(ec, ccnd, content_name):
    """Register a "linux::CCNApplication" that fetches *content_name*.

    ec           -- experiment controller used to register and configure resources
    ccnd         -- identifier of the CCND resource the application is connected to
    content_name -- CCN name interpolated into the ccnpeek / ccncat command

    Returns the identifier produced by ec.register_resource(); callers use
    it to wait on the application and to read its traces.
    """
    # ccnpeek is issued before ccncat; the original note here references
    # ccnx v7.2 issue 101007.
    command = "ccnpeek %(content_name)s; ccncat %(content_name)s" % {
        "content_name" : content_name}

    app = ec.register_resource("linux::CCNApplication")
    ec.set(app, "command", command)
    ec.register_connection(app, ccnd)
    return app
def add_collector(ec, trace_name):
    """Register a "Collector" resource for the trace called *trace_name*.

    ec         -- experiment controller used to register and configure resources
    trace_name -- value stored in the collector's "traceName" attribute

    Returns the identifier produced by ec.register_resource(), so the caller
    can connect the collector to the resources whose trace it should gather.
    """
    collector = ec.register_resource("Collector")
    ec.set(collector, "traceName", trace_name)
    return collector
def get_options():
    """Parse the command line and return the experiment settings.

    Defaults come from the environment: PL_SLICE provides the default slice
    name, and PL_SSHKEY the default SSH key path. When PL_SSHKEY is not set,
    ~/.ssh/id_rsa_planetlab is used if that file exists, otherwise None.

    Returns a 5-tuple (pl_user, movie, exp_id, pl_ssh_key, results).
    Exits through parser.error() when no movie is given.
    """
    pl_slice = os.environ.get("PL_SLICE")

    # We use a specific SSH private key for PL if the PL_SSHKEY is specified
    # or the id_rsa_planetlab exists. expanduser() is used instead of
    # os.environ['HOME'] so an unset HOME does not raise KeyError.
    default_key = os.path.join(os.path.expanduser("~"), ".ssh",
            "id_rsa_planetlab")
    if not os.path.exists(default_key):
        default_key = None
    pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)

    usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results>"

    parser = OptionParser(usage=usage)
    parser.add_option("-s", "--pl-user", dest="pl_user",
            help="PlanetLab slicename", default = pl_slice, type="str")
    parser.add_option("-m", "--movie", dest="movie",
            help="Stream movie", type="str")
    parser.add_option("-e", "--exp-id", dest="exp_id",
            help="Label to identify experiment", type="str")
    parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key",
            help="Path to private SSH key to be used for connection",
            default = pl_ssh_key, type="str")
    parser.add_option("-r", "--results", dest="results", default = "/tmp",
            help="Path to directory where to store results", type="str")

    (options, args) = parser.parse_args()

    if not options.movie:
        parser.error("movie is a required argument")

    # Order matches the tuple unpacked by the script's entry point.
    return (options.pl_user, options.movie, options.exp_id,
            options.pl_ssh_key, options.results)
if __name__ == '__main__':
    # Entry point: describes four ring nodes plus two border nodes, publishes
    # the movie on border node b1, retrieves it with ccncat on border node b2
    # and plays the retrieved stream locally with vlc.
    content_name = "ccnx:/test/VIDEO"

    (pl_user, movie, exp_id, pl_ssh_key, results_dir) = get_options()

    ec = ExperimentController(exp_id = exp_id, local_dir = results_dir)

    # Alternative host set, kept for reference
    #host1 = "planetlab4.wail.wisc.edu"
    #host2 = "planetlab2.cs.columbia.edu"
    #host3 = "ricepl-2.cs.rice.edu"
    #host4 = "node1.planetlab.albany.edu"
    #host5 = "earth.cs.brown.edu"
    #host6 = "planetlab2.engr.uconn.edu"

    host1 = "planetlab2.fct.ualg.pt"
    host2 = "planet2.unipr.it"
    host3 = "planetlab1.aston.ac.uk"
    host4 = "itchy.comlab.bth.se"
    host5 = "rochefort.infonet.fundp.ac.be"
    host6 = "planetlab1.u-strasbg.fr"

    # describe nodes in the central ring
    ring_hosts = [host1, host2, host3, host4]

    # hostname -> CCN daemon registered for that host
    ccnds = dict()

    for host in ring_hosts:
        node = add_node(ec, host, pl_user, pl_ssh_key)
        ccnd = add_ccnd(ec, node)
        add_ccnr(ec, ccnd)
        ccnds[host] = ccnd

    ## Add ccn ring links, one FIB entry per direction:
    # l1 : h1 - h2 , l2 : h2 - h3 , l3 : h3 - h4 , l4 : h4 - h1
    ring_links = [(host1, host2), (host2, host3), (host3, host4),
            (host4, host1)]
    for (near, far) in ring_links:
        add_fib_entry(ec, ccnds[near], far)
        add_fib_entry(ec, ccnds[far], near)

    # l5 : h1 - h3 , h3 - h1
    # These two entries are kept in variables because they are stopped
    # while the transfer is running (see the conditions below).
    l5u = add_fib_entry(ec, ccnds[host1], host3)
    l5d = add_fib_entry(ec, ccnds[host3], host1)

    # border node b1: publishes the movie
    bnode1 = add_node(ec, host5, pl_user, pl_ssh_key)
    ccndb1 = add_ccnd(ec, bnode1)
    ccnrb1 = add_ccnr(ec, ccndb1)
    ccnds[host5] = ccndb1
    add_content(ec, ccnrb1, content_name, movie)

    # border node b2: retrieves the movie
    bnode2 = add_node(ec, host6, pl_user, pl_ssh_key)
    ccndb2 = add_ccnd(ec, bnode2)
    add_ccnr(ec, ccndb2)
    ccnds[host6] = ccndb2
    app = add_stream(ec, ccndb2, content_name)

    # connect border nodes
    add_fib_entry(ec, ccndb1, host1)
    add_fib_entry(ec, ccnds[host1], host5)

    add_fib_entry(ec, ccndb2, host3)
    add_fib_entry(ec, ccnds[host3], host6)

    # Put down l5 10s after transfer started
    ec.register_condition(l5u, ResourceAction.STOP,
            app, ResourceState.STARTED, time = "10s")
    ec.register_condition(l5d, ResourceAction.STOP,
            app, ResourceState.STARTED, time = "10s")

    # Register a collector to automatically collect traces.
    # It is attached to every CCN daemon registered above rather than to
    # whichever daemon the ring loop happened to create last.
    collector = add_collector(ec, "stderr")
    for daemon in ccnds.values():
        ec.register_connection(collector, daemon)

    proc1 = None
    try:
        # deploy all ResourceManagers
        ec.deploy()

        # Wait until ccncat has started retrieving the content
        ec.wait_started([app])

        rvideo_path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
        command = 'tail -f %s' % rvideo_path

        # pulling the content of the video received
        # on b2, to stream it locally
        proc1 = subprocess.Popen(['ssh',
            '-o', 'StrictHostKeyChecking=no',
            '-l', pl_user, host6,
            command],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE)

        # NOTE(review): '--marq-marquee' is assumed to be the option that
        # takes the credit text below -- confirm against the vlc version used.
        proc2 = subprocess.Popen(['vlc',
            '--ffmpeg-threads=1',
            '--sub-filter', 'marq',
            '--marq-marquee',
            '(c) copyright 2008, Blender Foundation / www.bigbuckbunny.org',
            '--no-video-title-show', '-'],
            stdin=proc1.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)

        # Drop our copy of the pipe so ssh gets SIGPIPE once vlc exits.
        proc1.stdout.close()

        proc2.communicate()
    finally:
        # 'tail -f' never ends by itself; stop the ssh session if it is
        # still running.
        if proc1 is not None and proc1.poll() is None:
            proc1.terminate()

        # shutdown the experiment controller
        ec.shutdown()