Added scheduler and task processing thread to ec. Completed deploy and release methods.
[nepi.git] / src / neco / resources / linux / node.py
1 from neco.execution.resource import ResourceManager
2 from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
3         rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING 
4
5 import cStringIO
6 import logging
7 import os.path
8 import subprocess
9
class LinuxNode(ResourceManager):
    """Resource manager for a Linux host.

    Remote hosts are driven through the helpers in ``neco.util.sshfuncs``
    (rexec, rcopy, rspawn, ...). When the node is the local machine (see
    ``is_localhost``) commands are run directly with ``subprocess``.

    Connection settings (``ip``, ``host``, ``user``, ``port``,
    ``identity_file``, ``enable_x11``, ``forward_agent``) are plain
    attributes that callers set after construction.
    """

    def __init__(self, ec, guid):
        super(LinuxNode, self).__init__(ec, guid)
        self.ip = None
        self.host = None
        self.user = None
        self.port = None
        self.identity_file = None
        self.enable_x11 = False
        self.forward_agent = True

        # Package management command ("yum" or "apt-get"), resolved lazily
        # and cached by the ``pm`` property.
        self._pm = None

        # Logging
        loglevel = "debug"
        self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %
                self.guid)
        self._logger.setLevel(getattr(logging, loglevel.upper()))

        # For ssh connections we use the ControlMaster option which
        # allows us to decrease the number of open ssh network connections.
        # Subsequent ssh connections will reuse a same master connection.
        # This might pose a problem when using X11 and ssh-agent, since
        # display and agent forwarded will be those of the first connection,
        # which created the master.
        # To avoid reusing a master created by a previous LinuxNode instance,
        # we explicitly erase the ControlPath socket.
        # NOTE(review): user/host/port are all still None at this point, so
        # the path computed here is not the one for the node's eventual
        # connection. Consider repeating this once the settings are known.
        control_path = make_control_path(self.user, self.host, self.port)
        try:
            os.remove(control_path)
        except OSError:
            # No stale socket to remove (or it cannot be removed): best effort.
            pass

    @property
    def pm(self):
        """Package management command for the node: "yum" or "apt-get".

        Detected once from the start of ``/etc/issue`` and then cached.

        :raises RuntimeError: if the node address (or, for remote nodes, the
            user name) is missing, or the distribution is not recognised.
        """
        if self._pm:
            return self._pm

        # A user name is only needed to open an SSH connection; localhost
        # commands are run directly.
        if (not (self.host or self.ip) or
                (not self.user and not self.is_localhost)):
            msg = "Can't resolve package management system. Insufficient data."
            self._logger.error(msg)
            raise RuntimeError(msg)

        # execute() returns an (stdout, stderr) pair.
        out, err = self.execute("cat /etc/issue")

        if out.startswith("Fedora"):
            self._pm = "yum"
        elif out.startswith(("Debian", "Ubuntu")):
            self._pm = "apt-get"
        else:
            msg = "Can't resolve package management system. Unknown OS."
            self._logger.error(msg)
            raise RuntimeError(msg)

        return self._pm

    @property
    def is_localhost(self):
        """True when the node address names the local machine."""
        return ( self.host or self.ip ) in ['localhost', '127.0.0.1', '::1']

    def install(self, packages):
        """Install one package name or a list of them with ``<pm> -y install``.

        Each package is installed by a separate sudo command, run with a tty.
        """
        if not isinstance(packages, list):
            packages = [packages]

        for p in packages:
            self.execute("%s -y install %s" % (self.pm, p), sudo = True,
                    tty = True)

    def uninstall(self, packages):
        """Remove one package name or a list of them with ``<pm> -y remove``.

        Each package is removed by a separate sudo command, run with a tty.
        """
        if not isinstance(packages, list):
            packages = [packages]

        for p in packages:
            self.execute("%s -y remove %s" % (self.pm, p), sudo = True,
                    tty = True)

    def upload(self, src, dst):
        """Copy ``src`` to ``dst`` on the node.

        :param src: path of an existing local file or directory, or else the
            literal content to be written to ``dst``.
        :param dst: destination path on the node.
        :returns: ``(out, err)`` from copy().
        :raises RuntimeError: if the copy fails.
        """
        import tempfile  # only needed when src is literal content

        tmp_path = None
        if not os.path.exists(src):
            # src is content, not a path: stage it in a temporary file so
            # both ``cp`` (local) and ``rcopy`` (remote) receive a real path.
            fd, tmp_path = tempfile.mkstemp()
            with os.fdopen(fd, "w") as tmp:
                tmp.write(src)
            src = tmp_path

        try:
            if not self.is_localhost:
                # Build destination as <user>@<server>:<path>
                dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst)
            return self.copy(src, dst)
        finally:
            if tmp_path is not None:
                os.remove(tmp_path)

    def download(self, src, dst):
        """Copy ``src`` from the node to the local path ``dst``.

        :returns: ``(out, err)`` from copy().
        :raises RuntimeError: if the copy fails.
        """
        if not self.is_localhost:
            # Build source as <user>@<server>:<path>
            src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
        return self.copy(src, dst)

    def is_alive(self, verbose = False):
        """Return True if the node answers a trivial ``echo`` command.

        Localhost is always considered alive. With ``verbose``, a warning is
        logged describing why a remote node was judged unresponsive.
        """
        if self.is_localhost:
            return True

        try:
            out, err = self.execute("echo 'ALIVE'",
                timeout = 60,
                err_on_timeout = False,
                persistent = False)
        except Exception as e:
            # Any failure to run the probe means the node is not usable.
            if verbose:
                self._logger.warn("Unresponsive node %s got:\n%s",
                        self.host or self.ip, e)
            return False

        if out.strip().startswith('ALIVE'):
            return True

        if verbose:
            self._logger.warn("Unresponsive node %s got:\n%s%s",
                    self.host or self.ip, out, err)
        return False

    def mkdir(self, path, clean = True):
        """Create ``path`` (with parents) on the node using ``mkdir -p``.

        When ``clean`` is true the path is first removed with rmdir(), which
        runs ``rm -rf`` and therefore discards any existing contents.

        :returns: ``(out, err)`` from execute().
        """
        if clean:
            self.rmdir(path)

        return self.execute(
            "mkdir -p %s" % path,
            timeout = 120,
            retry = 3
            )

    def rmdir(self, path):
        """Recursively delete ``path`` on the node using ``rm -rf``.

        :returns: ``(out, err)`` from execute().
        """
        return self.execute(
            "rm -rf %s" % path,
            timeout = 120,
            retry = 3
            )

    def copy(self, src, dst):
        """Copy ``src`` to ``dst``.

        On localhost this runs ``cp -R``. Otherwise it uses ``rcopy``, and one
        side is expected to be in ``user@host:path`` form, as built by
        upload() and download().

        :returns: ``(out, err)`` of the copy command.
        :raises RuntimeError: if the copy command exits with a non-zero status.
        """
        if self.is_localhost:
            command = ["cp", "-R", src, dst]
            p = subprocess.Popen(command, stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
            out, err = p.communicate()
            failed = p.returncode
        else:
            (out, err), proc = eintr_retry(rcopy)(
                src, dst,
                port = self.port,
                agent = self.forward_agent,
                identity_file = self.identity_file)
            failed = proc.wait()

        if failed:
            msg = "Error uploading to %s got:\n%s%s" %\
                    (self.host or self.ip, out, err)
            self._logger.error(msg)
            raise RuntimeError(msg)

        return (out, err)

    def execute(self, command,
            sudo = False,
            stdin = None,
            tty = False,
            env = None,
            timeout = None,
            retry = 0,
            err_on_timeout = True,
            connect_timeout = 30,
            persistent = True):
        """ Notice that this invocation will block until the
        execution finishes. If this is not the desired behavior,
        use 'run' instead.

        :param command: shell command string.
        :param sudo: prefix the command with sudo.
        :param stdin: data fed to the command's standard input (None: none).
        :param env: mapping of environment variables for the command.
        :returns: ``(out, err)`` captured from the command.
        :raises RuntimeError: if the command exits with a non-zero status.

        ``tty``, ``timeout``, ``retry``, ``err_on_timeout``,
        ``connect_timeout`` and ``persistent`` only apply to remote nodes
        and are forwarded to ``rexec``.
        """
        if self.is_localhost:
            if env:
                # "KEY=VALUE " assignments placed before the command apply
                # to that command only.
                export = ''.join('%s=%s ' % (envkey, envval)
                        for envkey, envval in env.items())
                command = export + command

            if sudo:
                command = "sudo " + command

            # shell=True: ``command`` is a shell string (possibly carrying the
            # env/sudo prefixes above). Without it Popen would look for an
            # executable literally named after the whole string.
            p = subprocess.Popen(command, shell = True,
                    stdin = subprocess.PIPE if stdin is not None else None,
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
            out, err = p.communicate(stdin)
            failed = p.returncode
        else:
            (out, err), proc = eintr_retry(rexec)(
                    command,
                    self.host or self.ip,
                    self.user,
                    port = self.port,
                    agent = self.forward_agent,
                    sudo = sudo,
                    stdin = stdin,
                    identity_file = self.identity_file,
                    tty = tty,
                    x11 = self.enable_x11,
                    env = env,
                    timeout = timeout,
                    retry = retry,
                    err_on_timeout = err_on_timeout,
                    connect_timeout = connect_timeout,
                    persistent = persistent)
            failed = proc.wait()

        if failed:
            msg = "Failed to execute command %s at node %s: %s %s" % \
                    (command, self.host or self.ip, out, err,)
            self._logger.warn(msg)
            raise RuntimeError(msg)

        return (out, err)

    def run(self, command, home,
            stdin = None,
            stdout = 'stdout',
            stderr = 'stderr',
            sudo = False):
        """Start ``command`` detached in the background on the node.

        The process is launched with ``nohup`` and its standard streams are
        redirected to files. Its PID is recorded in ``./pid``, relative to
        ``home`` when given, so it can later be queried with checkpid(home).

        :param command: shell command to start.
        :param home: working directory (None or empty: do not change dir).
        :param stdin: file to read standard input from (``/dev/null`` if None).
        :param stdout: file for standard output (``/dev/null`` if empty).
        :param stderr: file for standard error (``/dev/null`` if empty).
        :param sudo: launch through ``sudo -S``.
        :returns: ``(out, err)`` of the launcher, not of ``command`` itself.
        :raises RuntimeError: if the launcher exits with a non-zero status.
        """
        self._logger.info("Running %s", command)

        # Must be a plain string (a trailing comma would make it a tuple).
        pidfile = './pid'

        stdin = stdin if stdin is not None else '/dev/null'
        stdout = stdout if stdout else '/dev/null'
        stderr = stderr if stderr else '/dev/null'

        if self.is_localhost:
            out, err, failed = self._spawn_local(command, home, stdin,
                    stdout, stderr, pidfile, sudo)
        else:
            # Start process in a "daemonized" way, using nohup and heavy
            # stdin/out redirection to avoid connection issues
            (out, err), proc = rspawn(
                command,
                pidfile = pidfile,
                home = home,
                stdin = stdin,
                stdout = stdout,
                stderr = stderr,
                sudo = sudo,
                host = self.host or self.ip,
                user = self.user,
                port = self.port,
                agent = self.forward_agent,
                identity_file = self.identity_file
                )
            failed = proc.wait()

        if failed:
            msg = "Failed to set up application: %s %s" % (out, err,)
            self._logger.error(msg)
            raise RuntimeError(msg)

        return (out, err)

    def _spawn_local(self, command, home, stdin, stdout, stderr, pidfile,
            sudo):
        """Daemonize ``command`` on the local host (local side of run()).

        ``home`` is created if missing and used as the working directory.
        Returns ``(out, err, returncode)`` of the launcher shell.
        """
        # "2>&1" when stderr shares stdout's target, "2> file" otherwise.
        if stderr == stdout:
            stderr = '&1'
        else:
            stderr = ' ' + stderr

        # Background the command, then write "$! 1" to the pidfile.
        # NOTE(review): the trailing "1" looks like a placeholder PPID that
        # rcheck_pid reads back -- confirm against sshfuncs.
        daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
            'command' : command,
            'pidfile' : pidfile,
            'stdout' : stdout,
            'stderr' : stderr,
            'stdin' : stdin,
        }

        # The daemon command is quoted as one shell word so commands that
        # contain quotes survive the ``bash -c`` wrapping.
        cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
                'command' : self._shell_quote(daemon_command),
                'sudo' : 'sudo -S' if sudo else '',
                'pidfile' : pidfile,
                'gohome' : 'cd %s ; ' % home if home else '',
                'create' : 'mkdir -p %s ; ' % home if home else '',
            }

        p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE,
                stderr = subprocess.PIPE)
        out, err = p.communicate()
        return out, err, p.returncode

    @staticmethod
    def _shell_quote(text):
        """Quote ``text`` as a single POSIX shell word (as ``pipes.quote``)."""
        return "'" + text.replace("'", "'\"'\"'") + "'"

    def checkpid(self, path):
        """Read the PID information that run() wrote to ``<path>/pid``.

        :returns: whatever ``rcheck_pid`` returns -- presumably a (pid, ppid)
            pair, or None when unavailable (TODO confirm against sshfuncs).
        """
        # NOTE(review): nothing here waits for the pidfile to appear; callers
        # may need to retry if it has not been written yet.
        pidtuple = rcheck_pid(
            os.path.join(path, 'pid'),
            host = self.host or self.ip,
            user = self.user,
            port = self.port,
            agent = self.forward_agent,
            identity_file = self.identity_file
            )

        return pidtuple

    def status(self, pid, ppid):
        """Return the ``rstatus`` state of the process (compared to RUNNING)."""
        status = rstatus(
                pid, ppid,
                host = self.host or self.ip,
                user = self.user,
                port = self.port,
                agent = self.forward_agent,
                identity_file = self.identity_file
                )

        return status

    def kill(self, pid, ppid, sudo = False):
        """Kill the process via ``rkill`` if status() reports it RUNNING."""
        status = self.status(pid, ppid)
        if status == RUNNING:
            # kill by ppid+pid - SIGTERM first, then try SIGKILL
            rkill(
                pid, ppid,
                host = self.host or self.ip,
                user = self.user,
                port = self.port,
                agent = self.forward_agent,
                sudo = sudo,
                identity_file = self.identity_file
                )
312                 )
313