+ def do_start(self):
+ super(Application, self).do_start()
+ time.sleep(random.random() * 3)
+ self.ec.schedule("0.5s", self.stop)
+
+class ErrorApplication(ResourceManager):
+ _rtype = "ErrorApplication"
+
+ def __init__(self, ec, guid):
+ super(ErrorApplication, self).__init__(ec, guid)
+
+ def do_deploy(self):
+ node = self.get_connected(Node.get_rtype())[0]
+ if node.state < ResourceState.READY:
+ self.ec.schedule("0.5s", self.deploy)
+ else:
+ time.sleep(random.random() * 2)
+ raise RuntimeError("NOT A REAL ERROR. JUST TESTING")
+
+class ResourceFactoryTestCase(unittest.TestCase):
+ def test_add_resource_factory(self):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory._resource_types = dict()
+ ResourceFactory.register_type(MyResource)
+ ResourceFactory.register_type(AnotherResource)
+
+ # Take into account default 'Critical' attribute
+ self.assertEqual(MyResource.get_rtype(), "MyResource")
+ self.assertEqual(len(MyResource._attributes), 3)
+
+ self.assertEqual(ResourceManager.get_rtype(), "Resource")
+ self.assertEqual(len(ResourceManager._attributes), 2)
+
+ self.assertEqual(AnotherResource.get_rtype(), "AnotherResource")
+ self.assertEqual(len(AnotherResource._attributes), 2)
+
+ self.assertEqual(len(ResourceFactory.resource_types()), 2)
+
+ # restore factory state for other tests
+ from nepi.execution.resource import populate_factory
+ ResourceFactory._resource_types = dict()
+ populate_factory()