Adding environment setting features for applications under OMF
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 8 Oct 2012 17:16:39 +0000 (19:16 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 8 Oct 2012 17:16:39 +0000 (19:16 +0200)
DEPENDENCIES
src/nepi/testbeds/omf/execute.py
src/nepi/testbeds/omf/metadata.py
src/nepi/testbeds/omf/omf_api.py
src/nepi/testbeds/omf/omf_client.py
src/nepi/testbeds/omf/omf_messages.py

index 27285a7..1c83aa4 100644 (file)
@@ -1 +1,2 @@
 * ipaddr-2.1.7 : http://ipaddr-py.googlecode.com/files/ipaddr-2.1.7.tar.gz
+* sleekxmpp-1.0.1dev: 
index 8428dcd..db2e810 100644 (file)
@@ -34,7 +34,7 @@ class TestbedController(testbed_impl.TestbedController):
         port = self._attributes.get_attribute_value("xmppPort")
         password = self._attributes.get_attribute_value("xmppPassword")
 
-        self._api = OmfAPI(slice, host, port, password, debug)
+        self._api = OmfAPI(slice, host, port, password)
  
         super(TestbedController, self).do_setup()
 
index 3c71b9a..4b1734f 100644 (file)
@@ -58,13 +58,20 @@ class OmfApplication(OmfResource):
         self.app_id = None
         self.arguments = None
         self.path = None
+        self.env = None
 
     def start(self):
         node = self.tc.elements.get(self._node_guid)
         self.tc.api.execute(node.hostname, 
                 self.appId, 
                 self.arguments, 
-                self.path)
+                self.path,
+                self.env)
+
+    def stop(self):
+        node = self.tc.elements.get(self._node_guid)
+        self.tc.api.exit(node.hostname, 
+                self.appId) 
 
     def status(self):
         if guid not in testbed_instance.elements.keys():
@@ -202,6 +209,13 @@ attributes = dict({
                 "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
                 "validation_function": validation.is_string
             }),
