2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 if hasattr(os, "devnull"):
38 DEV_NULL = "/dev/null"
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43 """ Escapes strings so that they are safe to use as command-line arguments """
44 if SHELL_SAFE.match(s):
45 # safe string - no escaping needed
48 # unsafe string - escape
50 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
53 return "'$'\\x%02x''" % (ord(c),)
54 s = ''.join(map(escp,s))
57 def eintr_retry(func):
59 @functools.wraps(func)
61 retry = kw.pop("_retry", False)
62 for i in xrange(0 if retry else 4):
65 except (select.error, socket.error), args:
66 if args[0] == errno.EINTR:
71 if e.errno == errno.EINTR:
80 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
81 environment_setup = "", clean_root = False):
82 self._root_dir = root_dir
83 self._clean_root = clean_root
85 self._ctrl_sock = None
86 self._log_level = log_level
88 self._environment_setup = environment_setup
97 # cannot return normally after fork because no exec was done.
98 # This means that if we don't do an os._exit(0) here the code that
99 # follows the call to "Server.run()" in the "caller code" will be
100 # executed... but by now it has already been executed after the
101 # first process (the one that did the first fork) returned.
104 print >>sys.stderr, "SERVER_ERROR."
108 print >>sys.stderr, "SERVER_READY."
111 # pipes for process synchronization
115 root = os.path.normpath(self._root_dir)
116 if os.path.exists(root) and self._clean_root:
118 if not os.path.exists(root):
119 os.makedirs(root, 0755)
127 except OSError, e: # pragma: no cover
128 if e.errno == errno.EINTR:
134 # os.waitpid avoids leaving a <defunct> (zombie) process
135 st = os.waitpid(pid1, 0)[1]
137 raise RuntimeError("Daemonization failed")
138 # return 0 to inform the caller method that this is not the
143 # Decouple from parent environment.
144 os.chdir(self._root_dir)
151 # see ref: "os._exit(0)"
154 # close all open file descriptors.
155 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
156 if (max_fd == resource.RLIM_INFINITY):
158 for fd in range(3, max_fd):
165 # Redirect standard file descriptors.
166 stdin = open(DEV_NULL, "r")
167 stderr = stdout = open(STD_ERR, "a", 0)
168 os.dup2(stdin.fileno(), sys.stdin.fileno())
169 # NOTE: sys.stdout.write will still be buffered, even if the file
170 # was opened with 0 buffer
171 os.dup2(stdout.fileno(), sys.stdout.fileno())
172 os.dup2(stderr.fileno(), sys.stderr.fileno())
175 if self._environment_setup:
176 # parse environment variables and pass to child process
177 # do it by executing shell commands, in case there's some heavy setup involved
178 envproc = subprocess.Popen(
180 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
181 ( self._environment_setup, ) ],
182 stdin = subprocess.PIPE,
183 stdout = subprocess.PIPE,
184 stderr = subprocess.PIPE
186 out,err = envproc.communicate()
188 # parse new environment
190 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
192 # apply to current environment
193 for name, value in environment.iteritems():
194 os.environ[name] = value
197 if 'PYTHONPATH' in environment:
198 sys.path = environment['PYTHONPATH'].split(':') + sys.path
200 # create control socket
201 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
203 self._ctrl_sock.bind(CTRL_SOCK)
205 # Address in use, check pidfile
208 pidfile = open(CTRL_PID, "r")
217 # Check process liveliness
218 if not os.path.exists("/proc/%d" % (pid,)):
219 # Ok, it's dead, clean the socket
223 self._ctrl_sock.bind(CTRL_SOCK)
225 self._ctrl_sock.listen(0)
228 pidfile = open(CTRL_PID, "w")
229 pidfile.write(str(os.getpid()))
232 # let the parent process know that the daemonization is finished
237 def post_daemonize(self):
238 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
239 # QT, for some strange reason, redefines the SIGCHLD handler to write
240 # a \0 to a fd (let's say fileno 'x'), whenever a SIGCHLD is received.
241 # Server daemonization closes all file descriptors from fileno '3',
242 # but the overloaded handler (inherited by the forked process) will
243 # keep trying to write the \0 to fileno 'x', which might have been reused
244 # after closing, for other operations. This is bad bad bad when fileno 'x'
245 # is in use for communication purposes, because unexpected \0 start
246 # appearing in the communication messages... this is exactly what happens
247 # when using netns in daemonized form. Thus, we have no other alternative than
248 # restoring the SIGCHLD handler to the default here.
250 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
253 while not self._stop:
254 conn, addr = self._ctrl_sock.accept()
255 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
257 while not self._stop:
259 msg = self.recv_msg(conn)
260 except socket.timeout, e:
261 #self.log_error("SERVER recv_msg: connection timedout ")
265 self.log_error("CONNECTION LOST")
270 reply = self.stop_action()
272 reply = self.reply_action(msg)
275 self.send_reply(conn, reply)
278 self.log_error("NOTICE: Awaiting for reconnection")
286 def recv_msg(self, conn):
289 while '\n' not in chunk:
291 chunk = conn.recv(1024)
292 except (OSError, socket.error), e:
293 if e[0] != errno.EINTR:
302 data = ''.join(data).split('\n',1)
305 data, self._rdbuf = data
307 decoded = base64.b64decode(data)
308 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Sends ``reply`` to the peer connected through ``conn``.

    The reply is base64-encoded and terminated with a newline before
    being written to the connection.

    conn: connected socket (must provide ``sendall``).
    reply: string holding the reply payload.
    """
    encoded = base64.b64encode(reply)
    # socket.send() is allowed to transmit only part of the buffer. A
    # truncated reply would lose its terminating newline, so use
    # sendall(), which keeps writing until the whole frame has been
    # sent (or raises on error).
    conn.sendall("%s\n" % encoded)
316 self._ctrl_sock.close()
def stop_action(self):
    """
    Returns the text used as the reply for the stop action.
    """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """
    Returns the reply text for ``msg``: the message prefixed with
    "Reply to: ".
    """
    template = "Reply to: %s"
    return template % msg
327 def log_error(self, text = None, context = ''):
329 text = traceback.format_exc()
330 date = time.strftime("%Y-%m-%d %H:%M:%S")
332 context = " (%s)" % (context,)
333 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """
    Writes a timestamped debug entry to stderr.

    Nothing is written unless the configured log level equals
    DC.DEBUG_LEVEL.
    """
    if not (self._log_level == DC.DEBUG_LEVEL):
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (timestamp, text))
341 class Forwarder(object):
342 def __init__(self, root_dir = "."):
343 self._ctrl_sock = None
344 self._root_dir = root_dir
350 print >>sys.stderr, "FORWARDER_READY."
351 while not self._stop:
352 data = self.read_data()
354 # Connection to client lost
356 self.send_to_server(data)
358 data = self.recv_from_server()
360 # Connection to server lost
361 raise IOError, "Connection to server lost while "\
363 self.write_data(data)
367 return sys.stdin.readline()
369 def write_data(self, data):
370 sys.stdout.write(data)
371 # sys.stdout.write is buffered, this is why we need to do a flush()
374 def send_to_server(self, data):
376 self._ctrl_sock.send(data)
377 except (IOError, socket.error), e:
378 if e[0] == errno.EPIPE:
380 self._ctrl_sock.send(data)
383 encoded = data.rstrip()
384 msg = base64.b64decode(encoded)
388 def recv_from_server(self):
391 while '\n' not in chunk:
393 chunk = self._ctrl_sock.recv(1024)
394 except (OSError, socket.error), e:
395 if e[0] != errno.EINTR:
403 data = ''.join(data).split('\n',1)
406 data, self._rdbuf = data
412 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
413 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
414 self._ctrl_sock.connect(sock_addr)
416 def disconnect(self):
418 self._ctrl_sock.close()
422 class Client(object):
423 def __init__(self, root_dir = ".", host = None, port = None, user = None,
424 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
425 environment_setup = ""):
426 self.root_dir = root_dir
427 self.addr = (host, port)
431 self.communication = communication
432 self.environment_setup = environment_setup
433 self._stopped = False
434 self._deferreds = collections.deque()
438 if self._process.poll() is None:
439 os.kill(self._process.pid, signal.SIGTERM)
443 root_dir = self.root_dir
444 (host, port) = self.addr
448 communication = self.communication
450 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
451 c.forward()" % (root_dir,)
453 self._process = popen_python(python_code,
454 communication = communication,
460 environment_setup = self.environment_setup)
462 # Wait for the forwarder to be ready, otherwise nobody
463 # will be able to connect to it
467 helo = self._process.stderr.readline()
468 if helo == 'FORWARDER_READY.\n':
472 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
474 def send_msg(self, msg):
475 encoded = base64.b64encode(msg)
476 data = "%s\n" % encoded
479 self._process.stdin.write(data)
480 except (IOError, ValueError):
481 # dead process, poll it to un-zombify
484 # try again after reconnect
485 # If it fails again, though, give up
487 self._process.stdin.write(data)
490 self.send_msg(STOP_MSG)
493 def defer_reply(self, transform=None):
495 self._deferreds.append(defer_entry)
497 functools.partial(self.read_reply, defer_entry, transform)
500 def _read_reply(self):
501 data = self._process.stdout.readline()
502 encoded = data.rstrip()
504 # empty == eof == dead process, poll it to un-zombify
507 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
508 return base64.b64decode(encoded)
510 def read_reply(self, which=None, transform=None):
511 # Test to see if someone did it already
512 if which is not None and len(which):
514 # ...just return the deferred value
516 return transform(which[0])
520 # Process all deferreds until the one we're looking for
521 # or until the queue is empty
522 while self._deferreds:
524 deferred = self._deferreds.popleft()
529 deferred.append(self._read_reply())
530 if deferred is which:
531 # We reached the one we were looking for
533 return transform(deferred[0])
538 # They've requested a synchronous read
540 return transform(self._read_reply())
542 return self._read_reply()
544 def _make_server_key_args(server_key, host, port, args):
546 Returns a reference to the created temporary file, and adds the
547 corresponding arguments to the given argument list.
549 Make sure to hold onto it until the process is done with the file
552 host = '%s:%s' % (host,port)
553 # Create a temporary server key file
554 tmp_known_hosts = tempfile.NamedTemporaryFile()
556 # Add the intended host key
557 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
559 # If we're not in strict mode, add user-configured keys
560 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
561 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
562 if os.access(user_hosts_path, os.R_OK):
563 f = open(user_hosts_path, "r")
564 tmp_known_hosts.write(f.read())
567 tmp_known_hosts.flush()
569 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
571 return tmp_known_hosts
573 def popen_ssh_command(command, host, port, user, agent,
580 err_on_timeout = True):
582 Executes a remote command, returns ((stdout,stderr),process)
585 print "ssh", host, command
587 tmp_known_hosts = None
589 # Don't bother with localhost. Makes test easier
590 '-o', 'NoHostAuthenticationForLocalhost=yes',
595 args.append('-p%d' % port)
597 args.extend(('-i', ident_key))
601 # Create a temporary server key file
602 tmp_known_hosts = _make_server_key_args(
603 server_key, host, port, args)
607 # connects to the remote host and starts a remote connection
608 proc = subprocess.Popen(args,
609 stdout = subprocess.PIPE,
610 stdin = subprocess.PIPE,
611 stderr = subprocess.PIPE)
613 # attach tempfile object to the process, to make sure the file stays
614 # alive until the process is finished with it
615 proc._known_hosts = tmp_known_hosts
618 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
620 except RuntimeError,e:
624 print " timedout -> ", e.args
628 print " -> ", out, err
630 return ((out, err), proc)
632 def popen_scp(source, dest,
639 Copies from/to remote sites.
641 Source and destination should have the user and host encoded
644 If source is a file object, a special mode will be used to
645 create the remote file with the same contents.
647 If dest is a file object, the remote file (source) will be
648 read and written into dest.
650 In these modes, recursive cannot be True.
652 Source can be a list of files to copy to a single destination,
653 in which case it is advised that the destination be a folder.
657 print "scp", source, dest
659 if isinstance(source, file) and source.tell() == 0:
661 elif hasattr(source, 'read'):
662 tmp = tempfile.NamedTemporaryFile()
664 buf = source.read(65536)
672 if isinstance(source, file) or isinstance(dest, file) \
673 or hasattr(source, 'read') or hasattr(dest, 'write'):
676 # Parse source/destination as <user>@<server>:<path>
677 if isinstance(dest, basestring) and ':' in dest:
678 remspec, path = dest.split(':',1)
679 elif isinstance(source, basestring) and ':' in source:
680 remspec, path = source.split(':',1)
682 raise ValueError, "Both endpoints cannot be local"
683 user,host = remspec.rsplit('@',1)
684 tmp_known_hosts = None
686 args = ['ssh', '-l', user, '-C',
687 # Don't bother with localhost. Makes test easier
688 '-o', 'NoHostAuthenticationForLocalhost=yes',
691 args.append('-P%d' % port)
693 args.extend(('-i', ident_key))
695 # Create a temporary server key file
696 tmp_known_hosts = _make_server_key_args(
697 server_key, host, port, args)
699 if isinstance(source, file) or hasattr(source, 'read'):
700 args.append('cat > %s' % (shell_escape(path),))
701 elif isinstance(dest, file) or hasattr(dest, 'write'):
702 args.append('cat %s' % (shell_escape(path),))
704 raise AssertionError, "Unreachable code reached! :-Q"
706 # connects to the remote host and starts a remote connection
707 if isinstance(source, file):
708 proc = subprocess.Popen(args,
709 stdout = open('/dev/null','w'),
710 stderr = subprocess.PIPE,
712 err = proc.stderr.read()
713 proc._known_hosts = tmp_known_hosts
714 eintr_retry(proc.wait)()
715 return ((None,err), proc)
716 elif isinstance(dest, file):
717 proc = subprocess.Popen(args,
718 stdout = open('/dev/null','w'),
719 stderr = subprocess.PIPE,
721 err = proc.stderr.read()
722 proc._known_hosts = tmp_known_hosts
723 eintr_retry(proc.wait)()
724 return ((None,err), proc)
725 elif hasattr(source, 'read'):
726 # file-like (but not file) source
727 proc = subprocess.Popen(args,
728 stdout = open('/dev/null','w'),
729 stderr = subprocess.PIPE,
730 stdin = subprocess.PIPE)
736 buf = source.read(4096)
741 rdrdy, wrdy, broken = select.select(
744 [proc.stderr,proc.stdin])
746 if proc.stderr in rdrdy:
747 # use os.read for fully unbuffered behavior
748 err.append(os.read(proc.stderr.fileno(), 4096))
750 if proc.stdin in wrdy:
751 proc.stdin.write(buf)
757 err.append(proc.stderr.read())
759 proc._known_hosts = tmp_known_hosts
760 eintr_retry(proc.wait)()
761 return ((None,''.join(err)), proc)
762 elif hasattr(dest, 'write'):
763 # file-like (but not file) dest
764 proc = subprocess.Popen(args,
765 stdout = subprocess.PIPE,
766 stderr = subprocess.PIPE,
767 stdin = open('/dev/null','w'))
772 rdrdy, wrdy, broken = select.select(
773 [proc.stderr, proc.stdout],
775 [proc.stderr, proc.stdout])
777 if proc.stderr in rdrdy:
778 # use os.read for fully unbuffered behavior
779 err.append(os.read(proc.stderr.fileno(), 4096))
781 if proc.stdout in rdrdy:
782 # use os.read for fully unbuffered behavior
783 buf = os.read(proc.stdout.fileno(), 4096)
792 err.append(proc.stderr.read())
794 proc._known_hosts = tmp_known_hosts
795 eintr_retry(proc.wait)()
796 return ((None,''.join(err)), proc)
798 raise AssertionError, "Unreachable code reached! :-Q"
800 # Parse destination as <user>@<server>:<path>
801 if isinstance(dest, basestring) and ':' in dest:
802 remspec, path = dest.split(':',1)
803 elif isinstance(source, basestring) and ':' in source:
804 remspec, path = source.split(':',1)
806 raise ValueError, "Both endpoints cannot be local"
807 user,host = remspec.rsplit('@',1)
810 tmp_known_hosts = None
811 args = ['scp', '-q', '-p', '-C',
812 # Don't bother with localhost. Makes test easier
813 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
815 args.append('-P%d' % port)
819 args.extend(('-i', ident_key))
821 # Create a temporary server key file
822 tmp_known_hosts = _make_server_key_args(
823 server_key, host, port, args)
824 if isinstance(source,list):
830 # connects to the remote host and starts a remote connection
831 proc = subprocess.Popen(args,
832 stdout = subprocess.PIPE,
833 stdin = subprocess.PIPE,
834 stderr = subprocess.PIPE)
835 proc._known_hosts = tmp_known_hosts
837 comm = proc.communicate()
838 eintr_retry(proc.wait)()
841 def decode_and_execute():
842 # The python code we want to execute might have characters that
843 # are not compatible with the 'inline' mode we are using. To avoid
844 # problems we receive the encoded python code in base64 as a input
845 # stream and decode it for execution.
850 cmd += os.read(0, 1)# one byte from stdin
852 if e.errno == errno.EINTR:
858 cmd = base64.b64decode(cmd)
859 # Uncomment for debug
860 #os.write(2, "Executing python code: %s\n" % cmd)
861 os.write(1, "OK\n") # send a sync message
864 def popen_python(python_code,
865 communication = DC.ACCESS_LOCAL,
875 environment_setup = ""):
879 python_path.replace("'", r"'\''")
880 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
882 if environment_setup:
883 cmd += environment_setup
885 # Uncomment for debug (to run everything under strace)
886 # We had to verify if strace works (cannot nest them)
887 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
889 #cmd += "strace -f -tt -s 200 -o strace$$.out "
891 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
892 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
897 cmd = "sudo bash -c " + shell_escape(cmd)
901 if communication == DC.ACCESS_SSH:
902 tmp_known_hosts = None
904 # Don't bother with localhost. Makes test easier
905 '-o', 'NoHostAuthenticationForLocalhost=yes',
910 args.append('-p%d' % port)
912 args.extend(('-i', ident_key))
916 # Create a temporary server key file
917 tmp_known_hosts = _make_server_key_args(
918 server_key, host, port, args)
921 args = [ "/bin/bash", "-c", cmd ]
923 # starts the process (over ssh or in a local shell) that runs the python interpreter
924 proc = subprocess.Popen(args,
926 stdout = subprocess.PIPE,
927 stdin = subprocess.PIPE,
928 stderr = subprocess.PIPE)
930 if communication == DC.ACCESS_SSH:
931 proc._known_hosts = tmp_known_hosts
933 # send the command to execute
934 os.write(proc.stdin.fileno(),
935 base64.b64encode(python_code) + "\n")
939 msg = os.read(proc.stdout.fileno(), 3)
942 if e.errno == errno.EINTR:
948 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
949 msg, proc.stdout.read(), proc.stderr.read())
954 def _communicate(self, input, timeout=None, err_on_timeout=True):
957 stdout = None # Return
958 stderr = None # Return
962 if timeout is not None:
963 timelimit = time.time() + timeout
964 killtime = timelimit + 4
965 bailtime = timelimit + 4
968 # Flush stdio buffer. This might block, if the user has
969 # been writing to .stdin in an uncontrolled fashion.
972 write_set.append(self.stdin)
976 read_set.append(self.stdout)
979 read_set.append(self.stderr)
983 while read_set or write_set:
984 if timeout is not None:
985 curtime = time.time()
986 if timeout is None or curtime > timelimit:
987 if curtime > bailtime:
989 elif curtime > killtime:
990 signum = signal.SIGKILL
992 signum = signal.SIGTERM
994 os.kill(self.pid, signum)
997 select_timeout = timelimit - curtime + 0.1
999 select_timeout = None
1002 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1003 except select.error,e:
1009 if self.stdin in wlist:
1010 # When select has indicated that the file is writable,
1011 # we can write up to PIPE_BUF bytes without risk of
1012 # blocking. POSIX defines PIPE_BUF >= 512
1013 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1014 input_offset += bytes_written
1015 if input_offset >= len(input):
1017 write_set.remove(self.stdin)
1019 if self.stdout in rlist:
1020 data = os.read(self.stdout.fileno(), 1024)
1023 read_set.remove(self.stdout)
1026 if self.stderr in rlist:
1027 data = os.read(self.stderr.fileno(), 1024)
1030 read_set.remove(self.stderr)
1033 # All data exchanged. Translate lists into strings.
1034 if stdout is not None:
1035 stdout = ''.join(stdout)
1036 if stderr is not None:
1037 stderr = ''.join(stderr)
1039 # Translate newlines, if requested. We cannot let the file
1040 # object do the translation: It is based on stdio, which is
1041 # impossible to combine with select (unless forcing no
1043 if self.universal_newlines and hasattr(file, 'newlines'):
1045 stdout = self._translate_newlines(stdout)
1047 stderr = self._translate_newlines(stderr)
1049 if killed and err_on_timeout:
1050 errcode = self.poll()
1051 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1057 return (stdout, stderr)