Merging with HEAD
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
index 1562f26..8813ae4 100644 (file)
@@ -3,8 +3,27 @@
 
 from constants import TESTBED_ID
 from nepi.core import testbed_impl
+from nepi.util.constants import TIME_NOW
+from nepi.util.graphtools import mst
+from nepi.util import ipaddr2
 import os
+import os.path
 import time
+import resourcealloc
+import collections
+import operator
+import functools
+import socket
+import struct
+import tempfile
+import subprocess
+import random
+import shutil
+
+from nepi.util.constants import TESTBED_STATUS_CONFIGURED
+
+class TempKeyError(Exception):
+    pass
 
 class TestbedController(testbed_impl.TestbedController):
     def __init__(self, testbed_version):
@@ -12,7 +31,7 @@ class TestbedController(testbed_impl.TestbedController):
         self._home_directory = None
         self.slicename = None
         self._traces = dict()
-        
+
         import node, interfaces, application
         self._node = node
         self._interfaces = interfaces
@@ -21,21 +40,24 @@ class TestbedController(testbed_impl.TestbedController):
     @property
     def home_directory(self):
         return self._home_directory
-    
+
     @property
     def plapi(self):
         if not hasattr(self, '_plapi'):
             import plcapi
-            
+
             if self.authUser:
                 self._plapi = plcapi.PLCAPI(
                     username = self.authUser,
-                    password = self.authString)
+                    password = self.authString,
+                    hostname = self.plcHost,
+                    urlpattern = self.plcUrl
+                    )
             else:
                 # anonymous access - may not be enough for much
                 self._plapi = plcapi.PLCAPI()
         return self._plapi
-    
+
     @property
     def slice_id(self):
         if not hasattr(self, '_slice_id'):
@@ -58,25 +80,44 @@ class TestbedController(testbed_impl.TestbedController):
             get_attribute_value("authPass")
         self.sliceSSHKey = self._attributes.\
             get_attribute_value("sliceSSHKey")
+        self.sliceSSHKeyPass = None
+        self.plcHost = self._attributes.\
+            get_attribute_value("plcHost")
+        self.plcUrl = self._attributes.\
+            get_attribute_value("plcUrl")
+        super(TestbedController, self).do_setup()
+
+    def do_post_asynclaunch(self, guid):
+        # Dependencies were launched asynchronously,
+        # so wait for them
+        dep = self._elements[guid]
+        if isinstance(dep, self._app.Dependency):
+            dep.async_setup_wait()
+    
+    # Two-phase configuration for asynchronous launch
+    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
+    do_poststep_configure = staticmethod(do_post_asynclaunch)
 
     def do_preconfigure(self):
         # Perform resource discovery if we don't have
         # specific resources assigned yet
         self.do_resource_discovery()
-        
+
         # Create PlanetLab slivers
         self.do_provisioning()
         
+        # Plan application deployment
+        self.do_spanning_deployment_plan()
+
         # Configure elements per XML data
         super(TestbedController, self).do_preconfigure()
-    
+
     def do_resource_discovery(self):
-        # Do what?
+        to_provision = self._to_provision = set()
         
-        # Provisional algo:
+        # Initial algo:
         #   look for perfectly defined nodes
         #   (ie: those with only one candidate)
-        to_provision = self._to_provision = set()
         for guid, node in self._elements.iteritems():
             if isinstance(node, self._node.Node) and node._node_id is None:
                 # Try existing nodes first
@@ -89,13 +130,44 @@ class TestbedController(testbed_impl.TestbedController):
                     # Try again including unassigned nodes
                     candidates = node.find_candidates()
                     if len(candidates) > 1:
-                        raise RuntimeError, "Cannot assign resources for node %s, too many candidates" % (guid,)
+                        continue
                     if len(candidates) == 1:
                         node_id = iter(candidates).next()
                         node.assign_node_id(node_id)
                         to_provision.add(node_id)
                     elif not candidates:
