Code cleanup. Setting resource state through specific functions
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 4 Aug 2013 18:48:40 +0000 (11:48 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 4 Aug 2013 18:48:40 +0000 (11:48 -0700)
19 files changed:
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnapplication.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/traceroute.py
src/nepi/resources/linux/udptest.py
src/nepi/resources/linux/udptunnel.py
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/omf/omf_api.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/tap.py

index de013d1..6beb3d7 100644 (file)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-import functools
-import logging
-import os
-import random
-import sys
-import time
-import threading
-
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
@@ -34,9 +26,16 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
-# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
+import functools
+import logging
+import os
+import random
+import sys
+import time
+import threading
+
 class ECState(object):
     """ State of the Experiment Controller
    
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -182,7 +181,7 @@ class ExperimentController(object):
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED
+            reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
 
         :param guids: List of guids
         :type guids: list
 
         :param guids: List of guids
         :type guids: list
@@ -191,31 +190,34 @@ class ExperimentController(object):
 
     def wait_started(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
 
     def wait_started(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED
+            reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.STARTED,
-            ResourceState.STOPPED,
-            ResourceState.FAILED,
-            ResourceState.FINISHED])
+        return self.wait(guids, state = ResourceState.STARTED)
 
     def wait_released(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
 
     def wait_released(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state RELEASED
+            reached the state RELEASED (or FAILED)
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        # TODO: solve state concurrency BUG and !!!!
+        # correct waited release state to state = ResourceState.FAILED)
+        return self.wait(guids, state = ResourceState.FINISHED)
+
+    def wait_deployed(self, guids):
+        """ Blocking method that wait until all the RM from the 'guid' list 
+            reached the state READY (or any higher state)
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.RELEASED,
-            ResourceState.STOPPED,
-            ResourceState.FAILED,
-            ResourceState.FINISHED])
+        return self.wait(guids, state = ResourceState.READY)
 
 
-    def wait(self, guids, states = [ResourceState.FINISHED, 
-            ResourceState.FAILED,
-            ResourceState.STOPPED]):
+    def wait(self, guids, state = ResourceState.STOPPED):
         """ Blocking method that waits until all the RM from the 'guid' list 
             reached state 'state' or until a failure occurs
             
         """ Blocking method that waits until all the RM from the 'guid' list 
             reached state 'state' or until a failure occurs
             
@@ -237,14 +239,14 @@ class ExperimentController(object):
 
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
 
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
-            state = self.state(guid)
+            rstate = self.state(guid)
 
 
-            if state in states:
+            if rstate >= state:
                 guids.remove(guid)
             else:
                 # Debug...
                 guids.remove(guid)
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR %g - state %s " % (guid,
-                    self.state(guid, hr = True)))
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
+                    self.state(guid, hr = True), state))
 
                 # Take the opportunity to 'refresh' the states of the RMs.
                 # Query only the first up to N guids (not to overwhelm 
 
                 # Take the opportunity to 'refresh' the states of the RMs.
                 # Query only the first up to N guids (not to overwhelm 
@@ -262,7 +264,7 @@ class ExperimentController(object):
                 # If the guid is not in one of the target states, wait and
                 # continue quering. We keep the sleep big to decrease the
                 # number of RM state queries
                 # If the guid is not in one of the target states, wait and
                 # continue quering. We keep the sleep big to decrease the
                 # number of RM state queries
-                time.sleep(2)
+                time.sleep(4)
   
     def get_task(self, tid):
         """ Get a specific task
   
     def get_task(self, tid):
         """ Get a specific task
index 978f0ee..2d738c5 100644 (file)
@@ -200,8 +200,8 @@ class ResourceManager(Logger):
         # the resource instance gets a copy of all traces
         self._trcs = copy.deepcopy(self._traces)
 
         # the resource instance gets a copy of all traces
         self._trcs = copy.deepcopy(self._traces)
 
-        self._state = ResourceState.NEW
-
+        # Each resource is placed on a deployment group by the EC
+        # during deployment
         self.deployment_group = None
 
         self._start_time = None
         self.deployment_group = None
 
         self._start_time = None
@@ -213,6 +213,8 @@ class ResourceManager(Logger):
         self._finish_time = None
         self._failed_time = None
 
         self._finish_time = None
         self._failed_time = None
 
+        self._state = ResourceState.NEW
+
     @property
     def guid(self):
         """ Returns the global unique identifier of the RM """
     @property
     def guid(self):
         """ Returns the global unique identifier of the RM """
@@ -316,9 +318,8 @@ class ResourceManager(Logger):
         This  method is resposible for selecting an individual resource
         matching user requirements.
         This method should be redefined when necessary in child classes.
         This  method is resposible for selecting an individual resource
         matching user requirements.
         This method should be redefined when necessary in child classes.
-        """ 
-        self._discover_time = tnow()
-        self._state = ResourceState.DISCOVERED
+        """
+        self.set_discovered()
 
     def provision(self):
         """ Performs resource provisioning.
 
     def provision(self):
         """ Performs resource provisioning.
@@ -327,9 +328,8 @@ class ResourceManager(Logger):
         After this method has been successfully invoked, the resource
         should be acccesible/controllable by the RM.
         This method should be redefined when necessary in child classes.
         After this method has been successfully invoked, the resource
         should be acccesible/controllable by the RM.
         This method should be redefined when necessary in child classes.
-        """ 
-        self._provision_time = tnow()
-        self._state = ResourceState.PROVISIONED
+        """
+        self.set_provisioned()
 
     def start(self):
         """ Starts the resource.
 
     def start(self):
         """ Starts the resource.
@@ -337,12 +337,11 @@ class ResourceManager(Logger):
         There is no generic start behavior for all resources.
         This method should be redefined when necessary in child classes.
         """
         There is no generic start behavior for all resources.
         This method should be redefined when necessary in child classes.
         """
-        if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
+        if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
             self.error("Wrong state %s for start" % self.state)
             return
 
             self.error("Wrong state %s for start" % self.state)
             return
 
-        self._start_time = tnow()
-        self._state = ResourceState.STARTED
+        self.set_started()
 
     def stop(self):
         """ Stops the resource.
 
     def stop(self):
         """ Stops the resource.
@@ -350,12 +349,31 @@ class ResourceManager(Logger):
         There is no generic stop behavior for all resources.
         This method should be redefined when necessary in child classes.
         """
         There is no generic stop behavior for all resources.
         This method should be redefined when necessary in child classes.
         """
-        if not self._state in [ResourceState.STARTED]:
+        if not self.state in [ResourceState.STARTED]:
             self.error("Wrong state %s for stop" % self.state)
             return
             self.error("Wrong state %s for stop" % self.state)
             return
+        
+        self.set_stopped()
 
 
-        self._stop_time = tnow()
-        self._state = ResourceState.STOPPED
+    def deploy(self):
+        """ Execute all steps required for the RM to reach the state READY
+
+        """
+        if self.state > ResourceState.READY:
+            self.error("Wrong state %s for deploy" % self.state)
+            return
+
+        self.debug("----- READY ---- ")
+        self.set_ready()
+
+    def release(self):
+        self.set_released()
+
+    def finish(self):
+        self.set_finished()
+    def fail(self):
+        self.set_failed()
 
     def set(self, name, value):
         """ Set the value of the attribute
 
     def set(self, name, value):
         """ Set the value of the attribute
@@ -655,39 +673,6 @@ class ResourceManager(Logger):
             self.debug(" ----- STOPPING ---- ") 
             self.stop()
 
             self.debug(" ----- STOPPING ---- ") 
             self.stop()
 
