X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fneco%2Fresources%2Fomf%2Fomf_api.py;h=7c71090ad0b3482e13ae10d79f4f46c060923811;hb=8f6fe89985578231158f590990794be52367c4e7;hp=38aa72a134031157ff7d135950ee0dfc7153d268;hpb=4e6d8622b6b960689f2ef6b7e54f81535a5fd854;p=nepi.git diff --git a/src/neco/resources/omf/omf_api.py b/src/neco/resources/omf/omf_api.py index 38aa72a1..7c71090a 100644 --- a/src/neco/resources/omf/omf_api.py +++ b/src/neco/resources/omf/omf_api.py @@ -3,6 +3,9 @@ import logging import ssl import sys import time +import hashlib +import neco +import threading from neco.resources.omf.omf_client import OMFClient from neco.resources.omf.omf_messages_5_4 import MessageHandler @@ -28,6 +31,20 @@ class OMFAPI(object): """ def __init__(self, slice, host, port, password, xmpp_root = None): + """ + + :param slice: Xmpp Slice + :type slice: Str + :param host: Xmpp Server + :type host: Str + :param port: Xmpp Port + :type port: Str + :param password: Xmpp password + :type password: Str + :param xmpp_root: Root of the Xmpp Topic Architecture + :type xmpp_root: Str + + """ date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S") tz = -time.altzone if time.daylight != 0 else -time.timezone date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds @@ -39,7 +56,8 @@ class OMFAPI(object): self._hostnames = [] self._xmpp_root = xmpp_root or "OMF_5.4" - self._logger = logging.getLogger("neco.resources.omf") + self._logger = logging.getLogger("neco.omf.omfApi ") + self._logger.setLevel(neco.LOGLEVEL) # OMF xmpp client self._client = None @@ -70,7 +88,7 @@ class OMFAPI(object): xmpp.ssl_version = ssl.PROTOCOL_SSLv3 if xmpp.connect((self._host, self._port)): - xmpp.process(threaded=True) + xmpp.process(block=False) while not xmpp.ready: time.sleep(1) self._client = xmpp @@ -96,8 +114,8 @@ class OMFAPI(object): """ address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user) - print address - payload = self._message.newexpfunction(self._user, address) + #print address + payload = self._message.newexp_function(self._user, address) slice_sid = "/%s/%s" % (self._xmpp_root, self._slice) self._client.publish(payload, slice_sid) @@ -109,7 +127,7 @@ class OMFAPI(object): self._client.create(xmpp_node) self._client.subscribe(xmpp_node) - payload = self._message.logfunction("2", + payload = self._message.log_function("2", "nodeHandler::NodeHandler", "INFO", "OMF Experiment Controller 5.4 (git 529a626)") @@ -181,7 +199,7 @@ class OMFAPI(object): xmpp_node = self._host_resource_id(hostname) self._client.subscribe(xmpp_node) - payload = self._message.enrollfunction("1", "*", "1", hostname) + payload = self._message.enroll_function("1", "*", "1", hostname) self._client.publish(payload, xmpp_node) def configure(self, hostname, attribute, value): @@ -195,7 +213,7 @@ class OMFAPI(object): :type value: str """ - payload = self._message.configurefunction(hostname, value, attribute) + payload = self._message.configure_function(hostname, value, attribute) xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) @@ -214,7 +232,7 @@ class OMFAPI(object): :type env: str """ - payload = self._message.executefunction(hostname, app_id, arguments, path, env) + payload = self._message.execute_function(hostname, app_id, arguments, path, env) xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) @@ -227,34 +245,42 @@ class OMFAPI(object): :type app_id: str """ - payload = self._message.exitfunction(hostname, app_id) + payload = self._message.exit_function(hostname, app_id) xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) - def disconnect(self): - """ Delete the sesion and logger topic and disconnect + def release(self, hostname): + """ Delete the session and logger topics. Then disconnect + + """ + if hostname in self._hostnames: + self.delete(hostname) + + def disconnect(self) : + """ Delete the session and logger topics. Then disconnect """ self._client.delete(self._exp_session_id) self._client.delete(self._logger_session_id) - for hostname in self._hostnames[:]: - self.delete(hostname) - time.sleep(1) - self._client.disconnect() + + # Wait the send queue to be empty before disconnect + self._client.disconnect(wait=True) + self._logger.debug(" Disconnected from XMPP Server") class OMFAPIFactory(object): """ .. note:: - It allows the different RM to use the same xmpp client if they use the same credentials. For the moment, it is focused on Xmpp. + It allows the different RM to use the same xmpp client if they use the same credentials. + For the moment, it is focused on Xmpp. """ - - # XXX: put '_apis' instead of '_Api' - _Api = dict() + # use lock to avoid concurrent access to the Api list at the same times by 2 different threads + lock = threading.Lock() + _apis = dict() @classmethod def get_api(cls, slice, host, port, password): @@ -271,11 +297,16 @@ class OMFAPIFactory(object): """ if slice and host and port and password: - key = cls._hash_api(slice, host, port) - if key in cls._Api: - return cls._Api[key] + key = cls._make_key(slice, host, port, password) + cls.lock.acquire() + if key in cls._apis: + cls._apis[key]['cnt'] += 1 + cls.lock.release() + return cls._apis[key]['api'] else : - return cls.create_api(slice, host, port, password) + omf_api = cls.create_api(slice, host, port, password) + cls.lock.release() + return omf_api return None @classmethod @@ -292,24 +323,16 @@ class OMFAPIFactory(object): :type password: str """ - OmfApi = OMFAPI(slice, host, port, password) - key = cls._hash_api(slice, host, port) - cls._Api[key] = OmfApi - return OmfApi - - # XXX: this is not a hash :) - # From wikipedia: "A hash function is any algorithm or subroutine that maps large data - # sets of variable length to smaller data sets of a fixed length." - # The idea is to apply a function to get a smaller string. Use hashlib instead. - # e.g: - # import hashlib - # res = slice + "_" + host + "_" + port - # hashlib.md5(res).hexdigest() - # - # XXX: change method name for 'make_key' + omf_api = OMFAPI(slice, host, port, password) + key = cls._make_key(slice, host, port, password) + cls._apis[key] = {} + cls._apis[key]['api'] = omf_api + cls._apis[key]['cnt'] = 1 + return omf_api + @classmethod - def _hash_api(cls, slice, host, port): - """ Hash the credentials in order to create a key + def release_api(cls, slice, host, port, password): + """ Release an API with this credentials :param slice: Xmpp Slice Name :type slice: str @@ -317,12 +340,30 @@ class OMFAPIFactory(object): :type host: str :param port: Xmpp Port (Default : 5222) :type port: str + :param password: Xmpp Password + :type password: str """ - res = slice + "_" + host + "_" + port - return res + if slice and host and port and password: + key = cls._make_key(slice, host, port, password) + if key in cls._apis: + cls._apis[key]['cnt'] -= 1 + #print "Api Counter : " + str(cls._apis[key]['cnt']) + if cls._apis[key]['cnt'] == 0: + omf_api = cls._apis[key]['api'] + omf_api.disconnect() + @classmethod + def _make_key(cls, *args): + """ Hash the credentials in order to create a key + + :param args: list of arguments used to create the hash (user, host, port, ...) + :type args: list of args + + """ + skey = "".join(map(str, args)) + return hashlib.md5(skey).hexdigest()