-                        raise RuntimeError, "Cannot assign resources for node %s, no candidates" % (guid,)
+                        raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
+                            node.make_filter_description())
+        
+        # Now do the backtracking search for a suitable solution
+        # First with existing slice nodes
+        reqs = []
+        nodes = []
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is None:
+                # Gather candidates among the slice's existing nodes first;
+                # the allocator below chooses among them
+                candidates = node.find_candidates(
+                    filter_slice_id = self.slice_id)
+                reqs.append(candidates)
+                nodes.append(node)
+        
+        if nodes and reqs:
+            try:
+                solution = resourcealloc.alloc(reqs)
+            except resourcealloc.ResourceAllocationError:
+                # Failed, try again with all nodes
+                reqs = []
+                for node in nodes:
+                    candidates = node.find_candidates()
+                    reqs.append(candidates)
+                
+                solution = resourcealloc.alloc(reqs)
+                to_provision.update(solution)
+            
+            # Do assign nodes
+            for node, node_id in zip(nodes, solution):
+                node.assign_node_id(node_id)
 
     def do_provisioning(self):
         if self._to_provision:
@@ -103,46 +175,181 @@ class TestbedController(testbed_impl.TestbedController):
             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
             new_nodes = list(set(cur_nodes) | self._to_provision)
             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
-    
+
         # cleanup
         del self._to_provision
     
+    def do_spanning_deployment_plan(self):
+        # Create application groups by collecting all applications
+        # based on their hash - the hash should contain everything that
+        # defines them and the platform they're built for
+        
+        def dephash(app):
+            return (
+                frozenset((app.depends or "").split(' ')),
+                frozenset((app.sources or "").split(' ')),
+                app.build,
+                app.install,
+                app.node.architecture,
+                app.node.operatingSystem,
+                app.node.pl_distro,
+            )
+        
+        depgroups = collections.defaultdict(list)
+        
+        for element in self._elements.itervalues():
+            if isinstance(element, self._app.Dependency):
+                depgroups[dephash(element)].append(element)
+        
+        # Set up spanning deployment for those applications that
+        # have been deployed in several nodes.
+        for dh, group in depgroups.iteritems():
+            if len(group) > 1:
+                # Pick root (deterministically)
+                root = min(group, key=lambda app:app.node.hostname)
+                
+                # Obtain all IPs in numeric format
+                # (which means faster distance computations)
+                for dep in group:
+                    dep._ip = socket.gethostbyname(dep.node.hostname)
+                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
+                
+                # Compute plan
+                # NOTE: the plan is an iterator
+                plan = mst.mst(
+                    group,
+                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
+                    root = root,
+                    maxbranching = 2)
+                
+                # Re-cipher the private key with a temporary passphrase
+                try:
+                    tempprk, temppuk, tmppass = self._make_temp_private_key()
+                except TempKeyError:
+                    continue
+                
+                # Set up slaves
+                plan = list(plan)
+                for slave, master in plan:
+                    slave.set_master(master)
+                    slave.install_keys(tempprk, temppuk, tmppass)
+                    
+        # We don't need the user's passphrase anymore
+        self.sliceSSHKeyPass = None
+    
+    def _make_temp_private_key(self):
+        # Get the user's key's passphrase
+        if not self.sliceSSHKeyPass:
+            if 'SSH_ASKPASS' in os.environ:
+                proc = subprocess.Popen(
+                    [ os.environ['SSH_ASKPASS'],
+                      "Please type the passphrase for the %s SSH identity file. "
+                      "The passphrase will be used to re-cipher the identity file with "
+                      "a random 256-bit key for automated chain deployment on the "
+                      "%s PlanetLab slice" % ( 
+                        os.path.basename(self.sliceSSHKey), 
+                        self.slicename
+                    ) ],
+                    stdin = open("/dev/null"),
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.PIPE)
+                out,err = proc.communicate()
+                self.sliceSSHKeyPass = out.strip()
+        
+        if not self.sliceSSHKeyPass:
+            raise TempKeyError
+        
+        # Create temporary key files
+        prk = tempfile.NamedTemporaryFile(
+            dir = self.root_directory,
+            prefix = "pl_deploy_tmpk_",
+            suffix = "")
 
