Deferred versions of proxied methods automatically injected that return deferred...
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
9 import getpass
10 import cPickle
11 import sys
12 import time
13 import tempfile
14 import shutil
15 import functools
16
# PROTOCOL REPLIES
# First "|"-separated field of every reply sent back to a client.
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# First "|"-separated field of every request. These integers travel on the
# wire between proxies and servers, so existing values must not be
# renumbered (3 is currently unassigned).
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
DO_PRESTART = 37
GET_FACTORY_ID = 38
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40
TRACES_INFO = 41

# Human readable name of every protocol code above; used when logging
# messages and replies. Every code must have an entry here, otherwise the
# corresponding log line is lost.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    GET: "GET",
    SET: "SET",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    RECOVER: "RECOVER",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    # previously missing, which made DO_PRESTART requests vanish from the log
    DO_PRESTART: "DO_PRESTART",
    GET_FACTORY_ID: "GET_FACTORY_ID",
    GET_TESTBED_ID: "GET_TESTBED_ID",
    GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
    TRACES_INFO: "TRACES_INFO",
}
104
def log_msg(server, params):
    """
    Write a debug-level trace of an incoming protocol message.

    server is the object doing the logging (its log_debug method is used
    and its class name prefixes the line). params is the message already
    split into fields: params[0] is the numeric instruction code and the
    remaining fields are its arguments, logged verbatim.

    Logging is best-effort: any ordinary error while formatting or
    writing the entry is swallowed so that it can never break request
    handling. KeyboardInterrupt and SystemExit are left to propagate.
    """
    try:
        instr = int(params[0])
        # Codes without a known name are still logged, by number
        instr_txt = instruction_text.get(instr, instr)
        server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
            instr_txt, ", ".join(map(str, params[1:]))))
    except Exception:
        # don't die for logging
        pass
114
def log_reply(server, reply):
    """
    Write a debug-level trace of an outgoing protocol reply.

    reply is the raw reply string, "<code>|<payload>". The code is
    translated to its name and the payload is base64-decoded when
    possible, otherwise logged as-is.

    Logging is best-effort: if the reply cannot be parsed at all, the raw
    reply is logged instead. KeyboardInterrupt and SystemExit are left to
    propagate.
    """
    try:
        res = reply.split("|")
        code_txt = instruction_text[int(res[0])]
        try:
            txt = base64.b64decode(res[1])
        except Exception:
            # payload is not valid base64: show it verbatim
            txt = res[1]
        server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
                code_txt, txt))
    except Exception:
        # don't die for logging: fall back to dumping the raw reply
        server.log_debug("%s - reply: %s" % (server.__class__.__name__,
                reply))
131
def to_server_log_level(log_level):
    """
    Translate a deployment-configuration log level into the matching
    nepi.util.server log level.

    Only DC.DEBUG_LEVEL maps to server.DEBUG_LEVEL; every other value
    maps to server.ERROR_LEVEL.
    """
    if log_level == DC.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
