2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
23 from nepi.resources.linux.application import LinuxApplication
24 from nepi.resources.linux.node import OSType
29 class LinuxCCND(LinuxApplication):
def _register_attributes(cls):
    """Register the CCND configuration attributes on the resource class.

    Every attribute is created with ``Flags.ExecReadOnly``; its help
    text names the ccnd environment variable the attribute refers to.
    Attributes are registered in the order they are listed below.
    """
    # (attribute name, help text) pairs, in registration order.
    descriptions = [
        ("debug",
            "Sets the CCND_DEBUG environmental variable. "
            " Allowed values are : \n"
            " 1 - basic messages (any non-zero value gets these) \n"
            " 2 - interest messages \n"
            " 4 - content messages \n"
            " 8 - matching details \n"
            " 16 - interest details \n"
            " 32 - gory interest details \n"
            " 64 - log occasional human-readable timestamps \n"
            " 128 - face registration debugging \n"
            " -1 - max logging \n"
            " Or apply bitwise OR to these values to get combinations of them"),
        ("port",
            "Sets the CCN_LOCAL_PORT environmental variable. "),
        # The help text used to misspell the variable name as
        # CCN_LOCAL_SCOKNAME; the variable is CCN_LOCAL_SOCKNAME.
        ("sockname",
            "Sets the CCN_LOCAL_SOCKNAME environmental variable. "
            "Defaults to /tmp/.ccnd.sock"),
        ("capacity",
            "Sets the CCND_CAP environmental variable. "
            "Capacity limit in terms of ContentObjects"),
        ("mtu",
            "Sets the CCND_MTU environmental variable. "),
        ("dataPauseMicrosec",
            "Sets the CCND_DATA_PAUSE_MICROSEC environmental variable. "),
        ("defaultTimeToStale",
            "Sets the CCND_DEFAULT_TIME_TO_STALE environmental variable. "),
        ("maxTimeToStale",
            "Sets the CCND_MAX_TIME_TO_STALE environmental variable. "),
        ("maxRteMicrosec",
            "Sets the CCND_MAX_RTE_MICROSEC environmental variable. "),
        ("keyStoreDirectory",
            "Sets the CCND_KEYSTORE_DIRECTORY environmental variable. "),
        ("listenOn",
            "Sets the CCND_LISTEN_ON environmental variable. "),
        ("autoreg",
            "Sets the CCND_AUTOREG environmental variable. "),
        ("prefix",
            "Sets the CCND_PREFIX environmental variable. "),
    ]

    # Build every Attribute before registering any of them, so the
    # create-then-register sequence is kept.
    attributes = [
        Attribute(name, help_text, flags = Flags.ExecReadOnly)
        for name, help_text in descriptions
    ]

    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Register the two traces exposed by this resource: 'log' and 'status'."""
    # Both Trace objects are built before the first registration call.
    ccnd_traces = (
        Trace("log", "CCND log output"),
        Trace("status", "ccndstatus output"),
    )

    for ccnd_trace in ccnd_traces:
        cls._register_trace(ccnd_trace)
def __init__(self, ec, guid):
    """Initialise the resource by delegating to the parent class initialiser.

    ``ec`` and ``guid`` are forwarded unchanged. Presumably they are the
    experiment controller and this resource's unique id -- TODO confirm
    against the parent class.
    """
    super(LinuxCCND, self).__init__(ec, guid)
123 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
124 self.info("Retrieving '%s' trace %s " % (name, attr))
126 path = os.path.join(self.app_home, name)
128 command = "(test -f %s && echo 'success') || echo 'error'" % path
129 (out, err), proc = self.node.execute(command)
131 if (err and proc.poll()) or out.find("error") != -1:
132 msg = " Couldn't find trace %s " % name
133 self.error(msg, out, err)
136 if attr == TraceAttr.PATH:
139 if attr == TraceAttr.ALL:
140 (out, err), proc = self.node.check_output(self.app_home, name)
142 if err and proc.poll():
143 msg = " Couldn't read trace %s " % name
144 self.error(msg, out, err)
149 if attr == TraceAttr.STREAM:
150 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
151 elif attr == TraceAttr.SIZE:
152 cmd = "stat -c%%s %s " % path
154 (out, err), proc = self.node.execute(cmd)
156 if err and proc.poll():
157 msg = " Couldn't find trace %s " % name
158 self.error(msg, out, err)
161 if attr == TraceAttr.SIZE:
162 out = int(out.strip())
167 if not self.get("command"):
168 self.set("command", self._default_command)
170 if not self.get("depends"):
171 self.set("depends", self._default_dependencies)
173 if not self.get("sources"):
174 self.set("sources", self._default_sources)
176 if not self.get("build"):
177 self.set("build", self._default_build)
179 if not self.get("install"):
180 self.set("install", self._default_install)
182 if not self.get("env"):
183 self.set("env", self._default_environment)
185 super(LinuxCCND, self).deploy()
188 command = self.get("command")
189 env = self.get("env")
190 stdin = "stdin" if self.get("stdin") else None
191 stdout = "stdout" if self.get("stdout") else "stdout"
192 stderr = "stderr" if self.get("stderr") else "stderr"
193 sudo = self.get("sudo") or False
194 x11 = self.get("forwardX11") or False
198 # If no command was given, then the application
199 # is directly marked as FINISHED
200 self._state = ResourceState.FINISHED
202 self.info("Starting command '%s'" % command)
205 # If X11 forwarding was specified, then the application
206 # can not run detached, so instead of invoking asynchronous
207 # 'run' we invoke synchronous 'execute'.
209 msg = "No command is defined but X11 forwarding has been set"
211 self._state = ResourceState.FAILED
212 raise RuntimeError, msg
215 environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
218 command = environ + command
219 command = self.replace_paths(command)
221 # Mark application as started before executing the command
222 # since after the thread will be blocked by the execution
224 super(LinuxApplication, self).start()
226 # If the command requires X11 forwarding, we
227 # can't run it asynchronously
228 (out, err), proc = self.node.execute(command,
233 self._state = ResourceState.FINISHED
235 if proc.poll() and err:
238 # Command was previously uploaded, now run the remote
239 # bash file asynchronously
240 cmd = "bash ./app.sh"
241 (out, err), proc = self.node.run(cmd, self.app_home,
247 # check if execution errors occurred
248 msg = " Failed to start command '%s' " % command
250 if proc.poll() and err:
251 self.error(msg, out, err)
252 raise RuntimeError, msg
254 # Check status of process running in background
255 pid, ppid = self.node.wait_pid(self.app_home)
256 if pid: self._pid = int(pid)
257 if ppid: self._ppid = int(ppid)
259 # If the process is not running, check for error information
260 # on the remote machine
261 if not self.pid or not self.ppid:
262 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
263 self.error(msg, out, err)
265 msg2 = " Setting state to Failed"
267 self._state = ResourceState.FAILED
269 raise RuntimeError, msg
271 super(LinuxApplication, self).start()
274 command = self.get('command') or ''
277 if state == ResourceState.STARTED:
278 self.info("Stopping command '%s'" % command)
281 env = self.get("env")
283 # replace application specific paths in the command
284 command = self.replace_paths(command)
285 env = env and self.replace_paths(env)
287 # Upload the command to a file, and execute asynchronously
288 self.node.run_and_wait(command, self.app_home,
289 shfile = "ccndstop.sh",
291 pidfile = "ccndstop_pidfile",
292 ecodefile = "ccndstop_exitcode",
293 stdout = "ccndstop_stdout",
294 stderr = "ccndstop_stderr")
297 super(LinuxCCND, self).stop()
301 if self._state == ResourceState.STARTED:
302 # To avoid overwhelming the remote hosts and the local processor
303 # with too many ssh queries, the state is only requested
304 # every 'state_check_delay' seconds.
305 state_check_delay = 0.5
306 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
307 # check if execution errors occurred
308 (out, err), proc = self.node.check_errors(self.app_home)
311 if err.find("No such file or directory") >= 0 :
312 # The resource is marked as started, but the
313 # command was not yet executed
314 return ResourceState.READY
316 msg = " Failed to execute command '%s'" % self.get("command")
317 self.error(msg, out, err)
318 self._state = ResourceState.FAILED
320 elif self.pid and self.ppid:
321 status = self.node.status(self.pid, self.ppid)
323 if status == ProcStatus.FINISHED:
324 self._state = ResourceState.FINISHED
327 self._last_state_check = strfnow()
332 def _default_command(self):
def _default_dependencies(self):
    """Return the package list for this node's OS.

    The value is a single space-separated string of package names. It
    is the fallback stored in the "depends" attribute when none was set.
    Only the Fedora 12/14 and Ubuntu/Debian OS types are handled by the
    branches below.
    """
    # RPM-based package names.
    if self.node.os in (OSType.FEDORA_12, OSType.FEDORA_14):
        fedora_packages = "".join([
            " autoconf openssl-devel expat-devel libpcap-devel ",
            " ecryptfs-utils-devel libxml2-devel automake gawk ",
            " gcc gcc-c++ git pcre-devel make ",
        ])
        return fedora_packages

    # Deb-based package names.
    if self.node.os in (OSType.UBUNTU, OSType.DEBIAN):
        debian_packages = "".join([
            " autoconf libssl-dev libexpat-dev libpcap-dev ",
            " libecryptfs0 libxml2-utils automake gawk gcc g++ ",
            " git-core pkg-config libpcre3-dev make ",
        ])
        return debian_packages
348 def _default_sources(self):
349 return "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
352 def _default_build(self):
353 sources = self.get("sources").split(" ")[0]
354 sources = os.path.basename(sources)
357 # Evaluate if ccnx binaries are already installed
359 " test -f ${EXP_HOME}/ccnx/bin/ccnd"
361 # If not, untar and build
363 " mkdir -p ${SOURCES}/ccnx && "
364 " tar xf ${SOURCES}/%(sources)s --strip-components=1 -C ${SOURCES}/ccnx "
366 "cd ${SOURCES}/ccnx && "
367 # Just execute and silence warnings...
368 " ( ./configure && make ) "
369 " )") % ({ 'sources': sources })
372 def _default_install(self):
374 # Evaluate if ccnx binaries are already installed
376 " test -f ${EXP_HOME}/ccnx/bin/ccnd"
378 " mkdir -p ${EXP_HOME}/ccnx/bin && "
379 " cp -r ${SOURCES}/ccnx ${EXP_HOME}"
384 def _default_environment(self):
386 "debug": "CCND_DEBUG",
387 "port": "CCN_LOCAL_PORT",
388 "sockname" : "CCN_LOCAL_SOCKNAME",
389 "capacity" : "CCND_CAP",
391 "dataPauseMicrosec" : "CCND_DATA_PAUSE_MICROSEC",
392 "defaultTimeToStale" : "CCND_DEFAULT_TIME_TO_STALE",
393 "maxTimeToStale" : "CCND_MAX_TIME_TO_STALE",
394 "maxRteMicrosec" : "CCND_MAX_RTE_MICROSEC",
395 "keyStoreDirectory" : "CCND_KEYSTORE_DIRECTORY",
396 "listenOn" : "CCND_LISTEN_ON",
397 "autoreg" : "CCND_AUTOREG",
398 "prefix" : "CCND_PREFIX",
401 env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
402 for key in envs.keys():
405 env += " %s=%s" % (key, val)
409 def valid_connection(self, guid):