2 # -*- coding: utf-8 -*-
5 import nepi.core.execute
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
21 # PROTOCOL INSTRUCTION MESSAGES
41 DO_CROSS_CONNECT_INIT = 22
51 GET_ATTRIBUTE_LIST = 32
53 DO_CROSS_CONNECT_COMPL = 34
59 GET_TESTBED_VERSION = 40
62 instruction_text = dict({
72 CONFIGURE: "CONFIGURE",
74 CREATE_SET: "CREATE_SET",
75 FACTORY_SET: "FACTORY_SET",
77 CROSS_CONNECT: "CROSS_CONNECT",
78 ADD_TRACE: "ADD_TRACE",
79 ADD_ADDRESS: "ADD_ADDRESS",
80 ADD_ROUTE: "ADD_ROUTE",
82 DO_CREATE: "DO_CREATE",
83 DO_CONNECT_INIT: "DO_CONNECT_INIT",
84 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
85 DO_CONFIGURE: "DO_CONFIGURE",
86 DO_PRECONFIGURE: "DO_PRECONFIGURE",
87 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
88 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
91 GET_ROUTE: "GET_ROUTE",
92 GET_ADDRESS: "GET_ADDRESS",
93 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
94 GET_FACTORY_ID: "GET_FACTORY_ID",
95 GET_TESTBED_ID: "GET_TESTBED_ID",
96 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
100 TESTBED_ID: "TESTBED_ID",
101 TESTBED_VERSION: "TESTBED_VERSION",
102 TRACES_INFO: "TRACES_INFO",
105 def log_msg(server, params):
107 instr = int(params[0])
108 instr_txt = instruction_text[instr]
109 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
110 instr_txt, ", ".join(map(str, params[1:]))))
112 # don't die for logging
115 def log_reply(server, reply):
117 res = reply.split("|")
119 code_txt = instruction_text[code]
121 txt = base64.b64decode(res[1])
124 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
127 # don't die for logging
128 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
132 def to_server_log_level(log_level):
135 if log_level == DC.DEBUG_LEVEL
136 else server.ERROR_LEVEL
def get_access_config_params(access_config):
    """Extract the daemon-launch parameters from an access configuration.

    Reads the root directory, log level, communication mode and (optional)
    environment setup from ``access_config``. Connection details are read
    only when the communication mode is ``DC.ACCESS_SSH``; otherwise they
    are all returned as None.

    Returns the tuple
    ``(root_dir, log_level, user, host, port, key, agent, environment_setup)``
    where ``log_level`` has already been translated with
    ``to_server_log_level``.
    """
    root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(
        access_config.get_attribute_value(DC.LOG_LEVEL))
    communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)

    # The environment-setup attribute is optional; fall back to the same
    # empty-string default the proxy constructors use.
    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = access_config.get_attribute_value(
            DC.DEPLOYMENT_ENVIRONMENT_SETUP)
    else:
        environment_setup = ""

    user = host = port = agent = key = None
    if communication == DC.ACCESS_SSH:
        user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
        host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
        port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
        agent = access_config.get_attribute_value(DC.USE_AGENT)
        key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
