#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-import functools
-import logging
-import os
-import random
-import sys
-import time
-import threading
-
from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
-# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
+import functools
+import logging
+import os
+import random
+import sys
+import time
+import threading
+
class ECState(object):
""" State of the Experiment Controller
def wait_finished(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reached the state FINISHED
+ reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
:param guids: List of guids
:type guids: list
def wait_started(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reached the state STARTED
+ reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, states = [ResourceState.STARTED,
- ResourceState.STOPPED,
- ResourceState.FAILED,
- ResourceState.FINISHED])
+ return self.wait(guids, state = ResourceState.STARTED)
def wait_released(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reached the state RELEASED
+ reached the state RELEASED (or FAILED)
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ # TODO: solve state concurrency BUG and !!!!
+ # correct waited release state to state = ResourceState.FAILED)
+ return self.wait(guids, state = ResourceState.FINISHED)
+
+ def wait_deployed(self, guids):
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reached the state READY (or any higher state)
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, states = [ResourceState.RELEASED,
- ResourceState.STOPPED,
- ResourceState.FAILED,
- ResourceState.FINISHED])
+ return self.wait(guids, state = ResourceState.READY)
- def wait(self, guids, states = [ResourceState.FINISHED,
- ResourceState.FAILED,
- ResourceState.STOPPED]):
+ def wait(self, guids, state = ResourceState.STOPPED):
""" Blocking method that waits until all the RM from the 'guid' list
reached state 'state' or until a failure occurs
# If a guid reached one of the target states, remove it from list
guid = guids[0]
- state = self.state(guid)
+ rstate = self.state(guid)
- if state in states:
+ if rstate >= state:
guids.remove(guid)
else:
# Debug...
- self.logger.debug(" WAITING FOR %g - state %s " % (guid,
- self.state(guid, hr = True)))
+ self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
+ self.state(guid, hr = True), state))
# Take the opportunity to 'refresh' the states of the RMs.
# Query only the first up to N guids (not to overwhelm
# If the guid is not in one of the target states, wait and
# continue quering. We keep the sleep big to decrease the
# number of RM state queries
- time.sleep(2)
+ time.sleep(4)
def get_task(self, tid):
""" Get a specific task
# the resource instance gets a copy of all traces
self._trcs = copy.deepcopy(self._traces)
- self._state = ResourceState.NEW
-
+ # Each resource is placed on a deployment group by the EC
+ # during deployment
self.deployment_group = None
self._start_time = None
self._finish_time = None
self._failed_time = None
+ self._state = ResourceState.NEW
+
@property
def guid(self):
""" Returns the global unique identifier of the RM """
This method is resposible for selecting an individual resource
matching user requirements.
This method should be redefined when necessary in child classes.
- """
- self._discover_time = tnow()
- self._state = ResourceState.DISCOVERED
+ """
+ self.set_discovered()
def provision(self):
""" Performs resource provisioning.
After this method has been successfully invoked, the resource
should be acccesible/controllable by the RM.
This method should be redefined when necessary in child classes.
- """
- self._provision_time = tnow()
- self._state = ResourceState.PROVISIONED
+ """
+ self.set_provisioned()
def start(self):
""" Starts the resource.
There is no generic start behavior for all resources.
This method should be redefined when necessary in child classes.
"""
- if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
+ if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
return
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_started()
def stop(self):
""" Stops the resource.
There is no generic stop behavior for all resources.
This method should be redefined when necessary in child classes.
"""
- if not self._state in [ResourceState.STARTED]:
+ if not self.state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
return
+
+ self.set_stopped()
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
+ def deploy(self):
+ """ Execute all steps required for the RM to reach the state READY
+
+ """
+ if self.state > ResourceState.READY:
+ self.error("Wrong state %s for deploy" % self.state)
+ return
+
+ self.debug("----- READY ---- ")
+ self.set_ready()
+
+ def release(self):
+ self.set_released()
+
+ def finish(self):
+ self.set_finished()
+
+ def fail(self):
+ self.set_failed()
def set(self, name, value):
""" Set the value of the attribute
self.debug(" ----- STOPPING ---- ")
self.stop()
- def deploy(self):
- """ Execute all steps required for the RM to reach the state READY
-
- """
- if self._state > ResourceState.READY:
- self.error("Wrong state %s for deploy" % self.state)
- return
-
- self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
-
- def release(self):
- """Release any resources used by this RM
-
- """
- self._release_time = tnow()
- self._state = ResourceState.RELEASED
-
- def finish(self):
- """ Mark ResourceManager as FINISHED
-
- """
- self._finish_time = tnow()
- self._state = ResourceState.FINISHED
-
- def fail(self):
- """ Mark ResourceManager as FAILED
-
- """
- self._failed_time = tnow()
- self._state = ResourceState.FAILED
-
def connect(self, guid):
""" Performs actions that need to be taken upon associating RMs.
This method should be redefined when necessary in child classes.
"""
# TODO: Validate!
return True
+
+ def set_started(self):
+ """ Mark ResourceManager as STARTED """
+ self._start_time = tnow()
+ self._state = ResourceState.STARTED
+
+ def set_stopped(self):
+ """ Mark ResourceManager as STOPPED """
+ self._stop_time = tnow()
+ self._state = ResourceState.STOPPED
+
+ def set_ready(self):
+ """ Mark ResourceManager as READY """
+ self._ready_time = tnow()
+ self._state = ResourceState.READY
+
+ def set_released(self):
+ """ Mark ResourceManager as REALEASED """
+ self._release_time = tnow()
+ self._state = ResourceState.RELEASED
+
+ def set_finished(self):
+ """ Mark ResourceManager as FINISHED """
+ self._finish_time = tnow()
+ self._state = ResourceState.FINISHED
+
+ def set_failed(self):
+ """ Mark ResourceManager as FAILED """
+ self._failed_time = tnow()
+ self._state = ResourceState.FAILED
+
+ def set_discovered(self):
+ """ Mark ResourceManager as DISCOVERED """
+ self._discover_time = tnow()
+ self._state = ResourceState.DISCOVERED
+
+ def set_provisioned(self):
+ """ Mark ResourceManager as PROVISIONED """
+ self._provision_time = tnow()
+ self._state = ResourceState.PROVISIONED
class ResourceFactory(object):
_resource_types = dict()
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+ reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
import subprocess
# TODO: Resolve wildcards in commands!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
@clsinit
class LinuxApplication(ResourceManager):
raise
super(LinuxApplication, self).deploy()
-
+
def start(self):
command = self.get("command")
if not command:
# If no command was given (i.e. Application was used for dependency
# installation), then the application is directly marked as FINISHED
- self._state = ResourceState.FINISHED
+ self.set_finished()
else:
if self.in_foreground:
if self.state == ResourceState.STARTED:
- self.info("Stopping command '%s'" % command)
+ self.info("Stopping command '%s' " % command)
# If the command is running in foreground (it was launched using
# the node 'execute' method), then we use the handler to the Popen
# process to kill it. Else we send a kill signal using the pid and ppid
# retrieved after running the command with the node 'run' method
- stopped = True
-
if self._proc:
self._proc.kill()
else:
(out, err), proc = self.node.kill(self.pid, self.ppid,
sudo = self._sudo_kill)
+ # TODO: check if execution errors occurred
if proc.poll() or err:
- # check if execution errors occurred
msg = " Failed to STOP command '%s' " % self.get("command")
self.error(msg, out, err)
self.fail()
-
+
if self.state == ResourceState.STARTED:
super(LinuxApplication, self).stop()
self.stop()
- if self.state == ResourceState.STOPPED:
+ if self.state != ResourceState.FAILED:
self.info("Resource released")
super(LinuxApplication, self).release()
-
+
@property
def state(self):
""" Returns the state of the application
err = self._proc.stderr.read()
self.error(msg, out, err)
self.fail()
- elif retcode == 0:
- self._state = ResourceState.FINISHED
+ elif retcode == 0:
+ self.finish()
else:
# We need to query the status of the command we launched in
# background. In order to avoid overwhelming the remote host and
self.run_home)
if err:
- msg = " Failed to execute command '%s'" % \
+ msg = "Failed to execute command '%s'" % \
self.get("command")
self.error(msg, out, err)
self.fail()
else:
- self._state = ResourceState.FINISHED
+ self.finish()
self._last_state_check = tnow()
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
raise
self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_ready()
@property
def _environment(self):
raise
self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_ready()
def upload_start_command(self):
command = self.get("command")
raise RuntimeError, msg
def start(self):
- if self._state == ResourceState.READY:
+ if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_started()
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ sef.fail()
raise RuntimeError, msg
- @property
- def state(self):
- return self._state
-
@property
def _start_command(self):
command = ["ccnseqwriter"]
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.node import OSType
from nepi.util.timefuncs import tnow, tdiffsec
raise
self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_ready()
def upload_start_command(self):
command = self.get("command")
raise_on_error = True)
def start(self):
- if self._state == ResourceState.READY:
+ if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_started()
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ self.set_failed()
raise RuntimeError, msg
def stop(self):
command = self.get('command') or ''
- state = self.state
- if state == ResourceState.STARTED:
+ if self.state == ResourceState.STARTED:
self.info("Stopping command '%s'" % command)
command = "ccndstop"
stdout = "ccndstop_stdout",
stderr = "ccndstop_stderr")
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
+ self.set_stopped()
@property
def state(self):
if retcode == 1 and err.find("No such file or directory") > -1:
# ccnd is not running (socket not found)
- self._state = ResourceState.FINISHED
+ self.set_finished()
elif retcode:
# other errors ...
msg = " Failed to execute command '%s'" % self.get("command")
self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ self.fail()
self._last_state_check = tnow()
raise
self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_ready()
def upload_start_command(self):
command = self.get("command")
raise_on_error = True)
def start(self):
- if self._state == ResourceState.READY:
+ if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_started()
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ self.fail()
raise RuntimeError, msg
@property
raise
self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_ready()
def upload_start_command(self):
command = self.get("command")
env, blocking = True)
if proc.poll():
- self._state = ResourceState.FAILED
msg = "Failed to execute command"
self.error(msg, out, err)
+ self.fail()
raise RuntimeError, msg
def configure(self):
self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
def start(self):
- if self._state in [ResourceState.READY, ResourceState.STARTED]:
+ if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_started()
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ self.fail()
raise RuntimeError, msg
def stop(self):
if proc.poll():
pass
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
-
- @property
- def state(self):
- return self._state
+ self.set_stopped()
@property
def _start_command(self):
# home directory at Linux host
self._home_dir = ""
- # lock to avoid concurrency issues on methods used by applications
- self._lock = threading.Lock()
+ # lock to prevent concurrent applications on the same node,
+ # to execute commands at the same time. There are potential
+ # concurrency issues when using SSH to a same host from
+ # multiple threads. There are also possible operational
+ # issues, e.g. an application querying the existence
+ # of a file or folder prior to its creation, and another
+ # application creating the same file or folder in between.
+ self._node_lock = threading.Lock()
def log_message(self, msg):
return " guid %d - host %s - %s " % (self.guid,
self.discover()
self.provision()
except:
- self._state = ResourceState.FAILED
+ self.fail()
raise
# Node needs to wait until all associated interfaces are
env = env)
else:
if with_lock:
- with self._lock:
+ with self._node_lock:
(out, err), proc = sshfuncs.rexec(
command,
host = self.get("hostname"),
sudo = sudo,
user = user)
else:
- with self._lock:
+ with self._node_lock:
(out, err), proc = sshfuncs.rspawn(
command,
pidfile = pidfile,
if self.localhost:
pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
else:
- with self._lock:
+ with self._node_lock:
pidtuple = sshfuncs.rgetpid(
os.path.join(home, pidfile),
host = self.get("hostname"),
if self.localhost:
status = execfuncs.lstatus(pid, ppid)
else:
- with self._lock:
+ with self._node_lock:
status = sshfuncs.rstatus(
pid, ppid,
host = self.get("hostname"),
if self.localhost:
(out, err), proc = execfuncs.lkill(pid, ppid, sudo)
else:
- with self._lock:
+ with self._node_lock:
(out, err), proc = sshfuncs.rkill(
pid, ppid,
host = self.get("hostname"),
recursive = True,
strict_host_checking = False)
else:
- with self._lock:
+ with self._node_lock:
(out, err), proc = sshfuncs.rcopy(
src, dst,
port = self.get("port"),
from nepi.util.timefuncs import tnow
import os
+import socket
@clsinit_copy
class LinuxTraceroute(LinuxApplication):
default = False,
flags = Flags.ExecReadOnly)
+ use_ip = Attribute("useIP",
+ "Use the IP address instead of the host domain name. "
+ "Useful for environments were dns resolution problems occur "
+ "frequently",
+ type = Types.Bool,
+ default = False,
+ flags = Flags.ExecReadOnly)
+
target = Attribute("target",
"Traceroute target host (host that will be pinged)",
flags = Flags.ExecReadOnly)
cls._register_attribute(countinuous)
cls._register_attribute(print_timestamp)
+ cls._register_attribute(use_ip)
cls._register_attribute(target)
def __init__(self, ec, guid):
if self.get("printTimestamp") == True:
args.append("""echo "`date +'%Y%m%d%H%M%S'`";""")
args.append("traceroute")
- args.append(self.get("target"))
+
+ target = self.get("target")
+ if self.get("useIP") == True:
+ target = socket.gethostbyname(target)
+ args.append(target)
+
if self.get("continuous") == True:
args.append("; sleep 2 ; done ")
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
reschedule_delay
from nepi.execution.resource import clsinit_copy
from nepi.resources.linux.application import LinuxApplication
def start(self):
if self.get("s") == True:
# Server is already running
- if self._state == ResourceState.READY:
+ if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_started()
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ self.fail()
raise RuntimeError, msg
else:
super(LinuxUdpTest, self).start()
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.util.sshfuncs import ProcStatus
self.info("Provisioning finished")
- self.debug("----- READY ---- ")
- self._provision_time = tnow()
- self._state = ResourceState.PROVISIONED
+ self.set_provisioned()
def deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
raise
self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_ready()
def start(self):
- if self._state == ResourceState.READY:
+ if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
-
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+
+ self.set_started()
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ self.fail()
raise RuntimeError, msg
- # XXX: Leaves process unkilled!!
- # Implement another mechanism to kill the tunnel!
def stop(self):
""" Stops application execution
"""
if self.state == ResourceState.STARTED:
- stopped = True
self.info("Stopping tunnel")
# Only try to kill the process if the pid and ppid
msg = " Failed to STOP tunnel"
self.error(msg, err1, err2)
self.fail()
- stopped = False
- if stopped:
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
+ if self.state == ResourceState.STARTED:
+ self.set_stopped()
@property
def state(self):
self.error(msg, err1, err2)
self.fail()
else:
- self._state = ResourceState.FINISHED
+ self.set_finished()
self._last_state_check = tnow()
def wait_local_port(self, endpoint):
""" Waits until the local_port file for the endpoint is generated,
- and returns the port number """
+ and returns the port number
+
+ """
return self.wait_file(endpoint, "local_port")
def wait_result(self, endpoint):
- """ Waits until the return code file for the endpoint is generated """
+ """ Waits until the return code file for the endpoint is generated
+
+ """
return self.wait_file(endpoint, "ret_file")
def wait_file(self, endpoint, filename):
.. note::
- This class is used only by the Experiment Controller through the Resource Factory
+ This class is used only by the Experiment Controller through the
+ Resource Factory
"""
_rtype = "OMFApplication"
@classmethod
def _register_attributes(cls):
- """Register the attributes of an OMF application
+ """ Register the attributes of an OMF application
+
"""
appid = Attribute("appid", "Name of the application")
:type creds: dict
"""
-
super(OMFApplication, self).__init__(ec, guid)
self.set('appid', "")
self._omf_api = None
- @property
- def exp_id(self):
- if self.ec.exp_id.startswith('exp-'):
- return None
- return self.ec.exp_id
-
@property
def node(self):
rm_list = self.get_connected(OMFNode.rtype())
return None
def valid_connection(self, guid):
- """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+ """ Check if the connection with the guid in parameter is possible.
+ Only meaningful connections are allowed.
:param guid: Guid of RM it will be connected
:type guid: int
"""
rm = self.ec.get_resource(guid)
if rm.rtype() not in self._authorized_connections:
- msg = "Connection between %s %s and %s %s refused : An Application can be connected only to a Node" %\
+ msg = ("Connection between %s %s and %s %s refused: "
+ "An Application can be connected only to a Node" ) % \
(self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
+
return False
+
elif len(self.connections) != 0 :
- msg = "Connection between %s %s and %s %s refused : This Application is already connected" % \
+ msg = ("Connection between %s %s and %s %s refused: "
+ "This Application is already connected" ) % \
(self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
+
return False
+
else :
- msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+ msg = "Connection between %s %s and %s %s accepted" % (
+ self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
- return True
-
+ return True
def deploy(self):
- """Deploy the RM. It means nothing special for an application for now (later it will be upload sources, ...)
- It becomes DEPLOYED after getting the xmpp client.
+ """ Deploy the RM. It means nothing special for an application
+ for now (later it will be upload sources, ...)
+ It becomes DEPLOYED after getting the xmpp client.
+
"""
if not self._omf_api :
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.ec.exp_id)
if not self._omf_api :
- self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
+ self.fail()
return
super(OMFApplication, self).deploy()
def start(self):
- """Start the RM. It means : Send Xmpp Message Using OMF protocol to execute the application
- It becomes STARTED before the messages are sent (for coordination)
+ """ Start the RM. It means : Send Xmpp Message Using OMF protocol
+ to execute the application.
+ It becomes STARTED before the messages are sent (for coordination)
"""
if not (self.get('appid') and self.get('path')) :
- self._state = ResourceState.FAILED
msg = "Application's information are not initialized"
self.error(msg)
+ self.fail()
return
if not self.get('args'):
self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
self.get('args'), self.get('path'), self.get('env'))
except AttributeError:
- self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
+ self.fail()
raise
-
super(OMFApplication, self).start()
-
def stop(self):
- """Stop the RM. It means : Send Xmpp Message Using OMF protocol to kill the application
- It becomes STOPPED after the message is sent.
+ """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to
+ kill the application.
+ State is set to STOPPED after the message is sent.
"""
try:
self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
except AttributeError:
- self._state = ResourceState.FAILED
msg = "Credentials were not initialzed. XMPP Connections impossible"
self.error(msg)
+ self.fail()
#raise
super(OMFApplication, self).stop()
- self._state = ResourceState.FINISHED
-
def release(self):
- """Clean the RM at the end of the experiment and release the API.
+ """ Clean the RM at the end of the experiment and release the API.
"""
if self._omf_api :
OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.ec.exp_id)
super(OMFApplication, self).release()
-"""
- NEPI, a framework to manage network experiments
- Copyright (C) 2013 INRIA
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Julien Tribino <julien.tribino@inria.fr>
from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
reschedule_delay
_rtype = "OMFChannel"
_authorized_connections = ["OMFWifiInterface", "OMFNode"]
-
@classmethod
def _register_attributes(cls):
"""Register the attributes of an OMF channel
+
"""
channel = Attribute("channel", "Name of the application")
xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
return self.ec.exp_id
def valid_connection(self, guid):
- """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+ """ Check if the connection with the guid in parameter is possible.
+ Only meaningful connections are allowed.
:param guid: Guid of the current RM
:type guid: int
"""
rm = self.ec.get_resource(guid)
+
if rm.rtype() in self._authorized_connections:
- msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+ msg = "Connection between %s %s and %s %s accepted" % (
+ self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
return True
- msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+
+ msg = "Connection between %s %s and %s %s refused" % (
+ self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
+
return False
def _get_target(self, conn_set):
for conn in rm_iface.connections:
rm_node = self.ec.get_resource(conn)
if rm_node.rtype() == "OMFNode" and rm_node.get('hostname'):
- if rm_iface.state < ResourceState.PROVISIONED or rm_node.state < ResourceState.READY:
+ if rm_iface.state < ResourceState.PROVISIONED or \
+ rm_node.state < ResourceState.READY:
return "reschedule"
couple = [rm_node.get('hostname'), rm_iface.get('alias')]
#print couple
pass
def deploy(self):
- """Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the channel
- It becomes DEPLOYED after sending messages to configure the channel
+ """ Deploy the RM. It means : Get the xmpp client and send messages
+ using OMF 5.4 protocol to configure the channel.
+ It becomes DEPLOYED after sending messages to configure the channel
"""
if not self._omf_api :
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
if not self._omf_api :
- self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
+ self.fail()
return
if not self.get('channel'):
- self._state = ResourceState.FAILED
msg = "Channel's value is not initialized"
self.error(msg)
+ self.fail()
raise
self._nodes_guid = self._get_target(self._connections)
attrname = "net/%s/%s" % (couple[1], 'channel')
self._omf_api.configure(couple[0], attrname, attrval)
except AttributeError:
- self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
+ self.fail()
raise
super(OMFChannel, self).deploy()
def start(self):
- """Start the RM. It means nothing special for a channel for now
- It becomes STARTED as soon as this method starts.
+ """ Start the RM. It means nothing special for a channel for now
+ It becomes STARTED as soon as this method starts.
"""
super(OMFChannel, self).start()
def stop(self):
- """Stop the RM. It means nothing special for a channel for now
- It becomes STOPPED as soon as this method is called
+ """ Stop the RM. It means nothing special for a channel for now
+ It becomes STOPPED as soon as this method is called
"""
super(OMFChannel, self).stop()
def release(self):
- """Clean the RM at the end of the experiment and release the API
+ """ Clean the RM at the end of the experiment and release the API
"""
if self._omf_api :
OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
super(OMFChannel, self).release()
from nepi.resources.omf.channel import OMFChannel
from nepi.resources.omf.omf_api import OMFAPIFactory
-
@clsinit
class OMFWifiInterface(ResourceManager):
"""
.. note::
- This class is used only by the Experiment Controller through the Resource Factory
+ This class is used only by the Experiment Controller through the Resource
+ Factory
"""
_rtype = "OMFWifiInterface"
self._omf_api = None
self._alias = self.get('alias')
- @property
- def exp_id(self):
- if self.ec.exp_id.startswith('exp-'):
- return None
- return self.ec.exp_id
-
def valid_connection(self, guid):
- """ Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+ """ Check if the connection with the guid in parameter is possible.
+ Only meaningful connections are allowed.
:param guid: Guid of the current RM
:type guid: int
msg = "Connection between %s %s and %s %s accepted" % \
(self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
+
return True
+
msg = "Connection between %s %s and %s %s refused" % \
(self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
+
return False
@property
for attrname in ["mode", "type", "essid"]:
attrval = self.get(attrname)
attrname = "net/%s/%s" % (self._alias, attrname)
- self._omf_api.configure(self.node.get('hostname'), attrname, attrval)
+ self._omf_api.configure(self.node.get('hostname'), attrname,
+ attrval)
except AttributeError:
self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
""" Configure the ip of the interface
"""
-
if self.channel.state < ResourceState.READY:
self.ec.schedule(reschedule_delay, self.deploy)
return False
try :
attrval = self.get("ip")
attrname = "net/%s/%s" % (self._alias, "ip")
- self._omf_api.configure(self.node.get('hostname'), attrname, attrval)
+ self._omf_api.configure(self.node.get('hostname'), attrname,
+ attrval)
except AttributeError:
- self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.debug(msg)
+ self.fail()
#raise
return True
-
def deploy(self):
- """Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the interface
- It becomes DEPLOYED after sending messages to configure the interface
+ """ Deploy the RM. It means : Get the xmpp client and send messages
+ using OMF 5.4 protocol to configure the interface.
+ It becomes DEPLOYED after sending messages to configure the interface
"""
if not self._omf_api :
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.ec.exp_id)
if not self._omf_api :
- self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
+ self.fail()
return
- if not (self.get('mode') and self.get('type') and self.get('essid') and self.get('ip')):
- self._state = ResourceState.FAILED
+ if not (self.get('mode') and self.get('type') and self.get('essid') \
+ and self.get('ip')):
msg = "Interface's variable are not initialized"
self.error(msg)
+ self.fail()
return False
if not self.node.get('hostname') :
msg = "The channel is connected with an undefined node"
self.error(msg)
+ self.fail()
return False
# Just for information
super(OMFWifiInterface, self).deploy()
return True
- def start(self):
- """Start the RM. It means nothing special for an interface for now
- It becomes STARTED as soon as this method starts.
-
- """
-
- super(OMFWifiInterface, self).start()
-
- def stop(self):
- """Stop the RM. It means nothing special for an interface for now
- It becomes STOPPED as soon as this method stops
-
- """
- super(OMFWifiInterface, self).stop()
-
def release(self):
- """Clean the RM at the end of the experiment and release the API
+ """ Clean the RM at the end of the experiment and release the API
"""
if self._omf_api :
OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.ec.exp_id)
super(OMFWifiInterface, self).release()
self._omf_api = None
- @property
- def exp_id(self):
- if self.ec.exp_id.startswith('exp-'):
- return None
- return self.ec.exp_id
-
def valid_connection(self, guid):
- """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+ """ Check if the connection with the guid in parameter is possible.
+ Only meaningful connections are allowed.
:param guid: Guid of the current RM
:type guid: int
"""
rm = self.ec.get_resource(guid)
if rm.rtype() in self._authorized_connections:
- msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+ msg = "Connection between %s %s and %s %s accepted" % (
+ self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
+
return True
- msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+
+ msg = "Connection between %s %s and %s %s refused" % (
+ self.rtype(), self._guid, rm.rtype(), guid)
self.debug(msg)
+
return False
def deploy(self):
- """Deploy the RM. It means : Send Xmpp Message Using OMF protocol to enroll the node into the experiment
- It becomes DEPLOYED after sending messages to enroll the node
+ """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol
+ to enroll the node into the experiment.
+ It becomes DEPLOYED after sending messages to enroll the node
"""
if not self._omf_api :
self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.ec.exp_id)
if not self._omf_api :
- self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
self.error(msg)
+ self.fail()
return
if not self.get('hostname') :
- self._state = ResourceState.FAILED
msg = "Hostname's value is not initialized"
self.error(msg)
+ self.fail()
return False
try:
self._omf_api.enroll_host(self.get('hostname'))
except AttributeError:
- self._state = ResourceState.FAILED
msg = "Credentials are not initialzed. XMPP Connections impossible"
- self.debug(msg)
+ self.error(msg)
+ self.fail()
#raise AttributeError, msg
super(OMFNode, self).deploy()
"""
if self._omf_api :
self._omf_api.release(self.get('hostname'))
+
OMFAPIFactory.release_api(self.get('xmppSlice'),
- self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+ self.get('xmppHost'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.ec.exp_id)
super(OMFNode, self).release()
.. note::
- This class is the implementation of an OMF 5.4 API. Since the version 5.4.1, the Topic Architecture start with OMF_5.4 instead of OMF used for OMF5.3
+ This class is the implementation of an OMF 5.4 API.
+ Since the version 5.4.1, the Topic Architecture start with OMF_5.4
+ instead of OMF used for OMF5.3
"""
- def __init__(self, slice, host, port, password, xmpp_root = None, exp_id = None):
+ def __init__(self, slice, host, port, password, xmpp_root = None,
+ exp_id = None):
"""
:param slice: Xmpp Slice
# OMF xmpp client
self._client = None
+
# message handler
self._message = None
""" Publish New Experiment Message
"""
- address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
+ address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice,
+ self._user)
#print address
payload = self._message.newexp_function(self._user, address)
slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
:type hostname: str
"""
- return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
+ return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user,
+ hostname)
def _host_resource_id(self, hostname):
""" Return the Topic Name as /xmpp_root/slice/resources/hostname
self._client.delete(xmpp_node)
def enroll_host(self, hostname):
- """ Create and Subscribe to the session topic and the resources corresponding to the hostname
+ """ Create and Subscribe to the session topic and the resources
+ corresponding to the hostname
:param hostname: Full hrn of the node
:type hostname: str
:param hostname: Full hrn of the node
:type hostname: str
- :param attribute: Attribute that need to be configured (often written as /net/wX/attribute, with X the interface number)
+ :param attribute: Attribute that need to be configured (
+ often written as /net/wX/attribute, with X the interface number)
:type attribute: str
:param value: Value of the attribute
:type value: str
:param hostname: Full hrn of the node
:type hostname: str
- :param app_id: Application Id (Any id that represents in a unique way the application)
+ :param app_id: Application Id (Any id that represents in a unique
+ way the application)
:type app_id: str
:param arguments: Arguments of the application
:type arguments: str
:type env: str
"""
- payload = self._message.execute_function(hostname, app_id, arguments, path, env)
+ payload = self._message.execute_function(hostname, app_id, arguments,
+ path, env)
xmpp_node = self._host_session_id(hostname)
self._client.publish(payload, xmpp_node)
"""
.. note::
- It allows the different RM to use the same xmpp client if they use the same credentials.
- For the moment, it is focused on Xmpp.
+ It allows the different RM to use the same xmpp client if they use
+ the same credentials. For the moment, it is focused on XMPP.
"""
- # use lock to avoid concurrent access to the Api list at the same times by 2 different threads
+ # use lock to avoid concurrent access to the Api list at the same times by 2
+ # different threads
lock = threading.Lock()
_apis = dict()
from nepi.resources.linux.node import LinuxNode
from nepi.resources.planetlab.plcapi import PLCAPIFactory
from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
-import threading
+
import subprocess
+import threading
+
+# A.Q. GENERAL COMMENTS: This module needs major cleaning up
+# - Lines should be 80 characters
+# - Most methods have too many lines and there are no comments or spaces
+# - There should be only two line breaks between two methods
+# - Code is too compressed. Hard to read. Add spaces when needed
+# - In general the code needs to be more subdivided. Use more methods
+# with clear names to divide operations (even if you don't reuse the
+# methods else where, this will make the code more readable)
@clsinit_copy
class PlanetlabNode(LinuxNode):
"""
return cls._blacklist
+ ### A.Q. COMMENT: Why did you wrapped the locks inside methods ?
@classmethod
def in_provision(cls):
""" Returns the nodes that anohter RM is trying to provision
return self._plapi
def discoverl(self):
+ #### A.Q. COMMENT: no need to have methods for the locks and
+ ## other attributes. Please remove.
bl = PlanetlabNode.blacklist()
inpro = PlanetlabNode.in_provision()
lockbl = PlanetlabNode.lock_bl()
if node_id not in bl and node_id not in inpro:
try_other = self.do_ping(node_id)
if try_other:
+ # A.Q. COMMENT: Here you could do
+ #
+ # with self._lockbl:
+ # ...
+ #
+ # Class attributes can still be accesed with 'self'
lockbl.acquire()
bl.append(node_id)
lockbl.release()
def provisionl(self):
+ # A.Q. COMMENT: you can import time on the top
import time
bl = PlanetlabNode.blacklist()
lockbl = PlanetlabNode.lock_bl()
t = 0
while t < timeout and not ssh_ok:
# check ssh connection
+
+ # A.Q. COMMENT IMPORTANT! Instead of issuing SSH commands directly use the
+ # "execute" method inherithed from LinuxNode with blocking = True
command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (slicename, ip)
p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
stdout, stderr = p.communicate()
with lockbl:
bl.append(node)
print bl
+ # A.Q. COMMENT: Make method "delete_slice_node" and there
+ # put this code. Repeat this for all calls to plapi.
+ # This will make the code cleaner.
self.plapi.delete_slice_node(slicename, [node])
self.discover()
continue
p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
stdout, stderr = p.communicate()
if stdout.find("/proc type proc") < 0:
+ # A.Q. COMMENT: lines 382-384 should go to a method
+ # "blacklist_node()"
lockbl.acquire()
bl.append(node)
lockbl.release()
# call provision de linux node?
super(PlanetlabNode, self).provision()
-
def filter_based_on_attributes(self):
# Map attributes with tagnames of PL
timeframe = self.get("timeframe")[0]
return nodes_inslice
def do_ping(self, node_id):
+ # A.Q. COMMENT: the execfuncs module in utils will do the local ping for you
+ # code reuse is good...
ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
ip = ip[0]['ip']
result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
elif result == 1 or result == 2:
return True
-
+ # A.Q. Unclear name for method "fail2"
def fail2(self):
self.fail()
msg = "Discovery failed. No candidates found for node"
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
reschedule_delay
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
raise
self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
+ self.set_ready()
def start(self):
- if self._state == ResourceState.READY:
+ if self.state == ResourceState.READY:
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_started()
else:
msg = " Failed to execute command '%s'" % command
self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ self.fail()
raise RuntimeError, msg
def stop(self):
command = self.get('command') or ''
- state = self.state
- if state == ResourceState.STARTED:
+ if self.state == ResourceState.STARTED:
self.info("Stopping command '%s'" % command)
command = "bash %s" % os.path.join(self.app_home, "stop.sh")
(out, err), proc = self.execute_command(command,
blocking = True)
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
+ self.set_stopped()
@property
def state(self):
if out.strip().find(self.get("deviceName")) == -1:
# tap is not running is not running (socket not found)
+ self._finish_time = tnow()
self._state = ResourceState.FINISHED
self._last_state_check = tnow()