Tunnel between 2 ns-3s in remote PL hosts:q
[nepi.git] / src / nepi / resources / linux / ns3 / p2pfdudptunnel.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import clsinit_copy, ResourceState
22 from nepi.resources.linux.udptunnel import LinuxUdpTunnel
23 from nepi.util.sshfuncs import ProcStatus
24 from nepi.util.timefuncs import tnow, tdiffsec
25
26 import base64
27 import os
28 import socket
29 import time
30
31 @clsinit_copy
32 class LinuxNs3FdUdpTunnel(LinuxUdpTunnel):
33     _rtype = "linux::ns3::FdUdpTunnel"
34     _help = "Constructs a tunnel between two Ns-3 FdNetdevices " \
35             "located in remote Linux nodes using a UDP connection "
36     _platform = "linux::ns3"
37
38     @classmethod
39     def _register_attributes(cls):
40         cipher = Attribute("cipher",
41                "Cipher to encript communication. "
42                 "One of PLAIN, AES, Blowfish, DES, DES3. ",
43                 default = None,
44                 allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
45                 type = Types.Enumerate, 
46                 flags = Flags.Design)
47
48         cipher_key = Attribute("cipherKey",
49                 "Specify a symmetric encryption key with which to protect "
50                 "packets across the tunnel. python-crypto must be installed "
51                 "on the system." ,
52                 flags = Flags.Design)
53
54         txqueuelen = Attribute("txQueueLen",
55                 "Specifies the interface's transmission queue length. "
56                 "Defaults to 1000. ", 
57                 type = Types.Integer, 
58                 flags = Flags.Design)
59
60         bwlimit = Attribute("bwLimit",
61                 "Specifies the interface's emulated bandwidth in bytes "
62                 "per second.",
63                 type = Types.Integer, 
64                 flags = Flags.Design)
65
66         cls._register_attribute(cipher)
67         cls._register_attribute(cipher_key)
68         cls._register_attribute(txqueuelen)
69         cls._register_attribute(bwlimit)
70
71     def __init__(self, ec, guid):
72         super(LinuxUdpTunnel, self).__init__(ec, guid)
73         self._home = "fd-udp-tunnel-%s" % self.guid
74         self._pids = dict()
75         self._fd1 = None
76         self._fd1node = None
77         self._fd2 = None
78         self._fd2node = None
79
80     def log_message(self, msg):
81         self.get_endpoints()
82         return " guid %d - fd-udptunnel %s - %s - %s " % (self.guid, 
83                 self.node1.get("hostname"), 
84                 self.node2.get("hostname"), 
85                 msg)
86
87     def get_endpoints(self):
88         """ Returns the list of RM that are endpoints to the tunnel 
89         """
90         if not self._fd2 or not self._fd1:
91             from nepi.resources.ns3.ns3fdnetdevice import NS3BaseFdNetDevice
92             devices = self.get_connected(NS3BaseFdNetDevice.get_rtype())
93             if not devices or len(devices) != 2: 
94                 msg = "linux::ns3::TunTapFdLink must be connected to exactly one FdNetDevice"
95                 self.error(msg)
96                 raise RuntimeError, msg
97
98             self._fd1 = devices[0]
99             self._fd2 = devices[1]
100         
101             simu = self._fd1.simulation
102             from nepi.resources.linux.node import LinuxNode
103             nodes = simu.get_connected(LinuxNode.get_rtype())
104             self._fd1node = nodes[0]
105      
106             simu = self._fd2.simulation
107             from nepi.resources.linux.node import LinuxNode
108             nodes = simu.get_connected(LinuxNode.get_rtype())
109             self._fd2node = nodes[0]
110
111             if self._fd1node.get("hostname") == \
112                     self._fd2node.get("hostname"):
113                 msg = "linux::ns3::FdUdpTunnel requires endpoints on different hosts"
114                 self.error(msg)
115                 raise RuntimeError, msg
116
117         return [self._fd1, self._fd2]
118
119     @property
120     def endpoint1(self):
121         return self._fd1
122
123     @property
124     def endpoint2(self):
125         return self._fd2
126
127     @property
128     def node1(self):
129         return self._fd1node
130
131     @property
132     def node2(self):
133         return self._fd2node
134
135     def endpoint_node(self, endpoint):
136         node = None
137         if endpoint == self.endpoint1:
138             node = self.node1
139         else:
140             node = self.node2
141
142         return node
143  
144     def app_home(self, endpoint):
145         node = self.endpoint_node(endpoint)
146         return os.path.join(node.exp_home, self._home)
147
148     def run_home(self, endpoint):
149         return os.path.join(self.app_home(endpoint), self.ec.run_id)
150
151     def upload_sources(self, endpoint):
152         scripts = []
153
154         # vif-passfd python script
155         linux_passfd = os.path.join(os.path.dirname(__file__),
156                 "..",
157                 "scripts",
158                 "linux-ns3-fd-udp-connect.py")
159
160         scripts.append(linux_passfd)
161        
162         # tunnel creation python script
163         tunchannel = os.path.join(os.path.dirname(__file__), 
164                 "..", 
165                 "scripts", 
166                 "tunchannel.py")
167
168         scripts.append(tunchannel)
169
170         # Upload scripts
171         scripts = ";".join(scripts)
172
173         node = self.endpoint_node(endpoint)
174         node.upload(scripts,
175                 os.path.join(node.src_dir),
176                 overwrite = False)
177
178     def endpoint_mkdir(self, endpoint):
179         node = self.endpoint_node(endpoint) 
180         run_home = self.run_home(endpoint)
181         node.mkdir(run_home)
182
183     def initiate_connection(self, endpoint, remote_endpoint):
184         cipher = self.get("cipher")
185         cipher_key = self.get("cipherKey")
186         bwlimit = self.get("bwLimit")
187         txqueuelen = self.get("txQueueLen")
188
189         # Upload the tunnel creating script
190         self.upload_sources(endpoint)
191
192         # Request an address to send the file descriptor to the ns-3 simulation
193         address = endpoint.recv_fd()
194
195         # execute the tunnel creation script
196         node = self.endpoint_node(remote_endpoint) 
197         port = self.initiate(endpoint, remote_endpoint, address, cipher, 
198                 cipher_key, bwlimit, txqueuelen)
199
200         return port
201
202     def establish_connection(self, endpoint, remote_endpoint, port):
203         self.establish(endpoint, remote_endpoint, port)
204
205     def verify_connection(self, endpoint, remote_endpoint):
206         self.verify(endpoint)
207
208     def terminate_connection(self, endpoint, remote_endpoint):
209         # Nothing to do
210         return
211
212     def check_state_connection(self):
213         # Make sure the process is still running in background
214         # No execution errors occurred. Make sure the background
215         # process with the recorded pid is still running.
216
217         node1 = self.endpoint_node(self.endpoint1) 
218         node2 = self.endpoint_node(self.endpoint2) 
219         run_home1 = self.run_home(self.endpoint1)
220         run_home2 = self.run_home(self.endpoint1)
221         (pid1, ppid1) = self._pids[endpoint1]
222         (pid2, ppid2) = self._pids[endpoint2]
223         
224         status1 = node1.status(pid1, ppid1)
225         status2 = node2.status(pid2, ppid2)
226
227         if status1 == ProcStatus.FINISHED and \
228                 status2 == ProcStatus.FINISHED:
229
230             # check if execution errors occurred
231             (out1, err1), proc1 = node1.check_errors(run_home1)
232             (out2, err2), proc2 = node2.check_errors(run_home2)
233
234             if err1 or err2: 
235                 msg = "Error occurred in tunnel"
236                 self.error(msg, err1, err2)
237                 self.fail()
238             else:
239                 self.set_stopped()
240
241     def wait_local_port(self, endpoint):
242         """ Waits until the local_port file for the endpoint is generated, 
243         and returns the port number 
244         
245         """
246         return self.wait_file(endpoint, "local_port")
247
248     def wait_result(self, endpoint):
249         """ Waits until the return code file for the endpoint is generated 
250         
251         """ 
252         return self.wait_file(endpoint, "ret_file")
253  
254     def wait_file(self, endpoint, filename):
255         """ Waits until file on endpoint is generated """
256         result = None
257         delay = 1.0
258         
259         node = self.endpoint_node(endpoint) 
260         run_home = self.run_home(endpoint)
261
262         for i in xrange(20):
263             (out, err), proc = node.check_output(run_home, filename)
264
265             if out:
266                 result = out.strip()
267                 break
268             else:
269                 time.sleep(delay)
270                 delay = delay * 1.5
271         else:
272             msg = "Couldn't retrieve %s" % filename
273             self.error(msg, out, err)
274             raise RuntimeError, msg
275
276         return result
277
278     def initiate(self, endpoint, remote_endpoint, address, cipher, cipher_key, 
279             bwlimit, txqueuelen):
280
281         command = self._initiate_command(endpoint, remote_endpoint, 
282                 address, cipher, cipher_key, bwlimit, txqueuelen)
283
284         node = self.endpoint_node(endpoint) 
285         run_home = self.run_home(endpoint)
286         app_home = self.app_home(endpoint)
287
288         # upload command to connect.sh script
289         shfile = os.path.join(app_home, "fd-udp-connect.sh")
290         node.upload_command(command,
291                 shfile = shfile,
292                 overwrite = False)
293
294         # invoke connect script
295         cmd = "bash %s" % shfile
296         (out, err), proc = node.run(cmd, run_home) 
297              
298         # check if execution errors occurred
299         msg = "Failed to connect endpoints "
300         
301         if proc.poll():
302             self.error(msg, out, err)
303             raise RuntimeError, msg
304     
305         # Wait for pid file to be generated
306         pid, ppid = node.wait_pid(run_home)
307
308         self._pids[endpoint] = (pid, ppid)
309         
310         # Check for error information on the remote machine
311         (out, err), proc = node.check_errors(run_home)
312         # Out is what was written in the stderr file
313         if err:
314             msg = " Failed to start command '%s' " % command
315             self.error(msg, out, err)
316             raise RuntimeError, msg
317
318         port = self.wait_local_port(endpoint)
319
320         return port
321
322     def _initiate_command(self, endpoint, remote_endpoint, address,
323             cipher, cipher_key, bwlimit, txqueuelen):
324         local_node = self.endpoint_node(endpoint) 
325         local_run_home = self.run_home(endpoint)
326         local_app_home = self.app_home(endpoint)
327         remote_node = self.endpoint_node(remote_endpoint) 
328
329         local_ip = local_node.get("ip")
330         remote_ip = remote_node.get("ip")
331
332         local_port_file = os.path.join(local_run_home, "local_port")
333         remote_port_file = os.path.join(local_run_home,  "remote_port")
334         ret_file = os.path.join(local_run_home, "ret_file")
335
336         address = base64.b64encode(address)
337         
338         command = [""]
339         command.append("PYTHONPATH=$PYTHONPATH:${SRC}")
340         command.append("python ${SRC}/linux-ns3-fd-udp-connect.py")
341         command.append("-a %s" % address)
342         command.append("-p %s " % local_port_file)
343         command.append("-P %s " % remote_port_file)
344         command.append("-o %s " % local_ip)
345         command.append("-O %s " % remote_ip)
346         command.append("-R %s " % ret_file)
347         if cipher:
348             command.append("-c %s " % cipher)
349         if cipher_key:
350             command.append("-k %s " % cipher_key)
351         if txqueuelen:
352             command.append("-q %s " % txqueuelen)
353         if bwlimit:
354             command.append("-b %s " % bwlimit)
355
356         command = " ".join(command)
357         command = self.replace_paths(command, node=local_node, 
358                 app_home=local_app_home, run_home=local_run_home)
359
360         return command
361
362     def establish(self, endpoint, remote_endpoint, port):
363         node = self.endpoint_node(endpoint) 
364         run_home = self.run_home(endpoint)
365
366         # upload remote port number to file
367         remote_port = "%s\n" % port
368         node.upload(remote_port,
369                 os.path.join(run_home, "remote_port"),
370                 text = True, 
371                 overwrite = False)
372
373     def verify(self, endpoint):
374         self.wait_result(endpoint)
375