2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
36 if hasattr(os, "devnull"):
39 DEV_NULL = "/dev/null"
41 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 """ Escapes strings so that they are safe to use as command-line arguments """
45 if SHELL_SAFE.match(s):
46 # safe string - no escaping needed
49 # unsafe string - escape
51 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
54 return "'$'\\x%02x''" % (ord(c),)
55 s = ''.join(map(escp,s))
58 def eintr_retry(func):
60 @functools.wraps(func)
62 retry = kw.pop("_retry", False)
63 for i in xrange(0 if retry else 4):
66 except (select.error, socket.error), args:
67 if args[0] == errno.EINTR:
72 if e.errno == errno.EINTR:
81 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
82 self._root_dir = root_dir
84 self._ctrl_sock = None
85 self._log_level = log_level
87 self._environment_setup = environment_setup
96 # can not return normally after fork beacuse no exec was done.
97 # This means that if we don't do a os._exit(0) here the code that
98 # follows the call to "Server.run()" in the "caller code" will be
99 # executed... but by now it has already been executed after the
100 # first process (the one that did the first fork) returned.
103 print >>sys.stderr, "SERVER_ERROR."
107 print >>sys.stderr, "SERVER_READY."
110 # pipes for process synchronization
114 root = os.path.normpath(self._root_dir)
115 if not os.path.exists(root):
116 os.makedirs(root, 0755)
124 except OSError, e: # pragma: no cover
125 if e.errno == errno.EINTR:
131 # os.waitpid avoids leaving a <defunc> (zombie) process
132 st = os.waitpid(pid1, 0)[1]
134 raise RuntimeError("Daemonization failed")
135 # return 0 to inform the caller method that this is not the
140 # Decouple from parent environment.
141 os.chdir(self._root_dir)
148 # see ref: "os._exit(0)"
151 # close all open file descriptors.
152 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
153 if (max_fd == resource.RLIM_INFINITY):
155 for fd in range(3, max_fd):
162 # Redirect standard file descriptors.
163 stdin = open(DEV_NULL, "r")
164 stderr = stdout = open(STD_ERR, "a", 0)
165 os.dup2(stdin.fileno(), sys.stdin.fileno())
166 # NOTE: sys.stdout.write will still be buffered, even if the file
167 # was opened with 0 buffer
168 os.dup2(stdout.fileno(), sys.stdout.fileno())
169 os.dup2(stderr.fileno(), sys.stderr.fileno())
172 if self._environment_setup:
173 # parse environment variables and pass to child process
174 # do it by executing shell commands, in case there's some heavy setup involved
175 envproc = subprocess.Popen(
177 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
178 ( self._environment_setup, ) ],
179 stdin = subprocess.PIPE,
180 stdout = subprocess.PIPE,
181 stderr = subprocess.PIPE
183 out,err = envproc.communicate()
185 # parse new environment
187 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
189 # apply to current environment
190 for name, value in environment.iteritems():
191 os.environ[name] = value
194 if 'PYTHONPATH' in environment:
195 sys.path = environment['PYTHONPATH'].split(':') + sys.path
197 # create control socket
198 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
199 self._ctrl_sock.bind(CTRL_SOCK)
200 self._ctrl_sock.listen(0)
202 # let the parent process know that the daemonization is finished
207 def post_daemonize(self):
208 # QT, for some strange reason, redefines the SIGCHILD handler to write
209 # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
210 # Server dameonization closes all file descriptors from fileno '3',
211 # but the overloaded handler (inherited by the forked process) will
212 # keep trying to write the \0 to fileno 'x', which might have been reused
213 # after closing, for other operations. This is bad bad bad when fileno 'x'
214 # is in use for communication pouroses, because unexpected \0 start
215 # appearing in the communication messages... this is exactly what happens
216 # when using netns in daemonized form. Thus, be have no other alternative than
217 # restoring the SIGCHLD handler to the default here.
219 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
222 while not self._stop:
223 conn, addr = self._ctrl_sock.accept()
224 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
226 while not self._stop:
228 msg = self.recv_msg(conn)
229 except socket.timeout, e:
234 self.log_error("CONNECTION LOST")
239 reply = self.stop_action()
241 reply = self.reply_action(msg)
244 self.send_reply(conn, reply)
247 self.log_error("NOTICE: Awaiting for reconnection")
255 def recv_msg(self, conn):
258 while '\n' not in chunk:
260 chunk = conn.recv(1024)
261 except (OSError, socket.error), e:
262 if e[0] != errno.EINTR:
271 data = ''.join(data).split('\n',1)
274 data, self._rdbuf = data
276 decoded = base64.b64decode(data)
277 return decoded.rstrip()
279 def send_reply(self, conn, reply):
280 encoded = base64.b64encode(reply)
281 conn.send("%s\n" % encoded)
285 self._ctrl_sock.close()
290 def stop_action(self):
291 return "Stopping server"
293 def reply_action(self, msg):
294 return "Reply to: %s" % msg
296 def log_error(self, text = None, context = ''):
298 text = traceback.format_exc()
299 date = time.strftime("%Y-%m-%d %H:%M:%S")
301 context = " (%s)" % (context,)
302 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
305 def log_debug(self, text):
306 if self._log_level == DEBUG_LEVEL:
307 date = time.strftime("%Y-%m-%d %H:%M:%S")
308 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
310 class Forwarder(object):
311 def __init__(self, root_dir = "."):
312 self._ctrl_sock = None
313 self._root_dir = root_dir
319 print >>sys.stderr, "FORWARDER_READY."
320 while not self._stop:
321 data = self.read_data()
323 # Connection to client lost
325 self.send_to_server(data)
327 data = self.recv_from_server()
329 # Connection to server lost
330 raise IOError, "Connection to server lost while "\
332 self.write_data(data)
336 return sys.stdin.readline()
338 def write_data(self, data):
339 sys.stdout.write(data)
340 # sys.stdout.write is buffered, this is why we need to do a flush()
343 def send_to_server(self, data):
345 self._ctrl_sock.send(data)
346 except (IOError, socket.error), e:
347 if e[0] == errno.EPIPE:
349 self._ctrl_sock.send(data)
352 encoded = data.rstrip()
353 msg = base64.b64decode(encoded)
357 def recv_from_server(self):
360 while '\n' not in chunk:
362 chunk = self._ctrl_sock.recv(1024)
363 except (OSError, socket.error), e:
364 if e[0] != errno.EINTR:
372 data = ''.join(data).split('\n',1)
375 data, self._rdbuf = data
381 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
382 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
383 self._ctrl_sock.connect(sock_addr)
385 def disconnect(self):
387 self._ctrl_sock.close()
391 class Client(object):
392 def __init__(self, root_dir = ".", host = None, port = None, user = None,
393 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
394 environment_setup = ""):
395 self.root_dir = root_dir
396 self.addr = (host, port)
400 self.communication = communication
401 self.environment_setup = environment_setup
402 self._stopped = False
403 self._deferreds = collections.deque()
407 if self._process.poll() is None:
408 os.kill(self._process.pid, signal.SIGTERM)
412 root_dir = self.root_dir
413 (host, port) = self.addr
417 communication = self.communication
419 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
420 c.forward()" % (root_dir,)
422 self._process = popen_python(python_code,
423 communication = communication,
429 environment_setup = self.environment_setup)
431 # Wait for the forwarder to be ready, otherwise nobody
432 # will be able to connect to it
433 helo = self._process.stderr.readline()
434 if helo != 'FORWARDER_READY.\n':
435 raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
436 helo + self._process.stderr.read())
438 def send_msg(self, msg):
439 encoded = base64.b64encode(msg)
440 data = "%s\n" % encoded
443 self._process.stdin.write(data)
444 except (IOError, ValueError):
445 # dead process, poll it to un-zombify
448 # try again after reconnect
449 # If it fails again, though, give up
451 self._process.stdin.write(data)
454 self.send_msg(STOP_MSG)
457 def defer_reply(self, transform=None):
459 self._deferreds.append(defer_entry)
461 functools.partial(self.read_reply, defer_entry, transform)
464 def _read_reply(self):
465 data = self._process.stdout.readline()
466 encoded = data.rstrip()
468 # empty == eof == dead process, poll it to un-zombify
471 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
472 return base64.b64decode(encoded)
474 def read_reply(self, which=None, transform=None):
475 # Test to see if someone did it already
476 if which is not None and len(which):
478 # ...just return the deferred value
480 return transform(which[0])
484 # Process all deferreds until the one we're looking for
485 # or until the queue is empty
486 while self._deferreds:
488 deferred = self._deferreds.popleft()
493 deferred.append(self._read_reply())
494 if deferred is which:
495 # We reached the one we were looking for
497 return transform(deferred[0])
502 # They've requested a synchronous read
504 return transform(self._read_reply())
506 return self._read_reply()
508 def _make_server_key_args(server_key, host, port, args):
510 Returns a reference to the created temporary file, and adds the
511 corresponding arguments to the given argument list.
513 Make sure to hold onto it until the process is done with the file
516 host = '%s:%s' % (host,port)
517 # Create a temporary server key file
518 tmp_known_hosts = tempfile.NamedTemporaryFile()
520 # Add the intended host key
521 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
523 # If we're not in strict mode, add user-configured keys
524 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
525 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
526 if os.access(user_hosts_path, os.R_OK):
527 f = open(user_hosts_path, "r")
528 tmp_known_hosts.write(f.read())
531 tmp_known_hosts.flush()
533 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
535 return tmp_known_hosts
537 def popen_ssh_command(command, host, port, user, agent,
544 err_on_timeout = True):
546 Executes a remote commands, returns ((stdout,stderr),process)
549 print "ssh", host, command
551 tmp_known_hosts = None
553 # Don't bother with localhost. Makes test easier
554 '-o', 'NoHostAuthenticationForLocalhost=yes',
559 args.append('-p%d' % port)
561 args.extend(('-i', ident_key))
565 # Create a temporary server key file
566 tmp_known_hosts = _make_server_key_args(
567 server_key, host, port, args)
571 # connects to the remote host and starts a remote connection
572 proc = subprocess.Popen(args,
573 stdout = subprocess.PIPE,
574 stdin = subprocess.PIPE,
575 stderr = subprocess.PIPE)
577 # attach tempfile object to the process, to make sure the file stays
578 # alive until the process is finished with it
579 proc._known_hosts = tmp_known_hosts
582 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
584 except RuntimeError,e:
588 print " timedout -> ", e.args
592 print " -> ", out, err
594 return ((out, err), proc)
596 def popen_scp(source, dest,
603 Copies from/to remote sites.
605 Source and destination should have the user and host encoded
608 If source is a file object, a special mode will be used to
609 create the remote file with the same contents.
611 If dest is a file object, the remote file (source) will be
612 read and written into dest.
614 In these modes, recursive cannot be True.
616 Source can be a list of files to copy to a single destination,
617 in which case it is advised that the destination be a folder.
621 print "scp", source, dest
623 if isinstance(source, file) and source.tell() == 0:
625 elif hasattr(source, 'read'):
626 tmp = tempfile.NamedTemporaryFile()
628 buf = source.read(65536)
636 if isinstance(source, file) or isinstance(dest, file) \
637 or hasattr(source, 'read') or hasattr(dest, 'write'):
640 # Parse source/destination as <user>@<server>:<path>
641 if isinstance(dest, basestring) and ':' in dest:
642 remspec, path = dest.split(':',1)
643 elif isinstance(source, basestring) and ':' in source:
644 remspec, path = source.split(':',1)
646 raise ValueError, "Both endpoints cannot be local"
647 user,host = remspec.rsplit('@',1)
648 tmp_known_hosts = None
650 args = ['ssh', '-l', user, '-C',
651 # Don't bother with localhost. Makes test easier
652 '-o', 'NoHostAuthenticationForLocalhost=yes',
655 args.append('-P%d' % port)
657 args.extend(('-i', ident_key))
659 # Create a temporary server key file
660 tmp_known_hosts = _make_server_key_args(
661 server_key, host, port, args)
663 if isinstance(source, file) or hasattr(source, 'read'):
664 args.append('cat > %s' % (shell_escape(path),))
665 elif isinstance(dest, file) or hasattr(dest, 'write'):
666 args.append('cat %s' % (shell_escape(path),))
668 raise AssertionError, "Unreachable code reached! :-Q"
670 # connects to the remote host and starts a remote connection
671 if isinstance(source, file):
672 proc = subprocess.Popen(args,
673 stdout = open('/dev/null','w'),
674 stderr = subprocess.PIPE,
676 err = proc.stderr.read()
677 proc._known_hosts = tmp_known_hosts
678 eintr_retry(proc.wait)()
679 return ((None,err), proc)
680 elif isinstance(dest, file):
681 proc = subprocess.Popen(args,
682 stdout = open('/dev/null','w'),
683 stderr = subprocess.PIPE,
685 err = proc.stderr.read()
686 proc._known_hosts = tmp_known_hosts
687 eintr_retry(proc.wait)()
688 return ((None,err), proc)
689 elif hasattr(source, 'read'):
690 # file-like (but not file) source
691 proc = subprocess.Popen(args,
692 stdout = open('/dev/null','w'),
693 stderr = subprocess.PIPE,
694 stdin = subprocess.PIPE)
700 buf = source.read(4096)
705 rdrdy, wrdy, broken = select.select(
708 [proc.stderr,proc.stdin])
710 if proc.stderr in rdrdy:
711 # use os.read for fully unbuffered behavior
712 err.append(os.read(proc.stderr.fileno(), 4096))
714 if proc.stdin in wrdy:
715 proc.stdin.write(buf)
721 err.append(proc.stderr.read())
723 proc._known_hosts = tmp_known_hosts
724 eintr_retry(proc.wait)()
725 return ((None,''.join(err)), proc)
726 elif hasattr(dest, 'write'):
727 # file-like (but not file) dest
728 proc = subprocess.Popen(args,
729 stdout = subprocess.PIPE,
730 stderr = subprocess.PIPE,
731 stdin = open('/dev/null','w'))
736 rdrdy, wrdy, broken = select.select(
737 [proc.stderr, proc.stdout],
739 [proc.stderr, proc.stdout])
741 if proc.stderr in rdrdy:
742 # use os.read for fully unbuffered behavior
743 err.append(os.read(proc.stderr.fileno(), 4096))
745 if proc.stdout in rdrdy:
746 # use os.read for fully unbuffered behavior
747 buf = os.read(proc.stdout.fileno(), 4096)
756 err.append(proc.stderr.read())
758 proc._known_hosts = tmp_known_hosts
759 eintr_retry(proc.wait)()
760 return ((None,''.join(err)), proc)
762 raise AssertionError, "Unreachable code reached! :-Q"
764 # Parse destination as <user>@<server>:<path>
765 if isinstance(dest, basestring) and ':' in dest:
766 remspec, path = dest.split(':',1)
767 elif isinstance(source, basestring) and ':' in source:
768 remspec, path = source.split(':',1)
770 raise ValueError, "Both endpoints cannot be local"
771 user,host = remspec.rsplit('@',1)
774 tmp_known_hosts = None
775 args = ['scp', '-q', '-p', '-C',
776 # Don't bother with localhost. Makes test easier
777 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
779 args.append('-P%d' % port)
783 args.extend(('-i', ident_key))
785 # Create a temporary server key file
786 tmp_known_hosts = _make_server_key_args(
787 server_key, host, port, args)
788 if isinstance(source,list):
794 # connects to the remote host and starts a remote connection
795 proc = subprocess.Popen(args,
796 stdout = subprocess.PIPE,
797 stdin = subprocess.PIPE,
798 stderr = subprocess.PIPE)
799 proc._known_hosts = tmp_known_hosts
801 comm = proc.communicate()
802 eintr_retry(proc.wait)()
805 def decode_and_execute():
806 # The python code we want to execute might have characters that
807 # are not compatible with the 'inline' mode we are using. To avoid
808 # problems we receive the encoded python code in base64 as a input
809 # stream and decode it for execution.
813 cmd += os.read(0, 1)# one byte from stdin
816 cmd = base64.b64decode(cmd)
817 # Uncomment for debug
818 #os.write(2, "Executing python code: %s\n" % cmd)
819 os.write(1, "OK\n") # send a sync message
822 def popen_python(python_code,
823 communication = DC.ACCESS_LOCAL,
833 environment_setup = ""):
840 python_path.replace("'", r"'\''")
841 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
843 if environment_setup:
844 cmd += environment_setup
846 # Uncomment for debug (to run everything under strace)
847 # We had to verify if strace works (cannot nest them)
848 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
850 #cmd += "strace -f -tt -s 200 -o strace$$.out "
852 cmd += "python -c 'import sys; sys.path.append(%s); from nepi.util import server; server.decode_and_execute()'" % (
853 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
856 if communication == DC.ACCESS_SSH:
857 tmp_known_hosts = None
859 # Don't bother with localhost. Makes test easier
860 '-o', 'NoHostAuthenticationForLocalhost=yes',
865 args.append('-p%d' % port)
867 args.extend(('-i', ident_key))
871 # Create a temporary server key file
872 tmp_known_hosts = _make_server_key_args(
873 server_key, host, port, args)
879 # connects to the remote host and starts a remote
880 proc = subprocess.Popen(args,
882 stdout = subprocess.PIPE,
883 stdin = subprocess.PIPE,
884 stderr = subprocess.PIPE)
886 if communication == DC.ACCESS_SSH:
887 proc._known_hosts = tmp_known_hosts
889 # send the command to execute
890 os.write(proc.stdin.fileno(),
891 base64.b64encode(python_code) + "\n")
892 msg = os.read(proc.stdout.fileno(), 3)
894 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
895 msg, proc.stdout.read(), proc.stderr.read())
900 def _communicate(self, input, timeout=None, err_on_timeout=True):
903 stdout = None # Return
904 stderr = None # Return
908 if timeout is not None:
909 timelimit = time.time() + timeout
910 killtime = timelimit + 4
911 bailtime = timelimit + 4
914 # Flush stdio buffer. This might block, if the user has
915 # been writing to .stdin in an uncontrolled fashion.
918 write_set.append(self.stdin)
922 read_set.append(self.stdout)
925 read_set.append(self.stderr)
929 while read_set or write_set:
930 if timeout is not None:
931 curtime = time.time()
932 if timeout is None or curtime > timelimit:
933 if curtime > bailtime:
935 elif curtime > killtime:
936 signum = signal.SIGKILL
938 signum = signal.SIGTERM
940 os.kill(self.pid, signum)
943 select_timeout = timelimit - curtime + 0.1
945 select_timeout = None
948 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
949 except select.error,e:
955 if self.stdin in wlist:
956 # When select has indicated that the file is writable,
957 # we can write up to PIPE_BUF bytes without risk
958 # blocking. POSIX defines PIPE_BUF >= 512
959 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
960 input_offset += bytes_written
961 if input_offset >= len(input):
963 write_set.remove(self.stdin)
965 if self.stdout in rlist:
966 data = os.read(self.stdout.fileno(), 1024)
969 read_set.remove(self.stdout)
972 if self.stderr in rlist:
973 data = os.read(self.stderr.fileno(), 1024)
976 read_set.remove(self.stderr)
979 # All data exchanged. Translate lists into strings.
980 if stdout is not None:
981 stdout = ''.join(stdout)
982 if stderr is not None:
983 stderr = ''.join(stderr)
985 # Translate newlines, if requested. We cannot let the file
986 # object do the translation: It is based on stdio, which is
987 # impossible to combine with select (unless forcing no
989 if self.universal_newlines and hasattr(file, 'newlines'):
991 stdout = self._translate_newlines(stdout)
993 stderr = self._translate_newlines(stderr)
995 if killed and err_on_timeout:
996 errcode = self.poll()
997 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1003 return (stdout, stderr)