Added Linux Application
[nepi.git] / src / neco / resources / linux / node.py
1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs 
4 from neco.util import sshfuncs, execfuncs 
5
6 import collections
7 import logging
8 import os
9 import random
10 import re
11 import tempfile
12 import time
13 import threading
14
15 # TODO: Verify files and dirs exists already
16 # TODO: Blacklist node!
17
18 DELAY ="1s"
19
20 @clsinit
21 class LinuxNode(ResourceManager):
22     _rtype = "LinuxNode"
23
24     @classmethod
25     def _register_attributes(cls):
26         hostname = Attribute("hostname", "Hostname of the machine",
27                 flags = Flags.ExecReadOnly)
28
29         username = Attribute("username", "Local account username", 
30                 flags = Flags.Credential)
31
32         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
33         
34         home = Attribute("home",
35                 "Experiment home directory to store all experiment related files",
36                 flags = Flags.ExecReadOnly)
37         
38         identity = Attribute("identity", "SSH identity file",
39                 flags = Flags.Credential)
40         
41         server_key = Attribute("serverKey", "Server public key", 
42                 flags = Flags.ExecReadOnly)
43         
44         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
45                 " from home folder before starting experiment", 
46                 flags = Flags.ExecReadOnly)
47         
48         clean_processes = Attribute("cleanProcesses", 
49                 "Kill all running processes before starting experiment",
50                 flags = Flags.ExecReadOnly)
51         
52         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
53                 "releasing the resource",
54                 flags = Flags.ExecReadOnly)
55
56         cls._register_attribute(hostname)
57         cls._register_attribute(username)
58         cls._register_attribute(port)
59         cls._register_attribute(home)
60         cls._register_attribute(identity)
61         cls._register_attribute(server_key)
62         cls._register_attribute(clean_home)
63         cls._register_attribute(clean_processes)
64         cls._register_attribute(tear_down)
65
66     def __init__(self, ec, guid):
67         super(LinuxNode, self).__init__(ec, guid)
68         self._os = None
69         
70         # lock to avoid concurrency issues on methods used by applications 
71         self._lock = threading.Lock()
72
73         self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid)
74
75     @property
76     def home(self):
77         return self.get("home") or "/tmp"
78
79     @property
80     def exp_dir(self):
81         exp_dir = os.path.join(self.home, self.ec.exp_id)
82         return exp_dir if exp_dir.startswith('/') else "${HOME}/"
83
84     @property
85     def node_dir(self):
86         node_dir = "node-%d" % self.guid
87         return os.path.join(self.exp_dir, node_dir)
88
89     @property
90     def os(self):
91         if self._os:
92             return self._os
93
94         if (not self.get("hostname") or not self.get("username")):
95             msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid
96             self.logger.error(msg)
97             raise RuntimeError, msg
98
99         (out, err), proc = self.execute("cat /etc/issue")
100
101         if err and proc.poll():
102             msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err)
103             self.logger.error(msg)
104             raise RuntimeError, msg
105
106         if out.find("Fedora release 12") == 0:
107             self._os = "f12"
108         elif out.find("Fedora release 14") == 0:
109             self._os = "f14"
110         elif out.find("Debian") == 0: 
111             self._os = "debian"
112         elif out.find("Ubuntu") ==0:
113             self._os = "ubuntu"
114         else:
115             msg = "Unsupported OS %s for host %s" % (out, self.get("hostname"))
116             self.logger.error(msg)
117             raise RuntimeError, msg
118
119         return self._os
120
121     @property
122     def localhost(self):
123         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
124
125     def provision(self, filters = None):
126         if not self.is_alive():
127             self._state = ResourceState.FAILED
128             self.logger.error("Deploy failed. Unresponsive node")
129             return
130
131         if self.get("cleanProcesses"):
132             self.clean_processes()
133
134         if self.get("cleanHome"):
135             self.clean_home()
136        
137         self.mkdir(self.node_dir)
138
139         super(LinuxNode, self).provision()
140
141     def deploy(self):
142         if self.state == ResourceState.NEW:
143             self.discover()
144             self.provision()
145
146         # Node needs to wait until all associated interfaces are 
147         # ready before it can finalize deployment
148         from neco.resources.linux.interface import LinuxInterface
149         ifaces = self.get_connected(LinuxInterface.rtype())
150         for iface in ifaces:
151             if iface.state < ResourceState.READY:
152                 self.ec.schedule(DELAY, self.deploy)
153                 return 
154
155         super(LinuxNode, self).deploy()
156
157     def release(self):
158         tear_down = self.get("tearDown")
159         if tear_down:
160             self.execute(tear_down)
161
162         super(LinuxNode, self).release()
163
164     def valid_connection(self, guid):
165         # TODO: Validate!
166         return True
167
168     def clean_processes(self):
169         self.logger.info("Cleaning up processes")
170         
171         cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
172             "sudo -S killall python tcpdump || /bin/true ; " +
173             "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
174             "sudo -S killall -u root || /bin/true ; " +
175             "sudo -S killall -u root || /bin/true ; ")
176
177         out = err = ""
178         with self._lock:
179            (out, err), proc = self.execute(cmd) 
180             
181     def clean_home(self):
182         self.logger.info("Cleaning up home")
183
184         cmd = ("cd %s ; " % self.home +
185             "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
186             " -execdir rm -rf {} + ")
187
188         out = err = ""
189         with self._lock:
190             (out, err), proc = self.execute(cmd)
191
192     def upload(self, src, dst, text = False):
193         """ Copy content to destination
194
195            src  content to copy. Can be a local file, directory or a list of files
196
197            dst  destination path on the remote host (remote is always self.host)
198
199            text src is text input, it must be stored into a temp file before uploading
200         """
201         # If source is a string input 
202         f = None
203         if text and not os.path.isfile(src):
204             # src is text input that should be uploaded as file
205             # create a temporal file with the content to upload
206             f = tempfile.NamedTemporaryFile(delete=False)
207             f.write(src)
208             f.close()
209             src = f.name
210
211         if not self.localhost:
212             # Build destination as <user>@<server>:<path>
213             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
214
215         result = self.copy(src, dst)
216
217         # clean up temp file
218         if f:
219             os.remove(f.name)
220
221         return result
222
223     def download(self, src, dst):
224         if not self.localhost:
225             # Build destination as <user>@<server>:<path>
226             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
227         return self.copy(src, dst)
228
229     def install_packages(self, packages, home = None):
230         home = home or self.node_dir
231
232         cmd = ""
233         if self.os in ["f12", "f14"]:
234             cmd = rpmfuncs.install_packages_command(self.os, packages)
235         elif self.os in ["debian", "ubuntu"]:
236             cmd = debfuncs.install_packages_command(self.os, packages)
237         else:
238             msg = "Error installing packages. OS not known for host %s " % (
239                     self.get("hostname"))
240             self.logger.error(msg)
241             raise RuntimeError, msg
242
243         out = err = ""
244         with self._lock:
245             (out, err), proc = self.run_and_wait(cmd, home, 
246                 pidfile = "instpkg_pid",
247                 stdout = "instpkg_log", 
248                 stderr = "instpkg_err", 
249                 raise_on_error = True)
250
251         return (out, err), proc 
252
253     def remove_packages(self, packages, home = None):
254         home = home or self.node_dir
255
256         cmd = ""
257         if self.os in ["f12", "f14"]:
258             cmd = rpmfuncs.remove_packages_command(self.os, packages)
259         elif self.os in ["debian", "ubuntu"]:
260             cmd = debfuncs.remove_packages_command(self.os, packages)
261         else:
262             msg = "Error removing packages. OS not known for host %s " % (
263                     self.get("hostname"))
264             self.logger.error(msg)
265             raise RuntimeError, msg
266
267         out = err = ""
268         with self._lock:
269             (out, err), proc = self.run_and_wait(cmd, home, 
270                 pidfile = "rmpkg_pid",
271                 stdout = "rmpkg_log", 
272                 stderr = "rmpkg_err", 
273                 raise_on_error = True)
274          
275         return (out, err), proc 
276
277     def mkdir(self, path, clean = False):
278         if clean:
279             self.rmdir(path)
280
281         return self.execute("mkdir -p %s" % path)
282
283     def rmdir(self, path):
284         return self.execute("rm -rf %s" % path)
285
286     def run_and_wait(self, command, 
287             home = ".", 
288             pidfile = "pid", 
289             stdin = None, 
290             stdout = 'stdout', 
291             stderr = 'stderr', 
292             sudo = False,
293             raise_on_error = False):
294         """ runs a command in background on the remote host, but waits
295             until the command finishes execution.
296             This is more robust than doing a simple synchronized 'execute',
297             since in the remote host the command can continue to run detached
298             even if network disconnections occur
299         """
300         # run command in background in remote host
301         (out, err), proc = self.run(command, home, 
302                 pidfile = pidfile,
303                 stdin = stdin, 
304                 stdout = stdout, 
305                 stderr = stderr, 
306                 sudo = sudo)
307
308         # check no errors occurred
309         if proc.poll() and err:
310             msg = " Failed to run command %s on host %s" % (
311                     command, self.get("hostname"))
312             self.logger.error(msg)
313             if raise_on_error:
314                 raise RuntimeError, msg
315
316         # Wait for pid file to be generated
317         pid, ppid = self.wait_pid(
318                 home = home, 
319                 pidfile = pidfile, 
320                 raise_on_error = raise_on_error)
321
322         # wait until command finishes to execute
323         self.wait_run(pid, ppid)
324        
325         # check if execution errors occurred
326         (out, err), proc = self.check_output(home, stderr)
327
328         if err or out:
329             msg = "Error while running command %s on host %s. error output: %s" % (
330                     command, self.get("hostname"), out)
331             if err:
332                 msg += " . err: %s" % err
333
334             self.logger.error(msg)
335             if raise_on_error:
336                 raise RuntimeError, msg
337         
338         return (out, err), proc
339  
340     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
341         """ Waits until the pid file for the command is generated, 
342             and returns the pid and ppid of the process """
343         pid = ppid = None
344         delay = 1.0
345         for i in xrange(5):
346             pidtuple = self.checkpid(home = home, pidfile = pidfile)
347             
348             if pidtuple:
349                 pid, ppid = pidtuple
350                 break
351             else:
352                 time.sleep(delay)
353                 delay = min(30,delay*1.2)
354         else:
355             msg = " Failed to get pid for pidfile %s/%s on host %s" % (
356                     home, pidfile, self.get("hostname"))
357             self.logger.error(msg)
358             if raise_on_error:
359                 raise RuntimeError, msg
360
361         return pid, ppid
362
363     def wait_run(self, pid, ppid, trial = 0):
364         """ wait for a remote process to finish execution """
365         delay = 1.0
366         first = True
367         bustspin = 0
368
369         while True:
370             status = self.status(pid, ppid)
371             
372             if status is sshfuncs.FINISHED:
373                 break
374             elif status is not sshfuncs.RUNNING:
375                 bustspin += 1
376                 time.sleep(delay*(5.5+random.random()))
377                 if bustspin > 12:
378                     break
379             else:
380                 if first:
381                     first = False
382
383                 time.sleep(delay*(0.5+random.random()))
384                 delay = min(30,delay*1.2)
385                 bustspin = 0
386
387     def check_output(self, home, filename):
388         """ checks file content """
389         (out, err), proc = self.execute("cat %s" % 
390                 os.path.join(home, filename))
391         return (out, err), proc
392
393     def is_alive(self):
394         if self.localhost:
395             return True
396
397         out = err = ""
398         try:
399             (out, err), proc = self.execute("echo 'ALIVE'")
400         except:
401             import traceback
402             trace = traceback.format_exc()
403             self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s", 
404                     self.get("hostname"), out, err, trace)
405             return False
406
407         if out.strip().startswith('ALIVE'):
408             return True
409         else:
410             self.logger.warn("Unresponsive host %s. got:\n%s%s", 
411                     self.get("hostname"), out, err)
412             return False
413
414             # TODO!
415             #if self.check_bad_host(out,err):
416             #    self.blacklist()
417
418     def copy(self, src, dst):
419         if self.localhost:
420             (out, err), proc =  execfuncs.lcopy(source, dest, 
421                     recursive = True)
422         else:
423             (out, err), proc = self.safe_retry(sshfuncs.rcopy)(
424                 src, dst, 
425                 port = self.get("port"),
426                 identity = self.get("identity"),
427                 server_key = self.get("serverKey"),
428                 recursive = True)
429
430         return (out, err), proc
431
432     def execute(self, command,
433             sudo = False,
434             stdin = None, 
435             env = None,
436             tty = False,
437             forward_x11 = False,
438             timeout = None,
439             retry = 0,
440             err_on_timeout = True,
441             connect_timeout = 30,
442             persistent = True
443             ):
444         """ Notice that this invocation will block until the
445         execution finishes. If this is not the desired behavior,
446         use 'run' instead."""
447
448         if self.localhost:
449             (out, err), proc = execfuncs.lexec(command, 
450                     user = user,
451                     sudo = sudo,
452                     stdin = stdin,
453                     env = env)
454         else:
455             (out, err), proc = self.safe_retry(sshfuncs.rexec)(
456                     command, 
457                     host = self.get("hostname"),
458                     user = self.get("username"),
459                     port = self.get("port"),
460                     agent = True,
461                     sudo = sudo,
462                     stdin = stdin,
463                     identity = self.get("identity"),
464                     server_key = self.get("serverKey"),
465                     env = env,
466                     tty = tty,
467                     forward_x11 = forward_x11,
468                     timeout = timeout,
469                     retry = retry,
470                     err_on_timeout = err_on_timeout,
471                     connect_timeout = connect_timeout,
472                     persistent = persistent
473                     )
474
475         return (out, err), proc
476
477     def run(self, command, 
478             home = None,
479             create_home = True,
480             pidfile = "pid",
481             stdin = None, 
482             stdout = 'stdout', 
483             stderr = 'stderr', 
484             sudo = False):
485
486         self.logger.info("Running %s", command)
487         
488         if self.localhost:
489             (out, err), proc = execfuncs.lspawn(command, pidfile, 
490                     stdout = stdout, 
491                     stderr = stderr, 
492                     stdin = stdin, 
493                     home = home, 
494                     create_home = create_home, 
495                     sudo = sudo,
496                     user = user) 
497         else:
498             # Start process in a "daemonized" way, using nohup and heavy
499             # stdin/out redirection to avoid connection issues
500             (out,err), proc = self.safe_retry(sshfuncs.rspawn)(
501                 command,
502                 pidfile = pidfile,
503                 home = home,
504                 create_home = create_home,
505                 stdin = stdin if stdin is not None else '/dev/null',
506                 stdout = stdout if stdout else '/dev/null',
507                 stderr = stderr if stderr else '/dev/null',
508                 sudo = sudo,
509                 host = self.get("hostname"),
510                 user = self.get("username"),
511                 port = self.get("port"),
512                 agent = True,
513                 identity = self.get("identity"),
514                 server_key = self.get("serverKey")
515                 )
516
517         return (out, err), proc
518
519     def checkpid(self, home = ".", pidfile = "pid"):
520         if self.localhost:
521             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
522         else:
523             pidtuple = sshfuncs.rcheckpid(
524                 os.path.join(home, pidfile),
525                 host = self.get("hostname"),
526                 user = self.get("username"),
527                 port = self.get("port"),
528                 agent = True,
529                 identity = self.get("identity"),
530                 server_key = self.get("serverKey")
531                 )
532         
533         return pidtuple
534     
535     def status(self, pid, ppid):
536         if self.localhost:
537             status = execfuncs.lstatus(pid, ppid)
538         else:
539             status = sshfuncs.rstatus(
540                     pid, ppid,
541                     host = self.get("hostname"),
542                     user = self.get("username"),
543                     port = self.get("port"),
544                     agent = True,
545                     identity = self.get("identity"),
546                     server_key = self.get("serverKey")
547                     )
548            
549         return status
550     
551     def kill(self, pid, ppid, sudo = False):
552         out = err = ""
553         proc = None
554         status = self.status(pid, ppid)
555
556         if status == sshfuncs.RUNNING:
557             if self.localhost:
558                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
559             else:
560                 (out, err), proc = self.safe_retry(sshfuncs.rkill)(
561                     pid, ppid,
562                     host = self.get("hostname"),
563                     user = self.get("username"),
564                     port = self.get("port"),
565                     agent = True,
566                     sudo = sudo,
567                     identity = self.get("identity"),
568                     server_key = self.get("serverKey")
569                     )
570         return (out, err), proc
571
572     def check_bad_host(self, out, err):
573         badre = re.compile(r'(?:'
574                            r'|Error: disk I/O error'
575                            r')', 
576                            re.I)
577         return badre.search(out) or badre.search(err)
578
579     def blacklist(self):
580         # TODO!!!!
581         self.logger.warn("Blacklisting malfunctioning node %s", self.hostname)
582         #import util
583         #util.appendBlacklist(self.hostname)
584
585     def safe_retry(self, func):
586         """Retries a function invocation using a lock"""
587         import functools
588         @functools.wraps(func)
589         def rv(*p, **kw):
590             fail_msg = " Failed to execute function %s(%s, %s) at host %s" % (
591                 func.__name__, p, kw, self.get("hostname"))
592             retry = kw.pop("_retry", False)
593             wlock = kw.pop("_with_lock", False)
594
595             out = err = ""
596             proc = None
597             for i in xrange(0 if retry else 4):
598                 try:
599                     if wlock:
600                         with self._lock:
601                             (out, err), proc = func(*p, **kw)
602                     else:
603                         (out, err), proc = func(*p, **kw)
604                         
605                     if proc.poll():
606                         if retry:
607                             time.sleep(i*15)
608                             continue
609                         else:
610                             self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
611                     break
612                 except RuntimeError, e:
613                     if i >= 3:
614                         self.logger.error("%s. error: %s", fail_msg, e.args)
615             return (out, err), proc
616
617         return rv
618