Replacing string "neco" by "nepi"
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 19 May 2013 20:12:59 +0000 (22:12 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 19 May 2013 20:12:59 +0000 (22:12 +0200)
34 files changed:
examples/automated_vlc_experiment_plexus.py
examples/linux/ccnx/simple_topo.py
examples/linux/scalability.py
examples/manual_vlc_experiment_plexus.py
setup.py
src/nepi/design/box.py
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/channel.py
src/nepi/resources/linux/interface.py [new file with mode: 0644]
src/nepi/resources/linux/node.py
src/nepi/resources/ns3/ns3wrapper_server.py [new file with mode: 0644]
src/nepi/resources/omf/omf_api.py
src/nepi/resources/omf/omf_application.py
src/nepi/resources/omf/omf_channel.py
src/nepi/resources/omf/omf_client.py
src/nepi/resources/omf/omf_interface.py
src/nepi/resources/omf/omf_node.py
src/nepi/resources/omf/xx_omf_resource.py
src/nepi/util/execfuncs.py
src/nepi/util/parser.py
test/design/box.py
test/execution/ec.py
test/execution/resource.py
test/resources/linux/application.py
test/resources/linux/interface.py
test/resources/linux/node.py
test/resources/linux/test_utils.py
test/resources/ns3/ns3wrapper.py
test/resources/omf/omf_vlc_exp.py
test/util/parser.py
test/util/plot.py
test/util/sshfuncs.py

index 1cae09a..b9d7e35 100644 (file)
@@ -1,11 +1,11 @@
 #!/usr/bin/env python
-from neco.execution.resource import ResourceFactory, ResourceAction, ResourceState
-from neco.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
 
-from neco.resources.omf.omf_node import OMFNode
-from neco.resources.omf.omf_application import OMFApplication
-from neco.resources.omf.omf_interface import OMFWifiInterface
-from neco.resources.omf.omf_channel import OMFChannel
+from nepi.resources.omf.omf_node import OMFNode
+from nepi.resources.omf.omf_application import OMFApplication
+from nepi.resources.omf.omf_interface import OMFWifiInterface
+from nepi.resources.omf.omf_channel import OMFChannel
 
 import logging
 import time
index 65faf71..ee41017 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
-from neco.execution.ec import ExperimentController, ECState 
-from neco.execution.resource import ResourceState, ResourceAction, \
+from nepi.execution.ec import ExperimentController, ECState 
+from nepi.execution.resource import ResourceState, ResourceAction, \
         populate_factory
 
 from optparse import OptionParser, SUPPRESS_HELP
index 110405a..903e6b9 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
-from neco.execution.ec import ExperimentController, ECState 
-from neco.execution.resource import ResourceState, ResourceAction, \
+from nepi.execution.ec import ExperimentController, ECState 
+from nepi.execution.resource import ResourceState, ResourceAction, \
         populate_factory
 
 from optparse import OptionParser, SUPPRESS_HELP
index ebdd8e1..fc449c7 100644 (file)
@@ -1,11 +1,11 @@
 #!/usr/bin/env python
-from neco.execution.resource import ResourceFactory
-from neco.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceFactory
+from nepi.execution.ec import ExperimentController
 
-from neco.resources.omf.omf_node import OMFNode
-from neco.resources.omf.omf_application import OMFApplication
-from neco.resources.omf.omf_interface import OMFWifiInterface
-from neco.resources.omf.omf_channel import OMFChannel
+from nepi.resources.omf.omf_node import OMFNode
+from nepi.resources.omf.omf_application import OMFApplication
+from nepi.resources.omf.omf_interface import OMFWifiInterface
+from nepi.resources.omf.omf_channel import OMFChannel
 
 import logging
 import time
index 0be05ec..7bae51b 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -3,23 +3,23 @@ from distutils.core import setup
 import sys
 
 setup(
-        name        = "neco",
-        version     = "0.01",
+        name        = "nepi",
+        version     = "3.0",
         description = "Network Experiment Controller",
         author      = "Alina Quereilhac",
         url         = "",
         license     = "GPLv2",
         platforms   = "Linux",
         packages    = [
-            "neco",
-            "neco.design",
-            "neco.execution",
-            "neco.resources",
-            "neco.resources.linux",
-            "neco.resources.netns",
-            "neco.resources.ns3",
-            "neco.resources.omf",
-            "neco.resources.planetlab",
-            "neco.util"],
+            "nepi",
+            "nepi.design",
+            "nepi.execution",
+            "nepi.resources",
+            "nepi.resources.linux",
+            "nepi.resources.netns",
+            "nepi.resources.ns3",
+            "nepi.resources.omf",
+            "nepi.resources.planetlab",
+            "nepi.util"],
         package_dir = {"": "src"},
     )
index 121827a..82bb899 100644 (file)
@@ -1,4 +1,4 @@
-from neco.util import guid
+from nepi.util import guid
 
 guid_gen = guid.GuidGenerator()
 
index 26f0cd4..0e76650 100644 (file)
@@ -5,13 +5,13 @@ import sys
 import time
 import threading
 
