git://git.onelab.eu
/
nepi.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Replacing _backend for _platform class attribute in ResourceManager
[nepi.git]
/
src
/
nepi
/
execution
/
resource.py
diff --git
a/src/nepi/execution/resource.py
b/src/nepi/execution/resource.py
index
c56aa5b
..
f1669d0
100644
(file)
--- a/
src/nepi/execution/resource.py
+++ b/
src/nepi/execution/resource.py
@@
-31,8
+31,6
@@
import sys
import threading
import weakref
import threading
import weakref
-reschedule_delay = "1s"
-
class ResourceAction:
""" Action that a user can order to a Resource Manager
class ResourceAction:
""" Action that a user can order to a Resource Manager
@@
-113,11
+111,13
@@
def failtrap(func):
try:
return func(self, *args, **kwargs)
except:
try:
return func(self, *args, **kwargs)
except:
+ self.fail()
+
import traceback
err = traceback.format_exc()
import traceback
err = traceback.format_exc()
-
self.error(err
)
-
self.debug("SETTING guid %d to state FAILED" % self.guid
)
-
self.fail(
)
+
logger = Logger(self._rtype
)
+
logger.error(err
)
+
logger.error("SETTING guid %d to state FAILED" % self.guid
)
raise
return wrapped
raise
return wrapped
@@
-127,19
+127,20
@@
class ResourceManager(Logger):
""" Base class for all ResourceManagers.
A ResourceManager is specific to a resource type (e.g. Node,
""" Base class for all ResourceManagers.
A ResourceManager is specific to a resource type (e.g. Node,
- Switch, Application, etc) on a specific
backend
(e.g. PlanetLab,
+ Switch, Application, etc) on a specific
platform
(e.g. PlanetLab,
OMF, etc).
The ResourceManager instances are responsible for interacting with
and controlling concrete (physical or virtual) resources in the
OMF, etc).
The ResourceManager instances are responsible for interacting with
and controlling concrete (physical or virtual) resources in the
- experimental
backend
s.
+ experimental
platform
s.
"""
_rtype = "Resource"
_attributes = None
_traces = None
_help = None
"""
_rtype = "Resource"
_attributes = None
_traces = None
_help = None
- _backend = None
+ _platform = None
+ _reschedule_delay = "0.5s"
@classmethod
def _register_attribute(cls, attr):
@classmethod
def _register_attribute(cls, attr):
@@
-193,8
+194,17
@@
class ResourceManager(Logger):
type = Types.Bool,
default = True,
flags = Flags.Design)
type = Types.Bool,
default = True,
flags = Flags.Design)
+ hard_release = Attribute("hardRelease",
+ "Forces removal of all result files and directories associated "
+ "to the RM upon resource release. After release the RM will "
+ "be removed from the EC and the results will not longer be "
+ "accessible",
+ type = Types.Bool,
+ default = False,
+ flags = Flags.Design)
cls._register_attribute(critical)
cls._register_attribute(critical)
+ cls._register_attribute(hard_release)
@classmethod
def _register_traces(cls):
@classmethod
def _register_traces(cls):
@@
-265,7
+275,6
@@
class ResourceManager(Logger):
"""
return copy.deepcopy(cls._attributes[name])
"""
return copy.deepcopy(cls._attributes[name])
-
@classmethod
def get_traces(cls):
""" Returns a copy of the traces
@classmethod
def get_traces(cls):
""" Returns a copy of the traces
@@
-281,12
+290,12
@@
class ResourceManager(Logger):
return cls._help
@classmethod
return cls._help
@classmethod
- def get_
backend
(cls):
- """ Returns the identified of the
backend (i.e. testbed, environment
)
+ def get_
platform
(cls):
+ """ Returns the identifier of the
platform (i.e. testbed type
)
for the Resource
"""
for the Resource
"""
- return cls._
backend
+ return cls._
platform
@classmethod
def get_global(cls, name):
@classmethod
def get_global(cls, name):
@@
-344,7
+353,7
@@
class ResourceManager(Logger):
# instance lock to synchronize exclusive state change methods (such
# as deploy and release methods), in order to prevent them from being
# instance lock to synchronize exclusive state change methods (such
# as deploy and release methods), in order to prevent them from being
- # executed at the same time
+ # executed at the same time
and corrupting internal resource state
self._release_lock = threading.Lock()
@property
self._release_lock = threading.Lock()
@property
@@
-412,6
+421,11
@@
class ResourceManager(Logger):
""" Get the current state of the RM """
return self._state
""" Get the current state of the RM """
return self._state
+ @property
+ def reschedule_delay(self):
+ """ Returns default reschedule delay """
+ return self._reschedule_delay
+
def log_message(self, msg):
""" Returns the log message formatted with added information.
def log_message(self, msg):
""" Returns the log message formatted with added information.
@@
-420,7
+434,7
@@
class ResourceManager(Logger):
:rtype: str
"""
:rtype: str
"""
- return " %s guid
:
%d - %s " % (self._rtype, self.guid, msg)
+ return " %s guid %d - %s " % (self._rtype, self.guid, msg)
def register_connection(self, guid):
""" Registers a connection to the RM identified by guid
def register_connection(self, guid):
""" Registers a connection to the RM identified by guid
@@
-552,11
+566,14
@@
class ResourceManager(Logger):
try:
self.do_release()
except:
try:
self.do_release()
except:
+ self.set_released()
+
import traceback
err = traceback.format_exc()
import traceback
err = traceback.format_exc()
- self.error(err)
-
- self.set_released()
+ msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
+ self._rtype, self.guid, err)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
def fail(self):
""" Sets the RM to state FAILED.
def fail(self):
""" Sets the RM to state FAILED.
@@
-589,8
+606,12
@@
class ResourceManager(Logger):
:rtype: str
"""
attr = self._attrs[name]
:rtype: str
"""
attr = self._attrs[name]
+
+ """
+ A.Q. Commenting due to performance impact
if attr.has_flag(Flags.Global):
self.warning( "Attribute %s is global. Use get_global instead." % name)
if attr.has_flag(Flags.Global):
self.warning( "Attribute %s is global. Use get_global instead." % name)
+ """
return attr.value
return attr.value
@@
-603,7
+624,7
@@
class ResourceManager(Logger):
:rtype: str
"""
attr = self._attrs[name]
:rtype: str
"""
attr = self._attrs[name]
- return attr.has_changed
()
+ return attr.has_changed
def has_flag(self, name, flag):
""" Returns true if the attribute has the flag 'flag'
def has_flag(self, name, flag):
""" Returns true if the attribute has the flag 'flag'
@@
-732,12
+753,23
@@
class ResourceManager(Logger):
connected = []
rclass = ResourceFactory.get_resource_type(rtype)
for guid in self.connections:
connected = []
rclass = ResourceFactory.get_resource_type(rtype)
for guid in self.connections:
-
rm = self.ec.get_resource(guid)
if not rtype or isinstance(rm, rclass):
connected.append(rm)
return connected
rm = self.ec.get_resource(guid)
if not rtype or isinstance(rm, rclass):
connected.append(rm)
return connected
+ def is_rm_instance(self, rtype):
+ """ Returns True if the RM is instance of 'rtype'
+
+ :param rtype: Type of the RM we look for
+ :type rtype: str
+ :return: True|False
+ """
+ rclass = ResourceFactory.get_resource_type(rtype)
+ if isinstance(self, rclass):
+ return True
+ return False
+
@failtrap
def _needs_reschedule(self, group, state, time):
""" Internal method that verify if 'time' has elapsed since
@failtrap
def _needs_reschedule(self, group, state, time):
""" Internal method that verify if 'time' has elapsed since
@@
-756,7
+788,7
@@
class ResourceManager(Logger):
"""
reschedule = False
"""
reschedule = False
- delay = reschedule_delay
+ delay =
self.
reschedule_delay
# check state and time elapsed on all RMs
for guid in group:
# check state and time elapsed on all RMs
for guid in group:
@@
-825,7
+857,7
@@
class ResourceManager(Logger):
"""
reschedule = False
"""
reschedule = False
- delay = reschedule_delay
+ delay =
self.
reschedule_delay
## evaluate if set conditions are met
## evaluate if set conditions are met
@@
-850,7
+882,7
@@
class ResourceManager(Logger):
#import pdb;pdb.set_trace()
reschedule = False
#import pdb;pdb.set_trace()
reschedule = False
- delay = reschedule_delay
+ delay =
self.
reschedule_delay
## evaluate if conditions to start are met
## evaluate if conditions to start are met
@@
-869,12
+901,12
@@
class ResourceManager(Logger):
# Verify all start conditions are met
for (group, state, time) in start_conditions:
# Uncomment for debug
# Verify all start conditions are met
for (group, state, time) in start_conditions:
# Uncomment for debug
- unmet = []
- for guid in group:
- rm = self.ec.get_resource(guid)
- unmet.append((guid, rm._state))
-
- self.debug("---- WAITED STATES ---- %s" % unmet )
+
#
unmet = []
+
#
for guid in group:
+
#
rm = self.ec.get_resource(guid)
+
#
unmet.append((guid, rm._state))
+ #
+
#
self.debug("---- WAITED STATES ---- %s" % unmet )
reschedule, delay = self._needs_reschedule(group, state, time)
if reschedule:
reschedule, delay = self._needs_reschedule(group, state, time)
if reschedule:
@@
-892,7
+924,7
@@
class ResourceManager(Logger):
"""
reschedule = False
"""
reschedule = False
- delay = reschedule_delay
+ delay =
self.
reschedule_delay
## evaluate if conditions to stop are met
if self.ec.abort:
## evaluate if conditions to stop are met
if self.ec.abort:
@@
-925,7
+957,7
@@
class ResourceManager(Logger):
"""
reschedule = False
"""
reschedule = False
- delay = reschedule_delay
+ delay =
self.
reschedule_delay
## evaluate if conditions to deploy are met
if self.ec.abort:
## evaluate if conditions to deploy are met
if self.ec.abort:
@@
-934,6
+966,7
@@
class ResourceManager(Logger):
# only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
ResourceState.PROVISIONED]:
# only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
ResourceState.PROVISIONED]:
+ #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
reschedule = True
self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
else:
reschedule = True
self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
else:
@@
-1006,50
+1039,62
@@
class ResourceManager(Logger):
def do_fail(self):
self.set_failed()
def do_fail(self):
self.set_failed()
+ self.ec.inform_failure(self.guid)
- def set_started(self):
+ def set_started(self
, time = None
):
""" Mark ResourceManager as STARTED """
""" Mark ResourceManager as STARTED """
- self.set_state(ResourceState.STARTED, "_start_time")
+ self.set_state(ResourceState.STARTED, "_start_time"
, time
)
self.debug("----- STARTED ---- ")
self.debug("----- STARTED ---- ")
- def set_stopped(self):
+ def set_stopped(self
, time = None
):
""" Mark ResourceManager as STOPPED """
""" Mark ResourceManager as STOPPED """
- self.set_state(ResourceState.STOPPED, "_stop_time")
+ self.set_state(ResourceState.STOPPED, "_stop_time"
, time
)
self.debug("----- STOPPED ---- ")
self.debug("----- STOPPED ---- ")
- def set_ready(self):
+ def set_ready(self
, time = None
):
""" Mark ResourceManager as READY """
""" Mark ResourceManager as READY """
- self.set_state(ResourceState.READY, "_ready_time")
+ self.set_state(ResourceState.READY, "_ready_time"
, time
)
self.debug("----- READY ---- ")
self.debug("----- READY ---- ")
- def set_released(self):
+ def set_released(self
, time = None
):
""" Mark ResourceManager as RELEASED """
""" Mark ResourceManager as RELEASED """
- self.set_state(ResourceState.RELEASED, "_release_time")
- self.debug("----- RELEASED ---- ")
+ self.set_state(ResourceState.RELEASED, "_release_time", time)
+
+ msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
- def set_failed(self):
+ def set_failed(self
, time = None
):
""" Mark ResourceManager as FAILED """
""" Mark ResourceManager as FAILED """
- self.set_state(ResourceState.FAILED, "_failed_time")
- self.debug("----- FAILED ---- ")
+ self.set_state(ResourceState.FAILED, "_failed_time", time)
- def set_discovered(self):
+ msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
+
+ def set_discovered(self, time = None):
""" Mark ResourceManager as DISCOVERED """
""" Mark ResourceManager as DISCOVERED """
- self.set_state(ResourceState.DISCOVERED, "_discover_time")
+ self.set_state(ResourceState.DISCOVERED, "_discover_time"
, time
)
self.debug("----- DISCOVERED ---- ")
self.debug("----- DISCOVERED ---- ")
- def set_provisioned(self):
+ def set_provisioned(self
, time = None
):
""" Mark ResourceManager as PROVISIONED """
""" Mark ResourceManager as PROVISIONED """
- self.set_state(ResourceState.PROVISIONED, "_provision_time")
+ self.set_state(ResourceState.PROVISIONED, "_provision_time"
, time
)
self.debug("----- PROVISIONED ---- ")
self.debug("----- PROVISIONED ---- ")
- def set_state(self, state, state_time_attr):
+ def set_state(self, state, state_time_attr
, time = None
):
""" Set the state of the RM while keeping a trace of the time """
# Ensure that RM state will not change after released
if self._state == ResourceState.RELEASED:
return
""" Set the state of the RM while keeping a trace of the time """
# Ensure that RM state will not change after released
if self._state == ResourceState.RELEASED:
return
-
- setattr(self, state_time_attr, tnow())
+
+ time = time or tnow()
+ self.set_state_time(state, state_time_attr, time)
+
+ def set_state_time(self, state, state_time_attr, time):
+ """ Set the time for the RM state change """
+ setattr(self, state_time_attr, time)
self._state = state
class ResourceFactory(object):
self._state = state
class ResourceFactory(object):
@@
-1077,7
+1122,7
@@
class ResourceFactory(object):
return rclass(ec, guid)
def populate_factory():
return rclass(ec, guid)
def populate_factory():
- """
Register all the possible RM that exists in the current version of Nepi.
+ """
Find and register all available RMs
"""
# Once the factory is populated, don't repopulate
if not ResourceFactory.resource_types():
"""
# Once the factory is populated, don't repopulate
if not ResourceFactory.resource_types():