Merge from multihop_ssh to nepi-3-dev
authorLucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Thu, 30 Jan 2014 15:17:47 +0000 (16:17 +0100)
committerLucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Thu, 30 Jan 2014 15:17:47 +0000 (16:17 +0100)
examples/linux/transfer/file_transfer.py
src/nepi/execution/resource.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/node.py
src/nepi/resources/planetlab/node.py
src/nepi/util/execfuncs.py
src/nepi/util/sshfuncs.py
test/resources/planetlab/node.py

index 3296b07..d5d6aaf 100755 (executable)
@@ -70,7 +70,7 @@ server = add_node(ec, server_name, slicename)
 client = add_node(ec, client_name, slicename)
 
 # Add resource managers for the linux applications
-app_server =  add_app(ec, "cat ${SOURCES}/big_buck_bunny_240p_mpeg4_lq.ts | pv -fbt 2> \
+app_server =  add_app(ec, "cat ${SRC}/big_buck_bunny_240p_mpeg4_lq.ts | pv -fbt 2> \
      bw.txt | nc %s 1234" % client_name, server, video=video, depends=depends_server)
 
 # Note: is important to add the -d option in nc command to not attempt to read from the 
index d0d9079..49b842b 100644 (file)
@@ -531,9 +531,8 @@ class ResourceManager(Logger):
                 import traceback
                 err = traceback.format_exc()
                 self.error(err)
-
-            self.set_released()
-            self.debug("----- RELEASED ---- ")
+                self.set_released()
+                self.debug("----- RELEASED ---- ")
 
     def fail(self):
         """ Sets the RM to state FAILED.
@@ -946,7 +945,8 @@ class ResourceManager(Logger):
         self.set_ready()
 
     def do_release(self):
-        pass
+        self.set_released()
+        self.debug("----- RELEASED ---- ")
 
     def do_fail(self):
         self.set_failed()
index 5ae8f0b..db92fec 100644 (file)
@@ -187,7 +187,7 @@ class LinuxApplication(ResourceManager):
 
         # timestamp of last state check of the application
         self._last_state_check = tnow()
-
+        
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
                 self.node.get("hostname"), msg)
@@ -271,6 +271,13 @@ class LinuxApplication(ResourceManager):
         return out
 
     def do_provision(self):
+        # take a snapshot of the system if user is root
+        # to assure cleanProcess kill every nepi process
+        if self.node.get("username") == 'root':
+            ps_aux = "ps aux |awk '{print $2}' |sort -u"
+            (out, err), proc = self.node.execute(ps_aux)
+            self.node._pids = out.split()
+        
         # create run dir for application
         self.node.mkdir(self.run_home)
    
index 710561b..60fad69 100644 (file)
@@ -188,6 +188,12 @@ class LinuxNode(ResourceManager):
                 "releasing the resource",
                 flags = Flags.ExecReadOnly)
 
+        gateway_user = Attribute("gatewayUser", "Gateway account username",
+                flags = Flags.ExecReadOnly)
+
+        gateway = Attribute("gateway", "Hostname of the gateway machine",
+                flags = Flags.ExecReadOnly)
+
         cls._register_attribute(hostname)
         cls._register_attribute(username)
         cls._register_attribute(port)
@@ -198,12 +204,17 @@ class LinuxNode(ResourceManager):
         cls._register_attribute(clean_experiment)
         cls._register_attribute(clean_processes)
         cls._register_attribute(tear_down)
+        cls._register_attribute(gateway_user)
+        cls._register_attribute(gateway)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
         # home directory at Linux host
         self._home_dir = ""
+
+        # list of pids before running the app if the user is root
+        self._pids = []
         
         # lock to prevent concurrent applications on the same node,
         # to execute commands at the same time. There are potential
@@ -391,10 +402,26 @@ class LinuxNode(ResourceManager):
 
     def clean_processes(self):
         self.info("Cleaning up processes")
-        
-        cmd = ("sudo -S killall tcpdump || /bin/true ; " +
-            "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
-            "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
+        if self.get("username") != 'root':
+            cmd = ("sudo -S killall tcpdump || /bin/true ; " +
+                "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
+                "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
+        else:
+            pids_temp = []
+            if self.state >= ResourceState.READY:
+                ps_aux = "ps aux |awk '{print $2}' |sort -u"
+                (out, err), proc = self.execute(ps_aux)
+                pids_temp = out.split()
+                kill_pids = list(set(pids_temp) - set(self._pids))
+                kill_pids = ' '.join(kill_pids)
+
+                cmd = ("killall tcpdump || /bin/true ; " +
+                    "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
+                    "kill %s || /bin/true ; " % kill_pids)
+            else:
+                cmd = ("killall tcpdump || /bin/true ; " +
+                    "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
 
         out = err = ""
         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
@@ -455,6 +482,8 @@ class LinuxNode(ResourceManager):
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         sudo = sudo,
                         stdin = stdin,
@@ -477,6 +506,8 @@ class LinuxNode(ResourceManager):
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     sudo = sudo,
                     stdin = stdin,
@@ -530,6 +561,8 @@ class LinuxNode(ResourceManager):
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
@@ -548,6 +581,8 @@ class LinuxNode(ResourceManager):
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey")
@@ -565,6 +600,8 @@ class LinuxNode(ResourceManager):
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
@@ -587,6 +624,8 @@ class LinuxNode(ResourceManager):
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         sudo = sudo,
                         identity = self.get("identity"),
@@ -605,6 +644,8 @@ class LinuxNode(ResourceManager):
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     recursive = True,
index 00f9ae9..c24cb12 100644 (file)
@@ -198,6 +198,10 @@ class PlanetlabNode(LinuxNode):
         self._slicenode = False
         self._hostname = False
 
+        if self.get("gateway") or self.get("gatewayUser"):
+            self.set("gateway", None)
+            self.set("gatewayUser", None)
+
     def _skip_provision(self):
         pl_user = self.get("pluser")
         pl_pass = self.get("plpassword")
@@ -339,7 +343,6 @@ class PlanetlabNode(LinuxNode):
                     self.warn(" Could not SSH login ")
                     self._blacklist_node(node)
                     #self._delete_node_from_slice(node)
-                #self.set('hostname', None)
                 self.do_discover()
                 continue
             
@@ -352,7 +355,6 @@ class PlanetlabNode(LinuxNode):
                         self.warn(" Could not find directory /proc ")
                         self._blacklist_node(node)
                         #self._delete_node_from_slice(node)
-                    #self.set('hostname', None)
                     self.do_discover()
                     continue
             
@@ -523,7 +525,6 @@ class PlanetlabNode(LinuxNode):
                         self._set_hostname_attr(node_id)
                         self.warn(" Node not responding PING ")
                         self._blacklist_node(node_id)
-                        #self.set('hostname', None)
                     else:
                         # discovered node for provision, added to provision list
                         self._put_node_in_provision(node_id)
@@ -583,7 +584,8 @@ class PlanetlabNode(LinuxNode):
         ip = self._get_ip(node_id)
         if not ip: return ping_ok
 
-        command = "ping -c4 %s" % ip
+        command = ['ping', '-c4']
+        command.append(ip)
 
         (out, err) = lexec(command)
         if not out.find("2 received") or not out.find("3 received") or not \
index e09d490..eaadf50 100644 (file)
@@ -41,10 +41,10 @@ def lexec(command,
         command = "su %s ; %s " % (user, command)
 
 
-    p = subprocess.Popen(command, shell=True, 
+    p = subprocess.Popen(command, 
             stdout = subprocess.PIPE, 
-            stderr = subprocess.PIPE)
-            #stdin  = stdin)
+            stderr = subprocess.PIPE,
+            stdin  = stdin)
 
     out, err = p.communicate()
     return (out, err)
index 43ca61c..d4d02ff 100644 (file)
@@ -205,7 +205,9 @@ def eintr_retry(func):
     return rv
 
 def rexec(command, host, user, 
-        port = None, 
+        port = None,
+        gwuser = None,
+        gw = None, 
         agent = True,
         sudo = False,
         stdin = None,
@@ -226,7 +228,9 @@ def rexec(command, host, user,
     """
     
     tmp_known_hosts = None
-    hostip = gethostbyname(host)
+    if not gw:
+        hostip = gethostbyname(host)
+    else: hostip = None
 
     args = ['ssh', '-C',
             # Don't bother with localhost. Makes test easier
@@ -235,6 +239,7 @@ def rexec(command, host, user,
             '-o', 'ConnectionAttempts=3',
             '-o', 'ServerAliveInterval=30',
             '-o', 'TCPKeepAlive=yes',
+            '-o', 'Batchmode=yes',
             '-l', user, hostip or host]
 
     if persistent and openssh_has_persist():
@@ -247,6 +252,13 @@ def rexec(command, host, user,
         # Do not check for Host key. Unsafe.
         args.extend(['-o', 'StrictHostKeyChecking=no'])
 
+    if gw:
+        if gwuser:
+            proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
+        else:
+            proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
+        args.extend(['-o', proxycommand])
+
     if agent:
         args.append('-A')
 
@@ -280,7 +292,7 @@ def rexec(command, host, user,
                 stdout = subprocess.PIPE,
                 stdin = subprocess.PIPE, 
                 stderr = subprocess.PIPE)
-        
+       
         # attach tempfile object to the process, to make sure the file stays
         # alive until the process is finished with it
         proc._known_hosts = tmp_known_hosts
@@ -328,7 +340,9 @@ def rexec(command, host, user,
     return ((out, err), proc)
 
 def rcopy(source, dest,
-        port = None, 
+        port = None,
+        gwuser = None,
+        gw = None,
         agent = True, 
         recursive = False,
         identity = None,
@@ -380,7 +394,9 @@ def rcopy(source, dest,
         user,host = remspec.rsplit('@',1)
         
         tmp_known_hosts = None
-        hostip = gethostbyname(host)
+        if not gw:
+            hostip = gethostbyname(host)
+        else: hostip = None
         
         args = ['ssh', '-l', user, '-C',
                 # Don't bother with localhost. Makes test easier
@@ -397,6 +413,13 @@ def rcopy(source, dest,
                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
                 '-o', 'ControlPersist=60' ])
 
+        if gw:
+            if gwuser:
+                proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
+            else:
+                proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
+            args.extend(['-o', proxycommand])
+
         if port:
             args.append('-P%d' % port)
 
@@ -535,6 +558,13 @@ def rcopy(source, dest,
         if port:
             args.append('-P%d' % port)
 
+        if gw:
+            if gwuser:
+                proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
+            else:
+                proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
+            args.extend(['-o', proxycommand])
+
         if recursive:
             args.append('-r')
 
@@ -561,7 +591,7 @@ def rcopy(source, dest,
             args.append(source)
 
         args.append(dest)
-
+        
         for x in xrange(retry):
             # connects to the remote host and starts a remote connection
             proc = subprocess.Popen(args,
@@ -609,6 +639,8 @@ def rspawn(command, pidfile,
         host = None, 
         port = None, 
         user = None, 
+        gwuser = None,
+        gw = None,
         agent = None, 
         identity = None, 
         server_key = None,
@@ -682,6 +714,8 @@ def rspawn(command, pidfile,
         host = host,
         port = port,
         user = user,
+        gwuser = gwuser,
+        gw = gw,
         agent = agent,
         identity = identity,
         server_key = server_key,
@@ -698,6 +732,8 @@ def rgetpid(pidfile,
         host = None, 
         port = None, 
         user = None, 
+        gwuser = None,
+        gw = None,
         agent = None, 
         identity = None,
         server_key = None):
@@ -724,6 +760,8 @@ def rgetpid(pidfile,
         host = host,
         port = port,
         user = user,
+        gwuser = gwuser,
+        gw = gw,
         agent = agent,
         identity = identity,
         server_key = server_key
@@ -744,6 +782,8 @@ def rstatus(pid, ppid,
         host = None, 
         port = None, 
         user = None, 
+        gwuser = None,
+        gw = None,
         agent = None, 
         identity = None,
         server_key = None):
@@ -768,6 +808,8 @@ def rstatus(pid, ppid,
         host = host,
         port = port,
         user = user,
+        gwuser = gwuser,
+        gw = gw,
         agent = agent,
         identity = identity,
         server_key = server_key
@@ -791,6 +833,8 @@ def rkill(pid, ppid,
         host = None, 
         port = None, 
         user = None, 
+        gwuser = None,
+        gw = None,
         agent = None, 
         sudo = False,
         identity = None, 
@@ -845,6 +889,8 @@ fi
         host = host,
         port = port,
         user = user,
+        gwuser = gwuser,
+        gw = gw,
         agent = agent,
         identity = identity,
         server_key = server_key
index 933fe17..6599a2b 100755 (executable)
@@ -75,7 +75,7 @@ class PLNodeFactoryTestCase(unittest.TestCase):
 
     def test_creation_phase(self):
         self.assertEquals(PlanetlabNode._rtype, "PlanetlabNode")
-        self.assertEquals(len(PlanetlabNode._attributes), 30)
+        self.assertEquals(len(PlanetlabNode._attributes), 32)
 
 class PLNodeTestCase(unittest.TestCase):
     """