Code cleanup. Setting resource state through specific functions
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 4 Aug 2013 18:48:40 +0000 (11:48 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 4 Aug 2013 18:48:40 +0000 (11:48 -0700)
19 files changed:
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnapplication.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/traceroute.py
src/nepi/resources/linux/udptest.py
src/nepi/resources/linux/udptunnel.py
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/omf/omf_api.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/tap.py

index de013d1..6beb3d7 100644 (file)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-import functools
-import logging
-import os
-import random
-import sys
-import time
-import threading
-
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
@@ -34,9 +26,16 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
-# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
+import functools
+import logging
+import os
+import random
+import sys
+import time
+import threading
+
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -182,7 +181,7 @@ class ExperimentController(object):
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED
+            reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
 
         :param guids: List of guids
         :type guids: list
@@ -191,31 +190,34 @@ class ExperimentController(object):
 
     def wait_started(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED
+            reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.STARTED,
-            ResourceState.STOPPED,
-            ResourceState.FAILED,
-            ResourceState.FINISHED])
+        return self.wait(guids, state = ResourceState.STARTED)
 
     def wait_released(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state RELEASED
+            reached the state RELEASED (or FAILED)
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        # TODO: solve state concurrency BUG and !!!!
+        # correct waited release state to state = ResourceState.FAILED)
+        return self.wait(guids, state = ResourceState.FINISHED)
+
+    def wait_deployed(self, guids):
+        """ Blocking method that wait until all the RM from the 'guid' list 
+            reached the state READY (or any higher state)
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.RELEASED,
-            ResourceState.STOPPED,
-            ResourceState.FAILED,
-            ResourceState.FINISHED])
+        return self.wait(guids, state = ResourceState.READY)
 
-    def wait(self, guids, states = [ResourceState.FINISHED, 
-            ResourceState.FAILED,
-            ResourceState.STOPPED]):
+    def wait(self, guids, state = ResourceState.STOPPED):
         """ Blocking method that waits until all the RM from the 'guid' list 
             reached state 'state' or until a failure occurs
             
@@ -237,14 +239,14 @@ class ExperimentController(object):
 
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
-            state = self.state(guid)
+            rstate = self.state(guid)
 
-            if state in states:
+            if rstate >= state:
                 guids.remove(guid)
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR %g - state %s " % (guid,
-                    self.state(guid, hr = True)))
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
+                    self.state(guid, hr = True), state))
 
                 # Take the opportunity to 'refresh' the states of the RMs.
                 # Query only the first up to N guids (not to overwhelm 
@@ -262,7 +264,7 @@ class ExperimentController(object):
                 # If the guid is not in one of the target states, wait and
                 # continue quering. We keep the sleep big to decrease the
                 # number of RM state queries
-                time.sleep(2)
+                time.sleep(4)
   
     def get_task(self, tid):
         """ Get a specific task
index 978f0ee..2d738c5 100644 (file)
@@ -200,8 +200,8 @@ class ResourceManager(Logger):
         # the resource instance gets a copy of all traces
         self._trcs = copy.deepcopy(self._traces)
 
-        self._state = ResourceState.NEW
-
+        # Each resource is placed on a deployment group by the EC
+        # during deployment
         self.deployment_group = None
 
         self._start_time = None
@@ -213,6 +213,8 @@ class ResourceManager(Logger):
         self._finish_time = None
         self._failed_time = None
 
+        self._state = ResourceState.NEW
+
     @property
     def guid(self):
         """ Returns the global unique identifier of the RM """
@@ -316,9 +318,8 @@ class ResourceManager(Logger):
         This  method is resposible for selecting an individual resource
         matching user requirements.
         This method should be redefined when necessary in child classes.
-        """ 
-        self._discover_time = tnow()
-        self._state = ResourceState.DISCOVERED
+        """
+        self.set_discovered()
 
     def provision(self):
         """ Performs resource provisioning.
@@ -327,9 +328,8 @@ class ResourceManager(Logger):
         After this method has been successfully invoked, the resource
         should be acccesible/controllable by the RM.
         This method should be redefined when necessary in child classes.
-        """ 
-        self._provision_time = tnow()
-        self._state = ResourceState.PROVISIONED
+        """
+        self.set_provisioned()
 
     def start(self):
         """ Starts the resource.
@@ -337,12 +337,11 @@ class ResourceManager(Logger):
         There is no generic start behavior for all resources.
         This method should be redefined when necessary in child classes.
         """
-        if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
+        if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
             self.error("Wrong state %s for start" % self.state)
             return
 
-        self._start_time = tnow()
-        self._state = ResourceState.STARTED
+        self.set_started()
 
     def stop(self):
         """ Stops the resource.
@@ -350,12 +349,31 @@ class ResourceManager(Logger):
         There is no generic stop behavior for all resources.
         This method should be redefined when necessary in child classes.
         """
-        if not self._state in [ResourceState.STARTED]:
+        if not self.state in [ResourceState.STARTED]:
             self.error("Wrong state %s for stop" % self.state)
             return
+        
+        self.set_stopped()
 
