* Automatic provisioning
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 26 Apr 2011 11:19:06 +0000 (13:19 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 26 Apr 2011 11:19:06 +0000 (13:19 +0200)
* Server key validation
* Trivial resource allocation (only perfectly-defined hosts)

src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/plcapi.py
src/nepi/testbeds/planetlab/rspawn.py
src/nepi/util/server.py

index c71e852..e4ff075 100644 (file)
@@ -85,7 +85,8 @@ class Application(object):
             port = None,
             user = self.slicename,
             agent = None,
-            ident_key = self.ident_path
+            ident_key = self.ident_path,
+            server_key = self.node.server_key
             )
         
         if proc.wait():
@@ -103,7 +104,8 @@ class Application(object):
                 port = None,
                 user = self.slicename,
                 agent = None,
-                ident_key = self.ident_path
+                ident_key = self.ident_path,
+                server_key = self.node.server_key
                 )
             
             if pidtuple:
@@ -145,7 +147,8 @@ class Application(object):
                 port = None,
                 user = self.slicename,
                 agent = None,
-                ident_key = self.ident_path
+                ident_key = self.ident_path,
+                server_key = self.node.server_key
                 )
     
     def remote_trace_path(self, whichtrace):
@@ -179,7 +182,8 @@ class Application(object):
             local_path,
             port = None,
             agent = None,
-            ident_key = self.ident_path
+            ident_key = self.ident_path,
+            server_key = self.node.server_key
             )
         
         if proc.wait():
@@ -201,7 +205,8 @@ class Application(object):
             port = None,
             user = self.slicename,
             agent = None,
-            ident_key = self.ident_path
+            ident_key = self.ident_path,
+            server_key = self.node.server_key
             )
         
         if proc.wait():
@@ -216,7 +221,8 @@ class Application(object):
                     os.path.join(self.home_path, 'stdin') ),
                 port = None,
                 agent = None,
-                ident_key = self.ident_path
+                ident_key = self.ident_path,
+                server_key = self.node.server_key
                 )
             
             if proc.wait():
@@ -232,7 +238,8 @@ class Application(object):
                     source,
                     "%s@%s:%s" % (self.slicename, self.node.hostname, 
                         os.path.join(self.home_path,'.'),),
-                    ident_key = self.ident_path
+                    ident_key = self.ident_path,
+                    server_key = self.node.server_key
                     )
             
                 if proc.wait():
@@ -248,7 +255,8 @@ class Application(object):
                 port = None,
                 user = self.slicename,
                 agent = None,
-                ident_key = self.ident_path
+                ident_key = self.ident_path,
+                server_key = self.node.server_key
                 )
         
             if proc.wait():
@@ -266,7 +274,8 @@ class Application(object):
                 port = None,
                 user = self.slicename,
                 agent = None,
-                ident_key = self.ident_path
+                ident_key = self.ident_path,
+                server_key = self.node.server_key
                 )
         
             if proc.wait():
index b0918c5..860f6c3 100644 (file)
@@ -4,6 +4,7 @@
 from constants import TESTBED_ID
 from nepi.core import testbed_impl
 import os
+import time
 
 class TestbedController(testbed_impl.TestbedController):
     def __init__(self, testbed_version):
@@ -58,10 +59,7 @@ class TestbedController(testbed_impl.TestbedController):
         self.sliceSSHKey = self._attributes.\
             get_attribute_value("sliceSSHKey")
 
-    def do_create(self):
-        # Create node elements per XML data
-        super(TestbedController, self).do_create()
-        
+    def do_preconfigure(self):
         # Perform resource discovery if we don't have
         # specific resources assigned yet
         self.do_resource_discovery()
@@ -69,20 +67,46 @@ class TestbedController(testbed_impl.TestbedController):
         # Create PlanetLab slivers
         self.do_provisioning()
         