-    def set(self, time, guid, name, value):
-        super(TestbedController, self).set(time, guid, name, value)
-        # TODO: take on account schedule time for the task 
+        puk = tempfile.NamedTemporaryFile(
+            dir = self.root_directory,
+            prefix = "pl_deploy_tmpk_",
+            suffix = ".pub")
+            
+        # Create a secure 256-bit temporary passphrase
+        passphrase = ''.join(map(chr,[rng.randint(0,255) 
+                                      for rng in (random.SystemRandom(),)
+                                      for i in xrange(32)] )).encode("hex")
+                
+        # Copy keys
+        oprk = open(self.sliceSSHKey, "rb")
+        opuk = open(self.sliceSSHKey+".pub", "rb")
+        shutil.copymode(oprk.name, prk.name)
+        shutil.copymode(opuk.name, puk.name)
+        shutil.copyfileobj(oprk, prk)
+        shutil.copyfileobj(opuk, puk)
+        prk.flush()
+        puk.flush()
+        oprk.close()
+        opuk.close()
+        
+        # A descriptive comment
+        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
+        
+        # Recipher keys
+        proc = subprocess.Popen(
+            ["ssh-keygen", "-p",
+             "-f", prk.name,
+             "-P", self.sliceSSHKeyPass,
+             "-N", passphrase,
+             "-C", comment ],
+            stdout = subprocess.PIPE,
+            stderr = subprocess.PIPE,
+            stdin = subprocess.PIPE
+        )
+        out, err = proc.communicate()
+        
+        if err:
+            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
+                out, err)
+        
+        prk.seek(0)
+        puk.seek(0)
+        
+        # Change comment on public key
+        puklines = puk.readlines()
+        puklines[0] = puklines[0].split(' ')
+        puklines[0][-1] = comment+'\n'
+        puklines[0] = ' '.join(puklines[0])
+        puk.seek(0)
+        puk.truncate()
+        puk.writelines(puklines)
+        del puklines
+        puk.flush()
+        
+        return prk, puk, passphrase
+    
+    def set(self, guid, name, value, time = TIME_NOW):
+        super(TestbedController, self).set(guid, name, value, time)
+        # TODO: take into account the scheduled time for the task
         element = self._elements[guid]
         if element:
             setattr(element, name, value)
-            
+
             if hasattr(element, 'refresh'):
                 # invoke attribute refresh hook
                 element.refresh()
 
-    def get(self, time, guid, name):
+    def get(self, guid, name, time = TIME_NOW):
+        value = super(TestbedController, self).get(guid, name, time)
         # TODO: take on account schedule time for the task
+        factory_id = self._create[guid]
+        factory = self._factories[factory_id]
+        if factory.box_attributes.is_attribute_design_only(name):
+            return value
         element = self._elements.get(guid)
-        if element:
-            try:
-                if hasattr(element, name):
-                    # Runtime attribute
-                    return getattr(element, name)
-                else:
-                    # Try design-time attributes
-                    return self.box_get(time, guid, name)
-            except KeyError, AttributeError:
-                return None
-
-    def get_route(self, guid, index, attribute):
-        # TODO: fetch real data from planetlab
         try:
-            return self.box_get_route(guid, int(index), attribute)
+            return getattr(element, name)
         except KeyError, AttributeError:
-            return None
+            return value
 
     def get_address(self, guid, index, attribute='Address'):
         index = int(index)
-        
+
         # try the real stuff
         iface = self._elements.get(guid)
         if iface and index == 0:
@@ -152,28 +359,30 @@ class TestbedController(testbed_impl.TestbedController):
                 return iface.netprefix
             elif attribute == 'Broadcast':
                 return iface.broadcast
-        
-        # if all else fails, query box
-        try:
-            return self.box_get_address(guid, index, attribute)
-        except KeyError, AttributeError:
-            return None
 
+        # if all else fails, query box
+        return super(TestbedController, self).get_address(guid, index, attribute)
 
     def action(self, time, guid, action):
         raise NotImplementedError
 
     def shutdown(self):
