2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Julien Tribino <julien.tribino@inria.fr>
23 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24 ResourceState, reschedule_delay
25 from nepi.execution.attribute import Attribute, Flags
26 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
27 from nepi.resources.omf.node import OMFNode, confirmation_counter
28 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
30 from nepi.util import sshfuncs
33 class OMFApplication(OMFResource):
35 .. class:: Class Args :
37 :param ec: The Experiment controller
38 :type ec: ExperimentController
39 :param guid: guid of the RM
43 _rtype = "OMFApplication"
44 _authorized_connections = ["OMFNode"]
def _register_attributes(cls):
    """ Declare the attributes of an OMF application and register them
    on the class.

    Seven attributes are created: ``command``, ``env``, ``appid``,
    ``stdin`` (default: empty string), and ``sources``, ``sshUser`` and
    ``sshKey``, which carry the ``Flags.ExecReadOnly`` flag.

    """
    read_only = Flags.ExecReadOnly

    command_attr = Attribute("command", "Command to execute")
    env_attr = Attribute("env", "Environnement variable of the application")
    appid_attr = Attribute("appid", "Name of the application")
    stdin_attr = Attribute("stdin", "Input of the application", default="")
    sources_attr = Attribute("sources", "Sources of the application",
                             flags=read_only)
    sshuser_attr = Attribute("sshUser", "user to connect with ssh",
                             flags=read_only)
    sshkey_attr = Attribute("sshKey", "key to use for ssh",
                            flags=read_only)

    # Registration order is unchanged: appid first, then command, env,
    # stdin, sources, sshUser and sshKey.
    registration_order = (
        appid_attr,
        command_attr,
        env_attr,
        stdin_attr,
        sources_attr,
        sshuser_attr,
        sshkey_attr,
    )
    for attribute in registration_order:
        cls._register_attribute(attribute)
def __init__(self, ec, guid):
    """ Create the resource manager of an OMF application.

    :param ec: The Experiment controller
    :type ec: ExperimentController
    :param guid: guid of the RM

    NOTE(review): the previous docstring also documented a ``creds``
    parameter (credentials for the XMPP client); the signature has no
    such parameter.

    """
    super(OMFApplication, self).__init__(ec, guid)

    # The command starts out empty.
    self.set('command', "")

    # NOTE(review): other methods of this class read self._omf_api,
    # self.create_id, self._create_cnt, self._start_cnt and
    # self._release_cnt; their initialisation is not visible in this
    # extract -- confirm it happens in this constructor.

    # Topic of the application; the deploy step stores the uid it gets
    # back here and enrolls to it.
    self._topic_app = None

    # Identifier of the release request; generated in do_release().
    self.release_id = None
def _init_command(self):
    """ Derive the argument string of the application from the
    'command' attribute.

    The command is split on single spaces and everything after the first
    token is joined back with spaces and stored in ``self.args``.

    """
    comm = self.get('command').split(' ')
    # NOTE(review): statements between these two lines are missing from
    # this extract; presumably the first token is stored as the path of
    # the binary (do_start reads self.path) -- verify.
    self.args = ' '.join(comm[1:])
110 return self.ec.exp_id
114 rm_list = self.get_connected(OMFNode.get_rtype())
115 if rm_list: return rm_list[0]
def stdin_hook(self, old_value, new_value):
    """ Hook for the stdin attribute: sends a message through the OMF API
    each time the value of this attribute is changed.

    :param old_value: previous value of the attribute (not used by the
        visible code)
    :param new_value: value passed to ``send_stdin`` together with the
        hostname of the connected node and the 'appid' attribute

    """
    self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
    # NOTE(review): what the hook returns is not visible in this extract.
def add_set_hook(self):
    """ Install the set-hooks of this resource.

    The only hook wired here is the one of the ``stdin`` attribute, whose
    ``set_hook`` becomes :meth:`stdin_hook`.

    """
    self._attrs["stdin"].set_hook = self.stdin_hook
def valid_connection(self, guid):
    """ Check if the connection with the guid in parameter is possible.
    Only meaningful connections are allowed.

    :param guid: Guid of the RM this application would be connected to

    The candidate resource is fetched from the experiment controller and
    one message is built per outcome: resource type not listed in
    ``_authorized_connections``, application already connected, or
    connection accepted.

    NOTE(review): the statements that log each message and return the
    verdict are missing from this extract, and the nesting of the
    "accepted" message below is reconstructed (presumably it belongs to
    an ``else`` branch) -- verify against the complete file.

    """
    rm = self.ec.get_resource(guid)
    if rm.get_rtype() not in self._authorized_connections:
        # The other end is not one of the authorized resource types.
        msg = ("Connection between %s %s and %s %s refused: "
                "An Application can be connected only to a Node" ) % \
            (self.get_rtype(), self._guid, rm.get_rtype(), guid)
    elif len(self.connections) != 0 :
        # This application already has a connection.
        msg = ("Connection between %s %s and %s %s refused: "
                "This Application is already connected" ) % \
            (self.get_rtype(), self._guid, rm.get_rtype(), guid)
    # NOTE(review): lines are missing here in this extract.
    msg = "Connection between %s %s and %s %s accepted" % (
        self.get_rtype(), self._guid, rm.get_rtype(), guid)
