2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 if hasattr(os, "devnull"):
38 DEV_NULL = "/dev/null"
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43 """ Escapes strings so that they are safe to use as command-line arguments """
44 if SHELL_SAFE.match(s):
45 # safe string - no escaping needed
48 # unsafe string - escape
50 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
53 return "'$'\\x%02x''" % (ord(c),)
54 s = ''.join(map(escp,s))
57 def eintr_retry(func):
59 @functools.wraps(func)
61 retry = kw.pop("_retry", False)
62 for i in xrange(0 if retry else 4):
65 except (select.error, socket.error), args:
66 if args[0] == errno.EINTR:
71 if e.errno == errno.EINTR:
80 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
81 environment_setup = "", clean_root = False):
82 self._root_dir = root_dir
83 self._clean_root = clean_root
85 self._ctrl_sock = None
86 self._log_level = log_level
88 self._environment_setup = environment_setup
97 # cannot return normally after fork because no exec was done.
98 # This means that if we don't do a os._exit(0) here the code that
99 # follows the call to "Server.run()" in the "caller code" will be
100 # executed... but by now it has already been executed after the
101 # first process (the one that did the first fork) returned.
104 print >>sys.stderr, "SERVER_ERROR."
108 print >>sys.stderr, "SERVER_READY."
111 # pipes for process synchronization
115 root = os.path.normpath(self._root_dir)
116 if self._root_dir not in [".", ""] and os.path.exists(root) \
117 and self._clean_root:
119 if not os.path.exists(root):
120 os.makedirs(root, 0755)
128 except OSError, e: # pragma: no cover
129 if e.errno == errno.EINTR:
135 # os.waitpid avoids leaving a <defunct> (zombie) process
136 st = os.waitpid(pid1, 0)[1]
138 raise RuntimeError("Daemonization failed")
139 # return 0 to inform the caller method that this is not the
144 # Decouple from parent environment.
145 os.chdir(self._root_dir)
152 # see ref: "os._exit(0)"
155 # close all open file descriptors.
156 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157 if (max_fd == resource.RLIM_INFINITY):
159 for fd in range(3, max_fd):
166 # Redirect standard file descriptors.
167 stdin = open(DEV_NULL, "r")
168 stderr = stdout = open(STD_ERR, "a", 0)
169 os.dup2(stdin.fileno(), sys.stdin.fileno())
170 # NOTE: sys.stdout.write will still be buffered, even if the file
171 # was opened with 0 buffer
172 os.dup2(stdout.fileno(), sys.stdout.fileno())
173 os.dup2(stderr.fileno(), sys.stderr.fileno())
176 if self._environment_setup:
177 # parse environment variables and pass to child process
178 # do it by executing shell commands, in case there's some heavy setup involved
179 envproc = subprocess.Popen(
181 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182 ( self._environment_setup, ) ],
183 stdin = subprocess.PIPE,
184 stdout = subprocess.PIPE,
185 stderr = subprocess.PIPE
187 out,err = envproc.communicate()
189 # parse new environment
191 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
193 # apply to current environment
194 for name, value in environment.iteritems():
195 os.environ[name] = value
198 if 'PYTHONPATH' in environment:
199 sys.path = environment['PYTHONPATH'].split(':') + sys.path
201 # create control socket
202 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
204 self._ctrl_sock.bind(CTRL_SOCK)
206 # Address in use, check pidfile
209 pidfile = open(CTRL_PID, "r")
218 # Check process liveness
219 if not os.path.exists("/proc/%d" % (pid,)):
220 # Ok, it's dead, clean the socket
224 self._ctrl_sock.bind(CTRL_SOCK)
226 self._ctrl_sock.listen(0)
229 pidfile = open(CTRL_PID, "w")
230 pidfile.write(str(os.getpid()))
233 # let the parent process know that the daemonization is finished
238 def post_daemonize(self):
239 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240 # QT, for some strange reason, redefines the SIGCHLD handler to write
241 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
242 # Server daemonization closes all file descriptors from fileno '3',
243 # but the overloaded handler (inherited by the forked process) will
244 # keep trying to write the \0 to fileno 'x', which might have been reused
245 # after closing, for other operations. This is bad bad bad when fileno 'x'
246 # is in use for communication purposes, because unexpected \0 start
247 # appearing in the communication messages... this is exactly what happens
248 # when using netns in daemonized form. Thus, we have no other alternative than
249 # restoring the SIGCHLD handler to the default here.
251 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
254 while not self._stop:
255 conn, addr = self._ctrl_sock.accept()
256 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
258 while not self._stop:
260 msg = self.recv_msg(conn)
261 except socket.timeout, e:
262 #self.log_error("SERVER recv_msg: connection timedout ")
266 self.log_error("CONNECTION LOST")
271 reply = self.stop_action()
273 reply = self.reply_action(msg)
276 self.send_reply(conn, reply)
279 self.log_error("NOTICE: Awaiting for reconnection")
287 def recv_msg(self, conn):
290 while '\n' not in chunk:
292 chunk = conn.recv(1024)
293 except (OSError, socket.error), e:
294 if e[0] != errno.EINTR:
303 data = ''.join(data).split('\n',1)
306 data, self._rdbuf = data
308 decoded = base64.b64decode(data)
309 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Send a reply to the peer connected on ``conn``.

    The reply is base64-encoded and terminated by a single newline, which
    is the framing recv_msg() expects (it splits on the newline and
    base64-decodes the line).

    conn:  connected stream socket (anything exposing ``sendall``).
    reply: string payload to transmit.
    """
    encoded = base64.b64encode(reply)
    # send() may transmit only part of the buffer on a stream socket, which
    # would leave the peer waiting forever for the terminating newline.
    # sendall() keeps writing until the whole frame is out, or raises.
    conn.sendall("%s\n" % encoded)
317 self._ctrl_sock.close()
def stop_action(self):
    """
    Build the reply for a stop request.

    Returns a fixed text message; the control loop sends it back to the
    peer through send_reply().
    """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """
    Build the reply for a regular (non-stop) message.

    msg: the decoded message text received from the peer.

    Returns the message echoed back behind a "Reply to: " prefix.
    """
    template = "Reply to: %s"
    return template % msg
328 def log_error(self, text = None, context = ''):
330 text = traceback.format_exc()
331 date = time.strftime("%Y-%m-%d %H:%M:%S")
333 context = " (%s)" % (context,)
334 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """
    Write a timestamped DEBUG entry to stderr.

    Nothing is written unless the configured log level equals
    DC.DEBUG_LEVEL.

    text: message body, emitted on the line after the timestamp.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        # debug output disabled
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (stamp, text)
    sys.stderr.write(entry)
