Merge the nepi-omf6-perf branch with the last update (traces, ...)
authorJulien Tribino <julien.tribino@inria.fr>
Mon, 22 Sep 2014 12:01:15 +0000 (14:01 +0200)
committerJulien Tribino <julien.tribino@inria.fr>
Mon, 22 Sep 2014 12:01:15 +0000 (14:01 +0200)
examples/omf/nepi_omf6_plexus_ping.py
examples/omf/nepi_omf6_plexus_ping_with_traces.py [new file with mode: 0644]
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/omf/omf6_api.py
src/nepi/resources/omf/omf6_parser.py
test/resources/omf/omf6_vlc_traces.py [new file with mode: 0755]

index 6674c21..96ac2ef 100644 (file)
@@ -63,11 +63,11 @@ ec.set(chan, 'channel', "6")
 
 # Create and Configure the Application
 app1 = ec.register_resource("OMFApplication")
-ec.set(app1, 'command', '/bin/ping 192.168.0.49')
+ec.set(app1, 'command', '/bin/ping -c5 192.168.0.49')
 ec.set(app1, 'env', "")
 
 app2 = ec.register_resource("OMFApplication")
-ec.set(app2, 'command', '/bin/ping 192.168.0.12')
+ec.set(app2, 'command', '/bin/ping -c5 192.168.0.12')
 ec.set(app2, 'env', "")
 
 
diff --git a/examples/omf/nepi_omf6_plexus_ping_with_traces.py b/examples/omf/nepi_omf6_plexus_ping_with_traces.py
new file mode 100644 (file)
index 0000000..1e76917
--- /dev/null
@@ -0,0 +1,107 @@
+"""
+    NEPI, a framework to manage network experiments
+    Copyright (C) 2013 INRIA
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+    Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+            Julien Tribino <julien.tribino@inria.fr>
+
+
+"""
+
+#!/usr/bin/env python
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
+
+# Create the EC
+ec = ExperimentController()
+
+# Create and Configure the Nodes
+
+node1 = ec.register_resource("OMFNode")
+ec.set(node1, 'hostname', 'wlab12')
+ec.set(node1, 'xmppServer', "xmpp-plexus.onelab.eu")
+ec.set(node1, 'xmppUser', "nepi")
+ec.set(node1, 'xmppPort', "5222")
+ec.set(node1, 'xmppPassword', "1234")
+
+iface1 = ec.register_resource("OMFWifiInterface")
+ec.set(iface1, 'name', 'wlan0')
+ec.set(iface1, 'mode', "adhoc")
+ec.set(iface1, 'hw_mode', "g")
+ec.set(iface1, 'essid', "ping")
+ec.set(iface1, 'ip', "192.168.0.12/24")
+
+node2 = ec.register_resource("OMFNode")
+ec.set(node2, 'hostname', 'wlab49')
+ec.set(node2, 'xmppServer', "xmpp-plexus.onelab.eu")
+ec.set(node2, 'xmppUser', "nepi")
+ec.set(node2, 'xmppPort', "5222")
+ec.set(node2, 'xmppPassword', "1234")
+
+iface2 = ec.register_resource("OMFWifiInterface")
+ec.set(iface2, 'name', 'wlan0')
+ec.set(iface2, 'mode', "adhoc")
+ec.set(iface2, 'hw_mode', "g")
+ec.set(iface2, 'essid', "ping")
+ec.set(iface2, 'ip', "192.168.0.49/24")
+
+chan = ec.register_resource("OMFChannel")
+ec.set(chan, 'channel', "6")
+
+# Create and Configure the Application
+app1 = ec.register_resource("OMFApplication")
+ec.set(app1, 'command', '/bin/ping -c5 192.168.0.49')
+ec.set(app1, 'env', "")
+
+app2 = ec.register_resource("OMFApplication")
+ec.set(app2, 'command', '/bin/ping -c5 192.168.0.12')
+ec.set(app2, 'env', "")
+
+
+# Connection
+ec.register_connection(iface1, node1)
+ec.register_connection(iface2, node2)
+ec.register_connection(iface1, chan)
+ec.register_connection(iface2, chan)
+ec.register_connection(app1, node1)
+ec.register_connection(app2, node2)
+
+ec.register_condition([app2], ResourceAction.START, app1, ResourceState.STARTED , "2s")
+ec.register_condition([app1,app2], ResourceAction.STOP, app2, ResourceState.STARTED , "10s")
+
+
+# Deploy
+ec.deploy()
+
+ec.wait_finished([app1,app2])
+
+stdout_1 = ec.trace(app1, "stdout")
+stdout_2 = ec.trace(app2, "stdout")
+
+# Choose a directory to store the traces, by default
+# It it the folder ehere you run Nepi.
+f = open("app1.txt", "w")
+f.write(stdout_1)
+f.close()
+
+g = open("app2.txt", "w")
+g.write(stdout_2)
+g.close()
+
+# Stop Experiment
+ec.shutdown()
+
index 55ca910..7a87d2a 100644 (file)
 
 import os
 
