Changing reschedule_delay internals
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import socket
32 import tempfile
33 import time
34 import threading
35 import traceback
36
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40 class ExitCode:
41     """
42     Error codes that the rexitcode function can return if unable to
43     check the exit code of a spawned process
44     """
45     FILENOTFOUND = -1
46     CORRUPTFILE = -2
47     ERROR = -3
48     OK = 0
49
50 class OSType:
51     """
52     Supported flavors of Linux OS
53     """
54     FEDORA_8 = "f8"
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit_copy
62 class LinuxNode(ResourceManager):
63     """
64     .. class:: Class Args :
65       
66         :param ec: The Experiment controller
67         :type ec: ExperimentController
68         :param guid: guid of the RM
69         :type guid: int
70
71     .. note::
72
73         There are different ways in which commands can be executed using the
74         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
75         'run_and_wait'). 
76         
77         Brief explanation:
78
79             * 'execute' (blocking mode) :  
80
81                      HOW IT WORKS: 'execute', forks a process and run the
82                      command, synchronously, attached to the terminal, in
83                      foreground.
84                      The execute method will block until the command returns
85                      the result on 'out', 'err' (so until it finishes executing).
86   
87                      USAGE: short-lived commands that must be executed attached
88                      to a terminal and in foreground, for which it IS necessary
89                      to block until the command has finished (e.g. if you want
90                      to run 'ls' or 'cat').
91
92             * 'execute' (NON blocking mode - blocking = False) :
93
94                     HOW IT WORKS: Same as before, except that execute method
95                     will return immediately (even if command still running).
96
97                     USAGE: long-lived commands that must be executed attached
98                     to a terminal and in foreground, but for which it is not
99                     necessary to block until the command has finished. (e.g.
100                     start an application using X11 forwarding)
101
102              * 'run' :
103
104                    HOW IT WORKS: Connects to the host ( using SSH if remote)
105                    and launches the command in background, detached from any
106                    terminal (daemonized), and returns. The command continues to
107                    run remotely, but since it is detached from the terminal,
108                    its pipes (stdin, stdout, stderr) can't be redirected to the
109                    console (as normal non detached processes would), and so they
110                    are explicitly redirected to files. The pidfile is created as
111                    part of the process of launching the command. The pidfile
112                    holds the pid and ppid of the process forked in background,
113                    so later on it is possible to check whether the command is still
114                    running.
115
116                     USAGE: long-lived commands that can run detached in background,
117                     for which it is NOT necessary to block (wait) until the command
118                     has finished. (e.g. start an application that is not using X11
119                     forwarding. It can run detached and remotely in background)
120
121              * 'run_and_wait' :
122
123                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124                     the command has finished execution. It also checks whether
125                     errors occurred during runtime by reading the exitcode file,
126                     which contains the exit code of the command that was run
127                     (checking stderr only is not always reliable since many
128                     commands throw debugging info to stderr and the only way to
129                     automatically know whether an error really happened is to
130                     check the process exit code).
131
132                     Another difference with respect to 'run', is that instead
133                     of directly executing the command as a bash command line,
134                     it uploads the command to a bash script and runs the script.
135                     This allows to use the bash script to debug errors, since
136                     it remains at the remote host and can be run manually to
137                     reproduce the error.
138                   
139                     USAGE: medium-lived commands that can run detached in
140                     background, for which it IS necessary to block (wait) until
141                     the command has finished. (e.g. Package installation,
142                     source compilation, file download, etc)
143
144     """
145     _rtype = "LinuxNode"
146     _help = "Controls Linux host machines ( either localhost or a host " \
147             "that can be accessed using a SSH key)"
148     _backend_type = "linux"
149
150     @classmethod
151     def _register_attributes(cls):
152         hostname = Attribute("hostname", "Hostname of the machine",
153                 flags = Flags.Design)
154
155         username = Attribute("username", "Local account username", 
156                 flags = Flags.Credential)
157
158         port = Attribute("port", "SSH port", flags = Flags.Design)
159         
160         home = Attribute("home",
161                 "Experiment home directory to store all experiment related files",
162                 flags = Flags.Design)
163         
164         identity = Attribute("identity", "SSH identity file",
165                 flags = Flags.Credential)
166         
167         server_key = Attribute("serverKey", "Server public key", 
168                 flags = Flags.Design)
169         
170         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
171                 " from node home folder before starting experiment", 
172                 type = Types.Bool,
173                 default = False,
174                 flags = Flags.Design)
175
176         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
177                 " from a previous same experiment, before the new experiment starts", 
178                 type = Types.Bool,
179                 default = False,
180                 flags = Flags.Design)
181         
182         clean_processes = Attribute("cleanProcesses", 
183                 "Kill all running processes before starting experiment",
184                 type = Types.Bool,
185                 default = False,
186                 flags = Flags.Design)
187         
188         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
189                 "releasing the resource",
190                 flags = Flags.Design)
191
192         gateway_user = Attribute("gatewayUser", "Gateway account username",
193                 flags = Flags.Design)
194
195         gateway = Attribute("gateway", "Hostname of the gateway machine",
196                 flags = Flags.Design)
197
198         ip = Attribute("ip", "Linux host public IP address. "
199                    "Must not be modified by the user unless hostname is 'localhost'",
200                     flags = Flags.Design)
201
202         cls._register_attribute(hostname)
203         cls._register_attribute(username)
204         cls._register_attribute(port)
205         cls._register_attribute(home)
206         cls._register_attribute(identity)
207         cls._register_attribute(server_key)
208         cls._register_attribute(clean_home)
209         cls._register_attribute(clean_experiment)
210         cls._register_attribute(clean_processes)
211         cls._register_attribute(tear_down)
212         cls._register_attribute(gateway_user)
213         cls._register_attribute(gateway)
214         cls._register_attribute(ip)
215
216     def __init__(self, ec, guid):
217         super(LinuxNode, self).__init__(ec, guid)
218         self._os = None
219         # home directory at Linux host
220         self._home_dir = ""
221
222         # lock to prevent concurrent applications on the same node,
223         # to execute commands at the same time. There are potential
224         # concurrency issues when using SSH to a same host from 
225         # multiple threads. There are also possible operational 
226         # issues, e.g. an application querying the existence 
227         # of a file or folder prior to its creation, and another 
228         # application creating the same file or folder in between.
229         self._node_lock = threading.Lock()
230     
231     def log_message(self, msg):
232         return " guid %d - host %s - %s " % (self.guid, 
233                 self.get("hostname"), msg)
234
235     @property
236     def home_dir(self):
237         home = self.get("home") or ""
238         if not home.startswith("/"):
239            home = os.path.join(self._home_dir, home) 
240         return home
241
242     @property
243     def nepi_home(self):
244         return os.path.join(self.home_dir, ".nepi")
245
246     @property
247     def usr_dir(self):
248         return os.path.join(self.nepi_home, "nepi-usr")
249
250     @property
251     def lib_dir(self):
252         return os.path.join(self.usr_dir, "lib")
253
254     @property
255     def bin_dir(self):
256         return os.path.join(self.usr_dir, "bin")
257
258     @property
259     def src_dir(self):
260         return os.path.join(self.usr_dir, "src")
261
262     @property
263     def share_dir(self):
264         return os.path.join(self.usr_dir, "share")
265
266     @property
267     def exp_dir(self):
268         return os.path.join(self.nepi_home, "nepi-exp")
269
270     @property
271     def exp_home(self):
272         return os.path.join(self.exp_dir, self.ec.exp_id)
273
274     @property
275     def node_home(self):
276         return os.path.join(self.exp_home, "node-%d" % self.guid)
277
278     @property
279     def run_home(self):
280         return os.path.join(self.node_home, self.ec.run_id)
281
282     @property
283     def os(self):
284         if self._os:
285             return self._os
286
287         if not self.localhost and not self.get("username"):
288             msg = "Can't resolve OS, insufficient data "
289             self.error(msg)
290             raise RuntimeError, msg
291
292         out = self.get_os()
293
294         if out.find("Fedora release 8") == 0:
295             self._os = OSType.FEDORA_8
296         elif out.find("Fedora release 12") == 0:
297             self._os = OSType.FEDORA_12
298         elif out.find("Fedora release 14") == 0:
299             self._os = OSType.FEDORA_14
300         elif out.find("Fedora release") == 0:
301             self._os = OSType.FEDORA
302         elif out.find("Debian") == 0: 
303             self._os = OSType.DEBIAN
304         elif out.find("Ubuntu") ==0:
305             self._os = OSType.UBUNTU
306         else:
307             msg = "Unsupported OS"
308             self.error(msg, out)
309             raise RuntimeError, "%s - %s " %( msg, out )
310
311         return self._os
312
313     def get_os(self):
314         # The underlying SSH layer will sometimes return an empty
315         # output (even if the command was executed without errors).
316         # To work arround this, repeat the operation N times or
317         # until the result is not empty string
318         out = ""
319         try:
320             (out, err), proc = self.execute("cat /etc/issue", 
321                     with_lock = True,
322                     blocking = True)
323         except:
324             trace = traceback.format_exc()
325             msg = "Error detecting OS: %s " % trace
326             self.error(msg, out, err)
327         
328         return out
329
330     @property
331     def use_deb(self):
332         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
333
334     @property
335     def use_rpm(self):
336         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
337                 OSType.FEDORA]
338
339     @property
340     def localhost(self):
341         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
342
343     def do_provision(self):
344         # check if host is alive
345         if not self.is_alive():
346             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
347             self.error(msg)
348             raise RuntimeError, msg
349
350         self.find_home()
351
352         if self.get("cleanProcesses"):
353             self.clean_processes()
354
355         if self.get("cleanHome"):
356             self.clean_home()
357  
358         if self.get("cleanExperiment"):
359             self.clean_experiment()
360     
361         # Create shared directory structure and node home directory
362         paths = [self.lib_dir, 
363             self.bin_dir, 
364             self.src_dir, 
365             self.share_dir, 
366             self.node_home]
367
368         self.mkdir(paths)
369
370         # Get Public IP address if possible
371         if not self.get("ip"):
372             ip = None
373
374             if self.localhost:
375                 ip = socket.gethostbyname(socket.gethostname())
376             else:
377                 try:
378                     ip = socket.gethostbyname(self.get("hostname"))
379                 except:
380                     msg = "DNS can not resolve hostname %s" % self.get("hostname") 
381                     self.debug(msg)
382
383             self.set("ip", ip)
384
385         super(LinuxNode, self).do_provision()
386
387     def do_deploy(self):
388         if self.state == ResourceState.NEW:
389             self.info("Deploying node")
390             self.do_discover()
391             self.do_provision()
392
393         # Node needs to wait until all associated interfaces are 
394         # ready before it can finalize deployment
395         from nepi.resources.linux.interface import LinuxInterface
396         ifaces = self.get_connected(LinuxInterface.get_rtype())
397         for iface in ifaces:
398             if iface.state < ResourceState.READY:
399                 self.ec.schedule(self.reschedule_delay, self.deploy)
400                 return 
401
402         super(LinuxNode, self).do_deploy()
403
404     def do_release(self):
405         rms = self.get_connected()
406         for rm in rms:
407             # Node needs to wait until all associated RMs are released
408             # before it can be released
409             if rm.state != ResourceState.RELEASED:
410                 self.ec.schedule(self.reschedule_delay, self.release)
411                 return 
412
413         tear_down = self.get("tearDown")
414         if tear_down:
415             self.execute(tear_down)
416
417         self.clean_processes()
418
419         super(LinuxNode, self).do_release()
420
421     def valid_connection(self, guid):
422         # TODO: Validate!
423         return True
424
425     def clean_processes(self):
426         self.info("Cleaning up processes")
427
428         if self.localhost:
429             return 
430         
431         if self.get("username") != 'root':
432             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
433                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
434         else:
435             if self.state >= ResourceState.READY:
436                 import pickle
437                 pids = pickle.load(open("/tmp/save.proc", "rb"))
438                 pids_temp = dict()
439                 ps_aux = "ps aux |awk '{print $2,$11}'"
440                 (out, err), proc = self.execute(ps_aux)
441                 if len(out) != 0:
442                     for line in out.strip().split("\n"):
443                         parts = line.strip().split(" ")
444                         pids_temp[parts[0]] = parts[1]
445                     kill_pids = set(pids_temp.items()) - set(pids.items())
446                     kill_pids = ' '.join(dict(kill_pids).keys())
447
448                     cmd = ("killall tcpdump || /bin/true ; " +
449                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
450                         "kill %s || /bin/true ; " % kill_pids)
451                 else:
452                     cmd = ("killall tcpdump || /bin/true ; " +
453                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
454             else:
455                 cmd = ("killall tcpdump || /bin/true ; " +
456                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
457
458         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
459
460     def clean_home(self):
461         """ Cleans all NEPI related folders in the Linux host
462         """
463         self.info("Cleaning up home")
464         
465         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
466                 self.home_dir )
467
468         return self.execute(cmd, with_lock = True)
469
470     def clean_experiment(self):
471         """ Cleans all experiment related files in the Linux host.
472         It preserves NEPI files and folders that have a multi experiment
473         scope.
474         """
475         self.info("Cleaning up experiment files")
476         
477         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
478                 self.exp_dir,
479                 self.ec.exp_id )
480             
481         return self.execute(cmd, with_lock = True)
482
483     def execute(self, command,
484             sudo = False,
485             env = None,
486             tty = False,
487             forward_x11 = False,
488             retry = 3,
489             connect_timeout = 30,
490             strict_host_checking = False,
491             persistent = True,
492             blocking = True,
493             with_lock = False
494             ):
495         """ Notice that this invocation will block until the
496         execution finishes. If this is not the desired behavior,
497         use 'run' instead."""
498
499         if self.localhost:
500             (out, err), proc = execfuncs.lexec(command, 
501                     user = self.get("username"), # still problem with localhost
502                     sudo = sudo,
503                     env = env)
504         else:
505             if with_lock:
506                 # If the execute command is blocking, we don't want to keep
507                 # the node lock. This lock is used to avoid race conditions
508                 # when creating the ControlMaster sockets. A more elegant
509                 # solution is needed.
510                 with self._node_lock:
511                     (out, err), proc = sshfuncs.rexec(
512                         command, 
513                         host = self.get("hostname"),
514                         user = self.get("username"),
515                         port = self.get("port"),
516                         gwuser = self.get("gatewayUser"),
517                         gw = self.get("gateway"),
518                         agent = True,
519                         sudo = sudo,
520                         identity = self.get("identity"),
521                         server_key = self.get("serverKey"),
522                         env = env,
523                         tty = tty,
524                         forward_x11 = forward_x11,
525                         retry = retry,
526                         connect_timeout = connect_timeout,
527                         persistent = persistent,
528                         blocking = blocking, 
529                         strict_host_checking = strict_host_checking
530                         )
531             else:
532                 (out, err), proc = sshfuncs.rexec(
533                     command, 
534                     host = self.get("hostname"),
535                     user = self.get("username"),
536                     port = self.get("port"),
537                     gwuser = self.get("gatewayUser"),
538                     gw = self.get("gateway"),
539                     agent = True,
540                     sudo = sudo,
541                     identity = self.get("identity"),
542                     server_key = self.get("serverKey"),
543                     env = env,
544                     tty = tty,
545                     forward_x11 = forward_x11,
546                     retry = retry,
547                     connect_timeout = connect_timeout,
548                     persistent = persistent,
549                     blocking = blocking, 
550                     strict_host_checking = strict_host_checking
551                     )
552
553         return (out, err), proc
554
555     def run(self, command, home,
556             create_home = False,
557             pidfile = 'pidfile',
558             stdin = None, 
559             stdout = 'stdout', 
560             stderr = 'stderr', 
561             sudo = False,
562             tty = False,
563             strict_host_checking = False):
564         
565         self.debug("Running command '%s'" % command)
566         
567         if self.localhost:
568             (out, err), proc = execfuncs.lspawn(command, pidfile,
569                     home = home, 
570                     create_home = create_home, 
571                     stdin = stdin or '/dev/null',
572                     stdout = stdout or '/dev/null',
573                     stderr = stderr or '/dev/null',
574                     sudo = sudo) 
575         else:
576             with self._node_lock:
577                 (out, err), proc = sshfuncs.rspawn(
578                     command,
579                     pidfile = pidfile,
580                     home = home,
581                     create_home = create_home,
582                     stdin = stdin or '/dev/null',
583                     stdout = stdout or '/dev/null',
584                     stderr = stderr or '/dev/null',
585                     sudo = sudo,
586                     host = self.get("hostname"),
587                     user = self.get("username"),
588                     port = self.get("port"),
589                     gwuser = self.get("gatewayUser"),
590                     gw = self.get("gateway"),
591                     agent = True,
592                     identity = self.get("identity"),
593                     server_key = self.get("serverKey"),
594                     tty = tty,
595                     strict_host_checking = strict_host_checking
596                     )
597
598         return (out, err), proc
599
600     def getpid(self, home, pidfile = "pidfile"):
601         if self.localhost:
602             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
603         else:
604             with self._node_lock:
605                 pidtuple = sshfuncs.rgetpid(
606                     os.path.join(home, pidfile),
607                     host = self.get("hostname"),
608                     user = self.get("username"),
609                     port = self.get("port"),
610                     gwuser = self.get("gatewayUser"),
611                     gw = self.get("gateway"),
612                     agent = True,
613                     identity = self.get("identity"),
614                     server_key = self.get("serverKey"),
615                     strict_host_checking = False
616                     )
617         
618         return pidtuple
619
620     def status(self, pid, ppid):
621         if self.localhost:
622             status = execfuncs.lstatus(pid, ppid)
623         else:
624             with self._node_lock:
625                 status = sshfuncs.rstatus(
626                         pid, ppid,
627                         host = self.get("hostname"),
628                         user = self.get("username"),
629                         port = self.get("port"),
630                         gwuser = self.get("gatewayUser"),
631                         gw = self.get("gateway"),
632                         agent = True,
633                         identity = self.get("identity"),
634                         server_key = self.get("serverKey"),
635                         strict_host_checking = False
636                         )
637            
638         return status
639     
640     def kill(self, pid, ppid, sudo = False):
641         out = err = ""
642         proc = None
643         status = self.status(pid, ppid)
644
645         if status == sshfuncs.ProcStatus.RUNNING:
646             if self.localhost:
647                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
648             else:
649                 with self._node_lock:
650                     (out, err), proc = sshfuncs.rkill(
651                         pid, ppid,
652                         host = self.get("hostname"),
653                         user = self.get("username"),
654                         port = self.get("port"),
655                         gwuser = self.get("gatewayUser"),
656                         gw = self.get("gateway"),
657                         agent = True,
658                         sudo = sudo,
659                         identity = self.get("identity"),
660                         server_key = self.get("serverKey"),
661                         strict_host_checking = False
662                         )
663
664         return (out, err), proc
665
666     def copy(self, src, dst):
667         if self.localhost:
668             (out, err), proc = execfuncs.lcopy(src, dst, 
669                     recursive = True)
670         else:
671             with self._node_lock:
672                 (out, err), proc = sshfuncs.rcopy(
673                     src, dst, 
674                     port = self.get("port"),
675                     gwuser = self.get("gatewayUser"),
676                     gw = self.get("gateway"),
677                     identity = self.get("identity"),
678                     server_key = self.get("serverKey"),
679                     recursive = True,
680                     strict_host_checking = False)
681
682         return (out, err), proc
683
684     def upload(self, src, dst, text = False, overwrite = True,
685             raise_on_error = True):
686         """ Copy content to destination
687
688         src  string with the content to copy. Can be:
689             - plain text
690             - a string with the path to a local file
691             - a string with a semi-colon separeted list of local files
692             - a string with a local directory
693
694         dst  string with destination path on the remote host (remote is 
695             always self.host)
696
697         text src is text input, it must be stored into a temp file before 
698         uploading
699         """
700         # If source is a string input 
701         f = None
702         if text and not os.path.isfile(src):
703             # src is text input that should be uploaded as file
704             # create a temporal file with the content to upload
705             f = tempfile.NamedTemporaryFile(delete=False)
706             f.write(src)
707             f.close()
708             src = f.name
709
710         # If dst files should not be overwritten, check that the files do not
711         # exits already
712         if isinstance(src, str):
713             src = map(str.strip, src.split(";"))
714     
715         if overwrite == False:
716             src = self.filter_existing_files(src, dst)
717             if not src:
718                 return ("", ""), None
719
720         if not self.localhost:
721             # Build destination as <user>@<server>:<path>
722             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
723
724         ((out, err), proc) = self.copy(src, dst)
725
726         # clean up temp file
727         if f:
728             os.remove(f.name)
729
730         if err:
731             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
732             self.error(msg, out, err)
733             
734             msg = "%s out: %s err: %s" % (msg, out, err)
735             if raise_on_error:
736                 raise RuntimeError, msg
737
738         return ((out, err), proc)
739
740     def download(self, src, dst, raise_on_error = True):
741         if not self.localhost:
742             # Build destination as <user>@<server>:<path>
743             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
744
745         ((out, err), proc) = self.copy(src, dst)
746
747         if err:
748             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
749             self.error(msg, out, err)
750
751             if raise_on_error:
752                 raise RuntimeError, msg
753
754         return ((out, err), proc)
755
756     def install_packages_command(self, packages):
757         command = ""
758         if self.use_rpm:
759             command = rpmfuncs.install_packages_command(self.os, packages)
760         elif self.use_deb:
761             command = debfuncs.install_packages_command(self.os, packages)
762         else:
763             msg = "Error installing packages ( OS not known ) "
764             self.error(msg, self.os)
765             raise RuntimeError, msg
766
767         return command
768
769     def install_packages(self, packages, home, run_home = None,
770             raise_on_error = True):
771         """ Install packages in the Linux host.
772
773         'home' is the directory to upload the package installation script.
774         'run_home' is the directory from where to execute the script.
775         """
776         command = self.install_packages_command(packages)
777
778         run_home = run_home or home
779
780         (out, err), proc = self.run_and_wait(command, run_home, 
781             shfile = os.path.join(home, "instpkg.sh"),
782             pidfile = "instpkg_pidfile",
783             ecodefile = "instpkg_exitcode",
784             stdout = "instpkg_stdout", 
785             stderr = "instpkg_stderr",
786             overwrite = False,
787             raise_on_error = raise_on_error)
788
789         return (out, err), proc 
790
791     def remove_packages(self, packages, home, run_home = None,
792             raise_on_error = True):
793         """ Uninstall packages from the Linux host.
794
795         'home' is the directory to upload the package un-installation script.
796         'run_home' is the directory from where to execute the script.
797         """
798         if self.use_rpm:
799             command = rpmfuncs.remove_packages_command(self.os, packages)
800         elif self.use_deb:
801             command = debfuncs.remove_packages_command(self.os, packages)
802         else:
803             msg = "Error removing packages ( OS not known ) "
804             self.error(msg)
805             raise RuntimeError, msg
806
807         run_home = run_home or home
808
809         (out, err), proc = self.run_and_wait(command, run_home, 
810             shfile = os.path.join(home, "rmpkg.sh"),
811             pidfile = "rmpkg_pidfile",
812             ecodefile = "rmpkg_exitcode",
813             stdout = "rmpkg_stdout", 
814             stderr = "rmpkg_stderr",
815             overwrite = False,
816             raise_on_error = raise_on_error)
817          
818         return (out, err), proc 
819
820     def mkdir(self, paths, clean = False):
821         """ Paths is either a single remote directory path to create,
822         or a list of directories to create.
823         """
824         if clean:
825             self.rmdir(paths)
826
827         if isinstance(paths, str):
828             paths = [paths]
829
830         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
831
832         return self.execute(cmd, with_lock = True)
833
834     def rmdir(self, paths):
835         """ Paths is either a single remote directory path to delete,
836         or a list of directories to delete.
837         """
838
839         if isinstance(paths, str):
840             paths = [paths]
841
842         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
843
844         return self.execute(cmd, with_lock = True)
845         
846     def run_and_wait(self, command, home, 
847             shfile = "cmd.sh",
848             env = None,
849             overwrite = True,
850             pidfile = "pidfile", 
851             ecodefile = "exitcode", 
852             stdin = None, 
853             stdout = "stdout", 
854             stderr = "stderr", 
855             sudo = False,
856             tty = False,
857             raise_on_error = True):
858         """
859         Uploads the 'command' to a bash script in the host.
860         Then runs the script detached in background in the host, and
861         busy-waites until the script finishes executing.
862         """
863
864         if not shfile.startswith("/"):
865             shfile = os.path.join(home, shfile)
866
867         self.upload_command(command, 
868             shfile = shfile, 
869             ecodefile = ecodefile, 
870             env = env,
871             overwrite = overwrite)
872
873         command = "bash %s" % shfile
874         # run command in background in remote host
875         (out, err), proc = self.run(command, home, 
876                 pidfile = pidfile,
877                 stdin = stdin, 
878                 stdout = stdout, 
879                 stderr = stderr, 
880                 sudo = sudo,
881                 tty = tty)
882
883         # check no errors occurred
884         if proc.poll():
885             msg = " Failed to run command '%s' " % command
886             self.error(msg, out, err)
887             if raise_on_error:
888                 raise RuntimeError, msg
889
890         # Wait for pid file to be generated
891         pid, ppid = self.wait_pid(
892                 home = home, 
893                 pidfile = pidfile, 
894                 raise_on_error = raise_on_error)
895
896         # wait until command finishes to execute
897         self.wait_run(pid, ppid)
898       
899         (eout, err), proc = self.check_errors(home,
900             ecodefile = ecodefile,
901             stderr = stderr)
902
903         # Out is what was written in the stderr file
904         if err:
905             msg = " Failed to run command '%s' " % command
906             self.error(msg, eout, err)
907
908             if raise_on_error:
909                 raise RuntimeError, msg
910
911         (out, oerr), proc = self.check_output(home, stdout)
912         
913         return (out, err), proc
914
915     def exitcode(self, home, ecodefile = "exitcode"):
916         """
917         Get the exit code of an application.
918         Returns an integer value with the exit code 
919         """
920         (out, err), proc = self.check_output(home, ecodefile)
921
922         # Succeeded to open file, return exit code in the file
923         if proc.wait() == 0:
924             try:
925                 return int(out.strip())
926             except:
927                 # Error in the content of the file!
928                 return ExitCode.CORRUPTFILE
929
930         # No such file or directory
931         if proc.returncode == 1:
932             return ExitCode.FILENOTFOUND
933         
934         # Other error from 'cat'
935         return ExitCode.ERROR
936
937     def upload_command(self, command, 
938             shfile = "cmd.sh",
939             ecodefile = "exitcode",
940             overwrite = True,
941             env = None):
942         """ Saves the command as a bash script file in the remote host, and
943         forces to save the exit code of the command execution to the ecodefile
944         """
945
946         if not (command.strip().endswith(";") or command.strip().endswith("&")):
947             command += ";"
948       
949         # The exit code of the command will be stored in ecodefile
950         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
951                 'command': command,
952                 'ecodefile': ecodefile,
953                 } 
954
955         # Export environment
956         environ = self.format_environment(env)
957
958         # Add environ to command
959         command = environ + command
960
961         return self.upload(command, shfile, text = True, overwrite = overwrite)
962
963     def format_environment(self, env, inline = False):
964         """ Formats the environment variables for a command to be executed
965         either as an inline command
966         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
967         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
968         """
969         if not env: return ""
970
971         # Remove extra white spaces
972         env = re.sub(r'\s+', ' ', env.strip())
973
974         sep = ";" if inline else "\n"
975         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
976
977     def check_errors(self, home, 
978             ecodefile = "exitcode", 
979             stderr = "stderr"):
980         """ Checks whether errors occurred while running a command.
981         It first checks the exit code for the command, and only if the
982         exit code is an error one it returns the error output.
983
984         """
985         proc = None
986         err = ""
987
988         # get exit code saved in the 'exitcode' file
989         ecode = self.exitcode(home, ecodefile)
990
991         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
992             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
993         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
994             # The process returned an error code or didn't exist. 
995             # Check standard error.
996             (err, eerr), proc = self.check_output(home, stderr)
997
998             # If the stderr file was not found, assume nothing bad happened,
999             # and just ignore the error.
1000             # (cat returns 1 for error "No such file or directory")
1001             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
1002                 err = "" 
1003             
1004         return ("", err), proc
1005  
1006     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1007         """ Waits until the pid file for the command is generated, 
1008             and returns the pid and ppid of the process """
1009         pid = ppid = None
1010         delay = 1.0
1011
1012         for i in xrange(2):
1013             pidtuple = self.getpid(home = home, pidfile = pidfile)
1014             
1015             if pidtuple:
1016                 pid, ppid = pidtuple
1017                 break
1018             else:
1019                 time.sleep(delay)
1020                 delay = delay * 1.5
1021         else:
1022             msg = " Failed to get pid for pidfile %s/%s " % (
1023                     home, pidfile )
1024             self.error(msg)
1025             
1026             if raise_on_error:
1027                 raise RuntimeError, msg
1028
1029         return pid, ppid
1030
1031     def wait_run(self, pid, ppid, trial = 0):
1032         """ wait for a remote process to finish execution """
1033         delay = 1.0
1034
1035         while True:
1036             status = self.status(pid, ppid)
1037             
1038             if status is ProcStatus.FINISHED:
1039                 break
1040             elif status is not ProcStatus.RUNNING:
1041                 delay = delay * 1.5
1042                 time.sleep(delay)
1043                 # If it takes more than 20 seconds to start, then
1044                 # asume something went wrong
1045                 if delay > 20:
1046                     break
1047             else:
1048                 # The app is running, just wait...
1049                 time.sleep(0.5)
1050
1051     def check_output(self, home, filename):
1052         """ Retrives content of file """
1053         (out, err), proc = self.execute("cat %s" % 
1054             os.path.join(home, filename), retry = 1, with_lock = True)
1055         return (out, err), proc
1056
1057     def is_alive(self):
1058         """ Checks if host is responsive
1059         """
1060         if self.localhost:
1061             return True
1062
1063         out = err = ""
1064         msg = "Unresponsive host. Wrong answer. "
1065
1066         # The underlying SSH layer will sometimes return an empty
1067         # output (even if the command was executed without errors).
1068         # To work arround this, repeat the operation N times or
1069         # until the result is not empty string
1070         try:
1071             (out, err), proc = self.execute("echo 'ALIVE'",
1072                     blocking = True,
1073                     with_lock = True)
1074     
1075             if out.find("ALIVE") > -1:
1076                 return True
1077         except:
1078             trace = traceback.format_exc()
1079             msg = "Unresponsive host. Error reaching host: %s " % trace
1080
1081         self.error(msg, out, err)
1082         return False
1083
1084     def find_home(self):
1085         """ Retrieves host home directory
1086         """
1087         # The underlying SSH layer will sometimes return an empty
1088         # output (even if the command was executed without errors).
1089         # To work arround this, repeat the operation N times or
1090         # until the result is not empty string
1091         msg = "Impossible to retrieve HOME directory"
1092         try:
1093             (out, err), proc = self.execute("echo ${HOME}",
1094                     blocking = True,
1095                     with_lock = True)
1096     
1097             if out.strip() != "":
1098                 self._home_dir =  out.strip()
1099         except:
1100             trace = traceback.format_exc()
1101             msg = "Impossible to retrieve HOME directory %s" % trace
1102
1103         if not self._home_dir:
1104             self.error(msg)
1105             raise RuntimeError, msg
1106
1107     def filter_existing_files(self, src, dst):
1108         """ Removes files that already exist in the Linux host from src list
1109         """
1110         # construct a dictionary with { dst: src }
1111         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1112                     if len(src) > 1 else dict({dst: src[0]})
1113
1114         command = []
1115         for d in dests.keys():
1116             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1117
1118         command = ";".join(command)
1119
1120         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1121     
1122         for d in dests.keys():
1123             if out.find(d) > -1:
1124                 del dests[d]
1125
1126         if not dests:
1127             return []
1128
1129         return dests.values()
1130