2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 if hasattr(os, "devnull"):
38 DEV_NULL = "/dev/null"
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43 """ Escapes strings so that they are safe to use as command-line arguments """
44 if SHELL_SAFE.match(s):
45 # safe string - no escaping needed
48 # unsafe string - escape
50 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
53 return "'$'\\x%02x''" % (ord(c),)
54 s = ''.join(map(escp,s))
57 def eintr_retry(func):
59 @functools.wraps(func)
61 retry = kw.pop("_retry", False)
62 for i in xrange(0 if retry else 4):
65 except (select.error, socket.error), args:
66 if args[0] == errno.EINTR:
71 if e.errno == errno.EINTR:
80 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
81 environment_setup = "", clean_root = False):
82 self._root_dir = root_dir
83 self._clean_root = clean_root
85 self._ctrl_sock = None
86 self._log_level = log_level
88 self._environment_setup = environment_setup
97 # cannot return normally after fork because no exec was done.
98 # This means that if we don't do a os._exit(0) here the code that
99 # follows the call to "Server.run()" in the "caller code" will be
100 # executed... but by now it has already been executed after the
101 # first process (the one that did the first fork) returned.
104 print >>sys.stderr, "SERVER_ERROR."
108 print >>sys.stderr, "SERVER_READY."
111 # pipes for process synchronization
115 root = os.path.normpath(self._root_dir)
116 if self._root_dir not in [".", ""] and os.path.exists(root) \
117 and self._clean_root:
119 if not os.path.exists(root):
120 os.makedirs(root, 0755)
128 except OSError, e: # pragma: no cover
129 if e.errno == errno.EINTR:
135 # os.waitpid avoids leaving a <defunct> (zombie) process
136 st = os.waitpid(pid1, 0)[1]
138 raise RuntimeError("Daemonization failed")
139 # return 0 to inform the caller method that this is not the
144 # Decouple from parent environment.
145 os.chdir(self._root_dir)
152 # see ref: "os._exit(0)"
155 # close all open file descriptors.
156 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157 if (max_fd == resource.RLIM_INFINITY):
159 for fd in range(3, max_fd):
166 # Redirect standard file descriptors.
167 stdin = open(DEV_NULL, "r")
168 stderr = stdout = open(STD_ERR, "a", 0)
169 os.dup2(stdin.fileno(), sys.stdin.fileno())
170 # NOTE: sys.stdout.write will still be buffered, even if the file
171 # was opened with 0 buffer
172 os.dup2(stdout.fileno(), sys.stdout.fileno())
173 os.dup2(stderr.fileno(), sys.stderr.fileno())
176 if self._environment_setup:
177 # parse environment variables and pass to child process
178 # do it by executing shell commands, in case there's some heavy setup involved
179 envproc = subprocess.Popen(
181 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182 ( self._environment_setup, ) ],
183 stdin = subprocess.PIPE,
184 stdout = subprocess.PIPE,
185 stderr = subprocess.PIPE
187 out,err = envproc.communicate()
189 # parse new environment
191 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
193 # apply to current environment
194 for name, value in environment.iteritems():
195 os.environ[name] = value
198 if 'PYTHONPATH' in environment:
199 sys.path = environment['PYTHONPATH'].split(':') + sys.path
201 # create control socket
202 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
204 self._ctrl_sock.bind(CTRL_SOCK)
206 # Address in use, check pidfile
209 pidfile = open(CTRL_PID, "r")
218 # Check process liveness
219 if not os.path.exists("/proc/%d" % (pid,)):
220 # Ok, it's dead, clean the socket
224 self._ctrl_sock.bind(CTRL_SOCK)
226 self._ctrl_sock.listen(0)
229 pidfile = open(CTRL_PID, "w")
230 pidfile.write(str(os.getpid()))
233 # let the parent process know that the daemonization is finished
238 def post_daemonize(self):
239 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240 # QT, for some strange reason, redefines the SIGCHLD handler to write
241 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
242 # Server daemonization closes all file descriptors from fileno '3',
243 # but the overloaded handler (inherited by the forked process) will
244 # keep trying to write the \0 to fileno 'x', which might have been reused
245 # after closing, for other operations. This is bad bad bad when fileno 'x'
246 # is in use for communication purposes, because unexpected \0 bytes start
247 # appearing in the communication messages... this is exactly what happens
248 # when using netns in daemonized form. Thus, we have no other alternative than
249 # restoring the SIGCHLD handler to the default here.
251 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
254 while not self._stop:
255 conn, addr = self._ctrl_sock.accept()
256 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
258 while not self._stop:
260 msg = self.recv_msg(conn)
261 except socket.timeout, e:
262 #self.log_error("SERVER recv_msg: connection timedout ")
266 self.log_error("CONNECTION LOST")
271 reply = self.stop_action()
273 reply = self.reply_action(msg)
276 self.send_reply(conn, reply)
279 self.log_error("NOTICE: Awaiting for reconnection")
287 def recv_msg(self, conn):
290 while '\n' not in chunk:
292 chunk = conn.recv(1024)
293 except (OSError, socket.error), e:
294 if e[0] != errno.EINTR:
303 data = ''.join(data).split('\n',1)
306 data, self._rdbuf = data
308 decoded = base64.b64decode(data)
309 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send ``reply`` to the peer as one newline-terminated base64 line.

    conn: connected socket-like object to write to.
    reply: string payload. It is base64-encoded first, so the newline
        appended here is the only newline in the transmitted frame.

    Returns nothing. Exceptions raised by ``conn.sendall`` propagate to
    the caller.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole frame is out; a bare send()
    # may transmit only a prefix, leaving the peer waiting for the newline.
    conn.sendall("%s\n" % encoded)
