6b6bf3f67a1144cb74cc444e8a91d58a34641d30
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
10 import getpass
11 import cPickle
12 import sys
13 import time
14 import tempfile
15 import shutil
16 import functools
17 import os
18
# PROTOCOL REPLIES
# First field of every reply string ("<code>|<payload>").
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# First field of every request string ("<code>|<arg>|<arg>...").
# NOTE: the value 3 is not assigned to any instruction.
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
DO_PRESTART = 37
GET_FACTORY_ID = 38
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40
TRACES_INFO = 41
EXEC_XML = 42
TESTBED_STATUS = 43
STARTED_TIME = 44
STOPPED_TIME = 45

# Printable name of every reply/instruction code above; used by
# log_msg() and log_reply(). Every code defined above must have an entry
# here, otherwise messages carrying it cannot be logged by name.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    EXEC_XML: "EXEC_XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_PRESTART: "DO_PRESTART",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    GET_FACTORY_ID: "GET_FACTORY_ID",
    GET_TESTBED_ID: "GET_TESTBED_ID",
    GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
    ACTION: "ACTION",
    STATUS: "STATUS",
    TESTBED_STATUS: "TESTBED_STATUS",
    GUIDS: "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    TRACES_INFO: "TRACES_INFO",
    STARTED_TIME: "STARTED_TIME",
    STOPPED_TIME: "STOPPED_TIME",
}
114
def log_msg(server, params):
    """
    Log an incoming protocol message at debug level.

    params is the message already split on "|": params[0] is the numeric
    instruction code, the remaining items are its (still encoded)
    arguments. The instruction is logged by name, looked up in
    instruction_text.

    Logging is best-effort and never raises an ordinary exception: when
    the message cannot be rendered by name (non-numeric or unknown
    instruction code) the raw parameters are logged instead, mirroring
    what log_reply() does for replies it cannot decode.
    """
    try:
        instr = int(params[0])
        instr_txt = instruction_text[instr]
        server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
            instr_txt, ", ".join(map(str, params[1:]))))
    except Exception:
        # don't die for logging, but don't lose the message either
        try:
            server.log_debug("%s - msg: %s" % (server.__class__.__name__,
                params))
        except Exception:
            pass
124
def log_reply(server, reply):
    """
    Log an outgoing reply string ("<code>|<payload>") at debug level.

    The reply code is logged by name (see instruction_text) followed by
    the payload, base64-decoded when possible and verbatim otherwise.
    If the reply cannot be taken apart at all, it is logged as-is.
    """
    owner = server.__class__.__name__
    try:
        fields = reply.split("|")
        status_name = instruction_text[int(fields[0])]
        try:
            payload = base64.b64decode(fields[1])
        except:
            # payload is not base64: show it as received
            payload = fields[1]
        message = "%s - reply: %s %s" % (owner, status_name, payload)
        server.log_debug(message)
    except:
        # don't die for logging
        server.log_debug("%s - reply: %s" % (owner, reply))
141
def to_server_log_level(log_level):
    """
    Translate a deployment-configuration log level into the log level
    constant used by the server module.

    DC.DEBUG_LEVEL maps to server.DEBUG_LEVEL; every other value maps to
    server.ERROR_LEVEL.
    """
    if log_level == DC.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
