Limit ControlPath's length (it's got a rather small limit)
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25 import hashlib
26
27 CTRL_SOCK = "ctrl.sock"
28 CTRL_PID = "ctrl.pid"
29 STD_ERR = "stderr.log"
30 MAX_FD = 1024
31
32 STOP_MSG = "STOP"
33
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35
36 if hasattr(os, "devnull"):
37     DEV_NULL = os.devnull
38 else:
39     DEV_NULL = "/dev/null"
40
41 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
42
43 def shell_escape(s):
44     """ Escapes strings so that they are safe to use as command-line arguments """
45     if SHELL_SAFE.match(s):
46         # safe string - no escaping needed
47         return s
48     else:
49         # unsafe string - escape
50         def escp(c):
51             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
52                 return c
53             else:
54                 return "'$'\\x%02x''" % (ord(c),)
55         s = ''.join(map(escp,s))
56         return "'%s'" % (s,)
57
58 def eintr_retry(func):
59     import functools
60     @functools.wraps(func)
61     def rv(*p, **kw):
62         retry = kw.pop("_retry", False)
63         for i in xrange(0 if retry else 4):
64             try:
65                 return func(*p, **kw)
66             except (select.error, socket.error), args:
67                 if args[0] == errno.EINTR:
68                     continue
69                 else:
70                     raise 
71             except OSError, e:
72                 if e.errno == errno.EINTR:
73                     continue
74                 else:
75                     raise
76         else:
77             return func(*p, **kw)
78     return rv
79
80 class Server(object):
81     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
82             environment_setup = "", clean_root = False):
83         self._root_dir = root_dir
84         self._clean_root = clean_root
85         self._stop = False
86         self._ctrl_sock = None
87         self._log_level = log_level
88         self._rdbuf = ""
89         self._environment_setup = environment_setup
90
91     def run(self):
92         try:
93             if self.daemonize():
94                 self.post_daemonize()
95                 self.loop()
96                 self.cleanup()
97                 # ref: "os._exit(0)"
98                 # can not return normally after fork beacuse no exec was done.
99                 # This means that if we don't do a os._exit(0) here the code that 
100                 # follows the call to "Server.run()" in the "caller code" will be 
101                 # executed... but by now it has already been executed after the 
102                 # first process (the one that did the first fork) returned.
103                 os._exit(0)
104         except:
105             print >>sys.stderr, "SERVER_ERROR."
106             self.log_error()
107             self.cleanup()
108             os._exit(0)
109         print >>sys.stderr, "SERVER_READY."
110
111     def daemonize(self):
112         # pipes for process synchronization
113         (r, w) = os.pipe()
114         
115         # build root folder
116         root = os.path.normpath(self._root_dir)
117         if self._root_dir not in [".", ""] and os.path.exists(root) \
118                 and self._clean_root:
119             shutil.rmtree(root)
120         if not os.path.exists(root):
121             os.makedirs(root, 0755)
122
123         pid1 = os.fork()
124         if pid1 > 0:
125             os.close(w)
126             while True:
127                 try:
128                     os.read(r, 1)
129                 except OSError, e: # pragma: no cover
130                     if e.errno == errno.EINTR:
131                         continue
132                     else:
133                         raise
134                 break
135             os.close(r)
136             # os.waitpid avoids leaving a <defunc> (zombie) process
137             st = os.waitpid(pid1, 0)[1]
138             if st:
139                 raise RuntimeError("Daemonization failed")
140             # return 0 to inform the caller method that this is not the 
141             # daemonized process
142             return 0
143         os.close(r)
144
145         # Decouple from parent environment.
146         os.chdir(self._root_dir)
147         os.umask(0)
148         os.setsid()
149
150         # fork 2
151         pid2 = os.fork()
152         if pid2 > 0:
153             # see ref: "os._exit(0)"
154             os._exit(0)
155
156         # close all open file descriptors.
157         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
158         if (max_fd == resource.RLIM_INFINITY):
159             max_fd = MAX_FD
160         for fd in range(3, max_fd):
161             if fd != w:
162                 try:
163                     os.close(fd)
164                 except OSError:
165                     pass
166
167         # Redirect standard file descriptors.
168         stdin = open(DEV_NULL, "r")
169         stderr = stdout = open(STD_ERR, "a", 0)
170         os.dup2(stdin.fileno(), sys.stdin.fileno())
171         # NOTE: sys.stdout.write will still be buffered, even if the file
172         # was opened with 0 buffer
173         os.dup2(stdout.fileno(), sys.stdout.fileno())
174         os.dup2(stderr.fileno(), sys.stderr.fileno())
175         
176         # setup environment
177         if self._environment_setup:
178             # parse environment variables and pass to child process
179             # do it by executing shell commands, in case there's some heavy setup involved
180             envproc = subprocess.Popen(
181                 [ "bash", "-c", 
182                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
183                         ( self._environment_setup, ) ],
184                 stdin = subprocess.PIPE, 
185                 stdout = subprocess.PIPE,
186                 stderr = subprocess.PIPE
187             )
188             out,err = envproc.communicate()
189
190             # parse new environment
191             if out:
192                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
193             
194                 # apply to current environment
195                 for name, value in environment.iteritems():
196                     os.environ[name] = value
197                 
198                 # apply pythonpath
199                 if 'PYTHONPATH' in environment:
200                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
201
202         # create control socket
203         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
204         try:
205             self._ctrl_sock.bind(CTRL_SOCK)
206         except socket.error:
207             # Address in use, check pidfile
208             pid = None
209             try:
210                 pidfile = open(CTRL_PID, "r")
211                 pid = pidfile.read()
212                 pidfile.close()
213                 pid = int(pid)
214             except:
215                 # no pidfile
216                 pass
217             
218             if pid is not None:
219                 # Check process liveliness
220                 if not os.path.exists("/proc/%d" % (pid,)):
221                     # Ok, it's dead, clean the socket
222                     os.remove(CTRL_SOCK)
223             
224             # try again
225             self._ctrl_sock.bind(CTRL_SOCK)
226             
227         self._ctrl_sock.listen(0)
228         
229         # Save pidfile
230         pidfile = open(CTRL_PID, "w")
231         pidfile.write(str(os.getpid()))
232         pidfile.close()
233
234         # let the parent process know that the daemonization is finished
235         os.write(w, "\n")
236         os.close(w)
237         return 1
238
239     def post_daemonize(self):
240         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
241         # QT, for some strange reason, redefines the SIGCHILD handler to write
242         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
243         # Server dameonization closes all file descriptors from fileno '3',
244         # but the overloaded handler (inherited by the forked process) will
245         # keep trying to write the \0 to fileno 'x', which might have been reused 
246         # after closing, for other operations. This is bad bad bad when fileno 'x'
247         # is in use for communication pouroses, because unexpected \0 start
248         # appearing in the communication messages... this is exactly what happens 
249         # when using netns in daemonized form. Thus, be have no other alternative than
250         # restoring the SIGCHLD handler to the default here.
251         import signal
252         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
253
254     def loop(self):
255         while not self._stop:
256             conn, addr = self._ctrl_sock.accept()
257             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
258             conn.settimeout(5)
259             while not self._stop:
260                 try:
261                     msg = self.recv_msg(conn)
262                 except socket.timeout, e:
263                     #self.log_error("SERVER recv_msg: connection timedout ")
264                     continue
265                 
266                 if not msg:
267                     self.log_error("CONNECTION LOST")
268                     break
269                     
270                 if msg == STOP_MSG:
271                     self._stop = True
272                     reply = self.stop_action()
273                 else:
274                     reply = self.reply_action(msg)
275                 
276                 try:
277                     self.send_reply(conn, reply)
278                 except socket.error:
279                     self.log_error()
280                     self.log_error("NOTICE: Awaiting for reconnection")
281                     break
282             try:
283                 conn.close()
284             except:
285                 # Doesn't matter
286                 self.log_error()
287
288     def recv_msg(self, conn):
289         data = [self._rdbuf]
290         chunk = data[0]
291         while '\n' not in chunk:
292             try:
293                 chunk = conn.recv(1024)
294             except (OSError, socket.error), e:
295                 if e[0] != errno.EINTR:
296                     raise
297                 else:
298                     continue
299             if chunk:
300                 data.append(chunk)
301             else:
302                 # empty chunk = EOF
303                 break
304         data = ''.join(data).split('\n',1)
305         while len(data) < 2:
306             data.append('')
307         data, self._rdbuf = data
308         
309         decoded = base64.b64decode(data)
310         return decoded.rstrip()
311
312     def send_reply(self, conn, reply):
313         encoded = base64.b64encode(reply)
314         conn.send("%s\n" % encoded)
315        
316     def cleanup(self):
317         try:
318             self._ctrl_sock.close()
319             os.remove(CTRL_SOCK)
320         except:
321             self.log_error()
322
323     def stop_action(self):
324         return "Stopping server"
325
326     def reply_action(self, msg):
327         return "Reply to: %s" % msg
328
329     def log_error(self, text = None, context = ''):
330         if text == None:
331             text = traceback.format_exc()
332         date = time.strftime("%Y-%m-%d %H:%M:%S")
333         if context:
334             context = " (%s)" % (context,)
335         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
336         return text
337
338     def log_debug(self, text):
339         if self._log_level == DC.DEBUG_LEVEL:
340             date = time.strftime("%Y-%m-%d %H:%M:%S")
341             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
342
343 class Forwarder(object):
344     def __init__(self, root_dir = "."):
345         self._ctrl_sock = None
346         self._root_dir = root_dir
347         self._stop = False
348         self._rdbuf = ""
349
350     def forward(self):
351         self.connect()
352         print >>sys.stderr, "FORWARDER_READY."
353         while not self._stop:
354             data = self.read_data()
355             if not data:
356                 # Connection to client lost
357                 break
358             self.send_to_server(data)
359             
360             data = self.recv_from_server()
361             if not data:
362                 # Connection to server lost
363                 raise IOError, "Connection to server lost while "\
364                     "expecting response"
365             self.write_data(data)
366         self.disconnect()
367
368     def read_data(self):
369         return sys.stdin.readline()
370
371     def write_data(self, data):
372         sys.stdout.write(data)
373         # sys.stdout.write is buffered, this is why we need to do a flush()
374         sys.stdout.flush()
375
376     def send_to_server(self, data):
377         try:
378             self._ctrl_sock.send(data)
379         except (IOError, socket.error), e:
380             if e[0] == errno.EPIPE:
381                 self.connect()
382                 self._ctrl_sock.send(data)
383             else:
384                 raise e
385         encoded = data.rstrip() 
386         msg = base64.b64decode(encoded)
387         if msg == STOP_MSG:
388             self._stop = True
389
390     def recv_from_server(self):
391         data = [self._rdbuf]
392         chunk = data[0]
393         while '\n' not in chunk:
394             try:
395                 chunk = self._ctrl_sock.recv(1024)
396             except (OSError, socket.error), e:
397                 if e[0] != errno.EINTR:
398                     raise
399                 continue
400             if chunk:
401                 data.append(chunk)
402             else:
403                 # empty chunk = EOF
404                 break
405         data = ''.join(data).split('\n',1)
406         while len(data) < 2:
407             data.append('')
408         data, self._rdbuf = data
409         
410         return data+'\n'
411  
412     def connect(self):
413         self.disconnect()
414         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
415         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
416         self._ctrl_sock.connect(sock_addr)
417
418     def disconnect(self):
419         try:
420             self._ctrl_sock.close()
421         except:
422             pass
423
424 class Client(object):
425     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
426             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
427             environment_setup = ""):
428         self.root_dir = root_dir
429         self.addr = (host, port)
430         self.user = user
431         self.agent = agent
432         self.sudo = sudo
433         self.communication = communication
434         self.environment_setup = environment_setup
435         self._stopped = False
436         self._deferreds = collections.deque()
437         self.connect()
438     
439     def __del__(self):
440         if self._process.poll() is None:
441             os.kill(self._process.pid, signal.SIGTERM)
442         self._process.wait()
443         
444     def connect(self):
445         root_dir = self.root_dir
446         (host, port) = self.addr
447         user = self.user
448         agent = self.agent
449         sudo = self.sudo
450         communication = self.communication
451         
452         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
453                 c.forward()" % (root_dir,)
454
455         self._process = popen_python(python_code, 
456                     communication = communication,
457                     host = host, 
458                     port = port, 
459                     user = user, 
460                     agent = agent, 
461                     sudo = sudo, 
462                     environment_setup = self.environment_setup)
463                
464         # Wait for the forwarder to be ready, otherwise nobody
465         # will be able to connect to it
466         err = []
467         helo = "nope"
468         while helo:
469             helo = self._process.stderr.readline()
470             if helo == 'FORWARDER_READY.\n':
471                 break
472             err.append(helo)
473         else:
474             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
475         
476     def send_msg(self, msg):
477         encoded = base64.b64encode(msg)
478         data = "%s\n" % encoded
479         
480         try:
481             self._process.stdin.write(data)
482         except (IOError, ValueError):
483             # dead process, poll it to un-zombify
484             self._process.poll()
485             
486             # try again after reconnect
487             # If it fails again, though, give up
488             self.connect()
489             self._process.stdin.write(data)
490
491     def send_stop(self):
492         self.send_msg(STOP_MSG)
493         self._stopped = True
494
495     def defer_reply(self, transform=None):
496         defer_entry = []
497         self._deferreds.append(defer_entry)
498         return defer.Defer(
499             functools.partial(self.read_reply, defer_entry, transform)
500         )
501         
502     def _read_reply(self):
503         data = self._process.stdout.readline()
504         encoded = data.rstrip() 
505         if not encoded:
506             # empty == eof == dead process, poll it to un-zombify
507             self._process.poll()
508             
509             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
510         return base64.b64decode(encoded)
511     
512     def read_reply(self, which=None, transform=None):
513         # Test to see if someone did it already
514         if which is not None and len(which):
515             # Ok, they did it...
516             # ...just return the deferred value
517             if transform:
518                 return transform(which[0])
519             else:
520                 return which[0]
521         
522         # Process all deferreds until the one we're looking for
523         # or until the queue is empty
524         while self._deferreds:
525             try:
526                 deferred = self._deferreds.popleft()
527             except IndexError:
528                 # emptied
529                 break
530             
531             deferred.append(self._read_reply())
532             if deferred is which:
533                 # We reached the one we were looking for
534                 if transform:
535                     return transform(deferred[0])
536                 else:
537                     return deferred[0]
538         
539         if which is None:
540             # They've requested a synchronous read
541             if transform:
542                 return transform(self._read_reply())
543             else:
544                 return self._read_reply()
545
546 def _make_server_key_args(server_key, host, port, args):
547     """ 
548     Returns a reference to the created temporary file, and adds the
549     corresponding arguments to the given argument list.
550     
551     Make sure to hold onto it until the process is done with the file
552     """
553     if port is not None:
554         host = '%s:%s' % (host,port)
555     # Create a temporary server key file
556     tmp_known_hosts = tempfile.NamedTemporaryFile()
557     
558     # Add the intended host key
559     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
560     
561     # If we're not in strict mode, add user-configured keys
562     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
563         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
564         if os.access(user_hosts_path, os.R_OK):
565             f = open(user_hosts_path, "r")
566             tmp_known_hosts.write(f.read())
567             f.close()
568         
569     tmp_known_hosts.flush()
570     
571     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
572     
573     return tmp_known_hosts
574
575 def make_connkey(user, host, port):
576     connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
577     if len(connkey) > 60:
578         connkey = hashlib.sha1(connkey).hexdigest()
579     return connkey
580
581 def popen_ssh_command(command, host, port, user, agent, 
582         stdin="", 
583         ident_key = None,
584         server_key = None,
585         tty = False,
586         timeout = None,
587         retry = 0,
588         err_on_timeout = True,
589         connect_timeout = 30,
590         persistent = True):
591     """
592     Executes a remote commands, returns ((stdout,stderr),process)
593     """
594     if TRACE:
595         print "ssh", host, command
596     
597     tmp_known_hosts = None
598     connkey = make_connkey(user,host,port)
599     args = ['ssh', '-C',
600             # Don't bother with localhost. Makes test easier
601             '-o', 'NoHostAuthenticationForLocalhost=yes',
602             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
603             '-o', 'ConnectionAttempts=3',
604             '-o', 'ServerAliveInterval=30',
605             '-o', 'TCPKeepAlive=yes',
606             '-l', user, host]
607     if persistent:
608         args.extend([
609             '-o', 'ControlMaster=auto',
610             '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
611             '-o', 'ControlPersist=60' ])
612     if agent:
613         args.append('-A')
614     if port:
615         args.append('-p%d' % port)
616     if ident_key:
617         args.extend(('-i', ident_key))
618     if tty:
619         args.append('-t')
620     if server_key:
621         # Create a temporary server key file
622         tmp_known_hosts = _make_server_key_args(
623             server_key, host, port, args)
624     args.append(command)
625
626     for x in xrange(retry or 3):
627         # connects to the remote host and starts a remote connection
628         proc = subprocess.Popen(args, 
629                 stdout = subprocess.PIPE,
630                 stdin = subprocess.PIPE, 
631                 stderr = subprocess.PIPE)
632         
633         # attach tempfile object to the process, to make sure the file stays
634         # alive until the process is finished with it
635         proc._known_hosts = tmp_known_hosts
636         
637         try:
638             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
639             if proc.poll() and err.strip().startswith('ssh: '):
640                 # SSH error, can safely retry
641                 continue
642             break
643         except RuntimeError,e:
644             if retry <= 0:
645                 raise
646             if TRACE:
647                 print " timedout -> ", e.args
648             retry -= 1
649         
650     if TRACE:
651         print " -> ", out, err
652
653     return ((out, err), proc)
654
655 def popen_scp(source, dest, 
656         port = None, 
657         agent = None, 
658         recursive = False,
659         ident_key = None,
660         server_key = None):
661     """
662     Copies from/to remote sites.
663     
664     Source and destination should have the user and host encoded
665     as per scp specs.
666     
667     If source is a file object, a special mode will be used to
668     create the remote file with the same contents.
669     
670     If dest is a file object, the remote file (source) will be
671     read and written into dest.
672     
673     In these modes, recursive cannot be True.
674     
675     Source can be a list of files to copy to a single destination,
676     in which case it is advised that the destination be a folder.
677     """
678     
679     if TRACE:
680         print "scp", source, dest
681     
682     if isinstance(source, file) and source.tell() == 0:
683         source = source.name
684     elif hasattr(source, 'read'):
685         tmp = tempfile.NamedTemporaryFile()
686         while True:
687             buf = source.read(65536)
688             if buf:
689                 tmp.write(buf)
690             else:
691                 break
692         tmp.seek(0)
693         source = tmp.name
694     
695     if isinstance(source, file) or isinstance(dest, file) \
696             or hasattr(source, 'read')  or hasattr(dest, 'write'):
697         assert not recursive
698         
699         # Parse source/destination as <user>@<server>:<path>
700         if isinstance(dest, basestring) and ':' in dest:
701             remspec, path = dest.split(':',1)
702         elif isinstance(source, basestring) and ':' in source:
703             remspec, path = source.split(':',1)
704         else:
705             raise ValueError, "Both endpoints cannot be local"
706         user,host = remspec.rsplit('@',1)
707         tmp_known_hosts = None
708         
709         connkey = make_connkey(user,host,port)
710         args = ['ssh', '-l', user, '-C',
711                 # Don't bother with localhost. Makes test easier
712                 '-o', 'NoHostAuthenticationForLocalhost=yes',
713                 '-o', 'ConnectTimeout=30',
714                 '-o', 'ConnectionAttempts=3',
715                 '-o', 'ServerAliveInterval=30',
716                 '-o', 'TCPKeepAlive=yes',
717                 '-o', 'ControlMaster=auto',
718                 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
719                 '-o', 'ControlPersist=60',
720                 host ]
721         if port:
722             args.append('-P%d' % port)
723         if ident_key:
724             args.extend(('-i', ident_key))
725         if server_key:
726             # Create a temporary server key file
727             tmp_known_hosts = _make_server_key_args(
728                 server_key, host, port, args)
729         
730         if isinstance(source, file) or hasattr(source, 'read'):
731             args.append('cat > %s' % (shell_escape(path),))
732         elif isinstance(dest, file) or hasattr(dest, 'write'):
733             args.append('cat %s' % (shell_escape(path),))
734         else:
735             raise AssertionError, "Unreachable code reached! :-Q"
736         
737         # connects to the remote host and starts a remote connection
738         if isinstance(source, file):
739             proc = subprocess.Popen(args, 
740                     stdout = open('/dev/null','w'),
741                     stderr = subprocess.PIPE,
742                     stdin = source)
743             err = proc.stderr.read()
744             proc._known_hosts = tmp_known_hosts
745             eintr_retry(proc.wait)()
746             return ((None,err), proc)
747         elif isinstance(dest, file):
748             proc = subprocess.Popen(args, 
749                     stdout = open('/dev/null','w'),
750                     stderr = subprocess.PIPE,
751                     stdin = source)
752             err = proc.stderr.read()
753             proc._known_hosts = tmp_known_hosts
754             eintr_retry(proc.wait)()
755             return ((None,err), proc)
756         elif hasattr(source, 'read'):
757             # file-like (but not file) source
758             proc = subprocess.Popen(args, 
759                     stdout = open('/dev/null','w'),
760                     stderr = subprocess.PIPE,
761                     stdin = subprocess.PIPE)
762             
763             buf = None
764             err = []
765             while True:
766                 if not buf:
767                     buf = source.read(4096)
768                 if not buf:
769                     #EOF
770                     break
771                 
772                 rdrdy, wrdy, broken = select.select(
773                     [proc.stderr],
774                     [proc.stdin],
775                     [proc.stderr,proc.stdin])
776                 
777                 if proc.stderr in rdrdy:
778                     # use os.read for fully unbuffered behavior
779                     err.append(os.read(proc.stderr.fileno(), 4096))
780                 
781                 if proc.stdin in wrdy:
782                     proc.stdin.write(buf)
783                     buf = None
784                 
785                 if broken:
786                     break
787             proc.stdin.close()
788             err.append(proc.stderr.read())
789                 
790             proc._known_hosts = tmp_known_hosts
791             eintr_retry(proc.wait)()
792             return ((None,''.join(err)), proc)
793         elif hasattr(dest, 'write'):
794             # file-like (but not file) dest
795             proc = subprocess.Popen(args, 
796                     stdout = subprocess.PIPE,
797                     stderr = subprocess.PIPE,
798                     stdin = open('/dev/null','w'))
799             
800             buf = None
801             err = []
802             while True:
803                 rdrdy, wrdy, broken = select.select(
804                     [proc.stderr, proc.stdout],
805                     [],
806                     [proc.stderr, proc.stdout])
807                 
808                 if proc.stderr in rdrdy:
809                     # use os.read for fully unbuffered behavior
810                     err.append(os.read(proc.stderr.fileno(), 4096))
811                 
812                 if proc.stdout in rdrdy:
813                     # use os.read for fully unbuffered behavior
814                     buf = os.read(proc.stdout.fileno(), 4096)
815                     dest.write(buf)
816                     
817                     if not buf:
818                         #EOF
819                         break
820                 
821                 if broken:
822                     break
823             err.append(proc.stderr.read())
824                 
825             proc._known_hosts = tmp_known_hosts
826             eintr_retry(proc.wait)()
827             return ((None,''.join(err)), proc)
828         else:
829             raise AssertionError, "Unreachable code reached! :-Q"
830     else:
831         # Parse destination as <user>@<server>:<path>
832         if isinstance(dest, basestring) and ':' in dest:
833             remspec, path = dest.split(':',1)
834         elif isinstance(source, basestring) and ':' in source:
835             remspec, path = source.split(':',1)
836         else:
837             raise ValueError, "Both endpoints cannot be local"
838         user,host = remspec.rsplit('@',1)
839         
840         # plain scp
841         tmp_known_hosts = None
842         args = ['scp', '-q', '-p', '-C',
843                 # Don't bother with localhost. Makes test easier
844                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
845         if port:
846             args.append('-P%d' % port)
847         if recursive:
848             args.append('-r')
849         if ident_key:
850             args.extend(('-i', ident_key))
851         if server_key:
852             # Create a temporary server key file
853             tmp_known_hosts = _make_server_key_args(
854                 server_key, host, port, args)
855         if isinstance(source,list):
856             args.extend(source)
857         else:
858             args.append(source)
859         args.append(dest)
860
861         # connects to the remote host and starts a remote connection
862         proc = subprocess.Popen(args, 
863                 stdout = subprocess.PIPE,
864                 stdin = subprocess.PIPE, 
865                 stderr = subprocess.PIPE)
866         proc._known_hosts = tmp_known_hosts
867         
868         comm = proc.communicate()
869         eintr_retry(proc.wait)()
870         return (comm, proc)
871
872 def decode_and_execute():
873     # The python code we want to execute might have characters that 
874     # are not compatible with the 'inline' mode we are using. To avoid
875     # problems we receive the encoded python code in base64 as a input 
876     # stream and decode it for execution.
877     import base64, os
878     cmd = ""
879     while True:
880         try:
881             cmd += os.read(0, 1)# one byte from stdin
882         except OSError, e:            
883             if e.errno == errno.EINTR:
884                 continue
885             else:
886                 raise
887         if cmd[-1] == "\n": 
888             break
889     cmd = base64.b64decode(cmd)
890     # Uncomment for debug
891     #os.write(2, "Executing python code: %s\n" % cmd)
892     os.write(1, "OK\n") # send a sync message
893     exec(cmd)
894
895 def popen_python(python_code, 
896         communication = DC.ACCESS_LOCAL,
897         host = None, 
898         port = None, 
899         user = None, 
900         agent = False, 
901         python_path = None,
902         ident_key = None,
903         server_key = None,
904         tty = False,
905         sudo = False, 
906         environment_setup = ""):
907
908     cmd = ""
909     if python_path:
910         python_path.replace("'", r"'\''")
911         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
912         cmd += " ; "
913     if environment_setup:
914         cmd += environment_setup
915         cmd += " ; "
916     # Uncomment for debug (to run everything under strace)
917     # We had to verify if strace works (cannot nest them)
918     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
919     #cmd += "$CMD "
920     #cmd += "strace -f -tt -s 200 -o strace$$.out "
921     import nepi
922     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
923         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
924     )
925
926     if sudo:
927         if ';' in cmd:
928             cmd = "sudo bash -c " + shell_escape(cmd)
929         else:
930             cmd = "sudo " + cmd
931
932     if communication == DC.ACCESS_SSH:
933         tmp_known_hosts = None
934         args = ['ssh', '-C',
935                 # Don't bother with localhost. Makes test easier
936                 '-o', 'NoHostAuthenticationForLocalhost=yes',
937                 '-o', 'ConnectionAttempts=3',
938                 '-o', 'ServerAliveInterval=30',
939                 '-o', 'TCPKeepAlive=yes',
940                 '-l', user, host]
941         if agent:
942             args.append('-A')
943         if port:
944             args.append('-p%d' % port)
945         if ident_key:
946             args.extend(('-i', ident_key))
947         if tty:
948             args.append('-t')
949         if server_key:
950             # Create a temporary server key file
951             tmp_known_hosts = _make_server_key_args(
952                 server_key, host, port, args)
953         args.append(cmd)
954     else:
955         args = [ "/bin/bash", "-c", cmd ]
956
957     # connects to the remote host and starts a remote
958     proc = subprocess.Popen(args,
959             shell = False, 
960             stdout = subprocess.PIPE,
961             stdin = subprocess.PIPE, 
962             stderr = subprocess.PIPE)
963
964     if communication == DC.ACCESS_SSH:
965         proc._known_hosts = tmp_known_hosts
966
967     # send the command to execute
968     os.write(proc.stdin.fileno(),
969             base64.b64encode(python_code) + "\n")
970  
971     while True: 
972         try:
973             msg = os.read(proc.stdout.fileno(), 3)
974             break
975         except OSError, e:            
976             if e.errno == errno.EINTR:
977                 continue
978             else:
979                 raise
980     
981     if msg != "OK\n":
982         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
983             msg, proc.stdout.read(), proc.stderr.read())
984
985     return proc
986
987 # POSIX
988 def _communicate(self, input, timeout=None, err_on_timeout=True):
989     read_set = []
990     write_set = []
991     stdout = None # Return
992     stderr = None # Return
993     
994     killed = False
995     
996     if timeout is not None:
997         timelimit = time.time() + timeout
998         killtime = timelimit + 4
999         bailtime = timelimit + 4
1000
1001     if self.stdin:
1002         # Flush stdio buffer.  This might block, if the user has
1003         # been writing to .stdin in an uncontrolled fashion.
1004         self.stdin.flush()
1005         if input:
1006             write_set.append(self.stdin)
1007         else:
1008             self.stdin.close()
1009     if self.stdout:
1010         read_set.append(self.stdout)
1011         stdout = []
1012     if self.stderr:
1013         read_set.append(self.stderr)
1014         stderr = []
1015
1016     input_offset = 0
1017     while read_set or write_set:
1018         if timeout is not None:
1019             curtime = time.time()
1020             if timeout is None or curtime > timelimit:
1021                 if curtime > bailtime:
1022                     break
1023                 elif curtime > killtime:
1024                     signum = signal.SIGKILL
1025                 else:
1026                     signum = signal.SIGTERM
1027                 # Lets kill it
1028                 os.kill(self.pid, signum)
1029                 select_timeout = 0.5
1030             else:
1031                 select_timeout = timelimit - curtime + 0.1
1032         else:
1033             select_timeout = 1.0
1034         
1035         if select_timeout > 1.0:
1036             select_timeout = 1.0
1037             
1038         try:
1039             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1040         except select.error,e:
1041             if e[0] != 4:
1042                 raise
1043             else:
1044                 continue
1045         
1046         if not rlist and not wlist and not xlist and self.poll() is not None:
1047             # timeout and process exited, say bye
1048             break
1049
1050         if self.stdin in wlist:
1051             # When select has indicated that the file is writable,
1052             # we can write up to PIPE_BUF bytes without risk
1053             # blocking.  POSIX defines PIPE_BUF >= 512
1054             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1055             input_offset += bytes_written
1056             if input_offset >= len(input):
1057                 self.stdin.close()
1058                 write_set.remove(self.stdin)
1059
1060         if self.stdout in rlist:
1061             data = os.read(self.stdout.fileno(), 1024)
1062             if data == "":
1063                 self.stdout.close()
1064                 read_set.remove(self.stdout)
1065             stdout.append(data)
1066
1067         if self.stderr in rlist:
1068             data = os.read(self.stderr.fileno(), 1024)
1069             if data == "":
1070                 self.stderr.close()
1071                 read_set.remove(self.stderr)
1072             stderr.append(data)
1073     
1074     # All data exchanged.  Translate lists into strings.
1075     if stdout is not None:
1076         stdout = ''.join(stdout)
1077     if stderr is not None:
1078         stderr = ''.join(stderr)
1079
1080     # Translate newlines, if requested.  We cannot let the file
1081     # object do the translation: It is based on stdio, which is
1082     # impossible to combine with select (unless forcing no
1083     # buffering).
1084     if self.universal_newlines and hasattr(file, 'newlines'):
1085         if stdout:
1086             stdout = self._translate_newlines(stdout)
1087         if stderr:
1088             stderr = self._translate_newlines(stderr)
1089
1090     if killed and err_on_timeout:
1091         errcode = self.poll()
1092         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1093     else:
1094         if killed:
1095             self.poll()
1096         else:
1097             self.wait()
1098         return (stdout, stderr)
1099