Testbed status
author Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 28 Jul 2011 17:09:32 +0000 (19:09 +0200)
committer Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 28 Jul 2011 17:09:32 +0000 (19:09 +0200)
src/nepi/core/execute.py
src/nepi/core/testbed_impl.py
src/nepi/util/constants.py
src/nepi/util/proxy.py

index 8eff5e1..5fad868 100644 (file)
@@ -3,7 +3,7 @@
 
 from nepi.core.attributes import Attribute, AttributesMap
 from nepi.util import validation
-from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
+from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
 from nepi.util.parser._xml import XmlExperimentParser
 import sys
 import re
@@ -200,6 +200,9 @@ class TestbedController(object):
 
     def status(self, guid):
         raise NotImplementedError
+    
+    def testbed_status(self):
+        raise NotImplementedError  # abstract hook: concrete testbed controllers override this
 
     def trace(self, guid, trace_id, attribute='value'):
         raise NotImplementedError
@@ -232,6 +235,7 @@ class ExperimentController(object):
         self._root_dir = root_dir
         self._netreffed_testbeds = set()
         self._guids_in_testbed_cache = dict()
+        self._failed_testbeds = set()
         
         if experiment_xml is None and root_dir is not None:
             # Recover
@@ -339,9 +343,13 @@ class ExperimentController(object):
         else:
             # recover recoverable controllers
             for guid in to_recover:
-                self._testbeds[guid].do_setup()
-                self._testbeds[guid].recover()
-        
+                try:
+                    self._testbeds[guid].do_setup()
+                    self._testbeds[guid].recover()
+                except:
+                    # Mark failed
+                    self._failed_testbeds.add(guid)
+    
         def steps_to_configure(self, allowed_guids):
             # perform setup in parallel for all test beds,
             # wait for all threads to finish
@@ -386,8 +394,12 @@ class ExperimentController(object):
             else:
                 # recover recoverable controllers
                 for guid in to_recover:
-                    self._testbeds[guid].do_setup()
-                    self._testbeds[guid].recover()
+                    try:
+                        self._testbeds[guid].do_setup()
+                        self._testbeds[guid].recover()
+                    except:
+                        # Mark failed
+                        self._failed_testbeds.add(guid)
 
             # configure dependant testbeds
             steps_to_configure(self, to_restart)
@@ -561,8 +573,10 @@ class ExperimentController(object):
    
     def recover(self):
         # reload perviously persisted testbed access configurations
+        self._failed_testbeds.clear()  # start with no testbed marked as failed
         self._load_testbed_proxies()

+        # re-run startup in recovery mode (recover=True)
         self._start(recover = True)
 
     def is_finished(self, guid):
@@ -570,12 +584,32 @@ class ExperimentController(object):
         if testbed != None:
             return testbed.status(guid) == AS.STATUS_FINISHED
         raise RuntimeError("No element exists with guid %d" % guid)    
+    
+    def _testbed_recovery_policy(self, guid, data = None):
+        """Return the DC.RECOVERY_POLICY attribute of `guid`, parsing the design XML if `data` is None."""
+        if data is None:
+            # No pre-parsed experiment data supplied by the caller
+            data = XmlExperimentParser().from_xml_to_data(self._experiment_design_xml)
+        return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
 
     def status(self, guid):
-        testbed = self._testbed_for_guid(guid)
-        if testbed != None:
-            return testbed.status(guid)
-        raise RuntimeError("No element exists with guid %d" % guid)    
+        if guid in self._testbeds:
+            # guid is a testbed
+            # report testbed status
+            if guid in self._failed_testbeds:
+                return TS.STATUS_FAILED
+            else:
+                try:
+                    return self._testbeds[guid].status()
+                except:
+                    return TS.STATUS_UNRESPONSIVE
+        else:
+            # guid is an element
+            testbed = self._testbed_for_guid(guid)
+            if testbed is not None:
+                return testbed.status(guid)
+            else:
+                return AS.STATUS_UNDETERMINED
 
     def set(self, guid, name, value, time = TIME_NOW):
         testbed = self._testbed_for_guid(guid)
@@ -627,6 +661,8 @@ class ExperimentController(object):
     def _testbed_for_guid(self, guid):
         for testbed_guid in self._testbeds.keys():
             if guid in self._guids_in_testbed(testbed_guid):
+                if testbed_guid in self._failed_testbeds:
+                    return None  # owning testbed is marked failed: hand out no controller for it
                 return self._testbeds[testbed_guid]
         return None
 
@@ -756,10 +792,8 @@ class ExperimentController(object):
                             to_restart.add(guid)
                     except:
                         if recover:
-                            policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
-                            if policy == DC.POLICY_FAIL:
-                                raise
-                            elif policy == DC.POLICY_RECOVER:
+                            policy = self._testbed_recovery_policy(guid, data=data)
+                            if policy == DC.POLICY_RECOVER:
                                 self._create_testbed_controller(
                                     guid, data, element_guids, False)
                                 to_recover.add(guid)
@@ -768,7 +802,8 @@ class ExperimentController(object):
                                     guid, data, element_guids, False)
                                 to_restart.add(guid)
                             else:
-                                raise
+                                # Mark failed
+                                self._failed_testbeds.add(guid)
                         else:
                             raise
         
index fd947c0..57afb2c 100644 (file)
@@ -457,6 +457,9 @@ class TestbedController(execute.TestbedController):
         if status_function:
             return status_function(self, guid)
         return AS.STATUS_UNDETERMINED
+    
+    def testbed_status(self):
+        return self._status  # status of the testbed itself, unlike status(guid) which is per element
 
     def trace(self, guid, trace_id, attribute='value'):
         if attribute == 'value':
index 741edd8..be566e7 100644 (file)
@@ -48,6 +48,7 @@ class TestbedStatus:
     STATUS_STARTED = 6
     STATUS_STOPPED = 7
     STATUS_FAILED = 8
+    STATUS_UNRESPONSIVE = 9
 
 class DeploymentConfiguration:
     MODE_SINGLE_PROCESS = "SINGLE"
index a323688..4fe9061 100644 (file)
@@ -61,6 +61,7 @@ GET_TESTBED_ID = 39
 GET_TESTBED_VERSION = 40
 TRACES_INFO = 41
 EXEC_XML = 42
+TESTBED_STATUS  = 43
 
 instruction_text = dict({
     OK:     "OK",
@@ -674,6 +675,12 @@ class TestbedControllerServer(BaseServer):
     def status(self, guid):
         return self._testbed.status(guid)
 
+    @Marshalling.handles(TESTBED_STATUS)
+    @Marshalling.args()
+    @Marshalling.retval(int)
+    def testbed_status(self):
+        return self._testbed.testbed_status()  # forward to the wrapped testbed controller (int return declared above)
+
     @Marshalling.handles(GET_ATTRIBUTE_LIST)
     @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
     @Marshalling.retval( Marshalling.pickled_data )