minor changes to examples
[nepi.git] / src / nepi / testbeds / planetlab / application.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import os
8 import os.path
9 import nepi.util.server as server
10 import cStringIO
11 import subprocess
12 import rspawn
13
14 from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \
15         STATUS_FINISHED
16
class Dependency(object):
    """
    A Dependency is in every respect like an application.
    
    It depends on some packages, it may require building binaries, it must deploy
    them...
    
    But it has no command. Dependencies aren't ever started, or stopped, and have
    no status.
    
    The attributes ``depends``, ``buildDepends`` and ``sources`` are used in
    this class as space-separated strings.
    """

    # Must be a real tuple (note the trailing comma): with a bare string,
    # "whichtrace in TRACES" would be a substring test and accept e.g. 'build'.
    TRACES = ('buildlog',)

    def __init__(self, api=None):
        """
        Create an unconfigured dependency.
        
        api: PLC API object to use; when falsy a new plcapi.PLCAPI() is created.
        """
        if not api:
            api = plcapi.PLCAPI()
        self._api = api
        
        # Attributes
        self.command = None
        self.sudo = False
        
        self.build = None
        self.install = None
        self.depends = None
        self.buildDepends = None
        self.sources = None
        
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.buildlog = None
        
        self.add_to_path = True
        
        # Those are filled when the app is configured
        self.home_path = None
        
        # Those are filled when an actual node is connected
        self.node = None
        
        # Those are filled when the app is started
        #   Having both pid and ppid makes it harder
        #   for pid rollover to induce tracking mistakes
        self._started = False
        self._setup = False
        self._setuper = None
        self._pid = None
        self._ppid = None
    
    def __str__(self):
        """
        Return "ClassName<dep1 dep2 ... src1 src2 ...>".
        """
        words = []
        for value in (self.depends, self.sources):
            if not value:
                continue
            # These attributes are space-separated strings; calling list() on
            # them would yield single characters, so split them into words.
            if hasattr(value, 'split'):
                value = value.split()
            words.extend(value)
        return "%s<%s>" % (self.__class__.__name__, ' '.join(words))
    
    def validate(self):
        """
        Check that the dependency is configured well enough to be deployed.
        
        Raises AssertionError describing the first missing piece found.
        """
        if self.home_path is None:
            raise AssertionError("Misconfigured application: missing home path")
        # The node itself has to be checked before any of its attributes,
        # otherwise an unconnected node shows up as an AttributeError.
        if self.node is None:
            raise AssertionError("Misconfigured application: unconnected node")
        if self.node.ident_path is None or not os.access(self.node.ident_path, os.R_OK):
            raise AssertionError("Misconfigured application: missing slice SSH key")
        if self.node.hostname is None:
            raise AssertionError("Misconfigured application: misconfigured node")
        if self.node.slicename is None:
            raise AssertionError("Misconfigured application: unspecified slice")
    
    def remote_trace_path(self, whichtrace):
        """
        Return the path of the given trace under home_path, or None when
        whichtrace is not one of TRACES.
        """
        if whichtrace in self.TRACES:
            tracefile = os.path.join(self.home_path, whichtrace)
        else:
            tracefile = None
        
        return tracefile
    
    def sync_trace(self, local_dir, whichtrace):
        """
        Copy a trace file from the node into local_dir.
        
        Returns the local path of the copy, or None for an unknown trace.
        Raises RuntimeError when creating the local folders or the copy fails.
        """
        tracefile = self.remote_trace_path(whichtrace)
        if not tracefile:
            return None
        
        # NOTE(review): os.path.join discards local_dir when tracefile is an
        # absolute path (i.e. when home_path is absolute) - confirm intent.
        local_path = os.path.join(local_dir, tracefile)
        
        # create parent local folders
        # (the /dev/null handles are closed explicitly so they don't leak)
        null_out = open("/dev/null","w")
        try:
            null_in = open("/dev/null","r")
            try:
                proc = subprocess.Popen(
                    ["mkdir", "-p", os.path.dirname(local_path)],
                    stdout = null_out,
                    stdin = null_in)
                mkdir_failed = proc.wait()
            finally:
                null_in.close()
        finally:
            null_out.close()

        if mkdir_failed:
            raise RuntimeError("Failed to synchronize trace")
        
        # sync files
        (out,err),proc = server.popen_scp(
            '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
                tracefile),
            local_path,
            port = None,
            agent = None,
            ident_key = self.node.ident_path,
            server_key = self.node.server_key
            )
        
        if proc.wait():
            raise RuntimeError("Failed to synchronize trace: %s %s" % (out,err,))
        
        return local_path
    

    def setup(self):
        """
        Create the remote home and build/install everything, synchronously.
        """
        self._make_home()
        self._build()
        self._setup = True
    
    def async_setup(self):
        """
        Start setup() in a background thread, unless one was already started.
        """
        if not self._setuper:
            # Imported here because the module-level imports don't provide it
            import threading
            
            self._setuper = threading.Thread(
                target = self.setup)
            self._setuper.start()
    
    def async_setup_wait(self):
        """
        Make sure setup has completed.
        
        Joins the background thread when there is one (raising RuntimeError
        if it finished without completing the setup), otherwise runs setup()
        in the calling thread.
        """
        if not self._setup:
            if self._setuper:
                self._setuper.join()
                if not self._setup:
                    raise RuntimeError("Failed to setup application")
            else:
                self.setup()
    
    def _run_remote(self, command, failure):
        """
        Run a shell command on the node through ssh.
        
        command: the command line to execute remotely.
        failure: message prefix for the RuntimeError raised when the
            command exits with a non-zero status.
        """
        (out,err),proc = server.popen_ssh_command(
            command,
            host = self.node.hostname,
            port = None,
            user = self.node.slicename,
            agent = None,
            ident_key = self.node.ident_path,
            server_key = self.node.server_key
            )
        
        if proc.wait():
            raise RuntimeError("%s: %s %s" % (failure, out, err,))
        
    def _make_home(self):
        """
        Create home_path on the node and upload the program input, if any.
        """
        # Make sure all the paths are created where 
        # they have to be created for deployment
        self._run_remote(
            "mkdir -p %s" % (server.shell_escape(self.home_path),),
            "Failed to set up application")
        
        if self.stdin:
            # Write program input
            (out,err),proc = server.popen_scp(
                cStringIO.StringIO(self.stdin),
                '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
                    os.path.join(self.home_path, 'stdin') ),
                port = None,
                agent = None,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
            
            if proc.wait():
                raise RuntimeError("Failed to set up application: %s %s" % (out,err,))

    def _replace_paths(self, command):
        """
        Replace all special path tags with shell-escaped actual paths.
        """
        # need to append ${HOME} if paths aren't absolute, to MAKE them absolute.
        root = '' if self.home_path.startswith('/') else "${HOME}/"
        return ( command
            .replace("${SOURCES}", root+server.shell_escape(self.home_path))
            .replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) )

    def _build(self):
        """
        Upload sources, install build dependencies, build, archive the build
        folder and run the install command - each step only when configured.
        
        Raises RuntimeError as soon as one of the steps fails.
        """
        # NOTE(review): self.depends is not used by any of the steps below.
        home = server.shell_escape(self.home_path)
        
        if self.sources:
            sources = self.sources.split(' ')
            
            # Copy all sources
            (out,err),proc = server.popen_scp(
                sources,
                "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
                    os.path.join(self.home_path,'.'),),
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
        
            if proc.wait():
                raise RuntimeError("Failed upload source file %r: %s %s" % (sources, out,err,))
            
        if self.buildDepends:
            # Install build dependencies
            self._run_remote(
                "sudo -S yum -y install %(packages)s" % {
                    'packages' : self.buildDepends
                },
                "Failed instal build dependencies")
            
        if self.build:
            # Build sources
            self._run_remote(
                "cd %(home)s && mkdir -p build && cd build && %(command)s" % {
                    'command' : self._replace_paths(self.build),
                    'home' : home,
                },
                "Failed instal build sources")

            # Make archive
            self._run_remote(
                "cd %(home)s && tar czf build.tar.gz build" % {
                    'home' : home,
                },
                "Failed to archive build")

        if self.install:
            # Install application
            self._run_remote(
                "cd %(home)s && cd build && %(command)s" % {
                    'command' : self._replace_paths(self.install),
                    'home' : home,
                },
                "Failed to install application")