317 self._ctrl_sock.close()
def stop_action(self):
    """Return the text used as the reply to a stop request."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return the reply for ``msg``: the message prefixed by "Reply to: "."""
    # msg is deliberately passed to % unwrapped, so a tuple argument is
    # still expanded as the format arguments.
    reply = "Reply to: %s" % msg
    return reply
328 def log_error(self, text = None, context = ''):
330 text = traceback.format_exc()
331 date = time.strftime("%Y-%m-%d %H:%M:%S")
333 context = " (%s)" % (context,)
334 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry for ``text`` to stderr.

    Nothing is written unless the configured log level equals
    ``DC.DEBUG_LEVEL``.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
342 class Forwarder(object):
343 def __init__(self, root_dir = "."):
344 self._ctrl_sock = None
345 self._root_dir = root_dir
351 print >>sys.stderr, "FORWARDER_READY."
352 while not self._stop:
353 data = self.read_data()
355 # Connection to client lost
357 self.send_to_server(data)
359 data = self.recv_from_server()
361 # Connection to server lost
362 raise IOError, "Connection to server lost while "\
364 self.write_data(data)
368 return sys.stdin.readline()
370 def write_data(self, data):
371 sys.stdout.write(data)
372 # sys.stdout.write is buffered, this is why we need to do a flush()
375 def send_to_server(self, data):
377 self._ctrl_sock.send(data)
378 except (IOError, socket.error), e:
379 if e[0] == errno.EPIPE:
381 self._ctrl_sock.send(data)
384 encoded = data.rstrip()
385 msg = base64.b64decode(encoded)
389 def recv_from_server(self):
392 while '\n' not in chunk:
394 chunk = self._ctrl_sock.recv(1024)
395 except (OSError, socket.error), e:
396 if e[0] != errno.EINTR:
404 data = ''.join(data).split('\n',1)
407 data, self._rdbuf = data
413 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415 self._ctrl_sock.connect(sock_addr)
417 def disconnect(self):
419 self._ctrl_sock.close()
423 class Client(object):
424 def __init__(self, root_dir = ".", host = None, port = None, user = None,
425 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426 environment_setup = ""):
427 self.root_dir = root_dir
428 self.addr = (host, port)
432 self.communication = communication
433 self.environment_setup = environment_setup
434 self._stopped = False
435 self._deferreds = collections.deque()
439 if self._process.poll() is None:
440 os.kill(self._process.pid, signal.SIGTERM)
444 root_dir = self.root_dir
445 (host, port) = self.addr
449 communication = self.communication
451 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452 c.forward()" % (root_dir,)
454 self._process = popen_python(python_code,
455 communication = communication,
461 environment_setup = self.environment_setup)
463 # Wait for the forwarder to be ready, otherwise nobody
464 # will be able to connect to it
468 helo = self._process.stderr.readline()
469 if helo == 'FORWARDER_READY.\n':
473 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
475 def send_msg(self, msg):
476 encoded = base64.b64encode(msg)
477 data = "%s\n" % encoded
480 self._process.stdin.write(data)
481 except (IOError, ValueError):
482 # dead process, poll it to un-zombify
485 # try again after reconnect
486 # If it fails again, though, give up
488 self._process.stdin.write(data)
491 self.send_msg(STOP_MSG)
494 def defer_reply(self, transform=None):
496 self._deferreds.append(defer_entry)
498 functools.partial(self.read_reply, defer_entry, transform)
501 def _read_reply(self):
502 data = self._process.stdout.readline()
503 encoded = data.rstrip()
505 # empty == eof == dead process, poll it to un-zombify
508 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509 return base64.b64decode(encoded)
511 def read_reply(self, which=None, transform=None):
512 # Test to see if someone did it already
513 if which is not None and len(which):
515 # ...just return the deferred value
517 return transform(which[0])
521 # Process all deferreds until the one we're looking for
522 # or until the queue is empty
523 while self._deferreds:
525 deferred = self._deferreds.popleft()
530 deferred.append(self._read_reply())
531 if deferred is which:
532 # We reached the one we were looking for
534 return transform(deferred[0])
539 # They've requested a synchronous read
541 return transform(self._read_reply())
543 return self._read_reply()
545 def _make_server_key_args(server_key, host, port, args):
547 Returns a reference to the created temporary file, and adds the
548 corresponding arguments to the given argument list.
550 Make sure to hold onto it until the process is done with the file
553 host = '%s:%s' % (host,port)
554 # Create a temporary server key file
555 tmp_known_hosts = tempfile.NamedTemporaryFile()
557 # Add the intended host key
558 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
560 # If we're not in strict mode, add user-configured keys
561 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563 if os.access(user_hosts_path, os.R_OK):
564 f = open(user_hosts_path, "r")
565 tmp_known_hosts.write(f.read())
568 tmp_known_hosts.flush()
570 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
572 return tmp_known_hosts
574 def popen_ssh_command(command, host, port, user, agent,
581 err_on_timeout = True):
583 Executes a remote command, returns ((stdout,stderr),process)
586 print "ssh", host, command
588 tmp_known_hosts = None
590 # Don't bother with localhost. Makes test easier
591 '-o', 'NoHostAuthenticationForLocalhost=yes',
596 args.append('-p%d' % port)
598 args.extend(('-i', ident_key))
602 # Create a temporary server key file
603 tmp_known_hosts = _make_server_key_args(
604 server_key, host, port, args)
608 # connects to the remote host and starts a remote connection
609 proc = subprocess.Popen(args,
610 stdout = subprocess.PIPE,
611 stdin = subprocess.PIPE,
612 stderr = subprocess.PIPE)
614 # attach tempfile object to the process, to make sure the file stays
615 # alive until the process is finished with it
616 proc._known_hosts = tmp_known_hosts
619 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
621 except RuntimeError,e:
625 print " timedout -> ", e.args
629 print " -> ", out, err
631 return ((out, err), proc)
633 def popen_scp(source, dest,
640 Copies from/to remote sites.
642 Source and destination should have the user and host encoded
645 If source is a file object, a special mode will be used to
646 create the remote file with the same contents.
648 If dest is a file object, the remote file (source) will be
649 read and written into dest.
651 In these modes, recursive cannot be True.
653 Source can be a list of files to copy to a single destination,
654 in which case it is advised that the destination be a folder.
658 print "scp", source, dest
660 if isinstance(source, file) and source.tell() == 0:
662 elif hasattr(source, 'read'):
663 tmp = tempfile.NamedTemporaryFile()
665 buf = source.read(65536)
673 if isinstance(source, file) or isinstance(dest, file) \
674 or hasattr(source, 'read') or hasattr(dest, 'write'):
677 # Parse source/destination as <user>@<server>:<path>
678 if isinstance(dest, basestring) and ':' in dest:
679 remspec, path = dest.split(':',1)
680 elif isinstance(source, basestring) and ':' in source:
681 remspec, path = source.split(':',1)
683 raise ValueError, "Both endpoints cannot be local"
684 user,host = remspec.rsplit('@',1)
685 tmp_known_hosts = None
687 args = ['ssh', '-l', user, '-C',
688 # Don't bother with localhost. Makes test easier
689 '-o', 'NoHostAuthenticationForLocalhost=yes',
692 args.append('-P%d' % port)
694 args.extend(('-i', ident_key))
696 # Create a temporary server key file
697 tmp_known_hosts = _make_server_key_args(
698 server_key, host, port, args)
700 if isinstance(source, file) or hasattr(source, 'read'):
701 args.append('cat > %s' % (shell_escape(path),))
702 elif isinstance(dest, file) or hasattr(dest, 'write'):
703 args.append('cat %s' % (shell_escape(path),))
705 raise AssertionError, "Unreachable code reached! :-Q"
707 # connects to the remote host and starts a remote connection
708 if isinstance(source, file):
709 proc = subprocess.Popen(args,
710 stdout = open('/dev/null','w'),
711 stderr = subprocess.PIPE,
713 err = proc.stderr.read()
714 proc._known_hosts = tmp_known_hosts
715 eintr_retry(proc.wait)()
716 return ((None,err), proc)
717 elif isinstance(dest, file):
718 proc = subprocess.Popen(args,
719 stdout = open('/dev/null','w'),
720 stderr = subprocess.PIPE,
722 err = proc.stderr.read()
723 proc._known_hosts = tmp_known_hosts
724 eintr_retry(proc.wait)()
725 return ((None,err), proc)
726 elif hasattr(source, 'read'):
727 # file-like (but not file) source
728 proc = subprocess.Popen(args,
729 stdout = open('/dev/null','w'),
730 stderr = subprocess.PIPE,
731 stdin = subprocess.PIPE)
737 buf = source.read(4096)
742 rdrdy, wrdy, broken = select.select(
745 [proc.stderr,proc.stdin])
747 if proc.stderr in rdrdy:
748 # use os.read for fully unbuffered behavior
749 err.append(os.read(proc.stderr.fileno(), 4096))
751 if proc.stdin in wrdy:
752 proc.stdin.write(buf)
758 err.append(proc.stderr.read())
760 proc._known_hosts = tmp_known_hosts
761 eintr_retry(proc.wait)()
762 return ((None,''.join(err)), proc)
763 elif hasattr(dest, 'write'):
764 # file-like (but not file) dest
765 proc = subprocess.Popen(args,
766 stdout = subprocess.PIPE,
767 stderr = subprocess.PIPE,
768 stdin = open('/dev/null','w'))
773 rdrdy, wrdy, broken = select.select(
774 [proc.stderr, proc.stdout],
776 [proc.stderr, proc.stdout])
778 if proc.stderr in rdrdy:
779 # use os.read for fully unbuffered behavior
780 err.append(os.read(proc.stderr.fileno(), 4096))
782 if proc.stdout in rdrdy:
783 # use os.read for fully unbuffered behavior
784 buf = os.read(proc.stdout.fileno(), 4096)
793 err.append(proc.stderr.read())
795 proc._known_hosts = tmp_known_hosts
796 eintr_retry(proc.wait)()
797 return ((None,''.join(err)), proc)
799 raise AssertionError, "Unreachable code reached! :-Q"
801 # Parse destination as <user>@<server>:<path>
802 if isinstance(dest, basestring) and ':' in dest:
803 remspec, path = dest.split(':',1)
804 elif isinstance(source, basestring) and ':' in source:
805 remspec, path = source.split(':',1)
807 raise ValueError, "Both endpoints cannot be local"
808 user,host = remspec.rsplit('@',1)
811 tmp_known_hosts = None
812 args = ['scp', '-q', '-p', '-C',
813 # Don't bother with localhost. Makes test easier
814 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
816 args.append('-P%d' % port)
820 args.extend(('-i', ident_key))
822 # Create a temporary server key file
823 tmp_known_hosts = _make_server_key_args(
824 server_key, host, port, args)
825 if isinstance(source,list):
831 # connects to the remote host and starts a remote connection
832 proc = subprocess.Popen(args,
833 stdout = subprocess.PIPE,
834 stdin = subprocess.PIPE,
835 stderr = subprocess.PIPE)
836 proc._known_hosts = tmp_known_hosts
838 comm = proc.communicate()
839 eintr_retry(proc.wait)()
842 def decode_and_execute():
843 # The python code we want to execute might have characters that
844 # are not compatible with the 'inline' mode we are using. To avoid
845 # problems we receive the encoded python code in base64 as a input
846 # stream and decode it for execution.
851 cmd += os.read(0, 1)# one byte from stdin
853 if e.errno == errno.EINTR:
859 cmd = base64.b64decode(cmd)
860 # Uncomment for debug
861 #os.write(2, "Executing python code: %s\n" % cmd)
862 os.write(1, "OK\n") # send a sync message
865 def popen_python(python_code,
866 communication = DC.ACCESS_LOCAL,
876 environment_setup = ""):
880 python_path.replace("'", r"'\''")
881 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
883 if environment_setup:
884 cmd += environment_setup
886 # Uncomment for debug (to run everything under strace)
887 # We had to verify if strace works (cannot nest them)
888 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
890 #cmd += "strace -f -tt -s 200 -o strace$$.out "
892 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
893 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
898 cmd = "sudo bash -c " + shell_escape(cmd)
902 if communication == DC.ACCESS_SSH:
903 tmp_known_hosts = None
905 # Don't bother with localhost. Makes test easier
906 '-o', 'NoHostAuthenticationForLocalhost=yes',
911 args.append('-p%d' % port)
913 args.extend(('-i', ident_key))
917 # Create a temporary server key file
918 tmp_known_hosts = _make_server_key_args(
919 server_key, host, port, args)
922 args = [ "/bin/bash", "-c", cmd ]
924 # connects to the remote host and starts a remote
925 proc = subprocess.Popen(args,
927 stdout = subprocess.PIPE,
928 stdin = subprocess.PIPE,
929 stderr = subprocess.PIPE)
931 if communication == DC.ACCESS_SSH:
932 proc._known_hosts = tmp_known_hosts
934 # send the command to execute
935 os.write(proc.stdin.fileno(),
936 base64.b64encode(python_code) + "\n")
940 msg = os.read(proc.stdout.fileno(), 3)
943 if e.errno == errno.EINTR:
949 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
950 msg, proc.stdout.read(), proc.stderr.read())
955 def _communicate(self, input, timeout=None, err_on_timeout=True):
958 stdout = None # Return
959 stderr = None # Return
963 if timeout is not None:
964 timelimit = time.time() + timeout
965 killtime = timelimit + 4
966 bailtime = timelimit + 4
969 # Flush stdio buffer. This might block, if the user has
970 # been writing to .stdin in an uncontrolled fashion.
973 write_set.append(self.stdin)
977 read_set.append(self.stdout)
980 read_set.append(self.stderr)
984 while read_set or write_set:
985 if timeout is not None:
986 curtime = time.time()
987 if timeout is None or curtime > timelimit:
988 if curtime > bailtime:
990 elif curtime > killtime:
991 signum = signal.SIGKILL
993 signum = signal.SIGTERM
995 os.kill(self.pid, signum)
998 select_timeout = timelimit - curtime + 0.1
1000 select_timeout = None
1003 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1004 except select.error,e:
1010 if self.stdin in wlist:
1011 # When select has indicated that the file is writable,
1012 # we can write up to PIPE_BUF bytes without risk
1013 # blocking. POSIX defines PIPE_BUF >= 512
1014 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1015 input_offset += bytes_written
1016 if input_offset >= len(input):
1018 write_set.remove(self.stdin)
1020 if self.stdout in rlist:
1021 data = os.read(self.stdout.fileno(), 1024)
1024 read_set.remove(self.stdout)
1027 if self.stderr in rlist:
1028 data = os.read(self.stderr.fileno(), 1024)
1031 read_set.remove(self.stderr)
1034 # All data exchanged. Translate lists into strings.
1035 if stdout is not None:
1036 stdout = ''.join(stdout)
1037 if stderr is not None:
1038 stderr = ''.join(stderr)
1040 # Translate newlines, if requested. We cannot let the file
1041 # object do the translation: It is based on stdio, which is
1042 # impossible to combine with select (unless forcing no
1044 if self.universal_newlines and hasattr(file, 'newlines'):
1046 stdout = self._translate_newlines(stdout)
1048 stderr = self._translate_newlines(stderr)
1050 if killed and err_on_timeout:
1051 errcode = self.poll()
1052 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1058 return (stdout, stderr)