Adding trace Collector RM
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 28 Jun 2013 06:34:48 +0000 (23:34 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 28 Jun 2013 06:34:48 +0000 (23:34 -0700)
24 files changed:
examples/linux/ccn/ccncat_extended_ring_topo.py
examples/omf/manual_vlc_experiment_plexus.py
setup.py
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/all/__init__.py [new file with mode: 0644]
src/nepi/resources/all/collector.py [new file with mode: 0644]
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnapplication.py [new file with mode: 0644]
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/interface.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/rpmfuncs.py
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/omf/omf_api.py
src/nepi/resources/planetlab/node.py
src/nepi/util/timefuncs.py
test/execution/resource.py

index 8c32436..08625ea 100755 (executable)
@@ -48,14 +48,13 @@ from optparse import OptionParser, SUPPRESS_HELP
 
 import os
 import time
-import tempfile
 
 def add_node(ec, host, user, ssh_key = None):
     node = ec.register_resource("LinuxNode")
     ec.set(node, "hostname", host)
     ec.set(node, "username", user)
     ec.set(node, "identity", ssh_key)
-    ec.set(node, "cleanHome", True)
+    #ec.set(node, "cleanHome", True)
     ec.set(node, "cleanProcesses", True)
     return node
 
@@ -94,6 +93,13 @@ def add_stream(ec, ccnd, content_name):
 
     return app
 
+def add_collector(ec, trace_name, store_dir):
+    collector = ec.register_resource("Collector")
+    ec.set(collector, "traceName", trace_name)
+    ec.set(collector, "storeDir", store_dir)
+
+    return collector
+
 def get_options():
     pl_slice = os.environ.get("PL_SLICE")
 
@@ -103,7 +109,7 @@ def get_options():
     default_key = default_key if os.path.exists(default_key) else None
     pl_ssh_key = os.environ.get("PL_SSHKEY", default_key)
 
-    usage = "usage: %prog -s <pl-user> -m <movie> -c -e <exp-id> -i <ssh_key>"
+    usage = "usage: %prog -s <pl-user> -m <movie> -e <exp-id> -i <ssh_key> -r <results"
 
     parser = OptionParser(usage=usage)
     parser.add_option("-s", "--pl-user", dest="pl_user", 
@@ -115,40 +121,50 @@ def get_options():
     parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", 
             help="Path to private SSH key to be used for connection", 
             default = pl_ssh_key, type="str")
+    parser.add_option("-r", "--results", dest="results", default = "/tmp",  
+            help="Path to directory where to store results", type="str") 
 
     (options, args) = parser.parse_args()
 
     if not options.movie:
         parser.error("movie is a required argument")
 
-    return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key)
-
+    return (options.pl_user, options.movie, options.exp_id, options.pl_ssh_key,
+            options.results)
 
 if __name__ == '__main__':
     content_name = "ccnx:/test/VIDEO"
     
