2 # -*- coding: utf-8 -*-
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
23 # PROTOCOL INSTRUCTION MESSAGES
43 DO_CROSS_CONNECT_INIT = 22
53 GET_ATTRIBUTE_LIST = 32
55 DO_CROSS_CONNECT_COMPL = 34
61 GET_TESTBED_VERSION = 40
68 instruction_text = dict({
79 CONFIGURE: "CONFIGURE",
81 CREATE_SET: "CREATE_SET",
82 FACTORY_SET: "FACTORY_SET",
84 CROSS_CONNECT: "CROSS_CONNECT",
85 ADD_TRACE: "ADD_TRACE",
86 ADD_ADDRESS: "ADD_ADDRESS",
87 ADD_ROUTE: "ADD_ROUTE",
89 DO_CREATE: "DO_CREATE",
90 DO_CONNECT_INIT: "DO_CONNECT_INIT",
91 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
92 DO_CONFIGURE: "DO_CONFIGURE",
93 DO_PRECONFIGURE: "DO_PRECONFIGURE",
94 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
95 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
98 GET_ROUTE: "GET_ROUTE",
99 GET_ADDRESS: "GET_ADDRESS",
100 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
101 GET_FACTORY_ID: "GET_FACTORY_ID",
102 GET_TESTBED_ID: "GET_TESTBED_ID",
103 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
107 TESTBED_ID: "TESTBED_ID",
108 TESTBED_VERSION: "TESTBED_VERSION",
109 TRACES_INFO: "TRACES_INFO",
110 STARTED_TIME: "STARTED_TIME",
111 STOPPED_TIME: "STOPPED_TIME",
115 def log_msg(server, params):
117 instr = int(params[0])
118 instr_txt = instruction_text[instr]
119 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
120 instr_txt, ", ".join(map(str, params[1:]))))
122 # don't die for logging
125 def log_reply(server, reply):
127 res = reply.split("|")
129 code_txt = instruction_text[code]
131 txt = base64.b64decode(res[1])
134 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
137 # don't die for logging
138 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
142 def to_server_log_level(log_level):
145 if log_level == DC.DEBUG_LEVEL
146 else server.ERROR_LEVEL
149 def get_access_config_params(access_config):
150 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
151 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
152 log_level = to_server_log_level(log_level)
153 user = host = port = agent = key = None
154 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
155 environment_setup = (
156 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
157 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
160 if communication == DC.ACCESS_SSH:
161 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
162 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
163 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
164 agent = access_config.get_attribute_value(DC.USE_AGENT)
165 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
166 return (root_dir, log_level, user, host, port, key, agent, environment_setup)
168 class AccessConfiguration(AttributesMap):
169 def __init__(self, params = None):
170 super(AccessConfiguration, self).__init__()
172 from nepi.core.metadata import Metadata
174 for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
175 self.add_attribute(**attr_info)
178 for attr_name, attr_value in params.iteritems():
179 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
180 attr_value = parser(attr_value)
181 self.set_attribute_value(attr_name, attr_value)
183 class TempDir(object):
185 self.path = tempfile.mkdtemp()
188 shutil.rmtree(self.path)
190 class PermDir(object):
191 def __init__(self, path):
194 def create_experiment_controller(xml, access_config = None):
195 mode = None if not access_config \
196 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
197 launch = True if not access_config \
198 else not access_config.get_attribute_value(DC.RECOVER)
199 if not mode or mode == DC.MODE_SINGLE_PROCESS:
200 from nepi.core.execute import ExperimentController
202 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
205 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
206 controller = ExperimentController(xml, root_dir.path)
208 # inject reference to temporary dir, so that it gets cleaned
209 # up at destruction time.
210 controller._tempdir = root_dir
217 elif mode == DC.MODE_DAEMON:
218 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
219 get_access_config_params(access_config)
221 return ExperimentControllerProxy(root_dir, log_level,
222 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
223 agent = agent, launch = launch,
224 environment_setup = environment_setup)
227 # Maybe controller died, recover from persisted testbed information if possible
228 controller = ExperimentControllerProxy(root_dir, log_level,
229 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
230 agent = agent, launch = True,
231 environment_setup = environment_setup)
236 raise RuntimeError("Unsupported access configuration '%s'" % mode)
238 def create_testbed_controller(testbed_id, testbed_version, access_config):
239 mode = None if not access_config \
240 else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
241 launch = True if not access_config \
242 else not access_config.get_attribute_value(DC.RECOVER)
243 if not mode or mode == DC.MODE_SINGLE_PROCESS:
245 raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
246 return _build_testbed_controller(testbed_id, testbed_version)
247 elif mode == DC.MODE_DAEMON:
248 (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
249 get_access_config_params(access_config)
250 return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
251 testbed_version = testbed_version, host = host, port = port, ident_key = key,
252 user = user, agent = agent, launch = launch,
253 environment_setup = environment_setup)
254 raise RuntimeError("Unsupported access configuration '%s'" % mode)
256 def _build_testbed_controller(testbed_id, testbed_version):
257 mod_name = nepi.util.environ.find_testbed(testbed_id)
259 if not mod_name in sys.modules:
263 raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
265 module = sys.modules[mod_name]
266 tc = module.TestbedController()
267 if tc.testbed_version != testbed_version:
268 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
269 (testbed_id, testbed_version, tc.testbed_version))
272 # Just a namespace class
def pickled_data(sdata):
    """Decode a base64 string and unpickle the bytes it contains.

    NOTE(review): unpickling can execute arbitrary code; this is only
    safe if ``sdata`` always originates from a trusted peer -- confirm.
    """
    raw = base64.b64decode(sdata)
    return cPickle.loads(raw)
