Merging ns-3 into nepi-3-dev
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.Design)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.Design)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # lock to prevent concurrent applications on the same node,
217         # to execute commands at the same time. There are potential
218         # concurrency issues when using SSH to a same host from 
219         # multiple threads. There are also possible operational 
220         # issues, e.g. an application querying the existence 
221         # of a file or folder prior to its creation, and another 
222         # application creating the same file or folder in between.
223         self._node_lock = threading.Lock()
224     
225     def log_message(self, msg):
226         return " guid %d - host %s - %s " % (self.guid, 
227                 self.get("hostname"), msg)
228
229     @property
230     def home_dir(self):
231         home = self.get("home") or ""
232         if not home.startswith("/"):
233            home = os.path.join(self._home_dir, home) 
234         return home
235
236     @property
237     def usr_dir(self):
238         return os.path.join(self.home_dir, "nepi-usr")
239
240     @property
241     def lib_dir(self):
242         return os.path.join(self.usr_dir, "lib")
243
244     @property
245     def bin_dir(self):
246         return os.path.join(self.usr_dir, "bin")
247
248     @property
249     def src_dir(self):
250         return os.path.join(self.usr_dir, "src")
251
252     @property
253     def share_dir(self):
254         return os.path.join(self.usr_dir, "share")
255
256     @property
257     def exp_dir(self):
258         return os.path.join(self.home_dir, "nepi-exp")
259
260     @property
261     def exp_home(self):
262         return os.path.join(self.exp_dir, self.ec.exp_id)
263
264     @property
265     def node_home(self):
266         return os.path.join(self.exp_home, "node-%d" % self.guid)
267
268     @property
269     def run_home(self):
270         return os.path.join(self.node_home, self.ec.run_id)
271
272     @property
273     def os(self):
274         if self._os:
275             return self._os
276
277         if (not self.get("hostname") or not self.get("username")):
278             msg = "Can't resolve OS, insufficient data "
279             self.error(msg)
280             raise RuntimeError, msg
281
282         out = self.get_os()
283
284         if out.find("Fedora release 8") == 0:
285             self._os = OSType.FEDORA_8
286         elif out.find("Fedora release 12") == 0:
287             self._os = OSType.FEDORA_12
288         elif out.find("Fedora release 14") == 0:
289             self._os = OSType.FEDORA_14
290         elif out.find("Fedora release") == 0:
291             self._os = OSType.FEDORA
292         elif out.find("Debian") == 0: 
293             self._os = OSType.DEBIAN
294         elif out.find("Ubuntu") ==0:
295             self._os = OSType.UBUNTU
296         else:
297             msg = "Unsupported OS"
298             self.error(msg, out)
299             raise RuntimeError, "%s - %s " %( msg, out )
300
301         return self._os
302
303     def get_os(self):
304         # The underlying SSH layer will sometimes return an empty
305         # output (even if the command was executed without errors).
306         # To work arround this, repeat the operation N times or
307         # until the result is not empty string
308         out = ""
309         try:
310             (out, err), proc = self.execute("cat /etc/issue", 
311                     with_lock = True,
312                     blocking = True)
313         except:
314             trace = traceback.format_exc()
315             msg = "Error detecting OS: %s " % trace
316             self.error(msg, out, err)
317         
318         return out
319
320     @property
321     def use_deb(self):
322         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
323
324     @property
325     def use_rpm(self):
326         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
327                 OSType.FEDORA]
328
329     @property
330     def localhost(self):
331         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
332
333     def do_provision(self):
334         # check if host is alive
335         if not self.is_alive():
336             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
337             self.error(msg)
338             raise RuntimeError, msg
339
340         self.find_home()
341
342         if self.get("cleanProcesses"):
343             self.clean_processes()
344
345         if self.get("cleanHome"):
346             self.clean_home()
347  
348         if self.get("cleanExperiment"):
349             self.clean_experiment()
350     
351         # Create shared directory structure
352         self.mkdir(self.lib_dir)
353         self.mkdir(self.bin_dir)
354         self.mkdir(self.src_dir)
355         self.mkdir(self.share_dir)
356
357         # Create experiment node home directory
358         self.mkdir(self.node_home)
359
360         super(LinuxNode, self).do_provision()
361
362     def do_deploy(self):
363         if self.state == ResourceState.NEW:
364             self.info("Deploying node")
365             self.do_discover()
366             self.do_provision()
367
368         # Node needs to wait until all associated interfaces are 
369         # ready before it can finalize deployment
370         from nepi.resources.linux.interface import LinuxInterface
371         ifaces = self.get_connected(LinuxInterface.get_rtype())
372         for iface in ifaces:
373             if iface.state < ResourceState.READY:
374                 self.ec.schedule(reschedule_delay, self.deploy)
375                 return 
376
377         super(LinuxNode, self).do_deploy()
378
379     def do_release(self):
380         rms = self.get_connected()
381         for rm in rms:
382             # Node needs to wait until all associated RMs are released
383             # before it can be released
384             if rm.state != ResourceState.RELEASED:
385                 self.ec.schedule(reschedule_delay, self.release)
386                 return 
387
388         tear_down = self.get("tearDown")
389         if tear_down:
390             self.execute(tear_down)
391
392         self.clean_processes()
393
394         super(LinuxNode, self).do_release()
395
396     def valid_connection(self, guid):
397         # TODO: Validate!
398         return True
399
400     def clean_processes(self):
401         self.info("Cleaning up processes")
402  
403         if self.get("username") != 'root':
404             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
405                 "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
406                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
407         else:
408             if self.state >= ResourceState.READY:
409                 import pickle
410                 pids = pickle.load(open("/tmp/save.proc", "rb"))
411                 pids_temp = dict()
412                 ps_aux = "ps aux |awk '{print $2,$11}'"
413                 (out, err), proc = self.execute(ps_aux)
414                 for line in out.strip().split("\n"):
415                     parts = line.strip().split(" ")
416                     pids_temp[parts[0]] = parts[1]
417                 kill_pids = set(pids_temp.items()) - set(pids.items())
418                 kill_pids = ' '.join(dict(kill_pids).keys())
419
420                 cmd = ("killall tcpdump || /bin/true ; " +
421                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
422                     "kill %s || /bin/true ; " % kill_pids)
423             else:
424                 cmd = ("killall tcpdump || /bin/true ; " +
425                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
426
427         out = err = ""
428         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
429
430     def clean_home(self):
431         """ Cleans all NEPI related folders in the Linux host
432         """
433         self.info("Cleaning up home")
434         
435         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
436                 self.home_dir )
437
438         return self.execute(cmd, with_lock = True)
439
440     def clean_experiment(self):
441         """ Cleans all experiment related files in the Linux host.
442         It preserves NEPI files and folders that have a multi experiment
443         scope.
444         """
445         self.info("Cleaning up experiment files")
446         
447         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
448                 self.exp_dir,
449                 self.ec.exp_id )
450             
451         return self.execute(cmd, with_lock = True)
452
453     def execute(self, command,
454             sudo = False,
455             env = None,
456             tty = False,
457             forward_x11 = False,
458             retry = 3,
459             connect_timeout = 30,
460             strict_host_checking = False,
461             persistent = True,
462             blocking = True,
463             with_lock = False
464             ):
465         """ Notice that this invocation will block until the
466         execution finishes. If this is not the desired behavior,
467         use 'run' instead."""
468
469         if self.localhost:
470             (out, err), proc = execfuncs.lexec(command, 
471                     user = self.get("username"), # still problem with localhost
472                     sudo = sudo,
473                     env = env)
474         else:
475             if with_lock:
476                 # If the execute command is blocking, we don't want to keep
477                 # the node lock. This lock is used to avoid race conditions
478                 # when creating the ControlMaster sockets. A more elegant
479                 # solution is needed.
480                 with self._node_lock:
481                     (out, err), proc = sshfuncs.rexec(
482                         command, 
483                         host = self.get("hostname"),
484                         user = self.get("username"),
485                         port = self.get("port"),
486                         gwuser = self.get("gatewayUser"),
487                         gw = self.get("gateway"),
488                         agent = True,
489                         sudo = sudo,
490                         identity = self.get("identity"),
491                         server_key = self.get("serverKey"),
492                         env = env,
493                         tty = tty,
494                         forward_x11 = forward_x11,
495                         retry = retry,
496                         connect_timeout = connect_timeout,
497                         persistent = persistent,
498                         blocking = blocking, 
499                         strict_host_checking = strict_host_checking
500                         )
501             else:
502                 (out, err), proc = sshfuncs.rexec(
503                     command, 
504                     host = self.get("hostname"),
505                     user = self.get("username"),
506                     port = self.get("port"),
507                     gwuser = self.get("gatewayUser"),
508                     gw = self.get("gateway"),
509                     agent = True,
510                     sudo = sudo,
511                     identity = self.get("identity"),
512                     server_key = self.get("serverKey"),
513                     env = env,
514                     tty = tty,
515                     forward_x11 = forward_x11,
516                     retry = retry,
517                     connect_timeout = connect_timeout,
518                     persistent = persistent,
519                     blocking = blocking, 
520                     strict_host_checking = strict_host_checking
521                     )
522
523         return (out, err), proc
524
525     def run(self, command, home,
526             create_home = False,
527             pidfile = 'pidfile',
528             stdin = None, 
529             stdout = 'stdout', 
530             stderr = 'stderr', 
531             sudo = False,
532             tty = False):
533         
534         self.debug("Running command '%s'" % command)
535         
536         if self.localhost:
537             (out, err), proc = execfuncs.lspawn(command, pidfile, 
538                     stdout = stdout, 
539                     stderr = stderr, 
540                     stdin = stdin, 
541                     home = home, 
542                     create_home = create_home, 
543                     sudo = sudo,
544                     user = user) 
545         else:
546             with self._node_lock:
547                 (out, err), proc = sshfuncs.rspawn(
548                     command,
549                     pidfile = pidfile,
550                     home = home,
551                     create_home = create_home,
552                     stdin = stdin if stdin is not None else '/dev/null',
553                     stdout = stdout if stdout else '/dev/null',
554                     stderr = stderr if stderr else '/dev/null',
555                     sudo = sudo,
556                     host = self.get("hostname"),
557                     user = self.get("username"),
558                     port = self.get("port"),
559                     gwuser = self.get("gatewayUser"),
560                     gw = self.get("gateway"),
561                     agent = True,
562                     identity = self.get("identity"),
563                     server_key = self.get("serverKey"),
564                     tty = tty
565                     )
566
567         return (out, err), proc
568
569     def getpid(self, home, pidfile = "pidfile"):
570         if self.localhost:
571             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
572         else:
573             with self._node_lock:
574                 pidtuple = sshfuncs.rgetpid(
575                     os.path.join(home, pidfile),
576                     host = self.get("hostname"),
577                     user = self.get("username"),
578                     port = self.get("port"),
579                     gwuser = self.get("gatewayUser"),
580                     gw = self.get("gateway"),
581                     agent = True,
582                     identity = self.get("identity"),
583                     server_key = self.get("serverKey")
584                     )
585         
586         return pidtuple
587
588     def status(self, pid, ppid):
589         if self.localhost:
590             status = execfuncs.lstatus(pid, ppid)
591         else:
592             with self._node_lock:
593                 status = sshfuncs.rstatus(
594                         pid, ppid,
595                         host = self.get("hostname"),
596                         user = self.get("username"),
597                         port = self.get("port"),
598                         gwuser = self.get("gatewayUser"),
599                         gw = self.get("gateway"),
600                         agent = True,
601                         identity = self.get("identity"),
602                         server_key = self.get("serverKey")
603                         )
604            
605         return status
606     
607     def kill(self, pid, ppid, sudo = False):
608         out = err = ""
609         proc = None
610         status = self.status(pid, ppid)
611
612         if status == sshfuncs.ProcStatus.RUNNING:
613             if self.localhost:
614                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
615             else:
616                 with self._node_lock:
617                     (out, err), proc = sshfuncs.rkill(
618                         pid, ppid,
619                         host = self.get("hostname"),
620                         user = self.get("username"),
621                         port = self.get("port"),
622                         gwuser = self.get("gatewayUser"),
623                         gw = self.get("gateway"),
624                         agent = True,
625                         sudo = sudo,
626                         identity = self.get("identity"),
627                         server_key = self.get("serverKey")
628                         )
629
630         return (out, err), proc
631
632     def copy(self, src, dst):
633         if self.localhost:
634             (out, err), proc = execfuncs.lcopy(source, dest, 
635                     recursive = True,
636                     strict_host_checking = False)
637         else:
638             with self._node_lock:
639                 (out, err), proc = sshfuncs.rcopy(
640                     src, dst, 
641                     port = self.get("port"),
642                     gwuser = self.get("gatewayUser"),
643                     gw = self.get("gateway"),
644                     identity = self.get("identity"),
645                     server_key = self.get("serverKey"),
646                     recursive = True,
647                     strict_host_checking = False)
648
649         return (out, err), proc
650
651     def upload(self, src, dst, text = False, overwrite = True):
652         """ Copy content to destination
653
654         src  string with the content to copy. Can be:
655             - plain text
656             - a string with the path to a local file
657             - a string with a semi-colon separeted list of local files
658             - a string with a local directory
659
660         dst  string with destination path on the remote host (remote is 
661             always self.host)
662
663         text src is text input, it must be stored into a temp file before 
664         uploading
665         """
666         # If source is a string input 
667         f = None
668         if text and not os.path.isfile(src):
669             # src is text input that should be uploaded as file
670             # create a temporal file with the content to upload
671             f = tempfile.NamedTemporaryFile(delete=False)
672             f.write(src)
673             f.close()
674             src = f.name
675
676         # If dst files should not be overwritten, check that the files do not
677         # exits already
678         if isinstance(src, str):
679             src = map(str.strip, src.split(";"))
680     
681         if overwrite == False:
682             src = self.filter_existing_files(src, dst)
683             if not src:
684                 return ("", ""), None
685
686         if not self.localhost:
687             # Build destination as <user>@<server>:<path>
688             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
689
690         result = self.copy(src, dst)
691
692         # clean up temp file
693         if f:
694             os.remove(f.name)
695
696         return result
697
698     def download(self, src, dst):
699         if not self.localhost:
700             # Build destination as <user>@<server>:<path>
701             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
702         return self.copy(src, dst)
703
704     def install_packages_command(self, packages):
705         command = ""
706         if self.use_rpm:
707             command = rpmfuncs.install_packages_command(self.os, packages)
708         elif self.use_deb:
709             command = debfuncs.install_packages_command(self.os, packages)
710         else:
711             msg = "Error installing packages ( OS not known ) "
712             self.error(msg, self.os)
713             raise RuntimeError, msg
714
715         return command
716
717     def install_packages(self, packages, home, run_home = None):
718         """ Install packages in the Linux host.
719
720         'home' is the directory to upload the package installation script.
721         'run_home' is the directory from where to execute the script.
722         """
723         command = self.install_packages_command(packages)
724
725         run_home = run_home or home
726
727         (out, err), proc = self.run_and_wait(command, run_home, 
728             shfile = os.path.join(home, "instpkg.sh"),
729             pidfile = "instpkg_pidfile",
730             ecodefile = "instpkg_exitcode",
731             stdout = "instpkg_stdout", 
732             stderr = "instpkg_stderr",
733             overwrite = False,
734             raise_on_error = True)
735
736         return (out, err), proc 
737
738     def remove_packages(self, packages, home, run_home = None):
739         """ Uninstall packages from the Linux host.
740
741         'home' is the directory to upload the package un-installation script.
742         'run_home' is the directory from where to execute the script.
743         """
744         if self.use_rpm:
745             command = rpmfuncs.remove_packages_command(self.os, packages)
746         elif self.use_deb:
747             command = debfuncs.remove_packages_command(self.os, packages)
748         else:
749             msg = "Error removing packages ( OS not known ) "
750             self.error(msg)
751             raise RuntimeError, msg
752
753         run_home = run_home or home
754
755         (out, err), proc = self.run_and_wait(command, run_home, 
756             shfile = os.path.join(home, "rmpkg.sh"),
757             pidfile = "rmpkg_pidfile",
758             ecodefile = "rmpkg_exitcode",
759             stdout = "rmpkg_stdout", 
760             stderr = "rmpkg_stderr",
761             overwrite = False,
762             raise_on_error = True)
763          
764         return (out, err), proc 
765
766     def mkdir(self, path, clean = False):
767         if clean:
768             self.rmdir(path)
769
770         return self.execute("mkdir -p %s" % path, with_lock = True)
771
772     def rmdir(self, path):
773         return self.execute("rm -rf %s" % path, with_lock = True)
774         
775     def run_and_wait(self, command, home, 
776             shfile = "cmd.sh",
777             env = None,
778             overwrite = True,
779             pidfile = "pidfile", 
780             ecodefile = "exitcode", 
781             stdin = None, 
782             stdout = "stdout", 
783             stderr = "stderr", 
784             sudo = False,
785             tty = False,
786             raise_on_error = False):
787         """
788         Uploads the 'command' to a bash script in the host.
789         Then runs the script detached in background in the host, and
790         busy-waites until the script finishes executing.
791         """
792
793         if not shfile.startswith("/"):
794             shfile = os.path.join(home, shfile)
795
796         self.upload_command(command, 
797             shfile = shfile, 
798             ecodefile = ecodefile, 
799             env = env,
800             overwrite = overwrite)
801
802         command = "bash %s" % shfile
803         # run command in background in remote host
804         (out, err), proc = self.run(command, home, 
805                 pidfile = pidfile,
806                 stdin = stdin, 
807                 stdout = stdout, 
808                 stderr = stderr, 
809                 sudo = sudo,
810                 tty = tty)
811
812         # check no errors occurred
813         if proc.poll():
814             msg = " Failed to run command '%s' " % command
815             self.error(msg, out, err)
816             if raise_on_error:
817                 raise RuntimeError, msg
818
819         # Wait for pid file to be generated
820         pid, ppid = self.wait_pid(
821                 home = home, 
822                 pidfile = pidfile, 
823                 raise_on_error = raise_on_error)
824
825         # wait until command finishes to execute
826         self.wait_run(pid, ppid)
827       
828         (eout, err), proc = self.check_errors(home,
829             ecodefile = ecodefile,
830             stderr = stderr)
831
832         # Out is what was written in the stderr file
833         if err:
834             msg = " Failed to run command '%s' " % command
835             self.error(msg, eout, err)
836
837             if raise_on_error:
838                 raise RuntimeError, msg
839
840         (out, oerr), proc = self.check_output(home, stdout)
841         
842         return (out, err), proc
843
844     def exitcode(self, home, ecodefile = "exitcode"):
845         """
846         Get the exit code of an application.
847         Returns an integer value with the exit code 
848         """
849         (out, err), proc = self.check_output(home, ecodefile)
850
851         # Succeeded to open file, return exit code in the file
852         if proc.wait() == 0:
853             try:
854                 return int(out.strip())
855             except:
856                 # Error in the content of the file!
857                 return ExitCode.CORRUPTFILE
858
859         # No such file or directory
860         if proc.returncode == 1:
861             return ExitCode.FILENOTFOUND
862         
863         # Other error from 'cat'
864         return ExitCode.ERROR
865
866     def upload_command(self, command, 
867             shfile = "cmd.sh",
868             ecodefile = "exitcode",
869             overwrite = True,
870             env = None):
871         """ Saves the command as a bash script file in the remote host, and
872         forces to save the exit code of the command execution to the ecodefile
873         """
874
875         if not (command.strip().endswith(";") or command.strip().endswith("&")):
876             command += ";"
877       
878         # The exit code of the command will be stored in ecodefile
879         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
880                 'command': command,
881                 'ecodefile': ecodefile,
882                 } 
883
884         # Export environment
885         environ = self.format_environment(env)
886
887         # Add environ to command
888         command = environ + command
889
890         return self.upload(command, shfile, text = True, overwrite = overwrite)
891
892     def format_environment(self, env, inline = False):
893         """ Formats the environment variables for a command to be executed
894         either as an inline command
895         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
896         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
897         """
898         if not env: return ""
899
900         # Remove extra white spaces
901         env = re.sub(r'\s+', ' ', env.strip())
902
903         sep = ";" if inline else "\n"
904         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
905
906     def check_errors(self, home, 
907             ecodefile = "exitcode", 
908             stderr = "stderr"):
909         """ Checks whether errors occurred while running a command.
910         It first checks the exit code for the command, and only if the
911         exit code is an error one it returns the error output.
912
913         """
914         proc = None
915         err = ""
916
917         # get exit code saved in the 'exitcode' file
918         ecode = self.exitcode(home, ecodefile)
919
920         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
921             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
922         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
923             # The process returned an error code or didn't exist. 
924             # Check standard error.
925             (err, eerr), proc = self.check_output(home, stderr)
926
927             # If the stderr file was not found, assume nothing bad happened,
928             # and just ignore the error.
929             # (cat returns 1 for error "No such file or directory")
930             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
931                 err = "" 
932             
933         return ("", err), proc
934  
935     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
936         """ Waits until the pid file for the command is generated, 
937             and returns the pid and ppid of the process """
938         pid = ppid = None
939         delay = 1.0
940
941         for i in xrange(2):
942             pidtuple = self.getpid(home = home, pidfile = pidfile)
943             
944             if pidtuple:
945                 pid, ppid = pidtuple
946                 break
947             else:
948                 time.sleep(delay)
949                 delay = delay * 1.5
950         else:
951             msg = " Failed to get pid for pidfile %s/%s " % (
952                     home, pidfile )
953             self.error(msg)
954             
955             if raise_on_error:
956                 raise RuntimeError, msg
957
958         return pid, ppid
959
960     def wait_run(self, pid, ppid, trial = 0):
961         """ wait for a remote process to finish execution """
962         delay = 1.0
963
964         while True:
965             status = self.status(pid, ppid)
966             
967             if status is ProcStatus.FINISHED:
968                 break
969             elif status is not ProcStatus.RUNNING:
970                 delay = delay * 1.5
971                 time.sleep(delay)
972                 # If it takes more than 20 seconds to start, then
973                 # asume something went wrong
974                 if delay > 20:
975                     break
976             else:
977                 # The app is running, just wait...
978                 time.sleep(0.5)
979
980     def check_output(self, home, filename):
981         """ Retrives content of file """
982         (out, err), proc = self.execute("cat %s" % 
983             os.path.join(home, filename), retry = 1, with_lock = True)
984         return (out, err), proc
985
986     def is_alive(self):
987         """ Checks if host is responsive
988         """
989         if self.localhost:
990             return True
991
992         out = err = ""
993         msg = "Unresponsive host. Wrong answer. "
994
995         # The underlying SSH layer will sometimes return an empty
996         # output (even if the command was executed without errors).
997         # To work arround this, repeat the operation N times or
998         # until the result is not empty string
999         try:
1000             (out, err), proc = self.execute("echo 'ALIVE'",
1001                     blocking = True,
1002                     with_lock = True)
1003     
1004             if out.find("ALIVE") > -1:
1005                 return True
1006         except:
1007             trace = traceback.format_exc()
1008             msg = "Unresponsive host. Error reaching host: %s " % trace
1009
1010         self.error(msg, out, err)
1011         return False
1012
1013     def find_home(self):
1014         """ Retrieves host home directory
1015         """
1016         # The underlying SSH layer will sometimes return an empty
1017         # output (even if the command was executed without errors).
1018         # To work arround this, repeat the operation N times or
1019         # until the result is not empty string
1020         msg = "Impossible to retrieve HOME directory"
1021         try:
1022             (out, err), proc = self.execute("echo ${HOME}",
1023                     blocking = True,
1024                     with_lock = True)
1025     
1026             if out.strip() != "":
1027                 self._home_dir =  out.strip()
1028         except:
1029             trace = traceback.format_exc()
1030             msg = "Impossible to retrieve HOME directory %s" % trace
1031
1032         if not self._home_dir:
1033             self.error(msg)
1034             raise RuntimeError, msg
1035
1036     def filter_existing_files(self, src, dst):
1037         """ Removes files that already exist in the Linux host from src list
1038         """
1039         # construct a dictionary with { dst: src }
1040         dests = dict(map(
1041             lambda s: (os.path.join(dst, os.path.basename(s)), s ), s)) \
1042                     if len(src) > 1 else dict({dst: src[0]})
1043
1044         command = []
1045         for d in dests.keys():
1046             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1047
1048         command = ";".join(command)
1049
1050         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1051     
1052         for d in dests.keys():
1053             if out.find(d) > -1:
1054                 del dests[d]
1055
1056         if not dests:
1057             return []
1058
1059         return dests.values()
1060