2 # -*- coding: utf-8 -*-
19 CTRL_SOCK = "ctrl.sock"
20 STD_ERR = "stderr.log"
28 if hasattr(os, "devnull"):
31 DEV_NULL = "/dev/null"
35 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
38 """ Escapes strings so that they are safe to use as command-line arguments """
39 if SHELL_SAFE.match(s):
40 # safe string - no escaping needed
43 # unsafe string - escape
44 s = s.replace("'","\\'")
48 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
49 self._root_dir = root_dir
51 self._ctrl_sock = None
52 self._log_level = log_level
61 # can not return normally after fork beacuse no exec was done.
62 # This means that if we don't do a os._exit(0) here the code that
63 # follows the call to "Server.run()" in the "caller code" will be
64 # executed... but by now it has already been executed after the
65 # first process (the one that did the first fork) returned.
73 # pipes for process synchronization
81 # os.waitpid avoids leaving a <defunc> (zombie) process
82 st = os.waitpid(pid1, 0)[1]
84 raise RuntimeError("Daemonization failed")
85 # return 0 to inform the caller method that this is not the
90 # Decouple from parent environment.
91 os.chdir(self._root_dir)
98 # see ref: "os._exit(0)"
101 # close all open file descriptors.
102 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
103 if (max_fd == resource.RLIM_INFINITY):
105 for fd in range(3, max_fd):
112 # Redirect standard file descriptors.
113 stdin = open(DEV_NULL, "r")
114 stderr = stdout = open(STD_ERR, "a", 0)
115 os.dup2(stdin.fileno(), sys.stdin.fileno())
116 # NOTE: sys.stdout.write will still be buffered, even if the file
117 # was opened with 0 buffer
118 os.dup2(stdout.fileno(), sys.stdout.fileno())
119 os.dup2(stderr.fileno(), sys.stderr.fileno())
121 # create control socket
122 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
123 self._ctrl_sock.bind(CTRL_SOCK)
124 self._ctrl_sock.listen(0)
126 # let the parent process know that the daemonization is finished
131 def post_daemonize(self):
135 while not self._stop:
136 conn, addr = self._ctrl_sock.accept()
138 while not self._stop:
140 msg = self.recv_msg(conn)
141 except socket.timeout, e:
146 reply = self.stop_action()
148 reply = self.reply_action(msg)
151 self.send_reply(conn, reply)
154 self.log_error("NOTICE: Awaiting for reconnection")
162 def recv_msg(self, conn):
166 chunk = conn.recv(1024)
168 if e.errno != errno.EINTR:
174 if chunk[-1] == "\n":
179 decoded = base64.b64decode(data)
180 return decoded.rstrip()
182 def send_reply(self, conn, reply):
183 encoded = base64.b64encode(reply)
184 conn.send("%s\n" % encoded)
188 self._ctrl_sock.close()
193 def stop_action(self):
194 return "Stopping server"
196 def reply_action(self, msg):
197 return "Reply to: %s" % msg
199 def log_error(self, text = None, context = ''):
201 text = traceback.format_exc()
202 date = time.strftime("%Y-%m-%d %H:%M:%S")
204 context = " (%s)" % (context,)
205 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
208 def log_debug(self, text):
209 if self._log_level == DEBUG_LEVEL:
210 date = time.strftime("%Y-%m-%d %H:%M:%S")
211 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
213 class Forwarder(object):
214 def __init__(self, root_dir = "."):
215 self._ctrl_sock = None
216 self._root_dir = root_dir
221 print >>sys.stderr, "READY."
222 while not self._stop:
223 data = self.read_data()
224 self.send_to_server(data)
225 data = self.recv_from_server()
226 self.write_data(data)
230 return sys.stdin.readline()
232 def write_data(self, data):
233 sys.stdout.write(data)
234 # sys.stdout.write is buffered, this is why we need to do a flush()
237 def send_to_server(self, data):
239 self._ctrl_sock.send(data)
241 if e.errno == errno.EPIPE:
243 self._ctrl_sock.send(data)
246 encoded = data.rstrip()
247 msg = base64.b64decode(encoded)
251 def recv_from_server(self):
255 chunk = self._ctrl_sock.recv(1024)
257 if e.errno != errno.EINTR:
262 if chunk[-1] == "\n":
268 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
269 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
270 self._ctrl_sock.connect(sock_addr)
272 def disconnect(self):
274 self._ctrl_sock.close()
278 class Client(object):
279 def __init__(self, root_dir = ".", host = None, port = None, user = None,
281 self.root_dir = root_dir
282 self.addr = (host, port)
285 self._stopped = False
289 if self._process.poll() is None:
290 os.kill(self._process.pid, signal.SIGTERM)
294 root_dir = self.root_dir
295 (host, port) = self.addr
299 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
300 c.forward()" % (root_dir,)
302 self._process = popen_ssh_subprocess(python_code, host, port,
304 # popen_ssh_subprocess already waits for readiness
306 self._process = subprocess.Popen(
307 ["python", "-c", python_code],
308 stdin = subprocess.PIPE,
309 stdout = subprocess.PIPE,
310 stderr = subprocess.PIPE
313 # Wait for the forwarder to be ready, otherwise nobody
314 # will be able to connect to it
315 helo = self._process.stderr.readline()
316 if helo != 'READY.\n':
317 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
318 helo + self._process.stderr.read())
320 def send_msg(self, msg):
321 encoded = base64.b64encode(msg)
322 data = "%s\n" % encoded
325 self._process.stdin.write(data)
326 except (IOError, ValueError):
327 # dead process, poll it to un-zombify
330 # try again after reconnect
331 # If it fails again, though, give up
333 self._process.stdin.write(data)
336 self.send_msg(STOP_MSG)
339 def read_reply(self):
340 data = self._process.stdout.readline()
341 encoded = data.rstrip()
342 return base64.b64decode(encoded)
344 def _make_server_key_args(server_key, host, port, args):
346 Returns a reference to the created temporary file, and adds the
347 corresponding arguments to the given argument list.
349 Make sure to hold onto it until the process is done with the file
352 host = '%s:%s' % (host,port)
353 # Create a temporary server key file
354 tmp_known_hosts = tempfile.NamedTemporaryFile()
355 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
356 tmp_known_hosts.flush()
357 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
358 return tmp_known_hosts
360 def popen_ssh_command(command, host, port, user, agent,
366 Executes a remote commands, returns ((stdout,stderr),process)
368 tmp_known_hosts = None
370 # Don't bother with localhost. Makes test easier
371 '-o', 'NoHostAuthenticationForLocalhost=yes',
376 args.append('-p%d' % port)
378 args.extend(('-i', ident_key))
382 # Create a temporary server key file
383 tmp_known_hosts = _make_server_key_args(
384 server_key, host, port, args)
387 # connects to the remote host and starts a remote connection
388 proc = subprocess.Popen(args,
389 stdout = subprocess.PIPE,
390 stdin = subprocess.PIPE,
391 stderr = subprocess.PIPE)
393 # attach tempfile object to the process, to make sure the file stays
394 # alive until the process is finished with it
395 proc._known_hosts = tmp_known_hosts
397 return (proc.communicate(stdin), proc)
399 def popen_scp(source, dest,
406 Copies from/to remote sites.
408 Source and destination should have the user and host encoded
411 If source is a file object, a special mode will be used to
412 create the remote file with the same contents.
414 If dest is a file object, the remote file (source) will be
415 read and written into dest.
417 In these modes, recursive cannot be True.
419 Source can be a list of files to copy to a single destination,
420 in which case it is advised that the destination be a folder.
423 if isinstance(source, file) or isinstance(dest, file) \
424 or hasattr(source, 'read') or hasattr(dest, 'write'):
427 # Parse source/destination as <user>@<server>:<path>
428 if isinstance(dest, basestring) and ':' in dest:
429 remspec, path = dest.split(':',1)
430 elif isinstance(source, basestring) and ':' in source:
431 remspec, path = source.split(':',1)
433 raise ValueError, "Both endpoints cannot be local"
434 user,host = remspec.rsplit('@',1)
435 tmp_known_hosts = None
437 args = ['ssh', '-l', user, '-C',
438 # Don't bother with localhost. Makes test easier
439 '-o', 'NoHostAuthenticationForLocalhost=yes',
442 args.append('-P%d' % port)
444 args.extend(('-i', ident_key))
446 # Create a temporary server key file
447 tmp_known_hosts = _make_server_key_args(
448 server_key, host, port, args)
450 if isinstance(source, file) or hasattr(source, 'read'):
451 args.append('cat > %s' % (shell_escape(path),))
452 elif isinstance(dest, file) or hasattr(dest, 'write'):
453 args.append('cat %s' % (shell_escape(path),))
455 raise AssertionError, "Unreachable code reached! :-Q"
457 # connects to the remote host and starts a remote connection
458 if isinstance(source, file):
459 proc = subprocess.Popen(args,
460 stdout = open('/dev/null','w'),
461 stderr = subprocess.PIPE,
463 err = proc.stderr.read()
464 proc._known_hosts = tmp_known_hosts
466 return ((None,err), proc)
467 elif isinstance(dest, file):
468 proc = subprocess.Popen(args,
469 stdout = open('/dev/null','w'),
470 stderr = subprocess.PIPE,
472 err = proc.stderr.read()
473 proc._known_hosts = tmp_known_hosts
475 return ((None,err), proc)
476 elif hasattr(source, 'read'):
477 # file-like (but not file) source
478 proc = subprocess.Popen(args,
479 stdout = open('/dev/null','w'),
480 stderr = subprocess.PIPE,
481 stdin = subprocess.PIPE)
487 buf = source.read(4096)
492 rdrdy, wrdy, broken = select.select(
495 [proc.stderr,proc.stdin])
497 if proc.stderr in rdrdy:
498 # use os.read for fully unbuffered behavior
499 err.append(os.read(proc.stderr.fileno(), 4096))
501 if proc.stdin in wrdy:
502 proc.stdin.write(buf)
508 err.append(proc.stderr.read())
510 proc._known_hosts = tmp_known_hosts
512 return ((None,''.join(err)), proc)
513 elif hasattr(dest, 'write'):
514 # file-like (but not file) dest
515 proc = subprocess.Popen(args,
516 stdout = subprocess.PIPE,
517 stderr = subprocess.PIPE,
518 stdin = open('/dev/null','w'))
523 rdrdy, wrdy, broken = select.select(
524 [proc.stderr, proc.stdout],
526 [proc.stderr, proc.stdout])
528 if proc.stderr in rdrdy:
529 # use os.read for fully unbuffered behavior
530 err.append(os.read(proc.stderr.fileno(), 4096))
532 if proc.stdout in rdrdy:
533 # use os.read for fully unbuffered behavior
534 buf = os.read(proc.stdout.fileno(), 4096)
543 err.append(proc.stderr.read())
545 proc._known_hosts = tmp_known_hosts
547 return ((None,''.join(err)), proc)
549 raise AssertionError, "Unreachable code reached! :-Q"
551 # Parse destination as <user>@<server>:<path>
552 if isinstance(dest, basestring) and ':' in dest:
553 remspec, path = dest.split(':',1)
554 elif isinstance(source, basestring) and ':' in source:
555 remspec, path = source.split(':',1)
557 raise ValueError, "Both endpoints cannot be local"
558 user,host = remspec.rsplit('@',1)
561 tmp_known_hosts = None
562 args = ['scp', '-q', '-p', '-C',
563 # Don't bother with localhost. Makes test easier
564 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
566 args.append('-P%d' % port)
570 args.extend(('-i', ident_key))
572 # Create a temporary server key file
573 tmp_known_hosts = _make_server_key_args(
574 server_key, host, port, args)
575 if isinstance(source,list):
581 # connects to the remote host and starts a remote connection
582 proc = subprocess.Popen(args,
583 stdout = subprocess.PIPE,
584 stdin = subprocess.PIPE,
585 stderr = subprocess.PIPE)
586 proc._known_hosts = tmp_known_hosts
588 comm = proc.communicate()
592 def popen_ssh_subprocess(python_code, host, port, user, agent,
598 python_path.replace("'", r"'\''")
599 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
602 # Uncomment for debug (to run everything under strace)
603 # We had to verify if strace works (cannot nest them)
604 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
606 #if self.mode == MODE_SSH:
607 # cmd += "strace -f -tt -s 200 -o strace$$.out "
609 cmd += "import base64, os\n"
610 cmd += "cmd = \"\"\n"
611 cmd += "while True:\n"
612 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
613 cmd += " if cmd[-1] == \"\\n\": break\n"
614 cmd += "cmd = base64.b64decode(cmd)\n"
615 # Uncomment for debug
616 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
617 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
618 cmd += "exec(cmd)\n'"
620 tmp_known_hosts = None
622 # Don't bother with localhost. Makes test easier
623 '-o', 'NoHostAuthenticationForLocalhost=yes',
628 args.append('-p%d' % port)
630 args.extend(('-i', ident_key))
634 # Create a temporary server key file
635 tmp_known_hosts = _make_server_key_args(
636 server_key, host, port, args)
639 # connects to the remote host and starts a remote rpyc connection
640 proc = subprocess.Popen(args,
641 stdout = subprocess.PIPE,
642 stdin = subprocess.PIPE,
643 stderr = subprocess.PIPE)
644 proc._known_hosts = tmp_known_hosts
646 # send the command to execute
647 os.write(proc.stdin.fileno(),
648 base64.b64encode(python_code) + "\n")
649 msg = os.read(proc.stdout.fileno(), 3)
651 raise RuntimeError("Failed to start remote python interpreter")