72af97801a1a50ff78f1e5ac8ca87acea012f723
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import socket
32 import tempfile
33 import time
34 import threading
35 import traceback
36
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40 class ExitCode:
41     """
42     Error codes that the rexitcode function can return if unable to
43     check the exit code of a spawned process
44     """
45     FILENOTFOUND = -1
46     CORRUPTFILE = -2
47     ERROR = -3
48     OK = 0
49
50 class OSType:
51     """
52     Supported flavors of Linux OS
53     """
54     FEDORA_8 = "f8"
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit_copy
62 class LinuxNode(ResourceManager):
63     """
64     .. class:: Class Args :
65       
66         :param ec: The Experiment controller
67         :type ec: ExperimentController
68         :param guid: guid of the RM
69         :type guid: int
70
71     .. note::
72
73         There are different ways in which commands can be executed using the
74         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
75         'run_and_wait'). 
76         
77         Brief explanation:
78
79             * 'execute' (blocking mode) :  
80
81                      HOW IT WORKS: 'execute', forks a process and run the
82                      command, synchronously, attached to the terminal, in
83                      foreground.
84                      The execute method will block until the command returns
85                      the result on 'out', 'err' (so until it finishes executing).
86   
87                      USAGE: short-lived commands that must be executed attached
88                      to a terminal and in foreground, for which it IS necessary
89                      to block until the command has finished (e.g. if you want
90                      to run 'ls' or 'cat').
91
92             * 'execute' (NON blocking mode - blocking = False) :
93
94                     HOW IT WORKS: Same as before, except that execute method
95                     will return immediately (even if command still running).
96
97                     USAGE: long-lived commands that must be executed attached
98                     to a terminal and in foreground, but for which it is not
99                     necessary to block until the command has finished. (e.g.
100                     start an application using X11 forwarding)
101
102              * 'run' :
103
104                    HOW IT WORKS: Connects to the host ( using SSH if remote)
105                    and launches the command in background, detached from any
106                    terminal (daemonized), and returns. The command continues to
107                    run remotely, but since it is detached from the terminal,
108                    its pipes (stdin, stdout, stderr) can't be redirected to the
109                    console (as normal non detached processes would), and so they
110                    are explicitly redirected to files. The pidfile is created as
111                    part of the process of launching the command. The pidfile
112                    holds the pid and ppid of the process forked in background,
113                    so later on it is possible to check whether the command is still
114                    running.
115
116                     USAGE: long-lived commands that can run detached in background,
117                     for which it is NOT necessary to block (wait) until the command
118                     has finished. (e.g. start an application that is not using X11
119                     forwarding. It can run detached and remotely in background)
120
121              * 'run_and_wait' :
122
123                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124                     the command has finished execution. It also checks whether
125                     errors occurred during runtime by reading the exitcode file,
126                     which contains the exit code of the command that was run
127                     (checking stderr only is not always reliable since many
128                     commands throw debugging info to stderr and the only way to
129                     automatically know whether an error really happened is to
130                     check the process exit code).
131
132                     Another difference with respect to 'run', is that instead
133                     of directly executing the command as a bash command line,
134                     it uploads the command to a bash script and runs the script.
135                     This allows to use the bash script to debug errors, since
136                     it remains at the remote host and can be run manually to
137                     reproduce the error.
138                   
139                     USAGE: medium-lived commands that can run detached in
140                     background, for which it IS necessary to block (wait) until
141                     the command has finished. (e.g. Package installation,
142                     source compilation, file download, etc)
143
144     """
145     _rtype = "LinuxNode"
146     _help = "Controls Linux host machines ( either localhost or a host " \
147             "that can be accessed using a SSH key)"
148     _backend_type = "linux"
149
150     @classmethod
151     def _register_attributes(cls):
152         hostname = Attribute("hostname", "Hostname of the machine",
153                 flags = Flags.Design)
154
155         username = Attribute("username", "Local account username", 
156                 flags = Flags.Credential)
157
158         port = Attribute("port", "SSH port", flags = Flags.Design)
159         
160         home = Attribute("home",
161                 "Experiment home directory to store all experiment related files",
162                 flags = Flags.Design)
163         
164         identity = Attribute("identity", "SSH identity file",
165                 flags = Flags.Credential)
166         
167         server_key = Attribute("serverKey", "Server public key", 
168                 flags = Flags.Design)
169         
170         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
171                 " from node home folder before starting experiment", 
172                 type = Types.Bool,
173                 default = False,
174                 flags = Flags.Design)
175
176         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
177                 " from a previous same experiment, before the new experiment starts", 
178                 type = Types.Bool,
179                 default = False,
180                 flags = Flags.Design)
181         
182         clean_processes = Attribute("cleanProcesses", 
183                 "Kill all running processes before starting experiment",
184                 type = Types.Bool,
185                 default = False,
186                 flags = Flags.Design)
187         
188         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
189                 "releasing the resource",
190                 flags = Flags.Design)
191
192         gateway_user = Attribute("gatewayUser", "Gateway account username",
193                 flags = Flags.Design)
194
195         gateway = Attribute("gateway", "Hostname of the gateway machine",
196                 flags = Flags.Design)
197
198         ip = Attribute("ip", "Linux host public IP address. "
199                    "Must not be modified by the user unless hostname is 'localhost'",
200                     flags = Flags.Design)
201
202         cls._register_attribute(hostname)
203         cls._register_attribute(username)
204         cls._register_attribute(port)
205         cls._register_attribute(home)
206         cls._register_attribute(identity)
207         cls._register_attribute(server_key)
208         cls._register_attribute(clean_home)
209         cls._register_attribute(clean_experiment)
210         cls._register_attribute(clean_processes)
211         cls._register_attribute(tear_down)
212         cls._register_attribute(gateway_user)
213         cls._register_attribute(gateway)
214         cls._register_attribute(ip)
215
216     def __init__(self, ec, guid):
217         super(LinuxNode, self).__init__(ec, guid)
218         self._os = None
219         # home directory at Linux host
220         self._home_dir = ""
221
222         # lock to prevent concurrent applications on the same node,
223         # to execute commands at the same time. There are potential
224         # concurrency issues when using SSH to a same host from 
225         # multiple threads. There are also possible operational 
226         # issues, e.g. an application querying the existence 
227         # of a file or folder prior to its creation, and another 
228         # application creating the same file or folder in between.
229         self._node_lock = threading.Lock()
230     
231     def log_message(self, msg):
232         return " guid %d - host %s - %s " % (self.guid, 
233                 self.get("hostname"), msg)
234
235     @property
236     def home_dir(self):
237         home = self.get("home") or ""
238         if not home.startswith("/"):
239            home = os.path.join(self._home_dir, home) 
240         return home
241
242     @property
243     def nepi_home(self):
244         return os.path.join(self.home_dir, ".nepi")
245
246     @property
247     def usr_dir(self):
248         return os.path.join(self.nepi_home, "nepi-usr")
249
250     @property
251     def lib_dir(self):
252         return os.path.join(self.usr_dir, "lib")
253
254     @property
255     def bin_dir(self):
256         return os.path.join(self.usr_dir, "bin")
257
258     @property
259     def src_dir(self):
260         return os.path.join(self.usr_dir, "src")
261
262     @property
263     def share_dir(self):
264         return os.path.join(self.usr_dir, "share")
265
266     @property
267     def exp_dir(self):
268         return os.path.join(self.nepi_home, "nepi-exp")
269
270     @property
271     def exp_home(self):
272         return os.path.join(self.exp_dir, self.ec.exp_id)
273
274     @property
275     def node_home(self):
276         return os.path.join(self.exp_home, "node-%d" % self.guid)
277
278     @property
279     def run_home(self):
280         return os.path.join(self.node_home, self.ec.run_id)
281
282     @property
283     def os(self):
284         if self._os:
285             return self._os
286
287         if not self.localhost and not self.get("username"):
288             msg = "Can't resolve OS, insufficient data "
289             self.error(msg)
290             raise RuntimeError, msg
291
292         out = self.get_os()
293
294         if out.find("Fedora release 8") == 0:
295             self._os = OSType.FEDORA_8
296         elif out.find("Fedora release 12") == 0:
297             self._os = OSType.FEDORA_12
298         elif out.find("Fedora release 14") == 0:
299             self._os = OSType.FEDORA_14
300         elif out.find("Fedora release") == 0:
301             self._os = OSType.FEDORA
302         elif out.find("Debian") == 0: 
303             self._os = OSType.DEBIAN
304         elif out.find("Ubuntu") ==0:
305             self._os = OSType.UBUNTU
306         else:
307             msg = "Unsupported OS"
308             self.error(msg, out)
309             raise RuntimeError, "%s - %s " %( msg, out )
310
311         return self._os
312
313     def get_os(self):
314         # The underlying SSH layer will sometimes return an empty
315         # output (even if the command was executed without errors).
316         # To work arround this, repeat the operation N times or
317         # until the result is not empty string
318         out = ""
319         try:
320             (out, err), proc = self.execute("cat /etc/issue", 
321                     with_lock = True,
322                     blocking = True)
323         except:
324             trace = traceback.format_exc()
325             msg = "Error detecting OS: %s " % trace
326             self.error(msg, out, err)
327         
328         return out
329
330     @property
331     def use_deb(self):
332         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
333
334     @property
335     def use_rpm(self):
336         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
337                 OSType.FEDORA]
338
339     @property
340     def localhost(self):
341         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
342
343     def do_provision(self):
344         # check if host is alive
345         if not self.is_alive():
346             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
347             self.error(msg)
348             raise RuntimeError, msg
349
350         self.find_home()
351
352         if self.get("cleanProcesses"):
353             self.clean_processes()
354
355         if self.get("cleanHome"):
356             self.clean_home()
357  
358         if self.get("cleanExperiment"):
359             self.clean_experiment()
360     
361         # Create shared directory structure and node home directory
362         paths = [self.lib_dir, 
363             self.bin_dir, 
364             self.src_dir, 
365             self.share_dir, 
366             self.node_home]
367
368         self.mkdir(paths)
369
370         # Get Public IP address
371         if not self.get("ip"):
372             if self.localhost:
373                 ip = socket.gethostbyname(socket.gethostname())
374             else:
375                 ip = socket.gethostbyname(self.get("hostname"))
376
377             self.set("ip", ip)
378
379         super(LinuxNode, self).do_provision()
380
381     def do_deploy(self):
382         if self.state == ResourceState.NEW:
383             self.info("Deploying node")
384             self.do_discover()
385             self.do_provision()
386
387         # Node needs to wait until all associated interfaces are 
388         # ready before it can finalize deployment
389         from nepi.resources.linux.interface import LinuxInterface
390         ifaces = self.get_connected(LinuxInterface.get_rtype())
391         for iface in ifaces:
392             if iface.state < ResourceState.READY:
393                 self.ec.schedule(reschedule_delay, self.deploy)
394                 return 
395
396         super(LinuxNode, self).do_deploy()
397
398     def do_release(self):
399         rms = self.get_connected()
400         for rm in rms:
401             # Node needs to wait until all associated RMs are released
402             # before it can be released
403             if rm.state != ResourceState.RELEASED:
404                 self.ec.schedule(reschedule_delay, self.release)
405                 return 
406
407         tear_down = self.get("tearDown")
408         if tear_down:
409             self.execute(tear_down)
410
411         self.clean_processes()
412
413         super(LinuxNode, self).do_release()
414
415     def valid_connection(self, guid):
416         # TODO: Validate!
417         return True
418
419     def clean_processes(self):
420         self.info("Cleaning up processes")
421
422         if self.localhost:
423             return 
424         
425         if self.get("username") != 'root':
426             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
427                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
428         else:
429             if self.state >= ResourceState.READY:
430                 import pickle
431                 pids = pickle.load(open("/tmp/save.proc", "rb"))
432                 pids_temp = dict()
433                 ps_aux = "ps aux |awk '{print $2,$11}'"
434                 (out, err), proc = self.execute(ps_aux)
435                 if len(out) != 0:
436                     for line in out.strip().split("\n"):
437                         parts = line.strip().split(" ")
438                         pids_temp[parts[0]] = parts[1]
439                     kill_pids = set(pids_temp.items()) - set(pids.items())
440                     kill_pids = ' '.join(dict(kill_pids).keys())
441
442                     cmd = ("killall tcpdump || /bin/true ; " +
443                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
444                         "kill %s || /bin/true ; " % kill_pids)
445                 else:
446                     cmd = ("killall tcpdump || /bin/true ; " +
447                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
448             else:
449                 cmd = ("killall tcpdump || /bin/true ; " +
450                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
451
452         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
453
454     def clean_home(self):
455         """ Cleans all NEPI related folders in the Linux host
456         """
457         self.info("Cleaning up home")
458         
459         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
460                 self.home_dir )
461
462         return self.execute(cmd, with_lock = True)
463
464     def clean_experiment(self):
465         """ Cleans all experiment related files in the Linux host.
466         It preserves NEPI files and folders that have a multi experiment
467         scope.
468         """
469         self.info("Cleaning up experiment files")
470         
471         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
472                 self.exp_dir,
473                 self.ec.exp_id )
474             
475         return self.execute(cmd, with_lock = True)
476
477     def execute(self, command,
478             sudo = False,
479             env = None,
480             tty = False,
481             forward_x11 = False,
482             retry = 3,
483             connect_timeout = 30,
484             strict_host_checking = False,
485             persistent = True,
486             blocking = True,
487             with_lock = False
488             ):
489         """ Notice that this invocation will block until the
490         execution finishes. If this is not the desired behavior,
491         use 'run' instead."""
492
493         if self.localhost:
494             (out, err), proc = execfuncs.lexec(command, 
495                     user = self.get("username"), # still problem with localhost
496                     sudo = sudo,
497                     env = env)
498         else:
499             if with_lock:
500                 # If the execute command is blocking, we don't want to keep
501                 # the node lock. This lock is used to avoid race conditions
502                 # when creating the ControlMaster sockets. A more elegant
503                 # solution is needed.
504                 with self._node_lock:
505                     (out, err), proc = sshfuncs.rexec(
506                         command, 
507                         host = self.get("hostname"),
508                         user = self.get("username"),
509                         port = self.get("port"),
510                         gwuser = self.get("gatewayUser"),
511                         gw = self.get("gateway"),
512                         agent = True,
513                         sudo = sudo,
514                         identity = self.get("identity"),
515                         server_key = self.get("serverKey"),
516                         env = env,
517                         tty = tty,
518                         forward_x11 = forward_x11,
519                         retry = retry,
520                         connect_timeout = connect_timeout,
521                         persistent = persistent,
522                         blocking = blocking, 
523                         strict_host_checking = strict_host_checking
524                         )
525             else:
526                 (out, err), proc = sshfuncs.rexec(
527                     command, 
528                     host = self.get("hostname"),
529                     user = self.get("username"),
530                     port = self.get("port"),
531                     gwuser = self.get("gatewayUser"),
532                     gw = self.get("gateway"),
533                     agent = True,
534                     sudo = sudo,
535                     identity = self.get("identity"),
536                     server_key = self.get("serverKey"),
537                     env = env,
538                     tty = tty,
539                     forward_x11 = forward_x11,
540                     retry = retry,
541                     connect_timeout = connect_timeout,
542                     persistent = persistent,
543                     blocking = blocking, 
544                     strict_host_checking = strict_host_checking
545                     )
546
547         return (out, err), proc
548
549     def run(self, command, home,
550             create_home = False,
551             pidfile = 'pidfile',
552             stdin = None, 
553             stdout = 'stdout', 
554             stderr = 'stderr', 
555             sudo = False,
556             tty = False):
557         
558         self.debug("Running command '%s'" % command)
559         
560         if self.localhost:
561             (out, err), proc = execfuncs.lspawn(command, pidfile,
562                     home = home, 
563                     create_home = create_home, 
564                     stdin = stdin or '/dev/null',
565                     stdout = stdout or '/dev/null',
566                     stderr = stderr or '/dev/null',
567                     sudo = sudo) 
568         else:
569             with self._node_lock:
570                 (out, err), proc = sshfuncs.rspawn(
571                     command,
572                     pidfile = pidfile,
573                     home = home,
574                     create_home = create_home,
575                     stdin = stdin or '/dev/null',
576                     stdout = stdout or '/dev/null',
577                     stderr = stderr or '/dev/null',
578                     sudo = sudo,
579                     host = self.get("hostname"),
580                     user = self.get("username"),
581                     port = self.get("port"),
582                     gwuser = self.get("gatewayUser"),
583                     gw = self.get("gateway"),
584                     agent = True,
585                     identity = self.get("identity"),
586                     server_key = self.get("serverKey"),
587                     tty = tty
588                     )
589
590         return (out, err), proc
591
592     def getpid(self, home, pidfile = "pidfile"):
593         if self.localhost:
594             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
595         else:
596             with self._node_lock:
597                 pidtuple = sshfuncs.rgetpid(
598                     os.path.join(home, pidfile),
599                     host = self.get("hostname"),
600                     user = self.get("username"),
601                     port = self.get("port"),
602                     gwuser = self.get("gatewayUser"),
603                     gw = self.get("gateway"),
604                     agent = True,
605                     identity = self.get("identity"),
606                     server_key = self.get("serverKey")
607                     )
608         
609         return pidtuple
610
611     def status(self, pid, ppid):
612         if self.localhost:
613             status = execfuncs.lstatus(pid, ppid)
614         else:
615             with self._node_lock:
616                 status = sshfuncs.rstatus(
617                         pid, ppid,
618                         host = self.get("hostname"),
619                         user = self.get("username"),
620                         port = self.get("port"),
621                         gwuser = self.get("gatewayUser"),
622                         gw = self.get("gateway"),
623                         agent = True,
624                         identity = self.get("identity"),
625                         server_key = self.get("serverKey")
626                         )
627            
628         return status
629     
630     def kill(self, pid, ppid, sudo = False):
631         out = err = ""
632         proc = None
633         status = self.status(pid, ppid)
634
635         if status == sshfuncs.ProcStatus.RUNNING:
636             if self.localhost:
637                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
638             else:
639                 with self._node_lock:
640                     (out, err), proc = sshfuncs.rkill(
641                         pid, ppid,
642                         host = self.get("hostname"),
643                         user = self.get("username"),
644                         port = self.get("port"),
645                         gwuser = self.get("gatewayUser"),
646                         gw = self.get("gateway"),
647                         agent = True,
648                         sudo = sudo,
649                         identity = self.get("identity"),
650                         server_key = self.get("serverKey")
651                         )
652
653         return (out, err), proc
654
655     def copy(self, src, dst):
656         if self.localhost:
657             (out, err), proc = execfuncs.lcopy(src, dst, 
658                     recursive = True)
659         else:
660             with self._node_lock:
661                 (out, err), proc = sshfuncs.rcopy(
662                     src, dst, 
663                     port = self.get("port"),
664                     gwuser = self.get("gatewayUser"),
665                     gw = self.get("gateway"),
666                     identity = self.get("identity"),
667                     server_key = self.get("serverKey"),
668                     recursive = True,
669                     strict_host_checking = False)
670
671         return (out, err), proc
672
673     def upload(self, src, dst, text = False, overwrite = True,
674             raise_on_error = True):
675         """ Copy content to destination
676
677         src  string with the content to copy. Can be:
678             - plain text
679             - a string with the path to a local file
680             - a string with a semi-colon separeted list of local files
681             - a string with a local directory
682
683         dst  string with destination path on the remote host (remote is 
684             always self.host)
685
686         text src is text input, it must be stored into a temp file before 
687         uploading
688         """
689         # If source is a string input 
690         f = None
691         if text and not os.path.isfile(src):
692             # src is text input that should be uploaded as file
693             # create a temporal file with the content to upload
694             f = tempfile.NamedTemporaryFile(delete=False)
695             f.write(src)
696             f.close()
697             src = f.name
698
699         # If dst files should not be overwritten, check that the files do not
700         # exits already
701         if isinstance(src, str):
702             src = map(str.strip, src.split(";"))
703     
704         if overwrite == False:
705             src = self.filter_existing_files(src, dst)
706             if not src:
707                 return ("", ""), None
708
709         if not self.localhost:
710             # Build destination as <user>@<server>:<path>
711             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
712
713         ((out, err), proc) = self.copy(src, dst)
714
715         # clean up temp file
716         if f:
717             os.remove(f.name)
718
719         if err:
720             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
721             self.error(msg, out, err)
722             
723             msg = "%s out: %s err: %s" % (msg, out, err)
724             if raise_on_error:
725                 raise RuntimeError, msg
726
727         return ((out, err), proc)
728
729     def download(self, src, dst, raise_on_error = True):
730         if not self.localhost:
731             # Build destination as <user>@<server>:<path>
732             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
733
734         ((out, err), proc) = self.copy(src, dst)
735
736         if err:
737             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
738             self.error(msg, out, err)
739
740             if raise_on_error:
741                 raise RuntimeError, msg
742
743         return ((out, err), proc)
744
745     def install_packages_command(self, packages):
746         command = ""
747         if self.use_rpm:
748             command = rpmfuncs.install_packages_command(self.os, packages)
749         elif self.use_deb:
750             command = debfuncs.install_packages_command(self.os, packages)
751         else:
752             msg = "Error installing packages ( OS not known ) "
753             self.error(msg, self.os)
754             raise RuntimeError, msg
755
756         return command
757
758     def install_packages(self, packages, home, run_home = None,
759             raise_on_error = True):
760         """ Install packages in the Linux host.
761
762         'home' is the directory to upload the package installation script.
763         'run_home' is the directory from where to execute the script.
764         """
765         command = self.install_packages_command(packages)
766
767         run_home = run_home or home
768
769         (out, err), proc = self.run_and_wait(command, run_home, 
770             shfile = os.path.join(home, "instpkg.sh"),
771             pidfile = "instpkg_pidfile",
772             ecodefile = "instpkg_exitcode",
773             stdout = "instpkg_stdout", 
774             stderr = "instpkg_stderr",
775             overwrite = False,
776             raise_on_error = raise_on_error)
777
778         return (out, err), proc 
779
780     def remove_packages(self, packages, home, run_home = None,
781             raise_on_error = True):
782         """ Uninstall packages from the Linux host.
783
784         'home' is the directory to upload the package un-installation script.
785         'run_home' is the directory from where to execute the script.
786         """
787         if self.use_rpm:
788             command = rpmfuncs.remove_packages_command(self.os, packages)
789         elif self.use_deb:
790             command = debfuncs.remove_packages_command(self.os, packages)
791         else:
792             msg = "Error removing packages ( OS not known ) "
793             self.error(msg)
794             raise RuntimeError, msg
795
796         run_home = run_home or home
797
798         (out, err), proc = self.run_and_wait(command, run_home, 
799             shfile = os.path.join(home, "rmpkg.sh"),
800             pidfile = "rmpkg_pidfile",
801             ecodefile = "rmpkg_exitcode",
802             stdout = "rmpkg_stdout", 
803             stderr = "rmpkg_stderr",
804             overwrite = False,
805             raise_on_error = raise_on_error)
806          
807         return (out, err), proc 
808
809     def mkdir(self, paths, clean = False):
810         """ Paths is either a single remote directory path to create,
811         or a list of directories to create.
812         """
813         if clean:
814             self.rmdir(paths)
815
816         if isinstance(paths, str):
817             paths = [paths]
818
819         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
820
821         return self.execute(cmd, with_lock = True)
822
823     def rmdir(self, paths):
824         """ Paths is either a single remote directory path to delete,
825         or a list of directories to delete.
826         """
827
828         if isinstance(paths, str):
829             paths = [paths]
830
831         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
832
833         return self.execute(cmd, with_lock = True)
834         
835     def run_and_wait(self, command, home, 
836             shfile = "cmd.sh",
837             env = None,
838             overwrite = True,
839             pidfile = "pidfile", 
840             ecodefile = "exitcode", 
841             stdin = None, 
842             stdout = "stdout", 
843             stderr = "stderr", 
844             sudo = False,
845             tty = False,
846             raise_on_error = True):
847         """
848         Uploads the 'command' to a bash script in the host.
849         Then runs the script detached in background in the host, and
850         busy-waites until the script finishes executing.
851         """
852
853         if not shfile.startswith("/"):
854             shfile = os.path.join(home, shfile)
855
856         self.upload_command(command, 
857             shfile = shfile, 
858             ecodefile = ecodefile, 
859             env = env,
860             overwrite = overwrite)
861
862         command = "bash %s" % shfile
863         # run command in background in remote host
864         (out, err), proc = self.run(command, home, 
865                 pidfile = pidfile,
866                 stdin = stdin, 
867                 stdout = stdout, 
868                 stderr = stderr, 
869                 sudo = sudo,
870                 tty = tty)
871
872         # check no errors occurred
873         if proc.poll():
874             msg = " Failed to run command '%s' " % command
875             self.error(msg, out, err)
876             if raise_on_error:
877                 raise RuntimeError, msg
878
879         # Wait for pid file to be generated
880         pid, ppid = self.wait_pid(
881                 home = home, 
882                 pidfile = pidfile, 
883                 raise_on_error = raise_on_error)
884
885         # wait until command finishes to execute
886         self.wait_run(pid, ppid)
887       
888         (eout, err), proc = self.check_errors(home,
889             ecodefile = ecodefile,
890             stderr = stderr)
891
892         # Out is what was written in the stderr file
893         if err:
894             msg = " Failed to run command '%s' " % command
895             self.error(msg, eout, err)
896
897             if raise_on_error:
898                 raise RuntimeError, msg
899
900         (out, oerr), proc = self.check_output(home, stdout)
901         
902         return (out, err), proc
903
904     def exitcode(self, home, ecodefile = "exitcode"):
905         """
906         Get the exit code of an application.
907         Returns an integer value with the exit code 
908         """
909         (out, err), proc = self.check_output(home, ecodefile)
910
911         # Succeeded to open file, return exit code in the file
912         if proc.wait() == 0:
913             try:
914                 return int(out.strip())
915             except:
916                 # Error in the content of the file!
917                 return ExitCode.CORRUPTFILE
918
919         # No such file or directory
920         if proc.returncode == 1:
921             return ExitCode.FILENOTFOUND
922         
923         # Other error from 'cat'
924         return ExitCode.ERROR
925
926     def upload_command(self, command, 
927             shfile = "cmd.sh",
928             ecodefile = "exitcode",
929             overwrite = True,
930             env = None):
931         """ Saves the command as a bash script file in the remote host, and
932         forces to save the exit code of the command execution to the ecodefile
933         """
934
935         if not (command.strip().endswith(";") or command.strip().endswith("&")):
936             command += ";"
937       
938         # The exit code of the command will be stored in ecodefile
939         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
940                 'command': command,
941                 'ecodefile': ecodefile,
942                 } 
943
944         # Export environment
945         environ = self.format_environment(env)
946
947         # Add environ to command
948         command = environ + command
949
950         return self.upload(command, shfile, text = True, overwrite = overwrite)
951
952     def format_environment(self, env, inline = False):
953         """ Formats the environment variables for a command to be executed
954         either as an inline command
955         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
956         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
957         """
958         if not env: return ""
959
960         # Remove extra white spaces
961         env = re.sub(r'\s+', ' ', env.strip())
962
963         sep = ";" if inline else "\n"
964         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
965
966     def check_errors(self, home, 
967             ecodefile = "exitcode", 
968             stderr = "stderr"):
969         """ Checks whether errors occurred while running a command.
970         It first checks the exit code for the command, and only if the
971         exit code is an error one it returns the error output.
972
973         """
974         proc = None
975         err = ""
976
977         # get exit code saved in the 'exitcode' file
978         ecode = self.exitcode(home, ecodefile)
979
980         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
981             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
982         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
983             # The process returned an error code or didn't exist. 
984             # Check standard error.
985             (err, eerr), proc = self.check_output(home, stderr)
986
987             # If the stderr file was not found, assume nothing bad happened,
988             # and just ignore the error.
989             # (cat returns 1 for error "No such file or directory")
990             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
991                 err = "" 
992             
993         return ("", err), proc
994  
995     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
996         """ Waits until the pid file for the command is generated, 
997             and returns the pid and ppid of the process """
998         pid = ppid = None
999         delay = 1.0
1000
1001         for i in xrange(2):
1002             pidtuple = self.getpid(home = home, pidfile = pidfile)
1003             
1004             if pidtuple:
1005                 pid, ppid = pidtuple
1006                 break
1007             else:
1008                 time.sleep(delay)
1009                 delay = delay * 1.5
1010         else:
1011             msg = " Failed to get pid for pidfile %s/%s " % (
1012                     home, pidfile )
1013             self.error(msg)
1014             
1015             if raise_on_error:
1016                 raise RuntimeError, msg
1017
1018         return pid, ppid
1019
1020     def wait_run(self, pid, ppid, trial = 0):
1021         """ wait for a remote process to finish execution """
1022         delay = 1.0
1023
1024         while True:
1025             status = self.status(pid, ppid)
1026             
1027             if status is ProcStatus.FINISHED:
1028                 break
1029             elif status is not ProcStatus.RUNNING:
1030                 delay = delay * 1.5
1031                 time.sleep(delay)
1032                 # If it takes more than 20 seconds to start, then
1033                 # asume something went wrong
1034                 if delay > 20:
1035                     break
1036             else:
1037                 # The app is running, just wait...
1038                 time.sleep(0.5)
1039
1040     def check_output(self, home, filename):
1041         """ Retrives content of file """
1042         (out, err), proc = self.execute("cat %s" % 
1043             os.path.join(home, filename), retry = 1, with_lock = True)
1044         return (out, err), proc
1045
1046     def is_alive(self):
1047         """ Checks if host is responsive
1048         """
1049         if self.localhost:
1050             return True
1051
1052         out = err = ""
1053         msg = "Unresponsive host. Wrong answer. "
1054
1055         # The underlying SSH layer will sometimes return an empty
1056         # output (even if the command was executed without errors).
1057         # To work arround this, repeat the operation N times or
1058         # until the result is not empty string
1059         try:
1060             (out, err), proc = self.execute("echo 'ALIVE'",
1061                     blocking = True,
1062                     with_lock = True)
1063     
1064             if out.find("ALIVE") > -1:
1065                 return True
1066         except:
1067             trace = traceback.format_exc()
1068             msg = "Unresponsive host. Error reaching host: %s " % trace
1069
1070         self.error(msg, out, err)
1071         return False
1072
1073     def find_home(self):
1074         """ Retrieves host home directory
1075         """
1076         # The underlying SSH layer will sometimes return an empty
1077         # output (even if the command was executed without errors).
1078         # To work arround this, repeat the operation N times or
1079         # until the result is not empty string
1080         msg = "Impossible to retrieve HOME directory"
1081         try:
1082             (out, err), proc = self.execute("echo ${HOME}",
1083                     blocking = True,
1084                     with_lock = True)
1085     
1086             if out.strip() != "":
1087                 self._home_dir =  out.strip()
1088         except:
1089             trace = traceback.format_exc()
1090             msg = "Impossible to retrieve HOME directory %s" % trace
1091
1092         if not self._home_dir:
1093             self.error(msg)
1094             raise RuntimeError, msg
1095
1096     def filter_existing_files(self, src, dst):
1097         """ Removes files that already exist in the Linux host from src list
1098         """
1099         # construct a dictionary with { dst: src }
1100         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1101                     if len(src) > 1 else dict({dst: src[0]})
1102
1103         command = []
1104         for d in dests.keys():
1105             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1106
1107         command = ";".join(command)
1108
1109         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1110     
1111         for d in dests.keys():
1112             if out.find(d) > -1:
1113                 del dests[d]
1114
1115         if not dests:
1116             return []
1117
1118         return dests.values()
1119