2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
18 # PROTOCOL INSTRUCTION MESSAGES
55 # EXPERIMENT CONTROLLER PROTOCOL MESSAGES
56 controller_messages = dict({
58 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
59 TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
60 FINISHED: "%d|%s" % (FINISHED, "%d"),
63 RECOVER : "%d" % RECOVER,
64 SHUTDOWN: "%d" % SHUTDOWN,
67 # TESTBED INSTANCE PROTOCOL MESSAGES
68 testbed_messages = dict({
69 TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
72 SHUTDOWN: "%d" % SHUTDOWN,
73 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
74 CREATE: "%d|%s" % (CREATE, "%d|%s"),
75 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
76 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
77 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
78 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
79 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
80 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
81 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
82 DO_SETUP: "%d" % DO_SETUP,
83 DO_CREATE: "%d" % DO_CREATE,
84 DO_CONNECT: "%d" % DO_CONNECT,
85 DO_CONFIGURE: "%d" % DO_CONFIGURE,
86 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
87 GET: "%d|%s" % (GET, "%s|%d|%s"),
88 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
89 GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
90 GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
91 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
92 STATUS: "%d|%s" % (STATUS, "%d"),
96 instruction_text = dict({
102 FINISHED: "FINISHED",
106 SHUTDOWN: "SHUTDOWN",
107 CONFIGURE: "CONFIGURE",
109 CREATE_SET: "CREATE_SET",
110 FACTORY_SET: "FACTORY_SET",
112 CROSS_CONNECT: "CROSS_CONNECT",
113 ADD_TRACE: "ADD_TRACE",
114 ADD_ADDRESS: "ADD_ADDRESS",
115 ADD_ROUTE: "ADD_ROUTE",
116 DO_SETUP: "DO_SETUP",
117 DO_CREATE: "DO_CREATE",
118 DO_CONNECT: "DO_CONNECT",
119 DO_CONFIGURE: "DO_CONFIGURE",
120 DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
123 GET_ROUTE: "GET_ROUTE",
124 GET_ADDRESS: "GET_ADDRESS",
135 if isinstance(value, bool):
137 elif isinstance(value, int):
139 elif isinstance(value, float):
144 def set_type(type, value):
150 value = value == "True"
def log_msg(server, params):
    """Write a debug log entry describing a received protocol message.

    params is the already split message: params[0] holds the numeric
    instruction code (looked up in instruction_text) and the remaining
    items are logged as a comma separated list.
    """
    code = int(params[0])
    name = instruction_text[code]
    owner = server.__class__.__name__
    arguments = ", ".join([str(item) for item in params[1:]])
    server.log_debug("%s - msg: %s [%s]" % (owner, name, arguments))
161 def log_reply(server, reply):
162 res = reply.split("|")
164 code_txt = instruction_text[code]
165 txt = base64.b64decode(res[1])
166 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
169 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
172 proc = server.popen_ssh_subprocess(python_code, host = host,
173 port = port, user = user, agent = agent)
175 err = proc.stderr.read()
176 raise RuntimeError("Client could not be executed: %s" % \
179 return server.Client(root_dir, host = host, port = port, user = user,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into a server log level.

    Returns server.DEBUG_LEVEL when log_level equals
    AccessConfiguration.DEBUG_LEVEL and server.ERROR_LEVEL for any other
    value.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the connection settings held by an access configuration.

    Returns the tuple (root_dir, log_level, user, host, port, agent), where
    log_level has already been converted with to_server_log_level.  user,
    host, port and agent stay None unless the "communication" attribute
    equals AccessConfiguration.ACCESS_SSH.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") == AccessConfiguration.ACCESS_SSH:
        user = read("user")
        host = read("host")
        port = read("port")
        agent = read("useAgent")
    else:
        # Only SSH access fills in the remote connection settings.
        user = host = port = agent = None
    return (root_dir, log_level, user, host, port, agent)
200 class AccessConfiguration(AttributesMap):
201 MODE_SINGLE_PROCESS = "SINGLE"
202 MODE_DAEMON = "DAEMON"
204 ACCESS_LOCAL = "LOCAL"
205 ERROR_LEVEL = "Error"
206 DEBUG_LEVEL = "Debug"
209 super(AccessConfiguration, self).__init__()
210 self.add_attribute(name = "mode",
211 help = "Instance execution mode",
212 type = Attribute.ENUM,
213 value = AccessConfiguration.MODE_SINGLE_PROCESS,
214 allowed = [AccessConfiguration.MODE_DAEMON,
215 AccessConfiguration.MODE_SINGLE_PROCESS],
216 validation_function = validation.is_enum)
217 self.add_attribute(name = "communication",
218 help = "Instance communication mode",
219 type = Attribute.ENUM,
220 value = AccessConfiguration.ACCESS_LOCAL,
221 allowed = [AccessConfiguration.ACCESS_LOCAL,
222 AccessConfiguration.ACCESS_SSH],
223 validation_function = validation.is_enum)
224 self.add_attribute(name = "host",
225 help = "Host where the testbed will be executed",
226 type = Attribute.STRING,
228 validation_function = validation.is_string)
229 self.add_attribute(name = "user",
230 help = "User on the Host to execute the testbed",
231 type = Attribute.STRING,
232 value = getpass.getuser(),
233 validation_function = validation.is_string)
234 self.add_attribute(name = "port",
235 help = "Port on the Host",
236 type = Attribute.INTEGER,
238 validation_function = validation.is_integer)
239 self.add_attribute(name = "rootDirectory",
240 help = "Root directory for storing process files",
241 type = Attribute.STRING,
243 validation_function = validation.is_string) # TODO: validation.is_path
244 self.add_attribute(name = "useAgent",
245 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
246 type = Attribute.BOOL,
248 validation_function = validation.is_bool)
249 self.add_attribute(name = "logLevel",
250 help = "Log level for instance",
251 type = Attribute.ENUM,
252 value = AccessConfiguration.ERROR_LEVEL,
253 allowed = [AccessConfiguration.ERROR_LEVEL,
254 AccessConfiguration.DEBUG_LEVEL],
255 validation_function = validation.is_enum)
256 self.add_attribute(name = "recover",
257 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
258 type = Attribute.BOOL,
260 validation_function = validation.is_bool)
262 class TempDir(object):
264 self.path = tempfile.mkdtemp()
267 shutil.rmtree(self.path)
269 class PermDir(object):
270 def __init__(self, path):
273 def create_controller(xml, access_config = None):
274 mode = None if not access_config \
275 else access_config.get_attribute_value("mode")
276 launch = True if not access_config \
277 else not access_config.get_attribute_value("recover")
278 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
280 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
282 from nepi.core.execute import ExperimentController
284 if not access_config or not access_config.has_attribute("rootDirectory"):
287 root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
288 controller = ExperimentController(xml, root_dir.path)
290 # inject reference to temporary dir, so that it gets cleaned
291 # up at destruction time.
292 controller._tempdir = root_dir
295 elif mode == AccessConfiguration.MODE_DAEMON:
296 (root_dir, log_level, user, host, port, agent) = \
297 get_access_config_params(access_config)
298 return ExperimentControllerProxy(root_dir, log_level,
299 experiment_xml = xml, host = host, port = port, user = user,
300 agent = agent, launch = launch)
301 raise RuntimeError("Unsupported access configuration '%s'" % mode)
303 def create_testbed_instance(testbed_id, testbed_version, access_config):
304 mode = None if not access_config \
305 else access_config.get_attribute_value("mode")
306 launch = True if not access_config \
307 else not access_config.get_attribute_value("recover")
308 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
310 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
311 return _build_testbed_instance(testbed_id, testbed_version)
312 elif mode == AccessConfiguration.MODE_DAEMON:
313 (root_dir, log_level, user, host, port, agent) = \
314 get_access_config_params(access_config)
315 return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id,
316 testbed_version = testbed_version, host = host, port = port,
317 user = user, agent = agent, launch = launch)
318 raise RuntimeError("Unsupported access configuration '%s'" % mode)
320 def _build_testbed_instance(testbed_id, testbed_version):
321 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
322 if not mod_name in sys.modules:
324 module = sys.modules[mod_name]
325 return module.TestbedInstance(testbed_version)
327 class TestbedInstanceServer(server.Server):
328 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
329 super(TestbedInstanceServer, self).__init__(root_dir, log_level)
330 self._testbed_id = testbed_id
331 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed instance handled by this server.

    The instance is created by _build_testbed_instance from the testbed id
    and version stored on the server, and kept in self._testbed.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_instance(testbed_id, testbed_version)
338 def reply_action(self, msg):
340 result = base64.b64encode("Invalid command line")
341 reply = "%d|%s" % (ERROR, result)
343 params = msg.split("|")
344 instruction = int(params[0])
345 log_msg(self, params)
347 if instruction == TRACE:
348 reply = self.trace(params)
349 elif instruction == START:
350 reply = self.start(params)
351 elif instruction == STOP:
352 reply = self.stop(params)
353 elif instruction == SHUTDOWN:
354 reply = self.shutdown(params)
355 elif instruction == CONFIGURE:
356 reply = self.defer_configure(params)
357 elif instruction == CREATE:
358 reply = self.defer_create(params)
359 elif instruction == CREATE_SET:
360 reply = self.defer_create_set(params)
361 elif instruction == FACTORY_SET:
362 reply = self.defer_factory_set(params)
363 elif instruction == CONNECT:
364 reply = self.defer_connect(params)
365 elif instruction == CROSS_CONNECT:
366 reply = self.defer_cross_connect(params)
367 elif instruction == ADD_TRACE:
368 reply = self.defer_add_trace(params)
369 elif instruction == ADD_ADDRESS:
370 reply = self.defer_add_address(params)
371 elif instruction == ADD_ROUTE:
372 reply = self.defer_add_route(params)
373 elif instruction == DO_SETUP:
374 reply = self.do_setup(params)
375 elif instruction == DO_CREATE:
376 reply = self.do_create(params)
377 elif instruction == DO_CONNECT:
378 reply = self.do_connect(params)
379 elif instruction == DO_CONFIGURE:
380 reply = self.do_configure(params)
381 elif instruction == DO_CROSS_CONNECT:
382 reply = self.do_cross_connect(params)
383 elif instruction == GET:
384 reply = self.get(params)
385 elif instruction == SET:
386 reply = self.set(params)
387 elif instruction == GET_ADDRESS:
388 reply = self.get_address(params)
389 elif instruction == GET_ROUTE:
390 reply = self.get_route(params)
391 elif instruction == ACTION:
392 reply = self.action(params)
393 elif instruction == STATUS:
394 reply = self.status(params)
395 elif instruction == GUIDS:
396 reply = self.guids(params)
398 error = "Invalid instruction %s" % instruction
399 self.log_error(error)
400 result = base64.b64encode(error)
401 reply = "%d|%s" % (ERROR, result)
403 error = self.log_error()
404 result = base64.b64encode(error)
405 reply = "%d|%s" % (ERROR, result)
406 log_reply(self, reply)
def guids(self, params):
    """Reply with the guids of the testbed as a comma separated list.

    The list is base64 encoded and sent after the OK code.  params is not
    used.
    """
    joined = ",".join([str(guid) for guid in self._testbed.guids])
    return "%d|%s" % (OK, base64.b64encode(joined))
def defer_create(self, params):
    """Handle CREATE: forward a deferred element creation to the testbed.

    params[1] is the element guid (converted to int) and params[2] the
    factory id.  Replies with OK and an empty payload.
    """
    element_guid = int(params[1])
    self._testbed.defer_create(element_guid, params[2])
    reply = "%d|%s" % (OK, "")
    return reply
421 def trace(self, params):
422 guid = int(params[1])
424 attribute = base64.b64decode(params[3])
425 trace = self._testbed.trace(guid, trace_id, attribute)
426 result = base64.b64encode(trace)
427 return "%d|%s" % (OK, result)
def start(self, params):
    """Handle START: start the testbed and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._testbed.start()
    reply = "%d|%s" % (OK, "")
    return reply
433 def stop(self, params):
435 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Handle SHUTDOWN: shut the testbed down and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._testbed.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
def defer_configure(self, params):
    """Handle CONFIGURE: forward a deferred configuration value.

    params[1] and params[2] hold the base64 encoded name and value, and
    params[3] the numeric type code given to set_type to rebuild the value
    from its decoded text.  Replies with OK and an empty payload.
    """
    name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    typed_value = set_type(int(params[3]), raw_value)
    self._testbed.defer_configure(name, typed_value)
    return "%d|%s" % (OK, "")
def defer_create_set(self, params):
    """Handle CREATE_SET: forward a deferred creation-time attribute value.

    params[1] is the element guid, params[2] and params[3] the base64
    encoded name and value, and params[4] the numeric type code given to
    set_type.  Replies with OK and an empty payload.
    """
    element_guid = int(params[1])
    name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    typed_value = set_type(int(params[4]), raw_value)
    self._testbed.defer_create_set(element_guid, name, typed_value)
    return "%d|%s" % (OK, "")
def defer_factory_set(self, params):
    """Handle FACTORY_SET: forward a deferred factory attribute value.

    The FACTORY_SET message is laid out as "guid|name|value|type", the same
    layout as CREATE_SET: params[1] is the guid, params[2] and params[3]
    the base64 encoded name and value, and params[4] the numeric type code
    given to set_type.  Replies with OK and an empty payload.

    The previous implementation started reading the name at params[1],
    which is the guid field, so every field was shifted by one and the guid
    never reached the testbed.
    """
    guid = int(params[1])
    name = base64.b64decode(params[2])
    value = base64.b64decode(params[3])
    value_type = int(params[4])
    value = set_type(value_type, value)
    self._testbed.defer_factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_connect(self, params):
    """Handle CONNECT: forward a deferred connection between two elements.

    params[1] and params[3] are the guids of both ends (converted to int);
    params[2] and params[4] are their connector type names.  Replies with
    OK and an empty payload.
    """
    first_guid = int(params[1])
    first_connector = params[2]
    second_guid = int(params[3])
    second_connector = params[4]
    self._testbed.defer_connect(
        first_guid, first_connector, second_guid, second_connector)
    reply = "%d|%s" % (OK, "")
    return reply
def defer_cross_connect(self, params):
    """Handle CROSS_CONNECT: forward a deferred cross-testbed connection.

    The CROSS_CONNECT message carries six fields after the instruction
    code, in this order: guid, connector type name, cross guid, cross
    testbed id, cross factory id and cross connector type name.  Replies
    with OK and an empty payload.

    The previous implementation assigned connector_type_name and cross_guid
    twice from different positions and read params[7] and params[8], which
    do not exist in a six field message.
    """
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    # NOTE(review): the message template formats this field with %d, but it
    # was always handed to the testbed as text here; confirm whether the
    # testbed expects an int before converting it.
    cross_testbed_id = params[4]
    cross_factory_id = params[5]
    cross_connector_type_name = params[6]
    self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
    return "%d|%s" % (OK, "")
488 def defer_add_trace(self, params):
489 guid = int(params[1])
491 self._testbed.defer_add_trace(guid, trace_id)
492 return "%d|%s" % (OK, "")
494 def defer_add_address(self, params):
495 guid = int(params[1])
497 netprefix = int(params[3])
498 broadcast = params[4]
499 self._testbed.defer_add_address(guid, address, netprefix,
501 return "%d|%s" % (OK, "")
503 def defer_add_route(self, params):
504 guid = int(params[1])
505 destination = params[2]
506 netprefix = int(params[3])
508 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
509 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Handle DO_SETUP: run the testbed setup step and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._testbed.do_setup()
    reply = "%d|%s" % (OK, "")
    return reply
def do_create(self, params):
    """Handle DO_CREATE: run the testbed create step and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._testbed.do_create()
    reply = "%d|%s" % (OK, "")
    return reply
def do_connect(self, params):
    """Handle DO_CONNECT: run the testbed connect step and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._testbed.do_connect()
    reply = "%d|%s" % (OK, "")
    return reply
def do_configure(self, params):
    """Handle DO_CONFIGURE: run the testbed configure step and reply OK.

    params is not used; the reply carries an empty payload.
    """
    self._testbed.do_configure()
    reply = "%d|%s" % (OK, "")
    return reply
def do_cross_connect(self, params):
    """Handle DO_CROSS_CONNECT: run the cross connect step and reply OK.

    params is not used; the reply carries an empty payload.
    """
    self._testbed.do_cross_connect()
    reply = "%d|%s" % (OK, "")
    return reply
531 def get(self, params):
533 guid = int(param[2] )
534 name = base64.b64decode(params[3])
535 value = self._testbed.get(time, guid, name)
536 result = base64.b64encode(str(value))
537 return "%d|%s" % (OK, result)
539 def set(self, params):
541 guid = int(params[2])
542 name = base64.b64decode(params[3])
543 value = base64.b64decode(params[4])
544 type = int(params[3])
545 value = set_type(type, value)
546 self._testbed.set(time, guid, name, value)
547 return "%d|%s" % (OK, "")
549 def get_address(self, params):
551 index = int(param[2])
552 attribute = base64.b64decode(param[3])
553 value = self._testbed.get_address(guid, index, attribute)
554 result = base64.b64encode(str(value))
555 return "%d|%s" % (OK, result)
557 def get_route(self, params):
559 index = int(param[2])
560 attribute = base64.b64decode(param[3])
561 value = self._testbed.get_route(guid, index, attribute)
562 result = base64.b64encode(str(value))
563 return "%d|%s" % (OK, result)
565 def action(self, params):
567 guid = int(params[2])
568 command = base64.b64decode(params[3])
569 self._testbed.action(time, guid, command)
570 return "%d|%s" % (OK, "")
def status(self, params):
    """Handle STATUS: reply with the status of one element.

    params[1] is the element guid (converted to int).  The status returned
    by the testbed is converted with str(), base64 encoded and sent after
    the OK code.
    """
    element_guid = int(params[1])
    encoded = base64.b64encode(str(self._testbed.status(element_guid)))
    return "%d|%s" % (OK, encoded)
578 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, log_level, experiment_xml):
    """Store the experiment description; the controller starts as None.

    root_dir and log_level are passed on to the base class constructor.
    """
    super(ExperimentControllerServer, self).__init__(root_dir, log_level)
    # The controller itself is only assigned in post_daemonize.
    self._controller = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Create the ExperimentController handled by this server.

    The controller is built from the stored experiment XML and the server
    root directory, and kept in self._controller.  ExperimentController is
    imported inside the method.
    """
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    self._controller = ExperimentController(xml, root_dir = self._root_dir)
589 def reply_action(self, msg):
591 result = base64.b64encode("Invalid command line")
592 reply = "%d|%s" % (ERROR, result)
594 params = msg.split("|")
595 instruction = int(params[0])
596 log_msg(self, params)
598 if instruction == XML:
599 reply = self.experiment_xml(params)
600 elif instruction == ACCESS:
601 reply = self.set_access_configuration(params)
602 elif instruction == TRACE:
603 reply = self.trace(params)
604 elif instruction == FINISHED:
605 reply = self.is_finished(params)
606 elif instruction == START:
607 reply = self.start(params)
608 elif instruction == STOP:
609 reply = self.stop(params)
610 elif instruction == RECOVER:
611 reply = self.recover(params)
612 elif instruction == SHUTDOWN:
613 reply = self.shutdown(params)
615 error = "Invalid instruction %s" % instruction
616 self.log_error(error)
617 result = base64.b64encode(error)
618 reply = "%d|%s" % (ERROR, result)
620 error = self.log_error()
621 result = base64.b64encode(error)
622 reply = "%d|%s" % (ERROR, result)
623 log_reply(self, reply)
def experiment_xml(self, params):
    """Handle XML: reply with the experiment XML held by the controller.

    The XML is base64 encoded and sent after the OK code.  params is not
    used.
    """
    encoded_xml = base64.b64encode(self._controller.experiment_xml)
    return "%d|%s" % (OK, encoded_xml)
631 def set_access_configuration(self, params):
632 testbed_guid = int(params[1])
634 communication = params[3]
637 port = int(params[6])
639 use_agent = params[8] == "True"
640 log_level = params[9]
641 access_config = AccessConfiguration()
642 access_config.set_attribute_value("mode", mode)
643 access_config.set_attribute_value("communication", communication)
644 access_config.set_attribute_value("host", host)
645 access_config.set_attribute_value("user", user)
646 access_config.set_attribute_value("port", port)
647 access_config.set_attribute_value("rootDirectory", root_dir)
648 access_config.set_attribute_value("useAgent", use_agent)
649 access_config.set_attribute_value("logLevel", log_level)
650 self._controller.set_access_configuration(testbed_guid,
652 return "%d|%s" % (OK, "")
654 def trace(self, params):
655 testbed_guid = int(params[1])
656 guid = int(params[2])
658 attribute = base64.b64decode(params[4])
659 trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
660 result = base64.b64encode(trace)
661 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Handle FINISHED: reply whether the given element has finished.

    params[1] is the guid (converted to int).  The controller answer is
    converted with str(), base64 encoded and sent after the OK code.
    """
    element_guid = int(params[1])
    finished = self._controller.is_finished(element_guid)
    return "%d|%s" % (OK, base64.b64encode(str(finished)))
def start(self, params):
    """Handle START: start the controller and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._controller.start()
    reply = "%d|%s" % (OK, "")
    return reply
def stop(self, params):
    """Handle STOP: stop the controller and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._controller.stop()
    reply = "%d|%s" % (OK, "")
    return reply
def recover(self, params):
    """Handle RECOVER: call recover() on the controller and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._controller.recover()
    reply = "%d|%s" % (OK, "")
    return reply
def shutdown(self, params):
    """Handle SHUTDOWN: shut the controller down and reply with OK.

    params is not used; the reply carries an empty payload.
    """
    self._controller.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
685 class TestbedInstanceProxy(object):
686 def __init__(self, root_dir, log_level, testbed_id = None,
687 testbed_version = None, launch = True, host = None,
688 port = None, user = None, agent = None):
690 if testbed_id == None or testbed_version == None:
691 raise RuntimeError("To launch a TesbedInstance server a \
692 testbed_id and testbed_version are required")
695 python_code = "from nepi.util.proxy import \
696 TesbedInstanceServer;\
697 s = TestbedInstanceServer('%s', %d, '%s', '%s');\
698 s.run()" % (root_dir, log_level, testbed_id,
700 self._client = launch_ssh_daemon_client(root_dir, python_code,
701 host, port, user, agent)
704 s = TestbedInstanceServer(root_dir, log_level, testbed_id,
708 self._client = server.Client(root_dir)
710 # attempt to reconnect
712 self._client = launch_ssh_daemon_client(root_dir, None,
713 host, port, user, agent)
715 self._client = server.Client(root_dir)
719 msg = testbed_messages[GUIDS]
720 self._client.send_msg(msg)
721 reply = self._client.read_reply()
722 result = reply.split("|")
723 code = int(result[0])
724 text = base64.b64decode(result[1])
726 raise RuntimeError(text)
727 return map(int, text.split(","))
729 def defer_configure(self, name, value):
730 msg = testbed_messages[CONFIGURE]
731 type = get_type(value)
732 # avoid having "|" in this parameters
733 name = base64.b64encode(name)
734 value = base64.b64encode(str(value))
735 msg = msg % (name, value, type)
736 self._client.send_msg(msg)
737 reply = self._client.read_reply()
738 result = reply.split("|")
739 code = int(result[0])
740 text = base64.b64decode(result[1])
742 raise RuntimeError(text)
744 def defer_create(self, guid, factory_id):
745 msg = testbed_messages[CREATE]
746 msg = msg % (guid, factory_id)
747 self._client.send_msg(msg)
748 reply = self._client.read_reply()
749 result = reply.split("|")
750 code = int(result[0])
751 text = base64.b64decode(result[1])
753 raise RuntimeError(text)
755 def defer_create_set(self, guid, name, value):
756 msg = testbed_messages[CREATE_SET]
757 type = get_type(value)
758 # avoid having "|" in this parameters
759 name = base64.b64encode(name)
760 value = base64.b64encode(str(value))
761 msg = msg % (guid, name, value, type)
762 self._client.send_msg(msg)
763 reply = self._client.read_reply()
764 result = reply.split("|")
765 code = int(result[0])
766 text = base64.b64decode(result[1])
768 raise RuntimeError(text)
770 def defer_factory_set(self, guid, name, value):
771 msg = testbed_messages[FACTORY_SET]
772 type = get_type(value)
773 # avoid having "|" in this parameters
774 name = base64.b64encode(name)
775 value = base64.b64encode(str(value))
776 msg = msg % (guid, name, value, type)
777 self._client.send_msg(msg)
778 reply = self._client.read_reply()
779 result = reply.split("|")
780 code = int(result[0])
781 text = base64.b64decode(result[1])
783 raise RuntimeError(text)
785 def defer_connect(self, guid1, connector_type_name1, guid2,
786 connector_type_name2):
787 msg = testbed_messages[CONNECT]
788 msg = msg % (guid1, connector_type_name1, guid2,
789 connector_type_name2)
790 self._client.send_msg(msg)
791 reply = self._client.read_reply()
792 result = reply.split("|")
793 code = int(result[0])
794 text = base64.b64decode(result[1])
796 raise RuntimeError(text)
798 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
799 cross_testbed_id, cross_factory_id, cross_connector_type_name):
800 msg = testbed_messages[CROSS_CONNECT]
801 msg = msg % (guid, connector_type_name, cross_guid,
802 cross_testbed_id, cross_factory_id, cross_connector_type_name)
803 self._client.send_msg(msg)
804 reply = self._client.read_reply()
805 result = reply.split("|")
806 code = int(result[0])
807 text = base64.b64decode(result[1])
809 raise RuntimeError(text)
811 def defer_add_trace(self, guid, trace_id):
812 msg = testbed_messages[ADD_TRACE]
813 msg = msg % (guid, trace_id)
814 self._client.send_msg(msg)
815 reply = self._client.read_reply()
816 result = reply.split("|")
817 code = int(result[0])
818 text = base64.b64decode(result[1])
820 raise RuntimeError(text)
822 def defer_add_address(self, guid, address, netprefix, broadcast):
823 msg = testbed_messages[ADD_ADDRESS]
824 msg = msg % (guid, address, netprefix, broadcast)
825 self._client.send_msg(msg)
826 reply = self._client.read_reply()
827 result = reply.split("|")
828 code = int(result[0])
829 text = base64.b64decode(result[1])
831 raise RuntimeError(text)
833 def defer_add_route(self, guid, destination, netprefix, nexthop):
834 msg = testbed_messages[ADD_ROUTE]
835 msg = msg % (guid, destination, netprefix, nexthop)
836 self._client.send_msg(msg)
837 reply = self._client.read_reply()
838 result = reply.split("|")
839 code = int(result[0])
840 text = base64.b64decode(result[1])
842 raise RuntimeError(text)
845 msg = testbed_messages[DO_SETUP]
846 self._client.send_msg(msg)
847 reply = self._client.read_reply()
848 result = reply.split("|")
849 code = int(result[0])
850 text = base64.b64decode(result[1])
852 raise RuntimeError(text)
855 msg = testbed_messages[DO_CREATE]
856 self._client.send_msg(msg)
857 reply = self._client.read_reply()
858 result = reply.split("|")
859 code = int(result[0])
860 text = base64.b64decode(result[1])
862 raise RuntimeError(text)
864 def do_connect(self):
865 msg = testbed_messages[DO_CONNECT]
866 self._client.send_msg(msg)
867 reply = self._client.read_reply()
868 result = reply.split("|")
869 code = int(result[0])
870 text = base64.b64decode(result[1])
872 raise RuntimeError(text)
874 def do_configure(self):
875 msg = testbed_messages[DO_CONFIGURE]
876 self._client.send_msg(msg)
877 reply = self._client.read_reply()
878 result = reply.split("|")
879 code = int(result[0])
880 text = base64.b64decode(result[1])
882 raise RuntimeError(text)
884 def do_cross_connect(self):
885 msg = testbed_messages[DO_CROSS_CONNECT]
886 self._client.send_msg(msg)
887 reply = self._client.read_reply()
888 result = reply.split("|")
889 code = int(result[0])
890 text = base64.b64decode(result[1])
892 raise RuntimeError(text)
894 def start(self, time = TIME_NOW):
895 msg = testbed_messages[START]
896 self._client.send_msg(msg)
897 reply = self._client.read_reply()
898 result = reply.split("|")
899 code = int(result[0])
900 text = base64.b64decode(result[1])
902 raise RuntimeError(text)
904 def stop(self, time = TIME_NOW):
905 msg = testbed_messages[STOP]
906 self._client.send_msg(msg)
907 reply = self._client.read_reply()
908 result = reply.split("|")
909 code = int(result[0])
910 text = base64.b64decode(result[1])
912 raise RuntimeError(text)
914 def set(self, time, guid, name, value):
915 msg = testbed_messages[SET]
916 type = get_type(value)
917 # avoid having "|" in this parameters
918 name = base64.b64encode(name)
919 value = base64.b64encode(str(value))
920 msg = msg % (time, guid, name, value, type)
921 self._client.send_msg(msg)
922 reply = self._client.read_reply()
923 result = reply.split("|")
924 code = int(result[0])
925 text = base64.b64decode(result[1])
927 raise RuntimeError(text)
929 def get(self, time, guid, name):
930 msg = testbed_messages[GET]
931 # avoid having "|" in this parameters
932 name = base64.b64encode(name)
933 msg = msg % (time, guid, name)
934 self._client.send_msg(msg)
935 reply = self._client.read_reply()
936 result = reply.split("|")
937 code = int(result[0])
938 text = base64.b64decode(result[1])
940 raise RuntimeError(text)
943 def get_address(self, guid, index, attribute):
944 msg = testbed_messages[GET_ADDRESS]
945 # avoid having "|" in this parameters
946 attribute = base64.b64encode(attribute)
947 msg = msg % (guid, index, attribute)
948 self._client.send_msg(msg)
949 reply = self._client.read_reply()
950 result = reply.split("|")
951 code = int(result[0])
952 text = base64.b64decode(result[1])
954 raise RuntimeError(text)
957 def get_route(self, guid, index, attribute):
958 msg = testbed_messages[GET_ROUTE]
959 # avoid having "|" in this parameters
960 attribute = base64.b64encode(attribute)
961 msg = msg % (guid, index, attribute)
962 self._client.send_msg(msg)
963 reply = self._client.read_reply()
964 result = reply.split("|")
965 code = int(result[0])
966 text = base64.b64decode(result[1])
968 raise RuntimeError(text)
971 def action(self, time, guid, action):
972 msg = testbed_messages[ACTION]
973 msg = msg % (time, guid, action)
974 self._client.send_msg(msg)
975 reply = self._client.read_reply()
976 result = reply.split("|")
977 code = int(result[0])
978 text = base64.b64decode(result[1])
980 raise RuntimeError(text)
982 def status(self, guid):
983 msg = testbed_messages[STATUS]
985 self._client.send_msg(msg)
986 reply = self._client.read_reply()
987 result = reply.split("|")
988 code = int(result[0])
989 text = base64.b64decode(result[1])
991 raise RuntimeError(text)
994 def trace(self, guid, trace_id, attribute='value'):
995 msg = testbed_messages[TRACE]
996 attribute = base64.b64encode(attribute)
997 msg = msg % (guid, trace_id, attribute)
998 self._client.send_msg(msg)
999 reply = self._client.read_reply()
1000 result = reply.split("|")
1001 code = int(result[0])
1002 text = base64.b64decode(result[1])
1004 raise RuntimeError(text)
1008 msg = testbed_messages[SHUTDOWN]
1009 self._client.send_msg(msg)
1010 reply = self._client.read_reply()
1011 result = reply.split("|")
1012 code = int(result[0])
1013 text = base64.b64decode(result[1])
1015 raise RuntimeError(text)
1016 self._client.send_stop()
1017 self._client.read_reply() # wait for it
1019 class ExperimentControllerProxy(object):
1020 def __init__(self, root_dir, log_level, experiment_xml = None,
1021 launch = True, host = None, port = None, user = None,
1025 if experiment_xml == None:
1026 raise RuntimeError("To launch a ExperimentControllerServer a \
1027 xml description of the experiment is required")
1030 xml = experiment_xml
1031 xml = xml.replace("'", r"\'")
1032 xml = xml.replace("\"", r"\'")
1033 xml = xml.replace("\n", r"")
1034 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1035 s = ExperimentControllerServer(%r, %r, %r);\
1036 s.run()" % (root_dir, log_level, xml)
1037 self._client = launch_ssh_daemon_client(root_dir, python_code,
1038 host, port, user, agent)
1041 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1044 self._client = server.Client(root_dir)
1046 # attempt to reconnect
1048 self._client = launch_ssh_daemon_client(root_dir, None,
1049 host, port, user, agent)
1051 self._client = server.Client(root_dir)
1054 def experiment_xml(self):
1055 msg = controller_messages[XML]
1056 self._client.send_msg(msg)
1057 reply = self._client.read_reply()
1058 result = reply.split("|")
1059 code = int(result[0])
1060 text = base64.b64decode(result[1])
1062 raise RuntimeError(text)
1065 def set_access_configuration(self, testbed_guid, access_config):
1066 mode = access_config.get_attribute_value("mode")
1067 communication = access_config.get_attribute_value("communication")
1068 host = access_config.get_attribute_value("host")
1069 user = access_config.get_attribute_value("user")
1070 port = access_config.get_attribute_value("port")
1071 root_dir = access_config.get_attribute_value("rootDirectory")
1072 use_agent = access_config.get_attribute_value("useAgent")
1073 log_level = access_config.get_attribute_value("logLevel")
1074 msg = controller_messages[ACCESS]
1075 msg = msg % (testbed_guid, mode, communication, host, user, port,
1076 root_dir, use_agent, log_level)
1077 self._client.send_msg(msg)
1078 reply = self._client.read_reply()
1079 result = reply.split("|")
1080 code = int(result[0])
1081 text = base64.b64decode(result[1])
1083 raise RuntimeError(text)
1085 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
1086 msg = controller_messages[TRACE]
1087 attribute = base64.b64encode(attribute)
1088 msg = msg % (testbed_guid, guid, trace_id, attribute)
1089 self._client.send_msg(msg)
1090 reply = self._client.read_reply()
1091 result = reply.split("|")
1092 code = int(result[0])
1093 text = base64.b64decode(result[1])
1096 raise RuntimeError(text)
1099 msg = controller_messages[START]
1100 self._client.send_msg(msg)
1101 reply = self._client.read_reply()
1102 result = reply.split("|")
1103 code = int(result[0])
1104 text = base64.b64decode(result[1])
1106 raise RuntimeError(text)
1109 msg = controller_messages[STOP]
1110 self._client.send_msg(msg)
1111 reply = self._client.read_reply()
1112 result = reply.split("|")
1113 code = int(result[0])
1114 text = base64.b64decode(result[1])
1116 raise RuntimeError(text)
1119 msg = controller_messages[RECOVER]
1120 self._client.send_msg(msg)
1121 reply = self._client.read_reply()
1122 result = reply.split("|")
1123 code = int(result[0])
1124 text = base64.b64decode(result[1])
1126 raise RuntimeError(text)
1128 def is_finished(self, guid):
1129 msg = controller_messages[FINISHED]
1131 self._client.send_msg(msg)
1132 reply = self._client.read_reply()
1133 result = reply.split("|")
1134 code = int(result[0])
1135 text = base64.b64decode(result[1])
1137 raise RuntimeError(text)
1138 return text == "True"
1141 msg = controller_messages[SHUTDOWN]
1142 self._client.send_msg(msg)
1143 reply = self._client.read_reply()
1144 result = reply.split("|")
1145 code = int(result[0])
1146 text = base64.b64decode(result[1])
1148 raise RuntimeError(text)
1149 self._client.send_stop()
1150 self._client.read_reply() # wait for it