3 # NEPI, a framework to manage network experiments
4 # Copyright (C) 2013 INRIA
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 2 as
8 # published by the Free Software Foundation;
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
21 from nepi.execution.ec import ExperimentController, ECState
22 from nepi.execution.scheduler import TaskStatus
28 class ExecuteControllersTestCase(unittest.TestCase):
# Tests for task scheduling on an ExperimentController: each test schedules
# a callable with ec.schedule(delay, callable, track=True), looks the task
# up again with ec.get_task(tid), and checks its result or status.
# Schedule `myfunc` with a "0s" delay and check that the tracked task's
# result is the string 'hola!'.
# NOTE(review): the definition of `myfunc` is not among the visible lines;
# presumably it returns 'hola!' -- confirm.
29 def test_schedule_print(self):
33 ec = ExperimentController()
35 tid = ec.schedule("0s", myfunc, track=True)
38 task = ec.get_task(tid)
# The task is inspected once its status is no longer TaskStatus.NEW.
# NOTE(review): the statements guarded by this check are not visible;
# presumably this is the exit condition of a polling loop -- confirm.
39 if task.status != TaskStatus.NEW:
44 self.assertEqual('hola!', task.result)
# Schedule `get_time` with a "4s" delay and check that the time between
# scheduling and execution is strictly between 4 and 5 seconds.
48 def test_schedule_date(self):
# NOTE(review): presumably the body of the scheduled `get_time` helper,
# whose `def` line is not visible -- confirm.
50 return datetime.datetime.now()
52 ec = ExperimentController()
# Reference timestamp taken immediately before the task is scheduled.
54 schedule_time = datetime.datetime.now()
56 tid = ec.schedule("4s", get_time, track=True)
59 task = ec.get_task(tid)
# NOTE(review): guarded statements not visible; presumably the same
# wait-until-not-NEW check as in test_schedule_print -- confirm.
60 if task.status != TaskStatus.NEW:
# The task result is used as the execution timestamp.
65 execution_time = task.result
66 delta = execution_time - schedule_time
# Both bounds are strict: the delay must exceed 4 s and stay below 5 s.
67 self.assertTrue(delta > datetime.timedelta(seconds=4))
68 self.assertTrue(delta < datetime.timedelta(seconds=5))
# Schedule `raise_error` with a "2s" delay and check that the tracked task
# ends up with status TaskStatus.ERROR.
72 def test_schedule_exception(self):
74 # When this task is executed and the error is raised,
75 # the FailureManager should set its failure level to
# NOTE(review): the sentence above appears to be cut off; the failure level
# it refers to is not stated in the visible lines.
# The RuntimeError below is raised on purpose; its message marks it as a
# test-only error.
77 raise RuntimeError("NOT A REAL ERROR. JUST TESTING!")
79 ec = ExperimentController()
81 tid = ec.schedule("2s", raise_error, track = True)
84 task = ec.get_task(tid)
# NOTE(review): guarded statements not visible; presumably the same
# wait-until-not-NEW check as in the other tests -- confirm.
85 if task.status != TaskStatus.NEW:
90 self.assertEqual(task.status, TaskStatus.ERROR)
92 if __name__ == '__main__':