2 # -*- coding: utf-8 -*-
20 CTRL_SOCK = "ctrl.sock"
21 STD_ERR = "stderr.log"
28 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
30 if hasattr(os, "devnull"):
33 DEV_NULL = "/dev/null"
37 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
40 """ Escapes strings so that they are safe to use as command-line arguments """
41 if SHELL_SAFE.match(s):
42 # safe string - no escaping needed
45 # unsafe string - escape
47 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
50 return "'$'\\x%02x''" % (ord(c),)
51 s = ''.join(map(escp,s))
55 def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
56 self._root_dir = root_dir
58 self._ctrl_sock = None
59 self._log_level = log_level
68 # cannot return normally after fork because no exec was done.
69 # This means that if we don't do a os._exit(0) here the code that
70 # follows the call to "Server.run()" in the "caller code" will be
71 # executed... but by now it has already been executed after the
72 # first process (the one that did the first fork) returned.
80 # pipes for process synchronization
84 root = os.path.normpath(self._root_dir)
85 if not os.path.exists(root):
86 os.makedirs(root, 0755)
93 # os.waitpid avoids leaving a <defunct> (zombie) process
94 st = os.waitpid(pid1, 0)[1]
96 raise RuntimeError("Daemonization failed")
97 # return 0 to inform the caller method that this is not the
102 # Decouple from parent environment.
103 os.chdir(self._root_dir)
110 # see ref: "os._exit(0)"
113 # close all open file descriptors.
114 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
115 if (max_fd == resource.RLIM_INFINITY):
117 for fd in range(3, max_fd):
124 # Redirect standard file descriptors.
125 stdin = open(DEV_NULL, "r")
126 stderr = stdout = open(STD_ERR, "a", 0)
127 os.dup2(stdin.fileno(), sys.stdin.fileno())
128 # NOTE: sys.stdout.write will still be buffered, even if the file
129 # was opened with 0 buffer
130 os.dup2(stdout.fileno(), sys.stdout.fileno())
131 os.dup2(stderr.fileno(), sys.stderr.fileno())
133 # create control socket
134 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
135 self._ctrl_sock.bind(CTRL_SOCK)
136 self._ctrl_sock.listen(0)
138 # let the parent process know that the daemonization is finished
143 def post_daemonize(self):
147 while not self._stop:
148 conn, addr = self._ctrl_sock.accept()
150 while not self._stop:
152 msg = self.recv_msg(conn)
153 except socket.timeout, e:
158 reply = self.stop_action()
160 reply = self.reply_action(msg)
163 self.send_reply(conn, reply)
166 self.log_error("NOTICE: Awaiting for reconnection")
174 def recv_msg(self, conn):
178 chunk = conn.recv(1024)
180 if e.errno != errno.EINTR:
186 if chunk[-1] == "\n":
191 decoded = base64.b64decode(data)
192 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send a reply to the peer as one base64-encoded line.

    The payload is base64-encoded and terminated with a newline, which is
    the end-of-message marker the receiving side looks for.

    :param conn: connected socket-like object providing ``sendall``.
    :param reply: payload to encode and transmit.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole line has been handed to the
    # socket; a plain send() may transmit only a prefix, which would leave
    # the peer blocked waiting for the terminating "\n".
    conn.sendall("%s\n" % encoded)
200 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text sent back when a stop request is handled."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Build the default reply for a received message.

    :param msg: the decoded message received from the peer.
    :returns: the message text prefixed with ``"Reply to: "``.
    """
    template = "Reply to: %s"
    return template % msg
211 def log_error(self, text = None, context = ''):
213 text = traceback.format_exc()
214 date = time.strftime("%Y-%m-%d %H:%M:%S")
216 context = " (%s)" % (context,)
217 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry to stderr.

    Nothing is written unless the instance's log level equals DEBUG_LEVEL.

    :param text: message body, written on the line after the timestamp.
    """
    if self._log_level != DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (stamp, text))
