2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
19 # PROTOCOL INSTRUCTION MESSAGES
40 DO_CROSS_CONNECT_INIT = 22
50 GET_ATTRIBUTE_LIST = 32
52 DO_CROSS_CONNECT_COMPL = 34
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
63 controller_messages = dict({
65 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
66 TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
67 FINISHED: "%d|%s" % (FINISHED, "%d"),
70 RECOVER : "%d" % RECOVER,
71 SHUTDOWN: "%d" % SHUTDOWN,
74 # TESTBED INSTANCE PROTOCOL MESSAGES
75 testbed_messages = dict({
76 TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
79 SHUTDOWN: "%d" % SHUTDOWN,
80 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
81 CREATE: "%d|%s" % (CREATE, "%d|%s"),
82 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
83 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
84 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
85 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
86 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
87 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
88 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
89 DO_SETUP: "%d" % DO_SETUP,
90 DO_CREATE: "%d" % DO_CREATE,
91 DO_CONNECT_INIT: "%d" % DO_CONNECT_INIT,
92 DO_CONNECT_COMPL: "%d" % DO_CONNECT_COMPL,
93 DO_CONFIGURE: "%d" % DO_CONFIGURE,
94 DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
95 DO_CROSS_CONNECT_INIT: "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"),
96 DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"),
97 GET: "%d|%s" % (GET, "%s|%d|%s"),
98 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
99 GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
100 GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
101 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
102 STATUS: "%d|%s" % (STATUS, "%s"),
104 GET_ATTRIBUTE_LIST: "%d" % GET_ATTRIBUTE_LIST,
105 TESTBED_ID: "%d" % TESTBED_ID,
106 TESTBED_VERSION: "%d" % TESTBED_VERSION,
109 instruction_text = dict({
115 FINISHED: "FINISHED",
119 SHUTDOWN: "SHUTDOWN",
120 CONFIGURE: "CONFIGURE",
122 CREATE_SET: "CREATE_SET",
123 FACTORY_SET: "FACTORY_SET",
125 CROSS_CONNECT: "CROSS_CONNECT",
126 ADD_TRACE: "ADD_TRACE",
127 ADD_ADDRESS: "ADD_ADDRESS",
128 ADD_ROUTE: "ADD_ROUTE",
129 DO_SETUP: "DO_SETUP",
130 DO_CREATE: "DO_CREATE",
131 DO_CONNECT_INIT: "DO_CONNECT_INIT",
132 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
133 DO_CONFIGURE: "DO_CONFIGURE",
134 DO_PRECONFIGURE: "DO_PRECONFIGURE",
135 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
136 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
139 GET_ROUTE: "GET_ROUTE",
140 GET_ADDRESS: "GET_ADDRESS",
141 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
149 TESTBED_ID: "TESTBED_ID",
150 TESTBED_VERSION: "TESTBED_VERSION",
154 if isinstance(value, bool):
156 elif isinstance(value, int):
158 elif isinstance(value, float):
163 def set_type(type, value):
169 value = value == "True"
def log_msg(server, params):
    """Write a debug-level record of an incoming protocol message.

    ``params`` is the already split message: element 0 is the numeric
    instruction code (looked up in ``instruction_text``), the remaining
    elements are logged as a comma separated argument list.
    """
    opcode_name = instruction_text[int(params[0])]
    owner = server.__class__.__name__
    arguments = ", ".join([str(item) for item in params[1:]])
    server.log_debug("%s - msg: %s [%s]" % (owner, opcode_name, arguments))
180 def log_reply(server, reply):
181 res = reply.split("|")
183 code_txt = instruction_text[code]
184 txt = base64.b64decode(res[1])
185 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into a server log level.

    Only ``AccessConfiguration.DEBUG_LEVEL`` maps to ``server.DEBUG_LEVEL``;
    any other value yields ``server.ERROR_LEVEL``.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the connection parameters held by an access configuration.

    Returns the tuple ``(root_dir, log_level, user, host, port, agent)``.
    ``log_level`` is converted with ``to_server_log_level``.  The last four
    items are only read when the communication mode is
    ``AccessConfiguration.ACCESS_SSH``; otherwise they are all ``None``.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") != AccessConfiguration.ACCESS_SSH:
        # No remote access requested: nothing else to look up.
        return (root_dir, log_level, None, None, None, None)
    user = read("user")
    host = read("host")
    port = read("port")
    agent = read("useAgent")
    return (root_dir, log_level, user, host, port, agent)
206 class AccessConfiguration(AttributesMap):
207 MODE_SINGLE_PROCESS = "SINGLE"
208 MODE_DAEMON = "DAEMON"
210 ACCESS_LOCAL = "LOCAL"
211 ERROR_LEVEL = "Error"
212 DEBUG_LEVEL = "Debug"
215 super(AccessConfiguration, self).__init__()
216 self.add_attribute(name = "mode",
217 help = "Instance execution mode",
218 type = Attribute.ENUM,
219 value = AccessConfiguration.MODE_SINGLE_PROCESS,
220 allowed = [AccessConfiguration.MODE_DAEMON,
221 AccessConfiguration.MODE_SINGLE_PROCESS],
222 validation_function = validation.is_enum)
223 self.add_attribute(name = "communication",
224 help = "Instance communication mode",
225 type = Attribute.ENUM,
226 value = AccessConfiguration.ACCESS_LOCAL,
227 allowed = [AccessConfiguration.ACCESS_LOCAL,
228 AccessConfiguration.ACCESS_SSH],
229 validation_function = validation.is_enum)
230 self.add_attribute(name = "host",
231 help = "Host where the testbed will be executed",
232 type = Attribute.STRING,
234 validation_function = validation.is_string)
235 self.add_attribute(name = "user",
236 help = "User on the Host to execute the testbed",
237 type = Attribute.STRING,
238 value = getpass.getuser(),
239 validation_function = validation.is_string)
240 self.add_attribute(name = "port",
241 help = "Port on the Host",
242 type = Attribute.INTEGER,
244 validation_function = validation.is_integer)
245 self.add_attribute(name = "rootDirectory",
246 help = "Root directory for storing process files",
247 type = Attribute.STRING,
249 validation_function = validation.is_string) # TODO: validation.is_path
250 self.add_attribute(name = "useAgent",
251 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
252 type = Attribute.BOOL,
254 validation_function = validation.is_bool)
255 self.add_attribute(name = "logLevel",
256 help = "Log level for instance",
257 type = Attribute.ENUM,
258 value = AccessConfiguration.ERROR_LEVEL,
259 allowed = [AccessConfiguration.ERROR_LEVEL,
260 AccessConfiguration.DEBUG_LEVEL],
261 validation_function = validation.is_enum)
262 self.add_attribute(name = "recover",
263 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
264 type = Attribute.BOOL,
266 validation_function = validation.is_bool)
268 class TempDir(object):
270 self.path = tempfile.mkdtemp()
273 shutil.rmtree(self.path)
275 class PermDir(object):
276 def __init__(self, path):
279 def create_controller(xml, access_config = None):
280 mode = None if not access_config \
281 else access_config.get_attribute_value("mode")
282 launch = True if not access_config \
283 else not access_config.get_attribute_value("recover")
284 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
286 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
288 from nepi.core.execute import ExperimentController
290 if not access_config or not access_config.has_attribute("rootDirectory"):
293 root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
294 controller = ExperimentController(xml, root_dir.path)
296 # inject reference to temporary dir, so that it gets cleaned
297 # up at destruction time.
298 controller._tempdir = root_dir
301 elif mode == AccessConfiguration.MODE_DAEMON:
302 (root_dir, log_level, user, host, port, agent) = \
303 get_access_config_params(access_config)
304 return ExperimentControllerProxy(root_dir, log_level,
305 experiment_xml = xml, host = host, port = port, user = user,
306 agent = agent, launch = launch)
307 raise RuntimeError("Unsupported access configuration '%s'" % mode)
309 def create_testbed_controller(testbed_id, testbed_version, access_config):
310 mode = None if not access_config \
311 else access_config.get_attribute_value("mode")
312 launch = True if not access_config \
313 else not access_config.get_attribute_value("recover")
314 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
316 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
317 return _build_testbed_controller(testbed_id, testbed_version)
318 elif mode == AccessConfiguration.MODE_DAEMON:
319 (root_dir, log_level, user, host, port, agent) = \
320 get_access_config_params(access_config)
321 return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
322 testbed_version = testbed_version, host = host, port = port,
323 user = user, agent = agent, launch = launch)
324 raise RuntimeError("Unsupported access configuration '%s'" % mode)
326 def _build_testbed_controller(testbed_id, testbed_version):
327 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
328 if not mod_name in sys.modules:
330 module = sys.modules[mod_name]
331 return module.TestbedController(testbed_version)
333 class TestbedControllerServer(server.Server):
334 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
335 super(TestbedControllerServer, self).__init__(root_dir, log_level)
336 self._testbed_id = testbed_id
337 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed controller this server will drive.

    The controller is created from the testbed id and version stored on
    the instance and kept in ``self._testbed``.
    NOTE(review): presumably invoked by the server base class once the
    process has daemonized -- confirm against nepi.util.server.
    """
    testbed_id = self._testbed_id
    version = self._testbed_version
    self._testbed = _build_testbed_controller(testbed_id, version)
344 def reply_action(self, msg):
346 result = base64.b64encode("Invalid command line")
347 reply = "%d|%s" % (ERROR, result)
349 params = msg.split("|")
350 instruction = int(params[0])
351 log_msg(self, params)
353 if instruction == TRACE:
354 reply = self.trace(params)
355 elif instruction == START:
356 reply = self.start(params)
357 elif instruction == STOP:
358 reply = self.stop(params)
359 elif instruction == SHUTDOWN:
360 reply = self.shutdown(params)
361 elif instruction == CONFIGURE:
362 reply = self.defer_configure(params)
363 elif instruction == CREATE:
364 reply = self.defer_create(params)
365 elif instruction == CREATE_SET:
366 reply = self.defer_create_set(params)
367 elif instruction == FACTORY_SET:
368 reply = self.defer_factory_set(params)
369 elif instruction == CONNECT:
370 reply = self.defer_connect(params)
371 elif instruction == CROSS_CONNECT:
372 reply = self.defer_cross_connect(params)
373 elif instruction == ADD_TRACE:
374 reply = self.defer_add_trace(params)
375 elif instruction == ADD_ADDRESS:
376 reply = self.defer_add_address(params)
377 elif instruction == ADD_ROUTE:
378 reply = self.defer_add_route(params)
379 elif instruction == DO_SETUP:
380 reply = self.do_setup(params)
381 elif instruction == DO_CREATE:
382 reply = self.do_create(params)
383 elif instruction == DO_CONNECT_INIT:
384 reply = self.do_connect_init(params)
385 elif instruction == DO_CONNECT_COMPL:
386 reply = self.do_connect_compl(params)
387 elif instruction == DO_CONFIGURE:
388 reply = self.do_configure(params)
389 elif instruction == DO_PRECONFIGURE:
390 reply = self.do_preconfigure(params)
391 elif instruction == DO_CROSS_CONNECT_INIT:
392 reply = self.do_cross_connect_init(params)
393 elif instruction == DO_CROSS_CONNECT_COMPL:
394 reply = self.do_cross_connect_compl(params)
395 elif instruction == GET:
396 reply = self.get(params)
397 elif instruction == SET:
398 reply = self.set(params)
399 elif instruction == GET_ADDRESS:
400 reply = self.get_address(params)
401 elif instruction == GET_ROUTE:
402 reply = self.get_route(params)
403 elif instruction == ACTION:
404 reply = self.action(params)
405 elif instruction == STATUS:
406 reply = self.status(params)
407 elif instruction == GUIDS:
408 reply = self.guids(params)
409 elif instruction == GET_ATTRIBUTE_LIST:
410 reply = self.get_attribute_list(params)
411 elif instruction == TESTBED_ID:
412 reply = self.testbed_id(params)
413 elif instruction == TESTBED_VERSION:
414 reply = self.testbed_version(params)
416 error = "Invalid instruction %s" % instruction
417 self.log_error(error)
418 result = base64.b64encode(error)
419 reply = "%d|%s" % (ERROR, result)
421 error = self.log_error()
422 result = base64.b64encode(error)
423 reply = "%d|%s" % (ERROR, result)
424 log_reply(self, reply)
def guids(self, params):
    """Reply with the testbed's guids, pickled and base64 encoded.

    ``params`` is accepted for uniformity with the other handlers and is
    not used.
    """
    pickled = cPickle.dumps(self._testbed.guids)
    payload = base64.b64encode(pickled)
    return "%d|%s" % (OK, payload)
def testbed_id(self, params):
    """Reply with the testbed's id as a base64 encoded string.

    ``params`` is not used.
    """
    payload = base64.b64encode(str(self._testbed.testbed_id))
    return "%d|%s" % (OK, payload)
def testbed_version(self, params):
    """Reply with the testbed's version as a base64 encoded string.

    ``params`` is not used.
    """
    payload = base64.b64encode(str(self._testbed.testbed_version))
    return "%d|%s" % (OK, payload)
def defer_create(self, params):
    """Forward a CREATE message to the testbed.

    params[1] is the element guid (converted to int), params[2] the
    factory id.  Replies OK with an empty payload.
    """
    self._testbed.defer_create(int(params[1]), params[2])
    ack = "%d|%s" % (OK, "")
    return ack
449 def trace(self, params):
450 guid = int(params[1])
452 attribute = base64.b64decode(params[3])
453 trace = self._testbed.trace(guid, trace_id, attribute)
454 result = base64.b64encode(trace)
455 return "%d|%s" % (OK, result)
def start(self, params):
    """Start the testbed and reply OK with an empty payload.

    ``params`` is not used.
    """
    testbed = self._testbed
    testbed.start()
    ack = "%d|%s" % (OK, "")
    return ack
461 def stop(self, params):
463 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Shut the testbed down and reply OK with an empty payload.

    ``params`` is not used.
    """
    testbed = self._testbed
    testbed.shutdown()
    ack = "%d|%s" % (OK, "")
    return ack
def defer_configure(self, params):
    """Forward a CONFIGURE message to the testbed.

    params[1] and params[2] are the base64 encoded attribute name and
    value; params[3] is the numeric type code handed to ``set_type`` to
    turn the decoded text back into a typed value.
    Replies OK with an empty payload.
    """
    attr_name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    type_code = int(params[3])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_configure(attr_name, typed_value)
    return "%d|%s" % (OK, "")
def defer_create_set(self, params):
    """Forward a CREATE_SET message to the testbed.

    params[1] is the element guid, params[2] and params[3] the base64
    encoded attribute name and value, params[4] the numeric type code
    handed to ``set_type``.  Replies OK with an empty payload.
    """
    element_guid = int(params[1])
    attr_name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    type_code = int(params[4])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_create_set(element_guid, attr_name, typed_value)
    return "%d|%s" % (OK, "")
def defer_factory_set(self, params):
    """Forward a FACTORY_SET message to the testbed.

    The FACTORY_SET template is ``guid|name|value|type`` and the proxy
    sends exactly those four fields, so:
    params[1] is the guid, params[2] and params[3] the base64 encoded
    attribute name and value, params[4] the numeric type code handed to
    ``set_type``.  Replies OK with an empty payload.
    """
    # The guid used to be skipped, which shifted every field by one: the
    # guid was base64-decoded as the name and the value's base64 text was
    # parsed as the type code.
    guid = int(params[1])
    name = base64.b64decode(params[2])
    value = base64.b64decode(params[3])
    value_type = int(params[4])
    value = set_type(value_type, value)
    self._testbed.defer_factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_connect(self, params):
    """Forward a CONNECT message to the testbed.

    params[1]/params[2] are the guid and connector type name of the first
    endpoint, params[3]/params[4] those of the second; guids are converted
    to int.  Replies OK with an empty payload.
    """
    endpoints = (int(params[1]), params[2], int(params[3]), params[4])
    self._testbed.defer_connect(*endpoints)
    return "%d|%s" % (OK, "")
def defer_cross_connect(self, params):
    """Forward a CROSS_CONNECT message to the testbed.

    The CROSS_CONNECT template carries six fields after the opcode:
    params[1] guid, params[2] connector type name, params[3] cross guid,
    params[4] cross testbed id, params[5] cross factory id and
    params[6] cross connector type name.
    Replies OK with an empty payload.
    """
    # The previous code re-read connector_type_name and cross_guid from
    # params[4]/params[5] and then indexed params[6..8], which does not
    # exist in a 7-element message (IndexError on every call).
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    # The template formats this field with %d, so only integers can reach
    # the wire; convert back so the testbed gets what the caller passed.
    # NOTE(review): if the template is changed to %s, drop the int().
    cross_testbed_id = int(params[4])
    cross_factory_id = params[5]
    cross_connector_type_name = params[6]
    self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
    return "%d|%s" % (OK, "")
516 def defer_add_trace(self, params):
517 guid = int(params[1])
519 self._testbed.defer_add_trace(guid, trace_id)
520 return "%d|%s" % (OK, "")
522 def defer_add_address(self, params):
523 guid = int(params[1])
525 netprefix = int(params[3])
526 broadcast = params[4]
527 self._testbed.defer_add_address(guid, address, netprefix,
529 return "%d|%s" % (OK, "")
531 def defer_add_route(self, params):
532 guid = int(params[1])
533 destination = params[2]
534 netprefix = int(params[3])
536 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
537 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Run the testbed's ``do_setup`` step and reply OK (empty payload).

    ``params`` is not used.
    """
    testbed = self._testbed
    testbed.do_setup()
    ack = "%d|%s" % (OK, "")
    return ack
def do_create(self, params):
    """Run the testbed's ``do_create`` step and reply OK (empty payload).

    ``params`` is not used.
    """
    testbed = self._testbed
    testbed.do_create()
    ack = "%d|%s" % (OK, "")
    return ack
def do_connect_init(self, params):
    """Run the testbed's ``do_connect_init`` step and reply OK.

    ``params`` is not used; the reply payload is empty.
    """
    testbed = self._testbed
    testbed.do_connect_init()
    ack = "%d|%s" % (OK, "")
    return ack
def do_connect_compl(self, params):
    """Run the testbed's ``do_connect_compl`` step and reply OK.

    ``params`` is not used; the reply payload is empty.
    """
    testbed = self._testbed
    testbed.do_connect_compl()
    ack = "%d|%s" % (OK, "")
    return ack
def do_configure(self, params):
    """Run the testbed's ``do_configure`` step and reply OK.

    ``params`` is not used; the reply payload is empty.
    """
    testbed = self._testbed
    testbed.do_configure()
    ack = "%d|%s" % (OK, "")
    return ack
def do_preconfigure(self, params):
    """Run the testbed's ``do_preconfigure`` step and reply OK.

    ``params`` is not used; the reply payload is empty.
    """
    testbed = self._testbed
    testbed.do_preconfigure()
    ack = "%d|%s" % (OK, "")
    return ack
def do_cross_connect_init(self, params):
    """Decode the cross-connection data and start cross connecting.

    params[1] is a base64 encoded pickle; the unpickled object is handed
    to the testbed's ``do_cross_connect_init``.  Replies OK with an empty
    payload.
    """
    # NOTE(review): unpickling data received over the wire executes
    # arbitrary code if the peer is not trusted.
    encoded = params[1]
    cross_data = cPickle.loads(base64.b64decode(encoded))
    self._testbed.do_cross_connect_init(cross_data)
    return "%d|%s" % (OK, "")
def do_cross_connect_compl(self, params):
    """Decode the cross-connection data and complete cross connecting.

    params[1] is a base64 encoded pickle; the unpickled object is handed
    to the testbed's ``do_cross_connect_compl``.  Replies OK with an empty
    payload.
    """
    # NOTE(review): unpickling data received over the wire executes
    # arbitrary code if the peer is not trusted.
    encoded = params[1]
    cross_data = cPickle.loads(base64.b64decode(encoded))
    self._testbed.do_cross_connect_compl(cross_data)
    return "%d|%s" % (OK, "")
575 def get(self, params):
578 name = base64.b64decode(params[3])
579 value = self._testbed.get(time, guid, name)
580 result = base64.b64encode(str(value))
581 return "%d|%s" % (OK, result)
def set(self, params):
    """Forward a SET message to the testbed.

    The SET template is ``time|guid|name|value|type``:
    params[1] is the time, params[2] the element guid, params[3] and
    params[4] the base64 encoded attribute name and value, params[5] the
    numeric type code handed to ``set_type``.
    Replies OK with an empty payload.
    """
    # NOTE(review): the time field is passed through as received, as the
    # template formats it with %s -- confirm no conversion is expected.
    time = params[1]
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = base64.b64decode(params[4])
    # The type code is the fifth field; it used to be read from params[3],
    # which is the base64 encoded name.
    value_type = int(params[5])
    value = set_type(value_type, value)
    self._testbed.set(time, guid, name, value)
    return "%d|%s" % (OK, "")
def get_address(self, params):
    """Handle GET_ADDRESS: look up one attribute of an element's address.

    params[1] is the element guid, params[2] the address index (both
    converted to int) and params[3] the base64 encoded attribute name.
    Replies OK with ``str(value)`` base64 encoded.
    """
    guid = int(params[1])
    index = int(params[2])
    # Was ``param[3]``: an undefined name that raised NameError on every
    # call.
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_address(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def get_route(self, params):
    """Handle GET_ROUTE: look up one attribute of an element's route.

    params[1] is the element guid, params[2] the route index (both
    converted to int) and params[3] the base64 encoded attribute name.
    Replies OK with ``str(value)`` base64 encoded.
    """
    guid = int(params[1])
    index = int(params[2])
    # Was ``param[3]``: an undefined name that raised NameError on every
    # call.
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_route(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
609 def action(self, params):
611 guid = int(params[2])
612 command = base64.b64decode(params[3])
613 self._testbed.action(time, guid, command)
614 return "%d|%s" % (OK, "")
616 def status(self, params):
618 if params[1] != "None":
619 guid = int(params[1])
620 status = self._testbed.status(guid)
621 result = base64.b64encode(str(status))
622 return "%d|%s" % (OK, result)
def get_attribute_list(self, params):
    """Reply with the attribute list of an element.

    params[1] is the element guid (converted to int).  The list returned
    by the testbed is pickled and base64 encoded into the OK reply.
    """
    element_guid = int(params[1])
    pickled = cPickle.dumps(self._testbed.get_attribute_list(element_guid))
    payload = base64.b64encode(pickled)
    return "%d|%s" % (OK, payload)
631 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, log_level, experiment_xml):
    """Initialise the server base and remember the experiment description.

    ``root_dir`` and ``log_level`` are forwarded to the base class.  The
    controller itself is not created here; ``_controller`` starts as None.
    """
    base = super(ExperimentControllerServer, self)
    base.__init__(root_dir, log_level)
    self._experiment_xml, self._controller = experiment_xml, None
def post_daemonize(self):
    """Create the ExperimentController this server will drive.

    The controller is built from the stored experiment XML and the
    server's root directory and kept in ``self._controller``.
    NOTE(review): presumably invoked by the server base class once the
    process has daemonized -- confirm against nepi.util.server.
    """
    # Imported inside the method, as the original code does.
    from nepi.core.execute import ExperimentController as controller_class
    xml = self._experiment_xml
    self._controller = controller_class(xml, root_dir = self._root_dir)
642 def reply_action(self, msg):
644 result = base64.b64encode("Invalid command line")
645 reply = "%d|%s" % (ERROR, result)
647 params = msg.split("|")
648 instruction = int(params[0])
649 log_msg(self, params)
651 if instruction == XML:
652 reply = self.experiment_xml(params)
653 elif instruction == ACCESS:
654 reply = self.set_access_configuration(params)
655 elif instruction == TRACE:
656 reply = self.trace(params)
657 elif instruction == FINISHED:
658 reply = self.is_finished(params)
659 elif instruction == START:
660 reply = self.start(params)
661 elif instruction == STOP:
662 reply = self.stop(params)
663 elif instruction == RECOVER:
664 reply = self.recover(params)
665 elif instruction == SHUTDOWN:
666 reply = self.shutdown(params)
668 error = "Invalid instruction %s" % instruction
669 self.log_error(error)
670 result = base64.b64encode(error)
671 reply = "%d|%s" % (ERROR, result)
673 error = self.log_error()
674 result = base64.b64encode(error)
675 reply = "%d|%s" % (ERROR, result)
676 log_reply(self, reply)
def experiment_xml(self, params):
    """Reply with the controller's experiment XML, base64 encoded.

    ``params`` is not used.
    """
    payload = base64.b64encode(self._controller.experiment_xml)
    return "%d|%s" % (OK, payload)
684 def set_access_configuration(self, params):
685 testbed_guid = int(params[1])
687 communication = params[3]
690 port = int(params[6])
692 use_agent = params[8] == "True"
693 log_level = params[9]
694 access_config = AccessConfiguration()
695 access_config.set_attribute_value("mode", mode)
696 access_config.set_attribute_value("communication", communication)
697 access_config.set_attribute_value("host", host)
698 access_config.set_attribute_value("user", user)
699 access_config.set_attribute_value("port", port)
700 access_config.set_attribute_value("rootDirectory", root_dir)
701 access_config.set_attribute_value("useAgent", use_agent)
702 access_config.set_attribute_value("logLevel", log_level)
703 self._controller.set_access_configuration(testbed_guid,
705 return "%d|%s" % (OK, "")
707 def trace(self, params):
708 testbed_guid = int(params[1])
709 guid = int(params[2])
711 attribute = base64.b64decode(params[4])
712 trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
713 result = base64.b64encode(trace)
714 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Reply with the controller's ``is_finished`` answer for one guid.

    params[1] is the guid (converted to int).  The answer is sent as
    ``str(status)`` base64 encoded in an OK reply.
    """
    finished = self._controller.is_finished(int(params[1]))
    payload = base64.b64encode(str(finished))
    return "%d|%s" % (OK, payload)
def start(self, params):
    """Start the experiment controller and reply OK (empty payload).

    ``params`` is not used.
    """
    controller = self._controller
    controller.start()
    ack = "%d|%s" % (OK, "")
    return ack
def stop(self, params):
    """Stop the experiment controller and reply OK (empty payload).

    ``params`` is not used.
    """
    controller = self._controller
    controller.stop()
    ack = "%d|%s" % (OK, "")
    return ack
def recover(self, params):
    """Call the controller's ``recover`` and reply OK (empty payload).

    ``params`` is not used.
    """
    controller = self._controller
    controller.recover()
    ack = "%d|%s" % (OK, "")
    return ack
def shutdown(self, params):
    """Shut the experiment controller down and reply OK (empty payload).

    ``params`` is not used.
    """
    controller = self._controller
    controller.shutdown()
    ack = "%d|%s" % (OK, "")
    return ack
738 class TestbedControllerProxy(object):
739 def __init__(self, root_dir, log_level, testbed_id = None,
740 testbed_version = None, launch = True, host = None,
741 port = None, user = None, agent = None):
743 if testbed_id == None or testbed_version == None:
744 raise RuntimeError("To launch a TesbedInstance server a \
745 testbed_id and testbed_version are required")
748 python_code = "from nepi.util.proxy import \
749 TesbedInstanceServer;\
750 s = TestbedControllerServer('%s', %d, '%s', '%s');\
751 s.run()" % (root_dir, log_level, testbed_id,
753 proc = server.popen_ssh_subprocess(python_code, host = host,
754 port = port, user = user, agent = agent)
756 err = proc.stderr.read()
757 raise RuntimeError("Server could not be executed: %s" % \
761 s = TestbedControllerServer(root_dir, log_level, testbed_id,
765 # connect client to server
766 self._client = server.Client(root_dir, host = host, port = port,
767 user = user, agent = agent)
771 msg = testbed_messages[GUIDS]
772 self._client.send_msg(msg)
773 reply = self._client.read_reply()
774 result = reply.split("|")
775 code = int(result[0])
776 text = base64.b64decode(result[1])
778 raise RuntimeError(text)
779 guids = cPickle.loads(text)
783 def testbed_id(self):
784 msg = testbed_messages[TESTBED_ID]
785 self._client.send_msg(msg)
786 reply = self._client.read_reply()
787 result = reply.split("|")
788 code = int(result[0])
789 text = base64.b64decode(result[1])
791 raise RuntimeError(text)
795 def testbed_version(self):
796 msg = testbed_messages[TESTBED_VERSION]
797 self._client.send_msg(msg)
798 reply = self._client.read_reply()
799 result = reply.split("|")
800 code = int(result[0])
801 text = base64.b64decode(result[1])
803 raise RuntimeError(text)
806 def defer_configure(self, name, value):
807 msg = testbed_messages[CONFIGURE]
808 type = get_type(value)
809 # avoid having "|" in this parameters
810 name = base64.b64encode(name)
811 value = base64.b64encode(str(value))
812 msg = msg % (name, value, type)
813 self._client.send_msg(msg)
814 reply = self._client.read_reply()
815 result = reply.split("|")
816 code = int(result[0])
817 text = base64.b64decode(result[1])
819 raise RuntimeError(text)
821 def defer_create(self, guid, factory_id):
822 msg = testbed_messages[CREATE]
823 msg = msg % (guid, factory_id)
824 self._client.send_msg(msg)
825 reply = self._client.read_reply()
826 result = reply.split("|")
827 code = int(result[0])
828 text = base64.b64decode(result[1])
830 raise RuntimeError(text)
832 def defer_create_set(self, guid, name, value):
833 msg = testbed_messages[CREATE_SET]
834 type = get_type(value)
835 # avoid having "|" in this parameters
836 name = base64.b64encode(name)
837 value = base64.b64encode(str(value))
838 msg = msg % (guid, name, value, type)
839 self._client.send_msg(msg)
840 reply = self._client.read_reply()
841 result = reply.split("|")
842 code = int(result[0])
843 text = base64.b64decode(result[1])
845 raise RuntimeError(text)
847 def defer_factory_set(self, guid, name, value):
848 msg = testbed_messages[FACTORY_SET]
849 type = get_type(value)
850 # avoid having "|" in this parameters
851 name = base64.b64encode(name)
852 value = base64.b64encode(str(value))
853 msg = msg % (guid, name, value, type)
854 self._client.send_msg(msg)
855 reply = self._client.read_reply()
856 result = reply.split("|")
857 code = int(result[0])
858 text = base64.b64decode(result[1])
860 raise RuntimeError(text)
862 def defer_connect(self, guid1, connector_type_name1, guid2,
863 connector_type_name2):
864 msg = testbed_messages[CONNECT]
865 msg = msg % (guid1, connector_type_name1, guid2,
866 connector_type_name2)
867 self._client.send_msg(msg)
868 reply = self._client.read_reply()
869 result = reply.split("|")
870 code = int(result[0])
871 text = base64.b64decode(result[1])
873 raise RuntimeError(text)
875 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
876 cross_testbed_id, cross_factory_id, cross_connector_type_name):
877 msg = testbed_messages[CROSS_CONNECT]
878 msg = msg % (guid, connector_type_name, cross_guid,
879 cross_testbed_id, cross_factory_id, cross_connector_type_name)
880 self._client.send_msg(msg)
881 reply = self._client.read_reply()
882 result = reply.split("|")
883 code = int(result[0])
884 text = base64.b64decode(result[1])
886 raise RuntimeError(text)
888 def defer_add_trace(self, guid, trace_id):
889 msg = testbed_messages[ADD_TRACE]
890 msg = msg % (guid, trace_id)
891 self._client.send_msg(msg)
892 reply = self._client.read_reply()
893 result = reply.split("|")
894 code = int(result[0])
895 text = base64.b64decode(result[1])
897 raise RuntimeError(text)
899 def defer_add_address(self, guid, address, netprefix, broadcast):
900 msg = testbed_messages[ADD_ADDRESS]
901 msg = msg % (guid, address, netprefix, broadcast)
902 self._client.send_msg(msg)
903 reply = self._client.read_reply()
904 result = reply.split("|")
905 code = int(result[0])
906 text = base64.b64decode(result[1])
908 raise RuntimeError(text)
910 def defer_add_route(self, guid, destination, netprefix, nexthop):
911 msg = testbed_messages[ADD_ROUTE]
912 msg = msg % (guid, destination, netprefix, nexthop)
913 self._client.send_msg(msg)
914 reply = self._client.read_reply()
915 result = reply.split("|")
916 code = int(result[0])
917 text = base64.b64decode(result[1])
919 raise RuntimeError(text)
922 msg = testbed_messages[DO_SETUP]
923 self._client.send_msg(msg)
924 reply = self._client.read_reply()
925 result = reply.split("|")
926 code = int(result[0])
927 text = base64.b64decode(result[1])
929 raise RuntimeError(text)
932 msg = testbed_messages[DO_CREATE]
933 self._client.send_msg(msg)
934 reply = self._client.read_reply()
935 result = reply.split("|")
936 code = int(result[0])
937 text = base64.b64decode(result[1])
939 raise RuntimeError(text)
941 def do_connect_init(self):
942 msg = testbed_messages[DO_CONNECT_INIT]
943 self._client.send_msg(msg)
944 reply = self._client.read_reply()
945 result = reply.split("|")
946 code = int(result[0])
947 text = base64.b64decode(result[1])
949 raise RuntimeError(text)
951 def do_connect_compl(self):
952 msg = testbed_messages[DO_CONNECT_COMPL]
953 self._client.send_msg(msg)
954 reply = self._client.read_reply()
955 result = reply.split("|")
956 code = int(result[0])
957 text = base64.b64decode(result[1])
959 raise RuntimeError(text)
961 def do_configure(self):
962 msg = testbed_messages[DO_CONFIGURE]
963 self._client.send_msg(msg)
964 reply = self._client.read_reply()
965 result = reply.split("|")
966 code = int(result[0])
967 text = base64.b64decode(result[1])
969 raise RuntimeError(text)
971 def do_preconfigure(self):
972 msg = testbed_messages[DO_PRECONFIGURE]
973 self._client.send_msg(msg)
974 reply = self._client.read_reply()
975 result = reply.split("|")
976 code = int(result[0])
977 text = base64.b64decode(result[1])
979 raise RuntimeError(text)
981 def do_cross_connect_init(self, cross_data):
982 msg = testbed_messages[DO_CROSS_CONNECT_INIT]
983 pcross_data = cPickle.dumps(cross_data)
984 cross_data = base64.b64encode(pcross_data)
985 msg = msg % (cross_data)
986 self._client.send_msg(msg)
987 reply = self._client.read_reply()
988 result = reply.split("|")
989 code = int(result[0])
990 text = base64.b64decode(result[1])
992 raise RuntimeError(text)
994 def do_cross_connect_compl(self, cross_data):
995 msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
996 pcross_data = cPickle.dumps(cross_data)
997 cross_data = base64.b64encode(pcross_data)
998 msg = msg % (cross_data)
999 self._client.send_msg(msg)
1000 reply = self._client.read_reply()
1001 result = reply.split("|")
1002 code = int(result[0])
1003 text = base64.b64decode(result[1])
1005 raise RuntimeError(text)
1007 def start(self, time = TIME_NOW):
1008 msg = testbed_messages[START]
1009 self._client.send_msg(msg)
1010 reply = self._client.read_reply()
1011 result = reply.split("|")
1012 code = int(result[0])
1013 text = base64.b64decode(result[1])
1015 raise RuntimeError(text)
1017 def stop(self, time = TIME_NOW):
1018 msg = testbed_messages[STOP]
1019 self._client.send_msg(msg)
1020 reply = self._client.read_reply()
1021 result = reply.split("|")
1022 code = int(result[0])
1023 text = base64.b64decode(result[1])
1025 raise RuntimeError(text)
1027 def set(self, time, guid, name, value):
1028 msg = testbed_messages[SET]
1029 type = get_type(value)
1030 # avoid having "|" in this parameters
1031 name = base64.b64encode(name)
1032 value = base64.b64encode(str(value))
1033 msg = msg % (time, guid, name, value, type)
1034 self._client.send_msg(msg)
1035 reply = self._client.read_reply()
1036 result = reply.split("|")
1037 code = int(result[0])
1038 text = base64.b64decode(result[1])
1040 raise RuntimeError(text)
1042 def get(self, time, guid, name):
1043 msg = testbed_messages[GET]
1044 # avoid having "|" in this parameters
1045 name = base64.b64encode(name)
1046 msg = msg % (time, guid, name)
1047 self._client.send_msg(msg)
1048 reply = self._client.read_reply()
1049 result = reply.split("|")
1050 code = int(result[0])
1051 text = base64.b64decode(result[1])
1053 raise RuntimeError(text)
1056 def get_address(self, guid, index, attribute):
1057 msg = testbed_messages[GET_ADDRESS]
1058 # avoid having "|" in this parameters
1059 attribute = base64.b64encode(attribute)
1060 msg = msg % (guid, index, attribute)
1061 self._client.send_msg(msg)
1062 reply = self._client.read_reply()
1063 result = reply.split("|")
1064 code = int(result[0])
1065 text = base64.b64decode(result[1])
1067 raise RuntimeError(text)
1070 def get_route(self, guid, index, attribute):
1071 msg = testbed_messages[GET_ROUTE]
1072 # avoid having "|" in this parameters
1073 attribute = base64.b64encode(attribute)
1074 msg = msg % (guid, index, attribute)
1075 self._client.send_msg(msg)
1076 reply = self._client.read_reply()
1077 result = reply.split("|")
1078 code = int(result[0])
1079 text = base64.b64decode(result[1])
1081 raise RuntimeError(text)
1084 def action(self, time, guid, action):
1085 msg = testbed_messages[ACTION]
1086 msg = msg % (time, guid, action)
1087 self._client.send_msg(msg)
1088 reply = self._client.read_reply()
1089 result = reply.split("|")
1090 code = int(result[0])
1091 text = base64.b64decode(result[1])
1093 raise RuntimeError(text)
1095 def status(self, guid = None):
1096 msg = testbed_messages[STATUS]
1097 msg = msg % str(guid)
1098 self._client.send_msg(msg)
1099 reply = self._client.read_reply()
1100 result = reply.split("|")
1101 code = int(result[0])
1102 text = base64.b64decode(result[1])
1104 raise RuntimeError(text)
def trace(self, guid, trace_id, attribute='value'):
    """Request an attribute of a trace from the testbed server.

    The TRACE template is filled with ``guid``, ``trace_id`` and the
    base64-encoded ``attribute`` name (default ``'value'``) and sent
    through ``self._client``. The reply is read as
    "<code>|<base64 payload>".

    Returns the decoded payload.
    Raises RuntimeError, carrying the decoded payload, on an error reply.
    """
    request = testbed_messages[TRACE]
    encoded = base64.b64encode(attribute)
    self._client.send_msg(request % (guid, trace_id, encoded))
    fields = self._client.read_reply().split("|")
    code = int(fields[0])
    text = base64.b64decode(fields[1])
    # NOTE(review): this guard and the return below did not survive in the
    # listing this method was restored from; ERROR is assumed to be the
    # module-level failure reply code -- confirm before merging.
    if code == ERROR:
        raise RuntimeError(text)
    return text
1120 def get_attribute_list(self, guid):
1121 msg = testbed_messages[GET_ATTRIBUTE_LIST]
1123 self._client.send_msg(msg)
1124 reply = self._client.read_reply()
1125 result = reply.split("|")
1126 code = int(result[0])
1127 text = base64.b64decode(result[1])
1129 raise RuntimeError(text)
1130 attr_list = cPickle.loads(text)
1134 msg = testbed_messages[SHUTDOWN]
1135 self._client.send_msg(msg)
1136 reply = self._client.read_reply()
1137 result = reply.split("|")
1138 code = int(result[0])
1139 text = base64.b64decode(result[1])
1141 raise RuntimeError(text)
1142 self._client.send_stop()
1143 self._client.read_reply() # wait for it
1145 class ExperimentControllerProxy(object):
1146 def __init__(self, root_dir, log_level, experiment_xml = None,
1147 launch = True, host = None, port = None, user = None,
1151 if experiment_xml == None:
1152 raise RuntimeError("To launch a ExperimentControllerServer a \
1153 xml description of the experiment is required")
1156 xml = experiment_xml
1157 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1158 s = ExperimentControllerServer(%r, %r, %r);\
1159 s.run()" % (root_dir, log_level, xml)
1160 proc = server.popen_ssh_subprocess(python_code, host = host,
1161 port = port, user = user, agent = agent)
1163 err = proc.stderr.read()
1164 raise RuntimeError("Server could not be executed: %s" % \
1168 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1171 # connect client to server
1172 self._client = server.Client(root_dir, host = host, port = port,
1173 user = user, agent = agent)
def experiment_xml(self):
    """Request the experiment XML from the controller server.

    Sends the XML message through ``self._client`` and decodes the
    "<code>|<base64 payload>" reply.

    Returns the decoded payload.
    Raises RuntimeError, carrying the decoded payload, on an error reply.
    """
    self._client.send_msg(controller_messages[XML])
    fields = self._client.read_reply().split("|")
    code = int(fields[0])
    text = base64.b64decode(fields[1])
    # NOTE(review): this guard and the return below did not survive in the
    # listing this method was restored from; ERROR is assumed to be the
    # module-level failure reply code -- confirm before merging.
    if code == ERROR:
        raise RuntimeError(text)
    return text
def set_access_configuration(self, testbed_guid, access_config):
    """Send the access settings of a testbed to the controller server.

    Eight attribute values are read from ``access_config`` through
    ``get_attribute_value`` and substituted, after ``testbed_guid``, into
    the ACCESS template. The message is sent through ``self._client`` and
    the "<code>|<base64 payload>" reply is decoded.

    Raises RuntimeError, carrying the decoded payload, on an error reply.
    """
    # Order matters: it must follow the placeholders of the ACCESS template.
    attribute_names = ("mode", "communication", "host", "user", "port",
            "rootDirectory", "useAgent", "logLevel")
    values = [testbed_guid]
    for attribute_name in attribute_names:
        values.append(access_config.get_attribute_value(attribute_name))
    # NOTE(review): the ACCESS template formats the port with %d, so a
    # non-numeric port (e.g. None) makes this substitution raise TypeError.
    request = controller_messages[ACCESS] % tuple(values)
    self._client.send_msg(request)
    fields = self._client.read_reply().split("|")
    code = int(fields[0])
    text = base64.b64decode(fields[1])
    # NOTE(review): this guard line did not survive in the listing this
    # method was restored from; ERROR is assumed to be the module-level
    # failure reply code -- confirm before merging. No statement follows
    # the raise in the listing, so a successful call returns None.
    if code == ERROR:
        raise RuntimeError(text)
def trace(self, testbed_guid, guid, trace_id, attribute='value'):
    """Request an attribute of a trace through the controller server.

    The controller TRACE template is filled with ``testbed_guid``,
    ``guid``, ``trace_id`` and the base64-encoded ``attribute`` name
    (default ``'value'``) and sent through ``self._client``. The reply is
    read as "<code>|<base64 payload>".

    Returns the decoded payload on success.
    Raises RuntimeError, carrying the decoded payload, otherwise.
    """
    request = controller_messages[TRACE]
    encoded = base64.b64encode(attribute)
    self._client.send_msg(request % (testbed_guid, guid, trace_id, encoded))
    fields = self._client.read_reply().split("|")
    code = int(fields[0])
    text = base64.b64decode(fields[1])
    # NOTE(review): two lines are missing before the raise in the listing
    # this method was restored from and nothing follows it, so the success
    # test and return are placed ahead of the raise; OK is assumed to be the
    # module-level success reply code -- confirm before merging.
    if code == OK:
        return text
    raise RuntimeError(text)
1221 msg = controller_messages[START]
1222 self._client.send_msg(msg)
1223 reply = self._client.read_reply()
1224 result = reply.split("|")
1225 code = int(result[0])
1226 text = base64.b64decode(result[1])
1228 raise RuntimeError(text)
1231 msg = controller_messages[STOP]
1232 self._client.send_msg(msg)
1233 reply = self._client.read_reply()
1234 result = reply.split("|")
1235 code = int(result[0])
1236 text = base64.b64decode(result[1])
1238 raise RuntimeError(text)
1241 msg = controller_messages[RECOVER]
1242 self._client.send_msg(msg)
1243 reply = self._client.read_reply()
1244 result = reply.split("|")
1245 code = int(result[0])
1246 text = base64.b64decode(result[1])
1248 raise RuntimeError(text)
def is_finished(self, guid):
    """Ask the controller server whether ``guid`` has finished.

    The FINISHED template is filled with ``guid``, sent through
    ``self._client``, and the "<code>|<base64 payload>" reply is decoded.

    Returns True when the decoded payload is the string "True", otherwise
    False.
    Raises RuntimeError, carrying the decoded payload, on an error reply.
    """
    # NOTE(review): the formatting step did not survive in the listing this
    # method was restored from; the FINISHED template holds a single %d and
    # guid is the only argument, so guid is substituted here -- confirm.
    request = controller_messages[FINISHED] % guid
    self._client.send_msg(request)
    fields = self._client.read_reply().split("|")
    code = int(fields[0])
    text = base64.b64decode(fields[1])
    # NOTE(review): this guard line did not survive in the listing either;
    # ERROR is assumed to be the module-level failure reply code -- confirm
    # before merging.
    if code == ERROR:
        raise RuntimeError(text)
    return text == "True"
1263 msg = controller_messages[SHUTDOWN]
1264 self._client.send_msg(msg)
1265 reply = self._client.read_reply()
1266 result = reply.split("|")
1267 code = int(result[0])
1268 text = base64.b64decode(result[1])
1270 raise RuntimeError(text)
1271 self._client.send_stop()
1272 self._client.read_reply() # wait for it