-        self._stop_time = tnow()
-        self._state = ResourceState.STOPPED
+    def deploy(self):
+        """ Execute all steps required for the RM to reach the state READY
+
+        """
+        if self.state > ResourceState.READY:
+            self.error("Wrong state %s for deploy" % self.state)
+            return
+
+        self.debug("----- READY ---- ")
+        self.set_ready()
+
+    def release(self):
+        self.set_released()
+
+    def finish(self):
+        self.set_finished()
+    def fail(self):
+        self.set_failed()
 
     def set(self, name, value):
         """ Set the value of the attribute
@@ -655,39 +673,6 @@ class ResourceManager(Logger):
             self.debug(" ----- STOPPING ---- ") 
             self.stop()
 
-    def deploy(self):
-        """ Execute all steps required for the RM to reach the state READY
-
-        """
-        if self._state > ResourceState.READY:
-            self.error("Wrong state %s for deploy" % self.state)
-            return
-
-        self.debug("----- READY ---- ")
-        self._ready_time = tnow()
-        self._state = ResourceState.READY
-
-    def release(self):
-        """Release any resources used by this RM
-
-        """
-        self._release_time = tnow()
-        self._state = ResourceState.RELEASED
-
-    def finish(self):
-        """ Mark ResourceManager as FINISHED
-
-        """
-        self._finish_time = tnow()
-        self._state = ResourceState.FINISHED
-
-    def fail(self):
-        """ Mark ResourceManager as FAILED
-
-        """
-        self._failed_time = tnow()
-        self._state = ResourceState.FAILED
-
     def connect(self, guid):
         """ Performs actions that need to be taken upon associating RMs.
         This method should be redefined when necessary in child classes.
@@ -712,6 +697,46 @@ class ResourceManager(Logger):
         """
         # TODO: Validate!
         return True
+    
+    def set_started(self):
+        """ Mark ResourceManager as STARTED """
+        self._start_time = tnow()
+        self._state = ResourceState.STARTED
+        
+    def set_stopped(self):
+        """ Mark ResourceManager as STOPPED """
+        self._stop_time = tnow()
+        self._state = ResourceState.STOPPED
+
+    def set_ready(self):
+        """ Mark ResourceManager as READY """
+        self._ready_time = tnow()
+        self._state = ResourceState.READY
+
+    def set_released(self):
+        """ Mark ResourceManager as REALEASED """
+        self._release_time = tnow()
+        self._state = ResourceState.RELEASED
+
+    def set_finished(self):
+        """ Mark ResourceManager as FINISHED """
+        self._finish_time = tnow()
+        self._state = ResourceState.FINISHED
+
+    def set_failed(self):
+        """ Mark ResourceManager as FAILED """
+        self._failed_time = tnow()
+        self._state = ResourceState.FAILED
+
+    def set_discovered(self):
+        """ Mark ResourceManager as DISCOVERED """
+        self._discover_time = tnow()
+        self._state = ResourceState.DISCOVERED
+
+    def set_provisioned(self):
+        """ Mark ResourceManager as PROVISIONED """
+        self._provision_time = tnow()
+        self._state = ResourceState.PROVISIONED
 
 class ResourceFactory(object):
     _resource_types = dict()
index 3519138..f9c46c8 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-    reschedule_delay
+        reschedule_delay
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -29,6 +29,7 @@ import os
 import subprocess
 
 # TODO: Resolve wildcards in commands!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 
 @clsinit
 class LinuxApplication(ResourceManager):
@@ -483,7 +484,7 @@ class LinuxApplication(ResourceManager):
                 raise
 
             super(LinuxApplication, self).deploy()
-
+    
     def start(self):
         command = self.get("command")
 
@@ -492,7 +493,7 @@ class LinuxApplication(ResourceManager):
         if not command:
             # If no command was given (i.e. Application was used for dependency
             # installation), then the application is directly marked as FINISHED
-            self._state = ResourceState.FINISHED
+            self.set_finished()
         else:
 
             if self.in_foreground:
@@ -585,14 +586,12 @@ class LinuxApplication(ResourceManager):
 
         if self.state == ResourceState.STARTED:
         
-            self.info("Stopping command '%s'" % command)
+            self.info("Stopping command '%s' " % command)
         
             # If the command is running in foreground (it was launched using
             # the node 'execute' method), then we use the handler to the Popen
             # process to kill it. Else we send a kill signal using the pid and ppid
             # retrieved after running the command with the node 'run' method
-            stopped = True
-
             if self._proc:
                 self._proc.kill()
             else:
@@ -602,12 +601,12 @@ class LinuxApplication(ResourceManager):
                     (out, err), proc = self.node.kill(self.pid, self.ppid,
                             sudo = self._sudo_kill)
 
+                    # TODO: check if execution errors occurred
                     if proc.poll() or err:
-                        # check if execution errors occurred
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                         self.fail()
-
+        
         if self.state == ResourceState.STARTED:
             super(LinuxApplication, self).stop()
 
@@ -620,11 +619,11 @@ class LinuxApplication(ResourceManager):
 
         self.stop()
 
-        if self.state == ResourceState.STOPPED:
+        if self.state != ResourceState.FAILED:
             self.info("Resource released")
 
             super(LinuxApplication, self).release()
-    
+   
     @property
     def state(self):
         """ Returns the state of the application
@@ -644,9 +643,9 @@ class LinuxApplication(ResourceManager):
                     err = self._proc.stderr.read()
                     self.error(msg, out, err)
                     self.fail()
-                elif retcode == 0:
-                    self._state = ResourceState.FINISHED
 
+                elif retcode == 0:
+                    self.finish()
             else:
                 # We need to query the status of the command we launched in 
                 # background. In order to avoid overwhelming the remote host and