-        # Wait for all nodes to be ready
-        self.wait_nodes()
+        # Configure elements per XML data
+        super(TestbedController, self).do_preconfigure()
     
     def do_resource_discovery(self):
         # Do what?
-        pass
+        
+        # Provisional algo:
+        #   look for perfectly defined nodes
+        #   (ie: those with only one candidate)
+        to_provision = self._to_provision = set()
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is None:
+                # Try existing nodes first
+                # If we have only one candidate, simply use it
+                candidates = node.find_candidates(
+                    filter_slice_id = self.slice_id)
+                if len(candidates) == 1:
+                    node.assign_node_id(iter(candidates).next())
+                else:
+                    # Try again including unassigned nodes
+                    candidates = node.find_candidates()
+                    if len(candidates) > 1:
+                        raise RuntimeError, "Cannot assign resources for node %s, too many candidates" % (guid,)
+                    if len(candidates) == 1:
+                        node_id = iter(candidates).next()
+                        node.assign_node_id(node_id)
+                        to_provision.add(node_id)
+                    elif not candidates:
+                        raise RuntimeError, "Cannot assign resources for node %s, no candidates" % (guid,)
 
     def do_provisioning(self):
-        # Que te recontra?
-        pass
+        if self._to_provision:
+            # Add new nodes to the slice
+            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
+            new_nodes = list(set(cur_nodes) | self._to_provision)
+            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
+    
+        # cleanup
+        del self._to_provision
     
-    def wait_nodes(self):
-        # Suuure...
-        pass
 
     def set(self, time, guid, name, value):
         super(TestbedController, self).set(time, guid, name, value)
index 8289655..2d6b7f0 100644 (file)
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+import time
+
 from constants import TESTBED_ID
 from nepi.core import metadata
 from nepi.core.attributes import Attribute
@@ -142,16 +144,17 @@ def configure_node(testbed_instance, guid):
     node.ident_path = testbed_instance.sliceSSHKey
     node.slicename = testbed_instance.slicename
     
-    # If we have only one candidate, simply use it
-    candidates = node.find_candidates(
-        filter_slice_id = testbed_instance.slice_id)
-    if len(candidates) == 1:
-        node.assign_node_id(iter(candidates).next())
-    
     # Do some validations
     node.validate()
     
-    # TODO: this should be done in parallel in all nodes
+    # recently provisioned nodes may not be up yet
+    sleeptime = 1.0
+    while not node.is_alive():
+        time.sleep(sleeptime)
+        sleeptime = min(30.0, sleeptime*1.5)
+    
+    # this will be done in parallel in all nodes
+    # this call only spawns the process
     node.install_dependencies()
 
 def configure_application(testbed_instance, guid):
index 0e31046..10ff593 100644 (file)
@@ -8,6 +8,8 @@ import rspawn
 import time
 import os
 
+from nepi.util import server
+
 class Node(object):
     BASEFILTERS = {
         # Map Node attribute to plcapi filter name
@@ -56,6 +58,7 @@ class Node(object):
         # Testbed-derived attributes
         self.slicename = None
         self.ident_path = None
+        self.server_key = None
         self.home_path = None
         
         # Those are filled when an actual node is allocated
@@ -136,7 +139,7 @@ class Node(object):
         self.fetch_node_info()
     
     def fetch_node_info(self):
-        info = self._api.GetNodes(self._node_id)
+        info = self._api.GetNodes(self._node_id)[0]
         tags = dict( (t['tagname'],t['value'])
                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
 
@@ -161,6 +164,9 @@ class Node(object):
         if 'interface_ids' in info:
             self.min_num_external_ifaces = \
             self.max_num_external_ifaces = len(info['interface_ids'])
+        
+        if 'ssh_rsa_key' in info:
+            self.server_key = info['ssh_rsa_key']
 
     def validate(self):
         if self.home_path is None:
@@ -191,6 +197,7 @@ class Node(object):
                 user = self.slicename,
                 agent = None,
                 ident_key = self.ident_path,
+                server_key = self.server_key,
                 sudo = True
                 )
             
@@ -210,7 +217,8 @@ class Node(object):
                     port = None,
                     user = self.slicename,
                     agent = None,
-                    ident_key = self.ident_path
+                    ident_key = self.ident_path,
+                    server_key = self.server_key
                     )
                 if pidtuple:
                     pid, ppid = pidtuple
