2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
19 # PROTOCOL INSTRUCTION MESSAGES
40 DO_CROSS_CONNECT_INIT = 22
50 GET_ATTRIBUTE_LIST = 32
52 DO_CROSS_CONNECT_COMPL = 34
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
65 controller_messages = dict({
67 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
68 TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
69 FINISHED: "%d|%s" % (FINISHED, "%d"),
72 RECOVER : "%d" % RECOVER,
73 SHUTDOWN: "%d" % SHUTDOWN,
76 # TESTBED INSTANCE PROTOCOL MESSAGES
77 testbed_messages = dict({
78 TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
81 SHUTDOWN: "%d" % SHUTDOWN,
82 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
83 CREATE: "%d|%s" % (CREATE, "%d|%s"),
84 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
85 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
86 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
87 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s|%s"),
88 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
89 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
90 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
91 DO_SETUP: "%d" % DO_SETUP,
92 DO_CREATE: "%d" % DO_CREATE,
93 DO_CONNECT_INIT: "%d" % DO_CONNECT_INIT,
94 DO_CONNECT_COMPL: "%d" % DO_CONNECT_COMPL,
95 DO_CONFIGURE: "%d" % DO_CONFIGURE,
96 DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
97 DO_CROSS_CONNECT_INIT: "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"),
98 DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"),
99 GET: "%d|%s" % (GET, "%d|%s|%s"),
100 SET: "%d|%s" % (SET, "%d|%s|%s|%d|%s"),
101 EXPERIMENT_GET: "%d|%s" % (EXPERIMENT_GET, "%d|%d|%s|%s"),
102 EXPERIMENT_SET: "%d|%s" % (EXPERIMENT_SET, "%d|%d|%s|%s|%d|%s"),
103 GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
104 GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
105 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
106 STATUS: "%d|%s" % (STATUS, "%s"),
108 GET_ATTRIBUTE_LIST: "%d" % GET_ATTRIBUTE_LIST,
109 TESTBED_ID: "%d" % TESTBED_ID,
110 TESTBED_VERSION: "%d" % TESTBED_VERSION,
113 instruction_text = dict({
119 FINISHED: "FINISHED",
123 SHUTDOWN: "SHUTDOWN",
124 CONFIGURE: "CONFIGURE",
126 CREATE_SET: "CREATE_SET",
127 FACTORY_SET: "FACTORY_SET",
129 CROSS_CONNECT: "CROSS_CONNECT",
130 ADD_TRACE: "ADD_TRACE",
131 ADD_ADDRESS: "ADD_ADDRESS",
132 ADD_ROUTE: "ADD_ROUTE",
133 DO_SETUP: "DO_SETUP",
134 DO_CREATE: "DO_CREATE",
135 DO_CONNECT_INIT: "DO_CONNECT_INIT",
136 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
137 DO_CONFIGURE: "DO_CONFIGURE",
138 DO_PRECONFIGURE: "DO_PRECONFIGURE",
139 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
140 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
143 GET_ROUTE: "GET_ROUTE",
144 GET_ADDRESS: "GET_ADDRESS",
145 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
153 TESTBED_ID: "TESTBED_ID",
154 TESTBED_VERSION: "TESTBED_VERSION",
155 EXPERIMENT_SET: "EXPERIMENT_SET",
156 EXPERIMENT_GET: "EXPERIMENT_GET",
160 if isinstance(value, bool):
162 elif isinstance(value, int):
164 elif isinstance(value, float):
169 def set_type(type, value):
175 value = value == "True"
def log_msg(server, params):
    """Write a debug log line describing an incoming protocol message.

    server -- object providing log_debug(); its class name prefixes the line.
    params -- the split message; params[0] is the numeric instruction code
              and the remaining items are its arguments.

    Raises ValueError if params[0] is not an integer.
    """
    instr = int(params[0])
    # An unrecognised code must not blow up the logging step with a KeyError:
    # the caller has its own handling for invalid instructions, so just log
    # a placeholder name here.
    instr_txt = instruction_text.get(instr, "UNKNOWN(%d)" % instr)
    arguments = ", ".join([str(item) for item in params[1:]])
    server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
        instr_txt, arguments))
186 def log_reply(server, reply):
187 res = reply.split("|")
189 code_txt = instruction_text[code]
190 txt = base64.b64decode(res[1])
191 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into a server log level.

    Returns server.DEBUG_LEVEL when log_level equals
    AccessConfiguration.DEBUG_LEVEL; any other value yields
    server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the connection parameters stored in an access configuration.

    Returns the tuple (root_dir, log_level, user, host, port, agent), where
    log_level has already been converted with to_server_log_level(). The
    user, host, port and agent entries are read only when the
    "communication" attribute is AccessConfiguration.ACCESS_SSH; otherwise
    they are all None.
    """
    read = access_config.get_attribute_value

    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))

    if read("communication") != AccessConfiguration.ACCESS_SSH:
        # Non-ssh access: there are no remote credentials to report.
        return (root_dir, log_level, None, None, None, None)

    user = read("user")
    host = read("host")
    port = read("port")
    agent = read("useAgent")
    return (root_dir, log_level, user, host, port, agent)
212 class AccessConfiguration(AttributesMap):
213 MODE_SINGLE_PROCESS = "SINGLE"
214 MODE_DAEMON = "DAEMON"
216 ACCESS_LOCAL = "LOCAL"
217 ERROR_LEVEL = "Error"
218 DEBUG_LEVEL = "Debug"
221 super(AccessConfiguration, self).__init__()
222 self.add_attribute(name = "mode",
223 help = "Instance execution mode",
224 type = Attribute.ENUM,
225 value = AccessConfiguration.MODE_SINGLE_PROCESS,
226 allowed = [AccessConfiguration.MODE_DAEMON,
227 AccessConfiguration.MODE_SINGLE_PROCESS],
228 validation_function = validation.is_enum)
229 self.add_attribute(name = "communication",
230 help = "Instance communication mode",
231 type = Attribute.ENUM,
232 value = AccessConfiguration.ACCESS_LOCAL,
233 allowed = [AccessConfiguration.ACCESS_LOCAL,
234 AccessConfiguration.ACCESS_SSH],
235 validation_function = validation.is_enum)
236 self.add_attribute(name = "host",
237 help = "Host where the testbed will be executed",
238 type = Attribute.STRING,
240 validation_function = validation.is_string)
241 self.add_attribute(name = "user",
242 help = "User on the Host to execute the testbed",
243 type = Attribute.STRING,
244 value = getpass.getuser(),
245 validation_function = validation.is_string)
246 self.add_attribute(name = "port",
247 help = "Port on the Host",
248 type = Attribute.INTEGER,
250 validation_function = validation.is_integer)
251 self.add_attribute(name = "rootDirectory",
252 help = "Root directory for storing process files",
253 type = Attribute.STRING,
255 validation_function = validation.is_string) # TODO: validation.is_path
256 self.add_attribute(name = "useAgent",
257 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
258 type = Attribute.BOOL,
260 validation_function = validation.is_bool)
261 self.add_attribute(name = "logLevel",
262 help = "Log level for instance",
263 type = Attribute.ENUM,
264 value = AccessConfiguration.ERROR_LEVEL,
265 allowed = [AccessConfiguration.ERROR_LEVEL,
266 AccessConfiguration.DEBUG_LEVEL],
267 validation_function = validation.is_enum)
268 self.add_attribute(name = "recover",
269 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
270 type = Attribute.BOOL,
272 validation_function = validation.is_bool)
274 class TempDir(object):
276 self.path = tempfile.mkdtemp()
279 shutil.rmtree(self.path)
281 class PermDir(object):
282 def __init__(self, path):
285 def create_controller(xml, access_config = None):
286 mode = None if not access_config \
287 else access_config.get_attribute_value("mode")
288 launch = True if not access_config \
289 else not access_config.get_attribute_value("recover")
290 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
292 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
294 from nepi.core.execute import ExperimentController
296 if not access_config or not access_config.has_attribute("rootDirectory"):
299 root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
300 controller = ExperimentController(xml, root_dir.path)
302 # inject reference to temporary dir, so that it gets cleaned
303 # up at destruction time.
304 controller._tempdir = root_dir
307 elif mode == AccessConfiguration.MODE_DAEMON:
308 (root_dir, log_level, user, host, port, agent) = \
309 get_access_config_params(access_config)
310 return ExperimentControllerProxy(root_dir, log_level,
311 experiment_xml = xml, host = host, port = port, user = user,
312 agent = agent, launch = launch)
313 raise RuntimeError("Unsupported access configuration '%s'" % mode)
315 def create_testbed_controller(testbed_id, testbed_version, access_config):
316 mode = None if not access_config \
317 else access_config.get_attribute_value("mode")
318 launch = True if not access_config \
319 else not access_config.get_attribute_value("recover")
320 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
322 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
323 return _build_testbed_controller(testbed_id, testbed_version)
324 elif mode == AccessConfiguration.MODE_DAEMON:
325 (root_dir, log_level, user, host, port, agent) = \
326 get_access_config_params(access_config)
327 return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
328 testbed_version = testbed_version, host = host, port = port,
329 user = user, agent = agent, launch = launch)
330 raise RuntimeError("Unsupported access configuration '%s'" % mode)
332 def _build_testbed_controller(testbed_id, testbed_version):
333 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
334 if not mod_name in sys.modules:
336 module = sys.modules[mod_name]
337 return module.TestbedController(testbed_version)
339 class TestbedControllerServer(server.Server):
340 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
341 super(TestbedControllerServer, self).__init__(root_dir, log_level)
342 self._testbed_id = testbed_id
343 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed controller and store it in self._testbed.

    The controller is created from the testbed id and version given to the
    constructor.
    NOTE(review): presumably called by server.Server after daemonizing --
    confirm in nepi.util.server.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_controller(testbed_id, testbed_version)
350 def reply_action(self, msg):
352 result = base64.b64encode("Invalid command line")
353 reply = "%d|%s" % (ERROR, result)
355 params = msg.split("|")
356 instruction = int(params[0])
357 log_msg(self, params)
359 if instruction == TRACE:
360 reply = self.trace(params)
361 elif instruction == START:
362 reply = self.start(params)
363 elif instruction == STOP:
364 reply = self.stop(params)
365 elif instruction == SHUTDOWN:
366 reply = self.shutdown(params)
367 elif instruction == CONFIGURE:
368 reply = self.defer_configure(params)
369 elif instruction == CREATE:
370 reply = self.defer_create(params)
371 elif instruction == CREATE_SET:
372 reply = self.defer_create_set(params)
373 elif instruction == FACTORY_SET:
374 reply = self.defer_factory_set(params)
375 elif instruction == CONNECT:
376 reply = self.defer_connect(params)
377 elif instruction == CROSS_CONNECT:
378 reply = self.defer_cross_connect(params)
379 elif instruction == ADD_TRACE:
380 reply = self.defer_add_trace(params)
381 elif instruction == ADD_ADDRESS:
382 reply = self.defer_add_address(params)
383 elif instruction == ADD_ROUTE:
384 reply = self.defer_add_route(params)
385 elif instruction == DO_SETUP:
386 reply = self.do_setup(params)
387 elif instruction == DO_CREATE:
388 reply = self.do_create(params)
389 elif instruction == DO_CONNECT_INIT:
390 reply = self.do_connect_init(params)
391 elif instruction == DO_CONNECT_COMPL:
392 reply = self.do_connect_compl(params)
393 elif instruction == DO_CONFIGURE:
394 reply = self.do_configure(params)
395 elif instruction == DO_PRECONFIGURE:
396 reply = self.do_preconfigure(params)
397 elif instruction == DO_CROSS_CONNECT_INIT:
398 reply = self.do_cross_connect_init(params)
399 elif instruction == DO_CROSS_CONNECT_COMPL:
400 reply = self.do_cross_connect_compl(params)
401 elif instruction == GET:
402 reply = self.get(params)
403 elif instruction == SET:
404 reply = self.set(params)
405 elif instruction == GET_ADDRESS:
406 reply = self.get_address(params)
407 elif instruction == GET_ROUTE:
408 reply = self.get_route(params)
409 elif instruction == ACTION:
410 reply = self.action(params)
411 elif instruction == STATUS:
412 reply = self.status(params)
413 elif instruction == GUIDS:
414 reply = self.guids(params)
415 elif instruction == GET_ATTRIBUTE_LIST:
416 reply = self.get_attribute_list(params)
417 elif instruction == TESTBED_ID:
418 reply = self.testbed_id(params)
419 elif instruction == TESTBED_VERSION:
420 reply = self.testbed_version(params)
422 error = "Invalid instruction %s" % instruction
423 self.log_error(error)
424 result = base64.b64encode(error)
425 reply = "%d|%s" % (ERROR, result)
427 error = self.log_error()
428 result = base64.b64encode(error)
429 reply = "%d|%s" % (ERROR, result)
430 log_reply(self, reply)
def guids(self, params):
    """Reply with the testbed's guids, pickled and base64-encoded.

    params is the split request; no argument is read from it.
    Returns an "OK|<payload>" reply string.
    """
    pickled = cPickle.dumps(self._testbed.guids)
    payload = base64.b64encode(pickled)
    return "%d|%s" % (OK, payload)
def testbed_id(self, params):
    """Reply with the testbed id as a base64-encoded string.

    params is the split request; no argument is read from it.
    Returns an "OK|<payload>" reply string.
    """
    id_text = str(self._testbed.testbed_id)
    payload = base64.b64encode(id_text)
    return "%d|%s" % (OK, payload)
def testbed_version(self, params):
    """Reply with the testbed version as a base64-encoded string.

    params is the split request; no argument is read from it.
    Returns an "OK|<payload>" reply string.
    """
    version_text = str(self._testbed.testbed_version)
    payload = base64.b64encode(version_text)
    return "%d|%s" % (OK, payload)
def defer_create(self, params):
    """Handle a CREATE request by forwarding it to the testbed.

    params[1] is the element guid (integer), params[2] the factory id.
    Returns an OK reply with an empty payload.
    """
    element_guid = int(params[1])
    factory = params[2]
    self._testbed.defer_create(element_guid, factory)
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
455 def trace(self, params):
456 guid = int(params[1])
458 attribute = base64.b64decode(params[3])
459 trace = self._testbed.trace(guid, trace_id, attribute)
460 result = base64.b64encode(trace)
461 return "%d|%s" % (OK, result)
def start(self, params):
    """Handle a START request: start the testbed and acknowledge.

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.start()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
467 def stop(self, params):
469 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Handle a SHUTDOWN request: shut the testbed down and acknowledge.

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.shutdown()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def defer_configure(self, params):
    """Handle a CONFIGURE request by forwarding it to the testbed.

    params[1] and params[2] are the base64-encoded attribute name and
    value; params[3] is the integer type code handed to set_type() to
    convert the decoded value.
    Returns an OK reply with an empty payload.
    """
    attr_name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    type_code = int(params[3])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_configure(attr_name, typed_value)
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def defer_create_set(self, params):
    """Handle a CREATE_SET request by forwarding it to the testbed.

    params[1] is the element guid (integer); params[2] and params[3] are
    the base64-encoded attribute name and value; params[4] is the integer
    type code handed to set_type() to convert the decoded value.
    Returns an OK reply with an empty payload.
    """
    element_guid = int(params[1])
    attr_name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    type_code = int(params[4])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_create_set(element_guid, attr_name, typed_value)
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def defer_factory_set(self, params):
    """Handle a FACTORY_SET request by forwarding it to the testbed.

    The FACTORY_SET message format is "%d|%s|%s|%d" and the proxy fills it
    with (guid, name, value, type), so the fields are:

    params[1] -- guid (integer)
    params[2] -- base64-encoded attribute name
    params[3] -- base64-encoded attribute value
    params[4] -- integer type code handed to set_type()

    Returns an OK reply with an empty payload.
    """
    # The guid is the first field on the wire; skipping it (as this handler
    # used to) shifts every other field by one position.
    guid = int(params[1])
    name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    type_code = int(params[4])
    value = set_type(type_code, raw_value)
    # Same argument list as TestbedControllerProxy.defer_factory_set.
    self._testbed.defer_factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_connect(self, params):
    """Handle a CONNECT request by forwarding it to the testbed.

    params[1] and params[3] are the two element guids (integers);
    params[2] and params[4] are their connector type names.
    Returns an OK reply with an empty payload.
    """
    first_guid = int(params[1])
    first_connector = params[2]
    second_guid = int(params[3])
    second_connector = params[4]
    self._testbed.defer_connect(
        first_guid, first_connector, second_guid, second_connector)
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def defer_cross_connect(self, params):
    """Handle a CROSS_CONNECT request by forwarding it to the testbed.

    The CROSS_CONNECT message format is "%d|%s|%d|%d|%s|%s|%s", so the
    request carries exactly seven arguments:

    params[1] -- guid (integer)
    params[2] -- connector type name
    params[3] -- cross guid (integer)
    params[4] -- cross testbed guid (integer)
    params[5] -- cross testbed id
    params[6] -- cross factory id
    params[7] -- cross connector type name

    Returns an OK reply with an empty payload.
    """
    # Indices follow the wire format one-to-one. The earlier version
    # repeated two assignments and read up to params[9], which does not
    # exist in a well-formed message.
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    cross_testbed_guid = int(params[4])
    cross_testbed_id = params[5]
    cross_factory_id = params[6]
    cross_connector_type_name = params[7]
    self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_guid, cross_testbed_id, cross_factory_id,
            cross_connector_type_name)
    return "%d|%s" % (OK, "")
524 def defer_add_trace(self, params):
525 guid = int(params[1])
527 self._testbed.defer_add_trace(guid, trace_id)
528 return "%d|%s" % (OK, "")
530 def defer_add_address(self, params):
531 guid = int(params[1])
533 netprefix = int(params[3])
534 broadcast = params[4]
535 self._testbed.defer_add_address(guid, address, netprefix,
537 return "%d|%s" % (OK, "")
539 def defer_add_route(self, params):
540 guid = int(params[1])
541 destination = params[2]
542 netprefix = int(params[3])
544 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
545 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Handle a DO_SETUP request: call the testbed's do_setup() and acknowledge.

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_setup()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def do_create(self, params):
    """Handle a DO_CREATE request: call the testbed's do_create() and acknowledge.

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_create()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def do_connect_init(self, params):
    """Handle a DO_CONNECT_INIT request: call the testbed's do_connect_init().

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_connect_init()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def do_connect_compl(self, params):
    """Handle a DO_CONNECT_COMPL request: call the testbed's do_connect_compl().

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_connect_compl()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def do_configure(self, params):
    """Handle a DO_CONFIGURE request: call the testbed's do_configure().

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_configure()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def do_preconfigure(self, params):
    """Handle a DO_PRECONFIGURE request: call the testbed's do_preconfigure().

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    testbed = self._testbed
    testbed.do_preconfigure()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def do_cross_connect_init(self, params):
    """Handle a DO_CROSS_CONNECT_INIT request.

    params[1] is a base64-encoded pickle of the cross-connection data; it
    is decoded, unpickled and passed to the testbed's
    do_cross_connect_init().
    Returns an OK reply with an empty payload.
    """
    # NOTE(review): the payload is unpickled exactly as received, and
    # unpickling can execute arbitrary code -- confirm only trusted peers
    # can reach this server.
    pickled = base64.b64decode(params[1])
    unpickled = cPickle.loads(pickled)
    self._testbed.do_cross_connect_init(unpickled)
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def do_cross_connect_compl(self, params):
    """Handle a DO_CROSS_CONNECT_COMPL request.

    params[1] is a base64-encoded pickle of the cross-connection data; it
    is decoded, unpickled and passed to the testbed's
    do_cross_connect_compl().
    Returns an OK reply with an empty payload.
    """
    # NOTE(review): the payload is unpickled exactly as received, and
    # unpickling can execute arbitrary code -- confirm only trusted peers
    # can reach this server.
    pickled = base64.b64decode(params[1])
    unpickled = cPickle.loads(pickled)
    self._testbed.do_cross_connect_compl(unpickled)
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
583 def get(self, params):
585 name = base64.b64decode(params[2])
586 value = self._testbed.get(guid, name, time)
588 result = base64.b64encode(str(value))
589 return "%d|%s" % (OK, result)
591 def set(self, params):
592 guid = int(params[1])
593 name = base64.b64decode(params[2])
594 value = base64.b64decode(params[3])
595 type = int(params[2])
597 value = set_type(type, value)
598 self._testbed.set(guid, name, value, time)
599 return "%d|%s" % (OK, "")
def get_address(self, params):
    """Handle a GET_ADDRESS request and reply with the requested value.

    params[1] -- element guid (integer)
    params[2] -- address index (integer)
    params[3] -- base64-encoded attribute name

    Returns an "OK|<payload>" reply whose payload is the base64-encoded
    string form of the value returned by the testbed.
    """
    guid = int(params[1])
    index = int(params[2])
    # Read from ``params``: the previous ``param[3]`` referenced an
    # undefined name and raised NameError on every request.
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_address(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def get_route(self, params):
    """Handle a GET_ROUTE request and reply with the requested value.

    params[1] -- element guid (integer)
    params[2] -- route index (integer)
    params[3] -- base64-encoded attribute name

    Returns an "OK|<payload>" reply whose payload is the base64-encoded
    string form of the value returned by the testbed.
    """
    guid = int(params[1])
    index = int(params[2])
    # Read from ``params``: the previous ``param[3]`` referenced an
    # undefined name and raised NameError on every request.
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_route(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
617 def action(self, params):
619 guid = int(params[2])
620 command = base64.b64decode(params[3])
621 self._testbed.action(time, guid, command)
622 return "%d|%s" % (OK, "")
624 def status(self, params):
626 if params[1] != "None":
627 guid = int(params[1])
628 status = self._testbed.status(guid)
629 result = base64.b64encode(str(status))
630 return "%d|%s" % (OK, result)
def get_attribute_list(self, params):
    """Handle a GET_ATTRIBUTE_LIST request.

    params[1] is the element guid (integer). The list returned by the
    testbed is pickled and base64-encoded into the reply payload.
    Returns an "OK|<payload>" reply string.

    NOTE(review): the GET_ATTRIBUTE_LIST entry in testbed_messages is "%d"
    with no slot for a guid -- confirm the client really sends one.
    """
    element_guid = int(params[1])
    attributes = self._testbed.get_attribute_list(element_guid)
    pickled = cPickle.dumps(attributes)
    payload = base64.b64encode(pickled)
    return "%d|%s" % (OK, payload)
639 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, log_level, experiment_xml):
    """Initialise the server and remember the experiment description.

    root_dir and log_level are passed straight to the base-class
    constructor. experiment_xml is stored for later use; the controller
    itself starts out as None.
    """
    super(ExperimentControllerServer, self).__init__(root_dir, log_level)
    # The controller is not built here; post_daemonize() assigns it.
    self._controller = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Build the ExperimentController and store it in self._controller.

    The controller is created from the stored experiment XML and this
    server's root directory.
    NOTE(review): presumably called by server.Server after daemonizing --
    confirm in nepi.util.server.
    """
    # Imported here rather than at module level, as in the original code.
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    self._controller = ExperimentController(xml, root_dir = self._root_dir)
650 def reply_action(self, msg):
652 result = base64.b64encode("Invalid command line")
653 reply = "%d|%s" % (ERROR, result)
655 params = msg.split("|")
656 instruction = int(params[0])
657 log_msg(self, params)
659 if instruction == XML:
660 reply = self.experiment_xml(params)
661 elif instruction == ACCESS:
662 reply = self.set_access_configuration(params)
663 elif instruction == TRACE:
664 reply = self.trace(params)
665 elif instruction == FINISHED:
666 reply = self.is_finished(params)
667 elif instruction == EXPERIMENT_GET:
668 reply = self.get(params)
669 elif instruction == EXPERIMENT_SET:
670 reply = self.set(params)
671 elif instruction == START:
672 reply = self.start(params)
673 elif instruction == STOP:
674 reply = self.stop(params)
675 elif instruction == RECOVER:
676 reply = self.recover(params)
677 elif instruction == SHUTDOWN:
678 reply = self.shutdown(params)
680 error = "Invalid instruction %s" % instruction
681 self.log_error(error)
682 result = base64.b64encode(error)
683 reply = "%d|%s" % (ERROR, result)
685 error = self.log_error()
686 result = base64.b64encode(error)
687 reply = "%d|%s" % (ERROR, result)
688 log_reply(self, reply)
def experiment_xml(self, params):
    """Reply with the controller's experiment XML, base64-encoded.

    params is the split request; no argument is read from it.
    Returns an "OK|<payload>" reply string.
    """
    document = self._controller.experiment_xml
    payload = base64.b64encode(document)
    return "%d|%s" % (OK, payload)
696 def set_access_configuration(self, params):
697 testbed_guid = int(params[1])
699 communication = params[3]
702 port = int(params[6])
704 use_agent = params[8] == "True"
705 log_level = params[9]
706 access_config = AccessConfiguration()
707 access_config.set_attribute_value("mode", mode)
708 access_config.set_attribute_value("communication", communication)
709 access_config.set_attribute_value("host", host)
710 access_config.set_attribute_value("user", user)
711 access_config.set_attribute_value("port", port)
712 access_config.set_attribute_value("rootDirectory", root_dir)
713 access_config.set_attribute_value("useAgent", use_agent)
714 access_config.set_attribute_value("logLevel", log_level)
715 self._controller.set_access_configuration(testbed_guid,
717 return "%d|%s" % (OK, "")
719 def trace(self, params):
720 testbed_guid = int(params[1])
721 guid = int(params[2])
723 attribute = base64.b64decode(params[4])
724 trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
725 result = base64.b64encode(trace)
726 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Handle a FINISHED request for one guid.

    params[1] is the guid (integer) passed to the controller's
    is_finished(). The string form of the answer is base64-encoded into
    the reply payload.
    Returns an "OK|<payload>" reply string.
    """
    target_guid = int(params[1])
    finished = self._controller.is_finished(target_guid)
    payload = base64.b64encode(str(finished))
    return "%d|%s" % (OK, payload)
734 def get(self, params):
735 testbed_guid = int(param[1])
736 guid = int(params[2])
737 name = base64.b64decode(params[3])
738 value = self._controller.get(testbed_guid, guid, name, time)
740 result = base64.b64encode(str(value))
741 return "%d|%s" % (OK, result)
743 def set(self, params):
744 testbed_guid = int(params[1])
745 guid = int(params[2])
746 name = base64.b64decode(params[3])
747 value = base64.b64decode(params[4])
748 type = int(params[3])
750 value = set_type(type, value)
751 self._controller.set(testbed_guid, guid, name, value, time)
752 return "%d|%s" % (OK, "")
def start(self, params):
    """Handle a START request: start the experiment controller and acknowledge.

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    controller = self._controller
    controller.start()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def stop(self, params):
    """Handle a STOP request: stop the experiment controller and acknowledge.

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    controller = self._controller
    controller.stop()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def recover(self, params):
    """Handle a RECOVER request: call the controller's recover() and acknowledge.

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    controller = self._controller
    controller.recover()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
def shutdown(self, params):
    """Handle a SHUTDOWN request: shut the controller down and acknowledge.

    params is the split request; no argument is read from it.
    Returns an OK reply with an empty payload.
    """
    controller = self._controller
    controller.shutdown()
    empty_payload = ""
    return "%d|%s" % (OK, empty_payload)
770 class TestbedControllerProxy(object):
771 def __init__(self, root_dir, log_level, testbed_id = None,
772 testbed_version = None, launch = True, host = None,
773 port = None, user = None, agent = None):
775 if testbed_id == None or testbed_version == None:
776 raise RuntimeError("To launch a TesbedInstance server a \
777 testbed_id and testbed_version are required")
780 python_code = "from nepi.util.proxy import \
781 TesbedInstanceServer;\
782 s = TestbedControllerServer('%s', %d, '%s', '%s');\
783 s.run()" % (root_dir, log_level, testbed_id,
785 proc = server.popen_ssh_subprocess(python_code, host = host,
786 port = port, user = user, agent = agent)
788 err = proc.stderr.read()
789 raise RuntimeError("Server could not be executed: %s" % \
793 s = TestbedControllerServer(root_dir, log_level, testbed_id,
797 # connect client to server
798 self._client = server.Client(root_dir, host = host, port = port,
799 user = user, agent = agent)
803 msg = testbed_messages[GUIDS]
804 self._client.send_msg(msg)
805 reply = self._client.read_reply()
806 result = reply.split("|")
807 code = int(result[0])
808 text = base64.b64decode(result[1])
810 raise RuntimeError(text)
811 guids = cPickle.loads(text)
815 def testbed_id(self):
816 msg = testbed_messages[TESTBED_ID]
817 self._client.send_msg(msg)
818 reply = self._client.read_reply()
819 result = reply.split("|")
820 code = int(result[0])
821 text = base64.b64decode(result[1])
823 raise RuntimeError(text)
827 def testbed_version(self):
828 msg = testbed_messages[TESTBED_VERSION]
829 self._client.send_msg(msg)
830 reply = self._client.read_reply()
831 result = reply.split("|")
832 code = int(result[0])
833 text = base64.b64decode(result[1])
835 raise RuntimeError(text)
838 def defer_configure(self, name, value):
839 msg = testbed_messages[CONFIGURE]
840 type = get_type(value)
841 # avoid having "|" in this parameters
842 name = base64.b64encode(name)
843 value = base64.b64encode(str(value))
844 msg = msg % (name, value, type)
845 self._client.send_msg(msg)
846 reply = self._client.read_reply()
847 result = reply.split("|")
848 code = int(result[0])
849 text = base64.b64decode(result[1])
851 raise RuntimeError(text)
853 def defer_create(self, guid, factory_id):
854 msg = testbed_messages[CREATE]
855 msg = msg % (guid, factory_id)
856 self._client.send_msg(msg)
857 reply = self._client.read_reply()
858 result = reply.split("|")
859 code = int(result[0])
860 text = base64.b64decode(result[1])
862 raise RuntimeError(text)
864 def defer_create_set(self, guid, name, value):
865 msg = testbed_messages[CREATE_SET]
866 type = get_type(value)
867 # avoid having "|" in this parameters
868 name = base64.b64encode(name)
869 value = base64.b64encode(str(value))
870 msg = msg % (guid, name, value, type)
871 self._client.send_msg(msg)
872 reply = self._client.read_reply()
873 result = reply.split("|")
874 code = int(result[0])
875 text = base64.b64decode(result[1])
877 raise RuntimeError(text)
879 def defer_factory_set(self, guid, name, value):
880 msg = testbed_messages[FACTORY_SET]
881 type = get_type(value)
882 # avoid having "|" in this parameters
883 name = base64.b64encode(name)
884 value = base64.b64encode(str(value))
885 msg = msg % (guid, name, value, type)
886 self._client.send_msg(msg)
887 reply = self._client.read_reply()
888 result = reply.split("|")
889 code = int(result[0])
890 text = base64.b64decode(result[1])
892 raise RuntimeError(text)
894 def defer_connect(self, guid1, connector_type_name1, guid2,
895 connector_type_name2):
896 msg = testbed_messages[CONNECT]
897 msg = msg % (guid1, connector_type_name1, guid2,
898 connector_type_name2)
899 self._client.send_msg(msg)
900 reply = self._client.read_reply()
901 result = reply.split("|")
902 code = int(result[0])
903 text = base64.b64decode(result[1])
905 raise RuntimeError(text)
907 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
908 cross_testbed_guid, cross_testbed_id, cross_factory_id,
909 cross_connector_type_name):
910 msg = testbed_messages[CROSS_CONNECT]
911 msg = msg % (guid, connector_type_name, cross_guid, cross_testbed_guid,
912 cross_testbed_id, cross_factory_id, cross_connector_type_name)
913 self._client.send_msg(msg)
914 reply = self._client.read_reply()
915 result = reply.split("|")
916 code = int(result[0])
917 text = base64.b64decode(result[1])
919 raise RuntimeError(text)
921 def defer_add_trace(self, guid, trace_id):
922 msg = testbed_messages[ADD_TRACE]
923 msg = msg % (guid, trace_id)
924 self._client.send_msg(msg)
925 reply = self._client.read_reply()
926 result = reply.split("|")
927 code = int(result[0])
928 text = base64.b64decode(result[1])
930 raise RuntimeError(text)
932 def defer_add_address(self, guid, address, netprefix, broadcast):
933 msg = testbed_messages[ADD_ADDRESS]
934 msg = msg % (guid, address, netprefix, broadcast)
935 self._client.send_msg(msg)
936 reply = self._client.read_reply()
937 result = reply.split("|")
938 code = int(result[0])
939 text = base64.b64decode(result[1])
941 raise RuntimeError(text)
943 def defer_add_route(self, guid, destination, netprefix, nexthop):
944 msg = testbed_messages[ADD_ROUTE]
945 msg = msg % (guid, destination, netprefix, nexthop)
946 self._client.send_msg(msg)
947 reply = self._client.read_reply()
948 result = reply.split("|")
949 code = int(result[0])
950 text = base64.b64decode(result[1])
952 raise RuntimeError(text)
955 msg = testbed_messages[DO_SETUP]
956 self._client.send_msg(msg)
957 reply = self._client.read_reply()
958 result = reply.split("|")
959 code = int(result[0])
960 text = base64.b64decode(result[1])
962 raise RuntimeError(text)
965 msg = testbed_messages[DO_CREATE]
966 self._client.send_msg(msg)
967 reply = self._client.read_reply()
968 result = reply.split("|")
969 code = int(result[0])
970 text = base64.b64decode(result[1])
972 raise RuntimeError(text)
974 def do_connect_init(self):
975 msg = testbed_messages[DO_CONNECT_INIT]
976 self._client.send_msg(msg)
977 reply = self._client.read_reply()
978 result = reply.split("|")
979 code = int(result[0])
980 text = base64.b64decode(result[1])
982 raise RuntimeError(text)
984 def do_connect_compl(self):
985 msg = testbed_messages[DO_CONNECT_COMPL]
986 self._client.send_msg(msg)
987 reply = self._client.read_reply()
988 result = reply.split("|")
989 code = int(result[0])
990 text = base64.b64decode(result[1])
992 raise RuntimeError(text)
994 def do_configure(self):
995 msg = testbed_messages[DO_CONFIGURE]
996 self._client.send_msg(msg)
997 reply = self._client.read_reply()
998 result = reply.split("|")
999 code = int(result[0])
1000 text = base64.b64decode(result[1])
1002 raise RuntimeError(text)
1004 def do_preconfigure(self):
1005 msg = testbed_messages[DO_PRECONFIGURE]
1006 self._client.send_msg(msg)
1007 reply = self._client.read_reply()
1008 result = reply.split("|")
1009 code = int(result[0])
1010 text = base64.b64decode(result[1])
1012 raise RuntimeError(text)
1014 def do_cross_connect_init(self, cross_data):
1015 msg = testbed_messages[DO_CROSS_CONNECT_INIT]
1016 pcross_data = cPickle.dumps(cross_data)
1017 cross_data = base64.b64encode(pcross_data)
1018 msg = msg % (cross_data)
1019 self._client.send_msg(msg)
1020 reply = self._client.read_reply()
1021 result = reply.split("|")
1022 code = int(result[0])
1023 text = base64.b64decode(result[1])
1025 raise RuntimeError(text)
1027 def do_cross_connect_compl(self, cross_data):
1028 msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
1029 pcross_data = cPickle.dumps(cross_data)
1030 cross_data = base64.b64encode(pcross_data)
1031 msg = msg % (cross_data)
1032 self._client.send_msg(msg)
1033 reply = self._client.read_reply()
1034 result = reply.split("|")
1035 code = int(result[0])
1036 text = base64.b64decode(result[1])
1038 raise RuntimeError(text)
1040 def start(self, time = TIME_NOW):
1041 msg = testbed_messages[START]
1042 self._client.send_msg(msg)
1043 reply = self._client.read_reply()
1044 result = reply.split("|")
1045 code = int(result[0])
1046 text = base64.b64decode(result[1])
1048 raise RuntimeError(text)
1050 def stop(self, time = TIME_NOW):
1051 msg = testbed_messages[STOP]
1052 self._client.send_msg(msg)
1053 reply = self._client.read_reply()
1054 result = reply.split("|")
1055 code = int(result[0])
1056 text = base64.b64decode(result[1])
1058 raise RuntimeError(text)
1060 def set(self, guid, name, value, time = TIME_NOW):
1061 msg = testbed_messages[SET]
1062 type = get_type(value)
1063 # avoid having "|" in this parameters
1064 name = base64.b64encode(name)
1065 value = base64.b64encode(str(value))
1066 msg = msg % (guid, name, value, type, time)
1067 self._client.send_msg(msg)
1068 reply = self._client.read_reply()
1069 result = reply.split("|")
1070 code = int(result[0])
1071 text = base64.b64decode(result[1])
1073 raise RuntimeError(text)
1075 def get(self, guid, name, time = TIME_NOW):
1076 msg = testbed_messages[GET]
1077 # avoid having "|" in this parameters
1078 name = base64.b64encode(name)
1079 msg = msg % (guid, name, time)
1080 self._client.send_msg(msg)
1081 reply = self._client.read_reply()
1082 result = reply.split("|")
1083 code = int(result[0])
1084 text = base64.b64decode(result[1])
1086 raise RuntimeError(text)
1089 def get_address(self, guid, index, attribute):
1090 msg = testbed_messages[GET_ADDRESS]
1091 # avoid having "|" in this parameters
1092 attribute = base64.b64encode(attribute)
1093 msg = msg % (guid, index, attribute)
1094 self._client.send_msg(msg)
1095 reply = self._client.read_reply()
1096 result = reply.split("|")
1097 code = int(result[0])
1098 text = base64.b64decode(result[1])
1100 raise RuntimeError(text)
def get_route(self, guid, index, attribute):
    """Ask the testbed server for ``attribute`` of route ``index`` of ``guid``.

    The request carries the guid, the index and the base64-encoded
    attribute name. The reply is parsed as "<code>|<base64 text>".

    Returns:
        The base64-decoded reply text.

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    # NOTE(review): the GET_ROUTE template in testbed_messages is built
    # with the GET opcode - confirm this is what the server dispatches on.
    msg = testbed_messages[GET_ROUTE]
    # "|" is the field separator, so the attribute travels base64-encoded.
    encoded_attribute = base64.b64encode(attribute)
    msg = msg % (guid, index, encoded_attribute)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
    return text
def action(self, time, guid, action):
    """Send an ACTION instruction for ``guid`` to the testbed server.

    The request carries the time, the guid and the action, in that
    order. The reply is parsed as "<code>|<base64 text>".

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    msg = testbed_messages[ACTION]
    msg = msg % (time, guid, action)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
def status(self, guid = None):
    """Ask the testbed server for the status associated with ``guid``.

    The guid is sent in its string form, so the default is transmitted
    as the text "None". The reply is parsed as "<code>|<base64 text>".

    Returns:
        The decoded reply converted to int.

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    msg = testbed_messages[STATUS]
    msg = msg % str(guid)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
    # NOTE(review): assumes the server encodes the status as an integer;
    # confirm against the server-side handler.
    return int(text)
def trace(self, guid, trace_id, attribute='value'):
    """Ask the testbed server for ``attribute`` of trace ``trace_id`` of ``guid``.

    The request carries the guid, the trace id and the base64-encoded
    attribute name. The reply is parsed as "<code>|<base64 text>".

    Returns:
        The base64-decoded reply text.

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    msg = testbed_messages[TRACE]
    # "|" is the field separator, so the attribute travels base64-encoded.
    encoded_attribute = base64.b64encode(attribute)
    msg = msg % (guid, trace_id, encoded_attribute)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
    return text
def get_attribute_list(self, guid):
    """Ask the testbed server for an attribute list and unpickle the reply.

    The reply is parsed as "<code>|<base64 text>"; on success the decoded
    text is a pickled object.

    Returns:
        The object obtained by unpickling the decoded reply.

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    # NOTE(review): the GET_ATTRIBUTE_LIST template has no placeholder, so
    # guid is not part of the request as written - confirm intent.
    msg = testbed_messages[GET_ATTRIBUTE_LIST]
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal; check before unpickling
    # so an error message is never fed to the unpickler.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
    # SECURITY: unpickling can execute arbitrary code on crafted input;
    # this is only acceptable while the server end is trusted.
    attr_list = cPickle.loads(text)
    return attr_list
1167 msg = testbed_messages[SHUTDOWN]
1168 self._client.send_msg(msg)
1169 reply = self._client.read_reply()
1170 result = reply.split("|")
1171 code = int(result[0])
1172 text = base64.b64decode(result[1])
1174 raise RuntimeError(text)
1175 self._client.send_stop()
1176 self._client.read_reply() # wait for it
1178 class ExperimentControllerProxy(object):
1179 def __init__(self, root_dir, log_level, experiment_xml = None,
1180 launch = True, host = None, port = None, user = None,
1184 if experiment_xml == None:
1185 raise RuntimeError("To launch a ExperimentControllerServer a \
1186 xml description of the experiment is required")
1189 xml = experiment_xml
1190 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1191 s = ExperimentControllerServer(%r, %r, %r);\
1192 s.run()" % (root_dir, log_level, xml)
1193 proc = server.popen_ssh_subprocess(python_code, host = host,
1194 port = port, user = user, agent = agent)
1196 err = proc.stderr.read()
1197 raise RuntimeError("Server could not be executed: %s" % \
1201 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1204 # connect client to server
1205 self._client = server.Client(root_dir, host = host, port = port,
1206 user = user, agent = agent)
def experiment_xml(self):
    """Ask the experiment controller server for the experiment XML.

    The reply is parsed as "<code>|<base64 text>".

    Returns:
        The base64-decoded reply text.

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    msg = controller_messages[XML]
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
    return text
def set_access_configuration(self, testbed_guid, access_config):
    """Send the access configuration of a testbed to the controller server.

    Reads the "mode", "communication", "host", "user", "port",
    "rootDirectory", "useAgent" and "logLevel" attribute values from
    ``access_config`` and sends them, together with ``testbed_guid``, in
    an ACCESS instruction. The reply is parsed as "<code>|<base64 text>".

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    mode = access_config.get_attribute_value("mode")
    communication = access_config.get_attribute_value("communication")
    host = access_config.get_attribute_value("host")
    user = access_config.get_attribute_value("user")
    port = access_config.get_attribute_value("port")
    root_dir = access_config.get_attribute_value("rootDirectory")
    use_agent = access_config.get_attribute_value("useAgent")
    log_level = access_config.get_attribute_value("logLevel")
    msg = controller_messages[ACCESS]
    # NOTE(review): the ACCESS template formats port with %d, so a
    # missing (None) port would fail here - confirm callers always set it.
    msg = msg % (testbed_guid, mode, communication, host, user, port,
            root_dir, use_agent, log_level)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
def trace(self, testbed_guid, guid, trace_id, attribute='value'):
    """Ask the controller server for ``attribute`` of a trace.

    The request carries the testbed guid, the element guid, the trace id
    and the base64-encoded attribute name. The reply is parsed as
    "<code>|<base64 text>".

    Returns:
        The base64-decoded reply text.

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    msg = controller_messages[TRACE]
    # "|" is the field separator, so the attribute travels base64-encoded.
    encoded_attribute = base64.b64encode(attribute)
    msg = msg % (testbed_guid, guid, trace_id, encoded_attribute)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
    return text
1254 msg = controller_messages[START]
1255 self._client.send_msg(msg)
1256 reply = self._client.read_reply()
1257 result = reply.split("|")
1258 code = int(result[0])
1259 text = base64.b64decode(result[1])
1261 raise RuntimeError(text)
1264 msg = controller_messages[STOP]
1265 self._client.send_msg(msg)
1266 reply = self._client.read_reply()
1267 result = reply.split("|")
1268 code = int(result[0])
1269 text = base64.b64decode(result[1])
1271 raise RuntimeError(text)
1274 msg = controller_messages[RECOVER]
1275 self._client.send_msg(msg)
1276 reply = self._client.read_reply()
1277 result = reply.split("|")
1278 code = int(result[0])
1279 text = base64.b64decode(result[1])
1281 raise RuntimeError(text)
def is_finished(self, guid):
    """Ask the controller server whether ``guid`` has finished.

    The reply is parsed as "<code>|<base64 text>".

    Returns:
        True when the decoded reply text is exactly "True", else False.

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    msg = controller_messages[FINISHED]
    # The FINISHED template has one %d placeholder for the guid; fill it
    # in so the raw template is never sent on the wire.
    msg = msg % guid
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
    return text == "True"
def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
    """Ask the controller server to set attribute ``name`` of an element.

    The request carries the testbed guid, the element guid, the
    base64-encoded name, the base64-encoded string form of the value,
    the type tag computed by get_type(value), and the time. The reply is
    parsed as "<code>|<base64 text>".

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    # The EXPERIMENT_SET template is stored in testbed_messages.
    msg = testbed_messages[EXPERIMENT_SET]
    # Compute the type tag before the value is turned into a string.
    value_type = get_type(value)
    # "|" is the field separator, so free-form fields travel base64-encoded.
    encoded_name = base64.b64encode(name)
    encoded_value = base64.b64encode(str(value))
    msg = msg % (testbed_guid, guid, encoded_name, encoded_value,
            value_type, time)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
def get(self, testbed_guid, guid, name, time = TIME_NOW):
    """Ask the controller server for attribute ``name`` of an element.

    The request carries the testbed guid, the element guid, the
    base64-encoded name and the time. The reply is parsed as
    "<code>|<base64 text>".

    Returns:
        The base64-decoded reply text.

    Raises:
        RuntimeError: with the decoded reply text, when the server
            reports an error.
    """
    # The EXPERIMENT_GET template is stored in testbed_messages.
    msg = testbed_messages[EXPERIMENT_GET]
    # "|" is the field separator, so the name travels base64-encoded.
    encoded_name = base64.b64encode(name)
    msg = msg % (testbed_guid, guid, encoded_name, time)
    self._client.send_msg(msg)
    reply = self._client.read_reply()
    result = reply.split("|")
    code = int(result[0])
    text = base64.b64decode(result[1])
    # Only a reply flagged as an error is fatal.
    # NOTE(review): ERROR is assumed to be the module-level error reply
    # code defined with the other protocol constants - confirm.
    if code == ERROR:
        raise RuntimeError(text)
    return text
1325 msg = controller_messages[SHUTDOWN]
1326 self._client.send_msg(msg)
1327 reply = self._client.read_reply()
1328 result = reply.split("|")
1329 code = int(result[0])
1330 text = base64.b64decode(result[1])
1332 raise RuntimeError(text)
1333 self._client.send_stop()
1334 self._client.read_reply() # wait for it