Tunnel between 2 ns-3s in remote PL hosts:q
[nepi.git] / src / nepi / resources / planetlab / ns3 / p2pfdudptunnel.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import clsinit_copy, ResourceState
22 from nepi.resources.linux.ns3.p2pfdudptunnel import LinuxNs3P2PFdUdpTunnel
23 from nepi.util.sshfuncs import ProcStatus
24 from nepi.util.timefuncs import tnow, tdiffsec
25
26 import base64
27 import os
28 import socket
29 import time
30
31 @clsinit_copy
32 class PlanetlabNs3P2PFdUdpTunnel(LinuxUdpTunnel):
33     _rtype = "planetlab::ns3::P2PFdUdpTunnel"
34     _help = "Constructs a tunnel between two Ns-3 FdNetdevices " \
35             "located in remote PlanetLab nodes using a UDP connection "
36     _platform = "planetlab::ns3"
37
38     @classmethod
39     def _register_attributes(cls):
40         cipher = Attribute("cipher",
41                "Cipher to encript communication. "
42                 "One of PLAIN, AES, Blowfish, DES, DES3. ",
43                 default = None,
44                 allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
45                 type = Types.Enumerate, 
46                 flags = Flags.Design)
47
48         cipher_key = Attribute("cipherKey",
49                 "Specify a symmetric encryption key with which to protect "
50                 "packets across the tunnel. python-crypto must be installed "
51                 "on the system." ,
52                 flags = Flags.Design)
53
54         txqueuelen = Attribute("txQueueLen",
55                 "Specifies the interface's transmission queue length. "
56                 "Defaults to 1000. ", 
57                 type = Types.Integer, 
58                 flags = Flags.Design)
59
60         bwlimit = Attribute("bwLimit",
61                 "Specifies the interface's emulated bandwidth in bytes "
62                 "per second.",
63                 type = Types.Integer, 
64                 flags = Flags.Design)
65
66         cls._register_attribute(cipher)
67         cls._register_attribute(cipher_key)
68         cls._register_attribute(txqueuelen)
69         cls._register_attribute(bwlimit)
70
71     def __init__(self, ec, guid):
72         super(PlanetlabNs3P2PFdUdpTunnel, self).__init__(ec, guid)
73         self._home = "p2p-fd-udp-tunnel-%s" % self.guid
74         self._pids = dict()
75         self._fd1 = None
76         self._fd1node = None
77         self._fd2 = None
78         self._fd2node = None
79
80     def log_message(self, msg):
81         self.get_endpoints()
82         return " guid %d - PlanetlabNs3P2PFdUdpTunnel - %s - %s - %s " % (self.guid, 
83                 self.node1.get("hostname"), 
84                 self.node2.get("hostname"), 
85                 msg)
86
87     def get_endpoints(self):
88         """ Returns the list of RM that are endpoints to the tunnel 
89         """
90         if not self._fd2 or not self._fd1:
91             from nepi.resources.ns3.ns3fdnetdevice import NS3BaseFdNetDevice
92             devices = self.get_connected(NS3BaseFdNetDevice.get_rtype())
93             if not devices or len(devices) != 2: 
94                 msg = "Tunnel must be connected to exactly two FdNetDevices"
95                 self.error(msg)
96                 raise RuntimeError, msg
97
98             self._fd1 = devices[0]
99             self._fd2 = devices[1]
100  
101             # Set PI headers on
102             self._fd1.set("EncapsulationMode", "DixPi")
103             self._fd2.set("EncapsulationMode", "DixPi")
104         
105             simu = self._fd1.simulation
106             from nepi.resources.linux.node import LinuxNode
107             nodes = simu.get_connected(LinuxNode.get_rtype())
108             self._fd1node = nodes[0]
109      
110             simu = self._fd2.simulation
111             from nepi.resources.linux.node import LinuxNode
112             nodes = simu.get_connected(LinuxNode.get_rtype())
113             self._fd2node = nodes[0]
114
115             if self._fd1node.get("hostname") == \
116                     self._fd2node.get("hostname"):
117                 msg = "Tunnel requires endpoints on different hosts"
118                 self.error(msg)
119                 raise RuntimeError, msg
120
121         return [self._fd1, self._fd2]
122
123     @property
124     def endpoint1(self):
125         return self._fd1
126
127     @property
128     def endpoint2(self):
129         return self._fd2
130
131     @property
132     def node1(self):
133         return self._fd1node
134
135     @property
136     def node2(self):
137         return self._fd2node
138
139     def endpoint_node(self, endpoint):
140         node = None
141         if endpoint == self.endpoint1:
142             node = self.node1
143         else:
144             node = self.node2
145
146         return node
147  
148     def app_home(self, endpoint):
149         node = self.endpoint_node(endpoint)
150         return os.path.join(node.exp_home, self._home)
151
152     def run_home(self, endpoint):
153         return os.path.join(self.app_home(endpoint), self.ec.run_id)
154
155     def upload_sources(self, endpoint):
156         scripts = []
157
158         # vif-passfd python script
159         fd_udp_connect = os.path.join(os.path.dirname(__file__),
160                 "..",
161                 "scripts",
162                 "pl-fd-udp-connect.py")
163
164         scripts.append(fd_udp_connect)
165        
166         # tunnel creation python script
167         tunchannel = os.path.join(os.path.dirname(__file__), 
168                 "..", "..", "linux", 
169                 "scripts", 
170                 "tunchannel.py")
171
172         scripts.append(tunchannel)
173
174         # Upload scripts
175         scripts = ";".join(scripts)
176
177         node = self.endpoint_node(endpoint)
178         node.upload(scripts,
179                 os.path.join(node.src_dir),
180                 overwrite = False)
181
182     def endpoint_mkdir(self, endpoint):
183         node = self.endpoint_node(endpoint) 
184         run_home = self.run_home(endpoint)
185         node.mkdir(run_home)
186
187     def initiate_connection(self, endpoint, remote_endpoint):
188         cipher = self.get("cipher")
189         cipher_key = self.get("cipherKey")
190         bwlimit = self.get("bwLimit")
191         txqueuelen = self.get("txQueueLen")
192
193         # Upload the tunnel creating script
194         self.upload_sources(endpoint)
195
196         # Request an address to send the file descriptor to the ns-3 simulation
197         address = endpoint.recv_fd()
198
199         # execute the tunnel creation script
200         node = self.endpoint_node(remote_endpoint) 
201         port = self.initiate(endpoint, remote_endpoint, address, cipher, 
202                 cipher_key, bwlimit, txqueuelen)
203
204         return port
205
206     def establish_connection(self, endpoint, remote_endpoint, port):
207         self.establish(endpoint, remote_endpoint, port)
208
209     def verify_connection(self, endpoint, remote_endpoint):
210         self.verify(endpoint)
211
212     def terminate_connection(self, endpoint, remote_endpoint):
213         # Nothing to do
214         return
215
216     def check_state_connection(self):
217         # Make sure the process is still running in background
218         # No execution errors occurred. Make sure the background
219         # process with the recorded pid is still running.
220
221         node1 = self.endpoint_node(self.endpoint1) 
222         node2 = self.endpoint_node(self.endpoint2) 
223         run_home1 = self.run_home(self.endpoint1)
224         run_home2 = self.run_home(self.endpoint1)
225         (pid1, ppid1) = self._pids[endpoint1]
226         (pid2, ppid2) = self._pids[endpoint2]
227         
228         status1 = node1.status(pid1, ppid1)
229         status2 = node2.status(pid2, ppid2)
230
231         if status1 == ProcStatus.FINISHED and \
232                 status2 == ProcStatus.FINISHED:
233
234             # check if execution errors occurred
235             (out1, err1), proc1 = node1.check_errors(run_home1)
236             (out2, err2), proc2 = node2.check_errors(run_home2)
237
238             if err1 or err2: 
239                 msg = "Error occurred in tunnel"
240                 self.error(msg, err1, err2)
241                 self.fail()
242             else:
243                 self.set_stopped()
244
245     def wait_local_port(self, endpoint):
246         """ Waits until the local_port file for the endpoint is generated, 
247         and returns the port number 
248         
249         """
250         return self.wait_file(endpoint, "local_port")
251
252     def wait_result(self, endpoint):
253         """ Waits until the return code file for the endpoint is generated 
254         
255         """ 
256         return self.wait_file(endpoint, "ret_file")
257  
258     def wait_file(self, endpoint, filename):
259         """ Waits until file on endpoint is generated """
260         result = None
261         delay = 1.0
262         
263         node = self.endpoint_node(endpoint) 
264         run_home = self.run_home(endpoint)
265
266         for i in xrange(20):
267             (out, err), proc = node.check_output(run_home, filename)
268
269             if out:
270                 result = out.strip()
271                 break
272             else:
273                 time.sleep(delay)
274                 delay = delay * 1.5
275         else:
276             msg = "Couldn't retrieve %s" % filename
277             self.error(msg, out, err)
278             raise RuntimeError, msg
279
280         return result
281
282     def initiate(self, endpoint, remote_endpoint, address, cipher, cipher_key, 
283             bwlimit, txqueuelen):
284
285         command = self._initiate_command(endpoint, remote_endpoint, 
286                 address, cipher, cipher_key, bwlimit, txqueuelen)
287
288         node = self.endpoint_node(endpoint) 
289         run_home = self.run_home(endpoint)
290         app_home = self.app_home(endpoint)
291
292         # upload command to connect.sh script
293         shfile = os.path.join(app_home, "fd-udp-connect.sh")
294         node.upload_command(command,
295                 shfile = shfile,
296                 overwrite = False)
297
298         # invoke connect script
299         cmd = "bash %s" % shfile
300         (out, err), proc = node.run(cmd, run_home) 
301              
302         # check if execution errors occurred
303         msg = "Failed to connect endpoints "
304         
305         if proc.poll():
306             self.error(msg, out, err)
307             raise RuntimeError, msg
308     
309         # Wait for pid file to be generated
310         pid, ppid = node.wait_pid(run_home)
311
312         self._pids[endpoint] = (pid, ppid)
313         
314         # Check for error information on the remote machine
315         (out, err), proc = node.check_errors(run_home)
316         # Out is what was written in the stderr file
317         if err:
318             msg = " Failed to start command '%s' " % command
319             self.error(msg, out, err)
320             raise RuntimeError, msg
321
322         port = self.wait_local_port(endpoint)
323
324         return port
325
326     def _initiate_command(self, endpoint, remote_endpoint, address,
327             cipher, cipher_key, bwlimit, txqueuelen):
328         local_node = self.endpoint_node(endpoint) 
329         local_run_home = self.run_home(endpoint)
330         local_app_home = self.app_home(endpoint)
331         remote_node = self.endpoint_node(remote_endpoint) 
332
333         local_ip = local_node.get("ip")
334         remote_ip = remote_node.get("ip")
335
336         local_port_file = os.path.join(local_run_home, "local_port")
337         remote_port_file = os.path.join(local_run_home,  "remote_port")
338         ret_file = os.path.join(local_run_home, "ret_file")
339
340         address = base64.b64encode(address)
341         
342         command = [""]
343         command.append("PYTHONPATH=$PYTHONPATH:${SRC}")
344         command.append("python ${SRC}/pl-fd-udp-connect.py")
345         command.append("-a %s" % address)
346         command.append("-p %s " % local_port_file)
347         command.append("-P %s " % remote_port_file)
348         command.append("-o %s " % local_ip)
349         command.append("-O %s " % remote_ip)
350         command.append("-R %s " % ret_file)
351         if cipher:
352             command.append("-c %s " % cipher)
353         if cipher_key:
354             command.append("-k %s " % cipher_key)
355         if txqueuelen:
356             command.append("-q %s " % txqueuelen)
357         if bwlimit:
358             command.append("-b %s " % bwlimit)
359
360         command = " ".join(command)
361         command = self.replace_paths(command, node=local_node, 
362                 app_home=local_app_home, run_home=local_run_home)
363
364         return command
365
366     def establish(self, endpoint, remote_endpoint, port):
367         node = self.endpoint_node(endpoint) 
368         run_home = self.run_home(endpoint)
369
370         # upload remote port number to file
371         remote_port = "%s\n" % port
372         node.upload(remote_port,
373                 os.path.join(run_home, "remote_port"),
374                 text = True, 
375                 overwrite = False)
376
377     def verify(self, endpoint):
378         self.wait_result(endpoint)
379