2 # -*- coding: utf-8 -*-
19 CTRL_SOCK = "ctrl.sock"
20 STD_ERR = "stderr.log"
27 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
29 if hasattr(os, "devnull"):
32 DEV_NULL = "/dev/null"
36 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
39 """ Escapes strings so that they are safe to use as command-line arguments """
40 if SHELL_SAFE.match(s):
41 # safe string - no escaping needed
44 # unsafe string - escape
45 s = s.replace("'","\\'")
49 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
50 self._root_dir = root_dir
52 self._ctrl_sock = None
53 self._log_level = log_level
62 # cannot return normally after fork because no exec was done.
63 # This means that if we don't do a os._exit(0) here the code that
64 # follows the call to "Server.run()" in the "caller code" will be
65 # executed... but by now it has already been executed after the
66 # first process (the one that did the first fork) returned.
74 # pipes for process synchronization
82 # os.waitpid avoids leaving a <defunct> (zombie) process
83 st = os.waitpid(pid1, 0)[1]
85 raise RuntimeError("Daemonization failed")
86 # return 0 to inform the caller method that this is not the
91 # Decouple from parent environment.
92 os.chdir(self._root_dir)
99 # see ref: "os._exit(0)"
102 # close all open file descriptors.
103 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
104 if (max_fd == resource.RLIM_INFINITY):
106 for fd in range(3, max_fd):
113 # Redirect standard file descriptors.
114 stdin = open(DEV_NULL, "r")
115 stderr = stdout = open(STD_ERR, "a", 0)
116 os.dup2(stdin.fileno(), sys.stdin.fileno())
117 # NOTE: sys.stdout.write will still be buffered, even if the file
118 # was opened with 0 buffer
119 os.dup2(stdout.fileno(), sys.stdout.fileno())
120 os.dup2(stderr.fileno(), sys.stderr.fileno())
122 # create control socket
123 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
124 self._ctrl_sock.bind(CTRL_SOCK)
125 self._ctrl_sock.listen(0)
127 # let the parent process know that the daemonization is finished
132 def post_daemonize(self):
136 while not self._stop:
137 conn, addr = self._ctrl_sock.accept()
139 while not self._stop:
141 msg = self.recv_msg(conn)
142 except socket.timeout, e:
147 reply = self.stop_action()
149 reply = self.reply_action(msg)
152 self.send_reply(conn, reply)
155 self.log_error("NOTICE: Awaiting for reconnection")
163 def recv_msg(self, conn):
167 chunk = conn.recv(1024)
169 if e.errno != errno.EINTR:
175 if chunk[-1] == "\n":
180 decoded = base64.b64decode(data)
181 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send a reply over a connection as a single base64-encoded line.

    conn  -- connected socket-like object providing sendall().
    reply -- string payload to transmit.

    The payload is base64-encoded and a newline is appended after it, so
    the encoded text itself never contains a newline.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole buffer has been handed to the
    # kernel; a bare send() may transmit only a prefix and silently
    # truncate the reply (dropping the terminating newline).
    conn.sendall("%s\n" % encoded)
189 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text used when answering a stop request."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Build the reply for a received message.

    The reply is the message itself prefixed with "Reply to: ".
    """
    template = "Reply to: %s"
    return template % msg
200 def log_error(self, text = None, context = ''):
202 text = traceback.format_exc()
203 date = time.strftime("%Y-%m-%d %H:%M:%S")
205 context = " (%s)" % (context,)
206 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry to stderr.

    Nothing is written unless the instance's log level is DEBUG_LEVEL.
    """
    if self._log_level != DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
