2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Julien Tribino <julien.tribino@inria.fr>
23 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24 ResourceState, reschedule_delay
25 from nepi.execution.attribute import Attribute, Flags
26 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
27 from nepi.resources.omf.node import OMFNode
28 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
30 from nepi.util import sshfuncs
33 class OMFApplication(OMFResource):
35 .. class:: Class Args :
37 :param ec: The Experiment controller
38 :type ec: ExperimentController
39 :param guid: guid of the RM
43 _rtype = "OMFApplication"
44 _authorized_connections = ["OMFNode"]
def _register_attributes(cls):
    """ Declare the attributes of an OMF application and register each
    one on the class.

    The attributes are: command, env, appid, stdin (default ""),
    sources, sshUser and sshKey. The last three are created with the
    Flags.ExecReadOnly flag.

    """
    read_only = Flags.ExecReadOnly

    command_attr = Attribute("command", "Command to execute")
    env_attr = Attribute("env", "Environnement variable of the application")
    appid_attr = Attribute("appid", "Name of the application")
    stdin_attr = Attribute("stdin", "Input of the application", default = "")
    sources_attr = Attribute("sources", "Sources of the application",
            flags = read_only)
    sshuser_attr = Attribute("sshUser", "user to connect with ssh",
            flags = read_only)
    sshkey_attr = Attribute("sshKey", "key to use for ssh",
            flags = read_only)

    # Registration order: appid, command, env, stdin, sources, sshUser, sshKey
    for attribute in (appid_attr, command_attr, env_attr, stdin_attr,
            sources_attr, sshuser_attr, sshkey_attr):
        cls._register_attribute(attribute)
72 def __init__(self, ec, guid):
74 :param ec: The Experiment controller
75 :type ec: ExperimentController
76 :param guid: guid of the RM
78 :param creds: Credentials to communicate with the rm (XmppClient for OMF)
82 super(OMFApplication, self).__init__(ec, guid)
84 self.set('command', "")
93 self._topic_app = None
95 self.release_id = None
99 def _init_command(self):
100 comm = self.get('command').split(' ')
103 self.args = ' '.join(comm[1:])
107 return self.ec.exp_id
111 rm_list = self.get_connected(OMFNode.get_rtype())
112 if rm_list: return rm_list[0]
115 def stdin_hook(self, old_value, new_value):
116 """ Set a hook to the stdin attribute in order to send a message at each time
117 the value of this parameter is changed
120 self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
def add_set_hook(self):
    """ Initialize the hooks : install stdin_hook as the set hook of the
    "stdin" attribute

    """
    # The attribute is looked up first, then the bound hook, then assigned.
    setattr(self._attrs["stdin"], "set_hook", self.stdin_hook)
130 def valid_connection(self, guid):
131 """ Check if the connection with the guid in parameter is possible.
132 Only meaningful connections are allowed.
134 :param guid: Guid of RM it will be connected
139 rm = self.ec.get_resource(guid)
140 if rm.get_rtype() not in self._authorized_connections:
141 msg = ("Connection between %s %s and %s %s refused: "
142 "An Application can be connected only to a Node" ) % \
143 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
148 elif len(self.connections) != 0 :
149 msg = ("Connection between %s %s and %s %s refused: "
150 "This Application is already connected" ) % \
151 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
157 msg = "Connection between %s %s and %s %s accepted" % (
158 self.get_rtype(), self._guid, rm.get_rtype(), guid)
164 """ Deploy the RM. It means nothing special for an application
165 for now (later it will be upload sources, ...)
166 It becomes DEPLOYED after getting the xmpp client.
169 if not self.node or self.node.state < ResourceState.READY:
170 self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
172 self.ec.schedule(reschedule_delay, self.deploy)
177 self.set('xmppUser',self.node.get('xmppUser'))
178 self.set('xmppServer',self.node.get('xmppServer'))
179 self.set('xmppPort',self.node.get('xmppPort'))
180 self.set('xmppPassword',self.node.get('xmppPassword'))
181 self.set('version',self.node.get('version'))
183 if not self.get('xmppServer'):
184 msg = "XmppServer is not initialzed. XMPP Connections impossible"
186 raise RuntimeError, msg
188 if not (self.get('xmppUser') or self.get('xmppPort')
189 or self.get('xmppPassword')):
190 msg = "Credentials are not all initialzed. Default values will be used"
193 if not self._omf_api :
194 self._omf_api = OMFAPIFactory.get_api(self.get('version'),
195 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
196 self.get('xmppPassword'), exp_id = self.exp_id)
198 if self.get('version') == "5":
199 if self.get('sources'):
200 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
201 user = self.get('sshUser') or self.get('xmppUser')
202 dst = user + "@"+ gateway + ":"
203 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
206 if not self.create_id:
208 if self.get('command'):
209 props['application:binary_path'] = self.get('command')
210 props['application:hrn'] = self.get('command')
211 props['application:membership'] = self._topic_app
212 props['application:type'] = "application"
214 self.create_id = os.urandom(16).encode('hex')
215 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
217 uid = self.check_deploy(self.create_id)
219 self.ec.schedule(reschedule_delay, self.deploy)
222 self._topic_app = uid
223 self._omf_api.enroll_topic(self._topic_app)
225 super(OMFApplication, self).do_deploy()
227 def check_deploy(self, cid):
228 uid = self._omf_api.check_mailbox("create", cid)
234 """ Start the RM. It means : Send Xmpp Message Using OMF protocol
235 to execute the application.
236 It becomes STARTED before the messages are sent (for coordination)
239 if not self.get('command') :
240 msg = "Application's Command is not initialized"
242 raise RuntimeError, msg
244 if not self.get('env'):
247 if self.get('version') == "5":
248 # Some information to check the command for OMF5
249 msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
250 self.get('appid') + " : " + self.path + " : " + \
251 self.args + " : " + self.get('env')
254 self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
255 self.get('args'), self.get('path'), self.get('env'))
259 props['state'] = "running"
262 guards['type'] = "application"
263 guards['name'] = self.get('command')
265 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
268 super(OMFApplication, self).do_start()
def do_stop(self):
    """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to
    kill the application.
    State is set to STOPPED after the message is sent.

    """
    # The version attribute is compared against the strings "5" / "6"
    # in the rest of this class. Comparing it to the integer 5 never
    # matches a string value, so the exit message was never sent for
    # OMF 5. str() keeps an integer 5 working as before.
    if str(self.get('version')) == "5":
        self._omf_api.exit(self.node.get('hostname'), self.get('appid'))
    super(OMFApplication, self).do_stop()
280 def check_release(self, cid):
281 res = self._omf_api.check_mailbox("release", cid)
286 def do_release(self):
287 """ Clean the RM at the end of the experiment and release the API.
290 if self.get('version') == "6":
291 if not self.release_id:
292 self.release_id = os.urandom(16).encode('hex')
293 self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
295 cid = self.check_release(self.release_id)
297 self.ec.schedule(reschedule_delay, self.release)
301 OMFAPIFactory.release_api(self.get('version'),
302 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
303 self.get('xmppPassword'), exp_id = self.exp_id)
305 super(OMFApplication, self).do_release()