-    def deploy(self):
-        """ Execute all steps required for the RM to reach the state READY
-
-        """
-        if self._state > ResourceState.READY:
-            self.error("Wrong state %s for deploy" % self.state)
-            return
-
-        self.debug("----- READY ---- ")
-        self._ready_time = tnow()
-        self._state = ResourceState.READY
-
-    def release(self):
-        """Release any resources used by this RM
-
-        """
-        self._release_time = tnow()
-        self._state = ResourceState.RELEASED
-
-    def finish(self):
-        """ Mark ResourceManager as FINISHED
-
-        """
-        self._finish_time = tnow()
-        self._state = ResourceState.FINISHED
-
-    def fail(self):
-        """ Mark ResourceManager as FAILED
-
-        """
-        self._failed_time = tnow()
-        self._state = ResourceState.FAILED
-
     def connect(self, guid):
         """ Performs actions that need to be taken upon associating RMs.
         This method should be redefined when necessary in child classes.
     def connect(self, guid):
         """ Performs actions that need to be taken upon associating RMs.
         This method should be redefined when necessary in child classes.
@@ -712,6 +697,46 @@ class ResourceManager(Logger):
         """
         # TODO: Validate!
         return True
         """
         # TODO: Validate!
         return True
+    
+    def set_started(self):
+        """ Mark ResourceManager as STARTED """
+        self._start_time = tnow()
+        self._state = ResourceState.STARTED
+        
+    def set_stopped(self):
+        """ Mark ResourceManager as STOPPED """
+        self._stop_time = tnow()
+        self._state = ResourceState.STOPPED
+
+    def set_ready(self):
+        """ Mark ResourceManager as READY """
+        self._ready_time = tnow()
+        self._state = ResourceState.READY
+
+    def set_released(self):
+        """ Mark ResourceManager as REALEASED """
+        self._release_time = tnow()
+        self._state = ResourceState.RELEASED
+
+    def set_finished(self):
+        """ Mark ResourceManager as FINISHED """
+        self._finish_time = tnow()
+        self._state = ResourceState.FINISHED
+
+    def set_failed(self):
+        """ Mark ResourceManager as FAILED """
+        self._failed_time = tnow()
+        self._state = ResourceState.FAILED
+
+    def set_discovered(self):
+        """ Mark ResourceManager as DISCOVERED """
+        self._discover_time = tnow()
+        self._state = ResourceState.DISCOVERED
+
+    def set_provisioned(self):
+        """ Mark ResourceManager as PROVISIONED """
+        self._provision_time = tnow()
+        self._state = ResourceState.PROVISIONED
 
 class ResourceFactory(object):
     _resource_types = dict()
 
 class ResourceFactory(object):
     _resource_types = dict()
index 3519138..f9c46c8 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-    reschedule_delay
+        reschedule_delay
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -29,6 +29,7 @@ import os
 import subprocess
 
 # TODO: Resolve wildcards in commands!!
 import subprocess
 
 # TODO: Resolve wildcards in commands!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 
 @clsinit
 class LinuxApplication(ResourceManager):
 
 @clsinit
 class LinuxApplication(ResourceManager):
@@ -483,7 +484,7 @@ class LinuxApplication(ResourceManager):
                 raise
 
             super(LinuxApplication, self).deploy()
                 raise
 
             super(LinuxApplication, self).deploy()
-
+    
     def start(self):
         command = self.get("command")
 
     def start(self):
         command = self.get("command")
 
@@ -492,7 +493,7 @@ class LinuxApplication(ResourceManager):
         if not command:
             # If no command was given (i.e. Application was used for dependency
             # installation), then the application is directly marked as FINISHED
         if not command:
             # If no command was given (i.e. Application was used for dependency
             # installation), then the application is directly marked as FINISHED
-            self._state = ResourceState.FINISHED
+            self.set_finished()
         else:
 
             if self.in_foreground:
         else:
 
             if self.in_foreground:
@@ -585,14 +586,12 @@ class LinuxApplication(ResourceManager):
 
         if self.state == ResourceState.STARTED:
         
 
         if self.state == ResourceState.STARTED:
         
-            self.info("Stopping command '%s'" % command)
+            self.info("Stopping command '%s' " % command)
         
             # If the command is running in foreground (it was launched using
             # the node 'execute' method), then we use the handler to the Popen
             # process to kill it. Else we send a kill signal using the pid and ppid
             # retrieved after running the command with the node 'run' method
         
             # If the command is running in foreground (it was launched using
             # the node 'execute' method), then we use the handler to the Popen
             # process to kill it. Else we send a kill signal using the pid and ppid
             # retrieved after running the command with the node 'run' method
-            stopped = True
-
             if self._proc:
                 self._proc.kill()
             else:
             if self._proc:
                 self._proc.kill()
             else:
@@ -602,12 +601,12 @@ class LinuxApplication(ResourceManager):
                     (out, err), proc = self.node.kill(self.pid, self.ppid,
                             sudo = self._sudo_kill)
 
                     (out, err), proc = self.node.kill(self.pid, self.ppid,
                             sudo = self._sudo_kill)
 
+                    # TODO: check if execution errors occurred
                     if proc.poll() or err:
                     if proc.poll() or err:
-                        # check if execution errors occurred
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                         self.fail()
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                         self.fail()
-
+        
         if self.state == ResourceState.STARTED:
             super(LinuxApplication, self).stop()
 
         if self.state == ResourceState.STARTED:
             super(LinuxApplication, self).stop()
 
@@ -620,11 +619,11 @@ class LinuxApplication(ResourceManager):
 
         self.stop()
 
 
         self.stop()
 
-        if self.state == ResourceState.STOPPED:
+        if self.state != ResourceState.FAILED:
             self.info("Resource released")
 
             super(LinuxApplication, self).release()
             self.info("Resource released")
 
             super(LinuxApplication, self).release()
-    
+   
     @property
     def state(self):
         """ Returns the state of the application
     @property
     def state(self):
         """ Returns the state of the application
@@ -644,9 +643,9 @@ class LinuxApplication(ResourceManager):
                     err = self._proc.stderr.read()
                     self.error(msg, out, err)
                     self.fail()
                     err = self._proc.stderr.read()
                     self.error(msg, out, err)
                     self.fail()
-                elif retcode == 0:
-                    self._state = ResourceState.FINISHED
 
 
+                elif retcode == 0:
+                    self.finish()
             else:
                 # We need to query the status of the command we launched in 
                 # background. In order to avoid overwhelming the remote host and
             else:
                 # We need to query the status of the command we launched in 
                 # background. In order to avoid overwhelming the remote host and
@@ -665,12 +664,12 @@ class LinuxApplication(ResourceManager):
                                     self.run_home)
 
                             if err:
                                     self.run_home)
 
                             if err:
-                                msg = " Failed to execute command '%s'" % \
+                                msg = "Failed to execute command '%s'" % \
                                         self.get("command")
                                 self.error(msg, out, err)
                                 self.fail()
                             else:
                                         self.get("command")
                                 self.error(msg, out, err)
                                 self.fail()
                             else:
-                               self._state = ResourceState.FINISHED
+                                self.finish()
 
                     self._last_state_check = tnow()
 
 
                     self._last_state_check = tnow()
 
index 8a09122..3d8943a 100644 (file)
@@ -17,7 +17,7 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
     reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
     reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
