2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Julien Tribino <julien.tribino@inria.fr>
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.execution.attribute import Attribute, Flags
24 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
25 from nepi.resources.omf.node import OMFNode
26 from nepi.resources.omf.omf_api import OMFAPIFactory
28 from nepi.util import sshfuncs
class OMFApplication(OMFResource):
    """
    Resource Manager (RM) for an application executed on an OMF node.

    .. class:: Class Args :

        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM

    .. note::

       This class is used only by the Experiment Controller.

    The XMPP credentials are not given to the constructor: do_deploy()
    copies them from the connected OMFNode.

    """
    # NOTE(review): clsinit_copy is imported by this module but no use of it
    # is visible in this class - confirm whether the class has to be decorated
    # with it for _register_attributes() to be executed.

    _rtype = "OMFApplication"
    _authorized_connections = ["OMFNode"]

    @classmethod
    def _register_attributes(cls):
        """ Register the attributes of an OMF application

        """
        attributes = (
            Attribute("appid", "Name of the application"),
            Attribute("path", "Path of the application"),
            Attribute("args", "Argument of the application"),
            Attribute("env", "Environnement variable of the application"),
            Attribute("stdin", "Input of the application", default = ""),
            Attribute("sources", "Sources of the application",
                flags = Flags.ExecReadOnly),
            Attribute("sshUser", "user to connect with ssh",
                flags = Flags.ExecReadOnly),
            Attribute("sshKey", "key to use for ssh",
                flags = Flags.ExecReadOnly),
        )

        for attribute in attributes:
            cls._register_attribute(attribute)

    def __init__(self, ec, guid):
        """
        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM

        """
        super(OMFApplication, self).__init__(ec, guid)

        # do_deploy() tests self._omf_api before it creates the API handle,
        # so the attribute has to exist from construction time. getattr()
        # keeps a value that a base class may already have stored.
        self._omf_api = getattr(self, "_omf_api", None)

    @property
    def exp_id(self):
        """ Experiment id, as exposed by the Experiment Controller

        """
        return self.ec.exp_id

    @property
    def node(self):
        """ First entry returned by get_connected() for the OMFNode type,
        or None when nothing of that type is connected

        """
        rm_list = self.get_connected(OMFNode.get_rtype())
        if rm_list:
            return rm_list[0]
        return None

    def stdin_hook(self, old_value, new_value):
        """ Hook for the stdin attribute : send the new value to the
        application each time the value of this attribute is changed

        :param old_value: previous value of the attribute (not used)
        :param new_value: value to send to the application
        :returns: new_value, unchanged

        """
        self._omf_api.send_stdin(self.node.get('hostname'), new_value,
            self.get('appid'))

        # NOTE(review): assumes the attribute machinery stores what the hook
        # returns - confirm against Attribute. Returning the value is harmless
        # if the result is ignored.
        return new_value

    def add_set_hook(self):
        """ Initialize the hooks : attach stdin_hook to the stdin attribute

        """
        # NOTE(review): no call to this method is visible in this class -
        # confirm where the hook gets installed.
        attr = self._attrs["stdin"]
        attr.set_hook = self.stdin_hook

    def valid_connection(self, guid):
        """ Check if the connection with the guid in parameter is possible.
        Only meaningful connections are allowed.

        :param guid: Guid of RM it will be connected
        :returns: True if the connection is accepted, False if the other RM
            is not of an authorized type or if this application is already
            connected

        """
        rm = self.ec.get_resource(guid)
        details = (self.get_rtype(), self._guid, rm.get_rtype(), guid)

        # NOTE(review): the messages below are built but the call reporting
        # them is not visible in this view - confirm how they are logged.
        if rm.get_rtype() not in self._authorized_connections:
            msg = ("Connection between %s %s and %s %s refused: "
                "An Application can be connected only to a Node" ) % details
            return False

        if len(self.connections) != 0:
            msg = ("Connection between %s %s and %s %s refused: "
                "This Application is already connected" ) % details
            return False

        msg = "Connection between %s %s and %s %s accepted" % details
        return True

    def do_deploy(self):
        """ Deploy the RM : copy the XMPP credentials from the connected
        node, get the OMF API for them, copy the sources (if any) to the
        gateway and call the parent do_deploy().

        :raises RuntimeError: if one of the XMPP credentials is empty

        """
        credentials = ('xmppSlice', 'xmppHost', 'xmppPort', 'xmppPassword')

        node = self.node
        for name in credentials:
            self.set(name, node.get(name))

        if not all(self.get(name) for name in credentials):
            msg = "Credentials are not initialzed. XMPP Connections impossible"
            # Call form of raise : valid on Python 2 and on Python 3
            raise RuntimeError(msg)

        if not self._omf_api:
            self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
                self.get('xmppHost'), self.get('xmppPort'),
                self.get('xmppPassword'), exp_id = self.exp_id)

        if self.get('sources'):
            # The ssh user defaults to the XMPP slice when sshUser is not set
            gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
            user = self.get('sshUser') or self.get('xmppSlice')
            dst = user + "@"+ gateway + ":"
            # NOTE(review): the outcome of the copy is not checked
            (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)

        super(OMFApplication, self).do_deploy()

    def do_start(self):
        """ Start the RM. It means : Send Xmpp Message Using OMF protocol
        to execute the application, then call the parent do_start().

        :raises RuntimeError: if appid or path is empty

        """
        if not (self.get('appid') and self.get('path')):
            msg = "Application's information are not initialized"
            # Call form of raise : valid on Python 2 and on Python 3
            raise RuntimeError(msg)

        # args and env are concatenated as strings below, so an unset value
        # is replaced by a blank
        if not self.get('args'):
            self.set('args', " ")
        if not self.get('env'):
            self.set('env', " ")

        # Some information to check the information in parameter
        # NOTE(review): the call reporting this message is not visible in
        # this view - confirm how it is logged.
        msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
            self.get('appid') + " : " + self.get('path') + " : " + \
            self.get('args') + " : " + self.get('env')

        self._omf_api.execute(self.node.get('hostname'), self.get('appid'),
            self.get('args'), self.get('path'), self.get('env'))

        super(OMFApplication, self).do_start()

    def do_stop(self):
        """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to
        kill the application, then call the parent do_stop().

        """
        self._omf_api.exit(self.node.get('hostname'), self.get('appid'))
        super(OMFApplication, self).do_stop()

    def do_release(self):
        """ Clean the RM at the end of the experiment and release the API.

        """
        # Only release an API that do_deploy() actually obtained
        if self._omf_api:
            OMFAPIFactory.release_api(self.get('xmppSlice'),
                self.get('xmppHost'), self.get('xmppPort'),
                self.get('xmppPassword'), exp_id = self.exp_id)

        super(OMFApplication, self).do_release()