@@ -227,9 +235,29 @@ class Node(object):
                     port = None,
                     user = self.slicename,
                     agent = None,
-                    ident_key = self.ident_path
+                    ident_key = self.ident_path,
+                    server_key = self.server_key
                     ):
                 time.sleep(probe)
         
-
+    def is_alive(self):
+        # Make sure all the paths are created where 
+        # they have to be created for deployment
+        (out,err),proc = server.popen_ssh_command(
+            "echo 'ALIVE'",
+            host = self.hostname,
+            port = None,
+            user = self.slicename,
+            agent = None,
+            ident_key = self.ident_path,
+            server_key = self.server_key
+            )
+        
+        if proc.wait():
+            return False
+        elif not err and out.strip() == 'ALIVE':
+            return True
+        else:
+            return False
+    
 
index 2c1752e..825713c 100644 (file)
@@ -115,10 +115,11 @@ class PLCAPI(object):
                 (peer['peername'], peer['peer_id'])
                 for peer in peers
             )
-            self._peer_map = dict(
+            self._peer_map.update(
                 (peer['peer_id'], peer['shortname'])
                 for peer in peers
             )
+            self._peer_map[None] = self._localPeerName
             return self._peer_map
     
 
@@ -246,4 +247,7 @@ class PLCAPI(object):
             filters.update(kw)
             return self.api.GetSlices(self.auth, filters, *fieldstuple)
         
+    def UpdateSlice(self, sliceIdOrName, **kw):
+        return self.api.UpdateSlice(self.auth, sliceIdOrName, kw)
+        
 
index b656773..175540b 100644 (file)
@@ -24,7 +24,9 @@ class NOT_STARTED:
     """
 
 def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/dev/null', home=None, create_home=False, sudo=False,
-        host = None, port = None, user = None, agent = None, ident_key = None, tty = False):
+        host = None, port = None, user = None, agent = None, 
+        ident_key = None, server_key = None,
+        tty = False):
     """
     Spawn a remote command such that it will continue working asynchronously.
     
@@ -78,6 +80,7 @@ def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/de
         user = user,
         agent = agent,
         ident_key = ident_key,
+        server_key = server_key,
         tty = tty 
         )
     
@@ -87,7 +90,8 @@ def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/de
     return (out,err),proc
 
 def remote_check_pid(pidfile,
-        host = None, port = None, user = None, agent = None, ident_key = None):
+        host = None, port = None, user = None, agent = None, 
+        ident_key = None, server_key = None):
     """
     Check the pidfile of a process spawned with remote_spawn.
     
@@ -110,7 +114,8 @@ def remote_check_pid(pidfile,
         port = port,
         user = user,
         agent = agent,
-        ident_key = ident_key
+        ident_key = ident_key,
+        server_key = server_key
         )
         
     if proc.wait():
@@ -125,7 +130,8 @@ def remote_check_pid(pidfile,
 
 
 def remote_status(pid, ppid, 
-        host = None, port = None, user = None, agent = None, ident_key = None):
+        host = None, port = None, user = None, agent = None, 
+        ident_key = None, server_key = None):
     """
     Check the status of a process spawned with remote_spawn.
     
@@ -148,7 +154,8 @@ def remote_status(pid, ppid,
         port = port,
         user = user,
         agent = agent,
-        ident_key = ident_key
+        ident_key = ident_key,
+        server_key = server_key
         )
     
     if proc.wait():
