Adding trace Collector RM
[nepi.git] / src / nepi / resources / linux / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
22         reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs 
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
26
27 import collections
28 import os
29 import random
30 import re
31 import tempfile
32 import time
33 import threading
34
35 # TODO: Verify files and dirs exists already
36 # TODO: Blacklist nodes!
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!! 
39
40
41 class ExitCode:
42     """
43     Error codes that the rexitcode function can return if unable to
44     check the exit code of a spawned process
45     """
46     FILENOTFOUND = -1
47     CORRUPTFILE = -2
48     ERROR = -3
49     OK = 0
50
51 class OSType:
52     """
53     Supported flavors of Linux OS
54     """
55     FEDORA_8 = "f8"
56     FEDORA_12 = "f12"
57     FEDORA_14 = "f14"
58     FEDORA = "fedora"
59     UBUNTU = "ubuntu"
60     DEBIAN = "debian"
61
62 @clsinit
63 class LinuxNode(ResourceManager):
64     """
65     .. class:: Class Args :
66       
67         :param ec: The Experiment controller
68         :type ec: ExperimentController
69         :param guid: guid of the RM
70         :type guid: int
71
72     .. note::
73
74         There are different ways in which commands can be executed using the
75         LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
76         'run_and_wait'). 
77         
78         Brief explanation:
79
80             * 'execute' (blocking mode) :  
81
82                      HOW IT WORKS: 'execute', forks a process and run the
83                      command, synchronously, attached to the terminal, in
84                      foreground.
85                      The execute method will block until the command returns
86                      the result on 'out', 'err' (so until it finishes executing).
87   
88                      USAGE: short-lived commands that must be executed attached
89                      to a terminal and in foreground, for which it IS necessary
90                      to block until the command has finished (e.g. if you want
91                      to run 'ls' or 'cat').
92
93             * 'execute' (NON blocking mode - blocking = False) :
94
95                     HOW IT WORKS: Same as before, except that execute method
96                     will return immediately (even if command still running).
97
98                     USAGE: long-lived commands that must be executed attached
99                     to a terminal and in foreground, but for which it is not
100                     necessary to block until the command has finished. (e.g.
101                     start an application using X11 forwarding)
102
103              * 'run' :
104
105                    HOW IT WORKS: Connects to the host ( using SSH if remote)
106                    and launches the command in background, detached from any
107                    terminal (daemonized), and returns. The command continues to
108                    run remotely, but since it is detached from the terminal,
109                    its pipes (stdin, stdout, stderr) can't be redirected to the
110                    console (as normal non detached processes would), and so they
111                    are explicitly redirected to files. The pidfile is created as
112                    part of the process of launching the command. The pidfile
113                    holds the pid and ppid of the process forked in background,
114                    so later on it is possible to check whether the command is still
115                    running.
116
117                     USAGE: long-lived commands that can run detached in background,
118                     for which it is NOT necessary to block (wait) until the command
119                     has finished. (e.g. start an application that is not using X11
120                     forwarding. It can run detached and remotely in background)
121
122              * 'run_and_wait' :
123
124                     HOW IT WORKS: Similar to 'run' except that it 'blocks' until
125                     the command has finished execution. It also checks whether
126                     errors occurred during runtime by reading the exitcode file,
127                     which contains the exit code of the command that was run
128                     (checking stderr only is not always reliable since many
129                     commands throw debugging info to stderr and the only way to
130                     automatically know whether an error really happened is to
131                     check the process exit code).
132
133                     Another difference with respect to 'run', is that instead
134                     of directly executing the command as a bash command line,
135                     it uploads the command to a bash script and runs the script.
136                     This allows to use the bash script to debug errors, since
137                     it remains at the remote host and can be run manually to
138                     reproduce the error.
139                   
140                     USAGE: medium-lived commands that can run detached in
141                     background, for which it IS necessary to block (wait) until
142                     the command has finished. (e.g. Package installation,
143                     source compilation, file download, etc)
144
145     """
146     _rtype = "LinuxNode"
147
148     @classmethod
149     def _register_attributes(cls):
150         hostname = Attribute("hostname", "Hostname of the machine",
151                 flags = Flags.ExecReadOnly)
152
153         username = Attribute("username", "Local account username", 
154                 flags = Flags.Credential)
155
156         port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
157         
158         home = Attribute("home",
159                 "Experiment home directory to store all experiment related files",
160                 flags = Flags.ExecReadOnly)
161         
162         identity = Attribute("identity", "SSH identity file",
163                 flags = Flags.Credential)
164         
165         server_key = Attribute("serverKey", "Server public key", 
166                 flags = Flags.ExecReadOnly)
167         
168         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
169                 " from home folder before starting experiment", 
170                 flags = Flags.ExecReadOnly)
171         
172         clean_processes = Attribute("cleanProcesses", 
173                 "Kill all running processes before starting experiment",
174                 flags = Flags.ExecReadOnly)
175         
176         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
177                 "releasing the resource",
178                 flags = Flags.ExecReadOnly)
179
180         cls._register_attribute(hostname)
181         cls._register_attribute(username)
182         cls._register_attribute(port)
183         cls._register_attribute(home)
184         cls._register_attribute(identity)
185         cls._register_attribute(server_key)
186         cls._register_attribute(clean_home)
187         cls._register_attribute(clean_processes)
188         cls._register_attribute(tear_down)
189
190     def __init__(self, ec, guid):
191         super(LinuxNode, self).__init__(ec, guid)
192         self._os = None
193         
194         # lock to avoid concurrency issues on methods used by applications 
195         self._lock = threading.Lock()
196     
197     def log_message(self, msg):
198         return " guid %d - host %s - %s " % (self.guid, 
199                 self.get("hostname"), msg)
200
201     @property
202     def home(self):
203         return self.get("home") or ""
204
205     @property
206     def exp_home(self):
207         return os.path.join(self.home, self.ec.exp_id)
208
209     @property
210     def node_home(self):
211         node_home = "node-%d" % self.guid
212         return os.path.join(self.exp_home, node_home)
213
214     @property
215     def os(self):
216         if self._os:
217             return self._os
218
219         if (not self.get("hostname") or not self.get("username")):
220             msg = "Can't resolve OS, insufficient data "
221             self.error(msg)
222             raise RuntimeError, msg
223
224         (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
225
226         if err and proc.poll():
227             msg = "Error detecting OS "
228             self.error(msg, out, err)
229             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
230
231         if out.find("Fedora release 8") == 0:
232             self._os = OSType.FEDORA_8
233         elif out.find("Fedora release 12") == 0:
234             self._os = OSType.FEDORA_12
235         elif out.find("Fedora release 14") == 0:
236             self._os = OSType.FEDORA_14
237         elif out.find("Debian") == 0: 
238             self._os = OSType.DEBIAN
239         elif out.find("Ubuntu") ==0:
240             self._os = OSType.UBUNTU
241         else:
242             msg = "Unsupported OS"
243             self.error(msg, out)
244             raise RuntimeError, "%s - %s " %( msg, out )
245
246         return self._os
247
248     @property
249     def use_deb(self):
250         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
251
252     @property
253     def use_rpm(self):
254         return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
255                 OSType.FEDORA]
256
257     @property
258     def localhost(self):
259         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
260
261     def provision(self):
262         if not self.is_alive():
263             self._state = ResourceState.FAILED
264             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
265             self.error(msg)
266             raise RuntimeError, msg
267
268         if self.get("cleanProcesses"):
269             self.clean_processes()
270
271         if self.get("cleanHome"):
272             self.clean_home()
273        
274         self.mkdir(self.node_home)
275
276         super(LinuxNode, self).provision()
277
278     def deploy(self):
279         if self.state == ResourceState.NEW:
280             try:
281                 self.discover()
282                 self.provision()
283             except:
284                 self._state = ResourceState.FAILED
285                 raise
286
287         # Node needs to wait until all associated interfaces are 
288         # ready before it can finalize deployment
289         from nepi.resources.linux.interface import LinuxInterface
290         ifaces = self.get_connected(LinuxInterface.rtype())
291         for iface in ifaces:
292             if iface.state < ResourceState.READY:
293                 self.ec.schedule(reschedule_delay, self.deploy)
294                 return 
295
296         super(LinuxNode, self).deploy()
297
298     def release(self):
299         tear_down = self.get("tearDown")
300         if tear_down:
301             self.execute(tear_down)
302
303         super(LinuxNode, self).release()
304
305     def valid_connection(self, guid):
306         # TODO: Validate!
307         return True
308
309     def clean_processes(self, killer = False):
310         self.info("Cleaning up processes")
311         
312         if killer:
313             # Hardcore kill
314             cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
315                 "sudo -S killall python tcpdump || /bin/true ; " +
316                 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
317                 "sudo -S killall -u root || /bin/true ; " +
318                 "sudo -S killall -u root || /bin/true ; ")
319         else:
320             # Be gentler...
321             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
322                 "sudo -S killall tcpdump || /bin/true ; " +
323                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
324                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
325
326         out = err = ""
327         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
328             
329     def clean_home(self):
330         self.info("Cleaning up home")
331         
332         cmd = (
333             # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
334             "find . -maxdepth 1 -name 'nepi-*' " +
335             " -execdir rm -rf {} + "
336             )
337             
338         if self.home:
339             cmd = "cd %s ; " % self.home + cmd
340
341         out = err = ""
342         (out, err), proc = self.execute(cmd, with_lock = True)
343
344     def upload(self, src, dst, text = False):
345         """ Copy content to destination
346
347            src  content to copy. Can be a local file, directory or a list of files
348
349            dst  destination path on the remote host (remote is always self.host)
350
351            text src is text input, it must be stored into a temp file before uploading
352         """
353         # If source is a string input 
354         f = None
355         if text and not os.path.isfile(src):
356             # src is text input that should be uploaded as file
357             # create a temporal file with the content to upload
358             f = tempfile.NamedTemporaryFile(delete=False)
359             f.write(src)
360             f.close()
361             src = f.name
362
363         if not self.localhost:
364             # Build destination as <user>@<server>:<path>
365             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
366         result = self.copy(src, dst)
367
368         # clean up temp file
369         if f:
370             os.remove(f.name)
371
372         return result
373
374     def download(self, src, dst):
375         if not self.localhost:
376             # Build destination as <user>@<server>:<path>
377             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
378         return self.copy(src, dst)
379
380     def install_packages(self, packages, home):
381         command = ""
382         if self.use_rpm:
383             command = rpmfuncs.install_packages_command(self.os, packages)
384         elif self.use_deb:
385             command = debfuncs.install_packages_command(self.os, packages)
386         else:
387             msg = "Error installing packages ( OS not known ) "
388             self.error(msg, self.os)
389             raise RuntimeError, msg
390
391         out = err = ""
392         (out, err), proc = self.run_and_wait(command, home, 
393             shfile = "instpkg.sh",
394             pidfile = "instpkg_pidfile",
395             ecodefile = "instpkg_exitcode",
396             stdout = "instpkg_stdout", 
397             stderr = "instpkg_stderr",
398             raise_on_error = True)
399
400         return (out, err), proc 
401
402     def remove_packages(self, packages, home):
403         command = ""
404         if self.use_rpm:
405             command = rpmfuncs.remove_packages_command(self.os, packages)
406         elif self.use_deb:
407             command = debfuncs.remove_packages_command(self.os, packages)
408         else:
409             msg = "Error removing packages ( OS not known ) "
410             self.error(msg)
411             raise RuntimeError, msg
412
413         out = err = ""
414         (out, err), proc = self.run_and_wait(command, home, 
415             shfile = "rmpkg.sh",
416             pidfile = "rmpkg_pidfile",
417             ecodefile = "rmpkg_exitcode",
418             stdout = "rmpkg_stdout", 
419             stderr = "rmpkg_stderr",
420             raise_on_error = True)
421          
422         return (out, err), proc 
423
424     def mkdir(self, path, clean = False):
425         if clean:
426             self.rmdir(path)
427
428         return self.execute("mkdir -p %s" % path, with_lock = True)
429
430     def rmdir(self, path):
431         return self.execute("rm -rf %s" % path, with_lock = True)
432         
433     def run_and_wait(self, command, home, 
434             shfile = "cmd.sh",
435             env = None,
436             pidfile = "pidfile", 
437             ecodefile = "exitcode", 
438             stdin = None, 
439             stdout = "stdout", 
440             stderr = "stderr", 
441             sudo = False,
442             tty = False,
443             raise_on_error = False):
444         """
445         Uploads the 'command' to a bash script in the host.
446         Then runs the script detached in background in the host, and
447         busy-waites until the script finishes executing.
448         """
449         self.upload_command(command, home, 
450             shfile = shfile, 
451             ecodefile = ecodefile, 
452             env = env)
453
454         command = "bash ./%s" % shfile
455         # run command in background in remote host
456         (out, err), proc = self.run(command, home, 
457                 pidfile = pidfile,
458                 stdin = stdin, 
459                 stdout = stdout, 
460                 stderr = stderr, 
461                 sudo = sudo,
462                 tty = tty)
463
464         # check no errors occurred
465         if proc.poll():
466             msg = " Failed to run command '%s' " % command
467             self.error(msg, out, err)
468             if raise_on_error:
469                 raise RuntimeError, msg
470
471         # Wait for pid file to be generated
472         pid, ppid = self.wait_pid(
473                 home = home, 
474                 pidfile = pidfile, 
475                 raise_on_error = raise_on_error)
476
477         # wait until command finishes to execute
478         self.wait_run(pid, ppid)
479       
480         (out, err), proc = self.check_errors(home,
481             ecodefile = ecodefile,
482             stdout = stdout,
483             stderr= stderr)
484
485         # Out is what was written in the stderr file
486         if err:
487             msg = " Failed to run command '%s' " % command
488             self.error(msg, out, err)
489
490             if raise_on_error:
491                 raise RuntimeError, msg
492         
493         return (out, err), proc
494
495     def exitcode(self, home, ecodefile = "exitcode"):
496         """
497         Get the exit code of an application.
498         Returns an integer value with the exit code 
499         """
500         (out, err), proc = self.check_output(home, ecodefile)
501
502         # Succeeded to open file, return exit code in the file
503         if proc.wait() == 0:
504             try:
505                 return int(out.strip())
506             except:
507                 # Error in the content of the file!
508                 return ExitCode.CORRUPTFILE
509
510         # No such file or directory
511         if proc.returncode == 1:
512             return ExitCode.FILENOTFOUND
513         
514         # Other error from 'cat'
515         return ExitCode.ERROR
516
517     def upload_command(self, command, home, 
518             shfile = "cmd.sh",
519             ecodefile = "exitcode",
520             env = None):
521         """ Saves the command as a bash script file in the remote host, and
522         forces to save the exit code of the command execution to the ecodefile
523         """
524
525         if not (command.strip().endswith(";") or command.strip().endswith("&")):
526             command += ";"
527       
528         # The exit code of the command will be stored in ecodefile
529         command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
530                 'command': command,
531                 'ecodefile': ecodefile,
532                 } 
533
534         # Export environment
535         environ = self.format_environment(env)
536
537         # Add environ to command
538         command = environ + command
539
540         dst = os.path.join(home, shfile)
541         return self.upload(command, dst, text = True)
542
543     def format_environment(self, env, inline = False):
544         """Format environmental variables for command to be executed either
545         as an inline command
546         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
547         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
548         """
549         if not env: return ""
550
551         # Remove extra white spaces
552         env = re.sub(r'\s+', ' ', env.strip())
553
554         sep = ";" if inline else "\n"
555         return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
556
557     def check_errors(self, home, 
558             ecodefile = "exitcode", 
559             stdout = "stdout",
560             stderr = "stderr"):
561         """
562         Checks whether errors occurred while running a command.
563         It first checks the exit code for the command, and only if the
564         exit code is an error one it returns the error output.
565
566         """
567         proc = None
568         err = ""
569         # retrive standard output from the file
570         (out, oerr), oproc = self.check_output(home, stdout)
571
572         # get exit code saved in the 'exitcode' file
573         ecode = self.exitcode(home, ecodefile)
574
575         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
576             err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
577         elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
578             # The process returned an error code or didn't exist. 
579             # Check standard error.
580             (err, eerr), proc = self.check_output(home, stderr)
581
582             # If the stderr file was not found, assume nothing bad happened,
583             # and just ignore the error.
584             # (cat returns 1 for error "No such file or directory")
585             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
586                 err = "" 
587             
588         return (out, err), proc
589  
590     def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
591         """ Waits until the pid file for the command is generated, 
592             and returns the pid and ppid of the process """
593         pid = ppid = None
594         delay = 1.0
595
596         for i in xrange(4):
597             pidtuple = self.getpid(home = home, pidfile = pidfile)
598             
599             if pidtuple:
600                 pid, ppid = pidtuple
601                 break
602             else:
603                 time.sleep(delay)
604                 delay = delay * 1.5
605         else:
606             msg = " Failed to get pid for pidfile %s/%s " % (
607                     home, pidfile )
608             self.error(msg)
609             
610             if raise_on_error:
611                 raise RuntimeError, msg
612
613         return pid, ppid
614
615     def wait_run(self, pid, ppid, trial = 0):
616         """ wait for a remote process to finish execution """
617         start_delay = 1.0
618
619         while True:
620             status = self.status(pid, ppid)
621             
622             if status is ProcStatus.FINISHED:
623                 break
624             elif status is not ProcStatus.RUNNING:
625                 delay = delay * 1.5
626                 time.sleep(delay)
627                 # If it takes more than 20 seconds to start, then
628                 # asume something went wrong
629                 if delay > 20:
630                     break
631             else:
632                 # The app is running, just wait...
633                 time.sleep(0.5)
634
635     def check_output(self, home, filename):
636         """ Retrives content of file """
637         (out, err), proc = self.execute("cat %s" % 
638             os.path.join(home, filename), retry = 1, with_lock = True)
639         return (out, err), proc
640
641     def is_alive(self):
642         if self.localhost:
643             return True
644
645         out = err = ""
646         try:
647             # TODO: FIX NOT ALIVE!!!!
648             (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
649                     with_lock = True)
650         except:
651             import traceback
652             trace = traceback.format_exc()
653             msg = "Unresponsive host  %s " % err
654             self.error(msg, out, trace)
655             return False
656
657         if out.strip().startswith('ALIVE'):
658             return True
659         else:
660             msg = "Unresponsive host "
661             self.error(msg, out, err)
662             return False
663
664     def copy(self, src, dst):
665         if self.localhost:
666             (out, err), proc = execfuncs.lcopy(source, dest, 
667                     recursive = True,
668                     strict_host_checking = False)
669         else:
670             with self._lock:
671                 (out, err), proc = sshfuncs.rcopy(
672                     src, dst, 
673                     port = self.get("port"),
674                     identity = self.get("identity"),
675                     server_key = self.get("serverKey"),
676                     recursive = True,
677                     strict_host_checking = False)
678
679         return (out, err), proc
680
681     def execute(self, command,
682             sudo = False,
683             stdin = None, 
684             env = None,
685             tty = False,
686             forward_x11 = False,
687             timeout = None,
688             retry = 3,
689             err_on_timeout = True,
690             connect_timeout = 30,
691             strict_host_checking = False,
692             persistent = True,
693             blocking = True,
694             with_lock = False
695             ):
696         """ Notice that this invocation will block until the
697         execution finishes. If this is not the desired behavior,
698         use 'run' instead."""
699
700         if self.localhost:
701             (out, err), proc = execfuncs.lexec(command, 
702                     user = user,
703                     sudo = sudo,
704                     stdin = stdin,
705                     env = env)
706         else:
707             if with_lock:
708                 with self._lock:
709                     (out, err), proc = sshfuncs.rexec(
710                         command, 
711                         host = self.get("hostname"),
712                         user = self.get("username"),
713                         port = self.get("port"),
714                         agent = True,
715                         sudo = sudo,
716                         stdin = stdin,
717                         identity = self.get("identity"),
718                         server_key = self.get("serverKey"),
719                         env = env,
720                         tty = tty,
721                         forward_x11 = forward_x11,
722                         timeout = timeout,
723                         retry = retry,
724                         err_on_timeout = err_on_timeout,
725                         connect_timeout = connect_timeout,
726                         persistent = persistent,
727                         blocking = blocking, 
728                         strict_host_checking = strict_host_checking
729                         )
730             else:
731                 (out, err), proc = sshfuncs.rexec(
732                     command, 
733                     host = self.get("hostname"),
734                     user = self.get("username"),
735                     port = self.get("port"),
736                     agent = True,
737                     sudo = sudo,
738                     stdin = stdin,
739                     identity = self.get("identity"),
740                     server_key = self.get("serverKey"),
741                     env = env,
742                     tty = tty,
743                     forward_x11 = forward_x11,
744                     timeout = timeout,
745                     retry = retry,
746                     err_on_timeout = err_on_timeout,
747                     connect_timeout = connect_timeout,
748                     persistent = persistent,
749                     blocking = blocking, 
750                     strict_host_checking = strict_host_checking
751                     )
752
753         return (out, err), proc
754
755     def run(self, command, home,
756             create_home = False,
757             pidfile = 'pidfile',
758             stdin = None, 
759             stdout = 'stdout', 
760             stderr = 'stderr', 
761             sudo = False,
762             tty = False):
763         
764         self.debug("Running command '%s'" % command)
765         
766         if self.localhost:
767             (out, err), proc = execfuncs.lspawn(command, pidfile, 
768                     stdout = stdout, 
769                     stderr = stderr, 
770                     stdin = stdin, 
771                     home = home, 
772                     create_home = create_home, 
773                     sudo = sudo,
774                     user = user) 
775         else:
776             with self._lock:
777                 (out, err), proc = sshfuncs.rspawn(
778                     command,
779                     pidfile = pidfile,
780                     home = home,
781                     create_home = create_home,
782                     stdin = stdin if stdin is not None else '/dev/null',
783                     stdout = stdout if stdout else '/dev/null',
784                     stderr = stderr if stderr else '/dev/null',
785                     sudo = sudo,
786                     host = self.get("hostname"),
787                     user = self.get("username"),
788                     port = self.get("port"),
789                     agent = True,
790                     identity = self.get("identity"),
791                     server_key = self.get("serverKey"),
792                     tty = tty
793                     )
794
795         return (out, err), proc
796
797     def getpid(self, home, pidfile = "pidfile"):
798         if self.localhost:
799             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
800         else:
801             with self._lock:
802                 pidtuple = sshfuncs.rgetpid(
803                     os.path.join(home, pidfile),
804                     host = self.get("hostname"),
805                     user = self.get("username"),
806                     port = self.get("port"),
807                     agent = True,
808                     identity = self.get("identity"),
809                     server_key = self.get("serverKey")
810                     )
811         
812         return pidtuple
813
814     def status(self, pid, ppid):
815         if self.localhost:
816             status = execfuncs.lstatus(pid, ppid)
817         else:
818             with self._lock:
819                 status = sshfuncs.rstatus(
820                         pid, ppid,
821                         host = self.get("hostname"),
822                         user = self.get("username"),
823                         port = self.get("port"),
824                         agent = True,
825                         identity = self.get("identity"),
826                         server_key = self.get("serverKey")
827                         )
828            
829         return status
830     
831     def kill(self, pid, ppid, sudo = False):
832         out = err = ""
833         proc = None
834         status = self.status(pid, ppid)
835
836         if status == sshfuncs.ProcStatus.RUNNING:
837             if self.localhost:
838                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
839             else:
840                 with self._lock:
841                     (out, err), proc = sshfuncs.rkill(
842                         pid, ppid,
843                         host = self.get("hostname"),
844                         user = self.get("username"),
845                         port = self.get("port"),
846                         agent = True,
847                         sudo = sudo,
848                         identity = self.get("identity"),
849                         server_key = self.get("serverKey")
850                         )
851
852         return (out, err), proc
853