applied the except and raise fixers to the master branch to close the gap with py3
[nepi.git] / src / nepi / resources / omf / application.py
index f0fc5f1..03db088 100644 (file)
@@ -3,9 +3,8 @@
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from __future__ import print_function
+
+import os
+
+from nepi.util.timefuncs import tnow
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState
+from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
-from nepi.resources.omf.node import OMFNode
-from nepi.resources.omf.omf_api import OMFAPIFactory
+from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
+from nepi.resources.omf.omf_api_factory import OMFAPIFactory
 
 
-#from nepi.util.sshfuncs import ProcStatus
 from nepi.util import sshfuncs
 
 from nepi.util import sshfuncs
 
-
 @clsinit_copy
 class OMFApplication(OMFResource):
     """
 @clsinit_copy
 class OMFApplication(OMFResource):
     """
@@ -38,37 +41,31 @@ class OMFApplication(OMFResource):
         :type ec: ExperimentController
         :param guid: guid of the RM
         :type guid: int
         :type ec: ExperimentController
         :param guid: guid of the RM
         :type guid: int
-        :param creds: Credentials to communicate with the rm (XmppClient)
-        :type creds: dict
-
-    .. note::
-
-       This class is used only by the Experiment Controller through the 
-       Resource Factory
 
     """
 
     """
-    _rtype = "OMFApplication"
-    _authorized_connections = ["OMFNode"]
+    _rtype = "omf::Application"
+    _authorized_connections = ["omf::Node", "wilabt::sfa::Node"]
 
     @classmethod
     def _register_attributes(cls):
         """ Register the attributes of an OMF application
 
         """
 
     @classmethod
     def _register_attributes(cls):
         """ Register the attributes of an OMF application
 
         """
-        appid = Attribute("appid", "Name of the application")
-        path = Attribute("path", "Path of the application")
-        args = Attribute("args", "Argument of the application")
+        command = Attribute("command", "Command to execute")
         env = Attribute("env", "Environnement variable of the application")
         env = Attribute("env", "Environnement variable of the application")
+
+        # For OMF 5:
+        appid = Attribute("appid", "Name of the application")
         stdin = Attribute("stdin", "Input of the application", default = "")
         sources = Attribute("sources", "Sources of the application", 
         stdin = Attribute("stdin", "Input of the application", default = "")
         sources = Attribute("sources", "Sources of the application", 
-                     flags = Flags.ExecReadOnly)
+                     flags = Flags.Design)
         sshuser = Attribute("sshUser", "user to connect with ssh", 
         sshuser = Attribute("sshUser", "user to connect with ssh", 
-                     flags = Flags.ExecReadOnly)
+                     flags = Flags.Design)
         sshkey = Attribute("sshKey", "key to use for ssh", 
         sshkey = Attribute("sshKey", "key to use for ssh", 
-                     flags = Flags.ExecReadOnly)
+                     flags = Flags.Design)
+
         cls._register_attribute(appid)
         cls._register_attribute(appid)
-        cls._register_attribute(path)
-        cls._register_attribute(args)
+        cls._register_attribute(command)
         cls._register_attribute(env)
         cls._register_attribute(stdin)
         cls._register_attribute(sources)
         cls._register_attribute(env)
         cls._register_attribute(stdin)
         cls._register_attribute(sources)
@@ -87,32 +84,60 @@ class OMFApplication(OMFResource):
         """
         super(OMFApplication, self).__init__(ec, guid)
 
         """
         super(OMFApplication, self).__init__(ec, guid)
 
+        self.set('command', "")
         self.set('appid', "")
         self.set('appid', "")
-        self.set('path', "")
-        self.set('args', "")
+        self._path= ""
+        self._args = ""
         self.set('env', "")
 
         self._node = None
 
         self._omf_api = None
         self.set('env', "")
 
         self._node = None
 
         self._omf_api = None
