2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
36 if hasattr(os, "devnull"):
39 DEV_NULL = "/dev/null"
41 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 """ Escapes strings so that they are safe to use as command-line arguments """
45 if SHELL_SAFE.match(s):
46 # safe string - no escaping needed
49 # unsafe string - escape
51 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
54 return "'$'\\x%02x''" % (ord(c),)
55 s = ''.join(map(escp,s))
58 def eintr_retry(func):
60 @functools.wraps(func)
62 retry = kw.pop("_retry", False)
63 for i in xrange(0 if retry else 4):
66 except (select.error, socket.error), args:
67 if args[0] == errno.EINTR:
72 if e.errno == errno.EINTR:
81 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
82 self._root_dir = root_dir
84 self._ctrl_sock = None
85 self._log_level = log_level
87 self._environment_setup = environment_setup
96 # cannot return normally after fork because no exec was done.
97 # This means that if we don't do a os._exit(0) here the code that
98 # follows the call to "Server.run()" in the "caller code" will be
99 # executed... but by now it has already been executed after the
100 # first process (the one that did the first fork) returned.
103 print >>sys.stderr, "SERVER_ERROR."
107 print >>sys.stderr, "SERVER_READY."
110 # pipes for process synchronization
114 root = os.path.normpath(self._root_dir)
115 if not os.path.exists(root):
116 os.makedirs(root, 0755)
124 except OSError, e: # pragma: no cover
125 if e.errno == errno.EINTR:
131 # os.waitpid avoids leaving a <defunct> (zombie) process
132 st = os.waitpid(pid1, 0)[1]
134 raise RuntimeError("Daemonization failed")
135 # return 0 to inform the caller method that this is not the
140 # Decouple from parent environment.
141 os.chdir(self._root_dir)
148 # see ref: "os._exit(0)"
151 # close all open file descriptors.
152 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
153 if (max_fd == resource.RLIM_INFINITY):
155 for fd in range(3, max_fd):
162 # Redirect standard file descriptors.
163 stdin = open(DEV_NULL, "r")
164 stderr = stdout = open(STD_ERR, "a", 0)
165 os.dup2(stdin.fileno(), sys.stdin.fileno())
166 # NOTE: sys.stdout.write will still be buffered, even if the file
167 # was opened with 0 buffer
168 os.dup2(stdout.fileno(), sys.stdout.fileno())
169 os.dup2(stderr.fileno(), sys.stderr.fileno())
172 if self._environment_setup:
173 # parse environment variables and pass to child process
174 # do it by executing shell commands, in case there's some heavy setup involved
175 envproc = subprocess.Popen(
177 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
178 ( self._environment_setup, ) ],
179 stdin = subprocess.PIPE,
180 stdout = subprocess.PIPE,
181 stderr = subprocess.PIPE
183 out,err = envproc.communicate()
185 # parse new environment
187 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
189 # apply to current environment
190 for name, value in environment.iteritems():
191 os.environ[name] = value
194 if 'PYTHONPATH' in environment:
195 sys.path = environment['PYTHONPATH'].split(':') + sys.path
197 # create control socket
198 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
199 self._ctrl_sock.bind(CTRL_SOCK)
200 self._ctrl_sock.listen(0)
202 # let the parent process know that the daemonization is finished
207 def post_daemonize(self):
208 # QT, for some strange reason, redefines the SIGCHLD handler to write
209 # a \0 to a fd (let's say fileno 'x'), whenever a SIGCHLD is received.
210 # Server daemonization closes all file descriptors from fileno '3',
211 # but the overloaded handler (inherited by the forked process) will
212 # keep trying to write the \0 to fileno 'x', which might have been reused
213 # after closing, for other operations. This is bad bad bad when fileno 'x'
214 # is in use for communication purposes, because unexpected \0 start
215 # appearing in the communication messages... this is exactly what happens
216 # when using netns in daemonized form. Thus, we have no other alternative than
217 # restoring the SIGCHLD handler to the default here.
219 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
222 while not self._stop:
223 conn, addr = self._ctrl_sock.accept()
224 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
226 while not self._stop:
228 msg = self.recv_msg(conn)
229 except socket.timeout, e:
234 self.log_error("CONNECTION LOST")
239 reply = self.stop_action()
241 reply = self.reply_action(msg)
244 self.send_reply(conn, reply)
247 self.log_error("NOTICE: Awaiting for reconnection")
255 def recv_msg(self, conn):
258 while '\n' not in chunk:
260 chunk = conn.recv(1024)
261 except (OSError, socket.error), e:
262 if e[0] != errno.EINTR:
271 data = ''.join(data).split('\n',1)
274 data, self._rdbuf = data
276 decoded = base64.b64decode(data)
277 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Base64-encode ``reply`` and send it over ``conn`` as a single line.

    The encoded payload is terminated with a newline; the readers in this
    file accumulate received data until they see a '\\n', so the newline
    acts as the message delimiter.

    :param conn: connected socket-like object exposing ``sendall``.
    :param reply: the raw reply string to encode and transmit.
    :raises socket.error: propagated from the socket if the write fails.
    """
    encoded = base64.b64encode(reply)
    # send() may write only part of the buffer on a stream socket, which
    # would leave the peer waiting forever for the terminating newline.
    # sendall() keeps writing until everything is sent or an error occurs.
    conn.sendall("%s\n" % encoded)
285 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text produced by the stop hook.

    The server loop stores the value returned here as the reply it
    sends back to the peer.
    """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Build the default reply for a received message.

    :param msg: the message received from the peer.
    :returns: the string ``"Reply to: <msg>"``.
    """
    # Wrap the operand in a 1-tuple (the idiom used throughout this file)
    # so that a tuple-valued msg is formatted as a whole instead of being
    # unpacked as the format arguments, which would raise TypeError.
    return "Reply to: %s" % (msg,)
296 def log_error(self, text = None, context = ''):
298 text = traceback.format_exc()
299 date = time.strftime("%Y-%m-%d %H:%M:%S")
301 context = " (%s)" % (context,)
302 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry to stderr.

    Nothing is written unless the instance's log level equals
    DEBUG_LEVEL.

    :param text: the message body, written on the line after the timestamp.
    """
    if self._log_level != DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (stamp, text)
    sys.stderr.write(entry)