@@ -665,12 +664,12 @@ class LinuxApplication(ResourceManager):
                                     self.run_home)
 
                             if err:
-                                msg = " Failed to execute command '%s'" % \
+                                msg = "Failed to execute command '%s'" % \
                                         self.get("command")
                                 self.error(msg, out, err)
                                 self.fail()
                             else:
-                               self._state = ResourceState.FINISHED
+                                self.finish()
 
                     self._last_state_check = tnow()
 
index 8a09122..3d8943a 100644 (file)
@@ -17,7 +17,7 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
     reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
@@ -64,8 +64,7 @@ class LinuxCCNApplication(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     @property
     def _environment(self):
index 5b9d925..cae55b5 100644 (file)
@@ -101,8 +101,7 @@ class LinuxCCNContent(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def upload_start_command(self):
         command = self.get("command")
@@ -128,22 +127,17 @@ class LinuxCCNContent(LinuxApplication):
             raise RuntimeError, msg
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            sef.fail()
             raise RuntimeError, msg
 
-    @property
-    def state(self):
-        return self._state
-
     @property
     def _start_command(self):
         command = ["ccnseqwriter"]
index 20c04fc..4fdb0ce 100644 (file)
@@ -19,8 +19,8 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-    reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -181,8 +181,7 @@ class LinuxCCND(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def upload_start_command(self):
         command = self.get("command")
@@ -204,23 +203,21 @@ class LinuxCCND(LinuxApplication):
                 raise_on_error = True)
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.set_failed()
             raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command') or ''
-        state = self.state
         
-        if state == ResourceState.STARTED:
+        if self.state == ResourceState.STARTED:
             self.info("Stopping command '%s'" % command)
 
             command = "ccndstop"
@@ -241,8 +238,7 @@ class LinuxCCND(LinuxApplication):
                         stdout = "ccndstop_stdout", 
                         stderr = "ccndstop_stderr")
 
-            self._stop_time = tnow()
-            self._state = ResourceState.STOPPED
+            self.set_stopped()
     
     @property
     def state(self):
@@ -256,12 +252,12 @@ class LinuxCCND(LinuxApplication):
 
             if retcode == 1 and err.find("No such file or directory") > -1:
                 # ccnd is not running (socket not found)
-                self._state = ResourceState.FINISHED
+                self.set_finished()
             elif retcode:
                 # other errors ...
                 msg = " Failed to execute command '%s'" % self.get("command")
                 self.error(msg, out, err)
-                self._state = ResourceState.FAILED
+                self.fail()
 
             self._last_state_check = tnow()
 
index ac10840..378f93c 100644 (file)
@@ -225,8 +225,7 @@ class LinuxCCNR(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def upload_start_command(self):
         command = self.get("command")
@@ -257,16 +256,15 @@ class LinuxCCNR(LinuxApplication):
                 raise_on_error = True)
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.fail()
             raise RuntimeError, msg
 
     @property
index 62d9049..9d8e8c2 100644 (file)
@@ -139,8 +139,7 @@ class LinuxFIBEntry(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def upload_start_command(self):
         command = self.get("command")
@@ -160,9 +159,9 @@ class LinuxFIBEntry(LinuxApplication):
                 env, blocking = True)
 
         if proc.poll():
-            self._state = ResourceState.FAILED
             msg = "Failed to execute command"
             self.error(msg, out, err)
+            self.fail()
             raise RuntimeError, msg
         
     def configure(self):
@@ -197,16 +196,15 @@ class LinuxFIBEntry(LinuxApplication):
             self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
 
     def start(self):
-        if self._state in [ResourceState.READY, ResourceState.STARTED]:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.fail()
             raise RuntimeError, msg
 
     def stop(self):
@@ -222,12 +220,7 @@ class LinuxFIBEntry(LinuxApplication):
             if proc.poll():
                 pass
 
-            self._stop_time = tnow()
-            self._state = ResourceState.STOPPED
-
-    @property
-    def state(self):
-        return self._state
+            self.set_stopped()
 
     @property
     def _start_command(self):
index 9188022..d71d21f 100644 (file)
@@ -196,8 +196,14 @@ class LinuxNode(ResourceManager):
         # home directory at Linux host
         self._home_dir = ""
         
-        # lock to avoid concurrency issues on methods used by applications 
-        self._lock = threading.Lock()
+        # lock to prevent concurrent applications on the same node,
+        # to execute commands at the same time. There are potential
+        # concurrency issues when using SSH to a same host from 
+        # multiple threads. There are also possible operational 
+        # issues, e.g. an application querying the existence 
+        # of a file or folder prior to its creation, and another 
+        # application creating the same file or folder in between.
+        self._node_lock = threading.Lock()
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
@@ -351,7 +357,7 @@ class LinuxNode(ResourceManager):
                 self.discover()
                 self.provision()
             except:
-                self._state = ResourceState.FAILED
+                self.fail()
                 raise
 
         # Node needs to wait until all associated interfaces are 
@@ -456,7 +462,7 @@ class LinuxNode(ResourceManager):
                     env = env)
         else:
             if with_lock:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                         host = self.get("hostname"),
@@ -524,7 +530,7 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
@@ -549,7 +555,7 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
-            with self._lock:
+            with self._node_lock:
                 pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
@@ -566,7 +572,7 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
         else:
