Fixing GRE tunnel between localhost and remote host
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import socket
32 import tempfile
33 import time
34 import threading
35 import traceback
36
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40 class ExitCode:
41     """
42     Error codes that the rexitcode function can return if unable to
43     check the exit code of a spawned process
44     """
45     FILENOTFOUND = -1
46     CORRUPTFILE = -2
47     ERROR = -3
48     OK = 0
49
50 class OSType:
51     """
52     Supported flavors of Linux OS
53     """
54     FEDORA_8 = "f8"
55     FEDORA_12 = "f12"
56     FEDORA_14 = "f14"
57     FEDORA = "fedora"
58     UBUNTU = "ubuntu"
59     DEBIAN = "debian"
60
61 @clsinit_copy
62 class LinuxNode(ResourceManager):
63     """
64     .. class:: Class Args :
65       
66         :param ec: The Experiment controller
67         :type ec: ExperimentController
68         :param guid: guid of the RM
69         :type guid: int
70
71     .. note::
72
73         There are different ways in which commands can be executed using the
74         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
75         'run_and_wait'). 
76         
77         Brief explanation:
78
79             * 'execute' (blocking mode) :  
80
81                      HOW IT WORKS: 'execute', forks a process and run the
82                      command, synchronously, attached to the terminal, in
83                      foreground.
84                      The execute method will block until the command returns
85                      the result on 'out', 'err' (so until it finishes executing).
86   
87                      USAGE: short-lived commands that must be executed attached
88                      to a terminal and in foreground, for which it IS necessary
89                      to block until the command has finished (e.g. if you want
90                      to run 'ls' or 'cat').
91
92             * 'execute' (NON blocking mode - blocking = False) :
93
94                     HOW IT WORKS: Same as before, except that execute method
95                     will return immediately (even if command still running).
96
97                     USAGE: long-lived commands that must be executed attached
98                     to a terminal and in foreground, but for which it is not
99                     necessary to block until the command has finished. (e.g.
100                     start an application using X11 forwarding)
101
102              * 'run' :
103
104                    HOW IT WORKS: Connects to the host ( using SSH if remote)
105                    and launches the command in background, detached from any
106                    terminal (daemonized), and returns. The command continues to
107                    run remotely, but since it is detached from the terminal,
108                    its pipes (stdin, stdout, stderr) can't be redirected to the
109                    console (as normal non detached processes would), and so they
110                    are explicitly redirected to files. The pidfile is created as
111                    part of the process of launching the command. The pidfile
112                    holds the pid and ppid of the process forked in background,
113                    so later on it is possible to check whether the command is still
114                    running.
115
116                     USAGE: long-lived commands that can run detached in background,
117                     for which it is NOT necessary to block (wait) until the command
118                     has finished. (e.g. start an application that is not using X11
119                     forwarding. It can run detached and remotely in background)
120
121              * 'run_and_wait' :
122
123                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124                     the command has finished execution. It also checks whether
125                     errors occurred during runtime by reading the exitcode file,
126                     which contains the exit code of the command that was run
127                     (checking stderr only is not always reliable since many
128                     commands throw debugging info to stderr and the only way to
129                     automatically know whether an error really happened is to
130                     check the process exit code).
131
132                     Another difference with respect to 'run', is that instead
133                     of directly executing the command as a bash command line,
134                     it uploads the command to a bash script and runs the script.
135                     This allows to use the bash script to debug errors, since
136                     it remains at the remote host and can be run manually to
137                     reproduce the error.
138                   
139                     USAGE: medium-lived commands that can run detached in
140                     background, for which it IS necessary to block (wait) until
141                     the command has finished. (e.g. Package installation,
142                     source compilation, file download, etc)
143
144     """
145     _rtype = "LinuxNode"
146     _help = "Controls Linux host machines ( either localhost or a host " \
147             "that can be accessed using a SSH key)"
148     _backend_type = "linux"
149
150     @classmethod
151     def _register_attributes(cls):
152         hostname = Attribute("hostname", "Hostname of the machine",
153                 flags = Flags.Design)
154
155         username = Attribute("username", "Local account username", 
156                 flags = Flags.Credential)
157
158         port = Attribute("port", "SSH port", flags = Flags.Design)
159         
160         home = Attribute("home",
161                 "Experiment home directory to store all experiment related files",
162                 flags = Flags.Design)
163         
164         identity = Attribute("identity", "SSH identity file",
165                 flags = Flags.Credential)
166         
167         server_key = Attribute("serverKey", "Server public key", 
168                 flags = Flags.Design)
169         
170         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
171                 " from node home folder before starting experiment", 
172                 type = Types.Bool,
173                 default = False,
174                 flags = Flags.Design)
175
176         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
177                 " from a previous same experiment, before the new experiment starts", 
178                 type = Types.Bool,
179                 default = False,
180                 flags = Flags.Design)
181         
182         clean_processes = Attribute("cleanProcesses", 
183                 "Kill all running processes before starting experiment",
184                 type = Types.Bool,
185                 default = False,
186                 flags = Flags.Design)
187         
188         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
189                 "releasing the resource",
190                 flags = Flags.Design)
191
192         gateway_user = Attribute("gatewayUser", "Gateway account username",
193                 flags = Flags.Design)
194
195         gateway = Attribute("gateway", "Hostname of the gateway machine",
196                 flags = Flags.Design)
197
198         ip = Attribute("ip", "Linux host public IP address",
199                     flags = Flags.NoWrite)
200
201         cls._register_attribute(hostname)
202         cls._register_attribute(username)
203         cls._register_attribute(port)
204         cls._register_attribute(home)
205         cls._register_attribute(identity)
206         cls._register_attribute(server_key)
207         cls._register_attribute(clean_home)
208         cls._register_attribute(clean_experiment)
209         cls._register_attribute(clean_processes)
210         cls._register_attribute(tear_down)
211         cls._register_attribute(gateway_user)
212         cls._register_attribute(gateway)
213         cls._register_attribute(ip)
214
215     def __init__(self, ec, guid):
216         super(LinuxNode, self).__init__(ec, guid)
217         self._os = None
218         # home directory at Linux host
219         self._home_dir = ""
220
221         # lock to prevent concurrent applications on the same node,
222         # to execute commands at the same time. There are potential
223         # concurrency issues when using SSH to a same host from 
224         # multiple threads. There are also possible operational 
225         # issues, e.g. an application querying the existence 
226         # of a file or folder prior to its creation, and another 
227         # application creating the same file or folder in between.
228         self._node_lock = threading.Lock()
229     
230     def log_message(self, msg):
231         return " guid %d - host %s - %s " % (self.guid, 
232                 self.get("hostname"), msg)
233
234     @property
235     def home_dir(self):
236         home = self.get("home") or ""
237         if not home.startswith("/"):
238            home = os.path.join(self._home_dir, home) 
239         return home
240
241     @property
242     def nepi_home(self):
243         return os.path.join(self.home_dir, ".nepi")
244
245     @property
246     def usr_dir(self):
247         return os.path.join(self.nepi_home, "nepi-usr")
248
249     @property
250     def lib_dir(self):
251         return os.path.join(self.usr_dir, "lib")
252
253     @property
254     def bin_dir(self):
255         return os.path.join(self.usr_dir, "bin")
256
257     @property
258     def src_dir(self):
259         return os.path.join(self.usr_dir, "src")
260
261     @property
262     def share_dir(self):
263         return os.path.join(self.usr_dir, "share")
264
265     @property
266     def exp_dir(self):
267         return os.path.join(self.nepi_home, "nepi-exp")
268
269     @property
270     def exp_home(self):
271         return os.path.join(self.exp_dir, self.ec.exp_id)
272
273     @property
274     def node_home(self):
275         return os.path.join(self.exp_home, "node-%d" % self.guid)
276
277     @property
278     def run_home(self):
279         return os.path.join(self.node_home, self.ec.run_id)
280
281     @property
282     def os(self):
283         if self._os:
284             return self._os
285
286         if not self.localhost and not self.get("username"):
287             msg = "Can't resolve OS, insufficient data "
288             self.error(msg)
289             raise RuntimeError, msg
290
291         out = self.get_os()
292
293         if out.find("Fedora release 8") == 0:
294             self._os = OSType.FEDORA_8
295         elif out.find("Fedora release 12") == 0:
296             self._os = OSType.FEDORA_12
297         elif out.find("Fedora release 14") == 0:
298             self._os = OSType.FEDORA_14
299         elif out.find("Fedora release") == 0:
300             self._os = OSType.FEDORA
301         elif out.find("Debian") == 0: 
302             self._os = OSType.DEBIAN
303         elif out.find("Ubuntu") ==0:
304             self._os = OSType.UBUNTU
305         else:
306             msg = "Unsupported OS"
307             self.error(msg, out)
308             raise RuntimeError, "%s - %s " %( msg, out )
309
310         return self._os
311
312     def get_os(self):
313         # The underlying SSH layer will sometimes return an empty
314         # output (even if the command was executed without errors).
315         # To work arround this, repeat the operation N times or
316         # until the result is not empty string
317         out = ""
318         try:
319             (out, err), proc = self.execute("cat /etc/issue", 
320                     with_lock = True,
321                     blocking = True)
322         except:
323             trace = traceback.format_exc()
324             msg = "Error detecting OS: %s " % trace
325             self.error(msg, out, err)
326         
327         return out
328
329     @property
330     def use_deb(self):
331         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
332
333     @property
334     def use_rpm(self):
335         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
336                 OSType.FEDORA]
337
338     @property
339     def localhost(self):
340         return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
341
342     def do_provision(self):
343         # check if host is alive
344         if not self.is_alive():
345             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
346             self.error(msg)
347             raise RuntimeError, msg
348
349         self.find_home()
350
351         if self.get("cleanProcesses"):
352             self.clean_processes()
353
354         if self.get("cleanHome"):
355             self.clean_home()
356  
357         if self.get("cleanExperiment"):
358             self.clean_experiment()
359     
360         # Create shared directory structure and node home directory
361         paths = [self.lib_dir, 
362             self.bin_dir, 
363             self.src_dir, 
364             self.share_dir, 
365             self.node_home]
366
367         self.mkdir(paths)
368
369         # Get Public IP address
370         if self.localhost:
371             ip = socket.gethostbyname(socket.gethostname())
372         else:
373             ip = socket.gethostbyname(self.get("hostname"))
374
375         self.set("ip", ip)
376
377         super(LinuxNode, self).do_provision()
378
379     def do_deploy(self):
380         if self.state == ResourceState.NEW:
381             self.info("Deploying node")
382             self.do_discover()
383             self.do_provision()
384
385         # Node needs to wait until all associated interfaces are 
386         # ready before it can finalize deployment
387         from nepi.resources.linux.interface import LinuxInterface
388         ifaces = self.get_connected(LinuxInterface.get_rtype())
389         for iface in ifaces:
390             if iface.state < ResourceState.READY:
391                 self.ec.schedule(reschedule_delay, self.deploy)
392                 return 
393
394         super(LinuxNode, self).do_deploy()
395
396     def do_release(self):
397         rms = self.get_connected()
398         for rm in rms:
399             # Node needs to wait until all associated RMs are released
400             # before it can be released
401             if rm.state != ResourceState.RELEASED:
402                 self.ec.schedule(reschedule_delay, self.release)
403                 return 
404
405         tear_down = self.get("tearDown")
406         if tear_down:
407             self.execute(tear_down)
408
409         self.clean_processes()
410
411         super(LinuxNode, self).do_release()
412
413     def valid_connection(self, guid):
414         # TODO: Validate!
415         return True
416
417     def clean_processes(self):
418         self.info("Cleaning up processes")
419
420         if self.localhost:
421             return 
422         
423         if self.get("username") != 'root':
424             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
425                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
426         else:
427             if self.state >= ResourceState.READY:
428                 import pickle
429                 pids = pickle.load(open("/tmp/save.proc", "rb"))
430                 pids_temp = dict()
431                 ps_aux = "ps aux |awk '{print $2,$11}'"
432                 (out, err), proc = self.execute(ps_aux)
433                 if len(out) != 0:
434                     for line in out.strip().split("\n"):
435                         parts = line.strip().split(" ")
436                         pids_temp[parts[0]] = parts[1]
437                     kill_pids = set(pids_temp.items()) - set(pids.items())
438                     kill_pids = ' '.join(dict(kill_pids).keys())
439
440                     cmd = ("killall tcpdump || /bin/true ; " +
441                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
442                         "kill %s || /bin/true ; " % kill_pids)
443                 else:
444                     cmd = ("killall tcpdump || /bin/true ; " +
445                         "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
446             else:
447                 cmd = ("killall tcpdump || /bin/true ; " +
448                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
449
450         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
451
452     def clean_home(self):
453         """ Cleans all NEPI related folders in the Linux host
454         """
455         self.info("Cleaning up home")
456         
457         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
458                 self.home_dir )
459
460         return self.execute(cmd, with_lock = True)
461
462     def clean_experiment(self):
463         """ Cleans all experiment related files in the Linux host.
464         It preserves NEPI files and folders that have a multi experiment
465         scope.
466         """
467         self.info("Cleaning up experiment files")
468         
469         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
470                 self.exp_dir,
471                 self.ec.exp_id )
472             
473         return self.execute(cmd, with_lock = True)
474
475     def execute(self, command,
476             sudo = False,
477             env = None,
478             tty = False,
479             forward_x11 = False,
480             retry = 3,
481             connect_timeout = 30,
482             strict_host_checking = False,
483             persistent = True,
484             blocking = True,
485             with_lock = False
486             ):
487         """ Notice that this invocation will block until the
488         execution finishes. If this is not the desired behavior,
489         use 'run' instead."""
490
491         if self.localhost:
492             (out, err), proc = execfuncs.lexec(command, 
493                     user = self.get("username"), # still problem with localhost
494                     sudo = sudo,
495                     env = env)
496         else:
497             if with_lock:
498                 # If the execute command is blocking, we don't want to keep
499                 # the node lock. This lock is used to avoid race conditions
500                 # when creating the ControlMaster sockets. A more elegant
501                 # solution is needed.
502                 with self._node_lock:
503                     (out, err), proc = sshfuncs.rexec(
504                         command, 
505                         host = self.get("hostname"),
506                         user = self.get("username"),
507                         port = self.get("port"),
508                         gwuser = self.get("gatewayUser"),
509                         gw = self.get("gateway"),
510                         agent = True,
511                         sudo = sudo,
512                         identity = self.get("identity"),
513                         server_key = self.get("serverKey"),
514                         env = env,
515                         tty = tty,
516                         forward_x11 = forward_x11,
517                         retry = retry,
518                         connect_timeout = connect_timeout,
519                         persistent = persistent,
520                         blocking = blocking, 
521                         strict_host_checking = strict_host_checking
522                         )
523             else:
524                 (out, err), proc = sshfuncs.rexec(
525                     command, 
526                     host = self.get("hostname"),
527                     user = self.get("username"),
528                     port = self.get("port"),
529                     gwuser = self.get("gatewayUser"),
530                     gw = self.get("gateway"),
531                     agent = True,
532                     sudo = sudo,
533                     identity = self.get("identity"),
534                     server_key = self.get("serverKey"),
535                     env = env,
536                     tty = tty,
537                     forward_x11 = forward_x11,
538                     retry = retry,
539                     connect_timeout = connect_timeout,
540                     persistent = persistent,
541                     blocking = blocking, 
542                     strict_host_checking = strict_host_checking
543                     )
544
545         return (out, err), proc
546
547     def run(self, command, home,
548             create_home = False,
549             pidfile = 'pidfile',
550             stdin = None, 
551             stdout = 'stdout', 
552             stderr = 'stderr', 
553             sudo = False,
554             tty = False):
555         
556         self.debug("Running command '%s'" % command)
557         
558         if self.localhost:
559             (out, err), proc = execfuncs.lspawn(command, pidfile,
560                     home = home, 
561                     create_home = create_home, 
562                     stdin = stdin or '/dev/null',
563                     stdout = stdout or '/dev/null',
564                     stderr = stderr or '/dev/null',
565                     sudo = sudo) 
566         else:
567             with self._node_lock:
568                 (out, err), proc = sshfuncs.rspawn(
569                     command,
570                     pidfile = pidfile,
571                     home = home,
572                     create_home = create_home,
573                     stdin = stdin or '/dev/null',
574                     stdout = stdout or '/dev/null',
575                     stderr = stderr or '/dev/null',
576                     sudo = sudo,
577                     host = self.get("hostname"),
578                     user = self.get("username"),
579                     port = self.get("port"),
580                     gwuser = self.get("gatewayUser"),
581                     gw = self.get("gateway"),
582                     agent = True,
583                     identity = self.get("identity"),
584                     server_key = self.get("serverKey"),
585                     tty = tty
586                     )
587
588         return (out, err), proc
589
590     def getpid(self, home, pidfile = "pidfile"):
591         if self.localhost:
592             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
593         else:
594             with self._node_lock:
595                 pidtuple = sshfuncs.rgetpid(
596                     os.path.join(home, pidfile),
597                     host = self.get("hostname"),
598                     user = self.get("username"),
599                     port = self.get("port"),
600                     gwuser = self.get("gatewayUser"),
601                     gw = self.get("gateway"),
602                     agent = True,
603                     identity = self.get("identity"),
604                     server_key = self.get("serverKey")
605                     )
606         
607         return pidtuple
608
609     def status(self, pid, ppid):
610         if self.localhost:
611             status = execfuncs.lstatus(pid, ppid)
612         else:
613             with self._node_lock:
614                 status = sshfuncs.rstatus(
615                         pid, ppid,
616                         host = self.get("hostname"),
617                         user = self.get("username"),
618                         port = self.get("port"),
619                         gwuser = self.get("gatewayUser"),
620                         gw = self.get("gateway"),
621                         agent = True,
622                         identity = self.get("identity"),
623                         server_key = self.get("serverKey")
624                         )
625            
626         return status
627     
628     def kill(self, pid, ppid, sudo = False):
629         out = err = ""
630         proc = None
631         status = self.status(pid, ppid)
632
633         if status == sshfuncs.ProcStatus.RUNNING:
634             if self.localhost:
635                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
636             else:
637                 with self._node_lock:
638                     (out, err), proc = sshfuncs.rkill(
639                         pid, ppid,
640                         host = self.get("hostname"),
641                         user = self.get("username"),
642                         port = self.get("port"),
643                         gwuser = self.get("gatewayUser"),
644                         gw = self.get("gateway"),
645                         agent = True,
646                         sudo = sudo,
647                         identity = self.get("identity"),
648                         server_key = self.get("serverKey")
649                         )
650
651         return (out, err), proc
652
653     def copy(self, src, dst):
654         if self.localhost:
655             (out, err), proc = execfuncs.lcopy(src, dst, 
656                     recursive = True)
657         else:
658             with self._node_lock:
659                 (out, err), proc = sshfuncs.rcopy(
660                     src, dst, 
661                     port = self.get("port"),
662                     gwuser = self.get("gatewayUser"),
663                     gw = self.get("gateway"),
664                     identity = self.get("identity"),
665                     server_key = self.get("serverKey"),
666                     recursive = True,
667                     strict_host_checking = False)
668
669         return (out, err), proc
670
671     def upload(self, src, dst, text = False, overwrite = True,
672             raise_on_error = True):
673         """ Copy content to destination
674
675         src  string with the content to copy. Can be:
676             - plain text
677             - a string with the path to a local file
678             - a string with a semi-colon separeted list of local files
679             - a string with a local directory
680
681         dst  string with destination path on the remote host (remote is 
682             always self.host)
683
684         text src is text input, it must be stored into a temp file before 
685         uploading
686         """
687         # If source is a string input 
688         f = None
689         if text and not os.path.isfile(src):
690             # src is text input that should be uploaded as file
691             # create a temporal file with the content to upload
692             f = tempfile.NamedTemporaryFile(delete=False)
693             f.write(src)
694             f.close()
695             src = f.name
696
697         # If dst files should not be overwritten, check that the files do not
698         # exits already
699         if isinstance(src, str):
700             src = map(str.strip, src.split(";"))
701     
702         if overwrite == False:
703             src = self.filter_existing_files(src, dst)
704             if not src:
705                 return ("", ""), None
706
707         if not self.localhost:
708             # Build destination as <user>@<server>:<path>
709             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
710
711         ((out, err), proc) = self.copy(src, dst)
712
713         # clean up temp file
714         if f:
715             os.remove(f.name)
716
717         if err:
718             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
719             self.error(msg, out, err)
720             
721             msg = "%s out: %s err: %s" % (msg, out, err)
722             if raise_on_error:
723                 raise RuntimeError, msg
724
725         return ((out, err), proc)
726
727     def download(self, src, dst, raise_on_error = True):
728         if not self.localhost:
729             # Build destination as <user>@<server>:<path>
730             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
731
732         ((out, err), proc) = self.copy(src, dst)
733
734         if err:
735             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
736             self.error(msg, out, err)
737
738             if raise_on_error:
739                 raise RuntimeError, msg
740
741         return ((out, err), proc)
742
743     def install_packages_command(self, packages):
744         command = ""
745         if self.use_rpm:
746             command = rpmfuncs.install_packages_command(self.os, packages)
747         elif self.use_deb:
748             command = debfuncs.install_packages_command(self.os, packages)
749         else:
750             msg = "Error installing packages ( OS not known ) "
751             self.error(msg, self.os)
752             raise RuntimeError, msg
753
754         return command
755
756     def install_packages(self, packages, home, run_home = None,
757             raise_on_error = True):
758         """ Install packages in the Linux host.
759
760         'home' is the directory to upload the package installation script.
761         'run_home' is the directory from where to execute the script.
762         """
763         command = self.install_packages_command(packages)
764
765         run_home = run_home or home
766
767         (out, err), proc = self.run_and_wait(command, run_home, 
768             shfile = os.path.join(home, "instpkg.sh"),
769             pidfile = "instpkg_pidfile",
770             ecodefile = "instpkg_exitcode",
771             stdout = "instpkg_stdout", 
772             stderr = "instpkg_stderr",
773             overwrite = False,
774             raise_on_error = raise_on_error)
775
776         return (out, err), proc 
777
778     def remove_packages(self, packages, home, run_home = None,
779             raise_on_error = True):
780         """ Uninstall packages from the Linux host.
781
782         'home' is the directory to upload the package un-installation script.
783         'run_home' is the directory from where to execute the script.
784         """
785         if self.use_rpm:
786             command = rpmfuncs.remove_packages_command(self.os, packages)
787         elif self.use_deb:
788             command = debfuncs.remove_packages_command(self.os, packages)
789         else:
790             msg = "Error removing packages ( OS not known ) "
791             self.error(msg)
792             raise RuntimeError, msg
793
794         run_home = run_home or home
795
796         (out, err), proc = self.run_and_wait(command, run_home, 
797             shfile = os.path.join(home, "rmpkg.sh"),
798             pidfile = "rmpkg_pidfile",
799             ecodefile = "rmpkg_exitcode",
800             stdout = "rmpkg_stdout", 
801             stderr = "rmpkg_stderr",
802             overwrite = False,
803             raise_on_error = raise_on_error)
804          
805         return (out, err), proc 
806
807     def mkdir(self, paths, clean = False):
808         """ Paths is either a single remote directory path to create,
809         or a list of directories to create.
810         """
811         if clean:
812             self.rmdir(paths)
813
814         if isinstance(paths, str):
815             paths = [paths]
816
817         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
818
819         return self.execute(cmd, with_lock = True)
820
821     def rmdir(self, paths):
822         """ Paths is either a single remote directory path to delete,
823         or a list of directories to delete.
824         """
825
826         if isinstance(paths, str):
827             paths = [paths]
828
829         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
830
831         return self.execute(cmd, with_lock = True)
832         
833     def run_and_wait(self, command, home, 
834             shfile = "cmd.sh",
835             env = None,
836             overwrite = True,
837             pidfile = "pidfile", 
838             ecodefile = "exitcode", 
839             stdin = None, 
840             stdout = "stdout", 
841             stderr = "stderr", 
842             sudo = False,
843             tty = False,
844             raise_on_error = True):
845         """
846         Uploads the 'command' to a bash script in the host.
847         Then runs the script detached in background in the host, and
848         busy-waites until the script finishes executing.
849         """
850
851         if not shfile.startswith("/"):
852             shfile = os.path.join(home, shfile)
853
854         self.upload_command(command, 
855             shfile = shfile, 
856             ecodefile = ecodefile, 
857             env = env,
858             overwrite = overwrite)
859
860         command = "bash %s" % shfile
861         # run command in background in remote host
862         (out, err), proc = self.run(command, home, 
863                 pidfile = pidfile,
864                 stdin = stdin, 
865                 stdout = stdout, 
866                 stderr = stderr, 
867                 sudo = sudo,
868                 tty = tty)
869
870         # check no errors occurred
871         if proc.poll():
872             msg = " Failed to run command '%s' " % command
873             self.error(msg, out, err)
874             if raise_on_error:
875                 raise RuntimeError, msg
876
877         # Wait for pid file to be generated
878         pid, ppid = self.wait_pid(
879                 home = home, 
880                 pidfile = pidfile, 
881                 raise_on_error = raise_on_error)
882
883         # wait until command finishes to execute
884         self.wait_run(pid, ppid)
885       
886         (eout, err), proc = self.check_errors(home,
887             ecodefile = ecodefile,
888             stderr = stderr)
889
890         # Out is what was written in the stderr file
891         if err:
892             msg = " Failed to run command '%s' " % command
893             self.error(msg, eout, err)
894
895             if raise_on_error:
896                 raise RuntimeError, msg
897
898         (out, oerr), proc = self.check_output(home, stdout)
899         
900         return (out, err), proc
901
902     def exitcode(self, home, ecodefile = "exitcode"):
903         """
904         Get the exit code of an application.
905         Returns an integer value with the exit code 
906         """
907         (out, err), proc = self.check_output(home, ecodefile)
908
909         # Succeeded to open file, return exit code in the file
910         if proc.wait() == 0:
911             try:
912                 return int(out.strip())
913             except:
914                 # Error in the content of the file!
915                 return ExitCode.CORRUPTFILE
916
917         # No such file or directory
918         if proc.returncode == 1:
919             return ExitCode.FILENOTFOUND
920         
921         # Other error from 'cat'
922         return ExitCode.ERROR
923
924     def upload_command(self, command, 
925             shfile = "cmd.sh",
926             ecodefile = "exitcode",
927             overwrite = True,
928             env = None):
929         """ Saves the command as a bash script file in the remote host, and
930         forces to save the exit code of the command execution to the ecodefile
931         """
932
933         if not (command.strip().endswith(";") or command.strip().endswith("&")):
934             command += ";"
935       
936         # The exit code of the command will be stored in ecodefile
937         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
938                 'command': command,
939                 'ecodefile': ecodefile,
940                 } 
941
942         # Export environment
943         environ = self.format_environment(env)
944
945         # Add environ to command
946         command = environ + command
947
948         return self.upload(command, shfile, text = True, overwrite = overwrite)
949
950     def format_environment(self, env, inline = False):
951         """ Formats the environment variables for a command to be executed
952         either as an inline command
953         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
954         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
955         """
956         if not env: return ""
957
958         # Remove extra white spaces
959         env = re.sub(r'\s+', ' ', env.strip())
960
961         sep = ";" if inline else "\n"
962         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
963
964     def check_errors(self, home, 
965             ecodefile = "exitcode", 
966             stderr = "stderr"):
967         """ Checks whether errors occurred while running a command.
968         It first checks the exit code for the command, and only if the
969         exit code is an error one it returns the error output.
970
971         """
972         proc = None
973         err = ""
974
975         # get exit code saved in the 'exitcode' file
976         ecode = self.exitcode(home, ecodefile)
977
978         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
979             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
980         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
981             # The process returned an error code or didn't exist. 
982             # Check standard error.
983             (err, eerr), proc = self.check_output(home, stderr)
984
985             # If the stderr file was not found, assume nothing bad happened,
986             # and just ignore the error.
987             # (cat returns 1 for error "No such file or directory")
988             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
989                 err = "" 
990             
991         return ("", err), proc
992  
993     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
994         """ Waits until the pid file for the command is generated, 
995             and returns the pid and ppid of the process """
996         pid = ppid = None
997         delay = 1.0
998
999         for i in xrange(2):
1000             pidtuple = self.getpid(home = home, pidfile = pidfile)
1001             
1002             if pidtuple:
1003                 pid, ppid = pidtuple
1004                 break
1005             else:
1006                 time.sleep(delay)
1007                 delay = delay * 1.5
1008         else:
1009             msg = " Failed to get pid for pidfile %s/%s " % (
1010                     home, pidfile )
1011             self.error(msg)
1012             
1013             if raise_on_error:
1014                 raise RuntimeError, msg
1015
1016         return pid, ppid
1017
1018     def wait_run(self, pid, ppid, trial = 0):
1019         """ wait for a remote process to finish execution """
1020         delay = 1.0
1021
1022         while True:
1023             status = self.status(pid, ppid)
1024             
1025             if status is ProcStatus.FINISHED:
1026                 break
1027             elif status is not ProcStatus.RUNNING:
1028                 delay = delay * 1.5
1029                 time.sleep(delay)
1030                 # If it takes more than 20 seconds to start, then
1031                 # asume something went wrong
1032                 if delay > 20:
1033                     break
1034             else:
1035                 # The app is running, just wait...
1036                 time.sleep(0.5)
1037
1038     def check_output(self, home, filename):
1039         """ Retrives content of file """
1040         (out, err), proc = self.execute("cat %s" % 
1041             os.path.join(home, filename), retry = 1, with_lock = True)
1042         return (out, err), proc
1043
1044     def is_alive(self):
1045         """ Checks if host is responsive
1046         """
1047         if self.localhost:
1048             return True
1049
1050         out = err = ""
1051         msg = "Unresponsive host. Wrong answer. "
1052
1053         # The underlying SSH layer will sometimes return an empty
1054         # output (even if the command was executed without errors).
1055         # To work arround this, repeat the operation N times or
1056         # until the result is not empty string
1057         try:
1058             (out, err), proc = self.execute("echo 'ALIVE'",
1059                     blocking = True,
1060                     with_lock = True)
1061     
1062             if out.find("ALIVE") > -1:
1063                 return True
1064         except:
1065             trace = traceback.format_exc()
1066             msg = "Unresponsive host. Error reaching host: %s " % trace
1067
1068         self.error(msg, out, err)
1069         return False
1070
1071     def find_home(self):
1072         """ Retrieves host home directory
1073         """
1074         # The underlying SSH layer will sometimes return an empty
1075         # output (even if the command was executed without errors).
1076         # To work arround this, repeat the operation N times or
1077         # until the result is not empty string
1078         msg = "Impossible to retrieve HOME directory"
1079         try:
1080             (out, err), proc = self.execute("echo ${HOME}",
1081                     blocking = True,
1082                     with_lock = True)
1083     
1084             if out.strip() != "":
1085                 self._home_dir =  out.strip()
1086         except:
1087             trace = traceback.format_exc()
1088             msg = "Impossible to retrieve HOME directory %s" % trace
1089
1090         if not self._home_dir:
1091             self.error(msg)
1092             raise RuntimeError, msg
1093
1094     def filter_existing_files(self, src, dst):
1095         """ Removes files that already exist in the Linux host from src list
1096         """
1097         # construct a dictionary with { dst: src }
1098         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1099                     if len(src) > 1 else dict({dst: src[0]})
1100
1101         command = []
1102         for d in dests.keys():
1103             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1104
1105         command = ";".join(command)
1106
1107         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1108     
1109         for d in dests.keys():
1110             if out.find(d) > -1:
1111                 del dests[d]
1112
1113         if not dests:
1114             return []
1115
1116         return dests.values()
1117