2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
32 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34 if hasattr(os, "devnull"):
37 DEV_NULL = "/dev/null"
39 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
42 """ Escapes strings so that they are safe to use as command-line arguments """
43 if SHELL_SAFE.match(s):
44 # safe string - no escaping needed
47 # unsafe string - escape
49 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
52 return "'$'\\x%02x''" % (ord(c),)
53 s = ''.join(map(escp,s))
56 def eintr_retry(func):
58 @functools.wraps(func)
60 retry = kw.pop("_retry", False)
61 for i in xrange(0 if retry else 4):
64 except (select.error, socket.error), args:
65 if args[0] == errno.EINTR:
70 if e.errno == errno.EINTR:
79 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
80 environment_setup = "", clean_root = False):
81 self._root_dir = root_dir
82 self._clean_root = clean_root
84 self._ctrl_sock = None
85 self._log_level = log_level
87 self._environment_setup = environment_setup
96 # can not return normally after fork because no exec was done.
97 # This means that if we don't do an os._exit(0) here the code that
98 # follows the call to "Server.run()" in the "caller code" will be
99 # executed... but by now it has already been executed after the
100 # first process (the one that did the first fork) returned.
103 print >>sys.stderr, "SERVER_ERROR."
107 print >>sys.stderr, "SERVER_READY."
110 # pipes for process synchronization
114 root = os.path.normpath(self._root_dir)
115 if os.path.exists(root) and self._clean_root:
117 if not os.path.exists(root):
118 os.makedirs(root, 0755)
126 except OSError, e: # pragma: no cover
127 if e.errno == errno.EINTR:
133 # os.waitpid avoids leaving a <defunct> (zombie) process
134 st = os.waitpid(pid1, 0)[1]
136 raise RuntimeError("Daemonization failed")
137 # return 0 to inform the caller method that this is not the
142 # Decouple from parent environment.
143 os.chdir(self._root_dir)
150 # see ref: "os._exit(0)"
153 # close all open file descriptors.
154 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
155 if (max_fd == resource.RLIM_INFINITY):
157 for fd in range(3, max_fd):
164 # Redirect standard file descriptors.
165 stdin = open(DEV_NULL, "r")
166 stderr = stdout = open(STD_ERR, "a", 0)
167 os.dup2(stdin.fileno(), sys.stdin.fileno())
168 # NOTE: sys.stdout.write will still be buffered, even if the file
169 # was opened with 0 buffer
170 os.dup2(stdout.fileno(), sys.stdout.fileno())
171 os.dup2(stderr.fileno(), sys.stderr.fileno())
174 if self._environment_setup:
175 # parse environment variables and pass to child process
176 # do it by executing shell commands, in case there's some heavy setup involved
177 envproc = subprocess.Popen(
179 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
180 ( self._environment_setup, ) ],
181 stdin = subprocess.PIPE,
182 stdout = subprocess.PIPE,
183 stderr = subprocess.PIPE
185 out,err = envproc.communicate()
187 # parse new environment
189 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
191 # apply to current environment
192 for name, value in environment.iteritems():
193 os.environ[name] = value
196 if 'PYTHONPATH' in environment:
197 sys.path = environment['PYTHONPATH'].split(':') + sys.path
199 # create control socket
200 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
201 self._ctrl_sock.bind(CTRL_SOCK)
202 self._ctrl_sock.listen(0)
204 # let the parent process know that the daemonization is finished
209 def post_daemonize(self):
210 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
211 # QT, for some strange reason, redefines the SIGCHLD handler to write
212 # a \0 to a fd (let's say fileno 'x'), whenever a SIGCHLD is received.
213 # Server daemonization closes all file descriptors from fileno '3' onwards,
214 # but the overloaded handler (inherited by the forked process) will
215 # keep trying to write the \0 to fileno 'x', which might have been reused
216 # after closing, for other operations. This is bad bad bad when fileno 'x'
217 # is in use for communication purposes, because unexpected \0 start
218 # appearing in the communication messages... this is exactly what happens
219 # when using netns in daemonized form. Thus, we have no other alternative than
220 # restoring the SIGCHLD handler to the default here.
222 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
225 while not self._stop:
226 conn, addr = self._ctrl_sock.accept()
227 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
229 while not self._stop:
231 msg = self.recv_msg(conn)
232 except socket.timeout, e:
233 self.log_error("SERVER recv_msg: connection timedout ")
237 self.log_error("CONNECTION LOST")
242 reply = self.stop_action()
244 reply = self.reply_action(msg)
247 self.send_reply(conn, reply)
250 self.log_error("NOTICE: Awaiting for reconnection")
258 def recv_msg(self, conn):
261 while '\n' not in chunk:
263 chunk = conn.recv(1024)
264 except (OSError, socket.error), e:
265 if e[0] != errno.EINTR:
274 data = ''.join(data).split('\n',1)
277 data, self._rdbuf = data
279 decoded = base64.b64decode(data)
280 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send ``reply`` to the peer as one base64-encoded, newline-terminated line.

    The payload is base64-encoded so that the only newline on the wire is
    the terminator appended here.

    conn  -- connected socket-like object providing ``sendall()``.
    reply -- string payload to transmit.

    Socket errors raised by ``conn.sendall()`` propagate to the caller.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole line has been handed to the
    # socket. A bare send() may accept only part of the buffer and reports
    # that solely through its return value, which would leave the reader
    # with a truncated line and no terminating newline.
    conn.sendall("%s\n" % encoded)
