Update the traces and add some tests and examples
authorJulien Tribino <julien.tribino@inria.fr>
Tue, 5 Aug 2014 16:18:55 +0000 (18:18 +0200)
committerJulien Tribino <julien.tribino@inria.fr>
Tue, 5 Aug 2014 16:18:55 +0000 (18:18 +0200)
examples/omf/nepi_omf6_plexus_ping.py
examples/omf/nepi_omf6_plexus_ping_with_traces.py [new file with mode: 0644]
src/nepi/resources/omf/application.py
src/nepi/resources/omf/node.py
src/nepi/resources/omf/omf6_api.py
src/nepi/resources/omf/omf6_parser.py
test/resources/omf/omf6_vlc_traces.py [new file with mode: 0755]

index 6674c21..96ac2ef 100644 (file)
@@ -63,11 +63,11 @@ ec.set(chan, 'channel', "6")
 
 # Create and Configure the Application
 app1 = ec.register_resource("OMFApplication")
-ec.set(app1, 'command', '/bin/ping 192.168.0.49')
+ec.set(app1, 'command', '/bin/ping -c5 192.168.0.49')
 ec.set(app1, 'env', "")
 
 app2 = ec.register_resource("OMFApplication")
-ec.set(app2, 'command', '/bin/ping 192.168.0.12')
+ec.set(app2, 'command', '/bin/ping -c5 192.168.0.12')
 ec.set(app2, 'env', "")
 
 
diff --git a/examples/omf/nepi_omf6_plexus_ping_with_traces.py b/examples/omf/nepi_omf6_plexus_ping_with_traces.py
new file mode 100644 (file)
index 0000000..1e76917
--- /dev/null
@@ -0,0 +1,107 @@
+"""
+    NEPI, a framework to manage network experiments
+    Copyright (C) 2013 INRIA
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+    Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+            Julien Tribino <julien.tribino@inria.fr>
+
+
+"""
+
+#!/usr/bin/env python
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
+
+# Create the EC
+ec = ExperimentController()
+
+# Create and Configure the Nodes
+
+node1 = ec.register_resource("OMFNode")
+ec.set(node1, 'hostname', 'wlab12')
+ec.set(node1, 'xmppServer', "xmpp-plexus.onelab.eu")
+ec.set(node1, 'xmppUser', "nepi")
+ec.set(node1, 'xmppPort', "5222")
+ec.set(node1, 'xmppPassword', "1234")
+
+iface1 = ec.register_resource("OMFWifiInterface")
+ec.set(iface1, 'name', 'wlan0')
+ec.set(iface1, 'mode', "adhoc")
+ec.set(iface1, 'hw_mode', "g")
+ec.set(iface1, 'essid', "ping")
+ec.set(iface1, 'ip', "192.168.0.12/24")
+
+node2 = ec.register_resource("OMFNode")
+ec.set(node2, 'hostname', 'wlab49')
+ec.set(node2, 'xmppServer', "xmpp-plexus.onelab.eu")
+ec.set(node2, 'xmppUser', "nepi")
+ec.set(node2, 'xmppPort', "5222")
+ec.set(node2, 'xmppPassword', "1234")
+
+iface2 = ec.register_resource("OMFWifiInterface")
+ec.set(iface2, 'name', 'wlan0')
+ec.set(iface2, 'mode', "adhoc")
+ec.set(iface2, 'hw_mode', "g")
+ec.set(iface2, 'essid', "ping")
+ec.set(iface2, 'ip', "192.168.0.49/24")
+
+chan = ec.register_resource("OMFChannel")
+ec.set(chan, 'channel', "6")
+
+# Create and Configure the Application
+app1 = ec.register_resource("OMFApplication")
+ec.set(app1, 'command', '/bin/ping -c5 192.168.0.49')
+ec.set(app1, 'env', "")
+
+app2 = ec.register_resource("OMFApplication")
+ec.set(app2, 'command', '/bin/ping -c5 192.168.0.12')
+ec.set(app2, 'env', "")
+
+
+# Connection
+ec.register_connection(iface1, node1)
+ec.register_connection(iface2, node2)
+ec.register_connection(iface1, chan)
+ec.register_connection(iface2, chan)
+ec.register_connection(app1, node1)
+ec.register_connection(app2, node2)
+
+ec.register_condition([app2], ResourceAction.START, app1, ResourceState.STARTED , "2s")
+ec.register_condition([app1,app2], ResourceAction.STOP, app2, ResourceState.STARTED , "10s")
+
+
+# Deploy
+ec.deploy()
+
+ec.wait_finished([app1,app2])
+
+stdout_1 = ec.trace(app1, "stdout")
+stdout_2 = ec.trace(app2, "stdout")
+
+# Choose a directory to store the traces, by default
+# It it the folder ehere you run Nepi.
+f = open("app1.txt", "w")
+f.write(stdout_1)
+f.close()
+
+g = open("app2.txt", "w")
+g.write(stdout_2)
+g.close()
+
+# Stop Experiment
+ec.shutdown()
+
index 6af1d89..7a87d2a 100644 (file)
@@ -23,6 +23,7 @@ import os
 from nepi.util.timefuncs import tnow
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
         ResourceState, reschedule_delay
