dccb80ccdc66c13b2d415fdb57c5271c41295ff4
[nepi.git] / src / nepi / testbeds / omf / execute.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID, TESTBED_VERSION
4 from nepi.core import testbed_impl
5 from nepi.util.constants import TIME_NOW
6
7 import datetime
8 import logging
9 import os
10 import sys
11 import ssl
12 import time
13
14 from nepi.testbeds.omf.omf_client import OMFClient
15 from nepi.testbeds.omf.omf_messages import MessageHandler
16
17
18 class TestbedController(testbed_impl.TestbedController):
19     def __init__(self):
20         super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
21         self._slice = None
22         self._user = None
23         self._host = None
24         self._xmpp = None
25         self._message = None
26         self._home = None
27         
28         self._logger = logging.getLogger('nepi.testbeds.omf')
29  
30     def do_setup(self):
31         if self._attributes.get_attribute_value("enableDebug") == True:
32             self._logger.setLevel(logging.DEBUG)
33
34         # create home
35         self._home = self._attributes.\
36             get_attribute_value("homeDirectory")
37         home = os.path.normpath(self._home)
38         if not os.path.exists(home):
39             os.makedirs(home, 0755)
40     
41         # instantiate the xmpp client
42         self._init_client()
43         # register xmpp nodes for the experiment
44         self._publish_and_enroll_experiment()
45         # register xmpp logger for the experiment
46         self._publish_and_enroll_logger()
47
48         super(TestbedController, self).do_setup()
49
50     def set(self, guid, name, value, time = TIME_NOW):
51         super(TestbedController, self).set(guid, name, value, time)
52         pass
53
54     def get(self, guid, name, time = TIME_NOW):
55         value = super(TestbedController, self).get(guid, name, time)
56         return "MISS"
57
58     def shutdown(self):
59         node_sid = "/OMF/%s/%s" % (self._slice, self._user)
60         self._clean_up(node_sid)
61         logger = "/OMF/%s/%s/LOGGER" % (self._slice, self._user)
62         self._clean_up(logger)
63
64         for hostname in self._elements.values():
65             if not hostname:
66                 continue
67             node_sid = self._host_sid(hostname)
68             self._clean_up(node_sid)
69             #node_res = self._host_res(hostname)
70             #self._clean_up(node_res)
71
72         time.sleep(5)
73         self._xmpp.disconnect()
74
75     def _host_sid(self, hostname):
76         return "/OMF/%s/%s/%s" % (self._slice, self._user, hostname)
77
78     def _host_res(self, hostname):
79         return "/OMF/%s/resources/%s" % (self._slice, hostname)
80
81     def _init_client(self):
82         self._slice = self._attributes.get_attribute_value("xmppSlice")
83         self._host = self._attributes.get_attribute_value("xmppHost")
84         port = self._attributes.get_attribute_value("xmppPort")
85         password = self._attributes.get_attribute_value("xmppPassword")
86        
87         #date = "2012-04-18t16.06.34+02.00"
88         date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S+02.00")
89         self._user = "%s-%s" % (self._slice, date)
90         jid = "%s@%s" % (self._user, self._host)
91
92         xmpp = OMFClient(jid, password)
93         # PROTOCOL_SSLv3 required for compatibility with OpenFire
94         xmpp.ssl_version = ssl.PROTOCOL_SSLv3
95
96         if xmpp.connect((self._host, port)):
97             xmpp.process(threaded=True)
98             while not xmpp.ready:
99                 time.sleep(1)
100             self._xmpp = xmpp
101             self._message = MessageHandler(self._slice, self._user)
102         else:
103             msg = "Unable to connect to the XMPP server."
104             self._logger.error(msg)
105             raise RuntimeError(msg)
106
107     def _publish_and_enroll_experiment(self):
108         node_sid = "/OMF/%s/%s" % (self._slice, self._user)
109         self._create_and_subscribe(node_sid)  
110
111         node_slice = "/OMF/%s" % (self._slice)
112         address = "/%s/OMF/%s/%s" % (self._host, self._slice, self._user)
113         payload = self._message.newexpfunction(self._user, address)
114         self._xmpp.publish(payload, node_slice)
115    
116     def _publish_and_enroll_logger(self):
117         logger = "/OMF/%s/%s/LOGGER" % (self._slice, self._user)
118         self._create_and_subscribe(logger)
119
120         payload = self._message.logfunction("2", 
121                 "nodeHandler::NodeHandler", 
122                 "INFO", 
123                 "OMF Experiment Controller 5.4 (git 529a626)")
124         self._xmpp.publish(payload, logger)
125
126     def _clean_up(self, xmpp_node):
127         self._xmpp.delete(xmpp_node)
128
129         if sys.version_info < (3, 0):
130             reload(sys)
131             sys.setdefaultencoding('utf8')
132
133     def _create_and_subscribe(self, xmpp_node):
134         self._xmpp.suscriptions()
135         self._xmpp.create(xmpp_node)
136         self._xmpp.subscribe(xmpp_node)
137         self._xmpp.nodes()
138
139     def _publish_and_enroll_host(self, hostname):
140         node_sid =  self._host_sid(hostname)
141         self._create_and_subscribe(node_sid)  
142         
143         node_res =  self._host_res(hostname)
144         self._create_and_subscribe(node_res)  
145
146         payload = self._message.enrollfunction("1", "*", "1", hostname)
147         self._xmpp.publish(payload, node_res)
148
149     def _publish_configure(self, hostname, attribute, value): 
150         payload = self._message.configurefunction(hostname, value, attribute)
151         node_sid =  self._host_sid(hostname)
152         self._xmpp.publish(payload, node_sid)
153
154     def _publish_execute(self, hostname, app_id, arguments, path):
155         payload = self._message.executefunction(hostname, app_id, arguments, path)
156         node_sid =  self._host_sid(hostname)
157         self._xmpp.publish(payload, node_sid)
158
159
160