-            with self._lock:
+            with self._node_lock:
                 status = sshfuncs.rstatus(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -588,7 +594,7 @@ class LinuxNode(ResourceManager):
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rkill(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -608,7 +614,7 @@ class LinuxNode(ResourceManager):
                     recursive = True,
                     strict_host_checking = False)
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
index 4d55eb1..2e03f62 100644 (file)
@@ -23,6 +23,7 @@ from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
 import os
+import socket
 
 @clsinit_copy
 class LinuxTraceroute(LinuxApplication):
@@ -42,12 +43,21 @@ class LinuxTraceroute(LinuxApplication):
             default = False,
             flags = Flags.ExecReadOnly)
 
+        use_ip = Attribute("useIP",
+            "Use the IP address instead of the host domain name. "
+            "Useful for environments were dns resolution problems occur "
+            "frequently",
+            type = Types.Bool,
+            default = False,
+            flags = Flags.ExecReadOnly)
+
         target = Attribute("target",
             "Traceroute target host (host that will be pinged)",
             flags = Flags.ExecReadOnly)
 
         cls._register_attribute(countinuous)
         cls._register_attribute(print_timestamp)
+        cls._register_attribute(use_ip)
         cls._register_attribute(target)
 
     def __init__(self, ec, guid):
@@ -71,7 +81,12 @@ class LinuxTraceroute(LinuxApplication):
         if self.get("printTimestamp") == True:
             args.append("""echo "`date +'%Y%m%d%H%M%S'`";""")
         args.append("traceroute")
-        args.append(self.get("target"))
+
+        target = self.get("target")
+        if self.get("useIP") == True:
+            target = socket.gethostbyname(target)
+        args.append(target)
+        
         if self.get("continuous") == True:
             args.append("; sleep 2 ; done ")
 
index dbb0813..779b8c1 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
         reschedule_delay
 from nepi.execution.resource import clsinit_copy 
 from nepi.resources.linux.application import LinuxApplication
@@ -250,16 +250,15 @@ class LinuxUdpTest(LinuxApplication):
     def start(self):
         if self.get("s") == True:
             # Server is already running
-            if self._state == ResourceState.READY:
+            if self.state == ResourceState.READY:
                 command = self.get("command")
                 self.info("Starting command '%s'" % command)
 
-                self._start_time = tnow()
-                self._state = ResourceState.STARTED
+                self.set_started()
             else:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
-                self._state = ResourceState.FAILED
+                self.fail()
                 raise RuntimeError, msg
         else:
             super(LinuxUdpTest, self).start()
index 349ddc2..1d23736 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
         reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.sshfuncs import ProcStatus
@@ -187,9 +187,7 @@ class UdpTunnel(LinuxApplication):
        
         self.info("Provisioning finished")
  
-        self.debug("----- READY ---- ")
-        self._provision_time = tnow()
-        self._state = ResourceState.PROVISIONED
+        self.set_provisioned()
 
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
@@ -204,29 +202,24 @@ class UdpTunnel(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
-
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.fail()
             raise RuntimeError, msg
 
-    # XXX: Leaves process unkilled!! 
-    #       Implement another mechanism to kill the tunnel!
     def stop(self):
         """ Stops application execution
         """
         if self.state == ResourceState.STARTED:
-            stopped = True
             self.info("Stopping tunnel")
     
             # Only try to kill the process if the pid and ppid
@@ -242,11 +235,9 @@ class UdpTunnel(LinuxApplication):
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
                     self.fail()
-                    stopped = False
 
-            if stopped:
-                self._stop_time = tnow()
-                self._state = ResourceState.STOPPED
+        if self.state == ResourceState.STARTED:
+            self.set_stopped()
 
     @property
     def state(self):
@@ -280,7 +271,7 @@ class UdpTunnel(LinuxApplication):
                             self.error(msg, err1, err2)
                             self.fail()
                         else:
-                            self._state = ResourceState.FINISHED
+                            self.set_finished()
 
                 self._last_state_check = tnow()
 
@@ -288,11 +279,15 @@ class UdpTunnel(LinuxApplication):
 
     def wait_local_port(self, endpoint):
         """ Waits until the local_port file for the endpoint is generated, 
-            and returns the port number """
+        and returns the port number 
+        
+        """
         return self.wait_file(endpoint, "local_port")
 
     def wait_result(self, endpoint):
-        """ Waits until the return code file for the endpoint is generated """ 
+        """ Waits until the return code file for the endpoint is generated 
+        
+        """ 
         return self.wait_file(endpoint, "ret_file")
  
     def wait_file(self, endpoint, filename):
index 3b80df1..673f810 100644 (file)
@@ -39,7 +39,8 @@ class OMFApplication(ResourceManager):
 
     .. note::
 
-       This class is used only by the Experiment Controller through the Resource Factory
+       This class is used only by the Experiment Controller through the 
+       Resource Factory
 
     """
     _rtype = "OMFApplication"
@@ -47,7 +48,8 @@ class OMFApplication(ResourceManager):
 
     @classmethod
     def _register_attributes(cls):
-        """Register the attributes of an OMF application
+        """ Register the attributes of an OMF application
+
         """
 
         appid = Attribute("appid", "Name of the application")
@@ -78,7 +80,6 @@ class OMFApplication(ResourceManager):
         :type creds: dict
 
         """
-        
         super(OMFApplication, self).__init__(ec, guid)
 
         self.set('appid', "")
@@ -90,12 +91,6 @@ class OMFApplication(ResourceManager):
 
         self._omf_api = None
 
-    @property
-    def exp_id(self):
-        if self.ec.exp_id.startswith('exp-'):
-            return None
-        return self.ec.exp_id
-
     @property
     def node(self):
         rm_list = self.get_connected(OMFNode.rtype())
@@ -103,7 +98,8 @@ class OMFApplication(ResourceManager):
         return None
 
     def valid_connection(self, guid):
-        """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+        """ Check if the connection with the guid in parameter is possible. 
+        Only meaningful connections are allowed.
 
         :param guid: Guid of RM it will be connected
         :type guid: int
@@ -112,47 +108,57 @@ class OMFApplication(ResourceManager):
         """
         rm = self.ec.get_resource(guid)
         if rm.rtype() not in self._authorized_connections:
-            msg = "Connection between %s %s and %s %s refused : An Application can be connected only to a Node" %\
+            msg = ("Connection between %s %s and %s %s refused: "
+                    "An Application can be connected only to a Node" ) % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
+
             return False
+
         elif len(self.connections) != 0 :
-            msg = "Connection between %s %s and %s %s refused : This Application is already connected" % \
+            msg = ("Connection between %s %s and %s %s refused: "
+                    "This Application is already connected" ) % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
+
             return False
+
         else :
-            msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+            msg = "Connection between %s %s and %s %s accepted" % (
+                    self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
-            return True
-
 
+            return True
 
     def deploy(self):
-        """Deploy the RM. It means nothing special for an application for now (later it will be upload sources, ...)
-           It becomes DEPLOYED after getting the xmpp client.
+        """ Deploy the RM. It means nothing special for an application 
+        for now (later it will be upload sources, ...)
+        It becomes DEPLOYED after getting the xmpp client.
+
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         if not self._omf_api :
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             return
 
         super(OMFApplication, self).deploy()
 
     def start(self):
-        """Start the RM. It means : Send Xmpp Message Using OMF protocol to execute the application
-           It becomes STARTED before the messages are sent (for coordination)
+        """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
+         to execute the application. 
+         It becomes STARTED before the messages are sent (for coordination)
 
         """
         if not (self.get('appid') and self.get('path')) :
-            self._state = ResourceState.FAILED
             msg = "Application's information are not initialized"
             self.error(msg)
+            self.fail()
             return
 
         if not self.get('args'):
@@ -170,39 +176,37 @@ class OMFApplication(ResourceManager):
             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
                 self.get('args'), self.get('path'), self.get('env'))
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             raise
 
