X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Fall%2Fcollector.py;h=4729e0153b7774d4973b4e4d22bbc6d0eed0a753;hb=6285ca51026efb69642eea9dfc7c480e722d84a9;hp=61efa22b984efd2092e7ff1b25468c2b9ebf341c;hpb=a0eefc4e266c584dfa3363e30154b3fdaa5b4f60;p=nepi.git diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py index 61efa22b..4729e015 100644 --- a/src/nepi/resources/all/collector.py +++ b/src/nepi/resources/all/collector.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -19,16 +18,16 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - ResourceAction +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, ResourceAction from nepi.util.sshfuncs import ProcStatus import os import tempfile -@clsinit +@clsinit_copy class Collector(ResourceManager): - """ The collector is reponsible of collecting traces + """ The collector entity is reponsible of collecting traces of a same type associated to RMs into a local directory. .. class:: Class Args : @@ -37,26 +36,29 @@ class Collector(ResourceManager): :type ec: ExperimentController :param guid: guid of the RM :type guid: int - :param creds: Credentials to communicate with the rm (XmppClient) - :type creds: dict """ _rtype = "Collector" + _help = "A Collector can be attached to a trace name on another " \ + "ResourceManager and will retrieve and store the trace content " \ + "in a local file at the end of the experiment" + _platform = "all" @classmethod def _register_attributes(cls): - trace_name = Attribute("traceName", "Name of the trace to be collected", - flags = Flags.ExecReadOnly) - store_dir = Attribute("storeDir", "Path to local directory to store trace results", - default = tempfile.gettempdir(), - flags = Flags.ExecReadOnly) - sub_dir = Attribute("subDir", "Sub directory to collect traces into", - flags = Flags.ExecReadOnly) - rename = Attribute("rename", "Name to give to the collected trace file", - flags = Flags.ExecReadOnly) + trace_name = Attribute("traceName", + "Name of the trace to be collected", + flags = Flags.Design) + + sub_dir = Attribute("subDir", + "Sub directory to collect traces into", + flags = Flags.Design) + + rename = Attribute("rename", + "Name to give to the collected trace file", + flags = Flags.Design) cls._register_attribute(trace_name) - cls._register_attribute(store_dir) cls._register_attribute(sub_dir) cls._register_attribute(rename) @@ -67,25 +69,24 @@ class Collector(ResourceManager): @property def store_path(self): return self._store_path - - def provision(self): + + def do_provision(self): trace_name = self.get("traceName") if not trace_name: self.fail() msg = "No traceName was specified" self.error(msg) - raise RuntimeError, msg + raise RuntimeError(msg) - store_dir = self.get("storeDir") - self._store_path = os.path.join(store_dir, self.ec.exp_id, self.ec.run_id) + self._store_path = self.ec.run_dir subdir = self.get("subDir") if subdir: - self._store_path = os.path.join(self._store_path, subdir) + self._store_path = os.path.join(self.store_path, subdir) msg = "Creating local directory at %s to store %s traces " % ( - store_dir, trace_name) + self.store_path, trace_name) self.info(msg) try: @@ -93,19 +94,15 @@ class Collector(ResourceManager): except OSError: pass - super(Collector, self).provision() + super(Collector, self).do_provision() - def deploy(self): - try: - self.discover() - self.provision() - except: - self.fail() - raise + def do_deploy(self): + self.do_discover() + self.do_provision() - super(Collector, self).deploy() + super(Collector, self).do_deploy() - def release(self): + def do_release(self): trace_name = self.get("traceName") rename = self.get("rename") or trace_name @@ -115,14 +112,22 @@ class Collector(ResourceManager): rms = self.get_connected() for rm in rms: - result = self.ec.trace(rm.guid, trace_name) fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, - rename)) - f = open(fpath, "w") - f.write(result) - f.close() - - super(Collector, self).release() + rename)) + + try: + result = self.ec.trace(rm.guid, trace_name) + with open(fpath, "w") as f: + f.write(result) + except: + import traceback + err = traceback.format_exc() + msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, + rm.guid, fpath) + self.error(msg, out = "", err = err) + continue + + super(Collector, self).do_release() def valid_connection(self, guid): # TODO: Validate!