2 # -*- coding: utf-8 -*-
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
23 # PROTOCOL INSTRUCTION MESSAGES
43 DO_CROSS_CONNECT_INIT = 22
53 GET_ATTRIBUTE_LIST = 32
55 DO_CROSS_CONNECT_COMPL = 34
61 GET_TESTBED_VERSION = 40
68 ACCESS_CONFIGURATIONS = 47
69 CURRENT_ACCESS_CONFIG = 48
72 instruction_text = dict({
83 CONFIGURE: "CONFIGURE",
85 CREATE_SET: "CREATE_SET",
86 FACTORY_SET: "FACTORY_SET",
88 CROSS_CONNECT: "CROSS_CONNECT",
89 ADD_TRACE: "ADD_TRACE",
90 ADD_ADDRESS: "ADD_ADDRESS",
91 ADD_ROUTE: "ADD_ROUTE",
93 DO_CREATE: "DO_CREATE",
94 DO_CONNECT_INIT: "DO_CONNECT_INIT",
95 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
96 DO_CONFIGURE: "DO_CONFIGURE",
97 DO_PRECONFIGURE: "DO_PRECONFIGURE",
98 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
99 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
102 GET_ROUTE: "GET_ROUTE",
103 GET_ADDRESS: "GET_ADDRESS",
104 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
105 GET_FACTORY_ID: "GET_FACTORY_ID",
106 GET_TESTBED_ID: "GET_TESTBED_ID",
107 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
111 TESTBED_ID: "TESTBED_ID",
112 TESTBED_VERSION: "TESTBED_VERSION",
113 TRACES_INFO: "TRACES_INFO",
114 STARTED_TIME: "STARTED_TIME",
115 STOPPED_TIME: "STOPPED_TIME",
117 ACCESS_CONFIGURATIONS: "ACCESS_CONFIGURATIONS",
118 CURRENT_ACCESS_CONFIG: "CURRENT_ACCESS_CONFIG"
122 def log_msg(server, params):
124 instr = int(params[0])
125 instr_txt = instruction_text[instr]
126 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
127 instr_txt, ", ".join(map(str, params[1:]))))
129 # don't die for logging
132 def log_reply(server, reply):
134 res = reply.split("|")
136 code_txt = instruction_text[code]
138 txt = base64.b64decode(res[1])
141 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
144 # don't die for logging
145 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
149 def to_server_log_level(log_level):
152 if log_level == DC.DEBUG_LEVEL
156 def get_access_config_params(access_config):
157 mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
158 launch = not access_config.get_attribute_value(DC.RECOVER)
159 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
160 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
161 log_level = to_server_log_level(log_level)
162 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
163 environment_setup = (
164 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
165 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
168 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
169 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
170 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
171 agent = access_config.get_attribute_value(DC.USE_AGENT)
172 sudo = access_config.get_attribute_value(DC.USE_SUDO)
173 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
174 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
175 clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
176 return (mode, launch, root_dir, log_level, communication, user, host, port,
177 key, agent, sudo, environment_setup, clean_root)
179 class AccessConfiguration(AttributesMap):
180 def __init__(self, params = None):
181 super(AccessConfiguration, self).__init__()
183 from nepi.core.metadata import Metadata
185 for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
186 self.add_attribute(**attr_info)
189 for attr_name, attr_value in params.iteritems():
190 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
191 attr_value = parser(attr_value)
192 self.set_attribute_value(attr_name, attr_value)
194 class TempDir(object):
196 self.path = tempfile.mkdtemp()
199 shutil.rmtree(self.path)
201 class PermDir(object):
202 def __init__(self, path):
205 def create_experiment_suite(xml, access_config, repetitions = None,
206 duration = None, wait_guids = None):
209 (mode, launch, root_dir, log_level, communication, user, host, port,
210 key, agent, sudo, environment_setup, clean_root) \
211 = get_access_config_params(access_config)
213 if not mode or mode == DC.MODE_SINGLE_PROCESS:
214 from nepi.core.execute import ExperimentSuite
218 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
220 exp_suite = ExperimentSuite(xml, access_config, repetitions, duration,
223 # inject reference to temporary dir, so that it gets cleaned
224 # up at destruction time.
225 exp_suite._tempdir = root_dir
227 elif mode == DC.MODE_DAEMON:
228 return ExperimentSuiteProxy(root_dir, log_level,
230 repetitions = repetitions,
232 wait_guids = wait_guids,
233 communication = communication,
240 environment_setup = environment_setup,
241 clean_root = clean_root)
242 raise RuntimeError("Unsupported access configuration '%s'" % mode)
244 def create_experiment_controller(xml, access_config = None):
247 log_level = DC.ERROR_LEVEL
249 (mode, launch, root_dir, log_level, communication, user, host, port,
250 key, agent, sudo, environment_setup, clean_root) \
251 = get_access_config_params(access_config)
253 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
255 if not mode or mode == DC.MODE_SINGLE_PROCESS:
256 from nepi.core.execute import ExperimentController
258 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
261 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
262 controller = ExperimentController(xml, root_dir.path)
264 # inject reference to temporary dir, so that it gets cleaned
265 # up at destruction time.
266 controller._tempdir = root_dir
273 elif mode == DC.MODE_DAEMON:
275 return ExperimentControllerProxy(root_dir, log_level,
276 experiment_xml = xml,
277 communication = communication,
285 environment_setup = environment_setup,
286 clean_root = clean_root)
289 # Maybe controller died, recover from persisted testbed information if possible
290 controller = ExperimentControllerProxy(root_dir, log_level,
291 experiment_xml = xml,
292 communication = communication,
300 environment_setup = environment_setup,
301 clean_root = clean_root)
306 raise RuntimeError("Unsupported access configuration '%s'" % mode)
308 def create_testbed_controller(testbed_id, testbed_version, access_config):
311 log_level = DC.ERROR_LEVEL
313 (mode, launch, root_dir, log_level, communication, user, host, port,
314 key, agent, sudo, environment_setup, clean_root) \
315 = get_access_config_params(access_config)
317 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
319 if not mode or mode == DC.MODE_SINGLE_PROCESS:
321 raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
322 return _build_testbed_controller(testbed_id, testbed_version)
323 elif mode == DC.MODE_DAEMON:
324 return TestbedControllerProxy(root_dir, log_level,
325 testbed_id = testbed_id,
326 testbed_version = testbed_version,
327 communication = communication,
335 environment_setup = environment_setup,
336 clean_root = clean_root)
337 raise RuntimeError("Unsupported access configuration '%s'" % mode)
339 def _build_testbed_controller(testbed_id, testbed_version):
340 mod_name = nepi.util.environ.find_testbed(testbed_id)
342 if not mod_name in sys.modules:
346 raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
348 module = sys.modules[mod_name]
349 tc = module.TestbedController()
350 if tc.testbed_version != testbed_version:
351 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
352 (testbed_id, testbed_version, tc.testbed_version))
355 # Just a namespace class
def pickled_data(sdata):
    """
    Decode a base64 string and unpickle the resulting bytes.

    NOTE(review): unpickling can execute arbitrary code if the payload
    is not trusted -- confirm that only trusted peers feed this decoder.
    """
    raw = base64.b64decode(sdata)
    return cPickle.loads(raw)
def base64_data(sdata):
    """Return the result of base64-decoding sdata."""
    decoded = base64.b64decode(sdata)
    return decoded
368 return None if sdata == "None" else int(sdata)
372 return sdata == 'True'
def pickled_data(data):
    """Pickle data and return the base64 encoding of the pickle."""
    dumped = cPickle.dumps(data)
    return base64.b64encode(dumped)
380 def base64_data(data):
383 return base64.b64encode(data)
387 return "None" if data is None else int(data)
391 return str(bool(data))
393 # import into Marshalling all the decoders
397 for typname, typ in vars(Decoders).iteritems()
398 if not typname.startswith('_')
401 _TYPE_ENCODERS = dict([
402 # id(type) -> (<encoding_function>, <formatting_string>)
403 (typname, (getattr(Encoders,typname),"%s"))
404 for typname in vars(Decoders)
405 if not typname.startswith('_')
406 and hasattr(Encoders,typname)
410 _TYPE_ENCODERS["float"] = (float, "%r")
411 _TYPE_ENCODERS["int"] = (int, "%d")
412 _TYPE_ENCODERS["long"] = (int, "%d")
413 _TYPE_ENCODERS["str"] = (str, "%s")
414 _TYPE_ENCODERS["unicode"] = (str, "%s")
417 _TYPE_ENCODERS[None] = (str, "%s")
422 Decorator that converts the given function into one that takes
423 a single "params" list, with each parameter marshalled according
424 to the given factory callable (type constructors are accepted).
426 The first argument (self) is left untouched.
430 @Marshalling.args(int,int,str,base64_data)
431 def somefunc(self, someint, otherint, somestr, someb64):
436 def rv(self, params):
437 return f(self, *[ ctor(val)
438 for ctor,val in zip(types, params[1:]) ])
442 # Derive type encoders by looking up types in _TYPE_ENCODERS
443 # make_proxy will use it to encode arguments in command strings
445 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
447 if typ.__name__ in TYPE_ENCODERS:
448 argencoders.append(TYPE_ENCODERS[typ.__name__])
451 argencoders.append(TYPE_ENCODERS[None])
453 rv._argencoders = tuple(argencoders)
455 rv._retval = getattr(f, '_retval', None)
460 def retval(typ=Decoders.base64_data):
462 Decorator that converts the given function into one that
463 returns a properly encoded return string, given that the undecorated
464 function returns suitable input for the encoding function.
466 The optional typ argument specifies a type.
467 For the default of base64_data, return values should be strings.
468 The return value of the encoding method should be a string always.
472 @Marshalling.args(int,int,str,base64_data)
473 @Marshalling.retval(str)
474 def somefunc(self, someint, otherint, somestr, someb64):
477 encode, fmt = Marshalling._TYPE_ENCODERS.get(
479 Marshalling._TYPE_ENCODERS[None])
484 def rv(self, *p, **kw):
485 data = f(self, *p, **kw)
491 rv._argtypes = getattr(f, '_argtypes', None)
492 rv._argencoders = getattr(f, '_argencoders', None)
499 Decorator that converts the given function into one that
500 always return an encoded empty string.
502 Useful for null-returning functions.
507 def rv(self, *p, **kw):
512 rv._argtypes = getattr(f, '_argtypes', None)
513 rv._argencoders = getattr(f, '_argencoders', None)
517 def handles(whichcommand):
519 Associates the method with a given command code for servers.
520 It should always be the topmost decorator.
523 f._handles_command = whichcommand
527 class BaseServer(server.Server):
528 def reply_action(self, msg):
530 result = base64.b64encode("Invalid command line")
531 reply = "%d|%s" % (ERROR, result)
533 params = msg.split("|")
534 instruction = int(params[0])
535 log_msg(self, params)
537 for mname,meth in vars(self.__class__).iteritems():
538 if not mname.startswith('_'):
539 cmd = getattr(meth, '_handles_command', None)
540 if cmd == instruction:
541 meth = getattr(self, mname)
545 error = "Invalid instruction %s" % instruction
546 self.log_error(error)
547 result = base64.b64encode(error)
548 reply = "%d|%s" % (ERROR, result)
550 error = self.log_error()
551 result = base64.b64encode(error)
552 reply = "%d|%s" % (ERROR, result)
553 log_reply(self, reply)
556 class ExperimentSuiteServer(BaseServer):
557 def __init__(self, root_dir, log_level,
558 xml, repetitions, duration, wait_guids,
559 communication = DC.ACCESS_LOCAL,
566 environment_setup = "",
568 super(ExperimentSuiteServer, self).__init__(root_dir, log_level,
569 environment_setup = environment_setup, clean_root = clean_root)
570 access_config = AccessConfiguration()
571 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
572 access_config.set_attribute_value(DC.LOG_LEVEL, log_level)
573 access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environment_setup)
575 access_config.set_attribute_value(DC.DEPLOYMENT_USER, user)
577 access_config.set_attribute_value(DC.DEPLOYMENT_HOST, host)
579 access_config.set_attribute_value(DC.DEPLOYMENT_PORT, port)
581 access_config.set_attribute_value(DC.USE_AGENT, agent)
access_config.set_attribute_value(DC.USE_SUDO, sudo)
585 access_config.set_attribute_value(DC.DEPLOYMENT_KEY, ident_key)
587 access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, communication)
589 access_config.set_attribute_value(DC.CLEAN_ROOT, clean_root)
590 self._experiment_xml = xml
591 self._duration = duration
592 self._repetitions = repetitions
593 self._wait_guids = wait_guids
594 self._access_config = access_config
595 self._experiment_suite = None
def post_daemonize(self):
    """
    Build the ExperimentSuite from the parameters stored on this server
    and keep it in self._experiment_suite.
    """
    # imported inside the method, exactly as the original code does
    from nepi.core.execute import ExperimentSuite
    suite_args = (
        self._experiment_xml, self._access_config,
        self._repetitions, self._duration, self._wait_guids,
    )
    self._experiment_suite = ExperimentSuite(*suite_args)
603 @Marshalling.handles(CURRENT)
605 @Marshalling.retval(int)
607 return self._experiment_suite.current()
609 @Marshalling.handles(STATUS)
611 @Marshalling.retval(int)
613 return self._experiment_suite.status()
615 @Marshalling.handles(FINISHED)
617 @Marshalling.retval(Marshalling.bool)
def is_finished(self):
    """Return whatever self._experiment_suite.is_finished() reports."""
    suite = self._experiment_suite
    return suite.is_finished()
621 @Marshalling.handles(ACCESS_CONFIGURATIONS)
623 @Marshalling.retval( Marshalling.pickled_data )
def get_access_configurations(self):
    """Return the result of self._experiment_suite.get_access_configurations()."""
    suite = self._experiment_suite
    return suite.get_access_configurations()
627 @Marshalling.handles(START)
631 self._experiment_suite.start()
633 @Marshalling.handles(SHUTDOWN)
637 self._experiment_suite.shutdown()
639 @Marshalling.handles(CURRENT_ACCESS_CONFIG)
641 @Marshalling.retval( Marshalling.pickled_data )
def get_current_access_config(self):
    """Return the result of self._experiment_suite.get_current_access_config()."""
    suite = self._experiment_suite
    return suite.get_current_access_config()
645 class TestbedControllerServer(BaseServer):
646 def __init__(self, root_dir, log_level, testbed_id, testbed_version,
647 environment_setup, clean_root):
648 super(TestbedControllerServer, self).__init__(root_dir, log_level,
649 environment_setup = environment_setup, clean_root = clean_root)
650 self._testbed_id = testbed_id
651 self._testbed_version = testbed_version
def post_daemonize(self):
    """
    Build the testbed controller for the stored testbed id/version and
    keep it in self._testbed.
    """
    testbed_id = self._testbed_id
    testbed_version = self._testbed_version
    self._testbed = _build_testbed_controller(testbed_id, testbed_version)
658 @Marshalling.handles(GUIDS)
660 @Marshalling.retval( Marshalling.pickled_data )
662 return self._testbed.guids
664 @Marshalling.handles(TESTBED_ID)
666 @Marshalling.retval()
667 def testbed_id(self):
668 return str(self._testbed.testbed_id)
670 @Marshalling.handles(TESTBED_VERSION)
672 @Marshalling.retval()
673 def testbed_version(self):
674 return str(self._testbed.testbed_version)
676 @Marshalling.handles(CREATE)
677 @Marshalling.args(int, str)
def defer_create(self, guid, factory_id):
    """Forward guid and factory_id to self._testbed.defer_create."""
    testbed = self._testbed
    testbed.defer_create(guid, factory_id)
682 @Marshalling.handles(TRACE)
683 @Marshalling.args(int, str, Marshalling.base64_data)
684 @Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Return the result of self._testbed.trace for the given arguments."""
    testbed = self._testbed
    return testbed.trace(guid, trace_id, attribute)
688 @Marshalling.handles(TRACES_INFO)
690 @Marshalling.retval( Marshalling.pickled_data )
def traces_info(self):
    """Return the result of self._testbed.traces_info()."""
    testbed = self._testbed
    return testbed.traces_info()
694 @Marshalling.handles(START)
698 self._testbed.start()
700 @Marshalling.handles(STOP)
706 @Marshalling.handles(SHUTDOWN)
710 self._testbed.shutdown()
712 @Marshalling.handles(CONFIGURE)
713 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
def defer_configure(self, name, value):
    """Forward name and value to self._testbed.defer_configure."""
    testbed = self._testbed
    testbed.defer_configure(name, value)
718 @Marshalling.handles(CREATE_SET)
719 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
def defer_create_set(self, guid, name, value):
    """Forward guid, name and value to self._testbed.defer_create_set."""
    testbed = self._testbed
    testbed.defer_create_set(guid, name, value)
724 @Marshalling.handles(FACTORY_SET)
725 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
def defer_factory_set(self, name, value):
    """Forward name and value to self._testbed.defer_factory_set."""
    testbed = self._testbed
    testbed.defer_factory_set(name, value)
730 @Marshalling.handles(CONNECT)
731 @Marshalling.args(int, str, int, str)
def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
    """Forward both (guid, connector type name) pairs to self._testbed.defer_connect."""
    testbed = self._testbed
    testbed.defer_connect(
        guid1, connector_type_name1,
        guid2, connector_type_name2)
737 @Marshalling.handles(CROSS_CONNECT)
738 @Marshalling.args(int, str, int, int, str, str, str)
def defer_cross_connect(self,
        guid, connector_type_name,
        cross_guid, cross_testbed_guid,
        cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """Forward every argument, in order, to self._testbed.defer_cross_connect."""
    testbed = self._testbed
    forwarded = (
        guid, connector_type_name, cross_guid,
        cross_testbed_guid, cross_testbed_id, cross_factory_id,
        cross_connector_type_name,
    )
    testbed.defer_cross_connect(*forwarded)
749 @Marshalling.handles(ADD_TRACE)
750 @Marshalling.args(int, str)
def defer_add_trace(self, guid, trace_id):
    """Forward guid and trace_id to self._testbed.defer_add_trace."""
    testbed = self._testbed
    testbed.defer_add_trace(guid, trace_id)
755 @Marshalling.handles(ADD_ADDRESS)
756 @Marshalling.args(int, str, int, Marshalling.pickled_data)
758 def defer_add_address(self, guid, address, netprefix, broadcast):
759 self._testbed.defer_add_address(guid, address, netprefix,
762 @Marshalling.handles(ADD_ROUTE)
763 @Marshalling.args(int, str, int, str, int)
def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
    """Forward the route description to self._testbed.defer_add_route."""
    testbed = self._testbed
    testbed.defer_add_route(
        guid, destination, netprefix, nexthop, metric)
768 @Marshalling.handles(DO_SETUP)
772 self._testbed.do_setup()
774 @Marshalling.handles(DO_CREATE)
778 self._testbed.do_create()
780 @Marshalling.handles(DO_CONNECT_INIT)
def do_connect_init(self):
    """Invoke self._testbed.do_connect_init()."""
    testbed = self._testbed
    testbed.do_connect_init()
786 @Marshalling.handles(DO_CONNECT_COMPL)
def do_connect_compl(self):
    """Invoke self._testbed.do_connect_compl()."""
    testbed = self._testbed
    testbed.do_connect_compl()
792 @Marshalling.handles(DO_CONFIGURE)
def do_configure(self):
    """Invoke self._testbed.do_configure()."""
    testbed = self._testbed
    testbed.do_configure()
798 @Marshalling.handles(DO_PRECONFIGURE)
def do_preconfigure(self):
    """Invoke self._testbed.do_preconfigure()."""
    testbed = self._testbed
    testbed.do_preconfigure()
804 @Marshalling.handles(DO_PRESTART)
def do_prestart(self):
    """Invoke self._testbed.do_prestart()."""
    testbed = self._testbed
    testbed.do_prestart()
810 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
811 @Marshalling.args( Marshalling.Decoders.pickled_data )
def do_cross_connect_init(self, cross_data):
    """Forward cross_data to self._testbed.do_cross_connect_init."""
    testbed = self._testbed
    testbed.do_cross_connect_init(cross_data)
816 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
817 @Marshalling.args( Marshalling.Decoders.pickled_data )
def do_cross_connect_compl(self, cross_data):
    """Forward cross_data to self._testbed.do_cross_connect_compl."""
    testbed = self._testbed
    testbed.do_cross_connect_compl(cross_data)
822 @Marshalling.handles(GET)
823 @Marshalling.args(int, Marshalling.base64_data, str)
824 @Marshalling.retval( Marshalling.pickled_data )
def get(self, guid, name, time):
    """Return the result of self._testbed.get(guid, name, time)."""
    testbed = self._testbed
    return testbed.get(guid, name, time)
828 @Marshalling.handles(SET)
829 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
def set(self, guid, name, value, time):
    """Forward guid, name, value and time to self._testbed.set."""
    testbed = self._testbed
    testbed.set(guid, name, value, time)
834 @Marshalling.handles(GET_ADDRESS)
835 @Marshalling.args(int, int, Marshalling.base64_data)
836 @Marshalling.retval()
def get_address(self, guid, index, attribute):
    """Return str() of self._testbed.get_address(guid, index, attribute)."""
    value = self._testbed.get_address(guid, index, attribute)
    return str(value)
840 @Marshalling.handles(GET_ROUTE)
841 @Marshalling.args(int, int, Marshalling.base64_data)
842 @Marshalling.retval()
def get_route(self, guid, index, attribute):
    """Return str() of self._testbed.get_route(guid, index, attribute)."""
    value = self._testbed.get_route(guid, index, attribute)
    return str(value)
846 @Marshalling.handles(ACTION)
847 @Marshalling.args(str, int, Marshalling.base64_data)
def action(self, time, guid, command):
    """Forward time, guid and command to self._testbed.action."""
    testbed = self._testbed
    testbed.action(time, guid, command)
852 @Marshalling.handles(STATUS)
853 @Marshalling.args(Marshalling.nullint)
854 @Marshalling.retval(int)
def status(self, guid):
    """Return the result of self._testbed.status(guid)."""
    testbed = self._testbed
    return testbed.status(guid)
858 @Marshalling.handles(TESTBED_STATUS)
860 @Marshalling.retval(int)
861 def testbed_status(self):
862 return self._testbed.testbed_status()
864 @Marshalling.handles(GET_ATTRIBUTE_LIST)
865 @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
866 @Marshalling.retval( Marshalling.pickled_data )
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """
    Return the result of self._testbed.get_attribute_list, passing
    guid, filter_flags and exclude through positionally.
    """
    testbed = self._testbed
    return testbed.get_attribute_list(guid, filter_flags, exclude)
870 @Marshalling.handles(GET_FACTORY_ID)
871 @Marshalling.args(int)
872 @Marshalling.retval()
def get_factory_id(self, guid):
    """Return the result of self._testbed.get_factory_id(guid)."""
    testbed = self._testbed
    return testbed.get_factory_id(guid)
876 @Marshalling.handles(RECOVER)
880 self._testbed.recover()
883 class ExperimentControllerServer(BaseServer):
884 def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
886 super(ExperimentControllerServer, self).__init__(root_dir, log_level,
887 environment_setup = environment_setup, clean_root = clean_root)
888 self._experiment_xml = experiment_xml
889 self._experiment = None
def post_daemonize(self):
    """
    Build the ExperimentController from the stored experiment XML and
    root directory, and keep it in self._experiment.
    """
    # imported inside the method, exactly as the original code does
    from nepi.core.execute import ExperimentController
    xml = self._experiment_xml
    self._experiment = ExperimentController(xml, root_dir = self._root_dir)
896 @Marshalling.handles(GUIDS)
898 @Marshalling.retval( Marshalling.pickled_data )
900 return self._experiment.guids
902 @Marshalling.handles(STARTED_TIME)
904 @Marshalling.retval( Marshalling.pickled_data )
def started_time(self):
    """Return the started_time attribute of self._experiment."""
    experiment = self._experiment
    return experiment.started_time
908 @Marshalling.handles(STOPPED_TIME)
910 @Marshalling.retval( Marshalling.pickled_data )
def stopped_time(self):
    """Return the stopped_time attribute of self._experiment."""
    experiment = self._experiment
    return experiment.stopped_time
914 @Marshalling.handles(XML)
916 @Marshalling.retval()
def experiment_design_xml(self):
    """Return the experiment_design_xml attribute of self._experiment."""
    experiment = self._experiment
    return experiment.experiment_design_xml
920 @Marshalling.handles(EXEC_XML)
922 @Marshalling.retval()
def experiment_execute_xml(self):
    """Return the experiment_execute_xml attribute of self._experiment."""
    experiment = self._experiment
    return experiment.experiment_execute_xml
926 @Marshalling.handles(TRACE)
927 @Marshalling.args(int, str, Marshalling.base64_data)
928 @Marshalling.retval()
def trace(self, guid, trace_id, attribute):
    """Return str() of self._experiment.trace(guid, trace_id, attribute)."""
    value = self._experiment.trace(guid, trace_id, attribute)
    return str(value)
932 @Marshalling.handles(TRACES_INFO)
934 @Marshalling.retval( Marshalling.pickled_data )
def traces_info(self):
    """Return the result of self._experiment.traces_info()."""
    experiment = self._experiment
    return experiment.traces_info()
938 @Marshalling.handles(FINISHED)
939 @Marshalling.args(int)
940 @Marshalling.retval(Marshalling.bool)
def is_finished(self, guid):
    """Return the result of self._experiment.is_finished(guid)."""
    experiment = self._experiment
    return experiment.is_finished(guid)
944 @Marshalling.handles(STATUS)
945 @Marshalling.args(int)
946 @Marshalling.retval(int)
def status(self, guid):
    """Return the result of self._experiment.status(guid)."""
    experiment = self._experiment
    return experiment.status(guid)
950 @Marshalling.handles(GET)
951 @Marshalling.args(int, Marshalling.base64_data, str)
952 @Marshalling.retval( Marshalling.pickled_data )
def get(self, guid, name, time):
    """Return the result of self._experiment.get(guid, name, time)."""
    experiment = self._experiment
    return experiment.get(guid, name, time)
956 @Marshalling.handles(SET)
957 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
def set(self, guid, name, value, time):
    """Forward guid, name, value and time to self._experiment.set."""
    experiment = self._experiment
    experiment.set(guid, name, value, time)
962 @Marshalling.handles(START)
966 self._experiment.start()
968 @Marshalling.handles(STOP)
972 self._experiment.stop()
974 @Marshalling.handles(RECOVER)
978 self._experiment.recover()
980 @Marshalling.handles(SHUTDOWN)
984 self._experiment.shutdown()
986 @Marshalling.handles(GET_TESTBED_ID)
987 @Marshalling.args(int)
988 @Marshalling.retval()
def get_testbed_id(self, guid):
    """Return the result of self._experiment.get_testbed_id(guid)."""
    experiment = self._experiment
    return experiment.get_testbed_id(guid)
992 @Marshalling.handles(GET_FACTORY_ID)
993 @Marshalling.args(int)
994 @Marshalling.retval()
def get_factory_id(self, guid):
    """Return the result of self._experiment.get_factory_id(guid)."""
    experiment = self._experiment
    return experiment.get_factory_id(guid)
998 @Marshalling.handles(GET_TESTBED_VERSION)
999 @Marshalling.args(int)
1000 @Marshalling.retval()
def get_testbed_version(self, guid):
    """Return the result of self._experiment.get_testbed_version(guid)."""
    experiment = self._experiment
    return experiment.get_testbed_version(guid)
1004 class BaseProxy(object):
1006 _ServerClassModule = "nepi.util.proxy"
1008 def __init__(self, ctor_args, root_dir,
1010 communication = DC.ACCESS_LOCAL,
1017 environment_setup = "",
1018 clean_root = False):
1021 "from %(classmodule)s import %(classname)s;"
1022 "s = %(classname)s%(ctor_args)r;"
1025 classname = self._ServerClass.__name__,
1026 classmodule = self._ServerClassModule,
1027 ctor_args = ctor_args
1029 proc = server.popen_python(python_code,
1030 communication = communication,
1035 ident_key = ident_key,
1037 environment_setup = environment_setup)
1038 # Wait for the server to be ready, otherwise nobody
1039 # will be able to connect to it
1043 helo = proc.stderr.readline()
1044 if helo == 'SERVER_READY.\n':
1048 raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
1049 # connect client to server
1050 self._client = server.Client(root_dir,
1051 communication = communication,
1057 environment_setup = environment_setup)
1060 def _make_message(argtypes, argencoders, command, methname, classname, *args):
1061 if len(argtypes) != len(argencoders):
1062 raise ValueError, "Invalid arguments for _make_message: "\
1063 "in stub method %s of class %s "\
1064 "argtypes and argencoders must match in size" % (
1065 methname, classname )
1066 if len(argtypes) != len(args):
1067 raise ValueError, "Invalid arguments for _make_message: "\
1068 "in stub method %s of class %s "\
1069 "expected %d arguments, got %d" % (
1070 methname, classname,
1071 len(argtypes), len(args))
1074 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
1076 buf.append(fmt % encode(val))
1079 raise TypeError, "Argument %d of stub method %s of class %s "\
1080 "requires a value of type %s, but got %s - nested error: %s" % (
1081 argnum, methname, classname,
1082 getattr(typ, '__name__', typ), type(val),
1083 traceback.format_exc()
1086 return "%d|%s" % (command, '|'.join(buf))
1089 def _parse_reply(rvtype, methname, classname, reply):
1091 raise RuntimeError, "Invalid reply: %r "\
1092 "for stub method %s of class %s" % (
1098 result = reply.split("|")
1099 code = int(result[0])
1103 raise TypeError, "Return value of stub method %s of class %s "\
1104 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
1105 methname, classname,
1106 getattr(rvtype, '__name__', rvtype), reply,
1107 traceback.format_exc()
1110 text = base64.b64decode(text)
1111 raise RuntimeError(text)
1120 raise TypeError, "Return value of stub method %s of class %s "\
1121 "cannot be parsed: must be of type %s - nested error: %s" % (
1122 methname, classname,
1123 getattr(rvtype, '__name__', rvtype),
1124 traceback.format_exc()
1127 raise RuntimeError, "Invalid reply: %r "\
1128 "for stub method %s of class %s - unknown code" % (
1134 def _make_stubs(server_class, template_class):
1136 Returns a dictionary method_name -> method
1141 class SomeProxy(BaseProxy):
1144 locals().update( BaseProxy._make_stubs(
1149 ServerClass is the corresponding Server class, as
1150 specified in the _ServerClass class method (_make_stubs
1151 is static and can't access the method), and TemplateClass
1152 is the ultimate implementation class behind the server,
1153 from which argument names and defaults are taken, to
1154 maintain meaningful interfaces.
1161 func_template_path = os.path.join(
1162 os.path.dirname(__file__),
1164 func_template_file = open(func_template_path, "r")
1165 func_template = func_template_file.read()
1166 func_template_file.close()
1168 for methname in vars(template_class).copy():
1169 if methname.endswith('_deferred'):
1170 # cannot wrap deferreds...
1172 dmethname = methname+'_deferred'
1173 if hasattr(server_class, methname) and not methname.startswith('_'):
1174 template_meth = getattr(template_class, methname)
1175 server_meth = getattr(server_class, methname)
1177 command = getattr(server_meth, '_handles_command', None)
1178 argtypes = getattr(server_meth, '_argtypes', None)
1179 argencoders = getattr(server_meth, '_argencoders', None)
1180 rvtype = getattr(server_meth, '_retval', None)
1183 if hasattr(template_meth, 'fget'):
1185 template_meth = template_meth.fget
1188 if command is not None and argtypes is not None and argencoders is not None:
1189 # We have an interface method...
1190 code = template_meth.func_code
1191 argnames = code.co_varnames[:code.co_argcount]
1192 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1193 + (template_meth.func_defaults or ()) )
1195 func_globals = dict(
1196 BaseProxy = BaseProxy,
1197 argtypes = argtypes,
1198 argencoders = argencoders,
1200 functools = functools,
1204 func_text = func_template % dict(
1206 args = '%s' % (','.join(argnames[1:])),
1207 argdefs = ','.join([
1208 argname if argdef is NONE
1209 else "%s=%r" % (argname, argdef)
1210 for argname, argdef in zip(argnames[1:], argdefaults[1:])
1213 methname = methname,
1214 classname = server_class.__name__
1217 func_text = compile(
1222 exec func_text in func_globals, context
1225 rv[methname] = property(context[methname])
1226 rv[dmethname] = property(context[dmethname])
1228 rv[methname] = context[methname]
1229 rv[dmethname] = context[dmethname]
1231 # inject _deferred into core classes
1232 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1233 def freezename(methname, dmethname):
1234 def dmeth(self, *p, **kw):
1235 return getattr(self, methname)(*p, **kw)
1236 dmeth.__name__ = dmethname
1238 dmeth = freezename(methname, dmethname)
1239 setattr(template_class, dmethname, dmeth)
1243 class ExperimentSuiteProxy(BaseProxy):
1245 _ServerClass = ExperimentSuiteServer
1247 def __init__(self, root_dir, log_level,
1248 xml, repetitions, duration, wait_guids,
1249 communication = DC.ACCESS_LOCAL,
1256 environment_setup = "",
1257 clean_root = False):
1258 super(ExperimentSuiteProxy,self).__init__(
1259 ctor_args = (root_dir, log_level,
1273 root_dir = root_dir,
1274 launch = True, #launch
1275 communication = communication,
1279 ident_key = ident_key,
1282 environment_setup = environment_setup)
1284 locals().update( BaseProxy._make_stubs(
1285 server_class = ExperimentSuiteServer,
1286 template_class = nepi.core.execute.ExperimentSuite,
1289 # Shutdown stops the serverside...
1290 def shutdown(self, _stub = shutdown):
1292 self._client.send_stop()
1293 self._client.read_reply() # wait for it
1296 class TestbedControllerProxy(BaseProxy):
1298 _ServerClass = TestbedControllerServer
1300 def __init__(self, root_dir, log_level,
1302 testbed_version = None,
1304 communication = DC.ACCESS_LOCAL,
1311 environment_setup = "",
1312 clean_root = False):
1313 if launch and (testbed_id == None or testbed_version == None):
1314 raise RuntimeError("To launch a TesbedControllerServer a "
1315 "testbed_id and testbed_version are required")
1316 super(TestbedControllerProxy,self).__init__(
1317 ctor_args = (root_dir, log_level, testbed_id, testbed_version,
1318 environment_setup, clean_root),
1319 root_dir = root_dir,
1321 communication = communication,
1325 ident_key = ident_key,
1328 environment_setup = environment_setup)
1330 locals().update( BaseProxy._make_stubs(
1331 server_class = TestbedControllerServer,
1332 template_class = nepi.core.execute.TestbedController,
1335 # Shutdown stops the serverside...
1336 def shutdown(self, _stub = shutdown):
1338 self._client.send_stop()
1339 self._client.read_reply() # wait for it
1343 class ExperimentControllerProxy(BaseProxy):
1344 _ServerClass = ExperimentControllerServer
1346 def __init__(self, root_dir, log_level,
1347 experiment_xml = None,
1349 communication = DC.ACCESS_LOCAL,
1356 environment_setup = "",
1357 clean_root = False):
1358 super(ExperimentControllerProxy,self).__init__(
1359 ctor_args = (root_dir, log_level, experiment_xml, environment_setup,
1361 root_dir = root_dir,
1363 communication = communication,
1367 ident_key = ident_key,
1370 environment_setup = environment_setup,
1371 clean_root = clean_root)
1373 locals().update( BaseProxy._make_stubs(
1374 server_class = ExperimentControllerServer,
1375 template_class = nepi.core.execute.ExperimentController,
1378 # Shutdown stops the serverside...
1379 def shutdown(self, _stub = shutdown):
1381 self._client.send_stop()
1382 self._client.read_reply() # wait for it