From acf0b69092947f5d3df2887c15486c09eb85a326 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Thu, 27 Jun 2013 10:36:20 -0700 Subject: [PATCH] Replace use of string formatted dates by datetime objects for Taks scheduling --- src/nepi/execution/ec.py | 13 +- src/nepi/execution/resource.py | 142 ++++++++++++++------- src/nepi/resources/linux/application.py | 8 +- src/nepi/resources/linux/ccn/ccncontent.py | 6 +- src/nepi/resources/linux/ccn/ccnd.py | 12 +- src/nepi/resources/linux/ccn/ccnr.py | 6 +- src/nepi/resources/omf/omf_api.py | 5 +- src/nepi/util/timefuncs.py | 91 +++++++++---- test/execution/scheduler.py | 65 ++++++++++ test/resources/omf/vlc.py | 26 ++-- 10 files changed, 268 insertions(+), 106 deletions(-) create mode 100755 test/execution/scheduler.py diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index d9508048..cc636a53 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -27,7 +27,7 @@ import threading from nepi.util import guid from nepi.util.parallel import ParallelRun -from nepi.util.timefuncs import strfnow, strfdiff, strfvalid +from nepi.util.timefuncs import tnow, tdiffsec, stabsformat from nepi.execution.resource import ResourceFactory, ResourceAction, \ ResourceState, ResourceState2str from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus @@ -185,7 +185,7 @@ class ExperimentController(object): :param tid: Id of the task :type tid: int - :rtype: unknow + :rtype: Task """ return self._tasks.get(tid) @@ -194,7 +194,7 @@ class ExperimentController(object): :param guid: Id of the task :type guid: int - :rtype: ResourceManager + :rtype: ResourceManager """ return self._resources.get(guid) @@ -594,8 +594,7 @@ class ExperimentController(object): :return : The Id of the task """ - timestamp = strfvalid(date) - + timestamp = stabsformat(date) task = Task(timestamp, callback) task = self._scheduler.schedule(task) @@ -661,10 +660,10 @@ class ExperimentController(object): else: # The task timestamp is 
in the future. Wait for timeout # or until another task is scheduled. - now = strfnow() + now = tnow() if now < task.timestamp: # Calculate timeout in seconds - timeout = strfdiff(task.timestamp, now) + timeout = tdiffsec(task.timestamp, now) # Re-schedule task with the same timestamp self._scheduler.schedule(task) diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 9f1cfa4f..0fcc1986 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -17,7 +17,7 @@ # # Author: Alina Quereilhac -from nepi.util.timefuncs import strfnow, strfdiff, strfvalid +from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat from nepi.util.logger import Logger from nepi.execution.trace import TraceAttr @@ -208,10 +208,12 @@ class ResourceManager(Logger): self._provision_time = None self._ready_time = None self._release_time = None + self._finish_time = None + self._failed_time = None @property def guid(self): - """ Returns the guid of the current RM """ + """ Returns the global unique identifier of the RM """ return self._guid @property @@ -221,106 +223,134 @@ class ResourceManager(Logger): @property def connections(self): - """ Returns the set of connection for this RM""" + """ Returns the set of guids of connected RMs""" return self._connections @property def conditions(self): - """ Returns the list of conditions for this RM - The list is a dictionary with for each action, a list of tuple - describing the conditions. """ + """ Returns the conditions to which the RM is subjected to. 
+ + The object returned by this method is a dictionary indexed by + ResourceAction.""" return self._conditions @property def start_time(self): - """ Returns timestamp with the time the RM started """ + """ Returns the start time of the RM as a timestamp""" return self._start_time @property def stop_time(self): - """ Returns timestamp with the time the RM stopped """ + """ Returns the stop time of the RM as a timestamp""" return self._stop_time @property def discover_time(self): - """ Returns timestamp with the time the RM passed to state discovered """ + """ Returns the time discovering was finished for the RM as a timestamp""" return self._discover_time @property def provision_time(self): - """ Returns timestamp with the time the RM passed to state provisioned """ + """ Returns the time provisioning was finished for the RM as a timestamp""" return self._provision_time @property def ready_time(self): - """ Returns timestamp with the time the RM passed to state ready """ + """ Returns the time deployment was finished for the RM as a timestamp""" return self._ready_time @property def release_time(self): - """ Returns timestamp with the time the RM was released """ + """ Returns the release time of the RM as a timestamp""" return self._release_time + @property + def finish_time(self): + """ Returns the finalization time of the RM as a timestamp""" + return self._finish_time + + @property + def failed_time(self): + """ Returns the time failure occurred for the RM as a timestamp""" + return self._failed_time + @property def state(self): """ Get the state of the current RM """ return self._state def log_message(self, msg): - """ Improve debugging message by adding more information - as the guid and the type of the RM + """ Returns the log message formatted with added information. 
- :param msg: Message to log + :param msg: text message :type msg: str :rtype: str """ return " %s guid: %d - %s " % (self._rtype, self.guid, msg) def connect(self, guid): - """ Connect the current RM with the RM 'guid' + """ Establishes a connection to the RM identified by guid - :param guid: Guid of the RM the current RM will be connected + :param guid: Global unique identifier of the RM to connect to :type guid: int """ if self.valid_connection(guid): self._connections.add(guid) + def disconnect(self, guid): + """ Removes connection to the RM identified by guid + + :param guid: Global unique identifier of the RM to disconnect from + :type guid: int + """ + if guid in self._connections: + self._connections.remove(guid) + def discover(self): - """ Discover the Resource. As it is specific for each RM, - this method take the time when the RM become DISCOVERED and - change the status """ - self._discover_time = strfnow() + """ Performs resource discovery. + + This method is responsible for selecting an individual resource + matching user requirements. + This method should be redefined when necessary in child classes. + """ + self._discover_time = tnow() self._state = ResourceState.DISCOVERED def provision(self): - """ Provision the Resource. As it is specific for each RM, - this method take the time when the RM become PROVISIONNED and - change the status """ - self._provision_time = strfnow() + """ Performs resource provisioning. + + This method is responsible for provisioning one resource. + After this method has been successfully invoked, the resource + should be accessible/controllable by the RM. + This method should be redefined when necessary in child classes. + """ + self._provision_time = tnow() self._state = ResourceState.PROVISIONED def start(self): - """ Start the Resource Manager. As it is specific to each RM, this methods - just change, after some verifications, the status to STARTED and save the time. - + """ Starts the resource. 
+ + There is no generic start behavior for all resources. + This method should be redefined when necessary in child classes. """ if not self._state in [ResourceState.READY, ResourceState.STOPPED]: self.error("Wrong state %s for start" % self.state) return - self._start_time = strfnow() + self._start_time = tnow() self._state = ResourceState.STARTED def stop(self): - """ Stop the Resource Manager. As it is specific to each RM, this methods - just change, after some verifications, the status to STOPPED and save the time. - + """ Stops the resource. + + There is no generic stop behavior for all resources. + This method should be redefined when necessary in child classes. """ if not self._state in [ResourceState.STARTED]: self.error("Wrong state %s for stop" % self.state) return - self._stop_time = strfnow() + self._stop_time = tnow() self._state = ResourceState.STOPPED def set(self, name, value): @@ -335,7 +365,7 @@ class ResourceManager(Logger): attr.value = value def get(self, name): - """ Start the Resource Manager + """ Returns the value of the attribute :param name: Name of the attribute :type name: str @@ -345,7 +375,7 @@ class ResourceManager(Logger): return attr.value def register_trace(self, name): - """ Enable trace + """ Explicitly enable trace generation :param name: Name of the trace :type name: str @@ -404,8 +434,8 @@ class ResourceManager(Logger): conditions.append((group, state, time)) - def get_connected(self, rtype): - """ Return the list of RM with the type 'rtype' + def get_connected(self, rtype = None): + """ Returns the list of RM with the type 'rtype' :param rtype: Type of the RM we look for :type rtype: str @@ -414,7 +444,7 @@ class ResourceManager(Logger): connected = [] for guid in self.connections: rm = self.ec.get_resource(guid) - if rm.rtype() == rtype: + if not rtype or rm.rtype() == rtype: connected.append(rm) return connected @@ -463,12 +493,17 @@ class ResourceManager(Logger): # Only keep time information for START and STOP break - d = 
strfdiff(strfnow(), t) - wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s")) - if wait > 0.001: + # time already elapsed since RM changed state + waited = "%fs" % tdiffsec(tnow(), t) + + # time still to wait + wait = tdiffsec(stabsformat(time), stabsformat(waited)) + + if wait > 0: reschedule = True delay = "%fs" % wait break + return reschedule, delay def set_with_conditions(self, name, value, group, state, time): @@ -584,19 +619,34 @@ class ResourceManager(Logger): return self.debug("----- READY ---- ") - self._ready_time = strfnow() + self._ready_time = tnow() self._state = ResourceState.READY def release(self): - """Clean the resource at the end of the Experiment and change the status + """Release any resources used by this RM """ - self._release_time = strfnow() + self._release_time = tnow() self._state = ResourceState.RELEASED + def finish(self): + """ Mark ResourceManager as FINISHED + + """ + self._finish_time = tnow() + self._state = ResourceState.FINISHED + + def fail(self): + """ Mark ResourceManager as FAILED + + """ + self._failed_time = tnow() + self._state = ResourceState.FAILED + def valid_connection(self, guid): - """Check if the connection is available. This method need to be - redefined by each new Resource Manager. + """Checks whether a connection with the other RM + is valid. + This method need to be redefined by each new Resource Manager. 
:param guid: Guid of the current Resource Manager :type guid: int diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index ea4b0a4b..dc502308 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -22,7 +22,7 @@ from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.resources.linux.node import LinuxNode from nepi.util.sshfuncs import ProcStatus -from nepi.util.timefuncs import strfnow, strfdiff +from nepi.util.timefuncs import tnow, tdiffsec import os import subprocess @@ -120,7 +120,7 @@ class LinuxApplication(ResourceManager): self._proc = None # timestamp of last state check of the application - self._last_state_check = strfnow() + self._last_state_check = tnow() def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, @@ -556,7 +556,7 @@ class LinuxApplication(ResourceManager): # the local processor with too many ssh queries, the state is only # requested every 'state_check_delay' seconds. 
state_check_delay = 0.5 - if strfdiff(strfnow(), self._last_state_check) > state_check_delay: + if tdiffsec(tnow(), self._last_state_check) > state_check_delay: # check if execution errors occurred (out, err), proc = self.node.check_errors(self.app_home) @@ -573,7 +573,7 @@ class LinuxApplication(ResourceManager): if status == ProcStatus.FINISHED: self._state = ResourceState.FINISHED - self._last_state_check = strfnow() + self._last_state_check = tnow() return self._state diff --git a/src/nepi/resources/linux/ccn/ccncontent.py b/src/nepi/resources/linux/ccn/ccncontent.py index 3b081710..f6082120 100644 --- a/src/nepi/resources/linux/ccn/ccncontent.py +++ b/src/nepi/resources/linux/ccn/ccncontent.py @@ -22,7 +22,7 @@ from nepi.execution.resource import clsinit_copy, ResourceState, \ ResourceAction from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.resources.linux.ccn.ccnr import LinuxCCNR -from nepi.util.timefuncs import strfnow, strfdiff +from nepi.util.timefuncs import tnow import os @@ -91,7 +91,7 @@ class LinuxCCNContent(LinuxCCNApplication): self.execute_command(command, env) self.debug("----- READY ---- ") - self._ready_time = strfnow() + self._ready_time = tnow() self._state = ResourceState.READY def start(self): @@ -99,7 +99,7 @@ class LinuxCCNContent(LinuxCCNApplication): command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = strfnow() + self._start_time = tnow() self._state = ResourceState.STARTED else: msg = " Failed to execute command '%s'" % command diff --git a/src/nepi/resources/linux/ccn/ccnd.py b/src/nepi/resources/linux/ccn/ccnd.py index bd001b4a..6ebe5e37 100644 --- a/src/nepi/resources/linux/ccn/ccnd.py +++ b/src/nepi/resources/linux/ccn/ccnd.py @@ -22,7 +22,7 @@ from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState from nepi.resources.linux.application import LinuxApplication from 
nepi.resources.linux.node import OSType -from nepi.util.timefuncs import strfnow, strfdiff +from nepi.util.timefuncs import tnow, tdiff import os @@ -191,7 +191,7 @@ class LinuxCCND(LinuxApplication): raise_on_error = True) self.debug("----- READY ---- ") - self._ready_time = strfnow() + self._ready_time = tnow() self._state = ResourceState.READY def start(self): @@ -199,7 +199,7 @@ class LinuxCCND(LinuxApplication): command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = strfnow() + self._start_time = tnow() self._state = ResourceState.STARTED else: msg = " Failed to execute command '%s'" % command @@ -230,7 +230,7 @@ class LinuxCCND(LinuxApplication): stdout = "ccndstop_stdout", stderr = "ccndstop_stderr") - self._stop_time = strfnow() + self._stop_time = tnow() self._state = ResourceState.STOPPED @property @@ -238,7 +238,7 @@ class LinuxCCND(LinuxApplication): # First check if the ccnd has failed state_check_delay = 0.5 if self._state == ResourceState.STARTED and \ - strfdiff(strfnow(), self._last_state_check) > state_check_delay: + tdiff(tnow(), self._last_state_check) > state_check_delay: (out, err), proc = self._ccndstatus retcode = proc.poll() @@ -252,7 +252,7 @@ class LinuxCCND(LinuxApplication): self.error(msg, out, err) self._state = ResourceState.FAILED - self._last_state_check = strfnow() + self._last_state_check = tnow() return self._state diff --git a/src/nepi/resources/linux/ccn/ccnr.py b/src/nepi/resources/linux/ccn/ccnr.py index ef7ac26a..f2e6c0e1 100644 --- a/src/nepi/resources/linux/ccn/ccnr.py +++ b/src/nepi/resources/linux/ccn/ccnr.py @@ -23,7 +23,7 @@ from nepi.execution.resource import clsinit_copy, ResourceState, \ ResourceAction from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication from nepi.resources.linux.ccn.ccnd import LinuxCCND -from nepi.util.timefuncs import strfnow, strfdiff +from nepi.util.timefuncs import tnow import os @@ -218,7 +218,7 @@ class 
LinuxCCNR(LinuxCCNApplication): raise_on_error = True) self.debug("----- READY ---- ") - self._ready_time = strfnow() + self._ready_time = tnow() self._state = ResourceState.READY def start(self): @@ -226,7 +226,7 @@ class LinuxCCNR(LinuxCCNApplication): command = self.get("command") self.info("Starting command '%s'" % command) - self._start_time = strfnow() + self._start_time = tnow() self._state = ResourceState.STARTED else: msg = " Failed to execute command '%s'" % command diff --git a/src/nepi/resources/omf/omf_api.py b/src/nepi/resources/omf/omf_api.py index 9654b68d..3316f017 100644 --- a/src/nepi/resources/omf/omf_api.py +++ b/src/nepi/resources/omf/omf_api.py @@ -18,7 +18,6 @@ # Author: Alina Quereilhac # Julien Tribino -import datetime import ssl import sys import time @@ -30,6 +29,8 @@ from nepi.util.logger import Logger from nepi.resources.omf.omf_client import OMFClient from nepi.resources.omf.messages_5_4 import MessageHandler +from nepi.util.timefuncs import tsfromat + class OMFAPI(Logger): """ .. class:: Class Args : @@ -67,7 +68,7 @@ class OMFAPI(Logger): """ super(OMFAPI, self).__init__("OMFAPI") - date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S") + date = tsfromat() tz = -time.altzone if time.daylight != 0 else -time.timezone date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds self._user = "%s-%s" % (slice, date) diff --git a/src/nepi/util/timefuncs.py b/src/nepi/util/timefuncs.py index 18ed0275..625dd704 100644 --- a/src/nepi/util/timefuncs.py +++ b/src/nepi/util/timefuncs.py @@ -28,33 +28,67 @@ _rerel = re.compile("^(?P