-
         super(OMFApplication, self).start()
 
-
     def stop(self):
-        """Stop the RM. It means : Send Xmpp Message Using OMF protocol to kill the application
-           It becomes STOPPED after the message is sent.
+        """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
+        kill the application. 
+        State is set to STOPPED after the message is sent.
 
         """
         try:
             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             #raise
 
         super(OMFApplication, self).stop()
-        self._state = ResourceState.FINISHED
-        
 
     def release(self):
-        """Clean the RM at the end of the experiment and release the API.
+        """ Clean the RM at the end of the experiment and release the API.
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         super(OMFApplication, self).release()
 
index bc4f9b5..b51cd89 100644 (file)
@@ -1,21 +1,22 @@
-"""
-    NEPI, a framework to manage network experiments
-    Copyright (C) 2013 INRIA
-
-    This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation, either version 3 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-"""
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Julien Tribino <julien.tribino@inria.fr>
 
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
         reschedule_delay
@@ -44,10 +45,10 @@ class OMFChannel(ResourceManager):
     _rtype = "OMFChannel"
     _authorized_connections = ["OMFWifiInterface", "OMFNode"]
 
-
     @classmethod
     def _register_attributes(cls):
         """Register the attributes of an OMF channel
+        
         """
         channel = Attribute("channel", "Name of the application")
         xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
@@ -83,7 +84,8 @@ class OMFChannel(ResourceManager):
         return self.ec.exp_id
 
     def valid_connection(self, guid):
-        """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+        """ Check if the connection with the guid in parameter is possible.
+        Only meaningful connections are allowed.
 
         :param guid: Guid of the current RM
         :type guid: int
@@ -91,12 +93,17 @@ class OMFChannel(ResourceManager):
 
         """
         rm = self.ec.get_resource(guid)
+        
         if rm.rtype() in self._authorized_connections:
-            msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+            msg = "Connection between %s %s and %s %s accepted" % (
+                    self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
             return True
-        msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+
+        msg = "Connection between %s %s and %s %s refused" % (
+                self.rtype(), self._guid, rm.rtype(), guid)
         self.debug(msg)
+        
         return False
 
     def _get_target(self, conn_set):
@@ -115,7 +122,8 @@ class OMFChannel(ResourceManager):
             for conn in rm_iface.connections:
                 rm_node = self.ec.get_resource(conn)
                 if rm_node.rtype() == "OMFNode" and rm_node.get('hostname'):
-                    if rm_iface.state < ResourceState.PROVISIONED or rm_node.state < ResourceState.READY:
+                    if rm_iface.state < ResourceState.PROVISIONED or \
+                            rm_node.state < ResourceState.READY:
                         return "reschedule"
                     couple = [rm_node.get('hostname'), rm_iface.get('alias')]
                     #print couple
@@ -135,24 +143,26 @@ class OMFChannel(ResourceManager):
         pass
 
     def deploy(self):
-        """Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the channel
-           It becomes DEPLOYED after sending messages to configure the channel
+        """ Deploy the RM. It means : Get the xmpp client and send messages 
+        using OMF 5.4 protocol to configure the channel.
+        It becomes DEPLOYED after sending messages to configure the channel
 
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.exp_id)
 
         if not self._omf_api :
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             return
 
         if not self.get('channel'):
