From eb16e40f7b5b7e4aa71c9a1d5ebdf348bcbebb21 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Tue, 18 Jun 2013 22:47:21 -0700 Subject: [PATCH] Adding CCN RMs for Linux Backend --- src/nepi/resources/linux/application.py | 19 +- src/nepi/resources/linux/ccnd.py | 54 +---- src/nepi/resources/linux/ccnr.py | 260 ++++++++++++++++++++++++ src/nepi/resources/linux/node.py | 16 +- 4 files changed, 280 insertions(+), 69 deletions(-) create mode 100644 src/nepi/resources/linux/ccnr.py diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 13a8a9b9..727375d6 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -447,27 +447,26 @@ class LinuxApplication(ResourceManager): # check if execution errors occurred msg = " Failed to start command '%s' " % command - if proc.poll() and err: + if proc.poll(): self.error(msg, out, err) raise RuntimeError, msg - # Check status of process running in background + # Wait for pid file to be generated pid, ppid = self.node.wait_pid(self.app_home) if pid: self._pid = int(pid) if ppid: self._ppid = int(ppid) - + # If the process is not running, check for error information # on the remote machine if not self.pid or not self.ppid: - (out, err), proc = self.node.check_output(self.app_home, 'stderr') - self.error(msg, out, err) + (out, err), proc = self.check_errors(home, ecodefile, stderr) - msg2 = " Setting state to Failed" - self.debug(msg2) - self._state = ResourceState.FAILED + # Out is what was written in the stderr file + if err: + msg = " Failed to start command '%s' " % command + self.error(msg, out, err) + raise RuntimeError, msg - raise RuntimeError, msg - super(LinuxApplication, self).start() else: diff --git a/src/nepi/resources/linux/ccnd.py b/src/nepi/resources/linux/ccnd.py index 44b5f164..e1926d40 100644 --- a/src/nepi/resources/linux/ccnd.py +++ b/src/nepi/resources/linux/ccnd.py @@ -122,49 +122,6 @@ class LinuxCCND(LinuxApplication): def __init__(self, ec, guid): super(LinuxCCND, self).__init__(ec, guid) - def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): - self.info("Retrieving '%s' trace %s " % (name, attr)) - - path = os.path.join(self.app_home, name) - - command = "(test -f %s && echo 'success') || echo 'error'" % path - (out, err), proc = self.node.execute(command) - - if (err and proc.poll()) or out.find("error") != -1: - msg = " Couldn't find trace %s " % name - self.error(msg, out, err) - return None - - if attr == TraceAttr.PATH: - return path - - if attr == TraceAttr.ALL: - (out, err), proc = self.node.check_output(self.app_home, name) - - if err and proc.poll(): - msg = " Couldn't read trace %s " % name - self.error(msg, out, err) - return None - - return out - - if attr == TraceAttr.STREAM: - cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset) - elif attr == TraceAttr.SIZE: - cmd = "stat -c%%s %s " % path - - (out, err), proc = self.node.execute(cmd) - - if err and proc.poll(): - msg = " Couldn't find trace %s " % name - self.error(msg, out, err) - return None - - if attr == TraceAttr.SIZE: - out = int(out.strip()) - - return out - def deploy(self): if not self.get("command"): self.set("command", self._default_command) @@ -186,9 +143,6 @@ class LinuxCCND(LinuxApplication): super(LinuxCCND, self).deploy() - def start(self): - super(LinuxCCND, self).start() - def stop(self): command = self.get('command') or '' state = self.state @@ -318,11 +272,9 @@ class LinuxCCND(LinuxApplication): "prefix" : "CCND_PREFIX", }) - env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" - for key in envs.keys(): - val = self.get(key) - if val: - env += " %s=%s" % (key, val) + env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " + env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \ + if self.get(k) else "", envs.keys())) return env diff --git a/src/nepi/resources/linux/ccnr.py b/src/nepi/resources/linux/ccnr.py new file mode 100644 index 00000000..c1df28b5 --- /dev/null +++ b/src/nepi/resources/linux/ccnr.py @@ -0,0 +1,260 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.trace import Trace, TraceAttr +from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ + ResourceAction +from nepi.resources.linux.application import LinuxApplication +from nepi.resources.linux.ccnd import LinuxCCND +from nepi.resources.linux.node import OSType + +from nepi.util.sshfuncs import ProcStatus +from nepi.util.timefuncs import strfnow, strfdiff +import os + +reschedule_delay = "0.5s" + +@clsinit_copy +class LinuxCCNR(LinuxApplication): + _rtype = "LinuxCCNR" + + @classmethod + def _register_attributes(cls): + max_fanout = Attribute("maxFanout", + "Sets the CCNR_BTREE_MAX_FANOUT environmental variable. ", + flags = Flags.ExecReadOnly) + + max_leaf_entries = Attribute("maxLeafEntries", + "Sets the CCNR_BTREE_MAX_LEAF_ENTRIES environmental variable. ", + flags = Flags.ExecReadOnly) + + max_node_bytes = Attribute("maxNodeBytes", + "Sets the CCNR_BTREE_MAX_NODE_BYTES environmental variable. ", + flags = Flags.ExecReadOnly) + + max_node_pool = Attribute("maxNodePool", + "Sets the CCNR_BTREE_MAX_NODE_POOL environmental variable. ", + flags = Flags.ExecReadOnly) + + content_cache = Attribute("contentCache", + "Sets the CCNR_CONTENT_CACHE environmental variable. ", + flags = Flags.ExecReadOnly) + + debug = Attribute("debug", + "Sets the CCNR_DEBUG environmental variable. " + "Logging level for ccnr. Defaults to WARNING.", + type = Types.Enumerate, + allowed = [ + "NONE", + "SEVERE", + "ERROR", + "WARNING", + "INFO", + "FINE, FINER, FINEST"], + flags = Flags.ExecReadOnly) + + directory = Attribute("directory", + "Sets the CCNR_DIRECTORY environmental variable. ", + flags = Flags.ExecReadOnly) + + global_prefix = Attribute("globalPrefix", + "Sets the CCNR_GLOBAL_PREFIX environmental variable. ", + flags = Flags.ExecReadOnly) + + listen_on = Attribute("listenOn", + "Sets the CCNR_LISTEN_ON environmental variable. ", + flags = Flags.ExecReadOnly) + + min_send_bufsize = Attribute("minSendBufsize", + "Sets the CCNR_MIN_SEND_BUFSIZE environmental variable. ", + flags = Flags.ExecReadOnly) + + proto = Attribute("proto", + "Sets the CCNR_PROTO environmental variable. ", + flags = Flags.ExecReadOnly) + + status_port = Attribute("statusPort", + "Sets the CCNR_STATUS_PORT environmental variable. ", + flags = Flags.ExecReadOnly) + + start_write_scope_limit = Attribute("startWriteScopeLimit", + "Sets the CCNR_START_WRITE_SCOPE_LIMIT environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_debug = Attribute("ccnsDebug", + "Sets the CCNS_DEBUG environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_enable = Attribute("ccnsEnable", + "Sets the CCNS_ENABLE environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_faux_error = Attribute("ccnsFauxError", + "Sets the CCNS_FAUX_ERROR environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_heartbeat_micros = Attribute("ccnsHeartBeatMicros", + "Sets the CCNS_HEART_BEAT_MICROS environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_max_compares_busy = Attribute("ccnsMaxComparesBusy", + "Sets the CCNS_MAX_COMPARES_BUSY environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_max_fetch_busy = Attribute("ccnsMaxFetchBusy", + "Sets the CCNS_MAX_FETCH_BUSY environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_node_fetch_lifetime = Attribute("ccnsNodeFetchLifetime", + "Sets the CCNS_NODE_FETCH_LIFETIME environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_note_err = Attribute("ccnsNoteErr", + "Sets the CCNS_NOTE_ERR environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_repo_store = Attribute("ccnsRepoStore", + "Sets the CCNS_REPO_STORE environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_root_advise_fresh = Attribute("ccnsRootAdviseFresh", + "Sets the CCNS_ROOT_ADVISE_FRESH environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_root_advise_lifetime = Attribute("ccnsRootAdviseLifetime", + "Sets the CCNS_ROOT_ADVISE_LIFETIME environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_stable_enabled = Attribute("ccnsStableEnabled", + "Sets the CCNS_STABLE_ENABLED environmental variable. ", + flags = Flags.ExecReadOnly) + + ccns_sync_scope = Attribute("ccnsSyncScope", + "Sets the CCNS_SYNC_SCOPE environmental variable. ", + flags = Flags.ExecReadOnly) + + cls._register_attribute(max_fanout) + cls._register_attribute(max_leaf_entries) + cls._register_attribute(max_node_bytes) + cls._register_attribute(max_node_pool) + cls._register_attribute(content_cache) + cls._register_attribute(debug) + cls._register_attribute(directory) + cls._register_attribute(global_prefix) + cls._register_attribute(listen_on) + cls._register_attribute(min_send_bufsize) + cls._register_attribute(proto) + cls._register_attribute(status_port) + cls._register_attribute(start_write_scope_limit) + cls._register_attribute(ccns_debug) + cls._register_attribute(ccns_enable) + cls._register_attribute(ccns_faux_error) + cls._register_attribute(ccns_heartbeat_micros) + cls._register_attribute(ccns_max_compares_busy) + cls._register_attribute(ccns_max_fetch_busy) + cls._register_attribute(ccns_node_fetch_lifetime) + cls._register_attribute(ccns_note_err) + cls._register_attribute(ccns_repo_store) + cls._register_attribute(ccns_root_advise_fresh) + cls._register_attribute(ccns_root_advise_lifetime) + cls._register_attribute(ccns_stable_enabled) + cls._register_attribute(ccns_sync_scope) + + @classmethod + def _register_traces(cls): + log = Trace("log", "CCND log output") + + cls._register_trace(log) + + def __init__(self, ec, guid): + super(LinuxCCNR, self).__init__(ec, guid) + + @property + def ccnd(self): + ccnd = self.get_connected(LinuxCCND.rtype()) + if ccnd: return ccnd[0] + return None + + def deploy(self): + if not self.get("command"): + self.set("command", self._default_command) + + if not self.get("env"): + self.set("env", self._default_environment) + + # Wait until associated ccnd is provisioned + ccnd = self.ccnd + + if not ccnd or ccnd.state < ResourceState.PROVISIONED: + self.ec.schedule(reschedule_delay, self.deploy) + else: + # Add a start after condition so CCNR will not start + # before CCND does + self.ec.register_condition(self.guid, ResourceAction.START, + ccnd.guid, ResourceState.STARTED) + + # Invoke the actual deployment + super(LinuxCCNR, self).deploy() + + @property + def _default_command(self): + return "ccnr" + + @property + def _default_environment(self): + envs = dict({ + "maxFanout": "CCNR_BTREE_MAX_FANOUT", + "maxLeafEntries": "CCNR_BTREE_MAX_LEAF_ENTRIES", + "maxNodeBytes": "CCNR_BTREE_MAX_NODE_BYTES", + "maxNodePool": "CCNR_BTREE_MAX_NODE_POOL", + "contentCache": "CCNR_CONTENT_CACHE", + "debug": "CCNR_DEBUG", + "directory": "CCNR_DIRECTORY", + "globalPrefix": "CCNR_GLOBAL_PREFIX", + "listenOn": "CCNR_LISTEN_ON", + "minSendBufsize": "CCNR_MIN_SEND_BUFSIZE", + "proto": "CCNR_PROTO", + "statusPort": "CCNR_STATUS_PORT", + "startWriteScopeLimit": "CCNR_START_WRITE_SCOPE_LIMIT", + "ccnsDebug": "CCNS_DEBUG", + "ccnsEnable": "CCNS_ENABLE", + "ccnsFauxError": "CCNS_FAUX_ERROR", + "ccnsHeartBeatMicros": "CCNS_HEART_BEAT_MICROS", + "ccnsMaxComparesBusy": "CCNS_MAX_COMPARES_BUSY", + "ccnsMaxFetchBusy": "CCNS_MAX_FETCH_BUSY", + "ccnsNodeFetchLifetime": "CCNS_NODE_FETCH_LIFETIME", + "ccnsNoteErr": "CCNS_NOTE_ERR", + "ccnsRepoStore": "CCNS_REPO_STORE", + "ccnsRootAdviseFresh": "CCNS_ROOT_ADVISE_FRESH", + "ccnsRootAdviseLifetime": "CCNS_ROOT_ADVISE_LIFETIME", + "ccnsStableEnabled": "CCNS_STABLE_ENABLED", + "ccnsSyncScope": "CCNS_SYNC_SCOPE", + }) + + env = "PATH=$PATH:${EXP_HOME}/ccnx/bin " + env += " ".join(map(lambda k: "%s=%s" % (envs.get(k), self.get(k)) \ + if self.get(k) else "", envs.keys())) + + return env + + def valid_connection(self, guid): + # TODO: Validate! + return True + diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 80212586..0d64557f 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -370,11 +370,12 @@ class LinuxNode(ResourceManager): tty = tty) # check no errors occurred - if proc.poll() and err: + if proc.poll(): msg = " Failed to run command '%s' " % command self.error(msg, out, err) if raise_on_error: raise RuntimeError, msg + # Wait for pid file to be generated pid, ppid = self.wait_pid( home = home, @@ -448,8 +449,8 @@ class LinuxNode(ResourceManager): """ sep = " " if inline else "\n" export = " " if inline else "export" - return sep.join(map(lambda e: "%s %s" % (export, e), env.split(" "))) \ - + sep if env else "" + return sep.join(map(lambda e: "%s %s" % (export, e), + env.strip().split(" "))) + sep if env else "" def check_errors(self, home, ecodefile = "exitcode", @@ -461,8 +462,10 @@ class LinuxNode(ResourceManager): exit code is an error one it returns the error output. """ - out = err = "" proc = None + err = "" + # retrive standard output from the file + (out, oerr), oproc = self.check_output(home, stdout) # get exit code saved in the 'exitcode' file ecode = self.exitcode(home, ecodefile) @@ -474,15 +477,12 @@ class LinuxNode(ResourceManager): # Check standard error. (err, eerr), proc = self.check_output(home, stderr) - # Alsow retrive standard output for information - (out, oerr), oproc = self.check_output(home, stdout) - # If the stderr file was not found, assume nothing bad happened, # and just ignore the error. # (cat returns 1 for error "No such file or directory") if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: err = "" - + return (out, err), proc def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False): -- 2.43.0