2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 if hasattr(os, "devnull"):
38 DEV_NULL = "/dev/null"
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43 """ Escapes strings so that they are safe to use as command-line arguments """
44 if SHELL_SAFE.match(s):
45 # safe string - no escaping needed
48 # unsafe string - escape
50 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
53 return "'$'\\x%02x''" % (ord(c),)
54 s = ''.join(map(escp,s))
57 def eintr_retry(func):
59 @functools.wraps(func)
61 retry = kw.pop("_retry", False)
62 for i in xrange(0 if retry else 4):
65 except (select.error, socket.error), args:
66 if args[0] == errno.EINTR:
71 if e.errno == errno.EINTR:
80 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
81 environment_setup = "", clean_root = False):
82 self._root_dir = root_dir
83 self._clean_root = clean_root
85 self._ctrl_sock = None
86 self._log_level = log_level
88 self._environment_setup = environment_setup
97 # can not return normally after fork beacuse no exec was done.
98 # This means that if we don't do a os._exit(0) here the code that
99 # follows the call to "Server.run()" in the "caller code" will be
100 # executed... but by now it has already been executed after the
101 # first process (the one that did the first fork) returned.
104 print >>sys.stderr, "SERVER_ERROR."
108 print >>sys.stderr, "SERVER_READY."
111 # pipes for process synchronization
115 root = os.path.normpath(self._root_dir)
116 if self._root_dir not in [".", ""] and os.path.exists(root) \
117 and self._clean_root:
119 if not os.path.exists(root):
120 os.makedirs(root, 0755)
128 except OSError, e: # pragma: no cover
129 if e.errno == errno.EINTR:
135 # os.waitpid avoids leaving a <defunc> (zombie) process
136 st = os.waitpid(pid1, 0)[1]
138 raise RuntimeError("Daemonization failed")
139 # return 0 to inform the caller method that this is not the
144 # Decouple from parent environment.
145 os.chdir(self._root_dir)
152 # see ref: "os._exit(0)"
155 # close all open file descriptors.
156 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157 if (max_fd == resource.RLIM_INFINITY):
159 for fd in range(3, max_fd):
166 # Redirect standard file descriptors.
167 stdin = open(DEV_NULL, "r")
168 stderr = stdout = open(STD_ERR, "a", 0)
169 os.dup2(stdin.fileno(), sys.stdin.fileno())
170 # NOTE: sys.stdout.write will still be buffered, even if the file
171 # was opened with 0 buffer
172 os.dup2(stdout.fileno(), sys.stdout.fileno())
173 os.dup2(stderr.fileno(), sys.stderr.fileno())
176 if self._environment_setup:
177 # parse environment variables and pass to child process
178 # do it by executing shell commands, in case there's some heavy setup involved
179 envproc = subprocess.Popen(
181 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182 ( self._environment_setup, ) ],
183 stdin = subprocess.PIPE,
184 stdout = subprocess.PIPE,
185 stderr = subprocess.PIPE
187 out,err = envproc.communicate()
189 # parse new environment
191 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
193 # apply to current environment
194 for name, value in environment.iteritems():
195 os.environ[name] = value
198 if 'PYTHONPATH' in environment:
199 sys.path = environment['PYTHONPATH'].split(':') + sys.path
201 # create control socket
202 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
204 self._ctrl_sock.bind(CTRL_SOCK)
206 # Address in use, check pidfile
209 pidfile = open(CTRL_PID, "r")
218 # Check process liveliness
219 if not os.path.exists("/proc/%d" % (pid,)):
220 # Ok, it's dead, clean the socket
224 self._ctrl_sock.bind(CTRL_SOCK)
226 self._ctrl_sock.listen(0)
229 pidfile = open(CTRL_PID, "w")
230 pidfile.write(str(os.getpid()))
233 # let the parent process know that the daemonization is finished
238 def post_daemonize(self):
239 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240 # QT, for some strange reason, redefines the SIGCHILD handler to write
241 # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
242 # Server dameonization closes all file descriptors from fileno '3',
243 # but the overloaded handler (inherited by the forked process) will
244 # keep trying to write the \0 to fileno 'x', which might have been reused
245 # after closing, for other operations. This is bad bad bad when fileno 'x'
246 # is in use for communication pouroses, because unexpected \0 start
247 # appearing in the communication messages... this is exactly what happens
248 # when using netns in daemonized form. Thus, be have no other alternative than
249 # restoring the SIGCHLD handler to the default here.
251 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
254 while not self._stop:
255 conn, addr = self._ctrl_sock.accept()
256 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
258 while not self._stop:
260 msg = self.recv_msg(conn)
261 except socket.timeout, e:
262 #self.log_error("SERVER recv_msg: connection timedout ")
266 self.log_error("CONNECTION LOST")
271 reply = self.stop_action()
273 reply = self.reply_action(msg)
276 self.send_reply(conn, reply)
279 self.log_error("NOTICE: Awaiting for reconnection")
287 def recv_msg(self, conn):
290 while '\n' not in chunk:
292 chunk = conn.recv(1024)
293 except (OSError, socket.error), e:
294 if e[0] != errno.EINTR:
303 data = ''.join(data).split('\n',1)
306 data, self._rdbuf = data
308 decoded = base64.b64decode(data)
309 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send *reply* to the client connected on *conn* as one framed line.

    The payload is base64-encoded and terminated with a newline, so the
    receiving side can split messages on '\\n' regardless of the payload
    contents.

    :param conn: connected stream socket to the client.
    :param reply: reply payload (byte string; text is UTF-8 encoded first).
    """
    if isinstance(reply, type(u"")):
        # base64 operates on bytes: encode text explicitly instead of
        # relying on an implicit ASCII-only conversion.
        reply = reply.encode("utf-8")
    encoded = base64.b64encode(reply)
    # send() on a stream socket may write only part of the buffer; sendall()
    # keeps writing until the whole framed message (including the trailing
    # '\n' the peer waits for) has been transmitted.
    conn.sendall(encoded + b"\n")
317 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed reply text used when the server is asked to stop."""
    message = "Stopping server"
    return message