310 class Forwarder(object):
311 def __init__(self, root_dir = "."):
312 self._ctrl_sock = None
313 self._root_dir = root_dir
319 print >>sys.stderr, "FORWARDER_READY."
320 while not self._stop:
321 data = self.read_data()
323 # Connection to client lost
325 self.send_to_server(data)
327 data = self.recv_from_server()
329 # Connection to server lost
330 raise IOError, "Connection to server lost while "\
332 self.write_data(data)
336 return sys.stdin.readline()
338 def write_data(self, data):
339 sys.stdout.write(data)
340 # sys.stdout.write is buffered, this is why we need to do a flush()
343 def send_to_server(self, data):
345 self._ctrl_sock.send(data)
346 except (IOError, socket.error), e:
347 if e[0] == errno.EPIPE:
349 self._ctrl_sock.send(data)
352 encoded = data.rstrip()
353 msg = base64.b64decode(encoded)
357 def recv_from_server(self):
360 while '\n' not in chunk:
362 chunk = self._ctrl_sock.recv(1024)
363 except (OSError, socket.error), e:
364 if e[0] != errno.EINTR:
372 data = ''.join(data).split('\n',1)
375 data, self._rdbuf = data
381 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
382 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
383 self._ctrl_sock.connect(sock_addr)
385 def disconnect(self):
387 self._ctrl_sock.close()
391 class Client(object):
392 def __init__(self, root_dir = ".", host = None, port = None, user = None,
393 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
394 environment_setup = ""):
395 self.root_dir = root_dir
396 self.addr = (host, port)
400 self.communication = communication
401 self.environment_setup = environment_setup
402 self._stopped = False
403 self._deferreds = collections.deque()
407 if self._process.poll() is None:
408 os.kill(self._process.pid, signal.SIGTERM)
412 root_dir = self.root_dir
413 (host, port) = self.addr
417 communication = self.communication
419 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
420 c.forward()" % (root_dir,)
422 self._process = popen_python(python_code,
423 communication = communication,
429 environment_setup = self.environment_setup)
431 # Wait for the forwarder to be ready, otherwise nobody
432 # will be able to connect to it
433 helo = self._process.stderr.readline()
434 if helo != 'FORWARDER_READY.\n':
435 raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
436 helo + self._process.stderr.read())
438 def send_msg(self, msg):
439 encoded = base64.b64encode(msg)
440 data = "%s\n" % encoded
443 self._process.stdin.write(data)
444 except (IOError, ValueError):
445 # dead process, poll it to un-zombify
448 # try again after reconnect
449 # If it fails again, though, give up
451 self._process.stdin.write(data)
454 self.send_msg(STOP_MSG)
457 def defer_reply(self, transform=None):
459 self._deferreds.append(defer_entry)
461 functools.partial(self.read_reply, defer_entry, transform)
464 def _read_reply(self):
465 data = self._process.stdout.readline()
466 encoded = data.rstrip()
468 # empty == eof == dead process, poll it to un-zombify
471 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
472 return base64.b64decode(encoded)
474 def read_reply(self, which=None, transform=None):
475 # Test to see if someone did it already
476 if which is not None and len(which):
478 # ...just return the deferred value
480 return transform(which[0])
484 # Process all deferreds until the one we're looking for
485 # or until the queue is empty
486 while self._deferreds:
488 deferred = self._deferreds.popleft()
493 deferred.append(self._read_reply())
494 if deferred is which:
495 # We reached the one we were looking for
497 return transform(deferred[0])
502 # They've requested a synchronous read
504 return transform(self._read_reply())
506 return self._read_reply()
508 def _make_server_key_args(server_key, host, port, args):
510 Returns a reference to the created temporary file, and adds the
511 corresponding arguments to the given argument list.
513 Make sure to hold onto it until the process is done with the file
516 host = '%s:%s' % (host,port)
517 # Create a temporary server key file
518 tmp_known_hosts = tempfile.NamedTemporaryFile()
520 # Add the intended host key
521 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
523 # If we're not in strict mode, add user-configured keys
524 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
525 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
526 if os.access(user_hosts_path, os.R_OK):
527 f = open(user_hosts_path, "r")
528 tmp_known_hosts.write(f.read())
531 tmp_known_hosts.flush()
533 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
535 return tmp_known_hosts
537 def popen_ssh_command(command, host, port, user, agent,
544 err_on_timeout = True):
546 Executes a remote command, returns ((stdout,stderr),process)
549 print "ssh", host, command
551 tmp_known_hosts = None
553 # Don't bother with localhost. Makes test easier
554 '-o', 'NoHostAuthenticationForLocalhost=yes',
559 args.append('-p%d' % port)
561 args.extend(('-i', ident_key))
565 # Create a temporary server key file
566 tmp_known_hosts = _make_server_key_args(
567 server_key, host, port, args)
571 # connects to the remote host and starts a remote connection
572 proc = subprocess.Popen(args,
573 stdout = subprocess.PIPE,
574 stdin = subprocess.PIPE,
575 stderr = subprocess.PIPE)
577 # attach tempfile object to the process, to make sure the file stays
578 # alive until the process is finished with it
579 proc._known_hosts = tmp_known_hosts
582 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
584 except RuntimeError,e:
588 print " timedout -> ", e.args
592 print " -> ", out, err
594 return ((out, err), proc)
596 def popen_scp(source, dest,
603 Copies from/to remote sites.
605 Source and destination should have the user and host encoded
608 If source is a file object, a special mode will be used to
609 create the remote file with the same contents.
611 If dest is a file object, the remote file (source) will be
612 read and written into dest.
614 In these modes, recursive cannot be True.
616 Source can be a list of files to copy to a single destination,
617 in which case it is advised that the destination be a folder.
621 print "scp", source, dest
623 if isinstance(source, file) and source.tell() == 0:
625 elif hasattr(source, 'read'):
626 tmp = tempfile.NamedTemporaryFile()
628 buf = source.read(65536)
636 if isinstance(source, file) or isinstance(dest, file) \
637 or hasattr(source, 'read') or hasattr(dest, 'write'):
640 # Parse source/destination as <user>@<server>:<path>
641 if isinstance(dest, basestring) and ':' in dest:
642 remspec, path = dest.split(':',1)
643 elif isinstance(source, basestring) and ':' in source:
644 remspec, path = source.split(':',1)
646 raise ValueError, "Both endpoints cannot be local"
647 user,host = remspec.rsplit('@',1)
648 tmp_known_hosts = None
650 args = ['ssh', '-l', user, '-C',
651 # Don't bother with localhost. Makes test easier
652 '-o', 'NoHostAuthenticationForLocalhost=yes',
655 args.append('-P%d' % port)
657 args.extend(('-i', ident_key))
659 # Create a temporary server key file
660 tmp_known_hosts = _make_server_key_args(
661 server_key, host, port, args)
663 if isinstance(source, file) or hasattr(source, 'read'):
664 args.append('cat > %s' % (shell_escape(path),))
665 elif isinstance(dest, file) or hasattr(dest, 'write'):
666 args.append('cat %s' % (shell_escape(path),))
668 raise AssertionError, "Unreachable code reached! :-Q"
670 # connects to the remote host and starts a remote connection
671 if isinstance(source, file):
672 proc = subprocess.Popen(args,
673 stdout = open('/dev/null','w'),
674 stderr = subprocess.PIPE,
676 err = proc.stderr.read()
677 proc._known_hosts = tmp_known_hosts
678 eintr_retry(proc.wait)()
679 return ((None,err), proc)
680 elif isinstance(dest, file):
681 proc = subprocess.Popen(args,
682 stdout = open('/dev/null','w'),
683 stderr = subprocess.PIPE,
685 err = proc.stderr.read()
686 proc._known_hosts = tmp_known_hosts
687 eintr_retry(proc.wait)()
688 return ((None,err), proc)
689 elif hasattr(source, 'read'):
690 # file-like (but not file) source
691 proc = subprocess.Popen(args,
692 stdout = open('/dev/null','w'),
693 stderr = subprocess.PIPE,
694 stdin = subprocess.PIPE)
700 buf = source.read(4096)
705 rdrdy, wrdy, broken = select.select(
708 [proc.stderr,proc.stdin])
710 if proc.stderr in rdrdy:
711 # use os.read for fully unbuffered behavior
712 err.append(os.read(proc.stderr.fileno(), 4096))
714 if proc.stdin in wrdy:
715 proc.stdin.write(buf)
721 err.append(proc.stderr.read())
723 proc._known_hosts = tmp_known_hosts
724 eintr_retry(proc.wait)()
725 return ((None,''.join(err)), proc)
726 elif hasattr(dest, 'write'):
727 # file-like (but not file) dest
728 proc = subprocess.Popen(args,
729 stdout = subprocess.PIPE,
730 stderr = subprocess.PIPE,
731 stdin = open('/dev/null','w'))
736 rdrdy, wrdy, broken = select.select(
737 [proc.stderr, proc.stdout],
739 [proc.stderr, proc.stdout])
741 if proc.stderr in rdrdy:
742 # use os.read for fully unbuffered behavior
743 err.append(os.read(proc.stderr.fileno(), 4096))
745 if proc.stdout in rdrdy:
746 # use os.read for fully unbuffered behavior
747 buf = os.read(proc.stdout.fileno(), 4096)
756 err.append(proc.stderr.read())
758 proc._known_hosts = tmp_known_hosts
759 eintr_retry(proc.wait)()
760 return ((None,''.join(err)), proc)
762 raise AssertionError, "Unreachable code reached! :-Q"
764 # Parse destination as <user>@<server>:<path>
765 if isinstance(dest, basestring) and ':' in dest:
766 remspec, path = dest.split(':',1)
767 elif isinstance(source, basestring) and ':' in source:
768 remspec, path = source.split(':',1)
770 raise ValueError, "Both endpoints cannot be local"
771 user,host = remspec.rsplit('@',1)
774 tmp_known_hosts = None
775 args = ['scp', '-q', '-p', '-C',
776 # Don't bother with localhost. Makes test easier
777 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
779 args.append('-P%d' % port)
783 args.extend(('-i', ident_key))
785 # Create a temporary server key file
786 tmp_known_hosts = _make_server_key_args(
787 server_key, host, port, args)
788 if isinstance(source,list):
794 # connects to the remote host and starts a remote connection
795 proc = subprocess.Popen(args,
796 stdout = subprocess.PIPE,
797 stdin = subprocess.PIPE,
798 stderr = subprocess.PIPE)
799 proc._known_hosts = tmp_known_hosts
801 comm = proc.communicate()
802 eintr_retry(proc.wait)()
805 def decode_and_execute():
806 # The python code we want to execute might have characters that
807 # are not compatible with the 'inline' mode we are using. To avoid
808 # problems we receive the encoded python code in base64 as a input
809 # stream and decode it for execution.
814 cmd += os.read(0, 1)# one byte from stdin
816 if e.errno == errno.EINTR:
822 cmd = base64.b64decode(cmd)
823 # Uncomment for debug
824 #os.write(2, "Executing python code: %s\n" % cmd)
825 os.write(1, "OK\n") # send a sync message
828 def popen_python(python_code,
829 communication = DC.ACCESS_LOCAL,
839 environment_setup = ""):
846 python_path.replace("'", r"'\''")
847 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
849 if environment_setup:
850 cmd += environment_setup
852 # Uncomment for debug (to run everything under strace)
853 # We had to verify if strace works (cannot nest them)
854 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
856 #cmd += "strace -f -tt -s 200 -o strace$$.out "
858 cmd += "python -c 'import sys; sys.path.append(%s); from nepi.util import server; server.decode_and_execute()'" % (
859 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
862 if communication == DC.ACCESS_SSH:
863 tmp_known_hosts = None
865 # Don't bother with localhost. Makes test easier
866 '-o', 'NoHostAuthenticationForLocalhost=yes',
871 args.append('-p%d' % port)
873 args.extend(('-i', ident_key))
877 # Create a temporary server key file
878 tmp_known_hosts = _make_server_key_args(
879 server_key, host, port, args)
885 # connects to the remote host and starts a remote
886 proc = subprocess.Popen(args,
888 stdout = subprocess.PIPE,
889 stdin = subprocess.PIPE,
890 stderr = subprocess.PIPE)
892 if communication == DC.ACCESS_SSH:
893 proc._known_hosts = tmp_known_hosts
895 # send the command to execute
896 os.write(proc.stdin.fileno(),
897 base64.b64encode(python_code) + "\n")
901 msg = os.read(proc.stdout.fileno(), 3)
904 if e.errno == errno.EINTR:
910 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
911 msg, proc.stdout.read(), proc.stderr.read())
916 def _communicate(self, input, timeout=None, err_on_timeout=True):
919 stdout = None # Return
920 stderr = None # Return
924 if timeout is not None:
925 timelimit = time.time() + timeout
926 killtime = timelimit + 4
927 bailtime = timelimit + 4
930 # Flush stdio buffer. This might block, if the user has
931 # been writing to .stdin in an uncontrolled fashion.
934 write_set.append(self.stdin)
938 read_set.append(self.stdout)
941 read_set.append(self.stderr)
945 while read_set or write_set:
946 if timeout is not None:
947 curtime = time.time()
948 if timeout is None or curtime > timelimit:
949 if curtime > bailtime:
951 elif curtime > killtime:
952 signum = signal.SIGKILL
954 signum = signal.SIGTERM
956 os.kill(self.pid, signum)
959 select_timeout = timelimit - curtime + 0.1
961 select_timeout = None
964 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
965 except select.error,e:
971 if self.stdin in wlist:
972 # When select has indicated that the file is writable,
973 # we can write up to PIPE_BUF bytes without risking
974 # blocking. POSIX defines PIPE_BUF >= 512
975 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
976 input_offset += bytes_written
977 if input_offset >= len(input):
979 write_set.remove(self.stdin)
981 if self.stdout in rlist:
982 data = os.read(self.stdout.fileno(), 1024)
985 read_set.remove(self.stdout)
988 if self.stderr in rlist:
989 data = os.read(self.stderr.fileno(), 1024)
992 read_set.remove(self.stderr)
995 # All data exchanged. Translate lists into strings.
996 if stdout is not None:
997 stdout = ''.join(stdout)
998 if stderr is not None:
999 stderr = ''.join(stderr)
1001 # Translate newlines, if requested. We cannot let the file
1002 # object do the translation: It is based on stdio, which is
1003 # impossible to combine with select (unless forcing no
1005 if self.universal_newlines and hasattr(file, 'newlines'):
1007 stdout = self._translate_newlines(stdout)
1009 stderr = self._translate_newlines(stderr)
1011 if killed and err_on_timeout:
1012 errcode = self.poll()
1013 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1019 return (stdout, stderr)