+        self._topic_app = None
+        self.create_id = None
+        self._create_cnt = 0
+        self._start_cnt = 0
+        self.release_id = None
+        self._release_cnt = 0
+
+        # For performance tests
+        self.begin_deploy_time = None
+        self.begin_start_time = None
+        self.begin_release_time = None
+        self.dperf = True
+        self.sperf = True
+        self.rperf = True
 
         self.add_set_hook()
 
 
         self.add_set_hook()
 
+    def _init_command(self):
+        comm = self.get('command').split(' ')
+        self._path= comm[0]
+        if len(comm)>1:
+            self._args = ' '.join(comm[1:])
+
     @property
     def exp_id(self):
         return self.ec.exp_id
 
     @property
     def node(self):
     @property
     def exp_id(self):
         return self.ec.exp_id
 
     @property
     def node(self):
-        rm_list = self.get_connected(OMFNode.rtype())
+        rm_list = self.get_connected(OMFNode.get_rtype())
         if rm_list: return rm_list[0]
         return None
 
     def stdin_hook(self, old_value, new_value):
         if rm_list: return rm_list[0]
         return None
 
     def stdin_hook(self, old_value, new_value):
+        """ Set a hook to the stdin attribute in order to send a message at each time
+        the value of this parameter is changed. Used ofr OMF 5.4 only
+
+        """
         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
         return new_value
 
     def add_set_hook(self):
         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
         return new_value
 
     def add_set_hook(self):
+        """ Initialize the hooks for OMF 5.4 only
+
+        """
         attr = self._attrs["stdin"]
         attr.set_hook = self.stdin_hook
 
         attr = self._attrs["stdin"]
         attr.set_hook = self.stdin_hook
 
@@ -126,10 +151,10 @@ class OMFApplication(OMFResource):
 
         """
         rm = self.ec.get_resource(guid)
 
         """
         rm = self.ec.get_resource(guid)
-        if rm.rtype() not in self._authorized_connections:
+        if rm.get_rtype() not in self._authorized_connections:
             msg = ("Connection between %s %s and %s %s refused: "
                     "An Application can be connected only to a Node" ) % \
             msg = ("Connection between %s %s and %s %s refused: "
                     "An Application can be connected only to a Node" ) % \
-                (self.rtype(), self._guid, rm.rtype(), guid)
+                (self.get_rtype(), self._guid, rm.get_rtype(), guid)
             self.debug(msg)
 
             return False
             self.debug(msg)
 
             return False
@@ -137,106 +162,258 @@ class OMFApplication(OMFResource):
         elif len(self.connections) != 0 :
             msg = ("Connection between %s %s and %s %s refused: "
                     "This Application is already connected" ) % \
         elif len(self.connections) != 0 :
             msg = ("Connection between %s %s and %s %s refused: "
                     "This Application is already connected" ) % \
-                (self.rtype(), self._guid, rm.rtype(), guid)
+                (self.get_rtype(), self._guid, rm.get_rtype(), guid)
             self.debug(msg)
             self.debug(msg)
+
             return False
             return False
+
         else :
             msg = "Connection between %s %s and %s %s accepted" % (
         else :
             msg = "Connection between %s %s and %s %s accepted" % (
-                    self.rtype(), self._guid, rm.rtype(), guid)
+                    self.get_rtype(), self._guid, rm.get_rtype(), guid)
             self.debug(msg)
 
             return True
 
             self.debug(msg)
 
             return True
 
-    def deploy(self):
+    def do_deploy(self):
         """ Deploy the RM. It means nothing special for an application 
         for now (later it will be upload sources, ...)
         """ Deploy the RM. It means nothing special for an application 
         for now (later it will be upload sources, ...)
-        It becomes DEPLOYED after getting the xmpp client.
+        It becomes DEPLOYED after the topic for the application has been created
 
         """
 
         """
