b0d63c93f5a3e6ab23cdece8f0dc3d2dafa13435
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import ResourceManager, clsinit_copy, \
21         ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!! 
37
38 class ExitCode:
39     """
40     Error codes that the rexitcode function can return if unable to
41     check the exit code of a spawned process
42     """
43     FILENOTFOUND = -1
44     CORRUPTFILE = -2
45     ERROR = -3
46     OK = 0
47
48 class OSType:
49     """
50     Supported flavors of Linux OS
51     """
52     DEBIAN = 1 
53     UBUNTU = 1 << 1 
54     FEDORA = 1 << 2
55     FEDORA_8 = 1 << 3 | FEDORA 
56     FEDORA_12 = 1 << 4 | FEDORA 
57     FEDORA_14 = 1 << 5 | FEDORA 
58
59 @clsinit_copy
60 class LinuxNode(ResourceManager):
61     """
62     .. class:: Class Args :
63       
64         :param ec: The Experiment controller
65         :type ec: ExperimentController
66         :param guid: guid of the RM
67         :type guid: int
68
69     .. note::
70
71         There are different ways in which commands can be executed using the
72         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
73         'run_and_wait'). 
74         
75         Brief explanation:
76
77             * 'execute' (blocking mode) :  
78
79                      HOW IT WORKS: 'execute', forks a process and run the
80                      command, synchronously, attached to the terminal, in
81                      foreground.
82                      The execute method will block until the command returns
83                      the result on 'out', 'err' (so until it finishes executing).
84   
85                      USAGE: short-lived commands that must be executed attached
86                      to a terminal and in foreground, for which it IS necessary
87                      to block until the command has finished (e.g. if you want
88                      to run 'ls' or 'cat').
89
90             * 'execute' (NON blocking mode - blocking = False) :
91
92                     HOW IT WORKS: Same as before, except that execute method
93                     will return immediately (even if command still running).
94
95                     USAGE: long-lived commands that must be executed attached
96                     to a terminal and in foreground, but for which it is not
97                     necessary to block until the command has finished. (e.g.
98                     start an application using X11 forwarding)
99
100              * 'run' :
101
102                    HOW IT WORKS: Connects to the host ( using SSH if remote)
103                    and launches the command in background, detached from any
104                    terminal (daemonized), and returns. The command continues to
105                    run remotely, but since it is detached from the terminal,
106                    its pipes (stdin, stdout, stderr) can't be redirected to the
107                    console (as normal non detached processes would), and so they
108                    are explicitly redirected to files. The pidfile is created as
109                    part of the process of launching the command. The pidfile
110                    holds the pid and ppid of the process forked in background,
111                    so later on it is possible to check whether the command is still
112                    running.
113
114                     USAGE: long-lived commands that can run detached in background,
115                     for which it is NOT necessary to block (wait) until the command
116                     has finished. (e.g. start an application that is not using X11
117                     forwarding. It can run detached and remotely in background)
118
119              * 'run_and_wait' :
120
121                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122                     the command has finished execution. It also checks whether
123                     errors occurred during runtime by reading the exitcode file,
124                     which contains the exit code of the command that was run
125                     (checking stderr only is not always reliable since many
126                     commands throw debugging info to stderr and the only way to
127                     automatically know whether an error really happened is to
128                     check the process exit code).
129
130                     Another difference with respect to 'run', is that instead
131                     of directly executing the command as a bash command line,
132                     it uploads the command to a bash script and runs the script.
133                     This allows to use the bash script to debug errors, since
134                     it remains at the remote host and can be run manually to
135                     reproduce the error.
136                   
137                     USAGE: medium-lived commands that can run detached in
138                     background, for which it IS necessary to block (wait) until
139                     the command has finished. (e.g. Package installation,
140                     source compilation, file download, etc)
141
142     """
143     _rtype = "linux::Node"
144     _help = "Controls Linux host machines ( either localhost or a host " \
145             "that can be accessed using a SSH key)"
146     _platform = "linux"
147
148     @classmethod
149     def _register_attributes(cls):
150         hostname = Attribute("hostname", "Hostname of the machine",
151                 flags = Flags.Design)
152
153         username = Attribute("username", "Local account username", 
154                 flags = Flags.Credential)
155
156         port = Attribute("port", "SSH port", flags = Flags.Design)
157         
158         home = Attribute("home",
159                 "Experiment home directory to store all experiment related files",
160                 flags = Flags.Design)
161         
162         identity = Attribute("identity", "SSH identity file",
163                 flags = Flags.Credential)
164         
165         server_key = Attribute("serverKey", "Server public key", 
166                 flags = Flags.Design)
167         
168         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
169                 " from node home folder before starting experiment", 
170                 type = Types.Bool,
171                 default = False,
172                 flags = Flags.Design)
173
174         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
175                 " from a previous same experiment, before the new experiment starts", 
176                 type = Types.Bool,
177                 default = False,
178                 flags = Flags.Design)
179         
180         clean_processes = Attribute("cleanProcesses", 
181                 "Kill all running processes before starting experiment",
182                 type = Types.Bool,
183                 default = False,
184                 flags = Flags.Design)
185         
186         clean_processes_after = Attribute("cleanProcessesAfter", 
187                 """Kill all running processes after starting experiment
188 This might be dangerous when using user root""",
189                 type = Types.Bool,
190                 default = True,
191                 flags = Flags.Design)
192         
193         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
194                 "releasing the resource",
195                 flags = Flags.Design)
196
197         gateway_user = Attribute("gatewayUser", "Gateway account username",
198                 flags = Flags.Design)
199
200         gateway = Attribute("gateway", "Hostname of the gateway machine",
201                 flags = Flags.Design)
202
203         ip = Attribute("ip", "Linux host public IP address. "
204                    "Must not be modified by the user unless hostname is 'localhost'",
205                     flags = Flags.Design)
206
207         cls._register_attribute(hostname)
208         cls._register_attribute(username)
209         cls._register_attribute(port)
210         cls._register_attribute(home)
211         cls._register_attribute(identity)
212         cls._register_attribute(server_key)
213         cls._register_attribute(clean_home)
214         cls._register_attribute(clean_experiment)
215         cls._register_attribute(clean_processes)
216         cls._register_attribute(clean_processes_after)
217         cls._register_attribute(tear_down)
218         cls._register_attribute(gateway_user)
219         cls._register_attribute(gateway)
220         cls._register_attribute(ip)
221
222     def __init__(self, ec, guid):
223         super(LinuxNode, self).__init__(ec, guid)
224         self._os = None
225         # home directory at Linux host
226         self._home_dir = ""
227
228         # lock to prevent concurrent applications on the same node,
229         # to execute commands at the same time. There are potential
230         # concurrency issues when using SSH to a same host from 
231         # multiple threads. There are also possible operational 
232         # issues, e.g. an application querying the existence 
233         # of a file or folder prior to its creation, and another 
234         # application creating the same file or folder in between.
235         self._node_lock = threading.Lock()
236     
237     def log_message(self, msg):
238         return " guid %d - host %s - %s " % (self.guid, 
239                 self.get("hostname"), msg)
240
241     @property
242     def home_dir(self):
243         home = self.get("home") or ""
244         if not home.startswith("/"):
245            home = os.path.join(self._home_dir, home) 
246         return home
247
248     @property
249     def nepi_home(self):
250         return os.path.join(self.home_dir, ".nepi")
251
252     @property
253     def usr_dir(self):
254         return os.path.join(self.nepi_home, "nepi-usr")
255
256     @property
257     def lib_dir(self):
258         return os.path.join(self.usr_dir, "lib")
259
260     @property
261     def bin_dir(self):
262         return os.path.join(self.usr_dir, "bin")
263
264     @property
265     def src_dir(self):
266         return os.path.join(self.usr_dir, "src")
267
268     @property
269     def share_dir(self):
270         return os.path.join(self.usr_dir, "share")
271
272     @property
273     def exp_dir(self):
274         return os.path.join(self.nepi_home, "nepi-exp")
275
276     @property
277     def exp_home(self):
278         return os.path.join(self.exp_dir, self.ec.exp_id)
279
280     @property
281     def node_home(self):
282         return os.path.join(self.exp_home, "node-%d" % self.guid)
283
284     @property
285     def run_home(self):
286         return os.path.join(self.node_home, self.ec.run_id)
287
288     @property
289     def os(self):
290         if self._os:
291             return self._os
292
293         if not self.localhost and not self.get("username"):
294             msg = "Can't resolve OS, insufficient data "
295             self.error(msg)
296             raise RuntimeError, msg
297
298         out = self.get_os()
299
300         if out.find("Debian") == 0: 
301             self._os = OSType.DEBIAN
302         elif out.find("Ubuntu") ==0:
303             self._os = OSType.UBUNTU
304         elif out.find("Fedora release") == 0:
305             self._os = OSType.FEDORA
306             if out.find("Fedora release 8") == 0:
307                 self._os = OSType.FEDORA_8
308             elif out.find("Fedora release 12") == 0:
309                 self._os = OSType.FEDORA_12
310             elif out.find("Fedora release 14") == 0:
311                 self._os = OSType.FEDORA_14
312         else:
313             msg = "Unsupported OS"
314             self.error(msg, out)
315             raise RuntimeError, "%s - %s " %( msg, out )
316
317         return self._os
318
319     def get_os(self):
320         # The underlying SSH layer will sometimes return an empty
321         # output (even if the command was executed without errors).
322         # To work arround this, repeat the operation N times or
323         # until the result is not empty string
324         out = ""
325         try:
326             (out, err), proc = self.execute("cat /etc/issue", 
327                     with_lock = True,
328                     blocking = True)
329         except:
330             trace = traceback.format_exc()
331             msg = "Error detecting OS: %s " % trace
332             self.error(msg, out, err)
333         
334         return out
335
336     @property
337     def use_deb(self):
338         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
339
340     @property
341     def use_rpm(self):
342         return (self.os & OSType.FEDORA)
343
344     @property
345     def localhost(self):
346         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
347
348     def do_provision(self):
349         # check if host is alive
350         if not self.is_alive():
351             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
352             self.error(msg)
353             raise RuntimeError, msg
354
355         self.find_home()
356
357         if self.get("cleanProcesses"):
358             self.clean_processes()
359
360         if self.get("cleanHome"):
361             self.clean_home()
362  
363         if self.get("cleanExperiment"):
364             self.clean_experiment()
365     
366         # Create shared directory structure and node home directory
367         paths = [self.lib_dir, 
368             self.bin_dir, 
369             self.src_dir, 
370             self.share_dir, 
371             self.node_home]
372
373         self.mkdir(paths)
374
375         # Get Public IP address if possible
376         if not self.get("ip"):
377             try:
378                 ip = sshfuncs.gethostbyname(self.get("hostname"))
379                 self.set("ip", ip)
380             except:
381                 if self.get("gateway") is None:
382                     msg = "Local DNS can not resolve hostname %s" % self.get("hostname") 
383                     self.error(msg)
384
385         super(LinuxNode, self).do_provision()
386
387     def do_deploy(self):
388         if self.state == ResourceState.NEW:
389             self.info("Deploying node")
390             self.do_discover()
391             self.do_provision()
392
393         # Node needs to wait until all associated interfaces are 
394         # ready before it can finalize deployment
395         from nepi.resources.linux.interface import LinuxInterface
396         ifaces = self.get_connected(LinuxInterface.get_rtype())
397         for iface in ifaces:
398             if iface.state < ResourceState.READY:
399                 self.ec.schedule(self.reschedule_delay, self.deploy)
400                 return 
401
402         super(LinuxNode, self).do_deploy()
403
404     def do_release(self):
405         rms = self.get_connected()
406         for rm in rms:
407             # Node needs to wait until all associated RMs are released
408             # before it can be released
409             if rm.state != ResourceState.RELEASED:
410                 self.ec.schedule(self.reschedule_delay, self.release)
411                 return 
412
413         tear_down = self.get("tearDown")
414         if tear_down:
415             self.execute(tear_down)
416
417         if self.get("cleanProcessesAfter"):
418             self.clean_processes()
419
420         super(LinuxNode, self).do_release()
421
422     def valid_connection(self, guid):
423         # TODO: Validate!
424         return True
425
426     def clean_processes(self):
427         self.info("Cleaning up processes")
428
429         if self.localhost:
430             return 
431         
432         if self.get("username") != 'root':
433             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
434                 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
435                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
436         else:
437             if self.state >= ResourceState.READY:
438                 import pickle
439                 pids = pickle.load(open("/tmp/save.proc", "rb"))
440                 pids_temp = dict()
441                 ps_aux = "ps aux |awk '{print $2,$11}'"
442                 (out, err), proc = self.execute(ps_aux)
443                 if len(out) != 0:
444                     for line in out.strip().split("\n"):
445                         parts = line.strip().split(" ")
446                         pids_temp[parts[0]] = parts[1]
447                     kill_pids = set(pids_temp.items()) - set(pids.items())
448                     kill_pids = ' '.join(dict(kill_pids).keys())
449
450                     cmd = ("killall tcpdump || /bin/true ; " +
451                         "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
452                         "kill %s || /bin/true ; " % kill_pids)
453                 else:
454                     cmd = ("killall tcpdump || /bin/true ; " +
455                         "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
456             else:
457                 cmd = ("killall tcpdump || /bin/true ; " +
458                     "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
459
460         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
461
462     def clean_home(self):
463         """ Cleans all NEPI related folders in the Linux host
464         """
465         self.info("Cleaning up home")
466         
467         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
468                 self.home_dir )
469
470         return self.execute(cmd, with_lock = True)
471
472     def clean_experiment(self):
473         """ Cleans all experiment related files in the Linux host.
474         It preserves NEPI files and folders that have a multi experiment
475         scope.
476         """
477         self.info("Cleaning up experiment files")
478         
479         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
480                 self.exp_dir,
481                 self.ec.exp_id )
482             
483         return self.execute(cmd, with_lock = True)
484
485     def execute(self, command,
486             sudo = False,
487             env = None,
488             tty = False,
489             forward_x11 = False,
490             retry = 3,
491             connect_timeout = 30,
492             strict_host_checking = False,
493             persistent = True,
494             blocking = True,
495             with_lock = False
496             ):
497         """ Notice that this invocation will block until the
498         execution finishes. If this is not the desired behavior,
499         use 'run' instead."""
500
501         if self.localhost:
502             (out, err), proc = execfuncs.lexec(command, 
503                     user = self.get("username"), # still problem with localhost
504                     sudo = sudo,
505                     env = env)
506         else:
507             if with_lock:
508                 # If the execute command is blocking, we don't want to keep
509                 # the node lock. This lock is used to avoid race conditions
510                 # when creating the ControlMaster sockets. A more elegant
511                 # solution is needed.
512                 with self._node_lock:
513                     (out, err), proc = sshfuncs.rexec(
514                         command, 
515                         host = self.get("hostname"),
516                         user = self.get("username"),
517                         port = self.get("port"),
518                         gwuser = self.get("gatewayUser"),
519                         gw = self.get("gateway"),
520                         agent = True,
521                         sudo = sudo,
522                         identity = self.get("identity"),
523                         server_key = self.get("serverKey"),
524                         env = env,
525                         tty = tty,
526                         forward_x11 = forward_x11,
527                         retry = retry,
528                         connect_timeout = connect_timeout,
529                         persistent = persistent,
530                         blocking = blocking, 
531                         strict_host_checking = strict_host_checking
532                         )
533             else:
534                 (out, err), proc = sshfuncs.rexec(
535                     command, 
536                     host = self.get("hostname"),
537                     user = self.get("username"),
538                     port = self.get("port"),
539                     gwuser = self.get("gatewayUser"),
540                     gw = self.get("gateway"),
541                     agent = True,
542                     sudo = sudo,
543                     identity = self.get("identity"),
544                     server_key = self.get("serverKey"),
545                     env = env,
546                     tty = tty,
547                     forward_x11 = forward_x11,
548                     retry = retry,
549                     connect_timeout = connect_timeout,
550                     persistent = persistent,
551                     blocking = blocking, 
552                     strict_host_checking = strict_host_checking
553                     )
554
555         return (out, err), proc
556
557     def run(self, command, home,
558             create_home = False,
559             pidfile = 'pidfile',
560             stdin = None, 
561             stdout = 'stdout', 
562             stderr = 'stderr', 
563             sudo = False,
564             tty = False,
565             strict_host_checking = False):
566         
567         self.debug("Running command '%s'" % command)
568         
569         if self.localhost:
570             (out, err), proc = execfuncs.lspawn(command, pidfile,
571                     home = home, 
572                     create_home = create_home, 
573                     stdin = stdin or '/dev/null',
574                     stdout = stdout or '/dev/null',
575                     stderr = stderr or '/dev/null',
576                     sudo = sudo) 
577         else:
578             with self._node_lock:
579                 (out, err), proc = sshfuncs.rspawn(
580                     command,
581                     pidfile = pidfile,
582                     home = home,
583                     create_home = create_home,
584                     stdin = stdin or '/dev/null',
585                     stdout = stdout or '/dev/null',
586                     stderr = stderr or '/dev/null',
587                     sudo = sudo,
588                     host = self.get("hostname"),
589                     user = self.get("username"),
590                     port = self.get("port"),
591                     gwuser = self.get("gatewayUser"),
592                     gw = self.get("gateway"),
593                     agent = True,
594                     identity = self.get("identity"),
595                     server_key = self.get("serverKey"),
596                     tty = tty,
597                     strict_host_checking = strict_host_checking
598                     )
599
600         return (out, err), proc
601
602     def getpid(self, home, pidfile = "pidfile"):
603         if self.localhost:
604             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
605         else:
606             with self._node_lock:
607                 pidtuple = sshfuncs.rgetpid(
608                     os.path.join(home, pidfile),
609                     host = self.get("hostname"),
610                     user = self.get("username"),
611                     port = self.get("port"),
612                     gwuser = self.get("gatewayUser"),
613                     gw = self.get("gateway"),
614                     agent = True,
615                     identity = self.get("identity"),
616                     server_key = self.get("serverKey"),
617                     strict_host_checking = False
618                     )
619         
620         return pidtuple
621
622     def status(self, pid, ppid):
623         if self.localhost:
624             status = execfuncs.lstatus(pid, ppid)
625         else:
626             with self._node_lock:
627                 status = sshfuncs.rstatus(
628                         pid, ppid,
629                         host = self.get("hostname"),
630                         user = self.get("username"),
631                         port = self.get("port"),
632                         gwuser = self.get("gatewayUser"),
633                         gw = self.get("gateway"),
634                         agent = True,
635                         identity = self.get("identity"),
636                         server_key = self.get("serverKey"),
637                         strict_host_checking = False
638                         )
639            
640         return status
641     
642     def kill(self, pid, ppid, sudo = False):
643         out = err = ""
644         proc = None
645         status = self.status(pid, ppid)
646
647         if status == sshfuncs.ProcStatus.RUNNING:
648             if self.localhost:
649                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
650             else:
651                 with self._node_lock:
652                     (out, err), proc = sshfuncs.rkill(
653                         pid, ppid,
654                         host = self.get("hostname"),
655                         user = self.get("username"),
656                         port = self.get("port"),
657                         gwuser = self.get("gatewayUser"),
658                         gw = self.get("gateway"),
659                         agent = True,
660                         sudo = sudo,
661                         identity = self.get("identity"),
662                         server_key = self.get("serverKey"),
663                         strict_host_checking = False
664                         )
665
666         return (out, err), proc
667
668     def copy(self, src, dst):
669         if self.localhost:
670             (out, err), proc = execfuncs.lcopy(src, dst, 
671                     recursive = True)
672         else:
673             with self._node_lock:
674                 (out, err), proc = sshfuncs.rcopy(
675                     src, dst, 
676                     port = self.get("port"),
677                     gwuser = self.get("gatewayUser"),
678                     gw = self.get("gateway"),
679                     identity = self.get("identity"),
680                     server_key = self.get("serverKey"),
681                     recursive = True,
682                     strict_host_checking = False)
683
684         return (out, err), proc
685
686     def upload(self, src, dst, text = False, overwrite = True,
687             raise_on_error = True):
688         """ Copy content to destination
689
690         src  string with the content to copy. Can be:
691             - plain text
692             - a string with the path to a local file
693             - a string with a semi-colon separeted list of local files
694             - a string with a local directory
695
696         dst  string with destination path on the remote host (remote is 
697             always self.host)
698
699         text src is text input, it must be stored into a temp file before 
700         uploading
701         """
702         # If source is a string input 
703         f = None
704         if text and not os.path.isfile(src):
705             # src is text input that should be uploaded as file
706             # create a temporal file with the content to upload
707             f = tempfile.NamedTemporaryFile(delete=False)
708             f.write(src)
709             f.close()
710             src = f.name
711
712         # If dst files should not be overwritten, check that the files do not
713         # exits already
714         if isinstance(src, str):
715             src = map(str.strip, src.split(";"))
716     
717         if overwrite == False:
718             src = self.filter_existing_files(src, dst)
719             if not src:
720                 return ("", ""), None
721
722         if not self.localhost:
723             # Build destination as <user>@<server>:<path>
724             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
725
726         ((out, err), proc) = self.copy(src, dst)
727
728         # clean up temp file
729         if f:
730             os.remove(f.name)
731
732         if err:
733             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
734             self.error(msg, out, err)
735             
736             msg = "%s out: %s err: %s" % (msg, out, err)
737             if raise_on_error:
738                 raise RuntimeError, msg
739
740         return ((out, err), proc)
741
742     def download(self, src, dst, raise_on_error = True):
743         if not self.localhost:
744             # Build destination as <user>@<server>:<path>
745             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
746
747         ((out, err), proc) = self.copy(src, dst)
748
749         if err:
750             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
751             self.error(msg, out, err)
752
753             if raise_on_error:
754                 raise RuntimeError, msg
755
756         return ((out, err), proc)
757
758     def install_packages_command(self, packages):
759         command = ""
760         if self.use_rpm:
761             command = rpmfuncs.install_packages_command(self.os, packages)
762         elif self.use_deb:
763             command = debfuncs.install_packages_command(self.os, packages)
764         else:
765             msg = "Error installing packages ( OS not known ) "
766             self.error(msg, self.os)
767             raise RuntimeError, msg
768
769         return command
770
771     def install_packages(self, packages, home, run_home = None,
772             raise_on_error = True):
773         """ Install packages in the Linux host.
774
775         'home' is the directory to upload the package installation script.
776         'run_home' is the directory from where to execute the script.
777         """
778         command = self.install_packages_command(packages)
779
780         run_home = run_home or home
781
782         (out, err), proc = self.run_and_wait(command, run_home, 
783             shfile = os.path.join(home, "instpkg.sh"),
784             pidfile = "instpkg_pidfile",
785             ecodefile = "instpkg_exitcode",
786             stdout = "instpkg_stdout", 
787             stderr = "instpkg_stderr",
788             overwrite = False,
789             raise_on_error = raise_on_error)
790
791         return (out, err), proc 
792
793     def remove_packages(self, packages, home, run_home = None,
794             raise_on_error = True):
795         """ Uninstall packages from the Linux host.
796
797         'home' is the directory to upload the package un-installation script.
798         'run_home' is the directory from where to execute the script.
799         """
800         if self.use_rpm:
801             command = rpmfuncs.remove_packages_command(self.os, packages)
802         elif self.use_deb:
803             command = debfuncs.remove_packages_command(self.os, packages)
804         else:
805             msg = "Error removing packages ( OS not known ) "
806             self.error(msg)
807             raise RuntimeError, msg
808
809         run_home = run_home or home
810
811         (out, err), proc = self.run_and_wait(command, run_home, 
812             shfile = os.path.join(home, "rmpkg.sh"),
813             pidfile = "rmpkg_pidfile",
814             ecodefile = "rmpkg_exitcode",
815             stdout = "rmpkg_stdout", 
816             stderr = "rmpkg_stderr",
817             overwrite = False,
818             raise_on_error = raise_on_error)
819          
820         return (out, err), proc 
821
822     def mkdir(self, paths, clean = False):
823         """ Paths is either a single remote directory path to create,
824         or a list of directories to create.
825         """
826         if clean:
827             self.rmdir(paths)
828
829         if isinstance(paths, str):
830             paths = [paths]
831
832         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
833
834         return self.execute(cmd, with_lock = True)
835
836     def rmdir(self, paths):
837         """ Paths is either a single remote directory path to delete,
838         or a list of directories to delete.
839         """
840
841         if isinstance(paths, str):
842             paths = [paths]
843
844         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
845
846         return self.execute(cmd, with_lock = True)
847         
848     def run_and_wait(self, command, home, 
849             shfile="cmd.sh",
850             env=None,
851             overwrite=True,
852             wait_run=True,
853             pidfile="pidfile", 
854             ecodefile="exitcode", 
855             stdin=None, 
856             stdout="stdout", 
857             stderr="stderr", 
858             sudo=False,
859             tty=False,
860             raise_on_error=True):
861         """
862         Uploads the 'command' to a bash script in the host.
863         Then runs the script detached in background in the host, and
864         busy-waites until the script finishes executing.
865         """
866
867         if not shfile.startswith("/"):
868             shfile = os.path.join(home, shfile)
869
870         self.upload_command(command, 
871             shfile = shfile, 
872             ecodefile = ecodefile, 
873             env = env,
874             overwrite = overwrite)
875
876         command = "bash %s" % shfile
877         # run command in background in remote host
878         (out, err), proc = self.run(command, home, 
879                 pidfile = pidfile,
880                 stdin = stdin, 
881                 stdout = stdout, 
882                 stderr = stderr, 
883                 sudo = sudo,
884                 tty = tty)
885
886         # check no errors occurred
887         if proc.poll():
888             msg = " Failed to run command '%s' " % command
889             self.error(msg, out, err)
890             if raise_on_error:
891                 raise RuntimeError, msg
892
893         # Wait for pid file to be generated
894         pid, ppid = self.wait_pid(
895                 home = home, 
896                 pidfile = pidfile, 
897                 raise_on_error = raise_on_error)
898
899         if wait_run:
900             # wait until command finishes to execute
901             self.wait_run(pid, ppid)
902           
903             (eout, err), proc = self.check_errors(home,
904                 ecodefile = ecodefile,
905                 stderr = stderr)
906
907             # Out is what was written in the stderr file
908             if err:
909                 msg = " Failed to run command '%s' " % command
910                 self.error(msg, eout, err)
911
912                 if raise_on_error:
913                     raise RuntimeError, msg
914
915         (out, oerr), proc = self.check_output(home, stdout)
916         
917         return (out, err), proc
918         
919     def exitcode(self, home, ecodefile = "exitcode"):
920         """
921         Get the exit code of an application.
922         Returns an integer value with the exit code 
923         """
924         (out, err), proc = self.check_output(home, ecodefile)
925
926         # Succeeded to open file, return exit code in the file
927         if proc.wait() == 0:
928             try:
929                 return int(out.strip())
930             except:
931                 # Error in the content of the file!
932                 return ExitCode.CORRUPTFILE
933
934         # No such file or directory
935         if proc.returncode == 1:
936             return ExitCode.FILENOTFOUND
937         
938         # Other error from 'cat'
939         return ExitCode.ERROR
940
941     def upload_command(self, command, 
942             shfile="cmd.sh",
943             ecodefile="exitcode",
944             overwrite=True,
945             env=None):
946         """ Saves the command as a bash script file in the remote host, and
947         forces to save the exit code of the command execution to the ecodefile
948         """
949
950         if not (command.strip().endswith(";") or command.strip().endswith("&")):
951             command += ";"
952       
953         # The exit code of the command will be stored in ecodefile
954         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
955                 'command': command,
956                 'ecodefile': ecodefile,
957                 } 
958
959         # Export environment
960         environ = self.format_environment(env)
961
962         # Add environ to command
963         command = environ + command
964
965         return self.upload(command, shfile, text=True, overwrite=overwrite)
966
967     def format_environment(self, env, inline=False):
968         """ Formats the environment variables for a command to be executed
969         either as an inline command
970         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
971         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
972         """
973         if not env: return ""
974
975         # Remove extra white spaces
976         env = re.sub(r'\s+', ' ', env.strip())
977
978         sep = ";" if inline else "\n"
979         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
980
981     def check_errors(self, home, 
982             ecodefile = "exitcode", 
983             stderr = "stderr"):
984         """ Checks whether errors occurred while running a command.
985         It first checks the exit code for the command, and only if the
986         exit code is an error one it returns the error output.
987
988         """
989         proc = None
990         err = ""
991
992         # get exit code saved in the 'exitcode' file
993         ecode = self.exitcode(home, ecodefile)
994
995         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
996             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
997         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
998             # The process returned an error code or didn't exist. 
999             # Check standard error.
1000             (err, eerr), proc = self.check_output(home, stderr)
1001
1002             # If the stderr file was not found, assume nothing bad happened,
1003             # and just ignore the error.
1004             # (cat returns 1 for error "No such file or directory")
1005             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1006                 err = "" 
1007             
1008         return ("", err), proc
1009  
1010     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1011         """ Waits until the pid file for the command is generated, 
1012             and returns the pid and ppid of the process """
1013         pid = ppid = None
1014         delay = 1.0
1015
1016         for i in xrange(2):
1017             pidtuple = self.getpid(home = home, pidfile = pidfile)
1018             
1019             if pidtuple:
1020                 pid, ppid = pidtuple
1021                 break
1022             else:
1023                 time.sleep(delay)
1024                 delay = delay * 1.5
1025         else:
1026             msg = " Failed to get pid for pidfile %s/%s " % (
1027                     home, pidfile )
1028             self.error(msg)
1029             
1030             if raise_on_error:
1031                 raise RuntimeError, msg
1032
1033         return pid, ppid
1034
1035     def wait_run(self, pid, ppid, trial = 0):
1036         """ wait for a remote process to finish execution """
1037         delay = 1.0
1038
1039         while True:
1040             status = self.status(pid, ppid)
1041             
1042             if status is ProcStatus.FINISHED:
1043                 break
1044             elif status is not ProcStatus.RUNNING:
1045                 delay = delay * 1.5
1046                 time.sleep(delay)
1047                 # If it takes more than 20 seconds to start, then
1048                 # asume something went wrong
1049                 if delay > 20:
1050                     break
1051             else:
1052                 # The app is running, just wait...
1053                 time.sleep(0.5)
1054
1055     def check_output(self, home, filename):
1056         """ Retrives content of file """
1057         (out, err), proc = self.execute("cat %s" % 
1058             os.path.join(home, filename), retry = 1, with_lock = True)
1059         return (out, err), proc
1060
1061     def is_alive(self):
1062         """ Checks if host is responsive
1063         """
1064         if self.localhost:
1065             return True
1066
1067         out = err = ""
1068         msg = "Unresponsive host. Wrong answer. "
1069
1070         # The underlying SSH layer will sometimes return an empty
1071         # output (even if the command was executed without errors).
1072         # To work arround this, repeat the operation N times or
1073         # until the result is not empty string
1074         try:
1075             (out, err), proc = self.execute("echo 'ALIVE'",
1076                     blocking = True,
1077                     with_lock = True)
1078     
1079             if out.find("ALIVE") > -1:
1080                 return True
1081         except:
1082             trace = traceback.format_exc()
1083             msg = "Unresponsive host. Error reaching host: %s " % trace
1084
1085         self.error(msg, out, err)
1086         return False
1087
1088     def find_home(self):
1089         """ Retrieves host home directory
1090         """
1091         # The underlying SSH layer will sometimes return an empty
1092         # output (even if the command was executed without errors).
1093         # To work arround this, repeat the operation N times or
1094         # until the result is not empty string
1095         msg = "Impossible to retrieve HOME directory"
1096         try:
1097             (out, err), proc = self.execute("echo ${HOME}",
1098                     blocking = True,
1099                     with_lock = True)
1100     
1101             if out.strip() != "":
1102                 self._home_dir =  out.strip()
1103         except:
1104             trace = traceback.format_exc()
1105             msg = "Impossible to retrieve HOME directory %s" % trace
1106
1107         if not self._home_dir:
1108             self.error(msg)
1109             raise RuntimeError, msg
1110
1111     def filter_existing_files(self, src, dst):
1112         """ Removes files that already exist in the Linux host from src list
1113         """
1114         # construct a dictionary with { dst: src }
1115         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1116                     if len(src) > 1 else dict({dst: src[0]})
1117
1118         command = []
1119         for d in dests.keys():
1120             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1121
1122         command = ";".join(command)
1123
1124         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1125     
1126         for d in dests.keys():
1127             if out.find(d) > -1:
1128                 del dests[d]
1129
1130         if not dests:
1131             return []
1132
1133         return dests.values()
1134