X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=test%2Fexecution%2Fec.py;h=03040146fbc9ac26bedc6a81843730efc37232d1;hb=db88361f45034b3286b2d1f6967023a64890f6cb;hp=eda875eefc2f3b0cd6160a2f65f5902d043ee08e;hpb=4f7387b9c2f8e427bff72a05dc547730937a242a;p=nepi.git
diff --git a/test/execution/ec.py b/test/execution/ec.py
index eda875ee..03040146 100755
--- a/test/execution/ec.py
+++ b/test/execution/ec.py
@@ -1,51 +1,94 @@
#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+# Author: Alina Quereilhac
-from neco.execution.ec import ExperimentController
-from neco.execution.tasks import TaskStatus
+
+from nepi.execution.ec import ExperimentController, ECState
+from nepi.execution.scheduler import TaskStatus
import datetime
+import time
import unittest
class ExecuteControllersTestCase(unittest.TestCase):
def test_schedule_print(self):
- def myfunc(ec_weakref):
- result = id(ec_weakref())
- return (TaskStatus.SUCCESS, result)
+ def myfunc():
+ return 'hola!'
+
+ ec = ExperimentController()
+
+ tid = ec.schedule("0s", myfunc, track=True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
- try:
- ec = ExperimentController()
+ time.sleep(1)
- tid = ec.schedule("0s", myfunc)
- status = None
- while status != TaskStatus.SUCCESS:
- (status, result) = ec.task_info(tid)
+ self.assertEquals('hola!', task.result)
- self.assertEquals(id(ec), result)
- finally:
- ec.terminate()
+ ec.shutdown()
def test_schedule_date(self):
- def get_time(ec_weakref):
- timestamp = datetime.datetime.now()
- return (TaskStatus.SUCCESS, timestamp)
+ def get_time():
+ return datetime.datetime.now()
+
+ ec = ExperimentController()
+
+ schedule_time = datetime.datetime.now()
+
+ tid = ec.schedule("4s", get_time, track=True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
+
+ time.sleep(1)
+
+ execution_time = task.result
+ delta = execution_time - schedule_time
+ self.assertTrue(delta > datetime.timedelta(seconds=4))
+ self.assertTrue(delta < datetime.timedelta(seconds=5))
+
+ ec.shutdown()
- try:
- ec = ExperimentController()
+ def test_schedule_exception(self):
+ def raise_error():
+ # When this task is executed and the error raise,
+ # the FailureManager should set its failure level to
+ # TASK_FAILURE
+ raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
- schedule_time = datetime.datetime.now()
-
- tid = ec.schedule("4s", get_time)
- status = None
- while status != TaskStatus.SUCCESS:
- (status, execution_time) = ec.task_info(tid)
+ ec = ExperimentController()
- delta = execution_time - schedule_time
- self.assertTrue(delta > datetime.timedelta(seconds=4))
- self.assertTrue(delta < datetime.timedelta(seconds=5))
+ tid = ec.schedule("2s", raise_error, track = True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
- finally:
- ec.terminate()
+ time.sleep(1)
+ self.assertEquals(task.status, TaskStatus.ERROR)
if __name__ == '__main__':
unittest.main()