+from nepi.util.timefuncs import tnow
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
         ResourceState, reschedule_delay
+from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
@@ -97,6 +99,14 @@ class OMFApplication(OMFResource):
         self.release_id = None
         self._release_cnt = 0
 
+        # For performance tests
+        self.begin_deploy_time = None
+        self.begin_start_time = None
+        self.begin_release_time = None
+        self.dperf = True
+        self.sperf = True
+        self.rperf = True
+
         self.add_set_hook()
 
     def _init_command(self):
@@ -175,6 +185,11 @@ class OMFApplication(OMFResource):
             self.ec.schedule(reschedule_delay, self.deploy)
             return
 
+        ## For performance test
+        if self.dperf:
+            self.begin_deploy_time = tnow()
+            self.dperf = False
+
         self._init_command()
 
         self.set('xmppUser',self.node.get('xmppUser'))
@@ -204,6 +219,9 @@ class OMFApplication(OMFResource):
                self.get('xmppPassword'), exp_id = self.exp_id)
 
         if self.get('version') == "5":
+
+            self.begin_deploy_time = tnow()
+
             if self.get('sources'):
                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
                 user = self.get('sshUser') or self.get('xmppUser')
@@ -251,16 +269,46 @@ class OMFApplication(OMFResource):
             return uid
         return False
 
+    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        self.info("Retrieving '%s' trace %s " % (name, attr))
+        if name == 'stdout' :
+            suffix = '.out'
+        elif name == 'stderr' :
+            suffix = '.err'
+        else :
+            suffix = '.misc'
+
+        trace_path = '/tmp/'+ self._topic_app + suffix
+
+        if attr == TraceAttr.PATH:
+            return trace_path
+
+        if attr == TraceAttr.ALL:
+            try:
+                f = open(trace_path ,'r')
+            except IOError:
+                print "File with traces has not been found"
+                return False
+            out = f.read()
+            f.close()
+        return out
+
+
     def do_start(self):
         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
          to execute the application. 
 
         """
+        ## For performance test
+        if self.sperf:
+            self.begin_start_time = tnow()
+            self.sperf = False
 
         if not self.get('env'):
             self.set('env', " ")
 
         if self.get('version') == "5":
+            self.begin_start_time = tnow()
             # Some information to check the command for OMF5
             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
                 self.get('appid') + " : " + self._path + " : " + \
@@ -313,6 +361,8 @@ class OMFApplication(OMFResource):
         State is set to STOPPED after the message is sent.
 
         """
+
+
         if self.get('version') == 5:
             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
         super(OMFApplication, self).do_stop()
@@ -334,6 +384,11 @@ class OMFApplication(OMFResource):
         """ Clean the RM at the end of the experiment and release the API.
 
         """
