Adding start_with_condition, stop_with_condition and set_with_condition to the EC...
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 5 Apr 2013 22:41:36 +0000 (00:41 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 5 Apr 2013 22:41:36 +0000 (00:41 +0200)
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/resources/linux/application.py
src/neco/resources/linux/node.py
src/neco/util/timefuncs.py
test/execution/resource.py

index 076e4dd..1b7d4be 100644 (file)
@@ -6,10 +6,13 @@ import threading
 
 from neco.util import guid
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
-from neco.execution.resource import ResourceFactory
+from neco.execution.resource import ResourceFactory, ResourceAction, \
+        ResourceState
 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
 from neco.util.parallel import ParallelRun
 
+# TODO: use multiprocessing instead of threading
+
 class ExperimentController(object):
     def __init__(self, root_dir = "/tmp", loglevel = 'error'): 
         super(ExperimentController, self).__init__()
@@ -75,46 +78,43 @@ class ExperimentController(object):
         rm1.connect(guid2)
         rm2.connect(guid1)
 
-    def discover_resource(self, guid, filters):
-        rm = self.get_resource(guid)
-        return rm.discover(filters)
+    def register_condition(self, group1, action, group2, state,
+            time = None):
+        """ Registers an action START or STOP for all RM on group1 to occur 
+            time 'time' after all elements in group2 reached state 'state'.
 
-    def provision_resource(self, guid, filters):
-        rm = self.get_resource(guid)
-        return rm.provision(filters)
+            :param group1: List of guids of RMs subjected to action
+            :type group1: list
 
-    def register_start(self, group1, time, after_status, group2):
-        if isinstance(group1, int):
-            group1 = list[group1]
-        if isinstance(group2, int):
-            group2 = list[group2]
+            :param action: Action to register (either START or STOP)
+            :type action: ResourceAction
 
-        for guid1 in group1:
-            for guid2 in group2:
-                rm = self.get_resource(guid)
-                rm.start_after(time, after_status, guid2)
+            :param group2: List of guids of RMs to we waited for
+            :type group2: list
 
-    def register_stop(self, group1, time, after_status, group2):
-        if isinstance(group1, int):
-            group1 = list[group1]
-        if isinstance(group2, int):
-            group2 = list[group2]
+            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+            :type state: ResourceState
 
-        for guid1 in group1:
-            for guid2 in group2:
-                rm = self.get_resource(guid)
-                rm.stop_after(time, after_status, guid2)
+            :param time: Time to wait after group2 has reached status 
+            :type time: string
 
-    def register_set(self, name, value, group1, time, after_status, group2):
+        """
         if isinstance(group1, int):
             group1 = list[group1]
         if isinstance(group2, int):
             group2 = list[group2]
 
         for guid1 in group1:
-            for guid2 in group2:
-                rm = self.get_resource(guid)
-                rm.set_after(name, value, time, after_status, guid2)
+            rm = self.get_resource(guid)
+            rm.register_condition(action, group2, state, time)
+
+    def discover(self, guid, filters):
+        rm = self.get_resource(guid)
+        return rm.discover(filters)
+
+    def provision(self, guid, filters):
+        rm = self.get_resource(guid)
+        return rm.provision(filters)
 
     def get(self, guid, name):
         rm = self.get_resource(guid)
@@ -124,15 +124,83 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.set(name, value)
 
-    def status(self, guid):
+    def state(self, guid):
         rm = self.get_resource(guid)
-        return rm.status()
+        return rm.state
 
     def stop(self, guid):
         rm = self.get_resource(guid)
         return rm.stop()
 
-    def deploy(self, group = None, start_when_all_ready = True):
+    def start(self, guid):
+        rm = self.get_resource(guid)
+        return rm.start()
+
+    def set_with_conditions(self, name, value, group1, group2, state,
+            time = None):
+        """ Set value 'value' on attribute with name 'name' on all RMs of
+            group1 when 'time' has elapsed since all elements in group2 
+            have reached state 'state'.
+
+            :param name: Name of attribute to set in RM
+            :type name: string
+
+            :param value: Value of attribute to set in RM
+            :type name: string
+
+            :param group1: List of guids of RMs subjected to action
+            :type group1: list
+
+            :param action: Action to register (either START or STOP)
+            :type action: ResourceAction
+
+            :param group2: List of guids of RMs to we waited for
+            :type group2: list
+
+            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+            :type state: ResourceState
+
+            :param time: Time to wait after group2 has reached status 
+            :type time: string
+
+        """
+        if isinstance(group1, int):
+            group1 = list[group1]
+        if isinstance(group2, int):
+            group2 = list[group2]
+
+        for guid1 in group1:
+            rm = self.get_resource(guid)
+            rm.set_with_conditions(name, value, group2, state, time)
+
+    def stop_with_conditions(self, guid):
+        rm = self.get_resource(guid)
+        return rm.stop_with_conditions()
+
+    def start_with_conditions(self, guid):
+        rm = self.get_resource(guid)
+        return rm.start_with_condition()
+
+    def deploy(self, group = None, wait_all_ready = True):
+        """ Deploy all resource manager in group
+
+        :param group: List of guids of RMs to deploy
+        :type group: list
+
+        :param wait_all_ready: Wait until all RMs are deployed in
+            order to start the RMs
+        :type guid: int
+
+        """
+        def steps(rm):
+            rm.deploy()
+            rm.start_with_conditions()
+
+            # Only if the RM has STOP consitions we
+            # schedule a stop. Otherwise the RM will stop immediately
+            if rm.conditions.get(ResourceAction.STOP):
+                rm.stop_with_conditions()
+
         if not group:
             group = self.resources
 
@@ -140,13 +208,13 @@ class ExperimentController(object):
         for guid in group:
             rm = self.get_resource(guid)
 
-            kwargs = {'target': rm.deploy}
-            if start_when_all_ready:
+            if wait_all_ready:
                 towait = list(group)
                 towait.remove(guid)
-                kwargs['args'] = towait
+                self.register_condition(guid, ResourceAction.START, 
+                        towait, ResourceState.DEPLOYED)
 
-            thread = threading.Thread(kwargs)
+            thread = threading.Thread(target = steps, args = (rm))
             threads.append(thread)
             thread.start()
 
index 62bf821..deaa063 100644 (file)
@@ -1,7 +1,25 @@
+
+from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
+
 import copy
+import functools
 import logging
 import weakref
 
+_reschedule_delay = "1s"
+
+class ResourceAction:
+    START = 0
+    STOP = 1
+
+class ResourceState:
+    NEW = 0
+    DEPLOYED = 1
+    STARTED = 2
+    STOPPED = 3
+    FAILED = 4
+    RELEASED = 5
+
 def clsinit(cls):
     cls._clsinit()
     return cls
@@ -63,14 +81,25 @@ class ResourceManager(object):
         self._guid = guid
         self._ec = weakref.ref(ec)
         self._connections = set()
+        self._conditions = dict() 
+
         # the resource instance gets a copy of all attributes
         # that can modify
         self._attrs = copy.deepcopy(self._attributes)
 
+        self._state = ResourceState.NEW
+
+        self._start_time = None
+        self._stop_time = None
+
         # Logging
         self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % 
             self.guid)
 
