Merging ns-3 into nepi-3-dev
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 ## TODO: This code needs reviewing !!!
22
23 import base64
24 import errno
25 import hashlib
26 import logging
27 import os
28 import os.path
29 import re
30 import select
31 import signal
32 import socket
33 import subprocess
34 import threading
35 import time
36 import tempfile
37
38 logger = logging.getLogger("sshfuncs")
39
40 def log(msg, level, out = None, err = None):
41     if out:
42         msg += " - OUT: %s " % out
43
44     if err:
45         msg += " - ERROR: %s " % err
46
47     logger.log(level, msg)
48
49
50 if hasattr(os, "devnull"):
51     DEV_NULL = os.devnull
52 else:
53     DEV_NULL = "/dev/null"
54
55 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
56
57 class STDOUT: 
58     """
59     Special value that when given to rspawn in stderr causes stderr to 
60     redirect to whatever stdout was redirected to.
61     """
62
63 class ProcStatus:
64     """
65     Codes for status of remote spawned process
66     """
67     # Process is still running
68     RUNNING = 1
69
70     # Process is finished
71     FINISHED = 2
72     
73     # Process hasn't started running yet (this should be very rare)
74     NOT_STARTED = 3
75
76 hostbyname_cache = dict()
77 hostbyname_cache_lock = threading.Lock()
78
79 def gethostbyname(host):
80     global hostbyname_cache
81     global hostbyname_cache_lock
82     
83     hostbyname = hostbyname_cache.get(host)
84     if not hostbyname:
85         with hostbyname_cache_lock:
86             hostbyname = socket.gethostbyname(host)
87             hostbyname_cache[host] = hostbyname
88
89             msg = " Added hostbyname %s - %s " % (host, hostbyname)
90             log(msg, logging.DEBUG)
91
92     return hostbyname
93
94 OPENSSH_HAS_PERSIST = None
95
96 def openssh_has_persist():
97     """ The ssh_config options ControlMaster and ControlPersist allow to
98     reuse a same network connection for multiple ssh sessions. In this 
99     way limitations on number of open ssh connections can be bypassed.
100     However, older versions of openSSH do not support this feature.
101     This function is used to determine if ssh connection persist features
102     can be used.
103     """
104     global OPENSSH_HAS_PERSIST
105     if OPENSSH_HAS_PERSIST is None:
106         proc = subprocess.Popen(["ssh","-v"],
107             stdout = subprocess.PIPE,
108             stderr = subprocess.STDOUT,
109             stdin = open("/dev/null","r") )
110         out,err = proc.communicate()
111         proc.wait()
112         
113         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
114         OPENSSH_HAS_PERSIST = bool(vre.match(out))
115     return OPENSSH_HAS_PERSIST
116
117 def make_server_key_args(server_key, host, port):
118     """ Returns a reference to a temporary known_hosts file, to which 
119     the server key has been added. 
120     
121     Make sure to hold onto the temp file reference until the process is 
122     done with it
123
124     :param server_key: the server public key
125     :type server_key: str
126
127     :param host: the hostname
128     :type host: str
129
130     :param port: the ssh port
131     :type port: str
132
133     """
134     if port is not None:
135         host = '%s:%s' % (host, str(port))
136
137     # Create a temporary server key file
138     tmp_known_hosts = tempfile.NamedTemporaryFile()
139    
140     hostbyname = gethostbyname(host) 
141
142     # Add the intended host key
143     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
144     
145     # If we're not in strict mode, add user-configured keys
146     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
147         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
148         if os.access(user_hosts_path, os.R_OK):
149             f = open(user_hosts_path, "r")
150             tmp_known_hosts.write(f.read())
151             f.close()
152         
153     tmp_known_hosts.flush()
154     
155     return tmp_known_hosts
156
157 def make_control_path(agent, forward_x11):
158     ctrl_path = "/tmp/nepi_ssh"
159
160     if agent:
161         ctrl_path +="_a"
162
163     if forward_x11:
164         ctrl_path +="_x"
165
166     ctrl_path += "-%r@%h:%p"
167
168     return ctrl_path
169
170 def shell_escape(s):
171     """ Escapes strings so that they are safe to use as command-line 
172     arguments """
173     if SHELL_SAFE.match(s):
174         # safe string - no escaping needed
175         return s
176     else:
177         # unsafe string - escape
178         def escp(c):
179             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
180                 return c
181             else:
182                 return "'$'\\x%02x''" % (ord(c),)
183         s = ''.join(map(escp,s))
184         return "'%s'" % (s,)
185
186 def eintr_retry(func):
187     """Retries a function invocation when a EINTR occurs"""
188     import functools
189     @functools.wraps(func)
190     def rv(*p, **kw):
191         retry = kw.pop("_retry", False)
192         for i in xrange(0 if retry else 4):
193             try:
194                 return func(*p, **kw)
195             except (select.error, socket.error), args:
196                 if args[0] == errno.EINTR:
197                     continue
198                 else:
199                     raise 
200             except OSError, e:
201                 if e.errno == errno.EINTR:
202                     continue
203                 else:
204                     raise
205         else:
206             return func(*p, **kw)
207     return rv
208
209 def rexec(command, host, user, 
210         port = None,
211         gwuser = None,
212         gw = None, 
213         agent = True,
214         sudo = False,
215         identity = None,
216         server_key = None,
217         env = None,
218         tty = False,
219         connect_timeout = 30,
220         retry = 3,
221         persistent = True,
222         forward_x11 = False,
223         blocking = True,
224         strict_host_checking = True):
225     """
226     Executes a remote command, returns ((stdout,stderr),process)
227     """
228     
229     tmp_known_hosts = None
230     if not gw:
231         hostip = gethostbyname(host)
232     else: hostip = None
233
234     args = ['ssh', '-C',
235             # Don't bother with localhost. Makes test easier
236             '-o', 'NoHostAuthenticationForLocalhost=yes',
237             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
238             '-o', 'ConnectionAttempts=3',
239             '-o', 'ServerAliveInterval=30',
240             '-o', 'TCPKeepAlive=yes',
241             '-o', 'Batchmode=yes',
242             '-l', user, hostip or host]
243
244     if persistent and openssh_has_persist():
245         args.extend([
246             '-o', 'ControlMaster=auto',
247             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
248             '-o', 'ControlPersist=60' ])
249
250     if not strict_host_checking:
251         # Do not check for Host key. Unsafe.
252         args.extend(['-o', 'StrictHostKeyChecking=no'])
253
254     if gw:
255         if gwuser:
256             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
257         else:
258             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
259         args.extend(['-o', proxycommand])
260
261     if agent:
262         args.append('-A')
263
264     if port:
265         args.append('-p%d' % port)
266
267     if identity:
268         identity = os.path.expanduser(identity)
269         args.extend(('-i', identity))
270
271     if tty:
272         args.append('-t')
273         args.append('-t')
274
275     if forward_x11:
276         args.append('-X')
277
278     if server_key:
279         # Create a temporary server key file
280         tmp_known_hosts = make_server_key_args(server_key, host, port)
281         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
282
283     if sudo:
284         command = "sudo " + command
285
286     args.append(command)
287     
288     log_msg = " rexec - host %s - command %s " % (host, " ".join(args))
289
290     stdout = stderr = stdin = subprocess.PIPE
291     if forward_x11:
292         stdout = stderr = stdin = None
293
294     return _retry_rexec(args, log_msg, 
295             stderr = stderr,
296             stdin = stdin,
297             stdout = stdout,
298             env = env, 
299             retry = retry, 
300             tmp_known_hosts = tmp_known_hosts,
301             blocking = blocking)
302
303 def rcopy(source, dest,
304         port = None,
305         gwuser = None,
306         gw = None,
307         agent = True, 
308         recursive = False,
309         identity = None,
310         server_key = None,
311         retry = 3,
312         strict_host_checking = True):
313     """
314     Copies from/to remote sites.
315     
316     Source and destination should have the user and host encoded
317     as per scp specs.
318     
319     Source can be a list of files to copy to a single destination, 
320     (in which case it is advised that the destination be a folder),
321     a single file in a string or a semi-colon separated list of files
322     in a string.
323     """
324
325     # Parse destination as <user>@<server>:<path>
326     if isinstance(dest, str) and ':' in dest:
327         remspec, path = dest.split(':',1)
328     elif isinstance(source, str) and ':' in source:
329         remspec, path = source.split(':',1)
330     else:
331         raise ValueError, "Both endpoints cannot be local"
332     user,host = remspec.rsplit('@',1)
333     
334     # plain scp
335     tmp_known_hosts = None
336
337     args = ['scp', '-q', '-p', '-C',
338             # Speed up transfer using blowfish cypher specification which is 
339             # faster than the default one (3des)
340             '-c', 'blowfish',
341             # Don't bother with localhost. Makes test easier
342             '-o', 'NoHostAuthenticationForLocalhost=yes',
343             '-o', 'ConnectTimeout=60',
344             '-o', 'ConnectionAttempts=3',
345             '-o', 'ServerAliveInterval=30',
346             '-o', 'TCPKeepAlive=yes' ]
347             
348     if port:
349         args.append('-P%d' % port)
350
351     if gw:
352         if gwuser:
353             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
354         else:
355             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
356         args.extend(['-o', proxycommand])
357
358     if recursive:
359         args.append('-r')
360
361     if identity:
362         identity = os.path.expanduser(identity)
363         args.extend(('-i', identity))
364
365     if server_key:
366         # Create a temporary server key file
367         tmp_known_hosts = make_server_key_args(server_key, host, port)
368         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
369
370     if not strict_host_checking:
371         # Do not check for Host key. Unsafe.
372         args.extend(['-o', 'StrictHostKeyChecking=no'])
373     
374     if isinstance(source, list):
375         args.extend(source)
376     else:
377         if openssh_has_persist():
378             args.extend([
379                 '-o', 'ControlMaster=auto',
380                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
381                 ])
382         args.append(source)
383
384     args.append(dest)
385
386     log_msg = " rcopy - host %s - command %s " % (host, " ".join(args))
387     
388     return _retry_rexec(args, log_msg, env = None, retry = retry, 
389             tmp_known_hosts = tmp_known_hosts,
390             blocking = True)
391
392 def rspawn(command, pidfile, 
393         stdout = '/dev/null', 
394         stderr = STDOUT, 
395         stdin = '/dev/null',
396         home = None, 
397         create_home = False, 
398         sudo = False,
399         host = None, 
400         port = None, 
401         user = None, 
402         gwuser = None,
403         gw = None,
404         agent = None, 
405         identity = None, 
406         server_key = None,
407         tty = False):
408     """
409     Spawn a remote command such that it will continue working asynchronously in 
410     background. 
411
412         :param command: The command to run, it should be a single line.
413         :type command: str
414
415         :param pidfile: Path to a file where to store the pid and ppid of the 
416                         spawned process
417         :type pidfile: str
418
419         :param stdout: Path to file to redirect standard output. 
420                        The default value is /dev/null
421         :type stdout: str
422
423         :param stderr: Path to file to redirect standard error.
424                        If the special STDOUT value is used, stderr will 
425                        be redirected to the same file as stdout
426         :type stderr: str
427
428         :param stdin: Path to a file with input to be piped into the command's standard input
429         :type stdin: str
430
431         :param home: Path to working directory folder. 
432                     It is assumed to exist unless the create_home flag is set.
433         :type home: str
434
435         :param create_home: Flag to force creation of the home folder before 
436                             running the command
437         :type create_home: bool
438  
439         :param sudo: Flag forcing execution with sudo user
440         :type sudo: bool
441         
442         :rtype: touple
443
444         (stdout, stderr), process
445         
446         Of the spawning process, which only captures errors at spawning time.
447         Usually only useful for diagnostics.
448     """
449     # Start process in a "daemonized" way, using nohup and heavy
450     # stdin/out redirection to avoid connection issues
451     if stderr is STDOUT:
452         stderr = '&1'
453     else:
454         stderr = ' ' + stderr
455     
456     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
457         'command' : command,
458         'pidfile' : shell_escape(pidfile),
459         'stdout' : stdout,
460         'stderr' : stderr,
461         'stdin' : stdin,
462     }
463     
464     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
465             'command' : shell_escape(daemon_command),
466             'sudo' : 'sudo -S' if sudo else '',
467             'pidfile' : shell_escape(pidfile),
468             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
469             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
470         }
471
472     (out,err),proc = rexec(
473         cmd,
474         host = host,
475         port = port,
476         user = user,
477         gwuser = gwuser,
478         gw = gw,
479         agent = agent,
480         identity = identity,
481         server_key = server_key,
482         tty = tty ,
483         )
484     
485     if proc.wait():
486         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
487
488     return ((out, err), proc)
489
490 @eintr_retry
491 def rgetpid(pidfile,
492         host = None, 
493         port = None, 
494         user = None, 
495         gwuser = None,
496         gw = None,
497         agent = None, 
498         identity = None,
499         server_key = None):
500     """
501     Returns the pid and ppid of a process from a remote file where the 
502     information was stored.
503
504         :param home: Path to directory where the pidfile is located
505         :type home: str
506
507         :param pidfile: Name of file containing the pid information
508         :type pidfile: str
509         
510         :rtype: int
511         
512         A (pid, ppid) tuple useful for calling rstatus and rkill,
513         or None if the pidfile isn't valid yet (can happen when process is staring up)
514
515     """
516     (out,err),proc = rexec(
517         "cat %(pidfile)s" % {
518             'pidfile' : pidfile,
519         },
520         host = host,
521         port = port,
522         user = user,
523         gwuser = gwuser,
524         gw = gw,
525         agent = agent,
526         identity = identity,
527         server_key = server_key
528         )
529         
530     if proc.wait():
531         return None
532     
533     if out:
534         try:
535             return map(int,out.strip().split(' ',1))
536         except:
537             # Ignore, many ways to fail that don't matter that much
538             return None
539
540 @eintr_retry
541 def rstatus(pid, ppid, 
542         host = None, 
543         port = None, 
544         user = None, 
545         gwuser = None,
546         gw = None,
547         agent = None, 
548         identity = None,
549         server_key = None):
550     """
551     Returns a code representing the the status of a remote process
552
553         :param pid: Process id of the process
554         :type pid: int
555
556         :param ppid: Parent process id of process
557         :type ppid: int
558     
559         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
560     
561     """
562     (out,err),proc = rexec(
563         # Check only by pid. pid+ppid does not always work (especially with sudo) 
564         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
565             'ppid' : ppid,
566             'pid' : pid,
567         },
568         host = host,
569         port = port,
570         user = user,
571         gwuser = gwuser,
572         gw = gw,
573         agent = agent,
574         identity = identity,
575         server_key = server_key
576         )
577     
578     if proc.wait():
579         return ProcStatus.NOT_STARTED
580     
581     status = False
582     if err:
583         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
584             status = True
585     elif out:
586         status = (out.strip() == 'wait')
587     else:
588         return ProcStatus.NOT_STARTED
589     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
590
591 @eintr_retry
592 def rkill(pid, ppid,
593         host = None, 
594         port = None, 
595         user = None, 
596         gwuser = None,
597         gw = None,
598         agent = None, 
599         sudo = False,
600         identity = None, 
601         server_key = None, 
602         nowait = False):
603     """
604     Sends a kill signal to a remote process.
605
606     First tries a SIGTERM, and if the process does not end in 10 seconds,
607     it sends a SIGKILL.
608  
609         :param pid: Process id of process to be killed
610         :type pid: int
611
612         :param ppid: Parent process id of process to be killed
613         :type ppid: int
614
615         :param sudo: Flag indicating if sudo should be used to kill the process
616         :type sudo: bool
617         
618     """
619     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
620     cmd = """
621 SUBKILL="%(subkill)s" ;
622 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
623 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
624 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
625     sleep 0.2 
626     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
627         break
628     else
629         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
630         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
631     fi
632     sleep 1.8
633 done
634 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
635     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
636     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
637 fi
638 """
639     if nowait:
640         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
641
642     (out,err),proc = rexec(
643         cmd % {
644             'ppid' : ppid,
645             'pid' : pid,
646             'sudo' : 'sudo -S' if sudo else '',
647             'subkill' : subkill,
648         },
649         host = host,
650         port = port,
651         user = user,
652         gwuser = gwuser,
653         gw = gw,
654         agent = agent,
655         identity = identity,
656         server_key = server_key
657         )
658     
659     # wait, don't leave zombies around
660     proc.wait()
661
662     return (out, err), proc
663
664 def _retry_rexec(args,
665         log_msg,
666         stdout = subprocess.PIPE,
667         stdin = subprocess.PIPE, 
668         stderr = subprocess.PIPE,
669         env = None,
670         retry = 3,
671         tmp_known_hosts = None,
672         blocking = True):
673
674     for x in xrange(retry):
675         # connects to the remote host and starts a remote connection
676         proc = subprocess.Popen(args,
677                 env = env,
678                 stdout = stdout,
679                 stdin = stdin, 
680                 stderr = stderr)
681         
682         # attach tempfile object to the process, to make sure the file stays
683         # alive until the process is finished with it
684         proc._known_hosts = tmp_known_hosts
685     
686         # The argument block == False forces to rexec to return immediately, 
687         # without blocking 
688         try:
689             err = out = " "
690             if blocking:
691                 (out, err) = proc.communicate()
692             elif stdout:
693                 out = proc.stdout.read()
694                 if proc.poll() and stderr:
695                     err = proc.stderr.read()
696
697             log(log_msg, logging.DEBUG, out, err)
698
699             if proc.poll():
700                 skip = False
701
702                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
703                     # SSH error, can safely retry
704                     skip = True 
705                 elif retry:
706                     # Probably timed out or plain failed but can retry
707                     skip = True 
708                 
709                 if skip:
710                     t = x*2
711                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
712                             t, x, " ".join(args))
713                     log(msg, logging.DEBUG)
714
715                     time.sleep(t)
716                     continue
717             break
718         except RuntimeError, e:
719             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
720             log(msg, logging.DEBUG, out, err)
721
722             if retry <= 0:
723                 raise
724             retry -= 1
725         
726     return ((out, err), proc)
727