From: Alina Quereilhac Date: Sun, 19 May 2013 20:12:59 +0000 (+0200) Subject: Replacing string "neco" by "nepi" X-Git-Tag: nepi-3.0.0~122^2 X-Git-Url: http://git.onelab.eu/?p=nepi.git;a=commitdiff_plain;h=98a98f7058ec7180303ea88e8ccc212ddf740ac6 Replacing string "neco" by "nepi" --- diff --git a/examples/automated_vlc_experiment_plexus.py b/examples/automated_vlc_experiment_plexus.py index 1cae09a6..b9d7e35a 100644 --- a/examples/automated_vlc_experiment_plexus.py +++ b/examples/automated_vlc_experiment_plexus.py @@ -1,11 +1,11 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceFactory, ResourceAction, ResourceState -from neco.execution.ec import ExperimentController +from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState +from nepi.execution.ec import ExperimentController -from neco.resources.omf.omf_node import OMFNode -from neco.resources.omf.omf_application import OMFApplication -from neco.resources.omf.omf_interface import OMFWifiInterface -from neco.resources.omf.omf_channel import OMFChannel +from nepi.resources.omf.omf_node import OMFNode +from nepi.resources.omf.omf_application import OMFApplication +from nepi.resources.omf.omf_interface import OMFWifiInterface +from nepi.resources.omf.omf_channel import OMFChannel import logging import time diff --git a/examples/linux/ccnx/simple_topo.py b/examples/linux/ccnx/simple_topo.py index 65faf715..ee41017d 100644 --- a/examples/linux/ccnx/simple_topo.py +++ b/examples/linux/ccnx/simple_topo.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from neco.execution.ec import ExperimentController, ECState -from neco.execution.resource import ResourceState, ResourceAction, \ +from nepi.execution.ec import ExperimentController, ECState +from nepi.execution.resource import ResourceState, ResourceAction, \ populate_factory from optparse import OptionParser, SUPPRESS_HELP diff --git a/examples/linux/scalability.py b/examples/linux/scalability.py index 110405a1..903e6b9a 100644 --- a/examples/linux/scalability.py +++ b/examples/linux/scalability.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from neco.execution.ec import ExperimentController, ECState -from neco.execution.resource import ResourceState, ResourceAction, \ +from nepi.execution.ec import ExperimentController, ECState +from nepi.execution.resource import ResourceState, ResourceAction, \ populate_factory from optparse import OptionParser, SUPPRESS_HELP diff --git a/examples/manual_vlc_experiment_plexus.py b/examples/manual_vlc_experiment_plexus.py index ebdd8e11..fc449c70 100644 --- a/examples/manual_vlc_experiment_plexus.py +++ b/examples/manual_vlc_experiment_plexus.py @@ -1,11 +1,11 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceFactory -from neco.execution.ec import ExperimentController +from nepi.execution.resource import ResourceFactory +from nepi.execution.ec import ExperimentController -from neco.resources.omf.omf_node import OMFNode -from neco.resources.omf.omf_application import OMFApplication -from neco.resources.omf.omf_interface import OMFWifiInterface -from neco.resources.omf.omf_channel import OMFChannel +from nepi.resources.omf.omf_node import OMFNode +from nepi.resources.omf.omf_application import OMFApplication +from nepi.resources.omf.omf_interface import OMFWifiInterface +from nepi.resources.omf.omf_channel import OMFChannel import logging import time diff --git a/setup.py b/setup.py index 0be05ecb..7bae51b4 100755 --- a/setup.py +++ b/setup.py @@ -3,23 +3,23 @@ from distutils.core import setup import sys setup( - name = "neco", - version = "0.01", + name = "nepi", + version = "3.0", description = "Network Experiment Controller", author = "Alina Quereilhac", url = "", license = "GPLv2", platforms = "Linux", packages = [ - "neco", - "neco.design", - "neco.execution", - "neco.resources", - "neco.resources.linux", - "neco.resources.netns", - "neco.resources.ns3", - "neco.resources.omf", - "neco.resources.planetlab", - "neco.util"], + "nepi", + "nepi.design", + "nepi.execution", + "nepi.resources", + "nepi.resources.linux", + "nepi.resources.netns", + "nepi.resources.ns3", + "nepi.resources.omf", + "nepi.resources.planetlab", + "nepi.util"], package_dir = {"": "src"}, ) diff --git a/src/nepi/design/box.py b/src/nepi/design/box.py index 121827aa..82bb899b 100644 --- a/src/nepi/design/box.py +++ b/src/nepi/design/box.py @@ -1,4 +1,4 @@ -from neco.util import guid +from nepi.util import guid guid_gen = guid.GuidGenerator() diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 26f0cd47..0e76650f 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -5,13 +5,13 @@ import sys import time import threading -from neco.util import guid -from neco.util.parallel import ParallelRun -from neco.util.timefuncs import strfnow, strfdiff, strfvalid -from neco.execution.resource import ResourceFactory, ResourceAction, \ +from nepi.util import guid +from nepi.util.parallel import ParallelRun +from nepi.util.timefuncs import strfnow, strfdiff, strfvalid +from nepi.execution.resource import ResourceFactory, ResourceAction, \ ResourceState -from neco.execution.scheduler import HeapScheduler, Task, TaskStatus -from neco.execution.trace import TraceAttr +from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus +from nepi.execution.trace import TraceAttr # TODO: use multiprocessing instead of threading # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!! diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index a7e7758f..dfa219e5 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -1,5 +1,5 @@ -from neco.util.timefuncs import strfnow, strfdiff, strfvalid -from neco.execution.trace import TraceAttr +from nepi.util.timefuncs import strfnow, strfdiff, strfvalid +from nepi.execution.trace import TraceAttr import copy import functools @@ -572,8 +572,8 @@ def find_types(): search_path = os.environ.get("NEPI_SEARCH_PATH", "") search_path = set(search_path.split(" ")) - import neco.resources - path = os.path.dirname(neco.resources.__file__) + import nepi.resources + path = os.path.dirname(nepi.resources.__file__) search_path.add(path) types = [] diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 882fb4c3..0364939a 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -1,9 +1,9 @@ -from neco.execution.attribute import Attribute, Flags, Types -from neco.execution.trace import Trace, TraceAttr -from neco.execution.resource import ResourceManager, clsinit, ResourceState -from neco.resources.linux.node import LinuxNode -from neco.util import sshfuncs -from neco.util.timefuncs import strfnow, strfdiff +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.trace import Trace, TraceAttr +from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.resources.linux.node import LinuxNode +from nepi.util import sshfuncs +from nepi.util.timefuncs import strfnow, strfdiff import logging import os diff --git a/src/nepi/resources/linux/channel.py b/src/nepi/resources/linux/channel.py index f4c1cf22..142478b2 100644 --- a/src/nepi/resources/linux/channel.py +++ b/src/nepi/resources/linux/channel.py @@ -1,6 +1,6 @@ -from neco.execution.attribute import Attribute, Flags -from neco.execution.resource import ResourceManager, clsinit, ResourceState -from neco.resources.linux.node import LinuxNode +from nepi.execution.attribute import Attribute, Flags +from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.resources.linux.node import LinuxNode import collections import logging diff --git a/src/nepi/resources/linux/interface.py b/src/nepi/resources/linux/interface.py new file mode 100644 index 00000000..f5727668 --- /dev/null +++ b/src/nepi/resources/linux/interface.py @@ -0,0 +1,296 @@ +from nepi.execution.attribute import Attribute, Types, Flags +from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.resources.linux.node import LinuxNode +from nepi.resources.linux.channel import LinuxChannel + +import collections +import logging +import os +import random +import re +import tempfile +import time + +# TODO: UP, MTU attributes! + +reschedule_delay = "0.5s" + +@clsinit +class LinuxInterface(ResourceManager): + _rtype = "LinuxInterface" + + @classmethod + def _register_attributes(cls): + ip4 = Attribute("ip4", "IPv4 Address", + flags = Flags.ExecReadOnly) + + ip6 = Attribute("ip6", "IPv6 Address", + flags = Flags.ExecReadOnly) + + mac = Attribute("mac", "MAC Address", + flags = Flags.ExecReadOnly) + + mask4 = Attribute("mask4", "IPv4 network mask", + flags = Flags.ExecReadOnly) + + mask6 = Attribute("mask6", "IPv6 network mask", + type = Types.Integer, + flags = Flags.ExecReadOnly) + + mtu = Attribute("mtu", "Maximum transmition unit for device", + type = Types.Integer) + + devname = Attribute("deviceName", + "Name of the network interface (e.g. eth0, wlan0, etc)", + flags = Flags.ExecReadOnly) + + up = Attribute("up", "Link up", type = Types.Bool) + + tear_down = Attribute("tearDown", "Bash script to be executed before " + \ + "releasing the resource", + flags = Flags.ExecReadOnly) + + cls._register_attribute(ip4) + cls._register_attribute(ip6) + cls._register_attribute(mac) + cls._register_attribute(mask4) + cls._register_attribute(mask6) + cls._register_attribute(mtu) + cls._register_attribute(devname) + cls._register_attribute(up) + cls._register_attribute(tear_down) + + def __init__(self, ec, guid): + super(LinuxInterface, self).__init__(ec, guid) + self._configured = False + + self._logger = logging.getLogger("LinuxInterface") + + self.add_set_hooks() + + def log_message(self, msg): + return " guid %d - host %s - %s " % (self.guid, + self.node.get("hostname"), msg) + + @property + def node(self): + node = self.get_connected(LinuxNode.rtype()) + if node: return node[0] + return None + + @property + def channel(self): + chan = self.get_connected(LinuxChannel.rtype()) + if chan: return chan[0] + return None + + def discover(self, filters = None): + devname = self.get("deviceName") + ip4 = self.get("ip4") + ip6 = self.get("ip4") + mac = self.get("mac") + mask4 = self.get("mask4") + mask6 = self.get("mask6") + mtu = self.get("mtu") + + # Get current interfaces information + (out, err), proc = self.node.execute("ifconfig", sudo = True) + + if err and proc.poll(): + msg = " Error retrieving interface information " + self.error(msg, out, err) + raise RuntimeError, "%s - %s - %s" % (msg, out, err) + + # Check if an interface is found matching the RM attributes + ifaces = out.split("\n\n") + + for i in ifaces: + m = re.findall("(\w+)\s+Link\s+encap:\w+(\s+HWaddr\s+(([0-9a-fA-F]{2}:?){6}))?(\s+inet\s+addr:((\d+\.?){4}).+Mask:(\d+\.\d+\.\d+\.\d+))?(.+inet6\s+addr:\s+([0-9a-fA-F:.]+)/(\d+))?(.+(UP))?(.+MTU:(\d+))?", i, re.DOTALL) + + m = m[0] + dn = m[0] + mc = m[2] + i4 = m[5] + msk4 = m[7] + i6 = m[9] + msk6 = m[10] + up = True if m[12] else False + mu = m[14] + + self.debug("Found interface %(devname)s with MAC %(mac)s," + "IPv4 %(ipv4)s %(mask4)s IPv6 %(ipv6)s/%(mask6)s %(up)s %(mtu)s" % ({ + 'devname': dn, + 'mac': mc, + 'ipv4': i4, + 'mask4': msk4, + 'ipv6': i6, + 'mask6': msk6, + 'up': up, + 'mtu': mu + }) ) + + # If the user didn't provide information we take the first + # interface that is UP + if not devname and not ip4 and not ip6 and up: + self._configured = True + self.load_configuration(dn, mc, i4, msk4, i6, msk6, mu, up) + break + + # If the user provided ipv4 or ipv6 matching that of an interface + # load the interface info + if (ip4 and ip4 == i4) or (ip6 and ip6 == i6): + self._configured = True + self.load_configuration(dn, mc, i4, msk4, i6, msk6, mu, up) + break + + # If the user provided the device name we load the associated info + if devname and devname == dn: + if ((ip4 and ip4 == i4) and (ipv6 and ip6 == i6)) or \ + not (ip4 or ip6): + self._configured = True + + # If the user gave a different ip than the existing, asume ip + # needs to be changed + i4 = ip4 or i4 + i6 = ip6 or i6 + mu = mtu or mu + + self.load_configuration(dn, mc, i4, msk4, i6, msk6, mu, up) + break + + if not self.get("deviceName"): + msg = "Unable to resolve interface " + self.error(msg) + raise RuntimeError, msg + + super(LinuxInterface, self).discover(filters = filters) + + def provision(self, filters = None): + devname = self.get("deviceName") + ip4 = self.get("ip4") + ip6 = self.get("ip4") + mac = self.get("mac") + mask4 = self.get("mask4") + mask6 = self.get("mask6") + mtu = self.get("mtu") + + # Must configure interface if configuration is required + if not self._configured: + cmd = "ifconfig %s" % devname + + if ip4 and mask4: + cmd += " %(ip4)s netmask %(mask4)s broadcast %(bcast)s up" % ({ + 'ip4': ip4, + 'mask4': mask4, + 'bcast': bcast}) + if mtu: + cmd += " mtu %d " % mtu + + (out, err), proc = self.node.execute(cmd, sudo = True) + + if err and proc.poll(): + msg = "Error configuring interface with command '%s'" % cmd + self.error(msg, out, err) + raise RuntimeError, "%s - %s - %s" % (msg, out, err) + + if ip6 and mask6: + cmd = "ifconfig %(devname)s inet6 add %(ip6)s/%(mask6)d" % ({ + 'devname': devname, + 'ip6': ip6, + 'mask6': mask6}) + + (out, err), proc = self.node.execute(cmd, sudo = True) + + if err and proc.poll(): + msg = "Error seting ipv6 for interface using command '%s' " % cmd + self.error(msg, out, err) + raise RuntimeError, "%s - %s - %s" % (msg, out, err) + + super(LinuxInterface, self).provision(filters = filters) + + def deploy(self): + # Wait until node is provisioned + node = self.node + chan = self.channel + + if not node or node.state < ResourceState.PROVISIONED: + self.ec.schedule(reschedule_delay, self.deploy) + elif not chan or chan.state < ResourceState.READY: + self.ec.schedule(reschedule_delay, self.deploy) + else: + # Verify if the interface exists in node. If not, configue + # if yes, load existing configuration + try: + self.discover() + self.provision() + except: + self._state = ResourceState.FAILED + raise + + super(LinuxInterface, self).deploy() + + def release(self): + tear_down = self.get("tearDown") + if tear_down: + self.execute(tear_down) + + super(LinuxInterface, self).release() + + def valid_connection(self, guid): + # TODO: Validate! + return True + + def load_configuration(self, devname, mac, ip4, mask4, ip6, mask6, mtu, up): + self.set("deviceName", devname) + self.set("mac", mac) + self.set("ip4", ip4) + self.set("mask4", mask4) + self.set("ip6", ip6) + self.set("mask6", mask6) + + # set the following without validating or triggering hooks + attr = self._attrs["up"] + attr._value = up + attr = self._attrs["mtu"] + + def add_set_hooks(self): + attrup = self._attrs["up"] + attrup.set_hook = self.set_hook_up + + attrmtu = self._attrs["mtu"] + attrmtu.set_hook = self.set_hook_mtu + + def set_hook_up(self, oldval, newval): + if oldval == newval: + return oldval + + # configure interface up + if newval == True: + cmd = "ifup %s" % self.get("deviceName") + elif newval == False: + cmd = "ifdown %s" % self.get("deviceName") + + (out, err), proc = self.node.execute(cmd, sudo = True) + + if err and proc.poll(): + msg = "Error setting interface up/down using command '%s' " % cmd + self.error(msg, err, out) + return oldval + + return newval + + def set_hook_mtu(self, oldval, newval): + if oldval == newval: + return oldval + + cmd = "ifconfig %s mtu %d" % (self.get("deviceName"), newval) + + (out, err), proc = self.node.execute(cmd, sudo = True) + + if err and proc.poll(): + msg = "Error setting interface MTU using command '%s' " % cmd + self.error(msg, err, out) + return oldval + + return newval + diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 3f306338..85e22863 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -1,7 +1,7 @@ -from neco.execution.attribute import Attribute, Flags -from neco.execution.resource import ResourceManager, clsinit, ResourceState -from neco.resources.linux import rpmfuncs, debfuncs -from neco.util import sshfuncs, execfuncs +from nepi.execution.attribute import Attribute, Flags +from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.resources.linux import rpmfuncs, debfuncs +from nepi.util import sshfuncs, execfuncs import collections import logging @@ -156,7 +156,7 @@ class LinuxNode(ResourceManager): # Node needs to wait until all associated interfaces are # ready before it can finalize deployment - from neco.resources.linux.interface import LinuxInterface + from nepi.resources.linux.interface import LinuxInterface ifaces = self.get_connected(LinuxInterface.rtype()) for iface in ifaces: if iface.state < ResourceState.READY: diff --git a/src/nepi/resources/ns3/ns3wrapper_server.py b/src/nepi/resources/ns3/ns3wrapper_server.py new file mode 100644 index 00000000..5a82909a --- /dev/null +++ b/src/nepi/resources/ns3/ns3wrapper_server.py @@ -0,0 +1,344 @@ +class Server(object): + def __init__(self, root_dir = ".", log_level = "ERROR", + environment_setup = "", clean_root = False): + self._root_dir = root_dir + self._clean_root = clean_root + self._stop = False + self._ctrl_sock = None + self._log_level = log_level + self._rdbuf = "" + self._environment_setup = environment_setup + + def run(self): + try: + if self.daemonize(): + self.post_daemonize() + self.loop() + self.cleanup() + # ref: "os._exit(0)" + # can not return normally after fork beacuse no exec was done. + # This means that if we don't do a os._exit(0) here the code that + # follows the call to "Server.run()" in the "caller code" will be + # executed... but by now it has already been executed after the + # first process (the one that did the first fork) returned. + os._exit(0) + except: + print >>sys.stderr, "SERVER_ERROR." + self.log_error() + self.cleanup() + os._exit(0) + print >>sys.stderr, "SERVER_READY." + + def daemonize(self): + # pipes for process synchronization + (r, w) = os.pipe() + + # build root folder + root = os.path.normpath(self._root_dir) + if self._root_dir not in [".", ""] and os.path.exists(root) \ + and self._clean_root: + shutil.rmtree(root) + if not os.path.exists(root): + os.makedirs(root, 0755) + + pid1 = os.fork() + if pid1 > 0: + os.close(w) + while True: + try: + os.read(r, 1) + except OSError, e: # pragma: no cover + if e.errno == errno.EINTR: + continue + else: + raise + break + os.close(r) + # os.waitpid avoids leaving a (zombie) process + st = os.waitpid(pid1, 0)[1] + if st: + raise RuntimeError("Daemonization failed") + # return 0 to inform the caller method that this is not the + # daemonized process + return 0 + os.close(r) + + # Decouple from parent environment. + os.chdir(self._root_dir) + os.umask(0) + os.setsid() + + # fork 2 + pid2 = os.fork() + if pid2 > 0: + # see ref: "os._exit(0)" + os._exit(0) + + # close all open file descriptors. + max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if (max_fd == resource.RLIM_INFINITY): + max_fd = MAX_FD + for fd in range(3, max_fd): + if fd != w: + try: + os.close(fd) + except OSError: + pass + + # Redirect standard file descriptors. + stdin = open(DEV_NULL, "r") + stderr = stdout = open(STD_ERR, "a", 0) + os.dup2(stdin.fileno(), sys.stdin.fileno()) + # NOTE: sys.stdout.write will still be buffered, even if the file + # was opened with 0 buffer + os.dup2(stdout.fileno(), sys.stdout.fileno()) + os.dup2(stderr.fileno(), sys.stderr.fileno()) + + # setup environment + if self._environment_setup: + # parse environment variables and pass to child process + # do it by executing shell commands, in case there's some heavy setup involved + envproc = subprocess.Popen( + [ "bash", "-c", + "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" % + ( self._environment_setup, ) ], + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE + ) + out,err = envproc.communicate() + + # parse new environment + if out: + environment = dict(map(lambda x:x.split("\x02"), out.split("\x01"))) + + # apply to current environment + for name, value in environment.iteritems(): + os.environ[name] = value + + # apply pythonpath + if 'PYTHONPATH' in environment: + sys.path = environment['PYTHONPATH'].split(':') + sys.path + + # create control socket + self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + self._ctrl_sock.bind(CTRL_SOCK) + except socket.error: + # Address in use, check pidfile + pid = None + try: + pidfile = open(CTRL_PID, "r") + pid = pidfile.read() + pidfile.close() + pid = int(pid) + except: + # no pidfile + pass + + if pid is not None: + # Check process liveliness + if not os.path.exists("/proc/%d" % (pid,)): + # Ok, it's dead, clean the socket + os.remove(CTRL_SOCK) + + # try again + self._ctrl_sock.bind(CTRL_SOCK) + + self._ctrl_sock.listen(0) + + # Save pidfile + pidfile = open(CTRL_PID, "w") + pidfile.write(str(os.getpid())) + pidfile.close() + + # let the parent process know that the daemonization is finished + os.write(w, "\n") + os.close(w) + return 1 + + def post_daemonize(self): + os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level + # QT, for some strange reason, redefines the SIGCHILD handler to write + # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received. + # Server dameonization closes all file descriptors from fileno '3', + # but the overloaded handler (inherited by the forked process) will + # keep trying to write the \0 to fileno 'x', which might have been reused + # after closing, for other operations. This is bad bad bad when fileno 'x' + # is in use for communication pouroses, because unexpected \0 start + # appearing in the communication messages... this is exactly what happens + # when using netns in daemonized form. Thus, be have no other alternative than + # restoring the SIGCHLD handler to the default here. + import signal + signal.signal(signal.SIGCHLD, signal.SIG_DFL) + + def loop(self): + while not self._stop: + conn, addr = self._ctrl_sock.accept() + self.log_error("ACCEPTED CONNECTION: %s" % (addr,)) + conn.settimeout(5) + while not self._stop: + try: + msg = self.recv_msg(conn) + except socket.timeout, e: + #self.log_error("SERVER recv_msg: connection timedout ") + continue + + if not msg: + self.log_error("CONNECTION LOST") + break + + if msg == STOP_MSG: + self._stop = True + reply = self.stop_action() + else: + reply = self.reply_action(msg) + + try: + self.send_reply(conn, reply) + except socket.error: + self.log_error() + self.log_error("NOTICE: Awaiting for reconnection") + break + try: + conn.close() + except: + # Doesn't matter + self.log_error() + + def recv_msg(self, conn): + data = [self._rdbuf] + chunk = data[0] + while '\n' not in chunk: + try: + chunk = conn.recv(1024) + except (OSError, socket.error), e: + if e[0] != errno.EINTR: + raise + else: + continue + if chunk: + data.append(chunk) + else: + # empty chunk = EOF + break + data = ''.join(data).split('\n',1) + while len(data) < 2: + data.append('') + data, self._rdbuf = data + + decoded = base64.b64decode(data) + return decoded.rstrip() + + def send_reply(self, conn, reply): + encoded = base64.b64encode(reply) + conn.send("%s\n" % encoded) + + def cleanup(self): + try: + self._ctrl_sock.close() + os.remove(CTRL_SOCK) + except: + self.log_error() + + def stop_action(self): + return "Stopping server" + + def reply_action(self, msg): + return "Reply to: %s" % msg + + def log_error(self, text = None, context = ''): + if text == None: + text = traceback.format_exc() + date = time.strftime("%Y-%m-%d %H:%M:%S") + if context: + context = " (%s)" % (context,) + sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text)) + return text + + def log_debug(self, text): + if self._log_level == DC.DEBUG_LEVEL: + date = time.strftime("%Y-%m-%d %H:%M:%S") + sys.stderr.write("DEBUG: %s\n%s\n" % (date, text)) + +class Forwarder(object): + def __init__(self, root_dir = "."): + self._ctrl_sock = None + self._root_dir = root_dir + self._stop = False + self._rdbuf = "" + + def forward(self): + self.connect() + print >>sys.stderr, "FORWARDER_READY." + while not self._stop: + data = self.read_data() + if not data: + # Connection to client lost + break + self.send_to_server(data) + + data = self.recv_from_server() + if not data: + # Connection to server lost + raise IOError, "Connection to server lost while "\ + "expecting response" + self.write_data(data) + self.disconnect() + + def read_data(self): + return sys.stdin.readline() + + def write_data(self, data): + sys.stdout.write(data) + # sys.stdout.write is buffered, this is why we need to do a flush() + sys.stdout.flush() + + def send_to_server(self, data): + try: + self._ctrl_sock.send(data) + except (IOError, socket.error), e: + if e[0] == errno.EPIPE: + self.connect() + self._ctrl_sock.send(data) + else: + raise e + encoded = data.rstrip() + msg = base64.b64decode(encoded) + if msg == STOP_MSG: + self._stop = True + + def recv_from_server(self): + data = [self._rdbuf] + chunk = data[0] + while '\n' not in chunk: + try: + chunk = self._ctrl_sock.recv(1024) + except (OSError, socket.error), e: + if e[0] != errno.EINTR: + raise + continue + if chunk: + data.append(chunk) + else: + # empty chunk = EOF + break + data = ''.join(data).split('\n',1) + while len(data) < 2: + data.append('') + data, self._rdbuf = data + + return data+'\n' + + def connect(self): + self.disconnect() + self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock_addr = os.path.join(self._root_dir, CTRL_SOCK) + self._ctrl_sock.connect(sock_addr) + + def disconnect(self): + try: + self._ctrl_sock.close() + except: + pass + diff --git a/src/nepi/resources/omf/omf_api.py b/src/nepi/resources/omf/omf_api.py index 7c71090a..754be530 100644 --- a/src/nepi/resources/omf/omf_api.py +++ b/src/nepi/resources/omf/omf_api.py @@ -4,11 +4,11 @@ import ssl import sys import time import hashlib -import neco +import nepi import threading -from neco.resources.omf.omf_client import OMFClient -from neco.resources.omf.omf_messages_5_4 import MessageHandler +from nepi.resources.omf.omf_client import OMFClient +from nepi.resources.omf.omf_messages_5_4 import MessageHandler class OMFAPI(object): """ @@ -56,8 +56,8 @@ class OMFAPI(object): self._hostnames = [] self._xmpp_root = xmpp_root or "OMF_5.4" - self._logger = logging.getLogger("neco.omf.omfApi ") - self._logger.setLevel(neco.LOGLEVEL) + self._logger = logging.getLogger("nepi.omf.omfApi ") + self._logger.setLevel(nepi.LOGLEVEL) # OMF xmpp client self._client = None diff --git a/src/nepi/resources/omf/omf_application.py b/src/nepi/resources/omf/omf_application.py index 8653fa3b..5196ecd0 100644 --- a/src/nepi/resources/omf/omf_application.py +++ b/src/nepi/resources/omf/omf_application.py @@ -1,10 +1,10 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceManager, clsinit -from neco.execution.attribute import Attribute, Flags -from neco.resources.omf.omf_api import OMFAPIFactory +from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.attribute import Attribute, Flags +from nepi.resources.omf.omf_api import OMFAPIFactory -import neco +import nepi import logging @clsinit @@ -73,8 +73,8 @@ class OMFApplication(ResourceManager): self._omf_api = None - self._logger = logging.getLogger("neco.omf.omfApp ") - self._logger.setLevel(neco.LOGLEVEL) + self._logger = logging.getLogger("nepi.omf.omfApp ") + self._logger.setLevel(nepi.LOGLEVEL) def _validate_connection(self, guid): diff --git a/src/nepi/resources/omf/omf_channel.py b/src/nepi/resources/omf/omf_channel.py index 111af67a..80198968 100644 --- a/src/nepi/resources/omf/omf_channel.py +++ b/src/nepi/resources/omf/omf_channel.py @@ -1,10 +1,10 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceManager, clsinit -from neco.execution.attribute import Attribute, Flags +from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.attribute import Attribute, Flags -from neco.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_api import OMFAPIFactory -import neco +import nepi import logging @clsinit @@ -60,8 +60,8 @@ class OMFChannel(ResourceManager): self._omf_api = None - self._logger = logging.getLogger("neco.omf.omfChannel") - self._logger.setLevel(neco.LOGLEVEL) + self._logger = logging.getLogger("nepi.omf.omfChannel") + self._logger.setLevel(nepi.LOGLEVEL) def _validate_connection(self, guid): """Check if the connection is available. diff --git a/src/nepi/resources/omf/omf_client.py b/src/nepi/resources/omf/omf_client.py index 53dcb39c..e5058f3e 100644 --- a/src/nepi/resources/omf/omf_client.py +++ b/src/nepi/resources/omf/omf_client.py @@ -4,7 +4,7 @@ from sleekxmpp.exceptions import IqError, IqTimeout import traceback import xml.etree.ElementTree as ET -import neco +import nepi # inherit from BaseXmpp and XMLStream classes class OMFClient(sleekxmpp.ClientXMPP): @@ -46,8 +46,8 @@ class OMFClient(sleekxmpp.ClientXMPP): self.add_event_handler("register", self.register) self.add_event_handler("pubsub_publish", self.handle_omf_message) - self._logger = logging.getLogger("neco.omf.xmppClient") - self._logger.setLevel(neco.LOGLEVEL) + self._logger = logging.getLogger("nepi.omf.xmppClient") + self._logger.setLevel(nepi.LOGLEVEL) @property def ready(self): diff --git a/src/nepi/resources/omf/omf_interface.py b/src/nepi/resources/omf/omf_interface.py index 1bc0fc48..36a479f8 100644 --- a/src/nepi/resources/omf/omf_interface.py +++ b/src/nepi/resources/omf/omf_interface.py @@ -1,10 +1,10 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceManager, clsinit -from neco.execution.attribute import Attribute, Flags +from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.attribute import Attribute, Flags -from neco.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_api import OMFAPIFactory -import neco +import nepi import logging @clsinit @@ -69,8 +69,8 @@ class OMFWifiInterface(ResourceManager): self._omf_api = None self._alias = self.get('alias') - self._logger = logging.getLogger("neco.omf.omfIface ") - self._logger.setLevel(neco.LOGLEVEL) + self._logger = logging.getLogger("nepi.omf.omfIface ") + self._logger.setLevel(nepi.LOGLEVEL) def _validate_connection(self, guid): """ Check if the connection is available. diff --git a/src/nepi/resources/omf/omf_node.py b/src/nepi/resources/omf/omf_node.py index d5025a00..1396cdb2 100644 --- a/src/nepi/resources/omf/omf_node.py +++ b/src/nepi/resources/omf/omf_node.py @@ -1,10 +1,10 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceManager, clsinit -from neco.execution.attribute import Attribute, Flags +from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.attribute import Attribute, Flags -from neco.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_api import OMFAPIFactory -import neco +import nepi import logging import time @@ -79,10 +79,10 @@ class OMFNode(ResourceManager): self._omf_api = None - self._logger = logging.getLogger("neco.omf.omfNode ") + self._logger = logging.getLogger("nepi.omf.omfNode ") # XXX: TO DISCUSS - self._logger.setLevel(neco.LOGLEVEL) + self._logger.setLevel(nepi.LOGLEVEL) def _validate_connection(self, guid): """Check if the connection is available. diff --git a/src/nepi/resources/omf/xx_omf_resource.py b/src/nepi/resources/omf/xx_omf_resource.py index 978ac198..0ca5962a 100644 --- a/src/nepi/resources/omf/xx_omf_resource.py +++ b/src/nepi/resources/omf/xx_omf_resource.py @@ -1,8 +1,8 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceManager, clsinit -from neco.execution.attribute import Attribute +from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.attribute import Attribute -from neco.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_api import OMFAPIFactory @clsinit class OMFResource(ResourceManager): diff --git a/src/nepi/util/execfuncs.py b/src/nepi/util/execfuncs.py index 2ff76ee1..7e2d506d 100644 --- a/src/nepi/util/execfuncs.py +++ b/src/nepi/util/execfuncs.py @@ -1,4 +1,4 @@ -from neco.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT +from nepi.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT import subprocess diff --git a/src/nepi/util/parser.py b/src/nepi/util/parser.py index e2c54783..b06cb7e7 100644 --- a/src/nepi/util/parser.py +++ b/src/nepi/util/parser.py @@ -1,4 +1,4 @@ -from neco.design.box import Box +from nepi.design.box import Box from xml.dom import minidom import sys diff --git a/test/design/box.py b/test/design/box.py index 71c8f154..f59c5be0 100755 --- a/test/design/box.py +++ b/test/design/box.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from neco.design.box import Box +from nepi.design.box import Box import unittest diff --git a/test/execution/ec.py b/test/execution/ec.py index 93225406..6c44878f 100755 --- a/test/execution/ec.py +++ b/test/execution/ec.py @@ -1,7 +1,7 @@ #!/usr/bin/env python -from neco.execution.ec import ExperimentController, ECState -from neco.execution.scheduler import TaskStatus +from nepi.execution.ec import ExperimentController, ECState +from nepi.execution.scheduler import TaskStatus import datetime import time diff --git a/test/execution/resource.py b/test/execution/resource.py index 128c6df8..cb9423d9 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -1,7 +1,7 @@ #!/usr/bin/env python -from neco.execution.attribute import Attribute -from neco.execution.ec import ExperimentController -from neco.execution.resource import ResourceManager, ResourceState, clsinit +from nepi.execution.attribute import Attribute +from nepi.execution.ec import ExperimentController +from nepi.execution.resource import ResourceManager, ResourceState, clsinit import time import unittest @@ -27,7 +27,7 @@ class AnotherResource(ResourceManager): class ResourceFactoryTestCase(unittest.TestCase): def test_add_resource_factory(self): - from neco.execution.resource import ResourceFactory + from nepi.execution.resource import ResourceFactory ResourceFactory.register_type(MyResource) ResourceFactory.register_type(AnotherResource) @@ -123,7 +123,7 @@ class ResourceManagerTestCase(unittest.TestCase): - The channel doesn't wait for any other resource to be ready """ - from neco.execution.resource import ResourceFactory + from nepi.execution.resource import ResourceFactory ResourceFactory.register_type(Application) ResourceFactory.register_type(Node) diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py index 7167978e..4f9f90e8 100644 --- a/test/resources/linux/application.py +++ b/test/resources/linux/application.py @@ -1,9 +1,9 @@ #!/usr/bin/env python -from neco.execution.ec import ExperimentController -from neco.execution.resource import ResourceState, ResourceAction -from neco.execution.trace import TraceAttr -from neco.resources.linux.node import LinuxNode -from neco.resources.linux.application import LinuxApplication +from nepi.execution.ec import ExperimentController +from nepi.execution.resource import ResourceState, ResourceAction +from nepi.execution.trace import TraceAttr +from nepi.resources.linux.node import LinuxNode +from nepi.resources.linux.application import LinuxApplication from test_utils import skipIfNotAlive @@ -24,7 +24,7 @@ class LinuxApplicationTestCase(unittest.TestCase): @skipIfNotAlive def t_stdout(self, host, user): - from neco.execution.resource import ResourceFactory + from nepi.execution.resource import ResourceFactory ResourceFactory.register_type(LinuxNode) ResourceFactory.register_type(LinuxApplication) @@ -56,7 +56,7 @@ class LinuxApplicationTestCase(unittest.TestCase): @skipIfNotAlive def t_ping(self, host, user): - from neco.execution.resource import ResourceFactory + from nepi.execution.resource import ResourceFactory ResourceFactory.register_type(LinuxNode) ResourceFactory.register_type(LinuxApplication) @@ -98,7 +98,7 @@ class LinuxApplicationTestCase(unittest.TestCase): @skipIfNotAlive def t_concurrency(self, host, user): - from neco.execution.resource import ResourceFactory + from nepi.execution.resource import ResourceFactory ResourceFactory.register_type(LinuxNode) ResourceFactory.register_type(LinuxApplication) @@ -146,7 +146,7 @@ class LinuxApplicationTestCase(unittest.TestCase): @skipIfNotAlive def t_condition(self, host, user, depends): - from neco.execution.resource import ResourceFactory + from nepi.execution.resource import ResourceFactory ResourceFactory.register_type(LinuxNode) ResourceFactory.register_type(LinuxApplication) @@ -189,7 +189,7 @@ class LinuxApplicationTestCase(unittest.TestCase): @skipIfNotAlive def t_http_sources(self, host, user): - from neco.execution.resource import ResourceFactory + from nepi.execution.resource import ResourceFactory ResourceFactory.register_type(LinuxNode) ResourceFactory.register_type(LinuxApplication) diff --git a/test/resources/linux/interface.py b/test/resources/linux/interface.py index f94c7e6e..44c293af 100644 --- a/test/resources/linux/interface.py +++ b/test/resources/linux/interface.py @@ -1,10 +1,10 @@ #!/usr/bin/env python -from neco.execution.ec import ExperimentController -from neco.execution.resource import ResourceState -from neco.resources.linux.node import LinuxNode -from neco.resources.linux.interface import LinuxInterface -from neco.resources.linux.channel import LinuxChannel -from neco.util.sshfuncs import RUNNING, FINISHED +from nepi.execution.ec import ExperimentController +from nepi.execution.resource import ResourceState +from nepi.resources.linux.node import LinuxNode +from nepi.resources.linux.interface import LinuxInterface +from nepi.resources.linux.channel import LinuxChannel +from nepi.util.sshfuncs import RUNNING, FINISHED from test_utils import skipIfNotAlive @@ -23,7 +23,7 @@ class LinuxInterfaceTestCase(unittest.TestCase): @skipIfNotAlive def t_deploy(self, host, user): - from neco.execution.resource import ResourceFactory + from nepi.execution.resource import ResourceFactory ResourceFactory.register_type(LinuxNode) ResourceFactory.register_type(LinuxInterface) diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index 8849a4cb..a09154bc 100644 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from neco.resources.linux.node import LinuxNode -from neco.util.sshfuncs import RUNNING, FINISHED +from nepi.resources.linux.node import LinuxNode +from nepi.util.sshfuncs import RUNNING, FINISHED from test_utils import skipIfNotAlive, skipInteractive, create_node diff --git a/test/resources/linux/test_utils.py b/test/resources/linux/test_utils.py index 1cf8aba3..d9157073 100644 --- a/test/resources/linux/test_utils.py +++ b/test/resources/linux/test_utils.py @@ -1,4 +1,4 @@ -from neco.resources.linux.node import LinuxNode +from nepi.resources.linux.node import LinuxNode import os diff --git a/test/resources/ns3/ns3wrapper.py b/test/resources/ns3/ns3wrapper.py index de9faa83..815c0f14 100644 --- a/test/resources/ns3/ns3wrapper.py +++ b/test/resources/ns3/ns3wrapper.py @@ -10,7 +10,7 @@ # node n0 sends IGMP traffic to node n3 -from neco.resources.ns3.ns3wrapper import NS3Wrapper +from nepi.resources.ns3.ns3wrapper import NS3Wrapper import os.path import time diff --git a/test/resources/omf/omf_vlc_exp.py b/test/resources/omf/omf_vlc_exp.py index 5c375473..d9ab6c5f 100755 --- a/test/resources/omf/omf_vlc_exp.py +++ b/test/resources/omf/omf_vlc_exp.py @@ -1,15 +1,15 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState -from neco.execution.ec import ExperimentController +from nepi.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState +from nepi.execution.ec import ExperimentController -from neco.resources.omf.omf_node import OMFNode -from neco.resources.omf.omf_application import OMFApplication -from neco.resources.omf.omf_interface import OMFWifiInterface -from neco.resources.omf.omf_channel import OMFChannel -from neco.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_node import OMFNode +from nepi.resources.omf.omf_application import OMFApplication +from nepi.resources.omf.omf_interface import OMFWifiInterface +from nepi.resources.omf.omf_channel import OMFChannel +from nepi.resources.omf.omf_api import OMFAPIFactory -from neco.util import guid -from neco.util.timefuncs import * +from nepi.util import guid +from nepi.util.timefuncs import * import time import unittest diff --git a/test/util/parser.py b/test/util/parser.py index 5ab70fe5..44cac7aa 100755 --- a/test/util/parser.py +++ b/test/util/parser.py @@ -1,7 +1,7 @@ #!/usr/bin/env python -from neco.design.box import Box -from neco.util.parser import XMLParser +from nepi.design.box import Box +from nepi.util.parser import XMLParser import unittest diff --git a/test/util/plot.py b/test/util/plot.py index 20ae5110..806ae5d0 100755 --- a/test/util/plot.py +++ b/test/util/plot.py @@ -1,7 +1,7 @@ #!/usr/bin/env python -from neco.design.box import Box -from neco.util.plot import Plotter +from nepi.design.box import Box +from nepi.util.plot import Plotter import subprocess import unittest diff --git a/test/util/sshfuncs.py b/test/util/sshfuncs.py index c9afb32f..f8602145 100644 --- a/test/util/sshfuncs.py +++ b/test/util/sshfuncs.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from neco.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\ +from nepi.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\ RUNNING, FINISHED import getpass