@@ -64,8 +64,7 @@ class LinuxCCNApplication(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     @property
     def _environment(self):
 
     @property
     def _environment(self):
index 5b9d925..cae55b5 100644 (file)
@@ -101,8 +101,7 @@ class LinuxCCNContent(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def upload_start_command(self):
         command = self.get("command")
 
     def upload_start_command(self):
         command = self.get("command")
@@ -128,22 +127,17 @@ class LinuxCCNContent(LinuxApplication):
             raise RuntimeError, msg
 
     def start(self):
             raise RuntimeError, msg
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            sef.fail()
             raise RuntimeError, msg
 
             raise RuntimeError, msg
 
-    @property
-    def state(self):
-        return self._state
-
     @property
     def _start_command(self):
         command = ["ccnseqwriter"]
     @property
     def _start_command(self):
         command = ["ccnseqwriter"]
index 20c04fc..4fdb0ce 100644 (file)
@@ -19,8 +19,8 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-    reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 from nepi.util.timefuncs import tnow, tdiffsec
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -181,8 +181,7 @@ class LinuxCCND(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def upload_start_command(self):
         command = self.get("command")
 
     def upload_start_command(self):
         command = self.get("command")
@@ -204,23 +203,21 @@ class LinuxCCND(LinuxApplication):
                 raise_on_error = True)
 
     def start(self):
                 raise_on_error = True)
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.set_failed()
             raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command') or ''
             raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command') or ''
-        state = self.state
         
         
-        if state == ResourceState.STARTED:
+        if self.state == ResourceState.STARTED:
             self.info("Stopping command '%s'" % command)
 
             command = "ccndstop"
             self.info("Stopping command '%s'" % command)
 
             command = "ccndstop"
@@ -241,8 +238,7 @@ class LinuxCCND(LinuxApplication):
                         stdout = "ccndstop_stdout", 
                         stderr = "ccndstop_stderr")
 
                         stdout = "ccndstop_stdout", 
                         stderr = "ccndstop_stderr")
 
-            self._stop_time = tnow()
-            self._state = ResourceState.STOPPED
+            self.set_stopped()
     
     @property
     def state(self):
     
     @property
     def state(self):
@@ -256,12 +252,12 @@ class LinuxCCND(LinuxApplication):
 
             if retcode == 1 and err.find("No such file or directory") > -1:
                 # ccnd is not running (socket not found)
 
             if retcode == 1 and err.find("No such file or directory") > -1:
                 # ccnd is not running (socket not found)
-                self._state = ResourceState.FINISHED
+                self.set_finished()
             elif retcode:
                 # other errors ...
                 msg = " Failed to execute command '%s'" % self.get("command")
                 self.error(msg, out, err)
             elif retcode:
                 # other errors ...
                 msg = " Failed to execute command '%s'" % self.get("command")
                 self.error(msg, out, err)
-                self._state = ResourceState.FAILED
+                self.fail()
 
             self._last_state_check = tnow()
 
 
             self._last_state_check = tnow()
 
index ac10840..378f93c 100644 (file)
@@ -225,8 +225,7 @@ class LinuxCCNR(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def upload_start_command(self):
         command = self.get("command")
 
     def upload_start_command(self):
         command = self.get("command")
@@ -257,16 +256,15 @@ class LinuxCCNR(LinuxApplication):
                 raise_on_error = True)
 
     def start(self):
                 raise_on_error = True)
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.fail()
             raise RuntimeError, msg
 
     @property
             raise RuntimeError, msg
 
     @property
index 62d9049..9d8e8c2 100644 (file)
@@ -139,8 +139,7 @@ class LinuxFIBEntry(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def upload_start_command(self):
         command = self.get("command")
 
     def upload_start_command(self):
         command = self.get("command")
@@ -160,9 +159,9 @@ class LinuxFIBEntry(LinuxApplication):
                 env, blocking = True)
 
         if proc.poll():
                 env, blocking = True)
 
         if proc.poll():
-            self._state = ResourceState.FAILED
             msg = "Failed to execute command"
             self.error(msg, out, err)
             msg = "Failed to execute command"
             self.error(msg, out, err)
+            self.fail()
             raise RuntimeError, msg
         
     def configure(self):
             raise RuntimeError, msg
         
     def configure(self):
@@ -197,16 +196,15 @@ class LinuxFIBEntry(LinuxApplication):
             self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
 
     def start(self):
             self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
 
     def start(self):
-        if self._state in [ResourceState.READY, ResourceState.STARTED]:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.fail()
             raise RuntimeError, msg
 
     def stop(self):
             raise RuntimeError, msg
 
     def stop(self):
@@ -222,12 +220,7 @@ class LinuxFIBEntry(LinuxApplication):
             if proc.poll():
                 pass
 
             if proc.poll():
                 pass
 
-            self._stop_time = tnow()
-            self._state = ResourceState.STOPPED
-
-    @property
-    def state(self):
-        return self._state
+            self.set_stopped()
 
     @property
     def _start_command(self):
 
     @property
     def _start_command(self):
index 9188022..d71d21f 100644 (file)
@@ -196,8 +196,14 @@ class LinuxNode(ResourceManager):
         # home directory at Linux host
         self._home_dir = ""
         
         # home directory at Linux host
         self._home_dir = ""
         
-        # lock to avoid concurrency issues on methods used by applications 
-        self._lock = threading.Lock()
+        # lock to prevent concurrent applications on the same node,
+        # to execute commands at the same time. There are potential
+        # concurrency issues when using SSH to a same host from 
+        # multiple threads. There are also possible operational 
+        # issues, e.g. an application querying the existence 
+        # of a file or folder prior to its creation, and another 
+        # application creating the same file or folder in between.
+        self._node_lock = threading.Lock()
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
@@ -351,7 +357,7 @@ class LinuxNode(ResourceManager):
                 self.discover()
                 self.provision()
             except:
                 self.discover()
                 self.provision()
             except:
-                self._state = ResourceState.FAILED
+                self.fail()
                 raise
 
         # Node needs to wait until all associated interfaces are 
                 raise
 
         # Node needs to wait until all associated interfaces are 
