2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 if hasattr(os, "devnull"):
38 DEV_NULL = "/dev/null"
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43 """ Escapes strings so that they are safe to use as command-line arguments """
44 if SHELL_SAFE.match(s):
45 # safe string - no escaping needed
48 # unsafe string - escape
50 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
53 return "'$'\\x%02x''" % (ord(c),)
54 s = ''.join(map(escp,s))
57 def eintr_retry(func):
59 @functools.wraps(func)
61 retry = kw.pop("_retry", False)
62 for i in xrange(0 if retry else 4):
65 except (select.error, socket.error), args:
66 if args[0] == errno.EINTR:
71 if e.errno == errno.EINTR:
80 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
81 environment_setup = "", clean_root = False):
82 self._root_dir = root_dir
83 self._clean_root = clean_root
85 self._ctrl_sock = None
86 self._log_level = log_level
88 self._environment_setup = environment_setup
97 # cannot return normally after fork because no exec was done.
98 # This means that if we don't do a os._exit(0) here the code that
99 # follows the call to "Server.run()" in the "caller code" will be
100 # executed... but by now it has already been executed after the
101 # first process (the one that did the first fork) returned.
104 print >>sys.stderr, "SERVER_ERROR."
108 print >>sys.stderr, "SERVER_READY."
111 # pipes for process synchronization
115 root = os.path.normpath(self._root_dir)
116 if self._root_dir not in [".", ""] and os.path.exists(root) \
117 and self._clean_root:
119 if not os.path.exists(root):
120 os.makedirs(root, 0755)
128 except OSError, e: # pragma: no cover
129 if e.errno == errno.EINTR:
135 # os.waitpid avoids leaving a <defunct> (zombie) process
136 st = os.waitpid(pid1, 0)[1]
138 raise RuntimeError("Daemonization failed")
139 # return 0 to inform the caller method that this is not the
144 # Decouple from parent environment.
145 os.chdir(self._root_dir)
152 # see ref: "os._exit(0)"
155 # close all open file descriptors.
156 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157 if (max_fd == resource.RLIM_INFINITY):
159 for fd in range(3, max_fd):
166 # Redirect standard file descriptors.
167 stdin = open(DEV_NULL, "r")
168 stderr = stdout = open(STD_ERR, "a", 0)
169 os.dup2(stdin.fileno(), sys.stdin.fileno())
170 # NOTE: sys.stdout.write will still be buffered, even if the file
171 # was opened with 0 buffer
172 os.dup2(stdout.fileno(), sys.stdout.fileno())
173 os.dup2(stderr.fileno(), sys.stderr.fileno())
176 if self._environment_setup:
177 # parse environment variables and pass to child process
178 # do it by executing shell commands, in case there's some heavy setup involved
179 envproc = subprocess.Popen(
181 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182 ( self._environment_setup, ) ],
183 stdin = subprocess.PIPE,
184 stdout = subprocess.PIPE,
185 stderr = subprocess.PIPE
187 out,err = envproc.communicate()
189 # parse new environment
191 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
193 # apply to current environment
194 for name, value in environment.iteritems():
195 os.environ[name] = value
198 if 'PYTHONPATH' in environment:
199 sys.path = environment['PYTHONPATH'].split(':') + sys.path
201 # create control socket
202 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
204 self._ctrl_sock.bind(CTRL_SOCK)
206 # Address in use, check pidfile
209 pidfile = open(CTRL_PID, "r")
218 # Check process liveness
219 if not os.path.exists("/proc/%d" % (pid,)):
220 # Ok, it's dead, clean the socket
224 self._ctrl_sock.bind(CTRL_SOCK)
226 self._ctrl_sock.listen(0)
229 pidfile = open(CTRL_PID, "w")
230 pidfile.write(str(os.getpid()))
233 # let the parent process know that the daemonization is finished
238 def post_daemonize(self):
239 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240 # QT, for some strange reason, redefines the SIGCHLD handler to write
241 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
242 # Server daemonization closes all file descriptors from fileno '3' up,
243 # but the overloaded handler (inherited by the forked process) will
244 # keep trying to write the \0 to fileno 'x', which might have been reused
245 # after closing for other operations. This is very bad when fileno 'x'
246 # is in use for communication purposes, because unexpected \0 bytes start
247 # appearing in the communication messages... this is exactly what happens
248 # when using netns in daemonized form. Thus, we have no alternative other than
249 # restoring the SIGCHLD handler to the default here.
251 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
254 while not self._stop:
255 conn, addr = self._ctrl_sock.accept()
256 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
258 while not self._stop:
260 msg = self.recv_msg(conn)
261 except socket.timeout, e:
262 #self.log_error("SERVER recv_msg: connection timedout ")
266 self.log_error("CONNECTION LOST")
271 reply = self.stop_action()
273 reply = self.reply_action(msg)
276 self.send_reply(conn, reply)
279 self.log_error("NOTICE: Awaiting for reconnection")
287 def recv_msg(self, conn):
290 while '\n' not in chunk:
292 chunk = conn.recv(1024)
293 except (OSError, socket.error), e:
294 if e[0] != errno.EINTR:
303 data = ''.join(data).split('\n',1)
306 data, self._rdbuf = data
308 decoded = base64.b64decode(data)
309 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send ``reply`` to the peer as one base64-encoded line.

    conn  -- connected stream socket to write to.
    reply -- string payload; it is base64-encoded and terminated with a
             single newline so the receiver can split messages on it.

    sendall() is used instead of send(): send() on a stream socket may
    transmit only part of the buffer, which would leave the peer waiting
    for a line terminator that never arrives.
    """
    encoded = base64.b64encode(reply)
    conn.sendall("%s\n" % encoded)
317 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed reply string for the stop action."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return the default reply text for the received message ``msg``.

    msg -- the received message; any value is accepted and formatted
           with ``%s``.
    """
    # Wrap msg in a 1-tuple so that a tuple-valued msg is formatted as a
    # single value instead of being unpacked as the format arguments.
    return "Reply to: %s" % (msg,)
