2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
18 # PROTOCOL INSTRUCTION MESSAGES
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
57 controller_messages = dict({
59 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
60 TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
61 FINISHED: "%d|%s" % (FINISHED, "%d"),
64 RECOVER : "%d" % RECOVER,
65 SHUTDOWN: "%d" % SHUTDOWN,
68 # TESTBED INSTANCE PROTOCOL MESSAGES
69 testbed_messages = dict({
70 TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
73 SHUTDOWN: "%d" % SHUTDOWN,
74 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
75 CREATE: "%d|%s" % (CREATE, "%d|%s"),
76 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
77 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
78 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
79 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
80 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
81 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
82 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
83 DO_SETUP: "%d" % DO_SETUP,
84 DO_CREATE: "%d" % DO_CREATE,
85 DO_CONNECT: "%d" % DO_CONNECT,
86 DO_CONFIGURE: "%d" % DO_CONFIGURE,
87 DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
88 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
89 GET: "%d|%s" % (GET, "%s|%d|%s"),
90 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
91 GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
92 GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
93 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
94 STATUS: "%d|%s" % (STATUS, "%d"),
98 instruction_text = dict({
104 FINISHED: "FINISHED",
108 SHUTDOWN: "SHUTDOWN",
109 CONFIGURE: "CONFIGURE",
111 CREATE_SET: "CREATE_SET",
112 FACTORY_SET: "FACTORY_SET",
114 CROSS_CONNECT: "CROSS_CONNECT",
115 ADD_TRACE: "ADD_TRACE",
116 ADD_ADDRESS: "ADD_ADDRESS",
117 ADD_ROUTE: "ADD_ROUTE",
118 DO_SETUP: "DO_SETUP",
119 DO_CREATE: "DO_CREATE",
120 DO_CONNECT: "DO_CONNECT",
121 DO_CONFIGURE: "DO_CONFIGURE",
122 DO_PRECONFIGURE: "DO_PRECONFIGURE",
123 DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
126 GET_ROUTE: "GET_ROUTE",
127 GET_ADDRESS: "GET_ADDRESS",
138 if isinstance(value, bool):
140 elif isinstance(value, int):
142 elif isinstance(value, float):
147 def set_type(type, value):
153 value = value == "True"
def log_msg(server, params):
    """Emit a debug log entry describing a received protocol message.

    server -- object providing log_debug(); its class name prefixes the entry.
    params -- the message already split into fields: params[0] is the numeric
              instruction code, the remaining items are its arguments.

    Raises KeyError when the code has no entry in instruction_text.
    """
    owner = server.__class__.__name__
    code = int(params[0])
    name = instruction_text[code]
    arguments = ", ".join([str(field) for field in params[1:]])
    server.log_debug("%s - msg: %s [%s]" % (owner, name, arguments))
164 def log_reply(server, reply):
165 res = reply.split("|")
167 code_txt = instruction_text[code]
168 txt = base64.b64decode(res[1])
169 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into a server module level.

    Only AccessConfiguration.DEBUG_LEVEL maps to server.DEBUG_LEVEL; any other
    value maps to server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the connection parameters stored in an access configuration.

    Returns the tuple (root_dir, log_level, user, host, port, agent), where
    log_level has already been converted with to_server_log_level().
    user, host, port and agent are read only when the "communication"
    attribute equals AccessConfiguration.ACCESS_SSH; otherwise all four
    are None.
    """
    read = access_config.get_attribute_value

    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))

    if read("communication") == AccessConfiguration.ACCESS_SSH:
        user = read("user")
        host = read("host")
        port = read("port")
        agent = read("useAgent")
    else:
        # No remote endpoint to describe for non-SSH access.
        user = None
        host = None
        port = None
        agent = None

    return (root_dir, log_level, user, host, port, agent)
190 class AccessConfiguration(AttributesMap):
191 MODE_SINGLE_PROCESS = "SINGLE"
192 MODE_DAEMON = "DAEMON"
194 ACCESS_LOCAL = "LOCAL"
195 ERROR_LEVEL = "Error"
196 DEBUG_LEVEL = "Debug"
199 super(AccessConfiguration, self).__init__()
200 self.add_attribute(name = "mode",
201 help = "Instance execution mode",
202 type = Attribute.ENUM,
203 value = AccessConfiguration.MODE_SINGLE_PROCESS,
204 allowed = [AccessConfiguration.MODE_DAEMON,
205 AccessConfiguration.MODE_SINGLE_PROCESS],
206 validation_function = validation.is_enum)
207 self.add_attribute(name = "communication",
208 help = "Instance communication mode",
209 type = Attribute.ENUM,
210 value = AccessConfiguration.ACCESS_LOCAL,
211 allowed = [AccessConfiguration.ACCESS_LOCAL,
212 AccessConfiguration.ACCESS_SSH],
213 validation_function = validation.is_enum)
214 self.add_attribute(name = "host",
215 help = "Host where the testbed will be executed",
216 type = Attribute.STRING,
218 validation_function = validation.is_string)
219 self.add_attribute(name = "user",
220 help = "User on the Host to execute the testbed",
221 type = Attribute.STRING,
222 value = getpass.getuser(),
223 validation_function = validation.is_string)
224 self.add_attribute(name = "port",
225 help = "Port on the Host",
226 type = Attribute.INTEGER,
228 validation_function = validation.is_integer)
229 self.add_attribute(name = "rootDirectory",
230 help = "Root directory for storing process files",
231 type = Attribute.STRING,
233 validation_function = validation.is_string) # TODO: validation.is_path
234 self.add_attribute(name = "useAgent",
235 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
236 type = Attribute.BOOL,
238 validation_function = validation.is_bool)
239 self.add_attribute(name = "logLevel",
240 help = "Log level for instance",
241 type = Attribute.ENUM,
242 value = AccessConfiguration.ERROR_LEVEL,
243 allowed = [AccessConfiguration.ERROR_LEVEL,
244 AccessConfiguration.DEBUG_LEVEL],
245 validation_function = validation.is_enum)
246 self.add_attribute(name = "recover",
247 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
248 type = Attribute.BOOL,
250 validation_function = validation.is_bool)
252 class TempDir(object):
254 self.path = tempfile.mkdtemp()
257 shutil.rmtree(self.path)
259 class PermDir(object):
260 def __init__(self, path):
263 def create_controller(xml, access_config = None):
264 mode = None if not access_config \
265 else access_config.get_attribute_value("mode")
266 launch = True if not access_config \
267 else not access_config.get_attribute_value("recover")
268 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
270 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
272 from nepi.core.execute import ExperimentController
274 if not access_config or not access_config.has_attribute("rootDirectory"):
277 root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
278 controller = ExperimentController(xml, root_dir.path)
280 # inject reference to temporary dir, so that it gets cleaned
281 # up at destruction time.
282 controller._tempdir = root_dir
285 elif mode == AccessConfiguration.MODE_DAEMON:
286 (root_dir, log_level, user, host, port, agent) = \
287 get_access_config_params(access_config)
288 return ExperimentControllerProxy(root_dir, log_level,
289 experiment_xml = xml, host = host, port = port, user = user,
290 agent = agent, launch = launch)
291 raise RuntimeError("Unsupported access configuration '%s'" % mode)
293 def create_testbed_instance(testbed_id, testbed_version, access_config):
294 mode = None if not access_config \
295 else access_config.get_attribute_value("mode")
296 launch = True if not access_config \
297 else not access_config.get_attribute_value("recover")
298 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
300 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
301 return _build_testbed_instance(testbed_id, testbed_version)
302 elif mode == AccessConfiguration.MODE_DAEMON:
303 (root_dir, log_level, user, host, port, agent) = \
304 get_access_config_params(access_config)
305 return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
306 testbed_version = testbed_version, host = host, port = port,
307 user = user, agent = agent, launch = launch)
308 raise RuntimeError("Unsupported access configuration '%s'" % mode)
310 def _build_testbed_instance(testbed_id, testbed_version):
311 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
312 if not mod_name in sys.modules:
314 module = sys.modules[mod_name]
315 return module.TestbedController(testbed_version)
317 class TestbedControllerServer(server.Server):
318 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
319 super(TestbedControllerServer, self).__init__(root_dir, log_level)
320 self._testbed_id = testbed_id
321 self._testbed_version = testbed_version
def post_daemonize(self):
    """Create the testbed controller this server forwards instructions to.

    The instance is built from the testbed id and version stored on self
    and kept in self._testbed.
    NOTE(review): presumably invoked by server.Server once the process has
    been daemonized -- confirm against nepi.util.server.
    """
    testbed_id = self._testbed_id
    version = self._testbed_version
    self._testbed = _build_testbed_instance(testbed_id, version)
328 def reply_action(self, msg):
330 result = base64.b64encode("Invalid command line")
331 reply = "%d|%s" % (ERROR, result)
333 params = msg.split("|")
334 instruction = int(params[0])
335 log_msg(self, params)
337 if instruction == TRACE:
338 reply = self.trace(params)
339 elif instruction == START:
340 reply = self.start(params)
341 elif instruction == STOP:
342 reply = self.stop(params)
343 elif instruction == SHUTDOWN:
344 reply = self.shutdown(params)
345 elif instruction == CONFIGURE:
346 reply = self.defer_configure(params)
347 elif instruction == CREATE:
348 reply = self.defer_create(params)
349 elif instruction == CREATE_SET:
350 reply = self.defer_create_set(params)
351 elif instruction == FACTORY_SET:
352 reply = self.defer_factory_set(params)
353 elif instruction == CONNECT:
354 reply = self.defer_connect(params)
355 elif instruction == CROSS_CONNECT:
356 reply = self.defer_cross_connect(params)
357 elif instruction == ADD_TRACE:
358 reply = self.defer_add_trace(params)
359 elif instruction == ADD_ADDRESS:
360 reply = self.defer_add_address(params)
361 elif instruction == ADD_ROUTE:
362 reply = self.defer_add_route(params)
363 elif instruction == DO_SETUP:
364 reply = self.do_setup(params)
365 elif instruction == DO_CREATE:
366 reply = self.do_create(params)
367 elif instruction == DO_CONNECT:
368 reply = self.do_connect(params)
369 elif instruction == DO_CONFIGURE:
370 reply = self.do_configure(params)
371 elif instruction == DO_PRECONFIGURE:
372 reply = self.do_preconfigure(params)
373 elif instruction == DO_CROSS_CONNECT:
374 reply = self.do_cross_connect(params)
375 elif instruction == GET:
376 reply = self.get(params)
377 elif instruction == SET:
378 reply = self.set(params)
379 elif instruction == GET_ADDRESS:
380 reply = self.get_address(params)
381 elif instruction == GET_ROUTE:
382 reply = self.get_route(params)
383 elif instruction == ACTION:
384 reply = self.action(params)
385 elif instruction == STATUS:
386 reply = self.status(params)
387 elif instruction == GUIDS:
388 reply = self.guids(params)
390 error = "Invalid instruction %s" % instruction
391 self.log_error(error)
392 result = base64.b64encode(error)
393 reply = "%d|%s" % (ERROR, result)
395 error = self.log_error()
396 result = base64.b64encode(error)
397 reply = "%d|%s" % (ERROR, result)
398 log_reply(self, reply)
def guids(self, params):
    """Reply with the testbed's guids.

    The guids are rendered as a comma-separated string and base64-encoded
    so the payload cannot contain the "|" field separator. params is unused.
    """
    joined = ",".join([str(guid) for guid in self._testbed.guids])
    payload = base64.b64encode(joined)
    return "%d|%s" % (OK, payload)
def defer_create(self, params):
    """Forward a CREATE instruction to the testbed.

    params[1] is the element guid (integer), params[2] the factory id.
    Returns an OK reply with an empty payload.
    """
    element_guid = int(params[1])
    self._testbed.defer_create(element_guid, params[2])
    reply = "%d|%s" % (OK, "")
    return reply
413 def trace(self, params):
414 guid = int(params[1])
416 attribute = base64.b64decode(params[3])
417 trace = self._testbed.trace(guid, trace_id, attribute)
418 result = base64.b64encode(trace)
419 return "%d|%s" % (OK, result)
def start(self, params):
    """Start the testbed and acknowledge with an empty OK reply.

    params is unused.
    """
    testbed = self._testbed
    testbed.start()
    reply = "%d|%s" % (OK, "")
    return reply
425 def stop(self, params):
427 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Shut the testbed down and acknowledge with an empty OK reply.

    params is unused.
    """
    testbed = self._testbed
    testbed.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
def defer_configure(self, params):
    """Forward a CONFIGURE instruction to the testbed.

    params[1] and params[2] carry the base64-encoded attribute name and
    value; params[3] is the numeric type code that set_type() uses to
    convert the decoded value before it reaches the testbed.
    Returns an OK reply with an empty payload.
    """
    attr_name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    type_code = int(params[3])
    self._testbed.defer_configure(attr_name, set_type(type_code, raw_value))
    reply = "%d|%s" % (OK, "")
    return reply
def defer_create_set(self, params):
    """Forward a CREATE_SET instruction to the testbed.

    params[1] is the element guid, params[2] and params[3] the
    base64-encoded attribute name and value, params[4] the numeric type
    code that set_type() uses to convert the decoded value.
    Returns an OK reply with an empty payload.
    """
    element_guid = int(params[1])
    attr_name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    type_code = int(params[4])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_create_set(element_guid, attr_name, typed_value)
    reply = "%d|%s" % (OK, "")
    return reply
def defer_factory_set(self, params):
    """Forward a FACTORY_SET instruction to the testbed.

    The FACTORY_SET message carries four fields after the instruction
    code: guid, base64 name, base64 value and numeric type code (this is
    what TestbedControllerProxy.defer_factory_set sends). The fields are
    therefore read from params[1]..params[4] and the guid is passed on to
    the testbed together with the decoded name and the typed value.
    Returns an OK reply with an empty payload.
    """
    guid = int(params[1])
    name = base64.b64decode(params[2])
    value = base64.b64decode(params[3])
    type = int(params[4])
    value = set_type(type, value)
    self._testbed.defer_factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_connect(self, params):
    """Forward a CONNECT instruction to the testbed.

    params[1]/params[2] are the guid and connector type name of the first
    endpoint, params[3]/params[4] those of the second endpoint.
    Returns an OK reply with an empty payload.
    """
    first_guid = int(params[1])
    first_connector = params[2]
    second_guid = int(params[3])
    second_connector = params[4]
    self._testbed.defer_connect(
        first_guid, first_connector, second_guid, second_connector)
    reply = "%d|%s" % (OK, "")
    return reply
def defer_cross_connect(self, params):
    """Forward a CROSS_CONNECT instruction to the testbed.

    The CROSS_CONNECT message carries six fields after the instruction
    code, in the order sent by TestbedControllerProxy.defer_cross_connect:
    guid, connector_type_name, cross_guid, cross_testbed_id,
    cross_factory_id, cross_connector_type_name. They are read from
    params[1]..params[6]; reading past params[6] would raise IndexError
    on a well-formed message.
    Returns an OK reply with an empty payload.
    """
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    # NOTE(review): the CROSS_CONNECT template formats this field with %d
    # while it is forwarded here as the received string -- confirm the
    # intended type of cross_testbed_id.
    cross_testbed_id = params[4]
    cross_factory_id = params[5]
    cross_connector_type_name = params[6]
    self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
    return "%d|%s" % (OK, "")
480 def defer_add_trace(self, params):
481 guid = int(params[1])
483 self._testbed.defer_add_trace(guid, trace_id)
484 return "%d|%s" % (OK, "")
486 def defer_add_address(self, params):
487 guid = int(params[1])
489 netprefix = int(params[3])
490 broadcast = params[4]
491 self._testbed.defer_add_address(guid, address, netprefix,
493 return "%d|%s" % (OK, "")
495 def defer_add_route(self, params):
496 guid = int(params[1])
497 destination = params[2]
498 netprefix = int(params[3])
500 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
501 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Run the testbed's do_setup step and acknowledge with an empty OK reply.

    params is unused.
    """
    testbed = self._testbed
    testbed.do_setup()
    reply = "%d|%s" % (OK, "")
    return reply
def do_create(self, params):
    """Run the testbed's do_create step and acknowledge with an empty OK reply.

    params is unused.
    """
    testbed = self._testbed
    testbed.do_create()
    reply = "%d|%s" % (OK, "")
    return reply
def do_connect(self, params):
    """Run the testbed's do_connect step and acknowledge with an empty OK reply.

    params is unused.
    """
    testbed = self._testbed
    testbed.do_connect()
    reply = "%d|%s" % (OK, "")
    return reply
def do_configure(self, params):
    """Run the testbed's do_configure step and acknowledge with an empty OK reply.

    params is unused.
    """
    testbed = self._testbed
    testbed.do_configure()
    reply = "%d|%s" % (OK, "")
    return reply
def do_preconfigure(self, params):
    """Run the testbed's do_preconfigure step and acknowledge with an empty OK reply.

    params is unused.
    """
    testbed = self._testbed
    testbed.do_preconfigure()
    reply = "%d|%s" % (OK, "")
    return reply
def do_cross_connect(self, params):
    """Run the testbed's do_cross_connect step and acknowledge with an empty OK reply.

    params is unused.
    """
    testbed = self._testbed
    testbed.do_cross_connect()
    reply = "%d|%s" % (OK, "")
    return reply
def get(self, params):
    """Handle a GET instruction: read an attribute value from the testbed.

    The GET message carries time, guid and the base64-encoded attribute
    name in params[1]..params[3] (the order used by
    TestbedControllerProxy.get). The time field is forwarded as received.
    Returns an OK reply whose payload is the base64-encoded str() of the
    value.
    """
    time = params[1]
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = self._testbed.get(time, guid, name)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def set(self, params):
    """Handle a SET instruction: write an attribute value on the testbed.

    The SET message carries time, guid, base64 name, base64 value and the
    numeric type code in params[1]..params[5] (the order used by
    TestbedControllerProxy.set). The type code is the fifth field;
    params[3] is the encoded name and cannot be parsed as an integer.
    Returns an OK reply with an empty payload.
    """
    time = params[1]
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = base64.b64decode(params[4])
    type = int(params[5])
    value = set_type(type, value)
    self._testbed.set(time, guid, name, value)
    return "%d|%s" % (OK, "")
def get_address(self, params):
    """Handle a GET_ADDRESS instruction.

    The message carries guid, address index and the base64-encoded
    attribute name in params[1]..params[3] (the order used by
    TestbedControllerProxy.get_address).
    Returns an OK reply whose payload is the base64-encoded str() of the
    value returned by the testbed.
    """
    guid = int(params[1])
    index = int(params[2])
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_address(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def get_route(self, params):
    """Handle a GET_ROUTE instruction.

    The message carries guid, route index and the base64-encoded
    attribute name in params[1]..params[3] (the order used by
    TestbedControllerProxy.get_route).
    Returns an OK reply whose payload is the base64-encoded str() of the
    value returned by the testbed.
    """
    guid = int(params[1])
    index = int(params[2])
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_route(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
561 def action(self, params):
563 guid = int(params[2])
564 command = base64.b64decode(params[3])
565 self._testbed.action(time, guid, command)
566 return "%d|%s" % (OK, "")
def status(self, params):
    """Reply with the status the testbed reports for an element.

    params[1] is the element guid. The payload is the base64-encoded
    str() of the value returned by the testbed's status().
    """
    element_guid = int(params[1])
    current = self._testbed.status(element_guid)
    payload = base64.b64encode(str(current))
    return "%d|%s" % (OK, payload)
574 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, log_level, experiment_xml):
    """Store the experiment description; the controller is created later.

    root_dir and log_level are handed to the server.Server base class.
    self._controller stays None until post_daemonize() builds it.
    """
    super(ExperimentControllerServer, self).__init__(root_dir, log_level)
    self._controller = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Create the ExperimentController this server forwards instructions to.

    The controller is built from the stored experiment XML and the
    server's root directory. ExperimentController is imported inside the
    method rather than at module level.
    """
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    controller = ExperimentController(xml, root_dir = self._root_dir)
    self._controller = controller
585 def reply_action(self, msg):
587 result = base64.b64encode("Invalid command line")
588 reply = "%d|%s" % (ERROR, result)
590 params = msg.split("|")
591 instruction = int(params[0])
592 log_msg(self, params)
594 if instruction == XML:
595 reply = self.experiment_xml(params)
596 elif instruction == ACCESS:
597 reply = self.set_access_configuration(params)
598 elif instruction == TRACE:
599 reply = self.trace(params)
600 elif instruction == FINISHED:
601 reply = self.is_finished(params)
602 elif instruction == START:
603 reply = self.start(params)
604 elif instruction == STOP:
605 reply = self.stop(params)
606 elif instruction == RECOVER:
607 reply = self.recover(params)
608 elif instruction == SHUTDOWN:
609 reply = self.shutdown(params)
611 error = "Invalid instruction %s" % instruction
612 self.log_error(error)
613 result = base64.b64encode(error)
614 reply = "%d|%s" % (ERROR, result)
616 error = self.log_error()
617 result = base64.b64encode(error)
618 reply = "%d|%s" % (ERROR, result)
619 log_reply(self, reply)
def experiment_xml(self, params):
    """Reply with the controller's experiment XML, base64-encoded.

    params is unused.
    """
    payload = base64.b64encode(self._controller.experiment_xml)
    return "%d|%s" % (OK, payload)
627 def set_access_configuration(self, params):
628 testbed_guid = int(params[1])
630 communication = params[3]
633 port = int(params[6])
635 use_agent = params[8] == "True"
636 log_level = params[9]
637 access_config = AccessConfiguration()
638 access_config.set_attribute_value("mode", mode)
639 access_config.set_attribute_value("communication", communication)
640 access_config.set_attribute_value("host", host)
641 access_config.set_attribute_value("user", user)
642 access_config.set_attribute_value("port", port)
643 access_config.set_attribute_value("rootDirectory", root_dir)
644 access_config.set_attribute_value("useAgent", use_agent)
645 access_config.set_attribute_value("logLevel", log_level)
646 self._controller.set_access_configuration(testbed_guid,
648 return "%d|%s" % (OK, "")
650 def trace(self, params):
651 testbed_guid = int(params[1])
652 guid = int(params[2])
654 attribute = base64.b64decode(params[4])
655 trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
656 result = base64.b64encode(trace)
657 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Reply with the controller's is_finished() result for an element.

    params[1] is the element guid. The payload is the base64-encoded
    str() of the value returned by the controller.
    """
    element_guid = int(params[1])
    finished = self._controller.is_finished(element_guid)
    payload = base64.b64encode(str(finished))
    return "%d|%s" % (OK, payload)
def start(self, params):
    """Start the experiment controller and acknowledge with an empty OK reply.

    params is unused.
    """
    controller = self._controller
    controller.start()
    reply = "%d|%s" % (OK, "")
    return reply
def stop(self, params):
    """Stop the experiment controller and acknowledge with an empty OK reply.

    params is unused.
    """
    controller = self._controller
    controller.stop()
    reply = "%d|%s" % (OK, "")
    return reply
def recover(self, params):
    """Call the controller's recover() and acknowledge with an empty OK reply.

    params is unused.
    """
    controller = self._controller
    controller.recover()
    reply = "%d|%s" % (OK, "")
    return reply
def shutdown(self, params):
    """Shut the experiment controller down and acknowledge with an empty OK reply.

    params is unused.
    """
    controller = self._controller
    controller.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
681 class TestbedControllerProxy(object):
682 def __init__(self, root_dir, log_level, testbed_id = None,
683 testbed_version = None, launch = True, host = None,
684 port = None, user = None, agent = None):
686 if testbed_id == None or testbed_version == None:
687 raise RuntimeError("To launch a TesbedInstance server a \
688 testbed_id and testbed_version are required")
691 python_code = "from nepi.util.proxy import \
692 TesbedInstanceServer;\
693 s = TestbedControllerServer('%s', %d, '%s', '%s');\
694 s.run()" % (root_dir, log_level, testbed_id,
696 proc = server.popen_ssh_subprocess(python_code, host = host,
697 port = port, user = user, agent = agent)
699 err = proc.stderr.read()
700 raise RuntimeError("Server could not be executed: %s" % \
704 s = TestbedControllerServer(root_dir, log_level, testbed_id,
708 # connect client to server
709 self._client = server.Client(root_dir, host = host, port = port,
710 user = user, agent = agent)
714 msg = testbed_messages[GUIDS]
715 self._client.send_msg(msg)
716 reply = self._client.read_reply()
717 result = reply.split("|")
718 code = int(result[0])
719 text = base64.b64decode(result[1])
721 raise RuntimeError(text)
722 return map(int, text.split(","))
724 def defer_configure(self, name, value):
725 msg = testbed_messages[CONFIGURE]
726 type = get_type(value)
727 # avoid having "|" in this parameters
728 name = base64.b64encode(name)
729 value = base64.b64encode(str(value))
730 msg = msg % (name, value, type)
731 self._client.send_msg(msg)
732 reply = self._client.read_reply()
733 result = reply.split("|")
734 code = int(result[0])
735 text = base64.b64decode(result[1])
737 raise RuntimeError(text)
739 def defer_create(self, guid, factory_id):
740 msg = testbed_messages[CREATE]
741 msg = msg % (guid, factory_id)
742 self._client.send_msg(msg)
743 reply = self._client.read_reply()
744 result = reply.split("|")
745 code = int(result[0])
746 text = base64.b64decode(result[1])
748 raise RuntimeError(text)
750 def defer_create_set(self, guid, name, value):
751 msg = testbed_messages[CREATE_SET]
752 type = get_type(value)
753 # avoid having "|" in this parameters
754 name = base64.b64encode(name)
755 value = base64.b64encode(str(value))
756 msg = msg % (guid, name, value, type)
757 self._client.send_msg(msg)
758 reply = self._client.read_reply()
759 result = reply.split("|")
760 code = int(result[0])
761 text = base64.b64decode(result[1])
763 raise RuntimeError(text)
765 def defer_factory_set(self, guid, name, value):
766 msg = testbed_messages[FACTORY_SET]
767 type = get_type(value)
768 # avoid having "|" in this parameters
769 name = base64.b64encode(name)
770 value = base64.b64encode(str(value))
771 msg = msg % (guid, name, value, type)
772 self._client.send_msg(msg)
773 reply = self._client.read_reply()
774 result = reply.split("|")
775 code = int(result[0])
776 text = base64.b64decode(result[1])
778 raise RuntimeError(text)
780 def defer_connect(self, guid1, connector_type_name1, guid2,
781 connector_type_name2):
782 msg = testbed_messages[CONNECT]
783 msg = msg % (guid1, connector_type_name1, guid2,
784 connector_type_name2)
785 self._client.send_msg(msg)
786 reply = self._client.read_reply()
787 result = reply.split("|")
788 code = int(result[0])
789 text = base64.b64decode(result[1])
791 raise RuntimeError(text)
793 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
794 cross_testbed_id, cross_factory_id, cross_connector_type_name):
795 msg = testbed_messages[CROSS_CONNECT]
796 msg = msg % (guid, connector_type_name, cross_guid,
797 cross_testbed_id, cross_factory_id, cross_connector_type_name)
798 self._client.send_msg(msg)
799 reply = self._client.read_reply()
800 result = reply.split("|")
801 code = int(result[0])
802 text = base64.b64decode(result[1])
804 raise RuntimeError(text)
806 def defer_add_trace(self, guid, trace_id):
807 msg = testbed_messages[ADD_TRACE]
808 msg = msg % (guid, trace_id)
809 self._client.send_msg(msg)
810 reply = self._client.read_reply()
811 result = reply.split("|")
812 code = int(result[0])
813 text = base64.b64decode(result[1])
815 raise RuntimeError(text)
817 def defer_add_address(self, guid, address, netprefix, broadcast):
818 msg = testbed_messages[ADD_ADDRESS]
819 msg = msg % (guid, address, netprefix, broadcast)
820 self._client.send_msg(msg)
821 reply = self._client.read_reply()
822 result = reply.split("|")
823 code = int(result[0])
824 text = base64.b64decode(result[1])
826 raise RuntimeError(text)
828 def defer_add_route(self, guid, destination, netprefix, nexthop):
829 msg = testbed_messages[ADD_ROUTE]
830 msg = msg % (guid, destination, netprefix, nexthop)
831 self._client.send_msg(msg)
832 reply = self._client.read_reply()
833 result = reply.split("|")
834 code = int(result[0])
835 text = base64.b64decode(result[1])
837 raise RuntimeError(text)
840 msg = testbed_messages[DO_SETUP]
841 self._client.send_msg(msg)
842 reply = self._client.read_reply()
843 result = reply.split("|")
844 code = int(result[0])
845 text = base64.b64decode(result[1])
847 raise RuntimeError(text)
850 msg = testbed_messages[DO_CREATE]
851 self._client.send_msg(msg)
852 reply = self._client.read_reply()
853 result = reply.split("|")
854 code = int(result[0])
855 text = base64.b64decode(result[1])
857 raise RuntimeError(text)
859 def do_connect(self):
860 msg = testbed_messages[DO_CONNECT]
861 self._client.send_msg(msg)
862 reply = self._client.read_reply()
863 result = reply.split("|")
864 code = int(result[0])
865 text = base64.b64decode(result[1])
867 raise RuntimeError(text)
869 def do_configure(self):
870 msg = testbed_messages[DO_CONFIGURE]
871 self._client.send_msg(msg)
872 reply = self._client.read_reply()
873 result = reply.split("|")
874 code = int(result[0])
875 text = base64.b64decode(result[1])
877 raise RuntimeError(text)
879 def do_preconfigure(self):
880 msg = testbed_messages[DO_PRECONFIGURE]
881 self._client.send_msg(msg)
882 reply = self._client.read_reply()
883 result = reply.split("|")
884 code = int(result[0])
885 text = base64.b64decode(result[1])
887 raise RuntimeError(text)
889 def do_cross_connect(self):
890 msg = testbed_messages[DO_CROSS_CONNECT]
891 self._client.send_msg(msg)
892 reply = self._client.read_reply()
893 result = reply.split("|")
894 code = int(result[0])
895 text = base64.b64decode(result[1])
897 raise RuntimeError(text)
899 def start(self, time = TIME_NOW):
900 msg = testbed_messages[START]
901 self._client.send_msg(msg)
902 reply = self._client.read_reply()
903 result = reply.split("|")
904 code = int(result[0])
905 text = base64.b64decode(result[1])
907 raise RuntimeError(text)
909 def stop(self, time = TIME_NOW):
910 msg = testbed_messages[STOP]
911 self._client.send_msg(msg)
912 reply = self._client.read_reply()
913 result = reply.split("|")
914 code = int(result[0])
915 text = base64.b64decode(result[1])
917 raise RuntimeError(text)
919 def set(self, time, guid, name, value):
920 msg = testbed_messages[SET]
921 type = get_type(value)
922 # avoid having "|" in this parameters
923 name = base64.b64encode(name)
924 value = base64.b64encode(str(value))
925 msg = msg % (time, guid, name, value, type)
926 self._client.send_msg(msg)
927 reply = self._client.read_reply()
928 result = reply.split("|")
929 code = int(result[0])
930 text = base64.b64decode(result[1])
932 raise RuntimeError(text)
934 def get(self, time, guid, name):
935 msg = testbed_messages[GET]
936 # avoid having "|" in this parameters
937 name = base64.b64encode(name)
938 msg = msg % (time, guid, name)
939 self._client.send_msg(msg)
940 reply = self._client.read_reply()
941 result = reply.split("|")
942 code = int(result[0])
943 text = base64.b64decode(result[1])
945 raise RuntimeError(text)
948 def get_address(self, guid, index, attribute):
949 msg = testbed_messages[GET_ADDRESS]
950 # avoid having "|" in this parameters
951 attribute = base64.b64encode(attribute)
952 msg = msg % (guid, index, attribute)
953 self._client.send_msg(msg)
954 reply = self._client.read_reply()
955 result = reply.split("|")
956 code = int(result[0])
957 text = base64.b64decode(result[1])
959 raise RuntimeError(text)
962 def get_route(self, guid, index, attribute):
963 msg = testbed_messages[GET_ROUTE]
964 # avoid having "|" in this parameters
965 attribute = base64.b64encode(attribute)
966 msg = msg % (guid, index, attribute)
967 self._client.send_msg(msg)
968 reply = self._client.read_reply()
969 result = reply.split("|")
970 code = int(result[0])
971 text = base64.b64decode(result[1])
973 raise RuntimeError(text)
976 def action(self, time, guid, action):
977 msg = testbed_messages[ACTION]
978 msg = msg % (time, guid, action)
979 self._client.send_msg(msg)
980 reply = self._client.read_reply()
981 result = reply.split("|")
982 code = int(result[0])
983 text = base64.b64decode(result[1])
985 raise RuntimeError(text)
987 def status(self, guid):
988 msg = testbed_messages[STATUS]
990 self._client.send_msg(msg)
991 reply = self._client.read_reply()
992 result = reply.split("|")
993 code = int(result[0])
994 text = base64.b64decode(result[1])
996 raise RuntimeError(text)
999 def trace(self, guid, trace_id, attribute='value'):
1000 msg = testbed_messages[TRACE]
1001 attribute = base64.b64encode(attribute)
1002 msg = msg % (guid, trace_id, attribute)
1003 self._client.send_msg(msg)
1004 reply = self._client.read_reply()
1005 result = reply.split("|")
1006 code = int(result[0])
1007 text = base64.b64decode(result[1])
1009 raise RuntimeError(text)
1013 msg = testbed_messages[SHUTDOWN]
1014 self._client.send_msg(msg)
1015 reply = self._client.read_reply()
1016 result = reply.split("|")
1017 code = int(result[0])
1018 text = base64.b64decode(result[1])
1020 raise RuntimeError(text)
1021 self._client.send_stop()
1022 self._client.read_reply() # wait for it
1024 class ExperimentControllerProxy(object):
1025 def __init__(self, root_dir, log_level, experiment_xml = None,
1026 launch = True, host = None, port = None, user = None,
1030 if experiment_xml == None:
1031 raise RuntimeError("To launch a ExperimentControllerServer a \
1032 xml description of the experiment is required")
1035 xml = experiment_xml
1036 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1037 s = ExperimentControllerServer(%r, %r, %r);\
1038 s.run()" % (root_dir, log_level, xml)
1039 proc = server.popen_ssh_subprocess(python_code, host = host,
1040 port = port, user = user, agent = agent)
1042 err = proc.stderr.read()
1043 raise RuntimeError("Server could not be executed: %s" % \
1047 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1050 # connect client to server
1051 self._client = server.Client(root_dir, host = host, port = port,
1052 user = user, agent = agent)
1055 def experiment_xml(self):
1056 msg = controller_messages[XML]
1057 self._client.send_msg(msg)
1058 reply = self._client.read_reply()
1059 result = reply.split("|")
1060 code = int(result[0])
1061 text = base64.b64decode(result[1])
1063 raise RuntimeError(text)
1066 def set_access_configuration(self, testbed_guid, access_config):
1067 mode = access_config.get_attribute_value("mode")
1068 communication = access_config.get_attribute_value("communication")
1069 host = access_config.get_attribute_value("host")
1070 user = access_config.get_attribute_value("user")
1071 port = access_config.get_attribute_value("port")
1072 root_dir = access_config.get_attribute_value("rootDirectory")
1073 use_agent = access_config.get_attribute_value("useAgent")
1074 log_level = access_config.get_attribute_value("logLevel")
1075 msg = controller_messages[ACCESS]
1076 msg = msg % (testbed_guid, mode, communication, host, user, port,
1077 root_dir, use_agent, log_level)
1078 self._client.send_msg(msg)
1079 reply = self._client.read_reply()
1080 result = reply.split("|")
1081 code = int(result[0])
1082 text = base64.b64decode(result[1])
1084 raise RuntimeError(text)
1086 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
1087 msg = controller_messages[TRACE]
1088 attribute = base64.b64encode(attribute)
1089 msg = msg % (testbed_guid, guid, trace_id, attribute)
1090 self._client.send_msg(msg)
1091 reply = self._client.read_reply()
1092 result = reply.split("|")
1093 code = int(result[0])
1094 text = base64.b64decode(result[1])
1097 raise RuntimeError(text)
1100 msg = controller_messages[START]
1101 self._client.send_msg(msg)
1102 reply = self._client.read_reply()
1103 result = reply.split("|")
1104 code = int(result[0])
1105 text = base64.b64decode(result[1])
1107 raise RuntimeError(text)
1110 msg = controller_messages[STOP]
1111 self._client.send_msg(msg)
1112 reply = self._client.read_reply()
1113 result = reply.split("|")
1114 code = int(result[0])
1115 text = base64.b64decode(result[1])
1117 raise RuntimeError(text)
1120 msg = controller_messages[RECOVER]
1121 self._client.send_msg(msg)
1122 reply = self._client.read_reply()
1123 result = reply.split("|")
1124 code = int(result[0])
1125 text = base64.b64decode(result[1])
1127 raise RuntimeError(text)
1129 def is_finished(self, guid):
1130 msg = controller_messages[FINISHED]
1132 self._client.send_msg(msg)
1133 reply = self._client.read_reply()
1134 result = reply.split("|")
1135 code = int(result[0])
1136 text = base64.b64decode(result[1])
1138 raise RuntimeError(text)
1139 return text == "True"
1142 msg = controller_messages[SHUTDOWN]
1143 self._client.send_msg(msg)
1144 reply = self._client.read_reply()
1145 result = reply.split("|")
1146 code = int(result[0])
1147 text = base64.b64decode(result[1])
1149 raise RuntimeError(text)
1150 self._client.send_stop()
1151 self._client.read_reply() # wait for it