2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
# Name of the UNIX-domain control socket. It is bound relative to the
# server's working directory (the root dir after daemonization) and
# connected to as os.path.join(root_dir, CTRL_SOCK) by forwarders.
CTRL_SOCK = "ctrl.sock"

# Log file (relative to the server root dir) that the daemonized server's
# stdout and stderr are appended to.
STD_ERR = "stderr.log"

# True when the NEPI_TRACE environment variable is "true", "1" or "on"
# (case-insensitive); False when it is unset or has any other value.
TRACE = os.getenv("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34 if hasattr(os, "devnull"):
37 DEV_NULL = "/dev/null"
# Strings consisting solely of these characters are considered safe to pass
# to a shell without quoting (shell_escape returns them unchanged).
# The pattern is anchored with \Z instead of $: in Python regexes, $ also
# matches just before a trailing newline, so "foo\n" would have been treated
# as safe and emitted unquoted, splitting the generated command line.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')
42 """ Escapes strings so that they are safe to use as command-line arguments """
43 if SHELL_SAFE.match(s):
44 # safe string - no escaping needed
47 # unsafe string - escape
49 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
52 return "'$'\\x%02x''" % (ord(c),)
53 s = ''.join(map(escp,s))
56 def eintr_retry(func):
58 @functools.wraps(func)
60 retry = kw.pop("_retry", False)
61 for i in xrange(0 if retry else 4):
64 except (select.error, socket.error), args:
65 if args[0] == errno.EINTR:
70 if e.errno == errno.EINTR:
79 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
80 environment_setup = "", clean_root = False):
81 self._root_dir = root_dir
82 self._clean_root = clean_root
84 self._ctrl_sock = None
85 self._log_level = log_level
87 self._environment_setup = environment_setup
96 # can not return normally after fork beacuse no exec was done.
97 # This means that if we don't do a os._exit(0) here the code that
98 # follows the call to "Server.run()" in the "caller code" will be
99 # executed... but by now it has already been executed after the
100 # first process (the one that did the first fork) returned.
103 print >>sys.stderr, "SERVER_ERROR."
107 print >>sys.stderr, "SERVER_READY."
110 # pipes for process synchronization
114 root = os.path.normpath(self._root_dir)
115 if os.path.exists(root) and self._clean_root:
117 if not os.path.exists(root):
118 os.makedirs(root, 0755)
126 except OSError, e: # pragma: no cover
127 if e.errno == errno.EINTR:
133 # os.waitpid avoids leaving a <defunc> (zombie) process
134 st = os.waitpid(pid1, 0)[1]
136 raise RuntimeError("Daemonization failed")
137 # return 0 to inform the caller method that this is not the
142 # Decouple from parent environment.
143 os.chdir(self._root_dir)
150 # see ref: "os._exit(0)"
153 # close all open file descriptors.
154 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
155 if (max_fd == resource.RLIM_INFINITY):
157 for fd in range(3, max_fd):
164 # Redirect standard file descriptors.
165 stdin = open(DEV_NULL, "r")
166 stderr = stdout = open(STD_ERR, "a", 0)
167 os.dup2(stdin.fileno(), sys.stdin.fileno())
168 # NOTE: sys.stdout.write will still be buffered, even if the file
169 # was opened with 0 buffer
170 os.dup2(stdout.fileno(), sys.stdout.fileno())
171 os.dup2(stderr.fileno(), sys.stderr.fileno())
174 if self._environment_setup:
175 # parse environment variables and pass to child process
176 # do it by executing shell commands, in case there's some heavy setup involved
177 envproc = subprocess.Popen(
179 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
180 ( self._environment_setup, ) ],
181 stdin = subprocess.PIPE,
182 stdout = subprocess.PIPE,
183 stderr = subprocess.PIPE
185 out,err = envproc.communicate()
187 # parse new environment
189 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
191 # apply to current environment
192 for name, value in environment.iteritems():
193 os.environ[name] = value
196 if 'PYTHONPATH' in environment:
197 sys.path = environment['PYTHONPATH'].split(':') + sys.path
199 # create control socket
200 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
201 self._ctrl_sock.bind(CTRL_SOCK)
202 self._ctrl_sock.listen(0)
204 # let the parent process know that the daemonization is finished
209 def post_daemonize(self):
# Per-process setup for the daemonized server (presumably invoked right
# after daemonize() - confirm with the caller): publish the configured log
# level in the environment, where subprocesses started afterwards inherit
# it, and reset SIGCHLD to its default disposition.
210 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
211 # Qt, for some strange reason, redefines the SIGCHLD handler to write
212 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
213 # Server daemonization closes all file descriptors from fileno 3 upwards,
214 # but the overloaded handler (inherited by the forked process) will
215 # keep trying to write the \0 to fileno 'x', which might have been reused
216 # after closing for other operations. This is very bad when fileno 'x'
217 # is in use for communication purposes, because unexpected \0 bytes start
218 # appearing in the communication messages... this is exactly what happens
219 # when using netns in daemonized form. Thus, we have no alternative but
220 # to restore the default SIGCHLD handler here.
222 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
225 while not self._stop:
226 conn, addr = self._ctrl_sock.accept()
227 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
229 while not self._stop:
231 msg = self.recv_msg(conn)
232 except socket.timeout, e:
233 #self.log_error("SERVER recv_msg: connection timedout ")
237 self.log_error("CONNECTION LOST")
242 reply = self.stop_action()
244 reply = self.reply_action(msg)
247 self.send_reply(conn, reply)
250 self.log_error("NOTICE: Awaiting for reconnection")
258 def recv_msg(self, conn):
261 while '\n' not in chunk:
263 chunk = conn.recv(1024)
264 except (OSError, socket.error), e:
265 if e[0] != errno.EINTR:
274 data = ''.join(data).split('\n',1)
277 data, self._rdbuf = data
279 decoded = base64.b64decode(data)
280 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send *reply* to the peer on *conn*.

    The payload is base64-encoded and terminated by a single newline, so the
    receiving side can read one reply per line.
    """
    conn.send("%s\n" % (base64.b64encode(reply),))
288 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text sent back to the client when the server stops.

    Presumably invoked when a stop request is received; subclasses may
    override it to perform cleanup.
    """
    message = "Stopping server"
    return message
