--- /dev/null
+#!/usr/bin/env python
+from neco.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from neco.execution.ec import ExperimentController
+
+from neco.resources.omf.omf_node import OMFNode
+from neco.resources.omf.omf_application import OMFApplication
+from neco.resources.omf.omf_interface import OMFWifiInterface
+from neco.resources.omf.omf_channel import OMFChannel
+
+import logging
+import time
+
+logging.basicConfig()
+
+# Create the EC
+ec = ExperimentController()
+
+# Register the different RM that will be used
+ResourceFactory.register_type(OMFNode)
+ResourceFactory.register_type(OMFWifiInterface)
+ResourceFactory.register_type(OMFChannel)
+ResourceFactory.register_type(OMFApplication)
+
+# Create and Configure the Nodes
+node1 = ec.register_resource("OMFNode")
+ec.set(node1, 'hostname', 'omf.plexus.wlab17')
+ec.set(node1, 'xmppSlice', "nepi")
+ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(node1, 'xmppPort', "5222")
+ec.set(node1, 'xmppPassword', "1234")
+
+node2 = ec.register_resource("OMFNode")
+ec.set(node2, 'hostname', "omf.plexus.wlab37")
+ec.set(node2, 'xmppSlice', "nepi")
+ec.set(node2, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(node2, 'xmppPort', "5222")
+ec.set(node2, 'xmppPassword', "1234")
+
+# Create and Configure the Interfaces
+iface1 = ec.register_resource("OMFWifiInterface")
+ec.set(iface1, 'alias', "w0")
+ec.set(iface1, 'mode', "adhoc")
+ec.set(iface1, 'type', "g")
+ec.set(iface1, 'essid', "vlcexp")
+#ec.set(iface1, 'ap', "11:22:33:44:55:66")
+ec.set(iface1, 'ip', "10.0.0.17")
+ec.set(iface1, 'xmppSlice', "nepi")
+ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(iface1, 'xmppPort', "5222")
+ec.set(iface1, 'xmppPassword', "1234")
+
+iface2 = ec.register_resource("OMFWifiInterface")
+ec.set(iface2, 'alias', "w0")
+ec.set(iface2, 'mode', "adhoc")
+ec.set(iface2, 'type', 'g')
+ec.set(iface2, 'essid', "vlcexp")
+#ec.set(iface2, 'ap', "11:22:33:44:55:66")
+ec.set(iface2, 'ip', "10.0.0.37")
+ec.set(iface2, 'xmppSlice', "nepi")
+ec.set(iface2, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(iface2, 'xmppPort', "5222")
+ec.set(iface2, 'xmppPassword', "1234")
+
+# Create and Configure the Channel
+channel = ec.register_resource("OMFChannel")
+ec.set(channel, 'channel', "6")
+ec.set(channel, 'xmppSlice', "nepi")
+ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(channel, 'xmppPort', "5222")
+ec.set(channel, 'xmppPassword', "1234")
+
+# Create and Configure the Application
+app1 = ec.register_resource("OMFApplication")
+ec.set(app1, 'appid', 'Vlc#1')
+ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc")
+ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+ec.set(app1, 'xmppSlice', "nepi")
+ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(app1, 'xmppPort', "5222")
+ec.set(app1, 'xmppPassword', "1234")
+
+app2 = ec.register_resource("OMFApplication")
+ec.set(app2, 'appid', 'Vlc#2')
+ec.set(app2, 'path', "/opt/vlc-1.1.13/cvlc")
+ec.set(app2, 'args', "rtp://10.0.0.37:1234")
+ec.set(app2, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+ec.set(app2, 'xmppSlice', "nepi")
+ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(app2, 'xmppPort', "5222")
+ec.set(app2, 'xmppPassword', "1234")
+
+app3 = ec.register_resource("OMFApplication")
+ec.set(app3, 'appid', 'Kill#2')
+ec.set(app3, 'path', "/usr/bin/killall")
+ec.set(app3, 'args', "vlc")
+ec.set(app3, 'env', " ")
+ec.set(app3, 'xmppSlice', "nepi")
+ec.set(app3, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(app3, 'xmppPort', "5222")
+ec.set(app3, 'xmppPassword', "1234")
+
+# Connection
+ec.register_connection(app3, node1)
+ec.register_connection(app1, node1)
+ec.register_connection(node1, iface1)
+ec.register_connection(iface1, channel)
+ec.register_connection(iface2, channel)
+ec.register_connection(node2, iface2)
+ec.register_connection(app2, node2)
+
+# Condition
+# Topology behaviour : It should not be done by the user, but ....
+ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2)
+ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1)
+ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
+
+# User Behaviour
+ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4)
+ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , 20)
+ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , 25)
+
+# Deploy
+ec.deploy()
+
+# Stop Experiment
+time.sleep(45)
+ec.shutdown()
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.resource import ResourceFactory
+from neco.execution.ec import ExperimentController
+
+from neco.resources.omf.omf_node import OMFNode
+from neco.resources.omf.omf_application import OMFApplication
+from neco.resources.omf.omf_interface import OMFWifiInterface
+from neco.resources.omf.omf_channel import OMFChannel
+
+import logging
+import time
+
+logging.basicConfig()
+
+# Create the EC
+ec = ExperimentController()
+
+# Register the different RM that will be used
+ResourceFactory.register_type(OMFNode)
+ResourceFactory.register_type(OMFWifiInterface)
+ResourceFactory.register_type(OMFChannel)
+ResourceFactory.register_type(OMFApplication)
+
+# Create and Configure the Nodes
+guid = ec.register_resource("OMFNode")
+node1 = ec.get_resource(guid)
+node1.set('hostname', 'omf.plexus.wlab17')
+node1.set('xmppSlice', "nepi")
+node1.set('xmppHost', "xmpp-plexus.onelab.eu")
+node1.set('xmppPort', "5222")
+node1.set('xmppPassword', "1234")
+
+guid = ec.register_resource("OMFNode")
+node2 = ec.get_resource(guid)
+node2.set('hostname', "omf.plexus.wlab37")
+node2.set('xmppSlice', "nepi")
+node2.set('xmppHost', "xmpp-plexus.onelab.eu")
+node2.set('xmppPort', "5222")
+node2.set('xmppPassword', "1234")
+
+# Create and Configure the Interfaces
+guid = ec.register_resource("OMFWifiInterface")
+iface1 = ec.get_resource(guid)
+iface1.set('alias', "w0")
+iface1.set('mode', "adhoc")
+iface1.set('type', "g")
+iface1.set('essid', "helloworld")
+iface1.set('ip', "10.0.0.17")
+iface1.set('xmppSlice', "nepi")
+iface1.set('xmppHost', "xmpp-plexus.onelab.eu")
+iface1.set('xmppPort', "5222")
+iface1.set('xmppPassword', "1234")
+
+guid = ec.register_resource("OMFWifiInterface")
+iface2 = ec.get_resource(guid)
+iface2.set('alias', "w0")
+iface2.set('mode', "adhoc")
+iface2.set('type', 'g')
+iface2.set('essid', "helloworld")
+iface2.set('ip', "10.0.0.37")
+iface2.set('xmppSlice', "nepi")
+iface2.set('xmppHost', "xmpp-plexus.onelab.eu")
+iface2.set('xmppPort', "5222")
+iface2.set('xmppPassword', "1234")
+
+# Create and Configure the Channel
+guid = ec.register_resource("OMFChannel")
+channel = ec.get_resource(guid)
+channel.set('channel', "6")
+channel.set('xmppSlice', "nepi")
+channel.set('xmppHost', "xmpp-plexus.onelab.eu")
+channel.set('xmppPort', "5222")
+channel.set('xmppPassword', "1234")
+
+# Create and Configure the Application
+guid = ec.register_resource("OMFApplication")
+app1 = ec.get_resource(guid)
+app1.set('appid', 'Vlc#1')
+app1.set('path', "/opt/vlc-1.1.13/cvlc")
+app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+app1.set('xmppSlice', "nepi")
+app1.set('xmppHost', "xmpp-plexus.onelab.eu")
+app1.set('xmppPort', "5222")
+app1.set('xmppPassword', "1234")
+
+guid = ec.register_resource("OMFApplication")
+app2 = ec.get_resource(guid)
+app2.set('appid', 'Vlc#2')
+app2.set('path', "/opt/vlc-1.1.13/cvlc")
+app2.set('args', "rtp://10.0.0.37:1234")
+app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+app2.set('xmppSlice', "nepi")
+app2.set('xmppHost', "xmpp-plexus.onelab.eu")
+app2.set('xmppPort', "5222")
+app2.set('xmppPassword', "1234")
+
+guid = ec.register_resource("OMFApplication")
+app3 = ec.get_resource(guid)
+app3.set('appid', 'Kill#2')
+app3.set('path', "/usr/bin/killall")
+app3.set('args', "vlc")
+app3.set('env', " ")
+app3.set('xmppSlice', "nepi")
+app3.set('xmppHost', "xmpp-plexus.onelab.eu")
+app3.set('xmppPort', "5222")
+app3.set('xmppPassword', "1234")
+
+# Connection
+app3.connect(node1.guid)
+node1.connect(app3.guid)
+
+app1.connect(node1.guid)
+node1.connect(app1.guid)
+
+node1.connect(iface1.guid)
+iface1.connect(node1.guid)
+
+iface1.connect(channel.guid)
+channel.connect(iface1.guid)
+
+channel.connect(iface2.guid)
+iface2.connect(channel.guid)
+
+iface2.connect(node2.guid)
+node2.connect(iface2.guid)
+
+node2.connect(app2.guid)
+app2.connect(node2.guid)
+
+# Local Deploy
+node1.deploy()
+node2.deploy()
+iface1.deploy()
+iface2.deploy()
+channel.deploy()
+app1.deploy()
+app2.deploy()
+app3.deploy()
+
+# Start the Nodes
+node1.start()
+node2.start()
+time.sleep(2)
+
+# Start the Interfaces
+iface1.start()
+iface2.start()
+
+# Start the Channel
+time.sleep(2)
+channel.start()
+time.sleep(2)
+
+# Start the Application
+app1.start()
+time.sleep(2)
+app2.start()
+
+time.sleep(20)
+
+# Stop the Application
+app1.stop()
+app2.stop()
+time.sleep(1)
+app3.start()
+time.sleep(2)
+
+# Stop Experiment
+ec.shutdown()
# Resource managers
self._resources = dict()
+ # Resource managers
+ self._group = dict()
+
# Scheduler
self._scheduler = HeapScheduler()
def resources(self):
return self._resources.keys()
- def register_resource(self, rtype, guid = None, creds = None):
+ def register_resource(self, rtype, guid = None):
# Get next available guid
guid = self._guid_generator.next(guid)
# Instantiate RM
- rm = ResourceFactory.create(rtype, self, guid, creds)
+ rm = ResourceFactory.create(rtype, self, guid)
# Store RM
self._resources[guid] = rm
return guid
+ def create_group(self, *args):
+ guid = self._guid_generator.next(guid)
+
+ grp = [arg for arg in args]
+
+ self._resources[guid] = grp
+
+ return guid
+
+
def get_attributes(self, guid):
rm = self.get_resource(guid)
return rm.get_attributes()
"""
if isinstance(group1, int):
- group1 = list[group1]
+ group1 = [group1]
if isinstance(group2, int):
- group2 = list[group2]
+ group2 = [group2]
for guid1 in group1:
- rm = self.get_resource(guid)
+ rm = self.get_resource(guid1)
rm.register_condition(action, group2, state, time)
def discover(self, guid, filters):
"""
if isinstance(group1, int):
- group1 = list[group1]
+ group1 = [group1]
if isinstance(group2, int):
- group2 = list[group2]
+ group2 = [group2]
for guid1 in group1:
rm = self.get_resource(guid)
rm = self.get_resource(guid)
return rm.start_with_condition()
- def deploy(self, group = None, wait_all_ready = True):
+ def deploy(self, group = None, wait_all_deployed = True):
""" Deploy all resource manager in group
:param group: List of guids of RMs to deploy
:type group: list
- :param wait_all_ready: Wait until all RMs are deployed in
+ :param wait_all_deployed: Wait until all RMs are deployed in
order to start the RMs
:type guid: int
for guid in group:
rm = self.get_resource(guid)
- if wait_all_ready:
+ if wait_all_deployed:
towait = list(group)
towait.remove(guid)
self.register_condition(guid, ResourceAction.START,
towait, ResourceState.DEPLOYED)
- thread = threading.Thread(target = steps, args = (rm))
+ thread = threading.Thread(target = steps, args = (rm,))
threads.append(thread)
thread.start()
import functools
import logging
import weakref
+import time as TIME
_reschedule_delay = "1s"
@classmethod
def _register_filter(cls, attr):
""" Resource subclasses will invoke this method to add a
- filter attribute"""
+ filter attribute
+
+ """
cls._filters[attr.name] = attr
@classmethod
def _register_attribute(cls, attr):
""" Resource subclasses will invoke this method to add a
- resource attribute"""
+ resource attribute
+
+ """
cls._attributes[attr.name] = attr
@classmethod
def _register_filters(cls):
""" Resource subclasses will invoke this method to add a
- filter attribute"""
+ filter attribute
+
+ """
pass
@classmethod
def _register_attributes(cls):
""" Resource subclasses will invoke this method to add a
- resource attribute"""
+ resource attribute
+
+ """
pass
@classmethod
def _clsinit(cls):
+ """ Create a new dictionnary instance of the dictionnary
+ with the same template.
+
+ Each ressource should have the same registration dictionary
+ template with different instances.
+ """
# static template for resource filters
cls._filters = dict()
cls._register_filters()
@classmethod
def get_filters(cls):
+ """ Returns a copy of the filters
+
+ """
return copy.deepcopy(cls._filters.values())
@classmethod
def get_attributes(cls):
+ """ Returns a copy of the attributes
+
+ """
return copy.deepcopy(cls._attributes.values())
def __init__(self, ec, guid):
return self._connections
@property
- def conditons(self):
+ def conditions(self):
return self._conditions
@property
pass
def start(self):
+ """ Start the Resource Manager
+
+ """
if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
self.logger.error("Wrong state %s for start" % self.state)
self._state = ResourceState.STARTED
def stop(self):
+ """ Start the Resource Manager
+
+ """
if not self._state in [ResourceState.STARTED]:
self.logger.error("Wrong state %s for stop" % self.state)
self._state = ResourceState.STOPPED
def set(self, name, value):
+ """ Set the value of the attribute
+
+ :param name: Name of the attribute
+ :type name: str
+ :param name: Value of the attribute
+ :type name: str
+ :rtype: Boolean
+ """
attr = self._attrs[name]
attr.value = value
def get(self, name):
+ """ Start the Resource Manager
+
+ :param name: Name of the attribute
+ :type name: str
+ :rtype: str
+ """
attr = self._attrs[name]
return attr.value
def register_condition(self, action, group, state,
time = None):
+ """ Do the 'action' after 'time' on the current RM when 'group'
+ reach the state 'state'
+
+ :param action: Action to do. Either 'START' or 'STOP'
+ :type action: str
+ :param group: group of RM
+ :type group: str
+ :param state: RM that are part of the condition
+ :type state: list
+ :param time: Time to wait after the state is reached (ex : '2s' )
+ :type time: str
+
+ """
if action not in self.conditions:
self._conditions[action] = set()
- self.conditions.get(action).add((group, state, time))
+ # We need to use only sequence inside a set and not a list.
+ # As group is a list, we need to change it.
+ #print (tuple(group), state, time)
+ self.conditions.get(action).add((tuple(group), state, time))
def _needs_reschedule(self, group, state, time):
+ """ Internal method that verify if 'time' has elapsed since
+ all elements in 'group' have reached state 'state'.
+
+ :param group: RM that are part of the condition
+ :type group: list
+ :param state: State that group need to reach for the condtion
+ :type state: str
+ :param time: time to wait after the state
+ :type time: str
+
+
+ """
reschedule = False
delay = _reschedule_delay
break
if time:
- if state == ResourceAction.START:
+ if state == ResourceState.STARTED:
t = rm.start_time
- elif state == ResourceAction.STOP:
+ elif state == ResourceState.STOPPED:
t = rm.stop_time
else:
# Only keep time information for START and STOP
break
- delay = strfdiff(t, strnow())
- if delay < time:
+ d = strfdiff(strfnow(), t)
+ if d < time:
reschedule = True
+ delay = "%ds" % (int(time - d) +1)
break
-
return reschedule, delay
def set_with_conditions(self, name, value, group, state, time):
+ """ Set value 'value' on attribute with name 'name' when 'time'
+ has elapsed since all elements in 'group' have reached state
+ 'state'.
+
+ :param name: Name of the attribute
+ :type name: str
+ :param name: Value of the attribute
+ :type name: str
+ :param group: RM that are part of the condition
+ :type group: list
+ :param state: State that group need to reach before set
+ :type state: str
+ :param time: Time to wait after the state is reached (ex : '2s' )
+ :type time: str
+
+ """
+
reschedule = False
delay = _reschedule_delay
## evaluate if set conditions are met
# only can set with conditions after the RM is started
- if self.status != ResourceStatus.STARTED:
+ if self.state != ResourceState.STARTED:
reschedule = True
else:
reschedule, delay = self._needs_reschedule(group, state, time)
self.set(name, value)
def start_with_conditions(self):
+ """ Starts when all the conditions are reached
+
+ """
reschedule = False
delay = _reschedule_delay
## evaluate if set conditions are met
# only can start when RM is either STOPPED or DEPLOYED
- if self.status not in [ResourceStatus.STOPPED, ResourceStatus.DEPLOYED]:
+ if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]:
reschedule = True
else:
- for action, (group, state, time) in self.conditions.iteritems():
- if action == ResourceAction.START:
- reschedule, delay = self._needs_reschedule(group, state, time)
+ print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- start condition : " + str(self.conditions.items())
+ # Need to separate because it could have more that tuple of condition
+ # for the same action.
+ if self.conditions.get(ResourceAction.START):
+ for (group, state, time) in self.conditions.get(ResourceAction.START):
+ reschedule, delay = self._needs_reschedule(group, state, time)
if reschedule:
break
if reschedule:
- callback = functools.partial(self.start_with_conditions,
- group, state, time)
+ callback = functools.partial(self.start_with_conditions)
self.ec.schedule(delay, callback)
else:
+ print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
+------------------------------------------------------------------------------\
+---------------------------------------------------------------- STARTING -- "
self.start()
def stop_with_conditions(self):
+ """ Starts when all the conditions are reached
+
+ """
reschedule = False
delay = _reschedule_delay
## evaluate if set conditions are met
- # only can start when RM is either STOPPED or DEPLOYED
- if self.status != ResourceStatus.STARTED:
+ # only can stop when RM is STARTED
+ if self.state != ResourceState.STARTED:
reschedule = True
else:
- for action, (group, state, time) in self.conditions.iteritems():
- if action == ResourceAction.STOP:
- reschedule, delay = self._needs_reschedule(group, state, time)
- if reschedule:
- break
+ print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ---- stop condition : " + str(self.conditions.items())
+ # Need to separate because it could have more that tuple of condition
+ # for the same action.
+ conditions = self.conditions.get(ResourceAction.STOP, [])
+ for (group, state, time) in conditions:
+ reschedule, delay = self._needs_reschedule(group, state, time)
+ if reschedule:
+ break
+
+ #else:
+ # for action, (group, state, time) in self.conditions.iteritems():
+ # if action == ResourceAction.STOP:
+ # reschedule, delay = self._needs_reschedule(group, state, time)
+ # if reschedule:
+ # break
if reschedule:
- callback = functools.partial(self.stop_with_conditions,
- group, state, time)
+ callback = functools.partial(self.stop_with_conditions)
self.ec.schedule(delay, callback)
else:
self.stop()
def deploy(self):
+ """Execute all the differents steps required to reach the state DEPLOYED
+
+ """
self.discover()
self.provision()
self._state = ResourceState.DEPLOYED
def release(self):
+ """Clean the resource at the end of the Experiment and change the status
+
+ """
self._state = ResourceState.RELEASED
def _validate_connection(self, guid):
+ """Check if the connection is available.
+
+ :param guid: Guid of the current Resource Manager
+ :type guid: int
+ :rtype: Boolean
+
+ """
# TODO: Validate!
return True
return True
# XXX: What if it is connected to more than one node?
resources = self.find_resources(exact_tags = [tags.NODE])
- self._node = resources[0] is len(resources) == 1 else None
+ self._node = resources[0] if len(resources) == 1 else None
return self._node
import ssl
import sys
import time
+import hashlib
+import neco
+import threading
from neco.resources.omf.omf_client import OMFClient
from neco.resources.omf.omf_messages_5_4 import MessageHandler
"""
def __init__(self, slice, host, port, password, xmpp_root = None):
+ """
+
+ :param slice: Xmpp Slice
+ :type slice: Str
+ :param host: Xmpp Server
+ :type host: Str
+ :param port: Xmpp Port
+ :type port: Str
+ :param password: Xmpp password
+ :type password: Str
+ :param xmpp_root: Root of the Xmpp Topic Architecture
+ :type xmpp_root: Str
+
+ """
date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
tz = -time.altzone if time.daylight != 0 else -time.timezone
date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
self._hostnames = []
self._xmpp_root = xmpp_root or "OMF_5.4"
- self._logger = logging.getLogger("neco.resources.omf")
+ self._logger = logging.getLogger("neco.omf.omfApi ")
+ self._logger.setLevel(neco.LOGLEVEL)
# OMF xmpp client
self._client = None
xmpp.ssl_version = ssl.PROTOCOL_SSLv3
if xmpp.connect((self._host, self._port)):
- xmpp.process(threaded=True)
+ xmpp.process(block=False)
while not xmpp.ready:
time.sleep(1)
self._client = xmpp
"""
address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
- print address
- payload = self._message.newexpfunction(self._user, address)
+ #print address
+ payload = self._message.newexp_function(self._user, address)
slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
self._client.publish(payload, slice_sid)
self._client.create(xmpp_node)
self._client.subscribe(xmpp_node)
- payload = self._message.logfunction("2",
+ payload = self._message.log_function("2",
"nodeHandler::NodeHandler",
"INFO",
"OMF Experiment Controller 5.4 (git 529a626)")
xmpp_node = self._host_resource_id(hostname)
self._client.subscribe(xmpp_node)
- payload = self._message.enrollfunction("1", "*", "1", hostname)
+ payload = self._message.enroll_function("1", "*", "1", hostname)
self._client.publish(payload, xmpp_node)
def configure(self, hostname, attribute, value):
:type value: str
"""
- payload = self._message.configurefunction(hostname, value, attribute)
+ payload = self._message.configure_function(hostname, value, attribute)
xmpp_node = self._host_session_id(hostname)
self._client.publish(payload, xmpp_node)
:type env: str
"""
- payload = self._message.executefunction(hostname, app_id, arguments, path, env)
+ payload = self._message.execute_function(hostname, app_id, arguments, path, env)
xmpp_node = self._host_session_id(hostname)
self._client.publish(payload, xmpp_node)
:type app_id: str
"""
- payload = self._message.exitfunction(hostname, app_id)
+ payload = self._message.exit_function(hostname, app_id)
xmpp_node = self._host_session_id(hostname)
self._client.publish(payload, xmpp_node)
- def disconnect(self):
- """ Delete the sesion and logger topic and disconnect
+ def release(self, hostname):
+ """ Delete the session and logger topics. Then disconnect
+
+ """
+ if hostname in self._hostnames:
+ self.delete(hostname)
+
+ def disconnect(self) :
+ """ Delete the session and logger topics. Then disconnect
"""
self._client.delete(self._exp_session_id)
self._client.delete(self._logger_session_id)
- for hostname in self._hostnames[:]:
- self.delete(hostname)
-
time.sleep(1)
- self._client.disconnect()
+
+ # Wait the send queue to be empty before disconnect
+ self._client.disconnect(wait=True)
+ self._logger.debug(" Disconnected from XMPP Server")
class OMFAPIFactory(object):
"""
.. note::
- It allows the different RM to use the same xmpp client if they use the same credentials. For the moment, it is focused on Xmpp.
+ It allows the different RM to use the same xmpp client if they use the same credentials.
+ For the moment, it is focused on Xmpp.
"""
-
- # XXX: put '_apis' instead of '_Api'
- _Api = dict()
+ # use lock to avoid concurrent access to the Api list at the same times by 2 different threads
+ lock = threading.Lock()
+ _apis = dict()
@classmethod
def get_api(cls, slice, host, port, password):
"""
if slice and host and port and password:
- key = cls._hash_api(slice, host, port)
- if key in cls._Api:
- return cls._Api[key]
+ key = cls._make_key(slice, host, port, password)
+ cls.lock.acquire()
+ if key in cls._apis:
+ cls._apis[key]['cnt'] += 1
+ cls.lock.release()
+ return cls._apis[key]['api']
else :
- return cls.create_api(slice, host, port, password)
+ omf_api = cls.create_api(slice, host, port, password)
+ cls.lock.release()
+ return omf_api
return None
@classmethod
:type password: str
"""
- OmfApi = OMFAPI(slice, host, port, password)
- key = cls._hash_api(slice, host, port)
- cls._Api[key] = OmfApi
- return OmfApi
-
- # XXX: this is not a hash :)
- # From wikipedia: "A hash function is any algorithm or subroutine that maps large data
- # sets of variable length to smaller data sets of a fixed length."
- # The idea is to apply a function to get a smaller string. Use hashlib instead.
- # e.g:
- # import hashlib
- # res = slice + "_" + host + "_" + port
- # hashlib.md5(res).hexdigest()
- #
- # XXX: change method name for 'make_key'
+ omf_api = OMFAPI(slice, host, port, password)
+ key = cls._make_key(slice, host, port, password)
+ cls._apis[key] = {}
+ cls._apis[key]['api'] = omf_api
+ cls._apis[key]['cnt'] = 1
+ return omf_api
+
@classmethod
- def _hash_api(cls, slice, host, port):
- """ Hash the credentials in order to create a key
+ def release_api(cls, slice, host, port, password):
+ """ Release an API with this credentials
:param slice: Xmpp Slice Name
:type slice: str
:type host: str
:param port: Xmpp Port (Default : 5222)
:type port: str
+ :param password: Xmpp Password
+ :type password: str
"""
- res = slice + "_" + host + "_" + port
- return res
+ if slice and host and port and password:
+ key = cls._make_key(slice, host, port, password)
+ if key in cls._apis:
+ cls._apis[key]['cnt'] -= 1
+ #print "Api Counter : " + str(cls._apis[key]['cnt'])
+ if cls._apis[key]['cnt'] == 0:
+ omf_api = cls._apis[key]['api']
+ omf_api.disconnect()
+ @classmethod
+ def _make_key(cls, *args):
+ """ Hash the credentials in order to create a key
+
+ :param args: list of arguments used to create the hash (user, host, port, ...)
+ :type args: list of args
+
+ """
+ skey = "".join(map(str, args))
+ return hashlib.md5(skey).hexdigest()
#!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
-from neco.execution.attribute import Attribute
+from neco.execution.resource import ResourceManager, clsinit
+from neco.execution.attribute import Attribute, Flags
from neco.resources.omf.omf_api import OMFAPIFactory
import neco
import logging
@clsinit
-class OMFApplication(Resource):
+class OMFApplication(ResourceManager):
"""
.. class:: Class Args :
:type ec: ExperimentController
:param guid: guid of the RM
:type guid: int
- :param creds: Credentials to communicate with the rm (XmppClient for OMF)
+ :param creds: Credentials to communicate with the rm (XmppClient)
:type creds: dict
.. note::
path = Attribute("path", "Path of the application")
args = Attribute("args", "Argument of the application")
env = Attribute("env", "Environnement variable of the application")
- xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
- xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
- xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
- xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential)
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential)
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential)
cls._register_attribute(appid)
cls._register_attribute(path)
cls._register_attribute(args)
cls._register_attribute(xmppPassword)
- def __init__(self, ec, guid, creds):
+ def __init__(self, ec, guid):
"""
:param ec: The Experiment controller
:type ec: ExperimentController
"""
super(OMFApplication, self).__init__(ec, guid)
- self.set('xmppSlice', creds['xmppSlice'])
- self.set('xmppHost', creds['xmppHost'])
- self.set('xmppPort', creds['xmppPort'])
- self.set('xmppPassword', creds['xmppPassword'])
self.set('appid', "")
self.set('path', "")
self._node = None
- self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ self._omf_api = None
self._logger = logging.getLogger("neco.omf.omfApp ")
self._logger.setLevel(neco.LOGLEVEL)
:rtype: Boolean
"""
- rm = self.ec.resource(guid)
+ rm = self.ec.get_resource(guid)
if rm.rtype() not in self._authorized_connections:
self._logger.debug("Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid))
return False
"""
for elt in conn_set:
- rm = self.ec.resource(elt)
+ rm = self.ec.get_resource(elt)
if rm.rtype() == "OMFNode":
return rm
return None
+ def deploy(self):
+ """Deploy the RM
+
+ """
+ super(OMFApplication, self).deploy()
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
def start(self):
"""Send Xmpp Message Using OMF protocol to execute the application
"""
+ super(OMFApplication, self).start()
self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env'))
- #try:
+
if self.get('appid') and self.get('path') and self.get('args') and self.get('env'):
rm_node = self._get_nodes(self._connections)
self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env'))
"""Send Xmpp Message Using OMF protocol to kill the application
"""
+
rm_node = self._get_nodes(self._connections)
self._omf_api.exit(rm_node.get('hostname'),self.get('appid'))
+ super(OMFApplication, self).stop()
+ def release(self):
+ """Clean the RM at the end of the experiment
+ """
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
#!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
-from neco.execution.attribute import Attribute
+from neco.execution.resource import ResourceManager, clsinit
+from neco.execution.attribute import Attribute, Flags
from neco.resources.omf.omf_api import OMFAPIFactory
import logging
@clsinit
-class OMFChannel(Resource):
+class OMFChannel(ResourceManager):
"""
.. class:: Class Args :
"""Register the attributes of an OMF channel
"""
channel = Attribute("channel", "Name of the application")
- xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
- xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
- xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
- xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential)
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential)
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential)
cls._register_attribute(channel)
cls._register_attribute(xmppSlice)
cls._register_attribute(xmppHost)
cls._register_attribute(xmppPort)
cls._register_attribute(xmppPassword)
- def __init__(self, ec, guid, creds):
+ def __init__(self, ec, guid):
"""
:param ec: The Experiment controller
:type ec: ExperimentController
"""
super(OMFChannel, self).__init__(ec, guid)
- self.set('xmppSlice', creds['xmppSlice'])
- self.set('xmppHost', creds['xmppHost'])
- self.set('xmppPort', creds['xmppPort'])
- self.set('xmppPassword', creds['xmppPassword'])
self._nodes_guid = list()
- self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ self._omf_api = None
self._logger = logging.getLogger("neco.omf.omfChannel")
self._logger.setLevel(neco.LOGLEVEL)
:rtype: Boolean
"""
- rm = self.ec.resource(guid)
+ rm = self.ec.get_resource(guid)
if rm.rtype() in self._authorized_connections:
- self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+ self._logger.debug("Connection between %s %s and %s %s accepted" %
+ (self.rtype(), self._guid, rm.rtype(), guid))
return True
self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
return False
:type conn_set: set
:rtype: list
:return: self._nodes_guid
+
"""
for elt in conn_set:
- rm_iface = self.ec.resource(elt)
+ rm_iface = self.ec.get_resource(elt)
for conn in rm_iface._connections:
- rm_node = self.ec.resource(conn)
+ rm_node = self.ec.get_resource(conn)
if rm_node.rtype() == "OMFNode":
couple = [rm_node.get('hostname'), rm_iface.get('alias')]
#print couple
self._nodes_guid.append(couple)
return self._nodes_guid
+ def deploy(self):
+ """Deploy the RM
+
+ """
+ super(OMFChannel, self).deploy()
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
def discover(self):
""" Discover the availables channels
"""
pass
- def provision(self, credential):
+ def provision(self):
""" Provision some availables channels
"""
"""
if self.get('channel'):
set_nodes = self._get_target(self._connections)
- #print set_nodes
+ print set_nodes
for couple in set_nodes:
#print "Couple node/alias : " + couple[0] + " , " + couple[1]
attrval = self.get('channel')
attrname = "net/%s/%s" % (couple[1], 'channel')
#print "Send the configure message"
self._omf_api.configure(couple[0], attrname, attrval)
+ super(OMFChannel, self).start()
- def xstart(self):
- try:
- if self.get('channel'):
- node = self.tc.elements.get(self._node_guid)
- attrval = self.get('channel')
- attrname = "net/%s/%s" % (self._alias, 'channel')
- self._omf_api.configure('omf.plexus.wlab17', attrname, attrval)
- except AttributeError:
- # If the attribute is not yet defined, ignore the error
- pass
+ def stop(self):
+ """Send Xmpp Message Using OMF protocol to put down the interface
+ """
+ super(OMFChannel, self).stop()
+
+ def release(self):
+ """Clean the RM at the end of the experiment
+
+ """
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
import neco
-class OMFClient(sleekxmpp.ClientXMPP):
+# inherit from BaseXmpp and XMLStream classes
+class OMFClient(sleekxmpp.ClientXMPP):
"""
.. class:: Class Args :
"""
def __init__(self, jid, password):
+ """
+
+ :param jid: Jabber Id (= Xmpp Slice + Date)
+ :type jid: Str
+ :param password: Jabber Password (= Xmpp Password)
+ :type password: Str
+
+
+ """
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self._ready = False
self._registered = False
@property
def ready(self):
+ """ Check if the client is ready
+
+ """
return self._ready
def start(self, event):
+ """ Send presence to the Xmppp Server. This function is called directly by the sleekXmpp library
+
+ """
self.send_presence()
self._ready = True
self._server = "pubsub.%s" % self.boundjid.domain
def register(self, iq):
+ """ Register to the Xmppp Server. This function is called directly by the sleekXmpp library
+
+ """
if self._registered:
- self._logger.info("%s already registered!" % self.boundjid)
+ self._logger.info(" %s already registered!" % self.boundjid)
return
resp = self.Iq()
try:
resp.send(now=True)
- self._logger.info("Account created for %s!" % self.boundjid)
+ self._logger.info(" Account created for %s!" % self.boundjid)
self._registered = True
except IqError as e:
- self._logger.error("Could not register account: %s" %
+ self._logger.error(" Could not register account: %s" %
e.iq['error']['text'])
except IqTimeout:
- self._logger.error("No response from server.")
+ self._logger.error(" No response from server.")
def unregister(self):
+ """ Unregister from the Xmppp Server.
+
+ """
try:
self.plugin['xep_0077'].cancel_registration(
ifrom=self.boundjid.full)
- self._logger.info("Account unregistered for %s!" % self.boundjid)
+ self._logger.info(" Account unregistered for %s!" % self.boundjid)
except IqError as e:
- self._logger.error("Could not unregister account: %s" %
+ self._logger.error(" Could not unregister account: %s" %
e.iq['error']['text'])
except IqTimeout:
- self._logger.error("No response from server.")
+ self._logger.error(" No response from server.")
def nodes(self):
+ """ Get all the nodes of the Xmppp Server.
+
+ """
try:
result = self['xep_0060'].get_nodes(self._server)
for item in result['disco_items']['items']:
return result
except:
error = traceback.format_exc()
- self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error)
+ self._logger.error(' Could not retrieve node list.\ntraceback:\n%s', error)
def subscriptions(self):
+ """ Get all the subscriptions of the Xmppp Server.
+
+ """
try:
result = self['xep_0060'].get_subscriptions(self._server)
#self.boundjid.full)
return result
except:
error = traceback.format_exc()
- self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error)
+ self._logger.error(' Could not retrieve subscriptions.\ntraceback:\n%s', error)
def create(self, node):
+ """ Create the topic corresponding to the node
+
+ :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17)
+ :type node: str
+
+ """
self._logger.debug(" Create Topic : " + node)
config = self['xep_0004'].makeForm('submit')
self['xep_0060'].create_node(self._server, node, config = config)
except:
error = traceback.format_exc()
- self._logger.error('Could not create topic: %s\ntraceback:\n%s' % (node, error))
+ self._logger.error(' Could not create topic: %s\ntraceback:\n%s' % (node, error))
def delete(self, node):
+ """ Delete the topic corresponding to the node
+
+ :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17)
+ :type node: str
+
+ """
+ # To check if the queue are well empty at the end
+ #print " length of the queue : " + str(self.send_queue.qsize())
+ #print " length of the queue : " + str(self.event_queue.qsize())
try:
self['xep_0060'].delete_node(self._server, node)
- self._logger.info('Deleted node: %s' % node)
+ self._logger.info(' Deleted node: %s' % node)
except:
error = traceback.format_exc()
- self._logger.error('Could not delete topic: %s\ntraceback:\n%s' % (node, error))
+ self._logger.error(' Could not delete topic: %s\ntraceback:\n%s' % (node, error))
def publish(self, data, node):
- self._logger.debug(" Publish to Topic :" + node)
+ """ Publish the data to the corresponding topic
+
+ :param data: Data that will be published
+ :type data: str
+ :param node: Name of the topic
+ :type node: str
+
+ """
+
+ self._logger.debug(" Publish to Topic : " + node)
try:
result = self['xep_0060'].publish(self._server,node,payload=data)
# id = result['pubsub']['publish']['item']['id']
# print('Published at item id: %s' % id)
except:
error = traceback.format_exc()
- self._logger.error('Could not publish to: %s\ntraceback:\n%s' \
+ self._logger.error(' Could not publish to: %s\ntraceback:\n%s' \
% (node, error))
def get(self, data):
+ """ Get the item
+
+ :param data: data from which the items will be get back
+ :type data: str
+
+
+ """
try:
result = self['xep_0060'].get_item(self._server, self.boundjid,
data)
tostring(item['payload'])))
except:
error = traceback.format_exc()
- self._logger.error('Could not retrieve item %s from topic %s\ntraceback:\n%s' \
+ self._logger.error(' Could not retrieve item %s from topic %s\ntraceback:\n%s' \
% (data, self.boundjid, error))
def retract(self, data):
+ """ Retract the item
+
+ :param data: data from which the item will be retracted
+ :type data: str
+
+ """
try:
result = self['xep_0060'].retract(self._server, self.boundjid, data)
- self._logger.info('Retracted item %s from topic %s' % (data, self.boundjid))
+ self._logger.info(' Retracted item %s from topic %s' % (data, self.boundjid))
except:
error = traceback.format_exc()
- self._logger.error('Could not retract item %s from topic %s\ntraceback:\n%s' \
+ self._logger.error(' Could not retract item %s from topic %s\ntraceback:\n%s' \
% (data, self.boundjid, error))
def purge(self):
+ """ Purge the information in the server
+
+ """
try:
result = self['xep_0060'].purge(self._server, self.boundjid)
- self._logger.info('Purged all items from topic %s' % self.boundjid)
+ self._logger.info(' Purged all items from topic %s' % self.boundjid)
except:
error = traceback.format_exc()
- self._logger.error('Could not purge items from topic %s\ntraceback:\n%s' \
+ self._logger.error(' Could not purge items from topic %s\ntraceback:\n%s' \
% (self.boundjid, error))
def subscribe(self, node):
+ """ Subscribe to a topic
+
+ :param node: Name of the topic
+ :type node: str
+
+ """
try:
result = self['xep_0060'].subscribe(self._server, node)
#self._logger.debug('Subscribed %s to node %s' \
% (self.boundjid.bare, node, error))
def unsubscribe(self, node):
+ """ Unsubscribe to a topic
+
+ :param node: Name of the topic
+ :type node: str
+
+ """
try:
result = self['xep_0060'].unsubscribe(self._server, node)
self._logger.info(' Unsubscribed %s from topic %s' % (self.boundjid.bare, node))
self._logger.error(' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
% (self.boundjid.bare, node, error))
- def _check_for_tag(self, treeroot, namespaces, tag):
- for element in treeroot.iter(namespaces+tag):
+ def _check_for_tag(self, root, namespaces, tag):
+ """ Check if an element markup is in the ElementTree
+
+ :param root: Root of the tree
+ :type root: ElementTree Element
+ :param namespaces: Namespaces of the element
+ :type namespaces: str
+ :param tag: Tag that will search in the tree
+ :type tag: str
+
+ """
+ for element in root.iter(namespaces+tag):
if element.text:
return element
else :
return None
- def _check_output(self, treeroot, namespaces):
- output_param = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
+ def _check_output(self, root, namespaces):
+ """ Check the significative element in the answer and display it
+
+ :param root: Root of the tree
+ :type root: ElementTree Element
+ :param namespaces: Namespaces of the tree
+ :type namespaces: str
+
+ """
+ fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
response = ""
- for elt in output_param:
- msg = self._check_for_tag(treeroot, namespaces, elt)
+ for elt in fields:
+ msg = self._check_for_tag(root, namespaces, elt)
if msg is not None:
response = response + " " + msg.text + " :"
- deb = self._check_for_tag(treeroot, namespaces, "MESSAGE")
+ deb = self._check_for_tag(root, namespaces, "MESSAGE")
if deb is not None:
self._logger.debug(response + " " + deb.text)
else :
self._logger.info(response)
def handle_omf_message(self, iq):
+ """ Handle published/received message
+
+ :param iq: Stanzas that is currently published/received
+ :type iq: Iq Stanza
+
+ """
namespaces = "{http://jabber.org/protocol/pubsub}"
for i in iq['pubsub_event']['items']:
root = ET.fromstring(str(i))
#!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
-from neco.execution.attribute import Attribute
+from neco.execution.resource import ResourceManager, clsinit
+from neco.execution.attribute import Attribute, Flags
from neco.resources.omf.omf_api import OMFAPIFactory
import logging
@clsinit
-class OMFWifiInterface(Resource):
+class OMFWifiInterface(ResourceManager):
"""
.. class:: Class Args :
@classmethod
def _register_attributes(cls):
"""Register the attributes of an OMF interface
+
"""
- alias = Attribute("alias","Alias of the interface", default_value = "w0")
+ alias = Attribute("alias","Alias of the interface", default_value = "w0")
mode = Attribute("mode","Mode of the interface")
type = Attribute("type","Type of the interface")
essid = Attribute("essid","Essid of the interface")
ip = Attribute("ip","IP of the interface")
- xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
- xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
- xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
- xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential)
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential)
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential)
cls._register_attribute(alias)
cls._register_attribute(xmppSlice)
cls._register_attribute(xmppHost)
cls._register_attribute(essid)
cls._register_attribute(ip)
- def __init__(self, ec, guid, creds):
+ def __init__(self, ec, guid):
"""
:param ec: The Experiment controller
:type ec: ExperimentController
"""
super(OMFWifiInterface, self).__init__(ec, guid)
- self.set('xmppSlice', creds['xmppSlice'])
- self.set('xmppHost', creds['xmppHost'])
- self.set('xmppPort', creds['xmppPort'])
- self.set('xmppPassword', creds['xmppPassword'])
- self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ self._omf_api = None
self._alias = self.get('alias')
self._logger = logging.getLogger("neco.omf.omfIface ")
self._logger.setLevel(neco.LOGLEVEL)
def _validate_connection(self, guid):
- """Check if the connection is available.
+ """ Check if the connection is available.
:param guid: Guid of the current RM
:type guid: int
:rtype: Boolean
"""
- rm = self.ec.resource(guid)
+ rm = self.ec.get_resource(guid)
if rm.rtype() in self._authorized_connections:
- self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+ self._logger.debug("Connection between %s %s and %s %s accepted" %
+ (self.rtype(), self._guid, rm.rtype(), guid))
return True
- self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+ self._logger.debug("Connection between %s %s and %s %s refused" %
+ (self.rtype(), self._guid, rm.rtype(), guid))
return False
def _get_nodes(self, conn_set):
- """
- Get the RM of the node to which the application is connected
+ """ Get the RM of the node to which the application is connected
:param conn_set: Connections of the current Guid
:type conn_set: set
:rtype: ResourceManager
+
"""
for elt in conn_set:
- rm = self.ec.resource(elt)
+ rm = self.ec.get_resource(elt)
if rm.rtype() == "OMFNode":
return rm
return None
+ def deploy(self):
+ """Deploy the RM
+
+ """
+ super(OMFWifiInterface, self).deploy()
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
def start(self):
"""Send Xmpp Messages Using OMF protocol to configure Interface
"""
- self._logger.debug(self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('mode') + " : " + self.get('type') + " : " + self.get('essid') + " : " + self.get('ip'))
+ self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " +
+ self.get('mode') + " : " + self.get('type') + " : " +
+ self.get('essid') + " : " + self.get('ip'))
#try:
if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'):
rm_node = self._get_nodes(self._connections)
attrname = "net/%s/%s" % (self._alias, attrname)
#print "Send the configure message"
self._omf_api.configure(rm_node.get('hostname'), attrname, attrval)
+ super(OMFWifiInterface, self).start()
def stop(self):
"""Send Xmpp Message Using OMF protocol to put down the interface
"""
- self._omf_api.disconnect()
+ super(OMFWifiInterface, self).stop()
+ def release(self):
+ """Clean the RM at the end of the experiment
+
+ """
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
from xml.etree import cElementTree as ET
-EXECUTE = "EXECUTE"
-KILL = "KILL"
-STDIN = "STDIN"
-NOOP = "NOOP"
-PM_INSTALL = "PM_INSTALL"
-APT_INSTALL = "APT_INSTALL"
-RPM_INSTALL = "RPM_INSTALL"
-RESET = "RESET"
-REBOOT = "REBOOT"
-MODPROBE = "MODPROBE"
-CONFIGURE = "CONFIGURE"
-LOAD_IMAGE = "LOAD_IMAGE"
-SAVE_IMAGE = "SAVE_IMAGE"
-LOAD_DATA = "LOAD_DATA"
-SET_LINK = "SET_LINK"
-ALIAS = "ALIAS"
-SET_DISCONNECTION = "SET_DISCONNECTION"
-RESTART = "RESTART"
-ENROLL = "ENROLL"
-EXIT = "EXIT"
-
class MessageHandler():
"""
.. class:: Class Args :
"""
-
def __init__(self, sliceid, expid ):
+ """
+
+ :param sliceid: Slice Name (= Xmpp Slice)
+ :type expid: Str
+ :param expid: Experiment ID (= Xmpp User)
+ :type expid: Str
+
+ """
self._slice_id = sliceid
self._exp_id = expid
- print "init" + self._exp_id +" "+ self._slice_id
- pass
-
- def Mid(self, parent, keyword):
- mid = ET.SubElement(parent, keyword)
- mid.set("id", "\'omf-payload\'")
- return mid
- def Mtext(self, parent, keyword, text):
- mtext = ET.SubElement(parent, keyword)
- mtext.text = text
- return mtext
- def executefunction(self, target, appid, cmdlineargs, path, env):
+ def _id_element(self, parent, markup):
+ """ Insert a markup element with an id
+
+ :param parent: Parent element in an XML point of view
+ :type parent: ElementTree Element
+ :param markup: Name of the markup
+ :type markup: str
+
+ """
+ elt = ET.SubElement(parent, markup)
+ elt.set("id", "\'omf-payload\'")
+ return elt
+
+ def _attr_element(self, parent, markup, text):
+ """ Insert a markup element with a text (value)
+
+ :param parent: Parent element in an XML point of view
+ :type parent: ElementTree Element
+ :param markup: Name of the markup
+ :type markup: str
+ :param text: Value of the markup element
+ :type text: str
+
+ """
+ elt = ET.SubElement(parent, markup)
+ elt.text = text
+ return elt
+
+ def execute_function(self, target, appid, cmdlineargs, path, env):
+ """ Build an Execute Message
+
+ :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+ :type target: str
+ :param appid: Application id
+ :type appid: str
+ :param cmdlineargs: Arguments of the application
+ :type cmdlineargs: str
+ :param path: Path of the application
+ :type path: str
+ :param env: Environment variables
+ :type env: str
+
+ """
payload = ET.Element("omf-message")
- execute = self.Mid(payload,"EXECUTE")
- env = self.Mtext(execute, "ENV", env)
- sliceid = self.Mtext(execute,"SLICEID",self._slice_id)
- expid = self.Mtext(execute,"EXPID",self._exp_id)
- target = self.Mtext(execute,"TARGET",target)
- appid = self.Mtext(execute,"APPID",appid)
- cmdlineargs = self.Mtext(execute,"CMDLINEARGS",cmdlineargs)
- path = self.Mtext(execute,"PATH",path)
+ execute = self._id_element(payload,"EXECUTE")
+ env = self._attr_element(execute, "ENV", env)
+ sliceid = self._attr_element(execute,"SLICEID",self._slice_id)
+ expid = self._attr_element(execute,"EXPID",self._exp_id)
+ target = self._attr_element(execute,"TARGET",target)
+ appid = self._attr_element(execute,"APPID",appid)
+ cmdlineargs = self._attr_element(execute,"CMDLINEARGS",cmdlineargs)
+ path = self._attr_element(execute,"PATH",path)
return payload
- def exitfunction(self, target, appid):
+ def exit_function(self, target, appid):
+ """ Build an Exit Message
+
+ :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+ :type target: str
+ :param appid: Application id (ex : vlc#1)
+ :type appid: str
+
+ """
payload = ET.Element("omf-message")
- execute = self.Mid(payload,"EXIT")
- sliceid = self.Mtext(execute,"SLICEID",self._slice_id)
- expid = self.Mtext(execute,"EXPID",self._exp_id)
- target = self.Mtext(execute,"TARGET",target)
- appid = self.Mtext(execute,"APPID",appid)
+ execute = self._id_element(payload,"EXIT")
+ sliceid = self._attr_element(execute,"SLICEID",self._slice_id)
+ expid = self._attr_element(execute,"EXPID",self._exp_id)
+ target = self._attr_element(execute,"TARGET",target)
+ appid = self._attr_element(execute,"APPID",appid)
return payload
- def configurefunction(self, target, value, path):
+ def configure_function(self, target, value, path):
+ """ Build a Configure Message
+
+ :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+ :type target: str
+ :param value: guid of the RM
+ :type value: int
+ :param path: Path of the element to configure (ex : net/w0/channel)
+ :type path: dict
+
+ """
payload = ET.Element("omf-message")
- config = self.Mid(payload, "CONFIGURE")
- sliceid = self.Mtext(config,"SLICEID",self._slice_id)
- expid = self.Mtext(config,"EXPID",self._exp_id)
- target = self.Mtext(config,"TARGET",target)
- value = self.Mtext(config,"VALUE",value)
- path = self.Mtext(config,"PATH",path)
+ config = self._id_element(payload, "CONFIGURE")
+ sliceid = self._attr_element(config,"SLICEID",self._slice_id)
+ expid = self._attr_element(config,"EXPID",self._exp_id)
+ target = self._attr_element(config,"TARGET",target)
+ value = self._attr_element(config,"VALUE",value)
+ path = self._attr_element(config,"PATH",path)
return payload
- def logfunction(self,level, logger, level_name, data):
+ def log_function(self,level, logger, level_name, data):
+ """ Build a Log Message
+
+ :param level: Level of logging
+ :type level: str
+ :param logger: Element publishing the log
+ :type logger: str
+ :param level_name: Name of the level (ex : INFO)
+ :type level_name: str
+ :param data: Content to publish
+ :type data: str
+
+ """
payload = ET.Element("omf-message")
- log = self.Mid(payload, "LOGGING")
- level = self.Mtext(log,"LEVEL",level)
- sliceid = self.Mtext(log,"SLICEID",self._slice_id)
- logger = self.Mtext(log,"LOGGER",logger)
- expid = self.Mtext(log,"EXPID",self._exp_id)
- level_name = self.Mtext(log,"LEVEL_NAME",level_name)
- data = self.Mtext(log,"DATA",data)
+ log = self._id_element(payload, "LOGGING")
+ level = self._attr_element(log,"LEVEL",level)
+ sliceid = self._attr_element(log,"SLICEID",self._slice_id)
+ logger = self._attr_element(log,"LOGGER",logger)
+ expid = self._attr_element(log,"EXPID",self._exp_id)
+ level_name = self._attr_element(log,"LEVEL_NAME",level_name)
+ data = self._attr_element(log,"DATA",data)
return payload
- def aliasfunction(self, name, target):
+ def alias_function(self, name, target):
+ """ Build a Alias Message
+
+ :param name: Name of the new alias
+ :type name: str
+ :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+ :type target: str
+
+ """
payload = ET.Element("omf-message")
- alias = self.Mid(payload,"ALIAS")
- sliceid = self.Mtext(alias,"SLICEID",self._slice_id)
- expid = self.Mtext(alias,"EXPID",self._exp_id)
- name = self.Mtext(alias,"NAME",name)
- target = self.Mtext(alias,"TARGET",target)
+ alias = self._id_element(payload,"ALIAS")
+ sliceid = self._attr_element(alias,"SLICEID",self._slice_id)
+ expid = self._attr_element(alias,"EXPID",self._exp_id)
+ name = self._attr_element(alias,"NAME",name)
+ target = self._attr_element(alias,"TARGET",target)
return payload
- def enrollfunction(self, enrollkey, image, index, target ):
+ def enroll_function(self, enrollkey, image, index, target ):
+ """ Build an Enroll Message
+
+ :param enrollkey: Type of enrollment (= 1)
+ :type enrollkey: str
+ :param image: Image (= * when all the nodes are concerned)
+ :type image: str
+ :param index: Index (= 1 in general)
+ :type index: str
+ :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+ :type target: str
+
+ """
payload = ET.Element("omf-message")
- enroll = self.Mid(payload,"ENROLL")
- enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey)
- sliceid = self.Mtext(enroll,"SLICEID",self._slice_id)
- image = self.Mtext(enroll,"IMAGE",image)
- expid = self.Mtext(enroll,"EXPID",self._exp_id)
- index = self.Mtext(enroll,"INDEX",index)
- target = self.Mtext(enroll,"TARGET",target)
+ enroll = self._id_element(payload,"ENROLL")
+ enrollkey = self._attr_element(enroll,"ENROLLKEY",enrollkey)
+ sliceid = self._attr_element(enroll,"SLICEID",self._slice_id)
+ image = self._attr_element(enroll,"IMAGE",image)
+ expid = self._attr_element(enroll,"EXPID",self._exp_id)
+ index = self._attr_element(enroll,"INDEX",index)
+ target = self._attr_element(enroll,"TARGET",target)
return payload
- def noopfunction(self,target):
+ def noop_function(self,target):
+ """ Build a Noop Message
+
+ :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+ :type target: str
+
+ """
payload = ET.Element("omf-message")
- noop = self.Mid(payload,"NOOP")
- sliceid = self.Mtext(noop,"SLICEID",self._slice_id)
- expid = self.Mtext(noop,"EXPID",self._exp_id)
- target = self.Mtext(noop,"TARGET",target)
+ noop = self._id_element(payload,"NOOP")
+ sliceid = self._attr_element(noop,"SLICEID",self._slice_id)
+ expid = self._attr_element(noop,"EXPID",self._exp_id)
+ target = self._attr_element(noop,"TARGET",target)
return payload
- def newexpfunction(self, experimentid, address):
+ def newexp_function(self, experimentid, address):
+ """ Build a NewExp Message
+
+ :param experimentid: Id of the new experiment
+ :type experimentid: str
+ :param address: Adress of the destination set of nodes
+ :type address: str
+
+ """
payload = ET.Element("omf-message")
- newexp = self.Mid(payload,"EXPERIMENT_NEW")
- experimentid = self.Mtext(newexp,"EXPERIMENT_ID",experimentid)
- sliceid = self.Mtext(newexp,"SLICEID",self._slice_id)
- expid = self.Mtext(newexp,"EXPID",self._exp_id)
- address = self.Mtext(newexp,"ADDRESS",address)
+ newexp = self._id_element(payload,"EXPERIMENT_NEW")
+ experimentid = self._attr_element(newexp,"EXPERIMENT_ID",experimentid)
+ sliceid = self._attr_element(newexp,"SLICEID",self._slice_id)
+ expid = self._attr_element(newexp,"EXPID",self._exp_id)
+ address = self._attr_element(newexp,"ADDRESS",address)
return payload
- def handle_message(self, msg):
- # Do something!!!
- return msg
#!/usr/bin/env python
from neco.execution.resource import ResourceManager, clsinit
-from neco.execution.attribute import Attribute
+from neco.execution.attribute import Attribute, Flags
from neco.resources.omf.omf_api import OMFAPIFactory
hostname = Attribute("hostname", "Hostname of the machine")
cpu = Attribute("cpu", "CPU of the node")
ram = Attribute("ram", "RAM of the node")
- # XXX: flags = "0x02" is not human readable.
- # instead:
- # from neco.execution.attribute import Attribute, Flags
- # xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
- xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
- xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
- xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
- xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential)
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential)
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential)
cls._register_attribute(hostname)
cls._register_attribute(ram)
cls._register_attribute(cpu)
cls._register_attribute(xmppSlice)
cls._register_attribute(xmppHost)
cls._register_attribute(xmppPort)
- ls._register_attribute(xmppPassword)
+ cls._register_attribute(xmppPassword)
@classmethod
def _register_filters(cls):
# XXX: We don't necessary need to have the credentials at the
# moment we create the RM
- # THE OMF API SHOULD BE CREATED ON THE DEPLOY METHOD, NOT NOW
- # THIS FORCES MORE CONSTRAINES ON THE WAY WE WILL AUTHOMATE DEPLOYMENT!
- def __init__(self, ec, guid, creds):
+ def __init__(self, ec, guid):
"""
:param ec: The Experiment controller
:type ec: ExperimentController
"""
super(OMFNode, self).__init__(ec, guid)
- self.set('xmppSlice', creds['xmppSlice'])
- self.set('xmppHost', creds['xmppHost'])
- self.set('xmppPort', creds['xmppPort'])
- self.set('xmppPassword', creds['xmppPassword'])
- # XXX: Lines should not be more than 80 characters!
- self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ self._omf_api = None
self._logger = logging.getLogger("neco.omf.omfNode ")
:rtype: Boolean
"""
- rm = self.ec.resource(guid)
+ rm = self.ec.get_resource(guid)
if rm.rtype() in self._authorized_connections:
- self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+ self._logger.debug("Connection between %s %s and %s %s accepted" %
+ (self.rtype(), self._guid, rm.rtype(), guid))
return True
- self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+ self._logger.debug("Connection between %s %s and %s %s refused" %
+ (self.rtype(), self._guid, rm.rtype(), guid))
return False
+ def deploy(self):
+ """Deploy the RM
+
+ """
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ super(OMFNode, self).deploy()
+
def discover(self):
""" Discover the availables nodes
"""
pass
- def provision(self, credential):
+ def provision(self):
""" Provision some availables nodes
"""
"""Send Xmpp Message Using OMF protocol to enroll the node into the experiment
"""
+ super(OMFNode, self).start()
self._omf_api.enroll_host(self.get('hostname'))
def stop(self):
"""Send Xmpp Message Using OMF protocol to disconnect the node
"""
- self._omf_api.disconnect()
+ super(OMFNode, self).stop()
+
+ def release(self):
+ """Clean the RM at the end of the experiment
+
+ """
+ self._omf_api.release(self.get('hostname'))
+ OMFAPIFactory.release_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
def configure(self):
#routes = self.tc._add_route.get(self.guid, [])
#!/usr/bin/env python
-from neco.execution.resource import ResourceFactory
+from neco.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState
from neco.execution.ec import ExperimentController
from neco.resources.omf.omf_node import OMFNode
class DummyEC(ExperimentController):
pass
-class OMFVLCTestCase(unittest.TestCase):
+class DummyRM(ResourceManager):
+ pass
- def setUp(self):
- #self.guid_generator = guid.GuidGenerator()
- self._creds = {'xmppSlice' : 'nepi' , 'xmppHost' : 'xmpp-plexus.onelab.eu' , 'xmppPort' : '5222', 'xmppPassword' : '1234' }
- def tearDown(self):
- pass
+class OMFResourceFactoryTestCase(unittest.TestCase):
def test_creation_phase(self):
- ec = DummyEC()
-
ResourceFactory.register_type(OMFNode)
ResourceFactory.register_type(OMFWifiInterface)
ResourceFactory.register_type(OMFChannel)
self.assertEquals(len(ResourceFactory.resource_types()), 4)
- #def xtest_creation_and_configuration_node(self):
- guid = ec.register_resource("OMFNode", creds = self._creds)
- node1 = ec._resources[guid]
- node1.set('hostname', 'omf.plexus.wlab17')
-
- guid = ec.register_resource("OMFNode", creds = self._creds)
- node2 = ec._resources[guid]
- node2.set('hostname', "omf.plexus.wlab37")
-
- #def xtest_creation_and_configuration_interface(self):
- guid = ec.register_resource("OMFWifiInterface", creds = self._creds)
- iface1 = ec._resources[guid]
- iface1.set('alias', "w0")
- iface1.set('mode', "adhoc")
- iface1.set('type', "g")
- iface1.set('essid', "helloworld")
- iface1.set('ip', "10.0.0.17")
-
- guid = ec.register_resource("OMFWifiInterface", creds = self._creds)
- iface2 = ec._resources[guid]
- iface2.set('alias', "w0")
- iface2.set('mode', "adhoc")
- iface2.set('type', 'g')
- iface2.set('essid', "helloworld")
- iface2.set('ip', "10.0.0.37")
-
- #def xtest_creation_and_configuration_channel(self):
- guid = ec.register_resource("OMFChannel", creds = self._creds)
- channel = ec._resources[guid]
- channel.set('channel', "6")
-
- #def xtest_creation_and_configuration_application(self):
- guid = ec.register_resource("OMFApplication", creds = self._creds)
- app1 = ec._resources[guid]
- app1.set('appid', 'Vlc#1')
- app1.set('path', "/opt/vlc-1.1.13/cvlc")
- app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
- app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
-
- guid = ec.register_resource("OMFApplication", creds = self._creds)
- app2 = ec._resources[guid]
- app2.set('appid', 'Vlc#2')
- app2.set('path', "/opt/vlc-1.1.13/cvlc")
- app2.set('args', "rtp://10.0.0.37:1234")
- app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
- self.assertEquals(len(OMFAPIFactory._Api), 1)
-
- #def test_connection(self):
- app1.connect(node1._guid)
- node1.connect(app1._guid)
-
- node1.connect(iface1._guid)
- iface1.connect(node1._guid)
-
- iface1.connect(channel._guid)
- channel.connect(iface1._guid)
-
- channel.connect(iface2._guid)
- iface2.connect(channel._guid)
-
- iface2.connect(node2._guid)
- node2.connect(iface2._guid)
-
- node2.connect(app2._guid)
- app2.connect(node2._guid)
-
- #def test_start_node(self):
- node1.start()
- node2.start()
- time.sleep(1)
- #pass
-
- #def test_start_interface(self):
- iface1.start()
- iface2.start()
-
- #def test_start_channel(self):
- channel.start()
- time.sleep(1)
-
- #def test_start_application(self):
- app1.start()
- time.sleep(2)
- app2.start()
-
- time.sleep(10)
-
- #def test_stop_application(self):
- app1.stop()
- app2.stop()
- time.sleep(2)
-
-
- #def test_stop_nodes(self):
- node1.stop()
- #node2.stop()
+
+class OMFVLCTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.ec = DummyEC()
+ ResourceFactory.register_type(OMFNode)
+ ResourceFactory.register_type(OMFWifiInterface)
+ ResourceFactory.register_type(OMFChannel)
+ ResourceFactory.register_type(OMFApplication)
+
+ def tearDown(self):
+ self.ec.shutdown()
+
+ def test_creation_and_configuration_node(self):
+
+ node1 = self.ec.register_resource("OMFNode")
+ self.ec.set(node1, 'hostname', 'omf.plexus.wlab17')
+ self.ec.set(node1, 'xmppSlice', "nepi")
+ self.ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(node1, 'xmppPort', "5222")
+ self.ec.set(node1, 'xmppPassword', "1234")
+
+ self.assertEquals(self.ec.get(node1, 'hostname'), 'omf.plexus.wlab17')
+ self.assertEquals(self.ec.get(node1, 'xmppSlice'), 'nepi')
+ self.assertEquals(self.ec.get(node1, 'xmppHost'), 'xmpp-plexus.onelab.eu')
+ self.assertEquals(self.ec.get(node1, 'xmppPort'), '5222')
+ self.assertEquals(self.ec.get(node1, 'xmppPassword'), '1234')
+
+ def test_creation_and_configuration_interface(self):
+
+ iface1 = self.ec.register_resource("OMFWifiInterface")
+ self.ec.set(iface1, 'alias', "w0")
+ self.ec.set(iface1, 'mode', "adhoc")
+ self.ec.set(iface1, 'type', "g")
+ self.ec.set(iface1, 'essid', "vlcexp")
+ self.ec.set(iface1, 'ip', "10.0.0.17")
+ self.ec.set(iface1, 'xmppSlice', "nepi")
+ self.ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(iface1, 'xmppPort', "5222")
+ self.ec.set(iface1, 'xmppPassword', "1234")
+
+ self.assertEquals(self.ec.get(iface1, 'alias'), 'w0')
+ self.assertEquals(self.ec.get(iface1, 'mode'), 'adhoc')
+ self.assertEquals(self.ec.get(iface1, 'type'), 'g')
+ self.assertEquals(self.ec.get(iface1, 'essid'), 'vlcexp')
+ self.assertEquals(self.ec.get(iface1, 'ip'), '10.0.0.17')
+ self.assertEquals(self.ec.get(iface1, 'xmppSlice'), 'nepi')
+ self.assertEquals(self.ec.get(iface1, 'xmppHost'), 'xmpp-plexus.onelab.eu')
+ self.assertEquals(self.ec.get(iface1, 'xmppPort'), '5222')
+ self.assertEquals(self.ec.get(iface1, 'xmppPassword'), '1234')
+
+ def test_creation_and_configuration_channel(self):
+
+ channel = self.ec.register_resource("OMFChannel")
+ self.ec.set(channel, 'channel', "6")
+ self.ec.set(channel, 'xmppSlice', "nepi")
+ self.ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(channel, 'xmppPort', "5222")
+ self.ec.set(channel, 'xmppPassword', "1234")
+
+ self.assertEquals(self.ec.get(channel, 'channel'), '6')
+ self.assertEquals(self.ec.get(channel, 'xmppSlice'), 'nepi')
+ self.assertEquals(self.ec.get(channel, 'xmppHost'), 'xmpp-plexus.onelab.eu')
+ self.assertEquals(self.ec.get(channel, 'xmppPort'), '5222')
+ self.assertEquals(self.ec.get(channel, 'xmppPassword'), '1234')
+
+ def test_creation_and_configuration_application(self):
+
+ app1 = self.ec.register_resource("OMFApplication")
+ self.ec.set(app1, 'appid', 'Vlc#1')
+ self.ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc")
+ self.ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+ self.ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+ self.ec.set(app1, 'xmppSlice', "nepi")
+ self.ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ self.ec.set(app1, 'xmppPort', "5222")
+ self.ec.set(app1, 'xmppPassword', "1234")
+
+ self.assertEquals(self.ec.get(app1, 'appid'), 'Vlc#1')
+ self.assertEquals(self.ec.get(app1, 'path'), '/opt/vlc-1.1.13/cvlc')
+ self.assertEquals(self.ec.get(app1, 'args'), "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+ self.assertEquals(self.ec.get(app1, 'env'), 'DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority')
+ self.assertEquals(self.ec.get(app1, 'xmppSlice'), 'nepi')
+ self.assertEquals(self.ec.get(app1, 'xmppHost'), 'xmpp-plexus.onelab.eu')
+ self.assertEquals(self.ec.get(app1, 'xmppPort'), '5222')
+ self.assertEquals(self.ec.get(app1, 'xmppPassword'), '1234')
+
+ def test_connection(self):
+
+ node1 = self.ec.register_resource("OMFNode")
+ iface1 = self.ec.register_resource("OMFWifiInterface")
+ channel = self.ec.register_resource("OMFChannel")
+ app1 = self.ec.register_resource("OMFApplication")
+ app2 = self.ec.register_resource("OMFApplication")
+
+ self.ec.register_connection(app1, node1)
+ self.ec.register_connection(app2, node1)
+ self.ec.register_connection(node1, iface1)
+ self.ec.register_connection(iface1, channel)
+
+ self.assertEquals(len(self.ec.get_resource(node1).connections), 3)
+ self.assertEquals(len(self.ec.get_resource(iface1).connections), 2)
+ self.assertEquals(len(self.ec.get_resource(channel).connections), 1)
+ self.assertEquals(len(self.ec.get_resource(app1).connections), 1)
+ self.assertEquals(len(self.ec.get_resource(app2).connections), 1)
+
+ def test_condition(self):
+
+ node1 = self.ec.register_resource("OMFNode")
+ iface1 = self.ec.register_resource("OMFWifiInterface")
+ channel = self.ec.register_resource("OMFChannel")
+ app1 = self.ec.register_resource("OMFApplication")
+ app2 = self.ec.register_resource("OMFApplication")
+
+ self.ec.register_connection(app1, node1)
+ self.ec.register_connection(app2, node1)
+ self.ec.register_connection(node1, iface1)
+ self.ec.register_connection(iface1, channel)
+
+ # For the moment
+ self.ec.register_condition([iface1, channel], ResourceAction.START, node1, ResourceState.STARTED , 2)
+ self.ec.register_condition(channel, ResourceAction.START, iface1, ResourceState.STARTED , 1)
+ self.ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
+
+ # Real test
+ self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4)
+
+ self.assertEquals(len(self.ec.get_resource(node1).conditions), 0)
+ self.assertEquals(len(self.ec.get_resource(iface1).conditions), 1)
+ self.assertEquals(len(self.ec.get_resource(channel).conditions), 1)
+ self.assertEquals(len(self.ec.get_resource(app1).conditions), 1)
+
+
+ def xtest_deploy(self):
+ ec.deploy()
+
+ #In order to release everythings
+ time.sleep(45)
+ ec.shutdown()
if __name__ == '__main__':
unittest.main()
+
+