@@ -456,7 +462,7 @@ class LinuxNode(ResourceManager):
                     env = env)
         else:
             if with_lock:
                     env = env)
         else:
             if with_lock:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                         host = self.get("hostname"),
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                         host = self.get("hostname"),
@@ -524,7 +530,7 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
                     sudo = sudo,
                     user = user) 
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
                 (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
@@ -549,7 +555,7 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
         if self.localhost:
             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
-            with self._lock:
+            with self._node_lock:
                 pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                 pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
@@ -566,7 +572,7 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
         else:
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
         else:
-            with self._lock:
+            with self._node_lock:
                 status = sshfuncs.rstatus(
                         pid, ppid,
                         host = self.get("hostname"),
                 status = sshfuncs.rstatus(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -588,7 +594,7 @@ class LinuxNode(ResourceManager):
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rkill(
                         pid, ppid,
                         host = self.get("hostname"),
                     (out, err), proc = sshfuncs.rkill(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -608,7 +614,7 @@ class LinuxNode(ResourceManager):
                     recursive = True,
                     strict_host_checking = False)
         else:
                     recursive = True,
                     strict_host_checking = False)
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
index 4d55eb1..2e03f62 100644 (file)
@@ -23,6 +23,7 @@ from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
 import os
 from nepi.util.timefuncs import tnow
 
 import os
+import socket
 
 @clsinit_copy
 class LinuxTraceroute(LinuxApplication):
 
 @clsinit_copy
 class LinuxTraceroute(LinuxApplication):
@@ -42,12 +43,21 @@ class LinuxTraceroute(LinuxApplication):
             default = False,
             flags = Flags.ExecReadOnly)
 
             default = False,
             flags = Flags.ExecReadOnly)
 
+        use_ip = Attribute("useIP",
+            "Use the IP address instead of the host domain name. "
+            "Useful for environments were dns resolution problems occur "
+            "frequently",
+            type = Types.Bool,
+            default = False,
+            flags = Flags.ExecReadOnly)
+
         target = Attribute("target",
             "Traceroute target host (host that will be pinged)",
             flags = Flags.ExecReadOnly)
 
         cls._register_attribute(countinuous)
         cls._register_attribute(print_timestamp)
         target = Attribute("target",
             "Traceroute target host (host that will be pinged)",
             flags = Flags.ExecReadOnly)
 
         cls._register_attribute(countinuous)
         cls._register_attribute(print_timestamp)
+        cls._register_attribute(use_ip)
         cls._register_attribute(target)
 
     def __init__(self, ec, guid):
         cls._register_attribute(target)
 
     def __init__(self, ec, guid):
@@ -71,7 +81,12 @@ class LinuxTraceroute(LinuxApplication):
         if self.get("printTimestamp") == True:
             args.append("""echo "`date +'%Y%m%d%H%M%S'`";""")
         args.append("traceroute")
         if self.get("printTimestamp") == True:
             args.append("""echo "`date +'%Y%m%d%H%M%S'`";""")
         args.append("traceroute")
-        args.append(self.get("target"))
+
+        target = self.get("target")
+        if self.get("useIP") == True:
+            target = socket.gethostbyname(target)
+        args.append(target)
+        
         if self.get("continuous") == True:
             args.append("; sleep 2 ; done ")
 
         if self.get("continuous") == True:
             args.append("; sleep 2 ; done ")
 
index dbb0813..779b8c1 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
         reschedule_delay
 from nepi.execution.resource import clsinit_copy 
 from nepi.resources.linux.application import LinuxApplication
         reschedule_delay
 from nepi.execution.resource import clsinit_copy 
 from nepi.resources.linux.application import LinuxApplication
@@ -250,16 +250,15 @@ class LinuxUdpTest(LinuxApplication):
     def start(self):
         if self.get("s") == True:
             # Server is already running
     def start(self):
         if self.get("s") == True:
             # Server is already running
-            if self._state == ResourceState.READY:
+            if self.state == ResourceState.READY:
                 command = self.get("command")
                 self.info("Starting command '%s'" % command)
 
                 command = self.get("command")
                 self.info("Starting command '%s'" % command)
 
-                self._start_time = tnow()
-                self._state = ResourceState.STARTED
+                self.set_started()
             else:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
             else:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
-                self._state = ResourceState.FAILED
+                self.fail()
                 raise RuntimeError, msg
         else:
             super(LinuxUdpTest, self).start()
                 raise RuntimeError, msg
         else:
             super(LinuxUdpTest, self).start()
index 349ddc2..1d23736 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
         reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.sshfuncs import ProcStatus
         reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.sshfuncs import ProcStatus
@@ -187,9 +187,7 @@ class UdpTunnel(LinuxApplication):
        
         self.info("Provisioning finished")
  
        
         self.info("Provisioning finished")
  
-        self.debug("----- READY ---- ")
-        self._provision_time = tnow()
-        self._state = ResourceState.PROVISIONED
+        self.set_provisioned()
 
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
 
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
@@ -204,29 +202,24 @@ class UdpTunnel(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def start(self):
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
             command = self.get("command")
             self.info("Starting command '%s'" % command)
-
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.fail()
             raise RuntimeError, msg
 
             raise RuntimeError, msg
 
-    # XXX: Leaves process unkilled!! 
-    #       Implement another mechanism to kill the tunnel!
     def stop(self):
         """ Stops application execution
         """
         if self.state == ResourceState.STARTED:
     def stop(self):
         """ Stops application execution
         """
         if self.state == ResourceState.STARTED:
-            stopped = True
             self.info("Stopping tunnel")
     
             # Only try to kill the process if the pid and ppid
             self.info("Stopping tunnel")
     
             # Only try to kill the process if the pid and ppid
@@ -242,11 +235,9 @@ class UdpTunnel(LinuxApplication):
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
                     self.fail()
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
                     self.fail()
-                    stopped = False
 
 
-            if stopped:
-                self._stop_time = tnow()
-                self._state = ResourceState.STOPPED
+        if self.state == ResourceState.STARTED:
+            self.set_stopped()
 
     @property
     def state(self):
 
     @property
     def state(self):
@@ -280,7 +271,7 @@ class UdpTunnel(LinuxApplication):
                             self.error(msg, err1, err2)
                             self.fail()
                         else:
                             self.error(msg, err1, err2)
                             self.fail()
                         else:
-                            self._state = ResourceState.FINISHED
+                            self.set_finished()
 
                 self._last_state_check = tnow()
 
 
                 self._last_state_check = tnow()
 
@@ -288,11 +279,15 @@ class UdpTunnel(LinuxApplication):
 
     def wait_local_port(self, endpoint):
         """ Waits until the local_port file for the endpoint is generated, 
 
     def wait_local_port(self, endpoint):
         """ Waits until the local_port file for the endpoint is generated, 
-            and returns the port number """
+        and returns the port number 
+        
+        """
         return self.wait_file(endpoint, "local_port")
 
     def wait_result(self, endpoint):
         return self.wait_file(endpoint, "local_port")
 
     def wait_result(self, endpoint):
-        """ Waits until the return code file for the endpoint is generated """ 
+        """ Waits until the return code file for the endpoint is generated 
+        
+        """ 
         return self.wait_file(endpoint, "ret_file")
  
     def wait_file(self, endpoint, filename):
         return self.wait_file(endpoint, "ret_file")
  
     def wait_file(self, endpoint, filename):
index 3b80df1..673f810 100644 (file)
@@ -39,7 +39,8 @@ class OMFApplication(ResourceManager):
 
     .. note::
 
 
     .. note::
 
-       This class is used only by the Experiment Controller through the Resource Factory
+       This class is used only by the Experiment Controller through the 
+       Resource Factory
 
     """
     _rtype = "OMFApplication"
 
     """
     _rtype = "OMFApplication"
@@ -47,7 +48,8 @@ class OMFApplication(ResourceManager):
 
     @classmethod
     def _register_attributes(cls):
 
     @classmethod
     def _register_attributes(cls):
-        """Register the attributes of an OMF application
+        """ Register the attributes of an OMF application
+
         """
 
         appid = Attribute("appid", "Name of the application")
         """
 
         appid = Attribute("appid", "Name of the application")
@@ -78,7 +80,6 @@ class OMFApplication(ResourceManager):
         :type creds: dict
 
         """
         :type creds: dict
 
         """
-        
         super(OMFApplication, self).__init__(ec, guid)
 
         self.set('appid', "")
         super(OMFApplication, self).__init__(ec, guid)
 
         self.set('appid', "")
@@ -90,12 +91,6 @@ class OMFApplication(ResourceManager):
 
         self._omf_api = None
 
 
         self._omf_api = None
 
-    @property
-    def exp_id(self):
-        if self.ec.exp_id.startswith('exp-'):
-            return None
-        return self.ec.exp_id
-
     @property
     def node(self):
         rm_list = self.get_connected(OMFNode.rtype())
     @property
     def node(self):
         rm_list = self.get_connected(OMFNode.rtype())
@@ -103,7 +98,8 @@ class OMFApplication(ResourceManager):
         return None
 
     def valid_connection(self, guid):
         return None
 
     def valid_connection(self, guid):
-        """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+        """ Check if the connection with the guid in parameter is possible. 
+        Only meaningful connections are allowed.
 
         :param guid: Guid of RM it will be connected
         :type guid: int
 
         :param guid: Guid of RM it will be connected
         :type guid: int
@@ -112,47 +108,57 @@ class OMFApplication(ResourceManager):
         """
         rm = self.ec.get_resource(guid)
         if rm.rtype() not in self._authorized_connections:
         """
         rm = self.ec.get_resource(guid)
         if rm.rtype() not in self._authorized_connections:
-            msg = "Connection between %s %s and %s %s refused : An Application can be connected only to a Node" %\
+            msg = ("Connection between %s %s and %s %s refused: "
+                    "An Application can be connected only to a Node" ) % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
+
             return False
             return False
+
         elif len(self.connections) != 0 :
         elif len(self.connections) != 0 :
-            msg = "Connection between %s %s and %s %s refused : This Application is already connected" % \
+            msg = ("Connection between %s %s and %s %s refused: "
+                    "This Application is already connected" ) % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
+
             return False
             return False
+
         else :
         else :
-            msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+            msg = "Connection between %s %s and %s %s accepted" % (
+                    self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
             self.debug(msg)
-            return True
-
 
 
+            return True
 
     def deploy(self):
 
     def deploy(self):
-        """Deploy the RM. It means nothing special for an application for now (later it will be upload sources, ...)
-           It becomes DEPLOYED after getting the xmpp client.
+        """ Deploy the RM. It means nothing special for an application 
+        for now (later it will be upload sources, ...)
+        It becomes DEPLOYED after getting the xmpp client.
+
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         if not self._omf_api :
 
         if not self._omf_api :
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             return
 
         super(OMFApplication, self).deploy()
 
     def start(self):
             return
 
         super(OMFApplication, self).deploy()
 
     def start(self):
-        """Start the RM. It means : Send Xmpp Message Using OMF protocol to execute the application
-           It becomes STARTED before the messages are sent (for coordination)
+        """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
+         to execute the application. 
+         It becomes STARTED before the messages are sent (for coordination)
 
         """
         if not (self.get('appid') and self.get('path')) :
 
         """
         if not (self.get('appid') and self.get('path')) :
-            self._state = ResourceState.FAILED
             msg = "Application's information are not initialized"
             self.error(msg)
             msg = "Application's information are not initialized"
             self.error(msg)
+            self.fail()
             return
 
         if not self.get('args'):
             return
 
         if not self.get('args'):
@@ -170,39 +176,37 @@ class OMFApplication(ResourceManager):
             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
                 self.get('args'), self.get('path'), self.get('env'))
         except AttributeError:
             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
                 self.get('args'), self.get('path'), self.get('env'))
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             raise
 
             raise
 
-
         super(OMFApplication, self).start()
 
         super(OMFApplication, self).start()
 
-
     def stop(self):
     def stop(self):
-        """Stop the RM. It means : Send Xmpp Message Using OMF protocol to kill the application
-           It becomes STOPPED after the message is sent.
+        """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
+        kill the application. 
+        State is set to STOPPED after the message is sent.
 
         """
         try:
             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
         except AttributeError:
 
         """
         try:
             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             #raise
 
         super(OMFApplication, self).stop()
             #raise
 
         super(OMFApplication, self).stop()
-        self._state = ResourceState.FINISHED
-        
 
     def release(self):
 
     def release(self):
-        """Clean the RM at the end of the experiment and release the API.
+        """ Clean the RM at the end of the experiment and release the API.
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         super(OMFApplication, self).release()
 
 
         super(OMFApplication, self).release()
 
index bc4f9b5..b51cd89 100644 (file)
@@ -1,21 +1,22 @@
-"""
-    NEPI, a framework to manage network experiments
-    Copyright (C) 2013 INRIA
-
-    This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation, either version 3 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-"""
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Julien Tribino <julien.tribino@inria.fr>
 
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
         reschedule_delay
 
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
         reschedule_delay
@@ -44,10 +45,10 @@ class OMFChannel(ResourceManager):
     _rtype = "OMFChannel"
     _authorized_connections = ["OMFWifiInterface", "OMFNode"]
 
     _rtype = "OMFChannel"
     _authorized_connections = ["OMFWifiInterface", "OMFNode"]
 
-
     @classmethod
     def _register_attributes(cls):
         """Register the attributes of an OMF channel
     @classmethod
     def _register_attributes(cls):
         """Register the attributes of an OMF channel
+        
         """
         channel = Attribute("channel", "Name of the application")
         xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
         """
         channel = Attribute("channel", "Name of the application")
         xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
@@ -83,7 +84,8 @@ class OMFChannel(ResourceManager):
         return self.ec.exp_id
 
     def valid_connection(self, guid):
         return self.ec.exp_id
 
     def valid_connection(self, guid):
-        """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+        """ Check if the connection with the guid in parameter is possible.
+        Only meaningful connections are allowed.
 
         :param guid: Guid of the current RM
         :type guid: int
 
         :param guid: Guid of the current RM
         :type guid: int
@@ -91,12 +93,17 @@ class OMFChannel(ResourceManager):
 
         """
         rm = self.ec.get_resource(guid)
 
         """
         rm = self.ec.get_resource(guid)
+        
         if rm.rtype() in self._authorized_connections:
         if rm.rtype() in self._authorized_connections:
-            msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+            msg = "Connection between %s %s and %s %s accepted" % (
+                    self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
             return True
             self.debug(msg)
             return True
-        msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+
+        msg = "Connection between %s %s and %s %s refused" % (
+                self.rtype(), self._guid, rm.rtype(), guid)
         self.debug(msg)
         self.debug(msg)
+        
         return False
 
     def _get_target(self, conn_set):
         return False
 
     def _get_target(self, conn_set):
@@ -115,7 +122,8 @@ class OMFChannel(ResourceManager):
             for conn in rm_iface.connections:
                 rm_node = self.ec.get_resource(conn)
                 if rm_node.rtype() == "OMFNode" and rm_node.get('hostname'):
             for conn in rm_iface.connections:
                 rm_node = self.ec.get_resource(conn)
                 if rm_node.rtype() == "OMFNode" and rm_node.get('hostname'):
-                    if rm_iface.state < ResourceState.PROVISIONED or rm_node.state < ResourceState.READY:
+                    if rm_iface.state < ResourceState.PROVISIONED or \
+                            rm_node.state < ResourceState.READY:
                         return "reschedule"
                     couple = [rm_node.get('hostname'), rm_iface.get('alias')]
                     #print couple
                         return "reschedule"
                     couple = [rm_node.get('hostname'), rm_iface.get('alias')]
                     #print couple
@@ -135,24 +143,26 @@ class OMFChannel(ResourceManager):
         pass
 
     def deploy(self):
         pass
 
     def deploy(self):
-        """Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the channel
-           It becomes DEPLOYED after sending messages to configure the channel
+        """ Deploy the RM. It means : Get the xmpp client and send messages 
+        using OMF 5.4 protocol to configure the channel.
+        It becomes DEPLOYED after sending messages to configure the channel
 
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
 
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.exp_id)
 
         if not self._omf_api :
 
         if not self._omf_api :
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             return
 
         if not self.get('channel'):
             return
 
         if not self.get('channel'):
-            self._state = ResourceState.FAILED
             msg = "Channel's value is not initialized"
             self.error(msg)
             msg = "Channel's value is not initialized"
             self.error(msg)
+            self.fail()
             raise
 
         self._nodes_guid = self._get_target(self._connections) 
             raise
 
         self._nodes_guid = self._get_target(self._connections) 
@@ -167,35 +177,36 @@ class OMFChannel(ResourceManager):
                 attrname = "net/%s/%s" % (couple[1], 'channel')
                 self._omf_api.configure(couple[0], attrname, attrval)
         except AttributeError:
                 attrname = "net/%s/%s" % (couple[1], 'channel')
                 self._omf_api.configure(couple[0], attrname, attrval)
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             raise
 
         super(OMFChannel, self).deploy()
 
     def start(self):
             raise
 
         super(OMFChannel, self).deploy()
 
     def start(self):
-        """Start the RM. It means nothing special for a channel for now
-           It becomes STARTED as soon as this method starts.
+        """ Start the RM. It means nothing special for a channel for now
+        It becomes STARTED as soon as this method starts.
 
         """
 
         super(OMFChannel, self).start()
 
     def stop(self):
 
         """
 
         super(OMFChannel, self).start()
 
     def stop(self):
-        """Stop the RM. It means nothing special for a channel for now
-           It becomes STOPPED as soon as this method is called
+        """ Stop the RM. It means nothing special for a channel for now
+        It becomes STOPPED as soon as this method is called
 
         """
         super(OMFChannel, self).stop()
 
     def release(self):
 
         """
         super(OMFChannel, self).stop()
 
     def release(self):
-        """Clean the RM at the end of the experiment and release the API
+        """ Clean the RM at the end of the experiment and release the API
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.exp_id)
 
         super(OMFChannel, self).release()
 
 
         super(OMFChannel, self).release()
 
