Make servers able to launch when a stale ctrl.sock from a pervious server remains.
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 if hasattr(os, "devnull"):
36     DEV_NULL = os.devnull
37 else:
38     DEV_NULL = "/dev/null"
39
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41
42 def shell_escape(s):
43     """ Escapes strings so that they are safe to use as command-line arguments """
44     if SHELL_SAFE.match(s):
45         # safe string - no escaping needed
46         return s
47     else:
48         # unsafe string - escape
49         def escp(c):
50             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
51                 return c
52             else:
53                 return "'$'\\x%02x''" % (ord(c),)
54         s = ''.join(map(escp,s))
55         return "'%s'" % (s,)
56
57 def eintr_retry(func):
58     import functools
59     @functools.wraps(func)
60     def rv(*p, **kw):
61         retry = kw.pop("_retry", False)
62         for i in xrange(0 if retry else 4):
63             try:
64                 return func(*p, **kw)
65             except (select.error, socket.error), args:
66                 if args[0] == errno.EINTR:
67                     continue
68                 else:
69                     raise 
70             except OSError, e:
71                 if e.errno == errno.EINTR:
72                     continue
73                 else:
74                     raise
75         else:
76             return func(*p, **kw)
77     return rv
78
79 class Server(object):
80     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
81             environment_setup = "", clean_root = False):
82         self._root_dir = root_dir
83         self._clean_root = clean_root
84         self._stop = False
85         self._ctrl_sock = None
86         self._log_level = log_level
87         self._rdbuf = ""
88         self._environment_setup = environment_setup
89
90     def run(self):
91         try:
92             if self.daemonize():
93                 self.post_daemonize()
94                 self.loop()
95                 self.cleanup()
96                 # ref: "os._exit(0)"
97                 # can not return normally after fork beacuse no exec was done.
98                 # This means that if we don't do a os._exit(0) here the code that 
99                 # follows the call to "Server.run()" in the "caller code" will be 
100                 # executed... but by now it has already been executed after the 
101                 # first process (the one that did the first fork) returned.
102                 os._exit(0)
103         except:
104             print >>sys.stderr, "SERVER_ERROR."
105             self.log_error()
106             self.cleanup()
107             os._exit(0)
108         print >>sys.stderr, "SERVER_READY."
109
110     def daemonize(self):
111         # pipes for process synchronization
112         (r, w) = os.pipe()
113         
114         # build root folder
115         root = os.path.normpath(self._root_dir)
116         if os.path.exists(root) and self._clean_root:
117             shutil.rmtree(root)
118         if not os.path.exists(root):
119             os.makedirs(root, 0755)
120
121         pid1 = os.fork()
122         if pid1 > 0:
123             os.close(w)
124             while True:
125                 try:
126                     os.read(r, 1)
127                 except OSError, e: # pragma: no cover
128                     if e.errno == errno.EINTR:
129                         continue
130                     else:
131                         raise
132                 break
133             os.close(r)
134             # os.waitpid avoids leaving a <defunc> (zombie) process
135             st = os.waitpid(pid1, 0)[1]
136             if st:
137                 raise RuntimeError("Daemonization failed")
138             # return 0 to inform the caller method that this is not the 
139             # daemonized process
140             return 0
141         os.close(r)
142
143         # Decouple from parent environment.
144         os.chdir(self._root_dir)
145         os.umask(0)
146         os.setsid()
147
148         # fork 2
149         pid2 = os.fork()
150         if pid2 > 0:
151             # see ref: "os._exit(0)"
152             os._exit(0)
153
154         # close all open file descriptors.
155         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
156         if (max_fd == resource.RLIM_INFINITY):
157             max_fd = MAX_FD
158         for fd in range(3, max_fd):
159             if fd != w:
160                 try:
161                     os.close(fd)
162                 except OSError:
163                     pass
164
165         # Redirect standard file descriptors.
166         stdin = open(DEV_NULL, "r")
167         stderr = stdout = open(STD_ERR, "a", 0)
168         os.dup2(stdin.fileno(), sys.stdin.fileno())
169         # NOTE: sys.stdout.write will still be buffered, even if the file
170         # was opened with 0 buffer
171         os.dup2(stdout.fileno(), sys.stdout.fileno())
172         os.dup2(stderr.fileno(), sys.stderr.fileno())
173         
174         # setup environment
175         if self._environment_setup:
176             # parse environment variables and pass to child process
177             # do it by executing shell commands, in case there's some heavy setup involved
178             envproc = subprocess.Popen(
179                 [ "bash", "-c", 
180                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
181                         ( self._environment_setup, ) ],
182                 stdin = subprocess.PIPE, 
183                 stdout = subprocess.PIPE,
184                 stderr = subprocess.PIPE
185             )
186             out,err = envproc.communicate()
187
188             # parse new environment
189             if out:
190                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
191             
192                 # apply to current environment
193                 for name, value in environment.iteritems():
194                     os.environ[name] = value
195                 
196                 # apply pythonpath
197                 if 'PYTHONPATH' in environment:
198                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
199
200         # create control socket
201         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
202         try:
203             self._ctrl_sock.bind(CTRL_SOCK)
204         except socket.error:
205             # Address in use, check pidfile
206             pid = None
207             try:
208                 pidfile = open(CTRL_PID, "r")
209                 pid = pidfile.read()
210                 pidfile.close()
211                 pid = int(pid)
212             except:
213                 # no pidfile
214                 pass
215             
216             if pid is not None:
217                 # Check process liveliness
218                 if not os.path.exists("/proc/%d" % (pid,)):
219                     # Ok, it's dead, clean the socket
220                     os.remove(CTRL_SOCK)
221             
222             # try again
223             self._ctrl_sock.bind(CTRL_SOCK)
224             
225         self._ctrl_sock.listen(0)
226         
227         # Save pidfile
228         pidfile = open(CTRL_PID, "w")
229         pidfile.write(str(os.getpid()))
230         pidfile.close()
231
232         # let the parent process know that the daemonization is finished
233         os.write(w, "\n")
234         os.close(w)
235         return 1
236
237     def post_daemonize(self):
238         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
239         # QT, for some strange reason, redefines the SIGCHILD handler to write
240         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
241         # Server dameonization closes all file descriptors from fileno '3',
242         # but the overloaded handler (inherited by the forked process) will
243         # keep trying to write the \0 to fileno 'x', which might have been reused 
244         # after closing, for other operations. This is bad bad bad when fileno 'x'
245         # is in use for communication pouroses, because unexpected \0 start
246         # appearing in the communication messages... this is exactly what happens 
247         # when using netns in daemonized form. Thus, be have no other alternative than
248         # restoring the SIGCHLD handler to the default here.
249         import signal
250         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
251
252     def loop(self):
253         while not self._stop:
254             conn, addr = self._ctrl_sock.accept()
255             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
256             conn.settimeout(5)
257             while not self._stop:
258                 try:
259                     msg = self.recv_msg(conn)
260                 except socket.timeout, e:
261                     #self.log_error("SERVER recv_msg: connection timedout ")
262                     continue
263                 
264                 if not msg:
265                     self.log_error("CONNECTION LOST")
266                     break
267                     
268                 if msg == STOP_MSG:
269                     self._stop = True
270                     reply = self.stop_action()
271                 else:
272                     reply = self.reply_action(msg)
273                 
274                 try:
275                     self.send_reply(conn, reply)
276                 except socket.error:
277                     self.log_error()
278                     self.log_error("NOTICE: Awaiting for reconnection")
279                     break
280             try:
281                 conn.close()
282             except:
283                 # Doesn't matter
284                 self.log_error()
285
286     def recv_msg(self, conn):
287         data = [self._rdbuf]
288         chunk = data[0]
289         while '\n' not in chunk:
290             try:
291                 chunk = conn.recv(1024)
292             except (OSError, socket.error), e:
293                 if e[0] != errno.EINTR:
294                     raise
295                 else:
296                     continue
297             if chunk:
298                 data.append(chunk)
299             else:
300                 # empty chunk = EOF
301                 break
302         data = ''.join(data).split('\n',1)
303         while len(data) < 2:
304             data.append('')
305         data, self._rdbuf = data
306         
307         decoded = base64.b64decode(data)
308         return decoded.rstrip()
309
310     def send_reply(self, conn, reply):
311         encoded = base64.b64encode(reply)
312         conn.send("%s\n" % encoded)
313        
314     def cleanup(self):
315         try:
316             self._ctrl_sock.close()
317             os.remove(CTRL_SOCK)
318         except:
319             self.log_error()
320
321     def stop_action(self):
322         return "Stopping server"
323
324     def reply_action(self, msg):
325         return "Reply to: %s" % msg
326
327     def log_error(self, text = None, context = ''):
328         if text == None:
329             text = traceback.format_exc()
330         date = time.strftime("%Y-%m-%d %H:%M:%S")
331         if context:
332             context = " (%s)" % (context,)
333         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
334         return text
335
336     def log_debug(self, text):
337         if self._log_level == DC.DEBUG_LEVEL:
338             date = time.strftime("%Y-%m-%d %H:%M:%S")
339             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
340
341 class Forwarder(object):
342     def __init__(self, root_dir = "."):
343         self._ctrl_sock = None
344         self._root_dir = root_dir
345         self._stop = False
346         self._rdbuf = ""
347
348     def forward(self):
349         self.connect()
350         print >>sys.stderr, "FORWARDER_READY."
351         while not self._stop:
352             data = self.read_data()
353             if not data:
354                 # Connection to client lost
355                 break
356             self.send_to_server(data)
357             
358             data = self.recv_from_server()
359             if not data:
360                 # Connection to server lost
361                 raise IOError, "Connection to server lost while "\
362                     "expecting response"
363             self.write_data(data)
364         self.disconnect()
365
366     def read_data(self):
367         return sys.stdin.readline()
368
369     def write_data(self, data):
370         sys.stdout.write(data)
371         # sys.stdout.write is buffered, this is why we need to do a flush()
372         sys.stdout.flush()
373
374     def send_to_server(self, data):
375         try:
376             self._ctrl_sock.send(data)
377         except (IOError, socket.error), e:
378             if e[0] == errno.EPIPE:
379                 self.connect()
380                 self._ctrl_sock.send(data)
381             else:
382                 raise e
383         encoded = data.rstrip() 
384         msg = base64.b64decode(encoded)
385         if msg == STOP_MSG:
386             self._stop = True
387
388     def recv_from_server(self):
389         data = [self._rdbuf]
390         chunk = data[0]
391         while '\n' not in chunk:
392             try:
393                 chunk = self._ctrl_sock.recv(1024)
394             except (OSError, socket.error), e:
395                 if e[0] != errno.EINTR:
396                     raise
397                 continue
398             if chunk:
399                 data.append(chunk)
400             else:
401                 # empty chunk = EOF
402                 break
403         data = ''.join(data).split('\n',1)
404         while len(data) < 2:
405             data.append('')
406         data, self._rdbuf = data
407         
408         return data+'\n'
409  
410     def connect(self):
411         self.disconnect()
412         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
413         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
414         self._ctrl_sock.connect(sock_addr)
415
416     def disconnect(self):
417         try:
418             self._ctrl_sock.close()
419         except:
420             pass
421
422 class Client(object):
423     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
424             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
425             environment_setup = ""):
426         self.root_dir = root_dir
427         self.addr = (host, port)
428         self.user = user
429         self.agent = agent
430         self.sudo = sudo
431         self.communication = communication
432         self.environment_setup = environment_setup
433         self._stopped = False
434         self._deferreds = collections.deque()
435         self.connect()
436     
437     def __del__(self):
438         if self._process.poll() is None:
439             os.kill(self._process.pid, signal.SIGTERM)
440         self._process.wait()
441         
442     def connect(self):
443         root_dir = self.root_dir
444         (host, port) = self.addr
445         user = self.user
446         agent = self.agent
447         sudo = self.sudo
448         communication = self.communication
449         
450         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
451                 c.forward()" % (root_dir,)
452
453         self._process = popen_python(python_code, 
454                     communication = communication,
455                     host = host, 
456                     port = port, 
457                     user = user, 
458                     agent = agent, 
459                     sudo = sudo, 
460                     environment_setup = self.environment_setup)
461                
462         # Wait for the forwarder to be ready, otherwise nobody
463         # will be able to connect to it
464         err = []
465         helo = "nope"
466         while helo:
467             helo = self._process.stderr.readline()
468             if helo == 'FORWARDER_READY.\n':
469                 break
470             err.append(helo)
471         else:
472             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
473         
474     def send_msg(self, msg):
475         encoded = base64.b64encode(msg)
476         data = "%s\n" % encoded
477         
478         try:
479             self._process.stdin.write(data)
480         except (IOError, ValueError):
481             # dead process, poll it to un-zombify
482             self._process.poll()
483             
484             # try again after reconnect
485             # If it fails again, though, give up
486             self.connect()
487             self._process.stdin.write(data)
488
489     def send_stop(self):
490         self.send_msg(STOP_MSG)
491         self._stopped = True
492
493     def defer_reply(self, transform=None):
494         defer_entry = []
495         self._deferreds.append(defer_entry)
496         return defer.Defer(
497             functools.partial(self.read_reply, defer_entry, transform)
498         )
499         
500     def _read_reply(self):
501         data = self._process.stdout.readline()
502         encoded = data.rstrip() 
503         if not encoded:
504             # empty == eof == dead process, poll it to un-zombify
505             self._process.poll()
506             
507             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
508         return base64.b64decode(encoded)
509     
510     def read_reply(self, which=None, transform=None):
511         # Test to see if someone did it already
512         if which is not None and len(which):
513             # Ok, they did it...
514             # ...just return the deferred value
515             if transform:
516                 return transform(which[0])
517             else:
518                 return which[0]
519         
520         # Process all deferreds until the one we're looking for
521         # or until the queue is empty
522         while self._deferreds:
523             try:
524                 deferred = self._deferreds.popleft()
525             except IndexError:
526                 # emptied
527                 break
528             
529             deferred.append(self._read_reply())
530             if deferred is which:
531                 # We reached the one we were looking for
532                 if transform:
533                     return transform(deferred[0])
534                 else:
535                     return deferred[0]
536         
537         if which is None:
538             # They've requested a synchronous read
539             if transform:
540                 return transform(self._read_reply())
541             else:
542                 return self._read_reply()
543
544 def _make_server_key_args(server_key, host, port, args):
545     """ 
546     Returns a reference to the created temporary file, and adds the
547     corresponding arguments to the given argument list.
548     
549     Make sure to hold onto it until the process is done with the file
550     """
551     if port is not None:
552         host = '%s:%s' % (host,port)
553     # Create a temporary server key file
554     tmp_known_hosts = tempfile.NamedTemporaryFile()
555     
556     # Add the intended host key
557     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
558     
559     # If we're not in strict mode, add user-configured keys
560     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
561         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
562         if os.access(user_hosts_path, os.R_OK):
563             f = open(user_hosts_path, "r")
564             tmp_known_hosts.write(f.read())
565             f.close()
566         
567     tmp_known_hosts.flush()
568     
569     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
570     
571     return tmp_known_hosts
572
573 def popen_ssh_command(command, host, port, user, agent, 
574         stdin="", 
575         ident_key = None,
576         server_key = None,
577         tty = False,
578         timeout = None,
579         retry = 0,
580         err_on_timeout = True):
581     """
582     Executes a remote commands, returns ((stdout,stderr),process)
583     """
584     if TRACE:
585         print "ssh", host, command
586     
587     tmp_known_hosts = None
588     args = ['ssh',
589             # Don't bother with localhost. Makes test easier
590             '-o', 'NoHostAuthenticationForLocalhost=yes',
591             '-l', user, host]
592     if agent:
593         args.append('-A')
594     if port:
595         args.append('-p%d' % port)
596     if ident_key:
597         args.extend(('-i', ident_key))
598     if tty:
599         args.append('-t')
600     if server_key:
601         # Create a temporary server key file
602         tmp_known_hosts = _make_server_key_args(
603             server_key, host, port, args)
604     args.append(command)
605
606     while 1:
607         # connects to the remote host and starts a remote connection
608         proc = subprocess.Popen(args, 
609                 stdout = subprocess.PIPE,
610                 stdin = subprocess.PIPE, 
611                 stderr = subprocess.PIPE)
612         
613         # attach tempfile object to the process, to make sure the file stays
614         # alive until the process is finished with it
615         proc._known_hosts = tmp_known_hosts
616         
617         try:
618             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
619             break
620         except RuntimeError,e:
621             if retry <= 0:
622                 raise
623             if TRACE:
624                 print " timedout -> ", e.args
625             retry -= 1
626         
627     if TRACE:
628         print " -> ", out, err
629
630     return ((out, err), proc)
631
632 def popen_scp(source, dest, 
633         port = None, 
634         agent = None, 
635         recursive = False,
636         ident_key = None,
637         server_key = None):
638     """
639     Copies from/to remote sites.
640     
641     Source and destination should have the user and host encoded
642     as per scp specs.
643     
644     If source is a file object, a special mode will be used to
645     create the remote file with the same contents.
646     
647     If dest is a file object, the remote file (source) will be
648     read and written into dest.
649     
650     In these modes, recursive cannot be True.
651     
652     Source can be a list of files to copy to a single destination,
653     in which case it is advised that the destination be a folder.
654     """
655     
656     if TRACE:
657         print "scp", source, dest
658     
659     if isinstance(source, file) and source.tell() == 0:
660         source = source.name
661     elif hasattr(source, 'read'):
662         tmp = tempfile.NamedTemporaryFile()
663         while True:
664             buf = source.read(65536)
665             if buf:
666                 tmp.write(buf)
667             else:
668                 break
669         tmp.seek(0)
670         source = tmp.name
671     
672     if isinstance(source, file) or isinstance(dest, file) \
673             or hasattr(source, 'read')  or hasattr(dest, 'write'):
674         assert not recursive
675         
676         # Parse source/destination as <user>@<server>:<path>
677         if isinstance(dest, basestring) and ':' in dest:
678             remspec, path = dest.split(':',1)
679         elif isinstance(source, basestring) and ':' in source:
680             remspec, path = source.split(':',1)
681         else:
682             raise ValueError, "Both endpoints cannot be local"
683         user,host = remspec.rsplit('@',1)
684         tmp_known_hosts = None
685         
686         args = ['ssh', '-l', user, '-C',
687                 # Don't bother with localhost. Makes test easier
688                 '-o', 'NoHostAuthenticationForLocalhost=yes',
689                 host ]
690         if port:
691             args.append('-P%d' % port)
692         if ident_key:
693             args.extend(('-i', ident_key))
694         if server_key:
695             # Create a temporary server key file
696             tmp_known_hosts = _make_server_key_args(
697                 server_key, host, port, args)
698         
699         if isinstance(source, file) or hasattr(source, 'read'):
700             args.append('cat > %s' % (shell_escape(path),))
701         elif isinstance(dest, file) or hasattr(dest, 'write'):
702             args.append('cat %s' % (shell_escape(path),))
703         else:
704             raise AssertionError, "Unreachable code reached! :-Q"
705         
706         # connects to the remote host and starts a remote connection
707         if isinstance(source, file):
708             proc = subprocess.Popen(args, 
709                     stdout = open('/dev/null','w'),
710                     stderr = subprocess.PIPE,
711                     stdin = source)
712             err = proc.stderr.read()
713             proc._known_hosts = tmp_known_hosts
714             eintr_retry(proc.wait)()
715             return ((None,err), proc)
716         elif isinstance(dest, file):
717             proc = subprocess.Popen(args, 
718                     stdout = open('/dev/null','w'),
719                     stderr = subprocess.PIPE,
720                     stdin = source)
721             err = proc.stderr.read()
722             proc._known_hosts = tmp_known_hosts
723             eintr_retry(proc.wait)()
724             return ((None,err), proc)
725         elif hasattr(source, 'read'):
726             # file-like (but not file) source
727             proc = subprocess.Popen(args, 
728                     stdout = open('/dev/null','w'),
729                     stderr = subprocess.PIPE,
730                     stdin = subprocess.PIPE)
731             
732             buf = None
733             err = []
734             while True:
735                 if not buf:
736                     buf = source.read(4096)
737                 if not buf:
738                     #EOF
739                     break
740                 
741                 rdrdy, wrdy, broken = select.select(
742                     [proc.stderr],
743                     [proc.stdin],
744                     [proc.stderr,proc.stdin])
745                 
746                 if proc.stderr in rdrdy:
747                     # use os.read for fully unbuffered behavior
748                     err.append(os.read(proc.stderr.fileno(), 4096))
749                 
750                 if proc.stdin in wrdy:
751                     proc.stdin.write(buf)
752                     buf = None
753                 
754                 if broken:
755                     break
756             proc.stdin.close()
757             err.append(proc.stderr.read())
758                 
759             proc._known_hosts = tmp_known_hosts
760             eintr_retry(proc.wait)()
761             return ((None,''.join(err)), proc)
762         elif hasattr(dest, 'write'):
763             # file-like (but not file) dest
764             proc = subprocess.Popen(args, 
765                     stdout = subprocess.PIPE,
766                     stderr = subprocess.PIPE,
767                     stdin = open('/dev/null','w'))
768             
769             buf = None
770             err = []
771             while True:
772                 rdrdy, wrdy, broken = select.select(
773                     [proc.stderr, proc.stdout],
774                     [],
775                     [proc.stderr, proc.stdout])
776                 
777                 if proc.stderr in rdrdy:
778                     # use os.read for fully unbuffered behavior
779                     err.append(os.read(proc.stderr.fileno(), 4096))
780                 
781                 if proc.stdout in rdrdy:
782                     # use os.read for fully unbuffered behavior
783                     buf = os.read(proc.stdout.fileno(), 4096)
784                     dest.write(buf)
785                     
786                     if not buf:
787                         #EOF
788                         break
789                 
790                 if broken:
791                     break
792             err.append(proc.stderr.read())
793                 
794             proc._known_hosts = tmp_known_hosts
795             eintr_retry(proc.wait)()
796             return ((None,''.join(err)), proc)
797         else:
798             raise AssertionError, "Unreachable code reached! :-Q"
799     else:
800         # Parse destination as <user>@<server>:<path>
801         if isinstance(dest, basestring) and ':' in dest:
802             remspec, path = dest.split(':',1)
803         elif isinstance(source, basestring) and ':' in source:
804             remspec, path = source.split(':',1)
805         else:
806             raise ValueError, "Both endpoints cannot be local"
807         user,host = remspec.rsplit('@',1)
808         
809         # plain scp
810         tmp_known_hosts = None
811         args = ['scp', '-q', '-p', '-C',
812                 # Don't bother with localhost. Makes test easier
813                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
814         if port:
815             args.append('-P%d' % port)
816         if recursive:
817             args.append('-r')
818         if ident_key:
819             args.extend(('-i', ident_key))
820         if server_key:
821             # Create a temporary server key file
822             tmp_known_hosts = _make_server_key_args(
823                 server_key, host, port, args)
824         if isinstance(source,list):
825             args.extend(source)
826         else:
827             args.append(source)
828         args.append(dest)
829
830         # connects to the remote host and starts a remote connection
831         proc = subprocess.Popen(args, 
832                 stdout = subprocess.PIPE,
833                 stdin = subprocess.PIPE, 
834                 stderr = subprocess.PIPE)
835         proc._known_hosts = tmp_known_hosts
836         
837         comm = proc.communicate()
838         eintr_retry(proc.wait)()
839         return (comm, proc)
840
841 def decode_and_execute():
842     # The python code we want to execute might have characters that 
843     # are not compatible with the 'inline' mode we are using. To avoid
844     # problems we receive the encoded python code in base64 as a input 
845     # stream and decode it for execution.
846     import base64, os
847     cmd = ""
848     while True:
849         try:
850             cmd += os.read(0, 1)# one byte from stdin
851         except OSError, e:            
852             if e.errno == errno.EINTR:
853                 continue
854             else:
855                 raise
856         if cmd[-1] == "\n": 
857             break
858     cmd = base64.b64decode(cmd)
859     # Uncomment for debug
860     #os.write(2, "Executing python code: %s\n" % cmd)
861     os.write(1, "OK\n") # send a sync message
862     exec(cmd)
863
864 def popen_python(python_code, 
865         communication = DC.ACCESS_LOCAL,
866         host = None, 
867         port = None, 
868         user = None, 
869         agent = False, 
870         python_path = None,
871         ident_key = None,
872         server_key = None,
873         tty = False,
874         sudo = False, 
875         environment_setup = ""):
876
877     cmd = ""
878     if python_path:
879         python_path.replace("'", r"'\''")
880         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
881         cmd += " ; "
882     if environment_setup:
883         cmd += environment_setup
884         cmd += " ; "
885     # Uncomment for debug (to run everything under strace)
886     # We had to verify if strace works (cannot nest them)
887     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
888     #cmd += "$CMD "
889     #cmd += "strace -f -tt -s 200 -o strace$$.out "
890     import nepi
891     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
892         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
893     )
894
895     if sudo:
896         if ';' in cmd:
897             cmd = "sudo bash -c " + shell_escape(cmd)
898         else:
899             cmd = "sudo " + cmd
900
901     if communication == DC.ACCESS_SSH:
902         tmp_known_hosts = None
903         args = ['ssh',
904                 # Don't bother with localhost. Makes test easier
905                 '-o', 'NoHostAuthenticationForLocalhost=yes',
906                 '-l', user, host]
907         if agent:
908             args.append('-A')
909         if port:
910             args.append('-p%d' % port)
911         if ident_key:
912             args.extend(('-i', ident_key))
913         if tty:
914             args.append('-t')
915         if server_key:
916             # Create a temporary server key file
917             tmp_known_hosts = _make_server_key_args(
918                 server_key, host, port, args)
919         args.append(cmd)
920     else:
921         args = [ "/bin/bash", "-c", cmd ]
922
923     # connects to the remote host and starts a remote
924     proc = subprocess.Popen(args,
925             shell = False, 
926             stdout = subprocess.PIPE,
927             stdin = subprocess.PIPE, 
928             stderr = subprocess.PIPE)
929
930     if communication == DC.ACCESS_SSH:
931         proc._known_hosts = tmp_known_hosts
932
933     # send the command to execute
934     os.write(proc.stdin.fileno(),
935             base64.b64encode(python_code) + "\n")
936  
937     while True: 
938         try:
939             msg = os.read(proc.stdout.fileno(), 3)
940             break
941         except OSError, e:            
942             if e.errno == errno.EINTR:
943                 continue
944             else:
945                 raise
946     
947     if msg != "OK\n":
948         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
949             msg, proc.stdout.read(), proc.stderr.read())
950
951     return proc
952
953 # POSIX
954 def _communicate(self, input, timeout=None, err_on_timeout=True):
955     read_set = []
956     write_set = []
957     stdout = None # Return
958     stderr = None # Return
959     
960     killed = False
961     
962     if timeout is not None:
963         timelimit = time.time() + timeout
964         killtime = timelimit + 4
965         bailtime = timelimit + 4
966
967     if self.stdin:
968         # Flush stdio buffer.  This might block, if the user has
969         # been writing to .stdin in an uncontrolled fashion.
970         self.stdin.flush()
971         if input:
972             write_set.append(self.stdin)
973         else:
974             self.stdin.close()
975     if self.stdout:
976         read_set.append(self.stdout)
977         stdout = []
978     if self.stderr:
979         read_set.append(self.stderr)
980         stderr = []
981
982     input_offset = 0
983     while read_set or write_set:
984         if timeout is not None:
985             curtime = time.time()
986             if timeout is None or curtime > timelimit:
987                 if curtime > bailtime:
988                     break
989                 elif curtime > killtime:
990                     signum = signal.SIGKILL
991                 else:
992                     signum = signal.SIGTERM
993                 # Lets kill it
994                 os.kill(self.pid, signum)
995                 select_timeout = 0.5
996             else:
997                 select_timeout = timelimit - curtime + 0.1
998         else:
999             select_timeout = None
1000             
1001         try:
1002             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1003         except select.error,e:
1004             if e[0] != 4:
1005                 raise
1006             else:
1007                 continue
1008
1009         if self.stdin in wlist:
1010             # When select has indicated that the file is writable,
1011             # we can write up to PIPE_BUF bytes without risk
1012             # blocking.  POSIX defines PIPE_BUF >= 512
1013             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1014             input_offset += bytes_written
1015             if input_offset >= len(input):
1016                 self.stdin.close()
1017                 write_set.remove(self.stdin)
1018
1019         if self.stdout in rlist:
1020             data = os.read(self.stdout.fileno(), 1024)
1021             if data == "":
1022                 self.stdout.close()
1023                 read_set.remove(self.stdout)
1024             stdout.append(data)
1025
1026         if self.stderr in rlist:
1027             data = os.read(self.stderr.fileno(), 1024)
1028             if data == "":
1029                 self.stderr.close()
1030                 read_set.remove(self.stderr)
1031             stderr.append(data)
1032     
1033     # All data exchanged.  Translate lists into strings.
1034     if stdout is not None:
1035         stdout = ''.join(stdout)
1036     if stderr is not None:
1037         stderr = ''.join(stderr)
1038
1039     # Translate newlines, if requested.  We cannot let the file
1040     # object do the translation: It is based on stdio, which is
1041     # impossible to combine with select (unless forcing no
1042     # buffering).
1043     if self.universal_newlines and hasattr(file, 'newlines'):
1044         if stdout:
1045             stdout = self._translate_newlines(stdout)
1046         if stderr:
1047             stderr = self._translate_newlines(stderr)
1048
1049     if killed and err_on_timeout:
1050         errcode = self.poll()
1051         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1052     else:
1053         if killed:
1054             self.poll()
1055         else:
1056             self.wait()
1057         return (stdout, stderr)
1058