148
def get_access_config_params(access_config):
    """
    Extract the connection parameters from an access configuration.

    Returns the tuple
    (root_dir, log_level, user, host, port, key, agent, environment_setup).

    log_level is already converted with to_server_log_level().
    environment_setup is "" when the configuration has no such attribute.
    user, host, port, key and agent are only read when the communication
    attribute is DC.ACCESS_SSH; otherwise they are all None.
    """
    value_of = access_config.get_attribute_value

    root_dir = value_of(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(value_of(DC.LOG_LEVEL))
    communication = value_of(DC.DEPLOYMENT_COMMUNICATION)

    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = value_of(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
    else:
        environment_setup = ""

    if communication == DC.ACCESS_SSH:
        user = value_of(DC.DEPLOYMENT_USER)
        host = value_of(DC.DEPLOYMENT_HOST)
        port = value_of(DC.DEPLOYMENT_PORT)
        agent = value_of(DC.USE_AGENT)
        key = value_of(DC.DEPLOYMENT_KEY)
    else:
        user = host = port = agent = key = None

    return (root_dir, log_level, user, host, port, key, agent, environment_setup)
167
class AccessConfiguration(AttributesMap):
    """
    Attribute map describing how a controller is to be reached.

    The available attributes are those listed in Metadata.PROXY_ATTRIBUTES.
    Initial values may be given as a mapping of attribute name to value;
    each value is passed through the parser registered in
    Attribute.type_parsers for the attribute's type before being stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # imported locally, not at module level
        # (presumably to avoid a circular import - TODO confirm)
        from nepi.core.metadata import Metadata

        for _name, attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.iteritems():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
182
class TempDir(object):
    """
    Temporary directory that is removed when the object is destroyed.

    The directory is created with tempfile.mkdtemp() on construction and
    its location is available as the `path` attribute.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # __del__ also runs on a half-built instance (mkdtemp failed, so
        # there is no path), and the directory may already have been
        # removed by someone else. Cleanup is best-effort: a finalizer
        # must not raise.
        path = getattr(self, 'path', None)
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
189
class PermDir(object):
    """
    Plain holder for a caller-supplied directory.

    Only stores the given path in the `path` attribute; it defines no
    cleanup of its own.
    """
    def __init__(self, path):
        self.path = path
193
def create_experiment_controller(xml, access_config = None):
    """
    Build an experiment controller for the given experiment XML.

    Without an access configuration, or in DC.MODE_SINGLE_PROCESS, an
    in-process ExperimentController is returned. Its root directory is the
    configured DC.ROOT_DIRECTORY when present, otherwise a fresh temporary
    directory.

    In DC.MODE_DAEMON an ExperimentControllerProxy is returned.

    When the DC.RECOVER attribute is set, the controller is not launched
    anew but recovered: in-process by calling recover(), in daemon mode by
    connecting to the existing daemon. If that connection fails, a new
    daemon is launched and asked to recover.

    Raises RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        from nepi.core.execute import ExperimentController

        if access_config and access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        else:
            root_dir = TempDir()
        controller = ExperimentController(xml, root_dir.path)

        # keep the directory object alive as long as the controller is,
        # so that a temporary dir gets cleaned up at destruction time.
        controller._tempdir = root_dir

        if not launch:
            # try to recover
            controller.recover()

        return controller

    if mode != DC.MODE_DAEMON:
        raise RuntimeError("Unsupported access configuration '%s'" % mode)

    (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
            get_access_config_params(access_config)
    proxy_options = dict(
        experiment_xml = xml, host = host, port = port, user = user,
        ident_key = key, agent = agent,
        environment_setup = environment_setup)
    try:
        return ExperimentControllerProxy(root_dir, log_level,
            launch = launch, **proxy_options)
    except:
        if launch:
            raise
        # Maybe controller died, recover from persisted testbed information if possible
        controller = ExperimentControllerProxy(root_dir, log_level,
            launch = True, **proxy_options)
        controller.recover()
        return controller
237
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller for the given testbed id and version.

    Without an access configuration, or in DC.MODE_SINGLE_PROCESS, the
    controller is built in-process with _build_testbed_controller().
    In DC.MODE_DAEMON a TestbedControllerProxy is returned; it launches a
    new daemon unless the DC.RECOVER attribute is set.

    Raises ValueError when recovery (launch=False) is requested for an
    in-process controller, and RuntimeError for any other deployment mode.
    """
    mode = None if not access_config \
            else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
    launch = True if not access_config \
            else not access_config.get_attribute_value(DC.RECOVER)
    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port, ident_key = key,
                user = user, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
255
def _build_testbed_controller(testbed_id, testbed_version):
    """
    Instantiate the TestbedController of the module implementing testbed_id.

    The module name is resolved with nepi.util.environ.find_testbed() and
    the module is imported if it is not loaded yet.

    Raises ImportError when the module cannot be imported, and
    RuntimeError when the controller's testbed_version differs from the
    requested one.
    """
    mod_name = nepi.util.environ.find_testbed(testbed_id)

    if mod_name not in sys.modules:
        try:
            __import__(mod_name)
        except ImportError:
            raise ImportError("Cannot find module %s in %r" % (mod_name, sys.path))

    controller = sys.modules[mod_name].TestbedController()
    if controller.testbed_version != testbed_version:
        details = (testbed_id, testbed_version, controller.testbed_version)
        raise RuntimeError(
            "Bad testbed version on testbed %s. Asked for %s, got %s" % details)
    return controller
271
272 # Just a namespace class
class Marshalling:
    """
    Namespace for the argument/return-value marshalling of the protocol.

    Decoders turn the string fields of a message into values, Encoders do
    the opposite. The decorators args(), retval(), retvoid() and handles()
    are used on server methods to declare how a command is decoded,
    how its result is encoded and which command code it serves.
    """
    # String field -> value. These functions double as "types" in
    # Marshalling.args() / Marshalling.retval() declarations.
    class Decoders:
        # NOTE(review): unpickles data taken from the message; this is
        # only safe if the peer is trusted - confirm for this channel.
        @staticmethod
        def pickled_data(sdata):
            return cPickle.loads(base64.b64decode(sdata))
        
        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)
        
        # The literal string "None" stands for None, anything else is an int
        @staticmethod
        def nullint(sdata):
            return None if sdata == "None" else int(sdata)

        # Only the literal string 'True' decodes to True
        @staticmethod
        def bool(sdata):
            return sdata == 'True'
        
    # Value -> string field. Looked up by name: each encoder has the
    # same name as the decoder it reverses.
    class Encoders:
        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))
        
        @staticmethod
        def base64_data(data):
            return base64.b64encode(data)
        
        @staticmethod
        def nullint(data):
            return "None" if data is None else int(data)
        
        @staticmethod
        def bool(data):
            return str(bool(data))
           
    # import into Marshalling all the decoders
    # they act as types
    # (so that e.g. Marshalling.pickled_data is Decoders.pickled_data)
    locals().update([
        (typname, typ)
        for typname, typ in vars(Decoders).iteritems()
        if not typname.startswith('_')
    ])

    _TYPE_ENCODERS = dict([
        # type name -> (<encoding_function>, <formatting_string>)
        # one entry per decoder that has an encoder of the same name
        (typname, (getattr(Encoders,typname),"%s"))
        for typname in vars(Decoders)
        if not typname.startswith('_')
           and hasattr(Encoders,typname)
    ])

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")
    
    # Generic encoder
    # (used for any type whose name has no entry of its own)
    _TYPE_ENCODERS[None] = (str, "%s")
    
    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).
        
        The first argument (self) is left untouched.
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                # params[0] is the command code, hence skipped.
                # zip() stops at the shorter sequence: surplus params
                # are ignored, missing ones are simply not passed to f.
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])
            
            rv._argtypes = types
            
            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            argencoders = []
            TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
            for typ in types:
                if typ.__name__ in TYPE_ENCODERS:
                    argencoders.append(TYPE_ENCODERS[typ.__name__])
                else:
                    # generic encoder
                    argencoders.append(TYPE_ENCODERS[None])
            
            rv._argencoders = tuple(argencoders)
            
            # carry over the return type declared by an inner retval/retvoid
            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that 
        returns a properly encoded return string, given that the undecorated
        function returns suitable input for the encoding function.
        
        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(str)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            typ.__name__,
            Marshalling._TYPE_ENCODERS[None])
        # replies are "<OK code>|<encoded value>"
        fmt = "%d|"+fmt
        
        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            # carry over argument declarations made by an inner args()
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor
    
    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that 
        always return an encoded empty string.
        
        Useful for null-returning functions.
        """
        OKRV = "%d|" % (OK,)
        
        @functools.wraps(f)
        def rv(self, *p, **kw):
            # the function's own return value is discarded
            f(self, *p, **kw)
            return OKRV
        
        rv._retval = None
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv
    
    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers.
        It should always be the topmost decorator.
        """
        def decor(f):
            # only tags the function; BaseServer.reply_action reads the tag
            f._handles_command = whichcommand
            return f
        return decor
