NECo: A tool to design and run experiments on arbitrary platforms.
author Alina Quereilhac <alina.quereilhac@inria.fr>
Mon, 5 Nov 2012 18:47:41 +0000 (19:47 +0100)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Mon, 5 Nov 2012 18:47:41 +0000 (19:47 +0100)
30 files changed:
Makefile [new file with mode: 0644]
setup.cfg [new file with mode: 0644]
setup.py [new file with mode: 0755]
src/neco/__init__.py [new file with mode: 0644]
src/neco/design/__init__.py [new file with mode: 0644]
src/neco/design/box.py [new file with mode: 0644]
src/neco/execution/__init__.py [new file with mode: 0644]
src/neco/execution/callbacks.py [new file with mode: 0644]
src/neco/execution/ec.py [new file with mode: 0644]
src/neco/execution/resource.py [new file with mode: 0644]
src/neco/execution/scheduler.py [new file with mode: 0644]
src/neco/execution/tags.py [new file with mode: 0644]
src/neco/execution/tasks.py [new file with mode: 0644]
src/neco/resources/__init__.py [new file with mode: 0644]
src/neco/resources/base/__init__.py [new file with mode: 0644]
src/neco/resources/base/application.py [new file with mode: 0644]
src/neco/resources/base/linux_node.py [new file with mode: 0644]
src/neco/resources/netns/__init__.py [new file with mode: 0644]
src/neco/resources/ns3/__init__.py [new file with mode: 0644]
src/neco/util/__init__.py [new file with mode: 0644]
src/neco/util/guid.py [new file with mode: 0644]
src/neco/util/parallel.py [new file with mode: 0644]
src/neco/util/parser.py [new file with mode: 0644]
src/neco/util/plot.py [new file with mode: 0644]
src/neco/util/sshfuncs.py [new file with mode: 0644]
src/neco/util/timefuncs.py [new file with mode: 0644]
test/design/box.py [new file with mode: 0755]
test/execution/ec.py [new file with mode: 0755]
test/util/parser.py [new file with mode: 0755]
test/util/plot.py [new file with mode: 0755]

diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..cc56b5e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,62 @@
+SRCDIR      = $(CURDIR)/src
+TESTDIR     = $(CURDIR)/test
+BUILDDIR    = $(CURDIR)/build
+DISTDIR     = $(CURDIR)/dist
+
+# stupid distutils, it's broken in so many ways
+SUBBUILDDIR = $(shell python -c 'import distutils.util, sys; \
+             print "lib.%s-%s" % (distutils.util.get_platform(), \
+             sys.version[0:3])')
+PYTHON25 := $(shell python -c 'import sys; v = sys.version_info; \
+    print (1 if v[0] <= 2 and v[1] <= 5 else 0)')
+
+ifeq ($(PYTHON25),0)
+BUILDDIR := $(BUILDDIR)/$(SUBBUILDDIR)
+else
+BUILDDIR := $(BUILDDIR)/lib
+endif
+
+PYPATH = $(BUILDDIR):$(PYTHONPATH)
+COVERAGE = $(or $(shell which coverage), $(shell which python-coverage), \
+          coverage)
+
+all:
+       ./setup.py build
+
+install: all
+       ./setup.py install
+
+test: all
+       retval=0; \
+              for i in `find "$(TESTDIR)" -iname '*.py' -perm -u+x -type f`; do \
+              echo $$i; \
+              PYTHONPATH="$(PYPATH)" $$i -v || retval=$$?; \
+              done; exit $$retval
+
+coverage: all
+       rm -f .coverage
+       for i in `find "$(TESTDIR)" -perm -u+x -type f`; do \
+               set -e; \
+               PYTHONPATH="$(PYPATH)" $(COVERAGE) -x $$i -v; \
+               done
+       $(COVERAGE) -c
+       $(COVERAGE) -r -m `find "$(BUILDDIR)" -name \\*.py -type f`
+       rm -f .coverage
+
+clean:
+       ./setup.py clean
+       rm -f `find -name \*.pyc` .coverage *.pcap
+
+distclean: clean
+       rm -rf "$(DISTDIR)"
+
+MANIFEST:
+       find . -path ./.hg\* -prune -o -path ./build -prune -o \
+               -name \*.pyc -prune -o -name \*.swp -prune -o \
+               -name MANIFEST -prune -o -type f -print | \
+               sed 's#^\./##' | sort > MANIFEST
+
+dist: MANIFEST
+       ./setup.py sdist
+
+.PHONY: all clean distclean dist test coverage install MANIFEST
diff --git a/setup.cfg b/setup.cfg
new file mode 100644 (file)
index 0000000..e1b8322
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[clean]
+all = 1
diff --git a/setup.py b/setup.py
new file mode 100755 (executable)
index 0000000..a8cf029
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+from distutils.core import setup
+import sys
+
+setup(
+        name        = "neco",
+        version     = "0.01",
+        description = "Network Experiment Controller",
+        author      = "Alina Quereilhac",
+        url         = "",
+        license     = "GPLv2",
+        platforms   = "Linux",
+        packages    = [
+            "neco",
+            "neco.design",
+            "neco.execution",
+            "neco.resources",
+            "neco.resources.base",
+            "neco.resources.netns",
+            "neco.resources.ns3",
+            "neco.util"],
+        package_dir = {"": "src"},
+    )
diff --git a/src/neco/__init__.py b/src/neco/__init__.py
new file mode 100644 (file)
index 0000000..df19bd5
--- /dev/null
@@ -0,0 +1,2 @@
+import logging
+logging.basicConfig()
diff --git a/src/neco/design/__init__.py b/src/neco/design/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/neco/design/box.py b/src/neco/design/box.py
new file mode 100644 (file)
index 0000000..121827a
--- /dev/null
@@ -0,0 +1,101 @@
+from neco.util import guid
+
+guid_gen = guid.GuidGenerator()
+
+class Attributes(object):
+    def __init__(self):
+        super(Attributes, self).__init__()
+        self._attributes = dict()
+
+    def __getattr__(self, name):
+        try:
+            return self._attributes[name]
+        except:
+            return super(Attributes, self).__getattribute__(name)
+
+    def __setattr__(self, name, value):
+        try:
+            if value == None:
+                old = self._attributes[name]
+                del self._attributes[name]
+                return old
+
+            self._attributes[name] = value
+            return value
+        except:
+            return super(Attributes, self).__setattr__(name, value)
+
+class Connections(object):
+    def __init__(self):
+        super(Connections, self).__init__()
+        self._connections = set()
+
+    def __getattr__(self, guid_or_label):
+        try:
+            for b in self._connections:
+                if guid_or_label in [b.guid, b.label]:
+                    return b
+        except:
+            return super(Connections, self).__getattribute__(guid_or_label)
+
+class Box(object):
+    def __init__(self, label = None, guid = None):
+        super(Box, self).__init__()
+        self._guid = guid_gen.next(guid)
+        self._a = Attributes()
+        self._c = Connections()
+        self._tags = set()
+        self.label = label or self._guid
+
+        # Graphical information to draw box
+        self.x = 0
+        self.y = 0
+        self.width = 4
+        self.height = 4
+
+    @property
+    def tags(self):
+        return self._tags
+
+    @property
+    def attributes(self):
+        return self._a._attributes.keys()
+
+    @property
+    def a(self):
+        return self._a
+
+    @property
+    def c(self):
+        return self._c
+
+    @property
+    def guid(self):
+        return self._guid
+
+    @property
+    def connections(self):
+        return set(self._c._connections)
+
+    def tadd(self, name):
+        self._tags.add(name)
+
+    def tdel(self, name):
+        self._tags.remove(name)
+
+    def thas(self, name):
+        return name in self._tags
+
+    def connect(self, box, cascade = True):
+        self._c._connections.add(box)
+        if cascade:
+            box.connect(self, cascade = False)
+
+    def disconnect(self, box, cascade = True):
+        self._c._connections.remove(box)
+        if cascade:
+            box.disconnect(self, cascade = False)
+
+    def is_connected(self, box):
+        return box in self.connections
+
diff --git a/src/neco/execution/__init__.py b/src/neco/execution/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/neco/execution/callbacks.py b/src/neco/execution/callbacks.py
new file mode 100644 (file)
index 0000000..a30411f
--- /dev/null
@@ -0,0 +1,20 @@
+
+def deploy(ec_weakref, xml):
+    from neco.util.parser import XMLParser    
+
+    # parse xml and build topology graph
+    parser = XMLParser()
+    box = parser.from_xml(xml)
+
+    # instantiate resource boxes
+    
+
+    # allocate physical resources
+    # configure physical resources
+    # allocate virtual resources
+    # configure virtual resources
+    # allocate software resources
+    # configure software resources
+    # schedule application start/stop
+
+
diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py
new file mode 100644 (file)
index 0000000..01a6aba
--- /dev/null
@@ -0,0 +1,164 @@
+import logging
+import os
+import sys
+import threading
+import time
+import weakref
+
+from neco.execution import scheduler, tasks
+from neco.util import guid
+from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
+from neco.util.parallel import ParallelRun
+
+_reschedule_delay = "0.1s"
+
+class ExperimentController(object):
+    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
+        super(ExperimentController, self).__init__()
+        # root directory to store files
+        self._root_dir = root_dir
+
+        # generator of globally unique ids
+        self._guid_generator = guid.GuidGenerator()
+        
+        # Scheduler
+        self._scheduler = scheduler.HeapScheduler()
+
+        # Tasks
+        self._tasks = dict()
+        # Resources
+        self._resources = dict()
+       
+        # Event processing thread
+        self._cond = threading.Condition()
+        self._stop = False
+        self._thread = threading.Thread(target = self._process_tasks)
+        self._thread.start()
+       
+        # Logging
+        self._logger = logging.getLogger("neco.execution.ec")
+        self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+    def resource(self, guid):
+        return self._resources.get(guid)
+
+    def terminate(self):
+        self._stop = True
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+        if self._thread.is_alive():
+           self._thread.join()
+
+    def task_info(self, tid):
+        task = self._tasks.get(tid)
+        if not task:
+            return (None, None)
+        return (task.status, task.result)
+
+    def schedule(self, date, callback, args = None, kwargs = None):
+        """
+            date    string containing execution time for the task.
+                    It can be expressed as an absolute time, using
+                    timestamp format, or as a relative time matching
+                    ^\d+.\d+(h|m|s|ms|us)$
+
+            callback    code to be executed for the task. Must be a
+                        Python function, and receives args and kwargs
+                        as arguments.
+                        The callback will always be invoked passing a 
+                        weak reference to the controller as first 
+                        argument.
+                        The callback must return a (status, result) 
+                        tuple where status is one of : 
+                        task.TaskStatus.FAIL, 
+                        task.TaskStatus.SUCCESS, 
+                        task.TaskStatus.RETRY, 
+                        task.TaskStatus.RECYCLE 
+        """
+        timestamp = strfvalid(date)
+        
+        args = args or []
+        kwargs = kwargs or {}
+
+        task = tasks.Task(timestamp, callback, args, kwargs)
+        task = self._schedule(task)
+
+        self._tasks[task.id] = task
+
+        return task.id
+
+    ###########################################################################
+    #### Internal methods
+    ###########################################################################
+
+    def _schedule(self, task):
+        task = self._scheduler.schedule(task)
+
+        # Notify condition to wake up the processing thread
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+        return task
+     
+    def _process_tasks(self):
+        runner = ParallelRun(maxthreads = 50)
+        runner.start()
+
+        try:
+            while not self._stop:
+                self._cond.acquire()
+                task = self._scheduler.next()
+                self._cond.release()
+
+                if not task:
+                    # If there are no tasks in the tasks queue we need to 
+                    # wait until a call to schedule wakes us up
+                    self._cond.acquire()
+                    self._cond.wait()
+                    self._cond.release()
+                else: 
+                    # If the task timestamp is in the future the thread needs to wait
+                    # until time elapse or until another task is scheduled
+                    now = strfnow()
+                    if now < task.timestamp:
+                        # Calculate time difference in seconds
+                        timeout = strfdiff(task.timestamp, now)
+                        # Re-schedule task with the same timestamp
+                        self._scheduler.schedule(task)
+                        # Sleep until timeout or until a new task awakes the condition
+                        self._cond.acquire()
+                        self._cond.wait(timeout)
+                        self._cond.release()
+                    else:
+                        # Process tasks in parallel
+                        runner.put(self._execute_task, task)
+        except:  
+            import traceback
+            err = traceback.format_exc()
+            self._logger.error("Error while processing tasks in the EC: %s" % err)
+    def _execute_task(self, task):
+        # Invoke callback
+        ec = weakref.ref(self)
+        try:
+            (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self._logger.error("Error while executing event: %s" % err)
+
+            # task marked as FAIL
+            task.status = tasks.TaskStatus.FAIL
+            task.result = err
+
+        if task.status == tasks.TaskStatus.RETRY:
+            # Re-schedule same task in the near future
+            task.timestamp = strfvalid(_reschedule_delay)
+            self._schedule(task)
+        elif task.status == tasks.TaskStatus.RECYCLE:
+            # Re-schedule t in the future
+            timestamp = strfvalid(task.result)
+            self.schedule(timestamp, task.callback, task.args, task.kwargs)
+
diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py
new file mode 100644 (file)
index 0000000..30294d3
--- /dev/null
@@ -0,0 +1,84 @@
+import logging
+import weakref
+
+def match_tags(box, all_tags, exact_tags):
+    """ returns True if box has required tags """
+    tall = set(all_tags)
+    texact = set(exact_tags)
+
+    if texact and box.tags == texact:
+        return True
+
+    if tall and tall.issubset(box.tags):
+        return True
+
+    return False
+
+def find_boxes(box, all_tags = None, exact_tags = None, max_depth = 1):
+    """ Look for the connected boxes with the required tags, doing breadth-first
+    search, until max_depth ( max_depth = None will traverse the entire graph ).
+    """
+    if not all_tags and not exact_tags:
+        msg = "No matching criteria for resources."
+        raise RuntimeError(msg)
+
+    queue = set()
+    # enqueue (depth, box) 
+    queue.add((0, box))
+    
+    traversed = set()
+    traversed.add(box)
+
+    depth = 0
+
+    result = set()
+
+    while len(queue) > 0: 
+        (depth, a) = queue.pop()
+        if match_tags(a, all_tags, exact_tags):
+            result.add(a)
+
+        if not max_depth or depth <= max_depth:
+            depth += 1
+            for b in sorted(a.connections):
+                if b not in traversed:
+                    traversed.add(b)
+                    queue.add((depth, b))
+    
+    return result
+
+class Resource(object):
+    def __init__(self, box, ec):
+        self._box = weakref.ref(box)
+        self._ec = weakref.ref(ec)
+
+        # Logging
+        loglevel = "debug"
+        self._logger = logging.getLogger("neco.execution.Resource.%s" % 
+            self.box.guid)
+        self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+    @property
+    def box(self):
+        return self._box()
+
+    @property
+    def ec(self):
+        return self._ec()
+
+    def find_resources(self, all_tags = None, exact_tags = None, 
+        max_depth = 1):
+        resources = set()
+
+        boxes = find_boxes(self.box, all_tags, exact_tags, max_depth)
+        for b in boxes:
+            r = self.ec.resource(b.guid)
+            resources.add(r)
+
+        return resources
+
+class ResourceResolver(object):
+    def __init__(self):
+        pass  
+
+
diff --git a/src/neco/execution/scheduler.py b/src/neco/execution/scheduler.py
new file mode 100644 (file)
index 0000000..202b711
--- /dev/null
@@ -0,0 +1,40 @@
+import itertools
+import heapq
+
+class HeapScheduler(object):
+    """ This class is thread safe.
+    All calls to C Extensions are made atomic by the GIL in the CPython implementation.
+    heapq.heappush, heapq.heappop, and list access are therefore thread-safe """
+
+    def __init__(self):
+        super(HeapScheduler, self).__init__()
+        self._queue = list() 
+        self._valid = set()
+        self._idgen = itertools.count(1)
+
+    def schedule(self, task):
+        if task.id == None:
+            task.id = self._idgen.next()
+        entry = (task.timestamp, task.id, task)
+        self._valid.add(task.id)
+        heapq.heappush(self._queue, entry)
+        return task
+
+    def remove(self, tid):
+        try:
+            self._valid.remove(tid)
+        except:
+            pass
+
+    def next(self):
+        while self._queue:
+            try:
+                timestamp, tid, task = heapq.heappop(self._queue)
+                if tid in self._valid:
+                    self.remove(tid)
+                    return task
+            except IndexError:
+                # heap empty
+                pass
+        return None
+
diff --git a/src/neco/execution/tags.py b/src/neco/execution/tags.py
new file mode 100644 (file)
index 0000000..244713a
--- /dev/null
@@ -0,0 +1,21 @@
+NODE = "node"
+NETWORK_INTERFACE = "network interface"
+SWITCH = "switch"
+TUNNEL = "tunnel"
+APPLICATION = "application"
+CHANNEL = "channel"
+CPU = "cpu"
+
+IP4ADDRESS = "ipv4"
+IP6ADDRESS = "ipv6"
+MACADDRESS = "mac"
+IPADDRESS = "ip"
+ROUTE = "route"
+FLOW = "flow"
+
+WIRELESS = "wireless"
+ETHERNET = "ethernet"
+SIMULATED = "simulated"
+VIRTUAL = "virtual"
+MOBILE = "mobile"
+
diff --git a/src/neco/execution/tasks.py b/src/neco/execution/tasks.py
new file mode 100644 (file)
index 0000000..ee24fe8
--- /dev/null
@@ -0,0 +1,18 @@
+
+class TaskStatus:
+    NEW = 0
+    RETRY = 1
+    SUCCESS = 2
+    FAIL = 3
+    RECYCLE = 4
+
+class Task(object):
+    def __init__(self, timestamp, callback, args, kwargs):
+        self.id = None 
+        self.timestamp = timestamp
+        self.callback = callback
+        self.args = args
+        self.kwargs = kwargs
+        self.result = None
+        self.status = TaskStatus.NEW
+
diff --git a/src/neco/resources/__init__.py b/src/neco/resources/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/neco/resources/base/__init__.py b/src/neco/resources/base/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/neco/resources/base/application.py b/src/neco/resources/base/application.py
new file mode 100644 (file)
index 0000000..40e07ff
--- /dev/null
@@ -0,0 +1,69 @@
+from neco.execution import tags
+from neco.execution.resource import Resource
+
+import cStringIO
+import logging
+import os
+
+class Application(Resource):
+    def __init__(self, box, ec):
+        super(Application, self).__init__(box, ec)
+        self.command = None
+        self.pid = None
+        self.ppid = None
+        self.stdin = None
+        self.del_app_home = True
+        self.env = None
+        
+        self.app_home = "${HOME}/app-%s" % self.box.guid
+        self._node = None
+       
+        # Logging
+        loglevel = "debug"
+        self._logger = logging.getLogger("neco.resources.base.Application.%s" % self.guid)
+        self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+    @property
+    def node(self):
+        if self._node:
+            return self._node
+
+        # XXX: What if it is connected to more than one node?
+        resources = self.find_resources(exact_tags = [tags.NODE])
+        self._node = next(iter(resources)) if len(resources) == 1 else None
+        return self._node
+
+    def make_app_home(self):
+        self.node.mkdir(self.app_home)
+
+        if self.stdin:
+            self.node.upload(self.stdin, os.path.join(self.app_home, 'stdin'))
+
+    def cleanup(self):
+        self.kill()
+
+    def run(self):
+        dst = os.path.join(self.app_home, "app.sh")
+        
+        # Create shell script with the command
+        # This way, complex commands and scripts can be ran seamlessly
+        # sync files
+        cmd = ""
+        if self.env:
+            for envkey, envvals in self.env.iteritems():
+                for envval in envvals:
+                    cmd += 'export %s=%s\n' % (envkey, envval)
+
+        cmd += self.command
+        self.node.upload(cmd, dst)
+
+        cmd = "bash ./app.sh"
+        self.node.run(cmd, self.app_home)
+        self.pid, self.ppid = self.node.checkpid(self.app_home)
+
+    def status(self):
+        return self.node.status(self.pid, self.ppid)
+
+    def kill(self):
+        return self.node.kill(self.pid, self.ppid)
+
diff --git a/src/neco/resources/base/linux_node.py b/src/neco/resources/base/linux_node.py
new file mode 100644 (file)
index 0000000..b7d10cc
--- /dev/null
@@ -0,0 +1,207 @@
+from neco.execution.resource import Resource
+from neco.util.sshfuncs import eintr_retry, shell_escape, rexec, rcopy, \
+        rspawn, rcheck_pid, rstatus, rkill, RUNNING 
+
+import cStringIO
+import logging
+import os.path
+
+class LinuxNode(Resource):
+    def __init__(self, box, ec):
+        super(LinuxNode, self).__init__(box, ec)
+        self.ip = None
+        self.host = None
+        self.user = None
+        self.port = None
+        self.identity_file = None
+        # package management system - either yum or apt for now...
+        self._pm = None
+       
+        # Logging
+        loglevel = "debug"
+        self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\
+                self.box.guid)
+        self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+    @property
+    def pm(self):
+        if self._pm:
+            return self._pm
+
+        if (not (self.host or self.ip) or not self.user):
+            msg = "Can't resolve package management system. Insufficient data."
+            self._logger.error(msg)
+            raise RuntimeError(msg)
+
+        out = self.execute("cat /etc/issue")
+
+        if out.find("Fedora") == 0:
+            self._pm = "yum -y "
+        elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
+            self._pm = "apt-get -y "
+        else:
+            msg = "Can't resolve package management system. Unknown OS."
+            self._logger.error(msg)
+            raise RuntimeError(msg)
+
+        return self._pm
+    
+    def execute(self, command,
+            agent = True,
+            sudo = False,
+            stdin = "", 
+            tty = False,
+            timeout = None,
+            retry = 0,
+            err_on_timeout = True,
+            connect_timeout = 30,
+            persistent = True):
+        """ Notice that this invocation will block until the
+        execution finishes. If this is not the desired behavior,
+        use 'run' instead."""
+        (out, err), proc = eintr_retry(rexec)(
+                command, 
+                self.host or self.ip, 
+                self.user,
+                port = self.port, 
+                agent = agent,
+                sudo = sudo,
+                stdin = stdin, 
+                identity_file = self.identity_file,
+                tty = tty,
+                timeout = timeout,
+                retry = retry,
+                err_on_timeout = err_on_timeout,
+                connect_timeout = connect_timeout,
+                persistent = persistent)
+
+        if proc.wait():
+            msg = "Failed to execute command %s at node %s: %s %s" % \
+                    (command, self.host or self.ip, out, err,)
+            self._logger.warn(msg)
+            raise RuntimeError(msg)
+
+        return out
+
+    def package_install(self, dependencies):
+        if not isinstance(dependencies, list):
+            dependencies = [dependencies]
+
+        for d in dependencies:
+            self.execute("%s install %s" % (self.pm, d), sudo = True, 
+                    tty = True)
+
+    def upload(self, src, dst):
+        if not os.path.isfile(src):
+            src = cStringIO.StringIO(src)
+
+        (out, err), proc = eintr_retry(rcopy)(
+            src, dst, 
+            self.host or self.ip, 
+            self.user,
+            port = self.port,
+            identity_file = self.identity_file)
+
+        if proc.wait():
+            msg = "Error uploading to %s got:\n%s%s" %\
+                    (self.host or self.ip, out, err)
+            self._logger.error(msg)
+            raise RuntimeError(msg)
+
+    def is_alive(self, verbose = False):
+        (out, err), proc = eintr_retry(rexec)(
+                "echo 'ALIVE'",
+                self.host or self.ip, 
+                self.user,
+                port = self.port, 
+                identity_file = self.identity_file,
+                timeout = 60,
+                err_on_timeout = False,
+                persistent = False)
+        
+        if proc.wait():
+            self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+            return False
+        elif out.strip().startswith('ALIVE'):
+            return True
+        else:
+            self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+            return False
+
+    def mkdir(self, path, clean = True):
+        if clean:
+            self.execute(
+                "rm -rf %s" % shell_escape(path),
+                timeout = 120,
+                retry = 3
+                )
+
+        self.execute(
+            "mkdir -p %s" % shell_escape(path),
+            timeout = 120,
+            retry = 3
+            )
+
+    def run(self, command, home, 
+            stdin = 'stdin', 
+            stdout = 'stdout', 
+            stderr = 'stderr', 
+            sudo = False):
+        self._logger.info("Running %s", command)
+
+        # Start process in a "daemonized" way, using nohup and heavy
+        # stdin/out redirection to avoid connection issues
+        (out,err), proc = rspawn(
+            command,
+            pidfile = './pid',
+            home = home,
+            stdin = stdin if stdin is not None else '/dev/null',
+            stdout = stdout if stdout else '/dev/null',
+            stderr = stderr if stderr else '/dev/null',
+            sudo = sudo,
+            host = self.host,
+            user = self.user,
+            port = self.port,
+            identity_file = self.identity_file
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+    
+    def checkpid(self, path):            
+        # Get PID/PPID
+        # NOTE: wait a bit for the pidfile to be created
+        pidtuple = rcheck_pid(
+            os.path.join(path, 'pid'),
+            host = self.host,
+            user = self.user,
+            port = self.port,
+            identity_file = self.identity_file
+            )
+        
+        return pidtuple
+    
+    def status(self, pid, ppid):
+        status = rstatus(
+                pid, ppid,
+                host = self.host,
+                user = self.user,
+                port = self.port,
+                identity_file = self.identity_file
+                )
+           
+        return status
+    
+    def kill(self, pid, ppid, sudo = False):
+        status = self.status(pid, ppid)
+        if status == RUNNING:
+            # kill by ppid+pid - SIGTERM first, then try SIGKILL
+            rkill(
+                pid, ppid,
+                host = self.host,
+                user = self.user,
+                port = self.port,
+                sudo = sudo,
+                identity_file = self.identity_file
+                )
+
diff --git a/src/neco/resources/netns/__init__.py b/src/neco/resources/netns/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/neco/resources/ns3/__init__.py b/src/neco/resources/ns3/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/neco/util/__init__.py b/src/neco/util/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/neco/util/guid.py b/src/neco/util/guid.py
new file mode 100644 (file)
index 0000000..913e6ad
--- /dev/null
@@ -0,0 +1,16 @@
+# FIXME: This class is not thread-safe. 
+# Should it be made thread-safe?
+class GuidGenerator(object):
+    def __init__(self):
+        self._guids = list()
+
+    def next(self, guid = None):
+        if guid != None:
+            return guid
+        else:
+            last_guid = 0 if len(self._guids) == 0 else self._guids[-1]
+            guid = last_guid + 1 
+        self._guids.append(guid)
+        self._guids.sort()
+        return guid
+
diff --git a/src/neco/util/parallel.py b/src/neco/util/parallel.py
new file mode 100644 (file)
index 0000000..8dc39a7
--- /dev/null
@@ -0,0 +1,259 @@
+import threading
+import Queue
+import traceback
+import sys
+import os
+
+N_PROCS = None
+
+THREADCACHE = []
+THREADCACHEPID = None
+
class WorkerThread(threading.Thread):
    """Worker that consumes (callable, args, kwargs) tasks from a shared queue.

    Besides real tasks, three special queue items drive the thread:
      - None: a sync marker put by ParallelMap.join(); marks the worker
        as done without stopping it.
      - QUIT: terminates the main loop.
      - REASSIGNED: wakes a thread that may be blocked on a queue it has
        been detached from (see attach()/detach()/detach_signal()).
    Instances are pooled in the module-level THREADCACHE and re-attached
    to new queues by successive ParallelMap pools.
    """
    class QUIT:
        # Sentinel item: stop the worker's main loop
        pass
    class REASSIGNED:
        # Sentinel item: the worker was moved to another queue; loop again
        pass
    
    def run(self):
        while True:
            task = self.queue.get()
            if task is None:
                self.done = True
                self.queue.task_done()
                continue
            elif task is self.QUIT:
                self.done = True
                self.queue.task_done()
                break
            elif task is self.REASSIGNED:
                continue
            else:
                self.done = False
            
            try:
                try:
                    callable, args, kwargs = task
                    rv = callable(*args, **kwargs)
                    
                    if self.rvqueue is not None:
                        self.rvqueue.put(rv)
                finally:
                    self.queue.task_done()
            except:
                # Worker exceptions are recorded and re-raised later from
                # ParallelMap.join()/sync(), not from this thread.
                traceback.print_exc(file = sys.stderr)
                self.delayed_exceptions.append(sys.exc_info())
    
    def waitdone(self):
        # Block until this worker's queue is drained and it is idle
        while not self.queue.empty() and not self.done:
            self.queue.join()
    
    def attach(self, queue, rvqueue, delayed_exceptions):
        """Bind this (possibly already running) worker to a new task queue."""
        if self.isAlive():
            self.waitdone()
            oldqueue = self.queue
        self.queue = queue
        self.rvqueue = rvqueue
        self.delayed_exceptions = delayed_exceptions
        if self.isAlive():
            # Wake the thread if it is blocked reading the old queue
            oldqueue.put(self.REASSIGNED)
    
    def detach(self):
        """First phase of releasing a worker: give it a fresh private queue."""
        if self.isAlive():
            self.waitdone()
            self.oldqueue = self.queue
        self.queue = Queue.Queue()
        self.rvqueue = None
        self.delayed_exceptions = []
    
    def detach_signal(self):
        # Second phase of detach(): wake the thread still blocked on the
        # old queue so it starts reading from the new private one
        if self.isAlive():
            self.oldqueue.put(self.REASSIGNED)
            del self.oldqueue
        
    def quit(self):
        """Ask the worker to stop and wait for the thread to finish."""
        self.queue.put(self.QUIT)
        self.join()
+
class ParallelMap(object):
    """Thread pool that runs queued callables and optionally collects results.

    put(callable, *args, **kwargs) schedules a call; iterating the
    instance yields collected return values (only when results=True);
    join() waits for all work, re-raises the first worker exception and
    recycles the workers. Idle workers are cached in the module-level
    THREADCACHE, which is flushed whenever os.getpid() changes (threads
    do not survive a fork).
    """
    def __init__(self, maxthreads = None, maxqueue = None, results = True):
        global N_PROCS
        global THREADCACHE
        global THREADCACHEPID
        
        if maxthreads is None:
            # Default to one worker per processor, counting "processor"
            # lines in /proc/cpuinfo (Linux only)
            if N_PROCS is None:
                try:
                    f = open("/proc/cpuinfo")
                    try:
                        N_PROCS = sum("processor" in l for l in f)
                    finally:
                        f.close()
                except:
                    pass
            maxthreads = N_PROCS
        
        if maxthreads is None:
            # /proc/cpuinfo unavailable - fall back to 4 workers
            maxthreads = 4
        
        # maxqueue=None/0 means an unbounded task queue
        self.queue = Queue.Queue(maxqueue or 0)

        self.delayed_exceptions = []
        
        if results:
            self.rvqueue = Queue.Queue()
        else:
            self.rvqueue = None
        
        # Check threadcache: drop workers cached before a fork
        if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
            del THREADCACHE[:]
            THREADCACHEPID = os.getpid()
    
        self.workers = []
        for x in xrange(maxthreads):
            t = None
            if THREADCACHE:
                try:
                    t = THREADCACHE.pop()
                except:
                    # Racing another pool for the last cached worker
                    pass
            if t is None:
                t = WorkerThread()
                t.setDaemon(True)
            else:
                t.waitdone()
            t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
            self.workers.append(t)
    
    def __del__(self):
        self.destroy()
    
    def destroy(self):
        """Detach all workers and return them to the global THREADCACHE."""
        # Check threadcache
        global THREADCACHE
        global THREADCACHEPID
        if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
            del THREADCACHE[:]
            THREADCACHEPID = os.getpid()

        for worker in self.workers:
            worker.waitdone()
        for worker in self.workers:
            worker.detach()
        for worker in self.workers:
            worker.detach_signal()
        THREADCACHE.extend(self.workers)
        del self.workers[:]
        
    def put(self, callable, *args, **kwargs):
        """Schedule callable(*args, **kwargs); blocks while the queue is full."""
        self.queue.put((callable, args, kwargs))
    
    def put_nowait(self, callable, *args, **kwargs):
        """Like put(), but raises Queue.Full instead of blocking."""
        self.queue.put_nowait((callable, args, kwargs))

    def start(self):
        """Start any workers that are not running yet."""
        for thread in self.workers:
            if not thread.isAlive():
                thread.start()
    
    def join(self):
        """Wait for all scheduled work, re-raise the first worker
        exception (if any) and recycle the workers."""
        for thread in self.workers:
            # That's the sync signal
            self.queue.put(None)
            
        self.queue.join()
        for thread in self.workers:
            thread.waitdone()
        
        if self.delayed_exceptions:
            # Python 2 three-argument raise: re-raise with the original
            # traceback captured in the worker thread
            typ,val,loc = self.delayed_exceptions[0]
            del self.delayed_exceptions[:]
            raise typ,val,loc
        
        self.destroy()
    
    def sync(self):
        """Wait for the queue to drain without recycling the workers."""
        self.queue.join()
        if self.delayed_exceptions:
            typ,val,loc = self.delayed_exceptions[0]
            del self.delayed_exceptions[:]
            raise typ,val,loc
        
    def __iter__(self):
        # Drain collected results; when the result queue looks empty,
        # wait for pending tasks once and retry before finishing.
        # NOTE(review): raising StopIteration inside a generator is a
        # Python 2 idiom; under PEP 479 (Python 3.7+) it would surface
        # as RuntimeError.
        if self.rvqueue is not None:
            while True:
                try:
                    yield self.rvqueue.get_nowait()
                except Queue.Empty:
                    self.queue.join()
                    try:
                        yield self.rvqueue.get_nowait()
                    except Queue.Empty:
                        raise StopIteration
+    
class ParallelFilter(ParallelMap):
    """Parallel analogue of filter(): keeps the items for which
    filter_condition(item) is true. Iterate the instance to collect the
    surviving items (completion order, not input order).
    """
    class _FILTERED:
        """Sentinel standing for an item rejected by the filter."""

    def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
        super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
        self.filter_condition = filter_condition

    def __filter(self, x):
        # Map rejected items onto the sentinel so __iter__ can drop them
        return x if self.filter_condition(x) else self._FILTERED

    def put(self, what):
        super(ParallelFilter, self).put(self.__filter, what)

    def put_nowait(self, what):
        super(ParallelFilter, self).put_nowait(self.__filter, what)

    def __iter__(self):
        for item in super(ParallelFilter, self).__iter__():
            if item is not self._FILTERED:
                yield item
+
class ParallelRun(ParallelMap):
    """ParallelMap specialization whose queued items are (callable, args,
    kwargs) triples: put(fn, *args, **kwargs) schedules fn(*args, **kwargs).
    """
    def __run(self, x):
        # Unpack the scheduled call and execute it in the worker thread
        fn, args, kwargs = x
        return fn(*args, **kwargs)

    def __init__(self, maxthreads = None, maxqueue = None):
        super(ParallelRun, self).__init__(maxthreads, maxqueue, True)

    def put(self, what, *args, **kwargs):
        super(ParallelRun, self).put(self.__run, (what, args, kwargs))

    def put_nowait(self, what, *args, **kwargs):
        # BUG FIX: this used self.__filter, which does not exist on
        # ParallelRun (it belongs to ParallelFilter), so every call
        # raised AttributeError. It must schedule through __run, as
        # put() does.
        super(ParallelRun, self).put_nowait(self.__run, (what, args, kwargs))
+
+
def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
    """Parallel equivalent of map(mapping, iterable).

    Applies `mapping` to every element on a ParallelMap thread pool and
    returns the list of results (completion order, not input order).
    """
    mapper = ParallelMap(
        maxthreads = maxthreads,
        maxqueue = maxqueue,
        results = True)
    mapper.start()
    for elem in iterable:
        # BUG FIX: this used to call mapper.put(elem), which enqueued the
        # element itself as the callable - `mapping` was never applied
        # and the workers tried to call the elements instead.
        mapper.put(mapping, elem)
    rv = list(mapper)
    mapper.join()
    return rv
+
def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
    """Parallel equivalent of filter(condition, iterable).

    Evaluates `condition` on a ParallelFilter thread pool and returns
    the surviving elements (completion order, not input order).
    """
    pool = ParallelFilter(
        condition,
        maxthreads = maxthreads,
        maxqueue = maxqueue)
    pool.start()
    for element in iterable:
        pool.put(element)
    kept = list(pool)
    pool.join()
    return kept
+
diff --git a/src/neco/util/parser.py b/src/neco/util/parser.py
new file mode 100644 (file)
index 0000000..e2c5478
--- /dev/null
@@ -0,0 +1,147 @@
+from neco.design.box import Box
+
+from xml.dom import minidom
+import sys
+
+STRING = "string"
+BOOL = "bool"
+INTEGER = "integer"
+DOUBLE = "float"
+
def xmlencode(s):
    """Coerce *s* to unicode (byte strings are decoded as latin1) and
    replace NUL characters, which XML cannot carry, with '&#0000;'."""
    if isinstance(s, str):
        text = s.decode("latin1")
    elif isinstance(s, unicode):
        text = s
    else:
        text = unicode(s)
    return text.replace(u'\x00', u'&#0000;')
+
def xmldecode(s):
    """Inverse of xmlencode: restore NUL characters and return a UTF-8
    encoded byte string.

    BUG FIX: the entity being replaced must include the trailing ';'
    (xmlencode emits u'&#0000;'); without it the ';' was left behind
    after every decoded NUL.
    """
    return s.replace(u'&#0000;', u'\x00').encode("utf8")
+
def from_type(value):
    """Return the XML type tag (STRING/BOOL/INTEGER/DOUBLE) for *value*.

    The (type, tag) pairs are checked in order; bool is tested before
    int because bool is an int subclass. Unsupported types yield None.
    """
    for pytype, tag in ((str, STRING), (bool, BOOL), (int, INTEGER), (float, DOUBLE)):
        if isinstance(value, pytype):
            return tag
    return None
+
def to_type(type, value):
    """Parse the string *value* back into a Python value according to
    the XML type tag *type*; unknown tags yield None."""
    converters = {
        STRING : str,
        BOOL : lambda v: v == "True",
        INTEGER : int,
        DOUBLE : float,
    }
    converter = converters.get(type)
    return converter(value) if converter is not None else None
+
class XMLParser(object):
    """Serializes Box graphs to XML and parses them back.

    to_xml() walks the graph reachable from a box and emits one <box>
    element per box (attributes, tags and connection guids included);
    from_xml() rebuilds the boxes and re-connects them.
    """

    def to_xml(self, box):
        """Return a pretty-printed UTF-8 XML string for the graph
        reachable from `box`."""
        doc = minidom.Document()

        root = doc.createElement("boxes")
        doc.appendChild(root)

        traversed = dict()
        self._traverse_boxes(doc, traversed, box)

        # Keep the order: append <box> nodes sorted by guid so the
        # output is deterministic regardless of traversal order
        for guid in sorted(traversed.keys()):
            bnode = traversed[guid]
            root.appendChild(bnode)

        try:
            xml = doc.toprettyxml(indent="    ", encoding="UTF-8")
        except:
            # BUG FIX: this error report referenced an undefined name
            # (`data`), so any serialization failure surfaced as a
            # NameError instead of the original exception.
            print >>sys.stderr, "Oops: generating XML from %s" % (box,)
            raise

        return xml

    def _traverse_boxes(self, doc, traversed, box):
        """Create the <box> DOM node for `box`, record it in `traversed`
        (keyed by guid) and recurse into its connections."""
        bnode = doc.createElement("box")
        bnode.setAttribute("guid", xmlencode(box.guid))
        bnode.setAttribute("label", xmlencode(box.label))
        bnode.setAttribute("x", xmlencode(box.x))
        bnode.setAttribute("y", xmlencode(box.y))
        bnode.setAttribute("width", xmlencode(box.width))
        bnode.setAttribute("height", xmlencode(box.height))

        traversed[box.guid] = bnode

        anode = doc.createElement("attributes")
        bnode.appendChild(anode)
        for name in sorted(box.attributes):
            value = getattr(box.a, name)
            aanode = doc.createElement("attribute")
            anode.appendChild(aanode)
            aanode.setAttribute("name", xmlencode(name))
            aanode.setAttribute("value", xmlencode(value))
            aanode.setAttribute("type", from_type(value))

        tnode = doc.createElement("tags")
        bnode.appendChild(tnode)
        for tag in sorted(box.tags):
            ttnode = doc.createElement("tag")
            tnode.appendChild(ttnode)
            ttnode.setAttribute("name", xmlencode(tag))

        cnode = doc.createElement("connections")
        bnode.appendChild(cnode)
        for b in sorted(box.connections):
            ccnode = doc.createElement("connection")
            cnode.appendChild(ccnode)
            ccnode.setAttribute("guid", xmlencode(b.guid))
            if b.guid not in traversed:
                self._traverse_boxes(doc, traversed, b)

    def from_xml(self, xml):
        """Parse `xml` (as produced by to_xml) and return one of the
        reconstructed, fully re-connected boxes.

        Returns the box with the lowest guid, or None when the document
        contains no boxes. BUG FIX: this used to return whatever box a
        leaked dict-iteration loop variable happened to hold, which made
        the result nondeterministic.
        """
        doc = minidom.parseString(xml)
        bnode_list = doc.getElementsByTagName("box")

        boxes = dict()
        connections = dict()

        for bnode in bnode_list:
            if bnode.nodeType == doc.ELEMENT_NODE:
                guid = int(bnode.getAttribute("guid"))
                label = xmldecode(bnode.getAttribute("label"))
                # NOTE(review): x/y/width/height are parsed but never
                # applied to the reconstructed box - confirm whether
                # they should be restored.
                x = float(bnode.getAttribute("x"))
                y = float(bnode.getAttribute("y"))
                height = float(bnode.getAttribute("height"))
                width = float(bnode.getAttribute("width"))
                box = Box(label=label, guid=guid)
                boxes[guid] = box

                anode_list = bnode.getElementsByTagName("attribute")
                for anode in anode_list:
                    name = xmldecode(anode.getAttribute("name"))
                    value = xmldecode(anode.getAttribute("value"))
                    type = xmldecode(anode.getAttribute("type"))
                    value = to_type(type, value)
                    setattr(box.a, name, value)

                tnode_list = bnode.getElementsByTagName("tag")
                for tnode in tnode_list:
                    value = xmldecode(tnode.getAttribute("name"))
                    box.tadd(value)

                connections[box] = set()
                cnode_list = bnode.getElementsByTagName("connection")
                for cnode in cnode_list:
                    guid = int(cnode.getAttribute("guid"))
                    connections[box].add(guid)

        # Re-establish connections; use a fresh loop variable so the
        # parsed boxes are not clobbered.
        for b, conns in connections.iteritems():
            for guid in conns:
                b.connect(boxes[guid])

        if not boxes:
            return None
        return boxes[min(boxes)]
+
+
diff --git a/src/neco/util/plot.py b/src/neco/util/plot.py
new file mode 100644 (file)
index 0000000..2959639
--- /dev/null
@@ -0,0 +1,30 @@
+import networkx
+import tempfile
+
class Plotter(object):
    """Renders a Box graph to a graphviz dot file via networkx.

    The constructor walks the whole graph reachable from `box`; plot()
    writes the dot representation to a temporary file and returns its
    name.
    """
    def __init__(self, box):
        self._graph = networkx.Graph(graph = dict(overlap = "false"))
        self._traverse_boxes(set(), box)

    def _traverse_boxes(self, traversed, box):
        """Add `box` and its edges to the graph, recursing into
        connections not seen yet (tracked by guid in `traversed`)."""
        traversed.add(box.guid)

        # 50x50 point circular nodes; node sizes are given in inches
        # (1 inch = 72 points)
        self._graph.add_node(box.label,
                width = 50/72.0,
                height = 50/72.0,
                shape = "circle")

        for neighbor in box.connections:
            self._graph.add_edge(box.label, neighbor.label)
            if neighbor.guid not in traversed:
                self._traverse_boxes(traversed, neighbor)

    def plot(self):
        """Write the graph as graphviz dot to a temp file; return its path."""
        tmp = tempfile.NamedTemporaryFile(delete = False)
        networkx.draw_graphviz(self._graph)
        networkx.write_dot(self._graph, tmp.name)
        tmp.close()
        return tmp.name
+
diff --git a/src/neco/util/sshfuncs.py b/src/neco/util/sshfuncs.py
new file mode 100644 (file)
index 0000000..77698ca
--- /dev/null
@@ -0,0 +1,742 @@
+import base64
+import errno
+import os
+import os.path
+import select
+import signal
+import socket
+import subprocess
+import time
+import traceback
+import re
+import tempfile
+import hashlib
+
+OPENSSH_HAS_PERSIST = None
+CONTROL_PATH = "yyyyy_ssh_control_path"
+
+if hasattr(os, "devnull"):
+    DEV_NULL = os.devnull
+else:
+    DEV_NULL = "/dev/null"
+
+SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
+
+hostbyname_cache = dict()
+
# The following empty classes are used as unique sentinel tokens (never
# instantiated): STDOUT marks "redirect stderr to stdout" for rspawn,
# while RUNNING / FINISHED / NOT_STARTED are the values returned by
# rstatus() and compared against by callers.
class STDOUT: 
    """
    Special value that when given to rspawn in stderr causes stderr to 
    redirect to whatever stdout was redirected to.
    """

class RUNNING:
    """
    Process is still running
    """

class FINISHED:
    """
    Process is finished
    """

class NOT_STARTED:
    """
    Process hasn't started running yet (this should be very rare)
    """
+
def openssh_has_persist():
    """ The ssh_config options ControlMaster and ControlPersist allow to
    reuse a same network connection for multiple ssh sessions. In this 
    way limitations on number of open ssh connections can be bypassed.
    However, older versions of openSSH do not support this feature.
    This function is used to determine if ssh connection persist features
    can be used.

    The result is computed once and cached in OPENSSH_HAS_PERSIST.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # "ssh -v" prints its version banner (e.g. "OpenSSH_5.9p1 ...")
        # on stderr, which is merged into stdout here
        proc = subprocess.Popen(["ssh","-v"],
            stdout = subprocess.PIPE,
            stderr = subprocess.STDOUT,
            stdin = open("/dev/null","r") )
        out,err = proc.communicate()
        proc.wait()
        
        # Accept versions 5.8/5.9, 5.10+, 6.x-9.x and 10+
        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
+
def shell_escape(s):
    """ Escapes strings so that they are safe to use as command-line
    arguments """
    if re.match('^[-a-zA-Z0-9_=+:.,/]*$', s):
        # Contains only known-safe characters - no escaping needed
        return s
    # Unsafe string: keep printable characters (except quotes) as-is and
    # encode everything else with the bash $'\xNN' quoting construct
    escaped = []
    for c in s:
        if (32 <= ord(c) < 127 or c in ('\r', '\n', '\t')) and c not in ("'", '"'):
            escaped.append(c)
        else:
            escaped.append("'$'\\x%02x''" % (ord(c),))
    return "'%s'" % (''.join(escaped),)
+
def eintr_retry(func):
    """Retries a function invocation when a EINTR occurs

    Wraps `func` so that up to 4 attempts are made when the call fails
    with EINTR (interrupted system call); any other error propagates.
    NOTE(review): passing _retry=True skips the guarded loop entirely,
    resulting in a single unguarded attempt - the flag name suggests
    the opposite; confirm intent.
    """
    import functools
    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for i in xrange(0 if retry else 4):
            try:
                return func(*p, **kw)
            except (select.error, socket.error), args:
                if args[0] == errno.EINTR:
                    continue
                else:
                    raise 
            except OSError, e:
                if e.errno == errno.EINTR:
                    continue
                else:
                    raise
        else:
            # Final attempt: let any exception (including EINTR) propagate
            return func(*p, **kw)
    return rv
+
def make_connkey(user, host, port):
    # Build a filesystem-safe key identifying an ssh connection; it is
    # embedded in the ControlPath socket name for connection sharing.
    # '/' is substituted so the key cannot escape the /tmp directory,
    # and long keys are shortened via sha1 to keep the socket path short.
    # NOTE(review): str.encode("base64") is Python 2 only.
    connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
    if len(connkey) > 60:
        connkey = hashlib.sha1(connkey).hexdigest()
    return connkey
+
def rexec(command, host, user, 
        port = None, 
        agent = True,
        sudo = False,
        stdin = "", 
        identity_file = None,
        tty = False,
        tty2 = False,
        timeout = None,
        retry = 0,
        err_on_timeout = True,
        connect_timeout = 30,
        persistent = True):
    """
    Executes a remote command, returns ((stdout,stderr),process)

    Parameters:
        command: command line to execute on the remote host
        host/user/port/identity_file: ssh connection parameters
        agent: enable ssh agent forwarding (-A)
        sudo: prefix the command with sudo
        stdin: data fed to the remote command's standard input
        tty: request a tty (-t); tty2: force tty allocation (-t -t)
        timeout/err_on_timeout: passed through to _communicate
        retry: number of attempts on failure (0 means the default of 3)
        persistent: share a master connection when the local OpenSSH
            supports ControlPersist (see openssh_has_persist)
    """
    connkey = make_connkey(user, host, port)
    args = ['ssh', '-C',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes',
            # XXX: Possible security issue
            # Avoid interactive requests to accept new host keys
            '-o', 'StrictHostKeyChecking=no',
            '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
            '-o', 'ConnectionAttempts=3',
            '-o', 'ServerAliveInterval=30',
            '-o', 'TCPKeepAlive=yes',
            '-l', user, host]

    if persistent and openssh_has_persist():
        args.extend([
            '-o', 'ControlMaster=auto',
            '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
            '-o', 'ControlPersist=60' ])
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if identity_file:
        args.extend(('-i', identity_file))
    if tty:
        args.append('-t')
    elif tty2:
        args.append('-t')
        args.append('-t')
    if sudo:
        command = "sudo " + command
    args.append(command)

    # NOTE(review): debug output - every ssh invocation is printed to stdout
    print " ".join(args)

    for x in xrange(retry or 3):
        # connects to the remote host and starts a remote connection
        proc = subprocess.Popen(args, 
                stdout = subprocess.PIPE,
                stdin = subprocess.PIPE, 
                stderr = subprocess.PIPE)
        
        try:
            out, err = _communicate(proc, stdin, timeout, err_on_timeout)
            if proc.poll():
                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
                    # SSH error, can safely retry
                    continue
                elif retry:
                    # Probably timed out or plain failed but can retry
                    continue
            break
        except RuntimeError,e:
            # NOTE(review): if the final loop iteration raises here while
            # retry > 0, the loop ends and `out`/`err` may be unbound at
            # the return below (NameError). TODO: initialize out/err
            # before the loop.
            if retry <= 0:
                raise
            retry -= 1
        
    return ((out, err), proc)
+
def rcopy(source, dest, host, user,
        port = None, 
        agent = True, 
        recursive = False,
        identity_file = None):
    """
    Copies file from/to remote sites.
    
    Source and destination should have the user and host encoded
    as per scp specs.
    
    If source is a file object, a special mode will be used to
    create the remote file with the same contents.
    
    If dest is a file object, the remote file (source) will be
    read and written into dest.
    
    In these modes, recursive cannot be True.
    
    Source can be a list of files to copy to a single destination,
    in which case it is advised that the destination be a folder.

    Returns ((stdout, stderr), proc) of the underlying copy process.
    """
    
    # A real file positioned at its start can be handed over by path;
    # any other readable object is first spooled into a temp file.
    if isinstance(source, file) and source.tell() == 0:
        source = source.name

    elif hasattr(source, 'read'):
        tmp = tempfile.NamedTemporaryFile()
        while True:
            buf = source.read(65536)
            if buf:
                tmp.write(buf)
            else:
                break
        tmp.seek(0)
        source = tmp.name
    
    if isinstance(source, file) or isinstance(dest, file) \
            or hasattr(source, 'read')  or hasattr(dest, 'write'):
        # Stream mode: pipe the data through "ssh ... cat" instead of scp
        assert not recursive
        
        connkey = make_connkey(user,host,port)
        args = ['ssh', '-l', user, '-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes',
                # XXX: Possible security issue
                # Avoid interactive requests to accept new host keys
                '-o', 'StrictHostKeyChecking=no',
                '-o', 'ConnectTimeout=30',
                '-o', 'ConnectionAttempts=3',
                '-o', 'ServerAliveInterval=30',
                '-o', 'TCPKeepAlive=yes',
                host ]
        if openssh_has_persist():
            args.extend([
                '-o', 'ControlMaster=auto',
                '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
                '-o', 'ControlPersist=60' ])
        if port:
            args.append('-P%d' % port)
        if identity_file:
            args.extend(('-i', identity_file))
        
        if isinstance(source, file) or hasattr(source, 'read'):
            args.append('cat > %s' % (shell_escape(dest),))
        elif isinstance(dest, file) or hasattr(dest, 'write'):
            # NOTE(review): this cats the remote `dest`; the docstring
            # says the remote *source* should be read - confirm which
            # is intended.
            args.append('cat %s' % (shell_escape(dest),))
        else:
            raise AssertionError, "Unreachable code reached! :-Q"
        
        # connects to the remote host and starts a remote connection
        if isinstance(source, file):
            proc = subprocess.Popen(args, 
                    stdout = open('/dev/null','w'),
                    stderr = subprocess.PIPE,
                    stdin = source)
            err = proc.stderr.read()
            eintr_retry(proc.wait)()
            return ((None,err), proc)
        elif isinstance(dest, file):
            # NOTE(review): this branch never writes into `dest` and
            # passes `source` (a path string at this point) as stdin -
            # it looks broken; confirm against callers.
            proc = subprocess.Popen(args, 
                    stdout = open('/dev/null','w'),
                    stderr = subprocess.PIPE,
                    stdin = source)
            err = proc.stderr.read()
            eintr_retry(proc.wait)()
            return ((None,err), proc)
        elif hasattr(source, 'read'):
            # file-like (but not file) source
            proc = subprocess.Popen(args, 
                    stdout = open('/dev/null','w'),
                    stderr = subprocess.PIPE,
                    stdin = subprocess.PIPE)
            
            buf = None
            err = []
            while True:
                if not buf:
                    buf = source.read(4096)
                if not buf:
                    #EOF
                    break
                
                # Multiplex between feeding stdin and draining stderr to
                # avoid deadlocking on full pipe buffers
                rdrdy, wrdy, broken = select.select(
                    [proc.stderr],
                    [proc.stdin],
                    [proc.stderr,proc.stdin])
                
                if proc.stderr in rdrdy:
                    # use os.read for fully unbuffered behavior
                    err.append(os.read(proc.stderr.fileno(), 4096))
                
                if proc.stdin in wrdy:
                    proc.stdin.write(buf)
                    buf = None
                
                if broken:
                    break
            proc.stdin.close()
            err.append(proc.stderr.read())
                
            eintr_retry(proc.wait)()
            return ((None,''.join(err)), proc)
        elif hasattr(dest, 'write'):
            # file-like (but not file) dest
            proc = subprocess.Popen(args, 
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE,
                    stdin = open('/dev/null','w'))
            
            buf = None
            err = []
            while True:
                rdrdy, wrdy, broken = select.select(
                    [proc.stderr, proc.stdout],
                    [],
                    [proc.stderr, proc.stdout])
                
                if proc.stderr in rdrdy:
                    # use os.read for fully unbuffered behavior
                    err.append(os.read(proc.stderr.fileno(), 4096))
                
                if proc.stdout in rdrdy:
                    # use os.read for fully unbuffered behavior
                    buf = os.read(proc.stdout.fileno(), 4096)
                    dest.write(buf)
                    
                    if not buf:
                        #EOF
                        break
                
                if broken:
                    break
            err.append(proc.stderr.read())
                
            eintr_retry(proc.wait)()
            return ((None,''.join(err)), proc)
        else:
            raise AssertionError, "Unreachable code reached! :-Q"
    else:
        # plain scp
        args = ['scp', '-q', '-p', '-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes',
                # XXX: Possible security issue
                # Avoid interactive requests to accept new host keys
                '-o', 'StrictHostKeyChecking=no',
                '-o', 'ConnectTimeout=30',
                '-o', 'ConnectionAttempts=3',
                '-o', 'ServerAliveInterval=30',
                '-o', 'TCPKeepAlive=yes' ]
                
        if port:
            args.append('-P%d' % port)
        if recursive:
            args.append('-r')
        if identity_file:
            args.extend(('-i', identity_file))

        if isinstance(source,list):
            args.extend(source)
        else:
            if openssh_has_persist():
                connkey = make_connkey(user,host,port)
                args.extend([
                    '-o', 'ControlMaster=no',
                    '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, )])
            args.append(source)
        args.append("%s@%s:%s" %(user, host, dest))

        # connects to the remote host and starts a remote connection
        proc = subprocess.Popen(args, 
                stdout = subprocess.PIPE,
                stdin = subprocess.PIPE, 
                stderr = subprocess.PIPE)
        
        comm = proc.communicate()
        eintr_retry(proc.wait)()
        return (comm, proc)
+
def rspawn(command, pidfile, 
        stdout = '/dev/null', 
        stderr = STDOUT, 
        stdin = '/dev/null', 
        home = None, 
        create_home = False, 
        host = None, 
        port = None, 
        user = None, 
        agent = None, 
        sudo = False,
        identity_file = None, 
        tty = False):
    """
    Spawn a remote command such that it will continue working asynchronously.
    
    Parameters:
        command: the command to run - it should be a single line.
        
        pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
        
        stdout: path of a file to redirect standard output to - must be a string.
            Defaults to /dev/null
        stderr: path of a file to redirect standard error to - string or the special STDOUT value
            to redirect to the same file stdout was redirected to. Defaults to STDOUT.
        stdin: path of a file with input to be piped into the command's standard input
        
        home: path of a folder to use as working directory - should exist, unless you specify create_home
        
        create_home: if True, the home folder will be created first with mkdir -p
        
        sudo: whether the command needs to be executed as root
        
        host/port/user/agent/identity_file: see rexec
    
    Returns:
        (stdout, stderr), process
        
        Of the spawning process, which only captures errors at spawning time.
        Usually only useful for diagnostics.
    """
    # Start process in a "daemonized" way, using nohup and heavy
    # stdin/out redirection to avoid connection issues
    if stderr is STDOUT:
        stderr = '&1'
    else:
        stderr = ' ' + stderr
    
    # The remote shell writes "<pid> 1" into the pidfile; rcheck_pid
    # parses it back as the (pid, ppid) pair.
    daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
        'command' : command,
        'pidfile' : shell_escape(pidfile),
        
        'stdout' : stdout,
        'stderr' : stderr,
        'stdin' : stdin,
    }
    
    cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
            'command' : shell_escape(daemon_command),
            
            'sudo' : 'sudo -S' if sudo else '',
            
            'pidfile' : shell_escape(pidfile),
            'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
            # BUG FIX: this used "% (shell_escape,)", interpolating the
            # repr of the function object itself into the command instead
            # of the escaped home path. Also require `home` to be set,
            # since otherwise there is nothing to mkdir.
            'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
        }

    (out,err), proc = rexec(
        cmd,
        host = host,
        port = port,
        user = user,
        agent = agent,
        identity_file = identity_file,
        tty = tty
        )
    
    if proc.wait():
        raise RuntimeError("Failed to set up application: %s %s" % (out,err,))

    return (out,err),proc
+
@eintr_retry
def rcheck_pid(pidfile,
        host = None, 
        port = None, 
        user = None, 
        agent = None, 
        identity_file = None):
    """
    Check the pidfile of a process spawned with remote_spawn.
    
    Parameters:
        pidfile: the pidfile passed to remote_span
        
        host/port/user/agent/identity_file: see rexec
    
    Returns:
        
        A (pid, ppid) tuple useful for calling remote_status and remote_kill,
        or None if the pidfile isn't valid yet (maybe the process is still starting).
    """
    (out, err), proc = rexec(
        "cat %s" % (pidfile,),
        host = host,
        port = port,
        user = user,
        agent = agent,
        identity_file = identity_file
        )

    if proc.wait():
        # cat failed - the pidfile probably does not exist (yet)
        return None

    if not out:
        return None
    try:
        # The pidfile holds "<pid> <ppid>" as written by rspawn
        return [int(field) for field in out.strip().split(' ', 1)]
    except:
        # Ignore, many ways to fail that don't matter that much
        return None
+
@eintr_retry
def rstatus(pid, ppid, 
        host = None, 
        port = None, 
        user = None, 
        agent = None, 
        identity_file = None):
    """
    Check the status of a process spawned with remote_spawn.
    
    Parameters:
        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
        
        host/port/user/agent/identity_file: see rexec
    
    Returns:
        
        One of NOT_STARTED, RUNNING, FINISHED
    """
    # grep -c prints the number of matching ps lines; "; true" forces a
    # zero exit status so a missing process is not mistaken for a
    # connection failure.
    (out,err),proc = rexec(
        "ps --pid %(pid)d -o pid | grep -c %(pid)d ; true" % {
            'pid' : pid,
        },
        host = host,
        port = port,
        user = user,
        agent = agent,
        identity_file = identity_file
        )
    
    if proc.wait():
        # Could not even run the remote check
        return NOT_STARTED
    
    status = False
    if out:
        try:
            status = bool(int(out.strip()))
        except:
            # BUG FIX: this used the logging module without importing it,
            # so reporting the problem raised a NameError out of the
            # except handler instead of returning NOT_STARTED.
            import logging
            if out or err:
                logging.warning("Error checking remote status:\n%s%s\n", out, err)
            # Ignore, many ways to fail that don't matter that much
            return NOT_STARTED
    return RUNNING if status else FINISHED
+    
+
@eintr_retry
def rkill(pid, ppid, 
        host = None, 
        port = None, 
        user = None, 
        agent = None, 
        sudo = False,
        identity_file = None, 
        nowait = False):
    """
    Kill a process spawned with remote_spawn.
    
    First tries a SIGTERM, re-sending it while the process survives, and
    if the process does not end within roughly 60 seconds (30 polls of
    ~2s each), it sends a SIGKILL.
    
    Parameters:
        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
        
        sudo: whether the command was run with sudo - careful killing like this.
        
        host/port/user/agent/identity_file: see rexec

        nowait: if True, run the remote kill loop detached in the
            background and return without waiting for it to finish.
    
    Returns:
        
        Nothing, should have killed the process
    """
    
    if sudo:
        # With sudo the target may have root-owned children that a plain
        # process-group kill can't reach: collect the direct children of
        # `pid` as extra kill targets (the $(...) expands remotely).
        subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
    else:
        subkill = ""
    # Remote shell script: kill the whole process group (kill -- -pid) and
    # the process itself with SIGTERM, then poll ~30 times (0.2s + 1.8s per
    # iteration), re-sending SIGTERM while the process is still alive, and
    # finally escalate to SIGKILL if it survived the loop.
    cmd = """
SUBKILL="%(subkill)s" ;
%(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
%(sudo)s kill %(pid)d $SUBKILL || /bin/true
for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
    sleep 0.2 
    if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
        break
    else
        %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
        %(sudo)s kill %(pid)d $SUBKILL || /bin/true
    fi
    sleep 1.8
done
if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
    %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
    %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
fi
"""
    if nowait:
        # Detach: run the kill loop in the background with all standard
        # streams redirected so the ssh session can return immediately.
        cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)

    (out,err),proc = rexec(
        cmd % {
            'ppid' : ppid,  # NOTE(review): 'ppid' is currently unused by cmd
            'pid' : pid,
            'sudo' : 'sudo -S' if sudo else '',
            'subkill' : subkill,
        },
        host = host,
        port = port,
        user = user,
        agent = agent,
        identity_file = identity_file
        )
    
    # wait, don't leave zombies around
    proc.wait()
+
+# POSIX
def _communicate(self, input, timeout=None, err_on_timeout=True):
    """
    POSIX-only replacement for Popen.communicate() with timeout support.

    Feed *input* to stdin and read stdout/stderr until EOF, like
    Popen.communicate(), but once *timeout* seconds elapse the child is
    sent SIGTERM, escalating to SIGKILL 4 seconds later, and the loop
    bails out 4 seconds after that.

    Parameters:
        input: data to write to the child's stdin (or None/empty)
        timeout: seconds to wait before killing the child, None = forever
        err_on_timeout: if True, raise RuntimeError when the timeout
            fired; otherwise return whatever output was collected

    Returns:
        (stdout, stderr) tuple; each element is None if the
        corresponding stream was not piped.

    Raises:
        RuntimeError: ("Operation timed out", returncode, stdout, stderr)
            when the timeout fired and err_on_timeout is true.
    """
    read_set = []
    write_set = []
    stdout = None # Return
    stderr = None # Return
    
    killed = False
    
    if timeout is not None:
        timelimit = time.time() + timeout
        # SIGTERM at timelimit, SIGKILL 4s later, give up 4s after that.
        # (killtime and bailtime used to be equal, which made the SIGKILL
        # branch below unreachable: the bailtime check always won.)
        killtime = timelimit + 4
        bailtime = killtime + 4

    if self.stdin:
        # Flush stdio buffer.  This might block, if the user has
        # been writing to .stdin in an uncontrolled fashion.
        self.stdin.flush()
        if input:
            write_set.append(self.stdin)
        else:
            self.stdin.close()
    if self.stdout:
        read_set.append(self.stdout)
        stdout = []
    if self.stderr:
        read_set.append(self.stderr)
        stderr = []

    input_offset = 0
    while read_set or write_set:
        if timeout is not None:
            curtime = time.time()
            if curtime > timelimit:
                if curtime > bailtime:
                    # Even SIGKILL didn't do it: stop waiting
                    break
                elif curtime > killtime:
                    signum = signal.SIGKILL
                else:
                    signum = signal.SIGTERM
                # Lets kill it
                os.kill(self.pid, signum)
                # Remember the timeout fired, so err_on_timeout can act
                # on it after the loop (was never set before this fix).
                killed = True
                select_timeout = 0.5
            else:
                select_timeout = timelimit - curtime + 0.1
        else:
            select_timeout = 1.0
        
        if select_timeout > 1.0:
            select_timeout = 1.0
            
        try:
            rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
        except select.error as e:
            if e.args[0] != 4:  # EINTR: just retry the select
                raise
            else:
                continue
        
        if not rlist and not wlist and not xlist and self.poll() is not None:
            # timeout and process exited, say bye
            break

        if self.stdin in wlist:
            # When select has indicated that the file is writable,
            # we can write up to PIPE_BUF bytes without risk
            # blocking.  POSIX defines PIPE_BUF >= 512
            # (buffer() is Python 2 only - this helper targets py2)
            bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
            input_offset += bytes_written
            if input_offset >= len(input):
                self.stdin.close()
                write_set.remove(self.stdin)

        if self.stdout in rlist:
            data = os.read(self.stdout.fileno(), 1024)
            if data == "":
                # EOF on stdout
                self.stdout.close()
                read_set.remove(self.stdout)
            stdout.append(data)

        if self.stderr in rlist:
            data = os.read(self.stderr.fileno(), 1024)
            if data == "":
                # EOF on stderr
                self.stderr.close()
                read_set.remove(self.stderr)
            stderr.append(data)
    
    # All data exchanged.  Translate lists into strings.
    if stdout is not None:
        stdout = ''.join(stdout)
    if stderr is not None:
        stderr = ''.join(stderr)

    # Translate newlines, if requested.  We cannot let the file
    # object do the translation: It is based on stdio, which is
    # impossible to combine with select (unless forcing no
    # buffering).
    if self.universal_newlines and hasattr(file, 'newlines'):
        if stdout:
            stdout = self._translate_newlines(stdout)
        if stderr:
            stderr = self._translate_newlines(stderr)

    if killed and err_on_timeout:
        errcode = self.poll()
        raise RuntimeError("Operation timed out", errcode, stdout, stderr)
    else:
        if killed:
            self.poll()
        else:
            self.wait()
        return (stdout, stderr)
+
diff --git a/src/neco/util/timefuncs.py b/src/neco/util/timefuncs.py
new file mode 100644 (file)
index 0000000..7bd3d56
--- /dev/null
@@ -0,0 +1,50 @@
+import datetime
+import re
+
+_strf = "%Y%m%d%H%M%S%f"
+_reabs = re.compile("^\d{20}$")
+_rerel = re.compile("^(?P<time>\d+(.\d+)?)(?P<units>h|m|s|ms|us)$")
+
+# Work around to fix "ImportError: Failed to import _strptime because the import lockis held by another thread."
+datetime.datetime.strptime("20120807124732894211", _strf)
+
def strfnow():
    """ Return the current local time formatted as a scheduler date (_strf). """
    now = datetime.datetime.now()
    return now.strftime(_strf)
+
def strfdiff(str1, str2):
    """
    Return str1 - str2 in seconds (both are _strf-formatted dates),
    keeping the (rounded) sub-second part.  Never returns 0: a zero
    difference is reported as 0.001 because callers use it as a delay.
    """
    later = datetime.datetime.strptime(str1, _strf)
    earlier = datetime.datetime.strptime(str2, _strf)
    delta = later - earlier
    seconds = delta.days * 86400 + delta.seconds
    # keep microseconds, rounded to two decimals
    seconds += round(delta.microseconds * 1.0e-06, 2)
    # delay must be > 0
    return seconds or 0.001
+
def strfvalid(date):
    """
    Normalize a user supplied date into a scheduler date string.

    Accepts None/"" (meaning "now"), an absolute 20-digit _strf date
    (returned unchanged), or a relative offset such as "1.5h", "10m",
    "30s", "100ms", "500us" (converted to now + offset).  Returns None
    when the input matches neither form.
    """
    # Empty/missing date means "right now"
    if not date:
        return strfnow()

    # Already an absolute scheduler date: pass through untouched
    if _reabs.match(date):
        return date

    match = _rerel.match(date)
    if not match:
        return None

    amount = float(match.groupdict()['time'])
    units = match.groupdict()['units']
    if units == 'h':
        offset = datetime.timedelta(hours = amount)
    elif units == 'm':
        offset = datetime.timedelta(minutes = amount)
    elif units == 's':
        offset = datetime.timedelta(seconds = amount)
    elif units == 'ms':
        offset = datetime.timedelta(microseconds = (amount * 1000))
    else:
        offset = datetime.timedelta(microseconds = amount)
    target = datetime.datetime.now() + offset
    return target.strftime(_strf)
+
diff --git a/test/design/box.py b/test/design/box.py
new file mode 100755 (executable)
index 0000000..71c8f15
--- /dev/null
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+
+from neco.design.box import Box 
+
+import unittest
+
class BoxDesignTestCase(unittest.TestCase):
    def test_simple_design(self):
        """ Exercise Box tagging, attributes, connection and disconnection. """
        node1 = Box()
        node2 = Box()

        node1.label = "uno"
        node2.label = "dos"

        # Tagging
        node1.tadd('nodo')
        node2.tadd('mynodo')

        self.assertEquals(node1.tags, set(['nodo']))
        self.assertEquals(node2.tags, set(['mynodo']))
       
        # Free-form attributes under .a
        node1.a.hola = "chau"
        node2.a.hello = "bye"

        self.assertEquals(node1.a.hola, "chau")
        self.assertEquals(node2.a.hello, "bye")

        # Connections are symmetric
        node1.connect(node2)
        
        self.assertEquals(node1.connections, set([node2]))
        self.assertEquals(node2.connections, set([node1]))
        self.assertTrue(node1.is_connected(node2))
        self.assertTrue(node2.is_connected(node1))

        # Connected boxes are reachable by label through .c
        self.assertEquals(node1.c.dos.a.hello, "bye")
        self.assertEquals(node2.c.uno.a.hola, "chau")
       
        node2.disconnect(node1)

        self.assertEquals(node1.connections, set([]))
        self.assertEquals(node2.connections, set([]))
        self.assertFalse(node1.is_connected(node2))
        self.assertFalse(node2.is_connected(node1))

        # After disconnection the peer lookup must raise.  The attribute
        # access has to be deferred via getattr: the original passed
        # node1.c.dos directly, which evaluated (and raised) before
        # assertRaises could catch it.
        self.assertRaises(AttributeError, getattr, node1.c, "dos")
        self.assertRaises(AttributeError, getattr, node2.c, "uno")
+
+
+if __name__ == '__main__':
+    unittest.main()
+
diff --git a/test/execution/ec.py b/test/execution/ec.py
new file mode 100755 (executable)
index 0000000..eda875e
--- /dev/null
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+
+from neco.execution.ec import ExperimentController 
+from neco.execution.tasks import TaskStatus
+
+import datetime
+import unittest
+
class ExecuteControllersTestCase(unittest.TestCase):
    def test_schedule_print(self):
        """ A scheduled task gets a weakref to the EC and its result is retrievable. """
        def myfunc(ec_weakref):
            result = id(ec_weakref())
            return (TaskStatus.SUCCESS, result)

        # Create the EC before the try block: if the constructor fails,
        # there is no 'ec' to terminate (the original would mask the real
        # error with a NameError raised from the finally clause).
        ec = ExperimentController()
        try:
            tid = ec.schedule("0s", myfunc)
            status = None
            # Busy-wait until the task reports completion
            while status != TaskStatus.SUCCESS:
                (status, result) = ec.task_info(tid)

            self.assertEquals(id(ec), result)
        finally:
            ec.terminate()

    def test_schedule_date(self):
        """ A task scheduled with a relative date runs inside the expected window. """
        def get_time(ec_weakref):
            timestamp = datetime.datetime.now() 
            return (TaskStatus.SUCCESS, timestamp)

        # Constructed outside the try for the same reason as above
        ec = ExperimentController()
        try:
            schedule_time = datetime.datetime.now()
            
            tid = ec.schedule("4s", get_time)
            status = None
            while status != TaskStatus.SUCCESS:
                (status, execution_time) = ec.task_info(tid)

            delta = execution_time - schedule_time
            # Scheduled at +4s: it must fire after 4s but within 1s of slack
            self.assertTrue(delta > datetime.timedelta(seconds=4))
            self.assertTrue(delta < datetime.timedelta(seconds=5))
        finally:
            ec.terminate()
+
+
+if __name__ == '__main__':
+    unittest.main()
+
diff --git a/test/util/parser.py b/test/util/parser.py
new file mode 100755 (executable)
index 0000000..5ab70fe
--- /dev/null
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+
+from neco.design.box import Box 
+from neco.util.parser import XMLParser
+
+import unittest
+
class BoxDesignTestCase(unittest.TestCase):
    def test_to_xml(self):
        """ Serializing a design to XML and back must round-trip exactly. """
        box_a = Box()
        box_b = Box()
        box_a.label = "node1"
        box_b.label = "node2"

        box_a.connect(box_b)

        # Populate attributes on both boxes, including one set through
        # the connection proxy (.c.<label>), plus a tag on each.
        box_a.a.dog = "cat"
        box_a.a.one = "two"
        box_a.a.t = "q"
        box_a.c.node2.a.sky = "sea"
        box_b.a.bee = "honey"

        box_a.tadd("unooo")
        box_b.tadd("dosss")

        parser = XMLParser()
        first_xml = parser.to_xml(box_a)
        rebuilt = parser.from_xml(first_xml)
        second_xml = parser.to_xml(rebuilt)

        self.assertEquals(first_xml, second_xml)
+
+if __name__ == '__main__':
+    unittest.main()
+
diff --git a/test/util/plot.py b/test/util/plot.py
new file mode 100755 (executable)
index 0000000..6c20da6
--- /dev/null
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+
+from neco.design.box import Box 
+from neco.util.plot import Plotter
+
+import subprocess
+import unittest
+
class BoxPlotTestCase(unittest.TestCase):
    # NOTE: the "xtest_" prefix hides this from unittest discovery;
    # rename it to "test_plot" to run it.  It shells out to graphviz'
    # `dot` and opens `evince`, so it needs those tools and a display.
    def xtest_plot(self):
        """ Plot a small two-node/one-channel topology and display the result. """
        # Two nodes, each with an interface, joined by a channel; node1
        # additionally carries a ping application and both have mobility.
        node1 = Box(label="node1")
        ping1 = Box(label="ping")
        mobility1 = Box(label="mob1")
        node2 = Box(label="node2")
        mobility2 = Box(label="mob2")
        iface1 = Box(label="iface1")
        iface2 = Box(label="iface2")
        channel = Box(label="chan")

        node1.connect(ping1)
        node1.connect(mobility1)
        node1.connect(iface1)
        channel.connect(iface1)
        channel.connect(iface2)
        node2.connect(iface2)
        node2.connect(mobility2)

        # Render the graph rooted at node1 to a dot file, convert it to
        # PostScript and open it in a viewer for manual inspection.
        plotter = Plotter(node1)
        fname = plotter.plot()
        subprocess.call(["dot", "-Tps", fname, "-o", "%s.ps"%fname])
        subprocess.call(["evince","%s.ps"%fname])
+       
+if __name__ == '__main__':
+    unittest.main()
+