+        ## For performance test
+        if self.rperf:
+            self.begin_release_time = tnow()
+            self.rperf = False
+
         if self._omf_api:
             if self.get('version') == "6" and self._topic_app:
                 if not self.release_id:
@@ -350,6 +405,12 @@ class OMFApplication(OMFResource):
                     msg = "Couldn't retrieve the confirmation of the release"
                     self.error(msg)
 
+                # Remove the stdout and stderr of the application
+                try:
+                    os.remove('/tmp/'+self._topic_app +'.out')
+                    os.remove('/tmp/'+self._topic_app +'.err')
+                except OSError:
+                    pass
 
             OMFAPIFactory.release_api(self.get('version'), 
               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
index 3abbc14..7cda160 100644 (file)
@@ -18,6 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
+from nepi.util.timefuncs import tnow
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
         ResourceState, reschedule_delay
 from nepi.execution.attribute import Attribute, Flags 
@@ -81,6 +82,11 @@ class OMFChannel(OMFResource):
 
         self._omf_api = None
 
+        # For performance tests
+        self.perf = True
+        self.begin_deploy_time = None
+
+
     @property
     def exp_id(self):
         return self.ec.exp_id
@@ -139,6 +145,12 @@ class OMFChannel(OMFResource):
         using OMF 5.4 or 6 protocol to configure the channel.
 
         """   
+
+      ## For performance test
+        if self.perf:
+            self.begin_deploy_time = tnow()
+            self.perf = False
+
         if not self.get('channel'):
             msg = "Channel's value is not initialized"
             self.error(msg)
@@ -149,7 +161,6 @@ class OMFChannel(OMFResource):
             super(OMFChannel, self).do_deploy()
             return
 
-
         if not self.get('xmppServer'):
             msg = "XmppServer is not initialzed. XMPP Connections impossible"
             self.error(msg)
@@ -167,8 +178,6 @@ class OMFChannel(OMFResource):
 
         self._nodes_guid = self._get_target(self._connections)
 
-
-
         if self._nodes_guid == "reschedule" :
             self.ec.schedule("1s", self.deploy)
         else:
index c8c6f87..cd09cc4 100644 (file)
@@ -19,6 +19,7 @@
 #         Julien Tribino <julien.tribino@inria.fr>
 
 import os, time
+from nepi.util.timefuncs import tnow
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
         ResourceState, reschedule_delay
 from nepi.execution.attribute import Attribute, Flags 
@@ -82,6 +83,9 @@ class OMFWifiInterface(OMFResource):
         self._omf_api = None
         self._type = ""
 
+        # For performance tests
+        self.perf = True
+        self.begin_deploy_time = None
 
     def valid_connection(self, guid):
         """ Check if the connection with the guid in parameter is possible. 
@@ -211,6 +215,11 @@ class OMFWifiInterface(OMFResource):
             self.ec.schedule(reschedule_delay, self.deploy)
             return
 
+        ## For performance test
+        if self.perf:
+            self.begin_deploy_time = tnow()
+            self.perf = False
+
         self.set('xmppUser',self.node.get('xmppUser'))
         self.set('xmppServer',self.node.get('xmppServer'))
         self.set('xmppPort',self.node.get('xmppPort'))
index dab9052..ce2f6c9 100644 (file)
@@ -18,6 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
+from nepi.util.timefuncs import tnow
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
         ResourceState, reschedule_delay
 from nepi.execution.attribute import Attribute, Flags 
@@ -26,8 +27,8 @@ from nepi.resources.omf.omf_api_factory import OMFAPIFactory
 
 import time
 
-reschedule_check = "0.2s"
-confirmation_counter = 600
+reschedule_check = "1s"
+confirmation_counter = 3600
 
 @clsinit_copy
 class OMFNode(OMFResource):
@@ -68,6 +69,11 @@ class OMFNode(OMFResource):
 
         self._omf_api = None 
 
+        # For performance tests
+        self.perf = True
+        self.begin_deploy_time = None
+
+
     @property
     def exp_id(self):
         return self.ec.exp_id
@@ -99,6 +105,11 @@ class OMFNode(OMFResource):
             to enroll the node into the experiment.
 
         """ 
+      ## For performance test
+        if self.perf:
+            self.begin_deploy_time = tnow()
+            self.perf = False
+
         if not self.get('xmppServer'):
             msg = "XmppServer is not initialzed. XMPP Connections impossible"
             self.error(msg)
index 352a18e..13a598a 100644 (file)
@@ -115,7 +115,7 @@ class OMF6API(Logger):
 
     def check_ready(self, xmpp):
         delay = 1.0
-        for i in xrange(10):
+        for i in xrange(15):
             if xmpp.ready:
                 break
             else:
@@ -249,6 +249,9 @@ class OMF6API(Logger):
         """ Delete the session and logger topics. Then disconnect 
 
         """
+        # To receive the last messages
+        time.sleep(2)
+
         self._client.delete(self._nepi_topic)
        
         # Wait the send queue to be empty before disconnect
index 4ea5717..c3b37a8 100644 (file)
@@ -51,9 +51,12 @@ class OMF6Parser(Logger):
         """
         super(OMF6Parser, self).__init__("OMF6API")
         self.mailbox={}
+        self.traces={}
+        self.trace='NULL'
 
         self.init_mailbox()
 
+
     def init_mailbox(self):
         self.mailbox['create'] = []
         self.mailbox['started'] = []
@@ -169,18 +172,40 @@ class OMF6Parser(Logger):
         """
         props = self._check_for_props(root, namespaces)
         uid = self._check_for_tag(root, namespaces, "uid")
-        msg = "STATUS -- "
+        event = self._check_for_tag(root, namespaces, "event")
+
+        if event == "STDOUT":
+            if not uid+'out' in self.traces:
+                f = open('/tmp/'+ uid + '.out','w')
+                self.traces[uid+'out'] = f
+            self.trace = self.traces[uid+'out']
+        elif event == "STDERR" :
+            if not uid+'err' in self.traces:
+                g = open('/tmp/'+ uid + '.err','w')
+                self.traces[uid+'err'] = g
+            self.trace = self.traces[uid+'err']
+        elif event == "EXIT" :
+            if uid+'out' in self.traces:
+                self.traces[uid+'out'].close()
+            if uid+'err' in self.traces:
+                self.traces[uid+'err'].close()
+            
+        log = "STATUS -- "
         for elt in props.keys():
             ns, tag = elt.split('}')
             if tag == "it":
-                msg = msg + "membership : " + props[elt]+" -- "
+                log = log + "membership : " + props[elt]+" -- "
             elif tag == "event":
                 self.mailbox['started'].append(uid)
-                msg = msg + "event : " + props[elt]+" -- "
+                log = log + "event : " + props[elt]+" -- "
+            elif tag == "msg":
+                if event == "STDOUT" or event == "STDERR" :
+                    self.trace.write(props[elt]+'\n')
+                log = log + tag +" : " + props[elt]+" -- "
             else:
-                msg = msg + tag +" : " + props[elt]+" -- "
-        msg = msg + " STATUS "
-        self.info(msg)
+                log = log + tag +" : " + props[elt]+" -- "
+        log = log + " STATUS "
+        self.info(log)
 
     def _inform_released(self, root, namespaces):
         """ Parse and Display RELEASED message
diff --git a/test/resources/omf/omf6_vlc_traces.py b/test/resources/omf/omf6_vlc_traces.py
new file mode 100755 (executable)
index 0000000..e06fa33
--- /dev/null
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Julien Tribino <julien.tribino@inria.fr>
+
+
+from nepi.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
+
+from nepi.resources.omf.node import OMFNode
+from nepi.resources.omf.application import OMFApplication
+from nepi.resources.omf.interface import OMFWifiInterface
+from nepi.resources.omf.channel import OMFChannel
+from nepi.resources.omf.omf_api_factory import OMFAPIFactory
+
+from nepi.util.timefuncs import *
+
+import os
+import time
+import unittest
+
+class OMFPingNormalCase(unittest.TestCase):
+    def test_deploy(self):
+        ec = ExperimentController(exp_id = "5421" )
+
+        self.node1 = ec.register_resource("OMFNode")
+        ec.set(self.node1, 'hostname', 'wlab12')
+        ec.set(self.node1, 'xmppUser', "nepi")
+        ec.set(self.node1, 'xmppServer', "xmpp-plexus.onelab.eu")
+        ec.set(self.node1, 'xmppPort', "5222")
+        ec.set(self.node1, 'xmppPassword', "1234")
+        
+        self.iface1 = ec.register_resource("OMFWifiInterface")
+        ec.set(self.iface1, 'name', "wlan0")
+        ec.set(self.iface1, 'mode', "adhoc")
+        ec.set(self.iface1, 'hw_mode', "g")
+        ec.set(self.iface1, 'essid', "vlcexp")
+        ec.set(self.iface1, 'ip', "10.0.0.17/24")
+        
+        self.channel = ec.register_resource("OMFChannel")
+        ec.set(self.channel, 'channel', "6")
+        ec.set(self.channel, 'xmppUser', "nepi")
+        ec.set(self.channel, 'xmppServer', "xmpp-plexus.onelab.eu")
+        ec.set(self.channel, 'xmppPort', "5222")
+        ec.set(self.channel, 'xmppPassword', "1234")
+        
+        self.app1 = ec.register_resource("OMFApplication")
+        ec.set(self.app1, 'appid', 'Vlc#1')
+        ec.set(self.app1, 'command', "ping -c5 10.0.0.17")
+
+        ec.register_connection(self.app1, self.node1)
+        ec.register_connection(self.node1, self.iface1)
+        ec.register_connection(self.iface1, self.channel)
+
+        ec.register_condition(self.app1, ResourceAction.STOP, self.app1, ResourceState.STARTED , "10s")
+
+        ec.deploy()
+
+        ec.wait_finished(self.app1)
+
+        stdout_1 = ec.trace(self.app1, "stdout")
+        stderr_1 = ec.trace(self.app1, "stderr")
+
+        if stdout_1:
+            f = open("app1_out.txt", "w")
+            f.write(stdout_1)
+            f.close()
+
+        if stderr_1:
+            f = open("app1_err.txt", "w")
+            f.write(stderr_1)
+            f.close()
+
+        self.assertEquals(ec.get_resource(self.node1).state, ResourceState.STARTED)
+        self.assertEquals(ec.get_resource(self.iface1).state, ResourceState.STARTED)
+        self.assertEquals(ec.get_resource(self.channel).state, ResourceState.STARTED)
+        self.assertEquals(ec.get_resource(self.app1).state, ResourceState.STOPPED)
+
+        ec.shutdown()
+
+        self.assertEquals(ec.get_resource(self.node1).state, ResourceState.RELEASED)
+        self.assertEquals(ec.get_resource(self.iface1).state, ResourceState.RELEASED)
+        self.assertEquals(ec.get_resource(self.channel).state, ResourceState.RELEASED)
+        self.assertEquals(ec.get_resource(self.app1).state, ResourceState.RELEASED)
+
+        t = open("app1_out.txt", "r")
+        l = t.readlines()
+        self.assertEquals(l[0], "PING 10.0.0.17 (10.0.0.17) 56(84) bytes of data.\n")
+        self.assertIn("5 packets transmitted, 5 received, 0% packet loss, time", l[-2])
+        self.assertIn("rtt min/avg/max/mdev = ", l[-1])
+        
+        t.close()
+        os.remove("app1_out.txt")
+        
+
+
+if __name__ == '__main__':
+    unittest.main()
+
+
+