2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Julien Tribino <julien.tribino@inria.fr>
23 from nepi.util.timefuncs import tnow
24 from nepi.execution.resource import ResourceManager, clsinit_copy, \
25 ResourceState, reschedule_delay
26 from nepi.execution.attribute import Attribute, Flags
27 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
28 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
29 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
31 from nepi.util import sshfuncs
34 class OMFApplication(OMFResource):
36 .. class:: Class Args :
38 :param ec: The Experiment controller
39 :type ec: ExperimentController
40 :param guid: guid of the RM
44 _rtype = "OMFApplication"
45 _authorized_connections = ["OMFNode", "WilabtSfaNode"]
48 def _register_attributes(cls):
49 """ Register the attributes of an OMF application
52 command = Attribute("command", "Command to execute")
53 env = Attribute("env", "Environnement variable of the application")
56 appid = Attribute("appid", "Name of the application")
57 stdin = Attribute("stdin", "Input of the application", default = "")
58 sources = Attribute("sources", "Sources of the application",
60 sshuser = Attribute("sshUser", "user to connect with ssh",
62 sshkey = Attribute("sshKey", "key to use for ssh",
65 cls._register_attribute(appid)
66 cls._register_attribute(command)
67 cls._register_attribute(env)
68 cls._register_attribute(stdin)
69 cls._register_attribute(sources)
70 cls._register_attribute(sshuser)
71 cls._register_attribute(sshkey)
73 def __init__(self, ec, guid):
75 :param ec: The Experiment controller
76 :type ec: ExperimentController
77 :param guid: guid of the RM
79 :param creds: Credentials to communicate with the rm (XmppClient for OMF)
83 super(OMFApplication, self).__init__(ec, guid)
85 self.set('command', "")
94 self._topic_app = None
98 self.release_id = None
101 # For performance tests
102 self.begin_deploy_time = None
103 self.begin_start_time = None
104 self.begin_release_time = None
111 def _init_command(self):
112 comm = self.get('command').split(' ')
115 self._args = ' '.join(comm[1:])
119 return self.ec.exp_id
123 rm_list = self.get_connected(OMFNode.get_rtype())
124 if rm_list: return rm_list[0]
127 def stdin_hook(self, old_value, new_value):
128 """ Set a hook to the stdin attribute in order to send a message at each time
129 the value of this parameter is changed. Used for OMF 5.4 only
132 self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
def add_set_hook(self):
    """ Install the set-hook of the "stdin" attribute (OMF 5.4 only)

    After this call, the ``set_hook`` of ``self._attrs["stdin"]`` is the
    bound method ``self.stdin_hook``.

    """
    # Single assignment straight on the attribute object; no temporary needed.
    self._attrs["stdin"].set_hook = self.stdin_hook
