Adding gateway attr and modify CleanProcess
[nepi.git] / src / nepi / resources / linux / node.py
index 953793a..60fad69 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
-        ResourceState, reschedule_delay, failtrap
+        ResourceState, reschedule_delay
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
@@ -188,6 +188,12 @@ class LinuxNode(ResourceManager):
                 "releasing the resource",
                 flags = Flags.ExecReadOnly)
 
+        gateway_user = Attribute("gatewayUser", "Gateway account username",
+                flags = Flags.ExecReadOnly)
+
+        gateway = Attribute("gateway", "Hostname of the gateway machine",
+                flags = Flags.ExecReadOnly)
+
         cls._register_attribute(hostname)
         cls._register_attribute(username)
         cls._register_attribute(port)
@@ -198,12 +204,17 @@ class LinuxNode(ResourceManager):
         cls._register_attribute(clean_experiment)
         cls._register_attribute(clean_processes)
         cls._register_attribute(tear_down)
+        cls._register_attribute(gateway_user)
+        cls._register_attribute(gateway)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
         # home directory at Linux host
         self._home_dir = ""
+
+        # list of pids before running the app if the user is root
+        self._pids = []
         
         # lock to prevent concurrent applications on the same node,
         # to execute commands at the same time. There are potential
@@ -279,6 +290,8 @@ class LinuxNode(ResourceManager):
             self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
             self._os = OSType.FEDORA_14
+        elif out.find("Fedora release") == 0:
+            self._os = OSType.FEDORA
         elif out.find("Debian") == 0: 
             self._os = OSType.DEBIAN
         elif out.find("Ubuntu") ==0:
@@ -296,24 +309,16 @@ class LinuxNode(ResourceManager):
         # To work arround this, repeat the operation N times or
         # until the result is not empty string
         out = ""
-        retrydelay = 1.0
-        for i in xrange(10):
-            try:
-                (out, err), proc = self.execute("cat /etc/issue", 
-                        retry = 5,
-                        with_lock = True,
-                        blocking = True)
-
-                if out.strip() != "":
-                    return out
-            except:
-                trace = traceback.format_exc()
-                msg = "Error detecting OS: %s " % trace
-                self.error(msg, out, err)
-                return False
-
-            time.sleep(min(30.0, retrydelay))
-            retrydelay *= 1.5
+        try:
+            (out, err), proc = self.execute("cat /etc/issue", 
+                    with_lock = True,
+                    blocking = True)
+        except:
+            trace = traceback.format_exc()
+            msg = "Error detecting OS: %s " % trace
+            self.error(msg, out, err)
+        
+        return out
 
     @property
     def use_deb(self):
@@ -328,8 +333,7 @@ class LinuxNode(ResourceManager):
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
-    @failtrap
-    def provision(self):
+    def do_provision(self):
         # check if host is alive
         if not self.is_alive():
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
@@ -356,72 +360,72 @@ class LinuxNode(ResourceManager):
         # Create experiment node home directory
         self.mkdir(self.node_home)
 
-        super(LinuxNode, self).provision()
+        super(LinuxNode, self).do_provision()
 
-    @failtrap
-    def deploy(self):
+    def do_deploy(self):
         if self.state == ResourceState.NEW:
             self.info("Deploying node")
-            self.discover()
-            self.provision()
+            self.do_discover()
+            self.do_provision()
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
         from nepi.resources.linux.interface import LinuxInterface
-        ifaces = self.get_connected(LinuxInterface.rtype())
+        ifaces = self.get_connected(LinuxInterface.get_rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
                 self.ec.schedule(reschedule_delay, self.deploy)
                 return 
 
-        super(LinuxNode, self).deploy()
+        super(LinuxNode, self).do_deploy()
 
-    def release(self):
-        try:
-            rms = self.get_connected()
-            for rm in rms:
-                # Node needs to wait until all associated RMs are released
-                # before it can be released
-                if rm.state < ResourceState.STOPPED:
-                    self.ec.schedule(reschedule_delay, self.release)
-                    return 
-
-            tear_down = self.get("tearDown")
-            if tear_down:
-                self.execute(tear_down)
+    def do_release(self):
+        rms = self.get_connected()
+        for rm in rms:
+            # Node needs to wait until all associated RMs are released
+            # before it can be released
+            if rm.state != ResourceState.RELEASED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
 
-            self.clean_processes()
-        except:
-            import traceback
-            err = traceback.format_exc()
-            self.error(err)
+        tear_down = self.get("tearDown")
+        if tear_down:
+            self.execute(tear_down)
+
+        self.clean_processes()
 
-        super(LinuxNode, self).release()
+        super(LinuxNode, self).do_release()
 
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
 
-    def clean_processes(self, killer = False):
+    def clean_processes(self):
         self.info("Cleaning up processes")
-        
-        if killer:
-            # Hardcore kill
-            cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
-                "sudo -S killall python tcpdump || /bin/true ; " +
-                "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
-                "sudo -S killall -u root || /bin/true ; " +
-                "sudo -S killall -u root || /bin/true ; ")
-        else:
-            # Be gentler...
+        if self.get("username") != 'root':
             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