-            self._state = ResourceState.FAILED
             msg = "Channel's value is not initialized"
             self.error(msg)
+            self.fail()
             raise
 
         self._nodes_guid = self._get_target(self._connections) 
@@ -167,35 +177,36 @@ class OMFChannel(ResourceManager):
                 attrname = "net/%s/%s" % (couple[1], 'channel')
                 self._omf_api.configure(couple[0], attrname, attrval)
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             raise
 
         super(OMFChannel, self).deploy()
 
     def start(self):
-        """Start the RM. It means nothing special for a channel for now
-           It becomes STARTED as soon as this method starts.
+        """ Start the RM. It means nothing special for a channel for now
+        It becomes STARTED as soon as this method starts.
 
         """
 
         super(OMFChannel, self).start()
 
     def stop(self):
-        """Stop the RM. It means nothing special for a channel for now
-           It becomes STOPPED as soon as this method is called
+        """ Stop the RM. It means nothing special for a channel for now
+        It becomes STOPPED as soon as this method is called
 
         """
         super(OMFChannel, self).stop()
 
     def release(self):
-        """Clean the RM at the end of the experiment and release the API
+        """ Clean the RM at the end of the experiment and release the API
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.exp_id)
 
         super(OMFChannel, self).release()
 
index c2be38e..a3b9c3a 100644 (file)
@@ -26,7 +26,6 @@ from nepi.resources.omf.node import OMFNode
 from nepi.resources.omf.channel import OMFChannel
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
-
 @clsinit
 class OMFWifiInterface(ResourceManager):
     """
@@ -41,7 +40,8 @@ class OMFWifiInterface(ResourceManager):
 
     .. note::
 
-       This class is used only by the Experiment Controller through the Resource Factory
+       This class is used only by the Experiment Controller through the Resource 
+       Factory
 
     """
     _rtype = "OMFWifiInterface"
@@ -88,14 +88,9 @@ class OMFWifiInterface(ResourceManager):
         self._omf_api = None
         self._alias = self.get('alias')
 
-    @property
-    def exp_id(self):
-        if self.ec.exp_id.startswith('exp-'):
-            return None
-        return self.ec.exp_id
-
     def valid_connection(self, guid):
-        """ Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+        """ Check if the connection with the guid in parameter is possible. 
+        Only meaningful connections are allowed.
 
         :param guid: Guid of the current RM
         :type guid: int
@@ -107,10 +102,13 @@ class OMFWifiInterface(ResourceManager):
             msg = "Connection between %s %s and %s %s accepted" % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
+
             return True
+
         msg = "Connection between %s %s and %s %s refused" % \
              (self.rtype(), self._guid, rm.rtype(), guid)
         self.debug(msg)
+
         return False
 
     @property
@@ -138,7 +136,8 @@ class OMFWifiInterface(ResourceManager):
             for attrname in ["mode", "type", "essid"]:
                 attrval = self.get(attrname)
                 attrname = "net/%s/%s" % (self._alias, attrname)
-                self._omf_api.configure(self.node.get('hostname'), attrname, attrval)
+                self._omf_api.configure(self.node.get('hostname'), attrname, 
+                        attrval)
         except AttributeError:
             self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
@@ -152,7 +151,6 @@ class OMFWifiInterface(ResourceManager):
         """ Configure the ip of the interface
 
         """
-
         if self.channel.state < ResourceState.READY:
             self.ec.schedule(reschedule_delay, self.deploy)
             return False
@@ -160,39 +158,43 @@ class OMFWifiInterface(ResourceManager):
         try :
             attrval = self.get("ip")
             attrname = "net/%s/%s" % (self._alias, "ip")
-            self._omf_api.configure(self.node.get('hostname'), attrname, attrval)
+            self._omf_api.configure(self.node.get('hostname'), attrname, 
+                    attrval)
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.debug(msg)
+            self.fail()
             #raise
 
         return True
 
-
     def deploy(self):
-        """Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the interface
-           It becomes DEPLOYED after sending messages to configure the interface
+        """ Deploy the RM. It means : Get the xmpp client and send messages 
+        using OMF 5.4 protocol to configure the interface.
+        It becomes DEPLOYED after sending messages to configure the interface
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         if not self._omf_api :
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             return
 
-        if not (self.get('mode') and self.get('type') and self.get('essid') and self.get('ip')):
-            self._state = ResourceState.FAILED
+        if not (self.get('mode') and self.get('type') and self.get('essid') \
+                and self.get('ip')):
             msg = "Interface's variable are not initialized"
             self.error(msg)
+            self.fail()
             return False
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
+            self.fail()
             return False
 
         # Just for information
@@ -213,28 +215,14 @@ class OMFWifiInterface(ResourceManager):
         super(OMFWifiInterface, self).deploy()
         return True
 
-    def start(self):
-        """Start the RM. It means nothing special for an interface for now
-           It becomes STARTED as soon as this method starts.
-
-        """
-
-        super(OMFWifiInterface, self).start()
-
-    def stop(self):
-        """Stop the RM. It means nothing special for an interface for now
-           It becomes STOPPED as soon as this method stops
-
-        """
-        super(OMFWifiInterface, self).stop()
-
     def release(self):
