2 # -*- coding: utf-8 -*-
19 CTRL_SOCK = "ctrl.sock"
20 STD_ERR = "stderr.log"
29 if hasattr(os, "devnull"):
32 DEV_NULL = "/dev/null"
36 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
39 """ Escapes strings so that they are safe to use as command-line arguments """
40 if SHELL_SAFE.match(s):
41 # safe string - no escaping needed
44 # unsafe string - escape
45 s = s.replace("'","\\'")
49 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
50 self._root_dir = root_dir
52 self._ctrl_sock = None
53 self._log_level = log_level
62 # cannot return normally after fork because no exec was done.
63 # This means that if we don't do a os._exit(0) here the code that
64 # follows the call to "Server.run()" in the "caller code" will be
65 # executed... but by now it has already been executed after the
66 # first process (the one that did the first fork) returned.
74 # pipes for process synchronization
82 # os.waitpid avoids leaving a <defunct> (zombie) process
83 st = os.waitpid(pid1, 0)[1]
85 raise RuntimeError("Daemonization failed")
86 # return 0 to inform the caller method that this is not the
91 # Decouple from parent environment.
92 os.chdir(self._root_dir)
99 # see ref: "os._exit(0)"
102 # close all open file descriptors.
103 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
104 if (max_fd == resource.RLIM_INFINITY):
106 for fd in range(3, max_fd):
113 # Redirect standard file descriptors.
114 stdin = open(DEV_NULL, "r")
115 stderr = stdout = open(STD_ERR, "a", 0)
116 os.dup2(stdin.fileno(), sys.stdin.fileno())
117 # NOTE: sys.stdout.write will still be buffered, even if the file
118 # was opened with 0 buffer
119 os.dup2(stdout.fileno(), sys.stdout.fileno())
120 os.dup2(stderr.fileno(), sys.stderr.fileno())
122 # create control socket
123 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
124 self._ctrl_sock.bind(CTRL_SOCK)
125 self._ctrl_sock.listen(0)
127 # let the parent process know that the daemonization is finished
132 def post_daemonize(self):
136 while not self._stop:
137 conn, addr = self._ctrl_sock.accept()
139 while not self._stop:
141 msg = self.recv_msg(conn)
142 except socket.timeout, e:
147 reply = self.stop_action()
149 reply = self.reply_action(msg)
152 self.send_reply(conn, reply)
155 self.log_error("NOTICE: Awaiting for reconnection")
163 def recv_msg(self, conn):
167 chunk = conn.recv(1024)
169 if e.errno != errno.EINTR:
175 if chunk[-1] == "\n":
180 decoded = base64.b64decode(data)
181 return decoded.rstrip()
183 def send_reply(self, conn, reply):
184 encoded = base64.b64encode(reply)
185 conn.send("%s\n" % encoded)
189 self._ctrl_sock.close()
194 def stop_action(self):
195 return "Stopping server"
197 def reply_action(self, msg):
198 return "Reply to: %s" % msg
200 def log_error(self, text = None, context = ''):
202 text = traceback.format_exc()
203 date = time.strftime("%Y-%m-%d %H:%M:%S")
205 context = " (%s)" % (context,)
206 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
209 def log_debug(self, text):
210 if self._log_level == DEBUG_LEVEL:
211 date = time.strftime("%Y-%m-%d %H:%M:%S")
212 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
214 class Forwarder(object):
215 def __init__(self, root_dir = "."):
216 self._ctrl_sock = None
217 self._root_dir = root_dir
222 print >>sys.stderr, "READY."
223 while not self._stop:
224 data = self.read_data()
225 self.send_to_server(data)
226 data = self.recv_from_server()
227 self.write_data(data)
231 return sys.stdin.readline()
233 def write_data(self, data):
234 sys.stdout.write(data)
235 # sys.stdout.write is buffered, this is why we need to do a flush()
238 def send_to_server(self, data):
240 self._ctrl_sock.send(data)
242 if e.errno == errno.EPIPE:
244 self._ctrl_sock.send(data)
247 encoded = data.rstrip()
248 msg = base64.b64decode(encoded)
252 def recv_from_server(self):
256 chunk = self._ctrl_sock.recv(1024)
258 if e.errno != errno.EINTR:
263 if chunk[-1] == "\n":
269 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
270 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
271 self._ctrl_sock.connect(sock_addr)
273 def disconnect(self):
275 self._ctrl_sock.close()
279 class Client(object):
280 def __init__(self, root_dir = ".", host = None, port = None, user = None,
282 self.root_dir = root_dir
283 self.addr = (host, port)
286 self._stopped = False
290 if self._process.poll() is None:
291 os.kill(self._process.pid, signal.SIGTERM)
295 root_dir = self.root_dir
296 (host, port) = self.addr
300 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
301 c.forward()" % (root_dir,)
303 self._process = popen_ssh_subprocess(python_code, host, port,
305 # popen_ssh_subprocess already waits for readiness
307 self._process = subprocess.Popen(
308 ["python", "-c", python_code],
309 stdin = subprocess.PIPE,
310 stdout = subprocess.PIPE,
311 stderr = subprocess.PIPE
314 # Wait for the forwarder to be ready, otherwise nobody
315 # will be able to connect to it
316 helo = self._process.stderr.readline()
317 if helo != 'READY.\n':
318 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
319 helo + self._process.stderr.read())
321 def send_msg(self, msg):
322 encoded = base64.b64encode(msg)
323 data = "%s\n" % encoded
326 self._process.stdin.write(data)
327 except (IOError, ValueError):
328 # dead process, poll it to un-zombify
331 # try again after reconnect
332 # If it fails again, though, give up
334 self._process.stdin.write(data)
337 self.send_msg(STOP_MSG)
340 def read_reply(self):
341 data = self._process.stdout.readline()
342 encoded = data.rstrip()
343 return base64.b64decode(encoded)
345 def _make_server_key_args(server_key, host, port, args):
347 Returns a reference to the created temporary file, and adds the
348 corresponding arguments to the given argument list.
350 Make sure to hold onto it until the process is done with the file
353 host = '%s:%s' % (host,port)
354 # Create a temporary server key file
355 tmp_known_hosts = tempfile.NamedTemporaryFile()
356 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
357 tmp_known_hosts.flush()
358 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
359 return tmp_known_hosts
361 def popen_ssh_command(command, host, port, user, agent,
367 Executes a remote command, returns ((stdout,stderr),process)
370 print "ssh", host, command
372 tmp_known_hosts = None
374 # Don't bother with localhost. Makes test easier
375 '-o', 'NoHostAuthenticationForLocalhost=yes',
380 args.append('-p%d' % port)
382 args.extend(('-i', ident_key))
386 # Create a temporary server key file
387 tmp_known_hosts = _make_server_key_args(
388 server_key, host, port, args)
391 # connects to the remote host and starts a remote connection
392 proc = subprocess.Popen(args,
393 stdout = subprocess.PIPE,
394 stdin = subprocess.PIPE,
395 stderr = subprocess.PIPE)
397 # attach tempfile object to the process, to make sure the file stays
398 # alive until the process is finished with it
399 proc._known_hosts = tmp_known_hosts
401 return (proc.communicate(stdin), proc)
403 def popen_scp(source, dest,
410 Copies from/to remote sites.
412 Source and destination should have the user and host encoded
415 If source is a file object, a special mode will be used to
416 create the remote file with the same contents.
418 If dest is a file object, the remote file (source) will be
419 read and written into dest.
421 In these modes, recursive cannot be True.
423 Source can be a list of files to copy to a single destination,
424 in which case it is advised that the destination be a folder.
428 print "scp", source, dest
430 if isinstance(source, file) or isinstance(dest, file) \
431 or hasattr(source, 'read') or hasattr(dest, 'write'):
434 # Parse source/destination as <user>@<server>:<path>
435 if isinstance(dest, basestring) and ':' in dest:
436 remspec, path = dest.split(':',1)
437 elif isinstance(source, basestring) and ':' in source:
438 remspec, path = source.split(':',1)
440 raise ValueError, "Both endpoints cannot be local"
441 user,host = remspec.rsplit('@',1)
442 tmp_known_hosts = None
444 args = ['ssh', '-l', user, '-C',
445 # Don't bother with localhost. Makes test easier
446 '-o', 'NoHostAuthenticationForLocalhost=yes',
449 args.append('-P%d' % port)
451 args.extend(('-i', ident_key))
453 # Create a temporary server key file
454 tmp_known_hosts = _make_server_key_args(
455 server_key, host, port, args)
457 if isinstance(source, file) or hasattr(source, 'read'):
458 args.append('cat > %s' % (shell_escape(path),))
459 elif isinstance(dest, file) or hasattr(dest, 'write'):
460 args.append('cat %s' % (shell_escape(path),))
462 raise AssertionError, "Unreachable code reached! :-Q"
464 # connects to the remote host and starts a remote connection
465 if isinstance(source, file):
466 proc = subprocess.Popen(args,
467 stdout = open('/dev/null','w'),
468 stderr = subprocess.PIPE,
470 err = proc.stderr.read()
471 proc._known_hosts = tmp_known_hosts
473 return ((None,err), proc)
474 elif isinstance(dest, file):
475 proc = subprocess.Popen(args,
476 stdout = open('/dev/null','w'),
477 stderr = subprocess.PIPE,
479 err = proc.stderr.read()
480 proc._known_hosts = tmp_known_hosts
482 return ((None,err), proc)
483 elif hasattr(source, 'read'):
484 # file-like (but not file) source
485 proc = subprocess.Popen(args,
486 stdout = open('/dev/null','w'),
487 stderr = subprocess.PIPE,
488 stdin = subprocess.PIPE)
494 buf = source.read(4096)
499 rdrdy, wrdy, broken = select.select(
502 [proc.stderr,proc.stdin])
504 if proc.stderr in rdrdy:
505 # use os.read for fully unbuffered behavior
506 err.append(os.read(proc.stderr.fileno(), 4096))
508 if proc.stdin in wrdy:
509 proc.stdin.write(buf)
515 err.append(proc.stderr.read())
517 proc._known_hosts = tmp_known_hosts
519 return ((None,''.join(err)), proc)
520 elif hasattr(dest, 'write'):
521 # file-like (but not file) dest
522 proc = subprocess.Popen(args,
523 stdout = subprocess.PIPE,
524 stderr = subprocess.PIPE,
525 stdin = open('/dev/null','w'))
530 rdrdy, wrdy, broken = select.select(
531 [proc.stderr, proc.stdout],
533 [proc.stderr, proc.stdout])
535 if proc.stderr in rdrdy:
536 # use os.read for fully unbuffered behavior
537 err.append(os.read(proc.stderr.fileno(), 4096))
539 if proc.stdout in rdrdy:
540 # use os.read for fully unbuffered behavior
541 buf = os.read(proc.stdout.fileno(), 4096)
550 err.append(proc.stderr.read())
552 proc._known_hosts = tmp_known_hosts
554 return ((None,''.join(err)), proc)
556 raise AssertionError, "Unreachable code reached! :-Q"
558 # Parse destination as <user>@<server>:<path>
559 if isinstance(dest, basestring) and ':' in dest:
560 remspec, path = dest.split(':',1)
561 elif isinstance(source, basestring) and ':' in source:
562 remspec, path = source.split(':',1)
564 raise ValueError, "Both endpoints cannot be local"
565 user,host = remspec.rsplit('@',1)
568 tmp_known_hosts = None
569 args = ['scp', '-q', '-p', '-C',
570 # Don't bother with localhost. Makes test easier
571 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
573 args.append('-P%d' % port)
577 args.extend(('-i', ident_key))
579 # Create a temporary server key file
580 tmp_known_hosts = _make_server_key_args(
581 server_key, host, port, args)
582 if isinstance(source,list):
588 # connects to the remote host and starts a remote connection
589 proc = subprocess.Popen(args,
590 stdout = subprocess.PIPE,
591 stdin = subprocess.PIPE,
592 stderr = subprocess.PIPE)
593 proc._known_hosts = tmp_known_hosts
595 comm = proc.communicate()
599 def popen_ssh_subprocess(python_code, host, port, user, agent,
605 python_path.replace("'", r"'\''")
606 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
609 # Uncomment for debug (to run everything under strace)
610 # We had to verify if strace works (cannot nest them)
611 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
613 #if self.mode == MODE_SSH:
614 # cmd += "strace -f -tt -s 200 -o strace$$.out "
616 cmd += "import base64, os\n"
617 cmd += "cmd = \"\"\n"
618 cmd += "while True:\n"
619 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
620 cmd += " if cmd[-1] == \"\\n\": break\n"
621 cmd += "cmd = base64.b64decode(cmd)\n"
622 # Uncomment for debug
623 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
624 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
625 cmd += "exec(cmd)\n'"
627 tmp_known_hosts = None
629 # Don't bother with localhost. Makes test easier
630 '-o', 'NoHostAuthenticationForLocalhost=yes',
635 args.append('-p%d' % port)
637 args.extend(('-i', ident_key))
641 # Create a temporary server key file
642 tmp_known_hosts = _make_server_key_args(
643 server_key, host, port, args)
646 # connects to the remote host and starts a remote rpyc connection
647 proc = subprocess.Popen(args,
648 stdout = subprocess.PIPE,
649 stdin = subprocess.PIPE,
650 stderr = subprocess.PIPE)
651 proc._known_hosts = tmp_known_hosts
653 # send the command to execute
654 os.write(proc.stdin.fileno(),
655 base64.b64encode(python_code) + "\n")
656 msg = os.read(proc.stdout.fileno(), 3)
658 raise RuntimeError("Failed to start remote python interpreter")