Bug fixing and ordering openvswitch code
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     DEBIAN = 1 
54     UBUNTU = 1 << 1 
55     FEDORA = 1 << 2
56     FEDORA_8 = 1 << 3 | FEDORA 
57     FEDORA_12 = 1 << 4 | FEDORA 
58     FEDORA_14 = 1 << 5 | FEDORA 
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "linux::Node"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _platform = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.Design)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.Design)
196
197         ip = Attribute("ip", "Linux host public IP address. "
198                    "Must not be modified by the user unless hostname is 'localhost'",
199                     flags = Flags.Design)
200
201         cls._register_attribute(hostname)
202         cls._register_attribute(username)
203         cls._register_attribute(port)
204         cls._register_attribute(home)
205         cls._register_attribute(identity)
206         cls._register_attribute(server_key)
207         cls._register_attribute(clean_home)
208         cls._register_attribute(clean_experiment)
209         cls._register_attribute(clean_processes)
210         cls._register_attribute(tear_down)
211         cls._register_attribute(gateway_user)
212         cls._register_attribute(gateway)
213         cls._register_attribute(ip)
214
215     def __init__(self, ec, guid):
216         super(LinuxNode, self).__init__(ec, guid)
217         self._os = None
218         # home directory at Linux host
219         self._home_dir = ""
220
221         # lock to prevent concurrent applications on the same node,
222         # to execute commands at the same time. There are potential
223         # concurrency issues when using SSH to a same host from 
224         # multiple threads. There are also possible operational 
225         # issues, e.g. an application querying the existence 
226         # of a file or folder prior to its creation, and another 
227         # application creating the same file or folder in between.
228         self._node_lock = threading.Lock()
229     
230     def log_message(self, msg):
231         return " guid %d - host %s - %s " % (self.guid, 
232                 self.get("hostname"), msg)
233
234     @property
235     def home_dir(self):
236         home = self.get("home") or ""
237         if not home.startswith("/"):
238            home = os.path.join(self._home_dir, home) 
239         return home
240
241     @property
242     def nepi_home(self):
243         return os.path.join(self.home_dir, ".nepi")
244
245     @property
246     def usr_dir(self):
247         return os.path.join(self.nepi_home, "nepi-usr")
248
249     @property
250     def lib_dir(self):
251         return os.path.join(self.usr_dir, "lib")
252
253     @property
254     def bin_dir(self):
255         return os.path.join(self.usr_dir, "bin")
256
257     @property
258     def src_dir(self):
259         return os.path.join(self.usr_dir, "src")
260
261     @property
262     def share_dir(self):
263         return os.path.join(self.usr_dir, "share")
264
265     @property
266     def exp_dir(self):
267         return os.path.join(self.nepi_home, "nepi-exp")
268
269     @property
270     def exp_home(self):
271         return os.path.join(self.exp_dir, self.ec.exp_id)
272
273     @property
274     def node_home(self):
275         return os.path.join(self.exp_home, "node-%d" % self.guid)
276
277     @property
278     def run_home(self):
279         return os.path.join(self.node_home, self.ec.run_id)
280
281     @property
282     def os(self):
283         if self._os:
284             return self._os
285
286         if not self.localhost and not self.get("username"):
287             msg = "Can't resolve OS, insufficient data "
288             self.error(msg)
289             raise RuntimeError, msg
290
291         out = self.get_os()
292
293         if out.find("Debian") == 0: 
294             self._os = OSType.DEBIAN
295         elif out.find("Ubuntu") ==0:
296             self._os = OSType.UBUNTU
297         elif out.find("Fedora release") == 0:
298             self._os = OSType.FEDORA
299             if out.find("Fedora release 8") == 0:
300                 self._os = OSType.FEDORA_8
301             elif out.find("Fedora release 12") == 0:
302                 self._os = OSType.FEDORA_12
303             elif out.find("Fedora release 14") == 0:
304                 self._os = OSType.FEDORA_14
305         else:
306             msg = "Unsupported OS"
307             self.error(msg, out)
308             raise RuntimeError, "%s - %s " %( msg, out )
309
310         return self._os
311
312     def get_os(self):
313         # The underlying SSH layer will sometimes return an empty
314         # output (even if the command was executed without errors).
315         # To work arround this, repeat the operation N times or
316         # until the result is not empty string
317         out = ""
318         try:
319             (out, err), proc = self.execute("cat /etc/issue", 
320                     with_lock = True,
321                     blocking = True)
322         except:
323             trace = traceback.format_exc()
324             msg = "Error detecting OS: %s " % trace
325             self.error(msg, out, err)
326         
327         return out
328
329     @property
330     def use_deb(self):
331         return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
332
333     @property
334     def use_rpm(self):
335         return (self.os & OSType.FEDORA)
336
337     @property
338     def localhost(self):
339         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
340
341     def do_provision(self):
342         # check if host is alive
343         if not self.is_alive():
344             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
345             self.error(msg)
346             raise RuntimeError, msg
347
348         self.find_home()
349
350         if self.get("cleanProcesses"):
351             self.clean_processes()
352
353         if self.get("cleanHome"):
354             self.clean_home()
355  
356         if self.get("cleanExperiment"):
357             self.clean_experiment()
358     
359         # Create shared directory structure and node home directory
360         paths = [self.lib_dir, 
361             self.bin_dir, 
362             self.src_dir, 
363             self.share_dir, 
364             self.node_home]
365
366         self.mkdir(paths)
367
368         # Get Public IP address if possible
369         if not self.get("ip"):
370             try:
371                 ip = sshfuncs.gethostbyname(self.get("hostname"))
372             except:
373                 msg = "DNS can not resolve hostname %s" % self.get("hostname") 
374                 self.debug(msg)
375
376             self.set("ip", ip)
377
378         super(LinuxNode, self).do_provision()
379
380     def do_deploy(self):
381         if self.state == ResourceState.NEW:
382             self.info("Deploying node")
383             self.do_discover()
384             self.do_provision()
385
386         # Node needs to wait until all associated interfaces are 
387         # ready before it can finalize deployment
388         from nepi.resources.linux.interface import LinuxInterface
389         ifaces = self.get_connected(LinuxInterface.get_rtype())
390         for iface in ifaces:
391             if iface.state < ResourceState.READY:
392                 self.ec.schedule(self.reschedule_delay, self.deploy)
393                 return 
394
395         super(LinuxNode, self).do_deploy()
396
397     def do_release(self):
398         rms = self.get_connected()
399         for rm in rms:
400             # Node needs to wait until all associated RMs are released
401             # before it can be released
402             if rm.state != ResourceState.RELEASED:
403                 self.ec.schedule(self.reschedule_delay, self.release)
404                 return 
405
406         tear_down = self.get("tearDown")
407         if tear_down:
408             self.execute(tear_down)
409
410         self.clean_processes()
411
412         super(LinuxNode, self).do_release()
413
414     def valid_connection(self, guid):
415         # TODO: Validate!
416         return True
417
418     def clean_processes(self):
419         self.info("Cleaning up processes")
420
421         if self.localhost:
422             return 
423         
424         if self.get("username") != 'root':
425             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
426                 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
427                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
428         else:
429             if self.state >= ResourceState.READY:
430                 import pickle
431                 pids = pickle.load(open("/tmp/save.proc", "rb"))
432                 pids_temp = dict()
433                 ps_aux = "ps aux |awk '{print $2,$11}'"
434                 (out, err), proc = self.execute(ps_aux)
435                 if len(out) != 0:
436                     for line in out.strip().split("\n"):
437                         parts = line.strip().split(" ")
438                         pids_temp[parts[0]] = parts[1]
439                     kill_pids = set(pids_temp.items()) - set(pids.items())
440                     kill_pids = ' '.join(dict(kill_pids).keys())
441
442                     cmd = ("killall tcpdump || /bin/true ; " +
443                         "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
444                         "kill %s || /bin/true ; " % kill_pids)
445                 else:
446                     cmd = ("killall tcpdump || /bin/true ; " +
447                         "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
448             else:
449                 cmd = ("killall tcpdump || /bin/true ; " +
450                     "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
451
452         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
453
454     def clean_home(self):
455         """ Cleans all NEPI related folders in the Linux host
456         """
457         self.info("Cleaning up home")
458         
459         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
460                 self.home_dir )
461
462         return self.execute(cmd, with_lock = True)
463
464     def clean_experiment(self):
465         """ Cleans all experiment related files in the Linux host.
466         It preserves NEPI files and folders that have a multi experiment
467         scope.
468         """
469         self.info("Cleaning up experiment files")
470         
471         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
472                 self.exp_dir,
473                 self.ec.exp_id )
474             
475         return self.execute(cmd, with_lock = True)
476
477     def execute(self, command,
478             sudo = False,
479             env = None,
480             tty = False,
481             forward_x11 = False,
482             retry = 3,
483             connect_timeout = 30,
484             strict_host_checking = False,
485             persistent = True,
486             blocking = True,
487             with_lock = False
488             ):
489         """ Notice that this invocation will block until the
490         execution finishes. If this is not the desired behavior,
491         use 'run' instead."""
492
493         if self.localhost:
494             (out, err), proc = execfuncs.lexec(command, 
495                     user = self.get("username"), # still problem with localhost
496                     sudo = sudo,
497                     env = env)
498         else:
499             if with_lock:
500                 # If the execute command is blocking, we don't want to keep
501                 # the node lock. This lock is used to avoid race conditions
502                 # when creating the ControlMaster sockets. A more elegant
503                 # solution is needed.
504                 with self._node_lock:
505                     (out, err), proc = sshfuncs.rexec(
506                         command, 
507                         host = self.get("hostname"),
508                         user = self.get("username"),
509                         port = self.get("port"),
510                         gwuser = self.get("gatewayUser"),
511                         gw = self.get("gateway"),
512                         agent = True,
513                         sudo = sudo,
514                         identity = self.get("identity"),
515                         server_key = self.get("serverKey"),
516                         env = env,
517                         tty = tty,
518                         forward_x11 = forward_x11,
519                         retry = retry,
520                         connect_timeout = connect_timeout,
521                         persistent = persistent,
522                         blocking = blocking, 
523                         strict_host_checking = strict_host_checking
524                         )
525             else:
526                 (out, err), proc = sshfuncs.rexec(
527                     command, 
528                     host = self.get("hostname"),
529                     user = self.get("username"),
530                     port = self.get("port"),
531                     gwuser = self.get("gatewayUser"),
532                     gw = self.get("gateway"),
533                     agent = True,
534                     sudo = sudo,
535                     identity = self.get("identity"),
536                     server_key = self.get("serverKey"),
537                     env = env,
538                     tty = tty,
539                     forward_x11 = forward_x11,
540                     retry = retry,
541                     connect_timeout = connect_timeout,
542                     persistent = persistent,
543                     blocking = blocking, 
544                     strict_host_checking = strict_host_checking
545                     )
546
547         return (out, err), proc
548
549     def run(self, command, home,
550             create_home = False,
551             pidfile = 'pidfile',
552             stdin = None, 
553             stdout = 'stdout', 
554             stderr = 'stderr', 
555             sudo = False,
556             tty = False,
557             strict_host_checking = False):
558         
559         self.debug("Running command '%s'" % command)
560         
561         if self.localhost:
562             (out, err), proc = execfuncs.lspawn(command, pidfile,
563                     home = home, 
564                     create_home = create_home, 
565                     stdin = stdin or '/dev/null',
566                     stdout = stdout or '/dev/null',
567                     stderr = stderr or '/dev/null',
568                     sudo = sudo) 
569         else:
570             with self._node_lock:
571                 (out, err), proc = sshfuncs.rspawn(
572                     command,
573                     pidfile = pidfile,
574                     home = home,
575                     create_home = create_home,
576                     stdin = stdin or '/dev/null',
577                     stdout = stdout or '/dev/null',
578                     stderr = stderr or '/dev/null',
579                     sudo = sudo,
580                     host = self.get("hostname"),
581                     user = self.get("username"),
582                     port = self.get("port"),
583                     gwuser = self.get("gatewayUser"),
584                     gw = self.get("gateway"),
585                     agent = True,
586                     identity = self.get("identity"),
587                     server_key = self.get("serverKey"),
588                     tty = tty,
589                     strict_host_checking = strict_host_checking
590                     )
591
592         return (out, err), proc
593
594     def getpid(self, home, pidfile = "pidfile"):
595         if self.localhost:
596             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
597         else:
598             with self._node_lock:
599                 pidtuple = sshfuncs.rgetpid(
600                     os.path.join(home, pidfile),
601                     host = self.get("hostname"),
602                     user = self.get("username"),
603                     port = self.get("port"),
604                     gwuser = self.get("gatewayUser"),
605                     gw = self.get("gateway"),
606                     agent = True,
607                     identity = self.get("identity"),
608                     server_key = self.get("serverKey"),
609                     strict_host_checking = False
610                     )
611         
612         return pidtuple
613
614     def status(self, pid, ppid):
615         if self.localhost:
616             status = execfuncs.lstatus(pid, ppid)
617         else:
618             with self._node_lock:
619                 status = sshfuncs.rstatus(
620                         pid, ppid,
621                         host = self.get("hostname"),
622                         user = self.get("username"),
623                         port = self.get("port"),
624                         gwuser = self.get("gatewayUser"),
625                         gw = self.get("gateway"),
626                         agent = True,
627                         identity = self.get("identity"),
628                         server_key = self.get("serverKey"),
629                         strict_host_checking = False
630                         )
631            
632         return status
633     
634     def kill(self, pid, ppid, sudo = False):
635         out = err = ""
636         proc = None
637         status = self.status(pid, ppid)
638
639         if status == sshfuncs.ProcStatus.RUNNING:
640             if self.localhost:
641                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
642             else:
643                 with self._node_lock:
644                     (out, err), proc = sshfuncs.rkill(
645                         pid, ppid,
646                         host = self.get("hostname"),
647                         user = self.get("username"),
648                         port = self.get("port"),
649                         gwuser = self.get("gatewayUser"),
650                         gw = self.get("gateway"),
651                         agent = True,
652                         sudo = sudo,
653                         identity = self.get("identity"),
654                         server_key = self.get("serverKey"),
655                         strict_host_checking = False
656                         )
657
658         return (out, err), proc
659
660     def copy(self, src, dst):
661         if self.localhost:
662             (out, err), proc = execfuncs.lcopy(src, dst, 
663                     recursive = True)
664         else:
665             with self._node_lock:
666                 (out, err), proc = sshfuncs.rcopy(
667                     src, dst, 
668                     port = self.get("port"),
669                     gwuser = self.get("gatewayUser"),
670                     gw = self.get("gateway"),
671                     identity = self.get("identity"),
672                     server_key = self.get("serverKey"),
673                     recursive = True,
674                     strict_host_checking = False)
675
676         return (out, err), proc
677
678     def upload(self, src, dst, text = False, overwrite = True,
679             raise_on_error = True):
680         """ Copy content to destination
681
682         src  string with the content to copy. Can be:
683             - plain text
684             - a string with the path to a local file
685             - a string with a semi-colon separeted list of local files
686             - a string with a local directory
687
688         dst  string with destination path on the remote host (remote is 
689             always self.host)
690
691         text src is text input, it must be stored into a temp file before 
692         uploading
693         """
694         # If source is a string input 
695         f = None
696         if text and not os.path.isfile(src):
697             # src is text input that should be uploaded as file
698             # create a temporal file with the content to upload
699             f = tempfile.NamedTemporaryFile(delete=False)
700             f.write(src)
701             f.close()
702             src = f.name
703
704         # If dst files should not be overwritten, check that the files do not
705         # exits already
706         if isinstance(src, str):
707             src = map(str.strip, src.split(";"))
708     
709         if overwrite == False:
710             src = self.filter_existing_files(src, dst)
711             if not src:
712                 return ("", ""), None
713
714         if not self.localhost:
715             # Build destination as <user>@<server>:<path>
716             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
717
718         ((out, err), proc) = self.copy(src, dst)
719
720         # clean up temp file
721         if f:
722             os.remove(f.name)
723
724         if err:
725             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
726             self.error(msg, out, err)
727             
728             msg = "%s out: %s err: %s" % (msg, out, err)
729             if raise_on_error:
730                 raise RuntimeError, msg
731
732         return ((out, err), proc)
733
734     def download(self, src, dst, raise_on_error = True):
735         if not self.localhost:
736             # Build destination as <user>@<server>:<path>
737             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
738
739         ((out, err), proc) = self.copy(src, dst)
740
741         if err:
742             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
743             self.error(msg, out, err)
744
745             if raise_on_error:
746                 raise RuntimeError, msg
747
748         return ((out, err), proc)
749
750     def install_packages_command(self, packages):
751         command = ""
752         if self.use_rpm:
753             command = rpmfuncs.install_packages_command(self.os, packages)
754         elif self.use_deb:
755             command = debfuncs.install_packages_command(self.os, packages)
756         else:
757             msg = "Error installing packages ( OS not known ) "
758             self.error(msg, self.os)
759             raise RuntimeError, msg
760
761         return command
762
763     def install_packages(self, packages, home, run_home = None,
764             raise_on_error = True):
765         """ Install packages in the Linux host.
766
767         'home' is the directory to upload the package installation script.
768         'run_home' is the directory from where to execute the script.
769         """
770         command = self.install_packages_command(packages)
771
772         run_home = run_home or home
773
774         (out, err), proc = self.run_and_wait(command, run_home, 
775             shfile = os.path.join(home, "instpkg.sh"),
776             pidfile = "instpkg_pidfile",
777             ecodefile = "instpkg_exitcode",
778             stdout = "instpkg_stdout", 
779             stderr = "instpkg_stderr",
780             overwrite = False,
781             raise_on_error = raise_on_error)
782
783         return (out, err), proc 
784
785     def remove_packages(self, packages, home, run_home = None,
786             raise_on_error = True):
787         """ Uninstall packages from the Linux host.
788
789         'home' is the directory to upload the package un-installation script.
790         'run_home' is the directory from where to execute the script.
791         """
792         if self.use_rpm:
793             command = rpmfuncs.remove_packages_command(self.os, packages)
794         elif self.use_deb:
795             command = debfuncs.remove_packages_command(self.os, packages)
796         else:
797             msg = "Error removing packages ( OS not known ) "
798             self.error(msg)
799             raise RuntimeError, msg
800
801         run_home = run_home or home
802
803         (out, err), proc = self.run_and_wait(command, run_home, 
804             shfile = os.path.join(home, "rmpkg.sh"),
805             pidfile = "rmpkg_pidfile",
806             ecodefile = "rmpkg_exitcode",
807             stdout = "rmpkg_stdout", 
808             stderr = "rmpkg_stderr",
809             overwrite = False,
810             raise_on_error = raise_on_error)
811          
812         return (out, err), proc 
813
814     def mkdir(self, paths, clean = False):
815         """ Paths is either a single remote directory path to create,
816         or a list of directories to create.
817         """
818         if clean:
819             self.rmdir(paths)
820
821         if isinstance(paths, str):
822             paths = [paths]
823
824         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
825
826         return self.execute(cmd, with_lock = True)
827
828     def rmdir(self, paths):
829         """ Paths is either a single remote directory path to delete,
830         or a list of directories to delete.
831         """
832
833         if isinstance(paths, str):
834             paths = [paths]
835
836         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
837
838         return self.execute(cmd, with_lock = True)
839         
840     def run_and_wait(self, command, home, 
841             shfile="cmd.sh",
842             env=None,
843             overwrite=True,
844             wait_run=True,
845             pidfile="pidfile", 
846             ecodefile="exitcode", 
847             stdin=None, 
848             stdout="stdout", 
849             stderr="stderr", 
850             sudo=False,
851             tty=False,
852             raise_on_error=True):
853         """
854         Uploads the 'command' to a bash script in the host.
855         Then runs the script detached in background in the host, and
856         busy-waites until the script finishes executing.
857         """
858
859         if not shfile.startswith("/"):
860             shfile = os.path.join(home, shfile)
861
862         self.upload_command(command, 
863             shfile = shfile, 
864             ecodefile = ecodefile, 
865             env = env,
866             overwrite = overwrite)
867
868         command = "bash %s" % shfile
869         # run command in background in remote host
870         (out, err), proc = self.run(command, home, 
871                 pidfile = pidfile,
872                 stdin = stdin, 
873                 stdout = stdout, 
874                 stderr = stderr, 
875                 sudo = sudo,
876                 tty = tty)
877
878         # check no errors occurred
879         if proc.poll():
880             msg = " Failed to run command '%s' " % command
881             self.error(msg, out, err)
882             if raise_on_error:
883                 raise RuntimeError, msg
884
885         # Wait for pid file to be generated
886         pid, ppid = self.wait_pid(
887                 home = home, 
888                 pidfile = pidfile, 
889                 raise_on_error = raise_on_error)
890
891         if wait_run:
892             # wait until command finishes to execute
893             self.wait_run(pid, ppid)
894           
895             (eout, err), proc = self.check_errors(home,
896                 ecodefile = ecodefile,
897                 stderr = stderr)
898
899             # Out is what was written in the stderr file
900             if err:
901                 msg = " Failed to run command '%s' " % command
902                 self.error(msg, eout, err)
903
904                 if raise_on_error:
905                     raise RuntimeError, msg
906
907         (out, oerr), proc = self.check_output(home, stdout)
908         
909         return (out, err), proc
910         
911     def exitcode(self, home, ecodefile = "exitcode"):
912         """
913         Get the exit code of an application.
914         Returns an integer value with the exit code 
915         """
916         (out, err), proc = self.check_output(home, ecodefile)
917
918         # Succeeded to open file, return exit code in the file
919         if proc.wait() == 0:
920             try:
921                 return int(out.strip())
922             except:
923                 # Error in the content of the file!
924                 return ExitCode.CORRUPTFILE
925
926         # No such file or directory
927         if proc.returncode == 1:
928             return ExitCode.FILENOTFOUND
929         
930         # Other error from 'cat'
931         return ExitCode.ERROR
932
933     def upload_command(self, command, 
934             shfile="cmd.sh",
935             ecodefile="exitcode",
936             overwrite=True,
937             env=None):
938         """ Saves the command as a bash script file in the remote host, and
939         forces to save the exit code of the command execution to the ecodefile
940         """
941
942         if not (command.strip().endswith(";") or command.strip().endswith("&")):
943             command += ";"
944       
945         # The exit code of the command will be stored in ecodefile
946         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
947                 'command': command,
948                 'ecodefile': ecodefile,
949                 } 
950
951         # Export environment
952         environ = self.format_environment(env)
953
954         # Add environ to command
955         command = environ + command
956
957         return self.upload(command, shfile, text=True, overwrite=overwrite)
958
959     def format_environment(self, env, inline=False):
960         """ Formats the environment variables for a command to be executed
961         either as an inline command
962         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
963         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
964         """
965         if not env: return ""
966
967         # Remove extra white spaces
968         env = re.sub(r'\s+', ' ', env.strip())
969
970         sep = ";" if inline else "\n"
971         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
972
973     def check_errors(self, home, 
974             ecodefile = "exitcode", 
975             stderr = "stderr"):
976         """ Checks whether errors occurred while running a command.
977         It first checks the exit code for the command, and only if the
978         exit code is an error one it returns the error output.
979
980         """
981         proc = None
982         err = ""
983
984         # get exit code saved in the 'exitcode' file
985         ecode = self.exitcode(home, ecodefile)
986
987         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
988             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
989         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
990             # The process returned an error code or didn't exist. 
991             # Check standard error.
992             (err, eerr), proc = self.check_output(home, stderr)
993
994             # If the stderr file was not found, assume nothing bad happened,
995             # and just ignore the error.
996             # (cat returns 1 for error "No such file or directory")
997             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
998                 err = "" 
999             
1000         return ("", err), proc
1001  
1002     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1003         """ Waits until the pid file for the command is generated, 
1004             and returns the pid and ppid of the process """
1005         pid = ppid = None
1006         delay = 1.0
1007
1008         for i in xrange(2):
1009             pidtuple = self.getpid(home = home, pidfile = pidfile)
1010             
1011             if pidtuple:
1012                 pid, ppid = pidtuple
1013                 break
1014             else:
1015                 time.sleep(delay)
1016                 delay = delay * 1.5
1017         else:
1018             msg = " Failed to get pid for pidfile %s/%s " % (
1019                     home, pidfile )
1020             self.error(msg)
1021             
1022             if raise_on_error:
1023                 raise RuntimeError, msg
1024
1025         return pid, ppid
1026
1027     def wait_run(self, pid, ppid, trial = 0):
1028         """ wait for a remote process to finish execution """
1029         delay = 1.0
1030
1031         while True:
1032             status = self.status(pid, ppid)
1033             
1034             if status is ProcStatus.FINISHED:
1035                 break
1036             elif status is not ProcStatus.RUNNING:
1037                 delay = delay * 1.5
1038                 time.sleep(delay)
1039                 # If it takes more than 20 seconds to start, then
1040                 # asume something went wrong
1041                 if delay > 20:
1042                     break
1043             else:
1044                 # The app is running, just wait...
1045                 time.sleep(0.5)
1046
1047     def check_output(self, home, filename):
1048         """ Retrives content of file """
1049         (out, err), proc = self.execute("cat %s" % 
1050             os.path.join(home, filename), retry = 1, with_lock = True)
1051         return (out, err), proc
1052
1053     def is_alive(self):
1054         """ Checks if host is responsive
1055         """
1056         if self.localhost:
1057             return True
1058
1059         out = err = ""
1060         msg = "Unresponsive host. Wrong answer. "
1061
1062         # The underlying SSH layer will sometimes return an empty
1063         # output (even if the command was executed without errors).
1064         # To work arround this, repeat the operation N times or
1065         # until the result is not empty string
1066         try:
1067             (out, err), proc = self.execute("echo 'ALIVE'",
1068                     blocking = True,
1069                     with_lock = True)
1070     
1071             if out.find("ALIVE") > -1:
1072                 return True
1073         except:
1074             trace = traceback.format_exc()
1075             msg = "Unresponsive host. Error reaching host: %s " % trace
1076
1077         self.error(msg, out, err)
1078         return False
1079
1080     def find_home(self):
1081         """ Retrieves host home directory
1082         """
1083         # The underlying SSH layer will sometimes return an empty
1084         # output (even if the command was executed without errors).
1085         # To work arround this, repeat the operation N times or
1086         # until the result is not empty string
1087         msg = "Impossible to retrieve HOME directory"
1088         try:
1089             (out, err), proc = self.execute("echo ${HOME}",
1090                     blocking = True,
1091                     with_lock = True)
1092     
1093             if out.strip() != "":
1094                 self._home_dir =  out.strip()
1095         except:
1096             trace = traceback.format_exc()
1097             msg = "Impossible to retrieve HOME directory %s" % trace
1098
1099         if not self._home_dir:
1100             self.error(msg)
1101             raise RuntimeError, msg
1102
1103     def filter_existing_files(self, src, dst):
1104         """ Removes files that already exist in the Linux host from src list
1105         """
1106         # construct a dictionary with { dst: src }
1107         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1108                     if len(src) > 1 else dict({dst: src[0]})
1109
1110         command = []
1111         for d in dests.keys():
1112             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1113
1114         command = ";".join(command)
1115
1116         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1117     
1118         for d in dests.keys():
1119             if out.find(d) > -1:
1120                 del dests[d]
1121
1122         if not dests:
1123             return []
1124
1125         return dests.values()
1126