+from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
@@ -268,43 +269,30 @@ class OMFApplication(OMFResource):
             return uid
         return False
 
-    def trace_filepath(self, filename):
-        return os.path.join('~/', filename)
-
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
+        if name == 'stdout' :
+            suffix = '.out'
+        elif name == 'stderr' :
+            suffix = '.err'
+        else :
+            suffix = '.misc'
 
-        path = self.trace_filepath(str(self.guid) + '_' + name)
-        
-        command = "(test -f %s && echo 'success') || echo 'error'" % path
-        (out, err), proc = self.node.execute(command)
+        trace_path = '/tmp/'+ self._topic_app + suffix
 
-        if (err and proc.poll()) or out.find("error") != -1:
-            msg = " Couldn't find trace %s " % name
-            self.error(msg, out, err)
-            return None
-    
         if attr == TraceAttr.PATH:
-            return path
+            return trace_path
 
         if attr == TraceAttr.ALL:
-            (out, err), proc = self.node.check_output(self.run_home, name)
-            
-            if proc.poll():
-                msg = " Couldn't read trace %s " % name
-                self.error(msg, out, err)
-                return None
-
-            return out
-
+            try:
+                f = open(trace_path ,'r')
+            except IOError:
+                print "File with traces has not been found"
+                return False
+            out = f.read()
+            f.close()
         return out
 
-    def check_output(self, home, filename):
-        """ Retrives content of file """
-        (out, err), proc = self.execute("cat %s" % 
-            os.path.join(home, filename), retry = 1, with_lock = True)
-        return (out, err), proc
-
 
     def do_start(self):
         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
@@ -373,6 +361,8 @@ class OMFApplication(OMFResource):
         State is set to STOPPED after the message is sent.
 
         """
+
+
         if self.get('version') == 5:
             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
         super(OMFApplication, self).do_stop()
@@ -415,6 +405,12 @@ class OMFApplication(OMFResource):
                     msg = "Couldn't retrieve the confirmation of the release"
                     self.error(msg)
 
+                # Remove the stdout and stderr of the application
+                try:
+                    os.remove('/tmp/'+self._topic_app +'.out')
+                    os.remove('/tmp/'+self._topic_app +'.err')
+                except OSError:
+                    pass
 
             OMFAPIFactory.release_api(self.get('version'), 
               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
index 74f1775..6ac01de 100644 (file)
@@ -28,7 +28,7 @@ from nepi.resources.omf.omf_api_factory import OMFAPIFactory
 import time
 
 reschedule_check = "0.2s"
-confirmation_counter = 600
+confirmation_counter = 3000
 
 @clsinit_copy
 class OMFNode(OMFResource):
index 352a18e..f3574a2 100644 (file)
@@ -249,6 +249,9 @@ class OMF6API(Logger):
         """ Delete the session and logger topics. Then disconnect 
 
         """
+        # To receive the last messages
+        time.sleep(2)
+
         self._client.delete(self._nepi_topic)
        
         # Wait the send queue to be empty before disconnect
index 4ea5717..c3b37a8 100644 (file)
@@ -51,9 +51,12 @@ class OMF6Parser(Logger):
         """
         super(OMF6Parser, self).__init__("OMF6API")
         self.mailbox={}
+        self.traces={}
+        self.trace='NULL'
 
         self.init_mailbox()
 
+
     def init_mailbox(self):
         self.mailbox['create'] = []
         self.mailbox['started'] = []
@@ -169,18 +172,40 @@ class OMF6Parser(Logger):
         """
         props = self._check_for_props(root, namespaces)
         uid = self._check_for_tag(root, namespaces, "uid")
-        msg = "STATUS -- "
+        event = self._check_for_tag(root, namespaces, "event")
+
+        if event == "STDOUT":
+            if not uid+'out' in self.traces:
+                f = open('/tmp/'+ uid + '.out','w')
+                self.traces[uid+'out'] = f
+            self.trace = self.traces[uid+'out']
+        elif event == "STDERR" :
+            if not uid+'err' in self.traces:
+                g = open('/tmp/'+ uid + '.err','w')
+                self.traces[uid+'err'] = g
+            self.trace = self.traces[uid+'err']
+        elif event == "EXIT" :
+            if uid+'out' in self.traces:
+                self.traces[uid+'out'].close()
+            if uid+'err' in self.traces:
+                self.traces[uid+'err'].close()
+            
+        log = "STATUS -- "
         for elt in props.keys():
             ns, tag = elt.split('}')
             if tag == "it":
-                msg = msg + "membership : " + props[elt]+" -- "
+                log = log + "membership : " + props[elt]+" -- "
             elif tag == "event":
                 self.mailbox['started'].append(uid)
-                msg = msg + "event : " + props[elt]+" -- "
+                log = log + "event : " + props[elt]+" -- "
+            elif tag == "msg":
+                if event == "STDOUT" or event == "STDERR" :
+                    self.trace.write(props[elt]+'\n')
+                log = log + tag +" : " + props[elt]+" -- "
             else:
-                msg = msg + tag +" : " + props[elt]+" -- "
-        msg = msg + " STATUS "
-        self.info(msg)
+                log = log + tag +" : " + props[elt]+" -- "
+        log = log + " STATUS "
+        self.info(log)
 
     def _inform_released(self, root, namespaces):
         """ Parse and Display RELEASED message
