Ticket #21: emulation support - finished :D
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 26 Apr 2011 16:13:32 +0000 (18:13 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 26 Apr 2011 16:13:32 +0000 (18:13 +0200)
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata_v01.py
test/testbeds/planetlab/design.py
test/testbeds/planetlab/execute.py

index 0a22f14..1562f26 100644 (file)
@@ -114,6 +114,10 @@ class TestbedController(testbed_impl.TestbedController):
         element = self._elements[guid]
         if element:
             setattr(element, name, value)
+            
+            if hasattr(element, 'refresh'):
+                # invoke attribute refresh hook
+                element.refresh()
 
     def get(self, time, guid, name):
         # TODO: take on account schedule time for the task
@@ -163,8 +167,9 @@ class TestbedController(testbed_impl.TestbedController):
         for trace in self._traces.values():
             trace.close()
         for element in self._elements.values():
-            pass
-            #element.destroy()
+            # invoke cleanup hooks
+            if hasattr(element, 'cleanup'):
+                element.cleanup()
 
     def trace(self, guid, trace_id, attribute='value'):
         app = self._elements[guid]
@@ -222,6 +227,16 @@ class TestbedController(testbed_impl.TestbedController):
         
         return iface
     
+    def _make_netpipe(self, parameters):
+        iface = self._interfaces.NetPipe(self.plapi)
+        
+        # Note: there is 1-to-1 correspondence between attribute names
+        #   If that changes, this has to change as well
+        for attr,val in parameters.iteritems():
+            setattr(iface, attr, val)
+        
+        return iface
+    
     def _make_internet(self, parameters):
         return self._interfaces.Internet(self.plapi)
     
index 4be8f53..876e6d0 100644 (file)
@@ -3,6 +3,7 @@
 
 from constants import TESTBED_ID
 import nepi.util.ipaddr2 as ipaddr2
+import nepi.util.server as server
 import plcapi
 
 class NodeIface(object):
@@ -125,4 +126,126 @@ class Internet(object):
             api = plcapi.PLCAPI()
         self._api = api
 
+class NetPipe(object):
+    def __init__(self, api=None):
+        if not api:
+            api = plcapi.PLCAPI()
+        self._api = api
+
+        # Attributes
+        self.mode = None
+        self.addrList = None
+        self.portList = None
+        
+        self.plrIn = None
+        self.bwIn = None
+        self.delayIn = None
 
+        self.plrOut = None
+        self.bwOut = None
+        self.delayOut = None
+        
+        # These get initialized when the pipe is connected to its node
+        self.node = None
+        self.configured = False
+    
+    def validate(self):
+        if not self.mode:
+            raise RuntimeError, "Undefined NetPipe mode"
+        if not self.portList:
+            raise RuntimeError, "Undefined NetPipe port list - must always define the scope"
+        if not (self.plrIn or self.bwIn or self.delayIn):
+            raise RuntimeError, "Undefined NetPipe inbound characteristics"
+        if not (self.plrOut or self.bwOut or self.delayOut):
+            raise RuntimeError, "Undefined NetPipe outbound characteristics"
+        if not self.node:
+            raise RuntimeError, "Unconnected NetPipe"
+    
+    def _add_pipedef(self, bw, plr, delay, options):
+        if delay:
+            options.extend(("delay","%dms" % (delay,)))
+        if bw:
+            options.extend(("bw","%.8fMbit/s" % (bw,)))
+        if plr:
+            options.extend(("plr","%.8f" % (plr,)))
+    
+    def _get_ruledef(self):
+        scope = "%s%s%s" % (
+            self.portList,
+            "@" if self.addrList else "",
+            self.addrList or "",
+        )
+        
+        options = []
+        if self.bwIn or self.plrIn or self.delayIn:
+            options.append("IN")
+            self._add_pipedef(self.bwIn, self.plrIn, self.delayIn, options)
+        if self.bwOut or self.plrOut or self.delayOut:
+            options.append("OUT")
+            self._add_pipedef(self.bwOut, self.plrOut, self.delayOut, options)
+        options = ' '.join(options)
+        
+        return (scope,options)
+
+    def configure(self):
+        # set up rule
+        scope, options = self._get_ruledef()
+        command = "sudo -S netconfig config %s %s %s" % (self.mode, scope, options)
+        print command
+        
+        (out,err),proc = server.popen_ssh_command(
+            command,
+            host = self.node.hostname,
+            port = None,
+            user = self.node.slicename,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+    
+        if proc.wait():
+            raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+        
+        # we have to clean up afterwards
+        self.configured = True
+    
+    def refresh(self):
+        if self.configured:
+            # refresh rule
+            scope, options = self._get_ruledef()
+            command = "sudo -S netconfig refresh %s %s %s" % (self.mode, scope, options)
+            
+            (out,err),proc = server.popen_ssh_command(
+                command,
+                host = self.node.hostname,
+                port = None,
+                user = self.node.slicename,
+                agent = None,
+                ident_key = self.node.ident_path,
+                server_key = self.node.server_key
+                )
+        
+            if proc.wait():
+                raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+    
+    def cleanup(self):
+        if self.configured:
+            # remove rule
+            scope, options = self._get_ruledef()
+            command = "sudo -S netconfig delete %s %s" % (self.mode, scope)
+            
+            (out,err),proc = server.popen_ssh_command(
+                command,
+                host = self.node.hostname,
+                port = None,
+                user = self.node.slicename,
+                agent = None,
+                ident_key = self.node.ident_path,
+                server_key = self.node.server_key
+                )
+        
+            if proc.wait():
+                raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+            
+            self.configured = False
+    
index 41a123e..faa5379 100644 (file)
@@ -15,9 +15,61 @@ NODEIFACE = "NodeInterface"
 TUNIFACE = "TunInterface"
 APPLICATION = "Application"
 INTERNET = "Internet"
+NETPIPE = "NetPipe"
 
 PL_TESTBED_ID = "planetlab"
 
+
+### Custom validation functions ###
+def is_addrlist(attribute, value):
+    if not validation.is_string(attribute, value):
+        return False
+    
+    if not value:
+        # No empty strings
+        return False
+    
+    components = value.split(',')
+    
+    for component in components:
+        if '/' in component:
+            addr, mask = component.split('/',1)
+        else:
+            addr, mask = component, 32
+        
+        if mask is not None and not (mask and mask.isdigit()):
+            # No empty or nonnumeric masks
+            return False
+        
+        if not validation.is_ip4_address(attribute, value):
+            # Address part must be ipv4
+            return False
+        
+    return True
+
+def is_portlist(attribute, value):
+    if not validation.is_string(attribute, value):
+        return False
+    
+    if not value:
+        # No empty strings
+        return False
+    
+    components = value.split(',')
+    
+    for component in components:
+        if '-' in component:
+            pfrom, pto = component.split('-',1)
+        else:
+            pfrom = pto = component
+        
+        if not pfrom or not pto or not pfrom.isdigit() or not pto.isdigit():
+            # No empty or nonnumeric ports
+            return False
+        
+    return True
+
+
 ### Connection functions ####
 
 def connect_node_iface_node(testbed_instance, node, iface):
@@ -28,7 +80,7 @@ def connect_node_iface_inet(testbed_instance, iface, inet):
 
 def connect_tun_iface_node(testbed_instance, node, iface):
     if not node.emulation:
-        raise RuntimeError, "Usage of TUN interfaces requires emulation"
+        raise RuntimeError, "Use of TUN interfaces requires emulation"
     iface.node = node
     node.required_vsys.update(('fd_tuntap', 'vif_up'))
 
@@ -38,6 +90,11 @@ def connect_app(testbed_instance, node, app):
     if app.depends:
         node.required_packages.update(set(
             app.depends.split() ))
+
+def connect_node_netpipe(testbed_instance, node, netpipe):
+    if not node.emulation:
+        raise RuntimeError, "Use of NetPipes requires emulation"
+    netpipe.node = node
     
 
 ### Creation functions ###
@@ -78,6 +135,11 @@ def create_internet(testbed_instance, guid):
     element = testbed_instance._make_internet(parameters)
     testbed_instance.elements[guid] = element
 
+def create_netpipe(testbed_instance, guid):
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_netpipe(parameters)
+    testbed_instance.elements[guid] = element
+
 ### Start/Stop functions ###
 
 def start_application(testbed_instance, guid):
@@ -177,6 +239,18 @@ def configure_application(testbed_instance, guid):
     # Install stuff
     app.setup()
 
+def configure_netpipe(testbed_instance, guid):
+    netpipe = testbed_instance._elements[guid]
+    
+    # Do some validations
+    netpipe.validate()
+    
+    # Wait for dependencies
+    netpipe.node.wait_dependencies()
+    
+    # Install rules
+    netpipe.configure()
+
 ### Factory information ###
 
 connector_types = dict({
@@ -204,6 +278,12 @@ connector_types = dict({
                 "max": 1, 
                 "min": 1
             }),
+    "pipes": dict({
+                "help": "Connector to a NetPipe", 
+                "name": "pipes",
+                "max": 2, 
+                "min": 0
+            }),
    })
 
 connections = [
@@ -230,7 +310,13 @@ connections = [
         "to":   (TESTBED_ID, APPLICATION, "node"),
         "code": connect_app,
         "can_cross": False
-    })
+    }),
+    dict({
+        "from": (TESTBED_ID, NODE, "pipes"),
+        "to":   (TESTBED_ID, NETPIPE, "node"),
+        "code": connect_node_netpipe,
+        "can_cross": False
+    }),
 ]
 
 attributes = dict({
@@ -439,6 +525,70 @@ attributes = dict({
                 "flags": Attribute.DesignOnly,
                 "validation_function": validation.is_string
             }),
+    
+    "netpipe_mode": dict({      
+                "name": "mode",
+                "help": "Link mode:\n"
+                        " * SERVER: applies to incoming connections\n"
+                        " * CLIENT: applies to outgoing connections\n"
+                        " * SERVICE: applies to both",
+                "type": Attribute.ENUM, 
+                "flags": Attribute.DesignOnly,
+                "allowed": ["SERVER",
+                            "CLIENT",
+                            "SERVICE"],
+                "validation_function": validation.is_enum,
+            }),
+    "port_list":  dict({
+                "name": "portList", 
+                "help": "Port list or range. Eg: '22', '22,23,27', '20-2000'",
+                "type": Attribute.STRING,
+                "validation_function": is_portlist,
+            }),
+    "addr_list":  dict({
+                "name": "addrList", 
+                "help": "Address list or range. Eg: '127.0.0.1', '127.0.0.1,127.0.1.1', '127.0.0.1/8'",
+                "type": Attribute.STRING,
+                "validation_function": is_addrlist,
+            }),
+    "bw_in":  dict({
+                "name": "bwIn", 
+                "help": "Inbound bandwidth limit (in Mbit/s)",
+                "type": Attribute.DOUBLE,
+                "validation_function": validation.is_double,
+            }),
+    "bw_out":  dict({
+                "name": "bwOut", 
+                "help": "Outbound bandwidth limit (in Mbit/s)",
+                "type": Attribute.DOUBLE,
+                "validation_function": validation.is_double,
+            }),
+    "plr_in":  dict({
+                "name": "plrIn", 
+                "help": "Inbound packet loss rate (0 = no loss, 1 = 100% loss)",
+                "type": Attribute.DOUBLE,
+                "validation_function": validation.is_double,
+            }),
+    "plr_out":  dict({
+                "name": "plrOut", 
+                "help": "Outbound packet loss rate (0 = no loss, 1 = 100% loss)",
+                "type": Attribute.DOUBLE,
+                "validation_function": validation.is_double,
+            }),
+    "delay_in":  dict({
+                "name": "delayIn", 
+                "help": "Inbound packet delay (in milliseconds)",
+                "type": Attribute.INTEGER,
+                "range": (0,60000),
+                "validation_function": validation.is_integer,
+            }),
+    "delay_out":  dict({
+                "name": "delayOut", 
+                "help": "Outbound packet delay (in milliseconds)",
+                "type": Attribute.INTEGER,
+                "range": (0,60000),
+                "validation_function": validation.is_integer,
+            }),
     })
 
 traces = dict({
@@ -456,9 +606,9 @@ traces = dict({
               }), 
     })
 