index c2be38e..a3b9c3a 100644 (file)
@@ -26,7 +26,6 @@ from nepi.resources.omf.node import OMFNode
 from nepi.resources.omf.channel import OMFChannel
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
 from nepi.resources.omf.channel import OMFChannel
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
-
 @clsinit
 class OMFWifiInterface(ResourceManager):
     """
 @clsinit
 class OMFWifiInterface(ResourceManager):
     """
@@ -41,7 +40,8 @@ class OMFWifiInterface(ResourceManager):
 
     .. note::
 
 
     .. note::
 
-       This class is used only by the Experiment Controller through the Resource Factory
+       This class is used only by the Experiment Controller through the Resource 
+       Factory
 
     """
     _rtype = "OMFWifiInterface"
 
     """
     _rtype = "OMFWifiInterface"
@@ -88,14 +88,9 @@ class OMFWifiInterface(ResourceManager):
         self._omf_api = None
         self._alias = self.get('alias')
 
         self._omf_api = None
         self._alias = self.get('alias')
 
-    @property
-    def exp_id(self):
-        if self.ec.exp_id.startswith('exp-'):
-            return None
-        return self.ec.exp_id
-
     def valid_connection(self, guid):
     def valid_connection(self, guid):
-        """ Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+        """ Check if the connection with the guid in parameter is possible. 
+        Only meaningful connections are allowed.
 
         :param guid: Guid of the current RM
         :type guid: int
 
         :param guid: Guid of the current RM
         :type guid: int