288 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text used for the stop action.

    The text is fixed; no instance state is read or modified.
    """
    acknowledgement = "Stopping server"
    return acknowledgement
def reply_action(self, msg):
    """Return the reply text for a received message.

    This implementation echoes ``msg`` back, prefixed with "Reply to: ".
    No instance state is read or modified.
    """
    template = "Reply to: %s"
    return template % msg
299 def log_error(self, text = None, context = ''):
301 text = traceback.format_exc()
302 date = time.strftime("%Y-%m-%d %H:%M:%S")
304 context = " (%s)" % (context,)
305 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry for ``text`` to stderr.

    Nothing is written unless the instance log level equals
    ``DC.DEBUG_LEVEL``. The entry is the "DEBUG: <date>" header line
    followed by ``text`` on its own line.
    """
    if not (self._log_level == DC.DEBUG_LEVEL):
        # debug output is disabled for this instance
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
313 class Forwarder(object):
314 def __init__(self, root_dir = "."):
315 self._ctrl_sock = None
316 self._root_dir = root_dir
322 print >>sys.stderr, "FORWARDER_READY."
323 while not self._stop:
324 data = self.read_data()
326 # Connection to client lost
328 self.send_to_server(data)
330 data = self.recv_from_server()
332 # Connection to server lost
333 raise IOError, "Connection to server lost while "\
335 self.write_data(data)
339 return sys.stdin.readline()
341 def write_data(self, data):
342 sys.stdout.write(data)
343 # sys.stdout.write is buffered, this is why we need to do a flush()
346 def send_to_server(self, data):
348 self._ctrl_sock.send(data)
349 except (IOError, socket.error), e:
350 if e[0] == errno.EPIPE:
352 self._ctrl_sock.send(data)
355 encoded = data.rstrip()
356 msg = base64.b64decode(encoded)
360 def recv_from_server(self):
363 while '\n' not in chunk:
365 chunk = self._ctrl_sock.recv(1024)
366 except (OSError, socket.error), e:
367 if e[0] != errno.EINTR:
375 data = ''.join(data).split('\n',1)
378 data, self._rdbuf = data
384 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
385 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
386 self._ctrl_sock.connect(sock_addr)
388 def disconnect(self):
390 self._ctrl_sock.close()
394 class Client(object):
395 def __init__(self, root_dir = ".", host = None, port = None, user = None,
396 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
397 environment_setup = ""):
398 self.root_dir = root_dir
399 self.addr = (host, port)
403 self.communication = communication
404 self.environment_setup = environment_setup
405 self._stopped = False
406 self._deferreds = collections.deque()
410 if self._process.poll() is None:
411 os.kill(self._process.pid, signal.SIGTERM)
415 root_dir = self.root_dir
416 (host, port) = self.addr
420 communication = self.communication
422 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
423 c.forward()" % (root_dir,)
425 self._process = popen_python(python_code,
426 communication = communication,
432 environment_setup = self.environment_setup)
434 # Wait for the forwarder to be ready, otherwise nobody
435 # will be able to connect to it
436 helo = self._process.stderr.readline()
437 if helo != 'FORWARDER_READY.\n':
438 raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
439 helo + self._process.stderr.read())
441 def send_msg(self, msg):
442 encoded = base64.b64encode(msg)
443 data = "%s\n" % encoded
446 self._process.stdin.write(data)
447 except (IOError, ValueError):
448 # dead process, poll it to un-zombify
451 # try again after reconnect
452 # If it fails again, though, give up
454 self._process.stdin.write(data)
457 self.send_msg(STOP_MSG)
460 def defer_reply(self, transform=None):
462 self._deferreds.append(defer_entry)
464 functools.partial(self.read_reply, defer_entry, transform)
467 def _read_reply(self):
468 data = self._process.stdout.readline()
469 encoded = data.rstrip()
471 # empty == eof == dead process, poll it to un-zombify
474 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
475 return base64.b64decode(encoded)
477 def read_reply(self, which=None, transform=None):
478 # Test to see if someone did it already
479 if which is not None and len(which):
481 # ...just return the deferred value
483 return transform(which[0])
487 # Process all deferreds until the one we're looking for
488 # or until the queue is empty
489 while self._deferreds:
491 deferred = self._deferreds.popleft()
496 deferred.append(self._read_reply())
497 if deferred is which:
498 # We reached the one we were looking for
500 return transform(deferred[0])
505 # They've requested a synchronous read
507 return transform(self._read_reply())
509 return self._read_reply()
511 def _make_server_key_args(server_key, host, port, args):
513 Returns a reference to the created temporary file, and adds the
514 corresponding arguments to the given argument list.
516 Make sure to hold onto it until the process is done with the file
519 host = '%s:%s' % (host,port)
520 # Create a temporary server key file
521 tmp_known_hosts = tempfile.NamedTemporaryFile()
523 # Add the intended host key
524 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
526 # If we're not in strict mode, add user-configured keys
527 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
528 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
529 if os.access(user_hosts_path, os.R_OK):
530 f = open(user_hosts_path, "r")
531 tmp_known_hosts.write(f.read())
534 tmp_known_hosts.flush()
536 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
538 return tmp_known_hosts
540 def popen_ssh_command(command, host, port, user, agent,
547 err_on_timeout = True):
549 Executes a remote command, returns ((stdout,stderr),process)
552 print "ssh", host, command
554 tmp_known_hosts = None
556 # Don't bother with localhost. Makes test easier
557 '-o', 'NoHostAuthenticationForLocalhost=yes',
562 args.append('-p%d' % port)
564 args.extend(('-i', ident_key))
568 # Create a temporary server key file
569 tmp_known_hosts = _make_server_key_args(
570 server_key, host, port, args)
574 # connects to the remote host and starts a remote connection
575 proc = subprocess.Popen(args,
576 stdout = subprocess.PIPE,
577 stdin = subprocess.PIPE,
578 stderr = subprocess.PIPE)
580 # attach tempfile object to the process, to make sure the file stays
581 # alive until the process is finished with it
582 proc._known_hosts = tmp_known_hosts
585 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
587 except RuntimeError,e:
591 print " timedout -> ", e.args
595 print " -> ", out, err
597 return ((out, err), proc)
599 def popen_scp(source, dest,
606 Copies from/to remote sites.
608 Source and destination should have the user and host encoded
611 If source is a file object, a special mode will be used to
612 create the remote file with the same contents.
614 If dest is a file object, the remote file (source) will be
615 read and written into dest.
617 In these modes, recursive cannot be True.
619 Source can be a list of files to copy to a single destination,
620 in which case it is advised that the destination be a folder.
624 print "scp", source, dest
626 if isinstance(source, file) and source.tell() == 0:
628 elif hasattr(source, 'read'):
629 tmp = tempfile.NamedTemporaryFile()
631 buf = source.read(65536)
639 if isinstance(source, file) or isinstance(dest, file) \
640 or hasattr(source, 'read') or hasattr(dest, 'write'):
643 # Parse source/destination as <user>@<server>:<path>
644 if isinstance(dest, basestring) and ':' in dest:
645 remspec, path = dest.split(':',1)
646 elif isinstance(source, basestring) and ':' in source:
647 remspec, path = source.split(':',1)
649 raise ValueError, "Both endpoints cannot be local"
650 user,host = remspec.rsplit('@',1)
651 tmp_known_hosts = None
653 args = ['ssh', '-l', user, '-C',
654 # Don't bother with localhost. Makes test easier
655 '-o', 'NoHostAuthenticationForLocalhost=yes',
658 args.append('-P%d' % port)
660 args.extend(('-i', ident_key))
662 # Create a temporary server key file
663 tmp_known_hosts = _make_server_key_args(
664 server_key, host, port, args)
666 if isinstance(source, file) or hasattr(source, 'read'):
667 args.append('cat > %s' % (shell_escape(path),))
668 elif isinstance(dest, file) or hasattr(dest, 'write'):
669 args.append('cat %s' % (shell_escape(path),))
671 raise AssertionError, "Unreachable code reached! :-Q"
673 # connects to the remote host and starts a remote connection
674 if isinstance(source, file):
675 proc = subprocess.Popen(args,
676 stdout = open('/dev/null','w'),
677 stderr = subprocess.PIPE,
679 err = proc.stderr.read()
680 proc._known_hosts = tmp_known_hosts
681 eintr_retry(proc.wait)()
682 return ((None,err), proc)
683 elif isinstance(dest, file):
684 proc = subprocess.Popen(args,
685 stdout = open('/dev/null','w'),
686 stderr = subprocess.PIPE,
688 err = proc.stderr.read()
689 proc._known_hosts = tmp_known_hosts
690 eintr_retry(proc.wait)()
691 return ((None,err), proc)
692 elif hasattr(source, 'read'):
693 # file-like (but not file) source
694 proc = subprocess.Popen(args,
695 stdout = open('/dev/null','w'),
696 stderr = subprocess.PIPE,
697 stdin = subprocess.PIPE)
703 buf = source.read(4096)
708 rdrdy, wrdy, broken = select.select(
711 [proc.stderr,proc.stdin])
713 if proc.stderr in rdrdy:
714 # use os.read for fully unbuffered behavior
715 err.append(os.read(proc.stderr.fileno(), 4096))
717 if proc.stdin in wrdy:
718 proc.stdin.write(buf)
724 err.append(proc.stderr.read())
726 proc._known_hosts = tmp_known_hosts
727 eintr_retry(proc.wait)()
728 return ((None,''.join(err)), proc)
729 elif hasattr(dest, 'write'):
730 # file-like (but not file) dest
731 proc = subprocess.Popen(args,
732 stdout = subprocess.PIPE,
733 stderr = subprocess.PIPE,
734 stdin = open('/dev/null','w'))
739 rdrdy, wrdy, broken = select.select(
740 [proc.stderr, proc.stdout],
742 [proc.stderr, proc.stdout])
744 if proc.stderr in rdrdy:
745 # use os.read for fully unbuffered behavior
746 err.append(os.read(proc.stderr.fileno(), 4096))
748 if proc.stdout in rdrdy:
749 # use os.read for fully unbuffered behavior
750 buf = os.read(proc.stdout.fileno(), 4096)
759 err.append(proc.stderr.read())
761 proc._known_hosts = tmp_known_hosts
762 eintr_retry(proc.wait)()
763 return ((None,''.join(err)), proc)
765 raise AssertionError, "Unreachable code reached! :-Q"
767 # Parse destination as <user>@<server>:<path>
768 if isinstance(dest, basestring) and ':' in dest:
769 remspec, path = dest.split(':',1)
770 elif isinstance(source, basestring) and ':' in source:
771 remspec, path = source.split(':',1)
773 raise ValueError, "Both endpoints cannot be local"
774 user,host = remspec.rsplit('@',1)
777 tmp_known_hosts = None
778 args = ['scp', '-q', '-p', '-C',
779 # Don't bother with localhost. Makes test easier
780 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
782 args.append('-P%d' % port)
786 args.extend(('-i', ident_key))
788 # Create a temporary server key file
789 tmp_known_hosts = _make_server_key_args(
790 server_key, host, port, args)
791 if isinstance(source,list):
797 # connects to the remote host and starts a remote connection
798 proc = subprocess.Popen(args,
799 stdout = subprocess.PIPE,
800 stdin = subprocess.PIPE,
801 stderr = subprocess.PIPE)
802 proc._known_hosts = tmp_known_hosts
804 comm = proc.communicate()
805 eintr_retry(proc.wait)()
808 def decode_and_execute():
809 # The python code we want to execute might have characters that
810 # are not compatible with the 'inline' mode we are using. To avoid
811 # problems we receive the encoded python code in base64 as an input
812 # stream and decode it for execution.
817 cmd += os.read(0, 1)# one byte from stdin
819 if e.errno == errno.EINTR:
825 cmd = base64.b64decode(cmd)
826 # Uncomment for debug
827 #os.write(2, "Executing python code: %s\n" % cmd)
828 os.write(1, "OK\n") # send a sync message
831 def popen_python(python_code,
832 communication = DC.ACCESS_LOCAL,
842 environment_setup = ""):
849 python_path.replace("'", r"'\''")
850 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
852 if environment_setup:
853 cmd += environment_setup
855 # Uncomment for debug (to run everything under strace)
856 # We had to verify if strace works (cannot nest them)
857 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
859 #cmd += "strace -f -tt -s 200 -o strace$$.out "
861 cmd += "python -c 'import sys; sys.path.append(%s); from nepi.util import server; server.decode_and_execute()'" % (
862 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
865 if communication == DC.ACCESS_SSH:
866 tmp_known_hosts = None
868 # Don't bother with localhost. Makes test easier
869 '-o', 'NoHostAuthenticationForLocalhost=yes',
874 args.append('-p%d' % port)
876 args.extend(('-i', ident_key))
880 # Create a temporary server key file
881 tmp_known_hosts = _make_server_key_args(
882 server_key, host, port, args)
888 # connects to the remote host and starts a remote
889 proc = subprocess.Popen(args,
891 stdout = subprocess.PIPE,
892 stdin = subprocess.PIPE,
893 stderr = subprocess.PIPE)
895 if communication == DC.ACCESS_SSH:
896 proc._known_hosts = tmp_known_hosts
898 # send the command to execute
899 os.write(proc.stdin.fileno(),
900 base64.b64encode(python_code) + "\n")
904 msg = os.read(proc.stdout.fileno(), 3)
907 if e.errno == errno.EINTR:
913 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
914 msg, proc.stdout.read(), proc.stderr.read())
919 def _communicate(self, input, timeout=None, err_on_timeout=True):
922 stdout = None # Return
923 stderr = None # Return
927 if timeout is not None:
928 timelimit = time.time() + timeout
929 killtime = timelimit + 4
930 bailtime = timelimit + 4
933 # Flush stdio buffer. This might block, if the user has
934 # been writing to .stdin in an uncontrolled fashion.
937 write_set.append(self.stdin)
941 read_set.append(self.stdout)
944 read_set.append(self.stderr)
948 while read_set or write_set:
949 if timeout is not None:
950 curtime = time.time()
951 if timeout is None or curtime > timelimit:
952 if curtime > bailtime:
954 elif curtime > killtime:
955 signum = signal.SIGKILL
957 signum = signal.SIGTERM
959 os.kill(self.pid, signum)
962 select_timeout = timelimit - curtime + 0.1
964 select_timeout = None
967 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
968 except select.error,e:
974 if self.stdin in wlist:
975 # When select has indicated that the file is writable,
976 # we can write up to PIPE_BUF bytes without risk of
977 # blocking. POSIX defines PIPE_BUF >= 512
978 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
979 input_offset += bytes_written
980 if input_offset >= len(input):
982 write_set.remove(self.stdin)
984 if self.stdout in rlist:
985 data = os.read(self.stdout.fileno(), 1024)
988 read_set.remove(self.stdout)
991 if self.stderr in rlist:
992 data = os.read(self.stderr.fileno(), 1024)
995 read_set.remove(self.stderr)
998 # All data exchanged. Translate lists into strings.
999 if stdout is not None:
1000 stdout = ''.join(stdout)
1001 if stderr is not None:
1002 stderr = ''.join(stderr)
1004 # Translate newlines, if requested. We cannot let the file
1005 # object do the translation: It is based on stdio, which is
1006 # impossible to combine with select (unless forcing no
1008 if self.universal_newlines and hasattr(file, 'newlines'):
1010 stdout = self._translate_newlines(stdout)
1012 stderr = self._translate_newlines(stderr)
1014 if killed and err_on_timeout:
1015 errcode = self.poll()
1016 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1022 return (stdout, stderr)