# Julien Tribino <julien.tribino@inria.fr>
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay, failtrap
+ ResourceState, reschedule_delay
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
from nepi.resources.omf.node import OMFNode
from nepi.resources.omf.omf_api import OMFAPIFactory
+from nepi.util import sshfuncs
+
@clsinit_copy
class OMFApplication(OMFResource):
"""
:type ec: ExperimentController
:param guid: guid of the RM
:type guid: int
- :param creds: Credentials to communicate with the rm (XmppClient)
- :type creds: dict
-
- .. note::
-
- This class is used only by the Experiment Controller through the
- Resource Factory
"""
_rtype = "OMFApplication"
args = Attribute("args", "Argument of the application")
env = Attribute("env", "Environnement variable of the application")
stdin = Attribute("stdin", "Input of the application", default = "")
+ sources = Attribute("sources", "Sources of the application",
+ flags = Flags.Design)
+ sshuser = Attribute("sshUser", "user to connect with ssh",
+ flags = Flags.Design)
+ sshkey = Attribute("sshKey", "key to use for ssh",
+ flags = Flags.Design)
cls._register_attribute(appid)
cls._register_attribute(path)
cls._register_attribute(args)
cls._register_attribute(env)
cls._register_attribute(stdin)
+ cls._register_attribute(sources)
+ cls._register_attribute(sshuser)
+ cls._register_attribute(sshkey)
def __init__(self, ec, guid):
"""
@property
def node(self):
- rm_list = self.get_connected(OMFNode.rtype())
+ rm_list = self.get_connected(OMFNode.get_rtype())
if rm_list: return rm_list[0]
return None
def stdin_hook(self, old_value, new_value):
+ """ Set a hook to the stdin attribute in order to send a message at each time
+ the value of this parameter is changed
+
+ """
self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
return new_value
def add_set_hook(self):
+ """ Initialize the hooks
+
+ """
attr = self._attrs["stdin"]
attr.set_hook = self.stdin_hook
"""
rm = self.ec.get_resource(guid)
- if rm.rtype() not in self._authorized_connections:
+ if rm.get_rtype() not in self._authorized_connections:
msg = ("Connection between %s %s and %s %s refused: "
"An Application can be connected only to a Node" ) % \
- (self.rtype(), self._guid, rm.rtype(), guid)
+ (self.get_rtype(), self._guid, rm.get_rtype(), guid)
self.debug(msg)
return False
elif len(self.connections) != 0 :
msg = ("Connection between %s %s and %s %s refused: "
"This Application is already connected" ) % \
- (self.rtype(), self._guid, rm.rtype(), guid)
+ (self.get_rtype(), self._guid, rm.get_rtype(), guid)
self.debug(msg)
return False
else :
msg = "Connection between %s %s and %s %s accepted" % (
- self.rtype(), self._guid, rm.rtype(), guid)
+ self.get_rtype(), self._guid, rm.get_rtype(), guid)
self.debug(msg)
return True
- @failtrap
- def deploy(self):
+ def do_deploy(self):
""" Deploy the RM. It means nothing special for an application
for now (later it will be upload sources, ...)
It becomes DEPLOYED after getting the xmpp client.
"""
+
+ self.set('xmppSlice',self.node.get('xmppSlice'))
+ self.set('xmppHost',self.node.get('xmppHost'))
+ self.set('xmppPort',self.node.get('xmppPort'))
+ self.set('xmppPassword',self.node.get('xmppPassword'))
+
+ if not (self.get('xmppSlice') and self.get('xmppHost')
+ and self.get('xmppPort') and self.get('xmppPassword')):
+ msg = "Credentials are not initialzed. XMPP Connections impossible"
+ self.error(msg)
+ raise RuntimeError, msg
+
if not self._omf_api :
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
self.get('xmppHost'), self.get('xmppPort'),
self.get('xmppPassword'), exp_id = self.exp_id)
- if not self._omf_api :
- msg = "Credentials are not initialzed. XMPP Connections impossible"
- self.error(msg)
- raise RuntimeError, msg
+ if self.get('sources'):
+ gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
+ user = self.get('sshUser') or self.get('xmppSlice')
+ dst = user + "@"+ gateway + ":"
+ (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
- super(OMFApplication, self).deploy()
+ super(OMFApplication, self).do_deploy()
- @failtrap
- def start(self):
+ def do_start(self):
""" Start the RM. It means : Send Xmpp Message Using OMF protocol
to execute the application.
It becomes STARTED before the messages are sent (for coordination)
self.set('env', " ")
# Some information to check the information in parameter
- msg = " " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
+ msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
self.get('appid') + " : " + self.get('path') + " : " + \
self.get('args') + " : " + self.get('env')
self.info(msg)
- try:
- self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
- self.get('args'), self.get('path'), self.get('env'))
- except AttributeError:
- msg = "Credentials are not initialzed. XMPP Connections impossible"
- self.error(msg)
- raise
+ self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
+ self.get('args'), self.get('path'), self.get('env'))
- super(OMFApplication, self).start()
+ super(OMFApplication, self).do_start()
- @failtrap
- def stop(self):
+ def do_stop(self):
""" Stop the RM. It means : Send Xmpp Message Using OMF protocol to
kill the application.
State is set to STOPPED after the message is sent.
"""
- try:
- self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
- except AttributeError:
- msg = "Credentials were not initialzed. XMPP Connections impossible"
- self.error(msg)
- raise
- super(OMFApplication, self).stop()
- self.set_finished()
+ self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
+ super(OMFApplication, self).do_stop()
- def release(self):
+ def do_release(self):
""" Clean the RM at the end of the experiment and release the API.
"""
- try:
- if self._omf_api :
- OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'),
- self.get('xmppPassword'), exp_id = self.exp_id)
- except:
- import traceback
- err = traceback.format_exc()
- self.error(err)
-
- super(OMFApplication, self).release()
+ if self._omf_api:
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+ super(OMFApplication, self).do_release()