@@ -107,10 +102,13 @@ class OMFWifiInterface(ResourceManager):
             msg = "Connection between %s %s and %s %s accepted" % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
             msg = "Connection between %s %s and %s %s accepted" % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
+
             return True
             return True
+
         msg = "Connection between %s %s and %s %s refused" % \
              (self.rtype(), self._guid, rm.rtype(), guid)
         self.debug(msg)
         msg = "Connection between %s %s and %s %s refused" % \
              (self.rtype(), self._guid, rm.rtype(), guid)
         self.debug(msg)
+
         return False
 
     @property
         return False
 
     @property
@@ -138,7 +136,8 @@ class OMFWifiInterface(ResourceManager):
             for attrname in ["mode", "type", "essid"]:
                 attrval = self.get(attrname)
                 attrname = "net/%s/%s" % (self._alias, attrname)
             for attrname in ["mode", "type", "essid"]:
                 attrval = self.get(attrname)
                 attrname = "net/%s/%s" % (self._alias, attrname)
-                self._omf_api.configure(self.node.get('hostname'), attrname, attrval)
+                self._omf_api.configure(self.node.get('hostname'), attrname, 
+                        attrval)
         except AttributeError:
             self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
         except AttributeError:
             self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
@@ -152,7 +151,6 @@ class OMFWifiInterface(ResourceManager):
         """ Configure the ip of the interface
 
         """
         """ Configure the ip of the interface
 
         """
-
         if self.channel.state < ResourceState.READY:
             self.ec.schedule(reschedule_delay, self.deploy)
             return False
         if self.channel.state < ResourceState.READY:
             self.ec.schedule(reschedule_delay, self.deploy)
             return False
@@ -160,39 +158,43 @@ class OMFWifiInterface(ResourceManager):
         try :
             attrval = self.get("ip")
             attrname = "net/%s/%s" % (self._alias, "ip")
         try :
             attrval = self.get("ip")
             attrname = "net/%s/%s" % (self._alias, "ip")
-            self._omf_api.configure(self.node.get('hostname'), attrname, attrval)
+            self._omf_api.configure(self.node.get('hostname'), attrname, 
+                    attrval)
         except AttributeError:
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.debug(msg)
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.debug(msg)
+            self.fail()
             #raise
 
         return True
 
             #raise
 
         return True
 
-
     def deploy(self):
     def deploy(self):
-        """Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the interface
-           It becomes DEPLOYED after sending messages to configure the interface
+        """ Deploy the RM. It means : Get the xmpp client and send messages 
+        using OMF 5.4 protocol to configure the interface.
+        It becomes DEPLOYED after sending messages to configure the interface
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
         """
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         if not self._omf_api :
 
         if not self._omf_api :
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             return
 
             return
 
-        if not (self.get('mode') and self.get('type') and self.get('essid') and self.get('ip')):
-            self._state = ResourceState.FAILED
+        if not (self.get('mode') and self.get('type') and self.get('essid') \
+                and self.get('ip')):
             msg = "Interface's variable are not initialized"
             self.error(msg)
             msg = "Interface's variable are not initialized"
             self.error(msg)
+            self.fail()
             return False
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
             return False
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
+            self.fail()
             return False
 
         # Just for information
             return False
 
         # Just for information
@@ -213,28 +215,14 @@ class OMFWifiInterface(ResourceManager):
         super(OMFWifiInterface, self).deploy()
         return True
 
         super(OMFWifiInterface, self).deploy()
         return True
 
-    def start(self):
-        """Start the RM. It means nothing special for an interface for now
-           It becomes STARTED as soon as this method starts.
-
-        """
-
-        super(OMFWifiInterface, self).start()
-
-    def stop(self):
-        """Stop the RM. It means nothing special for an interface for now
-           It becomes STOPPED as soon as this method stops
-
-        """
-        super(OMFWifiInterface, self).stop()
-
     def release(self):
     def release(self):
-        """Clean the RM at the end of the experiment and release the API
+        """ Clean the RM at the end of the experiment and release the API
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
 
         """
         if self._omf_api :
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         super(OMFWifiInterface, self).release()
 
 
         super(OMFWifiInterface, self).release()
 
index 612fa38..1421078 100644 (file)
@@ -101,14 +101,9 @@ class OMFNode(ResourceManager):
 
         self._omf_api = None 
 
 
         self._omf_api = None 
 
