9ef92b8b2fd80eef0d7c10e34be2bb53ee7d09cf
[nepi.git] / src / nepi / util / server.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.util.constants import DeploymentConfiguration as DC
4
5 import base64
6 import errno
7 import os
8 import os.path
9 import resource
10 import select
11 import shutil
12 import signal
13 import socket
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import re
20 import tempfile
21 import defer
22 import functools
23 import collections
24 import hashlib
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 OPENSSH_HAS_PERSIST = None
36
37 if hasattr(os, "devnull"):
38     DEV_NULL = os.devnull
39 else:
40     DEV_NULL = "/dev/null"
41
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43
44 hostbyname_cache = dict()
45
46 def gethostbyname(host):
47     hostbyname = hostbyname_cache.get(host)
48     if not hostbyname:
49         hostbyname = socket.gethostbyname(host)
50         hostbyname_cache[host] = hostbyname
51     return hostbyname
52
53 def openssh_has_persist():
54     global OPENSSH_HAS_PERSIST
55     if OPENSSH_HAS_PERSIST is None:
56         proc = subprocess.Popen(["ssh","-v"],
57             stdout = subprocess.PIPE,
58             stderr = subprocess.STDOUT,
59             stdin = open("/dev/null","r") )
60         out,err = proc.communicate()
61         proc.wait()
62         
63         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
64         OPENSSH_HAS_PERSIST = bool(vre.match(out))
65     return OPENSSH_HAS_PERSIST
66
67 def shell_escape(s):
68     """ Escapes strings so that they are safe to use as command-line arguments """
69     if SHELL_SAFE.match(s):
70         # safe string - no escaping needed
71         return s
72     else:
73         # unsafe string - escape
74         def escp(c):
75             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
76                 return c
77             else:
78                 return "'$'\\x%02x''" % (ord(c),)
79         s = ''.join(map(escp,s))
80         return "'%s'" % (s,)
81
82 def eintr_retry(func):
83     import functools
84     @functools.wraps(func)
85     def rv(*p, **kw):
86         retry = kw.pop("_retry", False)
87         for i in xrange(0 if retry else 4):
88             try:
89                 return func(*p, **kw)
90             except (select.error, socket.error), args:
91                 if args[0] == errno.EINTR:
92                     continue
93                 else:
94                     raise 
95             except OSError, e:
96                 if e.errno == errno.EINTR:
97                     continue
98                 else:
99                     raise
100         else:
101             return func(*p, **kw)
102     return rv
103
104 class Server(object):
105     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
106             environment_setup = "", clean_root = False):
107         self._root_dir = root_dir
108         self._clean_root = clean_root
109         self._stop = False
110         self._ctrl_sock = None
111         self._log_level = log_level
112         self._rdbuf = ""
113         self._environment_setup = environment_setup
114
115     def run(self):
116         try:
117             if self.daemonize():
118                 self.post_daemonize()
119                 self.loop()
120                 self.cleanup()
121                 # ref: "os._exit(0)"
122                 # can not return normally after fork beacuse no exec was done.
123                 # This means that if we don't do a os._exit(0) here the code that 
124                 # follows the call to "Server.run()" in the "caller code" will be 
125                 # executed... but by now it has already been executed after the 
126                 # first process (the one that did the first fork) returned.
127                 os._exit(0)
128         except:
129             print >>sys.stderr, "SERVER_ERROR."
130             self.log_error()
131             self.cleanup()
132             os._exit(0)
133         print >>sys.stderr, "SERVER_READY."
134
135     def daemonize(self):
136         # pipes for process synchronization
137         (r, w) = os.pipe()
138         
139         # build root folder
140         root = os.path.normpath(self._root_dir)
141         if self._root_dir not in [".", ""] and os.path.exists(root) \
142                 and self._clean_root:
143             shutil.rmtree(root)
144         if not os.path.exists(root):
145             os.makedirs(root, 0755)
146
147         pid1 = os.fork()
148         if pid1 > 0:
149             os.close(w)
150             while True:
151                 try:
152                     os.read(r, 1)
153                 except OSError, e: # pragma: no cover
154                     if e.errno == errno.EINTR:
155                         continue
156                     else:
157                         raise
158                 break
159             os.close(r)
160             # os.waitpid avoids leaving a <defunc> (zombie) process
161             st = os.waitpid(pid1, 0)[1]
162             if st:
163                 raise RuntimeError("Daemonization failed")
164             # return 0 to inform the caller method that this is not the 
165             # daemonized process
166             return 0
167         os.close(r)
168
169         # Decouple from parent environment.
170         os.chdir(self._root_dir)
171         os.umask(0)
172         os.setsid()
173
174         # fork 2
175         pid2 = os.fork()
176         if pid2 > 0:
177             # see ref: "os._exit(0)"
178             os._exit(0)
179
180         # close all open file descriptors.
181         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182         if (max_fd == resource.RLIM_INFINITY):
183             max_fd = MAX_FD
184         for fd in range(3, max_fd):
185             if fd != w:
186                 try:
187                     os.close(fd)
188                 except OSError:
189                     pass
190
191         # Redirect standard file descriptors.
192         stdin = open(DEV_NULL, "r")
193         stderr = stdout = open(STD_ERR, "a", 0)
194         os.dup2(stdin.fileno(), sys.stdin.fileno())
195         # NOTE: sys.stdout.write will still be buffered, even if the file
196         # was opened with 0 buffer
197         os.dup2(stdout.fileno(), sys.stdout.fileno())
198         os.dup2(stderr.fileno(), sys.stderr.fileno())
199         
200         # setup environment
201         if self._environment_setup:
202             # parse environment variables and pass to child process
203             # do it by executing shell commands, in case there's some heavy setup involved
204             envproc = subprocess.Popen(
205                 [ "bash", "-c", 
206                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207                         ( self._environment_setup, ) ],
208                 stdin = subprocess.PIPE, 
209                 stdout = subprocess.PIPE,
210                 stderr = subprocess.PIPE
211             )
212             out,err = envproc.communicate()
213
214             # parse new environment
215             if out:
216                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
217             
218                 # apply to current environment
219                 for name, value in environment.iteritems():
220                     os.environ[name] = value
221                 
222                 # apply pythonpath
223                 if 'PYTHONPATH' in environment:
224                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
225
226         # create control socket
227         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
228         try:
229             self._ctrl_sock.bind(CTRL_SOCK)
230         except socket.error:
231             # Address in use, check pidfile
232             pid = None
233             try:
234                 pidfile = open(CTRL_PID, "r")
235                 pid = pidfile.read()
236                 pidfile.close()
237                 pid = int(pid)
238             except:
239                 # no pidfile
240                 pass
241             
242             if pid is not None:
243                 # Check process liveliness
244                 if not os.path.exists("/proc/%d" % (pid,)):
245                     # Ok, it's dead, clean the socket
246                     os.remove(CTRL_SOCK)
247             
248             # try again
249             self._ctrl_sock.bind(CTRL_SOCK)
250             
251         self._ctrl_sock.listen(0)
252         
253         # Save pidfile
254         pidfile = open(CTRL_PID, "w")
255         pidfile.write(str(os.getpid()))
256         pidfile.close()
257
258         # let the parent process know that the daemonization is finished
259         os.write(w, "\n")
260         os.close(w)
261         return 1
262
263     def post_daemonize(self):
264         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265         # QT, for some strange reason, redefines the SIGCHILD handler to write
266         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
267         # Server dameonization closes all file descriptors from fileno '3',
268         # but the overloaded handler (inherited by the forked process) will
269         # keep trying to write the \0 to fileno 'x', which might have been reused 
270         # after closing, for other operations. This is bad bad bad when fileno 'x'
271         # is in use for communication pouroses, because unexpected \0 start
272         # appearing in the communication messages... this is exactly what happens 
273         # when using netns in daemonized form. Thus, be have no other alternative than
274         # restoring the SIGCHLD handler to the default here.
275         import signal
276         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
277
278     def loop(self):
279         while not self._stop:
280             conn, addr = self._ctrl_sock.accept()
281             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
282             conn.settimeout(5)
283             while not self._stop:
284                 try:
285                     msg = self.recv_msg(conn)
286                 except socket.timeout, e:
287                     #self.log_error("SERVER recv_msg: connection timedout ")
288                     continue
289                 
290                 if not msg:
291                     self.log_error("CONNECTION LOST")
292                     break
293                     
294                 if msg == STOP_MSG:
295                     self._stop = True
296                     reply = self.stop_action()
297                 else:
298                     reply = self.reply_action(msg)
299                 
300                 try:
301                     self.send_reply(conn, reply)
302                 except socket.error:
303                     self.log_error()
304                     self.log_error("NOTICE: Awaiting for reconnection")
305                     break
306             try:
307                 conn.close()
308             except:
309                 # Doesn't matter
310                 self.log_error()
311
312     def recv_msg(self, conn):
313         data = [self._rdbuf]
314         chunk = data[0]
315         while '\n' not in chunk:
316             try:
317                 chunk = conn.recv(1024)
318             except (OSError, socket.error), e:
319                 if e[0] != errno.EINTR:
320                     raise
321                 else:
322                     continue
323             if chunk:
324                 data.append(chunk)
325             else:
326                 # empty chunk = EOF
327                 break
328         data = ''.join(data).split('\n',1)
329         while len(data) < 2:
330             data.append('')
331         data, self._rdbuf = data
332         
333         decoded = base64.b64decode(data)
334         return decoded.rstrip()
335
336     def send_reply(self, conn, reply):
337         encoded = base64.b64encode(reply)
338         conn.send("%s\n" % encoded)
339        
340     def cleanup(self):
341         try:
342             self._ctrl_sock.close()
343             os.remove(CTRL_SOCK)
344         except:
345             self.log_error()
346
347     def stop_action(self):
348         return "Stopping server"
349
350     def reply_action(self, msg):
351         return "Reply to: %s" % msg
352
353     def log_error(self, text = None, context = ''):
354         if text == None:
355             text = traceback.format_exc()
356         date = time.strftime("%Y-%m-%d %H:%M:%S")
357         if context:
358             context = " (%s)" % (context,)
359         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
360         return text
361
362     def log_debug(self, text):
363         if self._log_level == DC.DEBUG_LEVEL:
364             date = time.strftime("%Y-%m-%d %H:%M:%S")
365             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
366
367 class Forwarder(object):
368     def __init__(self, root_dir = "."):
369         self._ctrl_sock = None
370         self._root_dir = root_dir
371         self._stop = False
372         self._rdbuf = ""
373
374     def forward(self):
375         self.connect()
376         print >>sys.stderr, "FORWARDER_READY."
377         while not self._stop:
378             data = self.read_data()
379             if not data:
380                 # Connection to client lost
381                 break
382             self.send_to_server(data)
383             
384             data = self.recv_from_server()
385             if not data:
386                 # Connection to server lost
387                 raise IOError, "Connection to server lost while "\
388                     "expecting response"
389             self.write_data(data)
390         self.disconnect()
391
392     def read_data(self):
393         return sys.stdin.readline()
394
395     def write_data(self, data):
396         sys.stdout.write(data)
397         # sys.stdout.write is buffered, this is why we need to do a flush()
398         sys.stdout.flush()
399
400     def send_to_server(self, data):
401         try:
402             self._ctrl_sock.send(data)
403         except (IOError, socket.error), e:
404             if e[0] == errno.EPIPE:
405                 self.connect()
406                 self._ctrl_sock.send(data)
407             else:
408                 raise e
409         encoded = data.rstrip() 
410         msg = base64.b64decode(encoded)
411         if msg == STOP_MSG:
412             self._stop = True
413
414     def recv_from_server(self):
415         data = [self._rdbuf]
416         chunk = data[0]
417         while '\n' not in chunk:
418             try:
419                 chunk = self._ctrl_sock.recv(1024)
420             except (OSError, socket.error), e:
421                 if e[0] != errno.EINTR:
422                     raise
423                 continue
424             if chunk:
425                 data.append(chunk)
426             else:
427                 # empty chunk = EOF
428                 break
429         data = ''.join(data).split('\n',1)
430         while len(data) < 2:
431             data.append('')
432         data, self._rdbuf = data
433         
434         return data+'\n'
435  
436     def connect(self):
437         self.disconnect()
438         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440         self._ctrl_sock.connect(sock_addr)
441
442     def disconnect(self):
443         try:
444             self._ctrl_sock.close()
445         except:
446             pass
447
448 class Client(object):
449     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
450             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451             environment_setup = ""):
452         self.root_dir = root_dir
453         self.addr = (host, port)
454         self.user = user
455         self.agent = agent
456         self.sudo = sudo
457         self.communication = communication
458         self.environment_setup = environment_setup
459         self._stopped = False
460         self._deferreds = collections.deque()
461         self.connect()
462     
463     def __del__(self):
464         if self._process.poll() is None:
465             os.kill(self._process.pid, signal.SIGTERM)
466         self._process.wait()
467         
468     def connect(self):
469         root_dir = self.root_dir
470         (host, port) = self.addr
471         user = self.user
472         agent = self.agent
473         sudo = self.sudo
474         communication = self.communication
475         
476         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477                 c.forward()" % (root_dir,)
478
479         self._process = popen_python(python_code, 
480                     communication = communication,
481                     host = host, 
482                     port = port, 
483                     user = user, 
484                     agent = agent, 
485                     sudo = sudo, 
486                     environment_setup = self.environment_setup)
487                
488         # Wait for the forwarder to be ready, otherwise nobody
489         # will be able to connect to it
490         err = []
491         helo = "nope"
492         while helo:
493             helo = self._process.stderr.readline()
494             if helo == 'FORWARDER_READY.\n':
495                 break
496             err.append(helo)
497         else:
498             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
499         
500     def send_msg(self, msg):
501         encoded = base64.b64encode(msg)
502         data = "%s\n" % encoded
503         
504         try:
505             self._process.stdin.write(data)
506         except (IOError, ValueError):
507             # dead process, poll it to un-zombify
508             self._process.poll()
509             
510             # try again after reconnect
511             # If it fails again, though, give up
512             self.connect()
513             self._process.stdin.write(data)
514
515     def send_stop(self):
516         self.send_msg(STOP_MSG)
517         self._stopped = True
518
519     def defer_reply(self, transform=None):
520         defer_entry = []
521         self._deferreds.append(defer_entry)
522         return defer.Defer(
523             functools.partial(self.read_reply, defer_entry, transform)
524         )
525         
526     def _read_reply(self):
527         data = self._process.stdout.readline()
528         encoded = data.rstrip() 
529         if not encoded:
530             # empty == eof == dead process, poll it to un-zombify
531             self._process.poll()
532             
533             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534         return base64.b64decode(encoded)
535     
536     def read_reply(self, which=None, transform=None):
537         # Test to see if someone did it already
538         if which is not None and len(which):
539             # Ok, they did it...
540             # ...just return the deferred value
541             if transform:
542                 return transform(which[0])
543             else:
544                 return which[0]
545         
546         # Process all deferreds until the one we're looking for
547         # or until the queue is empty
548         while self._deferreds:
549             try:
550                 deferred = self._deferreds.popleft()
551             except IndexError:
552                 # emptied
553                 break
554             
555             deferred.append(self._read_reply())
556             if deferred is which:
557                 # We reached the one we were looking for
558                 if transform:
559                     return transform(deferred[0])
560                 else:
561                     return deferred[0]
562         
563         if which is None:
564             # They've requested a synchronous read
565             if transform:
566                 return transform(self._read_reply())
567             else:
568                 return self._read_reply()
569
570 def _make_server_key_args(server_key, host, port, args):
571     """ 
572     Returns a reference to the created temporary file, and adds the
573     corresponding arguments to the given argument list.
574     
575     Make sure to hold onto it until the process is done with the file
576     """
577     if port is not None:
578         host = '%s:%s' % (host,port)
579     # Create a temporary server key file
580     tmp_known_hosts = tempfile.NamedTemporaryFile()
581    
582     hostbyname = gethostbyname(host) 
583
584     # Add the intended host key
585     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
586     
587     # If we're not in strict mode, add user-configured keys
588     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590         if os.access(user_hosts_path, os.R_OK):
591             f = open(user_hosts_path, "r")
592             tmp_known_hosts.write(f.read())
593             f.close()
594         
595     tmp_known_hosts.flush()
596     
597     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
598     
599     return tmp_known_hosts
600
601 def popen_ssh_command(command, host, port, user, agent, 
602         stdin="", 
603         ident_key = None,
604         server_key = None,
605         tty = False,
606         timeout = None,
607         retry = 0,
608         err_on_timeout = True,
609         connect_timeout = 60,
610         persistent = True,
611         hostip = None):
612     """
613     Executes a remote commands, returns ((stdout,stderr),process)
614     """
615     if TRACE:
616         print "ssh", host, command
617     
618     tmp_known_hosts = None
619     args = ['ssh', '-C',
620             # Don't bother with localhost. Makes test easier
621             '-o', 'NoHostAuthenticationForLocalhost=yes',
622             # XXX: Security vulnerability
623             #'-o', 'StrictHostKeyChecking=no',
624             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
625             '-o', 'ConnectionAttempts=3',
626             '-o', 'ServerAliveInterval=30',
627             '-o', 'TCPKeepAlive=yes',
628             '-l', user, hostip or host]
629     if persistent and openssh_has_persist():
630         args.extend([
631             '-o', 'ControlMaster=auto',
632             '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
633             '-o', 'ControlPersist=60' ])
634     if agent:
635         args.append('-A')
636     if port:
637         args.append('-p%d' % port)
638     if ident_key:
639         args.extend(('-i', ident_key))
640     if tty:
641         args.append('-t')
642         args.append('-t')
643     if server_key:
644         # Create a temporary server key file
645         tmp_known_hosts = _make_server_key_args(
646             server_key, host, port, args)
647     args.append(command)
648
649
650     for x in xrange(retry or 3):
651         # connects to the remote host and starts a remote connection
652         proc = subprocess.Popen(args, 
653                 stdout = subprocess.PIPE,
654                 stdin = subprocess.PIPE, 
655                 stderr = subprocess.PIPE)
656         
657         # attach tempfile object to the process, to make sure the file stays
658         # alive until the process is finished with it
659         proc._known_hosts = tmp_known_hosts
660     
661         try:
662             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
663             if proc.poll():
664                 if TRACE:
665                     print "COMMAND host %s, command %s, error %s" % (host, " ".join(args), err)
666                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
667                     # SSH error, can safely retry
668                     continue
669                 elif retry:
670                     # Probably timed out or plain failed but can retry
671                     continue
672             break
673         except RuntimeError,e:
674             if TRACE:
675                 print "COMMAND host %s, command %s, error %s" % (host, " ".join(args), err)
676                 print " timedout -> ", e.args
677             if retry <= 0:
678                 raise
679             retry -= 1
680         
681     if TRACE:
682         print " -> ", out, err
683
684     return ((out, err), proc)
685
686 def popen_scp(source, dest, 
687         port = None, 
688         agent = None, 
689         recursive = False,
690         ident_key = None,
691         server_key = None):
692     """
693     Copies from/to remote sites.
694     
695     Source and destination should have the user and host encoded
696     as per scp specs.
697     
698     If source is a file object, a special mode will be used to
699     create the remote file with the same contents.
700     
701     If dest is a file object, the remote file (source) will be
702     read and written into dest.
703     
704     In these modes, recursive cannot be True.
705     
706     Source can be a list of files to copy to a single destination,
707     in which case it is advised that the destination be a folder.
708     """
709     
710     if TRACE:
711         print "scp", source, dest
712     
713     if isinstance(source, file) and source.tell() == 0:
714         source = source.name
715     elif hasattr(source, 'read'):
716         tmp = tempfile.NamedTemporaryFile()
717         while True:
718             buf = source.read(65536)
719             if buf:
720                 tmp.write(buf)
721             else:
722                 break
723         tmp.seek(0)
724         source = tmp.name
725     
726     if isinstance(source, file) or isinstance(dest, file) \
727             or hasattr(source, 'read')  or hasattr(dest, 'write'):
728         assert not recursive
729         
730         # Parse source/destination as <user>@<server>:<path>
731         if isinstance(dest, basestring) and ':' in dest:
732             remspec, path = dest.split(':',1)
733         elif isinstance(source, basestring) and ':' in source:
734             remspec, path = source.split(':',1)
735         else:
736             raise ValueError, "Both endpoints cannot be local"
737         user,host = remspec.rsplit('@',1)
738         tmp_known_hosts = None
739         
740         args = ['ssh', '-l', user, '-C',
741                 # Don't bother with localhost. Makes test easier
742                 '-o', 'NoHostAuthenticationForLocalhost=yes',
743                 # XXX: Security vulnerability
744                 #'-o', 'StrictHostKeyChecking=no',
745                 '-o', 'ConnectTimeout=60',
746                 '-o', 'ConnectionAttempts=3',
747                 '-o', 'ServerAliveInterval=30',
748                 '-o', 'TCPKeepAlive=yes',
749                 host ]
750         if openssh_has_persist():
751             args.extend([
752                 '-o', 'ControlMaster=auto',
753                 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
754                 '-o', 'ControlPersist=60' ])
755         if port:
756             args.append('-P%d' % port)
757         if ident_key:
758             args.extend(('-i', ident_key))
759         if server_key:
760             # Create a temporary server key file
761             tmp_known_hosts = _make_server_key_args(
762                 server_key, host, port, args)
763         
764         if isinstance(source, file) or hasattr(source, 'read'):
765             args.append('cat > %s' % (shell_escape(path),))
766         elif isinstance(dest, file) or hasattr(dest, 'write'):
767             args.append('cat %s' % (shell_escape(path),))
768         else:
769             raise AssertionError, "Unreachable code reached! :-Q"
770         
771         # connects to the remote host and starts a remote connection
772         if isinstance(source, file):
773             proc = subprocess.Popen(args, 
774                     stdout = open('/dev/null','w'),
775                     stderr = subprocess.PIPE,
776                     stdin = source)
777             err = proc.stderr.read()
778             proc._known_hosts = tmp_known_hosts
779             eintr_retry(proc.wait)()
780             return ((None,err), proc)
781         elif isinstance(dest, file):
782             proc = subprocess.Popen(args, 
783                     stdout = open('/dev/null','w'),
784                     stderr = subprocess.PIPE,
785                     stdin = source)
786             err = proc.stderr.read()
787             proc._known_hosts = tmp_known_hosts
788             eintr_retry(proc.wait)()
789             return ((None,err), proc)
790         elif hasattr(source, 'read'):
791             # file-like (but not file) source
792             proc = subprocess.Popen(args, 
793                     stdout = open('/dev/null','w'),
794                     stderr = subprocess.PIPE,
795                     stdin = subprocess.PIPE)
796             
797             buf = None
798             err = []
799             while True:
800                 if not buf:
801                     buf = source.read(4096)
802                 if not buf:
803                     #EOF
804                     break
805                 
806                 rdrdy, wrdy, broken = select.select(
807                     [proc.stderr],
808                     [proc.stdin],
809                     [proc.stderr,proc.stdin])
810                 
811                 if proc.stderr in rdrdy:
812                     # use os.read for fully unbuffered behavior
813                     err.append(os.read(proc.stderr.fileno(), 4096))
814                 
815                 if proc.stdin in wrdy:
816                     proc.stdin.write(buf)
817                     buf = None
818                 
819                 if broken:
820                     break
821             proc.stdin.close()
822             err.append(proc.stderr.read())
823                 
824             proc._known_hosts = tmp_known_hosts
825             eintr_retry(proc.wait)()
826             return ((None,''.join(err)), proc)
827         elif hasattr(dest, 'write'):
828             # file-like (but not file) dest
829             proc = subprocess.Popen(args, 
830                     stdout = subprocess.PIPE,
831                     stderr = subprocess.PIPE,
832                     stdin = open('/dev/null','w'))
833             
834             buf = None
835             err = []
836             while True:
837                 rdrdy, wrdy, broken = select.select(
838                     [proc.stderr, proc.stdout],
839                     [],
840                     [proc.stderr, proc.stdout])
841                 
842                 if proc.stderr in rdrdy:
843                     # use os.read for fully unbuffered behavior
844                     err.append(os.read(proc.stderr.fileno(), 4096))
845                 
846                 if proc.stdout in rdrdy:
847                     # use os.read for fully unbuffered behavior
848                     buf = os.read(proc.stdout.fileno(), 4096)
849                     dest.write(buf)
850                     
851                     if not buf:
852                         #EOF
853                         break
854                 
855                 if broken:
856                     break
857             err.append(proc.stderr.read())
858                 
859             proc._known_hosts = tmp_known_hosts
860             eintr_retry(proc.wait)()
861             return ((None,''.join(err)), proc)
862         else:
863             raise AssertionError, "Unreachable code reached! :-Q"
864     else:
865         # Parse destination as <user>@<server>:<path>
866         if isinstance(dest, basestring) and ':' in dest:
867             remspec, path = dest.split(':',1)
868         elif isinstance(source, basestring) and ':' in source:
869             remspec, path = source.split(':',1)
870         else:
871             raise ValueError, "Both endpoints cannot be local"
872         user,host = remspec.rsplit('@',1)
873         
874         # plain scp
875         tmp_known_hosts = None
876         args = ['scp', '-q', '-p', '-C',
877                 # Don't bother with localhost. Makes test easier
878                 '-o', 'NoHostAuthenticationForLocalhost=yes',
879                 # XXX: Security vulnerability
880                 #'-o', 'StrictHostKeyChecking=no',
881                 '-o', 'ConnectTimeout=60',
882                 '-o', 'ConnectionAttempts=3',
883                 '-o', 'ServerAliveInterval=30',
884                 '-o', 'TCPKeepAlive=yes' ]
885                 
886         if port:
887             args.append('-P%d' % port)
888         if recursive:
889             args.append('-r')
890         if ident_key:
891             args.extend(('-i', ident_key))
892         if server_key:
893             # Create a temporary server key file
894             tmp_known_hosts = _make_server_key_args(
895                 server_key, host, port, args)
896         if isinstance(source,list):
897             args.extend(source)
898         else:
899             if openssh_has_persist():
900                 args.extend([
901                     '-o', 'ControlMaster=auto',
902                     '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
903             args.append(source)
904         args.append(dest)
905
906         # connects to the remote host and starts a remote connection
907         proc = subprocess.Popen(args, 
908                 stdout = subprocess.PIPE,
909                 stdin = subprocess.PIPE, 
910                 stderr = subprocess.PIPE)
911         proc._known_hosts = tmp_known_hosts
912         
913         comm = proc.communicate()
914         eintr_retry(proc.wait)()
915         return (comm, proc)
916
917 def decode_and_execute():
918     # The python code we want to execute might have characters that 
919     # are not compatible with the 'inline' mode we are using. To avoid
920     # problems we receive the encoded python code in base64 as a input 
921     # stream and decode it for execution.
922     import base64, os
923     cmd = ""
924     while True:
925         try:
926             cmd += os.read(0, 1)# one byte from stdin
927         except OSError, e:            
928             if e.errno == errno.EINTR:
929                 continue
930             else:
931                 raise
932         if cmd[-1] == "\n": 
933             break
934     cmd = base64.b64decode(cmd)
935     # Uncomment for debug
936     #os.write(2, "Executing python code: %s\n" % cmd)
937     os.write(1, "OK\n") # send a sync message
938     exec(cmd)
939
940 def popen_python(python_code, 
941         communication = DC.ACCESS_LOCAL,
942         host = None, 
943         port = None, 
944         user = None, 
945         agent = False, 
946         python_path = None,
947         ident_key = None,
948         server_key = None,
949         tty = False,
950         sudo = False, 
951         environment_setup = ""):
952
953     cmd = ""
954     if python_path:
955         python_path.replace("'", r"'\''")
956         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
957         cmd += " ; "
958     if environment_setup:
959         cmd += environment_setup
960         cmd += " ; "
961     # Uncomment for debug (to run everything under strace)
962     # We had to verify if strace works (cannot nest them)
963     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
964     #cmd += "$CMD "
965     #cmd += "strace -f -tt -s 200 -o strace$$.out "
966     import nepi
967     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
968         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
969     )
970
971     if sudo:
972         if ';' in cmd:
973             cmd = "sudo bash -c " + shell_escape(cmd)
974         else:
975             cmd = "sudo " + cmd
976
977     if communication == DC.ACCESS_SSH:
978         tmp_known_hosts = None
979         args = ['ssh', '-C',
980                 # Don't bother with localhost. Makes test easier
981                 '-o', 'NoHostAuthenticationForLocalhost=yes',
982                 # XXX: Security vulnerability
983                 #'-o', 'StrictHostKeyChecking=no',
984                 '-o', 'ConnectionAttempts=3',
985                 '-o', 'ServerAliveInterval=30',
986                 '-o', 'TCPKeepAlive=yes',
987                 '-l', user, host]
988         if agent:
989             args.append('-A')
990         if port:
991             args.append('-p%d' % port)
992         if ident_key:
993             args.extend(('-i', ident_key))
994         if tty:
995             args.append('-t')
996         if server_key:
997             # Create a temporary server key file
998             tmp_known_hosts = _make_server_key_args(
999                 server_key, host, port, args)
1000         args.append(cmd)
1001     else:
1002         args = [ "/bin/bash", "-c", cmd ]
1003
1004     # connects to the remote host and starts a remote
1005     proc = subprocess.Popen(args,
1006             shell = False, 
1007             stdout = subprocess.PIPE,
1008             stdin = subprocess.PIPE, 
1009             stderr = subprocess.PIPE)
1010
1011     if communication == DC.ACCESS_SSH:
1012         proc._known_hosts = tmp_known_hosts
1013
1014     # send the command to execute
1015     os.write(proc.stdin.fileno(),
1016             base64.b64encode(python_code) + "\n")
1017  
1018     while True: 
1019         try:
1020             msg = os.read(proc.stdout.fileno(), 3)
1021             break
1022         except OSError, e:            
1023             if e.errno == errno.EINTR:
1024                 continue
1025             else:
1026                 raise
1027     
1028     if msg != "OK\n":
1029         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1030             msg, proc.stdout.read(), proc.stderr.read())
1031
1032     return proc
1033
1034 # POSIX
1035 def _communicate(self, input, timeout=None, err_on_timeout=True):
1036     read_set = []
1037     write_set = []
1038     stdout = None # Return
1039     stderr = None # Return
1040     
1041     killed = False
1042     
1043     if timeout is not None:
1044         timelimit = time.time() + timeout
1045         killtime = timelimit + 4
1046         bailtime = timelimit + 4
1047
1048     if self.stdin:
1049         # Flush stdio buffer.  This might block, if the user has
1050         # been writing to .stdin in an uncontrolled fashion.
1051         self.stdin.flush()
1052         if input:
1053             write_set.append(self.stdin)
1054         else:
1055             self.stdin.close()
1056     if self.stdout:
1057         read_set.append(self.stdout)
1058         stdout = []
1059     if self.stderr:
1060         read_set.append(self.stderr)
1061         stderr = []
1062
1063     input_offset = 0
1064     while read_set or write_set:
1065         if timeout is not None:
1066             curtime = time.time()
1067             if timeout is None or curtime > timelimit:
1068                 if curtime > bailtime:
1069                     break
1070                 elif curtime > killtime:
1071                     signum = signal.SIGKILL
1072                 else:
1073                     signum = signal.SIGTERM
1074                 # Lets kill it
1075                 os.kill(self.pid, signum)
1076                 select_timeout = 0.5
1077             else:
1078                 select_timeout = timelimit - curtime + 0.1
1079         else:
1080             select_timeout = 1.0
1081         
1082         if select_timeout > 1.0:
1083             select_timeout = 1.0
1084             
1085         try:
1086             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1087         except select.error,e:
1088             if e[0] != 4:
1089                 raise
1090             else:
1091                 continue
1092         
1093         if not rlist and not wlist and not xlist and self.poll() is not None:
1094             # timeout and process exited, say bye
1095             break
1096
1097         if self.stdin in wlist:
1098             # When select has indicated that the file is writable,
1099             # we can write up to PIPE_BUF bytes without risk
1100             # blocking.  POSIX defines PIPE_BUF >= 512
1101             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1102             input_offset += bytes_written
1103             if input_offset >= len(input):
1104                 self.stdin.close()
1105                 write_set.remove(self.stdin)
1106
1107         if self.stdout in rlist:
1108             data = os.read(self.stdout.fileno(), 1024)
1109             if data == "":
1110                 self.stdout.close()
1111                 read_set.remove(self.stdout)
1112             stdout.append(data)
1113
1114         if self.stderr in rlist:
1115             data = os.read(self.stderr.fileno(), 1024)
1116             if data == "":
1117                 self.stderr.close()
1118                 read_set.remove(self.stderr)
1119             stderr.append(data)
1120     
1121     # All data exchanged.  Translate lists into strings.
1122     if stdout is not None:
1123         stdout = ''.join(stdout)
1124     if stderr is not None:
1125         stderr = ''.join(stderr)
1126
1127     # Translate newlines, if requested.  We cannot let the file
1128     # object do the translation: It is based on stdio, which is
1129     # impossible to combine with select (unless forcing no
1130     # buffering).
1131     if self.universal_newlines and hasattr(file, 'newlines'):
1132         if stdout:
1133             stdout = self._translate_newlines(stdout)
1134         if stderr:
1135             stderr = self._translate_newlines(stderr)
1136
1137     if killed and err_on_timeout:
1138         errcode = self.poll()
1139         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1140     else:
1141         if killed:
1142             self.poll()
1143         else:
1144             self.wait()
1145         return (stdout, stderr)
1146