Adding ICN PlanetLab large experiment scenarios
[nepi.git] / src / nepi / util / server.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.util.constants import DeploymentConfiguration as DC
4
5 import base64
6 import errno
7 import os
8 import os.path
9 import resource
10 import select
11 import shutil
12 import signal
13 import socket
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import re
20 import tempfile
21 import defer
22 import functools
23 import collections
24 import hashlib
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 OPENSSH_HAS_PERSIST = None
36
37 if hasattr(os, "devnull"):
38     DEV_NULL = os.devnull
39 else:
40     DEV_NULL = "/dev/null"
41
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43
44 hostbyname_cache = dict()
45
46 def gethostbyname(host):
47     hostbyname = hostbyname_cache.get(host)
48     if not hostbyname:
49         hostbyname = socket.gethostbyname(host)
50         hostbyname_cache[host] = hostbyname
51     return hostbyname
52
53 def openssh_has_persist():
54     global OPENSSH_HAS_PERSIST
55     if OPENSSH_HAS_PERSIST is None:
56         proc = subprocess.Popen(["ssh","-v"],
57             stdout = subprocess.PIPE,
58             stderr = subprocess.STDOUT,
59             stdin = open("/dev/null","r") )
60         out,err = proc.communicate()
61         proc.wait()
62         
63         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
64         OPENSSH_HAS_PERSIST = bool(vre.match(out))
65     return OPENSSH_HAS_PERSIST
66
67 def shell_escape(s):
68     """ Escapes strings so that they are safe to use as command-line arguments """
69     if SHELL_SAFE.match(s):
70         # safe string - no escaping needed
71         return s
72     else:
73         # unsafe string - escape
74         def escp(c):
75             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
76                 return c
77             else:
78                 return "'$'\\x%02x''" % (ord(c),)
79         s = ''.join(map(escp,s))
80         return "'%s'" % (s,)
81
82 def eintr_retry(func):
83     import functools
84     @functools.wraps(func)
85     def rv(*p, **kw):
86         retry = kw.pop("_retry", False)
87         for i in xrange(0 if retry else 4):
88             try:
89                 return func(*p, **kw)
90             except (select.error, socket.error), args:
91                 if args[0] == errno.EINTR:
92                     continue
93                 else:
94                     raise 
95             except OSError, e:
96                 if e.errno == errno.EINTR:
97                     continue
98                 else:
99                     raise
100         else:
101             return func(*p, **kw)
102     return rv
103
104 class Server(object):
105     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
106             environment_setup = "", clean_root = False):
107         self._root_dir = root_dir
108         self._clean_root = clean_root
109         self._stop = False
110         self._ctrl_sock = None
111         self._log_level = log_level
112         self._rdbuf = ""
113         self._environment_setup = environment_setup
114
115     def run(self):
116         try:
117             if self.daemonize():
118                 self.post_daemonize()
119                 self.loop()
120                 self.cleanup()
121                 # ref: "os._exit(0)"
122                 # can not return normally after fork beacuse no exec was done.
123                 # This means that if we don't do a os._exit(0) here the code that 
124                 # follows the call to "Server.run()" in the "caller code" will be 
125                 # executed... but by now it has already been executed after the 
126                 # first process (the one that did the first fork) returned.
127                 os._exit(0)
128         except:
129             print >>sys.stderr, "SERVER_ERROR."
130             self.log_error()
131             self.cleanup()
132             os._exit(0)
133         print >>sys.stderr, "SERVER_READY."
134
135     def daemonize(self):
136         # pipes for process synchronization
137         (r, w) = os.pipe()
138         
139         # build root folder
140         root = os.path.normpath(self._root_dir)
141         if self._root_dir not in [".", ""] and os.path.exists(root) \
142                 and self._clean_root:
143             shutil.rmtree(root)
144         if not os.path.exists(root):
145             os.makedirs(root, 0755)
146
147         pid1 = os.fork()
148         if pid1 > 0:
149             os.close(w)
150             while True:
151                 try:
152                     os.read(r, 1)
153                 except OSError, e: # pragma: no cover
154                     if e.errno == errno.EINTR:
155                         continue
156                     else:
157                         raise
158                 break
159             os.close(r)
160             # os.waitpid avoids leaving a <defunc> (zombie) process
161             st = os.waitpid(pid1, 0)[1]
162             if st:
163                 raise RuntimeError("Daemonization failed")
164             # return 0 to inform the caller method that this is not the 
165             # daemonized process
166             return 0
167         os.close(r)
168
169         # Decouple from parent environment.
170         os.chdir(self._root_dir)
171         os.umask(0)
172         os.setsid()
173
174         # fork 2
175         pid2 = os.fork()
176         if pid2 > 0:
177             # see ref: "os._exit(0)"
178             os._exit(0)
179
180         # close all open file descriptors.
181         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182         if (max_fd == resource.RLIM_INFINITY):
183             max_fd = MAX_FD
184         for fd in range(3, max_fd):
185             if fd != w:
186                 try:
187                     os.close(fd)
188                 except OSError:
189                     pass
190
191         # Redirect standard file descriptors.
192         stdin = open(DEV_NULL, "r")
193         stderr = stdout = open(STD_ERR, "a", 0)
194         os.dup2(stdin.fileno(), sys.stdin.fileno())
195         # NOTE: sys.stdout.write will still be buffered, even if the file
196         # was opened with 0 buffer
197         os.dup2(stdout.fileno(), sys.stdout.fileno())
198         os.dup2(stderr.fileno(), sys.stderr.fileno())
199         
200         # setup environment
201         if self._environment_setup:
202             # parse environment variables and pass to child process
203             # do it by executing shell commands, in case there's some heavy setup involved
204             envproc = subprocess.Popen(
205                 [ "bash", "-c", 
206                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207                         ( self._environment_setup, ) ],
208                 stdin = subprocess.PIPE, 
209                 stdout = subprocess.PIPE,
210                 stderr = subprocess.PIPE
211             )
212             out,err = envproc.communicate()
213
214             # parse new environment
215             if out:
216                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
217             
218                 # apply to current environment
219                 for name, value in environment.iteritems():
220                     os.environ[name] = value
221                 
222                 # apply pythonpath
223                 if 'PYTHONPATH' in environment:
224                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
225
226         # create control socket
227         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
228         try:
229             self._ctrl_sock.bind(CTRL_SOCK)
230         except socket.error:
231             # Address in use, check pidfile
232             pid = None
233             try:
234                 pidfile = open(CTRL_PID, "r")
235                 pid = pidfile.read()
236                 pidfile.close()
237                 pid = int(pid)
238             except:
239                 # no pidfile
240                 pass
241             
242             if pid is not None:
243                 # Check process liveliness
244                 if not os.path.exists("/proc/%d" % (pid,)):
245                     # Ok, it's dead, clean the socket
246                     os.remove(CTRL_SOCK)
247             
248             # try again
249             self._ctrl_sock.bind(CTRL_SOCK)
250             
251         self._ctrl_sock.listen(0)
252         
253         # Save pidfile
254         pidfile = open(CTRL_PID, "w")
255         pidfile.write(str(os.getpid()))
256         pidfile.close()
257
258         # let the parent process know that the daemonization is finished
259         os.write(w, "\n")
260         os.close(w)
261         return 1
262
263     def post_daemonize(self):
264         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265         # QT, for some strange reason, redefines the SIGCHILD handler to write
266         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
267         # Server dameonization closes all file descriptors from fileno '3',
268         # but the overloaded handler (inherited by the forked process) will
269         # keep trying to write the \0 to fileno 'x', which might have been reused 
270         # after closing, for other operations. This is bad bad bad when fileno 'x'
271         # is in use for communication pouroses, because unexpected \0 start
272         # appearing in the communication messages... this is exactly what happens 
273         # when using netns in daemonized form. Thus, be have no other alternative than
274         # restoring the SIGCHLD handler to the default here.
275         import signal
276         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
277
278     def loop(self):
279         while not self._stop:
280             conn, addr = self._ctrl_sock.accept()
281             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
282             conn.settimeout(5)
283             while not self._stop:
284                 try:
285                     msg = self.recv_msg(conn)
286                 except socket.timeout, e:
287                     #self.log_error("SERVER recv_msg: connection timedout ")
288                     continue
289                 
290                 if not msg:
291                     self.log_error("CONNECTION LOST")
292                     break
293                     
294                 if msg == STOP_MSG:
295                     self._stop = True
296                     reply = self.stop_action()
297                 else:
298                     reply = self.reply_action(msg)
299                 
300                 try:
301                     self.send_reply(conn, reply)
302                 except socket.error:
303                     self.log_error()
304                     self.log_error("NOTICE: Awaiting for reconnection")
305                     break
306             try:
307                 conn.close()
308             except:
309                 # Doesn't matter
310                 self.log_error()
311
312     def recv_msg(self, conn):
313         data = [self._rdbuf]
314         chunk = data[0]
315         while '\n' not in chunk:
316             try:
317                 chunk = conn.recv(1024)
318             except (OSError, socket.error), e:
319                 if e[0] != errno.EINTR:
320                     raise
321                 else:
322                     continue
323             if chunk:
324                 data.append(chunk)
325             else:
326                 # empty chunk = EOF
327                 break
328         data = ''.join(data).split('\n',1)
329         while len(data) < 2:
330             data.append('')
331         data, self._rdbuf = data
332         
333         decoded = base64.b64decode(data)
334         return decoded.rstrip()
335
336     def send_reply(self, conn, reply):
337         encoded = base64.b64encode(reply)
338         conn.send("%s\n" % encoded)
339        
340     def cleanup(self):
341         try:
342             self._ctrl_sock.close()
343             os.remove(CTRL_SOCK)
344         except:
345             self.log_error()
346
347     def stop_action(self):
348         return "Stopping server"
349
350     def reply_action(self, msg):
351         return "Reply to: %s" % msg
352
353     def log_error(self, text = None, context = ''):
354         if text == None:
355             text = traceback.format_exc()
356         date = time.strftime("%Y-%m-%d %H:%M:%S")
357         if context:
358             context = " (%s)" % (context,)
359         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
360         return text
361
362     def log_debug(self, text):
363         if self._log_level == DC.DEBUG_LEVEL:
364             date = time.strftime("%Y-%m-%d %H:%M:%S")
365             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
366
367 class Forwarder(object):
368     def __init__(self, root_dir = "."):
369         self._ctrl_sock = None
370         self._root_dir = root_dir
371         self._stop = False
372         self._rdbuf = ""
373
374     def forward(self):
375         self.connect()
376         print >>sys.stderr, "FORWARDER_READY."
377         while not self._stop:
378             data = self.read_data()
379             if not data:
380                 # Connection to client lost
381                 break
382             self.send_to_server(data)
383             
384             data = self.recv_from_server()
385             if not data:
386                 # Connection to server lost
387                 raise IOError, "Connection to server lost while "\
388                     "expecting response"
389             self.write_data(data)
390         self.disconnect()
391
392     def read_data(self):
393         return sys.stdin.readline()
394
395     def write_data(self, data):
396         sys.stdout.write(data)
397         # sys.stdout.write is buffered, this is why we need to do a flush()
398         sys.stdout.flush()
399
400     def send_to_server(self, data):
401         try:
402             self._ctrl_sock.send(data)
403         except (IOError, socket.error), e:
404             if e[0] == errno.EPIPE:
405                 self.connect()
406                 self._ctrl_sock.send(data)
407             else:
408                 raise e
409         encoded = data.rstrip() 
410         msg = base64.b64decode(encoded)
411         if msg == STOP_MSG:
412             self._stop = True
413
414     def recv_from_server(self):
415         data = [self._rdbuf]
416         chunk = data[0]
417         while '\n' not in chunk:
418             try:
419                 chunk = self._ctrl_sock.recv(1024)
420             except (OSError, socket.error), e:
421                 if e[0] != errno.EINTR:
422                     raise
423                 continue
424             if chunk:
425                 data.append(chunk)
426             else:
427                 # empty chunk = EOF
428                 break
429         data = ''.join(data).split('\n',1)
430         while len(data) < 2:
431             data.append('')
432         data, self._rdbuf = data
433         
434         return data+'\n'
435  
436     def connect(self):
437         self.disconnect()
438         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440         self._ctrl_sock.connect(sock_addr)
441
442     def disconnect(self):
443         try:
444             self._ctrl_sock.close()
445         except:
446             pass
447
448 class Client(object):
449     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
450             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451             environment_setup = ""):
452         self.root_dir = root_dir
453         self.addr = (host, port)
454         self.user = user
455         self.agent = agent
456         self.sudo = sudo
457         self.communication = communication
458         self.environment_setup = environment_setup
459         self._stopped = False
460         self._deferreds = collections.deque()
461         self.connect()
462     
463     def __del__(self):
464         if self._process.poll() is None:
465             os.kill(self._process.pid, signal.SIGTERM)
466         self._process.wait()
467         
468     def connect(self):
469         root_dir = self.root_dir
470         (host, port) = self.addr
471         user = self.user
472         agent = self.agent
473         sudo = self.sudo
474         communication = self.communication
475         
476         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477                 c.forward()" % (root_dir,)
478
479         self._process = popen_python(python_code, 
480                     communication = communication,
481                     host = host, 
482                     port = port, 
483                     user = user, 
484                     agent = agent, 
485                     sudo = sudo, 
486                     environment_setup = self.environment_setup)
487                
488         # Wait for the forwarder to be ready, otherwise nobody
489         # will be able to connect to it
490         err = []
491         helo = "nope"
492         while helo:
493             helo = self._process.stderr.readline()
494             if helo == 'FORWARDER_READY.\n':
495                 break
496             err.append(helo)
497         else:
498             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
499         
500     def send_msg(self, msg):
501         encoded = base64.b64encode(msg)
502         data = "%s\n" % encoded
503         
504         try:
505             self._process.stdin.write(data)
506         except (IOError, ValueError):
507             # dead process, poll it to un-zombify
508             self._process.poll()
509             
510             # try again after reconnect
511             # If it fails again, though, give up
512             self.connect()
513             self._process.stdin.write(data)
514
515     def send_stop(self):
516         self.send_msg(STOP_MSG)
517         self._stopped = True
518
519     def defer_reply(self, transform=None):
520         defer_entry = []
521         self._deferreds.append(defer_entry)
522         return defer.Defer(
523             functools.partial(self.read_reply, defer_entry, transform)
524         )
525         
526     def _read_reply(self):
527         data = self._process.stdout.readline()
528         encoded = data.rstrip() 
529         if not encoded:
530             # empty == eof == dead process, poll it to un-zombify
531             self._process.poll()
532             
533             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534         return base64.b64decode(encoded)
535     
536     def read_reply(self, which=None, transform=None):
537         # Test to see if someone did it already
538         if which is not None and len(which):
539             # Ok, they did it...
540             # ...just return the deferred value
541             if transform:
542                 return transform(which[0])
543             else:
544                 return which[0]
545         
546         # Process all deferreds until the one we're looking for
547         # or until the queue is empty
548         while self._deferreds:
549             try:
550                 deferred = self._deferreds.popleft()
551             except IndexError:
552                 # emptied
553                 break
554             
555             deferred.append(self._read_reply())
556             if deferred is which:
557                 # We reached the one we were looking for
558                 if transform:
559                     return transform(deferred[0])
560                 else:
561                     return deferred[0]
562         
563         if which is None:
564             # They've requested a synchronous read
565             if transform:
566                 return transform(self._read_reply())
567             else:
568                 return self._read_reply()
569
570 def _make_server_key_args(server_key, host, port, args):
571     """ 
572     Returns a reference to the created temporary file, and adds the
573     corresponding arguments to the given argument list.
574     
575     Make sure to hold onto it until the process is done with the file
576     """
577     if port is not None:
578         host = '%s:%s' % (host,port)
579     # Create a temporary server key file
580     tmp_known_hosts = tempfile.NamedTemporaryFile()
581    
582     hostbyname = gethostbyname(host) 
583
584     # Add the intended host key
585     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
586     
587     # If we're not in strict mode, add user-configured keys
588     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590         if os.access(user_hosts_path, os.R_OK):
591             f = open(user_hosts_path, "r")
592             tmp_known_hosts.write(f.read())
593             f.close()
594         
595     tmp_known_hosts.flush()
596     
597     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
598     
599     return tmp_known_hosts
600
601 def popen_ssh_command(command, host, port, user, agent, 
602         stdin="", 
603         ident_key = None,
604         server_key = None,
605         tty = False,
606         timeout = None,
607         retry = 0,
608         err_on_timeout = True,
609         connect_timeout = 60,
610         persistent = True,
611         hostip = None):
612     """
613     Executes a remote commands, returns ((stdout,stderr),process)
614     """
615    
616     tmp_known_hosts = None
617     args = ['ssh', '-C',
618             # Don't bother with localhost. Makes test easier
619             '-o', 'NoHostAuthenticationForLocalhost=yes',
620             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
621             '-o', 'ConnectionAttempts=3',
622             '-o', 'ServerAliveInterval=30',
623             '-o', 'TCPKeepAlive=yes',
624             '-l', user, hostip or host]
625     if persistent and openssh_has_persist():
626         args.extend([
627             '-o', 'ControlMaster=auto',
628             '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
629             '-o', 'ControlPersist=60' ])
630     if agent:
631         args.append('-A')
632     if port:
633         args.append('-p%d' % port)
634     if ident_key:
635         args.extend(('-i', ident_key))
636     if tty:
637         args.append('-t')
638         args.append('-t')
639     if server_key:
640         # Create a temporary server key file
641         tmp_known_hosts = _make_server_key_args(
642             server_key, host, port, args)
643     args.append(command)
644
645     for x in xrange(retry or 3):
646         # connects to the remote host and starts a remote connection
647         proc = subprocess.Popen(args, 
648                 stdout = subprocess.PIPE,
649                 stdin = subprocess.PIPE, 
650                 stderr = subprocess.PIPE)
651         
652         # attach tempfile object to the process, to make sure the file stays
653         # alive until the process is finished with it
654         proc._known_hosts = tmp_known_hosts
655     
656         try:
657             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
658             if TRACE:
659                 print "COMMAND host %s, command %s, out %s, error %s" % (host, " ".join(args), out, err)
660
661             if proc.poll():
662                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
663                     # SSH error, can safely retry
664                     continue
665                 elif :
666                     ControlSocket /tmp/nepi_ssh-inria_alina@planetlab04.cnds.unibe.ch:22 already exists, disabling multiplexing
667                     # SSH error, can safely retry (but need to delete controlpath file)
668                     # TODO: delete file
669                     continue
670                 elif retry:
671                     # Probably timed out or plain failed but can retry
672                     continue
673             break
674         except RuntimeError,e:
675             if TRACE:
676                 print "EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT ->  %s" % (
677                         host, " ".join(args), out, err, e.args)
678
679             if retry <= 0:
680                 raise
681             retry -= 1
682         
683     return ((out, err), proc)
684
685 def popen_scp(source, dest, 
686         port = None, 
687         agent = None, 
688         recursive = False,
689         ident_key = None,
690         server_key = None):
691     """
692     Copies from/to remote sites.
693     
694     Source and destination should have the user and host encoded
695     as per scp specs.
696     
697     If source is a file object, a special mode will be used to
698     create the remote file with the same contents.
699     
700     If dest is a file object, the remote file (source) will be
701     read and written into dest.
702     
703     In these modes, recursive cannot be True.
704     
705     Source can be a list of files to copy to a single destination,
706     in which case it is advised that the destination be a folder.
707     """
708     
709     if TRACE:
710         print "scp", source, dest
711     
712     if isinstance(source, file) and source.tell() == 0:
713         source = source.name
714     elif hasattr(source, 'read'):
715         tmp = tempfile.NamedTemporaryFile()
716         while True:
717             buf = source.read(65536)
718             if buf:
719                 tmp.write(buf)
720             else:
721                 break
722         tmp.seek(0)
723         source = tmp.name
724     
725     if isinstance(source, file) or isinstance(dest, file) \
726             or hasattr(source, 'read')  or hasattr(dest, 'write'):
727         assert not recursive
728         
729         # Parse source/destination as <user>@<server>:<path>
730         if isinstance(dest, basestring) and ':' in dest:
731             remspec, path = dest.split(':',1)
732         elif isinstance(source, basestring) and ':' in source:
733             remspec, path = source.split(':',1)
734         else:
735             raise ValueError, "Both endpoints cannot be local"
736         user,host = remspec.rsplit('@',1)
737         tmp_known_hosts = None
738         
739         args = ['ssh', '-l', user, '-C',
740                 # Don't bother with localhost. Makes test easier
741                 '-o', 'NoHostAuthenticationForLocalhost=yes',
742                 '-o', 'ConnectTimeout=60',
743                 '-o', 'ConnectionAttempts=3',
744                 '-o', 'ServerAliveInterval=30',
745                 '-o', 'TCPKeepAlive=yes',
746                 host ]
747         if openssh_has_persist():
748             args.extend([
749                 '-o', 'ControlMaster=auto',
750                 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
751                 '-o', 'ControlPersist=60' ])
752         if port:
753             args.append('-P%d' % port)
754         if ident_key:
755             args.extend(('-i', ident_key))
756         if server_key:
757             # Create a temporary server key file
758             tmp_known_hosts = _make_server_key_args(
759                 server_key, host, port, args)
760         
761         if isinstance(source, file) or hasattr(source, 'read'):
762             args.append('cat > %s' % (shell_escape(path),))
763         elif isinstance(dest, file) or hasattr(dest, 'write'):
764             args.append('cat %s' % (shell_escape(path),))
765         else:
766             raise AssertionError, "Unreachable code reached! :-Q"
767         
768         # connects to the remote host and starts a remote connection
769         if isinstance(source, file):
770             proc = subprocess.Popen(args, 
771                     stdout = open('/dev/null','w'),
772                     stderr = subprocess.PIPE,
773                     stdin = source)
774             err = proc.stderr.read()
775             proc._known_hosts = tmp_known_hosts
776             eintr_retry(proc.wait)()
777             return ((None,err), proc)
778         elif isinstance(dest, file):
779             proc = subprocess.Popen(args, 
780                     stdout = open('/dev/null','w'),
781                     stderr = subprocess.PIPE,
782                     stdin = source)
783             err = proc.stderr.read()
784             proc._known_hosts = tmp_known_hosts
785             eintr_retry(proc.wait)()
786             return ((None,err), proc)
787         elif hasattr(source, 'read'):
788             # file-like (but not file) source
789             proc = subprocess.Popen(args, 
790                     stdout = open('/dev/null','w'),
791                     stderr = subprocess.PIPE,
792                     stdin = subprocess.PIPE)
793             
794             buf = None
795             err = []
796             while True:
797                 if not buf:
798                     buf = source.read(4096)
799                 if not buf:
800                     #EOF
801                     break
802                 
803                 rdrdy, wrdy, broken = select.select(
804                     [proc.stderr],
805                     [proc.stdin],
806                     [proc.stderr,proc.stdin])
807                 
808                 if proc.stderr in rdrdy:
809                     # use os.read for fully unbuffered behavior
810                     err.append(os.read(proc.stderr.fileno(), 4096))
811                 
812                 if proc.stdin in wrdy:
813                     proc.stdin.write(buf)
814                     buf = None
815                 
816                 if broken:
817                     break
818             proc.stdin.close()
819             err.append(proc.stderr.read())
820                 
821             proc._known_hosts = tmp_known_hosts
822             eintr_retry(proc.wait)()
823             return ((None,''.join(err)), proc)
824         elif hasattr(dest, 'write'):
825             # file-like (but not file) dest
826             proc = subprocess.Popen(args, 
827                     stdout = subprocess.PIPE,
828                     stderr = subprocess.PIPE,
829                     stdin = open('/dev/null','w'))
830             
831             buf = None
832             err = []
833             while True:
834                 rdrdy, wrdy, broken = select.select(
835                     [proc.stderr, proc.stdout],
836                     [],
837                     [proc.stderr, proc.stdout])
838                 
839                 if proc.stderr in rdrdy:
840                     # use os.read for fully unbuffered behavior
841                     err.append(os.read(proc.stderr.fileno(), 4096))
842                 
843                 if proc.stdout in rdrdy:
844                     # use os.read for fully unbuffered behavior
845                     buf = os.read(proc.stdout.fileno(), 4096)
846                     dest.write(buf)
847                     
848                     if not buf:
849                         #EOF
850                         break
851                 
852                 if broken:
853                     break
854             err.append(proc.stderr.read())
855                 
856             proc._known_hosts = tmp_known_hosts
857             eintr_retry(proc.wait)()
858             return ((None,''.join(err)), proc)
859         else:
860             raise AssertionError, "Unreachable code reached! :-Q"
861     else:
862         # Parse destination as <user>@<server>:<path>
863         if isinstance(dest, basestring) and ':' in dest:
864             remspec, path = dest.split(':',1)
865         elif isinstance(source, basestring) and ':' in source:
866             remspec, path = source.split(':',1)
867         else:
868             raise ValueError, "Both endpoints cannot be local"
869         user,host = remspec.rsplit('@',1)
870         
871         # plain scp
872         tmp_known_hosts = None
873         args = ['scp', '-q', '-p', '-C',
874                 # Don't bother with localhost. Makes test easier
875                 '-o', 'NoHostAuthenticationForLocalhost=yes',
876                 '-o', 'ConnectTimeout=60',
877                 '-o', 'ConnectionAttempts=3',
878                 '-o', 'ServerAliveInterval=30',
879                 '-o', 'TCPKeepAlive=yes' ]
880                 
881         if port:
882             args.append('-P%d' % port)
883         if recursive:
884             args.append('-r')
885         if ident_key:
886             args.extend(('-i', ident_key))
887         if server_key:
888             # Create a temporary server key file
889             tmp_known_hosts = _make_server_key_args(
890                 server_key, host, port, args)
891         if isinstance(source,list):
892             args.extend(source)
893         else:
894             if openssh_has_persist():
895                 args.extend([
896                     '-o', 'ControlMaster=auto',
897                     '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
898             args.append(source)
899         args.append(dest)
900
901         # connects to the remote host and starts a remote connection
902         proc = subprocess.Popen(args, 
903                 stdout = subprocess.PIPE,
904                 stdin = subprocess.PIPE, 
905                 stderr = subprocess.PIPE)
906         proc._known_hosts = tmp_known_hosts
907         
908         comm = proc.communicate()
909         eintr_retry(proc.wait)()
910         return (comm, proc)
911
912 def decode_and_execute():
913     # The python code we want to execute might have characters that 
914     # are not compatible with the 'inline' mode we are using. To avoid
915     # problems we receive the encoded python code in base64 as a input 
916     # stream and decode it for execution.
917     import base64, os
918     cmd = ""
919     while True:
920         try:
921             cmd += os.read(0, 1)# one byte from stdin
922         except OSError, e:            
923             if e.errno == errno.EINTR:
924                 continue
925             else:
926                 raise
927         if cmd[-1] == "\n": 
928             break
929     cmd = base64.b64decode(cmd)
930     # Uncomment for debug
931     #os.write(2, "Executing python code: %s\n" % cmd)
932     os.write(1, "OK\n") # send a sync message
933     exec(cmd)
934
935 def popen_python(python_code, 
936         communication = DC.ACCESS_LOCAL,
937         host = None, 
938         port = None, 
939         user = None, 
940         agent = False, 
941         python_path = None,
942         ident_key = None,
943         server_key = None,
944         tty = False,
945         sudo = False, 
946         environment_setup = ""):
947
948     cmd = ""
949     if python_path:
950         python_path.replace("'", r"'\''")
951         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
952         cmd += " ; "
953     if environment_setup:
954         cmd += environment_setup
955         cmd += " ; "
956     # Uncomment for debug (to run everything under strace)
957     # We had to verify if strace works (cannot nest them)
958     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
959     #cmd += "$CMD "
960     #cmd += "strace -f -tt -s 200 -o strace$$.out "
961     import nepi
962     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
963         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
964     )
965
966     if sudo:
967         if ';' in cmd:
968             cmd = "sudo bash -c " + shell_escape(cmd)
969         else:
970             cmd = "sudo " + cmd
971
972     if communication == DC.ACCESS_SSH:
973         tmp_known_hosts = None
974         args = ['ssh', '-C',
975                 # Don't bother with localhost. Makes test easier
976                 '-o', 'NoHostAuthenticationForLocalhost=yes',
977                 '-o', 'ConnectionAttempts=3',
978                 '-o', 'ServerAliveInterval=30',
979                 '-o', 'TCPKeepAlive=yes',
980                 '-l', user, host]
981         if agent:
982             args.append('-A')
983         if port:
984             args.append('-p%d' % port)
985         if ident_key:
986             args.extend(('-i', ident_key))
987         if tty:
988             args.append('-t')
989         if server_key:
990             # Create a temporary server key file
991             tmp_known_hosts = _make_server_key_args(
992                 server_key, host, port, args)
993         args.append(cmd)
994     else:
995         args = [ "/bin/bash", "-c", cmd ]
996
997     # connects to the remote host and starts a remote
998     proc = subprocess.Popen(args,
999             shell = False, 
1000             stdout = subprocess.PIPE,
1001             stdin = subprocess.PIPE, 
1002             stderr = subprocess.PIPE)
1003
1004     if communication == DC.ACCESS_SSH:
1005         proc._known_hosts = tmp_known_hosts
1006
1007     # send the command to execute
1008     os.write(proc.stdin.fileno(),
1009             base64.b64encode(python_code) + "\n")
1010  
1011     while True: 
1012         try:
1013             msg = os.read(proc.stdout.fileno(), 3)
1014             break
1015         except OSError, e:            
1016             if e.errno == errno.EINTR:
1017                 continue
1018             else:
1019                 raise
1020     
1021     if msg != "OK\n":
1022         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1023             msg, proc.stdout.read(), proc.stderr.read())
1024
1025     return proc
1026
1027 # POSIX
1028 def _communicate(self, input, timeout=None, err_on_timeout=True):
1029     read_set = []
1030     write_set = []
1031     stdout = None # Return
1032     stderr = None # Return
1033     
1034     killed = False
1035     
1036     if timeout is not None:
1037         timelimit = time.time() + timeout
1038         killtime = timelimit + 4
1039         bailtime = timelimit + 4
1040
1041     if self.stdin:
1042         # Flush stdio buffer.  This might block, if the user has
1043         # been writing to .stdin in an uncontrolled fashion.
1044         self.stdin.flush()
1045         if input:
1046             write_set.append(self.stdin)
1047         else:
1048             self.stdin.close()
1049     if self.stdout:
1050         read_set.append(self.stdout)
1051         stdout = []
1052     if self.stderr:
1053         read_set.append(self.stderr)
1054         stderr = []
1055
1056     input_offset = 0
1057     while read_set or write_set:
1058         if timeout is not None:
1059             curtime = time.time()
1060             if timeout is None or curtime > timelimit:
1061                 if curtime > bailtime:
1062                     break
1063                 elif curtime > killtime:
1064                     signum = signal.SIGKILL
1065                 else:
1066                     signum = signal.SIGTERM
1067                 # Lets kill it
1068                 os.kill(self.pid, signum)
1069                 select_timeout = 0.5
1070             else:
1071                 select_timeout = timelimit - curtime + 0.1
1072         else:
1073             select_timeout = 1.0
1074         
1075         if select_timeout > 1.0:
1076             select_timeout = 1.0
1077             
1078         try:
1079             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1080         except select.error,e:
1081             if e[0] != 4:
1082                 raise
1083             else:
1084                 continue
1085         
1086         if not rlist and not wlist and not xlist and self.poll() is not None:
1087             # timeout and process exited, say bye
1088             break
1089
1090         if self.stdin in wlist:
1091             # When select has indicated that the file is writable,
1092             # we can write up to PIPE_BUF bytes without risk
1093             # blocking.  POSIX defines PIPE_BUF >= 512
1094             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1095             input_offset += bytes_written
1096             if input_offset >= len(input):
1097                 self.stdin.close()
1098                 write_set.remove(self.stdin)
1099
1100         if self.stdout in rlist:
1101             data = os.read(self.stdout.fileno(), 1024)
1102             if data == "":
1103                 self.stdout.close()
1104                 read_set.remove(self.stdout)
1105             stdout.append(data)
1106
1107         if self.stderr in rlist:
1108             data = os.read(self.stderr.fileno(), 1024)
1109             if data == "":
1110                 self.stderr.close()
1111                 read_set.remove(self.stderr)
1112             stderr.append(data)
1113     
1114     # All data exchanged.  Translate lists into strings.
1115     if stdout is not None:
1116         stdout = ''.join(stdout)
1117     if stderr is not None:
1118         stderr = ''.join(stderr)
1119
1120     # Translate newlines, if requested.  We cannot let the file
1121     # object do the translation: It is based on stdio, which is
1122     # impossible to combine with select (unless forcing no
1123     # buffering).
1124     if self.universal_newlines and hasattr(file, 'newlines'):
1125         if stdout:
1126             stdout = self._translate_newlines(stdout)
1127         if stderr:
1128             stderr = self._translate_newlines(stderr)
1129
1130     if killed and err_on_timeout:
1131         errcode = self.poll()
1132         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1133     else:
1134         if killed:
1135             self.poll()
1136         else:
1137             self.wait()
1138         return (stdout, stderr)
1139