From: Alina Quereilhac Date: Mon, 8 Oct 2012 17:16:39 +0000 (+0200) Subject: Adding environment setting features for applications under OMF X-Git-Tag: nepi-3.0.0~144 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=6adcdc48be0c061d3a0ae91344f22d3d053f03b5;p=nepi.git Adding environment setting features for applications under OMF --- diff --git a/DEPENDENCIES b/DEPENDENCIES index 27285a78..1c83aa4d 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -1 +1,2 @@ * ipaddr-2.1.7 : http://ipaddr-py.googlecode.com/files/ipaddr-2.1.7.tar.gz +* sleekxmpp-1.0.1dev: diff --git a/src/nepi/testbeds/omf/execute.py b/src/nepi/testbeds/omf/execute.py index 8428dcd5..db2e8107 100644 --- a/src/nepi/testbeds/omf/execute.py +++ b/src/nepi/testbeds/omf/execute.py @@ -34,7 +34,7 @@ class TestbedController(testbed_impl.TestbedController): port = self._attributes.get_attribute_value("xmppPort") password = self._attributes.get_attribute_value("xmppPassword") - self._api = OmfAPI(slice, host, port, password, debug) + self._api = OmfAPI(slice, host, port, password) super(TestbedController, self).do_setup() diff --git a/src/nepi/testbeds/omf/metadata.py b/src/nepi/testbeds/omf/metadata.py index 3c71b9a8..4b1734fa 100644 --- a/src/nepi/testbeds/omf/metadata.py +++ b/src/nepi/testbeds/omf/metadata.py @@ -58,13 +58,20 @@ class OmfApplication(OmfResource): self.app_id = None self.arguments = None self.path = None + self.env = None def start(self): node = self.tc.elements.get(self._node_guid) self.tc.api.execute(node.hostname, self.appId, self.arguments, - self.path) + self.path, + self.env) + + def stop(self): + node = self.tc.elements.get(self._node_guid) + self.tc.api.exit(node.hostname, + self.appId) def status(self): if guid not in testbed_instance.elements.keys(): @@ -202,6 +209,13 @@ attributes = dict({ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, "validation_function": validation.is_string }), + "env": dict({ + "name": "env", + "help": "String with space separated values of environment variables to set before starting application (e.g 'FOO=foo BAR=bar')", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), "hostname": dict({ "name": "hostname", "help": "Hostname for the target OMF node", @@ -287,7 +301,7 @@ factories_info = dict({ "start_function": start, "stop_function": stop, "status_function": status, - "box_attributes": ["appId", "arguments", "path"], + "box_attributes": ["appId", "arguments", "path", "env"], "connector_types": ["node"], "tags": [tags.APPLICATION], }), diff --git a/src/nepi/testbeds/omf/omf_api.py b/src/nepi/testbeds/omf/omf_api.py index ac7946d8..d76fba34 100644 --- a/src/nepi/testbeds/omf/omf_api.py +++ b/src/nepi/testbeds/omf/omf_api.py @@ -8,7 +8,7 @@ from nepi.testbeds.omf.omf_client import OMFClient from nepi.testbeds.omf.omf_messages import MessageHandler class OmfAPI(object): - def __init__(self, slice, host, port, password, debug): + def __init__(self, slice, host, port, password): date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S") tz = -time.altzone if time.daylight != 0 else -time.timezone date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds @@ -19,9 +19,7 @@ class OmfAPI(object): self._password = password self._hostnames = [] - self._logger = logging.getLogger('nepi.testbeds.omfapi') - if debug: - self._logger.setLevel(logging.DEBUG) + self._logger = logging.getLogger("nepi.testbeds.omf") # OMF xmpp client self._client = None @@ -123,8 +121,13 @@ class OmfAPI(object): xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) - def execute(self, hostname, app_id, arguments, path): - payload = self._message.executefunction(hostname, app_id, arguments, path) + def execute(self, hostname, app_id, arguments, path, env): + payload = self._message.executefunction(hostname, app_id, arguments, path, env) + xmpp_node = self._host_session_id(hostname) + self._client.publish(payload, xmpp_node) + + def exit(self, hostname, app_id): + payload = self._message.exitfunction(hostname, app_id) xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) diff --git a/src/nepi/testbeds/omf/omf_client.py b/src/nepi/testbeds/omf/omf_client.py index e244d1b3..48568c8d 100644 --- a/src/nepi/testbeds/omf/omf_client.py +++ b/src/nepi/testbeds/omf/omf_client.py @@ -2,7 +2,6 @@ import logging import sleekxmpp from sleekxmpp.exceptions import IqError, IqTimeout import traceback -from xml.etree import cElementTree as ET class OMFClient(sleekxmpp.ClientXMPP): def __init__(self, jid, password): @@ -18,6 +17,9 @@ class OMFClient(sleekxmpp.ClientXMPP): self.add_event_handler("session_start", self.start) self.add_event_handler("register", self.register) + self.add_event_handler("pubsub_publish", self.handle_omf_message) + + self._logger = logging.getLogger("nepi.testbeds.omf") @property def ready(self): @@ -30,7 +32,7 @@ class OMFClient(sleekxmpp.ClientXMPP): def register(self, iq): if self._registered: - logging.info("%s already registered!" % self.boundjid) + self._logger.info("%s already registered!" % self.boundjid) return resp = self.Iq() @@ -40,46 +42,45 @@ class OMFClient(sleekxmpp.ClientXMPP): try: resp.send(now=True) - logging.info("Account created for %s!" % self.boundjid) + self._logger.info("Account created for %s!" % self.boundjid) self._registered = True except IqError as e: - logging.error("Could not register account: %s" % + self._logger.error("Could not register account: %s" % e.iq['error']['text']) except IqTimeout: - logging.error("No response from server.") + self._logger.error("No response from server.") def unregister(self): try: self.plugin['xep_0077'].cancel_registration( ifrom=self.boundjid.full) - logging.info("Account unregistered for %s!" % self.boundjid) + self._logger.info("Account unregistered for %s!" % self.boundjid) except IqError as e: - logging.error("Could not unregister account: %s" % + self._logger.error("Could not unregister account: %s" % e.iq['error']['text']) except IqTimeout: - logging.error("No response from server.") + self._logger.error("No response from server.") def nodes(self): try: result = self['xep_0060'].get_nodes(self._server) for item in result['disco_items']['items']: - print(' - %s' % str(item)) + self._logger.info(' - %s' % str(item)) return result except: - print traceback.format_exc() - logging.error('Could not retrieve node list.') + error = traceback.format_exc() + self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error) - def suscriptions(self): + def subscriptions(self): try: result = self['xep_0060'].get_subscriptions(self._server) #self.boundjid.full) for node in result['node']: - print(' - %s' % str(node)) + self._logger.info(' - %s' % str(node)) return result except: - print traceback.format_exc() - logging.error('Could not retrieve suscriptions.') - + error = traceback.format_exc() + self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error) def create(self, node): config = self['xep_0004'].makeForm('submit') @@ -93,67 +94,81 @@ class OMFClient(sleekxmpp.ClientXMPP): try: self['xep_0060'].create_node(self._server, node, config = config) except: - print traceback.format_exc() - logging.error('Could not create node: %s' % node) + error = traceback.format_exc() + self._logger.error('Could not create node: %s\ntraceback:\n%s' % (node, error)) def delete(self, node): try: self['xep_0060'].delete_node(self._server, node) - print('Deleted node: %s' % node) + self._logger.info('Deleted node: %s' % node) except: - print traceback.format_exc() - logging.error('Could not delete node: %s' % node) - + error = traceback.format_exc() + self._logger.error('Could not delete node: %s\ntraceback:\n%s' % (node, error)) def publish(self, data, node): try: result = self['xep_0060'].publish(self._server,node,payload=data) - id = result['pubsub']['publish']['item']['id'] - #print('Published at item id: %s' % id) + # id = result['pubsub']['publish']['item']['id'] + # print('Published at item id: %s' % id) except: - print traceback.format_exc() - logging.error('Could not publish to: %s' % self.boundjid) + error = traceback.format_exc() + self._logger.error('Could not publish to: %s\ntraceback:\n%s' \ + % (self.boundjid, error)) def get(self, data): try: result = self['xep_0060'].get_item(self._server, self.boundjid, data) for item in result['pubsub']['items']['substanzas']: - print('Retrieved item %s: %s' % (item['id'], tostring(item['payload']))) + self._logger.info('Retrieved item %s: %s' % (item['id'], + tostring(item['payload']))) except: - print traceback.format_exc() - logging.error('Could not retrieve item %s from node %s' % (data, self.boundjid)) + error = traceback.format_exc() + self._logger.error('Could not retrieve item %s from node %s\ntraceback:\n%s' \ + % (data, self.boundjid, error)) def retract(self, data): try: result = self['xep_0060'].retract(self._server, self.boundjid, data) - print('Retracted item %s from node %s' % (data, self.boundjid)) + self._logger.info('Retracted item %s from node %s' % (data, self.boundjid)) except: - print traceback.format_exc() - logging.error('Could not retract item %s from node %s' % (data, self.boundjid)) + error = traceback.format_exc() + self._logger.error('Could not retract item %s from node %s\ntraceback:\n%s' \ + % (data, self.boundjid, error)) def purge(self): try: result = self['xep_0060'].purge(self._server, self.boundjid) - print('Purged all items from node %s' % self.boundjid) + self._logger.info('Purged all items from node %s' % self.boundjid) except: - print traceback.format_exc() - logging.error('Could not purge items from node %s' % self.boundjid) + error = traceback.format_exc() + self._logger.error('Could not purge items from node %s\ntraceback:\n%s' \ + % (self.boundjid, error)) def subscribe(self, node): try: result = self['xep_0060'].subscribe(self._server, node) - print('Subscribed %s to node %s' % (self.boundjid.bare, self.boundjid)) + self._logger.info('Subscribed %s to node %s' \ + % (self.boundjid.bare, self.boundjid)) except: - print traceback.format_exc() - logging.error('Could not subscribe %s to node %s' % (self.boundjid.bare, node)) + error = traceback.format_exc() + self._logger.error('Could not subscribe %s to node %s\ntraceback:\n%s' \ + % (self.boundjid.bare, node, error)) def unsubscribe(self, node): try: result = self['xep_0060'].unsubscribe(self._server, node) - print('Unsubscribed %s from node %s' % (self.boundjid.bare, node)) + self._logger.info('Unsubscribed %s from node %s' % (self.boundjid.bare, node)) except: - print traceback.format_exc() - logging.error('Could not unsubscribe %s from node %s' % (self.boundjid.bare, node)) + error = traceback.format_exc() + self._logger.error('Could not unsubscribe %s from node %s\ntraceback:\n%s' \ + % (self.boundjid.bare, node, error)) + + def handle_omf_message(self, iq): + for i in iq['pubsub_event']['items']: + self._logger.debug(i) + + #2default_slicenodeHandler::NodeHandlerdefault_slice-2012-09-28t16.22.17+02.00INFOOMF Experiment Controller 5.4 (git 529a626) + diff --git a/src/nepi/testbeds/omf/omf_messages.py b/src/nepi/testbeds/omf/omf_messages.py index f4c99c59..77c53dc0 100644 --- a/src/nepi/testbeds/omf/omf_messages.py +++ b/src/nepi/testbeds/omf/omf_messages.py @@ -31,7 +31,6 @@ class MessageHandler(): print "init" + self.ExpID +" "+ self.SliceID pass - def Mid(self, parent, keyword): mid = ET.SubElement(parent, keyword) mid.set("id", "\'omf-payload\'") @@ -42,11 +41,10 @@ class MessageHandler(): mtext.text = text return mtext - - def executefunction(self, target, appid, cmdlineargs, path): + def executefunction(self, target, appid, cmdlineargs, path, env): payload = ET.Element("omf-message") execute = self.Mid(payload,"EXECUTE") - env = self.Mtext(execute, "ENV", "") + env = self.Mtext(execute, "ENV", env) sliceid = self.Mtext(execute,"SLICEID",self.SliceID) expid = self.Mtext(execute,"EXPID",self.ExpID) target = self.Mtext(execute,"TARGET",target) @@ -55,6 +53,15 @@ class MessageHandler(): path = self.Mtext(execute,"PATH",path) return payload + def exitfunction(self, target, appid): + payload = ET.Element("omf-message") + execute = self.Mid(payload,"EXIT") + sliceid = self.Mtext(execute,"SLICEID",self.SliceID) + expid = self.Mtext(execute,"EXPID",self.ExpID) + target = self.Mtext(execute,"TARGET",target) + appid = self.Mtext(execute,"APPID",appid) + return payload + def configurefunction(self, target, value, path): payload = ET.Element("omf-message") config = self.Mid(payload, "CONFIGURE") @@ -104,17 +111,6 @@ class MessageHandler(): target = self.Mtext(noop,"TARGET",target) return payload - def enrollfunction(self, enrollkey, image, index, target ): - payload = ET.Element("omf-message") - enroll = self.Mid(payload,"ENROLL") - enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey) - sliceid = self.Mtext(enroll,"SLICEID",self.SliceID) - image = self.Mtext(enroll,"IMAGE",image) - expid = self.Mtext(enroll,"EXPID",self.ExpID) - index = self.Mtext(enroll,"INDEX",index) - target = self.Mtext(enroll,"TARGET",target) - return payload - def newexpfunction(self, experimentid, address): payload = ET.Element("omf-message") newexp = self.Mid(payload,"EXPERIMENT_NEW")