server/client fixes on:
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 19 Apr 2011 09:33:08 +0000 (11:33 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 19 Apr 2011 09:33:08 +0000 (11:33 +0200)
 * error handling
 * no zombie processes left behind
 * logging on test failures

src/nepi/util/server.py
test/util/server.py

index 975b01c..282c074 100644 (file)
@@ -12,6 +12,7 @@ import subprocess
 import threading
 import time
 import traceback
+import signal
 
 CTRL_SOCK = "ctrl.sock"
 STD_ERR = "stderr.log"
@@ -116,7 +117,7 @@ class Server(object):
         while not self._stop:
             conn, addr = self._ctrl_sock.accept()
             conn.settimeout(5)
-            while True:
+            while not self._stop:
                 try:
                     msg = self.recv_msg(conn)
                 except socket.timeout, e:
@@ -127,8 +128,18 @@ class Server(object):
                     reply = self.stop_action()
                 else:
                     reply = self.reply_action(msg)
-                self.send_reply(conn, reply)
-            conn.close()
+                
+                try:
+                    self.send_reply(conn, reply)
+                except socket.error:
+                    self.log_error()
+                    print >>sys.stderr, "NOTICE: Awaiting for reconnection"
+                    break
+            try:
+                conn.close()
+            except:
+                # Best-effort close: a failure here is harmless, so just log it
+                self.log_error()
 
     def recv_msg(self, conn):
         data = ""
@@ -140,8 +151,12 @@ class Server(object):
                     raise
                 if chunk == '':
                     continue
-            data += chunk
-            if chunk[-1] == "\n":
+            if chunk:
+                data += chunk
+                if chunk[-1] == "\n":
+                    break
+            else:
+                # empty chunk = EOF
                 break
         decoded = base64.b64decode(data)
         return decoded.rstrip()
@@ -163,11 +178,13 @@ class Server(object):
     def reply_action(self, msg):
         return "Reply to: %s" % msg
 
-    def log_error(self, text = None):
+    def log_error(self, text = None, context = ''):
         if text == None:
             text = traceback.format_exc()
         date = time.strftime("%Y-%m-%d %H:%M:%S")
-        sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
+        if context:
+            context = " (%s)" % (context,)
+        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
         return text
 
     def log_debug(self, text):
@@ -246,7 +263,13 @@ class Client(object):
         self.addr = (host, port)
         self.user = user
         self.agent = agent
+        self._stopped = False
         self.connect()
+    
+    def __del__(self):
+        if self._process.poll() is None:
+            os.kill(self._process.pid, signal.SIGTERM)
+        self._process.wait()
         
     def connect(self):
         root_dir = self.root_dir
@@ -284,6 +307,7 @@ class Client(object):
 
     def send_stop(self):
         self.send_msg(STOP_MSG)
+        self._stopped = True
 
     def read_reply(self):
         data = self._process.stdout.readline()
index e4155fc..f1bdd1c 100755 (executable)
@@ -20,10 +20,10 @@ class ServerTestCase(unittest.TestCase):
         c = server.Client(self.root_dir)
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
         c.send_stop()
         reply = c.read_reply()
-        self.assertTrue(reply == "Stopping server")
+        self.assertEqual(reply, "Stopping server")
 
     def test_server_reconnect(self):
         s = server.Server(self.root_dir)
@@ -32,7 +32,7 @@ class ServerTestCase(unittest.TestCase):
         
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
         
         # disconnect
         del c
@@ -41,11 +41,11 @@ class ServerTestCase(unittest.TestCase):
         c = server.Client(self.root_dir)
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
                 
         c.send_stop()
         reply = c.read_reply()
-        self.assertTrue(reply == "Stopping server")
+        self.assertEqual(reply, "Stopping server")
 
     def test_server_auto_reconnect(self):
         s = server.Server(self.root_dir)
@@ -54,7 +54,7 @@ class ServerTestCase(unittest.TestCase):
         
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
         
         # purposedly break the connection
         c._process.stdin.close()
@@ -64,11 +64,11 @@ class ServerTestCase(unittest.TestCase):
         # assert that the communication works (possible with auto-reconnection)
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
                 
         c.send_stop()
         reply = c.read_reply()
-        self.assertTrue(reply == "Stopping server")
+        self.assertEqual(reply, "Stopping server")
 
     def test_server_long_message(self):
         s = server.Server(self.root_dir)
@@ -77,10 +77,10 @@ class ServerTestCase(unittest.TestCase):
         msg = "1"*1145
         c.send_msg(msg)
         reply = c.read_reply()
-        self.assertTrue(reply == ("Reply to: "+msg))
+        self.assertEqual(reply, ("Reply to: "+msg))
         c.send_stop()
         reply = c.read_reply()
-        self.assertTrue(reply == "Stopping server")
+        self.assertEqual(reply, "Stopping server")
 
     def test_ssh_server(self):
         env = test_util.test_environment()
@@ -94,10 +94,10 @@ class ServerTestCase(unittest.TestCase):
                 user = user, agent = True)
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
         c.send_stop()
         reply = c.read_reply()
-        self.assertTrue(reply == "Stopping server")
+        self.assertEqual(reply, "Stopping server")
 
     def test_ssh_server_reconnect(self):
         env = test_util.test_environment()
@@ -113,7 +113,7 @@ class ServerTestCase(unittest.TestCase):
                 
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
         
         # disconnect
         del c
@@ -124,11 +124,11 @@ class ServerTestCase(unittest.TestCase):
                 
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
         
         c.send_stop()
         reply = c.read_reply()
-        self.assertTrue(reply == "Stopping server")
+        self.assertEqual(reply, "Stopping server")
 
     def test_ssh_server_auto_reconnect(self):
         env = test_util.test_environment()
@@ -144,7 +144,7 @@ class ServerTestCase(unittest.TestCase):
                 
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
         
         # purposedly break the connection
         c._process.stdin.close()
@@ -154,11 +154,11 @@ class ServerTestCase(unittest.TestCase):
         # assert that the communication works (possible with auto-reconnection)
         c.send_msg("Hola")
         reply = c.read_reply()
-        self.assertTrue(reply == "Reply to: Hola")
+        self.assertEqual(reply, "Reply to: Hola")
         
         c.send_stop()
         reply = c.read_reply()
-        self.assertTrue(reply == "Stopping server")
+        self.assertEqual(reply, "Stopping server")
 
     def tearDown(self):
         shutil.rmtree(self.root_dir)