2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Julien Tribino <julien.tribino@inria.fr>
23 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24 ResourceState, reschedule_delay
25 from nepi.execution.attribute import Attribute, Flags
26 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
27 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
28 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
30 from nepi.util import sshfuncs
33 class OMFApplication(OMFResource):
35 .. class:: Class Args :
37 :param ec: The Experiment controller
38 :type ec: ExperimentController
39 :param guid: guid of the RM
43 _rtype = "OMFApplication"
44 _authorized_connections = ["OMFNode", "WilabtSfaNode"]
47 def _register_attributes(cls):
48 """ Register the attributes of an OMF application
51 command = Attribute("command", "Command to execute")
52 env = Attribute("env", "Environnement variable of the application")
55 appid = Attribute("appid", "Name of the application")
56 stdin = Attribute("stdin", "Input of the application", default = "")
57 sources = Attribute("sources", "Sources of the application",
59 sshuser = Attribute("sshUser", "user to connect with ssh",
61 sshkey = Attribute("sshKey", "key to use for ssh",
64 cls._register_attribute(appid)
65 cls._register_attribute(command)
66 cls._register_attribute(env)
67 cls._register_attribute(stdin)
68 cls._register_attribute(sources)
69 cls._register_attribute(sshuser)
70 cls._register_attribute(sshkey)
72 def __init__(self, ec, guid):
74 :param ec: The Experiment controller
75 :type ec: ExperimentController
76 :param guid: guid of the RM
78 :param creds: Credentials to communicate with the rm (XmppClient for OMF)
82 super(OMFApplication, self).__init__(ec, guid)
84 self.set('command', "")
93 self._topic_app = None
97 self.release_id = None
102 def _init_command(self):
103 comm = self.get('command').split(' ')
106 self._args = ' '.join(comm[1:])
110 return self.ec.exp_id
114 rm_list = self.get_connected(OMFNode.get_rtype())
115 if rm_list: return rm_list[0]
118 def stdin_hook(self, old_value, new_value):
119 """ Set a hook to the stdin attribute in order to send a message at each time
120 the value of this parameter is changed. Used ofr OMF 5.4 only
123 self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
126 def add_set_hook(self):
127 """ Initialize the hooks for OMF 5.4 only
130 attr = self._attrs["stdin"]
131 attr.set_hook = self.stdin_hook
133 def valid_connection(self, guid):
134 """ Check if the connection with the guid in parameter is possible.
135 Only meaningful connections are allowed.
137 :param guid: Guid of RM it will be connected
142 rm = self.ec.get_resource(guid)
143 if rm.get_rtype() not in self._authorized_connections:
144 msg = ("Connection between %s %s and %s %s refused: "
145 "An Application can be connected only to a Node" ) % \
146 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
151 elif len(self.connections) != 0 :
152 msg = ("Connection between %s %s and %s %s refused: "
153 "This Application is already connected" ) % \
154 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
160 msg = "Connection between %s %s and %s %s accepted" % (
161 self.get_rtype(), self._guid, rm.get_rtype(), guid)
167 """ Deploy the RM. It means nothing special for an application
168 for now (later it will be upload sources, ...)
169 It becomes DEPLOYED after the topic for the application has been created
172 if not self.node or self.node.state < ResourceState.READY:
173 self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
175 self.ec.schedule(reschedule_delay, self.deploy)
180 self.set('xmppUser',self.node.get('xmppUser'))
181 self.set('xmppServer',self.node.get('xmppServer'))
182 self.set('xmppPort',self.node.get('xmppPort'))
183 self.set('xmppPassword',self.node.get('xmppPassword'))
184 self.set('version',self.node.get('version'))
186 if not self.get('xmppServer'):
187 msg = "XmppServer is not initialzed. XMPP Connections impossible"
189 raise RuntimeError, msg
191 if not (self.get('xmppUser') or self.get('xmppPort')
192 or self.get('xmppPassword')):
193 msg = "Credentials are not all initialzed. Default values will be used"
196 if not self.get('command') :
197 msg = "Application's Command is not initialized"
199 raise RuntimeError, msg
201 if not self._omf_api :
202 self._omf_api = OMFAPIFactory.get_api(self.get('version'),
203 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
204 self.get('xmppPassword'), exp_id = self.exp_id)
206 if self.get('version') == "5":
207 if self.get('sources'):
208 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
209 user = self.get('sshUser') or self.get('xmppUser')
210 dst = user + "@"+ gateway + ":"
211 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
214 if not self.create_id:
216 if self.get('command'):
217 props['application:binary_path'] = self.get('command')
218 props['application:hrn'] = self.get('command')
219 props['application:membership'] = self._topic_app
220 props['application:type'] = "application"
222 self.create_id = os.urandom(16).encode('hex')
223 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
225 if self._create_cnt > confirmation_counter:
226 msg = "Couldn't retrieve the confirmation of the creation"
228 raise RuntimeError, msg
230 uid = self.check_deploy(self.create_id)
233 self.ec.schedule(reschedule_check, self.deploy)
236 self._topic_app = uid
237 self._omf_api.enroll_topic(self._topic_app)
239 super(OMFApplication, self).do_deploy()
241 def check_deploy(self, cid):
242 """ Check, through the mail box in the parser,
243 if the confirmation of the creation has been received
245 :param cid: the id of the original message
249 uid = self._omf_api.check_mailbox("create", cid)
255 """ Start the RM. It means : Send Xmpp Message Using OMF protocol
256 to execute the application.
260 if not self.get('env'):
263 if self.get('version') == "5":
264 # Some information to check the command for OMF5
265 msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
266 self.get('appid') + " : " + self._path + " : " + \
267 self._args + " : " + self.get('env')
270 self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
271 self._args, self._path, self.get('env'))
274 if self._start_cnt == 0:
276 props['state'] = "running"
279 guards['type'] = "application"
280 guards['name'] = self.get('command')
282 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
284 if self._start_cnt > confirmation_counter:
285 msg = "Couldn't retrieve the confirmation that the application started"
287 raise RuntimeError, msg
289 res = self.check_start(self._topic_app)
292 self.ec.schedule(reschedule_check, self.start)
295 super(OMFApplication, self).do_start()
297 def check_start(self, uid):
298 """ Check, through the mail box in the parser,
299 if the confirmation of the start has been received
301 :param uid: the id of the original message
305 res = self._omf_api.check_mailbox("started", uid)
311 """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to
312 kill the application.
313 State is set to STOPPED after the message is sent.
316 if self.get('version') == 5:
317 self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
318 super(OMFApplication, self).do_stop()
320 def check_release(self, cid):
321 """ Check, through the mail box in the parser,
322 if the confirmation of the release has been received
324 :param cid: the id of the original message
328 res = self._omf_api.check_mailbox("release", cid)
333 def do_release(self):
334 """ Clean the RM at the end of the experiment and release the API.
338 if self.get('version') == "6" and self._topic_app:
339 if not self.release_id:
340 self.release_id = os.urandom(16).encode('hex')
341 self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
343 if self._release_cnt < confirmation_counter:
344 cid = self.check_release(self.release_id)
346 self._release_cnt +=1
347 self.ec.schedule(reschedule_check, self.release)
350 msg = "Couldn't retrieve the confirmation of the release"
354 OMFAPIFactory.release_api(self.get('version'),
355 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
356 self.get('xmppPassword'), exp_id = self.exp_id)
358 super(OMFApplication, self).do_release()