328 def log_error(self, text = None, context = ''):
330 text = traceback.format_exc()
331 date = time.strftime("%Y-%m-%d %H:%M:%S")
333 context = " (%s)" % (context,)
334 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write ``text`` to stderr as a timestamped DEBUG entry.

    Nothing is written unless the configured log level equals
    DC.DEBUG_LEVEL.
    """
    if not (self._log_level == DC.DEBUG_LEVEL):
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (stamp, text)
    sys.stderr.write(entry)
342 class Forwarder(object):
343 def __init__(self, root_dir = "."):
344 self._ctrl_sock = None
345 self._root_dir = root_dir
351 print >>sys.stderr, "FORWARDER_READY."
352 while not self._stop:
353 data = self.read_data()
355 # Connection to client lost
357 self.send_to_server(data)
359 data = self.recv_from_server()
361 # Connection to server lost
362 raise IOError, "Connection to server lost while "\
364 self.write_data(data)
368 return sys.stdin.readline()
370 def write_data(self, data):
371 sys.stdout.write(data)
372 # sys.stdout.write is buffered, this is why we need to do a flush()
375 def send_to_server(self, data):
377 self._ctrl_sock.send(data)
378 except (IOError, socket.error), e:
379 if e[0] == errno.EPIPE:
381 self._ctrl_sock.send(data)
384 encoded = data.rstrip()
385 msg = base64.b64decode(encoded)
389 def recv_from_server(self):
392 while '\n' not in chunk:
394 chunk = self._ctrl_sock.recv(1024)
395 except (OSError, socket.error), e:
396 if e[0] != errno.EINTR:
404 data = ''.join(data).split('\n',1)
407 data, self._rdbuf = data
413 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415 self._ctrl_sock.connect(sock_addr)
417 def disconnect(self):
419 self._ctrl_sock.close()
423 class Client(object):
424 def __init__(self, root_dir = ".", host = None, port = None, user = None,
425 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426 environment_setup = ""):
427 self.root_dir = root_dir
428 self.addr = (host, port)
432 self.communication = communication
433 self.environment_setup = environment_setup
434 self._stopped = False
435 self._deferreds = collections.deque()
439 if self._process.poll() is None:
440 os.kill(self._process.pid, signal.SIGTERM)
444 root_dir = self.root_dir
445 (host, port) = self.addr
449 communication = self.communication
451 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452 c.forward()" % (root_dir,)
454 self._process = popen_python(python_code,
455 communication = communication,
461 environment_setup = self.environment_setup)
463 # Wait for the forwarder to be ready, otherwise nobody
464 # will be able to connect to it
468 helo = self._process.stderr.readline()
469 if helo == 'FORWARDER_READY.\n':
473 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
475 def send_msg(self, msg):
476 encoded = base64.b64encode(msg)
477 data = "%s\n" % encoded
480 self._process.stdin.write(data)
481 except (IOError, ValueError):
482 # dead process, poll it to un-zombify
485 # try again after reconnect
486 # If it fails again, though, give up
488 self._process.stdin.write(data)
491 self.send_msg(STOP_MSG)
494 def defer_reply(self, transform=None):
496 self._deferreds.append(defer_entry)
498 functools.partial(self.read_reply, defer_entry, transform)
501 def _read_reply(self):
502 data = self._process.stdout.readline()
503 encoded = data.rstrip()
505 # empty == eof == dead process, poll it to un-zombify
508 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509 return base64.b64decode(encoded)
511 def read_reply(self, which=None, transform=None):
512 # Test to see if someone did it already
513 if which is not None and len(which):
515 # ...just return the deferred value
517 return transform(which[0])
521 # Process all deferreds until the one we're looking for
522 # or until the queue is empty
523 while self._deferreds:
525 deferred = self._deferreds.popleft()
530 deferred.append(self._read_reply())
531 if deferred is which:
532 # We reached the one we were looking for
534 return transform(deferred[0])
539 # They've requested a synchronous read
541 return transform(self._read_reply())
543 return self._read_reply()
545 def _make_server_key_args(server_key, host, port, args):
547 Returns a reference to the created temporary file, and adds the
548 corresponding arguments to the given argument list.
550 Make sure to hold onto it until the process is done with the file
553 host = '%s:%s' % (host,port)
554 # Create a temporary server key file
555 tmp_known_hosts = tempfile.NamedTemporaryFile()
557 # Add the intended host key
558 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
560 # If we're not in strict mode, add user-configured keys
561 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563 if os.access(user_hosts_path, os.R_OK):
564 f = open(user_hosts_path, "r")
565 tmp_known_hosts.write(f.read())
568 tmp_known_hosts.flush()
570 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
572 return tmp_known_hosts
574 def popen_ssh_command(command, host, port, user, agent,
581 err_on_timeout = True,
582 connect_timeout = 30):
584 Executes a remote command, returns ((stdout,stderr),process)
587 print "ssh", host, command
589 tmp_known_hosts = None
591 # Don't bother with localhost. Makes test easier
592 '-o', 'NoHostAuthenticationForLocalhost=yes,ConnectTimeout=%s' % (connect_timeout,),
597 args.append('-p%d' % port)
599 args.extend(('-i', ident_key))
603 # Create a temporary server key file
604 tmp_known_hosts = _make_server_key_args(
605 server_key, host, port, args)
608 for x in xrange(retry or 3):
609 # connects to the remote host and starts a remote connection
610 proc = subprocess.Popen(args,
611 stdout = subprocess.PIPE,
612 stdin = subprocess.PIPE,
613 stderr = subprocess.PIPE)
615 # attach tempfile object to the process, to make sure the file stays
616 # alive until the process is finished with it
617 proc._known_hosts = tmp_known_hosts
620 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
621 if proc.poll() and err.strip().startswith('ssh: '):
622 # SSH error, can safely retry
625 except RuntimeError,e:
629 print " timedout -> ", e.args
633 print " -> ", out, err
635 return ((out, err), proc)
637 def popen_scp(source, dest,
644 Copies from/to remote sites.
646 Source and destination should have the user and host encoded
649 If source is a file object, a special mode will be used to
650 create the remote file with the same contents.
652 If dest is a file object, the remote file (source) will be
653 read and written into dest.
655 In these modes, recursive cannot be True.
657 Source can be a list of files to copy to a single destination,
658 in which case it is advised that the destination be a folder.
662 print "scp", source, dest
664 if isinstance(source, file) and source.tell() == 0:
666 elif hasattr(source, 'read'):
667 tmp = tempfile.NamedTemporaryFile()
669 buf = source.read(65536)
677 if isinstance(source, file) or isinstance(dest, file) \
678 or hasattr(source, 'read') or hasattr(dest, 'write'):
681 # Parse source/destination as <user>@<server>:<path>
682 if isinstance(dest, basestring) and ':' in dest:
683 remspec, path = dest.split(':',1)
684 elif isinstance(source, basestring) and ':' in source:
685 remspec, path = source.split(':',1)
687 raise ValueError, "Both endpoints cannot be local"
688 user,host = remspec.rsplit('@',1)
689 tmp_known_hosts = None
691 args = ['ssh', '-l', user, '-C',
692 # Don't bother with localhost. Makes test easier
693 '-o', 'NoHostAuthenticationForLocalhost=yes',
696 args.append('-P%d' % port)
698 args.extend(('-i', ident_key))
700 # Create a temporary server key file
701 tmp_known_hosts = _make_server_key_args(
702 server_key, host, port, args)
704 if isinstance(source, file) or hasattr(source, 'read'):
705 args.append('cat > %s' % (shell_escape(path),))
706 elif isinstance(dest, file) or hasattr(dest, 'write'):
707 args.append('cat %s' % (shell_escape(path),))
709 raise AssertionError, "Unreachable code reached! :-Q"
711 # connects to the remote host and starts a remote connection
712 if isinstance(source, file):
713 proc = subprocess.Popen(args,
714 stdout = open('/dev/null','w'),
715 stderr = subprocess.PIPE,
717 err = proc.stderr.read()
718 proc._known_hosts = tmp_known_hosts
719 eintr_retry(proc.wait)()
720 return ((None,err), proc)
721 elif isinstance(dest, file):
722 proc = subprocess.Popen(args,
723 stdout = open('/dev/null','w'),
724 stderr = subprocess.PIPE,
726 err = proc.stderr.read()
727 proc._known_hosts = tmp_known_hosts
728 eintr_retry(proc.wait)()
729 return ((None,err), proc)
730 elif hasattr(source, 'read'):
731 # file-like (but not file) source
732 proc = subprocess.Popen(args,
733 stdout = open('/dev/null','w'),
734 stderr = subprocess.PIPE,
735 stdin = subprocess.PIPE)
741 buf = source.read(4096)
746 rdrdy, wrdy, broken = select.select(
749 [proc.stderr,proc.stdin])
751 if proc.stderr in rdrdy:
752 # use os.read for fully unbuffered behavior
753 err.append(os.read(proc.stderr.fileno(), 4096))
755 if proc.stdin in wrdy:
756 proc.stdin.write(buf)
762 err.append(proc.stderr.read())
764 proc._known_hosts = tmp_known_hosts
765 eintr_retry(proc.wait)()
766 return ((None,''.join(err)), proc)
767 elif hasattr(dest, 'write'):
768 # file-like (but not file) dest
769 proc = subprocess.Popen(args,
770 stdout = subprocess.PIPE,
771 stderr = subprocess.PIPE,
772 stdin = open('/dev/null','w'))
777 rdrdy, wrdy, broken = select.select(
778 [proc.stderr, proc.stdout],
780 [proc.stderr, proc.stdout])
782 if proc.stderr in rdrdy:
783 # use os.read for fully unbuffered behavior
784 err.append(os.read(proc.stderr.fileno(), 4096))
786 if proc.stdout in rdrdy:
787 # use os.read for fully unbuffered behavior
788 buf = os.read(proc.stdout.fileno(), 4096)
797 err.append(proc.stderr.read())
799 proc._known_hosts = tmp_known_hosts
800 eintr_retry(proc.wait)()
801 return ((None,''.join(err)), proc)
803 raise AssertionError, "Unreachable code reached! :-Q"
805 # Parse destination as <user>@<server>:<path>
806 if isinstance(dest, basestring) and ':' in dest:
807 remspec, path = dest.split(':',1)
808 elif isinstance(source, basestring) and ':' in source:
809 remspec, path = source.split(':',1)
811 raise ValueError, "Both endpoints cannot be local"
812 user,host = remspec.rsplit('@',1)
815 tmp_known_hosts = None
816 args = ['scp', '-q', '-p', '-C',
817 # Don't bother with localhost. Makes test easier
818 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
820 args.append('-P%d' % port)
824 args.extend(('-i', ident_key))
826 # Create a temporary server key file
827 tmp_known_hosts = _make_server_key_args(
828 server_key, host, port, args)
829 if isinstance(source,list):
835 # connects to the remote host and starts a remote connection
836 proc = subprocess.Popen(args,
837 stdout = subprocess.PIPE,
838 stdin = subprocess.PIPE,
839 stderr = subprocess.PIPE)
840 proc._known_hosts = tmp_known_hosts
842 comm = proc.communicate()
843 eintr_retry(proc.wait)()
846 def decode_and_execute():
847 # The python code we want to execute might have characters that
848 # are not compatible with the 'inline' mode we are using. To avoid
849 # problems we receive the encoded python code in base64 as an input
850 # stream and decode it for execution.
855 cmd += os.read(0, 1)# one byte from stdin
857 if e.errno == errno.EINTR:
863 cmd = base64.b64decode(cmd)
864 # Uncomment for debug
865 #os.write(2, "Executing python code: %s\n" % cmd)
866 os.write(1, "OK\n") # send a sync message
869 def popen_python(python_code,
870 communication = DC.ACCESS_LOCAL,
880 environment_setup = ""):
884 python_path.replace("'", r"'\''")
885 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
887 if environment_setup:
888 cmd += environment_setup
890 # Uncomment for debug (to run everything under strace)
891 # We had to verify if strace works (cannot nest them)
892 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
894 #cmd += "strace -f -tt -s 200 -o strace$$.out "
896 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
897 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
902 cmd = "sudo bash -c " + shell_escape(cmd)
906 if communication == DC.ACCESS_SSH:
907 tmp_known_hosts = None
909 # Don't bother with localhost. Makes test easier
910 '-o', 'NoHostAuthenticationForLocalhost=yes',
915 args.append('-p%d' % port)
917 args.extend(('-i', ident_key))
921 # Create a temporary server key file
922 tmp_known_hosts = _make_server_key_args(
923 server_key, host, port, args)
926 args = [ "/bin/bash", "-c", cmd ]
928 # connects to the remote host and starts a remote
929 proc = subprocess.Popen(args,
931 stdout = subprocess.PIPE,
932 stdin = subprocess.PIPE,
933 stderr = subprocess.PIPE)
935 if communication == DC.ACCESS_SSH:
936 proc._known_hosts = tmp_known_hosts
938 # send the command to execute
939 os.write(proc.stdin.fileno(),
940 base64.b64encode(python_code) + "\n")
944 msg = os.read(proc.stdout.fileno(), 3)
947 if e.errno == errno.EINTR:
953 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
954 msg, proc.stdout.read(), proc.stderr.read())
959 def _communicate(self, input, timeout=None, err_on_timeout=True):
962 stdout = None # Return
963 stderr = None # Return
967 if timeout is not None:
968 timelimit = time.time() + timeout
969 killtime = timelimit + 4
970 bailtime = timelimit + 4
973 # Flush stdio buffer. This might block, if the user has
974 # been writing to .stdin in an uncontrolled fashion.
977 write_set.append(self.stdin)
981 read_set.append(self.stdout)
984 read_set.append(self.stderr)
988 while read_set or write_set:
989 if timeout is not None:
990 curtime = time.time()
991 if timeout is None or curtime > timelimit:
992 if curtime > bailtime:
994 elif curtime > killtime:
995 signum = signal.SIGKILL
997 signum = signal.SIGTERM
999 os.kill(self.pid, signum)
1000 select_timeout = 0.5
1002 select_timeout = timelimit - curtime + 0.1
1004 select_timeout = None
1007 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1008 except select.error,e:
1014 if self.stdin in wlist:
1015 # When select has indicated that the file is writable,
1016 # we can write up to PIPE_BUF bytes without risk of
1017 # blocking. POSIX defines PIPE_BUF >= 512
1018 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1019 input_offset += bytes_written
1020 if input_offset >= len(input):
1022 write_set.remove(self.stdin)
1024 if self.stdout in rlist:
1025 data = os.read(self.stdout.fileno(), 1024)
1028 read_set.remove(self.stdout)
1031 if self.stderr in rlist:
1032 data = os.read(self.stderr.fileno(), 1024)
1035 read_set.remove(self.stderr)
1038 # All data exchanged. Translate lists into strings.
1039 if stdout is not None:
1040 stdout = ''.join(stdout)
1041 if stderr is not None:
1042 stderr = ''.join(stderr)
1044 # Translate newlines, if requested. We cannot let the file
1045 # object do the translation: It is based on stdio, which is
1046 # impossible to combine with select (unless forcing no
1048 if self.universal_newlines and hasattr(file, 'newlines'):
1050 stdout = self._translate_newlines(stdout)
1052 stderr = self._translate_newlines(stderr)
1054 if killed and err_on_timeout:
1055 errcode = self.poll()
1056 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1062 return (stdout, stderr)