7 from nepi.testbeds.omf.omf_client import OMFClient
8 from nepi.testbeds.omf.omf_messages import MessageHandler
11 def __init__(self, slice, host, port, password, xmpp_root = None):
12 date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
13 tz = -time.altzone if time.daylight != 0 else -time.timezone
14 date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
15 self._user = "%s-%s" % (slice, date)
19 self._password = password
21 self._xmpp_root = xmpp_root or "OMF_5.4"
23 self._logger = logging.getLogger("nepi.testbeds.omf")
30 if sys.version_info < (3, 0):
32 sys.setdefaultencoding('utf8')
34 # instantiate the xmpp client
37 # register xmpp nodes for the experiment
38 self._enroll_experiment()
40 # register xmpp logger for the experiment
43 def _init_client(self):
44 jid = "%s@%s" % (self._user, self._host)
45 xmpp = OMFClient(jid, self._password)
46 # PROTOCOL_SSLv3 required for compatibility with OpenFire
47 xmpp.ssl_version = ssl.PROTOCOL_SSLv3
49 if xmpp.connect((self._host, self._port)):
50 xmpp.process(threaded=True)
54 self._message = MessageHandler(self._slice, self._user)
56 msg = "Unable to connect to the XMPP server."
57 self._logger.error(msg)
58 raise RuntimeError(msg)
60 def _enroll_experiment(self):
61 xmpp_node = self._exp_session_id
62 self._client.create(xmpp_node)
63 #print "Create experiment sesion id topics !!"
64 self._client.subscribe(xmpp_node)
65 #print "Subscribe to experiment sesion id topics !!"
68 address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
70 payload = self._message.newexpfunction(self._user, address)
71 slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
72 self._client.publish(payload, slice_sid)
74 def _enroll_logger(self):
75 xmpp_node = self._logger_session_id
76 self._client.create(xmpp_node)
77 self._client.subscribe(xmpp_node)
79 payload = self._message.logfunction("2",
80 "nodeHandler::NodeHandler",
82 "OMF Experiment Controller 5.4 (git 529a626)")
83 self._client.publish(payload, xmpp_node)
85 def _host_session_id(self, hostname):
86 return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
88 def _host_resource_id(self, hostname):
89 return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
92 def _exp_session_id(self):
93 return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
96 def _logger_session_id(self):
97 return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
99 def delete(self, hostname):
100 if not hostname in self._hostnames:
103 self._hostnames.remove(hostname)
105 xmpp_node = self._host_session_id(hostname)
106 self._client.delete(xmpp_node)
108 def enroll_host(self, hostname):
109 if hostname in self._hostnames:
112 self._hostnames.append(hostname)
114 xmpp_node = self._host_session_id(hostname)
115 self._client.create(xmpp_node)
116 self._client.subscribe(xmpp_node)
118 xmpp_node = self._host_resource_id(hostname)
119 self._client.subscribe(xmpp_node)
121 payload = self._message.enrollfunction("1", "*", "1", hostname)
122 self._client.publish(payload, xmpp_node)
124 def configure(self, hostname, attribute, value):
125 payload = self._message.configurefunction(hostname, value, attribute)
126 xmpp_node = self._host_session_id(hostname)
127 self._client.publish(payload, xmpp_node)
129 def execute(self, hostname, app_id, arguments, path, env):
130 payload = self._message.executefunction(hostname, app_id, arguments, path, env)
131 xmpp_node = self._host_session_id(hostname)
132 self._client.publish(payload, xmpp_node)
134 def exit(self, hostname, app_id):
135 payload = self._message.exitfunction(hostname, app_id)
136 xmpp_node = self._host_session_id(hostname)
137 self._client.publish(payload, xmpp_node)
139 def disconnect(self):
140 self._client.delete(self._exp_session_id)
141 self._client.delete(self._logger_session_id)
143 for hostname in self._hostnames[:]:
144 self.delete(hostname)
147 self._client.disconnect()