Added routes to OMF nodes
[nepi.git] / src / nepi / testbeds / omf / omf_api.py
1 import datetime
2 import logging
3 import ssl
4 import sys
5 import time
6
7 from nepi.testbeds.omf.omf_client import OMFClient
8 from nepi.testbeds.omf.omf_messages import MessageHandler
9
10 class OmfAPI(object):
11     def __init__(self, slice, host, port, password, xmpp_root = None):
12         date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
13         tz = -time.altzone if time.daylight != 0 else -time.timezone
14         date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
15         self._user = "%s-%s" % (slice, date)
16         self._slice = slice
17         self._host = host
18         self._port = port
19         self._password = password
20         self._hostnames = []
21         self._xmpp_root = xmpp_root or "OMF"
22
23         self._logger = logging.getLogger("nepi.testbeds.omf")
24
25         # OMF xmpp client
26         self._client = None
27         # message handler
28         self._message = None
29
30         if sys.version_info < (3, 0):
31             reload(sys)
32             sys.setdefaultencoding('utf8')
33
34         # instantiate the xmpp client
35         self._init_client()
36
37         # register xmpp nodes for the experiment
38         self._enroll_experiment()
39
40         # register xmpp logger for the experiment
41         self._enroll_logger()
42
43     def _init_client(self):
44         jid = "%s@%s" % (self._user, self._host)
45         xmpp = OMFClient(jid, self._password)
46         # PROTOCOL_SSLv3 required for compatibility with OpenFire
47         xmpp.ssl_version = ssl.PROTOCOL_SSLv3
48
49         if xmpp.connect((self._host, self._port)):
50             xmpp.process(threaded=True)
51             while not xmpp.ready:
52                 time.sleep(1)
53             self._client = xmpp
54             self._message = MessageHandler(self._slice, self._user)
55         else:
56             msg = "Unable to connect to the XMPP server."
57             self._logger.error(msg)
58             raise RuntimeError(msg)
59
60     def _enroll_experiment(self):
61         xmpp_node = self._exp_session_id
62         self._client.create(xmpp_node)
63         self._client.subscribe(xmpp_node)
64
65         address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
66         payload = self._message.newexpfunction(self._user, address)
67         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
68         self._client.publish(payload, slice_sid)
69
70     def _enroll_logger(self):
71         xmpp_node = self._logger_session_id
72         self._client.create(xmpp_node)
73         self._client.subscribe(xmpp_node)
74
75         payload = self._message.logfunction("2", 
76                 "nodeHandler::NodeHandler", 
77                 "INFO", 
78                 "OMF Experiment Controller 5.4 (git 529a626)")
79         self._client.publish(payload, xmpp_node)
80
81     def _host_session_id(self, hostname):
82         return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
83
84     def _host_resource_id(self, hostname):
85         return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
86
87     @property
88     def _exp_session_id(self):
89         return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
90
91     @property
92     def _logger_session_id(self):
93         return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
94
95     def delete(self, hostname):
96         if not hostname in self._hostnames:
97             return
98
99         self._hostnames.remove(hostname)
100
101         xmpp_node = self._host_session_id(hostname)
102         self._client.delete(xmpp_node)
103
104     def enroll_host(self, hostname):
105         if hostname in self._hostnames:
106             return 
107
108         self._hostnames.append(hostname)
109
110         xmpp_node =  self._host_session_id(hostname)
111         self._client.create(xmpp_node)
112         self._client.subscribe(xmpp_node)
113
114         xmpp_node =  self._host_resource_id(hostname)
115         self._client.subscribe(xmpp_node)
116
117         payload = self._message.enrollfunction("1", "*", "1", hostname)
118         self._client.publish(payload, xmpp_node)
119
120     def configure(self, hostname, attribute, value): 
121         payload = self._message.configurefunction(hostname, value, attribute)
122         xmpp_node =  self._host_session_id(hostname)
123         self._client.publish(payload, xmpp_node)
124
125     def execute(self, hostname, app_id, arguments, path, env):
126         payload = self._message.executefunction(hostname, app_id, arguments, path, env)
127         xmpp_node =  self._host_session_id(hostname)
128         self._client.publish(payload, xmpp_node)
129
130     def exit(self, hostname, app_id):
131         payload = self._message.exitfunction(hostname, app_id)
132         xmpp_node =  self._host_session_id(hostname)
133         self._client.publish(payload, xmpp_node)
134
135     def disconnect(self):
136         self._client.delete(self._exp_session_id)
137         self._client.delete(self._logger_session_id)
138
139         for hostname in self._hostnames[:]:
140             self.delete(hostname)
141
142         time.sleep(5)
143         self._client.disconnect()
144