2 # -*- coding: utf-8 -*-
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
23 # PROTOCOL INSTRUCTION MESSAGES
43 DO_CROSS_CONNECT_INIT = 22
53 GET_ATTRIBUTE_LIST = 32
55 DO_CROSS_CONNECT_COMPL = 34
61 GET_TESTBED_VERSION = 40
68 instruction_text = dict({
79 CONFIGURE: "CONFIGURE",
81 CREATE_SET: "CREATE_SET",
82 FACTORY_SET: "FACTORY_SET",
84 CROSS_CONNECT: "CROSS_CONNECT",
85 ADD_TRACE: "ADD_TRACE",
86 ADD_ADDRESS: "ADD_ADDRESS",
87 ADD_ROUTE: "ADD_ROUTE",
89 DO_CREATE: "DO_CREATE",
90 DO_CONNECT_INIT: "DO_CONNECT_INIT",
91 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
92 DO_CONFIGURE: "DO_CONFIGURE",
93 DO_PRECONFIGURE: "DO_PRECONFIGURE",
94 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
95 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
98 GET_ROUTE: "GET_ROUTE",
99 GET_ADDRESS: "GET_ADDRESS",
100 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
101 GET_FACTORY_ID: "GET_FACTORY_ID",
102 GET_TESTBED_ID: "GET_TESTBED_ID",
103 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
107 TESTBED_ID: "TESTBED_ID",
108 TESTBED_VERSION: "TESTBED_VERSION",
109 TRACES_INFO: "TRACES_INFO",
110 STARTED_TIME: "STARTED_TIME",
111 STOPPED_TIME: "STOPPED_TIME",
115 def log_msg(server, params):
117 instr = int(params[0])
118 instr_txt = instruction_text[instr]
119 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
120 instr_txt, ", ".join(map(str, params[1:]))))
122 # don't die for logging
125 def log_reply(server, reply):
127 res = reply.split("|")
129 code_txt = instruction_text[code]
131 txt = base64.b64decode(res[1])
134 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
137 # don't die for logging
138 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
142 def to_server_log_level(log_level):
145 if log_level == DC.DEBUG_LEVEL
146 else server.ERROR_LEVEL
149 def get_access_config_params(access_config):
150 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
151 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
152 log_level = to_server_log_level(log_level)
153 user = host = port = agent = key = sudo = None
154 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
155 environment_setup = (
156 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
157 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
160 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
161 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
162 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
163 agent = access_config.get_attribute_value(DC.USE_AGENT)
164 sudo = access_config.get_attribute_value(DC.USE_SUDO)
165 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
166 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
167 return (root_dir, log_level, communication, user, host, port, key, agent,
168 sudo, environment_setup)
170 class AccessConfiguration(AttributesMap):
171 def __init__(self, params = None):
172 super(AccessConfiguration, self).__init__()
174 from nepi.core.metadata import Metadata
176 for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
177 self.add_attribute(**attr_info)
180 for attr_name, attr_value in params.iteritems():
181 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
182 attr_value = parser(attr_value)
183 self.set_attribute_value(attr_name, attr_value)
185 class TempDir(object):
187 self.path = tempfile.mkdtemp()
190 shutil.rmtree(self.path)
192 class PermDir(object):
193 def __init__(self, path):
196 def create_experiment_controller(xml, access_config = None):
197 mode = None if not access_config \
198 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
199 launch = True if not access_config \
200 else not access_config.get_attribute_value(DC.RECOVER)
201 if not mode or mode == DC.MODE_SINGLE_PROCESS:
202 from nepi.core.execute import ExperimentController
204 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
207 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
208 controller = ExperimentController(xml, root_dir.path)
210 # inject reference to temporary dir, so that it gets cleaned
211 # up at destruction time.
212 controller._tempdir = root_dir
219 elif mode == DC.MODE_DAEMON:
220 (root_dir, log_level, communication, user, host, port, key, agent,
221 sudo, environment_setup) = get_access_config_params(access_config)
223 return ExperimentControllerProxy(root_dir, log_level,
224 experiment_xml = xml,
225 communication = communication,
233 environment_setup = environment_setup)
236 # Maybe controller died, recover from persisted testbed information if possible
237 controller = ExperimentControllerProxy(root_dir, log_level,
238 experiment_xml = xml,
239 communication = communication,
247 environment_setup = environment_setup)
252 raise RuntimeError("Unsupported access configuration '%s'" % mode)
254 def create_testbed_controller(testbed_id, testbed_version, access_config):
255 mode = None if not access_config \
256 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
257 launch = True if not access_config \
258 else not access_config.get_attribute_value(DC.RECOVER)
259 if not mode or mode == DC.MODE_SINGLE_PROCESS:
261 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
262 return _build_testbed_controller(testbed_id, testbed_version)
263 elif mode == DC.MODE_DAEMON:
264 (root_dir, log_level, communication, user, host, port, key, agent,
265 sudo, environment_setup) = get_access_config_params(access_config)
266 return TestbedControllerProxy(root_dir, log_level,
267 testbed_id = testbed_id,
268 testbed_version = testbed_version,
269 communication = communication,
277 environment_setup = environment_setup)
278 raise RuntimeError("Unsupported access configuration '%s'" % mode)
280 def _build_testbed_controller(testbed_id, testbed_version):
281 mod_name = nepi.util.environ.find_testbed(testbed_id)
283 if not mod_name in sys.modules:
287 raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
289 module = sys.modules[mod_name]
290 tc = module.TestbedController()
291 if tc.testbed_version != testbed_version:
292 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
293 (testbed_id, testbed_version, tc.testbed_version))
296 # Just a namespace class
def pickled_data(sdata):
    """Decode a base64 payload and unpickle the object it carries.

    NOTE: unpickling can execute arbitrary code, so ``sdata`` must
    come from a trusted peer.
    """
    raw = base64.b64decode(sdata)
    return cPickle.loads(raw)
