2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
14 # PROTOCOL INSTRUCTION MESSAGES
48 # EXPERIMENT CONTROLLER PROTOCOL MESSAGES
49 controller_messages = dict({
51 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r"),
52 TRACE: "%d|%s" % (TRACE, "%d|%d|%s"),
53 FINISHED: "%d|%s" % (FINISHED, "%d"),
56 SHUTDOWN: "%d" % SHUTDOWN,
59 # TESTBED INSTANCE PROTOCOL MESSAGES
60 testbed_messages = dict({
61 TRACE: "%d|%s" % (TRACE, "%d|%s"),
62 START: "%d|%s" % (START, "%s"),
63 STOP: "%d|%s" % (STOP, "%s"),
64 SHUTDOWN: "%d" % SHUTDOWN,
65 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
66 CREATE: "%d|%s" % (CREATE, "%d|%s"),
67 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
68 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
69 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
70 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
71 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
72 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%d|%s|%d|%s"),
73 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
74 DO_SETUP: "%d" % DO_SETUP,
75 DO_CREATE: "%d" % DO_CREATE,
76 DO_CONNECT: "%d" % DO_CONNECT,
77 DO_CONFIGURE: "%d" % DO_CONFIGURE,
78 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
79 GET: "%d|%s" % (GET, "%s|%d|%s"),
80 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
81 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
82 STATUS: "%d|%s" % (STATUS, "%d"),
86 instruction_text = dict({
96 CONFIGURE: "CONFIGURE",
98 CREATE_SET: "CREATE_SET",
99 FACTORY_SET: "FACTORY_SET",
101 CROSS_CONNECT: "CROSS_CONNECT",
102 ADD_TRACE: "ADD_TRACE",
103 ADD_ADDRESS: "ADD_ADDRESS",
104 ADD_ROUTE: "ADD_ROUTE",
105 DO_SETUP: "DO_SETUP",
106 DO_CREATE: "DO_CREATE",
107 DO_CONNECT: "DO_CONNECT",
108 DO_CONFIGURE: "DO_CONFIGURE",
109 DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
122 if isinstance(value, bool):
124 elif isinstance(value, int):
126 elif isinstance(value, float):
131 def set_type(type, value):
137 value = value == "True"
def log_msg(server, params):
    """Write a debug log entry describing an incoming protocol message.

    params is the message already split on "|": params[0] holds the
    numeric instruction code and the remaining items are its arguments.
    """
    instr = int(params[0])
    # Fall back to the raw code when the instruction has no entry in
    # instruction_text: the dispatchers call this function before they
    # reach their own "Invalid instruction" handling, so a KeyError here
    # would hide that error report.
    instr_txt = instruction_text.get(instr, str(instr))
    server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
        instr_txt, ", ".join(map(str, params[1:]))))
148 def log_reply(server, reply):
149 res = reply.split("|")
151 code_txt = instruction_text[code]
152 txt = base64.b64decode(res[1])
153 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
156 class AccessConfiguration(AttributesMap):
157 MODE_SINGLE_PROCESS = "SINGLE"
158 MODE_DAEMON = "DAEMON"
160 ACCESS_LOCAL = "LOCAL"
161 ERROR_LEVEL = "Error"
162 DEBUG_LEVEL = "Debug"
165 super(AccessConfiguration, self).__init__()
166 self.add_attribute(name = "mode",
167 help = "Instance execution mode",
168 type = Attribute.ENUM,
169 value = AccessConfiguration.MODE_SINGLE_PROCESS,
170 allowed = [AccessConfiguration.MODE_DAEMON,
171 AccessConfiguration.MODE_SINGLE_PROCESS],
172 validation_function = validation.is_enum)
173 self.add_attribute(name = "communication",
174 help = "Instance communication mode",
175 type = Attribute.ENUM,
176 value = AccessConfiguration.ACCESS_LOCAL,
177 allowed = [AccessConfiguration.ACCESS_LOCAL,
178 AccessConfiguration.ACCESS_SSH],
179 validation_function = validation.is_enum)
180 self.add_attribute(name = "host",
181 help = "Host where the testbed will be executed",
182 type = Attribute.STRING,
184 validation_function = validation.is_string)
185 self.add_attribute(name = "user",
186 help = "User on the Host to execute the testbed",
187 type = Attribute.STRING,
188 validation_function = validation.is_string)
189 self.add_attribute(name = "port",
190 help = "Port on the Host",
191 type = Attribute.INTEGER,
193 validation_function = validation.is_integer)
194 self.add_attribute(name = "rootDirectory",
195 help = "Root directory for storing process files",
196 type = Attribute.STRING,
198 validation_function = validation.is_string) # TODO: validation.is_path
199 self.add_attribute(name = "useAgent",
200 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
201 type = Attribute.BOOL,
203 validation_function = validation.is_bool)
204 self.add_attribute(name = "logLevel",
205 help = "Log level for instance",
206 type = Attribute.ENUM,
207 value = AccessConfiguration.ERROR_LEVEL,
208 allowed = [AccessConfiguration.ERROR_LEVEL,
209 AccessConfiguration.DEBUG_LEVEL],
210 validation_function = validation.is_enum)
def create_controller(xml, access_config = None):
    """Build the experiment controller selected by access_config.

    With no access configuration, or with mode MODE_SINGLE_PROCESS, an
    in-process ExperimentController is returned. With mode MODE_DAEMON an
    ExperimentControllerProxy is returned, built from the configured
    rootDirectory and logLevel.

    Raises RuntimeError for any other mode.
    """
    mode = None if not access_config else access_config.get_attribute_value("mode")
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        from nepi.core.execute import ExperimentController
        return ExperimentController(xml)
    elif mode == AccessConfiguration.MODE_DAEMON:
        root_dir = access_config.get_attribute_value("rootDirectory")
        log_level = access_config.get_attribute_value("logLevel")
        return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml)
    # The original message had no conversion specifier, so "% mode" raised
    # TypeError ("not all arguments converted") instead of RuntimeError.
    raise RuntimeError("Unsupported access configuration mode '%s'" % mode)
def create_testbed_instance(testbed_id, testbed_version, access_config):
    """Build the testbed instance selected by access_config.

    With no access configuration, or with mode MODE_SINGLE_PROCESS, the
    testbed is built in-process through _build_testbed_testbed. With mode
    MODE_DAEMON a TestbedIntanceProxy is returned, built from the
    configured rootDirectory and logLevel.

    Raises RuntimeError for any other mode.
    """
    mode = None if not access_config else access_config.get_attribute_value("mode")
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        return _build_testbed_testbed(testbed_id, testbed_version)
    elif mode == AccessConfiguration.MODE_DAEMON:
        root_dir = access_config.get_attribute_value("rootDirectory")
        log_level = access_config.get_attribute_value("logLevel")
        return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version)
    # The original message had no conversion specifier, so "% mode" raised
    # TypeError ("not all arguments converted") instead of RuntimeError.
    raise RuntimeError("Unsupported access configuration mode '%s'" % mode)
234 def _build_testbed_testbed(testbed_id, testbed_version):
235 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
236 if not mod_name in sys.modules:
238 module = sys.modules[mod_name]
239 return module.TestbedInstance(testbed_version)
241 class TestbedInstanceServer(server.Server):
242 def __init__(self, root_dir, testbed_id, testbed_version):
243 super(TestbedInstanceServer, self).__init__(root_dir)
244 self._testbed_id = testbed_id
245 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed for the id and version stored on this server."""
    testbed_id = self._testbed_id
    version = self._testbed_version
    self._testbed = _build_testbed_testbed(testbed_id, version)
252 def reply_action(self, msg):
253 params = msg.split("|")
254 instruction = int(params[0])
255 log_msg(self, params)
257 if instruction == TRACE:
258 reply = self.trace(params)
259 elif instruction == START:
260 reply = self.start(params)
261 elif instruction == STOP:
262 reply = self.stop(params)
263 elif instruction == SHUTDOWN:
264 reply = self.shutdown(params)
265 elif instruction == CONFIGURE:
266 reply = self.configure(params)
267 elif instruction == CREATE:
268 reply = self.create(params)
269 elif instruction == CREATE_SET:
270 reply = self.create_set(params)
271 elif instruction == FACTORY_SET:
272 reply = self.factory_set(params)
273 elif instruction == CONNECT:
274 reply = self.connect(params)
275 elif instruction == CROSS_CONNECT:
276 reply = self.cross_connect(params)
277 elif instruction == ADD_TRACE:
278 reply = self.add_trace(params)
279 elif instruction == ADD_ADDRESS:
280 reply = self.add_address(params)
281 elif instruction == ADD_ROUTE:
282 reply = self.add_route(params)
283 elif instruction == DO_SETUP:
284 reply = self.do_setup(params)
285 elif instruction == DO_CREATE:
286 reply = self.do_create(params)
287 elif instruction == DO_CONNECT:
288 reply = self.do_connect(params)
289 elif instruction == DO_CONFIGURE:
290 reply = self.do_configure(params)
291 elif instruction == DO_CROSS_CONNECT:
292 reply = self.do_cross_connect(params)
293 elif instruction == GET:
294 reply = self.get(params)
295 elif instruction == SET:
296 reply = self.set(params)
297 elif instruction == ACTION:
298 reply = self.action(params)
299 elif instruction == STATUS:
300 reply = self.status(params)
301 elif instruction == GUIDS:
302 reply = self.guids(params)
304 error = "Invalid instruction %s" % instruction
305 self.log_error(error)
306 result = base64.b64encode(error)
307 reply = "%d|%s" % (ERROR, result)
309 error = self.log_error()
310 result = base64.b64encode(error)
311 reply = "%d|%s" % (ERROR, result)
312 log_reply(self, reply)
def guids(self, params):
    """Reply OK with the testbed guids, comma-joined and base64-encoded."""
    joined = ",".join([str(guid) for guid in self._testbed.guids])
    encoded = base64.b64encode(joined)
    return "%d|%s" % (OK, encoded)
def create(self, params):
    """Handle CREATE: params[1] is the guid, params[2] the factory id."""
    guid, factory_id = int(params[1]), params[2]
    self._testbed.create(guid, factory_id)
    reply = "%d|%s" % (OK, "")
    return reply
327 def trace(self, params):
328 guid = int(params[1])
330 trace = self._testbed.trace(guid, trace_id)
331 result = base64.b64encode(trace)
332 return "%d|%s" % (OK, result)
334 def start(self, params):
336 self._testbed.start(time)
337 return "%d|%s" % (OK, "")
339 def stop(self, params):
341 self._testbed.stop(time)
342 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Shut the testbed down and reply with an empty OK; params is unused."""
    testbed = self._testbed
    testbed.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
def configure(self, params):
    """Handle CONFIGURE: fields are name|value|type.

    name and value arrive base64-encoded; the value is converted with
    set_type before being handed to the testbed.
    """
    attr_name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    type_code = int(params[3])
    typed_value = set_type(type_code, raw_value)
    self._testbed.configure(attr_name, typed_value)
    reply = "%d|%s" % (OK, "")
    return reply
def create_set(self, params):
    """Handle CREATE_SET: fields are guid|name|value|type.

    name and value arrive base64-encoded; the value is converted with
    set_type before being handed to the testbed.
    """
    element_guid = int(params[1])
    attr_name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    type_code = int(params[4])
    typed_value = set_type(type_code, raw_value)
    self._testbed.create_set(element_guid, attr_name, typed_value)
    reply = "%d|%s" % (OK, "")
    return reply
def factory_set(self, params):
    """Handle FACTORY_SET and forward the decoded attribute to the testbed.

    The FACTORY_SET message layout is guid|name|value|type, with name and
    value base64-encoded by the sender. Replies with an empty OK message.
    """
    # The original read name/value/type from params[1..3] and ignored the
    # leading guid field of the message, so every field was read one
    # position too early.
    guid = int(params[1])
    name = base64.b64decode(params[2])
    value = base64.b64decode(params[3])
    type_code = int(params[4])
    value = set_type(type_code, value)
    # NOTE(review): assumes the testbed's factory_set takes
    # (guid, name, value), like TestbedIntanceProxy.factory_set - confirm.
    self._testbed.factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def connect(self, params):
    """Handle CONNECT: fields are guid1|connector1|guid2|connector2."""
    first_guid = int(params[1])
    first_connector = params[2]
    second_guid = int(params[3])
    second_connector = params[4]
    self._testbed.connect(
        first_guid, first_connector, second_guid, second_connector)
    reply = "%d|%s" % (OK, "")
    return reply
def cross_connect(self, params):
    """Handle CROSS_CONNECT and forward the connection to the testbed.

    Fields follow the CROSS_CONNECT message layout:
    guid|connector_type_name|cross_guid|cross_testbed_id|
    cross_factory_id|cross_connector_type_name.
    Replies with an empty OK message.
    """
    # The original assigned connector_type_name and cross_guid twice and
    # indexed up to params[8], although the message has only six fields;
    # the indexes below follow the message layout instead.
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    # NOTE(review): the message formats this field with %d, so it is
    # converted back to int here - confirm testbed ids are numeric.
    cross_testbed_id = int(params[4])
    cross_factory_id = params[5]
    cross_connector_type_name = params[6]
    self._testbed.cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
    return "%d|%s" % (OK, "")
395 def add_trace(self, params):
396 guid = int(params[1])
398 self._testbed.add_trace(guid, trace_id)
399 return "%d|%s" % (OK, "")
401 def add_address(self, params):
402 guid = int(params[1])
403 family = int(params[2])
405 netprefix = int(params[4])
406 broadcast = params[5]
407 self._testbed.add_address(guid, family, address, netprefix,
409 return "%d|%s" % (OK, "")
411 def add_route(self, params):
412 guid = int(params[1])
413 destination = params[2]
414 netprefix = int(params[3])
416 self._testbed.add_route(guid, destination, netprefix, nexthop)
417 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Run the testbed's do_setup step and reply OK; params is unused."""
    testbed = self._testbed
    testbed.do_setup()
    reply = "%d|%s" % (OK, "")
    return reply
def do_create(self, params):
    """Run the testbed's do_create step and reply OK; params is unused."""
    testbed = self._testbed
    testbed.do_create()
    reply = "%d|%s" % (OK, "")
    return reply
def do_connect(self, params):
    """Run the testbed's do_connect step and reply OK; params is unused."""
    testbed = self._testbed
    testbed.do_connect()
    reply = "%d|%s" % (OK, "")
    return reply
def do_configure(self, params):
    """Run the testbed's do_configure step and reply OK; params is unused."""
    testbed = self._testbed
    testbed.do_configure()
    reply = "%d|%s" % (OK, "")
    return reply
def do_cross_connect(self, params):
    """Run the testbed's do_cross_connect step and reply OK; params is unused."""
    testbed = self._testbed
    testbed.do_cross_connect()
    reply = "%d|%s" % (OK, "")
    return reply
def get(self, params):
    """Handle GET: reply OK with the base64-encoded string form of a value.

    Fields follow the GET message layout time|guid|name, where name is
    base64-encoded by the sender.
    """
    # NOTE(review): the statement binding `time` is not visible in the
    # excerpt under review; it is assumed to be the raw params[1] field,
    # the only field of the message not read elsewhere - confirm.
    time = params[1]
    # The original read `param[2]`, an undefined name, so every GET
    # failed with NameError.
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = self._testbed.get(time, guid, name)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def set(self, params):
    """Handle SET and forward the decoded attribute value to the testbed.

    Fields follow the SET message layout time|guid|name|value|type, where
    name and value are base64-encoded by the sender. Replies with an
    empty OK message.
    """
    # NOTE(review): the statement binding `time` is not visible in the
    # excerpt under review; it is assumed to be the raw params[1] field,
    # the only field of the message not read elsewhere - confirm.
    time = params[1]
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = base64.b64decode(params[4])
    # The type code is the fifth field; the original read params[3],
    # which is the base64-encoded name.
    type_code = int(params[5])
    value = set_type(type_code, value)
    self._testbed.set(time, guid, name, value)
    return "%d|%s" % (OK, "")
457 def action(self, params):
459 guid = int(params[2])
460 command = base64.b64decode(params[3])
461 self._testbed.action(time, guid, command)
462 return "%d|%s" % (OK, "")
def status(self, params):
    """Handle STATUS: reply OK with the base64-encoded status of params[1]."""
    element_guid = int(params[1])
    current = self._testbed.status(element_guid)
    encoded = base64.b64encode(str(current))
    return "%d|%s" % (OK, encoded)
470 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, experiment_xml):
    """Initialise the server on root_dir and remember the experiment XML."""
    super(ExperimentControllerServer, self).__init__(root_dir)
    # The controller itself is only instantiated in post_daemonize.
    self._controller = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Instantiate the ExperimentController from the stored experiment XML."""
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    self._controller = ExperimentController(xml)
480 def reply_action(self, msg):
481 params = msg.split("|")
482 instruction = int(params[0])
483 log_msg(self, params)
485 if instruction == XML:
486 reply = self.experiment_xml(params)
487 elif instruction == ACCESS:
488 reply = self.set_access_configuration(params)
489 elif instruction == TRACE:
490 reply = self.trace(params)
491 elif instruction == FINISHED:
492 reply = self.is_finished(params)
493 elif instruction == START:
494 reply = self.start(params)
495 elif instruction == STOP:
496 reply = self.stop(params)
497 elif instruction == SHUTDOWN:
498 reply = self.shutdown(params)
500 error = "Invalid instruction %s" % instruction
501 self.log_error(error)
502 result = base64.b64encode(error)
503 reply = "%d|%s" % (ERROR, result)
505 error = self.log_error()
506 result = base64.b64encode(error)
507 reply = "%d|%s" % (ERROR, result)
508 log_reply(self, reply)
def experiment_xml(self, params):
    """Reply OK with the controller's experiment XML, base64-encoded."""
    document = self._controller.experiment_xml
    encoded = base64.b64encode(document)
    return "%d|%s" % (OK, encoded)
516 def set_access_configuration(self, params):
517 testbed_guid = int(params[1])
519 communication = params[3]
522 port = int(params[6])
524 use_agent = params[8] == "True"
525 access_config = AccessConfiguration()
526 access_config.set_attribute_value("mode", mode)
527 access_config.set_attribute_value("communication", communication)
528 access_config.set_attribute_value("host", host)
529 access_config.set_attribute_value("user", user)
530 access_config.set_attribute_value("port", port)
531 access_config.set_attribute_value("rootDirectory", root_dir)
532 access_config.set_attribute_value("useAgent", use_agent)
533 self._controller.set_access_configuration(testbed_guid,
535 return "%d|%s" % (OK, "")
537 def trace(self, params):
538 testbed_guid = int(params[1])
539 guid = int(params[2])
541 trace = self._controller.trace(testbed_guid, guid, trace_id)
542 result = base64.b64encode(trace)
543 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Reply OK with the base64-encoded str() of is_finished(params[1])."""
    element_guid = int(params[1])
    finished = self._controller.is_finished(element_guid)
    encoded = base64.b64encode(str(finished))
    return "%d|%s" % (OK, encoded)
def start(self, params):
    """Start the controller and reply with an empty OK; params is unused."""
    controller = self._controller
    controller.start()
    reply = "%d|%s" % (OK, "")
    return reply
def stop(self, params):
    """Stop the controller and reply with an empty OK; params is unused."""
    controller = self._controller
    controller.stop()
    reply = "%d|%s" % (OK, "")
    return reply
def shutdown(self, params):
    """Shut the controller down and reply with an empty OK; params is unused."""
    controller = self._controller
    controller.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
563 class TestbedIntanceProxy(object):
564 def __init__(self, root_dir, log_level, testbed_id = None,
565 testbed_version = None, launch = True):
567 if testbed_id == None or testbed_version == None:
568 raise RuntimeError("To launch a TesbedInstance server a \
569 testbed_id and testbed_version are required")
571 s = TestbedInstanceServer(root_dir, testbed_id, testbed_version)
572 if log_level == AccessConfiguration.DEBUG_LEVEL:
573 s.set_debug_log_level()
576 self._client = server.Client(root_dir)
580 msg = testbed_messages[GUIDS]
581 self._client.send_msg(msg)
582 reply = self._client.read_reply()
583 result = reply.split("|")
584 code = int(result[0])
585 text = base64.b64decode(result[1])
587 raise RuntimeError(text)
588 return map(int, text.split(","))
590 def configure(self, name, value):
591 msg = testbed_messages[CONFIGURE]
592 type = get_type(value)
593 # avoid having "|" in this parameters
594 name = base64.b64encode(name)
595 value = base64.b64encode(str(value))
596 msg = msg % (name, value, type)
597 self._client.send_msg(msg)
598 reply = self._client.read_reply()
599 result = reply.split("|")
600 code = int(result[0])
601 text = base64.b64decode(result[1])
603 raise RuntimeError(text)
605 def create(self, guid, factory_id):
606 msg = testbed_messages[CREATE]
607 msg = msg % (guid, factory_id)
608 self._client.send_msg(msg)
609 reply = self._client.read_reply()
610 result = reply.split("|")
611 code = int(result[0])
612 text = base64.b64decode(result[1])
614 raise RuntimeError(text)
616 def create_set(self, guid, name, value):
617 msg = testbed_messages[CREATE_SET]
618 type = get_type(value)
619 # avoid having "|" in this parameters
620 name = base64.b64encode(name)
621 value = base64.b64encode(str(value))
622 msg = msg % (guid, name, value, type)
623 self._client.send_msg(msg)
624 reply = self._client.read_reply()
625 result = reply.split("|")
626 code = int(result[0])
627 text = base64.b64decode(result[1])
629 raise RuntimeError(text)
631 def factory_set(self, guid, name, value):
632 msg = testbed_messages[FACTORY_SET]
633 type = get_type(value)
634 # avoid having "|" in this parameters
635 name = base64.b64encode(name)
636 value = base64.b64encode(str(value))
637 msg = msg % (guid, name, value, type)
638 self._client.send_msg(msg)
639 reply = self._client.read_reply()
640 result = reply.split("|")
641 code = int(result[0])
642 text = base64.b64decode(result[1])
644 raise RuntimeError(text)
646 def connect(self, guid1, connector_type_name1, guid2,
647 connector_type_name2):
648 msg = testbed_messages[CONNECT]
649 msg = msg % (guid1, connector_type_name1, guid2,
650 connector_type_name2)
651 self._client.send_msg(msg)
652 reply = self._client.read_reply()
653 result = reply.split("|")
654 code = int(result[0])
655 text = base64.b64decode(result[1])
657 raise RuntimeError(text)
659 def cross_connect(self, guid, connector_type_name, cross_guid,
660 cross_testbed_id, cross_factory_id, cross_connector_type_name):
661 msg = testbed_messages[CROSS_CONNECT]
662 msg = msg % (guid, connector_type_name, cross_guid,
663 cross_testbed_id, cross_factory_id, cross_connector_type_name)
664 self._client.send_msg(msg)
665 reply = self._client.read_reply()
666 result = reply.split("|")
667 code = int(result[0])
668 text = base64.b64decode(result[1])
670 raise RuntimeError(text)
672 def add_trace(self, guid, trace_id):
673 msg = testbed_messages[ADD_TRACE]
674 msg = msg % (guid, trace_id)
675 self._client.send_msg(msg)
676 reply = self._client.read_reply()
677 result = reply.split("|")
678 code = int(result[0])
679 text = base64.b64decode(result[1])
681 raise RuntimeError(text)
683 def add_address(self, guid, family, address, netprefix, broadcast):
684 msg = testbed_messages[ADD_ADDRESS]
685 msg = msg % (guid, family, address, netprefix, broadcast)
686 self._client.send_msg(msg)
687 reply = self._client.read_reply()
688 result = reply.split("|")
689 code = int(result[0])
690 text = base64.b64decode(result[1])
692 raise RuntimeError(text)
694 def add_route(self, guid, destination, netprefix, nexthop):
695 msg = testbed_messages[ADD_ROUTE]
696 msg = msg % (guid, destination, netprefix, nexthop)
697 self._client.send_msg(msg)
698 reply = self._client.read_reply()
699 result = reply.split("|")
700 code = int(result[0])
701 text = base64.b64decode(result[1])
703 raise RuntimeError(text)
706 msg = testbed_messages[DO_SETUP]
707 self._client.send_msg(msg)
708 reply = self._client.read_reply()
709 result = reply.split("|")
710 code = int(result[0])
711 text = base64.b64decode(result[1])
713 raise RuntimeError(text)
716 msg = testbed_messages[DO_CREATE]
717 self._client.send_msg(msg)
718 reply = self._client.read_reply()
719 result = reply.split("|")
720 code = int(result[0])
721 text = base64.b64decode(result[1])
723 raise RuntimeError(text)
725 def do_connect(self):
726 msg = testbed_messages[DO_CONNECT]
727 self._client.send_msg(msg)
728 reply = self._client.read_reply()
729 result = reply.split("|")
730 code = int(result[0])
731 text = base64.b64decode(result[1])
733 raise RuntimeError(text)
735 def do_configure(self):
736 msg = testbed_messages[DO_CONFIGURE]
737 self._client.send_msg(msg)
738 reply = self._client.read_reply()
739 result = reply.split("|")
740 code = int(result[0])
741 text = base64.b64decode(result[1])
743 raise RuntimeError(text)
745 def do_cross_connect(self):
746 msg = testbed_messages[DO_CROSS_CONNECT]
747 self._client.send_msg(msg)
748 reply = self._client.read_reply()
749 result = reply.split("|")
750 code = int(result[0])
751 text = base64.b64decode(result[1])
753 raise RuntimeError(text)
755 def start(self, time = TIME_NOW):
756 msg = testbed_messages[START]
758 self._client.send_msg(msg)
759 reply = self._client.read_reply()
760 result = reply.split("|")
761 code = int(result[0])
762 text = base64.b64decode(result[1])
764 raise RuntimeError(text)
766 def stop(self, time = TIME_NOW):
767 msg = testbed_messages[STOP]
769 self._client.send_msg(msg)
770 reply = self._client.read_reply()
771 result = reply.split("|")
772 code = int(result[0])
773 text = base64.b64decode(result[1])
775 raise RuntimeError(text)
777 def set(self, time, guid, name, value):
778 msg = testbed_messages[SET]
779 type = get_type(value)
780 # avoid having "|" in this parameters
781 name = base64.b64encode(name)
782 value = base64.b64encode(str(value))
783 msg = msg % (time, guid, name, value, type)
784 self._client.send_msg(msg)
785 reply = self._client.read_reply()
786 result = reply.split("|")
787 code = int(result[0])
788 text = base64.b64decode(result[1])
790 raise RuntimeError(text)
792 def get(self, time, guid, name):
793 msg = testbed_messages[GET]
794 # avoid having "|" in this parameters
795 name = base64.b64encode(name)
796 msg = msg % (time, guid, name)
797 self._client.send_msg(msg)
798 reply = self._client.read_reply()
799 result = reply.split("|")
800 code = int(result[0])
801 text = base64.b64decode(result[1])
803 raise RuntimeError(text)
def action(self, time, guid, action):
    """Send an ACTION message (time|guid|action) and check the reply.

    Raises RuntimeError carrying the decoded reply text when the reply
    code is ERROR.
    """
    msg = testbed_messages[ACTION]
    # TestbedInstanceServer.action base64-decodes this field, so it has
    # to be encoded here; the original sent it raw. Encoding also keeps
    # "|" out of the parameter.
    action = base64.b64encode(action)
    msg = msg % (time, guid, action)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # NOTE(review): the guard on the reply code is not visible in the
    # excerpt under review; it is assumed to be a check against ERROR,
    # the code the servers use for their error replies - confirm.
    if code == ERROR:
        raise RuntimeError(text)
817 def status(self, guid):
818 msg = testbed_messages[STATUS]
820 self._client.send_msg(msg)
821 reply = self._client.read_reply()
822 result = reply.split("|")
823 code = int(result[0])
824 text = base64.b64decode(result[1])
826 raise RuntimeError(text)
829 def trace(self, guid, trace_id):
830 msg = testbed_messages[TRACE]
831 msg = msg % (guid, trace_id)
832 self._client.send_msg(msg)
833 reply = self._client.read_reply()
834 result = reply.split("|")
835 code = int(result[0])
836 text = base64.b64decode(result[1])
838 raise RuntimeError(text)
842 msg = testbed_messages[SHUTDOWN]
843 self._client.send_msg(msg)
844 reply = self._client.read_reply()
845 result = reply.split("|")
846 code = int(result[0])
847 text = base64.b64decode(result[1])
849 raise RuntimeError(text)
850 self._client.send_stop()
852 class ExperimentControllerProxy(object):
853 def __init__(self, root_dir, log_level, experiment_xml = None, launch = True):
855 if experiment_xml == None:
856 raise RuntimeError("To launch a ExperimentControllerServer a \
857 xml description of the experiment is required")
859 s = ExperimentControllerServer(root_dir, experiment_xml)
860 if log_level == AccessConfiguration.DEBUG_LEVEL:
861 s.set_debug_log_level()
864 self._client = server.Client(root_dir)
867 def experiment_xml(self):
868 msg = controller_messages[XML]
869 self._client.send_msg(msg)
870 reply = self._client.read_reply()
871 result = reply.split("|")
872 code = int(result[0])
873 text = base64.b64decode(result[1])
875 raise RuntimeError(text)
878 def set_access_configuration(self, testbed_guid, access_config):
879 mode = access_config.get_attribute_value("mode")
880 communication = access_config.get_attribute_value("communication")
881 host = access_config.get_attribute_value("host")
882 user = access_config.get_attribute_value("user")
883 port = access_config.get_attribute_value("port")
884 root_dir = access_config.get_attribute_value("rootDirectory")
885 use_agent = access_config.get_attribute_value("useAgent")
886 msg = controller_messages[ACCESS]
887 msg = msg % (testbed_guid, mode, communication, host, user, port,
889 self._client.send_msg(msg)
890 reply = self._client.read_reply()
891 result = reply.split("|")
892 code = int(result[0])
893 text = base64.b64decode(result[1])
895 raise RuntimeError(text)
897 def trace(self, testbed_guid, guid, trace_id):
898 msg = controller_messages[TRACE]
899 msg = msg % (testbed_guid, guid, trace_id)
900 self._client.send_msg(msg)
901 reply = self._client.read_reply()
902 result = reply.split("|")
903 code = int(result[0])
904 text = base64.b64decode(result[1])
907 raise RuntimeError(text)
910 msg = controller_messages[START]
911 self._client.send_msg(msg)
912 reply = self._client.read_reply()
913 result = reply.split("|")
914 code = int(result[0])
915 text = base64.b64decode(result[1])
917 raise RuntimeError(text)
920 msg = controller_messages[STOP]
921 self._client.send_msg(msg)
922 reply = self._client.read_reply()
923 result = reply.split("|")
924 code = int(result[0])
925 text = base64.b64decode(result[1])
927 raise RuntimeError(text)
929 def is_finished(self, guid):
930 msg = controller_messages[FINISHED]
932 self._client.send_msg(msg)
933 reply = self._client.read_reply()
934 result = reply.split("|")
935 code = int(result[0])
936 text = base64.b64decode(result[1])
938 raise RuntimeError(text)
939 return text == "True"
942 msg = controller_messages[SHUTDOWN]
943 self._client.send_msg(msg)
944 reply = self._client.read_reply()
945 result = reply.split("|")
946 code = int(result[0])
947 text = base64.b64decode(result[1])
949 raise RuntimeError(text)
950 self._client.send_stop()