-create_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, APPLICATION ]
+create_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, NETPIPE, APPLICATION ]
 
-configure_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, APPLICATION ]
+configure_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, NETPIPE, APPLICATION ]
 
 factories_info = dict({
     NODE: dict({
@@ -479,7 +629,7 @@ factories_info = dict({
                 "min_bandwidth",
                 "max_bandwidth",
             ],
-            "connector_types": ["devs", "apps"]
+            "connector_types": ["devs", "apps", "pipes"]
        }),
     NODEIFACE: dict({
             "allow_addresses": True,
@@ -521,6 +671,18 @@ factories_info = dict({
             "create_function": create_internet,
             "connector_types": ["devs"],
         }),
+    NETPIPE: dict({
+            "help": "Link emulation",
+            "category": "topology",
+            "create_function": create_netpipe,
+            "configure_function": configure_netpipe,
+            "box_attributes": ["netpipe_mode",
+                               "addr_list", "port_list",
+                               "bw_in","plr_in","delay_in",
+                               "bw_out","plr_out","delay_out"],
+            "connector_types": ["node"],
+            "traces": ["stdout", "stderr"]
+        }),
 })
 
 testbed_attributes = dict({
index 93acdae..4259cd5 100755 (executable)
@@ -62,5 +62,27 @@ class PlanetlabDesignTestCase(unittest.TestCase):
         xml2 = exp_desc2.to_xml()
         self.assertTrue(xml == xml2)
         
+    def test_design_emulation(self):
+        exp_desc, tstbd_desc, node1, node2, iface1, iface2, app = self.make_test_design()
+        
+        node1.set_attribute_value("emulation",True)
+        
+        netpipe1 = tstbd_desc.create("NetPipe")
+        netpipe1.set_attribute_value("mode","CLIENT")
+        netpipe1.set_attribute_value("portList","80,443")
+        netpipe1.set_attribute_value("bwIn",1.0)
+        netpipe1.set_attribute_value("bwOut",128.0/1024.0)
+        netpipe1.set_attribute_value("delayIn",12)
+        netpipe1.set_attribute_value("delayOut",92)
+        netpipe1.set_attribute_value("plrIn",0.05)
+        netpipe1.set_attribute_value("plrOut",0.15)
+        node1.connector("pipes").connect(netpipe1.connector("node"))
+
+        xml = exp_desc.to_xml()
+        exp_desc2 = ExperimentDescription()
+        exp_desc2.from_xml(xml)
+        xml2 = exp_desc2.to_xml()
+        self.assertTrue(xml == xml2)
+
 if __name__ == '__main__':
     unittest.main()
index 91e2a41..0156d73 100755 (executable)
@@ -202,6 +202,53 @@ echo 'OKIDOKI'
         instance.stop()
         instance.shutdown()
 
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_emulation(self):
+        instance = self.make_instance()
+        
+        instance.defer_create(2, "Node")
+        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "emulation", True) # require emulation
+        instance.defer_create(3, "NodeInterface")
+        instance.defer_connect(2, "devs", 3, "node")
+        instance.defer_create(4, "Internet")
+        instance.defer_connect(3, "inet", 4, "devs")
+        instance.defer_create(7, "NetPipe")
+        instance.defer_create_set(7, "mode", "CLIENT")
+        instance.defer_create_set(7, "portList", "80")
+        instance.defer_create_set(7, "bwOut", 12.0/1024.0) # 12kbps
+        instance.defer_create_set(7, "bwIn", 64.0/1024.0) # 64kbps
+        instance.defer_create_set(7, "plrOut", 0.01) # 1% plr outbound - high loss
+        instance.defer_create_set(7, "plrIn", 0.001) # 0.1% plr inbound - regular loss
+        instance.defer_create_set(7, "delayOut", int(1500 * 8 / (12.0/1024.0) / 1000)) # tx delay at 12kbps in ms
+        instance.defer_create_set(7, "delayIn", int(1500 * 8 / (64.0/1024.0) / 1000)) # rx delay at 64kbps in ms
+        instance.defer_connect(2, "pipes", 7, "node")
+        instance.defer_create(8, "Application")
+        instance.defer_create_set(8, "command", "time wget -q -O /dev/null http://www.google.com/") # Fetch ~10kb
+        instance.defer_add_trace(8, "stderr")
+        instance.defer_connect(8, "node", 2, "apps")
+
+        instance.do_setup()
+        instance.do_create()
+        instance.do_connect()
+        instance.do_preconfigure()
+        instance.do_configure()
+        
+        instance.start()
+        while instance.status(8) != STATUS_FINISHED:
+            time.sleep(0.5)
+        test_result = (instance.trace(8, "stderr") or "").strip()
+        comp_result = r".*real\s*(?P<min>[0-9]+)m(?P<sec>[0-9]+[.][0-9]+)s.*"
+        
+        match = re.match(comp_result, test_result, re.MULTILINE)
+        self.assertTrue(match, "Unexpected output: %s" % (test_result,))
+        
+        minutes = int(match.group("min"))
+        seconds = float(match.group("sec"))
+        self.assertTrue((minutes * 60 + seconds) > 1.0, "Emulation not effective: %s" % (test_result,))
+        instance.stop()
+        instance.shutdown()
+
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_tun_emulation_requirement(self):
         instance = self.make_instance()