-        """Clean the RM at the end of the experiment and release the API
+        """ Clean the RM at the end of the experiment and release the API
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         super(OMFWifiInterface, self).release()
 
index 612fa38..1421078 100644 (file)
@@ -101,14 +101,9 @@ class OMFNode(ResourceManager):
 
         self._omf_api = None 
 
-    @property
-    def exp_id(self):
-        if self.ec.exp_id.startswith('exp-'):
-            return None
-        return self.ec.exp_id
-
     def valid_connection(self, guid):
-        """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+        """ Check if the connection with the guid in parameter is possible. 
+        Only meaningful connections are allowed.
 
         :param guid: Guid of the current RM
         :type guid: int
@@ -117,40 +112,47 @@ class OMFNode(ResourceManager):
         """
         rm = self.ec.get_resource(guid)
         if rm.rtype() in self._authorized_connections:
-            msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+            msg = "Connection between %s %s and %s %s accepted" % (
+                    self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
+
             return True
-        msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+
+        msg = "Connection between %s %s and %s %s refused" % (
+                self.rtype(), self._guid, rm.rtype(), guid)
         self.debug(msg)
+
         return False
 
     def deploy(self):
-        """Deploy the RM. It means : Send Xmpp Message Using OMF protocol to enroll the node into the experiment
-           It becomes DEPLOYED after sending messages to enroll the node
+        """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol 
+            to enroll the node into the experiment.
+            It becomes DEPLOYED after sending messages to enroll the node
 
         """ 
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         if not self._omf_api :
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             return
 
         if not self.get('hostname') :
-            self._state = ResourceState.FAILED
             msg = "Hostname's value is not initialized"
             self.error(msg)
+            self.fail()
             return False
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.debug(msg)
+            self.error(msg)
+            self.fail()
             #raise AttributeError, msg
 
         super(OMFNode, self).deploy()
@@ -188,8 +190,10 @@ class OMFNode(ResourceManager):
         """
         if self._omf_api :
             self._omf_api.release(self.get('hostname'))
+
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         super(OMFNode, self).release()
 
index b4b01c5..dc03b93 100644 (file)
@@ -48,10 +48,13 @@ class OMFAPI(Logger):
 
     .. note::
 
-       This class is the implementation of an OMF 5.4 API. Since the version 5.4.1, the Topic Architecture start with OMF_5.4 instead of OMF used for OMF5.3
+       This class is the implementation of an OMF 5.4 API. 
+       Since the version 5.4.1, the Topic Architecture start with OMF_5.4 
+       instead of OMF used for OMF5.3
 
     """
-    def __init__(self, slice, host, port, password, xmpp_root = None, exp_id = None):
+    def __init__(self, slice, host, port, password, xmpp_root = None, 
+            exp_id = None):
         """
     
         :param slice: Xmpp Slice
@@ -81,6 +84,7 @@ class OMFAPI(Logger):
 
         # OMF xmpp client
         self._client = None
+
         # message handler
         self._message = None
 
@@ -133,7 +137,8 @@ class OMFAPI(Logger):
         """ Publish New Experiment Message
 
         """
-        address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
+        address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice,
+                self._user)
         #print address
         payload = self._message.newexp_function(self._user, address)
         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
@@ -160,7 +165,8 @@ class OMFAPI(Logger):
         :type hostname: str
 
         """
-        return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
+        return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, 
+                hostname)
 
     def _host_resource_id(self, hostname):
         """ Return the Topic Name as /xmpp_root/slice/resources/hostname
@@ -201,7 +207,8 @@ class OMFAPI(Logger):
         self._client.delete(xmpp_node)
 
     def enroll_host(self, hostname):
-        """ Create and Subscribe to the session topic and the resources corresponding to the hostname
+        """ Create and Subscribe to the session topic and the resources
+            corresponding to the hostname
 
         :param hostname: Full hrn of the node
         :type hostname: str
@@ -227,7 +234,8 @@ class OMFAPI(Logger):
 
         :param hostname: Full hrn of the node
         :type hostname: str
-        :param attribute: Attribute that need to be configured (often written as /net/wX/attribute, with X the interface number)
+        :param attribute: Attribute that need to be configured (
+            often written as /net/wX/attribute, with X the interface number)
         :type attribute: str
         :param value: Value of the attribute
         :type value: str
@@ -242,7 +250,8 @@ class OMFAPI(Logger):
 
         :param hostname: Full hrn of the node
         :type hostname: str
-        :param app_id: Application Id (Any id that represents in a unique way the application)
+        :param app_id: Application Id (Any id that represents in a unique 
+            way the application)
         :type app_id: str
         :param arguments: Arguments of the application
         :type arguments: str
@@ -252,7 +261,8 @@ class OMFAPI(Logger):
         :type env: str
 
         """
-        payload = self._message.execute_function(hostname, app_id, arguments, path, env)
+        payload = self._message.execute_function(hostname, app_id, arguments, 
+                path, env)
         xmpp_node =  self._host_session_id(hostname)
         self._client.publish(payload, xmpp_node)
 
@@ -295,11 +305,12 @@ class OMFAPIFactory(object):
     """ 
     .. note::
 
-        It allows the different RM to use the same xmpp client if they use the same credentials. 
-        For the moment, it is focused on Xmpp.
+        It allows the different RM to use the same xmpp client if they use 
+        the same credentials.  For the moment, it is focused on XMPP.
 
     """
