Initially working version of PlanetLab testbed implementation.
[nepi.git] / src / nepi / testbeds / planetlab / application.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import os
8 import os.path
9 import nepi.util.server as server
10 import cStringIO
11 import subprocess
12
13 from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \
14         STATUS_FINISHED
15
16 class Application(object):
17     def __init__(self, api=None):
18         if not api:
19             api = plcapi.PLCAPI()
20         self._api = api
21         
22         # Attributes
23         self.command = None
24         self.sudo = False
25         
26         self.stdin = None
27         self.stdout = None
28         self.stderr = None
29         
30         # Those are filled when the app is configured
31         self.home_path = None
32         self.ident_path = None
33         self.slicename = None
34         
35         # Those are filled when an actual node is connected
36         self.node = None
37         
38         # Those are filled when the app is started
39         #   Having both pid and ppid makes it harder
40         #   for pid rollover to induce tracking mistakes
41         self._started = False
42         self._pid = None
43         self._ppid = None
44     
45     def __str__(self):
46         return "%s<command:%s%s>" % (
47             self.__class__.__name__,
48             "sudo " if self.sudo else "",
49             self.command,
50         )
51     
52     def validate(self):
53         if self.home_path is None:
54             raise AssertionError, "Misconfigured application: missing home path"
55         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
56             raise AssertionError, "Misconfigured application: missing slice SSH key"
57         if self.node is None:
58             raise AssertionError, "Misconfigured application: unconnected node"
59         if self.node.hostname is None:
60             raise AssertionError, "Misconfigured application: misconfigured node"
61         if self.slicename is None:
62             raise AssertionError, "Misconfigured application: unspecified slice"
63
64     def start(self):
65         # Start process in a "daemonized" way, using nohup and heavy
66         # stdin/out redirection to avoid connection issues
67         (out,err),proc = server.popen_ssh_command(
68             "cd %(home)s ; rm -f ./pid ; ( echo $$ $PPID > ./pid ; %(sudo)s nohup %(command)s > %(stdout)s 2> %(stderr)s < %(stdin)s ) &" % {
69                 'home' : server.shell_escape(self.home_path),
70                 'command' : self.command,
71                 'stdout' : 'stdout' if self.stdout else '/dev/null' ,
72                 'stderr' : 'stderr' if self.stderr else '/dev/null' ,
73                 'stdin' : 'stdin' if self.stdin is not None else '/dev/null' ,
74                 'sudo' : 'sudo' if self.sudo else '',
75             },
76             host = self.node.hostname,
77             port = None,
78             user = self.slicename,
79             agent = None,
80             ident_key = self.ident_path
81             )
82         
83         if proc.wait():
84             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
85
86         self._started = True
87
88     def checkpid(self):            
89         # Get PID/PPID
90         # NOTE: wait a bit for the pidfile to be created
91         if self._started and not self._pid or not self._ppid:
92             (out,err),proc = server.popen_ssh_command(
93                 "cat %(pidfile)s" % {
94                     'pidfile' : server.shell_escape(os.path.join(self.home_path,'pid')),
95                 },
96                 host = self.node.hostname,
97                 port = None,
98                 user = self.slicename,
99                 agent = None,
100                 ident_key = self.ident_path
101                 )
102             if out:
103                 try:
104                     self._pid, self._ppid = map(int,out.strip().split(' ',1))
105                 except:
106                     # Ignore, many ways to fail that don't matter that much
107                     pass
108     
109     def status(self):
110         self.checkpid()
111         if not self._started:
112             return STATUS_NOT_STARTED
113         elif not self._pid or not self._ppid:
114             return STATUS_NOT_STARTED
115         else:
116             (out,err),proc = server.popen_ssh_command(
117                 "ps --ppid $(ppid)d -o pid | grep -c $(pid)d" % {
118                     'ppid' : self._ppid,
119                     'pid' : self._pid,
120                 },
121                 host = self.node.hostname,
122                 port = None,
123                 user = self.slicename,
124                 agent = None,
125                 ident_key = self.ident_path
126                 )
127             
128             status = False
129             if out:
130                 try:
131                     status = bool(int(out.strip()))
132                 except:
133                     # Ignore, many ways to fail that don't matter that much
134                     pass
135             return STATUS_RUNNING if status else STATUS_FINISHED
136     
137     def kill(self):
138         status = self.status()
139         if status == STATUS_RUNNING:
140             # kill by ppid+pid - SIGTERM first, then try SIGKILL
141             (out,err),proc = server.popen_ssh_command(
142                 """
143 kill $(pid)d $(ppid)d 
144 for x in 1 2 3 4 5 6 7 8 9 0 ; do 
145     sleep 0.1 
146     if [ `ps --pid $(ppid)d -o pid | grep -c $(pid)d` == `0` ]; then
147         break
148     fi
149     sleep 0.9
150 done
151 if [ `ps --pid $(ppid)d -o pid | grep -c $(pid)d` != `0` ]; then
152     kill -9 $(pid)d $(ppid)d
153 fi
154 """ % {
155                     'ppid' : self._ppid,
156                     'pid' : self._pid,
157                 },
158                 host = self.node.hostname,
159                 port = None,
160                 user = self.slicename,
161                 agent = None,
162                 ident_key = self.ident_path
163                 )
164             
165             status = False
166             if out:
167                 try:
168                     status = bool(int(out.strip()))
169                 except:
170                     # Ignore, many ways to fail that don't matter that much
171                     pass
172             return STATUS_RUNNING if status else STATUS_FINISHED
173     
174     def remote_trace_path(self, whichtrace):
175         if whichtrace in ('stdout','stderr'):
176             tracefile = os.path.join(self.home_path, whichtrace)
177         else:
178             tracefile = None
179         
180         return tracefile
181     
182     def sync_trace(self, local_dir, whichtrace):
183         tracefile = self.remote_trace_path(whichtrace)
184         if not tracefile:
185             return None
186         
187         local_path = os.path.join(local_dir, tracefile)
188         
189         # create parent local folders
190         proc = subprocess.Popen(
191             ["mkdir", "-p", os.path.dirname(local_path)],
192             stdout = open("/dev/null","w"),
193             stdin = open("/dev/null","r"))
194
195         if proc.wait():
196             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
197         
198         # sync files
199         (out,err),proc = server.popen_scp(
200             '%s@%s:%s' % (self.slicename, self.node.hostname, 
201                 tracefile),
202             local_path,
203             port = None,
204             agent = None,
205             ident_key = self.ident_path
206             )
207         
208         if proc.wait():
209             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
210         
211         return local_path
212     
213
214     def setup(self):
215         # Make sure all the paths are created where 
216         # they have to be created for deployment
217         (out,err),proc = server.popen_ssh_command(
218             "mkdir -p %s" % (server.shell_escape(self.home_path),),
219             host = self.node.hostname,
220             port = None,
221             user = self.slicename,
222             agent = None,
223             ident_key = self.ident_path
224             )
225         
226         if proc.wait():
227             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
228         
229         
230         if self.stdin:
231             # Write program input
232             (out,err),proc = server.popen_scp(
233                 cStringIO.StringIO(self.stdin),
234                 '%s@%s:%s' % (self.slicename, self.node.hostname, 
235                     os.path.join(self.home_path, 'stdin') ),
236                 port = None,
237                 agent = None,
238                 ident_key = self.ident_path
239                 )
240             
241             if proc.wait():
242                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
243
244