Added methods to obtain factory_id, testbed_id and testbed_version for a box using...
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
9 import getpass
10 import cPickle
11 import sys
12 import time
13 import tempfile
14 import shutil
15 import functools
16
# PROTOCOL REPLIES
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
#
# These numeric codes are sent as the first "|"-separated field of every
# message (see BaseProxy._make_message / BaseServer.reply_action), so the
# values already assigned must not be renumbered. Code 3 is unassigned.
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
DO_PRESTART = 37
GET_FACTORY_ID = 38
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40

# Human-readable name of every reply/instruction code above; looked up by
# log_msg() and log_reply() when writing debug log entries. Every code
# must have an entry here, otherwise its messages are silently not logged.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_PRESTART: "DO_PRESTART",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    GET_FACTORY_ID: "GET_FACTORY_ID",
    GET_TESTBED_ID: "GET_TESTBED_ID",
    GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
}
102
def log_msg(server, params):
    """
    Write a debug log entry describing an incoming protocol message.

    server is the object whose log_debug() method receives the entry;
    params is the message already split on "|", with the numeric
    instruction code in params[0] and its arguments after it.

    Logging is best-effort: any ordinary error (malformed code, unknown
    instruction, failing logger) is swallowed and nothing is logged.
    """
    try:
        instr = int(params[0])
        instr_txt = instruction_text[instr]
        server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
            instr_txt, ", ".join(map(str, params[1:]))))
    except Exception:
        # don't die for logging - but, unlike a bare "except:", let
        # KeyboardInterrupt and SystemExit through
        pass
112
def log_reply(server, reply):
    """
    Write a debug log entry describing an outgoing protocol reply.

    reply is expected to look like "<code>|<payload>". The code is
    translated through instruction_text and the payload is base64-decoded
    when possible. If the reply cannot be interpreted that way, it is
    logged verbatim instead.
    """
    try:
        res = reply.split("|")
        code = int(res[0])
        code_txt = instruction_text[code]
        try:
            txt = base64.b64decode(res[1])
        except Exception:
            # payload is not base64 (e.g. a plain integer): log it as is
            txt = res[1]
        server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
                code_txt, txt))
    except Exception:
        # don't die for logging: fall back to the raw reply text.
        # Only ordinary errors are handled here, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        server.log_debug("%s - reply: %s" % (server.__class__.__name__,
                reply))
129
def to_server_log_level(log_level):
    """
    Translate a deployment-configuration log level into the constant
    understood by nepi.util.server.

    Only DC.DEBUG_LEVEL maps to server.DEBUG_LEVEL; every other value
    maps to server.ERROR_LEVEL.
    """
    if log_level == DC.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