-    @property
-    def exp_id(self):
-        if self.ec.exp_id.startswith('exp-'):
-            return None
-        return self.ec.exp_id
-
     def valid_connection(self, guid):
     def valid_connection(self, guid):
-        """Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed.
+        """ Check if the connection with the guid in parameter is possible. 
+        Only meaningful connections are allowed.
 
         :param guid: Guid of the current RM
         :type guid: int
 
         :param guid: Guid of the current RM
         :type guid: int
@@ -117,40 +112,47 @@ class OMFNode(ResourceManager):
         """
         rm = self.ec.get_resource(guid)
         if rm.rtype() in self._authorized_connections:
         """
         rm = self.ec.get_resource(guid)
         if rm.rtype() in self._authorized_connections:
-            msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
+            msg = "Connection between %s %s and %s %s accepted" % (
+                    self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
             self.debug(msg)
+
             return True
             return True
-        msg = "Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)
+
+        msg = "Connection between %s %s and %s %s refused" % (
+                self.rtype(), self._guid, rm.rtype(), guid)
         self.debug(msg)
         self.debug(msg)
+
         return False
 
     def deploy(self):
         return False
 
     def deploy(self):
-        """Deploy the RM. It means : Send Xmpp Message Using OMF protocol to enroll the node into the experiment
-           It becomes DEPLOYED after sending messages to enroll the node
+        """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol 
+            to enroll the node into the experiment.
+            It becomes DEPLOYED after sending messages to enroll the node
 
         """ 
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
 
         """ 
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         if not self._omf_api :
 
         if not self._omf_api :
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
+            self.fail()
             return
 
         if not self.get('hostname') :
             return
 
         if not self.get('hostname') :
-            self._state = ResourceState.FAILED
             msg = "Hostname's value is not initialized"
             self.error(msg)
             msg = "Hostname's value is not initialized"
             self.error(msg)
+            self.fail()
             return False
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
         except AttributeError:
             return False
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.debug(msg)
+            self.error(msg)
+            self.fail()
             #raise AttributeError, msg
 
         super(OMFNode, self).deploy()
             #raise AttributeError, msg
 
         super(OMFNode, self).deploy()
@@ -188,8 +190,10 @@ class OMFNode(ResourceManager):
         """
         if self._omf_api :
             self._omf_api.release(self.get('hostname'))
         """
         if self._omf_api :
             self._omf_api.release(self.get('hostname'))
+
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'), exp_id = self.exp_id)
+                self.get('xmppHost'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
         super(OMFNode, self).release()
 
 
         super(OMFNode, self).release()
 
index b4b01c5..dc03b93 100644 (file)
@@ -48,10 +48,13 @@ class OMFAPI(Logger):
 
     .. note::
 
 
     .. note::
 
-       This class is the implementation of an OMF 5.4 API. Since the version 5.4.1, the Topic Architecture start with OMF_5.4 instead of OMF used for OMF5.3
+       This class is the implementation of an OMF 5.4 API. 
+       Since the version 5.4.1, the Topic Architecture start with OMF_5.4 
+       instead of OMF used for OMF5.3
 
     """
 
     """
-    def __init__(self, slice, host, port, password, xmpp_root = None, exp_id = None):
+    def __init__(self, slice, host, port, password, xmpp_root = None, 
+            exp_id = None):
         """
     
         :param slice: Xmpp Slice
         """
     
         :param slice: Xmpp Slice
@@ -81,6 +84,7 @@ class OMFAPI(Logger):
 
         # OMF xmpp client
         self._client = None
 
         # OMF xmpp client
         self._client = None
+
         # message handler
         self._message = None
 
         # message handler
         self._message = None
 
@@ -133,7 +137,8 @@ class OMFAPI(Logger):
         """ Publish New Experiment Message
 
         """
         """ Publish New Experiment Message
 
         """
-        address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
+        address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice,
+                self._user)
         #print address
         payload = self._message.newexp_function(self._user, address)
         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
         #print address
         payload = self._message.newexp_function(self._user, address)
         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
@@ -160,7 +165,8 @@ class OMFAPI(Logger):
         :type hostname: str
 
         """
         :type hostname: str
 
         """
-        return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
+        return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, 
+                hostname)
 
     def _host_resource_id(self, hostname):
         """ Return the Topic Name as /xmpp_root/slice/resources/hostname
 
     def _host_resource_id(self, hostname):
         """ Return the Topic Name as /xmpp_root/slice/resources/hostname
@@ -201,7 +207,8 @@ class OMFAPI(Logger):
         self._client.delete(xmpp_node)
 
     def enroll_host(self, hostname):
         self._client.delete(xmpp_node)
 
     def enroll_host(self, hostname):
-        """ Create and Subscribe to the session topic and the resources corresponding to the hostname
+        """ Create and Subscribe to the session topic and the resources
+            corresponding to the hostname
 
         :param hostname: Full hrn of the node
         :type hostname: str
 
         :param hostname: Full hrn of the node
         :type hostname: str
@@ -227,7 +234,8 @@ class OMFAPI(Logger):
 
         :param hostname: Full hrn of the node
         :type hostname: str
 
         :param hostname: Full hrn of the node
         :type hostname: str
-        :param attribute: Attribute that need to be configured (often written as /net/wX/attribute, with X the interface number)
+        :param attribute: Attribute that need to be configured (
+            often written as /net/wX/attribute, with X the interface number)
         :type attribute: str
         :param value: Value of the attribute
         :type value: str
         :type attribute: str
         :param value: Value of the attribute
         :type value: str
@@ -242,7 +250,8 @@ class OMFAPI(Logger):
 
         :param hostname: Full hrn of the node
         :type hostname: str
 
         :param hostname: Full hrn of the node
         :type hostname: str
-        :param app_id: Application Id (Any id that represents in a unique way the application)
+        :param app_id: Application Id (Any id that represents in a unique 
+            way the application)
         :type app_id: str
         :param arguments: Arguments of the application
         :type arguments: str
         :type app_id: str
         :param arguments: Arguments of the application
         :type arguments: str
@@ -252,7 +261,8 @@ class OMFAPI(Logger):
         :type env: str
 
         """
         :type env: str
 
         """
-        payload = self._message.execute_function(hostname, app_id, arguments, path, env)
+        payload = self._message.execute_function(hostname, app_id, arguments, 
+                path, env)
         xmpp_node =  self._host_session_id(hostname)
         self._client.publish(payload, xmpp_node)
 
         xmpp_node =  self._host_session_id(hostname)
         self._client.publish(payload, xmpp_node)
 
@@ -295,11 +305,12 @@ class OMFAPIFactory(object):
     """ 
     .. note::
 
     """ 
     .. note::
 
-        It allows the different RM to use the same xmpp client if they use the same credentials. 
-        For the moment, it is focused on Xmpp.
+        It allows the different RM to use the same xmpp client if they use 
+        the same credentials.  For the moment, it is focused on XMPP.
 
     """
 
     """
-    # use lock to avoid concurrent access to the Api list at the same times by 2 different threads
+    # use lock to avoid concurrent access to the Api list at the same times by 2 
+    # different threads
     lock = threading.Lock()
     _apis = dict()
 
     lock = threading.Lock()
     _apis = dict()
 
index f85c3d5..f71bc5f 100644 (file)
@@ -23,8 +23,18 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
-import threading
+
 import subprocess
 import subprocess
