Lots of cross-connection fixes, TUN synchronization, etc
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 13 May 2011 13:53:46 +0000 (15:53 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 13 May 2011 13:53:46 +0000 (15:53 +0200)
Redirect most PL tests to our NepiPLC
Add multi-PLC capabilities and tests

NOTE: Some tests are degraded because we don't have ennough nodes to do proper tests yet

src/nepi/core/execute.py
src/nepi/core/metadata.py
src/nepi/core/testbed_impl.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/scripts/tun_connect.py
src/nepi/testbeds/planetlab/tunproto.py
test/testbeds/planetlab/execute.py
test/testbeds/planetlab/integration.py
test/testbeds/planetlab/integration_multi.py [new file with mode: 0755]
test/testbeds/planetlab/integration_ns3.py

index ad5b8a6..4018857 100644 (file)
@@ -44,7 +44,7 @@ class ConnectorType(ConnectorTypeBase):
         self._to_connections[type_id] = (can_cross, init_code, compl_code)
 
     def can_connect(self, testbed_id, factory_id, name, count, 
-            must_cross = False):
+            must_cross):
         connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
         for lookup_type_id in self._type_resolution_order(connector_type_id):
             if lookup_type_id in self._from_connections:
@@ -52,26 +52,28 @@ class ConnectorType(ConnectorTypeBase):
             elif lookup_type_id in self._to_connections:
                 (can_cross, init_code, compl_code) = self._to_connections[lookup_type_id]
             else:
-                # keey trying
+                # keep trying
                 continue
             return not must_cross or can_cross
         else:
             return False
 
-    def _connect_to_code(self, testbed_id, factory_id, name):
+    def _connect_to_code(self, testbed_id, factory_id, name,
+            must_cross):
         connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
         for lookup_type_id in self._type_resolution_order(connector_type_id):
             if lookup_type_id in self._to_connections:
                 (can_cross, init_code, compl_code) = self._to_connections[lookup_type_id]
-                return (init_code, compl_code)
+                if not must_cross or can_cross:
+                    return (init_code, compl_code)
         else:
             return (False, False)
     
-    def connect_to_init_code(self, testbed_id, factory_id, name):
-        return self._connect_to_code(testbed_id, factory_id, name)[0]
+    def connect_to_init_code(self, testbed_id, factory_id, name, must_cross):
+        return self._connect_to_code(testbed_id, factory_id, name, must_cross)[0]
 
-    def connect_to_compl_code(self, testbed_id, factory_id, name):
-        return self._connect_to_code(testbed_id, factory_id, name)[1]
+    def connect_to_compl_code(self, testbed_id, factory_id, name, must_cross):
+        return self._connect_to_code(testbed_id, factory_id, name, must_cross)[1]
 
 class Factory(AttributesMap):
     def __init__(self, factory_id, create_function, start_function, 
index f0c1417..784158c 100644 (file)
@@ -84,6 +84,15 @@ class VersionedMetadataInfo(object):
         """
         return self.configure_order
 
+    @property
+    def start_order(self):
+        """ list of factory ids that indicates the order in which the elements
+        should be started.
+        
+        Default: same as configure_order
+        """
+        return self.configure_order
+
     @property
     def factories_info(self):
         """ dictionary of dictionaries of factory specific information
@@ -309,6 +318,10 @@ class Metadata(object):
     def preconfigure_order(self):
         return self._metadata.preconfigure_order
 
+    @property
+    def start_order(self):
+        return self._metadata.start_order
+
     def testbed_attributes(self):
         attributes = AttributesMap()
 
index b5f9127..320f7b0 100644 (file)
@@ -14,6 +14,8 @@ from nepi.util.constants import STATUS_UNDETERMINED, TIME_NOW, \
     TESTBED_STATUS_STARTED, \
     TESTBED_STATUS_STOPPED
 
+import collections
+
 class TestbedController(execute.TestbedController):
     def __init__(self, testbed_id, testbed_version):
         super(TestbedController, self).__init__(testbed_id, testbed_version)
@@ -115,7 +117,7 @@ class TestbedController(execute.TestbedController):
         count = self._get_connection_count(guid1, connector_type_name1)
         connector_type = factory1.connector_type(connector_type_name1)
         connector_type.can_connect(self._testbed_id, factory_id2, 
-                connector_type_name2, count)
+                connector_type_name2, count, False)
         if not guid1 in self._connect:
             self._connect[guid1] = dict()
         if not connector_type_name1 in self._connect[guid1]:
@@ -136,7 +138,7 @@ class TestbedController(execute.TestbedController):
         count = self._get_connection_count(guid, connector_type_name)
         connector_type = factory.connector_type(connector_type_name)
         connector_type.can_connect(cross_testbed_id, cross_factory_id, 
-                cross_connector_type_name, count, must_cross = True)
+                cross_connector_type_name, count, True)
         if not guid in self._cross_connect:
             self._cross_connect[guid] = dict()
         if not connector_type_name in self._cross_connect[guid]:
@@ -222,11 +224,13 @@ class TestbedController(execute.TestbedController):
                     if init:
                         connect_code = connector_type1.connect_to_init_code(
                                 self._testbed_id, factory_id2, 
-                                connector_type_name2)
+                                connector_type_name2,
+                                False)
                     else:
                         connect_code = connector_type1.connect_to_compl_code(
                                 self._testbed_id, factory_id2, 
-                                connector_type_name2)
+                                connector_type_name2,
+                                False)
                     if connect_code:
                         connect_code(self, guid1, guid2)
 
@@ -285,11 +289,13 @@ class TestbedController(execute.TestbedController):
                 if init:
                     connect_code = connector_type.connect_to_init_code(
                         cross_testbed_id, cross_factory_id, 
-                        cross_connector_type_name)
+                        cross_connector_type_name,
+                        True)
                 else:
                     connect_code = connector_type.connect_to_compl_code(
                         cross_testbed_id, cross_factory_id, 
-                        cross_connector_type_name)
+                        cross_connector_type_name,
+                        True)
                 if connect_code:
                     elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
                     connect_code(self, guid, elem_cross_data)       
@@ -403,11 +409,23 @@ class TestbedController(execute.TestbedController):
         return factory.box_attributes.attributes_list
 
     def start(self, time = TIME_NOW):
+        # Plan everything
+        #  - group by factory_id
+        #  - enqueue task callables
+        plan = collections.defaultdict(list)
+        
         for guid, factory_id in self._create.iteritems():
             factory = self._factories[factory_id]
             start_function = factory.start_function
             if start_function:
-                start_function(self, guid)
+                plan[factory_id].append((start_function, guid))
+
+        # Execute plan, following the factory_id order
+        for factory_id in self._metadata.start_order:
+            if factory_id in plan:
+                for start_function, guid in plan[factory_id]:
+                    start_function(self, guid)
+        
         self._status = TESTBED_STATUS_STARTED
 
     #action: NotImplementedError
index 127b8a8..1999720 100644 (file)
@@ -89,6 +89,9 @@ class _CrossIface(object):
         self.tun_proto = proto
         self.tun_addr = addr
         self.tun_port = port
+        
+        # Cannot access cross peers
+        self.peer_proto_impl = None
 
 class TunIface(object):
     _PROTO_MAP = tunproto.TUN_PROTO_MAP
@@ -127,6 +130,8 @@ class TunIface(object):
         # These get initialized when the iface is connected to its peer
         self.peer_iface = None
         self.peer_proto = None
+        self.peer_addr = None
+        self.peer_port = None
         self.peer_proto_impl = None
 
         # same as peer proto, but for execute-time standard attribute lookups