class AccessConfiguration(AttributesMap):
    """Attribute map pre-populated with the deployment attributes.

    Every entry of ``Metadata.DEPLOYMENT_ATTRIBUTES`` is registered as an
    attribute. If ``params`` (a mapping of attribute name to raw value) is
    given, each value is converted with the parser registered in
    ``Attribute.type_parsers`` for that attribute's type and then stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # NOTE(review): imported locally, presumably to avoid an import
        # cycle with nepi.core.metadata - confirm before hoisting.
        from nepi.core.metadata import Metadata

        for _, attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
            self.add_attribute(**attr_info)

        # params defaults to None, so it must not be iterated blindly.
        if params:
            for attr_name, attr_value in params.iteritems():
                parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
                self.set_attribute_value(attr_name, parser(attr_value))
173 class TempDir(object):
175 self.path = tempfile.mkdtemp()
178 shutil.rmtree(self.path)
180 class PermDir(object):
181 def __init__(self, path):
184 def create_experiment_controller(xml, access_config = None):
185 mode = None if not access_config \
186 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
187 launch = True if not access_config \
188 else not access_config.get_attribute_value(DC.RECOVER)
189 if not mode or mode == DC.MODE_SINGLE_PROCESS:
191 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
193 from nepi.core.execute import ExperimentController
195 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
198 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
199 controller = ExperimentController(xml, root_dir.path)
201 # inject reference to temporary dir, so that it gets cleaned
202 # up at destruction time.
203 controller._tempdir = root_dir
206 elif mode == DC.MODE_DAEMON:
207 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
208 get_access_config_params(access_config)
209 return ExperimentControllerProxy(root_dir, log_level,
210 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
211 agent = agent, launch = launch,
212 environment_setup = environment_setup)
213 raise RuntimeError("Unsupported access configuration '%s'" % mode)
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """Create a testbed controller according to the access configuration.

    With no access configuration, or in ``DC.MODE_SINGLE_PROCESS``, the
    controller is built in-process with ``_build_testbed_controller``.
    In ``DC.MODE_DAEMON`` a ``TestbedControllerProxy`` is returned, built
    from the parameters extracted by ``get_access_config_params``.

    Raises ValueError when recovery (launch=False) is requested for the
    in-process mode, and RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        # Only the unsupported combination is rejected; without this guard
        # the in-process return below could never be reached.
        if not launch:
            raise ValueError("Unsupported instantiation mode: %s with lanch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port, ident_key = key,
                user = user, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
def _build_testbed_controller(testbed_id, testbed_version):
    """Instantiate the TestbedController of the named testbed package.

    The module ``nepi.testbeds.<testbed_id lowercased>`` is imported if it
    is not loaded yet, and its ``TestbedController`` is called with
    ``testbed_version``.

    Raises ImportError if the testbed module cannot be imported.
    """
    mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
    if mod_name not in sys.modules:
        # __import__ on a dotted name returns the top-level package, so the
        # submodule itself is fetched from sys.modules below.
        __import__(mod_name)
    module = sys.modules[mod_name]
    return module.TestbedController(testbed_version)
240 # Just a namespace class
244 def pickled_data(sdata):
245 return cPickle.loads(base64.b64decode(sdata))
248 def base64_data(sdata):
249 return base64.b64decode(sdata)
253 return None if sdata == "None" else int(sdata)
257 return sdata == 'True'
261 def pickled_data(data):
262 return base64.b64encode(cPickle.dumps(data))
265 def base64_data(data):
266 return base64.b64encode(data)
270 return "None" if data is None else int(data)
274 return str(bool(data))
276 # import into Marshalling all the decoders
280 for typname, typ in vars(Decoders).iteritems()
281 if not typname.startswith('_')
284 _TYPE_ENCODERS = dict([
285 # id(type) -> (<encoding_function>, <formatting_string>)
286 (typname, (getattr(Encoders,typname),"%s"))
287 for typname in vars(Decoders)
288 if not typname.startswith('_')
289 and hasattr(Encoders,typname)
293 _TYPE_ENCODERS["float"] = (float, "%r")
294 _TYPE_ENCODERS["int"] = (int, "%d")
295 _TYPE_ENCODERS["long"] = (int, "%d")
296 _TYPE_ENCODERS["str"] = (str, "%s")
297 _TYPE_ENCODERS["unicode"] = (str, "%s")
300 _TYPE_ENCODERS[None] = (str, "%s")
305 Decorator that converts the given function into one that takes
306 a single "params" list, with each parameter marshalled according
307 to the given factory callable (type constructors are accepted).
309 The first argument (self) is left untouched.
313 @Marshalling.args(int,int,str,base64_data)
314 def somefunc(self, someint, otherint, somestr, someb64):
319 def rv(self, params):
320 return f(self, *[ ctor(val)
321 for ctor,val in zip(types, params[1:]) ])
325 # Derive type encoders by looking up types in _TYPE_ENCODERS
326 # make_proxy will use it to encode arguments in command strings
328 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
330 if typ.__name__ in TYPE_ENCODERS:
331 argencoders.append(TYPE_ENCODERS[typ.__name__])
334 argencoders.append(TYPE_ENCODERS[None])
336 rv._argencoders = tuple(argencoders)
338 rv._retval = getattr(f, '_retval', None)
343 def retval(typ=Decoders.base64_data):
345 Decorator that converts the given function into one that
346 returns a properly encoded return string, given that the undecorated
347 function returns suitable input for the encoding function.
349 The optional typ argument specifies a type.
350 For the default of base64_data, return values should be strings.
351 The return value of the encoding method should be a string always.
355 @Marshalling.args(int,int,str,base64_data)
356 @Marshalling.retval(str)
357 def somefunc(self, someint, otherint, somestr, someb64):
360 encode, fmt = Marshalling._TYPE_ENCODERS.get(
362 Marshalling._TYPE_ENCODERS[None])
367 def rv(self, *p, **kw):
368 data = f(self, *p, **kw)
374 rv._argtypes = getattr(f, '_argtypes', None)
375 rv._argencoders = getattr(f, '_argencoders', None)
382 Decorator that converts the given function into one that
383 always return an encoded empty string.
385 Useful for null-returning functions.
390 def rv(self, *p, **kw):
395 rv._argtypes = getattr(f, '_argtypes', None)
396 rv._argencoders = getattr(f, '_argencoders', None)
400 def handles(whichcommand):
402 Associates the method with a given command code for servers.
403 It should always be the topmost decorator.
406 f._handles_command = whichcommand
410 class BaseServer(server.Server):
411 def reply_action(self, msg):
413 result = base64.b64encode("Invalid command line")
414 reply = "%d|%s" % (ERROR, result)
416 params = msg.split("|")
417 instruction = int(params[0])
418 log_msg(self, params)
420 for mname,meth in vars(self.__class__).iteritems():
421 if not mname.startswith('_'):
422 cmd = getattr(meth, '_handles_command', None)
423 if cmd == instruction:
424 meth = getattr(self, mname)
428 error = "Invalid instruction %s" % instruction
429 self.log_error(error)
430 result = base64.b64encode(error)
431 reply = "%d|%s" % (ERROR, result)
433 error = self.log_error()
434 result = base64.b64encode(error)
435 reply = "%d|%s" % (ERROR, result)
436 log_reply(self, reply)
439 class TestbedControllerServer(BaseServer):
def __init__(self, root_dir, log_level, testbed_id, testbed_version):
    """Remember which testbed this server will wrap.

    ``root_dir`` and ``log_level`` are forwarded to the base server. The
    testbed controller itself is not created here; ``post_daemonize``
    builds it from ``testbed_id`` and ``testbed_version``.
    """
    super(TestbedControllerServer, self).__init__(root_dir, log_level)
    self._testbed_id = testbed_id
    self._testbed_version = testbed_version
    # Defined up front (as ExperimentControllerServer does for its
    # _experiment) so the attribute exists before post_daemonize runs.
    self._testbed = None
def post_daemonize(self):
    """Build the wrapped testbed controller and keep it in ``_testbed``.

    Uses the testbed id and version stored by the constructor.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_controller(testbed_id, testbed_version)
450 @Marshalling.handles(GUIDS)
452 @Marshalling.retval( Marshalling.pickled_data )
454 return self._testbed.guids
456 @Marshalling.handles(TESTBED_ID)
458 @Marshalling.retval()
459 def testbed_id(self):
460 return str(self._testbed.testbed_id)
462 @Marshalling.handles(TESTBED_VERSION)
464 @Marshalling.retval()
465 def testbed_version(self):
466 return str(self._testbed.testbed_version)
468 @Marshalling.handles(CREATE)
469 @Marshalling.args(int, str)
471 def defer_create(self, guid, factory_id):
472 self._testbed.defer_create(guid, factory_id)
@Marshalling.handles(TRACE)
@Marshalling.args(int, str, Marshalling.base64_data)
@Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Handle TRACE: forward the query to the wrapped testbed.

    Returns whatever ``self._testbed.trace`` returns; the ``retval``
    decorator encodes it for the reply.
    """
    testbed = self._testbed
    return testbed.trace(guid, trace_id, attribute)
480 @Marshalling.handles(TRACES_INFO)
482 @Marshalling.retval( Marshalling.pickled_data )
483 def traces_info(self):
484 return self._testbed.traces_info()
486 @Marshalling.handles(START)
490 self._testbed.start()
492 @Marshalling.handles(STOP)
498 @Marshalling.handles(SHUTDOWN)
502 self._testbed.shutdown()
504 @Marshalling.handles(CONFIGURE)
505 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
507 def defer_configure(self, name, value):
508 self._testbed.defer_configure(name, value)
510 @Marshalling.handles(CREATE_SET)
511 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
513 def defer_create_set(self, guid, name, value):
514 self._testbed.defer_create_set(guid, name, value)
516 @Marshalling.handles(FACTORY_SET)
517 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
519 def defer_factory_set(self, name, value):
520 self._testbed.defer_factory_set(name, value)
522 @Marshalling.handles(CONNECT)
523 @Marshalling.args(int, str, int, str)
525 def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
526 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
527 connector_type_name2)
529 @Marshalling.handles(CROSS_CONNECT)
530 @Marshalling.args(int, str, int, int, str, str, str)
532 def defer_cross_connect(self,
533 guid, connector_type_name,
534 cross_guid, cross_testbed_guid,
535 cross_testbed_id, cross_factory_id,
536 cross_connector_type_name):
537 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
538 cross_testbed_guid, cross_testbed_id, cross_factory_id,
539 cross_connector_type_name)
541 @Marshalling.handles(ADD_TRACE)
542 @Marshalling.args(int, str)
544 def defer_add_trace(self, guid, trace_id):
545 self._testbed.defer_add_trace(guid, trace_id)
547 @Marshalling.handles(ADD_ADDRESS)
548 @Marshalling.args(int, str, int, str)
550 def defer_add_address(self, guid, address, netprefix, broadcast):
551 self._testbed.defer_add_address(guid, address, netprefix,
554 @Marshalling.handles(ADD_ROUTE)
555 @Marshalling.args(int, str, int, str)
557 def defer_add_route(self, guid, destination, netprefix, nexthop):
558 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
560 @Marshalling.handles(DO_SETUP)
564 self._testbed.do_setup()
566 @Marshalling.handles(DO_CREATE)
570 self._testbed.do_create()
572 @Marshalling.handles(DO_CONNECT_INIT)
575 def do_connect_init(self):
576 self._testbed.do_connect_init()
578 @Marshalling.handles(DO_CONNECT_COMPL)
581 def do_connect_compl(self):
582 self._testbed.do_connect_compl()
584 @Marshalling.handles(DO_CONFIGURE)
587 def do_configure(self):
588 self._testbed.do_configure()
590 @Marshalling.handles(DO_PRECONFIGURE)
593 def do_preconfigure(self):
594 self._testbed.do_preconfigure()
596 @Marshalling.handles(DO_PRESTART)
599 def do_prestart(self):
600 self._testbed.do_prestart()
602 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
603 @Marshalling.args( Marshalling.Decoders.pickled_data )
605 def do_cross_connect_init(self, cross_data):
606 self._testbed.do_cross_connect_init(cross_data)
608 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
609 @Marshalling.args( Marshalling.Decoders.pickled_data )
611 def do_cross_connect_compl(self, cross_data):
612 self._testbed.do_cross_connect_compl(cross_data)
@Marshalling.handles(GET)
@Marshalling.args(int, Marshalling.base64_data, str)
@Marshalling.retval(Marshalling.pickled_data)
def get(self, guid, name, time):
    """Handle GET: read a value from the wrapped testbed.

    Returns the result of ``self._testbed.get(guid, name, time)``
    unchanged; the ``retval`` decorator pickles it for the reply.
    """
    testbed = self._testbed
    return testbed.get(guid, name, time)
620 @Marshalling.handles(SET)
621 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
623 def set(self, guid, name, value, time):
624 self._testbed.set(guid, name, value, time)
@Marshalling.handles(GET_ADDRESS)
@Marshalling.args(int, int, Marshalling.base64_data)
@Marshalling.retval()
def get_address(self, guid, index, attribute):
    """Handle GET_ADDRESS: query the wrapped testbed for an address field.

    The testbed's answer is converted with ``str`` before being handed
    to the ``retval`` encoder.
    """
    address_value = self._testbed.get_address(guid, index, attribute)
    return str(address_value)
@Marshalling.handles(GET_ROUTE)
@Marshalling.args(int, int, Marshalling.base64_data)
@Marshalling.retval()
def get_route(self, guid, index, attribute):
    """Handle GET_ROUTE: query the wrapped testbed for a route field.

    The testbed's answer is converted with ``str`` before being handed
    to the ``retval`` encoder.
    """
    route_value = self._testbed.get_route(guid, index, attribute)
    return str(route_value)
638 @Marshalling.handles(ACTION)
639 @Marshalling.args(str, int, Marshalling.base64_data)
641 def action(self, time, guid, command):
642 self._testbed.action(time, guid, command)
@Marshalling.handles(STATUS)
@Marshalling.args(Marshalling.nullint)
@Marshalling.retval(int)
def status(self, guid):
    """Handle STATUS: return the wrapped testbed's status for ``guid``.

    ``guid`` is decoded with ``nullint``, so it may arrive as None.
    """
    testbed = self._testbed
    return testbed.status(guid)
@Marshalling.handles(GET_ATTRIBUTE_LIST)
@Marshalling.args(int)
@Marshalling.retval(Marshalling.pickled_data)
def get_attribute_list(self, guid):
    """Handle GET_ATTRIBUTE_LIST: forward the query to the wrapped testbed.

    The testbed's result is returned as-is and pickled by ``retval``.
    """
    testbed = self._testbed
    return testbed.get_attribute_list(guid)
@Marshalling.handles(GET_FACTORY_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_factory_id(self, guid):
    """Handle GET_FACTORY_ID: return the wrapped testbed's factory id for ``guid``."""
    testbed = self._testbed
    return testbed.get_factory_id(guid)
662 class ExperimentControllerServer(BaseServer):
def __init__(self, root_dir, log_level, experiment_xml):
    """Store the experiment description for later instantiation.

    ``root_dir`` and ``log_level`` go to the base server. The experiment
    controller starts out as None and is created in ``post_daemonize``.
    """
    super(ExperimentControllerServer, self).__init__(root_dir, log_level)
    self._experiment = None
    self._experiment_xml = experiment_xml
def post_daemonize(self):
    """Create the wrapped ExperimentController and keep it in ``_experiment``.

    It is built from the stored experiment XML, rooted at the server's
    own root directory.
    """
    from nepi.core.execute import ExperimentController
    controller = ExperimentController(self._experiment_xml,
        root_dir = self._root_dir)
    self._experiment = controller
673 @Marshalling.handles(GUIDS)
675 @Marshalling.retval( Marshalling.pickled_data )
677 return self._experiment.guids
679 @Marshalling.handles(XML)
681 @Marshalling.retval()
682 def experiment_xml(self):
683 return self._experiment.experiment_xml
@Marshalling.handles(TRACE)
@Marshalling.args(int, str, Marshalling.base64_data)
@Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Handle TRACE: forward the query to the wrapped experiment.

    The experiment's answer is converted with ``str`` before being
    handed to the ``retval`` encoder.
    """
    trace_value = self._experiment.trace(guid, trace_id, attribute)
    return str(trace_value)
691 @Marshalling.handles(TRACES_INFO)
693 @Marshalling.retval( Marshalling.pickled_data )
694 def traces_info(self):
695 return self._experiment.traces_info()
@Marshalling.handles(FINISHED)
@Marshalling.args(int)
@Marshalling.retval(Marshalling.bool)
def is_finished(self, guid):
    """Handle FINISHED: return the wrapped experiment's ``is_finished(guid)``."""
    experiment = self._experiment
    return experiment.is_finished(guid)
@Marshalling.handles(GET)
@Marshalling.args(int, Marshalling.base64_data, str)
@Marshalling.retval(Marshalling.pickled_data)
def get(self, guid, name, time):
    """Handle GET: read a value from the wrapped experiment.

    Returns the result of ``self._experiment.get(guid, name, time)``
    unchanged; the ``retval`` decorator pickles it for the reply.
    """
    experiment = self._experiment
    return experiment.get(guid, name, time)
709 @Marshalling.handles(SET)
710 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
712 def set(self, guid, name, value, time):
713 self._experiment.set(guid, name, value, time)
715 @Marshalling.handles(START)
719 self._experiment.start()
721 @Marshalling.handles(STOP)
725 self._experiment.stop()
727 @Marshalling.handles(RECOVER)
731 self._experiment.recover()
733 @Marshalling.handles(SHUTDOWN)
737 self._experiment.shutdown()
@Marshalling.handles(GET_TESTBED_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_testbed_id(self, guid):
    """Handle GET_TESTBED_ID: return the wrapped experiment's testbed id for ``guid``."""
    experiment = self._experiment
    return experiment.get_testbed_id(guid)
@Marshalling.handles(GET_FACTORY_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_factory_id(self, guid):
    """Handle GET_FACTORY_ID: return the wrapped experiment's factory id for ``guid``."""
    experiment = self._experiment
    return experiment.get_factory_id(guid)
@Marshalling.handles(GET_TESTBED_VERSION)
@Marshalling.args(int)
@Marshalling.retval()
def get_testbed_version(self, guid):
    """Handle GET_TESTBED_VERSION: return the wrapped experiment's testbed version for ``guid``."""
    experiment = self._experiment
    return experiment.get_testbed_version(guid)
757 class BaseProxy(object):
759 _ServerClassModule = "nepi.util.proxy"
763 launch = True, host = None,
764 port = None, user = None, ident_key = None, agent = None,
765 environment_setup = ""):
770 "from %(classmodule)s import %(classname)s;"
771 "s = %(classname)s%(ctor_args)r;"
774 classname = self._ServerClass.__name__,
775 classmodule = self._ServerClassModule,
776 ctor_args = ctor_args
778 proc = server.popen_ssh_subprocess(python_code, host = host,
779 port = port, user = user, agent = agent,
780 ident_key = ident_key,
781 environment_setup = environment_setup,
784 err = proc.stderr.read()
785 raise RuntimeError, "Server could not be executed: %s" % (err,)
788 s = self._ServerClass(*ctor_args)
791 # connect client to server
792 self._client = server.Client(root_dir, host = host, port = port,
793 user = user, agent = agent,
794 environment_setup = environment_setup)
797 def _make_message(argtypes, argencoders, command, methname, classname, *args):
798 if len(argtypes) != len(argencoders):
799 raise ValueError, "Invalid arguments for _make_message: "\
800 "in stub method %s of class %s "\
801 "argtypes and argencoders must match in size" % (
802 methname, classname )
803 if len(argtypes) != len(args):
804 raise ValueError, "Invalid arguments for _make_message: "\
805 "in stub method %s of class %s "\
806 "expected %d arguments, got %d" % (
808 len(argtypes), len(args))
811 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
813 buf.append(fmt % encode(val))
816 raise TypeError, "Argument %d of stub method %s of class %s "\
817 "requires a value of type %s, but got %s - nested error: %s" % (
818 argnum, methname, classname,
819 getattr(typ, '__name__', typ), type(val),
820 traceback.format_exc()
823 return "%d|%s" % (command, '|'.join(buf))
826 def _parse_reply(rvtype, methname, classname, reply):
828 raise RuntimeError, "Invalid reply: %r "\
829 "for stub method %s of class %s" % (
835 result = reply.split("|")
836 code = int(result[0])
840 raise TypeError, "Return value of stub method %s of class %s "\
841 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
843 getattr(rvtype, '__name__', rvtype), reply,
844 traceback.format_exc()
847 text = base64.b64decode(text)
848 raise RuntimeError(text)
857 raise TypeError, "Return value of stub method %s of class %s "\
858 "cannot be parsed: must be of type %s - nested error: %s" % (
860 getattr(rvtype, '__name__', rvtype),
861 traceback.format_exc()
864 raise RuntimeError, "Invalid reply: %r "\
865 "for stub method %s of class %s - unknown code" % (
871 def _make_stubs(server_class, template_class):
873 Returns a dictionary method_name -> method
878 class SomeProxy(BaseProxy):
881 locals().update( BaseProxy._make_stubs(
886 ServerClass is the corresponding Server class, as
887 specified in the _ServerClass class method (_make_stubs
888 is static and can't access the method), and TemplateClass
889 is the ultimate implementation class behind the server,
890 from which argument names and defaults are taken, to
891 maintain meaningful interfaces.
898 func_template_path = os.path.join(
899 os.path.dirname(__file__),
901 func_template_file = open(func_template_path, "r")
902 func_template = func_template_file.read()
903 func_template_file.close()
905 for methname in vars(template_class).copy():
906 if methname.endswith('_deferred'):
907 # cannot wrap deferreds...
909 dmethname = methname+'_deferred'
910 if hasattr(server_class, methname) and not methname.startswith('_'):
911 template_meth = getattr(template_class, methname)
912 server_meth = getattr(server_class, methname)
914 command = getattr(server_meth, '_handles_command', None)
915 argtypes = getattr(server_meth, '_argtypes', None)
916 argencoders = getattr(server_meth, '_argencoders', None)
917 rvtype = getattr(server_meth, '_retval', None)
920 if hasattr(template_meth, 'fget'):
922 template_meth = template_meth.fget
925 if command is not None and argtypes is not None and argencoders is not None:
926 # We have an interface method...
927 code = template_meth.func_code
928 argnames = code.co_varnames[:code.co_argcount]
929 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
930 + (template_meth.func_defaults or ()) )
933 BaseProxy = BaseProxy,
935 argencoders = argencoders,
937 functools = functools,
941 func_text = func_template % dict(
943 args = '%s' % (','.join(argnames[1:])),
945 argname if argdef is NONE
946 else "%s=%r" % (argname, argdef)
947 for argname, argdef in zip(argnames[1:], argdefaults[1:])
951 classname = server_class.__name__
959 exec func_text in func_globals, context
962 rv[methname] = property(context[methname])
963 rv[dmethname] = property(context[dmethname])
965 rv[methname] = context[methname]
966 rv[dmethname] = context[dmethname]
968 # inject _deferred into core classes
969 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
970 def freezename(methname, dmethname):
971 def dmeth(self, *p, **kw):
972 return getattr(self, methname)(*p, **kw)
973 dmeth.__name__ = dmethname
975 dmeth = freezename(methname, dmethname)
976 setattr(template_class, dmethname, dmeth)
980 class TestbedControllerProxy(BaseProxy):
982 _ServerClass = TestbedControllerServer
def __init__(self, root_dir, log_level, testbed_id = None,
        testbed_version = None, launch = True, host = None,
        port = None, user = None, ident_key = None, agent = None,
        environment_setup = ""):
    """Set up a proxy for a TestbedControllerServer.

    The server constructor arguments are
    ``(root_dir, log_level, testbed_id, testbed_version)``; everything
    else is forwarded to ``BaseProxy.__init__``.

    Raises RuntimeError if ``launch`` is true but ``testbed_id`` or
    ``testbed_version`` is missing.
    """
    if launch and (testbed_id is None or testbed_version is None):
        raise RuntimeError("To launch a TestbedControllerServer a "
                "testbed_id and testbed_version are required")
    super(TestbedControllerProxy,self).__init__(
        ctor_args = (root_dir, log_level, testbed_id, testbed_version),
        # forwarded explicitly, as ExperimentControllerProxy does
        root_dir = root_dir,
        launch = launch, host = host, port = port, user = user,
        ident_key = ident_key, agent = agent,
        environment_setup = environment_setup)
998 locals().update( BaseProxy._make_stubs(
999 server_class = TestbedControllerServer,
1000 template_class = nepi.core.execute.TestbedController,
1003 # Shutdown stops the serverside...
1004 def shutdown(self, _stub = shutdown):
1006 self._client.send_stop()
1007 self._client.read_reply() # wait for it
1011 class ExperimentControllerProxy(BaseProxy):
1012 _ServerClass = ExperimentControllerServer
def __init__(self, root_dir, log_level, experiment_xml = None,
        launch = True, host = None, port = None, user = None,
        ident_key = None, agent = None, environment_setup = ""):
    """Set up a proxy for an ExperimentControllerServer.

    The server constructor arguments are
    ``(root_dir, log_level, experiment_xml)``; everything else is
    forwarded to ``BaseProxy.__init__``.

    Raises RuntimeError if ``launch`` is true but no ``experiment_xml``
    is given.
    """
    if launch and experiment_xml is None:
        # Adjacent literals instead of a backslash inside the string, so
        # the source indentation does not leak into the message.
        raise RuntimeError("To launch a ExperimentControllerServer a "
                "xml description of the experiment is required")
    super(ExperimentControllerProxy,self).__init__(
        ctor_args = (root_dir, log_level, experiment_xml),
        root_dir = root_dir,
        launch = launch, host = host, port = port, user = user,
        ident_key = ident_key, agent = agent,
        environment_setup = environment_setup)
1027 locals().update( BaseProxy._make_stubs(
1028 server_class = ExperimentControllerServer,
1029 template_class = nepi.core.execute.ExperimentController,
1033 # Shutdown stops the serverside...
1034 def shutdown(self, _stub = shutdown):
1036 self._client.send_stop()
1037 self._client.read_reply() # wait for it