-                "sudo -S killall tcpdump || /bin/true ; " +
-                "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
+                "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
+        else:
+            pids_temp = []
+            if self.state >= ResourceState.READY:
+                ps_aux = "ps aux |awk '{print $2}' |sort -u"
+                (out, err), proc = self.execute(ps_aux)
+                pids_temp = out.split()
+                kill_pids = list(set(pids_temp) - set(self._pids))
+                kill_pids = ' '.join(kill_pids)
+
+                cmd = ("killall tcpdump || /bin/true ; " +
+                    "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
+                    "kill %s || /bin/true ; " % kill_pids)
+            else:
+                cmd = ("killall tcpdump || /bin/true ; " +
+                    "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
 
         out = err = ""
-        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
-            
+        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
+
     def clean_home(self):
         """ Cleans all NEPI related folders in the Linux host
         """
@@ -466,7 +470,7 @@ class LinuxNode(ResourceManager):
 
         if self.localhost:
             (out, err), proc = execfuncs.lexec(command, 
-                    user = user,
+                    user = self.get("username"), # still problem with localhost
                     sudo = sudo,
                     stdin = stdin,
                     env = env)
@@ -478,6 +482,8 @@ class LinuxNode(ResourceManager):
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         sudo = sudo,
                         stdin = stdin,
@@ -500,6 +506,8 @@ class LinuxNode(ResourceManager):
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     sudo = sudo,
                     stdin = stdin,
@@ -553,6 +561,8 @@ class LinuxNode(ResourceManager):
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
@@ -571,6 +581,8 @@ class LinuxNode(ResourceManager):
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey")
@@ -588,6 +600,8 @@ class LinuxNode(ResourceManager):
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
@@ -610,6 +624,8 @@ class LinuxNode(ResourceManager):
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         sudo = sudo,
                         identity = self.get("identity"),
@@ -628,6 +644,8 @@ class LinuxNode(ResourceManager):
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     recursive = True,
@@ -916,7 +934,7 @@ class LinuxNode(ResourceManager):
         pid = ppid = None
         delay = 1.0
 
-        for i in xrange(4):
+        for i in xrange(2):
             pidtuple = self.getpid(home = home, pidfile = pidfile)
             
             if pidtuple:
@@ -968,35 +986,25 @@ class LinuxNode(ResourceManager):
             return True
 
         out = err = ""
+        msg = "Unresponsive host. Wrong answer. "
+
         # The underlying SSH layer will sometimes return an empty
         # output (even if the command was executed without errors).
         # To work arround this, repeat the operation N times or
         # until the result is not empty string
-        retrydelay = 1.0
-        for i in xrange(10):
-            try:
-                (out, err), proc = self.execute("echo 'ALIVE'",
-                        retry = 5,
-                        blocking = True,
-                        with_lock = True)
-        
-                if out.find("ALIVE") > -1:
-                    return True
-            except:
-                trace = traceback.format_exc()
-                msg = "Unresponsive host. Error reaching host: %s " % trace
-                self.error(msg, out, err)
-                return False
+        try:
+            (out, err), proc = self.execute("echo 'ALIVE'",
+                    blocking = True,
+                    with_lock = True)
+    
+            if out.find("ALIVE") > -1:
+                return True
+        except:
+            trace = traceback.format_exc()
+            msg = "Unresponsive host. Error reaching host: %s " % trace
 
-            time.sleep(min(30.0, retrydelay))
-            retrydelay *= 1.5
-
-        if out.find("ALIVE") > -1:
-            return True
-        else:
-            msg = "Unresponsive host. Wrong answer. "
-            self.error(msg, out, err)
-            return False
+        self.error(msg, out, err)
+        return False
 
     def find_home(self):
         """ Retrieves host home directory
@@ -1005,29 +1013,20 @@ class LinuxNode(ResourceManager):
         # output (even if the command was executed without errors).
         # To work arround this, repeat the operation N times or
         # until the result is not empty string
-        retrydelay = 1.0
-        for i in xrange(10):
-            try:
-                (out, err), proc = self.execute("echo ${HOME}",
-                        retry = 5,
-                        blocking = True,
-                        with_lock = True)
-        
-                if out.strip() != "":
-                    self._home_dir =  out.strip()
-                    break
-            except:
-                trace = traceback.format_exc()
-                msg = "Impossible to retrieve HOME directory" % trace
-                self.error(msg, out, err)
-                return False
-
-            time.sleep(min(30.0, retrydelay))
-            retrydelay *= 1.5
+        msg = "Impossible to retrieve HOME directory"
+        try:
+            (out, err), proc = self.execute("echo ${HOME}",
+                    blocking = True,
+                    with_lock = True)
+    
+            if out.strip() != "":
+                self._home_dir =  out.strip()
+        except:
+            trace = traceback.format_exc()
+            msg = "Impossible to retrieve HOME directory %s" % trace
 
         if not self._home_dir:
-            msg = "Impossible to retrieve HOME directory"
-            self.error(msg, out, err)
+            self.error(msg)
             raise RuntimeError, msg
 
     def filter_existing_files(self, src, dst):