2 # -*- coding: utf-8 -*-
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
23 # PROTOCOL INSTRUCTION MESSAGES
43 DO_CROSS_CONNECT_INIT = 22
53 GET_ATTRIBUTE_LIST = 32
55 DO_CROSS_CONNECT_COMPL = 34
61 GET_TESTBED_VERSION = 40
68 instruction_text = dict({
79 CONFIGURE: "CONFIGURE",
81 CREATE_SET: "CREATE_SET",
82 FACTORY_SET: "FACTORY_SET",
84 CROSS_CONNECT: "CROSS_CONNECT",
85 ADD_TRACE: "ADD_TRACE",
86 ADD_ADDRESS: "ADD_ADDRESS",
87 ADD_ROUTE: "ADD_ROUTE",
89 DO_CREATE: "DO_CREATE",
90 DO_CONNECT_INIT: "DO_CONNECT_INIT",
91 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
92 DO_CONFIGURE: "DO_CONFIGURE",
93 DO_PRECONFIGURE: "DO_PRECONFIGURE",
94 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
95 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
98 GET_ROUTE: "GET_ROUTE",
99 GET_ADDRESS: "GET_ADDRESS",
100 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
101 GET_FACTORY_ID: "GET_FACTORY_ID",
102 GET_TESTBED_ID: "GET_TESTBED_ID",
103 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
107 TESTBED_ID: "TESTBED_ID",
108 TESTBED_VERSION: "TESTBED_VERSION",
109 TRACES_INFO: "TRACES_INFO",
110 STARTED_TIME: "STARTED_TIME",
111 STOPPED_TIME: "STOPPED_TIME",
115 def log_msg(server, params):
117 instr = int(params[0])
118 instr_txt = instruction_text[instr]
119 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
120 instr_txt, ", ".join(map(str, params[1:]))))
122 # don't die for logging
125 def log_reply(server, reply):
127 res = reply.split("|")
129 code_txt = instruction_text[code]
131 txt = base64.b64decode(res[1])
134 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
137 # don't die for logging
138 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
142 def to_server_log_level(log_level):
145 if log_level == DC.DEBUG_LEVEL
149 def get_access_config_params(access_config):
150 mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
151 launch = not access_config.get_attribute_value(DC.RECOVER)
152 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
153 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
154 log_level = to_server_log_level(log_level)
155 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
156 environment_setup = (
157 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
158 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
161 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
162 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
163 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
164 agent = access_config.get_attribute_value(DC.USE_AGENT)
165 sudo = access_config.get_attribute_value(DC.USE_SUDO)
166 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
167 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
168 clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
169 return (mode, launch, root_dir, log_level, communication, user, host, port,
170 key, agent, sudo, environment_setup, clean_root)
172 class AccessConfiguration(AttributesMap):
173 def __init__(self, params = None):
174 super(AccessConfiguration, self).__init__()
176 from nepi.core.metadata import Metadata
178 for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
179 self.add_attribute(**attr_info)
182 for attr_name, attr_value in params.iteritems():
183 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
184 attr_value = parser(attr_value)
185 self.set_attribute_value(attr_name, attr_value)
187 class TempDir(object):
189 self.path = tempfile.mkdtemp()
192 shutil.rmtree(self.path)
194 class PermDir(object):
195 def __init__(self, path):
198 def create_experiment_controller(xml, access_config = None):
201 log_level = DC.ERROR_LEVEL
203 (mode, launch, root_dir, log_level, communication, user, host, port,
204 key, agent, sudo, environment_setup, clean_root) \
205 = get_access_config_params(access_config)
207 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
209 if not mode or mode == DC.MODE_SINGLE_PROCESS:
210 from nepi.core.execute import ExperimentController
212 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
215 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
216 controller = ExperimentController(xml, root_dir.path)
218 # inject reference to temporary dir, so that it gets cleaned
219 # up at destruction time.
220 controller._tempdir = root_dir
227 elif mode == DC.MODE_DAEMON:
229 return ExperimentControllerProxy(root_dir, log_level,
230 experiment_xml = xml,
231 communication = communication,
239 environment_setup = environment_setup,
240 clean_root = clean_root)
243 # Maybe controller died, recover from persisted testbed information if possible
244 controller = ExperimentControllerProxy(root_dir, log_level,
245 experiment_xml = xml,
246 communication = communication,
254 environment_setup = environment_setup,
255 clean_root = clean_root)
260 raise RuntimeError("Unsupported access configuration '%s'" % mode)
262 def create_testbed_controller(testbed_id, testbed_version, access_config):
265 log_level = DC.ERROR_LEVEL
267 (mode, launch, root_dir, log_level, communication, user, host, port,
268 key, agent, sudo, environment_setup, clean_root) \
269 = get_access_config_params(access_config)
271 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
273 if not mode or mode == DC.MODE_SINGLE_PROCESS:
275 raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
276 return _build_testbed_controller(testbed_id, testbed_version)
277 elif mode == DC.MODE_DAEMON:
278 return TestbedControllerProxy(root_dir, log_level,
279 testbed_id = testbed_id,
280 testbed_version = testbed_version,
281 communication = communication,
289 environment_setup = environment_setup,
290 clean_root = clean_root)
291 raise RuntimeError("Unsupported access configuration '%s'" % mode)
293 def _build_testbed_controller(testbed_id, testbed_version):
294 mod_name = nepi.util.environ.find_testbed(testbed_id)
296 if not mod_name in sys.modules:
300 raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
302 module = sys.modules[mod_name]
303 tc = module.TestbedController()
304 if tc.testbed_version != testbed_version:
305 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
306 (testbed_id, testbed_version, tc.testbed_version))
309 # Just a namespace class
313 def pickled_data(sdata):
314 return cPickle.loads(base64.b64decode(sdata))
317 def base64_data(sdata):
318 return base64.b64decode(sdata)
322 return None if sdata == "None" else int(sdata)
326 return sdata == 'True'
330 def pickled_data(data):
331 return base64.b64encode(cPickle.dumps(data))
334 def base64_data(data):
335 return base64.b64encode(data)
339 return "None" if data is None else int(data)
343 return str(bool(data))
345 # import into Marshalling all the decoders
349 for typname, typ in vars(Decoders).iteritems()
350 if not typname.startswith('_')
353 _TYPE_ENCODERS = dict([
354 # id(type) -> (<encoding_function>, <formatting_string>)
355 (typname, (getattr(Encoders,typname),"%s"))
356 for typname in vars(Decoders)
357 if not typname.startswith('_')
358 and hasattr(Encoders,typname)
362 _TYPE_ENCODERS["float"] = (float, "%r")
363 _TYPE_ENCODERS["int"] = (int, "%d")
364 _TYPE_ENCODERS["long"] = (int, "%d")
365 _TYPE_ENCODERS["str"] = (str, "%s")
366 _TYPE_ENCODERS["unicode"] = (str, "%s")
369 _TYPE_ENCODERS[None] = (str, "%s")
374 Decorator that converts the given function into one that takes
375 a single "params" list, with each parameter marshalled according
376 to the given factory callable (type constructors are accepted).
378 The first argument (self) is left untouched.
382 @Marshalling.args(int,int,str,base64_data)
383 def somefunc(self, someint, otherint, somestr, someb64):
388 def rv(self, params):
389 return f(self, *[ ctor(val)
390 for ctor,val in zip(types, params[1:]) ])
394 # Derive type encoders by looking up types in _TYPE_ENCODERS
395 # make_proxy will use it to encode arguments in command strings
397 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
399 if typ.__name__ in TYPE_ENCODERS:
400 argencoders.append(TYPE_ENCODERS[typ.__name__])
403 argencoders.append(TYPE_ENCODERS[None])
405 rv._argencoders = tuple(argencoders)
407 rv._retval = getattr(f, '_retval', None)
412 def retval(typ=Decoders.base64_data):
414 Decorator that converts the given function into one that
415 returns a properly encoded return string, given that the undecorated
416 function returns suitable input for the encoding function.
418 The optional typ argument specifies a type.
419 For the default of base64_data, return values should be strings.
420 The return value of the encoding method should be a string always.
424 @Marshalling.args(int,int,str,base64_data)
425 @Marshalling.retval(str)
426 def somefunc(self, someint, otherint, somestr, someb64):
429 encode, fmt = Marshalling._TYPE_ENCODERS.get(
431 Marshalling._TYPE_ENCODERS[None])
436 def rv(self, *p, **kw):
437 data = f(self, *p, **kw)
443 rv._argtypes = getattr(f, '_argtypes', None)
444 rv._argencoders = getattr(f, '_argencoders', None)
451 Decorator that converts the given function into one that
452 always return an encoded empty string.
454 Useful for null-returning functions.
459 def rv(self, *p, **kw):
464 rv._argtypes = getattr(f, '_argtypes', None)
465 rv._argencoders = getattr(f, '_argencoders', None)
469 def handles(whichcommand):
471 Associates the method with a given command code for servers.
472 It should always be the topmost decorator.
475 f._handles_command = whichcommand
479 class BaseServer(server.Server):
480 def reply_action(self, msg):
482 result = base64.b64encode("Invalid command line")
483 reply = "%d|%s" % (ERROR, result)
485 params = msg.split("|")
486 instruction = int(params[0])
487 log_msg(self, params)
489 for mname,meth in vars(self.__class__).iteritems():
490 if not mname.startswith('_'):
491 cmd = getattr(meth, '_handles_command', None)
492 if cmd == instruction:
493 meth = getattr(self, mname)
497 error = "Invalid instruction %s" % instruction
498 self.log_error(error)
499 result = base64.b64encode(error)
500 reply = "%d|%s" % (ERROR, result)
502 error = self.log_error()
503 result = base64.b64encode(error)
504 reply = "%d|%s" % (ERROR, result)
505 log_reply(self, reply)
508 class TestbedControllerServer(BaseServer):
509 def __init__(self, root_dir, log_level, testbed_id, testbed_version,
510 environment_setup, clean_root):
511 super(TestbedControllerServer, self).__init__(root_dir, log_level,
512 environment_setup = environment_setup, clean_root = clean_root)
513 self._testbed_id = testbed_id
514 self._testbed_version = testbed_version
517 def post_daemonize(self):
518 self._testbed = _build_testbed_controller(self._testbed_id,
519 self._testbed_version)
521 @Marshalling.handles(GUIDS)
523 @Marshalling.retval( Marshalling.pickled_data )
525 return self._testbed.guids
527 @Marshalling.handles(TESTBED_ID)
529 @Marshalling.retval()
530 def testbed_id(self):
531 return str(self._testbed.testbed_id)
533 @Marshalling.handles(TESTBED_VERSION)
535 @Marshalling.retval()
536 def testbed_version(self):
537 return str(self._testbed.testbed_version)
539 @Marshalling.handles(CREATE)
540 @Marshalling.args(int, str)
542 def defer_create(self, guid, factory_id):
543 self._testbed.defer_create(guid, factory_id)
545 @Marshalling.handles(TRACE)
546 @Marshalling.args(int, str, Marshalling.base64_data)
547 @Marshalling.retval()
548 def trace(self, guid, trace_id, attribute):
549 return self._testbed.trace(guid, trace_id, attribute)
551 @Marshalling.handles(TRACES_INFO)
553 @Marshalling.retval( Marshalling.pickled_data )
554 def traces_info(self):
555 return self._testbed.traces_info()
557 @Marshalling.handles(START)
561 self._testbed.start()
563 @Marshalling.handles(STOP)
569 @Marshalling.handles(SHUTDOWN)
573 self._testbed.shutdown()
575 @Marshalling.handles(CONFIGURE)
576 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
578 def defer_configure(self, name, value):
579 self._testbed.defer_configure(name, value)
581 @Marshalling.handles(CREATE_SET)
582 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
584 def defer_create_set(self, guid, name, value):
585 self._testbed.defer_create_set(guid, name, value)
587 @Marshalling.handles(FACTORY_SET)
588 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
590 def defer_factory_set(self, name, value):
591 self._testbed.defer_factory_set(name, value)
593 @Marshalling.handles(CONNECT)
594 @Marshalling.args(int, str, int, str)
596 def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
597 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
598 connector_type_name2)
600 @Marshalling.handles(CROSS_CONNECT)
601 @Marshalling.args(int, str, int, int, str, str, str)
603 def defer_cross_connect(self,
604 guid, connector_type_name,
605 cross_guid, cross_testbed_guid,
606 cross_testbed_id, cross_factory_id,
607 cross_connector_type_name):
608 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
609 cross_testbed_guid, cross_testbed_id, cross_factory_id,
610 cross_connector_type_name)
612 @Marshalling.handles(ADD_TRACE)
613 @Marshalling.args(int, str)
615 def defer_add_trace(self, guid, trace_id):
616 self._testbed.defer_add_trace(guid, trace_id)
618 @Marshalling.handles(ADD_ADDRESS)
619 @Marshalling.args(int, str, int, Marshalling.pickled_data)
621 def defer_add_address(self, guid, address, netprefix, broadcast):
622 self._testbed.defer_add_address(guid, address, netprefix,
625 @Marshalling.handles(ADD_ROUTE)
626 @Marshalling.args(int, str, int, str, int)
628 def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
629 self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
631 @Marshalling.handles(DO_SETUP)
635 self._testbed.do_setup()
637 @Marshalling.handles(DO_CREATE)
641 self._testbed.do_create()
643 @Marshalling.handles(DO_CONNECT_INIT)
646 def do_connect_init(self):
647 self._testbed.do_connect_init()
649 @Marshalling.handles(DO_CONNECT_COMPL)
652 def do_connect_compl(self):
653 self._testbed.do_connect_compl()
655 @Marshalling.handles(DO_CONFIGURE)
658 def do_configure(self):
659 self._testbed.do_configure()
661 @Marshalling.handles(DO_PRECONFIGURE)
664 def do_preconfigure(self):
665 self._testbed.do_preconfigure()
667 @Marshalling.handles(DO_PRESTART)
670 def do_prestart(self):
671 self._testbed.do_prestart()
673 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
674 @Marshalling.args( Marshalling.Decoders.pickled_data )
676 def do_cross_connect_init(self, cross_data):
677 self._testbed.do_cross_connect_init(cross_data)
679 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
680 @Marshalling.args( Marshalling.Decoders.pickled_data )
682 def do_cross_connect_compl(self, cross_data):
683 self._testbed.do_cross_connect_compl(cross_data)
685 @Marshalling.handles(GET)
686 @Marshalling.args(int, Marshalling.base64_data, str)
687 @Marshalling.retval( Marshalling.pickled_data )
688 def get(self, guid, name, time):
689 return self._testbed.get(guid, name, time)
691 @Marshalling.handles(SET)
692 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
694 def set(self, guid, name, value, time):
695 self._testbed.set(guid, name, value, time)
697 @Marshalling.handles(GET_ADDRESS)
698 @Marshalling.args(int, int, Marshalling.base64_data)
699 @Marshalling.retval()
700 def get_address(self, guid, index, attribute):
701 return str(self._testbed.get_address(guid, index, attribute))
703 @Marshalling.handles(GET_ROUTE)
704 @Marshalling.args(int, int, Marshalling.base64_data)
705 @Marshalling.retval()
706 def get_route(self, guid, index, attribute):
707 return str(self._testbed.get_route(guid, index, attribute))
709 @Marshalling.handles(ACTION)
710 @Marshalling.args(str, int, Marshalling.base64_data)
712 def action(self, time, guid, command):
713 self._testbed.action(time, guid, command)
715 @Marshalling.handles(STATUS)
716 @Marshalling.args(Marshalling.nullint)
717 @Marshalling.retval(int)
718 def status(self, guid):
719 return self._testbed.status(guid)
721 @Marshalling.handles(TESTBED_STATUS)
723 @Marshalling.retval(int)
724 def testbed_status(self):
725 return self._testbed.testbed_status()
727 @Marshalling.handles(GET_ATTRIBUTE_LIST)
728 @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
729 @Marshalling.retval( Marshalling.pickled_data )
730 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
731 return self._testbed.get_attribute_list(guid, filter_flags, exclude)
733 @Marshalling.handles(GET_FACTORY_ID)
734 @Marshalling.args(int)
735 @Marshalling.retval()
736 def get_factory_id(self, guid):
737 return self._testbed.get_factory_id(guid)
739 @Marshalling.handles(RECOVER)
743 self._testbed.recover()
746 class ExperimentControllerServer(BaseServer):
747 def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
749 super(ExperimentControllerServer, self).__init__(root_dir, log_level,
750 environment_setup = environment_setup, clean_root = clean_root)
751 self._experiment_xml = experiment_xml
752 self._experiment = None
754 def post_daemonize(self):
755 from nepi.core.execute import ExperimentController
756 self._experiment = ExperimentController(self._experiment_xml,
757 root_dir = self._root_dir)
759 @Marshalling.handles(GUIDS)
761 @Marshalling.retval( Marshalling.pickled_data )
763 return self._experiment.guids
765 @Marshalling.handles(STARTED_TIME)
767 @Marshalling.retval( Marshalling.pickled_data )
768 def started_time(self):
769 return self._experiment.started_time
771 @Marshalling.handles(STOPPED_TIME)
773 @Marshalling.retval( Marshalling.pickled_data )
774 def stopped_time(self):
775 return self._experiment.stopped_time
777 @Marshalling.handles(XML)
779 @Marshalling.retval()
780 def experiment_design_xml(self):
781 return self._experiment.experiment_design_xml
783 @Marshalling.handles(EXEC_XML)
785 @Marshalling.retval()
786 def experiment_execute_xml(self):
787 return self._experiment.experiment_execute_xml
789 @Marshalling.handles(TRACE)
790 @Marshalling.args(int, str, Marshalling.base64_data)
791 @Marshalling.retval()
792 def trace(self, guid, trace_id, attribute):
793 return str(self._experiment.trace(guid, trace_id, attribute))
795 @Marshalling.handles(TRACES_INFO)
797 @Marshalling.retval( Marshalling.pickled_data )
798 def traces_info(self):
799 return self._experiment.traces_info()
801 @Marshalling.handles(FINISHED)
802 @Marshalling.args(int)
803 @Marshalling.retval(Marshalling.bool)
804 def is_finished(self, guid):
805 return self._experiment.is_finished(guid)
807 @Marshalling.handles(STATUS)
808 @Marshalling.args(int)
809 @Marshalling.retval(int)
810 def status(self, guid):
811 return self._experiment.is_finished(guid)
813 @Marshalling.handles(GET)
814 @Marshalling.args(int, Marshalling.base64_data, str)
815 @Marshalling.retval( Marshalling.pickled_data )
816 def get(self, guid, name, time):
817 return self._experiment.get(guid, name, time)
819 @Marshalling.handles(SET)
820 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
822 def set(self, guid, name, value, time):
823 self._experiment.set(guid, name, value, time)
825 @Marshalling.handles(START)
829 self._experiment.start()
831 @Marshalling.handles(STOP)
835 self._experiment.stop()
837 @Marshalling.handles(RECOVER)
841 self._experiment.recover()
843 @Marshalling.handles(SHUTDOWN)
847 self._experiment.shutdown()
849 @Marshalling.handles(GET_TESTBED_ID)
850 @Marshalling.args(int)
851 @Marshalling.retval()
852 def get_testbed_id(self, guid):
853 return self._experiment.get_testbed_id(guid)
855 @Marshalling.handles(GET_FACTORY_ID)
856 @Marshalling.args(int)
857 @Marshalling.retval()
858 def get_factory_id(self, guid):
859 return self._experiment.get_factory_id(guid)
861 @Marshalling.handles(GET_TESTBED_VERSION)
862 @Marshalling.args(int)
863 @Marshalling.retval()
864 def get_testbed_version(self, guid):
865 return self._experiment.get_testbed_version(guid)
867 class BaseProxy(object):
869 _ServerClassModule = "nepi.util.proxy"
871 def __init__(self, ctor_args, root_dir,
873 communication = DC.ACCESS_LOCAL,
880 environment_setup = "",
884 "from %(classmodule)s import %(classname)s;"
885 "s = %(classname)s%(ctor_args)r;"
888 classname = self._ServerClass.__name__,
889 classmodule = self._ServerClassModule,
890 ctor_args = ctor_args
892 proc = server.popen_python(python_code,
893 communication = communication,
898 ident_key = ident_key,
900 environment_setup = environment_setup)
901 # Wait for the server to be ready, otherwise nobody
902 # will be able to connect to it
906 helo = proc.stderr.readline()
907 if helo == 'SERVER_READY.\n':
911 raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
912 # connect client to server
913 self._client = server.Client(root_dir,
914 communication = communication,
920 environment_setup = environment_setup)
923 def _make_message(argtypes, argencoders, command, methname, classname, *args):
924 if len(argtypes) != len(argencoders):
925 raise ValueError, "Invalid arguments for _make_message: "\
926 "in stub method %s of class %s "\
927 "argtypes and argencoders must match in size" % (
928 methname, classname )
929 if len(argtypes) != len(args):
930 raise ValueError, "Invalid arguments for _make_message: "\
931 "in stub method %s of class %s "\
932 "expected %d arguments, got %d" % (
934 len(argtypes), len(args))
937 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
939 buf.append(fmt % encode(val))
942 raise TypeError, "Argument %d of stub method %s of class %s "\
943 "requires a value of type %s, but got %s - nested error: %s" % (
944 argnum, methname, classname,
945 getattr(typ, '__name__', typ), type(val),
946 traceback.format_exc()
949 return "%d|%s" % (command, '|'.join(buf))
952 def _parse_reply(rvtype, methname, classname, reply):
954 raise RuntimeError, "Invalid reply: %r "\
955 "for stub method %s of class %s" % (
961 result = reply.split("|")
962 code = int(result[0])
966 raise TypeError, "Return value of stub method %s of class %s "\
967 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
969 getattr(rvtype, '__name__', rvtype), reply,
970 traceback.format_exc()
973 text = base64.b64decode(text)
974 raise RuntimeError(text)
983 raise TypeError, "Return value of stub method %s of class %s "\
984 "cannot be parsed: must be of type %s - nested error: %s" % (
986 getattr(rvtype, '__name__', rvtype),
987 traceback.format_exc()
990 raise RuntimeError, "Invalid reply: %r "\
991 "for stub method %s of class %s - unknown code" % (
997 def _make_stubs(server_class, template_class):
999 Returns a dictionary method_name -> method
1004 class SomeProxy(BaseProxy):
1007 locals().update( BaseProxy._make_stubs(
1012 ServerClass is the corresponding Server class, as
1013 specified in the _ServerClass class method (_make_stubs
1014 is static and can't access the method), and TemplateClass
1015 is the ultimate implementation class behind the server,
1016 from which argument names and defaults are taken, to
1017 maintain meaningful interfaces.
1024 func_template_path = os.path.join(
1025 os.path.dirname(__file__),
1027 func_template_file = open(func_template_path, "r")
1028 func_template = func_template_file.read()
1029 func_template_file.close()
1031 for methname in vars(template_class).copy():
1032 if methname.endswith('_deferred'):
1033 # cannot wrap deferreds...
1035 dmethname = methname+'_deferred'
1036 if hasattr(server_class, methname) and not methname.startswith('_'):
1037 template_meth = getattr(template_class, methname)
1038 server_meth = getattr(server_class, methname)
1040 command = getattr(server_meth, '_handles_command', None)
1041 argtypes = getattr(server_meth, '_argtypes', None)
1042 argencoders = getattr(server_meth, '_argencoders', None)
1043 rvtype = getattr(server_meth, '_retval', None)
1046 if hasattr(template_meth, 'fget'):
1048 template_meth = template_meth.fget
1051 if command is not None and argtypes is not None and argencoders is not None:
1052 # We have an interface method...
1053 code = template_meth.func_code
1054 argnames = code.co_varnames[:code.co_argcount]
1055 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1056 + (template_meth.func_defaults or ()) )
1058 func_globals = dict(
1059 BaseProxy = BaseProxy,
1060 argtypes = argtypes,
1061 argencoders = argencoders,
1063 functools = functools,
1067 func_text = func_template % dict(
1069 args = '%s' % (','.join(argnames[1:])),
1070 argdefs = ','.join([
1071 argname if argdef is NONE
1072 else "%s=%r" % (argname, argdef)
1073 for argname, argdef in zip(argnames[1:], argdefaults[1:])
1076 methname = methname,
1077 classname = server_class.__name__
1080 func_text = compile(
1085 exec func_text in func_globals, context
1088 rv[methname] = property(context[methname])
1089 rv[dmethname] = property(context[dmethname])
1091 rv[methname] = context[methname]
1092 rv[dmethname] = context[dmethname]
1094 # inject _deferred into core classes
1095 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1096 def freezename(methname, dmethname):
1097 def dmeth(self, *p, **kw):
1098 return getattr(self, methname)(*p, **kw)
1099 dmeth.__name__ = dmethname
1101 dmeth = freezename(methname, dmethname)
1102 setattr(template_class, dmethname, dmeth)
1106 class TestbedControllerProxy(BaseProxy):
1108 _ServerClass = TestbedControllerServer
1110 def __init__(self, root_dir, log_level,
1112 testbed_version = None,
1114 communication = DC.ACCESS_LOCAL,
1121 environment_setup = "",
1122 clean_root = False):
1123 if launch and (testbed_id == None or testbed_version == None):
1124 raise RuntimeError("To launch a TesbedControllerServer a "
1125 "testbed_id and testbed_version are required")
1126 super(TestbedControllerProxy,self).__init__(
1127 ctor_args = (root_dir, log_level, testbed_id, testbed_version,
1128 environment_setup, clean_root),
1129 root_dir = root_dir,
1131 communication = communication,
1135 ident_key = ident_key,
1138 environment_setup = environment_setup)
1140 locals().update( BaseProxy._make_stubs(
1141 server_class = TestbedControllerServer,
1142 template_class = nepi.core.execute.TestbedController,
1145 # Shutdown stops the serverside...
1146 def shutdown(self, _stub = shutdown):
1148 self._client.send_stop()
1149 self._client.read_reply() # wait for it
1153 class ExperimentControllerProxy(BaseProxy):
1154 _ServerClass = ExperimentControllerServer
1156 def __init__(self, root_dir, log_level,
1157 experiment_xml = None,
1159 communication = DC.ACCESS_LOCAL,
1166 environment_setup = "",
1167 clean_root = False):
1168 super(ExperimentControllerProxy,self).__init__(
1169 ctor_args = (root_dir, log_level, experiment_xml, environment_setup,
1171 root_dir = root_dir,
1173 communication = communication,
1177 ident_key = ident_key,
1180 environment_setup = environment_setup,
1181 clean_root = clean_root)
1183 locals().update( BaseProxy._make_stubs(
1184 server_class = ExperimentControllerServer,
1185 template_class = nepi.core.execute.ExperimentController,
1189 # Shutdown stops the serverside...
1190 def shutdown(self, _stub = shutdown):
1192 self._client.send_stop()
1193 self._client.read_reply() # wait for it