1 # -*- coding: utf-8 -*-
4 import nepi.core.execute
5 import nepi.util.environ
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
22 # PROTOCOL INSTRUCTION MESSAGES
42 DO_CROSS_CONNECT_INIT = 22
52 GET_ATTRIBUTE_LIST = 32
54 DO_CROSS_CONNECT_COMPL = 34
60 GET_TESTBED_VERSION = 40
67 ACCESS_CONFIGURATIONS = 47
68 CURRENT_ACCESS_CONFIG = 48
71 instruction_text = dict({
82 CONFIGURE: "CONFIGURE",
84 CREATE_SET: "CREATE_SET",
85 FACTORY_SET: "FACTORY_SET",
87 CROSS_CONNECT: "CROSS_CONNECT",
88 ADD_TRACE: "ADD_TRACE",
89 ADD_ADDRESS: "ADD_ADDRESS",
90 ADD_ROUTE: "ADD_ROUTE",
92 DO_CREATE: "DO_CREATE",
93 DO_CONNECT_INIT: "DO_CONNECT_INIT",
94 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
95 DO_CONFIGURE: "DO_CONFIGURE",
96 DO_PRECONFIGURE: "DO_PRECONFIGURE",
97 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
98 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
101 GET_ROUTE: "GET_ROUTE",
102 GET_ADDRESS: "GET_ADDRESS",
103 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
104 GET_FACTORY_ID: "GET_FACTORY_ID",
105 GET_TESTBED_ID: "GET_TESTBED_ID",
106 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
110 TESTBED_ID: "TESTBED_ID",
111 TESTBED_VERSION: "TESTBED_VERSION",
112 TRACES_INFO: "TRACES_INFO",
113 STARTED_TIME: "STARTED_TIME",
114 STOPPED_TIME: "STOPPED_TIME",
116 ACCESS_CONFIGURATIONS: "ACCESS_CONFIGURATIONS",
117 CURRENT_ACCESS_CONFIG: "CURRENT_ACCESS_CONFIG"
121 def log_msg(server, params):
123 instr = int(params[0])
124 instr_txt = instruction_text[instr]
125 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
126 instr_txt, ", ".join(map(str, params[1:]))))
128 # don't die for logging
131 def log_reply(server, reply):
133 res = reply.split("|")
135 code_txt = instruction_text[code]
137 txt = base64.b64decode(res[1])
140 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
143 # don't die for logging
144 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
148 def to_server_log_level(log_level):
151 if log_level == DC.DEBUG_LEVEL
155 def get_access_config_params(access_config):
156 mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
157 launch = not access_config.get_attribute_value(DC.RECOVER)
158 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
159 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
160 log_level = to_server_log_level(log_level)
161 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
162 environment_setup = (
163 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
164 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
167 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
168 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
169 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
170 agent = access_config.get_attribute_value(DC.USE_AGENT)
171 sudo = access_config.get_attribute_value(DC.USE_SUDO)
172 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
173 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
174 clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
175 return (mode, launch, root_dir, log_level, communication, user, host, port,
176 key, agent, sudo, environment_setup, clean_root)
178 class AccessConfiguration(AttributesMap):
179 def __init__(self, params = None):
180 super(AccessConfiguration, self).__init__()
182 from nepi.core.metadata import Metadata
184 for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
185 self.add_attribute(**attr_info)
188 for attr_name, attr_value in params.iteritems():
189 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
190 attr_value = parser(attr_value)
191 self.set_attribute_value(attr_name, attr_value)
193 class TempDir(object):
195 self.path = tempfile.mkdtemp()
198 shutil.rmtree(self.path)
200 class PermDir(object):
201 def __init__(self, path):
204 def create_experiment_suite(xml, access_config, repetitions = None,
205 duration = None, wait_guids = None):
208 (mode, launch, root_dir, log_level, communication, user, host, port,
209 key, agent, sudo, environment_setup, clean_root) \
210 = get_access_config_params(access_config)
212 if not mode or mode == DC.MODE_SINGLE_PROCESS:
213 from nepi.core.execute import ExperimentSuite
217 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
219 exp_suite = ExperimentSuite(xml, access_config, repetitions, duration,
222 # inject reference to temporary dir, so that it gets cleaned
223 # up at destruction time.
224 exp_suite._tempdir = root_dir
226 elif mode == DC.MODE_DAEMON:
227 return ExperimentSuiteProxy(root_dir, log_level,
229 repetitions = repetitions,
231 wait_guids = wait_guids,
232 communication = communication,
239 environment_setup = environment_setup,
240 clean_root = clean_root)
241 raise RuntimeError("Unsupported access configuration '%s'" % mode)
243 def create_experiment_controller(xml, access_config = None):
246 log_level = DC.ERROR_LEVEL
248 (mode, launch, root_dir, log_level, communication, user, host, port,
249 key, agent, sudo, environment_setup, clean_root) \
250 = get_access_config_params(access_config)
252 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
254 if not mode or mode == DC.MODE_SINGLE_PROCESS:
255 from nepi.core.execute import ExperimentController
257 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
260 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
261 controller = ExperimentController(xml, root_dir.path)
263 # inject reference to temporary dir, so that it gets cleaned
264 # up at destruction time.
265 controller._tempdir = root_dir
272 elif mode == DC.MODE_DAEMON:
274 return ExperimentControllerProxy(root_dir, log_level,
275 experiment_xml = xml,
276 communication = communication,
284 environment_setup = environment_setup,
285 clean_root = clean_root)
288 # Maybe controller died, recover from persisted testbed information if possible
289 controller = ExperimentControllerProxy(root_dir, log_level,
290 experiment_xml = xml,
291 communication = communication,
299 environment_setup = environment_setup,
300 clean_root = clean_root)
305 raise RuntimeError("Unsupported access configuration '%s'" % mode)
307 def create_testbed_controller(testbed_id, testbed_version, access_config):
310 log_level = DC.ERROR_LEVEL
312 (mode, launch, root_dir, log_level, communication, user, host, port,
313 key, agent, sudo, environment_setup, clean_root) \
314 = get_access_config_params(access_config)
316 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
318 if not mode or mode == DC.MODE_SINGLE_PROCESS:
320 raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
321 return _build_testbed_controller(testbed_id, testbed_version)
322 elif mode == DC.MODE_DAEMON:
323 return TestbedControllerProxy(root_dir, log_level,
324 testbed_id = testbed_id,
325 testbed_version = testbed_version,
326 communication = communication,
334 environment_setup = environment_setup,
335 clean_root = clean_root)
336 raise RuntimeError("Unsupported access configuration '%s'" % mode)
338 def _build_testbed_controller(testbed_id, testbed_version):
339 mod_name = nepi.util.environ.find_testbed(testbed_id)
341 if not mod_name in sys.modules:
345 raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
347 module = sys.modules[mod_name]
348 tc = module.TestbedController()
349 if tc.testbed_version != testbed_version:
350 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
351 (testbed_id, testbed_version, tc.testbed_version))
354 # Just a namespace class
358 def pickled_data(sdata):
359 return cPickle.loads(base64.b64decode(sdata))
362 def base64_data(sdata):
363 return base64.b64decode(sdata)
367 return None if sdata == "None" else int(sdata)
371 return sdata == 'True'
375 def pickled_data(data):
376 return base64.b64encode(cPickle.dumps(data))
379 def base64_data(data):
382 return base64.b64encode(data)
386 return "None" if data is None else int(data)
390 return str(bool(data))
392 # import into Marshalling all the decoders
396 for typname, typ in vars(Decoders).iteritems()
397 if not typname.startswith('_')
400 _TYPE_ENCODERS = dict([
401 # id(type) -> (<encoding_function>, <formatting_string>)
402 (typname, (getattr(Encoders,typname),"%s"))
403 for typname in vars(Decoders)
404 if not typname.startswith('_')
405 and hasattr(Encoders,typname)
409 _TYPE_ENCODERS["float"] = (float, "%r")
410 _TYPE_ENCODERS["int"] = (int, "%d")
411 _TYPE_ENCODERS["long"] = (int, "%d")
412 _TYPE_ENCODERS["str"] = (str, "%s")
413 _TYPE_ENCODERS["unicode"] = (str, "%s")
416 _TYPE_ENCODERS[None] = (str, "%s")
421 Decorator that converts the given function into one that takes
422 a single "params" list, with each parameter marshalled according
423 to the given factory callable (type constructors are accepted).
425 The first argument (self) is left untouched.
429 @Marshalling.args(int,int,str,base64_data)
430 def somefunc(self, someint, otherint, somestr, someb64):
435 def rv(self, params):
436 return f(self, *[ ctor(val)
437 for ctor,val in zip(types, params[1:]) ])
441 # Derive type encoders by looking up types in _TYPE_ENCODERS
442 # make_proxy will use it to encode arguments in command strings
444 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
446 if typ.__name__ in TYPE_ENCODERS:
447 argencoders.append(TYPE_ENCODERS[typ.__name__])
450 argencoders.append(TYPE_ENCODERS[None])
452 rv._argencoders = tuple(argencoders)
454 rv._retval = getattr(f, '_retval', None)
459 def retval(typ=Decoders.base64_data):
461 Decorator that converts the given function into one that
462 returns a properly encoded return string, given that the undecorated
463 function returns suitable input for the encoding function.
465 The optional typ argument specifies a type.
466 For the default of base64_data, return values should be strings.
467 The return value of the encoding method should be a string always.
471 @Marshalling.args(int,int,str,base64_data)
472 @Marshalling.retval(str)
473 def somefunc(self, someint, otherint, somestr, someb64):
476 encode, fmt = Marshalling._TYPE_ENCODERS.get(
478 Marshalling._TYPE_ENCODERS[None])
483 def rv(self, *p, **kw):
484 data = f(self, *p, **kw)
490 rv._argtypes = getattr(f, '_argtypes', None)
491 rv._argencoders = getattr(f, '_argencoders', None)
498 Decorator that converts the given function into one that
499 always return an encoded empty string.
501 Useful for null-returning functions.
506 def rv(self, *p, **kw):
511 rv._argtypes = getattr(f, '_argtypes', None)
512 rv._argencoders = getattr(f, '_argencoders', None)
516 def handles(whichcommand):
518 Associates the method with a given command code for servers.
519 It should always be the topmost decorator.
522 f._handles_command = whichcommand
526 class BaseServer(server.Server):
527 def reply_action(self, msg):
529 result = base64.b64encode("Invalid command line")
530 reply = "%d|%s" % (ERROR, result)
532 params = msg.split("|")
533 instruction = int(params[0])
534 log_msg(self, params)
536 for mname,meth in vars(self.__class__).iteritems():
537 if not mname.startswith('_'):
538 cmd = getattr(meth, '_handles_command', None)
539 if cmd == instruction:
540 meth = getattr(self, mname)
544 error = "Invalid instruction %s" % instruction
545 self.log_error(error)
546 result = base64.b64encode(error)
547 reply = "%d|%s" % (ERROR, result)
549 error = self.log_error()
550 result = base64.b64encode(error)
551 reply = "%d|%s" % (ERROR, result)
552 log_reply(self, reply)
555 class ExperimentSuiteServer(BaseServer):
556 def __init__(self, root_dir, log_level,
557 xml, repetitions, duration, wait_guids,
558 communication = DC.ACCESS_LOCAL,
565 environment_setup = "",
567 super(ExperimentSuiteServer, self).__init__(root_dir, log_level,
568 environment_setup = environment_setup, clean_root = clean_root)
569 access_config = AccessConfiguration()
570 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
571 access_config.set_attribute_value(DC.LOG_LEVEL, log_level)
572 access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environment_setup)
574 access_config.set_attribute_value(DC.DEPLOYMENT_USER, user)
576 access_config.set_attribute_value(DC.DEPLOYMENT_HOST, host)
578 access_config.set_attribute_value(DC.DEPLOYMENT_PORT, port)
580 access_config.set_attribute_value(DC.USE_AGENT, agent)
582 acess_config.set_attribute_value(DC.USE_SUDO, sudo)
584 access_config.set_attribute_value(DC.DEPLOYMENT_KEY, ident_key)
586 access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, communication)
588 access_config.set_attribute_value(DC.CLEAN_ROOT, clean_root)
589 self._experiment_xml = xml
590 self._duration = duration
591 self._repetitions = repetitions
592 self._wait_guids = wait_guids
593 self._access_config = access_config
594 self._experiment_suite = None
596 def post_daemonize(self):
597 from nepi.core.execute import ExperimentSuite
598 self._experiment_suite = ExperimentSuite(
599 self._experiment_xml, self._access_config,
600 self._repetitions, self._duration, self._wait_guids)
602 @Marshalling.handles(CURRENT)
604 @Marshalling.retval(int)
606 return self._experiment_suite.current()
608 @Marshalling.handles(STATUS)
610 @Marshalling.retval(int)
612 return self._experiment_suite.status()
614 @Marshalling.handles(FINISHED)
616 @Marshalling.retval(Marshalling.bool)
617 def is_finished(self):
618 return self._experiment_suite.is_finished()
620 @Marshalling.handles(ACCESS_CONFIGURATIONS)
622 @Marshalling.retval( Marshalling.pickled_data )
623 def get_access_configurations(self):
624 return self._experiment_suite.get_access_configurations()
626 @Marshalling.handles(START)
630 self._experiment_suite.start()
632 @Marshalling.handles(SHUTDOWN)
636 self._experiment_suite.shutdown()
638 @Marshalling.handles(CURRENT_ACCESS_CONFIG)
640 @Marshalling.retval( Marshalling.pickled_data )
641 def get_current_access_config(self):
642 return self._experiment_suite.get_current_access_config()
644 class TestbedControllerServer(BaseServer):
645 def __init__(self, root_dir, log_level, testbed_id, testbed_version,
646 environment_setup, clean_root):
647 super(TestbedControllerServer, self).__init__(root_dir, log_level,
648 environment_setup = environment_setup, clean_root = clean_root)
649 self._testbed_id = testbed_id
650 self._testbed_version = testbed_version
653 def post_daemonize(self):
654 self._testbed = _build_testbed_controller(self._testbed_id,
655 self._testbed_version)
657 @Marshalling.handles(GUIDS)
659 @Marshalling.retval( Marshalling.pickled_data )
661 return self._testbed.guids
663 @Marshalling.handles(TESTBED_ID)
665 @Marshalling.retval()
666 def testbed_id(self):
667 return str(self._testbed.testbed_id)
669 @Marshalling.handles(TESTBED_VERSION)
671 @Marshalling.retval()
672 def testbed_version(self):
673 return str(self._testbed.testbed_version)
675 @Marshalling.handles(CREATE)
676 @Marshalling.args(int, str)
678 def defer_create(self, guid, factory_id):
679 self._testbed.defer_create(guid, factory_id)
681 @Marshalling.handles(TRACE)
682 @Marshalling.args(int, str, Marshalling.base64_data)
683 @Marshalling.retval()
684 def trace(self, guid, trace_id, attribute):
685 return self._testbed.trace(guid, trace_id, attribute)
687 @Marshalling.handles(TRACES_INFO)
689 @Marshalling.retval( Marshalling.pickled_data )
690 def traces_info(self):
691 return self._testbed.traces_info()
693 @Marshalling.handles(START)
697 self._testbed.start()
699 @Marshalling.handles(STOP)
705 @Marshalling.handles(SHUTDOWN)
709 self._testbed.shutdown()
711 @Marshalling.handles(CONFIGURE)
712 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
714 def defer_configure(self, name, value):
715 self._testbed.defer_configure(name, value)
717 @Marshalling.handles(CREATE_SET)
718 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
720 def defer_create_set(self, guid, name, value):
721 self._testbed.defer_create_set(guid, name, value)
723 @Marshalling.handles(FACTORY_SET)
724 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
726 def defer_factory_set(self, name, value):
727 self._testbed.defer_factory_set(name, value)
729 @Marshalling.handles(CONNECT)
730 @Marshalling.args(int, str, int, str)
732 def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
733 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
734 connector_type_name2)
736 @Marshalling.handles(CROSS_CONNECT)
737 @Marshalling.args(int, str, int, int, str, str, str)
739 def defer_cross_connect(self,
740 guid, connector_type_name,
741 cross_guid, cross_testbed_guid,
742 cross_testbed_id, cross_factory_id,
743 cross_connector_type_name):
744 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
745 cross_testbed_guid, cross_testbed_id, cross_factory_id,
746 cross_connector_type_name)
748 @Marshalling.handles(ADD_TRACE)
749 @Marshalling.args(int, str)
751 def defer_add_trace(self, guid, trace_id):
752 self._testbed.defer_add_trace(guid, trace_id)
754 @Marshalling.handles(ADD_ADDRESS)
755 @Marshalling.args(int, str, int, Marshalling.pickled_data)
757 def defer_add_address(self, guid, address, netprefix, broadcast):
758 self._testbed.defer_add_address(guid, address, netprefix,
761 @Marshalling.handles(ADD_ROUTE)
762 @Marshalling.args(int, str, int, str, int, str)
764 def defer_add_route(self, guid, destination, netprefix, nexthop,
766 self._testbed.defer_add_route(guid, destination, netprefix, nexthop,
769 @Marshalling.handles(DO_SETUP)
773 self._testbed.do_setup()
775 @Marshalling.handles(DO_CREATE)
779 self._testbed.do_create()
781 @Marshalling.handles(DO_CONNECT_INIT)
784 def do_connect_init(self):
785 self._testbed.do_connect_init()
787 @Marshalling.handles(DO_CONNECT_COMPL)
790 def do_connect_compl(self):
791 self._testbed.do_connect_compl()
793 @Marshalling.handles(DO_CONFIGURE)
796 def do_configure(self):
797 self._testbed.do_configure()
799 @Marshalling.handles(DO_PRECONFIGURE)
802 def do_preconfigure(self):
803 self._testbed.do_preconfigure()
805 @Marshalling.handles(DO_PRESTART)
808 def do_prestart(self):
809 self._testbed.do_prestart()
811 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
812 @Marshalling.args( Marshalling.Decoders.pickled_data )
814 def do_cross_connect_init(self, cross_data):
815 self._testbed.do_cross_connect_init(cross_data)
817 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
818 @Marshalling.args( Marshalling.Decoders.pickled_data )
820 def do_cross_connect_compl(self, cross_data):
821 self._testbed.do_cross_connect_compl(cross_data)
823 @Marshalling.handles(GET)
824 @Marshalling.args(int, Marshalling.base64_data, str)
825 @Marshalling.retval( Marshalling.pickled_data )
826 def get(self, guid, name, time):
827 return self._testbed.get(guid, name, time)
829 @Marshalling.handles(SET)
830 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
832 def set(self, guid, name, value, time):
833 self._testbed.set(guid, name, value, time)
835 @Marshalling.handles(GET_ADDRESS)
836 @Marshalling.args(int, int, Marshalling.base64_data)
837 @Marshalling.retval()
838 def get_address(self, guid, index, attribute):
839 return str(self._testbed.get_address(guid, index, attribute))
841 @Marshalling.handles(GET_ROUTE)
842 @Marshalling.args(int, int, Marshalling.base64_data)
843 @Marshalling.retval()
844 def get_route(self, guid, index, attribute):
845 return str(self._testbed.get_route(guid, index, attribute))
847 @Marshalling.handles(ACTION)
848 @Marshalling.args(str, int, Marshalling.base64_data)
850 def action(self, time, guid, command):
851 self._testbed.action(time, guid, command)
853 @Marshalling.handles(STATUS)
854 @Marshalling.args(Marshalling.nullint)
855 @Marshalling.retval(int)
856 def status(self, guid):
857 return self._testbed.status(guid)
859 @Marshalling.handles(TESTBED_STATUS)
861 @Marshalling.retval(int)
862 def testbed_status(self):
863 return self._testbed.testbed_status()
865 @Marshalling.handles(GET_ATTRIBUTE_LIST)
866 @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
867 @Marshalling.retval( Marshalling.pickled_data )
868 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
869 return self._testbed.get_attribute_list(guid, filter_flags, exclude)
871 @Marshalling.handles(GET_FACTORY_ID)
872 @Marshalling.args(int)
873 @Marshalling.retval()
874 def get_factory_id(self, guid):
875 return self._testbed.get_factory_id(guid)
877 @Marshalling.handles(RECOVER)
881 self._testbed.recover()
884 class ExperimentControllerServer(BaseServer):
885 def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
887 super(ExperimentControllerServer, self).__init__(root_dir, log_level,
888 environment_setup = environment_setup, clean_root = clean_root)
889 self._experiment_xml = experiment_xml
890 self._experiment = None
892 def post_daemonize(self):
893 from nepi.core.execute import ExperimentController
894 self._experiment = ExperimentController(self._experiment_xml,
895 root_dir = self._root_dir)
897 @Marshalling.handles(GUIDS)
899 @Marshalling.retval( Marshalling.pickled_data )
901 return self._experiment.guids
903 @Marshalling.handles(STARTED_TIME)
905 @Marshalling.retval( Marshalling.pickled_data )
906 def started_time(self):
907 return self._experiment.started_time
909 @Marshalling.handles(STOPPED_TIME)
911 @Marshalling.retval( Marshalling.pickled_data )
912 def stopped_time(self):
913 return self._experiment.stopped_time
915 @Marshalling.handles(XML)
917 @Marshalling.retval()
918 def experiment_design_xml(self):
919 return self._experiment.experiment_design_xml
921 @Marshalling.handles(EXEC_XML)
923 @Marshalling.retval()
924 def experiment_execute_xml(self):
925 return self._experiment.experiment_execute_xml
927 @Marshalling.handles(TRACE)
928 @Marshalling.args(int, str, Marshalling.base64_data)
929 @Marshalling.retval()
930 def trace(self, guid, trace_id, attribute):
931 return str(self._experiment.trace(guid, trace_id, attribute))
933 @Marshalling.handles(TRACES_INFO)
935 @Marshalling.retval( Marshalling.pickled_data )
936 def traces_info(self):
937 return self._experiment.traces_info()
939 @Marshalling.handles(FINISHED)
940 @Marshalling.args(int)
941 @Marshalling.retval(Marshalling.bool)
942 def is_finished(self, guid):
943 return self._experiment.is_finished(guid)
945 @Marshalling.handles(STATUS)
946 @Marshalling.args(int)
947 @Marshalling.retval(int)
948 def status(self, guid):
949 return self._experiment.status(guid)
951 @Marshalling.handles(GET)
952 @Marshalling.args(int, Marshalling.base64_data, str)
953 @Marshalling.retval( Marshalling.pickled_data )
954 def get(self, guid, name, time):
955 return self._experiment.get(guid, name, time)
957 @Marshalling.handles(SET)
958 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
960 def set(self, guid, name, value, time):
961 self._experiment.set(guid, name, value, time)
963 @Marshalling.handles(START)
967 self._experiment.start()
969 @Marshalling.handles(STOP)
973 self._experiment.stop()
975 @Marshalling.handles(RECOVER)
979 self._experiment.recover()
981 @Marshalling.handles(SHUTDOWN)
985 self._experiment.shutdown()
987 @Marshalling.handles(GET_TESTBED_ID)
988 @Marshalling.args(int)
989 @Marshalling.retval()
990 def get_testbed_id(self, guid):
991 return self._experiment.get_testbed_id(guid)
993 @Marshalling.handles(GET_FACTORY_ID)
994 @Marshalling.args(int)
995 @Marshalling.retval()
996 def get_factory_id(self, guid):
997 return self._experiment.get_factory_id(guid)
999 @Marshalling.handles(GET_TESTBED_VERSION)
1000 @Marshalling.args(int)
1001 @Marshalling.retval()
1002 def get_testbed_version(self, guid):
1003 return self._experiment.get_testbed_version(guid)
1005 class BaseProxy(object):
1007 _ServerClassModule = "nepi.util.proxy"
1009 def __init__(self, ctor_args, root_dir,
1011 communication = DC.ACCESS_LOCAL,
1018 environment_setup = "",
1019 clean_root = False):
1022 "from %(classmodule)s import %(classname)s;"
1023 "s = %(classname)s%(ctor_args)r;"
1026 classname = self._ServerClass.__name__,
1027 classmodule = self._ServerClassModule,
1028 ctor_args = ctor_args
1030 proc = server.popen_python(python_code,
1031 communication = communication,
1036 ident_key = ident_key,
1038 environment_setup = environment_setup)
1039 # Wait for the server to be ready, otherwise nobody
1040 # will be able to connect to it
1044 helo = proc.stderr.readline()
1045 if helo == 'SERVER_READY.\n':
1049 raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
1050 # connect client to server
1051 self._client = server.Client(root_dir,
1052 communication = communication,
1058 environment_setup = environment_setup)
1061 def _make_message(argtypes, argencoders, command, methname, classname, *args):
1062 if len(argtypes) != len(argencoders):
1063 raise ValueError, "Invalid arguments for _make_message: "\
1064 "in stub method %s of class %s "\
1065 "argtypes and argencoders must match in size" % (
1066 methname, classname )
1067 if len(argtypes) != len(args):
1068 raise ValueError, "Invalid arguments for _make_message: "\
1069 "in stub method %s of class %s "\
1070 "expected %d arguments, got %d" % (
1071 methname, classname,
1072 len(argtypes), len(args))
1075 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
1077 buf.append(fmt % encode(val))
1080 raise TypeError, "Argument %d of stub method %s of class %s "\
1081 "requires a value of type %s, but got %s - nested error: %s" % (
1082 argnum, methname, classname,
1083 getattr(typ, '__name__', typ), type(val),
1084 traceback.format_exc()
1087 return "%d|%s" % (command, '|'.join(buf))
1090 def _parse_reply(rvtype, methname, classname, reply):
1092 raise RuntimeError, "Invalid reply: %r "\
1093 "for stub method %s of class %s" % (
1099 result = reply.split("|")
1100 code = int(result[0])
1104 raise TypeError, "Return value of stub method %s of class %s "\
1105 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
1106 methname, classname,
1107 getattr(rvtype, '__name__', rvtype), reply,
1108 traceback.format_exc()
1111 text = base64.b64decode(text)
1112 raise RuntimeError(text)
1121 raise TypeError, "Return value of stub method %s of class %s "\
1122 "cannot be parsed: must be of type %s - nested error: %s" % (
1123 methname, classname,
1124 getattr(rvtype, '__name__', rvtype),
1125 traceback.format_exc()
1128 raise RuntimeError, "Invalid reply: %r "\
1129 "for stub method %s of class %s - unknown code" % (
1135 def _make_stubs(server_class, template_class):
1137 Returns a dictionary method_name -> method
1142 class SomeProxy(BaseProxy):
1145 locals().update( BaseProxy._make_stubs(
1150 ServerClass is the corresponding Server class, as
1151 specified in the _ServerClass class method (_make_stubs
1152 is static and can't access the method), and TemplateClass
1153 is the ultimate implementation class behind the server,
1154 from which argument names and defaults are taken, to
1155 maintain meaningful interfaces.
1162 func_template_path = os.path.join(
1163 os.path.dirname(__file__),
1165 func_template_file = open(func_template_path, "r")
1166 func_template = func_template_file.read()
1167 func_template_file.close()
1169 for methname in vars(template_class).copy():
1170 if methname.endswith('_deferred'):
1171 # cannot wrap deferreds...
1173 dmethname = methname+'_deferred'
1174 if hasattr(server_class, methname) and not methname.startswith('_'):
1175 template_meth = getattr(template_class, methname)
1176 server_meth = getattr(server_class, methname)
1178 command = getattr(server_meth, '_handles_command', None)
1179 argtypes = getattr(server_meth, '_argtypes', None)
1180 argencoders = getattr(server_meth, '_argencoders', None)
1181 rvtype = getattr(server_meth, '_retval', None)
1184 if hasattr(template_meth, 'fget'):
1186 template_meth = template_meth.fget
1189 if command is not None and argtypes is not None and argencoders is not None:
1190 # We have an interface method...
1191 code = template_meth.func_code
1192 argnames = code.co_varnames[:code.co_argcount]
1193 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1194 + (template_meth.func_defaults or ()) )
1196 func_globals = dict(
1197 BaseProxy = BaseProxy,
1198 argtypes = argtypes,
1199 argencoders = argencoders,
1201 functools = functools,
1205 func_text = func_template % dict(
1207 args = '%s' % (','.join(argnames[1:])),
1208 argdefs = ','.join([
1209 argname if argdef is NONE
1210 else "%s=%r" % (argname, argdef)
1211 for argname, argdef in zip(argnames[1:], argdefaults[1:])
1214 methname = methname,
1215 classname = server_class.__name__
1218 func_text = compile(
1223 exec func_text in func_globals, context
1226 rv[methname] = property(context[methname])
1227 rv[dmethname] = property(context[dmethname])
1229 rv[methname] = context[methname]
1230 rv[dmethname] = context[dmethname]
1232 # inject _deferred into core classes
1233 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1234 def freezename(methname, dmethname):
1235 def dmeth(self, *p, **kw):
1236 return getattr(self, methname)(*p, **kw)
1237 dmeth.__name__ = dmethname
1239 dmeth = freezename(methname, dmethname)
1240 setattr(template_class, dmethname, dmeth)
1244 class ExperimentSuiteProxy(BaseProxy):
1246 _ServerClass = ExperimentSuiteServer
1248 def __init__(self, root_dir, log_level,
1249 xml, repetitions, duration, wait_guids,
1250 communication = DC.ACCESS_LOCAL,
1257 environment_setup = "",
1258 clean_root = False):
1259 super(ExperimentSuiteProxy,self).__init__(
1260 ctor_args = (root_dir, log_level,
1274 root_dir = root_dir,
1275 launch = True, #launch
1276 communication = communication,
1280 ident_key = ident_key,
1283 environment_setup = environment_setup)
1285 locals().update( BaseProxy._make_stubs(
1286 server_class = ExperimentSuiteServer,
1287 template_class = nepi.core.execute.ExperimentSuite,
1290 # Shutdown stops the serverside...
1291 def shutdown(self, _stub = shutdown):
1293 self._client.send_stop()
1294 self._client.read_reply() # wait for it
1297 class TestbedControllerProxy(BaseProxy):
1299 _ServerClass = TestbedControllerServer
1301 def __init__(self, root_dir, log_level,
1303 testbed_version = None,
1305 communication = DC.ACCESS_LOCAL,
1312 environment_setup = "",
1313 clean_root = False):
1314 if launch and (testbed_id == None or testbed_version == None):
1315 raise RuntimeError("To launch a TesbedControllerServer a "
1316 "testbed_id and testbed_version are required")
1317 super(TestbedControllerProxy,self).__init__(
1318 ctor_args = (root_dir, log_level, testbed_id, testbed_version,
1319 environment_setup, clean_root),
1320 root_dir = root_dir,
1322 communication = communication,
1326 ident_key = ident_key,
1329 environment_setup = environment_setup)
1331 locals().update( BaseProxy._make_stubs(
1332 server_class = TestbedControllerServer,
1333 template_class = nepi.core.execute.TestbedController,
1336 # Shutdown stops the serverside...
1337 def shutdown(self, _stub = shutdown):
1339 self._client.send_stop()
1340 self._client.read_reply() # wait for it
1344 class ExperimentControllerProxy(BaseProxy):
1345 _ServerClass = ExperimentControllerServer
1347 def __init__(self, root_dir, log_level,
1348 experiment_xml = None,
1350 communication = DC.ACCESS_LOCAL,
1357 environment_setup = "",
1358 clean_root = False):
1359 super(ExperimentControllerProxy,self).__init__(
1360 ctor_args = (root_dir, log_level, experiment_xml, environment_setup,
1362 root_dir = root_dir,
1364 communication = communication,
1368 ident_key = ident_key,
1371 environment_setup = environment_setup,
1372 clean_root = clean_root)
1374 locals().update( BaseProxy._make_stubs(
1375 server_class = ExperimentControllerServer,
1376 template_class = nepi.core.execute.ExperimentController,
1379 # Shutdown stops the serverside...
1380 def shutdown(self, _stub = shutdown):
1382 self._client.send_stop()
1383 self._client.read_reply() # wait for it