From eb2e41c982c902039b1985d446756a4c3992f130 Mon Sep 17 00:00:00 2001 From: Stephen Soltesz Date: Wed, 17 Jun 2009 20:38:04 +0000 Subject: [PATCH] copy Rpyc from 1.0 branch into trunk --- Rpyc/AsyncNetProxy.py | 82 ++++++++++ Rpyc/Authentication.py | 42 +++++ Rpyc/Boxing.py | 124 +++++++++++++++ Rpyc/Channel.py | 48 ++++++ Rpyc/Connection.py | 215 +++++++++++++++++++++++++ Rpyc/Demo/__init__.py | 0 Rpyc/Demo/demo-1.py | 156 ++++++++++++++++++ Rpyc/Demo/demo-2.py | 81 ++++++++++ Rpyc/Demo/demo-3.py | 130 +++++++++++++++ Rpyc/Demo/demo-4.py | 41 +++++ Rpyc/Demo/demo-5.py | 66 ++++++++ Rpyc/Demo/demo-6.py | 130 +++++++++++++++ Rpyc/Demo/pipe-child.py | 8 + Rpyc/Demo/pipe-parent.py | 17 ++ Rpyc/Demo/testmodule.py | 19 +++ Rpyc/Demo/testsuite.bat | 6 + Rpyc/Factories.py | 57 +++++++ Rpyc/Lib.py | 53 +++++++ Rpyc/ModuleNetProxy.py | 46 ++++++ Rpyc/NetProxy.py | 119 ++++++++++++++ Rpyc/Servers/ServerUtils.py | 90 +++++++++++ Rpyc/Servers/__init__.py | 0 Rpyc/Servers/auth_server.py | 19 +++ Rpyc/Servers/forking_server.py | 26 +++ Rpyc/Servers/selecting_server.py | 29 ++++ Rpyc/Servers/simple_server.py | 13 ++ Rpyc/Servers/std_server.py | 31 ++++ Rpyc/Servers/threaded_server.py | 9 ++ Rpyc/Stream.py | 117 ++++++++++++++ Rpyc/Utils.py | 265 +++++++++++++++++++++++++++++++ Rpyc/__init__.py | 10 ++ Rpyc/changelog.txt | 129 +++++++++++++++ 32 files changed, 2178 insertions(+) create mode 100644 Rpyc/AsyncNetProxy.py create mode 100644 Rpyc/Authentication.py create mode 100644 Rpyc/Boxing.py create mode 100644 Rpyc/Channel.py create mode 100644 Rpyc/Connection.py create mode 100644 Rpyc/Demo/__init__.py create mode 100644 Rpyc/Demo/demo-1.py create mode 100644 Rpyc/Demo/demo-2.py create mode 100644 Rpyc/Demo/demo-3.py create mode 100644 Rpyc/Demo/demo-4.py create mode 100644 Rpyc/Demo/demo-5.py create mode 100644 Rpyc/Demo/demo-6.py create mode 100644 Rpyc/Demo/pipe-child.py create mode 100644 Rpyc/Demo/pipe-parent.py create mode 100644 Rpyc/Demo/testmodule.py create mode 
100644 Rpyc/Demo/testsuite.bat create mode 100644 Rpyc/Factories.py create mode 100644 Rpyc/Lib.py create mode 100644 Rpyc/ModuleNetProxy.py create mode 100644 Rpyc/NetProxy.py create mode 100644 Rpyc/Servers/ServerUtils.py create mode 100644 Rpyc/Servers/__init__.py create mode 100644 Rpyc/Servers/auth_server.py create mode 100644 Rpyc/Servers/forking_server.py create mode 100644 Rpyc/Servers/selecting_server.py create mode 100644 Rpyc/Servers/simple_server.py create mode 100644 Rpyc/Servers/std_server.py create mode 100644 Rpyc/Servers/threaded_server.py create mode 100644 Rpyc/Stream.py create mode 100644 Rpyc/Utils.py create mode 100644 Rpyc/__init__.py create mode 100644 Rpyc/changelog.txt diff --git a/Rpyc/AsyncNetProxy.py b/Rpyc/AsyncNetProxy.py new file mode 100644 index 0000000..0c1fc05 --- /dev/null +++ b/Rpyc/AsyncNetProxy.py @@ -0,0 +1,82 @@ +from NetProxy import NetProxyWrapper +from Lib import raise_exception + + +class InvalidAsyncResultState(Exception): + pass + + +class AsyncNetProxy(NetProxyWrapper): + """wraps an exiting synchronous netproxy to make is asynchronous + (remote operations return AsyncResult objects)""" + __doc__ = NetProxyWrapper.__doc__ + + def __request__(self, handler, *args): + res = AsyncResult(self.__dict__["____conn"]) + self.__dict__["____conn"].async_request(res.callback, handler, self.__dict__["____oid"], *args) + return res + + # must return a string... 
and it's not meaningful to return the repr of an async result + def __repr__(self, *args): + return self.__request__("handle_repr", *args).result + def __str__(self, *args): + return self.__request__("handle_str", *args).result + + +class AsyncResult(object): + """represents the result of an asynchronous operation""" + STATE_PENDING = "pending" + STATE_READY = "ready" + STATE_EXCEPTION = "exception" + STATE_UNKNOWN = "unknown" + + def __init__(self, conn): + self.conn = conn + self._state = self.STATE_PENDING + self._result = None + self._on_ready = None + + def __repr__(self): + return "" % (self._state, id(self)) + + def callback(self, event, obj): + if event == "result": + self._state = self.STATE_READY + self._result = obj + elif event == "exception": + self._state = self.STATE_EXCEPTION + self._result = obj + else: + self._state = self.STATE_UNKNOWN + self._result = obj + + if self._on_ready is not None: + self._on_ready(self) + + def _get_on_ready(self): + return self._ready_callback + + def _set_on_ready(self, obj): + self._on_ready = obj + if self._state != self.STATE_PENDING: + self._on_ready(self) + + def _get_is_ready(self): + if self._state == self.STATE_PENDING: + self.conn.poll() + return self._state != self.STATE_PENDING + + def _get_result(self): + while self._state == self.STATE_PENDING: + self.conn.serve() + if self._state == self.STATE_READY: + return self._result + elif self._state == self.STATE_EXCEPTION: + raise_exception(*self._result) + else: + raise InvalidAsyncResultState(self._state) + + is_ready = property(_get_is_ready, doc = "indicates whether or not the result is ready") + result = property(_get_result, doc = "the value of the async result (may block)") + on_ready = property(_get_on_ready, _set_on_ready, doc = + "if not None, specifies a callback which is called when the result is ready") diff --git a/Rpyc/Authentication.py b/Rpyc/Authentication.py new file mode 100644 index 0000000..afa1172 --- /dev/null +++ b/Rpyc/Authentication.py 
@@ -0,0 +1,42 @@ +""" +Challenge-Response authentication algorithm over channels: the client sends the +username, recvs a challenge, generates a response based on the challenge and +password, sends it back to the server, which also calculates the expected response. +the server returns "WRONG" or "OKAY". this way the password is never sent over +the wire. + +* servers should call the accept function over a channel, with a dict of +(username, password) pairs. +* clients should call login over a channel, with the username and password +""" +import md5 +import random + +def get_bytes(num_bytes): + ret = [] + for n in xrange(num_bytes): + ret.append( chr(random.randint(0,255)) ) + return ''.join(ret) + +def accept(chan, users): + username = chan.recv() + challenge = get_bytes(16) + chan.send(challenge) + response = chan.recv() + if username not in users: + chan.send("WRONG") + return False + expected_response = md5.new(users[username] + challenge).digest() + if response != expected_response: + chan.send("WRONG") + return False + chan.send("OKAY") + return True + +def login(chan, username, password): + chan.send(username) + challenge = chan.recv() + response = md5.new(password + challenge).digest() + chan.send(response) + return chan.recv() == "OKAY" + diff --git a/Rpyc/Boxing.py b/Rpyc/Boxing.py new file mode 100644 index 0000000..93e89ba --- /dev/null +++ b/Rpyc/Boxing.py @@ -0,0 +1,124 @@ +import sys +import traceback +import cPickle as pickle +from weakref import WeakValueDictionary +from Lib import ImmDict +from NetProxy import NetProxy, SyncNetProxy + + +class BoxingError(Exception): + pass +class NestedException(Exception): + pass + +PICKLE_PROTOCOL = pickle.HIGHEST_PROTOCOL +TYPE_SIMPLE = 0 +TYPE_PROXY = 1 +TYPE_TUPLE = 2 +TYPE_SLICE = 3 +TYPE_LOCAL_PROXY = 4 +TYPE_IMMDICT = 5 +simple_types = ( + bool, + int, + long, + float, + complex, + basestring, + type(None), +) + +def dump_exception(typ, val, tb): + """dumps the given exception using pickle (since not all 
exceptions are picklable)""" + tbtext = "".join(traceback.format_exception(typ, val, tb)) + sys.last_type, sys.last_value, sys.last_traceback = typ, val, tb + try: + pickled_exc = pickle.dumps((typ, val, tbtext), PICKLE_PROTOCOL) + except pickle.PicklingError, ex: + newval = NestedException("pickling error %s\nexception type: %r\nexception object: %s" % (ex, typ, val)) + pickled_exc = pickle.dumps((NestedException, newval, tbtext), PICKLE_PROTOCOL) + return pickled_exc + +def load_exception(package): + """returns an exception object""" + try: + return pickle.loads(package) + except pickle.PicklingError, ex: + return NestedException("failed to unpickle remote exception -- %r" % (ex,)) + +class Box(object): + """a box is where local objects are stored, and remote proxies are created""" + def __init__(self, conn): + self.conn = conn + self.objects = {} + self.proxy_cache = WeakValueDictionary() + + def close(self): + del self.conn + del self.objects + del self.proxy_cache + + def __getitem__(self, oid): + return self.objects[oid][1] + + def _box(self, obj): + if isinstance(obj, simple_types): + return TYPE_SIMPLE, obj + elif isinstance(obj, slice): + return TYPE_SLICE, (obj.start, obj.stop, obj.step) + elif isinstance(obj, NetProxy) and obj.__dict__["____conn"] is self.conn: + return TYPE_LOCAL_PROXY, obj.__dict__["____oid"] + elif isinstance(obj, tuple): + if obj: + return TYPE_TUPLE, [self._box(subobj) for subobj in obj] + else: + return TYPE_SIMPLE, () + elif isinstance(obj, ImmDict): + if not obj.dict: + return TYPE_SIMPLE, {} + else: + return TYPE_IMMDICT, [(self._box(k), self._box(v)) for k, v in obj.items()] + else: + oid = id(obj) + if oid not in self.objects: + self.objects[oid] = [0, obj] + return TYPE_PROXY, oid + + def _unbox(self, (type, value)): + if type == TYPE_SIMPLE: + return value + elif type == TYPE_TUPLE: + return tuple([self._unbox(subobj) for subobj in value]) + elif type == TYPE_SLICE: + return slice(*value) + elif type == TYPE_LOCAL_PROXY: + 
return self[value] + elif type == TYPE_IMMDICT: + return dict([(self._unbox(k), self._unbox(v)) for k, v in value]) + elif type == TYPE_PROXY: + if value in self.proxy_cache: + proxy = self.proxy_cache[value] + else: + proxy = SyncNetProxy(self.conn, value) + self.proxy_cache[value] = proxy + return proxy + else: + raise BoxingError("invalid boxed object type", type, value) + + def incref(self, oid): + self.objects[oid][0] += 1 + + def decref(self, oid): + self.objects[oid][0] -= 1 + if self.objects[oid][0] <= 0: + del self.objects[oid] + + def pack(self, obj): + """packs an object (returns a package)""" + return pickle.dumps(self._box(obj), PICKLE_PROTOCOL) + + def unpack(self, package): + """unpacks a package (returns an object)""" + return self._unbox(pickle.loads(package)) + + diff --git a/Rpyc/Channel.py b/Rpyc/Channel.py new file mode 100644 index 0000000..106f936 --- /dev/null +++ b/Rpyc/Channel.py @@ -0,0 +1,48 @@ +from threading import RLock +import struct + + +class Channel(object): + """a channel transfers packages over a stream. a package is any blob of data, + up to 4GB in size. channels are gauranteed to be thread-safe""" + HEADER_FORMAT = ">L" # byte order must be the same at both sides! 
+ HEADER_SIZE = struct.calcsize(HEADER_FORMAT) + + def __init__(self, stream): + self.lock = RLock() + self.stream = stream + + def __repr__(self): + return "<%s(%r)>" % (self.__class__.__name__, self.stream) + + def send(self, data): + """sends a package""" + try: + self.lock.acquire() + header = struct.pack(self.HEADER_FORMAT, len(data)) + self.stream.write(header + data) + finally: + self.lock.release() + + def recv(self): + """receives a package (blocking)""" + try: + self.lock.acquire() + length, = struct.unpack(self.HEADER_FORMAT, self.stream.read(self.HEADER_SIZE)) + return self.stream.read(length) + finally: + self.lock.release() + + def close(self): + return self.stream.close() + + def fileno(self): + return self.stream.fileno() + + def is_available(self): + return self.stream.is_available() + + def wait(self): + return self.stream.wait() + + diff --git a/Rpyc/Connection.py b/Rpyc/Connection.py new file mode 100644 index 0000000..81bed23 --- /dev/null +++ b/Rpyc/Connection.py @@ -0,0 +1,215 @@ +import sys +from Boxing import Box, dump_exception, load_exception +from ModuleNetProxy import RootImporter +from Lib import raise_exception, AttrFrontend + + +class Connection(object): + """ + the rpyc connection layer (protocol and APIs). generally speaking, the only + things you'll need to access directly from this object are: + * modules - represents the remote python interprerer's modules namespace + * execute - executes the given code on the other side of the connection + * namespace - the namespace in which the code you `execute` resides + + the rest of the attributes should be of no intresent to you, except maybe for + `remote_conn`, which represents the other side of the connection. it is unlikely, + however, you'll need to use it (it is used interally). + + when you are done using a connection, and wish to release the resources it + uses, you should call close(). 
you don't have to, but if you don't, the gc + can't release the memory because of cyclic references. + """ + + def __init__(self, channel): + self._closed = False + self._local_namespace = {} + self.channel = channel + self.box = Box(self) + self.async_replies = {} + self.sync_replies = {} + self.request_seq = 0 + self.module_cache = {} + # user APIs: + self.modules = RootImporter(self) + self.remote_conn = self.sync_request("handle_getconn") + self.namespace = AttrFrontend(self.remote_conn._local_namespace) + + def __repr__(self): + if self._closed: + return "<%s - closed>" % (self.__class__.__name__,) + else: + return "<%s(%r)>" % (self.__class__.__name__, self.channel) + + # + # file api layer + # + def close(self): + if self._closed: + return + self._closed = True + self.box.close() + self.channel.close() + # untangle circular references + del self._local_namespace + del self.channel + del self.box + del self.async_replies + del self.sync_replies + del self.request_seq + del self.module_cache + del self.modules + del self.remote_conn + del self.namespace + + def fileno(self): + return self.channel.fileno() + + # + # protocol layer + # + def _recv(self): + return self.box.unpack(self.channel.recv()) + + def _send(self, *args): + return self.channel.send(self.box.pack(args)) + + def send_request(self, handlername, *args): + try: + self.channel.lock.acquire() + # this must be atomic { + self.request_seq += 1 + self._send(handlername, self.request_seq, args) + return self.request_seq + # } + finally: + self.channel.lock.release() + + def send_exception(self, seq, exc_info): + self._send("exception", seq, dump_exception(*exc_info)) + + def send_result(self, seq, obj): + self._send("result", seq, obj) + + def dispatch_result(self, seq, obj): + if seq in self.async_replies: + self.async_replies.pop(seq)("result", obj) + else: + self.sync_replies[seq] = obj + + def dispatch_exception(self, seq, obj): + excobj = load_exception(obj) + if seq in self.async_replies: + 
self.async_replies.pop(seq)("exception", excobj) + else: + raise_exception(*excobj) + + def dispatch_request(self, handlername, seq, args): + try: + res = getattr(self, handlername)(*args) + except SystemExit: + raise + except: + self.send_exception(seq, sys.exc_info()) + else: + self.send_result(seq, res) + + def sync_request(self, *args): + """performs a synchronous (blocking) request""" + seq = self.send_request(*args) + while seq not in self.sync_replies: + self.serve() + return self.sync_replies.pop(seq) + + def async_request(self, callback, *args): + """performs an asynchronous (non-blocking) request""" + seq = self.send_request(*args) + self.async_replies[seq] = callback + + # + # servers api + # + def poll(self): + """if available, serves a single request, otherwise returns (non-blocking serve)""" + if self.channel.is_available(): + self.serve() + return True + else: + return False + + def serve(self): + """serves a single request or reply (may block)""" + self.channel.wait() + handler, seq, obj = self._recv() + if handler == "result": + self.dispatch_result(seq, obj) + elif handler == "exception": + self.dispatch_exception(seq, obj) + else: + self.dispatch_request(handler, seq, obj) + + # + # root requests + # + def rimport(self, modulename): + """imports a module by name (as a string)""" + if modulename not in self.module_cache: + module = self.sync_request("handle_import", modulename) + self.module_cache[modulename] = module + return self.module_cache[modulename] + + def execute(self, expr, mode = "exec"): + """execute the given code at the remote side of the connection""" + return self.sync_request("handle_execute", expr, mode) + + # + # handlers layer + # + def handle_incref(self, oid): + self.box.incref(oid) + + def handle_decref(self, oid): + self.box.decref(oid) + + def handle_delattr(self, oid, name): + delattr(self.box[oid], name) + + def handle_getattr(self, oid, name): + return getattr(self.box[oid], name) + + def handle_setattr(self, oid, name, 
value): + setattr(self.box[oid], name, value) + + def handle_delitem(self, oid, index): + del self.box[oid][index] + + def handle_getitem(self, oid, index): + return self.box[oid][index] + + def handle_setitem(self, oid, index, value): + self.box[oid][index] = value + + def handle_call(self, oid, args, kwargs): + return self.box[oid](*args, **kwargs) + + def handle_repr(self, oid): + return repr(self.box[oid]) + + def handle_str(self, oid): + return str(self.box[oid]) + + def handle_bool(self, oid): + return bool(self.box[oid]) + + def handle_import(self, modulename): + return __import__(modulename, None, None, modulename.split(".")[-1]) + + def handle_getconn(self): + return self + + def handle_execute(self, expr, mode): + codeobj = compile(expr, "" % (self,), mode) + return eval(codeobj, self._local_namespace) + + + diff --git a/Rpyc/Demo/__init__.py b/Rpyc/Demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Rpyc/Demo/demo-1.py b/Rpyc/Demo/demo-1.py new file mode 100644 index 0000000..ea1c5a1 --- /dev/null +++ b/Rpyc/Demo/demo-1.py @@ -0,0 +1,156 @@ +# +# welcome to RPyC. this demo serves as an introduction. i believe in learning through +# showcases, and that's why this package comes with a demo subpackage, instead of +# documentation +# +# so, the first thing we're gonna do is import the SocketConnection. this is a factory +# function that returns us a new Connection object over a socket stream. we dont need +# to get into details here. +# +from Rpyc.Factories import SocketConnection + +# +# next, we'll get all the helpful utilities. the utilities include wrappers for builtin +# functions, like dir(), so they'd work as expected with netproxies. +# +from Rpyc.Utils import * + +# +# by now you should have an rpyc server running. if you dont, go to the Servers directory +# and choose your favorite version of a socket server. for unixes i'd recommend the +# forking server; for windows -- the threaded server. 
+# +# so let's connect to the server +# +c = SocketConnection("localhost", 22222) + +# +# now it's time to explain a little about how rpyc works. it's quite simple really. the +# magic comes from a concept called NetProxy. a NetProxy object delivers all of the +# operations performed on it to the remote object. so if you get a list from your host, +# what you're are really getting is a NetProxy to that list. it looks and works just +# like a real list -- but everytime you do something on it, it actually performs a +# request on the list object stored on the host. this is called boxing. this means +# you can change the object you get locally, and the remote object changes, etc. +# +# however, for efficiency and other reason, not all objects you get are NetProxies. +# all immutable and pickle-able objects pass by value (through pickle). these types +# of objects include ints, longs, strings, and some other types. all other types are +# passed by boxing. +# +# this boxing mechanism works on everything -- objects, functions, classes, and modules, +# which is why rpyc is considered transparent. your code looks just as if it was meant +# to run locally. +# + +# +# let's start with something simple -- getting a remote module. accessing the remote +# namespace always starts with the `modules` attribute, then the module (or package) +# name, and then the attribute you want to get. +# + +print c.modules.sys +print c.modules.sys.path +c.modules.sys.path.append("lucy") +print c.modules.sys.path[-1] + +# +# these remote objects are first class objects, like all others in python. this means +# you can store them in variables, pass them as parameters, etc. +# +rsys = c.modules.sys +rpath = rsys.path +rpath.pop(-1) + +# +# and as you might expect, netproxies also look like the real objects +# +print dir(rpath) + +# +# but there are a couple of issues with netproxies. the type(), isinstance(), and +# issubclass() classes dont work on them... 
as they query the underlying object, not +# the remote one. so: +# +print type(rsys.maxint) # -- because it's a simple type which is passed by value) +print type(rsys.path) # -- because, after all, it's a netproxy, not a list + +# +# now for a demo of packages +# (which looks very much like 'from xml.dom.minidom import parseString') +# +parseString = c.modules.xml.dom.minidom.parseString +x = parseString("lala") +print x +x.toxml() +print x.firstChild.nodeName + +# +# however, there's a catch when working with packages like that. the way it works is +# trying to find an attribute with that name, and if not found, trying to import a sub- +# module. +# +# now in english: +# c.module.xml is the xml module of the server. when you do c.module.xml.dom, rpyc looks +# for an attribute named 'dom' inside the xml module. since there's no such attribute, +# it tries to import a subpackage called xml.dom, which succeeds. then it does the same +# for xml.dom.minidom, and xml.dom.minidom.parseString. +# +# but there are times when that's ambiguous. this mean that the module has both a sub- +# module called 'X', and an attribute called 'X'. according to rpyc's algorithm, the +# attribute 'X' is returned, not the sub-module. +# +# but if you need to be explicit, you can, and it works like this: +# + +c.modules["xml.dom.minidom"].parseString("") + +# +# this will make sure the module 'xml.dom.minidom' is returned, and not an attribute. +# in general, it's better to use this form, unless you know there are no such conflicts. +# remeber that "Explicit is better than implicit", although it requires four more key +# strokes. perhaps in a later version it will raise an exception if there's a conflict. +# + +# +# and now for a little demo of working with files (a common task) +# +f = c.modules.__builtin__.open("lala.txt", "w") +f.write("lucy") +f.close() +c.modules.os.remove("lala.txt") + +# +# now to a bitter part of life: exceptions. 
as you could expect, they work just like +# regular exceptions +# +try: + a = c.modules.sys.nonexistent_attribute +except AttributeError: + pass +else: + assert False + +try: + a = c.modules.__builtin__.open("**\\//##~~..::!@#$%^&*()_+\n <,>?") +except IOError: + pass +else: + assert False + +print "goodbye" + + + + + + + + + + + + + + + diff --git a/Rpyc/Demo/demo-2.py b/Rpyc/Demo/demo-2.py new file mode 100644 index 0000000..7ed797c --- /dev/null +++ b/Rpyc/Demo/demo-2.py @@ -0,0 +1,81 @@ +# +# okay, this demo is more advanced. here we'll learn about: +# * redirecting standard files +# * synchronous callbacks +# * the ulitities module +# +import sys +import os +from Rpyc.Factories import SocketConnection +from Rpyc.Utils import hasattr, getattr, reload, upload, remote_interpreter + +c = SocketConnection("localhost", 22222) + +# +# redirect our stdout to the server +# +sys.stdout = c.modules.sys.stdout +print "this time we focus on `the seatle music`" + +# +# and the other way round +# +sys.stdout = sys.__stdout__ +c.modules.sys.stdout = sys.stdout +c.modules.sys.stdout.write("alice in chains\n") + +# +# but you dont believe me, so +# +c.modules.Rpyc.Demo.testmodule.printer("tool") + +# +# and restore that +# +c.modules.sys.stdout = c.modules.sys.__stdout__ + +# +# now let's play with callbacks +# +def f(text): + print text + +c.modules.Rpyc.Demo.testmodule.caller(f, "nirvana") + +# +# and if you insist +# +def g(func, text): + c.modules.Rpyc.Demo.testmodule.caller(func, text) + +c.modules.Rpyc.Demo.testmodule.caller(g, f, "soundgarden") + +# +# now for the utilities module. 
it gives us the following cool functions: +# * dir, getattr, hasattr, help, reload -- overriding builtins +# * upload, download -- transfering files/directories to/from the client/server (all the permutations) +# * remote_shell, remote_interpreter -- running remote processess and debugging +# +print hasattr(sys, "path") +print hasattr(c.modules.sys, "path") + +print getattr(sys, "maxint") +print getattr(c.modules.sys, "maxint") + +print reload(sys) +print reload(c.modules.sys) + +f=open("lala.txt", "w") +f.write("king crimson") +f.close() +upload(c, "lala.txt", "../lala.txt") +os.remove("lala.txt") +c.modules.os.remove("../lala.txt") + +remote_interpreter(c) + + +print "goodbye" + + + diff --git a/Rpyc/Demo/demo-3.py b/Rpyc/Demo/demo-3.py new file mode 100644 index 0000000..220681e --- /dev/null +++ b/Rpyc/Demo/demo-3.py @@ -0,0 +1,130 @@ +# +# this is the grand finale: asynchronous proxies as super-events +# +from Rpyc.Factories import SocketConnection, Async +from Rpyc.Utils import * + +c = SocketConnection("localhost") + +# +# this is the remote int type +# +rint = c.modules.__builtin__.int + +# +# and we'll wrap it in an asynchronous wrapper +# +rint = Async(rint) + +# +# now it still looks like a normal proxy... but operations on it return something called +# an AsyncResult -- it's an object that represents the would-be result of the operation. +# it has a .is_ready property, which indicates whether or not the result is ready, and +# a .result property, which holds the result of the operations. when you access the .result +# property, it will block until the result is returned +# +a = rint("123") +b = rint("metallica") +print a +print b.is_ready +print a.result +print a + +# +# and when an exception occurs, it looks like that +# +try: + b.result +except ValueError: + pass + +# +# only when you access the result you get the exception, which may look weird, but hey, +# it's an asynchronous world out there. 
+# + +# +# there's another methodology for async proxies -- on_ready callbacks. instead of +# getting the async result, you can register a callback to collect it, when it arrives. +# +def f(res): + print "the result is", + try: + print res.result + except: + print "an exception" + +rint = Async(c.modules.__builtin__.int) + +ar = rint("123") +ar.on_ready = f + +# this will cause an exception +ar = rint("a perfect circle") +ar.on_ready = f + +# or when you dont need to keep the async result +rint("456").on_ready = f + +# and it's not limited to calling it. anything you do to the async proxy is asynchronous. +# for example, you can also get attributes asynchronously: +ar = rint.__str__ + +# +# now we'll do some other request, which will cause the results to arrive, and the callback +# to be called. +# +print c.modules.sys + +############################################################################################ +# +# this is where we get hardcore: threads and event callbacks +# +xxx = 0 +def blah(): + global xxx + xxx += 1 + +# +# we'll start a thread on the server which on threadfunc (which is defined in the testmodule). +# this function will call the callback we give it every second, but will ignore the result. +# this practically means it's like an event -- trigger and forget. 
on the client side, the +# callback will increment `xxx` every time it's called +# +c.modules.thread.start_new_thread(c.modules.Rpyc.Demo.testmodule.threadfunc, (blah,)) + +# +# we'll wait a little +# +import time +time.sleep(5) + +# +# and do some operation, which, along with it, will pull all incoming requests +# +print c.modules.sys +print xxx + +# +# and we can start a thread of our own to pull the requests in the background +# +import thread +worker_running = True + +def worker(conn): + while worker_running: + conn.serve() + +thread.start_new_thread(worker, (c,)) + +time.sleep(5) +worker_running = False + +print xxx +print "goodbye" + +# +# L33TN3SS +# + + diff --git a/Rpyc/Demo/demo-4.py b/Rpyc/Demo/demo-4.py new file mode 100644 index 0000000..1a651ae --- /dev/null +++ b/Rpyc/Demo/demo-4.py @@ -0,0 +1,41 @@ +import time +from Rpyc.Factories import SocketConnection, Async + +c = SocketConnection("localhost") +c2 = SocketConnection("localhost") + +huge_xml = " " * 50000 + " " * 50000 +parseString = Async(c.modules.xml.dom.minidom.parseString) +res = parseString(huge_xml) + +print "while we're waiting for the server to complete, we do other stuff" +t = time.time() +while not res.is_ready: + time.sleep(0.5) + # we dont want to use `c`, because it would block us (as the server is blocking) + # but `c2` runs on another thread/process, so it wouldn't block + print c2.modules.os.getpid() + +t = time.time() - t +print "it took %d seconds" % (t,) + +print res.result + + +# +# note: to improve performance, delete the result when you no longer need it. +# this should be done because the server might (as in this case) hold enormous +# amounts of memory, which will slow it down +# +# if you do this: +# res = parseString(huge_xml) +# res = parseString(huge_xml) +# res will be deleted only after the second operation finishes, because only when +# the second result is assigned, the first is released -- server still holds +# around 160MB of the old xml tree for nothing. 
so it's a good idea to `del res` +# when you dont need it. +# +# also, there's a memory leak on the server, which i'm working on solving. +# + + diff --git a/Rpyc/Demo/demo-5.py b/Rpyc/Demo/demo-5.py new file mode 100644 index 0000000..4ea4491 --- /dev/null +++ b/Rpyc/Demo/demo-5.py @@ -0,0 +1,66 @@ +# +# this demo will show you working with asynch proxies and callback +# verison 2.3 removes the AsyncCallback factory, and instead provides a mechanism +# where async results can provide a callback. it simplifies the design, so i +# went for it. +# +import time +from Rpyc.Factories import SocketConnection, Async + +c1 = SocketConnection("localhost") + +# f1 is an async proxy to the server's sleep function +f1 = Async(c1.modules.time.sleep) + +# this would block the server for 9 seconds +r1 = f1(9) +# and this would block it for 11 +r2 = f1(11) + +# of course the client isnt affected (that's the whole point of Async) +# but since the same server can't block simultaneously, the second request is +# queued. this is a good example of queuing. + +# now we'll wait for both results to finish. this should print around 20 lines +# (more or less, depending on the phase) +while not r1.is_ready or not r2.is_ready: + print "!" + time.sleep(1) + +print "---" + +# now we'll dig in the h4xx0r shit -- running things simultaneously +# for this, we'll need another connection, and another proxy: +c2 = SocketConnection("localhost") +f2 = Async(c2.modules.time.sleep) + +# now we'll do the same as the above, but this time, it will happen simulatenously +# becuase f1 and f2 work on different connections +r1 = f1(9) +r2 = f2(11) + +# so this time, it will print around 11 lines +while not r1.is_ready or not r2.is_ready: + print "!" + time.sleep(1) + +print "---" + +# very haxxor indeed. now, we'll see how to use the on_ready callback +r1 = f1(9) +r2 = f2(11) + +def blah(res): + print "look mama, no hands! 
res = %r" % (res.result,) + +# set the on_ready callback -- when r1 is becomes ready, the callback will +# be called automagically +r1.on_ready = blah + +# this should print 9 "!", then "look mama", then two more "!" +while not r1.is_ready or not r2.is_ready: + print "!" + time.sleep(1) + + + diff --git a/Rpyc/Demo/demo-6.py b/Rpyc/Demo/demo-6.py new file mode 100644 index 0000000..1c34039 --- /dev/null +++ b/Rpyc/Demo/demo-6.py @@ -0,0 +1,130 @@ +# as you can see - the import line now requires even less typing! +from Rpyc import * +c = SocketConnection("localhost") + +#------------------------------------------------------------------------------ +# this demo shows the new `execute` and `namespace` features of rpyc +#------------------------------------------------------------------------------ + + +# the code below will run AT THE OTHER SIDE OF THE CONNECTION... so you'll see +# 'hello world' on the server's console +c.execute("print 'hello world'") + +import sys +c.modules.sys.stdout = sys.stdout + +# and this time, on our console +c.execute("print 'brave new world'") + +# restore that +c.modules.sys.stdout = c.modules.sys.__stdout__ + +# anyway, the `execute` method runs the given code at the other side of the connection +# and works in the `namespace` dict. what? +c.execute("x = [1,2,3]") +print c.namespace.x + +# now it makes sense, doesn't it? the 'namespace' attribute is something i called +# AttrFrontend -- it wraps a dict with the attribute protocol, so you can access +# it with the dot notation, instead of the braces notation (more intuitive). +# this namespace works both ways -- executing code affects the namespace, while +# altering the namespace directly also affects it: +c.namespace.x.append(4) +c.execute("x.append(5)") +print c.namespace.x + +# but you should not assign complex objects (not int/float/str, etc) to this namespace +# directy, or NetProxies will be created. 
there's nothing wrong with that, but keep +# in mind it might cause blocking (and even deadlocks), as i'll explain later. + +# another cool thing i want to show is the second, optional parameter to execute: mode. +# the mode controls how the code is compiled. the default mode is "exec", which means +# it executes the code as a module. the other option is "eval" which returns a value. +# so if you want to _do_ something, like printing of assigning a variable, you do it +# with "exec", and if you want to evaluate something, you do it with "eval" +# for example: + +# this will print None +print c.execute("1+2") + +# while this will print 3 +print c.execute("1+2", "eval") + +# but there's a time in a man's life when he asks himself, why the heck? you can, as i +# showed in other places, just do this: +# c.modules.__builtin__.eval("1+2") +# so what's the point? +# +# well, i've been waiting for this question. the rationale behind this seemingly useless +# feature is for times you NEED to have the code executing remotely, but writing a +# dedicated module for it is overdoing it: +# * more files to update ==> more chance that you'll forget to update +# * distributing the module to all of the machines +# * making a mess on the file system +# * it's really not a module... it's just some code that logically belongs to one single +# module, but technical difficulties prevent it +# +# and to show you what i mean -- i want to start a thread on the server, like it did in +# several places over the demos. this thread will send me an event every second. what i +# used to do was, creating another module, like testmodule.py to define the thread +# function, so it will exist on the server, and i could call it. +# if i defined thread_func at the client side, then the thread will block when trying +# to execute the code, because the client holds it. 
so this new mechanism lets you +# distribute code in a volatile fashion: +# * when the connection is closed, everything you defined is gone +# * no file-system mess +# * no need to distribute files across the network +# * only one place to maintain + +c.execute(""" +my_thread_active = True + +def my_thread_func(callback): + import time + from Rpyc import Async + + callback = Async(callback) + while my_thread_active: + callback(time.time()) + time.sleep(1) + print "the thread says goodbye" +""") + +def callback(timestamp): + print "the timestamp is", timestamp + +c.modules.thread.start_new_thread(c.namespace.my_thread_func, (callback,)) +c.modules.time.sleep(5) +c.namespace.my_thread_active = False +c.close() + +# it's not only for threads of course. there are many times when you NEED the code/objects +# on the remote side. for example: +# * situations that would block (like having the thread func on the client) +# * code that check the type of the object (type or isinstance), and a NetProxy would make +# it cry. DONT CHECK THE TYPE OF OBJECTS, PEOPLE, JUST USE THEM! that's why they invented +# duck-typing. argh. +# * other places i didnt think of as of yet. i want to sleep. leave me alone ;) zzzZZZ +# +# so enjoy! 
+ + + + + + + + + + + + + + + + + + + + diff --git a/Rpyc/Demo/pipe-child.py b/Rpyc/Demo/pipe-child.py new file mode 100644 index 0000000..517d0ef --- /dev/null +++ b/Rpyc/Demo/pipe-child.py @@ -0,0 +1,8 @@ +import sys +from Rpyc import PipeConnection + +c = PipeConnection(sys.stdin, sys.stdout) +c.modules.sys.path.append("i love lucy") + + +# child dies \ No newline at end of file diff --git a/Rpyc/Demo/pipe-parent.py b/Rpyc/Demo/pipe-parent.py new file mode 100644 index 0000000..bd8cc89 --- /dev/null +++ b/Rpyc/Demo/pipe-parent.py @@ -0,0 +1,17 @@ +# a demo for parent/child over pipes + +import sys +from popen2 import popen3 +from Rpyc import PipeConnection + +cout, cin, cerr = popen3("python pipe-child.py") +conn = PipeConnection(cout, cin) + +try: + while True: + conn.serve() +except EOFError: + print "goodbye child" + +print sys.path[-1] + diff --git a/Rpyc/Demo/testmodule.py b/Rpyc/Demo/testmodule.py new file mode 100644 index 0000000..20d2bc9 --- /dev/null +++ b/Rpyc/Demo/testmodule.py @@ -0,0 +1,19 @@ +import time +from Rpyc.Factories import Async + +def threadfunc(callback): + """this function will call the callback every second""" + callback = Async(callback) + try: + while True: + callback() + time.sleep(1) + except: + print "thread exiting" + +def printer(text): + print text + +def caller(func, *args): + func(*args) + \ No newline at end of file diff --git a/Rpyc/Demo/testsuite.bat b/Rpyc/Demo/testsuite.bat new file mode 100644 index 0000000..fa46892 --- /dev/null +++ b/Rpyc/Demo/testsuite.bat @@ -0,0 +1,6 @@ +python demo-1.py +python demo-2.py +python demo-3.py +python demo-4.py +python demo-5.py +python demo-6.py diff --git a/Rpyc/Factories.py b/Rpyc/Factories.py new file mode 100644 index 0000000..9de3b69 --- /dev/null +++ b/Rpyc/Factories.py @@ -0,0 +1,57 @@ +""" +the factory: +exposes a nice and easy interface to the internals of rpyc. +this module, along with Utils, are the only modules most clients will need. 
+""" + +from Stream import SocketStream, PipeStream +from Channel import Channel +from Connection import Connection +from AsyncNetProxy import AsyncNetProxy +from weakref import WeakValueDictionary +from Lib import DEFAULT_PORT + + +__all__ = ["SocketConnection", "AuthSocketConnection", "PipeConnection", "Async"] +_async_proxy_cache = WeakValueDictionary() + +class LoginError(Exception): + pass + +def SocketConnection(host, port = DEFAULT_PORT, **kw): + """shorthand for creating a conneciton over a socket to a server""" + return Connection(Channel(SocketStream.from_new_socket(host, port, **kw))) + +def _create_auth_connection(chan, username, password): + from Authentication import login + if not login(chan, username, password): + raise LoginError("the server did not accept the login") + return Connection(chan) + +def AuthSocketConnection(host, username, password, port = DEFAULT_PORT, **kw): + """shorthand for creating a conneciton over a socket to a server, with authentication""" + chan = Channel(SocketStream.from_new_socket(host, port, **kw)) + return _create_auth_connection(chan, username, password) + +def PipeConnection(incoming, outgoing): + """shorthand for creating a conneciton over a pipe""" + return Connection(Channel(PipeStream(incoming, outgoing))) + +def AuthPipeConnection(incoming, outgoing, username, password): + """shorthand for creating a conneciton over a pipe""" + chan = Channel(PipeStream(incoming, outgoing)) + return _create_auth_connection(chan, username, password) + +def Async(proxy): + """a factory for creating asynchronous proxies for existing synchronous ones""" + key = id(proxy) + if key in _async_proxy_cache: + return _async_proxy_cache[key] + else: + new_proxy = AsyncNetProxy(proxy) + _async_proxy_cache[key] = new_proxy + return new_proxy + + + + diff --git a/Rpyc/Lib.py b/Rpyc/Lib.py new file mode 100644 index 0000000..bff6957 --- /dev/null +++ b/Rpyc/Lib.py @@ -0,0 +1,53 @@ +""" +shared types, functions and constants +""" +import sys + 
+ +DEFAULT_PORT = 18812 + +def raise_exception(typ, val, tbtext): + """a helper for raising remote exceptions""" + if type(typ) == str: + raise typ + else: + val._remote_traceback = tbtext + raise val + +class ImmDict(object): + """immutable dict (passes by value)""" + def __init__(self, dict): + self.dict = dict + def items(self): + return self.dict.items() + +class AttrFrontend(object): + """a wrapper that implements the attribute protocol for a dict backend""" + def __init__(self, dict): + self.__dict__["____dict"] = dict + def __delattr__(self, name): + del self.__dict__["____dict"][name] + def __getattr__(self, name): + return self.__dict__["____dict"][name] + def __setattr__(self, name, value): + self.__dict__["____dict"][name] = value + def __repr__(self): + return "<AttrFrontend %s>" % (self.__dict__["____dict"].keys(),) + +# installs an rpyc-enabled exception hook. this happens automatically when the module +# is imported. also, make sure the current excepthook is the original one, so we dont +# install our hook twice (and thus cause infinite recursion) in case the module is reloaded +def rpyc_excepthook(exctype, value, traceback): + if hasattr(value, "_remote_traceback"): + print >> sys.stderr, "======= Remote traceback =======" + print >> sys.stderr, value._remote_traceback + print >> sys.stderr, "======= Local exception =======" + orig_excepthook(exctype, value, traceback) + else: + orig_excepthook(exctype, value, traceback) + +if sys.excepthook.__name__ != "rpyc_excepthook": + orig_excepthook = sys.excepthook + sys.excepthook = rpyc_excepthook + + diff --git a/Rpyc/ModuleNetProxy.py b/Rpyc/ModuleNetProxy.py new file mode 100644 index 0000000..3445088 --- /dev/null +++ b/Rpyc/ModuleNetProxy.py @@ -0,0 +1,46 @@ +from NetProxy import NetProxyWrapper + + +class ModuleNetProxy(NetProxyWrapper): + """a netproxy specialized for exposing remote modules (first tries to getattr, + if it fails tries to import)""" + __doc__ = NetProxyWrapper.__doc__ + + def __init__(self, proxy,
base): + NetProxyWrapper.__init__(self, proxy) + self.__dict__["____base"] = base + self.__dict__["____cache"] = {} + + def __request__(self, handler, *args): + return self.__dict__["____conn"].sync_request(handler, self.__dict__["____oid"], *args) + + def __getattr__(self, name): + if name in self.__dict__["____cache"]: + return self.__dict__["____cache"][name] + + try: + return self.__request__("handle_getattr", name) + except AttributeError: + pass + + try: + fullname = self.__dict__["____base"] + "." + name + obj = self.__dict__["____conn"].rimport(fullname) + module = ModuleNetProxy(obj, fullname) + self.__dict__["____cache"][name] = module + return module + except ImportError: + raise AttributeError("'module' object has not attribute or submodule %r" % (name,)) + +class RootImporter(object): + """the root of the interpreter's import hierarchy""" + + def __init__(self, conn): + self.__dict__["____conn"] = conn + + def __getitem__(self, name): + return self.__dict__["____conn"].rimport(name) + + def __getattr__(self, name): + return ModuleNetProxy(self[name], name) + diff --git a/Rpyc/NetProxy.py b/Rpyc/NetProxy.py new file mode 100644 index 0000000..96ed53e --- /dev/null +++ b/Rpyc/NetProxy.py @@ -0,0 +1,119 @@ +from Lib import ImmDict + + +def fully_dynamic_metaclass(name, bases, dict): + """ + a meta class that enables special methods to be accessed like regular names + (via __getattr__), like it used to be in old-style classes. 
+ """ + + special_methods = [ + "__hash__", "__len__", "__iter__", "next", "__reversed__", + "__add__", "__iadd__", "__radd__", "__sub__", "__isub__", "__rsub__", "__mul__", + "__imul__", "__rmul__", "__div__", "__idiv__", "__rdiv__", "__truediv__", + "__itruediv__", "__rtruediv__", "__floordiv__", "__ifloordiv__", "__rfloorfiv__", + "__pow__", "__ipow__", "__rpow__", "__lshift__", "__ilshift__", "__rlshift__", + "__rshift__", "__irshift__", "__rrshift__", "__and__", "__iand__", "__rand__", + "__or__", "__ior__", "__ror__", "__xor__", "__ixor__", "__rxor__", "__mod__", + "__imod__", "__rmod__", "__divmod__", "__idivmod__", "__rdivmod__", "__pos__", + "__neg__", "__int__", "__float__", "__long__", "__oct__", "__hex__", "__coerce__", + "__eq__", "__ne__", "__le__", "__ge__", "__lt__", "__gt__", "__cmp__", + ] + + # i added '__class__' to the special attributes, but it broke some things + # (like dir), so we'll have to live without it + special_attributes = ["__doc__", "__module__"] + + def make_method(name): + def caller(self, *a, **k): + return self.__getattr__(name)(*a, **k) + return caller + + def make_property(name): + def getter(self): + return self.__getattr__(name) + def setter(self, value): + self.__setattr__(name, value) + def deller(self): + self.__delattr__(name) + return property(getter, setter, deller) + + classdict = {} + for sn in special_methods: + classdict[sn] = make_method(sn) + classdict.update(dict) + for sa in special_attributes: + classdict[sa] = make_property(sa) + return type(name, bases, classdict) + +class NetProxy(object): + """NetProxy - convey local operations to the remote object. 
this is an abstract class""" + __metaclass__ = fully_dynamic_metaclass + + def __init__(self, conn, oid): + self.__dict__["____conn"] = conn + self.__dict__["____oid"] = oid + + def __request__(self, handler, *args): + raise NotImplementedError() + + def __call__(self, *args, **kwargs): + return self.__request__("handle_call", args, ImmDict(kwargs)) + + def __delattr__(self, *args): + return self.__request__("handle_delattr", *args) + def __getattr__(self, *args): + return self.__request__("handle_getattr", *args) + def __setattr__(self, *args): + return self.__request__("handle_setattr", *args) + + def __delitem__(self, *args): + return self.__request__("handle_delitem", *args) + def __getitem__(self, *args): + return self.__request__("handle_getitem", *args) + def __setitem__(self, *args): + return self.__request__("handle_setitem", *args) + + # special cases + def __repr__(self, *args): + return self.__request__("handle_repr", *args) + def __str__(self, *args): + return self.__request__("handle_str", *args) + def __nonzero__(self, *args): + return self.__request__("handle_bool", *args) + +class NetProxyWrapper(NetProxy): + """a netproxy that wraps an inner netproxy""" + __doc__ = NetProxy.__doc__ + + def __init__(self, proxy): + NetProxy.__init__(self, proxy.__dict__["____conn"], proxy.__dict__["____oid"]) + # we must keep the original proxy alive + self.__dict__["____original_proxy"] = proxy + +def _dummy_callback(*args, **kw): + pass + +class SyncNetProxy(NetProxy): + """the default, synchronous netproxy""" + __doc__ = NetProxy.__doc__ + + def __init__(self, conn, oid): + NetProxy.__init__(self, conn, oid) + self.__dict__["____conn"].sync_request("handle_incref", self.__dict__["____oid"]) + + def __del__(self): + try: + # decref'ing is done asynchronously, because we dont need to wait for the remote + # object to die. 
moreover, we dont care if it fails, because that would mean the + # connection is broken, so the remote object is already dead + self.__dict__["____conn"].async_request(_dummy_callback, "handle_decref", self.__dict__["____oid"]) + except: + pass + + def __request__(self, handler, *args): + return self.__dict__["____conn"].sync_request(handler, self.__dict__["____oid"], *args) + + + + diff --git a/Rpyc/Servers/ServerUtils.py b/Rpyc/Servers/ServerUtils.py new file mode 100644 index 0000000..e9b361d --- /dev/null +++ b/Rpyc/Servers/ServerUtils.py @@ -0,0 +1,90 @@ +import os +import socket +import sys +import gc +from threading import Thread +from Rpyc.Connection import Connection +from Rpyc.Stream import SocketStream, PipeStream +from Rpyc.Channel import Channel +from Rpyc.Lib import DEFAULT_PORT + + +class Logger(object): + def __init__(self, logfile = None, active = True): + self.logfile = logfile + self.active = active + def __call__(self, *args): + if not self.logfile: + return + if not self.active: + return + text = " ".join([str(a) for a in args]) + self.logfile.write("[%d] %s\n" % (os.getpid(), text)) + self.logfile.flush() + +log = Logger(sys.stdout) + +def _serve(chan): + conn = Connection(chan) + try: + try: + while True: + conn.serve() + except EOFError: + pass + finally: + conn.close() + gc.collect() + +def serve_stream(stream, authenticate = False, users = None): + chan = Channel(stream) + + if authenticate: + from Rpyc.Authentication import accept + log("requiring authentication") + if accept(chan, users): + log("authenication successful") + else: + log("authentication failed") + return + + _serve(chan) + +def create_listener_socket(port): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + #sock.bind(("", port)) + sock.bind(("localhost", port)) + sock.listen(4) + log("listening on", sock.getsockname()) + return sock + +def serve_socket(sock, **kw): + sockname = sock.getpeername() + log("welcome", sockname) + try: + 
try: + serve_stream(SocketStream(sock), **kw) + except socket.error: + pass + finally: + log("goodbye", sockname) + +def serve_pipes(incoming, outgoing, **kw): + serve_stream(PipeStream(incoming, outgoing), **kw) + +def threaded_server(port = DEFAULT_PORT, **kwargs): + sock = create_listener_socket(port) + while True: + newsock, name = sock.accept() + t = Thread(target = serve_socket, args = (newsock,), kwargs = kwargs) + t.setDaemon(True) + t.start() + +def start_threaded_server(*args, **kwargs): + """starts the threaded_server on a separate thread. this turns the + threaded_server into a mix-in you can place anywhere in your code""" + t = Thread(target = threaded_server, args = args, kwargs = kwargs) + t.setDaemon(True) + t.start() + diff --git a/Rpyc/Servers/__init__.py b/Rpyc/Servers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Rpyc/Servers/auth_server.py b/Rpyc/Servers/auth_server.py new file mode 100644 index 0000000..b13fb20 --- /dev/null +++ b/Rpyc/Servers/auth_server.py @@ -0,0 +1,19 @@ +from ServerUtils import DEFAULT_PORT, threaded_server + +# +# define user:password pairs of your own +# +users = { + "johnnash" : "t3hgam3r", + "tolkien" : "1ring", + "yossarian" : "catch22", +} + +def main(port = DEFAULT_PORT): + threaded_server(port, authenticate = True, users = users) + +if __name__ == "__main__": + main() + + + diff --git a/Rpyc/Servers/forking_server.py b/Rpyc/Servers/forking_server.py new file mode 100644 index 0000000..bfe78c8 --- /dev/null +++ b/Rpyc/Servers/forking_server.py @@ -0,0 +1,26 @@ +import sys +import os +from ServerUtils import serve_socket, create_listener_socket, DEFAULT_PORT + + +def serve_in_child(sock): + """forks a child to run the server in. the parent doesnt wait() for the child, + so if you do a `ps`, you'll see zombies. but who cares. i used to do a doublefork() + for that, but it's really meaningless. 
anyway, when the parent dies, the zombies + die as well.""" + if os.fork() == 0: + try: + serve_socket(sock) + finally: + sys.exit() + +def main(port = DEFAULT_PORT): + sock = create_listener_socket(port) + while True: + newsock, name = sock.accept() + serve_in_child(newsock) + +if __name__ == "__main__": + main() + + \ No newline at end of file diff --git a/Rpyc/Servers/selecting_server.py b/Rpyc/Servers/selecting_server.py new file mode 100644 index 0000000..0136d63 --- /dev/null +++ b/Rpyc/Servers/selecting_server.py @@ -0,0 +1,29 @@ +import select +import socket +from ServerUtils import log, create_listener_socket, DEFAULT_PORT, SocketStream, Connection + + +def main(port = DEFAULT_PORT): + sock = create_listener_socket(port) + connections = [] + + while True: + rlist, wlist, xlist = select.select([sock] + connections, [], []) + + if sock in rlist: + rlist.remove(sock) + newsock, name = sock.accept() + conn = Connection(SocketStream(newsock)) + conn.sockname = name + connections.append(conn) + log("welcome", conn.sockname) + + for conn in rlist: + try: + conn.serve() + except (EOFError, socket.error): + connections.remove(conn) + log("goodbyte", conn.sockname) + +if __name__ == "__main__": + main() diff --git a/Rpyc/Servers/simple_server.py b/Rpyc/Servers/simple_server.py new file mode 100644 index 0000000..60315a5 --- /dev/null +++ b/Rpyc/Servers/simple_server.py @@ -0,0 +1,13 @@ +from ServerUtils import serve_socket, create_listener_socket, DEFAULT_PORT + + +def main(port = DEFAULT_PORT): + sock = create_listener_socket(port) + while True: + newsock, name = sock.accept() + serve_socket(newsock) + +if __name__ == "__main__": + main() + + \ No newline at end of file diff --git a/Rpyc/Servers/std_server.py b/Rpyc/Servers/std_server.py new file mode 100644 index 0000000..c556b83 --- /dev/null +++ b/Rpyc/Servers/std_server.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# +# installation instructions +# * add a service in /etc/services for rpyc: tcp port 18812 +# * 
add "rpyc .... /usr/lib/pythonXX/site-packages/Rpyc/Servers/std_server.py" +# to /etc/inetd.conf (i dont remember syntax, rtfm) +# * dont forget to chmod +x this file +# * restart inetd with sighup +# +import sys +import time +from traceback import format_exception +from ServerUtils import log, serve_pipes + + +def main(filename = "/tmp/rpyc-server.log"): + log.logfile = open(filename, "a") + log("-" * 80) + log("started serving at", time.asctime()) + try: + try: + serve_pipes(sys.stdin, sys.stdout) + except: + log(*format_exception(*sys.exc_info())) + finally: + log("server exits at", time.asctime()) + +if __name__ == "__main__": + main() + + \ No newline at end of file diff --git a/Rpyc/Servers/threaded_server.py b/Rpyc/Servers/threaded_server.py new file mode 100644 index 0000000..bc65605 --- /dev/null +++ b/Rpyc/Servers/threaded_server.py @@ -0,0 +1,9 @@ +from ServerUtils import DEFAULT_PORT, threaded_server + + +def main(port = DEFAULT_PORT): + threaded_server(port) + +if __name__ == "__main__": + main() + diff --git a/Rpyc/Stream.py b/Rpyc/Stream.py new file mode 100644 index 0000000..66568b6 --- /dev/null +++ b/Rpyc/Stream.py @@ -0,0 +1,117 @@ +import select +import socket + + +class Stream(object): + """ + a stream is a file-like object that is used to expose a consistent and uniform interface + to the 'physical' file-like objects (like sockets and pipes), which have many quirks (sockets + may recv() less than `count`, pipes are simplex and don't flush, etc.). + a stream is always in blocking mode. + """ + + def close(self): + raise NotImplementedError() + + def fileno(self): + raise NotImplementedError() + + def is_available(self): + rlist, wlist, xlist = select.select([self], [], [], 0) + return bool(rlist) + + def wait(self): + select.select([self], [], []) + + def read(self, count): + raise NotImplementedError() + + def write(self, data): + raise NotImplementedError() + + +class SocketStream(Stream): + """ + a stream that operates over a socket. 
note: + * the socket is expected to be reliable (i.e., TCP) + * the socket is expected to be in blocking mode + """ + def __init__(self, sock): + self.sock = sock + + def __repr__(self): + host, port = self.sock.getpeername() + return "<%s(%s:%d)>" % (self.__class__.__name__, host, port) + + def from_new_socket(cls, host, port, **kw): + sock = socket.socket(**kw) + sock.connect((host, port)) + return cls(sock) + from_new_socket = classmethod( from_new_socket ) + + def fileno(self): + return self.sock.fileno() + + def close(self): + self.sock.close() + + def read(self, count): + data = [] + while count > 0: + buf = self.sock.recv(count) + if not buf: + raise EOFError() + count -= len(buf) + data.append(buf) + return "".join(data) + + def write(self, data): + while data: + count = self.sock.send(data) + data = data[count:] + + +class PipeStream(Stream): + """ + a stream that operates over two simplex pipes. + note: the pipes are expected to be in blocking mode + """ + + def __init__(self, incoming, outgoing): + self.incoming = incoming + self.outgoing = outgoing + + def fileno(self): + return self.incoming.fileno() + + def close(self): + self.incoming.close() + self.outgoing.close() + + def read(self, count): + data = [] + while count > 0: + buf = self.incoming.read(count) + if not buf: + raise EOFError() + count -= len(buf) + data.append(buf) + return "".join(data) + + def write(self, data): + self.outgoing.write(data) + self.outgoing.flush() + + # win32: stubs + import sys + if sys.platform == "win32": + def is_available(self): + return True + + def wait(self): + pass + + + + + diff --git a/Rpyc/Utils.py b/Rpyc/Utils.py new file mode 100644 index 0000000..00963c2 --- /dev/null +++ b/Rpyc/Utils.py @@ -0,0 +1,265 @@ +""" +convenience utilities: + * provides dir(), getattr(), hasattr(), help() and reload() that support netproxies + * provides obtain() for really transfering remote objects + * provides upload() and download() for working with files + * provides a nice 
interface for remote shell operations (like os.system) + and a openning a remote python interpreter (for debugging, etc.) + +i removed lots of stuff from the __all__, keeping only the useful things, +so that import * wouldnt mess your namespace too much +""" +import __builtin__ +import sys +import os +import inspect +from NetProxy import NetProxy + +__all__ = [ + "dir", "getattr", "hasattr", "help", "reload", "obtain", + "upload", "download", +] + +CHUNK_SIZE = 4096 + +class UtilityError(Exception): + pass + +# +# working with netproxies +# +def dir(*obj): + """a version of dir() that supports NetProxies""" + if not obj: + inspect_stack = [inspect.stack()[1][0].f_locals.keys()] + inspect_stack.sort() + return inspect_stack + if not len(obj) == 1: + raise TypeError("dir expected at most 1 arguments, got %d" % (len(obj),)) + obj = obj[0] + if isinstance(obj, NetProxy): + return obj.__dict__["____conn"].modules["__builtin__"].dir(obj) + else: + return __builtin__.dir(obj) + +def getattr(obj, name, *default): + """a version of getattr() that supports NetProxies""" + if len(default) > 1: + raise TypeError("getattr expected at most 3 arguments, got %d" % (2 + len(default),)) + if isinstance(obj, NetProxy): + try: + return obj.__getattr__(name) + except AttributeError: + if not default: + raise + return default[0] + else: + return __builtin__.getattr(obj, name, *default) + +def hasattr(obj, name): + """a version of hasattr() that supports NetProxies""" + try: + getattr(obj, name) + except AttributeError: + return False + else: + return True + +class _Helper(object): + """a version of help() that supports NetProxies""" + def __repr__(self): + return repr(__builtin__.help) + def __call__(self, obj = None): + if isinstance(obj, NetProxy): + print "Help on NetProxy object for an instance of %r:" % (obj.__getattr__("__class__").__name__,) + print + print "Doc:" + print obj.__doc__ + print + print "Members:" + print dir(obj) + else: + __builtin__.help(obj) +help = _Helper() + 
+def reload(module): + """a version of reload() that supports NetProxies""" + if isinstance(module, NetProxy): + return module.__dict__["____conn"].modules["__builtin__"].reload(module) + else: + return __builtin__.reload(module) + +def obtain(proxy): + """transfers a remote object to this process. this is done by pickling it, so it + must be a picklable object, and should be immutable (otherwise the local object + may be different from the remote one, which may cause problems). generally speaking, + you should not obtain remote objects, as NetProxies provide a stronger mechanism. + but there are times when you want to get the real object in your hand, for pickling + it locally (e.g., storing test results in a file), or if the connection is too slow.""" + import cPickle + dumped = proxy.__dict__["____conn"].modules.cPickle.dumps(proxy) + return cPickle.loads(dumped) + +def getconn(obj): + """returns the connection of a NetProxy""" + if "____conn" not in obj.__dict__: + raise TypeError("`obj` is not a NetProxy") + return obj.__dict__["____conn"] + +# +# working with files +# +def upload(conn, localpath, remotepath, *a, **k): + """uploads a file or a directory recursively (depending on what `localpath` is)""" + if os.path.isdir(localpath): + upload_dir(conn, localpath, remotepath, *a, **k) + elif os.path.isfile(localpath): + upload_file(conn, localpath, remotepath, *a, **k) + else: + raise UtilityError("can only upload files or directories") + +def download(conn, remotepath, localpath, *a, **k): + """downloads a file or a directory recursively (depending on what `remotepath` is)""" + if conn.modules.os.path.isdir(remotepath): + download_dir(conn, remotepath, localpath, *a, **k) + elif conn.modules.os.path.isfile(remotepath): + download_file(conn, remotepath, localpath, *a, **k) + else: + raise UtilityError("can only download files or directories") + +def upload_file(conn, localpath, remotepath): + lf = open(localpath, "rb") + rf =
conn.modules.__builtin__.open(remotepath, "wb") + while True: + chunk = lf.read(CHUNK_SIZE) + if not chunk: + break + rf.write(chunk) + lf.close() + rf.close() + +def download_file(conn, remotepath, localpath): + lf = open(localpath, "wb") + rf = conn.modules.__builtin__.open(remotepath, "rb") + while True: + chunk = rf.read(CHUNK_SIZE) + if not chunk: + break + lf.write(chunk) + lf.close() + rf.close() + +def upload_dir(conn, localpath, remotepath, extensions = [""]): + if not conn.modules.os.path.exists(remotepath): + conn.modules.os.makedirs(remotepath) + for fn in os.listdir(localpath): + lfn = os.path.join(localpath, fn) + rfn = conn.modules.os.path.join(remotepath, fn) + if os.path.isdir(lfn): + upload_dir(conn, lfn, rfn, extensions) + elif os.path.isfile(lfn): + for ext in extensions: + if fn.endswith(ext): + upload_file(conn, lfn, rfn) + break + +def download_dir(conn, remotepath, localpath, extensions = [""]): + if not os.path.exists(localpath): + os.makedirs(localpath) + for fn in conn.modules.os.listdir(remotepath): + lfn = os.path.join(localpath, fn) + rfn = conn.modules.os.path.join(remotepath, fn) + if conn.modules.os.path.isdir(lfn): + download_dir(conn, rfn, lfn, extensions) + elif conn.modules.os.path.isfile(lfn): + for ext in extensions: + if fn.endswith(ext): + download_file(conn, rfn, lfn) + break + +# +# distributing modules between hosts +# +def upload_package(conn, module, remotepath = None): + """uploads the given package to the server, storing it in `remotepath`. if + remotepath is None, it defaults to the server's site-packages. if the package + already exists, it is overwritten. + usage: + import xml + upload_package(conn, xml)""" + if remotepath is None: + remotepath = conn.modules["distutils.sysconfig"].get_python_lib() + localpath = os.path.dirname(module.__file__) + upload_dir(conn, localpath, remotepath, [".py", ".pyd", ".dll", ".so", ".zip"]) + +def update_module(conn, module): + """updates an existing module on the server. 
the local module is transferred to the server, overwriting the old one, and is reloaded. + usage: + import xml.dom.minidom + update_module(conn, xml.dom.minidom)""" + remote_module = conn.modules[module.__name__] + local_file = inspect.getsourcefile(module) + remote_file = inspect.getsourcefile(remote_module) + upload_file(conn, local_file, remote_file) + reload(remote_module) + +# +# remote shell and interpreter +# +def _redirect_std(conn): + rsys = conn.modules.sys + orig = (rsys.stdin, rsys.stdout, rsys.stderr) + rsys.stdin = sys.stdin + rsys.stdout = sys.stdout + rsys.stderr = sys.stderr + return orig + +def _restore_std(conn, (stdin, stdout, stderr)): + rsys = conn.modules.sys + rsys.stdin = stdin + rsys.stdout = stdout + rsys.stderr = stderr + +def remote_shell(conn, command = None): + """runs the given command on the server, just like os.system, but takes care + of redirecting the server's stdout/stdin to the client""" + # BUG: on windows, there's a problem with redirecting the output of spawned command. + # it runs fine and all, just the client can't see the output. again, windows sucks.
+ if command is None: + if sys.platform == "win32": + command = "%COMSPEC%" + else: + command = "/bin/sh" + try: + orig = _redirect_std(conn) + return conn.modules.os.system(command) + finally: + _restore_std(conn, orig) + +def remote_interpreter(conn, namespace = None): + """starts an interactive interpreter on the server""" + if namespace is None: + #namespace = inspect.stack()[1][0].f_globals.copy() + #namespace.update(inspect.stack()[1][0].f_locals) + namespace = {"conn" : conn} + try: + orig = _redirect_std(conn) + return conn.modules["Rpyc.Utils"]._remote_interpreter_server_side(**namespace) + finally: + _restore_std(conn, orig) + +def _remote_interpreter_server_side(**namespace): + import code + namespace.update(globals()) + code.interact(local = namespace) + +def remote_post_mortem(conn): + """a version of pdb.pm() that operates on exceptions at the remote side of the connection""" + import pdb + pdb.post_mortem(conn.modules.sys.last_traceback) + + + + + diff --git a/Rpyc/__init__.py b/Rpyc/__init__.py new file mode 100644 index 0000000..41784bb --- /dev/null +++ b/Rpyc/__init__.py @@ -0,0 +1,10 @@ +""" +RPyC +by tomer filiba (tomerfiliba at gmail dot com) +""" +from Factories import * +from Utils import * +from Factories import __all__ as Factories_exports +from Utils import __all__ as Utils_exports + +__all__ = Factories_exports + Utils_exports diff --git a/Rpyc/changelog.txt b/Rpyc/changelog.txt new file mode 100644 index 0000000..698b93e --- /dev/null +++ b/Rpyc/changelog.txt @@ -0,0 +1,129 @@ +1.2: +----- +the first 1.XX release. supports synchronous operations + +1.21: +----- +bugfix release: fixed the problem with objects that don't have a __repr__ or +__str__ + +1.6: +----- +this version never came public. added thread synchronization and events, +which allow the server to inform the client of events, without blocking +for client response. + +2.2: +----- +non backwards-compatible!
+first 2.XX release, added asynchronous proxies and thread synchronization
+this has brought with it a new kind of server -- the threaded_server,
+which performs well on both linux and windows. the older 'events' mechanism
+was removed as asynchronous proxies are much better, and also allow
+distributed computing.
+also, added the Utils module, which provides many convenience functions.
+in version 1.XX, i just overrode __builtin__.xxx, which was something
+you might not have wanted. so now you do "from Utils import *"
+also: revised demos
+note: the selecting and simple servers are deprecated, and are there only
+for systems that don't support threads (some older flavors of unix).
+
+known bugs:
+ * windows bug: the pipe parent/child don't work on windows
+
+2.22:
+-----
+some bug fixes to the servers, etc.
+the selecting server turned out buggy. don't use it.
+added a new demo
+
+known bugs:
+ * the selecting server
+ * windows bug: the Utils.remote_shell doesn't redirect the output of the
+spawned command.
+
+2.25:
+-----
+fixed the selecting server
+fixed a bug in download (the problem with copy-paste). thanks go to steff.
+added two new utils: upload_package and update_module. they allow you to
+upload packages to the server, and update an existing module. i don't think
+they are very useful, but what the heck.
+
+2.26:
+-----
+fixed a bug when the remote object does not provide __nonzero__
+
+2.30:
+-----
+fixed several minor bugs (mostly semantic)
+added protection for installing excepthook more than once
+added the `obtain` function, which "brings forth" a remote object
+added remote_post_mortem (debugging the traceback of a remote exception)
+added an optional callback for Async proxies (which is called when they are ready)
+therefore, the AsyncCallback mechanism was removed.
+changed demo-3 and added demo-5.
+fixed a bug: __del__ should not be synchronous
+connection objects now have a `remote_conn` property, letting you mess around with
+the remote side of the connection
+
+2.31:
+-----
+when you close() a connection, it releases all resources as well (this is done to
+untangle cyclic-dependencies and make the gc happy)
+thanks to that, servers now release resources better and sooner
+
+2.32:
+-----
+fixed a bug in __all__ of factories.py
+removed all the stuff from __init__.py (just useless)
+cleanups
+semantic corrections
+
+2.35:
+-----
+fixed a potential bug in Connection.close
+converted FullyDyanmicMetaClass to a function (instead of a class). metaclasses are
+too magical by themselves, so i didn't want to over-do it. this caused trouble with
+__doc__ of netproxies, but i found a way around it.
+added docstrings throughout the code
+updated the servers
+i will add an ssl-enabled server when i have time
+w00t! rpyc is getting very stable (no real bugs)
+
+2.36:
+-----
+added 'threaded_server' to ServerUtils. it's a mix-in you can use wherever you
+like, instead of writing a server by yourself.
+improved the logging mechanism of ServerUtils
+
+2.40:
+-----
+added imports to __init__.py, so you can now do "from Rpyc import *". this
+is backwards compatible however, "from Rpyc.Factories import SocketConnection"
+still works.
+cleaned a little the __all__ of Utils.py
+new feature: added 'execute' and 'namespace'. 'execute' lets you execute code
+on the remote side of the connection, and 'namespace' is the namespace in which
+'execute' evaluates.
+added demo-6.py to show how to use it
+fixed demo-2.py (now that remote_interpreter isn't a part of Utils.__all__)
+
+2.45:
+-----
+cleanups: improved the unboxing of ImmDicts, some other minor things
+bugfix: PipeStream.write expected write to return the number of bytes written,
+as is the case with sockets. this is not the case with file objects, however,
+which meant the operation blocked indefinitely. 
thanks to rotem yaari for
+reporting the bug
+this also solves a long-standing bug with windows: the pipe demo now works with
+windows. i had to stub stream.wait() and stream.is_available() on windows,
+so it's less efficient and Async wouldn't work properly, but that's life.
+changed a little the semantics of Stream and Channel
+added authentication: this will make several people happy. the auth_server
+was added, which supports authentication, so you can run the server over the
+internet. only authentication is added, not encryption. you are still encouraged
+to use SSL/VPNs. this was added because many people had trouble with python SSL.
+
+
+
-- 
2.43.0