from nepi.testbeds.omf.omf_messages import MessageHandler
class OmfAPI(object):
- def __init__(self, slice, host, port, password, debug):
+ def __init__(self, slice, host, port, password):
date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
tz = -time.altzone if time.daylight != 0 else -time.timezone
date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
self._password = password
self._hostnames = []
- self._logger = logging.getLogger('nepi.testbeds.omfapi')
- if debug:
- self._logger.setLevel(logging.DEBUG)
+ self._logger = logging.getLogger("nepi.testbeds.omf")
# OMF xmpp client
self._client = None
xmpp_node = self._host_session_id(hostname)
self._client.publish(payload, xmpp_node)
- def execute(self, hostname, app_id, arguments, path):
- payload = self._message.executefunction(hostname, app_id, arguments, path)
+ def execute(self, hostname, app_id, arguments, path, env):
+ payload = self._message.executefunction(hostname, app_id, arguments, path, env)
+ xmpp_node = self._host_session_id(hostname)
+ self._client.publish(payload, xmpp_node)
+
+ def exit(self, hostname, app_id):
+ payload = self._message.exitfunction(hostname, app_id)
xmpp_node = self._host_session_id(hostname)
self._client.publish(payload, xmpp_node)
import sleekxmpp
from sleekxmpp.exceptions import IqError, IqTimeout
import traceback
-from xml.etree import cElementTree as ET
class OMFClient(sleekxmpp.ClientXMPP):
def __init__(self, jid, password):
self.add_event_handler("session_start", self.start)
self.add_event_handler("register", self.register)
+ self.add_event_handler("pubsub_publish", self.handle_omf_message)
+
+ self._logger = logging.getLogger("nepi.testbeds.omf")
@property
def ready(self):
def register(self, iq):
if self._registered:
- logging.info("%s already registered!" % self.boundjid)
+ self._logger.info("%s already registered!" % self.boundjid)
return
resp = self.Iq()
try:
resp.send(now=True)
- logging.info("Account created for %s!" % self.boundjid)
+ self._logger.info("Account created for %s!" % self.boundjid)
self._registered = True
except IqError as e:
- logging.error("Could not register account: %s" %
+ self._logger.error("Could not register account: %s" %
e.iq['error']['text'])
except IqTimeout:
- logging.error("No response from server.")
+ self._logger.error("No response from server.")
def unregister(self):
try:
self.plugin['xep_0077'].cancel_registration(
ifrom=self.boundjid.full)
- logging.info("Account unregistered for %s!" % self.boundjid)
+ self._logger.info("Account unregistered for %s!" % self.boundjid)
except IqError as e:
- logging.error("Could not unregister account: %s" %
+ self._logger.error("Could not unregister account: %s" %
e.iq['error']['text'])
except IqTimeout:
- logging.error("No response from server.")
+ self._logger.error("No response from server.")
def nodes(self):
try:
result = self['xep_0060'].get_nodes(self._server)
for item in result['disco_items']['items']:
- print(' - %s' % str(item))
+ self._logger.info(' - %s' % str(item))
return result
except:
- print traceback.format_exc()
- logging.error('Could not retrieve node list.')
+ error = traceback.format_exc()
+ self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error)
- def suscriptions(self):
+ def subscriptions(self):
try:
result = self['xep_0060'].get_subscriptions(self._server)
#self.boundjid.full)
for node in result['node']:
- print(' - %s' % str(node))
+ self._logger.info(' - %s' % str(node))
return result
except:
- print traceback.format_exc()
- logging.error('Could not retrieve suscriptions.')
-
+ error = traceback.format_exc()
+ self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error)
def create(self, node):
config = self['xep_0004'].makeForm('submit')
try:
self['xep_0060'].create_node(self._server, node, config = config)
except:
- print traceback.format_exc()
- logging.error('Could not create node: %s' % node)
+ error = traceback.format_exc()
+ self._logger.error('Could not create node: %s\ntraceback:\n%s' % (node, error))
def delete(self, node):
try:
self['xep_0060'].delete_node(self._server, node)
- print('Deleted node: %s' % node)
+ self._logger.info('Deleted node: %s' % node)
except:
- print traceback.format_exc()
- logging.error('Could not delete node: %s' % node)
-
+ error = traceback.format_exc()
+ self._logger.error('Could not delete node: %s\ntraceback:\n%s' % (node, error))
def publish(self, data, node):
try:
result = self['xep_0060'].publish(self._server,node,payload=data)
- id = result['pubsub']['publish']['item']['id']
- #print('Published at item id: %s' % id)
+ # id = result['pubsub']['publish']['item']['id']
+ # print('Published at item id: %s' % id)
except:
- print traceback.format_exc()
- logging.error('Could not publish to: %s' % self.boundjid)
+ error = traceback.format_exc()
+ self._logger.error('Could not publish to: %s\ntraceback:\n%s' \
+ % (self.boundjid, error))
def get(self, data):
try:
result = self['xep_0060'].get_item(self._server, self.boundjid,
data)
for item in result['pubsub']['items']['substanzas']:
- print('Retrieved item %s: %s' % (item['id'], tostring(item['payload'])))
+ self._logger.info('Retrieved item %s: %s' % (item['id'],
+ tostring(item['payload'])))
except:
- print traceback.format_exc()
- logging.error('Could not retrieve item %s from node %s' % (data, self.boundjid))
+ error = traceback.format_exc()
+ self._logger.error('Could not retrieve item %s from node %s\ntraceback:\n%s' \
+ % (data, self.boundjid, error))
def retract(self, data):
try:
result = self['xep_0060'].retract(self._server, self.boundjid, data)
- print('Retracted item %s from node %s' % (data, self.boundjid))
+ self._logger.info('Retracted item %s from node %s' % (data, self.boundjid))
except:
- print traceback.format_exc()
- logging.error('Could not retract item %s from node %s' % (data, self.boundjid))
+ error = traceback.format_exc()
+ self._logger.error('Could not retract item %s from node %s\ntraceback:\n%s' \
+ % (data, self.boundjid, error))
def purge(self):
try:
result = self['xep_0060'].purge(self._server, self.boundjid)
- print('Purged all items from node %s' % self.boundjid)
+ self._logger.info('Purged all items from node %s' % self.boundjid)
except:
- print traceback.format_exc()
- logging.error('Could not purge items from node %s' % self.boundjid)
+ error = traceback.format_exc()
+ self._logger.error('Could not purge items from node %s\ntraceback:\n%s' \
+ % (self.boundjid, error))
def subscribe(self, node):
try:
result = self['xep_0060'].subscribe(self._server, node)
- print('Subscribed %s to node %s' % (self.boundjid.bare, self.boundjid))
+ self._logger.info('Subscribed %s to node %s' \
+ % (self.boundjid.bare, self.boundjid))
except:
- print traceback.format_exc()
- logging.error('Could not subscribe %s to node %s' % (self.boundjid.bare, node))
+ error = traceback.format_exc()
+ self._logger.error('Could not subscribe %s to node %s\ntraceback:\n%s' \
+ % (self.boundjid.bare, node, error))
def unsubscribe(self, node):
try:
result = self['xep_0060'].unsubscribe(self._server, node)
- print('Unsubscribed %s from node %s' % (self.boundjid.bare, node))
+ self._logger.info('Unsubscribed %s from node %s' % (self.boundjid.bare, node))
except:
- print traceback.format_exc()
- logging.error('Could not unsubscribe %s from node %s' % (self.boundjid.bare, node))
+ error = traceback.format_exc()
+ self._logger.error('Could not unsubscribe %s from node %s\ntraceback:\n%s' \
+ % (self.boundjid.bare, node, error))
+
+ def handle_omf_message(self, iq):
+ for i in iq['pubsub_event']['items']:
+ self._logger.debug(i)
+
+ #<item xmlns="http://jabber.org/protocol/pubsub#event" id="dFbv6WRlMuKghJ0"><omf-message xmlns="http://jabber.org/protocol/pubsub"><LOGGING id="'omf-payload'"><LEVEL>2</LEVEL><SLICEID>default_slice</SLICEID><LOGGER>nodeHandler::NodeHandler</LOGGER><EXPID>default_slice-2012-09-28t16.22.17+02.00</EXPID><LEVEL_NAME>INFO</LEVEL_NAME><DATA>OMF Experiment Controller 5.4 (git 529a626)</DATA></LOGGING></omf-message></item>
+
print "init" + self.ExpID +" "+ self.SliceID
pass
-
def Mid(self, parent, keyword):
mid = ET.SubElement(parent, keyword)
mid.set("id", "\'omf-payload\'")
mtext.text = text
return mtext
-
- def executefunction(self, target, appid, cmdlineargs, path):
+ def executefunction(self, target, appid, cmdlineargs, path, env):
payload = ET.Element("omf-message")
execute = self.Mid(payload,"EXECUTE")
- env = self.Mtext(execute, "ENV", "")
+ env = self.Mtext(execute, "ENV", env)
sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
expid = self.Mtext(execute,"EXPID",self.ExpID)
target = self.Mtext(execute,"TARGET",target)
path = self.Mtext(execute,"PATH",path)
return payload
+ def exitfunction(self, target, appid):
+ payload = ET.Element("omf-message")
+ execute = self.Mid(payload,"EXIT")
+ sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
+ expid = self.Mtext(execute,"EXPID",self.ExpID)
+ target = self.Mtext(execute,"TARGET",target)
+ appid = self.Mtext(execute,"APPID",appid)
+ return payload
+
def configurefunction(self, target, value, path):
payload = ET.Element("omf-message")
config = self.Mid(payload, "CONFIGURE")
target = self.Mtext(noop,"TARGET",target)
return payload
- def enrollfunction(self, enrollkey, image, index, target ):
- payload = ET.Element("omf-message")
- enroll = self.Mid(payload,"ENROLL")
- enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey)
- sliceid = self.Mtext(enroll,"SLICEID",self.SliceID)
- image = self.Mtext(enroll,"IMAGE",image)
- expid = self.Mtext(enroll,"EXPID",self.ExpID)
- index = self.Mtext(enroll,"INDEX",index)
- target = self.Mtext(enroll,"TARGET",target)
- return payload
-
def newexpfunction(self, experimentid, address):
payload = ET.Element("omf-message")
newexp = self.Mid(payload,"EXPERIMENT_NEW")