PlanetLab support toon-up: home_cleanup only nepi folders + make server support longe...
[nepi.git] / src / nepi / util / server.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.util.constants import DeploymentConfiguration as DC
4
5 import base64
6 import errno
7 import os
8 import os.path
9 import resource
10 import select
11 import shutil
12 import signal
13 import socket
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import re
20 import tempfile
21 import defer
22 import functools
23 import collections
24 import hashlib
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 OPENSSH_HAS_PERSIST = None
36
37 if hasattr(os, "devnull"):
38     DEV_NULL = os.devnull
39 else:
40     DEV_NULL = "/dev/null"
41
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43
44 hostbyname_cache = dict()
45
46 def openssh_has_persist():
47     global OPENSSH_HAS_PERSIST
48     if OPENSSH_HAS_PERSIST is None:
49         proc = subprocess.Popen(["ssh","-v"],
50             stdout = subprocess.PIPE,
51             stderr = subprocess.STDOUT,
52             stdin = open("/dev/null","r") )
53         out,err = proc.communicate()
54         proc.wait()
55         
56         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
57         OPENSSH_HAS_PERSIST = bool(vre.match(out))
58     return OPENSSH_HAS_PERSIST
59
60 def shell_escape(s):
61     """ Escapes strings so that they are safe to use as command-line arguments """
62     if SHELL_SAFE.match(s):
63         # safe string - no escaping needed
64         return s
65     else:
66         # unsafe string - escape
67         def escp(c):
68             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
69                 return c
70             else:
71                 return "'$'\\x%02x''" % (ord(c),)
72         s = ''.join(map(escp,s))
73         return "'%s'" % (s,)
74
75 def eintr_retry(func):
76     import functools
77     @functools.wraps(func)
78     def rv(*p, **kw):
79         retry = kw.pop("_retry", False)
80         for i in xrange(0 if retry else 4):
81             try:
82                 return func(*p, **kw)
83             except (select.error, socket.error), args:
84                 if args[0] == errno.EINTR:
85                     continue
86                 else:
87                     raise 
88             except OSError, e:
89                 if e.errno == errno.EINTR:
90                     continue
91                 else:
92                     raise
93         else:
94             return func(*p, **kw)
95     return rv
96
97 class Server(object):
98     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
99             environment_setup = "", clean_root = False):
100         self._root_dir = root_dir
101         self._clean_root = clean_root
102         self._stop = False
103         self._ctrl_sock = None
104         self._log_level = log_level
105         self._rdbuf = ""
106         self._environment_setup = environment_setup
107
108     def run(self):
109         try:
110             if self.daemonize():
111                 self.post_daemonize()
112                 self.loop()
113                 self.cleanup()
114                 # ref: "os._exit(0)"
115                 # can not return normally after fork beacuse no exec was done.
116                 # This means that if we don't do a os._exit(0) here the code that 
117                 # follows the call to "Server.run()" in the "caller code" will be 
118                 # executed... but by now it has already been executed after the 
119                 # first process (the one that did the first fork) returned.
120                 os._exit(0)
121         except:
122             print >>sys.stderr, "SERVER_ERROR."
123             self.log_error()
124             self.cleanup()
125             os._exit(0)
126         print >>sys.stderr, "SERVER_READY."
127
128     def daemonize(self):
129         # pipes for process synchronization
130         (r, w) = os.pipe()
131         
132         # build root folder
133         root = os.path.normpath(self._root_dir)
134         if self._root_dir not in [".", ""] and os.path.exists(root) \
135                 and self._clean_root:
136             shutil.rmtree(root)
137         if not os.path.exists(root):
138             os.makedirs(root, 0755)
139
140         pid1 = os.fork()
141         if pid1 > 0:
142             os.close(w)
143             while True:
144                 try:
145                     os.read(r, 1)
146                 except OSError, e: # pragma: no cover
147                     if e.errno == errno.EINTR:
148                         continue
149                     else:
150                         raise
151                 break
152             os.close(r)
153             # os.waitpid avoids leaving a <defunc> (zombie) process
154             st = os.waitpid(pid1, 0)[1]
155             if st:
156                 raise RuntimeError("Daemonization failed")
157             # return 0 to inform the caller method that this is not the 
158             # daemonized process
159             return 0
160         os.close(r)
161
162         # Decouple from parent environment.
163         os.chdir(self._root_dir)
164         os.umask(0)
165         os.setsid()
166
167         # fork 2
168         pid2 = os.fork()
169         if pid2 > 0:
170             # see ref: "os._exit(0)"
171             os._exit(0)
172
173         # close all open file descriptors.
174         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
175         if (max_fd == resource.RLIM_INFINITY):
176             max_fd = MAX_FD
177         for fd in range(3, max_fd):
178             if fd != w:
179                 try:
180                     os.close(fd)
181                 except OSError:
182                     pass
183
184         # Redirect standard file descriptors.
185         stdin = open(DEV_NULL, "r")
186         stderr = stdout = open(STD_ERR, "a", 0)
187         os.dup2(stdin.fileno(), sys.stdin.fileno())
188         # NOTE: sys.stdout.write will still be buffered, even if the file
189         # was opened with 0 buffer
190         os.dup2(stdout.fileno(), sys.stdout.fileno())
191         os.dup2(stderr.fileno(), sys.stderr.fileno())
192         
193         # setup environment
194         if self._environment_setup:
195             # parse environment variables and pass to child process
196             # do it by executing shell commands, in case there's some heavy setup involved
197             envproc = subprocess.Popen(
198                 [ "bash", "-c", 
199                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
200                         ( self._environment_setup, ) ],
201                 stdin = subprocess.PIPE, 
202                 stdout = subprocess.PIPE,
203                 stderr = subprocess.PIPE
204             )
205             out,err = envproc.communicate()
206
207             # parse new environment
208             if out:
209                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
210             
211                 # apply to current environment
212                 for name, value in environment.iteritems():
213                     os.environ[name] = value
214                 
215                 # apply pythonpath
216                 if 'PYTHONPATH' in environment:
217                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
218
219         # create control socket
220         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
221         try:
222             self._ctrl_sock.bind(CTRL_SOCK)
223         except socket.error:
224             # Address in use, check pidfile
225             pid = None
226             try:
227                 pidfile = open(CTRL_PID, "r")
228                 pid = pidfile.read()
229                 pidfile.close()
230                 pid = int(pid)
231             except:
232                 # no pidfile
233                 pass
234             
235             if pid is not None:
236                 # Check process liveliness
237                 if not os.path.exists("/proc/%d" % (pid,)):
238                     # Ok, it's dead, clean the socket
239                     os.remove(CTRL_SOCK)
240             
241             # try again
242             self._ctrl_sock.bind(CTRL_SOCK)
243             
244         self._ctrl_sock.listen(0)
245         
246         # Save pidfile
247         pidfile = open(CTRL_PID, "w")
248         pidfile.write(str(os.getpid()))
249         pidfile.close()
250
251         # let the parent process know that the daemonization is finished
252         os.write(w, "\n")
253         os.close(w)
254         return 1
255
256     def post_daemonize(self):
257         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
258         # QT, for some strange reason, redefines the SIGCHILD handler to write
259         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
260         # Server dameonization closes all file descriptors from fileno '3',
261         # but the overloaded handler (inherited by the forked process) will
262         # keep trying to write the \0 to fileno 'x', which might have been reused 
263         # after closing, for other operations. This is bad bad bad when fileno 'x'
264         # is in use for communication pouroses, because unexpected \0 start
265         # appearing in the communication messages... this is exactly what happens 
266         # when using netns in daemonized form. Thus, be have no other alternative than
267         # restoring the SIGCHLD handler to the default here.
268         import signal
269         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
270
271     def loop(self):
272         while not self._stop:
273             conn, addr = self._ctrl_sock.accept()
274             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
275             conn.settimeout(5)
276             while not self._stop:
277                 try:
278                     msg = self.recv_msg(conn)
279                 except socket.timeout, e:
280                     #self.log_error("SERVER recv_msg: connection timedout ")
281                     continue
282                 
283                 if not msg:
284                     self.log_error("CONNECTION LOST")
285                     break
286                     
287                 if msg == STOP_MSG:
288                     self._stop = True
289                     reply = self.stop_action()
290                 else:
291                     reply = self.reply_action(msg)
292                 
293                 try:
294                     self.send_reply(conn, reply)
295                 except socket.error:
296                     self.log_error()
297                     self.log_error("NOTICE: Awaiting for reconnection")
298                     break
299             try:
300                 conn.close()
301             except:
302                 # Doesn't matter
303                 self.log_error()
304
305     def recv_msg(self, conn):
306         data = [self._rdbuf]
307         chunk = data[0]
308         while '\n' not in chunk:
309             try:
310                 chunk = conn.recv(1024)
311             except (OSError, socket.error), e:
312                 if e[0] != errno.EINTR:
313                     raise
314                 else:
315                     continue
316             if chunk:
317                 data.append(chunk)
318             else:
319                 # empty chunk = EOF
320                 break
321         data = ''.join(data).split('\n',1)
322         while len(data) < 2:
323             data.append('')
324         data, self._rdbuf = data
325         
326         decoded = base64.b64decode(data)
327         return decoded.rstrip()
328
329     def send_reply(self, conn, reply):
330         encoded = base64.b64encode(reply)
331         conn.send("%s\n" % encoded)
332        
333     def cleanup(self):
334         try:
335             self._ctrl_sock.close()
336             os.remove(CTRL_SOCK)
337         except:
338             self.log_error()
339
340     def stop_action(self):
341         return "Stopping server"
342
343     def reply_action(self, msg):
344         return "Reply to: %s" % msg
345
346     def log_error(self, text = None, context = ''):
347         if text == None:
348             text = traceback.format_exc()
349         date = time.strftime("%Y-%m-%d %H:%M:%S")
350         if context:
351             context = " (%s)" % (context,)
352         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
353         return text
354
355     def log_debug(self, text):
356         if self._log_level == DC.DEBUG_LEVEL:
357             date = time.strftime("%Y-%m-%d %H:%M:%S")
358             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
359
360 class Forwarder(object):
361     def __init__(self, root_dir = "."):
362         self._ctrl_sock = None
363         self._root_dir = root_dir
364         self._stop = False
365         self._rdbuf = ""
366
367     def forward(self):
368         self.connect()
369         print >>sys.stderr, "FORWARDER_READY."
370         while not self._stop:
371             data = self.read_data()
372             if not data:
373                 # Connection to client lost
374                 break
375             self.send_to_server(data)
376             
377             data = self.recv_from_server()
378             if not data:
379                 # Connection to server lost
380                 raise IOError, "Connection to server lost while "\
381                     "expecting response"
382             self.write_data(data)
383         self.disconnect()
384
385     def read_data(self):
386         return sys.stdin.readline()
387
388     def write_data(self, data):
389         sys.stdout.write(data)
390         # sys.stdout.write is buffered, this is why we need to do a flush()
391         sys.stdout.flush()
392
393     def send_to_server(self, data):
394         try:
395             self._ctrl_sock.send(data)
396         except (IOError, socket.error), e:
397             if e[0] == errno.EPIPE:
398                 self.connect()
399                 self._ctrl_sock.send(data)
400             else:
401                 raise e
402         encoded = data.rstrip() 
403         msg = base64.b64decode(encoded)
404         if msg == STOP_MSG:
405             self._stop = True
406
407     def recv_from_server(self):
408         data = [self._rdbuf]
409         chunk = data[0]
410         while '\n' not in chunk:
411             try:
412                 chunk = self._ctrl_sock.recv(1024)
413             except (OSError, socket.error), e:
414                 if e[0] != errno.EINTR:
415                     raise
416                 continue
417             if chunk:
418                 data.append(chunk)
419             else:
420                 # empty chunk = EOF
421                 break
422         data = ''.join(data).split('\n',1)
423         while len(data) < 2:
424             data.append('')
425         data, self._rdbuf = data
426         
427         return data+'\n'
428  
429     def connect(self):
430         self.disconnect()
431         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
432         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
433         self._ctrl_sock.connect(sock_addr)
434
435     def disconnect(self):
436         try:
437             self._ctrl_sock.close()
438         except:
439             pass
440
441 class Client(object):
442     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
443             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
444             environment_setup = ""):
445         self.root_dir = root_dir
446         self.addr = (host, port)
447         self.user = user
448         self.agent = agent
449         self.sudo = sudo
450         self.communication = communication
451         self.environment_setup = environment_setup
452         self._stopped = False
453         self._deferreds = collections.deque()
454         self.connect()
455     
456     def __del__(self):
457         if self._process.poll() is None:
458             os.kill(self._process.pid, signal.SIGTERM)
459         self._process.wait()
460         
461     def connect(self):
462         root_dir = self.root_dir
463         (host, port) = self.addr
464         user = self.user
465         agent = self.agent
466         sudo = self.sudo
467         communication = self.communication
468         
469         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
470                 c.forward()" % (root_dir,)
471
472         self._process = popen_python(python_code, 
473                     communication = communication,
474                     host = host, 
475                     port = port, 
476                     user = user, 
477                     agent = agent, 
478                     sudo = sudo, 
479                     environment_setup = self.environment_setup)
480                
481         # Wait for the forwarder to be ready, otherwise nobody
482         # will be able to connect to it
483         err = []
484         helo = "nope"
485         while helo:
486             helo = self._process.stderr.readline()
487             if helo == 'FORWARDER_READY.\n':
488                 break
489             err.append(helo)
490         else:
491             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
492         
493     def send_msg(self, msg):
494         encoded = base64.b64encode(msg)
495         data = "%s\n" % encoded
496         
497         try:
498             self._process.stdin.write(data)
499         except (IOError, ValueError):
500             # dead process, poll it to un-zombify
501             self._process.poll()
502             
503             # try again after reconnect
504             # If it fails again, though, give up
505             self.connect()
506             self._process.stdin.write(data)
507
508     def send_stop(self):
509         self.send_msg(STOP_MSG)
510         self._stopped = True
511
512     def defer_reply(self, transform=None):
513         defer_entry = []
514         self._deferreds.append(defer_entry)
515         return defer.Defer(
516             functools.partial(self.read_reply, defer_entry, transform)
517         )
518         
519     def _read_reply(self):
520         data = self._process.stdout.readline()
521         encoded = data.rstrip() 
522         if not encoded:
523             # empty == eof == dead process, poll it to un-zombify
524             self._process.poll()
525             
526             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
527         return base64.b64decode(encoded)
528     
529     def read_reply(self, which=None, transform=None):
530         # Test to see if someone did it already
531         if which is not None and len(which):
532             # Ok, they did it...
533             # ...just return the deferred value
534             if transform:
535                 return transform(which[0])
536             else:
537                 return which[0]
538         
539         # Process all deferreds until the one we're looking for
540         # or until the queue is empty
541         while self._deferreds:
542             try:
543                 deferred = self._deferreds.popleft()
544             except IndexError:
545                 # emptied
546                 break
547             
548             deferred.append(self._read_reply())
549             if deferred is which:
550                 # We reached the one we were looking for
551                 if transform:
552                     return transform(deferred[0])
553                 else:
554                     return deferred[0]
555         
556         if which is None:
557             # They've requested a synchronous read
558             if transform:
559                 return transform(self._read_reply())
560             else:
561                 return self._read_reply()
562
563 def _make_server_key_args(server_key, host, port, args):
564     """ 
565     Returns a reference to the created temporary file, and adds the
566     corresponding arguments to the given argument list.
567     
568     Make sure to hold onto it until the process is done with the file
569     """
570     if port is not None:
571         host = '%s:%s' % (host,port)
572     # Create a temporary server key file
573     tmp_known_hosts = tempfile.NamedTemporaryFile()
574    
575     hostbyname = hostbyname_cache.get(host)
576     if not hostbyname:
577         hostbyname = socket.gethostbyname(host)
578         hostbyname_cache[host] = hostbyname
579
580     # Add the intended host key
581     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
582     
583     # If we're not in strict mode, add user-configured keys
584     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
585         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
586         if os.access(user_hosts_path, os.R_OK):
587             f = open(user_hosts_path, "r")
588             tmp_known_hosts.write(f.read())
589             f.close()
590         
591     tmp_known_hosts.flush()
592     
593     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
594     
595     return tmp_known_hosts
596
597 def make_connkey(user, host, port):
598     connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
599     if len(connkey) > 60:
600         connkey = hashlib.sha1(connkey).hexdigest()
601     return connkey
602
603 def popen_ssh_command(command, host, port, user, agent, 
604         stdin="", 
605         ident_key = None,
606         server_key = None,
607         tty = False,
608         timeout = None,
609         retry = 0,
610         err_on_timeout = True,
611         connect_timeout = 1200,
612         persistent = True,
613         hostip = None):
614     """
615     Executes a remote commands, returns ((stdout,stderr),process)
616     """
617     if TRACE:
618         print "ssh", host, command
619     
620     tmp_known_hosts = None
621     connkey = make_connkey(user,host,port)
622     args = ['ssh', '-C',
623             # Don't bother with localhost. Makes test easier
624             '-o', 'NoHostAuthenticationForLocalhost=yes',
625             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
626             '-o', 'ConnectionAttempts=3',
627             '-o', 'ServerAliveInterval=30',
628             '-o', 'TCPKeepAlive=yes',
629             '-l', user, hostip or host]
630     if persistent and openssh_has_persist():
631         args.extend([
632             '-o', 'ControlMaster=auto',
633             '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
634             '-o', 'ControlPersist=60' ])
635     if agent:
636         args.append('-A')
637     if port:
638         args.append('-p%d' % port)
639     if ident_key:
640         args.extend(('-i', ident_key))
641     if tty:
642         args.append('-t')
643     if server_key:
644         # Create a temporary server key file
645         tmp_known_hosts = _make_server_key_args(
646             server_key, host, port, args)
647     args.append(command)
648
649     for x in xrange(retry or 3):
650         # connects to the remote host and starts a remote connection
651         proc = subprocess.Popen(args, 
652                 stdout = subprocess.PIPE,
653                 stdin = subprocess.PIPE, 
654                 stderr = subprocess.PIPE)
655         
656         # attach tempfile object to the process, to make sure the file stays
657         # alive until the process is finished with it
658         proc._known_hosts = tmp_known_hosts
659         
660         try:
661             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
662             if proc.poll():
663                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
664                     # SSH error, can safely retry
665                     continue
666                 elif retry:
667                     # Probably timed out or plain failed but can retry
668                     continue
669             break
670         except RuntimeError,e:
671             if retry <= 0:
672                 raise
673             if TRACE:
674                 print " timedout -> ", e.args
675             retry -= 1
676         
677     if TRACE:
678         print " -> ", out, err
679
680     return ((out, err), proc)
681
682 def popen_scp(source, dest, 
683         port = None, 
684         agent = None, 
685         recursive = False,
686         ident_key = None,
687         server_key = None):
688     """
689     Copies from/to remote sites.
690     
691     Source and destination should have the user and host encoded
692     as per scp specs.
693     
694     If source is a file object, a special mode will be used to
695     create the remote file with the same contents.
696     
697     If dest is a file object, the remote file (source) will be
698     read and written into dest.
699     
700     In these modes, recursive cannot be True.
701     
702     Source can be a list of files to copy to a single destination,
703     in which case it is advised that the destination be a folder.
704     """
705     
706     if TRACE:
707         print "scp", source, dest
708     
709     if isinstance(source, file) and source.tell() == 0:
710         source = source.name
711     elif hasattr(source, 'read'):
712         tmp = tempfile.NamedTemporaryFile()
713         while True:
714             buf = source.read(65536)
715             if buf:
716                 tmp.write(buf)
717             else:
718                 break
719         tmp.seek(0)
720         source = tmp.name
721     
722     if isinstance(source, file) or isinstance(dest, file) \
723             or hasattr(source, 'read')  or hasattr(dest, 'write'):
724         assert not recursive
725         
726         # Parse source/destination as <user>@<server>:<path>
727         if isinstance(dest, basestring) and ':' in dest:
728             remspec, path = dest.split(':',1)
729         elif isinstance(source, basestring) and ':' in source:
730             remspec, path = source.split(':',1)
731         else:
732             raise ValueError, "Both endpoints cannot be local"
733         user,host = remspec.rsplit('@',1)
734         tmp_known_hosts = None
735         
736         connkey = make_connkey(user,host,port)
737         args = ['ssh', '-l', user, '-C',
738                 # Don't bother with localhost. Makes test easier
739                 '-o', 'NoHostAuthenticationForLocalhost=yes',
740                 '-o', 'ConnectTimeout=1200',
741                 '-o', 'ConnectionAttempts=3',
742                 '-o', 'ServerAliveInterval=30',
743                 '-o', 'TCPKeepAlive=yes',
744                 host ]
745         if openssh_has_persist():
746             args.extend([
747                 '-o', 'ControlMaster=auto',
748                 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
749                 '-o', 'ControlPersist=60' ])
750         if port:
751             args.append('-P%d' % port)
752         if ident_key:
753             args.extend(('-i', ident_key))
754         if server_key:
755             # Create a temporary server key file
756             tmp_known_hosts = _make_server_key_args(
757                 server_key, host, port, args)
758         
759         if isinstance(source, file) or hasattr(source, 'read'):
760             args.append('cat > %s' % (shell_escape(path),))
761         elif isinstance(dest, file) or hasattr(dest, 'write'):
762             args.append('cat %s' % (shell_escape(path),))
763         else:
764             raise AssertionError, "Unreachable code reached! :-Q"
765         
766         # connects to the remote host and starts a remote connection
767         if isinstance(source, file):
768             proc = subprocess.Popen(args, 
769                     stdout = open('/dev/null','w'),
770                     stderr = subprocess.PIPE,
771                     stdin = source)
772             err = proc.stderr.read()
773             proc._known_hosts = tmp_known_hosts
774             eintr_retry(proc.wait)()
775             return ((None,err), proc)
776         elif isinstance(dest, file):
777             proc = subprocess.Popen(args, 
778                     stdout = open('/dev/null','w'),
779                     stderr = subprocess.PIPE,
780                     stdin = source)
781             err = proc.stderr.read()
782             proc._known_hosts = tmp_known_hosts
783             eintr_retry(proc.wait)()
784             return ((None,err), proc)
785         elif hasattr(source, 'read'):
786             # file-like (but not file) source
787             proc = subprocess.Popen(args, 
788                     stdout = open('/dev/null','w'),
789                     stderr = subprocess.PIPE,
790                     stdin = subprocess.PIPE)
791             
792             buf = None
793             err = []
794             while True:
795                 if not buf:
796                     buf = source.read(4096)
797                 if not buf:
798                     #EOF
799                     break
800                 
801                 rdrdy, wrdy, broken = select.select(
802                     [proc.stderr],
803                     [proc.stdin],
804                     [proc.stderr,proc.stdin])
805                 
806                 if proc.stderr in rdrdy:
807                     # use os.read for fully unbuffered behavior
808                     err.append(os.read(proc.stderr.fileno(), 4096))
809                 
810                 if proc.stdin in wrdy:
811                     proc.stdin.write(buf)
812                     buf = None
813                 
814                 if broken:
815                     break
816             proc.stdin.close()
817             err.append(proc.stderr.read())
818                 
819             proc._known_hosts = tmp_known_hosts
820             eintr_retry(proc.wait)()
821             return ((None,''.join(err)), proc)
822         elif hasattr(dest, 'write'):
823             # file-like (but not file) dest
824             proc = subprocess.Popen(args, 
825                     stdout = subprocess.PIPE,
826                     stderr = subprocess.PIPE,
827                     stdin = open('/dev/null','w'))
828             
829             buf = None
830             err = []
831             while True:
832                 rdrdy, wrdy, broken = select.select(
833                     [proc.stderr, proc.stdout],
834                     [],
835                     [proc.stderr, proc.stdout])
836                 
837                 if proc.stderr in rdrdy:
838                     # use os.read for fully unbuffered behavior
839                     err.append(os.read(proc.stderr.fileno(), 4096))
840                 
841                 if proc.stdout in rdrdy:
842                     # use os.read for fully unbuffered behavior
843                     buf = os.read(proc.stdout.fileno(), 4096)
844                     dest.write(buf)
845                     
846                     if not buf:
847                         #EOF
848                         break
849                 
850                 if broken:
851                     break
852             err.append(proc.stderr.read())
853                 
854             proc._known_hosts = tmp_known_hosts
855             eintr_retry(proc.wait)()
856             return ((None,''.join(err)), proc)
857         else:
858             raise AssertionError, "Unreachable code reached! :-Q"
859     else:
860         # Parse destination as <user>@<server>:<path>
861         if isinstance(dest, basestring) and ':' in dest:
862             remspec, path = dest.split(':',1)
863         elif isinstance(source, basestring) and ':' in source:
864             remspec, path = source.split(':',1)
865         else:
866             raise ValueError, "Both endpoints cannot be local"
867         user,host = remspec.rsplit('@',1)
868         
869         # plain scp
870         tmp_known_hosts = None
871         args = ['scp', '-q', '-p', '-C',
872                 # Don't bother with localhost. Makes test easier
873                 '-o', 'NoHostAuthenticationForLocalhost=yes',
874                 '-o', 'ConnectTimeout=1200',
875                 '-o', 'ConnectionAttempts=3',
876                 '-o', 'ServerAliveInterval=30',
877                 '-o', 'TCPKeepAlive=yes' ]
878                 
879         if port:
880             args.append('-P%d' % port)
881         if recursive:
882             args.append('-r')
883         if ident_key:
884             args.extend(('-i', ident_key))
885         if server_key:
886             # Create a temporary server key file
887             tmp_known_hosts = _make_server_key_args(
888                 server_key, host, port, args)
889         if isinstance(source,list):
890             args.extend(source)
891         else:
892             if openssh_has_persist():
893                 connkey = make_connkey(user,host,port)
894                 args.extend([
895                     '-o', 'ControlMaster=no',
896                     '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ) ])
897             args.append(source)
898         args.append(dest)
899
900         # connects to the remote host and starts a remote connection
901         proc = subprocess.Popen(args, 
902                 stdout = subprocess.PIPE,
903                 stdin = subprocess.PIPE, 
904                 stderr = subprocess.PIPE)
905         proc._known_hosts = tmp_known_hosts
906         
907         comm = proc.communicate()
908         eintr_retry(proc.wait)()
909         return (comm, proc)
910
911 def decode_and_execute():
912     # The python code we want to execute might have characters that 
913     # are not compatible with the 'inline' mode we are using. To avoid
914     # problems we receive the encoded python code in base64 as a input 
915     # stream and decode it for execution.
916     import base64, os
917     cmd = ""
918     while True:
919         try:
920             cmd += os.read(0, 1)# one byte from stdin
921         except OSError, e:            
922             if e.errno == errno.EINTR:
923                 continue
924             else:
925                 raise
926         if cmd[-1] == "\n": 
927             break
928     cmd = base64.b64decode(cmd)
929     # Uncomment for debug
930     #os.write(2, "Executing python code: %s\n" % cmd)
931     os.write(1, "OK\n") # send a sync message
932     exec(cmd)
933
934 def popen_python(python_code, 
935         communication = DC.ACCESS_LOCAL,
936         host = None, 
937         port = None, 
938         user = None, 
939         agent = False, 
940         python_path = None,
941         ident_key = None,
942         server_key = None,
943         tty = False,
944         sudo = False, 
945         environment_setup = ""):
946
947     cmd = ""
948     if python_path:
949         python_path.replace("'", r"'\''")
950         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
951         cmd += " ; "
952     if environment_setup:
953         cmd += environment_setup
954         cmd += " ; "
955     # Uncomment for debug (to run everything under strace)
956     # We had to verify if strace works (cannot nest them)
957     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
958     #cmd += "$CMD "
959     #cmd += "strace -f -tt -s 200 -o strace$$.out "
960     import nepi
961     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
962         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
963     )
964
965     if sudo:
966         if ';' in cmd:
967             cmd = "sudo bash -c " + shell_escape(cmd)
968         else:
969             cmd = "sudo " + cmd
970
971     if communication == DC.ACCESS_SSH:
972         tmp_known_hosts = None
973         args = ['ssh', '-C',
974                 # Don't bother with localhost. Makes test easier
975                 '-o', 'NoHostAuthenticationForLocalhost=yes',
976                 '-o', 'ConnectionAttempts=3',
977                 '-o', 'ServerAliveInterval=30',
978                 '-o', 'TCPKeepAlive=yes',
979                 '-l', user, host]
980         if agent:
981             args.append('-A')
982         if port:
983             args.append('-p%d' % port)
984         if ident_key:
985             args.extend(('-i', ident_key))
986         if tty:
987             args.append('-t')
988         if server_key:
989             # Create a temporary server key file
990             tmp_known_hosts = _make_server_key_args(
991                 server_key, host, port, args)
992         args.append(cmd)
993     else:
994         args = [ "/bin/bash", "-c", cmd ]
995
996     # connects to the remote host and starts a remote
997     proc = subprocess.Popen(args,
998             shell = False, 
999             stdout = subprocess.PIPE,
1000             stdin = subprocess.PIPE, 
1001             stderr = subprocess.PIPE)
1002
1003     if communication == DC.ACCESS_SSH:
1004         proc._known_hosts = tmp_known_hosts
1005
1006     # send the command to execute
1007     os.write(proc.stdin.fileno(),
1008             base64.b64encode(python_code) + "\n")
1009  
1010     while True: 
1011         try:
1012             msg = os.read(proc.stdout.fileno(), 3)
1013             break
1014         except OSError, e:            
1015             if e.errno == errno.EINTR:
1016                 continue
1017             else:
1018                 raise
1019     
1020     if msg != "OK\n":
1021         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1022             msg, proc.stdout.read(), proc.stderr.read())
1023
1024     return proc
1025
1026 # POSIX
1027 def _communicate(self, input, timeout=None, err_on_timeout=True):
1028     read_set = []
1029     write_set = []
1030     stdout = None # Return
1031     stderr = None # Return
1032     
1033     killed = False
1034     
1035     if timeout is not None:
1036         timelimit = time.time() + timeout
1037         killtime = timelimit + 4
1038         bailtime = timelimit + 4
1039
1040     if self.stdin:
1041         # Flush stdio buffer.  This might block, if the user has
1042         # been writing to .stdin in an uncontrolled fashion.
1043         self.stdin.flush()
1044         if input:
1045             write_set.append(self.stdin)
1046         else:
1047             self.stdin.close()
1048     if self.stdout:
1049         read_set.append(self.stdout)
1050         stdout = []
1051     if self.stderr:
1052         read_set.append(self.stderr)
1053         stderr = []
1054
1055     input_offset = 0
1056     while read_set or write_set:
1057         if timeout is not None:
1058             curtime = time.time()
1059             if timeout is None or curtime > timelimit:
1060                 if curtime > bailtime:
1061                     break
1062                 elif curtime > killtime:
1063                     signum = signal.SIGKILL
1064                 else:
1065                     signum = signal.SIGTERM
1066                 # Lets kill it
1067                 os.kill(self.pid, signum)
1068                 select_timeout = 0.5
1069             else:
1070                 select_timeout = timelimit - curtime + 0.1
1071         else:
1072             select_timeout = 1.0
1073         
1074         if select_timeout > 1.0:
1075             select_timeout = 1.0
1076             
1077         try:
1078             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1079         except select.error,e:
1080             if e[0] != 4:
1081                 raise
1082             else:
1083                 continue
1084         
1085         if not rlist and not wlist and not xlist and self.poll() is not None:
1086             # timeout and process exited, say bye
1087             break
1088
1089         if self.stdin in wlist:
1090             # When select has indicated that the file is writable,
1091             # we can write up to PIPE_BUF bytes without risk
1092             # blocking.  POSIX defines PIPE_BUF >= 512
1093             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1094             input_offset += bytes_written
1095             if input_offset >= len(input):
1096                 self.stdin.close()
1097                 write_set.remove(self.stdin)
1098
1099         if self.stdout in rlist:
1100             data = os.read(self.stdout.fileno(), 1024)
1101             if data == "":
1102                 self.stdout.close()
1103                 read_set.remove(self.stdout)
1104             stdout.append(data)
1105
1106         if self.stderr in rlist:
1107             data = os.read(self.stderr.fileno(), 1024)
1108             if data == "":
1109                 self.stderr.close()
1110                 read_set.remove(self.stderr)
1111             stderr.append(data)
1112     
1113     # All data exchanged.  Translate lists into strings.
1114     if stdout is not None:
1115         stdout = ''.join(stdout)
1116     if stderr is not None:
1117         stderr = ''.join(stderr)
1118
1119     # Translate newlines, if requested.  We cannot let the file
1120     # object do the translation: It is based on stdio, which is
1121     # impossible to combine with select (unless forcing no
1122     # buffering).
1123     if self.universal_newlines and hasattr(file, 'newlines'):
1124         if stdout:
1125             stdout = self._translate_newlines(stdout)
1126         if stderr:
1127             stderr = self._translate_newlines(stderr)
1128
1129     if killed and err_on_timeout:
1130         errcode = self.poll()
1131         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1132     else:
1133         if killed:
1134             self.poll()
1135         else:
1136             self.wait()
1137         return (stdout, stderr)
1138