def reply_action(self, msg):
    """Build the reply for a (non-stop) client message.

    The default implementation simply echoes the message back.

    :param msg: decoded client message.
    :returns: the string ``"Reply to: <msg>"``.
    """
    # Wrap msg in a 1-tuple so a tuple-valued msg is formatted as a whole
    # instead of being unpacked as the format arguments.
    return "Reply to: %s" % (msg,)
328 def log_error(self, text = None, context = ''):
330 text = traceback.format_exc()
331 date = time.strftime("%Y-%m-%d %H:%M:%S")
333 context = " (%s)" % (context,)
334 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write *text* to stderr with a timestamp header, at debug level only.

    Nothing is written unless the configured log level equals
    ``DC.DEBUG_LEVEL``.
    """
    if not (self._log_level == DC.DEBUG_LEVEL):
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (stamp, text))
342 class Forwarder(object):
343 def __init__(self, root_dir = "."):
344 self._ctrl_sock = None
345 self._root_dir = root_dir
351 print >>sys.stderr, "FORWARDER_READY."
352 while not self._stop:
353 data = self.read_data()
355 # Connection to client lost
357 self.send_to_server(data)
359 data = self.recv_from_server()
361 # Connection to server lost
362 raise IOError, "Connection to server lost while "\
364 self.write_data(data)
368 return sys.stdin.readline()
370 def write_data(self, data):
371 sys.stdout.write(data)
372 # sys.stdout.write is buffered, this is why we need to do a flush()
375 def send_to_server(self, data):
377 self._ctrl_sock.send(data)
378 except (IOError, socket.error), e:
379 if e[0] == errno.EPIPE:
381 self._ctrl_sock.send(data)
384 encoded = data.rstrip()
385 msg = base64.b64decode(encoded)
389 def recv_from_server(self):
392 while '\n' not in chunk:
394 chunk = self._ctrl_sock.recv(1024)
395 except (OSError, socket.error), e:
396 if e[0] != errno.EINTR:
404 data = ''.join(data).split('\n',1)
407 data, self._rdbuf = data
413 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415 self._ctrl_sock.connect(sock_addr)
417 def disconnect(self):
419 self._ctrl_sock.close()
423 class Client(object):
424 def __init__(self, root_dir = ".", host = None, port = None, user = None,
425 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426 environment_setup = ""):
427 self.root_dir = root_dir
428 self.addr = (host, port)
432 self.communication = communication
433 self.environment_setup = environment_setup
434 self._stopped = False
435 self._deferreds = collections.deque()
439 if self._process.poll() is None:
440 os.kill(self._process.pid, signal.SIGTERM)
444 root_dir = self.root_dir
445 (host, port) = self.addr
449 communication = self.communication
451 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452 c.forward()" % (root_dir,)
454 self._process = popen_python(python_code,
455 communication = communication,
461 environment_setup = self.environment_setup)
463 # Wait for the forwarder to be ready, otherwise nobody
464 # will be able to connect to it
468 helo = self._process.stderr.readline()
469 if helo == 'FORWARDER_READY.\n':
473 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
475 def send_msg(self, msg):
476 encoded = base64.b64encode(msg)
477 data = "%s\n" % encoded
480 self._process.stdin.write(data)
481 except (IOError, ValueError):
482 # dead process, poll it to un-zombify
485 # try again after reconnect
486 # If it fails again, though, give up
488 self._process.stdin.write(data)
491 self.send_msg(STOP_MSG)
494 def defer_reply(self, transform=None):
496 self._deferreds.append(defer_entry)
498 functools.partial(self.read_reply, defer_entry, transform)
501 def _read_reply(self):
502 data = self._process.stdout.readline()
503 encoded = data.rstrip()
505 # empty == eof == dead process, poll it to un-zombify
508 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509 return base64.b64decode(encoded)
511 def read_reply(self, which=None, transform=None):
512 # Test to see if someone did it already
513 if which is not None and len(which):
515 # ...just return the deferred value
517 return transform(which[0])
521 # Process all deferreds until the one we're looking for
522 # or until the queue is empty
523 while self._deferreds:
525 deferred = self._deferreds.popleft()
530 deferred.append(self._read_reply())
531 if deferred is which:
532 # We reached the one we were looking for
534 return transform(deferred[0])
539 # They've requested a synchronous read
541 return transform(self._read_reply())
543 return self._read_reply()
545 def _make_server_key_args(server_key, host, port, args):
547 Returns a reference to the created temporary file, and adds the
548 corresponding arguments to the given argument list.
550 Make sure to hold onto it until the process is done with the file
553 host = '%s:%s' % (host,port)
554 # Create a temporary server key file
555 tmp_known_hosts = tempfile.NamedTemporaryFile()
557 # Add the intended host key
558 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
560 # If we're not in strict mode, add user-configured keys
561 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563 if os.access(user_hosts_path, os.R_OK):
564 f = open(user_hosts_path, "r")
565 tmp_known_hosts.write(f.read())
568 tmp_known_hosts.flush()
570 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
572 return tmp_known_hosts
574 def popen_ssh_command(command, host, port, user, agent,
581 err_on_timeout = True,
582 connect_timeout = 30):
584 Executes a remote commands, returns ((stdout,stderr),process)
587 print "ssh", host, command
589 tmp_known_hosts = None
591 # Don't bother with localhost. Makes test easier
592 '-o', 'NoHostAuthenticationForLocalhost=yes',
593 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
598 args.append('-p%d' % port)
600 args.extend(('-i', ident_key))
604 # Create a temporary server key file
605 tmp_known_hosts = _make_server_key_args(
606 server_key, host, port, args)
609 for x in xrange(retry or 3):
610 # connects to the remote host and starts a remote connection
611 proc = subprocess.Popen(args,
612 stdout = subprocess.PIPE,
613 stdin = subprocess.PIPE,
614 stderr = subprocess.PIPE)
616 # attach tempfile object to the process, to make sure the file stays
617 # alive until the process is finished with it
618 proc._known_hosts = tmp_known_hosts
621 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
622 if proc.poll() and err.strip().startswith('ssh: '):
623 # SSH error, can safely retry
626 except RuntimeError,e:
630 print " timedout -> ", e.args
634 print " -> ", out, err
636 return ((out, err), proc)
638 def popen_scp(source, dest,
645 Copies from/to remote sites.
647 Source and destination should have the user and host encoded
650 If source is a file object, a special mode will be used to
651 create the remote file with the same contents.
653 If dest is a file object, the remote file (source) will be
654 read and written into dest.
656 In these modes, recursive cannot be True.
658 Source can be a list of files to copy to a single destination,
659 in which case it is advised that the destination be a folder.
663 print "scp", source, dest
665 if isinstance(source, file) and source.tell() == 0:
667 elif hasattr(source, 'read'):
668 tmp = tempfile.NamedTemporaryFile()
670 buf = source.read(65536)
678 if isinstance(source, file) or isinstance(dest, file) \
679 or hasattr(source, 'read') or hasattr(dest, 'write'):
682 # Parse source/destination as <user>@<server>:<path>
683 if isinstance(dest, basestring) and ':' in dest:
684 remspec, path = dest.split(':',1)
685 elif isinstance(source, basestring) and ':' in source:
686 remspec, path = source.split(':',1)
688 raise ValueError, "Both endpoints cannot be local"
689 user,host = remspec.rsplit('@',1)
690 tmp_known_hosts = None
692 args = ['ssh', '-l', user, '-C',
693 # Don't bother with localhost. Makes test easier
694 '-o', 'NoHostAuthenticationForLocalhost=yes',
697 args.append('-P%d' % port)
699 args.extend(('-i', ident_key))
701 # Create a temporary server key file
702 tmp_known_hosts = _make_server_key_args(
703 server_key, host, port, args)
705 if isinstance(source, file) or hasattr(source, 'read'):
706 args.append('cat > %s' % (shell_escape(path),))
707 elif isinstance(dest, file) or hasattr(dest, 'write'):
708 args.append('cat %s' % (shell_escape(path),))
710 raise AssertionError, "Unreachable code reached! :-Q"
712 # connects to the remote host and starts a remote connection
713 if isinstance(source, file):
714 proc = subprocess.Popen(args,
715 stdout = open('/dev/null','w'),
716 stderr = subprocess.PIPE,
718 err = proc.stderr.read()
719 proc._known_hosts = tmp_known_hosts
720 eintr_retry(proc.wait)()
721 return ((None,err), proc)
722 elif isinstance(dest, file):
723 proc = subprocess.Popen(args,
724 stdout = open('/dev/null','w'),
725 stderr = subprocess.PIPE,
727 err = proc.stderr.read()
728 proc._known_hosts = tmp_known_hosts
729 eintr_retry(proc.wait)()
730 return ((None,err), proc)
731 elif hasattr(source, 'read'):
732 # file-like (but not file) source
733 proc = subprocess.Popen(args,
734 stdout = open('/dev/null','w'),
735 stderr = subprocess.PIPE,
736 stdin = subprocess.PIPE)
742 buf = source.read(4096)
747 rdrdy, wrdy, broken = select.select(
750 [proc.stderr,proc.stdin])
752 if proc.stderr in rdrdy:
753 # use os.read for fully unbuffered behavior
754 err.append(os.read(proc.stderr.fileno(), 4096))
756 if proc.stdin in wrdy:
757 proc.stdin.write(buf)
763 err.append(proc.stderr.read())
765 proc._known_hosts = tmp_known_hosts
766 eintr_retry(proc.wait)()
767 return ((None,''.join(err)), proc)
768 elif hasattr(dest, 'write'):
769 # file-like (but not file) dest
770 proc = subprocess.Popen(args,
771 stdout = subprocess.PIPE,
772 stderr = subprocess.PIPE,
773 stdin = open('/dev/null','w'))
778 rdrdy, wrdy, broken = select.select(
779 [proc.stderr, proc.stdout],
781 [proc.stderr, proc.stdout])
783 if proc.stderr in rdrdy:
784 # use os.read for fully unbuffered behavior
785 err.append(os.read(proc.stderr.fileno(), 4096))
787 if proc.stdout in rdrdy:
788 # use os.read for fully unbuffered behavior
789 buf = os.read(proc.stdout.fileno(), 4096)
798 err.append(proc.stderr.read())
800 proc._known_hosts = tmp_known_hosts
801 eintr_retry(proc.wait)()
802 return ((None,''.join(err)), proc)
804 raise AssertionError, "Unreachable code reached! :-Q"
806 # Parse destination as <user>@<server>:<path>
807 if isinstance(dest, basestring) and ':' in dest:
808 remspec, path = dest.split(':',1)
809 elif isinstance(source, basestring) and ':' in source:
810 remspec, path = source.split(':',1)
812 raise ValueError, "Both endpoints cannot be local"
813 user,host = remspec.rsplit('@',1)
816 tmp_known_hosts = None
817 args = ['scp', '-q', '-p', '-C',
818 # Don't bother with localhost. Makes test easier
819 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
821 args.append('-P%d' % port)
825 args.extend(('-i', ident_key))
827 # Create a temporary server key file
828 tmp_known_hosts = _make_server_key_args(
829 server_key, host, port, args)
830 if isinstance(source,list):
836 # connects to the remote host and starts a remote connection
837 proc = subprocess.Popen(args,
838 stdout = subprocess.PIPE,
839 stdin = subprocess.PIPE,
840 stderr = subprocess.PIPE)
841 proc._known_hosts = tmp_known_hosts
843 comm = proc.communicate()
844 eintr_retry(proc.wait)()
847 def decode_and_execute():
848 # The python code we want to execute might have characters that
849 # are not compatible with the 'inline' mode we are using. To avoid
850 # problems we receive the encoded python code in base64 as a input
851 # stream and decode it for execution.
856 cmd += os.read(0, 1)# one byte from stdin
858 if e.errno == errno.EINTR:
864 cmd = base64.b64decode(cmd)
865 # Uncomment for debug
866 #os.write(2, "Executing python code: %s\n" % cmd)
867 os.write(1, "OK\n") # send a sync message
870 def popen_python(python_code,
871 communication = DC.ACCESS_LOCAL,
881 environment_setup = ""):
885 python_path.replace("'", r"'\''")
886 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
888 if environment_setup:
889 cmd += environment_setup
891 # Uncomment for debug (to run everything under strace)
892 # We had to verify if strace works (cannot nest them)
893 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
895 #cmd += "strace -f -tt -s 200 -o strace$$.out "
897 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
898 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
903 cmd = "sudo bash -c " + shell_escape(cmd)
907 if communication == DC.ACCESS_SSH:
908 tmp_known_hosts = None
910 # Don't bother with localhost. Makes test easier
911 '-o', 'NoHostAuthenticationForLocalhost=yes',
916 args.append('-p%d' % port)
918 args.extend(('-i', ident_key))
922 # Create a temporary server key file
923 tmp_known_hosts = _make_server_key_args(
924 server_key, host, port, args)
927 args = [ "/bin/bash", "-c", cmd ]
929 # connects to the remote host and starts a remote
930 proc = subprocess.Popen(args,
932 stdout = subprocess.PIPE,
933 stdin = subprocess.PIPE,
934 stderr = subprocess.PIPE)
936 if communication == DC.ACCESS_SSH:
937 proc._known_hosts = tmp_known_hosts
939 # send the command to execute
940 os.write(proc.stdin.fileno(),
941 base64.b64encode(python_code) + "\n")
945 msg = os.read(proc.stdout.fileno(), 3)
948 if e.errno == errno.EINTR:
954 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
955 msg, proc.stdout.read(), proc.stderr.read())
960 def _communicate(self, input, timeout=None, err_on_timeout=True):
963 stdout = None # Return
964 stderr = None # Return
968 if timeout is not None:
969 timelimit = time.time() + timeout
970 killtime = timelimit + 4
971 bailtime = timelimit + 4
974 # Flush stdio buffer. This might block, if the user has
975 # been writing to .stdin in an uncontrolled fashion.
978 write_set.append(self.stdin)
982 read_set.append(self.stdout)
985 read_set.append(self.stderr)
989 while read_set or write_set:
990 if timeout is not None:
991 curtime = time.time()
992 if timeout is None or curtime > timelimit:
993 if curtime > bailtime:
995 elif curtime > killtime:
996 signum = signal.SIGKILL
998 signum = signal.SIGTERM
1000 os.kill(self.pid, signum)
1001 select_timeout = 0.5
1003 select_timeout = timelimit - curtime + 0.1
1005 select_timeout = None
1008 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1009 except select.error,e:
1015 if self.stdin in wlist:
1016 # When select has indicated that the file is writable,
1017 # we can write up to PIPE_BUF bytes without risk
1018 # blocking. POSIX defines PIPE_BUF >= 512
1019 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1020 input_offset += bytes_written
1021 if input_offset >= len(input):
1023 write_set.remove(self.stdin)
1025 if self.stdout in rlist:
1026 data = os.read(self.stdout.fileno(), 1024)
1029 read_set.remove(self.stdout)
1032 if self.stderr in rlist:
1033 data = os.read(self.stderr.fileno(), 1024)
1036 read_set.remove(self.stderr)
1039 # All data exchanged. Translate lists into strings.
1040 if stdout is not None:
1041 stdout = ''.join(stdout)
1042 if stderr is not None:
1043 stderr = ''.join(stderr)
1045 # Translate newlines, if requested. We cannot let the file
1046 # object do the translation: It is based on stdio, which is
1047 # impossible to combine with select (unless forcing no
1049 if self.universal_newlines and hasattr(file, 'newlines'):
1051 stdout = self._translate_newlines(stdout)
1053 stderr = self._translate_newlines(stderr)
1055 if killed and err_on_timeout:
1056 errcode = self.poll()
1057 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1063 return (stdout, stderr)