7c8b3624f4bc1c85eed6820e710ca30d81db34ae
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import ResourceManager, clsinit_copy, \
21         ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs 
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
25
26 import collections
27 import os
28 import random
29 import re
30 import tempfile
31 import time
32 import threading
33 import traceback
34
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!! 
37
38 class ExitCode:
39     """
40     Error codes that the rexitcode function can return if unable to
41     check the exit code of a spawned process
42     """
43     FILENOTFOUND = -1
44     CORRUPTFILE = -2
45     ERROR = -3
46     OK = 0
47
48 class OSType:
49     """
50     Supported flavors of Linux OS
51     """
52     DEBIAN = 1 
53     UBUNTU = 1 << 1 
54     FEDORA = 1 << 2
55     FEDORA_8 = 1 << 3 | FEDORA 
56     FEDORA_12 = 1 << 4 | FEDORA 
57     FEDORA_14 = 1 << 5 | FEDORA 
58
59 @clsinit_copy
60 class LinuxNode(ResourceManager):
61     """
62     .. class:: Class Args :
63       
64         :param ec: The Experiment controller
65         :type ec: ExperimentController
66         :param guid: guid of the RM
67         :type guid: int
68
69     .. note::
70
71         There are different ways in which commands can be executed using the
72         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
73         'run_and_wait'). 
74         
75         Brief explanation:
76
77             * 'execute' (blocking mode) :  
78
79                      HOW IT WORKS: 'execute', forks a process and run the
80                      command, synchronously, attached to the terminal, in
81                      foreground.
82                      The execute method will block until the command returns
83                      the result on 'out', 'err' (so until it finishes executing).
84   
85                      USAGE: short-lived commands that must be executed attached
86                      to a terminal and in foreground, for which it IS necessary
87                      to block until the command has finished (e.g. if you want
88                      to run 'ls' or 'cat').
89
90             * 'execute' (NON blocking mode - blocking = False) :
91
92                     HOW IT WORKS: Same as before, except that execute method
93                     will return immediately (even if command still running).
94
95                     USAGE: long-lived commands that must be executed attached
96                     to a terminal and in foreground, but for which it is not
97                     necessary to block until the command has finished. (e.g.
98                     start an application using X11 forwarding)
99
100              * 'run' :
101
102                    HOW IT WORKS: Connects to the host ( using SSH if remote)
103                    and launches the command in background, detached from any
104                    terminal (daemonized), and returns. The command continues to
105                    run remotely, but since it is detached from the terminal,
106                    its pipes (stdin, stdout, stderr) can't be redirected to the
107                    console (as normal non detached processes would), and so they
108                    are explicitly redirected to files. The pidfile is created as
109                    part of the process of launching the command. The pidfile
110                    holds the pid and ppid of the process forked in background,
111                    so later on it is possible to check whether the command is still
112                    running.
113
114                     USAGE: long-lived commands that can run detached in background,
115                     for which it is NOT necessary to block (wait) until the command
116                     has finished. (e.g. start an application that is not using X11
117                     forwarding. It can run detached and remotely in background)
118
119              * 'run_and_wait' :
120
121                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122                     the command has finished execution. It also checks whether
123                     errors occurred during runtime by reading the exitcode file,
124                     which contains the exit code of the command that was run
125                     (checking stderr only is not always reliable since many
126                     commands throw debugging info to stderr and the only way to
127                     automatically know whether an error really happened is to
128                     check the process exit code).
129
130                     Another difference with respect to 'run', is that instead
131                     of directly executing the command as a bash command line,
132                     it uploads the command to a bash script and runs the script.
133                     This allows to use the bash script to debug errors, since
134                     it remains at the remote host and can be run manually to
135                     reproduce the error.
136                   
137                     USAGE: medium-lived commands that can run detached in
138                     background, for which it IS necessary to block (wait) until
139                     the command has finished. (e.g. Package installation,
140                     source compilation, file download, etc)
141
142     """
143     _rtype = "linux::Node"
144     _help = "Controls Linux host machines ( either localhost or a host " \
145             "that can be accessed using a SSH key)"
146     _platform = "linux"
147
148     @classmethod
149     def _register_attributes(cls):
150         hostname = Attribute("hostname", "Hostname of the machine",
151                 flags = Flags.Design)
152
153         username = Attribute("username", "Local account username", 
154                 flags = Flags.Credential)
155
156         port = Attribute("port", "SSH port", flags = Flags.Design)
157         
158         home = Attribute("home",
159                 "Experiment home directory to store all experiment related files",
160                 flags = Flags.Design)
161         
162         identity = Attribute("identity", "SSH identity file",
163                 flags = Flags.Credential)
164         
165         server_key = Attribute("serverKey", "Server public key", 
166                 flags = Flags.Design)
167         
168         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
169                 " from node home folder before starting experiment", 
170                 type = Types.Bool,
171                 default = False,
172                 flags = Flags.Design)
173
174         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
175                 " from a previous same experiment, before the new experiment starts", 
176                 type = Types.Bool,
177                 default = False,
178                 flags = Flags.Design)
179         
180         clean_processes = Attribute("cleanProcesses", 
181                 "Kill all running processes before starting experiment",
182                 type = Types.Bool,
183                 default = False,
184                 flags = Flags.Design)
185         
186         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
187                 "releasing the resource",
188                 flags = Flags.Design)
189
190         gateway_user = Attribute("gatewayUser", "Gateway account username",
191                 flags = Flags.Design)
192
193         gateway = Attribute("gateway", "Hostname of the gateway machine",
194                 flags = Flags.Design)
195
196         ip = Attribute("ip", "Linux host public IP address. "
197                    "Must not be modified by the user unless hostname is 'localhost'",
198                     flags = Flags.Design)
199
200         cls._register_attribute(hostname)
201         cls._register_attribute(username)
202         cls._register_attribute(port)
203         cls._register_attribute(home)
204         cls._register_attribute(identity)
205         cls._register_attribute(server_key)
206         cls._register_attribute(clean_home)
207         cls._register_attribute(clean_experiment)
208         cls._register_attribute(clean_processes)
209         cls._register_attribute(tear_down)
210         cls._register_attribute(gateway_user)
211         cls._register_attribute(gateway)
212         cls._register_attribute(ip)
213
214     def __init__(self, ec, guid):
215         super(LinuxNode, self).__init__(ec, guid)
216         self._os = None
217         # home directory at Linux host
218         self._home_dir = ""
219
220         # lock to prevent concurrent applications on the same node,
221         # to execute commands at the same time. There are potential
222         # concurrency issues when using SSH to a same host from 
223         # multiple threads. There are also possible operational 
224         # issues, e.g. an application querying the existence 
225         # of a file or folder prior to its creation, and another 
226         # application creating the same file or folder in between.
227         self._node_lock = threading.Lock()
228     
229     def log_message(self, msg):
230         return " guid %d - host %s - %s " % (self.guid, 
231                 self.get("hostname"), msg)
232
233     @property
234     def home_dir(self):
235         home = self.get("home") or ""
236         if not home.startswith("/"):
237            home = os.path.join(self._home_dir, home) 
238         return home
239
240     @property
241     def nepi_home(self):
242         return os.path.join(self.home_dir, ".nepi")
243
244     @property
245     def usr_dir(self):
246         return os.path.join(self.nepi_home, "nepi-usr")
247
248     @property
249     def lib_dir(self):
250         return os.path.join(self.usr_dir, "lib")
251
252     @property
253     def bin_dir(self):
254         return os.path.join(self.usr_dir, "bin")
255
256     @property
257     def src_dir(self):
258         return os.path.join(self.usr_dir, "src")
259
260     @property
261     def share_dir(self):
262         return os.path.join(self.usr_dir, "share")
263
264     @property
265     def exp_dir(self):
266         return os.path.join(self.nepi_home, "nepi-exp")
267
268     @property
269     def exp_home(self):
270         return os.path.join(self.exp_dir, self.ec.exp_id)
271
272     @property
273     def node_home(self):
274         return os.path.join(self.exp_home, "node-%d" % self.guid)
275
276     @property
277     def run_home(self):
278         return os.path.join(self.node_home, self.ec.run_id)
279
280     @property
281     def os(self):
282         if self._os:
283             return self._os
284
285         if not self.localhost and not self.get("username"):
286             msg = "Can't resolve OS, insufficient data "
287             self.error(msg)
288             raise RuntimeError, msg
289
290         out = self.get_os()
291
292         if out.find("Debian") == 0: 
293             self._os = OSType.DEBIAN
294         elif out.find("Ubuntu") ==0:
295             self._os = OSType.UBUNTU
296         elif out.find("Fedora release") == 0:
297             self._os = OSType.FEDORA
298             if out.find("Fedora release 8") == 0:
299                 self._os = OSType.FEDORA_8
300             elif out.find("Fedora release 12") == 0:
301                 self._os = OSType.FEDORA_12
302             elif out.find("Fedora release 14") == 0:
303                 self._os = OSType.FEDORA_14
304         else:
305             msg = "Unsupported OS"
306             self.error(msg, out)
307             raise RuntimeError, "%s - %s " %( msg, out )
308
309         return self._os
310
311     def get_os(self):
312         # The underlying SSH layer will sometimes return an empty
313         # output (even if the command was executed without errors).
314         # To work arround this, repeat the operation N times or
315         # until the result is not empty string
316         out = ""
317         try:
318             (out, err), proc = self.execute("cat /etc/issue", 
319                     with_lock = True,
320                     blocking = True)
321         except:
322             trace = traceback.format_exc()
323             msg = "Error detecting OS: %s " % trace
324             self.error(msg, out, err)
325         
326         return out
327
328     @property
329     def use_deb(self):
330         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
331
332     @property
333     def use_rpm(self):
334         return (self.os & OSType.FEDORA)
335
336     @property
337     def localhost(self):
338         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
339
340     def do_provision(self):
341         # check if host is alive
342         if not self.is_alive():
343             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
344             self.error(msg)
345             raise RuntimeError, msg
346
347         self.find_home()
348
349         if self.get("cleanProcesses"):
350             self.clean_processes()
351
352         if self.get("cleanHome"):
353             self.clean_home()
354  
355         if self.get("cleanExperiment"):
356             self.clean_experiment()
357     
358         # Create shared directory structure and node home directory
359         paths = [self.lib_dir, 
360             self.bin_dir, 
361             self.src_dir, 
362             self.share_dir, 
363             self.node_home]
364
365         self.mkdir(paths)
366
367         # Get Public IP address if possible
368         if not self.get("ip"):
369             try:
370                 ip = sshfuncs.gethostbyname(self.get("hostname"))
371                 self.set("ip", ip)
372             except:
373                 if self.get("gateway") is None:
374                     msg = "Local DNS can not resolve hostname %s" % self.get("hostname") 
375                     self.error(msg)
376
377         super(LinuxNode, self).do_provision()
378
379     def do_deploy(self):
380         if self.state == ResourceState.NEW:
381             self.info("Deploying node")
382             self.do_discover()
383             self.do_provision()
384
385         # Node needs to wait until all associated interfaces are 
386         # ready before it can finalize deployment
387         from nepi.resources.linux.interface import LinuxInterface
388         ifaces = self.get_connected(LinuxInterface.get_rtype())
389         for iface in ifaces:
390             if iface.state < ResourceState.READY:
391                 self.ec.schedule(self.reschedule_delay, self.deploy)
392                 return 
393
394         super(LinuxNode, self).do_deploy()
395
396     def do_release(self):
397         rms = self.get_connected()
398         for rm in rms:
399             # Node needs to wait until all associated RMs are released
400             # before it can be released
401             if rm.state != ResourceState.RELEASED:
402                 self.ec.schedule(self.reschedule_delay, self.release)
403                 return 
404
405         tear_down = self.get("tearDown")
406         if tear_down:
407             self.execute(tear_down)
408
409         self.clean_processes()
410
411         super(LinuxNode, self).do_release()
412
413     def valid_connection(self, guid):
414         # TODO: Validate!
415         return True
416
417     def clean_processes(self):
418         self.info("Cleaning up processes")
419
420         if self.localhost:
421             return 
422         
423         if self.get("username") != 'root':
424             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
425                 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
426                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
427         else:
428             if self.state >= ResourceState.READY:
429                 import pickle
430                 pids = pickle.load(open("/tmp/save.proc", "rb"))
431                 pids_temp = dict()
432                 ps_aux = "ps aux |awk '{print $2,$11}'"
433                 (out, err), proc = self.execute(ps_aux)
434                 if len(out) != 0:
435                     for line in out.strip().split("\n"):
436                         parts = line.strip().split(" ")
437                         pids_temp[parts[0]] = parts[1]
438                     kill_pids = set(pids_temp.items()) - set(pids.items())
439                     kill_pids = ' '.join(dict(kill_pids).keys())
440
441                     cmd = ("killall tcpdump || /bin/true ; " +
442                         "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
443                         "kill %s || /bin/true ; " % kill_pids)
444                 else:
445                     cmd = ("killall tcpdump || /bin/true ; " +
446                         "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
447             else:
448                 cmd = ("killall tcpdump || /bin/true ; " +
449                     "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
450
451         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
452
453     def clean_home(self):
454         """ Cleans all NEPI related folders in the Linux host
455         """
456         self.info("Cleaning up home")
457         
458         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
459                 self.home_dir )
460
461         return self.execute(cmd, with_lock = True)
462
463     def clean_experiment(self):
464         """ Cleans all experiment related files in the Linux host.
465         It preserves NEPI files and folders that have a multi experiment
466         scope.
467         """
468         self.info("Cleaning up experiment files")
469         
470         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
471                 self.exp_dir,
472                 self.ec.exp_id )
473             
474         return self.execute(cmd, with_lock = True)
475
476     def execute(self, command,
477             sudo = False,
478             env = None,
479             tty = False,
480             forward_x11 = False,
481             retry = 3,
482             connect_timeout = 30,
483             strict_host_checking = False,
484             persistent = True,
485             blocking = True,
486             with_lock = False
487             ):
488         """ Notice that this invocation will block until the
489         execution finishes. If this is not the desired behavior,
490         use 'run' instead."""
491
492         if self.localhost:
493             (out, err), proc = execfuncs.lexec(command, 
494                     user = self.get("username"), # still problem with localhost
495                     sudo = sudo,
496                     env = env)
497         else:
498             if with_lock:
499                 # If the execute command is blocking, we don't want to keep
500                 # the node lock. This lock is used to avoid race conditions
501                 # when creating the ControlMaster sockets. A more elegant
502                 # solution is needed.
503                 with self._node_lock:
504                     (out, err), proc = sshfuncs.rexec(
505                         command, 
506                         host = self.get("hostname"),
507                         user = self.get("username"),
508                         port = self.get("port"),
509                         gwuser = self.get("gatewayUser"),
510                         gw = self.get("gateway"),
511                         agent = True,
512                         sudo = sudo,
513                         identity = self.get("identity"),
514                         server_key = self.get("serverKey"),
515                         env = env,
516                         tty = tty,
517                         forward_x11 = forward_x11,
518                         retry = retry,
519                         connect_timeout = connect_timeout,
520                         persistent = persistent,
521                         blocking = blocking, 
522                         strict_host_checking = strict_host_checking
523                         )
524             else:
525                 (out, err), proc = sshfuncs.rexec(
526                     command, 
527                     host = self.get("hostname"),
528                     user = self.get("username"),
529                     port = self.get("port"),
530                     gwuser = self.get("gatewayUser"),
531                     gw = self.get("gateway"),
532                     agent = True,
533                     sudo = sudo,
534                     identity = self.get("identity"),
535                     server_key = self.get("serverKey"),
536                     env = env,
537                     tty = tty,
538                     forward_x11 = forward_x11,
539                     retry = retry,
540                     connect_timeout = connect_timeout,
541                     persistent = persistent,
542                     blocking = blocking, 
543                     strict_host_checking = strict_host_checking
544                     )
545
546         return (out, err), proc
547
548     def run(self, command, home,
549             create_home = False,
550             pidfile = 'pidfile',
551             stdin = None, 
552             stdout = 'stdout', 
553             stderr = 'stderr', 
554             sudo = False,
555             tty = False,
556             strict_host_checking = False):
557         
558         self.debug("Running command '%s'" % command)
559         
560         if self.localhost:
561             (out, err), proc = execfuncs.lspawn(command, pidfile,
562                     home = home, 
563                     create_home = create_home, 
564                     stdin = stdin or '/dev/null',
565                     stdout = stdout or '/dev/null',
566                     stderr = stderr or '/dev/null',
567                     sudo = sudo) 
568         else:
569             with self._node_lock:
570                 (out, err), proc = sshfuncs.rspawn(
571                     command,
572                     pidfile = pidfile,
573                     home = home,
574                     create_home = create_home,
575                     stdin = stdin or '/dev/null',
576                     stdout = stdout or '/dev/null',
577                     stderr = stderr or '/dev/null',
578                     sudo = sudo,
579                     host = self.get("hostname"),
580                     user = self.get("username"),
581                     port = self.get("port"),
582                     gwuser = self.get("gatewayUser"),
583                     gw = self.get("gateway"),
584                     agent = True,
585                     identity = self.get("identity"),
586                     server_key = self.get("serverKey"),
587                     tty = tty,
588                     strict_host_checking = strict_host_checking
589                     )
590
591         return (out, err), proc
592
593     def getpid(self, home, pidfile = "pidfile"):
594         if self.localhost:
595             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
596         else:
597             with self._node_lock:
598                 pidtuple = sshfuncs.rgetpid(
599                     os.path.join(home, pidfile),
600                     host = self.get("hostname"),
601                     user = self.get("username"),
602                     port = self.get("port"),
603                     gwuser = self.get("gatewayUser"),
604                     gw = self.get("gateway"),
605                     agent = True,
606                     identity = self.get("identity"),
607                     server_key = self.get("serverKey"),
608                     strict_host_checking = False
609                     )
610         
611         return pidtuple
612
613     def status(self, pid, ppid):
614         if self.localhost:
615             status = execfuncs.lstatus(pid, ppid)
616         else:
617             with self._node_lock:
618                 status = sshfuncs.rstatus(
619                         pid, ppid,
620                         host = self.get("hostname"),
621                         user = self.get("username"),
622                         port = self.get("port"),
623                         gwuser = self.get("gatewayUser"),
624                         gw = self.get("gateway"),
625                         agent = True,
626                         identity = self.get("identity"),
627                         server_key = self.get("serverKey"),
628                         strict_host_checking = False
629                         )
630            
631         return status
632     
633     def kill(self, pid, ppid, sudo = False):
634         out = err = ""
635         proc = None
636         status = self.status(pid, ppid)
637
638         if status == sshfuncs.ProcStatus.RUNNING:
639             if self.localhost:
640                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
641             else:
642                 with self._node_lock:
643                     (out, err), proc = sshfuncs.rkill(
644                         pid, ppid,
645                         host = self.get("hostname"),
646                         user = self.get("username"),
647                         port = self.get("port"),
648                         gwuser = self.get("gatewayUser"),
649                         gw = self.get("gateway"),
650                         agent = True,
651                         sudo = sudo,
652                         identity = self.get("identity"),
653                         server_key = self.get("serverKey"),
654                         strict_host_checking = False
655                         )
656
657         return (out, err), proc
658
659     def copy(self, src, dst):
660         if self.localhost:
661             (out, err), proc = execfuncs.lcopy(src, dst, 
662                     recursive = True)
663         else:
664             with self._node_lock:
665                 (out, err), proc = sshfuncs.rcopy(
666                     src, dst, 
667                     port = self.get("port"),
668                     gwuser = self.get("gatewayUser"),
669                     gw = self.get("gateway"),
670                     identity = self.get("identity"),
671                     server_key = self.get("serverKey"),
672                     recursive = True,
673                     strict_host_checking = False)
674
675         return (out, err), proc
676
677     def upload(self, src, dst, text = False, overwrite = True,
678             raise_on_error = True):
679         """ Copy content to destination
680
681         src  string with the content to copy. Can be:
682             - plain text
683             - a string with the path to a local file
684             - a string with a semi-colon separeted list of local files
685             - a string with a local directory
686
687         dst  string with destination path on the remote host (remote is 
688             always self.host)
689
690         text src is text input, it must be stored into a temp file before 
691         uploading
692         """
693         # If source is a string input 
694         f = None
695         if text and not os.path.isfile(src):
696             # src is text input that should be uploaded as file
697             # create a temporal file with the content to upload
698             f = tempfile.NamedTemporaryFile(delete=False)
699             f.write(src)
700             f.close()
701             src = f.name
702
703         # If dst files should not be overwritten, check that the files do not
704         # exits already
705         if isinstance(src, str):
706             src = map(str.strip, src.split(";"))
707     
708         if overwrite == False:
709             src = self.filter_existing_files(src, dst)
710             if not src:
711                 return ("", ""), None
712
713         if not self.localhost:
714             # Build destination as <user>@<server>:<path>
715             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
716
717         ((out, err), proc) = self.copy(src, dst)
718
719         # clean up temp file
720         if f:
721             os.remove(f.name)
722
723         if err:
724             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
725             self.error(msg, out, err)
726             
727             msg = "%s out: %s err: %s" % (msg, out, err)
728             if raise_on_error:
729                 raise RuntimeError, msg
730
731         return ((out, err), proc)
732
733     def download(self, src, dst, raise_on_error = True):
734         if not self.localhost:
735             # Build destination as <user>@<server>:<path>
736             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
737
738         ((out, err), proc) = self.copy(src, dst)
739
740         if err:
741             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
742             self.error(msg, out, err)
743
744             if raise_on_error:
745                 raise RuntimeError, msg
746
747         return ((out, err), proc)
748
749     def install_packages_command(self, packages):
750         command = ""
751         if self.use_rpm:
752             command = rpmfuncs.install_packages_command(self.os, packages)
753         elif self.use_deb:
754             command = debfuncs.install_packages_command(self.os, packages)
755         else:
756             msg = "Error installing packages ( OS not known ) "
757             self.error(msg, self.os)
758             raise RuntimeError, msg
759
760         return command
761
762     def install_packages(self, packages, home, run_home = None,
763             raise_on_error = True):
764         """ Install packages in the Linux host.
765
766         'home' is the directory to upload the package installation script.
767         'run_home' is the directory from where to execute the script.
768         """
769         command = self.install_packages_command(packages)
770
771         run_home = run_home or home
772
773         (out, err), proc = self.run_and_wait(command, run_home, 
774             shfile = os.path.join(home, "instpkg.sh"),
775             pidfile = "instpkg_pidfile",
776             ecodefile = "instpkg_exitcode",
777             stdout = "instpkg_stdout", 
778             stderr = "instpkg_stderr",
779             overwrite = False,
780             raise_on_error = raise_on_error)
781
782         return (out, err), proc 
783
784     def remove_packages(self, packages, home, run_home = None,
785             raise_on_error = True):
786         """ Uninstall packages from the Linux host.
787
788         'home' is the directory to upload the package un-installation script.
789         'run_home' is the directory from where to execute the script.
790         """
791         if self.use_rpm:
792             command = rpmfuncs.remove_packages_command(self.os, packages)
793         elif self.use_deb:
794             command = debfuncs.remove_packages_command(self.os, packages)
795         else:
796             msg = "Error removing packages ( OS not known ) "
797             self.error(msg)
798             raise RuntimeError, msg
799
800         run_home = run_home or home
801
802         (out, err), proc = self.run_and_wait(command, run_home, 
803             shfile = os.path.join(home, "rmpkg.sh"),
804             pidfile = "rmpkg_pidfile",
805             ecodefile = "rmpkg_exitcode",
806             stdout = "rmpkg_stdout", 
807             stderr = "rmpkg_stderr",
808             overwrite = False,
809             raise_on_error = raise_on_error)
810          
811         return (out, err), proc 
812
813     def mkdir(self, paths, clean = False):
814         """ Paths is either a single remote directory path to create,
815         or a list of directories to create.
816         """
817         if clean:
818             self.rmdir(paths)
819
820         if isinstance(paths, str):
821             paths = [paths]
822
823         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
824
825         return self.execute(cmd, with_lock = True)
826
827     def rmdir(self, paths):
828         """ Paths is either a single remote directory path to delete,
829         or a list of directories to delete.
830         """
831
832         if isinstance(paths, str):
833             paths = [paths]
834
835         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
836
837         return self.execute(cmd, with_lock = True)
838         
839     def run_and_wait(self, command, home, 
840             shfile="cmd.sh",
841             env=None,
842             overwrite=True,
843             wait_run=True,
844             pidfile="pidfile", 
845             ecodefile="exitcode", 
846             stdin=None, 
847             stdout="stdout", 
848             stderr="stderr", 
849             sudo=False,
850             tty=False,
851             raise_on_error=True):
852         """
853         Uploads the 'command' to a bash script in the host.
854         Then runs the script detached in background in the host, and
855         busy-waites until the script finishes executing.
856         """
857
858         if not shfile.startswith("/"):
859             shfile = os.path.join(home, shfile)
860
861         self.upload_command(command, 
862             shfile = shfile, 
863             ecodefile = ecodefile, 
864             env = env,
865             overwrite = overwrite)
866
867         command = "bash %s" % shfile
868         # run command in background in remote host
869         (out, err), proc = self.run(command, home, 
870                 pidfile = pidfile,
871                 stdin = stdin, 
872                 stdout = stdout, 
873                 stderr = stderr, 
874                 sudo = sudo,
875                 tty = tty)
876
877         # check no errors occurred
878         if proc.poll():
879             msg = " Failed to run command '%s' " % command
880             self.error(msg, out, err)
881             if raise_on_error:
882                 raise RuntimeError, msg
883
884         # Wait for pid file to be generated
885         pid, ppid = self.wait_pid(
886                 home = home, 
887                 pidfile = pidfile, 
888                 raise_on_error = raise_on_error)
889
890         if wait_run:
891             # wait until command finishes to execute
892             self.wait_run(pid, ppid)
893           
894             (eout, err), proc = self.check_errors(home,
895                 ecodefile = ecodefile,
896                 stderr = stderr)
897
898             # Out is what was written in the stderr file
899             if err:
900                 msg = " Failed to run command '%s' " % command
901                 self.error(msg, eout, err)
902
903                 if raise_on_error:
904                     raise RuntimeError, msg
905
906         (out, oerr), proc = self.check_output(home, stdout)
907         
908         return (out, err), proc
909         
910     def exitcode(self, home, ecodefile = "exitcode"):
911         """
912         Get the exit code of an application.
913         Returns an integer value with the exit code 
914         """
915         (out, err), proc = self.check_output(home, ecodefile)
916
917         # Succeeded to open file, return exit code in the file
918         if proc.wait() == 0:
919             try:
920                 return int(out.strip())
921             except:
922                 # Error in the content of the file!
923                 return ExitCode.CORRUPTFILE
924
925         # No such file or directory
926         if proc.returncode == 1:
927             return ExitCode.FILENOTFOUND
928         
929         # Other error from 'cat'
930         return ExitCode.ERROR
931
932     def upload_command(self, command, 
933             shfile="cmd.sh",
934             ecodefile="exitcode",
935             overwrite=True,
936             env=None):
937         """ Saves the command as a bash script file in the remote host, and
938         forces to save the exit code of the command execution to the ecodefile
939         """
940
941         if not (command.strip().endswith(";") or command.strip().endswith("&")):
942             command += ";"
943       
944         # The exit code of the command will be stored in ecodefile
945         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
946                 'command': command,
947                 'ecodefile': ecodefile,
948                 } 
949
950         # Export environment
951         environ = self.format_environment(env)
952
953         # Add environ to command
954         command = environ + command
955
956         return self.upload(command, shfile, text=True, overwrite=overwrite)
957
958     def format_environment(self, env, inline=False):
959         """ Formats the environment variables for a command to be executed
960         either as an inline command
961         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
962         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
963         """
964         if not env: return ""
965
966         # Remove extra white spaces
967         env = re.sub(r'\s+', ' ', env.strip())
968
969         sep = ";" if inline else "\n"
970         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
971
972     def check_errors(self, home, 
973             ecodefile = "exitcode", 
974             stderr = "stderr"):
975         """ Checks whether errors occurred while running a command.
976         It first checks the exit code for the command, and only if the
977         exit code is an error one it returns the error output.
978
979         """
980         proc = None
981         err = ""
982
983         # get exit code saved in the 'exitcode' file
984         ecode = self.exitcode(home, ecodefile)
985
986         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
987             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
988         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
989             # The process returned an error code or didn't exist. 
990             # Check standard error.
991             (err, eerr), proc = self.check_output(home, stderr)
992
993             # If the stderr file was not found, assume nothing bad happened,
994             # and just ignore the error.
995             # (cat returns 1 for error "No such file or directory")
996             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
997                 err = "" 
998             
999         return ("", err), proc
1000  
1001     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1002         """ Waits until the pid file for the command is generated, 
1003             and returns the pid and ppid of the process """
1004         pid = ppid = None
1005         delay = 1.0
1006
1007         for i in xrange(2):
1008             pidtuple = self.getpid(home = home, pidfile = pidfile)
1009             
1010             if pidtuple:
1011                 pid, ppid = pidtuple
1012                 break
1013             else:
1014                 time.sleep(delay)
1015                 delay = delay * 1.5
1016         else:
1017             msg = " Failed to get pid for pidfile %s/%s " % (
1018                     home, pidfile )
1019             self.error(msg)
1020             
1021             if raise_on_error:
1022                 raise RuntimeError, msg
1023
1024         return pid, ppid
1025
1026     def wait_run(self, pid, ppid, trial = 0):
1027         """ wait for a remote process to finish execution """
1028         delay = 1.0
1029
1030         while True:
1031             status = self.status(pid, ppid)
1032             
1033             if status is ProcStatus.FINISHED:
1034                 break
1035             elif status is not ProcStatus.RUNNING:
1036                 delay = delay * 1.5
1037                 time.sleep(delay)
1038                 # If it takes more than 20 seconds to start, then
1039                 # asume something went wrong
1040                 if delay > 20:
1041                     break
1042             else:
1043                 # The app is running, just wait...
1044                 time.sleep(0.5)
1045
1046     def check_output(self, home, filename):
1047         """ Retrives content of file """
1048         (out, err), proc = self.execute("cat %s" % 
1049             os.path.join(home, filename), retry = 1, with_lock = True)
1050         return (out, err), proc
1051
1052     def is_alive(self):
1053         """ Checks if host is responsive
1054         """
1055         if self.localhost:
1056             return True
1057
1058         out = err = ""
1059         msg = "Unresponsive host. Wrong answer. "
1060
1061         # The underlying SSH layer will sometimes return an empty
1062         # output (even if the command was executed without errors).
1063         # To work arround this, repeat the operation N times or
1064         # until the result is not empty string
1065         try:
1066             (out, err), proc = self.execute("echo 'ALIVE'",
1067                     blocking = True,
1068                     with_lock = True)
1069     
1070             if out.find("ALIVE") > -1:
1071                 return True
1072         except:
1073             trace = traceback.format_exc()
1074             msg = "Unresponsive host. Error reaching host: %s " % trace
1075
1076         self.error(msg, out, err)
1077         return False
1078
1079     def find_home(self):
1080         """ Retrieves host home directory
1081         """
1082         # The underlying SSH layer will sometimes return an empty
1083         # output (even if the command was executed without errors).
1084         # To work arround this, repeat the operation N times or
1085         # until the result is not empty string
1086         msg = "Impossible to retrieve HOME directory"
1087         try:
1088             (out, err), proc = self.execute("echo ${HOME}",
1089                     blocking = True,
1090                     with_lock = True)
1091     
1092             if out.strip() != "":
1093                 self._home_dir =  out.strip()
1094         except:
1095             trace = traceback.format_exc()
1096             msg = "Impossible to retrieve HOME directory %s" % trace
1097
1098         if not self._home_dir:
1099             self.error(msg)
1100             raise RuntimeError, msg
1101
1102     def filter_existing_files(self, src, dst):
1103         """ Removes files that already exist in the Linux host from src list
1104         """
1105         # construct a dictionary with { dst: src }
1106         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1107                     if len(src) > 1 else dict({dst: src[0]})
1108
1109         command = []
1110         for d in dests.keys():
1111             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1112
1113         command = ";".join(command)
1114
1115         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1116     
1117         for d in dests.keys():
1118             if out.find(d) > -1:
1119                 del dests[d]
1120
1121         if not dests:
1122             return []
1123
1124         return dests.values()
1125