2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
32 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34 if hasattr(os, "devnull"):
37 DEV_NULL = "/dev/null"
39 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
42 """ Escapes strings so that they are safe to use as command-line arguments """
43 if SHELL_SAFE.match(s):
44 # safe string - no escaping needed
47 # unsafe string - escape
49 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
52 return "'$'\\x%02x''" % (ord(c),)
53 s = ''.join(map(escp,s))
56 def eintr_retry(func):
58 @functools.wraps(func)
60 retry = kw.pop("_retry", False)
61 for i in xrange(0 if retry else 4):
64 except (select.error, socket.error), args:
65 if args[0] == errno.EINTR:
70 if e.errno == errno.EINTR:
79 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, environment_setup = ""):
80 self._root_dir = root_dir
82 self._ctrl_sock = None
83 self._log_level = log_level
85 self._environment_setup = environment_setup
94 # can not return normally after fork because no exec was done.
95 # This means that if we don't do a os._exit(0) here the code that
96 # follows the call to "Server.run()" in the "caller code" will be
97 # executed... but by now it has already been executed after the
98 # first process (the one that did the first fork) returned.
101 print >>sys.stderr, "SERVER_ERROR."
105 print >>sys.stderr, "SERVER_READY."
108 # pipes for process synchronization
112 root = os.path.normpath(self._root_dir)
113 if not os.path.exists(root):
114 os.makedirs(root, 0755)
122 except OSError, e: # pragma: no cover
123 if e.errno == errno.EINTR:
129 # os.waitpid avoids leaving a <defunct> (zombie) process
130 st = os.waitpid(pid1, 0)[1]
132 raise RuntimeError("Daemonization failed")
133 # return 0 to inform the caller method that this is not the
138 # Decouple from parent environment.
139 os.chdir(self._root_dir)
146 # see ref: "os._exit(0)"
149 # close all open file descriptors.
150 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
151 if (max_fd == resource.RLIM_INFINITY):
153 for fd in range(3, max_fd):
160 # Redirect standard file descriptors.
161 stdin = open(DEV_NULL, "r")
162 stderr = stdout = open(STD_ERR, "a", 0)
163 os.dup2(stdin.fileno(), sys.stdin.fileno())
164 # NOTE: sys.stdout.write will still be buffered, even if the file
165 # was opened with 0 buffer
166 os.dup2(stdout.fileno(), sys.stdout.fileno())
167 os.dup2(stderr.fileno(), sys.stderr.fileno())
170 if self._environment_setup:
171 # parse environment variables and pass to child process
172 # do it by executing shell commands, in case there's some heavy setup involved
173 envproc = subprocess.Popen(
175 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
176 ( self._environment_setup, ) ],
177 stdin = subprocess.PIPE,
178 stdout = subprocess.PIPE,
179 stderr = subprocess.PIPE
181 out,err = envproc.communicate()
183 # parse new environment
185 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
187 # apply to current environment
188 for name, value in environment.iteritems():
189 os.environ[name] = value
192 if 'PYTHONPATH' in environment:
193 sys.path = environment['PYTHONPATH'].split(':') + sys.path
195 # create control socket
196 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
197 self._ctrl_sock.bind(CTRL_SOCK)
198 self._ctrl_sock.listen(0)
200 # let the parent process know that the daemonization is finished
205 def post_daemonize(self):
206 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
207 # QT, for some strange reason, redefines the SIGCHLD handler to write
208 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
209 # Server daemonization closes all file descriptors from fileno '3',
210 # but the overloaded handler (inherited by the forked process) will
211 # keep trying to write the \0 to fileno 'x', which might have been reused
212 # after closing, for other operations. This is bad bad bad when fileno 'x'
213 # is in use for communication purposes, because unexpected \0 bytes start
214 # appearing in the communication messages... this is exactly what happens
215 # when using netns in daemonized form. Thus, we have no other alternative than
216 # restoring the SIGCHLD handler to the default here.
218 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
221 while not self._stop:
222 conn, addr = self._ctrl_sock.accept()
223 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
225 while not self._stop:
227 msg = self.recv_msg(conn)
228 except socket.timeout, e:
229 self.log_error("SERVER recv_msg: connection timedout ")
233 self.log_error("CONNECTION LOST")
238 reply = self.stop_action()
240 reply = self.reply_action(msg)
243 self.send_reply(conn, reply)
246 self.log_error("NOTICE: Awaiting for reconnection")
254 def recv_msg(self, conn):
257 while '\n' not in chunk:
259 chunk = conn.recv(1024)
260 except (OSError, socket.error), e:
261 if e[0] != errno.EINTR:
270 data = ''.join(data).split('\n',1)
273 data, self._rdbuf = data
275 decoded = base64.b64decode(data)
276 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send ``reply`` over ``conn`` as one base64-encoded line.

    conn  -- connected socket the reply is written to.
    reply -- payload to send. A value that is not already a byte string
             is encoded as UTF-8 before being base64-encoded.

    The encoded payload is followed by a single newline character.
    """
    if not isinstance(reply, bytes):
        reply = reply.encode("utf-8")
    # socket.send() may transmit only part of the buffer; sendall() keeps
    # writing until the whole line, including the terminating newline,
    # has been sent (or raises on error).
    conn.sendall(base64.b64encode(reply) + b"\n")
284 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed reply text for the stop action."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return the reply text for ``msg``: the message prefixed with "Reply to: "."""
    # Wrap msg in a one-element tuple so that a tuple-valued msg is
    # formatted as a whole instead of being unpacked as format arguments.
    return "Reply to: %s" % (msg,)
