2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
14 # PROTOCOL INSTRUCTION MESSAGES
48 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
49 controller_messages = dict({
51 ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r"),
52 TRACE: "%d|%s" % (TRACE, "%d|%d|%s"),
53 FINISHED: "%d|%s" % (FINISHED, "%d"),
56 SHUTDOWN: "%d" % SHUTDOWN,
59 # TESTBED INSTANCE PROTOCOL MESSAGES
60 testbed_messages = dict({
61 TRACE: "%d|%s" % (TRACE, "%d|%s"),
62 START: "%d|%s" % (START, "%s"),
63 STOP: "%d|%s" % (STOP, "%s"),
64 SHUTDOWN: "%d" % SHUTDOWN,
65 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
66 CREATE: "%d|%s" % (CREATE, "%d|%s"),
67 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
68 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
69 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
70 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
71 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
72 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%d|%s|%d|%s"),
73 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
74 DO_SETUP: "%d" % DO_SETUP,
75 DO_CREATE: "%d" % DO_CREATE,
76 DO_CONNECT: "%d" % DO_CONNECT,
77 DO_CONFIGURE: "%d" % DO_CONFIGURE,
78 DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
79 GET: "%d|%s" % (GET, "%s|%d|%s"),
80 SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
81 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
82 STATUS: "%d|%s" % (STATUS, "%d"),
87 if isinstance(value, bool):
89 elif isinstance(value, int):
91 elif isinstance(value, float):
96 def set_type(type, value):
107 class AccessConfiguration(AttributesMap):
108 MODE_SINGLE_PROCESS = "SINGLE"
109 MODE_DAEMON = "DAEMON"
111 ACCESS_LOCAL = "LOCAL"
114 super(AccessConfiguration, self).__init__()
115 self.add_attribute(name = "mode",
116 help = "Instance execution mode",
117 type = Attribute.ENUM,
118 value = AccessConfiguration.MODE_SINGLE_PROCESS,
119 allowed = [AccessConfiguration.MODE_DAEMON,
120 AccessConfiguration.MODE_SINGLE_PROCESS],
121 validation_function = validation.is_enum)
122 self.add_attribute(name = "communication",
123 help = "Instance communication mode",
124 type = Attribute.ENUM,
125 value = AccessConfiguration.ACCESS_LOCAL,
126 allowed = [AccessConfiguration.ACCESS_LOCAL,
127 AccessConfiguration.ACCESS_SSH],
128 validation_function = validation.is_enum)
129 self.add_attribute(name = "host",
130 help = "Host where the testbed will be executed",
131 type = Attribute.STRING,
133 validation_function = validation.is_string)
134 self.add_attribute(name = "user",
135 help = "User on the Host to execute the testbed",
136 type = Attribute.STRING,
137 validation_function = validation.is_string)
138 self.add_attribute(name = "port",
139 help = "Port on the Host",
140 type = Attribute.INTEGER,
142 validation_function = validation.is_integer)
143 self.add_attribute(name = "rootDirectory",
144 help = "Root directory for storing process files",
145 type = Attribute.STRING,
147 validation_function = validation.is_string) # TODO: validation.is_path
148 self.add_attribute(name = "useAgent",
149 help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
150 type = Attribute.BOOL,
152 validation_function = validation.is_bool)
154 def create_controller(xml, access_config = None):
155 mode = None if not access_config else access_config.get_attribute_value("mode")
156 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
157 from nepi.core.execute import ExperimentController
158 return ExperimentController(xml)
159 elif mode ==AccessConfiguration.MODE_DAEMON:
160 root_dir = access_config.get_attribute_value("rootDirectory")
161 return ExperimentControllerProxy(xml, root_dir)
162 raise RuntimeError("Unsupported access configuration 'mode'" % mode)
164 def create_testbed_instance(testbed_id, testbed_version, access_config):
165 mode = None if not access_config else access_config.get_attribute_value("mode")
166 if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
167 return _build_testbed_testbed(testbed_id, testbed_version)
168 elif mode == AccessConfiguration.MODE_DAEMON:
169 root_dir = access_config.get_attribute_value("rootDirectory")
170 return TestbedIntanceProxy(testbed_id, testbed_version, root_dir)
171 raise RuntimeError("Unsupported access configuration 'mode'" % mode)
173 def _build_testbed_testbed(testbed_id, testbed_version):
174 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
175 if not mod_name in sys.modules:
177 module = sys.modules[mod_name]
178 return module.TestbedInstance(testbed_version)
180 class TestbedInstanceServer(server.Server):
181 def __init__(self, testbed_id, testbed_version, root_dir):
182 super(TestbedInstanceServer, self).__init__(root_dir)
183 self._testbed_id = testbed_id
184 self._testbed_version = testbed_version
187 def post_daemonize(self):
188 self._testbed = _build_testbed_testbed(self._testbed_id,
189 self._testbed_version)
191 def reply_action(self, msg):
192 params = msg.split("|")
193 instruction = int(params[0])
195 if instruction == TRACE:
196 return self.trace(params)
197 elif instruction == START:
198 return self.start(params)
199 elif instruction == STOP:
200 return self.stop(params)
201 elif instruction == SHUTDOWN:
202 return self.shutdown(params)
203 elif instruction == CONFIGURE:
204 return self.configure(params)
205 elif instruction == CREATE:
206 return self.create(params)
207 elif instruction == CREATE_SET:
208 return self.create_set(params)
209 elif instruction == FACTORY_SET:
210 return self.factory_set(params)
211 elif instruction == CONNECT:
212 return self.connect(params)
213 elif instruction == CROSS_CONNECT:
214 return self.cross_connect(params)
215 elif instruction == ADD_TRACE:
216 return self.add_trace(params)
217 elif instruction == ADD_ADDRESS:
218 return self.add_address(params)
219 elif instruction == ADD_ROUTE:
220 return self.add_route(params)
221 elif instruction == DO_SETUP:
222 return self.do_setup(params)
223 elif instruction == DO_CREATE:
224 return self.do_create(params)
225 elif instruction == DO_CONNECT:
226 return self.do_connect(params)
227 elif instruction == DO_CONFIGURE:
228 return self.do_configure(params)
229 elif instruction == DO_CROSS_CONNECT:
230 return self.do_cross_connect(params)
231 elif instruction == GET:
232 return self.get(params)
233 elif instruction == SET:
234 return self.set(params)
235 elif instruction == ACTION:
236 return self.action(params)
237 elif instruction == STATUS:
238 return self.status(params)
239 elif instruction == GUIDS:
240 return self.guids(params)
242 error = "Invalid instruction %s" % instruction
243 self.log_error(error)
244 result = base64.b64encode(error)
245 return "%d|%s" % (ERROR, result)
247 error = self.log_error()
248 result = base64.b64encode(error)
249 return "%d|%s" % (ERROR, result)
251 def guids(self, params):
252 guids = self._testbed.guids
253 guids = ",".join(map(str, guids))
254 result = base64.b64encode(guids)
255 return "%d|%s" % (OK, result)
257 def create(self, params):
258 guid = int(params[1])
259 factory_id = params[2]
260 self._testbed.create(guid, factory_id)
261 return "%d|%s" % (OK, "")
263 def trace(self, params):
264 guid = int(params[1])
266 trace = self._testbed.trace(guid, trace_id)
267 result = base64.b64encode(trace)
268 return "%d|%s" % (OK, result)
270 def start(self, params):
272 self._testbed.start(time)
273 return "%d|%s" % (OK, "")
275 def stop(self, params):
277 self._testbed.stop(time)
278 return "%d|%s" % (OK, "")
280 def shutdown(self, params):
281 self._testbed.shutdown()
282 return "%d|%s" % (OK, "")
284 def configure(self, params):
285 name = base64.b64decode(params[1])
286 value = base64.b64decode(params[2])
287 type = int(params[3])
288 value = set_type(type, value)
289 self._testbed.configure(name, value)
290 return "%d|%s" % (OK, "")
292 def create_set(self, params):
293 guid = int(params[1])
294 name = base64.b64decode(params[2])
295 value = base64.b64decode(params[3])
296 type = int(params[4])
297 value = set_type(type, value)
298 self._testbed.create_set(guid, name, value)
299 return "%d|%s" % (OK, "")
301 def factory_set(self, params):
302 name = base64.b64decode(params[1])
303 value = base64.b64decode(params[2])
304 type = int(params[3])
305 value = set_type(type, value)
306 self._testbed.factory_set(name, value)
307 return "%d|%s" % (OK, "")
309 def connect(self, params):
310 guid1 = int(params[1])
311 connector_type_name1 = params[2]
312 guid2 = int(params[3])
313 connector_type_name2 = params[4]
314 self._testbed.connect(guid1, connector_type_name1, guid2,
315 connector_type_name2)
316 return "%d|%s" % (OK, "")
318 def cross_connect(self, params):
319 guid = int(params[1])
320 connector_type_name = params[2]
321 cross_guid = int(params[3])
322 connector_type_name = params[4]
323 cross_guid = int(params[5])
324 cross_testbed_id = params[6]
325 cross_factory_id = params[7]
326 cross_connector_type_name = params[8]
327 self._testbed.cross_connect(guid, connector_type_name, cross_guid,
328 cross_testbed_id, cross_factory_id, cross_connector_type_name)
329 return "%d|%s" % (OK, "")
331 def add_trace(self, params):
332 guid = int(params[1])
334 self._testbed.add_trace(guid, trace_id)
335 return "%d|%s" % (OK, "")
337 def add_address(self, params):
338 guid = int(params[1])
339 family = int(params[2])
341 netprefix = int(params[4])
342 broadcast = params[5]
343 self._testbed.add_address(guid, family, address, netprefix,
345 return "%d|%s" % (OK, "")
347 def add_route(self, params):
348 guid = int(params[1])
349 destination = params[2]
350 netprefix = int(params[3])
352 self._testbed.add_route(guid, destination, netprefix, nexthop)
353 return "%d|%s" % (OK, "")
355 def do_setup(self, params):
356 self._testbed.do_setup()
357 return "%d|%s" % (OK, "")
359 def do_create(self, params):
360 self._testbed.do_create()
361 return "%d|%s" % (OK, "")
363 def do_connect(self, params):
364 self._testbed.do_connect()
365 return "%d|%s" % (OK, "")
367 def do_configure(self, params):
368 self._testbed.do_configure()
369 return "%d|%s" % (OK, "")
371 def do_cross_connect(self, params):
372 self._testbed.do_cross_connect()
373 return "%d|%s" % (OK, "")
375 def get(self, params):
377 guid = int(param[2] )
378 name = base64.b64decode(params[3])
379 value = self._testbed.get(time, guid, name)
380 result = base64.b64encode(str(value))
381 return "%d|%s" % (OK, result)
383 def set(self, params):
385 guid = int(params[2])
386 name = base64.b64decode(params[3])
387 value = base64.b64decode(params[4])
388 type = int(params[3])
389 value = set_type(type, value)
390 self._testbed.set(time, guid, name, value)
391 return "%d|%s" % (OK, "")
393 def action(self, params):
395 guid = int(params[2])
396 command = base64.b64decode(params[3])
397 self._testbed.action(time, guid, command)
398 return "%d|%s" % (OK, "")
400 def status(self, params):
401 guid = int(params[1])
402 status = self._testbed.status(guid)
403 result = base64.b64encode(str(status))
404 return "%d|%s" % (OK, result)
406 class ExperimentControllerServer(server.Server):
407 def __init__(self, experiment_xml, root_dir):
408 super(ExperimentControllerServer, self).__init__(root_dir)
409 self._experiment_xml = experiment_xml
410 self._controller = None
412 def post_daemonize(self):
413 from nepi.core.execute import ExperimentController
414 self._controller = ExperimentController(self._experiment_xml)
416 def reply_action(self, msg):
417 params = msg.split("|")
418 instruction = int(params[0])
420 if instruction == XML:
421 return self.experiment_xml(params)
422 elif instruction == ACCESS:
423 return self.set_access_configuration(params)
424 elif instruction == TRACE:
425 return self.trace(params)
426 elif instruction == FINISHED:
427 return self.is_finished(params)
428 elif instruction == START:
429 return self.start(params)
430 elif instruction == STOP:
431 return self.stop(params)
432 elif instruction == SHUTDOWN:
433 return self.shutdown(params)
435 error = "Invalid instruction %s" % instruction
436 self.log_error(error)
437 result = base64.b64encode(error)
438 return "%d|%s" % (ERROR, result)
440 error = self.log_error()
441 result = base64.b64encode(error)
442 return "%d|%s" % (ERROR, result)
444 def experiment_xml(self, params):
445 xml = self._controller.experiment_xml
446 result = base64.b64encode(xml)
447 return "%d|%s" % (OK, result)
449 def set_access_configuration(self, params):
450 testbed_guid = int(params[1])
452 communication = params[3]
455 port = int(params[6])
457 use_agent = bool(params[8])
458 access_config = AccessConfiguration()
459 access_config.set_attribute_value("mode", mode)
460 access_config.set_attribute_value("communication", communication)
461 access_config.set_attribute_value("host", host)
462 access_config.set_attribute_value("user", user)
463 access_config.set_attribute_value("port", port)
464 access_config.set_attribute_value("rootDirectory", root_dir)
465 access_config.set_attribute_value("useAgent", use_agent)
466 self._controller.set_access_configuration(testbed_guid,
468 return "%d|%s" % (OK, "")
470 def trace(self, params):
471 testbed_guid = int(params[1])
472 guid = int(params[2])
474 trace = self._controller.trace(testbed_guid, guid, trace_id)
475 result = base64.b64encode(trace)
476 return "%d|%s" % (OK, "%s" % result)
478 def is_finished(self, params):
479 guid = int(params[1])
480 result = self._controller.is_finished(guid)
481 return "%d|%s" % (OK, "%r" % result)
483 def start(self, params):
484 self._controller.start()
485 return "%d|%s" % (OK, "")
487 def stop(self, params):
488 self._controller.stop()
489 return "%d|%s" % (OK, "")
491 def shutdown(self, params):
492 self._controller.shutdown()
493 return "%d|%s" % (OK, "")
495 class TestbedIntanceProxy(object):
496 def __init__(self, testbed_id, testbed_version, root_dir):
498 s = TestbedInstanceServer(testbed_id, testbed_version,
502 self._client = server.Client(root_dir)
506 msg = testbed_messages[GUIDS]
507 self._client.send_msg(msg)
508 reply = self._client.read_reply()
509 result = reply.split("|")
510 code = int(result[0])
511 text = base64.b64decode(result[1])
513 raise RuntimeError(text)
514 return map(int, text.split(","))
516 def configure(self, name, value):
517 msg = testbed_messages[CONFIGURE]
518 type = get_type(value)
519 # avoid having "|" in this parameters
520 name = base64.b64encode(name)
521 value = base64.b64encode(str(value))
522 msg = msg % (name, value, type)
523 self._client.send_msg(msg)
524 reply = self._client.read_reply()
525 result = reply.split("|")
526 code = int(result[0])
527 text = base64.b64decode(result[1])
529 raise RuntimeError(text)
531 def create(self, guid, factory_id):
532 msg = testbed_messages[CREATE]
533 msg = msg % (guid, factory_id)
534 self._client.send_msg(msg)
535 reply = self._client.read_reply()
536 result = reply.split("|")
537 code = int(result[0])
538 text = base64.b64decode(result[1])
540 raise RuntimeError(text)
542 def create_set(self, guid, name, value):
543 msg = testbed_messages[CREATE_SET]
544 type = get_type(value)
545 # avoid having "|" in this parameters
546 name = base64.b64encode(name)
547 value = base64.b64encode(str(value))
548 msg = msg % (guid, name, value, type)
549 self._client.send_msg(msg)
550 reply = self._client.read_reply()
551 result = reply.split("|")
552 code = int(result[0])
553 text = base64.b64decode(result[1])
555 raise RuntimeError(text)
557 def factory_set(self, guid, name, value):
558 msg = testbed_messages[FACTORY_SET]
559 type = get_type(value)
560 # avoid having "|" in this parameters
561 name = base64.b64encode(name)
562 value = base64.b64encode(str(value))
563 msg = msg % (guid, name, value, type)
564 self._client.send_msg(msg)
565 reply = self._client.read_reply()
566 result = reply.split("|")
567 code = int(result[0])
568 text = base64.b64decode(result[1])
570 raise RuntimeError(text)
572 def connect(self, guid1, connector_type_name1, guid2,
573 connector_type_name2):
574 msg = testbed_messages[CONNECT]
575 msg = msg % (guid1, connector_type_name1, guid2,
576 connector_type_name2)
577 self._client.send_msg(msg)
578 reply = self._client.read_reply()
579 result = reply.split("|")
580 code = int(result[0])
581 text = base64.b64decode(result[1])
583 raise RuntimeError(text)
585 def cross_connect(self, guid, connector_type_name, cross_guid,
586 cross_testbed_id, cross_factory_id, cross_connector_type_name):
587 msg = testbed_messages[CROSS_CONNECT]
588 msg = msg % (guid, connector_type_name, cross_guid,
589 cross_testbed_id, cross_factory_id, cross_connector_type_name)
590 self._client.send_msg(msg)
591 reply = self._client.read_reply()
592 result = reply.split("|")
593 code = int(result[0])
594 text = base64.b64decode(result[1])
596 raise RuntimeError(text)
598 def add_trace(self, guid, trace_id):
599 msg = testbed_messages[ADD_TRACE]
600 msg = msg % (guid, trace_id)
601 self._client.send_msg(msg)
602 reply = self._client.read_reply()
603 result = reply.split("|")
604 code = int(result[0])
605 text = base64.b64decode(result[1])
607 raise RuntimeError(text)
609 def add_address(self, guid, family, address, netprefix, broadcast):
610 msg = testbed_messages[ADD_ADDRESS]
611 msg = msg % (guid, family, address, netprefix, broadcast)
612 self._client.send_msg(msg)
613 reply = self._client.read_reply()
614 result = reply.split("|")
615 code = int(result[0])
616 text = base64.b64decode(result[1])
618 raise RuntimeError(text)
620 def add_route(self, guid, destination, netprefix, nexthop):
621 msg = testbed_messages[ADD_ROUTE]
622 msg = msg % (guid, destination, netprefix, nexthop)
623 self._client.send_msg(msg)
624 reply = self._client.read_reply()
625 result = reply.split("|")
626 code = int(result[0])
627 text = base64.b64decode(result[1])
629 raise RuntimeError(text)
632 msg = testbed_messages[DO_SETUP]
633 self._client.send_msg(msg)
634 reply = self._client.read_reply()
635 result = reply.split("|")
636 code = int(result[0])
637 text = base64.b64decode(result[1])
639 raise RuntimeError(text)
642 msg = testbed_messages[DO_CREATE]
643 self._client.send_msg(msg)
644 reply = self._client.read_reply()
645 result = reply.split("|")
646 code = int(result[0])
647 text = base64.b64decode(result[1])
649 raise RuntimeError(text)
651 def do_connect(self):
652 msg = testbed_messages[DO_CONNECT]
653 self._client.send_msg(msg)
654 reply = self._client.read_reply()
655 result = reply.split("|")
656 code = int(result[0])
657 text = base64.b64decode(result[1])
659 raise RuntimeError(text)
661 def do_configure(self):
662 msg = testbed_messages[DO_CONFIGURE]
663 self._client.send_msg(msg)
664 reply = self._client.read_reply()
665 result = reply.split("|")
666 code = int(result[0])
667 text = base64.b64decode(result[1])
669 raise RuntimeError(text)
671 def do_cross_connect(self):
672 msg = testbed_messages[DO_CROSS_CONNECT]
673 self._client.send_msg(msg)
674 reply = self._client.read_reply()
675 result = reply.split("|")
676 code = int(result[0])
677 text = base64.b64decode(result[1])
679 raise RuntimeError(text)
681 def start(self, time = TIME_NOW):
682 msg = testbed_messages[START]
684 self._client.send_msg(msg)
685 reply = self._client.read_reply()
686 result = reply.split("|")
687 code = int(result[0])
688 text = base64.b64decode(result[1])
690 raise RuntimeError(text)
692 def stop(self, time = TIME_NOW):
693 msg = testbed_messages[STOP]
695 self._client.send_msg(msg)
696 reply = self._client.read_reply()
697 result = reply.split("|")
698 code = int(result[0])
699 text = base64.b64decode(result[1])
701 raise RuntimeError(text)
703 def set(self, time, guid, name, value):
704 msg = testbed_messages[SET]
705 type = get_type(value)
706 # avoid having "|" in this parameters
707 name = base64.b64encode(name)
708 value = base64.b64encode(str(value))
709 msg = msg % (time, guid, name, value, type)
710 self._client.send_msg(msg)
711 reply = self._client.read_reply()
712 result = reply.split("|")
713 code = int(result[0])
714 text = base64.b64decode(result[1])
716 raise RuntimeError(text)
718 def get(self, time, guid, name):
719 msg = testbed_messages[GET]
720 # avoid having "|" in this parameters
721 name = base64.b64encode(name)
722 msg = msg % (time, guid, name)
723 self._client.send_msg(msg)
724 reply = self._client.read_reply()
725 result = reply.split("|")
726 code = int(result[0])
727 text = base64.b64decode(result[1])
729 raise RuntimeError(text)
732 def action(self, time, guid, action):
733 msg = testbed_messages[ACTION]
734 msg = msg % (time, guid, action)
735 self._client.send_msg(msg)
736 reply = self._client.read_reply()
737 result = reply.split("|")
738 code = int(result[0])
739 text = base64.b64decode(result[1])
741 raise RuntimeError(text)
743 def status(self, guid):
744 msg = testbed_messages[STATUS]
746 self._client.send_msg(msg)
747 reply = self._client.read_reply()
748 result = reply.split("|")
749 code = int(result[0])
750 text = base64.b64decode(result[1])
752 raise RuntimeError(text)
755 def trace(self, guid, trace_id):
756 msg = testbed_messages[TRACE]
757 msg = msg % (guid, trace_id)
758 self._client.send_msg(msg)
759 reply = self._client.read_reply()
760 result = reply.split("|")
761 code = int(result[0])
762 text = base64.b64decode(result[1])
764 raise RuntimeError(text)
768 msg = testbed_messages[SHUTDOWN]
769 self._client.send_msg(msg)
770 reply = self._client.read_reply()
771 result = reply.split("|")
772 code = int(result[0])
773 text = base64.b64decode(result[1])
775 raise RuntimeError(text)
776 self._client.send_stop()
778 class ExperimentControllerProxy(object):
779 def __init__(self, experiment_xml, root_dir):
781 s = ExperimentControllerServer(experiment_xml, root_dir)
784 self._client = server.Client(root_dir)
787 def experiment_xml(self):
788 msg = controller_messages[XML]
789 self._client.send_msg(msg)
790 reply = self._client.read_reply()
791 result = reply.split("|")
792 code = int(result[0])
793 text = base64.b64decode(result[1])
796 raise RuntimeError(text)
798 def set_access_configuration(self, testbed_guid, access_config):
799 mode = access_config.get_attribute_value("mode")
800 communication = access_config.get_attribute_value("communication")
801 host = access_config.get_attribute_value("host")
802 user = access_config.get_attribute_value("user")
803 port = access_config.get_attribute_value("port")
804 root_dir = access_config.get_attribute_value("rootDirectory")
805 use_agent = access_config.get_attribute_value("useAgent")
806 msg = controller_messages[ACCESS]
807 msg = msg % (testbed_guid, mode, communication, host, user, port,
809 self._client.send_msg(msg)
810 reply = self._client.read_reply()
811 result = reply.split("|")
812 code = int(result[0])
813 text = base64.b64decode(result[1])
815 raise RuntimeError(text)
817 def trace(self, testbed_guid, guid, trace_id):
818 msg = controller_messages[TRACE]
819 msg = msg % (testbed_guid, guid, trace_id)
820 self._client.send_msg(msg)
821 reply = self._client.read_reply()
822 result = reply.split("|")
823 code = int(result[0])
824 text = base64.b64decode(result[1])
827 raise RuntimeError(text)
830 msg = controller_messages[START]
831 self._client.send_msg(msg)
832 reply = self._client.read_reply()
833 result = reply.split("|")
834 code = int(result[0])
835 text = base64.b64decode(result[1])
837 raise RuntimeError(text)
840 msg = controller_messages[STOP]
841 self._client.send_msg(msg)
842 reply = self._client.read_reply()
843 result = reply.split("|")
844 code = int(result[0])
845 text = base64.b64decode(result[1])
847 raise RuntimeError(text)
849 def is_finished(self, guid):
850 msg = controller_messages[FINISHED]
852 self._client.send_msg(msg)
853 reply = self._client.read_reply()
854 result = reply.split("|")
855 code = int(result[0])
857 return bool(result[1])
858 error = base64.b64decode(result[1])
859 raise RuntimeError(error)
862 msg = controller_messages[SHUTDOWN]
863 self._client.send_msg(msg)
864 reply = self._client.read_reply()
865 result = reply.split("|")
866 code = int(result[0])
867 text = base64.b64decode(result[1])
869 raise RuntimeError(text)
870 self._client.send_stop()