2 # -*- coding: utf-8 -*-
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
23 # PROTOCOL INSTRUCTION MESSAGES
43 DO_CROSS_CONNECT_INIT = 22
53 GET_ATTRIBUTE_LIST = 32
55 DO_CROSS_CONNECT_COMPL = 34
61 GET_TESTBED_VERSION = 40
65 instruction_text = dict({
76 CONFIGURE: "CONFIGURE",
78 CREATE_SET: "CREATE_SET",
79 FACTORY_SET: "FACTORY_SET",
81 CROSS_CONNECT: "CROSS_CONNECT",
82 ADD_TRACE: "ADD_TRACE",
83 ADD_ADDRESS: "ADD_ADDRESS",
84 ADD_ROUTE: "ADD_ROUTE",
86 DO_CREATE: "DO_CREATE",
87 DO_CONNECT_INIT: "DO_CONNECT_INIT",
88 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
89 DO_CONFIGURE: "DO_CONFIGURE",
90 DO_PRECONFIGURE: "DO_PRECONFIGURE",
91 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
92 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
95 GET_ROUTE: "GET_ROUTE",
96 GET_ADDRESS: "GET_ADDRESS",
97 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
98 GET_FACTORY_ID: "GET_FACTORY_ID",
99 GET_TESTBED_ID: "GET_TESTBED_ID",
100 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
104 TESTBED_ID: "TESTBED_ID",
105 TESTBED_VERSION: "TESTBED_VERSION",
106 TRACES_INFO: "TRACES_INFO",
109 def log_msg(server, params):
111 instr = int(params[0])
112 instr_txt = instruction_text[instr]
113 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
114 instr_txt, ", ".join(map(str, params[1:]))))
116 # don't die for logging
119 def log_reply(server, reply):
121 res = reply.split("|")
123 code_txt = instruction_text[code]
125 txt = base64.b64decode(res[1])
128 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
131 # don't die for logging
132 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
136 def to_server_log_level(log_level):
139 if log_level == DC.DEBUG_LEVEL
140 else server.ERROR_LEVEL
# Extract the connection/deployment parameters from an access configuration.
#
# Returns the tuple
#   (root_dir, log_level, user, host, port, key, agent, environment_setup)
# Note that `key` precedes `agent` in the returned tuple; callers unpack
# positionally, so that order matters.
143 def get_access_config_params(access_config):
144 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
145 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
# Translate the configured log level into the server module's log level.
146 log_level = to_server_log_level(log_level)
# SSH-related values stay None unless the communication mode is SSH.
147 user = host = port = agent = key = None
148 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
# The environment setup attribute is only read when the configuration
# actually declares it.
149 environment_setup = (
150 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
151 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
# Only SSH access needs user/host/port/agent/key.
154 if communication == DC.ACCESS_SSH:
155 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
156 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
157 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
158 agent = access_config.get_attribute_value(DC.USE_AGENT)
159 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
160 return (root_dir, log_level, user, host, port, key, agent, environment_setup)
162 class AccessConfiguration(AttributesMap):
163 def __init__(self, params = None):
164 super(AccessConfiguration, self).__init__()
166 from nepi.core.metadata import Metadata
168 for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
169 self.add_attribute(**attr_info)
172 for attr_name, attr_value in params.iteritems():
173 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
174 attr_value = parser(attr_value)
175 self.set_attribute_value(attr_name, attr_value)
177 class TempDir(object):
179 self.path = tempfile.mkdtemp()
182 shutil.rmtree(self.path)
184 class PermDir(object):
185 def __init__(self, path):
188 def create_experiment_controller(xml, access_config = None):
189 mode = None if not access_config \
190 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
191 launch = True if not access_config \
192 else not access_config.get_attribute_value(DC.RECOVER)
193 if not mode or mode == DC.MODE_SINGLE_PROCESS:
194 from nepi.core.execute import ExperimentController
196 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
199 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
200 controller = ExperimentController(xml, root_dir.path)
202 # inject reference to temporary dir, so that it gets cleaned
203 # up at destruction time.
204 controller._tempdir = root_dir
211 elif mode == DC.MODE_DAEMON:
212 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
213 get_access_config_params(access_config)
215 return ExperimentControllerProxy(root_dir, log_level,
216 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
217 agent = agent, launch = launch,
218 environment_setup = environment_setup)
221 # Maybe controller died, recover from persisted testbed information if possible
222 controller = ExperimentControllerProxy(root_dir, log_level,
223 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
224 agent = agent, launch = True,
225 environment_setup = environment_setup)
230 raise RuntimeError("Unsupported access configuration '%s'" % mode)
# Create a testbed controller for the given testbed id/version.
#
# Depending on the deployment mode found in `access_config` this returns
# either an in-process controller (single-process mode, or no mode at all)
# or a TestbedControllerProxy (daemon mode). Any other mode raises
# RuntimeError.
232 def create_testbed_controller(testbed_id, testbed_version, access_config):
# Without an access configuration there is no mode, and launch is True.
233 mode = None if not access_config \
234 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
# launch is the negation of the RECOVER attribute.
235 launch = True if not access_config \
236 else not access_config.get_attribute_value(DC.RECOVER)
237 if not mode or mode == DC.MODE_SINGLE_PROCESS:
# NOTE(review): "lanch" in the message below is a typo for "launch".
239 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
# Single-process mode: build the controller directly in this process.
240 return _build_testbed_controller(testbed_id, testbed_version)
241 elif mode == DC.MODE_DAEMON:
# Daemon mode: talk to a controller server through a proxy.
242 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
243 get_access_config_params(access_config)
244 return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
245 testbed_version = testbed_version, host = host, port = port, ident_key = key,
246 user = user, agent = agent, launch = launch,
247 environment_setup = environment_setup)
# Neither single-process nor daemon mode.
248 raise RuntimeError("Unsupported access configuration '%s'" % mode)
# Instantiate the TestbedController class of the module that implements
# `testbed_id`, and check that it reports the requested `testbed_version`.
#
# Raises ImportError when the testbed module cannot be found, and
# RuntimeError when the controller's version differs from the requested one.
250 def _build_testbed_controller(testbed_id, testbed_version):
# Resolve the testbed id to the name of the module implementing it.
251 mod_name = nepi.util.environ.find_testbed(testbed_id)
253 if not mod_name in sys.modules:
257 raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
# The module object is taken from sys.modules rather than from an import
# statement's return value.
259 module = sys.modules[mod_name]
260 tc = module.TestbedController()
# Refuse to hand out a controller for a different testbed version.
261 if tc.testbed_version != testbed_version:
262 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
263 (testbed_id, testbed_version, tc.testbed_version))
266 # Just a namespace class
# Decoder: base64-decode `sdata` and unpickle the result.
# NOTE(review): unpickling can execute arbitrary code when the input is
# crafted, so this is only safe when the sender of `sdata` is trusted.
270 def pickled_data(sdata):
271 return cPickle.loads(base64.b64decode(sdata))
274 def base64_data(sdata):
275 return base64.b64decode(sdata)
279 return None if sdata == "None" else int(sdata)
283 return sdata == 'True'
287 def pickled_data(data):
288 return base64.b64encode(cPickle.dumps(data))
291 def base64_data(data):
292 return base64.b64encode(data)
296 return "None" if data is None else int(data)
300 return str(bool(data))
302 # import into Marshalling all the decoders
306 for typname, typ in vars(Decoders).iteritems()
307 if not typname.startswith('_')
310 _TYPE_ENCODERS = dict([
311 # id(type) -> (<encoding_function>, <formatting_string>)
312 (typname, (getattr(Encoders,typname),"%s"))
313 for typname in vars(Decoders)
314 if not typname.startswith('_')
315 and hasattr(Encoders,typname)
319 _TYPE_ENCODERS["float"] = (float, "%r")
320 _TYPE_ENCODERS["int"] = (int, "%d")
321 _TYPE_ENCODERS["long"] = (int, "%d")
322 _TYPE_ENCODERS["str"] = (str, "%s")
323 _TYPE_ENCODERS["unicode"] = (str, "%s")
326 _TYPE_ENCODERS[None] = (str, "%s")
331 Decorator that converts the given function into one that takes
332 a single "params" list, with each parameter marshalled according
333 to the given factory callable (type constructors are accepted).
335 The first argument (self) is left untouched.
339 @Marshalling.args(int,int,str,base64_data)
340 def somefunc(self, someint, otherint, somestr, someb64):
345 def rv(self, params):
346 return f(self, *[ ctor(val)
347 for ctor,val in zip(types, params[1:]) ])
351 # Derive type encoders by looking up types in _TYPE_ENCODERS
352 # make_proxy will use it to encode arguments in command strings
354 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
356 if typ.__name__ in TYPE_ENCODERS:
357 argencoders.append(TYPE_ENCODERS[typ.__name__])
360 argencoders.append(TYPE_ENCODERS[None])
362 rv._argencoders = tuple(argencoders)
364 rv._retval = getattr(f, '_retval', None)
369 def retval(typ=Decoders.base64_data):
371 Decorator that converts the given function into one that
372 returns a properly encoded return string, given that the undecorated
373 function returns suitable input for the encoding function.
375 The optional typ argument specifies a type.
376 For the default of base64_data, return values should be strings.
377 The return value of the encoding method should be a string always.
381 @Marshalling.args(int,int,str,base64_data)
382 @Marshalling.retval(str)
383 def somefunc(self, someint, otherint, somestr, someb64):
386 encode, fmt = Marshalling._TYPE_ENCODERS.get(
388 Marshalling._TYPE_ENCODERS[None])
393 def rv(self, *p, **kw):
394 data = f(self, *p, **kw)
400 rv._argtypes = getattr(f, '_argtypes', None)
401 rv._argencoders = getattr(f, '_argencoders', None)
408 Decorator that converts the given function into one that
409 always return an encoded empty string.
411 Useful for null-returning functions.
416 def rv(self, *p, **kw):
421 rv._argtypes = getattr(f, '_argtypes', None)
422 rv._argencoders = getattr(f, '_argencoders', None)
426 def handles(whichcommand):
428 Associates the method with a given command code for servers.
429 It should always be the topmost decorator.
432 f._handles_command = whichcommand
436 class BaseServer(server.Server):
437 def reply_action(self, msg):
439 result = base64.b64encode("Invalid command line")
440 reply = "%d|%s" % (ERROR, result)
442 params = msg.split("|")
443 instruction = int(params[0])
444 log_msg(self, params)
446 for mname,meth in vars(self.__class__).iteritems():
447 if not mname.startswith('_'):
448 cmd = getattr(meth, '_handles_command', None)
449 if cmd == instruction:
450 meth = getattr(self, mname)
454 error = "Invalid instruction %s" % instruction
455 self.log_error(error)
456 result = base64.b64encode(error)
457 reply = "%d|%s" % (ERROR, result)
459 error = self.log_error()
460 result = base64.b64encode(error)
461 reply = "%d|%s" % (ERROR, result)
462 log_reply(self, reply)
465 class TestbedControllerServer(BaseServer):
466 def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
467 super(TestbedControllerServer, self).__init__(root_dir, log_level,
468 environment_setup = environment_setup )
469 self._testbed_id = testbed_id
470 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed controller and keep it in ``self._testbed``.

    The controller is created from the testbed id and version that were
    stored on this server by its constructor.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_controller(testbed_id, testbed_version)
477 @Marshalling.handles(GUIDS)
479 @Marshalling.retval( Marshalling.pickled_data )
481 return self._testbed.guids
483 @Marshalling.handles(TESTBED_ID)
485 @Marshalling.retval()
486 def testbed_id(self):
487 return str(self._testbed.testbed_id)
489 @Marshalling.handles(TESTBED_VERSION)
491 @Marshalling.retval()
492 def testbed_version(self):
493 return str(self._testbed.testbed_version)
495 @Marshalling.handles(CREATE)
496 @Marshalling.args(int, str)
498 def defer_create(self, guid, factory_id):
499 self._testbed.defer_create(guid, factory_id)
@Marshalling.handles(TRACE)
@Marshalling.args(int, str, Marshalling.base64_data)
@Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Serve the TRACE command.

    Forwards ``guid``, ``trace_id`` and the base64-decoded ``attribute``
    to the testbed controller's ``trace`` and returns whatever it
    produces; the ``retval`` decorator encodes that value for the reply.
    """
    trace_content = self._testbed.trace(guid, trace_id, attribute)
    return trace_content
507 @Marshalling.handles(TRACES_INFO)
509 @Marshalling.retval( Marshalling.pickled_data )
510 def traces_info(self):
511 return self._testbed.traces_info()
513 @Marshalling.handles(START)
517 self._testbed.start()
519 @Marshalling.handles(STOP)
525 @Marshalling.handles(SHUTDOWN)
529 self._testbed.shutdown()
531 @Marshalling.handles(CONFIGURE)
532 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
534 def defer_configure(self, name, value):
535 self._testbed.defer_configure(name, value)
537 @Marshalling.handles(CREATE_SET)
538 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
540 def defer_create_set(self, guid, name, value):
541 self._testbed.defer_create_set(guid, name, value)
543 @Marshalling.handles(FACTORY_SET)
544 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
546 def defer_factory_set(self, name, value):
547 self._testbed.defer_factory_set(name, value)
549 @Marshalling.handles(CONNECT)
550 @Marshalling.args(int, str, int, str)
552 def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
553 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
554 connector_type_name2)
556 @Marshalling.handles(CROSS_CONNECT)
557 @Marshalling.args(int, str, int, int, str, str, str)
559 def defer_cross_connect(self,
560 guid, connector_type_name,
561 cross_guid, cross_testbed_guid,
562 cross_testbed_id, cross_factory_id,
563 cross_connector_type_name):
564 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
565 cross_testbed_guid, cross_testbed_id, cross_factory_id,
566 cross_connector_type_name)
568 @Marshalling.handles(ADD_TRACE)
569 @Marshalling.args(int, str)
571 def defer_add_trace(self, guid, trace_id):
572 self._testbed.defer_add_trace(guid, trace_id)
574 @Marshalling.handles(ADD_ADDRESS)
575 @Marshalling.args(int, str, int, Marshalling.pickled_data)
577 def defer_add_address(self, guid, address, netprefix, broadcast):
578 self._testbed.defer_add_address(guid, address, netprefix,
581 @Marshalling.handles(ADD_ROUTE)
582 @Marshalling.args(int, str, int, str, int)
584 def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
585 self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
587 @Marshalling.handles(DO_SETUP)
591 self._testbed.do_setup()
593 @Marshalling.handles(DO_CREATE)
597 self._testbed.do_create()
599 @Marshalling.handles(DO_CONNECT_INIT)
602 def do_connect_init(self):
603 self._testbed.do_connect_init()
605 @Marshalling.handles(DO_CONNECT_COMPL)
608 def do_connect_compl(self):
609 self._testbed.do_connect_compl()
611 @Marshalling.handles(DO_CONFIGURE)
614 def do_configure(self):
615 self._testbed.do_configure()
617 @Marshalling.handles(DO_PRECONFIGURE)
620 def do_preconfigure(self):
621 self._testbed.do_preconfigure()
623 @Marshalling.handles(DO_PRESTART)
626 def do_prestart(self):
627 self._testbed.do_prestart()
629 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
630 @Marshalling.args( Marshalling.Decoders.pickled_data )
632 def do_cross_connect_init(self, cross_data):
633 self._testbed.do_cross_connect_init(cross_data)
635 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
636 @Marshalling.args( Marshalling.Decoders.pickled_data )
638 def do_cross_connect_compl(self, cross_data):
639 self._testbed.do_cross_connect_compl(cross_data)
@Marshalling.handles(GET)
@Marshalling.args(int, Marshalling.base64_data, str)
@Marshalling.retval( Marshalling.pickled_data )
def get(self, guid, name, time):
    """Serve the GET command.

    Forwards ``guid``, the base64-decoded ``name`` and ``time`` to the
    testbed controller's ``get``; the result goes back as pickled data.
    """
    value = self._testbed.get(guid, name, time)
    return value
647 @Marshalling.handles(SET)
648 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
650 def set(self, guid, name, value, time):
651 self._testbed.set(guid, name, value, time)
@Marshalling.handles(GET_ADDRESS)
@Marshalling.args(int, int, Marshalling.base64_data)
@Marshalling.retval()
def get_address(self, guid, index, attribute):
    """Serve the GET_ADDRESS command.

    Asks the testbed controller for ``attribute`` of the address at
    ``index`` on element ``guid`` and returns it converted with ``str``.
    """
    address_value = self._testbed.get_address(guid, index, attribute)
    return str(address_value)
@Marshalling.handles(GET_ROUTE)
@Marshalling.args(int, int, Marshalling.base64_data)
@Marshalling.retval()
def get_route(self, guid, index, attribute):
    """Serve the GET_ROUTE command.

    Asks the testbed controller for ``attribute`` of the route at
    ``index`` on element ``guid`` and returns it converted with ``str``.
    """
    route_value = self._testbed.get_route(guid, index, attribute)
    return str(route_value)
665 @Marshalling.handles(ACTION)
666 @Marshalling.args(str, int, Marshalling.base64_data)
668 def action(self, time, guid, command):
669 self._testbed.action(time, guid, command)
@Marshalling.handles(STATUS)
@Marshalling.args(Marshalling.nullint)
@Marshalling.retval(int)
def status(self, guid):
    """Serve the STATUS command.

    ``guid`` is decoded with ``nullint``, so it is either an int or
    ``None``. The testbed controller's ``status`` result is returned
    and encoded as an int by the ``retval`` decorator.
    """
    current_status = self._testbed.status(guid)
    return current_status
@Marshalling.handles(GET_ATTRIBUTE_LIST)
@Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
@Marshalling.retval( Marshalling.pickled_data )
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """Serve the GET_ATTRIBUTE_LIST command.

    Forwards ``guid``, ``filter_flags`` (an int or ``None``) and
    ``exclude`` to the testbed controller's ``get_attribute_list``; the
    result goes back as pickled data.
    """
    attribute_list = self._testbed.get_attribute_list(
        guid, filter_flags, exclude)
    return attribute_list
@Marshalling.handles(GET_FACTORY_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_factory_id(self, guid):
    """Serve the GET_FACTORY_ID command.

    Returns the testbed controller's ``get_factory_id`` result for
    ``guid`` unchanged; the ``retval`` decorator encodes it.
    """
    factory_id = self._testbed.get_factory_id(guid)
    return factory_id
689 class ExperimentControllerServer(BaseServer):
def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
    """Set up the server and remember the experiment description.

    ``root_dir``, ``log_level`` and ``environment_setup`` are handed to
    the base server constructor. ``experiment_xml`` is stored on the
    instance and the experiment controller slot starts out as ``None``.
    """
    base = super(ExperimentControllerServer, self)
    base.__init__(root_dir, log_level,
                  environment_setup=environment_setup)
    self._experiment = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Create the experiment controller and keep it in ``self._experiment``.

    The controller is built from the stored experiment XML and this
    server's root directory.
    """
    # Imported at call time, exactly as the original code does.
    from nepi.core.execute import ExperimentController

    experiment_xml = self._experiment_xml
    controller = ExperimentController(experiment_xml,
                                      root_dir=self._root_dir)
    self._experiment = controller
701 @Marshalling.handles(GUIDS)
703 @Marshalling.retval( Marshalling.pickled_data )
705 return self._experiment.guids
707 @Marshalling.handles(XML)
709 @Marshalling.retval()
710 def experiment_design_xml(self):
711 return self._experiment.experiment_design_xml
713 @Marshalling.handles(EXEC_XML)
715 @Marshalling.retval()
716 def experiment_execute_xml(self):
717 return self._experiment.experiment_execute_xml
@Marshalling.handles(TRACE)
@Marshalling.args(int, str, Marshalling.base64_data)
@Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Serve the TRACE command.

    Forwards the arguments to the experiment controller's ``trace``
    and returns the result converted with ``str``.
    """
    trace_content = self._experiment.trace(guid, trace_id, attribute)
    return str(trace_content)
725 @Marshalling.handles(TRACES_INFO)
727 @Marshalling.retval( Marshalling.pickled_data )
728 def traces_info(self):
729 return self._experiment.traces_info()
@Marshalling.handles(FINISHED)
@Marshalling.args(int)
@Marshalling.retval(Marshalling.bool)
def is_finished(self, guid):
    """Serve the FINISHED command.

    Returns the experiment controller's ``is_finished`` result for
    ``guid``; the ``retval`` decorator encodes it as a boolean.
    """
    finished = self._experiment.is_finished(guid)
    return finished
@Marshalling.handles(GET)
@Marshalling.args(int, Marshalling.base64_data, str)
@Marshalling.retval( Marshalling.pickled_data )
def get(self, guid, name, time):
    """Serve the GET command.

    Forwards ``guid``, the base64-decoded ``name`` and ``time`` to the
    experiment controller's ``get``; the result goes back as pickled
    data.
    """
    value = self._experiment.get(guid, name, time)
    return value
743 @Marshalling.handles(SET)
744 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
746 def set(self, guid, name, value, time):
747 self._experiment.set(guid, name, value, time)
749 @Marshalling.handles(START)
753 self._experiment.start()
755 @Marshalling.handles(STOP)
759 self._experiment.stop()
761 @Marshalling.handles(RECOVER)
765 self._experiment.recover()
767 @Marshalling.handles(SHUTDOWN)
771 self._experiment.shutdown()
@Marshalling.handles(GET_TESTBED_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_testbed_id(self, guid):
    """Serve the GET_TESTBED_ID command.

    Returns the experiment controller's ``get_testbed_id`` result for
    ``guid`` unchanged; the ``retval`` decorator encodes it.
    """
    testbed_id = self._experiment.get_testbed_id(guid)
    return testbed_id
@Marshalling.handles(GET_FACTORY_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_factory_id(self, guid):
    """Serve the GET_FACTORY_ID command.

    Returns the experiment controller's ``get_factory_id`` result for
    ``guid`` unchanged; the ``retval`` decorator encodes it.
    """
    factory_id = self._experiment.get_factory_id(guid)
    return factory_id
@Marshalling.handles(GET_TESTBED_VERSION)
@Marshalling.args(int)
@Marshalling.retval()
def get_testbed_version(self, guid):
    """Serve the GET_TESTBED_VERSION command.

    Returns the experiment controller's ``get_testbed_version`` result
    for ``guid`` unchanged; the ``retval`` decorator encodes it.
    """
    testbed_version = self._experiment.get_testbed_version(guid)
    return testbed_version
791 class BaseProxy(object):
793 _ServerClassModule = "nepi.util.proxy"
797 launch = True, host = None,
798 port = None, user = None, ident_key = None, agent = None,
799 environment_setup = ""):
804 "from %(classmodule)s import %(classname)s;"
805 "s = %(classname)s%(ctor_args)r;"
808 classname = self._ServerClass.__name__,
809 classmodule = self._ServerClassModule,
810 ctor_args = ctor_args
812 proc = server.popen_ssh_subprocess(python_code, host = host,
813 port = port, user = user, agent = agent,
814 ident_key = ident_key,
815 environment_setup = environment_setup,
818 err = proc.stderr.read()
819 raise RuntimeError, "Server could not be executed: %s" % (err,)
822 s = self._ServerClass(*ctor_args)
825 # connect client to server
826 self._client = server.Client(root_dir, host = host, port = port,
827 user = user, agent = agent,
828 environment_setup = environment_setup)
831 def _make_message(argtypes, argencoders, command, methname, classname, *args):
832 if len(argtypes) != len(argencoders):
833 raise ValueError, "Invalid arguments for _make_message: "\
834 "in stub method %s of class %s "\
835 "argtypes and argencoders must match in size" % (
836 methname, classname )
837 if len(argtypes) != len(args):
838 raise ValueError, "Invalid arguments for _make_message: "\
839 "in stub method %s of class %s "\
840 "expected %d arguments, got %d" % (
842 len(argtypes), len(args))
845 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
847 buf.append(fmt % encode(val))
850 raise TypeError, "Argument %d of stub method %s of class %s "\
851 "requires a value of type %s, but got %s - nested error: %s" % (
852 argnum, methname, classname,
853 getattr(typ, '__name__', typ), type(val),
854 traceback.format_exc()
857 return "%d|%s" % (command, '|'.join(buf))
860 def _parse_reply(rvtype, methname, classname, reply):
862 raise RuntimeError, "Invalid reply: %r "\
863 "for stub method %s of class %s" % (
869 result = reply.split("|")
870 code = int(result[0])
874 raise TypeError, "Return value of stub method %s of class %s "\
875 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
877 getattr(rvtype, '__name__', rvtype), reply,
878 traceback.format_exc()
881 text = base64.b64decode(text)
882 raise RuntimeError(text)
891 raise TypeError, "Return value of stub method %s of class %s "\
892 "cannot be parsed: must be of type %s - nested error: %s" % (
894 getattr(rvtype, '__name__', rvtype),
895 traceback.format_exc()
898 raise RuntimeError, "Invalid reply: %r "\
899 "for stub method %s of class %s - unknown code" % (
905 def _make_stubs(server_class, template_class):
907 Returns a dictionary method_name -> method
912 class SomeProxy(BaseProxy):
915 locals().update( BaseProxy._make_stubs(
920 ServerClass is the corresponding Server class, as
921 specified in the _ServerClass class method (_make_stubs
922 is static and can't access the method), and TemplateClass
923 is the ultimate implementation class behind the server,
924 from which argument names and defaults are taken, to
925 maintain meaningful interfaces.
932 func_template_path = os.path.join(
933 os.path.dirname(__file__),
935 func_template_file = open(func_template_path, "r")
936 func_template = func_template_file.read()
937 func_template_file.close()
939 for methname in vars(template_class).copy():
940 if methname.endswith('_deferred'):
941 # cannot wrap deferreds...
943 dmethname = methname+'_deferred'
944 if hasattr(server_class, methname) and not methname.startswith('_'):
945 template_meth = getattr(template_class, methname)
946 server_meth = getattr(server_class, methname)
948 command = getattr(server_meth, '_handles_command', None)
949 argtypes = getattr(server_meth, '_argtypes', None)
950 argencoders = getattr(server_meth, '_argencoders', None)
951 rvtype = getattr(server_meth, '_retval', None)
954 if hasattr(template_meth, 'fget'):
956 template_meth = template_meth.fget
959 if command is not None and argtypes is not None and argencoders is not None:
960 # We have an interface method...
961 code = template_meth.func_code
962 argnames = code.co_varnames[:code.co_argcount]
963 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
964 + (template_meth.func_defaults or ()) )
967 BaseProxy = BaseProxy,
969 argencoders = argencoders,
971 functools = functools,
975 func_text = func_template % dict(
977 args = '%s' % (','.join(argnames[1:])),
979 argname if argdef is NONE
980 else "%s=%r" % (argname, argdef)
981 for argname, argdef in zip(argnames[1:], argdefaults[1:])
985 classname = server_class.__name__
993 exec func_text in func_globals, context
996 rv[methname] = property(context[methname])
997 rv[dmethname] = property(context[dmethname])
999 rv[methname] = context[methname]
1000 rv[dmethname] = context[dmethname]
1002 # inject _deferred into core classes
1003 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1004 def freezename(methname, dmethname):
1005 def dmeth(self, *p, **kw):
1006 return getattr(self, methname)(*p, **kw)
1007 dmeth.__name__ = dmethname
1009 dmeth = freezename(methname, dmethname)
1010 setattr(template_class, dmethname, dmeth)
1014 class TestbedControllerProxy(BaseProxy):
1016 _ServerClass = TestbedControllerServer
def __init__(self, root_dir, log_level, testbed_id = None,
        testbed_version = None, launch = True, host = None,
        port = None, user = None, ident_key = None, agent = None,
        environment_setup = ""):
    """Create a proxy for a TestbedControllerServer.

    ``root_dir``, ``log_level``, ``testbed_id``, ``testbed_version``
    and ``environment_setup`` become the positional constructor
    arguments of the server class. The remaining parameters are passed
    through to the base proxy constructor.

    Raises RuntimeError when ``launch`` is true but ``testbed_id`` or
    ``testbed_version`` is missing, since a server cannot be launched
    without knowing which testbed to build.
    """
    # Identity test: an id or version that merely compares equal to
    # None must not be mistaken for a missing one.
    if launch and (testbed_id is None or testbed_version is None):
        raise RuntimeError("To launch a TestbedControllerServer a "
                "testbed_id and testbed_version are required")
    super(TestbedControllerProxy,self).__init__(
        ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
        root_dir = root_dir,
        launch = launch, host = host, port = port, user = user,
        ident_key = ident_key, agent = agent,
        environment_setup = environment_setup)
1032 locals().update( BaseProxy._make_stubs(
1033 server_class = TestbedControllerServer,
1034 template_class = nepi.core.execute.TestbedController,
1037 # Shutdown stops the serverside...
1038 def shutdown(self, _stub = shutdown):
1040 self._client.send_stop()
1041 self._client.read_reply() # wait for it
1045 class ExperimentControllerProxy(BaseProxy):
1046 _ServerClass = ExperimentControllerServer
def __init__(self, root_dir, log_level, experiment_xml = None,
        launch = True, host = None, port = None, user = None,
        ident_key = None, agent = None, environment_setup = ""):
    """Create a proxy for an ExperimentControllerServer.

    ``root_dir``, ``log_level``, ``experiment_xml`` and
    ``environment_setup`` become the positional constructor arguments
    of the server class. The remaining parameters are passed through
    to the base proxy constructor.
    """
    server_ctor_args = (root_dir, log_level, experiment_xml,
                        environment_setup)
    base = super(ExperimentControllerProxy, self)
    base.__init__(
        ctor_args=server_ctor_args,
        root_dir=root_dir,
        launch=launch,
        host=host,
        port=port,
        user=user,
        ident_key=ident_key,
        agent=agent,
        environment_setup=environment_setup)
1058 locals().update( BaseProxy._make_stubs(
1059 server_class = ExperimentControllerServer,
1060 template_class = nepi.core.execute.ExperimentController,
1064 # Shutdown stops the serverside...
1065 def shutdown(self, _stub = shutdown):
1067 self._client.send_stop()
1068 self._client.read_reply() # wait for it