Clean up of repo
[nepi.git] / src / nepi / util / server.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.util.constants import DeploymentConfiguration as DC
4
5 import base64
6 import errno
7 import os
8 import os.path
9 import resource
10 import select
11 import shutil
12 import signal
13 import socket
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import re
20 import tempfile
21 import defer
22 import functools
23 import collections
24 import hashlib
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 OPENSSH_HAS_PERSIST = None
36
37 if hasattr(os, "devnull"):
38     DEV_NULL = os.devnull
39 else:
40     DEV_NULL = "/dev/null"
41
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43
44 hostbyname_cache = dict()
45
46 def gethostbyname(host):
47     hostbyname = hostbyname_cache.get(host)
48     if not hostbyname:
49         hostbyname = socket.gethostbyname(host)
50         hostbyname_cache[host] = hostbyname
51     return hostbyname
52
53 def openssh_has_persist():
54     global OPENSSH_HAS_PERSIST
55     if OPENSSH_HAS_PERSIST is None:
56         proc = subprocess.Popen(["ssh","-v"],
57             stdout = subprocess.PIPE,
58             stderr = subprocess.STDOUT,
59             stdin = open("/dev/null","r") )
60         out,err = proc.communicate()
61         proc.wait()
62         
63         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
64         OPENSSH_HAS_PERSIST = bool(vre.match(out))
65     return OPENSSH_HAS_PERSIST
66
67 def shell_escape(s):
68     """ Escapes strings so that they are safe to use as command-line arguments """
69     if SHELL_SAFE.match(s):
70         # safe string - no escaping needed
71         return s
72     else:
73         # unsafe string - escape
74         def escp(c):
75             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
76                 return c
77             else:
78                 return "'$'\\x%02x''" % (ord(c),)
79         s = ''.join(map(escp,s))
80         return "'%s'" % (s,)
81
82 def eintr_retry(func):
83     import functools
84     @functools.wraps(func)
85     def rv(*p, **kw):
86         retry = kw.pop("_retry", False)
87         for i in xrange(0 if retry else 4):
88             try:
89                 return func(*p, **kw)
90             except (select.error, socket.error), args:
91                 if args[0] == errno.EINTR:
92                     continue
93                 else:
94                     raise 
95             except OSError, e:
96                 if e.errno == errno.EINTR:
97                     continue
98                 else:
99                     raise
100         else:
101             return func(*p, **kw)
102     return rv
103
104 class Server(object):
105     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
106             environment_setup = "", clean_root = False):
107         self._root_dir = root_dir
108         self._clean_root = clean_root
109         self._stop = False
110         self._ctrl_sock = None
111         self._log_level = log_level
112         self._rdbuf = ""
113         self._environment_setup = environment_setup
114
115     def run(self):
116         try:
117             if self.daemonize():
118                 self.post_daemonize()
119                 self.loop()
120                 self.cleanup()
121                 # ref: "os._exit(0)"
122                 # can not return normally after fork beacuse no exec was done.
123                 # This means that if we don't do a os._exit(0) here the code that 
124                 # follows the call to "Server.run()" in the "caller code" will be 
125                 # executed... but by now it has already been executed after the 
126                 # first process (the one that did the first fork) returned.
127                 os._exit(0)
128         except:
129             print >>sys.stderr, "SERVER_ERROR."
130             self.log_error()
131             self.cleanup()
132             os._exit(0)
133         print >>sys.stderr, "SERVER_READY."
134
135     def daemonize(self):
136         # pipes for process synchronization
137         (r, w) = os.pipe()
138         
139         # build root folder
140         root = os.path.normpath(self._root_dir)
141         if self._root_dir not in [".", ""] and os.path.exists(root) \
142                 and self._clean_root:
143             shutil.rmtree(root)
144         if not os.path.exists(root):
145             os.makedirs(root, 0755)
146
147         pid1 = os.fork()
148         if pid1 > 0:
149             os.close(w)
150             while True:
151                 try:
152                     os.read(r, 1)
153                 except OSError, e: # pragma: no cover
154                     if e.errno == errno.EINTR:
155                         continue
156                     else:
157                         raise
158                 break
159             os.close(r)
160             # os.waitpid avoids leaving a <defunc> (zombie) process
161             st = os.waitpid(pid1, 0)[1]
162             if st:
163                 raise RuntimeError("Daemonization failed")
164             # return 0 to inform the caller method that this is not the 
165             # daemonized process
166             return 0
167         os.close(r)
168
169         # Decouple from parent environment.
170         os.chdir(self._root_dir)
171         os.umask(0)
172         os.setsid()
173
174         # fork 2
175         pid2 = os.fork()
176         if pid2 > 0:
177             # see ref: "os._exit(0)"
178             os._exit(0)
179
180         # close all open file descriptors.
181         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182         if (max_fd == resource.RLIM_INFINITY):
183             max_fd = MAX_FD
184         for fd in range(3, max_fd):
185             if fd != w:
186                 try:
187                     os.close(fd)
188                 except OSError:
189                     pass
190
191         # Redirect standard file descriptors.
192         stdin = open(DEV_NULL, "r")
193         stderr = stdout = open(STD_ERR, "a", 0)
194         os.dup2(stdin.fileno(), sys.stdin.fileno())
195         # NOTE: sys.stdout.write will still be buffered, even if the file
196         # was opened with 0 buffer
197         os.dup2(stdout.fileno(), sys.stdout.fileno())
198         os.dup2(stderr.fileno(), sys.stderr.fileno())
199         
200         # setup environment
201         if self._environment_setup:
202             # parse environment variables and pass to child process
203             # do it by executing shell commands, in case there's some heavy setup involved
204             envproc = subprocess.Popen(
205                 [ "bash", "-c", 
206                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207                         ( self._environment_setup, ) ],
208                 stdin = subprocess.PIPE, 
209                 stdout = subprocess.PIPE,
210                 stderr = subprocess.PIPE
211             )
212             out,err = envproc.communicate()
213
214             # parse new environment
215             if out:
216                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
217             
218                 # apply to current environment
219                 for name, value in environment.iteritems():
220                     os.environ[name] = value
221                 
222                 # apply pythonpath
223                 if 'PYTHONPATH' in environment:
224                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
225
226         # create control socket
227         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
228         try:
229             self._ctrl_sock.bind(CTRL_SOCK)
230         except socket.error:
231             # Address in use, check pidfile
232             pid = None
233             try:
234                 pidfile = open(CTRL_PID, "r")
235                 pid = pidfile.read()
236                 pidfile.close()
237                 pid = int(pid)
238             except:
239                 # no pidfile
240                 pass
241             
242             if pid is not None:
243                 # Check process liveliness
244                 if not os.path.exists("/proc/%d" % (pid,)):
245                     # Ok, it's dead, clean the socket
246                     os.remove(CTRL_SOCK)
247             
248             # try again
249             self._ctrl_sock.bind(CTRL_SOCK)
250             
251         self._ctrl_sock.listen(0)
252         
253         # Save pidfile
254         pidfile = open(CTRL_PID, "w")
255         pidfile.write(str(os.getpid()))
256         pidfile.close()
257
258         # let the parent process know that the daemonization is finished
259         os.write(w, "\n")
260         os.close(w)
261         return 1
262
263     def post_daemonize(self):
264         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265         # QT, for some strange reason, redefines the SIGCHILD handler to write
266         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
267         # Server dameonization closes all file descriptors from fileno '3',
268         # but the overloaded handler (inherited by the forked process) will
269         # keep trying to write the \0 to fileno 'x', which might have been reused 
270         # after closing, for other operations. This is bad bad bad when fileno 'x'
271         # is in use for communication pouroses, because unexpected \0 start
272         # appearing in the communication messages... this is exactly what happens 
273         # when using netns in daemonized form. Thus, be have no other alternative than
274         # restoring the SIGCHLD handler to the default here.
275         import signal
276         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
277
278     def loop(self):
279         while not self._stop:
280             conn, addr = self._ctrl_sock.accept()
281             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
282             conn.settimeout(5)
283             while not self._stop:
284                 try:
285                     msg = self.recv_msg(conn)
286                 except socket.timeout, e:
287                     #self.log_error("SERVER recv_msg: connection timedout ")
288                     continue
289                 
290                 if not msg:
291                     self.log_error("CONNECTION LOST")
292                     break
293                     
294                 if msg == STOP_MSG:
295                     self._stop = True
296                     reply = self.stop_action()
297                 else:
298                     reply = self.reply_action(msg)
299                 
300                 try:
301                     self.send_reply(conn, reply)
302                 except socket.error:
303                     self.log_error()
304                     self.log_error("NOTICE: Awaiting for reconnection")
305                     break
306             try:
307                 conn.close()
308             except:
309                 # Doesn't matter
310                 self.log_error()
311
312     def recv_msg(self, conn):
313         data = [self._rdbuf]
314         chunk = data[0]
315         while '\n' not in chunk:
316             try:
317                 chunk = conn.recv(1024)
318             except (OSError, socket.error), e:
319                 if e[0] != errno.EINTR:
320                     raise
321                 else:
322                     continue
323             if chunk:
324                 data.append(chunk)
325             else:
326                 # empty chunk = EOF
327                 break
328         data = ''.join(data).split('\n',1)
329         while len(data) < 2:
330             data.append('')
331         data, self._rdbuf = data
332         
333         decoded = base64.b64decode(data)
334         return decoded.rstrip()
335
336     def send_reply(self, conn, reply):
337         encoded = base64.b64encode(reply)
338         conn.send("%s\n" % encoded)
339        
340     def cleanup(self):
341         try:
342             self._ctrl_sock.close()
343             os.remove(CTRL_SOCK)
344         except:
345             self.log_error()
346
347     def stop_action(self):
348         return "Stopping server"
349
350     def reply_action(self, msg):
351         return "Reply to: %s" % msg
352
353     def log_error(self, text = None, context = ''):
354         if text == None:
355             text = traceback.format_exc()
356         date = time.strftime("%Y-%m-%d %H:%M:%S")
357         if context:
358             context = " (%s)" % (context,)
359         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
360         return text
361
362     def log_debug(self, text):
363         if self._log_level == DC.DEBUG_LEVEL:
364             date = time.strftime("%Y-%m-%d %H:%M:%S")
365             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
366
367 class Forwarder(object):
368     def __init__(self, root_dir = "."):
369         self._ctrl_sock = None
370         self._root_dir = root_dir
371         self._stop = False
372         self._rdbuf = ""
373
374     def forward(self):
375         self.connect()
376         print >>sys.stderr, "FORWARDER_READY."
377         while not self._stop:
378             data = self.read_data()
379             if not data:
380                 # Connection to client lost
381                 break
382             self.send_to_server(data)
383             
384             data = self.recv_from_server()
385             if not data:
386                 # Connection to server lost
387                 raise IOError, "Connection to server lost while "\
388                     "expecting response"
389             self.write_data(data)
390         self.disconnect()
391
392     def read_data(self):
393         return sys.stdin.readline()
394
395     def write_data(self, data):
396         sys.stdout.write(data)
397         # sys.stdout.write is buffered, this is why we need to do a flush()
398         sys.stdout.flush()
399
400     def send_to_server(self, data):
401         try:
402             self._ctrl_sock.send(data)
403         except (IOError, socket.error), e:
404             if e[0] == errno.EPIPE:
405                 self.connect()
406                 self._ctrl_sock.send(data)
407             else:
408                 raise e
409         encoded = data.rstrip() 
410         msg = base64.b64decode(encoded)
411         if msg == STOP_MSG:
412             self._stop = True
413
414     def recv_from_server(self):
415         data = [self._rdbuf]
416         chunk = data[0]
417         while '\n' not in chunk:
418             try:
419                 chunk = self._ctrl_sock.recv(1024)
420             except (OSError, socket.error), e:
421                 if e[0] != errno.EINTR:
422                     raise
423                 continue
424             if chunk:
425                 data.append(chunk)
426             else:
427                 # empty chunk = EOF
428                 break
429         data = ''.join(data).split('\n',1)
430         while len(data) < 2:
431             data.append('')
432         data, self._rdbuf = data
433         
434         return data+'\n'
435  
436     def connect(self):
437         self.disconnect()
438         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440         self._ctrl_sock.connect(sock_addr)
441
442     def disconnect(self):
443         try:
444             self._ctrl_sock.close()
445         except:
446             pass
447
448 class Client(object):
449     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
450             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451             environment_setup = ""):
452         self.root_dir = root_dir
453         self.addr = (host, port)
454         self.user = user
455         self.agent = agent
456         self.sudo = sudo
457         self.communication = communication
458         self.environment_setup = environment_setup
459         self._stopped = False
460         self._deferreds = collections.deque()
461         self.connect()
462     
463     def __del__(self):
464         if self._process.poll() is None:
465             os.kill(self._process.pid, signal.SIGTERM)
466         self._process.wait()
467         
468     def connect(self):
469         root_dir = self.root_dir
470         (host, port) = self.addr
471         user = self.user
472         agent = self.agent
473         sudo = self.sudo
474         communication = self.communication
475         
476         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477                 c.forward()" % (root_dir,)
478
479         self._process = popen_python(python_code, 
480                     communication = communication,
481                     host = host, 
482                     port = port, 
483                     user = user, 
484                     agent = agent, 
485                     sudo = sudo, 
486                     environment_setup = self.environment_setup)
487                
488         # Wait for the forwarder to be ready, otherwise nobody
489         # will be able to connect to it
490         err = []
491         helo = "nope"
492         while helo:
493             helo = self._process.stderr.readline()
494             if helo == 'FORWARDER_READY.\n':
495                 break
496             err.append(helo)
497         else:
498             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
499         
500     def send_msg(self, msg):
501         encoded = base64.b64encode(msg)
502         data = "%s\n" % encoded
503         
504         try:
505             self._process.stdin.write(data)
506         except (IOError, ValueError):
507             # dead process, poll it to un-zombify
508             self._process.poll()
509             
510             # try again after reconnect
511             # If it fails again, though, give up
512             self.connect()
513             self._process.stdin.write(data)
514
515     def send_stop(self):
516         self.send_msg(STOP_MSG)
517         self._stopped = True
518
519     def defer_reply(self, transform=None):
520         defer_entry = []
521         self._deferreds.append(defer_entry)
522         return defer.Defer(
523             functools.partial(self.read_reply, defer_entry, transform)
524         )
525         
526     def _read_reply(self):
527         data = self._process.stdout.readline()
528         encoded = data.rstrip() 
529         if not encoded:
530             # empty == eof == dead process, poll it to un-zombify
531             self._process.poll()
532             
533             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534         return base64.b64decode(encoded)
535     
536     def read_reply(self, which=None, transform=None):
537         # Test to see if someone did it already
538         if which is not None and len(which):
539             # Ok, they did it...
540             # ...just return the deferred value
541             if transform:
542                 return transform(which[0])
543             else:
544                 return which[0]
545         
546         # Process all deferreds until the one we're looking for
547         # or until the queue is empty
548         while self._deferreds:
549             try:
550                 deferred = self._deferreds.popleft()
551             except IndexError:
552                 # emptied
553                 break
554             
555             deferred.append(self._read_reply())
556             if deferred is which:
557                 # We reached the one we were looking for
558                 if transform:
559                     return transform(deferred[0])
560                 else:
561                     return deferred[0]
562         
563         if which is None:
564             # They've requested a synchronous read
565             if transform:
566                 return transform(self._read_reply())
567             else:
568                 return self._read_reply()
569
570 def _make_server_key_args(server_key, host, port, args):
571     """ 
572     Returns a reference to the created temporary file, and adds the
573     corresponding arguments to the given argument list.
574     
575     Make sure to hold onto it until the process is done with the file
576     """
577     if port is not None:
578         host = '%s:%s' % (host,port)
579     # Create a temporary server key file
580     tmp_known_hosts = tempfile.NamedTemporaryFile()
581    
582     hostbyname = gethostbyname(host) 
583
584     # Add the intended host key
585     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
586     
587     # If we're not in strict mode, add user-configured keys
588     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590         if os.access(user_hosts_path, os.R_OK):
591             f = open(user_hosts_path, "r")
592             tmp_known_hosts.write(f.read())
593             f.close()
594         
595     tmp_known_hosts.flush()
596     
597     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
598     
599     return tmp_known_hosts
600
601 def popen_ssh_command(command, host, port, user, agent, 
602         stdin="", 
603         ident_key = None,
604         server_key = None,
605         tty = False,
606         timeout = None,
607         retry = 0,
608         err_on_timeout = True,
609         connect_timeout = 60,
610         persistent = True,
611         hostip = None):
612     """
613     Executes a remote commands, returns ((stdout,stderr),process)
614     """
615    
616     tmp_known_hosts = None
617     args = ['ssh', '-C',
618             # Don't bother with localhost. Makes test easier
619             '-o', 'NoHostAuthenticationForLocalhost=yes',
620             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
621             '-o', 'ConnectionAttempts=3',
622             '-o', 'ServerAliveInterval=30',
623             '-o', 'TCPKeepAlive=yes',
624             '-l', user, hostip or host]
625     if persistent and openssh_has_persist():
626         args.extend([
627             '-o', 'ControlMaster=auto',
628             '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
629             '-o', 'ControlPersist=60' ])
630     if agent:
631         args.append('-A')
632     if port:
633         args.append('-p%d' % port)
634     if ident_key:
635         args.extend(('-i', ident_key))
636     if tty:
637         args.append('-t')
638         args.append('-t')
639     if server_key:
640         # Create a temporary server key file
641         tmp_known_hosts = _make_server_key_args(
642             server_key, host, port, args)
643     args.append(command)
644
645     for x in xrange(retry or 3):
646         # connects to the remote host and starts a remote connection
647         proc = subprocess.Popen(args, 
648                 stdout = subprocess.PIPE,
649                 stdin = subprocess.PIPE, 
650                 stderr = subprocess.PIPE)
651         
652         # attach tempfile object to the process, to make sure the file stays
653         # alive until the process is finished with it
654         proc._known_hosts = tmp_known_hosts
655     
656         try:
657             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
658             if TRACE:
659                 print "COMMAND host %s, command %s, out %s, error %s" % (host, " ".join(args), out, err)
660
661             if proc.poll():
662                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
663                     # SSH error, can safely retry
664                     continue
665                 elif retry:
666                     # Probably timed out or plain failed but can retry
667                     continue
668             break
669         except RuntimeError,e:
670             if TRACE:
671                 print "EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT ->  %s" % (
672                         host, " ".join(args), out, err, e.args)
673
674             if retry <= 0:
675                 raise
676             retry -= 1
677         
678     return ((out, err), proc)
679
680 def popen_scp(source, dest, 
681         port = None, 
682         agent = None, 
683         recursive = False,
684         ident_key = None,
685         server_key = None):
686     """
687     Copies from/to remote sites.
688     
689     Source and destination should have the user and host encoded
690     as per scp specs.
691     
692     If source is a file object, a special mode will be used to
693     create the remote file with the same contents.
694     
695     If dest is a file object, the remote file (source) will be
696     read and written into dest.
697     
698     In these modes, recursive cannot be True.
699     
700     Source can be a list of files to copy to a single destination,
701     in which case it is advised that the destination be a folder.
702     """
703     
704     if TRACE:
705         print "scp", source, dest
706     
707     if isinstance(source, file) and source.tell() == 0:
708         source = source.name
709     elif hasattr(source, 'read'):
710         tmp = tempfile.NamedTemporaryFile()
711         while True:
712             buf = source.read(65536)
713             if buf:
714                 tmp.write(buf)
715             else:
716                 break
717         tmp.seek(0)
718         source = tmp.name
719     
720     if isinstance(source, file) or isinstance(dest, file) \
721             or hasattr(source, 'read')  or hasattr(dest, 'write'):
722         assert not recursive
723         
724         # Parse source/destination as <user>@<server>:<path>
725         if isinstance(dest, basestring) and ':' in dest:
726             remspec, path = dest.split(':',1)
727         elif isinstance(source, basestring) and ':' in source:
728             remspec, path = source.split(':',1)
729         else:
730             raise ValueError, "Both endpoints cannot be local"
731         user,host = remspec.rsplit('@',1)
732         tmp_known_hosts = None
733         
734         args = ['ssh', '-l', user, '-C',
735                 # Don't bother with localhost. Makes test easier
736                 '-o', 'NoHostAuthenticationForLocalhost=yes',
737                 '-o', 'ConnectTimeout=60',
738                 '-o', 'ConnectionAttempts=3',
739                 '-o', 'ServerAliveInterval=30',
740                 '-o', 'TCPKeepAlive=yes',
741                 host ]
742         if openssh_has_persist():
743             args.extend([
744                 '-o', 'ControlMaster=auto',
745                 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
746                 '-o', 'ControlPersist=60' ])
747         if port:
748             args.append('-P%d' % port)
749         if ident_key:
750             args.extend(('-i', ident_key))
751         if server_key:
752             # Create a temporary server key file
753             tmp_known_hosts = _make_server_key_args(
754                 server_key, host, port, args)
755         
756         if isinstance(source, file) or hasattr(source, 'read'):
757             args.append('cat > %s' % (shell_escape(path),))
758         elif isinstance(dest, file) or hasattr(dest, 'write'):
759             args.append('cat %s' % (shell_escape(path),))
760         else:
761             raise AssertionError, "Unreachable code reached! :-Q"
762         
763         # connects to the remote host and starts a remote connection
764         if isinstance(source, file):
765             proc = subprocess.Popen(args, 
766                     stdout = open('/dev/null','w'),
767                     stderr = subprocess.PIPE,
768                     stdin = source)
769             err = proc.stderr.read()
770             proc._known_hosts = tmp_known_hosts
771             eintr_retry(proc.wait)()
772             return ((None,err), proc)
773         elif isinstance(dest, file):
774             proc = subprocess.Popen(args, 
775                     stdout = open('/dev/null','w'),
776                     stderr = subprocess.PIPE,
777                     stdin = source)
778             err = proc.stderr.read()
779             proc._known_hosts = tmp_known_hosts
780             eintr_retry(proc.wait)()
781             return ((None,err), proc)
782         elif hasattr(source, 'read'):
783             # file-like (but not file) source
784             proc = subprocess.Popen(args, 
785                     stdout = open('/dev/null','w'),
786                     stderr = subprocess.PIPE,
787                     stdin = subprocess.PIPE)
788             
789             buf = None
790             err = []
791             while True:
792                 if not buf:
793                     buf = source.read(4096)
794                 if not buf:
795                     #EOF
796                     break
797                 
798                 rdrdy, wrdy, broken = select.select(
799                     [proc.stderr],
800                     [proc.stdin],
801                     [proc.stderr,proc.stdin])
802                 
803                 if proc.stderr in rdrdy:
804                     # use os.read for fully unbuffered behavior
805                     err.append(os.read(proc.stderr.fileno(), 4096))
806                 
807                 if proc.stdin in wrdy:
808                     proc.stdin.write(buf)
809                     buf = None
810                 
811                 if broken:
812                     break
813             proc.stdin.close()
814             err.append(proc.stderr.read())
815                 
816             proc._known_hosts = tmp_known_hosts
817             eintr_retry(proc.wait)()
818             return ((None,''.join(err)), proc)
819         elif hasattr(dest, 'write'):
820             # file-like (but not file) dest
821             proc = subprocess.Popen(args, 
822                     stdout = subprocess.PIPE,
823                     stderr = subprocess.PIPE,
824                     stdin = open('/dev/null','w'))
825             
826             buf = None
827             err = []
828             while True:
829                 rdrdy, wrdy, broken = select.select(
830                     [proc.stderr, proc.stdout],
831                     [],
832                     [proc.stderr, proc.stdout])
833                 
834                 if proc.stderr in rdrdy:
835                     # use os.read for fully unbuffered behavior
836                     err.append(os.read(proc.stderr.fileno(), 4096))
837                 
838                 if proc.stdout in rdrdy:
839                     # use os.read for fully unbuffered behavior
840                     buf = os.read(proc.stdout.fileno(), 4096)
841                     dest.write(buf)
842                     
843                     if not buf:
844                         #EOF
845                         break
846                 
847                 if broken:
848                     break
849             err.append(proc.stderr.read())
850                 
851             proc._known_hosts = tmp_known_hosts
852             eintr_retry(proc.wait)()
853             return ((None,''.join(err)), proc)
854         else:
855             raise AssertionError, "Unreachable code reached! :-Q"
856     else:
857         # Parse destination as <user>@<server>:<path>
858         if isinstance(dest, basestring) and ':' in dest:
859             remspec, path = dest.split(':',1)
860         elif isinstance(source, basestring) and ':' in source:
861             remspec, path = source.split(':',1)
862         else:
863             raise ValueError, "Both endpoints cannot be local"
864         user,host = remspec.rsplit('@',1)
865         
866         # plain scp
867         tmp_known_hosts = None
868         args = ['scp', '-q', '-p', '-C',
869                 # Don't bother with localhost. Makes test easier
870                 '-o', 'NoHostAuthenticationForLocalhost=yes',
871                 '-o', 'ConnectTimeout=60',
872                 '-o', 'ConnectionAttempts=3',
873                 '-o', 'ServerAliveInterval=30',
874                 '-o', 'TCPKeepAlive=yes' ]
875                 
876         if port:
877             args.append('-P%d' % port)
878         if recursive:
879             args.append('-r')
880         if ident_key:
881             args.extend(('-i', ident_key))
882         if server_key:
883             # Create a temporary server key file
884             tmp_known_hosts = _make_server_key_args(
885                 server_key, host, port, args)
886         if isinstance(source,list):
887             args.extend(source)
888         else:
889             if openssh_has_persist():
890                 args.extend([
891                     '-o', 'ControlMaster=auto',
892                     '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
893             args.append(source)
894         args.append(dest)
895
896         # connects to the remote host and starts a remote connection
897         proc = subprocess.Popen(args, 
898                 stdout = subprocess.PIPE,
899                 stdin = subprocess.PIPE, 
900                 stderr = subprocess.PIPE)
901         proc._known_hosts = tmp_known_hosts
902         
903         comm = proc.communicate()
904         eintr_retry(proc.wait)()
905         return (comm, proc)
906
907 def decode_and_execute():
908     # The python code we want to execute might have characters that 
909     # are not compatible with the 'inline' mode we are using. To avoid
910     # problems we receive the encoded python code in base64 as a input 
911     # stream and decode it for execution.
912     import base64, os
913     cmd = ""
914     while True:
915         try:
916             cmd += os.read(0, 1)# one byte from stdin
917         except OSError, e:            
918             if e.errno == errno.EINTR:
919                 continue
920             else:
921                 raise
922         if cmd[-1] == "\n": 
923             break
924     cmd = base64.b64decode(cmd)
925     # Uncomment for debug
926     #os.write(2, "Executing python code: %s\n" % cmd)
927     os.write(1, "OK\n") # send a sync message
928     exec(cmd)
929
930 def popen_python(python_code, 
931         communication = DC.ACCESS_LOCAL,
932         host = None, 
933         port = None, 
934         user = None, 
935         agent = False, 
936         python_path = None,
937         ident_key = None,
938         server_key = None,
939         tty = False,
940         sudo = False, 
941         environment_setup = ""):
942
943     cmd = ""
944     if python_path:
945         python_path.replace("'", r"'\''")
946         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
947         cmd += " ; "
948     if environment_setup:
949         cmd += environment_setup
950         cmd += " ; "
951     # Uncomment for debug (to run everything under strace)
952     # We had to verify if strace works (cannot nest them)
953     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
954     #cmd += "$CMD "
955     #cmd += "strace -f -tt -s 200 -o strace$$.out "
956     import nepi
957     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
958         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
959     )
960
961     if sudo:
962         if ';' in cmd:
963             cmd = "sudo bash -c " + shell_escape(cmd)
964         else:
965             cmd = "sudo " + cmd
966
967     if communication == DC.ACCESS_SSH:
968         tmp_known_hosts = None
969         args = ['ssh', '-C',
970                 # Don't bother with localhost. Makes test easier
971                 '-o', 'NoHostAuthenticationForLocalhost=yes',
972                 '-o', 'ConnectionAttempts=3',
973                 '-o', 'ServerAliveInterval=30',
974                 '-o', 'TCPKeepAlive=yes',
975                 '-l', user, host]
976         if agent:
977             args.append('-A')
978         if port:
979             args.append('-p%d' % port)
980         if ident_key:
981             args.extend(('-i', ident_key))
982         if tty:
983             args.append('-t')
984         if server_key:
985             # Create a temporary server key file
986             tmp_known_hosts = _make_server_key_args(
987                 server_key, host, port, args)
988         args.append(cmd)
989     else:
990         args = [ "/bin/bash", "-c", cmd ]
991
992     # connects to the remote host and starts a remote
993     proc = subprocess.Popen(args,
994             shell = False, 
995             stdout = subprocess.PIPE,
996             stdin = subprocess.PIPE, 
997             stderr = subprocess.PIPE)
998
999     if communication == DC.ACCESS_SSH:
1000         proc._known_hosts = tmp_known_hosts
1001
1002     # send the command to execute
1003     os.write(proc.stdin.fileno(),
1004             base64.b64encode(python_code) + "\n")
1005  
1006     while True: 
1007         try:
1008             msg = os.read(proc.stdout.fileno(), 3)
1009             break
1010         except OSError, e:            
1011             if e.errno == errno.EINTR:
1012                 continue
1013             else:
1014                 raise
1015     
1016     if msg != "OK\n":
1017         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1018             msg, proc.stdout.read(), proc.stderr.read())
1019
1020     return proc
1021
1022 # POSIX
1023 def _communicate(self, input, timeout=None, err_on_timeout=True):
1024     read_set = []
1025     write_set = []
1026     stdout = None # Return
1027     stderr = None # Return
1028     
1029     killed = False
1030     
1031     if timeout is not None:
1032         timelimit = time.time() + timeout
1033         killtime = timelimit + 4
1034         bailtime = timelimit + 4
1035
1036     if self.stdin:
1037         # Flush stdio buffer.  This might block, if the user has
1038         # been writing to .stdin in an uncontrolled fashion.
1039         self.stdin.flush()
1040         if input:
1041             write_set.append(self.stdin)
1042         else:
1043             self.stdin.close()
1044     if self.stdout:
1045         read_set.append(self.stdout)
1046         stdout = []
1047     if self.stderr:
1048         read_set.append(self.stderr)
1049         stderr = []
1050
1051     input_offset = 0
1052     while read_set or write_set:
1053         if timeout is not None:
1054             curtime = time.time()
1055             if timeout is None or curtime > timelimit:
1056                 if curtime > bailtime:
1057                     break
1058                 elif curtime > killtime:
1059                     signum = signal.SIGKILL
1060                 else:
1061                     signum = signal.SIGTERM
1062                 # Lets kill it
1063                 os.kill(self.pid, signum)
1064                 select_timeout = 0.5
1065             else:
1066                 select_timeout = timelimit - curtime + 0.1
1067         else:
1068             select_timeout = 1.0
1069         
1070         if select_timeout > 1.0:
1071             select_timeout = 1.0
1072             
1073         try:
1074             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1075         except select.error,e:
1076             if e[0] != 4:
1077                 raise
1078             else:
1079                 continue
1080         
1081         if not rlist and not wlist and not xlist and self.poll() is not None:
1082             # timeout and process exited, say bye
1083             break
1084
1085         if self.stdin in wlist:
1086             # When select has indicated that the file is writable,
1087             # we can write up to PIPE_BUF bytes without risk
1088             # blocking.  POSIX defines PIPE_BUF >= 512
1089             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1090             input_offset += bytes_written
1091             if input_offset >= len(input):
1092                 self.stdin.close()
1093                 write_set.remove(self.stdin)
1094
1095         if self.stdout in rlist:
1096             data = os.read(self.stdout.fileno(), 1024)
1097             if data == "":
1098                 self.stdout.close()
1099                 read_set.remove(self.stdout)
1100             stdout.append(data)
1101
1102         if self.stderr in rlist:
1103             data = os.read(self.stderr.fileno(), 1024)
1104             if data == "":
1105                 self.stderr.close()
1106                 read_set.remove(self.stderr)
1107             stderr.append(data)
1108     
1109     # All data exchanged.  Translate lists into strings.
1110     if stdout is not None:
1111         stdout = ''.join(stdout)
1112     if stderr is not None:
1113         stderr = ''.join(stderr)
1114
1115     # Translate newlines, if requested.  We cannot let the file
1116     # object do the translation: It is based on stdio, which is
1117     # impossible to combine with select (unless forcing no
1118     # buffering).
1119     if self.universal_newlines and hasattr(file, 'newlines'):
1120         if stdout:
1121             stdout = self._translate_newlines(stdout)
1122         if stderr:
1123             stderr = self._translate_newlines(stderr)
1124
1125     if killed and err_on_timeout:
1126         errcode = self.poll()
1127         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1128     else:
1129         if killed:
1130             self.poll()
1131         else:
1132             self.wait()
1133         return (stdout, stderr)
1134