2 # -*- coding: utf-8 -*-
19 CTRL_SOCK = "ctrl.sock"
20 STD_ERR = "stderr.log"
28 if hasattr(os, "devnull"):
31 DEV_NULL = "/dev/null"
# Matches only strings made up entirely of characters that need no shell
# quoting. The pattern is anchored at the end (\Z) because re.match() only
# anchors at the start: without it, the "*" quantifier happily matches the
# empty prefix of ANY string, so every string would be reported as safe.
SHELL_SAFE = re.compile(r'[-a-zA-Z0-9_=+:.,/]*\Z')
38 """ Escapes strings so that they are safe to use as command-line arguments """
39 if SHELL_SAFE.match(s):
40 # safe string - no escaping needed
43 # unsafe string - escape
44 s = s.replace("'","\\'")
48 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
49 self._root_dir = root_dir
51 self._ctrl_sock = None
52 self._log_level = log_level
61 # cannot return normally after fork because no exec was done.
62 # This means that if we don't do a os._exit(0) here the code that
63 # follows the call to "Server.run()" in the "caller code" will be
64 # executed... but by now it has already been executed after the
65 # first process (the one that did the first fork) returned.
73 # pipes for process synchronization
81 # os.waitpid avoids leaving a <defunct> (zombie) process
82 st = os.waitpid(pid1, 0)[1]
84 raise RuntimeError("Daemonization failed")
85 # return 0 to inform the caller method that this is not the
90 # Decouple from parent environment.
91 os.chdir(self._root_dir)
98 # see ref: "os._exit(0)"
101 # close all open file descriptors.
102 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
103 if (max_fd == resource.RLIM_INFINITY):
105 for fd in range(3, max_fd):
112 # Redirect standard file descriptors.
113 stdin = open(DEV_NULL, "r")
114 stderr = stdout = open(STD_ERR, "a", 0)
115 os.dup2(stdin.fileno(), sys.stdin.fileno())
116 # NOTE: sys.stdout.write will still be buffered, even if the file
117 # was opened with 0 buffer
118 os.dup2(stdout.fileno(), sys.stdout.fileno())
119 os.dup2(stderr.fileno(), sys.stderr.fileno())
121 # create control socket
122 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
123 self._ctrl_sock.bind(CTRL_SOCK)
124 self._ctrl_sock.listen(0)
126 # let the parent process know that the daemonization is finished
131 def post_daemonize(self):
135 while not self._stop:
136 conn, addr = self._ctrl_sock.accept()
138 while not self._stop:
140 msg = self.recv_msg(conn)
141 except socket.timeout, e:
146 reply = self.stop_action()
148 reply = self.reply_action(msg)
151 self.send_reply(conn, reply)
154 self.log_error("NOTICE: Awaiting for reconnection")
162 def recv_msg(self, conn):
166 chunk = conn.recv(1024)
168 if e.errno != errno.EINTR:
174 if chunk[-1] == "\n":
179 decoded = base64.b64decode(data)
180 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Sends a reply through the connection ``conn``.

    The reply is base64-encoded and terminated by a newline, the same
    framing recv_msg() decodes for incoming messages.

    conn -- connected socket to write the reply to
    reply -- raw (byte string) reply to transmit
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole frame has been transmitted;
    # a plain send() is allowed to send only part of a long reply, which
    # would leave the peer waiting forever for the terminating newline.
    conn.sendall(encoded + b"\n")
188 self._ctrl_sock.close()
def stop_action(self):
    """
    Builds the reply that is sent back when a stop request is handled.
    """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """
    Builds the reply for a received message.

    msg -- decoded message; it is interpolated into the reply text
    """
    reply = "Reply to: %s" % msg
    return reply