-        self.set('xmppSlice', self.node.get('xmppSlice'))
-        self.set('xmppHost', self.node.get('xmppHost'))
-        self.set('xmppPort', self.node.get('xmppPort'))
-        self.set('xmppPassword', self.node.get('xmppPassword'))
+        if not self.node or self.node.state < ResourceState.READY:
+            self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
+                       % self.node.state )
+            self.ec.schedule(self.reschedule_delay, self.deploy)
+            return
 
 
-        if not self._omf_api :
-            self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        ## For performance test
+        if self.dperf:
+            self.begin_deploy_time = tnow()
+            self.dperf = False
 
 
-        if not self._omf_api :
-            msg = "Credentials are not initialzed. XMPP Connections impossible"
+        self._init_command()
+
+        self.set('xmppUser',self.node.get('xmppUser'))
+        self.set('xmppServer',self.node.get('xmppServer'))
+        self.set('xmppPort',self.node.get('xmppPort'))
+        self.set('xmppPassword',self.node.get('xmppPassword'))
+        self.set('version',self.node.get('version'))
+
+        if not self.get('xmppServer'):
+            msg = "XmppServer is not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError(msg)
+
+        if not (self.get('xmppUser') or self.get('xmppPort') 
+                   or self.get('xmppPassword')):
+            msg = "Credentials are not all initialzed. Default values will be used"
+            self.warn(msg)
+
+        if not self.get('command') :
+            msg = "Application's Command is not initialized"
+            self.error(msg)
+            raise RuntimeError(msg)
+
+        if not self._omf_api :
+            self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
+              self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
+               self.get('xmppPassword'), exp_id = self.exp_id)
 
 
-        if self.get('sources'):
-            gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
-            user = self.get('sshUser') or self.get('xmppSlice')
-            dst = user + "@"+ gateway + ":"
-            (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
+        if self.get('version') == "5":
 
 
-        super(OMFApplication, self).deploy()
+            self.begin_deploy_time = tnow()
 
 
+            if self.get('sources'):
+                gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
+                user = self.get('sshUser') or self.get('xmppUser')
+                dst = user + "@"+ gateway + ":"
+                (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
+        else :
+            # For OMF 6 :
+            if not self.create_id:
+                props = {}
+                if self.get('command'):
+                    props['application:binary_path'] = self.get('command')
+                    props['application:hrn'] = self.get('command')
+                    props['application:membership'] = self._topic_app
+                props['application:type'] = "application"
+    
+                self.create_id = os.urandom(16).encode('hex')
+                self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
+   
+            if self._create_cnt > confirmation_counter:
+                msg = "Couldn't retrieve the confirmation of the creation"
+                self.error(msg)
+                raise RuntimeError(msg)
+
+            uid = self.check_deploy(self.create_id)
+            if not uid:
+                self._create_cnt +=1
+                self.ec.schedule(reschedule_check, self.deploy)
+                return
+
+            self._topic_app = uid
+            self._omf_api.enroll_topic(self._topic_app)
+
+        super(OMFApplication, self).do_deploy()
+
+    def check_deploy(self, cid):
+        """ Check, through the mail box in the parser, 
+        if the confirmation of the creation has been received
+
+        :param cid: the id of the original message
+        :type guid: string
+
+        """
+        uid = self._omf_api.check_mailbox("create", cid)
+        if uid : 
+            return uid
+        return False
+
+    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        self.info("Retrieving '%s' trace %s " % (name, attr))
+        if name == 'stdout' :
+            suffix = '.out'
+        elif name == 'stderr' :
+            suffix = '.err'
+        else :
+            suffix = '.misc'
 
 
-    def start(self):
+        trace_path = '/tmp/'+ self._topic_app + suffix
+
+        if attr == TraceAttr.PATH:
+            return trace_path
+
+        if attr == TraceAttr.ALL:
+            try:
+                with open(trace_path ,'r') as f:
+                    return f.read()
+            except IOError:
+                print("File with traces has not been found")
+                return False
+
+
+    def do_start(self):
         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
          to execute the application. 
         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
          to execute the application. 
-         It becomes STARTED before the messages are sent (for coordination)
 
         """
 
         """
-        if not (self.get('appid') and self.get('path')) :
-            msg = "Application's information are not initialized"
-            self.error(msg)
-            self.fail()
-            return
+        ## For performance test
+        if self.sperf:
+            self.begin_start_time = tnow()
+            self.sperf = False
 
 
-        if not self.get('args'):
-            self.set('args', " ")
         if not self.get('env'):
             self.set('env', " ")
 
         if not self.get('env'):
             self.set('env', " ")
 
-        # Some information to check the information in parameter
-        msg = " " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
-            self.get('appid') + " : " + self.get('path') + " : " + \
-            self.get('args') + " : " + self.get('env')
-        self.info(msg)
+        if self.get('version') == "5":
+            self.begin_start_time = tnow()
+            # Some information to check the command for OMF5
+            msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
+                self.get('appid') + " : " + self._path + " : " + \
+                self._args + " : " + self.get('env')
+            self.debug(msg)
 
 
-        try:
             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
-                self.get('args'), self.get('path'), self.get('env'))
-        except AttributeError:
-            msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.error(msg)
-            self.fail()
-            raise
+                self._args, self._path, self.get('env'))
+        else:
+            #For OMF 6
+            if self._start_cnt == 0:
+                props = {}
+                props['state'] = "running"
+    
+                guards = {}
+                guards['type'] = "application"
+                guards['name'] = self.get('command')
+
+                self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
+
+            if self._start_cnt > confirmation_counter:
+                msg = "Couldn't retrieve the confirmation that the application started"
+                self.error(msg)
+                raise RuntimeError(msg)
+
+            res = self.check_start(self._topic_app)
+            if not res:
+                self._start_cnt +=1
+                self.ec.schedule(reschedule_check, self.start)
+                return
+
+        super(OMFApplication, self).do_start()
+
+    def check_start(self, uid):
+        """ Check, through the mail box in the parser, 
+        if the confirmation of the start has been received
+
+        :param uid: the id of the original message
+        :type guid: string
 
 
-        super(OMFApplication, self).start()
+        """
+        res = self._omf_api.check_mailbox("started", uid)
+        if res : 
+            return True
+        return False
 
 
-    def stop(self):
+    def do_stop(self):
         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
         kill the application. 
         State is set to STOPPED after the message is sent.
 
         """
         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
         kill the application. 
         State is set to STOPPED after the message is sent.
 
         """
