7 from neco.resources.omf.omf_client import OMFClient
8 from neco.resources.omf.omf_messages_5_4 import MessageHandler
# Constructor: derives a per-run XMPP user name from the slice name and the
# current local time, stores the connection settings visible below, and enrolls
# the experiment on the XMPP server.
#
# Arguments as used by the visible lines:
#   slice      -- prefix of the generated user name
#   password   -- stored as self._password
#   xmpp_root  -- root segment of topic paths; falls back to "OMF_5.4" when falsy
#   host, port -- not referenced by any line visible here
#
# NOTE(review): the embedded line numbers skip values (15 -> 19, 23 -> 30, ...),
# so statements are missing from this view. Nothing visible assigns self._slice,
# self._host, self._port or self._hostnames, and the calls announced by the
# "instantiate the xmpp client" / "register xmpp logger" comments are absent.
# Confirm against the complete file before changing anything here.
11 def __init__(self, slice, host, port, password, xmpp_root = None):
# Local timestamp such as "2013-01-31t12.30.05" (lower-case "t" separator).
12 date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
# time.altzone / time.timezone are offsets in seconds west of UTC, hence the
# negation to obtain an east-of-UTC offset.
# NOTE(review): time.daylight only tells whether a DST zone is defined, not
# whether DST is currently in effect -- confirm the intended offset is chosen.
13 tz = -time.altzone if time.daylight != 0 else -time.timezone
# Appends the offset in hours as a signed, zero-padded number, e.g. "+02.00".
# NOTE(review): under Python 2 `tz / 3600` is integer division, which drops
# half-hour offsets -- confirm whether that matters.
14 date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
# User name has the form "<slice>-<timestamp><offset>".
15 self._user = "%s-%s" % (slice, date)
19 self._password = password
21 self._xmpp_root = xmpp_root or "OMF_5.4"
23 self._logger = logging.getLogger("neco.resources.omf")
# Python 2 only: change the interpreter-wide default string encoding to UTF-8.
# NOTE(review): sys.setdefaultencoding is normally removed from `sys` during
# start-up; presumably the line missing between these two restores it -- confirm.
30 if sys.version_info < (3, 0):
32 sys.setdefaultencoding('utf8')
34 # instantiate the xmpp client
37 # register xmpp nodes for the experiment
38 self._enroll_experiment()
39 self._enroll_newexperiment()
41 # register xmpp logger for the experiment
# Opens the XMPP session for this experiment's user.
#
# Builds the JID "<user>@<host>", creates an OMFClient with the stored password
# and connects it to (self._host, self._port). When connect() succeeds the
# client's processing loop is started in threaded mode and a MessageHandler is
# created for (self._slice, self._user).
#
# The last three lines log "Unable to connect to the XMPP server." and raise
# RuntimeError with the same message.
#
# NOTE(review): embedded line numbers 49, 52-54 and 56 are absent from this
# view. Presumably 56 is the `else:` that guards the error path and 52-54 store
# the connected client on self._client (the other methods read that attribute)
# -- confirm against the complete file.
44 def _init_client(self):
45 jid = "%s@%s" % (self._user, self._host)
46 xmpp = OMFClient(jid, self._password)
47 # PROTOCOL_SSLv3 required for compatibility with OpenFire
# NOTE(review): SSLv3 is deprecated, and ssl.PROTOCOL_SSLv3 does not exist on
# Python builds linked against an OpenSSL without SSLv3 support; this line would
# raise AttributeError there.
48 xmpp.ssl_version = ssl.PROTOCOL_SSLv3
# connect() is given the server address as a (host, port) tuple; its return value
# is used as the success flag.
50 if xmpp.connect((self._host, self._port)):
51 xmpp.process(threaded=True)
55 self._message = MessageHandler(self._slice, self._user)
57 msg = "Unable to connect to the XMPP server."
58 self._logger.error(msg)
59 raise RuntimeError(msg)
61 def _enroll_experiment(self):
62 xmpp_node = self._exp_session_id
63 self._client.create(xmpp_node)
64 #print "Create experiment sesion id topics !!"
65 self._client.subscribe(xmpp_node)
66 #print "Subscribe to experiment sesion id topics !!"
69 def _enroll_newexperiment(self):
70 address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
72 payload = self._message.newexpfunction(self._user, address)
73 slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
74 self._client.publish(payload, slice_sid)
# Creates the experiment's LOGGER topic, subscribes to it, and publishes on it a
# message built by self._message.logfunction.
#
# NOTE(review): embedded line numbers 80 and 83 are absent from this view; 83
# falls inside the logfunction(...) argument list, so one argument may be missing
# here -- confirm against the complete file.
76 def _enroll_logger(self):
# self._logger_session_id is read without a call, so it is presumably a property.
77 xmpp_node = self._logger_session_id
78 self._client.create(xmpp_node)
79 self._client.subscribe(xmpp_node)
# The literal arguments are passed through to logfunction unchanged; their
# meaning is defined by MessageHandler, which is not visible here.
81 payload = self._message.logfunction("2",
82 "nodeHandler::NodeHandler",
84 "OMF Experiment Controller 5.4 (git 529a626)")
85 self._client.publish(payload, xmpp_node)
87 def _host_session_id(self, hostname):
88 return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
90 def _host_resource_id(self, hostname):
91 return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
94 def _exp_session_id(self):
95 return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
98 def _logger_session_id(self):
99 return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
def delete(self, hostname):
    """Forget an enrolled host and delete its session topic.

    Does nothing when ``hostname`` is not in ``self._hostnames``. Otherwise
    the name is removed from that list and the topic returned by
    ``self._host_session_id(hostname)`` is deleted through ``self._client``.
    """
    if hostname not in self._hostnames:
        # Without this early return the remove() below would raise
        # ValueError for a host that was never enrolled.
        return

    self._hostnames.remove(hostname)
    self._client.delete(self._host_session_id(hostname))