-    ( pl_user, movie, exp_id, pl_ssh_key ) = get_options()
+    ( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options()
 
     # Search for available RMs
     populate_factory()
     
     ec = ExperimentController(exp_id = exp_id)
     
-    # hosts
-    host1 = "planetlab2.u-strasbg.fr"
-    host2 = "planet1.servers.ua.pt"
-    host3 = "planetlab1.cs.uoi.gr"
-    host4 = "planetlab1.aston.ac.uk"
-    host5 = "planetlab2.willab.fi"
-    host6 = "planetlab-1.fokus.fraunhofer.de"
-    
+    # hosts in Europe
+    #host1 = "planetlab2.u-strasbg.fr"
+    #host2 = "planet1.servers.ua.pt"
+    #host3 = "planetlab1.cs.uoi.gr"
+    #host4 = "planetlab1.aston.ac.uk"
+    #host5 = "planetlab2.willab.fi"
+    #host6 = "planetlab-1.fokus.fraunhofer.de"
+
+    # host in the US
+    host1 = "planetlab4.wail.wisc.edu"
+    host2 = "planetlab2.cs.columbia.edu"
+    host3 = "ricepl-2.cs.rice.edu"
+    host4 = "node1.planetlab.albany.edu"
+    host5 = "earth.cs.brown.edu"
+    host6 = "planetlab2.engr.uconn.edu"
+
     # describe nodes in the central ring
     ring_hosts = [host1, host2, host3, host4]
     ccnds = dict()
 
     for i in xrange(len(ring_hosts)):
         host = ring_hosts[i]
-        node = add_node(ec, host, pl_user)
+        node = add_node(ec, host, pl_user, pl_ssh_key)
         ccnd = add_ccnd(ec, node)
         ccnr = add_ccnr(ec, ccnd)
         ccnds[host] = ccnd
@@ -175,14 +191,14 @@ if __name__ == '__main__':
     l5d = add_fib_entry(ec, ccnds[host3], host1)
     
     # border node 1
-    bnode1 = add_node(ec, host5, pl_user)
+    bnode1 = add_node(ec, host5, pl_user, pl_ssh_key)
     ccndb1 = add_ccnd(ec, bnode1)
     ccnrb1 = add_ccnr(ec, ccndb1)
     ccnds[host5] = ccndb1
     co = add_content(ec, ccnrb1, content_name, movie)
 
     # border node 2
-    bnode2 = add_node(ec, host6, pl_user)
+    bnode2 = add_node(ec, host6, pl_user, pl_ssh_key)
     ccndb2 = add_ccnd(ec, bnode2)
     ccnrb2 = add_ccnr(ec, ccndb2)
     ccnds[host6] = ccndb2
@@ -201,9 +217,15 @@ if __name__ == '__main__':
     ec.register_condition(l5d, ResourceAction.STOP, 
             app, ResourceState.STARTED, time = "10s")
  
+    # Register a collector to automatically collect traces
+    collector = add_collector(ec, "stderr", results_dir)
+    for ccnd in ccnds.values():
+        ec.register_connection(collector, ccnd)
+
     # deploy all ResourceManagers
     ec.deploy()
 
+    # Wait until ccncat has started retrieving the content
     ec.wait_started([app])
 
     rvideo_path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
@@ -229,18 +251,7 @@ if __name__ == '__main__':
         stdout=subprocess.PIPE, 
         stderr=subprocess.PIPE)
 
-    (stdout, stderr) = proc2.communicate() 
-
-    dirpath = tempfile.mkdtemp()
-    print "Storing to DIRPATH ", dirpath
-
-    for ccnd in ccnds.values():
-        stdout = ec.trace(ccnd, "stderr")
-        fname = "log-%d" % ccnd
-        path = os.path.join(dirpath, fname)
-        f = open(path, "w")
-        f.write(stdout)
-        f.close()
+    (stdout, stderr) = proc2.communicate()
 
     # shutdown the experiment controller
     ec.shutdown()
index 472fe76..1b9ec40 100755 (executable)
@@ -125,27 +125,27 @@ app3.set('xmppHost', "xmpp-plexus.onelab.eu")
 app3.set('xmppPort', "5222")
 app3.set('xmppPassword', "1234")
 
-# Connection
-app3.connect(node1.guid)
-node1.connect(app3.guid)
+# register_connection
+app3.register_connection(node1.guid)
+node1.register_connection(app3.guid)
 
-app1.connect(node1.guid)
-node1.connect(app1.guid)
+app1.register_connection(node1.guid)
+node1.register_connection(app1.guid)
 
-node1.connect(iface1.guid)
-iface1.connect(node1.guid)
+node1.register_connection(iface1.guid)
+iface1.register_connection(node1.guid)
 
-iface1.connect(channel.guid)
-channel.connect(iface1.guid)
+iface1.register_connection(channel.guid)
+channel.register_connection(iface1.guid)
 
-channel.connect(iface2.guid)
-iface2.connect(channel.guid)
+channel.register_connection(iface2.guid)
+iface2.register_connection(channel.guid)
 
-iface2.connect(node2.guid)
-node2.connect(iface2.guid)
+iface2.register_connection(node2.guid)
+node2.register_connection(iface2.guid)
 
-node2.connect(app2.guid)
-app2.connect(node2.guid)
+node2.register_connection(app2.guid)
+app2.register_connection(node2.guid)
 
 # Local Deploy
 node1.deploy()
