More changes to make cross connections work... not working still
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 28 Apr 2011 20:58:27 +0000 (22:58 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 28 Apr 2011 20:58:27 +0000 (22:58 +0200)
src/nepi/core/execute.py
src/nepi/core/testbed_impl.py
src/nepi/testbeds/netns/metadata_v01.py
src/nepi/testbeds/ns3/attributes_metadata_v3_9_RC3.py
src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py
src/nepi/testbeds/ns3/metadata_v3_9_RC3.py
src/nepi/util/proxy.py

index cbc8f18..7ac77a4 100644 (file)
@@ -164,6 +164,14 @@ class TestbedController(object):
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
 
+    @property
+    def testbed_id(self):
+        return self._testbed_id
+
+    @property
+    def testbed_version(self):
+        return self._testbed_version
+
     @property
     def guids(self):
         raise NotImplementedError
@@ -350,6 +358,12 @@ class ExperimentController(object):
         self._parallel([lambda : testbed.do_create()
                         for testbed in self._testbeds.itervalues()])
 
+        # TODO! DEBUG!!!!
+        # ONLY THE LAST TESTBED HAS ELEMENTS CREATED!!!
+        #for testbed in self._testbeds.itervalues():
+        #    print testbed._testbed_id
+        #    print testbed._elements
+
         self._parallel([lambda : testbed.do_connect_init()
                         for testbed in self._testbeds.itervalues()])
 
@@ -605,6 +619,8 @@ class ExperimentController(object):
                     testbed.defer_connect(guid, connector_type_name, 
                             cross_guid, cross_connector_type_name)
                 else: 
+                    cross_testbed = self._testbeds[cross_testbed_guid]
+                    cross_testbed_id = cross_testbed.testbed_id
                     testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
                             cross_testbed_id, cross_factory_id, 
                             cross_connector_type_name)
@@ -622,7 +638,7 @@ class ExperimentController(object):
                 testbed.defer_add_route(guid, destination, netprefix, nexthop)
                 
     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
-        if testbed_guid not in self._crossdata:
+        if testbed_guid not in self._cross_data:
             self._cross_data[testbed_guid] = dict()
         if cross_testbed_guid not in self._cross_data[testbed_guid]:
             self._cross_data[testbed_guid][cross_testbed_guid] = list()
@@ -637,16 +653,17 @@ class ExperimentController(object):
         cross_data = dict()
         if not testbed_guid in self._cross_data:
             return cross_data
-        for cross_testbed_guid, guid_list in self._cross_data[testbed_guid]:
+        for cross_testbed_guid, guid_list in \
+                self._cross_data[testbed_guid].iteritems():
             cross_data[cross_testbed_guid] = dict()
             cross_testbed = self._testbeds[cross_testbed_guid]
             for cross_guid in guid_list:
-                cross_data_guid = dict()
-                cross_data[cross_testbed_guid][cross_guid] = cross_data_guid
+                elem_cross_data = dict()
+                cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
                 attributes_list = cross_testbed.get_attribute_list(cross_guid)
                 for attr_name in attributes_list:
                     attr_value = cross_testbed.get(TIME_NOW, cross_guid, 
                             attr_name)
-                    cross_data_guid[attr_name] = attr_value
-        return cross_data
+                    elem_cross_data[attr_name] = attr_value
+        return elem_cross_data
     
index 78cbb1f..e0608bc 100644 (file)
@@ -121,7 +121,7 @@ class TestbedController(execute.TestbedController):
         connector_type = factory.connector_type(connector_type_name)
         connector_type.can_connect(cross_testbed_id, cross_factory_id, 
                 cross_connector_type_name, count, must_cross = True)
-        if not guid in self._connect:
+        if not guid in self._cross_connect:
             self._cross_connect[guid] = dict()
         if not connector_type_name in self._cross_connect[guid]:
              self._cross_connect[guid][connector_type_name] = dict()
@@ -277,8 +277,8 @@ class TestbedController(execute.TestbedController):
                         cross_testbed_id, cross_factory_id, 
                         cross_conector_type_name)
                 if connect_code:
-                    cross_data_guid = cross_data[cross_testbed_id][cross_guid]
-                    connect_code(element, cross_guid, cross_data_guid)       
+                    elem_data_guid = cross_data[cross_testbed_id][cross_guid]
+                    connect_code(self, element, elem_cross_data)       
 
     def do_cross_connect_init(self, cross_data):
         self._do_cross_connect(cross_data)
index 472935c..526158a 100644 (file)
@@ -23,13 +23,11 @@ FDNETDEV = "ns3::FileDescriptorNetDevice"
 def connect_switch(testbed_instance, switch, interface):
     switch.connect(interface)
    
-#XXX: This connection function cannot be use to transfer a file descriptor
-# to a remote tap device
-def connect_fd_local(testbed_instance, tap, fdnd):
+def connect_fd(testbed_instance, tap, cross_data):
     import passfd
     import socket
     fd = tap.file_descriptor
-    address = fdnd.socket_address
+    address = cross_data["LinuxSocketAddress"]
     sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
     sock.connect(address)
     passfd.sendfd(sock, fd, '0')
