3a64ecd9d8d7daddbb772fd4dd2d707399b9d9ac
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
9 import getpass
10 import cPickle
11 import sys
12 import time
13 import tempfile
14 import shutil
15 import functools
16
# Protocol reply codes: first field of every reply string.
OK = 0
ERROR = 1

# Protocol instruction codes: first field of every request string.
# Note that the value 3 is not assigned to any instruction.
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
EXPERIMENT_SET = 37
EXPERIMENT_GET = 38
DO_PRESTART = 39
GET_TAGS = 40
60
# Human-readable names for every protocol code, used only for logging
# (see log_msg / log_reply). Every instruction constant must have an entry
# here: a missing entry makes the log helpers fail with KeyError, which they
# swallow, so the message would silently go unlogged.
instruction_text = {
    OK:     "OK",
    ERROR:  "ERROR",
    XML:    "XML",
    TRACE:  "TRACE",
    FINISHED:   "FINISHED",
    START:  "START",
    STOP:   "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN:   "SHUTDOWN",
    CONFIGURE:  "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET:    "FACTORY_SET",
    CONNECT:    "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE:  "ADD_TRACE",
    ADD_ADDRESS:    "ADD_ADDRESS",
    ADD_ROUTE:  "ADD_ROUTE",
    DO_SETUP:   "DO_SETUP",
    DO_CREATE:  "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE:   "DO_CONFIGURE",
    DO_PRECONFIGURE:   "DO_PRECONFIGURE",
    DO_PRESTART:    "DO_PRESTART",
    DO_CROSS_CONNECT_INIT:  "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET:    "GET",
    SET:    "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS:  "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    EXPERIMENT_SET: "EXPERIMENT_SET",
    EXPERIMENT_GET: "EXPERIMENT_GET",
    GET_TAGS: "GET_TAGS",
}
102
def log_msg(server, params):
    """
    Log an incoming protocol message at debug level.

    params is the already-split message: params[0] is the instruction
    code and the remaining items are its (still encoded) arguments.

    Logging is best-effort: any ordinary error (unknown instruction code,
    malformed params, failing logger) is ignored. KeyboardInterrupt and
    SystemExit are not ordinary errors and are allowed to propagate.
    """
    try:
        instr = int(params[0])
        instr_txt = instruction_text[instr]
        server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
            instr_txt, ", ".join(map(str, params[1:]))))
    except Exception:
        # don't die for logging
        pass
112
def log_reply(server, reply):
    """
    Log an outgoing protocol reply at debug level.

    A reply has the form "<code>|<payload>". The code is translated through
    instruction_text and the payload is shown base64-decoded when it can be
    decoded, or raw otherwise. If the reply cannot be parsed at all, it is
    logged verbatim instead.

    Only ordinary errors trigger the fallbacks; KeyboardInterrupt and
    SystemExit propagate to the caller.
    """
    try:
        res = reply.split("|")
        code = int(res[0])
        code_txt = instruction_text[code]
        try:
            txt = base64.b64decode(res[1])
        except Exception:
            # payload is not base64 (e.g. plain int/bool replies): show as-is
            txt = res[1]
        server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
                code_txt, txt))
    except Exception:
        # don't die for logging: fall back to dumping the unparsed reply
        server.log_debug("%s - reply: %s" % (server.__class__.__name__,
                reply))
129
def to_server_log_level(log_level):
    """
    Translate a deployment-configuration log level into the log level
    constant used by the server module.

    DC.DEBUG_LEVEL maps to server.DEBUG_LEVEL; every other value maps to
    server.ERROR_LEVEL.
    """
    if log_level == DC.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