342 class Forwarder(object):
343 def __init__(self, root_dir = "."):
344 self._ctrl_sock = None
345 self._root_dir = root_dir
351 print >>sys.stderr, "FORWARDER_READY."
352 while not self._stop:
353 data = self.read_data()
355 # Connection to client lost
357 self.send_to_server(data)
359 data = self.recv_from_server()
361 # Connection to server lost
362 raise IOError, "Connection to server lost while "\
364 self.write_data(data)
368 return sys.stdin.readline()
370 def write_data(self, data):
371 sys.stdout.write(data)
372 # sys.stdout.write is buffered, this is why we need to do a flush()
375 def send_to_server(self, data):
377 self._ctrl_sock.send(data)
378 except (IOError, socket.error), e:
379 if e[0] == errno.EPIPE:
381 self._ctrl_sock.send(data)
384 encoded = data.rstrip()
385 msg = base64.b64decode(encoded)
389 def recv_from_server(self):
392 while '\n' not in chunk:
394 chunk = self._ctrl_sock.recv(1024)
395 except (OSError, socket.error), e:
396 if e[0] != errno.EINTR:
404 data = ''.join(data).split('\n',1)
407 data, self._rdbuf = data
413 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415 self._ctrl_sock.connect(sock_addr)
417 def disconnect(self):
419 self._ctrl_sock.close()
423 class Client(object):
424 def __init__(self, root_dir = ".", host = None, port = None, user = None,
425 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426 environment_setup = ""):
427 self.root_dir = root_dir
428 self.addr = (host, port)
432 self.communication = communication
433 self.environment_setup = environment_setup
434 self._stopped = False
435 self._deferreds = collections.deque()
439 if self._process.poll() is None:
440 os.kill(self._process.pid, signal.SIGTERM)
444 root_dir = self.root_dir
445 (host, port) = self.addr
449 communication = self.communication
451 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452 c.forward()" % (root_dir,)
454 self._process = popen_python(python_code,
455 communication = communication,
461 environment_setup = self.environment_setup)
463 # Wait for the forwarder to be ready, otherwise nobody
464 # will be able to connect to it
468 helo = self._process.stderr.readline()
469 if helo == 'FORWARDER_READY.\n':
473 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
475 def send_msg(self, msg):
476 encoded = base64.b64encode(msg)
477 data = "%s\n" % encoded
480 self._process.stdin.write(data)
481 except (IOError, ValueError):
482 # dead process, poll it to un-zombify
485 # try again after reconnect
486 # If it fails again, though, give up
488 self._process.stdin.write(data)
491 self.send_msg(STOP_MSG)
494 def defer_reply(self, transform=None):
496 self._deferreds.append(defer_entry)
498 functools.partial(self.read_reply, defer_entry, transform)
501 def _read_reply(self):
502 data = self._process.stdout.readline()
503 encoded = data.rstrip()
505 # empty == eof == dead process, poll it to un-zombify
508 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509 return base64.b64decode(encoded)
511 def read_reply(self, which=None, transform=None):
512 # Test to see if someone did it already
513 if which is not None and len(which):
515 # ...just return the deferred value
517 return transform(which[0])
521 # Process all deferreds until the one we're looking for
522 # or until the queue is empty
523 while self._deferreds:
525 deferred = self._deferreds.popleft()
530 deferred.append(self._read_reply())
531 if deferred is which:
532 # We reached the one we were looking for
534 return transform(deferred[0])
539 # They've requested a synchronous read
541 return transform(self._read_reply())
543 return self._read_reply()
545 def _make_server_key_args(server_key, host, port, args):
547 Returns a reference to the created temporary file, and adds the
548 corresponding arguments to the given argument list.
550 Make sure to hold onto it until the process is done with the file
553 host = '%s:%s' % (host,port)
554 # Create a temporary server key file
555 tmp_known_hosts = tempfile.NamedTemporaryFile()
557 # Add the intended host key
558 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
560 # If we're not in strict mode, add user-configured keys
561 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563 if os.access(user_hosts_path, os.R_OK):
564 f = open(user_hosts_path, "r")
565 tmp_known_hosts.write(f.read())
568 tmp_known_hosts.flush()
570 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
572 return tmp_known_hosts
574 def popen_ssh_command(command, host, port, user, agent,
581 err_on_timeout = True):
583 Executes a remote command, returns ((stdout,stderr),process)
586 print "ssh", host, command
588 tmp_known_hosts = None
590 # Don't bother with localhost. Makes test easier
591 '-o', 'NoHostAuthenticationForLocalhost=yes',
596 args.append('-p%d' % port)
598 args.extend(('-i', ident_key))
602 # Create a temporary server key file
603 tmp_known_hosts = _make_server_key_args(
604 server_key, host, port, args)
607 for x in xrange(retry or 3):
608 # connects to the remote host and starts a remote connection
609 proc = subprocess.Popen(args,
610 stdout = subprocess.PIPE,
611 stdin = subprocess.PIPE,
612 stderr = subprocess.PIPE)
614 # attach tempfile object to the process, to make sure the file stays
615 # alive until the process is finished with it
616 proc._known_hosts = tmp_known_hosts
619 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
620 if proc.poll() and err.strip().startswith('ssh: '):
621 # SSH error, can safely retry
624 except RuntimeError,e:
628 print " timedout -> ", e.args
632 print " -> ", out, err
634 return ((out, err), proc)
636 def popen_scp(source, dest,
643 Copies from/to remote sites.
645 Source and destination should have the user and host encoded
648 If source is a file object, a special mode will be used to
649 create the remote file with the same contents.
651 If dest is a file object, the remote file (source) will be
652 read and written into dest.
654 In these modes, recursive cannot be True.
656 Source can be a list of files to copy to a single destination,
657 in which case it is advised that the destination be a folder.
661 print "scp", source, dest
663 if isinstance(source, file) and source.tell() == 0:
665 elif hasattr(source, 'read'):
666 tmp = tempfile.NamedTemporaryFile()
668 buf = source.read(65536)
676 if isinstance(source, file) or isinstance(dest, file) \
677 or hasattr(source, 'read') or hasattr(dest, 'write'):
680 # Parse source/destination as <user>@<server>:<path>
681 if isinstance(dest, basestring) and ':' in dest:
682 remspec, path = dest.split(':',1)
683 elif isinstance(source, basestring) and ':' in source:
684 remspec, path = source.split(':',1)
686 raise ValueError, "Both endpoints cannot be local"
687 user,host = remspec.rsplit('@',1)
688 tmp_known_hosts = None
690 args = ['ssh', '-l', user, '-C',
691 # Don't bother with localhost. Makes test easier
692 '-o', 'NoHostAuthenticationForLocalhost=yes',
695 args.append('-P%d' % port)
697 args.extend(('-i', ident_key))
699 # Create a temporary server key file
700 tmp_known_hosts = _make_server_key_args(
701 server_key, host, port, args)
703 if isinstance(source, file) or hasattr(source, 'read'):
704 args.append('cat > %s' % (shell_escape(path),))
705 elif isinstance(dest, file) or hasattr(dest, 'write'):
706 args.append('cat %s' % (shell_escape(path),))
708 raise AssertionError, "Unreachable code reached! :-Q"
710 # connects to the remote host and starts a remote connection
711 if isinstance(source, file):
712 proc = subprocess.Popen(args,
713 stdout = open('/dev/null','w'),
714 stderr = subprocess.PIPE,
716 err = proc.stderr.read()
717 proc._known_hosts = tmp_known_hosts
718 eintr_retry(proc.wait)()
719 return ((None,err), proc)
720 elif isinstance(dest, file):
721 proc = subprocess.Popen(args,
722 stdout = open('/dev/null','w'),
723 stderr = subprocess.PIPE,
725 err = proc.stderr.read()
726 proc._known_hosts = tmp_known_hosts
727 eintr_retry(proc.wait)()
728 return ((None,err), proc)
729 elif hasattr(source, 'read'):
730 # file-like (but not file) source
731 proc = subprocess.Popen(args,
732 stdout = open('/dev/null','w'),
733 stderr = subprocess.PIPE,
734 stdin = subprocess.PIPE)
740 buf = source.read(4096)
745 rdrdy, wrdy, broken = select.select(
748 [proc.stderr,proc.stdin])
750 if proc.stderr in rdrdy:
751 # use os.read for fully unbuffered behavior
752 err.append(os.read(proc.stderr.fileno(), 4096))
754 if proc.stdin in wrdy:
755 proc.stdin.write(buf)
761 err.append(proc.stderr.read())
763 proc._known_hosts = tmp_known_hosts
764 eintr_retry(proc.wait)()
765 return ((None,''.join(err)), proc)
766 elif hasattr(dest, 'write'):
767 # file-like (but not file) dest
768 proc = subprocess.Popen(args,
769 stdout = subprocess.PIPE,
770 stderr = subprocess.PIPE,
771 stdin = open('/dev/null','w'))
776 rdrdy, wrdy, broken = select.select(
777 [proc.stderr, proc.stdout],
779 [proc.stderr, proc.stdout])
781 if proc.stderr in rdrdy:
782 # use os.read for fully unbuffered behavior
783 err.append(os.read(proc.stderr.fileno(), 4096))
785 if proc.stdout in rdrdy:
786 # use os.read for fully unbuffered behavior
787 buf = os.read(proc.stdout.fileno(), 4096)
796 err.append(proc.stderr.read())
798 proc._known_hosts = tmp_known_hosts
799 eintr_retry(proc.wait)()
800 return ((None,''.join(err)), proc)
802 raise AssertionError, "Unreachable code reached! :-Q"
804 # Parse destination as <user>@<server>:<path>
805 if isinstance(dest, basestring) and ':' in dest:
806 remspec, path = dest.split(':',1)
807 elif isinstance(source, basestring) and ':' in source:
808 remspec, path = source.split(':',1)
810 raise ValueError, "Both endpoints cannot be local"
811 user,host = remspec.rsplit('@',1)
814 tmp_known_hosts = None
815 args = ['scp', '-q', '-p', '-C',
816 # Don't bother with localhost. Makes test easier
817 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
819 args.append('-P%d' % port)
823 args.extend(('-i', ident_key))
825 # Create a temporary server key file
826 tmp_known_hosts = _make_server_key_args(
827 server_key, host, port, args)
828 if isinstance(source,list):
834 # connects to the remote host and starts a remote connection
835 proc = subprocess.Popen(args,
836 stdout = subprocess.PIPE,
837 stdin = subprocess.PIPE,
838 stderr = subprocess.PIPE)
839 proc._known_hosts = tmp_known_hosts
841 comm = proc.communicate()
842 eintr_retry(proc.wait)()
845 def decode_and_execute():
846 # The python code we want to execute might have characters that
847 # are not compatible with the 'inline' mode we are using. To avoid
848 # problems we receive the encoded python code in base64 as a input
849 # stream and decode it for execution.
854 cmd += os.read(0, 1)# one byte from stdin
856 if e.errno == errno.EINTR:
862 cmd = base64.b64decode(cmd)
863 # Uncomment for debug
864 #os.write(2, "Executing python code: %s\n" % cmd)
865 os.write(1, "OK\n") # send a sync message
868 def popen_python(python_code,
869 communication = DC.ACCESS_LOCAL,
879 environment_setup = ""):
883 python_path.replace("'", r"'\''")
884 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
886 if environment_setup:
887 cmd += environment_setup
889 # Uncomment for debug (to run everything under strace)
890 # We had to verify if strace works (cannot nest them)
891 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
893 #cmd += "strace -f -tt -s 200 -o strace$$.out "
895 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
896 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
901 cmd = "sudo bash -c " + shell_escape(cmd)
905 if communication == DC.ACCESS_SSH:
906 tmp_known_hosts = None
908 # Don't bother with localhost. Makes test easier
909 '-o', 'NoHostAuthenticationForLocalhost=yes',
914 args.append('-p%d' % port)
916 args.extend(('-i', ident_key))
920 # Create a temporary server key file
921 tmp_known_hosts = _make_server_key_args(
922 server_key, host, port, args)
925 args = [ "/bin/bash", "-c", cmd ]
927 # connects to the remote host and starts a remote
928 proc = subprocess.Popen(args,
930 stdout = subprocess.PIPE,
931 stdin = subprocess.PIPE,
932 stderr = subprocess.PIPE)
934 if communication == DC.ACCESS_SSH:
935 proc._known_hosts = tmp_known_hosts
937 # send the command to execute
938 os.write(proc.stdin.fileno(),
939 base64.b64encode(python_code) + "\n")
943 msg = os.read(proc.stdout.fileno(), 3)
946 if e.errno == errno.EINTR:
952 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
953 msg, proc.stdout.read(), proc.stderr.read())
958 def _communicate(self, input, timeout=None, err_on_timeout=True):
961 stdout = None # Return
962 stderr = None # Return
966 if timeout is not None:
967 timelimit = time.time() + timeout
968 killtime = timelimit + 4
969 bailtime = timelimit + 4
972 # Flush stdio buffer. This might block, if the user has
973 # been writing to .stdin in an uncontrolled fashion.
976 write_set.append(self.stdin)
980 read_set.append(self.stdout)
983 read_set.append(self.stderr)
987 while read_set or write_set:
988 if timeout is not None:
989 curtime = time.time()
990 if timeout is None or curtime > timelimit:
991 if curtime > bailtime:
993 elif curtime > killtime:
994 signum = signal.SIGKILL
996 signum = signal.SIGTERM
998 os.kill(self.pid, signum)
1001 select_timeout = timelimit - curtime + 0.1
1003 select_timeout = None
1006 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1007 except select.error,e:
1013 if self.stdin in wlist:
1014 # When select has indicated that the file is writable,
1015 # we can write up to PIPE_BUF bytes without risk
1016 # blocking. POSIX defines PIPE_BUF >= 512
1017 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1018 input_offset += bytes_written
1019 if input_offset >= len(input):
1021 write_set.remove(self.stdin)
1023 if self.stdout in rlist:
1024 data = os.read(self.stdout.fileno(), 1024)
1027 read_set.remove(self.stdout)
1030 if self.stderr in rlist:
1031 data = os.read(self.stderr.fileno(), 1024)
1034 read_set.remove(self.stderr)
1037 # All data exchanged. Translate lists into strings.
1038 if stdout is not None:
1039 stdout = ''.join(stdout)
1040 if stderr is not None:
1041 stderr = ''.join(stderr)
1043 # Translate newlines, if requested. We cannot let the file
1044 # object do the translation: It is based on stdio, which is
1045 # impossible to combine with select (unless forcing no
1047 if self.universal_newlines and hasattr(file, 'newlines'):
1049 stdout = self._translate_newlines(stdout)
1051 stderr = self._translate_newlines(stderr)
1053 if killed and err_on_timeout:
1054 errcode = self.poll()
1055 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1061 return (stdout, stderr)