2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
36 if hasattr(os, "devnull"):
39 DEV_NULL = "/dev/null"
41 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 """ Escapes strings so that they are safe to use as command-line arguments """
45 if SHELL_SAFE.match(s):
46 # safe string - no escaping needed
49 # unsafe string - escape
51 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
54 return "'$'\\x%02x''" % (ord(c),)
55 s = ''.join(map(escp,s))
58 def eintr_retry(func):
60 @functools.wraps(func)
62 retry = kw.pop("_retry", False)
63 for i in xrange(0 if retry else 4):
66 except (select.error, socket.error), args:
67 if args[0] == errno.EINTR:
72 if e.errno == errno.EINTR:
81 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
82 self._root_dir = root_dir
84 self._ctrl_sock = None
85 self._log_level = log_level
87 self._environment_setup = environment_setup
96 # can not return normally after fork beacuse no exec was done.
97 # This means that if we don't do a os._exit(0) here the code that
98 # follows the call to "Server.run()" in the "caller code" will be
99 # executed... but by now it has already been executed after the
100 # first process (the one that did the first fork) returned.
103 print >>sys.stderr, "SERVER_ERROR."
107 print >>sys.stderr, "SERVER_READY."
110 # pipes for process synchronization
114 root = os.path.normpath(self._root_dir)
115 if not os.path.exists(root):
116 os.makedirs(root, 0755)
124 except OSError, e: # pragma: no cover
125 if e.errno == errno.EINTR:
131 # os.waitpid avoids leaving a <defunc> (zombie) process
132 st = os.waitpid(pid1, 0)[1]
134 raise RuntimeError("Daemonization failed")
135 # return 0 to inform the caller method that this is not the
140 # Decouple from parent environment.
141 os.chdir(self._root_dir)
148 # see ref: "os._exit(0)"
151 # close all open file descriptors.
152 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
153 if (max_fd == resource.RLIM_INFINITY):
155 for fd in range(3, max_fd):
162 # Redirect standard file descriptors.
163 stdin = open(DEV_NULL, "r")
164 stderr = stdout = open(STD_ERR, "a", 0)
165 os.dup2(stdin.fileno(), sys.stdin.fileno())
166 # NOTE: sys.stdout.write will still be buffered, even if the file
167 # was opened with 0 buffer
168 os.dup2(stdout.fileno(), sys.stdout.fileno())
169 os.dup2(stderr.fileno(), sys.stderr.fileno())
172 if self._environment_setup:
173 # parse environment variables and pass to child process
174 # do it by executing shell commands, in case there's some heavy setup involved
175 envproc = subprocess.Popen(
177 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
178 ( self._environment_setup, ) ],
179 stdin = subprocess.PIPE,
180 stdout = subprocess.PIPE,
181 stderr = subprocess.PIPE
183 out,err = envproc.communicate()
185 # parse new environment
187 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
189 # apply to current environment
190 for name, value in environment.iteritems():
191 os.environ[name] = value
194 if 'PYTHONPATH' in environment:
195 sys.path = environment['PYTHONPATH'].split(':') + sys.path
197 # create control socket
198 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
199 self._ctrl_sock.bind(CTRL_SOCK)
200 self._ctrl_sock.listen(0)
202 # let the parent process know that the daemonization is finished
207 def post_daemonize(self):
208 # QT, for some strange reason, redefines the SIGCHILD handler to write
209 # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
210 # Server dameonization closes all file descriptors from fileno '3',
211 # but the overloaded handler (inherited by the forked process) will
212 # keep trying to write the \0 to fileno 'x', which might have been reused
213 # after closing, for other operations. This is bad bad bad when fileno 'x'
214 # is in use for communication pouroses, because unexpected \0 start
215 # appearing in the communication messages... this is exactly what happens
216 # when using netns in daemonized form. Thus, be have no other alternative than
217 # restoring the SIGCHLD handler to the default here.
219 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
222 while not self._stop:
223 conn, addr = self._ctrl_sock.accept()
225 while not self._stop:
227 msg = self.recv_msg(conn)
228 except socket.timeout, e:
234 reply = self.stop_action()
236 reply = self.reply_action(msg)
239 self.send_reply(conn, reply)
242 self.log_error("NOTICE: Awaiting for reconnection")
250 def recv_msg(self, conn):
253 while '\n' not in chunk:
255 chunk = conn.recv(1024)
256 except (OSError, socket.error), e:
257 if e[0] != errno.EINTR:
266 data = ''.join(data).split('\n',1)
269 data, self._rdbuf = data
271 decoded = base64.b64decode(data)
272 return decoded.rstrip()
274 def send_reply(self, conn, reply):
275 encoded = base64.b64encode(reply)
276 conn.send("%s\n" % encoded)
280 self._ctrl_sock.close()
285 def stop_action(self):
286 return "Stopping server"
288 def reply_action(self, msg):
289 return "Reply to: %s" % msg
291 def log_error(self, text = None, context = ''):
293 text = traceback.format_exc()
294 date = time.strftime("%Y-%m-%d %H:%M:%S")
296 context = " (%s)" % (context,)
297 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
300 def log_debug(self, text):
301 if self._log_level == DEBUG_LEVEL:
302 date = time.strftime("%Y-%m-%d %H:%M:%S")
303 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
305 class Forwarder(object):
306 def __init__(self, root_dir = "."):
307 self._ctrl_sock = None
308 self._root_dir = root_dir
314 print >>sys.stderr, "FORWARDER_READY."
315 while not self._stop:
316 data = self.read_data()
317 self.send_to_server(data)
318 data = self.recv_from_server()
319 self.write_data(data)
323 return sys.stdin.readline()
325 def write_data(self, data):
326 sys.stdout.write(data)
327 # sys.stdout.write is buffered, this is why we need to do a flush()
330 def send_to_server(self, data):
332 self._ctrl_sock.send(data)
333 except (IOError, socket.error), e:
334 if e[0] == errno.EPIPE:
336 self._ctrl_sock.send(data)
339 encoded = data.rstrip()
340 msg = base64.b64decode(encoded)
344 def recv_from_server(self):
347 while '\n' not in chunk:
349 chunk = self._ctrl_sock.recv(1024)
350 except (OSError, socket.error), e:
351 if e[0] != errno.EINTR:
359 data = ''.join(data).split('\n',1)
362 data, self._rdbuf = data
368 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
369 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
370 self._ctrl_sock.connect(sock_addr)
372 def disconnect(self):
374 self._ctrl_sock.close()
378 class Client(object):
379 def __init__(self, root_dir = ".", host = None, port = None, user = None,
380 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
381 environment_setup = ""):
382 self.root_dir = root_dir
383 self.addr = (host, port)
387 self.communication = communication
388 self.environment_setup = environment_setup
389 self._stopped = False
390 self._deferreds = collections.deque()
394 if self._process.poll() is None:
395 os.kill(self._process.pid, signal.SIGTERM)
399 root_dir = self.root_dir
400 (host, port) = self.addr
404 communication = self.communication
406 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
407 c.forward()" % (root_dir,)
409 self._process = popen_python(python_code,
410 communication = communication,
416 environment_setup = self.environment_setup)
418 # Wait for the forwarder to be ready, otherwise nobody
419 # will be able to connect to it
420 helo = self._process.stderr.readline()
421 if helo != 'FORWARDER_READY.\n':
422 raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
423 helo + self._process.stderr.read())
425 def send_msg(self, msg):
426 encoded = base64.b64encode(msg)
427 data = "%s\n" % encoded
430 self._process.stdin.write(data)
431 except (IOError, ValueError):
432 # dead process, poll it to un-zombify
435 # try again after reconnect
436 # If it fails again, though, give up
438 self._process.stdin.write(data)
441 self.send_msg(STOP_MSG)
444 def defer_reply(self, transform=None):
446 self._deferreds.append(defer_entry)
448 functools.partial(self.read_reply, defer_entry, transform)
451 def _read_reply(self):
452 data = self._process.stdout.readline()
453 encoded = data.rstrip()
455 # empty == eof == dead process, poll it to un-zombify
458 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
459 return base64.b64decode(encoded)
461 def read_reply(self, which=None, transform=None):
462 # Test to see if someone did it already
463 if which is not None and len(which):
465 # ...just return the deferred value
467 return transform(which[0])
471 # Process all deferreds until the one we're looking for
472 # or until the queue is empty
473 while self._deferreds:
475 deferred = self._deferreds.popleft()
480 deferred.append(self._read_reply())
481 if deferred is which:
482 # We reached the one we were looking for
484 return transform(deferred[0])
489 # They've requested a synchronous read
491 return transform(self._read_reply())
493 return self._read_reply()
495 def _make_server_key_args(server_key, host, port, args):
497 Returns a reference to the created temporary file, and adds the
498 corresponding arguments to the given argument list.
500 Make sure to hold onto it until the process is done with the file
503 host = '%s:%s' % (host,port)
504 # Create a temporary server key file
505 tmp_known_hosts = tempfile.NamedTemporaryFile()
507 # Add the intended host key
508 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
510 # If we're not in strict mode, add user-configured keys
511 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
512 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
513 if os.access(user_hosts_path, os.R_OK):
514 f = open(user_hosts_path, "r")
515 tmp_known_hosts.write(f.read())
518 tmp_known_hosts.flush()
520 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
522 return tmp_known_hosts
524 def popen_ssh_command(command, host, port, user, agent,
531 err_on_timeout = True):
533 Executes a remote commands, returns ((stdout,stderr),process)
536 print "ssh", host, command
538 tmp_known_hosts = None
540 # Don't bother with localhost. Makes test easier
541 '-o', 'NoHostAuthenticationForLocalhost=yes',
546 args.append('-p%d' % port)
548 args.extend(('-i', ident_key))
552 # Create a temporary server key file
553 tmp_known_hosts = _make_server_key_args(
554 server_key, host, port, args)
558 # connects to the remote host and starts a remote connection
559 proc = subprocess.Popen(args,
560 stdout = subprocess.PIPE,
561 stdin = subprocess.PIPE,
562 stderr = subprocess.PIPE)
564 # attach tempfile object to the process, to make sure the file stays
565 # alive until the process is finished with it
566 proc._known_hosts = tmp_known_hosts
569 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
571 except RuntimeError,e:
575 print " timedout -> ", e.args
579 print " -> ", out, err
581 return ((out, err), proc)
583 def popen_scp(source, dest,
590 Copies from/to remote sites.
592 Source and destination should have the user and host encoded
595 If source is a file object, a special mode will be used to
596 create the remote file with the same contents.
598 If dest is a file object, the remote file (source) will be
599 read and written into dest.
601 In these modes, recursive cannot be True.
603 Source can be a list of files to copy to a single destination,
604 in which case it is advised that the destination be a folder.
608 print "scp", source, dest
610 if isinstance(source, file) and source.tell() == 0:
612 elif hasattr(source, 'read'):
613 tmp = tempfile.NamedTemporaryFile()
615 buf = source.read(65536)
623 if isinstance(source, file) or isinstance(dest, file) \
624 or hasattr(source, 'read') or hasattr(dest, 'write'):
627 # Parse source/destination as <user>@<server>:<path>
628 if isinstance(dest, basestring) and ':' in dest:
629 remspec, path = dest.split(':',1)
630 elif isinstance(source, basestring) and ':' in source:
631 remspec, path = source.split(':',1)
633 raise ValueError, "Both endpoints cannot be local"
634 user,host = remspec.rsplit('@',1)
635 tmp_known_hosts = None
637 args = ['ssh', '-l', user, '-C',
638 # Don't bother with localhost. Makes test easier
639 '-o', 'NoHostAuthenticationForLocalhost=yes',
642 args.append('-P%d' % port)
644 args.extend(('-i', ident_key))
646 # Create a temporary server key file
647 tmp_known_hosts = _make_server_key_args(
648 server_key, host, port, args)
650 if isinstance(source, file) or hasattr(source, 'read'):
651 args.append('cat > %s' % (shell_escape(path),))
652 elif isinstance(dest, file) or hasattr(dest, 'write'):
653 args.append('cat %s' % (shell_escape(path),))
655 raise AssertionError, "Unreachable code reached! :-Q"
657 # connects to the remote host and starts a remote connection
658 if isinstance(source, file):
659 proc = subprocess.Popen(args,
660 stdout = open('/dev/null','w'),
661 stderr = subprocess.PIPE,
663 err = proc.stderr.read()
664 proc._known_hosts = tmp_known_hosts
665 eintr_retry(proc.wait)()
666 return ((None,err), proc)
667 elif isinstance(dest, file):
668 proc = subprocess.Popen(args,
669 stdout = open('/dev/null','w'),
670 stderr = subprocess.PIPE,
672 err = proc.stderr.read()
673 proc._known_hosts = tmp_known_hosts
674 eintr_retry(proc.wait)()
675 return ((None,err), proc)
676 elif hasattr(source, 'read'):
677 # file-like (but not file) source
678 proc = subprocess.Popen(args,
679 stdout = open('/dev/null','w'),
680 stderr = subprocess.PIPE,
681 stdin = subprocess.PIPE)
687 buf = source.read(4096)
692 rdrdy, wrdy, broken = select.select(
695 [proc.stderr,proc.stdin])
697 if proc.stderr in rdrdy:
698 # use os.read for fully unbuffered behavior
699 err.append(os.read(proc.stderr.fileno(), 4096))
701 if proc.stdin in wrdy:
702 proc.stdin.write(buf)
708 err.append(proc.stderr.read())
710 proc._known_hosts = tmp_known_hosts
711 eintr_retry(proc.wait)()
712 return ((None,''.join(err)), proc)
713 elif hasattr(dest, 'write'):
714 # file-like (but not file) dest
715 proc = subprocess.Popen(args,
716 stdout = subprocess.PIPE,
717 stderr = subprocess.PIPE,
718 stdin = open('/dev/null','w'))
723 rdrdy, wrdy, broken = select.select(
724 [proc.stderr, proc.stdout],
726 [proc.stderr, proc.stdout])
728 if proc.stderr in rdrdy:
729 # use os.read for fully unbuffered behavior
730 err.append(os.read(proc.stderr.fileno(), 4096))
732 if proc.stdout in rdrdy:
733 # use os.read for fully unbuffered behavior
734 buf = os.read(proc.stdout.fileno(), 4096)
743 err.append(proc.stderr.read())
745 proc._known_hosts = tmp_known_hosts
746 eintr_retry(proc.wait)()
747 return ((None,''.join(err)), proc)
749 raise AssertionError, "Unreachable code reached! :-Q"
751 # Parse destination as <user>@<server>:<path>
752 if isinstance(dest, basestring) and ':' in dest:
753 remspec, path = dest.split(':',1)
754 elif isinstance(source, basestring) and ':' in source:
755 remspec, path = source.split(':',1)
757 raise ValueError, "Both endpoints cannot be local"
758 user,host = remspec.rsplit('@',1)
761 tmp_known_hosts = None
762 args = ['scp', '-q', '-p', '-C',
763 # Don't bother with localhost. Makes test easier
764 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
766 args.append('-P%d' % port)
770 args.extend(('-i', ident_key))
772 # Create a temporary server key file
773 tmp_known_hosts = _make_server_key_args(
774 server_key, host, port, args)
775 if isinstance(source,list):
781 # connects to the remote host and starts a remote connection
782 proc = subprocess.Popen(args,
783 stdout = subprocess.PIPE,
784 stdin = subprocess.PIPE,
785 stderr = subprocess.PIPE)
786 proc._known_hosts = tmp_known_hosts
788 comm = proc.communicate()
789 eintr_retry(proc.wait)()
792 def decode_and_execute():
793 # The python code we want to execute might have characters that
794 # are not compatible with the 'inline' mode we are using. To avoid
795 # problems we receive the encoded python code in base64 as a input
796 # stream and decode it for execution.
800 cmd += os.read(0, 1)# one byte from stdin
803 cmd = base64.b64decode(cmd)
804 # Uncomment for debug
805 #os.write(2, "Executing python code: %s\n" % cmd)
806 os.write(1, "OK\n") # send a sync message
809 def popen_python(python_code,
810 communication = DC.ACCESS_LOCAL,
820 environment_setup = ""):
826 python_path.replace("'", r"'\''")
827 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
829 if environment_setup:
830 cmd += environment_setup
832 # Uncomment for debug (to run everything under strace)
833 # We had to verify if strace works (cannot nest them)
834 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
836 #cmd += "strace -f -tt -s 200 -o strace$$.out "
837 cmd += "python -c 'from nepi.util import server; server.decode_and_execute()'"
839 if communication == DC.ACCESS_SSH:
840 tmp_known_hosts = None
842 # Don't bother with localhost. Makes test easier
843 '-o', 'NoHostAuthenticationForLocalhost=yes',
848 args.append('-p%d' % port)
850 args.extend(('-i', ident_key))
854 # Create a temporary server key file
855 tmp_known_hosts = _make_server_key_args(
856 server_key, host, port, args)
862 # connects to the remote host and starts a remote
863 proc = subprocess.Popen(args,
865 stdout = subprocess.PIPE,
866 stdin = subprocess.PIPE,
867 stderr = subprocess.PIPE)
869 if communication == DC.ACCESS_SSH:
870 proc._known_hosts = tmp_known_hosts
872 # send the command to execute
873 os.write(proc.stdin.fileno(),
874 base64.b64encode(python_code) + "\n")
875 msg = os.read(proc.stdout.fileno(), 3)
877 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
878 msg, proc.stdout.read(), proc.stderr.read())
883 def _communicate(self, input, timeout=None, err_on_timeout=True):
886 stdout = None # Return
887 stderr = None # Return
891 if timeout is not None:
892 timelimit = time.time() + timeout
893 killtime = timelimit + 4
894 bailtime = timelimit + 4
897 # Flush stdio buffer. This might block, if the user has
898 # been writing to .stdin in an uncontrolled fashion.
901 write_set.append(self.stdin)
905 read_set.append(self.stdout)
908 read_set.append(self.stderr)
912 while read_set or write_set:
913 if timeout is not None:
914 curtime = time.time()
915 if timeout is None or curtime > timelimit:
916 if curtime > bailtime:
918 elif curtime > killtime:
919 signum = signal.SIGKILL
921 signum = signal.SIGTERM
923 os.kill(self.pid, signum)
926 select_timeout = timelimit - curtime + 0.1
928 select_timeout = None
931 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
932 except select.error,e:
938 if self.stdin in wlist:
939 # When select has indicated that the file is writable,
940 # we can write up to PIPE_BUF bytes without risk
941 # blocking. POSIX defines PIPE_BUF >= 512
942 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
943 input_offset += bytes_written
944 if input_offset >= len(input):
946 write_set.remove(self.stdin)
948 if self.stdout in rlist:
949 data = os.read(self.stdout.fileno(), 1024)
952 read_set.remove(self.stdout)
955 if self.stderr in rlist:
956 data = os.read(self.stderr.fileno(), 1024)
959 read_set.remove(self.stderr)
962 # All data exchanged. Translate lists into strings.
963 if stdout is not None:
964 stdout = ''.join(stdout)
965 if stderr is not None:
966 stderr = ''.join(stderr)
968 # Translate newlines, if requested. We cannot let the file
969 # object do the translation: It is based on stdio, which is
970 # impossible to combine with select (unless forcing no
972 if self.universal_newlines and hasattr(file, 'newlines'):
974 stdout = self._translate_newlines(stdout)
976 stderr = self._translate_newlines(stderr)
978 if killed and err_on_timeout:
979 errcode = self.poll()
980 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
986 return (stdout, stderr)