-from neco.util import guid
-from neco.util.parallel import ParallelRun
-from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
-from neco.execution.resource import ResourceFactory, ResourceAction, \
+from nepi.util import guid
+from nepi.util.parallel import ParallelRun
+from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
+from nepi.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState
-from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
-from neco.execution.trace import TraceAttr
+from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
+from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
index a7e7758..dfa219e 100644 (file)
@@ -1,5 +1,5 @@
-from neco.util.timefuncs import strfnow, strfdiff, strfvalid
-from neco.execution.trace import TraceAttr
+from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.execution.trace import TraceAttr
 
 import copy
 import functools
@@ -572,8 +572,8 @@ def find_types():
     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
     search_path = set(search_path.split(" "))
    
-    import neco.resources 
-    path = os.path.dirname(neco.resources.__file__)
+    import nepi.resources 
+    path = os.path.dirname(nepi.resources.__file__)
     search_path.add(path)
 
     types = []
index 882fb4c..0364939 100644 (file)
@@ -1,9 +1,9 @@
-from neco.execution.attribute import Attribute, Flags, Types
-from neco.execution.trace import Trace, TraceAttr
-from neco.execution.resource import ResourceManager, clsinit, ResourceState
-from neco.resources.linux.node import LinuxNode
-from neco.util import sshfuncs 
-from neco.util.timefuncs import strfnow, strfdiff
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.trace import Trace, TraceAttr
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.resources.linux.node import LinuxNode
+from nepi.util import sshfuncs 
+from nepi.util.timefuncs import strfnow, strfdiff
 
 import logging
 import os
index f4c1cf2..142478b 100644 (file)
@@ -1,6 +1,6 @@
-from neco.execution.attribute import Attribute, Flags
-from neco.execution.resource import ResourceManager, clsinit, ResourceState
-from neco.resources.linux.node import LinuxNode
+from nepi.execution.attribute import Attribute, Flags
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.resources.linux.node import LinuxNode
 
 import collections
 import logging