295 def log_error(self, text = None, context = ''):
297 text = traceback.format_exc()
298 date = time.strftime("%Y-%m-%d %H:%M:%S")
300 context = " (%s)" % (context,)
301 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write ``text`` to stderr as a timestamped DEBUG entry.

    Nothing is written unless the configured log level equals
    DC.DEBUG_LEVEL.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
309 class Forwarder(object):
310 def __init__(self, root_dir = "."):
311 self._ctrl_sock = None
312 self._root_dir = root_dir
318 print >>sys.stderr, "FORWARDER_READY."
319 while not self._stop:
320 data = self.read_data()
322 # Connection to client lost
324 self.send_to_server(data)
326 data = self.recv_from_server()
328 # Connection to server lost
329 raise IOError, "Connection to server lost while "\
331 self.write_data(data)
335 return sys.stdin.readline()
337 def write_data(self, data):
338 sys.stdout.write(data)
339 # sys.stdout.write is buffered, this is why we need to do a flush()
342 def send_to_server(self, data):
344 self._ctrl_sock.send(data)
345 except (IOError, socket.error), e:
346 if e[0] == errno.EPIPE:
348 self._ctrl_sock.send(data)
351 encoded = data.rstrip()
352 msg = base64.b64decode(encoded)
356 def recv_from_server(self):
359 while '\n' not in chunk:
361 chunk = self._ctrl_sock.recv(1024)
362 except (OSError, socket.error), e:
363 if e[0] != errno.EINTR:
371 data = ''.join(data).split('\n',1)
374 data, self._rdbuf = data
380 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
381 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
382 self._ctrl_sock.connect(sock_addr)
384 def disconnect(self):
386 self._ctrl_sock.close()
390 class Client(object):
391 def __init__(self, root_dir = ".", host = None, port = None, user = None,
392 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
393 environment_setup = ""):
394 self.root_dir = root_dir
395 self.addr = (host, port)
399 self.communication = communication
400 self.environment_setup = environment_setup
401 self._stopped = False
402 self._deferreds = collections.deque()
406 if self._process.poll() is None:
407 os.kill(self._process.pid, signal.SIGTERM)
411 root_dir = self.root_dir
412 (host, port) = self.addr
416 communication = self.communication
418 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
419 c.forward()" % (root_dir,)
421 self._process = popen_python(python_code,
422 communication = communication,
428 environment_setup = self.environment_setup)
430 # Wait for the forwarder to be ready, otherwise nobody
431 # will be able to connect to it
432 helo = self._process.stderr.readline()
433 if helo != 'FORWARDER_READY.\n':
434 raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
435 helo + self._process.stderr.read())
437 def send_msg(self, msg):
438 encoded = base64.b64encode(msg)
439 data = "%s\n" % encoded
442 self._process.stdin.write(data)
443 except (IOError, ValueError):
444 # dead process, poll it to un-zombify
447 # try again after reconnect
448 # If it fails again, though, give up
450 self._process.stdin.write(data)
453 self.send_msg(STOP_MSG)
456 def defer_reply(self, transform=None):
458 self._deferreds.append(defer_entry)
460 functools.partial(self.read_reply, defer_entry, transform)
463 def _read_reply(self):
464 data = self._process.stdout.readline()
465 encoded = data.rstrip()
467 # empty == eof == dead process, poll it to un-zombify
470 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
471 return base64.b64decode(encoded)
473 def read_reply(self, which=None, transform=None):
474 # Test to see if someone did it already
475 if which is not None and len(which):
477 # ...just return the deferred value
479 return transform(which[0])
483 # Process all deferreds until the one we're looking for
484 # or until the queue is empty
485 while self._deferreds:
487 deferred = self._deferreds.popleft()
492 deferred.append(self._read_reply())
493 if deferred is which:
494 # We reached the one we were looking for
496 return transform(deferred[0])
501 # They've requested a synchronous read
503 return transform(self._read_reply())
505 return self._read_reply()
507 def _make_server_key_args(server_key, host, port, args):
509 Returns a reference to the created temporary file, and adds the
510 corresponding arguments to the given argument list.
512 Make sure to hold onto it until the process is done with the file
515 host = '%s:%s' % (host,port)
516 # Create a temporary server key file
517 tmp_known_hosts = tempfile.NamedTemporaryFile()
519 # Add the intended host key
520 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
522 # If we're not in strict mode, add user-configured keys
523 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
524 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
525 if os.access(user_hosts_path, os.R_OK):
526 f = open(user_hosts_path, "r")
527 tmp_known_hosts.write(f.read())
530 tmp_known_hosts.flush()
532 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
534 return tmp_known_hosts
536 def popen_ssh_command(command, host, port, user, agent,
543 err_on_timeout = True):
545 Executes a remote command, returns ((stdout,stderr),process)
548 print "ssh", host, command
550 tmp_known_hosts = None
552 # Don't bother with localhost. Makes test easier
553 '-o', 'NoHostAuthenticationForLocalhost=yes',
558 args.append('-p%d' % port)
560 args.extend(('-i', ident_key))
564 # Create a temporary server key file
565 tmp_known_hosts = _make_server_key_args(
566 server_key, host, port, args)
570 # connects to the remote host and starts a remote connection
571 proc = subprocess.Popen(args,
572 stdout = subprocess.PIPE,
573 stdin = subprocess.PIPE,
574 stderr = subprocess.PIPE)
576 # attach tempfile object to the process, to make sure the file stays
577 # alive until the process is finished with it
578 proc._known_hosts = tmp_known_hosts
581 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
583 except RuntimeError,e:
587 print " timedout -> ", e.args
591 print " -> ", out, err
593 return ((out, err), proc)
595 def popen_scp(source, dest,
602 Copies from/to remote sites.
604 Source and destination should have the user and host encoded
607 If source is a file object, a special mode will be used to
608 create the remote file with the same contents.
610 If dest is a file object, the remote file (source) will be
611 read and written into dest.
613 In these modes, recursive cannot be True.
615 Source can be a list of files to copy to a single destination,
616 in which case it is advised that the destination be a folder.
620 print "scp", source, dest
622 if isinstance(source, file) and source.tell() == 0:
624 elif hasattr(source, 'read'):
625 tmp = tempfile.NamedTemporaryFile()
627 buf = source.read(65536)
635 if isinstance(source, file) or isinstance(dest, file) \
636 or hasattr(source, 'read') or hasattr(dest, 'write'):
639 # Parse source/destination as <user>@<server>:<path>
640 if isinstance(dest, basestring) and ':' in dest:
641 remspec, path = dest.split(':',1)
642 elif isinstance(source, basestring) and ':' in source:
643 remspec, path = source.split(':',1)
645 raise ValueError, "Both endpoints cannot be local"
646 user,host = remspec.rsplit('@',1)
647 tmp_known_hosts = None
649 args = ['ssh', '-l', user, '-C',
650 # Don't bother with localhost. Makes test easier
651 '-o', 'NoHostAuthenticationForLocalhost=yes',
654 args.append('-P%d' % port)
656 args.extend(('-i', ident_key))
658 # Create a temporary server key file
659 tmp_known_hosts = _make_server_key_args(
660 server_key, host, port, args)
662 if isinstance(source, file) or hasattr(source, 'read'):
663 args.append('cat > %s' % (shell_escape(path),))
664 elif isinstance(dest, file) or hasattr(dest, 'write'):
665 args.append('cat %s' % (shell_escape(path),))
667 raise AssertionError, "Unreachable code reached! :-Q"
669 # connects to the remote host and starts a remote connection
670 if isinstance(source, file):
671 proc = subprocess.Popen(args,
672 stdout = open('/dev/null','w'),
673 stderr = subprocess.PIPE,
675 err = proc.stderr.read()
676 proc._known_hosts = tmp_known_hosts
677 eintr_retry(proc.wait)()
678 return ((None,err), proc)
679 elif isinstance(dest, file):
680 proc = subprocess.Popen(args,
681 stdout = open('/dev/null','w'),
682 stderr = subprocess.PIPE,
684 err = proc.stderr.read()
685 proc._known_hosts = tmp_known_hosts
686 eintr_retry(proc.wait)()
687 return ((None,err), proc)
688 elif hasattr(source, 'read'):
689 # file-like (but not file) source
690 proc = subprocess.Popen(args,
691 stdout = open('/dev/null','w'),
692 stderr = subprocess.PIPE,
693 stdin = subprocess.PIPE)
699 buf = source.read(4096)
704 rdrdy, wrdy, broken = select.select(
707 [proc.stderr,proc.stdin])
709 if proc.stderr in rdrdy:
710 # use os.read for fully unbuffered behavior
711 err.append(os.read(proc.stderr.fileno(), 4096))
713 if proc.stdin in wrdy:
714 proc.stdin.write(buf)
720 err.append(proc.stderr.read())
722 proc._known_hosts = tmp_known_hosts
723 eintr_retry(proc.wait)()
724 return ((None,''.join(err)), proc)
725 elif hasattr(dest, 'write'):
726 # file-like (but not file) dest
727 proc = subprocess.Popen(args,
728 stdout = subprocess.PIPE,
729 stderr = subprocess.PIPE,
730 stdin = open('/dev/null','w'))
735 rdrdy, wrdy, broken = select.select(
736 [proc.stderr, proc.stdout],
738 [proc.stderr, proc.stdout])
740 if proc.stderr in rdrdy:
741 # use os.read for fully unbuffered behavior
742 err.append(os.read(proc.stderr.fileno(), 4096))
744 if proc.stdout in rdrdy:
745 # use os.read for fully unbuffered behavior
746 buf = os.read(proc.stdout.fileno(), 4096)
755 err.append(proc.stderr.read())
757 proc._known_hosts = tmp_known_hosts
758 eintr_retry(proc.wait)()
759 return ((None,''.join(err)), proc)
761 raise AssertionError, "Unreachable code reached! :-Q"
763 # Parse destination as <user>@<server>:<path>
764 if isinstance(dest, basestring) and ':' in dest:
765 remspec, path = dest.split(':',1)
766 elif isinstance(source, basestring) and ':' in source:
767 remspec, path = source.split(':',1)
769 raise ValueError, "Both endpoints cannot be local"
770 user,host = remspec.rsplit('@',1)
773 tmp_known_hosts = None
774 args = ['scp', '-q', '-p', '-C',
775 # Don't bother with localhost. Makes test easier
776 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
778 args.append('-P%d' % port)
782 args.extend(('-i', ident_key))
784 # Create a temporary server key file
785 tmp_known_hosts = _make_server_key_args(
786 server_key, host, port, args)
787 if isinstance(source,list):
793 # connects to the remote host and starts a remote connection
794 proc = subprocess.Popen(args,
795 stdout = subprocess.PIPE,
796 stdin = subprocess.PIPE,
797 stderr = subprocess.PIPE)
798 proc._known_hosts = tmp_known_hosts
800 comm = proc.communicate()
801 eintr_retry(proc.wait)()
804 def decode_and_execute():
805 # The python code we want to execute might have characters that
806 # are not compatible with the 'inline' mode we are using. To avoid
807 # problems we receive the encoded python code in base64 as a input
808 # stream and decode it for execution.
813 cmd += os.read(0, 1)# one byte from stdin
815 if e.errno == errno.EINTR:
821 cmd = base64.b64decode(cmd)
822 # Uncomment for debug
823 #os.write(2, "Executing python code: %s\n" % cmd)
824 os.write(1, "OK\n") # send a sync message
827 def popen_python(python_code,
828 communication = DC.ACCESS_LOCAL,
838 environment_setup = ""):
845 python_path.replace("'", r"'\''")
846 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
848 if environment_setup:
849 cmd += environment_setup
851 # Uncomment for debug (to run everything under strace)
852 # We had to verify if strace works (cannot nest them)
853 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
855 #cmd += "strace -f -tt -s 200 -o strace$$.out "
857 cmd += "python -c 'import sys; sys.path.append(%s); from nepi.util import server; server.decode_and_execute()'" % (
858 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
861 if communication == DC.ACCESS_SSH:
862 tmp_known_hosts = None
864 # Don't bother with localhost. Makes test easier
865 '-o', 'NoHostAuthenticationForLocalhost=yes',
870 args.append('-p%d' % port)
872 args.extend(('-i', ident_key))
876 # Create a temporary server key file
877 tmp_known_hosts = _make_server_key_args(
878 server_key, host, port, args)
884 # connects to the remote host and starts a remote
885 proc = subprocess.Popen(args,
887 stdout = subprocess.PIPE,
888 stdin = subprocess.PIPE,
889 stderr = subprocess.PIPE)
891 if communication == DC.ACCESS_SSH:
892 proc._known_hosts = tmp_known_hosts
894 # send the command to execute
895 os.write(proc.stdin.fileno(),
896 base64.b64encode(python_code) + "\n")
900 msg = os.read(proc.stdout.fileno(), 3)
903 if e.errno == errno.EINTR:
909 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
910 msg, proc.stdout.read(), proc.stderr.read())
915 def _communicate(self, input, timeout=None, err_on_timeout=True):
918 stdout = None # Return
919 stderr = None # Return
923 if timeout is not None:
924 timelimit = time.time() + timeout
925 killtime = timelimit + 4
926 bailtime = timelimit + 4
929 # Flush stdio buffer. This might block, if the user has
930 # been writing to .stdin in an uncontrolled fashion.
933 write_set.append(self.stdin)
937 read_set.append(self.stdout)
940 read_set.append(self.stderr)
944 while read_set or write_set:
945 if timeout is not None:
946 curtime = time.time()
947 if timeout is None or curtime > timelimit:
948 if curtime > bailtime:
950 elif curtime > killtime:
951 signum = signal.SIGKILL
953 signum = signal.SIGTERM
955 os.kill(self.pid, signum)
958 select_timeout = timelimit - curtime + 0.1
960 select_timeout = None
963 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
964 except select.error,e:
970 if self.stdin in wlist:
971 # When select has indicated that the file is writable,
972 # we can write up to PIPE_BUF bytes without risk
973 # blocking. POSIX defines PIPE_BUF >= 512
974 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
975 input_offset += bytes_written
976 if input_offset >= len(input):
978 write_set.remove(self.stdin)
980 if self.stdout in rlist:
981 data = os.read(self.stdout.fileno(), 1024)
984 read_set.remove(self.stdout)
987 if self.stderr in rlist:
988 data = os.read(self.stderr.fileno(), 1024)
991 read_set.remove(self.stderr)
994 # All data exchanged. Translate lists into strings.
995 if stdout is not None:
996 stdout = ''.join(stdout)
997 if stderr is not None:
998 stderr = ''.join(stderr)
1000 # Translate newlines, if requested. We cannot let the file
1001 # object do the translation: It is based on stdio, which is
1002 # impossible to combine with select (unless forcing no
1004 if self.universal_newlines and hasattr(file, 'newlines'):
1006 stdout = self._translate_newlines(stdout)
1008 stderr = self._translate_newlines(stderr)
1010 if killed and err_on_timeout:
1011 errcode = self.poll()
1012 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1018 return (stdout, stderr)