655bc62f621928584bf00264fac5e1f1eff10de7
[nepi.git] / src / nepi / resources / linux / ccnd.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
23 from nepi.resources.linux.application import LinuxApplication
24 from nepi.resources.linux.node import OSType
25
26 import os
27
28 @clsinit_copy
29 class LinuxCCND(LinuxApplication):
30     _rtype = "LinuxCCND"
31
32     @classmethod
33     def _register_attributes(cls):
34         debug = Attribute("debug", "Sets the CCND_DEBUG environmental variable. "
35             " Allowed values are : \n"
36             "  0 - no messages \n"
37             "  1 - basic messages (any non-zero value gets these) \n"
38             "  2 - interest messages \n"
39             "  4 - content messages \n"
40             "  8 - matching details \n"
41             "  16 - interest details \n"
42             "  32 - gory interest details \n"
43             "  64 - log occasional human-readable timestamps \n"
44             "  128 - face registration debugging \n"
45             "  -1 - max logging \n"
46             "  Or apply bitwise OR to these values to get combinations of them",
47             flags = Flags.ExecReadOnly)
48
49         port = Attribute("port", "Sets the CCN_LOCAL_PORT environmental variable. "
50             "Defaults to 9695 ", 
51             flags = Flags.ExecReadOnly)
52  
53         sockname = Attribute("sockname",
54             "Sets the CCN_LOCAL_SCOKNAME environmental variable. "
55             "Defaults to /tmp/.ccnd.sock", 
56             flags = Flags.ExecReadOnly)
57
58         capacity = Attribute("capacity",
59             "Sets the CCND_CAP environmental variable. "
60             "Capacity limit in terms of ContentObjects",
61             flags = Flags.ExecReadOnly)
62
63         mtu = Attribute("mtu", "Sets the CCND_MTU environmental variable. ",
64             flags = Flags.ExecReadOnly)
65   
66         data_pause = Attribute("dataPauseMicrosec",
67             "Sets the CCND_DATA_PAUSE_MICROSEC environmental variable. ",
68             flags = Flags.ExecReadOnly)
69
70         default_stale = Attribute("defaultTimeToStale",
71              "Sets the CCND_DEFAULT_TIME_TO_STALE environmental variable. ",
72             flags = Flags.ExecReadOnly)
73
74         max_stale = Attribute("maxTimeToStale",
75             "Sets the CCND_MAX_TIME_TO_STALE environmental variable. ",
76             flags = Flags.ExecReadOnly)
77
78         max_rte = Attribute("maxRteMicrosec",
79             "Sets the CCND_MAX_RTE_MICROSEC environmental variable. ",
80             flags = Flags.ExecReadOnly)
81
82         keystore = Attribute("keyStoreDirectory",
83             "Sets the CCND_KEYSTORE_DIRECTORY environmental variable. ",
84             flags = Flags.ExecReadOnly)
85
86         listen_on = Attribute("listenOn",
87             "Sets the CCND_LISTEN_ON environmental variable. ",
88             flags = Flags.ExecReadOnly)
89
90         autoreg = Attribute("autoreg",
91             "Sets the CCND_AUTOREG environmental variable. ",
92             flags = Flags.ExecReadOnly)
93
94         prefix = Attribute("prefix",
95             "Sets the CCND_PREFIX environmental variable. ",
96             flags = Flags.ExecReadOnly)
97
98         cls._register_attribute(debug)
99         cls._register_attribute(port)
100         cls._register_attribute(sockname)
101         cls._register_attribute(capacity)
102         cls._register_attribute(mtu)
103         cls._register_attribute(data_pause)
104         cls._register_attribute(default_stale)
105         cls._register_attribute(max_stale)
106         cls._register_attribute(max_rte)
107         cls._register_attribute(keystore)
108         cls._register_attribute(listen_on)
109         cls._register_attribute(autoreg)
110         cls._register_attribute(prefix)
111
112     @classmethod
113     def _register_traces(cls):
114         log = Trace("log", "CCND log output")
115         status = Trace("status", "ccndstatus output")
116
117         cls._register_trace(log)
118         cls._register_trace(status)
119
120     def __init__(self, ec, guid):
121         super(LinuxCCND, self).__init__(ec, guid)
122
123     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
124         self.info("Retrieving '%s' trace %s " % (name, attr))
125
126         path = os.path.join(self.app_home, name)
127         
128         command = "(test -f %s && echo 'success') || echo 'error'" % path
129         (out, err), proc = self.node.execute(command)
130
131         if (err and proc.poll()) or out.find("error") != -1:
132             msg = " Couldn't find trace %s " % name
133             self.error(msg, out, err)
134             return None
135     
136         if attr == TraceAttr.PATH:
137             return path
138
139         if attr == TraceAttr.ALL:
140             (out, err), proc = self.node.check_output(self.app_home, name)
141             
142             if err and proc.poll():
143                 msg = " Couldn't read trace %s " % name
144                 self.error(msg, out, err)
145                 return None
146
147             return out
148
149         if attr == TraceAttr.STREAM:
150             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
151         elif attr == TraceAttr.SIZE:
152             cmd = "stat -c%%s %s " % path
153
154         (out, err), proc = self.node.execute(cmd)
155
156         if err and proc.poll():
157             msg = " Couldn't find trace %s " % name
158             self.error(msg, out, err)
159             return None
160         
161         if attr == TraceAttr.SIZE:
162             out = int(out.strip())
163
164         return out
165             
166     def deploy(self):
167         if not self.get("command"):
168             self.set("command", self._default_command)
169         
170         if not self.get("depends"):
171             self.set("depends", self._default_dependencies)
172
173         if not self.get("sources"):
174             self.set("sources", self._default_sources)
175
176         if not self.get("build"):
177             self.set("build", self._default_build)
178
179         if not self.get("install"):
180             self.set("install", self._default_install)
181
182         if not self.get("env"):
183             self.set("env", self._default_environment)
184
185         super(LinuxCCND, self).deploy()
186
187     def start(self):
188         command = self.get("command")
189         env = self.get("env")
190         stdin = "stdin" if self.get("stdin") else None
191         stdout = "stdout" if self.get("stdout") else "stdout"
192         stderr = "stderr" if self.get("stderr") else "stderr"
193         sudo = self.get("sudo") or False
194         x11 = self.get("forwardX11") or False
195         failed = False
196
197         if not command:
198             # If no command was given, then the application 
199             # is directly marked as FINISHED
200             self._state = ResourceState.FINISHED
201     
202         self.info("Starting command '%s'" % command)
203
204         if x11:
205             # If X11 forwarding was specified, then the application
206             # can not run detached, so instead of invoking asynchronous
207             # 'run' we invoke synchronous 'execute'.
208             if not command:
209                 msg = "No command is defined but X11 forwarding has been set"
210                 self.error(msg)
211                 self._state = ResourceState.FAILED
212                 raise RuntimeError, msg
213
214             # Export environment
215             environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
216                 if env else ""
217
218             command = environ + command
219             command = self.replace_paths(command)
220
221             # Mark application as started before executing the command
222             # since after the thread will be blocked by the execution
223             # until it finished
224             super(LinuxApplication, self).start()
225             
226             # If the command requires X11 forwarding, we
227             # can't run it asynchronously
228             (out, err), proc = self.node.execute(command,
229                     sudo = sudo,
230                     stdin = stdin,
231                     forward_x11 = x11)
232
233             self._state = ResourceState.FINISHED
234
235             if proc.poll() and err:
236                 failed = True
237         else:
238             # Command was  previously uploaded, now run the remote
239             # bash file asynchronously
240             cmd = "bash ./app.sh"
241             (out, err), proc = self.node.run(cmd, self.app_home, 
242                 stdin = stdin, 
243                 stdout = stdout,
244                 stderr = stderr,
245                 sudo = sudo)
246
247             # check if execution errors occurred
248             msg = " Failed to start command '%s' " % command
249             
250             if proc.poll() and err:
251                 self.error(msg, out, err)
252                 raise RuntimeError, msg
253         
254             # Check status of process running in background
255             pid, ppid = self.node.wait_pid(self.app_home)
256             if pid: self._pid = int(pid)
257             if ppid: self._ppid = int(ppid)
258
259             # If the process is not running, check for error information
260             # on the remote machine
261             if not self.pid or not self.ppid:
262                 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
263                 self.error(msg, out, err)
264
265                 msg2 = " Setting state to Failed"
266                 self.debug(msg2)
267                 self._state = ResourceState.FAILED
268
269                 raise RuntimeError, msg
270             
271             super(LinuxApplication, self).start()
272
273     def stop(self):
274         command = self.get('command') or ''
275         state = self.state
276         
277         if state == ResourceState.STARTED:
278             self.info("Stopping command '%s'" % command)
279
280             command = "ccndstop"
281             env = self.get("env") 
282
283             # replace application specific paths in the command
284             command = self.replace_paths(command)
285             env = env and self.replace_paths(env)
286
287             # Upload the command to a file, and execute asynchronously
288             self.node.run_and_wait(command, self.app_home,
289                         shfile = "ccndstop.sh",
290                         env = env,
291                         pidfile = "ccndstop_pidfile", 
292                         ecodefile = "ccndstop_exitcode", 
293                         stdout = "ccndstop_stdout", 
294                         stderr = "ccndstop_stderr")
295
296
297             super(LinuxCCND, self).stop()
298
299     @property
300     def state(self):
301         if self._state == ResourceState.STARTED:
302             # To avoid overwhelming the remote hosts and the local processor
303             # with too many ssh queries, the state is only requested
304             # every 'state_check_delay' seconds.
305             state_check_delay = 0.5
306             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
307                 # check if execution errors occurred
308                 (out, err), proc = self.node.check_errors(self.app_home)
309
310                 if out or err:
311                     if err.find("No such file or directory") >= 0 :
312                         # The resource is marked as started, but the
313                         # command was not yet executed
314                         return ResourceState.READY
315
316                     msg = " Failed to execute command '%s'" % self.get("command")
317                     self.error(msg, out, err)
318                     self._state = ResourceState.FAILED
319
320                 elif self.pid and self.ppid:
321                     status = self.node.status(self.pid, self.ppid)
322
323                     if status == ProcStatus.FINISHED:
324                         self._state = ResourceState.FINISHED
325
326
327                 self._last_state_check = strfnow()
328
329         return self._state
330
331     @property
332     def _default_command(self):
333         return "ccndstart"
334
335     @property
336     def _default_dependencies(self):
337         if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]:
338             return ( " autoconf openssl-devel  expat-devel libpcap-devel "
339                 " ecryptfs-utils-devel libxml2-devel automake gawk " 
340                 " gcc gcc-c++ git pcre-devel make ")
341         elif self.node.os in [ OSType.UBUNTU , OSType.DEBIAN]:
342             return ( " autoconf libssl-dev libexpat-dev libpcap-dev "
343                 " libecryptfs0 libxml2-utils automake gawk gcc g++ "
344                 " git-core pkg-config libpcre3-dev make ")
345         return ""
346
347     @property
348     def _default_sources(self):
349         return "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
350
351     @property
352     def _default_build(self):
353         sources = self.get("sources").split(" ")[0]
354         sources = os.path.basename(sources)
355
356         return (
357             # Evaluate if ccnx binaries are already installed
358             " ( "
359                 "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
360             " ) || ( "
361             # If not, untar and build
362                 " ( "
363                     " mkdir -p ${SOURCES}/ccnx && "
364                     " tar xf ${SOURCES}/%(sources)s --strip-components=1 -C ${SOURCES}/ccnx "
365                  " ) && "
366                     "cd ${SOURCES}/ccnx && "
367                     # Just execute and silence warnings...
368                     " ( ./configure && make ) "
369              " )") % ({ 'sources': sources })
370
371     @property
372     def _default_install(self):
373         return (
374             # Evaluate if ccnx binaries are already installed
375             " ( "
376                 "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
377             " ) || ( "
378                 "  mkdir -p ${EXP_HOME}/ccnx/bin && "
379                 "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
380             " )"
381             )
382
383     @property
384     def _default_environment(self):
385         envs = dict({
386             "debug": "CCND_DEBUG",
387             "port": "CCN_LOCAL_PORT",
388             "sockname" : "CCN_LOCAL_SOCKNAME",
389             "capacity" : "CCND_CAP",
390             "mtu" : "CCND_MTU",
391             "dataPauseMicrosec" : "CCND_DATA_PAUSE_MICROSEC",
392             "defaultTimeToStale" : "CCND_DEFAULT_TIME_TO_STALE",
393             "maxTimeToStale" : "CCND_MAX_TIME_TO_STALE",
394             "maxRteMicrosec" : "CCND_MAX_RTE_MICROSEC",
395             "keyStoreDirectory" : "CCND_KEYSTORE_DIRECTORY",
396             "listenOn" : "CCND_LISTEN_ON",
397             "autoreg" : "CCND_AUTOREG",
398             "prefix" : "CCND_PREFIX",
399             })
400
401         env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
402         for key in envs.keys():
403             val = self.get(key)
404             if val:
405                 env += " %s=%s" % (key, val)
406         
407         return env            
408         
409     def valid_connection(self, guid):
410         # TODO: Validate!
411         return True
412