2 # -*- coding: utf-8 -*-
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
19 # PROTOCOL INSTRUCTION MESSAGES
39 DO_CROSS_CONNECT_INIT = 22
49 GET_ATTRIBUTE_LIST = 32
51 DO_CROSS_CONNECT_COMPL = 34
63 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
64 controller_messages = dict({
66 TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
67 FINISHED: "%d|%s" % (FINISHED, "%d"),
70 RECOVER : "%d" % RECOVER,
71 SHUTDOWN: "%d" % SHUTDOWN,
74 # TESTBED INSTANCE PROTOCOL MESSAGES
75 testbed_messages = dict({
76 TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
79 SHUTDOWN: "%d" % SHUTDOWN,
80 CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
81 CREATE: "%d|%s" % (CREATE, "%d|%s"),
82 CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
83 FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
84 CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
85 CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s|%s"),
86 ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
87 ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
88 ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
89 DO_SETUP: "%d" % DO_SETUP,
90 DO_CREATE: "%d" % DO_CREATE,
91 DO_CONNECT_INIT: "%d" % DO_CONNECT_INIT,
92 DO_CONNECT_COMPL: "%d" % DO_CONNECT_COMPL,
93 DO_CONFIGURE: "%d" % DO_CONFIGURE,
94 DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
95 DO_CROSS_CONNECT_INIT: "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"),
96 DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"),
97 GET: "%d|%s" % (GET, "%d|%s|%s"),
98 SET: "%d|%s" % (SET, "%d|%s|%s|%d|%s"),
99 EXPERIMENT_GET: "%d|%s" % (EXPERIMENT_GET, "%d|%d|%s|%s"),
100 EXPERIMENT_SET: "%d|%s" % (EXPERIMENT_SET, "%d|%d|%s|%s|%d|%s"),
101 GET_ROUTE: "%d|%s" % (GET_ROUTE, "%d|%d|%s"),
102 GET_ADDRESS: "%d|%s" % (GET_ADDRESS, "%d|%d|%s"),
103 ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
104 STATUS: "%d|%s" % (STATUS, "%s"),
106 GET_ATTRIBUTE_LIST: "%d|%s" % (GET_ATTRIBUTE_LIST,"%d"),
107 TESTBED_ID: "%d" % TESTBED_ID,
108 TESTBED_VERSION: "%d" % TESTBED_VERSION,
111 instruction_text = dict({
116 FINISHED: "FINISHED",
120 SHUTDOWN: "SHUTDOWN",
121 CONFIGURE: "CONFIGURE",
123 CREATE_SET: "CREATE_SET",
124 FACTORY_SET: "FACTORY_SET",
126 CROSS_CONNECT: "CROSS_CONNECT",
127 ADD_TRACE: "ADD_TRACE",
128 ADD_ADDRESS: "ADD_ADDRESS",
129 ADD_ROUTE: "ADD_ROUTE",
130 DO_SETUP: "DO_SETUP",
131 DO_CREATE: "DO_CREATE",
132 DO_CONNECT_INIT: "DO_CONNECT_INIT",
133 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
134 DO_CONFIGURE: "DO_CONFIGURE",
135 DO_PRECONFIGURE: "DO_PRECONFIGURE",
136 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
137 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
140 GET_ROUTE: "GET_ROUTE",
141 GET_ADDRESS: "GET_ADDRESS",
142 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
150 TESTBED_ID: "TESTBED_ID",
151 TESTBED_VERSION: "TESTBED_VERSION",
152 EXPERIMENT_SET: "EXPERIMENT_SET",
153 EXPERIMENT_GET: "EXPERIMENT_GET",
157 if isinstance(value, bool):
159 elif isinstance(value, int):
161 elif isinstance(value, float):
166 def set_type(type, value):
172 value = value == "True"
177 def log_msg(server, params):
178 instr = int(params[0])
179 instr_txt = instruction_text[instr]
180 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
181 instr_txt, ", ".join(map(str, params[1:]))))
183 def log_reply(server, reply):
184 res = reply.split("|")
186 code_txt = instruction_text[code]
187 txt = base64.b64decode(res[1])
188 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
191 def to_server_log_level(log_level):
194 if log_level == DC.DEBUG_LEVEL
195 else server.ERROR_LEVEL
198 def get_access_config_params(access_config):
199 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
200 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
201 log_level = to_server_log_level(log_level)
202 user = host = port = agent = key = None
203 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
204 environment_setup = (
205 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
206 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
209 if communication == DC.ACCESS_SSH:
210 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
211 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
212 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
213 agent = access_config.get_attribute_value(DC.USE_AGENT)
214 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
215 return (root_dir, log_level, user, host, port, key, agent, environment_setup)
217 class AccessConfiguration(AttributesMap):
218 def __init__(self, params = None):
219 super(AccessConfiguration, self).__init__()
221 from nepi.core.metadata import Metadata
223 for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES:
224 self.add_attribute(**attr_info)
227 for attr_name, attr_value in params.iteritems():
228 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
229 attr_value = parser(attr_value)
230 self.set_attribute_value(attr_name, attr_value)
232 class TempDir(object):
234 self.path = tempfile.mkdtemp()
237 shutil.rmtree(self.path)
239 class PermDir(object):
240 def __init__(self, path):
243 def create_controller(xml, access_config = None):
244 mode = None if not access_config \
245 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
246 launch = True if not access_config \
247 else not access_config.get_attribute_value(DC.RECOVER)
248 if not mode or mode == DC.MODE_SINGLE_PROCESS:
250 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
252 from nepi.core.execute import ExperimentController
254 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
257 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
258 controller = ExperimentController(xml, root_dir.path)
260 # inject reference to temporary dir, so that it gets cleaned
261 # up at destruction time.
262 controller._tempdir = root_dir
265 elif mode == DC.MODE_DAEMON:
266 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
267 get_access_config_params(access_config)
268 return ExperimentControllerProxy(root_dir, log_level,
269 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
270 agent = agent, launch = launch,
271 environment_setup = environment_setup)
272 raise RuntimeError("Unsupported access configuration '%s'" % mode)
274 def create_testbed_controller(testbed_id, testbed_version, access_config):
275 mode = None if not access_config \
276 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
277 launch = True if not access_config \
278 else not access_config.get_attribute_value(DC.RECOVER)
279 if not mode or mode == DC.MODE_SINGLE_PROCESS:
281 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
282 return _build_testbed_controller(testbed_id, testbed_version)
283 elif mode == DC.MODE_DAEMON:
284 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
285 get_access_config_params(access_config)
286 return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
287 testbed_version = testbed_version, host = host, port = port, ident_key = key,
288 user = user, agent = agent, launch = launch,
289 environment_setup = environment_setup)
290 raise RuntimeError("Unsupported access configuration '%s'" % mode)
292 def _build_testbed_controller(testbed_id, testbed_version):
293 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
294 if not mod_name in sys.modules:
296 module = sys.modules[mod_name]
297 return module.TestbedController(testbed_version)
299 class TestbedControllerServer(server.Server):
300 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
301 super(TestbedControllerServer, self).__init__(root_dir, log_level)
302 self._testbed_id = testbed_id
303 self._testbed_version = testbed_version
306 def post_daemonize(self):
307 self._testbed = _build_testbed_controller(self._testbed_id,
308 self._testbed_version)
310 def reply_action(self, msg):
312 result = base64.b64encode("Invalid command line")
313 reply = "%d|%s" % (ERROR, result)
315 params = msg.split("|")
316 instruction = int(params[0])
317 log_msg(self, params)
319 if instruction == TRACE:
320 reply = self.trace(params)
321 elif instruction == START:
322 reply = self.start(params)
323 elif instruction == STOP:
324 reply = self.stop(params)
325 elif instruction == SHUTDOWN:
326 reply = self.shutdown(params)
327 elif instruction == CONFIGURE:
328 reply = self.defer_configure(params)
329 elif instruction == CREATE:
330 reply = self.defer_create(params)
331 elif instruction == CREATE_SET:
332 reply = self.defer_create_set(params)
333 elif instruction == FACTORY_SET:
334 reply = self.defer_factory_set(params)
335 elif instruction == CONNECT:
336 reply = self.defer_connect(params)
337 elif instruction == CROSS_CONNECT:
338 reply = self.defer_cross_connect(params)
339 elif instruction == ADD_TRACE:
340 reply = self.defer_add_trace(params)
341 elif instruction == ADD_ADDRESS:
342 reply = self.defer_add_address(params)
343 elif instruction == ADD_ROUTE:
344 reply = self.defer_add_route(params)
345 elif instruction == DO_SETUP:
346 reply = self.do_setup(params)
347 elif instruction == DO_CREATE:
348 reply = self.do_create(params)
349 elif instruction == DO_CONNECT_INIT:
350 reply = self.do_connect_init(params)
351 elif instruction == DO_CONNECT_COMPL:
352 reply = self.do_connect_compl(params)
353 elif instruction == DO_CONFIGURE:
354 reply = self.do_configure(params)
355 elif instruction == DO_PRECONFIGURE:
356 reply = self.do_preconfigure(params)
357 elif instruction == DO_CROSS_CONNECT_INIT:
358 reply = self.do_cross_connect_init(params)
359 elif instruction == DO_CROSS_CONNECT_COMPL:
360 reply = self.do_cross_connect_compl(params)
361 elif instruction == GET:
362 reply = self.get(params)
363 elif instruction == SET:
364 reply = self.set(params)
365 elif instruction == GET_ADDRESS:
366 reply = self.get_address(params)
367 elif instruction == GET_ROUTE:
368 reply = self.get_route(params)
369 elif instruction == ACTION:
370 reply = self.action(params)
371 elif instruction == STATUS:
372 reply = self.status(params)
373 elif instruction == GUIDS:
374 reply = self.guids(params)
375 elif instruction == GET_ATTRIBUTE_LIST:
376 reply = self.get_attribute_list(params)
377 elif instruction == TESTBED_ID:
378 reply = self.testbed_id(params)
379 elif instruction == TESTBED_VERSION:
380 reply = self.testbed_version(params)
382 error = "Invalid instruction %s" % instruction
383 self.log_error(error)
384 result = base64.b64encode(error)
385 reply = "%d|%s" % (ERROR, result)
387 error = self.log_error()
388 result = base64.b64encode(error)
389 reply = "%d|%s" % (ERROR, result)
390 log_reply(self, reply)
393 def guids(self, params):
394 guids = self._testbed.guids
395 value = cPickle.dumps(guids)
396 result = base64.b64encode(value)
397 return "%d|%s" % (OK, result)
399 def testbed_id(self, params):
400 testbed_id = self._testbed.testbed_id
401 result = base64.b64encode(str(testbed_id))
402 return "%d|%s" % (OK, result)
404 def testbed_version(self, params):
405 testbed_version = self._testbed.testbed_version
406 result = base64.b64encode(str(testbed_version))
407 return "%d|%s" % (OK, result)
409 def defer_create(self, params):
410 guid = int(params[1])
411 factory_id = params[2]
412 self._testbed.defer_create(guid, factory_id)
413 return "%d|%s" % (OK, "")
415 def trace(self, params):
416 guid = int(params[1])
418 attribute = base64.b64decode(params[3])
419 trace = self._testbed.trace(guid, trace_id, attribute)
420 result = base64.b64encode(trace)
421 return "%d|%s" % (OK, result)
423 def start(self, params):
424 self._testbed.start()
425 return "%d|%s" % (OK, "")
427 def stop(self, params):
429 return "%d|%s" % (OK, "")
431 def shutdown(self, params):
432 self._testbed.shutdown()
433 return "%d|%s" % (OK, "")
435 def defer_configure(self, params):
436 name = base64.b64decode(params[1])
437 value = base64.b64decode(params[2])
438 type = int(params[3])
439 value = set_type(type, value)
440 self._testbed.defer_configure(name, value)
441 return "%d|%s" % (OK, "")
443 def defer_create_set(self, params):
444 guid = int(params[1])
445 name = base64.b64decode(params[2])
446 value = base64.b64decode(params[3])
447 type = int(params[4])
448 value = set_type(type, value)
449 self._testbed.defer_create_set(guid, name, value)
450 return "%d|%s" % (OK, "")
452 def defer_factory_set(self, params):
453 name = base64.b64decode(params[1])
454 value = base64.b64decode(params[2])
455 type = int(params[3])
456 value = set_type(type, value)
457 self._testbed.defer_factory_set(name, value)
458 return "%d|%s" % (OK, "")
460 def defer_connect(self, params):
461 guid1 = int(params[1])
462 connector_type_name1 = params[2]
463 guid2 = int(params[3])
464 connector_type_name2 = params[4]
465 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
466 connector_type_name2)
467 return "%d|%s" % (OK, "")
469 def defer_cross_connect(self, params):
470 guid = int(params[1])
471 connector_type_name = params[2]
472 cross_guid = int(params[3])
473 cross_testbed_guid = int(params[4])
474 cross_testbed_id = params[5]
475 cross_factory_id = params[6]
476 cross_connector_type_name = params[7]
477 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
478 cross_testbed_guid, cross_testbed_id, cross_factory_id,
479 cross_connector_type_name)
480 return "%d|%s" % (OK, "")
482 def defer_add_trace(self, params):
483 guid = int(params[1])
485 self._testbed.defer_add_trace(guid, trace_id)
486 return "%d|%s" % (OK, "")
488 def defer_add_address(self, params):
489 guid = int(params[1])
491 netprefix = int(params[3])
492 broadcast = params[4]
493 self._testbed.defer_add_address(guid, address, netprefix,
495 return "%d|%s" % (OK, "")
497 def defer_add_route(self, params):
498 guid = int(params[1])
499 destination = params[2]
500 netprefix = int(params[3])
502 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
503 return "%d|%s" % (OK, "")
505 def do_setup(self, params):
506 self._testbed.do_setup()
507 return "%d|%s" % (OK, "")
509 def do_create(self, params):
510 self._testbed.do_create()
511 return "%d|%s" % (OK, "")
513 def do_connect_init(self, params):
514 self._testbed.do_connect_init()
515 return "%d|%s" % (OK, "")
517 def do_connect_compl(self, params):
518 self._testbed.do_connect_compl()
519 return "%d|%s" % (OK, "")
521 def do_configure(self, params):
522 self._testbed.do_configure()
523 return "%d|%s" % (OK, "")
525 def do_preconfigure(self, params):
526 self._testbed.do_preconfigure()
527 return "%d|%s" % (OK, "")
529 def do_cross_connect_init(self, params):
530 pcross_data = base64.b64decode(params[1])
531 cross_data = cPickle.loads(pcross_data)
532 self._testbed.do_cross_connect_init(cross_data)
533 return "%d|%s" % (OK, "")
535 def do_cross_connect_compl(self, params):
536 pcross_data = base64.b64decode(params[1])
537 cross_data = cPickle.loads(pcross_data)
538 self._testbed.do_cross_connect_compl(cross_data)
539 return "%d|%s" % (OK, "")
541 def get(self, params):
542 guid = int(params[1])
543 name = base64.b64decode(params[2])
545 value = self._testbed.get(guid, name, time)
546 result = base64.b64encode(str(value))
547 return "%d|%s" % (OK, result)
549 def set(self, params):
550 guid = int(params[1])
551 name = base64.b64decode(params[2])
552 value = base64.b64decode(params[3])
553 type = int(params[2])
555 value = set_type(type, value)
556 self._testbed.set(guid, name, value, time)
557 return "%d|%s" % (OK, "")
559 def get_address(self, params):
560 guid = int(params[1])
561 index = int(params[2])
562 attribute = base64.b64decode(params[3])
563 value = self._testbed.get_address(guid, index, attribute)
564 result = base64.b64encode(str(value))
565 return "%d|%s" % (OK, result)
567 def get_route(self, params):
568 guid = int(params[1])
569 index = int(params[2])
570 attribute = base64.b64decode(params[3])
571 value = self._testbed.get_route(guid, index, attribute)
572 result = base64.b64encode(str(value))
573 return "%d|%s" % (OK, result)
575 def action(self, params):
577 guid = int(params[2])
578 command = base64.b64decode(params[3])
579 self._testbed.action(time, guid, command)
580 return "%d|%s" % (OK, "")
582 def status(self, params):
584 if params[1] != "None":
585 guid = int(params[1])
586 status = self._testbed.status(guid)
587 result = base64.b64encode(str(status))
588 return "%d|%s" % (OK, result)
590 def get_attribute_list(self, params):
591 guid = int(params[1])
592 attr_list = self._testbed.get_attribute_list(guid)
593 value = cPickle.dumps(attr_list)
594 result = base64.b64encode(value)
595 return "%d|%s" % (OK, result)
597 class ExperimentControllerServer(server.Server):
598 def __init__(self, root_dir, log_level, experiment_xml):
599 super(ExperimentControllerServer, self).__init__(root_dir, log_level)
600 self._experiment_xml = experiment_xml
601 self._controller = None
603 def post_daemonize(self):
604 from nepi.core.execute import ExperimentController
605 self._controller = ExperimentController(self._experiment_xml,
606 root_dir = self._root_dir)
608 def reply_action(self, msg):
610 result = base64.b64encode("Invalid command line")
611 reply = "%d|%s" % (ERROR, result)
613 params = msg.split("|")
614 instruction = int(params[0])
615 log_msg(self, params)
617 if instruction == XML:
618 reply = self.experiment_xml(params)
619 elif instruction == TRACE:
620 reply = self.trace(params)
621 elif instruction == FINISHED:
622 reply = self.is_finished(params)
623 elif instruction == EXPERIMENT_GET:
624 reply = self.get(params)
625 elif instruction == EXPERIMENT_SET:
626 reply = self.set(params)
627 elif instruction == START:
628 reply = self.start(params)
629 elif instruction == STOP:
630 reply = self.stop(params)
631 elif instruction == RECOVER:
632 reply = self.recover(params)
633 elif instruction == SHUTDOWN:
634 reply = self.shutdown(params)
636 error = "Invalid instruction %s" % instruction
637 self.log_error(error)
638 result = base64.b64encode(error)
639 reply = "%d|%s" % (ERROR, result)
641 error = self.log_error()
642 result = base64.b64encode(error)
643 reply = "%d|%s" % (ERROR, result)
644 log_reply(self, reply)
647 def experiment_xml(self, params):
648 xml = self._controller.experiment_xml
649 result = base64.b64encode(xml)
650 return "%d|%s" % (OK, result)
652 def trace(self, params):
653 testbed_guid = int(params[1])
654 guid = int(params[2])
656 attribute = base64.b64decode(params[4])
657 trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
658 result = base64.b64encode(trace)
659 return "%d|%s" % (OK, result)
661 def is_finished(self, params):
662 guid = int(params[1])
663 status = self._controller.is_finished(guid)
664 result = base64.b64encode(str(status))
665 return "%d|%s" % (OK, result)
667 def get(self, params):
668 testbed_guid = int(param[1])
669 guid = int(params[2])
670 name = base64.b64decode(params[3])
671 value = self._controller.get(testbed_guid, guid, name, time)
673 result = base64.b64encode(str(value))
674 return "%d|%s" % (OK, result)
676 def set(self, params):
677 testbed_guid = int(params[1])
678 guid = int(params[2])
679 name = base64.b64decode(params[3])
680 value = base64.b64decode(params[4])
681 type = int(params[3])
683 value = set_type(type, value)
684 self._controller.set(testbed_guid, guid, name, value, time)
685 return "%d|%s" % (OK, "")
687 def start(self, params):
688 self._controller.start()
689 return "%d|%s" % (OK, "")
691 def stop(self, params):
692 self._controller.stop()
693 return "%d|%s" % (OK, "")
695 def recover(self, params):
696 self._controller.recover()
697 return "%d|%s" % (OK, "")
699 def shutdown(self, params):
700 self._controller.shutdown()
701 return "%d|%s" % (OK, "")
703 class TestbedControllerProxy(object):
704 def __init__(self, root_dir, log_level, testbed_id = None,
705 testbed_version = None, launch = True, host = None,
706 port = None, user = None, ident_key = None, agent = None,
707 environment_setup = ""):
709 if testbed_id == None or testbed_version == None:
710 raise RuntimeError("To launch a TesbedInstance server a \
711 testbed_id and testbed_version are required")
714 python_code = "from nepi.util.proxy import "\
715 "TestbedControllerServer;"\
716 "s = TestbedControllerServer('%s', %d, '%s', '%s');"\
717 "s.run()" % (root_dir, log_level, testbed_id,
719 proc = server.popen_ssh_subprocess(python_code, host = host,
720 port = port, user = user, agent = agent,
721 ident_key = ident_key,
722 environment_setup = environment_setup,
725 err = proc.stderr.read()
726 raise RuntimeError("Server could not be executed: %s" % \
730 s = TestbedControllerServer(root_dir, log_level, testbed_id,
734 # connect client to server
735 self._client = server.Client(root_dir, host = host, port = port,
736 user = user, agent = agent,
737 environment_setup = environment_setup)
741 msg = testbed_messages[GUIDS]
742 self._client.send_msg(msg)
743 reply = self._client.read_reply()
745 raise RuntimeError, "Invalid reply: %r" % (reply,)
746 result = reply.split("|")
747 code = int(result[0])
748 text = base64.b64decode(result[1])
750 raise RuntimeError(text)
751 guids = cPickle.loads(text)
755 def testbed_id(self):
756 msg = testbed_messages[TESTBED_ID]
757 self._client.send_msg(msg)
758 reply = self._client.read_reply()
760 raise RuntimeError, "Invalid reply: %r" % (reply,)
761 result = reply.split("|")
762 code = int(result[0])
763 text = base64.b64decode(result[1])
765 raise RuntimeError(text)
769 def testbed_version(self):
770 msg = testbed_messages[TESTBED_VERSION]
771 self._client.send_msg(msg)
772 reply = self._client.read_reply()
774 raise RuntimeError, "Invalid reply: %r" % (reply,)
775 result = reply.split("|")
776 code = int(result[0])
777 text = base64.b64decode(result[1])
779 raise RuntimeError(text)
782 def defer_configure(self, name, value):
783 msg = testbed_messages[CONFIGURE]
784 type = get_type(value)
785 # avoid having "|" in this parameters
786 name = base64.b64encode(name)
787 value = base64.b64encode(str(value))
788 msg = msg % (name, value, type)
789 self._client.send_msg(msg)
790 reply = self._client.read_reply()
792 raise RuntimeError, "Invalid reply: %r" % (reply,)
793 result = reply.split("|")
794 code = int(result[0])
795 text = base64.b64decode(result[1])
797 raise RuntimeError(text)
799 def defer_create(self, guid, factory_id):
800 msg = testbed_messages[CREATE]
801 msg = msg % (guid, factory_id)
802 self._client.send_msg(msg)
803 reply = self._client.read_reply()
805 raise RuntimeError, "Invalid reply: %r" % (reply,)
806 result = reply.split("|")
807 code = int(result[0])
808 text = base64.b64decode(result[1])
810 raise RuntimeError(text)
812 def defer_create_set(self, guid, name, value):
813 msg = testbed_messages[CREATE_SET]
814 type = get_type(value)
815 # avoid having "|" in this parameters
816 name = base64.b64encode(name)
817 value = base64.b64encode(str(value))
818 msg = msg % (guid, name, value, type)
819 self._client.send_msg(msg)
820 reply = self._client.read_reply()
822 raise RuntimeError, "Invalid reply: %r" % (reply,)
823 result = reply.split("|")
824 code = int(result[0])
825 text = base64.b64decode(result[1])
827 raise RuntimeError(text)
829 def defer_factory_set(self, guid, name, value):
830 msg = testbed_messages[FACTORY_SET]
831 type = get_type(value)
832 # avoid having "|" in this parameters
833 name = base64.b64encode(name)
834 value = base64.b64encode(str(value))
835 msg = msg % (guid, name, value, type)
836 self._client.send_msg(msg)
837 reply = self._client.read_reply()
839 raise RuntimeError, "Invalid reply: %r" % (reply,)
840 result = reply.split("|")
841 code = int(result[0])
842 text = base64.b64decode(result[1])
844 raise RuntimeError(text)
846 def defer_connect(self, guid1, connector_type_name1, guid2,
847 connector_type_name2):
848 msg = testbed_messages[CONNECT]
849 msg = msg % (guid1, connector_type_name1, guid2,
850 connector_type_name2)
851 self._client.send_msg(msg)
852 reply = self._client.read_reply()
854 raise RuntimeError, "Invalid reply: %r" % (reply,)
855 result = reply.split("|")
856 code = int(result[0])
857 text = base64.b64decode(result[1])
859 raise RuntimeError(text)
861 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
862 cross_testbed_guid, cross_testbed_id, cross_factory_id,
863 cross_connector_type_name):
864 msg = testbed_messages[CROSS_CONNECT]
865 msg = msg % (guid, connector_type_name, cross_guid, cross_testbed_guid,
866 cross_testbed_id, cross_factory_id, cross_connector_type_name)
867 self._client.send_msg(msg)
868 reply = self._client.read_reply()
870 raise RuntimeError, "Invalid reply: %r" % (reply,)
871 result = reply.split("|")
872 code = int(result[0])
873 text = base64.b64decode(result[1])
875 raise RuntimeError(text)
877 def defer_add_trace(self, guid, trace_id):
878 msg = testbed_messages[ADD_TRACE]
879 msg = msg % (guid, trace_id)
880 self._client.send_msg(msg)
881 reply = self._client.read_reply()
883 raise RuntimeError, "Invalid reply: %r" % (reply,)
884 result = reply.split("|")
885 code = int(result[0])
886 text = base64.b64decode(result[1])
888 raise RuntimeError(text)
890 def defer_add_address(self, guid, address, netprefix, broadcast):
891 msg = testbed_messages[ADD_ADDRESS]
892 msg = msg % (guid, address, netprefix, broadcast)
893 self._client.send_msg(msg)
894 reply = self._client.read_reply()
896 raise RuntimeError, "Invalid reply: %r" % (reply,)
897 result = reply.split("|")
898 code = int(result[0])
899 text = base64.b64decode(result[1])
901 raise RuntimeError(text)
903 def defer_add_route(self, guid, destination, netprefix, nexthop):
904 msg = testbed_messages[ADD_ROUTE]
905 msg = msg % (guid, destination, netprefix, nexthop)
906 self._client.send_msg(msg)
907 reply = self._client.read_reply()
909 raise RuntimeError, "Invalid reply: %r" % (reply,)
910 result = reply.split("|")
911 code = int(result[0])
912 text = base64.b64decode(result[1])
914 raise RuntimeError(text)
917 msg = testbed_messages[DO_SETUP]
918 self._client.send_msg(msg)
919 reply = self._client.read_reply()
921 raise RuntimeError, "Invalid reply: %r" % (reply,)
922 result = reply.split("|")
923 code = int(result[0])
924 text = base64.b64decode(result[1])
926 raise RuntimeError(text)
929 msg = testbed_messages[DO_CREATE]
930 self._client.send_msg(msg)
931 reply = self._client.read_reply()
933 raise RuntimeError, "Invalid reply: %r" % (reply,)
934 result = reply.split("|")
935 code = int(result[0])
936 text = base64.b64decode(result[1])
938 raise RuntimeError(text)
940 def do_connect_init(self):
941 msg = testbed_messages[DO_CONNECT_INIT]
942 self._client.send_msg(msg)
943 reply = self._client.read_reply()
945 raise RuntimeError, "Invalid reply: %r" % (reply,)
946 result = reply.split("|")
947 code = int(result[0])
948 text = base64.b64decode(result[1])
950 raise RuntimeError(text)
952 def do_connect_compl(self):
953 msg = testbed_messages[DO_CONNECT_COMPL]
954 self._client.send_msg(msg)
955 reply = self._client.read_reply()
957 raise RuntimeError, "Invalid reply: %r" % (reply,)
958 result = reply.split("|")
959 code = int(result[0])
960 text = base64.b64decode(result[1])
962 raise RuntimeError(text)
964 def do_configure(self):
965 msg = testbed_messages[DO_CONFIGURE]
966 self._client.send_msg(msg)
967 reply = self._client.read_reply()
969 raise RuntimeError, "Invalid reply: %r" % (reply,)
970 result = reply.split("|")
971 code = int(result[0])
972 text = base64.b64decode(result[1])
974 raise RuntimeError(text)
976 def do_preconfigure(self):
977 msg = testbed_messages[DO_PRECONFIGURE]
978 self._client.send_msg(msg)
979 reply = self._client.read_reply()
981 raise RuntimeError, "Invalid reply: %r" % (reply,)
982 result = reply.split("|")
983 code = int(result[0])
984 text = base64.b64decode(result[1])
986 raise RuntimeError(text)
988 def do_cross_connect_init(self, cross_data):
989 msg = testbed_messages[DO_CROSS_CONNECT_INIT]
990 pcross_data = cPickle.dumps(cross_data)
991 cross_data = base64.b64encode(pcross_data)
992 msg = msg % (cross_data)
993 self._client.send_msg(msg)
994 reply = self._client.read_reply()
996 raise RuntimeError, "Invalid reply: %r" % (reply,)
997 result = reply.split("|")
998 code = int(result[0])
999 text = base64.b64decode(result[1])
1001 raise RuntimeError(text)
1003 def do_cross_connect_compl(self, cross_data):
1004 msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
1005 pcross_data = cPickle.dumps(cross_data)
1006 cross_data = base64.b64encode(pcross_data)
1007 msg = msg % (cross_data)
1008 self._client.send_msg(msg)
1009 reply = self._client.read_reply()
1011 raise RuntimeError, "Invalid reply: %r" % (reply,)
1012 result = reply.split("|")
1013 code = int(result[0])
1014 text = base64.b64decode(result[1])
1016 raise RuntimeError(text)
1018 def start(self, time = TIME_NOW):
1019 msg = testbed_messages[START]
1020 self._client.send_msg(msg)
1021 reply = self._client.read_reply()
1023 raise RuntimeError, "Invalid reply: %r" % (reply,)
1024 result = reply.split("|")
1025 code = int(result[0])
1026 text = base64.b64decode(result[1])
1028 raise RuntimeError(text)
1030 def stop(self, time = TIME_NOW):
1031 msg = testbed_messages[STOP]
1032 self._client.send_msg(msg)
1033 reply = self._client.read_reply()
1035 raise RuntimeError, "Invalid reply: %r" % (reply,)
1036 result = reply.split("|")
1037 code = int(result[0])
1038 text = base64.b64decode(result[1])
1040 raise RuntimeError(text)
1042 def set(self, guid, name, value, time = TIME_NOW):
1043 msg = testbed_messages[SET]
1044 type = get_type(value)
1045 # avoid having "|" in this parameters
1046 name = base64.b64encode(name)
1047 value = base64.b64encode(str(value))
1048 msg = msg % (guid, name, value, type, time)
1049 self._client.send_msg(msg)
1050 reply = self._client.read_reply()
1052 raise RuntimeError, "Invalid reply: %r" % (reply,)
1053 result = reply.split("|")
1054 code = int(result[0])
1055 text = base64.b64decode(result[1])
1057 raise RuntimeError(text)
1059 def get(self, guid, name, time = TIME_NOW):
1060 msg = testbed_messages[GET]
1061 # avoid having "|" in this parameters
1062 name = base64.b64encode(name)
1063 msg = msg % (guid, name, time)
1064 self._client.send_msg(msg)
1065 reply = self._client.read_reply()
1067 raise RuntimeError, "Invalid reply: %r" % (reply,)
1068 result = reply.split("|")
1069 code = int(result[0])
1070 text = base64.b64decode(result[1])
1072 raise RuntimeError(text)
1075 def get_address(self, guid, index, attribute):
1076 msg = testbed_messages[GET_ADDRESS]
1077 # avoid having "|" in this parameters
1078 attribute = base64.b64encode(attribute)
1079 msg = msg % (guid, index, attribute)
1080 self._client.send_msg(msg)
1081 reply = self._client.read_reply()
1083 raise RuntimeError, "Invalid reply: %r" % (reply,)
1084 result = reply.split("|")
1085 code = int(result[0])
1086 text = base64.b64decode(result[1])
1088 raise RuntimeError(text)
1091 def get_route(self, guid, index, attribute):
1092 msg = testbed_messages[GET_ROUTE]
1093 # avoid having "|" in this parameters
1094 attribute = base64.b64encode(attribute)
1095 msg = msg % (guid, index, attribute)
1096 self._client.send_msg(msg)
1097 reply = self._client.read_reply()
1099 raise RuntimeError, "Invalid reply: %r" % (reply,)
1100 result = reply.split("|")
1101 code = int(result[0])
1102 text = base64.b64decode(result[1])
1104 raise RuntimeError(text)
1107 def action(self, time, guid, action):
1108 msg = testbed_messages[ACTION]
1109 msg = msg % (time, guid, action)
1110 self._client.send_msg(msg)
1111 reply = self._client.read_reply()
1113 raise RuntimeError, "Invalid reply: %r" % (reply,)
1114 result = reply.split("|")
1115 code = int(result[0])
1116 text = base64.b64decode(result[1])
1118 raise RuntimeError(text)
1120 def status(self, guid = None):
1121 msg = testbed_messages[STATUS]
1123 self._client.send_msg(msg)
1124 reply = self._client.read_reply()
1126 raise RuntimeError, "Invalid reply: %r" % (reply,)
1127 result = reply.split("|")
1128 code = int(result[0])
1129 text = base64.b64decode(result[1])
1131 raise RuntimeError(text)
1134 def trace(self, guid, trace_id, attribute='value'):
1135 msg = testbed_messages[TRACE]
1136 attribute = base64.b64encode(attribute)
1137 msg = msg % (guid, trace_id, attribute)
1138 self._client.send_msg(msg)
1139 reply = self._client.read_reply()
1141 raise RuntimeError, "Invalid reply: %r" % (reply,)
1142 result = reply.split("|")
1143 code = int(result[0])
1144 text = base64.b64decode(result[1])
1146 raise RuntimeError(text)
1149 def get_attribute_list(self, guid):
1150 msg = testbed_messages[GET_ATTRIBUTE_LIST]
1152 self._client.send_msg(msg)
1153 reply = self._client.read_reply()
1155 raise RuntimeError, "Invalid reply: %r" % (reply,)
1156 result = reply.split("|")
1157 code = int(result[0])
1158 text = base64.b64decode(result[1])
1160 raise RuntimeError(text)
1161 attr_list = cPickle.loads(text)
1165 msg = testbed_messages[SHUTDOWN]
1166 self._client.send_msg(msg)
1167 reply = self._client.read_reply()
1169 raise RuntimeError, "Invalid reply: %r" % (reply,)
1170 result = reply.split("|")
1171 code = int(result[0])
1172 text = base64.b64decode(result[1])
1174 raise RuntimeError(text)
1175 self._client.send_stop()
1176 self._client.read_reply() # wait for it
1178 class ExperimentControllerProxy(object):
1179 def __init__(self, root_dir, log_level, experiment_xml = None,
1180 launch = True, host = None, port = None, user = None,
1181 ident_key = None, agent = None, environment_setup = ""):
1184 if experiment_xml == None:
1185 raise RuntimeError("To launch a ExperimentControllerServer a \
1186 xml description of the experiment is required")
1189 xml = experiment_xml
1190 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1191 s = ExperimentControllerServer(%r, %r, %r);\
1192 s.run()" % (root_dir, log_level, xml)
1193 proc = server.popen_ssh_subprocess(python_code, host = host,
1194 port = port, user = user, agent = agent,
1195 ident_key = ident_key,
1196 environment_setup = environment_setup,
1199 err = proc.stderr.read()
1200 raise RuntimeError("Server could not be executed: %s" % \
1204 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1207 # connect client to server
1208 self._client = server.Client(root_dir, host = host, port = port,
1209 user = user, agent = agent,
1210 environment_setup = environment_setup)
1213 def experiment_xml(self):
1214 msg = controller_messages[XML]
1215 self._client.send_msg(msg)
1216 reply = self._client.read_reply()
1218 raise RuntimeError, "Invalid reply: %r" % (reply,)
1219 result = reply.split("|")
1220 code = int(result[0])
1221 text = base64.b64decode(result[1])
1223 raise RuntimeError(text)
1226 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
1227 msg = controller_messages[TRACE]
1228 attribute = base64.b64encode(attribute)
1229 msg = msg % (testbed_guid, guid, trace_id, attribute)
1230 self._client.send_msg(msg)
1231 reply = self._client.read_reply()
1233 raise RuntimeError, "Invalid reply: %r" % (reply,)
1234 result = reply.split("|")
1235 code = int(result[0])
1236 text = base64.b64decode(result[1])
1239 raise RuntimeError(text)
1242 msg = controller_messages[START]
1243 self._client.send_msg(msg)
1244 reply = self._client.read_reply()
1246 raise RuntimeError, "Invalid reply: %r" % (reply,)
1247 result = reply.split("|")
1248 code = int(result[0])
1249 text = base64.b64decode(result[1])
1251 raise RuntimeError(text)
1254 msg = controller_messages[STOP]
1255 self._client.send_msg(msg)
1256 reply = self._client.read_reply()
1258 raise RuntimeError, "Invalid reply: %r" % (reply,)
1259 result = reply.split("|")
1260 code = int(result[0])
1261 text = base64.b64decode(result[1])
1263 raise RuntimeError(text)
1266 msg = controller_messages[RECOVER]
1267 self._client.send_msg(msg)
1268 reply = self._client.read_reply()
1270 raise RuntimeError, "Invalid reply: %r" % (reply,)
1271 result = reply.split("|")
1272 code = int(result[0])
1273 text = base64.b64decode(result[1])
1275 raise RuntimeError(text)
1277 def is_finished(self, guid):
1278 msg = controller_messages[FINISHED]
1280 self._client.send_msg(msg)
1281 reply = self._client.read_reply()
1283 raise RuntimeError, "Invalid reply: %r" % (reply,)
1284 result = reply.split("|")
1285 code = int(result[0])
1286 text = base64.b64decode(result[1])
1288 raise RuntimeError(text)
1289 return text == "True"
1291 def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
1292 msg = testbed_messages[EXPERIMENT_SET]
1293 type = get_type(value)
1294 # avoid having "|" in this parameters
1295 name = base64.b64encode(name)
1296 value = base64.b64encode(str(value))
1297 msg = msg % (testbed_guid, guid, name, value, type, time)
1298 self._client.send_msg(msg)
1299 reply = self._client.read_reply()
1301 raise RuntimeError, "Invalid reply: %r" % (reply,)
1302 result = reply.split("|")
1303 code = int(result[0])
1304 text = base64.b64decode(result[1])
1306 raise RuntimeError(text)
1308 def get(self, testbed_guid, guid, name, time = TIME_NOW):
1309 msg = testbed_messages[EXPERIMENT_GET]
1310 # avoid having "|" in this parameters
1311 name = base64.b64encode(name)
1312 msg = msg % (testbed_guid, guid, name, time)
1313 self._client.send_msg(msg)
1314 reply = self._client.read_reply()
1316 raise RuntimeError, "Invalid reply: %r" % (reply,)
1317 result = reply.split("|")
1318 code = int(result[0])
1319 text = base64.b64decode(result[1])
1321 raise RuntimeError(text)
1325 msg = controller_messages[SHUTDOWN]
1326 self._client.send_msg(msg)
1327 reply = self._client.read_reply()
1329 raise RuntimeError, "Invalid reply: %r" % (reply,)
1330 result = reply.split("|")
1331 code = int(result[0])
1332 text = base64.b64decode(result[1])
1334 raise RuntimeError(text)
1335 self._client.send_stop()
1336 self._client.read_reply() # wait for it