-    # use lock to avoid concurrent access to the Api list at the same times by 2 different threads
+    # use lock to avoid concurrent access to the Api list at the same times by 2 
+    # different threads
     lock = threading.Lock()
     _apis = dict()
 
index f85c3d5..f71bc5f 100644 (file)
@@ -23,8 +23,18 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
-import threading
+
 import subprocess
+import threading
+
+# A.Q. GENERAL COMMENTS: This module needs major cleaning up
+#     - Lines should be 80 characters
+#     - Most methods have too many lines and there are no comments or spaces
+#     - There should be only two line breaks between two methods
+#     - Code is too compressed. Hard to read. Add spaces when needed
+#     - In general the code needs to be more subdivided. Use more methods 
+#       with clear names to divide operations (even if you don't reuse the 
+#       methods else where, this will make the code more readable)
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
@@ -43,6 +53,7 @@ class PlanetlabNode(LinuxNode):
         """
         return cls._blacklist
 
+    ### A.Q. COMMENT: Why did you wrapped the locks inside methods ?
     @classmethod
     def in_provision(cls):
         """ Returns the nodes that anohter RM is trying to provision
@@ -221,6 +232,8 @@ class PlanetlabNode(LinuxNode):
         return self._plapi
 
     def discoverl(self):
+        #### A.Q. COMMENT: no need to have methods for the locks and 
+        ##                 other attributes. Please remove.
         bl = PlanetlabNode.blacklist()
         inpro = PlanetlabNode.in_provision()
         lockbl = PlanetlabNode.lock_bl()
@@ -231,6 +244,12 @@ class PlanetlabNode(LinuxNode):
             if node_id not in bl and node_id not in inpro:
                 try_other = self.do_ping(node_id)
                 if try_other:
+                    # A.Q. COMMENT: Here you could do 
+                    #
+                    #   with self._lockbl:
+                    #       ...
+                    #
+                    #  Class attributes can still be accesed with 'self'
                     lockbl.acquire()
                     bl.append(node_id)
                     lockbl.release()
@@ -306,6 +325,7 @@ class PlanetlabNode(LinuxNode):
                     
 
     def provisionl(self):
+        # A.Q. COMMENT: you can import time on the top
         import time
         bl = PlanetlabNode.blacklist()
         lockbl = PlanetlabNode.lock_bl()
@@ -325,6 +345,9 @@ class PlanetlabNode(LinuxNode):
             t = 0 
             while t < timeout and not ssh_ok:
                 # check ssh connection
+
+                # A.Q. COMMENT IMPORTANT! Instead of issuing SSH commands directly use the
+                #    "execute" method inherithed from LinuxNode with blocking = True
                 command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (slicename, ip)
                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) 
                 stdout, stderr = p.communicate()
@@ -341,6 +364,9 @@ class PlanetlabNode(LinuxNode):
                 with lockbl:
                     bl.append(node)
                     print bl
+                    # A.Q. COMMENT: Make method "delete_slice_node" and there 
+                    #               put this code. Repeat this for all calls to plapi.
+                    #               This will make the code cleaner.
                     self.plapi.delete_slice_node(slicename, [node])
                     self.discover()
                 continue
@@ -351,6 +377,8 @@ class PlanetlabNode(LinuxNode):
                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
                 stdout, stderr = p.communicate()
                 if stdout.find("/proc type proc") < 0:
+                    # A.Q. COMMENT: lines 382-384 should go to a method
+                    #       "blacklist_node()"
                     lockbl.acquire()
                     bl.append(node)
                     lockbl.release()
@@ -370,7 +398,6 @@ class PlanetlabNode(LinuxNode):
         # call provision de linux node?
         super(PlanetlabNode, self).provision()
 
-        
     def filter_based_on_attributes(self):
         # Map attributes with tagnames of PL
         timeframe = self.get("timeframe")[0]
@@ -485,6 +512,8 @@ class PlanetlabNode(LinuxNode):
         return nodes_inslice
 
     def do_ping(self, node_id):
+        # A.Q. COMMENT: the execfuncs module in utils will do the local ping for you
+        #               code reuse is good...
         ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
         ip = ip[0]['ip']
         result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
@@ -493,7 +522,7 @@ class PlanetlabNode(LinuxNode):
         elif result == 1 or result == 2:
             return True
 
-
+    # A.Q. Unclear name for method "fail2"
     def fail2(self):
         self.fail()
         msg = "Discovery failed. No candidates found for node"
index 01e7b14..411eb51 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
         reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode
@@ -166,35 +166,31 @@ class PlanetlabTap(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.fail()
             raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command') or ''
-        state = self.state
         
-        if state == ResourceState.STARTED:
+        if self.state == ResourceState.STARTED:
             self.info("Stopping command '%s'" % command)
 
             command = "bash %s" % os.path.join(self.app_home, "stop.sh")
             (out, err), proc = self.execute_command(command,
                     blocking = True)
 
-            self._stop_time = tnow()
-            self._state = ResourceState.STOPPED
+            self.set_stopped()
 
     @property
     def state(self):
@@ -208,6 +204,7 @@ class PlanetlabTap(LinuxApplication):
 
                 if out.strip().find(self.get("deviceName")) == -1: 
                     # tap is not running is not running (socket not found)
+                    self._finish_time = tnow()
                     self._state = ResourceState.FINISHED
 
             self._last_state_check = tnow()