def base64_data(sdata):
    """Return the raw data carried by the base64 string ``sdata``."""
    decoded = base64.b64decode(sdata)
    return decoded
309 return None if sdata == "None" else int(sdata)
313 return sdata == 'True'
def pickled_data(data):
    """Pickle ``data`` and return the pickle encoded as base64."""
    pickled = cPickle.dumps(data)
    return base64.b64encode(pickled)
def base64_data(data):
    """Return ``data`` encoded as base64."""
    encoded = base64.b64encode(data)
    return encoded
326 return "None" if data is None else int(data)
330 return str(bool(data))
332 # import into Marshalling all the decoders
336 for typname, typ in vars(Decoders).iteritems()
337 if not typname.startswith('_')
340 _TYPE_ENCODERS = dict([
341 # id(type) -> (<encoding_function>, <formatting_string>)
342 (typname, (getattr(Encoders,typname),"%s"))
343 for typname in vars(Decoders)
344 if not typname.startswith('_')
345 and hasattr(Encoders,typname)
349 _TYPE_ENCODERS["float"] = (float, "%r")
350 _TYPE_ENCODERS["int"] = (int, "%d")
351 _TYPE_ENCODERS["long"] = (int, "%d")
352 _TYPE_ENCODERS["str"] = (str, "%s")
353 _TYPE_ENCODERS["unicode"] = (str, "%s")
356 _TYPE_ENCODERS[None] = (str, "%s")
361 Decorator that converts the given function into one that takes
362 a single "params" list, with each parameter marshalled according
363 to the given factory callable (type constructors are accepted).
365 The first argument (self) is left untouched.
369 @Marshalling.args(int,int,str,base64_data)
370 def somefunc(self, someint, otherint, somestr, someb64):
375 def rv(self, params):
376 return f(self, *[ ctor(val)
377 for ctor,val in zip(types, params[1:]) ])
381 # Derive type encoders by looking up types in _TYPE_ENCODERS
382 # make_proxy will use it to encode arguments in command strings
384 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
386 if typ.__name__ in TYPE_ENCODERS:
387 argencoders.append(TYPE_ENCODERS[typ.__name__])
390 argencoders.append(TYPE_ENCODERS[None])
392 rv._argencoders = tuple(argencoders)
394 rv._retval = getattr(f, '_retval', None)
399 def retval(typ=Decoders.base64_data):
401 Decorator that converts the given function into one that
402 returns a properly encoded return string, given that the undecorated
403 function returns suitable input for the encoding function.
405 The optional typ argument specifies a type.
406 For the default of base64_data, return values should be strings.
407 The return value of the encoding method should be a string always.
411 @Marshalling.args(int,int,str,base64_data)
412 @Marshalling.retval(str)
413 def somefunc(self, someint, otherint, somestr, someb64):
416 encode, fmt = Marshalling._TYPE_ENCODERS.get(
418 Marshalling._TYPE_ENCODERS[None])
423 def rv(self, *p, **kw):
424 data = f(self, *p, **kw)
430 rv._argtypes = getattr(f, '_argtypes', None)
431 rv._argencoders = getattr(f, '_argencoders', None)
438 Decorator that converts the given function into one that
439 always return an encoded empty string.
441 Useful for null-returning functions.
446 def rv(self, *p, **kw):
451 rv._argtypes = getattr(f, '_argtypes', None)
452 rv._argencoders = getattr(f, '_argencoders', None)
456 def handles(whichcommand):
458 Associates the method with a given command code for servers.
459 It should always be the topmost decorator.
462 f._handles_command = whichcommand
466 class BaseServer(server.Server):
467 def reply_action(self, msg):
469 result = base64.b64encode("Invalid command line")
470 reply = "%d|%s" % (ERROR, result)
472 params = msg.split("|")
473 instruction = int(params[0])
474 log_msg(self, params)
476 for mname,meth in vars(self.__class__).iteritems():
477 if not mname.startswith('_'):
478 cmd = getattr(meth, '_handles_command', None)
479 if cmd == instruction:
480 meth = getattr(self, mname)
484 error = "Invalid instruction %s" % instruction
485 self.log_error(error)
486 result = base64.b64encode(error)
487 reply = "%d|%s" % (ERROR, result)
489 error = self.log_error()
490 result = base64.b64encode(error)
491 reply = "%d|%s" % (ERROR, result)
492 log_reply(self, reply)
495 class TestbedControllerServer(BaseServer):
496 def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
497 super(TestbedControllerServer, self).__init__(root_dir, log_level,
498 environment_setup = environment_setup )
499 self._testbed_id = testbed_id
500 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed controller for the stored testbed id and version.

    The resulting instance is kept in ``self._testbed``.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_controller(testbed_id, testbed_version)
507 @Marshalling.handles(GUIDS)
509 @Marshalling.retval( Marshalling.pickled_data )
511 return self._testbed.guids
513 @Marshalling.handles(TESTBED_ID)
515 @Marshalling.retval()
516 def testbed_id(self):
517 return str(self._testbed.testbed_id)
519 @Marshalling.handles(TESTBED_VERSION)
521 @Marshalling.retval()
522 def testbed_version(self):
523 return str(self._testbed.testbed_version)
525 @Marshalling.handles(CREATE)
526 @Marshalling.args(int, str)
def defer_create(self, guid, factory_id):
    """Forward ``defer_create(guid, factory_id)`` to the wrapped testbed."""
    testbed = self._testbed
    testbed.defer_create(guid, factory_id)
531 @Marshalling.handles(TRACE)
532 @Marshalling.args(int, str, Marshalling.base64_data)
533 @Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Return the result of the wrapped testbed's ``trace`` for the given arguments."""
    testbed = self._testbed
    return testbed.trace(guid, trace_id, attribute)
537 @Marshalling.handles(TRACES_INFO)
539 @Marshalling.retval( Marshalling.pickled_data )
def traces_info(self):
    """Return the result of the wrapped testbed's ``traces_info()``."""
    info = self._testbed.traces_info()
    return info
543 @Marshalling.handles(START)
547 self._testbed.start()
549 @Marshalling.handles(STOP)
555 @Marshalling.handles(SHUTDOWN)
559 self._testbed.shutdown()
561 @Marshalling.handles(CONFIGURE)
562 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
def defer_configure(self, name, value):
    """Forward ``defer_configure(name, value)`` to the wrapped testbed."""
    testbed = self._testbed
    testbed.defer_configure(name, value)
567 @Marshalling.handles(CREATE_SET)
568 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
def defer_create_set(self, guid, name, value):
    """Forward ``defer_create_set(guid, name, value)`` to the wrapped testbed."""
    testbed = self._testbed
    testbed.defer_create_set(guid, name, value)
573 @Marshalling.handles(FACTORY_SET)
574 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
def defer_factory_set(self, name, value):
    """Forward ``defer_factory_set(name, value)`` to the wrapped testbed."""
    testbed = self._testbed
    testbed.defer_factory_set(name, value)
579 @Marshalling.handles(CONNECT)
580 @Marshalling.args(int, str, int, str)
def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
    """Forward a deferred connection between two connectors to the wrapped testbed."""
    testbed = self._testbed
    testbed.defer_connect(
        guid1, connector_type_name1,
        guid2, connector_type_name2)
586 @Marshalling.handles(CROSS_CONNECT)
587 @Marshalling.args(int, str, int, int, str, str, str)
def defer_cross_connect(self, guid, connector_type_name,
        cross_guid, cross_testbed_guid,
        cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """Forward a deferred cross-testbed connection to the wrapped testbed.

    All arguments are passed through unchanged, in the same order.
    """
    testbed = self._testbed
    testbed.defer_cross_connect(
        guid, connector_type_name,
        cross_guid, cross_testbed_guid,
        cross_testbed_id, cross_factory_id,
        cross_connector_type_name)
598 @Marshalling.handles(ADD_TRACE)
599 @Marshalling.args(int, str)
def defer_add_trace(self, guid, trace_id):
    """Forward ``defer_add_trace(guid, trace_id)`` to the wrapped testbed."""
    testbed = self._testbed
    testbed.defer_add_trace(guid, trace_id)
604 @Marshalling.handles(ADD_ADDRESS)
605 @Marshalling.args(int, str, int, Marshalling.pickled_data)
607 def defer_add_address(self, guid, address, netprefix, broadcast):
608 self._testbed.defer_add_address(guid, address, netprefix,
611 @Marshalling.handles(ADD_ROUTE)
612 @Marshalling.args(int, str, int, str, int)
def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
    """Forward a deferred route addition to the wrapped testbed."""
    testbed = self._testbed
    testbed.defer_add_route(
        guid, destination, netprefix, nexthop, metric)
617 @Marshalling.handles(DO_SETUP)
621 self._testbed.do_setup()
623 @Marshalling.handles(DO_CREATE)
627 self._testbed.do_create()
629 @Marshalling.handles(DO_CONNECT_INIT)
def do_connect_init(self):
    """Invoke ``do_connect_init()`` on the wrapped testbed."""
    testbed = self._testbed
    testbed.do_connect_init()
635 @Marshalling.handles(DO_CONNECT_COMPL)
def do_connect_compl(self):
    """Invoke ``do_connect_compl()`` on the wrapped testbed."""
    testbed = self._testbed
    testbed.do_connect_compl()
641 @Marshalling.handles(DO_CONFIGURE)
def do_configure(self):
    """Invoke ``do_configure()`` on the wrapped testbed."""
    testbed = self._testbed
    testbed.do_configure()
647 @Marshalling.handles(DO_PRECONFIGURE)
def do_preconfigure(self):
    """Invoke ``do_preconfigure()`` on the wrapped testbed."""
    testbed = self._testbed
    testbed.do_preconfigure()
653 @Marshalling.handles(DO_PRESTART)
def do_prestart(self):
    """Invoke ``do_prestart()`` on the wrapped testbed."""
    testbed = self._testbed
    testbed.do_prestart()
659 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
660 @Marshalling.args( Marshalling.Decoders.pickled_data )
def do_cross_connect_init(self, cross_data):
    """Pass ``cross_data`` to the wrapped testbed's ``do_cross_connect_init``."""
    testbed = self._testbed
    testbed.do_cross_connect_init(cross_data)
665 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
666 @Marshalling.args( Marshalling.Decoders.pickled_data )
def do_cross_connect_compl(self, cross_data):
    """Pass ``cross_data`` to the wrapped testbed's ``do_cross_connect_compl``."""
    testbed = self._testbed
    testbed.do_cross_connect_compl(cross_data)
671 @Marshalling.handles(GET)
672 @Marshalling.args(int, Marshalling.base64_data, str)
673 @Marshalling.retval( Marshalling.pickled_data )
def get(self, guid, name, time):
    """Return the result of the wrapped testbed's ``get(guid, name, time)``."""
    testbed = self._testbed
    return testbed.get(guid, name, time)
677 @Marshalling.handles(SET)
678 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
def set(self, guid, name, value, time):
    """Forward ``set(guid, name, value, time)`` to the wrapped testbed."""
    testbed = self._testbed
    testbed.set(guid, name, value, time)
683 @Marshalling.handles(GET_ADDRESS)
684 @Marshalling.args(int, int, Marshalling.base64_data)
685 @Marshalling.retval()
def get_address(self, guid, index, attribute):
    """Return the wrapped testbed's ``get_address`` result converted to a string."""
    value = self._testbed.get_address(guid, index, attribute)
    return str(value)
689 @Marshalling.handles(GET_ROUTE)
690 @Marshalling.args(int, int, Marshalling.base64_data)
691 @Marshalling.retval()
def get_route(self, guid, index, attribute):
    """Return the wrapped testbed's ``get_route`` result converted to a string."""
    value = self._testbed.get_route(guid, index, attribute)
    return str(value)
695 @Marshalling.handles(ACTION)
696 @Marshalling.args(str, int, Marshalling.base64_data)
def action(self, time, guid, command):
    """Forward ``action(time, guid, command)`` to the wrapped testbed."""
    testbed = self._testbed
    testbed.action(time, guid, command)
701 @Marshalling.handles(STATUS)
702 @Marshalling.args(Marshalling.nullint)
703 @Marshalling.retval(int)
def status(self, guid):
    """Return the result of the wrapped testbed's ``status(guid)``."""
    testbed = self._testbed
    return testbed.status(guid)
707 @Marshalling.handles(TESTBED_STATUS)
709 @Marshalling.retval(int)
710 def testbed_status(self):
711 return self._testbed.testbed_status()
713 @Marshalling.handles(GET_ATTRIBUTE_LIST)
714 @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
715 @Marshalling.retval( Marshalling.pickled_data )
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """Return the wrapped testbed's ``get_attribute_list`` result.

    ``filter_flags`` and ``exclude`` are passed through unchanged.
    """
    testbed = self._testbed
    return testbed.get_attribute_list(guid, filter_flags, exclude)
719 @Marshalling.handles(GET_FACTORY_ID)
720 @Marshalling.args(int)
721 @Marshalling.retval()
def get_factory_id(self, guid):
    """Return the result of the wrapped testbed's ``get_factory_id(guid)``."""
    testbed = self._testbed
    return testbed.get_factory_id(guid)
725 @Marshalling.handles(RECOVER)
729 self._testbed.recover()
732 class ExperimentControllerServer(BaseServer):
def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
    """Initialise the base server and remember the experiment XML.

    ``self._experiment`` starts out as None; ``post_daemonize`` is the
    method that assigns the actual controller.
    """
    base = super(ExperimentControllerServer, self)
    base.__init__(root_dir, log_level,
            environment_setup = environment_setup)
    self._experiment = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Create the ExperimentController from the stored XML and root directory.

    The instance is kept in ``self._experiment``.
    """
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    self._experiment = ExperimentController(xml, root_dir = self._root_dir)
744 @Marshalling.handles(GUIDS)
746 @Marshalling.retval( Marshalling.pickled_data )
748 return self._experiment.guids
750 @Marshalling.handles(STARTED_TIME)
752 @Marshalling.retval( Marshalling.pickled_data )
def started_time(self):
    """Return the experiment controller's ``started_time`` attribute."""
    experiment = self._experiment
    return experiment.started_time
756 @Marshalling.handles(STOPPED_TIME)
758 @Marshalling.retval( Marshalling.pickled_data )
def stopped_time(self):
    """Return the experiment controller's ``stopped_time`` attribute."""
    experiment = self._experiment
    return experiment.stopped_time
762 @Marshalling.handles(XML)
764 @Marshalling.retval()
def experiment_design_xml(self):
    """Return the experiment controller's ``experiment_design_xml`` attribute."""
    experiment = self._experiment
    return experiment.experiment_design_xml
768 @Marshalling.handles(EXEC_XML)
770 @Marshalling.retval()
def experiment_execute_xml(self):
    """Return the experiment controller's ``experiment_execute_xml`` attribute."""
    experiment = self._experiment
    return experiment.experiment_execute_xml
774 @Marshalling.handles(TRACE)
775 @Marshalling.args(int, str, Marshalling.base64_data)
776 @Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Return the experiment controller's ``trace`` result converted to a string."""
    content = self._experiment.trace(guid, trace_id, attribute)
    return str(content)
780 @Marshalling.handles(TRACES_INFO)
782 @Marshalling.retval( Marshalling.pickled_data )
def traces_info(self):
    """Return the result of the experiment controller's ``traces_info()``."""
    info = self._experiment.traces_info()
    return info
786 @Marshalling.handles(FINISHED)
787 @Marshalling.args(int)
788 @Marshalling.retval(Marshalling.bool)
def is_finished(self, guid):
    """Return the result of the experiment controller's ``is_finished(guid)``."""
    experiment = self._experiment
    return experiment.is_finished(guid)
792 @Marshalling.handles(STATUS)
793 @Marshalling.args(int)
794 @Marshalling.retval(int)
def status(self, guid):
    """Return the status of the element identified by ``guid``.

    Delegates to the experiment controller's ``status`` method. This
    handler previously returned ``is_finished(guid)``, i.e. the answer
    to the FINISHED query rather than the element status.
    """
    return self._experiment.status(guid)
798 @Marshalling.handles(GET)
799 @Marshalling.args(int, Marshalling.base64_data, str)
800 @Marshalling.retval( Marshalling.pickled_data )
def get(self, guid, name, time):
    """Return the result of the experiment controller's ``get(guid, name, time)``."""
    experiment = self._experiment
    return experiment.get(guid, name, time)
804 @Marshalling.handles(SET)
805 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
def set(self, guid, name, value, time):
    """Forward ``set(guid, name, value, time)`` to the experiment controller."""
    experiment = self._experiment
    experiment.set(guid, name, value, time)
810 @Marshalling.handles(START)
814 self._experiment.start()
816 @Marshalling.handles(STOP)
820 self._experiment.stop()
822 @Marshalling.handles(RECOVER)
826 self._experiment.recover()
828 @Marshalling.handles(SHUTDOWN)
832 self._experiment.shutdown()
834 @Marshalling.handles(GET_TESTBED_ID)
835 @Marshalling.args(int)
836 @Marshalling.retval()
def get_testbed_id(self, guid):
    """Return the result of the experiment controller's ``get_testbed_id(guid)``."""
    experiment = self._experiment
    return experiment.get_testbed_id(guid)
840 @Marshalling.handles(GET_FACTORY_ID)
841 @Marshalling.args(int)
842 @Marshalling.retval()
def get_factory_id(self, guid):
    """Return the result of the experiment controller's ``get_factory_id(guid)``."""
    experiment = self._experiment
    return experiment.get_factory_id(guid)
846 @Marshalling.handles(GET_TESTBED_VERSION)
847 @Marshalling.args(int)
848 @Marshalling.retval()
def get_testbed_version(self, guid):
    """Return the result of the experiment controller's ``get_testbed_version(guid)``."""
    experiment = self._experiment
    return experiment.get_testbed_version(guid)
852 class BaseProxy(object):
854 _ServerClassModule = "nepi.util.proxy"
856 def __init__(self, ctor_args, root_dir,
858 communication = DC.ACCESS_LOCAL,
865 environment_setup = ""):
868 "from %(classmodule)s import %(classname)s;"
869 "s = %(classname)s%(ctor_args)r;"
872 classname = self._ServerClass.__name__,
873 classmodule = self._ServerClassModule,
874 ctor_args = ctor_args
876 proc = server.popen_python(python_code,
877 communication = communication,
882 ident_key = ident_key,
884 environment_setup = environment_setup)
885 # Wait for the server to be ready, otherwise nobody
886 # will be able to connect to it
887 helo = proc.stderr.readline()
888 if helo != 'SERVER_READY.\n':
889 raise AssertionError, "Expected 'SERVER_READY.', got %r: %s" % (helo,
890 helo + proc.stderr.read())
891 # connect client to server
892 self._client = server.Client(root_dir,
893 communication = communication,
899 environment_setup = environment_setup)
902 def _make_message(argtypes, argencoders, command, methname, classname, *args):
903 if len(argtypes) != len(argencoders):
904 raise ValueError, "Invalid arguments for _make_message: "\
905 "in stub method %s of class %s "\
906 "argtypes and argencoders must match in size" % (
907 methname, classname )
908 if len(argtypes) != len(args):
909 raise ValueError, "Invalid arguments for _make_message: "\
910 "in stub method %s of class %s "\
911 "expected %d arguments, got %d" % (
913 len(argtypes), len(args))
916 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
918 buf.append(fmt % encode(val))
921 raise TypeError, "Argument %d of stub method %s of class %s "\
922 "requires a value of type %s, but got %s - nested error: %s" % (
923 argnum, methname, classname,
924 getattr(typ, '__name__', typ), type(val),
925 traceback.format_exc()
928 return "%d|%s" % (command, '|'.join(buf))
931 def _parse_reply(rvtype, methname, classname, reply):
933 raise RuntimeError, "Invalid reply: %r "\
934 "for stub method %s of class %s" % (
940 result = reply.split("|")
941 code = int(result[0])
945 raise TypeError, "Return value of stub method %s of class %s "\
946 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
948 getattr(rvtype, '__name__', rvtype), reply,
949 traceback.format_exc()
952 text = base64.b64decode(text)
953 raise RuntimeError(text)
962 raise TypeError, "Return value of stub method %s of class %s "\
963 "cannot be parsed: must be of type %s - nested error: %s" % (
965 getattr(rvtype, '__name__', rvtype),
966 traceback.format_exc()
969 raise RuntimeError, "Invalid reply: %r "\
970 "for stub method %s of class %s - unknown code" % (
976 def _make_stubs(server_class, template_class):
978 Returns a dictionary method_name -> method
983 class SomeProxy(BaseProxy):
986 locals().update( BaseProxy._make_stubs(
991 ServerClass is the corresponding Server class, as
992 specified in the _ServerClass class method (_make_stubs
993 is static and can't access the method), and TemplateClass
994 is the ultimate implementation class behind the server,
995 from which argument names and defaults are taken, to
996 maintain meaningful interfaces.
1003 func_template_path = os.path.join(
1004 os.path.dirname(__file__),
1006 func_template_file = open(func_template_path, "r")
1007 func_template = func_template_file.read()
1008 func_template_file.close()
1010 for methname in vars(template_class).copy():
1011 if methname.endswith('_deferred'):
1012 # cannot wrap deferreds...
1014 dmethname = methname+'_deferred'
1015 if hasattr(server_class, methname) and not methname.startswith('_'):
1016 template_meth = getattr(template_class, methname)
1017 server_meth = getattr(server_class, methname)
1019 command = getattr(server_meth, '_handles_command', None)
1020 argtypes = getattr(server_meth, '_argtypes', None)
1021 argencoders = getattr(server_meth, '_argencoders', None)
1022 rvtype = getattr(server_meth, '_retval', None)
1025 if hasattr(template_meth, 'fget'):
1027 template_meth = template_meth.fget
1030 if command is not None and argtypes is not None and argencoders is not None:
1031 # We have an interface method...
1032 code = template_meth.func_code
1033 argnames = code.co_varnames[:code.co_argcount]
1034 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1035 + (template_meth.func_defaults or ()) )
1037 func_globals = dict(
1038 BaseProxy = BaseProxy,
1039 argtypes = argtypes,
1040 argencoders = argencoders,
1042 functools = functools,
1046 func_text = func_template % dict(
1048 args = '%s' % (','.join(argnames[1:])),
1049 argdefs = ','.join([
1050 argname if argdef is NONE
1051 else "%s=%r" % (argname, argdef)
1052 for argname, argdef in zip(argnames[1:], argdefaults[1:])
1055 methname = methname,
1056 classname = server_class.__name__
1059 func_text = compile(
1064 exec func_text in func_globals, context
1067 rv[methname] = property(context[methname])
1068 rv[dmethname] = property(context[dmethname])
1070 rv[methname] = context[methname]
1071 rv[dmethname] = context[dmethname]
1073 # inject _deferred into core classes
1074 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1075 def freezename(methname, dmethname):
1076 def dmeth(self, *p, **kw):
1077 return getattr(self, methname)(*p, **kw)
1078 dmeth.__name__ = dmethname
1080 dmeth = freezename(methname, dmethname)
1081 setattr(template_class, dmethname, dmeth)
1085 class TestbedControllerProxy(BaseProxy):
1087 _ServerClass = TestbedControllerServer
1089 def __init__(self, root_dir, log_level,
1091 testbed_version = None,
1093 communication = DC.ACCESS_LOCAL,
1100 environment_setup = ""):
1101 if launch and (testbed_id == None or testbed_version == None):
1102 raise RuntimeError("To launch a TesbedControllerServer a "
1103 "testbed_id and testbed_version are required")
1104 super(TestbedControllerProxy,self).__init__(
1105 ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
1106 root_dir = root_dir,
1108 communication = communication,
1112 ident_key = ident_key,
1115 environment_setup = environment_setup)
1117 locals().update( BaseProxy._make_stubs(
1118 server_class = TestbedControllerServer,
1119 template_class = nepi.core.execute.TestbedController,
1122 # Shutdown stops the serverside...
1123 def shutdown(self, _stub = shutdown):
1125 self._client.send_stop()
1126 self._client.read_reply() # wait for it
1130 class ExperimentControllerProxy(BaseProxy):
1131 _ServerClass = ExperimentControllerServer
1133 def __init__(self, root_dir, log_level,
1134 experiment_xml = None,
1136 communication = DC.ACCESS_LOCAL,
1143 environment_setup = ""):
1144 super(ExperimentControllerProxy,self).__init__(
1145 ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
1146 root_dir = root_dir,
1148 communication = communication,
1152 ident_key = ident_key,
1155 environment_setup = environment_setup)
1157 locals().update( BaseProxy._make_stubs(
1158 server_class = ExperimentControllerServer,
1159 template_class = nepi.core.execute.ExperimentController,
1163 # Shutdown stops the serverside...
1164 def shutdown(self, _stub = shutdown):
1166 self._client.send_stop()
1167 self._client.read_reply() # wait for it