1 # -*- coding: utf-8 -*-
4 import nepi.core.execute
5 import nepi.util.environ
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
22 # PROTOCOL INSTRUCTION MESSAGES
42 DO_CROSS_CONNECT_INIT = 22
52 GET_ATTRIBUTE_LIST = 32
54 DO_CROSS_CONNECT_COMPL = 34
60 GET_TESTBED_VERSION = 40
67 ACCESS_CONFIGURATIONS = 47
68 CURRENT_ACCESS_CONFIG = 48
71 instruction_text = dict({
82 CONFIGURE: "CONFIGURE",
84 CREATE_SET: "CREATE_SET",
85 FACTORY_SET: "FACTORY_SET",
87 CROSS_CONNECT: "CROSS_CONNECT",
88 ADD_TRACE: "ADD_TRACE",
89 ADD_ADDRESS: "ADD_ADDRESS",
90 ADD_ROUTE: "ADD_ROUTE",
92 DO_CREATE: "DO_CREATE",
93 DO_CONNECT_INIT: "DO_CONNECT_INIT",
94 DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
95 DO_CONFIGURE: "DO_CONFIGURE",
96 DO_PRECONFIGURE: "DO_PRECONFIGURE",
97 DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
98 DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
101 GET_ROUTE: "GET_ROUTE",
102 GET_ADDRESS: "GET_ADDRESS",
103 GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
104 GET_FACTORY_ID: "GET_FACTORY_ID",
105 GET_TESTBED_ID: "GET_TESTBED_ID",
106 GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
110 TESTBED_ID: "TESTBED_ID",
111 TESTBED_VERSION: "TESTBED_VERSION",
112 TRACES_INFO: "TRACES_INFO",
113 STARTED_TIME: "STARTED_TIME",
114 STOPPED_TIME: "STOPPED_TIME",
116 ACCESS_CONFIGURATIONS: "ACCESS_CONFIGURATIONS",
117 CURRENT_ACCESS_CONFIG: "CURRENT_ACCESS_CONFIG"
121 def log_msg(server, params):
123 instr = int(params[0])
124 instr_txt = instruction_text[instr]
125 server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
126 instr_txt, ", ".join(map(str, params[1:]))))
128 # don't die for logging
131 def log_reply(server, reply):
133 res = reply.split("|")
135 code_txt = instruction_text[code]
137 txt = base64.b64decode(res[1])
140 server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
143 # don't die for logging
144 server.log_debug("%s - reply: %s" % (server.__class__.__name__,
148 def to_server_log_level(log_level):
151 if log_level == DC.DEBUG_LEVEL
155 def get_access_config_params(access_config):
156 mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
157 launch = not access_config.get_attribute_value(DC.RECOVER)
158 root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
159 log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
160 log_level = to_server_log_level(log_level)
161 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
162 environment_setup = (
163 access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
164 if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
167 user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
168 host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
169 port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
170 agent = access_config.get_attribute_value(DC.USE_AGENT)
171 sudo = access_config.get_attribute_value(DC.USE_SUDO)
172 key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
173 communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
174 clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
175 return (mode, launch, root_dir, log_level, communication, user, host, port,
176 key, agent, sudo, environment_setup, clean_root)
178 class AccessConfiguration(AttributesMap):
179 def __init__(self, params = None):
180 super(AccessConfiguration, self).__init__()
182 from nepi.core.metadata import Metadata
184 for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
185 self.add_attribute(**attr_info)
188 for attr_name, attr_value in params.iteritems():
189 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
190 attr_value = parser(attr_value)
191 self.set_attribute_value(attr_name, attr_value)
193 class TempDir(object):
195 self.path = tempfile.mkdtemp()
198 shutil.rmtree(self.path)
200 class PermDir(object):
201 def __init__(self, path):
204 def create_experiment_suite(xml, access_config, repetitions = None,
205 duration = None, wait_guids = None):
208 (mode, launch, root_dir, log_level, communication, user, host, port,
209 key, agent, sudo, environment_setup, clean_root) \
210 = get_access_config_params(access_config)
212 if not mode or mode == DC.MODE_SINGLE_PROCESS:
213 from nepi.core.execute import ExperimentSuite
217 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
219 exp_suite = ExperimentSuite(xml, access_config, repetitions, duration,
222 # inject reference to temporary dir, so that it gets cleaned
223 # up at destruction time.
224 exp_suite._tempdir = root_dir
226 elif mode == DC.MODE_DAEMON:
227 return ExperimentSuiteProxy(root_dir, log_level,
229 repetitions = repetitions,
231 wait_guids = wait_guids,
232 communication = communication,
239 environment_setup = environment_setup,
240 clean_root = clean_root)
241 raise RuntimeError("Unsupported access configuration '%s'" % mode)
243 def create_experiment_controller(xml, access_config = None):
246 log_level = DC.ERROR_LEVEL
248 (mode, launch, root_dir, log_level, communication, user, host, port,
249 key, agent, sudo, environment_setup, clean_root) \
250 = get_access_config_params(access_config)
252 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
254 if not mode or mode == DC.MODE_SINGLE_PROCESS:
255 from nepi.core.execute import ExperimentController
257 if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
260 root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
261 controller = ExperimentController(xml, root_dir.path)
263 # inject reference to temporary dir, so that it gets cleaned
264 # up at destruction time.
265 controller._tempdir = root_dir
272 elif mode == DC.MODE_DAEMON:
274 return ExperimentControllerProxy(root_dir, log_level,
275 experiment_xml = xml,
276 communication = communication,
284 environment_setup = environment_setup,
285 clean_root = clean_root)
288 # Maybe controller died, recover from persisted testbed information if possible
289 controller = ExperimentControllerProxy(root_dir, log_level,
290 experiment_xml = xml,
291 communication = communication,
299 environment_setup = environment_setup,
300 clean_root = clean_root)
305 raise RuntimeError("Unsupported access configuration '%s'" % mode)
307 def create_testbed_controller(testbed_id, testbed_version, access_config):
310 log_level = DC.ERROR_LEVEL
312 (mode, launch, root_dir, log_level, communication, user, host, port,
313 key, agent, sudo, environment_setup, clean_root) \
314 = get_access_config_params(access_config)
316 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
318 if not mode or mode == DC.MODE_SINGLE_PROCESS:
320 raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
321 return _build_testbed_controller(testbed_id, testbed_version)
322 elif mode == DC.MODE_DAEMON:
323 return TestbedControllerProxy(root_dir, log_level,
324 testbed_id = testbed_id,
325 testbed_version = testbed_version,
326 communication = communication,
334 environment_setup = environment_setup,
335 clean_root = clean_root)
336 raise RuntimeError("Unsupported access configuration '%s'" % mode)
338 def _build_testbed_controller(testbed_id, testbed_version):
339 mod_name = nepi.util.environ.find_testbed(testbed_id)
341 if not mod_name in sys.modules:
345 raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
347 module = sys.modules[mod_name]
348 tc = module.TestbedController()
349 if tc.testbed_version != testbed_version:
350 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
351 (testbed_id, testbed_version, tc.testbed_version))
354 # Just a namespace class
358 def pickled_data(sdata):
359 return cPickle.loads(base64.b64decode(sdata))
362 def base64_data(sdata):
363 return base64.b64decode(sdata)
367 return None if sdata == "None" else int(sdata)
371 return sdata == 'True'
375 def pickled_data(data):
376 return base64.b64encode(cPickle.dumps(data))
379 def base64_data(data):
382 return base64.b64encode(data)
386 return "None" if data is None else int(data)
390 return str(bool(data))
392 # import into Marshalling all the decoders
396 for typname, typ in vars(Decoders).iteritems()
397 if not typname.startswith('_')
400 _TYPE_ENCODERS = dict([
401 # id(type) -> (<encoding_function>, <formatting_string>)
402 (typname, (getattr(Encoders,typname),"%s"))
403 for typname in vars(Decoders)
404 if not typname.startswith('_')
405 and hasattr(Encoders,typname)
409 _TYPE_ENCODERS["float"] = (float, "%r")
410 _TYPE_ENCODERS["int"] = (int, "%d")
411 _TYPE_ENCODERS["long"] = (int, "%d")
412 _TYPE_ENCODERS["str"] = (str, "%s")
413 _TYPE_ENCODERS["unicode"] = (str, "%s")
416 _TYPE_ENCODERS[None] = (str, "%s")
421 Decorator that converts the given function into one that takes
422 a single "params" list, with each parameter marshalled according
423 to the given factory callable (type constructors are accepted).
425 The first argument (self) is left untouched.
429 @Marshalling.args(int,int,str,base64_data)
430 def somefunc(self, someint, otherint, somestr, someb64):
435 def rv(self, params):
436 return f(self, *[ ctor(val)
437 for ctor,val in zip(types, params[1:]) ])
441 # Derive type encoders by looking up types in _TYPE_ENCODERS
442 # make_proxy will use it to encode arguments in command strings
444 TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
446 if typ.__name__ in TYPE_ENCODERS:
447 argencoders.append(TYPE_ENCODERS[typ.__name__])
450 argencoders.append(TYPE_ENCODERS[None])
452 rv._argencoders = tuple(argencoders)
454 rv._retval = getattr(f, '_retval', None)
459 def retval(typ=Decoders.base64_data):
461 Decorator that converts the given function into one that
462 returns a properly encoded return string, given that the undecorated
463 function returns suitable input for the encoding function.
465 The optional typ argument specifies a type.
466 For the default of base64_data, return values should be strings.
467 The return value of the encoding method should be a string always.
471 @Marshalling.args(int,int,str,base64_data)
472 @Marshalling.retval(str)
473 def somefunc(self, someint, otherint, somestr, someb64):
476 encode, fmt = Marshalling._TYPE_ENCODERS.get(
478 Marshalling._TYPE_ENCODERS[None])
483 def rv(self, *p, **kw):
484 data = f(self, *p, **kw)
490 rv._argtypes = getattr(f, '_argtypes', None)
491 rv._argencoders = getattr(f, '_argencoders', None)
498 Decorator that converts the given function into one that
499 always return an encoded empty string.
501 Useful for null-returning functions.
506 def rv(self, *p, **kw):
511 rv._argtypes = getattr(f, '_argtypes', None)
512 rv._argencoders = getattr(f, '_argencoders', None)
516 def handles(whichcommand):
518 Associates the method with a given command code for servers.
519 It should always be the topmost decorator.
522 f._handles_command = whichcommand
526 class BaseServer(server.Server):
527 def reply_action(self, msg):
529 result = base64.b64encode("Invalid command line")
530 reply = "%d|%s" % (ERROR, result)
532 params = msg.split("|")
533 instruction = int(params[0])
534 log_msg(self, params)
536 for mname,meth in vars(self.__class__).iteritems():
537 if not mname.startswith('_'):
538 cmd = getattr(meth, '_handles_command', None)
539 if cmd == instruction:
540 meth = getattr(self, mname)
544 error = "Invalid instruction %s" % instruction
545 self.log_error(error)
546 result = base64.b64encode(error)
547 reply = "%d|%s" % (ERROR, result)
549 error = self.log_error()
550 result = base64.b64encode(error)
551 reply = "%d|%s" % (ERROR, result)
552 log_reply(self, reply)
555 class ExperimentSuiteServer(BaseServer):
556 def __init__(self, root_dir, log_level,
557 xml, repetitions, duration, wait_guids,
558 communication = DC.ACCESS_LOCAL,
565 environment_setup = "",
567 super(ExperimentSuiteServer, self).__init__(root_dir, log_level,
568 environment_setup = environment_setup, clean_root = clean_root)
569 access_config = AccessConfiguration()
570 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
571 access_config.set_attribute_value(DC.LOG_LEVEL, log_level)
572 access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environment_setup)
574 access_config.set_attribute_value(DC.DEPLOYMENT_USER, user)
576 access_config.set_attribute_value(DC.DEPLOYMENT_HOST, host)
578 access_config.set_attribute_value(DC.DEPLOYMENT_PORT, port)
580 access_config.set_attribute_value(DC.USE_AGENT, agent)
582 acess_config.set_attribute_value(DC.USE_SUDO, sudo)
584 access_config.set_attribute_value(DC.DEPLOYMENT_KEY, ident_key)
586 access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, communication)
588 access_config.set_attribute_value(DC.CLEAN_ROOT, clean_root)
589 self._experiment_xml = xml
590 self._duration = duration
591 self._repetitions = repetitions
592 self._wait_guids = wait_guids
593 self._access_config = access_config
594 self._experiment_suite = None
596 def post_daemonize(self):
597 from nepi.core.execute import ExperimentSuite
598 self._experiment_suite = ExperimentSuite(
599 self._experiment_xml, self._access_config,
600 self._repetitions, self._duration, self._wait_guids)
602 @Marshalling.handles(CURRENT)
604 @Marshalling.retval(int)
606 return self._experiment_suite.current()
608 @Marshalling.handles(STATUS)
610 @Marshalling.retval(int)
612 return self._experiment_suite.status()
614 @Marshalling.handles(FINISHED)
616 @Marshalling.retval(Marshalling.bool)
617 def is_finished(self):
618 return self._experiment_suite.is_finished()
620 @Marshalling.handles(ACCESS_CONFIGURATIONS)
622 @Marshalling.retval( Marshalling.pickled_data )
623 def get_access_configurations(self):
624 return self._experiment_suite.get_access_configurations()
626 @Marshalling.handles(START)
630 self._experiment_suite.start()
632 @Marshalling.handles(SHUTDOWN)
636 self._experiment_suite.shutdown()
638 @Marshalling.handles(CURRENT_ACCESS_CONFIG)
640 @Marshalling.retval( Marshalling.pickled_data )
641 def get_current_access_config(self):
642 return self._experiment_suite.get_current_access_config()
644 class TestbedControllerServer(BaseServer):
645 def __init__(self, root_dir, log_level, testbed_id, testbed_version,
646 environment_setup, clean_root):
647 super(TestbedControllerServer, self).__init__(root_dir, log_level,
648 environment_setup = environment_setup, clean_root = clean_root)
649 self._testbed_id = testbed_id
650 self._testbed_version = testbed_version
653 def post_daemonize(self):
654 self._testbed = _build_testbed_controller(self._testbed_id,
655 self._testbed_version)
657 @Marshalling.handles(GUIDS)
659 @Marshalling.retval( Marshalling.pickled_data )
661 return self._testbed.guids
663 @Marshalling.handles(TESTBED_ID)
665 @Marshalling.retval()
666 def testbed_id(self):
667 return str(self._testbed.testbed_id)
669 @Marshalling.handles(TESTBED_VERSION)
671 @Marshalling.retval()
672 def testbed_version(self):
673 return str(self._testbed.testbed_version)
675 @Marshalling.handles(CREATE)
676 @Marshalling.args(int, str)
678 def defer_create(self, guid, factory_id):
679 self._testbed.defer_create(guid, factory_id)
681 @Marshalling.handles(TRACE)
682 @Marshalling.args(int, str, Marshalling.base64_data)
683 @Marshalling.retval()
684 def trace(self, guid, trace_id, attribute):
685 return self._testbed.trace(guid, trace_id, attribute)
687 @Marshalling.handles(TRACES_INFO)
689 @Marshalling.retval( Marshalling.pickled_data )
690 def traces_info(self):
691 return self._testbed.traces_info()
693 @Marshalling.handles(START)
697 self._testbed.start()
699 @Marshalling.handles(STOP)
705 @Marshalling.handles(SHUTDOWN)
709 self._testbed.shutdown()
711 @Marshalling.handles(CONFIGURE)
712 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
714 def defer_configure(self, name, value):
715 self._testbed.defer_configure(name, value)
717 @Marshalling.handles(CREATE_SET)
718 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
720 def defer_create_set(self, guid, name, value):
721 self._testbed.defer_create_set(guid, name, value)
723 @Marshalling.handles(FACTORY_SET)
724 @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
726 def defer_factory_set(self, name, value):
727 self._testbed.defer_factory_set(name, value)
729 @Marshalling.handles(CONNECT)
730 @Marshalling.args(int, str, int, str)
732 def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
733 self._testbed.defer_connect(guid1, connector_type_name1, guid2,
734 connector_type_name2)
736 @Marshalling.handles(CROSS_CONNECT)
737 @Marshalling.args(int, str, int, int, str, str, str)
739 def defer_cross_connect(self,
740 guid, connector_type_name,
741 cross_guid, cross_testbed_guid,
742 cross_testbed_id, cross_factory_id,
743 cross_connector_type_name):
744 self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
745 cross_testbed_guid, cross_testbed_id, cross_factory_id,
746 cross_connector_type_name)
748 @Marshalling.handles(ADD_TRACE)
749 @Marshalling.args(int, str)
751 def defer_add_trace(self, guid, trace_id):
752 self._testbed.defer_add_trace(guid, trace_id)
754 @Marshalling.handles(ADD_ADDRESS)
755 @Marshalling.args(int, str, int, Marshalling.pickled_data)
757 def defer_add_address(self, guid, address, netprefix, broadcast):
758 self._testbed.defer_add_address(guid, address, netprefix,
761 @Marshalling.handles(ADD_ROUTE)
762 @Marshalling.args(int, str, int, str, int)
764 def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
765 self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
767 @Marshalling.handles(DO_SETUP)
771 self._testbed.do_setup()
773 @Marshalling.handles(DO_CREATE)
777 self._testbed.do_create()
779 @Marshalling.handles(DO_CONNECT_INIT)
782 def do_connect_init(self):
783 self._testbed.do_connect_init()
785 @Marshalling.handles(DO_CONNECT_COMPL)
788 def do_connect_compl(self):
789 self._testbed.do_connect_compl()
791 @Marshalling.handles(DO_CONFIGURE)
794 def do_configure(self):
795 self._testbed.do_configure()
797 @Marshalling.handles(DO_PRECONFIGURE)
800 def do_preconfigure(self):
801 self._testbed.do_preconfigure()
803 @Marshalling.handles(DO_PRESTART)
806 def do_prestart(self):
807 self._testbed.do_prestart()
809 @Marshalling.handles(DO_CROSS_CONNECT_INIT)
810 @Marshalling.args( Marshalling.Decoders.pickled_data )
812 def do_cross_connect_init(self, cross_data):
813 self._testbed.do_cross_connect_init(cross_data)
815 @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
816 @Marshalling.args( Marshalling.Decoders.pickled_data )
818 def do_cross_connect_compl(self, cross_data):
819 self._testbed.do_cross_connect_compl(cross_data)
821 @Marshalling.handles(GET)
822 @Marshalling.args(int, Marshalling.base64_data, str)
823 @Marshalling.retval( Marshalling.pickled_data )
824 def get(self, guid, name, time):
825 return self._testbed.get(guid, name, time)
827 @Marshalling.handles(SET)
828 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
830 def set(self, guid, name, value, time):
831 self._testbed.set(guid, name, value, time)
833 @Marshalling.handles(GET_ADDRESS)
834 @Marshalling.args(int, int, Marshalling.base64_data)
835 @Marshalling.retval()
836 def get_address(self, guid, index, attribute):
837 return str(self._testbed.get_address(guid, index, attribute))
839 @Marshalling.handles(GET_ROUTE)
840 @Marshalling.args(int, int, Marshalling.base64_data)
841 @Marshalling.retval()
842 def get_route(self, guid, index, attribute):
843 return str(self._testbed.get_route(guid, index, attribute))
845 @Marshalling.handles(ACTION)
846 @Marshalling.args(str, int, Marshalling.base64_data)
848 def action(self, time, guid, command):
849 self._testbed.action(time, guid, command)
851 @Marshalling.handles(STATUS)
852 @Marshalling.args(Marshalling.nullint)
853 @Marshalling.retval(int)
854 def status(self, guid):
855 return self._testbed.status(guid)
857 @Marshalling.handles(TESTBED_STATUS)
859 @Marshalling.retval(int)
860 def testbed_status(self):
861 return self._testbed.testbed_status()
863 @Marshalling.handles(GET_ATTRIBUTE_LIST)
864 @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
865 @Marshalling.retval( Marshalling.pickled_data )
866 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
867 return self._testbed.get_attribute_list(guid, filter_flags, exclude)
869 @Marshalling.handles(GET_FACTORY_ID)
870 @Marshalling.args(int)
871 @Marshalling.retval()
872 def get_factory_id(self, guid):
873 return self._testbed.get_factory_id(guid)
875 @Marshalling.handles(RECOVER)
879 self._testbed.recover()
882 class ExperimentControllerServer(BaseServer):
883 def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
885 super(ExperimentControllerServer, self).__init__(root_dir, log_level,
886 environment_setup = environment_setup, clean_root = clean_root)
887 self._experiment_xml = experiment_xml
888 self._experiment = None
890 def post_daemonize(self):
891 from nepi.core.execute import ExperimentController
892 self._experiment = ExperimentController(self._experiment_xml,
893 root_dir = self._root_dir)
895 @Marshalling.handles(GUIDS)
897 @Marshalling.retval( Marshalling.pickled_data )
899 return self._experiment.guids
901 @Marshalling.handles(STARTED_TIME)
903 @Marshalling.retval( Marshalling.pickled_data )
904 def started_time(self):
905 return self._experiment.started_time
907 @Marshalling.handles(STOPPED_TIME)
909 @Marshalling.retval( Marshalling.pickled_data )
910 def stopped_time(self):
911 return self._experiment.stopped_time
913 @Marshalling.handles(XML)
915 @Marshalling.retval()
916 def experiment_design_xml(self):
917 return self._experiment.experiment_design_xml
919 @Marshalling.handles(EXEC_XML)
921 @Marshalling.retval()
922 def experiment_execute_xml(self):
923 return self._experiment.experiment_execute_xml
925 @Marshalling.handles(TRACE)
926 @Marshalling.args(int, str, Marshalling.base64_data)
927 @Marshalling.retval()
928 def trace(self, guid, trace_id, attribute):
929 return str(self._experiment.trace(guid, trace_id, attribute))
931 @Marshalling.handles(TRACES_INFO)
933 @Marshalling.retval( Marshalling.pickled_data )
934 def traces_info(self):
935 return self._experiment.traces_info()
937 @Marshalling.handles(FINISHED)
938 @Marshalling.args(int)
939 @Marshalling.retval(Marshalling.bool)
940 def is_finished(self, guid):
941 return self._experiment.is_finished(guid)
943 @Marshalling.handles(STATUS)
944 @Marshalling.args(int)
945 @Marshalling.retval(int)
946 def status(self, guid):
947 return self._experiment.status(guid)
949 @Marshalling.handles(GET)
950 @Marshalling.args(int, Marshalling.base64_data, str)
951 @Marshalling.retval( Marshalling.pickled_data )
952 def get(self, guid, name, time):
953 return self._experiment.get(guid, name, time)
955 @Marshalling.handles(SET)
956 @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
958 def set(self, guid, name, value, time):
959 self._experiment.set(guid, name, value, time)
961 @Marshalling.handles(START)
965 self._experiment.start()
967 @Marshalling.handles(STOP)
971 self._experiment.stop()
973 @Marshalling.handles(RECOVER)
977 self._experiment.recover()
979 @Marshalling.handles(SHUTDOWN)
983 self._experiment.shutdown()
985 @Marshalling.handles(GET_TESTBED_ID)
986 @Marshalling.args(int)
987 @Marshalling.retval()
988 def get_testbed_id(self, guid):
989 return self._experiment.get_testbed_id(guid)
991 @Marshalling.handles(GET_FACTORY_ID)
992 @Marshalling.args(int)
993 @Marshalling.retval()
994 def get_factory_id(self, guid):
995 return self._experiment.get_factory_id(guid)
997 @Marshalling.handles(GET_TESTBED_VERSION)
998 @Marshalling.args(int)
999 @Marshalling.retval()
1000 def get_testbed_version(self, guid):
1001 return self._experiment.get_testbed_version(guid)
1003 class BaseProxy(object):
1005 _ServerClassModule = "nepi.util.proxy"
1007 def __init__(self, ctor_args, root_dir,
1009 communication = DC.ACCESS_LOCAL,
1016 environment_setup = "",
1017 clean_root = False):
1020 "from %(classmodule)s import %(classname)s;"
1021 "s = %(classname)s%(ctor_args)r;"
1024 classname = self._ServerClass.__name__,
1025 classmodule = self._ServerClassModule,
1026 ctor_args = ctor_args
1028 proc = server.popen_python(python_code,
1029 communication = communication,
1034 ident_key = ident_key,
1036 environment_setup = environment_setup)
1037 # Wait for the server to be ready, otherwise nobody
1038 # will be able to connect to it
1042 helo = proc.stderr.readline()
1043 if helo == 'SERVER_READY.\n':
1047 raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
1048 # connect client to server
1049 self._client = server.Client(root_dir,
1050 communication = communication,
1056 environment_setup = environment_setup)
1059 def _make_message(argtypes, argencoders, command, methname, classname, *args):
1060 if len(argtypes) != len(argencoders):
1061 raise ValueError, "Invalid arguments for _make_message: "\
1062 "in stub method %s of class %s "\
1063 "argtypes and argencoders must match in size" % (
1064 methname, classname )
1065 if len(argtypes) != len(args):
1066 raise ValueError, "Invalid arguments for _make_message: "\
1067 "in stub method %s of class %s "\
1068 "expected %d arguments, got %d" % (
1069 methname, classname,
1070 len(argtypes), len(args))
1073 for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
1075 buf.append(fmt % encode(val))
1078 raise TypeError, "Argument %d of stub method %s of class %s "\
1079 "requires a value of type %s, but got %s - nested error: %s" % (
1080 argnum, methname, classname,
1081 getattr(typ, '__name__', typ), type(val),
1082 traceback.format_exc()
1085 return "%d|%s" % (command, '|'.join(buf))
1088 def _parse_reply(rvtype, methname, classname, reply):
1090 raise RuntimeError, "Invalid reply: %r "\
1091 "for stub method %s of class %s" % (
1097 result = reply.split("|")
1098 code = int(result[0])
1102 raise TypeError, "Return value of stub method %s of class %s "\
1103 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
1104 methname, classname,
1105 getattr(rvtype, '__name__', rvtype), reply,
1106 traceback.format_exc()
1109 text = base64.b64decode(text)
1110 raise RuntimeError(text)
1119 raise TypeError, "Return value of stub method %s of class %s "\
1120 "cannot be parsed: must be of type %s - nested error: %s" % (
1121 methname, classname,
1122 getattr(rvtype, '__name__', rvtype),
1123 traceback.format_exc()
1126 raise RuntimeError, "Invalid reply: %r "\
1127 "for stub method %s of class %s - unknown code" % (
1133 def _make_stubs(server_class, template_class):
1135 Returns a dictionary method_name -> method
1140 class SomeProxy(BaseProxy):
1143 locals().update( BaseProxy._make_stubs(
1148 ServerClass is the corresponding Server class, as
1149 specified in the _ServerClass class method (_make_stubs
1150 is static and can't access the method), and TemplateClass
1151 is the ultimate implementation class behind the server,
1152 from which argument names and defaults are taken, to
1153 maintain meaningful interfaces.
1160 func_template_path = os.path.join(
1161 os.path.dirname(__file__),
1163 func_template_file = open(func_template_path, "r")
1164 func_template = func_template_file.read()
1165 func_template_file.close()
1167 for methname in vars(template_class).copy():
1168 if methname.endswith('_deferred'):
1169 # cannot wrap deferreds...
1171 dmethname = methname+'_deferred'
1172 if hasattr(server_class, methname) and not methname.startswith('_'):
1173 template_meth = getattr(template_class, methname)
1174 server_meth = getattr(server_class, methname)
1176 command = getattr(server_meth, '_handles_command', None)
1177 argtypes = getattr(server_meth, '_argtypes', None)
1178 argencoders = getattr(server_meth, '_argencoders', None)
1179 rvtype = getattr(server_meth, '_retval', None)
1182 if hasattr(template_meth, 'fget'):
1184 template_meth = template_meth.fget
1187 if command is not None and argtypes is not None and argencoders is not None:
1188 # We have an interface method...
1189 code = template_meth.func_code
1190 argnames = code.co_varnames[:code.co_argcount]
1191 argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1192 + (template_meth.func_defaults or ()) )
1194 func_globals = dict(
1195 BaseProxy = BaseProxy,
1196 argtypes = argtypes,
1197 argencoders = argencoders,
1199 functools = functools,
1203 func_text = func_template % dict(
1205 args = '%s' % (','.join(argnames[1:])),
1206 argdefs = ','.join([
1207 argname if argdef is NONE
1208 else "%s=%r" % (argname, argdef)
1209 for argname, argdef in zip(argnames[1:], argdefaults[1:])
1212 methname = methname,
1213 classname = server_class.__name__
1216 func_text = compile(
1221 exec func_text in func_globals, context
1224 rv[methname] = property(context[methname])
1225 rv[dmethname] = property(context[dmethname])
1227 rv[methname] = context[methname]
1228 rv[dmethname] = context[dmethname]
1230 # inject _deferred into core classes
1231 if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1232 def freezename(methname, dmethname):
1233 def dmeth(self, *p, **kw):
1234 return getattr(self, methname)(*p, **kw)
1235 dmeth.__name__ = dmethname
1237 dmeth = freezename(methname, dmethname)
1238 setattr(template_class, dmethname, dmeth)
1242 class ExperimentSuiteProxy(BaseProxy):
1244 _ServerClass = ExperimentSuiteServer
1246 def __init__(self, root_dir, log_level,
1247 xml, repetitions, duration, wait_guids,
1248 communication = DC.ACCESS_LOCAL,
1255 environment_setup = "",
1256 clean_root = False):
1257 super(ExperimentSuiteProxy,self).__init__(
1258 ctor_args = (root_dir, log_level,
1272 root_dir = root_dir,
1273 launch = True, #launch
1274 communication = communication,
1278 ident_key = ident_key,
1281 environment_setup = environment_setup)
1283 locals().update( BaseProxy._make_stubs(
1284 server_class = ExperimentSuiteServer,
1285 template_class = nepi.core.execute.ExperimentSuite,
1288 # Shutdown stops the serverside...
1289 def shutdown(self, _stub = shutdown):
1291 self._client.send_stop()
1292 self._client.read_reply() # wait for it
1295 class TestbedControllerProxy(BaseProxy):
1297 _ServerClass = TestbedControllerServer
1299 def __init__(self, root_dir, log_level,
1301 testbed_version = None,
1303 communication = DC.ACCESS_LOCAL,
1310 environment_setup = "",
1311 clean_root = False):
1312 if launch and (testbed_id == None or testbed_version == None):
1313 raise RuntimeError("To launch a TesbedControllerServer a "
1314 "testbed_id and testbed_version are required")
1315 super(TestbedControllerProxy,self).__init__(
1316 ctor_args = (root_dir, log_level, testbed_id, testbed_version,
1317 environment_setup, clean_root),
1318 root_dir = root_dir,
1320 communication = communication,
1324 ident_key = ident_key,
1327 environment_setup = environment_setup)
1329 locals().update( BaseProxy._make_stubs(
1330 server_class = TestbedControllerServer,
1331 template_class = nepi.core.execute.TestbedController,
1334 # Shutdown stops the serverside...
1335 def shutdown(self, _stub = shutdown):
1337 self._client.send_stop()
1338 self._client.read_reply() # wait for it
1342 class ExperimentControllerProxy(BaseProxy):
1343 _ServerClass = ExperimentControllerServer
1345 def __init__(self, root_dir, log_level,
1346 experiment_xml = None,
1348 communication = DC.ACCESS_LOCAL,
1355 environment_setup = "",
1356 clean_root = False):
1357 super(ExperimentControllerProxy,self).__init__(
1358 ctor_args = (root_dir, log_level, experiment_xml, environment_setup,
1360 root_dir = root_dir,
1362 communication = communication,
1366 ident_key = ident_key,
1369 environment_setup = environment_setup,
1370 clean_root = clean_root)
1372 locals().update( BaseProxy._make_stubs(
1373 server_class = ExperimentControllerServer,
1374 template_class = nepi.core.execute.ExperimentController,
1377 # Shutdown stops the serverside...
1378 def shutdown(self, _stub = shutdown):
1380 self._client.send_stop()
1381 self._client.read_reply() # wait for it