def reply_action(self, msg):
    """Default request handler: echo *msg* back prefixed with "Reply to: ".

    Presumably meant to be overridden by subclasses that implement a real
    protocol.
    """
    answer = "Reply to: %s" % msg
    return answer
299 def log_error(self, text = None, context = ''):
301 text = traceback.format_exc()
302 date = time.strftime("%Y-%m-%d %H:%M:%S")
304 context = " (%s)" % (context,)
305 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write *text* to stderr with a timestamp header.

    Nothing is written unless the server's log level equals DC.DEBUG_LEVEL.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (stamp, text))
313 class Forwarder(object):
314 def __init__(self, root_dir = "."):
315 self._ctrl_sock = None
316 self._root_dir = root_dir
322 print >>sys.stderr, "FORWARDER_READY."
323 while not self._stop:
324 data = self.read_data()
326 # Connection to client lost
328 self.send_to_server(data)
330 data = self.recv_from_server()
332 # Connection to server lost
333 raise IOError, "Connection to server lost while "\
335 self.write_data(data)
339 return sys.stdin.readline()
341 def write_data(self, data):
342 sys.stdout.write(data)
343 # sys.stdout.write is buffered, this is why we need to do a flush()
346 def send_to_server(self, data):
348 self._ctrl_sock.send(data)
349 except (IOError, socket.error), e:
350 if e[0] == errno.EPIPE:
352 self._ctrl_sock.send(data)
355 encoded = data.rstrip()
356 msg = base64.b64decode(encoded)
360 def recv_from_server(self):
363 while '\n' not in chunk:
365 chunk = self._ctrl_sock.recv(1024)
366 except (OSError, socket.error), e:
367 if e[0] != errno.EINTR:
375 data = ''.join(data).split('\n',1)
378 data, self._rdbuf = data
384 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
385 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
386 self._ctrl_sock.connect(sock_addr)
388 def disconnect(self):
390 self._ctrl_sock.close()
394 class Client(object):
395 def __init__(self, root_dir = ".", host = None, port = None, user = None,
396 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
397 environment_setup = ""):
398 self.root_dir = root_dir
399 self.addr = (host, port)
403 self.communication = communication
404 self.environment_setup = environment_setup
405 self._stopped = False
406 self._deferreds = collections.deque()
410 if self._process.poll() is None:
411 os.kill(self._process.pid, signal.SIGTERM)
415 root_dir = self.root_dir
416 (host, port) = self.addr
420 communication = self.communication
422 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
423 c.forward()" % (root_dir,)
425 self._process = popen_python(python_code,
426 communication = communication,
432 environment_setup = self.environment_setup)
434 # Wait for the forwarder to be ready, otherwise nobody
435 # will be able to connect to it
439 helo = self._process.stderr.readline()
440 if helo == 'FORWARDER_READY.\n':
444 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
446 def send_msg(self, msg):
447 encoded = base64.b64encode(msg)
448 data = "%s\n" % encoded
451 self._process.stdin.write(data)
452 except (IOError, ValueError):
453 # dead process, poll it to un-zombify
456 # try again after reconnect
457 # If it fails again, though, give up
459 self._process.stdin.write(data)
462 self.send_msg(STOP_MSG)
465 def defer_reply(self, transform=None):
467 self._deferreds.append(defer_entry)
469 functools.partial(self.read_reply, defer_entry, transform)
472 def _read_reply(self):
473 data = self._process.stdout.readline()
474 encoded = data.rstrip()
476 # empty == eof == dead process, poll it to un-zombify
479 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
480 return base64.b64decode(encoded)
482 def read_reply(self, which=None, transform=None):
483 # Test to see if someone did it already
484 if which is not None and len(which):
486 # ...just return the deferred value
488 return transform(which[0])
492 # Process all deferreds until the one we're looking for
493 # or until the queue is empty
494 while self._deferreds:
496 deferred = self._deferreds.popleft()
501 deferred.append(self._read_reply())
502 if deferred is which:
503 # We reached the one we were looking for
505 return transform(deferred[0])
510 # They've requested a synchronous read
512 return transform(self._read_reply())
514 return self._read_reply()
516 def _make_server_key_args(server_key, host, port, args):
518 Returns a reference to the created temporary file, and adds the
519 corresponding arguments to the given argument list.
521 Make sure to hold onto it until the process is done with the file
524 host = '%s:%s' % (host,port)
525 # Create a temporary server key file
526 tmp_known_hosts = tempfile.NamedTemporaryFile()
528 # Add the intended host key
529 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
531 # If we're not in strict mode, add user-configured keys
532 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
533 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
534 if os.access(user_hosts_path, os.R_OK):
535 f = open(user_hosts_path, "r")
536 tmp_known_hosts.write(f.read())
539 tmp_known_hosts.flush()
541 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
543 return tmp_known_hosts
545 def popen_ssh_command(command, host, port, user, agent,
552 err_on_timeout = True):
554 Executes a remote commands, returns ((stdout,stderr),process)
557 print "ssh", host, command
559 tmp_known_hosts = None
561 # Don't bother with localhost. Makes test easier
562 '-o', 'NoHostAuthenticationForLocalhost=yes',
567 args.append('-p%d' % port)
569 args.extend(('-i', ident_key))
573 # Create a temporary server key file
574 tmp_known_hosts = _make_server_key_args(
575 server_key, host, port, args)
579 # connects to the remote host and starts a remote connection
580 proc = subprocess.Popen(args,
581 stdout = subprocess.PIPE,
582 stdin = subprocess.PIPE,
583 stderr = subprocess.PIPE)
585 # attach tempfile object to the process, to make sure the file stays
586 # alive until the process is finished with it
587 proc._known_hosts = tmp_known_hosts
590 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
592 except RuntimeError,e:
596 print " timedout -> ", e.args
600 print " -> ", out, err
602 return ((out, err), proc)
604 def popen_scp(source, dest,
611 Copies from/to remote sites.
613 Source and destination should have the user and host encoded
616 If source is a file object, a special mode will be used to
617 create the remote file with the same contents.
619 If dest is a file object, the remote file (source) will be
620 read and written into dest.
622 In these modes, recursive cannot be True.
624 Source can be a list of files to copy to a single destination,
625 in which case it is advised that the destination be a folder.
629 print "scp", source, dest
631 if isinstance(source, file) and source.tell() == 0:
633 elif hasattr(source, 'read'):
634 tmp = tempfile.NamedTemporaryFile()
636 buf = source.read(65536)
644 if isinstance(source, file) or isinstance(dest, file) \
645 or hasattr(source, 'read') or hasattr(dest, 'write'):
648 # Parse source/destination as <user>@<server>:<path>
649 if isinstance(dest, basestring) and ':' in dest:
650 remspec, path = dest.split(':',1)
651 elif isinstance(source, basestring) and ':' in source:
652 remspec, path = source.split(':',1)
654 raise ValueError, "Both endpoints cannot be local"
655 user,host = remspec.rsplit('@',1)
656 tmp_known_hosts = None
658 args = ['ssh', '-l', user, '-C',
659 # Don't bother with localhost. Makes test easier
660 '-o', 'NoHostAuthenticationForLocalhost=yes',
663 args.append('-P%d' % port)
665 args.extend(('-i', ident_key))
667 # Create a temporary server key file
668 tmp_known_hosts = _make_server_key_args(
669 server_key, host, port, args)
671 if isinstance(source, file) or hasattr(source, 'read'):
672 args.append('cat > %s' % (shell_escape(path),))
673 elif isinstance(dest, file) or hasattr(dest, 'write'):
674 args.append('cat %s' % (shell_escape(path),))
676 raise AssertionError, "Unreachable code reached! :-Q"
678 # connects to the remote host and starts a remote connection
679 if isinstance(source, file):
680 proc = subprocess.Popen(args,
681 stdout = open('/dev/null','w'),
682 stderr = subprocess.PIPE,
684 err = proc.stderr.read()
685 proc._known_hosts = tmp_known_hosts
686 eintr_retry(proc.wait)()
687 return ((None,err), proc)
688 elif isinstance(dest, file):
689 proc = subprocess.Popen(args,
690 stdout = open('/dev/null','w'),
691 stderr = subprocess.PIPE,
693 err = proc.stderr.read()
694 proc._known_hosts = tmp_known_hosts
695 eintr_retry(proc.wait)()
696 return ((None,err), proc)
697 elif hasattr(source, 'read'):
698 # file-like (but not file) source
699 proc = subprocess.Popen(args,
700 stdout = open('/dev/null','w'),
701 stderr = subprocess.PIPE,
702 stdin = subprocess.PIPE)
708 buf = source.read(4096)
713 rdrdy, wrdy, broken = select.select(
716 [proc.stderr,proc.stdin])
718 if proc.stderr in rdrdy:
719 # use os.read for fully unbuffered behavior
720 err.append(os.read(proc.stderr.fileno(), 4096))
722 if proc.stdin in wrdy:
723 proc.stdin.write(buf)
729 err.append(proc.stderr.read())
731 proc._known_hosts = tmp_known_hosts
732 eintr_retry(proc.wait)()
733 return ((None,''.join(err)), proc)
734 elif hasattr(dest, 'write'):
735 # file-like (but not file) dest
736 proc = subprocess.Popen(args,
737 stdout = subprocess.PIPE,
738 stderr = subprocess.PIPE,
739 stdin = open('/dev/null','w'))
744 rdrdy, wrdy, broken = select.select(
745 [proc.stderr, proc.stdout],
747 [proc.stderr, proc.stdout])
749 if proc.stderr in rdrdy:
750 # use os.read for fully unbuffered behavior
751 err.append(os.read(proc.stderr.fileno(), 4096))
753 if proc.stdout in rdrdy:
754 # use os.read for fully unbuffered behavior
755 buf = os.read(proc.stdout.fileno(), 4096)
764 err.append(proc.stderr.read())
766 proc._known_hosts = tmp_known_hosts
767 eintr_retry(proc.wait)()
768 return ((None,''.join(err)), proc)
770 raise AssertionError, "Unreachable code reached! :-Q"
772 # Parse destination as <user>@<server>:<path>
773 if isinstance(dest, basestring) and ':' in dest:
774 remspec, path = dest.split(':',1)
775 elif isinstance(source, basestring) and ':' in source:
776 remspec, path = source.split(':',1)
778 raise ValueError, "Both endpoints cannot be local"
779 user,host = remspec.rsplit('@',1)
782 tmp_known_hosts = None
783 args = ['scp', '-q', '-p', '-C',
784 # Don't bother with localhost. Makes test easier
785 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
787 args.append('-P%d' % port)
791 args.extend(('-i', ident_key))
793 # Create a temporary server key file
794 tmp_known_hosts = _make_server_key_args(
795 server_key, host, port, args)
796 if isinstance(source,list):
802 # connects to the remote host and starts a remote connection
803 proc = subprocess.Popen(args,
804 stdout = subprocess.PIPE,
805 stdin = subprocess.PIPE,
806 stderr = subprocess.PIPE)
807 proc._known_hosts = tmp_known_hosts
809 comm = proc.communicate()
810 eintr_retry(proc.wait)()
813 def decode_and_execute():
814 # The python code we want to execute might have characters that
815 # are not compatible with the 'inline' mode we are using. To avoid
816 # problems we receive the encoded python code in base64 as a input
817 # stream and decode it for execution.
822 cmd += os.read(0, 1)# one byte from stdin
824 if e.errno == errno.EINTR:
830 cmd = base64.b64decode(cmd)
831 # Uncomment for debug
832 #os.write(2, "Executing python code: %s\n" % cmd)
833 os.write(1, "OK\n") # send a sync message
836 def popen_python(python_code,
837 communication = DC.ACCESS_LOCAL,
847 environment_setup = ""):
851 python_path.replace("'", r"'\''")
852 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
854 if environment_setup:
855 cmd += environment_setup
857 # Uncomment for debug (to run everything under strace)
858 # We had to verify if strace works (cannot nest them)
859 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
861 #cmd += "strace -f -tt -s 200 -o strace$$.out "
863 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
864 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
869 cmd = "sudo bash -c " + shell_escape(cmd)
873 if communication == DC.ACCESS_SSH:
874 tmp_known_hosts = None
876 # Don't bother with localhost. Makes test easier
877 '-o', 'NoHostAuthenticationForLocalhost=yes',
882 args.append('-p%d' % port)
884 args.extend(('-i', ident_key))
888 # Create a temporary server key file
889 tmp_known_hosts = _make_server_key_args(
890 server_key, host, port, args)
893 args = [ "/bin/bash", "-c", cmd ]
895 # connects to the remote host and starts a remote
896 proc = subprocess.Popen(args,
898 stdout = subprocess.PIPE,
899 stdin = subprocess.PIPE,
900 stderr = subprocess.PIPE)
902 if communication == DC.ACCESS_SSH:
903 proc._known_hosts = tmp_known_hosts
905 # send the command to execute
906 os.write(proc.stdin.fileno(),
907 base64.b64encode(python_code) + "\n")
911 msg = os.read(proc.stdout.fileno(), 3)
914 if e.errno == errno.EINTR:
920 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
921 msg, proc.stdout.read(), proc.stderr.read())
926 def _communicate(self, input, timeout=None, err_on_timeout=True):
929 stdout = None # Return
930 stderr = None # Return
934 if timeout is not None:
935 timelimit = time.time() + timeout
936 killtime = timelimit + 4
937 bailtime = timelimit + 4
940 # Flush stdio buffer. This might block, if the user has
941 # been writing to .stdin in an uncontrolled fashion.
944 write_set.append(self.stdin)
948 read_set.append(self.stdout)
951 read_set.append(self.stderr)
955 while read_set or write_set:
956 if timeout is not None:
957 curtime = time.time()
958 if timeout is None or curtime > timelimit:
959 if curtime > bailtime:
961 elif curtime > killtime:
962 signum = signal.SIGKILL
964 signum = signal.SIGTERM
966 os.kill(self.pid, signum)
969 select_timeout = timelimit - curtime + 0.1
971 select_timeout = None
974 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
975 except select.error,e:
981 if self.stdin in wlist:
982 # When select has indicated that the file is writable,
983 # we can write up to PIPE_BUF bytes without risk
984 # blocking. POSIX defines PIPE_BUF >= 512
985 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
986 input_offset += bytes_written
987 if input_offset >= len(input):
989 write_set.remove(self.stdin)
991 if self.stdout in rlist:
992 data = os.read(self.stdout.fileno(), 1024)
995 read_set.remove(self.stdout)
998 if self.stderr in rlist:
999 data = os.read(self.stderr.fileno(), 1024)
1002 read_set.remove(self.stderr)
1005 # All data exchanged. Translate lists into strings.
1006 if stdout is not None:
1007 stdout = ''.join(stdout)
1008 if stderr is not None:
1009 stderr = ''.join(stderr)
1011 # Translate newlines, if requested. We cannot let the file
1012 # object do the translation: It is based on stdio, which is
1013 # impossible to combine with select (unless forcing no
1015 if self.universal_newlines and hasattr(file, 'newlines'):
1017 stdout = self._translate_newlines(stdout)
1019 stderr = self._translate_newlines(stderr)
1021 if killed and err_on_timeout:
1022 errcode = self.poll()
1023 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1029 return (stdout, stderr)