from neco.util.timefuncs import strfnow, strfdiff, strfvalid
+from neco.execution.trace import TraceAttr
import copy
import functools
+import inspect
import logging
+import os
+import pkgutil
import weakref
-_reschedule_delay = "1s"
+reschedule_delay = "0.5s"
class ResourceAction:
DEPLOY = 0
READY = 3
STARTED = 4
STOPPED = 5
- FAILED = 6
- RELEASED = 7
+ FINISHED = 6
+ FAILED = 7
+ RELEASED = 8
def clsinit(cls):
cls._clsinit()
_rtype = "Resource"
_filters = None
_attributes = None
+ _traces = None
@classmethod
def _register_filter(cls, attr):
cls._attributes[attr.name] = attr
@classmethod
- def _register_filters(cls):
+ def _register_trace(cls, trace):
""" Resource subclasses will invoke this method to add a
- filter attribute
+ resource trace
+
+ """
+ cls._traces[trace.name] = trace
+
+
+ @classmethod
+ def _register_filters(cls):
+ """ Resource subclasses will invoke this method to register
+ resource filters
"""
pass
@classmethod
def _register_attributes(cls):
- """ Resource subclasses will invoke this method to add a
- resource attribute
+ """ Resource subclasses will invoke this method to register
+ resource attributes
+
+ """
+ pass
+
+ @classmethod
+ def _register_traces(cls):
+ """ Resource subclasses will invoke this method to register
+ resource traces
"""
pass
cls._attributes = dict()
cls._register_attributes()
+ # static template for resource traces
+ cls._traces = dict()
+ cls._register_traces()
+
@classmethod
def rtype(cls):
return cls._rtype
"""
return copy.deepcopy(cls._attributes.values())
+ @classmethod
+ def get_traces(cls):
+ """ Returns a copy of the traces
+
+ """
+ return copy.deepcopy(cls._traces.values())
+
def __init__(self, ec, guid):
self._guid = guid
self._ec = weakref.ref(ec)
self._conditions = dict()
# the resource instance gets a copy of all attributes
- # that can modify
self._attrs = copy.deepcopy(self._attributes)
+ # the resource instance gets a copy of all traces
+ self._trcs = copy.deepcopy(self._traces)
+
self._state = ResourceState.NEW
self._start_time = None
self._release_time = None
# Logging
- self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " % (self._rtype, self.guid))
+ self._logger = logging.getLogger("Resource")
+
+ def debug(self, msg, out = None, err = None):
+ self.log(msg, logging.DEBUG, out, err)
+
+ def error(self, msg, out = None, err = None):
+ self.log(msg, logging.ERROR, out, err)
+
+ def warn(self, msg, out = None, err = None):
+ self.log(msg, logging.WARNING, out, err)
+
+ def info(self, msg, out = None, err = None):
+ self.log(msg, logging.INFO, out, err)
+
+ def log(self, msg, level, out = None, err = None):
+ if out:
+ msg += " - OUT: %s " % out
+
+ if err:
+ msg += " - ERROR: %s " % err
+
+ msg = self.log_message(msg)
+
+ self.logger.log(level, msg)
+
+ def log_message(self, msg):
+ return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
@property
def logger(self):
"""
if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
- self.logger.error("Wrong state %s for start" % self.state)
+ self.error("Wrong state %s for start" % self.state)
return
self._start_time = strfnow()
"""
if not self._state in [ResourceState.STARTED]:
- self.logger.error("Wrong state %s for stop" % self.state)
+ self.error("Wrong state %s for stop" % self.state)
return
self._stop_time = strfnow()
:type name: str
:param name: Value of the attribute
:type name: str
- :rtype: Boolean
"""
attr = self._attrs[name]
attr.value = value
attr = self._attrs[name]
return attr.value
+ def register_trace(self, name):
+ """ Enable trace
+
+ :param name: Name of the trace
+ :type name: str
+ """
+ trace = self._trcs[name]
+ trace.enabled = True
+
+ def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+ """ Get information on collected trace
+
+ :param name: Name of the trace
+ :type name: str
+
+ :param attr: Can be one of:
+ - TraceAttr.ALL (complete trace content),
+ - TraceAttr.STREAM (block in bytes to read starting at offset),
+ - TraceAttr.PATH (full path to the trace file),
+ - TraceAttr.SIZE (size of trace file).
+ :type attr: str
+
+ :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :rtype: str
+ """
+ pass
+
def register_condition(self, action, group, state,
time = None):
""" Registers a condition on the resource manager to allow execution
conditions.append((group, state, time))
+ def get_connected(self, rtype):
+ connected = []
+ for guid in self.connections:
+ rm = self.ec.get_resource(guid)
+ if rm.rtype() == rtype:
+ connected.append(rm)
+ return connected
+
def _needs_reschedule(self, group, state, time):
""" Internal method that verify if 'time' has elapsed since
all elements in 'group' have reached state 'state'.
"""
reschedule = False
- delay = _reschedule_delay
+ delay = reschedule_delay
# check state and time elapsed on all RMs
for guid in group:
"""
reschedule = False
- delay = _reschedule_delay
+ delay = reschedule_delay
## evaluate if set conditions are met
"""
reschedule = False
- delay = _reschedule_delay
+ delay = reschedule_delay
## evaluate if set conditions are met
# only can start when RM is either STOPPED or READY
if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
reschedule = True
+ self.debug("---- RESCHEDULING START ---- state %s " % self.state )
else:
- self.logger.debug("---- START CONDITIONS ---- %s" %
+ self.debug("---- START CONDITIONS ---- %s" %
self.conditions.get(ResourceAction.START))
# Verify all start conditions are met
if reschedule:
self.ec.schedule(delay, self.start_with_conditions)
else:
- self.logger.debug("----- STARTING ---- ")
+ self.debug("----- STARTING ---- ")
self.start()
def stop_with_conditions(self):
"""
reschedule = False
- delay = _reschedule_delay
+ delay = reschedule_delay
## evaluate if set conditions are met
if self.state != ResourceState.STARTED:
reschedule = True
else:
- self.logger.debug(" ---- STOP CONDITIONS ---- %s" %
+ self.debug(" ---- STOP CONDITIONS ---- %s" %
self.conditions.get(ResourceAction.STOP))
stop_conditions = self.conditions.get(ResourceAction.STOP, [])
"""
if self._state > ResourceState.READY:
- self.logger.error("Wrong state %s for deploy" % self.state)
+ self.error("Wrong state %s for deploy" % self.state)
return
+ self.debug("----- DEPLOYING ---- ")
self._ready_time = strfnow()
self._state = ResourceState.READY
rclass = cls._resource_types[rtype]
return rclass(ec, guid)
+def populate_factory():
+ for rclass in find_types():
+ ResourceFactory.register_type(rclass)
+
+def find_types():
+ search_path = os.environ.get("NECO_SEARCH_PATH", "")
+ search_path = set(search_path.split(" "))
+
+ import neco.resources
+ path = os.path.dirname(neco.resources.__file__)
+ search_path.add(path)
+
+ types = []
+
+ for importer, modname, ispkg in pkgutil.walk_packages(search_path):
+ loader = importer.find_module(modname)
+ try:
+ module = loader.load_module(loader.fullname)
+ for attrname in dir(module):
+ if attrname.startswith("_"):
+ continue
+
+ attr = getattr(module, attrname)
+
+ if attr == ResourceManager:
+ continue
+
+ if not inspect.isclass(attr):
+ continue
+
+ if issubclass(attr, ResourceManager):
+ types.append(attr)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ logger = logging.getLogger("Resource.find_types()")
+ logger.error("Error while lading Resource Managers %s" % err)
+
+ return types
+
+