Fix timeout option spec
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 if hasattr(os, "devnull"):
36     DEV_NULL = os.devnull
37 else:
38     DEV_NULL = "/dev/null"
39
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41
42 def shell_escape(s):
43     """ Escapes strings so that they are safe to use as command-line arguments """
44     if SHELL_SAFE.match(s):
45         # safe string - no escaping needed
46         return s
47     else:
48         # unsafe string - escape
49         def escp(c):
50             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
51                 return c
52             else:
53                 return "'$'\\x%02x''" % (ord(c),)
54         s = ''.join(map(escp,s))
55         return "'%s'" % (s,)
56
57 def eintr_retry(func):
58     import functools
59     @functools.wraps(func)
60     def rv(*p, **kw):
61         retry = kw.pop("_retry", False)
62         for i in xrange(0 if retry else 4):
63             try:
64                 return func(*p, **kw)
65             except (select.error, socket.error), args:
66                 if args[0] == errno.EINTR:
67                     continue
68                 else:
69                     raise 
70             except OSError, e:
71                 if e.errno == errno.EINTR:
72                     continue
73                 else:
74                     raise
75         else:
76             return func(*p, **kw)
77     return rv
78
79 class Server(object):
80     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
81             environment_setup = "", clean_root = False):
82         self._root_dir = root_dir
83         self._clean_root = clean_root
84         self._stop = False
85         self._ctrl_sock = None
86         self._log_level = log_level
87         self._rdbuf = ""
88         self._environment_setup = environment_setup
89
90     def run(self):
91         try:
92             if self.daemonize():
93                 self.post_daemonize()
94                 self.loop()
95                 self.cleanup()
96                 # ref: "os._exit(0)"
97                 # can not return normally after fork beacuse no exec was done.
98                 # This means that if we don't do a os._exit(0) here the code that 
99                 # follows the call to "Server.run()" in the "caller code" will be 
100                 # executed... but by now it has already been executed after the 
101                 # first process (the one that did the first fork) returned.
102                 os._exit(0)
103         except:
104             print >>sys.stderr, "SERVER_ERROR."
105             self.log_error()
106             self.cleanup()
107             os._exit(0)
108         print >>sys.stderr, "SERVER_READY."
109
110     def daemonize(self):
111         # pipes for process synchronization
112         (r, w) = os.pipe()
113         
114         # build root folder
115         root = os.path.normpath(self._root_dir)
116         if self._root_dir not in [".", ""] and os.path.exists(root) \
117                 and self._clean_root:
118             shutil.rmtree(root)
119         if not os.path.exists(root):
120             os.makedirs(root, 0755)
121
122         pid1 = os.fork()
123         if pid1 > 0:
124             os.close(w)
125             while True:
126                 try:
127                     os.read(r, 1)
128                 except OSError, e: # pragma: no cover
129                     if e.errno == errno.EINTR:
130                         continue
131                     else:
132                         raise
133                 break
134             os.close(r)
135             # os.waitpid avoids leaving a <defunc> (zombie) process
136             st = os.waitpid(pid1, 0)[1]
137             if st:
138                 raise RuntimeError("Daemonization failed")
139             # return 0 to inform the caller method that this is not the 
140             # daemonized process
141             return 0
142         os.close(r)
143
144         # Decouple from parent environment.
145         os.chdir(self._root_dir)
146         os.umask(0)
147         os.setsid()
148
149         # fork 2
150         pid2 = os.fork()
151         if pid2 > 0:
152             # see ref: "os._exit(0)"
153             os._exit(0)
154
155         # close all open file descriptors.
156         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157         if (max_fd == resource.RLIM_INFINITY):
158             max_fd = MAX_FD
159         for fd in range(3, max_fd):
160             if fd != w:
161                 try:
162                     os.close(fd)
163                 except OSError:
164                     pass
165
166         # Redirect standard file descriptors.
167         stdin = open(DEV_NULL, "r")
168         stderr = stdout = open(STD_ERR, "a", 0)
169         os.dup2(stdin.fileno(), sys.stdin.fileno())
170         # NOTE: sys.stdout.write will still be buffered, even if the file
171         # was opened with 0 buffer
172         os.dup2(stdout.fileno(), sys.stdout.fileno())
173         os.dup2(stderr.fileno(), sys.stderr.fileno())
174         
175         # setup environment
176         if self._environment_setup:
177             # parse environment variables and pass to child process
178             # do it by executing shell commands, in case there's some heavy setup involved
179             envproc = subprocess.Popen(
180                 [ "bash", "-c", 
181                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182                         ( self._environment_setup, ) ],
183                 stdin = subprocess.PIPE, 
184                 stdout = subprocess.PIPE,
185                 stderr = subprocess.PIPE
186             )
187             out,err = envproc.communicate()
188
189             # parse new environment
190             if out:
191                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
192             
193                 # apply to current environment
194                 for name, value in environment.iteritems():
195                     os.environ[name] = value
196                 
197                 # apply pythonpath
198                 if 'PYTHONPATH' in environment:
199                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
200
201         # create control socket
202         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
203         try:
204             self._ctrl_sock.bind(CTRL_SOCK)
205         except socket.error:
206             # Address in use, check pidfile
207             pid = None
208             try:
209                 pidfile = open(CTRL_PID, "r")
210                 pid = pidfile.read()
211                 pidfile.close()
212                 pid = int(pid)
213             except:
214                 # no pidfile
215                 pass
216             
217             if pid is not None:
218                 # Check process liveliness
219                 if not os.path.exists("/proc/%d" % (pid,)):
220                     # Ok, it's dead, clean the socket
221                     os.remove(CTRL_SOCK)
222             
223             # try again
224             self._ctrl_sock.bind(CTRL_SOCK)
225             
226         self._ctrl_sock.listen(0)
227         
228         # Save pidfile
229         pidfile = open(CTRL_PID, "w")
230         pidfile.write(str(os.getpid()))
231         pidfile.close()
232
233         # let the parent process know that the daemonization is finished
234         os.write(w, "\n")
235         os.close(w)
236         return 1
237
238     def post_daemonize(self):
239         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240         # QT, for some strange reason, redefines the SIGCHILD handler to write
241         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
242         # Server dameonization closes all file descriptors from fileno '3',
243         # but the overloaded handler (inherited by the forked process) will
244         # keep trying to write the \0 to fileno 'x', which might have been reused 
245         # after closing, for other operations. This is bad bad bad when fileno 'x'
246         # is in use for communication pouroses, because unexpected \0 start
247         # appearing in the communication messages... this is exactly what happens 
248         # when using netns in daemonized form. Thus, be have no other alternative than
249         # restoring the SIGCHLD handler to the default here.
250         import signal
251         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
252
253     def loop(self):
254         while not self._stop:
255             conn, addr = self._ctrl_sock.accept()
256             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
257             conn.settimeout(5)
258             while not self._stop:
259                 try:
260                     msg = self.recv_msg(conn)
261                 except socket.timeout, e:
262                     #self.log_error("SERVER recv_msg: connection timedout ")
263                     continue
264                 
265                 if not msg:
266                     self.log_error("CONNECTION LOST")
267                     break
268                     
269                 if msg == STOP_MSG:
270                     self._stop = True
271                     reply = self.stop_action()
272                 else:
273                     reply = self.reply_action(msg)
274                 
275                 try:
276                     self.send_reply(conn, reply)
277                 except socket.error:
278                     self.log_error()
279                     self.log_error("NOTICE: Awaiting for reconnection")
280                     break
281             try:
282                 conn.close()
283             except:
284                 # Doesn't matter
285                 self.log_error()
286
287     def recv_msg(self, conn):
288         data = [self._rdbuf]
289         chunk = data[0]
290         while '\n' not in chunk:
291             try:
292                 chunk = conn.recv(1024)
293             except (OSError, socket.error), e:
294                 if e[0] != errno.EINTR:
295                     raise
296                 else:
297                     continue
298             if chunk:
299                 data.append(chunk)
300             else:
301                 # empty chunk = EOF
302                 break
303         data = ''.join(data).split('\n',1)
304         while len(data) < 2:
305             data.append('')
306         data, self._rdbuf = data
307         
308         decoded = base64.b64decode(data)
309         return decoded.rstrip()
310
311     def send_reply(self, conn, reply):
312         encoded = base64.b64encode(reply)
313         conn.send("%s\n" % encoded)
314        
315     def cleanup(self):
316         try:
317             self._ctrl_sock.close()
318             os.remove(CTRL_SOCK)
319         except:
320             self.log_error()
321
322     def stop_action(self):
323         return "Stopping server"
324
325     def reply_action(self, msg):
326         return "Reply to: %s" % msg
327
328     def log_error(self, text = None, context = ''):
329         if text == None:
330             text = traceback.format_exc()
331         date = time.strftime("%Y-%m-%d %H:%M:%S")
332         if context:
333             context = " (%s)" % (context,)
334         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
335         return text
336
337     def log_debug(self, text):
338         if self._log_level == DC.DEBUG_LEVEL:
339             date = time.strftime("%Y-%m-%d %H:%M:%S")
340             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
341
342 class Forwarder(object):
343     def __init__(self, root_dir = "."):
344         self._ctrl_sock = None
345         self._root_dir = root_dir
346         self._stop = False
347         self._rdbuf = ""
348
349     def forward(self):
350         self.connect()
351         print >>sys.stderr, "FORWARDER_READY."
352         while not self._stop:
353             data = self.read_data()
354             if not data:
355                 # Connection to client lost
356                 break
357             self.send_to_server(data)
358             
359             data = self.recv_from_server()
360             if not data:
361                 # Connection to server lost
362                 raise IOError, "Connection to server lost while "\
363                     "expecting response"
364             self.write_data(data)
365         self.disconnect()
366
367     def read_data(self):
368         return sys.stdin.readline()
369
370     def write_data(self, data):
371         sys.stdout.write(data)
372         # sys.stdout.write is buffered, this is why we need to do a flush()
373         sys.stdout.flush()
374
375     def send_to_server(self, data):
376         try:
377             self._ctrl_sock.send(data)
378         except (IOError, socket.error), e:
379             if e[0] == errno.EPIPE:
380                 self.connect()
381                 self._ctrl_sock.send(data)
382             else:
383                 raise e
384         encoded = data.rstrip() 
385         msg = base64.b64decode(encoded)
386         if msg == STOP_MSG:
387             self._stop = True
388
389     def recv_from_server(self):
390         data = [self._rdbuf]
391         chunk = data[0]
392         while '\n' not in chunk:
393             try:
394                 chunk = self._ctrl_sock.recv(1024)
395             except (OSError, socket.error), e:
396                 if e[0] != errno.EINTR:
397                     raise
398                 continue
399             if chunk:
400                 data.append(chunk)
401             else:
402                 # empty chunk = EOF
403                 break
404         data = ''.join(data).split('\n',1)
405         while len(data) < 2:
406             data.append('')
407         data, self._rdbuf = data
408         
409         return data+'\n'
410  
411     def connect(self):
412         self.disconnect()
413         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415         self._ctrl_sock.connect(sock_addr)
416
417     def disconnect(self):
418         try:
419             self._ctrl_sock.close()
420         except:
421             pass
422
423 class Client(object):
424     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
425             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426             environment_setup = ""):
427         self.root_dir = root_dir
428         self.addr = (host, port)
429         self.user = user
430         self.agent = agent
431         self.sudo = sudo
432         self.communication = communication
433         self.environment_setup = environment_setup
434         self._stopped = False
435         self._deferreds = collections.deque()
436         self.connect()
437     
438     def __del__(self):
439         if self._process.poll() is None:
440             os.kill(self._process.pid, signal.SIGTERM)
441         self._process.wait()
442         
443     def connect(self):
444         root_dir = self.root_dir
445         (host, port) = self.addr
446         user = self.user
447         agent = self.agent
448         sudo = self.sudo
449         communication = self.communication
450         
451         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452                 c.forward()" % (root_dir,)
453
454         self._process = popen_python(python_code, 
455                     communication = communication,
456                     host = host, 
457                     port = port, 
458                     user = user, 
459                     agent = agent, 
460                     sudo = sudo, 
461                     environment_setup = self.environment_setup)
462                
463         # Wait for the forwarder to be ready, otherwise nobody
464         # will be able to connect to it
465         err = []
466         helo = "nope"
467         while helo:
468             helo = self._process.stderr.readline()
469             if helo == 'FORWARDER_READY.\n':
470                 break
471             err.append(helo)
472         else:
473             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
474         
475     def send_msg(self, msg):
476         encoded = base64.b64encode(msg)
477         data = "%s\n" % encoded
478         
479         try:
480             self._process.stdin.write(data)
481         except (IOError, ValueError):
482             # dead process, poll it to un-zombify
483             self._process.poll()
484             
485             # try again after reconnect
486             # If it fails again, though, give up
487             self.connect()
488             self._process.stdin.write(data)
489
490     def send_stop(self):
491         self.send_msg(STOP_MSG)
492         self._stopped = True
493
494     def defer_reply(self, transform=None):
495         defer_entry = []
496         self._deferreds.append(defer_entry)
497         return defer.Defer(
498             functools.partial(self.read_reply, defer_entry, transform)
499         )
500         
501     def _read_reply(self):
502         data = self._process.stdout.readline()
503         encoded = data.rstrip() 
504         if not encoded:
505             # empty == eof == dead process, poll it to un-zombify
506             self._process.poll()
507             
508             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509         return base64.b64decode(encoded)
510     
511     def read_reply(self, which=None, transform=None):
512         # Test to see if someone did it already
513         if which is not None and len(which):
514             # Ok, they did it...
515             # ...just return the deferred value
516             if transform:
517                 return transform(which[0])
518             else:
519                 return which[0]
520         
521         # Process all deferreds until the one we're looking for
522         # or until the queue is empty
523         while self._deferreds:
524             try:
525                 deferred = self._deferreds.popleft()
526             except IndexError:
527                 # emptied
528                 break
529             
530             deferred.append(self._read_reply())
531             if deferred is which:
532                 # We reached the one we were looking for
533                 if transform:
534                     return transform(deferred[0])
535                 else:
536                     return deferred[0]
537         
538         if which is None:
539             # They've requested a synchronous read
540             if transform:
541                 return transform(self._read_reply())
542             else:
543                 return self._read_reply()
544
545 def _make_server_key_args(server_key, host, port, args):
546     """ 
547     Returns a reference to the created temporary file, and adds the
548     corresponding arguments to the given argument list.
549     
550     Make sure to hold onto it until the process is done with the file
551     """
552     if port is not None:
553         host = '%s:%s' % (host,port)
554     # Create a temporary server key file
555     tmp_known_hosts = tempfile.NamedTemporaryFile()
556     
557     # Add the intended host key
558     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
559     
560     # If we're not in strict mode, add user-configured keys
561     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563         if os.access(user_hosts_path, os.R_OK):
564             f = open(user_hosts_path, "r")
565             tmp_known_hosts.write(f.read())
566             f.close()
567         
568     tmp_known_hosts.flush()
569     
570     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
571     
572     return tmp_known_hosts
573
574 def popen_ssh_command(command, host, port, user, agent, 
575         stdin="", 
576         ident_key = None,
577         server_key = None,
578         tty = False,
579         timeout = None,
580         retry = 0,
581         err_on_timeout = True,
582         connect_timeout = 30):
583     """
584     Executes a remote commands, returns ((stdout,stderr),process)
585     """
586     if TRACE:
587         print "ssh", host, command
588     
589     tmp_known_hosts = None
590     args = ['ssh',
591             # Don't bother with localhost. Makes test easier
592             '-o', 'NoHostAuthenticationForLocalhost=yes',
593             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
594             '-l', user, host]
595     if agent:
596         args.append('-A')
597     if port:
598         args.append('-p%d' % port)
599     if ident_key:
600         args.extend(('-i', ident_key))
601     if tty:
602         args.append('-t')
603     if server_key:
604         # Create a temporary server key file
605         tmp_known_hosts = _make_server_key_args(
606             server_key, host, port, args)
607     args.append(command)
608
609     for x in xrange(retry or 3):
610         # connects to the remote host and starts a remote connection
611         proc = subprocess.Popen(args, 
612                 stdout = subprocess.PIPE,
613                 stdin = subprocess.PIPE, 
614                 stderr = subprocess.PIPE)
615         
616         # attach tempfile object to the process, to make sure the file stays
617         # alive until the process is finished with it
618         proc._known_hosts = tmp_known_hosts
619         
620         try:
621             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
622             if proc.poll() and err.strip().startswith('ssh: '):
623                 # SSH error, can safely retry
624                 continue
625             break
626         except RuntimeError,e:
627             if retry <= 0:
628                 raise
629             if TRACE:
630                 print " timedout -> ", e.args
631             retry -= 1
632         
633     if TRACE:
634         print " -> ", out, err
635
636     return ((out, err), proc)
637
638 def popen_scp(source, dest, 
639         port = None, 
640         agent = None, 
641         recursive = False,
642         ident_key = None,
643         server_key = None):
644     """
645     Copies from/to remote sites.
646     
647     Source and destination should have the user and host encoded
648     as per scp specs.
649     
650     If source is a file object, a special mode will be used to
651     create the remote file with the same contents.
652     
653     If dest is a file object, the remote file (source) will be
654     read and written into dest.
655     
656     In these modes, recursive cannot be True.
657     
658     Source can be a list of files to copy to a single destination,
659     in which case it is advised that the destination be a folder.
660     """
661     
662     if TRACE:
663         print "scp", source, dest
664     
665     if isinstance(source, file) and source.tell() == 0:
666         source = source.name
667     elif hasattr(source, 'read'):
668         tmp = tempfile.NamedTemporaryFile()
669         while True:
670             buf = source.read(65536)
671             if buf:
672                 tmp.write(buf)
673             else:
674                 break
675         tmp.seek(0)
676         source = tmp.name
677     
678     if isinstance(source, file) or isinstance(dest, file) \
679             or hasattr(source, 'read')  or hasattr(dest, 'write'):
680         assert not recursive
681         
682         # Parse source/destination as <user>@<server>:<path>
683         if isinstance(dest, basestring) and ':' in dest:
684             remspec, path = dest.split(':',1)
685         elif isinstance(source, basestring) and ':' in source:
686             remspec, path = source.split(':',1)
687         else:
688             raise ValueError, "Both endpoints cannot be local"
689         user,host = remspec.rsplit('@',1)
690         tmp_known_hosts = None
691         
692         args = ['ssh', '-l', user, '-C',
693                 # Don't bother with localhost. Makes test easier
694                 '-o', 'NoHostAuthenticationForLocalhost=yes',
695                 host ]
696         if port:
697             args.append('-P%d' % port)
698         if ident_key:
699             args.extend(('-i', ident_key))
700         if server_key:
701             # Create a temporary server key file
702             tmp_known_hosts = _make_server_key_args(
703                 server_key, host, port, args)
704         
705         if isinstance(source, file) or hasattr(source, 'read'):
706             args.append('cat > %s' % (shell_escape(path),))
707         elif isinstance(dest, file) or hasattr(dest, 'write'):
708             args.append('cat %s' % (shell_escape(path),))
709         else:
710             raise AssertionError, "Unreachable code reached! :-Q"
711         
712         # connects to the remote host and starts a remote connection
713         if isinstance(source, file):
714             proc = subprocess.Popen(args, 
715                     stdout = open('/dev/null','w'),
716                     stderr = subprocess.PIPE,
717                     stdin = source)
718             err = proc.stderr.read()
719             proc._known_hosts = tmp_known_hosts
720             eintr_retry(proc.wait)()
721             return ((None,err), proc)
722         elif isinstance(dest, file):
723             proc = subprocess.Popen(args, 
724                     stdout = open('/dev/null','w'),
725                     stderr = subprocess.PIPE,
726                     stdin = source)
727             err = proc.stderr.read()
728             proc._known_hosts = tmp_known_hosts
729             eintr_retry(proc.wait)()
730             return ((None,err), proc)
731         elif hasattr(source, 'read'):
732             # file-like (but not file) source
733             proc = subprocess.Popen(args, 
734                     stdout = open('/dev/null','w'),
735                     stderr = subprocess.PIPE,
736                     stdin = subprocess.PIPE)
737             
738             buf = None
739             err = []
740             while True:
741                 if not buf:
742                     buf = source.read(4096)
743                 if not buf:
744                     #EOF
745                     break
746                 
747                 rdrdy, wrdy, broken = select.select(
748                     [proc.stderr],
749                     [proc.stdin],
750                     [proc.stderr,proc.stdin])
751                 
752                 if proc.stderr in rdrdy:
753                     # use os.read for fully unbuffered behavior
754                     err.append(os.read(proc.stderr.fileno(), 4096))
755                 
756                 if proc.stdin in wrdy:
757                     proc.stdin.write(buf)
758                     buf = None
759                 
760                 if broken:
761                     break
762             proc.stdin.close()
763             err.append(proc.stderr.read())
764                 
765             proc._known_hosts = tmp_known_hosts
766             eintr_retry(proc.wait)()
767             return ((None,''.join(err)), proc)
768         elif hasattr(dest, 'write'):
769             # file-like (but not file) dest
770             proc = subprocess.Popen(args, 
771                     stdout = subprocess.PIPE,
772                     stderr = subprocess.PIPE,
773                     stdin = open('/dev/null','w'))
774             
775             buf = None
776             err = []
777             while True:
778                 rdrdy, wrdy, broken = select.select(
779                     [proc.stderr, proc.stdout],
780                     [],
781                     [proc.stderr, proc.stdout])
782                 
783                 if proc.stderr in rdrdy:
784                     # use os.read for fully unbuffered behavior
785                     err.append(os.read(proc.stderr.fileno(), 4096))
786                 
787                 if proc.stdout in rdrdy:
788                     # use os.read for fully unbuffered behavior
789                     buf = os.read(proc.stdout.fileno(), 4096)
790                     dest.write(buf)
791                     
792                     if not buf:
793                         #EOF
794                         break
795                 
796                 if broken:
797                     break
798             err.append(proc.stderr.read())
799                 
800             proc._known_hosts = tmp_known_hosts
801             eintr_retry(proc.wait)()
802             return ((None,''.join(err)), proc)
803         else:
804             raise AssertionError, "Unreachable code reached! :-Q"
805     else:
806         # Parse destination as <user>@<server>:<path>
807         if isinstance(dest, basestring) and ':' in dest:
808             remspec, path = dest.split(':',1)
809         elif isinstance(source, basestring) and ':' in source:
810             remspec, path = source.split(':',1)
811         else:
812             raise ValueError, "Both endpoints cannot be local"
813         user,host = remspec.rsplit('@',1)
814         
815         # plain scp
816         tmp_known_hosts = None
817         args = ['scp', '-q', '-p', '-C',
818                 # Don't bother with localhost. Makes test easier
819                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
820         if port:
821             args.append('-P%d' % port)
822         if recursive:
823             args.append('-r')
824         if ident_key:
825             args.extend(('-i', ident_key))
826         if server_key:
827             # Create a temporary server key file
828             tmp_known_hosts = _make_server_key_args(
829                 server_key, host, port, args)
830         if isinstance(source,list):
831             args.extend(source)
832         else:
833             args.append(source)
834         args.append(dest)
835
836         # connects to the remote host and starts a remote connection
837         proc = subprocess.Popen(args, 
838                 stdout = subprocess.PIPE,
839                 stdin = subprocess.PIPE, 
840                 stderr = subprocess.PIPE)
841         proc._known_hosts = tmp_known_hosts
842         
843         comm = proc.communicate()
844         eintr_retry(proc.wait)()
845         return (comm, proc)
846
847 def decode_and_execute():
848     # The python code we want to execute might have characters that 
849     # are not compatible with the 'inline' mode we are using. To avoid
850     # problems we receive the encoded python code in base64 as a input 
851     # stream and decode it for execution.
852     import base64, os
853     cmd = ""
854     while True:
855         try:
856             cmd += os.read(0, 1)# one byte from stdin
857         except OSError, e:            
858             if e.errno == errno.EINTR:
859                 continue
860             else:
861                 raise
862         if cmd[-1] == "\n": 
863             break
864     cmd = base64.b64decode(cmd)
865     # Uncomment for debug
866     #os.write(2, "Executing python code: %s\n" % cmd)
867     os.write(1, "OK\n") # send a sync message
868     exec(cmd)
869
870 def popen_python(python_code, 
871         communication = DC.ACCESS_LOCAL,
872         host = None, 
873         port = None, 
874         user = None, 
875         agent = False, 
876         python_path = None,
877         ident_key = None,
878         server_key = None,
879         tty = False,
880         sudo = False, 
881         environment_setup = ""):
882
883     cmd = ""
884     if python_path:
885         python_path.replace("'", r"'\''")
886         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
887         cmd += " ; "
888     if environment_setup:
889         cmd += environment_setup
890         cmd += " ; "
891     # Uncomment for debug (to run everything under strace)
892     # We had to verify if strace works (cannot nest them)
893     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
894     #cmd += "$CMD "
895     #cmd += "strace -f -tt -s 200 -o strace$$.out "
896     import nepi
897     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
898         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
899     )
900
901     if sudo:
902         if ';' in cmd:
903             cmd = "sudo bash -c " + shell_escape(cmd)
904         else:
905             cmd = "sudo " + cmd
906
907     if communication == DC.ACCESS_SSH:
908         tmp_known_hosts = None
909         args = ['ssh',
910                 # Don't bother with localhost. Makes test easier
911                 '-o', 'NoHostAuthenticationForLocalhost=yes',
912                 '-l', user, host]
913         if agent:
914             args.append('-A')
915         if port:
916             args.append('-p%d' % port)
917         if ident_key:
918             args.extend(('-i', ident_key))
919         if tty:
920             args.append('-t')
921         if server_key:
922             # Create a temporary server key file
923             tmp_known_hosts = _make_server_key_args(
924                 server_key, host, port, args)
925         args.append(cmd)
926     else:
927         args = [ "/bin/bash", "-c", cmd ]
928
929     # connects to the remote host and starts a remote
930     proc = subprocess.Popen(args,
931             shell = False, 
932             stdout = subprocess.PIPE,
933             stdin = subprocess.PIPE, 
934             stderr = subprocess.PIPE)
935
936     if communication == DC.ACCESS_SSH:
937         proc._known_hosts = tmp_known_hosts
938
939     # send the command to execute
940     os.write(proc.stdin.fileno(),
941             base64.b64encode(python_code) + "\n")
942  
943     while True: 
944         try:
945             msg = os.read(proc.stdout.fileno(), 3)
946             break
947         except OSError, e:            
948             if e.errno == errno.EINTR:
949                 continue
950             else:
951                 raise
952     
953     if msg != "OK\n":
954         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
955             msg, proc.stdout.read(), proc.stderr.read())
956
957     return proc
958
959 # POSIX
960 def _communicate(self, input, timeout=None, err_on_timeout=True):
961     read_set = []
962     write_set = []
963     stdout = None # Return
964     stderr = None # Return
965     
966     killed = False
967     
968     if timeout is not None:
969         timelimit = time.time() + timeout
970         killtime = timelimit + 4
971         bailtime = timelimit + 4
972
973     if self.stdin:
974         # Flush stdio buffer.  This might block, if the user has
975         # been writing to .stdin in an uncontrolled fashion.
976         self.stdin.flush()
977         if input:
978             write_set.append(self.stdin)
979         else:
980             self.stdin.close()
981     if self.stdout:
982         read_set.append(self.stdout)
983         stdout = []
984     if self.stderr:
985         read_set.append(self.stderr)
986         stderr = []
987
988     input_offset = 0
989     while read_set or write_set:
990         if timeout is not None:
991             curtime = time.time()
992             if timeout is None or curtime > timelimit:
993                 if curtime > bailtime:
994                     break
995                 elif curtime > killtime:
996                     signum = signal.SIGKILL
997                 else:
998                     signum = signal.SIGTERM
999                 # Lets kill it
1000                 os.kill(self.pid, signum)
1001                 select_timeout = 0.5
1002             else:
1003                 select_timeout = timelimit - curtime + 0.1
1004         else:
1005             select_timeout = None
1006             
1007         try:
1008             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1009         except select.error,e:
1010             if e[0] != 4:
1011                 raise
1012             else:
1013                 continue
1014
1015         if self.stdin in wlist:
1016             # When select has indicated that the file is writable,
1017             # we can write up to PIPE_BUF bytes without risk
1018             # blocking.  POSIX defines PIPE_BUF >= 512
1019             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1020             input_offset += bytes_written
1021             if input_offset >= len(input):
1022                 self.stdin.close()
1023                 write_set.remove(self.stdin)
1024
1025         if self.stdout in rlist:
1026             data = os.read(self.stdout.fileno(), 1024)
1027             if data == "":
1028                 self.stdout.close()
1029                 read_set.remove(self.stdout)
1030             stdout.append(data)
1031
1032         if self.stderr in rlist:
1033             data = os.read(self.stderr.fileno(), 1024)
1034             if data == "":
1035                 self.stderr.close()
1036                 read_set.remove(self.stderr)
1037             stderr.append(data)
1038     
1039     # All data exchanged.  Translate lists into strings.
1040     if stdout is not None:
1041         stdout = ''.join(stdout)
1042     if stderr is not None:
1043         stderr = ''.join(stderr)
1044
1045     # Translate newlines, if requested.  We cannot let the file
1046     # object do the translation: It is based on stdio, which is
1047     # impossible to combine with select (unless forcing no
1048     # buffering).
1049     if self.universal_newlines and hasattr(file, 'newlines'):
1050         if stdout:
1051             stdout = self._translate_newlines(stdout)
1052         if stderr:
1053             stderr = self._translate_newlines(stderr)
1054
1055     if killed and err_on_timeout:
1056         errcode = self.poll()
1057         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1058     else:
1059         if killed:
1060             self.poll()
1061         else:
1062             self.wait()
1063         return (stdout, stderr)
1064