2 # -*- coding: utf-8 -*-
# File name of the UNIX-domain control socket: the server binds to it and
# the Forwarder connects to it under its root_dir.
19 CTRL_SOCK = "ctrl.sock"
# File that the daemon's stdout and stderr are redirected to; it is
# opened in append mode.
20 STD_ERR = "stderr.log"
28 if hasattr(os, "devnull"):
31 DEV_NULL = "/dev/null"
35 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
38 """ Escapes strings so that they are safe to use as command-line arguments """
39 if SHELL_SAFE.match(s):
40 # safe string - no escaping needed
43 # unsafe string - escape
44 s = s.replace("'","\\'")
48 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
49 self._root_dir = root_dir
51 self._ctrl_sock = None
52 self._log_level = log_level
61 # cannot return normally after fork because no exec was done.
62 # This means that if we don't do a os._exit(0) here the code that
63 # follows the call to "Server.run()" in the "caller code" will be
64 # executed... but by now it has already been executed after the
65 # first process (the one that did the first fork) returned.
73 # pipes for process synchronization
81 # os.waitpid avoids leaving a <defunct> (zombie) process
82 st = os.waitpid(pid1, 0)[1]
84 raise RuntimeError("Daemonization failed")
85 # return 0 to inform the caller method that this is not the
90 # Decouple from parent environment.
91 os.chdir(self._root_dir)
98 # see ref: "os._exit(0)"
101 # close all open file descriptors.
102 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
103 if (max_fd == resource.RLIM_INFINITY):
105 for fd in range(3, max_fd):
112 # Redirect standard file descriptors.
113 stdin = open(DEV_NULL, "r")
114 stderr = stdout = open(STD_ERR, "a", 0)
115 os.dup2(stdin.fileno(), sys.stdin.fileno())
116 # NOTE: sys.stdout.write will still be buffered, even if the file
117 # was opened with 0 buffer
118 os.dup2(stdout.fileno(), sys.stdout.fileno())
119 os.dup2(stderr.fileno(), sys.stderr.fileno())
121 # create control socket
122 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
123 self._ctrl_sock.bind(CTRL_SOCK)
124 self._ctrl_sock.listen(0)
126 # let the parent process know that the daemonization is finished
131 def post_daemonize(self):
135 while not self._stop:
136 conn, addr = self._ctrl_sock.accept()
138 while not self._stop:
140 msg = self.recv_msg(conn)
141 except socket.timeout, e:
146 reply = self.stop_action()
148 reply = self.reply_action(msg)
151 self.send_reply(conn, reply)
154 self.log_error("NOTICE: Awaiting for reconnection")
162 def recv_msg(self, conn):
166 chunk = conn.recv(1024)
168 if e.errno != errno.EINTR:
174 if chunk[-1] == "\n":
179 decoded = base64.b64decode(data)
180 return decoded.rstrip()
def send_reply(self, conn, reply):
    """ Sends a reply over conn as a single base64-encoded line.

    conn  -- connected socket the reply is written to
    reply -- payload to send (byte string)

    The payload is base64-encoded and terminated with a newline
    character.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole message has gone out; a
    # plain send() may transmit only part of it, which would leave the
    # peer waiting for the terminating newline.
    conn.sendall(encoded + b"\n")
188 self._ctrl_sock.close()
def stop_action(self):
    """ Returns the reply text used when a stop request is handled """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """ Returns the reply text for msg: the message prefixed with 'Reply to: ' """
    template = "Reply to: %s"
    return template % msg
199 def log_error(self, text = None, context = ''):
201 text = traceback.format_exc()
202 date = time.strftime("%Y-%m-%d %H:%M:%S")
204 context = " (%s)" % (context,)
205 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """ Writes a timestamped DEBUG entry to stderr.

    Nothing is written unless the instance log level is DEBUG_LEVEL.
    """
    if self._log_level != DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (stamp, text)
    sys.stderr.write(entry)
213 class Forwarder(object):
214 def __init__(self, root_dir = "."):
215 self._ctrl_sock = None
216 self._root_dir = root_dir
221 print >>sys.stderr, "READY."
222 while not self._stop:
223 data = self.read_data()
224 self.send_to_server(data)
225 data = self.recv_from_server()
226 self.write_data(data)
230 return sys.stdin.readline()
232 def write_data(self, data):
233 sys.stdout.write(data)
234 # sys.stdout.write is buffered, this is why we need to do a flush()
237 def send_to_server(self, data):
239 self._ctrl_sock.send(data)
241 if e.errno == errno.EPIPE:
243 self._ctrl_sock.send(data)
246 encoded = data.rstrip()
247 msg = base64.b64decode(encoded)
251 def recv_from_server(self):
255 chunk = self._ctrl_sock.recv(1024)
257 if e.errno != errno.EINTR:
262 if chunk[-1] == "\n":
268 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
269 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
270 self._ctrl_sock.connect(sock_addr)
272 def disconnect(self):
274 self._ctrl_sock.close()
278 class Client(object):
279 def __init__(self, root_dir = ".", host = None, port = None, user = None,
281 self.root_dir = root_dir
282 self.addr = (host, port)
285 self._stopped = False
289 if self._process.poll() is None:
290 os.kill(self._process.pid, signal.SIGTERM)
294 root_dir = self.root_dir
295 (host, port) = self.addr
299 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
300 c.forward()" % (root_dir,)
302 self._process = popen_ssh_subprocess(python_code, host, port,
304 # popen_ssh_subprocess already waits for readiness
306 self._process = subprocess.Popen(
307 ["python", "-c", python_code],
308 stdin = subprocess.PIPE,
309 stdout = subprocess.PIPE,
310 stderr = subprocess.PIPE
313 # Wait for the forwarder to be ready, otherwise nobody
314 # will be able to connect to it
315 helo = self._process.stderr.readline()
316 if helo != 'READY.\n':
317 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
318 helo + self._process.stderr.read())
320 def send_msg(self, msg):
321 encoded = base64.b64encode(msg)
322 data = "%s\n" % encoded
325 self._process.stdin.write(data)
326 except (IOError, ValueError):
327 # dead process, poll it to un-zombify
330 # try again after reconnect
331 # If it fails again, though, give up
333 self._process.stdin.write(data)
336 self.send_msg(STOP_MSG)
def read_reply(self):
    """ Reads one line from the child process' stdout and returns it base64-decoded.

    Trailing whitespace (including the line terminator) is stripped
    before decoding.
    """
    line = self._process.stdout.readline()
    return base64.b64decode(line.rstrip())
344 def _make_server_key_args(server_key, host, port, args):
346 Returns a reference to the created temporary file, and adds the
347 corresponding arguments to the given argument list.
349 Make sure to hold onto it until the process is done with the file
352 host = '%s:%s' % (host,port)
353 # Create a temporary server key file
354 tmp_known_hosts = tempfile.NamedTemporaryFile()
355 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
356 tmp_known_hosts.flush()
357 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
358 return tmp_known_hosts
360 def popen_ssh_command(command, host, port, user, agent,
366 Executes a remote command, returns ((stdout,stderr),process)
368 tmp_known_hosts = None
370 # Don't bother with localhost. Makes test easier
371 '-o', 'NoHostAuthenticationForLocalhost=yes',
376 args.append('-p%d' % port)
378 args.extend(('-i', ident_key))
382 # Create a temporary server key file
383 tmp_known_hosts = _make_server_key_args(
384 server_key, host, port, args)
387 # connects to the remote host and starts a remote connection
388 proc = subprocess.Popen(args,
389 stdout = subprocess.PIPE,
390 stdin = subprocess.PIPE,
391 stderr = subprocess.PIPE)
393 # attach tempfile object to the process, to make sure the file stays
394 # alive until the process is finished with it
395 proc._known_hosts = tmp_known_hosts
397 return (proc.communicate(stdin), proc)
399 def popen_scp(source, dest,
406 Copies from/to remote sites.
408 Source and destination should have the user and host encoded
411 If source is a file object, a special mode will be used to
412 create the remote file with the same contents.
414 If dest is a file object, the remote file (source) will be
415 read and written into dest.
417 In these modes, recursive cannot be True.
420 if isinstance(source, file) or isinstance(dest, file) \
421 or hasattr(source, 'read') or hasattr(dest, 'write'):
424 # Parse source/destination as <user>@<server>:<path>
425 if isinstance(dest, basestring) and ':' in dest:
426 remspec, path = dest.split(':',1)
427 elif isinstance(source, basestring) and ':' in source:
428 remspec, path = source.split(':',1)
430 raise ValueError, "Both endpoints cannot be local"
431 user,host = remspec.rsplit('@',1)
432 tmp_known_hosts = None
434 args = ['ssh', '-l', user, '-C',
435 # Don't bother with localhost. Makes test easier
436 '-o', 'NoHostAuthenticationForLocalhost=yes',
439 args.append('-P%d' % port)
441 args.extend(('-i', ident_key))
443 # Create a temporary server key file
444 tmp_known_hosts = _make_server_key_args(
445 server_key, host, port, args)
447 if isinstance(source, file) or hasattr(source, 'read'):
448 args.append('cat > %s' % (shell_escape(path),))
449 elif isinstance(dest, file) or hasattr(dest, 'write'):
450 args.append('cat %s' % (shell_escape(path),))
452 raise AssertionError, "Unreachable code reached! :-Q"
454 # connects to the remote host and starts a remote connection
455 if isinstance(source, file):
456 proc = subprocess.Popen(args,
457 stdout = open('/dev/null','w'),
458 stderr = subprocess.PIPE,
460 err = proc.stderr.read()
461 proc._known_hosts = tmp_known_hosts
463 return ((None,err), proc)
464 elif isinstance(dest, file):
465 proc = subprocess.Popen(args,
466 stdout = open('/dev/null','w'),
467 stderr = subprocess.PIPE,
469 err = proc.stderr.read()
470 proc._known_hosts = tmp_known_hosts
472 return ((None,err), proc)
473 elif hasattr(source, 'read'):
474 # file-like (but not file) source
475 proc = subprocess.Popen(args,
476 stdout = open('/dev/null','w'),
477 stderr = subprocess.PIPE,
478 stdin = subprocess.PIPE)
484 buf = source.read(4096)
489 rdrdy, wrdy, broken = select.select(
492 [proc.stderr,proc.stdin])
494 if proc.stderr in rdrdy:
495 # use os.read for fully unbuffered behavior
496 err.append(os.read(proc.stderr.fileno(), 4096))
498 if proc.stdin in wrdy:
499 proc.stdin.write(buf)
505 err.append(proc.stderr.read())
507 proc._known_hosts = tmp_known_hosts
509 return ((None,''.join(err)), proc)
510 elif hasattr(dest, 'write'):
511 # file-like (but not file) dest
512 proc = subprocess.Popen(args,
513 stdout = subprocess.PIPE,
514 stderr = subprocess.PIPE,
515 stdin = open('/dev/null','w'))
520 rdrdy, wrdy, broken = select.select(
521 [proc.stderr, proc.stdout],
523 [proc.stderr, proc.stdout])
525 if proc.stderr in rdrdy:
526 # use os.read for fully unbuffered behavior
527 err.append(os.read(proc.stderr.fileno(), 4096))
529 if proc.stdout in rdrdy:
530 # use os.read for fully unbuffered behavior
531 buf = os.read(proc.stdout.fileno(), 4096)
540 err.append(proc.stderr.read())
542 proc._known_hosts = tmp_known_hosts
544 return ((None,''.join(err)), proc)
546 raise AssertionError, "Unreachable code reached! :-Q"
548 # Parse destination as <user>@<server>:<path>
549 if isinstance(dest, basestring) and ':' in dest:
550 remspec, path = dest.split(':',1)
551 elif isinstance(source, basestring) and ':' in source:
552 remspec, path = source.split(':',1)
554 raise ValueError, "Both endpoints cannot be local"
555 user,host = remspec.rsplit('@',1)
558 tmp_known_hosts = None
559 args = ['scp', '-q', '-p', '-C',
560 # Don't bother with localhost. Makes test easier
561 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
563 args.append('-P%d' % port)
567 args.extend(('-i', ident_key))
569 # Create a temporary server key file
570 tmp_known_hosts = _make_server_key_args(
571 server_key, host, port, args)
575 # connects to the remote host and starts a remote connection
576 proc = subprocess.Popen(args,
577 stdout = subprocess.PIPE,
578 stdin = subprocess.PIPE,
579 stderr = subprocess.PIPE)
580 proc._known_hosts = tmp_known_hosts
582 comm = proc.communicate()
586 def popen_ssh_subprocess(python_code, host, port, user, agent,
592 python_path.replace("'", r"'\''")
593 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
596 # Uncomment for debug (to run everything under strace)
597 # We had to verify if strace works (cannot nest them)
598 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
600 #if self.mode == MODE_SSH:
601 # cmd += "strace -f -tt -s 200 -o strace$$.out "
603 cmd += "import base64, os\n"
604 cmd += "cmd = \"\"\n"
605 cmd += "while True:\n"
606 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
607 cmd += " if cmd[-1] == \"\\n\": break\n"
608 cmd += "cmd = base64.b64decode(cmd)\n"
609 # Uncomment for debug
610 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
611 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
612 cmd += "exec(cmd)\n'"
614 tmp_known_hosts = None
616 # Don't bother with localhost. Makes test easier
617 '-o', 'NoHostAuthenticationForLocalhost=yes',
622 args.append('-p%d' % port)
624 args.extend(('-i', ident_key))
628 # Create a temporary server key file
629 tmp_known_hosts = _make_server_key_args(
630 server_key, host, port, args)
633 # connects to the remote host and starts a remote rpyc connection
634 proc = subprocess.Popen(args,
635 stdout = subprocess.PIPE,
636 stdin = subprocess.PIPE,
637 stderr = subprocess.PIPE)
638 proc._known_hosts = tmp_known_hosts
640 # send the command to execute
641 os.write(proc.stdin.fileno(),
642 base64.b64encode(python_code) + "\n")
643 msg = os.read(proc.stdout.fileno(), 3)
645 raise RuntimeError("Failed to start remote python interpreter")