2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
18 # PROTOCOL INSTRUCTION MESSAGES
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
56 controller_messages = dict({
58 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
59 TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
60 FINISHED: "%d|%s" % (FINISHED, "%d"),
63 RECOVER : "%d" % RECOVER,
64 SHUTDOWN: "%d" % SHUTDOWN,
67 # TESTBED INSTANCE PROTOCOL MESSAGES
68 testbed_messages = dict({
69 TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
72 SHUTDOWN: "%d" % SHUTDOWN,
73 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
74 CREATE: "%d|%s" % (CREATE, "%d|%s"),
75 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
76 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
77 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
78 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
79 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
80 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
81 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
82 DO_SETUP: "%d" % DO_SETUP,
83 DO_CREATE: "%d" % DO_CREATE,
84 DO_CONNECT: "%d" % DO_CONNECT,
85 DO_CONFIGURE: "%d" % DO_CONFIGURE,
86 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
87 GET: "%d|%s" % (GET, "%s|%d|%s"),
88 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
89 GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
90 GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
91 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
92 STATUS: "%d|%s" % (STATUS, "%d"),
96 instruction_text = dict({
102 FINISHED: "FINISHED",
106 SHUTDOWN: "SHUTDOWN",
107 CONFIGURE: "CONFIGURE",
109 CREATE_SET: "CREATE_SET",
110 FACTORY_SET: "FACTORY_SET",
112 CROSS_CONNECT: "CROSS_CONNECT",
113 ADD_TRACE: "ADD_TRACE",
114 ADD_ADDRESS: "ADD_ADDRESS",
115 ADD_ROUTE: "ADD_ROUTE",
116 DO_SETUP: "DO_SETUP",
117 DO_CREATE: "DO_CREATE",
118 DO_CONNECT: "DO_CONNECT",
119 DO_CONFIGURE: "DO_CONFIGURE",
120 DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
123 GET_ROUTE: "GET_ROUTE",
124 GET_ADDRESS: "GET_ADDRESS",
135 if isinstance(value, bool):
137 elif isinstance(value, int):
139 elif isinstance(value, float):
144 def set_type(type, value):
150 value = value == "True"
def log_msg(server, params):
    """Write a debug log entry describing a received protocol message.

    params is the already-split message: params[0] holds the numeric
    instruction code and the remaining items are its arguments.  The code is
    translated to its name through instruction_text, and the entry is emitted
    with server.log_debug, prefixed by the server's class name.
    """
    instruction_name = instruction_text[int(params[0])]
    origin = server.__class__.__name__
    arguments = ", ".join([str(field) for field in params[1:]])
    server.log_debug("%s - msg: %s [%s]" % (origin, instruction_name,
        arguments))
161 def log_reply(server, reply):
162 res = reply.split("|")
164 code_txt = instruction_text[code]
165 txt = base64.b64decode(res[1])
166 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
169 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
172 proc = server.popen_ssh_subprocess(python_code, host = host,
173 port = port, user = user, agent = agent)
175 err = proc.stderr.read()
176 raise RuntimeError("Client could not be executed: %s" % \
179 return server.Client(root_dir, host = host, port = port, user = user,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into the server module's one.

    Returns server.DEBUG_LEVEL when log_level equals
    AccessConfiguration.DEBUG_LEVEL; every other value maps to
    server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    # Anything that is not explicitly the debug level falls back to errors.
    return server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the connection parameters stored in an access configuration.

    Returns the tuple (root_dir, log_level, user, host, port, agent).
    log_level is already converted with to_server_log_level().  The user,
    host, port and agent entries stay None unless the "communication"
    attribute equals AccessConfiguration.ACCESS_SSH.
    """
    read = access_config.get_attribute_value

    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))

    # SSH-only settings; left as None for any other communication mode.
    user = None
    host = None
    port = None
    agent = None
    if read("communication") == AccessConfiguration.ACCESS_SSH:
        user = read("user")
        host = read("host")
        port = read("port")
        agent = read("useAgent")

    return (root_dir, log_level, user, host, port, agent)
200 class AccessConfiguration(AttributesMap):
201 MODE_SINGLE_PROCESS = "SINGLE"
202 MODE_DAEMON = "DAEMON"
204 ACCESS_LOCAL = "LOCAL"
205 ERROR_LEVEL = "Error"
206 DEBUG_LEVEL = "Debug"
209 super(AccessConfiguration, self).__init__()
210 self.add_attribute(name = "mode",
211 help = "Instance execution mode",
212 type = Attribute.ENUM,
213 value = AccessConfiguration.MODE_SINGLE_PROCESS,
214 allowed = [AccessConfiguration.MODE_DAEMON,
215 AccessConfiguration.MODE_SINGLE_PROCESS],
216 validation_function = validation.is_enum)
217 self.add_attribute(name = "communication",
218 help = "Instance communication mode",
219 type = Attribute.ENUM,
220 value = AccessConfiguration.ACCESS_LOCAL,
221 allowed = [AccessConfiguration.ACCESS_LOCAL,
222 AccessConfiguration.ACCESS_SSH],
223 validation_function = validation.is_enum)
224 self.add_attribute(name = "host",
225 help = "Host where the testbed will be executed",
226 type = Attribute.STRING,
228 validation_function = validation.is_string)
229 self.add_attribute(name = "user",
230 help = "User on the Host to execute the testbed",
231 type = Attribute.STRING,
232 value = getpass.getuser(),
233 validation_function = validation.is_string)
234 self.add_attribute(name = "port",
235 help = "Port on the Host",
236 type = Attribute.INTEGER,
238 validation_function = validation.is_integer)
239 self.add_attribute(name = "rootDirectory",
240 help = "Root directory for storing process files",
241 type = Attribute.STRING,
243 validation_function = validation.is_string) # TODO: validation.is_path
244 self.add_attribute(name = "useAgent",
245 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
246 type = Attribute.BOOL,
248 validation_function = validation.is_bool)
249 self.add_attribute(name = "logLevel",
250 help = "Log level for instance",
251 type = Attribute.ENUM,
252 value = AccessConfiguration.ERROR_LEVEL,
253 allowed = [AccessConfiguration.ERROR_LEVEL,
254 AccessConfiguration.DEBUG_LEVEL],
255 validation_function = validation.is_enum)
256 self.add_attribute(name = "recover",
257 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
258 type = Attribute.BOOL,
260 validation_function = validation.is_bool)
262 class TempDir(object):
264 self.path = tempfile.mkdtemp()
267 shutil.rmtree(self.path)
269 class PermDir(object):
270 def __init__(self, path):
273 def create_controller(xml, access_config = None):
274 mode = None if not access_config \
275 else access_config.get_attribute_value("mode")
276 launch = True if not access_config \
277 else not access_config.get_attribute_value("recover")
278 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
280 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
282 from nepi.core.execute import ExperimentController
284 if not access_config or not access_config.has_attribute("rootDirectory"):
287 root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
288 controller = ExperimentController(xml, root_dir.path)
290 # inject reference to temporary dir, so that it gets cleaned
291 # up at destruction time.
292 controller._tempdir = root_dir
295 elif mode == AccessConfiguration.MODE_DAEMON:
296 (root_dir, log_level, user, host, port, agent) = \
297 get_access_config_params(access_config)
298 return ExperimentControllerProxy(root_dir, log_level,
299 experiment_xml = xml, host = host, port = port, user = user,
300 agent = agent, launch = launch)
301 raise RuntimeError("Unsupported access configuration '%s'" % mode)
def create_testbed_instance(testbed_id, testbed_version, access_config):
    """Create a testbed instance, in-process or behind a daemon proxy.

    With no access configuration, no "mode" value, or mode
    AccessConfiguration.MODE_SINGLE_PROCESS, the instance is built in this
    process with _build_testbed_instance().  With
    AccessConfiguration.MODE_DAEMON a TestbedInstanceProxy is returned,
    configured from get_access_config_params().

    Raises:
        ValueError: the "recover" attribute is set but the mode is
            single-process.
        RuntimeError: the mode is neither single-process nor daemon.
    """
    mode = None if not access_config \
            else access_config.get_attribute_value("mode")
    # "recover" means: do not launch a new instance.
    launch = True if not access_config \
            else not access_config.get_attribute_value("recover")
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        # NOTE(review): this guard line was not visible in the reviewed
        # excerpt; it is reconstructed from the `launch` variable and the
        # error text below -- confirm against the full file.
        if not launch:
            # Call-style raise works on Python 2 and 3 alike (the old
            # `raise ValueError, "..."` form is Python-2-only); the message
            # typo "lanch" is corrected.
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False"
                % (mode,))
        return _build_testbed_instance(testbed_id, testbed_version)
    elif mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port,
                user = user, agent = agent, launch = launch)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
320 def _build_testbed_instance(testbed_id, testbed_version):
321 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
322 if not mod_name in sys.modules:
324 module = sys.modules[mod_name]
325 return module.TestbedInstance(testbed_version)
327 class TestbedInstanceServer(server.Server):
328 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
329 super(TestbedInstanceServer, self).__init__(root_dir, log_level)
330 self._testbed_id = testbed_id
331 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed instance this server wraps.

    The instance is created from the testbed id and version given to the
    constructor and stored in self._testbed.
    NOTE(review): presumably invoked by server.Server once the process has
    daemonized -- confirm against nepi.util.server.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_instance(testbed_id, testbed_version)
338 def reply_action(self, msg):
339 params = msg.split("|")
340 instruction = int(params[0])
341 log_msg(self, params)
343 if instruction == TRACE:
344 reply = self.trace(params)
345 elif instruction == START:
346 reply = self.start(params)
347 elif instruction == STOP:
348 reply = self.stop(params)
349 elif instruction == SHUTDOWN:
350 reply = self.shutdown(params)
351 elif instruction == CONFIGURE:
352 reply = self.defer_configure(params)
353 elif instruction == CREATE:
354 reply = self.defer_create(params)
355 elif instruction == CREATE_SET:
356 reply = self.defer_create_set(params)
357 elif instruction == FACTORY_SET:
358 reply = self.defer_factory_set(params)
359 elif instruction == CONNECT:
360 reply = self.defer_connect(params)
361 elif instruction == CROSS_CONNECT:
362 reply = self.defer_cross_connect(params)
363 elif instruction == ADD_TRACE:
364 reply = self.defer_add_trace(params)
365 elif instruction == ADD_ADDRESS:
366 reply = self.defer_add_address(params)
367 elif instruction == ADD_ROUTE:
368 reply = self.defer_add_route(params)
369 elif instruction == DO_SETUP:
370 reply = self.do_setup(params)
371 elif instruction == DO_CREATE:
372 reply = self.do_create(params)
373 elif instruction == DO_CONNECT:
374 reply = self.do_connect(params)
375 elif instruction == DO_CONFIGURE:
376 reply = self.do_configure(params)
377 elif instruction == DO_CROSS_CONNECT:
378 reply = self.do_cross_connect(params)
379 elif instruction == GET:
380 reply = self.get(params)
381 elif instruction == SET:
382 reply = self.set(params)
383 elif instruction == GET_ADDRESS:
384 reply = self.get_address(params)
385 elif instruction == GET_ROUTE:
386 reply = self.get_route(params)
387 elif instruction == ACTION:
388 reply = self.action(params)
389 elif instruction == STATUS:
390 reply = self.status(params)
391 elif instruction == GUIDS:
392 reply = self.guids(params)
394 error = "Invalid instruction %s" % instruction
395 self.log_error(error)
396 result = base64.b64encode(error)
397 reply = "%d|%s" % (ERROR, result)
399 error = self.log_error()
400 result = base64.b64encode(error)
401 reply = "%d|%s" % (ERROR, result)
402 log_reply(self, reply)
def guids(self, params):
    """Handle a GUIDS request.

    Reads self._testbed.guids and returns "OK|<payload>", where the payload
    is the base64 encoding of the guids joined with commas.  params is not
    used.
    """
    joined = ",".join([str(guid) for guid in self._testbed.guids])
    payload = base64.b64encode(joined)
    return "%d|%s" % (OK, payload)
def defer_create(self, params):
    """Handle a CREATE request.

    params[1] is the element guid (integer text) and params[2] the factory
    id; both are passed to the testbed's defer_create().  Returns an OK reply
    with an empty payload.
    """
    guid, factory_id = int(params[1]), params[2]
    self._testbed.defer_create(guid, factory_id)
    ack = "%d|%s" % (OK, "")
    return ack
417 def trace(self, params):
418 guid = int(params[1])
420 attribute = base64.b64decode(params[3])
421 trace = self._testbed.trace(guid, trace_id, attribute)
422 result = base64.b64encode(trace)
423 return "%d|%s" % (OK, result)
def start(self, params):
    """Handle a START request: start the wrapped testbed.

    params is not used.  Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.start()
    ack = "%d|%s" % (OK, "")
    return ack
429 def stop(self, params):
431 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Handle a SHUTDOWN request: shut the wrapped testbed down.

    params is not used.  Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.shutdown()
    ack = "%d|%s" % (OK, "")
    return ack
def defer_configure(self, params):
    """Handle a CONFIGURE request.

    params[1] and params[2] are the base64-encoded attribute name and value,
    params[3] is the numeric type code used by set_type() to convert the
    decoded value.  The pair is passed to the testbed's defer_configure().
    Returns an OK reply with an empty payload.
    """
    name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    value_type = int(params[3])
    # The value travels as text; set_type() converts it according to the code.
    typed_value = set_type(value_type, raw_value)
    self._testbed.defer_configure(name, typed_value)
    ack = "%d|%s" % (OK, "")
    return ack
def defer_create_set(self, params):
    """Handle a CREATE_SET request.

    params[1] is the element guid, params[2] and params[3] are the
    base64-encoded attribute name and value, and params[4] is the numeric
    type code used by set_type() to convert the decoded value.  The result is
    passed to the testbed's defer_create_set().  Returns an OK reply with an
    empty payload.
    """
    guid = int(params[1])
    name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    value_type = int(params[4])
    # The value travels as text; set_type() converts it according to the code.
    typed_value = set_type(value_type, raw_value)
    self._testbed.defer_create_set(guid, name, typed_value)
    ack = "%d|%s" % (OK, "")
    return ack
def defer_factory_set(self, params):
    """Handle a FACTORY_SET request.

    Field layout, as produced by testbed_messages[FACTORY_SET]
    ("%d|%s|%s|%d" after the instruction code) and filled in by
    TestbedInstanceProxy.defer_factory_set():
        params[1]  guid (int)
        params[2]  attribute name, base64-encoded
        params[3]  attribute value, base64-encoded
        params[4]  numeric type code for set_type()

    Returns an OK reply with an empty payload.
    """
    # The sender puts the guid first.  The previous parsing skipped it and
    # read name/value/type from indices 1-3, so it decoded the guid as the
    # name and tried int() on the base64 value.
    guid = int(params[1])
    name = base64.b64decode(params[2])
    value = base64.b64decode(params[3])
    value_type = int(params[4])
    value = set_type(value_type, value)
    # NOTE(review): the guid is now forwarded, mirroring the proxy's
    # defer_factory_set(guid, name, value) signature -- confirm the testbed
    # implementation accepts it.
    self._testbed.defer_factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_connect(self, params):
    """Handle a CONNECT request.

    params[1] and params[3] are the guids of the two elements (integer
    text); params[2] and params[4] are their connector type names.  All four
    are passed to the testbed's defer_connect().  Returns an OK reply with an
    empty payload.
    """
    first_guid, first_connector = int(params[1]), params[2]
    second_guid, second_connector = int(params[3]), params[4]
    self._testbed.defer_connect(first_guid, first_connector,
            second_guid, second_connector)
    ack = "%d|%s" % (OK, "")
    return ack
def defer_cross_connect(self, params):
    """Handle a CROSS_CONNECT request.

    Field layout, as produced by testbed_messages[CROSS_CONNECT]
    ("%d|%s|%d|%d|%s|%s" after the instruction code):
        params[1]  guid (int)
        params[2]  connector_type_name
        params[3]  cross_guid (int)
        params[4]  cross_testbed_id (formatted with %d by the sender)
        params[5]  cross_factory_id
        params[6]  cross_connector_type_name

    Returns an OK reply with an empty payload.
    """
    # The message carries exactly six fields after the code.  The previous
    # parsing assigned connector_type_name and cross_guid twice and read
    # indices up to 8, which is out of range for a well-formed request.
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    # NOTE(review): the sender formats this field with %d, so it is parsed
    # back to an int here -- confirm the testbed expects an integer id.
    cross_testbed_id = int(params[4])
    cross_factory_id = params[5]
    cross_connector_type_name = params[6]
    self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
    return "%d|%s" % (OK, "")
484 def defer_add_trace(self, params):
485 guid = int(params[1])
487 self._testbed.defer_add_trace(guid, trace_id)
488 return "%d|%s" % (OK, "")
490 def defer_add_address(self, params):
491 guid = int(params[1])
493 netprefix = int(params[3])
494 broadcast = params[4]
495 self._testbed.defer_add_address(guid, address, netprefix,
497 return "%d|%s" % (OK, "")
499 def defer_add_route(self, params):
500 guid = int(params[1])
501 destination = params[2]
502 netprefix = int(params[3])
504 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
505 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Handle a DO_SETUP request by calling the testbed's do_setup().

    The DO_SETUP message carries only the instruction code, so params is not
    used.  Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_setup()
    ack = "%d|%s" % (OK, "")
    return ack
def do_create(self, params):
    """Handle a DO_CREATE request by calling the testbed's do_create().

    The DO_CREATE message carries only the instruction code, so params is
    not used.  Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_create()
    ack = "%d|%s" % (OK, "")
    return ack
def do_connect(self, params):
    """Handle a DO_CONNECT request by calling the testbed's do_connect().

    The DO_CONNECT message carries only the instruction code, so params is
    not used.  Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_connect()
    ack = "%d|%s" % (OK, "")
    return ack
def do_configure(self, params):
    """Handle a DO_CONFIGURE request by calling the testbed's do_configure().

    The DO_CONFIGURE message carries only the instruction code, so params is
    not used.  Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_configure()
    ack = "%d|%s" % (OK, "")
    return ack
def do_cross_connect(self, params):
    """Handle a DO_CROSS_CONNECT request via the testbed's do_cross_connect().

    The DO_CROSS_CONNECT message carries only the instruction code, so
    params is not used.  Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_cross_connect()
    ack = "%d|%s" % (OK, "")
    return ack
def get(self, params):
    """Handle a GET request: read an attribute value from the testbed.

    Field layout, as produced by testbed_messages[GET] ("%s|%d|%s" after the
    instruction code):
        params[1]  time
        params[2]  guid (int)
        params[3]  attribute name, base64-encoded

    Returns "OK|<payload>", the payload being the base64 encoding of
    str(value).
    """
    # NOTE(review): the statement binding `time` was not visible in the
    # reviewed excerpt; it is reconstructed from the GET field layout and is
    # forwarded as the raw text received -- confirm against the full file.
    time = params[1]
    # Was `param[2]`: `param` is not defined, so every GET raised NameError.
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = self._testbed.get(time, guid, name)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def set(self, params):
    """Handle a SET request: write an attribute value on the testbed.

    Field layout, as produced by testbed_messages[SET] ("%s|%d|%s|%s|%d"
    after the instruction code):
        params[1]  time
        params[2]  guid (int)
        params[3]  attribute name, base64-encoded
        params[4]  attribute value, base64-encoded
        params[5]  numeric type code for set_type()

    Returns an OK reply with an empty payload.
    """
    # NOTE(review): the statement binding `time` was not visible in the
    # reviewed excerpt; it is reconstructed from the SET field layout and is
    # forwarded as the raw text received -- confirm against the full file.
    time = params[1]
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = base64.b64decode(params[4])
    # The type code is the fifth field.  It was read from params[3], which
    # holds the base64-encoded name, so int() could not parse it.
    value_type = int(params[5])
    value = set_type(value_type, value)
    self._testbed.set(time, guid, name, value)
    return "%d|%s" % (OK, "")
def get_address(self, params):
    """Handle a GET_ADDRESS request: read one attribute of an address.

    Field layout, as produced by testbed_messages[GET_ADDRESS] ("%d|%d|%s"
    after the instruction code):
        params[1]  guid (int)
        params[2]  address index (int)
        params[3]  attribute name, base64-encoded

    Returns "OK|<payload>", the payload being the base64 encoding of
    str(value).

    NOTE(review): testbed_messages builds the GET_ADDRESS message with the
    GET instruction code, so check that requests actually reach this handler.
    """
    # NOTE(review): the statement binding `guid` was not visible in the
    # reviewed excerpt; it is reconstructed from the field layout -- confirm
    # against the full file.
    guid = int(params[1])
    # Both reads below used `param`, an undefined name (NameError).
    index = int(params[2])
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_address(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def get_route(self, params):
    """Handle a GET_ROUTE request: read one attribute of a route.

    Field layout, as produced by testbed_messages[GET_ROUTE] ("%d|%d|%s"
    after the instruction code):
        params[1]  guid (int)
        params[2]  route index (int)
        params[3]  attribute name, base64-encoded

    Returns "OK|<payload>", the payload being the base64 encoding of
    str(value).

    NOTE(review): testbed_messages builds the GET_ROUTE message with the GET
    instruction code, so check that requests actually reach this handler.
    """
    # NOTE(review): the statement binding `guid` was not visible in the
    # reviewed excerpt; it is reconstructed from the field layout -- confirm
    # against the full file.
    guid = int(params[1])
    # Both reads below used `param`, an undefined name (NameError).
    index = int(params[2])
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_route(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
561 def action(self, params):
563 guid = int(params[2])
564 command = base64.b64decode(params[3])
565 self._testbed.action(time, guid, command)
566 return "%d|%s" % (OK, "")
def status(self, params):
    """Handle a STATUS request.

    params[1] is the element guid (integer text).  The testbed's status() for
    that guid is converted with str(), base64-encoded and returned as
    "OK|<payload>".
    """
    current = self._testbed.status(int(params[1]))
    payload = base64.b64encode(str(current))
    return "%d|%s" % (OK, payload)
574 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, log_level, experiment_xml):
    """Initialise the server and remember the experiment description.

    root_dir and log_level are handed to the server.Server constructor.
    experiment_xml is stored for later use; no controller exists yet, so
    self._controller starts out as None.
    """
    super(ExperimentControllerServer, self).__init__(root_dir, log_level)
    # Filled in by post_daemonize().
    self._controller = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Create the ExperimentController this server wraps.

    The controller is built from the stored experiment XML with this
    server's root directory and kept in self._controller.
    NOTE(review): presumably invoked by server.Server once the process has
    daemonized -- confirm against nepi.util.server.
    """
    # Imported here rather than at module level, as in the original code.
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    self._controller = ExperimentController(xml, root_dir = self._root_dir)
585 def reply_action(self, msg):
587 result = base64.b64encode("Invalid command line")
588 reply = "%d|%s" % (ERROR, result)
590 params = msg.split("|")
591 instruction = int(params[0])
592 log_msg(self, params)
594 if instruction == XML:
595 reply = self.experiment_xml(params)
596 elif instruction == ACCESS:
597 reply = self.set_access_configuration(params)
598 elif instruction == TRACE:
599 reply = self.trace(params)
600 elif instruction == FINISHED:
601 reply = self.is_finished(params)
602 elif instruction == START:
603 reply = self.start(params)
604 elif instruction == STOP:
605 reply = self.stop(params)
606 elif instruction == RECOVER:
607 reply = self.recover(params)
608 elif instruction == SHUTDOWN:
609 reply = self.shutdown(params)
611 error = "Invalid instruction %s" % instruction
612 self.log_error(error)
613 result = base64.b64encode(error)
614 reply = "%d|%s" % (ERROR, result)
616 error = self.log_error()
617 result = base64.b64encode(error)
618 reply = "%d|%s" % (ERROR, result)
619 log_reply(self, reply)
def experiment_xml(self, params):
    """Handle an XML request.

    Reads the controller's experiment_xml attribute and returns it
    base64-encoded as "OK|<payload>".  params is not used.
    """
    payload = base64.b64encode(self._controller.experiment_xml)
    return "%d|%s" % (OK, payload)
627 def set_access_configuration(self, params):
628 testbed_guid = int(params[1])
630 communication = params[3]
633 port = int(params[6])
635 use_agent = params[8] == "True"
636 log_level = params[9]
637 access_config = AccessConfiguration()
638 access_config.set_attribute_value("mode", mode)
639 access_config.set_attribute_value("communication", communication)
640 access_config.set_attribute_value("host", host)
641 access_config.set_attribute_value("user", user)
642 access_config.set_attribute_value("port", port)
643 access_config.set_attribute_value("rootDirectory", root_dir)
644 access_config.set_attribute_value("useAgent", use_agent)
645 access_config.set_attribute_value("logLevel", log_level)
646 self._controller.set_access_configuration(testbed_guid,
648 return "%d|%s" % (OK, "")
650 def trace(self, params):
651 testbed_guid = int(params[1])
652 guid = int(params[2])
654 attribute = base64.b64decode(params[4])
655 trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
656 result = base64.b64encode(trace)
657 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Handle a FINISHED request.

    params[1] is a guid (integer text).  The controller's is_finished() for
    that guid is converted with str(), base64-encoded and returned as
    "OK|<payload>".
    """
    finished = self._controller.is_finished(int(params[1]))
    payload = base64.b64encode(str(finished))
    return "%d|%s" % (OK, payload)
def start(self, params):
    """Handle a START request: start the wrapped experiment controller.

    params is not used.  Returns an OK reply with an empty payload.
    """
    controller = self._controller
    controller.start()
    ack = "%d|%s" % (OK, "")
    return ack
def stop(self, params):
    """Handle a STOP request: stop the wrapped experiment controller.

    params is not used.  Returns an OK reply with an empty payload.
    """
    controller = self._controller
    controller.stop()
    ack = "%d|%s" % (OK, "")
    return ack
def recover(self, params):
    """Handle a RECOVER request by calling the controller's recover().

    The RECOVER message carries only the instruction code, so params is not
    used.  Returns an OK reply with an empty payload.
    """
    controller = self._controller
    controller.recover()
    ack = "%d|%s" % (OK, "")
    return ack
def shutdown(self, params):
    """Handle a SHUTDOWN request by calling the controller's shutdown().

    The SHUTDOWN message carries only the instruction code, so params is not
    used.  Returns an OK reply with an empty payload.
    """
    controller = self._controller
    controller.shutdown()
    ack = "%d|%s" % (OK, "")
    return ack
681 class TestbedInstanceProxy(object):
682 def __init__(self, root_dir, log_level, testbed_id = None,
683 testbed_version = None, launch = True, host = None,
684 port = None, user = None, agent = None):
686 if testbed_id == None or testbed_version == None:
687 raise RuntimeError("To launch a TesbedInstance server a \
688 testbed_id and testbed_version are required")
691 python_code = "from nepi.util.proxy import \
692 TesbedInstanceServer;\
693 s = TestbedInstanceServer('%s', %d, '%s', '%s');\
694 s.run()" % (root_dir, log_level, testbed_id,
696 self._client = launch_ssh_daemon_client(root_dir, python_code,
697 host, port, user, agent)
700 s = TestbedInstanceServer(root_dir, log_level, testbed_id,
704 self._client = server.Client(root_dir)
706 # attempt to reconnect
708 self._client = launch_ssh_daemon_client(root_dir, None,
709 host, port, user, agent)
711 self._client = server.Client(root_dir)
715 msg = testbed_messages[GUIDS]
716 self._client.send_msg(msg)
717 reply = self._client.read_reply()
718 result = reply.split("|")
719 code = int(result[0])
720 text = base64.b64decode(result[1])
722 raise RuntimeError(text)
723 return map(int, text.split(","))
725 def defer_configure(self, name, value):
726 msg = testbed_messages[CONFIGURE]
727 type = get_type(value)
728 # avoid having "|" in this parameters
729 name = base64.b64encode(name)
730 value = base64.b64encode(str(value))
731 msg = msg % (name, value, type)
732 self._client.send_msg(msg)
733 reply = self._client.read_reply()
734 result = reply.split("|")
735 code = int(result[0])
736 text = base64.b64decode(result[1])
738 raise RuntimeError(text)
740 def defer_create(self, guid, factory_id):
741 msg = testbed_messages[CREATE]
742 msg = msg % (guid, factory_id)
743 self._client.send_msg(msg)
744 reply = self._client.read_reply()
745 result = reply.split("|")
746 code = int(result[0])
747 text = base64.b64decode(result[1])
749 raise RuntimeError(text)
751 def defer_create_set(self, guid, name, value):
752 msg = testbed_messages[CREATE_SET]
753 type = get_type(value)
754 # avoid having "|" in this parameters
755 name = base64.b64encode(name)
756 value = base64.b64encode(str(value))
757 msg = msg % (guid, name, value, type)
758 self._client.send_msg(msg)
759 reply = self._client.read_reply()
760 result = reply.split("|")
761 code = int(result[0])
762 text = base64.b64decode(result[1])
764 raise RuntimeError(text)
766 def defer_factory_set(self, guid, name, value):
767 msg = testbed_messages[FACTORY_SET]
768 type = get_type(value)
769 # avoid having "|" in this parameters
770 name = base64.b64encode(name)
771 value = base64.b64encode(str(value))
772 msg = msg % (guid, name, value, type)
773 self._client.send_msg(msg)
774 reply = self._client.read_reply()
775 result = reply.split("|")
776 code = int(result[0])
777 text = base64.b64decode(result[1])
779 raise RuntimeError(text)
781 def defer_connect(self, guid1, connector_type_name1, guid2,
782 connector_type_name2):
783 msg = testbed_messages[CONNECT]
784 msg = msg % (guid1, connector_type_name1, guid2,
785 connector_type_name2)
786 self._client.send_msg(msg)
787 reply = self._client.read_reply()
788 result = reply.split("|")
789 code = int(result[0])
790 text = base64.b64decode(result[1])
792 raise RuntimeError(text)
794 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
795 cross_testbed_id, cross_factory_id, cross_connector_type_name):
796 msg = testbed_messages[CROSS_CONNECT]
797 msg = msg % (guid, connector_type_name, cross_guid,
798 cross_testbed_id, cross_factory_id, cross_connector_type_name)
799 self._client.send_msg(msg)
800 reply = self._client.read_reply()
801 result = reply.split("|")
802 code = int(result[0])
803 text = base64.b64decode(result[1])
805 raise RuntimeError(text)
807 def defer_add_trace(self, guid, trace_id):
808 msg = testbed_messages[ADD_TRACE]
809 msg = msg % (guid, trace_id)
810 self._client.send_msg(msg)
811 reply = self._client.read_reply()
812 result = reply.split("|")
813 code = int(result[0])
814 text = base64.b64decode(result[1])
816 raise RuntimeError(text)
818 def defer_add_address(self, guid, address, netprefix, broadcast):
819 msg = testbed_messages[ADD_ADDRESS]
820 msg = msg % (guid, address, netprefix, broadcast)
821 self._client.send_msg(msg)
822 reply = self._client.read_reply()
823 result = reply.split("|")
824 code = int(result[0])
825 text = base64.b64decode(result[1])
827 raise RuntimeError(text)
829 def defer_add_route(self, guid, destination, netprefix, nexthop):
830 msg = testbed_messages[ADD_ROUTE]
831 msg = msg % (guid, destination, netprefix, nexthop)
832 self._client.send_msg(msg)
833 reply = self._client.read_reply()
834 result = reply.split("|")
835 code = int(result[0])
836 text = base64.b64decode(result[1])
838 raise RuntimeError(text)
841 msg = testbed_messages[DO_SETUP]
842 self._client.send_msg(msg)
843 reply = self._client.read_reply()
844 result = reply.split("|")
845 code = int(result[0])
846 text = base64.b64decode(result[1])
848 raise RuntimeError(text)
851 msg = testbed_messages[DO_CREATE]
852 self._client.send_msg(msg)
853 reply = self._client.read_reply()
854 result = reply.split("|")
855 code = int(result[0])
856 text = base64.b64decode(result[1])
858 raise RuntimeError(text)
860 def do_connect(self):
861 msg = testbed_messages[DO_CONNECT]
862 self._client.send_msg(msg)
863 reply = self._client.read_reply()
864 result = reply.split("|")
865 code = int(result[0])
866 text = base64.b64decode(result[1])
868 raise RuntimeError(text)
870 def do_configure(self):
871 msg = testbed_messages[DO_CONFIGURE]
872 self._client.send_msg(msg)
873 reply = self._client.read_reply()
874 result = reply.split("|")
875 code = int(result[0])
876 text = base64.b64decode(result[1])
878 raise RuntimeError(text)
880 def do_cross_connect(self):
881 msg = testbed_messages[DO_CROSS_CONNECT]
882 self._client.send_msg(msg)
883 reply = self._client.read_reply()
884 result = reply.split("|")
885 code = int(result[0])
886 text = base64.b64decode(result[1])
888 raise RuntimeError(text)
890 def start(self, time = TIME_NOW):
891 msg = testbed_messages[START]
892 self._client.send_msg(msg)
893 reply = self._client.read_reply()
894 result = reply.split("|")
895 code = int(result[0])
896 text = base64.b64decode(result[1])
898 raise RuntimeError(text)
900 def stop(self, time = TIME_NOW):
901 msg = testbed_messages[STOP]
902 self._client.send_msg(msg)
903 reply = self._client.read_reply()
904 result = reply.split("|")
905 code = int(result[0])
906 text = base64.b64decode(result[1])
908 raise RuntimeError(text)
910 def set(self, time, guid, name, value):
911 msg = testbed_messages[SET]
912 type = get_type(value)
913 # avoid having "|" in this parameters
914 name = base64.b64encode(name)
915 value = base64.b64encode(str(value))
916 msg = msg % (time, guid, name, value, type)
917 self._client.send_msg(msg)
918 reply = self._client.read_reply()
919 result = reply.split("|")
920 code = int(result[0])
921 text = base64.b64decode(result[1])
923 raise RuntimeError(text)
925 def get(self, time, guid, name):
926 msg = testbed_messages[GET]
927 # avoid having "|" in this parameters
928 name = base64.b64encode(name)
929 msg = msg % (time, guid, name)
930 self._client.send_msg(msg)
931 reply = self._client.read_reply()
932 result = reply.split("|")
933 code = int(result[0])
934 text = base64.b64decode(result[1])
936 raise RuntimeError(text)
939 def get_address(self, guid, index, attribute):
940 msg = testbed_messages[GET_ADDRESS]
941 # avoid having "|" in this parameters
942 attribute = base64.b64encode(attribute)
943 msg = msg % (guid, index, attribute)
944 self._client.send_msg(msg)
945 reply = self._client.read_reply()
946 result = reply.split("|")
947 code = int(result[0])
948 text = base64.b64decode(result[1])
950 raise RuntimeError(text)
953 def get_route(self, guid, index, attribute):
954 msg = testbed_messages[GET_ROUTE]
955 # avoid having "|" in this parameters
956 attribute = base64.b64encode(attribute)
957 msg = msg % (guid, index, attribute)
958 self._client.send_msg(msg)
959 reply = self._client.read_reply()
960 result = reply.split("|")
961 code = int(result[0])
962 text = base64.b64decode(result[1])
964 raise RuntimeError(text)
967 def action(self, time, guid, action):
968 msg = testbed_messages[ACTION]
969 msg = msg % (time, guid, action)
970 self._client.send_msg(msg)
971 reply = self._client.read_reply()
972 result = reply.split("|")
973 code = int(result[0])
974 text = base64.b64decode(result[1])
976 raise RuntimeError(text)
978 def status(self, guid):
979 msg = testbed_messages[STATUS]
981 self._client.send_msg(msg)
982 reply = self._client.read_reply()
983 result = reply.split("|")
984 code = int(result[0])
985 text = base64.b64decode(result[1])
987 raise RuntimeError(text)
990 def trace(self, guid, trace_id, attribute='value'):
991 msg = testbed_messages[TRACE]
992 attribute = base64.b64encode(attribute)
993 msg = msg % (guid, trace_id, attribute)
994 self._client.send_msg(msg)
995 reply = self._client.read_reply()
996 result = reply.split("|")
997 code = int(result[0])
998 text = base64.b64decode(result[1])
1000 raise RuntimeError(text)
1004 msg = testbed_messages[SHUTDOWN]
1005 self._client.send_msg(msg)
1006 reply = self._client.read_reply()
1007 result = reply.split("|")
1008 code = int(result[0])
1009 text = base64.b64decode(result[1])
1011 raise RuntimeError(text)
1012 self._client.send_stop()
1013 self._client.read_reply() # wait for it
1015 class ExperimentControllerProxy(object):
1016 def __init__(self, root_dir, log_level, experiment_xml = None,
1017 launch = True, host = None, port = None, user = None,
1021 if experiment_xml == None:
1022 raise RuntimeError("To launch a ExperimentControllerServer a \
1023 xml description of the experiment is required")
1026 xml = experiment_xml
1027 xml = xml.replace("'", r"\'")
1028 xml = xml.replace("\"", r"\'")
1029 xml = xml.replace("\n", r"")
1030 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1031 s = ExperimentControllerServer(%r, %r, %r);\
1032 s.run()" % (root_dir, log_level, xml)
1033 self._client = launch_ssh_daemon_client(root_dir, python_code,
1034 host, port, user, agent)
1037 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1040 self._client = server.Client(root_dir)
1042 # attempt to reconnect
1044 self._client = launch_ssh_daemon_client(root_dir, None,
1045 host, port, user, agent)
1047 self._client = server.Client(root_dir)
1050 def experiment_xml(self):
1051 msg = controller_messages[XML]
1052 self._client.send_msg(msg)
1053 reply = self._client.read_reply()
1054 result = reply.split("|")
1055 code = int(result[0])
1056 text = base64.b64decode(result[1])
1058 raise RuntimeError(text)
1061 def set_access_configuration(self, testbed_guid, access_config):
1062 mode = access_config.get_attribute_value("mode")
1063 communication = access_config.get_attribute_value("communication")
1064 host = access_config.get_attribute_value("host")
1065 user = access_config.get_attribute_value("user")
1066 port = access_config.get_attribute_value("port")
1067 root_dir = access_config.get_attribute_value("rootDirectory")
1068 use_agent = access_config.get_attribute_value("useAgent")
1069 log_level = access_config.get_attribute_value("logLevel")
1070 msg = controller_messages[ACCESS]
1071 msg = msg % (testbed_guid, mode, communication, host, user, port,
1072 root_dir, use_agent, log_level)
1073 self._client.send_msg(msg)
1074 reply = self._client.read_reply()
1075 result = reply.split("|")
1076 code = int(result[0])
1077 text = base64.b64decode(result[1])
1079 raise RuntimeError(text)
1081 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
1082 msg = controller_messages[TRACE]
1083 attribute = base64.b64encode(attribute)
1084 msg = msg % (testbed_guid, guid, trace_id, attribute)
1085 self._client.send_msg(msg)
1086 reply = self._client.read_reply()
1087 result = reply.split("|")
1088 code = int(result[0])
1089 text = base64.b64decode(result[1])
1092 raise RuntimeError(text)
1095 msg = controller_messages[START]
1096 self._client.send_msg(msg)
1097 reply = self._client.read_reply()
1098 result = reply.split("|")
1099 code = int(result[0])
1100 text = base64.b64decode(result[1])
1102 raise RuntimeError(text)
1105 msg = controller_messages[STOP]
1106 self._client.send_msg(msg)
1107 reply = self._client.read_reply()
1108 result = reply.split("|")
1109 code = int(result[0])
1110 text = base64.b64decode(result[1])
1112 raise RuntimeError(text)
1115 msg = controller_messages[RECOVER]
1116 self._client.send_msg(msg)
1117 reply = self._client.read_reply()
1118 result = reply.split("|")
1119 code = int(result[0])
1120 text = base64.b64decode(result[1])
1122 raise RuntimeError(text)
1124 def is_finished(self, guid):
1125 msg = controller_messages[FINISHED]
1127 self._client.send_msg(msg)
1128 reply = self._client.read_reply()
1129 result = reply.split("|")
1130 code = int(result[0])
1131 text = base64.b64decode(result[1])
1133 raise RuntimeError(text)
1134 return text == "True"
1137 msg = controller_messages[SHUTDOWN]
1138 self._client.send_msg(msg)
1139 reply = self._client.read_reply()
1140 result = reply.split("|")
1141 code = int(result[0])
1142 text = base64.b64decode(result[1])
1144 raise RuntimeError(text)
1145 self._client.send_stop()
1146 self._client.read_reply() # wait for it