index 72ebaa1..b1c6d4c 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -15,6 +15,7 @@ setup(
             "nepi.design",
             "nepi.execution",
             "nepi.resources",
+            "nepi.resources.all",
             "nepi.resources.linux",
             "nepi.resources.linux.ccn",
             "nepi.resources.netns",
index cc636a5..c323e36 100644 (file)
@@ -251,8 +251,8 @@ class ExperimentController(object):
         rm1 = self.get_resource(guid1)
         rm2 = self.get_resource(guid2)
 
-        rm1.connect(guid2)
-        rm2.connect(guid1)
+        rm1.register_connection(guid2)
+        rm2.register_connection(guid1)
 
     def register_condition(self, group1, action, group2, state,
             time = None):
index 0fcc198..3882523 100644 (file)
@@ -28,7 +28,7 @@ import os
 import pkgutil
 import weakref
 
-reschedule_delay = "0.5s"
+reschedule_delay = "1s"
 
 class ResourceAction:
     """ Action that a user can order to a Resource Manager
@@ -288,22 +288,24 @@ class ResourceManager(Logger):
         """
         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
 
-    def connect(self, guid):
-        """ Establishes a connection to the RM identified by guid
+    def register_connection(self, guid):
+        """ Registers a connection to the RM identified by guid
 
         :param guid: Global unique identified of the RM to connect to
         :type guid: int
         """
         if self.valid_connection(guid):
+            self.connect(guid)
             self._connections.add(guid)
 
-    def disconnect(self, guid):
-        """ Removes connection to the RM identified by guid
+    def unregister_connection(self, guid):
+        """ Removes a registered connection to the RM identified by guid
 
         :param guid: Global unique identified of the RM to connect to
         :type guid: int
         """
         if guid in self._connections:
+            self.disconnect(guid)
             self._connections.remove(guid)
 
     def discover(self):
@@ -406,8 +408,7 @@ class ResourceManager(Logger):
         """
         pass
 
-    def register_condition(self, action, group, state, 
-            time = None):
+    def register_condition(self, action, group, state, time = None):
         """ Registers a condition on the resource manager to allow execution 
         of 'action' only after 'time' has elapsed from the moment all resources 
         in 'group' reached state 'state'
@@ -422,10 +423,11 @@ class ResourceManager(Logger):
         :type time: str
 
         """
+
+        if not action in self.conditions:
+            self._conditions[action] = list()
+        
         conditions = self.conditions.get(action)
-        if not conditions:
-            conditions = list()
-            self._conditions[action] = conditions
 
         # For each condition to register a tuple of (group, state, time) is 
         # added to the 'action' list
@@ -434,6 +436,37 @@ class ResourceManager(Logger):
 
         conditions.append((group, state, time))
 
+    def unregister_condition(self, group, action = None):
+        """ Removed conditions for a certain group of guids
+
+        :param action: Action to restrict to condition (either 'START' or 'STOP')
+        :type action: str
+
+        :param group: Group of RMs to wait for (list of guids)
+        :type group: int or list of int
+
+        """
+        # For each condition a tuple of (group, state, time) is 
+        # added to the 'action' list
+        if not isinstance(group, list):
+            group = [group]
+
+        for act, conditions in self.conditions.iteritems():
+            if action and act != action:
+                continue
+
+            for condition in list(conditions):
+                (grp, state, time) = condition
+
+                # If there is an intersection between grp and group,
+                # then remove intersected elements
+                intsec = set(group).intersection(set(grp))
+                if intsec:
+                    idx = conditions.index(condition)
+                    newgrp = set(grp)
+                    newgrp.difference_update(intsec)
+                    conditions[idx] = (newgrp, state, time)
+                 
     def get_connected(self, rtype = None):
         """ Returns the list of RM with the type 'rtype'
 
@@ -471,7 +504,7 @@ class ResourceManager(Logger):
         for guid in group:
             rm = self.ec.get_resource(guid)
             # If the RM state is lower than the requested state we must
-            # reschedule (e.g. if RM is READY but we required STARTED)
+            # reschedule (e.g. if RM is READY but we required STARTED).
             if rm.state < state:
                 reschedule = True
                 break
@@ -499,7 +532,7 @@ class ResourceManager(Logger):
                 # time still to wait
                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
 
-                if wait > 0:
+                if wait > 0.001:
                     reschedule = True
                     delay = "%fs" % wait
                     break
@@ -643,6 +676,18 @@ class ResourceManager(Logger):
         self._failed_time = tnow()
         self._state = ResourceState.FAILED
 
+    def connect(self, guid):
+        """ Performs actions that need to be taken upon associating RMs.
+        This method should be redefined when necessary in child classes.
+        """
+        pass
+
+    def disconnect(self, guid):
+        """ Performs actions that need to be taken upon disassociating RMs.
+        This method should be redefined when necessary in child classes.
+        """
+        pass
+
     def valid_connection(self, guid):
         """Checks whether a connection with the other RM
         is valid.
diff --git a/src/nepi/resources/all/__init__.py b/src/nepi/resources/all/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py
new file mode 100644 (file)
index 0000000..a9f7667
--- /dev/null
@@ -0,0 +1,121 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.trace import Trace, TraceAttr
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+        ResourceAction
+from nepi.util.sshfuncs import ProcStatus
+from nepi.util.timefuncs import tsformat
+
+import os
+import tempfile
+
+@clsinit
+class Collector(ResourceManager):
+    """ The collector is reponsible of collecting traces
+    of a same type associated to RMs into a local directory.
+
+    .. class:: Class Args :
+
+        :param ec: The Experiment controller
+        :type ec: ExperimentController
+        :param guid: guid of the RM
+        :type guid: int
+        :param creds: Credentials to communicate with the rm (XmppClient)
+        :type creds: dict
+
+    """
+    _rtype = "Collector"
+
+    @classmethod
+    def _register_attributes(cls):
+        trace_name = Attribute("traceName", "Name of the trace to be collected", 
+                flags = Flags.ExecReadOnly)
+        store_dir = Attribute("storeDir", "Path to local directory to store trace results", 
+                default = tempfile.gettempdir(),
+                flags = Flags.ExecReadOnly)
+
+        cls._register_attribute(trace_name)
+        cls._register_attribute(store_dir)
+
+    def __init__(self, ec, guid):
+        super(Collector, self).__init__(ec, guid)
+        self._store_path =  None
+
+    @property
+    def store_path(self):
+        return self._store_path
+    
+    def provision(self):
+        trace_name = self.get("traceName")
+        if not trace_name:
+            self.fail()
+            
+            msg = "No traceName was specified"
+            self.error(msg)
+            raise RuntimeError, msg
+
+        store_dir = self.get("storeDir")
+        timestamp = tsformat()
+        self._store_path = os.path.join(store_dir, self.ec.exp_id, timestamp)
+        
+        msg = "Creating local directory at %s to store %s traces " % (
+            store_dir, trace_name)
+        self.info(msg)
+
+        try:
+            os.makedirs(self.store_path)
+        except OSError:
+            pass
+
+        super(Collector, self).provision()
+
+    def deploy(self):
+        try:
+            self.discover()
+            self.provision()
+        except:
+            self.fail()
+            raise
+
+        super(Collector, self).deploy()
+
+    def release(self):
+        trace_name = self.get("traceName")
+
+        msg = "Collecting '%s' traces to local directory %s" % (
+            trace_name, self.store_path)
+        self.info(msg)
+
+        rms = self.get_connected()
+        for rm in rms:
+            result = self.ec.trace(rm.guid, trace_name)
+            fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, 
+                trace_name))
+            f = open(fpath, "w")
+            f.write(result)
+            f.close()
+
+        super(Collector, self).release()
+
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+
index dc50230..6c7f82c 100644 (file)
@@ -19,7 +19,8 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+    reschedule_delay
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -375,8 +376,6 @@ class LinuxApplication(ResourceManager):
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
-            
-            reschedule_delay = "0.5s"
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
diff --git a/src/nepi/resources/linux/ccn/ccnapplication.py b/src/nepi/resources/linux/ccn/ccnapplication.py
new file mode 100644 (file)
index 0000000..e5ae00e
--- /dev/null
@@ -0,0 +1,72 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.resource import clsinit_copy, ResourceState, \
+    ResourceAction
+from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.ccn.ccnd import LinuxCCND
+
+import os
+
+@clsinit_copy
+class LinuxCCNApplication(LinuxApplication):
+    _rtype = "LinuxCCNApplication"
+
+    def __init__(self, ec, guid):
+        super(LinuxCCNApplication, self).__init__(ec, guid)
+        self._home = "ccnapp-%s" % self.guid
+
+    @property
+    def ccnd(self):
+        ccnd = self.get_connected(LinuxCCND.rtype())
+        if ccnd: return ccnd[0]
+        return None
+
+    @property
+    def node(self):
+        if self.ccnd: return self.ccnd.node
+        return None
+
+    def deploy(self):
+        if not self.get("env"):
+            self.set("env", self._environment)
+
+        super(LinuxCCNApplication, self).deploy()
+
+    @property
+    def _environment(self):
+        env = "PATH=$PATH:${EXP_HOME}/ccnx/bin "
+        return env            
+       
+    def execute_command(self, command, env):
+        environ = self.node.format_environment(env, inline = True)
+        command = environ + command
+        command = self.replace_paths(command)
+
+        (out, err), proc = self.node.execute(command)
+
+        if proc.poll():
+            self._state = ResourceState.FAILED
+            self.error(msg, out, err)
+            raise RuntimeError, msg
+
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+
index f608212..e5216d2 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction
+    ResourceAction, reschedule_delay
 from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
 from nepi.resources.linux.ccn.ccnr import LinuxCCNR
 from nepi.util.timefuncs import tnow
@@ -62,7 +62,6 @@ class LinuxCCNContent(LinuxCCNApplication):
         if not self.ccnr or self.ccnr.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             
-            reschedule_delay = "0.5s"
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
index 6ebe5e3..44c1afb 100644 (file)
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+    reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
-from nepi.util.timefuncs import tnow, tdiff
+from nepi.util.timefuncs import tnow, tdiffsec
 
 import os
 
 # TODO: use ccndlogging to dynamically change the logging level
 
+
 @clsinit_copy
 class LinuxCCND(LinuxApplication):
     _rtype = "LinuxCCND"
@@ -129,8 +131,7 @@ class LinuxCCND(LinuxApplication):
         if not self.node or self.node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             
-            reschedule_delay = "0.5s"
-            # ccnr needs to wait until ccnd is deployed and running
+            # ccnd needs to wait until node is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             if not self.get("command"):
@@ -238,7 +239,7 @@ class LinuxCCND(LinuxApplication):
         # First check if the ccnd has failed
         state_check_delay = 0.5
         if self._state == ResourceState.STARTED and \
-                tdiff(tnow(), self._last_state_check) > state_check_delay:
+                tdiffsec(tnow(), self._last_state_check) > state_check_delay:
             (out, err), proc = self._ccndstatus
 
             retcode = proc.poll()
@@ -271,11 +272,11 @@ class LinuxCCND(LinuxApplication):
 
     @property
     def _dependencies(self):
-        if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]:
+        if self.node.use_rpm:
             return ( " autoconf openssl-devel  expat-devel libpcap-devel "
                 " ecryptfs-utils-devel libxml2-devel automake gawk " 
                 " gcc gcc-c++ git pcre-devel make ")
-        elif self.node.os in [ OSType.UBUNTU , OSType.DEBIAN]:
+        elif self.node.use_deb:
             return ( " autoconf libssl-dev libexpat-dev libpcap-dev "
                 " libecryptfs0 libxml2-utils automake gawk gcc g++ "
                 " git-core pkg-config libpcre3-dev make ")
index f2e6c0e..cdf48e3 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction
+    ResourceAction, reschedule_delay
 from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow
@@ -187,7 +187,6 @@ class LinuxCCNR(LinuxCCNApplication):
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
             
-            reschedule_delay = "0.5s"
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
index c669858..c6398fe 100644 (file)
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction
+    ResourceAction, reschedule_delay
 from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
 from nepi.util.timefuncs import tnow
 
 import os
 
-reschedule_delay = "0.5s"
 
 # TODO: Add rest of options for ccndc!!!
 #       Implement ENTRY DELETE!!
index 68fc0cd..68c46eb 100644 (file)
@@ -18,7 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Types, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+        reschedule_delay
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.linux.channel import LinuxChannel
 
@@ -31,7 +32,6 @@ import time
 
 # TODO: UP, MTU attributes!
 
-reschedule_delay = "0.5s"
 
 @clsinit
 class LinuxInterface(ResourceManager):
index a96267f..0d2597f 100644 (file)
@@ -18,7 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+        reschedule_delay
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
@@ -36,7 +37,6 @@ import threading
 # TODO: Unify delays!!
 # TODO: Validate outcome of uploads!! 
 
-reschedule_delay = "0.5s"
 
 class ExitCode:
     """
@@ -52,6 +52,7 @@ class OSType:
     """
     Supported flavors of Linux OS
     """
+    FEDORA_8 = "f8"
     FEDORA_12 = "f12"
     FEDORA_14 = "f14"
     FEDORA = "fedora"
@@ -227,7 +228,9 @@ class LinuxNode(ResourceManager):
             self.error(msg, out, err)
             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
 
-        if out.find("Fedora release 12") == 0:
+        if out.find("Fedora release 8") == 0:
+            self._os = OSType.FEDORA_8
+        elif out.find("Fedora release 12") == 0:
             self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
             self._os = OSType.FEDORA_14
@@ -242,6 +245,15 @@ class LinuxNode(ResourceManager):
 
         return self._os
 
+    @property
+    def use_deb(self):
+        return self.os in [OSType.DEBIAN, OSType.UBUNTU]
+
+    @property
+    def use_rpm(self):
+        return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
+                OSType.FEDORA]
+
     @property
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
@@ -367,9 +379,9 @@ class LinuxNode(ResourceManager):
 
     def install_packages(self, packages, home):
         command = ""
