2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
18 # PROTOCOL INSTRUCTION MESSAGES
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
56 controller_messages = dict({
58 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
59 TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
60 FINISHED: "%d|%s" % (FINISHED, "%d"),
63 RECOVER : "%d" % RECOVER,
64 SHUTDOWN: "%d" % SHUTDOWN,
67 # TESTBED INSTANCE PROTOCOL MESSAGES
68 testbed_messages = dict({
69 TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
72 SHUTDOWN: "%d" % SHUTDOWN,
73 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
74 CREATE: "%d|%s" % (CREATE, "%d|%s"),
75 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
76 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
77 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
78 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
79 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
80 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
81 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
82 DO_SETUP: "%d" % DO_SETUP,
83 DO_CREATE: "%d" % DO_CREATE,
84 DO_CONNECT: "%d" % DO_CONNECT,
85 DO_CONFIGURE: "%d" % DO_CONFIGURE,
86 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
87 GET: "%d|%s" % (GET, "%s|%d|%s"),
88 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
89 GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
90 GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
91 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
92 STATUS: "%d|%s" % (STATUS, "%d"),
96 instruction_text = dict({
102 FINISHED: "FINISHED",
106 SHUTDOWN: "SHUTDOWN",
107 CONFIGURE: "CONFIGURE",
109 CREATE_SET: "CREATE_SET",
110 FACTORY_SET: "FACTORY_SET",
112 CROSS_CONNECT: "CROSS_CONNECT",
113 ADD_TRACE: "ADD_TRACE",
114 ADD_ADDRESS: "ADD_ADDRESS",
115 ADD_ROUTE: "ADD_ROUTE",
116 DO_SETUP: "DO_SETUP",
117 DO_CREATE: "DO_CREATE",
118 DO_CONNECT: "DO_CONNECT",
119 DO_CONFIGURE: "DO_CONFIGURE",
120 DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
123 GET_ROUTE: "GET_ROUTE",
124 GET_ADDRESS: "GET_ADDRESS",
135 if isinstance(value, bool):
137 elif isinstance(value, int):
139 elif isinstance(value, float):
144 def set_type(type, value):
150 value = value == "True"
def log_msg(server, params):
    """Write a debug log entry describing a received protocol message.

    ``params`` is the already split message: its first item is the numeric
    instruction code (looked up in ``instruction_text``), the remaining
    items are logged as a comma separated list. The entry is prefixed with
    the class name of ``server`` and emitted through ``server.log_debug``.
    """
    code = int(params[0])
    instruction_name = instruction_text[code]
    arguments = ", ".join([str(item) for item in params[1:]])
    owner = server.__class__.__name__
    server.log_debug("%s - msg: %s [%s]" % (owner, instruction_name,
        arguments))
161 def log_reply(server, reply):
162 res = reply.split("|")
164 code_txt = instruction_text[code]
165 txt = base64.b64decode(res[1])
166 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into a server log level.

    Returns ``server.DEBUG_LEVEL`` when ``log_level`` equals
    ``AccessConfiguration.DEBUG_LEVEL``; every other value maps to
    ``server.ERROR_LEVEL``.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the connection parameters stored in an access configuration.

    Returns the tuple ``(root_dir, log_level, user, host, port, agent)``.
    ``log_level`` is converted with ``to_server_log_level``. ``user``,
    ``host``, ``port`` and ``agent`` are only read when the "communication"
    attribute equals ``AccessConfiguration.ACCESS_SSH``; otherwise all four
    are ``None``.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") == AccessConfiguration.ACCESS_SSH:
        user = read("user")
        host = read("host")
        port = read("port")
        agent = read("useAgent")
    else:
        # Non-ssh access carries no remote connection details.
        user = host = port = agent = None
    return (root_dir, log_level, user, host, port, agent)
187 class AccessConfiguration(AttributesMap):
188 MODE_SINGLE_PROCESS = "SINGLE"
189 MODE_DAEMON = "DAEMON"
191 ACCESS_LOCAL = "LOCAL"
192 ERROR_LEVEL = "Error"
193 DEBUG_LEVEL = "Debug"
196 super(AccessConfiguration, self).__init__()
197 self.add_attribute(name = "mode",
198 help = "Instance execution mode",
199 type = Attribute.ENUM,
200 value = AccessConfiguration.MODE_SINGLE_PROCESS,
201 allowed = [AccessConfiguration.MODE_DAEMON,
202 AccessConfiguration.MODE_SINGLE_PROCESS],
203 validation_function = validation.is_enum)
204 self.add_attribute(name = "communication",
205 help = "Instance communication mode",
206 type = Attribute.ENUM,
207 value = AccessConfiguration.ACCESS_LOCAL,
208 allowed = [AccessConfiguration.ACCESS_LOCAL,
209 AccessConfiguration.ACCESS_SSH],
210 validation_function = validation.is_enum)
211 self.add_attribute(name = "host",
212 help = "Host where the testbed will be executed",
213 type = Attribute.STRING,
215 validation_function = validation.is_string)
216 self.add_attribute(name = "user",
217 help = "User on the Host to execute the testbed",
218 type = Attribute.STRING,
219 value = getpass.getuser(),
220 validation_function = validation.is_string)
221 self.add_attribute(name = "port",
222 help = "Port on the Host",
223 type = Attribute.INTEGER,
225 validation_function = validation.is_integer)
226 self.add_attribute(name = "rootDirectory",
227 help = "Root directory for storing process files",
228 type = Attribute.STRING,
230 validation_function = validation.is_string) # TODO: validation.is_path
231 self.add_attribute(name = "useAgent",
232 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
233 type = Attribute.BOOL,
235 validation_function = validation.is_bool)
236 self.add_attribute(name = "logLevel",
237 help = "Log level for instance",
238 type = Attribute.ENUM,
239 value = AccessConfiguration.ERROR_LEVEL,
240 allowed = [AccessConfiguration.ERROR_LEVEL,
241 AccessConfiguration.DEBUG_LEVEL],
242 validation_function = validation.is_enum)
243 self.add_attribute(name = "recover",
244 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
245 type = Attribute.BOOL,
247 validation_function = validation.is_bool)
249 class TempDir(object):
251 self.path = tempfile.mkdtemp()
254 shutil.rmtree(self.path)
256 class PermDir(object):
257 def __init__(self, path):
260 def create_controller(xml, access_config = None):
261 mode = None if not access_config \
262 else access_config.get_attribute_value("mode")
263 launch = True if not access_config \
264 else not access_config.get_attribute_value("recover")
265 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
267 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
269 from nepi.core.execute import ExperimentController
271 if not access_config or not access_config.has_attribute("rootDirectory"):
274 root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
275 controller = ExperimentController(xml, root_dir.path)
277 # inject reference to temporary dir, so that it gets cleaned
278 # up at destruction time.
279 controller._tempdir = root_dir
282 elif mode == AccessConfiguration.MODE_DAEMON:
283 (root_dir, log_level, user, host, port, agent) = \
284 get_access_config_params(access_config)
285 return ExperimentControllerProxy(root_dir, log_level,
286 experiment_xml = xml, host = host, port = port, user = user,
287 agent = agent, launch = launch)
288 raise RuntimeError("Unsupported access configuration '%s'" % mode)
290 def create_testbed_instance(testbed_id, testbed_version, access_config):
291 mode = None if not access_config \
292 else access_config.get_attribute_value("mode")
293 launch = True if not access_config \
294 else not access_config.get_attribute_value("recover")
295 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
297 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
298 return _build_testbed_instance(testbed_id, testbed_version)
299 elif mode == AccessConfiguration.MODE_DAEMON:
300 (root_dir, log_level, user, host, port, agent) = \
301 get_access_config_params(access_config)
302 return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id,
303 testbed_version = testbed_version, host = host, port = port,
304 user = user, agent = agent, launch = launch)
305 raise RuntimeError("Unsupported access configuration '%s'" % mode)
307 def _build_testbed_instance(testbed_id, testbed_version):
308 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
309 if not mod_name in sys.modules:
311 module = sys.modules[mod_name]
312 return module.TestbedInstance(testbed_version)
314 class TestbedInstanceServer(server.Server):
315 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
316 super(TestbedInstanceServer, self).__init__(root_dir, log_level)
317 self._testbed_id = testbed_id
318 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed instance served by this server.

    The instance is created with ``_build_testbed_instance`` from the
    testbed id and version given to the constructor and stored in
    ``self._testbed``.
    """
    testbed_id = self._testbed_id
    version = self._testbed_version
    self._testbed = _build_testbed_instance(testbed_id, version)
325 def reply_action(self, msg):
327 result = base64.b64encode("Invalid command line")
328 reply = "%d|%s" % (ERROR, result)
330 params = msg.split("|")
331 instruction = int(params[0])
332 log_msg(self, params)
334 if instruction == TRACE:
335 reply = self.trace(params)
336 elif instruction == START:
337 reply = self.start(params)
338 elif instruction == STOP:
339 reply = self.stop(params)
340 elif instruction == SHUTDOWN:
341 reply = self.shutdown(params)
342 elif instruction == CONFIGURE:
343 reply = self.defer_configure(params)
344 elif instruction == CREATE:
345 reply = self.defer_create(params)
346 elif instruction == CREATE_SET:
347 reply = self.defer_create_set(params)
348 elif instruction == FACTORY_SET:
349 reply = self.defer_factory_set(params)
350 elif instruction == CONNECT:
351 reply = self.defer_connect(params)
352 elif instruction == CROSS_CONNECT:
353 reply = self.defer_cross_connect(params)
354 elif instruction == ADD_TRACE:
355 reply = self.defer_add_trace(params)
356 elif instruction == ADD_ADDRESS:
357 reply = self.defer_add_address(params)
358 elif instruction == ADD_ROUTE:
359 reply = self.defer_add_route(params)
360 elif instruction == DO_SETUP:
361 reply = self.do_setup(params)
362 elif instruction == DO_CREATE:
363 reply = self.do_create(params)
364 elif instruction == DO_CONNECT:
365 reply = self.do_connect(params)
366 elif instruction == DO_CONFIGURE:
367 reply = self.do_configure(params)
368 elif instruction == DO_CROSS_CONNECT:
369 reply = self.do_cross_connect(params)
370 elif instruction == GET:
371 reply = self.get(params)
372 elif instruction == SET:
373 reply = self.set(params)
374 elif instruction == GET_ADDRESS:
375 reply = self.get_address(params)
376 elif instruction == GET_ROUTE:
377 reply = self.get_route(params)
378 elif instruction == ACTION:
379 reply = self.action(params)
380 elif instruction == STATUS:
381 reply = self.status(params)
382 elif instruction == GUIDS:
383 reply = self.guids(params)
385 error = "Invalid instruction %s" % instruction
386 self.log_error(error)
387 result = base64.b64encode(error)
388 reply = "%d|%s" % (ERROR, result)
390 error = self.log_error()
391 result = base64.b64encode(error)
392 reply = "%d|%s" % (ERROR, result)
393 log_reply(self, reply)
def guids(self, params):
    """Reply with the guids exposed by the testbed.

    The values of ``self._testbed.guids`` are converted to strings, joined
    with "," and base64 encoded to form the payload of an OK reply.
    ``params`` is not used.
    """
    joined = ",".join([str(guid) for guid in self._testbed.guids])
    payload = base64.b64encode(joined)
    return "%d|%s" % (OK, payload)
def defer_create(self, params):
    """Forward a CREATE request to the testbed.

    ``params[1]`` is parsed as the integer guid and ``params[2]`` is passed
    through unchanged as the factory id. Returns an OK reply with an empty
    payload.
    """
    element_guid = int(params[1])
    factory = params[2]
    self._testbed.defer_create(element_guid, factory)
    reply = "%d|%s" % (OK, "")
    return reply
408 def trace(self, params):
409 guid = int(params[1])
411 attribute = base64.b64decode(params[3])
412 trace = self._testbed.trace(guid, trace_id, attribute)
413 result = base64.b64encode(trace)
414 return "%d|%s" % (OK, result)
def start(self, params):
    """Start the testbed and return an OK reply with an empty payload.

    ``params`` is not used.
    """
    self._testbed.start()
    reply = "%d|%s" % (OK, "")
    return reply
420 def stop(self, params):
422 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Shut the testbed down and return an OK reply with an empty payload.

    ``params`` is not used.
    """
    self._testbed.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
def defer_configure(self, params):
    """Forward a CONFIGURE request to the testbed.

    ``params[1]`` and ``params[2]`` hold the base64 encoded attribute name
    and value, ``params[3]`` the integer type code used by ``set_type`` to
    convert the decoded value. Returns an OK reply with an empty payload.
    """
    attr_name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    type_code = int(params[3])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_configure(attr_name, typed_value)
    return "%d|%s" % (OK, "")
def defer_create_set(self, params):
    """Forward a CREATE_SET request to the testbed.

    ``params[1]`` is the integer guid, ``params[2]`` and ``params[3]`` the
    base64 encoded attribute name and value, and ``params[4]`` the integer
    type code used by ``set_type`` to convert the decoded value. Returns an
    OK reply with an empty payload.
    """
    element_guid = int(params[1])
    attr_name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    type_code = int(params[4])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_create_set(element_guid, attr_name, typed_value)
    return "%d|%s" % (OK, "")
def defer_factory_set(self, params):
    """Forward a FACTORY_SET request to the testbed.

    The FACTORY_SET message carries ``guid|name|value|type`` after the
    instruction code, exactly like CREATE_SET: ``params[1]`` is the integer
    guid, ``params[2]`` and ``params[3]`` the base64 encoded attribute name
    and value, and ``params[4]`` the integer type code used by ``set_type``
    to convert the decoded value. Returns an OK reply with an empty payload.
    """
    # The guid is the first argument on the wire; skipping it would make
    # every following field be read from the wrong position.
    guid = int(params[1])
    name = base64.b64decode(params[2])
    value = base64.b64decode(params[3])
    value_type = int(params[4])
    value = set_type(value_type, value)
    self._testbed.defer_factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_connect(self, params):
    """Forward a CONNECT request to the testbed.

    ``params[1]`` and ``params[3]`` are parsed as the integer guids of the
    two endpoints; ``params[2]`` and ``params[4]`` are passed through as
    their connector type names. Returns an OK reply with an empty payload.
    """
    first_guid = int(params[1])
    first_connector = params[2]
    second_guid = int(params[3])
    second_connector = params[4]
    self._testbed.defer_connect(first_guid, first_connector, second_guid,
            second_connector)
    reply = "%d|%s" % (OK, "")
    return reply
def defer_cross_connect(self, params):
    """Forward a CROSS_CONNECT request to the testbed.

    The CROSS_CONNECT message carries six arguments after the instruction
    code, in this order: guid, connector type name, cross guid, cross
    testbed id, cross factory id and cross connector type name. The two
    guids are parsed as integers; the other fields are passed through as
    received. Returns an OK reply with an empty payload.
    """
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    # NOTE(review): the message template formats this field with %d, yet it
    # was historically forwarded as a string; confirm the intended type.
    cross_testbed_id = params[4]
    cross_factory_id = params[5]
    cross_connector_type_name = params[6]
    self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
    return "%d|%s" % (OK, "")
475 def defer_add_trace(self, params):
476 guid = int(params[1])
478 self._testbed.defer_add_trace(guid, trace_id)
479 return "%d|%s" % (OK, "")
481 def defer_add_address(self, params):
482 guid = int(params[1])
484 netprefix = int(params[3])
485 broadcast = params[4]
486 self._testbed.defer_add_address(guid, address, netprefix,
488 return "%d|%s" % (OK, "")
490 def defer_add_route(self, params):
491 guid = int(params[1])
492 destination = params[2]
493 netprefix = int(params[3])
495 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
496 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Run ``do_setup`` on the testbed and return an empty OK reply.

    ``params`` is not used.
    """
    self._testbed.do_setup()
    reply = "%d|%s" % (OK, "")
    return reply
def do_create(self, params):
    """Run ``do_create`` on the testbed and return an empty OK reply.

    ``params`` is not used.
    """
    self._testbed.do_create()
    reply = "%d|%s" % (OK, "")
    return reply
def do_connect(self, params):
    """Run ``do_connect`` on the testbed and return an empty OK reply.

    ``params`` is not used.
    """
    self._testbed.do_connect()
    reply = "%d|%s" % (OK, "")
    return reply
def do_configure(self, params):
    """Run ``do_configure`` on the testbed and return an empty OK reply.

    ``params`` is not used.
    """
    self._testbed.do_configure()
    reply = "%d|%s" % (OK, "")
    return reply
def do_cross_connect(self, params):
    """Run ``do_cross_connect`` on the testbed and return an empty OK reply.

    ``params`` is not used.
    """
    self._testbed.do_cross_connect()
    reply = "%d|%s" % (OK, "")
    return reply
518 def get(self, params):
520 guid = int(param[2] )
521 name = base64.b64decode(params[3])
522 value = self._testbed.get(time, guid, name)
523 result = base64.b64encode(str(value))
524 return "%d|%s" % (OK, result)
526 def set(self, params):
528 guid = int(params[2])
529 name = base64.b64decode(params[3])
530 value = base64.b64decode(params[4])
531 type = int(params[3])
532 value = set_type(type, value)
533 self._testbed.set(time, guid, name, value)
534 return "%d|%s" % (OK, "")
536 def get_address(self, params):
538 index = int(param[2])
539 attribute = base64.b64decode(param[3])
540 value = self._testbed.get_address(guid, index, attribute)
541 result = base64.b64encode(str(value))
542 return "%d|%s" % (OK, result)
544 def get_route(self, params):
546 index = int(param[2])
547 attribute = base64.b64decode(param[3])
548 value = self._testbed.get_route(guid, index, attribute)
549 result = base64.b64encode(str(value))
550 return "%d|%s" % (OK, result)
552 def action(self, params):
554 guid = int(params[2])
555 command = base64.b64decode(params[3])
556 self._testbed.action(time, guid, command)
557 return "%d|%s" % (OK, "")
def status(self, params):
    """Reply with the testbed status of one element.

    ``params[1]`` is parsed as the integer guid. The value returned by
    ``self._testbed.status`` is converted with ``str`` and base64 encoded
    to form the payload of an OK reply.
    """
    element_guid = int(params[1])
    current = self._testbed.status(element_guid)
    payload = base64.b64encode(str(current))
    return "%d|%s" % (OK, payload)
565 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, log_level, experiment_xml):
    """Initialise the base server and remember the experiment description.

    ``root_dir`` and ``log_level`` are handed to the base class
    constructor. ``experiment_xml`` is stored for later use by
    ``post_daemonize``; the controller reference starts out as ``None``.
    """
    super(ExperimentControllerServer, self).__init__(root_dir, log_level)
    self._experiment_xml, self._controller = experiment_xml, None
def post_daemonize(self):
    """Create the experiment controller served by this server.

    ``ExperimentController`` is imported here, at call time, and built from
    the stored experiment xml with ``self._root_dir`` as its root
    directory. The result is kept in ``self._controller``.
    """
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    self._controller = ExperimentController(xml, root_dir = self._root_dir)
576 def reply_action(self, msg):
578 result = base64.b64encode("Invalid command line")
579 reply = "%d|%s" % (ERROR, result)
581 params = msg.split("|")
582 instruction = int(params[0])
583 log_msg(self, params)
585 if instruction == XML:
586 reply = self.experiment_xml(params)
587 elif instruction == ACCESS:
588 reply = self.set_access_configuration(params)
589 elif instruction == TRACE:
590 reply = self.trace(params)
591 elif instruction == FINISHED:
592 reply = self.is_finished(params)
593 elif instruction == START:
594 reply = self.start(params)
595 elif instruction == STOP:
596 reply = self.stop(params)
597 elif instruction == RECOVER:
598 reply = self.recover(params)
599 elif instruction == SHUTDOWN:
600 reply = self.shutdown(params)
602 error = "Invalid instruction %s" % instruction
603 self.log_error(error)
604 result = base64.b64encode(error)
605 reply = "%d|%s" % (ERROR, result)
607 error = self.log_error()
608 result = base64.b64encode(error)
609 reply = "%d|%s" % (ERROR, result)
610 log_reply(self, reply)
def experiment_xml(self, params):
    """Reply with the controller's experiment xml.

    The value of ``self._controller.experiment_xml`` is base64 encoded to
    form the payload of an OK reply. ``params`` is not used.
    """
    payload = base64.b64encode(self._controller.experiment_xml)
    return "%d|%s" % (OK, payload)
618 def set_access_configuration(self, params):
619 testbed_guid = int(params[1])
621 communication = params[3]
624 port = int(params[6])
626 use_agent = params[8] == "True"
627 log_level = params[9]
628 access_config = AccessConfiguration()
629 access_config.set_attribute_value("mode", mode)
630 access_config.set_attribute_value("communication", communication)
631 access_config.set_attribute_value("host", host)
632 access_config.set_attribute_value("user", user)
633 access_config.set_attribute_value("port", port)
634 access_config.set_attribute_value("rootDirectory", root_dir)
635 access_config.set_attribute_value("useAgent", use_agent)
636 access_config.set_attribute_value("logLevel", log_level)
637 self._controller.set_access_configuration(testbed_guid,
639 return "%d|%s" % (OK, "")
641 def trace(self, params):
642 testbed_guid = int(params[1])
643 guid = int(params[2])
645 attribute = base64.b64decode(params[4])
646 trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
647 result = base64.b64encode(trace)
648 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Reply with whether the controller reports an element as finished.

    ``params[1]`` is parsed as the integer guid. The value returned by
    ``self._controller.is_finished`` is converted with ``str`` and base64
    encoded to form the payload of an OK reply.
    """
    element_guid = int(params[1])
    finished = self._controller.is_finished(element_guid)
    payload = base64.b64encode(str(finished))
    return "%d|%s" % (OK, payload)
def start(self, params):
    """Start the controller and return an OK reply with an empty payload.

    ``params`` is not used.
    """
    self._controller.start()
    reply = "%d|%s" % (OK, "")
    return reply
def stop(self, params):
    """Stop the controller and return an OK reply with an empty payload.

    ``params`` is not used.
    """
    self._controller.stop()
    reply = "%d|%s" % (OK, "")
    return reply
def recover(self, params):
    """Run ``recover`` on the controller and return an empty OK reply.

    ``params`` is not used.
    """
    self._controller.recover()
    reply = "%d|%s" % (OK, "")
    return reply
def shutdown(self, params):
    """Shut the controller down and return an empty OK reply.

    ``params`` is not used.
    """
    self._controller.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
672 class TestbedInstanceProxy(object):
673 def __init__(self, root_dir, log_level, testbed_id = None,
674 testbed_version = None, launch = True, host = None,
675 port = None, user = None, agent = None):
677 if testbed_id == None or testbed_version == None:
678 raise RuntimeError("To launch a TesbedInstance server a \
679 testbed_id and testbed_version are required")
682 python_code = "from nepi.util.proxy import \
683 TesbedInstanceServer;\
684 s = TestbedInstanceServer('%s', %d, '%s', '%s');\
685 s.run()" % (root_dir, log_level, testbed_id,
687 proc = server.popen_ssh_subprocess(python_code, host = host,
688 port = port, user = user, agent = agent)
690 err = proc.stderr.read()
691 raise RuntimeError("Server could not be executed: %s" % \
695 s = TestbedInstanceServer(root_dir, log_level, testbed_id,
699 # connect client to server
700 self._client = server.Client(root_dir, host = host, port = port,
701 user = user, agent = agent)
705 msg = testbed_messages[GUIDS]
706 self._client.send_msg(msg)
707 reply = self._client.read_reply()
708 result = reply.split("|")
709 code = int(result[0])
710 text = base64.b64decode(result[1])
712 raise RuntimeError(text)
713 return map(int, text.split(","))
715 def defer_configure(self, name, value):
716 msg = testbed_messages[CONFIGURE]
717 type = get_type(value)
718 # avoid having "|" in this parameters
719 name = base64.b64encode(name)
720 value = base64.b64encode(str(value))
721 msg = msg % (name, value, type)
722 self._client.send_msg(msg)
723 reply = self._client.read_reply()
724 result = reply.split("|")
725 code = int(result[0])
726 text = base64.b64decode(result[1])
728 raise RuntimeError(text)
730 def defer_create(self, guid, factory_id):
731 msg = testbed_messages[CREATE]
732 msg = msg % (guid, factory_id)
733 self._client.send_msg(msg)
734 reply = self._client.read_reply()
735 result = reply.split("|")
736 code = int(result[0])
737 text = base64.b64decode(result[1])
739 raise RuntimeError(text)
741 def defer_create_set(self, guid, name, value):
742 msg = testbed_messages[CREATE_SET]
743 type = get_type(value)
744 # avoid having "|" in this parameters
745 name = base64.b64encode(name)
746 value = base64.b64encode(str(value))
747 msg = msg % (guid, name, value, type)
748 self._client.send_msg(msg)
749 reply = self._client.read_reply()
750 result = reply.split("|")
751 code = int(result[0])
752 text = base64.b64decode(result[1])
754 raise RuntimeError(text)
756 def defer_factory_set(self, guid, name, value):
757 msg = testbed_messages[FACTORY_SET]
758 type = get_type(value)
759 # avoid having "|" in this parameters
760 name = base64.b64encode(name)
761 value = base64.b64encode(str(value))
762 msg = msg % (guid, name, value, type)
763 self._client.send_msg(msg)
764 reply = self._client.read_reply()
765 result = reply.split("|")
766 code = int(result[0])
767 text = base64.b64decode(result[1])
769 raise RuntimeError(text)
771 def defer_connect(self, guid1, connector_type_name1, guid2,
772 connector_type_name2):
773 msg = testbed_messages[CONNECT]
774 msg = msg % (guid1, connector_type_name1, guid2,
775 connector_type_name2)
776 self._client.send_msg(msg)
777 reply = self._client.read_reply()
778 result = reply.split("|")
779 code = int(result[0])
780 text = base64.b64decode(result[1])
782 raise RuntimeError(text)
784 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
785 cross_testbed_id, cross_factory_id, cross_connector_type_name):
786 msg = testbed_messages[CROSS_CONNECT]
787 msg = msg % (guid, connector_type_name, cross_guid,
788 cross_testbed_id, cross_factory_id, cross_connector_type_name)
789 self._client.send_msg(msg)
790 reply = self._client.read_reply()
791 result = reply.split("|")
792 code = int(result[0])
793 text = base64.b64decode(result[1])
795 raise RuntimeError(text)
797 def defer_add_trace(self, guid, trace_id):
798 msg = testbed_messages[ADD_TRACE]
799 msg = msg % (guid, trace_id)
800 self._client.send_msg(msg)
801 reply = self._client.read_reply()
802 result = reply.split("|")
803 code = int(result[0])
804 text = base64.b64decode(result[1])
806 raise RuntimeError(text)
808 def defer_add_address(self, guid, address, netprefix, broadcast):
809 msg = testbed_messages[ADD_ADDRESS]
810 msg = msg % (guid, address, netprefix, broadcast)
811 self._client.send_msg(msg)
812 reply = self._client.read_reply()
813 result = reply.split("|")
814 code = int(result[0])
815 text = base64.b64decode(result[1])
817 raise RuntimeError(text)
819 def defer_add_route(self, guid, destination, netprefix, nexthop):
820 msg = testbed_messages[ADD_ROUTE]
821 msg = msg % (guid, destination, netprefix, nexthop)
822 self._client.send_msg(msg)
823 reply = self._client.read_reply()
824 result = reply.split("|")
825 code = int(result[0])
826 text = base64.b64decode(result[1])
828 raise RuntimeError(text)
831 msg = testbed_messages[DO_SETUP]
832 self._client.send_msg(msg)
833 reply = self._client.read_reply()
834 result = reply.split("|")
835 code = int(result[0])
836 text = base64.b64decode(result[1])
838 raise RuntimeError(text)
841 msg = testbed_messages[DO_CREATE]
842 self._client.send_msg(msg)
843 reply = self._client.read_reply()
844 result = reply.split("|")
845 code = int(result[0])
846 text = base64.b64decode(result[1])
848 raise RuntimeError(text)
850 def do_connect(self):
851 msg = testbed_messages[DO_CONNECT]
852 self._client.send_msg(msg)
853 reply = self._client.read_reply()
854 result = reply.split("|")
855 code = int(result[0])
856 text = base64.b64decode(result[1])
858 raise RuntimeError(text)
860 def do_configure(self):
861 msg = testbed_messages[DO_CONFIGURE]
862 self._client.send_msg(msg)
863 reply = self._client.read_reply()
864 result = reply.split("|")
865 code = int(result[0])
866 text = base64.b64decode(result[1])
868 raise RuntimeError(text)
870 def do_cross_connect(self):
871 msg = testbed_messages[DO_CROSS_CONNECT]
872 self._client.send_msg(msg)
873 reply = self._client.read_reply()
874 result = reply.split("|")
875 code = int(result[0])
876 text = base64.b64decode(result[1])
878 raise RuntimeError(text)
880 def start(self, time = TIME_NOW):
881 msg = testbed_messages[START]
882 self._client.send_msg(msg)
883 reply = self._client.read_reply()
884 result = reply.split("|")
885 code = int(result[0])
886 text = base64.b64decode(result[1])
888 raise RuntimeError(text)
890 def stop(self, time = TIME_NOW):
891 msg = testbed_messages[STOP]
892 self._client.send_msg(msg)
893 reply = self._client.read_reply()
894 result = reply.split("|")
895 code = int(result[0])
896 text = base64.b64decode(result[1])
898 raise RuntimeError(text)
900 def set(self, time, guid, name, value):
901 msg = testbed_messages[SET]
902 type = get_type(value)
903 # avoid having "|" in this parameters
904 name = base64.b64encode(name)
905 value = base64.b64encode(str(value))
906 msg = msg % (time, guid, name, value, type)
907 self._client.send_msg(msg)
908 reply = self._client.read_reply()
909 result = reply.split("|")
910 code = int(result[0])
911 text = base64.b64decode(result[1])
913 raise RuntimeError(text)
915 def get(self, time, guid, name):
916 msg = testbed_messages[GET]
917 # avoid having "|" in this parameters
918 name = base64.b64encode(name)
919 msg = msg % (time, guid, name)
920 self._client.send_msg(msg)
921 reply = self._client.read_reply()
922 result = reply.split("|")
923 code = int(result[0])
924 text = base64.b64decode(result[1])
926 raise RuntimeError(text)
929 def get_address(self, guid, index, attribute):
930 msg = testbed_messages[GET_ADDRESS]
931 # avoid having "|" in this parameters
932 attribute = base64.b64encode(attribute)
933 msg = msg % (guid, index, attribute)
934 self._client.send_msg(msg)
935 reply = self._client.read_reply()
936 result = reply.split("|")
937 code = int(result[0])
938 text = base64.b64decode(result[1])
940 raise RuntimeError(text)
943 def get_route(self, guid, index, attribute):
944 msg = testbed_messages[GET_ROUTE]
945 # avoid having "|" in this parameters
946 attribute = base64.b64encode(attribute)
947 msg = msg % (guid, index, attribute)
948 self._client.send_msg(msg)
949 reply = self._client.read_reply()
950 result = reply.split("|")
951 code = int(result[0])
952 text = base64.b64decode(result[1])
954 raise RuntimeError(text)
957 def action(self, time, guid, action):
958 msg = testbed_messages[ACTION]
959 msg = msg % (time, guid, action)
960 self._client.send_msg(msg)
961 reply = self._client.read_reply()
962 result = reply.split("|")
963 code = int(result[0])
964 text = base64.b64decode(result[1])
966 raise RuntimeError(text)
968 def status(self, guid):
969 msg = testbed_messages[STATUS]
971 self._client.send_msg(msg)
972 reply = self._client.read_reply()
973 result = reply.split("|")
974 code = int(result[0])
975 text = base64.b64decode(result[1])
977 raise RuntimeError(text)
980 def trace(self, guid, trace_id, attribute='value'):
981 msg = testbed_messages[TRACE]
982 attribute = base64.b64encode(attribute)
983 msg = msg % (guid, trace_id, attribute)
984 self._client.send_msg(msg)
985 reply = self._client.read_reply()
986 result = reply.split("|")
987 code = int(result[0])
988 text = base64.b64decode(result[1])
990 raise RuntimeError(text)
994 msg = testbed_messages[SHUTDOWN]
995 self._client.send_msg(msg)
996 reply = self._client.read_reply()
997 result = reply.split("|")
998 code = int(result[0])
999 text = base64.b64decode(result[1])
1001 raise RuntimeError(text)
1002 self._client.send_stop()
1003 self._client.read_reply() # wait for it
1005 class ExperimentControllerProxy(object):
1006 def __init__(self, root_dir, log_level, experiment_xml = None,
1007 launch = True, host = None, port = None, user = None,
1011 if experiment_xml == None:
1012 raise RuntimeError("To launch a ExperimentControllerServer a \
1013 xml description of the experiment is required")
1016 xml = experiment_xml
1017 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1018 s = ExperimentControllerServer(%r, %r, %r);\
1019 s.run()" % (root_dir, log_level, xml)
1020 proc = server.popen_ssh_subprocess(python_code, host = host,
1021 port = port, user = user, agent = agent)
1023 err = proc.stderr.read()
1024 raise RuntimeError("Server could not be executed: %s" % \
1028 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1031 # connect client to server
1032 self._client = server.Client(root_dir, host = host, port = port,
1033 user = user, agent = agent)
1036 def experiment_xml(self):
1037 msg = controller_messages[XML]
1038 self._client.send_msg(msg)
1039 reply = self._client.read_reply()
1040 result = reply.split("|")
1041 code = int(result[0])
1042 text = base64.b64decode(result[1])
1044 raise RuntimeError(text)
1047 def set_access_configuration(self, testbed_guid, access_config):
1048 mode = access_config.get_attribute_value("mode")
1049 communication = access_config.get_attribute_value("communication")
1050 host = access_config.get_attribute_value("host")
1051 user = access_config.get_attribute_value("user")
1052 port = access_config.get_attribute_value("port")
1053 root_dir = access_config.get_attribute_value("rootDirectory")
1054 use_agent = access_config.get_attribute_value("useAgent")
1055 log_level = access_config.get_attribute_value("logLevel")
1056 msg = controller_messages[ACCESS]
1057 msg = msg % (testbed_guid, mode, communication, host, user, port,
1058 root_dir, use_agent, log_level)
1059 self._client.send_msg(msg)
1060 reply = self._client.read_reply()
1061 result = reply.split("|")
1062 code = int(result[0])
1063 text = base64.b64decode(result[1])
1065 raise RuntimeError(text)
1067 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
1068 msg = controller_messages[TRACE]
1069 attribute = base64.b64encode(attribute)
1070 msg = msg % (testbed_guid, guid, trace_id, attribute)
1071 self._client.send_msg(msg)
1072 reply = self._client.read_reply()
1073 result = reply.split("|")
1074 code = int(result[0])
1075 text = base64.b64decode(result[1])
1078 raise RuntimeError(text)
1081 msg = controller_messages[START]
1082 self._client.send_msg(msg)
1083 reply = self._client.read_reply()
1084 result = reply.split("|")
1085 code = int(result[0])
1086 text = base64.b64decode(result[1])
1088 raise RuntimeError(text)
1091 msg = controller_messages[STOP]
1092 self._client.send_msg(msg)
1093 reply = self._client.read_reply()
1094 result = reply.split("|")
1095 code = int(result[0])
1096 text = base64.b64decode(result[1])
1098 raise RuntimeError(text)
1101 msg = controller_messages[RECOVER]
1102 self._client.send_msg(msg)
1103 reply = self._client.read_reply()
1104 result = reply.split("|")
1105 code = int(result[0])
1106 text = base64.b64decode(result[1])
1108 raise RuntimeError(text)
1110 def is_finished(self, guid):
1111 msg = controller_messages[FINISHED]
1113 self._client.send_msg(msg)
1114 reply = self._client.read_reply()
1115 result = reply.split("|")
1116 code = int(result[0])
1117 text = base64.b64decode(result[1])
1119 raise RuntimeError(text)
1120 return text == "True"
1123 msg = controller_messages[SHUTDOWN]
1124 self._client.send_msg(msg)
1125 reply = self._client.read_reply()
1126 result = reply.split("|")
1127 code = int(result[0])
1128 text = base64.b64decode(result[1])
1130 raise RuntimeError(text)
1131 self._client.send_stop()
1132 self._client.read_reply() # wait for it