Added unit tests for SFA PlanetLab.
[nepi.git] / src / nepi / util / server.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.util.constants import DeploymentConfiguration as DC
4
5 import base64
6 import errno
7 import os
8 import os.path
9 import resource
10 import select
11 import shutil
12 import signal
13 import socket
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import re
20 import tempfile
21 import defer
22 import functools
23 import collections
24 import hashlib
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 OPENSSH_HAS_PERSIST = None
36
37 if hasattr(os, "devnull"):
38     DEV_NULL = os.devnull
39 else:
40     DEV_NULL = "/dev/null"
41
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43
44 def openssh_has_persist():
45     global OPENSSH_HAS_PERSIST
46     if OPENSSH_HAS_PERSIST is None:
47         proc = subprocess.Popen(["ssh","-v"],
48             stdout = subprocess.PIPE,
49             stderr = subprocess.STDOUT,
50             stdin = open("/dev/null","r") )
51         out,err = proc.communicate()
52         proc.wait()
53         
54         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
55         OPENSSH_HAS_PERSIST = bool(vre.match(out))
56     return OPENSSH_HAS_PERSIST
57
58 def shell_escape(s):
59     """ Escapes strings so that they are safe to use as command-line arguments """
60     if SHELL_SAFE.match(s):
61         # safe string - no escaping needed
62         return s
63     else:
64         # unsafe string - escape
65         def escp(c):
66             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
67                 return c
68             else:
69                 return "'$'\\x%02x''" % (ord(c),)
70         s = ''.join(map(escp,s))
71         return "'%s'" % (s,)
72
73 def eintr_retry(func):
74     import functools
75     @functools.wraps(func)
76     def rv(*p, **kw):
77         retry = kw.pop("_retry", False)
78         for i in xrange(0 if retry else 4):
79             try:
80                 return func(*p, **kw)
81             except (select.error, socket.error), args:
82                 if args[0] == errno.EINTR:
83                     continue
84                 else:
85                     raise 
86             except OSError, e:
87                 if e.errno == errno.EINTR:
88                     continue
89                 else:
90                     raise
91         else:
92             return func(*p, **kw)
93     return rv
94
95 class Server(object):
96     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
97             environment_setup = "", clean_root = False):
98         self._root_dir = root_dir
99         self._clean_root = clean_root
100         self._stop = False
101         self._ctrl_sock = None
102         self._log_level = log_level
103         self._rdbuf = ""
104         self._environment_setup = environment_setup
105
106     def run(self):
107         try:
108             if self.daemonize():
109                 self.post_daemonize()
110                 self.loop()
111                 self.cleanup()
112                 # ref: "os._exit(0)"
113                 # can not return normally after fork beacuse no exec was done.
114                 # This means that if we don't do a os._exit(0) here the code that 
115                 # follows the call to "Server.run()" in the "caller code" will be 
116                 # executed... but by now it has already been executed after the 
117                 # first process (the one that did the first fork) returned.
118                 os._exit(0)
119         except:
120             print >>sys.stderr, "SERVER_ERROR."
121             self.log_error()
122             self.cleanup()
123             os._exit(0)
124         print >>sys.stderr, "SERVER_READY."
125
126     def daemonize(self):
127         # pipes for process synchronization
128         (r, w) = os.pipe()
129         
130         # build root folder
131         root = os.path.normpath(self._root_dir)
132         if self._root_dir not in [".", ""] and os.path.exists(root) \
133                 and self._clean_root:
134             shutil.rmtree(root)
135         if not os.path.exists(root):
136             os.makedirs(root, 0755)
137
138         pid1 = os.fork()
139         if pid1 > 0:
140             os.close(w)
141             while True:
142                 try:
143                     os.read(r, 1)
144                 except OSError, e: # pragma: no cover
145                     if e.errno == errno.EINTR:
146                         continue
147                     else:
148                         raise
149                 break
150             os.close(r)
151             # os.waitpid avoids leaving a <defunc> (zombie) process
152             st = os.waitpid(pid1, 0)[1]
153             if st:
154                 raise RuntimeError("Daemonization failed")
155             # return 0 to inform the caller method that this is not the 
156             # daemonized process
157             return 0
158         os.close(r)
159
160         # Decouple from parent environment.
161         os.chdir(self._root_dir)
162         os.umask(0)
163         os.setsid()
164
165         # fork 2
166         pid2 = os.fork()
167         if pid2 > 0:
168             # see ref: "os._exit(0)"
169             os._exit(0)
170
171         # close all open file descriptors.
172         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
173         if (max_fd == resource.RLIM_INFINITY):
174             max_fd = MAX_FD
175         for fd in range(3, max_fd):
176             if fd != w:
177                 try:
178                     os.close(fd)
179                 except OSError:
180                     pass
181
182         # Redirect standard file descriptors.
183         stdin = open(DEV_NULL, "r")
184         stderr = stdout = open(STD_ERR, "a", 0)
185         os.dup2(stdin.fileno(), sys.stdin.fileno())
186         # NOTE: sys.stdout.write will still be buffered, even if the file
187         # was opened with 0 buffer
188         os.dup2(stdout.fileno(), sys.stdout.fileno())
189         os.dup2(stderr.fileno(), sys.stderr.fileno())
190         
191         # setup environment
192         if self._environment_setup:
193             # parse environment variables and pass to child process
194             # do it by executing shell commands, in case there's some heavy setup involved
195             envproc = subprocess.Popen(
196                 [ "bash", "-c", 
197                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
198                         ( self._environment_setup, ) ],
199                 stdin = subprocess.PIPE, 
200                 stdout = subprocess.PIPE,
201                 stderr = subprocess.PIPE
202             )
203             out,err = envproc.communicate()
204
205             # parse new environment
206             if out:
207                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
208             
209                 # apply to current environment
210                 for name, value in environment.iteritems():
211                     os.environ[name] = value
212                 
213                 # apply pythonpath
214                 if 'PYTHONPATH' in environment:
215                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
216
217         # create control socket
218         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
219         try:
220             self._ctrl_sock.bind(CTRL_SOCK)
221         except socket.error:
222             # Address in use, check pidfile
223             pid = None
224             try:
225                 pidfile = open(CTRL_PID, "r")
226                 pid = pidfile.read()
227                 pidfile.close()
228                 pid = int(pid)
229             except:
230                 # no pidfile
231                 pass
232             
233             if pid is not None:
234                 # Check process liveliness
235                 if not os.path.exists("/proc/%d" % (pid,)):
236                     # Ok, it's dead, clean the socket
237                     os.remove(CTRL_SOCK)
238             
239             # try again
240             self._ctrl_sock.bind(CTRL_SOCK)
241             
242         self._ctrl_sock.listen(0)
243         
244         # Save pidfile
245         pidfile = open(CTRL_PID, "w")
246         pidfile.write(str(os.getpid()))
247         pidfile.close()
248
249         # let the parent process know that the daemonization is finished
250         os.write(w, "\n")
251         os.close(w)
252         return 1
253
254     def post_daemonize(self):
255         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
256         # QT, for some strange reason, redefines the SIGCHILD handler to write
257         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
258         # Server dameonization closes all file descriptors from fileno '3',
259         # but the overloaded handler (inherited by the forked process) will
260         # keep trying to write the \0 to fileno 'x', which might have been reused 
261         # after closing, for other operations. This is bad bad bad when fileno 'x'
262         # is in use for communication pouroses, because unexpected \0 start
263         # appearing in the communication messages... this is exactly what happens 
264         # when using netns in daemonized form. Thus, be have no other alternative than
265         # restoring the SIGCHLD handler to the default here.
266         import signal
267         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
268
269     def loop(self):
270         while not self._stop:
271             conn, addr = self._ctrl_sock.accept()
272             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
273             conn.settimeout(5)
274             while not self._stop:
275                 try:
276                     msg = self.recv_msg(conn)
277                 except socket.timeout, e:
278                     #self.log_error("SERVER recv_msg: connection timedout ")
279                     continue
280                 
281                 if not msg:
282                     self.log_error("CONNECTION LOST")
283                     break
284                     
285                 if msg == STOP_MSG:
286                     self._stop = True
287                     reply = self.stop_action()
288                 else:
289                     reply = self.reply_action(msg)
290                 
291                 try:
292                     self.send_reply(conn, reply)
293                 except socket.error:
294                     self.log_error()
295                     self.log_error("NOTICE: Awaiting for reconnection")
296                     break
297             try:
298                 conn.close()
299             except:
300                 # Doesn't matter
301                 self.log_error()
302
303     def recv_msg(self, conn):
304         data = [self._rdbuf]
305         chunk = data[0]
306         while '\n' not in chunk:
307             try:
308                 chunk = conn.recv(1024)
309             except (OSError, socket.error), e:
310                 if e[0] != errno.EINTR:
311                     raise
312                 else:
313                     continue
314             if chunk:
315                 data.append(chunk)
316             else:
317                 # empty chunk = EOF
318                 break
319         data = ''.join(data).split('\n',1)
320         while len(data) < 2:
321             data.append('')
322         data, self._rdbuf = data
323         
324         decoded = base64.b64decode(data)
325         return decoded.rstrip()
326
327     def send_reply(self, conn, reply):
328         encoded = base64.b64encode(reply)
329         conn.send("%s\n" % encoded)
330        
331     def cleanup(self):
332         try:
333             self._ctrl_sock.close()
334             os.remove(CTRL_SOCK)
335         except:
336             self.log_error()
337
338     def stop_action(self):
339         return "Stopping server"
340
341     def reply_action(self, msg):
342         return "Reply to: %s" % msg
343
344     def log_error(self, text = None, context = ''):
345         if text == None:
346             text = traceback.format_exc()
347         date = time.strftime("%Y-%m-%d %H:%M:%S")
348         if context:
349             context = " (%s)" % (context,)
350         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
351         return text
352
353     def log_debug(self, text):
354         if self._log_level == DC.DEBUG_LEVEL:
355             date = time.strftime("%Y-%m-%d %H:%M:%S")
356             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
357
358 class Forwarder(object):
359     def __init__(self, root_dir = "."):
360         self._ctrl_sock = None
361         self._root_dir = root_dir
362         self._stop = False
363         self._rdbuf = ""
364
365     def forward(self):
366         self.connect()
367         print >>sys.stderr, "FORWARDER_READY."
368         while not self._stop:
369             data = self.read_data()
370             if not data:
371                 # Connection to client lost
372                 break
373             self.send_to_server(data)
374             
375             data = self.recv_from_server()
376             if not data:
377                 # Connection to server lost
378                 raise IOError, "Connection to server lost while "\
379                     "expecting response"
380             self.write_data(data)
381         self.disconnect()
382
383     def read_data(self):
384         return sys.stdin.readline()
385
386     def write_data(self, data):
387         sys.stdout.write(data)
388         # sys.stdout.write is buffered, this is why we need to do a flush()
389         sys.stdout.flush()
390
391     def send_to_server(self, data):
392         try:
393             self._ctrl_sock.send(data)
394         except (IOError, socket.error), e:
395             if e[0] == errno.EPIPE:
396                 self.connect()
397                 self._ctrl_sock.send(data)
398             else:
399                 raise e
400         encoded = data.rstrip() 
401         msg = base64.b64decode(encoded)
402         if msg == STOP_MSG:
403             self._stop = True
404
405     def recv_from_server(self):
406         data = [self._rdbuf]
407         chunk = data[0]
408         while '\n' not in chunk:
409             try:
410                 chunk = self._ctrl_sock.recv(1024)
411             except (OSError, socket.error), e:
412                 if e[0] != errno.EINTR:
413                     raise
414                 continue
415             if chunk:
416                 data.append(chunk)
417             else:
418                 # empty chunk = EOF
419                 break
420         data = ''.join(data).split('\n',1)
421         while len(data) < 2:
422             data.append('')
423         data, self._rdbuf = data
424         
425         return data+'\n'
426  
427     def connect(self):
428         self.disconnect()
429         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
430         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
431         self._ctrl_sock.connect(sock_addr)
432
433     def disconnect(self):
434         try:
435             self._ctrl_sock.close()
436         except:
437             pass
438
439 class Client(object):
440     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
441             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
442             environment_setup = ""):
443         self.root_dir = root_dir
444         self.addr = (host, port)
445         self.user = user
446         self.agent = agent
447         self.sudo = sudo
448         self.communication = communication
449         self.environment_setup = environment_setup
450         self._stopped = False
451         self._deferreds = collections.deque()
452         self.connect()
453     
454     def __del__(self):
455         if self._process.poll() is None:
456             os.kill(self._process.pid, signal.SIGTERM)
457         self._process.wait()
458         
459     def connect(self):
460         root_dir = self.root_dir
461         (host, port) = self.addr
462         user = self.user
463         agent = self.agent
464         sudo = self.sudo
465         communication = self.communication
466         
467         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
468                 c.forward()" % (root_dir,)
469
470         self._process = popen_python(python_code, 
471                     communication = communication,
472                     host = host, 
473                     port = port, 
474                     user = user, 
475                     agent = agent, 
476                     sudo = sudo, 
477                     environment_setup = self.environment_setup)
478                
479         # Wait for the forwarder to be ready, otherwise nobody
480         # will be able to connect to it
481         err = []
482         helo = "nope"
483         while helo:
484             helo = self._process.stderr.readline()
485             if helo == 'FORWARDER_READY.\n':
486                 break
487             err.append(helo)
488         else:
489             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
490         
491     def send_msg(self, msg):
492         encoded = base64.b64encode(msg)
493         data = "%s\n" % encoded
494         
495         try:
496             self._process.stdin.write(data)
497         except (IOError, ValueError):
498             # dead process, poll it to un-zombify
499             self._process.poll()
500             
501             # try again after reconnect
502             # If it fails again, though, give up
503             self.connect()
504             self._process.stdin.write(data)
505
506     def send_stop(self):
507         self.send_msg(STOP_MSG)
508         self._stopped = True
509
510     def defer_reply(self, transform=None):
511         defer_entry = []
512         self._deferreds.append(defer_entry)
513         return defer.Defer(
514             functools.partial(self.read_reply, defer_entry, transform)
515         )
516         
517     def _read_reply(self):
518         data = self._process.stdout.readline()
519         encoded = data.rstrip() 
520         if not encoded:
521             # empty == eof == dead process, poll it to un-zombify
522             self._process.poll()
523             
524             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
525         return base64.b64decode(encoded)
526     
527     def read_reply(self, which=None, transform=None):
528         # Test to see if someone did it already
529         if which is not None and len(which):
530             # Ok, they did it...
531             # ...just return the deferred value
532             if transform:
533                 return transform(which[0])
534             else:
535                 return which[0]
536         
537         # Process all deferreds until the one we're looking for
538         # or until the queue is empty
539         while self._deferreds:
540             try:
541                 deferred = self._deferreds.popleft()
542             except IndexError:
543                 # emptied
544                 break
545             
546             deferred.append(self._read_reply())
547             if deferred is which:
548                 # We reached the one we were looking for
549                 if transform:
550                     return transform(deferred[0])
551                 else:
552                     return deferred[0]
553         
554         if which is None:
555             # They've requested a synchronous read
556             if transform:
557                 return transform(self._read_reply())
558             else:
559                 return self._read_reply()
560
561 def _make_server_key_args(server_key, host, port, args):
562     """ 
563     Returns a reference to the created temporary file, and adds the
564     corresponding arguments to the given argument list.
565     
566     Make sure to hold onto it until the process is done with the file
567     """
568     if port is not None:
569         host = '%s:%s' % (host,port)
570     # Create a temporary server key file
571     tmp_known_hosts = tempfile.NamedTemporaryFile()
572     
573     # Add the intended host key
574     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
575     
576     # If we're not in strict mode, add user-configured keys
577     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
578         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
579         if os.access(user_hosts_path, os.R_OK):
580             f = open(user_hosts_path, "r")
581             tmp_known_hosts.write(f.read())
582             f.close()
583         
584     tmp_known_hosts.flush()
585     
586     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
587     
588     return tmp_known_hosts
589
590 def make_connkey(user, host, port):
591     connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
592     if len(connkey) > 60:
593         connkey = hashlib.sha1(connkey).hexdigest()
594     return connkey
595
596 def popen_ssh_command(command, host, port, user, agent, 
597         stdin="", 
598         ident_key = None,
599         server_key = None,
600         tty = False,
601         timeout = None,
602         retry = 0,
603         err_on_timeout = True,
604         connect_timeout = 30,
605         persistent = True,
606         hostip = None):
607     """
608     Executes a remote commands, returns ((stdout,stderr),process)
609     """
610     if TRACE:
611         print "ssh", host, command
612     
613     tmp_known_hosts = None
614     connkey = make_connkey(user,host,port)
615     args = ['ssh', '-C',
616             # Don't bother with localhost. Makes test easier
617             '-o', 'NoHostAuthenticationForLocalhost=yes',
618             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
619             '-o', 'ConnectionAttempts=3',
620             '-o', 'ServerAliveInterval=30',
621             '-o', 'TCPKeepAlive=yes',
622             '-l', user, hostip or host]
623     if persistent and openssh_has_persist():
624         args.extend([
625             '-o', 'ControlMaster=auto',
626             '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
627             '-o', 'ControlPersist=60' ])
628     if agent:
629         args.append('-A')
630     if port:
631         args.append('-p%d' % port)
632     if ident_key:
633         args.extend(('-i', ident_key))
634     if tty:
635         args.append('-t')
636     if server_key:
637         # Create a temporary server key file
638         tmp_known_hosts = _make_server_key_args(
639             server_key, host, port, args)
640     args.append(command)
641
642     for x in xrange(retry or 3):
643         # connects to the remote host and starts a remote connection
644         proc = subprocess.Popen(args, 
645                 stdout = subprocess.PIPE,
646                 stdin = subprocess.PIPE, 
647                 stderr = subprocess.PIPE)
648         
649         # attach tempfile object to the process, to make sure the file stays
650         # alive until the process is finished with it
651         proc._known_hosts = tmp_known_hosts
652         
653         try:
654             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
655             if proc.poll():
656                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
657                     # SSH error, can safely retry
658                     continue
659                 elif retry:
660                     # Probably timed out or plain failed but can retry
661                     continue
662             break
663         except RuntimeError,e:
664             if retry <= 0:
665                 raise
666             if TRACE:
667                 print " timedout -> ", e.args
668             retry -= 1
669         
670     if TRACE:
671         print " -> ", out, err
672
673     return ((out, err), proc)
674
675 def popen_scp(source, dest, 
676         port = None, 
677         agent = None, 
678         recursive = False,
679         ident_key = None,
680         server_key = None):
681     """
682     Copies from/to remote sites.
683     
684     Source and destination should have the user and host encoded
685     as per scp specs.
686     
687     If source is a file object, a special mode will be used to
688     create the remote file with the same contents.
689     
690     If dest is a file object, the remote file (source) will be
691     read and written into dest.
692     
693     In these modes, recursive cannot be True.
694     
695     Source can be a list of files to copy to a single destination,
696     in which case it is advised that the destination be a folder.
697     """
698     
699     if TRACE:
700         print "scp", source, dest
701     
702     if isinstance(source, file) and source.tell() == 0:
703         source = source.name
704     elif hasattr(source, 'read'):
705         tmp = tempfile.NamedTemporaryFile()
706         while True:
707             buf = source.read(65536)
708             if buf:
709                 tmp.write(buf)
710             else:
711                 break
712         tmp.seek(0)
713         source = tmp.name
714     
715     if isinstance(source, file) or isinstance(dest, file) \
716             or hasattr(source, 'read')  or hasattr(dest, 'write'):
717         assert not recursive
718         
719         # Parse source/destination as <user>@<server>:<path>
720         if isinstance(dest, basestring) and ':' in dest:
721             remspec, path = dest.split(':',1)
722         elif isinstance(source, basestring) and ':' in source:
723             remspec, path = source.split(':',1)
724         else:
725             raise ValueError, "Both endpoints cannot be local"
726         user,host = remspec.rsplit('@',1)
727         tmp_known_hosts = None
728         
729         connkey = make_connkey(user,host,port)
730         args = ['ssh', '-l', user, '-C',
731                 # Don't bother with localhost. Makes test easier
732                 '-o', 'NoHostAuthenticationForLocalhost=yes',
733                 '-o', 'ConnectTimeout=30',
734                 '-o', 'ConnectionAttempts=3',
735                 '-o', 'ServerAliveInterval=30',
736                 '-o', 'TCPKeepAlive=yes',
737                 host ]
738         if openssh_has_persist():
739             args.extend([
740                 '-o', 'ControlMaster=auto',
741                 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
742                 '-o', 'ControlPersist=60' ])
743         if port:
744             args.append('-P%d' % port)
745         if ident_key:
746             args.extend(('-i', ident_key))
747         if server_key:
748             # Create a temporary server key file
749             tmp_known_hosts = _make_server_key_args(
750                 server_key, host, port, args)
751         
752         if isinstance(source, file) or hasattr(source, 'read'):
753             args.append('cat > %s' % (shell_escape(path),))
754         elif isinstance(dest, file) or hasattr(dest, 'write'):
755             args.append('cat %s' % (shell_escape(path),))
756         else:
757             raise AssertionError, "Unreachable code reached! :-Q"
758         
759         # connects to the remote host and starts a remote connection
760         if isinstance(source, file):
761             proc = subprocess.Popen(args, 
762                     stdout = open('/dev/null','w'),
763                     stderr = subprocess.PIPE,
764                     stdin = source)
765             err = proc.stderr.read()
766             proc._known_hosts = tmp_known_hosts
767             eintr_retry(proc.wait)()
768             return ((None,err), proc)
769         elif isinstance(dest, file):
770             proc = subprocess.Popen(args, 
771                     stdout = open('/dev/null','w'),
772                     stderr = subprocess.PIPE,
773                     stdin = source)
774             err = proc.stderr.read()
775             proc._known_hosts = tmp_known_hosts
776             eintr_retry(proc.wait)()
777             return ((None,err), proc)
778         elif hasattr(source, 'read'):
779             # file-like (but not file) source
780             proc = subprocess.Popen(args, 
781                     stdout = open('/dev/null','w'),
782                     stderr = subprocess.PIPE,
783                     stdin = subprocess.PIPE)
784             
785             buf = None
786             err = []
787             while True:
788                 if not buf:
789                     buf = source.read(4096)
790                 if not buf:
791                     #EOF
792                     break
793                 
794                 rdrdy, wrdy, broken = select.select(
795                     [proc.stderr],
796                     [proc.stdin],
797                     [proc.stderr,proc.stdin])
798                 
799                 if proc.stderr in rdrdy:
800                     # use os.read for fully unbuffered behavior
801                     err.append(os.read(proc.stderr.fileno(), 4096))
802                 
803                 if proc.stdin in wrdy:
804                     proc.stdin.write(buf)
805                     buf = None
806                 
807                 if broken:
808                     break
809             proc.stdin.close()
810             err.append(proc.stderr.read())
811                 
812             proc._known_hosts = tmp_known_hosts
813             eintr_retry(proc.wait)()
814             return ((None,''.join(err)), proc)
815         elif hasattr(dest, 'write'):
816             # file-like (but not file) dest
817             proc = subprocess.Popen(args, 
818                     stdout = subprocess.PIPE,
819                     stderr = subprocess.PIPE,
820                     stdin = open('/dev/null','w'))
821             
822             buf = None
823             err = []
824             while True:
825                 rdrdy, wrdy, broken = select.select(
826                     [proc.stderr, proc.stdout],
827                     [],
828                     [proc.stderr, proc.stdout])
829                 
830                 if proc.stderr in rdrdy:
831                     # use os.read for fully unbuffered behavior
832                     err.append(os.read(proc.stderr.fileno(), 4096))
833                 
834                 if proc.stdout in rdrdy:
835                     # use os.read for fully unbuffered behavior
836                     buf = os.read(proc.stdout.fileno(), 4096)
837                     dest.write(buf)
838                     
839                     if not buf:
840                         #EOF
841                         break
842                 
843                 if broken:
844                     break
845             err.append(proc.stderr.read())
846                 
847             proc._known_hosts = tmp_known_hosts
848             eintr_retry(proc.wait)()
849             return ((None,''.join(err)), proc)
850         else:
851             raise AssertionError, "Unreachable code reached! :-Q"
852     else:
853         # Parse destination as <user>@<server>:<path>
854         if isinstance(dest, basestring) and ':' in dest:
855             remspec, path = dest.split(':',1)
856         elif isinstance(source, basestring) and ':' in source:
857             remspec, path = source.split(':',1)
858         else:
859             raise ValueError, "Both endpoints cannot be local"
860         user,host = remspec.rsplit('@',1)
861         
862         # plain scp
863         tmp_known_hosts = None
864         args = ['scp', '-q', '-p', '-C',
865                 # Don't bother with localhost. Makes test easier
866                 '-o', 'NoHostAuthenticationForLocalhost=yes',
867                 '-o', 'ConnectTimeout=30',
868                 '-o', 'ConnectionAttempts=3',
869                 '-o', 'ServerAliveInterval=30',
870                 '-o', 'TCPKeepAlive=yes' ]
871                 
872         if port:
873             args.append('-P%d' % port)
874         if recursive:
875             args.append('-r')
876         if ident_key:
877             args.extend(('-i', ident_key))
878         if server_key:
879             # Create a temporary server key file
880             tmp_known_hosts = _make_server_key_args(
881                 server_key, host, port, args)
882         if isinstance(source,list):
883             args.extend(source)
884         else:
885             if openssh_has_persist():
886                 connkey = make_connkey(user,host,port)
887                 args.extend([
888                     '-o', 'ControlMaster=no',
889                     '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ) ])
890             args.append(source)
891         args.append(dest)
892
893         # connects to the remote host and starts a remote connection
894         proc = subprocess.Popen(args, 
895                 stdout = subprocess.PIPE,
896                 stdin = subprocess.PIPE, 
897                 stderr = subprocess.PIPE)
898         proc._known_hosts = tmp_known_hosts
899         
900         comm = proc.communicate()
901         eintr_retry(proc.wait)()
902         return (comm, proc)
903
904 def decode_and_execute():
905     # The python code we want to execute might have characters that 
906     # are not compatible with the 'inline' mode we are using. To avoid
907     # problems we receive the encoded python code in base64 as a input 
908     # stream and decode it for execution.
909     import base64, os
910     cmd = ""
911     while True:
912         try:
913             cmd += os.read(0, 1)# one byte from stdin
914         except OSError, e:            
915             if e.errno == errno.EINTR:
916                 continue
917             else:
918                 raise
919         if cmd[-1] == "\n": 
920             break
921     cmd = base64.b64decode(cmd)
922     # Uncomment for debug
923     #os.write(2, "Executing python code: %s\n" % cmd)
924     os.write(1, "OK\n") # send a sync message
925     exec(cmd)
926
927 def popen_python(python_code, 
928         communication = DC.ACCESS_LOCAL,
929         host = None, 
930         port = None, 
931         user = None, 
932         agent = False, 
933         python_path = None,
934         ident_key = None,
935         server_key = None,
936         tty = False,
937         sudo = False, 
938         environment_setup = ""):
939
940     cmd = ""
941     if python_path:
942         python_path.replace("'", r"'\''")
943         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
944         cmd += " ; "
945     if environment_setup:
946         cmd += environment_setup
947         cmd += " ; "
948     # Uncomment for debug (to run everything under strace)
949     # We had to verify if strace works (cannot nest them)
950     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
951     #cmd += "$CMD "
952     #cmd += "strace -f -tt -s 200 -o strace$$.out "
953     import nepi
954     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
955         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
956     )
957
958     if sudo:
959         if ';' in cmd:
960             cmd = "sudo bash -c " + shell_escape(cmd)
961         else:
962             cmd = "sudo " + cmd
963
964     if communication == DC.ACCESS_SSH:
965         tmp_known_hosts = None
966         args = ['ssh', '-C',
967                 # Don't bother with localhost. Makes test easier
968                 '-o', 'NoHostAuthenticationForLocalhost=yes',
969                 '-o', 'ConnectionAttempts=3',
970                 '-o', 'ServerAliveInterval=30',
971                 '-o', 'TCPKeepAlive=yes',
972                 '-l', user, host]
973         if agent:
974             args.append('-A')
975         if port:
976             args.append('-p%d' % port)
977         if ident_key:
978             args.extend(('-i', ident_key))
979         if tty:
980             args.append('-t')
981         if server_key:
982             # Create a temporary server key file
983             tmp_known_hosts = _make_server_key_args(
984                 server_key, host, port, args)
985         args.append(cmd)
986     else:
987         args = [ "/bin/bash", "-c", cmd ]
988
989     # connects to the remote host and starts a remote
990     proc = subprocess.Popen(args,
991             shell = False, 
992             stdout = subprocess.PIPE,
993             stdin = subprocess.PIPE, 
994             stderr = subprocess.PIPE)
995
996     if communication == DC.ACCESS_SSH:
997         proc._known_hosts = tmp_known_hosts
998
999     # send the command to execute
1000     os.write(proc.stdin.fileno(),
1001             base64.b64encode(python_code) + "\n")
1002  
1003     while True: 
1004         try:
1005             msg = os.read(proc.stdout.fileno(), 3)
1006             break
1007         except OSError, e:            
1008             if e.errno == errno.EINTR:
1009                 continue
1010             else:
1011                 raise
1012     
1013     if msg != "OK\n":
1014         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1015             msg, proc.stdout.read(), proc.stderr.read())
1016
1017     return proc
1018
1019 # POSIX
1020 def _communicate(self, input, timeout=None, err_on_timeout=True):
1021     read_set = []
1022     write_set = []
1023     stdout = None # Return
1024     stderr = None # Return
1025     
1026     killed = False
1027     
1028     if timeout is not None:
1029         timelimit = time.time() + timeout
1030         killtime = timelimit + 4
1031         bailtime = timelimit + 4
1032
1033     if self.stdin:
1034         # Flush stdio buffer.  This might block, if the user has
1035         # been writing to .stdin in an uncontrolled fashion.
1036         self.stdin.flush()
1037         if input:
1038             write_set.append(self.stdin)
1039         else:
1040             self.stdin.close()
1041     if self.stdout:
1042         read_set.append(self.stdout)
1043         stdout = []
1044     if self.stderr:
1045         read_set.append(self.stderr)
1046         stderr = []
1047
1048     input_offset = 0
1049     while read_set or write_set:
1050         if timeout is not None:
1051             curtime = time.time()
1052             if timeout is None or curtime > timelimit:
1053                 if curtime > bailtime:
1054                     break
1055                 elif curtime > killtime:
1056                     signum = signal.SIGKILL
1057                 else:
1058                     signum = signal.SIGTERM
1059                 # Lets kill it
1060                 os.kill(self.pid, signum)
1061                 select_timeout = 0.5
1062             else:
1063                 select_timeout = timelimit - curtime + 0.1
1064         else:
1065             select_timeout = 1.0
1066         
1067         if select_timeout > 1.0:
1068             select_timeout = 1.0
1069             
1070         try:
1071             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1072         except select.error,e:
1073             if e[0] != 4:
1074                 raise
1075             else:
1076                 continue
1077         
1078         if not rlist and not wlist and not xlist and self.poll() is not None:
1079             # timeout and process exited, say bye
1080             break
1081
1082         if self.stdin in wlist:
1083             # When select has indicated that the file is writable,
1084             # we can write up to PIPE_BUF bytes without risk
1085             # blocking.  POSIX defines PIPE_BUF >= 512
1086             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1087             input_offset += bytes_written
1088             if input_offset >= len(input):
1089                 self.stdin.close()
1090                 write_set.remove(self.stdin)
1091
1092         if self.stdout in rlist:
1093             data = os.read(self.stdout.fileno(), 1024)
1094             if data == "":
1095                 self.stdout.close()
1096                 read_set.remove(self.stdout)
1097             stdout.append(data)
1098
1099         if self.stderr in rlist:
1100             data = os.read(self.stderr.fileno(), 1024)
1101             if data == "":
1102                 self.stderr.close()
1103                 read_set.remove(self.stderr)
1104             stderr.append(data)
1105     
1106     # All data exchanged.  Translate lists into strings.
1107     if stdout is not None:
1108         stdout = ''.join(stdout)
1109     if stderr is not None:
1110         stderr = ''.join(stderr)
1111
1112     # Translate newlines, if requested.  We cannot let the file
1113     # object do the translation: It is based on stdio, which is
1114     # impossible to combine with select (unless forcing no
1115     # buffering).
1116     if self.universal_newlines and hasattr(file, 'newlines'):
1117         if stdout:
1118             stdout = self._translate_newlines(stdout)
1119         if stderr:
1120             stderr = self._translate_newlines(stderr)
1121
1122     if killed and err_on_timeout:
1123         errcode = self.poll()
1124         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1125     else:
1126         if killed:
1127             self.poll()
1128         else:
1129             self.wait()
1130         return (stdout, stderr)
1131