138
def get_access_config_params(access_config):
    """
    Extract from an access configuration the values needed to build a
    proxy.

    Returns the tuple
    (root_dir, log_level, user, host, port, key, agent, environment_setup)
    where log_level is already converted with to_server_log_level, and
    environment_setup is None when the configuration does not have that
    attribute. user, host, port, key and agent are only read when the
    communication mode is DC.ACCESS_SSH; otherwise they are all None.
    """
    value_of = access_config.get_attribute_value

    root_dir = value_of(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(value_of(DC.LOG_LEVEL))
    communication = value_of(DC.DEPLOYMENT_COMMUNICATION)

    environment_setup = None
    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = value_of(DC.DEPLOYMENT_ENVIRONMENT_SETUP)

    if communication == DC.ACCESS_SSH:
        user = value_of(DC.DEPLOYMENT_USER)
        host = value_of(DC.DEPLOYMENT_HOST)
        port = value_of(DC.DEPLOYMENT_PORT)
        agent = value_of(DC.USE_AGENT)
        key = value_of(DC.DEPLOYMENT_KEY)
    else:
        # ssh-specific settings are meaningless for other modes
        user = host = port = agent = key = None

    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
157
class AccessConfiguration(AttributesMap):
    """
    Attribute map pre-populated with every deployment attribute listed in
    Metadata.DEPLOYMENT_ATTRIBUTES.

    params, when given, is a mapping attribute name -> raw value; each
    value is converted with the parser registered in
    Attribute.type_parsers for the attribute's type before being set.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # imported locally, not at module level
        # NOTE(review): presumably to avoid a circular import - confirm
        from nepi.core.metadata import Metadata

        for _unused, attribute_info in Metadata.DEPLOYMENT_ATTRIBUTES:
            self.add_attribute(**attribute_info)

        if not params:
            return

        for name, raw_value in params.iteritems():
            attribute_type = self.get_attribute_type(name)
            parse = Attribute.type_parsers[attribute_type]
            self.set_attribute_value(name, parse(raw_value))
172
class TempDir(object):
    """
    Owns a freshly created temporary directory, exposed as ``path``, and
    removes it recursively when the object is destroyed.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # __del__ also runs on a half-built instance (mkdtemp raised), in
        # which case there is no path to clean up.
        path = getattr(self, 'path', None)
        if path is None:
            return
        try:
            # The directory may already be gone; cleanup is best-effort.
            shutil.rmtree(path, ignore_errors=True)
        except Exception:
            # At interpreter shutdown module globals may already have been
            # torn down; nothing useful can be done from a destructor.
            pass
179
class PermDir(object):
    """
    Plain holder for a caller-supplied directory, exposed as ``path``.

    Offers the same ``path`` attribute as TempDir but never creates or
    removes anything on disk.
    """
    def __init__(self, path):
        # stored verbatim, no validation
        self.path = path
183
def create_experiment_controller(xml, access_config = None):
    """
    Build an experiment controller for the given experiment xml,
    according to the deployment mode found in access_config.

    - No access_config, no mode, or DC.MODE_SINGLE_PROCESS: an in-process
      ExperimentController is returned. Its root directory is the
      configured DC.ROOT_DIRECTORY when present, or a temporary directory
      otherwise.
    - DC.MODE_DAEMON: an ExperimentControllerProxy is returned, launching
      the server unless DC.RECOVER is set in access_config.

    Raises ValueError when recovery is requested in single-process mode,
    and RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))

        from nepi.core.execute import ExperimentController

        if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        return controller
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
                agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
214
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller for the given testbed id and version,
    according to the deployment mode found in access_config.

    - No access_config, no mode, or DC.MODE_SINGLE_PROCESS: the testbed's
      own TestbedController is instantiated in-process.
    - DC.MODE_DAEMON: a TestbedControllerProxy is returned, launching the
      server unless DC.RECOVER is set in access_config.

    Raises ValueError when recovery is requested in single-process mode,
    and RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port, ident_key = key,
                user = user, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
232
233 def _build_testbed_controller(testbed_id, testbed_version):
234     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
235     if not mod_name in sys.modules:
236         __import__(mod_name)
237     module = sys.modules[mod_name]
238     return module.TestbedController(testbed_version)
239
240 # Just a namespace class
class Marshalling:
    """
    Namespace grouping everything needed to move method calls over the
    "|"-separated text protocol:

    - Decoders / Encoders: matching string <-> value converters. Every
      decoder is also reachable directly as Marshalling.<name> so it can
      be used as a "type" in the decorators below.
    - args / retval / retvoid / handles: decorators that turn a plain
      method into a protocol handler, recording on it the metadata
      (_argtypes, _argencoders, _retval, _handles_command) that servers
      and proxies read.
    """
    class Decoders:
        """Converters from a received string field to a value."""
        @staticmethod
        def pickled_data(sdata):
            # SECURITY(review): unpickling can execute arbitrary code, so
            # this must only ever be fed data coming from a trusted peer.
            return cPickle.loads(base64.b64decode(sdata))

        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)

        @staticmethod
        def nullint(sdata):
            return None if sdata == "None" else int(sdata)

        @staticmethod
        def bool(sdata):
            return sdata == 'True'

    class Encoders:
        """Converters from a value to the string sent on the wire."""
        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))

        @staticmethod
        def base64_data(data):
            return base64.b64encode(data)

        @staticmethod
        def nullint(data):
            # always a string, like every other encoder in this class
            return "None" if data is None else str(int(data))

        @staticmethod
        def bool(data):
            # "bool" here is the builtin: class-level names are not
            # visible from inside a method body
            return str(bool(data))

    # import into Marshalling all the decoders
    # they act as types
    # (vars() yields the raw staticmethod objects, so they keep behaving
    # as static methods when looked up on Marshalling)
    for _name, _decoder in vars(Decoders).items():
        if not _name.startswith('_'):
            locals()[_name] = _decoder

    # type name -> (<encoding_function>, <formatting_string>)
    # for every decoder that has an encoder of the same name
    _TYPE_ENCODERS = {}
    for _name in vars(Decoders):
        if not _name.startswith('_') and hasattr(Encoders, _name):
            _TYPE_ENCODERS[_name] = (getattr(Encoders, _name), "%s")

    # keep the loop variables out of the class namespace
    del _name, _decoder

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")

    # Generic encoder
    _TYPE_ENCODERS[None] = (str, "%s")

    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).

        params[0] (the command code) is skipped; the following fields are
        paired, in order, with the given types. Fields beyond the declared
        types are ignored.

        The first argument (self) is left untouched.

        The decorated function gets the attributes _argtypes (the types),
        _argencoders (one (encoder, format) pair per type, used by the
        proxy side to build command strings) and _retval.

        eg:

        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])

            rv._argtypes = types

            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            # Types without a registered (or any) name get the generic one.
            TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
            generic = TYPE_ENCODERS[None]
            rv._argencoders = tuple(
                TYPE_ENCODERS.get(getattr(typ, '__name__', None), generic)
                for typ in types
            )

            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that
        returns a properly encoded return string, given that the undecorated
        function returns suitable input for the encoding function.

        The reply has the form "<OK>|<encoded value>".

        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.

        eg:

        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(int)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            getattr(typ, '__name__', None),
            Marshalling._TYPE_ENCODERS[None])
        fmt = "%d|"+fmt

        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor

    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that
        always returns an OK reply with an empty payload, discarding
        whatever the undecorated function returns.

        Useful for null-returning functions.
        """
        OKRV = "%d|" % (OK,)

        @functools.wraps(f)
        def rv(self, *p, **kw):
            f(self, *p, **kw)
            return OKRV

        rv._retval = None
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv

    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers.
        It should always be the topmost decorator.
        """
        def decor(f):
            f._handles_command = whichcommand
            return f
        return decor
409
class BaseServer(server.Server):
    """
    Server that dispatches "|"-separated protocol messages to the methods
    of the concrete subclass.

    A method handles an instruction when it is public (its name does not
    start with "_"), is defined directly on the instance's own class, and
    carries a matching _handles_command attribute (see
    Marshalling.handles).
    """
    def reply_action(self, msg):
        """
        Process one request and return the reply string.

        msg is "<instruction>|<arg>|...". The reply is whatever the
        handler returns, or "<ERROR>|<base64 message>" when the message is
        empty, the instruction is not a number, no handler exists for it,
        or the handler raises. This method itself never raises for those
        cases, so the client always gets an answer.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            try:
                instruction = int(params[0])
            except ValueError:
                # malformed request: answer with an error instead of
                # letting the exception escape the server loop
                error = "Invalid instruction %s" % params[0]
                self.log_error(error)
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
            else:
                log_msg(self, params)
                try:
                    for mname,meth in vars(self.__class__).iteritems():
                        if not mname.startswith('_'):
                            cmd = getattr(meth, '_handles_command', None)
                            if cmd == instruction:
                                # re-fetch through the instance to get a
                                # bound method
                                meth = getattr(self, mname)
                                reply = meth(params)
                                break
                    else:
                        error = "Invalid instruction %s" % instruction
                        self.log_error(error)
                        result = base64.b64encode(error)
                        reply = "%d|%s" % (ERROR, result)
                except:
                    # Deliberate catch-all at the protocol boundary: any
                    # failure in a handler must be reported to the client
                    # as an ERROR reply rather than leave it waiting.
                    error = self.log_error()
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
438
class TestbedControllerServer(BaseServer):
    """
    Server side of the testbed controller protocol.

    Every method decorated with Marshalling.handles(<code>) is the handler
    for that instruction: Marshalling.args decodes the message fields into
    the method's arguments, the call is forwarded to the wrapped testbed
    controller (self._testbed), and Marshalling.retval / retvoid encode
    the reply.
    """
    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # built later, in post_daemonize
        self._testbed = None

    def post_daemonize(self):
        # Instantiates the actual testbed controller.
        # NOTE(review): presumably invoked by server.Server once the
        # process has been daemonized - confirm against nepi.util.server
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        # guids is read as an attribute of the testbed, not called
        return self._testbed.guids

    @Marshalling.handles(TESTBED_ID)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_id(self):
        return str(self._testbed.testbed_id)

    @Marshalling.handles(TESTBED_VERSION)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_version(self):
        return str(self._testbed.testbed_version)

    @Marshalling.handles(CREATE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_create(self, guid, factory_id):
        self._testbed.defer_create(guid, factory_id)

    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        # NOTE(review): unlike ExperimentControllerServer.trace, the result
        # is not wrapped in str() before being base64-encoded - confirm the
        # testbed always returns a string here
        return self._testbed.trace(guid, trace_id, attribute)

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._testbed.traces_info()

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._testbed.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._testbed.stop()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._testbed.shutdown()

    @Marshalling.handles(CONFIGURE)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_configure(self, name, value):
        self._testbed.defer_configure(name, value)

    @Marshalling.handles(CREATE_SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_create_set(self, guid, name, value):
        self._testbed.defer_create_set(guid, name, value)

    @Marshalling.handles(FACTORY_SET)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_factory_set(self, name, value):
        self._testbed.defer_factory_set(name, value)

    @Marshalling.handles(CONNECT)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)

    @Marshalling.handles(CROSS_CONNECT)
    @Marshalling.args(int, str, int, int, str, str, str)
    @Marshalling.retvoid
    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)

    @Marshalling.handles(ADD_TRACE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_add_trace(self, guid, trace_id):
        self._testbed.defer_add_trace(guid, trace_id)

    @Marshalling.handles(ADD_ADDRESS)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_add_address(self, guid, address, netprefix, broadcast):
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)

    @Marshalling.handles(ADD_ROUTE)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_add_route(self, guid, destination, netprefix, nexthop):
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)

    @Marshalling.handles(DO_SETUP)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_setup(self):
        self._testbed.do_setup()

    @Marshalling.handles(DO_CREATE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_create(self):
        self._testbed.do_create()

    @Marshalling.handles(DO_CONNECT_INIT)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_init(self):
        self._testbed.do_connect_init()

    @Marshalling.handles(DO_CONNECT_COMPL)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_compl(self):
        self._testbed.do_connect_compl()

    @Marshalling.handles(DO_CONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_configure(self):
        self._testbed.do_configure()

    @Marshalling.handles(DO_PRECONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_preconfigure(self):
        self._testbed.do_preconfigure()

    @Marshalling.handles(DO_PRESTART)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_prestart(self):
        self._testbed.do_prestart()

    @Marshalling.handles(DO_CROSS_CONNECT_INIT)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_init(self, cross_data):
        self._testbed.do_cross_connect_init(cross_data)

    @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_compl(self, cross_data):
        self._testbed.do_cross_connect_compl(cross_data)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        # "time" arrives as a plain string and is passed on undecoded
        return self._testbed.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._testbed.set(guid, name, value, time)

    @Marshalling.handles(GET_ADDRESS)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_address(self, guid, index, attribute):
        return str(self._testbed.get_address(guid, index, attribute))

    @Marshalling.handles(GET_ROUTE)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_route(self, guid, index, attribute):
        return str(self._testbed.get_route(guid, index, attribute))

    @Marshalling.handles(ACTION)
    @Marshalling.args(str, int, Marshalling.base64_data)
    @Marshalling.retvoid
    def action(self, time, guid, command):
        self._testbed.action(time, guid, command)

    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        # guid may be None: nullint decodes the literal "None" as None
        return self._testbed.status(guid)

    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid):
        return self._testbed.get_attribute_list(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._testbed.get_factory_id(guid)
661
class ExperimentControllerServer(BaseServer):
    """
    Server side of the experiment controller protocol.

    Every method decorated with Marshalling.handles(<code>) is the handler
    for that instruction: Marshalling.args decodes the message fields into
    the method's arguments, the call is forwarded to the wrapped
    ExperimentController (self._experiment), and Marshalling.retval /
    retvoid encode the reply.
    """
    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # built later, in post_daemonize
        self._experiment = None

    def post_daemonize(self):
        # Instantiates the actual experiment controller.
        # NOTE(review): presumably invoked by server.Server once the
        # process has been daemonized - confirm against nepi.util.server
        from nepi.core.execute import ExperimentController
        self._experiment = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        # guids is read as an attribute of the experiment, not called
        return self._experiment.guids

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_xml(self):
        return self._experiment.experiment_xml
        
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        return str(self._experiment.trace(guid, trace_id, attribute))

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._experiment.traces_info()

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        return self._experiment.is_finished(guid)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        # "time" arrives as a plain string and is passed on undecoded
        return self._experiment.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._experiment.set(guid, name, value, time)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._experiment.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._experiment.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._experiment.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._experiment.shutdown()

    @Marshalling.handles(GET_TESTBED_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_id(self, guid):
        return self._experiment.get_testbed_id(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._experiment.get_factory_id(guid)

    @Marshalling.handles(GET_TESTBED_VERSION)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_version(self, guid):
        return self._experiment.get_testbed_version(guid)
756
757 class BaseProxy(object):
758     _ServerClass = None
759     _ServerClassModule = "nepi.util.proxy"
760     
761     def __init__(self, 
762             ctor_args, root_dir, 
763             launch = True, host = None, 
764             port = None, user = None, ident_key = None, agent = None,
765             environment_setup = ""):
766         if launch:
767             # ssh
768             if host != None:
769                 python_code = (
770                     "from %(classmodule)s import %(classname)s;"
771                     "s = %(classname)s%(ctor_args)r;"
772                     "s.run()" 
773                 % dict(
774                     classname = self._ServerClass.__name__,
775                     classmodule = self._ServerClassModule,
776                     ctor_args = ctor_args
777                 ) )
778                 proc = server.popen_ssh_subprocess(python_code, host = host,
779                     port = port, user = user, agent = agent,
780                     ident_key = ident_key,
781                     environment_setup = environment_setup,
782                     waitcommand = True)
783                 if proc.poll():
784                     err = proc.stderr.read()
785                     raise RuntimeError, "Server could not be executed: %s" % (err,)
786             else:
787                 # launch daemon
788                 s = self._ServerClass(*ctor_args)
789                 s.run()
790
791         # connect client to server
792         self._client = server.Client(root_dir, host = host, port = port, 
793                 user = user, agent = agent, 
794                 environment_setup = environment_setup)
795     
796     @staticmethod
797     def _make_message(argtypes, argencoders, command, methname, classname, *args):
798         if len(argtypes) != len(argencoders):
799             raise ValueError, "Invalid arguments for _make_message: "\
800                 "in stub method %s of class %s "\
801                 "argtypes and argencoders must match in size" % (
802                     methname, classname )
803         if len(argtypes) != len(args):
804             raise ValueError, "Invalid arguments for _make_message: "\
805                 "in stub method %s of class %s "\
806                 "expected %d arguments, got %d" % (
807                     methname, classname,
808                     len(argtypes), len(args))
809         
810         buf = []
811         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
812             try:
813                 buf.append(fmt % encode(val))
814             except:
815                 import traceback
816                 raise TypeError, "Argument %d of stub method %s of class %s "\
817                     "requires a value of type %s, but got %s - nested error: %s" % (
818                         argnum, methname, classname,
819                         getattr(typ, '__name__', typ), type(val),
820                         traceback.format_exc()
821                 )
822         
823         return "%d|%s" % (command, '|'.join(buf))
824     
825     @staticmethod
826     def _parse_reply(rvtype, methname, classname, reply):
827         if not reply:
828             raise RuntimeError, "Invalid reply: %r "\
829                 "for stub method %s of class %s" % (
830                     reply,
831                     methname,
832                     classname)
833         
834         try:
835             result = reply.split("|")
836             code = int(result[0])
837             text = result[1]
838         except:
839             import traceback
840             raise TypeError, "Return value of stub method %s of class %s "\
841                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
842                     methname, classname,
843                     getattr(rvtype, '__name__', rvtype), reply,
844                     traceback.format_exc()
845             )
846         if code == ERROR:
847             text = base64.b64decode(text)
848             raise RuntimeError(text)
849         elif code == OK:
850             try:
851                 if rvtype is None:
852                     return
853                 else:
854                     return rvtype(text)
855             except:
856                 import traceback
857                 raise TypeError, "Return value of stub method %s of class %s "\
858                     "cannot be parsed: must be of type %s - nested error: %s" % (
859                         methname, classname,
860                         getattr(rvtype, '__name__', rvtype),
861                         traceback.format_exc()
862                 )
863         else:
864             raise RuntimeError, "Invalid reply: %r "\
865                 "for stub method %s of class %s - unknown code" % (
866                     reply,
867                     methname,
868                     classname)
869     
870     @staticmethod
871     def _make_stubs(server_class, template_class):
872         """
873         Returns a dictionary method_name -> method
874         with stub methods.
875         
876         Usage:
877         
878             class SomeProxy(BaseProxy):
879                ...
880                
881                locals().update( BaseProxy._make_stubs(
882                     ServerClass,
883                     TemplateClass
884                ) )
885         
886         ServerClass is the corresponding Server class, as
887         specified in the _ServerClass class method (_make_stubs
888         is static and can't access the method), and TemplateClass
889         is the ultimate implementation class behind the server,
890         from which argument names and defaults are taken, to
891         maintain meaningful interfaces.
892         """
893         rv = {}
894         
895         class NONE: pass
896         
897         import os.path
898         func_template_path = os.path.join(
899             os.path.dirname(__file__),
900             'proxy_stub.tpl')
901         func_template_file = open(func_template_path, "r")
902         func_template = func_template_file.read()
903         func_template_file.close()
904         
905         for methname in vars(template_class).copy():
906             if methname.endswith('_deferred'):
907                 # cannot wrap deferreds...
908                 continue
909             dmethname = methname+'_deferred'
910             if hasattr(server_class, methname) and not methname.startswith('_'):
911                 template_meth = getattr(template_class, methname)
912                 server_meth = getattr(server_class, methname)
913                 
914                 command = getattr(server_meth, '_handles_command', None)
915                 argtypes = getattr(server_meth, '_argtypes', None)
916                 argencoders = getattr(server_meth, '_argencoders', None)
917                 rvtype = getattr(server_meth, '_retval', None)
918                 doprop = False
919                 
920                 if hasattr(template_meth, 'fget'):
921                     # property getter
922                     template_meth = template_meth.fget
923                     doprop = True
924                 
925                 if command is not None and argtypes is not None and argencoders is not None:
926                     # We have an interface method...
927                     code = template_meth.func_code
928                     argnames = code.co_varnames[:code.co_argcount]
929                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
930                                   + (template_meth.func_defaults or ()) )
931                     
932                     func_globals = dict(
933                         BaseProxy = BaseProxy,
934                         argtypes = argtypes,
935                         argencoders = argencoders,
936                         rvtype = rvtype,
937                         functools = functools,
938                     )
939                     context = dict()
940                     
941                     func_text = func_template % dict(
942                         self = argnames[0],
943                         args = '%s' % (','.join(argnames[1:])),
944                         argdefs = ','.join([
945                             argname if argdef is NONE
946                             else "%s=%r" % (argname, argdef)
947                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
948                         ]),
949                         command = command,
950                         methname = methname,
951                         classname = server_class.__name__
952                     )
953                     
954                     func_text = compile(
955                         func_text,
956                         func_template_path,
957                         'exec')
958                     
959                     exec func_text in func_globals, context
960                     
961                     if doprop:
962                         rv[methname] = property(context[methname])
963                         rv[dmethname] = property(context[dmethname])
964                     else:
965                         rv[methname] = context[methname]
966                         rv[dmethname] = context[dmethname]
967                     
968                     # inject _deferred into core classes
969                     if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
970                         setattr(template_class, dmethname, 
971                             getattr(template_class, methname))
972         
973         return rv
974                         
class TestbedControllerProxy(BaseProxy):
    """
    Proxy whose methods are stubs generated from
    TestbedControllerServer, with signatures taken from
    nepi.core.execute.TestbedController.
    
    When launch is true, both testbed_id and testbed_version
    must be given.
    """
    
    _ServerClass = TestbedControllerServer
    
    def __init__(self, root_dir, log_level, testbed_id = None, 
            testbed_version = None, launch = True, host = None, 
            port = None, user = None, ident_key = None, agent = None,
            environment_setup = ""):
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        super(TestbedControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, testbed_id, testbed_version),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    # Inject the generated stubs into the class namespace.
    # This must come before the shutdown override below, which
    # picks up the generated shutdown stub by name.
    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )
    
    # Shutdown stops the serverside...
    # The generated stub is kept as the _stub default argument,
    # since this definition replaces it in the class namespace.
    def shutdown(self, _stub = shutdown):
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1004     
1005
class ExperimentControllerProxy(BaseProxy):
    """
    Proxy whose methods are stubs generated from
    ExperimentControllerServer, with signatures taken from
    nepi.core.execute.ExperimentController.
    
    When launch is true, experiment_xml must be given.
    """
    _ServerClass = ExperimentControllerServer
    
    def __init__(self, root_dir, log_level, experiment_xml = None, 
            launch = True, host = None, port = None, user = None, 
            ident_key = None, agent = None, environment_setup = ""):
        if launch and experiment_xml is None:
            # Adjacent literals are used instead of a backslash
            # continuation, which would embed the indentation of
            # the second line into the message
            raise RuntimeError("To launch a ExperimentControllerServer a "
                    "xml description of the experiment is required")
        super(ExperimentControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, experiment_xml),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    # Inject the generated stubs into the class namespace.
    # This must come before the shutdown override below, which
    # picks up the generated shutdown stub by name.
    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentControllerServer,
        template_class = nepi.core.execute.ExperimentController,
    ) )

    
    # Shutdown stops the serverside...
    # The generated stub is kept as the _stub default argument,
    # since this definition replaces it in the class namespace.
    def shutdown(self, _stub = shutdown):
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1034