diff --git a/src/nepi/resources/linux/interface.py b/src/nepi/resources/linux/interface.py
new file mode 100644 (file)
index 0000000..f572766
--- /dev/null
@@ -0,0 +1,296 @@
+from nepi.execution.attribute import Attribute, Types, Flags
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.resources.linux.node import LinuxNode
+from nepi.resources.linux.channel import LinuxChannel
+
+import collections
+import logging
+import os
+import random
+import re
+import tempfile
+import time
+
+# TODO: UP, MTU attributes!
+
+reschedule_delay = "0.5s"
+
+@clsinit
+class LinuxInterface(ResourceManager):
+    _rtype = "LinuxInterface"
+
+    @classmethod
+    def _register_attributes(cls):
+        ip4 = Attribute("ip4", "IPv4 Address",
+                flags = Flags.ExecReadOnly)
+
+        ip6 = Attribute("ip6", "IPv6 Address",
+                flags = Flags.ExecReadOnly)
+
+        mac = Attribute("mac", "MAC Address",
+                flags = Flags.ExecReadOnly)
+
+        mask4 = Attribute("mask4", "IPv4 network mask",
+                flags = Flags.ExecReadOnly)
+
+        mask6 = Attribute("mask6", "IPv6 network mask",
+                type = Types.Integer,
+                flags = Flags.ExecReadOnly)
+
+        mtu = Attribute("mtu", "Maximum transmition unit for device",
+            type = Types.Integer)
+
+        devname = Attribute("deviceName", 
+                "Name of the network interface (e.g. eth0, wlan0, etc)",
+                flags = Flags.ExecReadOnly)
+
+        up = Attribute("up", "Link up", type = Types.Bool)
+
+        tear_down = Attribute("tearDown", "Bash script to be executed before " + \
+                "releasing the resource",
+                flags = Flags.ExecReadOnly)
+
+        cls._register_attribute(ip4)
+        cls._register_attribute(ip6)
+        cls._register_attribute(mac)
+        cls._register_attribute(mask4)
+        cls._register_attribute(mask6)
+        cls._register_attribute(mtu)
+        cls._register_attribute(devname)
+        cls._register_attribute(up)
+        cls._register_attribute(tear_down)
+
+    def __init__(self, ec, guid):
+        super(LinuxInterface, self).__init__(ec, guid)
+        self._configured = False
+
+        self._logger = logging.getLogger("LinuxInterface")
+        
+        self.add_set_hooks()
+
+    def log_message(self, msg):
+        return " guid %d - host %s - %s " % (self.guid, 
+                self.node.get("hostname"), msg)
+
+    @property
+    def node(self):
+        node = self.get_connected(LinuxNode.rtype())
+        if node: return node[0]
+        return None
+
+    @property
+    def channel(self):
+        chan = self.get_connected(LinuxChannel.rtype())
+        if chan: return chan[0]
+        return None
+
+    def discover(self, filters = None):
+        devname = self.get("deviceName")
+        ip4 = self.get("ip4")
+        ip6 = self.get("ip4")
+        mac = self.get("mac")
+        mask4 = self.get("mask4")
+        mask6 = self.get("mask6")
+        mtu = self.get("mtu")
+
+        # Get current interfaces information
+        (out, err), proc = self.node.execute("ifconfig", sudo = True)
+
+        if err and proc.poll():
+            msg = " Error retrieving interface information "
+            self.error(msg, out, err)
+            raise RuntimeError, "%s - %s - %s" % (msg, out, err)
+        
+        # Check if an interface is found matching the RM attributes
+        ifaces = out.split("\n\n")
+
+        for i in ifaces:
+            m = re.findall("(\w+)\s+Link\s+encap:\w+(\s+HWaddr\s+(([0-9a-fA-F]{2}:?){6}))?(\s+inet\s+addr:((\d+\.?){4}).+Mask:(\d+\.\d+\.\d+\.\d+))?(.+inet6\s+addr:\s+([0-9a-fA-F:.]+)/(\d+))?(.+(UP))?(.+MTU:(\d+))?", i, re.DOTALL)
+            
+            m = m[0]
+            dn = m[0]
+            mc = m[2]
+            i4 = m[5]
+            msk4 = m[7]
+            i6 = m[9]
+            msk6 = m[10]
+            up = True if m[12] else False
+            mu = m[14]
+
+            self.debug("Found interface %(devname)s with MAC %(mac)s,"
+                    "IPv4 %(ipv4)s %(mask4)s IPv6 %(ipv6)s/%(mask6)s %(up)s %(mtu)s" % ({
+                'devname': dn,
+                'mac': mc,
+                'ipv4': i4,
+                'mask4': msk4,
+                'ipv6': i6,
+                'mask6': msk6,
+                'up': up,
+                'mtu': mu
+                }) )
+
+            # If the user didn't provide information we take the first 
+            # interface that is UP
+            if not devname and not ip4 and not ip6 and up:
+                self._configured = True
+                self.load_configuration(dn, mc, i4, msk4, i6, msk6, mu, up)
+                break
+
+            # If the user provided ipv4 or ipv6 matching that of an interface
+            # load the interface info
+            if (ip4 and ip4 == i4) or (ip6 and ip6 == i6):
+                self._configured = True
+                self.load_configuration(dn, mc, i4, msk4, i6, msk6, mu, up)
+                break
+
+            # If the user provided the device name we load the associated info
+            if devname and devname == dn:
+                if ((ip4 and ip4 == i4) and (ipv6 and ip6 == i6)) or \
+                        not (ip4 or ip6):
+                    self._configured = True
+               
+                # If the user gave a different ip than the existing, asume ip 
+                # needs to be changed
+                i4 = ip4 or i4
+                i6 = ip6 or i6
+                mu = mtu or mu 
+
+                self.load_configuration(dn, mc, i4, msk4, i6, msk6, mu, up)
+                break
+       
+        if not self.get("deviceName"):
+            msg = "Unable to resolve interface "
+            self.error(msg)
+            raise RuntimeError, msg
+
+        super(LinuxInterface, self).discover(filters = filters)
+
+    def provision(self, filters = None):
+        devname = self.get("deviceName")
+        ip4 = self.get("ip4")
+        ip6 = self.get("ip4")
+        mac = self.get("mac")
+        mask4 = self.get("mask4")
+        mask6 = self.get("mask6")
+        mtu = self.get("mtu")
+
+        # Must configure interface if configuration is required
+        if not self._configured:
+            cmd = "ifconfig %s" % devname
+
+            if ip4 and mask4:
+                cmd += " %(ip4)s netmask %(mask4)s broadcast %(bcast)s up" % ({
+                    'ip4': ip4,
+                    'mask4': mask4,
+                    'bcast': bcast})
+            if mtu:
+                cmd += " mtu %d " % mtu
+
+            (out, err), proc = self.node.execute(cmd, sudo = True)
+
+            if err and proc.poll():
+                msg = "Error configuring interface with command '%s'" % cmd
+                self.error(msg, out, err)
+                raise RuntimeError, "%s - %s - %s" % (msg, out, err)
+
+            if ip6 and mask6:
+                cmd = "ifconfig %(devname)s inet6 add %(ip6)s/%(mask6)d" % ({
+                        'devname': devname,
+                        'ip6': ip6,
+                        'mask6': mask6})
+
+            (out, err), proc = self.node.execute(cmd, sudo = True)
+
+            if err and proc.poll():
+                msg = "Error seting ipv6 for interface using command '%s' " % cmd
+                self.error(msg, out, err)
+                raise RuntimeError, "%s - %s - %s" % (msg, out, err)
+
+        super(LinuxInterface, self).provision(filters = filters)
+
+    def deploy(self):
+        # Wait until node is provisioned
+        node = self.node
+        chan = self.channel
+
+        if not node or node.state < ResourceState.PROVISIONED:
+            self.ec.schedule(reschedule_delay, self.deploy)
+        elif not chan or chan.state < ResourceState.READY:
+            self.ec.schedule(reschedule_delay, self.deploy)
+        else:
+            # Verify if the interface exists in node. If not, configue
+            # if yes, load existing configuration
+            try:
+                self.discover()
+                self.provision()
+            except:
+                self._state = ResourceState.FAILED
+                raise
+
+            super(LinuxInterface, self).deploy()
+
+    def release(self):
+        tear_down = self.get("tearDown")
+        if tear_down:
+            self.execute(tear_down)
+
+        super(LinuxInterface, self).release()
+
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+
+    def load_configuration(self, devname, mac, ip4, mask4, ip6, mask6, mtu, up):
+        self.set("deviceName", devname)
+        self.set("mac", mac)
+        self.set("ip4", ip4)
+        self.set("mask4", mask4)
+        self.set("ip6", ip6)
+        self.set("mask6", mask6)
+
+        # set the following without validating or triggering hooks
+        attr = self._attrs["up"]
+        attr._value = up
+        attr = self._attrs["mtu"]
+
+    def add_set_hooks(self):
+        attrup = self._attrs["up"]
+        attrup.set_hook = self.set_hook_up
+
+        attrmtu = self._attrs["mtu"]
+        attrmtu.set_hook = self.set_hook_mtu
+
+    def set_hook_up(self, oldval, newval):
+        if oldval == newval:
+            return oldval
+
+        # configure interface up
+        if newval == True:
+            cmd = "ifup %s" % self.get("deviceName")
+        elif newval == False:
+            cmd = "ifdown %s" % self.get("deviceName")
+
+        (out, err), proc = self.node.execute(cmd, sudo = True)
+
+        if err and proc.poll():
+            msg = "Error setting interface up/down using command '%s' " % cmd
+            self.error(msg, err, out)
+            return oldval
+        
+        return newval
+
+    def set_hook_mtu(self, oldval, newval):
+        if oldval == newval:
+            return oldval
+
+        cmd = "ifconfig %s mtu %d" % (self.get("deviceName"), newval)
+
+        (out, err), proc = self.node.execute(cmd, sudo = True)
+
+        if err and proc.poll():
+            msg = "Error setting interface MTU using command '%s' " % cmd
+            self.error(msg, err, out)
+            return  oldval
+        
+        return newval
+
index 3f30633..85e2286 100644 (file)
@@ -1,7 +1,7 @@
-from neco.execution.attribute import Attribute, Flags
-from neco.execution.resource import ResourceManager, clsinit, ResourceState
-from neco.resources.linux import rpmfuncs, debfuncs 
-from neco.util import sshfuncs, execfuncs 
+from nepi.execution.attribute import Attribute, Flags
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.resources.linux import rpmfuncs, debfuncs 
+from nepi.util import sshfuncs, execfuncs 
 
 import collections
 import logging
