2 # -*- coding: utf-8 -*-
5 import nepi.core.execute
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
21 # PROTOCOL INSTRUCTION MESSAGES
41 DO_CROSS_CONNECT_INIT = 22
51 GET_ATTRIBUTE_LIST = 32
53 DO_CROSS_CONNECT_COMPL = 34
59 GET_TESTBED_VERSION = 40
63 instruction_text = dict({
74 CONFIGURE: "CONFIGURE",
76 CREATE_SET: "CREATE_SET",
77 FACTORY_SET: "FACTORY_SET",
79 CROSS_CONNECT: "CROSS_CONNECT",
80 ADD_TRACE: "ADD_TRACE",
81 ADD_ADDRESS: "ADD_ADDRESS",
82 ADD_ROUTE: "ADD_ROUTE",
84 DO_CREATE: "DO_CREATE",
85 DO_CONNECT_INIT: "DO_CONNECT_INIT",
86 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
87 DO_CONFIGURE: "DO_CONFIGURE",
88 DO_PRECONFIGURE: "DO_PRECONFIGURE",
89 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
90 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
93 GET_ROUTE: "GET_ROUTE",
94 GET_ADDRESS: "GET_ADDRESS",
95 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
96 GET_FACTORY_ID: "GET_FACTORY_ID",
97 GET_TESTBED_ID: "GET_TESTBED_ID",
98 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
102 TESTBED_ID: "TESTBED_ID",
103 TESTBED_VERSION: "TESTBED_VERSION",
104 TRACES_INFO: "TRACES_INFO",
107 def log_msg(server, params):
109 instr = int(params[0])
110 instr_txt = instruction_text[instr]
111 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
112 instr_txt, ", ".join(map(str, params[1:]))))
114 # don't die for logging
117 def log_reply(server, reply):
119 res = reply.split("|")
121 code_txt = instruction_text[code]
123 txt = base64.b64decode(res[1])
126 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
129 # don't die for logging
130 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
134 def to_server_log_level(log_level):
137 if log_level == DC.DEBUG_LEVEL
138 else server.ERROR_LEVEL
141 def get_access_config_params(access_config):
142 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
143 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
144 log_level = to_server_log_level(log_level)
145 user = host = port = agent = key = None
146 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
147 environment_setup = (
148 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
149 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
152 if communication == DC.ACCESS_SSH:
153 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
154 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
155 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
156 agent = access_config.get_attribute_value(DC.USE_AGENT)
157 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
158 return (root_dir, log_level, user, host, port, key, agent, environment_setup)
160 class AccessConfiguration(AttributesMap):
161 def __init__(self, params = None):
162 super(AccessConfiguration, self).__init__()
164 from nepi.core.metadata import Metadata
166 for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
167 self.add_attribute(**attr_info)
170 for attr_name, attr_value in params.iteritems():
171 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
172 attr_value = parser(attr_value)
173 self.set_attribute_value(attr_name, attr_value)
175 class TempDir(object):
177 self.path = tempfile.mkdtemp()
180 shutil.rmtree(self.path)
182 class PermDir(object):
183 def __init__(self, path):
186 def create_experiment_controller(xml, access_config = None):
187 mode = None if not access_config \
188 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
189 launch = True if not access_config \
190 else not access_config.get_attribute_value(DC.RECOVER)
191 if not mode or mode == DC.MODE_SINGLE_PROCESS:
193 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
195 from nepi.core.execute import ExperimentController
197 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
200 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
201 controller = ExperimentController(xml, root_dir.path)
203 # inject reference to temporary dir, so that it gets cleaned
204 # up at destruction time.
205 controller._tempdir = root_dir
208 elif mode == DC.MODE_DAEMON:
209 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
210 get_access_config_params(access_config)
211 return ExperimentControllerProxy(root_dir, log_level,
212 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
213 agent = agent, launch = launch,
214 environment_setup = environment_setup)
215 raise RuntimeError("Unsupported access configuration '%s'" % mode)
217 def create_testbed_controller(testbed_id, testbed_version, access_config):
218 mode = None if not access_config \
219 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
220 launch = True if not access_config \
221 else not access_config.get_attribute_value(DC.RECOVER)
222 if not mode or mode == DC.MODE_SINGLE_PROCESS:
224 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
225 return _build_testbed_controller(testbed_id, testbed_version)
226 elif mode == DC.MODE_DAEMON:
227 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
228 get_access_config_params(access_config)
229 return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
230 testbed_version = testbed_version, host = host, port = port, ident_key = key,
231 user = user, agent = agent, launch = launch,
232 environment_setup = environment_setup)
233 raise RuntimeError("Unsupported access configuration '%s'" % mode)
235 def _build_testbed_controller(testbed_id, testbed_version):
236 mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
237 if not mod_name in sys.modules:
239 module = sys.modules[mod_name]
240 tc = module.TestbedController()
241 if tc.testbed_version != testbed_version:
242 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
243 (testbed_id, testbed_version, tc.testbed_version))
246 # Just a namespace class
250 def pickled_data(sdata):
251 return cPickle.loads(base64.b64decode(sdata))
254 def base64_data(sdata):
255 return base64.b64decode(sdata)
259 return None if sdata == "None" else int(sdata)
263 return sdata == 'True'
267 def pickled_data(data):
268 return base64.b64encode(cPickle.dumps(data))
271 def base64_data(data):
272 return base64.b64encode(data)
276 return "None" if data is None else int(data)
280 return str(bool(data))
282 # import into Marshalling all the decoders
286 for typname, typ in vars(Decoders).iteritems()
287 if not typname.startswith('_')
290 _TYPE_ENCODERS = dict([
291 # id(type) -> (<encoding_function>, <formatting_string>)
292 (typname, (getattr(Encoders,typname),"%s"))
293 for typname in vars(Decoders)
294 if not typname.startswith('_')
295 and hasattr(Encoders,typname)
299 _TYPE_ENCODERS["float"] = (float, "%r")
300 _TYPE_ENCODERS["int"] = (int, "%d")
301 _TYPE_ENCODERS["long"] = (int, "%d")
302 _TYPE_ENCODERS["str"] = (str, "%s")
303 _TYPE_ENCODERS["unicode"] = (str, "%s")
306 _TYPE_ENCODERS[None] = (str, "%s")
311 Decorator that converts the given function into one that takes
312 a single "params" list, with each parameter marshalled according
313 to the given factory callable (type constructors are accepted).
315 The first argument (self) is left untouched.
319 @Marshalling.args(int,int,str,base64_data)
320 def somefunc(self, someint, otherint, somestr, someb64):
325 def rv(self, params):
326 return f(self, *[ ctor(val)
327 for ctor,val in zip(types, params[1:]) ])
331 # Derive type encoders by looking up types in _TYPE_ENCODERS
332 # make_proxy will use it to encode arguments in command strings
334 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
336 if typ.__name__ in TYPE_ENCODERS:
337 argencoders.append(TYPE_ENCODERS[typ.__name__])
340 argencoders.append(TYPE_ENCODERS[None])
342 rv._argencoders = tuple(argencoders)
344 rv._retval = getattr(f, '_retval', None)
349 def retval(typ=Decoders.base64_data):
351 Decorator that converts the given function into one that
352 returns a properly encoded return string, given that the undecorated
353 function returns suitable input for the encoding function.
355 The optional typ argument specifies a type.
356 For the default of base64_data, return values should be strings.
357 The return value of the encoding method should be a string always.
361 @Marshalling.args(int,int,str,base64_data)
362 @Marshalling.retval(str)
363 def somefunc(self, someint, otherint, somestr, someb64):
366 encode, fmt = Marshalling._TYPE_ENCODERS.get(
368 Marshalling._TYPE_ENCODERS[None])
373 def rv(self, *p, **kw):
374 data = f(self, *p, **kw)
380 rv._argtypes = getattr(f, '_argtypes', None)
381 rv._argencoders = getattr(f, '_argencoders', None)
388 Decorator that converts the given function into one that
389 always return an encoded empty string.
391 Useful for null-returning functions.
396 def rv(self, *p, **kw):
401 rv._argtypes = getattr(f, '_argtypes', None)
402 rv._argencoders = getattr(f, '_argencoders', None)
406 def handles(whichcommand):
408 Associates the method with a given command code for servers.
409 It should always be the topmost decorator.
412 f._handles_command = whichcommand
416 class BaseServer(server.Server):
417 def reply_action(self, msg):
419 result = base64.b64encode("Invalid command line")
420 reply = "%d|%s" % (ERROR, result)
422 params = msg.split("|")
423 instruction = int(params[0])
424 log_msg(self, params)
426 for mname,meth in vars(self.__class__).iteritems():
427 if not mname.startswith('_'):
428 cmd = getattr(meth, '_handles_command', None)
429 if cmd == instruction:
430 meth = getattr(self, mname)
434 error = "Invalid instruction %s" % instruction
435 self.log_error(error)
436 result = base64.b64encode(error)
437 reply = "%d|%s" % (ERROR, result)
439 error = self.log_error()
440 result = base64.b64encode(error)
441 reply = "%d|%s" % (ERROR, result)
442 log_reply(self, reply)
445 class TestbedControllerServer(BaseServer):
446 def __init__(self, root_dir, log_level, testbed_id, testbed_version):
447 super(TestbedControllerServer, self).__init__(root_dir, log_level)
448 self._testbed_id = testbed_id
449 self._testbed_version = testbed_version
452 def post_daemonize(self):
453 self._testbed = _build_testbed_controller(self._testbed_id,
454 self._testbed_version)
456 @Marshalling.handles(GUIDS)
458 @Marshalling.retval( Marshalling.pickled_data )
460 return self._testbed.guids
462 @Marshalling.handles(TESTBED_ID)
464 @Marshalling.retval()
465 def testbed_id(self):
466 return str(self._testbed.testbed_id)
468 @Marshalling.handles(TESTBED_VERSION)
470 @Marshalling.retval()
471 def testbed_version(self):
472 return str(self._testbed.testbed_version)
474 @Marshalling.handles(CREATE)
475 @Marshalling.args(int, str)
477 def defer_create(self, guid, factory_id):
478 self._testbed.defer_create(guid, factory_id)
480 @Marshalling.handles(TRACE)
481 @Marshalling.args(int, str, Marshalling.base64_data)
482 @Marshalling.retval()
483 def trace(self, guid, trace_id, attribute):
484 return self._testbed.trace(guid, trace_id, attribute)
486 @Marshalling.handles(TRACES_INFO)
488 @Marshalling.retval( Marshalling.pickled_data )
489 def traces_info(self):
490 return self._testbed.traces_info()
492 @Marshalling.handles(START)
496 self._testbed.start()
498 @Marshalling.handles(STOP)
504 @Marshalling.handles(SHUTDOWN)
508 self._testbed.shutdown()
510 @Marshalling.handles(CONFIGURE)
511 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
513 def defer_configure(self, name, value):
514 self._testbed.defer_configure(name, value)
516 @Marshalling.handles(CREATE_SET)
517 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
519 def defer_create_set(self, guid, name, value):
520 self._testbed.defer_create_set(guid, name, value)
522 @Marshalling.handles(FACTORY_SET)
523 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
525 def defer_factory_set(self, name, value):
526 self._testbed.defer_factory_set(name, value)
528 @Marshalling.handles(CONNECT)
529 @Marshalling.args(int, str, int, str)
531 def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
532 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
533 connector_type_name2)
535 @Marshalling.handles(CROSS_CONNECT)
536 @Marshalling.args(int, str, int, int, str, str, str)
538 def defer_cross_connect(self,
539 guid, connector_type_name,
540 cross_guid, cross_testbed_guid,
541 cross_testbed_id, cross_factory_id,
542 cross_connector_type_name):
543 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
544 cross_testbed_guid, cross_testbed_id, cross_factory_id,
545 cross_connector_type_name)
547 @Marshalling.handles(ADD_TRACE)
548 @Marshalling.args(int, str)
550 def defer_add_trace(self, guid, trace_id):
551 self._testbed.defer_add_trace(guid, trace_id)
553 @Marshalling.handles(ADD_ADDRESS)
554 @Marshalling.args(int, str, int, Marshalling.pickled_data)
556 def defer_add_address(self, guid, address, netprefix, broadcast):
557 self._testbed.defer_add_address(guid, address, netprefix,
560 @Marshalling.handles(ADD_ROUTE)
561 @Marshalling.args(int, str, int, str, int)
563 def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
564 self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
566 @Marshalling.handles(DO_SETUP)
570 self._testbed.do_setup()
572 @Marshalling.handles(DO_CREATE)
576 self._testbed.do_create()
578 @Marshalling.handles(DO_CONNECT_INIT)
581 def do_connect_init(self):
582 self._testbed.do_connect_init()
584 @Marshalling.handles(DO_CONNECT_COMPL)
587 def do_connect_compl(self):
588 self._testbed.do_connect_compl()
590 @Marshalling.handles(DO_CONFIGURE)
593 def do_configure(self):
594 self._testbed.do_configure()
596 @Marshalling.handles(DO_PRECONFIGURE)
599 def do_preconfigure(self):
600 self._testbed.do_preconfigure()
602 @Marshalling.handles(DO_PRESTART)
605 def do_prestart(self):
606 self._testbed.do_prestart()
608 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
609 @Marshalling.args( Marshalling.Decoders.pickled_data )
611 def do_cross_connect_init(self, cross_data):
612 self._testbed.do_cross_connect_init(cross_data)
614 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
615 @Marshalling.args( Marshalling.Decoders.pickled_data )
617 def do_cross_connect_compl(self, cross_data):
618 self._testbed.do_cross_connect_compl(cross_data)
620 @Marshalling.handles(GET)
621 @Marshalling.args(int, Marshalling.base64_data, str)
622 @Marshalling.retval( Marshalling.pickled_data )
623 def get(self, guid, name, time):
624 return self._testbed.get(guid, name, time)
626 @Marshalling.handles(SET)
627 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
629 def set(self, guid, name, value, time):
630 self._testbed.set(guid, name, value, time)
632 @Marshalling.handles(GET_ADDRESS)
633 @Marshalling.args(int, int, Marshalling.base64_data)
634 @Marshalling.retval()
635 def get_address(self, guid, index, attribute):
636 return str(self._testbed.get_address(guid, index, attribute))
638 @Marshalling.handles(GET_ROUTE)
639 @Marshalling.args(int, int, Marshalling.base64_data)
640 @Marshalling.retval()
641 def get_route(self, guid, index, attribute):
642 return str(self._testbed.get_route(guid, index, attribute))
644 @Marshalling.handles(ACTION)
645 @Marshalling.args(str, int, Marshalling.base64_data)
647 def action(self, time, guid, command):
648 self._testbed.action(time, guid, command)
650 @Marshalling.handles(STATUS)
651 @Marshalling.args(Marshalling.nullint)
652 @Marshalling.retval(int)
653 def status(self, guid):
654 return self._testbed.status(guid)
656 @Marshalling.handles(GET_ATTRIBUTE_LIST)
657 @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
658 @Marshalling.retval( Marshalling.pickled_data )
659 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
660 return self._testbed.get_attribute_list(guid, filter_flags, exclude)
662 @Marshalling.handles(GET_FACTORY_ID)
663 @Marshalling.args(int)
664 @Marshalling.retval()
665 def get_factory_id(self, guid):
666 return self._testbed.get_factory_id(guid)
668 class ExperimentControllerServer(BaseServer):
669 def __init__(self, root_dir, log_level, experiment_xml):
670 super(ExperimentControllerServer, self).__init__(root_dir, log_level)
671 self._experiment_xml = experiment_xml
672 self._experiment = None
674 def post_daemonize(self):
675 from nepi.core.execute import ExperimentController
676 self._experiment = ExperimentController(self._experiment_xml,
677 root_dir = self._root_dir)
679 @Marshalling.handles(GUIDS)
681 @Marshalling.retval( Marshalling.pickled_data )
683 return self._experiment.guids
685 @Marshalling.handles(XML)
687 @Marshalling.retval()
688 def experiment_design_xml(self):
689 return self._experiment.experiment_design_xml
691 @Marshalling.handles(EXEC_XML)
693 @Marshalling.retval()
694 def experiment_execute_xml(self):
695 return self._experiment.experiment_execute_xml
697 @Marshalling.handles(TRACE)
698 @Marshalling.args(int, str, Marshalling.base64_data)
699 @Marshalling.retval()
700 def trace(self, guid, trace_id, attribute):
701 return str(self._experiment.trace(guid, trace_id, attribute))
703 @Marshalling.handles(TRACES_INFO)
705 @Marshalling.retval( Marshalling.pickled_data )
706 def traces_info(self):
707 return self._experiment.traces_info()
709 @Marshalling.handles(FINISHED)
710 @Marshalling.args(int)
711 @Marshalling.retval(Marshalling.bool)
712 def is_finished(self, guid):
713 return self._experiment.is_finished(guid)
715 @Marshalling.handles(GET)
716 @Marshalling.args(int, Marshalling.base64_data, str)
717 @Marshalling.retval( Marshalling.pickled_data )
718 def get(self, guid, name, time):
719 return self._experiment.get(guid, name, time)
721 @Marshalling.handles(SET)
722 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
724 def set(self, guid, name, value, time):
725 self._experiment.set(guid, name, value, time)
727 @Marshalling.handles(START)
731 self._experiment.start()
733 @Marshalling.handles(STOP)
737 self._experiment.stop()
739 @Marshalling.handles(RECOVER)
743 self._experiment.recover()
745 @Marshalling.handles(SHUTDOWN)
749 self._experiment.shutdown()
751 @Marshalling.handles(GET_TESTBED_ID)
752 @Marshalling.args(int)
753 @Marshalling.retval()
754 def get_testbed_id(self, guid):
755 return self._experiment.get_testbed_id(guid)
757 @Marshalling.handles(GET_FACTORY_ID)
758 @Marshalling.args(int)
759 @Marshalling.retval()
760 def get_factory_id(self, guid):
761 return self._experiment.get_factory_id(guid)
763 @Marshalling.handles(GET_TESTBED_VERSION)
764 @Marshalling.args(int)
765 @Marshalling.retval()
766 def get_testbed_version(self, guid):
767 return self._experiment.get_testbed_version(guid)
769 class BaseProxy(object):
771 _ServerClassModule = "nepi.util.proxy"
775 launch = True, host = None,
776 port = None, user = None, ident_key = None, agent = None,
777 environment_setup = ""):
782 "from %(classmodule)s import %(classname)s;"
783 "s = %(classname)s%(ctor_args)r;"
786 classname = self._ServerClass.__name__,
787 classmodule = self._ServerClassModule,
788 ctor_args = ctor_args
790 proc = server.popen_ssh_subprocess(python_code, host = host,
791 port = port, user = user, agent = agent,
792 ident_key = ident_key,
793 environment_setup = environment_setup,
796 err = proc.stderr.read()
797 raise RuntimeError, "Server could not be executed: %s" % (err,)
800 s = self._ServerClass(*ctor_args)
803 # connect client to server
804 self._client = server.Client(root_dir, host = host, port = port,
805 user = user, agent = agent,
806 environment_setup = environment_setup)
809 def _make_message(argtypes, argencoders, command, methname, classname, *args):
810 if len(argtypes) != len(argencoders):
811 raise ValueError, "Invalid arguments for _make_message: "\
812 "in stub method %s of class %s "\
813 "argtypes and argencoders must match in size" % (
814 methname, classname )
815 if len(argtypes) != len(args):
816 raise ValueError, "Invalid arguments for _make_message: "\
817 "in stub method %s of class %s "\
818 "expected %d arguments, got %d" % (
820 len(argtypes), len(args))
823 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
825 buf.append(fmt % encode(val))
828 raise TypeError, "Argument %d of stub method %s of class %s "\
829 "requires a value of type %s, but got %s - nested error: %s" % (
830 argnum, methname, classname,
831 getattr(typ, '__name__', typ), type(val),
832 traceback.format_exc()
835 return "%d|%s" % (command, '|'.join(buf))
838 def _parse_reply(rvtype, methname, classname, reply):
840 raise RuntimeError, "Invalid reply: %r "\
841 "for stub method %s of class %s" % (
847 result = reply.split("|")
848 code = int(result[0])
852 raise TypeError, "Return value of stub method %s of class %s "\
853 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
855 getattr(rvtype, '__name__', rvtype), reply,
856 traceback.format_exc()
859 text = base64.b64decode(text)
860 raise RuntimeError(text)
869 raise TypeError, "Return value of stub method %s of class %s "\
870 "cannot be parsed: must be of type %s - nested error: %s" % (
872 getattr(rvtype, '__name__', rvtype),
873 traceback.format_exc()
876 raise RuntimeError, "Invalid reply: %r "\
877 "for stub method %s of class %s - unknown code" % (
883 def _make_stubs(server_class, template_class):
885 Returns a dictionary method_name -> method
890 class SomeProxy(BaseProxy):
893 locals().update( BaseProxy._make_stubs(
898 ServerClass is the corresponding Server class, as
899 specified in the _ServerClass class method (_make_stubs
900 is static and can't access the method), and TemplateClass
901 is the ultimate implementation class behind the server,
902 from which argument names and defaults are taken, to
903 maintain meaningful interfaces.
910 func_template_path = os.path.join(
911 os.path.dirname(__file__),
913 func_template_file = open(func_template_path, "r")
914 func_template = func_template_file.read()
915 func_template_file.close()
917 for methname in vars(template_class).copy():
918 if methname.endswith('_deferred'):
919 # cannot wrap deferreds...
921 dmethname = methname+'_deferred'
922 if hasattr(server_class, methname) and not methname.startswith('_'):
923 template_meth = getattr(template_class, methname)
924 server_meth = getattr(server_class, methname)
926 command = getattr(server_meth, '_handles_command', None)
927 argtypes = getattr(server_meth, '_argtypes', None)
928 argencoders = getattr(server_meth, '_argencoders', None)
929 rvtype = getattr(server_meth, '_retval', None)
932 if hasattr(template_meth, 'fget'):
934 template_meth = template_meth.fget
937 if command is not None and argtypes is not None and argencoders is not None:
938 # We have an interface method...
939 code = template_meth.func_code
940 argnames = code.co_varnames[:code.co_argcount]
941 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
942 + (template_meth.func_defaults or ()) )
945 BaseProxy = BaseProxy,
947 argencoders = argencoders,
949 functools = functools,
953 func_text = func_template % dict(
955 args = '%s' % (','.join(argnames[1:])),
957 argname if argdef is NONE
958 else "%s=%r" % (argname, argdef)
959 for argname, argdef in zip(argnames[1:], argdefaults[1:])
963 classname = server_class.__name__
971 exec func_text in func_globals, context
974 rv[methname] = property(context[methname])
975 rv[dmethname] = property(context[dmethname])
977 rv[methname] = context[methname]
978 rv[dmethname] = context[dmethname]
980 # inject _deferred into core classes
981 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
982 def freezename(methname, dmethname):
983 def dmeth(self, *p, **kw):
984 return getattr(self, methname)(*p, **kw)
985 dmeth.__name__ = dmethname
987 dmeth = freezename(methname, dmethname)
988 setattr(template_class, dmethname, dmeth)
992 class TestbedControllerProxy(BaseProxy):
994 _ServerClass = TestbedControllerServer
996 def __init__(self, root_dir, log_level, testbed_id = None,
997 testbed_version = None, launch = True, host = None,
998 port = None, user = None, ident_key = None, agent = None,
999 environment_setup = ""):
1000 if launch and (testbed_id == None or testbed_version == None):
1001 raise RuntimeError("To launch a TesbedControllerServer a "
1002 "testbed_id and testbed_version are required")
1003 super(TestbedControllerProxy,self).__init__(
1004 ctor_args = (root_dir, log_level, testbed_id, testbed_version),
1005 root_dir = root_dir,
1006 launch = launch, host = host, port = port, user = user,
1007 ident_key = ident_key, agent = agent,
1008 environment_setup = environment_setup)
1010 locals().update( BaseProxy._make_stubs(
1011 server_class = TestbedControllerServer,
1012 template_class = nepi.core.execute.TestbedController,
1015 # Shutdown stops the serverside...
1016 def shutdown(self, _stub = shutdown):
1018 self._client.send_stop()
1019 self._client.read_reply() # wait for it
1023 class ExperimentControllerProxy(BaseProxy):
1024 _ServerClass = ExperimentControllerServer
1026 def __init__(self, root_dir, log_level, experiment_xml = None,
1027 launch = True, host = None, port = None, user = None,
1028 ident_key = None, agent = None, environment_setup = ""):
1029 if launch and experiment_xml is None:
1030 raise RuntimeError("To launch a ExperimentControllerServer a \
1031 xml description of the experiment is required")
1032 super(ExperimentControllerProxy,self).__init__(
1033 ctor_args = (root_dir, log_level, experiment_xml),
1034 root_dir = root_dir,
1035 launch = launch, host = host, port = port, user = user,
1036 ident_key = ident_key, agent = agent,
1037 environment_setup = environment_setup)
1039 locals().update( BaseProxy._make_stubs(
1040 server_class = ExperimentControllerServer,
1041 template_class = nepi.core.execute.ExperimentController,
1045 # Shutdown stops the serverside...
1046 def shutdown(self, _stub = shutdown):
1048 self._client.send_stop()
1049 self._client.read_reply() # wait for it