199 def log_error(self, text = None, context = ''):
201 text = traceback.format_exc()
202 date = time.strftime("%Y-%m-%d %H:%M:%S")
204 context = " (%s)" % (context,)
205 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """
    Writes a timestamped debug entry to stderr.

    Nothing is written unless the configured log level is DEBUG_LEVEL.

    text -- message to log
    """
    if self._log_level != DEBUG_LEVEL:
        # debugging output is disabled
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
213 class Forwarder(object):
214 def __init__(self, root_dir = "."):
215 self._ctrl_sock = None
216 self._root_dir = root_dir
221 print >>sys.stderr, "READY."
222 while not self._stop:
223 data = self.read_data()
224 self.send_to_server(data)
225 data = self.recv_from_server()
226 self.write_data(data)
230 return sys.stdin.readline()
232 def write_data(self, data):
233 sys.stdout.write(data)
234 # sys.stdout.write is buffered, this is why we need to do a flush()
237 def send_to_server(self, data):
239 self._ctrl_sock.send(data)
241 if e.errno == errno.EPIPE:
243 self._ctrl_sock.send(data)
246 encoded = data.rstrip()
247 msg = base64.b64decode(encoded)
251 def recv_from_server(self):
255 chunk = self._ctrl_sock.recv(1024)
257 if e.errno != errno.EINTR:
262 if chunk[-1] == "\n":
268 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
269 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
270 self._ctrl_sock.connect(sock_addr)
272 def disconnect(self):
274 self._ctrl_sock.close()
278 class Client(object):
279 def __init__(self, root_dir = ".", host = None, port = None, user = None,
281 self.root_dir = root_dir
282 self.addr = (host, port)
285 self._stopped = False
289 if self._process.poll() is None:
290 os.kill(self._process.pid, signal.SIGTERM)
294 root_dir = self.root_dir
295 (host, port) = self.addr
299 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
300 c.forward()" % (root_dir,)
302 self._process = popen_ssh_subprocess(python_code, host, port,
304 # popen_ssh_subprocess already waits for readiness
306 self._process = subprocess.Popen(
307 ["python", "-c", python_code],
308 stdin = subprocess.PIPE,
309 stdout = subprocess.PIPE,
310 stderr = subprocess.PIPE
313 # Wait for the forwarder to be ready, otherwise nobody
314 # will be able to connect to it
315 helo = self._process.stderr.readline()
316 if helo != 'READY.\n':
317 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
318 helo + self._process.stderr.read())
320 def send_msg(self, msg):
321 encoded = base64.b64encode(msg)
322 data = "%s\n" % encoded
325 self._process.stdin.write(data)
326 except (IOError, ValueError):
327 # dead process, poll it to un-zombify
330 # try again after reconnect
331 # If it fails again, though, give up
333 self._process.stdin.write(data)
336 self.send_msg(STOP_MSG)
def read_reply(self):
    """
    Reads one reply line from the stdout of the forwarder process and
    returns its base64-decoded content.
    """
    line = self._process.stdout.readline()
    # drop the trailing newline (and any other trailing whitespace)
    # before decoding
    return base64.b64decode(line.rstrip())
344 def _make_server_key_args(server_key, host, port, args):
346 Returns a reference to the created temporary file, and adds the
347 corresponding arguments to the given argument list.
349 Make sure to hold onto it until the process is done with the file
352 host = '%s:%s' % (host,port)
353 # Create a temporary server key file
354 tmp_known_hosts = tempfile.NamedTemporaryFile()
355 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
356 tmp_known_hosts.flush()
357 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
358 return tmp_known_hosts
360 def popen_ssh_command(command, host, port, user, agent,
366 Executes a remote command, returns ((stdout,stderr),process)
368 tmp_known_hosts = None
370 # Don't bother with localhost. Makes test easier
371 '-o', 'NoHostAuthenticationForLocalhost=yes',
376 args.append('-p%d' % port)
378 args.extend(('-i', ident_key))
382 # Create a temporary server key file
383 tmp_known_hosts = _make_server_key_args(
384 server_key, host, port, args)
387 # connects to the remote host and starts a remote connection
388 proc = subprocess.Popen(args,
389 stdout = subprocess.PIPE,
390 stdin = subprocess.PIPE,
391 stderr = subprocess.PIPE)
393 # attach tempfile object to the process, to make sure the file stays
394 # alive until the process is finished with it
395 proc._known_hosts = tmp_known_hosts
397 return (proc.communicate(stdin), proc)
399 def popen_scp(source, dest,
406 Copies from/to remote sites.
408 Source and destination should have the user and host encoded
411 If source is a file object, a special mode will be used to
412 create the remote file with the same contents.
414 If dest is a file object, the remote file (source) will be
415 read and written into dest.
417 In these modes, recursive cannot be True.
420 if isinstance(source, file) or isinstance(dest, file) \
421 or hasattr(source, 'read') or hasattr(dest, 'write'):
424 # Parse source/destination as <user>@<server>:<path>
425 if isinstance(dest, basestring) and ':' in dest:
426 remspec, path = dest.split(':',1)
427 elif isinstance(source, basestring) and ':' in source:
428 remspec, path = source.split(':',1)
430 raise ValueError, "Both endpoints cannot be local"
431 user,host = remspec.rsplit('@',1)
432 tmp_known_hosts = None
434 args = ['ssh', '-l', user, '-C',
435 # Don't bother with localhost. Makes test easier
436 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
438 args.append('-P%d' % port)
440 args.extend(('-i', ident_key))
442 # Create a temporary server key file
443 tmp_known_hosts = _make_server_key_args(
444 server_key, host, port, args)
446 if isinstance(source, file) or hasattr(source, 'read'):
447 args.append('cat > %s' % (shell_escape(path),))
448 elif isinstance(dest, file) or hasattr(dest, 'write'):
449 args.append('cat %s' % (shell_escape(path),))
451 raise AssertionError, "Unreachable code reached! :-Q"
453 # connects to the remote host and starts a remote connection
454 if isinstance(source, file):
455 proc = subprocess.Popen(args,
456 stdout = open('/dev/null','w'),
457 stderr = subprocess.PIPE,
459 err = proc.stderr.read()
460 proc._known_hosts = tmp_known_hosts
462 return ((None,err), proc)
463 elif isinstance(dest, file):
464 proc = subprocess.Popen(args,
465 stdout = open('/dev/null','w'),
466 stderr = subprocess.PIPE,
468 err = proc.stderr.read()
469 proc._known_hosts = tmp_known_hosts
471 return ((None,err), proc)
472 elif hasattr(source, 'read'):
473 # file-like (but not file) source
474 proc = subprocess.Popen(args,
475 stdout = open('/dev/null','w'),
476 stderr = subprocess.PIPE,
483 buf = source.read(4096)
485 rdrdy, wrdy, broken = os.select(
488 [proc.stderr,proc.stdin])
490 if proc.stderr in rdrdy:
491 # use os.read for fully unbuffered behavior
492 err.append(os.read(proc.stderr.fileno(), 4096))
494 if proc.stdin in wrdy:
495 proc.stdin.write(buf)
500 err.append(proc.stderr.read())
502 proc._known_hosts = tmp_known_hosts
504 return ((None,''.join(err)), proc)
505 elif hasattr(dest, 'write'):
506 # file-like (but not file) dest
507 proc = subprocess.Popen(args,
508 stdout = open('/dev/null','w'),
509 stderr = subprocess.PIPE,
515 rdrdy, wrdy, broken = os.select(
516 [proc.stderr, proc.stdout],
518 [proc.stderr, proc.stdout])
520 if proc.stderr in rdrdy:
521 # use os.read for fully unbuffered behavior
522 err.append(os.read(proc.stderr.fileno(), 4096))
524 if proc.stdout in rdrdy:
525 # use os.read for fully unbuffered behavior
526 dest.write(os.read(proc.stdout.fileno(), 4096))
530 err.append(proc.stderr.read())
532 proc._known_hosts = tmp_known_hosts
534 return ((None,''.join(err)), proc)
536 raise AssertionError, "Unreachable code reached! :-Q"
538 # Parse destination as <user>@<server>:<path>
539 if isinstance(dest, basestring) and ':' in dest:
540 remspec, path = dest.split(':',1)
541 elif isinstance(source, basestring) and ':' in source:
542 remspec, path = source.split(':',1)
544 raise ValueError, "Both endpoints cannot be local"
545 user,host = remspec.rsplit('@',1)
548 tmp_known_hosts = None
549 args = ['scp', '-q', '-p', '-C',
550 # Don't bother with localhost. Makes test easier
551 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
553 args.append('-P%d' % port)
557 args.extend(('-i', ident_key))
559 # Create a temporary server key file
560 tmp_known_hosts = _make_server_key_args(
561 server_key, host, port, args)
565 # connects to the remote host and starts a remote connection
566 proc = subprocess.Popen(args,
567 stdout = subprocess.PIPE,
568 stdin = subprocess.PIPE,
569 stderr = subprocess.PIPE)
570 proc._known_hosts = tmp_known_hosts
572 comm = proc.communicate()
576 def popen_ssh_subprocess(python_code, host, port, user, agent,
582 python_path.replace("'", r"'\''")
583 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
586 # Uncomment for debug (to run everything under strace)
587 # We had to verify if strace works (cannot nest them)
588 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
590 #if self.mode == MODE_SSH:
591 # cmd += "strace -f -tt -s 200 -o strace$$.out "
593 cmd += "import base64, os\n"
594 cmd += "cmd = \"\"\n"
595 cmd += "while True:\n"
596 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
597 cmd += " if cmd[-1] == \"\\n\": break\n"
598 cmd += "cmd = base64.b64decode(cmd)\n"
599 # Uncomment for debug
600 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
601 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
602 cmd += "exec(cmd)\n'"
604 tmp_known_hosts = None
606 # Don't bother with localhost. Makes test easier
607 '-o', 'NoHostAuthenticationForLocalhost=yes',
612 args.append('-p%d' % port)
614 args.extend(('-i', ident_key))
618 # Create a temporary server key file
619 tmp_known_hosts = _make_server_key_args(
620 server_key, host, port, args)
623 # connects to the remote host and starts a remote python interpreter
624 proc = subprocess.Popen(args,
625 stdout = subprocess.PIPE,
626 stdin = subprocess.PIPE,
627 stderr = subprocess.PIPE)
628 proc._known_hosts = tmp_known_hosts
630 # send the command to execute
631 os.write(proc.stdin.fileno(),
632 base64.b64encode(python_code) + "\n")
633 msg = os.read(proc.stdout.fileno(), 3)
635 raise RuntimeError("Failed to start remote python interpreter")