X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Fall%2Fcollector.py;h=792e25fbdb966898b362cffb47508e1efab36519;hb=bac63fdc5983e2ade1902f711c1e7899d82ca4ae;hp=2dfd2007e505329ae20f72db5b4518727ced214b;hpb=3ec952ecb376f66a3c083249c6deec2f19b82947;p=nepi.git diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py index 2dfd2007..792e25fb 100644 --- a/src/nepi/resources/all/collector.py +++ b/src/nepi/resources/all/collector.py @@ -19,16 +19,16 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - ResourceAction +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, ResourceAction from nepi.util.sshfuncs import ProcStatus import os import tempfile -@clsinit +@clsinit_copy class Collector(ResourceManager): - """ The collector is reponsible of collecting traces + """ The collector entity is reponsible of collecting traces of a same type associated to RMs into a local directory. .. class:: Class Args : @@ -37,25 +37,31 @@ class Collector(ResourceManager): :type ec: ExperimentController :param guid: guid of the RM :type guid: int - :param creds: Credentials to communicate with the rm (XmppClient) - :type creds: dict """ _rtype = "Collector" + _help = "A Collector can be attached to a trace name on another " \ + "ResourceManager and will retrieve and store the trace content " \ + "in a local file at the end of the experiment" + _backend_type = "all" @classmethod def _register_attributes(cls): - trace_name = Attribute("traceName", "Name of the trace to be collected", - flags = Flags.ExecReadOnly) - store_dir = Attribute("storeDir", "Path to local directory to store trace results", - default = tempfile.gettempdir(), - flags = Flags.ExecReadOnly) - sub_dir = Attribute("subDir", "Sub directory to collect traces into", - flags = Flags.ExecReadOnly) + trace_name = Attribute("traceName", + "Name of the trace to be collected", + flags = Flags.Design) + + sub_dir = Attribute("subDir", + "Sub directory to collect traces into", + flags = Flags.Design) + + rename = Attribute("rename", + "Name to give to the collected trace file", + flags = Flags.Design) cls._register_attribute(trace_name) - cls._register_attribute(store_dir) cls._register_attribute(sub_dir) + cls._register_attribute(rename) def __init__(self, ec, guid): super(Collector, self).__init__(ec, guid) @@ -64,8 +70,8 @@ class Collector(ResourceManager): @property def store_path(self): return self._store_path - - def provision(self): + + def do_provision(self): trace_name = self.get("traceName") if not trace_name: self.fail() @@ -74,15 +80,14 @@ class Collector(ResourceManager): self.error(msg) raise RuntimeError, msg - store_dir = self.get("storeDir") - self._store_path = os.path.join(store_dir, self.ec.exp_id, self.ec.run_id) + self._store_path = self.ec.run_dir subdir = self.get("subDir") if subdir: - self._store_path = os.path.join(self._store_path, subdir) + self._store_path = os.path.join(self.store_path, subdir) msg = "Creating local directory at %s to store %s traces " % ( - store_dir, trace_name) + self.store_path, trace_name) self.info(msg) try: @@ -90,20 +95,17 @@ class Collector(ResourceManager): except OSError: pass - super(Collector, self).provision() + super(Collector, self).do_provision() - def deploy(self): - try: - self.discover() - self.provision() - except: - self.fail() - raise + def do_deploy(self): + self.do_discover() + self.do_provision() - super(Collector, self).deploy() + super(Collector, self).do_deploy() - def release(self): + def do_release(self): trace_name = self.get("traceName") + rename = self.get("rename") or trace_name msg = "Collecting '%s' traces to local directory %s" % ( trace_name, self.store_path) @@ -113,12 +115,20 @@ class Collector(ResourceManager): for rm in rms: result = self.ec.trace(rm.guid, trace_name) fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, - trace_name)) - f = open(fpath, "w") - f.write(result) - f.close() - - super(Collector, self).release() + rename)) + try: + f = open(fpath, "w") + f.write(result) + f.close() + except: + import traceback + err = traceback.format_exc() + msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, + rm.guid, fpath) + self.error(msg, out = "", err = err) + continue + + super(Collector, self).do_release() def valid_connection(self, guid): # TODO: Validate!