214 class Forwarder(object):
215 def __init__(self, root_dir = "."):
216 self._ctrl_sock = None
217 self._root_dir = root_dir
222 print >>sys.stderr, "READY."
223 while not self._stop:
224 data = self.read_data()
225 self.send_to_server(data)
226 data = self.recv_from_server()
227 self.write_data(data)
231 return sys.stdin.readline()
233 def write_data(self, data):
234 sys.stdout.write(data)
235 # sys.stdout.write is buffered, this is why we need to do a flush()
238 def send_to_server(self, data):
240 self._ctrl_sock.send(data)
242 if e.errno == errno.EPIPE:
244 self._ctrl_sock.send(data)
247 encoded = data.rstrip()
248 msg = base64.b64decode(encoded)
252 def recv_from_server(self):
256 chunk = self._ctrl_sock.recv(1024)
258 if e.errno != errno.EINTR:
263 if chunk[-1] == "\n":
269 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
270 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
271 self._ctrl_sock.connect(sock_addr)
273 def disconnect(self):
275 self._ctrl_sock.close()
279 class Client(object):
280 def __init__(self, root_dir = ".", host = None, port = None, user = None,
282 self.root_dir = root_dir
283 self.addr = (host, port)
286 self._stopped = False
290 if self._process.poll() is None:
291 os.kill(self._process.pid, signal.SIGTERM)
295 root_dir = self.root_dir
296 (host, port) = self.addr
300 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
301 c.forward()" % (root_dir,)
303 self._process = popen_ssh_subprocess(python_code, host, port,
305 # popen_ssh_subprocess already waits for readiness
307 self._process = subprocess.Popen(
308 ["python", "-c", python_code],
309 stdin = subprocess.PIPE,
310 stdout = subprocess.PIPE,
311 stderr = subprocess.PIPE
314 # Wait for the forwarder to be ready, otherwise nobody
315 # will be able to connect to it
316 helo = self._process.stderr.readline()
317 if helo != 'READY.\n':
318 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
319 helo + self._process.stderr.read())
321 def send_msg(self, msg):
322 encoded = base64.b64encode(msg)
323 data = "%s\n" % encoded
326 self._process.stdin.write(data)
327 except (IOError, ValueError):
328 # dead process, poll it to un-zombify
331 # try again after reconnect
332 # If it fails again, though, give up
334 self._process.stdin.write(data)
337 self.send_msg(STOP_MSG)
def read_reply(self):
    """Read one line from the helper process's stdout and decode it.

    Trailing whitespace (including the line terminator) is stripped
    before the text is base64-decoded; the decoded payload is returned.
    """
    line = self._process.stdout.readline()
    return base64.b64decode(line.rstrip())
345 def _make_server_key_args(server_key, host, port, args):
347 Returns a reference to the created temporary file, and adds the
348 corresponding arguments to the given argument list.
350 Make sure to hold onto it until the process is done with the file
353 host = '%s:%s' % (host,port)
354 # Create a temporary server key file
355 tmp_known_hosts = tempfile.NamedTemporaryFile()
357 # Add the intended host key
358 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
360 # If we're not in strict mode, add user-configured keys
361 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
362 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
363 if os.access(user_hosts_path, os.R_OK):
364 f = open(user_hosts_path, "r")
365 tmp_known_hosts.write(f.read())
368 tmp_known_hosts.flush()
370 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
371 return tmp_known_hosts
373 def popen_ssh_command(command, host, port, user, agent,
379 Executes a remote command, returns ((stdout,stderr),process)
382 print "ssh", host, command
384 tmp_known_hosts = None
386 # Don't bother with localhost. Makes test easier
387 '-o', 'NoHostAuthenticationForLocalhost=yes',
392 args.append('-p%d' % port)
394 args.extend(('-i', ident_key))
398 # Create a temporary server key file
399 tmp_known_hosts = _make_server_key_args(
400 server_key, host, port, args)
403 # connects to the remote host and starts a remote connection
404 proc = subprocess.Popen(args,
405 stdout = subprocess.PIPE,
406 stdin = subprocess.PIPE,
407 stderr = subprocess.PIPE)
409 # attach tempfile object to the process, to make sure the file stays
410 # alive until the process is finished with it
411 proc._known_hosts = tmp_known_hosts
413 out, err = proc.communicate(stdin)
415 print " -> ", out, err
417 return ((out, err), proc)
419 def popen_scp(source, dest,
426 Copies from/to remote sites.
428 Source and destination should have the user and host encoded
431 If source is a file object, a special mode will be used to
432 create the remote file with the same contents.
434 If dest is a file object, the remote file (source) will be
435 read and written into dest.
437 In these modes, recursive cannot be True.
439 Source can be a list of files to copy to a single destination,
440 in which case it is advised that the destination be a folder.
444 print "scp", source, dest
446 if isinstance(source, file) or isinstance(dest, file) \
447 or hasattr(source, 'read') or hasattr(dest, 'write'):
450 # Parse source/destination as <user>@<server>:<path>
451 if isinstance(dest, basestring) and ':' in dest:
452 remspec, path = dest.split(':',1)
453 elif isinstance(source, basestring) and ':' in source:
454 remspec, path = source.split(':',1)
456 raise ValueError, "Both endpoints cannot be local"
457 user,host = remspec.rsplit('@',1)
458 tmp_known_hosts = None
460 args = ['ssh', '-l', user, '-C',
461 # Don't bother with localhost. Makes test easier
462 '-o', 'NoHostAuthenticationForLocalhost=yes',
465 args.append('-P%d' % port)
467 args.extend(('-i', ident_key))
469 # Create a temporary server key file
470 tmp_known_hosts = _make_server_key_args(
471 server_key, host, port, args)
473 if isinstance(source, file) or hasattr(source, 'read'):
474 args.append('cat > %s' % (shell_escape(path),))
475 elif isinstance(dest, file) or hasattr(dest, 'write'):
476 args.append('cat %s' % (shell_escape(path),))
478 raise AssertionError, "Unreachable code reached! :-Q"
480 # connects to the remote host and starts a remote connection
481 if isinstance(source, file):
482 proc = subprocess.Popen(args,
483 stdout = open('/dev/null','w'),
484 stderr = subprocess.PIPE,
486 err = proc.stderr.read()
487 proc._known_hosts = tmp_known_hosts
489 return ((None,err), proc)
490 elif isinstance(dest, file):
491 proc = subprocess.Popen(args,
492 stdout = open('/dev/null','w'),
493 stderr = subprocess.PIPE,
495 err = proc.stderr.read()
496 proc._known_hosts = tmp_known_hosts
498 return ((None,err), proc)
499 elif hasattr(source, 'read'):
500 # file-like (but not file) source
501 proc = subprocess.Popen(args,
502 stdout = open('/dev/null','w'),
503 stderr = subprocess.PIPE,
504 stdin = subprocess.PIPE)
510 buf = source.read(4096)
515 rdrdy, wrdy, broken = select.select(
518 [proc.stderr,proc.stdin])
520 if proc.stderr in rdrdy:
521 # use os.read for fully unbuffered behavior
522 err.append(os.read(proc.stderr.fileno(), 4096))
524 if proc.stdin in wrdy:
525 proc.stdin.write(buf)
531 err.append(proc.stderr.read())
533 proc._known_hosts = tmp_known_hosts
535 return ((None,''.join(err)), proc)
536 elif hasattr(dest, 'write'):
537 # file-like (but not file) dest
538 proc = subprocess.Popen(args,
539 stdout = subprocess.PIPE,
540 stderr = subprocess.PIPE,
541 stdin = open('/dev/null','w'))
546 rdrdy, wrdy, broken = select.select(
547 [proc.stderr, proc.stdout],
549 [proc.stderr, proc.stdout])
551 if proc.stderr in rdrdy:
552 # use os.read for fully unbuffered behavior
553 err.append(os.read(proc.stderr.fileno(), 4096))
555 if proc.stdout in rdrdy:
556 # use os.read for fully unbuffered behavior
557 buf = os.read(proc.stdout.fileno(), 4096)
566 err.append(proc.stderr.read())
568 proc._known_hosts = tmp_known_hosts
570 return ((None,''.join(err)), proc)
572 raise AssertionError, "Unreachable code reached! :-Q"
574 # Parse destination as <user>@<server>:<path>
575 if isinstance(dest, basestring) and ':' in dest:
576 remspec, path = dest.split(':',1)
577 elif isinstance(source, basestring) and ':' in source:
578 remspec, path = source.split(':',1)
580 raise ValueError, "Both endpoints cannot be local"
581 user,host = remspec.rsplit('@',1)
584 tmp_known_hosts = None
585 args = ['scp', '-q', '-p', '-C',
586 # Don't bother with localhost. Makes test easier
587 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
589 args.append('-P%d' % port)
593 args.extend(('-i', ident_key))
595 # Create a temporary server key file
596 tmp_known_hosts = _make_server_key_args(
597 server_key, host, port, args)
598 if isinstance(source,list):
604 # connects to the remote host and starts a remote connection
605 proc = subprocess.Popen(args,
606 stdout = subprocess.PIPE,
607 stdin = subprocess.PIPE,
608 stderr = subprocess.PIPE)
609 proc._known_hosts = tmp_known_hosts
611 comm = proc.communicate()
615 def popen_ssh_subprocess(python_code, host, port, user, agent,
621 python_path.replace("'", r"'\''")
622 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
625 # Uncomment for debug (to run everything under strace)
626 # We had to verify if strace works (cannot nest them)
627 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
629 #if self.mode == MODE_SSH:
630 # cmd += "strace -f -tt -s 200 -o strace$$.out "
632 cmd += "import base64, os\n"
633 cmd += "cmd = \"\"\n"
634 cmd += "while True:\n"
635 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
636 cmd += " if cmd[-1] == \"\\n\": break\n"
637 cmd += "cmd = base64.b64decode(cmd)\n"
638 # Uncomment for debug
639 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
640 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
641 cmd += "exec(cmd)\n'"
643 tmp_known_hosts = None
645 # Don't bother with localhost. Makes test easier
646 '-o', 'NoHostAuthenticationForLocalhost=yes',
651 args.append('-p%d' % port)
653 args.extend(('-i', ident_key))
657 # Create a temporary server key file
658 tmp_known_hosts = _make_server_key_args(
659 server_key, host, port, args)
662 # connects to the remote host and starts a remote rpyc connection
663 proc = subprocess.Popen(args,
664 stdout = subprocess.PIPE,
665 stdin = subprocess.PIPE,
666 stderr = subprocess.PIPE)
667 proc._known_hosts = tmp_known_hosts
669 # send the command to execute
670 os.write(proc.stdin.fileno(),
671 base64.b64encode(python_code) + "\n")
672 msg = os.read(proc.stdout.fileno(), 3)
674 raise RuntimeError("Failed to start remote python interpreter")