2 # -*- coding: utf-8 -*-
5 import nepi.core.execute
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
# Numeric codes used on the wire between proxies and servers. Only some of
# the code definitions are present in this excerpt.
21 # PROTOCOL INSTRUCTION MESSAGES
41 DO_CROSS_CONNECT_INIT = 22
51 GET_ATTRIBUTE_LIST = 32
53 DO_CROSS_CONNECT_COMPL = 34
59 GET_TESTBED_VERSION = 40
# Table from protocol code to a printable name. log_msg() and log_reply()
# index it to produce readable debug lines. The entries shown here are a
# subset and the closing of the literal is not part of this excerpt.
63 instruction_text = dict({
74 CONFIGURE: "CONFIGURE",
76 CREATE_SET: "CREATE_SET",
77 FACTORY_SET: "FACTORY_SET",
79 CROSS_CONNECT: "CROSS_CONNECT",
80 ADD_TRACE: "ADD_TRACE",
81 ADD_ADDRESS: "ADD_ADDRESS",
82 ADD_ROUTE: "ADD_ROUTE",
84 DO_CREATE: "DO_CREATE",
85 DO_CONNECT_INIT: "DO_CONNECT_INIT",
86 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
87 DO_CONFIGURE: "DO_CONFIGURE",
88 DO_PRECONFIGURE: "DO_PRECONFIGURE",
89 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
90 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
93 GET_ROUTE: "GET_ROUTE",
94 GET_ADDRESS: "GET_ADDRESS",
95 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
96 GET_FACTORY_ID: "GET_FACTORY_ID",
97 GET_TESTBED_ID: "GET_TESTBED_ID",
98 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
102 TESTBED_ID: "TESTBED_ID",
103 TESTBED_VERSION: "TESTBED_VERSION",
104 TRACES_INFO: "TRACES_INFO",
# Write a debug line describing an incoming protocol message.
#   server -- object providing log_debug(); inside this function the
#             parameter hides the imported nepi.util.server module.
#   params -- message fields: params[0] is the numeric instruction code,
#             the remaining fields are printed comma-separated.
107 def log_msg(server, params):
109 instr = int(params[0])
# A code missing from instruction_text makes this lookup fail; the
# "don't die for logging" remark below suggests such failures are swallowed
# (the handler itself is not visible in this excerpt - confirm).
110 instr_txt = instruction_text[instr]
111 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
112 instr_txt, ", ".join(map(str, params[1:]))))
114 # don't die for logging
# Write a debug line describing an outgoing reply.
#   server -- object providing log_debug() (hides the nepi.util.server
#             module inside this function).
#   reply  -- reply string whose fields are separated by "|".
117 def log_reply(server, reply):
119 res = reply.split("|")
# `code` is assigned on a line that is not part of this excerpt
# (presumably int(res[0]) - TODO confirm).
121 code_txt = instruction_text[code]
# The second field travels base64-encoded; decode it for display.
123 txt = base64.b64decode(res[1])
126 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
129 # don't die for logging
# Second, shorter log form; it looks like the fallback used when the
# detailed rendering above fails (surrounding handler not visible).
130 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
# Translate a DeploymentConfiguration log level into a nepi.util.server
# log level. Any value other than DC.DEBUG_LEVEL maps to server.ERROR_LEVEL;
# the value produced for DC.DEBUG_LEVEL is on a line not visible in this
# excerpt (presumably server.DEBUG_LEVEL - TODO confirm).
134 def to_server_log_level(log_level):
137 if log_level == DC.DEBUG_LEVEL
138 else server.ERROR_LEVEL
# Read from an access configuration the values needed to start or reach a
# server.
#   access_config -- object exposing get_attribute_value() and
#                    has_attribute().
# Returns the tuple
#   (root_dir, log_level, user, host, port, key, agent, environment_setup)
# where log_level is already converted with to_server_log_level().
141 def get_access_config_params(access_config):
142 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
143 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
144 log_level = to_server_log_level(log_level)
# SSH-related values stay None unless SSH communication is configured.
145 user = host = port = agent = key = None
146 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
# Optional attribute: it is only read when the configuration declares it.
# The fallback value of this conditional is not visible in this excerpt.
147 environment_setup = (
148 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
149 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
152 if communication == DC.ACCESS_SSH:
153 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
154 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
155 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
156 agent = access_config.get_attribute_value(DC.USE_AGENT)
157 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
# Mind the order when unpacking: key comes before agent.
158 return (root_dir, log_level, user, host, port, key, agent, environment_setup)
# Attribute map holding the deployment settings of a controller.
# The constructor declares one attribute per entry of
# Metadata.DEPLOYMENT_ATTRIBUTES and then, for the optional `params`
# mapping (attribute name -> raw value), parses every value with the parser
# registered for the attribute's type before storing it.
160 class AccessConfiguration(AttributesMap):
161 def __init__(self, params = None):
162 super(AccessConfiguration, self).__init__()
# Imported inside the constructor rather than at module level
# (presumably to avoid an import cycle - TODO confirm).
164 from nepi.core.metadata import Metadata
# Each attr_info is expanded as keyword arguments of add_attribute().
166 for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
167 self.add_attribute(**attr_info)
# `params` defaults to None; the guard protecting this loop is on a line
# not visible in this excerpt.
170 for attr_name, attr_value in params.iteritems():
171 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
172 attr_value = parser(attr_value)
173 self.set_attribute_value(attr_name, attr_value)
# Holder of a freshly created temporary directory, exposed as `path`.
# The method headers are not visible in this excerpt: the first statement
# creates the directory, the second removes the whole tree (presumably from
# __init__ and __del__ respectively - TODO confirm).
175 class TempDir(object):
177 self.path = tempfile.mkdtemp()
180 shutil.rmtree(self.path)
# Counterpart of TempDir for a caller-supplied directory. The constructor
# body is not visible in this excerpt; create_experiment_controller() reads
# the `path` attribute of instances, so it presumably stores `path` there.
182 class PermDir(object):
183 def __init__(self, path):
def create_experiment_controller(xml, access_config = None):
    """
    Build the experiment controller described by an access configuration.

    xml           -- experiment description handed to the controller.
    access_config -- optional object exposing get_attribute_value() and
                     has_attribute(); when it is missing (or falsy) a new
                     in-process controller is created.

    Returns an ExperimentController running in this process when no mode
    or DC.MODE_SINGLE_PROCESS is configured, or an
    ExperimentControllerProxy when DC.MODE_DAEMON is configured.

    Raises ValueError when an in-process controller is requested together
    with recovery (launch is False), and RuntimeError for any other
    deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        # Recovering an existing experiment means "do not launch a new one".
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        # NOTE(review): the guard below is reconstructed from the error
        # message; without it the in-process branch could never return.
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))

        # Imported on demand, only when an in-process controller is needed.
        from nepi.core.execute import ExperimentController

        # NOTE(review): the TempDir() branch is reconstructed; confirm that
        # a temporary directory is the intended default root directory.
        if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        return controller
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user,
                ident_key = key, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build the testbed controller described by an access configuration.

    testbed_id      -- identifier of the testbed backend.
    testbed_version -- version handed to the testbed controller.
    access_config   -- object exposing get_attribute_value(), or a falsy
                       value to request a new in-process controller.

    Returns the controller built by _build_testbed_controller() when no
    mode or DC.MODE_SINGLE_PROCESS is configured, or a
    TestbedControllerProxy when DC.MODE_DAEMON is configured.

    Raises ValueError when an in-process controller is requested together
    with recovery (launch is False), and RuntimeError for any other
    deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        # Recovering an existing testbed means "do not launch a new one".
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        # NOTE(review): the guard below is reconstructed from the error
        # message; without it the in-process branch could never return.
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port,
                ident_key = key, user = user, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
def _build_testbed_controller(testbed_id, testbed_version):
    """
    Instantiate the TestbedController class of a testbed backend.

    testbed_id      -- backend name; it is lower-cased and looked up as the
                       module "nepi.testbeds.<name>".
    testbed_version -- passed unchanged to the TestbedController constructor.

    Returns the new TestbedController instance. Import errors of the
    backend module propagate to the caller.
    """
    mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
    if mod_name not in sys.modules:
        # NOTE(review): the import statement is reconstructed; the excerpt
        # only shows that the module must be loaded before the lookup below.
        __import__(mod_name)
    # __import__ of a dotted name returns the top-level package, so the
    # backend module itself is fetched from sys.modules.
    module = sys.modules[mod_name]
    return module.TestbedController(testbed_version)
# Marshalling helpers: value decoders/encoders plus the decorators
# (args, retval, the void variant and handles) that attach wire-format
# metadata to server methods. The class statements and several def lines
# of this namespace are not visible in this excerpt.
242 # Just a namespace class
# --- Decoders: turn a received string field into a Python value ---
246 def pickled_data(sdata):
# NOTE(security): unpickling can execute arbitrary code; this is only safe
# while the peer sending the data is trusted.
247 return cPickle.loads(base64.b64decode(sdata))
250 def base64_data(sdata):
251 return base64.b64decode(sdata)
# Nullable-int decoder: the literal text "None" maps to None.
255 return None if sdata == "None" else int(sdata)
# Boolean decoder: only the exact text 'True' yields True.
259 return sdata == 'True'
# --- Encoders: the inverse operations, Python value -> string field ---
263 def pickled_data(data):
264 return base64.b64encode(cPickle.dumps(data))
267 def base64_data(data):
268 return base64.b64encode(data)
# Nullable-int encoder: returns the text "None" or an int; the int becomes
# text later through the "%s" format paired with this encoder.
272 return "None" if data is None else int(data)
276 return str(bool(data))
278 # import into Marshalling all the decoders
282 for typname, typ in vars(Decoders).iteritems()
283 if not typname.startswith('_')
# Encoder table keyed by type NAME (a string); each value is a pair
# (encoding function, format string). Built from every public decoder name
# that also exists in Encoders.
286 _TYPE_ENCODERS = dict([
287 # id(type) -> (<encoding_function>, <formatting_string>)
288 (typname, (getattr(Encoders,typname),"%s"))
289 for typname in vars(Decoders)
290 if not typname.startswith('_')
291 and hasattr(Encoders,typname)
# Builtin types are registered by their __name__ as well.
295 _TYPE_ENCODERS["float"] = (float, "%r")
296 _TYPE_ENCODERS["int"] = (int, "%d")
297 _TYPE_ENCODERS["long"] = (int, "%d")
298 _TYPE_ENCODERS["str"] = (str, "%s")
299 _TYPE_ENCODERS["unicode"] = (str, "%s")
# Fallback entry used for types that have no encoder of their own.
302 _TYPE_ENCODERS[None] = (str, "%s")
307 Decorator that converts the given function into one that takes
308 a single "params" list, with each parameter marshalled according
309 to the given factory callable (type constructors are accepted).
311 The first argument (self) is left untouched.
315 @Marshalling.args(int,int,str,base64_data)
316 def somefunc(self, someint, otherint, somestr, someb64):
# Server-side wrapper: params[0] is the command code, so decoding starts at
# params[1]. zip() stops at the shorter sequence, hence trailing arguments
# that were not sent are simply not passed to f.
321 def rv(self, params):
322 return f(self, *[ ctor(val)
323 for ctor,val in zip(types, params[1:]) ])
327 # Derive type encoders by looking up types in _TYPE_ENCODERS
328 # make_proxy will use it to encode arguments in command strings
330 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
332 if typ.__name__ in TYPE_ENCODERS:
333 argencoders.append(TYPE_ENCODERS[typ.__name__])
336 argencoders.append(TYPE_ENCODERS[None])
338 rv._argencoders = tuple(argencoders)
# Keep metadata set by a decorator applied below this one.
340 rv._retval = getattr(f, '_retval', None)
345 def retval(typ=Decoders.base64_data):
347 Decorator that converts the given function into one that
348 returns a properly encoded return string, given that the undecorated
349 function returns suitable input for the encoding function.
351 The optional typ argument specifies a type.
352 For the default of base64_data, return values should be strings.
353 The return value of the encoding method should be a string always.
357 @Marshalling.args(int,int,str,base64_data)
358 @Marshalling.retval(str)
359 def somefunc(self, someint, otherint, somestr, someb64):
# Unknown return types fall back to the generic (str, "%s") encoder.
362 encode, fmt = Marshalling._TYPE_ENCODERS.get(
364 Marshalling._TYPE_ENCODERS[None])
369 def rv(self, *p, **kw):
370 data = f(self, *p, **kw)
# Propagate the argument metadata of the wrapped function.
376 rv._argtypes = getattr(f, '_argtypes', None)
377 rv._argencoders = getattr(f, '_argencoders', None)
384 Decorator that converts the given function into one that
385 always return an encoded empty string.
387 Useful for null-returning functions.
392 def rv(self, *p, **kw):
397 rv._argtypes = getattr(f, '_argtypes', None)
398 rv._argencoders = getattr(f, '_argencoders', None)
402 def handles(whichcommand):
404 Associates the method with a given command code for servers.
405 It should always be the topmost decorator.
# The code is stored on the function object; BaseServer.reply_action and
# BaseProxy._make_stubs read it back through getattr().
408 f._handles_command = whichcommand
# Server base class that turns a received "code|arg|arg..." message into a
# call of the method registered for that code and builds the reply string.
# The branching statements (if/else/try/except) of reply_action are not
# visible in this excerpt; the comments describe the visible statements.
412 class BaseServer(server.Server):
413 def reply_action(self, msg):
# Reply used for an unusable message: ERROR code plus base64 text.
415 result = base64.b64encode("Invalid command line")
416 reply = "%d|%s" % (ERROR, result)
# First field is the numeric instruction, the rest are its arguments.
418 params = msg.split("|")
419 instruction = int(params[0])
420 log_msg(self, params)
# Find the handler: a public attribute whose _handles_command tag (set by
# Marshalling.handles) equals the instruction. vars(cls) lists only the
# attributes defined directly on the concrete class, so handlers inherited
# from a base class are not found by this scan.
422 for mname,meth in vars(self.__class__).iteritems():
423 if not mname.startswith('_'):
424 cmd = getattr(meth, '_handles_command', None)
425 if cmd == instruction:
# Re-fetch through the instance to obtain a bound method.
426 meth = getattr(self, mname)
# No handler matched: log and answer with an ERROR reply.
430 error = "Invalid instruction %s" % instruction
431 self.log_error(error)
432 result = base64.b64encode(error)
433 reply = "%d|%s" % (ERROR, result)
# Failure path: log_error() is called without a message and its return
# value is sent back as the error text (presumably the formatted
# traceback - TODO confirm against nepi.util.server).
435 error = self.log_error()
436 result = base64.b64encode(error)
437 reply = "%d|%s" % (ERROR, result)
438 log_reply(self, reply)
# Server exposing a testbed controller over the message protocol.
# Every handler is tagged with Marshalling.handles(<code>); Marshalling.args
# lists the decoders of its wire arguments and Marshalling.retval the
# encoder of its result. Several decorator and def lines of this class are
# not visible in this excerpt.
441 class TestbedControllerServer(BaseServer):
442 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
443 super(TestbedControllerServer, self).__init__(root_dir, log_level)
444 self._testbed_id = testbed_id
445 self._testbed_version = testbed_version
# The controller is only built here, not in __init__ (presumably so that it
# is created inside the daemonized process - TODO confirm).
448 def post_daemonize(self):
449 self._testbed = _build_testbed_controller(self._testbed_id,
450 self._testbed_version)
452 @Marshalling.handles(GUIDS)
454 @Marshalling.retval( Marshalling.pickled_data )
456 return self._testbed.guids
458 @Marshalling.handles(TESTBED_ID)
460 @Marshalling.retval()
461 def testbed_id(self):
462 return str(self._testbed.testbed_id)
464 @Marshalling.handles(TESTBED_VERSION)
466 @Marshalling.retval()
467 def testbed_version(self):
468 return str(self._testbed.testbed_version)
# The defer_* handlers forward their decoded arguments unchanged to the
# method of the same name on the testbed controller.
470 @Marshalling.handles(CREATE)
471 @Marshalling.args(int, str)
473 def defer_create(self, guid, factory_id):
474 self._testbed.defer_create(guid, factory_id)
476 @Marshalling.handles(TRACE)
477 @Marshalling.args(int, str, Marshalling.base64_data)
478 @Marshalling.retval()
479 def trace(self, guid, trace_id, attribute):
# Returned as-is (ExperimentControllerServer.trace wraps its result in
# str() instead).
480 return self._testbed.trace(guid, trace_id, attribute)
482 @Marshalling.handles(TRACES_INFO)
484 @Marshalling.retval( Marshalling.pickled_data )
485 def traces_info(self):
486 return self._testbed.traces_info()
488 @Marshalling.handles(START)
492 self._testbed.start()
# The body of the STOP handler is not visible in this excerpt.
494 @Marshalling.handles(STOP)
500 @Marshalling.handles(SHUTDOWN)
504 self._testbed.shutdown()
506 @Marshalling.handles(CONFIGURE)
507 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
509 def defer_configure(self, name, value):
510 self._testbed.defer_configure(name, value)
512 @Marshalling.handles(CREATE_SET)
513 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
515 def defer_create_set(self, guid, name, value):
516 self._testbed.defer_create_set(guid, name, value)
518 @Marshalling.handles(FACTORY_SET)
519 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
521 def defer_factory_set(self, name, value):
522 self._testbed.defer_factory_set(name, value)
524 @Marshalling.handles(CONNECT)
525 @Marshalling.args(int, str, int, str)
527 def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
528 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
529 connector_type_name2)
531 @Marshalling.handles(CROSS_CONNECT)
532 @Marshalling.args(int, str, int, int, str, str, str)
534 def defer_cross_connect(self,
535 guid, connector_type_name,
536 cross_guid, cross_testbed_guid,
537 cross_testbed_id, cross_factory_id,
538 cross_connector_type_name):
539 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
540 cross_testbed_guid, cross_testbed_id, cross_factory_id,
541 cross_connector_type_name)
543 @Marshalling.handles(ADD_TRACE)
544 @Marshalling.args(int, str)
546 def defer_add_trace(self, guid, trace_id):
547 self._testbed.defer_add_trace(guid, trace_id)
549 @Marshalling.handles(ADD_ADDRESS)
550 @Marshalling.args(int, str, int, str)
552 def defer_add_address(self, guid, address, netprefix, broadcast):
# The end of this call is on a line not visible in this excerpt.
553 self._testbed.defer_add_address(guid, address, netprefix,
556 @Marshalling.handles(ADD_ROUTE)
557 @Marshalling.args(int, str, int, str)
559 def defer_add_route(self, guid, destination, netprefix, nexthop):
560 self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
# The do_* handlers take no wire arguments (except the cross-connect pair)
# and trigger the step of the same name on the testbed controller.
562 @Marshalling.handles(DO_SETUP)
566 self._testbed.do_setup()
568 @Marshalling.handles(DO_CREATE)
572 self._testbed.do_create()
574 @Marshalling.handles(DO_CONNECT_INIT)
577 def do_connect_init(self):
578 self._testbed.do_connect_init()
580 @Marshalling.handles(DO_CONNECT_COMPL)
583 def do_connect_compl(self):
584 self._testbed.do_connect_compl()
586 @Marshalling.handles(DO_CONFIGURE)
589 def do_configure(self):
590 self._testbed.do_configure()
592 @Marshalling.handles(DO_PRECONFIGURE)
595 def do_preconfigure(self):
596 self._testbed.do_preconfigure()
598 @Marshalling.handles(DO_PRESTART)
601 def do_prestart(self):
602 self._testbed.do_prestart()
# cross_data arrives pickled and base64-encoded.
604 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
605 @Marshalling.args( Marshalling.Decoders.pickled_data )
607 def do_cross_connect_init(self, cross_data):
608 self._testbed.do_cross_connect_init(cross_data)
610 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
611 @Marshalling.args( Marshalling.Decoders.pickled_data )
613 def do_cross_connect_compl(self, cross_data):
614 self._testbed.do_cross_connect_compl(cross_data)
616 @Marshalling.handles(GET)
617 @Marshalling.args(int, Marshalling.base64_data, str)
618 @Marshalling.retval( Marshalling.pickled_data )
619 def get(self, guid, name, time):
620 return self._testbed.get(guid, name, time)
622 @Marshalling.handles(SET)
623 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
625 def set(self, guid, name, value, time):
626 self._testbed.set(guid, name, value, time)
628 @Marshalling.handles(GET_ADDRESS)
629 @Marshalling.args(int, int, Marshalling.base64_data)
630 @Marshalling.retval()
631 def get_address(self, guid, index, attribute):
632 return str(self._testbed.get_address(guid, index, attribute))
634 @Marshalling.handles(GET_ROUTE)
635 @Marshalling.args(int, int, Marshalling.base64_data)
636 @Marshalling.retval()
637 def get_route(self, guid, index, attribute):
638 return str(self._testbed.get_route(guid, index, attribute))
# Argument order on the wire is (time, guid, command).
640 @Marshalling.handles(ACTION)
641 @Marshalling.args(str, int, Marshalling.base64_data)
643 def action(self, time, guid, command):
644 self._testbed.action(time, guid, command)
# nullint lets the client send "None" as the guid.
646 @Marshalling.handles(STATUS)
647 @Marshalling.args(Marshalling.nullint)
648 @Marshalling.retval(int)
649 def status(self, guid):
650 return self._testbed.status(guid)
# filter_flags is declared as int on the wire but defaults to None here;
# the default applies only when the field is absent from the message.
652 @Marshalling.handles(GET_ATTRIBUTE_LIST)
653 @Marshalling.args(int, int)
654 @Marshalling.retval( Marshalling.pickled_data )
655 def get_attribute_list(self, guid, filter_flags = None):
656 return self._testbed.get_attribute_list(guid, filter_flags)
658 @Marshalling.handles(GET_FACTORY_ID)
659 @Marshalling.args(int)
660 @Marshalling.retval()
661 def get_factory_id(self, guid):
662 return self._testbed.get_factory_id(guid)
# Server exposing an ExperimentController over the message protocol.
# Handlers follow the same decorator convention as TestbedControllerServer:
# handles(<code>) tags the command, args(...) lists the argument decoders
# and retval(...) the result encoder. Several decorator and def lines of
# this class are not visible in this excerpt.
664 class ExperimentControllerServer(BaseServer):
665 def __init__(self, root_dir, log_level, experiment_xml):
666 super(ExperimentControllerServer, self).__init__(root_dir, log_level)
667 self._experiment_xml = experiment_xml
# Stays None until post_daemonize() builds the controller.
668 self._experiment = None
670 def post_daemonize(self):
# Imported on demand, at the point where the controller is created.
671 from nepi.core.execute import ExperimentController
672 self._experiment = ExperimentController(self._experiment_xml,
673 root_dir = self._root_dir)
675 @Marshalling.handles(GUIDS)
677 @Marshalling.retval( Marshalling.pickled_data )
679 return self._experiment.guids
# The two XML handlers read attributes (no call) of the controller.
681 @Marshalling.handles(XML)
683 @Marshalling.retval()
684 def experiment_design_xml(self):
685 return self._experiment.experiment_design_xml
687 @Marshalling.handles(EXEC_XML)
689 @Marshalling.retval()
690 def experiment_execute_xml(self):
691 return self._experiment.experiment_execute_xml
693 @Marshalling.handles(TRACE)
694 @Marshalling.args(int, str, Marshalling.base64_data)
695 @Marshalling.retval()
696 def trace(self, guid, trace_id, attribute):
# Converted to str before the default base64 encoding is applied.
697 return str(self._experiment.trace(guid, trace_id, attribute))
699 @Marshalling.handles(TRACES_INFO)
701 @Marshalling.retval( Marshalling.pickled_data )
702 def traces_info(self):
703 return self._experiment.traces_info()
705 @Marshalling.handles(FINISHED)
706 @Marshalling.args(int)
707 @Marshalling.retval(Marshalling.bool)
708 def is_finished(self, guid):
709 return self._experiment.is_finished(guid)
711 @Marshalling.handles(GET)
712 @Marshalling.args(int, Marshalling.base64_data, str)
713 @Marshalling.retval( Marshalling.pickled_data )
714 def get(self, guid, name, time):
715 return self._experiment.get(guid, name, time)
717 @Marshalling.handles(SET)
718 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
720 def set(self, guid, name, value, time):
721 self._experiment.set(guid, name, value, time)
# Lifecycle commands: each one calls the controller method of the same name.
723 @Marshalling.handles(START)
727 self._experiment.start()
729 @Marshalling.handles(STOP)
733 self._experiment.stop()
735 @Marshalling.handles(RECOVER)
739 self._experiment.recover()
741 @Marshalling.handles(SHUTDOWN)
745 self._experiment.shutdown()
# Per-guid lookups; results are returned without str() conversion.
747 @Marshalling.handles(GET_TESTBED_ID)
748 @Marshalling.args(int)
749 @Marshalling.retval()
750 def get_testbed_id(self, guid):
751 return self._experiment.get_testbed_id(guid)
753 @Marshalling.handles(GET_FACTORY_ID)
754 @Marshalling.args(int)
755 @Marshalling.retval()
756 def get_factory_id(self, guid):
757 return self._experiment.get_factory_id(guid)
759 @Marshalling.handles(GET_TESTBED_VERSION)
760 @Marshalling.args(int)
761 @Marshalling.retval()
762 def get_testbed_version(self, guid):
763 return self._experiment.get_testbed_version(guid)
# Client-side base class of the controller proxies. It can start the
# matching server (through server.popen_ssh_subprocess with generated
# Python code, or by instantiating _ServerClass directly), connects a
# server.Client to it, and provides the static helpers that build request
# strings, parse replies and generate stub methods. Many lines of this
# class, including the conditions selecting each path, are not visible in
# this excerpt.
765 class BaseProxy(object):
# Module named in the generated "from ... import ..." line.
767 _ServerClassModule = "nepi.util.proxy"
771 launch = True, host = None,
772 port = None, user = None, ident_key = None, agent = None,
773 environment_setup = ""):
# Python source executed on the other side. ctor_args is inserted with %r,
# so its repr must be valid Python that rebuilds the arguments.
778 "from %(classmodule)s import %(classname)s;"
779 "s = %(classname)s%(ctor_args)r;"
782 classname = self._ServerClass.__name__,
783 classmodule = self._ServerClassModule,
784 ctor_args = ctor_args
786 proc = server.popen_ssh_subprocess(python_code, host = host,
787 port = port, user = user, agent = agent,
788 ident_key = ident_key,
789 environment_setup = environment_setup,
# The subprocess' stderr output is included in the error raised below.
792 err = proc.stderr.read()
793 raise RuntimeError, "Server could not be executed: %s" % (err,)
# Direct instantiation of the server class with the same arguments.
796 s = self._ServerClass(*ctor_args)
799 # connect client to server
800 self._client = server.Client(root_dir, host = host, port = port,
801 user = user, agent = agent,
802 environment_setup = environment_setup)
# Build the request string "<command>|<arg>|<arg>..." for a stub call.
#   argtypes, argencoders -- per-argument metadata taken from the server
#                            method; both must have the same length as args.
#   methname, classname   -- only used in error messages.
# Raises ValueError on a length mismatch and TypeError when a value cannot
# be encoded.
805 def _make_message(argtypes, argencoders, command, methname, classname, *args):
806 if len(argtypes) != len(argencoders):
807 raise ValueError, "Invalid arguments for _make_message: "\
808 "in stub method %s of class %s "\
809 "argtypes and argencoders must match in size" % (
810 methname, classname )
811 if len(argtypes) != len(args):
812 raise ValueError, "Invalid arguments for _make_message: "\
813 "in stub method %s of class %s "\
814 "expected %d arguments, got %d" % (
816 len(argtypes), len(args))
819 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
# Each encoder entry is a pair (function, format string).
821 buf.append(fmt % encode(val))
824 raise TypeError, "Argument %d of stub method %s of class %s "\
825 "requires a value of type %s, but got %s - nested error: %s" % (
826 argnum, methname, classname,
827 getattr(typ, '__name__', typ), type(val),
828 traceback.format_exc()
831 return "%d|%s" % (command, '|'.join(buf))
# Decode a server reply of the form "<code>|<payload>".
#   rvtype              -- decoder applied to the payload of a good reply.
#   methname, classname -- only used in error messages.
# Raises RuntimeError for a malformed reply, an unknown code or an error
# reply (with the decoded server text), and TypeError when the payload
# cannot be parsed.
834 def _parse_reply(rvtype, methname, classname, reply):
836 raise RuntimeError, "Invalid reply: %r "\
837 "for stub method %s of class %s" % (
843 result = reply.split("|")
844 code = int(result[0])
848 raise TypeError, "Return value of stub method %s of class %s "\
849 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
851 getattr(rvtype, '__name__', rvtype), reply,
852 traceback.format_exc()
# Error reply: the payload is the base64-encoded error text.
855 text = base64.b64decode(text)
856 raise RuntimeError(text)
865 raise TypeError, "Return value of stub method %s of class %s "\
866 "cannot be parsed: must be of type %s - nested error: %s" % (
868 getattr(rvtype, '__name__', rvtype),
869 traceback.format_exc()
872 raise RuntimeError, "Invalid reply: %r "\
873 "for stub method %s of class %s - unknown code" % (
879 def _make_stubs(server_class, template_class):
881 Returns a dictionary method_name -> method
886 class SomeProxy(BaseProxy):
889 locals().update( BaseProxy._make_stubs(
894 ServerClass is the corresponding Server class, as
895 specified in the _ServerClass class method (_make_stubs
896 is static and can't access the method), and TemplateClass
897 is the ultimate implementation class behind the server,
898 from which argument names and defaults are taken, to
899 maintain meaningful interfaces.
# The stub source comes from a template file located next to this module
# (its file name is on a line not visible in this excerpt).
906 func_template_path = os.path.join(
907 os.path.dirname(__file__),
909 func_template_file = open(func_template_path, "r")
910 func_template = func_template_file.read()
911 func_template_file.close()
# Iterate over a copy: template_class gains attributes (setattr below)
# while the loop is running.
913 for methname in vars(template_class).copy():
914 if methname.endswith('_deferred'):
915 # cannot wrap deferreds...
917 dmethname = methname+'_deferred'
918 if hasattr(server_class, methname) and not methname.startswith('_'):
919 template_meth = getattr(template_class, methname)
920 server_meth = getattr(server_class, methname)
# Metadata attached by the Marshalling decorators.
922 command = getattr(server_meth, '_handles_command', None)
923 argtypes = getattr(server_meth, '_argtypes', None)
924 argencoders = getattr(server_meth, '_argencoders', None)
925 rvtype = getattr(server_meth, '_retval', None)
# Properties are unwrapped to their getter so its signature can be read.
928 if hasattr(template_meth, 'fget'):
930 template_meth = template_meth.fget
933 if command is not None and argtypes is not None and argencoders is not None:
934 # We have an interface method...
935 code = template_meth.func_code
936 argnames = code.co_varnames[:code.co_argcount]
# Pad the defaults on the left with the NONE marker so they line up with
# argnames; NONE stands for "argument without default".
937 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
938 + (template_meth.func_defaults or ()) )
941 BaseProxy = BaseProxy,
943 argencoders = argencoders,
945 functools = functools,
949 func_text = func_template % dict(
# argnames[1:] skips the first argument (self).
951 args = '%s' % (','.join(argnames[1:])),
953 argname if argdef is NONE
954 else "%s=%r" % (argname, argdef)
955 for argname, argdef in zip(argnames[1:], argdefaults[1:])
959 classname = server_class.__name__
# Python 2 exec statement: runs the generated source with func_globals as
# globals and collects the defined functions in `context`.
967 exec func_text in func_globals, context
970 rv[methname] = property(context[methname])
971 rv[dmethname] = property(context[dmethname])
973 rv[methname] = context[methname]
974 rv[dmethname] = context[dmethname]
976 # inject _deferred into core classes
977 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
# Factory taking both names as parameters, so the generated method refers
# to the names of this iteration rather than to the loop variables.
978 def freezename(methname, dmethname):
979 def dmeth(self, *p, **kw):
980 return getattr(self, methname)(*p, **kw)
981 dmeth.__name__ = dmethname
983 dmeth = freezename(methname, dmethname)
984 setattr(template_class, dmethname, dmeth)
# Proxy offering the TestbedController interface on top of a
# TestbedControllerServer. The stub methods are generated by
# BaseProxy._make_stubs and injected into the class namespace.
988 class TestbedControllerProxy(BaseProxy):
990 _ServerClass = TestbedControllerServer
992 def __init__(self, root_dir, log_level, testbed_id = None,
993 testbed_version = None, launch = True, host = None,
994 port = None, user = None, ident_key = None, agent = None,
995 environment_setup = ""):
# Both identifiers are mandatory only when a new server is launched.
996 if launch and (testbed_id == None or testbed_version == None):
997 raise RuntimeError("To launch a TesbedControllerServer a "
998 "testbed_id and testbed_version are required")
# ctor_args are the positional arguments of the server constructor.
999 super(TestbedControllerProxy,self).__init__(
1000 ctor_args = (root_dir, log_level, testbed_id, testbed_version),
1001 root_dir = root_dir,
1002 launch = launch, host = host, port = port, user = user,
1003 ident_key = ident_key, agent = agent,
1004 environment_setup = environment_setup)
# Executed in the class body: adds the generated stubs to the namespace of
# the class being defined.
1006 locals().update( BaseProxy._make_stubs(
1007 server_class = TestbedControllerServer,
1008 template_class = nepi.core.execute.TestbedController,
1011 # Shutdown stops the serverside...
# The default argument captures the generated shutdown stub before this
# definition rebinds the name. The lines using _stub and returning are not
# visible in this excerpt.
1012 def shutdown(self, _stub = shutdown):
1014 self._client.send_stop()
1015 self._client.read_reply() # wait for it
# Proxy offering the ExperimentController interface on top of an
# ExperimentControllerServer. The stub methods are generated by
# BaseProxy._make_stubs and injected into the class namespace.
1019 class ExperimentControllerProxy(BaseProxy):
1020 _ServerClass = ExperimentControllerServer
1022 def __init__(self, root_dir, log_level, experiment_xml = None,
1023 launch = True, host = None, port = None, user = None,
1024 ident_key = None, agent = None, environment_setup = ""):
# The experiment description is mandatory only when launching a server.
1025 if launch and experiment_xml is None:
# NOTE(review): the backslash continues the string literal itself, so any
# leading whitespace of the next source line becomes part of the message.
1026 raise RuntimeError("To launch a ExperimentControllerServer a \
1027 xml description of the experiment is required")
# ctor_args are the positional arguments of the server constructor.
1028 super(ExperimentControllerProxy,self).__init__(
1029 ctor_args = (root_dir, log_level, experiment_xml),
1030 root_dir = root_dir,
1031 launch = launch, host = host, port = port, user = user,
1032 ident_key = ident_key, agent = agent,
1033 environment_setup = environment_setup)
# Executed in the class body: adds the generated stubs to the namespace of
# the class being defined.
1035 locals().update( BaseProxy._make_stubs(
1036 server_class = ExperimentControllerServer,
1037 template_class = nepi.core.execute.ExperimentController,
1041 # Shutdown stops the serverside...
# The default argument captures the generated shutdown stub before this
# definition rebinds the name. The lines using _stub and returning are not
# visible in this excerpt.
1042 def shutdown(self, _stub = shutdown):
1044 self._client.send_stop()
1045 self._client.read_reply() # wait for it