#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from neco.execution.ec import ExperimentController
-from neco.execution.tasks import TaskStatus
+
+from nepi.execution.ec import ExperimentController, ECState
+from nepi.execution.scheduler import TaskStatus
import datetime
+import time
import unittest
class ExecuteControllersTestCase(unittest.TestCase):
def test_schedule_print(self):
- def myfunc(ec_weakref):
- result = id(ec_weakref())
- return (TaskStatus.SUCCESS, result)
+ def myfunc():
+ return 'hola!'
+
+ ec = ExperimentController()
+
+ tid = ec.schedule("0s", myfunc, track=True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
- try:
- ec = ExperimentController()
+ time.sleep(1)
- tid = ec.schedule("0s", myfunc)
- status = None
- while status != TaskStatus.SUCCESS:
- (status, result) = ec.task_info(tid)
+ self.assertEqual('hola!', task.result)
- self.assertEquals(id(ec), result)
- finally:
- ec.terminate()
+ ec.shutdown()
def test_schedule_date(self):
- def get_time(ec_weakref):
- timestamp = datetime.datetime.now()
- return (TaskStatus.SUCCESS, timestamp)
+ def get_time():
+ return datetime.datetime.now()
+
+ ec = ExperimentController()
+
+ schedule_time = datetime.datetime.now()
+
+ tid = ec.schedule("4s", get_time, track=True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
+
+ time.sleep(1)
+
+ execution_time = task.result
+ delta = execution_time - schedule_time
+ self.assertTrue(delta > datetime.timedelta(seconds=4))
+ self.assertTrue(delta < datetime.timedelta(seconds=5))
+
+ ec.shutdown()
- try:
- ec = ExperimentController()
+ def test_schedule_exception(self):
+ def raise_error():
+ # When this task is executed and the error raise,
+ # the FailureManager should set its failure level to
+ # TASK_FAILURE
+ raise RuntimeError("NOT A REAL ERROR. JUST TESTING!")
- schedule_time = datetime.datetime.now()
-
- tid = ec.schedule("4s", get_time)
- status = None
- while status != TaskStatus.SUCCESS:
- (status, execution_time) = ec.task_info(tid)
+ ec = ExperimentController()
- delta = execution_time - schedule_time
- self.assertTrue(delta > datetime.timedelta(seconds=4))
- self.assertTrue(delta < datetime.timedelta(seconds=5))
+ tid = ec.schedule("2s", raise_error, track = True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
- finally:
- ec.terminate()
+ time.sleep(1)
+ self.assertEqual(task.status, TaskStatus.ERROR)
if __name__ == '__main__':
unittest.main()