2 # -*- coding: utf-8 -*-
19 CTRL_SOCK = "ctrl.sock"
20 STD_ERR = "stderr.log"
29 if hasattr(os, "devnull"):
32 DEV_NULL = "/dev/null"
36 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
39 """ Escapes strings so that they are safe to use as command-line arguments """
40 if SHELL_SAFE.match(s):
41 # safe string - no escaping needed
44 # unsafe string - escape
45 s = s.replace("'","\\'")
49 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
50 self._root_dir = root_dir
52 self._ctrl_sock = None
53 self._log_level = log_level
62 # can not return normally after fork beacuse no exec was done.
63 # This means that if we don't do a os._exit(0) here the code that
64 # follows the call to "Server.run()" in the "caller code" will be
65 # executed... but by now it has already been executed after the
66 # first process (the one that did the first fork) returned.
74 # pipes for process synchronization
82 # os.waitpid avoids leaving a <defunc> (zombie) process
83 st = os.waitpid(pid1, 0)[1]
85 raise RuntimeError("Daemonization failed")
86 # return 0 to inform the caller method that this is not the
91 # Decouple from parent environment.
92 os.chdir(self._root_dir)
99 # see ref: "os._exit(0)"
102 # close all open file descriptors.
103 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
104 if (max_fd == resource.RLIM_INFINITY):
106 for fd in range(3, max_fd):
113 # Redirect standard file descriptors.
114 stdin = open(DEV_NULL, "r")
115 stderr = stdout = open(STD_ERR, "a", 0)
116 os.dup2(stdin.fileno(), sys.stdin.fileno())
117 # NOTE: sys.stdout.write will still be buffered, even if the file
118 # was opened with 0 buffer
119 os.dup2(stdout.fileno(), sys.stdout.fileno())
120 os.dup2(stderr.fileno(), sys.stderr.fileno())
122 # create control socket
123 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
124 self._ctrl_sock.bind(CTRL_SOCK)
125 self._ctrl_sock.listen(0)
127 # let the parent process know that the daemonization is finished
132 def post_daemonize(self):
136 while not self._stop:
137 conn, addr = self._ctrl_sock.accept()
139 while not self._stop:
141 msg = self.recv_msg(conn)
142 except socket.timeout, e:
147 reply = self.stop_action()
149 reply = self.reply_action(msg)
152 self.send_reply(conn, reply)
155 self.log_error("NOTICE: Awaiting for reconnection")
163 def recv_msg(self, conn):
167 chunk = conn.recv(1024)
169 if e.errno != errno.EINTR:
175 if chunk[-1] == "\n":
180 decoded = base64.b64decode(data)
181 return decoded.rstrip()
183 def send_reply(self, conn, reply):
184 encoded = base64.b64encode(reply)
185 conn.send("%s\n" % encoded)
189 self._ctrl_sock.close()
194 def stop_action(self):
195 return "Stopping server"
197 def reply_action(self, msg):
198 return "Reply to: %s" % msg
200 def log_error(self, text = None, context = ''):
202 text = traceback.format_exc()
203 date = time.strftime("%Y-%m-%d %H:%M:%S")
205 context = " (%s)" % (context,)
206 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
209 def log_debug(self, text):
210 if self._log_level == DEBUG_LEVEL:
211 date = time.strftime("%Y-%m-%d %H:%M:%S")
212 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
214 class Forwarder(object):
215 def __init__(self, root_dir = "."):
216 self._ctrl_sock = None
217 self._root_dir = root_dir
222 print >>sys.stderr, "READY."
223 while not self._stop:
224 data = self.read_data()
225 self.send_to_server(data)
226 data = self.recv_from_server()
227 self.write_data(data)
231 return sys.stdin.readline()
233 def write_data(self, data):
234 sys.stdout.write(data)
235 # sys.stdout.write is buffered, this is why we need to do a flush()
238 def send_to_server(self, data):
240 self._ctrl_sock.send(data)
242 if e.errno == errno.EPIPE:
244 self._ctrl_sock.send(data)
247 encoded = data.rstrip()
248 msg = base64.b64decode(encoded)
252 def recv_from_server(self):
256 chunk = self._ctrl_sock.recv(1024)
258 if e.errno != errno.EINTR:
263 if chunk[-1] == "\n":
269 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
270 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
271 self._ctrl_sock.connect(sock_addr)
273 def disconnect(self):
275 self._ctrl_sock.close()
279 class Client(object):
280 def __init__(self, root_dir = ".", host = None, port = None, user = None,
282 self.root_dir = root_dir
283 self.addr = (host, port)
286 self._stopped = False
290 if self._process.poll() is None:
291 os.kill(self._process.pid, signal.SIGTERM)
295 root_dir = self.root_dir
296 (host, port) = self.addr
300 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
301 c.forward()" % (root_dir,)
303 self._process = popen_ssh_subprocess(python_code, host, port,
305 # popen_ssh_subprocess already waits for readiness
307 self._process = subprocess.Popen(
308 ["python", "-c", python_code],
309 stdin = subprocess.PIPE,
310 stdout = subprocess.PIPE,
311 stderr = subprocess.PIPE
314 # Wait for the forwarder to be ready, otherwise nobody
315 # will be able to connect to it
316 helo = self._process.stderr.readline()
317 if helo != 'READY.\n':
318 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
319 helo + self._process.stderr.read())
321 def send_msg(self, msg):
322 encoded = base64.b64encode(msg)
323 data = "%s\n" % encoded
326 self._process.stdin.write(data)
327 except (IOError, ValueError):
328 # dead process, poll it to un-zombify
331 # try again after reconnect
332 # If it fails again, though, give up
334 self._process.stdin.write(data)
337 self.send_msg(STOP_MSG)
340 def read_reply(self):
341 data = self._process.stdout.readline()
342 encoded = data.rstrip()
343 return base64.b64decode(encoded)
345 def _make_server_key_args(server_key, host, port, args):
347 Returns a reference to the created temporary file, and adds the
348 corresponding arguments to the given argument list.
350 Make sure to hold onto it until the process is done with the file
353 host = '%s:%s' % (host,port)
354 # Create a temporary server key file
355 tmp_known_hosts = tempfile.NamedTemporaryFile()
357 # Add the intended host key
358 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
360 # If we're not in strict mode, add user-configured keys
361 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
362 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
363 if os.access(user_hosts_path, os.R_OK):
364 f = open(user_hosts_path, "r")
365 tmp_known_hosts.write(f.read())
368 tmp_known_hosts.flush()
370 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
371 return tmp_known_hosts
373 def popen_ssh_command(command, host, port, user, agent,
379 Executes a remote commands, returns ((stdout,stderr),process)
382 print "ssh", host, command
384 tmp_known_hosts = None
386 # Don't bother with localhost. Makes test easier
387 '-o', 'NoHostAuthenticationForLocalhost=yes',
392 args.append('-p%d' % port)
394 args.extend(('-i', ident_key))
398 # Create a temporary server key file
399 tmp_known_hosts = _make_server_key_args(
400 server_key, host, port, args)
403 # connects to the remote host and starts a remote connection
404 proc = subprocess.Popen(args,
405 stdout = subprocess.PIPE,
406 stdin = subprocess.PIPE,
407 stderr = subprocess.PIPE)
409 # attach tempfile object to the process, to make sure the file stays
410 # alive until the process is finished with it
411 proc._known_hosts = tmp_known_hosts
413 return (proc.communicate(stdin), proc)
415 def popen_scp(source, dest,
422 Copies from/to remote sites.
424 Source and destination should have the user and host encoded
427 If source is a file object, a special mode will be used to
428 create the remote file with the same contents.
430 If dest is a file object, the remote file (source) will be
431 read and written into dest.
433 In these modes, recursive cannot be True.
435 Source can be a list of files to copy to a single destination,
436 in which case it is advised that the destination be a folder.
440 print "scp", source, dest
442 if isinstance(source, file) or isinstance(dest, file) \
443 or hasattr(source, 'read') or hasattr(dest, 'write'):
446 # Parse source/destination as <user>@<server>:<path>
447 if isinstance(dest, basestring) and ':' in dest:
448 remspec, path = dest.split(':',1)
449 elif isinstance(source, basestring) and ':' in source:
450 remspec, path = source.split(':',1)
452 raise ValueError, "Both endpoints cannot be local"
453 user,host = remspec.rsplit('@',1)
454 tmp_known_hosts = None
456 args = ['ssh', '-l', user, '-C',
457 # Don't bother with localhost. Makes test easier
458 '-o', 'NoHostAuthenticationForLocalhost=yes',
461 args.append('-P%d' % port)
463 args.extend(('-i', ident_key))
465 # Create a temporary server key file
466 tmp_known_hosts = _make_server_key_args(
467 server_key, host, port, args)
469 if isinstance(source, file) or hasattr(source, 'read'):
470 args.append('cat > %s' % (shell_escape(path),))
471 elif isinstance(dest, file) or hasattr(dest, 'write'):
472 args.append('cat %s' % (shell_escape(path),))
474 raise AssertionError, "Unreachable code reached! :-Q"
476 # connects to the remote host and starts a remote connection
477 if isinstance(source, file):
478 proc = subprocess.Popen(args,
479 stdout = open('/dev/null','w'),
480 stderr = subprocess.PIPE,
482 err = proc.stderr.read()
483 proc._known_hosts = tmp_known_hosts
485 return ((None,err), proc)
486 elif isinstance(dest, file):
487 proc = subprocess.Popen(args,
488 stdout = open('/dev/null','w'),
489 stderr = subprocess.PIPE,
491 err = proc.stderr.read()
492 proc._known_hosts = tmp_known_hosts
494 return ((None,err), proc)
495 elif hasattr(source, 'read'):
496 # file-like (but not file) source
497 proc = subprocess.Popen(args,
498 stdout = open('/dev/null','w'),
499 stderr = subprocess.PIPE,
500 stdin = subprocess.PIPE)
506 buf = source.read(4096)
511 rdrdy, wrdy, broken = select.select(
514 [proc.stderr,proc.stdin])
516 if proc.stderr in rdrdy:
517 # use os.read for fully unbuffered behavior
518 err.append(os.read(proc.stderr.fileno(), 4096))
520 if proc.stdin in wrdy:
521 proc.stdin.write(buf)
527 err.append(proc.stderr.read())
529 proc._known_hosts = tmp_known_hosts
531 return ((None,''.join(err)), proc)
532 elif hasattr(dest, 'write'):
533 # file-like (but not file) dest
534 proc = subprocess.Popen(args,
535 stdout = subprocess.PIPE,
536 stderr = subprocess.PIPE,
537 stdin = open('/dev/null','w'))
542 rdrdy, wrdy, broken = select.select(
543 [proc.stderr, proc.stdout],
545 [proc.stderr, proc.stdout])
547 if proc.stderr in rdrdy:
548 # use os.read for fully unbuffered behavior
549 err.append(os.read(proc.stderr.fileno(), 4096))
551 if proc.stdout in rdrdy:
552 # use os.read for fully unbuffered behavior
553 buf = os.read(proc.stdout.fileno(), 4096)
562 err.append(proc.stderr.read())
564 proc._known_hosts = tmp_known_hosts
566 return ((None,''.join(err)), proc)
568 raise AssertionError, "Unreachable code reached! :-Q"
570 # Parse destination as <user>@<server>:<path>
571 if isinstance(dest, basestring) and ':' in dest:
572 remspec, path = dest.split(':',1)
573 elif isinstance(source, basestring) and ':' in source:
574 remspec, path = source.split(':',1)
576 raise ValueError, "Both endpoints cannot be local"
577 user,host = remspec.rsplit('@',1)
580 tmp_known_hosts = None
581 args = ['scp', '-q', '-p', '-C',
582 # Don't bother with localhost. Makes test easier
583 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
585 args.append('-P%d' % port)
589 args.extend(('-i', ident_key))
591 # Create a temporary server key file
592 tmp_known_hosts = _make_server_key_args(
593 server_key, host, port, args)
594 if isinstance(source,list):
600 # connects to the remote host and starts a remote connection
601 proc = subprocess.Popen(args,
602 stdout = subprocess.PIPE,
603 stdin = subprocess.PIPE,
604 stderr = subprocess.PIPE)
605 proc._known_hosts = tmp_known_hosts
607 comm = proc.communicate()
611 def popen_ssh_subprocess(python_code, host, port, user, agent,
617 python_path.replace("'", r"'\''")
618 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
621 # Uncomment for debug (to run everything under strace)
622 # We had to verify if strace works (cannot nest them)
623 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
625 #if self.mode == MODE_SSH:
626 # cmd += "strace -f -tt -s 200 -o strace$$.out "
628 cmd += "import base64, os\n"
629 cmd += "cmd = \"\"\n"
630 cmd += "while True:\n"
631 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
632 cmd += " if cmd[-1] == \"\\n\": break\n"
633 cmd += "cmd = base64.b64decode(cmd)\n"
634 # Uncomment for debug
635 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
636 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
637 cmd += "exec(cmd)\n'"
639 tmp_known_hosts = None
641 # Don't bother with localhost. Makes test easier
642 '-o', 'NoHostAuthenticationForLocalhost=yes',
647 args.append('-p%d' % port)
649 args.extend(('-i', ident_key))
653 # Create a temporary server key file
654 tmp_known_hosts = _make_server_key_args(
655 server_key, host, port, args)
658 # connects to the remote host and starts a remote rpyc connection
659 proc = subprocess.Popen(args,
660 stdout = subprocess.PIPE,
661 stdin = subprocess.PIPE,
662 stderr = subprocess.PIPE)
663 proc._known_hosts = tmp_known_hosts
665 # send the command to execute
666 os.write(proc.stdin.fileno(),
667 base64.b64encode(python_code) + "\n")
668 msg = os.read(proc.stdout.fileno(), 3)
670 raise RuntimeError("Failed to start remote python interpreter")