From fe42f33a7a49165888f9af1776001629bbe09b94 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Tue, 26 Apr 2011 13:19:06 +0200 Subject: [PATCH] * Automatic provisioning * Server key validation * Trivial resource allocation (only perfectly-defined hosts) --- src/nepi/testbeds/planetlab/application.py | 27 +++++--- src/nepi/testbeds/planetlab/execute.py | 48 +++++++++---- src/nepi/testbeds/planetlab/metadata_v01.py | 17 +++-- src/nepi/testbeds/planetlab/node.py | 36 ++++++++-- src/nepi/testbeds/planetlab/plcapi.py | 6 +- src/nepi/testbeds/planetlab/rspawn.py | 23 +++++-- src/nepi/util/server.py | 75 +++++++++++++++++++-- 7 files changed, 188 insertions(+), 44 deletions(-) diff --git a/src/nepi/testbeds/planetlab/application.py b/src/nepi/testbeds/planetlab/application.py index c71e852a..e4ff0758 100644 --- a/src/nepi/testbeds/planetlab/application.py +++ b/src/nepi/testbeds/planetlab/application.py @@ -85,7 +85,8 @@ class Application(object): port = None, user = self.slicename, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) if proc.wait(): @@ -103,7 +104,8 @@ class Application(object): port = None, user = self.slicename, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) if pidtuple: @@ -145,7 +147,8 @@ class Application(object): port = None, user = self.slicename, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) def remote_trace_path(self, whichtrace): @@ -179,7 +182,8 @@ class Application(object): local_path, port = None, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) if proc.wait(): @@ -201,7 +205,8 @@ class Application(object): port = None, user = self.slicename, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) if proc.wait(): @@ -216,7 +221,8 @@ class Application(object): os.path.join(self.home_path, 'stdin') ), port = None, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) if proc.wait(): @@ -232,7 +238,8 @@ class Application(object): source, "%s@%s:%s" % (self.slicename, self.node.hostname, os.path.join(self.home_path,'.'),), - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) if proc.wait(): @@ -248,7 +255,8 @@ class Application(object): port = None, user = self.slicename, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) if proc.wait(): @@ -266,7 +274,8 @@ class Application(object): port = None, user = self.slicename, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.node.server_key ) if proc.wait(): diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index b0918c57..860f6c30 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -4,6 +4,7 @@ from constants import TESTBED_ID from nepi.core import testbed_impl import os +import time class TestbedController(testbed_impl.TestbedController): def __init__(self, testbed_version): @@ -58,10 +59,7 @@ class TestbedController(testbed_impl.TestbedController): self.sliceSSHKey = self._attributes.\ get_attribute_value("sliceSSHKey") - def do_create(self): - # Create node elements per XML data - super(TestbedController, self).do_create() - + def do_preconfigure(self): # Perform resource discovery if we don't have # specific resources assigned yet self.do_resource_discovery() @@ -69,20 +67,46 @@ class TestbedController(testbed_impl.TestbedController): # Create PlanetLab slivers self.do_provisioning() - # Wait for all nodes to be ready - self.wait_nodes() + # Configure elements per XML data + super(TestbedController, self).do_preconfigure() def do_resource_discovery(self): # Do what? - pass + + # Provisional algo: + # look for perfectly defined nodes + # (ie: those with only one candidate) + to_provision = self._to_provision = set() + for guid, node in self._elements.iteritems(): + if isinstance(node, self._node.Node) and node._node_id is None: + # Try existing nodes first + # If we have only one candidate, simply use it + candidates = node.find_candidates( + filter_slice_id = self.slice_id) + if len(candidates) == 1: + node.assign_node_id(iter(candidates).next()) + else: + # Try again including unassigned nodes + candidates = node.find_candidates() + if len(candidates) > 1: + raise RuntimeError, "Cannot assign resources for node %s, too many candidates" % (guid,) + if len(candidates) == 1: + node_id = iter(candidates).next() + node.assign_node_id(node_id) + to_provision.add(node_id) + elif not candidates: + raise RuntimeError, "Cannot assign resources for node %s, no candidates" % (guid,) def do_provisioning(self): - # Que te recontra? - pass + if self._to_provision: + # Add new nodes to the slice + cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids'] + new_nodes = list(set(cur_nodes) | self._to_provision) + self.plapi.UpdateSlice(self.slicename, nodes=new_nodes) + + # cleanup + del self._to_provision - def wait_nodes(self): - # Suuure... - pass def set(self, time, guid, name, value): super(TestbedController, self).set(time, guid, name, value) diff --git a/src/nepi/testbeds/planetlab/metadata_v01.py b/src/nepi/testbeds/planetlab/metadata_v01.py index 8289655e..2d6b7f00 100644 --- a/src/nepi/testbeds/planetlab/metadata_v01.py +++ b/src/nepi/testbeds/planetlab/metadata_v01.py @@ -1,6 +1,8 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import time + from constants import TESTBED_ID from nepi.core import metadata from nepi.core.attributes import Attribute @@ -142,16 +144,17 @@ def configure_node(testbed_instance, guid): node.ident_path = testbed_instance.sliceSSHKey node.slicename = testbed_instance.slicename - # If we have only one candidate, simply use it - candidates = node.find_candidates( - filter_slice_id = testbed_instance.slice_id) - if len(candidates) == 1: - node.assign_node_id(iter(candidates).next()) - # Do some validations node.validate() - # TODO: this should be done in parallel in all nodes + # recently provisioned nodes may not be up yet + sleeptime = 1.0 + while not node.is_alive(): + time.sleep(sleeptime) + sleeptime = min(30.0, sleeptime*1.5) + + # this will be done in parallel in all nodes + # this call only spawns the process node.install_dependencies() def configure_application(testbed_instance, guid): diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index 0e31046b..10ff5930 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -8,6 +8,8 @@ import rspawn import time import os +from nepi.util import server + class Node(object): BASEFILTERS = { # Map Node attribute to plcapi filter name @@ -56,6 +58,7 @@ class Node(object): # Testbed-derived attributes self.slicename = None self.ident_path = None + self.server_key = None self.home_path = None # Those are filled when an actual node is allocated @@ -136,7 +139,7 @@ class Node(object): self.fetch_node_info() def fetch_node_info(self): - info = self._api.GetNodes(self._node_id) + info = self._api.GetNodes(self._node_id)[0] tags = dict( (t['tagname'],t['value']) for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) ) @@ -161,6 +164,9 @@ class Node(object): if 'interface_ids' in info: self.min_num_external_ifaces = \ self.max_num_external_ifaces = len(info['interface_ids']) + + if 'ssh_rsa_key' in info: + self.server_key = info['ssh_rsa_key'] def validate(self): if self.home_path is None: @@ -191,6 +197,7 @@ class Node(object): user = self.slicename, agent = None, ident_key = self.ident_path, + server_key = self.server_key, sudo = True ) @@ -210,7 +217,8 @@ class Node(object): port = None, user = self.slicename, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.server_key ) if pidtuple: pid, ppid = pidtuple @@ -227,9 +235,29 @@ class Node(object): port = None, user = self.slicename, agent = None, - ident_key = self.ident_path + ident_key = self.ident_path, + server_key = self.server_key ): time.sleep(probe) - + def is_alive(self): + # Make sure all the paths are created where + # they have to be created for deployment + (out,err),proc = server.popen_ssh_command( + "echo 'ALIVE'", + host = self.hostname, + port = None, + user = self.slicename, + agent = None, + ident_key = self.ident_path, + server_key = self.server_key + ) + + if proc.wait(): + return False + elif not err and out.strip() == 'ALIVE': + return True + else: + return False + diff --git a/src/nepi/testbeds/planetlab/plcapi.py b/src/nepi/testbeds/planetlab/plcapi.py index 2c1752e1..825713c4 100644 --- a/src/nepi/testbeds/planetlab/plcapi.py +++ b/src/nepi/testbeds/planetlab/plcapi.py @@ -115,10 +115,11 @@ class PLCAPI(object): (peer['peername'], peer['peer_id']) for peer in peers ) - self._peer_map = dict( + self._peer_map.update( (peer['peer_id'], peer['shortname']) for peer in peers ) + self._peer_map[None] = self._localPeerName return self._peer_map @@ -246,4 +247,7 @@ class PLCAPI(object): filters.update(kw) return self.api.GetSlices(self.auth, filters, *fieldstuple) + def UpdateSlice(self, sliceIdOrName, **kw): + return self.api.UpdateSlice(self.auth, sliceIdOrName, kw) + diff --git a/src/nepi/testbeds/planetlab/rspawn.py b/src/nepi/testbeds/planetlab/rspawn.py index b6567734..175540bb 100644 --- a/src/nepi/testbeds/planetlab/rspawn.py +++ b/src/nepi/testbeds/planetlab/rspawn.py @@ -24,7 +24,9 @@ class NOT_STARTED: """ def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/dev/null', home=None, create_home=False, sudo=False, - host = None, port = None, user = None, agent = None, ident_key = None, tty = False): + host = None, port = None, user = None, agent = None, + ident_key = None, server_key = None, + tty = False): """ Spawn a remote command such that it will continue working asynchronously. @@ -78,6 +80,7 @@ def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/de user = user, agent = agent, ident_key = ident_key, + server_key = server_key, tty = tty ) @@ -87,7 +90,8 @@ def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/de return (out,err),proc def remote_check_pid(pidfile, - host = None, port = None, user = None, agent = None, ident_key = None): + host = None, port = None, user = None, agent = None, + ident_key = None, server_key = None): """ Check the pidfile of a process spawned with remote_spawn. @@ -110,7 +114,8 @@ def remote_check_pid(pidfile, port = port, user = user, agent = agent, - ident_key = ident_key + ident_key = ident_key, + server_key = server_key ) if proc.wait(): @@ -125,7 +130,8 @@ def remote_check_pid(pidfile, def remote_status(pid, ppid, - host = None, port = None, user = None, agent = None, ident_key = None): + host = None, port = None, user = None, agent = None, + ident_key = None, server_key = None): """ Check the status of a process spawned with remote_spawn. @@ -148,7 +154,8 @@ def remote_status(pid, ppid, port = port, user = user, agent = agent, - ident_key = ident_key + ident_key = ident_key, + server_key = server_key ) if proc.wait(): @@ -165,7 +172,8 @@ def remote_status(pid, ppid, def remote_kill(pid, ppid, sudo = False, - host = None, port = None, user = None, agent = None, ident_key = None): + host = None, port = None, user = None, agent = None, + ident_key = None, server_key = None): """ Kill a process spawned with remote_spawn. @@ -206,7 +214,8 @@ fi port = port, user = user, agent = agent, - ident_key = ident_key + ident_key = ident_key, + server_key = server_key ) # wait, don't leave zombies around diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index 68c10fe0..f52acf59 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -14,6 +14,7 @@ import time import traceback import signal import re +import tempfile CTRL_SOCK = "ctrl.sock" STD_ERR = "stderr.log" @@ -340,13 +341,31 @@ class Client(object): encoded = data.rstrip() return base64.b64decode(encoded) +def _make_server_key_args(server_key, host, port, args): + """ + Returns a reference to the created temporary file, and adds the + corresponding arguments to the given argument list. + + Make sure to hold onto it until the process is done with the file + """ + if port is not None: + host = '%s:%s' % (host,port) + # Create a temporary server key file + tmp_known_hosts = tempfile.NamedTemporaryFile() + tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key)) + tmp_known_hosts.flush() + args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) + return tmp_known_hosts + def popen_ssh_command(command, host, port, user, agent, stdin="", ident_key = None, + server_key = None, tty = False): """ Executes a remote commands, returns ((stdout,stderr),process) """ + tmp_known_hosts = None args = ['ssh', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes', @@ -359,6 +378,10 @@ def popen_ssh_command(command, host, port, user, agent, args.extend(('-i', ident_key)) if tty: args.append('-t') + if server_key: + # Create a temporary server key file + tmp_known_hosts = _make_server_key_args( + server_key, host, port, args) args.append(command) # connects to the remote host and starts a remote connection @@ -366,13 +389,19 @@ def popen_ssh_command(command, host, port, user, agent, stdout = subprocess.PIPE, stdin = subprocess.PIPE, stderr = subprocess.PIPE) + + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts + return (proc.communicate(stdin), proc) def popen_scp(source, dest, port = None, agent = None, recursive = False, - ident_key = None): + ident_key = None, + server_key = None): """ Copies from/to remote sites. @@ -392,9 +421,15 @@ def popen_scp(source, dest, or hasattr(source, 'read') or hasattr(dest, 'write'): assert not resursive - # Parse destination as @: - tgtspec, path = dest.split(':',1) - user,host = tgtspec.rsplit('@',1) + # Parse source/destination as @: + if isinstance(dest, basestring) and ':' in dest: + remspec, path = dest.split(':',1) + elif isinstance(source, basestring) and ':' in source: + remspec, path = source.split(':',1) + else: + raise ValueError, "Both endpoints cannot be local" + user,host = remspec.rsplit('@',1) + tmp_known_hosts = None args = ['ssh', '-l', user, '-C', # Don't bother with localhost. Makes test easier @@ -403,6 +438,10 @@ def popen_scp(source, dest, args.append('-P%d' % port) if ident_key: args.extend(('-i', ident_key)) + if server_key: + # Create a temporary server key file + tmp_known_hosts = _make_server_key_args( + server_key, host, port, args) if isinstance(source, file) or hasattr(source, 'read'): args.append('cat > %s' % (shell_escape(path),)) @@ -418,6 +457,7 @@ def popen_scp(source, dest, stderr = subprocess.PIPE, stdin = source) err = proc.stderr.read() + proc._known_hosts = tmp_known_hosts proc.wait() return ((None,err), proc) elif isinstance(dest, file): @@ -426,6 +466,7 @@ def popen_scp(source, dest, stderr = subprocess.PIPE, stdin = source) err = proc.stderr.read() + proc._known_hosts = tmp_known_hosts proc.wait() return ((None,err), proc) elif hasattr(source, 'read'): @@ -458,6 +499,7 @@ def popen_scp(source, dest, break err.append(proc.stderr.read()) + proc._known_hosts = tmp_known_hosts proc.wait() return ((None,''.join(err)), proc) elif hasattr(dest, 'write'): @@ -487,12 +529,23 @@ def popen_scp(source, dest, break err.append(proc.stderr.read()) + proc._known_hosts = tmp_known_hosts proc.wait() return ((None,''.join(err)), proc) else: raise AssertionError, "Unreachable code reached! :-Q" else: + # Parse destination as @: + if isinstance(dest, basestring) and ':' in dest: + remspec, path = dest.split(':',1) + elif isinstance(source, basestring) and ':' in source: + remspec, path = source.split(':',1) + else: + raise ValueError, "Both endpoints cannot be local" + user,host = remspec.rsplit('@',1) + # plain scp + tmp_known_hosts = None args = ['scp', '-q', '-p', '-C', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes' ] @@ -502,6 +555,10 @@ def popen_scp(source, dest, args.append('-r') if ident_key: args.extend(('-i', ident_key)) + if server_key: + # Create a temporary server key file + tmp_known_hosts = _make_server_key_args( + server_key, host, port, args) args.append(source) args.append(dest) @@ -510,6 +567,8 @@ def popen_scp(source, dest, stdout = subprocess.PIPE, stdin = subprocess.PIPE, stderr = subprocess.PIPE) + proc._known_hosts = tmp_known_hosts + comm = proc.communicate() proc.wait() return (comm, proc) @@ -517,6 +576,7 @@ def popen_scp(source, dest, def popen_ssh_subprocess(python_code, host, port, user, agent, python_path = None, ident_key = None, + server_key = None, tty = False): if python_path: python_path.replace("'", r"'\''") @@ -541,6 +601,7 @@ def popen_ssh_subprocess(python_code, host, port, user, agent, cmd += "os.write(1, \"OK\\n\")\n" # send a sync message cmd += "exec(cmd)\n'" + tmp_known_hosts = None args = ['ssh', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes', @@ -553,6 +614,10 @@ def popen_ssh_subprocess(python_code, host, port, user, agent, args.extend(('-i', ident_key)) if tty: args.append('-t') + if server_key: + # Create a temporary server key file + tmp_known_hosts = _make_server_key_args( + server_key, host, port, args) args.append(cmd) # connects to the remote host and starts a remote rpyc connection @@ -560,6 +625,8 @@ def popen_ssh_subprocess(python_code, host, port, user, agent, stdout = subprocess.PIPE, stdin = subprocess.PIPE, stderr = subprocess.PIPE) + proc._known_hosts = tmp_known_hosts + # send the command to execute os.write(proc.stdin.fileno(), base64.b64encode(python_code) + "\n") -- 2.47.0