225 class Forwarder(object):
226 def __init__(self, root_dir = "."):
227 self._ctrl_sock = None
228 self._root_dir = root_dir
233 print >>sys.stderr, "READY."
234 while not self._stop:
235 data = self.read_data()
236 self.send_to_server(data)
237 data = self.recv_from_server()
238 self.write_data(data)
242 return sys.stdin.readline()
244 def write_data(self, data):
245 sys.stdout.write(data)
246 # sys.stdout.write is buffered, this is why we need to do a flush()
249 def send_to_server(self, data):
251 self._ctrl_sock.send(data)
253 if e.errno == errno.EPIPE:
255 self._ctrl_sock.send(data)
258 encoded = data.rstrip()
259 msg = base64.b64decode(encoded)
263 def recv_from_server(self):
267 chunk = self._ctrl_sock.recv(1024)
269 if e.errno != errno.EINTR:
274 if chunk[-1] == "\n":
280 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
281 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
282 self._ctrl_sock.connect(sock_addr)
284 def disconnect(self):
286 self._ctrl_sock.close()
290 class Client(object):
291 def __init__(self, root_dir = ".", host = None, port = None, user = None,
292 agent = None, environment_setup = ""):
293 self.root_dir = root_dir
294 self.addr = (host, port)
297 self.environment_setup = environment_setup
298 self._stopped = False
302 if self._process.poll() is None:
303 os.kill(self._process.pid, signal.SIGTERM)
307 root_dir = self.root_dir
308 (host, port) = self.addr
312 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
313 c.forward()" % (root_dir,)
315 self._process = popen_ssh_subprocess(python_code, host, port,
317 environment_setup = self.environment_setup)
318 # popen_ssh_subprocess already waits for readiness
319 if self._process.poll():
320 err = proc.stderr.read()
321 raise RuntimeError("Client could not be reached: %s" % \
324 self._process = subprocess.Popen(
325 ["python", "-c", python_code],
326 stdin = subprocess.PIPE,
327 stdout = subprocess.PIPE,
328 stderr = subprocess.PIPE
331 # Wait for the forwarder to be ready, otherwise nobody
332 # will be able to connect to it
333 helo = self._process.stderr.readline()
334 if helo != 'READY.\n':
335 raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
336 helo + self._process.stderr.read())
338 def send_msg(self, msg):
339 encoded = base64.b64encode(msg)
340 data = "%s\n" % encoded
343 self._process.stdin.write(data)
344 except (IOError, ValueError):
345 # dead process, poll it to un-zombify
348 # try again after reconnect
349 # If it fails again, though, give up
351 self._process.stdin.write(data)
354 self.send_msg(STOP_MSG)
357 def read_reply(self):
358 data = self._process.stdout.readline()
359 encoded = data.rstrip()
360 return base64.b64decode(encoded)
362 def _make_server_key_args(server_key, host, port, args):
364 Returns a reference to the created temporary file, and adds the
365 corresponding arguments to the given argument list.
367 Make sure to hold onto it until the process is done with the file
370 host = '%s:%s' % (host,port)
371 # Create a temporary server key file
372 tmp_known_hosts = tempfile.NamedTemporaryFile()
374 # Add the intended host key
375 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
377 # If we're not in strict mode, add user-configured keys
378 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
379 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
380 if os.access(user_hosts_path, os.R_OK):
381 f = open(user_hosts_path, "r")
382 tmp_known_hosts.write(f.read())
385 tmp_known_hosts.flush()
387 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
388 return tmp_known_hosts
390 def popen_ssh_command(command, host, port, user, agent,
396 Executes a remote command, returns ((stdout,stderr),process)
399 print "ssh", host, command
401 tmp_known_hosts = None
403 # Don't bother with localhost. Makes test easier
404 '-o', 'NoHostAuthenticationForLocalhost=yes',
409 args.append('-p%d' % port)
411 args.extend(('-i', ident_key))
415 # Create a temporary server key file
416 tmp_known_hosts = _make_server_key_args(
417 server_key, host, port, args)
420 # connects to the remote host and starts a remote connection
421 proc = subprocess.Popen(args,
422 stdout = subprocess.PIPE,
423 stdin = subprocess.PIPE,
424 stderr = subprocess.PIPE)
426 # attach tempfile object to the process, to make sure the file stays
427 # alive until the process is finished with it
428 proc._known_hosts = tmp_known_hosts
430 out, err = proc.communicate(stdin)
432 print " -> ", out, err
434 return ((out, err), proc)
436 def popen_scp(source, dest,
443 Copies from/to remote sites.
445 Source and destination should have the user and host encoded
448 If source is a file object, a special mode will be used to
449 create the remote file with the same contents.
451 If dest is a file object, the remote file (source) will be
452 read and written into dest.
454 In these modes, recursive cannot be True.
456 Source can be a list of files to copy to a single destination,
457 in which case it is advised that the destination be a folder.
461 print "scp", source, dest
463 if isinstance(source, file) or isinstance(dest, file) \
464 or hasattr(source, 'read') or hasattr(dest, 'write'):
467 # Parse source/destination as <user>@<server>:<path>
468 if isinstance(dest, basestring) and ':' in dest:
469 remspec, path = dest.split(':',1)
470 elif isinstance(source, basestring) and ':' in source:
471 remspec, path = source.split(':',1)
473 raise ValueError, "Both endpoints cannot be local"
474 user,host = remspec.rsplit('@',1)
475 tmp_known_hosts = None
477 args = ['ssh', '-l', user, '-C',
478 # Don't bother with localhost. Makes test easier
479 '-o', 'NoHostAuthenticationForLocalhost=yes',
482 args.append('-P%d' % port)
484 args.extend(('-i', ident_key))
486 # Create a temporary server key file
487 tmp_known_hosts = _make_server_key_args(
488 server_key, host, port, args)
490 if isinstance(source, file) or hasattr(source, 'read'):
491 args.append('cat > %s' % (shell_escape(path),))
492 elif isinstance(dest, file) or hasattr(dest, 'write'):
493 args.append('cat %s' % (shell_escape(path),))
495 raise AssertionError, "Unreachable code reached! :-Q"
497 # connects to the remote host and starts a remote connection
498 if isinstance(source, file):
499 proc = subprocess.Popen(args,
500 stdout = open('/dev/null','w'),
501 stderr = subprocess.PIPE,
503 err = proc.stderr.read()
504 proc._known_hosts = tmp_known_hosts
506 return ((None,err), proc)
507 elif isinstance(dest, file):
508 proc = subprocess.Popen(args,
509 stdout = open('/dev/null','w'),
510 stderr = subprocess.PIPE,
512 err = proc.stderr.read()
513 proc._known_hosts = tmp_known_hosts
515 return ((None,err), proc)
516 elif hasattr(source, 'read'):
517 # file-like (but not file) source
518 proc = subprocess.Popen(args,
519 stdout = open('/dev/null','w'),
520 stderr = subprocess.PIPE,
521 stdin = subprocess.PIPE)
527 buf = source.read(4096)
532 rdrdy, wrdy, broken = select.select(
535 [proc.stderr,proc.stdin])
537 if proc.stderr in rdrdy:
538 # use os.read for fully unbuffered behavior
539 err.append(os.read(proc.stderr.fileno(), 4096))
541 if proc.stdin in wrdy:
542 proc.stdin.write(buf)
548 err.append(proc.stderr.read())
550 proc._known_hosts = tmp_known_hosts
552 return ((None,''.join(err)), proc)
553 elif hasattr(dest, 'write'):
554 # file-like (but not file) dest
555 proc = subprocess.Popen(args,
556 stdout = subprocess.PIPE,
557 stderr = subprocess.PIPE,
558 stdin = open('/dev/null','w'))
563 rdrdy, wrdy, broken = select.select(
564 [proc.stderr, proc.stdout],
566 [proc.stderr, proc.stdout])
568 if proc.stderr in rdrdy:
569 # use os.read for fully unbuffered behavior
570 err.append(os.read(proc.stderr.fileno(), 4096))
572 if proc.stdout in rdrdy:
573 # use os.read for fully unbuffered behavior
574 buf = os.read(proc.stdout.fileno(), 4096)
583 err.append(proc.stderr.read())
585 proc._known_hosts = tmp_known_hosts
587 return ((None,''.join(err)), proc)
589 raise AssertionError, "Unreachable code reached! :-Q"
591 # Parse destination as <user>@<server>:<path>
592 if isinstance(dest, basestring) and ':' in dest:
593 remspec, path = dest.split(':',1)
594 elif isinstance(source, basestring) and ':' in source:
595 remspec, path = source.split(':',1)
597 raise ValueError, "Both endpoints cannot be local"
598 user,host = remspec.rsplit('@',1)
601 tmp_known_hosts = None
602 args = ['scp', '-q', '-p', '-C',
603 # Don't bother with localhost. Makes test easier
604 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
606 args.append('-P%d' % port)
610 args.extend(('-i', ident_key))
612 # Create a temporary server key file
613 tmp_known_hosts = _make_server_key_args(
614 server_key, host, port, args)
615 if isinstance(source,list):
621 # connects to the remote host and starts a remote connection
622 proc = subprocess.Popen(args,
623 stdout = subprocess.PIPE,
624 stdin = subprocess.PIPE,
625 stderr = subprocess.PIPE)
626 proc._known_hosts = tmp_known_hosts
628 comm = proc.communicate()
632 def popen_ssh_subprocess(python_code, host, port, user, agent,
637 environment_setup = "",
638 waitcommand = False):
641 python_path.replace("'", r"'\''")
642 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
644 if environment_setup:
645 cmd += environment_setup
647 # Uncomment for debug (to run everything under strace)
648 # We had to verify if strace works (cannot nest them)
649 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
651 #if self.mode == MODE_SSH:
652 # cmd += "strace -f -tt -s 200 -o strace$$.out "
654 cmd += "import base64, os\n"
655 cmd += "cmd = \"\"\n"
656 cmd += "while True:\n"
657 cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
658 cmd += " if cmd[-1] == \"\\n\": break\n"
659 cmd += "cmd = base64.b64decode(cmd)\n"
660 # Uncomment for debug
661 #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
663 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
666 cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
669 tmp_known_hosts = None
671 # Don't bother with localhost. Makes test easier
672 '-o', 'NoHostAuthenticationForLocalhost=yes',
677 args.append('-p%d' % port)
679 args.extend(('-i', ident_key))
683 # Create a temporary server key file
684 tmp_known_hosts = _make_server_key_args(
685 server_key, host, port, args)
688 # connects to the remote host and starts a remote rpyc connection
689 proc = subprocess.Popen(args,
690 stdout = subprocess.PIPE,
691 stdin = subprocess.PIPE,
692 stderr = subprocess.PIPE)
693 proc._known_hosts = tmp_known_hosts
695 # send the command to execute
696 os.write(proc.stdin.fileno(),
697 base64.b64encode(python_code) + "\n")
698 msg = os.read(proc.stdout.fileno(), 3)
700 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
701 msg, proc.stdout.read(), proc.stderr.read())