@@ -165,7 +172,8 @@ def remote_status(pid, ppid,
     
 
 def remote_kill(pid, ppid, sudo = False,
-        host = None, port = None, user = None, agent = None, ident_key = None):
+        host = None, port = None, user = None, agent = None, 
+        ident_key = None, server_key = None):
     """
     Kill a process spawned with remote_spawn.
     
@@ -206,7 +214,8 @@ fi
         port = port,
         user = user,
         agent = agent,
-        ident_key = ident_key
+        ident_key = ident_key,
+        server_key = server_key
         )
     
     # wait, don't leave zombies around
index 68c10fe..f52acf5 100644 (file)
@@ -14,6 +14,7 @@ import time
 import traceback
 import signal
 import re
+import tempfile
 
 CTRL_SOCK = "ctrl.sock"
 STD_ERR = "stderr.log"
@@ -340,13 +341,31 @@ class Client(object):
         encoded = data.rstrip() 
         return base64.b64decode(encoded)
 
+def _make_server_key_args(server_key, host, port, args):
+    """ 
+    Returns a reference to the created temporary file, and adds the
+    corresponding arguments to the given argument list.
+    
+    Make sure to hold onto it until the process is done with the file
+    """
+    if port is not None:
+        host = '%s:%s' % (host,port)
+    # Create a temporary server key file
+    tmp_known_hosts = tempfile.NamedTemporaryFile()
+    tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
+    tmp_known_hosts.flush()
+    args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+    return tmp_known_hosts
+
 def popen_ssh_command(command, host, port, user, agent, 
             stdin="", 
             ident_key = None,
+            server_key = None,
             tty = False):
         """
         Executes a remote commands, returns ((stdout,stderr),process)
         """
+        tmp_known_hosts = None
         args = ['ssh',
                 # Don't bother with localhost. Makes test easier
                 '-o', 'NoHostAuthenticationForLocalhost=yes',
@@ -359,6 +378,10 @@ def popen_ssh_command(command, host, port, user, agent,
             args.extend(('-i', ident_key))
         if tty:
             args.append('-t')
+        if server_key:
+            # Create a temporary server key file
+            tmp_known_hosts = _make_server_key_args(
+                server_key, host, port, args)
         args.append(command)
 
         # connects to the remote host and starts a remote connection
@@ -366,13 +389,19 @@ def popen_ssh_command(command, host, port, user, agent,
                 stdout = subprocess.PIPE,
                 stdin = subprocess.PIPE, 
                 stderr = subprocess.PIPE)
+        
+        # attach tempfile object to the process, to make sure the file stays
+        # alive until the process is finished with it
+        proc._known_hosts = tmp_known_hosts
+        
         return (proc.communicate(stdin), proc)
  
 def popen_scp(source, dest, 
             port = None, 
             agent = None, 
             recursive = False,
-            ident_key = None):
+            ident_key = None,
+            server_key = None):
         """
         Copies from/to remote sites.
         