index 8b765e6..09b741a 100644 (file)
@@ -110,7 +110,7 @@ def crossconnect_tun_iface_peer_init(proto, testbed_instance, iface_guid, peer_i
     iface = testbed_instance._elements[iface_guid]
     iface.peer_iface = None
     iface.peer_addr = peer_iface_data.get("tun_addr")
-    iface.peer_proto = peer_iface_data.get("tun_proto")
+    iface.peer_proto = peer_iface_data.get("tun_proto") or proto
     iface.peer_port = peer_iface_data.get("tun_port")
     iface.tun_key = min(iface.tun_key, peer_iface_data.get("tun_key"))
     iface.tun_proto = proto
@@ -121,7 +121,7 @@ def crossconnect_tun_iface_peer_compl(proto, testbed_instance, iface_guid, peer_
     # refresh (refreshable) attributes for second-phase
     iface = testbed_instance._elements[iface_guid]
     iface.peer_addr = peer_iface_data.get("tun_addr")
-    iface.peer_proto = peer_iface_data.get("tun_proto")
+    iface.peer_proto = peer_iface_data.get("tun_proto") or proto
     iface.peer_port = peer_iface_data.get("tun_port")
     
     postconfigure_tuniface(testbed_instance, iface_guid)
index 5916f10..8dff5a2 100644 (file)
@@ -252,6 +252,16 @@ def pl_vif_start(tun_path, tun_name):
     if out.strip():
         print >>sys.stderr, out
 
+def pl_vif_stop(tun_path, tun_name):
+    stdin = open("/vsys/vif_down.in","w")
+    stdout = open("/vsys/vif_down.out","r")
+    stdin.write(tun_name+"\n")
+    stdin.close()
+    out = stdout.read()
+    stdout.close()
+    if out.strip():
+        print >>sys.stderr, out
+
 
 def ipfmt(ip):
     ipbytes = map(ord,ip.decode("hex"))
@@ -401,9 +411,8 @@ def decrypt(packet, crypter):
     
     return packet
 
-abortme = False
 def tun_fwd(tun, remote):
-    global abortme
+    global TERMINATE
     
     # in PL mode, we cannot strip PI structs
     # so we'll have to handle them
@@ -436,7 +445,7 @@ def tun_fwd(tun, remote):
     # Which is needed, since /dev/net/tun is unbuffered
     fwbuf = ""
     bkbuf = ""
-    while not abortme:
+    while not TERMINATE:
         wset = []
         if packetReady(bkbuf, ether_mode):
             wset.append(tun)
@@ -510,12 +519,12 @@ MODEINFO = {
                   tunopen=tunopen, tunclose=tunclose,
                   dealloc=nop,
                   start=pl_vif_start,
-                  stop=nop),
+                  stop=pl_vif_stop),
     'pl-tap'  : dict(alloc=functools.partial(pl_tuntap_alloc, "tap"),
                   tunopen=tunopen, tunclose=tunclose,
                   dealloc=nop,
                   start=pl_vif_start,
-                  stop=nop),
+                  stop=pl_vif_stop),
 }
     
 tun_path = options.tun_path
@@ -537,6 +546,13 @@ except:
     raise
 
 
+# Trak SIGTERM, and set global termination flag instead of dying
+TERMINATE = False
+def _finalize(sig,frame):
+    global TERMINATE
+    TERMINATE = True
+signal.signal(signal.SIGTERM, _finalize)
+
 try:
     tcpdump = None
     
@@ -552,12 +568,15 @@ try:
         import passfd
         
         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
-        try:
-            sock.connect(options.pass_fd)
-        except socket.error:
-            # wait a while, retry
-            print >>sys.stderr, "Could not connect. Retrying in a sec..."
-            time.sleep(1)
+        for i in xrange(30):
+            try:
+                sock.connect(options.pass_fd)
+                break
+            except socket.error:
+                # wait a while, retry
+                print >>sys.stderr, "Could not connect. Retrying in a sec..."
+                time.sleep(1)
+        else:
             sock.connect(options.pass_fd)
         passfd.sendfd(sock, tun.fileno(), '0')
         
@@ -567,18 +586,9 @@ try:
         tcpdump = subprocess.Popen(
             ["tcpdump","-l","-n","-i",tun_name])
         
