Async setup of TUNs and APPs, for much quicker deployment.
[nepi.git] / src / nepi / testbeds / planetlab / application.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
import cStringIO
import operator
import os
import os.path
import subprocess
import threading

import nepi.util.server as server
from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \
        STATUS_FINISHED

from constants import TESTBED_ID
import plcapi
import rspawn

16
17 class Application(object):
18     def __init__(self, api=None):
19         if not api:
20             api = plcapi.PLCAPI()
21         self._api = api
22         
23         # Attributes
24         self.command = None
25         self.sudo = False
26         
27         self.build = None
28         self.install = None
29         self.depends = None
30         self.buildDepends = None
31         self.sources = None
32         
33         self.stdin = None
34         self.stdout = None
35         self.stderr = None
36         self.buildlog = None
37         
38         # Those are filled when the app is configured
39         self.home_path = None
40         self.ident_path = None
41         self.slicename = None
42         
43         # Those are filled when an actual node is connected
44         self.node = None
45         
46         # Those are filled when the app is started
47         #   Having both pid and ppid makes it harder
48         #   for pid rollover to induce tracking mistakes
49         self._started = False
50         self._setup = False
51         self._setuper = None
52         self._pid = None
53         self._ppid = None
54     
55     def __str__(self):
56         return "%s<command:%s%s>" % (
57             self.__class__.__name__,
58             "sudo " if self.sudo else "",
59             self.command,
60         )
61     
62     def validate(self):
63         if self.home_path is None:
64             raise AssertionError, "Misconfigured application: missing home path"
65         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
66             raise AssertionError, "Misconfigured application: missing slice SSH key"
67         if self.node is None:
68             raise AssertionError, "Misconfigured application: unconnected node"
69         if self.node.hostname is None:
70             raise AssertionError, "Misconfigured application: misconfigured node"
71         if self.slicename is None:
72             raise AssertionError, "Misconfigured application: unspecified slice"
73
74     def start(self):
75         # Create shell script with the command
76         # This way, complex commands and scripts can be ran seamlessly
77         # sync files
78         (out,err),proc = server.popen_scp(
79             cStringIO.StringIO(self.command),
80             '%s@%s:%s' % (self.slicename, self.node.hostname, 
81                 os.path.join(self.home_path, "app.sh")),
82             port = None,
83             agent = None,
84             ident_key = self.ident_path,
85             server_key = self.node.server_key
86             )
87         
88         if proc.wait():
89             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
90         
91         # Start process in a "daemonized" way, using nohup and heavy
92         # stdin/out redirection to avoid connection issues
93         (out,err),proc = rspawn.remote_spawn(
94             self._replace_paths("bash ./app.sh"),
95             
96             pidfile = './pid',
97             home = self.home_path,
98             stdin = 'stdin' if self.stdin is not None else '/dev/null',
99             stdout = 'stdout' if self.stdout else '/dev/null',
100             stderr = 'stderr' if self.stderr else '/dev/null',
101             sudo = self.sudo,
102             
103             host = self.node.hostname,
104             port = None,
105             user = self.slicename,
106             agent = None,
107             ident_key = self.ident_path,
108             server_key = self.node.server_key
109             )
110         
111         if proc.wait():
112             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
113
114         self._started = True
115
116     def checkpid(self):            
117         # Get PID/PPID
118         # NOTE: wait a bit for the pidfile to be created
119         if self._started and not self._pid or not self._ppid:
120             pidtuple = rspawn.remote_check_pid(
121                 os.path.join(self.home_path,'pid'),
122                 host = self.node.hostname,
123                 port = None,
124                 user = self.slicename,
125                 agent = None,
126                 ident_key = self.ident_path,
127                 server_key = self.node.server_key
128                 )
129             
130             if pidtuple:
131                 self._pid, self._ppid = pidtuple
132     
133     def status(self):
134         self.checkpid()
135         if not self._started:
136             return STATUS_NOT_STARTED
137         elif not self._pid or not self._ppid:
138             return STATUS_NOT_STARTED
139         else:
140             status = rspawn.remote_status(
141                 self._pid, self._ppid,
142                 host = self.node.hostname,
143                 port = None,
144                 user = self.slicename,
145                 agent = None,
146                 ident_key = self.ident_path
147                 )
148             
149             if status is rspawn.NOT_STARTED:
150                 return STATUS_NOT_STARTED
151             elif status is rspawn.RUNNING:
152                 return STATUS_RUNNING
153             elif status is rspawn.FINISHED:
154                 return STATUS_FINISHED
155             else:
156                 # WTF?
157                 return STATUS_NOT_STARTED
158     
159     def kill(self):
160         status = self.status()
161         if status == STATUS_RUNNING:
162             # kill by ppid+pid - SIGTERM first, then try SIGKILL
163             rspawn.remote_kill(
164                 self._pid, self._ppid,
165                 host = self.node.hostname,
166                 port = None,
167                 user = self.slicename,
168                 agent = None,
169                 ident_key = self.ident_path,
170                 server_key = self.node.server_key
171                 )
172     
173     def remote_trace_path(self, whichtrace):
174         if whichtrace in ('stdout','stderr'):
175             tracefile = os.path.join(self.home_path, whichtrace)
176         else:
177             tracefile = None
178         
179         return tracefile
180     
181     def sync_trace(self, local_dir, whichtrace):
182         tracefile = self.remote_trace_path(whichtrace)
183         if not tracefile:
184             return None
185         
186         local_path = os.path.join(local_dir, tracefile)
187         
188         # create parent local folders
189         proc = subprocess.Popen(
190             ["mkdir", "-p", os.path.dirname(local_path)],
191             stdout = open("/dev/null","w"),
192             stdin = open("/dev/null","r"))
193
194         if proc.wait():
195             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
196         
197         # sync files
198         (out,err),proc = server.popen_scp(
199             '%s@%s:%s' % (self.slicename, self.node.hostname, 
200                 tracefile),
201             local_path,
202             port = None,
203             agent = None,
204             ident_key = self.ident_path,
205             server_key = self.node.server_key
206             )
207         
208         if proc.wait():
209             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
210         
211         return local_path
212     
213
214     def setup(self):
215         self._make_home()
216         self._build()
217         self._setup = True
218     
219     def async_setup(self):
220         if not self._setuper:
221             self._setuper = threading.Thread(
222                 target = self.setup)
223             self._setuper.start()
224     
225     def async_setup_wait(self):
226         if not self._setup:
227             if self._setuper:
228                 self._setuper.join()
229                 if not self._setup:
230                     raise RuntimeError, "Failed to setup application"
231             else:
232                 self.setup()
233         
234     def _make_home(self):
235         # Make sure all the paths are created where 
236         # they have to be created for deployment
237         (out,err),proc = server.popen_ssh_command(
238             "mkdir -p %s" % (server.shell_escape(self.home_path),),
239             host = self.node.hostname,
240             port = None,
241             user = self.slicename,
242             agent = None,
243             ident_key = self.ident_path,
244             server_key = self.node.server_key
245             )
246         
247         if proc.wait():
248             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
249         
250         
251         if self.stdin:
252             # Write program input
253             (out,err),proc = server.popen_scp(
254                 cStringIO.StringIO(self.stdin),
255                 '%s@%s:%s' % (self.slicename, self.node.hostname, 
256                     os.path.join(self.home_path, 'stdin') ),
257                 port = None,
258                 agent = None,
259                 ident_key = self.ident_path,
260                 server_key = self.node.server_key
261                 )
262             
263             if proc.wait():
264                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
265
266     def _replace_paths(self, command):
267         """
268         Replace all special path tags with shell-escaped actual paths.
269         """
270         # need to append ${HOME} if paths aren't absolute, to MAKE them absolute.
271         root = '' if self.home_path.startswith('/') else "${HOME}/"
272         return ( command
273             .replace("${SOURCES}", root+server.shell_escape(self.home_path))
274             .replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) )
275
276     def _build(self):
277         if self.sources:
278             sources = self.sources.split(' ')
279             
280             # Copy all sources
281             (out,err),proc = server.popen_scp(
282                 sources,
283                 "%s@%s:%s" % (self.slicename, self.node.hostname, 
284                     os.path.join(self.home_path,'.'),),
285                 ident_key = self.ident_path,
286                 server_key = self.node.server_key
287                 )
288         
289             if proc.wait():
290                 raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,)
291             
292         if self.buildDepends:
293             # Install build dependencies
294             (out,err),proc = server.popen_ssh_command(
295                 "sudo -S yum -y install %(packages)s" % {
296                     'packages' : self.buildDepends
297                 },
298                 host = self.node.hostname,
299                 port = None,
300                 user = self.slicename,
301                 agent = None,
302                 ident_key = self.ident_path,
303                 server_key = self.node.server_key
304                 )
305         
306             if proc.wait():
307                 raise RuntimeError, "Failed instal build dependencies: %s %s" % (out,err,)
308         
309             
310         if self.build:
311             # Build sources
312             (out,err),proc = server.popen_ssh_command(
313                 "cd %(home)s && mkdir -p build && cd build && %(command)s" % {
314                     'command' : self._replace_paths(self.build),
315                     'home' : server.shell_escape(self.home_path),
316                 },
317                 host = self.node.hostname,
318                 port = None,
319                 user = self.slicename,
320                 agent = None,
321                 ident_key = self.ident_path,
322                 server_key = self.node.server_key
323                 )
324         
325             if proc.wait():
326                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
327
328             # Make archive
329             (out,err),proc = server.popen_ssh_command(
330                 "cd %(home)s && tar czf build.tar.gz build" % {
331                     'command' : self._replace_paths(self.build),
332                     'home' : server.shell_escape(self.home_path),
333                 },
334                 host = self.node.hostname,
335                 port = None,
336                 user = self.slicename,
337                 agent = None,
338                 ident_key = self.ident_path,
339                 server_key = self.node.server_key
340                 )
341         
342             if proc.wait():
343                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
344
345         if self.install:
346             # Install application
347             (out,err),proc = server.popen_ssh_command(
348                 "cd %(home)s && cd build && %(command)s" % {
349                     'command' : self._replace_paths(self.install),
350                     'home' : server.shell_escape(self.home_path),
351                 },
352                 host = self.node.hostname,
353                 port = None,
354                 user = self.slicename,
355                 agent = None,
356                 ident_key = self.ident_path,
357                 server_key = self.node.server_key
358                 )
359         
360             if proc.wait():
361                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
362