-        if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+        if self.use_rpm:
             command = rpmfuncs.install_packages_command(self.os, packages)
-        elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+        elif self.use_deb:
             command = debfuncs.install_packages_command(self.os, packages)
         else:
             msg = "Error installing packages ( OS not known ) "
@@ -389,9 +401,9 @@ class LinuxNode(ResourceManager):
 
     def remove_packages(self, packages, home):
         command = ""
-        if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+        if self.use_rpm:
             command = rpmfuncs.remove_packages_command(self.os, packages)
-        elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+        elif self.use_deb:
             command = debfuncs.remove_packages_command(self.os, packages)
         else:
             msg = "Error removing packages ( OS not known ) "
index 5fa0592..5497a57 100644 (file)
@@ -26,7 +26,8 @@ def install_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = install_rpmfusion_command(os) + " && "
+    cmd = install_rpmfusion_command(os)
+    if cmd: cmd += " && "
     cmd += " && ".join(map(lambda p: 
             " { rpm -q %(package)s || sudo -S yum -y install %(package)s ; } " % {
                     'package': p}, packages))
@@ -51,11 +52,13 @@ def install_rpmfusion_command(os):
     cmd = " { rpm -q rpmfusion-free-release ||  sudo -S rpm -i %(package)s ; } "
 
     if os in [OSType.FEDORA, OSType.FEDORA_12]:
