--- /dev/null
+SRCDIR = $(CURDIR)/src
+TESTDIR = $(CURDIR)/test
+BUILDDIR = $(CURDIR)/build
+DISTDIR = $(CURDIR)/dist
+
+# stupid distutils, it's broken in so many ways
+SUBBUILDDIR = $(shell python -c 'import distutils.util, sys; \
+ print "lib.%s-%s" % (distutils.util.get_platform(), \
+ sys.version[0:3])')
+PYTHON25 := $(shell python -c 'import sys; v = sys.version_info; \
+ print (1 if v[0] <= 2 and v[1] <= 5 else 0)')
+
+ifeq ($(PYTHON25),0)
+BUILDDIR := $(BUILDDIR)/$(SUBBUILDDIR)
+else
+BUILDDIR := $(BUILDDIR)/lib
+endif
+
+PYPATH = $(BUILDDIR):$(PYTHONPATH)
+COVERAGE = $(or $(shell which coverage), $(shell which python-coverage), \
+ coverage)
+
+all:
+ ./setup.py build
+
+install: all
+ ./setup.py install
+
+test: all
+ retval=0; \
+ for i in `find "$(TESTDIR)" -iname '*.py' -perm -u+x -type f`; do \
+ echo $$i; \
+ PYTHONPATH="$(PYPATH)" $$i -v || retval=$$?; \
+ done; exit $$retval
+
+coverage: all
+ rm -f .coverage
+ for i in `find "$(TESTDIR)" -perm -u+x -type f`; do \
+ set -e; \
+ PYTHONPATH="$(PYPATH)" $(COVERAGE) -x $$i -v; \
+ done
+ $(COVERAGE) -c
+ $(COVERAGE) -r -m `find "$(BUILDDIR)" -name \\*.py -type f`
+ rm -f .coverage
+
+clean:
+ ./setup.py clean
+ rm -f `find -name \*.pyc` .coverage *.pcap
+
+distclean: clean
+ rm -rf "$(DISTDIR)"
+
+MANIFEST:
+ find . -path ./.hg\* -prune -o -path ./build -prune -o \
+ -name \*.pyc -prune -o -name \*.swp -prune -o \
+ -name MANIFEST -prune -o -type f -print | \
+ sed 's#^\./##' | sort > MANIFEST
+
+dist: MANIFEST
+ ./setup.py sdist
+
+.PHONY: all clean distclean dist test coverage install MANIFEST
--- /dev/null
+[clean]
+all = 1
--- /dev/null
+#!/usr/bin/env python
+from distutils.core import setup
+import sys
+
+setup(
+ name = "neco",
+ version = "0.01",
+ description = "Network Experiment Controller",
+ author = "Alina Quereilhac",
+ url = "",
+ license = "GPLv2",
+ platforms = "Linux",
+ packages = [
+ "neco",
+ "neco.design",
+ "neco.execution",
+ "neco.resources",
+ "neco.resources.base",
+ "neco.resources.netns",
+ "neco.resources.ns3",
+ "neco.tags",
+ "neco.util"],
+ package_dir = {"": "src"},
+ )
--- /dev/null
+import logging
+logging.basicConfig()
--- /dev/null
+from neco.util import guid
+
+guid_gen = guid.GuidGenerator()
+
+class Attributes(object):
+ def __init__(self):
+ super(Attributes, self).__init__()
+ self._attributes = dict()
+
+ def __getattr__(self, name):
+ try:
+ return self._attributes[name]
+ except:
+ return super(Attributes, self).__getattribute__(name)
+
+ def __setattr__(self, name, value):
+ try:
+ if value == None:
+ old = self._attributes[name]
+ del self._attributes[name]
+ return old
+
+ self._attributes[name] = value
+ return value
+ except:
+ return super(Attributes, self).__setattr__(name, value)
+
+class Connections(object):
+ def __init__(self):
+ super(Connections, self).__init__()
+ self._connections = set()
+
+ def __getattr__(self, guid_or_label):
+ try:
+ for b in self._connections:
+ if guid_or_label in [b.guid, b.label]:
+ return b
+ except:
+ return super(Connections, self).__getattribute__(guid_or_label)
+
+class Box(object):
+ def __init__(self, label = None, guid = None):
+ super(Box, self).__init__()
+ self._guid = guid_gen.next(guid)
+ self._a = Attributes()
+ self._c = Connections()
+ self._tags = set()
+ self.label = label or self._guid
+
+ # Graphical information to draw box
+ self.x = 0
+ self.y = 0
+ self.width = 4
+ self.height = 4
+
+ @property
+ def tags(self):
+ return self._tags
+
+ @property
+ def attributes(self):
+ return self._a._attributes.keys()
+
+ @property
+ def a(self):
+ return self._a
+
+ @property
+ def c(self):
+ return self._c
+
+ @property
+ def guid(self):
+ return self._guid
+
+ @property
+ def connections(self):
+ return set(self._c._connections)
+
+ def tadd(self, name):
+ self._tags.add(name)
+
+ def tdel(self, name):
+ self._tags.remove(name)
+
+ def thas(self, name):
+ return name in self._tags
+
+ def connect(self, box, cascade = True):
+ self._c._connections.add(box)
+ if cascade:
+ box.connect(self, cascade = False)
+
+ def disconnect(self, box, cascade = True):
+ self._c._connections.remove(box)
+ if cascade:
+ box.disconnect(self, cascade = False)
+
+ def is_connected(self, box):
+ return box in self.connections
+
--- /dev/null
+
+def deploy(ec_weakref, xml):
+ from neco.util.parser import XMLParser
+
+ # parse xml and build topology graph
+ parser = XMLParser()
+ box = parser.from_xml(xml)
+
+ # instantiate resource boxes
+
+
+ # allocate physical resources
+ # configure physical resources
+ # allocate virtual resources
+ # configure virtual resources
+ # allocate software resources
+ # configure software resources
+ # schedule application start/stop
+
+
--- /dev/null
+import logging
+import os
+import sys
+import threading
+import time
+import weakref
+
+from neco.execution import scheduler, tasks
+from neco.util import guid
+from neco.util.timefuncs import strfnow, strfdiff, strfvalid
+from neco.util.parallel import ParallelRun
+
+_reschedule_delay = "0.1s"
+
+class ExperimentController(object):
+ def __init__(self, root_dir = "/tmp", loglevel = 'error'):
+ super(ExperimentController, self).__init__()
+ # root directory to store files
+ self._root_dir = root_dir
+
+ # generator of globally unique ids
+ self._guid_generator = guid.GuidGenerator()
+
+ # Scheduler
+ self._scheduler = scheduler.HeapScheduler()
+
+ # Tasks
+ self._tasks = dict()
+
+ # Resources
+ self._resources = dict()
+
+ # Event processing thread
+ self._cond = threading.Condition()
+ self._stop = False
+ self._thread = threading.Thread(target = self._process_tasks)
+ self._thread.start()
+
+ # Logging
+ self._logger = logging.getLogger("neco.execution.ec")
+ self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+ def resource(self, guid):
+ return self._resources.get(guid)
+
+ def terminate(self):
+ self._stop = True
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+ if self._thread.is_alive():
+ self._thread.join()
+
+ def task_info(self, tid):
+ task = self._tasks.get(tid)
+ if not task:
+ return (None, None)
+ return (task.status, task.result)
+
+ def schedule(self, date, callback, args = None, kwargs = None):
+ """
+ date string containing execution time for the task.
+ It can be expressed as an absolute time, using
+ timestamp format, or as a relative time matching
+ ^\d+.\d+(h|m|s|ms|us)$
+
+ callback code to be executed for the task. Must be a
+ Python function, and receives args and kwargs
+ as arguments.
+ The callback will always be invoked passing a
+ week reference to the controller as first
+ argument.
+ The callback must return a (status, result)
+ tuple where status is one of :
+ task.TaskStatus.FAIL,
+ task.TaskStatus.SUCCESS,
+ task.TaskStatus.RETRY,
+ task.TaskStatus.RECYCLE
+ """
+ timestamp = strfvalid(date)
+
+ args = args or []
+ kwargs = kwargs or {}
+
+ task = tasks.Task(timestamp, callback, args, kwargs)
+ task = self._schedule(task)
+
+ self._tasks[task.id] = task
+
+ return task.id
+
+ ###########################################################################
+ #### Internal methods
+ ###########################################################################
+
+ def _schedule(self, task):
+ task = self._scheduler.schedule(task)
+
+ # Notify condition to wake up the processing thread
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+ return task
+
+ def _process_tasks(self):
+ runner = ParallelRun(maxthreads = 50)
+ runner.start()
+
+ try:
+ while not self._stop:
+ self._cond.acquire()
+ task = self._scheduler.next()
+ self._cond.release()
+
+ if not task:
+ # It there are not tasks in the tasks queue we need to
+ # wait until a call to schedule wakes us up
+ self._cond.acquire()
+ self._cond.wait()
+ self._cond.release()
+ else:
+ # If the task timestamp is in the future the thread needs to wait
+ # until time elapse or until another task is scheduled
+ now = strfnow()
+ if now < task.timestamp:
+ # Calculate time difference in seconds
+ timeout = strfdiff(task.timestamp, now)
+ # Re-schedule task with the same timestamp
+ self._scheduler.schedule(task)
+ # Sleep until timeout or until a new task awakes the condition
+ self._cond.acquire()
+ self._cond.wait(timeout)
+ self._cond.release()
+ else:
+ # Process tasks in parallel
+ runner.put(self._execute_task, task)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self._logger.error("Error while processing tasks in the EC: %s" % err)
+
+ def _execute_task(self, task):
+ # Invoke callback
+ ec = weakref.ref(self)
+ try:
+ (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self._logger.error("Error while executing event: %s" % err)
+
+ # task marked as FAIL
+ task.status = tasks.TaskStatus.FAIL
+ task.result = err
+
+ if task.status == tasks.TaskStatus.RETRY:
+ # Re-schedule same task in the near future
+ task.timestamp = strfvalid(_reschedule_delay)
+ self._schedule(task)
+ elif task.status == tasks.TaskStatus.RECYCLE:
+ # Re-schedule t in the future
+ timestamp = strfvalid(task.result)
+ self.schedule(timestamp, task.callback, task.args, task.kwargs)
+
--- /dev/null
+import logging
+import weakref
+
+def match_tags(box, all_tags, exact_tags):
+ """ returns True if box has required tags """
+ tall = set(all_tags)
+ texact = set(exact_tags)
+
+ if texact and box.connections == texact:
+ return True
+
+ if tall and tall.issubset(box.connections):
+ return True
+
+ return False
+
+def find_boxes(box, all_tags = None, exact_tags = None, max_depth = 1):
+ """ Look for the connected boxes with the required tags, doing breath-first
+ search, until max_depth ( max_depth = None will traverse the entire graph ).
+ """
+ if not all_tags and not exact_tags:
+ msg = "No matching criteria for resources."
+ raise RuntimeError(msg)
+
+ queue = set()
+ # enqueue (depth, box)
+ queue.add((0, box))
+
+ traversed = set()
+ traversed.add(box)
+
+ depth = 0
+
+ result = set()
+
+ while len(q) > 0:
+ (depth, a) = queue.pop()
+ if match_tags(a, all_tags, exact_tags):
+ result.add(a)
+
+ if not max_depth or depth <= max_depth:
+ depth += 1
+ for b in sorted(a.connections):
+ if b not in traversed:
+ traversed.add(b)
+ queue.add((depth, b))
+
+ return result
+
+class Resource(object):
+ def __init__(self, box, ec):
+ self._box = weakref.ref(box)
+ self._ec = weakref.ref(ec)
+
+ # Logging
+ loglevel = "debug"
+ self._logger = logging.getLogger("neco.execution.Resource.%s" %
+ self.box.guid)
+ self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+ @property
+ def box(self):
+ return self._box()
+
+ @property
+ def ec(self):
+ return self._ec()
+
+ def find_resources(self, all_tags = None, exact_tags = None,
+ max_depth = 1):
+ resources = set()
+
+ boxes = find_boxes(self.box, all_tags, exact_tags, max_depth)
+ for b in boxes:
+ r = self.ec.resource(b.guid)
+ resources.add(r)
+
+ return resources
+
+class ResourceResolver(object):
+ def __init__(self):
+ pass
+
+
--- /dev/null
+import itertools
+import heapq
+
+class HeapScheduler(object):
+ """ This class is thread safe.
+ All calls to C Extensions are made atomic by the GIL in the CPython implementation.
+ heapq.heappush, heapq.heappop, and list access are therefore thread-safe """
+
+ def __init__(self):
+ super(HeapScheduler, self).__init__()
+ self._queue = list()
+ self._valid = set()
+ self._idgen = itertools.count(1)
+
+ def schedule(self, task):
+ if task.id == None:
+ task.id = self._idgen.next()
+ entry = (task.timestamp, task.id, task)
+ self._valid.add(task.id)
+ heapq.heappush(self._queue, entry)
+ return task
+
+ def remove(self, tid):
+ try:
+ self._valid.remove(tid)
+ except:
+ pass
+
+ def next(self):
+ while self._queue:
+ try:
+ timestamp, tid, task = heapq.heappop(self._queue)
+ if tid in self._valid:
+ self.remove(tid)
+ return task
+ except IndexError:
+ # heap empty
+ pass
+ return None
+
--- /dev/null
+NODE = "node"
+NETWORK_INTERFACE = "network interface"
+SWITCH = "switch"
+TUNNEL = "tunnel"
+APPLICATION = "application"
+CHANNEL = "channel"
+CPU = "cpu"
+
+IP4ADDRESS = "ipv4"
+IP6ADDRESS = "ipv6"
+MACADDRESS = "mac"
+IPADDRESS = "ip"
+ROUTE = "route"
+FLOW = "flow"
+
+WIRELESS = "wireless"
+ETHERNET = "ethernet"
+SIMULATED = "simulated"
+VIRTUAL = "virtual"
+MOBILE = "mobile"
+
--- /dev/null
+
+class TaskStatus:
+ NEW = 0
+ RETRY = 1
+ SUCCESS = 2
+ FAIL = 3
+ RECYCLE = 4
+
+class Task(object):
+ def __init__(self, timestamp, callback, args, kwargs):
+ self.id = None
+ self.timestamp = timestamp
+ self.callback = callback
+ self.args = args
+ self.kwargs = kwargs
+ self.result = None
+ self.status = TaskStatus.NEW
+
--- /dev/null
+from neco.execution import tags
+from neco.execution.resource import Resource
+
+import cStringIO
+import logging
+
+class Application(Resource):
+ def __init__(self, box, ec):
+ super(Application, self).__init__(box, ec)
+ self.command = None
+ self.pid = None
+ self.ppid = None
+ self.stdin = None
+ self.del_app_home = True
+ self.env = None
+
+ self.app_home = "${HOME}/app-%s" % self.box.guid
+ self._node = None
+
+ # Logging
+ loglevel = "debug"
+ self._logger = logging.getLogger("neco.resources.base.Application.%s" % self.guid)
+ self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+ @property
+ def node(self):
+ if self._node:
+ return self._node
+
+ # XXX: What if it is connected to more than one node?
+ resources = self.find_resources(exact_tags = [tags.NODE])
+ self._node = resources[0] is len(resources) == 1 else None
+ return self._node
+
+ def make_app_home(self):
+ self.node.mkdir(self.app_home)
+
+ if self.stdin:
+ self.node.upload(self.stdin, os.path.join(self.app_home, 'stdin'))
+
+ def cleanup(self):
+ self.kill()
+
+ def run(self):
+ dst = os.path.join(self.app_home, "app.ssh")
+
+ # Create shell script with the command
+ # This way, complex commands and scripts can be ran seamlessly
+ # sync files
+ cmd = ""
+ if self.env:
+ for envkey, envvals in env.iteritems():
+ for envval in envvals:
+ cmd += 'export %s=%s\n' % (envkey, envval)
+
+ cmd += self.command
+ self.node.upload(cmd, dst)
+
+ cmd = "bash ./app.sh",
+ self.node.run(cmd, self.app_home)
+ self.pid, self.ppid = self.node.checkpid(self.app_home)
+
+ def status(self):
+ return self.node.status(self.pid, self.ppid)
+
+ def kill(self):
+ return self.node.kill(self.pid, self.ppid)
+
--- /dev/null
+from neco.execution.resource import Resource
+from neco.util.sshfuncs import eintr_retry, shell_escape, rexec, rcopy, \
+ rspawn, rcheck_pid, rstatus, rkill, RUNNING
+
+import cStringIO
+import logging
+import os.path
+
+class LinuxNode(Resource):
+ def __init__(self, box, ec):
+ super(LinuxNode, self).__init__(box, ec)
+ self.ip = None
+ self.host = None
+ self.user = None
+ self.port = None
+ self.identity_file = None
+ # packet management system - either yum or apt for now...
+ self._pm = None
+
+ # Logging
+ loglevel = "debug"
+ self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\
+ self.box.guid)
+ self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+ @property
+ def pm(self):
+ if self._pm:
+ return self._pm
+
+ if (not (self.host or self.ip) or not self.user):
+ msg = "Can't resolve package management system. Insufficient data."
+ self._logger.error(msg)
+ raise RuntimeError(msg)
+
+ out = self.execute("cat /etc/issue")
+
+ if out.find("Fedora") == 0:
+ self._pm = "yum -y "
+ elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
+ self._pm = "apt-get -y "
+ else:
+ msg = "Can't resolve package management system. Unknown OS."
+ self._logger.error(msg)
+ raise RuntimeError(msg)
+
+ return self._pm
+
+ def execute(self, command,
+ agent = True,
+ sudo = False,
+ stdin = "",
+ tty = False,
+ timeout = None,
+ retry = 0,
+ err_on_timeout = True,
+ connect_timeout = 30,
+ persistent = True):
+ """ Notice that this invocation will block until the
+ execution finishes. If this is not the desired behavior,
+ use 'run' instead."""
+ (out, err), proc = eintr_retry(rexec)(
+ command,
+ self.host or self.ip,
+ self.user,
+ port = self.port,
+ agent = agent,
+ sudo = sudo,
+ stdin = stdin,
+ identity_file = self.identity_file,
+ tty = tty,
+ timeout = timeout,
+ retry = retry,
+ err_on_timeout = err_on_timeout,
+ connect_timeout = connect_timeout,
+ persistent = persistent)
+
+ if proc.wait():
+ msg = "Failed to execute command %s at node %s: %s %s" % \
+ (command, self.host or self.ip, out, err,)
+ self._logger.warn(msg)
+ raise RuntimeError(msg)
+
+ return out
+
+ def package_install(self, dependencies):
+ if not isinstance(dependencies, list):
+ dependencies = [dependencies]
+
+ for d in dependencies:
+ self.execute("%s install %s" % (self.pm, d), sudo = True,
+ tty2 = True)
+
+ def upload(self, src, dst):
+ if not os.path.isfile(src):
+ src = cStringIO.StringIO(src)
+
+ (out, err), proc = eintr_retry(rcopy)(
+ src, dst,
+ self.host or self.ip,
+ self.user,
+ port = self.port,
+ identity_file = self.identity_file)
+
+ if proc.wait():
+ msg = "Error uploading to %s got:\n%s%s" %\
+ (self.host or self.ip, out, err)
+ self._logger.error(msg)
+ raise RuntimeError(msg)
+
+ def is_alive(self, verbose = False):
+ (out, err), proc = eintr_retry(rexec)(
+ "echo 'ALIVE'",
+ self.host or self.ip,
+ self.user,
+ port = self.port,
+ identity_file = self.identity_file,
+ timeout = 60,
+ err_on_timeout = False,
+ persistent = False)
+
+ if proc.wait():
+ self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+ return False
+ elif out.strip().startswith('ALIVE'):
+ return True
+ else:
+ self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+ return False
+
+ def mkdir(self, path, clean = True):
+ if clean:
+ self.execute(
+ "rm -f %s" % shell_escape(path),
+ timeout = 120,
+ retry = 3
+ )
+
+ self.execute(
+ "mkdir -p %s" % shell_escape(path),
+ timeout = 120,
+ retry = 3
+ )
+
+ def run(self, command, home,
+ stdin = 'stdin',
+ stdout = 'stdout',
+ stderr = 'stderr',
+ sudo = False):
+ self._logger.info("Running %s", command)
+
+ # Start process in a "daemonized" way, using nohup and heavy
+ # stdin/out redirection to avoid connection issues
+ (out,err), proc = rspawn(
+ command,
+ pidfile = './pid',
+ home = home,
+ stdin = stdin if stdin is not None else '/dev/null',
+ stdout = stdout if stdout else '/dev/null',
+ stderr = stderr if stderr else '/dev/null',
+ sudo = sudo,
+ host = self.host,
+ user = self.user,
+ port = self.port,
+ identity_file = self.identity_file
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+ def checkpid(self, path):
+ # Get PID/PPID
+ # NOTE: wait a bit for the pidfile to be created
+ pidtuple = rcheck_pid(
+ os.path.join(path, 'pid'),
+ host = self.host,
+ user = self.user,
+ port = self.port,
+ identity_file = self.identity_file
+ )
+
+ return pidtuple
+
+ def status(self, pid, ppid):
+ status = rstatus(
+ pid, ppid,
+ host = self.host,
+ user = self.user,
+ port = self.port,
+ identity_file = self.identity_file
+ )
+
+ return status
+
+ def kill(self, pid, ppid, sudo = False):
+ status = self.status(pid, ppid)
+ if status == RUNNING:
+ # kill by ppid+pid - SIGTERM first, then try SIGKILL
+ rkill(
+ pid, ppid,
+ host = self.host,
+ user = self.user,
+ port = self.port,
+ sudo = sudo,
+ identity_file = self.identity_file
+ )
+
--- /dev/null
+# FIXME: This class is not thread-safe.
+# Should it be made thread-safe?
+class GuidGenerator(object):
+ def __init__(self):
+ self._guids = list()
+
+ def next(self, guid = None):
+ if guid != None:
+ return guid
+ else:
+ last_guid = 0 if len(self._guids) == 0 else self._guids[-1]
+ guid = last_guid + 1
+ self._guids.append(guid)
+ self._guids.sort()
+ return guid
+
--- /dev/null
+import threading
+import Queue
+import traceback
+import sys
+import os
+
+N_PROCS = None
+
+THREADCACHE = []
+THREADCACHEPID = None
+
+class WorkerThread(threading.Thread):
+ class QUIT:
+ pass
+ class REASSIGNED:
+ pass
+
+ def run(self):
+ while True:
+ task = self.queue.get()
+ if task is None:
+ self.done = True
+ self.queue.task_done()
+ continue
+ elif task is self.QUIT:
+ self.done = True
+ self.queue.task_done()
+ break
+ elif task is self.REASSIGNED:
+ continue
+ else:
+ self.done = False
+
+ try:
+ try:
+ callable, args, kwargs = task
+ rv = callable(*args, **kwargs)
+
+ if self.rvqueue is not None:
+ self.rvqueue.put(rv)
+ finally:
+ self.queue.task_done()
+ except:
+ traceback.print_exc(file = sys.stderr)
+ self.delayed_exceptions.append(sys.exc_info())
+
+ def waitdone(self):
+ while not self.queue.empty() and not self.done:
+ self.queue.join()
+
+ def attach(self, queue, rvqueue, delayed_exceptions):
+ if self.isAlive():
+ self.waitdone()
+ oldqueue = self.queue
+ self.queue = queue
+ self.rvqueue = rvqueue
+ self.delayed_exceptions = delayed_exceptions
+ if self.isAlive():
+ oldqueue.put(self.REASSIGNED)
+
+ def detach(self):
+ if self.isAlive():
+ self.waitdone()
+ self.oldqueue = self.queue
+ self.queue = Queue.Queue()
+ self.rvqueue = None
+ self.delayed_exceptions = []
+
+ def detach_signal(self):
+ if self.isAlive():
+ self.oldqueue.put(self.REASSIGNED)
+ del self.oldqueue
+
+ def quit(self):
+ self.queue.put(self.QUIT)
+ self.join()
+
+class ParallelMap(object):
+ def __init__(self, maxthreads = None, maxqueue = None, results = True):
+ global N_PROCS
+ global THREADCACHE
+ global THREADCACHEPID
+
+ if maxthreads is None:
+ if N_PROCS is None:
+ try:
+ f = open("/proc/cpuinfo")
+ try:
+ N_PROCS = sum("processor" in l for l in f)
+ finally:
+ f.close()
+ except:
+ pass
+ maxthreads = N_PROCS
+
+ if maxthreads is None:
+ maxthreads = 4
+
+ self.queue = Queue.Queue(maxqueue or 0)
+
+ self.delayed_exceptions = []
+
+ if results:
+ self.rvqueue = Queue.Queue()
+ else:
+ self.rvqueue = None
+
+ # Check threadcache
+ if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
+ del THREADCACHE[:]
+ THREADCACHEPID = os.getpid()
+
+ self.workers = []
+ for x in xrange(maxthreads):
+ t = None
+ if THREADCACHE:
+ try:
+ t = THREADCACHE.pop()
+ except:
+ pass
+ if t is None:
+ t = WorkerThread()
+ t.setDaemon(True)
+ else:
+ t.waitdone()
+ t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
+ self.workers.append(t)
+
+ def __del__(self):
+ self.destroy()
+
+ def destroy(self):
+ # Check threadcache
+ global THREADCACHE
+ global THREADCACHEPID
+ if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
+ del THREADCACHE[:]
+ THREADCACHEPID = os.getpid()
+
+ for worker in self.workers:
+ worker.waitdone()
+ for worker in self.workers:
+ worker.detach()
+ for worker in self.workers:
+ worker.detach_signal()
+ THREADCACHE.extend(self.workers)
+ del self.workers[:]
+
+ def put(self, callable, *args, **kwargs):
+ self.queue.put((callable, args, kwargs))
+
+ def put_nowait(self, callable, *args, **kwargs):
+ self.queue.put_nowait((callable, args, kwargs))
+
+ def start(self):
+ for thread in self.workers:
+ if not thread.isAlive():
+ thread.start()
+
+ def join(self):
+ for thread in self.workers:
+ # That's the sync signal
+ self.queue.put(None)
+
+ self.queue.join()
+ for thread in self.workers:
+ thread.waitdone()
+
+ if self.delayed_exceptions:
+ typ,val,loc = self.delayed_exceptions[0]
+ del self.delayed_exceptions[:]
+ raise typ,val,loc
+
+ self.destroy()
+
+ def sync(self):
+ self.queue.join()
+ if self.delayed_exceptions:
+ typ,val,loc = self.delayed_exceptions[0]
+ del self.delayed_exceptions[:]
+ raise typ,val,loc
+
+ def __iter__(self):
+ if self.rvqueue is not None:
+ while True:
+ try:
+ yield self.rvqueue.get_nowait()
+ except Queue.Empty:
+ self.queue.join()
+ try:
+ yield self.rvqueue.get_nowait()
+ except Queue.Empty:
+ raise StopIteration
+
+
+class ParallelFilter(ParallelMap):
+ class _FILTERED:
+ pass
+
+ def __filter(self, x):
+ if self.filter_condition(x):
+ return x
+ else:
+ return self._FILTERED
+
+ def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
+ super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
+ self.filter_condition = filter_condition
+
+ def put(self, what):
+ super(ParallelFilter, self).put(self.__filter, what)
+
+ def put_nowait(self, what):
+ super(ParallelFilter, self).put_nowait(self.__filter, what)
+
+ def __iter__(self):
+ for rv in super(ParallelFilter, self).__iter__():
+ if rv is not self._FILTERED:
+ yield rv
+
+class ParallelRun(ParallelMap):
+ def __run(self, x):
+ fn, args, kwargs = x
+ return fn(*args, **kwargs)
+
+ def __init__(self, maxthreads = None, maxqueue = None):
+ super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
+
+ def put(self, what, *args, **kwargs):
+ super(ParallelRun, self).put(self.__run, (what, args, kwargs))
+
+ def put_nowait(self, what, *args, **kwargs):
+ super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
+
+
+def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
+ mapper = ParallelMap(
+ maxthreads = maxthreads,
+ maxqueue = maxqueue,
+ results = True)
+ mapper.start()
+ for elem in iterable:
+ mapper.put(elem)
+ rv = list(mapper)
+ mapper.join()
+ return rv
+
+def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
+ filtrer = ParallelFilter(
+ condition,
+ maxthreads = maxthreads,
+ maxqueue = maxqueue)
+ filtrer.start()
+ for elem in iterable:
+ filtrer.put(elem)
+ rv = list(filtrer)
+ filtrer.join()
+ return rv
+
--- /dev/null
+from neco.design.box import Box
+
+from xml.dom import minidom
+import sys
+
+STRING = "string"
+BOOL = "bool"
+INTEGER = "integer"
+DOUBLE = "float"
+
+def xmlencode(s):
+ if isinstance(s, str):
+ rv = s.decode("latin1")
+ elif not isinstance(s, unicode):
+ rv = unicode(s)
+ else:
+ rv = s
+ return rv.replace(u'\x00',u'�')
+
+def xmldecode(s):
+ return s.replace(u'�',u'\x00').encode("utf8")
+
+def from_type(value):
+ if isinstance(value, str):
+ return STRING
+ if isinstance(value, bool):
+ return BOOL
+ if isinstance(value, int):
+ return INTEGER
+ if isinstance(value, float):
+ return DOUBLE
+
+def to_type(type, value):
+ if type == STRING:
+ return str(value)
+ if type == BOOL:
+ return value == "True"
+ if type == INTEGER:
+ return int(value)
+ if type == DOUBLE:
+ return float(value)
+
+class XMLParser(object):
+ def to_xml(self, box):
+ doc = minidom.Document()
+
+ root = doc.createElement("boxes")
+ doc.appendChild(root)
+
+ traversed = dict()
+ self._traverse_boxes(doc, traversed, box)
+
+ # Keep the order
+ for guid in sorted(traversed.keys()):
+ bnode = traversed[guid]
+ root.appendChild(bnode)
+
+ try:
+ xml = doc.toprettyxml(indent=" ", encoding="UTF-8")
+ except:
+ print >>sys.stderr, "Oops: generating XML from %s" % (data,)
+ raise
+
+ return xml
+
+ def _traverse_boxes(self, doc, traversed, box):
+ bnode = doc.createElement("box")
+ bnode.setAttribute("guid", xmlencode(box.guid))
+ bnode.setAttribute("label", xmlencode(box.label))
+ bnode.setAttribute("x", xmlencode(box.x))
+ bnode.setAttribute("y", xmlencode(box.y))
+ bnode.setAttribute("width", xmlencode(box.width))
+ bnode.setAttribute("height", xmlencode(box.height))
+
+ traversed[box.guid] = bnode
+
+ anode = doc.createElement("attributes")
+ bnode.appendChild(anode)
+ for name in sorted(box.attributes):
+ value = getattr(box.a, name)
+ aanode = doc.createElement("attribute")
+ anode.appendChild(aanode)
+ aanode.setAttribute("name", xmlencode(name))
+ aanode.setAttribute("value", xmlencode(value))
+ aanode.setAttribute("type", from_type(value))
+
+ tnode = doc.createElement("tags")
+ bnode.appendChild(tnode)
+ for tag in sorted(box.tags):
+ ttnode = doc.createElement("tag")
+ tnode.appendChild(ttnode)
+ ttnode.setAttribute("name", xmlencode(tag))
+
+ cnode = doc.createElement("connections")
+ bnode.appendChild(cnode)
+ for b in sorted(box.connections):
+ ccnode = doc.createElement("connection")
+ cnode.appendChild(ccnode)
+ ccnode.setAttribute("guid", xmlencode(b.guid))
+ if b.guid not in traversed:
+ self._traverse_boxes(doc, traversed, b)
+
+ def from_xml(self, xml):
+ doc = minidom.parseString(xml)
+ bnode_list = doc.getElementsByTagName("box")
+
+ boxes = dict()
+ connections = dict()
+
+ for bnode in bnode_list:
+ if bnode.nodeType == doc.ELEMENT_NODE:
+ guid = int(bnode.getAttribute("guid"))
+ label = xmldecode(bnode.getAttribute("label"))
+ x = float(bnode.getAttribute("x"))
+ y = float(bnode.getAttribute("y"))
+ height = float(bnode.getAttribute("height"))
+ width = float(bnode.getAttribute("width"))
+ box = Box(label=label, guid=guid)
+ boxes[guid] = box
+
+ anode_list = bnode.getElementsByTagName("attribute")
+ for anode in anode_list:
+ name = xmldecode(anode.getAttribute("name"))
+ value = xmldecode(anode.getAttribute("value"))
+ type = xmldecode(anode.getAttribute("type"))
+ value = to_type(type, value)
+ setattr(box.a, name, value)
+
+ tnode_list = bnode.getElementsByTagName("tag")
+ for tnode in tnode_list:
+ value = xmldecode(tnode.getAttribute("name"))
+ box.tadd(value)
+
+ connections[box] = set()
+ cnode_list = bnode.getElementsByTagName("connection")
+ for cnode in cnode_list:
+ guid = int(cnode.getAttribute("guid"))
+ connections[box].add(guid)
+
+ for box, conns in connections.iteritems():
+ for guid in conns:
+ b = boxes[guid]
+ box.connect(b)
+
+ return box
+
+
--- /dev/null
+import networkx
+import tempfile
+
+class Plotter(object):
+ def __init__(self, box):
+ self._graph = networkx.Graph(graph = dict(overlap = "false"))
+
+ traversed = set()
+ self._traverse_boxes(traversed, box)
+
+ def _traverse_boxes(self, traversed, box):
+ traversed.add(box.guid)
+
+ self._graph.add_node(box.label,
+ width = 50/72.0, # 1 inch = 72 points
+ height = 50/72.0,
+ shape = "circle")
+
+ for b in box.connections:
+ self._graph.add_edge(box.label, b.label)
+ if b.guid not in traversed:
+ self._traverse_boxes(traversed, b)
+
+ def plot(self):
+ f = tempfile.NamedTemporaryFile(delete=False)
+ networkx.draw_graphviz(self._graph)
+ networkx.write_dot(self._graph, f.name)
+ f.close()
+ return f.name
+
--- /dev/null
+import base64
+import errno
+import os
+import os.path
+import select
+import signal
+import socket
+import subprocess
+import time
+import traceback
+import re
+import tempfile
+import hashlib
+
+OPENSSH_HAS_PERSIST = None
+CONTROL_PATH = "yyyyy_ssh_control_path"
+
+if hasattr(os, "devnull"):
+ DEV_NULL = os.devnull
+else:
+ DEV_NULL = "/dev/null"
+
+SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
+
+hostbyname_cache = dict()
+
+class STDOUT:
+ """
+ Special value that when given to rspawn in stderr causes stderr to
+ redirect to whatever stdout was redirected to.
+ """
+
+class RUNNING:
+ """
+ Process is still running
+ """
+
+class FINISHED:
+ """
+ Process is finished
+ """
+
+class NOT_STARTED:
+ """
+ Process hasn't started running yet (this should be very rare)
+ """
+
+def openssh_has_persist():
+ """ The ssh_config options ControlMaster and ControlPersist allow to
+ reuse a same network connection for multiple ssh sessions. In this
+ way limitations on number of open ssh connections can be bypassed.
+ However, older versions of openSSH do not support this feature.
+ This function is used to determine if ssh connection persist features
+ can be used.
+ """
+ global OPENSSH_HAS_PERSIST
+ if OPENSSH_HAS_PERSIST is None:
+ proc = subprocess.Popen(["ssh","-v"],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.STDOUT,
+ stdin = open("/dev/null","r") )
+ out,err = proc.communicate()
+ proc.wait()
+
+ vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
+ OPENSSH_HAS_PERSIST = bool(vre.match(out))
+ return OPENSSH_HAS_PERSIST
+
+def shell_escape(s):
+ """ Escapes strings so that they are safe to use as command-line
+ arguments """
+ if SHELL_SAFE.match(s):
+ # safe string - no escaping needed
+ return s
+ else:
+ # unsafe string - escape
+ def escp(c):
+ if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
+ return c
+ else:
+ return "'$'\\x%02x''" % (ord(c),)
+ s = ''.join(map(escp,s))
+ return "'%s'" % (s,)
+
+def eintr_retry(func):
+ """Retries a function invocation when a EINTR occurs"""
+ import functools
+ @functools.wraps(func)
+ def rv(*p, **kw):
+ retry = kw.pop("_retry", False)
+ for i in xrange(0 if retry else 4):
+ try:
+ return func(*p, **kw)
+ except (select.error, socket.error), args:
+ if args[0] == errno.EINTR:
+ continue
+ else:
+ raise
+ except OSError, e:
+ if e.errno == errno.EINTR:
+ continue
+ else:
+ raise
+ else:
+ return func(*p, **kw)
+ return rv
+
+def make_connkey(user, host, port):
+ connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
+ if len(connkey) > 60:
+ connkey = hashlib.sha1(connkey).hexdigest()
+ return connkey
+
+def rexec(command, host, user,
+ port = None,
+ agent = True,
+ sudo = False,
+ stdin = "",
+ identity_file = None,
+ tty = False,
+ tty2 = False,
+ timeout = None,
+ retry = 0,
+ err_on_timeout = True,
+ connect_timeout = 30,
+ persistent = True):
+ """
+ Executes a remote command, returns ((stdout,stderr),process)
+ """
+ connkey = make_connkey(user, host, port)
+ args = ['ssh', '-C',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes',
+ # XXX: Possible security issue
+ # Avoid interactive requests to accept new host keys
+ '-o', 'StrictHostKeyChecking=no',
+ '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
+ '-o', 'ConnectionAttempts=3',
+ '-o', 'ServerAliveInterval=30',
+ '-o', 'TCPKeepAlive=yes',
+ '-l', user, host]
+
+ if persistent and openssh_has_persist():
+ args.extend([
+ '-o', 'ControlMaster=auto',
+ '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
+ '-o', 'ControlPersist=60' ])
+ if agent:
+ args.append('-A')
+ if port:
+ args.append('-p%d' % port)
+ if identity_file:
+ args.extend(('-i', identity_file))
+ if tty:
+ args.append('-t')
+ elif tty2:
+ args.append('-t')
+ args.append('-t')
+ if sudo:
+ command = "sudo " + command
+ args.append(command)
+
+ print " ".join(args)
+
+ for x in xrange(retry or 3):
+ # connects to the remote host and starts a remote connection
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ try:
+ out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+ if proc.poll():
+ if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
+ # SSH error, can safely retry
+ continue
+ elif retry:
+ # Probably timed out or plain failed but can retry
+ continue
+ break
+ except RuntimeError,e:
+ if retry <= 0:
+ raise
+ retry -= 1
+
+ return ((out, err), proc)
+
+def rcopy(source, dest, host, user,
+ port = None,
+ agent = True,
+ recursive = False,
+ identity_file = None):
+ """
+ Copies file from/to remote sites.
+
+ Source and destination should have the user and host encoded
+ as per scp specs.
+
+ If source is a file object, a special mode will be used to
+ create the remote file with the same contents.
+
+ If dest is a file object, the remote file (source) will be
+ read and written into dest.
+
+ In these modes, recursive cannot be True.
+
+ Source can be a list of files to copy to a single destination,
+ in which case it is advised that the destination be a folder.
+ """
+
+ if isinstance(source, file) and source.tell() == 0:
+ source = source.name
+
+ elif hasattr(source, 'read'):
+ tmp = tempfile.NamedTemporaryFile()
+ while True:
+ buf = source.read(65536)
+ if buf:
+ tmp.write(buf)
+ else:
+ break
+ tmp.seek(0)
+ source = tmp.name
+
+ if isinstance(source, file) or isinstance(dest, file) \
+ or hasattr(source, 'read') or hasattr(dest, 'write'):
+ assert not recursive
+
+ connkey = make_connkey(user,host,port)
+ args = ['ssh', '-l', user, '-C',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes',
+ # XXX: Possible security issue
+ # Avoid interactive requests to accept new host keys
+ '-o', 'StrictHostKeyChecking=no',
+ '-o', 'ConnectTimeout=30',
+ '-o', 'ConnectionAttempts=3',
+ '-o', 'ServerAliveInterval=30',
+ '-o', 'TCPKeepAlive=yes',
+ host ]
+ if openssh_has_persist():
+ args.extend([
+ '-o', 'ControlMaster=auto',
+ '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
+ '-o', 'ControlPersist=60' ])
+ if port:
+ args.append('-P%d' % port)
+ if identity_file:
+ args.extend(('-i', identity_file))
+
+ if isinstance(source, file) or hasattr(source, 'read'):
+ args.append('cat > %s' % (shell_escape(dest),))
+ elif isinstance(dest, file) or hasattr(dest, 'write'):
+ args.append('cat %s' % (shell_escape(dest),))
+ else:
+ raise AssertionError, "Unreachable code reached! :-Q"
+
+ # connects to the remote host and starts a remote connection
+ if isinstance(source, file):
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = source)
+ err = proc.stderr.read()
+ eintr_retry(proc.wait)()
+ return ((None,err), proc)
+ elif isinstance(dest, file):
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = source)
+ err = proc.stderr.read()
+ eintr_retry(proc.wait)()
+ return ((None,err), proc)
+ elif hasattr(source, 'read'):
+ # file-like (but not file) source
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = subprocess.PIPE)
+
+ buf = None
+ err = []
+ while True:
+ if not buf:
+ buf = source.read(4096)
+ if not buf:
+ #EOF
+ break
+
+ rdrdy, wrdy, broken = select.select(
+ [proc.stderr],
+ [proc.stdin],
+ [proc.stderr,proc.stdin])
+
+ if proc.stderr in rdrdy:
+ # use os.read for fully unbuffered behavior
+ err.append(os.read(proc.stderr.fileno(), 4096))
+
+ if proc.stdin in wrdy:
+ proc.stdin.write(buf)
+ buf = None
+
+ if broken:
+ break
+ proc.stdin.close()
+ err.append(proc.stderr.read())
+
+ eintr_retry(proc.wait)()
+ return ((None,''.join(err)), proc)
+ elif hasattr(dest, 'write'):
+ # file-like (but not file) dest
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE,
+ stdin = open('/dev/null','w'))
+
+ buf = None
+ err = []
+ while True:
+ rdrdy, wrdy, broken = select.select(
+ [proc.stderr, proc.stdout],
+ [],
+ [proc.stderr, proc.stdout])
+
+ if proc.stderr in rdrdy:
+ # use os.read for fully unbuffered behavior
+ err.append(os.read(proc.stderr.fileno(), 4096))
+
+ if proc.stdout in rdrdy:
+ # use os.read for fully unbuffered behavior
+ buf = os.read(proc.stdout.fileno(), 4096)
+ dest.write(buf)
+
+ if not buf:
+ #EOF
+ break
+
+ if broken:
+ break
+ err.append(proc.stderr.read())
+
+ eintr_retry(proc.wait)()
+ return ((None,''.join(err)), proc)
+ else:
+ raise AssertionError, "Unreachable code reached! :-Q"
+ else:
+ # plain scp
+ args = ['scp', '-q', '-p', '-C',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes',
+ # XXX: Possible security issue
+ # Avoid interactive requests to accept new host keys
+ '-o', 'StrictHostKeyChecking=no',
+ '-o', 'ConnectTimeout=30',
+ '-o', 'ConnectionAttempts=3',
+ '-o', 'ServerAliveInterval=30',
+ '-o', 'TCPKeepAlive=yes' ]
+
+ if port:
+ args.append('-P%d' % port)
+ if recursive:
+ args.append('-r')
+ if identity_file:
+ args.extend(('-i', identity_file))
+
+ if isinstance(source,list):
+ args.extend(source)
+ else:
+ if openssh_has_persist():
+ connkey = make_connkey(user,host,port)
+ args.extend([
+ '-o', 'ControlMaster=no',
+ '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, )])
+ args.append(source)
+ args.append("%s@%s:%s" %(user, host, dest))
+
+ # connects to the remote host and starts a remote connection
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ comm = proc.communicate()
+ eintr_retry(proc.wait)()
+ return (comm, proc)
+
+def rspawn(command, pidfile,
+ stdout = '/dev/null',
+ stderr = STDOUT,
+ stdin = '/dev/null',
+ home = None,
+ create_home = False,
+ host = None,
+ port = None,
+ user = None,
+ agent = None,
+ sudo = False,
+ identity_file = None,
+ tty = False):
+ """
+ Spawn a remote command such that it will continue working asynchronously.
+
+ Parameters:
+ command: the command to run - it should be a single line.
+
+ pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
+
+ stdout: path of a file to redirect standard output to - must be a string.
+ Defaults to /dev/null
+ stderr: path of a file to redirect standard error to - string or the special STDOUT value
+ to redirect to the same file stdout was redirected to. Defaults to STDOUT.
+ stdin: path of a file with input to be piped into the command's standard input
+
+ home: path of a folder to use as working directory - should exist, unless you specify create_home
+
+ create_home: if True, the home folder will be created first with mkdir -p
+
+ sudo: whether the command needs to be executed as root
+
+ host/port/user/agent/identity_file: see rexec
+
+ Returns:
+ (stdout, stderr), process
+
+ Of the spawning process, which only captures errors at spawning time.
+ Usually only useful for diagnostics.
+ """
+ # Start process in a "daemonized" way, using nohup and heavy
+ # stdin/out redirection to avoid connection issues
+ if stderr is STDOUT:
+ stderr = '&1'
+ else:
+ stderr = ' ' + stderr
+
+ daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+ 'command' : command,
+ 'pidfile' : shell_escape(pidfile),
+
+ 'stdout' : stdout,
+ 'stderr' : stderr,
+ 'stdin' : stdin,
+ }
+
+ cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
+ 'command' : shell_escape(daemon_command),
+
+ 'sudo' : 'sudo -S' if sudo else '',
+
+ 'pidfile' : shell_escape(pidfile),
+ 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
+ 'create' : 'mkdir -p %s ; ' % (shell_escape,) if create_home else '',
+ }
+
+ (out,err), proc = rexec(
+ cmd,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ identity_file = identity_file,
+ tty = tty
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+ return (out,err),proc
+
+@eintr_retry
+def rcheck_pid(pidfile,
+ host = None,
+ port = None,
+ user = None,
+ agent = None,
+ identity_file = None):
+ """
+ Check the pidfile of a process spawned with remote_spawn.
+
+ Parameters:
+ pidfile: the pidfile passed to remote_span
+
+ host/port/user/agent/identity_file: see rexec
+
+ Returns:
+
+ A (pid, ppid) tuple useful for calling remote_status and remote_kill,
+ or None if the pidfile isn't valid yet (maybe the process is still starting).
+ """
+
+ (out,err),proc = rexec(
+ "cat %(pidfile)s" % {
+ 'pidfile' : pidfile,
+ },
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ identity_file = identity_file
+ )
+
+ if proc.wait():
+ return None
+
+ if out:
+ try:
+ return map(int,out.strip().split(' ',1))
+ except:
+ # Ignore, many ways to fail that don't matter that much
+ return None
+
+@eintr_retry
+def rstatus(pid, ppid,
+ host = None,
+ port = None,
+ user = None,
+ agent = None,
+ identity_file = None):
+ """
+ Check the status of a process spawned with remote_spawn.
+
+ Parameters:
+ pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
+
+ host/port/user/agent/identity_file: see rexec
+
+ Returns:
+
+ One of NOT_STARTED, RUNNING, FINISHED
+ """
+
+ (out,err),proc = rexec(
+ "ps --pid %(pid)d -o pid | grep -c %(pid)d ; true" % {
+ 'ppid' : ppid,
+ 'pid' : pid,
+ },
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ identity_file = identity_file
+ )
+
+ if proc.wait():
+ return NOT_STARTED
+
+ status = False
+ if out:
+ try:
+ status = bool(int(out.strip()))
+ except:
+ if out or err:
+ logging.warn("Error checking remote status:\n%s%s\n", out, err)
+ # Ignore, many ways to fail that don't matter that much
+ return NOT_STARTED
+ return RUNNING if status else FINISHED
+
+
+@eintr_retry
+def rkill(pid, ppid,
+ host = None,
+ port = None,
+ user = None,
+ agent = None,
+ sudo = False,
+ identity_file = None,
+ nowait = False):
+ """
+ Kill a process spawned with remote_spawn.
+
+ First tries a SIGTERM, and if the process does not end in 10 seconds,
+ it sends a SIGKILL.
+
+ Parameters:
+ pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
+
+ sudo: whether the command was run with sudo - careful killing like this.
+
+ host/port/user/agent/identity_file: see rexec
+
+ Returns:
+
+ Nothing, should have killed the process
+ """
+
+ if sudo:
+ subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
+ else:
+ subkill = ""
+ cmd = """
+SUBKILL="%(subkill)s" ;
+%(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
+%(sudo)s kill %(pid)d $SUBKILL || /bin/true
+for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
+ sleep 0.2
+ if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
+ break
+ else
+ %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
+ %(sudo)s kill %(pid)d $SUBKILL || /bin/true
+ fi
+ sleep 1.8
+done
+if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
+ %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
+ %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
+fi
+"""
+ if nowait:
+ cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
+
+ (out,err),proc = rexec(
+ cmd % {
+ 'ppid' : ppid,
+ 'pid' : pid,
+ 'sudo' : 'sudo -S' if sudo else '',
+ 'subkill' : subkill,
+ },
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ identity_file = identity_file
+ )
+
+ # wait, don't leave zombies around
+ proc.wait()
+
+# POSIX
+def _communicate(self, input, timeout=None, err_on_timeout=True):
+ read_set = []
+ write_set = []
+ stdout = None # Return
+ stderr = None # Return
+
+ killed = False
+
+ if timeout is not None:
+ timelimit = time.time() + timeout
+ killtime = timelimit + 4
+ bailtime = timelimit + 4
+
+ if self.stdin:
+ # Flush stdio buffer. This might block, if the user has
+ # been writing to .stdin in an uncontrolled fashion.
+ self.stdin.flush()
+ if input:
+ write_set.append(self.stdin)
+ else:
+ self.stdin.close()
+ if self.stdout:
+ read_set.append(self.stdout)
+ stdout = []
+ if self.stderr:
+ read_set.append(self.stderr)
+ stderr = []
+
+ input_offset = 0
+ while read_set or write_set:
+ if timeout is not None:
+ curtime = time.time()
+ if timeout is None or curtime > timelimit:
+ if curtime > bailtime:
+ break
+ elif curtime > killtime:
+ signum = signal.SIGKILL
+ else:
+ signum = signal.SIGTERM
+ # Lets kill it
+ os.kill(self.pid, signum)
+ select_timeout = 0.5
+ else:
+ select_timeout = timelimit - curtime + 0.1
+ else:
+ select_timeout = 1.0
+
+ if select_timeout > 1.0:
+ select_timeout = 1.0
+
+ try:
+ rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
+ except select.error,e:
+ if e[0] != 4:
+ raise
+ else:
+ continue
+
+ if not rlist and not wlist and not xlist and self.poll() is not None:
+ # timeout and process exited, say bye
+ break
+
+ if self.stdin in wlist:
+ # When select has indicated that the file is writable,
+ # we can write up to PIPE_BUF bytes without risk
+ # blocking. POSIX defines PIPE_BUF >= 512
+ bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+ input_offset += bytes_written
+ if input_offset >= len(input):
+ self.stdin.close()
+ write_set.remove(self.stdin)
+
+ if self.stdout in rlist:
+ data = os.read(self.stdout.fileno(), 1024)
+ if data == "":
+ self.stdout.close()
+ read_set.remove(self.stdout)
+ stdout.append(data)
+
+ if self.stderr in rlist:
+ data = os.read(self.stderr.fileno(), 1024)
+ if data == "":
+ self.stderr.close()
+ read_set.remove(self.stderr)
+ stderr.append(data)
+
+ # All data exchanged. Translate lists into strings.
+ if stdout is not None:
+ stdout = ''.join(stdout)
+ if stderr is not None:
+ stderr = ''.join(stderr)
+
+ # Translate newlines, if requested. We cannot let the file
+ # object do the translation: It is based on stdio, which is
+ # impossible to combine with select (unless forcing no
+ # buffering).
+ if self.universal_newlines and hasattr(file, 'newlines'):
+ if stdout:
+ stdout = self._translate_newlines(stdout)
+ if stderr:
+ stderr = self._translate_newlines(stderr)
+
+ if killed and err_on_timeout:
+ errcode = self.poll()
+ raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
+ else:
+ if killed:
+ self.poll()
+ else:
+ self.wait()
+ return (stdout, stderr)
+
--- /dev/null
+import datetime
+import re
+
+_strf = "%Y%m%d%H%M%S%f"
+_reabs = re.compile("^\d{20}$")
+_rerel = re.compile("^(?P<time>\d+(.\d+)?)(?P<units>h|m|s|ms|us)$")
+
+# Work around to fix "ImportError: Failed to import _strptime because the import lockis held by another thread."
+datetime.datetime.strptime("20120807124732894211", _strf)
+
+def strfnow():
+ """ Current date """
+ return datetime.datetime.now().strftime(_strf)
+
+def strfdiff(str1, str2):
+ # Time difference in seconds without ignoring miliseconds
+ d1 = datetime.datetime.strptime(str1, _strf)
+ d2 = datetime.datetime.strptime(str2, _strf)
+ diff = d1 - d2
+ ddays = diff.days * 86400
+ dus = round(diff.microseconds * 1.0e-06, 2)
+ ret = ddays + diff.seconds + dus
+ # delay must be > 0
+ return (ret or 0.001)
+
+def strfvalid(date):
+ """ User defined date to scheduler date """
+ if not date:
+ return strfnow()
+ if _reabs.match(date):
+ return date
+ m = _rerel.match(date)
+ if m:
+ time = float(m.groupdict()['time'])
+ units = m.groupdict()['units']
+ if units == 'h':
+ delta = datetime.timedelta(hours = time)
+ elif units == 'm':
+ delta = datetime.timedelta(minutes = time)
+ elif units == 's':
+ delta = datetime.timedelta(seconds = time)
+ elif units == 'ms':
+ delta = datetime.timedelta(microseconds = (time*1000))
+ else:
+ delta = datetime.timedelta(microseconds = time)
+ now = datetime.datetime.now()
+ d = now + delta
+ return d.strftime(_strf)
+ return None
+
--- /dev/null
+#!/usr/bin/env python
+
+from neco.design.box import Box
+
+import unittest
+
+class BoxDesignTestCase(unittest.TestCase):
+ def test_simple_design(self):
+ node1 = Box()
+ node2 = Box()
+
+ node1.label = "uno"
+ node2.label = "dos"
+
+ node1.tadd('nodo')
+ node2.tadd('mynodo')
+
+ self.assertEquals(node1.tags, set(['nodo']))
+ self.assertEquals(node2.tags, set(['mynodo']))
+
+ node1.a.hola = "chau"
+ node2.a.hello = "bye"
+
+ self.assertEquals(node1.a.hola, "chau")
+ self.assertEquals(node2.a.hello, "bye")
+
+ node1.connect(node2)
+
+ self.assertEquals(node1.connections, set([node2]))
+ self.assertEquals(node2.connections, set([node1]))
+ self.assertTrue(node1.is_connected(node2))
+ self.assertTrue(node2.is_connected(node1))
+
+ self.assertEquals(node1.c.dos.a.hello, "bye")
+ self.assertEquals(node2.c.uno.a.hola, "chau")
+
+ node2.disconnect(node1)
+
+ self.assertEquals(node1.connections, set([]))
+ self.assertEquals(node2.connections, set([]))
+ self.assertFalse(node1.is_connected(node2))
+ self.assertFalse(node2.is_connected(node1))
+
+ self.assertRaises(AttributeError, node1.c.dos)
+ self.assertRaises(AttributeError, node2.c.uno)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
--- /dev/null
+#!/usr/bin/env python
+
+from neco.execution.ec import ExperimentController
+from neco.execution.tasks import TaskStatus
+
+import datetime
+import unittest
+
+class ExecuteControllersTestCase(unittest.TestCase):
+ def test_schedule_print(self):
+ def myfunc(ec_weakref):
+ result = id(ec_weakref())
+ return (TaskStatus.SUCCESS, result)
+
+ try:
+ ec = ExperimentController()
+
+ tid = ec.schedule("0s", myfunc)
+ status = None
+ while status != TaskStatus.SUCCESS:
+ (status, result) = ec.task_info(tid)
+
+ self.assertEquals(id(ec), result)
+ finally:
+ ec.terminate()
+
+ def test_schedule_date(self):
+ def get_time(ec_weakref):
+ timestamp = datetime.datetime.now()
+ return (TaskStatus.SUCCESS, timestamp)
+
+ try:
+ ec = ExperimentController()
+
+ schedule_time = datetime.datetime.now()
+
+ tid = ec.schedule("4s", get_time)
+ status = None
+ while status != TaskStatus.SUCCESS:
+ (status, execution_time) = ec.task_info(tid)
+
+ delta = execution_time - schedule_time
+ self.assertTrue(delta > datetime.timedelta(seconds=4))
+ self.assertTrue(delta < datetime.timedelta(seconds=5))
+
+ finally:
+ ec.terminate()
+
+
+if __name__ == '__main__':
+ unittest.main()
+
--- /dev/null
+#!/usr/bin/env python
+
+from neco.design.box import Box
+from neco.util.parser import XMLParser
+
+import unittest
+
+class BoxDesignTestCase(unittest.TestCase):
+ def test_to_xml(self):
+ node1 = Box()
+ node2 = Box()
+
+ node1.label = "node1"
+ node2.label = "node2"
+
+ node1.connect(node2)
+
+ node1.a.dog = "cat"
+ node1.a.one = "two"
+ node1.a.t = "q"
+
+ node1.c.node2.a.sky = "sea"
+ node2.a.bee = "honey"
+
+ node1.tadd("unooo")
+ node2.tadd("dosss")
+
+ parser = XMLParser()
+ xml = parser.to_xml(node1)
+
+ node = parser.from_xml(xml)
+ xml2 = parser.to_xml(node)
+
+ self.assertEquals(xml, xml2)
+
+if __name__ == '__main__':
+ unittest.main()
+
--- /dev/null
+#!/usr/bin/env python
+
+from neco.design.box import Box
+from neco.util.plot import Plotter
+
+import subprocess
+import unittest
+
+class BoxPlotTestCase(unittest.TestCase):
+ def xtest_plot(self):
+ node1 = Box(label="node1")
+ ping1 = Box(label="ping")
+ mobility1 = Box(label="mob1")
+ node2 = Box(label="node2")
+ mobility2 = Box(label="mob2")
+ iface1 = Box(label="iface1")
+ iface2 = Box(label="iface2")
+ channel = Box(label="chan")
+
+ node1.connect(ping1)
+ node1.connect(mobility1)
+ node1.connect(iface1)
+ channel.connect(iface1)
+ channel.connect(iface2)
+ node2.connect(iface2)
+ node2.connect(mobility2)
+
+ plotter = Plotter(node1)
+ fname = plotter.plot()
+ subprocess.call(["dot", "-Tps", fname, "-o", "%s.ps"%fname])
+ subprocess.call(["evince","%s.ps"%fname])
+
+if __name__ == '__main__':
+ unittest.main()
+