ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
ec.set(node1, 'xmppPort', "5222")
ec.set(node1, 'xmppPassword', "1234")
+ec.set(node1, 'xmppSlice', "nepi")
+ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(node1, 'xmppPort', "5222")
+ec.set(node1, 'xmppPassword', "1234")
node2 = ec.register_resource("OMFNode")
ec.set(node2, 'hostname', "omf.plexus.wlab37")
ec.set(node2, 'xmppHost', "xmpp-plexus.onelab.eu")
ec.set(node2, 'xmppPort', "5222")
ec.set(node2, 'xmppPassword', "1234")
+ec.set(node2, 'xmppSlice', "nepi")
+ec.set(node2, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(node2, 'xmppPort', "5222")
+ec.set(node2, 'xmppPassword', "1234")
# Create and Configure the Interfaces
iface1 = ec.register_resource("OMFWifiInterface")
"""
from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.util.logger import Logger
from nepi.execution.trace import TraceAttr
import copy
import functools
import inspect
-import logging
import os
import pkgutil
import weakref
RELEASED = 8
ResourceState2str = dict({
- NEW = "NEW",
- DISCOVERED = "DISCOVERED",
- PROVISIONED = "PROVISIONED",
- READY = "READY",
- STARTED = "STARTED",
- STOPPED = "STOPPED",
- FINISHED = "FINISHED",
- FAILED = "FAILED",
- RELEASED = "RELEASED",
+ ResourceState.NEW : "NEW",
+ ResourceState.DISCOVERED : "DISCOVERED",
+ ResourceState.PROVISIONED : "PROVISIONED",
+ ResourceState.READY : "READY",
+ ResourceState.STARTED : "STARTED",
+ ResourceState.STOPPED : "STOPPED",
+ ResourceState.FINISHED : "FINISHED",
+ ResourceState.FAILED : "FAILED",
+ ResourceState.RELEASED : "RELEASED",
})
def clsinit(cls):
# Decorator to invoke class initialization method
@clsinit
-class ResourceManager(object):
+class ResourceManager(Logger):
_rtype = "Resource"
_attributes = None
_traces = None
return copy.deepcopy(cls._traces.values())
def __init__(self, ec, guid):
+ super(ResourceManager, self).__init__(self.rtype())
+
self._guid = guid
self._ec = weakref.ref(ec)
self._connections = set()
self._ready_time = None
self._release_time = None
- # Logging
- self._logger = logging.getLogger("Resource")
-
- def debug(self, msg, out = None, err = None):
- self.log(msg, logging.DEBUG, out, err)
-
- def error(self, msg, out = None, err = None):
- self.log(msg, logging.ERROR, out, err)
-
- def warn(self, msg, out = None, err = None):
- self.log(msg, logging.WARNING, out, err)
-
- def info(self, msg, out = None, err = None):
- self.log(msg, logging.INFO, out, err)
-
- def log(self, msg, level, out = None, err = None):
- if out:
- msg += " - OUT: %s " % out
-
- if err:
- msg += " - ERROR: %s " % err
-
- msg = self.log_message(msg)
-
- self.logger.log(level, msg)
-
- def log_message(self, msg):
- return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
-
- @property
- def logger(self):
- return self._logger
-
@property
def guid(self):
return self._guid
def state(self):
return self._state
+ def log_message(self, msg):
+ return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
+
def connect(self, guid):
if self.valid_connection(guid):
self._connections.add(guid)
from nepi.util import sshfuncs
from nepi.util.timefuncs import strfnow, strfdiff
-import logging
import os
reschedule_delay = "0.5s"
# timestamp of last state check of the application
self._last_state_check = strfnow()
-
- self._logger = logging.getLogger("LinuxApplication")
def log_message(self, msg):
return " guid %d - host %s - %s " % (self.guid,
from nepi.execution.attribute import Attribute, Flags
from nepi.execution.resource import ResourceManager, clsinit, ResourceState
-from nepi.resources.linux.node import LinuxNode
-
-import collections
-import logging
-import os
-import random
-import re
-import tempfile
-import time
-import threading
@clsinit
class LinuxChannel(ResourceManager):
def __init__(self, ec, guid):
super(LinuxChannel, self).__init__(ec, guid)
- self._logger = logging.getLogger("LinuxChannel")
def log_message(self, msg):
return " guid %d - %s " % (self.guid, msg)
from nepi.resources.linux.channel import LinuxChannel
import collections
-import logging
import os
import random
import re
def __init__(self, ec, guid):
super(LinuxInterface, self).__init__(ec, guid)
self._configured = False
-
- self._logger = logging.getLogger("LinuxInterface")
self.add_set_hooks()
from nepi.util import sshfuncs, execfuncs
import collections
-import logging
import os
import random
import re
# lock to avoid concurrency issues on methods used by applications
self._lock = threading.Lock()
-
- self._logger = logging.getLogger("LinuxNode")
def log_message(self, msg):
return " guid %d - host %s - %s " % (self.guid,
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
-import nepi
-import logging
-
@clsinit
class OMFApplication(ResourceManager):
"""
self._omf_api = None
- self._logger = logging.getLogger("nepi.omf.omfApp ")
- self._logger.setLevel(nepi.LOGLEVEL)
-
-
def _validate_connection(self, guid):
"""Check if the connection is available.
"""
rm = self.ec.get_resource(guid)
if rm.rtype() not in self._authorized_connections:
- self._logger.debug("Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self._logger.debug(msg)
return False
elif len(self.connections) != 0 :
- self._logger.debug("Connection between %s %s and %s %s refused : Already Connected" % (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s refused : Already Connected" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self._logger.debug(msg)
return False
else :
- self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self._logger.debug(msg)
return True
def _get_nodes(self, conn_set):
return rm
return None
- def deploy_action(self):
+ def deploy(self):
"""Deploy the RM
"""
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
- super(OMFApplication, self).deploy_action()
+ super(OMFApplication, self).deploy()
def start(self):
"""Send Xmpp Message Using OMF protocol to execute the application
"""
super(OMFApplication, self).start()
- self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env'))
+ msg = " " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env')
+ self.debug(msg)
if self.get('appid') and self.get('path') and self.get('args') and self.get('env'):
rm_node = self._get_nodes(self._connections)
"""
rm = self.ec.get_resource(guid)
if rm.rtype() in self._authorized_connections:
- self._logger.debug("Connection between %s %s and %s %s accepted" %
- (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self.debug(msg)
return True
- self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self.debug(msg)
return False
def _get_target(self, conn_set):
self._nodes_guid.append(couple)
return self._nodes_guid
- def deploy_action(self):
+ def deploy(self):
"""Deploy the RM
"""
#print "Send the configure message"
self._omf_api.configure(couple[0], attrname, attrval)
- super(OMFChannel, self).deploy_action()
+ super(OMFChannel, self).deploy()
def discover(self):
""" Discover the availables channels
from nepi.resources.omf.omf_api import OMFAPIFactory
-import nepi
-import logging
@clsinit
class OMFWifiInterface(ResourceManager):
self._omf_api = None
self._alias = self.get('alias')
- self._logger = logging.getLogger("nepi.omf.omfIface ")
- self._logger.setLevel(nepi.LOGLEVEL)
-
def _validate_connection(self, guid):
""" Check if the connection is available.
"""
rm = self.ec.get_resource(guid)
if rm.rtype() in self._authorized_connections:
- self._logger.debug("Connection between %s %s and %s %s accepted" %
- (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self.debug(msg)
return True
- self._logger.debug("Connection between %s %s and %s %s refused" %
- (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self.debug(msg)
return False
def _get_nodes(self, conn_set):
return rm
return None
- def deploy_action(self):
+ def deploy(self):
"""Deploy the RM
"""
#print "Send the configure message"
self._omf_api.configure(rm_node.get('hostname'), attrname, attrval)
- super(OMFWifiInterface, self).deploy_action()
+ super(OMFWifiInterface, self).deploy()
def start(self):
from nepi.resources.omf.omf_api import OMFAPIFactory
-import nepi
-import logging
import time
@clsinit
self._omf_api = None
- self._logger = logging.getLogger("nepi.omf.omfNode ")
-
# XXX: TO DISCUSS
- self._logger.setLevel(nepi.LOGLEVEL)
def _validate_connection(self, guid):
"""Check if the connection is available.
"""
rm = self.ec.get_resource(guid)
if rm.rtype() in self._authorized_connections:
- self._logger.debug("Connection between %s %s and %s %s accepted" %
- (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self.debug(msg)
return True
- self._logger.debug("Connection between %s %s and %s %s refused" %
- (self.rtype(), self._guid, rm.rtype(), guid))
+ msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+ self.debug(msg)
return False
- def deploy_action(self):
+ def deploy(self):
"""Deploy the RM
"""
self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
self._omf_api.enroll_host(self.get('hostname'))
- super(OMFNode, self).deploy_action()
+ super(OMFNode, self).deploy()
def discover(self):
""" Discover the availables nodes
"""
import datetime
-import logging
import ssl
import sys
import time
import hashlib
-import nepi
import threading
+from nepi.util.logger import Logger
+
from nepi.resources.omf.omf_client import OMFClient
from nepi.resources.omf.messages_5_4 import MessageHandler
-class OMFAPI(object):
+class OMFAPI(Logger):
"""
.. class:: Class Args :
:type xmpp_root: Str
"""
+ super(OMFAPI, self).__init__("OMFAPI")
+
date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
tz = -time.altzone if time.daylight != 0 else -time.timezone
date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
self._hostnames = []
self._xmpp_root = xmpp_root or "OMF_5.4"
- self._logger = logging.getLogger("nepi.omf.omfApi ")
- self._logger.setLevel(nepi.LOGLEVEL)
-
# OMF xmpp client
self._client = None
# message handler
self._message = MessageHandler(self._slice, self._user)
else:
msg = "Unable to connect to the XMPP server."
- self._logger.error(msg)
+ self.error(msg)
raise RuntimeError(msg)
def _enroll_experiment(self):
# Wait the send queue to be empty before disconnect
self._client.disconnect(wait=True)
- self._logger.debug(" Disconnected from XMPP Server")
+ msg = " Disconnected from XMPP Server"
+ self.debug(msg)
class OMFAPIFactory(object):
"""
-import logging
+from nepi.util.logger import Logger
+
import sleekxmpp
from sleekxmpp.exceptions import IqError, IqTimeout
import traceback
import xml.etree.ElementTree as ET
-import nepi
-
# inherit from BaseXmpp and XMLStream classes
-class OMFClient(sleekxmpp.ClientXMPP):
+class OMFClient(sleekxmpp.ClientXMPP, Logger):
"""
.. class:: Class Args :
"""
+ Logger.__init__(self, "OMFClient")
+
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self._ready = False
self._registered = False
self.add_event_handler("register", self.register)
self.add_event_handler("pubsub_publish", self.handle_omf_message)
- self._logger = logging.getLogger("nepi.omf.xmppClient")
- self._logger.setLevel(nepi.LOGLEVEL)
-
@property
def ready(self):
""" Check if the client is ready
"""
if self._registered:
- self._logger.info(" %s already registered!" % self.boundjid)
+ msg = " %s already registered!" % self.boundjid
+ self.info(msg)
return
resp = self.Iq()
try:
resp.send(now=True)
- self._logger.info(" Account created for %s!" % self.boundjid)
+ msg = " Account created for %s!" % self.boundjid
+ self.info(msg)
self._registered = True
except IqError as e:
- self._logger.error(" Could not register account: %s" %
- e.iq['error']['text'])
+ msg = " Could not register account: %s" % e.iq['error']['text']
+ selferror(msg)
except IqTimeout:
- self._logger.error(" No response from server.")
+ msg = " No response from server."
+ self.error(msg)
def unregister(self):
""" Unregister from the Xmppp Server.
try:
self.plugin['xep_0077'].cancel_registration(
ifrom=self.boundjid.full)
- self._logger.info(" Account unregistered for %s!" % self.boundjid)
+ msg = " Account unregistered for %s!" % self.boundjid
+ self.info(msg)
except IqError as e:
- self._logger.error(" Could not unregister account: %s" %
- e.iq['error']['text'])
+ msg = " Could not unregister account: %s" % e.iq['error']['text']
+ self.error(msg)
except IqTimeout:
- self._logger.error(" No response from server.")
+ msg = " No response from server."
+ self.error(msg)
def nodes(self):
""" Get all the nodes of the Xmppp Server.
try:
result = self['xep_0060'].get_nodes(self._server)
for item in result['disco_items']['items']:
- self._logger.info(' - %s' % str(item))
+ msg = ' - %s' % str(item)
+ self.info(msg)
return result
except:
error = traceback.format_exc()
- self._logger.error(' Could not retrieve node list.\ntraceback:\n%s', error)
+ msg = 'Could not retrieve node list.\ntraceback:\n%s' % error
+ self.error(msg)
def subscriptions(self):
""" Get all the subscriptions of the Xmppp Server.
result = self['xep_0060'].get_subscriptions(self._server)
#self.boundjid.full)
for node in result['node']:
- self._logger.info(' - %s' % str(node))
+ msg = ' - %s' % str(node)
+ self.info(msg)
return result
except:
error = traceback.format_exc()
- self._logger.error(' Could not retrieve subscriptions.\ntraceback:\n%s', error)
+ msg = ' Could not retrieve subscriptions.\ntraceback:\n%s' % error
+ self.error(msg)
def create(self, node):
""" Create the topic corresponding to the node
:type node: str
"""
- self._logger.debug(" Create Topic : " + node)
+ self.debug(" Create Topic : " + node)
config = self['xep_0004'].makeForm('submit')
config.add_field(var='pubsub#node_type', value='leaf')
self['xep_0060'].create_node(self._server, node, config = config)
except:
error = traceback.format_exc()
- self._logger.error(' Could not create topic: %s\ntraceback:\n%s' % (node, error))
+ msg = ' Could not create topic: %s\ntraceback:\n%s' % (node, error)
+ self.error(msg)
def delete(self, node):
""" Delete the topic corresponding to the node
#print " length of the queue : " + str(self.event_queue.qsize())
try:
self['xep_0060'].delete_node(self._server, node)
- self._logger.info(' Deleted node: %s' % node)
+ msg = ' Deleted node: %s' % node
+ self.info(msg)
except:
error = traceback.format_exc()
- self._logger.error(' Could not delete topic: %s\ntraceback:\n%s' % (node, error))
+ msg = ' Could not delete topic: %s\ntraceback:\n%s' % (node, error)
+ self.error(msg)
def publish(self, data, node):
""" Publish the data to the corresponding topic
"""
- self._logger.debug(" Publish to Topic : " + node)
+ msg = " Publish to Topic : " + node
+ self.debug(msg)
try:
result = self['xep_0060'].publish(self._server,node,payload=data)
# id = result['pubsub']['publish']['item']['id']
# print('Published at item id: %s' % id)
except:
error = traceback.format_exc()
- self._logger.error(' Could not publish to: %s\ntraceback:\n%s' \
- % (node, error))
+ msg = ' Could not publish to: %s\ntraceback:\n%s' % (node, error)
+ self.error(msg)
def get(self, data):
""" Get the item
result = self['xep_0060'].get_item(self._server, self.boundjid,
data)
for item in result['pubsub']['items']['substanzas']:
- self._logger.info('Retrieved item %s: %s' % (item['id'],
- tostring(item['payload'])))
+ msg = 'Retrieved item %s: %s' % (item['id'], tostring(item['payload']))
+ self.info(msg)
except:
error = traceback.format_exc()
- self._logger.error(' Could not retrieve item %s from topic %s\ntraceback:\n%s' \
- % (data, self.boundjid, error))
+ msg = ' Could not retrieve item %s from topic %s\ntraceback:\n%s' \
+ % (data, self.boundjid, error)
+ self.error(msg)
def retract(self, data):
""" Retract the item
"""
try:
result = self['xep_0060'].retract(self._server, self.boundjid, data)
- self._logger.info(' Retracted item %s from topic %s' % (data, self.boundjid))
+ msg = ' Retracted item %s from topic %s' % (data, self.boundjid)
+ self.info(msg)
except:
error = traceback.format_exc()
- self._logger.error(' Could not retract item %s from topic %s\ntraceback:\n%s' \
- % (data, self.boundjid, error))
+ msg = 'Could not retract item %s from topic %s\ntraceback:\n%s' \
+ % (data, self.boundjid, error)
+ self.error(msg)
def purge(self):
""" Purge the information in the server
"""
try:
result = self['xep_0060'].purge(self._server, self.boundjid)
- self._logger.info(' Purged all items from topic %s' % self.boundjid)
+ msg = ' Purged all items from topic %s' % self.boundjid
+ self.info(msg)
except:
error = traceback.format_exc()
- self._logger.error(' Could not purge items from topic %s\ntraceback:\n%s' \
- % (self.boundjid, error))
+ msg = ' Could not purge items from topic %s\ntraceback:\n%s' \
+ % (self.boundjid, error)
+ self.error(msg)
def subscribe(self, node):
""" Subscribe to a topic
"""
try:
result = self['xep_0060'].subscribe(self._server, node)
- #self._logger.debug('Subscribed %s to node %s' \
- #% (self.boundjid.bare, node))
- self._logger.info(' Subscribed %s to topic %s' \
- % (self.boundjid.user, node))
+ msg = ' Subscribed %s to topic %s' \
+ % (self.boundjid.user, node)
+ self.info(msg)
+ #self.debug(msg)
except:
error = traceback.format_exc()
- self._logger.error(' Could not subscribe %s to topic %s\ntraceback:\n%s' \
- % (self.boundjid.bare, node, error))
+ msg = ' Could not subscribe %s to topic %s\ntraceback:\n%s' \
+ % (self.boundjid.bare, node, error)
+ self.error(msg)
def unsubscribe(self, node):
""" Unsubscribe to a topic
"""
try:
result = self['xep_0060'].unsubscribe(self._server, node)
- self._logger.info(' Unsubscribed %s from topic %s' % (self.boundjid.bare, node))
+ msg = ' Unsubscribed %s from topic %s' % (self.boundjid.bare, node)
+ self.info(msg)
except:
error = traceback.format_exc()
- self._logger.error(' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
- % (self.boundjid.bare, node, error))
+ msg = ' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
+ % (self.boundjid.bare, node, error)
+ self.error(msg)
def _check_for_tag(self, root, namespaces, tag):
""" Check if an element markup is in the ElementTree
response = response + " " + msg.text + " :"
deb = self._check_for_tag(root, namespaces, "MESSAGE")
if deb is not None:
- self._logger.debug(response + " " + deb.text)
+ msg = response + " " + deb.text
+ self.debug(msg)
else :
- self._logger.info(response)
+ self.info(response)
def handle_omf_message(self, iq):
""" Handle published/received message
from nepi.resources.planetlab.plcapi import PLCAPIFactory
-import logging
-
reschedule_delay = "0.5s"
@clsinit_copy
super(PLanetLabNode, self).__init__(ec, guid)
self._plapi = None
-
- self._logger = logging.getLogger("PlanetLabNode")
@property
def plapi(self):
--- /dev/null
+"""
+ NEPI, a framework to manage network experiments
+ Copyright (C) 2013 INRIA
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+
+import logging
+
+
+class Logger(object):
+ def __init__(self, logger_component):
+ self._logger = logging.getLogger(logger_component)
+
+ def debug(self, msg, out = None, err = None):
+ self.log(msg, logging.DEBUG, out, err)
+
+ def error(self, msg, out = None, err = None):
+ self.log(msg, logging.ERROR, out, err)
+
+ def warn(self, msg, out = None, err = None):
+ self.log(msg, logging.WARNING, out, err)
+
+ def info(self, msg, out = None, err = None):
+ self.log(msg, logging.INFO, out, err)
+
+ def log(self, msg, level, out = None, err = None):
+ if out:
+ msg += " - OUT: %s " % out
+
+ if err:
+ msg += " - ERROR: %s " % err
+
+ msg = self.log_message(msg)
+
+ self.logger.log(level, msg)
+
+ def log_message(self, msg):
+ return msg
+
+ @property
+ def logger(self):
+ return self._logger