@@ -220,7 +218,7 @@ connections = [
     dict({
         "from": (TESTBED_ID, TAPIFACE, "fd"),
         "to":   (NS3_TESTBED_ID, FDNETDEV, "fd"),
-        "compl_code": connect_fd_local,
+        "compl_code": connect_fd,
         "can_cross": True
     }),
      dict({
index 819d334..b469236 100644 (file)
@@ -2393,4 +2393,12 @@ attributes = dict({
         "allowed": wifi_standards.keys(),
         "help": "Wifi PHY standard"
     }),
+    "LinuxSocketAddress": dict({
+        "name": "LinuxSocketAddress",
+        "value": "",
+        "validation_function": None,
+        "flags": Attribute.Invisible,
+        "type": Attribute.STRING,
+        "help": "Socket address assigned to the Linux socket created to recive file descriptor"
+    }),
 })
index 6bb651a..1f4f34f 100644 (file)
@@ -629,7 +629,8 @@ factories_info = dict({
         "help": "Network interface associated to a file descriptor",
         "connector_types": ["node", "fd"],
         "allow_addresses": True,
-        "box_attributes": ["Address"],
+        "box_attributes": ["Address", 
+            "LinuxSocketAddress"],
         "traces": ["fdpcap"]
     }),
      "ns3::CsmaNetDevice": dict({
index 3a3891c..59b7413 100644 (file)
@@ -61,6 +61,10 @@ def connect_node_application(testbed_instance, node, application):
 def connect_node_other(tesbed_instance, node, other):
     node.AggregateObject(other)
 
+def connect_fd(testbed_instance, fdnd, cross_data):
+    address = fdnd.socket_address
+    fdnd.set_attribute_value("LinuxSocketAddress", address)
+
 ### Connector information ###
 
 connector_types = dict({
@@ -448,7 +452,8 @@ connections = [
     dict({
         "from": ( "ns3", "ns3::FileDescriptorNetDevice", "fd" ),
         "to":   ( "netns", "TapNodeInterface", "fd" ),
-        "can_cross": False
+        "init_code": connect_fd,
+        "can_cross": True
     }),
 ]
 
index 0fdc47a..40f06d4 100644 (file)
@@ -46,10 +46,12 @@ GUIDS  = 27
 GET_ROUTE = 28
 GET_ADDRESS = 29
 RECOVER = 30
-DO_PRECONFIGURE    = 31
-GET_ATTRIBUTE_LIST = 32
-DO_CONNECT_COMPL   = 33
+DO_PRECONFIGURE     = 31
+GET_ATTRIBUTE_LIST  = 32
+DO_CONNECT_COMPL    = 33
 DO_CROSS_CONNECT_COMPL  = 34
+TESTBED_ID  = 35
+TESTBED_VERSION  = 36
 
 # PARAMETER TYPE
 STRING  =  100
@@ -100,7 +102,9 @@ testbed_messages = dict({
     STATUS: "%d|%s" % (STATUS, "%d"),
     GUIDS:  "%d" % GUIDS,
     GET_ATTRIBUTE_LIST:  "%d" % GET_ATTRIBUTE_LIST,
-    })
+    TESTBED_ID:  "%d" % TESTBED_ID,
+    TESTBED_VERSION:  "%d" % TESTBED_VERSION,
+   })
 
 instruction_text = dict({
     OK:     "OK",
@@ -141,7 +145,9 @@ instruction_text = dict({
     STRING: "STRING",
     INTEGER:    "INTEGER",
     BOOL:   "BOOL",
-    FLOAT:  "FLOAT"
+    FLOAT:  "FLOAT",
+    TESTBED_ID: "TESTBED_ID",
+    TESTBED_VERSION: "TESTBED_VERSION",
     })
 
 def get_type(value):
@@ -402,6 +408,10 @@ class TestbedControllerServer(server.Server):
                     reply = self.guids(params)
                 elif instruction == GET_ATTRIBUTE_LIST:
                     reply = self.get_attribute_list(params)
+                elif instruction == TESTBED_ID:
+                    reply = self.testbed_id(params)
+                elif instruction == TESTBED_VERSION:
+                    reply = self.testbed_version(params)
                 else:
                     error = "Invalid instruction %s" % instruction
                     self.log_error(error)
@@ -416,8 +426,18 @@ class TestbedControllerServer(server.Server):
 
     def guids(self, params):
         guids = self._testbed.guids
-        guids = ",".join(map(str, guids))
-        result = base64.b64encode(guids)
+        value = cPickle.dumps(guids)
+        result = base64.b64encode(value)
+        return "%d|%s" % (OK, result)
+
+    def testbed_id(self, params):
+        testbed_id = self._testbed.testbed_id
+        result = base64.b64encode(str(testbed_id))
+        return "%d|%s" % (OK, result)
+
+    def testbed_version(self, params):
+        testbed_version = self._testbed.testbed_version
+        result = base64.b64encode(str(testbed_version))
         return "%d|%s" % (OK, result)
 
     def defer_create(self, params):
@@ -754,7 +774,32 @@ class TestbedControllerProxy(object):
         text = base64.b64decode(result[1])
         if code == ERROR:
             raise RuntimeError(text)
-        return map(int, text.split(","))
+        guids = cPickle.loads(text)
+        return guids
+
+    @property
+    def testbed_id(self):
+        msg = testbed_messages[TESTBED_ID]
+        self._client.send_msg(msg)
+        reply = self._client.read_reply()
+        result = reply.split("|")
+        code = int(result[0])
+        text = base64.b64decode(result[1])
+        if code == ERROR:
+            raise RuntimeError(text)
+        return int(text)
+
+    @property
+    def testbed_version(self):
+        msg = testbed_messages[TESTBED_VERSION]
+        self._client.send_msg(msg)
+        reply = self._client.read_reply()
+        result = reply.split("|")
+        code = int(result[0])
+        text = base64.b64decode(result[1])
+        if code == ERROR:
+            raise RuntimeError(text)
+        return int(text)
 
     def defer_configure(self, name, value):
         msg = testbed_messages[CONFIGURE]