+    "env": dict({
+                "name": "env",
+                "help": "String with space separated values of environment variables to set before starting application (e.g 'FOO=foo BAR=bar')",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
     "hostname": dict({
                 "name": "hostname",
                 "help": "Hostname for the target OMF node",
@@ -287,7 +301,7 @@ factories_info = dict({
             "start_function": start,
             "stop_function": stop,
             "status_function": status,
-            "box_attributes": ["appId", "arguments", "path"],
+            "box_attributes": ["appId", "arguments", "path", "env"],
             "connector_types": ["node"],
             "tags": [tags.APPLICATION],
         }),
index ac7946d..d76fba3 100644 (file)
@@ -8,7 +8,7 @@ from nepi.testbeds.omf.omf_client import OMFClient
 from nepi.testbeds.omf.omf_messages import MessageHandler
 
 class OmfAPI(object):
-    def __init__(self, slice, host, port, password, debug):
+    def __init__(self, slice, host, port, password):
         date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
         tz = -time.altzone if time.daylight != 0 else -time.timezone
         date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
@@ -19,9 +19,7 @@ class OmfAPI(object):
         self._password = password
         self._hostnames = []
 
-        self._logger = logging.getLogger('nepi.testbeds.omfapi')
-        if debug:
-            self._logger.setLevel(logging.DEBUG)
+        self._logger = logging.getLogger("nepi.testbeds.omf")
 
         # OMF xmpp client
         self._client = None
@@ -123,8 +121,13 @@ class OmfAPI(object):
         xmpp_node =  self._host_session_id(hostname)
         self._client.publish(payload, xmpp_node)
 
-    def execute(self, hostname, app_id, arguments, path):
-        payload = self._message.executefunction(hostname, app_id, arguments, path)
+    def execute(self, hostname, app_id, arguments, path, env):
+        payload = self._message.executefunction(hostname, app_id, arguments, path, env)
+        xmpp_node =  self._host_session_id(hostname)
+        self._client.publish(payload, xmpp_node)
+
+    def exit(self, hostname, app_id):
+        payload = self._message.exitfunction(hostname, app_id)
         xmpp_node =  self._host_session_id(hostname)
         self._client.publish(payload, xmpp_node)
 
index e244d1b..48568c8 100644 (file)
@@ -2,7 +2,6 @@ import logging
 import sleekxmpp
 from sleekxmpp.exceptions import IqError, IqTimeout
 import traceback
-from xml.etree import cElementTree as ET
 
 class OMFClient(sleekxmpp.ClientXMPP):
     def __init__(self, jid, password):
@@ -18,6 +17,9 @@ class OMFClient(sleekxmpp.ClientXMPP):
 
         self.add_event_handler("session_start", self.start)
         self.add_event_handler("register", self.register)
+        self.add_event_handler("pubsub_publish", self.handle_omf_message)
+        
+        self._logger = logging.getLogger("nepi.testbeds.omf")
     
     @property
     def ready(self):
@@ -30,7 +32,7 @@ class OMFClient(sleekxmpp.ClientXMPP):
 
     def register(self, iq):
         if self._registered:
-            logging.info("%s already registered!" % self.boundjid)
+            self._logger.info("%s already registered!" % self.boundjid)
             return 
 
         resp = self.Iq()
@@ -40,46 +42,45 @@ class OMFClient(sleekxmpp.ClientXMPP):
 
         try:
             resp.send(now=True)
-            logging.info("Account created for %s!" % self.boundjid)
+            self._logger.info("Account created for %s!" % self.boundjid)
             self._registered = True
         except IqError as e:
-            logging.error("Could not register account: %s" %
+            self._logger.error("Could not register account: %s" %
                     e.iq['error']['text'])
         except IqTimeout:
-            logging.error("No response from server.")
+            self._logger.error("No response from server.")
 
     def unregister(self):
         try:
             self.plugin['xep_0077'].cancel_registration(
                 ifrom=self.boundjid.full)
-            logging.info("Account unregistered for %s!" % self.boundjid)
+            self._logger.info("Account unregistered for %s!" % self.boundjid)
         except IqError as e:
-            logging.error("Could not unregister account: %s" %
+            self._logger.error("Could not unregister account: %s" %
                     e.iq['error']['text'])
         except IqTimeout:
-            logging.error("No response from server.")
+            self._logger.error("No response from server.")
 
     def nodes(self):
         try:
             result = self['xep_0060'].get_nodes(self._server)
             for item in result['disco_items']['items']:
-                print(' - %s' % str(item))
+                self._logger.info(' - %s' % str(item))
             return result
         except:
-            print traceback.format_exc()
-            logging.error('Could not retrieve node list.')
+            error = traceback.format_exc()
+            self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error)
 
-    def suscriptions(self):
+    def subscriptions(self):
         try:
             result = self['xep_0060'].get_subscriptions(self._server)
                 #self.boundjid.full)
             for node in result['node']:
-                print(' - %s' % str(node))
+                self._logger.info(' - %s' % str(node))
             return result
         except:
-            print traceback.format_exc()
-            logging.error('Could not retrieve suscriptions.')
-
+            error = traceback.format_exc()
+            self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error)
 
     def create(self, node):
         config = self['xep_0004'].makeForm('submit')
@@ -93,67 +94,81 @@ class OMFClient(sleekxmpp.ClientXMPP):
         try:
             self['xep_0060'].create_node(self._server, node, config = config)
         except:
-            print traceback.format_exc()
-            logging.error('Could not create node: %s' % node)
+            error = traceback.format_exc()
+            self._logger.error('Could not create node: %s\ntraceback:\n%s' % (node, error))
 
     def delete(self, node):
         try:
             self['xep_0060'].delete_node(self._server, node)
-            print('Deleted node: %s' % node)
+            self._logger.info('Deleted node: %s' % node)
         except:
-            print traceback.format_exc()
-            logging.error('Could not delete node: %s' % node)
-
+            error = traceback.format_exc()
+            self._logger.error('Could not delete node: %s\ntraceback:\n%s' % (node, error))
     
     def publish(self, data, node):
         try:
             result = self['xep_0060'].publish(self._server,node,payload=data)
-            id = result['pubsub']['publish']['item']['id']
-            #print('Published at item id: %s' % id)
+            id = result['pubsub']['publish']['item']['id']
+            # print('Published at item id: %s' % id)
         except:
-            print traceback.format_exc()
-            logging.error('Could not publish to: %s' % self.boundjid)
+            error = traceback.format_exc()
+            self._logger.error('Could not publish to: %s\ntraceback:\n%s' \
+                    % (self.boundjid, error))
 
     def get(self, data):
         try:
             result = self['xep_0060'].get_item(self._server, self.boundjid,
                 data)
             for item in result['pubsub']['items']['substanzas']:
-                print('Retrieved item %s: %s' % (item['id'], tostring(item['payload'])))
+                self._logger.info('Retrieved item %s: %s' % (item['id'], 
+                    tostring(item['payload'])))
         except:
-            print traceback.format_exc()
-            logging.error('Could not retrieve item %s from node %s' % (data, self.boundjid))
+            error = traceback.format_exc()
+            self._logger.error('Could not retrieve item %s from node %s\ntraceback:\n%s' \
+                    % (data, self.boundjid, error))
 
     def retract(self, data):
         try:
             result = self['xep_0060'].retract(self._server, self.boundjid, data)
-            print('Retracted item %s from node %s' % (data, self.boundjid))
+            self._logger.info('Retracted item %s from node %s' % (data, self.boundjid))
         except:
-            print traceback.format_exc()
-            logging.error('Could not retract item %s from node %s' % (data, self.boundjid))
+            error = traceback.format_exc()
+            self._logger.error('Could not retract item %s from node %s\ntraceback:\n%s' \
+                    % (data, self.boundjid, error))
 
     def purge(self):
         try:
             result = self['xep_0060'].purge(self._server, self.boundjid)
-            print('Purged all items from node %s' % self.boundjid)
+            self._logger.info('Purged all items from node %s' % self.boundjid)
         except:
-            print traceback.format_exc()
-            logging.error('Could not purge items from node %s' % self.boundjid)
+            error = traceback.format_exc()
+            self._logger.error('Could not purge items from node %s\ntraceback:\n%s' \
+                    % (self.boundjid, error))
 
     def subscribe(self, node):
         try:
             result = self['xep_0060'].subscribe(self._server, node)
-            print('Subscribed %s to node %s' % (self.boundjid.bare, self.boundjid))
+            self._logger.info('Subscribed %s to node %s' \
+                    % (self.boundjid.bare, self.boundjid))
         except:
-            print traceback.format_exc()
-            logging.error('Could not subscribe %s to node %s' % (self.boundjid.bare, node))
+            error = traceback.format_exc()
+            self._logger.error('Could not subscribe %s to node %s\ntraceback:\n%s' \
+                    % (self.boundjid.bare, node, error))
 
     def unsubscribe(self, node):
         try:
             result = self['xep_0060'].unsubscribe(self._server, node)
-            print('Unsubscribed %s from node %s' % (self.boundjid.bare, node))
+            self._logger.info('Unsubscribed %s from node %s' % (self.boundjid.bare, node))
         except:
-            print traceback.format_exc()
-            logging.error('Could not unsubscribe %s from node %s' % (self.boundjid.bare, node))
+            error = traceback.format_exc()
+            self._logger.error('Could not unsubscribe %s from node %s\ntraceback:\n%s' \
+                    % (self.boundjid.bare, node, error))
+
+    def handle_omf_message(self, iq):
+        for i in iq['pubsub_event']['items']:
+            self._logger.debug(i)
+
+            #<item xmlns="http://jabber.org/protocol/pubsub#event" id="dFbv6WRlMuKghJ0"><omf-message xmlns="http://jabber.org/protocol/pubsub"><LOGGING id="&apos;omf-payload&apos;"><LEVEL>2</LEVEL><SLICEID>default_slice</SLICEID><LOGGER>nodeHandler::NodeHandler</LOGGER><EXPID>default_slice-2012-09-28t16.22.17+02.00</EXPID><LEVEL_NAME>INFO</LEVEL_NAME><DATA>OMF Experiment Controller 5.4 (git 529a626)</DATA></LOGGING></omf-message></item>
+
 
 
index f4c99c5..77c53dc 100644 (file)
@@ -31,7 +31,6 @@ class MessageHandler():
         print "init" + self.ExpID +"  "+ self.SliceID
         pass
 
-
     def Mid(self, parent, keyword):
         mid = ET.SubElement(parent, keyword)
         mid.set("id", "\'omf-payload\'")
@@ -42,11 +41,10 @@ class MessageHandler():
         mtext.text = text
         return mtext
 
-
-    def executefunction(self, target, appid, cmdlineargs, path):
+    def executefunction(self, target, appid, cmdlineargs, path, env):
         payload = ET.Element("omf-message")
         execute = self.Mid(payload,"EXECUTE")
-        env = self.Mtext(execute, "ENV", "")
+        env = self.Mtext(execute, "ENV", env)
         sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
         expid = self.Mtext(execute,"EXPID",self.ExpID)
         target = self.Mtext(execute,"TARGET",target)
@@ -55,6 +53,15 @@ class MessageHandler():
         path = self.Mtext(execute,"PATH",path)
         return payload
 
+    def exitfunction(self, target, appid):
+        payload = ET.Element("omf-message")
+        execute = self.Mid(payload,"EXIT")
+        sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
+        expid = self.Mtext(execute,"EXPID",self.ExpID)
+        target = self.Mtext(execute,"TARGET",target)
+        appid = self.Mtext(execute,"APPID",appid)
+        return payload
+
     def configurefunction(self, target, value, path):
         payload = ET.Element("omf-message")
         config = self.Mid(payload, "CONFIGURE")
@@ -104,17 +111,6 @@ class MessageHandler():
         target = self.Mtext(noop,"TARGET",target)
         return payload
 
-    def enrollfunction(self, enrollkey, image, index, target ):
-        payload = ET.Element("omf-message")
-        enroll = self.Mid(payload,"ENROLL")
-        enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey)
-        sliceid = self.Mtext(enroll,"SLICEID",self.SliceID)
-        image = self.Mtext(enroll,"IMAGE",image)
-        expid = self.Mtext(enroll,"EXPID",self.ExpID)
-        index = self.Mtext(enroll,"INDEX",index)
-        target = self.Mtext(enroll,"TARGET",target)
-        return payload
-
     def newexpfunction(self, experimentid, address):
         payload = ET.Element("omf-message")
         newexp = self.Mid(payload,"EXPERIMENT_NEW")