+    @property
+    def logger(self):
+        return self._logger
+
     @property
     def guid(self):
         return self._guid
@@ -79,48 +108,167 @@ class ResourceManager(object):
     def ec(self):
         return self._ec()
 
-    def connect(self, guid):
-        if (self._validate_connection(guid)):
-            self._connections.add(guid)
-
     @property
     def connections(self):
         return self._connections
 
-    def discover(self, filters):
-        pass
+    @property
+    def conditons(self):
+        return self._conditions
 
-    def provision(self, filters):
-        pass
+    @property
+    def start_time(self):
+        """ timestamp with  """
+        return self._start_time
 
-    def set(self, name, value):
-        attr = self._attrs[name]
-        attr.value = value
+    @property
+    def stop_time(self):
+        return self._stop_time
 
-    def get(self, name):
-        attr = self._attrs[name]
-        return attr.value
+    @property
+    def state(self):
+        return self._state
 
-    def start_after(self, time, after_status, guid):
-        pass
+    def connect(self, guid):
+        if (self._validate_connection(guid)):
+            self._connections.add(guid)
 
-    def stop_after(self, time, after_status, guid):
+    def discover(self, filters = None):
         pass
 
-    def set_after(self, name, value, time, after_status, guid):
+    def provision(self, filters = None):
         pass
 
     def start(self):
