Merging the openflow part to the nepi-3-dev branch
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34 import traceback
35
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!! 
38
39 class ExitCode:
40     """
41     Error codes that the rexitcode function can return if unable to
42     check the exit code of a spawned process
43     """
44     FILENOTFOUND = -1
45     CORRUPTFILE = -2
46     ERROR = -3
47     OK = 0
48
49 class OSType:
50     """
51     Supported flavors of Linux OS
52     """
53     FEDORA_8 = "f8"
54     FEDORA_12 = "f12"
55     FEDORA_14 = "f14"
56     FEDORA = "fedora"
57     UBUNTU = "ubuntu"
58     DEBIAN = "debian"
59
60 @clsinit_copy
61 class LinuxNode(ResourceManager):
62     """
63     .. class:: Class Args :
64       
65         :param ec: The Experiment controller
66         :type ec: ExperimentController
67         :param guid: guid of the RM
68         :type guid: int
69
70     .. note::
71
72         There are different ways in which commands can be executed using the
73         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
74         'run_and_wait'). 
75         
76         Brief explanation:
77
78             * 'execute' (blocking mode) :  
79
80                      HOW IT WORKS: 'execute', forks a process and run the
81                      command, synchronously, attached to the terminal, in
82                      foreground.
83                      The execute method will block until the command returns
84                      the result on 'out', 'err' (so until it finishes executing).
85   
86                      USAGE: short-lived commands that must be executed attached
87                      to a terminal and in foreground, for which it IS necessary
88                      to block until the command has finished (e.g. if you want
89                      to run 'ls' or 'cat').
90
91             * 'execute' (NON blocking mode - blocking = False) :
92
93                     HOW IT WORKS: Same as before, except that execute method
94                     will return immediately (even if command still running).
95
96                     USAGE: long-lived commands that must be executed attached
97                     to a terminal and in foreground, but for which it is not
98                     necessary to block until the command has finished. (e.g.
99                     start an application using X11 forwarding)
100
101              * 'run' :
102
103                    HOW IT WORKS: Connects to the host ( using SSH if remote)
104                    and launches the command in background, detached from any
105                    terminal (daemonized), and returns. The command continues to
106                    run remotely, but since it is detached from the terminal,
107                    its pipes (stdin, stdout, stderr) can't be redirected to the
108                    console (as normal non detached processes would), and so they
109                    are explicitly redirected to files. The pidfile is created as
110                    part of the process of launching the command. The pidfile
111                    holds the pid and ppid of the process forked in background,
112                    so later on it is possible to check whether the command is still
113                    running.
114
115                     USAGE: long-lived commands that can run detached in background,
116                     for which it is NOT necessary to block (wait) until the command
117                     has finished. (e.g. start an application that is not using X11
118                     forwarding. It can run detached and remotely in background)
119
120              * 'run_and_wait' :
121
122                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123                     the command has finished execution. It also checks whether
124                     errors occurred during runtime by reading the exitcode file,
125                     which contains the exit code of the command that was run
126                     (checking stderr only is not always reliable since many
127                     commands throw debugging info to stderr and the only way to
128                     automatically know whether an error really happened is to
129                     check the process exit code).
130
131                     Another difference with respect to 'run', is that instead
132                     of directly executing the command as a bash command line,
133                     it uploads the command to a bash script and runs the script.
134                     This allows to use the bash script to debug errors, since
135                     it remains at the remote host and can be run manually to
136                     reproduce the error.
137                   
138                     USAGE: medium-lived commands that can run detached in
139                     background, for which it IS necessary to block (wait) until
140                     the command has finished. (e.g. Package installation,
141                     source compilation, file download, etc)
142
143     """
144     _rtype = "LinuxNode"
145     _help = "Controls Linux host machines ( either localhost or a host " \
146             "that can be accessed using a SSH key)"
147     _backend_type = "linux"
148
149     @classmethod
150     def _register_attributes(cls):
151         hostname = Attribute("hostname", "Hostname of the machine",
152                 flags = Flags.Design)
153
154         username = Attribute("username", "Local account username", 
155                 flags = Flags.Credential)
156
157         port = Attribute("port", "SSH port", flags = Flags.Design)
158         
159         home = Attribute("home",
160                 "Experiment home directory to store all experiment related files",
161                 flags = Flags.Design)
162         
163         identity = Attribute("identity", "SSH identity file",
164                 flags = Flags.Credential)
165         
166         server_key = Attribute("serverKey", "Server public key", 
167                 flags = Flags.Design)
168         
169         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170                 " from node home folder before starting experiment", 
171                 type = Types.Bool,
172                 default = False,
173                 flags = Flags.Design)
174
175         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
176                 " from a previous same experiment, before the new experiment starts", 
177                 type = Types.Bool,
178                 default = False,
179                 flags = Flags.Design)
180         
181         clean_processes = Attribute("cleanProcesses", 
182                 "Kill all running processes before starting experiment",
183                 type = Types.Bool,
184                 default = False,
185                 flags = Flags.Design)
186         
187         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188                 "releasing the resource",
189                 flags = Flags.Design)
190
191         gateway_user = Attribute("gatewayUser", "Gateway account username",
192                 flags = Flags.Design)
193
194         gateway = Attribute("gateway", "Hostname of the gateway machine",
195                 flags = Flags.Design)
196
197         cls._register_attribute(hostname)
198         cls._register_attribute(username)
199         cls._register_attribute(port)
200         cls._register_attribute(home)
201         cls._register_attribute(identity)
202         cls._register_attribute(server_key)
203         cls._register_attribute(clean_home)
204         cls._register_attribute(clean_experiment)
205         cls._register_attribute(clean_processes)
206         cls._register_attribute(tear_down)
207         cls._register_attribute(gateway_user)
208         cls._register_attribute(gateway)
209
210     def __init__(self, ec, guid):
211         super(LinuxNode, self).__init__(ec, guid)
212         self._os = None
213         # home directory at Linux host
214         self._home_dir = ""
215
216         # lock to prevent concurrent applications on the same node,
217         # to execute commands at the same time. There are potential
218         # concurrency issues when using SSH to a same host from 
219         # multiple threads. There are also possible operational 
220         # issues, e.g. an application querying the existence 
221         # of a file or folder prior to its creation, and another 
222         # application creating the same file or folder in between.
223         self._node_lock = threading.Lock()
224     
225     def log_message(self, msg):
226         return " guid %d - host %s - %s " % (self.guid, 
227                 self.get("hostname"), msg)
228
229     @property
230     def home_dir(self):
231         home = self.get("home") or ""
232         if not home.startswith("/"):
233            home = os.path.join(self._home_dir, home) 
234         return home
235
236     @property
237     def nepi_home(self):
238         return os.path.join(self.home_dir, ".nepi")
239
240     @property
241     def usr_dir(self):
242         return os.path.join(self.nepi_home, "nepi-usr")
243
244     @property
245     def lib_dir(self):
246         return os.path.join(self.usr_dir, "lib")
247
248     @property
249     def bin_dir(self):
250         return os.path.join(self.usr_dir, "bin")
251
252     @property
253     def src_dir(self):
254         return os.path.join(self.usr_dir, "src")
255
256     @property
257     def share_dir(self):
258         return os.path.join(self.usr_dir, "share")
259
260     @property
261     def exp_dir(self):
262         return os.path.join(self.nepi_home, "nepi-exp")
263
264     @property
265     def exp_home(self):
266         return os.path.join(self.exp_dir, self.ec.exp_id)
267
268     @property
269     def node_home(self):
270         return os.path.join(self.exp_home, "node-%d" % self.guid)
271
272     @property
273     def run_home(self):
274         return os.path.join(self.node_home, self.ec.run_id)
275
276     @property
277     def os(self):
278         if self._os:
279             return self._os
280
281         if self.get("hostname") not in ["localhost", "127.0.0.1"] and \
282                 not self.get("username"):
283             msg = "Can't resolve OS, insufficient data "
284             self.error(msg)
285             raise RuntimeError, msg
286
287         out = self.get_os()
288
289         if out.find("Fedora release 8") == 0:
290             self._os = OSType.FEDORA_8
291         elif out.find("Fedora release 12") == 0:
292             self._os = OSType.FEDORA_12
293         elif out.find("Fedora release 14") == 0:
294             self._os = OSType.FEDORA_14
295         elif out.find("Fedora release") == 0:
296             self._os = OSType.FEDORA
297         elif out.find("Debian") == 0: 
298             self._os = OSType.DEBIAN
299         elif out.find("Ubuntu") ==0:
300             self._os = OSType.UBUNTU
301         else:
302             msg = "Unsupported OS"
303             self.error(msg, out)
304             raise RuntimeError, "%s - %s " %( msg, out )
305
306         return self._os
307
308     def get_os(self):
309         # The underlying SSH layer will sometimes return an empty
310         # output (even if the command was executed without errors).
311         # To work arround this, repeat the operation N times or
312         # until the result is not empty string
313         out = ""
314         try:
315             (out, err), proc = self.execute("cat /etc/issue", 
316                     with_lock = True,
317                     blocking = True)
318         except:
319             trace = traceback.format_exc()
320             msg = "Error detecting OS: %s " % trace
321             self.error(msg, out, err)
322         
323         return out
324
325     @property
326     def use_deb(self):
327         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
328
329     @property
330     def use_rpm(self):
331         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
332                 OSType.FEDORA]
333
334     @property
335     def localhost(self):
336         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
337
338     def do_provision(self):
339         # check if host is alive
340         if not self.is_alive():
341             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
342             self.error(msg)
343             raise RuntimeError, msg
344
345         self.find_home()
346
347         if self.get("cleanProcesses"):
348             self.clean_processes()
349
350         if self.get("cleanHome"):
351             self.clean_home()
352  
353         if self.get("cleanExperiment"):
354             self.clean_experiment()
355     
356         # Create shared directory structure and node home directory
357         paths = [self.lib_dir, 
358             self.bin_dir, 
359             self.src_dir, 
360             self.share_dir, 
361             self.node_home]
362
363         self.mkdir(paths)
364
365         super(LinuxNode, self).do_provision()
366
367     def do_deploy(self):
368         if self.state == ResourceState.NEW:
369             self.info("Deploying node")
370             self.do_discover()
371             self.do_provision()
372
373         # Node needs to wait until all associated interfaces are 
374         # ready before it can finalize deployment
375         from nepi.resources.linux.interface import LinuxInterface
376         ifaces = self.get_connected(LinuxInterface.get_rtype())
377         for iface in ifaces:
378             if iface.state < ResourceState.READY:
379                 self.ec.schedule(reschedule_delay, self.deploy)
380                 return 
381
382         super(LinuxNode, self).do_deploy()
383
384     def do_release(self):
385         rms = self.get_connected()
386         for rm in rms:
387             # Node needs to wait until all associated RMs are released
388             # before it can be released
389             if rm.state != ResourceState.RELEASED:
390                 self.ec.schedule(reschedule_delay, self.release)
391                 return 
392
393         tear_down = self.get("tearDown")
394         if tear_down:
395             self.execute(tear_down)
396
397         self.clean_processes()
398
399         super(LinuxNode, self).do_release()
400
401     def valid_connection(self, guid):
402         # TODO: Validate!
403         return True
404
405     def clean_processes(self):
406         self.info("Cleaning up processes")
407  
408         if self.get("hostname") in ["localhost", "127.0.0.2"]:
409             return 
410         
411         if self.get("username") != 'root':
412             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
413                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
414         else:
415             if self.state >= ResourceState.READY:
416                 import pickle
417                 pids = pickle.load(open("/tmp/save.proc", "rb"))
418                 pids_temp = dict()
419                 ps_aux = "ps aux |awk '{print $2,$11}'"
420                 (out, err), proc = self.execute(ps_aux)
421                 for line in out.strip().split("\n"):
422                     parts = line.strip().split(" ")
423                     pids_temp[parts[0]] = parts[1]
424                 kill_pids = set(pids_temp.items()) - set(pids.items())
425                 kill_pids = ' '.join(dict(kill_pids).keys())
426
427                 cmd = ("killall tcpdump || /bin/true ; " +
428                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
429                     "kill %s || /bin/true ; " % kill_pids)
430             else:
431                 cmd = ("killall tcpdump || /bin/true ; " +
432                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
433
434         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
435
436     def clean_home(self):
437         """ Cleans all NEPI related folders in the Linux host
438         """
439         self.info("Cleaning up home")
440         
441         cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
442                 self.home_dir )
443
444         return self.execute(cmd, with_lock = True)
445
446     def clean_experiment(self):
447         """ Cleans all experiment related files in the Linux host.
448         It preserves NEPI files and folders that have a multi experiment
449         scope.
450         """
451         self.info("Cleaning up experiment files")
452         
453         cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
454                 self.exp_dir,
455                 self.ec.exp_id )
456             
457         return self.execute(cmd, with_lock = True)
458
459     def execute(self, command,
460             sudo = False,
461             env = None,
462             tty = False,
463             forward_x11 = False,
464             retry = 3,
465             connect_timeout = 30,
466             strict_host_checking = False,
467             persistent = True,
468             blocking = True,
469             with_lock = False
470             ):
471         """ Notice that this invocation will block until the
472         execution finishes. If this is not the desired behavior,
473         use 'run' instead."""
474
475         if self.localhost:
476             (out, err), proc = execfuncs.lexec(command, 
477                     user = self.get("username"), # still problem with localhost
478                     sudo = sudo,
479                     env = env)
480         else:
481             if with_lock:
482                 # If the execute command is blocking, we don't want to keep
483                 # the node lock. This lock is used to avoid race conditions
484                 # when creating the ControlMaster sockets. A more elegant
485                 # solution is needed.
486                 with self._node_lock:
487                     (out, err), proc = sshfuncs.rexec(
488                         command, 
489                         host = self.get("hostname"),
490                         user = self.get("username"),
491                         port = self.get("port"),
492                         gwuser = self.get("gatewayUser"),
493                         gw = self.get("gateway"),
494                         agent = True,
495                         sudo = sudo,
496                         identity = self.get("identity"),
497                         server_key = self.get("serverKey"),
498                         env = env,
499                         tty = tty,
500                         forward_x11 = forward_x11,
501                         retry = retry,
502                         connect_timeout = connect_timeout,
503                         persistent = persistent,
504                         blocking = blocking, 
505                         strict_host_checking = strict_host_checking
506                         )
507             else:
508                 (out, err), proc = sshfuncs.rexec(
509                     command, 
510                     host = self.get("hostname"),
511                     user = self.get("username"),
512                     port = self.get("port"),
513                     gwuser = self.get("gatewayUser"),
514                     gw = self.get("gateway"),
515                     agent = True,
516                     sudo = sudo,
517                     identity = self.get("identity"),
518                     server_key = self.get("serverKey"),
519                     env = env,
520                     tty = tty,
521                     forward_x11 = forward_x11,
522                     retry = retry,
523                     connect_timeout = connect_timeout,
524                     persistent = persistent,
525                     blocking = blocking, 
526                     strict_host_checking = strict_host_checking
527                     )
528
529         return (out, err), proc
530
531     def run(self, command, home,
532             create_home = False,
533             pidfile = 'pidfile',
534             stdin = None, 
535             stdout = 'stdout', 
536             stderr = 'stderr', 
537             sudo = False,
538             tty = False):
539         
540         self.debug("Running command '%s'" % command)
541         
542         if self.localhost:
543             (out, err), proc = execfuncs.lspawn(command, pidfile,
544                     home = home, 
545                     create_home = create_home, 
546                     stdin = stdin or '/dev/null',
547                     stdout = stdout or '/dev/null',
548                     stderr = stderr or '/dev/null',
549                     sudo = sudo) 
550         else:
551             with self._node_lock:
552                 (out, err), proc = sshfuncs.rspawn(
553                     command,
554                     pidfile = pidfile,
555                     home = home,
556                     create_home = create_home,
557                     stdin = stdin or '/dev/null',
558                     stdout = stdout or '/dev/null',
559                     stderr = stderr or '/dev/null',
560                     sudo = sudo,
561                     host = self.get("hostname"),
562                     user = self.get("username"),
563                     port = self.get("port"),
564                     gwuser = self.get("gatewayUser"),
565                     gw = self.get("gateway"),
566                     agent = True,
567                     identity = self.get("identity"),
568                     server_key = self.get("serverKey"),
569                     tty = tty
570                     )
571
572         return (out, err), proc
573
574     def getpid(self, home, pidfile = "pidfile"):
575         if self.localhost:
576             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
577         else:
578             with self._node_lock:
579                 pidtuple = sshfuncs.rgetpid(
580                     os.path.join(home, pidfile),
581                     host = self.get("hostname"),
582                     user = self.get("username"),
583                     port = self.get("port"),
584                     gwuser = self.get("gatewayUser"),
585                     gw = self.get("gateway"),
586                     agent = True,
587                     identity = self.get("identity"),
588                     server_key = self.get("serverKey")
589                     )
590         
591         return pidtuple
592
593     def status(self, pid, ppid):
594         if self.localhost:
595             status = execfuncs.lstatus(pid, ppid)
596         else:
597             with self._node_lock:
598                 status = sshfuncs.rstatus(
599                         pid, ppid,
600                         host = self.get("hostname"),
601                         user = self.get("username"),
602                         port = self.get("port"),
603                         gwuser = self.get("gatewayUser"),
604                         gw = self.get("gateway"),
605                         agent = True,
606                         identity = self.get("identity"),
607                         server_key = self.get("serverKey")
608                         )
609            
610         return status
611     
612     def kill(self, pid, ppid, sudo = False):
613         out = err = ""
614         proc = None
615         status = self.status(pid, ppid)
616
617         if status == sshfuncs.ProcStatus.RUNNING:
618             if self.localhost:
619                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
620             else:
621                 with self._node_lock:
622                     (out, err), proc = sshfuncs.rkill(
623                         pid, ppid,
624                         host = self.get("hostname"),
625                         user = self.get("username"),
626                         port = self.get("port"),
627                         gwuser = self.get("gatewayUser"),
628                         gw = self.get("gateway"),
629                         agent = True,
630                         sudo = sudo,
631                         identity = self.get("identity"),
632                         server_key = self.get("serverKey")
633                         )
634
635         return (out, err), proc
636
637     def copy(self, src, dst):
638         if self.localhost:
639             (out, err), proc = execfuncs.lcopy(src, dst, 
640                     recursive = True)
641         else:
642             with self._node_lock:
643                 (out, err), proc = sshfuncs.rcopy(
644                     src, dst, 
645                     port = self.get("port"),
646                     gwuser = self.get("gatewayUser"),
647                     gw = self.get("gateway"),
648                     identity = self.get("identity"),
649                     server_key = self.get("serverKey"),
650                     recursive = True,
651                     strict_host_checking = False)
652
653         return (out, err), proc
654
655     def upload(self, src, dst, text = False, overwrite = True,
656             raise_on_error = True):
657         """ Copy content to destination
658
659         src  string with the content to copy. Can be:
660             - plain text
661             - a string with the path to a local file
662             - a string with a semi-colon separeted list of local files
663             - a string with a local directory
664
665         dst  string with destination path on the remote host (remote is 
666             always self.host)
667
668         text src is text input, it must be stored into a temp file before 
669         uploading
670         """
671         # If source is a string input 
672         f = None
673         if text and not os.path.isfile(src):
674             # src is text input that should be uploaded as file
675             # create a temporal file with the content to upload
676             f = tempfile.NamedTemporaryFile(delete=False)
677             f.write(src)
678             f.close()
679             src = f.name
680
681         # If dst files should not be overwritten, check that the files do not
682         # exits already
683         if isinstance(src, str):
684             src = map(str.strip, src.split(";"))
685     
686         if overwrite == False:
687             src = self.filter_existing_files(src, dst)
688             if not src:
689                 return ("", ""), None
690
691         if not self.localhost:
692             # Build destination as <user>@<server>:<path>
693             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
694
695         ((out, err), proc) = self.copy(src, dst)
696
697         # clean up temp file
698         if f:
699             os.remove(f.name)
700
701         if err:
702             msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
703             self.error(msg, out, err)
704             
705             msg = "%s out: %s err: %s" % (msg, out, err)
706             if raise_on_error:
707                 raise RuntimeError, msg
708
709         return ((out, err), proc)
710
711     def download(self, src, dst, raise_on_error = True):
712         if not self.localhost:
713             # Build destination as <user>@<server>:<path>
714             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
715
716         ((out, err), proc) = self.copy(src, dst)
717
718         if err:
719             msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
720             self.error(msg, out, err)
721
722             if raise_on_error:
723                 raise RuntimeError, msg
724
725         return ((out, err), proc)
726
727     def install_packages_command(self, packages):
728         command = ""
729         if self.use_rpm:
730             command = rpmfuncs.install_packages_command(self.os, packages)
731         elif self.use_deb:
732             command = debfuncs.install_packages_command(self.os, packages)
733         else:
734             msg = "Error installing packages ( OS not known ) "
735             self.error(msg, self.os)
736             raise RuntimeError, msg
737
738         return command
739
740     def install_packages(self, packages, home, run_home = None,
741             raise_on_error = True):
742         """ Install packages in the Linux host.
743
744         'home' is the directory to upload the package installation script.
745         'run_home' is the directory from where to execute the script.
746         """
747         command = self.install_packages_command(packages)
748
749         run_home = run_home or home
750
751         (out, err), proc = self.run_and_wait(command, run_home, 
752             shfile = os.path.join(home, "instpkg.sh"),
753             pidfile = "instpkg_pidfile",
754             ecodefile = "instpkg_exitcode",
755             stdout = "instpkg_stdout", 
756             stderr = "instpkg_stderr",
757             overwrite = False,
758             raise_on_error = raise_on_error)
759
760         return (out, err), proc 
761
762     def remove_packages(self, packages, home, run_home = None,
763             raise_on_error = True):
764         """ Uninstall packages from the Linux host.
765
766         'home' is the directory to upload the package un-installation script.
767         'run_home' is the directory from where to execute the script.
768         """
769         if self.use_rpm:
770             command = rpmfuncs.remove_packages_command(self.os, packages)
771         elif self.use_deb:
772             command = debfuncs.remove_packages_command(self.os, packages)
773         else:
774             msg = "Error removing packages ( OS not known ) "
775             self.error(msg)
776             raise RuntimeError, msg
777
778         run_home = run_home or home
779
780         (out, err), proc = self.run_and_wait(command, run_home, 
781             shfile = os.path.join(home, "rmpkg.sh"),
782             pidfile = "rmpkg_pidfile",
783             ecodefile = "rmpkg_exitcode",
784             stdout = "rmpkg_stdout", 
785             stderr = "rmpkg_stderr",
786             overwrite = False,
787             raise_on_error = raise_on_error)
788          
789         return (out, err), proc 
790
791     def mkdir(self, paths, clean = False):
792         """ Paths is either a single remote directory path to create,
793         or a list of directories to create.
794         """
795         if clean:
796             self.rmdir(paths)
797
798         if isinstance(paths, str):
799             paths = [paths]
800
801         cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
802
803         return self.execute(cmd, with_lock = True)
804
805     def rmdir(self, paths):
806         """ Paths is either a single remote directory path to delete,
807         or a list of directories to delete.
808         """
809
810         if isinstance(paths, str):
811             paths = [paths]
812
813         cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
814
815         return self.execute(cmd, with_lock = True)
816         
817     def run_and_wait(self, command, home, 
818             shfile = "cmd.sh",
819             env = None,
820             overwrite = True,
821             pidfile = "pidfile", 
822             ecodefile = "exitcode", 
823             stdin = None, 
824             stdout = "stdout", 
825             stderr = "stderr", 
826             sudo = False,
827             tty = False,
828             raise_on_error = True):
829         """
830         Uploads the 'command' to a bash script in the host.
831         Then runs the script detached in background in the host, and
832         busy-waites until the script finishes executing.
833         """
834
835         if not shfile.startswith("/"):
836             shfile = os.path.join(home, shfile)
837
838         self.upload_command(command, 
839             shfile = shfile, 
840             ecodefile = ecodefile, 
841             env = env,
842             overwrite = overwrite)
843
844         command = "bash %s" % shfile
845         # run command in background in remote host
846         (out, err), proc = self.run(command, home, 
847                 pidfile = pidfile,
848                 stdin = stdin, 
849                 stdout = stdout, 
850                 stderr = stderr, 
851                 sudo = sudo,
852                 tty = tty)
853
854         # check no errors occurred
855         if proc.poll():
856             msg = " Failed to run command '%s' " % command
857             self.error(msg, out, err)
858             if raise_on_error:
859                 raise RuntimeError, msg
860
861         # Wait for pid file to be generated
862         pid, ppid = self.wait_pid(
863                 home = home, 
864                 pidfile = pidfile, 
865                 raise_on_error = raise_on_error)
866
867         # wait until command finishes to execute
868         self.wait_run(pid, ppid)
869       
870         (eout, err), proc = self.check_errors(home,
871             ecodefile = ecodefile,
872             stderr = stderr)
873
874         # Out is what was written in the stderr file
875         if err:
876             msg = " Failed to run command '%s' " % command
877             self.error(msg, eout, err)
878
879             if raise_on_error:
880                 raise RuntimeError, msg
881
882         (out, oerr), proc = self.check_output(home, stdout)
883         
884         return (out, err), proc
885
886     def exitcode(self, home, ecodefile = "exitcode"):
887         """
888         Get the exit code of an application.
889         Returns an integer value with the exit code 
890         """
891         (out, err), proc = self.check_output(home, ecodefile)
892
893         # Succeeded to open file, return exit code in the file
894         if proc.wait() == 0:
895             try:
896                 return int(out.strip())
897             except:
898                 # Error in the content of the file!
899                 return ExitCode.CORRUPTFILE
900
901         # No such file or directory
902         if proc.returncode == 1:
903             return ExitCode.FILENOTFOUND
904         
905         # Other error from 'cat'
906         return ExitCode.ERROR
907
908     def upload_command(self, command, 
909             shfile = "cmd.sh",
910             ecodefile = "exitcode",
911             overwrite = True,
912             env = None):
913         """ Saves the command as a bash script file in the remote host, and
914         forces to save the exit code of the command execution to the ecodefile
915         """
916
917         if not (command.strip().endswith(";") or command.strip().endswith("&")):
918             command += ";"
919       
920         # The exit code of the command will be stored in ecodefile
921         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
922                 'command': command,
923                 'ecodefile': ecodefile,
924                 } 
925
926         # Export environment
927         environ = self.format_environment(env)
928
929         # Add environ to command
930         command = environ + command
931
932         return self.upload(command, shfile, text = True, overwrite = overwrite)
933
934     def format_environment(self, env, inline = False):
935         """ Formats the environment variables for a command to be executed
936         either as an inline command
937         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
938         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
939         """
940         if not env: return ""
941
942         # Remove extra white spaces
943         env = re.sub(r'\s+', ' ', env.strip())
944
945         sep = ";" if inline else "\n"
946         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
947
948     def check_errors(self, home, 
949             ecodefile = "exitcode", 
950             stderr = "stderr"):
951         """ Checks whether errors occurred while running a command.
952         It first checks the exit code for the command, and only if the
953         exit code is an error one it returns the error output.
954
955         """
956         proc = None
957         err = ""
958
959         # get exit code saved in the 'exitcode' file
960         ecode = self.exitcode(home, ecodefile)
961
962         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
963             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
964         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
965             # The process returned an error code or didn't exist. 
966             # Check standard error.
967             (err, eerr), proc = self.check_output(home, stderr)
968
969             # If the stderr file was not found, assume nothing bad happened,
970             # and just ignore the error.
971             # (cat returns 1 for error "No such file or directory")
972             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
973                 err = "" 
974             
975         return ("", err), proc
976  
977     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
978         """ Waits until the pid file for the command is generated, 
979             and returns the pid and ppid of the process """
980         pid = ppid = None
981         delay = 1.0
982
983         for i in xrange(2):
984             pidtuple = self.getpid(home = home, pidfile = pidfile)
985             
986             if pidtuple:
987                 pid, ppid = pidtuple
988                 break
989             else:
990                 time.sleep(delay)
991                 delay = delay * 1.5
992         else:
993             msg = " Failed to get pid for pidfile %s/%s " % (
994                     home, pidfile )
995             self.error(msg)
996             
997             if raise_on_error:
998                 raise RuntimeError, msg
999
1000         return pid, ppid
1001
1002     def wait_run(self, pid, ppid, trial = 0):
1003         """ wait for a remote process to finish execution """
1004         delay = 1.0
1005
1006         while True:
1007             status = self.status(pid, ppid)
1008             
1009             if status is ProcStatus.FINISHED:
1010                 break
1011             elif status is not ProcStatus.RUNNING:
1012                 delay = delay * 1.5
1013                 time.sleep(delay)
1014                 # If it takes more than 20 seconds to start, then
1015                 # asume something went wrong
1016                 if delay > 20:
1017                     break
1018             else:
1019                 # The app is running, just wait...
1020                 time.sleep(0.5)
1021
1022     def check_output(self, home, filename):
1023         """ Retrives content of file """
1024         (out, err), proc = self.execute("cat %s" % 
1025             os.path.join(home, filename), retry = 1, with_lock = True)
1026         return (out, err), proc
1027
1028     def is_alive(self):
1029         """ Checks if host is responsive
1030         """
1031         if self.localhost:
1032             return True
1033
1034         out = err = ""
1035         msg = "Unresponsive host. Wrong answer. "
1036
1037         # The underlying SSH layer will sometimes return an empty
1038         # output (even if the command was executed without errors).
1039         # To work arround this, repeat the operation N times or
1040         # until the result is not empty string
1041         try:
1042             (out, err), proc = self.execute("echo 'ALIVE'",
1043                     blocking = True,
1044                     with_lock = True)
1045     
1046             if out.find("ALIVE") > -1:
1047                 return True
1048         except:
1049             trace = traceback.format_exc()
1050             msg = "Unresponsive host. Error reaching host: %s " % trace
1051
1052         self.error(msg, out, err)
1053         return False
1054
1055     def find_home(self):
1056         """ Retrieves host home directory
1057         """
1058         # The underlying SSH layer will sometimes return an empty
1059         # output (even if the command was executed without errors).
1060         # To work arround this, repeat the operation N times or
1061         # until the result is not empty string
1062         msg = "Impossible to retrieve HOME directory"
1063         try:
1064             (out, err), proc = self.execute("echo ${HOME}",
1065                     blocking = True,
1066                     with_lock = True)
1067     
1068             if out.strip() != "":
1069                 self._home_dir =  out.strip()
1070         except:
1071             trace = traceback.format_exc()
1072             msg = "Impossible to retrieve HOME directory %s" % trace
1073
1074         if not self._home_dir:
1075             self.error(msg)
1076             raise RuntimeError, msg
1077
1078     def filter_existing_files(self, src, dst):
1079         """ Removes files that already exist in the Linux host from src list
1080         """
1081         # construct a dictionary with { dst: src }
1082         dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1083                     if len(src) > 1 else dict({dst: src[0]})
1084
1085         command = []
1086         for d in dests.keys():
1087             command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1088
1089         command = ";".join(command)
1090
1091         (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1092     
1093         for d in dests.keys():
1094             if out.find(d) > -1:
1095                 del dests[d]
1096
1097         if not dests:
1098             return []
1099
1100         return dests.values()
1101