+        # For f12
         cmd =  cmd %  {'package': RPM_FUSION_URL_F12}
     elif os == OSType.FEDORA_14:
-        # This one works for f13+
+        # For f13+
         cmd = cmd %  {'package': RPM_FUSION_URL}
     else:
+        # Fedora 8 is unmaintained 
         cmd = ""
 
     return cmd
index 6da1160..170410b 100644 (file)
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+        reschedule_delay
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
-reschedule_delay = "0.5s"
 
 @clsinit
 class OMFApplication(ResourceManager):
index fa8df03..8d8a5db 100644 (file)
 
 """
 
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+        reschedule_delay
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
-reschedule_delay = "0.5s"
 
 @clsinit
 class OMFChannel(ResourceManager):
index 0e1dad9..b15be5e 100644 (file)
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+        reschedule_delay
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
-reschedule_delay = "0.5s"
 
 @clsinit
 class OMFWifiInterface(ResourceManager):
index 4658cb5..fad525b 100644 (file)
 #         Julien Tribino <julien.tribino@inria.fr>
 
 
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
+        reschedule_delay
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
 import time
 
-reschedule_delay = "0.5s"
 
 @clsinit
 class OMFNode(ResourceManager):
index 3316f01..7d96cfb 100644 (file)
@@ -29,7 +29,7 @@ from nepi.util.logger import Logger
 from nepi.resources.omf.omf_client import OMFClient
 from nepi.resources.omf.messages_5_4 import MessageHandler
 
-from nepi.util.timefuncs import tsfromat 
+from nepi.util.timefuncs import tsformat 
 
 class OMFAPI(Logger):
     """
