"""
#!/usr/bin/env python
-from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState, populate_factory
from nepi.execution.ec import ExperimentController
from nepi.resources.omf.node import OMFNode
ec = ExperimentController()
# Register the different RM that will be used
-ResourceFactory.register_type(OMFNode)
-ResourceFactory.register_type(OMFWifiInterface)
-ResourceFactory.register_type(OMFChannel)
-ResourceFactory.register_type(OMFApplication)
+populate_factory()
# Create and Configure the Nodes
node1 = ec.register_resource("OMFNode")
ec.set(app1, 'appid', 'Vlc#1')
ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc")
ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+#ec.set(app1, 'args', "--quiet /opt/big_buck_bunny_240p_mpeg4.ts --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts} '")
ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
ec.set(app1, 'xmppSlice', "nepi")
ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
app2 = ec.register_resource("OMFApplication")
ec.set(app2, 'appid', 'Vlc#2')
ec.set(app2, 'path', "/opt/vlc-1.1.13/cvlc")
-ec.set(app2, 'args', "rtp://10.0.0.37:1234")
+ec.set(app2, 'args', "--quiet rtp://10.0.0.37:1234")
ec.set(app2, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
ec.set(app2, 'xmppSlice', "nepi")
ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu")
ec.register_connection(node2, iface2)
ec.register_connection(app2, node2)
-# Condition
-# Topology behaviour : It should not be done by the user, but ....
-#ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2)
-#ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1)
-#ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
-
# User Behaviour
ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s")
-ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "20s")
+ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "22s")
ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "25s")
+ec.register_condition(app3, ResourceAction.STOP, app3, ResourceState.STARTED , "1s")
# Deploy
ec.deploy()
+ec.wait_finished([app1, app2, app3])
+
# Stop Experiment
-time.sleep(50)
+#time.sleep(55)
ec.shutdown()
callback = functools.partial(self.stop_with_conditions)
self.ec.schedule(delay, callback)
else:
- self.logger.debug(" ----- STOPPING ---- ")
+ self.debug(" ----- STOPPING ---- ")
self.stop()
def deploy(self):
"""
-from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
+reschedule_delay = "0.5s"
+
@clsinit
class OMFApplication(ResourceManager):
"""
"""
_rtype = "OMFApplication"
_authorized_connections = ["OMFNode"]
- _waiters = ["OMFNode", "OMFChannel", "OMFWifiInterface"]
@classmethod
def _register_attributes(cls):
rm = self.ec.get_resource(guid)
if rm.rtype() not in self._authorized_connections:
msg = "Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid)
- self._logger.debug(msg)
+ self.debug(msg)
return False
elif len(self.connections) != 0 :
msg = "Connection between %s %s and %s %s refused : Already Connected" % (self.rtype(), self._guid, rm.rtype(), guid)
- self._logger.debug(msg)
+ self.debug(msg)
return False
else :
msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
- self._logger.debug(msg)
+ self.debug(msg)
return True
- def _get_nodes(self, conn_set):
- """Get the RM of the node to which the application is connected
-
- :param conn_set: Connections of the current Guid
- :type conn_set: set
- :rtype: ResourceManager
- """
-
- for elt in conn_set:
- rm = self.ec.get_resource(elt)
- if rm.rtype() == "OMFNode":
- return rm
- return None
-
def deploy(self):
"""Deploy the RM
"""
super(OMFApplication, self).start()
msg = " " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env')
- self.debug(msg)
+ self.info(msg)
if self.get('appid') and self.get('path') and self.get('args') and self.get('env'):
- rm_node = self._get_nodes(self._connections)
- self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env'))
+ rm_list = self.get_connected("OMFNode")
+ for rm_node in rm_list:
+ self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env'))
+ else :
+ msg = "Credentials are not initialized"
+ self.error(msg)
def stop(self):
"""Send Xmpp Message Using OMF protocol to kill the application
"""
- rm_node = self._get_nodes(self._connections)
- self._omf_api.exit(rm_node.get('hostname'),self.get('appid'))
+ rm_list = self.get_connected("OMFNode")
+ for rm_node in rm_list :
+ self._omf_api.exit(rm_node.get('hostname'),self.get('appid'))
super(OMFApplication, self).stop()
+ self._state = ResourceState.FINISHED
+
def release(self):
"""Clean the RM at the end of the experiment
"""
-from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
-import nepi
-import logging
+reschedule_delay = "0.5s"
@clsinit
class OMFChannel(ResourceManager):
"""
_rtype = "OMFChannel"
_authorized_connections = ["OMFWifiInterface", "OMFNode"]
- _waiters = ["OMFNode", "OMFWifiInterface"]
@classmethod
self._omf_api = None
- self._logger = logging.getLogger("nepi.omf.omfChannel")
- self._logger.setLevel(nepi.LOGLEVEL)
-
def _validate_connection(self, guid):
"""Check if the connection is available.
for conn in rm_iface.connections:
rm_node = self.ec.get_resource(conn)
if rm_node.rtype() == "OMFNode":
+ if rm_iface.state < ResourceState.READY or rm_node.state < ResourceState.READY:
+ return "reschedule"
couple = [rm_node.get('hostname'), rm_iface.get('alias')]
#print couple
self._nodes_guid.append(couple)
"""Deploy the RM
"""
- self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ if not self._omf_api :
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
if self.get('channel'):
set_nodes = self._get_target(self._connections)
+ if set_nodes == "reschedule" :
+ self.ec.schedule(reschedule_delay, self.deploy)
+ return
print set_nodes
for couple in set_nodes:
#print "Couple node/alias : " + couple[0] + " , " + couple[1]
attrval = self.get('channel')
attrname = "net/%s/%s" % (couple[1], 'channel')
- #print "Send the configure message"
self._omf_api.configure(couple[0], attrname, attrval)
super(OMFChannel, self).deploy()
"""
-from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
+reschedule_delay = "0.5s"
@clsinit
class OMFWifiInterface(ResourceManager):
"""
_rtype = "OMFWifiInterface"
_authorized_connections = ["OMFNode" , "OMFChannel"]
- _waiters = ["OMFNode"]
#alias2name = dict({'w0':'wlan0', 'w1':'wlan1'})
"""Deploy the RM
"""
- self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ if not self._omf_api :
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " +
self.get('mode') + " : " + self.get('type') + " : " +
self.get('essid') + " : " + self.get('ip'))
#try:
if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'):
- rm_node = self._get_nodes(self._connections)
+ rm_list = self.get_connected("OMFNode")
+ for rm_node in rm_list:
+ if rm_node.state < ResourceState.READY:
+ self.ec.schedule(reschedule_delay, self.deploy)
+ return
for attrname in ["mode", "type", "essid", "ip"]:
attrval = self.get(attrname)
attrname = "net/%s/%s" % (self._alias, attrname)
"""
-from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.execution.attribute import Attribute, Flags
from nepi.resources.omf.omf_api import OMFAPIFactory
import time
+reschedule_delay = "0.5s"
+
@clsinit
class OMFNode(ResourceManager):
"""
"""
_rtype = "OMFNode"
_authorized_connections = ["OMFApplication" , "OMFWifiInterface"]
- _waiters = []
@classmethod
def _register_attributes(cls):
"""Deploy the RM
"""
- self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ if not self._omf_api :
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
+ self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
self._omf_api.enroll_host(self.get('hostname'))
super(OMFNode, self).deploy()
key = cls._make_key(slice, host, port, password)
cls.lock.acquire()
if key in cls._apis:
+ #print "Api Counter : " + str(cls._apis[key]['cnt'])
cls._apis[key]['cnt'] += 1
cls.lock.release()
return cls._apis[key]['api']
self._registered = True
except IqError as e:
msg = " Could not register account: %s" % e.iq['error']['text']
- selferror(msg)
+ self.error(msg)
except IqTimeout:
msg = " No response from server."
self.error(msg)
result = self['xep_0060'].get_nodes(self._server)
for item in result['disco_items']['items']:
msg = ' - %s' % str(item)
- self.info(msg)
+ self.debug(msg)
return result
except:
error = traceback.format_exc()
#self.boundjid.full)
for node in result['node']:
msg = ' - %s' % str(node)
- self.info(msg)
+ self.debug(msg)
return result
except:
error = traceback.format_exc()
:type node: str
"""
- self.debug(" Create Topic : " + node)
+ msg = " Create Topic : " + node
+ self.info(msg)
config = self['xep_0004'].makeForm('submit')
config.add_field(var='pubsub#node_type', value='leaf')
"""
msg = " Publish to Topic : " + node
- self.debug(msg)
+ self.info(msg)
try:
result = self['xep_0060'].publish(self._server,node,payload=data)
# id = result['pubsub']['publish']['item']['id']
data)
for item in result['pubsub']['items']['substanzas']:
msg = 'Retrieved item %s: %s' % (item['id'], tostring(item['payload']))
- self.info(msg)
+ self.debug(msg)
except:
error = traceback.format_exc()
msg = ' Could not retrieve item %s from topic %s\ntraceback:\n%s' \
try:
result = self['xep_0060'].retract(self._server, self.boundjid, data)
msg = ' Retracted item %s from topic %s' % (data, self.boundjid)
- self.info(msg)
+ self.debug(msg)
except:
error = traceback.format_exc()
msg = 'Could not retract item %s from topic %s\ntraceback:\n%s' \
try:
result = self['xep_0060'].purge(self._server, self.boundjid)
msg = ' Purged all items from topic %s' % self.boundjid
- self.info(msg)
+ self.debug(msg)
except:
error = traceback.format_exc()
msg = ' Could not purge items from topic %s\ntraceback:\n%s' \
result = self['xep_0060'].subscribe(self._server, node)
msg = ' Subscribed %s to topic %s' \
% (self.boundjid.user, node)
- self.info(msg)
- #self.debug(msg)
+ #self.info(msg)
+ self.debug(msg)
except:
error = traceback.format_exc()
msg = ' Could not subscribe %s to topic %s\ntraceback:\n%s' \
try:
result = self['xep_0060'].unsubscribe(self._server, node)
msg = ' Unsubscribed %s from topic %s' % (self.boundjid.bare, node)
- self.info(msg)
+ self.debug(msg)
except:
error = traceback.format_exc()
msg = ' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
from nepi.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState
from nepi.execution.ec import ExperimentController
-from nepi.resources.omf.omf_node import OMFNode
-from nepi.resources.omf.omf_application import OMFApplication
-from nepi.resources.omf.omf_interface import OMFWifiInterface
-from nepi.resources.omf.omf_channel import OMFChannel
+from nepi.resources.omf.node import OMFNode
+from nepi.resources.omf.application import OMFApplication
+from nepi.resources.omf.interface import OMFWifiInterface
+from nepi.resources.omf.channel import OMFChannel
from nepi.resources.omf.omf_api import OMFAPIFactory
from nepi.util import guid
self.ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "2s")
self.ec.register_condition(app4, ResourceAction.START, app3, ResourceState.STARTED , "3s")
self.ec.register_condition(app5, ResourceAction.START, [app3, app2], ResourceState.STARTED , "2s")
- self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "1m20s")
+ self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "25s")
self.ec.deploy()
- time.sleep(150)
+ time.sleep(60)
self.assertEquals(round(strfdiff(self.ec.get_resource(app2).start_time, self.ec.get_resource(app1).start_time),1), 3.0)
self.assertEquals(round(strfdiff(self.ec.get_resource(app3).start_time, self.ec.get_resource(app2).start_time),1), 2.0)