PlanetLab optimizations. Use server cache to avoid querying socket.gethostbyname...
[nepi.git] / src / nepi / util / server.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.util.constants import DeploymentConfiguration as DC
4
5 import base64
6 import errno
7 import os
8 import os.path
9 import resource
10 import select
11 import shutil
12 import signal
13 import socket
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import re
20 import tempfile
21 import defer
22 import functools
23 import collections
24 import hashlib
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 OPENSSH_HAS_PERSIST = None
36
37 if hasattr(os, "devnull"):
38     DEV_NULL = os.devnull
39 else:
40     DEV_NULL = "/dev/null"
41
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43
44 hostbyname_cache = dict()
45
46 def gethostbyname(host):
47     hostbyname = hostbyname_cache.get(host)
48     if not hostbyname:
49         hostbyname = socket.gethostbyname(host)
50         hostbyname_cache[host] = hostbyname
51     return hostbyname
52
53 def openssh_has_persist():
54     global OPENSSH_HAS_PERSIST
55     if OPENSSH_HAS_PERSIST is None:
56         proc = subprocess.Popen(["ssh","-v"],
57             stdout = subprocess.PIPE,
58             stderr = subprocess.STDOUT,
59             stdin = open("/dev/null","r") )
60         out,err = proc.communicate()
61         proc.wait()
62         
63         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
64         OPENSSH_HAS_PERSIST = bool(vre.match(out))
65     return OPENSSH_HAS_PERSIST
66
67 def shell_escape(s):
68     """ Escapes strings so that they are safe to use as command-line arguments """
69     if SHELL_SAFE.match(s):
70         # safe string - no escaping needed
71         return s
72     else:
73         # unsafe string - escape
74         def escp(c):
75             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
76                 return c
77             else:
78                 return "'$'\\x%02x''" % (ord(c),)
79         s = ''.join(map(escp,s))
80         return "'%s'" % (s,)
81
82 def eintr_retry(func):
83     import functools
84     @functools.wraps(func)
85     def rv(*p, **kw):
86         retry = kw.pop("_retry", False)
87         for i in xrange(0 if retry else 4):
88             try:
89                 return func(*p, **kw)
90             except (select.error, socket.error), args:
91                 if args[0] == errno.EINTR:
92                     continue
93                 else:
94                     raise 
95             except OSError, e:
96                 if e.errno == errno.EINTR:
97                     continue
98                 else:
99                     raise
100         else:
101             return func(*p, **kw)
102     return rv
103
104 class Server(object):
105     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
106             environment_setup = "", clean_root = False):
107         self._root_dir = root_dir
108         self._clean_root = clean_root
109         self._stop = False
110         self._ctrl_sock = None
111         self._log_level = log_level
112         self._rdbuf = ""
113         self._environment_setup = environment_setup
114
115     def run(self):
116         try:
117             if self.daemonize():
118                 self.post_daemonize()
119                 self.loop()
120                 self.cleanup()
121                 # ref: "os._exit(0)"
122                 # can not return normally after fork beacuse no exec was done.
123                 # This means that if we don't do a os._exit(0) here the code that 
124                 # follows the call to "Server.run()" in the "caller code" will be 
125                 # executed... but by now it has already been executed after the 
126                 # first process (the one that did the first fork) returned.
127                 os._exit(0)
128         except:
129             print >>sys.stderr, "SERVER_ERROR."
130             self.log_error()
131             self.cleanup()
132             os._exit(0)
133         print >>sys.stderr, "SERVER_READY."
134
135     def daemonize(self):
136         # pipes for process synchronization
137         (r, w) = os.pipe()
138         
139         # build root folder
140         root = os.path.normpath(self._root_dir)
141         if self._root_dir not in [".", ""] and os.path.exists(root) \
142                 and self._clean_root:
143             shutil.rmtree(root)
144         if not os.path.exists(root):
145             os.makedirs(root, 0755)
146
147         pid1 = os.fork()
148         if pid1 > 0:
149             os.close(w)
150             while True:
151                 try:
152                     os.read(r, 1)
153                 except OSError, e: # pragma: no cover
154                     if e.errno == errno.EINTR:
155                         continue
156                     else:
157                         raise
158                 break
159             os.close(r)
160             # os.waitpid avoids leaving a <defunc> (zombie) process
161             st = os.waitpid(pid1, 0)[1]
162             if st:
163                 raise RuntimeError("Daemonization failed")
164             # return 0 to inform the caller method that this is not the 
165             # daemonized process
166             return 0
167         os.close(r)
168
169         # Decouple from parent environment.
170         os.chdir(self._root_dir)
171         os.umask(0)
172         os.setsid()
173
174         # fork 2
175         pid2 = os.fork()
176         if pid2 > 0:
177             # see ref: "os._exit(0)"
178             os._exit(0)
179
180         # close all open file descriptors.
181         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182         if (max_fd == resource.RLIM_INFINITY):
183             max_fd = MAX_FD
184         for fd in range(3, max_fd):
185             if fd != w:
186                 try:
187                     os.close(fd)
188                 except OSError:
189                     pass
190
191         # Redirect standard file descriptors.
192         stdin = open(DEV_NULL, "r")
193         stderr = stdout = open(STD_ERR, "a", 0)
194         os.dup2(stdin.fileno(), sys.stdin.fileno())
195         # NOTE: sys.stdout.write will still be buffered, even if the file
196         # was opened with 0 buffer
197         os.dup2(stdout.fileno(), sys.stdout.fileno())
198         os.dup2(stderr.fileno(), sys.stderr.fileno())
199         
200         # setup environment
201         if self._environment_setup:
202             # parse environment variables and pass to child process
203             # do it by executing shell commands, in case there's some heavy setup involved
204             envproc = subprocess.Popen(
205                 [ "bash", "-c", 
206                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207                         ( self._environment_setup, ) ],
208                 stdin = subprocess.PIPE, 
209                 stdout = subprocess.PIPE,
210                 stderr = subprocess.PIPE
211             )
212             out,err = envproc.communicate()
213
214             # parse new environment
215             if out:
216                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
217             
218                 # apply to current environment
219                 for name, value in environment.iteritems():
220                     os.environ[name] = value
221                 
222                 # apply pythonpath
223                 if 'PYTHONPATH' in environment:
224                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
225
226         # create control socket
227         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
228         try:
229             self._ctrl_sock.bind(CTRL_SOCK)
230         except socket.error:
231             # Address in use, check pidfile
232             pid = None
233             try:
234                 pidfile = open(CTRL_PID, "r")
235                 pid = pidfile.read()
236                 pidfile.close()
237                 pid = int(pid)
238             except:
239                 # no pidfile
240                 pass
241             
242             if pid is not None:
243                 # Check process liveliness
244                 if not os.path.exists("/proc/%d" % (pid,)):
245                     # Ok, it's dead, clean the socket
246                     os.remove(CTRL_SOCK)
247             
248             # try again
249             self._ctrl_sock.bind(CTRL_SOCK)
250             
251         self._ctrl_sock.listen(0)
252         
253         # Save pidfile
254         pidfile = open(CTRL_PID, "w")
255         pidfile.write(str(os.getpid()))
256         pidfile.close()
257
258         # let the parent process know that the daemonization is finished
259         os.write(w, "\n")
260         os.close(w)
261         return 1
262
263     def post_daemonize(self):
264         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265         # QT, for some strange reason, redefines the SIGCHILD handler to write
266         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
267         # Server dameonization closes all file descriptors from fileno '3',
268         # but the overloaded handler (inherited by the forked process) will
269         # keep trying to write the \0 to fileno 'x', which might have been reused 
270         # after closing, for other operations. This is bad bad bad when fileno 'x'
271         # is in use for communication pouroses, because unexpected \0 start
272         # appearing in the communication messages... this is exactly what happens 
273         # when using netns in daemonized form. Thus, be have no other alternative than
274         # restoring the SIGCHLD handler to the default here.
275         import signal
276         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
277
278     def loop(self):
279         while not self._stop:
280             conn, addr = self._ctrl_sock.accept()
281             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
282             conn.settimeout(5)
283             while not self._stop:
284                 try:
285                     msg = self.recv_msg(conn)
286                 except socket.timeout, e:
287                     #self.log_error("SERVER recv_msg: connection timedout ")
288                     continue
289                 
290                 if not msg:
291                     self.log_error("CONNECTION LOST")
292                     break
293                     
294                 if msg == STOP_MSG:
295                     self._stop = True
296                     reply = self.stop_action()
297                 else:
298                     reply = self.reply_action(msg)
299                 
300                 try:
301                     self.send_reply(conn, reply)
302                 except socket.error:
303                     self.log_error()
304                     self.log_error("NOTICE: Awaiting for reconnection")
305                     break
306             try:
307                 conn.close()
308             except:
309                 # Doesn't matter
310                 self.log_error()
311
312     def recv_msg(self, conn):
313         data = [self._rdbuf]
314         chunk = data[0]
315         while '\n' not in chunk:
316             try:
317                 chunk = conn.recv(1024)
318             except (OSError, socket.error), e:
319                 if e[0] != errno.EINTR:
320                     raise
321                 else:
322                     continue
323             if chunk:
324                 data.append(chunk)
325             else:
326                 # empty chunk = EOF
327                 break
328         data = ''.join(data).split('\n',1)
329         while len(data) < 2:
330             data.append('')
331         data, self._rdbuf = data
332         
333         decoded = base64.b64decode(data)
334         return decoded.rstrip()
335
336     def send_reply(self, conn, reply):
337         encoded = base64.b64encode(reply)
338         conn.send("%s\n" % encoded)
339        
340     def cleanup(self):
341         try:
342             self._ctrl_sock.close()
343             os.remove(CTRL_SOCK)
344         except:
345             self.log_error()
346
347     def stop_action(self):
348         return "Stopping server"
349
350     def reply_action(self, msg):
351         return "Reply to: %s" % msg
352
353     def log_error(self, text = None, context = ''):
354         if text == None:
355             text = traceback.format_exc()
356         date = time.strftime("%Y-%m-%d %H:%M:%S")
357         if context:
358             context = " (%s)" % (context,)
359         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
360         return text
361
362     def log_debug(self, text):
363         if self._log_level == DC.DEBUG_LEVEL:
364             date = time.strftime("%Y-%m-%d %H:%M:%S")
365             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
366
367 class Forwarder(object):
368     def __init__(self, root_dir = "."):
369         self._ctrl_sock = None
370         self._root_dir = root_dir
371         self._stop = False
372         self._rdbuf = ""
373
374     def forward(self):
375         self.connect()
376         print >>sys.stderr, "FORWARDER_READY."
377         while not self._stop:
378             data = self.read_data()
379             if not data:
380                 # Connection to client lost
381                 break
382             self.send_to_server(data)
383             
384             data = self.recv_from_server()
385             if not data:
386                 # Connection to server lost
387                 raise IOError, "Connection to server lost while "\
388                     "expecting response"
389             self.write_data(data)
390         self.disconnect()
391
392     def read_data(self):
393         return sys.stdin.readline()
394
395     def write_data(self, data):
396         sys.stdout.write(data)
397         # sys.stdout.write is buffered, this is why we need to do a flush()
398         sys.stdout.flush()
399
400     def send_to_server(self, data):
401         try:
402             self._ctrl_sock.send(data)
403         except (IOError, socket.error), e:
404             if e[0] == errno.EPIPE:
405                 self.connect()
406                 self._ctrl_sock.send(data)
407             else:
408                 raise e
409         encoded = data.rstrip() 
410         msg = base64.b64decode(encoded)
411         if msg == STOP_MSG:
412             self._stop = True
413
414     def recv_from_server(self):
415         data = [self._rdbuf]
416         chunk = data[0]
417         while '\n' not in chunk:
418             try:
419                 chunk = self._ctrl_sock.recv(1024)
420             except (OSError, socket.error), e:
421                 if e[0] != errno.EINTR:
422                     raise
423                 continue
424             if chunk:
425                 data.append(chunk)
426             else:
427                 # empty chunk = EOF
428                 break
429         data = ''.join(data).split('\n',1)
430         while len(data) < 2:
431             data.append('')
432         data, self._rdbuf = data
433         
434         return data+'\n'
435  
436     def connect(self):
437         self.disconnect()
438         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440         self._ctrl_sock.connect(sock_addr)
441
442     def disconnect(self):
443         try:
444             self._ctrl_sock.close()
445         except:
446             pass
447
448 class Client(object):
449     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
450             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451             environment_setup = ""):
452         self.root_dir = root_dir
453         self.addr = (host, port)
454         self.user = user
455         self.agent = agent
456         self.sudo = sudo
457         self.communication = communication
458         self.environment_setup = environment_setup
459         self._stopped = False
460         self._deferreds = collections.deque()
461         self.connect()
462     
463     def __del__(self):
464         if self._process.poll() is None:
465             os.kill(self._process.pid, signal.SIGTERM)
466         self._process.wait()
467         
468     def connect(self):
469         root_dir = self.root_dir
470         (host, port) = self.addr
471         user = self.user
472         agent = self.agent
473         sudo = self.sudo
474         communication = self.communication
475         
476         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477                 c.forward()" % (root_dir,)
478
479         self._process = popen_python(python_code, 
480                     communication = communication,
481                     host = host, 
482                     port = port, 
483                     user = user, 
484                     agent = agent, 
485                     sudo = sudo, 
486                     environment_setup = self.environment_setup)
487                
488         # Wait for the forwarder to be ready, otherwise nobody
489         # will be able to connect to it
490         err = []
491         helo = "nope"
492         while helo:
493             helo = self._process.stderr.readline()
494             if helo == 'FORWARDER_READY.\n':
495                 break
496             err.append(helo)
497         else:
498             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
499         
500     def send_msg(self, msg):
501         encoded = base64.b64encode(msg)
502         data = "%s\n" % encoded
503         
504         try:
505             self._process.stdin.write(data)
506         except (IOError, ValueError):
507             # dead process, poll it to un-zombify
508             self._process.poll()
509             
510             # try again after reconnect
511             # If it fails again, though, give up
512             self.connect()
513             self._process.stdin.write(data)
514
515     def send_stop(self):
516         self.send_msg(STOP_MSG)
517         self._stopped = True
518
519     def defer_reply(self, transform=None):
520         defer_entry = []
521         self._deferreds.append(defer_entry)
522         return defer.Defer(
523             functools.partial(self.read_reply, defer_entry, transform)
524         )
525         
526     def _read_reply(self):
527         data = self._process.stdout.readline()
528         encoded = data.rstrip() 
529         if not encoded:
530             # empty == eof == dead process, poll it to un-zombify
531             self._process.poll()
532             
533             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534         return base64.b64decode(encoded)
535     
536     def read_reply(self, which=None, transform=None):
537         # Test to see if someone did it already
538         if which is not None and len(which):
539             # Ok, they did it...
540             # ...just return the deferred value
541             if transform:
542                 return transform(which[0])
543             else:
544                 return which[0]
545         
546         # Process all deferreds until the one we're looking for
547         # or until the queue is empty
548         while self._deferreds:
549             try:
550                 deferred = self._deferreds.popleft()
551             except IndexError:
552                 # emptied
553                 break
554             
555             deferred.append(self._read_reply())
556             if deferred is which:
557                 # We reached the one we were looking for
558                 if transform:
559                     return transform(deferred[0])
560                 else:
561                     return deferred[0]
562         
563         if which is None:
564             # They've requested a synchronous read
565             if transform:
566                 return transform(self._read_reply())
567             else:
568                 return self._read_reply()
569
570 def _make_server_key_args(server_key, host, port, args):
571     """ 
572     Returns a reference to the created temporary file, and adds the
573     corresponding arguments to the given argument list.
574     
575     Make sure to hold onto it until the process is done with the file
576     """
577     if port is not None:
578         host = '%s:%s' % (host,port)
579     # Create a temporary server key file
580     tmp_known_hosts = tempfile.NamedTemporaryFile()
581    
582     hostbyname = gethostbyname(host) 
583
584     # Add the intended host key
585     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
586     
587     # If we're not in strict mode, add user-configured keys
588     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590         if os.access(user_hosts_path, os.R_OK):
591             f = open(user_hosts_path, "r")
592             tmp_known_hosts.write(f.read())
593             f.close()
594         
595     tmp_known_hosts.flush()
596     
597     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
598     
599     return tmp_known_hosts
600
601 def popen_ssh_command(command, host, port, user, agent, 
602         stdin="", 
603         ident_key = None,
604         server_key = None,
605         tty = False,
606         timeout = None,
607         retry = 0,
608         err_on_timeout = True,
609         connect_timeout = 900,
610         persistent = True,
611         hostip = None):
612     """
613     Executes a remote commands, returns ((stdout,stderr),process)
614     """
615     if TRACE:
616         print "ssh", host, command
617     
618     tmp_known_hosts = None
619     args = ['ssh', '-C',
620             # Don't bother with localhost. Makes test easier
621             '-o', 'NoHostAuthenticationForLocalhost=yes',
622             # XXX: Security vulnerability
623             #'-o', 'StrictHostKeyChecking=no',
624             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
625             '-o', 'ConnectionAttempts=3',
626             '-o', 'ServerAliveInterval=30',
627             '-o', 'TCPKeepAlive=yes',
628             '-l', user, hostip or host]
629     if persistent and openssh_has_persist():
630         args.extend([
631             '-o', 'ControlMaster=auto',
632             '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
633             '-o', 'ControlPersist=60' ])
634     if agent:
635         args.append('-A')
636     if port:
637         args.append('-p%d' % port)
638     if ident_key:
639         args.extend(('-i', ident_key))
640     if tty:
641         args.append('-t')
642     if server_key:
643         # Create a temporary server key file
644         tmp_known_hosts = _make_server_key_args(
645             server_key, host, port, args)
646     args.append(command)
647
648
649     for x in xrange(retry or 3):
650         # connects to the remote host and starts a remote connection
651         proc = subprocess.Popen(args, 
652                 stdout = subprocess.PIPE,
653                 stdin = subprocess.PIPE, 
654                 stderr = subprocess.PIPE)
655         
656         # attach tempfile object to the process, to make sure the file stays
657         # alive until the process is finished with it
658         proc._known_hosts = tmp_known_hosts
659     
660         try:
661             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
662             if proc.poll():
663                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
664                     # SSH error, can safely retry
665                     continue
666                 elif retry:
667                     # Probably timed out or plain failed but can retry
668                     continue
669             break
670         except RuntimeError,e:
671             if retry <= 0:
672                 raise
673             if TRACE:
674                 print " timedout -> ", e.args
675             retry -= 1
676         
677     if TRACE:
678         print " -> ", out, err
679
680     return ((out, err), proc)
681
682 def popen_scp(source, dest, 
683         port = None, 
684         agent = None, 
685         recursive = False,
686         ident_key = None,
687         server_key = None):
688     """
689     Copies from/to remote sites.
690     
691     Source and destination should have the user and host encoded
692     as per scp specs.
693     
694     If source is a file object, a special mode will be used to
695     create the remote file with the same contents.
696     
697     If dest is a file object, the remote file (source) will be
698     read and written into dest.
699     
700     In these modes, recursive cannot be True.
701     
702     Source can be a list of files to copy to a single destination,
703     in which case it is advised that the destination be a folder.
704     """
705     
706     if TRACE:
707         print "scp", source, dest
708     
709     if isinstance(source, file) and source.tell() == 0:
710         source = source.name
711     elif hasattr(source, 'read'):
712         tmp = tempfile.NamedTemporaryFile()
713         while True:
714             buf = source.read(65536)
715             if buf:
716                 tmp.write(buf)
717             else:
718                 break
719         tmp.seek(0)
720         source = tmp.name
721     
722     if isinstance(source, file) or isinstance(dest, file) \
723             or hasattr(source, 'read')  or hasattr(dest, 'write'):
724         assert not recursive
725         
726         # Parse source/destination as <user>@<server>:<path>
727         if isinstance(dest, basestring) and ':' in dest:
728             remspec, path = dest.split(':',1)
729         elif isinstance(source, basestring) and ':' in source:
730             remspec, path = source.split(':',1)
731         else:
732             raise ValueError, "Both endpoints cannot be local"
733         user,host = remspec.rsplit('@',1)
734         tmp_known_hosts = None
735         
736         args = ['ssh', '-l', user, '-C',
737                 # Don't bother with localhost. Makes test easier
738                 '-o', 'NoHostAuthenticationForLocalhost=yes',
739                 # XXX: Security vulnerability
740                 #'-o', 'StrictHostKeyChecking=no',
741                 '-o', 'ConnectTimeout=900',
742                 '-o', 'ConnectionAttempts=3',
743                 '-o', 'ServerAliveInterval=30',
744                 '-o', 'TCPKeepAlive=yes',
745                 host ]
746         if openssh_has_persist():
747             args.extend([
748                 '-o', 'ControlMaster=auto',
749                 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
750                 '-o', 'ControlPersist=60' ])
751         if port:
752             args.append('-P%d' % port)
753         if ident_key:
754             args.extend(('-i', ident_key))
755         if server_key:
756             # Create a temporary server key file
757             tmp_known_hosts = _make_server_key_args(
758                 server_key, host, port, args)
759         
760         if isinstance(source, file) or hasattr(source, 'read'):
761             args.append('cat > %s' % (shell_escape(path),))
762         elif isinstance(dest, file) or hasattr(dest, 'write'):
763             args.append('cat %s' % (shell_escape(path),))
764         else:
765             raise AssertionError, "Unreachable code reached! :-Q"
766         
767         # connects to the remote host and starts a remote connection
768         if isinstance(source, file):
769             proc = subprocess.Popen(args, 
770                     stdout = open('/dev/null','w'),
771                     stderr = subprocess.PIPE,
772                     stdin = source)
773             err = proc.stderr.read()
774             proc._known_hosts = tmp_known_hosts
775             eintr_retry(proc.wait)()
776             return ((None,err), proc)
777         elif isinstance(dest, file):
778             proc = subprocess.Popen(args, 
779                     stdout = open('/dev/null','w'),
780                     stderr = subprocess.PIPE,
781                     stdin = source)
782             err = proc.stderr.read()
783             proc._known_hosts = tmp_known_hosts
784             eintr_retry(proc.wait)()
785             return ((None,err), proc)
786         elif hasattr(source, 'read'):
787             # file-like (but not file) source
788             proc = subprocess.Popen(args, 
789                     stdout = open('/dev/null','w'),
790                     stderr = subprocess.PIPE,
791                     stdin = subprocess.PIPE)
792             
793             buf = None
794             err = []
795             while True:
796                 if not buf:
797                     buf = source.read(4096)
798                 if not buf:
799                     #EOF
800                     break
801                 
802                 rdrdy, wrdy, broken = select.select(
803                     [proc.stderr],
804                     [proc.stdin],
805                     [proc.stderr,proc.stdin])
806                 
807                 if proc.stderr in rdrdy:
808                     # use os.read for fully unbuffered behavior
809                     err.append(os.read(proc.stderr.fileno(), 4096))
810                 
811                 if proc.stdin in wrdy:
812                     proc.stdin.write(buf)
813                     buf = None
814                 
815                 if broken:
816                     break
817             proc.stdin.close()
818             err.append(proc.stderr.read())
819                 
820             proc._known_hosts = tmp_known_hosts
821             eintr_retry(proc.wait)()
822             return ((None,''.join(err)), proc)
823         elif hasattr(dest, 'write'):
824             # file-like (but not file) dest
825             proc = subprocess.Popen(args, 
826                     stdout = subprocess.PIPE,
827                     stderr = subprocess.PIPE,
828                     stdin = open('/dev/null','w'))
829             
830             buf = None
831             err = []
832             while True:
833                 rdrdy, wrdy, broken = select.select(
834                     [proc.stderr, proc.stdout],
835                     [],
836                     [proc.stderr, proc.stdout])
837                 
838                 if proc.stderr in rdrdy:
839                     # use os.read for fully unbuffered behavior
840                     err.append(os.read(proc.stderr.fileno(), 4096))
841                 
842                 if proc.stdout in rdrdy:
843                     # use os.read for fully unbuffered behavior
844                     buf = os.read(proc.stdout.fileno(), 4096)
845                     dest.write(buf)
846                     
847                     if not buf:
848                         #EOF
849                         break
850                 
851                 if broken:
852                     break
853             err.append(proc.stderr.read())
854                 
855             proc._known_hosts = tmp_known_hosts
856             eintr_retry(proc.wait)()
857             return ((None,''.join(err)), proc)
858         else:
859             raise AssertionError, "Unreachable code reached! :-Q"
860     else:
861         # Parse destination as <user>@<server>:<path>
862         if isinstance(dest, basestring) and ':' in dest:
863             remspec, path = dest.split(':',1)
864         elif isinstance(source, basestring) and ':' in source:
865             remspec, path = source.split(':',1)
866         else:
867             raise ValueError, "Both endpoints cannot be local"
868         user,host = remspec.rsplit('@',1)
869         
870         # plain scp
871         tmp_known_hosts = None
872         args = ['scp', '-q', '-p', '-C',
873                 # Don't bother with localhost. Makes test easier
874                 '-o', 'NoHostAuthenticationForLocalhost=yes',
875                 # XXX: Security vulnerability
876                 #'-o', 'StrictHostKeyChecking=no',
877                 '-o', 'ConnectTimeout=900',
878                 '-o', 'ConnectionAttempts=3',
879                 '-o', 'ServerAliveInterval=30',
880                 '-o', 'TCPKeepAlive=yes' ]
881                 
882         if port:
883             args.append('-P%d' % port)
884         if recursive:
885             args.append('-r')
886         if ident_key:
887             args.extend(('-i', ident_key))
888         if server_key:
889             # Create a temporary server key file
890             tmp_known_hosts = _make_server_key_args(
891                 server_key, host, port, args)
892         if isinstance(source,list):
893             args.extend(source)
894         else:
895             if openssh_has_persist():
896                 args.extend([
897                     '-o', 'ControlMaster=auto',
898                     '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
899             args.append(source)
900         args.append(dest)
901
902         # connects to the remote host and starts a remote connection
903         proc = subprocess.Popen(args, 
904                 stdout = subprocess.PIPE,
905                 stdin = subprocess.PIPE, 
906                 stderr = subprocess.PIPE)
907         proc._known_hosts = tmp_known_hosts
908         
909         comm = proc.communicate()
910         eintr_retry(proc.wait)()
911         return (comm, proc)
912
913 def decode_and_execute():
914     # The python code we want to execute might have characters that 
915     # are not compatible with the 'inline' mode we are using. To avoid
916     # problems we receive the encoded python code in base64 as a input 
917     # stream and decode it for execution.
918     import base64, os
919     cmd = ""
920     while True:
921         try:
922             cmd += os.read(0, 1)# one byte from stdin
923         except OSError, e:            
924             if e.errno == errno.EINTR:
925                 continue
926             else:
927                 raise
928         if cmd[-1] == "\n": 
929             break
930     cmd = base64.b64decode(cmd)
931     # Uncomment for debug
932     #os.write(2, "Executing python code: %s\n" % cmd)
933     os.write(1, "OK\n") # send a sync message
934     exec(cmd)
935
936 def popen_python(python_code, 
937         communication = DC.ACCESS_LOCAL,
938         host = None, 
939         port = None, 
940         user = None, 
941         agent = False, 
942         python_path = None,
943         ident_key = None,
944         server_key = None,
945         tty = False,
946         sudo = False, 
947         environment_setup = ""):
948
949     cmd = ""
950     if python_path:
951         python_path.replace("'", r"'\''")
952         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
953         cmd += " ; "
954     if environment_setup:
955         cmd += environment_setup
956         cmd += " ; "
957     # Uncomment for debug (to run everything under strace)
958     # We had to verify if strace works (cannot nest them)
959     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
960     #cmd += "$CMD "
961     #cmd += "strace -f -tt -s 200 -o strace$$.out "
962     import nepi
963     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
964         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
965     )
966
967     if sudo:
968         if ';' in cmd:
969             cmd = "sudo bash -c " + shell_escape(cmd)
970         else:
971             cmd = "sudo " + cmd
972
973     if communication == DC.ACCESS_SSH:
974         tmp_known_hosts = None
975         args = ['ssh', '-C',
976                 # Don't bother with localhost. Makes test easier
977                 '-o', 'NoHostAuthenticationForLocalhost=yes',
978                 # XXX: Security vulnerability
979                 #'-o', 'StrictHostKeyChecking=no',
980                 '-o', 'ConnectionAttempts=3',
981                 '-o', 'ServerAliveInterval=30',
982                 '-o', 'TCPKeepAlive=yes',
983                 '-l', user, host]
984         if agent:
985             args.append('-A')
986         if port:
987             args.append('-p%d' % port)
988         if ident_key:
989             args.extend(('-i', ident_key))
990         if tty:
991             args.append('-t')
992         if server_key:
993             # Create a temporary server key file
994             tmp_known_hosts = _make_server_key_args(
995                 server_key, host, port, args)
996         args.append(cmd)
997     else:
998         args = [ "/bin/bash", "-c", cmd ]
999
1000     # connects to the remote host and starts a remote
1001     proc = subprocess.Popen(args,
1002             shell = False, 
1003             stdout = subprocess.PIPE,
1004             stdin = subprocess.PIPE, 
1005             stderr = subprocess.PIPE)
1006
1007     if communication == DC.ACCESS_SSH:
1008         proc._known_hosts = tmp_known_hosts
1009
1010     # send the command to execute
1011     os.write(proc.stdin.fileno(),
1012             base64.b64encode(python_code) + "\n")
1013  
1014     while True: 
1015         try:
1016             msg = os.read(proc.stdout.fileno(), 3)
1017             break
1018         except OSError, e:            
1019             if e.errno == errno.EINTR:
1020                 continue
1021             else:
1022                 raise
1023     
1024     if msg != "OK\n":
1025         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1026             msg, proc.stdout.read(), proc.stderr.read())
1027
1028     return proc
1029
1030 # POSIX
1031 def _communicate(self, input, timeout=None, err_on_timeout=True):
1032     read_set = []
1033     write_set = []
1034     stdout = None # Return
1035     stderr = None # Return
1036     
1037     killed = False
1038     
1039     if timeout is not None:
1040         timelimit = time.time() + timeout
1041         killtime = timelimit + 4
1042         bailtime = timelimit + 4
1043
1044     if self.stdin:
1045         # Flush stdio buffer.  This might block, if the user has
1046         # been writing to .stdin in an uncontrolled fashion.
1047         self.stdin.flush()
1048         if input:
1049             write_set.append(self.stdin)
1050         else:
1051             self.stdin.close()
1052     if self.stdout:
1053         read_set.append(self.stdout)
1054         stdout = []
1055     if self.stderr:
1056         read_set.append(self.stderr)
1057         stderr = []
1058
1059     input_offset = 0
1060     while read_set or write_set:
1061         if timeout is not None:
1062             curtime = time.time()
1063             if timeout is None or curtime > timelimit:
1064                 if curtime > bailtime:
1065                     break
1066                 elif curtime > killtime:
1067                     signum = signal.SIGKILL
1068                 else:
1069                     signum = signal.SIGTERM
1070                 # Lets kill it
1071                 os.kill(self.pid, signum)
1072                 select_timeout = 0.5
1073             else:
1074                 select_timeout = timelimit - curtime + 0.1
1075         else:
1076             select_timeout = 1.0
1077         
1078         if select_timeout > 1.0:
1079             select_timeout = 1.0
1080             
1081         try:
1082             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1083         except select.error,e:
1084             if e[0] != 4:
1085                 raise
1086             else:
1087                 continue
1088         
1089         if not rlist and not wlist and not xlist and self.poll() is not None:
1090             # timeout and process exited, say bye
1091             break
1092
1093         if self.stdin in wlist:
1094             # When select has indicated that the file is writable,
1095             # we can write up to PIPE_BUF bytes without risk
1096             # blocking.  POSIX defines PIPE_BUF >= 512
1097             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1098             input_offset += bytes_written
1099             if input_offset >= len(input):
1100                 self.stdin.close()
1101                 write_set.remove(self.stdin)
1102
1103         if self.stdout in rlist:
1104             data = os.read(self.stdout.fileno(), 1024)
1105             if data == "":
1106                 self.stdout.close()
1107                 read_set.remove(self.stdout)
1108             stdout.append(data)
1109
1110         if self.stderr in rlist:
1111             data = os.read(self.stderr.fileno(), 1024)
1112             if data == "":
1113                 self.stderr.close()
1114                 read_set.remove(self.stderr)
1115             stderr.append(data)
1116     
1117     # All data exchanged.  Translate lists into strings.
1118     if stdout is not None:
1119         stdout = ''.join(stdout)
1120     if stderr is not None:
1121         stderr = ''.join(stderr)
1122
1123     # Translate newlines, if requested.  We cannot let the file
1124     # object do the translation: It is based on stdio, which is
1125     # impossible to combine with select (unless forcing no
1126     # buffering).
1127     if self.universal_newlines and hasattr(file, 'newlines'):
1128         if stdout:
1129             stdout = self._translate_newlines(stdout)
1130         if stderr:
1131             stderr = self._translate_newlines(stderr)
1132
1133     if killed and err_on_timeout:
1134         errcode = self.poll()
1135         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1136     else:
1137         if killed:
1138             self.poll()
1139         else:
1140             self.wait()
1141         return (stdout, stderr)
1142