10 from neco.resources.omf.omf_client import OMFClient
11 from neco.resources.omf.omf_messages_5_4 import MessageHandler
15 .. class:: Class Args :
17 :param slice: Xmpp Slice
19 :param host: Xmpp Server
21 :param port: Xmpp Port
23 :param password: Xmpp password
25 :param xmpp_root: Root of the Xmpp Topic Architecture
30 This class is the implementation of an OMF 5.4 API. Since version 5.4.1, the Topic Architecture starts with OMF_5.4 instead of OMF, which was used for OMF 5.3
# Constructor. The visible lines derive a per-session user id from the slice
# name and the current local time, keep the password and the topic root,
# configure the logger and enrol the experiment topics.
# NOTE(review): the numbers embedded in this listing are not contiguous
# (e.g. 51 -> 55, 60 -> 67, 71 -> 74, 76 -> 78), so some statements of this
# method are not shown here; the comments only describe the visible lines.
33 def __init__(self, slice, host, port, password, xmpp_root = None):
36 :param slice: Xmpp Slice
38 :param host: Xmpp Server
40 :param port: Xmpp Port
42 :param password: Xmpp password
44 :param xmpp_root: Root of the Xmpp Topic Architecture
# Local timestamp rendered as "%Y-%m-%dt%H.%M.%S" (lower-case "t" separator).
48 date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
# time.altzone / time.timezone are offsets in seconds west of UTC, hence the
# sign flip.
# NOTE(review): time.daylight tells whether a DST zone is defined, not whether
# DST is currently in effect - confirm altzone is the offset wanted here.
49 tz = -time.altzone if time.daylight != 0 else -time.timezone
# The offset is appended in hours as a signed, zero-padded value with two
# decimals (e.g. "+01.00").
# NOTE(review): tz is an int, so under Python 2 "/" floors and fractional-hour
# offsets lose their fraction - confirm whether that matters.
50 date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
# The session user id is "<slice>-<timestamp><offset>".
51 self._user = "%s-%s" % (slice, date)
55 self._password = password
# Falls back to "OMF_5.4" when xmpp_root is None (or any other falsy value).
57 self._xmpp_root = xmpp_root or "OMF_5.4"
59 self._logger = logging.getLogger("neco.omf.omfApi ")
60 self._logger.setLevel(neco.LOGLEVEL)
# Python 2 only: switch the interpreter-wide default encoding to UTF-8.
# NOTE(review): sys.setdefaultencoding is normally removed from sys at
# start-up; presumably elided line 68 re-exposes it - verify.
67 if sys.version_info < (3, 0):
69 sys.setdefaultencoding('utf8')
71 # instantiate the xmpp client
74 # register xmpp nodes for the experiment
75 self._enroll_experiment()
76 self._enroll_newexperiment()
78 # register xmpp logger for the experiment
# Builds the OMFClient for the JID "<user>@<host>", tries to connect it to
# (host, port) and, in the visible success path, starts its processing and
# creates the MessageHandler for this slice and user. The visible tail logs
# an error and raises RuntimeError.
# NOTE(review): numbers 92-94 and 96 are missing from this listing; presumably
# they hold the rest of the success path and the branch that guards the error
# handling - verify against the full file.
81 def _init_client(self):
82 """ Initialize XMPP Client
85 jid = "%s@%s" % (self._user, self._host)
86 xmpp = OMFClient(jid, self._password)
87 # PROTOCOL_SSLv3 required for compatibility with OpenFire
# NOTE(review): SSLv3 is deprecated in Python's ssl module and the constant
# can be unavailable depending on the OpenSSL build - confirm it is still
# needed and available on the target interpreter.
88 xmpp.ssl_version = ssl.PROTOCOL_SSLv3
90 if xmpp.connect((self._host, self._port)):
91 xmpp.process(block=False)
95 self._message = MessageHandler(self._slice, self._user)
97 msg = "Unable to connect to the XMPP server."
98 self._logger.error(msg)
99 raise RuntimeError(msg)
def _enroll_experiment(self):
    """Create the experiment session topic, then subscribe to it.

    The topic name is read from ``self._exp_session_id`` and both requests
    are sent through ``self._client``.
    """
    session_topic = self._exp_session_id
    self._client.create(session_topic)
    self._client.subscribe(session_topic)
# Publishes the "new experiment" message built by the message handler on the
# slice topic "/<xmpp_root>/<slice>". The message carries this session's
# address "/<host>/<xmpp_root>/<slice>/<user>".
# NOTE(review): number 117 is absent from this listing, so a statement between
# the address and the payload may be elided - verify against the full file.
112 def _enroll_newexperiment(self):
113 """ Publish New Experiment Message
116 address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
118 payload = self._message.newexp_function(self._user, address)
119 slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
120 self._client.publish(payload, slice_sid)
# Creates the logger topic named by self._logger_session_id, subscribes to it
# and publishes an initial log message on it.
# NOTE(review): number 132 is absent between the second and the last
# log_function argument, so one argument of that call is elided in this
# listing - verify against the full file.
122 def _enroll_logger(self):
123 """ Create and Subscribe to the Logger Topic
126 xmpp_node = self._logger_session_id
127 self._client.create(xmpp_node)
128 self._client.subscribe(xmpp_node)
130 payload = self._message.log_function("2",
131 "nodeHandler::NodeHandler",
133 "OMF Experiment Controller 5.4 (git 529a626)")
134 self._client.publish(payload, xmpp_node)
def _host_session_id(self, hostname):
    """Build the session topic name of a node.

    :param hostname: Full hrn of the node
    :return: Topic name shaped as ``/xmpp_root/slice/user/hostname``
    """
    topic_parts = (self._xmpp_root, self._slice, self._user, hostname)
    return "/%s/%s/%s/%s" % topic_parts
def _host_resource_id(self, hostname):
    """Build the resource topic name of a node.

    :param hostname: Full hrn of the node
    :return: Topic name shaped as ``/xmpp_root/slice/resources/hostname``
    """
    topic_parts = (self._xmpp_root, self._slice, hostname)
    return "/%s/%s/resources/%s" % topic_parts
def _exp_session_id(self):
    """Build the experiment session topic name.

    :return: Topic name shaped as ``/xmpp_root/slice/user``
    """
    topic_parts = (self._xmpp_root, self._slice, self._user)
    return "/%s/%s/%s" % topic_parts
def _logger_session_id(self):
    """Build the logger topic name of this session.

    :return: Topic name shaped as ``/xmpp_root/slice/user/LOGGER``
    """
    topic_parts = (self._xmpp_root, self._slice, self._user)
    return "/%s/%s/%s/LOGGER" % topic_parts
# Removes hostname from self._hostnames and deletes the host's session topic
# through the XMPP client.
# NOTE(review): numbers 176-177 are absent from this listing; presumably they
# hold the body of the guard below (an early exit for hosts that were never
# enrolled) - verify against the full file.
168 def delete(self, hostname):
169 """ Delete the topic corresponding to the hostname for this session
171 :param hostname: Full hrn of the node
# NOTE(review): "hostname not in self._hostnames" is the idiomatic spelling
# of this membership test.
175 if not hostname in self._hostnames:
178 self._hostnames.remove(hostname)
180 xmpp_node = self._host_session_id(hostname)
181 self._client.delete(xmpp_node)
# Records hostname in self._hostnames, creates and subscribes to the host's
# session topic, subscribes to the host's resource topic and publishes the
# enroll message on that resource topic.
# NOTE(review): numbers 191-192 are absent from this listing; presumably they
# hold the body of the guard below (an early exit for hosts that are already
# enrolled) - verify against the full file.
183 def enroll_host(self, hostname):
184 """ Create and Subscribe to the session topic and the resources corresponding to the hostname
186 :param hostname: Full hrn of the node
190 if hostname in self._hostnames:
193 self._hostnames.append(hostname)
195 xmpp_node = self._host_session_id(hostname)
196 self._client.create(xmpp_node)
197 self._client.subscribe(xmpp_node)
# The resource topic is only subscribed to here; it is not created.
199 xmpp_node = self._host_resource_id(hostname)
200 self._client.subscribe(xmpp_node)
202 payload = self._message.enroll_function("1", "*", "1", hostname)
203 self._client.publish(payload, xmpp_node)
def configure(self, hostname, attribute, value):
    """Publish a configure message for one attribute of a node.

    The message is published on the session topic of the node.

    :param hostname: Full hrn of the node
    :param attribute: Attribute that needs to be configured (often written
        as /net/wX/attribute, with X the interface number)
    :param value: Value of the attribute
    """
    # The message handler is called with the value before the attribute.
    message = self._message.configure_function(hostname, value, attribute)
    session_topic = self._host_session_id(hostname)
    self._client.publish(message, session_topic)
def execute(self, hostname, app_id, arguments, path, env):
    """Publish an execute message for an application on a node.

    The message is published on the session topic of the node.

    :param hostname: Full hrn of the node
    :param app_id: Application Id (any id that represents the application
        in a unique way)
    :param arguments: Arguments of the application
    :param path: Path of the application
    :param env: Environment values for the application
    """
    message = self._message.execute_function(hostname, app_id, arguments, path, env)
    session_topic = self._host_session_id(hostname)
    self._client.publish(message, session_topic)
def exit(self, hostname, app_id):
    """Publish an exit message for an application started with OMF.

    The message is published on the session topic of the node.

    :param hostname: Full hrn of the node
    :param app_id: Application Id of the application you want to stop
    """
    message = self._message.exit_function(hostname, app_id)
    session_topic = self._host_session_id(hostname)
    self._client.publish(message, session_topic)
def release(self, hostname):
    """Release a node by calling ``self.delete`` when the node is enrolled.

    Hostnames that are not listed in ``self._hostnames`` are ignored.

    :param hostname: Full hrn of the node
    """
    if hostname not in self._hostnames:
        return
    self.delete(hostname)
# Deletes the experiment session topic and the logger topic, then disconnects
# the XMPP client (asking it to wait) and logs the disconnection.
# NOTE(review): numbers 265-267 are absent from this listing, so a statement
# between the topic deletions and the disconnect may be elided - verify.
259 def disconnect(self) :
260 """ Delete the session and logger topics. Then disconnect
263 self._client.delete(self._exp_session_id)
264 self._client.delete(self._logger_session_id)
268 # Wait the send queue to be empty before disconnect
269 self._client.disconnect(wait=True)
270 self._logger.debug(" Disconnected from XMPP Server")
# Factory that lets several callers share one API object per set of
# credentials (see the class text below).
# NOTE(review): the methods of this class read and write cls._apis, but its
# definition is not visible in this listing (numbers 283-285 are absent).
273 class OMFAPIFactory(object):
277 It allows the different RM to use the same xmpp client if they use the same credentials.
278 For the moment, it is focused on Xmpp.
281 # use lock to avoid concurrent access to the Api list at the same times by 2 different threads
# Class-level attribute: one lock object shared by all users of the factory.
282 lock = threading.Lock()
# Looks up the API registered for these credentials. The visible lines build
# the key with _make_key, increment the entry's 'cnt' counter and return its
# 'api' object, or create a new API through create_api.
# NOTE(review): numbers 301-302, 304, 306 and 308-312 are absent from this
# listing; presumably they hold the lock handling, the membership test that
# separates the two paths and the remaining returns - verify.
286 def get_api(cls, slice, host, port, password):
289 :param slice: Xmpp Slice Name
291 :param host: Xmpp Server Adress
293 :param port: Xmpp Port (Default : 5222)
295 :param password: Xmpp Password
# All four credentials must be truthy for a lookup to take place.
299 if slice and host and port and password:
300 key = cls._make_key(slice, host, port, password)
303 cls._apis[key]['cnt'] += 1
305 return cls._apis[key]['api']
307 omf_api = cls.create_api(slice, host, port, password)
# Instantiates an OMFAPI for these credentials and stores it in cls._apis
# under the key built by _make_key, with its use counter 'cnt' set to 1.
# NOTE(review): numbers 328 and 331-332 are absent from this listing;
# presumably they create the cls._apis[key] entry and return the API - verify.
313 def create_api(cls, slice, host, port, password):
314 """ Create an API if this one doesn't exist yet with this credentials
316 :param slice: Xmpp Slice Name
318 :param host: Xmpp Server Adress
320 :param port: Xmpp Port (Default : 5222)
322 :param password: Xmpp Password
326 omf_api = OMFAPI(slice, host, port, password)
327 key = cls._make_key(slice, host, port, password)
329 cls._apis[key]['api'] = omf_api
330 cls._apis[key]['cnt'] = 1
# Decrements the use counter of the API registered for these credentials and,
# once the counter reaches 0, fetches the API object.
# NOTE(review): numbers 349 and 354-356 are absent from this listing, so what
# happens to the fetched API (and any guard or lock handling around the
# counter) is not visible here - verify against the full file.
334 def release_api(cls, slice, host, port, password):
335 """ Release an API with this credentials
337 :param slice: Xmpp Slice Name
339 :param host: Xmpp Server Adress
341 :param port: Xmpp Port (Default : 5222)
343 :param password: Xmpp Password
# All four credentials must be truthy for anything to happen.
347 if slice and host and port and password:
348 key = cls._make_key(slice, host, port, password)
350 cls._apis[key]['cnt'] -= 1
351 #print "Api Counter : " + str(cls._apis[key]['cnt'])
352 if cls._apis[key]['cnt'] == 0:
353 omf_api = cls._apis[key]['api']
def _make_key(cls, *args):
    """Hash the credentials in order to create a key.

    Every argument is converted with ``str``; the results are concatenated
    without a separator and the MD5 hex digest of that string is returned.

    :param args: list of arguments used to create the hash (user, host, port, ...)
    :type args: list of args
    :return: hexadecimal MD5 digest of the concatenated arguments
    """
    skey = "".join(map(str, args))
    # hashlib only accepts bytes. On Python 3 the joined value is text and
    # has to be encoded first; on Python 2 ``str`` is already ``bytes``, so
    # the value is hashed unchanged there.
    if not isinstance(skey, bytes):
        skey = skey.encode("utf-8")
    return hashlib.md5(skey).hexdigest()