if msg == STOP_MSG:
self._stop = True
- reply = "Stopping server"
+ reply = self.stop_action()
self.send_reply(conn, reply)
break
else:
- reply = "Replying to %s" % msg
+ reply = self.reply_action(msg)
self.send_reply(conn, reply)
conn.close()
except e:
sys.stderr.write("ERROR: %s\n" % str(e))
+ def stop_action(self):
+ return "Stopping server"
+
+ def reply_action(self, msg):
+ return "Reply to: %s" % msg
+
class Forwarder(object):
def __init__(self, root_dir = "."):
self._ctrl_sock = None
while not self._stop:
data = self.read_data()
self.send_to_server(data)
- reply = self.recv_from_server()
- self.write_reply(reply)
+ data = self.recv_from_server()
+ self.write_data(data)
self.disconnect()
def read_data(self):
return sys.stdin.readline()
- def write_reply(self, reply):
- sys.stdout.write("%s\n" % reply)
+ def write_data(self, data):
+ sys.stdout.write(data)
+ sys.stdout.flush()
def send_to_server(self, data):
try:
self._stop = True
def recv_from_server(self):
- data = self._ctrl_sock.recv(1024)
- encoded = data.rstrip()
- reply = base64.b64decode(encoded)
- return reply
+ return self._ctrl_sock.recv(1024)
def connect(self):
self.disconnect()
c.forward()" % root_dir
],
stdin = subprocess.PIPE,
+ stdout = subprocess.PIPE,
env = os.environ)
def send_msg(self, msg):
encoded = base64.b64encode(msg)
- self._process.stdin.write("%s\n" % encoded)
+ data = "%s\n" % encoded
+ self._process.stdin.write(data)
def send_stop(self):
self.send_msg(STOP_MSG)
- self._process.wait()
+
+ def read_reply(self):
+ data = self._process.stdout.readline()
+ encoded = data.rstrip()
+ return base64.b64decode(encoded)
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from nepi.util import server
+import os
+import shutil
+import sys
+import unittest
+import uuid
+
+class ServerTestCase(unittest.TestCase):
+ def setUp(self):
+ self._root_dir = os.path.join(os.getenv("HOME"), ".nepi",
+ str(uuid.uuid1()))
+ os.makedirs(self._root_dir)
+
+ def test_server(self):
+ s = server.Server(self._root_dir)
+ s.run()
+ c = server.Client(self._root_dir)
+ c.send_msg("Hola")
+ reply = c.read_reply()
+ self.assertTrue(reply == "Reply to: Hola")
+ c.send_stop()
+ reply = c.read_reply()
+ self.assertTrue(reply == "Stopping server")
+
+ def tearDown(self):
+ shutil.rmtree(self._root_dir)
+
+if __name__ == '__main__':
+ unittest.main()
+