2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
16 # PROTOCOL INSTRUCTION MESSAGES
50 # EXPERIMENT CONTROLLER PROTOCOL MESSAGES
# Request templates used by ExperimentControllerProxy, keyed by instruction
# code. The outer "%" fills in the numeric instruction code immediately; the
# placeholders that remain are filled in by the proxy method that sends the
# request. Fields are separated by "|".
51 controller_messages = dict({
# ACCESS fields: testbed guid, mode, communication, host, user, port,
# root directory, use-agent flag (rendered with %r) and log level.
53 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
# TRACE fields: testbed guid, element guid, trace id.
54 TRACE: "%d|%s" % (TRACE, "%d|%d|%s"),
# FINISHED field: guid.
55 FINISHED: "%d|%s" % (FINISHED, "%d"),
# SHUTDOWN carries no fields.
58 SHUTDOWN: "%d" % SHUTDOWN,
61 # TESTBED INSTANCE PROTOCOL MESSAGES
# Request templates used by TestbedIntanceProxy, keyed by instruction code.
# The outer "%" fills in the numeric instruction code immediately; the
# remaining placeholders are filled in by the proxy method that sends the
# request. Fields are separated by "|", so free-text fields (names, values)
# are base64-encoded by the sender.
#
# Field order, as passed by the proxy methods:
#   TRACE          guid, trace_id
#   CONFIGURE      name, value, type code
#   CREATE         guid, factory_id
#   CREATE_SET     guid, name, value, type code
#   FACTORY_SET    guid, name, value, type code
#   CONNECT        guid1, connector_type_name1, guid2, connector_type_name2
#   CROSS_CONNECT  guid, connector_type_name, cross_guid, cross_testbed_id,
#                  cross_factory_id, cross_connector_type_name
#   ADD_TRACE      guid, trace_id
#   ADD_ADDRESS    guid, family, address, netprefix, broadcast
#   ADD_ROUTE      guid, destination, netprefix, nexthop
#   GET            time, guid, name
#   SET            time, guid, name, value, type code
#   ACTION         time, guid, action
# START and STOP take a single %s field and STATUS a single %d field; the
# DO_* and SHUTDOWN requests carry no fields.
62 testbed_messages = dict({
63 TRACE: "%d|%s" % (TRACE, "%d|%s"),
64 START: "%d|%s" % (START, "%s"),
65 STOP: "%d|%s" % (STOP, "%s"),
66 SHUTDOWN: "%d" % SHUTDOWN,
67 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
68 CREATE: "%d|%s" % (CREATE, "%d|%s"),
69 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
70 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
71 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
# NOTE(review): the fourth placeholder is %d, but the proxy passes
# cross_testbed_id in that position -- confirm it is an integer, otherwise
# formatting this request raises TypeError.
72 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
73 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
74 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%d|%s|%d|%s"),
75 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
76 DO_SETUP: "%d" % DO_SETUP,
77 DO_CREATE: "%d" % DO_CREATE,
78 DO_CONNECT: "%d" % DO_CONNECT,
79 DO_CONFIGURE: "%d" % DO_CONFIGURE,
80 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
81 GET: "%d|%s" % (GET, "%s|%d|%s"),
82 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
83 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
84 STATUS: "%d|%s" % (STATUS, "%d"),
# Printable name for each protocol code. log_msg and log_reply index this
# table with the numeric code found in the first field of a message, so a
# code without an entry here raises KeyError while logging.
88 instruction_text = dict({
98 CONFIGURE: "CONFIGURE",
100 CREATE_SET: "CREATE_SET",
101 FACTORY_SET: "FACTORY_SET",
103 CROSS_CONNECT: "CROSS_CONNECT",
104 ADD_TRACE: "ADD_TRACE",
105 ADD_ADDRESS: "ADD_ADDRESS",
106 ADD_ROUTE: "ADD_ROUTE",
107 DO_SETUP: "DO_SETUP",
108 DO_CREATE: "DO_CREATE",
109 DO_CONNECT: "DO_CONNECT",
110 DO_CONFIGURE: "DO_CONFIGURE",
111 DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
124 if isinstance(value, bool):
126 elif isinstance(value, int):
128 elif isinstance(value, float):
133 def set_type(type, value):
139 value = value == "True"
def log_msg(server, params):
    """Write a debug log line describing an incoming request.

    params is the request already split on "|": params[0] is the numeric
    instruction code and the remaining items are the raw request fields.
    The line is emitted through server.log_debug and is tagged with the
    server's class name.
    """
    instruction_name = instruction_text[int(params[0])]
    fields = ", ".join([str(field) for field in params[1:]])
    owner = server.__class__.__name__
    server.log_debug("%s - msg: %s [%s]" % (owner, instruction_name, fields))
# Write a debug log line describing an outgoing reply.
# The reply is split on "|"; the second field is the base64 payload, which is
# decoded for the log, and the reply code is rendered through
# instruction_text. The line is emitted through server.log_debug and tagged
# with the server's class name.
150 def log_reply(server, reply):
151 res = reply.split("|")
# NOTE(review): the assignment of `code` is not visible in this extract;
# presumably it is the integer value of res[0] -- confirm.
153 code_txt = instruction_text[code]
154 txt = base64.b64decode(res[1])
155 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
# Start a server on a remote host over ssh and return a client bound to it.
# python_code is the Python source handed to server.popen_ssh_subprocess for
# execution with the given host/port/user/agent settings; the returned
# server.Client is created with the same root_dir and connection settings.
158 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
160 proc = server.popen_ssh_subprocess(python_code, host = host,
161 port = port, user = user, agent = agent)
162 #while not proc.poll():
# On failure the subprocess' stderr is read and reported in a RuntimeError.
# NOTE(review): the condition guarding this error path is not visible in this
# extract -- confirm what it tests before changing this function.
165 err = proc.stderr.read()
166 raise RuntimeError("Client could not be executed: %s" % \
169 return server.Client(root_dir, host = host, port = port, user = user,
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into a server log level.

    AccessConfiguration.DEBUG_LEVEL maps to server.DEBUG_LEVEL; every other
    value maps to server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the connection parameters from an access configuration.

    Returns the tuple (root_dir, log_level, user, host, port, agent), where
    log_level has already been converted with to_server_log_level. The last
    four items are only read when the "communication" attribute equals
    AccessConfiguration.ACCESS_SSH; otherwise they are all None.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    user = None
    host = None
    port = None
    agent = None
    if read("communication") == AccessConfiguration.ACCESS_SSH:
        user = read("user")
        host = read("host")
        port = read("port")
        agent = read("useAgent")
    return (root_dir, log_level, user, host, port, agent)
# Attribute map describing how a controller or testbed instance is executed
# and reached. create_controller / create_testbed_instance read "mode" from
# it, and get_access_config_params reads the remaining attributes.
# NOTE(review): several lines of this class (the constructor header, the
# ACCESS_SSH constant referenced below, and some default values) are not
# visible in this extract.
190 class AccessConfiguration(AttributesMap):
# Allowed values of the "mode" attribute.
191 MODE_SINGLE_PROCESS = "SINGLE"
192 MODE_DAEMON = "DAEMON"
# Allowed value of the "communication" attribute (the default).
194 ACCESS_LOCAL = "LOCAL"
# Allowed values of the "logLevel" attribute.
195 ERROR_LEVEL = "Error"
196 DEBUG_LEVEL = "Debug"
199 super(AccessConfiguration, self).__init__()
# "mode": in-process (default) or daemon execution.
200 self.add_attribute(name = "mode",
201 help = "Instance execution mode",
202 type = Attribute.ENUM,
203 value = AccessConfiguration.MODE_SINGLE_PROCESS,
204 allowed = [AccessConfiguration.MODE_DAEMON,
205 AccessConfiguration.MODE_SINGLE_PROCESS],
206 validation_function = validation.is_enum)
# "communication": local (default) or ssh.
207 self.add_attribute(name = "communication",
208 help = "Instance communication mode",
209 type = Attribute.ENUM,
210 value = AccessConfiguration.ACCESS_LOCAL,
211 allowed = [AccessConfiguration.ACCESS_LOCAL,
212 AccessConfiguration.ACCESS_SSH],
213 validation_function = validation.is_enum)
# "host", "user" and "port" are only read by get_access_config_params when
# "communication" is ACCESS_SSH.
214 self.add_attribute(name = "host",
215 help = "Host where the testbed will be executed",
216 type = Attribute.STRING,
218 validation_function = validation.is_string)
# "user" defaults to the account running this process.
219 self.add_attribute(name = "user",
220 help = "User on the Host to execute the testbed",
221 type = Attribute.STRING,
222 value = getpass.getuser(),
223 validation_function = validation.is_string)
224 self.add_attribute(name = "port",
225 help = "Port on the Host",
226 type = Attribute.INTEGER,
228 validation_function = validation.is_integer)
# "rootDirectory" is handed to the server and client objects as root_dir.
229 self.add_attribute(name = "rootDirectory",
230 help = "Root directory for storing process files",
231 type = Attribute.STRING,
233 validation_function = validation.is_string) # TODO: validation.is_path
# "useAgent" is passed on as the `agent` argument of the ssh launcher.
234 self.add_attribute(name = "useAgent",
235 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
236 type = Attribute.BOOL,
238 validation_function = validation.is_bool)
# "logLevel" is converted with to_server_log_level before use.
239 self.add_attribute(name = "logLevel",
240 help = "Log level for instance",
241 type = Attribute.ENUM,
242 value = AccessConfiguration.ERROR_LEVEL,
243 allowed = [AccessConfiguration.ERROR_LEVEL,
244 AccessConfiguration.DEBUG_LEVEL],
245 validation_function = validation.is_enum)
def create_controller(xml, access_config = None):
    """Create an experiment controller for the given experiment XML.

    With no access_config, no "mode", or MODE_SINGLE_PROCESS, an in-process
    ExperimentController is returned. With MODE_DAEMON an
    ExperimentControllerProxy is returned, built from the parameters that
    get_access_config_params extracts from access_config.

    Raises RuntimeError for any other mode.
    """
    mode = None
    if access_config:
        mode = access_config.get_attribute_value("mode")
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        # Imported here, as in the original, rather than at module import time.
        from nepi.core.execute import ExperimentController
        return ExperimentController(xml)
    if mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        # NOTE(review): the last line of this call is not visible in the
        # reviewed extract; `agent = agent` is reconstructed from the
        # unpacked tuple and from create_testbed_instance -- confirm.
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user,
                agent = agent)
    # The original message had no placeholder, so "%" raised TypeError
    # ("not all arguments converted") instead of this RuntimeError.
    raise RuntimeError("Unsupported access configuration mode '%s'" % mode)
def create_testbed_instance(testbed_id, testbed_version, access_config):
    """Create a testbed instance, either in-process or behind a proxy.

    With no access_config, no "mode", or MODE_SINGLE_PROCESS, the instance is
    built locally with _build_testbed_instance. With MODE_DAEMON a
    TestbedIntanceProxy is returned, built from the parameters that
    get_access_config_params extracts from access_config.

    Raises RuntimeError for any other mode.
    """
    mode = None
    if access_config:
        mode = access_config.get_attribute_value("mode")
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        return _build_testbed_instance(testbed_id, testbed_version)
    if mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port,
                user = user, agent = agent)
    # The original message had no placeholder, so "%" raised TypeError
    # ("not all arguments converted") instead of this RuntimeError.
    raise RuntimeError("Unsupported access configuration mode '%s'" % mode)
def _build_testbed_instance(testbed_id, testbed_version):
    """Instantiate TestbedInstance from the module nepi.testbeds.<testbed_id>.

    testbed_id is lower-cased to form the module name. The module is taken
    from sys.modules, and its TestbedInstance is called with testbed_version.
    """
    module_name = "nepi.testbeds.%s" % (testbed_id.lower())
    if module_name not in sys.modules:
        # NOTE(review): the body of this branch is not visible in the
        # reviewed extract; importing the module is reconstructed from the
        # sys.modules lookup that follows -- confirm.
        __import__(module_name)
    return sys.modules[module_name].TestbedInstance(testbed_version)
class TestbedInstanceServer(server.Server):
    """Serve testbed protocol requests against a locally built testbed.

    A request is "|"-separated text whose first field is the numeric
    instruction code; the remaining fields follow the matching
    testbed_messages template. Every reply has the form "<code>|<payload>",
    where <code> is OK or ERROR and <payload> is base64 text (empty when
    there is nothing to return).

    NOTE(review): a few lines of this class are not visible in the reviewed
    extract (the try/except framing and return of reply_action, and one
    field assignment in trace, start, stop, add_trace, add_address,
    add_route, get, set and action). They are reconstructed from the
    variables used afterwards and from the message templates -- confirm.
    """

    # Instruction code -> name of the handler method. Resolved with getattr
    # at dispatch time so a subclass overriding a handler is honoured.
    _INSTRUCTION_HANDLERS = {
        TRACE: "trace",
        START: "start",
        STOP: "stop",
        SHUTDOWN: "shutdown",
        CONFIGURE: "configure",
        CREATE: "create",
        CREATE_SET: "create_set",
        FACTORY_SET: "factory_set",
        CONNECT: "connect",
        CROSS_CONNECT: "cross_connect",
        ADD_TRACE: "add_trace",
        ADD_ADDRESS: "add_address",
        ADD_ROUTE: "add_route",
        DO_SETUP: "do_setup",
        DO_CREATE: "do_create",
        DO_CONNECT: "do_connect",
        DO_CONFIGURE: "do_configure",
        DO_CROSS_CONNECT: "do_cross_connect",
        GET: "get",
        SET: "set",
        ACTION: "action",
        STATUS: "status",
        GUIDS: "guids",
    }

    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedInstanceServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # Created later, in post_daemonize.
        self._testbed = None

    def post_daemonize(self):
        """Build the testbed instance described by the constructor arguments."""
        self._testbed = _build_testbed_instance(self._testbed_id,
                self._testbed_version)

    def reply_action(self, msg):
        """Execute one request and return the reply text.

        Unknown instruction codes and any exception raised by a handler are
        logged and reported to the client as an ERROR reply.
        """
        params = msg.split("|")
        instruction = int(params[0])
        log_msg(self, params)
        try:
            handler_name = self._INSTRUCTION_HANDLERS.get(instruction)
            if handler_name is not None:
                reply = getattr(self, handler_name)(params)
            else:
                error = "Invalid instruction %s" % instruction
                self.log_error(error)
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        except:
            # Deliberately broad: a failing request must produce an ERROR
            # reply instead of taking the server down.
            error = self.log_error()
            result = base64.b64encode(error)
            reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply

    def guids(self, params):
        """GUIDS: reply with the testbed's guids as a comma-separated list."""
        guids = ",".join(map(str, self._testbed.guids))
        result = base64.b64encode(guids)
        return "%d|%s" % (OK, result)

    def create(self, params):
        """CREATE: fields are guid, factory_id."""
        guid = int(params[1])
        factory_id = params[2]
        self._testbed.create(guid, factory_id)
        return "%d|%s" % (OK, "")

    def trace(self, params):
        """TRACE: fields are guid, trace_id; replies with the trace content."""
        guid = int(params[1])
        trace_id = params[2]
        trace = self._testbed.trace(guid, trace_id)
        result = base64.b64encode(trace)
        return "%d|%s" % (OK, result)

    def start(self, params):
        """START: single field, the start time (passed on as received)."""
        time = params[1]
        self._testbed.start(time)
        return "%d|%s" % (OK, "")

    def stop(self, params):
        """STOP: single field, the stop time (passed on as received)."""
        time = params[1]
        self._testbed.stop(time)
        return "%d|%s" % (OK, "")

    def shutdown(self, params):
        """SHUTDOWN: no fields."""
        self._testbed.shutdown()
        return "%d|%s" % (OK, "")

    def configure(self, params):
        """CONFIGURE: fields are base64 name, base64 value, type code."""
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        type_code = int(params[3])
        value = set_type(type_code, value)
        self._testbed.configure(name, value)
        return "%d|%s" % (OK, "")

    def create_set(self, params):
        """CREATE_SET: fields are guid, base64 name, base64 value, type code."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        type_code = int(params[4])
        value = set_type(type_code, value)
        self._testbed.create_set(guid, name, value)
        return "%d|%s" % (OK, "")

    def factory_set(self, params):
        """FACTORY_SET: fields are guid, base64 name, base64 value, type code.

        The request carries the same four fields as CREATE_SET. The previous
        code skipped the guid, so it decoded the guid as the name, the name
        as the value and the value as the type code.
        """
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        type_code = int(params[4])
        value = set_type(type_code, value)
        self._testbed.factory_set(guid, name, value)
        return "%d|%s" % (OK, "")

    def connect(self, params):
        """CONNECT: fields are guid1, connector name 1, guid2, connector name 2."""
        guid1 = int(params[1])
        connector_type_name1 = params[2]
        guid2 = int(params[3])
        connector_type_name2 = params[4]
        self._testbed.connect(guid1, connector_type_name1, guid2,
                connector_type_name2)
        return "%d|%s" % (OK, "")

    def cross_connect(self, params):
        """CROSS_CONNECT: six fields, in the order of the request template.

        Fields: guid, connector_type_name, cross_guid, cross_testbed_id,
        cross_factory_id, cross_connector_type_name. The previous code read
        indices up to 8 (IndexError on a six-field request) and overwrote
        connector_type_name and cross_guid with later fields.
        """
        guid = int(params[1])
        connector_type_name = params[2]
        cross_guid = int(params[3])
        # Kept as received (text), as the previous code did for this value.
        cross_testbed_id = params[4]
        cross_factory_id = params[5]
        cross_connector_type_name = params[6]
        self._testbed.cross_connect(guid, connector_type_name, cross_guid,
                cross_testbed_id, cross_factory_id, cross_connector_type_name)
        return "%d|%s" % (OK, "")

    def add_trace(self, params):
        """ADD_TRACE: fields are guid, trace_id."""
        guid = int(params[1])
        trace_id = params[2]
        self._testbed.add_trace(guid, trace_id)
        return "%d|%s" % (OK, "")

    def add_address(self, params):
        """ADD_ADDRESS: fields are guid, family, address, netprefix, broadcast."""
        guid = int(params[1])
        family = int(params[2])
        address = params[3]
        netprefix = int(params[4])
        broadcast = params[5]
        self._testbed.add_address(guid, family, address, netprefix,
                broadcast)
        return "%d|%s" % (OK, "")

    def add_route(self, params):
        """ADD_ROUTE: fields are guid, destination, netprefix, nexthop."""
        guid = int(params[1])
        destination = params[2]
        netprefix = int(params[3])
        nexthop = params[4]
        self._testbed.add_route(guid, destination, netprefix, nexthop)
        return "%d|%s" % (OK, "")

    def do_setup(self, params):
        """DO_SETUP: no fields."""
        self._testbed.do_setup()
        return "%d|%s" % (OK, "")

    def do_create(self, params):
        """DO_CREATE: no fields."""
        self._testbed.do_create()
        return "%d|%s" % (OK, "")

    def do_connect(self, params):
        """DO_CONNECT: no fields."""
        self._testbed.do_connect()
        return "%d|%s" % (OK, "")

    def do_configure(self, params):
        """DO_CONFIGURE: no fields."""
        self._testbed.do_configure()
        return "%d|%s" % (OK, "")

    def do_cross_connect(self, params):
        """DO_CROSS_CONNECT: no fields."""
        self._testbed.do_cross_connect()
        return "%d|%s" % (OK, "")

    def get(self, params):
        """GET: fields are time, guid, base64 name; replies with str(value)."""
        time = params[1]
        # The previous code read `param[2]`, an undefined name (NameError).
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = self._testbed.get(time, guid, name)
        result = base64.b64encode(str(value))
        return "%d|%s" % (OK, result)

    def set(self, params):
        """SET: fields are time, guid, base64 name, base64 value, type code."""
        time = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = base64.b64decode(params[4])
        # The type code is the fifth field; the previous code read index 3,
        # which is the base64 name.
        type_code = int(params[5])
        value = set_type(type_code, value)
        self._testbed.set(time, guid, name, value)
        return "%d|%s" % (OK, "")

    def action(self, params):
        """ACTION: fields are time, guid, base64 command."""
        time = params[1]
        guid = int(params[2])
        command = base64.b64decode(params[3])
        self._testbed.action(time, guid, command)
        return "%d|%s" % (OK, "")

    def status(self, params):
        """STATUS: single field, guid; replies with str(status)."""
        guid = int(params[1])
        status = self._testbed.status(guid)
        result = base64.b64encode(str(status))
        return "%d|%s" % (OK, result)
class ExperimentControllerServer(server.Server):
    """Serve controller protocol requests against a local ExperimentController.

    A request is "|"-separated text whose first field is the numeric
    instruction code; the remaining fields follow the matching
    controller_messages template. Every reply has the form
    "<code>|<payload>", where <code> is OK or ERROR and <payload> is base64
    text (empty when there is nothing to return).

    NOTE(review): a few lines of this class are not visible in the reviewed
    extract (the try/except framing and return of reply_action, four field
    assignments in set_access_configuration, and the trace id assignment in
    trace). They are reconstructed from the variables used afterwards and
    from the ACCESS / TRACE templates -- confirm.
    """

    # Instruction code -> name of the handler method.
    _INSTRUCTION_HANDLERS = {
        XML: "experiment_xml",
        ACCESS: "set_access_configuration",
        TRACE: "trace",
        FINISHED: "is_finished",
        START: "start",
        STOP: "stop",
        SHUTDOWN: "shutdown",
    }

    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # Created later, in post_daemonize.
        self._controller = None

    def post_daemonize(self):
        """Build the ExperimentController from the stored experiment XML."""
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml)

    def reply_action(self, msg):
        """Execute one request and return the reply text.

        Unknown instruction codes and any exception raised by a handler are
        logged and reported to the client as an ERROR reply.
        """
        params = msg.split("|")
        instruction = int(params[0])
        log_msg(self, params)
        try:
            handler_name = self._INSTRUCTION_HANDLERS.get(instruction)
            if handler_name is None:
                error = "Invalid instruction %s" % instruction
                self.log_error(error)
                reply = "%d|%s" % (ERROR, base64.b64encode(error))
            else:
                reply = getattr(self, handler_name)(params)
        except:
            # Deliberately broad: a failing request must produce an ERROR
            # reply instead of taking the server down.
            error = self.log_error()
            reply = "%d|%s" % (ERROR, base64.b64encode(error))
        log_reply(self, reply)
        return reply

    def experiment_xml(self, params):
        """XML: reply with the controller's experiment XML."""
        payload = base64.b64encode(self._controller.experiment_xml)
        return "%d|%s" % (OK, payload)

    def set_access_configuration(self, params):
        """ACCESS: rebuild an AccessConfiguration and attach it to a testbed.

        Fields: testbed guid, mode, communication, host, user, port,
        root directory, use-agent flag ("True" means True) and log level.
        """
        testbed_guid = int(params[1])
        settings = (
            ("mode", params[2]),
            ("communication", params[3]),
            ("host", params[4]),
            ("user", params[5]),
            ("port", int(params[6])),
            ("rootDirectory", params[7]),
            ("useAgent", params[8] == "True"),
            ("logLevel", params[9]),
        )
        access_config = AccessConfiguration()
        for attribute_name, attribute_value in settings:
            access_config.set_attribute_value(attribute_name, attribute_value)
        self._controller.set_access_configuration(testbed_guid,
                access_config)
        return "%d|%s" % (OK, "")

    def trace(self, params):
        """TRACE: fields are testbed guid, guid, trace id; replies with the trace."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        trace_id = params[3]
        content = self._controller.trace(testbed_guid, guid, trace_id)
        return "%d|%s" % (OK, base64.b64encode(content))

    def is_finished(self, params):
        """FINISHED: single field, guid; replies with str(is_finished)."""
        finished = self._controller.is_finished(int(params[1]))
        return "%d|%s" % (OK, base64.b64encode(str(finished)))

    def start(self, params):
        """START: no fields."""
        self._controller.start()
        return "%d|%s" % (OK, "")

    def stop(self, params):
        """STOP: no fields."""
        self._controller.stop()
        return "%d|%s" % (OK, "")

    def shutdown(self, params):
        """SHUTDOWN: no fields."""
        self._controller.shutdown()
        return "%d|%s" % (OK, "")
class TestbedIntanceProxy(object):
    """Client-side stand-in for a testbed run by a TestbedInstanceServer.

    Each public method formats the matching testbed_messages template,
    sends it through the client and raises RuntimeError, carrying the
    server's decoded text, when the reply code is ERROR.

    NOTE(review): several lines of this class are not visible in the
    reviewed extract: the `launch` / `host` conditions in __init__, the
    headers of guids, do_setup, do_create and shutdown, the error check
    before each raise, the message formatting in start, stop and status,
    and the return statements of get, status and trace. They are
    reconstructed from the surrounding code, the message templates and the
    server-side handlers -- confirm, in particular the @property on guids
    and the int() conversion in status.
    """

    def __init__(self, root_dir, log_level, testbed_id = None,
            testbed_version = None, launch = True, host = None,
            port = None, user = None, agent = None):
        """Launch the server (remotely over ssh when host is given) and connect.

        Raises RuntimeError when a launch is requested without testbed_id
        or testbed_version.
        """
        if launch:
            if testbed_id is None or testbed_version is None:
                # Adjacent literals instead of a backslash continuation, which
                # embedded the source indentation in the message.
                raise RuntimeError("To launch a TestbedInstance server a "
                        "testbed_id and testbed_version are required")
            if host is not None:
                # Bootstrap code executed on the remote host. The previous
                # text imported the misspelled name "TesbedInstanceServer",
                # which cannot be imported.
                python_code = ("from nepi.util.proxy import "
                        "TestbedInstanceServer;"
                        "s = TestbedInstanceServer('%s', %d, '%s', '%s');"
                        "s.run()") % (root_dir, log_level, testbed_id,
                                testbed_version)
                self._client = launch_ssh_daemon_client(root_dir, python_code,
                        host, port, user, agent)
            else:
                # Launch the daemon locally, then connect to it.
                s = TestbedInstanceServer(root_dir, log_level, testbed_id,
                        testbed_version)
                s.run()
                self._client = server.Client(root_dir)

    def _request(self, msg):
        """Send one request and return the decoded reply payload.

        Raises RuntimeError with the payload text when the reply is ERROR.
        """
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def guids(self):
        """List of guids known to the remote testbed, as integers."""
        text = self._request(testbed_messages[GUIDS])
        # An empty reply means "no guids"; int("") would raise ValueError.
        return [int(guid) for guid in text.split(",") if guid]

    def configure(self, name, value):
        msg = testbed_messages[CONFIGURE]
        type_code = get_type(value)
        # base64 keeps "|" out of the free-text fields
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (name, value, type_code))

    def create(self, guid, factory_id):
        msg = testbed_messages[CREATE]
        self._request(msg % (guid, factory_id))

    def create_set(self, guid, name, value):
        msg = testbed_messages[CREATE_SET]
        type_code = get_type(value)
        # base64 keeps "|" out of the free-text fields
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (guid, name, value, type_code))

    def factory_set(self, guid, name, value):
        msg = testbed_messages[FACTORY_SET]
        type_code = get_type(value)
        # base64 keeps "|" out of the free-text fields
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (guid, name, value, type_code))

    def connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        msg = testbed_messages[CONNECT]
        self._request(msg % (guid1, connector_type_name1, guid2,
                connector_type_name2))

    def cross_connect(self, guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name):
        msg = testbed_messages[CROSS_CONNECT]
        # NOTE(review): the template formats cross_testbed_id with %d --
        # confirm that callers pass an integer here.
        self._request(msg % (guid, connector_type_name, cross_guid,
                cross_testbed_id, cross_factory_id, cross_connector_type_name))

    def add_trace(self, guid, trace_id):
        msg = testbed_messages[ADD_TRACE]
        self._request(msg % (guid, trace_id))

    def add_address(self, guid, family, address, netprefix, broadcast):
        msg = testbed_messages[ADD_ADDRESS]
        self._request(msg % (guid, family, address, netprefix, broadcast))

    def add_route(self, guid, destination, netprefix, nexthop):
        msg = testbed_messages[ADD_ROUTE]
        self._request(msg % (guid, destination, netprefix, nexthop))

    def do_setup(self):
        self._request(testbed_messages[DO_SETUP])

    def do_create(self):
        self._request(testbed_messages[DO_CREATE])

    def do_connect(self):
        self._request(testbed_messages[DO_CONNECT])

    def do_configure(self):
        self._request(testbed_messages[DO_CONFIGURE])

    def do_cross_connect(self):
        self._request(testbed_messages[DO_CROSS_CONNECT])

    def start(self, time = TIME_NOW):
        msg = testbed_messages[START]
        self._request(msg % time)

    def stop(self, time = TIME_NOW):
        msg = testbed_messages[STOP]
        self._request(msg % time)

    def set(self, time, guid, name, value):
        msg = testbed_messages[SET]
        type_code = get_type(value)
        # base64 keeps "|" out of the free-text fields
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (time, guid, name, value, type_code))

    def get(self, time, guid, name):
        """Return the attribute value as the text sent by the server."""
        msg = testbed_messages[GET]
        # base64 keeps "|" out of the free-text fields
        name = base64.b64encode(name)
        return self._request(msg % (time, guid, name))

    def action(self, time, guid, action):
        msg = testbed_messages[ACTION]
        # The server base64-decodes this field, so it must be encoded here;
        # previously it was sent as plain text.
        action = base64.b64encode(action)
        self._request(msg % (time, guid, action))

    def status(self, guid):
        """Return the status code of the element identified by guid."""
        msg = testbed_messages[STATUS]
        return int(self._request(msg % guid))

    def trace(self, guid, trace_id):
        """Return the content of the given trace."""
        msg = testbed_messages[TRACE]
        return self._request(msg % (guid, trace_id))

    def shutdown(self):
        """Shut the remote testbed down, then stop the server."""
        self._request(testbed_messages[SHUTDOWN])
        self._client.send_stop()
class ExperimentControllerProxy(object):
    """Client-side stand-in for a controller run by an ExperimentControllerServer.

    Each public method formats the matching controller_messages template,
    sends it through the client and raises RuntimeError, carrying the
    server's decoded text, when the reply code is ERROR.

    NOTE(review): several lines of this class are not visible in the
    reviewed extract: the `agent` parameter and the `launch` / `host`
    conditions in __init__, the headers of start, stop and shutdown, the
    decorator on experiment_xml, the error check before each raise, and the
    message formatting in is_finished. They are reconstructed from the
    surrounding code, the message templates and the server-side handlers --
    confirm, in particular the @property on experiment_xml.
    """

    def __init__(self, root_dir, log_level, experiment_xml = None,
            launch = True, host = None, port = None, user = None,
            agent = None):
        """Launch the server (remotely over ssh when host is given) and connect.

        Raises RuntimeError when a launch is requested without experiment_xml.
        """
        if launch:
            if experiment_xml is None:
                # Adjacent literals instead of a backslash continuation, which
                # embedded the source indentation in the message.
                raise RuntimeError("To launch a ExperimentControllerServer a "
                        "xml description of the experiment is required")
            if host is not None:
                # The XML is embedded in a single-quoted literal of the
                # bootstrap code: quotes are rewritten and newlines dropped.
                # Presumably this keeps the code on one shell-safe line --
                # verify against server.popen_ssh_subprocess before changing.
                xml = experiment_xml
                xml = xml.replace("'", r"\'")
                xml = xml.replace("\"", r"\'")
                xml = xml.replace("\n", r"")
                python_code = ("from nepi.util.proxy import "
                        "ExperimentControllerServer;"
                        "s = ExperimentControllerServer('%s', %d, '%s');"
                        "s.run()") % (root_dir, log_level, xml)
                self._client = launch_ssh_daemon_client(root_dir, python_code,
                        host, port, user, agent)
            else:
                # Launch the daemon locally, then connect to it.
                s = ExperimentControllerServer(root_dir, log_level,
                        experiment_xml)
                s.run()
                self._client = server.Client(root_dir)

    def _request(self, msg):
        """Send one request and return the decoded reply payload.

        Raises RuntimeError with the payload text when the reply is ERROR.
        """
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def experiment_xml(self):
        """Experiment XML as reported by the remote controller."""
        return self._request(controller_messages[XML])

    def set_access_configuration(self, testbed_guid, access_config):
        """Send every attribute of access_config for the given testbed guid."""
        read = access_config.get_attribute_value
        mode = read("mode")
        communication = read("communication")
        host = read("host")
        user = read("user")
        port = read("port")
        root_dir = read("rootDirectory")
        use_agent = read("useAgent")
        log_level = read("logLevel")
        msg = controller_messages[ACCESS]
        self._request(msg % (testbed_guid, mode, communication, host, user,
                port, root_dir, use_agent, log_level))

    def trace(self, testbed_guid, guid, trace_id):
        """Return the content of the given trace."""
        msg = controller_messages[TRACE]
        return self._request(msg % (testbed_guid, guid, trace_id))

    def start(self):
        self._request(controller_messages[START])

    def stop(self):
        self._request(controller_messages[STOP])

    def is_finished(self, guid):
        """Return True when the server reports "True" for guid."""
        msg = controller_messages[FINISHED]
        return self._request(msg % guid) == "True"

    def shutdown(self):
        """Shut the remote controller down, then stop the server."""
        self._request(controller_messages[SHUTDOWN])
        self._client.send_stop()