X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=test%2Fexecution%2Fec.py;h=ff7a8fa7df095885672edb49157390f78eb7943c;hb=1d2350d56f314a6e3de43517a66f7e2f48128d44;hp=eda875eefc2f3b0cd6160a2f65f5902d043ee08e;hpb=4f7387b9c2f8e427bff72a05dc547730937a242a;p=nepi.git diff --git a/test/execution/ec.py b/test/execution/ec.py index eda875ee..ff7a8fa7 100755 --- a/test/execution/ec.py +++ b/test/execution/ec.py @@ -1,51 +1,93 @@ #!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac -from neco.execution.ec import ExperimentController -from neco.execution.tasks import TaskStatus + +from nepi.execution.ec import ExperimentController, ECState +from nepi.execution.scheduler import TaskStatus import datetime +import time import unittest class ExecuteControllersTestCase(unittest.TestCase): def test_schedule_print(self): - def myfunc(ec_weakref): - result = id(ec_weakref()) - return (TaskStatus.SUCCESS, result) + def myfunc(): + return 'hola!' + + ec = ExperimentController() + + tid = ec.schedule("0s", myfunc, track=True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break - try: - ec = ExperimentController() + time.sleep(1) - tid = ec.schedule("0s", myfunc) - status = None - while status != TaskStatus.SUCCESS: - (status, result) = ec.task_info(tid) + self.assertEqual('hola!', task.result) - self.assertEquals(id(ec), result) - finally: - ec.terminate() + ec.shutdown() def test_schedule_date(self): - def get_time(ec_weakref): - timestamp = datetime.datetime.now() - return (TaskStatus.SUCCESS, timestamp) + def get_time(): + return datetime.datetime.now() + + ec = ExperimentController() + + schedule_time = datetime.datetime.now() + + tid = ec.schedule("4s", get_time, track=True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break + + time.sleep(1) + + execution_time = task.result + delta = execution_time - schedule_time + self.assertTrue(delta > datetime.timedelta(seconds=4)) + self.assertTrue(delta < datetime.timedelta(seconds=5)) + + ec.shutdown() - try: - ec = ExperimentController() + def test_schedule_exception(self): + def raise_error(): + # When this task is executed and the error raise, + # the FailureManager should set its failure level to + # TASK_FAILURE + raise RuntimeError("NOT A REAL ERROR. JUST TESTING!") - schedule_time = datetime.datetime.now() - - tid = ec.schedule("4s", get_time) - status = None - while status != TaskStatus.SUCCESS: - (status, execution_time) = ec.task_info(tid) + ec = ExperimentController() - delta = execution_time - schedule_time - self.assertTrue(delta > datetime.timedelta(seconds=4)) - self.assertTrue(delta < datetime.timedelta(seconds=5)) + tid = ec.schedule("2s", raise_error, track = True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break - finally: - ec.terminate() + time.sleep(1) + self.assertEqual(task.status, TaskStatus.ERROR) if __name__ == '__main__': unittest.main()