In test linux_node.py ignoring unresponsive nodes.
[nepi.git] / src / neco / resources / linux / node.py
1 from neco.execution.resource import Resource
2 from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
3         rspawn, rcheck_pid, rstatus, rkill, RUNNING 
4
5 import cStringIO
6 import logging
7 import os.path
8
class LinuxNode(Resource):
    """Resource representing a remote Linux node reached over SSH.

    Connection parameters (``host``/``ip``, ``user``, ``port``,
    ``identity_file``) start as ``None`` and must be filled in before any
    remote operation is attempted. Every remote call targets ``host`` when
    it is set and falls back to ``ip`` otherwise.
    """

    def __init__(self, box, ec):
        super(LinuxNode, self).__init__(box, ec)
        self.ip = None
        self.host = None
        self.user = None
        self.port = None
        self.identity_file = None
        # Packet management system - either yum or apt-get for now.
        # Resolved lazily and cached by the ``pm`` property.
        self._pm = None

        # Logging: one logger per node, named after the box guid.
        loglevel = "debug"
        self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %
                self.box.guid)
        self._logger.setLevel(getattr(logging, loglevel.upper()))

    @property
    def pm(self):
        """Package manager command for the node: "yum" or "apt-get".

        Detected on first access by reading ``/etc/issue`` on the node and
        cached in ``self._pm`` afterwards.

        :raises RuntimeError: if neither ``host`` nor ``ip`` is set, if
            ``user`` is not set, or if ``/etc/issue`` does not start with
            Fedora, Debian or Ubuntu.
        """
        if self._pm:
            return self._pm

        if not (self.host or self.ip) or not self.user:
            msg = "Can't resolve package management system. Insufficient data."
            self._logger.error(msg)
            raise RuntimeError(msg)

        out = self.execute("cat /etc/issue")

        if out.startswith("Fedora"):
            self._pm = "yum"
        elif out.startswith(("Debian", "Ubuntu")):
            self._pm = "apt-get"
        else:
            msg = "Can't resolve package management system. Unknown OS."
            self._logger.error(msg)
            raise RuntimeError(msg)

        return self._pm

    @staticmethod
    def _package_list(packages):
        """Normalize a single package name or a collection of names to a list."""
        # Previously only ``list`` was recognized; a tuple/set was formatted
        # as one bogus package name.
        if isinstance(packages, (list, tuple, set, frozenset)):
            return list(packages)
        return [packages]

    def _manage_packages(self, action, packages):
        """Run ``<pm> -y <action> <package>`` via sudo, one package per call."""
        for p in self._package_list(packages):
            self.execute("%s -y %s %s" % (self.pm, action, p), sudo = True,
                    tty = True)

    def install(self, packages):
        """Install one package or a list/tuple/set of packages on the node.

        :raises RuntimeError: if the package manager cannot be resolved or
            an install command fails (see ``execute``).
        """
        self._manage_packages("install", packages)

    def uninstall(self, packages):
        """Remove one package or a list/tuple/set of packages from the node.

        :raises RuntimeError: if the package manager cannot be resolved or
            a remove command fails (see ``execute``).
        """
        self._manage_packages("remove", packages)

    def upload(self, src, dst):
        """Copy ``src`` to ``dst`` on the node.

        ``src`` is used as a local file path when such a file exists;
        otherwise it is treated as literal content and wrapped in an
        in-memory file object before copying.

        :raises RuntimeError: if the copy process exits with non-zero status.
        """
        if not os.path.isfile(src):
            src = cStringIO.StringIO(src)

        (out, err), proc = eintr_retry(rcopy)(
            src, dst,
            self.host or self.ip,
            self.user,
            port = self.port,
            identity_file = self.identity_file)

        if proc.wait():
            msg = "Error uploading to %s got:\n%s%s" % \
                    (self.host or self.ip, out, err)
            self._logger.error(msg)
            raise RuntimeError(msg)

    def is_alive(self, verbose = False):
        """Return True if the node answers a remote ``echo 'ALIVE'``.

        Uses a 60 second timeout, does not raise on timeout, and does not
        reuse a persistent connection. When ``verbose`` is true, failures
        are logged as warnings together with the command output.
        """
        (out, err), proc = eintr_retry(rexec)(
                "echo 'ALIVE'",
                self.host or self.ip,
                self.user,
                port = self.port,
                identity_file = self.identity_file,
                timeout = 60,
                err_on_timeout = False,
                persistent = False)

        # Output is only inspected when the command exited successfully.
        if not proc.wait() and out.strip().startswith('ALIVE'):
            return True

        if verbose:
            self._logger.warning("Unresponsive node %s got:\n%s%s",
                    self.host or self.ip, out, err)
        return False

    def mkdir(self, path, clean = True):
        """Create ``path`` (with parents) on the node.

        When ``clean`` is true, ``path`` is removed first so the directory
        starts out empty.
        """
        if clean:
            self.rmdir(path)

        self.execute(
            "mkdir -p %s" % path,
            timeout = 120,
            retry = 3
            )

    def rmdir(self, path):
        """Recursively and forcibly remove ``path`` on the node."""
        self.execute(
            "rm -rf %s" % path,
            timeout = 120,
            retry = 3
            )

    def execute(self, command,
            agent = True,
            sudo = False,
            stdin = "",
            tty = False,
            timeout = None,
            retry = 0,
            err_on_timeout = True,
            connect_timeout = 30,
            persistent = True):
        """Run ``command`` on the node and return its stdout.

        Notice that this invocation will block until the execution
        finishes. If this is not the desired behavior, use 'run' instead.

        :raises RuntimeError: if the remote command exits with non-zero
            status.
        """
        (out, err), proc = eintr_retry(rexec)(
                command,
                self.host or self.ip,
                self.user,
                port = self.port,
                agent = agent,
                sudo = sudo,
                stdin = stdin,
                identity_file = self.identity_file,
                tty = tty,
                timeout = timeout,
                retry = retry,
                err_on_timeout = err_on_timeout,
                connect_timeout = connect_timeout,
                persistent = persistent)

        if proc.wait():
            msg = "Failed to execute command %s at node %s: %s %s" % \
                    (command, self.host or self.ip, out, err,)
            self._logger.warning(msg)
            raise RuntimeError(msg)

        return out

    def run(self, command, home,
            stdin = None,
            stdout = 'stdout',
            stderr = 'stderr',
            sudo = False):
        """Start ``command`` on the node in the background, inside ``home``.

        The process id is written to ``./pid`` relative to ``home`` (see
        ``checkpid``). A falsy ``stdout``/``stderr``, or a ``None`` stdin,
        is redirected to ``/dev/null``.

        :raises RuntimeError: if the spawn command exits with non-zero status.
        """
        self._logger.info("Running %s", command)

        # Start process in a "daemonized" way, using nohup and heavy
        # stdin/out redirection to avoid connection issues
        (out, err), proc = rspawn(
            command,
            pidfile = './pid',
            home = home,
            stdin = stdin if stdin is not None else '/dev/null',
            stdout = stdout if stdout else '/dev/null',
            stderr = stderr if stderr else '/dev/null',
            sudo = sudo,
            host = self.host or self.ip,
            user = self.user,
            port = self.port,
            identity_file = self.identity_file
            )

        if proc.wait():
            msg = "Failed to set up application: %s %s" % (out, err,)
            self._logger.error(msg)
            raise RuntimeError(msg)

    def checkpid(self, path):
        """Return the result of ``rcheck_pid`` for ``<path>/pid`` on the node."""
        # Get PID/PPID from the pidfile written by ``run``.
        # NOTE(review): nothing here waits for the pidfile to appear;
        # presumably rcheck_pid handles that - confirm.
        pidtuple = rcheck_pid(
            os.path.join(path, 'pid'),
            host = self.host or self.ip,
            user = self.user,
            port = self.port,
            identity_file = self.identity_file
            )

        return pidtuple

    def status(self, pid, ppid):
        """Return the ``rstatus`` value for the remote ``pid``/``ppid``."""
        status = rstatus(
                pid, ppid,
                host = self.host or self.ip,
                user = self.user,
                port = self.port,
                identity_file = self.identity_file
                )

        return status

    def kill(self, pid, ppid, sudo = False):
        """Kill the remote process ``pid``/``ppid`` if it is RUNNING."""
        status = self.status(pid, ppid)
        if status == RUNNING:
            # kill by ppid+pid - SIGTERM first, then try SIGKILL
            rkill(
                pid, ppid,
                host = self.host or self.ip,
                user = self.user,
                port = self.port,
                sudo = sudo,
                identity_file = self.identity_file
                )