-        try:
+
+
+        if self.get('version') == 5:
             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
-        except AttributeError:
-            msg = "Credentials were not initialzed. XMPP Connections impossible"
-            self.error(msg)
-            self.fail()
-            return
+        super(OMFApplication, self).do_stop()
 
 
-        super(OMFApplication, self).stop()
-        self.set_finished()
+    def check_release(self, cid):
+        """ Check, through the mail box in the parser, 
+        if the confirmation of the release has been received
 
 
-    def release(self):
-        """ Clean the RM at the end of the experiment and release the API.
+        :param cid: the id of the original message
+        :type guid: string
 
         """
 
         """
-        if self._omf_api :
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        res = self._omf_api.check_mailbox("release", cid)
+        if res : 
+            return res
+        return False
 
 
-        super(OMFApplication, self).release()
+    def do_release(self):
+        """ Clean the RM at the end of the experiment and release the API.
+
+        """
+        ## For performance test
+        if self.rperf:
+            self.begin_release_time = tnow()
+            self.rperf = False
+
+        if self._omf_api:
+            if self.get('version') == "6" and self._topic_app:
+                if not self.release_id:
+                    self.release_id = os.urandom(16).encode('hex')
+                    self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
+    
+                if self._release_cnt < confirmation_counter:
+                    cid = self.check_release(self.release_id)
+                    if not cid:
+                        self._release_cnt +=1
+                        self.ec.schedule(reschedule_check, self.release)
+                        return
+                else:
+                    msg = "Couldn't retrieve the confirmation of the release"
+                    self.error(msg)
+
+                # Remove the stdout and stderr of the application
+                try:
+                    os.remove('/tmp/'+self._topic_app +'.out')
+                    os.remove('/tmp/'+self._topic_app +'.err')
+                except OSError:
+                    pass
+
+            OMFAPIFactory.release_api(self.get('version'), 
+              self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
+               self.get('xmppPassword'), exp_id = self.exp_id)
+
+        super(OMFApplication, self).do_release()