def base64_data(sdata):
    """Return the payload obtained by base64-decoding ``sdata``."""
    decoded = base64.b64decode(sdata)
    return decoded
285 return None if sdata == "None" else int(sdata)
289 return sdata == 'True'
def pickled_data(data):
    """Pickle ``data`` and return the pickle base64-encoded."""
    pickled = cPickle.dumps(data)
    return base64.b64encode(pickled)
def base64_data(data):
    """Return ``data`` encoded as base64."""
    encoded = base64.b64encode(data)
    return encoded
302 return "None" if data is None else int(data)
306 return str(bool(data))
308 # import into Marshalling all the decoders
312 for typname, typ in vars(Decoders).iteritems()
313 if not typname.startswith('_')
316 _TYPE_ENCODERS = dict([
317 # id(type) -> (<encoding_function>, <formatting_string>)
318 (typname, (getattr(Encoders,typname),"%s"))
319 for typname in vars(Decoders)
320 if not typname.startswith('_')
321 and hasattr(Encoders,typname)
325 _TYPE_ENCODERS["float"] = (float, "%r")
326 _TYPE_ENCODERS["int"] = (int, "%d")
327 _TYPE_ENCODERS["long"] = (int, "%d")
328 _TYPE_ENCODERS["str"] = (str, "%s")
329 _TYPE_ENCODERS["unicode"] = (str, "%s")
332 _TYPE_ENCODERS[None] = (str, "%s")
337 Decorator that converts the given function into one that takes
338 a single "params" list, with each parameter marshalled according
339 to the given factory callable (type constructors are accepted).
341 The first argument (self) is left untouched.
345 @Marshalling.args(int,int,str,base64_data)
346 def somefunc(self, someint, otherint, somestr, someb64):
351 def rv(self, params):
352 return f(self, *[ ctor(val)
353 for ctor,val in zip(types, params[1:]) ])
357 # Derive type encoders by looking up types in _TYPE_ENCODERS
358 # make_proxy will use it to encode arguments in command strings
360 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
362 if typ.__name__ in TYPE_ENCODERS:
363 argencoders.append(TYPE_ENCODERS[typ.__name__])
366 argencoders.append(TYPE_ENCODERS[None])
368 rv._argencoders = tuple(argencoders)
370 rv._retval = getattr(f, '_retval', None)
375 def retval(typ=Decoders.base64_data):
377 Decorator that converts the given function into one that
378 returns a properly encoded return string, given that the undecorated
379 function returns suitable input for the encoding function.
381 The optional typ argument specifies a type.
382 For the default of base64_data, return values should be strings.
383 The return value of the encoding method should be a string always.
387 @Marshalling.args(int,int,str,base64_data)
388 @Marshalling.retval(str)
389 def somefunc(self, someint, otherint, somestr, someb64):
392 encode, fmt = Marshalling._TYPE_ENCODERS.get(
394 Marshalling._TYPE_ENCODERS[None])
399 def rv(self, *p, **kw):
400 data = f(self, *p, **kw)
406 rv._argtypes = getattr(f, '_argtypes', None)
407 rv._argencoders = getattr(f, '_argencoders', None)
414 Decorator that converts the given function into one that
415 always return an encoded empty string.
417 Useful for null-returning functions.
422 def rv(self, *p, **kw):
427 rv._argtypes = getattr(f, '_argtypes', None)
428 rv._argencoders = getattr(f, '_argencoders', None)
432 def handles(whichcommand):
434 Associates the method with a given command code for servers.
435 It should always be the topmost decorator.
438 f._handles_command = whichcommand
442 class BaseServer(server.Server):
443 def reply_action(self, msg):
445 result = base64.b64encode("Invalid command line")
446 reply = "%d|%s" % (ERROR, result)
448 params = msg.split("|")
449 instruction = int(params[0])
450 log_msg(self, params)
452 for mname,meth in vars(self.__class__).iteritems():
453 if not mname.startswith('_'):
454 cmd = getattr(meth, '_handles_command', None)
455 if cmd == instruction:
456 meth = getattr(self, mname)
460 error = "Invalid instruction %s" % instruction
461 self.log_error(error)
462 result = base64.b64encode(error)
463 reply = "%d|%s" % (ERROR, result)
465 error = self.log_error()
466 result = base64.b64encode(error)
467 reply = "%d|%s" % (ERROR, result)
468 log_reply(self, reply)
471 class TestbedControllerServer(BaseServer):
472 def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
473 super(TestbedControllerServer, self).__init__(root_dir, log_level,
474 environment_setup = environment_setup )
475 self._testbed_id = testbed_id
476 self._testbed_version = testbed_version
def post_daemonize(self):
    """Build the testbed controller served by this instance.

    The controller is created through ``_build_testbed_controller`` from
    the testbed id and version stored on the server, and is kept in
    ``self._testbed`` for the command handlers to use.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_controller(testbed_id, testbed_version)
483 @Marshalling.handles(GUIDS)
485 @Marshalling.retval( Marshalling.pickled_data )
487 return self._testbed.guids
489 @Marshalling.handles(TESTBED_ID)
491 @Marshalling.retval()
492 def testbed_id(self):
493 return str(self._testbed.testbed_id)
495 @Marshalling.handles(TESTBED_VERSION)
497 @Marshalling.retval()
498 def testbed_version(self):
499 return str(self._testbed.testbed_version)
501 @Marshalling.handles(CREATE)
502 @Marshalling.args(int, str)
504 def defer_create(self, guid, factory_id):
505 self._testbed.defer_create(guid, factory_id)
@Marshalling.handles(TRACE)
@Marshalling.args(int, str, Marshalling.base64_data)
@Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Handle the TRACE command by delegating to the testbed controller.

    The wire parameters are decoded as (int, str, base64 data) and the
    testbed's return value is encoded with the default ``retval`` encoder.
    """
    testbed = self._testbed
    return testbed.trace(guid, trace_id, attribute)
513 @Marshalling.handles(TRACES_INFO)
515 @Marshalling.retval( Marshalling.pickled_data )
516 def traces_info(self):
517 return self._testbed.traces_info()
519 @Marshalling.handles(START)
523 self._testbed.start()
525 @Marshalling.handles(STOP)
531 @Marshalling.handles(SHUTDOWN)
535 self._testbed.shutdown()
537 @Marshalling.handles(CONFIGURE)
538 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
540 def defer_configure(self, name, value):
541 self._testbed.defer_configure(name, value)
543 @Marshalling.handles(CREATE_SET)
544 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
546 def defer_create_set(self, guid, name, value):
547 self._testbed.defer_create_set(guid, name, value)
549 @Marshalling.handles(FACTORY_SET)
550 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
552 def defer_factory_set(self, name, value):
553 self._testbed.defer_factory_set(name, value)
555 @Marshalling.handles(CONNECT)
556 @Marshalling.args(int, str, int, str)
558 def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
559 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
560 connector_type_name2)
562 @Marshalling.handles(CROSS_CONNECT)
563 @Marshalling.args(int, str, int, int, str, str, str)
565 def defer_cross_connect(self,
566 guid, connector_type_name,
567 cross_guid, cross_testbed_guid,
568 cross_testbed_id, cross_factory_id,
569 cross_connector_type_name):
570 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
571 cross_testbed_guid, cross_testbed_id, cross_factory_id,
572 cross_connector_type_name)
574 @Marshalling.handles(ADD_TRACE)
575 @Marshalling.args(int, str)
577 def defer_add_trace(self, guid, trace_id):
578 self._testbed.defer_add_trace(guid, trace_id)
580 @Marshalling.handles(ADD_ADDRESS)
581 @Marshalling.args(int, str, int, Marshalling.pickled_data)
583 def defer_add_address(self, guid, address, netprefix, broadcast):
584 self._testbed.defer_add_address(guid, address, netprefix,
587 @Marshalling.handles(ADD_ROUTE)
588 @Marshalling.args(int, str, int, str, int)
590 def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
591 self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
593 @Marshalling.handles(DO_SETUP)
597 self._testbed.do_setup()
599 @Marshalling.handles(DO_CREATE)
603 self._testbed.do_create()
605 @Marshalling.handles(DO_CONNECT_INIT)
608 def do_connect_init(self):
609 self._testbed.do_connect_init()
611 @Marshalling.handles(DO_CONNECT_COMPL)
614 def do_connect_compl(self):
615 self._testbed.do_connect_compl()
617 @Marshalling.handles(DO_CONFIGURE)
620 def do_configure(self):
621 self._testbed.do_configure()
623 @Marshalling.handles(DO_PRECONFIGURE)
626 def do_preconfigure(self):
627 self._testbed.do_preconfigure()
629 @Marshalling.handles(DO_PRESTART)
632 def do_prestart(self):
633 self._testbed.do_prestart()
635 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
636 @Marshalling.args( Marshalling.Decoders.pickled_data )
638 def do_cross_connect_init(self, cross_data):
639 self._testbed.do_cross_connect_init(cross_data)
641 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
642 @Marshalling.args( Marshalling.Decoders.pickled_data )
644 def do_cross_connect_compl(self, cross_data):
645 self._testbed.do_cross_connect_compl(cross_data)
@Marshalling.handles(GET)
@Marshalling.args(int, Marshalling.base64_data, str)
@Marshalling.retval( Marshalling.pickled_data )
def get(self, guid, name, time):
    """Handle the GET command by delegating to the testbed's ``get``.

    The wire parameters are decoded as (int, base64 data, str); the
    result is sent back as pickled data.
    """
    value = self._testbed.get(guid, name, time)
    return value
653 @Marshalling.handles(SET)
654 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
656 def set(self, guid, name, value, time):
657 self._testbed.set(guid, name, value, time)
@Marshalling.handles(GET_ADDRESS)
@Marshalling.args(int, int, Marshalling.base64_data)
@Marshalling.retval()
def get_address(self, guid, index, attribute):
    """Handle GET_ADDRESS: return the testbed's answer converted to ``str``.

    The wire parameters are decoded as (int, int, base64 data).
    """
    value = self._testbed.get_address(guid, index, attribute)
    return str(value)
@Marshalling.handles(GET_ROUTE)
@Marshalling.args(int, int, Marshalling.base64_data)
@Marshalling.retval()
def get_route(self, guid, index, attribute):
    """Handle GET_ROUTE: return the testbed's answer converted to ``str``.

    The wire parameters are decoded as (int, int, base64 data).
    """
    value = self._testbed.get_route(guid, index, attribute)
    return str(value)
671 @Marshalling.handles(ACTION)
672 @Marshalling.args(str, int, Marshalling.base64_data)
674 def action(self, time, guid, command):
675 self._testbed.action(time, guid, command)
@Marshalling.handles(STATUS)
@Marshalling.args(Marshalling.nullint)
@Marshalling.retval(int)
def status(self, guid):
    """Handle the STATUS command by delegating to the testbed's ``status``.

    ``guid`` is decoded with ``Marshalling.nullint``; the result is
    encoded as an int.
    """
    testbed = self._testbed
    return testbed.status(guid)
683 @Marshalling.handles(TESTBED_STATUS)
685 @Marshalling.retval(int)
686 def testbed_status(self):
687 return self._testbed.testbed_status()
@Marshalling.handles(GET_ATTRIBUTE_LIST)
@Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
@Marshalling.retval( Marshalling.pickled_data )
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """Handle GET_ATTRIBUTE_LIST by delegating to the testbed controller.

    All three arguments are passed through unchanged to the testbed's
    ``get_attribute_list``; its result is sent back as pickled data.
    """
    testbed = self._testbed
    attributes = testbed.get_attribute_list(guid, filter_flags, exclude)
    return attributes
@Marshalling.handles(GET_FACTORY_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_factory_id(self, guid):
    """Handle GET_FACTORY_ID by delegating to the testbed controller.

    ``guid`` is decoded as an int and the result is encoded with the
    default ``retval`` encoder.
    """
    factory_id = self._testbed.get_factory_id(guid)
    return factory_id
701 @Marshalling.handles(RECOVER)
705 self._testbed.recover()
708 class ExperimentControllerServer(BaseServer):
def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
    """Initialise the base server and remember the experiment XML.

    ``root_dir``, ``log_level`` and ``environment_setup`` are forwarded
    to the base-class constructor. The experiment controller is not
    created here: ``self._experiment`` starts out as ``None``.
    """
    base = super(ExperimentControllerServer, self)
    base.__init__(root_dir, log_level, environment_setup=environment_setup)
    self._experiment_xml = experiment_xml
    self._experiment = None
def post_daemonize(self):
    """Create the experiment controller served by this instance.

    An ``ExperimentController`` is built from the stored experiment XML,
    using the server's ``_root_dir`` as its root directory, and kept in
    ``self._experiment`` for the command handlers to use.
    """
    # The controller class is imported locally rather than at module level.
    from nepi.core.execute import ExperimentController

    xml = self._experiment_xml
    controller = ExperimentController(xml, root_dir=self._root_dir)
    self._experiment = controller
720 @Marshalling.handles(GUIDS)
722 @Marshalling.retval( Marshalling.pickled_data )
724 return self._experiment.guids
726 @Marshalling.handles(STARTED_TIME)
728 @Marshalling.retval( Marshalling.pickled_data )
729 def started_time(self):
730 return self._experiment.started_time
732 @Marshalling.handles(STOPPED_TIME)
734 @Marshalling.retval( Marshalling.pickled_data )
735 def stopped_time(self):
736 return self._experiment.stopped_time
738 @Marshalling.handles(XML)
740 @Marshalling.retval()
741 def experiment_design_xml(self):
742 return self._experiment.experiment_design_xml
744 @Marshalling.handles(EXEC_XML)
746 @Marshalling.retval()
747 def experiment_execute_xml(self):
748 return self._experiment.experiment_execute_xml
@Marshalling.handles(TRACE)
@Marshalling.args(int, str, Marshalling.base64_data)
@Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Handle TRACE: return the experiment's answer converted to ``str``.

    The wire parameters are decoded as (int, str, base64 data).
    """
    content = self._experiment.trace(guid, trace_id, attribute)
    return str(content)
756 @Marshalling.handles(TRACES_INFO)
758 @Marshalling.retval( Marshalling.pickled_data )
759 def traces_info(self):
760 return self._experiment.traces_info()
@Marshalling.handles(FINISHED)
@Marshalling.args(int)
@Marshalling.retval(Marshalling.bool)
def is_finished(self, guid):
    """Handle FINISHED by delegating to the experiment's ``is_finished``.

    ``guid`` is decoded as an int; the result is encoded with the bool
    encoder.
    """
    experiment = self._experiment
    return experiment.is_finished(guid)
@Marshalling.handles(STATUS)
@Marshalling.args(int)
@Marshalling.retval(int)
def status(self, guid):
    """Handle the STATUS command by delegating to the experiment's ``status``.

    ``guid`` is decoded as an int; the result is encoded as an int.

    The previous implementation returned ``is_finished(guid)``, duplicating
    the FINISHED handler, so STATUS requests only ever yielded a boolean
    coerced to 0/1 instead of the element's status code.
    NOTE(review): assumes the experiment controller exposes ``status(guid)``
    (the testbed server's STATUS handler delegates the same way) -- confirm.
    """
    return self._experiment.status(guid)
@Marshalling.handles(GET)
@Marshalling.args(int, Marshalling.base64_data, str)
@Marshalling.retval( Marshalling.pickled_data )
def get(self, guid, name, time):
    """Handle the GET command by delegating to the experiment's ``get``.

    The wire parameters are decoded as (int, base64 data, str); the
    result is sent back as pickled data.
    """
    value = self._experiment.get(guid, name, time)
    return value
780 @Marshalling.handles(SET)
781 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
783 def set(self, guid, name, value, time):
784 self._experiment.set(guid, name, value, time)
786 @Marshalling.handles(START)
790 self._experiment.start()
792 @Marshalling.handles(STOP)
796 self._experiment.stop()
798 @Marshalling.handles(RECOVER)
802 self._experiment.recover()
804 @Marshalling.handles(SHUTDOWN)
808 self._experiment.shutdown()
@Marshalling.handles(GET_TESTBED_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_testbed_id(self, guid):
    """Handle GET_TESTBED_ID by delegating to the experiment controller.

    ``guid`` is decoded as an int and the result is encoded with the
    default ``retval`` encoder.
    """
    testbed_id = self._experiment.get_testbed_id(guid)
    return testbed_id
@Marshalling.handles(GET_FACTORY_ID)
@Marshalling.args(int)
@Marshalling.retval()
def get_factory_id(self, guid):
    """Handle GET_FACTORY_ID by delegating to the experiment controller.

    ``guid`` is decoded as an int and the result is encoded with the
    default ``retval`` encoder.
    """
    factory_id = self._experiment.get_factory_id(guid)
    return factory_id
@Marshalling.handles(GET_TESTBED_VERSION)
@Marshalling.args(int)
@Marshalling.retval()
def get_testbed_version(self, guid):
    """Handle GET_TESTBED_VERSION by delegating to the experiment controller.

    ``guid`` is decoded as an int and the result is encoded with the
    default ``retval`` encoder.
    """
    version = self._experiment.get_testbed_version(guid)
    return version
828 class BaseProxy(object):
830 _ServerClassModule = "nepi.util.proxy"
834 launch = True, host = None,
835 port = None, user = None, ident_key = None, agent = None,
836 environment_setup = ""):
841 "from %(classmodule)s import %(classname)s;"
842 "s = %(classname)s%(ctor_args)r;"
845 classname = self._ServerClass.__name__,
846 classmodule = self._ServerClassModule,
847 ctor_args = ctor_args
849 proc = server.popen_ssh_subprocess(python_code, host = host,
850 port = port, user = user, agent = agent,
851 ident_key = ident_key,
852 environment_setup = environment_setup,
855 err = proc.stderr.read()
856 raise RuntimeError, "Server could not be executed: %s" % (err,)
859 s = self._ServerClass(*ctor_args)
862 # connect client to server
863 self._client = server.Client(root_dir, host = host, port = port,
864 user = user, agent = agent,
865 environment_setup = environment_setup)
868 def _make_message(argtypes, argencoders, command, methname, classname, *args):
869 if len(argtypes) != len(argencoders):
870 raise ValueError, "Invalid arguments for _make_message: "\
871 "in stub method %s of class %s "\
872 "argtypes and argencoders must match in size" % (
873 methname, classname )
874 if len(argtypes) != len(args):
875 raise ValueError, "Invalid arguments for _make_message: "\
876 "in stub method %s of class %s "\
877 "expected %d arguments, got %d" % (
879 len(argtypes), len(args))
882 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
884 buf.append(fmt % encode(val))
887 raise TypeError, "Argument %d of stub method %s of class %s "\
888 "requires a value of type %s, but got %s - nested error: %s" % (
889 argnum, methname, classname,
890 getattr(typ, '__name__', typ), type(val),
891 traceback.format_exc()
894 return "%d|%s" % (command, '|'.join(buf))
897 def _parse_reply(rvtype, methname, classname, reply):
899 raise RuntimeError, "Invalid reply: %r "\
900 "for stub method %s of class %s" % (
906 result = reply.split("|")
907 code = int(result[0])
911 raise TypeError, "Return value of stub method %s of class %s "\
912 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
914 getattr(rvtype, '__name__', rvtype), reply,
915 traceback.format_exc()
918 text = base64.b64decode(text)
919 raise RuntimeError(text)
928 raise TypeError, "Return value of stub method %s of class %s "\
929 "cannot be parsed: must be of type %s - nested error: %s" % (
931 getattr(rvtype, '__name__', rvtype),
932 traceback.format_exc()
935 raise RuntimeError, "Invalid reply: %r "\
936 "for stub method %s of class %s - unknown code" % (
942 def _make_stubs(server_class, template_class):
944 Returns a dictionary method_name -> method
949 class SomeProxy(BaseProxy):
952 locals().update( BaseProxy._make_stubs(
957 ServerClass is the corresponding Server class, as
958 specified in the _ServerClass class method (_make_stubs
959 is static and can't access the method), and TemplateClass
960 is the ultimate implementation class behind the server,
961 from which argument names and defaults are taken, to
962 maintain meaningful interfaces.
969 func_template_path = os.path.join(
970 os.path.dirname(__file__),
972 func_template_file = open(func_template_path, "r")
973 func_template = func_template_file.read()
974 func_template_file.close()
976 for methname in vars(template_class).copy():
977 if methname.endswith('_deferred'):
978 # cannot wrap deferreds...
980 dmethname = methname+'_deferred'
981 if hasattr(server_class, methname) and not methname.startswith('_'):
982 template_meth = getattr(template_class, methname)
983 server_meth = getattr(server_class, methname)
985 command = getattr(server_meth, '_handles_command', None)
986 argtypes = getattr(server_meth, '_argtypes', None)
987 argencoders = getattr(server_meth, '_argencoders', None)
988 rvtype = getattr(server_meth, '_retval', None)
991 if hasattr(template_meth, 'fget'):
993 template_meth = template_meth.fget
996 if command is not None and argtypes is not None and argencoders is not None:
997 # We have an interface method...
998 code = template_meth.func_code
999 argnames = code.co_varnames[:code.co_argcount]
1000 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1001 + (template_meth.func_defaults or ()) )
1003 func_globals = dict(
1004 BaseProxy = BaseProxy,
1005 argtypes = argtypes,
1006 argencoders = argencoders,
1008 functools = functools,
1012 func_text = func_template % dict(
1014 args = '%s' % (','.join(argnames[1:])),
1015 argdefs = ','.join([
1016 argname if argdef is NONE
1017 else "%s=%r" % (argname, argdef)
1018 for argname, argdef in zip(argnames[1:], argdefaults[1:])
1021 methname = methname,
1022 classname = server_class.__name__
1025 func_text = compile(
1030 exec func_text in func_globals, context
1033 rv[methname] = property(context[methname])
1034 rv[dmethname] = property(context[dmethname])
1036 rv[methname] = context[methname]
1037 rv[dmethname] = context[dmethname]
1039 # inject _deferred into core classes
1040 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1041 def freezename(methname, dmethname):
1042 def dmeth(self, *p, **kw):
1043 return getattr(self, methname)(*p, **kw)
1044 dmeth.__name__ = dmethname
1046 dmeth = freezename(methname, dmethname)
1047 setattr(template_class, dmethname, dmeth)
1051 class TestbedControllerProxy(BaseProxy):
1053 _ServerClass = TestbedControllerServer
def __init__(self, root_dir, log_level, testbed_id = None,
        testbed_version = None, launch = True, host = None,
        port = None, user = None, ident_key = None, agent = None,
        environment_setup = ""):
    """Set up a proxy to a TestbedControllerServer.

    The server constructor arguments are
    ``(root_dir, log_level, testbed_id, testbed_version, environment_setup)``;
    they are handed to ``BaseProxy.__init__`` as ``ctor_args`` together
    with the remaining connection parameters, which are forwarded
    unchanged.

    Raises:
        RuntimeError: if ``launch`` is true and either ``testbed_id`` or
            ``testbed_version`` is ``None``.
    """
    # Identity comparison: None is a singleton, and "is" cannot be fooled
    # by objects that define a custom __eq__.
    if launch and (testbed_id is None or testbed_version is None):
        raise RuntimeError("To launch a TesbedControllerServer a "
                "testbed_id and testbed_version are required")
    super(TestbedControllerProxy,self).__init__(
        ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
        root_dir = root_dir,
        launch = launch, host = host, port = port, user = user,
        ident_key = ident_key, agent = agent,
        environment_setup = environment_setup)
1069 locals().update( BaseProxy._make_stubs(
1070 server_class = TestbedControllerServer,
1071 template_class = nepi.core.execute.TestbedController,
1074 # Shutdown stops the serverside...
1075 def shutdown(self, _stub = shutdown):
1077 self._client.send_stop()
1078 self._client.read_reply() # wait for it
1082 class ExperimentControllerProxy(BaseProxy):
1083 _ServerClass = ExperimentControllerServer
def __init__(self, root_dir, log_level, experiment_xml = None,
        launch = True, host = None, port = None, user = None,
        ident_key = None, agent = None, environment_setup = ""):
    """Set up a proxy to an ExperimentControllerServer.

    The server constructor arguments are
    ``(root_dir, log_level, experiment_xml, environment_setup)``; they are
    handed to ``BaseProxy.__init__`` as ``ctor_args`` together with the
    remaining connection parameters, which are forwarded unchanged.
    """
    server_ctor_args = (root_dir, log_level, experiment_xml, environment_setup)
    base = super(ExperimentControllerProxy, self)
    base.__init__(
        ctor_args=server_ctor_args,
        root_dir=root_dir,
        launch=launch,
        host=host,
        port=port,
        user=user,
        ident_key=ident_key,
        agent=agent,
        environment_setup=environment_setup)
1095 locals().update( BaseProxy._make_stubs(
1096 server_class = ExperimentControllerServer,
1097 template_class = nepi.core.execute.ExperimentController,
1101 # Shutdown stops the serverside...
1102 def shutdown(self, _stub = shutdown):
1104 self._client.send_stop()
1105 self._client.read_reply() # wait for it