Added routes to OMF nodes
[nepi.git] / src / nepi / core / execute.py
index 1c51d58..5e0c5ea 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
 from nepi.core.attributes import Attribute, AttributesMap
@@ -84,7 +83,8 @@ class TestbedController(object):
         """Instructs the addition of an address"""
         raise NotImplementedError
 
-    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
+    def defer_add_route(self, guid, destination, netprefix, nexthop, 
+            metric = 0, device = None):
         """Instructs the addition of a route"""
         raise NotImplementedError
 
@@ -241,6 +241,7 @@ class ExperimentController(object):
         self._failed_testbeds = set()
         self._started_time = None
         self._stopped_time = None
+        self._testbed_order = []
       
         self._logger = logging.getLogger('nepi.core.execute')
         level = logging.ERROR
@@ -323,13 +324,17 @@ class ExperimentController(object):
     def _parallel(callables):
         excs = []
         def wrap(callable):
-            @functools.wraps(callable)
             def wrapped(*p, **kw):
                 try:
                     callable(*p, **kw)
                 except:
                     logging.exception("Exception occurred in asynchronous thread:")
                     excs.append(sys.exc_info())
+            try:
+                wrapped = functools.wraps(callable)(wrapped)
+            except:
+                # functools.partial not wrappable
+                pass
             return wrapped
         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
         for thread in threads:
@@ -403,6 +408,9 @@ class ExperimentController(object):
                             for guid,testbed in self._testbeds.iteritems()
                             if guid in allowed_guids])
             self._clear_caches()
+            
+            # Store testbed order
+            self._testbed_order.append(allowed_guids)
 
         steps_to_configure(self, to_restart)
 
@@ -693,11 +701,30 @@ class ExperimentController(object):
 
     def shutdown(self):
         exceptions = list()
-        for testbed in self._testbeds.values():
+        ordered_testbeds = set()
+
+        def shutdown_testbed(guid):
             try:
+                testbed = self._testbeds[guid]
+                ordered_testbeds.add(guid)
                 testbed.shutdown()
             except:
                 exceptions.append(sys.exc_info())
+                
+        self._logger.debug("ExperimentController: Starting parallel shutdown")
+        
+        for testbed_guids in reversed(self._testbed_order):
+            testbed_guids = set(testbed_guids) - ordered_testbeds
+            self._logger.debug("ExperimentController: Shutting down %r", testbed_guids)
+            self._parallel([functools.partial(shutdown_testbed, guid)
+                            for guid in testbed_guids])
+        remaining_guids = set(self._testbeds) - ordered_testbeds
+        if remaining_guids:
+            self._logger.debug("ExperimentController: Shutted down %r", ordered_testbeds)
+            self._logger.debug("ExperimentController: Shutting down %r", remaining_guids)
+            self._parallel([functools.partial(shutdown_testbed, guid)
+                            for guid in remaining_guids])
+            
         for exc_info in exceptions:
             raise exc_info[0], exc_info[1], exc_info[2]
 
@@ -991,8 +1018,10 @@ class ExperimentController(object):
                         testbed.defer_add_address(guid, address, netprefix, 
                                 broadcast)
                 # routes
-                for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
-                    testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
+                for (destination, netprefix, nexthop, metric, device) in \
+                        data.get_route_data(guid):
+                    testbed.defer_add_route(guid, destination, netprefix, nexthop, 
+                            metric, device)
                 # store connections data
                 for (connector_type_name, other_guid, other_connector_type_name) \
                         in data.get_connection_data(guid):
@@ -1074,20 +1103,33 @@ class ExperimentController(object):
                     elem_cross_data[attr_name] = _undefer(attr_value)
         
         return cross_data
-    """
+
 class ExperimentSuite(object):
-    def __init__(self, experiment_xml, access_config, repetitions,
-            duration, wait_guids):
+    def __init__(self, experiment_xml, access_config, repetitions = None,
+            duration = None, wait_guids = None):
         self._experiment_xml = experiment_xml
         self._access_config = access_config
-        self._experiments = dict()
-        self._repetitions = repetitions
+        self._controllers = dict()
+        self._access_configs = dict()
+        self._repetitions = 1 if not repetitions else repetitions
         self._duration = duration
         self._wait_guids = wait_guids
         self._current = None
         self._status = TS.STATUS_ZERO
         self._thread = None
 
+    def current(self):
+        return self._current
+
+    def status(self):
+        return self._status
+
+    def is_finished(self):
+        return self._status == TS.STATUS_STOPPED
+
+    def get_access_configurations(self):
+        return self._access_configs.values()
+
     def start(self):
         self._status  = TS.STATUS_STARTED
         self._thread = threading.Thread(target = self._run_experiment_suite)
@@ -1097,16 +1139,24 @@ class ExperimentSuite(object):
         if self._thread:
             self._thread.join()
             self._thread = None
+        for controller in self._controllers.values():
+            controller.shutdown()
+
+    def get_current_access_config(self):
+        return self._access_configs[self._current]
 
     def _run_experiment_suite(self):
-        for i in xrange[0, self.repetitions]:
+        for i in xrange(1, self._repetitions):
             self._current = i
             self._run_one_experiment()
+        self._status = TS.STATUS_STOPPED
 
     def _run_one_experiment(self):
+        from nepi.util import proxy
         access_config = proxy.AccessConfiguration()
         for attr in self._access_config.attributes:
-            access_config.set_attribute_value(attr.name, attr.value)
+            if attr.value:
+                access_config.set_attribute_value(attr.name, attr.value)
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
         root_dir = "%s_%d" % (
                 access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
@@ -1114,25 +1164,17 @@ class ExperimentSuite(object):
         access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
         controller = proxy.create_experiment_controller(self._experiment_xml,
                 access_config)
-        self._experiments[self._current] = controller
+        self._access_configs[self._current] = access_config
+        self._controllers[self._current] = controller
         controller.start()
         started_at = time.time()
         # wait until all specified guids have finished execution
         if self._wait_guids:
-            while all(itertools.imap(controller.is_finished, self._wait_guids):
+            while all(itertools.imap(controller.is_finished, self._wait_guids)):
                 time.sleep(0.5)
         # wait until the minimum experiment duration time has elapsed 
         if self._duration:
             while (time.time() - started_at) < self._duration:
                 time.sleep(0.5)
         controller.stop()
-        #download results!!
-        controller.shutdown()
-    """
-
-
-
-
-
-