Supporting many concurrent LinuxApplications on same LinuxNode
[nepi.git] / src / neco / resources / linux / node.py
1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs 
4 from neco.util import sshfuncs, execfuncs 
5
6 import collections
7 import logging
8 import os
9 import random
10 import re
11 import tempfile
12 import time
13 import threading
14
15 # TODO: Verify files and dirs exists already
16 # TODO: Blacklist nodes!
17
18 DELAY ="1s"
19
20 @clsinit
21 class LinuxNode(ResourceManager):
22     _rtype = "LinuxNode"
23
24     @classmethod
25     def _register_attributes(cls):
26         hostname = Attribute("hostname", "Hostname of the machine",
27                 flags = Flags.ExecReadOnly)
28
29         username = Attribute("username", "Local account username", 
30                 flags = Flags.Credential)
31
32         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
33         
34         home = Attribute("home",
35                 "Experiment home directory to store all experiment related files",
36                 flags = Flags.ExecReadOnly)
37         
38         identity = Attribute("identity", "SSH identity file",
39                 flags = Flags.Credential)
40         
41         server_key = Attribute("serverKey", "Server public key", 
42                 flags = Flags.ExecReadOnly)
43         
44         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
45                 " from home folder before starting experiment", 
46                 flags = Flags.ExecReadOnly)
47         
48         clean_processes = Attribute("cleanProcesses", 
49                 "Kill all running processes before starting experiment",
50                 flags = Flags.ExecReadOnly)
51         
52         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
53                 "releasing the resource",
54                 flags = Flags.ExecReadOnly)
55
56         cls._register_attribute(hostname)
57         cls._register_attribute(username)
58         cls._register_attribute(port)
59         cls._register_attribute(home)
60         cls._register_attribute(identity)
61         cls._register_attribute(server_key)
62         cls._register_attribute(clean_home)
63         cls._register_attribute(clean_processes)
64         cls._register_attribute(tear_down)
65
66     def __init__(self, ec, guid):
67         super(LinuxNode, self).__init__(ec, guid)
68         self._os = None
69         
70         # lock to avoid concurrency issues on methods used by applications 
71         self._lock = threading.Lock()
72
73         self._logger = logging.getLogger("LinuxNode")
74     
75     def log_message(self, msg):
76         return " guid %d - host %s - %s " % (self.guid, 
77                 self.get("hostname"), msg)
78
79     @property
80     def home(self):
81         return self.get("home") or "/tmp"
82
83     @property
84     def exp_dir(self):
85         exp_dir = os.path.join(self.home, self.ec.exp_id)
86         return exp_dir if exp_dir.startswith('/') else "${HOME}/"
87
88     @property
89     def node_dir(self):
90         node_dir = "node-%d" % self.guid
91         return os.path.join(self.exp_dir, node_dir)
92
93     @property
94     def os(self):
95         if self._os:
96             return self._os
97
98         if (not self.get("hostname") or not self.get("username")):
99             msg = "Can't resolve OS, insufficient data "
100             self.error(msg)
101             raise RuntimeError, msg
102
103         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
104
105         if err and proc.poll():
106             msg = "Error detecting OS "
107             self.error(msg, out, err)
108             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
109
110         if out.find("Fedora release 12") == 0:
111             self._os = "f12"
112         elif out.find("Fedora release 14") == 0:
113             self._os = "f14"
114         elif out.find("Debian") == 0: 
115             self._os = "debian"
116         elif out.find("Ubuntu") ==0:
117             self._os = "ubuntu"
118         else:
119             msg = "Unsupported OS"
120             self.error(msg, out)
121             raise RuntimeError, "%s - %s " %( msg, out )
122
123         return self._os
124
125     @property
126     def localhost(self):
127         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
128
129     def provision(self, filters = None):
130         if not self.is_alive():
131             self._state = ResourceState.FAILED
132             self.error("Deploy failed. Unresponsive node")
133             return
134
135         if self.get("cleanProcesses"):
136             self.clean_processes()
137
138         if self.get("cleanHome"):
139             self.clean_home()
140        
141         self.mkdir(self.node_dir)
142
143         super(LinuxNode, self).provision()
144
145     def deploy(self):
146         if self.state == ResourceState.NEW:
147             try:
148                self.discover()
149                self.provision()
150             except:
151                 self._state = ResourceState.FAILED
152                 raise
153
154         # Node needs to wait until all associated interfaces are 
155         # ready before it can finalize deployment
156         from neco.resources.linux.interface import LinuxInterface
157         ifaces = self.get_connected(LinuxInterface.rtype())
158         for iface in ifaces:
159             if iface.state < ResourceState.READY:
160                 self.ec.schedule(DELAY, self.deploy)
161                 return 
162
163         super(LinuxNode, self).deploy()
164
165     def release(self):
166         tear_down = self.get("tearDown")
167         if tear_down:
168             self.execute(tear_down)
169
170         super(LinuxNode, self).release()
171
172     def valid_connection(self, guid):
173         # TODO: Validate!
174         return True
175
176     def clean_processes(self, killer = False):
177         self.info("Cleaning up processes")
178         
179         if killer:
180             # Hardcore kill
181             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
182                 "sudo -S killall python tcpdump || /bin/true ; " +
183                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
184                 "sudo -S killall -u root || /bin/true ; " +
185                 "sudo -S killall -u root || /bin/true ; ")
186         else:
187             # Be gentler...
188             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
189                 "sudo -S killall tcpdump || /bin/true ; " +
190                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
191                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
192
193
194         out = err = ""
195         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
196             
197     def clean_home(self):
198         self.info("Cleaning up home")
199
200         cmd = ("cd %s ; " % self.home +
201             "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
202             " -execdir rm -rf {} + ")
203
204         out = err = ""
205         (out, err), proc = self.execute(cmd, with_lock = True)
206
207     def upload(self, src, dst, text = False):
208         """ Copy content to destination
209
210            src  content to copy. Can be a local file, directory or a list of files
211
212            dst  destination path on the remote host (remote is always self.host)
213
214            text src is text input, it must be stored into a temp file before uploading
215         """
216         # If source is a string input 
217         f = None
218         if text and not os.path.isfile(src):
219             # src is text input that should be uploaded as file
220             # create a temporal file with the content to upload
221             f = tempfile.NamedTemporaryFile(delete=False)
222             f.write(src)
223             f.close()
224             src = f.name
225
226         if not self.localhost:
227             # Build destination as <user>@<server>:<path>
228             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
229
230         result = self.copy(src, dst)
231
232         # clean up temp file
233         if f:
234             os.remove(f.name)
235
236         return result
237
238     def download(self, src, dst):
239         if not self.localhost:
240             # Build destination as <user>@<server>:<path>
241             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
242         return self.copy(src, dst)
243
244     def install_packages(self, packages, home = None):
245         home = home or self.node_dir
246
247         cmd = ""
248         if self.os in ["f12", "f14"]:
249             cmd = rpmfuncs.install_packages_command(self.os, packages)
250         elif self.os in ["debian", "ubuntu"]:
251             cmd = debfuncs.install_packages_command(self.os, packages)
252         else:
253             msg = "Error installing packages ( OS not known ) "
254             self.error(msg, self.os)
255             raise RuntimeError, msg
256
257         out = err = ""
258         (out, err), proc = self.run_and_wait(cmd, home, 
259             pidfile = "instpkg_pid",
260             stdout = "instpkg_log", 
261             stderr = "instpkg_err", 
262             raise_on_error = True)
263
264         return (out, err), proc 
265
266     def remove_packages(self, packages, home = None):
267         home = home or self.node_dir
268
269         cmd = ""
270         if self.os in ["f12", "f14"]:
271             cmd = rpmfuncs.remove_packages_command(self.os, packages)
272         elif self.os in ["debian", "ubuntu"]:
273             cmd = debfuncs.remove_packages_command(self.os, packages)
274         else:
275             msg = "Error removing packages ( OS not known ) "
276             self.error(msg)
277             raise RuntimeError, msg
278
279         out = err = ""
280         (out, err), proc = self.run_and_wait(cmd, home, 
281             pidfile = "rmpkg_pid",
282             stdout = "rmpkg_log", 
283             stderr = "rmpkg_err", 
284             raise_on_error = True)
285          
286         return (out, err), proc 
287
288     def mkdir(self, path, clean = False):
289         if clean:
290             self.rmdir(path)
291
292         return self.execute("mkdir -p %s" % path, with_lock = True)
293
294     def rmdir(self, path):
295         return self.execute("rm -rf %s" % path, with_lock = True)
296
297     def run_and_wait(self, command, 
298             home = ".", 
299             pidfile = "pid", 
300             stdin = None, 
301             stdout = 'stdout', 
302             stderr = 'stderr', 
303             sudo = False,
304             raise_on_error = False):
305         """ runs a command in background on the remote host, but waits
306             until the command finishes execution.
307             This is more robust than doing a simple synchronized 'execute',
308             since in the remote host the command can continue to run detached
309             even if network disconnections occur
310         """
311         # run command in background in remote host
312         (out, err), proc = self.run(command, home, 
313                 pidfile = pidfile,
314                 stdin = stdin, 
315                 stdout = stdout, 
316                 stderr = stderr, 
317                 sudo = sudo)
318
319         # check no errors occurred
320         if proc.poll() and err:
321             msg = " Failed to run command '%s' " % command
322             self.error(msg, out, err)
323             if raise_on_error:
324                 raise RuntimeError, msg
325
326         # Wait for pid file to be generated
327         pid, ppid = self.wait_pid(
328                 home = home, 
329                 pidfile = pidfile, 
330                 raise_on_error = raise_on_error)
331
332         # wait until command finishes to execute
333         self.wait_run(pid, ppid)
334        
335         # check if execution errors occurred
336         (out, err), proc = self.check_output(home, stderr)
337
338         if err or out:
339             msg = " Failed to run command '%s' " % command
340             self.error(msg, out, err)
341
342             if raise_on_error:
343                 raise RuntimeError, msg
344         
345         return (out, err), proc
346  
347     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
348         """ Waits until the pid file for the command is generated, 
349             and returns the pid and ppid of the process """
350         pid = ppid = None
351         delay = 1.0
352         for i in xrange(5):
353             pidtuple = self.checkpid(home = home, pidfile = pidfile)
354             
355             if pidtuple:
356                 pid, ppid = pidtuple
357                 break
358             else:
359                 time.sleep(delay)
360                 delay = min(30,delay*1.2)
361         else:
362             msg = " Failed to get pid for pidfile %s/%s " % (
363                     home, pidfile )
364             self.error(msg)
365             
366             if raise_on_error:
367                 raise RuntimeError, msg
368
369         return pid, ppid
370
371     def wait_run(self, pid, ppid, trial = 0):
372         """ wait for a remote process to finish execution """
373         delay = 1.0
374         first = True
375         bustspin = 0
376
377         while True:
378             status = self.status(pid, ppid)
379             
380             if status is sshfuncs.FINISHED:
381                 break
382             elif status is not sshfuncs.RUNNING:
383                 bustspin += 1
384                 time.sleep(delay*(5.5+random.random()))
385                 if bustspin > 12:
386                     break
387             else:
388                 if first:
389                     first = False
390
391                 time.sleep(delay*(0.5+random.random()))
392                 delay = min(30,delay*1.2)
393                 bustspin = 0
394
395     def check_output(self, home, filename):
396         """ checks file content """
397         (out, err), proc = self.execute("cat %s" % 
398             os.path.join(home, filename), with_lock = True)
399         return (out, err), proc
400
401     def is_alive(self):
402         if self.localhost:
403             return True
404
405         out = err = ""
406         try:
407             (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
408         except:
409             import traceback
410             trace = traceback.format_exc()
411             msg = "Unresponsive host "
412             self.warn(msg, out, trace)
413             return False
414
415         if out.strip().startswith('ALIVE'):
416             return True
417         else:
418             msg = "Unresponsive host "
419             self.warn(msg, out, err)
420             return False
421
422             # TODO!
423             #if self.check_bad_host(out,err):
424             #    self.blacklist()
425
426     def copy(self, src, dst):
427         if self.localhost:
428             (out, err), proc =  execfuncs.lcopy(source, dest, 
429                     recursive = True)
430         else:
431             with self._lock:
432                 (out, err), proc = sshfuncs.rcopy(
433                     src, dst, 
434                     port = self.get("port"),
435                     identity = self.get("identity"),
436                     server_key = self.get("serverKey"),
437                     recursive = True)
438
439         return (out, err), proc
440
441     def execute(self, command,
442             sudo = False,
443             stdin = None, 
444             env = None,
445             tty = False,
446             forward_x11 = False,
447             timeout = None,
448             retry = 3,
449             err_on_timeout = True,
450             connect_timeout = 30,
451             persistent = True,
452             with_lock = False
453             ):
454         """ Notice that this invocation will block until the
455         execution finishes. If this is not the desired behavior,
456         use 'run' instead."""
457
458         if self.localhost:
459             (out, err), proc = execfuncs.lexec(command, 
460                     user = user,
461                     sudo = sudo,
462                     stdin = stdin,
463                     env = env)
464         else:
465             if with_lock:
466                 with self._lock:
467                     (out, err), proc = sshfuncs.rexec(
468                         command, 
469                         host = self.get("hostname"),
470                         user = self.get("username"),
471                         port = self.get("port"),
472                         agent = True,
473                         sudo = sudo,
474                         stdin = stdin,
475                         identity = self.get("identity"),
476                         server_key = self.get("serverKey"),
477                         env = env,
478                         tty = tty,
479                         forward_x11 = forward_x11,
480                         timeout = timeout,
481                         retry = retry,
482                         err_on_timeout = err_on_timeout,
483                         connect_timeout = connect_timeout,
484                         persistent = persistent
485                         )
486             else:
487                 (out, err), proc = sshfuncs.rexec(
488                     command, 
489                     host = self.get("hostname"),
490                     user = self.get("username"),
491                     port = self.get("port"),
492                     agent = True,
493                     sudo = sudo,
494                     stdin = stdin,
495                     identity = self.get("identity"),
496                     server_key = self.get("serverKey"),
497                     env = env,
498                     tty = tty,
499                     forward_x11 = forward_x11,
500                     timeout = timeout,
501                     retry = retry,
502                     err_on_timeout = err_on_timeout,
503                     connect_timeout = connect_timeout,
504                     persistent = persistent
505                     )
506
507         return (out, err), proc
508
509     def run(self, command, 
510             home = None,
511             create_home = True,
512             pidfile = "pid",
513             stdin = None, 
514             stdout = 'stdout', 
515             stderr = 'stderr', 
516             sudo = False):
517
518         self.debug("Running %s" % command)
519         
520         if self.localhost:
521             (out, err), proc = execfuncs.lspawn(command, pidfile, 
522                     stdout = stdout, 
523                     stderr = stderr, 
524                     stdin = stdin, 
525                     home = home, 
526                     create_home = create_home, 
527                     sudo = sudo,
528                     user = user) 
529         else:
530             # Start process in a "daemonized" way, using nohup and heavy
531             # stdin/out redirection to avoid connection issues
532             with self._lock:
533                 (out,err), proc = sshfuncs.rspawn(
534                     command,
535                     pidfile = pidfile,
536                     home = home,
537                     create_home = create_home,
538                     stdin = stdin if stdin is not None else '/dev/null',
539                     stdout = stdout if stdout else '/dev/null',
540                     stderr = stderr if stderr else '/dev/null',
541                     sudo = sudo,
542                     host = self.get("hostname"),
543                     user = self.get("username"),
544                     port = self.get("port"),
545                     agent = True,
546                     identity = self.get("identity"),
547                     server_key = self.get("serverKey")
548                     )
549
550         return (out, err), proc
551
552     def checkpid(self, home = ".", pidfile = "pid"):
553         if self.localhost:
554             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
555         else:
556             with self._lock:
557                 pidtuple = sshfuncs.rcheckpid(
558                     os.path.join(home, pidfile),
559                     host = self.get("hostname"),
560                     user = self.get("username"),
561                     port = self.get("port"),
562                     agent = True,
563                     identity = self.get("identity"),
564                     server_key = self.get("serverKey")
565                     )
566         
567         return pidtuple
568     
569     def status(self, pid, ppid):
570         if self.localhost:
571             status = execfuncs.lstatus(pid, ppid)
572         else:
573             with self._lock:
574                 status = sshfuncs.rstatus(
575                         pid, ppid,
576                         host = self.get("hostname"),
577                         user = self.get("username"),
578                         port = self.get("port"),
579                         agent = True,
580                         identity = self.get("identity"),
581                         server_key = self.get("serverKey")
582                         )
583            
584         return status
585     
586     def kill(self, pid, ppid, sudo = False):
587         out = err = ""
588         proc = None
589         status = self.status(pid, ppid)
590
591         if status == sshfuncs.RUNNING:
592             if self.localhost:
593                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
594             else:
595                 with self._lock:
596                     (out, err), proc = sshfuncs.rkill(
597                         pid, ppid,
598                         host = self.get("hostname"),
599                         user = self.get("username"),
600                         port = self.get("port"),
601                         agent = True,
602                         sudo = sudo,
603                         identity = self.get("identity"),
604                         server_key = self.get("serverKey")
605                         )
606         return (out, err), proc
607
608     def check_bad_host(self, out, err):
609         badre = re.compile(r'(?:'
610                            r'|Error: disk I/O error'
611                            r')', 
612                            re.I)
613         return badre.search(out) or badre.search(err)
614
615     def blacklist(self):
616         # TODO!!!!
617         self.warn(" Blacklisting malfunctioning node ")
618         #import util
619         #util.appendBlacklist(self.hostname)
620