2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP
19 # PROTOCOL INSTRUCTION MESSAGES
40 DO_CROSS_CONNECT_INIT = 22
50 GET_ATTRIBUTE_LIST = 32
52 DO_CROSS_CONNECT_COMPL = 34
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
65 controller_messages = dict({
67 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
68 TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
69 FINISHED: "%d|%s" % (FINISHED, "%d"),
72 RECOVER : "%d" % RECOVER,
73 SHUTDOWN: "%d" % SHUTDOWN,
76 # TESTBED INSTANCE PROTOCOL MESSAGES
77 testbed_messages = dict({
78 TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
81 SHUTDOWN: "%d" % SHUTDOWN,
82 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
83 CREATE: "%d|%s" % (CREATE, "%d|%s"),
84 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
85 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
86 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
87 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s|%s"),
88 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
89 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
90 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
91 DO_SETUP: "%d" % DO_SETUP,
92 DO_CREATE: "%d" % DO_CREATE,
93 DO_CONNECT_INIT: "%d" % DO_CONNECT_INIT,
94 DO_CONNECT_COMPL: "%d" % DO_CONNECT_COMPL,
95 DO_CONFIGURE: "%d" % DO_CONFIGURE,
96 DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
97 DO_CROSS_CONNECT_INIT: "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"),
98 DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"),
99 GET: "%d|%s" % (GET, "%d|%s|%s"),
100 SET: "%d|%s" % (SET, "%d|%s|%s|%d|%s"),
101 EXPERIMENT_GET: "%d|%s" % (EXPERIMENT_GET, "%d|%d|%s|%s"),
102 EXPERIMENT_SET: "%d|%s" % (EXPERIMENT_SET, "%d|%d|%s|%s|%d|%s"),
103 GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
104 GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
105 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
106 STATUS: "%d|%s" % (STATUS, "%s"),
108 GET_ATTRIBUTE_LIST: "%d" % GET_ATTRIBUTE_LIST,
109 TESTBED_ID: "%d" % TESTBED_ID,
110 TESTBED_VERSION: "%d" % TESTBED_VERSION,
113 instruction_text = dict({
119 FINISHED: "FINISHED",
123 SHUTDOWN: "SHUTDOWN",
124 CONFIGURE: "CONFIGURE",
126 CREATE_SET: "CREATE_SET",
127 FACTORY_SET: "FACTORY_SET",
129 CROSS_CONNECT: "CROSS_CONNECT",
130 ADD_TRACE: "ADD_TRACE",
131 ADD_ADDRESS: "ADD_ADDRESS",
132 ADD_ROUTE: "ADD_ROUTE",
133 DO_SETUP: "DO_SETUP",
134 DO_CREATE: "DO_CREATE",
135 DO_CONNECT_INIT: "DO_CONNECT_INIT",
136 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
137 DO_CONFIGURE: "DO_CONFIGURE",
138 DO_PRECONFIGURE: "DO_PRECONFIGURE",
139 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
140 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
143 GET_ROUTE: "GET_ROUTE",
144 GET_ADDRESS: "GET_ADDRESS",
145 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
153 TESTBED_ID: "TESTBED_ID",
154 TESTBED_VERSION: "TESTBED_VERSION",
155 EXPERIMENT_SET: "EXPERIMENT_SET",
156 EXPERIMENT_GET: "EXPERIMENT_GET",
160 if isinstance(value, bool):
162 elif isinstance(value, int):
164 elif isinstance(value, float):
169 def set_type(type, value):
175 value = value == "True"
def log_msg(server, params):
    """Write a debug log entry describing an incoming protocol message.

    params is the already-split message: params[0] is the numeric
    instruction code and the remaining items are its arguments.  The
    entry goes through server.log_debug and is prefixed with the class
    name of the server object.

    An instruction code that has no entry in instruction_text is logged
    by its number instead of raising KeyError, so this logging helper
    does not fail on an unrecognised code.
    """
    instr = int(params[0])
    # Fall back to the raw code rather than raising on an unknown instruction.
    instr_txt = instruction_text.get(instr, "UNKNOWN(%d)" % instr)
    arguments = ", ".join(map(str, params[1:]))
    server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
            instr_txt, arguments))
186 def log_reply(server, reply):
187 res = reply.split("|")
189 code_txt = instruction_text[code]
190 txt = base64.b64decode(res[1])
191 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into a server log level.

    Returns server.DEBUG_LEVEL when log_level equals
    AccessConfiguration.DEBUG_LEVEL; every other value yields
    server.ERROR_LEVEL.
    """
    if log_level != AccessConfiguration.DEBUG_LEVEL:
        return server.ERROR_LEVEL
    return server.DEBUG_LEVEL
def get_access_config_params(access_config):
    """Extract the connection parameters stored in an access configuration.

    Returns the tuple (root_dir, log_level, user, host, port, agent).
    log_level is converted with to_server_log_level.  user, host, port
    and agent are only read when the "communication" attribute equals
    AccessConfiguration.ACCESS_SSH; otherwise all four are None.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") == AccessConfiguration.ACCESS_SSH:
        user = read("user")
        host = read("host")
        port = read("port")
        agent = read("useAgent")
    else:
        user = host = port = agent = None
    return (root_dir, log_level, user, host, port, agent)
212 class AccessConfiguration(AttributesMap):
213 MODE_SINGLE_PROCESS = "SINGLE"
214 MODE_DAEMON = "DAEMON"
216 ACCESS_LOCAL = "LOCAL"
217 ERROR_LEVEL = "Error"
218 DEBUG_LEVEL = "Debug"
221 super(AccessConfiguration, self).__init__()
222 self.add_attribute(name = "mode",
223 help = "Instance execution mode",
224 type = Attribute.ENUM,
225 value = AccessConfiguration.MODE_SINGLE_PROCESS,
226 allowed = [AccessConfiguration.MODE_DAEMON,
227 AccessConfiguration.MODE_SINGLE_PROCESS],
228 validation_function = validation.is_enum)
229 self.add_attribute(name = "communication",
230 help = "Instance communication mode",
231 type = Attribute.ENUM,
232 value = AccessConfiguration.ACCESS_LOCAL,
233 allowed = [AccessConfiguration.ACCESS_LOCAL,
234 AccessConfiguration.ACCESS_SSH],
235 validation_function = validation.is_enum)
236 self.add_attribute(name = "host",
237 help = "Host where the testbed will be executed",
238 type = Attribute.STRING,
240 validation_function = validation.is_string)
241 self.add_attribute(name = "user",
242 help = "User on the Host to execute the testbed",
243 type = Attribute.STRING,
244 value = getpass.getuser(),
245 validation_function = validation.is_string)
246 self.add_attribute(name = "port",
247 help = "Port on the Host",
248 type = Attribute.INTEGER,
250 validation_function = validation.is_integer)
251 self.add_attribute(name = "rootDirectory",
252 help = "Root directory for storing process files",
253 type = Attribute.STRING,
255 validation_function = validation.is_string) # TODO: validation.is_path
256 self.add_attribute(name = "useAgent",
257 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
258 type = Attribute.BOOL,
260 validation_function = validation.is_bool)
261 self.add_attribute(name = "logLevel",
262 help = "Log level for instance",
263 type = Attribute.ENUM,
264 value = AccessConfiguration.ERROR_LEVEL,
265 allowed = [AccessConfiguration.ERROR_LEVEL,
266 AccessConfiguration.DEBUG_LEVEL],
267 validation_function = validation.is_enum)
268 self.add_attribute(name = "recover",
269 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
270 type = Attribute.BOOL,
272 validation_function = validation.is_bool)
274 class TempDir(object):
276 self.path = tempfile.mkdtemp()
279 shutil.rmtree(self.path)
281 class PermDir(object):
282 def __init__(self, path):
285 def create_controller(xml, access_config = None):
286 mode = None if not access_config \
287 else access_config.get_attribute_value("mode")
288 launch = True if not access_config \
289 else not access_config.get_attribute_value("recover")
290 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
292 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
294 from nepi.core.execute import ExperimentController
296 if not access_config or not access_config.has_attribute("rootDirectory"):
299 root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
300 controller = ExperimentController(xml, root_dir.path)
302 # inject reference to temporary dir, so that it gets cleaned
303 # up at destruction time.
304 controller._tempdir = root_dir
307 elif mode == AccessConfiguration.MODE_DAEMON:
308 (root_dir, log_level, user, host, port, agent) = \
309 get_access_config_params(access_config)
310 return ExperimentControllerProxy(root_dir, log_level,
311 experiment_xml = xml, host = host, port = port, user = user,
312 agent = agent, launch = launch)
313 raise RuntimeError("Unsupported access configuration '%s'" % mode)
315 def create_testbed_controller(testbed_id, testbed_version, access_config):
316 mode = None if not access_config \
317 else access_config.get_attribute_value("mode")
318 launch = True if not access_config \
319 else not access_config.get_attribute_value("recover")
320 environment_setup = access_config \
321 and access_config.has_attribute(ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP) \
322 and access_config.get_attribute_value(ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP) \
324 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
326 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
327 return _build_testbed_controller(testbed_id, testbed_version)
328 elif mode == AccessConfiguration.MODE_DAEMON:
329 (root_dir, log_level, user, host, port, agent) = \
330 get_access_config_params(access_config)
331 return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
332 testbed_version = testbed_version, host = host, port = port,
333 user = user, agent = agent, launch = launch,
334 environment_setup = environment_setup)
335 raise RuntimeError("Unsupported access configuration '%s'" % mode)
337 def _build_testbed_controller(testbed_id, testbed_version):
338 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
339 if not mod_name in sys.modules:
341 module = sys.modules[mod_name]
342 return module.TestbedController(testbed_version)
344 class TestbedControllerServer(server.Server):
345 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
346 super(TestbedControllerServer, self).__init__(root_dir, log_level)
347 self._testbed_id = testbed_id
348 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed controller and keep it in self._testbed.

    The controller is created by _build_testbed_controller from the
    testbed id and version stored on this server.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_controller(testbed_id, testbed_version)
355 def reply_action(self, msg):
357 result = base64.b64encode("Invalid command line")
358 reply = "%d|%s" % (ERROR, result)
360 params = msg.split("|")
361 instruction = int(params[0])
362 log_msg(self, params)
364 if instruction == TRACE:
365 reply = self.trace(params)
366 elif instruction == START:
367 reply = self.start(params)
368 elif instruction == STOP:
369 reply = self.stop(params)
370 elif instruction == SHUTDOWN:
371 reply = self.shutdown(params)
372 elif instruction == CONFIGURE:
373 reply = self.defer_configure(params)
374 elif instruction == CREATE:
375 reply = self.defer_create(params)
376 elif instruction == CREATE_SET:
377 reply = self.defer_create_set(params)
378 elif instruction == FACTORY_SET:
379 reply = self.defer_factory_set(params)
380 elif instruction == CONNECT:
381 reply = self.defer_connect(params)
382 elif instruction == CROSS_CONNECT:
383 reply = self.defer_cross_connect(params)
384 elif instruction == ADD_TRACE:
385 reply = self.defer_add_trace(params)
386 elif instruction == ADD_ADDRESS:
387 reply = self.defer_add_address(params)
388 elif instruction == ADD_ROUTE:
389 reply = self.defer_add_route(params)
390 elif instruction == DO_SETUP:
391 reply = self.do_setup(params)
392 elif instruction == DO_CREATE:
393 reply = self.do_create(params)
394 elif instruction == DO_CONNECT_INIT:
395 reply = self.do_connect_init(params)
396 elif instruction == DO_CONNECT_COMPL:
397 reply = self.do_connect_compl(params)
398 elif instruction == DO_CONFIGURE:
399 reply = self.do_configure(params)
400 elif instruction == DO_PRECONFIGURE:
401 reply = self.do_preconfigure(params)
402 elif instruction == DO_CROSS_CONNECT_INIT:
403 reply = self.do_cross_connect_init(params)
404 elif instruction == DO_CROSS_CONNECT_COMPL:
405 reply = self.do_cross_connect_compl(params)
406 elif instruction == GET:
407 reply = self.get(params)
408 elif instruction == SET:
409 reply = self.set(params)
410 elif instruction == GET_ADDRESS:
411 reply = self.get_address(params)
412 elif instruction == GET_ROUTE:
413 reply = self.get_route(params)
414 elif instruction == ACTION:
415 reply = self.action(params)
416 elif instruction == STATUS:
417 reply = self.status(params)
418 elif instruction == GUIDS:
419 reply = self.guids(params)
420 elif instruction == GET_ATTRIBUTE_LIST:
421 reply = self.get_attribute_list(params)
422 elif instruction == TESTBED_ID:
423 reply = self.testbed_id(params)
424 elif instruction == TESTBED_VERSION:
425 reply = self.testbed_version(params)
427 error = "Invalid instruction %s" % instruction
428 self.log_error(error)
429 result = base64.b64encode(error)
430 reply = "%d|%s" % (ERROR, result)
432 error = self.log_error()
433 result = base64.b64encode(error)
434 reply = "%d|%s" % (ERROR, result)
435 log_reply(self, reply)
def guids(self, params):
    """Reply with the testbed's guids, pickled and base64-encoded.

    params is not used.  Returns the reply string "<OK>|<payload>".
    """
    pickled = cPickle.dumps(self._testbed.guids)
    payload = base64.b64encode(pickled)
    return "%d|%s" % (OK, payload)
def testbed_id(self, params):
    """Reply with the testbed id as base64-encoded text.

    params is not used.  The id is converted with str() before encoding.
    """
    payload = base64.b64encode(str(self._testbed.testbed_id))
    return "%d|%s" % (OK, payload)
def testbed_version(self, params):
    """Reply with the testbed version as base64-encoded text.

    params is not used.  The version is converted with str() before
    encoding.
    """
    payload = base64.b64encode(str(self._testbed.testbed_version))
    return "%d|%s" % (OK, payload)
def defer_create(self, params):
    """Decode a CREATE message and forward it to the testbed.

    params[1] is the guid (converted to int) and params[2] the factory
    id, passed on as received.  Returns an OK reply with empty payload.
    """
    guid, factory_id = int(params[1]), params[2]
    self._testbed.defer_create(guid, factory_id)
    return "%d|%s" % (OK, "")
460 def trace(self, params):
461 guid = int(params[1])
463 attribute = base64.b64decode(params[3])
464 trace = self._testbed.trace(guid, trace_id, attribute)
465 result = base64.b64encode(trace)
466 return "%d|%s" % (OK, result)
def start(self, params):
    """Start the testbed and acknowledge with an empty OK reply.

    params is not used.
    """
    testbed = self._testbed
    testbed.start()
    return "%d|%s" % (OK, "")
472 def stop(self, params):
474 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Shut the testbed down and acknowledge with an empty OK reply.

    params is not used.
    """
    testbed = self._testbed
    testbed.shutdown()
    return "%d|%s" % (OK, "")
def defer_configure(self, params):
    """Decode a CONFIGURE message and forward it to the testbed.

    params[1] and params[2] are the base64-encoded attribute name and
    value text; params[3] is the numeric type code that set_type uses to
    convert the decoded value text.  Returns an OK reply with empty
    payload.
    """
    name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    value = set_type(int(params[3]), raw_value)
    self._testbed.defer_configure(name, value)
    return "%d|%s" % (OK, "")
def defer_create_set(self, params):
    """Decode a CREATE_SET message and forward it to the testbed.

    params[1] is the guid (int); params[2] and params[3] are the
    base64-encoded attribute name and value text; params[4] is the
    numeric type code handed to set_type to convert the value.  Returns
    an OK reply with empty payload.
    """
    guid = int(params[1])
    name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    value = set_type(int(params[4]), raw_value)
    self._testbed.defer_create_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_factory_set(self, params):
    """Decode a FACTORY_SET message and forward it to the testbed.

    The FACTORY_SET wire format carries four fields after the
    instruction code, and the proxy fills them with guid, name, value
    and type:

        params[1]  guid                          (int)
        params[2]  base64-encoded attribute name
        params[3]  base64-encoded value text
        params[4]  numeric type code for set_type

    Returns an OK reply with empty payload.
    """
    # The guid is the first field; decoding the name from params[1]
    # (as was done before) shifted every field by one and dropped the guid.
    guid = int(params[1])
    name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    value = set_type(int(params[4]), raw_value)
    self._testbed.defer_factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_connect(self, params):
    """Decode a CONNECT message and forward it to the testbed.

    params[1] and params[3] are the two guids (converted to int);
    params[2] and params[4] are the matching connector type names,
    passed on as received.  Returns an OK reply with empty payload.
    """
    first_guid = int(params[1])
    first_connector = params[2]
    second_guid = int(params[3])
    second_connector = params[4]
    self._testbed.defer_connect(
        first_guid, first_connector, second_guid, second_connector)
    return "%d|%s" % (OK, "")
def defer_cross_connect(self, params):
    """Decode a CROSS_CONNECT message and forward it to the testbed.

    The message carries seven "|"-separated fields after the instruction
    code, in the order of the CROSS_CONNECT format string:

        params[1]  guid                       (int)
        params[2]  connector_type_name
        params[3]  cross_guid                 (int)
        params[4]  cross_testbed_guid         (int)
        params[5]  cross_testbed_id
        params[6]  cross_factory_id
        params[7]  cross_connector_type_name

    Returns an OK reply with empty payload.
    """
    # Indices follow the seven-field wire format.  Reading up to
    # params[9] overwrote the first connector name and guid and ran past
    # the end of a well-formed message.
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    cross_testbed_guid = int(params[4])
    cross_testbed_id = params[5]
    cross_factory_id = params[6]
    cross_connector_type_name = params[7]
    self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_guid, cross_testbed_id, cross_factory_id,
            cross_connector_type_name)
    return "%d|%s" % (OK, "")
529 def defer_add_trace(self, params):
530 guid = int(params[1])
532 self._testbed.defer_add_trace(guid, trace_id)
533 return "%d|%s" % (OK, "")
535 def defer_add_address(self, params):
536 guid = int(params[1])
538 netprefix = int(params[3])
539 broadcast = params[4]
540 self._testbed.defer_add_address(guid, address, netprefix,
542 return "%d|%s" % (OK, "")
544 def defer_add_route(self, params):
545 guid = int(params[1])
546 destination = params[2]
547 netprefix = int(params[3])
549 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
550 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Run the testbed's do_setup step and reply OK with empty payload.

    params is not used.
    """
    testbed = self._testbed
    testbed.do_setup()
    return "%d|%s" % (OK, "")
def do_create(self, params):
    """Run the testbed's do_create step and reply OK with empty payload.

    params is not used.
    """
    testbed = self._testbed
    testbed.do_create()
    return "%d|%s" % (OK, "")
def do_connect_init(self, params):
    """Run the testbed's do_connect_init step and reply OK with empty payload.

    params is not used.
    """
    testbed = self._testbed
    testbed.do_connect_init()
    return "%d|%s" % (OK, "")
def do_connect_compl(self, params):
    """Run the testbed's do_connect_compl step and reply OK with empty payload.

    params is not used.
    """
    testbed = self._testbed
    testbed.do_connect_compl()
    return "%d|%s" % (OK, "")
def do_configure(self, params):
    """Run the testbed's do_configure step and reply OK with empty payload.

    params is not used.
    """
    testbed = self._testbed
    testbed.do_configure()
    return "%d|%s" % (OK, "")
def do_preconfigure(self, params):
    """Run the testbed's do_preconfigure step and reply OK with empty payload.

    params is not used.
    """
    testbed = self._testbed
    testbed.do_preconfigure()
    return "%d|%s" % (OK, "")
def do_cross_connect_init(self, params):
    """Decode the cross-connect data and run do_cross_connect_init.

    params[1] is a base64-encoded pickle of the cross data.  Returns an
    OK reply with empty payload.
    """
    pickled = base64.b64decode(params[1])
    # NOTE(review): unpickling is only safe when the sender is trusted -
    # confirm that only trusted peers can reach this server.
    self._testbed.do_cross_connect_init(cPickle.loads(pickled))
    return "%d|%s" % (OK, "")
def do_cross_connect_compl(self, params):
    """Decode the cross-connect data and run do_cross_connect_compl.

    params[1] is a base64-encoded pickle of the cross data.  Returns an
    OK reply with empty payload.
    """
    pickled = base64.b64decode(params[1])
    # NOTE(review): unpickling is only safe when the sender is trusted -
    # confirm that only trusted peers can reach this server.
    self._testbed.do_cross_connect_compl(cPickle.loads(pickled))
    return "%d|%s" % (OK, "")
588 def get(self, params):
590 name = base64.b64decode(params[2])
591 value = self._testbed.get(guid, name, time)
593 result = base64.b64encode(str(value))
594 return "%d|%s" % (OK, result)
596 def set(self, params):
597 guid = int(params[1])
598 name = base64.b64decode(params[2])
599 value = base64.b64decode(params[3])
600 type = int(params[2])
602 value = set_type(type, value)
603 self._testbed.set(guid, name, value, time)
604 return "%d|%s" % (OK, "")
def get_address(self, params):
    """Reply with one attribute of an address held by the testbed.

    params[1] is the guid and params[2] the address index (both
    converted to int); params[3] is the base64-encoded attribute name.
    The value returned by the testbed is converted with str() and
    base64-encoded into the reply payload.
    """
    guid = int(params[1])
    index = int(params[2])
    # Read from ``params``; the name ``param`` is not defined here and
    # raised NameError on every call.
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_address(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def get_route(self, params):
    """Reply with one attribute of a route held by the testbed.

    params[1] is the guid and params[2] the route index (both converted
    to int); params[3] is the base64-encoded attribute name.  The value
    returned by the testbed is converted with str() and base64-encoded
    into the reply payload.
    """
    guid = int(params[1])
    index = int(params[2])
    # Read from ``params``; the name ``param`` is not defined here and
    # raised NameError on every call.
    attribute = base64.b64decode(params[3])
    value = self._testbed.get_route(guid, index, attribute)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
622 def action(self, params):
624 guid = int(params[2])
625 command = base64.b64decode(params[3])
626 self._testbed.action(time, guid, command)
627 return "%d|%s" % (OK, "")
629 def status(self, params):
631 if params[1] != "None":
632 guid = int(params[1])
633 status = self._testbed.status(guid)
634 result = base64.b64encode(str(status))
635 return "%d|%s" % (OK, result)
def get_attribute_list(self, params):
    """Reply with the attribute list of a guid, pickled and base64-encoded.

    params[1] is the guid, converted to int before querying the testbed.
    """
    guid = int(params[1])
    pickled = cPickle.dumps(self._testbed.get_attribute_list(guid))
    payload = base64.b64encode(pickled)
    return "%d|%s" % (OK, payload)
644 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, log_level, experiment_xml):
    """Initialise the base server and remember the experiment XML.

    root_dir and log_level are handed to the base class constructor.
    The controller itself is not created here; self._controller starts
    as None.
    """
    super(ExperimentControllerServer, self).__init__(root_dir, log_level)
    self._controller = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Create the ExperimentController and keep it in self._controller.

    The controller is built from the stored experiment XML, with
    self._root_dir passed as its root_dir.
    """
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    self._controller = ExperimentController(xml, root_dir = self._root_dir)
655 def reply_action(self, msg):
657 result = base64.b64encode("Invalid command line")
658 reply = "%d|%s" % (ERROR, result)
660 params = msg.split("|")
661 instruction = int(params[0])
662 log_msg(self, params)
664 if instruction == XML:
665 reply = self.experiment_xml(params)
666 elif instruction == ACCESS:
667 reply = self.set_access_configuration(params)
668 elif instruction == TRACE:
669 reply = self.trace(params)
670 elif instruction == FINISHED:
671 reply = self.is_finished(params)
672 elif instruction == EXPERIMENT_GET:
673 reply = self.get(params)
674 elif instruction == EXPERIMENT_SET:
675 reply = self.set(params)
676 elif instruction == START:
677 reply = self.start(params)
678 elif instruction == STOP:
679 reply = self.stop(params)
680 elif instruction == RECOVER:
681 reply = self.recover(params)
682 elif instruction == SHUTDOWN:
683 reply = self.shutdown(params)
685 error = "Invalid instruction %s" % instruction
686 self.log_error(error)
687 result = base64.b64encode(error)
688 reply = "%d|%s" % (ERROR, result)
690 error = self.log_error()
691 result = base64.b64encode(error)
692 reply = "%d|%s" % (ERROR, result)
693 log_reply(self, reply)
def experiment_xml(self, params):
    """Reply with the controller's experiment XML, base64-encoded.

    params is not used.
    """
    payload = base64.b64encode(self._controller.experiment_xml)
    return "%d|%s" % (OK, payload)
701 def set_access_configuration(self, params):
702 testbed_guid = int(params[1])
704 communication = params[3]
707 port = int(params[6])
709 use_agent = params[8] == "True"
710 log_level = params[9]
711 access_config = AccessConfiguration()
712 access_config.set_attribute_value("mode", mode)
713 access_config.set_attribute_value("communication", communication)
714 access_config.set_attribute_value("host", host)
715 access_config.set_attribute_value("user", user)
716 access_config.set_attribute_value("port", port)
717 access_config.set_attribute_value("rootDirectory", root_dir)
718 access_config.set_attribute_value("useAgent", use_agent)
719 access_config.set_attribute_value("logLevel", log_level)
720 self._controller.set_access_configuration(testbed_guid,
722 return "%d|%s" % (OK, "")
724 def trace(self, params):
725 testbed_guid = int(params[1])
726 guid = int(params[2])
728 attribute = base64.b64decode(params[4])
729 trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
730 result = base64.b64encode(trace)
731 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Reply with the controller's is_finished result for a guid.

    params[1] is the guid, converted to int.  The result is converted
    with str() and base64-encoded into the reply payload.
    """
    finished = self._controller.is_finished(int(params[1]))
    payload = base64.b64encode(str(finished))
    return "%d|%s" % (OK, payload)
739 def get(self, params):
740 testbed_guid = int(param[1])
741 guid = int(params[2])
742 name = base64.b64decode(params[3])
743 value = self._controller.get(testbed_guid, guid, name, time)
745 result = base64.b64encode(str(value))
746 return "%d|%s" % (OK, result)
748 def set(self, params):
749 testbed_guid = int(params[1])
750 guid = int(params[2])
751 name = base64.b64decode(params[3])
752 value = base64.b64decode(params[4])
753 type = int(params[3])
755 value = set_type(type, value)
756 self._controller.set(testbed_guid, guid, name, value, time)
757 return "%d|%s" % (OK, "")
def start(self, params):
    """Start the experiment controller and reply OK with empty payload.

    params is not used.
    """
    controller = self._controller
    controller.start()
    return "%d|%s" % (OK, "")
def stop(self, params):
    """Stop the experiment controller and reply OK with empty payload.

    params is not used.
    """
    controller = self._controller
    controller.stop()
    return "%d|%s" % (OK, "")
def recover(self, params):
    """Call the controller's recover() and reply OK with empty payload.

    params is not used.
    """
    controller = self._controller
    controller.recover()
    return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Shut the experiment controller down and reply OK with empty payload.

    params is not used.
    """
    controller = self._controller
    controller.shutdown()
    return "%d|%s" % (OK, "")
775 class TestbedControllerProxy(object):
776 def __init__(self, root_dir, log_level, testbed_id = None,
777 testbed_version = None, launch = True, host = None,
778 port = None, user = None, agent = None,
779 environment_setup = ""):
781 if testbed_id == None or testbed_version == None:
782 raise RuntimeError("To launch a TesbedInstance server a \
783 testbed_id and testbed_version are required")
786 python_code = "from nepi.util.proxy import \
787 TesbedInstanceServer;\
788 s = TestbedControllerServer('%s', %d, '%s', '%s');\
789 s.run()" % (root_dir, log_level, testbed_id,
791 proc = server.popen_ssh_subprocess(python_code, host = host,
792 port = port, user = user, agent = agent,
793 environment_setup = environment_setup)
795 err = proc.stderr.read()
796 raise RuntimeError("Server could not be executed: %s" % \
800 s = TestbedControllerServer(root_dir, log_level, testbed_id,
804 # connect client to server
805 self._client = server.Client(root_dir, host = host, port = port,
806 user = user, agent = agent)
810 msg = testbed_messages[GUIDS]
811 self._client.send_msg(msg)
812 reply = self._client.read_reply()
813 result = reply.split("|")
814 code = int(result[0])
815 text = base64.b64decode(result[1])
817 raise RuntimeError(text)
818 guids = cPickle.loads(text)
822 def testbed_id(self):
823 msg = testbed_messages[TESTBED_ID]
824 self._client.send_msg(msg)
825 reply = self._client.read_reply()
826 result = reply.split("|")
827 code = int(result[0])
828 text = base64.b64decode(result[1])
830 raise RuntimeError(text)
834 def testbed_version(self):
835 msg = testbed_messages[TESTBED_VERSION]
836 self._client.send_msg(msg)
837 reply = self._client.read_reply()
838 result = reply.split("|")
839 code = int(result[0])
840 text = base64.b64decode(result[1])
842 raise RuntimeError(text)
845 def defer_configure(self, name, value):
846 msg = testbed_messages[CONFIGURE]
847 type = get_type(value)
848 # avoid having "|" in this parameters
849 name = base64.b64encode(name)
850 value = base64.b64encode(str(value))
851 msg = msg % (name, value, type)
852 self._client.send_msg(msg)
853 reply = self._client.read_reply()
854 result = reply.split("|")
855 code = int(result[0])
856 text = base64.b64decode(result[1])
858 raise RuntimeError(text)
860 def defer_create(self, guid, factory_id):
861 msg = testbed_messages[CREATE]
862 msg = msg % (guid, factory_id)
863 self._client.send_msg(msg)
864 reply = self._client.read_reply()
865 result = reply.split("|")
866 code = int(result[0])
867 text = base64.b64decode(result[1])
869 raise RuntimeError(text)
871 def defer_create_set(self, guid, name, value):
872 msg = testbed_messages[CREATE_SET]
873 type = get_type(value)
874 # avoid having "|" in this parameters
875 name = base64.b64encode(name)
876 value = base64.b64encode(str(value))
877 msg = msg % (guid, name, value, type)
878 self._client.send_msg(msg)
879 reply = self._client.read_reply()
880 result = reply.split("|")
881 code = int(result[0])
882 text = base64.b64decode(result[1])
884 raise RuntimeError(text)
886 def defer_factory_set(self, guid, name, value):
887 msg = testbed_messages[FACTORY_SET]
888 type = get_type(value)
889 # avoid having "|" in this parameters
890 name = base64.b64encode(name)
891 value = base64.b64encode(str(value))
892 msg = msg % (guid, name, value, type)
893 self._client.send_msg(msg)
894 reply = self._client.read_reply()
895 result = reply.split("|")
896 code = int(result[0])
897 text = base64.b64decode(result[1])
899 raise RuntimeError(text)
901 def defer_connect(self, guid1, connector_type_name1, guid2,
902 connector_type_name2):
903 msg = testbed_messages[CONNECT]
904 msg = msg % (guid1, connector_type_name1, guid2,
905 connector_type_name2)
906 self._client.send_msg(msg)
907 reply = self._client.read_reply()
908 result = reply.split("|")
909 code = int(result[0])
910 text = base64.b64decode(result[1])
912 raise RuntimeError(text)
914 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
915 cross_testbed_guid, cross_testbed_id, cross_factory_id,
916 cross_connector_type_name):
917 msg = testbed_messages[CROSS_CONNECT]
918 msg = msg % (guid, connector_type_name, cross_guid, cross_testbed_guid,
919 cross_testbed_id, cross_factory_id, cross_connector_type_name)
920 self._client.send_msg(msg)
921 reply = self._client.read_reply()
922 result = reply.split("|")
923 code = int(result[0])
924 text = base64.b64decode(result[1])
926 raise RuntimeError(text)
928 def defer_add_trace(self, guid, trace_id):
929 msg = testbed_messages[ADD_TRACE]
930 msg = msg % (guid, trace_id)
931 self._client.send_msg(msg)
932 reply = self._client.read_reply()
933 result = reply.split("|")
934 code = int(result[0])
935 text = base64.b64decode(result[1])
937 raise RuntimeError(text)
939 def defer_add_address(self, guid, address, netprefix, broadcast):
940 msg = testbed_messages[ADD_ADDRESS]
941 msg = msg % (guid, address, netprefix, broadcast)
942 self._client.send_msg(msg)
943 reply = self._client.read_reply()
944 result = reply.split("|")
945 code = int(result[0])
946 text = base64.b64decode(result[1])
948 raise RuntimeError(text)
950 def defer_add_route(self, guid, destination, netprefix, nexthop):
951 msg = testbed_messages[ADD_ROUTE]
952 msg = msg % (guid, destination, netprefix, nexthop)
953 self._client.send_msg(msg)
954 reply = self._client.read_reply()
955 result = reply.split("|")
956 code = int(result[0])
957 text = base64.b64decode(result[1])
959 raise RuntimeError(text)
962 msg = testbed_messages[DO_SETUP]
963 self._client.send_msg(msg)
964 reply = self._client.read_reply()
965 result = reply.split("|")
966 code = int(result[0])
967 text = base64.b64decode(result[1])
969 raise RuntimeError(text)
972 msg = testbed_messages[DO_CREATE]
973 self._client.send_msg(msg)
974 reply = self._client.read_reply()
975 result = reply.split("|")
976 code = int(result[0])
977 text = base64.b64decode(result[1])
979 raise RuntimeError(text)
981 def do_connect_init(self):
982 msg = testbed_messages[DO_CONNECT_INIT]
983 self._client.send_msg(msg)
984 reply = self._client.read_reply()
985 result = reply.split("|")
986 code = int(result[0])
987 text = base64.b64decode(result[1])
989 raise RuntimeError(text)
991 def do_connect_compl(self):
992 msg = testbed_messages[DO_CONNECT_COMPL]
993 self._client.send_msg(msg)
994 reply = self._client.read_reply()
995 result = reply.split("|")
996 code = int(result[0])
997 text = base64.b64decode(result[1])
999 raise RuntimeError(text)
1001 def do_configure(self):
1002 msg = testbed_messages[DO_CONFIGURE]
1003 self._client.send_msg(msg)
1004 reply = self._client.read_reply()
1005 result = reply.split("|")
1006 code = int(result[0])
1007 text = base64.b64decode(result[1])
1009 raise RuntimeError(text)
1011 def do_preconfigure(self):
1012 msg = testbed_messages[DO_PRECONFIGURE]
1013 self._client.send_msg(msg)
1014 reply = self._client.read_reply()
1015 result = reply.split("|")
1016 code = int(result[0])
1017 text = base64.b64decode(result[1])
1019 raise RuntimeError(text)
1021 def do_cross_connect_init(self, cross_data):
1022 msg = testbed_messages[DO_CROSS_CONNECT_INIT]
1023 pcross_data = cPickle.dumps(cross_data)
1024 cross_data = base64.b64encode(pcross_data)
1025 msg = msg % (cross_data)
1026 self._client.send_msg(msg)
1027 reply = self._client.read_reply()
1028 result = reply.split("|")
1029 code = int(result[0])
1030 text = base64.b64decode(result[1])
1032 raise RuntimeError(text)
1034 def do_cross_connect_compl(self, cross_data):
1035 msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
1036 pcross_data = cPickle.dumps(cross_data)
1037 cross_data = base64.b64encode(pcross_data)
1038 msg = msg % (cross_data)
1039 self._client.send_msg(msg)
1040 reply = self._client.read_reply()
1041 result = reply.split("|")
1042 code = int(result[0])
1043 text = base64.b64decode(result[1])
1045 raise RuntimeError(text)
1047 def start(self, time = TIME_NOW):
1048 msg = testbed_messages[START]
1049 self._client.send_msg(msg)
1050 reply = self._client.read_reply()
1051 result = reply.split("|")
1052 code = int(result[0])
1053 text = base64.b64decode(result[1])
1055 raise RuntimeError(text)
1057 def stop(self, time = TIME_NOW):
1058 msg = testbed_messages[STOP]
1059 self._client.send_msg(msg)
1060 reply = self._client.read_reply()
1061 result = reply.split("|")
1062 code = int(result[0])
1063 text = base64.b64decode(result[1])
1065 raise RuntimeError(text)
1067 def set(self, guid, name, value, time = TIME_NOW):
1068 msg = testbed_messages[SET]
1069 type = get_type(value)
1070 # avoid having "|" in this parameters
1071 name = base64.b64encode(name)
1072 value = base64.b64encode(str(value))
1073 msg = msg % (guid, name, value, type, time)
1074 self._client.send_msg(msg)
1075 reply = self._client.read_reply()
1076 result = reply.split("|")
1077 code = int(result[0])
1078 text = base64.b64decode(result[1])
1080 raise RuntimeError(text)
1082 def get(self, guid, name, time = TIME_NOW):
1083 msg = testbed_messages[GET]
1084 # avoid having "|" in this parameters
1085 name = base64.b64encode(name)
1086 msg = msg % (guid, name, time)
1087 self._client.send_msg(msg)
1088 reply = self._client.read_reply()
1089 result = reply.split("|")
1090 code = int(result[0])
1091 text = base64.b64decode(result[1])
1093 raise RuntimeError(text)
1096 def get_address(self, guid, index, attribute):
# Send a GET_ADDRESS request for `attribute` of the address at `index` on
# element `guid`, then parse the reply.
# NOTE(review): the GET_ADDRESS template in testbed_messages is built with
# the GET opcode as its numeric prefix -- confirm that this is intended.
1097 msg = testbed_messages[GET_ADDRESS]
1098 # avoid having "|" in these parameters
1099 attribute = base64.b64encode(attribute)
1100 msg = msg % (guid, index, attribute)
1101 self._client.send_msg(msg)
1102 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1103 result = reply.split("|")
1104 code = int(result[0])
1105 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read and no return statement appears in the
# lines shown; presumably a status check and a return belong around this
# raise -- confirm against the complete file.
1107 raise RuntimeError(text)
1110 def get_route(self, guid, index, attribute):
# Send a GET_ROUTE request for `attribute` of the route at `index` on
# element `guid`, then parse the reply.
# NOTE(review): the GET_ROUTE template in testbed_messages is built with the
# GET opcode as its numeric prefix -- confirm that this is intended.
1111 msg = testbed_messages[GET_ROUTE]
1112 # avoid having "|" in these parameters
1113 attribute = base64.b64encode(attribute)
1114 msg = msg % (guid, index, attribute)
1115 self._client.send_msg(msg)
1116 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1117 result = reply.split("|")
1118 code = int(result[0])
1119 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read and no return statement appears in the
# lines shown; presumably a status check and a return belong around this
# raise -- confirm against the complete file.
1121 raise RuntimeError(text)
1124 def action(self, time, guid, action):
# Send an ACTION request and parse the reply. The template is filled in the
# order (time, guid, action); none of the values is base64-encoded here.
1125 msg = testbed_messages[ACTION]
1126 msg = msg % (time, guid, action)
1127 self._client.send_msg(msg)
1128 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1129 result = reply.split("|")
1130 code = int(result[0])
1131 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise -- confirm against the complete file.
1133 raise RuntimeError(text)
1135 def status(self, guid = None):
# Send a STATUS request and parse the reply.
1136 msg = testbed_messages[STATUS]
# guid is passed through str(), so the default None is sent as "None".
1137 msg = msg % str(guid)
1138 self._client.send_msg(msg)
1139 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1140 result = reply.split("|")
1141 code = int(result[0])
1142 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read and no return statement appears in the
# lines shown; presumably a status check and a return belong around this
# raise -- confirm against the complete file.
1144 raise RuntimeError(text)
1147 def trace(self, guid, trace_id, attribute='value'):
# Send a TRACE request for `attribute` of trace `trace_id` on element
# `guid`, then parse the reply.
1148 msg = testbed_messages[TRACE]
# Only the attribute name is base64-encoded; guid and trace_id are
# interpolated as given.
1149 attribute = base64.b64encode(attribute)
1150 msg = msg % (guid, trace_id, attribute)
1151 self._client.send_msg(msg)
1152 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1153 result = reply.split("|")
1154 code = int(result[0])
1155 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read and no return statement appears in the
# lines shown; presumably a status check and a return belong around this
# raise -- confirm against the complete file.
1157 raise RuntimeError(text)
1160 def get_attribute_list(self, guid):
# Send a GET_ATTRIBUTE_LIST request and unpickle the decoded reply text.
# NOTE(review): `guid` is not read by any statement shown here, and the
# GET_ATTRIBUTE_LIST template in testbed_messages has no placeholder for it
# -- confirm how the guid is meant to reach the server.
1161 msg = testbed_messages[GET_ATTRIBUTE_LIST]
1163 self._client.send_msg(msg)
1164 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1165 result = reply.split("|")
1166 code = int(result[0])
1167 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise -- confirm against the complete file.
1169 raise RuntimeError(text)
# NOTE(review): cPickle.loads runs on data received from the peer; unpickling
# is only safe when that peer is trusted. No use of attr_list is visible in
# the lines shown -- presumably it is returned; confirm.
1170 attr_list = cPickle.loads(text)
# NOTE(review): the `def` line for these statements is not among the lines
# shown; presumably this is the body of a shutdown method -- confirm.
# Sends the SHUTDOWN request, parses the reply, then stops the client.
1174 msg = testbed_messages[SHUTDOWN]
1175 self._client.send_msg(msg)
1176 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1177 result = reply.split("|")
1178 code = int(result[0])
1179 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise, otherwise the two calls after it are unreachable.
1181 raise RuntimeError(text)
# Ask the client to stop and block on one more reply before returning.
1182 self._client.send_stop()
1183 self._client.read_reply() # wait for it
# Proxy whose constructor can start an ExperimentControllerServer and then
# attaches a server.Client to it as self._client.
1185 class ExperimentControllerProxy(object):
1186 def __init__(self, root_dir, log_level, experiment_xml = None,
1187 launch = True, host = None, port = None, user = None,
1188 agent = None, environment_setup = ""):
# NOTE(review): several constructor lines are missing from the lines shown,
# so the branching between the paths below is not visible. In particular
# `launch` is never read here -- confirm the control flow in the full file.
# An experiment XML description is mandatory for launching a server.
1191 if experiment_xml == None:
1192 raise RuntimeError("To launch a ExperimentControllerServer a \
1193 xml description of the experiment is required")
# Subprocess path: build a semicolon-separated Python snippet (arguments are
# embedded with %r) that creates and runs an ExperimentControllerServer, and
# hand it to server.popen_ssh_subprocess with the connection parameters.
1196 xml = experiment_xml
1197 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1198 s = ExperimentControllerServer(%r, %r, %r);\
1199 s.run()" % (root_dir, log_level, xml)
1200 proc = server.popen_ssh_subprocess(python_code, host = host,
1201 port = port, user = user, agent = agent,
1202 environment_setup = environment_setup)
# Failure report: the subprocess's stderr is read for the error message.
# NOTE(review): the condition selecting this path and the rest of the raise
# statement (it ends in a line continuation) are not among the lines shown.
1204 err = proc.stderr.read()
1205 raise RuntimeError("Server could not be executed: %s" % \
1209 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
# NOTE(review): the ExperimentControllerServer construction above presumably
# belongs to a separate in-process path, not to the raise before it -- confirm.
1212 # connect client to server
1213 self._client = server.Client(root_dir, host = host, port = port,
1214 user = user, agent = agent)
1217 def experiment_xml(self):
# Send the XML request through self._client and parse the reply.
1218 msg = controller_messages[XML]
1219 self._client.send_msg(msg)
1220 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1221 result = reply.split("|")
1222 code = int(result[0])
1223 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read and no return statement appears in the
# lines shown; presumably a status check and a return belong around this
# raise -- confirm against the complete file.
1225 raise RuntimeError(text)
1228 def set_access_configuration(self, testbed_guid, access_config):
# Read the access settings from access_config via get_attribute_value and
# send them, together with testbed_guid, in one ACCESS request.
1229 mode = access_config.get_attribute_value("mode")
1230 communication = access_config.get_attribute_value("communication")
1231 host = access_config.get_attribute_value("host")
1232 user = access_config.get_attribute_value("user")
1233 port = access_config.get_attribute_value("port")
1234 root_dir = access_config.get_attribute_value("rootDirectory")
1235 use_agent = access_config.get_attribute_value("useAgent")
1236 log_level = access_config.get_attribute_value("logLevel")
1237 msg = controller_messages[ACCESS]
# NOTE(review): unlike other methods here, these values are interpolated
# without base64 encoding; a "|" inside any of them would presumably corrupt
# the "|"-delimited message -- confirm how the receiver parses it.
1238 msg = msg % (testbed_guid, mode, communication, host, user, port,
1239 root_dir, use_agent, log_level)
1240 self._client.send_msg(msg)
1241 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1242 result = reply.split("|")
1243 code = int(result[0])
1244 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise -- confirm against the complete file.
1246 raise RuntimeError(text)
1248 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
# Send a controller TRACE request for `attribute` of trace `trace_id` on
# element `guid` of testbed `testbed_guid`, then parse the reply.
1249 msg = controller_messages[TRACE]
# Only the attribute name is base64-encoded before interpolation.
1250 attribute = base64.b64encode(attribute)
1251 msg = msg % (testbed_guid, guid, trace_id, attribute)
1252 self._client.send_msg(msg)
1253 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1254 result = reply.split("|")
1255 code = int(result[0])
1256 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read and no return statement appears in the
# lines shown; presumably the status is checked and the text returned before
# this raise is reached -- confirm against the complete file.
1259 raise RuntimeError(text)
# NOTE(review): the `def` line for these statements is not among the lines
# shown; presumably this is the body of the controller's start method.
# Sends the START request through self._client and parses the reply.
1262 msg = controller_messages[START]
1263 self._client.send_msg(msg)
1264 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1265 result = reply.split("|")
1266 code = int(result[0])
1267 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise -- confirm against the complete file.
1269 raise RuntimeError(text)
# NOTE(review): the `def` line for these statements is not among the lines
# shown; presumably this is the body of the controller's stop method.
# Sends the STOP request through self._client and parses the reply.
1272 msg = controller_messages[STOP]
1273 self._client.send_msg(msg)
1274 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1275 result = reply.split("|")
1276 code = int(result[0])
1277 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise -- confirm against the complete file.
1279 raise RuntimeError(text)
# NOTE(review): the `def` line for these statements is not among the lines
# shown; presumably this is the body of the controller's recover method.
# Sends the RECOVER request through self._client and parses the reply.
1282 msg = controller_messages[RECOVER]
1283 self._client.send_msg(msg)
1284 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1285 result = reply.split("|")
1286 code = int(result[0])
1287 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise -- confirm against the complete file.
1289 raise RuntimeError(text)
1291 def is_finished(self, guid):
# Send a FINISHED request and report whether the decoded reply text is the
# literal string "True".
# NOTE(review): `guid` is not read by any statement shown here, although the
# FINISHED template in controller_messages has a "%d" placeholder; presumably
# the substitution line is missing from the lines shown -- confirm.
1292 msg = controller_messages[FINISHED]
1294 self._client.send_msg(msg)
1295 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1296 result = reply.split("|")
1297 code = int(result[0])
1298 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise, otherwise the return below is unreachable.
1300 raise RuntimeError(text)
# Any text other than exactly "True" yields False.
1301 return text == "True"
1303 def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
# Send an EXPERIMENT_SET request carrying testbed guid, element guid,
# attribute name, value, a type code and time, then parse the reply.
# NOTE(review): the template is read from testbed_messages, whereas the
# other controller requests shown here use controller_messages -- confirm.
1304 msg = testbed_messages[EXPERIMENT_SET]
# get_type is defined outside the lines shown here. Note that the local name
# `type` shadows the builtin for the rest of this method.
1305 type = get_type(value)
1306 # avoid having "|" in these parameters
1307 name = base64.b64encode(name)
# The value is converted with str() before being base64-encoded.
1308 value = base64.b64encode(str(value))
1309 msg = msg % (testbed_guid, guid, name, value, type, time)
1310 self._client.send_msg(msg)
1311 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1312 result = reply.split("|")
1313 code = int(result[0])
1314 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise -- confirm against the complete file.
1316 raise RuntimeError(text)
1318 def get(self, testbed_guid, guid, name, time = TIME_NOW):
# Send an EXPERIMENT_GET request for attribute `name` of element `guid` in
# testbed `testbed_guid` at `time`, then parse the reply.
# NOTE(review): the template is read from testbed_messages, whereas the
# other controller requests shown here use controller_messages -- confirm.
1319 msg = testbed_messages[EXPERIMENT_GET]
1320 # avoid having "|" in these parameters
1321 name = base64.b64encode(name)
1322 msg = msg % (testbed_guid, guid, name, time)
1323 self._client.send_msg(msg)
1324 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1325 result = reply.split("|")
1326 code = int(result[0])
1327 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read and no return statement appears in the
# lines shown; presumably a status check and a return belong around this
# raise -- confirm against the complete file.
1329 raise RuntimeError(text)
# NOTE(review): the `def` line for these statements is not among the lines
# shown; presumably this is the body of the controller's shutdown method.
# Sends the SHUTDOWN request, parses the reply, then stops the client.
1333 msg = controller_messages[SHUTDOWN]
1334 self._client.send_msg(msg)
1335 reply = self._client.read_reply()
# Reply layout: "<integer status code>|<base64-encoded text>".
1336 result = reply.split("|")
1337 code = int(result[0])
1338 text = base64.b64decode(result[1])
# NOTE(review): `code` is never read in the lines shown; presumably a status
# check guards this raise, otherwise the two calls after it are unreachable.
1340 raise RuntimeError(text)
# Ask the client to stop and block on one more reply before returning.
1341 self._client.send_stop()
1342 self._client.read_reply() # wait for it