@@ -392,9 +421,15 @@ def popen_scp(source, dest,
                 or hasattr(source, 'read')  or hasattr(dest, 'write'):
             assert not resursive
             
-            # Parse destination as <user>@<server>:<path>
-            tgtspec, path = dest.split(':',1)
-            user,host = tgtspec.rsplit('@',1)
+            # Parse source/destination as <user>@<server>:<path>
+            if isinstance(dest, basestring) and ':' in dest:
+                remspec, path = dest.split(':',1)
+            elif isinstance(source, basestring) and ':' in source:
+                remspec, path = source.split(':',1)
+            else:
+                raise ValueError, "Both endpoints cannot be local"
+            user,host = remspec.rsplit('@',1)
+            tmp_known_hosts = None
             
             args = ['ssh', '-l', user, '-C',
                     # Don't bother with localhost. Makes test easier
@@ -403,6 +438,10 @@ def popen_scp(source, dest,
                 args.append('-P%d' % port)
             if ident_key:
                 args.extend(('-i', ident_key))
+            if server_key:
+                # Create a temporary server key file
+                tmp_known_hosts = _make_server_key_args(
+                    server_key, host, port, args)
             
             if isinstance(source, file) or hasattr(source, 'read'):
                 args.append('cat > %s' % (shell_escape(path),))
@@ -418,6 +457,7 @@ def popen_scp(source, dest,
                         stderr = subprocess.PIPE,
                         stdin = source)
                 err = proc.stderr.read()
+                proc._known_hosts = tmp_known_hosts
                 proc.wait()
                 return ((None,err), proc)
             elif isinstance(dest, file):
@@ -426,6 +466,7 @@ def popen_scp(source, dest,
                         stderr = subprocess.PIPE,
                         stdin = source)
                 err = proc.stderr.read()
+                proc._known_hosts = tmp_known_hosts
                 proc.wait()
                 return ((None,err), proc)
             elif hasattr(source, 'read'):
@@ -458,6 +499,7 @@ def popen_scp(source, dest,
                         break
                 err.append(proc.stderr.read())
                     
+                proc._known_hosts = tmp_known_hosts
                 proc.wait()
                 return ((None,''.join(err)), proc)
             elif hasattr(dest, 'write'):
@@ -487,12 +529,23 @@ def popen_scp(source, dest,
                         break
                 err.append(proc.stderr.read())
                     
+                proc._known_hosts = tmp_known_hosts
                 proc.wait()
                 return ((None,''.join(err)), proc)
             else:
                 raise AssertionError, "Unreachable code reached! :-Q"
         else:
+            # Parse destination as <user>@<server>:<path>
+            if isinstance(dest, basestring) and ':' in dest:
+                remspec, path = dest.split(':',1)
+            elif isinstance(source, basestring) and ':' in source:
+                remspec, path = source.split(':',1)
+            else:
+                raise ValueError, "Both endpoints cannot be local"
+            user,host = remspec.rsplit('@',1)
+            
             # plain scp
+            tmp_known_hosts = None
             args = ['scp', '-q', '-p', '-C',
                     # Don't bother with localhost. Makes test easier
                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
@@ -502,6 +555,10 @@ def popen_scp(source, dest,
                 args.append('-r')
             if ident_key:
                 args.extend(('-i', ident_key))
+            if server_key:
+                # Create a temporary server key file
+                tmp_known_hosts = _make_server_key_args(
+                    server_key, host, port, args)
             args.append(source)
             args.append(dest)
 
@@ -510,6 +567,8 @@ def popen_scp(source, dest,
                     stdout = subprocess.PIPE,
                     stdin = subprocess.PIPE, 
                     stderr = subprocess.PIPE)
+            proc._known_hosts = tmp_known_hosts
+            
             comm = proc.communicate()
             proc.wait()
             return (comm, proc)
@@ -517,6 +576,7 @@ def popen_scp(source, dest,
 def popen_ssh_subprocess(python_code, host, port, user, agent, 
         python_path = None,
         ident_key = None,
+        server_key = None,
         tty = False):
         if python_path:
             python_path.replace("'", r"'\''")
@@ -541,6 +601,7 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
         cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
         cmd += "exec(cmd)\n'"
 
+        tmp_known_hosts = None
         args = ['ssh',
                 # Don't bother with localhost. Makes test easier
                 '-o', 'NoHostAuthenticationForLocalhost=yes',
@@ -553,6 +614,10 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
             args.extend(('-i', ident_key))
         if tty:
             args.append('-t')
+        if server_key:
+            # Create a temporary server key file
+            tmp_known_hosts = _make_server_key_args(
+                server_key, host, port, args)
         args.append(cmd)
 
         # connects to the remote host and starts a remote rpyc connection
@@ -560,6 +625,8 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
                 stdout = subprocess.PIPE,
                 stdin = subprocess.PIPE, 
                 stderr = subprocess.PIPE)
+        proc._known_hosts = tmp_known_hosts
+        
         # send the command to execute
         os.write(proc.stdin.fileno(),
                 base64.b64encode(python_code) + "\n")