136
def get_access_config_params(access_config):
    """
    Extract the daemon deployment parameters from an access configuration.

    Returns the tuple
        (root_dir, log_level, user, host, port, key, agent, environment_setup)
    where log_level has already been converted with to_server_log_level().
    user, host, port, key and agent are only read when the configured
    communication is DC.ACCESS_SSH and are None otherwise.
    environment_setup is None when the configuration does not carry that
    attribute.
    """
    read = access_config.get_attribute_value

    root_dir = read(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(read(DC.LOG_LEVEL))
    communication = read(DC.DEPLOYMENT_COMMUNICATION)

    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = read(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
    else:
        environment_setup = None

    if communication == DC.ACCESS_SSH:
        user = read(DC.DEPLOYMENT_USER)
        host = read(DC.DEPLOYMENT_HOST)
        port = read(DC.DEPLOYMENT_PORT)
        agent = read(DC.USE_AGENT)
        key = read(DC.DEPLOYMENT_KEY)
    else:
        # no remote access: leave every ssh-related parameter unset
        user = host = port = agent = key = None

    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
155
class AccessConfiguration(AttributesMap):
    """
    Attribute map holding the deployment attributes declared in
    Metadata.DEPLOYMENT_ATTRIBUTES.

    params, when given, is a mapping attribute name -> raw value; each
    value is converted with the parser registered in
    Attribute.type_parsers for the attribute's type before being stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # imported locally rather than at module level
        # (presumably to avoid an import cycle - TODO confirm)
        from nepi.core.metadata import Metadata

        for _, attr_info in Metadata.DEPLOYMENT_ATTRIBUTES:
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.iteritems():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
170
class TempDir(object):
    """
    Owner of a freshly created temporary directory.

    The directory is created on construction (its location is available
    as the path attribute) and removed, with all its contents, when the
    object is finalized.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # path is missing if mkdtemp() failed in __init__
        path = getattr(self, 'path', None)
        if path is not None:
            # Finalizers must not raise: tolerate a directory that was
            # already removed or that cannot be fully deleted.
            shutil.rmtree(path, ignore_errors=True)
177
class PermDir(object):
    """
    Holder of a caller-supplied directory path.

    Exposes the same path attribute as TempDir but defines no finalizer,
    so the directory is left untouched.
    """
    def __init__(self, path):
        # only the location is recorded; nothing is created here
        self.path = path
181
def create_controller(xml, access_config = None):
    """
    Build an experiment controller for the given experiment description.

    xml is handed over unchanged to the controller. access_config, when
    given, selects the deployment mode (DC.DEPLOYMENT_MODE) and whether an
    already running instance is being recovered (DC.RECOVER).

    Returns an in-process ExperimentController for single-process mode
    (or when no mode is configured) and an ExperimentControllerProxy for
    daemon mode.

    Raises ValueError if recovery is requested in single-process mode and
    RuntimeError if the configured mode is not supported.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            # an in-process controller cannot be recovered
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))

        from nepi.core.execute import ExperimentController

        if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        return controller
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
                agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
212
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller for the given testbed id and version.

    access_config, when not empty, selects the deployment mode
    (DC.DEPLOYMENT_MODE) and whether an already running instance is being
    recovered (DC.RECOVER).

    Returns an in-process testbed controller for single-process mode (or
    when no mode is configured) and a TestbedControllerProxy for daemon
    mode.

    Raises ValueError if recovery is requested in single-process mode and
    RuntimeError if the configured mode is not supported.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            # an in-process controller cannot be recovered
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port, ident_key = key,
                user = user, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
230
def _build_testbed_controller(testbed_id, testbed_version):
    """
    Instantiate the TestbedController class of the testbed package
    "nepi.testbeds.<testbed_id in lower case>", importing the package
    first if it has not been loaded yet.
    """
    module_name = "nepi.testbeds.%s" % (testbed_id.lower(),)
    try:
        testbed_module = sys.modules[module_name]
    except KeyError:
        # not loaded yet: import it, then fetch the leaf module
        __import__(module_name)
        testbed_module = sys.modules[module_name]
    return testbed_module.TestbedController(testbed_version)
237
# Just a namespace class: it groups the converters used to put values in and
# out of "|"-separated protocol messages, and the decorators that server
# classes use to declare their command handlers.
class Marshalling:
    class Decoders:
        # Converters from the textual form of a message field to a Python
        # value. They are also used as the "types" handed to
        # Marshalling.args() and Marshalling.retval().
        @staticmethod
        def pickled_data(sdata):
            # base64 text -> pickled string -> Python object
            # NOTE(review): cPickle.loads can execute arbitrary code when fed
            # crafted input; this is only safe if the peer is trusted.
            return cPickle.loads(base64.b64decode(sdata))
        
        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)
        
        @staticmethod
        def nullint(sdata):
            # the literal text "None" stands for a missing integer
            return None if sdata == "None" else int(sdata)
        
        @staticmethod
        def bool(sdata):
            # only the exact text 'True' is true; anything else is False
            return sdata == 'True'
        
    class Encoders:
        # Inverse of Decoders: each encoder has the same name as the decoder
        # that reads back what it produces.
        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))
        
        @staticmethod
        def base64_data(data):
            return base64.b64encode(data)
        
        @staticmethod
        def nullint(data):
            # returns the text "None" or an int; the "%s" format registered
            # for this encoder below turns either into text
            return "None" if data is None else int(data)
        
        @staticmethod
        def bool(data):
            return str(bool(data))
           
    # import into Marshalling all the decoders
    # they act as types
    # (vars() yields the staticmethod objects themselves, which are stored
    # as-is in this class namespace, e.g. as Marshalling.nullint)
    locals().update([
        (typname, typ)
        for typname, typ in vars(Decoders).iteritems()
        if not typname.startswith('_')
    ])

    # Lookup table used by args() and retval() to find how to encode a value
    # of a given declared type. Only decoders that have an encoder with the
    # same name get an entry here.
    _TYPE_ENCODERS = dict([
        # type name -> (<encoding_function>, <formatting_string>)
        (typname, (getattr(Encoders,typname),"%s"))
        for typname in vars(Decoders)
        if not typname.startswith('_')
           and hasattr(Encoders,typname)
    ])

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")
    
    # Generic encoder, used for any type whose name has no entry above
    _TYPE_ENCODERS[None] = (str, "%s")
    
    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).
        
        The first argument (self) is left untouched, and the first
        element of "params" (the command code) is skipped.
        
        The resulting function carries these attributes:
          _argtypes    - the given types
          _argencoders - one (encoder, format) pair per type, taken from
                         _TYPE_ENCODERS by type name
          _retval      - copied from the decorated function, None if unset
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                # params[0] is the command code; zip() stops at the shorter
                # of types and the remaining params
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])
            
            rv._argtypes = types
            
            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            argencoders = []
            TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
            for typ in types:
                if typ.__name__ in TYPE_ENCODERS:
                    argencoders.append(TYPE_ENCODERS[typ.__name__])
                else:
                    # generic encoder
                    argencoders.append(TYPE_ENCODERS[None])
            
            rv._argencoders = tuple(argencoders)
            
            # preserve what an inner retval()/retvoid decorator recorded
            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that 
        returns a properly encoded return string, given that the undecorated
        function returns suitable input for the encoding function.
        
        The reply has the form "<OK code>|<encoded value>".
        
        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(str)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        # encoder looked up by type name; unknown names use the generic one
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            typ.__name__,
            Marshalling._TYPE_ENCODERS[None])
        fmt = "%d|"+fmt
        
        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            # preserve what an inner args() decorator recorded
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor
    
    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that 
        always return an encoded empty string.
        
        The reply is "<OK code>|" and whatever the decorated function
        returns is discarded.
        
        Useful for null-returning functions.
        """
        OKRV = "%d|" % (OK,)
        
        @functools.wraps(f)
        def rv(self, *p, **kw):
            f(self, *p, **kw)
            return OKRV
        
        rv._retval = None
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv
    
    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers.
        It should always be the topmost decorator.
        
        The code is stored in the method's _handles_command attribute.
        """
        def decor(f):
            f._handles_command = whichcommand
            return f
        return decor
407
class BaseServer(server.Server):
    """
    Server that dispatches "|"-separated protocol messages to the methods
    of the concrete subclass.

    A method handles an instruction when it is defined directly on the
    instance's class, its name does not start with an underscore and it
    was tagged with that instruction code through Marshalling.handles().
    """
    def reply_action(self, msg):
        """
        Process one message and return the reply string to send back.

        msg is "<instruction code>|<arg>|..."; the reply is
        "<OK or ERROR code>|<payload>". An empty message, a malformed or
        unknown instruction code, or an exception raised by the handler
        all produce an ERROR reply whose payload is a base64-encoded
        description.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            log_msg(self, params)
            try:
                # Parsed inside the try block so that a non-numeric
                # instruction field produces an ERROR reply instead of an
                # unhandled exception.
                instruction = int(params[0])
                for mname,meth in vars(self.__class__).iteritems():
                    if not mname.startswith('_'):
                        cmd = getattr(meth, '_handles_command', None)
                        if cmd == instruction:
                            # fetch the bound method and let it decode
                            # its own arguments from params
                            meth = getattr(self, mname)
                            reply = meth(params)
                            break
                else:
                    # loop finished without finding a handler
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except Exception:
                # log_error() without arguments is expected to return the
                # text describing the current exception
                # (defined in server.Server - TODO confirm)
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
436
class TestbedControllerServer(BaseServer):
    """
    Protocol server that exposes a testbed controller.

    Every public method below is a thin wrapper around the method of the
    same name of the wrapped controller. The decorators declare, from top
    to bottom: the instruction code it handles, how the message fields are
    decoded into arguments, and how the result is encoded into the reply
    (retvoid discards the result).
    """
    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # built later, in post_daemonize()
        self._testbed = None

    def post_daemonize(self):
        # presumably called by server.Server once the process has been
        # daemonized - TODO confirm
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._testbed.guids

    @Marshalling.handles(TESTBED_ID)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_id(self):
        return str(self._testbed.testbed_id)

    @Marshalling.handles(TESTBED_VERSION)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_version(self):
        return str(self._testbed.testbed_version)

    @Marshalling.handles(CREATE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_create(self, guid, factory_id):
        self._testbed.defer_create(guid, factory_id)

    # NOTE(review): unlike ExperimentControllerServer.trace, the result is
    # not passed through str() before being base64-encoded - confirm that
    # the testbed always returns a string here.
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        return self._testbed.trace(guid, trace_id, attribute)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._testbed.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._testbed.stop()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._testbed.shutdown()

    @Marshalling.handles(CONFIGURE)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_configure(self, name, value):
        self._testbed.defer_configure(name, value)

    @Marshalling.handles(CREATE_SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_create_set(self, guid, name, value):
        self._testbed.defer_create_set(guid, name, value)

    @Marshalling.handles(FACTORY_SET)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_factory_set(self, name, value):
        self._testbed.defer_factory_set(name, value)

    @Marshalling.handles(CONNECT)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)

    @Marshalling.handles(CROSS_CONNECT)
    @Marshalling.args(int, str, int, int, str, str, str)
    @Marshalling.retvoid
    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)

    @Marshalling.handles(ADD_TRACE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_add_trace(self, guid, trace_id):
        self._testbed.defer_add_trace(guid, trace_id)

    @Marshalling.handles(ADD_ADDRESS)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_add_address(self, guid, address, netprefix, broadcast):
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)

    @Marshalling.handles(ADD_ROUTE)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_add_route(self, guid, destination, netprefix, nexthop):
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)

    @Marshalling.handles(DO_SETUP)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_setup(self):
        self._testbed.do_setup()

    @Marshalling.handles(DO_CREATE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_create(self):
        self._testbed.do_create()

    @Marshalling.handles(DO_CONNECT_INIT)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_init(self):
        self._testbed.do_connect_init()

    @Marshalling.handles(DO_CONNECT_COMPL)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_compl(self):
        self._testbed.do_connect_compl()

    @Marshalling.handles(DO_CONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_configure(self):
        self._testbed.do_configure()

    @Marshalling.handles(DO_PRECONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_preconfigure(self):
        self._testbed.do_preconfigure()

    @Marshalling.handles(DO_PRESTART)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_prestart(self):
        self._testbed.do_prestart()

    @Marshalling.handles(DO_CROSS_CONNECT_INIT)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_init(self, cross_data):
        self._testbed.do_cross_connect_init(cross_data)

    @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_compl(self, cross_data):
        self._testbed.do_cross_connect_compl(cross_data)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._testbed.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._testbed.set(guid, name, value, time)

    @Marshalling.handles(GET_ADDRESS)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_address(self, guid, index, attribute):
        return str(self._testbed.get_address(guid, index, attribute))

    @Marshalling.handles(GET_ROUTE)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_route(self, guid, index, attribute):
        return str(self._testbed.get_route(guid, index, attribute))

    @Marshalling.handles(ACTION)
    @Marshalling.args(str, int, Marshalling.base64_data)
    @Marshalling.retvoid
    def action(self, time, guid, command):
        self._testbed.action(time, guid, command)

    # guid may be None on the wire (nullint)
    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        return self._testbed.status(guid)

    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid):
        return self._testbed.get_attribute_list(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._testbed.get_factory_id(guid)
653
class ExperimentControllerServer(BaseServer):
    """
    Protocol server that exposes an ExperimentController.

    Every public method below is a thin wrapper around the controller
    member of the same name. The decorators declare, from top to bottom:
    the instruction code it handles, how the message fields are decoded
    into arguments, and how the result is encoded into the reply
    (retvoid discards the result).
    """
    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # built later, in post_daemonize()
        self._controller = None

    def post_daemonize(self):
        # presumably called by server.Server once the process has been
        # daemonized - TODO confirm
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._controller.guids

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_xml(self):
        return self._controller.experiment_xml
        
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        return str(self._controller.trace(guid, trace_id, attribute))

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        return self._controller.is_finished(guid)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._controller.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._controller.set(guid, name, value, time)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._controller.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._controller.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._controller.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._controller.shutdown()

    # The three lookups below take the guid of a box and reply with a
    # base64-encoded value (default retval encoder).
    @Marshalling.handles(GET_TESTBED_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_id(self, guid):
        return self._controller.get_testbed_id(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._controller.get_factory_id(guid)

    @Marshalling.handles(GET_TESTBED_VERSION)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_version(self, guid):
        return self._controller.get_testbed_version(guid)
742
743 class BaseProxy(object):
744     _ServerClass = None
745     _ServerClassModule = "nepi.util.proxy"
746     
747     def __init__(self, 
748             ctor_args, root_dir, 
749             launch = True, host = None, 
750             port = None, user = None, ident_key = None, agent = None,
751             environment_setup = ""):
752         if launch:
753             # ssh
754             if host != None:
755                 python_code = (
756                     "from %(classmodule)s import %(classname)s;"
757                     "s = %(classname)s%(ctor_args)r;"
758                     "s.run()" 
759                 % dict(
760                     classname = self._ServerClass.__name__,
761                     classmodule = self._ServerClassModule,
762                     ctor_args = ctor_args
763                 ) )
764                 proc = server.popen_ssh_subprocess(python_code, host = host,
765                     port = port, user = user, agent = agent,
766                     ident_key = ident_key,
767                     environment_setup = environment_setup,
768                     waitcommand = True)
769                 if proc.poll():
770                     err = proc.stderr.read()
771                     raise RuntimeError, "Server could not be executed: %s" % (err,)
772             else:
773                 # launch daemon
774                 s = self._ServerClass(*ctor_args)
775                 s.run()
776
777         # connect client to server
778         self._client = server.Client(root_dir, host = host, port = port, 
779                 user = user, agent = agent, 
780                 environment_setup = environment_setup)
781     
782     @staticmethod
783     def _make_message(argtypes, argencoders, command, methname, classname, *args):
784         if len(argtypes) != len(argencoders):
785             raise ValueError, "Invalid arguments for _make_message: "\
786                 "in stub method %s of class %s "\
787                 "argtypes and argencoders must match in size" % (
788                     methname, classname )
789         if len(argtypes) != len(args):
790             raise ValueError, "Invalid arguments for _make_message: "\
791                 "in stub method %s of class %s "\
792                 "expected %d arguments, got %d" % (
793                     methname, classname,
794                     len(argtypes), len(args))
795         
796         buf = []
797         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
798             try:
799                 buf.append(fmt % encode(val))
800             except:
801                 import traceback
802                 raise TypeError, "Argument %d of stub method %s of class %s "\
803                     "requires a value of type %s, but got %s - nested error: %s" % (
804                         argnum, methname, classname,
805                         getattr(typ, '__name__', typ), type(val),
806                         traceback.format_exc()
807                 )
808         
809         return "%d|%s" % (command, '|'.join(buf))
810     
811     @staticmethod
812     def _parse_reply(rvtype, methname, classname, reply):
813         if not reply:
814             raise RuntimeError, "Invalid reply: %r "\
815                 "for stub method %s of class %s" % (
816                     reply,
817                     methname,
818                     classname)
819         
820         try:
821             result = reply.split("|")
822             code = int(result[0])
823             text = result[1]
824         except:
825             import traceback
826             raise TypeError, "Return value of stub method %s of class %s "\
827                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
828                     methname, classname,
829                     getattr(rvtype, '__name__', rvtype), reply,
830                     traceback.format_exc()
831             )
832         if code == ERROR:
833             text = base64.b64decode(text)
834             raise RuntimeError(text)
835         elif code == OK:
836             try:
837                 if rvtype is None:
838                     return
839                 else:
840                     return rvtype(text)
841             except:
842                 import traceback
843                 raise TypeError, "Return value of stub method %s of class %s "\
844                     "cannot be parsed: must be of type %s - nested error: %s" % (
845                         methname, classname,
846                         getattr(rvtype, '__name__', rvtype),
847                         traceback.format_exc()
848                 )
849         else:
850             raise RuntimeError, "Invalid reply: %r "\
851                 "for stub method %s of class %s - unknown code" % (
852                     reply,
853                     methname,
854                     classname)
855     
856     @staticmethod
857     def _make_stubs(server_class, template_class):
858         """
859         Returns a dictionary method_name -> method
860         with stub methods.
861         
862         Usage:
863         
864             class SomeProxy(BaseProxy):
865                ...
866                
867                locals().update( BaseProxy._make_stubs(
868                     ServerClass,
869                     TemplateClass
870                ) )
871         
872         ServerClass is the corresponding Server class, as
873         specified in the _ServerClass class method (_make_stubs
874         is static and can't access the method), and TemplateClass
875         is the ultimate implementation class behind the server,
876         from which argument names and defaults are taken, to
877         maintain meaningful interfaces.
878         """
879         rv = {}
880         
881         class NONE: pass
882         
883         import os.path
884         func_template_path = os.path.join(
885             os.path.dirname(__file__),
886             'proxy_stub.tpl')
887         func_template_file = open(func_template_path, "r")
888         func_template = func_template_file.read()
889         func_template_file.close()
890         
891         for methname in vars(template_class):
892             if hasattr(server_class, methname) and not methname.startswith('_'):
893                 template_meth = getattr(template_class, methname)
894                 server_meth = getattr(server_class, methname)
895                 
896                 command = getattr(server_meth, '_handles_command', None)
897                 argtypes = getattr(server_meth, '_argtypes', None)
898                 argencoders = getattr(server_meth, '_argencoders', None)
899                 rvtype = getattr(server_meth, '_retval', None)
900                 doprop = False
901                 
902                 if hasattr(template_meth, 'fget'):
903                     # property getter
904                     template_meth = template_meth.fget
905                     doprop = True
906                 
907                 if command is not None and argtypes is not None and argencoders is not None:
908                     # We have an interface method...
909                     code = template_meth.func_code
910                     argnames = code.co_varnames[:code.co_argcount]
911                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
912                                   + (template_meth.func_defaults or ()) )
913                     
914                     func_globals = dict(
915                         BaseProxy = BaseProxy,
916                         argtypes = argtypes,
917                         argencoders = argencoders,
918                         rvtype = rvtype,
919                     )
920                     context = dict()
921                     
922                     func_text = func_template % dict(
923                         self = argnames[0],
924                         args = '%s' % (','.join(argnames[1:])),
925                         argdefs = ','.join([
926                             argname if argdef is NONE
927                             else "%s=%r" % (argname, argdef)
928                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
929                         ]),
930                         command = command,
931                         methname = methname,
932                         classname = server_class.__name__
933                     )
934                     
935                     func_text = compile(
936                         func_text,
937                         func_template_path,
938                         'exec')
939                     
940                     exec func_text in func_globals, context
941                     
942                     if doprop:
943                         rv[methname] = property(context[methname])
944                     else:
945                         rv[methname] = context[methname]
946         
947         return rv
948                         
class TestbedControllerProxy(BaseProxy):
    """
    Client-side proxy for a TestbedControllerServer.
    
    Its public interface is generated by BaseProxy._make_stubs from
    nepi.core.execute.TestbedController, so it exposes the same
    method names, argument names and defaults.
    """
    
    _ServerClass = TestbedControllerServer
    
    def __init__(self, root_dir, log_level, testbed_id = None, 
            testbed_version = None, launch = True, host = None, 
            port = None, user = None, ident_key = None, agent = None,
            environment_setup = ""):
        """
        Raises RuntimeError if launch is requested without both
        testbed_id and testbed_version. All other arguments are
        forwarded unchanged to BaseProxy.__init__; (root_dir,
        log_level, testbed_id, testbed_version) are passed as ctor_args.
        """
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        super(TestbedControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, testbed_id, testbed_version),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    # Inject the generated stubs into the class namespace
    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )
    
    # Shutdown stops the serverside...
    # The default argument captures the generated 'shutdown' stub
    # before this definition rebinds the name in the class namespace.
    def shutdown(self, _stub = shutdown):
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
978     
979
class ExperimentControllerProxy(BaseProxy):
    """
    Client-side proxy for an ExperimentControllerServer.
    
    Its public interface is generated by BaseProxy._make_stubs from
    nepi.core.execute.ExperimentController, so it exposes the same
    method names, argument names and defaults.
    """
    _ServerClass = ExperimentControllerServer
    
    def __init__(self, root_dir, log_level, experiment_xml = None, 
            launch = True, host = None, port = None, user = None, 
            ident_key = None, agent = None, environment_setup = ""):
        """
        Raises RuntimeError if launch is requested without an
        experiment_xml. All other arguments are forwarded unchanged
        to BaseProxy.__init__; (root_dir, log_level, experiment_xml)
        are passed as ctor_args.
        """
        if launch and experiment_xml is None:
            # Adjacent literals instead of a backslash continuation inside
            # the string, which leaked source indentation into the message
            raise RuntimeError("To launch a ExperimentControllerServer a "
                    "xml description of the experiment is required")
        super(ExperimentControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, experiment_xml),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    # Inject the generated stubs into the class namespace
    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentControllerServer,
        template_class = nepi.core.execute.ExperimentController,
    ) )

    
    # Shutdown stops the serverside...
    # The default argument captures the generated 'shutdown' stub
    # before this definition rebinds the name in the class namespace.
    def shutdown(self, _stub = shutdown):
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1008