bugfix: in server.Server, inside the message read loop, a break was donde after a...
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import socket
13 import signal
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import signal
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
28 MAX_FD = 1024
29
30 STOP_MSG = "STOP"
31
32 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33
34 if hasattr(os, "devnull"):
35     DEV_NULL = os.devnull
36 else:
37     DEV_NULL = "/dev/null"
38
39 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
40
41 def shell_escape(s):
42     """ Escapes strings so that they are safe to use as command-line arguments """
43     if SHELL_SAFE.match(s):
44         # safe string - no escaping needed
45         return s
46     else:
47         # unsafe string - escape
48         def escp(c):
49             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
50                 return c
51             else:
52                 return "'$'\\x%02x''" % (ord(c),)
53         s = ''.join(map(escp,s))
54         return "'%s'" % (s,)
55
56 def eintr_retry(func):
57     import functools
58     @functools.wraps(func)
59     def rv(*p, **kw):
60         retry = kw.pop("_retry", False)
61         for i in xrange(0 if retry else 4):
62             try:
63                 return func(*p, **kw)
64             except (select.error, socket.error), args:
65                 if args[0] == errno.EINTR:
66                     continue
67                 else:
68                     raise 
69             except OSError, e:
70                 if e.errno == errno.EINTR:
71                     continue
72                 else:
73                     raise
74         else:
75             return func(*p, **kw)
76     return rv
77
78 class Server(object):
79     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, environment_setup = ""):
80         self._root_dir = root_dir
81         self._stop = False
82         self._ctrl_sock = None
83         self._log_level = log_level
84         self._rdbuf = ""
85         self._environment_setup = environment_setup
86
87     def run(self):
88         try:
89             if self.daemonize():
90                 self.post_daemonize()
91                 self.loop()
92                 self.cleanup()
93                 # ref: "os._exit(0)"
94                 # can not return normally after fork beacuse no exec was done.
95                 # This means that if we don't do a os._exit(0) here the code that 
96                 # follows the call to "Server.run()" in the "caller code" will be 
97                 # executed... but by now it has already been executed after the 
98                 # first process (the one that did the first fork) returned.
99                 os._exit(0)
100         except:
101             print >>sys.stderr, "SERVER_ERROR."
102             self.log_error()
103             self.cleanup()
104             os._exit(0)
105         print >>sys.stderr, "SERVER_READY."
106
107     def daemonize(self):
108         # pipes for process synchronization
109         (r, w) = os.pipe()
110         
111         # build root folder
112         root = os.path.normpath(self._root_dir)
113         if not os.path.exists(root):
114             os.makedirs(root, 0755)
115
116         pid1 = os.fork()
117         if pid1 > 0:
118             os.close(w)
119             while True:
120                 try:
121                     os.read(r, 1)
122                 except OSError, e: # pragma: no cover
123                     if e.errno == errno.EINTR:
124                         continue
125                     else:
126                         raise
127                 break
128             os.close(r)
129             # os.waitpid avoids leaving a <defunc> (zombie) process
130             st = os.waitpid(pid1, 0)[1]
131             if st:
132                 raise RuntimeError("Daemonization failed")
133             # return 0 to inform the caller method that this is not the 
134             # daemonized process
135             return 0
136         os.close(r)
137
138         # Decouple from parent environment.
139         os.chdir(self._root_dir)
140         os.umask(0)
141         os.setsid()
142
143         # fork 2
144         pid2 = os.fork()
145         if pid2 > 0:
146             # see ref: "os._exit(0)"
147             os._exit(0)
148
149         # close all open file descriptors.
150         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
151         if (max_fd == resource.RLIM_INFINITY):
152             max_fd = MAX_FD
153         for fd in range(3, max_fd):
154             if fd != w:
155                 try:
156                     os.close(fd)
157                 except OSError:
158                     pass
159
160         # Redirect standard file descriptors.
161         stdin = open(DEV_NULL, "r")
162         stderr = stdout = open(STD_ERR, "a", 0)
163         os.dup2(stdin.fileno(), sys.stdin.fileno())
164         # NOTE: sys.stdout.write will still be buffered, even if the file
165         # was opened with 0 buffer
166         os.dup2(stdout.fileno(), sys.stdout.fileno())
167         os.dup2(stderr.fileno(), sys.stderr.fileno())
168         
169         # setup environment
170         if self._environment_setup:
171             # parse environment variables and pass to child process
172             # do it by executing shell commands, in case there's some heavy setup involved
173             envproc = subprocess.Popen(
174                 [ "bash", "-c", 
175                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
176                         ( self._environment_setup, ) ],
177                 stdin = subprocess.PIPE, 
178                 stdout = subprocess.PIPE,
179                 stderr = subprocess.PIPE
180             )
181             out,err = envproc.communicate()
182
183             # parse new environment
184             if out:
185                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
186             
187                 # apply to current environment
188                 for name, value in environment.iteritems():
189                     os.environ[name] = value
190                 
191                 # apply pythonpath
192                 if 'PYTHONPATH' in environment:
193                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
194
195         # create control socket
196         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
197         self._ctrl_sock.bind(CTRL_SOCK)
198         self._ctrl_sock.listen(0)
199
200         # let the parent process know that the daemonization is finished
201         os.write(w, "\n")
202         os.close(w)
203         return 1
204
205     def post_daemonize(self):
206         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
207         # QT, for some strange reason, redefines the SIGCHILD handler to write
208         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
209         # Server dameonization closes all file descriptors from fileno '3',
210         # but the overloaded handler (inherited by the forked process) will
211         # keep trying to write the \0 to fileno 'x', which might have been reused 
212         # after closing, for other operations. This is bad bad bad when fileno 'x'
213         # is in use for communication pouroses, because unexpected \0 start
214         # appearing in the communication messages... this is exactly what happens 
215         # when using netns in daemonized form. Thus, be have no other alternative than
216         # restoring the SIGCHLD handler to the default here.
217         import signal
218         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
219
220     def loop(self):
221         while not self._stop:
222             conn, addr = self._ctrl_sock.accept()
223             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
224             conn.settimeout(5)
225             while not self._stop:
226                 try:
227                     msg = self.recv_msg(conn)
228                 except socket.timeout, e:
229                     self.log_error("SERVER recv_msg: connection timedout ")
230                     continue
231                 
232                 if not msg:
233                     self.log_error("CONNECTION LOST")
234                     break
235                     
236                 if msg == STOP_MSG:
237                     self._stop = True
238                     reply = self.stop_action()
239                 else:
240                     reply = self.reply_action(msg)
241                 
242                 try:
243                     self.send_reply(conn, reply)
244                 except socket.error:
245                     self.log_error()
246                     self.log_error("NOTICE: Awaiting for reconnection")
247                     break
248             try:
249                 conn.close()
250             except:
251                 # Doesn't matter
252                 self.log_error()
253
254     def recv_msg(self, conn):
255         data = [self._rdbuf]
256         chunk = data[0]
257         while '\n' not in chunk:
258             try:
259                 chunk = conn.recv(1024)
260             except (OSError, socket.error), e:
261                 if e[0] != errno.EINTR:
262                     raise
263                 else:
264                     continue
265             if chunk:
266                 data.append(chunk)
267             else:
268                 # empty chunk = EOF
269                 break
270         data = ''.join(data).split('\n',1)
271         while len(data) < 2:
272             data.append('')
273         data, self._rdbuf = data
274         
275         decoded = base64.b64decode(data)
276         return decoded.rstrip()
277
278     def send_reply(self, conn, reply):
279         encoded = base64.b64encode(reply)
280         conn.send("%s\n" % encoded)
281        
282     def cleanup(self):
283         try:
284             self._ctrl_sock.close()
285             os.remove(CTRL_SOCK)
286         except:
287             self.log_error()
288
289     def stop_action(self):
290         return "Stopping server"
291
292     def reply_action(self, msg):
293         return "Reply to: %s" % msg
294
295     def log_error(self, text = None, context = ''):
296         if text == None:
297             text = traceback.format_exc()
298         date = time.strftime("%Y-%m-%d %H:%M:%S")
299         if context:
300             context = " (%s)" % (context,)
301         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
302         return text
303
304     def log_debug(self, text):
305         if self._log_level == DC.DEBUG_LEVEL:
306             date = time.strftime("%Y-%m-%d %H:%M:%S")
307             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
308
309 class Forwarder(object):
310     def __init__(self, root_dir = "."):
311         self._ctrl_sock = None
312         self._root_dir = root_dir
313         self._stop = False
314         self._rdbuf = ""
315
316     def forward(self):
317         self.connect()
318         print >>sys.stderr, "FORWARDER_READY."
319         while not self._stop:
320             data = self.read_data()
321             if not data:
322                 # Connection to client lost
323                 break
324             self.send_to_server(data)
325             
326             data = self.recv_from_server()
327             if not data:
328                 # Connection to server lost
329                 raise IOError, "Connection to server lost while "\
330                     "expecting response"
331             self.write_data(data)
332         self.disconnect()
333
334     def read_data(self):
335         return sys.stdin.readline()
336
337     def write_data(self, data):
338         sys.stdout.write(data)
339         # sys.stdout.write is buffered, this is why we need to do a flush()
340         sys.stdout.flush()
341
342     def send_to_server(self, data):
343         try:
344             self._ctrl_sock.send(data)
345         except (IOError, socket.error), e:
346             if e[0] == errno.EPIPE:
347                 self.connect()
348                 self._ctrl_sock.send(data)
349             else:
350                 raise e
351         encoded = data.rstrip() 
352         msg = base64.b64decode(encoded)
353         if msg == STOP_MSG:
354             self._stop = True
355
356     def recv_from_server(self):
357         data = [self._rdbuf]
358         chunk = data[0]
359         while '\n' not in chunk:
360             try:
361                 chunk = self._ctrl_sock.recv(1024)
362             except (OSError, socket.error), e:
363                 if e[0] != errno.EINTR:
364                     raise
365                 continue
366             if chunk:
367                 data.append(chunk)
368             else:
369                 # empty chunk = EOF
370                 break
371         data = ''.join(data).split('\n',1)
372         while len(data) < 2:
373             data.append('')
374         data, self._rdbuf = data
375         
376         return data+'\n'
377  
378     def connect(self):
379         self.disconnect()
380         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
381         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
382         self._ctrl_sock.connect(sock_addr)
383
384     def disconnect(self):
385         try:
386             self._ctrl_sock.close()
387         except:
388             pass
389
390 class Client(object):
391     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
392             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
393             environment_setup = ""):
394         self.root_dir = root_dir
395         self.addr = (host, port)
396         self.user = user
397         self.agent = agent
398         self.sudo = sudo
399         self.communication = communication
400         self.environment_setup = environment_setup
401         self._stopped = False
402         self._deferreds = collections.deque()
403         self.connect()
404     
405     def __del__(self):
406         if self._process.poll() is None:
407             os.kill(self._process.pid, signal.SIGTERM)
408         self._process.wait()
409         
410     def connect(self):
411         root_dir = self.root_dir
412         (host, port) = self.addr
413         user = self.user
414         agent = self.agent
415         sudo = self.sudo
416         communication = self.communication
417         
418         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
419                 c.forward()" % (root_dir,)
420
421         self._process = popen_python(python_code, 
422                     communication = communication,
423                     host = host, 
424                     port = port, 
425                     user = user, 
426                     agent = agent, 
427                     sudo = sudo, 
428                     environment_setup = self.environment_setup)
429                
430         # Wait for the forwarder to be ready, otherwise nobody
431         # will be able to connect to it
432         helo = self._process.stderr.readline()
433         if helo != 'FORWARDER_READY.\n':
434             raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
435                     helo + self._process.stderr.read())
436         
437     def send_msg(self, msg):
438         encoded = base64.b64encode(msg)
439         data = "%s\n" % encoded
440         
441         try:
442             self._process.stdin.write(data)
443         except (IOError, ValueError):
444             # dead process, poll it to un-zombify
445             self._process.poll()
446             
447             # try again after reconnect
448             # If it fails again, though, give up
449             self.connect()
450             self._process.stdin.write(data)
451
452     def send_stop(self):
453         self.send_msg(STOP_MSG)
454         self._stopped = True
455
456     def defer_reply(self, transform=None):
457         defer_entry = []
458         self._deferreds.append(defer_entry)
459         return defer.Defer(
460             functools.partial(self.read_reply, defer_entry, transform)
461         )
462         
463     def _read_reply(self):
464         data = self._process.stdout.readline()
465         encoded = data.rstrip() 
466         if not encoded:
467             # empty == eof == dead process, poll it to un-zombify
468             self._process.poll()
469             
470             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
471         return base64.b64decode(encoded)
472     
473     def read_reply(self, which=None, transform=None):
474         # Test to see if someone did it already
475         if which is not None and len(which):
476             # Ok, they did it...
477             # ...just return the deferred value
478             if transform:
479                 return transform(which[0])
480             else:
481                 return which[0]
482         
483         # Process all deferreds until the one we're looking for
484         # or until the queue is empty
485         while self._deferreds:
486             try:
487                 deferred = self._deferreds.popleft()
488             except IndexError:
489                 # emptied
490                 break
491             
492             deferred.append(self._read_reply())
493             if deferred is which:
494                 # We reached the one we were looking for
495                 if transform:
496                     return transform(deferred[0])
497                 else:
498                     return deferred[0]
499         
500         if which is None:
501             # They've requested a synchronous read
502             if transform:
503                 return transform(self._read_reply())
504             else:
505                 return self._read_reply()
506
507 def _make_server_key_args(server_key, host, port, args):
508     """ 
509     Returns a reference to the created temporary file, and adds the
510     corresponding arguments to the given argument list.
511     
512     Make sure to hold onto it until the process is done with the file
513     """
514     if port is not None:
515         host = '%s:%s' % (host,port)
516     # Create a temporary server key file
517     tmp_known_hosts = tempfile.NamedTemporaryFile()
518     
519     # Add the intended host key
520     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
521     
522     # If we're not in strict mode, add user-configured keys
523     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
524         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
525         if os.access(user_hosts_path, os.R_OK):
526             f = open(user_hosts_path, "r")
527             tmp_known_hosts.write(f.read())
528             f.close()
529         
530     tmp_known_hosts.flush()
531     
532     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
533     
534     return tmp_known_hosts
535
536 def popen_ssh_command(command, host, port, user, agent, 
537         stdin="", 
538         ident_key = None,
539         server_key = None,
540         tty = False,
541         timeout = None,
542         retry = 0,
543         err_on_timeout = True):
544     """
545     Executes a remote commands, returns ((stdout,stderr),process)
546     """
547     if TRACE:
548         print "ssh", host, command
549     
550     tmp_known_hosts = None
551     args = ['ssh',
552             # Don't bother with localhost. Makes test easier
553             '-o', 'NoHostAuthenticationForLocalhost=yes',
554             '-l', user, host]
555     if agent:
556         args.append('-A')
557     if port:
558         args.append('-p%d' % port)
559     if ident_key:
560         args.extend(('-i', ident_key))
561     if tty:
562         args.append('-t')
563     if server_key:
564         # Create a temporary server key file
565         tmp_known_hosts = _make_server_key_args(
566             server_key, host, port, args)
567     args.append(command)
568
569     while 1:
570         # connects to the remote host and starts a remote connection
571         proc = subprocess.Popen(args, 
572                 stdout = subprocess.PIPE,
573                 stdin = subprocess.PIPE, 
574                 stderr = subprocess.PIPE)
575         
576         # attach tempfile object to the process, to make sure the file stays
577         # alive until the process is finished with it
578         proc._known_hosts = tmp_known_hosts
579         
580         try:
581             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
582             break
583         except RuntimeError,e:
584             if retry <= 0:
585                 raise
586             if TRACE:
587                 print " timedout -> ", e.args
588             retry -= 1
589         
590     if TRACE:
591         print " -> ", out, err
592
593     return ((out, err), proc)
594
595 def popen_scp(source, dest, 
596         port = None, 
597         agent = None, 
598         recursive = False,
599         ident_key = None,
600         server_key = None):
601     """
602     Copies from/to remote sites.
603     
604     Source and destination should have the user and host encoded
605     as per scp specs.
606     
607     If source is a file object, a special mode will be used to
608     create the remote file with the same contents.
609     
610     If dest is a file object, the remote file (source) will be
611     read and written into dest.
612     
613     In these modes, recursive cannot be True.
614     
615     Source can be a list of files to copy to a single destination,
616     in which case it is advised that the destination be a folder.
617     """
618     
619     if TRACE:
620         print "scp", source, dest
621     
622     if isinstance(source, file) and source.tell() == 0:
623         source = source.name
624     elif hasattr(source, 'read'):
625         tmp = tempfile.NamedTemporaryFile()
626         while True:
627             buf = source.read(65536)
628             if buf:
629                 tmp.write(buf)
630             else:
631                 break
632         tmp.seek(0)
633         source = tmp.name
634     
635     if isinstance(source, file) or isinstance(dest, file) \
636             or hasattr(source, 'read')  or hasattr(dest, 'write'):
637         assert not recursive
638         
639         # Parse source/destination as <user>@<server>:<path>
640         if isinstance(dest, basestring) and ':' in dest:
641             remspec, path = dest.split(':',1)
642         elif isinstance(source, basestring) and ':' in source:
643             remspec, path = source.split(':',1)
644         else:
645             raise ValueError, "Both endpoints cannot be local"
646         user,host = remspec.rsplit('@',1)
647         tmp_known_hosts = None
648         
649         args = ['ssh', '-l', user, '-C',
650                 # Don't bother with localhost. Makes test easier
651                 '-o', 'NoHostAuthenticationForLocalhost=yes',
652                 host ]
653         if port:
654             args.append('-P%d' % port)
655         if ident_key:
656             args.extend(('-i', ident_key))
657         if server_key:
658             # Create a temporary server key file
659             tmp_known_hosts = _make_server_key_args(
660                 server_key, host, port, args)
661         
662         if isinstance(source, file) or hasattr(source, 'read'):
663             args.append('cat > %s' % (shell_escape(path),))
664         elif isinstance(dest, file) or hasattr(dest, 'write'):
665             args.append('cat %s' % (shell_escape(path),))
666         else:
667             raise AssertionError, "Unreachable code reached! :-Q"
668         
669         # connects to the remote host and starts a remote connection
670         if isinstance(source, file):
671             proc = subprocess.Popen(args, 
672                     stdout = open('/dev/null','w'),
673                     stderr = subprocess.PIPE,
674                     stdin = source)
675             err = proc.stderr.read()
676             proc._known_hosts = tmp_known_hosts
677             eintr_retry(proc.wait)()
678             return ((None,err), proc)
679         elif isinstance(dest, file):
680             proc = subprocess.Popen(args, 
681                     stdout = open('/dev/null','w'),
682                     stderr = subprocess.PIPE,
683                     stdin = source)
684             err = proc.stderr.read()
685             proc._known_hosts = tmp_known_hosts
686             eintr_retry(proc.wait)()
687             return ((None,err), proc)
688         elif hasattr(source, 'read'):
689             # file-like (but not file) source
690             proc = subprocess.Popen(args, 
691                     stdout = open('/dev/null','w'),
692                     stderr = subprocess.PIPE,
693                     stdin = subprocess.PIPE)
694             
695             buf = None
696             err = []
697             while True:
698                 if not buf:
699                     buf = source.read(4096)
700                 if not buf:
701                     #EOF
702                     break
703                 
704                 rdrdy, wrdy, broken = select.select(
705                     [proc.stderr],
706                     [proc.stdin],
707                     [proc.stderr,proc.stdin])
708                 
709                 if proc.stderr in rdrdy:
710                     # use os.read for fully unbuffered behavior
711                     err.append(os.read(proc.stderr.fileno(), 4096))
712                 
713                 if proc.stdin in wrdy:
714                     proc.stdin.write(buf)
715                     buf = None
716                 
717                 if broken:
718                     break
719             proc.stdin.close()
720             err.append(proc.stderr.read())
721                 
722             proc._known_hosts = tmp_known_hosts
723             eintr_retry(proc.wait)()
724             return ((None,''.join(err)), proc)
725         elif hasattr(dest, 'write'):
726             # file-like (but not file) dest
727             proc = subprocess.Popen(args, 
728                     stdout = subprocess.PIPE,
729                     stderr = subprocess.PIPE,
730                     stdin = open('/dev/null','w'))
731             
732             buf = None
733             err = []
734             while True:
735                 rdrdy, wrdy, broken = select.select(
736                     [proc.stderr, proc.stdout],
737                     [],
738                     [proc.stderr, proc.stdout])
739                 
740                 if proc.stderr in rdrdy:
741                     # use os.read for fully unbuffered behavior
742                     err.append(os.read(proc.stderr.fileno(), 4096))
743                 
744                 if proc.stdout in rdrdy:
745                     # use os.read for fully unbuffered behavior
746                     buf = os.read(proc.stdout.fileno(), 4096)
747                     dest.write(buf)
748                     
749                     if not buf:
750                         #EOF
751                         break
752                 
753                 if broken:
754                     break
755             err.append(proc.stderr.read())
756                 
757             proc._known_hosts = tmp_known_hosts
758             eintr_retry(proc.wait)()
759             return ((None,''.join(err)), proc)
760         else:
761             raise AssertionError, "Unreachable code reached! :-Q"
762     else:
763         # Parse destination as <user>@<server>:<path>
764         if isinstance(dest, basestring) and ':' in dest:
765             remspec, path = dest.split(':',1)
766         elif isinstance(source, basestring) and ':' in source:
767             remspec, path = source.split(':',1)
768         else:
769             raise ValueError, "Both endpoints cannot be local"
770         user,host = remspec.rsplit('@',1)
771         
772         # plain scp
773         tmp_known_hosts = None
774         args = ['scp', '-q', '-p', '-C',
775                 # Don't bother with localhost. Makes test easier
776                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
777         if port:
778             args.append('-P%d' % port)
779         if recursive:
780             args.append('-r')
781         if ident_key:
782             args.extend(('-i', ident_key))
783         if server_key:
784             # Create a temporary server key file
785             tmp_known_hosts = _make_server_key_args(
786                 server_key, host, port, args)
787         if isinstance(source,list):
788             args.extend(source)
789         else:
790             args.append(source)
791         args.append(dest)
792
793         # connects to the remote host and starts a remote connection
794         proc = subprocess.Popen(args, 
795                 stdout = subprocess.PIPE,
796                 stdin = subprocess.PIPE, 
797                 stderr = subprocess.PIPE)
798         proc._known_hosts = tmp_known_hosts
799         
800         comm = proc.communicate()
801         eintr_retry(proc.wait)()
802         return (comm, proc)
803
804 def decode_and_execute():
805     # The python code we want to execute might have characters that 
806     # are not compatible with the 'inline' mode we are using. To avoid
807     # problems we receive the encoded python code in base64 as a input 
808     # stream and decode it for execution.
809     import base64, os
810     cmd = ""
811     while True:
812         try:
813             cmd += os.read(0, 1)# one byte from stdin
814         except OSError, e:            
815             if e.errno == errno.EINTR:
816                 continue
817             else:
818                 raise
819         if cmd[-1] == "\n": 
820             break
821     cmd = base64.b64decode(cmd)
822     # Uncomment for debug
823     #os.write(2, "Executing python code: %s\n" % cmd)
824     os.write(1, "OK\n") # send a sync message
825     exec(cmd)
826
827 def popen_python(python_code, 
828         communication = DC.ACCESS_LOCAL,
829         host = None, 
830         port = None, 
831         user = None, 
832         agent = False, 
833         python_path = None,
834         ident_key = None,
835         server_key = None,
836         tty = False,
837         sudo = False, 
838         environment_setup = ""):
839
840     shell = False
841     cmd = ""
842     if sudo:
843         cmd +="sudo "
844     if python_path:
845         python_path.replace("'", r"'\''")
846         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
847         cmd += " ; "
848     if environment_setup:
849         cmd += environment_setup
850         cmd += " ; "
851     # Uncomment for debug (to run everything under strace)
852     # We had to verify if strace works (cannot nest them)
853     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
854     #cmd += "$CMD "
855     #cmd += "strace -f -tt -s 200 -o strace$$.out "
856     import nepi
857     cmd += "python -c 'import sys; sys.path.append(%s); from nepi.util import server; server.decode_and_execute()'" % (
858         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
859     )
860
861     if communication == DC.ACCESS_SSH:
862         tmp_known_hosts = None
863         args = ['ssh',
864                 # Don't bother with localhost. Makes test easier
865                 '-o', 'NoHostAuthenticationForLocalhost=yes',
866                 '-l', user, host]
867         if agent:
868             args.append('-A')
869         if port:
870             args.append('-p%d' % port)
871         if ident_key:
872             args.extend(('-i', ident_key))
873         if tty:
874             args.append('-t')
875         if server_key:
876             # Create a temporary server key file
877             tmp_known_hosts = _make_server_key_args(
878                 server_key, host, port, args)
879         args.append(cmd)
880     else:
881         args = [cmd]
882         shell = True
883
884     # connects to the remote host and starts a remote
885     proc = subprocess.Popen(args,
886             shell = shell, 
887             stdout = subprocess.PIPE,
888             stdin = subprocess.PIPE, 
889             stderr = subprocess.PIPE)
890
891     if communication == DC.ACCESS_SSH:
892         proc._known_hosts = tmp_known_hosts
893
894     # send the command to execute
895     os.write(proc.stdin.fileno(),
896             base64.b64encode(python_code) + "\n")
897  
898     while True: 
899         try:
900             msg = os.read(proc.stdout.fileno(), 3)
901             break
902         except OSError, e:            
903             if e.errno == errno.EINTR:
904                 continue
905             else:
906                 raise
907     
908     if msg != "OK\n":
909         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
910             msg, proc.stdout.read(), proc.stderr.read())
911
912     return proc
913
914 # POSIX
915 def _communicate(self, input, timeout=None, err_on_timeout=True):
916     read_set = []
917     write_set = []
918     stdout = None # Return
919     stderr = None # Return
920     
921     killed = False
922     
923     if timeout is not None:
924         timelimit = time.time() + timeout
925         killtime = timelimit + 4
926         bailtime = timelimit + 4
927
928     if self.stdin:
929         # Flush stdio buffer.  This might block, if the user has
930         # been writing to .stdin in an uncontrolled fashion.
931         self.stdin.flush()
932         if input:
933             write_set.append(self.stdin)
934         else:
935             self.stdin.close()
936     if self.stdout:
937         read_set.append(self.stdout)
938         stdout = []
939     if self.stderr:
940         read_set.append(self.stderr)
941         stderr = []
942
943     input_offset = 0
944     while read_set or write_set:
945         if timeout is not None:
946             curtime = time.time()
947             if timeout is None or curtime > timelimit:
948                 if curtime > bailtime:
949                     break
950                 elif curtime > killtime:
951                     signum = signal.SIGKILL
952                 else:
953                     signum = signal.SIGTERM
954                 # Lets kill it
955                 os.kill(self.pid, signum)
956                 select_timeout = 0.5
957             else:
958                 select_timeout = timelimit - curtime + 0.1
959         else:
960             select_timeout = None
961             
962         try:
963             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
964         except select.error,e:
965             if e[0] != 4:
966                 raise
967             else:
968                 continue
969
970         if self.stdin in wlist:
971             # When select has indicated that the file is writable,
972             # we can write up to PIPE_BUF bytes without risk
973             # blocking.  POSIX defines PIPE_BUF >= 512
974             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
975             input_offset += bytes_written
976             if input_offset >= len(input):
977                 self.stdin.close()
978                 write_set.remove(self.stdin)
979
980         if self.stdout in rlist:
981             data = os.read(self.stdout.fileno(), 1024)
982             if data == "":
983                 self.stdout.close()
984                 read_set.remove(self.stdout)
985             stdout.append(data)
986
987         if self.stderr in rlist:
988             data = os.read(self.stderr.fileno(), 1024)
989             if data == "":
990                 self.stderr.close()
991                 read_set.remove(self.stderr)
992             stderr.append(data)
993     
994     # All data exchanged.  Translate lists into strings.
995     if stdout is not None:
996         stdout = ''.join(stdout)
997     if stderr is not None:
998         stderr = ''.join(stderr)
999
1000     # Translate newlines, if requested.  We cannot let the file
1001     # object do the translation: It is based on stdio, which is
1002     # impossible to combine with select (unless forcing no
1003     # buffering).
1004     if self.universal_newlines and hasattr(file, 'newlines'):
1005         if stdout:
1006             stdout = self._translate_newlines(stdout)
1007         if stderr:
1008             stderr = self._translate_newlines(stderr)
1009
1010     if killed and err_on_timeout:
1011         errcode = self.poll()
1012         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1013     else:
1014         if killed:
1015             self.poll()
1016         else:
1017             self.wait()
1018         return (stdout, stderr)
1019