6c4db3235f063a4e34c85bcbe999dbf45fa1b843
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
9 import getpass
10 import cPickle
11 import sys
12 import time
13 import tempfile
14 import shutil
15 import functools
16
# PROTOCOL REPLIES
# First "|"-separated field of every reply message.
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# First "|"-separated field of every request message. Both ends of the
# connection exchange these numbers, so they must stay in sync.
# (The value 3 is not assigned.)
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
DO_PRESTART = 37
GET_FACTORY_ID = 38
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40
TRACES_INFO = 41
EXEC_XML = 42

# Printable name of every reply/instruction code; used by the logging
# helpers. Every code defined above must have an entry here, otherwise
# messages carrying that code cannot be logged by name.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    EXEC_XML: "EXEC_XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_PRESTART: "DO_PRESTART",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    GET_FACTORY_ID: "GET_FACTORY_ID",
    GET_TESTBED_ID: "GET_TESTBED_ID",
    GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    TRACES_INFO: "TRACES_INFO",
}
106
def log_msg(server, params):
    """
    Log an incoming protocol message at debug level.

    server -- object providing log_debug(text); its class name prefixes
        the log entry.
    params -- the message already split on "|": params[0] is the
        instruction code, the remaining items are its arguments, logged
        exactly as received.

    Logging is best-effort: any ordinary error (malformed code, unknown
    instruction, failing log sink) is swallowed so that logging never
    breaks message handling. Only exceptions deriving from Exception are
    swallowed, so KeyboardInterrupt and SystemExit still propagate.
    """
    try:
        instr = int(params[0])
        instr_txt = instruction_text[instr]
        arguments = ", ".join(map(str, params[1:]))
        server.log_debug("%s - msg: %s [%s]" % (
            server.__class__.__name__, instr_txt, arguments))
    except Exception:
        # don't die for logging
        pass
116
def log_reply(server, reply):
    """
    Log an outgoing protocol reply at debug level.

    server -- object providing log_debug(text); its class name prefixes
        the log entry.
    reply -- the raw reply string, normally "<code>|<payload>".

    When the reply can be parsed, the code is logged by name and the
    payload is shown base64-decoded if it decodes, raw otherwise. When it
    cannot be parsed, the raw reply is logged instead.

    Logging is best-effort: a failure of the log sink itself is swallowed
    too, so this function never raises an ordinary exception.
    """
    try:
        try:
            fields = reply.split("|")
            code = int(fields[0])
            code_txt = instruction_text[code]
            try:
                txt = base64.b64decode(fields[1])
            except Exception:
                # payload is not base64 (e.g. a plain integer): show it raw
                txt = fields[1]
            entry = "%s - reply: %s %s" % (server.__class__.__name__,
                    code_txt, txt)
        except Exception:
            # unparseable reply: fall back to logging it verbatim
            entry = "%s - reply: %s" % (server.__class__.__name__,
                    reply)
        server.log_debug(entry)
    except Exception:
        # don't die for logging
        pass
133
def to_server_log_level(log_level):
    """
    Translate a deployment-configuration log level into the log level
    understood by the server module.

    DC.DEBUG_LEVEL maps to server.DEBUG_LEVEL; every other value maps to
    server.ERROR_LEVEL.
    """
    if log_level == DC.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
