2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Julien Tribino <julien.tribino@inria.fr>
20 from __future__ import print_function
24 from nepi.util.timefuncs import tnow
25 from nepi.execution.resource import ResourceManager, clsinit_copy, \
27 from nepi.execution.trace import Trace, TraceAttr
28 from nepi.execution.attribute import Attribute, Flags
29 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
30 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
31 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
33 from nepi.util import sshfuncs
36 class OMFApplication(OMFResource):
38 .. class:: Class Args :
40 :param ec: The Experiment controller
41 :type ec: ExperimentController
42 :param guid: guid of the RM
46 _rtype = "omf::Application"
47 _authorized_connections = ["omf::Node", "wilabt::sfa::Node"]
50 def _register_attributes(cls):
51 """ Register the attributes of an OMF application
54 command = Attribute("command", "Command to execute")
55 env = Attribute("env", "Environnement variable of the application")
58 appid = Attribute("appid", "Name of the application")
59 stdin = Attribute("stdin", "Input of the application", default = "")
60 sources = Attribute("sources", "Sources of the application",
62 sshuser = Attribute("sshUser", "user to connect with ssh",
64 sshkey = Attribute("sshKey", "key to use for ssh",
67 cls._register_attribute(appid)
68 cls._register_attribute(command)
69 cls._register_attribute(env)
70 cls._register_attribute(stdin)
71 cls._register_attribute(sources)
72 cls._register_attribute(sshuser)
73 cls._register_attribute(sshkey)
75 def __init__(self, ec, guid):
77 :param ec: The Experiment controller
78 :type ec: ExperimentController
79 :param guid: guid of the RM
81 :param creds: Credentials to communicate with the rm (XmppClient for OMF)
85 super(OMFApplication, self).__init__(ec, guid)
87 self.set('command', "")
96 self._topic_app = None
100 self.release_id = None
101 self._release_cnt = 0
103 # For performance tests
104 self.begin_deploy_time = None
105 self.begin_start_time = None
106 self.begin_release_time = None
113 def _init_command(self):
114 comm = self.get('command').split(' ')
117 self._args = ' '.join(comm[1:])
121 return self.ec.exp_id
125 rm_list = self.get_connected(OMFNode.get_rtype())
126 if rm_list: return rm_list[0]
129 def stdin_hook(self, old_value, new_value):
130 """ Set a hook to the stdin attribute in order to send a message at each time
131 the value of this parameter is changed. Used ofr OMF 5.4 only
134 self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
137 def add_set_hook(self):
138 """ Initialize the hooks for OMF 5.4 only
141 attr = self._attrs["stdin"]
142 attr.set_hook = self.stdin_hook
144 def valid_connection(self, guid):
145 """ Check if the connection with the guid in parameter is possible.
146 Only meaningful connections are allowed.
148 :param guid: Guid of RM it will be connected
153 rm = self.ec.get_resource(guid)
154 if rm.get_rtype() not in self._authorized_connections:
155 msg = ("Connection between %s %s and %s %s refused: "
156 "An Application can be connected only to a Node" ) % \
157 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
162 elif len(self.connections) != 0 :
163 msg = ("Connection between %s %s and %s %s refused: "
164 "This Application is already connected" ) % \
165 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
171 msg = "Connection between %s %s and %s %s accepted" % (
172 self.get_rtype(), self._guid, rm.get_rtype(), guid)
178 """ Deploy the RM. It means nothing special for an application
179 for now (later it will be upload sources, ...)
180 It becomes DEPLOYED after the topic for the application has been created
183 if not self.node or self.node.state < ResourceState.READY:
184 self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
186 self.ec.schedule(self.reschedule_delay, self.deploy)
189 ## For performance test
191 self.begin_deploy_time = tnow()
196 self.set('xmppUser',self.node.get('xmppUser'))
197 self.set('xmppServer',self.node.get('xmppServer'))
198 self.set('xmppPort',self.node.get('xmppPort'))
199 self.set('xmppPassword',self.node.get('xmppPassword'))
200 self.set('version',self.node.get('version'))
202 if not self.get('xmppServer'):
203 msg = "XmppServer is not initialzed. XMPP Connections impossible"
205 raise RuntimeError, msg
207 if not (self.get('xmppUser') or self.get('xmppPort')
208 or self.get('xmppPassword')):
209 msg = "Credentials are not all initialzed. Default values will be used"
212 if not self.get('command') :
213 msg = "Application's Command is not initialized"
215 raise RuntimeError, msg
217 if not self._omf_api :
218 self._omf_api = OMFAPIFactory.get_api(self.get('version'),
219 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
220 self.get('xmppPassword'), exp_id = self.exp_id)
222 if self.get('version') == "5":
224 self.begin_deploy_time = tnow()
226 if self.get('sources'):
227 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
228 user = self.get('sshUser') or self.get('xmppUser')
229 dst = user + "@"+ gateway + ":"
230 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
233 if not self.create_id:
235 if self.get('command'):
236 props['application:binary_path'] = self.get('command')
237 props['application:hrn'] = self.get('command')
238 props['application:membership'] = self._topic_app
239 props['application:type'] = "application"
241 self.create_id = os.urandom(16).encode('hex')
242 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
244 if self._create_cnt > confirmation_counter:
245 msg = "Couldn't retrieve the confirmation of the creation"
247 raise RuntimeError, msg
249 uid = self.check_deploy(self.create_id)
252 self.ec.schedule(reschedule_check, self.deploy)
255 self._topic_app = uid
256 self._omf_api.enroll_topic(self._topic_app)
258 super(OMFApplication, self).do_deploy()
260 def check_deploy(self, cid):
261 """ Check, through the mail box in the parser,
262 if the confirmation of the creation has been received
264 :param cid: the id of the original message
268 uid = self._omf_api.check_mailbox("create", cid)
273 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
274 self.info("Retrieving '%s' trace %s " % (name, attr))
275 if name == 'stdout' :
277 elif name == 'stderr' :
282 trace_path = '/tmp/'+ self._topic_app + suffix
284 if attr == TraceAttr.PATH:
287 if attr == TraceAttr.ALL:
289 f = open(trace_path ,'r')
291 print("File with traces has not been found")
299 """ Start the RM. It means : Send Xmpp Message Using OMF protocol
300 to execute the application.
303 ## For performance test
305 self.begin_start_time = tnow()
308 if not self.get('env'):
311 if self.get('version') == "5":
312 self.begin_start_time = tnow()
313 # Some information to check the command for OMF5
314 msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
315 self.get('appid') + " : " + self._path + " : " + \
316 self._args + " : " + self.get('env')
319 self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
320 self._args, self._path, self.get('env'))
323 if self._start_cnt == 0:
325 props['state'] = "running"
328 guards['type'] = "application"
329 guards['name'] = self.get('command')
331 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
333 if self._start_cnt > confirmation_counter:
334 msg = "Couldn't retrieve the confirmation that the application started"
336 raise RuntimeError, msg
338 res = self.check_start(self._topic_app)
341 self.ec.schedule(reschedule_check, self.start)
344 super(OMFApplication, self).do_start()
346 def check_start(self, uid):
347 """ Check, through the mail box in the parser,
348 if the confirmation of the start has been received
350 :param uid: the id of the original message
354 res = self._omf_api.check_mailbox("started", uid)
360 """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to
361 kill the application.
362 State is set to STOPPED after the message is sent.
367 if self.get('version') == 5:
368 self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
369 super(OMFApplication, self).do_stop()
371 def check_release(self, cid):
372 """ Check, through the mail box in the parser,
373 if the confirmation of the release has been received
375 :param cid: the id of the original message
379 res = self._omf_api.check_mailbox("release", cid)
384 def do_release(self):
385 """ Clean the RM at the end of the experiment and release the API.
388 ## For performance test
390 self.begin_release_time = tnow()
394 if self.get('version') == "6" and self._topic_app:
395 if not self.release_id:
396 self.release_id = os.urandom(16).encode('hex')
397 self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
399 if self._release_cnt < confirmation_counter:
400 cid = self.check_release(self.release_id)
402 self._release_cnt +=1
403 self.ec.schedule(reschedule_check, self.release)
406 msg = "Couldn't retrieve the confirmation of the release"
409 # Remove the stdout and stderr of the application
411 os.remove('/tmp/'+self._topic_app +'.out')
412 os.remove('/tmp/'+self._topic_app +'.err')
416 OMFAPIFactory.release_api(self.get('version'),
417 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
418 self.get('xmppPassword'), exp_id = self.exp_id)
420 super(OMFApplication, self).do_release()