@@ -68,7 +68,7 @@ class OMFAPI(Logger):
         """
         super(OMFAPI, self).__init__("OMFAPI")
         
-        date = tsfromat()
+        date = tsformat()
         tz = -time.altzone if time.daylight != 0 else -time.timezone
         date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
         self._user = "%s-%s" % (slice, date)
index bdcc19f..25358a2 100644 (file)
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+        reschedule_delay
 from nepi.resources.linux.node import LinuxNode
-
 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
 
-reschedule_delay = "0.5s"
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
index 625dd70..1a8e102 100644 (file)
@@ -35,7 +35,7 @@ def stformat(sdate):
     """
     return datetime.datetime.strptime(sdate, _strf).date()
 
-def tsfromat(date = None):
+def tsformat(date = None):
     """ Formats a datetime object to a string with format YYYYMMddHHMMSSffff.
     If no date is given, the current date is used.
     
index 7122b92..365ca08 100755 (executable)
@@ -21,7 +21,8 @@
 
 from nepi.execution.attribute import Attribute
 from nepi.execution.ec import ExperimentController 
-from nepi.execution.resource import ResourceManager, ResourceState, clsinit
+from nepi.execution.resource import ResourceManager, ResourceState, clsinit, \
+        ResourceAction
 
 import random
 import time
@@ -45,24 +46,7 @@ class AnotherResource(ResourceManager):
 
     def __init__(self, ec, guid):
         super(AnotherResource, self).__init__(ec, guid)
-     
-class ResourceFactoryTestCase(unittest.TestCase):
-    def test_add_resource_factory(self):
-        from nepi.execution.resource import ResourceFactory
-
-        ResourceFactory.register_type(MyResource)
-        ResourceFactory.register_type(AnotherResource)
-
-        self.assertEquals(MyResource.rtype(), "MyResource")
-        self.assertEquals(len(MyResource._attributes), 1)
-
-        self.assertEquals(ResourceManager.rtype(), "Resource")
-        self.assertEquals(len(ResourceManager._attributes), 0)
-
-        self.assertEquals(AnotherResource.rtype(), "AnotherResource")
-        self.assertEquals(len(AnotherResource._attributes), 0)
 
-        self.assertEquals(len(ResourceFactory.resource_types()), 2)
 
 class Channel(ResourceManager):
     _rtype = "Channel"
@@ -135,8 +119,56 @@ class Application(ResourceManager):
         super(Application, self).start()
         time.sleep(random.random() * 5)
         self._state = ResourceState.FINISHED
+   
+
+class ResourceFactoryTestCase(unittest.TestCase):
+    def test_add_resource_factory(self):
+        from nepi.execution.resource import ResourceFactory
+
+        ResourceFactory.register_type(MyResource)
+        ResourceFactory.register_type(AnotherResource)
+
+        self.assertEquals(MyResource.rtype(), "MyResource")
+        self.assertEquals(len(MyResource._attributes), 1)
+
+        self.assertEquals(ResourceManager.rtype(), "Resource")
+        self.assertEquals(len(ResourceManager._attributes), 0)
+
+        self.assertEquals(AnotherResource.rtype(), "AnotherResource")
+        self.assertEquals(len(AnotherResource._attributes), 0)
+
+        self.assertEquals(len(ResourceFactory.resource_types()), 2)
 
 class ResourceManagerTestCase(unittest.TestCase):
+    def test_register_condition(self):
+        ec = ExperimentController()
+        rm = ResourceManager(ec, 15)
+
+        group = [1,3,5,7]
+        rm.register_condition(ResourceAction.START, group,
+                ResourceState.STARTED)
+
+        group = [10,8]
+        rm.register_condition(ResourceAction.START,
+                group, ResourceState.STARTED, time = "10s")
+
+        waiting_for = []
+        conditions = rm.conditions.get(ResourceAction.START)
+        for (group, state, time) in conditions:
+            waiting_for.extend(group)
+
+        self.assertEquals(waiting_for, [1, 3, 5, 7, 10, 8])
+
+        group = [1, 2, 3, 4, 6]
+        rm.unregister_condition(group)
+
+        waiting_for = []
+        conditions = rm.conditions.get(ResourceAction.START)
+        for (group, state, time) in conditions:
+            waiting_for.extend(group)
+
+        self.assertEquals(waiting_for, [5, 7, 10, 8])
+
     def test_deploy_in_order(self):
         """
         Test scenario: 2 applications running one on 1 node each.