Fix metadata breakage from recent commit
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
28 MAX_FD = 1024
29
30 STOP_MSG = "STOP"
31
32 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33
34 if hasattr(os, "devnull"):
35     DEV_NULL = os.devnull
36 else:
37     DEV_NULL = "/dev/null"
38
39 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
40
41 def shell_escape(s):
42     """ Escapes strings so that they are safe to use as command-line arguments """
43     if SHELL_SAFE.match(s):
44         # safe string - no escaping needed
45         return s
46     else:
47         # unsafe string - escape
48         def escp(c):
49             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
50                 return c
51             else:
52                 return "'$'\\x%02x''" % (ord(c),)
53         s = ''.join(map(escp,s))
54         return "'%s'" % (s,)
55
56 def eintr_retry(func):
57     import functools
58     @functools.wraps(func)
59     def rv(*p, **kw):
60         retry = kw.pop("_retry", False)
61         for i in xrange(0 if retry else 4):
62             try:
63                 return func(*p, **kw)
64             except (select.error, socket.error), args:
65                 if args[0] == errno.EINTR:
66                     continue
67                 else:
68                     raise 
69             except OSError, e:
70                 if e.errno == errno.EINTR:
71                     continue
72                 else:
73                     raise
74         else:
75             return func(*p, **kw)
76     return rv
77
78 class Server(object):
79     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
80             environment_setup = "", clean_root = False):
81         self._root_dir = root_dir
82         self._clean_root = clean_root
83         self._stop = False
84         self._ctrl_sock = None
85         self._log_level = log_level
86         self._rdbuf = ""
87         self._environment_setup = environment_setup
88
89     def run(self):
90         try:
91             if self.daemonize():
92                 self.post_daemonize()
93                 self.loop()
94                 self.cleanup()
95                 # ref: "os._exit(0)"
96                 # can not return normally after fork beacuse no exec was done.
97                 # This means that if we don't do a os._exit(0) here the code that 
98                 # follows the call to "Server.run()" in the "caller code" will be 
99                 # executed... but by now it has already been executed after the 
100                 # first process (the one that did the first fork) returned.
101                 os._exit(0)
102         except:
103             print >>sys.stderr, "SERVER_ERROR."
104             self.log_error()
105             self.cleanup()
106             os._exit(0)
107         print >>sys.stderr, "SERVER_READY."
108
109     def daemonize(self):
110         # pipes for process synchronization
111         (r, w) = os.pipe()
112         
113         # build root folder
114         root = os.path.normpath(self._root_dir)
115         if os.path.exists(root) and self._clean_root:
116             shutil.rmtree(root)
117         if not os.path.exists(root):
118             os.makedirs(root, 0755)
119
120         pid1 = os.fork()
121         if pid1 > 0:
122             os.close(w)
123             while True:
124                 try:
125                     os.read(r, 1)
126                 except OSError, e: # pragma: no cover
127                     if e.errno == errno.EINTR:
128                         continue
129                     else:
130                         raise
131                 break
132             os.close(r)
133             # os.waitpid avoids leaving a <defunc> (zombie) process
134             st = os.waitpid(pid1, 0)[1]
135             if st:
136                 raise RuntimeError("Daemonization failed")
137             # return 0 to inform the caller method that this is not the 
138             # daemonized process
139             return 0
140         os.close(r)
141
142         # Decouple from parent environment.
143         os.chdir(self._root_dir)
144         os.umask(0)
145         os.setsid()
146
147         # fork 2
148         pid2 = os.fork()
149         if pid2 > 0:
150             # see ref: "os._exit(0)"
151             os._exit(0)
152
153         # close all open file descriptors.
154         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
155         if (max_fd == resource.RLIM_INFINITY):
156             max_fd = MAX_FD
157         for fd in range(3, max_fd):
158             if fd != w:
159                 try:
160                     os.close(fd)
161                 except OSError:
162                     pass
163
164         # Redirect standard file descriptors.
165         stdin = open(DEV_NULL, "r")
166         stderr = stdout = open(STD_ERR, "a", 0)
167         os.dup2(stdin.fileno(), sys.stdin.fileno())
168         # NOTE: sys.stdout.write will still be buffered, even if the file
169         # was opened with 0 buffer
170         os.dup2(stdout.fileno(), sys.stdout.fileno())
171         os.dup2(stderr.fileno(), sys.stderr.fileno())
172         
173         # setup environment
174         if self._environment_setup:
175             # parse environment variables and pass to child process
176             # do it by executing shell commands, in case there's some heavy setup involved
177             envproc = subprocess.Popen(
178                 [ "bash", "-c", 
179                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
180                         ( self._environment_setup, ) ],
181                 stdin = subprocess.PIPE, 
182                 stdout = subprocess.PIPE,
183                 stderr = subprocess.PIPE
184             )
185             out,err = envproc.communicate()
186
187             # parse new environment
188             if out:
189                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
190             
191                 # apply to current environment
192                 for name, value in environment.iteritems():
193                     os.environ[name] = value
194                 
195                 # apply pythonpath
196                 if 'PYTHONPATH' in environment:
197                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
198
199         # create control socket
200         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
201         self._ctrl_sock.bind(CTRL_SOCK)
202         self._ctrl_sock.listen(0)
203
204         # let the parent process know that the daemonization is finished
205         os.write(w, "\n")
206         os.close(w)
207         return 1
208
209     def post_daemonize(self):
210         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
211         # QT, for some strange reason, redefines the SIGCHILD handler to write
212         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
213         # Server dameonization closes all file descriptors from fileno '3',
214         # but the overloaded handler (inherited by the forked process) will
215         # keep trying to write the \0 to fileno 'x', which might have been reused 
216         # after closing, for other operations. This is bad bad bad when fileno 'x'
217         # is in use for communication pouroses, because unexpected \0 start
218         # appearing in the communication messages... this is exactly what happens 
219         # when using netns in daemonized form. Thus, be have no other alternative than
220         # restoring the SIGCHLD handler to the default here.
221         import signal
222         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
223
224     def loop(self):
225         while not self._stop:
226             conn, addr = self._ctrl_sock.accept()
227             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
228             conn.settimeout(5)
229             while not self._stop:
230                 try:
231                     msg = self.recv_msg(conn)
232                 except socket.timeout, e:
233                     self.log_error("SERVER recv_msg: connection timedout ")
234                     continue
235                 
236                 if not msg:
237                     self.log_error("CONNECTION LOST")
238                     break
239                     
240                 if msg == STOP_MSG:
241                     self._stop = True
242                     reply = self.stop_action()
243                 else:
244                     reply = self.reply_action(msg)
245                 
246                 try:
247                     self.send_reply(conn, reply)
248                 except socket.error:
249                     self.log_error()
250                     self.log_error("NOTICE: Awaiting for reconnection")
251                     break
252             try:
253                 conn.close()
254             except:
255                 # Doesn't matter
256                 self.log_error()
257
258     def recv_msg(self, conn):
259         data = [self._rdbuf]
260         chunk = data[0]
261         while '\n' not in chunk:
262             try:
263                 chunk = conn.recv(1024)
264             except (OSError, socket.error), e:
265                 if e[0] != errno.EINTR:
266                     raise
267                 else:
268                     continue
269             if chunk:
270                 data.append(chunk)
271             else:
272                 # empty chunk = EOF
273                 break
274         data = ''.join(data).split('\n',1)
275         while len(data) < 2:
276             data.append('')
277         data, self._rdbuf = data
278         
279         decoded = base64.b64decode(data)
280         return decoded.rstrip()
281
282     def send_reply(self, conn, reply):
283         encoded = base64.b64encode(reply)
284         conn.send("%s\n" % encoded)
285        
286     def cleanup(self):
287         try:
288             self._ctrl_sock.close()
289             os.remove(CTRL_SOCK)
290         except:
291             self.log_error()
292
293     def stop_action(self):
294         return "Stopping server"
295
296     def reply_action(self, msg):
297         return "Reply to: %s" % msg
298
299     def log_error(self, text = None, context = ''):
300         if text == None:
301             text = traceback.format_exc()
302         date = time.strftime("%Y-%m-%d %H:%M:%S")
303         if context:
304             context = " (%s)" % (context,)
305         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
306         return text
307
308     def log_debug(self, text):
309         if self._log_level == DC.DEBUG_LEVEL:
310             date = time.strftime("%Y-%m-%d %H:%M:%S")
311             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
312
313 class Forwarder(object):
314     def __init__(self, root_dir = "."):
315         self._ctrl_sock = None
316         self._root_dir = root_dir
317         self._stop = False
318         self._rdbuf = ""
319
320     def forward(self):
321         self.connect()
322         print >>sys.stderr, "FORWARDER_READY."
323         while not self._stop:
324             data = self.read_data()
325             if not data:
326                 # Connection to client lost
327                 break
328             self.send_to_server(data)
329             
330             data = self.recv_from_server()
331             if not data:
332                 # Connection to server lost
333                 raise IOError, "Connection to server lost while "\
334                     "expecting response"
335             self.write_data(data)
336         self.disconnect()
337
338     def read_data(self):
339         return sys.stdin.readline()
340
341     def write_data(self, data):
342         sys.stdout.write(data)
343         # sys.stdout.write is buffered, this is why we need to do a flush()
344         sys.stdout.flush()
345
346     def send_to_server(self, data):
347         try:
348             self._ctrl_sock.send(data)
349         except (IOError, socket.error), e:
350             if e[0] == errno.EPIPE:
351                 self.connect()
352                 self._ctrl_sock.send(data)
353             else:
354                 raise e
355         encoded = data.rstrip() 
356         msg = base64.b64decode(encoded)
357         if msg == STOP_MSG:
358             self._stop = True
359
360     def recv_from_server(self):
361         data = [self._rdbuf]
362         chunk = data[0]
363         while '\n' not in chunk:
364             try:
365                 chunk = self._ctrl_sock.recv(1024)
366             except (OSError, socket.error), e:
367                 if e[0] != errno.EINTR:
368                     raise
369                 continue
370             if chunk:
371                 data.append(chunk)
372             else:
373                 # empty chunk = EOF
374                 break
375         data = ''.join(data).split('\n',1)
376         while len(data) < 2:
377             data.append('')
378         data, self._rdbuf = data
379         
380         return data+'\n'
381  
382     def connect(self):
383         self.disconnect()
384         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
385         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
386         self._ctrl_sock.connect(sock_addr)
387
388     def disconnect(self):
389         try:
390             self._ctrl_sock.close()
391         except:
392             pass
393
394 class Client(object):
395     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
396             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
397             environment_setup = ""):
398         self.root_dir = root_dir
399         self.addr = (host, port)
400         self.user = user
401         self.agent = agent
402         self.sudo = sudo
403         self.communication = communication
404         self.environment_setup = environment_setup
405         self._stopped = False
406         self._deferreds = collections.deque()
407         self.connect()
408     
409     def __del__(self):
410         if self._process.poll() is None:
411             os.kill(self._process.pid, signal.SIGTERM)
412         self._process.wait()
413         
414     def connect(self):
415         root_dir = self.root_dir
416         (host, port) = self.addr
417         user = self.user
418         agent = self.agent
419         sudo = self.sudo
420         communication = self.communication
421         
422         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
423                 c.forward()" % (root_dir,)
424
425         self._process = popen_python(python_code, 
426                     communication = communication,
427                     host = host, 
428                     port = port, 
429                     user = user, 
430                     agent = agent, 
431                     sudo = sudo, 
432                     environment_setup = self.environment_setup)
433                
434         # Wait for the forwarder to be ready, otherwise nobody
435         # will be able to connect to it
436         helo = self._process.stderr.readline()
437         if helo != 'FORWARDER_READY.\n':
438             raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
439                     helo + self._process.stderr.read())
440         
441     def send_msg(self, msg):
442         encoded = base64.b64encode(msg)
443         data = "%s\n" % encoded
444         
445         try:
446             self._process.stdin.write(data)
447         except (IOError, ValueError):
448             # dead process, poll it to un-zombify
449             self._process.poll()
450             
451             # try again after reconnect
452             # If it fails again, though, give up
453             self.connect()
454             self._process.stdin.write(data)
455
456     def send_stop(self):
457         self.send_msg(STOP_MSG)
458         self._stopped = True
459
460     def defer_reply(self, transform=None):
461         defer_entry = []
462         self._deferreds.append(defer_entry)
463         return defer.Defer(
464             functools.partial(self.read_reply, defer_entry, transform)
465         )
466         
467     def _read_reply(self):
468         data = self._process.stdout.readline()
469         encoded = data.rstrip() 
470         if not encoded:
471             # empty == eof == dead process, poll it to un-zombify
472             self._process.poll()
473             
474             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
475         return base64.b64decode(encoded)
476     
477     def read_reply(self, which=None, transform=None):
478         # Test to see if someone did it already
479         if which is not None and len(which):
480             # Ok, they did it...
481             # ...just return the deferred value
482             if transform:
483                 return transform(which[0])
484             else:
485                 return which[0]
486         
487         # Process all deferreds until the one we're looking for
488         # or until the queue is empty
489         while self._deferreds:
490             try:
491                 deferred = self._deferreds.popleft()
492             except IndexError:
493                 # emptied
494                 break
495             
496             deferred.append(self._read_reply())
497             if deferred is which:
498                 # We reached the one we were looking for
499                 if transform:
500                     return transform(deferred[0])
501                 else:
502                     return deferred[0]
503         
504         if which is None:
505             # They've requested a synchronous read
506             if transform:
507                 return transform(self._read_reply())
508             else:
509                 return self._read_reply()
510
511 def _make_server_key_args(server_key, host, port, args):
512     """ 
513     Returns a reference to the created temporary file, and adds the
514     corresponding arguments to the given argument list.
515     
516     Make sure to hold onto it until the process is done with the file
517     """
518     if port is not None:
519         host = '%s:%s' % (host,port)
520     # Create a temporary server key file
521     tmp_known_hosts = tempfile.NamedTemporaryFile()
522     
523     # Add the intended host key
524     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
525     
526     # If we're not in strict mode, add user-configured keys
527     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
528         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
529         if os.access(user_hosts_path, os.R_OK):
530             f = open(user_hosts_path, "r")
531             tmp_known_hosts.write(f.read())
532             f.close()
533         
534     tmp_known_hosts.flush()
535     
536     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
537     
538     return tmp_known_hosts
539
540 def popen_ssh_command(command, host, port, user, agent, 
541         stdin="", 
542         ident_key = None,
543         server_key = None,
544         tty = False,
545         timeout = None,
546         retry = 0,
547         err_on_timeout = True):
548     """
549     Executes a remote commands, returns ((stdout,stderr),process)
550     """
551     if TRACE:
552         print "ssh", host, command
553     
554     tmp_known_hosts = None
555     args = ['ssh',
556             # Don't bother with localhost. Makes test easier
557             '-o', 'NoHostAuthenticationForLocalhost=yes',
558             '-l', user, host]
559     if agent:
560         args.append('-A')
561     if port:
562         args.append('-p%d' % port)
563     if ident_key:
564         args.extend(('-i', ident_key))
565     if tty:
566         args.append('-t')
567     if server_key:
568         # Create a temporary server key file
569         tmp_known_hosts = _make_server_key_args(
570             server_key, host, port, args)
571     args.append(command)
572
573     while 1:
574         # connects to the remote host and starts a remote connection
575         proc = subprocess.Popen(args, 
576                 stdout = subprocess.PIPE,
577                 stdin = subprocess.PIPE, 
578                 stderr = subprocess.PIPE)
579         
580         # attach tempfile object to the process, to make sure the file stays
581         # alive until the process is finished with it
582         proc._known_hosts = tmp_known_hosts
583         
584         try:
585             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
586             break
587         except RuntimeError,e:
588             if retry <= 0:
589                 raise
590             if TRACE:
591                 print " timedout -> ", e.args
592             retry -= 1
593         
594     if TRACE:
595         print " -> ", out, err
596
597     return ((out, err), proc)
598
599 def popen_scp(source, dest, 
600         port = None, 
601         agent = None, 
602         recursive = False,
603         ident_key = None,
604         server_key = None):
605     """
606     Copies from/to remote sites.
607     
608     Source and destination should have the user and host encoded
609     as per scp specs.
610     
611     If source is a file object, a special mode will be used to
612     create the remote file with the same contents.
613     
614     If dest is a file object, the remote file (source) will be
615     read and written into dest.
616     
617     In these modes, recursive cannot be True.
618     
619     Source can be a list of files to copy to a single destination,
620     in which case it is advised that the destination be a folder.
621     """
622     
623     if TRACE:
624         print "scp", source, dest
625     
626     if isinstance(source, file) and source.tell() == 0:
627         source = source.name
628     elif hasattr(source, 'read'):
629         tmp = tempfile.NamedTemporaryFile()
630         while True:
631             buf = source.read(65536)
632             if buf:
633                 tmp.write(buf)
634             else:
635                 break
636         tmp.seek(0)
637         source = tmp.name
638     
639     if isinstance(source, file) or isinstance(dest, file) \
640             or hasattr(source, 'read')  or hasattr(dest, 'write'):
641         assert not recursive
642         
643         # Parse source/destination as <user>@<server>:<path>
644         if isinstance(dest, basestring) and ':' in dest:
645             remspec, path = dest.split(':',1)
646         elif isinstance(source, basestring) and ':' in source:
647             remspec, path = source.split(':',1)
648         else:
649             raise ValueError, "Both endpoints cannot be local"
650         user,host = remspec.rsplit('@',1)
651         tmp_known_hosts = None
652         
653         args = ['ssh', '-l', user, '-C',
654                 # Don't bother with localhost. Makes test easier
655                 '-o', 'NoHostAuthenticationForLocalhost=yes',
656                 host ]
657         if port:
658             args.append('-P%d' % port)
659         if ident_key:
660             args.extend(('-i', ident_key))
661         if server_key:
662             # Create a temporary server key file
663             tmp_known_hosts = _make_server_key_args(
664                 server_key, host, port, args)
665         
666         if isinstance(source, file) or hasattr(source, 'read'):
667             args.append('cat > %s' % (shell_escape(path),))
668         elif isinstance(dest, file) or hasattr(dest, 'write'):
669             args.append('cat %s' % (shell_escape(path),))
670         else:
671             raise AssertionError, "Unreachable code reached! :-Q"
672         
673         # connects to the remote host and starts a remote connection
674         if isinstance(source, file):
675             proc = subprocess.Popen(args, 
676                     stdout = open('/dev/null','w'),
677                     stderr = subprocess.PIPE,
678                     stdin = source)
679             err = proc.stderr.read()
680             proc._known_hosts = tmp_known_hosts
681             eintr_retry(proc.wait)()
682             return ((None,err), proc)
683         elif isinstance(dest, file):
684             proc = subprocess.Popen(args, 
685                     stdout = open('/dev/null','w'),
686                     stderr = subprocess.PIPE,
687                     stdin = source)
688             err = proc.stderr.read()
689             proc._known_hosts = tmp_known_hosts
690             eintr_retry(proc.wait)()
691             return ((None,err), proc)
692         elif hasattr(source, 'read'):
693             # file-like (but not file) source
694             proc = subprocess.Popen(args, 
695                     stdout = open('/dev/null','w'),
696                     stderr = subprocess.PIPE,
697                     stdin = subprocess.PIPE)
698             
699             buf = None
700             err = []
701             while True:
702                 if not buf:
703                     buf = source.read(4096)
704                 if not buf:
705                     #EOF
706                     break
707                 
708                 rdrdy, wrdy, broken = select.select(
709                     [proc.stderr],
710                     [proc.stdin],
711                     [proc.stderr,proc.stdin])
712                 
713                 if proc.stderr in rdrdy:
714                     # use os.read for fully unbuffered behavior
715                     err.append(os.read(proc.stderr.fileno(), 4096))
716                 
717                 if proc.stdin in wrdy:
718                     proc.stdin.write(buf)
719                     buf = None
720                 
721                 if broken:
722                     break
723             proc.stdin.close()
724             err.append(proc.stderr.read())
725                 
726             proc._known_hosts = tmp_known_hosts
727             eintr_retry(proc.wait)()
728             return ((None,''.join(err)), proc)
729         elif hasattr(dest, 'write'):
730             # file-like (but not file) dest
731             proc = subprocess.Popen(args, 
732                     stdout = subprocess.PIPE,
733                     stderr = subprocess.PIPE,
734                     stdin = open('/dev/null','w'))
735             
736             buf = None
737             err = []
738             while True:
739                 rdrdy, wrdy, broken = select.select(
740                     [proc.stderr, proc.stdout],
741                     [],
742                     [proc.stderr, proc.stdout])
743                 
744                 if proc.stderr in rdrdy:
745                     # use os.read for fully unbuffered behavior
746                     err.append(os.read(proc.stderr.fileno(), 4096))
747                 
748                 if proc.stdout in rdrdy:
749                     # use os.read for fully unbuffered behavior
750                     buf = os.read(proc.stdout.fileno(), 4096)
751                     dest.write(buf)
752                     
753                     if not buf:
754                         #EOF
755                         break
756                 
757                 if broken:
758                     break
759             err.append(proc.stderr.read())
760                 
761             proc._known_hosts = tmp_known_hosts
762             eintr_retry(proc.wait)()
763             return ((None,''.join(err)), proc)
764         else:
765             raise AssertionError, "Unreachable code reached! :-Q"
766     else:
767         # Parse destination as <user>@<server>:<path>
768         if isinstance(dest, basestring) and ':' in dest:
769             remspec, path = dest.split(':',1)
770         elif isinstance(source, basestring) and ':' in source:
771             remspec, path = source.split(':',1)
772         else:
773             raise ValueError, "Both endpoints cannot be local"
774         user,host = remspec.rsplit('@',1)
775         
776         # plain scp
777         tmp_known_hosts = None
778         args = ['scp', '-q', '-p', '-C',
779                 # Don't bother with localhost. Makes test easier
780                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
781         if port:
782             args.append('-P%d' % port)
783         if recursive:
784             args.append('-r')
785         if ident_key:
786             args.extend(('-i', ident_key))
787         if server_key:
788             # Create a temporary server key file
789             tmp_known_hosts = _make_server_key_args(
790                 server_key, host, port, args)
791         if isinstance(source,list):
792             args.extend(source)
793         else:
794             args.append(source)
795         args.append(dest)
796
797         # connects to the remote host and starts a remote connection
798         proc = subprocess.Popen(args, 
799                 stdout = subprocess.PIPE,
800                 stdin = subprocess.PIPE, 
801                 stderr = subprocess.PIPE)
802         proc._known_hosts = tmp_known_hosts
803         
804         comm = proc.communicate()
805         eintr_retry(proc.wait)()
806         return (comm, proc)
807
808 def decode_and_execute():
809     # The python code we want to execute might have characters that 
810     # are not compatible with the 'inline' mode we are using. To avoid
811     # problems we receive the encoded python code in base64 as a input 
812     # stream and decode it for execution.
813     import base64, os
814     cmd = ""
815     while True:
816         try:
817             cmd += os.read(0, 1)# one byte from stdin
818         except OSError, e:            
819             if e.errno == errno.EINTR:
820                 continue
821             else:
822                 raise
823         if cmd[-1] == "\n": 
824             break
825     cmd = base64.b64decode(cmd)
826     # Uncomment for debug
827     #os.write(2, "Executing python code: %s\n" % cmd)
828     os.write(1, "OK\n") # send a sync message
829     exec(cmd)
830
831 def popen_python(python_code, 
832         communication = DC.ACCESS_LOCAL,
833         host = None, 
834         port = None, 
835         user = None, 
836         agent = False, 
837         python_path = None,
838         ident_key = None,
839         server_key = None,
840         tty = False,
841         sudo = False, 
842         environment_setup = ""):
843
844     shell = False
845     cmd = ""
846     if sudo:
847         cmd +="sudo "
848     if python_path:
849         python_path.replace("'", r"'\''")
850         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
851         cmd += " ; "
852     if environment_setup:
853         cmd += environment_setup
854         cmd += " ; "
855     # Uncomment for debug (to run everything under strace)
856     # We had to verify if strace works (cannot nest them)
857     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
858     #cmd += "$CMD "
859     #cmd += "strace -f -tt -s 200 -o strace$$.out "
860     import nepi
861     cmd += "python -c 'import sys; sys.path.append(%s); from nepi.util import server; server.decode_and_execute()'" % (
862         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
863     )
864
865     if communication == DC.ACCESS_SSH:
866         tmp_known_hosts = None
867         args = ['ssh',
868                 # Don't bother with localhost. Makes test easier
869                 '-o', 'NoHostAuthenticationForLocalhost=yes',
870                 '-l', user, host]
871         if agent:
872             args.append('-A')
873         if port:
874             args.append('-p%d' % port)
875         if ident_key:
876             args.extend(('-i', ident_key))
877         if tty:
878             args.append('-t')
879         if server_key:
880             # Create a temporary server key file
881             tmp_known_hosts = _make_server_key_args(
882                 server_key, host, port, args)
883         args.append(cmd)
884     else:
885         args = [cmd]
886         shell = True
887
888     # connects to the remote host and starts a remote
889     proc = subprocess.Popen(args,
890             shell = shell, 
891             stdout = subprocess.PIPE,
892             stdin = subprocess.PIPE, 
893             stderr = subprocess.PIPE)
894
895     if communication == DC.ACCESS_SSH:
896         proc._known_hosts = tmp_known_hosts
897
898     # send the command to execute
899     os.write(proc.stdin.fileno(),
900             base64.b64encode(python_code) + "\n")
901  
902     while True: 
903         try:
904             msg = os.read(proc.stdout.fileno(), 3)
905             break
906         except OSError, e:            
907             if e.errno == errno.EINTR:
908                 continue
909             else:
910                 raise
911     
912     if msg != "OK\n":
913         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
914             msg, proc.stdout.read(), proc.stderr.read())
915
916     return proc
917
918 # POSIX
919 def _communicate(self, input, timeout=None, err_on_timeout=True):
920     read_set = []
921     write_set = []
922     stdout = None # Return
923     stderr = None # Return
924     
925     killed = False
926     
927     if timeout is not None:
928         timelimit = time.time() + timeout
929         killtime = timelimit + 4
930         bailtime = timelimit + 4
931
932     if self.stdin:
933         # Flush stdio buffer.  This might block, if the user has
934         # been writing to .stdin in an uncontrolled fashion.
935         self.stdin.flush()
936         if input:
937             write_set.append(self.stdin)
938         else:
939             self.stdin.close()
940     if self.stdout:
941         read_set.append(self.stdout)
942         stdout = []
943     if self.stderr:
944         read_set.append(self.stderr)
945         stderr = []
946
947     input_offset = 0
948     while read_set or write_set:
949         if timeout is not None:
950             curtime = time.time()
951             if timeout is None or curtime > timelimit:
952                 if curtime > bailtime:
953                     break
954                 elif curtime > killtime:
955                     signum = signal.SIGKILL
956                 else:
957                     signum = signal.SIGTERM
958                 # Lets kill it
959                 os.kill(self.pid, signum)
960                 select_timeout = 0.5
961             else:
962                 select_timeout = timelimit - curtime + 0.1
963         else:
964             select_timeout = None
965             
966         try:
967             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
968         except select.error,e:
969             if e[0] != 4:
970                 raise
971             else:
972                 continue
973
974         if self.stdin in wlist:
975             # When select has indicated that the file is writable,
976             # we can write up to PIPE_BUF bytes without risk
977             # blocking.  POSIX defines PIPE_BUF >= 512
978             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
979             input_offset += bytes_written
980             if input_offset >= len(input):
981                 self.stdin.close()
982                 write_set.remove(self.stdin)
983
984         if self.stdout in rlist:
985             data = os.read(self.stdout.fileno(), 1024)
986             if data == "":
987                 self.stdout.close()
988                 read_set.remove(self.stdout)
989             stdout.append(data)
990
991         if self.stderr in rlist:
992             data = os.read(self.stderr.fileno(), 1024)
993             if data == "":
994                 self.stderr.close()
995                 read_set.remove(self.stderr)
996             stderr.append(data)
997     
998     # All data exchanged.  Translate lists into strings.
999     if stdout is not None:
1000         stdout = ''.join(stdout)
1001     if stderr is not None:
1002         stderr = ''.join(stderr)
1003
1004     # Translate newlines, if requested.  We cannot let the file
1005     # object do the translation: It is based on stdio, which is
1006     # impossible to combine with select (unless forcing no
1007     # buffering).
1008     if self.universal_newlines and hasattr(file, 'newlines'):
1009         if stdout:
1010             stdout = self._translate_newlines(stdout)
1011         if stderr:
1012             stderr = self._translate_newlines(stderr)
1013
1014     if killed and err_on_timeout:
1015         errcode = self.poll()
1016         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1017     else:
1018         if killed:
1019             self.poll()
1020         else:
1021             self.wait()
1022         return (stdout, stderr)
1023