136
def get_access_config_params(access_config):
    """
    Extract the connection parameters from an access configuration.

    Returns the tuple
        (root_dir, log_level, user, host, port, key, agent, environment_setup)
    where log_level is already converted to a server log level. The ssh
    related fields (user, host, port, key, agent) are None unless the
    configured communication is DC.ACCESS_SSH, and environment_setup is
    None when the configuration does not define that attribute.
    """
    value_of = access_config.get_attribute_value

    root_dir = value_of(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(value_of(DC.LOG_LEVEL))
    communication = value_of(DC.DEPLOYMENT_COMMUNICATION)

    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = value_of(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
    else:
        environment_setup = None

    if communication != DC.ACCESS_SSH:
        # local access: no ssh parameters apply
        return (root_dir, log_level, None, None, None, None, None,
                environment_setup)

    user = value_of(DC.DEPLOYMENT_USER)
    host = value_of(DC.DEPLOYMENT_HOST)
    port = value_of(DC.DEPLOYMENT_PORT)
    agent = value_of(DC.USE_AGENT)
    key = value_of(DC.DEPLOYMENT_KEY)
    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
155
class AccessConfiguration(AttributesMap):
    """
    Attribute map pre-populated with every deployment attribute declared in
    Metadata.DEPLOYMENT_ATTRIBUTES.

    params, when given, is a mapping of attribute name -> raw value; each
    raw value is converted with the parser registered for the attribute's
    type before being stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # imported here rather than at module level, as in the original code
        from nepi.core.metadata import Metadata

        for _name, attr_info in Metadata.DEPLOYMENT_ATTRIBUTES:
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.iteritems():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
170
class TempDir(object):
    """
    Owner of a freshly created temporary directory.

    The directory is created on construction and removed, with all its
    contents, when the object is garbage collected. The location is
    available as the `path` attribute.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self, _rmtree=shutil.rmtree):
        # _rmtree is bound at definition time because module globals such as
        # `shutil` may already be cleared when __del__ runs at interpreter
        # shutdown.
        # `path` is missing if mkdtemp() itself failed in __init__.
        path = getattr(self, 'path', None)
        if path is not None:
            # Cleanup is best-effort: the directory may already be gone.
            _rmtree(path, ignore_errors=True)
177
class PermDir(object):
    """
    Holder for a caller-supplied directory path.

    Exposes the same `path` attribute as TempDir, but never creates or
    removes anything on disk.
    """
    def __init__(self, path):
        self.path = path
181
def create_controller(xml, access_config = None):
    """
    Build an experiment controller for the given experiment XML.

    access_config selects how the controller is instantiated:

    * no configuration, no mode, or DC.MODE_SINGLE_PROCESS: an in-process
      ExperimentController is returned. Its root directory is the configured
      DC.ROOT_DIRECTORY, or a temporary directory when none is configured.
    * DC.MODE_DAEMON: an ExperimentControllerProxy is returned.

    Raises ValueError when recovery (DC.RECOVER) is requested in
    single-process mode, and RuntimeError for any other mode.
    """
    mode = None if not access_config \
            else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
    # "launch" means start a new controller, as opposed to recovering one
    launch = True if not access_config \
            else not access_config.get_attribute_value(DC.RECOVER)
    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError("Unsupported instantiation mode: %s with launch=False" % (mode,))

        from nepi.core.execute import ExperimentController

        if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        return controller
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
                agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
212
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller for the given testbed id and version.

    access_config selects how the controller is instantiated:

    * no configuration, no mode, or DC.MODE_SINGLE_PROCESS: the testbed's
      own TestbedController is built in-process.
    * DC.MODE_DAEMON: a TestbedControllerProxy is returned.

    Raises ValueError when recovery (DC.RECOVER) is requested in
    single-process mode, and RuntimeError for any other mode.
    """
    mode = None if not access_config \
            else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
    # "launch" means start a new controller, as opposed to recovering one
    launch = True if not access_config \
            else not access_config.get_attribute_value(DC.RECOVER)
    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError("Unsupported instantiation mode: %s with launch=False" % (mode,))
        return  _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port, ident_key = key,
                user = user, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
230
231 def _build_testbed_controller(testbed_id, testbed_version):
232     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
233     if not mod_name in sys.modules:
234         __import__(mod_name)
235     module = sys.modules[mod_name]
236     return module.TestbedController(testbed_version)
237
238 # Just a namespace class
class Marshalling:
    """
    Namespace for the wire-format helpers of the proxy protocol.

    Messages are "|"-separated strings. Decoders turn a received string
    field into a value, Encoders turn a value into a string field, and the
    decorators (args, retval, retvoid, handles) annotate server methods so
    that they can be dispatched by command code and so that proxies can
    derive matching client stubs from the recorded metadata
    (_argtypes, _argencoders, _retval, _handles_command).
    """
    class Decoders:
        # Each decoder takes the string received on the wire and returns
        # the decoded value. They double as "type" markers for args/retval.
        @staticmethod
        def pickled_data(sdata):
            # NOTE(review): unpickling executes arbitrary code if the peer
            # is not trusted - this protocol must only be used over trusted
            # channels.
            return cPickle.loads(base64.b64decode(sdata))
        
        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)
        
        @staticmethod
        def nullint(sdata):
            # the literal string "None" stands for a missing integer
            return None if sdata == "None" else int(sdata)
        
        @staticmethod
        def bool(sdata):
            return sdata == 'True'
        
    class Encoders:
        # Each encoder is the inverse of the decoder with the same name.
        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))
        
        @staticmethod
        def base64_data(data):
            return base64.b64encode(data)
        
        @staticmethod
        def nullint(data):
            # returns an int (not a string) for non-None values; it is
            # rendered through the "%s" format registered in _TYPE_ENCODERS
            return "None" if data is None else int(data)
        
        @staticmethod
        def bool(data):
            return str(bool(data))
           
    # import into Marshalling all the decoders
    # they act as types
    locals().update([
        (typname, typ)
        for typname, typ in vars(Decoders).iteritems()
        if not typname.startswith('_')
    ])

    _TYPE_ENCODERS = dict([
        # type name -> (<encoding_function>, <formatting_string>)
        (typname, (getattr(Encoders,typname),"%s"))
        for typname in vars(Decoders)
        if not typname.startswith('_')
           and hasattr(Encoders,typname)
    ])

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")
    
    # Generic encoder
    _TYPE_ENCODERS[None] = (str, "%s")
    
    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).
        
        The first argument (self) is left untouched.
        
        params[0] is the command code and is skipped; types are applied
        to params[1:] positionally.
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])
            
            rv._argtypes = types
            
            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            argencoders = []
            TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
            for typ in types:
                if typ.__name__ in TYPE_ENCODERS:
                    argencoders.append(TYPE_ENCODERS[typ.__name__])
                else:
                    # generic encoder
                    argencoders.append(TYPE_ENCODERS[None])
            
            rv._argencoders = tuple(argencoders)
            
            # propagate the return type recorded by retval/retvoid, if any
            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that 
        returns a properly encoded return string, given that the undecorated
        function returns suitable input for the encoding function.
        
        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.
        
        The produced reply has the form "<OK>|<encoded value>".
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(str)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        # look the encoder up by type name, falling back to the generic one
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            typ.__name__,
            Marshalling._TYPE_ENCODERS[None])
        fmt = "%d|"+fmt
        
        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            # propagate argument metadata recorded by args, if any
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor
    
    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that 
        always return an encoded empty string.
        
        Useful for null-returning functions.
        """
        OKRV = "%d|" % (OK,)
        
        @functools.wraps(f)
        def rv(self, *p, **kw):
            f(self, *p, **kw)
            return OKRV
        
        rv._retval = None
        # propagate argument metadata recorded by args, if any
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv
    
    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers.
        It should always be the topmost decorator.
        """
        def decor(f):
            f._handles_command = whichcommand
            return f
        return decor
407
class BaseServer(server.Server):
    """
    Server that dispatches "|"-separated protocol messages to the methods
    of the concrete subclass.

    A method handles an instruction when it is defined directly on the
    concrete class, its name does not start with an underscore, and it was
    tagged with Marshalling.handles(<instruction code>).
    """
    def reply_action(self, msg):
        """
        Process one request string and return the reply string.

        The reply is "<OK>|<payload>" as produced by the handler, or
        "<ERROR>|<base64 error text>" when the message is empty, the
        instruction code is malformed or unknown, or the handler raises.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            try:
                # Parse the instruction code inside the try block, so that a
                # malformed code produces an ERROR reply instead of raising
                # out of the server loop.
                instruction = int(params[0])
                log_msg(self, params)
                for mname,meth in vars(self.__class__).iteritems():
                    if not mname.startswith('_'):
                        cmd = getattr(meth, '_handles_command', None)
                        if cmd == instruction:
                            # fetch the bound method and run the handler
                            meth = getattr(self, mname)
                            reply = meth(params)
                            break
                else:
                    # loop finished without finding a handler
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except:
                # Boundary handler: whatever went wrong is reported back to
                # the client. log_error() is called without arguments and
                # its return value is used as the error text.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
436
class TestbedControllerServer(BaseServer):
    """
    Server side of the testbed controller protocol.

    Every public method below is a thin wrapper that forwards one protocol
    instruction to the wrapped testbed controller (self._testbed). The
    decorators declare, from top to bottom: the instruction code handled,
    the decoders applied to the incoming arguments, and how the result is
    encoded into the reply.
    """
    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # created in post_daemonize, not here
        self._testbed = None

    def post_daemonize(self):
        # NOTE(review): presumably invoked by server.Server once the process
        # has daemonized - confirm against nepi.util.server.
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._testbed.guids

    @Marshalling.handles(TESTBED_ID)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_id(self):
        return str(self._testbed.testbed_id)

    @Marshalling.handles(TESTBED_VERSION)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_version(self):
        return str(self._testbed.testbed_version)

    @Marshalling.handles(CREATE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_create(self, guid, factory_id):
        self._testbed.defer_create(guid, factory_id)

    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        # NOTE(review): unlike get_address/get_route below, the result is
        # not wrapped in str() before base64 encoding - confirm the testbed
        # always returns a string here.
        return self._testbed.trace(guid, trace_id, attribute)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._testbed.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._testbed.stop()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._testbed.shutdown()

    @Marshalling.handles(CONFIGURE)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_configure(self, name, value):
        self._testbed.defer_configure(name, value)

    @Marshalling.handles(CREATE_SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_create_set(self, guid, name, value):
        self._testbed.defer_create_set(guid, name, value)

    @Marshalling.handles(FACTORY_SET)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_factory_set(self, name, value):
        self._testbed.defer_factory_set(name, value)

    @Marshalling.handles(CONNECT)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)

    @Marshalling.handles(CROSS_CONNECT)
    @Marshalling.args(int, str, int, int, str, str, str)
    @Marshalling.retvoid
    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)

    @Marshalling.handles(ADD_TRACE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_add_trace(self, guid, trace_id):
        self._testbed.defer_add_trace(guid, trace_id)

    @Marshalling.handles(ADD_ADDRESS)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_add_address(self, guid, address, netprefix, broadcast):
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)

    @Marshalling.handles(ADD_ROUTE)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_add_route(self, guid, destination, netprefix, nexthop):
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)

    @Marshalling.handles(DO_SETUP)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_setup(self):
        self._testbed.do_setup()

    @Marshalling.handles(DO_CREATE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_create(self):
        self._testbed.do_create()

    @Marshalling.handles(DO_CONNECT_INIT)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_init(self):
        self._testbed.do_connect_init()

    @Marshalling.handles(DO_CONNECT_COMPL)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_compl(self):
        self._testbed.do_connect_compl()

    @Marshalling.handles(DO_CONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_configure(self):
        self._testbed.do_configure()

    @Marshalling.handles(DO_PRECONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_preconfigure(self):
        self._testbed.do_preconfigure()

    @Marshalling.handles(DO_PRESTART)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_prestart(self):
        self._testbed.do_prestart()

    @Marshalling.handles(DO_CROSS_CONNECT_INIT)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_init(self, cross_data):
        self._testbed.do_cross_connect_init(cross_data)

    @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_compl(self, cross_data):
        self._testbed.do_cross_connect_compl(cross_data)

    # In get/set/action below, `time` is a parameter received as a string;
    # inside those methods it shadows the module-level `time` import.
    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._testbed.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._testbed.set(guid, name, value, time)

    @Marshalling.handles(GET_ADDRESS)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_address(self, guid, index, attribute):
        return str(self._testbed.get_address(guid, index, attribute))

    @Marshalling.handles(GET_ROUTE)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_route(self, guid, index, attribute):
        return str(self._testbed.get_route(guid, index, attribute))

    @Marshalling.handles(ACTION)
    @Marshalling.args(str, int, Marshalling.base64_data)
    @Marshalling.retvoid
    def action(self, time, guid, command):
        self._testbed.action(time, guid, command)

    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        # guid may be None (sent on the wire as the string "None")
        return self._testbed.status(guid)

    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid):
        return self._testbed.get_attribute_list(guid)

    @Marshalling.handles(GET_TAGS)
    @Marshalling.args(int)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_tags(self, guid):
        return self._testbed.get_tags(guid)
653
class ExperimentControllerServer(BaseServer):
    """
    Server side of the experiment controller protocol.

    Every public method below forwards one protocol instruction to the
    wrapped ExperimentController (self._controller). The decorators
    declare, from top to bottom: the instruction code handled, the decoders
    applied to the incoming arguments, and how the result is encoded into
    the reply.
    """
    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # created in post_daemonize, not here
        self._controller = None

    def post_daemonize(self):
        # NOTE(review): presumably invoked by server.Server once the process
        # has daemonized - confirm against nepi.util.server.
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_xml(self):
        return self._controller.experiment_xml
        
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, testbed_guid, guid, trace_id, attribute):
        return str(self._controller.trace(testbed_guid, guid, trace_id, attribute))

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        return self._controller.is_finished(guid)

    # In get/set below, `time` is a parameter received as a string; inside
    # those methods it shadows the module-level `time` import.
    @Marshalling.handles(EXPERIMENT_GET)
    @Marshalling.args(int, int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, testbed_guid, guid, name, time):
        return self._controller.get(testbed_guid, guid, name, time)

    @Marshalling.handles(EXPERIMENT_SET)
    @Marshalling.args(int, int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, testbed_guid, guid, name, value, time):
        self._controller.set(testbed_guid, guid, name, value, time)

    @Marshalling.handles(GET_TAGS)
    @Marshalling.args(int, int)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_tags(self, testbed_guid, guid):
        return self._controller.get_tags(testbed_guid, guid)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._controller.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._controller.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._controller.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._controller.shutdown()
724
725 class BaseProxy(object):
726     _ServerClass = None
727     _ServerClassModule = "nepi.util.proxy"
728     
729     def __init__(self, 
730             ctor_args, root_dir, 
731             launch = True, host = None, 
732             port = None, user = None, ident_key = None, agent = None,
733             environment_setup = ""):
734         if launch:
735             # ssh
736             if host != None:
737                 python_code = (
738                     "from %(classmodule)s import %(classname)s;"
739                     "s = %(classname)s%(ctor_args)r;"
740                     "s.run()" 
741                 % dict(
742                     classname = self._ServerClass.__name__,
743                     classmodule = self._ServerClassModule,
744                     ctor_args = ctor_args
745                 ) )
746                 proc = server.popen_ssh_subprocess(python_code, host = host,
747                     port = port, user = user, agent = agent,
748                     ident_key = ident_key,
749                     environment_setup = environment_setup,
750                     waitcommand = True)
751                 if proc.poll():
752                     err = proc.stderr.read()
753                     raise RuntimeError, "Server could not be executed: %s" % (err,)
754             else:
755                 # launch daemon
756                 s = self._ServerClass(*ctor_args)
757                 s.run()
758
759         # connect client to server
760         self._client = server.Client(root_dir, host = host, port = port, 
761                 user = user, agent = agent, 
762                 environment_setup = environment_setup)
763     
764     @staticmethod
765     def _make_message(argtypes, argencoders, command, methname, classname, *args):
766         if len(argtypes) != len(argencoders):
767             raise ValueError, "Invalid arguments for _make_message: "\
768                 "in stub method %s of class %s "\
769                 "argtypes and argencoders must match in size" % (
770                     methname, classname )
771         if len(argtypes) != len(args):
772             raise ValueError, "Invalid arguments for _make_message: "\
773                 "in stub method %s of class %s "\
774                 "expected %d arguments, got %d" % (
775                     methname, classname,
776                     len(argtypes), len(args))
777         
778         buf = []
779         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
780             try:
781                 buf.append(fmt % encode(val))
782             except:
783                 import traceback
784                 raise TypeError, "Argument %d of stub method %s of class %s "\
785                     "requires a value of type %s, but got %s - nested error: %s" % (
786                         argnum, methname, classname,
787                         getattr(typ, '__name__', typ), type(val),
788                         traceback.format_exc()
789                 )
790         
791         return "%d|%s" % (command, '|'.join(buf))
792     
793     @staticmethod
794     def _parse_reply(rvtype, methname, classname, reply):
795         if not reply:
796             raise RuntimeError, "Invalid reply: %r "\
797                 "for stub method %s of class %s" % (
798                     reply,
799                     methname,
800                     classname)
801         
802         try:
803             result = reply.split("|")
804             code = int(result[0])
805             text = result[1]
806         except:
807             import traceback
808             raise TypeError, "Return value of stub method %s of class %s "\
809                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
810                     methname, classname,
811                     getattr(rvtype, '__name__', rvtype), reply,
812                     traceback.format_exc()
813             )
814         if code == ERROR:
815             text = base64.b64decode(text)
816             raise RuntimeError(text)
817         elif code == OK:
818             try:
819                 if rvtype is None:
820                     return
821                 else:
822                     return rvtype(text)
823             except:
824                 import traceback
825                 raise TypeError, "Return value of stub method %s of class %s "\
826                     "cannot be parsed: must be of type %s - nested error: %s" % (
827                         methname, classname,
828                         getattr(rvtype, '__name__', rvtype),
829                         traceback.format_exc()
830                 )
831         else:
832             raise RuntimeError, "Invalid reply: %r "\
833                 "for stub method %s of class %s - unknown code" % (
834                     reply,
835                     methname,
836                     classname)
837     
838     @staticmethod
839     def _make_stubs(server_class, template_class):
840         """
841         Returns a dictionary method_name -> method
842         with stub methods.
843         
844         Usage:
845         
846             class SomeProxy(BaseProxy):
847                ...
848                
849                locals().update( BaseProxy._make_stubs(
850                     ServerClass,
851                     TemplateClass
852                ) )
853         
854         ServerClass is the corresponding Server class, as
855         specified in the _ServerClass class method (_make_stubs
856         is static and can't access the method), and TemplateClass
857         is the ultimate implementation class behind the server,
858         from which argument names and defaults are taken, to
859         maintain meaningful interfaces.
860         """
861         rv = {}
862         
863         class NONE: pass
864         
865         import os.path
866         func_template_path = os.path.join(
867             os.path.dirname(__file__),
868             'proxy_stub.tpl')
869         func_template_file = open(func_template_path, "r")
870         func_template = func_template_file.read()
871         func_template_file.close()
872         
873         for methname in vars(template_class):
874             if hasattr(server_class, methname) and not methname.startswith('_'):
875                 template_meth = getattr(template_class, methname)
876                 server_meth = getattr(server_class, methname)
877                 
878                 command = getattr(server_meth, '_handles_command', None)
879                 argtypes = getattr(server_meth, '_argtypes', None)
880                 argencoders = getattr(server_meth, '_argencoders', None)
881                 rvtype = getattr(server_meth, '_retval', None)
882                 doprop = False
883                 
884                 if hasattr(template_meth, 'fget'):
885                     # property getter
886                     template_meth = template_meth.fget
887                     doprop = True
888                 
889                 if command is not None and argtypes is not None and argencoders is not None:
890                     # We have an interface method...
891                     code = template_meth.func_code
892                     argnames = code.co_varnames[:code.co_argcount]
893                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
894                                   + (template_meth.func_defaults or ()) )
895                     
896                     func_globals = dict(
897                         BaseProxy = BaseProxy,
898                         argtypes = argtypes,
899                         argencoders = argencoders,
900                         rvtype = rvtype,
901                     )
902                     context = dict()
903                     
904                     func_text = func_template % dict(
905                         self = argnames[0],
906                         args = '%s' % (','.join(argnames[1:])),
907                         argdefs = ','.join([
908                             argname if argdef is NONE
909                             else "%s=%r" % (argname, argdef)
910                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
911                         ]),
912                         command = command,
913                         methname = methname,
914                         classname = server_class.__name__
915                     )
916                     
917                     func_text = compile(
918                         func_text,
919                         func_template_path,
920                         'exec')
921                     
922                     exec func_text in func_globals, context
923                     
924                     if doprop:
925                         rv[methname] = property(context[methname])
926                     else:
927                         rv[methname] = context[methname]
928         
929         return rv
930                         
class TestbedControllerProxy(BaseProxy):
    """
    Proxy for a TestbedControllerServer.
    
    The public methods are stubs generated by BaseProxy._make_stubs,
    using nepi.core.execute.TestbedController as the template for
    argument names and defaults.
    """
    
    _ServerClass = TestbedControllerServer
    
    def __init__(self, root_dir, log_level, testbed_id = None, 
            testbed_version = None, launch = True, host = None, 
            port = None, user = None, ident_key = None, agent = None,
            environment_setup = ""):
        """
        Forwards the connection parameters to BaseProxy, passing
        (root_dir, log_level, testbed_id, testbed_version) as
        ctor_args.
        
        Raises RuntimeError if launch is true and either testbed_id
        or testbed_version is None.
        """
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        super(TestbedControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, testbed_id, testbed_version),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    # Inject the generated stubs into the class namespace
    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )
    
    # Shutdown stops the serverside...
    # The generated 'shutdown' stub is captured as the default value
    # of _stub, since this definition replaces it in the class namespace
    def shutdown(self, _stub = shutdown):
        """
        Invokes the generated shutdown stub, then sends a stop through
        the client and reads its reply before returning the value
        returned by the stub.
        """
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
960     
961
class ExperimentControllerProxy(BaseProxy):
    """
    Proxy for an ExperimentControllerServer.
    
    The public methods are stubs generated by BaseProxy._make_stubs,
    using nepi.core.execute.ExperimentController as the template for
    argument names and defaults.
    """
    _ServerClass = ExperimentControllerServer
    
    def __init__(self, root_dir, log_level, experiment_xml = None, 
            launch = True, host = None, port = None, user = None, 
            ident_key = None, agent = None, environment_setup = ""):
        """
        Forwards the connection parameters to BaseProxy, passing
        (root_dir, log_level, experiment_xml) as ctor_args.
        
        Raises RuntimeError if launch is true and experiment_xml
        is None.
        """
        if launch and experiment_xml is None:
            # Adjacent literals instead of a backslash continuation, which
            # would embed the source indentation into the message
            raise RuntimeError("To launch a ExperimentControllerServer a "
                    "xml description of the experiment is required")
        super(ExperimentControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, experiment_xml),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    # Inject the generated stubs into the class namespace
    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentControllerServer,
        template_class = nepi.core.execute.ExperimentController,
    ) )

    
    # Shutdown stops the serverside...
    # The generated 'shutdown' stub is captured as the default value
    # of _stub, since this definition replaces it in the class namespace
    def shutdown(self, _stub = shutdown):
        """
        Invokes the generated shutdown stub, then sends a stop through
        the client and reads its reply before returning the value
        returned by the stub.
        """
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
990