441
class BaseServer(server.Server):
    """
    Server that dispatches protocol messages to its own methods.

    A method serves a command when it is tagged with that command's code
    through Marshalling.handles().
    """
    def reply_action(self, msg):
        """
        Process one message ("<instruction>|<arg>|...") and return the
        reply string ("<code>|<payload>").

        The handler is searched among the public methods defined directly
        on the instance's class (inherited ones are not considered) and is
        called with the list of message fields.

        Every failure is reported as an ERROR reply with a base64-encoded
        description: an empty message, an instruction code that is not
        numeric, an instruction no method handles, or an exception raised
        by the handler.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            try:
                # parsed inside the try block so that a malformed
                # instruction produces an ERROR reply instead of an
                # exception escaping to the caller
                instruction = int(params[0])
                log_msg(self, params)
                for mname,meth in vars(self.__class__).iteritems():
                    if not mname.startswith('_'):
                        cmd = getattr(meth, '_handles_command', None)
                        if cmd == instruction:
                            # fetch the bound method and run it
                            meth = getattr(self, mname)
                            reply = meth(params)
                            break
                else:
                    # loop ended without finding a handler
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except:
                # log_error() without arguments is expected to return the
                # text describing the current exception - TODO confirm
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
470
class TestbedControllerServer(BaseServer):
    """
    Server exposing a testbed controller through the proxy protocol.

    Every handler below is bound to one command code with
    Marshalling.handles(), declares the types of its arguments with
    Marshalling.args() and of its result with Marshalling.retval() /
    Marshalling.retvoid(), and forwards the call to the wrapped testbed
    controller (self._testbed).
    """
    def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
        super(TestbedControllerServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup )
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # built later, in post_daemonize()
        self._testbed = None

    # Builds the wrapped testbed controller.
    # (presumably invoked by server.Server once daemonized - TODO confirm)
    def post_daemonize(self):
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._testbed.guids

    @Marshalling.handles(TESTBED_ID)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_id(self):
        return str(self._testbed.testbed_id)

    @Marshalling.handles(TESTBED_VERSION)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_version(self):
        return str(self._testbed.testbed_version)

    @Marshalling.handles(CREATE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_create(self, guid, factory_id):
        self._testbed.defer_create(guid, factory_id)

    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        return self._testbed.trace(guid, trace_id, attribute)

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._testbed.traces_info()

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._testbed.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._testbed.stop()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._testbed.shutdown()

    @Marshalling.handles(CONFIGURE)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_configure(self, name, value):
        self._testbed.defer_configure(name, value)

    @Marshalling.handles(CREATE_SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_create_set(self, guid, name, value):
        self._testbed.defer_create_set(guid, name, value)

    @Marshalling.handles(FACTORY_SET)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_factory_set(self, name, value):
        self._testbed.defer_factory_set(name, value)

    @Marshalling.handles(CONNECT)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)

    @Marshalling.handles(CROSS_CONNECT)
    @Marshalling.args(int, str, int, int, str, str, str)
    @Marshalling.retvoid
    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)

    @Marshalling.handles(ADD_TRACE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_add_trace(self, guid, trace_id):
        self._testbed.defer_add_trace(guid, trace_id)

    @Marshalling.handles(ADD_ADDRESS)
    @Marshalling.args(int, str, int, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_add_address(self, guid, address, netprefix, broadcast):
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)

    @Marshalling.handles(ADD_ROUTE)
    @Marshalling.args(int, str, int, str, int)
    @Marshalling.retvoid
    def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)

    @Marshalling.handles(DO_SETUP)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_setup(self):
        self._testbed.do_setup()

    @Marshalling.handles(DO_CREATE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_create(self):
        self._testbed.do_create()

    @Marshalling.handles(DO_CONNECT_INIT)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_init(self):
        self._testbed.do_connect_init()

    @Marshalling.handles(DO_CONNECT_COMPL)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_compl(self):
        self._testbed.do_connect_compl()

    @Marshalling.handles(DO_CONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_configure(self):
        self._testbed.do_configure()

    @Marshalling.handles(DO_PRECONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_preconfigure(self):
        self._testbed.do_preconfigure()

    @Marshalling.handles(DO_PRESTART)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_prestart(self):
        self._testbed.do_prestart()

    @Marshalling.handles(DO_CROSS_CONNECT_INIT)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_init(self, cross_data):
        self._testbed.do_cross_connect_init(cross_data)

    @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_compl(self, cross_data):
        self._testbed.do_cross_connect_compl(cross_data)

    # the time argument travels as a plain string
    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._testbed.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._testbed.set(guid, name, value, time)

    @Marshalling.handles(GET_ADDRESS)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_address(self, guid, index, attribute):
        return str(self._testbed.get_address(guid, index, attribute))

    @Marshalling.handles(GET_ROUTE)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_route(self, guid, index, attribute):
        return str(self._testbed.get_route(guid, index, attribute))

    @Marshalling.handles(ACTION)
    @Marshalling.args(str, int, Marshalling.base64_data)
    @Marshalling.retvoid
    def action(self, time, guid, command):
        self._testbed.action(time, guid, command)

    # guid is a nullint: the string "None" is decoded to None
    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        return self._testbed.status(guid)

    @Marshalling.handles(TESTBED_STATUS)
    @Marshalling.args()
    @Marshalling.retval(int)
    def testbed_status(self):
        return self._testbed.testbed_status()

    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        return self._testbed.get_attribute_list(guid, filter_flags, exclude)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._testbed.get_factory_id(guid)

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._testbed.recover()
706
707
class ExperimentControllerServer(BaseServer):
    """
    Server exposing an experiment controller through the proxy protocol.

    Every handler below is bound to one command code with
    Marshalling.handles(), declares the types of its arguments with
    Marshalling.args() and of its result with Marshalling.retval() /
    Marshalling.retvoid(), and forwards the call to the wrapped
    ExperimentController (self._experiment).
    """
    def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup )
        self._experiment_xml = experiment_xml
        # built later, in post_daemonize()
        self._experiment = None

    # Builds the wrapped experiment controller.
    # (presumably invoked by server.Server once daemonized - TODO confirm)
    def post_daemonize(self):
        from nepi.core.execute import ExperimentController
        self._experiment = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._experiment.guids

    @Marshalling.handles(STARTED_TIME)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def started_time(self):
        return self._experiment.started_time

    @Marshalling.handles(STOPPED_TIME)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def stopped_time(self):
        return self._experiment.stopped_time

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_design_xml(self):
        return self._experiment.experiment_design_xml
        
    @Marshalling.handles(EXEC_XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_execute_xml(self):
        return self._experiment.experiment_execute_xml
        
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        return str(self._experiment.trace(guid, trace_id, attribute))

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._experiment.traces_info()

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        return self._experiment.is_finished(guid)

    @Marshalling.handles(STATUS)
    @Marshalling.args(int)
    @Marshalling.retval(int)
    def status(self, guid):
        # STATUS reports the element's status code; the finished flag is
        # what the FINISHED command is for.
        # NOTE(review): relies on ExperimentController.status(guid) - confirm
        return self._experiment.status(guid)

    # the time argument travels as a plain string
    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._experiment.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._experiment.set(guid, name, value, time)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._experiment.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._experiment.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._experiment.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._experiment.shutdown()

    @Marshalling.handles(GET_TESTBED_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_id(self, guid):
        return self._experiment.get_testbed_id(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._experiment.get_factory_id(guid)

    @Marshalling.handles(GET_TESTBED_VERSION)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_version(self, guid):
        return self._experiment.get_testbed_version(guid)
827
828 class BaseProxy(object):
829     _ServerClass = None
830     _ServerClassModule = "nepi.util.proxy"
831     
832     def __init__(self, 
833             ctor_args, root_dir, 
834             launch = True, host = None, 
835             port = None, user = None, ident_key = None, agent = None,
836             environment_setup = ""):
837         if launch:
838             # ssh
839             if host != None:
840                 python_code = (
841                     "from %(classmodule)s import %(classname)s;"
842                     "s = %(classname)s%(ctor_args)r;"
843                     "s.run()" 
844                 % dict(
845                     classname = self._ServerClass.__name__,
846                     classmodule = self._ServerClassModule,
847                     ctor_args = ctor_args
848                 ) )
849                 proc = server.popen_ssh_subprocess(python_code, host = host,
850                     port = port, user = user, agent = agent,
851                     ident_key = ident_key,
852                     environment_setup = environment_setup,
853                     waitcommand = True)
854                 if proc.poll():
855                     err = proc.stderr.read()
856                     raise RuntimeError, "Server could not be executed: %s" % (err,)
857             else:
858                 # launch daemon
859                 s = self._ServerClass(*ctor_args)
860                 s.run()
861
862         # connect client to server
863         self._client = server.Client(root_dir, host = host, port = port, 
864                 user = user, agent = agent, 
865                 environment_setup = environment_setup)
866     
867     @staticmethod
868     def _make_message(argtypes, argencoders, command, methname, classname, *args):
869         if len(argtypes) != len(argencoders):
870             raise ValueError, "Invalid arguments for _make_message: "\
871                 "in stub method %s of class %s "\
872                 "argtypes and argencoders must match in size" % (
873                     methname, classname )
874         if len(argtypes) != len(args):
875             raise ValueError, "Invalid arguments for _make_message: "\
876                 "in stub method %s of class %s "\
877                 "expected %d arguments, got %d" % (
878                     methname, classname,
879                     len(argtypes), len(args))
880         
881         buf = []
882         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
883             try:
884                 buf.append(fmt % encode(val))
885             except:
886                 import traceback
887                 raise TypeError, "Argument %d of stub method %s of class %s "\
888                     "requires a value of type %s, but got %s - nested error: %s" % (
889                         argnum, methname, classname,
890                         getattr(typ, '__name__', typ), type(val),
891                         traceback.format_exc()
892                 )
893         
894         return "%d|%s" % (command, '|'.join(buf))
895     
896     @staticmethod
897     def _parse_reply(rvtype, methname, classname, reply):
898         if not reply:
899             raise RuntimeError, "Invalid reply: %r "\
900                 "for stub method %s of class %s" % (
901                     reply,
902                     methname,
903                     classname)
904         
905         try:
906             result = reply.split("|")
907             code = int(result[0])
908             text = result[1]
909         except:
910             import traceback
911             raise TypeError, "Return value of stub method %s of class %s "\
912                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
913                     methname, classname,
914                     getattr(rvtype, '__name__', rvtype), reply,
915                     traceback.format_exc()
916             )
917         if code == ERROR:
918             text = base64.b64decode(text)
919             raise RuntimeError(text)
920         elif code == OK:
921             try:
922                 if rvtype is None:
923                     return
924                 else:
925                     return rvtype(text)
926             except:
927                 import traceback
928                 raise TypeError, "Return value of stub method %s of class %s "\
929                     "cannot be parsed: must be of type %s - nested error: %s" % (
930                         methname, classname,
931                         getattr(rvtype, '__name__', rvtype),
932                         traceback.format_exc()
933                 )
934         else:
935             raise RuntimeError, "Invalid reply: %r "\
936                 "for stub method %s of class %s - unknown code" % (
937                     reply,
938                     methname,
939                     classname)
940     
941     @staticmethod
942     def _make_stubs(server_class, template_class):
943         """
944         Returns a dictionary method_name -> method
945         with stub methods.
946         
947         Usage:
948         
949             class SomeProxy(BaseProxy):
950                ...
951                
952                locals().update( BaseProxy._make_stubs(
953                     ServerClass,
954                     TemplateClass
955                ) )
956         
957         ServerClass is the corresponding Server class, as
958         specified in the _ServerClass class method (_make_stubs
959         is static and can't access the method), and TemplateClass
960         is the ultimate implementation class behind the server,
961         from which argument names and defaults are taken, to
962         maintain meaningful interfaces.
963         """
964         rv = {}
965         
966         class NONE: pass
967         
968         import os.path
969         func_template_path = os.path.join(
970             os.path.dirname(__file__),
971             'proxy_stub.tpl')
972         func_template_file = open(func_template_path, "r")
973         func_template = func_template_file.read()
974         func_template_file.close()
975         
976         for methname in vars(template_class).copy():
977             if methname.endswith('_deferred'):
978                 # cannot wrap deferreds...
979                 continue
980             dmethname = methname+'_deferred'
981             if hasattr(server_class, methname) and not methname.startswith('_'):
982                 template_meth = getattr(template_class, methname)
983                 server_meth = getattr(server_class, methname)
984                 
985                 command = getattr(server_meth, '_handles_command', None)
986                 argtypes = getattr(server_meth, '_argtypes', None)
987                 argencoders = getattr(server_meth, '_argencoders', None)
988                 rvtype = getattr(server_meth, '_retval', None)
989                 doprop = False
990                 
991                 if hasattr(template_meth, 'fget'):
992                     # property getter
993                     template_meth = template_meth.fget
994                     doprop = True
995                 
996                 if command is not None and argtypes is not None and argencoders is not None:
997                     # We have an interface method...
998                     code = template_meth.func_code
999                     argnames = code.co_varnames[:code.co_argcount]
1000                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1001                                   + (template_meth.func_defaults or ()) )
1002                     
1003                     func_globals = dict(
1004                         BaseProxy = BaseProxy,
1005                         argtypes = argtypes,
1006                         argencoders = argencoders,
1007                         rvtype = rvtype,
1008                         functools = functools,
1009                     )
1010                     context = dict()
1011                     
1012                     func_text = func_template % dict(
1013                         self = argnames[0],
1014                         args = '%s' % (','.join(argnames[1:])),
1015                         argdefs = ','.join([
1016                             argname if argdef is NONE
1017                             else "%s=%r" % (argname, argdef)
1018                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
1019                         ]),
1020                         command = command,
1021                         methname = methname,
1022                         classname = server_class.__name__
1023                     )
1024                     
1025                     func_text = compile(
1026                         func_text,
1027                         func_template_path,
1028                         'exec')
1029                     
1030                     exec func_text in func_globals, context
1031                     
1032                     if doprop:
1033                         rv[methname] = property(context[methname])
1034                         rv[dmethname] = property(context[dmethname])
1035                     else:
1036                         rv[methname] = context[methname]
1037                         rv[dmethname] = context[dmethname]
1038                     
1039                     # inject _deferred into core classes
1040                     if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1041                         def freezename(methname, dmethname):
1042                             def dmeth(self, *p, **kw): 
1043                                 return getattr(self, methname)(*p, **kw)
1044                             dmeth.__name__ = dmethname
1045                             return dmeth
1046                         dmeth = freezename(methname, dmethname)
1047                         setattr(template_class, dmethname, dmeth)
1048         
1049         return rv
1050                         
class TestbedControllerProxy(BaseProxy):
    """
    Proxy for a TestbedControllerServer.

    The stub methods are generated at class creation time from
    TestbedControllerServer, taking the signatures of
    nepi.core.execute.TestbedController.
    """
    
    _ServerClass = TestbedControllerServer
    
    def __init__(self, root_dir, log_level, testbed_id = None, 
            testbed_version = None, launch = True, host = None, 
            port = None, user = None, ident_key = None, agent = None,
            environment_setup = ""):
        """
        Optionally launch a TestbedControllerServer and connect to it.

        testbed_id and testbed_version are mandatory when launch is
        true; the remaining arguments are forwarded to BaseProxy.

        Raises RuntimeError if launch is requested without a
        testbed_id or testbed_version.
        """
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        super(TestbedControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
            root_dir = root_dir,
            launch = launch, host = host, port = port, user = user,
            ident_key = ident_key, agent = agent, 
            environment_setup = environment_setup)

    # Install the generated stubs straight into the class namespace.
    # This must run before shutdown is redefined below, which captures
    # the generated shutdown stub as a default argument.
    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )
    
    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Call the generated shutdown stub, then send a stop request
        through the client and wait for its reply.

        Returns the stub's return value.
        """
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1080     
1081
class ExperimentControllerProxy(BaseProxy):
    """
    Proxy for an ExperimentControllerServer.

    The stub methods are generated at class creation time from
    ExperimentControllerServer, taking the signatures of
    nepi.core.execute.ExperimentController.
    """
    _ServerClass = ExperimentControllerServer
    
    def __init__(self, root_dir, log_level, experiment_xml = None, 
            launch = True, host = None, port = None, user = None, 
            ident_key = None, agent = None, environment_setup = ""):
        """
        Optionally launch an ExperimentControllerServer and connect
        to it; every argument is forwarded to BaseProxy.
        """
        # Positional arguments for the server's constructor
        server_ctor_args = (
            root_dir,
            log_level,
            experiment_xml,
            environment_setup,
        )
        base_init = super(ExperimentControllerProxy, self).__init__
        base_init(
            ctor_args = server_ctor_args,
            root_dir = root_dir,
            launch = launch,
            host = host,
            port = port,
            user = user,
            ident_key = ident_key,
            agent = agent,
            environment_setup = environment_setup)

    # Install the generated stubs straight into the class namespace.
    # This must stay ahead of the shutdown override below, which
    # captures the generated shutdown stub as a default argument.
    locals().update(
        BaseProxy._make_stubs(
            server_class = ExperimentControllerServer,
            template_class = nepi.core.execute.ExperimentController,
        )
    )

    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Call the generated shutdown stub, then send a stop request
        through the client and wait for its reply.

        Returns the stub's return value.
        """
        result = _stub(self)
        client = self._client
        client.send_stop()
        client.read_reply() # wait for the reply to the stop request
        return result
1107