142 def valid_connection(self, guid):
143 """ Check if the connection with the guid in parameter is possible.
144 Only meaningful connections are allowed.
146 :param guid: Guid of RM it will be connected
151 rm = self.ec.get_resource(guid)
152 if rm.get_rtype() not in self._authorized_connections:
153 msg = ("Connection between %s %s and %s %s refused: "
154 "An Application can be connected only to a Node" ) % \
155 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
160 elif len(self.connections) != 0 :
161 msg = ("Connection between %s %s and %s %s refused: "
162 "This Application is already connected" ) % \
163 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
169 msg = "Connection between %s %s and %s %s accepted" % (
170 self.get_rtype(), self._guid, rm.get_rtype(), guid)
176 """ Deploy the RM. It means nothing special for an application
177 for now (later it will be upload sources, ...)
178 It becomes DEPLOYED after the topic for the application has been created
181 if not self.node or self.node.state < ResourceState.READY:
182 self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
184 self.ec.schedule(reschedule_delay, self.deploy)
187 ## For performance test
189 self.begin_deploy_time = tnow()
194 self.set('xmppUser',self.node.get('xmppUser'))
195 self.set('xmppServer',self.node.get('xmppServer'))
196 self.set('xmppPort',self.node.get('xmppPort'))
197 self.set('xmppPassword',self.node.get('xmppPassword'))
198 self.set('version',self.node.get('version'))
200 if not self.get('xmppServer'):
201 msg = "XmppServer is not initialzed. XMPP Connections impossible"
203 raise RuntimeError, msg
205 if not (self.get('xmppUser') or self.get('xmppPort')
206 or self.get('xmppPassword')):
207 msg = "Credentials are not all initialzed. Default values will be used"
210 if not self.get('command') :
211 msg = "Application's Command is not initialized"
213 raise RuntimeError, msg
215 if not self._omf_api :
216 self._omf_api = OMFAPIFactory.get_api(self.get('version'),
217 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
218 self.get('xmppPassword'), exp_id = self.exp_id)
220 if self.get('version') == "5":
222 self.begin_deploy_time = tnow()
224 if self.get('sources'):
225 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
226 user = self.get('sshUser') or self.get('xmppUser')
227 dst = user + "@"+ gateway + ":"
228 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
231 if not self.create_id:
233 if self.get('command'):
234 props['application:binary_path'] = self.get('command')
235 props['application:hrn'] = self.get('command')
236 props['application:membership'] = self._topic_app
237 props['application:type'] = "application"
239 self.create_id = os.urandom(16).encode('hex')
240 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
242 if self._create_cnt > confirmation_counter:
243 msg = "Couldn't retrieve the confirmation of the creation"
245 raise RuntimeError, msg
247 uid = self.check_deploy(self.create_id)
250 self.ec.schedule(reschedule_check, self.deploy)
253 self._topic_app = uid
254 self._omf_api.enroll_topic(self._topic_app)
256 super(OMFApplication, self).do_deploy()
258 def check_deploy(self, cid):
259 """ Check, through the mail box in the parser,
260 if the confirmation of the creation has been received
262 :param cid: the id of the original message
266 uid = self._omf_api.check_mailbox("create", cid)
def trace_filepath(self, filename):
    """ Build the path of a trace file by joining it onto ``'~/'``

    :param filename: name of the trace file
    :type filename: str
    :returns: ``os.path.join('~/', filename)``; the tilde is left as-is,
        no expansion is performed here

    """
    base_dir = '~/'
    return os.path.join(base_dir, filename)
274 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
275 self.info("Retrieving '%s' trace %s " % (name, attr))
277 path = self.trace_filepath(str(self.guid) + '_' + name)
279 command = "(test -f %s && echo 'success') || echo 'error'" % path
280 (out, err), proc = self.node.execute(command)
282 if (err and proc.poll()) or out.find("error") != -1:
283 msg = " Couldn't find trace %s " % name
284 self.error(msg, out, err)
287 if attr == TraceAttr.PATH:
290 if attr == TraceAttr.ALL:
291 (out, err), proc = self.node.check_output(self.run_home, name)
294 msg = " Couldn't read trace %s " % name
295 self.error(msg, out, err)
def check_output(self, home, filename):
    """ Retrieve the content of a file by running ``cat`` on it

    :param home: directory that contains the file
    :param filename: name of the file inside ``home``
    :returns: ``((out, err), proc)`` as produced by ``self.execute``

    """
    target = os.path.join(home, filename)
    command = "cat %s" % target
    # Unpack before returning so a malformed result from execute() still
    # fails here, exactly as it did before.
    (out, err), proc = self.execute(command, retry = 1, with_lock = True)
    return (out, err), proc
310 """ Start the RM. It means : Send Xmpp Message Using OMF protocol
311 to execute the application.
314 ## For performance test
316 self.begin_start_time = tnow()
319 if not self.get('env'):
322 if self.get('version') == "5":
323 self.begin_start_time = tnow()
324 # Some information to check the command for OMF5
325 msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
326 self.get('appid') + " : " + self._path + " : " + \
327 self._args + " : " + self.get('env')
330 self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
331 self._args, self._path, self.get('env'))
334 if self._start_cnt == 0:
336 props['state'] = "running"
339 guards['type'] = "application"
340 guards['name'] = self.get('command')
342 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
344 if self._start_cnt > confirmation_counter:
345 msg = "Couldn't retrieve the confirmation that the application started"
347 raise RuntimeError, msg
349 res = self.check_start(self._topic_app)
352 self.ec.schedule(reschedule_check, self.start)
355 super(OMFApplication, self).do_start()
357 def check_start(self, uid):
358 """ Check, through the mail box in the parser,
359 if the confirmation of the start has been received
361 :param uid: the id of the original message
365 res = self._omf_api.check_mailbox("started", uid)
def do_stop(self):
    """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to
    kill the application.

    For OMF version 5, an exit message for this application ('appid') is
    sent to the connected node's 'hostname' through the OMF API. For any
    other version no message is sent here. In both cases the parent class
    ``do_stop`` is then called (per the original note, the state is set to
    STOPPED after the message is sent).

    """
    # Every other version check in this class compares against the strings
    # "5" / "6"; comparing against the integer 5 never matched a string
    # version, so the exit message was silently skipped. Normalising with
    # str() accepts both a string and an integer version value.
    if str(self.get('version')) == "5":
        self._omf_api.exit(self.node.get('hostname'), self.get('appid'))

    super(OMFApplication, self).do_stop()
380 def check_release(self, cid):
381 """ Check, through the mail box in the parser,
382 if the confirmation of the release has been received
384 :param cid: the id of the original message
388 res = self._omf_api.check_mailbox("release", cid)
393 def do_release(self):
394 """ Clean the RM at the end of the experiment and release the API.
397 ## For performance test
399 self.begin_release_time = tnow()
403 if self.get('version') == "6" and self._topic_app:
404 if not self.release_id:
405 self.release_id = os.urandom(16).encode('hex')
406 self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
408 if self._release_cnt < confirmation_counter:
409 cid = self.check_release(self.release_id)
411 self._release_cnt +=1
412 self.ec.schedule(reschedule_check, self.release)
415 msg = "Couldn't retrieve the confirmation of the release"
419 OMFAPIFactory.release_api(self.get('version'),
420 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
421 self.get('xmppPassword'), exp_id = self.exp_id)
423 super(OMFApplication, self).do_release()