2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Julien Tribino <julien.tribino@inria.fr>
23 from nepi.util.timefuncs import tnow
24 from nepi.execution.resource import ResourceManager, clsinit_copy, \
26 from nepi.execution.trace import Trace, TraceAttr
27 from nepi.execution.attribute import Attribute, Flags
28 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
29 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
30 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
32 from nepi.util import sshfuncs
35 class OMFApplication(OMFResource):
37 .. class:: Class Args :
39 :param ec: The Experiment controller
40 :type ec: ExperimentController
41 :param guid: guid of the RM
45 _rtype = "OMFApplication"
46 _authorized_connections = ["OMFNode", "WilabtSfaNode"]
49 def _register_attributes(cls):
50 """ Register the attributes of an OMF application
53 command = Attribute("command", "Command to execute")
54 env = Attribute("env", "Environnement variable of the application")
57 appid = Attribute("appid", "Name of the application")
58 stdin = Attribute("stdin", "Input of the application", default = "")
59 sources = Attribute("sources", "Sources of the application",
61 sshuser = Attribute("sshUser", "user to connect with ssh",
63 sshkey = Attribute("sshKey", "key to use for ssh",
66 cls._register_attribute(appid)
67 cls._register_attribute(command)
68 cls._register_attribute(env)
69 cls._register_attribute(stdin)
70 cls._register_attribute(sources)
71 cls._register_attribute(sshuser)
72 cls._register_attribute(sshkey)
def __init__(self, ec, guid):
    """Create the application RM and reset its bookkeeping state.

    :param ec: The Experiment controller
    :type ec: ExperimentController
    :param guid: guid of the RM

    """
    super(OMFApplication, self).__init__(ec, guid)

    self.set('command', "")

    # Executable and argument string derived from 'command'. The start code
    # reads both, so they must exist even if _init_command has not run yet.
    self._path = ""
    self._args = ""

    # Handle on the OMF API; it is tested with "not self._omf_api" before
    # the API is requested from OMFAPIFactory.
    self._omf_api = None
    self._topic_app = None

    # Ids of the pending create / release requests, and the counters that
    # are compared against confirmation_counter while waiting for replies.
    self.create_id = None
    self._create_cnt = 0
    self._start_cnt = 0
    self.release_id = None
    self._release_cnt = 0

    # For performance tests
    self.begin_deploy_time = None
    self.begin_start_time = None
    self.begin_release_time = None
def _init_command(self):
    """Split the 'command' attribute into executable path and arguments.

    The text before the first space is stored in ``self._path`` and
    everything after that space in ``self._args`` (an empty string when the
    command has no arguments).

    """
    # partition() on the first blank gives the same result as split(' ')
    # followed by ' '.join(tokens[1:]), including for empty or one-word
    # commands, without building the intermediate list.
    path, _, args = self.get('command').partition(' ')
    self._path = path
    self._args = args
120 return self.ec.exp_id
124 rm_list = self.get_connected(OMFNode.get_rtype())
125 if rm_list: return rm_list[0]
def stdin_hook(self, old_value, new_value):
    """Set-hook for the ``stdin`` attribute. Used for OMF 5.4 only.

    Sends a message through the OMF API each time the value of the
    attribute is changed.

    :param old_value: previous value of the attribute (not used)
    :param new_value: value being assigned; sent as the application's stdin
    :returns: ``new_value``, unchanged

    """
    hostname = self.node.get('hostname')
    self._omf_api.send_stdin(hostname, new_value, self.get('appid'))
    # NOTE(review): the hook receives (old, new), so its result is presumably
    # what the attribute ends up storing -- confirm against Attribute.set.
    return new_value
def add_set_hook(self):
    """Attach ``stdin_hook`` as the set-hook of the ``stdin`` attribute.

    Only meaningful for OMF 5.4.

    """
    self._attrs["stdin"].set_hook = self.stdin_hook
def valid_connection(self, guid):
    """Check if the connection with the guid in parameter is possible.

    Only meaningful connections are allowed: the peer must be of a type
    listed in ``_authorized_connections`` and this application must not be
    connected to anything yet. The verdict is also written to the debug log.

    :param guid: Guid of the RM it will be connected to
    :returns: True when the connection is accepted, False when it is refused
    :rtype: bool

    """
    rm = self.ec.get_resource(guid)
    # The same four values fill the placeholders of all three messages.
    endpoints = (self.get_rtype(), self._guid, rm.get_rtype(), guid)

    if rm.get_rtype() not in self._authorized_connections:
        msg = ("Connection between %s %s and %s %s refused: "
                "An Application can be connected only to a Node" ) % endpoints
        self.debug(msg)
        return False

    if len(self.connections) != 0:
        msg = ("Connection between %s %s and %s %s refused: "
                "This Application is already connected" ) % endpoints
        self.debug(msg)
        return False

    msg = "Connection between %s %s and %s %s accepted" % endpoints
    self.debug(msg)
    return True
177 """ Deploy the RM. It means nothing special for an application
178 for now (later it will be upload sources, ...)
179 It becomes DEPLOYED after the topic for the application has been created
182 if not self.node or self.node.state < ResourceState.READY:
183 self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
185 self.ec.schedule(self.reschedule_delay, self.deploy)
188 ## For performance test
190 self.begin_deploy_time = tnow()
195 self.set('xmppUser',self.node.get('xmppUser'))
196 self.set('xmppServer',self.node.get('xmppServer'))
197 self.set('xmppPort',self.node.get('xmppPort'))
198 self.set('xmppPassword',self.node.get('xmppPassword'))
199 self.set('version',self.node.get('version'))
201 if not self.get('xmppServer'):
202 msg = "XmppServer is not initialzed. XMPP Connections impossible"
204 raise RuntimeError, msg
206 if not (self.get('xmppUser') or self.get('xmppPort')
207 or self.get('xmppPassword')):
208 msg = "Credentials are not all initialzed. Default values will be used"
211 if not self.get('command') :
212 msg = "Application's Command is not initialized"
214 raise RuntimeError, msg
216 if not self._omf_api :
217 self._omf_api = OMFAPIFactory.get_api(self.get('version'),
218 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
219 self.get('xmppPassword'), exp_id = self.exp_id)
221 if self.get('version') == "5":
223 self.begin_deploy_time = tnow()
225 if self.get('sources'):
226 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
227 user = self.get('sshUser') or self.get('xmppUser')
228 dst = user + "@"+ gateway + ":"
229 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
232 if not self.create_id:
234 if self.get('command'):
235 props['application:binary_path'] = self.get('command')
236 props['application:hrn'] = self.get('command')
237 props['application:membership'] = self._topic_app
238 props['application:type'] = "application"
240 self.create_id = os.urandom(16).encode('hex')
241 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
243 if self._create_cnt > confirmation_counter:
244 msg = "Couldn't retrieve the confirmation of the creation"
246 raise RuntimeError, msg
248 uid = self.check_deploy(self.create_id)
251 self.ec.schedule(reschedule_check, self.deploy)
254 self._topic_app = uid
255 self._omf_api.enroll_topic(self._topic_app)
257 super(OMFApplication, self).do_deploy()
def check_deploy(self, cid):
    """Check, through the mail box in the parser, if the confirmation of
    the creation has been received.

    :param cid: the id of the original message
    :returns: what the mail box holds for the "create" message ``cid``; the
        deploy code stores this value as the application topic
        (presumably a false value while no confirmation has arrived --
        confirm against the OMF API)

    """
    return self._omf_api.check_mailbox("create", cid)