167 """ Deploy the RM. It means nothing special for an application
168 for now (later it will be upload sources, ...)
169 It becomes DEPLOYED after getting the xmpp client.
172 if not self.node or self.node.state < ResourceState.READY:
173 self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
175 self.ec.schedule(reschedule_delay, self.deploy)
180 self.set('xmppUser',self.node.get('xmppUser'))
181 self.set('xmppServer',self.node.get('xmppServer'))
182 self.set('xmppPort',self.node.get('xmppPort'))
183 self.set('xmppPassword',self.node.get('xmppPassword'))
184 self.set('version',self.node.get('version'))
186 if not self.get('xmppServer'):
187 msg = "XmppServer is not initialzed. XMPP Connections impossible"
189 raise RuntimeError, msg
191 if not (self.get('xmppUser') or self.get('xmppPort')
192 or self.get('xmppPassword')):
193 msg = "Credentials are not all initialzed. Default values will be used"
196 if not self._omf_api :
197 self._omf_api = OMFAPIFactory.get_api(self.get('version'),
198 self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
199 self.get('xmppPassword'), exp_id = self.exp_id)
201 if self.get('version') == "5":
202 if self.get('sources'):
203 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
204 user = self.get('sshUser') or self.get('xmppUser')
205 dst = user + "@"+ gateway + ":"
206 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
209 if not self.create_id:
211 if self.get('command'):
212 props['application:binary_path'] = self.get('command')
213 props['application:hrn'] = self.get('command')
214 props['application:membership'] = self._topic_app
215 props['application:type'] = "application"
217 self.create_id = os.urandom(16).encode('hex')
218 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
220 if self._create_cnt > confirmation_counter:
221 msg = "Couldn't retrieve the confirmation of the creation"
223 raise RuntimeError, msg
225 uid = self.check_deploy(self.create_id)
228 self.ec.schedule(reschedule_delay, self.deploy)
231 self._topic_app = uid
232 self._omf_api.enroll_topic(self._topic_app)
234 super(OMFApplication, self).do_deploy()
def check_deploy(self, cid):
    """ Look in the mailbox of the OMF API for a "create" entry matching
    the given identifier.

    :param cid: identifier of the creation request (the deploy step
        passes ``self.create_id``)

    """
    uid = self._omf_api.check_mailbox("create", cid)
    # NOTE(review): the statements that hand the result back to the
    # caller are missing from this extract.
243 """ Start the RM. It means : Send Xmpp Message Using OMF protocol
244 to execute the application.
245 It becomes STARTED before the messages are sent (for coordination)
248 if not self.get('command') :
249 msg = "Application's Command is not initialized"
251 raise RuntimeError, msg
253 if not self.get('env'):
256 if self.get('version') == "5":
257 # Some information to check the command for OMF5
258 msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
259 self.get('appid') + " : " + self.path + " : " + \
260 self.args + " : " + self.get('env')
263 self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
264 self.get('args'), self.get('path'), self.get('env'))
267 if self._start_cnt == 0:
269 props['state'] = "running"
272 guards['type'] = "application"
273 guards['name'] = self.get('command')
275 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
277 if self._start_cnt > confirmation_counter:
278 msg = "Couldn't retrieve the confirmation that the application started"
280 raise RuntimeError, msg
282 res = self.check_start(self._topic_app)
285 self.ec.schedule(reschedule_delay, self.start)
288 super(OMFApplication, self).do_start()
def check_start(self, uid):
    """ Look in the mailbox of the OMF API for a "started" entry matching
    the given identifier.

    :param uid: identifier to look for (the start step passes
        ``self._topic_app``)

    """
    res = self._omf_api.check_mailbox("started", uid)
    # NOTE(review): the statements that hand the result back to the
    # caller are missing from this extract.
""" Stop the RM. It means : Send Xmpp Message Using OMF protocol to
kill the application (OMF 5 only), then run the parent class
do_stop().

"""
# The other methods of this class compare the 'version' attribute with
# the strings "5" and "6". The previous test against the integer 5 could
# never match a string value, so the exit message was never sent.
# Going through str() accepts both representations.
if str(self.get('version')) == "5":
    self._omf_api.exit(self.node.get('hostname'), self.get('appid'))
super(OMFApplication, self).do_stop()
def check_release(self, cid):
    """ Look in the mailbox of the OMF API for a "release" entry matching
    the given identifier.

    :param cid: identifier of the release request (do_release passes
        ``self.release_id``)

    """
    res = self._omf_api.check_mailbox("release", cid)
    # NOTE(review): the statements that hand the result back to the
    # caller are missing from this extract.
def do_release(self):
    """ Clean the RM at the end of the experiment and release the API.

    For version "6" a release request is sent once (identified by a
    random ``release_id``) and its confirmation is looked up with
    check_release(). The OMF API obtained from OMFAPIFactory is then
    released and the parent class do_release() is run.

    NOTE(review): several control-flow lines of this method are missing
    from this extract; the nesting below is reconstructed from the
    visible statements and must be verified against the complete file.

    """
    if self.get('version') == "6":
        if not self.release_id:
            # First pass only: create the request id and send the release.
            self.release_id = os.urandom(16).encode('hex')
            self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)

        if self._release_cnt < confirmation_counter:
            cid = self.check_release(self.release_id)
            # NOTE(review): a line is missing here in this extract;
            # presumably the retry below only applies while no
            # confirmation was found -- verify.
            self._release_cnt +=1
            self.ec.schedule(reschedule_delay, self.release)
        # NOTE(review): lines are missing here in this extract; what is
        # done with this message is not visible.
        msg = "Couldn't retrieve the confirmation of the release"

    # Hand the API back to the factory, using the same parameters that
    # the deploy step passes to OMFAPIFactory.get_api().
    OMFAPIFactory.release_api(self.get('version'),
        self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
        self.get('xmppPassword'), exp_id = self.exp_id)

    super(OMFApplication, self).do_release()