140
def get_access_config_params(access_config):
    """
    Extract from an access configuration the values needed to build a
    controller proxy.

    Returns the tuple
        (root_dir, log_level, user, host, port, key, agent, environment_setup)
    where log_level is already converted with to_server_log_level().

    user, host, port, key and agent are only read when the deployment
    communication is DC.ACCESS_SSH; otherwise they are all None.
    environment_setup is None when the configuration has no such attribute.
    """
    value_of = access_config.get_attribute_value

    root_dir = value_of(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(value_of(DC.LOG_LEVEL))
    communication = value_of(DC.DEPLOYMENT_COMMUNICATION)

    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = value_of(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
    else:
        environment_setup = None

    if communication == DC.ACCESS_SSH:
        user = value_of(DC.DEPLOYMENT_USER)
        host = value_of(DC.DEPLOYMENT_HOST)
        port = value_of(DC.DEPLOYMENT_PORT)
        agent = value_of(DC.USE_AGENT)
        key = value_of(DC.DEPLOYMENT_KEY)
    else:
        user = host = port = agent = key = None

    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
159
class AccessConfiguration(AttributesMap):
    """
    Attribute map pre-populated with every deployment attribute declared
    in Metadata.DEPLOYMENT_ATTRIBUTES.

    params -- optional mapping attribute name -> raw value. Each value is
        converted with the parser registered in Attribute.type_parsers for
        the attribute's type before being stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # Imported at call time rather than at module level
        # (presumably to avoid an import cycle -- TODO confirm).
        from nepi.core.metadata import Metadata

        for _unused_name, attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.iteritems():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
174
class TempDir(object):
    """
    Owner of a freshly created temporary directory.

    path -- absolute path of the directory (created by tempfile.mkdtemp()).

    The directory and its contents are removed when the object is
    destroyed. Removal is best-effort: a directory that is already gone,
    or that cannot be fully removed, never makes the finalizer raise.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # __del__ also runs when mkdtemp() failed and 'path' was never set
        path = getattr(self, 'path', None)
        if path is not None:
            # finalizers must not raise: ignore missing or busy directories
            shutil.rmtree(path, ignore_errors = True)
181
class PermDir(object):
    """
    Holder for a caller-supplied directory.

    Exposes the same 'path' attribute as TempDir, but never creates nor
    removes anything on disk.
    """
    def __init__(self, path):
        self.path = path
185
def create_experiment_controller(xml, access_config = None):
    """
    Build an experiment controller for the given experiment description.

    xml -- experiment description handed to the controller.
    access_config -- optional access configuration. Its DC.DEPLOYMENT_MODE
        attribute selects how the controller is instantiated and its
        DC.RECOVER attribute requests attaching to an already running
        controller instead of launching a new one.

    Without access_config, or in DC.MODE_SINGLE_PROCESS, an in-process
    ExperimentController is returned. Its root directory is
    DC.ROOT_DIRECTORY when configured, a temporary directory otherwise.
    In DC.MODE_DAEMON an ExperimentControllerProxy is returned.

    Raises ValueError when recovery is requested in single-process mode,
    and RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))

        from nepi.core.execute import ExperimentController

        if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        return controller
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
                agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
216
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller for the given testbed id and version.

    testbed_id, testbed_version -- identify the testbed implementation.
    access_config -- access configuration, may be None. Its
        DC.DEPLOYMENT_MODE attribute selects how the controller is
        instantiated and its DC.RECOVER attribute requests attaching to an
        already running controller instead of launching a new one.

    Without access_config, or in DC.MODE_SINGLE_PROCESS, the controller is
    built in-process with _build_testbed_controller(). In DC.MODE_DAEMON a
    TestbedControllerProxy is returned.

    Raises ValueError when recovery is requested in single-process mode,
    and RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port, ident_key = key,
                user = user, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
234
def _build_testbed_controller(testbed_id, testbed_version):
    """
    Instantiate the TestbedController of module nepi.testbeds.<testbed_id>
    (the id is lowercased), importing the module first if necessary.

    Raises RuntimeError when the controller's testbed_version differs
    from the requested testbed_version.
    """
    mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
    try:
        module = sys.modules[mod_name]
    except KeyError:
        # not loaded yet: import it, then fetch the leaf module
        __import__(mod_name)
        module = sys.modules[mod_name]

    controller = module.TestbedController()
    if controller.testbed_version != testbed_version:
        details = (testbed_id, testbed_version, controller.testbed_version)
        raise RuntimeError(
            "Bad testbed version on testbed %s. Asked for %s, got %s" % details)
    return controller
245
246 # Just a namespace class
class Marshalling:
    """
    Namespace holding the wire-format converters and the decorators used
    to declare how a server method is exposed as a protocol command.

    Decoders -- wire string -> Python value. They double as "types": they
        are re-exported at Marshalling level (Marshalling.pickled_data, ...)
        and passed to args() / retval().
    Encoders -- Python value -> wire string, one per decoder name.
    args, retval, retvoid, handles -- method decorators, see each docstring.
    """
    class Decoders:
        """Converters from the wire representation to Python values."""
        @staticmethod
        def pickled_data(sdata):
            # NOTE(review): unpickling runs code chosen by whoever produced
            # the payload; this is only acceptable while the peer is trusted.
            return cPickle.loads(base64.b64decode(sdata))

        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)

        @staticmethod
        def nullint(sdata):
            # the literal "None" stands for a missing integer
            return None if sdata == "None" else int(sdata)

        @staticmethod
        def bool(sdata):
            return sdata == 'True'

    class Encoders:
        """
        Converters from Python values to the wire representation.
        Every encoder returns a string.
        """
        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))

        @staticmethod
        def base64_data(data):
            return base64.b64encode(data)

        @staticmethod
        def nullint(data):
            # str(): encoders must return strings (the result on the wire
            # is the same, since it is formatted with "%s")
            return "None" if data is None else str(int(data))

        @staticmethod
        def bool(data):
            return str(bool(data))

    # import into Marshalling all the decoders
    # they act as types
    for _name, _codec in list(vars(Decoders).items()):
        if not _name.startswith('_'):
            # at class scope, locals() is the namespace of the class being built
            locals()[_name] = _codec

    # type name -> (<encoding_function>, <formatting_string>)
    _TYPE_ENCODERS = {}
    for _name in vars(Decoders):
        if not _name.startswith('_') and hasattr(Encoders, _name):
            _TYPE_ENCODERS[_name] = (getattr(Encoders, _name), "%s")

    # do not leave the loop variables behind as class attributes
    del _name, _codec

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")

    # Generic encoder
    _TYPE_ENCODERS[None] = (str, "%s")

    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).

        The first argument (self) is left untouched. params[0] (the
        command code) is skipped; params[1:] are paired with the given
        types positionally, extra items on either side being ignored.

        The decorated function also carries:
            _argtypes    -- the given types
            _argencoders -- matching (encoder, format) pairs taken from
                            _TYPE_ENCODERS by type name, falling back to
                            the generic encoder for unknown names
            _retval      -- whatever the wrapped function declared, or None

        eg:

        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])

            rv._argtypes = types

            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            encoders = Marshalling._TYPE_ENCODERS
            generic = encoders[None]
            rv._argencoders = tuple(
                encoders.get(typ.__name__, generic) for typ in types)

            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that
        returns a properly encoded return string, given that the undecorated
        function returns suitable input for the encoding function.

        The reply has the form "<OK>|<encoded value>".

        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.

        eg:

        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(str)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            typ.__name__,
            Marshalling._TYPE_ENCODERS[None])
        fmt = "%d|"+fmt

        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor

    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that
        always return an encoded empty string ("<OK>|").

        Useful for null-returning functions.
        """
        OKRV = "%d|" % (OK,)

        @functools.wraps(f)
        def rv(self, *p, **kw):
            f(self, *p, **kw)
            return OKRV

        rv._retval = None
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv

    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers
        (stored in its _handles_command attribute).
        It should always be the topmost decorator.
        """
        def decor(f):
            f._handles_command = whichcommand
            return f
        return decor
415
class BaseServer(server.Server):
    """
    Server that dispatches "|"-separated protocol messages to the methods
    of the concrete subclass tagged with Marshalling.handles().
    """
    def reply_action(self, msg):
        """
        Handle one request message and return the reply string.

        msg -- "<instruction code>|<arg>|<arg>...".

        The handler is looked up among the public methods defined directly
        on the instance's class (inherited methods are not considered) by
        comparing their _handles_command attribute with the instruction
        code. The handler receives the whole split message and returns the
        reply.

        Never raises for a bad request: an empty message, a non-numeric or
        unknown instruction code and any failure inside the handler all
        produce an "<ERROR>|<base64 text>" reply.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            log_msg(self, params)
            try:
                # parsed inside the try so that a malformed code is
                # answered with an ERROR reply instead of raising
                instruction = int(params[0])
                for mname, meth in vars(self.__class__).items():
                    if mname.startswith('_'):
                        continue
                    if getattr(meth, '_handles_command', None) == instruction:
                        reply = getattr(self, mname)(params)
                        break
                else:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except:
                # Deliberately catches everything: whatever goes wrong, the
                # client must get an ERROR reply.
                # log_error() is expected to return the text of the error
                # being handled -- TODO confirm against server.Server.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
444
class TestbedControllerServer(BaseServer):
    """
    Server exposing a testbed controller through the proxy protocol.

    Every handler below is a thin delegation to the wrapped testbed
    controller (self._testbed). Its decorator stack is read bottom-up:
      - Marshalling.retval(type) / Marshalling.retvoid: how the result is
        encoded into the reply;
      - Marshalling.args(types...): how the message arguments are decoded,
        in order;
      - Marshalling.handles(CODE): the instruction code served by the method.
    """
    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # created by post_daemonize()
        self._testbed = None

    def post_daemonize(self):
        # Build the testbed controller for the id/version given at
        # construction time (raises RuntimeError on version mismatch).
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    # -- identification ------------------------------------------------

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._testbed.guids

    @Marshalling.handles(TESTBED_ID)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_id(self):
        return str(self._testbed.testbed_id)

    @Marshalling.handles(TESTBED_VERSION)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_version(self):
        return str(self._testbed.testbed_version)

    @Marshalling.handles(CREATE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_create(self, guid, factory_id):
        self._testbed.defer_create(guid, factory_id)

    # -- traces --------------------------------------------------------

    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        # NOTE(review): unlike get_address/get_route below, the result is
        # not passed through str() before being base64-encoded -- confirm
        # the testbed always returns a string here.
        return self._testbed.trace(guid, trace_id, attribute)

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._testbed.traces_info()

    # -- life cycle ----------------------------------------------------

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._testbed.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._testbed.stop()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._testbed.shutdown()

    # -- deferred experiment description ---------------------------------
    # Names travel base64-encoded and arbitrary values travel pickled.

    @Marshalling.handles(CONFIGURE)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_configure(self, name, value):
        self._testbed.defer_configure(name, value)

    @Marshalling.handles(CREATE_SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_create_set(self, guid, name, value):
        self._testbed.defer_create_set(guid, name, value)

    @Marshalling.handles(FACTORY_SET)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_factory_set(self, name, value):
        self._testbed.defer_factory_set(name, value)

    @Marshalling.handles(CONNECT)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)

    @Marshalling.handles(CROSS_CONNECT)
    @Marshalling.args(int, str, int, int, str, str, str)
    @Marshalling.retvoid
    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)

    @Marshalling.handles(ADD_TRACE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_add_trace(self, guid, trace_id):
        self._testbed.defer_add_trace(guid, trace_id)

    @Marshalling.handles(ADD_ADDRESS)
    @Marshalling.args(int, str, int, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_add_address(self, guid, address, netprefix, broadcast):
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)

    @Marshalling.handles(ADD_ROUTE)
    @Marshalling.args(int, str, int, str, int)
    @Marshalling.retvoid
    def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)

    # -- deployment steps ------------------------------------------------

    @Marshalling.handles(DO_SETUP)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_setup(self):
        self._testbed.do_setup()

    @Marshalling.handles(DO_CREATE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_create(self):
        self._testbed.do_create()

    @Marshalling.handles(DO_CONNECT_INIT)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_init(self):
        self._testbed.do_connect_init()

    @Marshalling.handles(DO_CONNECT_COMPL)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_compl(self):
        self._testbed.do_connect_compl()

    @Marshalling.handles(DO_CONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_configure(self):
        self._testbed.do_configure()

    @Marshalling.handles(DO_PRECONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_preconfigure(self):
        self._testbed.do_preconfigure()

    @Marshalling.handles(DO_PRESTART)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_prestart(self):
        self._testbed.do_prestart()

    @Marshalling.handles(DO_CROSS_CONNECT_INIT)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_init(self, cross_data):
        self._testbed.do_cross_connect_init(cross_data)

    @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_compl(self, cross_data):
        self._testbed.do_cross_connect_compl(cross_data)

    # -- runtime queries and actions ---------------------------------------
    # In the handlers below the 'time' parameter is decoded as a plain
    # string and shadows the 'time' module inside the method body.

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._testbed.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._testbed.set(guid, name, value, time)

    @Marshalling.handles(GET_ADDRESS)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_address(self, guid, index, attribute):
        return str(self._testbed.get_address(guid, index, attribute))

    @Marshalling.handles(GET_ROUTE)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_route(self, guid, index, attribute):
        return str(self._testbed.get_route(guid, index, attribute))

    @Marshalling.handles(ACTION)
    @Marshalling.args(str, int, Marshalling.base64_data)
    @Marshalling.retvoid
    def action(self, time, guid, command):
        self._testbed.action(time, guid, command)

    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        # guid may be None: it is decoded with nullint
        return self._testbed.status(guid)

    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        return self._testbed.get_attribute_list(guid, filter_flags, exclude)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._testbed.get_factory_id(guid)
667
class ExperimentControllerServer(BaseServer):
    """
    Server exposing an experiment controller through the proxy protocol.

    Every handler below is a thin delegation to the wrapped
    ExperimentController (self._experiment). Its decorator stack is read
    bottom-up:
      - Marshalling.retval(type) / Marshalling.retvoid: how the result is
        encoded into the reply;
      - Marshalling.args(types...): how the message arguments are decoded,
        in order;
      - Marshalling.handles(CODE): the instruction code served by the method.
    """
    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # created by post_daemonize()
        self._experiment = None

    def post_daemonize(self):
        # Build the experiment controller from the XML given at
        # construction time, rooted at this server's root directory.
        from nepi.core.execute import ExperimentController
        self._experiment = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._experiment.guids

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_design_xml(self):
        return self._experiment.experiment_design_xml
        
    @Marshalling.handles(EXEC_XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_execute_xml(self):
        return self._experiment.experiment_execute_xml
        
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        # str(): the default retval encoder base64-encodes a string
        return str(self._experiment.trace(guid, trace_id, attribute))

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._experiment.traces_info()

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        return self._experiment.is_finished(guid)

    # In get/set the 'time' parameter is decoded as a plain string and
    # shadows the 'time' module inside the method body.

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._experiment.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._experiment.set(guid, name, value, time)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._experiment.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._experiment.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._experiment.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._experiment.shutdown()

    @Marshalling.handles(GET_TESTBED_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_id(self, guid):
        return self._experiment.get_testbed_id(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._experiment.get_factory_id(guid)

    @Marshalling.handles(GET_TESTBED_VERSION)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_version(self, guid):
        return self._experiment.get_testbed_version(guid)
768
769 class BaseProxy(object):
770     _ServerClass = None
771     _ServerClassModule = "nepi.util.proxy"
772     
773     def __init__(self, 
774             ctor_args, root_dir, 
775             launch = True, host = None, 
776             port = None, user = None, ident_key = None, agent = None,
777             environment_setup = ""):
778         if launch:
779             # ssh
780             if host != None:
781                 python_code = (
782                     "from %(classmodule)s import %(classname)s;"
783                     "s = %(classname)s%(ctor_args)r;"
784                     "s.run()" 
785                 % dict(
786                     classname = self._ServerClass.__name__,
787                     classmodule = self._ServerClassModule,
788                     ctor_args = ctor_args
789                 ) )
790                 proc = server.popen_ssh_subprocess(python_code, host = host,
791                     port = port, user = user, agent = agent,
792                     ident_key = ident_key,
793                     environment_setup = environment_setup,
794                     waitcommand = True)
795                 if proc.poll():
796                     err = proc.stderr.read()
797                     raise RuntimeError, "Server could not be executed: %s" % (err,)
798             else:
799                 # launch daemon
800                 s = self._ServerClass(*ctor_args)
801                 s.run()
802
803         # connect client to server
804         self._client = server.Client(root_dir, host = host, port = port, 
805                 user = user, agent = agent, 
806                 environment_setup = environment_setup)
807     
808     @staticmethod
809     def _make_message(argtypes, argencoders, command, methname, classname, *args):
810         if len(argtypes) != len(argencoders):
811             raise ValueError, "Invalid arguments for _make_message: "\
812                 "in stub method %s of class %s "\
813                 "argtypes and argencoders must match in size" % (
814                     methname, classname )
815         if len(argtypes) != len(args):
816             raise ValueError, "Invalid arguments for _make_message: "\
817                 "in stub method %s of class %s "\
818                 "expected %d arguments, got %d" % (
819                     methname, classname,
820                     len(argtypes), len(args))
821         
822         buf = []
823         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
824             try:
825                 buf.append(fmt % encode(val))
826             except:
827                 import traceback
828                 raise TypeError, "Argument %d of stub method %s of class %s "\
829                     "requires a value of type %s, but got %s - nested error: %s" % (
830                         argnum, methname, classname,
831                         getattr(typ, '__name__', typ), type(val),
832                         traceback.format_exc()
833                 )
834         
835         return "%d|%s" % (command, '|'.join(buf))
836     
837     @staticmethod
838     def _parse_reply(rvtype, methname, classname, reply):
839         if not reply:
840             raise RuntimeError, "Invalid reply: %r "\
841                 "for stub method %s of class %s" % (
842                     reply,
843                     methname,
844                     classname)
845         
846         try:
847             result = reply.split("|")
848             code = int(result[0])
849             text = result[1]
850         except:
851             import traceback
852             raise TypeError, "Return value of stub method %s of class %s "\
853                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
854                     methname, classname,
855                     getattr(rvtype, '__name__', rvtype), reply,
856                     traceback.format_exc()
857             )
858         if code == ERROR:
859             text = base64.b64decode(text)
860             raise RuntimeError(text)
861         elif code == OK:
862             try:
863                 if rvtype is None:
864                     return
865                 else:
866                     return rvtype(text)
867             except:
868                 import traceback
869                 raise TypeError, "Return value of stub method %s of class %s "\
870                     "cannot be parsed: must be of type %s - nested error: %s" % (
871                         methname, classname,
872                         getattr(rvtype, '__name__', rvtype),
873                         traceback.format_exc()
874                 )
875         else:
876             raise RuntimeError, "Invalid reply: %r "\
877                 "for stub method %s of class %s - unknown code" % (
878                     reply,
879                     methname,
880                     classname)
881     
    @staticmethod
    def _make_stubs(server_class, template_class):
        """
        Returns a dictionary method_name -> method
        with stub methods.
        
        Usage:
        
            class SomeProxy(BaseProxy):
               ...
               
               locals().update( BaseProxy._make_stubs(
                    ServerClass,
                    TemplateClass
               ) )
        
        ServerClass is the corresponding Server class, as
        specified in the _ServerClass class method (_make_stubs
        is static and can't access the method), and TemplateClass
        is the ultimate implementation class behind the server,
        from which argument names and defaults are taken, to
        maintain meaningful interfaces.
        
        A stub is generated for every public name defined directly
        on TemplateClass that ServerClass also has, provided the
        server method carries the _handles_command, _argtypes and
        _argencoders attributes. For each such name, both <name> and
        <name>_deferred are added to the returned dictionary (as
        properties if the template member is a property).
        
        The stub source comes from the 'proxy_stub.tpl' file located
        next to this module, filled in with %-formatting.
        
        Side effect: TemplateClass gets a <name>_deferred method
        injected for every stubbed name that lacks one; the injected
        method simply calls <name>.
        """
        rv = {}
        
        # Sentinel marking "argument has no default value"
        class NONE: pass
        
        import os.path
        func_template_path = os.path.join(
            os.path.dirname(__file__),
            'proxy_stub.tpl')
        # NOTE(review): the file is not closed if read() raises
        func_template_file = open(func_template_path, "r")
        func_template = func_template_file.read()
        func_template_file.close()
        
        # Iterate over a snapshot: template_class is modified (setattr)
        # further down, inside this same loop
        for methname in vars(template_class).copy():
            if methname.endswith('_deferred'):
                # cannot wrap deferreds...
                continue
            dmethname = methname+'_deferred'
            if hasattr(server_class, methname) and not methname.startswith('_'):
                template_meth = getattr(template_class, methname)
                server_meth = getattr(server_class, methname)
                
                # Protocol metadata attached to the server-side method
                command = getattr(server_meth, '_handles_command', None)
                argtypes = getattr(server_meth, '_argtypes', None)
                argencoders = getattr(server_meth, '_argencoders', None)
                rvtype = getattr(server_meth, '_retval', None)
                doprop = False
                
                if hasattr(template_meth, 'fget'):
                    # property getter
                    template_meth = template_meth.fget
                    doprop = True
                
                if command is not None and argtypes is not None and argencoders is not None:
                    # We have an interface method...
                    code = template_meth.func_code
                    # Positional argument names, including the leading 'self'
                    argnames = code.co_varnames[:code.co_argcount]
                    # Defaults aligned with argnames: NONE for arguments
                    # that have no default, then the actual defaults
                    argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
                                  + (template_meth.func_defaults or ()) )
                    
                    # Names made available to the generated stub code
                    func_globals = dict(
                        BaseProxy = BaseProxy,
                        argtypes = argtypes,
                        argencoders = argencoders,
                        rvtype = rvtype,
                        functools = functools,
                    )
                    # Receives whatever the generated code defines
                    context = dict()
                    
                    # Fill in the template; defaults are rendered with
                    # repr(), the first argument (self) is passed separately
                    func_text = func_template % dict(
                        self = argnames[0],
                        args = '%s' % (','.join(argnames[1:])),
                        argdefs = ','.join([
                            argname if argdef is NONE
                            else "%s=%r" % (argname, argdef)
                            for argname, argdef in zip(argnames[1:], argdefaults[1:])
                        ]),
                        command = command,
                        methname = methname,
                        classname = server_class.__name__
                    )
                    
                    # Compile against the template path, so tracebacks
                    # point at the template file
                    func_text = compile(
                        func_text,
                        func_template_path,
                        'exec')
                    
                    exec func_text in func_globals, context
                    
                    # The generated code is expected to define both the
                    # stub and its _deferred variant
                    if doprop:
                        rv[methname] = property(context[methname])
                        rv[dmethname] = property(context[dmethname])
                    else:
                        rv[methname] = context[methname]
                        rv[dmethname] = context[dmethname]
                    
                    # inject _deferred into core classes
                    if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
                        # freezename binds the current names, so each
                        # injected method keeps its own methname
                        def freezename(methname, dmethname):
                            # NOTE(review): for property members this calls
                            # the property's value - confirm that is intended
                            def dmeth(self, *p, **kw): 
                                return getattr(self, methname)(*p, **kw)
                            dmeth.__name__ = dmethname
                            return dmeth
                        dmeth = freezename(methname, dmethname)
                        setattr(template_class, dmethname, dmeth)
        
        return rv
991                         
class TestbedControllerProxy(BaseProxy):
    """
    Proxy whose server side is a TestbedControllerServer.
    
    Its public interface is generated by BaseProxy._make_stubs out of
    TestbedControllerServer and nepi.core.execute.TestbedController.
    """
    
    _ServerClass = TestbedControllerServer
    
    def __init__(self, root_dir, log_level, testbed_id = None, 
            testbed_version = None, launch = True, host = None, 
            port = None, user = None, ident_key = None, agent = None,
            environment_setup = ""):
        """
        root_dir, log_level, testbed_id and testbed_version are handed
        over to the base class as the server constructor arguments;
        the remaining parameters are forwarded to BaseProxy.__init__
        unchanged.
        
        Raises RuntimeError if launch is set and either testbed_id
        or testbed_version is missing.
        """
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        super(TestbedControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, testbed_id, testbed_version),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )
    
    # Shutdown stops the serverside...
    # (the generated 'shutdown' stub is captured as the default value of
    # _stub before this definition replaces it in the class namespace)
    def shutdown(self, _stub = shutdown):
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1021     
1022
class ExperimentControllerProxy(BaseProxy):
    """
    Proxy whose server side is an ExperimentControllerServer.
    
    Its public interface is generated by BaseProxy._make_stubs out of
    ExperimentControllerServer and nepi.core.execute.ExperimentController.
    """
    _ServerClass = ExperimentControllerServer
    
    def __init__(self, root_dir, log_level, experiment_xml = None, 
            launch = True, host = None, port = None, user = None, 
            ident_key = None, agent = None, environment_setup = ""):
        """
        root_dir, log_level and experiment_xml are handed over to the
        base class as the server constructor arguments; the remaining
        parameters are forwarded to BaseProxy.__init__ unchanged.
        
        Raises RuntimeError if launch is set and experiment_xml is None.
        """
        if launch and experiment_xml is None:
            # Adjacent literals instead of a backslash continuation inside
            # the string, which embedded the source indentation in the text
            raise RuntimeError("To launch a ExperimentControllerServer a "
                    "xml description of the experiment is required")
        super(ExperimentControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, experiment_xml),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentControllerServer,
        template_class = nepi.core.execute.ExperimentController,
    ) )

    
    # Shutdown stops the serverside...
    # (the generated 'shutdown' stub is captured as the default value of
    # _stub before this definition replaces it in the class namespace)
    def shutdown(self, _stub = shutdown):
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1051