-        def _finalize(sig,frame):
-            os.kill(tcpdump.pid, signal.SIGTERM)
-            tcpdump.wait()
-            if callable(_oldterm):
-                _oldterm(sig,frame)
-            else:
-                sys.exit(0)
-        _oldterm = signal.signal(signal.SIGTERM, _finalize)
-            
         # just wait forever
         def tun_fwd(tun, remote):
-            while True:
+            while not TERMINATE:
                 time.sleep(1)
         remote = None
     elif options.udp:
@@ -598,7 +608,16 @@ try:
         if remaining_args and not remaining_args[0].startswith('-'):
             print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
             rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-            rsock.connect((remaining_args[0],options.port))
+            for i in xrange(30):
+                try:
+                    rsock.connect((remaining_args[0],options.port))
+                    break
+                except socket.error:
+                    # wait a while, retry
+                    print >>sys.stderr, "Could not connect. Retrying in a sec..."
+                    time.sleep(1)
+            else:
+                rsock.connect((remaining_args[0],options.port))
         else:
             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
             lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
@@ -613,7 +632,7 @@ try:
 
     if tcpdump:
         os.kill(tcpdump.pid, signal.SIGTERM)
-        proc.wait()
+        tcpdump.wait()
 finally:
     try:
         print >>sys.stderr, "Shutting down..."
@@ -623,18 +642,19 @@ finally:
     
     # tidy shutdown in every case - swallow exceptions
     try:
-        modeinfo['tunclose'](tun_path, tun_name, tun)
+        modeinfo['stop'](tun_path, tun_name)
     except:
         pass
-        
+
     try:
-        modeinfo['stop'](tun_path, tun_name)
+        modeinfo['tunclose'](tun_path, tun_name, tun)
     except:
         pass
-
+        
     try:
         modeinfo['dealloc'](tun_path, tun_name)
     except:
         pass
-
+    
+    print >>sys.stderr, "TERMINATED GRACEFULLY"
 
index af724f6..6ef7141 100644 (file)
@@ -8,6 +8,7 @@ import rspawn
 import subprocess
 import threading
 import base64
+import time
 
 from nepi.util import server
 
@@ -192,24 +193,57 @@ class TunProtoBase(object):
         
         if proc.wait():
             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
-
+        
         self._started = True
     
+    def _launch_and_wait(self, *p, **kw):
+        local = self.local()
+        
+        self.launch(*p, **kw)
+        
+        # Wait for the process to be started
+        while self.status() == rspawn.NOT_STARTED:
+            time.sleep(1.0)
+        
+        # Wait for the connection to be established
+        if local.capture:
+            for spin in xrange(30):
+                if self.status() != rspawn.RUNNING:
+                    break
+                
+                (out,err),proc = server.popen_ssh_command(
+                    "cd %(home)s ; grep -c Connected capture" % dict(
+                        home = server.shell_escape(self.home_path)),
+                    host = local.node.hostname,
+                    port = None,
+                    user = local.node.slicename,
+                    agent = None,
+                    ident_key = local.node.ident_path,
+                    server_key = local.node.server_key
+                    )
+                
+                if proc.wait():
+                    break
+                
+                if out.strip() != '0':
+                    break
+                
+                time.sleep(1.0)
+    
     def async_launch(self, check_proto, listen, extra_args=[]):
         if not self._launcher:
             self._launcher = threading.Thread(
-                target = self.launch,
+                target = self._launch_and_wait,
                 args = (check_proto, listen, extra_args))
             self._launcher.start()
     
     def async_launch_wait(self):
-        if not self._started:
-            if self._launcher:
-                self._launcher.join()
-                if not self._started:
-                    raise RuntimeError, "Failed to launch TUN forwarder"
-            else:
-                self.launch()
+        if self._launcher:
+            self._launcher.join()
+            if not self._started:
+                raise RuntimeError, "Failed to launch TUN forwarder"
+        elif not self._started:
+            self.launch()
 
     def checkpid(self):            
         local = self.local()