def enroll_host(self, hostname):
    """Enroll a host in the experiment, once.

    Does nothing when ``hostname`` is already in ``self._hostnames``.
    Otherwise the name is recorded, the host's session topic is created and
    subscribed to, the host's resource topic is subscribed to, and the
    message built by ``self._message.enrollfunction`` is published on the
    resource topic.
    """
    if hostname in self._hostnames:
        # Already enrolled: avoid duplicate entries and repeated topic setup.
        return

    self._hostnames.append(hostname)

    session_topic = self._host_session_id(hostname)
    self._client.create(session_topic)
    self._client.subscribe(session_topic)

    resource_topic = self._host_resource_id(hostname)
    self._client.subscribe(resource_topic)

    payload = self._message.enrollfunction("1", "*", "1", hostname)
    self._client.publish(payload, resource_topic)
def configure(self, hostname, attribute, value):
    """Publish a configure message on the host's session topic.

    The payload comes from ``self._message.configurefunction``, which is
    called with ``(hostname, value, attribute)`` -- value before attribute.
    """
    request = self._message.configurefunction(hostname, value, attribute)
    self._client.publish(request, self._host_session_id(hostname))
def execute(self, hostname, app_id, arguments, path, env):
    """Publish an execute message on the host's session topic.

    The payload comes from ``self._message.executefunction``, called with
    the five arguments in the order received.
    """
    request = self._message.executefunction(
        hostname, app_id, arguments, path, env)
    self._client.publish(request, self._host_session_id(hostname))
def exit(self, hostname, app_id):
    """Publish an exit message on the host's session topic.

    The payload comes from ``self._message.exitfunction(hostname, app_id)``.
    """
    request = self._message.exitfunction(hostname, app_id)
    self._client.publish(request, self._host_session_id(hostname))
def disconnect(self):
    """Delete the experiment's topics and close the XMPP connection.

    Deletes the experiment and logger topics, calls ``self.delete`` for
    every entry of ``self._hostnames``, then disconnects ``self._client``.
    """
    client = self._client
    client.delete(self._exp_session_id)
    client.delete(self._logger_session_id)

    # Walk a snapshot of the list, since delete() may shrink it while we loop.
    for enrolled in list(self._hostnames):
        self.delete(enrolled)

    client.disconnect()
152 class OMFAPIFactory(object):
153 # XXX: rename '_Api' to '_apis'
# Returns an API object for the given (slice, host, port, password).
#
# In the visible lines: when all four arguments are truthy, a cache key is
# computed with cls._hash_api(slice, host, port) and the result of
# cls.create_api(...) is returned.
#
# NOTE(review): embedded line numbers 160-162 are absent from this view;
# presumably they look the key up in the class-level cache and return an
# existing instance before falling back to create_api. What is returned when an
# argument is falsy is also not visible -- confirm against the complete file.
# Takes `cls`, so presumably decorated as a classmethod on an unseen line.
157 def get_api(cls, slice, host, port, password):
158 if slice and host and port and password:
159 key = cls._hash_api(slice, host, port)
163 return cls.create_api(slice, host, port, password)
def create_api(cls, slice, host, port, password):
    """Create an OMFAPI, store it in the class-level cache and return it.

    The instance is stored in ``cls._Api`` under the key produced by
    ``cls._hash_api(slice, host, port)``. It is returned so that callers
    which hand back this method's result receive the API rather than None.
    """
    api = OMFAPI(slice, host, port, password)
    cls._Api[cls._hash_api(slice, host, port)] = api
    return api
173 # XXX: this is not a hash :)
174 # From wikipedia: "A hash function is any algorithm or subroutine that maps large data
175 # sets of variable length to smaller data sets of a fixed length."
176 # The idea is to apply a function to get a smaller string. Use hashlib instead.
179 # res = slice + "_" + host + "_" + port
180 # hashlib.md5(res).hexdigest()
182 # XXX: rename this method to 'make_key'
184 def _hash_api(cls, slice, host, port):
185 res = slice + "_" + host + "_" + port