059ccfff0ec44eaece4c92d006ae7d1ef26992ba
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
9 import getpass
10 import cPickle
11 import sys
12 import time
13 import tempfile
14 import shutil
15 import functools
16
17 # PROTOCOL REPLIES
18 OK = 0
19 ERROR = 1
20
21 # PROTOCOL INSTRUCTION MESSAGES
22 XML = 2 
23 TRACE   = 4
24 FINISHED    = 5
25 START   = 6
26 STOP    = 7
27 SHUTDOWN    = 8
28 CONFIGURE   = 9
29 CREATE      = 10
30 CREATE_SET  = 11
31 FACTORY_SET = 12
32 CONNECT     = 13
33 CROSS_CONNECT   = 14
34 ADD_TRACE   = 15
35 ADD_ADDRESS = 16
36 ADD_ROUTE   = 17
37 DO_SETUP    = 18
38 DO_CREATE   = 19
39 DO_CONNECT_INIT = 20
40 DO_CONFIGURE    = 21
41 DO_CROSS_CONNECT_INIT   = 22
42 GET = 23
43 SET = 24
44 ACTION  = 25
45 STATUS  = 26
46 GUIDS  = 27
47 GET_ROUTE = 28
48 GET_ADDRESS = 29
49 RECOVER = 30
50 DO_PRECONFIGURE     = 31
51 GET_ATTRIBUTE_LIST  = 32
52 DO_CONNECT_COMPL    = 33
53 DO_CROSS_CONNECT_COMPL  = 34
54 TESTBED_ID  = 35
55 TESTBED_VERSION  = 36
56 EXPERIMENT_SET = 37
57 EXPERIMENT_GET = 38
58 DO_PRESTART = 39
59
60 instruction_text = dict({
61     OK:     "OK",
62     ERROR:  "ERROR",
63     XML:    "XML",
64     TRACE:  "TRACE",
65     FINISHED:   "FINISHED",
66     START:  "START",
67     STOP:   "STOP",
68     RECOVER: "RECOVER",
69     SHUTDOWN:   "SHUTDOWN",
70     CONFIGURE:  "CONFIGURE",
71     CREATE: "CREATE",
72     CREATE_SET: "CREATE_SET",
73     FACTORY_SET:    "FACTORY_SET",
74     CONNECT:    "CONNECT",
75     CROSS_CONNECT: "CROSS_CONNECT",
76     ADD_TRACE:  "ADD_TRACE",
77     ADD_ADDRESS:    "ADD_ADDRESS",
78     ADD_ROUTE:  "ADD_ROUTE",
79     DO_SETUP:   "DO_SETUP",
80     DO_CREATE:  "DO_CREATE",
81     DO_CONNECT_INIT: "DO_CONNECT_INIT",
82     DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
83     DO_CONFIGURE:   "DO_CONFIGURE",
84     DO_PRECONFIGURE:   "DO_PRECONFIGURE",
85     DO_CROSS_CONNECT_INIT:  "DO_CROSS_CONNECT_INIT",
86     DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
87     GET:    "GET",
88     SET:    "SET",
89     GET_ROUTE: "GET_ROUTE",
90     GET_ADDRESS: "GET_ADDRESS",
91     GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
92     ACTION: "ACTION",
93     STATUS: "STATUS",
94     GUIDS:  "GUIDS",
95     TESTBED_ID: "TESTBED_ID",
96     TESTBED_VERSION: "TESTBED_VERSION",
97     EXPERIMENT_SET: "EXPERIMENT_SET",
98     EXPERIMENT_GET: "EXPERIMENT_GET",
99     })
100
101 def log_msg(server, params):
102     try:
103         instr = int(params[0])
104         instr_txt = instruction_text[instr]
105         server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
106             instr_txt, ", ".join(map(str, params[1:]))))
107     except:
108         # don't die for logging
109         pass
110
111 def log_reply(server, reply):
112     try:
113         res = reply.split("|")
114         code = int(res[0])
115         code_txt = instruction_text[code]
116         try:
117             txt = base64.b64decode(res[1])
118         except:
119             txt = res[1]
120         server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
121                 code_txt, txt))
122     except:
123         # don't die for logging
124         server.log_debug("%s - reply: %s" % (server.__class__.__name__, 
125                 reply))
126         pass
127
128 def to_server_log_level(log_level):
129     return (
130         server.DEBUG_LEVEL
131             if log_level == DC.DEBUG_LEVEL 
132         else server.ERROR_LEVEL
133     )
134
135 def get_access_config_params(access_config):
136     root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
137     log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
138     log_level = to_server_log_level(log_level)
139     user = host = port = agent = key = None
140     communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
141     environment_setup = (
142         access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
143         if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
144         else None
145     )
146     if communication == DC.ACCESS_SSH:
147         user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
148         host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
149         port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
150         agent = access_config.get_attribute_value(DC.USE_AGENT)
151         key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
152     return (root_dir, log_level, user, host, port, key, agent, environment_setup)
153
154 class AccessConfiguration(AttributesMap):
155     def __init__(self, params = None):
156         super(AccessConfiguration, self).__init__()
157         
158         from nepi.core.metadata import Metadata
159         
160         for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES:
161             self.add_attribute(**attr_info)
162         
163         if params:
164             for attr_name, attr_value in params.iteritems():
165                 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
166                 attr_value = parser(attr_value)
167                 self.set_attribute_value(attr_name, attr_value)
168
169 class TempDir(object):
170     def __init__(self):
171         self.path = tempfile.mkdtemp()
172     
173     def __del__(self):
174         shutil.rmtree(self.path)
175
176 class PermDir(object):
177     def __init__(self, path):
178         self.path = path
179
180 def create_controller(xml, access_config = None):
181     mode = None if not access_config \
182             else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
183     launch = True if not access_config \
184             else not access_config.get_attribute_value(DC.RECOVER)
185     if not mode or mode == DC.MODE_SINGLE_PROCESS:
186         if not launch:
187             raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
188         
189         from nepi.core.execute import ExperimentController
190         
191         if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
192             root_dir = TempDir()
193         else:
194             root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
195         controller = ExperimentController(xml, root_dir.path)
196         
197         # inject reference to temporary dir, so that it gets cleaned
198         # up at destruction time.
199         controller._tempdir = root_dir
200         
201         return controller
202     elif mode == DC.MODE_DAEMON:
203         (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
204                 get_access_config_params(access_config)
205         return ExperimentControllerProxy(root_dir, log_level,
206                 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
207                 agent = agent, launch = launch,
208                 environment_setup = environment_setup)
209     raise RuntimeError("Unsupported access configuration '%s'" % mode)
210
211 def create_testbed_controller(testbed_id, testbed_version, access_config):
212     mode = None if not access_config \
213             else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
214     launch = True if not access_config \
215             else not access_config.get_attribute_value(DC.RECOVER)
216     if not mode or mode == DC.MODE_SINGLE_PROCESS:
217         if not launch:
218             raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
219         return  _build_testbed_controller(testbed_id, testbed_version)
220     elif mode == DC.MODE_DAEMON:
221         (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
222                 get_access_config_params(access_config)
223         return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, 
224                 testbed_version = testbed_version, host = host, port = port, ident_key = key,
225                 user = user, agent = agent, launch = launch,
226                 environment_setup = environment_setup)
227     raise RuntimeError("Unsupported access configuration '%s'" % mode)
228
229 def _build_testbed_controller(testbed_id, testbed_version):
230     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
231     if not mod_name in sys.modules:
232         __import__(mod_name)
233     module = sys.modules[mod_name]
234     return module.TestbedController(testbed_version)
235
236 # Just a namespace class
237 class Marshalling:
238     class Decoders:
239         @staticmethod
240         def pickled_data(sdata):
241             return cPickle.loads(base64.b64decode(sdata))
242         
243         @staticmethod
244         def base64_data(sdata):
245             return base64.b64decode(sdata)
246         
247         @staticmethod
248         def nullint(sdata):
249             return None if sdata == "None" else int(sdata)
250         
251         @staticmethod
252         def bool(sdata):
253             return sdata == 'True'
254         
255     class Encoders:
256         @staticmethod
257         def pickled_data(data):
258             return base64.b64encode(cPickle.dumps(data))
259         
260         @staticmethod
261         def base64_data(data):
262             return base64.b64encode(data)
263         
264         @staticmethod
265         def nullint(data):
266             return "None" if data is None else int(data)
267         
268         @staticmethod
269         def bool(data):
270             return str(bool(data))
271            
272     # import into Marshalling all the decoders
273     # they act as types
274     locals().update([
275         (typname, typ)
276         for typname, typ in vars(Decoders).iteritems()
277         if not typname.startswith('_')
278     ])
279
280     _TYPE_ENCODERS = dict([
281         # id(type) -> (<encoding_function>, <formatting_string>)
282         (typname, (getattr(Encoders,typname),"%s"))
283         for typname in vars(Decoders)
284         if not typname.startswith('_')
285            and hasattr(Encoders,typname)
286     ])
287
288     # Builtins
289     _TYPE_ENCODERS["float"] = (float, "%r")
290     _TYPE_ENCODERS["int"] = (int, "%d")
291     _TYPE_ENCODERS["long"] = (int, "%d")
292     _TYPE_ENCODERS["str"] = (str, "%s")
293     _TYPE_ENCODERS["unicode"] = (str, "%s")
294     
295     # Generic encoder
296     _TYPE_ENCODERS[None] = (str, "%s")
297     
298     @staticmethod
299     def args(*types):
300         """
301         Decorator that converts the given function into one that takes
302         a single "params" list, with each parameter marshalled according
303         to the given factory callable (type constructors are accepted).
304         
305         The first argument (self) is left untouched.
306         
307         eg:
308         
309         @Marshalling.args(int,int,str,base64_data)
310         def somefunc(self, someint, otherint, somestr, someb64):
311            return someretval
312         """
313         def decor(f):
314             @functools.wraps(f)
315             def rv(self, params):
316                 return f(self, *[ ctor(val)
317                                   for ctor,val in zip(types, params[1:]) ])
318             
319             rv._argtypes = types
320             
321             # Derive type encoders by looking up types in _TYPE_ENCODERS
322             # make_proxy will use it to encode arguments in command strings
323             argencoders = []
324             TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
325             for typ in types:
326                 if typ.__name__ in TYPE_ENCODERS:
327                     argencoders.append(TYPE_ENCODERS[typ.__name__])
328                 else:
329                     # generic encoder
330                     argencoders.append(TYPE_ENCODERS[None])
331             
332             rv._argencoders = tuple(argencoders)
333             
334             rv._retval = getattr(f, '_retval', None)
335             return rv
336         return decor
337
338     @staticmethod
339     def retval(typ=Decoders.base64_data):
340         """
341         Decorator that converts the given function into one that 
342         returns a properly encoded return string, given that the undecorated
343         function returns suitable input for the encoding function.
344         
345         The optional typ argument specifies a type.
346         For the default of base64_data, return values should be strings.
347         The return value of the encoding method should be a string always.
348         
349         eg:
350         
351         @Marshalling.args(int,int,str,base64_data)
352         @Marshalling.retval(str)
353         def somefunc(self, someint, otherint, somestr, someb64):
354            return someint
355         """
356         encode, fmt = Marshalling._TYPE_ENCODERS.get(
357             typ.__name__,
358             Marshalling._TYPE_ENCODERS[None])
359         fmt = "%d|"+fmt
360         
361         def decor(f):
362             @functools.wraps(f)
363             def rv(self, *p, **kw):
364                 data = f(self, *p, **kw)
365                 return fmt % (
366                     OK,
367                     encode(data)
368                 )
369             rv._retval = typ
370             rv._argtypes = getattr(f, '_argtypes', None)
371             rv._argencoders = getattr(f, '_argencoders', None)
372             return rv
373         return decor
374     
375     @staticmethod
376     def retvoid(f):
377         """
378         Decorator that converts the given function into one that 
379         always return an encoded empty string.
380         
381         Useful for null-returning functions.
382         """
383         OKRV = "%d|" % (OK,)
384         
385         @functools.wraps(f)
386         def rv(self, *p, **kw):
387             f(self, *p, **kw)
388             return OKRV
389         
390         rv._retval = None
391         rv._argtypes = getattr(f, '_argtypes', None)
392         rv._argencoders = getattr(f, '_argencoders', None)
393         return rv
394     
395     @staticmethod
396     def handles(whichcommand):
397         """
398         Associates the method with a given command code for servers.
399         It should always be the topmost decorator.
400         """
401         def decor(f):
402             f._handles_command = whichcommand
403             return f
404         return decor
405
406 class BaseServer(server.Server):
407     def reply_action(self, msg):
408         if not msg:
409             result = base64.b64encode("Invalid command line")
410             reply = "%d|%s" % (ERROR, result)
411         else:
412             params = msg.split("|")
413             instruction = int(params[0])
414             log_msg(self, params)
415             try:
416                 for mname,meth in vars(self.__class__).iteritems():
417                     if not mname.startswith('_'):
418                         cmd = getattr(meth, '_handles_command', None)
419                         if cmd == instruction:
420                             meth = getattr(self, mname)
421                             reply = meth(params)
422                             break
423                 else:
424                     error = "Invalid instruction %s" % instruction
425                     self.log_error(error)
426                     result = base64.b64encode(error)
427                     reply = "%d|%s" % (ERROR, result)
428             except:
429                 error = self.log_error()
430                 result = base64.b64encode(error)
431                 reply = "%d|%s" % (ERROR, result)
432         log_reply(self, reply)
433         return reply
434
435 class TestbedControllerServer(BaseServer):
436     def __init__(self, root_dir, log_level, testbed_id, testbed_version):
437         super(TestbedControllerServer, self).__init__(root_dir, log_level)
438         self._testbed_id = testbed_id
439         self._testbed_version = testbed_version
440         self._testbed = None
441
442     def post_daemonize(self):
443         self._testbed = _build_testbed_controller(self._testbed_id, 
444                 self._testbed_version)
445
446     @Marshalling.handles(GUIDS)
447     @Marshalling.args()
448     @Marshalling.retval( Marshalling.pickled_data )
449     def guids(self):
450         return self._testbed.guids
451
452     @Marshalling.handles(TESTBED_ID)
453     @Marshalling.args()
454     @Marshalling.retval()
455     def testbed_id(self):
456         return str(self._testbed.testbed_id)
457
458     @Marshalling.handles(TESTBED_VERSION)
459     @Marshalling.args()
460     @Marshalling.retval()
461     def testbed_version(self):
462         return str(self._testbed.testbed_version)
463
464     @Marshalling.handles(CREATE)
465     @Marshalling.args(int, str)
466     @Marshalling.retvoid
467     def defer_create(self, guid, factory_id):
468         self._testbed.defer_create(guid, factory_id)
469
470     @Marshalling.handles(TRACE)
471     @Marshalling.args(int, str, Marshalling.base64_data)
472     @Marshalling.retval()
473     def trace(self, guid, trace_id, attribute):
474         return self._testbed.trace(guid, trace_id, attribute)
475
476     @Marshalling.handles(START)
477     @Marshalling.args()
478     @Marshalling.retvoid
479     def start(self):
480         self._testbed.start()
481
482     @Marshalling.handles(STOP)
483     @Marshalling.args()
484     @Marshalling.retvoid
485     def stop(self):
486         self._testbed.stop()
487
488     @Marshalling.handles(SHUTDOWN)
489     @Marshalling.args()
490     @Marshalling.retvoid
491     def shutdown(self):
492         self._testbed.shutdown()
493
494     @Marshalling.handles(CONFIGURE)
495     @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
496     @Marshalling.retvoid
497     def defer_configure(self, name, value):
498         self._testbed.defer_configure(name, value)
499
500     @Marshalling.handles(CREATE_SET)
501     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
502     @Marshalling.retvoid
503     def defer_create_set(self, guid, name, value):
504         self._testbed.defer_create_set(guid, name, value)
505
506     @Marshalling.handles(FACTORY_SET)
507     @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
508     @Marshalling.retvoid
509     def defer_factory_set(self, name, value):
510         self._testbed.defer_factory_set(name, value)
511
512     @Marshalling.handles(CONNECT)
513     @Marshalling.args(int, str, int, str)
514     @Marshalling.retvoid
515     def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
516         self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
517             connector_type_name2)
518
519     @Marshalling.handles(CROSS_CONNECT)
520     @Marshalling.args(int, str, int, int, str, str, str)
521     @Marshalling.retvoid
522     def defer_cross_connect(self, 
523             guid, connector_type_name,
524             cross_guid, cross_testbed_guid,
525             cross_testbed_id, cross_factory_id,
526             cross_connector_type_name):
527         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
528             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
529             cross_connector_type_name)
530
531     @Marshalling.handles(ADD_TRACE)
532     @Marshalling.args(int, str)
533     @Marshalling.retvoid
534     def defer_add_trace(self, guid, trace_id):
535         self._testbed.defer_add_trace(guid, trace_id)
536
537     @Marshalling.handles(ADD_ADDRESS)
538     @Marshalling.args(int, str, int, str)
539     @Marshalling.retvoid
540     def defer_add_address(self, guid, address, netprefix, broadcast):
541         self._testbed.defer_add_address(guid, address, netprefix,
542                 broadcast)
543
544     @Marshalling.handles(ADD_ROUTE)
545     @Marshalling.args(int, str, int, str)
546     @Marshalling.retvoid
547     def defer_add_route(self, guid, destination, netprefix, nexthop):
548         self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
549
550     @Marshalling.handles(DO_SETUP)
551     @Marshalling.args()
552     @Marshalling.retvoid
553     def do_setup(self):
554         self._testbed.do_setup()
555
556     @Marshalling.handles(DO_CREATE)
557     @Marshalling.args()
558     @Marshalling.retvoid
559     def do_create(self):
560         self._testbed.do_create()
561
562     @Marshalling.handles(DO_CONNECT_INIT)
563     @Marshalling.args()
564     @Marshalling.retvoid
565     def do_connect_init(self):
566         self._testbed.do_connect_init()
567
568     @Marshalling.handles(DO_CONNECT_COMPL)
569     @Marshalling.args()
570     @Marshalling.retvoid
571     def do_connect_compl(self):
572         self._testbed.do_connect_compl()
573
574     @Marshalling.handles(DO_CONFIGURE)
575     @Marshalling.args()
576     @Marshalling.retvoid
577     def do_configure(self):
578         self._testbed.do_configure()
579
580     @Marshalling.handles(DO_PRECONFIGURE)
581     @Marshalling.args()
582     @Marshalling.retvoid
583     def do_preconfigure(self):
584         self._testbed.do_preconfigure()
585
586     @Marshalling.handles(DO_PRESTART)
587     @Marshalling.args()
588     @Marshalling.retvoid
589     def do_prestart(self):
590         self._testbed.do_prestart()
591
592     @Marshalling.handles(DO_CROSS_CONNECT_INIT)
593     @Marshalling.args( Marshalling.Decoders.pickled_data )
594     @Marshalling.retvoid
595     def do_cross_connect_init(self, cross_data):
596         self._testbed.do_cross_connect_init(cross_data)
597
598     @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
599     @Marshalling.args( Marshalling.Decoders.pickled_data )
600     @Marshalling.retvoid
601     def do_cross_connect_compl(self, cross_data):
602         self._testbed.do_cross_connect_compl(cross_data)
603
604     @Marshalling.handles(GET)
605     @Marshalling.args(int, Marshalling.base64_data, str)
606     @Marshalling.retval( Marshalling.pickled_data )
607     def get(self, guid, name, time):
608         return self._testbed.get(guid, name, time)
609
610     @Marshalling.handles(SET)
611     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
612     @Marshalling.retvoid
613     def set(self, guid, name, value, time):
614         self._testbed.set(guid, name, value, time)
615
616     @Marshalling.handles(GET_ADDRESS)
617     @Marshalling.args(int, int, Marshalling.base64_data)
618     @Marshalling.retval()
619     def get_address(self, guid, index, attribute):
620         return str(self._testbed.get_address(guid, index, attribute))
621
622     @Marshalling.handles(GET_ROUTE)
623     @Marshalling.args(int, int, Marshalling.base64_data)
624     @Marshalling.retval()
625     def get_route(self, guid, index, attribute):
626         return str(self._testbed.get_route(guid, index, attribute))
627
628     @Marshalling.handles(ACTION)
629     @Marshalling.args(str, int, Marshalling.base64_data)
630     @Marshalling.retvoid
631     def action(self, time, guid, command):
632         self._testbed.action(time, guid, command)
633
634     @Marshalling.handles(STATUS)
635     @Marshalling.args(Marshalling.nullint)
636     @Marshalling.retval(int)
637     def status(self, guid):
638         return self._testbed.status(guid)
639
640     @Marshalling.handles(GET_ATTRIBUTE_LIST)
641     @Marshalling.args(int)
642     @Marshalling.retval( Marshalling.pickled_data )
643     def get_attribute_list(self, guid):
644         return self._testbed.get_attribute_list(guid)
645
646 class ExperimentControllerServer(BaseServer):
647     def __init__(self, root_dir, log_level, experiment_xml):
648         super(ExperimentControllerServer, self).__init__(root_dir, log_level)
649         self._experiment_xml = experiment_xml
650         self._controller = None
651
652     def post_daemonize(self):
653         from nepi.core.execute import ExperimentController
654         self._controller = ExperimentController(self._experiment_xml, 
655             root_dir = self._root_dir)
656
657     @Marshalling.handles(XML)
658     @Marshalling.args()
659     @Marshalling.retval()
660     def experiment_xml(self):
661         return self._controller.experiment_xml
662         
663     @Marshalling.handles(TRACE)
664     @Marshalling.args(int, int, str, Marshalling.base64_data)
665     @Marshalling.retval()
666     def trace(self, testbed_guid, guid, trace_id, attribute):
667         return str(self._controller.trace(testbed_guid, guid, trace_id, attribute))
668
669     @Marshalling.handles(FINISHED)
670     @Marshalling.args(int)
671     @Marshalling.retval(Marshalling.bool)
672     def is_finished(self, guid):
673         return self._controller.is_finished(guid)
674
675     @Marshalling.handles(EXPERIMENT_GET)
676     @Marshalling.args(int, int, Marshalling.base64_data, str)
677     @Marshalling.retval( Marshalling.pickled_data )
678     def get(self, testbed_guid, guid, name, time):
679         return self._controller.get(testbed_guid, guid, name, time)
680
681     @Marshalling.handles(EXPERIMENT_SET)
682     @Marshalling.args(int, int, Marshalling.base64_data, Marshalling.pickled_data, str)
683     @Marshalling.retvoid
684     def set(self, testbed_guid, guid, name, value, time):
685         self._controller.set(testbed_guid, guid, name, value, time)
686
687     @Marshalling.handles(START)
688     @Marshalling.args()
689     @Marshalling.retvoid
690     def start(self):
691         self._controller.start()
692
693     @Marshalling.handles(STOP)
694     @Marshalling.args()
695     @Marshalling.retvoid
696     def stop(self):
697         self._controller.stop()
698
699     @Marshalling.handles(RECOVER)
700     @Marshalling.args()
701     @Marshalling.retvoid
702     def recover(self):
703         self._controller.recover()
704
705     @Marshalling.handles(SHUTDOWN)
706     @Marshalling.args()
707     @Marshalling.retvoid
708     def shutdown(self):
709         self._controller.shutdown()
710
711 class BaseProxy(object):
712     _ServerClass = None
713     _ServerClassModule = "nepi.util.proxy"
714     
715     def __init__(self, 
716             ctor_args, root_dir, 
717             launch = True, host = None, 
718             port = None, user = None, ident_key = None, agent = None,
719             environment_setup = ""):
720         if launch:
721             # ssh
722             if host != None:
723                 python_code = (
724                     "from %(classmodule)s import %(classname)s;"
725                     "s = %(classname)s%(ctor_args)r;"
726                     "s.run()" 
727                 % dict(
728                     classname = self._ServerClass.__name__,
729                     classmodule = self._ServerClassModule,
730                     ctor_args = ctor_args
731                 ) )
732                 proc = server.popen_ssh_subprocess(python_code, host = host,
733                     port = port, user = user, agent = agent,
734                     ident_key = ident_key,
735                     environment_setup = environment_setup,
736                     waitcommand = True)
737                 if proc.poll():
738                     err = proc.stderr.read()
739                     raise RuntimeError, "Server could not be executed: %s" % (err,)
740             else:
741                 # launch daemon
742                 s = self._ServerClass(*ctor_args)
743                 s.run()
744
745         # connect client to server
746         self._client = server.Client(root_dir, host = host, port = port, 
747                 user = user, agent = agent, 
748                 environment_setup = environment_setup)
749     
750     @staticmethod
751     def _make_message(argtypes, argencoders, command, methname, classname, *args):
752         if len(argtypes) != len(argencoders):
753             raise ValueError, "Invalid arguments for _make_message: "\
754                 "in stub method %s of class %s "\
755                 "argtypes and argencoders must match in size" % (
756                     methname, classname )
757         if len(argtypes) != len(args):
758             raise ValueError, "Invalid arguments for _make_message: "\
759                 "in stub method %s of class %s "\
760                 "expected %d arguments, got %d" % (
761                     methname, classname,
762                     len(argtypes), len(args))
763         
764         buf = []
765         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
766             try:
767                 buf.append(fmt % encode(val))
768             except:
769                 import traceback
770                 raise TypeError, "Argument %d of stub method %s of class %s "\
771                     "requires a value of type %s, but got %s - nested error: %s" % (
772                         argnum, methname, classname,
773                         getattr(typ, '__name__', typ), type(val),
774                         traceback.format_exc()
775                 )
776         
777         return "%d|%s" % (command, '|'.join(buf))
778     
779     @staticmethod
780     def _parse_reply(rvtype, methname, classname, reply):
781         if not reply:
782             raise RuntimeError, "Invalid reply: %r "\
783                 "for stub method %s of class %s" % (
784                     reply,
785                     methname,
786                     classname)
787         
788         try:
789             result = reply.split("|")
790             code = int(result[0])
791             text = result[1]
792         except:
793             import traceback
794             raise TypeError, "Return value of stub method %s of class %s "\
795                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
796                     methname, classname,
797                     getattr(rvtype, '__name__', rvtype), reply,
798                     traceback.format_exc()
799             )
800         if code == ERROR:
801             text = base64.b64decode(text)
802             raise RuntimeError(text)
803         elif code == OK:
804             try:
805                 if rvtype is None:
806                     return
807                 else:
808                     return rvtype(text)
809             except:
810                 import traceback
811                 raise TypeError, "Return value of stub method %s of class %s "\
812                     "cannot be parsed: must be of type %s - nested error: %s" % (
813                         methname, classname,
814                         getattr(rvtype, '__name__', rvtype),
815                         traceback.format_exc()
816                 )
817         else:
818             raise RuntimeError, "Invalid reply: %r "\
819                 "for stub method %s of class %s - unknown code" % (
820                     reply,
821                     methname,
822                     classname)
823     
824     @staticmethod
825     def _make_stubs(server_class, template_class):
826         """
827         Returns a dictionary method_name -> method
828         with stub methods.
829         
830         Usage:
831         
832             class SomeProxy(BaseProxy):
833                ...
834                
835                locals().update( BaseProxy._make_stubs(
836                     ServerClass,
837                     TemplateClass
838                ) )
839         
840         ServerClass is the corresponding Server class, as
841         specified in the _ServerClass class method (_make_stubs
842         is static and can't access the method), and TemplateClass
843         is the ultimate implementation class behind the server,
844         from which argument names and defaults are taken, to
845         maintain meaningful interfaces.
846         """
847         rv = {}
848         
849         class NONE: pass
850         
851         import os.path
852         func_template_path = os.path.join(
853             os.path.dirname(__file__),
854             'proxy_stub.tpl')
855         func_template_file = open(func_template_path, "r")
856         func_template = func_template_file.read()
857         func_template_file.close()
858         
859         for methname in vars(template_class):
860             if hasattr(server_class, methname) and not methname.startswith('_'):
861                 template_meth = getattr(template_class, methname)
862                 server_meth = getattr(server_class, methname)
863                 
864                 command = getattr(server_meth, '_handles_command', None)
865                 argtypes = getattr(server_meth, '_argtypes', None)
866                 argencoders = getattr(server_meth, '_argencoders', None)
867                 rvtype = getattr(server_meth, '_retval', None)
868                 doprop = False
869                 
870                 if hasattr(template_meth, 'fget'):
871                     # property getter
872                     template_meth = template_meth.fget
873                     doprop = True
874                 
875                 if command is not None and argtypes is not None and argencoders is not None:
876                     # We have an interface method...
877                     code = template_meth.func_code
878                     argnames = code.co_varnames[:code.co_argcount]
879                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
880                                   + (template_meth.func_defaults or ()) )
881                     
882                     func_globals = dict(
883                         BaseProxy = BaseProxy,
884                         argtypes = argtypes,
885                         argencoders = argencoders,
886                         rvtype = rvtype,
887                     )
888                     context = dict()
889                     
890                     func_text = func_template % dict(
891                         self = argnames[0],
892                         args = '%s' % (','.join(argnames[1:])),
893                         argdefs = ','.join([
894                             argname if argdef is NONE
895                             else "%s=%r" % (argname, argdef)
896                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
897                         ]),
898                         command = command,
899                         methname = methname,
900                         classname = server_class.__name__
901                     )
902                     
903                     func_text = compile(
904                         func_text,
905                         func_template_path,
906                         'exec')
907                     
908                     exec func_text in func_globals, context
909                     
910                     if doprop:
911                         rv[methname] = property(context[methname])
912                     else:
913                         rv[methname] = context[methname]
914         
915         return rv
916                         
917 class TestbedControllerProxy(BaseProxy):
918     
919     _ServerClass = TestbedControllerServer
920     
921     def __init__(self, root_dir, log_level, testbed_id = None, 
922             testbed_version = None, launch = True, host = None, 
923             port = None, user = None, ident_key = None, agent = None,
924             environment_setup = ""):
925         if launch and (testbed_id == None or testbed_version == None):
926             raise RuntimeError("To launch a TesbedControllerServer a "
927                     "testbed_id and testbed_version are required")
928         super(TestbedControllerProxy,self).__init__(
929             ctor_args = (root_dir, log_level, testbed_id, testbed_version),
930             root_dir = root_dir,
931             launch = launch, host = host, port = port, user = user,
932             ident_key = ident_key, agent = agent, 
933             environment_setup = environment_setup)
934
935     locals().update( BaseProxy._make_stubs(
936         server_class = TestbedControllerServer,
937         template_class = nepi.core.execute.TestbedController,
938     ) )
939     
940     # Shutdown stops the serverside...
941     def shutdown(self, _stub = shutdown):
942         rv = _stub(self)
943         self._client.send_stop()
944         self._client.read_reply() # wait for it
945         return rv
946     
947
948 class ExperimentControllerProxy(BaseProxy):
949     _ServerClass = ExperimentControllerServer
950     
951     def __init__(self, root_dir, log_level, experiment_xml = None, 
952             launch = True, host = None, port = None, user = None, 
953             ident_key = None, agent = None, environment_setup = ""):
954         if launch and experiment_xml is None:
955             raise RuntimeError("To launch a ExperimentControllerServer a \
956                     xml description of the experiment is required")
957         super(ExperimentControllerProxy,self).__init__(
958             ctor_args = (root_dir, log_level, experiment_xml),
959             root_dir = root_dir,
960             launch = launch, host = host, port = port, user = user,
961             ident_key = ident_key, agent = agent, 
962             environment_setup = environment_setup)
963
964     locals().update( BaseProxy._make_stubs(
965         server_class = ExperimentControllerServer,
966         template_class = nepi.core.execute.ExperimentController,
967     ) )
968
969     
970     # Shutdown stops the serverside...
971     def shutdown(self, _stub = shutdown):
972         rv = _stub(self)
973         self._client.send_stop()
974         self._client.read_reply() # wait for it
975         return rv
976