index 0cb454c..260a006 100755 (executable)
@@ -14,6 +14,14 @@ import test_util
 import sys
 
 class PlanetLabExecuteTestCase(unittest.TestCase):
+    testbed_id = "planetlab"
+    testbed_version = "01"
+    slicename = "inria_nepi"
+    plchost = "nepiplc.pl.sophia.inria.fr"
+    
+    host1 = "nepi1.pl.sophia.inria.fr"
+    host2 = "nepi2.pl.sophia.inria.fr"
+
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
         
@@ -26,9 +34,12 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
             shutil.rmtree(self.root_dir)
 
     def make_instance(self):
-        testbed_version = "01"
+        testbed_id = self.testbed_id
+        testbed_version = self.testbed_version
+        slicename = self.slicename
+        plchost = self.plchost
+        
         instance = planetlab.TestbedController(testbed_version)
-        slicename = "inria_nepi12"
         pl_ssh_key = os.environ.get(
             "PL_SSH_KEY",
             "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
@@ -39,6 +50,7 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
         instance.defer_configure("sliceSSHKey", pl_ssh_key)
         instance.defer_configure("authUser", pl_user)
         instance.defer_configure("authPass", pl_pwd)
+        instance.defer_configure("plcHost", plchost)
         
         return instance
 
@@ -47,9 +59,9 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create(3, "Node")
-        instance.defer_create_set(3, "hostname", "onelab10.pl.sophia.inria.fr")
+        instance.defer_create_set(3, "hostname", self.host2)
         instance.defer_create(4, "NodeInterface")
         instance.defer_connect(2, "devs", 4, "node")
         instance.defer_create(5, "NodeInterface")
@@ -102,7 +114,7 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create(3, "NodeInterface")
         instance.defer_connect(2, "devs", 3, "node")
         instance.defer_create(4, "Internet")
@@ -140,7 +152,7 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create(3, "NodeInterface")
         instance.defer_connect(2, "devs", 3, "node")
         instance.defer_create(4, "Internet")
@@ -195,7 +207,7 @@ FIONREAD = 0x[0-9a-fA-F]{8}.*
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create_set(2, "emulation", True) # require emulation
         instance.defer_create(3, "NodeInterface")
         instance.defer_connect(2, "devs", 3, "node")
@@ -243,7 +255,7 @@ echo 'OKIDOKI'
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create_set(2, "emulation", True) # require emulation
         instance.defer_create(3, "NodeInterface")
         instance.defer_connect(2, "devs", 3, "node")
@@ -300,7 +312,7 @@ echo 'OKIDOKI'
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create(3, "NodeInterface")
         instance.defer_connect(2, "devs", 3, "node")
         instance.defer_create(4, "Internet")
@@ -330,10 +342,10 @@ echo 'OKIDOKI'
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create_set(2, "emulation", True) # require emulation
         instance.defer_create(3, "Node")
-        instance.defer_create_set(3, "hostname", "onelab10.pl.sophia.inria.fr")
+        instance.defer_create_set(3, "hostname", self.host2)
         instance.defer_create_set(3, "emulation", True) # require emulation
         instance.defer_create(4, "NodeInterface")
         instance.defer_connect(2, "devs", 4, "node")
@@ -417,7 +429,7 @@ echo 'OKIDOKI'
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create(3, "NodeInterface")
         instance.defer_connect(2, "devs", 3, "node")
         instance.defer_create(4, "Internet")
@@ -456,7 +468,7 @@ echo 'OKIDOKI'
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
-        instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+        instance.defer_create_set(2, "hostname", self.host1)
         instance.defer_create(3, "NodeInterface")
         instance.defer_connect(2, "devs", 3, "node")
         instance.defer_create(4, "Internet")
index 0edcb1e..8b143d7 100755 (executable)
@@ -14,6 +14,14 @@ import unittest
 import re
 
 class PlanetLabIntegrationTestCase(unittest.TestCase):
+    testbed_id = "planetlab"
+    testbed_version = "01"
+    slicename = "inria_nepi"
+    plchost = "nepiplc.pl.sophia.inria.fr"
+    
+    host1 = "nepi1.pl.sophia.inria.fr"
+    host2 = "nepi2.pl.sophia.inria.fr"
+
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
 
@@ -26,9 +34,10 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
             shutil.rmtree(self.root_dir)
 
     def make_experiment_desc(self):
-        testbed_id = "planetlab"
-        testbed_version = "01"
-        slicename = "inria_nepi12"
+        testbed_id = self.testbed_id
+        testbed_version = self.testbed_version
+        slicename = self.slicename
+        plchost = self.plchost
         pl_ssh_key = os.environ.get(
             "PL_SSH_KEY",
             "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
@@ -42,6 +51,7 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
         pl_desc.set_attribute_value("sliceSSHKey", pl_ssh_key)
         pl_desc.set_attribute_value("authUser", pl_user)
         pl_desc.set_attribute_value("authPass", pl_pwd)
+        pl_desc.set_attribute_value("plcHost", plchost)
         
         return pl_desc, exp_desc
 
@@ -51,8 +61,8 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
         
         node1 = pl.create("Node")
         node2 = pl.create("Node")
-        node1.set_attribute_value("hostname", "onelab11.pl.sophia.inria.fr")
-        node2.set_attribute_value("hostname", "onelab10.pl.sophia.inria.fr")
+        node1.set_attribute_value("hostname", self.host1)
+        node2.set_attribute_value("hostname", self.host2)
         iface1 = pl.create("NodeInterface")
         iface2 = pl.create("NodeInterface")
         iface2.set_attribute_value("label", "node2iface")
diff --git a/test/testbeds/planetlab/integration_multi.py b/test/testbeds/planetlab/integration_multi.py
new file mode 100755 (executable)
index 0000000..9274c46
--- /dev/null
@@ -0,0 +1,165 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import getpass
+from nepi.core.design import ExperimentDescription, FactoriesProvider
+from nepi.core.execute import ExperimentController
+from nepi.util import proxy
+from nepi.util.constants import DeploymentConfiguration as DC, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP
+import os
+import shutil
+import tempfile
+import test_util
+import time
+import unittest
+import re
+
+class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
+    testbed_id = "planetlab"
+    testbed_version = "01"
+    
+    slicename1 = "inria_nepi"
+    plchost1 = "nepiplc.pl.sophia.inria.fr"
+
+    slicename2 = "inria_nepi12"
+    plchost2 = "www.planet-lab.eu"
+    
+    host1pl1 = "nepi1.pl.sophia.inria.fr"
+    host2pl1 = "nepi2.pl.sophia.inria.fr"
+
+    host1pl2 = "onelab11.pl.sophia.inria.fr"
+    host2pl2 = "onelab10.pl.sophia.inria.fr"
+
+    def setUp(self):
+        self.root_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        try:
+            shutil.rmtree(self.root_dir)
+        except:
+            # retry
+            time.sleep(0.1)
+            shutil.rmtree(self.root_dir)
+
+    def make_experiment_desc(self):
+        testbed_id = self.testbed_id
+        testbed_version = self.testbed_version
+        
+        slicename1 = self.slicename1
+        plchost1 = self.plchost1
+        
+        slicename2 = self.slicename2
+        plchost2 = self.plchost2
+        
+        pl_ssh_key = os.environ.get(
+            "PL_SSH_KEY",
+            "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
+        pl_user, pl_pwd = test_util.pl_auth()
+
+        exp_desc = ExperimentDescription()
+        pl_provider = FactoriesProvider(testbed_id, testbed_version)
+        pl_desc = exp_desc.add_testbed_description(pl_provider)
+        pl_desc.set_attribute_value("homeDirectory", self.root_dir)
+        pl_desc.set_attribute_value("slice", slicename1)
+        pl_desc.set_attribute_value("sliceSSHKey", pl_ssh_key)
+        pl_desc.set_attribute_value("authUser", pl_user)
+        pl_desc.set_attribute_value("authPass", pl_pwd)
+        pl_desc.set_attribute_value("plcHost", plchost1)
+
+        pl_desc2 = exp_desc.add_testbed_description(pl_provider)
+        pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2")
+        pl_desc2.set_attribute_value("slice", slicename2)
+        pl_desc2.set_attribute_value("sliceSSHKey", pl_ssh_key)
+        pl_desc2.set_attribute_value("authUser", pl_user)
+        pl_desc2.set_attribute_value("authPass", pl_pwd)
+        pl_desc2.set_attribute_value("plcHost", plchost2)
+        
+        return pl_desc, pl_desc2, exp_desc
+    
+    def make_pl_tapnode(self, pl, tapip, hostname, label_prefix):
+        node1 = pl.create("Node")
+        node1.set_attribute_value("hostname", hostname)
+        node1.set_attribute_value("label", label_prefix)
+        node1.set_attribute_value("emulation", True) # require emulation
+        iface1 = pl.create("NodeInterface")
+        iface1.set_attribute_value("label", label_prefix+"iface")
+        tap1 = pl.create("TapInterface")
+        tap1.enable_trace("packets") # for error output
+        tap1.set_attribute_value("label", label_prefix+"tap")
+        inet = pl.create("Internet")
+        node1.connector("devs").connect(iface1.connector("node"))
+        node1.connector("devs").connect(tap1.connector("node"))
+        iface1.connector("inet").connect(inet.connector("devs"))
+        
+        tap1ip = tap1.add_address()
+        tap1ip.set_attribute_value("Address", tapip)
+        tap1ip.set_attribute_value("NetPrefix", 24)
+        tap1ip.set_attribute_value("Broadcast", False)
+        
+        return node1, iface1, tap1, tap1ip, inet
+    
+    def _test_plpl_crossconnect(self, proto):
+        pl, pl2, exp = self.make_experiment_desc()
+        
+        # Create PL node, ifaces, assign addresses
+        node1, iface1, tap1, tap1ip, inet1 = self.make_pl_tapnode(pl, 
+            "192.168.2.2", self.host1pl1, "node1")
+        node2, iface2, tap2, tap2ip, inet2 = self.make_pl_tapnode(pl2, 
+            "192.168.2.3", self.host1pl2, "node2")
+            
+        # Connect the two
+        tap1.connector(proto).connect(tap2.connector(proto))
+        
+        # Create PlanetLab ping application, pinging the from one PL to another
+        ping = pl.create("Application")
+        ping.set_attribute_value("command", "ping -qc10 {#[node2tap].addr[0].[Address]#}")
+        ping.enable_trace("stdout")
+        ping.enable_trace("stderr")
+        ping.connector("node").connect(node1.connector("apps"))
+
+        comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
+
+--- .* ping statistics ---
+10 packets transmitted, 10 received, 0% packet loss, time \d*ms.*
+"""
+
+        xml = exp.to_xml()
+
+        controller = ExperimentController(xml, self.root_dir)
+        controller.start()
+
+        while not controller.is_finished(ping.guid):
+            time.sleep(0.5)
+          
+        ping_result = controller.trace(pl.guid, ping.guid, "stdout")
+        tap_trace = controller.trace(pl.guid, tap1.guid, "packets")
+        tap2_trace = controller.trace(pl2.guid, tap2.guid, "packets")
+
+        controller.stop()
+        controller.shutdown()
+
+        # asserts at the end, to make sure there's proper cleanup
+        self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+            "Unexpected trace:\n%s\nTap trace at origin:\n%s\nTap trace at destination:\n%s\n" % (
+                ping_result,
+                tap_trace,
+                tap2_trace) )
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, 
+        "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
+        "Test is expensive, requires NEPI_FULL_TESTS=yes")
+    def test_plpl_crossconnect_udp(self):
+        self._test_plpl_crossconnect("udp")
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, 
+        "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
+        "Test is expensive, requires NEPI_FULL_TESTS=yes")
+    def test_plpl_crossconnect_tcp(self):
+        self._test_plpl_crossconnect("tcp")
+
+
+if __name__ == '__main__':
+    unittest.main()
+
index 71f899d..6a02847 100755 (executable)
@@ -15,6 +15,14 @@ import unittest
 import re
 
 class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
+    testbed_id = "planetlab"
+    testbed_version = "01"
+    slicename = "inria_nepi"
+    plchost = "nepiplc.pl.sophia.inria.fr"
+    
+    host1 = "nepi1.pl.sophia.inria.fr"
+    host2 = "nepi2.pl.sophia.inria.fr"
+
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
 
@@ -27,9 +35,10 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
             shutil.rmtree(self.root_dir)
 
     def make_experiment_desc(self):
-        testbed_id = "planetlab"
-        testbed_version = "01"
-        slicename = "inria_nepi12"
+        testbed_id = self.testbed_id
+        testbed_version = self.testbed_version
+        slicename = self.slicename
+        plchost = self.plchost
         pl_ssh_key = os.environ.get(
             "PL_SSH_KEY",
             "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) )
@@ -43,12 +52,13 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         pl_desc.set_attribute_value("sliceSSHKey", pl_ssh_key)
         pl_desc.set_attribute_value("authUser", pl_user)
         pl_desc.set_attribute_value("authPass", pl_pwd)
+        pl_desc.set_attribute_value("plcHost", plchost)
         
         return pl_desc, exp_desc
     
     def make_pl_tapnode(self, pl):
         node1 = pl.create("Node")
-        node1.set_attribute_value("hostname", "onelab11.pl.sophia.inria.fr")
+        node1.set_attribute_value("hostname", self.host1)
         node1.set_attribute_value("label", "node1")
         node1.set_attribute_value("emulation", True) # require emulation
         iface1 = pl.create("NodeInterface")
@@ -112,7 +122,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         pl, exp = self.make_experiment_desc()
         
         node1 = pl.create("Node")
-        node1.set_attribute_value("hostname", "onelab11.pl.sophia.inria.fr")
+        node1.set_attribute_value("hostname", self.host1)
         node1.set_attribute_value("label", "node1")
         iface1 = pl.create("NodeInterface")
         iface1.set_attribute_value("label", "node1iface")
@@ -213,13 +223,13 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         tap1.set_attribute_value("snat", True)
         
         # Add second PL node (ping target)
-        node2 = pl.create("Node")
-        node2.set_attribute_value("hostname", "onelab10.pl.sophia.inria.fr")
-        node2.set_attribute_value("label", "node2")
-        iface2 = pl.create("NodeInterface")
-        iface2.set_attribute_value("label", "node2iface")
-        node2.connector("devs").connect(iface2.connector("node"))
-        iface2.connector("inet").connect(inet.connector("devs"))
+        #node2 = pl.create("Node")
+        #node2.set_attribute_value("hostname", self.host2)
+        #node2.set_attribute_value("label", "node2")
+        #iface2 = pl.create("NodeInterface")
+        #iface2.set_attribute_value("label", "node2iface")
+        #node2.connector("devs").connect(iface2.connector("node"))
+        #iface2.connector("inet").connect(inet.connector("devs"))
         
         # Create NS3 node that is responsive to pings, connected
         # to node1 through the Tap interface
@@ -248,7 +258,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
 
         # Create NS3 ping application, pinging the PL node
         ping = ns3_desc.create("ns3::V4Ping")
-        ping.set_attribute_value("Remote", "{#[node2iface].addr[0].[Address]#}")
+        ping.set_attribute_value("Remote", "209.85.146.147") #"{#[node2iface].addr[0].[Address]#}")
         ping.set_attribute_value("StartTime", "0s")
         ping.set_attribute_value("StopTime", "10s")
         ping.connector("node").connect(ns1.connector("apps"))