+import threading
+
+# A.Q. GENERAL COMMENTS: This module needs major cleaning up
+#     - Lines should be 80 characters
+#     - Most methods have too many lines and there are no comments or spaces
+#     - There should be only two line breaks between two methods
+#     - Code is too compressed. Hard to read. Add spaces when needed
+#     - In general the code needs to be more subdivided. Use more methods 
+#       with clear names to divide operations (even if you don't reuse the 
+#       methods else where, this will make the code more readable)
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
@@ -43,6 +53,7 @@ class PlanetlabNode(LinuxNode):
         """
         return cls._blacklist
 
         """
         return cls._blacklist
 
+    ### A.Q. COMMENT: Why did you wrapped the locks inside methods ?
     @classmethod
     def in_provision(cls):
         """ Returns the nodes that anohter RM is trying to provision
     @classmethod
     def in_provision(cls):
         """ Returns the nodes that anohter RM is trying to provision
@@ -221,6 +232,8 @@ class PlanetlabNode(LinuxNode):
         return self._plapi
 
     def discoverl(self):
         return self._plapi
 
     def discoverl(self):
+        #### A.Q. COMMENT: no need to have methods for the locks and 
+        ##                 other attributes. Please remove.
         bl = PlanetlabNode.blacklist()
         inpro = PlanetlabNode.in_provision()
         lockbl = PlanetlabNode.lock_bl()
         bl = PlanetlabNode.blacklist()
         inpro = PlanetlabNode.in_provision()
         lockbl = PlanetlabNode.lock_bl()
@@ -231,6 +244,12 @@ class PlanetlabNode(LinuxNode):
             if node_id not in bl and node_id not in inpro:
                 try_other = self.do_ping(node_id)
                 if try_other:
             if node_id not in bl and node_id not in inpro:
                 try_other = self.do_ping(node_id)
                 if try_other:
+                    # A.Q. COMMENT: Here you could do 
+                    #
+                    #   with self._lockbl:
+                    #       ...
+                    #
+                    #  Class attributes can still be accesed with 'self'
                     lockbl.acquire()
                     bl.append(node_id)
                     lockbl.release()
                     lockbl.acquire()
                     bl.append(node_id)
                     lockbl.release()
@@ -306,6 +325,7 @@ class PlanetlabNode(LinuxNode):
                     
 
     def provisionl(self):
                     
 
     def provisionl(self):
+        # A.Q. COMMENT: you can import time on the top
         import time
         bl = PlanetlabNode.blacklist()
         lockbl = PlanetlabNode.lock_bl()
         import time
         bl = PlanetlabNode.blacklist()
         lockbl = PlanetlabNode.lock_bl()
@@ -325,6 +345,9 @@ class PlanetlabNode(LinuxNode):
             t = 0 
             while t < timeout and not ssh_ok:
                 # check ssh connection
             t = 0 
             while t < timeout and not ssh_ok:
                 # check ssh connection
+
+                # A.Q. COMMENT IMPORTANT! Instead of issuing SSH commands directly use the
+                #    "execute" method inherithed from LinuxNode with blocking = True
                 command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (slicename, ip)
                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) 
                 stdout, stderr = p.communicate()
                 command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (slicename, ip)
                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) 
                 stdout, stderr = p.communicate()
@@ -341,6 +364,9 @@ class PlanetlabNode(LinuxNode):
                 with lockbl:
                     bl.append(node)
                     print bl
                 with lockbl:
                     bl.append(node)
                     print bl
+                    # A.Q. COMMENT: Make method "delete_slice_node" and there 
+                    #               put this code. Repeat this for all calls to plapi.
+                    #               This will make the code cleaner.
                     self.plapi.delete_slice_node(slicename, [node])
                     self.discover()
                 continue
                     self.plapi.delete_slice_node(slicename, [node])
                     self.discover()
                 continue
@@ -351,6 +377,8 @@ class PlanetlabNode(LinuxNode):
                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
                 stdout, stderr = p.communicate()
                 if stdout.find("/proc type proc") < 0:
                 p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
                 stdout, stderr = p.communicate()
                 if stdout.find("/proc type proc") < 0:
+                    # A.Q. COMMENT: lines 382-384 should go to a method
+                    #       "blacklist_node()"
                     lockbl.acquire()
                     bl.append(node)
                     lockbl.release()
                     lockbl.acquire()
                     bl.append(node)
                     lockbl.release()
@@ -370,7 +398,6 @@ class PlanetlabNode(LinuxNode):
         # call provision de linux node?
         super(PlanetlabNode, self).provision()
 
         # call provision de linux node?
         super(PlanetlabNode, self).provision()
 
-        
     def filter_based_on_attributes(self):
         # Map attributes with tagnames of PL
         timeframe = self.get("timeframe")[0]
     def filter_based_on_attributes(self):
         # Map attributes with tagnames of PL
         timeframe = self.get("timeframe")[0]
@@ -485,6 +512,8 @@ class PlanetlabNode(LinuxNode):
         return nodes_inslice
 
     def do_ping(self, node_id):
         return nodes_inslice
 
     def do_ping(self, node_id):
+        # A.Q. COMMENT: the execfuncs module in utils will do the local ping for you
+        #               code reuse is good...
         ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
         ip = ip[0]['ip']
         result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
         ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
         ip = ip[0]['ip']
         result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
@@ -493,7 +522,7 @@ class PlanetlabNode(LinuxNode):
         elif result == 1 or result == 2:
             return True
 
         elif result == 1 or result == 2:
             return True
 
-
+    # A.Q. Unclear name for method "fail2"
     def fail2(self):
         self.fail()
         msg = "Discovery failed. No candidates found for node"
     def fail2(self):
         self.fail()
         msg = "Discovery failed. No candidates found for node"
index 01e7b14..411eb51 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
+from nepi.execution.resource import clsinit_copy, ResourceState, \
         reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode
         reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode
@@ -166,35 +166,31 @@ class PlanetlabTap(LinuxApplication):
                 raise
  
             self.debug("----- READY ---- ")
                 raise
  
             self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
+            self.set_ready()
 
     def start(self):
 
     def start(self):
-        if self._state == ResourceState.READY:
+        if self.state == ResourceState.READY:
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
             command = self.get("command")
             self.info("Starting command '%s'" % command)
 
-            self._start_time = tnow()
-            self._state = ResourceState.STARTED
+            self.set_started()
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self._state = ResourceState.FAILED
+            self.fail()
             raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command') or ''
             raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command') or ''
-        state = self.state
         
         
-        if state == ResourceState.STARTED:
+        if self.state == ResourceState.STARTED:
             self.info("Stopping command '%s'" % command)
 
             command = "bash %s" % os.path.join(self.app_home, "stop.sh")
             (out, err), proc = self.execute_command(command,
                     blocking = True)
 
             self.info("Stopping command '%s'" % command)
 
             command = "bash %s" % os.path.join(self.app_home, "stop.sh")
             (out, err), proc = self.execute_command(command,
                     blocking = True)
 
-            self._stop_time = tnow()
-            self._state = ResourceState.STOPPED
+            self.set_stopped()
 
     @property
     def state(self):
 
     @property
     def state(self):
@@ -208,6 +204,7 @@ class PlanetlabTap(LinuxApplication):
 
                 if out.strip().find(self.get("deviceName")) == -1: 
                     # tap is not running is not running (socket not found)
 
                 if out.strip().find(self.get("deviceName")) == -1: 
                     # tap is not running is not running (socket not found)
+                    self._finish_time = tnow()
                     self._state = ResourceState.FINISHED
 
             self._last_state_check = tnow()
                     self._state = ResourceState.FINISHED
 
             self._last_state_check = tnow()