diff --git a/test/resources/omf/omf6_vlc_traces.py b/test/resources/omf/omf6_vlc_traces.py
new file mode 100755 (executable)
index 0000000..e06fa33
--- /dev/null
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Julien Tribino <julien.tribino@inria.fr>
+
+
+from nepi.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
+
+from nepi.resources.omf.node import OMFNode
+from nepi.resources.omf.application import OMFApplication
+from nepi.resources.omf.interface import OMFWifiInterface
+from nepi.resources.omf.channel import OMFChannel
+from nepi.resources.omf.omf_api_factory import OMFAPIFactory
+
+from nepi.util.timefuncs import *
+
+import os
+import time
+import unittest
+
+class OMFPingNormalCase(unittest.TestCase):
+    def test_deploy(self):
+        ec = ExperimentController(exp_id = "5421" )
+
+        self.node1 = ec.register_resource("OMFNode")
+        ec.set(self.node1, 'hostname', 'wlab12')
+        ec.set(self.node1, 'xmppUser', "nepi")
+        ec.set(self.node1, 'xmppServer', "xmpp-plexus.onelab.eu")
+        ec.set(self.node1, 'xmppPort', "5222")
+        ec.set(self.node1, 'xmppPassword', "1234")
+        
+        self.iface1 = ec.register_resource("OMFWifiInterface")
+        ec.set(self.iface1, 'name', "wlan0")
+        ec.set(self.iface1, 'mode', "adhoc")
+        ec.set(self.iface1, 'hw_mode', "g")
+        ec.set(self.iface1, 'essid', "vlcexp")
+        ec.set(self.iface1, 'ip', "10.0.0.17/24")
+        
+        self.channel = ec.register_resource("OMFChannel")
+        ec.set(self.channel, 'channel', "6")
+        ec.set(self.channel, 'xmppUser', "nepi")
+        ec.set(self.channel, 'xmppServer', "xmpp-plexus.onelab.eu")
+        ec.set(self.channel, 'xmppPort', "5222")
+        ec.set(self.channel, 'xmppPassword', "1234")
+        
+        self.app1 = ec.register_resource("OMFApplication")
+        ec.set(self.app1, 'appid', 'Vlc#1')
+        ec.set(self.app1, 'command', "ping -c5 10.0.0.17")
+
+        ec.register_connection(self.app1, self.node1)
+        ec.register_connection(self.node1, self.iface1)
+        ec.register_connection(self.iface1, self.channel)
+
+        ec.register_condition(self.app1, ResourceAction.STOP, self.app1, ResourceState.STARTED , "10s")
+
+        ec.deploy()
+
+        ec.wait_finished(self.app1)
+
+        stdout_1 = ec.trace(self.app1, "stdout")
+        stderr_1 = ec.trace(self.app1, "stderr")
+
+        if stdout_1:
+            f = open("app1_out.txt", "w")
+            f.write(stdout_1)
+            f.close()
+
+        if stderr_1:
+            f = open("app1_err.txt", "w")
+            f.write(stderr_1)
+            f.close()
+
+        self.assertEquals(ec.get_resource(self.node1).state, ResourceState.STARTED)
+        self.assertEquals(ec.get_resource(self.iface1).state, ResourceState.STARTED)
+        self.assertEquals(ec.get_resource(self.channel).state, ResourceState.STARTED)
+        self.assertEquals(ec.get_resource(self.app1).state, ResourceState.STOPPED)
+
+        ec.shutdown()
+
+        self.assertEquals(ec.get_resource(self.node1).state, ResourceState.RELEASED)
+        self.assertEquals(ec.get_resource(self.iface1).state, ResourceState.RELEASED)
+        self.assertEquals(ec.get_resource(self.channel).state, ResourceState.RELEASED)
+        self.assertEquals(ec.get_resource(self.app1).state, ResourceState.RELEASED)
+
+        t = open("app1_out.txt", "r")
+        l = t.readlines()
+        self.assertEquals(l[0], "PING 10.0.0.17 (10.0.0.17) 56(84) bytes of data.\n")
+        self.assertIn("5 packets transmitted, 5 received, 0% packet loss, time", l[-2])
+        self.assertIn("rtt min/avg/max/mdev = ", l[-1])
+        
+        t.close()
+        os.remove("app1_out.txt")
+        
+
+
+if __name__ == '__main__':
+    unittest.main()
+
+
+