-        pass
+        if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
+            self.logger.error("Wrong state %s for start" % self.state)
+
+        self._start_time = strfnow()
+        self._state = ResourceState.STARTED
 
     def stop(self):
-        pass
+        if not self._state in [ResourceState.STARTED]:
+            self.logger.error("Wrong state %s for stop" % self.state)
 
-    def deploy(self, group = None):
-        pass
+        self._stop_time = strfnow()
+        self._state = ResourceState.STOPPED
+
+    def set(self, name, value):
+        attr = self._attrs[name]
+        attr.value = value
+
+    def get(self, name):
+        attr = self._attrs[name]
+        return attr.value
+
+    def register_condition(self, action, group, state, 
+            time = None):
+        if action not in self.conditions:
+            self._conditions[action] = set()
+
+        self.conditions.get(action).add((group, state, time))
+
+    def _needs_reschedule(self, group, state, time):
+        reschedule = False
+        delay = _reschedule_delay 
+
+        # check state and time elapsed on all RMs
+        for guid in group:
+            rm = self.ec.get_resource(guid)
+            # If the RMs is lower than the requested state we must
+            # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
+            if rm.state < state:
+                reschedule = True
+                break
+
+            if time:
+                if state == ResourceAction.START:
+                    t = rm.start_time
+                elif state == ResourceAction.STOP:
+                    t = rm.stop_time
+                else:
+                    # Only keep time information for START and STOP
+                    break
+
+                delay = strfdiff(t, strnow()) 
+                if delay < time:
+                    reschedule = True
+                    break
+
+        return reschedule, delay
+
+    def set_with_conditions(self, name, value, group, state, time):
+        reschedule = False
+        delay = _reschedule_delay 
+
+        ## evaluate if set conditions are met
+
+        # only can set with conditions after the RM is started
+        if self.status != ResourceStatus.STARTED:
+            reschedule = True
+        else:
+            reschedule, delay = self._needs_reschedule(group, state, time)
+
+        if reschedule:
+            callback = functools.partial(self.set_with_conditions, 
+                    name, value, group, state, time)
+            self.ec.schedule(delay, callback)
+        else:
+            self.set(name, value)
+
+    def start_with_conditions(self):
+        reschedule = False
+        delay = _reschedule_delay 
+
+        ## evaluate if set conditions are met
+
+        # only can start when RM is either STOPPED or DEPLOYED
+        if self.status not in [ResourceStatus.STOPPED, ResourceStatus.DEPLOYED]:
+            reschedule = True
+        else:
+            for action, (group, state, time) in self.conditions.iteritems():
+                if action == ResourceAction.START:
+                    reschedule, delay = self._needs_reschedule(group, state, time)   
+                    if reschedule:
+                        break
+
+        if reschedule:
+            callback = functools.partial(self.start_with_conditions, 
+                    group, state, time)
+            self.ec.schedule(delay, callback)
+        else:
+            self.start()
+
+    def stop_with_conditions(self):
+        reschedule = False
+        delay = _reschedule_delay 
+
+        ## evaluate if set conditions are met
+
+        # only can start when RM is either STOPPED or DEPLOYED
+        if self.status != ResourceStatus.STARTED:
+            reschedule = True
+        else:
+            for action, (group, state, time) in self.conditions.iteritems():
+                if action == ResourceAction.STOP:
+                    reschedule, delay = self._needs_reschedule(group, state, time)   
+                    if reschedule:
+                        break
+
+        if reschedule:
+            callback = functools.partial(self.stop_with_conditions, 
+                    group, state, time)
+            self.ec.schedule(delay, callback)
+        else:
+            self.stop()
+
+    def deploy(self):
+        self.discover()
+        self.provision()
+        self._state = ResourceState.DEPLOYED
 
     def release(self):
