from nepi.util import guid
from nepi.util.parallel import ParallelRun
-from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.util.timefuncs import tnow, tdiffsec, stabsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
:param tid: Id of the task
:type tid: int
- :rtype: unknow
+ :rtype: Task
"""
return self._tasks.get(tid)
:param guid: Id of the task
:type guid: int
- :rtype: ResourceManager
+ :rtype: ResourceManager
"""
return self._resources.get(guid)
:return : The Id of the task
"""
- timestamp = strfvalid(date)
-
+ timestamp = stabsformat(date)
task = Task(timestamp, callback)
task = self._scheduler.schedule(task)
else:
# The task timestamp is in the future. Wait for timeout
# or until another task is scheduled.
- now = strfnow()
+ now = tnow()
if now < task.timestamp:
# Calculate timeout in seconds
- timeout = strfdiff(task.timestamp, now)
+ timeout = tdiffsec(task.timestamp, now)
# Re-schedule task with the same timestamp
self._scheduler.schedule(task)
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
from nepi.util.logger import Logger
from nepi.execution.trace import TraceAttr
self._provision_time = None
self._ready_time = None
self._release_time = None
+ self._finish_time = None
+ self._failed_time = None
@property
def guid(self):
- """ Returns the guid of the current RM """
+ """ Returns the global unique identifier of the RM """
return self._guid
@property
@property
def connections(self):
- """ Returns the set of connection for this RM"""
+ """ Returns the set of guids of connected RMs"""
return self._connections
@property
def conditions(self):
- """ Returns the list of conditions for this RM
- The list is a dictionary with for each action, a list of tuple
- describing the conditions. """
+ """ Returns the conditions to which the RM is subjected to.
+
+ The object returned by this method is a dictionary indexed by
+ ResourceAction."""
return self._conditions
@property
def start_time(self):
- """ Returns timestamp with the time the RM started """
+ """ Returns the start time of the RM as a timestamp"""
return self._start_time
@property
def stop_time(self):
- """ Returns timestamp with the time the RM stopped """
+ """ Returns the stop time of the RM as a timestamp"""
return self._stop_time
@property
def discover_time(self):
- """ Returns timestamp with the time the RM passed to state discovered """
+ """ Returns the time discovering was finished for the RM as a timestamp"""
return self._discover_time
@property
def provision_time(self):
- """ Returns timestamp with the time the RM passed to state provisioned """
+ """ Returns the time provisioning was finished for the RM as a timestamp"""
return self._provision_time
@property
def ready_time(self):
- """ Returns timestamp with the time the RM passed to state ready """
+ """ Returns the time deployment was finished for the RM as a timestamp"""
return self._ready_time
@property
def release_time(self):
- """ Returns timestamp with the time the RM was released """
+ """ Returns the release time of the RM as a timestamp"""
return self._release_time
+ @property
+ def finish_time(self):
+ """ Returns the finalization time of the RM as a timestamp"""
+ return self._finish_time
+
+ @property
+ def failed_time(self):
+ """ Returns the time failure occured for the RM as a timestamp"""
+ return self._failed_time
+
@property
def state(self):
""" Get the state of the current RM """
return self._state
def log_message(self, msg):
- """ Improve debugging message by adding more information
- as the guid and the type of the RM
+ """ Returns the log message formatted with added information.
- :param msg: Message to log
+ :param msg: text message
:type msg: str
:rtype: str
"""
return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
def connect(self, guid):
- """ Connect the current RM with the RM 'guid'
+ """ Establishes a connection to the RM identified by guid
- :param guid: Guid of the RM the current RM will be connected
+ :param guid: Global unique identified of the RM to connect to
:type guid: int
"""
if self.valid_connection(guid):
self._connections.add(guid)
+ def disconnect(self, guid):
+ """ Removes connection to the RM identified by guid
+
+ :param guid: Global unique identified of the RM to connect to
+ :type guid: int
+ """
+ if guid in self._connections:
+ self._connections.remove(guid)
+
def discover(self):
- """ Discover the Resource. As it is specific for each RM,
- this method take the time when the RM become DISCOVERED and
- change the status """
- self._discover_time = strfnow()
+ """ Performs resource discovery.
+
+ This method is resposible for selecting an individual resource
+ matching user requirements.
+ This method should be redefined when necessary in child classes.
+ """
+ self._discover_time = tnow()
self._state = ResourceState.DISCOVERED
def provision(self):
- """ Provision the Resource. As it is specific for each RM,
- this method take the time when the RM become PROVISIONNED and
- change the status """
- self._provision_time = strfnow()
+ """ Performs resource provisioning.
+
+ This method is resposible for provisioning one resource.
+ After this method has been successfully invoked, the resource
+ should be acccesible/controllable by the RM.
+ This method should be redefined when necessary in child classes.
+ """
+ self._provision_time = tnow()
self._state = ResourceState.PROVISIONED
def start(self):
- """ Start the Resource Manager. As it is specific to each RM, this methods
- just change, after some verifications, the status to STARTED and save the time.
-
+ """ Starts the resource.
+
+ There is no generic start behavior for all resources.
+ This method should be redefined when necessary in child classes.
"""
if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
return
- self._start_time = strfnow()
+ self._start_time = tnow()
self._state = ResourceState.STARTED
def stop(self):
- """ Stop the Resource Manager. As it is specific to each RM, this methods
- just change, after some verifications, the status to STOPPED and save the time.
-
+ """ Stops the resource.
+
+ There is no generic stop behavior for all resources.
+ This method should be redefined when necessary in child classes.
"""
if not self._state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
return
- self._stop_time = strfnow()
+ self._stop_time = tnow()
self._state = ResourceState.STOPPED
def set(self, name, value):
attr.value = value
def get(self, name):
- """ Start the Resource Manager
+ """ Returns the value of the attribute
:param name: Name of the attribute
:type name: str
return attr.value
def register_trace(self, name):
- """ Enable trace
+ """ Explicitly enable trace generation
:param name: Name of the trace
:type name: str
conditions.append((group, state, time))
- def get_connected(self, rtype):
- """ Return the list of RM with the type 'rtype'
+ def get_connected(self, rtype = None):
+ """ Returns the list of RM with the type 'rtype'
:param rtype: Type of the RM we look for
:type rtype: str
connected = []
for guid in self.connections:
rm = self.ec.get_resource(guid)
- if rm.rtype() == rtype:
+ if not rtype or rm.rtype() == rtype:
connected.append(rm)
return connected
# Only keep time information for START and STOP
break
- d = strfdiff(strfnow(), t)
- wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
- if wait > 0.001:
+ # time already elapsed since RM changed state
+ waited = "%fs" % tdiffsec(tnow(), t)
+
+ # time still to wait
+ wait = tdiffsec(stabsformat(time), stabsformat(waited))
+
+ if wait > 0:
reschedule = True
delay = "%fs" % wait
break
+
return reschedule, delay
def set_with_conditions(self, name, value, group, state, time):
return
self.debug("----- READY ---- ")
- self._ready_time = strfnow()
+ self._ready_time = tnow()
self._state = ResourceState.READY
def release(self):
- """Clean the resource at the end of the Experiment and change the status
+ """Release any resources used by this RM
"""
- self._release_time = strfnow()
+ self._release_time = tnow()
self._state = ResourceState.RELEASED
+ def finish(self):
+ """ Mark ResourceManager as FINISHED
+
+ """
+ self._finish_time = tnow()
+ self._state = ResourceState.FINISHED
+
+ def fail(self):
+ """ Mark ResourceManager as FAILED
+
+ """
+ self._failed_time = tnow()
+ self._state = ResourceState.FAILED
+
def valid_connection(self, guid):
- """Check if the connection is available. This method need to be
- redefined by each new Resource Manager.
+ """Checks whether a connection with the other RM
+ is valid.
+ This method need to be redefined by each new Resource Manager.
:param guid: Guid of the current Resource Manager
:type guid: int
from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
-from nepi.util.timefuncs import strfnow, strfdiff
+from nepi.util.timefuncs import tnow, tdiffsec
import os
import subprocess
self._proc = None
# timestamp of last state check of the application
- self._last_state_check = strfnow()
+ self._last_state_check = tnow()
def log_message(self, msg):
return " guid %d - host %s - %s " % (self.guid,
# the local processor with too many ssh queries, the state is only
# requested every 'state_check_delay' seconds.
state_check_delay = 0.5
- if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+ if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
# check if execution errors occurred
(out, err), proc = self.node.check_errors(self.app_home)
if status == ProcStatus.FINISHED:
self._state = ResourceState.FINISHED
- self._last_state_check = strfnow()
+ self._last_state_check = tnow()
return self._state
ResourceAction
from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.resources.linux.ccn.ccnr import LinuxCCNR
-from nepi.util.timefuncs import strfnow, strfdiff
+from nepi.util.timefuncs import tnow
import os
self.execute_command(command, env)
self.debug("----- READY ---- ")
- self._ready_time = strfnow()
+ self._ready_time = tnow()
self._state = ResourceState.READY
def start(self):
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = strfnow()
+ self._start_time = tnow()
self._state = ResourceState.STARTED
else:
msg = " Failed to execute command '%s'" % command
from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.linux.node import OSType
-from nepi.util.timefuncs import strfnow, strfdiff
+from nepi.util.timefuncs import tnow, tdiff
import os
raise_on_error = True)
self.debug("----- READY ---- ")
- self._ready_time = strfnow()
+ self._ready_time = tnow()
self._state = ResourceState.READY
def start(self):
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = strfnow()
+ self._start_time = tnow()
self._state = ResourceState.STARTED
else:
msg = " Failed to execute command '%s'" % command
stdout = "ccndstop_stdout",
stderr = "ccndstop_stderr")
- self._stop_time = strfnow()
+ self._stop_time = tnow()
self._state = ResourceState.STOPPED
@property
# First check if the ccnd has failed
state_check_delay = 0.5
if self._state == ResourceState.STARTED and \
- strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+ tdiff(tnow(), self._last_state_check) > state_check_delay:
(out, err), proc = self._ccndstatus
retcode = proc.poll()
self.error(msg, out, err)
self._state = ResourceState.FAILED
- self._last_state_check = strfnow()
+ self._last_state_check = tnow()
return self._state
ResourceAction
from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
from nepi.resources.linux.ccn.ccnd import LinuxCCND
-from nepi.util.timefuncs import strfnow, strfdiff
+from nepi.util.timefuncs import tnow
import os
raise_on_error = True)
self.debug("----- READY ---- ")
- self._ready_time = strfnow()
+ self._ready_time = tnow()
self._state = ResourceState.READY
def start(self):
command = self.get("command")
self.info("Starting command '%s'" % command)
- self._start_time = strfnow()
+ self._start_time = tnow()
self._state = ResourceState.STARTED
else:
msg = " Failed to execute command '%s'" % command
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Julien Tribino <julien.tribino@inria.fr>
-import datetime
import ssl
import sys
import time
from nepi.resources.omf.omf_client import OMFClient
from nepi.resources.omf.messages_5_4 import MessageHandler
+from nepi.util.timefuncs import tsfromat
+
class OMFAPI(Logger):
"""
.. class:: Class Args :
"""
super(OMFAPI, self).__init__("OMFAPI")
- date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
+ date = tsfromat()
tz = -time.altzone if time.daylight != 0 else -time.timezone
date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
self._user = "%s-%s" % (slice, date)
# Work around to fix "ImportError: Failed to import _strptime because the import lock is held by another thread."
datetime.datetime.strptime("20120807124732894211", _strf)
-def strfnow():
- """ Current date """
- return datetime.datetime.now().strftime(_strf)
-
-def strfdiff(str1, str2):
- # Time difference in seconds without ignoring miliseconds
- d1 = datetime.datetime.strptime(str1, _strf)
- d2 = datetime.datetime.strptime(str2, _strf)
- diff = d1 - d2
- ddays = diff.days * 86400
- dus = round(diff.microseconds * 1.0e-06, 2)
- ret = ddays + diff.seconds + dus
- # delay must be > 0
- return (ret or 0.001)
-
-def strfvalid(date):
- """ User defined date to scheduler date
+def stformat(sdate):
+ """ Constructs a datetime object from a string date with
+ format YYYYMMddHHMMSSffff
+
+ """
+ return datetime.datetime.strptime(sdate, _strf).date()
+
+def tsfromat(date = None):
+ """ Formats a datetime object to a string with format YYYYMMddHHMMSSffff.
+ If no date is given, the current date is used.
- :param date : user define date matchin the pattern _strf
+ """
+ if not date:
+ date = tnow()
+
+ return date.strftime(_strf)
+
+def tnow():
+ """ Returns datetime object with the current time """
+ return datetime.datetime.now()
+
+def tdiff(date1, date2):
+ """ Returns difference ( date1 - date2 ) as a datetime object,
+ where date1 and date 2 are datetime objects
+
+ """
+ return date1 - date2
+
+def tdiffsec(date1, date2):
+ """ Returns the date difference ( date1 - date2 ) in seconds,
+ where date1 and date 2 are datetime objects
+
+ """
+ diff = tdiff(date1, date2)
+ return diff.total_seconds()
+
+def stabsformat(sdate, dbase = None):
+ """ Constructs a datetime object from a string date.
+ The string date can be expressed as an absolute date
+ ( i.e. format YYYYMMddHHMMSSffff ) or as a relative time
+ ( e.g. format '5m' or '10s').
+ If the date is a relative time and the dbase parameter
+ is given (dbase must be datetime object), the returned
+ date will be dbase + sdate. If dbase is None,
+ current time will be used instead as base time.
+
+ :param date : string date
:type date : date
"""
- if not date:
- return strfnow()
- if _reabs.match(date):
- return date
- m = _rerel.match(date)
+
+ # No date given, return current datetime
+ if not sdate:
+ return tnow()
+
+ # Absolute date is given
+ if _reabs.match(sdate):
+ return stformat(sdate)
+
+ # Relative time is given
+ m = _rerel.match(sdate)
if m:
time = float(m.groupdict()['time'])
units = m.groupdict()['units']
elif units == 'ms':
delta = datetime.timedelta(microseconds = (time*1000))
else:
- delta = datetime.timedelta(microseconds = time)
- now = datetime.datetime.now()
- d = now + delta
- return d.strftime(_strf)
+ delta = datetime.timedelta(microseconds = time)
+
+ if not dbase:
+ dbase = tnow()
+
+ return dbase + delta
+
return None
--- /dev/null
+#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
+from nepi.util.timefuncs import tnow, stabsformat
+
+import unittest
+
+class SchedulerTestCase(unittest.TestCase):
+ def test_task_order(self):
+ def first():
+ return 1
+
+ def second():
+ return 2
+
+ def third():
+ return 3
+
+ scheduler = HeapScheduler()
+
+ t1 = tnow()
+ t2 = stabsformat("2s")
+ t3 = stabsformat("3s")
+
+ tsk1 = Task(t1, first)
+ tsk2 = Task(t2, second)
+ tsk3 = Task(t3, third)
+
+ # schedule the tasks in disorder
+ scheduler.schedule(tsk2)
+ scheduler.schedule(tsk3)
+ scheduler.schedule(tsk1)
+
+ # Make sure tasks are retrieved in teh correct order
+ tsk = scheduler.next()
+ self.assertEquals(tsk.callback(), 1)
+
+ tsk = scheduler.next()
+ self.assertEquals(tsk.callback(), 2)
+
+ tsk = scheduler.next()
+ self.assertEquals(tsk.callback(), 3)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
from nepi.resources.omf.channel import OMFChannel
from nepi.resources.omf.omf_api import OMFAPIFactory
+from nepi.util.timefuncs import tdiffsec
+
from nepi.util.timefuncs import *
import time
self.ec.register_condition(self.app5, ResourceAction.START, [self.app3, self.app2], ResourceState.STARTED , "2s")
self.ec.register_condition(self.app5, ResourceAction.START, self.app1, ResourceState.STARTED , "25s")
- self.ec.register_condition([self.app1, self.app2, self.app3,self.app4, self.app5], ResourceAction.STOP, self.app5, ResourceState.STARTED , "1s")
+ self.ec.register_condition([self.app1, self.app2, self.app3,self.app4, self.app5],
+ ResourceAction.STOP, self.app5, ResourceState.STARTED , "1s")
def tearDown(self):
self.ec.shutdown()
self.ec.wait_finished([self.app1, self.app2, self.app3,self.app4, self.app5])
- self.assertEquals(round(strfdiff(self.ec.get_resource(self.app2).start_time, self.ec.get_resource(self.app1).start_time),1), 3.0)
- self.assertEquals(round(strfdiff(self.ec.get_resource(self.app3).start_time, self.ec.get_resource(self.app2).start_time),1), 2.0)
- self.assertEquals(round(strfdiff(self.ec.get_resource(self.app4).start_time, self.ec.get_resource(self.app3).start_time),1), 3.0)
- self.assertEquals(round(strfdiff(self.ec.get_resource(self.app5).start_time, self.ec.get_resource(self.app3).start_time),1), 20.0)
- self.assertEquals(round(strfdiff(self.ec.get_resource(self.app5).start_time, self.ec.get_resource(self.app1).start_time),1), 25.0)
+ print " HOLA ", self.ec.get_resource(self.app2).start_time, self.ec.get_resource(self.app1).start_time
+
+ self.assertEquals(round(tdiffsec(self.ec.get_resource(self.app2).start_time,
+ self.ec.get_resource(self.app1).start_time),1), 3.0)
+ self.assertEquals(round(tdiffsec(self.ec.get_resource(self.app3).start_time,
+ self.ec.get_resource(self.app2).start_time),1), 2.0)
+ self.assertEquals(round(tdiffsec(self.ec.get_resource(self.app4).start_time,
+ self.ec.get_resource(self.app3).start_time),1), 3.0)
+ self.assertEquals(round(tdiffsec(self.ec.get_resource(self.app5).start_time,
+ self.ec.get_resource(self.app3).start_time),1), 20.0)
+ self.assertEquals(round(tdiffsec(self.ec.get_resource(self.app5).start_time,
+ self.ec.get_resource(self.app1).start_time),1), 25.0)
# Precision is at 1/10. So this one returns an error 7.03 != 7.0
- #self.assertEquals(strfdiff(self.ec.get_resource(self.app5).start_time, self.ec.get_resource(self.app1).start_time), 7)
- #In order to release everythings
+ #self.assertEquals(tdiffsec(self.ec.get_resource(self.app5).start_time, self.ec.get_resource(self.app1).start_time), 7)
+ #In order to release everythings
time.sleep(1)