X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=test%2Fexecution%2Fec.py;h=03040146fbc9ac26bedc6a81843730efc37232d1;hb=db88361f45034b3286b2d1f6967023a64890f6cb;hp=eda875eefc2f3b0cd6160a2f65f5902d043ee08e;hpb=4f7387b9c2f8e427bff72a05dc547730937a242a;p=nepi.git diff --git a/test/execution/ec.py b/test/execution/ec.py index eda875ee..03040146 100755 --- a/test/execution/ec.py +++ b/test/execution/ec.py @@ -1,51 +1,94 @@ #!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac -from neco.execution.ec import ExperimentController -from neco.execution.tasks import TaskStatus + +from nepi.execution.ec import ExperimentController, ECState +from nepi.execution.scheduler import TaskStatus import datetime +import time import unittest class ExecuteControllersTestCase(unittest.TestCase): def test_schedule_print(self): - def myfunc(ec_weakref): - result = id(ec_weakref()) - return (TaskStatus.SUCCESS, result) + def myfunc(): + return 'hola!' + + ec = ExperimentController() + + tid = ec.schedule("0s", myfunc, track=True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break - try: - ec = ExperimentController() + time.sleep(1) - tid = ec.schedule("0s", myfunc) - status = None - while status != TaskStatus.SUCCESS: - (status, result) = ec.task_info(tid) + self.assertEquals('hola!', task.result) - self.assertEquals(id(ec), result) - finally: - ec.terminate() + ec.shutdown() def test_schedule_date(self): - def get_time(ec_weakref): - timestamp = datetime.datetime.now() - return (TaskStatus.SUCCESS, timestamp) + def get_time(): + return datetime.datetime.now() + + ec = ExperimentController() + + schedule_time = datetime.datetime.now() + + tid = ec.schedule("4s", get_time, track=True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break + + time.sleep(1) + + execution_time = task.result + delta = execution_time - schedule_time + self.assertTrue(delta > datetime.timedelta(seconds=4)) + self.assertTrue(delta < datetime.timedelta(seconds=5)) + + ec.shutdown() - try: - ec = ExperimentController() + def test_schedule_exception(self): + def raise_error(): + # When this task is executed and the error raise, + # the FailureManager should set its failure level to + # TASK_FAILURE + raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!" - schedule_time = datetime.datetime.now() - - tid = ec.schedule("4s", get_time) - status = None - while status != TaskStatus.SUCCESS: - (status, execution_time) = ec.task_info(tid) + ec = ExperimentController() - delta = execution_time - schedule_time - self.assertTrue(delta > datetime.timedelta(seconds=4)) - self.assertTrue(delta < datetime.timedelta(seconds=5)) + tid = ec.schedule("2s", raise_error, track = True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break - finally: - ec.terminate() + time.sleep(1) + self.assertEquals(task.status, TaskStatus.ERROR) if __name__ == '__main__': unittest.main()