clearer names for actions, and infer actions better
[monitor.git] / monitor / Rpyc / Stream.py
1 import select\r
2 import socket\r
3 \r
4 \r
5 class Stream(object):\r
6     """\r
7     a stream is a file-like object that is used to expose a consistent and uniform interface\r
8     to the 'physical' file-like objects (like sockets and pipes), which have many quirks (sockets\r
9     may recv() less than `count`, pipes are simplex and don't flush, etc.).\r
10     a stream is always in blocking mode.\r
11     """\r
12     \r
13     def close(self):\r
14         raise NotImplementedError()\r
15 \r
16     def fileno(self):\r
17         raise NotImplementedError()\r
18 \r
19     def is_available(self):\r
20         rlist, wlist, xlist = select.select([self], [], [], 0)\r
21         return bool(rlist)\r
22 \r
23     def wait(self):\r
24         select.select([self], [], [])\r
25 \r
26     def read(self, count):\r
27         raise NotImplementedError()\r
28 \r
29     def write(self, data):\r
30         raise NotImplementedError()\r
31         \r
32         \r
33 class SocketStream(Stream):\r
34     """\r
35     a stream that operates over a socket. note: \r
36         * the socket is expected to be reliable (i.e., TCP)\r
37         * the socket is expected to be in blocking mode\r
38     """\r
39     def __init__(self, sock):\r
40         self.sock = sock\r
41     \r
42     def __repr__(self):\r
43         host, port = self.sock.getpeername()\r
44         return "<%s(%s:%d)>" % (self.__class__.__name__, host, port)\r
45 \r
46     def from_new_socket(cls, host, port, **kw):\r
47         sock = socket.socket(**kw)\r
48         sock.connect((host, port))\r
49         return cls(sock)\r
50     from_new_socket = classmethod( from_new_socket )\r
51 \r
52     def fileno(self):\r
53         return self.sock.fileno()\r
54         \r
55     def close(self):\r
56         self.sock.close()\r
57         \r
58     def read(self, count):\r
59         data = []\r
60         while count > 0:\r
61             buf = self.sock.recv(count)\r
62             if not buf:\r
63                 raise EOFError()\r
64             count -= len(buf)\r
65             data.append(buf)\r
66         return "".join(data)\r
67             \r
68     def write(self, data):\r
69         while data:\r
70             count = self.sock.send(data)\r
71             data = data[count:]\r
72 \r
73 \r
74 class PipeStream(Stream):\r
75     """\r
76     a stream that operates over two simplex pipes. \r
77     note: the pipes are expected to be in blocking mode\r
78     """\r
79     \r
80     def __init__(self, incoming, outgoing):\r
81         self.incoming = incoming\r
82         self.outgoing = outgoing\r
83 \r
84     def fileno(self):\r
85         return self.incoming.fileno()\r
86         \r
87     def close(self):\r
88         self.incoming.close()\r
89         self.outgoing.close()\r
90         \r
91     def read(self, count):\r
92         data = []\r
93         while count > 0:\r
94             buf = self.incoming.read(count)\r
95             if not buf:\r
96                 raise EOFError()\r
97             count -= len(buf)\r
98             data.append(buf)\r
99         return "".join(data)\r
100             \r
101     def write(self, data):\r
102         self.outgoing.write(data)\r
103         self.outgoing.flush()\r
104 \r
105     # win32: stubs\r
106     import sys\r
107     if sys.platform == "win32":\r
108         def is_available(self):\r
109             return True\r
110 \r
111         def wait(self):\r
112             pass\r
113 \r
114 \r
115 \r
116 \r
117 \r