-        for trace in self._traces.values():
+        for trace in self._traces.itervalues():
             trace.close()
-        for element in self._elements.values():
+        for element in self._elements.itervalues():
             # invoke cleanup hooks
             if hasattr(element, 'cleanup'):
                 element.cleanup()
+        for element in self._elements.itervalues():
+            # invoke destroy hooks
+            if hasattr(element, 'destroy'):
+                element.destroy()
+        self._elements.clear()
+        self._traces.clear()
 
     def trace(self, guid, trace_id, attribute='value'):
         app = self._elements[guid]
-        
+
         if attribute == 'value':
             path = app.sync_trace(self.home_directory, trace_id)
             if path:
@@ -184,71 +393,61 @@ class TestbedController(testbed_impl.TestbedController):
                 content = None
         elif attribute == 'path':
             content = app.remote_trace_path(trace_id)
+        elif attribute == 'size':
+            # TODO
+            raise NotImplementedError
         else:
             content = None
         return content
-        
+
     def follow_trace(self, trace_id, trace):
         self._traces[trace_id] = trace
+    
+    def _make_generic(self, parameters, kind):
+        app = kind(self.plapi)
 
-    def _make_node(self, parameters):
-        node = self._node.Node(self.plapi)
-        
         # Note: there is 1-to-1 correspondence between attribute names
         #   If that changes, this has to change as well
         for attr,val in parameters.iteritems():
-            setattr(node, attr, val)
-        
-        # If emulation is enabled, we automatically need 
+            setattr(app, attr, val)
+
+        return app
+
+    def _make_node(self, parameters):
+        node = self._make_generic(parameters, self._node.Node)
+
+        # If emulation is enabled, we automatically need
         # some vsys interfaces and packages
         if node.emulation:
             node.required_vsys.add('ipfw-be')
             node.required_packages.add('ipfwslice')
-        
+
         return node
-    
+
     def _make_node_iface(self, parameters):
-        iface = self._interfaces.NodeIface(self.plapi)
-        
-        # Note: there is 1-to-1 correspondence between attribute names
-        #   If that changes, this has to change as well
-        for attr,val in parameters.iteritems():
-            setattr(iface, attr, val)
-        
-        return iface
-    
+        return self._make_generic(parameters, self._interfaces.NodeIface)
+
     def _make_tun_iface(self, parameters):
-        iface = self._interfaces.TunIface(self.plapi)
-        
-        # Note: there is 1-to-1 correspondence between attribute names
-        #   If that changes, this has to change as well
-        for attr,val in parameters.iteritems():
-            setattr(iface, attr, val)
-        
-        return iface
-    
+        return self._make_generic(parameters, self._interfaces.TunIface)
+
+    def _make_tap_iface(self, parameters):
+        return self._make_generic(parameters, self._interfaces.TapIface)
+
     def _make_netpipe(self, parameters):
-        iface = self._interfaces.NetPipe(self.plapi)
-        
-        # Note: there is 1-to-1 correspondence between attribute names
-        #   If that changes, this has to change as well
-        for attr,val in parameters.iteritems():
-            setattr(iface, attr, val)
-        
-        return iface
-    
+        return self._make_generic(parameters, self._interfaces.NetPipe)
+
     def _make_internet(self, parameters):
-        return self._interfaces.Internet(self.plapi)
-    
+        return self._make_generic(parameters, self._interfaces.Internet)
+
     def _make_application(self, parameters):
-        app = self._app.Application(self.plapi)
-        
-        # Note: there is 1-to-1 correspondence between attribute names
-        #   If that changes, this has to change as well
-        for attr,val in parameters.iteritems():
-            setattr(app, attr, val)
-        
-        return app
-        
+        return self._make_generic(parameters, self._app.Application)
+
+    def _make_dependency(self, parameters):
+        return self._make_generic(parameters, self._app.Dependency)
+
+    def _make_nepi_dependency(self, parameters):
+        return self._make_generic(parameters, self._app.NepiDependency)
 
+    def _make_ns3_dependency(self, parameters):
+        return self._make_generic(parameters, self._app.NS3Dependency)