2 # -*- coding: utf-8 -*-
20 CTRL_SOCK = "ctrl.sock"
21 STD_ERR = "stderr.log"
28 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
30 if hasattr(os, "devnull"):
33 DEV_NULL = "/dev/null"
35 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
38 """ Escapes strings so that they are safe to use as command-line arguments """
39 if SHELL_SAFE.match(s):
40 # safe string - no escaping needed
43 # unsafe string - escape
45 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
48 return "'$'\\x%02x''" % (ord(c),)
49 s = ''.join(map(escp,s))
53 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
54 self._root_dir = root_dir
56 self._ctrl_sock = None
57 self._log_level = log_level
66 # can not return normally after fork beacuse no exec was done.
67 # This means that if we don't do a os._exit(0) here the code that
68 # follows the call to "Server.run()" in the "caller code" will be
69 # executed... but by now it has already been executed after the
70 # first process (the one that did the first fork) returned.
78 # pipes for process synchronization
82 root = os.path.normpath(self._root_dir)
83 if not os.path.exists(root):
84 os.makedirs(root, 0755)
92 except OSError, e: # pragma: no cover
93 if e.errno == errno.EINTR:
99 # os.waitpid avoids leaving a <defunc> (zombie) process
100 st = os.waitpid(pid1, 0)[1]
102 raise RuntimeError("Daemonization failed")
103 # return 0 to inform the caller method that this is not the
108 # Decouple from parent environment.
109 os.chdir(self._root_dir)
116 # see ref: "os._exit(0)"
119 # close all open file descriptors.
120 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
121 if (max_fd == resource.RLIM_INFINITY):
123 for fd in range(3, max_fd):
130 # Redirect standard file descriptors.
131 stdin = open(DEV_NULL, "r")
132 stderr = stdout = open(STD_ERR, "a", 0)
133 os.dup2(stdin.fileno(), sys.stdin.fileno())
134 # NOTE: sys.stdout.write will still be buffered, even if the file
135 # was opened with 0 buffer
136 os.dup2(stdout.fileno(), sys.stdout.fileno())
137 os.dup2(stderr.fileno(), sys.stderr.fileno())
139 # create control socket
140 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
141 self._ctrl_sock.bind(CTRL_SOCK)
142 self._ctrl_sock.listen(0)
144 # let the parent process know that the daemonization is finished
149 def post_daemonize(self):
153 while not self._stop:
154 conn, addr = self._ctrl_sock.accept()
156 while not self._stop:
158 msg = self.recv_msg(conn)
159 except socket.timeout, e:
164 reply = self.stop_action()
166 reply = self.reply_action(msg)
169 self.send_reply(conn, reply)
172 self.log_error("NOTICE: Awaiting for reconnection")
180 def recv_msg(self, conn):
184 chunk = conn.recv(1024)
186 if e.errno != errno.EINTR:
192 if chunk[-1] == "\n":
197 decoded = base64.b64decode(data)
198 return decoded.rstrip()
200 def send_reply(self, conn, reply):
201 encoded = base64.b64encode(reply)
202 conn.send("%s\n" % encoded)
206 self._ctrl_sock.close()
211 def stop_action(self):
212 return "Stopping server"
214 def reply_action(self, msg):
215 return "Reply to: %s" % msg
217 def log_error(self, text = None, context = ''):
219 text = traceback.format_exc()
220 date = time.strftime("%Y-%m-%d %H:%M:%S")
222 context = " (%s)" % (context,)
223 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
226 def log_debug(self, text):
227 if self._log_level == DEBUG_LEVEL:
228 date = time.strftime("%Y-%m-%d %H:%M:%S")
229 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
231 class Forwarder(object):
232 def __init__(self, root_dir = "."):
233 self._ctrl_sock = None
234 self._root_dir = root_dir
239 print >>sys.stderr, "READY."
240 while not self._stop:
241 data = self.read_data()
242 self.send_to_server(data)
243 data = self.recv_from_server()
244 self.write_data(data)
248 return sys.stdin.readline()
250 def write_data(self, data):
251 sys.stdout.write(data)
252 # sys.stdout.write is buffered, this is why we need to do a flush()
255 def send_to_server(self, data):
257 self._ctrl_sock.send(data)
259 if e.errno == errno.EPIPE:
261 self._ctrl_sock.send(data)
264 encoded = data.rstrip()
265 msg = base64.b64decode(encoded)
269 def recv_from_server(self):
273 chunk = self._ctrl_sock.recv(1024)
275 if e.errno != errno.EINTR:
280 if chunk[-1] == "\n":
286 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
287 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
288 self._ctrl_sock.connect(sock_addr)
290 def disconnect(self):
292 self._ctrl_sock.close()
296 class Client(object):
297 def __init__(self, root_dir = ".", host = None, port = None, user = None,
298 agent = None, environment_setup = ""):
299 self.root_dir = root_dir
300 self.addr = (host, port)
303 self.environment_setup = environment_setup
304 self._stopped = False
308 if self._process.poll() is None:
309 os.kill(self._process.pid, signal.SIGTERM)
313 root_dir = self.root_dir
314 (host, port) = self.addr
318 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
319 c.forward()" % (root_dir,)
321 self._process = popen_ssh_subprocess(python_code, host, port,
323 environment_setup = self.environment_setup)
324 # popen_ssh_subprocess already waits for readiness
325 if self._process.poll():
326 err = proc.stderr.read()
327 raise RuntimeError("Client could not be reached: %s" % \
330 self._process = subprocess.Popen(
331 ["python", "-c", python_code],
332 stdin = subprocess.PIPE,
333 stdout = subprocess.PIPE,
334 stderr = subprocess.PIPE
337 # Wait for the forwarder to be ready, otherwise nobody
338 # will be able to connect to it
339 helo = self._process.stderr.readline()
340 if helo != 'READY.\n':
341 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
342 helo + self._process.stderr.read())
344 def send_msg(self, msg):
345 encoded = base64.b64encode(msg)
346 data = "%s\n" % encoded
349 self._process.stdin.write(data)
350 except (IOError, ValueError):
351 # dead process, poll it to un-zombify
354 # try again after reconnect
355 # If it fails again, though, give up
357 self._process.stdin.write(data)
360 self.send_msg(STOP_MSG)
363 def read_reply(self):
364 data = self._process.stdout.readline()
365 encoded = data.rstrip()
366 return base64.b64decode(encoded)
368 def _make_server_key_args(server_key, host, port, args):
370 Returns a reference to the created temporary file, and adds the
371 corresponding arguments to the given argument list.
373 Make sure to hold onto it until the process is done with the file
376 host = '%s:%s' % (host,port)
377 # Create a temporary server key file
378 tmp_known_hosts = tempfile.NamedTemporaryFile()
380 # Add the intended host key
381 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
383 # If we're not in strict mode, add user-configured keys
384 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
385 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
386 if os.access(user_hosts_path, os.R_OK):
387 f = open(user_hosts_path, "r")
388 tmp_known_hosts.write(f.read())
391 tmp_known_hosts.flush()
393 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
394 return tmp_known_hosts
396 def popen_ssh_command(command, host, port, user, agent,
402 Executes a remote commands, returns ((stdout,stderr),process)
405 print "ssh", host, command
407 tmp_known_hosts = None
409 # Don't bother with localhost. Makes test easier
410 '-o', 'NoHostAuthenticationForLocalhost=yes',
415 args.append('-p%d' % port)
417 args.extend(('-i', ident_key))
421 # Create a temporary server key file
422 tmp_known_hosts = _make_server_key_args(
423 server_key, host, port, args)
426 # connects to the remote host and starts a remote connection
427 proc = subprocess.Popen(args,
428 stdout = subprocess.PIPE,
429 stdin = subprocess.PIPE,
430 stderr = subprocess.PIPE)
432 # attach tempfile object to the process, to make sure the file stays
433 # alive until the process is finished with it
434 proc._known_hosts = tmp_known_hosts
436 out, err = proc.communicate(stdin)
438 print " -> ", out, err
440 return ((out, err), proc)
442 def popen_scp(source, dest,
449 Copies from/to remote sites.
451 Source and destination should have the user and host encoded
454 If source is a file object, a special mode will be used to
455 create the remote file with the same contents.
457 If dest is a file object, the remote file (source) will be
458 read and written into dest.
460 In these modes, recursive cannot be True.
462 Source can be a list of files to copy to a single destination,
463 in which case it is advised that the destination be a folder.
467 print "scp", source, dest
469 if isinstance(source, file) or isinstance(dest, file) \
470 or hasattr(source, 'read') or hasattr(dest, 'write'):
473 # Parse source/destination as <user>@<server>:<path>
474 if isinstance(dest, basestring) and ':' in dest:
475 remspec, path = dest.split(':',1)
476 elif isinstance(source, basestring) and ':' in source:
477 remspec, path = source.split(':',1)
479 raise ValueError, "Both endpoints cannot be local"
480 user,host = remspec.rsplit('@',1)
481 tmp_known_hosts = None
483 args = ['ssh', '-l', user, '-C',
484 # Don't bother with localhost. Makes test easier
485 '-o', 'NoHostAuthenticationForLocalhost=yes',
488 args.append('-P%d' % port)
490 args.extend(('-i', ident_key))
492 # Create a temporary server key file
493 tmp_known_hosts = _make_server_key_args(
494 server_key, host, port, args)
496 if isinstance(source, file) or hasattr(source, 'read'):
497 args.append('cat > %s' % (shell_escape(path),))
498 elif isinstance(dest, file) or hasattr(dest, 'write'):
499 args.append('cat %s' % (shell_escape(path),))
501 raise AssertionError, "Unreachable code reached! :-Q"
503 # connects to the remote host and starts a remote connection
504 if isinstance(source, file):
505 proc = subprocess.Popen(args,
506 stdout = open('/dev/null','w'),
507 stderr = subprocess.PIPE,
509 err = proc.stderr.read()
510 proc._known_hosts = tmp_known_hosts
512 return ((None,err), proc)
513 elif isinstance(dest, file):
514 proc = subprocess.Popen(args,
515 stdout = open('/dev/null','w'),
516 stderr = subprocess.PIPE,
518 err = proc.stderr.read()
519 proc._known_hosts = tmp_known_hosts
521 return ((None,err), proc)
522 elif hasattr(source, 'read'):
523 # file-like (but not file) source
524 proc = subprocess.Popen(args,
525 stdout = open('/dev/null','w'),
526 stderr = subprocess.PIPE,
527 stdin = subprocess.PIPE)
533 buf = source.read(4096)
538 rdrdy, wrdy, broken = select.select(
541 [proc.stderr,proc.stdin])
543 if proc.stderr in rdrdy:
544 # use os.read for fully unbuffered behavior
545 err.append(os.read(proc.stderr.fileno(), 4096))
547 if proc.stdin in wrdy:
548 proc.stdin.write(buf)
554 err.append(proc.stderr.read())
556 proc._known_hosts = tmp_known_hosts
558 return ((None,''.join(err)), proc)
559 elif hasattr(dest, 'write'):
560 # file-like (but not file) dest
561 proc = subprocess.Popen(args,
562 stdout = subprocess.PIPE,
563 stderr = subprocess.PIPE,
564 stdin = open('/dev/null','w'))
569 rdrdy, wrdy, broken = select.select(
570 [proc.stderr, proc.stdout],
572 [proc.stderr, proc.stdout])
574 if proc.stderr in rdrdy:
575 # use os.read for fully unbuffered behavior
576 err.append(os.read(proc.stderr.fileno(), 4096))
578 if proc.stdout in rdrdy:
579 # use os.read for fully unbuffered behavior
580 buf = os.read(proc.stdout.fileno(), 4096)
589 err.append(proc.stderr.read())
591 proc._known_hosts = tmp_known_hosts
593 return ((None,''.join(err)), proc)
595 raise AssertionError, "Unreachable code reached! :-Q"
597 # Parse destination as <user>@<server>:<path>
598 if isinstance(dest, basestring) and ':' in dest:
599 remspec, path = dest.split(':',1)
600 elif isinstance(source, basestring) and ':' in source:
601 remspec, path = source.split(':',1)
603 raise ValueError, "Both endpoints cannot be local"
604 user,host = remspec.rsplit('@',1)
607 tmp_known_hosts = None
608 args = ['scp', '-q', '-p', '-C',
609 # Don't bother with localhost. Makes test easier
610 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
612 args.append('-P%d' % port)
616 args.extend(('-i', ident_key))
618 # Create a temporary server key file
619 tmp_known_hosts = _make_server_key_args(
620 server_key, host, port, args)
621 if isinstance(source,list):
627 # connects to the remote host and starts a remote connection
628 proc = subprocess.Popen(args,
629 stdout = subprocess.PIPE,
630 stdin = subprocess.PIPE,
631 stderr = subprocess.PIPE)
632 proc._known_hosts = tmp_known_hosts
634 comm = proc.communicate()
638 def popen_ssh_subprocess(python_code, host, port, user, agent,
643 environment_setup = "",
644 waitcommand = False):
647 python_path.replace("'", r"'\''")
648 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
650 if environment_setup:
651 cmd += environment_setup
653 # Uncomment for debug (to run everything under strace)
654 # We had to verify if strace works (cannot nest them)
655 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
657 #cmd += "strace -f -tt -s 200 -o strace$$.out "
659 cmd += "import base64, os\n"
660 cmd += "cmd = \"\"\n"
661 cmd += "while True:\n"
662 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
663 cmd += " if cmd[-1] == \"\\n\": break\n"
664 cmd += "cmd = base64.b64decode(cmd)\n"
665 # Uncomment for debug
666 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
668 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
671 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
674 tmp_known_hosts = None
676 # Don't bother with localhost. Makes test easier
677 '-o', 'NoHostAuthenticationForLocalhost=yes',
682 args.append('-p%d' % port)
684 args.extend(('-i', ident_key))
688 # Create a temporary server key file
689 tmp_known_hosts = _make_server_key_args(
690 server_key, host, port, args)
693 # connects to the remote host and starts a remote rpyc connection
694 proc = subprocess.Popen(args,
695 stdout = subprocess.PIPE,
696 stdin = subprocess.PIPE,
697 stderr = subprocess.PIPE)
698 proc._known_hosts = tmp_known_hosts
700 # send the command to execute
701 os.write(proc.stdin.fileno(),
702 base64.b64encode(python_code) + "\n")
703 msg = os.read(proc.stdout.fileno(), 3)
705 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
706 msg, proc.stdout.read(), proc.stderr.read())