ExperimentSuite strcuture. Still not working, missing proxy and tests.
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 if hasattr(os, "devnull"):
36     DEV_NULL = os.devnull
37 else:
38     DEV_NULL = "/dev/null"
39
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41
42 def shell_escape(s):
43     """ Escapes strings so that they are safe to use as command-line arguments """
44     if SHELL_SAFE.match(s):
45         # safe string - no escaping needed
46         return s
47     else:
48         # unsafe string - escape
49         def escp(c):
50             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
51                 return c
52             else:
53                 return "'$'\\x%02x''" % (ord(c),)
54         s = ''.join(map(escp,s))
55         return "'%s'" % (s,)
56
57 def eintr_retry(func):
58     import functools
59     @functools.wraps(func)
60     def rv(*p, **kw):
61         retry = kw.pop("_retry", False)
62         for i in xrange(0 if retry else 4):
63             try:
64                 return func(*p, **kw)
65             except (select.error, socket.error), args:
66                 if args[0] == errno.EINTR:
67                     continue
68                 else:
69                     raise 
70             except OSError, e:
71                 if e.errno == errno.EINTR:
72                     continue
73                 else:
74                     raise
75         else:
76             return func(*p, **kw)
77     return rv
78
79 class Server(object):
80     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
81             environment_setup = "", clean_root = False):
82         self._root_dir = root_dir
83         self._clean_root = clean_root
84         self._stop = False
85         self._ctrl_sock = None
86         self._log_level = log_level
87         self._rdbuf = ""
88         self._environment_setup = environment_setup
89
90     def run(self):
91         try:
92             if self.daemonize():
93                 self.post_daemonize()
94                 self.loop()
95                 self.cleanup()
96                 # ref: "os._exit(0)"
97                 # can not return normally after fork beacuse no exec was done.
98                 # This means that if we don't do a os._exit(0) here the code that 
99                 # follows the call to "Server.run()" in the "caller code" will be 
100                 # executed... but by now it has already been executed after the 
101                 # first process (the one that did the first fork) returned.
102                 os._exit(0)
103         except:
104             print >>sys.stderr, "SERVER_ERROR."
105             self.log_error()
106             self.cleanup()
107             os._exit(0)
108         print >>sys.stderr, "SERVER_READY."
109
110     def daemonize(self):
111         # pipes for process synchronization
112         (r, w) = os.pipe()
113         
114         # build root folder
115         root = os.path.normpath(self._root_dir)
116         if self._root_dir not in [".", ""] and os.path.exists(root) \
117                 and self._clean_root:
118             shutil.rmtree(root)
119         if not os.path.exists(root):
120             os.makedirs(root, 0755)
121
122         pid1 = os.fork()
123         if pid1 > 0:
124             os.close(w)
125             while True:
126                 try:
127                     os.read(r, 1)
128                 except OSError, e: # pragma: no cover
129                     if e.errno == errno.EINTR:
130                         continue
131                     else:
132                         raise
133                 break
134             os.close(r)
135             # os.waitpid avoids leaving a <defunc> (zombie) process
136             st = os.waitpid(pid1, 0)[1]
137             if st:
138                 raise RuntimeError("Daemonization failed")
139             # return 0 to inform the caller method that this is not the 
140             # daemonized process
141             return 0
142         os.close(r)
143
144         # Decouple from parent environment.
145         os.chdir(self._root_dir)
146         os.umask(0)
147         os.setsid()
148
149         # fork 2
150         pid2 = os.fork()
151         if pid2 > 0:
152             # see ref: "os._exit(0)"
153             os._exit(0)
154
155         # close all open file descriptors.
156         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157         if (max_fd == resource.RLIM_INFINITY):
158             max_fd = MAX_FD
159         for fd in range(3, max_fd):
160             if fd != w:
161                 try:
162                     os.close(fd)
163                 except OSError:
164                     pass
165
166         # Redirect standard file descriptors.
167         stdin = open(DEV_NULL, "r")
168         stderr = stdout = open(STD_ERR, "a", 0)
169         os.dup2(stdin.fileno(), sys.stdin.fileno())
170         # NOTE: sys.stdout.write will still be buffered, even if the file
171         # was opened with 0 buffer
172         os.dup2(stdout.fileno(), sys.stdout.fileno())
173         os.dup2(stderr.fileno(), sys.stderr.fileno())
174         
175         # setup environment
176         if self._environment_setup:
177             # parse environment variables and pass to child process
178             # do it by executing shell commands, in case there's some heavy setup involved
179             envproc = subprocess.Popen(
180                 [ "bash", "-c", 
181                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182                         ( self._environment_setup, ) ],
183                 stdin = subprocess.PIPE, 
184                 stdout = subprocess.PIPE,
185                 stderr = subprocess.PIPE
186             )
187             out,err = envproc.communicate()
188
189             # parse new environment
190             if out:
191                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
192             
193                 # apply to current environment
194                 for name, value in environment.iteritems():
195                     os.environ[name] = value
196                 
197                 # apply pythonpath
198                 if 'PYTHONPATH' in environment:
199                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
200
201         # create control socket
202         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
203         try:
204             self._ctrl_sock.bind(CTRL_SOCK)
205         except socket.error:
206             # Address in use, check pidfile
207             pid = None
208             try:
209                 pidfile = open(CTRL_PID, "r")
210                 pid = pidfile.read()
211                 pidfile.close()
212                 pid = int(pid)
213             except:
214                 # no pidfile
215                 pass
216             
217             if pid is not None:
218                 # Check process liveliness
219                 if not os.path.exists("/proc/%d" % (pid,)):
220                     # Ok, it's dead, clean the socket
221                     os.remove(CTRL_SOCK)
222             
223             # try again
224             self._ctrl_sock.bind(CTRL_SOCK)
225             
226         self._ctrl_sock.listen(0)
227         
228         # Save pidfile
229         pidfile = open(CTRL_PID, "w")
230         pidfile.write(str(os.getpid()))
231         pidfile.close()
232
233         # let the parent process know that the daemonization is finished
234         os.write(w, "\n")
235         os.close(w)
236         return 1
237
238     def post_daemonize(self):
239         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240         # QT, for some strange reason, redefines the SIGCHILD handler to write
241         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
242         # Server dameonization closes all file descriptors from fileno '3',
243         # but the overloaded handler (inherited by the forked process) will
244         # keep trying to write the \0 to fileno 'x', which might have been reused 
245         # after closing, for other operations. This is bad bad bad when fileno 'x'
246         # is in use for communication pouroses, because unexpected \0 start
247         # appearing in the communication messages... this is exactly what happens 
248         # when using netns in daemonized form. Thus, be have no other alternative than
249         # restoring the SIGCHLD handler to the default here.
250         import signal
251         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
252
253     def loop(self):
254         while not self._stop:
255             conn, addr = self._ctrl_sock.accept()
256             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
257             conn.settimeout(5)
258             while not self._stop:
259                 try:
260                     msg = self.recv_msg(conn)
261                 except socket.timeout, e:
262                     #self.log_error("SERVER recv_msg: connection timedout ")
263                     continue
264                 
265                 if not msg:
266                     self.log_error("CONNECTION LOST")
267                     break
268                     
269                 if msg == STOP_MSG:
270                     self._stop = True
271                     reply = self.stop_action()
272                 else:
273                     reply = self.reply_action(msg)
274                 
275                 try:
276                     self.send_reply(conn, reply)
277                 except socket.error:
278                     self.log_error()
279                     self.log_error("NOTICE: Awaiting for reconnection")
280                     break
281             try:
282                 conn.close()
283             except:
284                 # Doesn't matter
285                 self.log_error()
286
287     def recv_msg(self, conn):
288         data = [self._rdbuf]
289         chunk = data[0]
290         while '\n' not in chunk:
291             try:
292                 chunk = conn.recv(1024)
293             except (OSError, socket.error), e:
294                 if e[0] != errno.EINTR:
295                     raise
296                 else:
297                     continue
298             if chunk:
299                 data.append(chunk)
300             else:
301                 # empty chunk = EOF
302                 break
303         data = ''.join(data).split('\n',1)
304         while len(data) < 2:
305             data.append('')
306         data, self._rdbuf = data
307         
308         decoded = base64.b64decode(data)
309         return decoded.rstrip()
310
311     def send_reply(self, conn, reply):
312         encoded = base64.b64encode(reply)
313         conn.send("%s\n" % encoded)
314        
315     def cleanup(self):
316         try:
317             self._ctrl_sock.close()
318             os.remove(CTRL_SOCK)
319         except:
320             self.log_error()
321
322     def stop_action(self):
323         return "Stopping server"
324
325     def reply_action(self, msg):
326         return "Reply to: %s" % msg
327
328     def log_error(self, text = None, context = ''):
329         if text == None:
330             text = traceback.format_exc()
331         date = time.strftime("%Y-%m-%d %H:%M:%S")
332         if context:
333             context = " (%s)" % (context,)
334         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
335         return text
336
337     def log_debug(self, text):
338         if self._log_level == DC.DEBUG_LEVEL:
339             date = time.strftime("%Y-%m-%d %H:%M:%S")
340             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
341
342 class Forwarder(object):
343     def __init__(self, root_dir = "."):
344         self._ctrl_sock = None
345         self._root_dir = root_dir
346         self._stop = False
347         self._rdbuf = ""
348
349     def forward(self):
350         self.connect()
351         print >>sys.stderr, "FORWARDER_READY."
352         while not self._stop:
353             data = self.read_data()
354             if not data:
355                 # Connection to client lost
356                 break
357             self.send_to_server(data)
358             
359             data = self.recv_from_server()
360             if not data:
361                 # Connection to server lost
362                 raise IOError, "Connection to server lost while "\
363                     "expecting response"
364             self.write_data(data)
365         self.disconnect()
366
367     def read_data(self):
368         return sys.stdin.readline()
369
370     def write_data(self, data):
371         sys.stdout.write(data)
372         # sys.stdout.write is buffered, this is why we need to do a flush()
373         sys.stdout.flush()
374
375     def send_to_server(self, data):
376         try:
377             self._ctrl_sock.send(data)
378         except (IOError, socket.error), e:
379             if e[0] == errno.EPIPE:
380                 self.connect()
381                 self._ctrl_sock.send(data)
382             else:
383                 raise e
384         encoded = data.rstrip() 
385         msg = base64.b64decode(encoded)
386         if msg == STOP_MSG:
387             self._stop = True
388
389     def recv_from_server(self):
390         data = [self._rdbuf]
391         chunk = data[0]
392         while '\n' not in chunk:
393             try:
394                 chunk = self._ctrl_sock.recv(1024)
395             except (OSError, socket.error), e:
396                 if e[0] != errno.EINTR:
397                     raise
398                 continue
399             if chunk:
400                 data.append(chunk)
401             else:
402                 # empty chunk = EOF
403                 break
404         data = ''.join(data).split('\n',1)
405         while len(data) < 2:
406             data.append('')
407         data, self._rdbuf = data
408         
409         return data+'\n'
410  
411     def connect(self):
412         self.disconnect()
413         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415         self._ctrl_sock.connect(sock_addr)
416
417     def disconnect(self):
418         try:
419             self._ctrl_sock.close()
420         except:
421             pass
422
423 class Client(object):
424     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
425             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426             environment_setup = ""):
427         self.root_dir = root_dir
428         self.addr = (host, port)
429         self.user = user
430         self.agent = agent
431         self.sudo = sudo
432         self.communication = communication
433         self.environment_setup = environment_setup
434         self._stopped = False
435         self._deferreds = collections.deque()
436         self.connect()
437     
438     def __del__(self):
439         if self._process.poll() is None:
440             os.kill(self._process.pid, signal.SIGTERM)
441         self._process.wait()
442         
443     def connect(self):
444         root_dir = self.root_dir
445         (host, port) = self.addr
446         user = self.user
447         agent = self.agent
448         sudo = self.sudo
449         communication = self.communication
450         
451         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452                 c.forward()" % (root_dir,)
453
454         self._process = popen_python(python_code, 
455                     communication = communication,
456                     host = host, 
457                     port = port, 
458                     user = user, 
459                     agent = agent, 
460                     sudo = sudo, 
461                     environment_setup = self.environment_setup)
462                
463         # Wait for the forwarder to be ready, otherwise nobody
464         # will be able to connect to it
465         err = []
466         helo = "nope"
467         while helo:
468             helo = self._process.stderr.readline()
469             if helo == 'FORWARDER_READY.\n':
470                 break
471             err.append(helo)
472         else:
473             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
474         
475     def send_msg(self, msg):
476         encoded = base64.b64encode(msg)
477         data = "%s\n" % encoded
478         
479         try:
480             self._process.stdin.write(data)
481         except (IOError, ValueError):
482             # dead process, poll it to un-zombify
483             self._process.poll()
484             
485             # try again after reconnect
486             # If it fails again, though, give up
487             self.connect()
488             self._process.stdin.write(data)
489
490     def send_stop(self):
491         self.send_msg(STOP_MSG)
492         self._stopped = True
493
494     def defer_reply(self, transform=None):
495         defer_entry = []
496         self._deferreds.append(defer_entry)
497         return defer.Defer(
498             functools.partial(self.read_reply, defer_entry, transform)
499         )
500         
501     def _read_reply(self):
502         data = self._process.stdout.readline()
503         encoded = data.rstrip() 
504         if not encoded:
505             # empty == eof == dead process, poll it to un-zombify
506             self._process.poll()
507             
508             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509         return base64.b64decode(encoded)
510     
511     def read_reply(self, which=None, transform=None):
512         # Test to see if someone did it already
513         if which is not None and len(which):
514             # Ok, they did it...
515             # ...just return the deferred value
516             if transform:
517                 return transform(which[0])
518             else:
519                 return which[0]
520         
521         # Process all deferreds until the one we're looking for
522         # or until the queue is empty
523         while self._deferreds:
524             try:
525                 deferred = self._deferreds.popleft()
526             except IndexError:
527                 # emptied
528                 break
529             
530             deferred.append(self._read_reply())
531             if deferred is which:
532                 # We reached the one we were looking for
533                 if transform:
534                     return transform(deferred[0])
535                 else:
536                     return deferred[0]
537         
538         if which is None:
539             # They've requested a synchronous read
540             if transform:
541                 return transform(self._read_reply())
542             else:
543                 return self._read_reply()
544
545 def _make_server_key_args(server_key, host, port, args):
546     """ 
547     Returns a reference to the created temporary file, and adds the
548     corresponding arguments to the given argument list.
549     
550     Make sure to hold onto it until the process is done with the file
551     """
552     if port is not None:
553         host = '%s:%s' % (host,port)
554     # Create a temporary server key file
555     tmp_known_hosts = tempfile.NamedTemporaryFile()
556     
557     # Add the intended host key
558     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
559     
560     # If we're not in strict mode, add user-configured keys
561     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563         if os.access(user_hosts_path, os.R_OK):
564             f = open(user_hosts_path, "r")
565             tmp_known_hosts.write(f.read())
566             f.close()
567         
568     tmp_known_hosts.flush()
569     
570     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
571     
572     return tmp_known_hosts
573
574 def popen_ssh_command(command, host, port, user, agent, 
575         stdin="", 
576         ident_key = None,
577         server_key = None,
578         tty = False,
579         timeout = None,
580         retry = 0,
581         err_on_timeout = True):
582     """
583     Executes a remote commands, returns ((stdout,stderr),process)
584     """
585     if TRACE:
586         print "ssh", host, command
587     
588     tmp_known_hosts = None
589     args = ['ssh',
590             # Don't bother with localhost. Makes test easier
591             '-o', 'NoHostAuthenticationForLocalhost=yes',
592             '-l', user, host]
593     if agent:
594         args.append('-A')
595     if port:
596         args.append('-p%d' % port)
597     if ident_key:
598         args.extend(('-i', ident_key))
599     if tty:
600         args.append('-t')
601     if server_key:
602         # Create a temporary server key file
603         tmp_known_hosts = _make_server_key_args(
604             server_key, host, port, args)
605     args.append(command)
606
607     while 1:
608         # connects to the remote host and starts a remote connection
609         proc = subprocess.Popen(args, 
610                 stdout = subprocess.PIPE,
611                 stdin = subprocess.PIPE, 
612                 stderr = subprocess.PIPE)
613         
614         # attach tempfile object to the process, to make sure the file stays
615         # alive until the process is finished with it
616         proc._known_hosts = tmp_known_hosts
617         
618         try:
619             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
620             break
621         except RuntimeError,e:
622             if retry <= 0:
623                 raise
624             if TRACE:
625                 print " timedout -> ", e.args
626             retry -= 1
627         
628     if TRACE:
629         print " -> ", out, err
630
631     return ((out, err), proc)
632
633 def popen_scp(source, dest, 
634         port = None, 
635         agent = None, 
636         recursive = False,
637         ident_key = None,
638         server_key = None):
639     """
640     Copies from/to remote sites.
641     
642     Source and destination should have the user and host encoded
643     as per scp specs.
644     
645     If source is a file object, a special mode will be used to
646     create the remote file with the same contents.
647     
648     If dest is a file object, the remote file (source) will be
649     read and written into dest.
650     
651     In these modes, recursive cannot be True.
652     
653     Source can be a list of files to copy to a single destination,
654     in which case it is advised that the destination be a folder.
655     """
656     
657     if TRACE:
658         print "scp", source, dest
659     
660     if isinstance(source, file) and source.tell() == 0:
661         source = source.name
662     elif hasattr(source, 'read'):
663         tmp = tempfile.NamedTemporaryFile()
664         while True:
665             buf = source.read(65536)
666             if buf:
667                 tmp.write(buf)
668             else:
669                 break
670         tmp.seek(0)
671         source = tmp.name
672     
673     if isinstance(source, file) or isinstance(dest, file) \
674             or hasattr(source, 'read')  or hasattr(dest, 'write'):
675         assert not recursive
676         
677         # Parse source/destination as <user>@<server>:<path>
678         if isinstance(dest, basestring) and ':' in dest:
679             remspec, path = dest.split(':',1)
680         elif isinstance(source, basestring) and ':' in source:
681             remspec, path = source.split(':',1)
682         else:
683             raise ValueError, "Both endpoints cannot be local"
684         user,host = remspec.rsplit('@',1)
685         tmp_known_hosts = None
686         
687         args = ['ssh', '-l', user, '-C',
688                 # Don't bother with localhost. Makes test easier
689                 '-o', 'NoHostAuthenticationForLocalhost=yes',
690                 host ]
691         if port:
692             args.append('-P%d' % port)
693         if ident_key:
694             args.extend(('-i', ident_key))
695         if server_key:
696             # Create a temporary server key file
697             tmp_known_hosts = _make_server_key_args(
698                 server_key, host, port, args)
699         
700         if isinstance(source, file) or hasattr(source, 'read'):
701             args.append('cat > %s' % (shell_escape(path),))
702         elif isinstance(dest, file) or hasattr(dest, 'write'):
703             args.append('cat %s' % (shell_escape(path),))
704         else:
705             raise AssertionError, "Unreachable code reached! :-Q"
706         
707         # connects to the remote host and starts a remote connection
708         if isinstance(source, file):
709             proc = subprocess.Popen(args, 
710                     stdout = open('/dev/null','w'),
711                     stderr = subprocess.PIPE,
712                     stdin = source)
713             err = proc.stderr.read()
714             proc._known_hosts = tmp_known_hosts
715             eintr_retry(proc.wait)()
716             return ((None,err), proc)
717         elif isinstance(dest, file):
718             proc = subprocess.Popen(args, 
719                     stdout = open('/dev/null','w'),
720                     stderr = subprocess.PIPE,
721                     stdin = source)
722             err = proc.stderr.read()
723             proc._known_hosts = tmp_known_hosts
724             eintr_retry(proc.wait)()
725             return ((None,err), proc)
726         elif hasattr(source, 'read'):
727             # file-like (but not file) source
728             proc = subprocess.Popen(args, 
729                     stdout = open('/dev/null','w'),
730                     stderr = subprocess.PIPE,
731                     stdin = subprocess.PIPE)
732             
733             buf = None
734             err = []
735             while True:
736                 if not buf:
737                     buf = source.read(4096)
738                 if not buf:
739                     #EOF
740                     break
741                 
742                 rdrdy, wrdy, broken = select.select(
743                     [proc.stderr],
744                     [proc.stdin],
745                     [proc.stderr,proc.stdin])
746                 
747                 if proc.stderr in rdrdy:
748                     # use os.read for fully unbuffered behavior
749                     err.append(os.read(proc.stderr.fileno(), 4096))
750                 
751                 if proc.stdin in wrdy:
752                     proc.stdin.write(buf)
753                     buf = None
754                 
755                 if broken:
756                     break
757             proc.stdin.close()
758             err.append(proc.stderr.read())
759                 
760             proc._known_hosts = tmp_known_hosts
761             eintr_retry(proc.wait)()
762             return ((None,''.join(err)), proc)
763         elif hasattr(dest, 'write'):
764             # file-like (but not file) dest
765             proc = subprocess.Popen(args, 
766                     stdout = subprocess.PIPE,
767                     stderr = subprocess.PIPE,
768                     stdin = open('/dev/null','w'))
769             
770             buf = None
771             err = []
772             while True:
773                 rdrdy, wrdy, broken = select.select(
774                     [proc.stderr, proc.stdout],
775                     [],
776                     [proc.stderr, proc.stdout])
777                 
778                 if proc.stderr in rdrdy:
779                     # use os.read for fully unbuffered behavior
780                     err.append(os.read(proc.stderr.fileno(), 4096))
781                 
782                 if proc.stdout in rdrdy:
783                     # use os.read for fully unbuffered behavior
784                     buf = os.read(proc.stdout.fileno(), 4096)
785                     dest.write(buf)
786                     
787                     if not buf:
788                         #EOF
789                         break
790                 
791                 if broken:
792                     break
793             err.append(proc.stderr.read())
794                 
795             proc._known_hosts = tmp_known_hosts
796             eintr_retry(proc.wait)()
797             return ((None,''.join(err)), proc)
798         else:
799             raise AssertionError, "Unreachable code reached! :-Q"
800     else:
801         # Parse destination as <user>@<server>:<path>
802         if isinstance(dest, basestring) and ':' in dest:
803             remspec, path = dest.split(':',1)
804         elif isinstance(source, basestring) and ':' in source:
805             remspec, path = source.split(':',1)
806         else:
807             raise ValueError, "Both endpoints cannot be local"
808         user,host = remspec.rsplit('@',1)
809         
810         # plain scp
811         tmp_known_hosts = None
812         args = ['scp', '-q', '-p', '-C',
813                 # Don't bother with localhost. Makes test easier
814                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
815         if port:
816             args.append('-P%d' % port)
817         if recursive:
818             args.append('-r')
819         if ident_key:
820             args.extend(('-i', ident_key))
821         if server_key:
822             # Create a temporary server key file
823             tmp_known_hosts = _make_server_key_args(
824                 server_key, host, port, args)
825         if isinstance(source,list):
826             args.extend(source)
827         else:
828             args.append(source)
829         args.append(dest)
830
831         # connects to the remote host and starts a remote connection
832         proc = subprocess.Popen(args, 
833                 stdout = subprocess.PIPE,
834                 stdin = subprocess.PIPE, 
835                 stderr = subprocess.PIPE)
836         proc._known_hosts = tmp_known_hosts
837         
838         comm = proc.communicate()
839         eintr_retry(proc.wait)()
840         return (comm, proc)
841
842 def decode_and_execute():
843     # The python code we want to execute might have characters that 
844     # are not compatible with the 'inline' mode we are using. To avoid
845     # problems we receive the encoded python code in base64 as a input 
846     # stream and decode it for execution.
847     import base64, os
848     cmd = ""
849     while True:
850         try:
851             cmd += os.read(0, 1)# one byte from stdin
852         except OSError, e:            
853             if e.errno == errno.EINTR:
854                 continue
855             else:
856                 raise
857         if cmd[-1] == "\n": 
858             break
859     cmd = base64.b64decode(cmd)
860     # Uncomment for debug
861     #os.write(2, "Executing python code: %s\n" % cmd)
862     os.write(1, "OK\n") # send a sync message
863     exec(cmd)
864
865 def popen_python(python_code, 
866         communication = DC.ACCESS_LOCAL,
867         host = None, 
868         port = None, 
869         user = None, 
870         agent = False, 
871         python_path = None,
872         ident_key = None,
873         server_key = None,
874         tty = False,
875         sudo = False, 
876         environment_setup = ""):
877
878     cmd = ""
879     if python_path:
880         python_path.replace("'", r"'\''")
881         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
882         cmd += " ; "
883     if environment_setup:
884         cmd += environment_setup
885         cmd += " ; "
886     # Uncomment for debug (to run everything under strace)
887     # We had to verify if strace works (cannot nest them)
888     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
889     #cmd += "$CMD "
890     #cmd += "strace -f -tt -s 200 -o strace$$.out "
891     import nepi
892     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
893         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
894     )
895
896     if sudo:
897         if ';' in cmd:
898             cmd = "sudo bash -c " + shell_escape(cmd)
899         else:
900             cmd = "sudo " + cmd
901
902     if communication == DC.ACCESS_SSH:
903         tmp_known_hosts = None
904         args = ['ssh',
905                 # Don't bother with localhost. Makes test easier
906                 '-o', 'NoHostAuthenticationForLocalhost=yes',
907                 '-l', user, host]
908         if agent:
909             args.append('-A')
910         if port:
911             args.append('-p%d' % port)
912         if ident_key:
913             args.extend(('-i', ident_key))
914         if tty:
915             args.append('-t')
916         if server_key:
917             # Create a temporary server key file
918             tmp_known_hosts = _make_server_key_args(
919                 server_key, host, port, args)
920         args.append(cmd)
921     else:
922         args = [ "/bin/bash", "-c", cmd ]
923
924     # connects to the remote host and starts a remote
925     proc = subprocess.Popen(args,
926             shell = False, 
927             stdout = subprocess.PIPE,
928             stdin = subprocess.PIPE, 
929             stderr = subprocess.PIPE)
930
931     if communication == DC.ACCESS_SSH:
932         proc._known_hosts = tmp_known_hosts
933
934     # send the command to execute
935     os.write(proc.stdin.fileno(),
936             base64.b64encode(python_code) + "\n")
937  
938     while True: 
939         try:
940             msg = os.read(proc.stdout.fileno(), 3)
941             break
942         except OSError, e:            
943             if e.errno == errno.EINTR:
944                 continue
945             else:
946                 raise
947     
948     if msg != "OK\n":
949         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
950             msg, proc.stdout.read(), proc.stderr.read())
951
952     return proc
953
954 # POSIX
955 def _communicate(self, input, timeout=None, err_on_timeout=True):
956     read_set = []
957     write_set = []
958     stdout = None # Return
959     stderr = None # Return
960     
961     killed = False
962     
963     if timeout is not None:
964         timelimit = time.time() + timeout
965         killtime = timelimit + 4
966         bailtime = timelimit + 4
967
968     if self.stdin:
969         # Flush stdio buffer.  This might block, if the user has
970         # been writing to .stdin in an uncontrolled fashion.
971         self.stdin.flush()
972         if input:
973             write_set.append(self.stdin)
974         else:
975             self.stdin.close()
976     if self.stdout:
977         read_set.append(self.stdout)
978         stdout = []
979     if self.stderr:
980         read_set.append(self.stderr)
981         stderr = []
982
983     input_offset = 0
984     while read_set or write_set:
985         if timeout is not None:
986             curtime = time.time()
987             if timeout is None or curtime > timelimit:
988                 if curtime > bailtime:
989                     break
990                 elif curtime > killtime:
991                     signum = signal.SIGKILL
992                 else:
993                     signum = signal.SIGTERM
994                 # Lets kill it
995                 os.kill(self.pid, signum)
996                 select_timeout = 0.5
997             else:
998                 select_timeout = timelimit - curtime + 0.1
999         else:
1000             select_timeout = None
1001             
1002         try:
1003             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1004         except select.error,e:
1005             if e[0] != 4:
1006                 raise
1007             else:
1008                 continue
1009
1010         if self.stdin in wlist:
1011             # When select has indicated that the file is writable,
1012             # we can write up to PIPE_BUF bytes without risk
1013             # blocking.  POSIX defines PIPE_BUF >= 512
1014             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1015             input_offset += bytes_written
1016             if input_offset >= len(input):
1017                 self.stdin.close()
1018                 write_set.remove(self.stdin)
1019
1020         if self.stdout in rlist:
1021             data = os.read(self.stdout.fileno(), 1024)
1022             if data == "":
1023                 self.stdout.close()
1024                 read_set.remove(self.stdout)
1025             stdout.append(data)
1026
1027         if self.stderr in rlist:
1028             data = os.read(self.stderr.fileno(), 1024)
1029             if data == "":
1030                 self.stderr.close()
1031                 read_set.remove(self.stderr)
1032             stderr.append(data)
1033     
1034     # All data exchanged.  Translate lists into strings.
1035     if stdout is not None:
1036         stdout = ''.join(stdout)
1037     if stderr is not None:
1038         stderr = ''.join(stderr)
1039
1040     # Translate newlines, if requested.  We cannot let the file
1041     # object do the translation: It is based on stdio, which is
1042     # impossible to combine with select (unless forcing no
1043     # buffering).
1044     if self.universal_newlines and hasattr(file, 'newlines'):
1045         if stdout:
1046             stdout = self._translate_newlines(stdout)
1047         if stderr:
1048             stderr = self._translate_newlines(stderr)
1049
1050     if killed and err_on_timeout:
1051         errcode = self.poll()
1052         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1053     else:
1054         if killed:
1055             self.poll()
1056         else:
1057             self.wait()
1058         return (stdout, stderr)
1059