275
class Application(Dependency):
    """
    An application also has dependencies, but also a command to be ran and monitored.
    
    It adds the output of that command as traces.
    """
    
    TRACES = ('stdout','stderr','buildlog')
    
    def __init__(self, api=None):
        """
        Create an unconfigured application.
        
        api: forwarded unchanged to Dependency.__init__
        """
        super(Application,self).__init__(api)
        
        # Attributes
        self.command = None
        self.sudo = False
        
        self.stdin = None
        self.stdout = None
        self.stderr = None
        
        # Those are filled when the app is started
        #   Having both pid and ppid makes it harder
        #   for pid rollover to induce tracking mistakes
        self._started = False
        self._pid = None
        self._ppid = None

        # Do not add to the python path of nodes
        self.add_to_path = False
    
    def __str__(self):
        """
        Return "ClassName<command:[sudo ]command>".
        """
        return "%s<command:%s%s>" % (
            self.__class__.__name__,
            "sudo " if self.sudo else "",
            self.command,
        )
    
    def start(self):
        """
        Upload the command as an "app.sh" script and spawn it on the node.
        
        Raises RuntimeError when the upload or the spawn fails.
        """
        # Create shell script with the command
        # This way, complex commands and scripts can be ran seamlessly
        # sync files
        command = cStringIO.StringIO()
        command.write('export PYTHONPATH=$PYTHONPATH:%s\n' % (
            ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
        ))
        # NOTE(review): PATH is extended with the node's *pythonpath* entries,
        # exactly like PYTHONPATH above - confirm this is intended.
        command.write('export PATH=$PATH:%s\n' % (
            ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
        ))
        command.write(self.command)
        command.seek(0)
        
        (out,err),proc = server.popen_scp(
            command,
            '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
                os.path.join(self.home_path, "app.sh")),
            port = None,
            agent = None,
            ident_key = self.node.ident_path,
            server_key = self.node.server_key
            )
        
        if proc.wait():
            raise RuntimeError("Failed to set up application: %s %s" % (out,err,))
        
        # Start process in a "daemonized" way, using nohup and heavy
        # stdin/out redirection to avoid connection issues
        (out,err),proc = rspawn.remote_spawn(
            self._replace_paths("bash ./app.sh"),
            
            pidfile = './pid',
            home = self.home_path,
            stdin = 'stdin' if self.stdin is not None else '/dev/null',
            stdout = 'stdout' if self.stdout else '/dev/null',
            stderr = 'stderr' if self.stderr else '/dev/null',
            sudo = self.sudo,
            
            host = self.node.hostname,
            port = None,
            user = self.node.slicename,
            agent = None,
            ident_key = self.node.ident_path,
            server_key = self.node.server_key
            )
        
        if proc.wait():
            raise RuntimeError("Failed to set up application: %s %s" % (out,err,))

        self._started = True

    def checkpid(self):
        """
        Fetch the pid/ppid pair from the node's pidfile, if still unknown.
        
        Does nothing unless the application has been started.
        """
        # Get PID/PPID
        # NOTE: wait a bit for the pidfile to be created
        # The parentheses matter: "and" binds tighter than "or", so without
        # them the node would be queried even for a never-started application.
        if self._started and (not self._pid or not self._ppid):
            pidtuple = rspawn.remote_check_pid(
                os.path.join(self.home_path,'pid'),
                host = self.node.hostname,
                port = None,
                user = self.node.slicename,
                agent = None,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
            
            if pidtuple:
                self._pid, self._ppid = pidtuple
    
    def status(self):
        """
        Return STATUS_NOT_STARTED, STATUS_RUNNING or STATUS_FINISHED.
        
        An application whose pid/ppid could not be obtained yet is reported
        as not started, and so is any unrecognized remote status.
        """
        self.checkpid()
        if not self._started:
            return STATUS_NOT_STARTED
        elif not self._pid or not self._ppid:
            return STATUS_NOT_STARTED
        else:
            # NOTE(review): unlike the other remote calls in this class, no
            # server_key is passed here - confirm whether that is intended.
            status = rspawn.remote_status(
                self._pid, self._ppid,
                host = self.node.hostname,
                port = None,
                user = self.node.slicename,
                agent = None,
                ident_key = self.node.ident_path
                )
            
            if status is rspawn.NOT_STARTED:
                return STATUS_NOT_STARTED
            elif status is rspawn.RUNNING:
                return STATUS_RUNNING
            elif status is rspawn.FINISHED:
                return STATUS_FINISHED
            else:
                # Unknown remote status - treat it as not started
                return STATUS_NOT_STARTED
    
    def kill(self):
        """
        Kill the remote process, but only if it is currently running.
        """
        status = self.status()
        if status == STATUS_RUNNING:
            # kill by ppid+pid - SIGTERM first, then try SIGKILL
            rspawn.remote_kill(
                self._pid, self._ppid,
                host = self.node.hostname,
                port = None,
                user = self.node.slicename,
                agent = None,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
421     
class NepiDependency(Dependency):
    """
    A Dependency that deploys the locally installed nepi package itself.
    
    The package is packed into a temporary tarball, which is uploaded as the
    dependency's only source, moved into the build folder by the build command
    and unpacked into the parent of the build folder by the install command.
    
    Like any Dependency it has no command: it is never started, or stopped,
    and has no status.
    """
    
    # Class attribute holding a *weak* reference to the shared NEPI tar file
    # so that they may share it. Don't operate on the file itself, it would
    # be a mess, just use its path.
    _shared_nepi_tar = None
    
    def __init__(self, api = None):
        """
        Create the dependency, building the NEPI tarball if none is available.
        
        api: forwarded unchanged to Dependency.__init__
        """
        super(NepiDependency, self).__init__(api)
        
        self._tarball = None
        
        # NOTE(review): 'python-ipaddrn' looks like a typo of 'python-ipaddr'
        # - confirm the package name before relying on it.
        self.depends = 'python python-ipaddrn python-setuptools'
        
        # our sources are in our ad-hoc tarball
        self.sources = self.tarball.name
        
        tarname = os.path.basename(self.tarball.name)
        
        # it's already built - just move the tarball into place
        self.build = "mv ${SOURCES}/%s ." % (tarname,)
        
        # unpack it into sources, and we're done
        self.install = "tar xzf ${BUILD}/%s -C .." % (tarname,)
    
    @property
    def tarball(self):
        """
        The temporary file object holding the NEPI source tarball.
        
        Instances reuse the tarball published through the class-level weak
        reference while some instance still holds it; otherwise a new one is
        built with tar. Raises RuntimeError when tar fails.
        """
        if self._tarball is None:
            shared_tar = self._shared_nepi_tar and self._shared_nepi_tar()
            if shared_tar is not None:
                self._tarball = shared_tar
            else:
                # Build an ad-hoc tarball
                # Prebuilt
                import nepi
                import tempfile
                import weakref
                
                shared_tar = tempfile.NamedTemporaryFile(prefix='nepi-src-', suffix='.tar.gz')
                
                # the /dev/null handles are closed explicitly so they don't leak
                null_out = open("/dev/null","w")
                try:
                    null_in = open("/dev/null","r")
                    try:
                        proc = subprocess.Popen(
                            ["tar", "czf", shared_tar.name, 
                                '-C', os.path.join(os.path.dirname(os.path.dirname(nepi.__file__)),'.'), 
                                'nepi'],
                            stdout = null_out,
                            stdin = null_in)
                        tar_failed = proc.wait()
                    finally:
                        null_in.close()
                finally:
                    null_out.close()

                if tar_failed:
                    raise RuntimeError("Failed to create nepi tarball")
                
                # Keep the strong reference on the instance and publish only
                # a weak one on the *class*: assigning through self would
                # create an instance attribute that nobody else can see.
                self._tarball = shared_tar
                NepiDependency._shared_nepi_tar = weakref.ref(shared_tar)
                
        return self._tarball
483
484