79067125ee8b44aa112b77191261f95651d24ddc
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
28 MAX_FD = 1024
29
30 STOP_MSG = "STOP"
31
32 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33
34 if hasattr(os, "devnull"):
35     DEV_NULL = os.devnull
36 else:
37     DEV_NULL = "/dev/null"
38
39 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
40
41 def shell_escape(s):
42     """ Escapes strings so that they are safe to use as command-line arguments """
43     if SHELL_SAFE.match(s):
44         # safe string - no escaping needed
45         return s
46     else:
47         # unsafe string - escape
48         def escp(c):
49             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
50                 return c
51             else:
52                 return "'$'\\x%02x''" % (ord(c),)
53         s = ''.join(map(escp,s))
54         return "'%s'" % (s,)
55
56 def eintr_retry(func):
57     import functools
58     @functools.wraps(func)
59     def rv(*p, **kw):
60         retry = kw.pop("_retry", False)
61         for i in xrange(0 if retry else 4):
62             try:
63                 return func(*p, **kw)
64             except (select.error, socket.error), args:
65                 if args[0] == errno.EINTR:
66                     continue
67                 else:
68                     raise 
69             except OSError, e:
70                 if e.errno == errno.EINTR:
71                     continue
72                 else:
73                     raise
74         else:
75             return func(*p, **kw)
76     return rv
77
78 class Server(object):
79     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
80             environment_setup = "", clean_root = False):
81         self._root_dir = root_dir
82         self._clean_root = clean_root
83         self._stop = False
84         self._ctrl_sock = None
85         self._log_level = log_level
86         self._rdbuf = ""
87         self._environment_setup = environment_setup
88
89     def run(self):
90         try:
91             if self.daemonize():
92                 self.post_daemonize()
93                 self.loop()
94                 self.cleanup()
95                 # ref: "os._exit(0)"
96                 # can not return normally after fork beacuse no exec was done.
97                 # This means that if we don't do a os._exit(0) here the code that 
98                 # follows the call to "Server.run()" in the "caller code" will be 
99                 # executed... but by now it has already been executed after the 
100                 # first process (the one that did the first fork) returned.
101                 os._exit(0)
102         except:
103             print >>sys.stderr, "SERVER_ERROR."
104             self.log_error()
105             self.cleanup()
106             os._exit(0)
107         print >>sys.stderr, "SERVER_READY."
108
109     def daemonize(self):
110         # pipes for process synchronization
111         (r, w) = os.pipe()
112         
113         # build root folder
114         root = os.path.normpath(self._root_dir)
115         if os.path.exists(root) and self._clean_root:
116             shutil.rmtree(root)
117         if not os.path.exists(root):
118             os.makedirs(root, 0755)
119
120         pid1 = os.fork()
121         if pid1 > 0:
122             os.close(w)
123             while True:
124                 try:
125                     os.read(r, 1)
126                 except OSError, e: # pragma: no cover
127                     if e.errno == errno.EINTR:
128                         continue
129                     else:
130                         raise
131                 break
132             os.close(r)
133             # os.waitpid avoids leaving a <defunc> (zombie) process
134             st = os.waitpid(pid1, 0)[1]
135             if st:
136                 raise RuntimeError("Daemonization failed")
137             # return 0 to inform the caller method that this is not the 
138             # daemonized process
139             return 0
140         os.close(r)
141
142         # Decouple from parent environment.
143         os.chdir(self._root_dir)
144         os.umask(0)
145         os.setsid()
146
147         # fork 2
148         pid2 = os.fork()
149         if pid2 > 0:
150             # see ref: "os._exit(0)"
151             os._exit(0)
152
153         # close all open file descriptors.
154         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
155         if (max_fd == resource.RLIM_INFINITY):
156             max_fd = MAX_FD
157         for fd in range(3, max_fd):
158             if fd != w:
159                 try:
160                     os.close(fd)
161                 except OSError:
162                     pass
163
164         # Redirect standard file descriptors.
165         stdin = open(DEV_NULL, "r")
166         stderr = stdout = open(STD_ERR, "a", 0)
167         os.dup2(stdin.fileno(), sys.stdin.fileno())
168         # NOTE: sys.stdout.write will still be buffered, even if the file
169         # was opened with 0 buffer
170         os.dup2(stdout.fileno(), sys.stdout.fileno())
171         os.dup2(stderr.fileno(), sys.stderr.fileno())
172         
173         # setup environment
174         if self._environment_setup:
175             # parse environment variables and pass to child process
176             # do it by executing shell commands, in case there's some heavy setup involved
177             envproc = subprocess.Popen(
178                 [ "bash", "-c", 
179                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
180                         ( self._environment_setup, ) ],
181                 stdin = subprocess.PIPE, 
182                 stdout = subprocess.PIPE,
183                 stderr = subprocess.PIPE
184             )
185             out,err = envproc.communicate()
186
187             # parse new environment
188             if out:
189                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
190             
191                 # apply to current environment
192                 for name, value in environment.iteritems():
193                     os.environ[name] = value
194                 
195                 # apply pythonpath
196                 if 'PYTHONPATH' in environment:
197                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
198
199         # create control socket
200         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
201         self._ctrl_sock.bind(CTRL_SOCK)
202         self._ctrl_sock.listen(0)
203
204         # let the parent process know that the daemonization is finished
205         os.write(w, "\n")
206         os.close(w)
207         return 1
208
209     def post_daemonize(self):
210         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
211         # QT, for some strange reason, redefines the SIGCHILD handler to write
212         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
213         # Server dameonization closes all file descriptors from fileno '3',
214         # but the overloaded handler (inherited by the forked process) will
215         # keep trying to write the \0 to fileno 'x', which might have been reused 
216         # after closing, for other operations. This is bad bad bad when fileno 'x'
217         # is in use for communication pouroses, because unexpected \0 start
218         # appearing in the communication messages... this is exactly what happens 
219         # when using netns in daemonized form. Thus, be have no other alternative than
220         # restoring the SIGCHLD handler to the default here.
221         import signal
222         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
223
224     def loop(self):
225         while not self._stop:
226             conn, addr = self._ctrl_sock.accept()
227             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
228             conn.settimeout(5)
229             while not self._stop:
230                 try:
231                     msg = self.recv_msg(conn)
232                 except socket.timeout, e:
233                     #self.log_error("SERVER recv_msg: connection timedout ")
234                     continue
235                 
236                 if not msg:
237                     self.log_error("CONNECTION LOST")
238                     break
239                     
240                 if msg == STOP_MSG:
241                     self._stop = True
242                     reply = self.stop_action()
243                 else:
244                     reply = self.reply_action(msg)
245                 
246                 try:
247                     self.send_reply(conn, reply)
248                 except socket.error:
249                     self.log_error()
250                     self.log_error("NOTICE: Awaiting for reconnection")
251                     break
252             try:
253                 conn.close()
254             except:
255                 # Doesn't matter
256                 self.log_error()
257
258     def recv_msg(self, conn):
259         data = [self._rdbuf]
260         chunk = data[0]
261         while '\n' not in chunk:
262             try:
263                 chunk = conn.recv(1024)
264             except (OSError, socket.error), e:
265                 if e[0] != errno.EINTR:
266                     raise
267                 else:
268                     continue
269             if chunk:
270                 data.append(chunk)
271             else:
272                 # empty chunk = EOF
273                 break
274         data = ''.join(data).split('\n',1)
275         while len(data) < 2:
276             data.append('')
277         data, self._rdbuf = data
278         
279         decoded = base64.b64decode(data)
280         return decoded.rstrip()
281
282     def send_reply(self, conn, reply):
283         encoded = base64.b64encode(reply)
284         conn.send("%s\n" % encoded)
285        
286     def cleanup(self):
287         try:
288             self._ctrl_sock.close()
289             os.remove(CTRL_SOCK)
290         except:
291             self.log_error()
292
293     def stop_action(self):
294         return "Stopping server"
295
296     def reply_action(self, msg):
297         return "Reply to: %s" % msg
298
299     def log_error(self, text = None, context = ''):
300         if text == None:
301             text = traceback.format_exc()
302         date = time.strftime("%Y-%m-%d %H:%M:%S")
303         if context:
304             context = " (%s)" % (context,)
305         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
306         return text
307
308     def log_debug(self, text):
309         if self._log_level == DC.DEBUG_LEVEL:
310             date = time.strftime("%Y-%m-%d %H:%M:%S")
311             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
312
313 class Forwarder(object):
314     def __init__(self, root_dir = "."):
315         self._ctrl_sock = None
316         self._root_dir = root_dir
317         self._stop = False
318         self._rdbuf = ""
319
320     def forward(self):
321         self.connect()
322         print >>sys.stderr, "FORWARDER_READY."
323         while not self._stop:
324             data = self.read_data()
325             if not data:
326                 # Connection to client lost
327                 break
328             self.send_to_server(data)
329             
330             data = self.recv_from_server()
331             if not data:
332                 # Connection to server lost
333                 raise IOError, "Connection to server lost while "\
334                     "expecting response"
335             self.write_data(data)
336         self.disconnect()
337
338     def read_data(self):
339         return sys.stdin.readline()
340
341     def write_data(self, data):
342         sys.stdout.write(data)
343         # sys.stdout.write is buffered, this is why we need to do a flush()
344         sys.stdout.flush()
345
346     def send_to_server(self, data):
347         try:
348             self._ctrl_sock.send(data)
349         except (IOError, socket.error), e:
350             if e[0] == errno.EPIPE:
351                 self.connect()
352                 self._ctrl_sock.send(data)
353             else:
354                 raise e
355         encoded = data.rstrip() 
356         msg = base64.b64decode(encoded)
357         if msg == STOP_MSG:
358             self._stop = True
359
360     def recv_from_server(self):
361         data = [self._rdbuf]
362         chunk = data[0]
363         while '\n' not in chunk:
364             try:
365                 chunk = self._ctrl_sock.recv(1024)
366             except (OSError, socket.error), e:
367                 if e[0] != errno.EINTR:
368                     raise
369                 continue
370             if chunk:
371                 data.append(chunk)
372             else:
373                 # empty chunk = EOF
374                 break
375         data = ''.join(data).split('\n',1)
376         while len(data) < 2:
377             data.append('')
378         data, self._rdbuf = data
379         
380         return data+'\n'
381  
382     def connect(self):
383         self.disconnect()
384         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
385         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
386         self._ctrl_sock.connect(sock_addr)
387
388     def disconnect(self):
389         try:
390             self._ctrl_sock.close()
391         except:
392             pass
393
394 class Client(object):
395     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
396             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
397             environment_setup = ""):
398         self.root_dir = root_dir
399         self.addr = (host, port)
400         self.user = user
401         self.agent = agent
402         self.sudo = sudo
403         self.communication = communication
404         self.environment_setup = environment_setup
405         self._stopped = False
406         self._deferreds = collections.deque()
407         self.connect()
408     
409     def __del__(self):
410         if self._process.poll() is None:
411             os.kill(self._process.pid, signal.SIGTERM)
412         self._process.wait()
413         
414     def connect(self):
415         root_dir = self.root_dir
416         (host, port) = self.addr
417         user = self.user
418         agent = self.agent
419         sudo = self.sudo
420         communication = self.communication
421         
422         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
423                 c.forward()" % (root_dir,)
424
425         self._process = popen_python(python_code, 
426                     communication = communication,
427                     host = host, 
428                     port = port, 
429                     user = user, 
430                     agent = agent, 
431                     sudo = sudo, 
432                     environment_setup = self.environment_setup)
433                
434         # Wait for the forwarder to be ready, otherwise nobody
435         # will be able to connect to it
436         err = []
437         helo = "nope"
438         while helo:
439             helo = self._process.stderr.readline()
440             if helo == 'FORWARDER_READY.\n':
441                 break
442             err.append(helo)
443         else:
444             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
445         
446     def send_msg(self, msg):
447         encoded = base64.b64encode(msg)
448         data = "%s\n" % encoded
449         
450         try:
451             self._process.stdin.write(data)
452         except (IOError, ValueError):
453             # dead process, poll it to un-zombify
454             self._process.poll()
455             
456             # try again after reconnect
457             # If it fails again, though, give up
458             self.connect()
459             self._process.stdin.write(data)
460
461     def send_stop(self):
462         self.send_msg(STOP_MSG)
463         self._stopped = True
464
465     def defer_reply(self, transform=None):
466         defer_entry = []
467         self._deferreds.append(defer_entry)
468         return defer.Defer(
469             functools.partial(self.read_reply, defer_entry, transform)
470         )
471         
472     def _read_reply(self):
473         data = self._process.stdout.readline()
474         encoded = data.rstrip() 
475         if not encoded:
476             # empty == eof == dead process, poll it to un-zombify
477             self._process.poll()
478             
479             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
480         return base64.b64decode(encoded)
481     
482     def read_reply(self, which=None, transform=None):
483         # Test to see if someone did it already
484         if which is not None and len(which):
485             # Ok, they did it...
486             # ...just return the deferred value
487             if transform:
488                 return transform(which[0])
489             else:
490                 return which[0]
491         
492         # Process all deferreds until the one we're looking for
493         # or until the queue is empty
494         while self._deferreds:
495             try:
496                 deferred = self._deferreds.popleft()
497             except IndexError:
498                 # emptied
499                 break
500             
501             deferred.append(self._read_reply())
502             if deferred is which:
503                 # We reached the one we were looking for
504                 if transform:
505                     return transform(deferred[0])
506                 else:
507                     return deferred[0]
508         
509         if which is None:
510             # They've requested a synchronous read
511             if transform:
512                 return transform(self._read_reply())
513             else:
514                 return self._read_reply()
515
516 def _make_server_key_args(server_key, host, port, args):
517     """ 
518     Returns a reference to the created temporary file, and adds the
519     corresponding arguments to the given argument list.
520     
521     Make sure to hold onto it until the process is done with the file
522     """
523     if port is not None:
524         host = '%s:%s' % (host,port)
525     # Create a temporary server key file
526     tmp_known_hosts = tempfile.NamedTemporaryFile()
527     
528     # Add the intended host key
529     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
530     
531     # If we're not in strict mode, add user-configured keys
532     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
533         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
534         if os.access(user_hosts_path, os.R_OK):
535             f = open(user_hosts_path, "r")
536             tmp_known_hosts.write(f.read())
537             f.close()
538         
539     tmp_known_hosts.flush()
540     
541     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
542     
543     return tmp_known_hosts
544
545 def popen_ssh_command(command, host, port, user, agent, 
546         stdin="", 
547         ident_key = None,
548         server_key = None,
549         tty = False,
550         timeout = None,
551         retry = 0,
552         err_on_timeout = True):
553     """
554     Executes a remote commands, returns ((stdout,stderr),process)
555     """
556     if TRACE:
557         print "ssh", host, command
558     
559     tmp_known_hosts = None
560     args = ['ssh',
561             # Don't bother with localhost. Makes test easier
562             '-o', 'NoHostAuthenticationForLocalhost=yes',
563             '-l', user, host]
564     if agent:
565         args.append('-A')
566     if port:
567         args.append('-p%d' % port)
568     if ident_key:
569         args.extend(('-i', ident_key))
570     if tty:
571         args.append('-t')
572     if server_key:
573         # Create a temporary server key file
574         tmp_known_hosts = _make_server_key_args(
575             server_key, host, port, args)
576     args.append(command)
577
578     while 1:
579         # connects to the remote host and starts a remote connection
580         proc = subprocess.Popen(args, 
581                 stdout = subprocess.PIPE,
582                 stdin = subprocess.PIPE, 
583                 stderr = subprocess.PIPE)
584         
585         # attach tempfile object to the process, to make sure the file stays
586         # alive until the process is finished with it
587         proc._known_hosts = tmp_known_hosts
588         
589         try:
590             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
591             break
592         except RuntimeError,e:
593             if retry <= 0:
594                 raise
595             if TRACE:
596                 print " timedout -> ", e.args
597             retry -= 1
598         
599     if TRACE:
600         print " -> ", out, err
601
602     return ((out, err), proc)
603
604 def popen_scp(source, dest, 
605         port = None, 
606         agent = None, 
607         recursive = False,
608         ident_key = None,
609         server_key = None):
610     """
611     Copies from/to remote sites.
612     
613     Source and destination should have the user and host encoded
614     as per scp specs.
615     
616     If source is a file object, a special mode will be used to
617     create the remote file with the same contents.
618     
619     If dest is a file object, the remote file (source) will be
620     read and written into dest.
621     
622     In these modes, recursive cannot be True.
623     
624     Source can be a list of files to copy to a single destination,
625     in which case it is advised that the destination be a folder.
626     """
627     
628     if TRACE:
629         print "scp", source, dest
630     
631     if isinstance(source, file) and source.tell() == 0:
632         source = source.name
633     elif hasattr(source, 'read'):
634         tmp = tempfile.NamedTemporaryFile()
635         while True:
636             buf = source.read(65536)
637             if buf:
638                 tmp.write(buf)
639             else:
640                 break
641         tmp.seek(0)
642         source = tmp.name
643     
644     if isinstance(source, file) or isinstance(dest, file) \
645             or hasattr(source, 'read')  or hasattr(dest, 'write'):
646         assert not recursive
647         
648         # Parse source/destination as <user>@<server>:<path>
649         if isinstance(dest, basestring) and ':' in dest:
650             remspec, path = dest.split(':',1)
651         elif isinstance(source, basestring) and ':' in source:
652             remspec, path = source.split(':',1)
653         else:
654             raise ValueError, "Both endpoints cannot be local"
655         user,host = remspec.rsplit('@',1)
656         tmp_known_hosts = None
657         
658         args = ['ssh', '-l', user, '-C',
659                 # Don't bother with localhost. Makes test easier
660                 '-o', 'NoHostAuthenticationForLocalhost=yes',
661                 host ]
662         if port:
663             args.append('-P%d' % port)
664         if ident_key:
665             args.extend(('-i', ident_key))
666         if server_key:
667             # Create a temporary server key file
668             tmp_known_hosts = _make_server_key_args(
669                 server_key, host, port, args)
670         
671         if isinstance(source, file) or hasattr(source, 'read'):
672             args.append('cat > %s' % (shell_escape(path),))
673         elif isinstance(dest, file) or hasattr(dest, 'write'):
674             args.append('cat %s' % (shell_escape(path),))
675         else:
676             raise AssertionError, "Unreachable code reached! :-Q"
677         
678         # connects to the remote host and starts a remote connection
679         if isinstance(source, file):
680             proc = subprocess.Popen(args, 
681                     stdout = open('/dev/null','w'),
682                     stderr = subprocess.PIPE,
683                     stdin = source)
684             err = proc.stderr.read()
685             proc._known_hosts = tmp_known_hosts
686             eintr_retry(proc.wait)()
687             return ((None,err), proc)
688         elif isinstance(dest, file):
689             proc = subprocess.Popen(args, 
690                     stdout = open('/dev/null','w'),
691                     stderr = subprocess.PIPE,
692                     stdin = source)
693             err = proc.stderr.read()
694             proc._known_hosts = tmp_known_hosts
695             eintr_retry(proc.wait)()
696             return ((None,err), proc)
697         elif hasattr(source, 'read'):
698             # file-like (but not file) source
699             proc = subprocess.Popen(args, 
700                     stdout = open('/dev/null','w'),
701                     stderr = subprocess.PIPE,
702                     stdin = subprocess.PIPE)
703             
704             buf = None
705             err = []
706             while True:
707                 if not buf:
708                     buf = source.read(4096)
709                 if not buf:
710                     #EOF
711                     break
712                 
713                 rdrdy, wrdy, broken = select.select(
714                     [proc.stderr],
715                     [proc.stdin],
716                     [proc.stderr,proc.stdin])
717                 
718                 if proc.stderr in rdrdy:
719                     # use os.read for fully unbuffered behavior
720                     err.append(os.read(proc.stderr.fileno(), 4096))
721                 
722                 if proc.stdin in wrdy:
723                     proc.stdin.write(buf)
724                     buf = None
725                 
726                 if broken:
727                     break
728             proc.stdin.close()
729             err.append(proc.stderr.read())
730                 
731             proc._known_hosts = tmp_known_hosts
732             eintr_retry(proc.wait)()
733             return ((None,''.join(err)), proc)
734         elif hasattr(dest, 'write'):
735             # file-like (but not file) dest
736             proc = subprocess.Popen(args, 
737                     stdout = subprocess.PIPE,
738                     stderr = subprocess.PIPE,
739                     stdin = open('/dev/null','w'))
740             
741             buf = None
742             err = []
743             while True:
744                 rdrdy, wrdy, broken = select.select(
745                     [proc.stderr, proc.stdout],
746                     [],
747                     [proc.stderr, proc.stdout])
748                 
749                 if proc.stderr in rdrdy:
750                     # use os.read for fully unbuffered behavior
751                     err.append(os.read(proc.stderr.fileno(), 4096))
752                 
753                 if proc.stdout in rdrdy:
754                     # use os.read for fully unbuffered behavior
755                     buf = os.read(proc.stdout.fileno(), 4096)
756                     dest.write(buf)
757                     
758                     if not buf:
759                         #EOF
760                         break
761                 
762                 if broken:
763                     break
764             err.append(proc.stderr.read())
765                 
766             proc._known_hosts = tmp_known_hosts
767             eintr_retry(proc.wait)()
768             return ((None,''.join(err)), proc)
769         else:
770             raise AssertionError, "Unreachable code reached! :-Q"
771     else:
772         # Parse destination as <user>@<server>:<path>
773         if isinstance(dest, basestring) and ':' in dest:
774             remspec, path = dest.split(':',1)
775         elif isinstance(source, basestring) and ':' in source:
776             remspec, path = source.split(':',1)
777         else:
778             raise ValueError, "Both endpoints cannot be local"
779         user,host = remspec.rsplit('@',1)
780         
781         # plain scp
782         tmp_known_hosts = None
783         args = ['scp', '-q', '-p', '-C',
784                 # Don't bother with localhost. Makes test easier
785                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
786         if port:
787             args.append('-P%d' % port)
788         if recursive:
789             args.append('-r')
790         if ident_key:
791             args.extend(('-i', ident_key))
792         if server_key:
793             # Create a temporary server key file
794             tmp_known_hosts = _make_server_key_args(
795                 server_key, host, port, args)
796         if isinstance(source,list):
797             args.extend(source)
798         else:
799             args.append(source)
800         args.append(dest)
801
802         # connects to the remote host and starts a remote connection
803         proc = subprocess.Popen(args, 
804                 stdout = subprocess.PIPE,
805                 stdin = subprocess.PIPE, 
806                 stderr = subprocess.PIPE)
807         proc._known_hosts = tmp_known_hosts
808         
809         comm = proc.communicate()
810         eintr_retry(proc.wait)()
811         return (comm, proc)
812
813 def decode_and_execute():
814     # The python code we want to execute might have characters that 
815     # are not compatible with the 'inline' mode we are using. To avoid
816     # problems we receive the encoded python code in base64 as a input 
817     # stream and decode it for execution.
818     import base64, os
819     cmd = ""
820     while True:
821         try:
822             cmd += os.read(0, 1)# one byte from stdin
823         except OSError, e:            
824             if e.errno == errno.EINTR:
825                 continue
826             else:
827                 raise
828         if cmd[-1] == "\n": 
829             break
830     cmd = base64.b64decode(cmd)
831     # Uncomment for debug
832     #os.write(2, "Executing python code: %s\n" % cmd)
833     os.write(1, "OK\n") # send a sync message
834     exec(cmd)
835
836 def popen_python(python_code, 
837         communication = DC.ACCESS_LOCAL,
838         host = None, 
839         port = None, 
840         user = None, 
841         agent = False, 
842         python_path = None,
843         ident_key = None,
844         server_key = None,
845         tty = False,
846         sudo = False, 
847         environment_setup = ""):
848
849     shell = False
850     cmd = ""
851     if sudo:
852         cmd +="sudo "
853     if python_path:
854         python_path.replace("'", r"'\''")
855         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
856         cmd += " ; "
857     if environment_setup:
858         cmd += environment_setup
859         cmd += " ; "
860     # Uncomment for debug (to run everything under strace)
861     # We had to verify if strace works (cannot nest them)
862     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
863     #cmd += "$CMD "
864     #cmd += "strace -f -tt -s 200 -o strace$$.out "
865     import nepi
866     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
867         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
868     )
869
870     if communication == DC.ACCESS_SSH:
871         tmp_known_hosts = None
872         args = ['ssh',
873                 # Don't bother with localhost. Makes test easier
874                 '-o', 'NoHostAuthenticationForLocalhost=yes',
875                 '-l', user, host]
876         if agent:
877             args.append('-A')
878         if port:
879             args.append('-p%d' % port)
880         if ident_key:
881             args.extend(('-i', ident_key))
882         if tty:
883             args.append('-t')
884         if server_key:
885             # Create a temporary server key file
886             tmp_known_hosts = _make_server_key_args(
887                 server_key, host, port, args)
888         args.append(cmd)
889     else:
890         args = [cmd]
891         shell = True
892
893     # connects to the remote host and starts a remote
894     proc = subprocess.Popen(args,
895             shell = shell, 
896             stdout = subprocess.PIPE,
897             stdin = subprocess.PIPE, 
898             stderr = subprocess.PIPE)
899
900     if communication == DC.ACCESS_SSH:
901         proc._known_hosts = tmp_known_hosts
902
903     # send the command to execute
904     os.write(proc.stdin.fileno(),
905             base64.b64encode(python_code) + "\n")
906  
907     while True: 
908         try:
909             msg = os.read(proc.stdout.fileno(), 3)
910             break
911         except OSError, e:            
912             if e.errno == errno.EINTR:
913                 continue
914             else:
915                 raise
916     
917     if msg != "OK\n":
918         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
919             msg, proc.stdout.read(), proc.stderr.read())
920
921     return proc
922
923 # POSIX
924 def _communicate(self, input, timeout=None, err_on_timeout=True):
925     read_set = []
926     write_set = []
927     stdout = None # Return
928     stderr = None # Return
929     
930     killed = False
931     
932     if timeout is not None:
933         timelimit = time.time() + timeout
934         killtime = timelimit + 4
935         bailtime = timelimit + 4
936
937     if self.stdin:
938         # Flush stdio buffer.  This might block, if the user has
939         # been writing to .stdin in an uncontrolled fashion.
940         self.stdin.flush()
941         if input:
942             write_set.append(self.stdin)
943         else:
944             self.stdin.close()
945     if self.stdout:
946         read_set.append(self.stdout)
947         stdout = []
948     if self.stderr:
949         read_set.append(self.stderr)
950         stderr = []
951
952     input_offset = 0
953     while read_set or write_set:
954         if timeout is not None:
955             curtime = time.time()
956             if timeout is None or curtime > timelimit:
957                 if curtime > bailtime:
958                     break
959                 elif curtime > killtime:
960                     signum = signal.SIGKILL
961                 else:
962                     signum = signal.SIGTERM
963                 # Lets kill it
964                 os.kill(self.pid, signum)
965                 select_timeout = 0.5
966             else:
967                 select_timeout = timelimit - curtime + 0.1
968         else:
969             select_timeout = None
970             
971         try:
972             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
973         except select.error,e:
974             if e[0] != 4:
975                 raise
976             else:
977                 continue
978
979         if self.stdin in wlist:
980             # When select has indicated that the file is writable,
981             # we can write up to PIPE_BUF bytes without risk
982             # blocking.  POSIX defines PIPE_BUF >= 512
983             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
984             input_offset += bytes_written
985             if input_offset >= len(input):
986                 self.stdin.close()
987                 write_set.remove(self.stdin)
988
989         if self.stdout in rlist:
990             data = os.read(self.stdout.fileno(), 1024)
991             if data == "":
992                 self.stdout.close()
993                 read_set.remove(self.stdout)
994             stdout.append(data)
995
996         if self.stderr in rlist:
997             data = os.read(self.stderr.fileno(), 1024)
998             if data == "":
999                 self.stderr.close()
1000                 read_set.remove(self.stderr)
1001             stderr.append(data)
1002     
1003     # All data exchanged.  Translate lists into strings.
1004     if stdout is not None:
1005         stdout = ''.join(stdout)
1006     if stderr is not None:
1007         stderr = ''.join(stderr)
1008
1009     # Translate newlines, if requested.  We cannot let the file
1010     # object do the translation: It is based on stdio, which is
1011     # impossible to combine with select (unless forcing no
1012     # buffering).
1013     if self.universal_newlines and hasattr(file, 'newlines'):
1014         if stdout:
1015             stdout = self._translate_newlines(stdout)
1016         if stderr:
1017             stderr = self._translate_newlines(stderr)
1018
1019     if killed and err_on_timeout:
1020         errcode = self.poll()
1021         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1022     else:
1023         if killed:
1024             self.poll()
1025         else:
1026             self.wait()
1027         return (stdout, stderr)
1028