Fix #128 - [NS3] Test ns-3 with localhost
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 ## TODO: This code needs reviewing !!!
22
23 import base64
24 import errno
25 import hashlib
26 import logging
27 import os
28 import os.path
29 import re
30 import select
31 import signal
32 import socket
33 import subprocess
34 import threading
35 import time
36 import tempfile
37
38 logger = logging.getLogger("sshfuncs")
39
40 def log(msg, level, out = None, err = None):
41     if out:
42         msg += " - OUT: %s " % out
43
44     if err:
45         msg += " - ERROR: %s " % err
46
47     logger.log(level, msg)
48
49 if hasattr(os, "devnull"):
50     DEV_NULL = os.devnull
51 else:
52     DEV_NULL = "/dev/null"
53
54 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
55
56 class STDOUT: 
57     """
58     Special value that when given to rspawn in stderr causes stderr to 
59     redirect to whatever stdout was redirected to.
60     """
61
62 class ProcStatus:
63     """
64     Codes for status of remote spawned process
65     """
66     # Process is still running
67     RUNNING = 1
68
69     # Process is finished
70     FINISHED = 2
71     
72     # Process hasn't started running yet (this should be very rare)
73     NOT_STARTED = 3
74
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
77
78 def gethostbyname(host):
79     global hostbyname_cache
80     global hostbyname_cache_lock
81     
82     hostbyname = hostbyname_cache.get(host)
83     if not hostbyname:
84         with hostbyname_cache_lock:
85             hostbyname = socket.gethostbyname(host)
86             hostbyname_cache[host] = hostbyname
87
88             msg = " Added hostbyname %s - %s " % (host, hostbyname)
89             log(msg, logging.DEBUG)
90
91     return hostbyname
92
93 OPENSSH_HAS_PERSIST = None
94
95 def openssh_has_persist():
96     """ The ssh_config options ControlMaster and ControlPersist allow to
97     reuse a same network connection for multiple ssh sessions. In this 
98     way limitations on number of open ssh connections can be bypassed.
99     However, older versions of openSSH do not support this feature.
100     This function is used to determine if ssh connection persist features
101     can be used.
102     """
103     global OPENSSH_HAS_PERSIST
104     if OPENSSH_HAS_PERSIST is None:
105         proc = subprocess.Popen(["ssh","-v"],
106             stdout = subprocess.PIPE,
107             stderr = subprocess.STDOUT,
108             stdin = open("/dev/null","r") )
109         out,err = proc.communicate()
110         proc.wait()
111         
112         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
113         OPENSSH_HAS_PERSIST = bool(vre.match(out))
114     return OPENSSH_HAS_PERSIST
115
116 def make_server_key_args(server_key, host, port):
117     """ Returns a reference to a temporary known_hosts file, to which 
118     the server key has been added. 
119     
120     Make sure to hold onto the temp file reference until the process is 
121     done with it
122
123     :param server_key: the server public key
124     :type server_key: str
125
126     :param host: the hostname
127     :type host: str
128
129     :param port: the ssh port
130     :type port: str
131
132     """
133     if port is not None:
134         host = '%s:%s' % (host, str(port))
135
136     # Create a temporary server key file
137     tmp_known_hosts = tempfile.NamedTemporaryFile()
138    
139     hostbyname = gethostbyname(host) 
140
141     # Add the intended host key
142     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
143     
144     # If we're not in strict mode, add user-configured keys
145     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
146         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
147         if os.access(user_hosts_path, os.R_OK):
148             f = open(user_hosts_path, "r")
149             tmp_known_hosts.write(f.read())
150             f.close()
151         
152     tmp_known_hosts.flush()
153     
154     return tmp_known_hosts
155
156 def make_control_path(agent, forward_x11):
157     ctrl_path = "/tmp/nepi_ssh"
158
159     if agent:
160         ctrl_path +="_a"
161
162     if forward_x11:
163         ctrl_path +="_x"
164
165     ctrl_path += "-%r@%h:%p"
166
167     return ctrl_path
168
169 def shell_escape(s):
170     """ Escapes strings so that they are safe to use as command-line 
171     arguments """
172     if SHELL_SAFE.match(s):
173         # safe string - no escaping needed
174         return s
175     else:
176         # unsafe string - escape
177         def escp(c):
178             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
179                 return c
180             else:
181                 return "'$'\\x%02x''" % (ord(c),)
182         s = ''.join(map(escp,s))
183         return "'%s'" % (s,)
184
185 def eintr_retry(func):
186     """Retries a function invocation when a EINTR occurs"""
187     import functools
188     @functools.wraps(func)
189     def rv(*p, **kw):
190         retry = kw.pop("_retry", False)
191         for i in xrange(0 if retry else 4):
192             try:
193                 return func(*p, **kw)
194             except (select.error, socket.error), args:
195                 if args[0] == errno.EINTR:
196                     continue
197                 else:
198                     raise 
199             except OSError, e:
200                 if e.errno == errno.EINTR:
201                     continue
202                 else:
203                     raise
204         else:
205             return func(*p, **kw)
206     return rv
207
208 def rexec(command, host, user, 
209         port = None,
210         gwuser = None,
211         gw = None, 
212         agent = True,
213         sudo = False,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         connect_timeout = 30,
219         retry = 3,
220         persistent = True,
221         forward_x11 = False,
222         blocking = True,
223         strict_host_checking = True):
224     """
225     Executes a remote command, returns ((stdout,stderr),process)
226     """
227     
228     tmp_known_hosts = None
229     if not gw:
230         hostip = gethostbyname(host)
231     else: hostip = None
232
233     args = ['ssh', '-C',
234             # Don't bother with localhost. Makes test easier
235             '-o', 'NoHostAuthenticationForLocalhost=yes',
236             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
237             '-o', 'ConnectionAttempts=3',
238             '-o', 'ServerAliveInterval=30',
239             '-o', 'TCPKeepAlive=yes',
240             '-o', 'Batchmode=yes',
241             '-l', user, hostip or host]
242
243     if persistent and openssh_has_persist():
244         args.extend([
245             '-o', 'ControlMaster=auto',
246             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
247             '-o', 'ControlPersist=60' ])
248
249     if not strict_host_checking:
250         # Do not check for Host key. Unsafe.
251         args.extend(['-o', 'StrictHostKeyChecking=no'])
252
253     if gw:
254         if gwuser:
255             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
256         else:
257             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
258         args.extend(['-o', proxycommand])
259
260     if agent:
261         args.append('-A')
262
263     if port:
264         args.append('-p%d' % port)
265
266     if identity:
267         identity = os.path.expanduser(identity)
268         args.extend(('-i', identity))
269
270     if tty:
271         args.append('-t')
272         args.append('-t')
273
274     if forward_x11:
275         args.append('-X')
276
277     if server_key:
278         # Create a temporary server key file
279         tmp_known_hosts = make_server_key_args(server_key, host, port)
280         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
281
282     if sudo:
283         command = "sudo " + command
284
285     args.append(command)
286     
287     log_msg = " rexec - host %s - command %s " % (host, " ".join(args))
288
289     stdout = stderr = stdin = subprocess.PIPE
290     if forward_x11:
291         stdout = stderr = stdin = None
292
293     return _retry_rexec(args, log_msg, 
294             stderr = stderr,
295             stdin = stdin,
296             stdout = stdout,
297             env = env, 
298             retry = retry, 
299             tmp_known_hosts = tmp_known_hosts,
300             blocking = blocking)
301
302 def rcopy(source, dest,
303         port = None,
304         gwuser = None,
305         gw = None,
306         recursive = False,
307         identity = None,
308         server_key = None,
309         retry = 3,
310         strict_host_checking = True):
311     """
312     Copies from/to remote sites.
313     
314     Source and destination should have the user and host encoded
315     as per scp specs.
316     
317     Source can be a list of files to copy to a single destination, 
318     (in which case it is advised that the destination be a folder),
319     a single file in a string or a semi-colon separated list of files
320     in a string.
321     """
322
323     # Parse destination as <user>@<server>:<path>
324     if isinstance(dest, str) and ':' in dest:
325         remspec, path = dest.split(':',1)
326     elif isinstance(source, str) and ':' in source:
327         remspec, path = source.split(':',1)
328     else:
329         raise ValueError, "Both endpoints cannot be local"
330     user,host = remspec.rsplit('@',1)
331     
332     # plain scp
333     tmp_known_hosts = None
334
335     args = ['scp', '-q', '-p', '-C',
336             # Speed up transfer using blowfish cypher specification which is 
337             # faster than the default one (3des)
338             '-c', 'blowfish',
339             # Don't bother with localhost. Makes test easier
340             '-o', 'NoHostAuthenticationForLocalhost=yes',
341             '-o', 'ConnectTimeout=60',
342             '-o', 'ConnectionAttempts=3',
343             '-o', 'ServerAliveInterval=30',
344             '-o', 'TCPKeepAlive=yes' ]
345             
346     if port:
347         args.append('-P%d' % port)
348
349     if gw:
350         if gwuser:
351             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
352         else:
353             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
354         args.extend(['-o', proxycommand])
355
356     if recursive:
357         args.append('-r')
358
359     if identity:
360         identity = os.path.expanduser(identity)
361         args.extend(('-i', identity))
362
363     if server_key:
364         # Create a temporary server key file
365         tmp_known_hosts = make_server_key_args(server_key, host, port)
366         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
367
368     if not strict_host_checking:
369         # Do not check for Host key. Unsafe.
370         args.extend(['-o', 'StrictHostKeyChecking=no'])
371     
372     if isinstance(source, list):
373         args.extend(source)
374     else:
375         if openssh_has_persist():
376             args.extend([
377                 '-o', 'ControlMaster=auto',
378                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
379                 ])
380         args.append(source)
381
382     if isinstance(dest, list):
383         args.extend(dest)
384     else:
385         args.append(dest)
386
387     log_msg = " rcopy - host %s - command %s " % (host, " ".join(args))
388     
389     return _retry_rexec(args, log_msg, env = None, retry = retry, 
390             tmp_known_hosts = tmp_known_hosts,
391             blocking = True)
392
393 def rspawn(command, pidfile, 
394         stdout = '/dev/null', 
395         stderr = STDOUT, 
396         stdin = '/dev/null',
397         home = None, 
398         create_home = False, 
399         sudo = False,
400         host = None, 
401         port = None, 
402         user = None, 
403         gwuser = None,
404         gw = None,
405         agent = None, 
406         identity = None, 
407         server_key = None,
408         tty = False):
409     """
410     Spawn a remote command such that it will continue working asynchronously in 
411     background. 
412
413         :param command: The command to run, it should be a single line.
414         :type command: str
415
416         :param pidfile: Path to a file where to store the pid and ppid of the 
417                         spawned process
418         :type pidfile: str
419
420         :param stdout: Path to file to redirect standard output. 
421                        The default value is /dev/null
422         :type stdout: str
423
424         :param stderr: Path to file to redirect standard error.
425                        If the special STDOUT value is used, stderr will 
426                        be redirected to the same file as stdout
427         :type stderr: str
428
429         :param stdin: Path to a file with input to be piped into the command's standard input
430         :type stdin: str
431
432         :param home: Path to working directory folder. 
433                     It is assumed to exist unless the create_home flag is set.
434         :type home: str
435
436         :param create_home: Flag to force creation of the home folder before 
437                             running the command
438         :type create_home: bool
439  
440         :param sudo: Flag forcing execution with sudo user
441         :type sudo: bool
442         
443         :rtype: touple
444
445         (stdout, stderr), process
446         
447         Of the spawning process, which only captures errors at spawning time.
448         Usually only useful for diagnostics.
449     """
450     # Start process in a "daemonized" way, using nohup and heavy
451     # stdin/out redirection to avoid connection issues
452     if stderr is STDOUT:
453         stderr = '&1'
454     else:
455         stderr = ' ' + stderr
456     
457     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
458         'command' : command,
459         'pidfile' : shell_escape(pidfile),
460         'stdout' : stdout,
461         'stderr' : stderr,
462         'stdin' : stdin,
463     }
464     
465     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
466             'command' : shell_escape(daemon_command),
467             'sudo' : 'sudo -S' if sudo else '',
468             'pidfile' : shell_escape(pidfile),
469             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
470             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
471         }
472
473     (out,err),proc = rexec(
474         cmd,
475         host = host,
476         port = port,
477         user = user,
478         gwuser = gwuser,
479         gw = gw,
480         agent = agent,
481         identity = identity,
482         server_key = server_key,
483         tty = tty ,
484         )
485     
486     if proc.wait():
487         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
488
489     return ((out, err), proc)
490
491 @eintr_retry
492 def rgetpid(pidfile,
493         host = None, 
494         port = None, 
495         user = None, 
496         gwuser = None,
497         gw = None,
498         agent = None, 
499         identity = None,
500         server_key = None):
501     """
502     Returns the pid and ppid of a process from a remote file where the 
503     information was stored.
504
505         :param home: Path to directory where the pidfile is located
506         :type home: str
507
508         :param pidfile: Name of file containing the pid information
509         :type pidfile: str
510         
511         :rtype: int
512         
513         A (pid, ppid) tuple useful for calling rstatus and rkill,
514         or None if the pidfile isn't valid yet (can happen when process is staring up)
515
516     """
517     (out,err),proc = rexec(
518         "cat %(pidfile)s" % {
519             'pidfile' : pidfile,
520         },
521         host = host,
522         port = port,
523         user = user,
524         gwuser = gwuser,
525         gw = gw,
526         agent = agent,
527         identity = identity,
528         server_key = server_key
529         )
530         
531     if proc.wait():
532         return None
533     
534     if out:
535         try:
536             return map(int,out.strip().split(' ',1))
537         except:
538             # Ignore, many ways to fail that don't matter that much
539             return None
540
541 @eintr_retry
542 def rstatus(pid, ppid, 
543         host = None, 
544         port = None, 
545         user = None, 
546         gwuser = None,
547         gw = None,
548         agent = None, 
549         identity = None,
550         server_key = None):
551     """
552     Returns a code representing the the status of a remote process
553
554         :param pid: Process id of the process
555         :type pid: int
556
557         :param ppid: Parent process id of process
558         :type ppid: int
559     
560         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
561     
562     """
563     (out,err),proc = rexec(
564         # Check only by pid. pid+ppid does not always work (especially with sudo) 
565         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
566             'ppid' : ppid,
567             'pid' : pid,
568         },
569         host = host,
570         port = port,
571         user = user,
572         gwuser = gwuser,
573         gw = gw,
574         agent = agent,
575         identity = identity,
576         server_key = server_key
577         )
578     
579     if proc.wait():
580         return ProcStatus.NOT_STARTED
581     
582     status = False
583     if err:
584         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
585             status = True
586     elif out:
587         status = (out.strip() == 'wait')
588     else:
589         return ProcStatus.NOT_STARTED
590     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
591
592 @eintr_retry
593 def rkill(pid, ppid,
594         host = None, 
595         port = None, 
596         user = None, 
597         gwuser = None,
598         gw = None,
599         agent = None, 
600         sudo = False,
601         identity = None, 
602         server_key = None, 
603         nowait = False):
604     """
605     Sends a kill signal to a remote process.
606
607     First tries a SIGTERM, and if the process does not end in 10 seconds,
608     it sends a SIGKILL.
609  
610         :param pid: Process id of process to be killed
611         :type pid: int
612
613         :param ppid: Parent process id of process to be killed
614         :type ppid: int
615
616         :param sudo: Flag indicating if sudo should be used to kill the process
617         :type sudo: bool
618         
619     """
620     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
621     cmd = """
622 SUBKILL="%(subkill)s" ;
623 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
624 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
625 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
626     sleep 0.2 
627     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
628         break
629     else
630         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
631         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
632     fi
633     sleep 1.8
634 done
635 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
636     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
637     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
638 fi
639 """
640     if nowait:
641         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
642
643     (out,err),proc = rexec(
644         cmd % {
645             'ppid' : ppid,
646             'pid' : pid,
647             'sudo' : 'sudo -S' if sudo else '',
648             'subkill' : subkill,
649         },
650         host = host,
651         port = port,
652         user = user,
653         gwuser = gwuser,
654         gw = gw,
655         agent = agent,
656         identity = identity,
657         server_key = server_key
658         )
659     
660     # wait, don't leave zombies around
661     proc.wait()
662
663     return (out, err), proc
664
665 def _retry_rexec(args,
666         log_msg,
667         stdout = subprocess.PIPE,
668         stdin = subprocess.PIPE, 
669         stderr = subprocess.PIPE,
670         env = None,
671         retry = 3,
672         tmp_known_hosts = None,
673         blocking = True):
674
675     for x in xrange(retry):
676         # connects to the remote host and starts a remote connection
677         proc = subprocess.Popen(args,
678                 env = env,
679                 stdout = stdout,
680                 stdin = stdin, 
681                 stderr = stderr)
682         
683         # attach tempfile object to the process, to make sure the file stays
684         # alive until the process is finished with it
685         proc._known_hosts = tmp_known_hosts
686     
687         # The argument block == False forces to rexec to return immediately, 
688         # without blocking 
689         try:
690             err = out = " "
691             if blocking:
692                 (out, err) = proc.communicate()
693             elif stdout:
694                 out = proc.stdout.read()
695                 if proc.poll() and stderr:
696                     err = proc.stderr.read()
697
698             log(log_msg, logging.DEBUG, out, err)
699
700             if proc.poll():
701                 skip = False
702
703                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
704                     # SSH error, can safely retry
705                     skip = True 
706                 elif retry:
707                     # Probably timed out or plain failed but can retry
708                     skip = True 
709                 
710                 if skip:
711                     t = x*2
712                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
713                             t, x, " ".join(args))
714                     log(msg, logging.DEBUG)
715
716                     time.sleep(t)
717                     continue
718             break
719         except RuntimeError, e:
720             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
721             log(msg, logging.DEBUG, out, err)
722
723             if retry <= 0:
724                 raise
725             retry -= 1
726         
727     return ((out, err), proc)
728