272 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
273 self.info("Retrieving '%s' trace %s " % (name, attr))
274 if name == 'stdout' :
276 elif name == 'stderr' :
281 trace_path = '/tmp/'+ self._topic_app + suffix
283 if attr == TraceAttr.PATH:
286 if attr == TraceAttr.ALL:
288 f = open(trace_path ,'r')
290 print "File with traces has not been found"
298 """ Start the RM. It means : Send Xmpp Message Using OMF protocol
299 to execute the application.
302 ## For performance test
304 self.begin_start_time = tnow()
307 if not self.get('env'):
310 if self.get('version') == "5":
311 self.begin_start_time = tnow()
312 # Some information to check the command for OMF5
313 msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
314 self.get('appid') + " : " + self._path + " : " + \
315 self._args + " : " + self.get('env')
318 self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
319 self._args, self._path, self.get('env'))
322 if self._start_cnt == 0:
324 props['state'] = "running"
327 guards['type'] = "application"
328 guards['name'] = self.get('command')
330 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
332 if self._start_cnt > confirmation_counter:
333 msg = "Couldn't retrieve the confirmation that the application started"
335 raise RuntimeError, msg
337 res = self.check_start(self._topic_app)
340 self.ec.schedule(reschedule_check, self.start)
343 super(OMFApplication, self).do_start()
def check_start(self, uid):
    """Check, through the mail box in the parser, if the confirmation of
    the start has been received.

    :param uid: the id of the original message
    :returns: what the mail box holds for the "started" message ``uid``
        (presumably a false value while no confirmation has arrived --
        confirm against the OMF API)

    """
    return self._omf_api.check_mailbox("started", uid)
def do_stop(self):
    """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to
    kill the application.
    State is set to STOPPED after the message is sent.

    """
    # Everywhere else in this class 'version' is compared with the strings
    # "5" and "6". Comparing with the integer 5 never matches such a value,
    # so the exit message would never be sent. str() accepts both spellings.
    if str(self.get('version')) == "5":
        self._omf_api.exit(self.node.get('hostname'), self.get('appid'))
    super(OMFApplication, self).do_stop()
def check_release(self, cid):
    """Check, through the mail box in the parser, if the confirmation of
    the release has been received.

    :param cid: the id of the original message
    :returns: what the mail box holds for the "release" message ``cid``
        (presumably a false value while no confirmation has arrived --
        confirm against the OMF API)

    """
    return self._omf_api.check_mailbox("release", cid)
383 def do_release(self):
384 """ Clean the RM at the end of the experiment and release the API.
387 ## For performance test
389 self.begin_release_time = tnow()
393 if self.get('version') == "6" and self._topic_app:
394 if not self.release_id:
395 self.release_id = os.urandom(16).encode('hex')
396 self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
398 if self._release_cnt < confirmation_counter:
399 cid = self.check_release(self.release_id)
401 self._release_cnt +=1
402 self.ec.schedule(reschedule_check, self.release)
405 msg = "Couldn't retrieve the confirmation of the release"
408 # Remove the stdout and stderr of the application
410 os.remove('/tmp/'+self._topic_app +'.out')
411 os.remove('/tmp/'+self._topic_app +'.err')
415 OMFAPIFactory.release_api(self.get('version'),
416 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
417 self.get('xmppPassword'), exp_id = self.exp_id)
419 super(OMFApplication, self).do_release()