-        pass
+        self._state = ResourceState.RELEASED
 
     def _validate_connection(self, guid):
         # TODO: Validate!
index 1ffcdf5..de4f850 100644 (file)
-from neco.execution import tags
-from neco.execution.resource import ResourceManager
+from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux.ssh_api import SSHApiFactory
 
-import cStringIO
 import logging
 
-class Application(ResourceManager):
-    def __init__(self, box, ec):
-        super(Application, self).__init__(box, ec)
-        self.command = None
-        self.pid = None
-        self.ppid = None
-        self.stdin = None
-        self.del_app_home = True
-        self.env = None
-        
-        self.app_home = "${HOME}/app-%s" % self.box.guid
+@clsinit
+class LinuxApplication(ResourceManager):
+    _rtype = "LinuxApplication"
+
+    @classmethod
+    def _register_attributes(cls):
+        command = Attribute("command", "Command to execute", 
+                flags = Flags.ReadOnly)
+        env = Attribute("env", "Environment variables string for command execution",
+                flags = Flags.ReadOnly)
+        sudo = Attribute("sudo", "Run with root privileges", 
+                flags = Flags.ReadOnly)
+        depends = Attribute("depends", 
+                "Space-separated list of packages required to run the application",
+                flags = Flags.ReadOnly)
+        sources = Attribute("sources", 
+                "Space-separated list of regular files to be deployed in the working "
+                "path prior to building. Archives won't be expanded automatically.",
+                flags = Flags.ReadOnly)
+        build = Attribute("build", 
+                "Build commands to execute after deploying the sources. "
+                "Sources will be in the ${SOURCES} folder. "
+                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
+                "Try to make the commands return with a nonzero exit code on error.\n"
+                "Also, do not install any programs here, use the 'install' attribute. This will "
+                "help keep the built files constrained to the build folder (which may "
+                "not be the home folder), and will result in faster deployment. Also, "
+                "make sure to clean up temporary files, to reduce bandwidth usage between "
+                "nodes when transferring built packages.",
+                flags = Flags.ReadOnly)
+        install = Attribute("install", 
+                "Commands to transfer built files to their final destinations. "
+                "Sources will be in the initial working folder, and a special "
+                "tag ${SOURCES} can be used to reference the experiment's "
+                "home folder (where the application commands will run).\n"
+                "ALL sources and targets needed for execution must be copied there, "
+                "if building has been enabled.\n"
+                "That is, 'slave' nodes will not automatically get any source files. "
+                "'slave' nodes don't get build dependencies either, so if you need "
+                "make and other tools to install, be sure to provide them as "
+                "actual dependencies instead.",
+                flags = Flags.ReadOnly)
+        stdin = Attribute("stdin", "Standard input", flags = Flags.ReadOnly)
+        stdout = Attribute("stdout", "Standard output", flags = Flags.ReadOnly)
+        stderr = Attribute("stderr", "Standard error", flags = Flags.ReadOnly)
+
+        tear_down = Attribute("tearDown", "Bash script to be executed before
+                releasing the resource", flags = Flags.ReadOnly)
+
+        cls._register_attribute(command)
+        cls._register_attribute(env)
+        cls._register_attribute(sudo)
+        cls._register_attribute(depends)
+        cls._register_attribute(sources)
+        cls._register_attribute(build)
+        cls._register_attribute(install)
+        cls._register_attribute(stdin)
+        cls._register_attribute(stdout)
+        cls._register_attribute(stderr)
+        cls._register_attribute(tear_down)
+
+    def __init__(self, ec, guid):
+        super(LinuxApplication, self).__init__(ec, guid)
+        self._pid = None
+        self._ppid = None
+        self._home = "${HOME}/app-%s" % self.box.guid
         self._node = None
-       
-        # Logging
-        loglevel = "debug"
-        self._logger = logging.getLogger("neco.resources.base.Application.%s" % self.guid)
-        self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+        self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
+
+    @property
+    def api(self):
+        return self.node.api
 
     @property
     def node(self):
-        if self._node:
-            return self._node
+        self._node
 
-        # XXX: What if it is connected to more than one node?
-        resources = self.find_resources(exact_tags = [tags.NODE])
-        self._node = resources[0] is len(resources) == 1 else None
-        return self._node
+    @property
+    def home(self):
+        return self._home
 
-    def make_app_home(self):
-        self.node.mkdir(self.app_home)
+    @property
+    def pid(self):
+        return self._pid
 
-        if self.stdin:
-            self.node.upload(self.stdin, os.path.join(self.app_home, 'stdin'))
+    @property
+    def ppid(self):
+        return self._ppid
 
-    def cleanup(self):
-        self.kill()
+    def provision(self, filters = None):
+        # clean home
+        # upload
+        # build  
+        # Install stuff!!
+        pass
 
-    def run(self):
-        dst = os.path.join(self.app_home, "app.sh")
+    def start(self):
+        dst = os.path.join(self.home, "app.sh")
         
         # Create shell script with the command
         # This way, complex commands and scripts can be ran seamlessly
         # sync files
         cmd = ""
-        if self.env:
+        env = self.get("env")
+        if env:
             for envkey, envvals in env.iteritems():
                 for envval in envvals:
                     cmd += 'export %s=%s\n' % (envkey, envval)
 
-        cmd += self.command
-        self.node.upload(cmd, dst)
+        cmd += self.get("command")
+        self.api.upload(cmd, dst)
 
         command = 'bash ./app.sh'
-        stdin = 'stdin' if self.stdin else None
-        self.node.run(command, self.app_home, stdin = stdin)
-        self.pid, self.ppid = self.node.checkpid(self.app_home)
+        stdin = 'stdin' if self.get("stdin") else None
+        self.api.run(command, self.home, stdin = stdin)
+        self._pid, self._ppid = self.api.checkpid(self.app_home)
+
+    def stop(self):
+        self._state = ResourceState.STOPPED
+
+    def release(self):
+        tear_down = self.get("tearDown")
+        if tear_down:
+            self.api.execute(tear_down)
+
+        return self.api.kill(self.pid, self.ppid)
 
     def status(self):
-        return self.node.status(self.pid, self.ppid)
+        return self.api.status(self.pid, self.ppid)
+
+    def make_app_home(self):
+        self.api.mkdir(self.home)
+
+        stdin = self.get("stdin")
+        if stdin:
+            self.api.upload(stdin, os.path.join(self.home, 'stdin'))
+
+    def _validate_connection(self, guid):
+        # TODO: Validate!
+        return True
+        # XXX: What if it is connected to more than one node?
+        resources = self.find_resources(exact_tags = [tags.NODE])
+        self._node = resources[0] is len(resources) == 1 else None
+        return self._node
+
 
-    def kill(self):
-        return self.node.kill(self.pid, self.ppid)
 
index 80bb103..0424f1e 100644 (file)
@@ -1,5 +1,8 @@
-from neco.execution.resource import ResourceManager, clsinit
 from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux.ssh_api import SSHApiFactory
+
+import logging
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -10,42 +13,93 @@ class LinuxNode(ResourceManager):
         hostname = Attribute("hostname", "Hostname of the machine")
         username = Attribute("username", "Local account username", 
                 flags = Flags.Credential)
-        password = Attribute("pasword", "Local account password",
+        identity = Attribute("identity", "SSH identity file",
                 flags = Flags.Credential)
+        clean_home = Attribute("cleanHome", "Remove all files and directories 
+                from home folder before starting experiment", 
+                flags = Flags.ReadOnly)
+        clean_processes = Attribute("cleanProcesses", 
+                "Kill all running processes before starting experiment", 
+                flags = Flags.ReadOnly)
+        tear_down = Attribute("tearDown", "Bash script to be executed before
+                releasing the resource", flags = Flags.ReadOnly)
 
         cls._register_attribute(hostname)
         cls._register_attribute(username)
-        cls._register_attribute(password)
+        cls._register_attribute(identity)
+        cls._register_attribute(clean_home)
+        cls._register_attribute(clean_processes)
+        cls._register_attribute(tear_down)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
 
         self._logger = logging.getLogger("neco.linux.Node.%d" % guid)
-        #elf._logger.setLevel(neco.LOGLEVEL)
-
-    def deploy(self):
-        pass
 
-    def discover(self, filters):
-        pass
+    def provision(self, filters = None):
+        if not self.api.is_alive():
+            self._state = ResourceState.FAILED
+            self.logger.error("Deploy failed. Unresponsive node")
+            return
+        
+        if self.get("cleanProcesses"):
+            self._clean_processes()
 
-    def provision(self, filters):
-        pass
+        if self.get("cleanHome"):
+            # self._clean_home() -> this is dangerous
+            pass
 
-    def start(self):
-        pass
-
-    def stop(self):
-        pass
-
-    def deploy(self, group = None):
-        pass
+    def deploy(self):
+        self.provision()
+        super(LinuxNode, self).deploy()
 
     def release(self):
-        pass
+        tear_down = self.get("tearDown")
+        if tear_down:
+            self.api.execute(tear_down)
+
+        super(LinuxNode, self).release()
 
     def _validate_connection(self, guid):
         # TODO: Validate!
         return True
 
+    @property
+    def api(self):
+        host = self.get("host")
+        user = self.get("user")
+        identity = self.get("identity")
+        return SSHApiFactory.get_api(host, user, identity)
+
+    def _clean_processes(self):
+        hostname = self.get("hostname")
+        self.logger.info("Cleaning up processes on %s", hostname)
+        
+        cmds = [
+            "sudo -S killall python tcpdump || /bin/true ; "
+            "sudo -S killall python tcpdump || /bin/true ; "
+            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
+            "sudo -S killall -u root || /bin/true ",
+            "sudo -S killall -u root || /bin/true ",
+        ]
+
+        api = self.api
+        for cmd in cmds:
+            out, err = api.execute(cmd)
+            if err:
+                self.logger.error(err)
+            
+    def _clean_home(self):
+        hostname = self.get("hostname")
+        self.logger.info("Cleaning up home on %s", hostname)
+
+         cmds = [
+            "find . -maxdepth 1 ! -name '.bash*' ! -name '.' -execdir rm -rf {} + "
+        ]
+
+        api = self.api
+        for cmd in cmds:
+            out, err = api.execute(cmd)
+            if err:
+                self.logger.error(err)
 
index 7bd3d56..b62c640 100644 (file)
@@ -5,7 +5,7 @@ _strf = "%Y%m%d%H%M%S%f"
 _reabs = re.compile("^\d{20}$")
 _rerel = re.compile("^(?P<time>\d+(.\d+)?)(?P<units>h|m|s|ms|us)$")
 
-# Work around to fix "ImportError: Failed to import _strptime because the import lockis held by another thread."
+# Work around to fix "ImportError: Failed to import _strptime because the import lock is held by another thread."
 datetime.datetime.strptime("20120807124732894211", _strf)
 
 def strfnow():
index 202a148..15b6e22 100755 (executable)
@@ -26,8 +26,7 @@ class AnotherResource(ResourceManager):
 class EC(object):
     pass
 
-
-class ResourceTestCase(unittest.TestCase):
+class ResourceFactoryTestCase(unittest.TestCase):
     def test_add_resource_factory(self):
         ResourceFactory.register_type(MyResource)
         ResourceFactory.register_type(AnotherResource)
@@ -41,11 +40,20 @@ class ResourceTestCase(unittest.TestCase):
         self.assertEquals(AnotherResource.rtype(), "AnotherResource")
         self.assertEquals(len(AnotherResource._attributes), 0)
 
-        #self.assertEquals(OmfNode.rtype(), "OmfNode")
-        #self.assertEquals(len(OmfNode._attributes), 0)
-
         self.assertEquals(len(ResourceFactory.resource_types()), 2)
 
+# TODO:!!!
+class ResourceManagerTestCase(unittest.TestCase):
+    def test_start_with_condition(self):
+        pass
+    
+    def test_stop_with_condition(self):
+        pass
+
+    def test_set_with_condition(self):
+        pass
+
+
 if __name__ == '__main__':
     unittest.main()