2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
16 # PROTOCOL INSTRUCTION MESSAGES
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
51 controller_messages = dict({
53 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
54 TRACE: "%d|%s" % (TRACE, "%d|%d|%s"),
55 FINISHED: "%d|%s" % (FINISHED, "%d"),
58 SHUTDOWN: "%d" % SHUTDOWN,
61 # TESTBED INSTANCE PROTOCOL MESSAGES
62 testbed_messages = dict({
63 TRACE: "%d|%s" % (TRACE, "%d|%s"),
66 SHUTDOWN: "%d" % SHUTDOWN,
67 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
68 CREATE: "%d|%s" % (CREATE, "%d|%s"),
69 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
70 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
71 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
72 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
73 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
74 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
75 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
76 DO_SETUP: "%d" % DO_SETUP,
77 DO_CREATE: "%d" % DO_CREATE,
78 DO_CONNECT: "%d" % DO_CONNECT,
79 DO_CONFIGURE: "%d" % DO_CONFIGURE,
80 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
81 GET: "%d|%s" % (GET, "%s|%d|%s"),
82 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
83 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
84 STATUS: "%d|%s" % (STATUS, "%d"),
88 instruction_text = dict({
98 CONFIGURE: "CONFIGURE",
100 CREATE_SET: "CREATE_SET",
101 FACTORY_SET: "FACTORY_SET",
103 CROSS_CONNECT: "CROSS_CONNECT",
104 ADD_TRACE: "ADD_TRACE",
105 ADD_ADDRESS: "ADD_ADDRESS",
106 ADD_ROUTE: "ADD_ROUTE",
107 DO_SETUP: "DO_SETUP",
108 DO_CREATE: "DO_CREATE",
109 DO_CONNECT: "DO_CONNECT",
110 DO_CONFIGURE: "DO_CONFIGURE",
111 DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
124 if isinstance(value, bool):
126 elif isinstance(value, int):
128 elif isinstance(value, float):
133 def set_type(type, value):
139 value = value == "True"
def log_msg(server, params):
    """Write a debug log entry describing a received protocol message.

    server -- object exposing ``log_debug``; its class name prefixes the entry.
    params -- the message split on "|": ``params[0]`` is the numeric
        instruction code, the remaining items are the instruction arguments.

    The code is translated to its readable name through ``instruction_text``.
    """
    instruction_name = instruction_text[int(params[0])]
    owner = server.__class__.__name__
    arguments = ", ".join([str(param) for param in params[1:]])
    server.log_debug("%s - msg: %s [%s]" % (owner, instruction_name, arguments))
150 def log_reply(server, reply):
151 res = reply.split("|")
153 code_txt = instruction_text[code]
154 txt = base64.b64decode(res[1])
155 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
158 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
160 proc = server.popen_ssh_subprocess(python_code, host = host,
161 port = port, user = user, agent = agent)
163 err = proc.stderr.read()
164 raise RuntimeError("Client could not be executed: %s" % \
167 return server.Client(root_dir, host = host, port = port, user = user,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into the server constant.

    Returns ``server.DEBUG_LEVEL`` when *log_level* equals
    ``AccessConfiguration.DEBUG_LEVEL`` and ``server.ERROR_LEVEL`` for any
    other value.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the launch parameters stored in an access configuration.

    access_config -- object exposing ``get_attribute_value(name)``.

    Returns the tuple ``(root_dir, log_level, user, host, port, agent)``.
    ``log_level`` is already converted with ``to_server_log_level``. The last
    four items are only read when the "communication" attribute equals
    ``AccessConfiguration.ACCESS_SSH``; otherwise they are all ``None``.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    # Remote connection details default to "not set".
    remote = [None, None, None, None]
    if read("communication") == AccessConfiguration.ACCESS_SSH:
        remote = [read(key) for key in ("user", "host", "port", "useAgent")]
    user, host, port, agent = remote
    return (root_dir, log_level, user, host, port, agent)
188 class AccessConfiguration(AttributesMap):
189 MODE_SINGLE_PROCESS = "SINGLE"
190 MODE_DAEMON = "DAEMON"
192 ACCESS_LOCAL = "LOCAL"
193 ERROR_LEVEL = "Error"
194 DEBUG_LEVEL = "Debug"
197 super(AccessConfiguration, self).__init__()
198 self.add_attribute(name = "mode",
199 help = "Instance execution mode",
200 type = Attribute.ENUM,
201 value = AccessConfiguration.MODE_SINGLE_PROCESS,
202 allowed = [AccessConfiguration.MODE_DAEMON,
203 AccessConfiguration.MODE_SINGLE_PROCESS],
204 validation_function = validation.is_enum)
205 self.add_attribute(name = "communication",
206 help = "Instance communication mode",
207 type = Attribute.ENUM,
208 value = AccessConfiguration.ACCESS_LOCAL,
209 allowed = [AccessConfiguration.ACCESS_LOCAL,
210 AccessConfiguration.ACCESS_SSH],
211 validation_function = validation.is_enum)
212 self.add_attribute(name = "host",
213 help = "Host where the testbed will be executed",
214 type = Attribute.STRING,
216 validation_function = validation.is_string)
217 self.add_attribute(name = "user",
218 help = "User on the Host to execute the testbed",
219 type = Attribute.STRING,
220 value = getpass.getuser(),
221 validation_function = validation.is_string)
222 self.add_attribute(name = "port",
223 help = "Port on the Host",
224 type = Attribute.INTEGER,
226 validation_function = validation.is_integer)
227 self.add_attribute(name = "rootDirectory",
228 help = "Root directory for storing process files",
229 type = Attribute.STRING,
231 validation_function = validation.is_string) # TODO: validation.is_path
232 self.add_attribute(name = "useAgent",
233 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
234 type = Attribute.BOOL,
236 validation_function = validation.is_bool)
237 self.add_attribute(name = "logLevel",
238 help = "Log level for instance",
239 type = Attribute.ENUM,
240 value = AccessConfiguration.ERROR_LEVEL,
241 allowed = [AccessConfiguration.ERROR_LEVEL,
242 AccessConfiguration.DEBUG_LEVEL],
243 validation_function = validation.is_enum)
def create_controller(xml, access_config = None):
    """Create an experiment controller for the given experiment description.

    xml -- experiment description handed to the controller.
    access_config -- optional object exposing ``get_attribute_value``; its
        "mode" attribute selects how the controller is run.

    With no configuration, no mode, or ``MODE_SINGLE_PROCESS`` an
    ``ExperimentController`` is returned. With ``MODE_DAEMON`` an
    ``ExperimentControllerProxy`` is returned, built from the values of
    ``get_access_config_params``.

    Raises RuntimeError for any other mode.
    """
    mode = access_config.get_attribute_value("mode") if access_config else None
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        # Imported locally, as the original code does.
        from nepi.core.execute import ExperimentController
        return ExperimentController(xml)
    if mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user,
                agent = agent)
    # The message needs a conversion specifier: formatting a string without
    # one raises TypeError instead of the intended RuntimeError.
    raise RuntimeError("Unsupported access configuration 'mode': %s" % (mode,))
def create_testbed_instance(testbed_id, testbed_version, access_config):
    """Create a testbed instance, either in-process or behind a proxy.

    testbed_id, testbed_version -- identify the testbed to instantiate.
    access_config -- object exposing ``get_attribute_value`` (or a false
        value); its "mode" attribute selects how the instance is run.

    With no configuration, no mode, or ``MODE_SINGLE_PROCESS`` the result of
    ``_build_testbed_instance`` is returned. With ``MODE_DAEMON`` a
    ``TestbedInstanceProxy`` is returned, built from the values of
    ``get_access_config_params``.

    Raises RuntimeError for any other mode.
    """
    mode = access_config.get_attribute_value("mode") if access_config else None
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        return _build_testbed_instance(testbed_id, testbed_version)
    if mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port,
                user = user, agent = agent)
    # The message needs a conversion specifier: formatting a string without
    # one raises TypeError instead of the intended RuntimeError.
    raise RuntimeError("Unsupported access configuration 'mode': %s" % (mode,))
271 def _build_testbed_instance(testbed_id, testbed_version):
272 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
273 if not mod_name in sys.modules:
275 module = sys.modules[mod_name]
276 return module.TestbedInstance(testbed_version)
278 class TestbedInstanceServer(server.Server):
279 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
280 super(TestbedInstanceServer, self).__init__(root_dir, log_level)
281 self._testbed_id = testbed_id
282 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed instance this server operates on.

    Stores the result of ``_build_testbed_instance`` for the configured
    testbed id and version in ``self._testbed``.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_instance(testbed_id, testbed_version)
289 def reply_action(self, msg):
290 params = msg.split("|")
291 instruction = int(params[0])
292 log_msg(self, params)
294 if instruction == TRACE:
295 reply = self.trace(params)
296 elif instruction == START:
297 reply = self.start(params)
298 elif instruction == STOP:
299 reply = self.stop(params)
300 elif instruction == SHUTDOWN:
301 reply = self.shutdown(params)
302 elif instruction == CONFIGURE:
303 reply = self.defer_configure(params)
304 elif instruction == CREATE:
305 reply = self.defer_create(params)
306 elif instruction == CREATE_SET:
307 reply = self.defer_create_set(params)
308 elif instruction == FACTORY_SET:
309 reply = self.defer_factory_set(params)
310 elif instruction == CONNECT:
311 reply = self.defer_connect(params)
312 elif instruction == CROSS_CONNECT:
313 reply = self.defer_cross_connect(params)
314 elif instruction == ADD_TRACE:
315 reply = self.defer_add_trace(params)
316 elif instruction == ADD_ADDRESS:
317 reply = self.defer_add_address(params)
318 elif instruction == ADD_ROUTE:
319 reply = self.defer_add_route(params)
320 elif instruction == DO_SETUP:
321 reply = self.do_setup(params)
322 elif instruction == DO_CREATE:
323 reply = self.do_create(params)
324 elif instruction == DO_CONNECT:
325 reply = self.do_connect(params)
326 elif instruction == DO_CONFIGURE:
327 reply = self.do_configure(params)
328 elif instruction == DO_CROSS_CONNECT:
329 reply = self.do_cross_connect(params)
330 elif instruction == GET:
331 reply = self.get(params)
332 elif instruction == SET:
333 reply = self.set(params)
334 elif instruction == ACTION:
335 reply = self.action(params)
336 elif instruction == STATUS:
337 reply = self.status(params)
338 elif instruction == GUIDS:
339 reply = self.guids(params)
341 error = "Invalid instruction %s" % instruction
342 self.log_error(error)
343 result = base64.b64encode(error)
344 reply = "%d|%s" % (ERROR, result)
346 error = self.log_error()
347 result = base64.b64encode(error)
348 reply = "%d|%s" % (ERROR, result)
349 log_reply(self, reply)
def guids(self, params):
    """Reply with the testbed guids as a base64-encoded, comma-separated list.

    params -- split message; not used by this instruction.

    Returns an ``OK|<base64 payload>`` reply string.
    """
    joined = ",".join([str(guid) for guid in self._testbed.guids])
    payload = base64.b64encode(joined)
    return "%d|%s" % (OK, payload)
def defer_create(self, params):
    """Forward a CREATE instruction to the testbed.

    params -- split message: ``params[1]`` is the element guid (integer
        text) and ``params[2]`` the factory id.

    Returns an OK reply with an empty payload.
    """
    element_guid, factory = int(params[1]), params[2]
    self._testbed.defer_create(element_guid, factory)
    reply = "%d|%s" % (OK, "")
    return reply
364 def trace(self, params):
365 guid = int(params[1])
367 trace = self._testbed.trace(guid, trace_id)
368 result = base64.b64encode(trace)
369 return "%d|%s" % (OK, result)
def start(self, params):
    """Start the testbed and acknowledge with an empty OK reply.

    params -- split message; not used by this instruction.
    """
    self._testbed.start()
    reply = "%d|%s" % (OK, "")
    return reply
375 def stop(self, params):
377 return "%d|%s" % (OK, "")
def shutdown(self, params):
    """Shut the testbed down and acknowledge with an empty OK reply.

    params -- split message; not used by this instruction.
    """
    self._testbed.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
def defer_configure(self, params):
    """Forward a CONFIGURE instruction to the testbed.

    params -- split message: ``params[1]`` and ``params[2]`` are the
        base64-encoded attribute name and value, ``params[3]`` is the
        numeric type code passed to ``set_type``.

    Returns an OK reply with an empty payload.
    """
    attr_name = base64.b64decode(params[1])
    raw_value = base64.b64decode(params[2])
    type_code = int(params[3])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_configure(attr_name, typed_value)
    return "%d|%s" % (OK, "")
def defer_create_set(self, params):
    """Forward a CREATE_SET instruction to the testbed.

    params -- split message: ``params[1]`` is the element guid,
        ``params[2]`` and ``params[3]`` are the base64-encoded attribute
        name and value, ``params[4]`` is the numeric type code passed to
        ``set_type``.

    Returns an OK reply with an empty payload.
    """
    element_guid = int(params[1])
    attr_name = base64.b64decode(params[2])
    raw_value = base64.b64decode(params[3])
    type_code = int(params[4])
    typed_value = set_type(type_code, raw_value)
    self._testbed.defer_create_set(element_guid, attr_name, typed_value)
    return "%d|%s" % (OK, "")
def defer_factory_set(self, params):
    """Forward a FACTORY_SET instruction to the testbed.

    params -- split message laid out as the FACTORY_SET template and the
        proxy send it: ``params[1]`` is the guid, ``params[2]`` and
        ``params[3]`` are the base64-encoded attribute name and value,
        ``params[4]`` is the numeric type code passed to ``set_type``.

    Returns an OK reply with an empty payload.
    """
    # The message carries guid|name|value|type. Reading name/value/type from
    # positions 1..3 would base64-decode the guid and int() the encoded value.
    guid = int(params[1])
    name = base64.b64decode(params[2])
    value = base64.b64decode(params[3])
    type_code = int(params[4])
    value = set_type(type_code, value)
    self._testbed.defer_factory_set(guid, name, value)
    return "%d|%s" % (OK, "")
def defer_connect(self, params):
    """Forward a CONNECT instruction to the testbed.

    params -- split message: ``params[1]``/``params[3]`` are the two element
        guids (integer text), ``params[2]``/``params[4]`` their connector
        type names.

    Returns an OK reply with an empty payload.
    """
    first_guid = int(params[1])
    first_connector = params[2]
    second_guid = int(params[3])
    second_connector = params[4]
    self._testbed.defer_connect(first_guid, first_connector,
            second_guid, second_connector)
    reply = "%d|%s" % (OK, "")
    return reply
def defer_cross_connect(self, params):
    """Forward a CROSS_CONNECT instruction to the testbed.

    params -- split message with six arguments after the instruction code:
        guid, connector type name, cross guid, cross testbed id, cross
        factory id and cross connector type name (``params[1]`` to
        ``params[6]``).

    Returns an OK reply with an empty payload.
    """
    # Each field is read exactly once, in message order. Indexing past
    # params[6] would raise IndexError on a well-formed message.
    guid = int(params[1])
    connector_type_name = params[2]
    cross_guid = int(params[3])
    # NOTE(review): the CROSS_CONNECT template formats this field with %d;
    # it is forwarded as text here, as before -- confirm the expected type.
    cross_testbed_id = params[4]
    cross_factory_id = params[5]
    cross_connector_type_name = params[6]
    self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
    return "%d|%s" % (OK, "")
430 def defer_add_trace(self, params):
431 guid = int(params[1])
433 self._testbed.defer_add_trace(guid, trace_id)
434 return "%d|%s" % (OK, "")
436 def defer_add_address(self, params):
437 guid = int(params[1])
439 netprefix = int(params[3])
440 broadcast = params[4]
441 self._testbed.defer_add_address(guid, address, netprefix,
443 return "%d|%s" % (OK, "")
445 def defer_add_route(self, params):
446 guid = int(params[1])
447 destination = params[2]
448 netprefix = int(params[3])
450 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
451 return "%d|%s" % (OK, "")
def do_setup(self, params):
    """Run the testbed's ``do_setup`` step and acknowledge with an empty OK.

    params -- split message; not used by this instruction.
    """
    self._testbed.do_setup()
    reply = "%d|%s" % (OK, "")
    return reply
def do_create(self, params):
    """Run the testbed's ``do_create`` step and acknowledge with an empty OK.

    params -- split message; not used by this instruction.
    """
    self._testbed.do_create()
    reply = "%d|%s" % (OK, "")
    return reply
def do_connect(self, params):
    """Run the testbed's ``do_connect`` step and acknowledge with an empty OK.

    params -- split message; not used by this instruction.
    """
    self._testbed.do_connect()
    reply = "%d|%s" % (OK, "")
    return reply
def do_configure(self, params):
    """Run the testbed's ``do_configure`` step and acknowledge with an empty OK.

    params -- split message; not used by this instruction.
    """
    self._testbed.do_configure()
    reply = "%d|%s" % (OK, "")
    return reply
def do_cross_connect(self, params):
    """Run the testbed's ``do_cross_connect`` step and acknowledge with an empty OK.

    params -- split message; not used by this instruction.
    """
    self._testbed.do_cross_connect()
    reply = "%d|%s" % (OK, "")
    return reply
def get(self, params):
    """Read an attribute value from the testbed.

    params -- split GET message laid out as ``time|guid|name`` after the
        instruction code: ``params[1]`` is the time (forwarded as received),
        ``params[2]`` the element guid and ``params[3]`` the base64-encoded
        attribute name.

    Returns an OK reply whose payload is the base64 encoding of
    ``str(value)``.
    """
    time = params[1]
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = self._testbed.get(time, guid, name)
    result = base64.b64encode(str(value))
    return "%d|%s" % (OK, result)
def set(self, params):
    """Set an attribute value on the testbed.

    params -- split SET message laid out as ``time|guid|name|value|type``
        after the instruction code: ``params[1]`` is the time (forwarded as
        received), ``params[2]`` the element guid, ``params[3]`` and
        ``params[4]`` the base64-encoded attribute name and value, and
        ``params[5]`` the numeric type code passed to ``set_type``.

    Returns an OK reply with an empty payload.
    """
    time = params[1]
    guid = int(params[2])
    name = base64.b64decode(params[3])
    value = base64.b64decode(params[4])
    # The type code is the fifth field; params[3] is the encoded name and
    # cannot be parsed as an integer.
    type_code = int(params[5])
    value = set_type(type_code, value)
    self._testbed.set(time, guid, name, value)
    return "%d|%s" % (OK, "")
491 def action(self, params):
493 guid = int(params[2])
494 command = base64.b64decode(params[3])
495 self._testbed.action(time, guid, command)
496 return "%d|%s" % (OK, "")
def status(self, params):
    """Reply with the testbed status of one element.

    params -- split message: ``params[1]`` is the element guid (integer
        text).

    Returns an OK reply whose payload is the base64 encoding of
    ``str(status)``.
    """
    element_status = self._testbed.status(int(params[1]))
    payload = base64.b64encode(str(element_status))
    return "%d|%s" % (OK, payload)
504 class ExperimentControllerServer(server.Server):
def __init__(self, root_dir, log_level, experiment_xml):
    """Initialise the server and remember the experiment description.

    root_dir, log_level -- passed unchanged to the base server constructor.
    experiment_xml -- experiment description kept for building the
        controller in ``post_daemonize``.
    """
    super(ExperimentControllerServer, self).__init__(root_dir, log_level)
    # No controller exists yet; post_daemonize() creates it.
    self._controller = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Create the experiment controller from the stored description.

    Stores an ``ExperimentController`` built from ``self._experiment_xml``
    in ``self._controller``.
    """
    # Imported inside the method, as the original code does.
    from nepi.core.execute import ExperimentController as controller_class
    self._controller = controller_class(self._experiment_xml)
514 def reply_action(self, msg):
515 params = msg.split("|")
516 instruction = int(params[0])
517 log_msg(self, params)
519 if instruction == XML:
520 reply = self.experiment_xml(params)
521 elif instruction == ACCESS:
522 reply = self.set_access_configuration(params)
523 elif instruction == TRACE:
524 reply = self.trace(params)
525 elif instruction == FINISHED:
526 reply = self.is_finished(params)
527 elif instruction == START:
528 reply = self.start(params)
529 elif instruction == STOP:
530 reply = self.stop(params)
531 elif instruction == SHUTDOWN:
532 reply = self.shutdown(params)
534 error = "Invalid instruction %s" % instruction
535 self.log_error(error)
536 result = base64.b64encode(error)
537 reply = "%d|%s" % (ERROR, result)
539 error = self.log_error()
540 result = base64.b64encode(error)
541 reply = "%d|%s" % (ERROR, result)
542 log_reply(self, reply)
def experiment_xml(self, params):
    """Reply with the controller's experiment description.

    params -- split message; not used by this instruction.

    Returns an OK reply whose payload is the base64 encoding of
    ``self._controller.experiment_xml``.
    """
    payload = base64.b64encode(self._controller.experiment_xml)
    return "%d|%s" % (OK, payload)
550 def set_access_configuration(self, params):
551 testbed_guid = int(params[1])
553 communication = params[3]
556 port = int(params[6])
558 use_agent = params[8] == "True"
559 log_level = params[9]
560 access_config = AccessConfiguration()
561 access_config.set_attribute_value("mode", mode)
562 access_config.set_attribute_value("communication", communication)
563 access_config.set_attribute_value("host", host)
564 access_config.set_attribute_value("user", user)
565 access_config.set_attribute_value("port", port)
566 access_config.set_attribute_value("rootDirectory", root_dir)
567 access_config.set_attribute_value("useAgent", use_agent)
568 access_config.set_attribute_value("logLevel", log_level)
569 self._controller.set_access_configuration(testbed_guid,
571 return "%d|%s" % (OK, "")
573 def trace(self, params):
574 testbed_guid = int(params[1])
575 guid = int(params[2])
577 trace = self._controller.trace(testbed_guid, guid, trace_id)
578 result = base64.b64encode(trace)
579 return "%d|%s" % (OK, result)
def is_finished(self, params):
    """Reply with the controller's ``is_finished`` answer for one element.

    params -- split message: ``params[1]`` is the element guid (integer
        text).

    Returns an OK reply whose payload is the base64 encoding of
    ``str(status)``.
    """
    finished = self._controller.is_finished(int(params[1]))
    payload = base64.b64encode(str(finished))
    return "%d|%s" % (OK, payload)
def start(self, params):
    """Start the controller and acknowledge with an empty OK reply.

    params -- split message; not used by this instruction.
    """
    self._controller.start()
    reply = "%d|%s" % (OK, "")
    return reply
def stop(self, params):
    """Stop the controller and acknowledge with an empty OK reply.

    params -- split message; not used by this instruction.
    """
    self._controller.stop()
    reply = "%d|%s" % (OK, "")
    return reply
def shutdown(self, params):
    """Shut the controller down and acknowledge with an empty OK reply.

    params -- split message; not used by this instruction.
    """
    self._controller.shutdown()
    reply = "%d|%s" % (OK, "")
    return reply
599 class TestbedInstanceProxy(object):
600 def __init__(self, root_dir, log_level, testbed_id = None,
601 testbed_version = None, launch = True, host = None,
602 port = None, user = None, agent = None):
604 if testbed_id == None or testbed_version == None:
605 raise RuntimeError("To launch a TesbedInstance server a \
606 testbed_id and testbed_version are required")
609 python_code = "from nepi.util.proxy import \
610 TesbedInstanceServer;\
611 s = TestbedInstanceServer('%s', %d, '%s', '%s');\
612 s.run()" % (root_dir, log_level, testbed_id,
614 self._client = launch_ssh_daemon_client(root_dir, python_code,
615 host, port, user, agent)
618 s = TestbedInstanceServer(root_dir, log_level, testbed_id,
622 self._client = server.Client(root_dir)
626 msg = testbed_messages[GUIDS]
627 self._client.send_msg(msg)
628 reply = self._client.read_reply()
629 result = reply.split("|")
630 code = int(result[0])
631 text = base64.b64decode(result[1])
633 raise RuntimeError(text)
634 return map(int, text.split(","))
636 def defer_configure(self, name, value):
637 msg = testbed_messages[CONFIGURE]
638 type = get_type(value)
639 # avoid having "|" in this parameters
640 name = base64.b64encode(name)
641 value = base64.b64encode(str(value))
642 msg = msg % (name, value, type)
643 self._client.send_msg(msg)
644 reply = self._client.read_reply()
645 result = reply.split("|")
646 code = int(result[0])
647 text = base64.b64decode(result[1])
649 raise RuntimeError(text)
651 def defer_create(self, guid, factory_id):
652 msg = testbed_messages[CREATE]
653 msg = msg % (guid, factory_id)
654 self._client.send_msg(msg)
655 reply = self._client.read_reply()
656 result = reply.split("|")
657 code = int(result[0])
658 text = base64.b64decode(result[1])
660 raise RuntimeError(text)
662 def defer_create_set(self, guid, name, value):
663 msg = testbed_messages[CREATE_SET]
664 type = get_type(value)
665 # avoid having "|" in this parameters
666 name = base64.b64encode(name)
667 value = base64.b64encode(str(value))
668 msg = msg % (guid, name, value, type)
669 self._client.send_msg(msg)
670 reply = self._client.read_reply()
671 result = reply.split("|")
672 code = int(result[0])
673 text = base64.b64decode(result[1])
675 raise RuntimeError(text)
677 def defer_factory_set(self, guid, name, value):
678 msg = testbed_messages[FACTORY_SET]
679 type = get_type(value)
680 # avoid having "|" in this parameters
681 name = base64.b64encode(name)
682 value = base64.b64encode(str(value))
683 msg = msg % (guid, name, value, type)
684 self._client.send_msg(msg)
685 reply = self._client.read_reply()
686 result = reply.split("|")
687 code = int(result[0])
688 text = base64.b64decode(result[1])
690 raise RuntimeError(text)
692 def defer_connect(self, guid1, connector_type_name1, guid2,
693 connector_type_name2):
694 msg = testbed_messages[CONNECT]
695 msg = msg % (guid1, connector_type_name1, guid2,
696 connector_type_name2)
697 self._client.send_msg(msg)
698 reply = self._client.read_reply()
699 result = reply.split("|")
700 code = int(result[0])
701 text = base64.b64decode(result[1])
703 raise RuntimeError(text)
705 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
706 cross_testbed_id, cross_factory_id, cross_connector_type_name):
707 msg = testbed_messages[CROSS_CONNECT]
708 msg = msg % (guid, connector_type_name, cross_guid,
709 cross_testbed_id, cross_factory_id, cross_connector_type_name)
710 self._client.send_msg(msg)
711 reply = self._client.read_reply()
712 result = reply.split("|")
713 code = int(result[0])
714 text = base64.b64decode(result[1])
716 raise RuntimeError(text)
718 def defer_add_trace(self, guid, trace_id):
719 msg = testbed_messages[ADD_TRACE]
720 msg = msg % (guid, trace_id)
721 self._client.send_msg(msg)
722 reply = self._client.read_reply()
723 result = reply.split("|")
724 code = int(result[0])
725 text = base64.b64decode(result[1])
727 raise RuntimeError(text)
729 def defer_add_address(self, guid, address, netprefix, broadcast):
730 msg = testbed_messages[ADD_ADDRESS]
731 msg = msg % (guid, address, netprefix, broadcast)
732 self._client.send_msg(msg)
733 reply = self._client.read_reply()
734 result = reply.split("|")
735 code = int(result[0])
736 text = base64.b64decode(result[1])
738 raise RuntimeError(text)
740 def defer_add_route(self, guid, destination, netprefix, nexthop):
741 msg = testbed_messages[ADD_ROUTE]
742 msg = msg % (guid, destination, netprefix, nexthop)
743 self._client.send_msg(msg)
744 reply = self._client.read_reply()
745 result = reply.split("|")
746 code = int(result[0])
747 text = base64.b64decode(result[1])
749 raise RuntimeError(text)
752 msg = testbed_messages[DO_SETUP]
753 self._client.send_msg(msg)
754 reply = self._client.read_reply()
755 result = reply.split("|")
756 code = int(result[0])
757 text = base64.b64decode(result[1])
759 raise RuntimeError(text)
762 msg = testbed_messages[DO_CREATE]
763 self._client.send_msg(msg)
764 reply = self._client.read_reply()
765 result = reply.split("|")
766 code = int(result[0])
767 text = base64.b64decode(result[1])
769 raise RuntimeError(text)
771 def do_connect(self):
772 msg = testbed_messages[DO_CONNECT]
773 self._client.send_msg(msg)
774 reply = self._client.read_reply()
775 result = reply.split("|")
776 code = int(result[0])
777 text = base64.b64decode(result[1])
779 raise RuntimeError(text)
781 def do_configure(self):
782 msg = testbed_messages[DO_CONFIGURE]
783 self._client.send_msg(msg)
784 reply = self._client.read_reply()
785 result = reply.split("|")
786 code = int(result[0])
787 text = base64.b64decode(result[1])
789 raise RuntimeError(text)
791 def do_cross_connect(self):
792 msg = testbed_messages[DO_CROSS_CONNECT]
793 self._client.send_msg(msg)
794 reply = self._client.read_reply()
795 result = reply.split("|")
796 code = int(result[0])
797 text = base64.b64decode(result[1])
799 raise RuntimeError(text)
801 def start(self, time = TIME_NOW):
802 msg = testbed_messages[START]
803 self._client.send_msg(msg)
804 reply = self._client.read_reply()
805 result = reply.split("|")
806 code = int(result[0])
807 text = base64.b64decode(result[1])
809 raise RuntimeError(text)
811 def stop(self, time = TIME_NOW):
812 msg = testbed_messages[STOP]
813 self._client.send_msg(msg)
814 reply = self._client.read_reply()
815 result = reply.split("|")
816 code = int(result[0])
817 text = base64.b64decode(result[1])
819 raise RuntimeError(text)
821 def set(self, time, guid, name, value):
822 msg = testbed_messages[SET]
823 type = get_type(value)
824 # avoid having "|" in this parameters
825 name = base64.b64encode(name)
826 value = base64.b64encode(str(value))
827 msg = msg % (time, guid, name, value, type)
828 self._client.send_msg(msg)
829 reply = self._client.read_reply()
830 result = reply.split("|")
831 code = int(result[0])
832 text = base64.b64decode(result[1])
834 raise RuntimeError(text)
836 def get(self, time, guid, name):
837 msg = testbed_messages[GET]
838 # avoid having "|" in this parameters
839 name = base64.b64encode(name)
840 msg = msg % (time, guid, name)
841 self._client.send_msg(msg)
842 reply = self._client.read_reply()
843 result = reply.split("|")
844 code = int(result[0])
845 text = base64.b64decode(result[1])
847 raise RuntimeError(text)
850 def action(self, time, guid, action):
851 msg = testbed_messages[ACTION]
852 msg = msg % (time, guid, action)
853 self._client.send_msg(msg)
854 reply = self._client.read_reply()
855 result = reply.split("|")
856 code = int(result[0])
857 text = base64.b64decode(result[1])
859 raise RuntimeError(text)
861 def status(self, guid):
862 msg = testbed_messages[STATUS]
864 self._client.send_msg(msg)
865 reply = self._client.read_reply()
866 result = reply.split("|")
867 code = int(result[0])
868 text = base64.b64decode(result[1])
870 raise RuntimeError(text)
873 def trace(self, guid, trace_id):
874 msg = testbed_messages[TRACE]
875 msg = msg % (guid, trace_id)
876 self._client.send_msg(msg)
877 reply = self._client.read_reply()
878 result = reply.split("|")
879 code = int(result[0])
880 text = base64.b64decode(result[1])
882 raise RuntimeError(text)
886 msg = testbed_messages[SHUTDOWN]
887 self._client.send_msg(msg)
888 reply = self._client.read_reply()
889 result = reply.split("|")
890 code = int(result[0])
891 text = base64.b64decode(result[1])
893 raise RuntimeError(text)
894 self._client.send_stop()
896 class ExperimentControllerProxy(object):
897 def __init__(self, root_dir, log_level, experiment_xml = None,
898 launch = True, host = None, port = None, user = None,
902 if experiment_xml == None:
903 raise RuntimeError("To launch a ExperimentControllerServer a \
904 xml description of the experiment is required")
908 xml = xml.replace("'", r"\'")
909 xml = xml.replace("\"", r"\'")
910 xml = xml.replace("\n", r"")
911 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
912 s = ExperimentControllerServer('%s', %d, '%s');\
913 s.run()" % (root_dir, log_level, xml)
914 self._client = launch_ssh_daemon_client(root_dir, python_code,
915 host, port, user, agent)
918 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
921 self._client = server.Client(root_dir)
924 def experiment_xml(self):
925 msg = controller_messages[XML]
926 self._client.send_msg(msg)
927 reply = self._client.read_reply()
928 result = reply.split("|")
929 code = int(result[0])
930 text = base64.b64decode(result[1])
932 raise RuntimeError(text)
935 def set_access_configuration(self, testbed_guid, access_config):
936 mode = access_config.get_attribute_value("mode")
937 communication = access_config.get_attribute_value("communication")
938 host = access_config.get_attribute_value("host")
939 user = access_config.get_attribute_value("user")
940 port = access_config.get_attribute_value("port")
941 root_dir = access_config.get_attribute_value("rootDirectory")
942 use_agent = access_config.get_attribute_value("useAgent")
943 log_level = access_config.get_attribute_value("logLevel")
944 msg = controller_messages[ACCESS]
945 msg = msg % (testbed_guid, mode, communication, host, user, port,
946 root_dir, use_agent, log_level)
947 self._client.send_msg(msg)
948 reply = self._client.read_reply()
949 result = reply.split("|")
950 code = int(result[0])
951 text = base64.b64decode(result[1])
953 raise RuntimeError(text)
955 def trace(self, testbed_guid, guid, trace_id):
956 msg = controller_messages[TRACE]
957 msg = msg % (testbed_guid, guid, trace_id)
958 self._client.send_msg(msg)
959 reply = self._client.read_reply()
960 result = reply.split("|")
961 code = int(result[0])
962 text = base64.b64decode(result[1])
965 raise RuntimeError(text)
968 msg = controller_messages[START]
969 self._client.send_msg(msg)
970 reply = self._client.read_reply()
971 result = reply.split("|")
972 code = int(result[0])
973 text = base64.b64decode(result[1])
975 raise RuntimeError(text)
978 msg = controller_messages[STOP]
979 self._client.send_msg(msg)
980 reply = self._client.read_reply()
981 result = reply.split("|")
982 code = int(result[0])
983 text = base64.b64decode(result[1])
985 raise RuntimeError(text)
987 def is_finished(self, guid):
988 msg = controller_messages[FINISHED]
990 self._client.send_msg(msg)
991 reply = self._client.read_reply()
992 result = reply.split("|")
993 code = int(result[0])
994 text = base64.b64decode(result[1])
996 raise RuntimeError(text)
997 return text == "True"
1000 msg = controller_messages[SHUTDOWN]
1001 self._client.send_msg(msg)
1002 reply = self._client.read_reply()
1003 result = reply.split("|")
1004 code = int(result[0])
1005 text = base64.b64decode(result[1])
1007 raise RuntimeError(text)
1008 self._client.send_stop()