Fix #29 LinuxApplication passing a list of files as 'sources' not working
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.ExecReadOnly)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.ExecReadOnly)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.ExecReadOnly)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.ExecReadOnly)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.ExecReadOnly)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.ExecReadOnly)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.ExecReadOnly)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.ExecReadOnly)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.ExecReadOnly)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # lock to prevent concurrent applications on the same node,
217         # to execute commands at the same time. There are potential
218         # concurrency issues when using SSH to a same host from 
219         # multiple threads. There are also possible operational 
220         # issues, e.g. an application querying the existence 
221         # of a file or folder prior to its creation, and another 
222         # application creating the same file or folder in between.
223         self._node_lock = threading.Lock()
224     
225     def log_message(self, msg):
226         return " guid %d - host %s - %s " % (self.guid, 
227                 self.get("hostname"), msg)
228
229     @property
230     def home_dir(self):
231         home = self.get("home") or ""
232         if not home.startswith("/"):
233            home = os.path.join(self._home_dir, home) 
234         return home
235
236     @property
237     def usr_dir(self):
238         return os.path.join(self.home_dir, "nepi-usr")
239
240     @property
241     def lib_dir(self):
242         return os.path.join(self.usr_dir, "lib")
243
244     @property
245     def bin_dir(self):
246         return os.path.join(self.usr_dir, "bin")
247
248     @property
249     def src_dir(self):
250         return os.path.join(self.usr_dir, "src")
251
252     @property
253     def share_dir(self):
254         return os.path.join(self.usr_dir, "share")
255
256     @property
257     def exp_dir(self):
258         return os.path.join(self.home_dir, "nepi-exp")
259
260     @property
261     def exp_home(self):
262         return os.path.join(self.exp_dir, self.ec.exp_id)
263
264     @property
265     def node_home(self):
266         return os.path.join(self.exp_home, "node-%d" % self.guid)
267
268     @property
269     def run_home(self):
270         return os.path.join(self.node_home, self.ec.run_id)
271
272     @property
273     def os(self):
274         if self._os:
275             return self._os
276
277         if (not self.get("hostname") or not self.get("username")):
278             msg = "Can't resolve OS, insufficient data "
279             self.error(msg)
280             raise RuntimeError, msg
281
282         out = self.get_os()
283
284         if out.find("Fedora release 8") == 0:
285             self._os = OSType.FEDORA_8
286         elif out.find("Fedora release 12") == 0:
287             self._os = OSType.FEDORA_12
288         elif out.find("Fedora release 14") == 0:
289             self._os = OSType.FEDORA_14
290         elif out.find("Fedora release") == 0:
291             self._os = OSType.FEDORA
292         elif out.find("Debian") == 0: 
293             self._os = OSType.DEBIAN
294         elif out.find("Ubuntu") ==0:
295             self._os = OSType.UBUNTU
296         else:
297             msg = "Unsupported OS"
298             self.error(msg, out)
299             raise RuntimeError, "%s - %s " %( msg, out )
300
301         return self._os
302
303     def get_os(self):
304         # The underlying SSH layer will sometimes return an empty
305         # output (even if the command was executed without errors).
306         # To work arround this, repeat the operation N times or
307         # until the result is not empty string
308         out = ""
309         try:
310             (out, err), proc = self.execute("cat /etc/issue", 
311                     with_lock = True,
312                     blocking = True)
313         except:
314             trace = traceback.format_exc()
315             msg = "Error detecting OS: %s " % trace
316             self.error(msg, out, err)
317         
318         return out
319
320     @property
321     def use_deb(self):
322         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
323
324     @property
325     def use_rpm(self):
326         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
327                 OSType.FEDORA]
328
329     @property
330     def localhost(self):
331         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
332
333     def do_provision(self):
334         # check if host is alive
335         if not self.is_alive():
336             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
337             self.error(msg)
338             raise RuntimeError, msg
339
340         self.find_home()
341
342         if self.get("cleanProcesses"):
343             self.clean_processes()
344
345         if self.get("cleanHome"):
346             self.clean_home()
347  
348         if self.get("cleanExperiment"):
349             self.clean_experiment()
350     
351         # Create shared directory structure
352         self.mkdir(self.lib_dir)
353         self.mkdir(self.bin_dir)
354         self.mkdir(self.src_dir)
355         self.mkdir(self.share_dir)
356
357         # Create experiment node home directory
358         self.mkdir(self.node_home)
359
360         super(LinuxNode, self).do_provision()
361
362     def do_deploy(self):
363         if self.state == ResourceState.NEW:
364             self.info("Deploying node")
365             self.do_discover()
366             self.do_provision()
367
368         # Node needs to wait until all associated interfaces are 
369         # ready before it can finalize deployment
370         from nepi.resources.linux.interface import LinuxInterface
371         ifaces = self.get_connected(LinuxInterface.get_rtype())
372         for iface in ifaces:
373             if iface.state < ResourceState.READY:
374                 self.ec.schedule(reschedule_delay, self.deploy)
375                 return 
376
377         super(LinuxNode, self).do_deploy()
378
379     def do_release(self):
380         rms = self.get_connected()
381         for rm in rms:
382             # Node needs to wait until all associated RMs are released
383             # before it can be released
384             if rm.state != ResourceState.RELEASED:
385                 self.ec.schedule(reschedule_delay, self.release)
386                 return 
387
388         tear_down = self.get("tearDown")
389         if tear_down:
390             self.execute(tear_down)
391
392         self.clean_processes()
393
394         super(LinuxNode, self).do_release()
395
396     def valid_connection(self, guid):
397         # TODO: Validate!
398         return True
399
400     def clean_processes(self):
401         self.info("Cleaning up processes")
402  
403         if self.get("username") != 'root':
404             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
405                 "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
406                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
407         else:
408             if self.state >= ResourceState.READY:
409                 import pickle
410                 pids = pickle.load(open("/tmp/save.proc", "rb"))
411                 pids_temp = dict()
412                 ps_aux = "ps aux |awk '{print $2,$11}'"
413                 (out, err), proc = self.execute(ps_aux)
414                 for line in out.strip().split("\n"):
415                     parts = line.strip().split(" ")
416                     pids_temp[parts[0]] = parts[1]
417                 kill_pids = set(pids_temp.items()) - set(pids.items())
418                 kill_pids = ' '.join(dict(kill_pids).keys())
419
420                 cmd = ("killall tcpdump || /bin/true ; " +
421                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
422                     "kill %s || /bin/true ; " % kill_pids)
423             else:
424                 cmd = ("killall tcpdump || /bin/true ; " +
425                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
426
427         out = err = ""
428         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
429
430     def clean_home(self):
431         """ Cleans all NEPI related folders in the Linux host
432         """
433         self.info("Cleaning up home")
434         
435         cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
436                 self.home_dir )
437
438         return self.execute(cmd, with_lock = True)
439
440     def clean_experiment(self):
441         """ Cleans all experiment related files in the Linux host.
442         It preserves NEPI files and folders that have a multi experiment
443         scope.
444         """
445         self.info("Cleaning up experiment files")
446         
447         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
448                 self.exp_dir,
449                 self.ec.exp_id )
450             
451         return self.execute(cmd, with_lock = True)
452
453     def execute(self, command,
454             sudo = False,
455             stdin = None, 
456             env = None,
457             tty = False,
458             forward_x11 = False,
459             timeout = None,
460             retry = 3,
461             err_on_timeout = True,
462             connect_timeout = 30,
463             strict_host_checking = False,
464             persistent = True,
465             blocking = True,
466             with_lock = False
467             ):
468         """ Notice that this invocation will block until the
469         execution finishes. If this is not the desired behavior,
470         use 'run' instead."""
471
472         if self.localhost:
473             (out, err), proc = execfuncs.lexec(command, 
474                     user = self.get("username"), # still problem with localhost
475                     sudo = sudo,
476                     stdin = stdin,
477                     env = env)
478         else:
479             if with_lock:
480                 with self._node_lock:
481                     (out, err), proc = sshfuncs.rexec(
482                         command, 
483                         host = self.get("hostname"),
484                         user = self.get("username"),
485                         port = self.get("port"),
486                         gwuser = self.get("gatewayUser"),
487                         gw = self.get("gateway"),
488                         agent = True,
489                         sudo = sudo,
490                         stdin = stdin,
491                         identity = self.get("identity"),
492                         server_key = self.get("serverKey"),
493                         env = env,
494                         tty = tty,
495                         forward_x11 = forward_x11,
496                         timeout = timeout,
497                         retry = retry,
498                         err_on_timeout = err_on_timeout,
499                         connect_timeout = connect_timeout,
500                         persistent = persistent,
501                         blocking = blocking, 
502                         strict_host_checking = strict_host_checking
503                         )
504             else:
505                 (out, err), proc = sshfuncs.rexec(
506                     command, 
507                     host = self.get("hostname"),
508                     user = self.get("username"),
509                     port = self.get("port"),
510                     gwuser = self.get("gatewayUser"),
511                     gw = self.get("gateway"),
512                     agent = True,
513                     sudo = sudo,
514                     stdin = stdin,
515                     identity = self.get("identity"),
516                     server_key = self.get("serverKey"),
517                     env = env,
518                     tty = tty,
519                     forward_x11 = forward_x11,
520                     timeout = timeout,
521                     retry = retry,
522                     err_on_timeout = err_on_timeout,
523                     connect_timeout = connect_timeout,
524                     persistent = persistent,
525                     blocking = blocking, 
526                     strict_host_checking = strict_host_checking
527                     )
528
529         return (out, err), proc
530
531     def run(self, command, home,
532             create_home = False,
533             pidfile = 'pidfile',
534             stdin = None, 
535             stdout = 'stdout', 
536             stderr = 'stderr', 
537             sudo = False,
538             tty = False):
539         
540         self.debug("Running command '%s'" % command)
541         
542         if self.localhost:
543             (out, err), proc = execfuncs.lspawn(command, pidfile, 
544                     stdout = stdout, 
545                     stderr = stderr, 
546                     stdin = stdin, 
547                     home = home, 
548                     create_home = create_home, 
549                     sudo = sudo,
550                     user = user) 
551         else:
552             with self._node_lock:
553                 (out, err), proc = sshfuncs.rspawn(
554                     command,
555                     pidfile = pidfile,
556                     home = home,
557                     create_home = create_home,
558                     stdin = stdin if stdin is not None else '/dev/null',
559                     stdout = stdout if stdout else '/dev/null',
560                     stderr = stderr if stderr else '/dev/null',
561                     sudo = sudo,
562                     host = self.get("hostname"),
563                     user = self.get("username"),
564                     port = self.get("port"),
565                     gwuser = self.get("gatewayUser"),
566                     gw = self.get("gateway"),
567                     agent = True,
568                     identity = self.get("identity"),
569                     server_key = self.get("serverKey"),
570                     tty = tty
571                     )
572
573         return (out, err), proc
574
575     def getpid(self, home, pidfile = "pidfile"):
576         if self.localhost:
577             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
578         else:
579             with self._node_lock:
580                 pidtuple = sshfuncs.rgetpid(
581                     os.path.join(home, pidfile),
582                     host = self.get("hostname"),
583                     user = self.get("username"),
584                     port = self.get("port"),
585                     gwuser = self.get("gatewayUser"),
586                     gw = self.get("gateway"),
587                     agent = True,
588                     identity = self.get("identity"),
589                     server_key = self.get("serverKey")
590                     )
591         
592         return pidtuple
593
594     def status(self, pid, ppid):
595         if self.localhost:
596             status = execfuncs.lstatus(pid, ppid)
597         else:
598             with self._node_lock:
599                 status = sshfuncs.rstatus(
600                         pid, ppid,
601                         host = self.get("hostname"),
602                         user = self.get("username"),
603                         port = self.get("port"),
604                         gwuser = self.get("gatewayUser"),
605                         gw = self.get("gateway"),
606                         agent = True,
607                         identity = self.get("identity"),
608                         server_key = self.get("serverKey")
609                         )
610            
611         return status
612     
613     def kill(self, pid, ppid, sudo = False):
614         out = err = ""
615         proc = None
616         status = self.status(pid, ppid)
617
618         if status == sshfuncs.ProcStatus.RUNNING:
619             if self.localhost:
620                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
621             else:
622                 with self._node_lock:
623                     (out, err), proc = sshfuncs.rkill(
624                         pid, ppid,
625                         host = self.get("hostname"),
626                         user = self.get("username"),
627                         port = self.get("port"),
628                         gwuser = self.get("gatewayUser"),
629                         gw = self.get("gateway"),
630                         agent = True,
631                         sudo = sudo,
632                         identity = self.get("identity"),
633                         server_key = self.get("serverKey")
634                         )
635
636         return (out, err), proc
637
638     def copy(self, src, dst):
639         if self.localhost:
640             (out, err), proc = execfuncs.lcopy(source, dest, 
641                     recursive = True,
642                     strict_host_checking = False)
643         else:
644             with self._node_lock:
645                 (out, err), proc = sshfuncs.rcopy(
646                     src, dst, 
647                     port = self.get("port"),
648                     gwuser = self.get("gatewayUser"),
649                     gw = self.get("gateway"),
650                     identity = self.get("identity"),
651                     server_key = self.get("serverKey"),
652                     recursive = True,
653                     strict_host_checking = False)
654
655         return (out, err), proc
656
657     def upload(self, src, dst, text = False, overwrite = True):
658         """ Copy content to destination
659
660         src  string with the content to copy. Can be:
661             - plain text
662             - a string with the path to a local file
663             - a string with a colon-separeted list of local files
664             - a string with a local directory
665
666         dst  string with destination path on the remote host (remote is 
667             always self.host)
668
669         text src is text input, it must be stored into a temp file before 
670         uploading
671         """
672         # If source is a string input 
673         f = None
674         if text and not os.path.isfile(src):
675             # src is text input that should be uploaded as file
676             # create a temporal file with the content to upload
677             f = tempfile.NamedTemporaryFile(delete=False)
678             f.write(src)
679             f.close()
680             src = f.name
681
682         # If dst files should not be overwritten, check that the files do not
683         # exits already
684         if isinstance(src, str):
685             src = map(str.strip, src.split(";"))
686     
687         if overwrite == False:
688             src = self.filter_existing_files(src, dst)
689             if not src:
690                 return ("", ""), None
691
692         if not self.localhost:
693             # Build destination as <user>@<server>:<path>
694             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
695
696         result = self.copy(src, dst)
697
698         # clean up temp file
699         if f:
700             os.remove(f.name)
701
702         return result
703
704     def download(self, src, dst):
705         if not self.localhost:
706             # Build destination as <user>@<server>:<path>
707             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
708         return self.copy(src, dst)
709
710     def install_packages_command(self, packages):
711         command = ""
712         if self.use_rpm:
713             command = rpmfuncs.install_packages_command(self.os, packages)
714         elif self.use_deb:
715             command = debfuncs.install_packages_command(self.os, packages)
716         else:
717             msg = "Error installing packages ( OS not known ) "
718             self.error(msg, self.os)
719             raise RuntimeError, msg
720
721         return command
722
723     def install_packages(self, packages, home, run_home = None):
724         """ Install packages in the Linux host.
725
726         'home' is the directory to upload the package installation script.
727         'run_home' is the directory from where to execute the script.
728         """
729         command = self.install_packages_command(packages)
730
731         run_home = run_home or home
732
733         (out, err), proc = self.run_and_wait(command, run_home, 
734             shfile = os.path.join(home, "instpkg.sh"),
735             pidfile = "instpkg_pidfile",
736             ecodefile = "instpkg_exitcode",
737             stdout = "instpkg_stdout", 
738             stderr = "instpkg_stderr",
739             overwrite = False,
740             raise_on_error = True)
741
742         return (out, err), proc 
743
744     def remove_packages(self, packages, home, run_home = None):
745         """ Uninstall packages from the Linux host.
746
747         'home' is the directory to upload the package un-installation script.
748         'run_home' is the directory from where to execute the script.
749         """
750         if self.use_rpm:
751             command = rpmfuncs.remove_packages_command(self.os, packages)
752         elif self.use_deb:
753             command = debfuncs.remove_packages_command(self.os, packages)
754         else:
755             msg = "Error removing packages ( OS not known ) "
756             self.error(msg)
757             raise RuntimeError, msg
758
759         run_home = run_home or home
760
761         (out, err), proc = self.run_and_wait(command, run_home, 
762             shfile = os.path.join(home, "rmpkg.sh"),
763             pidfile = "rmpkg_pidfile",
764             ecodefile = "rmpkg_exitcode",
765             stdout = "rmpkg_stdout", 
766             stderr = "rmpkg_stderr",
767             overwrite = False,
768             raise_on_error = True)
769          
770         return (out, err), proc 
771
772     def mkdir(self, path, clean = False):
773         if clean:
774             self.rmdir(path)
775
776         return self.execute("mkdir -p %s" % path, with_lock = True)
777
778     def rmdir(self, path):
779         return self.execute("rm -rf %s" % path, with_lock = True)
780         
781     def run_and_wait(self, command, home, 
782             shfile = "cmd.sh",
783             env = None,
784             overwrite = True,
785             pidfile = "pidfile", 
786             ecodefile = "exitcode", 
787             stdin = None, 
788             stdout = "stdout", 
789             stderr = "stderr", 
790             sudo = False,
791             tty = False,
792             raise_on_error = False):
793         """
794         Uploads the 'command' to a bash script in the host.
795         Then runs the script detached in background in the host, and
796         busy-waites until the script finishes executing.
797         """
798
799         if not shfile.startswith("/"):
800             shfile = os.path.join(home, shfile)
801
802         self.upload_command(command, 
803             shfile = shfile, 
804             ecodefile = ecodefile, 
805             env = env,
806             overwrite = overwrite)
807
808         command = "bash %s" % shfile
809         # run command in background in remote host
810         (out, err), proc = self.run(command, home, 
811                 pidfile = pidfile,
812                 stdin = stdin, 
813                 stdout = stdout, 
814                 stderr = stderr, 
815                 sudo = sudo,
816                 tty = tty)
817
818         # check no errors occurred
819         if proc.poll():
820             msg = " Failed to run command '%s' " % command
821             self.error(msg, out, err)
822             if raise_on_error:
823                 raise RuntimeError, msg
824
825         # Wait for pid file to be generated
826         pid, ppid = self.wait_pid(
827                 home = home, 
828                 pidfile = pidfile, 
829                 raise_on_error = raise_on_error)
830
831         # wait until command finishes to execute
832         self.wait_run(pid, ppid)
833       
834         (eout, err), proc = self.check_errors(home,
835             ecodefile = ecodefile,
836             stderr = stderr)
837
838         # Out is what was written in the stderr file
839         if err:
840             msg = " Failed to run command '%s' " % command
841             self.error(msg, eout, err)
842
843             if raise_on_error:
844                 raise RuntimeError, msg
845
846         (out, oerr), proc = self.check_output(home, stdout)
847         
848         return (out, err), proc
849
850     def exitcode(self, home, ecodefile = "exitcode"):
851         """
852         Get the exit code of an application.
853         Returns an integer value with the exit code 
854         """
855         (out, err), proc = self.check_output(home, ecodefile)
856
857         # Succeeded to open file, return exit code in the file
858         if proc.wait() == 0:
859             try:
860                 return int(out.strip())
861             except:
862                 # Error in the content of the file!
863                 return ExitCode.CORRUPTFILE
864
865         # No such file or directory
866         if proc.returncode == 1:
867             return ExitCode.FILENOTFOUND
868         
869         # Other error from 'cat'
870         return ExitCode.ERROR
871
872     def upload_command(self, command, 
873             shfile = "cmd.sh",
874             ecodefile = "exitcode",
875             overwrite = True,
876             env = None):
877         """ Saves the command as a bash script file in the remote host, and
878         forces to save the exit code of the command execution to the ecodefile
879         """
880
881         if not (command.strip().endswith(";") or command.strip().endswith("&")):
882             command += ";"
883       
884         # The exit code of the command will be stored in ecodefile
885         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
886                 'command': command,
887                 'ecodefile': ecodefile,
888                 } 
889
890         # Export environment
891         environ = self.format_environment(env)
892
893         # Add environ to command
894         command = environ + command
895
896         return self.upload(command, shfile, text = True, overwrite = overwrite)
897
898     def format_environment(self, env, inline = False):
899         """ Formats the environment variables for a command to be executed
900         either as an inline command
901         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
902         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
903         """
904         if not env: return ""
905
906         # Remove extra white spaces
907         env = re.sub(r'\s+', ' ', env.strip())
908
909         sep = ";" if inline else "\n"
910         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
911
912     def check_errors(self, home, 
913             ecodefile = "exitcode", 
914             stderr = "stderr"):
915         """ Checks whether errors occurred while running a command.
916         It first checks the exit code for the command, and only if the
917         exit code is an error one it returns the error output.
918
919         """
920         proc = None
921         err = ""
922
923         # get exit code saved in the 'exitcode' file
924         ecode = self.exitcode(home, ecodefile)
925
926         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
927             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
928         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
929             # The process returned an error code or didn't exist. 
930             # Check standard error.
931             (err, eerr), proc = self.check_output(home, stderr)
932
933             # If the stderr file was not found, assume nothing bad happened,
934             # and just ignore the error.
935             # (cat returns 1 for error "No such file or directory")
936             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
937                 err = "" 
938             
939         return ("", err), proc
940  
941     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
942         """ Waits until the pid file for the command is generated, 
943             and returns the pid and ppid of the process """
944         pid = ppid = None
945         delay = 1.0
946
947         for i in xrange(2):
948             pidtuple = self.getpid(home = home, pidfile = pidfile)
949             
950             if pidtuple:
951                 pid, ppid = pidtuple
952                 break
953             else:
954                 time.sleep(delay)
955                 delay = delay * 1.5
956         else:
957             msg = " Failed to get pid for pidfile %s/%s " % (
958                     home, pidfile )
959             self.error(msg)
960             
961             if raise_on_error:
962                 raise RuntimeError, msg
963
964         return pid, ppid
965
966     def wait_run(self, pid, ppid, trial = 0):
967         """ wait for a remote process to finish execution """
968         delay = 1.0
969
970         while True:
971             status = self.status(pid, ppid)
972             
973             if status is ProcStatus.FINISHED:
974                 break
975             elif status is not ProcStatus.RUNNING:
976                 delay = delay * 1.5
977                 time.sleep(delay)
978                 # If it takes more than 20 seconds to start, then
979                 # asume something went wrong
980                 if delay > 20:
981                     break
982             else:
983                 # The app is running, just wait...
984                 time.sleep(0.5)
985
986     def check_output(self, home, filename):
987         """ Retrives content of file """
988         (out, err), proc = self.execute("cat %s" % 
989             os.path.join(home, filename), retry = 1, with_lock = True)
990         return (out, err), proc
991
992     def is_alive(self):
993         """ Checks if host is responsive
994         """
995         if self.localhost:
996             return True
997
998         out = err = ""
999         msg = "Unresponsive host. Wrong answer. "
1000
1001         # The underlying SSH layer will sometimes return an empty
1002         # output (even if the command was executed without errors).
1003         # To work arround this, repeat the operation N times or
1004         # until the result is not empty string
1005         try:
1006             (out, err), proc = self.execute("echo 'ALIVE'",
1007                     blocking = True,
1008                     with_lock = True)
1009     
1010             if out.find("ALIVE") > -1:
1011                 return True
1012         except:
1013             trace = traceback.format_exc()
1014             msg = "Unresponsive host. Error reaching host: %s " % trace
1015
1016         self.error(msg, out, err)
1017         return False
1018
1019     def find_home(self):
1020         """ Retrieves host home directory
1021         """
1022         # The underlying SSH layer will sometimes return an empty
1023         # output (even if the command was executed without errors).
1024         # To work arround this, repeat the operation N times or
1025         # until the result is not empty string
1026         msg = "Impossible to retrieve HOME directory"
1027         try:
1028             (out, err), proc = self.execute("echo ${HOME}",
1029                     blocking = True,
1030                     with_lock = True)
1031     
1032             if out.strip() != "":
1033                 self._home_dir =  out.strip()
1034         except:
1035             trace = traceback.format_exc()
1036             msg = "Impossible to retrieve HOME directory %s" % trace
1037
1038         if not self._home_dir:
1039             self.error(msg)
1040             raise RuntimeError, msg
1041
1042     def filter_existing_files(self, src, dst):
1043         """ Removes files that already exist in the Linux host from src list
1044         """
1045         # construct a dictionary with { dst: src }
1046         dests = dict(map(
1047             lambda s: (os.path.join(dst, os.path.basename(s)), s ), s)) \
1048                     if len(src) > 1 else dict({dst: src[0]})
1049
1050         command = []
1051         for d in dests.keys():
1052             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1053
1054         command = ";".join(command)
1055
1056         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1057     
1058         for d in dests.keys():
1059             if out.find(d) > -1:
1060                 del dests[d]
1061
1062         if not dests:
1063             return []
1064
1065         return dests.values()
1066