@@ -156,7 +156,7 @@ class LinuxNode(ResourceManager):
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
-        from neco.resources.linux.interface import LinuxInterface
+        from nepi.resources.linux.interface import LinuxInterface
         ifaces = self.get_connected(LinuxInterface.rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
diff --git a/src/nepi/resources/ns3/ns3wrapper_server.py b/src/nepi/resources/ns3/ns3wrapper_server.py
new file mode 100644 (file)
index 0000000..5a82909
--- /dev/null
@@ -0,0 +1,344 @@
+class Server(object):
+    def __init__(self, root_dir = ".", log_level = "ERROR", 
+            environment_setup = "", clean_root = False):
+        self._root_dir = root_dir
+        self._clean_root = clean_root
+        self._stop = False
+        self._ctrl_sock = None
+        self._log_level = log_level
+        self._rdbuf = ""
+        self._environment_setup = environment_setup
+
+    def run(self):
+        try:
+            if self.daemonize():
+                self.post_daemonize()
+                self.loop()
+                self.cleanup()
+                # ref: "os._exit(0)"
+                # can not return normally after fork beacuse no exec was done.
+                # This means that if we don't do a os._exit(0) here the code that 
+                # follows the call to "Server.run()" in the "caller code" will be 
+                # executed... but by now it has already been executed after the 
+                # first process (the one that did the first fork) returned.
+                os._exit(0)
+        except:
+            print >>sys.stderr, "SERVER_ERROR."
+            self.log_error()
+            self.cleanup()
+            os._exit(0)
+        print >>sys.stderr, "SERVER_READY."
+
+    def daemonize(self):
+        # pipes for process synchronization
+        (r, w) = os.pipe()
+        
+        # build root folder
+        root = os.path.normpath(self._root_dir)
+        if self._root_dir not in [".", ""] and os.path.exists(root) \
+                and self._clean_root:
+            shutil.rmtree(root)
+        if not os.path.exists(root):
+            os.makedirs(root, 0755)
+
+        pid1 = os.fork()
+        if pid1 > 0:
+            os.close(w)
+            while True:
+                try:
+                    os.read(r, 1)
+                except OSError, e: # pragma: no cover
+                    if e.errno == errno.EINTR:
+                        continue
+                    else:
+                        raise
+                break
+            os.close(r)
+            # os.waitpid avoids leaving a <defunc> (zombie) process
+            st = os.waitpid(pid1, 0)[1]
+            if st:
+                raise RuntimeError("Daemonization failed")
+            # return 0 to inform the caller method that this is not the 
+            # daemonized process
+            return 0
+        os.close(r)
+
+        # Decouple from parent environment.
+        os.chdir(self._root_dir)
+        os.umask(0)
+        os.setsid()
+
+        # fork 2
+        pid2 = os.fork()
+        if pid2 > 0:
+            # see ref: "os._exit(0)"
+            os._exit(0)
+
+        # close all open file descriptors.
+        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+        if (max_fd == resource.RLIM_INFINITY):
+            max_fd = MAX_FD
+        for fd in range(3, max_fd):
+            if fd != w:
+                try:
+                    os.close(fd)
+                except OSError:
+                    pass
+
+        # Redirect standard file descriptors.
+        stdin = open(DEV_NULL, "r")
+        stderr = stdout = open(STD_ERR, "a", 0)
+        os.dup2(stdin.fileno(), sys.stdin.fileno())
+        # NOTE: sys.stdout.write will still be buffered, even if the file
+        # was opened with 0 buffer
+        os.dup2(stdout.fileno(), sys.stdout.fileno())
+        os.dup2(stderr.fileno(), sys.stderr.fileno())
+        
+        # setup environment
+        if self._environment_setup:
+            # parse environment variables and pass to child process
+            # do it by executing shell commands, in case there's some heavy setup involved
+            envproc = subprocess.Popen(
+                [ "bash", "-c", 
+                    "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
+                        ( self._environment_setup, ) ],
+                stdin = subprocess.PIPE, 
+                stdout = subprocess.PIPE,
+                stderr = subprocess.PIPE
+            )
+            out,err = envproc.communicate()
+
+            # parse new environment
+            if out:
+                environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
+            
+                # apply to current environment
+                for name, value in environment.iteritems():
+                    os.environ[name] = value
+                
+                # apply pythonpath
+                if 'PYTHONPATH' in environment:
+                    sys.path = environment['PYTHONPATH'].split(':') + sys.path
+
+        # create control socket
+        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            self._ctrl_sock.bind(CTRL_SOCK)
+        except socket.error:
+            # Address in use, check pidfile
+            pid = None
+            try:
+                pidfile = open(CTRL_PID, "r")
+                pid = pidfile.read()
+                pidfile.close()
+                pid = int(pid)
+            except:
+                # no pidfile
+                pass
+            
+            if pid is not None:
+                # Check process liveliness
+                if not os.path.exists("/proc/%d" % (pid,)):
+                    # Ok, it's dead, clean the socket
+                    os.remove(CTRL_SOCK)
+            
+            # try again
+            self._ctrl_sock.bind(CTRL_SOCK)
+            
+        self._ctrl_sock.listen(0)
+        
+        # Save pidfile
+        pidfile = open(CTRL_PID, "w")
+        pidfile.write(str(os.getpid()))
+        pidfile.close()
+
+        # let the parent process know that the daemonization is finished
+        os.write(w, "\n")
+        os.close(w)
+        return 1
+
+    def post_daemonize(self):
+        os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
+        # QT, for some strange reason, redefines the SIGCHILD handler to write
+        # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
+        # Server dameonization closes all file descriptors from fileno '3',
+        # but the overloaded handler (inherited by the forked process) will
+        # keep trying to write the \0 to fileno 'x', which might have been reused 
+        # after closing, for other operations. This is bad bad bad when fileno 'x'
+        # is in use for communication pouroses, because unexpected \0 start
+        # appearing in the communication messages... this is exactly what happens 
+        # when using netns in daemonized form. Thus, be have no other alternative than
+        # restoring the SIGCHLD handler to the default here.
+        import signal
+        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
+
+    def loop(self):
+        while not self._stop:
+            conn, addr = self._ctrl_sock.accept()
+            self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
+            conn.settimeout(5)
+            while not self._stop:
+                try:
+                    msg = self.recv_msg(conn)
+                except socket.timeout, e:
+                    #self.log_error("SERVER recv_msg: connection timedout ")
+                    continue
+                
+                if not msg:
+                    self.log_error("CONNECTION LOST")
+                    break
+                    
+                if msg == STOP_MSG:
+                    self._stop = True
+                    reply = self.stop_action()
+                else:
+                    reply = self.reply_action(msg)
+                
+                try:
+                    self.send_reply(conn, reply)
+                except socket.error:
+                    self.log_error()
+                    self.log_error("NOTICE: Awaiting for reconnection")
+                    break
+            try:
+                conn.close()
+            except:
+                # Doesn't matter
+                self.log_error()
+
+    def recv_msg(self, conn):
+        data = [self._rdbuf]
+        chunk = data[0]
+        while '\n' not in chunk:
+            try:
+                chunk = conn.recv(1024)
+            except (OSError, socket.error), e:
+                if e[0] != errno.EINTR:
+                    raise
+                else:
+                    continue
+            if chunk:
+                data.append(chunk)
+            else:
+                # empty chunk = EOF
+                break
+        data = ''.join(data).split('\n',1)
+        while len(data) < 2:
+            data.append('')
+        data, self._rdbuf = data
+        
+        decoded = base64.b64decode(data)
+        return decoded.rstrip()
+
+    def send_reply(self, conn, reply):
+        encoded = base64.b64encode(reply)
+        conn.send("%s\n" % encoded)
+       
+    def cleanup(self):
+        try:
+            self._ctrl_sock.close()
+            os.remove(CTRL_SOCK)
+        except:
+            self.log_error()
+
+    def stop_action(self):
+        return "Stopping server"
+
+    def reply_action(self, msg):
+        return "Reply to: %s" % msg
+
+    def log_error(self, text = None, context = ''):
+        if text == None:
+            text = traceback.format_exc()
+        date = time.strftime("%Y-%m-%d %H:%M:%S")
+        if context:
+            context = " (%s)" % (context,)
+        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
+        return text
+
+    def log_debug(self, text):
+        if self._log_level == DC.DEBUG_LEVEL:
+            date = time.strftime("%Y-%m-%d %H:%M:%S")
+            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
+
+class Forwarder(object):
+    def __init__(self, root_dir = "."):
+        self._ctrl_sock = None
+        self._root_dir = root_dir
+        self._stop = False
+        self._rdbuf = ""
+
+    def forward(self):
+        self.connect()
+        print >>sys.stderr, "FORWARDER_READY."
+        while not self._stop:
+            data = self.read_data()
+            if not data:
+                # Connection to client lost
+                break
+            self.send_to_server(data)
+            
+            data = self.recv_from_server()
+            if not data:
+                # Connection to server lost
+                raise IOError, "Connection to server lost while "\
+                    "expecting response"
+            self.write_data(data)
+        self.disconnect()
+
+    def read_data(self):
+        return sys.stdin.readline()
+
+    def write_data(self, data):
+        sys.stdout.write(data)
+        # sys.stdout.write is buffered, this is why we need to do a flush()
+        sys.stdout.flush()
+
+    def send_to_server(self, data):
+        try:
+            self._ctrl_sock.send(data)
+        except (IOError, socket.error), e:
+            if e[0] == errno.EPIPE:
+                self.connect()
+                self._ctrl_sock.send(data)
+            else:
+                raise e
+        encoded = data.rstrip() 
+        msg = base64.b64decode(encoded)
+        if msg == STOP_MSG:
+            self._stop = True
+
+    def recv_from_server(self):
+        data = [self._rdbuf]
+        chunk = data[0]
+        while '\n' not in chunk:
+            try:
+                chunk = self._ctrl_sock.recv(1024)
+            except (OSError, socket.error), e:
+                if e[0] != errno.EINTR:
+                    raise
+                continue
+            if chunk:
+                data.append(chunk)
+            else:
+                # empty chunk = EOF
+                break
+        data = ''.join(data).split('\n',1)
+        while len(data) < 2:
+            data.append('')
+        data, self._rdbuf = data
+        
+        return data+'\n'
+    def connect(self):
+        self.disconnect()
+        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
+        self._ctrl_sock.connect(sock_addr)
+
+    def disconnect(self):
+        try:
+            self._ctrl_sock.close()
+        except:
+            pass
+
index 7c71090..754be53 100644 (file)
@@ -4,11 +4,11 @@ import ssl
 import sys
 import time
 import hashlib
-import neco
+import nepi
 import threading
 
-from neco.resources.omf.omf_client import OMFClient
-from neco.resources.omf.omf_messages_5_4 import MessageHandler
+from nepi.resources.omf.omf_client import OMFClient
+from nepi.resources.omf.omf_messages_5_4 import MessageHandler
 
 class OMFAPI(object):
     """
@@ -56,8 +56,8 @@ class OMFAPI(object):
         self._hostnames = []
         self._xmpp_root = xmpp_root or "OMF_5.4"
 
-        self._logger = logging.getLogger("neco.omf.omfApi    ")
-        self._logger.setLevel(neco.LOGLEVEL)
+        self._logger = logging.getLogger("nepi.omf.omfApi    ")
+        self._logger.setLevel(nepi.LOGLEVEL)
 
         # OMF xmpp client
         self._client = None
index 8653fa3..5196ecd 100644 (file)
@@ -1,10 +1,10 @@
 #!/usr/bin/env python
 
-from neco.execution.resource import ResourceManager, clsinit
-from neco.execution.attribute import Attribute, Flags 
-from neco.resources.omf.omf_api import OMFAPIFactory
+from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.attribute import Attribute, Flags 
+from nepi.resources.omf.omf_api import OMFAPIFactory
 
-import neco
+import nepi
 import logging
 
 @clsinit
@@ -73,8 +73,8 @@ class OMFApplication(ResourceManager):
 
         self._omf_api = None
 
-        self._logger = logging.getLogger("neco.omf.omfApp    ")
-        self._logger.setLevel(neco.LOGLEVEL)
+        self._logger = logging.getLogger("nepi.omf.omfApp    ")
+        self._logger.setLevel(nepi.LOGLEVEL)
 
 
     def _validate_connection(self, guid):
index 111af67..8019896 100644 (file)
@@ -1,10 +1,10 @@
 #!/usr/bin/env python
-from neco.execution.resource import ResourceManager, clsinit
-from neco.execution.attribute import Attribute, Flags 
+from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.attribute import Attribute, Flags 
 
-from neco.resources.omf.omf_api import OMFAPIFactory
+from nepi.resources.omf.omf_api import OMFAPIFactory
 
-import neco
+import nepi
 import logging
 
 @clsinit
@@ -60,8 +60,8 @@ class OMFChannel(ResourceManager):
 
         self._omf_api = None
 
-        self._logger = logging.getLogger("neco.omf.omfChannel")
-        self._logger.setLevel(neco.LOGLEVEL)
+        self._logger = logging.getLogger("nepi.omf.omfChannel")
+        self._logger.setLevel(nepi.LOGLEVEL)
 
     def _validate_connection(self, guid):
         """Check if the connection is available.
index 53dcb39..e5058f3 100644 (file)
@@ -4,7 +4,7 @@ from sleekxmpp.exceptions import IqError, IqTimeout
 import traceback
 import xml.etree.ElementTree as ET
 
-import neco
+import nepi
 
 # inherit from BaseXmpp and XMLStream classes
 class OMFClient(sleekxmpp.ClientXMPP): 
@@ -46,8 +46,8 @@ class OMFClient(sleekxmpp.ClientXMPP):
         self.add_event_handler("register", self.register)
         self.add_event_handler("pubsub_publish", self.handle_omf_message)
         
-        self._logger = logging.getLogger("neco.omf.xmppClient")
-        self._logger.setLevel(neco.LOGLEVEL)
+        self._logger = logging.getLogger("nepi.omf.xmppClient")
+        self._logger.setLevel(nepi.LOGLEVEL)
 
     @property
     def ready(self):
index 1bc0fc4..36a479f 100644 (file)
@@ -1,10 +1,10 @@
 #!/usr/bin/env python
-from neco.execution.resource import ResourceManager, clsinit
-from neco.execution.attribute import Attribute, Flags 
+from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.attribute import Attribute, Flags 
 
-from neco.resources.omf.omf_api import OMFAPIFactory
+from nepi.resources.omf.omf_api import OMFAPIFactory
 
-import neco
+import nepi
 import logging
 
 @clsinit
@@ -69,8 +69,8 @@ class OMFWifiInterface(ResourceManager):
         self._omf_api = None
         self._alias = self.get('alias')
 
-        self._logger = logging.getLogger("neco.omf.omfIface  ")
-        self._logger.setLevel(neco.LOGLEVEL)
+        self._logger = logging.getLogger("nepi.omf.omfIface  ")
+        self._logger.setLevel(nepi.LOGLEVEL)
 
     def _validate_connection(self, guid):
         """ Check if the connection is available.
index d5025a0..1396cdb 100644 (file)
@@ -1,10 +1,10 @@
 #!/usr/bin/env python
-from neco.execution.resource import ResourceManager, clsinit
-from neco.execution.attribute import Attribute, Flags 
+from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.attribute import Attribute, Flags 
 
-from neco.resources.omf.omf_api import OMFAPIFactory
+from nepi.resources.omf.omf_api import OMFAPIFactory
 
-import neco
+import nepi
 import logging
 import time
 
@@ -79,10 +79,10 @@ class OMFNode(ResourceManager):
 
         self._omf_api = None 
 
-        self._logger = logging.getLogger("neco.omf.omfNode   ")
+        self._logger = logging.getLogger("nepi.omf.omfNode   ")
 
         # XXX: TO DISCUSS
-        self._logger.setLevel(neco.LOGLEVEL)
+        self._logger.setLevel(nepi.LOGLEVEL)
 
     def _validate_connection(self, guid):
         """Check if the connection is available.
index 978ac19..0ca5962 100644 (file)
@@ -1,8 +1,8 @@
 #!/usr/bin/env python
-from neco.execution.resource import ResourceManager, clsinit
-from neco.execution.attribute import Attribute
+from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.attribute import Attribute
 
-from neco.resources.omf.omf_api import OMFAPIFactory
+from nepi.resources.omf.omf_api import OMFAPIFactory
 
 @clsinit
 class OMFResource(ResourceManager):
index 2ff76ee..7e2d506 100644 (file)
@@ -1,4 +1,4 @@
-from neco.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT 
+from nepi.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT 
 
 import subprocess
 
index e2c5478..b06cb7e 100644 (file)
@@ -1,4 +1,4 @@
-from neco.design.box import Box
+from nepi.design.box import Box
 
 from xml.dom import minidom
 import sys
index 71c8f15..f59c5be 100755 (executable)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-from neco.design.box import Box 
+from nepi.design.box import Box 
 
 import unittest
 
index 9322540..6c44878 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 
-from neco.execution.ec import ExperimentController, ECState 
-from neco.execution.scheduler import TaskStatus
+from nepi.execution.ec import ExperimentController, ECState 
+from nepi.execution.scheduler import TaskStatus
 
 import datetime
 import time
index 128c6df..cb9423d 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
-from neco.execution.attribute import Attribute
-from neco.execution.ec import ExperimentController 
-from neco.execution.resource import ResourceManager, ResourceState, clsinit
+from nepi.execution.attribute import Attribute
+from nepi.execution.ec import ExperimentController 
+from nepi.execution.resource import ResourceManager, ResourceState, clsinit
 
 import time
 import unittest
@@ -27,7 +27,7 @@ class AnotherResource(ResourceManager):
      
 class ResourceFactoryTestCase(unittest.TestCase):
     def test_add_resource_factory(self):
-        from neco.execution.resource import ResourceFactory
+        from nepi.execution.resource import ResourceFactory
 
         ResourceFactory.register_type(MyResource)
         ResourceFactory.register_type(AnotherResource)
@@ -123,7 +123,7 @@ class ResourceManagerTestCase(unittest.TestCase):
          - The channel doesn't wait for any other resource to be ready
 
         """
-        from neco.execution.resource import ResourceFactory
+        from nepi.execution.resource import ResourceFactory
         
         ResourceFactory.register_type(Application)
         ResourceFactory.register_type(Node)
index 7167978..4f9f90e 100644 (file)
@@ -1,9 +1,9 @@
 #!/usr/bin/env python
-from neco.execution.ec import ExperimentController 
-from neco.execution.resource import ResourceState, ResourceAction
-from neco.execution.trace import TraceAttr
-from neco.resources.linux.node import LinuxNode
-from neco.resources.linux.application import LinuxApplication
+from nepi.execution.ec import ExperimentController 
+from nepi.execution.resource import ResourceState, ResourceAction
+from nepi.execution.trace import TraceAttr
+from nepi.resources.linux.node import LinuxNode
+from nepi.resources.linux.application import LinuxApplication
 
 from test_utils import skipIfNotAlive
 
@@ -24,7 +24,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
     @skipIfNotAlive
     def t_stdout(self, host, user):
-        from neco.execution.resource import ResourceFactory
+        from nepi.execution.resource import ResourceFactory
         
         ResourceFactory.register_type(LinuxNode)
         ResourceFactory.register_type(LinuxApplication)
@@ -56,7 +56,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
     @skipIfNotAlive
     def t_ping(self, host, user):
-        from neco.execution.resource import ResourceFactory
+        from nepi.execution.resource import ResourceFactory
         
         ResourceFactory.register_type(LinuxNode)
         ResourceFactory.register_type(LinuxApplication)
@@ -98,7 +98,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
     @skipIfNotAlive
     def t_concurrency(self, host, user):
-        from neco.execution.resource import ResourceFactory
+        from nepi.execution.resource import ResourceFactory
         
         ResourceFactory.register_type(LinuxNode)
         ResourceFactory.register_type(LinuxApplication)
@@ -146,7 +146,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
     @skipIfNotAlive
     def t_condition(self, host, user, depends):
-        from neco.execution.resource import ResourceFactory
+        from nepi.execution.resource import ResourceFactory
         
         ResourceFactory.register_type(LinuxNode)
         ResourceFactory.register_type(LinuxApplication)
@@ -189,7 +189,7 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
     @skipIfNotAlive
     def t_http_sources(self, host, user):
-        from neco.execution.resource import ResourceFactory
+        from nepi.execution.resource import ResourceFactory
         
         ResourceFactory.register_type(LinuxNode)
         ResourceFactory.register_type(LinuxApplication)
index f94c7e6..44c293a 100644 (file)
@@ -1,10 +1,10 @@
 #!/usr/bin/env python
-from neco.execution.ec import ExperimentController 
-from neco.execution.resource import ResourceState
-from neco.resources.linux.node import LinuxNode
-from neco.resources.linux.interface import LinuxInterface
-from neco.resources.linux.channel import LinuxChannel
-from neco.util.sshfuncs import RUNNING, FINISHED
+from nepi.execution.ec import ExperimentController 
+from nepi.execution.resource import ResourceState
+from nepi.resources.linux.node import LinuxNode
+from nepi.resources.linux.interface import LinuxInterface
+from nepi.resources.linux.channel import LinuxChannel
+from nepi.util.sshfuncs import RUNNING, FINISHED
 
 from test_utils import skipIfNotAlive
 
@@ -23,7 +23,7 @@ class LinuxInterfaceTestCase(unittest.TestCase):
 
     @skipIfNotAlive
     def t_deploy(self, host, user):
-        from neco.execution.resource import ResourceFactory
+        from nepi.execution.resource import ResourceFactory
         
         ResourceFactory.register_type(LinuxNode)
         ResourceFactory.register_type(LinuxInterface)
index 8849a4c..a09154b 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
-from neco.resources.linux.node import LinuxNode
-from neco.util.sshfuncs import RUNNING, FINISHED
+from nepi.resources.linux.node import LinuxNode
+from nepi.util.sshfuncs import RUNNING, FINISHED
 
 from test_utils import skipIfNotAlive, skipInteractive, create_node
 
index 1cf8aba..d915707 100644 (file)
@@ -1,4 +1,4 @@
-from neco.resources.linux.node import LinuxNode
+from nepi.resources.linux.node import LinuxNode
 
 import os
 
index de9faa8..815c0f1 100644 (file)
@@ -10,7 +10,7 @@
 #  node n0 sends IGMP traffic to node n3
 
 
-from neco.resources.ns3.ns3wrapper import NS3Wrapper
+from nepi.resources.ns3.ns3wrapper import NS3Wrapper
 
 import os.path
 import time
index 5c37547..d9ab6c5 100755 (executable)
@@ -1,15 +1,15 @@
 #!/usr/bin/env python
-from neco.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState
-from neco.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState
+from nepi.execution.ec import ExperimentController
 
-from neco.resources.omf.omf_node import OMFNode
-from neco.resources.omf.omf_application import OMFApplication
-from neco.resources.omf.omf_interface import OMFWifiInterface
-from neco.resources.omf.omf_channel import OMFChannel
-from neco.resources.omf.omf_api import OMFAPIFactory
+from nepi.resources.omf.omf_node import OMFNode
+from nepi.resources.omf.omf_application import OMFApplication
+from nepi.resources.omf.omf_interface import OMFWifiInterface
+from nepi.resources.omf.omf_channel import OMFChannel
+from nepi.resources.omf.omf_api import OMFAPIFactory
 
-from neco.util import guid
-from neco.util.timefuncs import *
+from nepi.util import guid
+from nepi.util.timefuncs import *
 
 import time
 import unittest
index 5ab70fe..44cac7a 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 
-from neco.design.box import Box 
-from neco.util.parser import XMLParser
+from nepi.design.box import Box 
+from nepi.util.parser import XMLParser
 
 import unittest
 
index 20ae511..806ae5d 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 
-from neco.design.box import Box 
-from neco.util.plot import Plotter
+from nepi.design.box import Box 
+from nepi.util.plot import Plotter
 
 import subprocess
 import unittest
index c9afb32..f860214 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-from neco.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\
+from nepi.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\
         RUNNING, FINISHED 
 
 import getpass