2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
16 # PROTOCOL INSTRUCTION MESSAGES
50 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
51 controller_messages = dict({
53 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
54 TRACE: "%d|%s" % (TRACE, "%d|%d|%s"),
55 FINISHED: "%d|%s" % (FINISHED, "%d"),
58 SHUTDOWN: "%d" % SHUTDOWN,
61 # TESTBED INSTANCE PROTOCOL MESSAGES
62 testbed_messages = dict({
63 TRACE: "%d|%s" % (TRACE, "%d|%s"),
66 SHUTDOWN: "%d" % SHUTDOWN,
67 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
68 CREATE: "%d|%s" % (CREATE, "%d|%s"),
69 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
70 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
71 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
72 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
73 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
74 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
75 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
76 DO_SETUP: "%d" % DO_SETUP,
77 DO_CREATE: "%d" % DO_CREATE,
78 DO_CONNECT: "%d" % DO_CONNECT,
79 DO_CONFIGURE: "%d" % DO_CONFIGURE,
80 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
81 GET: "%d|%s" % (GET, "%s|%d|%s"),
82 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
83 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
84 STATUS: "%d|%s" % (STATUS, "%d"),
88 instruction_text = dict({
98 CONFIGURE: "CONFIGURE",
100 CREATE_SET: "CREATE_SET",
101 FACTORY_SET: "FACTORY_SET",
103 CROSS_CONNECT: "CROSS_CONNECT",
104 ADD_TRACE: "ADD_TRACE",
105 ADD_ADDRESS: "ADD_ADDRESS",
106 ADD_ROUTE: "ADD_ROUTE",
107 DO_SETUP: "DO_SETUP",
108 DO_CREATE: "DO_CREATE",
109 DO_CONNECT: "DO_CONNECT",
110 DO_CONFIGURE: "DO_CONFIGURE",
111 DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
124 if isinstance(value, bool):
126 elif isinstance(value, int):
128 elif isinstance(value, float):
133 def set_type(type, value):
139 value = value == "True"
144 def log_msg(server, params):
145 instr = int(params[0])
146 instr_txt = instruction_text[instr]
147 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
148 instr_txt, ", ".join(map(str, params[1:]))))
150 def log_reply(server, reply):
151 res = reply.split("|")
153 code_txt = instruction_text[code]
154 txt = base64.b64decode(res[1])
155 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
158 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
161 proc = server.popen_ssh_subprocess(python_code, host = host,
162 port = port, user = user, agent = agent)
164 err = proc.stderr.read()
165 raise RuntimeError("Client could not be executed: %s" % \
168 return server.Client(root_dir, host = host, port = port, user = user,
171 def to_server_log_level(log_level):
172 return server.DEBUG_LEVEL \
173 if log_level == AccessConfiguration.DEBUG_LEVEL \
174 else server.ERROR_LEVEL
176 def get_access_config_params(access_config):
177 root_dir = access_config.get_attribute_value("rootDirectory")
178 log_level = access_config.get_attribute_value("logLevel")
179 log_level = to_server_log_level(log_level)
180 user = host = port = agent = None
181 communication = access_config.get_attribute_value("communication")
182 if communication == AccessConfiguration.ACCESS_SSH:
183 user = access_config.get_attribute_value("user")
184 host = access_config.get_attribute_value("host")
185 port = access_config.get_attribute_value("port")
186 agent = access_config.get_attribute_value("useAgent")
187 return (root_dir, log_level, user, host, port, agent)
189 class AccessConfiguration(AttributesMap):
190 MODE_SINGLE_PROCESS = "SINGLE"
191 MODE_DAEMON = "DAEMON"
193 ACCESS_LOCAL = "LOCAL"
194 ERROR_LEVEL = "Error"
195 DEBUG_LEVEL = "Debug"
198 super(AccessConfiguration, self).__init__()
199 self.add_attribute(name = "mode",
200 help = "Instance execution mode",
201 type = Attribute.ENUM,
202 value = AccessConfiguration.MODE_SINGLE_PROCESS,
203 allowed = [AccessConfiguration.MODE_DAEMON,
204 AccessConfiguration.MODE_SINGLE_PROCESS],
205 validation_function = validation.is_enum)
206 self.add_attribute(name = "communication",
207 help = "Instance communication mode",
208 type = Attribute.ENUM,
209 value = AccessConfiguration.ACCESS_LOCAL,
210 allowed = [AccessConfiguration.ACCESS_LOCAL,
211 AccessConfiguration.ACCESS_SSH],
212 validation_function = validation.is_enum)
213 self.add_attribute(name = "host",
214 help = "Host where the testbed will be executed",
215 type = Attribute.STRING,
217 validation_function = validation.is_string)
218 self.add_attribute(name = "user",
219 help = "User on the Host to execute the testbed",
220 type = Attribute.STRING,
221 value = getpass.getuser(),
222 validation_function = validation.is_string)
223 self.add_attribute(name = "port",
224 help = "Port on the Host",
225 type = Attribute.INTEGER,
227 validation_function = validation.is_integer)
228 self.add_attribute(name = "rootDirectory",
229 help = "Root directory for storing process files",
230 type = Attribute.STRING,
232 validation_function = validation.is_string) # TODO: validation.is_path
233 self.add_attribute(name = "useAgent",
234 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
235 type = Attribute.BOOL,
237 validation_function = validation.is_bool)
238 self.add_attribute(name = "logLevel",
239 help = "Log level for instance",
240 type = Attribute.ENUM,
241 value = AccessConfiguration.ERROR_LEVEL,
242 allowed = [AccessConfiguration.ERROR_LEVEL,
243 AccessConfiguration.DEBUG_LEVEL],
244 validation_function = validation.is_enum)
246 def create_controller(xml, access_config = None):
247 mode = None if not access_config else \
248 access_config.get_attribute_value("mode")
249 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
250 from nepi.core.execute import ExperimentController
251 return ExperimentController(xml)
252 elif mode == AccessConfiguration.MODE_DAEMON:
253 (root_dir, log_level, user, host, port, agent) = \
254 get_access_config_params(access_config)
255 return ExperimentControllerProxy(root_dir, log_level,
256 experiment_xml = xml, host = host, port = port, user = user,
258 raise RuntimeError("Unsupported access configuration 'mode'" % mode)
260 def create_testbed_instance(testbed_id, testbed_version, access_config):
261 mode = None if not access_config else access_config.get_attribute_value("mode")
262 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
263 return _build_testbed_instance(testbed_id, testbed_version)
264 elif mode == AccessConfiguration.MODE_DAEMON:
265 (root_dir, log_level, user, host, port, agent) = \
266 get_access_config_params(access_config)
267 return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id,
268 testbed_version = testbed_version, host = host, port = port,
269 user = user, agent = agent)
270 raise RuntimeError("Unsupported access configuration 'mode'" % mode)
272 def _build_testbed_instance(testbed_id, testbed_version):
273 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
274 if not mod_name in sys.modules:
276 module = sys.modules[mod_name]
277 return module.TestbedInstance(testbed_version)
279 class TestbedInstanceServer(server.Server):
280 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
281 super(TestbedInstanceServer, self).__init__(root_dir, log_level)
282 self._testbed_id = testbed_id
283 self._testbed_version = testbed_version
286 def post_daemonize(self):
287 self._testbed = _build_testbed_instance(self._testbed_id,
288 self._testbed_version)
290 def reply_action(self, msg):
291 params = msg.split("|")
292 instruction = int(params[0])
293 log_msg(self, params)
295 if instruction == TRACE:
296 reply = self.trace(params)
297 elif instruction == START:
298 reply = self.start(params)
299 elif instruction == STOP:
300 reply = self.stop(params)
301 elif instruction == SHUTDOWN:
302 reply = self.shutdown(params)
303 elif instruction == CONFIGURE:
304 reply = self.defer_configure(params)
305 elif instruction == CREATE:
306 reply = self.defer_create(params)
307 elif instruction == CREATE_SET:
308 reply = self.defer_create_set(params)
309 elif instruction == FACTORY_SET:
310 reply = self.defer_factory_set(params)
311 elif instruction == CONNECT:
312 reply = self.defer_connect(params)
313 elif instruction == CROSS_CONNECT:
314 reply = self.defer_cross_connect(params)
315 elif instruction == ADD_TRACE:
316 reply = self.defer_add_trace(params)
317 elif instruction == ADD_ADDRESS:
318 reply = self.defer_add_address(params)
319 elif instruction == ADD_ROUTE:
320 reply = self.defer_add_route(params)
321 elif instruction == DO_SETUP:
322 reply = self.do_setup(params)
323 elif instruction == DO_CREATE:
324 reply = self.do_create(params)
325 elif instruction == DO_CONNECT:
326 reply = self.do_connect(params)
327 elif instruction == DO_CONFIGURE:
328 reply = self.do_configure(params)
329 elif instruction == DO_CROSS_CONNECT:
330 reply = self.do_cross_connect(params)
331 elif instruction == GET:
332 reply = self.get(params)
333 elif instruction == SET:
334 reply = self.set(params)
335 elif instruction == ACTION:
336 reply = self.action(params)
337 elif instruction == STATUS:
338 reply = self.status(params)
339 elif instruction == GUIDS:
340 reply = self.guids(params)
342 error = "Invalid instruction %s" % instruction
343 self.log_error(error)
344 result = base64.b64encode(error)
345 reply = "%d|%s" % (ERROR, result)
347 error = self.log_error()
348 result = base64.b64encode(error)
349 reply = "%d|%s" % (ERROR, result)
350 log_reply(self, reply)
353 def guids(self, params):
354 guids = self._testbed.guids
355 guids = ",".join(map(str, guids))
356 result = base64.b64encode(guids)
357 return "%d|%s" % (OK, result)
359 def defer_create(self, params):
360 guid = int(params[1])
361 factory_id = params[2]
362 self._testbed.defer_create(guid, factory_id)
363 return "%d|%s" % (OK, "")
365 def trace(self, params):
366 guid = int(params[1])
368 trace = self._testbed.trace(guid, trace_id)
369 result = base64.b64encode(trace)
370 return "%d|%s" % (OK, result)
372 def start(self, params):
373 self._testbed.start()
374 return "%d|%s" % (OK, "")
376 def stop(self, params):
378 return "%d|%s" % (OK, "")
380 def shutdown(self, params):
381 self._testbed.shutdown()
382 return "%d|%s" % (OK, "")
384 def defer_configure(self, params):
385 name = base64.b64decode(params[1])
386 value = base64.b64decode(params[2])
387 type = int(params[3])
388 value = set_type(type, value)
389 self._testbed.defer_configure(name, value)
390 return "%d|%s" % (OK, "")
392 def defer_create_set(self, params):
393 guid = int(params[1])
394 name = base64.b64decode(params[2])
395 value = base64.b64decode(params[3])
396 type = int(params[4])
397 value = set_type(type, value)
398 self._testbed.defer_create_set(guid, name, value)
399 return "%d|%s" % (OK, "")
401 def defer_factory_set(self, params):
402 name = base64.b64decode(params[1])
403 value = base64.b64decode(params[2])
404 type = int(params[3])
405 value = set_type(type, value)
406 self._testbed.defer_factory_set(name, value)
407 return "%d|%s" % (OK, "")
409 def defer_connect(self, params):
410 guid1 = int(params[1])
411 connector_type_name1 = params[2]
412 guid2 = int(params[3])
413 connector_type_name2 = params[4]
414 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
415 connector_type_name2)
416 return "%d|%s" % (OK, "")
418 def defer_cross_connect(self, params):
419 guid = int(params[1])
420 connector_type_name = params[2]
421 cross_guid = int(params[3])
422 connector_type_name = params[4]
423 cross_guid = int(params[5])
424 cross_testbed_id = params[6]
425 cross_factory_id = params[7]
426 cross_connector_type_name = params[8]
427 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
428 cross_testbed_id, cross_factory_id, cross_connector_type_name)
429 return "%d|%s" % (OK, "")
431 def defer_add_trace(self, params):
432 guid = int(params[1])
434 self._testbed.defer_add_trace(guid, trace_id)
435 return "%d|%s" % (OK, "")
437 def defer_add_address(self, params):
438 guid = int(params[1])
440 netprefix = int(params[3])
441 broadcast = params[4]
442 self._testbed.defer_add_address(guid, address, netprefix,
444 return "%d|%s" % (OK, "")
446 def defer_add_route(self, params):
447 guid = int(params[1])
448 destination = params[2]
449 netprefix = int(params[3])
451 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
452 return "%d|%s" % (OK, "")
454 def do_setup(self, params):
455 self._testbed.do_setup()
456 return "%d|%s" % (OK, "")
458 def do_create(self, params):
459 self._testbed.do_create()
460 return "%d|%s" % (OK, "")
462 def do_connect(self, params):
463 self._testbed.do_connect()
464 return "%d|%s" % (OK, "")
466 def do_configure(self, params):
467 self._testbed.do_configure()
468 return "%d|%s" % (OK, "")
470 def do_cross_connect(self, params):
471 self._testbed.do_cross_connect()
472 return "%d|%s" % (OK, "")
474 def get(self, params):
476 guid = int(param[2] )
477 name = base64.b64decode(params[3])
478 value = self._testbed.get(time, guid, name)
479 result = base64.b64encode(str(value))
480 return "%d|%s" % (OK, result)
482 def set(self, params):
484 guid = int(params[2])
485 name = base64.b64decode(params[3])
486 value = base64.b64decode(params[4])
487 type = int(params[3])
488 value = set_type(type, value)
489 self._testbed.set(time, guid, name, value)
490 return "%d|%s" % (OK, "")
492 def action(self, params):
494 guid = int(params[2])
495 command = base64.b64decode(params[3])
496 self._testbed.action(time, guid, command)
497 return "%d|%s" % (OK, "")
499 def status(self, params):
500 guid = int(params[1])
501 status = self._testbed.status(guid)
502 result = base64.b64encode(str(status))
503 return "%d|%s" % (OK, result)
505 class ExperimentControllerServer(server.Server):
506 def __init__(self, root_dir, log_level, experiment_xml):
507 super(ExperimentControllerServer, self).__init__(root_dir, log_level)
508 self._experiment_xml = experiment_xml
509 self._controller = None
511 def post_daemonize(self):
512 from nepi.core.execute import ExperimentController
513 self._controller = ExperimentController(self._experiment_xml)
515 def reply_action(self, msg):
516 params = msg.split("|")
517 instruction = int(params[0])
518 log_msg(self, params)
520 if instruction == XML:
521 reply = self.experiment_xml(params)
522 elif instruction == ACCESS:
523 reply = self.set_access_configuration(params)
524 elif instruction == TRACE:
525 reply = self.trace(params)
526 elif instruction == FINISHED:
527 reply = self.is_finished(params)
528 elif instruction == START:
529 reply = self.start(params)
530 elif instruction == STOP:
531 reply = self.stop(params)
532 elif instruction == SHUTDOWN:
533 reply = self.shutdown(params)
535 error = "Invalid instruction %s" % instruction
536 self.log_error(error)
537 result = base64.b64encode(error)
538 reply = "%d|%s" % (ERROR, result)
540 error = self.log_error()
541 result = base64.b64encode(error)
542 reply = "%d|%s" % (ERROR, result)
543 log_reply(self, reply)
546 def experiment_xml(self, params):
547 xml = self._controller.experiment_xml
548 result = base64.b64encode(xml)
549 return "%d|%s" % (OK, result)
551 def set_access_configuration(self, params):
552 testbed_guid = int(params[1])
554 communication = params[3]
557 port = int(params[6])
559 use_agent = params[8] == "True"
560 log_level = params[9]
561 access_config = AccessConfiguration()
562 access_config.set_attribute_value("mode", mode)
563 access_config.set_attribute_value("communication", communication)
564 access_config.set_attribute_value("host", host)
565 access_config.set_attribute_value("user", user)
566 access_config.set_attribute_value("port", port)
567 access_config.set_attribute_value("rootDirectory", root_dir)
568 access_config.set_attribute_value("useAgent", use_agent)
569 access_config.set_attribute_value("logLevel", log_level)
570 self._controller.set_access_configuration(testbed_guid,
572 return "%d|%s" % (OK, "")
574 def trace(self, params):
575 testbed_guid = int(params[1])
576 guid = int(params[2])
578 trace = self._controller.trace(testbed_guid, guid, trace_id)
579 result = base64.b64encode(trace)
580 return "%d|%s" % (OK, result)
582 def is_finished(self, params):
583 guid = int(params[1])
584 status = self._controller.is_finished(guid)
585 result = base64.b64encode(str(status))
586 return "%d|%s" % (OK, result)
588 def start(self, params):
589 self._controller.start()
590 return "%d|%s" % (OK, "")
592 def stop(self, params):
593 self._controller.stop()
594 return "%d|%s" % (OK, "")
596 def shutdown(self, params):
597 self._controller.shutdown()
598 return "%d|%s" % (OK, "")
600 class TestbedInstanceProxy(object):
601 def __init__(self, root_dir, log_level, testbed_id = None,
602 testbed_version = None, launch = True, host = None,
603 port = None, user = None, agent = None):
605 if testbed_id == None or testbed_version == None:
606 raise RuntimeError("To launch a TesbedInstance server a \
607 testbed_id and testbed_version are required")
610 python_code = "from nepi.util.proxy import \
611 TesbedInstanceServer;\
612 s = TestbedInstanceServer('%s', %d, '%s', '%s');\
613 s.run()" % (root_dir, log_level, testbed_id,
615 self._client = launch_ssh_daemon_client(root_dir, python_code,
616 host, port, user, agent)
619 s = TestbedInstanceServer(root_dir, log_level, testbed_id,
623 self._client = server.Client(root_dir)
625 # attempt to reconnect
627 self._client = launch_ssh_daemon_client(root_dir, None,
628 host, port, user, agent)
630 self._client = server.Client(root_dir)
634 msg = testbed_messages[GUIDS]
635 self._client.send_msg(msg)
636 reply = self._client.read_reply()
637 result = reply.split("|")
638 code = int(result[0])
639 text = base64.b64decode(result[1])
641 raise RuntimeError(text)
642 return map(int, text.split(","))
644 def defer_configure(self, name, value):
645 msg = testbed_messages[CONFIGURE]
646 type = get_type(value)
647 # avoid having "|" in this parameters
648 name = base64.b64encode(name)
649 value = base64.b64encode(str(value))
650 msg = msg % (name, value, type)
651 self._client.send_msg(msg)
652 reply = self._client.read_reply()
653 result = reply.split("|")
654 code = int(result[0])
655 text = base64.b64decode(result[1])
657 raise RuntimeError(text)
659 def defer_create(self, guid, factory_id):
660 msg = testbed_messages[CREATE]
661 msg = msg % (guid, factory_id)
662 self._client.send_msg(msg)
663 reply = self._client.read_reply()
664 result = reply.split("|")
665 code = int(result[0])
666 text = base64.b64decode(result[1])
668 raise RuntimeError(text)
670 def defer_create_set(self, guid, name, value):
671 msg = testbed_messages[CREATE_SET]
672 type = get_type(value)
673 # avoid having "|" in this parameters
674 name = base64.b64encode(name)
675 value = base64.b64encode(str(value))
676 msg = msg % (guid, name, value, type)
677 self._client.send_msg(msg)
678 reply = self._client.read_reply()
679 result = reply.split("|")
680 code = int(result[0])
681 text = base64.b64decode(result[1])
683 raise RuntimeError(text)
685 def defer_factory_set(self, guid, name, value):
686 msg = testbed_messages[FACTORY_SET]
687 type = get_type(value)
688 # avoid having "|" in this parameters
689 name = base64.b64encode(name)
690 value = base64.b64encode(str(value))
691 msg = msg % (guid, name, value, type)
692 self._client.send_msg(msg)
693 reply = self._client.read_reply()
694 result = reply.split("|")
695 code = int(result[0])
696 text = base64.b64decode(result[1])
698 raise RuntimeError(text)
700 def defer_connect(self, guid1, connector_type_name1, guid2,
701 connector_type_name2):
702 msg = testbed_messages[CONNECT]
703 msg = msg % (guid1, connector_type_name1, guid2,
704 connector_type_name2)
705 self._client.send_msg(msg)
706 reply = self._client.read_reply()
707 result = reply.split("|")
708 code = int(result[0])
709 text = base64.b64decode(result[1])
711 raise RuntimeError(text)
713 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
714 cross_testbed_id, cross_factory_id, cross_connector_type_name):
715 msg = testbed_messages[CROSS_CONNECT]
716 msg = msg % (guid, connector_type_name, cross_guid,
717 cross_testbed_id, cross_factory_id, cross_connector_type_name)
718 self._client.send_msg(msg)
719 reply = self._client.read_reply()
720 result = reply.split("|")
721 code = int(result[0])
722 text = base64.b64decode(result[1])
724 raise RuntimeError(text)
726 def defer_add_trace(self, guid, trace_id):
727 msg = testbed_messages[ADD_TRACE]
728 msg = msg % (guid, trace_id)
729 self._client.send_msg(msg)
730 reply = self._client.read_reply()
731 result = reply.split("|")
732 code = int(result[0])
733 text = base64.b64decode(result[1])
735 raise RuntimeError(text)
737 def defer_add_address(self, guid, address, netprefix, broadcast):
738 msg = testbed_messages[ADD_ADDRESS]
739 msg = msg % (guid, address, netprefix, broadcast)
740 self._client.send_msg(msg)
741 reply = self._client.read_reply()
742 result = reply.split("|")
743 code = int(result[0])
744 text = base64.b64decode(result[1])
746 raise RuntimeError(text)
748 def defer_add_route(self, guid, destination, netprefix, nexthop):
749 msg = testbed_messages[ADD_ROUTE]
750 msg = msg % (guid, destination, netprefix, nexthop)
751 self._client.send_msg(msg)
752 reply = self._client.read_reply()
753 result = reply.split("|")
754 code = int(result[0])
755 text = base64.b64decode(result[1])
757 raise RuntimeError(text)
760 msg = testbed_messages[DO_SETUP]
761 self._client.send_msg(msg)
762 reply = self._client.read_reply()
763 result = reply.split("|")
764 code = int(result[0])
765 text = base64.b64decode(result[1])
767 raise RuntimeError(text)
770 msg = testbed_messages[DO_CREATE]
771 self._client.send_msg(msg)
772 reply = self._client.read_reply()
773 result = reply.split("|")
774 code = int(result[0])
775 text = base64.b64decode(result[1])
777 raise RuntimeError(text)
779 def do_connect(self):
780 msg = testbed_messages[DO_CONNECT]
781 self._client.send_msg(msg)
782 reply = self._client.read_reply()
783 result = reply.split("|")
784 code = int(result[0])
785 text = base64.b64decode(result[1])
787 raise RuntimeError(text)
789 def do_configure(self):
790 msg = testbed_messages[DO_CONFIGURE]
791 self._client.send_msg(msg)
792 reply = self._client.read_reply()
793 result = reply.split("|")
794 code = int(result[0])
795 text = base64.b64decode(result[1])
797 raise RuntimeError(text)
799 def do_cross_connect(self):
800 msg = testbed_messages[DO_CROSS_CONNECT]
801 self._client.send_msg(msg)
802 reply = self._client.read_reply()
803 result = reply.split("|")
804 code = int(result[0])
805 text = base64.b64decode(result[1])
807 raise RuntimeError(text)
809 def start(self, time = TIME_NOW):
810 msg = testbed_messages[START]
811 self._client.send_msg(msg)
812 reply = self._client.read_reply()
813 result = reply.split("|")
814 code = int(result[0])
815 text = base64.b64decode(result[1])
817 raise RuntimeError(text)
819 def stop(self, time = TIME_NOW):
820 msg = testbed_messages[STOP]
821 self._client.send_msg(msg)
822 reply = self._client.read_reply()
823 result = reply.split("|")
824 code = int(result[0])
825 text = base64.b64decode(result[1])
827 raise RuntimeError(text)
829 def set(self, time, guid, name, value):
830 msg = testbed_messages[SET]
831 type = get_type(value)
832 # avoid having "|" in this parameters
833 name = base64.b64encode(name)
834 value = base64.b64encode(str(value))
835 msg = msg % (time, guid, name, value, type)
836 self._client.send_msg(msg)
837 reply = self._client.read_reply()
838 result = reply.split("|")
839 code = int(result[0])
840 text = base64.b64decode(result[1])
842 raise RuntimeError(text)
844 def get(self, time, guid, name):
845 msg = testbed_messages[GET]
846 # avoid having "|" in this parameters
847 name = base64.b64encode(name)
848 msg = msg % (time, guid, name)
849 self._client.send_msg(msg)
850 reply = self._client.read_reply()
851 result = reply.split("|")
852 code = int(result[0])
853 text = base64.b64decode(result[1])
855 raise RuntimeError(text)
858 def action(self, time, guid, action):
859 msg = testbed_messages[ACTION]
860 msg = msg % (time, guid, action)
861 self._client.send_msg(msg)
862 reply = self._client.read_reply()
863 result = reply.split("|")
864 code = int(result[0])
865 text = base64.b64decode(result[1])
867 raise RuntimeError(text)
869 def status(self, guid):
870 msg = testbed_messages[STATUS]
872 self._client.send_msg(msg)
873 reply = self._client.read_reply()
874 result = reply.split("|")
875 code = int(result[0])
876 text = base64.b64decode(result[1])
878 raise RuntimeError(text)
881 def trace(self, guid, trace_id):
882 msg = testbed_messages[TRACE]
883 msg = msg % (guid, trace_id)
884 self._client.send_msg(msg)
885 reply = self._client.read_reply()
886 result = reply.split("|")
887 code = int(result[0])
888 text = base64.b64decode(result[1])
890 raise RuntimeError(text)
894 msg = testbed_messages[SHUTDOWN]
895 self._client.send_msg(msg)
896 reply = self._client.read_reply()
897 result = reply.split("|")
898 code = int(result[0])
899 text = base64.b64decode(result[1])
901 raise RuntimeError(text)
902 self._client.send_stop()
904 class ExperimentControllerProxy(object):
905 def __init__(self, root_dir, log_level, experiment_xml = None,
906 launch = True, host = None, port = None, user = None,
910 if experiment_xml == None:
911 raise RuntimeError("To launch a ExperimentControllerServer a \
912 xml description of the experiment is required")
916 xml = xml.replace("'", r"\'")
917 xml = xml.replace("\"", r"\'")
918 xml = xml.replace("\n", r"")
919 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
920 s = ExperimentControllerServer(%r, %r, %r);\
921 s.run()" % (root_dir, log_level, xml)
922 self._client = launch_ssh_daemon_client(root_dir, python_code,
923 host, port, user, agent)
926 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
929 self._client = server.Client(root_dir)
931 # attempt to reconnect
933 self._client = launch_ssh_daemon_client(root_dir, None,
934 host, port, user, agent)
936 self._client = server.Client(root_dir)
939 def experiment_xml(self):
940 msg = controller_messages[XML]
941 self._client.send_msg(msg)
942 reply = self._client.read_reply()
943 result = reply.split("|")
944 code = int(result[0])
945 text = base64.b64decode(result[1])
947 raise RuntimeError(text)
950 def set_access_configuration(self, testbed_guid, access_config):
951 mode = access_config.get_attribute_value("mode")
952 communication = access_config.get_attribute_value("communication")
953 host = access_config.get_attribute_value("host")
954 user = access_config.get_attribute_value("user")
955 port = access_config.get_attribute_value("port")
956 root_dir = access_config.get_attribute_value("rootDirectory")
957 use_agent = access_config.get_attribute_value("useAgent")
958 log_level = access_config.get_attribute_value("logLevel")
959 msg = controller_messages[ACCESS]
960 msg = msg % (testbed_guid, mode, communication, host, user, port,
961 root_dir, use_agent, log_level)
962 self._client.send_msg(msg)
963 reply = self._client.read_reply()
964 result = reply.split("|")
965 code = int(result[0])
966 text = base64.b64decode(result[1])
968 raise RuntimeError(text)
970 def trace(self, testbed_guid, guid, trace_id):
971 msg = controller_messages[TRACE]
972 msg = msg % (testbed_guid, guid, trace_id)
973 self._client.send_msg(msg)
974 reply = self._client.read_reply()
975 result = reply.split("|")
976 code = int(result[0])
977 text = base64.b64decode(result[1])
980 raise RuntimeError(text)
983 msg = controller_messages[START]
984 self._client.send_msg(msg)
985 reply = self._client.read_reply()
986 result = reply.split("|")
987 code = int(result[0])
988 text = base64.b64decode(result[1])
990 raise RuntimeError(text)
993 msg = controller_messages[STOP]
994 self._client.send_msg(msg)
995 reply = self._client.read_reply()
996 result = reply.split("|")
997 code = int(result[0])
998 text = base64.b64decode(result[1])
1000 raise RuntimeError(text)
1002 def is_finished(self, guid):
1003 msg = controller_messages[FINISHED]
1005 self._client.send_msg(msg)
1006 reply = self._client.read_reply()
1007 result = reply.split("|")
1008 code = int(result[0])
1009 text = base64.b64decode(result[1])
1011 raise RuntimeError(text)
1012 return text == "True"
1015 msg = controller_messages[SHUTDOWN]
1016 self._client.send_msg(msg)
1017 reply = self._client.read_reply()
1018 result = reply.split("|")
1019 code = int(result[0])
1020 text = base64.b64decode(result[1])
1022 raise RuntimeError(text)
1023 self._client.send_stop()