From 4f7387b9c2f8e427bff72a05dc547730937a242a Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Mon, 5 Nov 2012 19:47:41 +0100 Subject: [PATCH] NECo: A tool to design and run experiments on arbitrary platforms. --- Makefile | 62 +++ setup.cfg | 2 + setup.py | 24 + src/neco/__init__.py | 2 + src/neco/design/__init__.py | 0 src/neco/design/box.py | 101 ++++ src/neco/execution/__init__.py | 0 src/neco/execution/callbacks.py | 20 + src/neco/execution/ec.py | 164 ++++++ src/neco/execution/resource.py | 84 +++ src/neco/execution/scheduler.py | 40 ++ src/neco/execution/tags.py | 21 + src/neco/execution/tasks.py | 18 + src/neco/resources/__init__.py | 0 src/neco/resources/base/__init__.py | 0 src/neco/resources/base/application.py | 68 +++ src/neco/resources/base/linux_node.py | 207 +++++++ src/neco/resources/netns/__init__.py | 0 src/neco/resources/ns3/__init__.py | 0 src/neco/util/__init__.py | 0 src/neco/util/guid.py | 16 + src/neco/util/parallel.py | 259 +++++++++ src/neco/util/parser.py | 147 +++++ src/neco/util/plot.py | 30 + src/neco/util/sshfuncs.py | 742 +++++++++++++++++++++++++ src/neco/util/timefuncs.py | 50 ++ test/design/box.py | 50 ++ test/execution/ec.py | 52 ++ test/util/parser.py | 38 ++ test/util/plot.py | 35 ++ 30 files changed, 2232 insertions(+) create mode 100644 Makefile create mode 100644 setup.cfg create mode 100755 setup.py create mode 100644 src/neco/__init__.py create mode 100644 src/neco/design/__init__.py create mode 100644 src/neco/design/box.py create mode 100644 src/neco/execution/__init__.py create mode 100644 src/neco/execution/callbacks.py create mode 100644 src/neco/execution/ec.py create mode 100644 src/neco/execution/resource.py create mode 100644 src/neco/execution/scheduler.py create mode 100644 src/neco/execution/tags.py create mode 100644 src/neco/execution/tasks.py create mode 100644 src/neco/resources/__init__.py create mode 100644 src/neco/resources/base/__init__.py create mode 100644 src/neco/resources/base/application.py 
create mode 100644 src/neco/resources/base/linux_node.py create mode 100644 src/neco/resources/netns/__init__.py create mode 100644 src/neco/resources/ns3/__init__.py create mode 100644 src/neco/util/__init__.py create mode 100644 src/neco/util/guid.py create mode 100644 src/neco/util/parallel.py create mode 100644 src/neco/util/parser.py create mode 100644 src/neco/util/plot.py create mode 100644 src/neco/util/sshfuncs.py create mode 100644 src/neco/util/timefuncs.py create mode 100755 test/design/box.py create mode 100755 test/execution/ec.py create mode 100755 test/util/parser.py create mode 100755 test/util/plot.py diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..cc56b5ef --- /dev/null +++ b/Makefile @@ -0,0 +1,62 @@ +SRCDIR = $(CURDIR)/src +TESTDIR = $(CURDIR)/test +BUILDDIR = $(CURDIR)/build +DISTDIR = $(CURDIR)/dist + +# stupid distutils, it's broken in so many ways +SUBBUILDDIR = $(shell python -c 'import distutils.util, sys; \ + print "lib.%s-%s" % (distutils.util.get_platform(), \ + sys.version[0:3])') +PYTHON25 := $(shell python -c 'import sys; v = sys.version_info; \ + print (1 if v[0] <= 2 and v[1] <= 5 else 0)') + +ifeq ($(PYTHON25),0) +BUILDDIR := $(BUILDDIR)/$(SUBBUILDDIR) +else +BUILDDIR := $(BUILDDIR)/lib +endif + +PYPATH = $(BUILDDIR):$(PYTHONPATH) +COVERAGE = $(or $(shell which coverage), $(shell which python-coverage), \ + coverage) + +all: + ./setup.py build + +install: all + ./setup.py install + +test: all + retval=0; \ + for i in `find "$(TESTDIR)" -iname '*.py' -perm -u+x -type f`; do \ + echo $$i; \ + PYTHONPATH="$(PYPATH)" $$i -v || retval=$$?; \ + done; exit $$retval + +coverage: all + rm -f .coverage + for i in `find "$(TESTDIR)" -perm -u+x -type f`; do \ + set -e; \ + PYTHONPATH="$(PYPATH)" $(COVERAGE) -x $$i -v; \ + done + $(COVERAGE) -c + $(COVERAGE) -r -m `find "$(BUILDDIR)" -name \\*.py -type f` + rm -f .coverage + +clean: + ./setup.py clean + rm -f `find -name \*.pyc` .coverage *.pcap + +distclean: clean + rm 
-rf "$(DISTDIR)"
+
+MANIFEST:
+	find . -path ./.hg\* -prune -o -path ./build -prune -o \
+		-name \*.pyc -prune -o -name \*.swp -prune -o \
+		-name MANIFEST -prune -o -type f -print | \
+		sed 's#^\./##' | sort > MANIFEST
+
+dist: MANIFEST
+	./setup.py sdist
+
+.PHONY: all clean distclean dist test coverage install MANIFEST
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 00000000..e1b8322c
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[clean]
+all = 1
diff --git a/setup.py b/setup.py
new file mode 100755
index 00000000..a8cf0290
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+from distutils.core import setup
+import sys
+
+setup(
+    name = "neco",
+    version = "0.01",
+    description = "Network Experiment Controller",
+    author = "Alina Quereilhac",
+    url = "",
+    license = "GPLv2",
+    platforms = "Linux",
+    packages = [
+        "neco",
+        "neco.design",
+        "neco.execution",
+        "neco.resources",
+        "neco.resources.base",
+        "neco.resources.netns",
+        "neco.resources.ns3",
+        "neco.util"],
+    package_dir = {"": "src"},
+    )
diff --git a/src/neco/__init__.py b/src/neco/__init__.py
new file mode 100644
index 00000000..df19bd55
--- /dev/null
+++ b/src/neco/__init__.py
@@ -0,0 +1,2 @@
+import logging
+logging.basicConfig()
diff --git a/src/neco/design/__init__.py b/src/neco/design/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/neco/design/box.py b/src/neco/design/box.py
new file mode 100644
index 00000000..121827aa
--- /dev/null
+++ b/src/neco/design/box.py
@@ -0,0 +1,101 @@
+from neco.util import guid
+
+guid_gen = guid.GuidGenerator()
+
+class Attributes(object):
+    def __init__(self):
+        super(Attributes, self).__init__()
+        self._attributes = dict()
+
+    def __getattr__(self, name):
+        try:
+            return self._attributes[name]
+        except:
+            return super(Attributes, self).__getattribute__(name)
+
+    def __setattr__(self, name, value):
+        try:
+            if value == None:
+                old = self._attributes[name]
+                del self._attributes[name]
+
return old + + self._attributes[name] = value + return value + except: + return super(Attributes, self).__setattr__(name, value) + +class Connections(object): + def __init__(self): + super(Connections, self).__init__() + self._connections = set() + + def __getattr__(self, guid_or_label): + try: + for b in self._connections: + if guid_or_label in [b.guid, b.label]: + return b + except: + return super(Connections, self).__getattribute__(guid_or_label) + +class Box(object): + def __init__(self, label = None, guid = None): + super(Box, self).__init__() + self._guid = guid_gen.next(guid) + self._a = Attributes() + self._c = Connections() + self._tags = set() + self.label = label or self._guid + + # Graphical information to draw box + self.x = 0 + self.y = 0 + self.width = 4 + self.height = 4 + + @property + def tags(self): + return self._tags + + @property + def attributes(self): + return self._a._attributes.keys() + + @property + def a(self): + return self._a + + @property + def c(self): + return self._c + + @property + def guid(self): + return self._guid + + @property + def connections(self): + return set(self._c._connections) + + def tadd(self, name): + self._tags.add(name) + + def tdel(self, name): + self._tags.remove(name) + + def thas(self, name): + return name in self._tags + + def connect(self, box, cascade = True): + self._c._connections.add(box) + if cascade: + box.connect(self, cascade = False) + + def disconnect(self, box, cascade = True): + self._c._connections.remove(box) + if cascade: + box.disconnect(self, cascade = False) + + def is_connected(self, box): + return box in self.connections + diff --git a/src/neco/execution/__init__.py b/src/neco/execution/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/neco/execution/callbacks.py b/src/neco/execution/callbacks.py new file mode 100644 index 00000000..a30411f9 --- /dev/null +++ b/src/neco/execution/callbacks.py @@ -0,0 +1,20 @@ + +def deploy(ec_weakref, xml): + from 
neco.util.parser import XMLParser + + # parse xml and build topology graph + parser = XMLParser() + box = parser.from_xml(xml) + + # instantiate resource boxes + + + # allocate physical resources + # configure physical resources + # allocate virtual resources + # configure virtual resources + # allocate software resources + # configure software resources + # schedule application start/stop + + diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py new file mode 100644 index 00000000..01a6aba2 --- /dev/null +++ b/src/neco/execution/ec.py @@ -0,0 +1,164 @@ +import logging +import os +import sys +import threading +import time +import weakref + +from neco.execution import scheduler, tasks +from neco.util import guid +from neco.util.timefuncs import strfnow, strfdiff, strfvalid +from neco.util.parallel import ParallelRun + +_reschedule_delay = "0.1s" + +class ExperimentController(object): + def __init__(self, root_dir = "/tmp", loglevel = 'error'): + super(ExperimentController, self).__init__() + # root directory to store files + self._root_dir = root_dir + + # generator of globally unique ids + self._guid_generator = guid.GuidGenerator() + + # Scheduler + self._scheduler = scheduler.HeapScheduler() + + # Tasks + self._tasks = dict() + + # Resources + self._resources = dict() + + # Event processing thread + self._cond = threading.Condition() + self._stop = False + self._thread = threading.Thread(target = self._process_tasks) + self._thread.start() + + # Logging + self._logger = logging.getLogger("neco.execution.ec") + self._logger.setLevel(getattr(logging, loglevel.upper())) + + def resource(self, guid): + return self._resources.get(guid) + + def terminate(self): + self._stop = True + self._cond.acquire() + self._cond.notify() + self._cond.release() + if self._thread.is_alive(): + self._thread.join() + + def task_info(self, tid): + task = self._tasks.get(tid) + if not task: + return (None, None) + return (task.status, task.result) + + def schedule(self, date, 
callback, args = None, kwargs = None): + """ + date string containing execution time for the task. + It can be expressed as an absolute time, using + timestamp format, or as a relative time matching + ^\d+.\d+(h|m|s|ms|us)$ + + callback code to be executed for the task. Must be a + Python function, and receives args and kwargs + as arguments. + The callback will always be invoked passing a + week reference to the controller as first + argument. + The callback must return a (status, result) + tuple where status is one of : + task.TaskStatus.FAIL, + task.TaskStatus.SUCCESS, + task.TaskStatus.RETRY, + task.TaskStatus.RECYCLE + """ + timestamp = strfvalid(date) + + args = args or [] + kwargs = kwargs or {} + + task = tasks.Task(timestamp, callback, args, kwargs) + task = self._schedule(task) + + self._tasks[task.id] = task + + return task.id + + ########################################################################### + #### Internal methods + ########################################################################### + + def _schedule(self, task): + task = self._scheduler.schedule(task) + + # Notify condition to wake up the processing thread + self._cond.acquire() + self._cond.notify() + self._cond.release() + return task + + def _process_tasks(self): + runner = ParallelRun(maxthreads = 50) + runner.start() + + try: + while not self._stop: + self._cond.acquire() + task = self._scheduler.next() + self._cond.release() + + if not task: + # It there are not tasks in the tasks queue we need to + # wait until a call to schedule wakes us up + self._cond.acquire() + self._cond.wait() + self._cond.release() + else: + # If the task timestamp is in the future the thread needs to wait + # until time elapse or until another task is scheduled + now = strfnow() + if now < task.timestamp: + # Calculate time difference in seconds + timeout = strfdiff(task.timestamp, now) + # Re-schedule task with the same timestamp + self._scheduler.schedule(task) + # Sleep until timeout or until a 
new task awakes the condition
+                        self._cond.acquire()
+                        self._cond.wait(timeout)
+                        self._cond.release()
+                    else:
+                        # Process tasks in parallel
+                        runner.put(self._execute_task, task)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self._logger.error("Error while processing tasks in the EC: %s" % err)
+
+    def _execute_task(self, task):
+        # Invoke callback
+        ec = weakref.ref(self)
+        try:
+            (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self._logger.error("Error while executing event: %s" % err)
+
+            # task marked as FAIL
+            task.status = tasks.TaskStatus.FAIL
+            task.result = err
+
+        if task.status == tasks.TaskStatus.RETRY:
+            # Re-schedule same task in the near future
+            task.timestamp = strfvalid(_reschedule_delay)
+            self._schedule(task)
+        elif task.status == tasks.TaskStatus.RECYCLE:
+            # Re-schedule t in the future
+            timestamp = strfvalid(task.result)
+            self.schedule(timestamp, task.callback, task.args, task.kwargs)
+
diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py
new file mode 100644
index 00000000..30294d39
--- /dev/null
+++ b/src/neco/execution/resource.py
@@ -0,0 +1,84 @@
+import logging
+import weakref
+
+def match_tags(box, all_tags, exact_tags):
+    """ returns True if box has required tags """
+    tall = set(all_tags)
+    texact = set(exact_tags)
+
+    # NOTE: compare against the box's tag set, not its connections
+    if texact and box.tags == texact:
+        return True
+
+    if tall and tall.issubset(box.tags):
+        return True
+
+    return False
+
+def find_boxes(box, all_tags = None, exact_tags = None, max_depth = 1):
+    """ Look for the connected boxes with the required tags, doing breath-first
+    search, until max_depth ( max_depth = None will traverse the entire graph ).
+    """
+    if not all_tags and not exact_tags:
+        msg = "No matching criteria for resources."
+        raise RuntimeError(msg)
+
+    queue = set()
+    # enqueue (depth, box)
+    queue.add((0, box))
+
+    traversed = set()
+    traversed.add(box)
+
+    depth = 0
+
+    result = set()
+
+    while len(queue) > 0:
+        (depth, a) = queue.pop()
+        if match_tags(a, all_tags, exact_tags):
+            result.add(a)
+
+        if not max_depth or depth <= max_depth:
+            depth += 1
+            for b in sorted(a.connections):
+                if b not in traversed:
+                    traversed.add(b)
+                    queue.add((depth, b))
+
+    return result
+
+class Resource(object):
+    def __init__(self, box, ec):
+        self._box = weakref.ref(box)
+        self._ec = weakref.ref(ec)
+
+        # Logging
+        loglevel = "debug"
+        self._logger = logging.getLogger("neco.execution.Resource.%s" %
+            self.box.guid)
+        self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+    @property
+    def box(self):
+        return self._box()
+
+    @property
+    def ec(self):
+        return self._ec()
+
+    def find_resources(self, all_tags = None, exact_tags = None,
+            max_depth = 1):
+        resources = set()
+
+        boxes = find_boxes(self.box, all_tags, exact_tags, max_depth)
+        for b in boxes:
+            r = self.ec.resource(b.guid)
+            resources.add(r)
+
+        return resources
+
+class ResourceResolver(object):
+    def __init__(self):
+        pass
+
+
diff --git a/src/neco/execution/scheduler.py b/src/neco/execution/scheduler.py
new file mode 100644
index 00000000..202b711a
--- /dev/null
+++ b/src/neco/execution/scheduler.py
@@ -0,0 +1,40 @@
+import itertools
+import heapq
+
+class HeapScheduler(object):
+    """ This class is thread safe.
+    All calls to C Extensions are made atomic by the GIL in the CPython implementation.
+ heapq.heappush, heapq.heappop, and list access are therefore thread-safe """ + + def __init__(self): + super(HeapScheduler, self).__init__() + self._queue = list() + self._valid = set() + self._idgen = itertools.count(1) + + def schedule(self, task): + if task.id == None: + task.id = self._idgen.next() + entry = (task.timestamp, task.id, task) + self._valid.add(task.id) + heapq.heappush(self._queue, entry) + return task + + def remove(self, tid): + try: + self._valid.remove(tid) + except: + pass + + def next(self): + while self._queue: + try: + timestamp, tid, task = heapq.heappop(self._queue) + if tid in self._valid: + self.remove(tid) + return task + except IndexError: + # heap empty + pass + return None + diff --git a/src/neco/execution/tags.py b/src/neco/execution/tags.py new file mode 100644 index 00000000..244713ab --- /dev/null +++ b/src/neco/execution/tags.py @@ -0,0 +1,21 @@ +NODE = "node" +NETWORK_INTERFACE = "network interface" +SWITCH = "switch" +TUNNEL = "tunnel" +APPLICATION = "application" +CHANNEL = "channel" +CPU = "cpu" + +IP4ADDRESS = "ipv4" +IP6ADDRESS = "ipv6" +MACADDRESS = "mac" +IPADDRESS = "ip" +ROUTE = "route" +FLOW = "flow" + +WIRELESS = "wireless" +ETHERNET = "ethernet" +SIMULATED = "simulated" +VIRTUAL = "virtual" +MOBILE = "mobile" + diff --git a/src/neco/execution/tasks.py b/src/neco/execution/tasks.py new file mode 100644 index 00000000..ee24fe80 --- /dev/null +++ b/src/neco/execution/tasks.py @@ -0,0 +1,18 @@ + +class TaskStatus: + NEW = 0 + RETRY = 1 + SUCCESS = 2 + FAIL = 3 + RECYCLE = 4 + +class Task(object): + def __init__(self, timestamp, callback, args, kwargs): + self.id = None + self.timestamp = timestamp + self.callback = callback + self.args = args + self.kwargs = kwargs + self.result = None + self.status = TaskStatus.NEW + diff --git a/src/neco/resources/__init__.py b/src/neco/resources/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/neco/resources/base/__init__.py 
b/src/neco/resources/base/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/neco/resources/base/application.py b/src/neco/resources/base/application.py
new file mode 100644
index 00000000..40e07ff5
--- /dev/null
+++ b/src/neco/resources/base/application.py
@@ -0,0 +1,69 @@
+from neco.execution import tags
+from neco.execution.resource import Resource
+
+import cStringIO
+import logging
+import os
+
+class Application(Resource):
+    def __init__(self, box, ec):
+        super(Application, self).__init__(box, ec)
+        self.command = None
+        self.pid = None
+        self.ppid = None
+        self.stdin = None
+        self.del_app_home = True
+        self.env = None
+
+        self.app_home = "${HOME}/app-%s" % self.box.guid
+        self._node = None
+
+        # Logging
+        loglevel = "debug"
+        self._logger = logging.getLogger("neco.resources.base.Application.%s" % self.box.guid)
+        self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+    @property
+    def node(self):
+        if self._node:
+            return self._node
+
+        # XXX: What if it is connected to more than one node?
+        resources = self.find_resources(exact_tags = [tags.NODE])
+        self._node = list(resources)[0] if len(resources) == 1 else None
+        return self._node
+
+    def make_app_home(self):
+        self.node.mkdir(self.app_home)
+
+        if self.stdin:
+            self.node.upload(self.stdin, os.path.join(self.app_home, 'stdin'))
+
+    def cleanup(self):
+        self.kill()
+
+    def run(self):
+        dst = os.path.join(self.app_home, "app.sh")
+
+        # Create shell script with the command
+        # This way, complex commands and scripts can be ran seamlessly
+        # sync files
+        cmd = ""
+        if self.env:
+            for envkey, envvals in self.env.iteritems():
+                for envval in envvals:
+                    cmd += 'export %s=%s\n' % (envkey, envval)
+
+        cmd += self.command
+        self.node.upload(cmd, dst)
+
+        cmd = "bash ./app.sh"
+        self.node.run(cmd, self.app_home)
+        self.pid, self.ppid = self.node.checkpid(self.app_home)
+
+    def status(self):
+        return self.node.status(self.pid, self.ppid)
+
+    def kill(self):
+        return self.node.kill(self.pid, self.ppid)
+
diff --git a/src/neco/resources/base/linux_node.py b/src/neco/resources/base/linux_node.py
new file mode 100644
index 00000000..b7d10cc6
--- /dev/null
+++ b/src/neco/resources/base/linux_node.py
@@ -0,0 +1,207 @@
+from neco.execution.resource import Resource
+from neco.util.sshfuncs import eintr_retry, shell_escape, rexec, rcopy, \
+    rspawn, rcheck_pid, rstatus, rkill, RUNNING
+
+import cStringIO
+import logging
+import os.path
+
+class LinuxNode(Resource):
+    def __init__(self, box, ec):
+        super(LinuxNode, self).__init__(box, ec)
+        self.ip = None
+        self.host = None
+        self.user = None
+        self.port = None
+        self.identity_file = None
+        # packet management system - either yum or apt for now...
+ self._pm = None + + # Logging + loglevel = "debug" + self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\ + self.box.guid) + self._logger.setLevel(getattr(logging, loglevel.upper())) + + @property + def pm(self): + if self._pm: + return self._pm + + if (not (self.host or self.ip) or not self.user): + msg = "Can't resolve package management system. Insufficient data." + self._logger.error(msg) + raise RuntimeError(msg) + + out = self.execute("cat /etc/issue") + + if out.find("Fedora") == 0: + self._pm = "yum -y " + elif out.find("Debian") == 0 or out.find("Ubuntu") ==0: + self._pm = "apt-get -y " + else: + msg = "Can't resolve package management system. Unknown OS." + self._logger.error(msg) + raise RuntimeError(msg) + + return self._pm + + def execute(self, command, + agent = True, + sudo = False, + stdin = "", + tty = False, + timeout = None, + retry = 0, + err_on_timeout = True, + connect_timeout = 30, + persistent = True): + """ Notice that this invocation will block until the + execution finishes. 
If this is not the desired behavior,
+        use 'run' instead."""
+        (out, err), proc = eintr_retry(rexec)(
+            command,
+            self.host or self.ip,
+            self.user,
+            port = self.port,
+            agent = agent,
+            sudo = sudo,
+            stdin = stdin,
+            identity_file = self.identity_file,
+            tty = tty,
+            timeout = timeout,
+            retry = retry,
+            err_on_timeout = err_on_timeout,
+            connect_timeout = connect_timeout,
+            persistent = persistent)
+
+        if proc.wait():
+            msg = "Failed to execute command %s at node %s: %s %s" % \
+                (command, self.host or self.ip, out, err,)
+            self._logger.warn(msg)
+            raise RuntimeError(msg)
+
+        return out
+
+    def package_install(self, dependencies):
+        if not isinstance(dependencies, list):
+            dependencies = [dependencies]
+
+        for d in dependencies:
+            self.execute("%s install %s" % (self.pm, d), sudo = True,
+                    tty = True)
+
+    def upload(self, src, dst):
+        if not os.path.isfile(src):
+            src = cStringIO.StringIO(src)
+
+        (out, err), proc = eintr_retry(rcopy)(
+            src, dst,
+            self.host or self.ip,
+            self.user,
+            port = self.port,
+            identity_file = self.identity_file)
+
+        if proc.wait():
+            msg = "Error uploading to %s got:\n%s%s" %\
+                (self.host or self.ip, out, err)
+            self._logger.error(msg)
+            raise RuntimeError(msg)
+
+    def is_alive(self, verbose = False):
+        (out, err), proc = eintr_retry(rexec)(
+            "echo 'ALIVE'",
+            self.host or self.ip,
+            self.user,
+            port = self.port,
+            identity_file = self.identity_file,
+            timeout = 60,
+            err_on_timeout = False,
+            persistent = False)
+
+        if proc.wait():
+            self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+            return False
+        elif out.strip().startswith('ALIVE'):
+            return True
+        else:
+            self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+            return False
+
+    def mkdir(self, path, clean = True):
+        if clean:
+            self.execute(
+                "rm -f %s" % shell_escape(path),
+                timeout = 120,
+                retry = 3
+                )
+
+        self.execute(
+            "mkdir -p %s" % shell_escape(path),
+            timeout = 120,
+            retry = 3
+            )
+
+    def run(self, command, home,
stdin = 'stdin', + stdout = 'stdout', + stderr = 'stderr', + sudo = False): + self._logger.info("Running %s", command) + + # Start process in a "daemonized" way, using nohup and heavy + # stdin/out redirection to avoid connection issues + (out,err), proc = rspawn( + command, + pidfile = './pid', + home = home, + stdin = stdin if stdin is not None else '/dev/null', + stdout = stdout if stdout else '/dev/null', + stderr = stderr if stderr else '/dev/null', + sudo = sudo, + host = self.host, + user = self.user, + port = self.port, + identity_file = self.identity_file + ) + + if proc.wait(): + raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) + + def checkpid(self, path): + # Get PID/PPID + # NOTE: wait a bit for the pidfile to be created + pidtuple = rcheck_pid( + os.path.join(path, 'pid'), + host = self.host, + user = self.user, + port = self.port, + identity_file = self.identity_file + ) + + return pidtuple + + def status(self, pid, ppid): + status = rstatus( + pid, ppid, + host = self.host, + user = self.user, + port = self.port, + identity_file = self.identity_file + ) + + return status + + def kill(self, pid, ppid, sudo = False): + status = self.status(pid, ppid) + if status == RUNNING: + # kill by ppid+pid - SIGTERM first, then try SIGKILL + rkill( + pid, ppid, + host = self.host, + user = self.user, + port = self.port, + sudo = sudo, + identity_file = self.identity_file + ) + diff --git a/src/neco/resources/netns/__init__.py b/src/neco/resources/netns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/neco/resources/ns3/__init__.py b/src/neco/resources/ns3/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/neco/util/__init__.py b/src/neco/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/neco/util/guid.py b/src/neco/util/guid.py new file mode 100644 index 00000000..913e6ad1 --- /dev/null +++ b/src/neco/util/guid.py @@ -0,0 +1,16 @@ +# FIXME: This class is not 
thread-safe. +# Should it be made thread-safe? +class GuidGenerator(object): + def __init__(self): + self._guids = list() + + def next(self, guid = None): + if guid != None: + return guid + else: + last_guid = 0 if len(self._guids) == 0 else self._guids[-1] + guid = last_guid + 1 + self._guids.append(guid) + self._guids.sort() + return guid + diff --git a/src/neco/util/parallel.py b/src/neco/util/parallel.py new file mode 100644 index 00000000..8dc39a76 --- /dev/null +++ b/src/neco/util/parallel.py @@ -0,0 +1,259 @@ +import threading +import Queue +import traceback +import sys +import os + +N_PROCS = None + +THREADCACHE = [] +THREADCACHEPID = None + +class WorkerThread(threading.Thread): + class QUIT: + pass + class REASSIGNED: + pass + + def run(self): + while True: + task = self.queue.get() + if task is None: + self.done = True + self.queue.task_done() + continue + elif task is self.QUIT: + self.done = True + self.queue.task_done() + break + elif task is self.REASSIGNED: + continue + else: + self.done = False + + try: + try: + callable, args, kwargs = task + rv = callable(*args, **kwargs) + + if self.rvqueue is not None: + self.rvqueue.put(rv) + finally: + self.queue.task_done() + except: + traceback.print_exc(file = sys.stderr) + self.delayed_exceptions.append(sys.exc_info()) + + def waitdone(self): + while not self.queue.empty() and not self.done: + self.queue.join() + + def attach(self, queue, rvqueue, delayed_exceptions): + if self.isAlive(): + self.waitdone() + oldqueue = self.queue + self.queue = queue + self.rvqueue = rvqueue + self.delayed_exceptions = delayed_exceptions + if self.isAlive(): + oldqueue.put(self.REASSIGNED) + + def detach(self): + if self.isAlive(): + self.waitdone() + self.oldqueue = self.queue + self.queue = Queue.Queue() + self.rvqueue = None + self.delayed_exceptions = [] + + def detach_signal(self): + if self.isAlive(): + self.oldqueue.put(self.REASSIGNED) + del self.oldqueue + + def quit(self): + self.queue.put(self.QUIT) + 
self.join() + +class ParallelMap(object): + def __init__(self, maxthreads = None, maxqueue = None, results = True): + global N_PROCS + global THREADCACHE + global THREADCACHEPID + + if maxthreads is None: + if N_PROCS is None: + try: + f = open("/proc/cpuinfo") + try: + N_PROCS = sum("processor" in l for l in f) + finally: + f.close() + except: + pass + maxthreads = N_PROCS + + if maxthreads is None: + maxthreads = 4 + + self.queue = Queue.Queue(maxqueue or 0) + + self.delayed_exceptions = [] + + if results: + self.rvqueue = Queue.Queue() + else: + self.rvqueue = None + + # Check threadcache + if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): + del THREADCACHE[:] + THREADCACHEPID = os.getpid() + + self.workers = [] + for x in xrange(maxthreads): + t = None + if THREADCACHE: + try: + t = THREADCACHE.pop() + except: + pass + if t is None: + t = WorkerThread() + t.setDaemon(True) + else: + t.waitdone() + t.attach(self.queue, self.rvqueue, self.delayed_exceptions) + self.workers.append(t) + + def __del__(self): + self.destroy() + + def destroy(self): + # Check threadcache + global THREADCACHE + global THREADCACHEPID + if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): + del THREADCACHE[:] + THREADCACHEPID = os.getpid() + + for worker in self.workers: + worker.waitdone() + for worker in self.workers: + worker.detach() + for worker in self.workers: + worker.detach_signal() + THREADCACHE.extend(self.workers) + del self.workers[:] + + def put(self, callable, *args, **kwargs): + self.queue.put((callable, args, kwargs)) + + def put_nowait(self, callable, *args, **kwargs): + self.queue.put_nowait((callable, args, kwargs)) + + def start(self): + for thread in self.workers: + if not thread.isAlive(): + thread.start() + + def join(self): + for thread in self.workers: + # That's the sync signal + self.queue.put(None) + + self.queue.join() + for thread in self.workers: + thread.waitdone() + + if self.delayed_exceptions: + typ,val,loc = self.delayed_exceptions[0] 
+            del self.delayed_exceptions[:]
+            raise typ,val,loc
+
+        self.destroy()
+
+    def sync(self):
+        self.queue.join()
+        if self.delayed_exceptions:
+            typ,val,loc = self.delayed_exceptions[0]
+            del self.delayed_exceptions[:]
+            raise typ,val,loc
+
+    def __iter__(self):
+        if self.rvqueue is not None:
+            while True:
+                try:
+                    yield self.rvqueue.get_nowait()
+                except Queue.Empty:
+                    self.queue.join()
+                    try:
+                        yield self.rvqueue.get_nowait()
+                    except Queue.Empty:
+                        raise StopIteration
+
+
+class ParallelFilter(ParallelMap):
+    class _FILTERED:
+        pass
+
+    def __filter(self, x):
+        if self.filter_condition(x):
+            return x
+        else:
+            return self._FILTERED
+
+    def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
+        super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
+        self.filter_condition = filter_condition
+
+    def put(self, what):
+        super(ParallelFilter, self).put(self.__filter, what)
+
+    def put_nowait(self, what):
+        super(ParallelFilter, self).put_nowait(self.__filter, what)
+
+    def __iter__(self):
+        for rv in super(ParallelFilter, self).__iter__():
+            if rv is not self._FILTERED:
+                yield rv
+
+class ParallelRun(ParallelMap):
+    def __run(self, x):
+        fn, args, kwargs = x
+        return fn(*args, **kwargs)
+
+    def __init__(self, maxthreads = None, maxqueue = None):
+        super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
+
+    def put(self, what, *args, **kwargs):
+        super(ParallelRun, self).put(self.__run, (what, args, kwargs))
+
+    def put_nowait(self, what, *args, **kwargs):
+        super(ParallelRun, self).put_nowait(self.__run, (what, args, kwargs))
+
+
+def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
+    mapper = ParallelMap(
+        maxthreads = maxthreads,
+        maxqueue = maxqueue,
+        results = True)
+    mapper.start()
+    for elem in iterable:
+        mapper.put(elem)
+    rv = list(mapper)
+    mapper.join()
+    return rv
+
+def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
+    filtrer = ParallelFilter(
+        condition,
+        maxthreads = maxthreads,
maxqueue = maxqueue) + filtrer.start() + for elem in iterable: + filtrer.put(elem) + rv = list(filtrer) + filtrer.join() + return rv + diff --git a/src/neco/util/parser.py b/src/neco/util/parser.py new file mode 100644 index 00000000..e2c54783 --- /dev/null +++ b/src/neco/util/parser.py @@ -0,0 +1,147 @@ +from neco.design.box import Box + +from xml.dom import minidom +import sys + +STRING = "string" +BOOL = "bool" +INTEGER = "integer" +DOUBLE = "float" + +def xmlencode(s): + if isinstance(s, str): + rv = s.decode("latin1") + elif not isinstance(s, unicode): + rv = unicode(s) + else: + rv = s + return rv.replace(u'\x00',u'�') + +def xmldecode(s): + return s.replace(u'�',u'\x00').encode("utf8") + +def from_type(value): + if isinstance(value, str): + return STRING + if isinstance(value, bool): + return BOOL + if isinstance(value, int): + return INTEGER + if isinstance(value, float): + return DOUBLE + +def to_type(type, value): + if type == STRING: + return str(value) + if type == BOOL: + return value == "True" + if type == INTEGER: + return int(value) + if type == DOUBLE: + return float(value) + +class XMLParser(object): + def to_xml(self, box): + doc = minidom.Document() + + root = doc.createElement("boxes") + doc.appendChild(root) + + traversed = dict() + self._traverse_boxes(doc, traversed, box) + + # Keep the order + for guid in sorted(traversed.keys()): + bnode = traversed[guid] + root.appendChild(bnode) + + try: + xml = doc.toprettyxml(indent=" ", encoding="UTF-8") + except: + print >>sys.stderr, "Oops: generating XML from %s" % (data,) + raise + + return xml + + def _traverse_boxes(self, doc, traversed, box): + bnode = doc.createElement("box") + bnode.setAttribute("guid", xmlencode(box.guid)) + bnode.setAttribute("label", xmlencode(box.label)) + bnode.setAttribute("x", xmlencode(box.x)) + bnode.setAttribute("y", xmlencode(box.y)) + bnode.setAttribute("width", xmlencode(box.width)) + bnode.setAttribute("height", xmlencode(box.height)) + + traversed[box.guid] = 
bnode + + anode = doc.createElement("attributes") + bnode.appendChild(anode) + for name in sorted(box.attributes): + value = getattr(box.a, name) + aanode = doc.createElement("attribute") + anode.appendChild(aanode) + aanode.setAttribute("name", xmlencode(name)) + aanode.setAttribute("value", xmlencode(value)) + aanode.setAttribute("type", from_type(value)) + + tnode = doc.createElement("tags") + bnode.appendChild(tnode) + for tag in sorted(box.tags): + ttnode = doc.createElement("tag") + tnode.appendChild(ttnode) + ttnode.setAttribute("name", xmlencode(tag)) + + cnode = doc.createElement("connections") + bnode.appendChild(cnode) + for b in sorted(box.connections): + ccnode = doc.createElement("connection") + cnode.appendChild(ccnode) + ccnode.setAttribute("guid", xmlencode(b.guid)) + if b.guid not in traversed: + self._traverse_boxes(doc, traversed, b) + + def from_xml(self, xml): + doc = minidom.parseString(xml) + bnode_list = doc.getElementsByTagName("box") + + boxes = dict() + connections = dict() + + for bnode in bnode_list: + if bnode.nodeType == doc.ELEMENT_NODE: + guid = int(bnode.getAttribute("guid")) + label = xmldecode(bnode.getAttribute("label")) + x = float(bnode.getAttribute("x")) + y = float(bnode.getAttribute("y")) + height = float(bnode.getAttribute("height")) + width = float(bnode.getAttribute("width")) + box = Box(label=label, guid=guid) + boxes[guid] = box + + anode_list = bnode.getElementsByTagName("attribute") + for anode in anode_list: + name = xmldecode(anode.getAttribute("name")) + value = xmldecode(anode.getAttribute("value")) + type = xmldecode(anode.getAttribute("type")) + value = to_type(type, value) + setattr(box.a, name, value) + + tnode_list = bnode.getElementsByTagName("tag") + for tnode in tnode_list: + value = xmldecode(tnode.getAttribute("name")) + box.tadd(value) + + connections[box] = set() + cnode_list = bnode.getElementsByTagName("connection") + for cnode in cnode_list: + guid = int(cnode.getAttribute("guid")) + 
connections[box].add(guid) + + for box, conns in connections.iteritems(): + for guid in conns: + b = boxes[guid] + box.connect(b) + + return box + + diff --git a/src/neco/util/plot.py b/src/neco/util/plot.py new file mode 100644 index 00000000..29596393 --- /dev/null +++ b/src/neco/util/plot.py @@ -0,0 +1,30 @@ +import networkx +import tempfile + +class Plotter(object): + def __init__(self, box): + self._graph = networkx.Graph(graph = dict(overlap = "false")) + + traversed = set() + self._traverse_boxes(traversed, box) + + def _traverse_boxes(self, traversed, box): + traversed.add(box.guid) + + self._graph.add_node(box.label, + width = 50/72.0, # 1 inch = 72 points + height = 50/72.0, + shape = "circle") + + for b in box.connections: + self._graph.add_edge(box.label, b.label) + if b.guid not in traversed: + self._traverse_boxes(traversed, b) + + def plot(self): + f = tempfile.NamedTemporaryFile(delete=False) + networkx.draw_graphviz(self._graph) + networkx.write_dot(self._graph, f.name) + f.close() + return f.name + diff --git a/src/neco/util/sshfuncs.py b/src/neco/util/sshfuncs.py new file mode 100644 index 00000000..77698ca2 --- /dev/null +++ b/src/neco/util/sshfuncs.py @@ -0,0 +1,742 @@ +import base64 +import errno +import os +import os.path +import select +import signal +import socket +import subprocess +import time +import traceback +import re +import tempfile +import hashlib + +OPENSSH_HAS_PERSIST = None +CONTROL_PATH = "yyyyy_ssh_control_path" + +if hasattr(os, "devnull"): + DEV_NULL = os.devnull +else: + DEV_NULL = "/dev/null" + +SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$') + +hostbyname_cache = dict() + +class STDOUT: + """ + Special value that when given to rspawn in stderr causes stderr to + redirect to whatever stdout was redirected to. 
+ """ + +class RUNNING: + """ + Process is still running + """ + +class FINISHED: + """ + Process is finished + """ + +class NOT_STARTED: + """ + Process hasn't started running yet (this should be very rare) + """ + +def openssh_has_persist(): + """ The ssh_config options ControlMaster and ControlPersist allow to + reuse a same network connection for multiple ssh sessions. In this + way limitations on number of open ssh connections can be bypassed. + However, older versions of openSSH do not support this feature. + This function is used to determine if ssh connection persist features + can be used. + """ + global OPENSSH_HAS_PERSIST + if OPENSSH_HAS_PERSIST is None: + proc = subprocess.Popen(["ssh","-v"], + stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, + stdin = open("/dev/null","r") ) + out,err = proc.communicate() + proc.wait() + + vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I) + OPENSSH_HAS_PERSIST = bool(vre.match(out)) + return OPENSSH_HAS_PERSIST + +def shell_escape(s): + """ Escapes strings so that they are safe to use as command-line + arguments """ + if SHELL_SAFE.match(s): + # safe string - no escaping needed + return s + else: + # unsafe string - escape + def escp(c): + if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'): + return c + else: + return "'$'\\x%02x''" % (ord(c),) + s = ''.join(map(escp,s)) + return "'%s'" % (s,) + +def eintr_retry(func): + """Retries a function invocation when a EINTR occurs""" + import functools + @functools.wraps(func) + def rv(*p, **kw): + retry = kw.pop("_retry", False) + for i in xrange(0 if retry else 4): + try: + return func(*p, **kw) + except (select.error, socket.error), args: + if args[0] == errno.EINTR: + continue + else: + raise + except OSError, e: + if e.errno == errno.EINTR: + continue + else: + raise + else: + return func(*p, **kw) + return rv + +def make_connkey(user, host, port): + connkey = 
repr((user,host,port)).encode("base64").strip().replace('/','.') + if len(connkey) > 60: + connkey = hashlib.sha1(connkey).hexdigest() + return connkey + +def rexec(command, host, user, + port = None, + agent = True, + sudo = False, + stdin = "", + identity_file = None, + tty = False, + tty2 = False, + timeout = None, + retry = 0, + err_on_timeout = True, + connect_timeout = 30, + persistent = True): + """ + Executes a remote command, returns ((stdout,stderr),process) + """ + connkey = make_connkey(user, host, port) + args = ['ssh', '-C', + # Don't bother with localhost. Makes test easier + '-o', 'NoHostAuthenticationForLocalhost=yes', + # XXX: Possible security issue + # Avoid interactive requests to accept new host keys + '-o', 'StrictHostKeyChecking=no', + '-o', 'ConnectTimeout=%d' % (int(connect_timeout),), + '-o', 'ConnectionAttempts=3', + '-o', 'ServerAliveInterval=30', + '-o', 'TCPKeepAlive=yes', + '-l', user, host] + + if persistent and openssh_has_persist(): + args.extend([ + '-o', 'ControlMaster=auto', + '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ), + '-o', 'ControlPersist=60' ]) + if agent: + args.append('-A') + if port: + args.append('-p%d' % port) + if identity_file: + args.extend(('-i', identity_file)) + if tty: + args.append('-t') + elif tty2: + args.append('-t') + args.append('-t') + if sudo: + command = "sudo " + command + args.append(command) + + print " ".join(args) + + for x in xrange(retry or 3): + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) + + try: + out, err = _communicate(proc, stdin, timeout, err_on_timeout) + if proc.poll(): + if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): + # SSH error, can safely retry + continue + elif retry: + # Probably timed out or plain failed but can retry + continue + break + except RuntimeError,e: + if retry <= 0: + raise 
+ retry -= 1 + + return ((out, err), proc) + +def rcopy(source, dest, host, user, + port = None, + agent = True, + recursive = False, + identity_file = None): + """ + Copies file from/to remote sites. + + Source and destination should have the user and host encoded + as per scp specs. + + If source is a file object, a special mode will be used to + create the remote file with the same contents. + + If dest is a file object, the remote file (source) will be + read and written into dest. + + In these modes, recursive cannot be True. + + Source can be a list of files to copy to a single destination, + in which case it is advised that the destination be a folder. + """ + + if isinstance(source, file) and source.tell() == 0: + source = source.name + + elif hasattr(source, 'read'): + tmp = tempfile.NamedTemporaryFile() + while True: + buf = source.read(65536) + if buf: + tmp.write(buf) + else: + break + tmp.seek(0) + source = tmp.name + + if isinstance(source, file) or isinstance(dest, file) \ + or hasattr(source, 'read') or hasattr(dest, 'write'): + assert not recursive + + connkey = make_connkey(user,host,port) + args = ['ssh', '-l', user, '-C', + # Don't bother with localhost. 
Makes test easier
+            '-o', 'NoHostAuthenticationForLocalhost=yes',
+            # XXX: Possible security issue
+            # Avoid interactive requests to accept new host keys
+            '-o', 'StrictHostKeyChecking=no',
+            '-o', 'ConnectTimeout=30',
+            '-o', 'ConnectionAttempts=3',
+            '-o', 'ServerAliveInterval=30',
+            '-o', 'TCPKeepAlive=yes',
+            host ]
+        if openssh_has_persist():
+            args.extend([
+                '-o', 'ControlMaster=auto',
+                '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
+                '-o', 'ControlPersist=60' ])
+        if port:
+            args.append('-P%d' % port)
+        if identity_file:
+            args.extend(('-i', identity_file))
+
+        if isinstance(source, file) or hasattr(source, 'read'):
+            args.append('cat > %s' % (shell_escape(dest),))
+        elif isinstance(dest, file) or hasattr(dest, 'write'):
+            # Reading a remote file: the remote path is `source`, not `dest`
+            args.append('cat %s' % (shell_escape(source),))
+        else:
+            raise AssertionError, "Unreachable code reached! :-Q"
+
+        # connects to the remote host and starts a remote connection
+        if isinstance(source, file):
+            proc = subprocess.Popen(args,
+                    stdout = open('/dev/null','w'),
+                    stderr = subprocess.PIPE,
+                    stdin = source)
+            err = proc.stderr.read()
+            eintr_retry(proc.wait)()
+            return ((None,err), proc)
+        elif isinstance(dest, file):
+            # Download: remote stdout goes into the local file object
+            proc = subprocess.Popen(args,
+                    stdout = dest,
+                    stderr = subprocess.PIPE,
+                    stdin = open('/dev/null','r'))
+            err = proc.stderr.read()
+            eintr_retry(proc.wait)()
+            return ((None,err), proc)
+        elif hasattr(source, 'read'):
+            # file-like (but not file) source
+            proc = subprocess.Popen(args,
+                    stdout = open('/dev/null','w'),
+                    stderr = subprocess.PIPE,
+                    stdin = subprocess.PIPE)
+
+            buf = None
+            err = []
+            while True:
+                if not buf:
+                    buf = source.read(4096)
+                    if not buf:
+                        #EOF
+                        break
+
+                rdrdy, wrdy, broken = select.select(
+                    [proc.stderr],
+                    [proc.stdin],
+                    [proc.stderr,proc.stdin])
+
+                if proc.stderr in rdrdy:
+                    # use os.read for fully unbuffered behavior
+                    err.append(os.read(proc.stderr.fileno(), 4096))
+
+                if proc.stdin in wrdy:
+                    proc.stdin.write(buf)
+                    buf = None
+
+                if broken:
+                    break
+
proc.stdin.close() + err.append(proc.stderr.read()) + + eintr_retry(proc.wait)() + return ((None,''.join(err)), proc) + elif hasattr(dest, 'write'): + # file-like (but not file) dest + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + stdin = open('/dev/null','w')) + + buf = None + err = [] + while True: + rdrdy, wrdy, broken = select.select( + [proc.stderr, proc.stdout], + [], + [proc.stderr, proc.stdout]) + + if proc.stderr in rdrdy: + # use os.read for fully unbuffered behavior + err.append(os.read(proc.stderr.fileno(), 4096)) + + if proc.stdout in rdrdy: + # use os.read for fully unbuffered behavior + buf = os.read(proc.stdout.fileno(), 4096) + dest.write(buf) + + if not buf: + #EOF + break + + if broken: + break + err.append(proc.stderr.read()) + + eintr_retry(proc.wait)() + return ((None,''.join(err)), proc) + else: + raise AssertionError, "Unreachable code reached! :-Q" + else: + # plain scp + args = ['scp', '-q', '-p', '-C', + # Don't bother with localhost. 
Makes test easier + '-o', 'NoHostAuthenticationForLocalhost=yes', + # XXX: Possible security issue + # Avoid interactive requests to accept new host keys + '-o', 'StrictHostKeyChecking=no', + '-o', 'ConnectTimeout=30', + '-o', 'ConnectionAttempts=3', + '-o', 'ServerAliveInterval=30', + '-o', 'TCPKeepAlive=yes' ] + + if port: + args.append('-P%d' % port) + if recursive: + args.append('-r') + if identity_file: + args.extend(('-i', identity_file)) + + if isinstance(source,list): + args.extend(source) + else: + if openssh_has_persist(): + connkey = make_connkey(user,host,port) + args.extend([ + '-o', 'ControlMaster=no', + '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, )]) + args.append(source) + args.append("%s@%s:%s" %(user, host, dest)) + + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) + + comm = proc.communicate() + eintr_retry(proc.wait)() + return (comm, proc) + +def rspawn(command, pidfile, + stdout = '/dev/null', + stderr = STDOUT, + stdin = '/dev/null', + home = None, + create_home = False, + host = None, + port = None, + user = None, + agent = None, + sudo = False, + identity_file = None, + tty = False): + """ + Spawn a remote command such that it will continue working asynchronously. + + Parameters: + command: the command to run - it should be a single line. + + pidfile: path of a (ideally unique to this task) pidfile for tracking the process. + + stdout: path of a file to redirect standard output to - must be a string. + Defaults to /dev/null + stderr: path of a file to redirect standard error to - string or the special STDOUT value + to redirect to the same file stdout was redirected to. Defaults to STDOUT. 
+    stdin: path of a file with input to be piped into the command's standard input
+
+    home: path of a folder to use as working directory - should exist, unless you specify create_home
+
+    create_home: if True, the home folder will be created first with mkdir -p
+
+    sudo: whether the command needs to be executed as root
+
+    host/port/user/agent/identity_file: see rexec
+
+    Returns:
+        (stdout, stderr), process
+
+        Of the spawning process, which only captures errors at spawning time.
+        Usually only useful for diagnostics.
+    """
+    # Start process in a "daemonized" way, using nohup and heavy
+    # stdin/out redirection to avoid connection issues
+    if stderr is STDOUT:
+        stderr = '&1'
+    else:
+        stderr = ' ' + stderr
+
+    daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+        'command' : command,
+        'pidfile' : shell_escape(pidfile),
+
+        'stdout' : stdout,
+        'stderr' : stderr,
+        'stdin' : stdin,
+    }
+
+    cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
+        'command' : shell_escape(daemon_command),
+
+        'sudo' : 'sudo -S' if sudo else '',
+
+        'pidfile' : shell_escape(pidfile),
+        'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
+        'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home else '',
+    }
+
+    (out,err), proc = rexec(
+        cmd,
+        host = host,
+        port = port,
+        user = user,
+        agent = agent,
+        identity_file = identity_file,
+        tty = tty
+        )
+
+    if proc.wait():
+        raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+    return (out,err),proc
+
+@eintr_retry
+def rcheck_pid(pidfile,
+        host = None,
+        port = None,
+        user = None,
+        agent = None,
+        identity_file = None):
+    """
+    Check the pidfile of a process spawned with remote_spawn.
+ + Parameters: + pidfile: the pidfile passed to remote_span + + host/port/user/agent/identity_file: see rexec + + Returns: + + A (pid, ppid) tuple useful for calling remote_status and remote_kill, + or None if the pidfile isn't valid yet (maybe the process is still starting). + """ + + (out,err),proc = rexec( + "cat %(pidfile)s" % { + 'pidfile' : pidfile, + }, + host = host, + port = port, + user = user, + agent = agent, + identity_file = identity_file + ) + + if proc.wait(): + return None + + if out: + try: + return map(int,out.strip().split(' ',1)) + except: + # Ignore, many ways to fail that don't matter that much + return None + +@eintr_retry +def rstatus(pid, ppid, + host = None, + port = None, + user = None, + agent = None, + identity_file = None): + """ + Check the status of a process spawned with remote_spawn. + + Parameters: + pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid + + host/port/user/agent/identity_file: see rexec + + Returns: + + One of NOT_STARTED, RUNNING, FINISHED + """ + + (out,err),proc = rexec( + "ps --pid %(pid)d -o pid | grep -c %(pid)d ; true" % { + 'ppid' : ppid, + 'pid' : pid, + }, + host = host, + port = port, + user = user, + agent = agent, + identity_file = identity_file + ) + + if proc.wait(): + return NOT_STARTED + + status = False + if out: + try: + status = bool(int(out.strip())) + except: + if out or err: + logging.warn("Error checking remote status:\n%s%s\n", out, err) + # Ignore, many ways to fail that don't matter that much + return NOT_STARTED + return RUNNING if status else FINISHED + + +@eintr_retry +def rkill(pid, ppid, + host = None, + port = None, + user = None, + agent = None, + sudo = False, + identity_file = None, + nowait = False): + """ + Kill a process spawned with remote_spawn. + + First tries a SIGTERM, and if the process does not end in 10 seconds, + it sends a SIGKILL. + + Parameters: + pid/ppid: pid and parent-pid of the spawned process. 
See remote_check_pid + + sudo: whether the command was run with sudo - careful killing like this. + + host/port/user/agent/identity_file: see rexec + + Returns: + + Nothing, should have killed the process + """ + + if sudo: + subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid } + else: + subkill = "" + cmd = """ +SUBKILL="%(subkill)s" ; +%(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true +%(sudo)s kill %(pid)d $SUBKILL || /bin/true +for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do + sleep 0.2 + if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then + break + else + %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true + %(sudo)s kill %(pid)d $SUBKILL || /bin/true + fi + sleep 1.8 +done +if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then + %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true + %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true +fi +""" + if nowait: + cmd = "( %s ) >/dev/null 2>/dev/null timelimit: + if curtime > bailtime: + break + elif curtime > killtime: + signum = signal.SIGKILL + else: + signum = signal.SIGTERM + # Lets kill it + os.kill(self.pid, signum) + select_timeout = 0.5 + else: + select_timeout = timelimit - curtime + 0.1 + else: + select_timeout = 1.0 + + if select_timeout > 1.0: + select_timeout = 1.0 + + try: + rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) + except select.error,e: + if e[0] != 4: + raise + else: + continue + + if not rlist and not wlist and not xlist and self.poll() is not None: + # timeout and process exited, say bye + break + + if self.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. 
POSIX defines PIPE_BUF >= 512 + bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512)) + input_offset += bytes_written + if input_offset >= len(input): + self.stdin.close() + write_set.remove(self.stdin) + + if self.stdout in rlist: + data = os.read(self.stdout.fileno(), 1024) + if data == "": + self.stdout.close() + read_set.remove(self.stdout) + stdout.append(data) + + if self.stderr in rlist: + data = os.read(self.stderr.fileno(), 1024) + if data == "": + self.stderr.close() + read_set.remove(self.stderr) + stderr.append(data) + + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if self.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = self._translate_newlines(stdout) + if stderr: + stderr = self._translate_newlines(stderr) + + if killed and err_on_timeout: + errcode = self.poll() + raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) + else: + if killed: + self.poll() + else: + self.wait() + return (stdout, stderr) + diff --git a/src/neco/util/timefuncs.py b/src/neco/util/timefuncs.py new file mode 100644 index 00000000..7bd3d56a --- /dev/null +++ b/src/neco/util/timefuncs.py @@ -0,0 +1,50 @@ +import datetime +import re + +_strf = "%Y%m%d%H%M%S%f" +_reabs = re.compile("^\d{20}$") +_rerel = re.compile("^(?P