X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;ds=sidebyside;f=src%2Fnepi%2Fresources%2Fall%2Fcollector.py;h=af0811cf62bd97a9518b6e84c8d58476e2da5bd6;hb=6096716dbc88a1d9e6a1be8cac477006225d890e;hp=6101cdd8d2fe7d7873a4d357e4ac1fdb435b439a;hpb=bd01c32d68ab642b6220a210fa6d534a0c7bd420;p=nepi.git diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py index 6101cdd8..af0811cf 100644 --- a/src/nepi/resources/all/collector.py +++ b/src/nepi/resources/all/collector.py @@ -19,16 +19,16 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - ResourceAction +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, ResourceAction from nepi.util.sshfuncs import ProcStatus import os import tempfile -@clsinit +@clsinit_copy class Collector(ResourceManager): - """ The collector is reponsible of collecting traces + """ The collector entity is reponsible of collecting traces of a same type associated to RMs into a local directory. .. class:: Class Args : @@ -37,28 +37,45 @@ class Collector(ResourceManager): :type ec: ExperimentController :param guid: guid of the RM :type guid: int - :param creds: Credentials to communicate with the rm (XmppClient) - :type creds: dict """ _rtype = "Collector" + _help = "A Collector can be attached to a trace name on another " \ + "ResourceManager and will retrieve and store the trace content " \ + "in a local file at the end of the experiment" + _backend_type = "all" @classmethod def _register_attributes(cls): - trace_name = Attribute("traceName", "Name of the trace to be collected", - flags = Flags.ExecReadOnly) - store_dir = Attribute("storeDir", "Path to local directory to store trace results", + trace_name = Attribute("traceName", + "Name of the trace to be collected", + flags = Flags.Design) + + store_dir = Attribute("storeDir", + "Path to local directory to store trace results", default = tempfile.gettempdir(), - flags = Flags.ExecReadOnly) - sub_dir = Attribute("subDir", "Sub directory to collect traces into", - flags = Flags.ExecReadOnly) - rename = Attribute("rename", "Name to give to the collected trace file", - flags = Flags.ExecReadOnly) + flags = Flags.Design) + + use_run_id = Attribute("useRunId", + "If set to True stores traces into a sub directory named after " + "the RUN ID assigned by the EC", + type = Types.Bool, + default = False, + flags = Flags.Design) + + sub_dir = Attribute("subDir", + "Sub directory to collect traces into", + flags = Flags.Design) + + rename = Attribute("rename", + "Name to give to the collected trace file", + flags = Flags.Design) cls._register_attribute(trace_name) cls._register_attribute(store_dir) cls._register_attribute(sub_dir) cls._register_attribute(rename) + cls._register_attribute(use_run_id) def __init__(self, ec, guid): super(Collector, self).__init__(ec, guid) @@ -67,8 +84,8 @@ class Collector(ResourceManager): @property def store_path(self): return self._store_path - - def provision(self): + + def do_provision(self): trace_name = self.get("traceName") if not trace_name: self.fail() @@ -77,15 +94,17 @@ class Collector(ResourceManager): self.error(msg) raise RuntimeError, msg - store_dir = self.get("storeDir") - self._store_path = os.path.join(store_dir, self.ec.exp_id, self.ec.run_id) + self._store_path = self.get("storeDir") + + if self.get("useRunId"): + self._store_path = os.path.join(self._store_path, self.ec.run_id) subdir = self.get("subDir") if subdir: self._store_path = os.path.join(self._store_path, subdir) msg = "Creating local directory at %s to store %s traces " % ( - store_dir, trace_name) + self._store_path, trace_name) self.info(msg) try: @@ -93,19 +112,15 @@ class Collector(ResourceManager): except OSError: pass - super(Collector, self).provision() + super(Collector, self).do_provision() - def deploy(self): - try: - self.discover() - self.provision() - except: - self.fail() - raise + def do_deploy(self): + self.do_discover() + self.do_provision() - super(Collector, self).deploy() + super(Collector, self).do_deploy() - def release(self): + def do_release(self): trace_name = self.get("traceName") rename = self.get("rename") or trace_name @@ -123,12 +138,14 @@ class Collector(ResourceManager): f.write(result) f.close() except: + import traceback + err = traceback.format_exc() msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, rm.guid, fpath) - self.error(msg) + self.error(msg, out = "", err = err) continue - super(Collector, self).release() + super(Collector, self).do_release() def valid_connection(self, guid): # TODO: Validate!