14f928e57e1f10b596402ae539692af8a8542a29
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
10 import getpass
11 import cPickle
12 import sys
13 import time
14 import tempfile
15 import shutil
16 import functools
17 import os
18
19 # PROTOCOL REPLIES
20 OK = 0
21 ERROR = 1
22
23 # PROTOCOL INSTRUCTION MESSAGES
24 XML = 2 
25 TRACE   = 4
26 FINISHED    = 5
27 START   = 6
28 STOP    = 7
29 SHUTDOWN    = 8
30 CONFIGURE   = 9
31 CREATE      = 10
32 CREATE_SET  = 11
33 FACTORY_SET = 12
34 CONNECT     = 13
35 CROSS_CONNECT   = 14
36 ADD_TRACE   = 15
37 ADD_ADDRESS = 16
38 ADD_ROUTE   = 17
39 DO_SETUP    = 18
40 DO_CREATE   = 19
41 DO_CONNECT_INIT = 20
42 DO_CONFIGURE    = 21
43 DO_CROSS_CONNECT_INIT   = 22
44 GET = 23
45 SET = 24
46 ACTION  = 25
47 STATUS  = 26
48 GUIDS  = 27
49 GET_ROUTE = 28
50 GET_ADDRESS = 29
51 RECOVER = 30
52 DO_PRECONFIGURE     = 31
53 GET_ATTRIBUTE_LIST  = 32
54 DO_CONNECT_COMPL    = 33
55 DO_CROSS_CONNECT_COMPL  = 34
56 TESTBED_ID  = 35
57 TESTBED_VERSION  = 36
58 DO_PRESTART = 37
59 GET_FACTORY_ID = 38
60 GET_TESTBED_ID = 39
61 GET_TESTBED_VERSION = 40
62 TRACES_INFO = 41
63 EXEC_XML = 42
64
65 instruction_text = dict({
66     OK:     "OK",
67     ERROR:  "ERROR",
68     XML:    "XML",
69     EXEC_XML:    "EXEC_XML",
70     TRACE:  "TRACE",
71     FINISHED:   "FINISHED",
72     START:  "START",
73     STOP:   "STOP",
74     RECOVER: "RECOVER",
75     SHUTDOWN:   "SHUTDOWN",
76     CONFIGURE:  "CONFIGURE",
77     CREATE: "CREATE",
78     CREATE_SET: "CREATE_SET",
79     FACTORY_SET:    "FACTORY_SET",
80     CONNECT:    "CONNECT",
81     CROSS_CONNECT: "CROSS_CONNECT",
82     ADD_TRACE:  "ADD_TRACE",
83     ADD_ADDRESS:    "ADD_ADDRESS",
84     ADD_ROUTE:  "ADD_ROUTE",
85     DO_SETUP:   "DO_SETUP",
86     DO_CREATE:  "DO_CREATE",
87     DO_CONNECT_INIT: "DO_CONNECT_INIT",
88     DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
89     DO_CONFIGURE:   "DO_CONFIGURE",
90     DO_PRECONFIGURE:   "DO_PRECONFIGURE",
91     DO_CROSS_CONNECT_INIT:  "DO_CROSS_CONNECT_INIT",
92     DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
93     GET:    "GET",
94     SET:    "SET",
95     GET_ROUTE: "GET_ROUTE",
96     GET_ADDRESS: "GET_ADDRESS",
97     GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
98     GET_FACTORY_ID: "GET_FACTORY_ID",
99     GET_TESTBED_ID: "GET_TESTBED_ID",
100     GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
101     ACTION: "ACTION",
102     STATUS: "STATUS",
103     GUIDS:  "GUIDS",
104     TESTBED_ID: "TESTBED_ID",
105     TESTBED_VERSION: "TESTBED_VERSION",
106     TRACES_INFO: "TRACES_INFO",
107     })
108
109 def log_msg(server, params):
110     try:
111         instr = int(params[0])
112         instr_txt = instruction_text[instr]
113         server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
114             instr_txt, ", ".join(map(str, params[1:]))))
115     except:
116         # don't die for logging
117         pass
118
119 def log_reply(server, reply):
120     try:
121         res = reply.split("|")
122         code = int(res[0])
123         code_txt = instruction_text[code]
124         try:
125             txt = base64.b64decode(res[1])
126         except:
127             txt = res[1]
128         server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
129                 code_txt, txt))
130     except:
131         # don't die for logging
132         server.log_debug("%s - reply: %s" % (server.__class__.__name__, 
133                 reply))
134         pass
135
136 def to_server_log_level(log_level):
137     return (
138         server.DEBUG_LEVEL
139             if log_level == DC.DEBUG_LEVEL 
140         else server.ERROR_LEVEL
141     )
142
143 def get_access_config_params(access_config):
144     root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
145     log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
146     log_level = to_server_log_level(log_level)
147     user = host = port = agent = key = None
148     communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
149     environment_setup = (
150         access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
151         if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
152         else ""
153     )
154     if communication == DC.ACCESS_SSH:
155         user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
156         host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
157         port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
158         agent = access_config.get_attribute_value(DC.USE_AGENT)
159         key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
160     return (root_dir, log_level, user, host, port, key, agent, environment_setup)
161
162 class AccessConfiguration(AttributesMap):
163     def __init__(self, params = None):
164         super(AccessConfiguration, self).__init__()
165         
166         from nepi.core.metadata import Metadata
167         
168         for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
169             self.add_attribute(**attr_info)
170         
171         if params:
172             for attr_name, attr_value in params.iteritems():
173                 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
174                 attr_value = parser(attr_value)
175                 self.set_attribute_value(attr_name, attr_value)
176
177 class TempDir(object):
178     def __init__(self):
179         self.path = tempfile.mkdtemp()
180     
181     def __del__(self):
182         shutil.rmtree(self.path)
183
184 class PermDir(object):
185     def __init__(self, path):
186         self.path = path
187
188 def create_experiment_controller(xml, access_config = None):
189     mode = None if not access_config \
190             else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
191     launch = True if not access_config \
192             else not access_config.get_attribute_value(DC.RECOVER)
193     if not mode or mode == DC.MODE_SINGLE_PROCESS:
194         from nepi.core.execute import ExperimentController
195         
196         if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
197             root_dir = TempDir()
198         else:
199             root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
200         controller = ExperimentController(xml, root_dir.path)
201         
202         # inject reference to temporary dir, so that it gets cleaned
203         # up at destruction time.
204         controller._tempdir = root_dir
205         
206         if not launch:
207             # try to recover
208             controller.recover()
209         
210         return controller
211     elif mode == DC.MODE_DAEMON:
212         (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
213                 get_access_config_params(access_config)
214         try:
215             return ExperimentControllerProxy(root_dir, log_level,
216                 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
217                 agent = agent, launch = launch,
218                 environment_setup = environment_setup)
219         except:
220             if not launch:
221                 # Maybe controller died, recover from persisted testbed information if possible
222                 controller = ExperimentControllerProxy(root_dir, log_level,
223                     experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
224                     agent = agent, launch = True,
225                     environment_setup = environment_setup)
226                 controller.recover()
227                 return controller
228             else:
229                 raise
230     raise RuntimeError("Unsupported access configuration '%s'" % mode)
231
232 def create_testbed_controller(testbed_id, testbed_version, access_config):
233     mode = None if not access_config \
234             else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
235     launch = True if not access_config \
236             else not access_config.get_attribute_value(DC.RECOVER)
237     if not mode or mode == DC.MODE_SINGLE_PROCESS:
238         if not launch:
239             raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
240         return  _build_testbed_controller(testbed_id, testbed_version)
241     elif mode == DC.MODE_DAEMON:
242         (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
243                 get_access_config_params(access_config)
244         return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, 
245                 testbed_version = testbed_version, host = host, port = port, ident_key = key,
246                 user = user, agent = agent, launch = launch,
247                 environment_setup = environment_setup)
248     raise RuntimeError("Unsupported access configuration '%s'" % mode)
249
250 def _build_testbed_controller(testbed_id, testbed_version):
251     mod_name = nepi.util.environ.find_testbed(testbed_id)
252     
253     if not mod_name in sys.modules:
254         try:
255             __import__(mod_name)
256         except ImportError:
257             raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
258     
259     module = sys.modules[mod_name]
260     tc = module.TestbedController()
261     if tc.testbed_version != testbed_version:
262         raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
263                 (testbed_id, testbed_version, tc.testbed_version))
264     return tc
265
266 # Just a namespace class
267 class Marshalling:
268     class Decoders:
269         @staticmethod
270         def pickled_data(sdata):
271             return cPickle.loads(base64.b64decode(sdata))
272         
273         @staticmethod
274         def base64_data(sdata):
275             return base64.b64decode(sdata)
276         
277         @staticmethod
278         def nullint(sdata):
279             return None if sdata == "None" else int(sdata)
280         
281         @staticmethod
282         def bool(sdata):
283             return sdata == 'True'
284         
285     class Encoders:
286         @staticmethod
287         def pickled_data(data):
288             return base64.b64encode(cPickle.dumps(data))
289         
290         @staticmethod
291         def base64_data(data):
292             return base64.b64encode(data)
293         
294         @staticmethod
295         def nullint(data):
296             return "None" if data is None else int(data)
297         
298         @staticmethod
299         def bool(data):
300             return str(bool(data))
301            
302     # import into Marshalling all the decoders
303     # they act as types
304     locals().update([
305         (typname, typ)
306         for typname, typ in vars(Decoders).iteritems()
307         if not typname.startswith('_')
308     ])
309
310     _TYPE_ENCODERS = dict([
311         # id(type) -> (<encoding_function>, <formatting_string>)
312         (typname, (getattr(Encoders,typname),"%s"))
313         for typname in vars(Decoders)
314         if not typname.startswith('_')
315            and hasattr(Encoders,typname)
316     ])
317
318     # Builtins
319     _TYPE_ENCODERS["float"] = (float, "%r")
320     _TYPE_ENCODERS["int"] = (int, "%d")
321     _TYPE_ENCODERS["long"] = (int, "%d")
322     _TYPE_ENCODERS["str"] = (str, "%s")
323     _TYPE_ENCODERS["unicode"] = (str, "%s")
324     
325     # Generic encoder
326     _TYPE_ENCODERS[None] = (str, "%s")
327     
328     @staticmethod
329     def args(*types):
330         """
331         Decorator that converts the given function into one that takes
332         a single "params" list, with each parameter marshalled according
333         to the given factory callable (type constructors are accepted).
334         
335         The first argument (self) is left untouched.
336         
337         eg:
338         
339         @Marshalling.args(int,int,str,base64_data)
340         def somefunc(self, someint, otherint, somestr, someb64):
341            return someretval
342         """
343         def decor(f):
344             @functools.wraps(f)
345             def rv(self, params):
346                 return f(self, *[ ctor(val)
347                                   for ctor,val in zip(types, params[1:]) ])
348             
349             rv._argtypes = types
350             
351             # Derive type encoders by looking up types in _TYPE_ENCODERS
352             # make_proxy will use it to encode arguments in command strings
353             argencoders = []
354             TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
355             for typ in types:
356                 if typ.__name__ in TYPE_ENCODERS:
357                     argencoders.append(TYPE_ENCODERS[typ.__name__])
358                 else:
359                     # generic encoder
360                     argencoders.append(TYPE_ENCODERS[None])
361             
362             rv._argencoders = tuple(argencoders)
363             
364             rv._retval = getattr(f, '_retval', None)
365             return rv
366         return decor
367
368     @staticmethod
369     def retval(typ=Decoders.base64_data):
370         """
371         Decorator that converts the given function into one that 
372         returns a properly encoded return string, given that the undecorated
373         function returns suitable input for the encoding function.
374         
375         The optional typ argument specifies a type.
376         For the default of base64_data, return values should be strings.
377         The return value of the encoding method should be a string always.
378         
379         eg:
380         
381         @Marshalling.args(int,int,str,base64_data)
382         @Marshalling.retval(str)
383         def somefunc(self, someint, otherint, somestr, someb64):
384            return someint
385         """
386         encode, fmt = Marshalling._TYPE_ENCODERS.get(
387             typ.__name__,
388             Marshalling._TYPE_ENCODERS[None])
389         fmt = "%d|"+fmt
390         
391         def decor(f):
392             @functools.wraps(f)
393             def rv(self, *p, **kw):
394                 data = f(self, *p, **kw)
395                 return fmt % (
396                     OK,
397                     encode(data)
398                 )
399             rv._retval = typ
400             rv._argtypes = getattr(f, '_argtypes', None)
401             rv._argencoders = getattr(f, '_argencoders', None)
402             return rv
403         return decor
404     
405     @staticmethod
406     def retvoid(f):
407         """
408         Decorator that converts the given function into one that 
409         always return an encoded empty string.
410         
411         Useful for null-returning functions.
412         """
413         OKRV = "%d|" % (OK,)
414         
415         @functools.wraps(f)
416         def rv(self, *p, **kw):
417             f(self, *p, **kw)
418             return OKRV
419         
420         rv._retval = None
421         rv._argtypes = getattr(f, '_argtypes', None)
422         rv._argencoders = getattr(f, '_argencoders', None)
423         return rv
424     
425     @staticmethod
426     def handles(whichcommand):
427         """
428         Associates the method with a given command code for servers.
429         It should always be the topmost decorator.
430         """
431         def decor(f):
432             f._handles_command = whichcommand
433             return f
434         return decor
435
436 class BaseServer(server.Server):
437     def reply_action(self, msg):
438         if not msg:
439             result = base64.b64encode("Invalid command line")
440             reply = "%d|%s" % (ERROR, result)
441         else:
442             params = msg.split("|")
443             instruction = int(params[0])
444             log_msg(self, params)
445             try:
446                 for mname,meth in vars(self.__class__).iteritems():
447                     if not mname.startswith('_'):
448                         cmd = getattr(meth, '_handles_command', None)
449                         if cmd == instruction:
450                             meth = getattr(self, mname)
451                             reply = meth(params)
452                             break
453                 else:
454                     error = "Invalid instruction %s" % instruction
455                     self.log_error(error)
456                     result = base64.b64encode(error)
457                     reply = "%d|%s" % (ERROR, result)
458             except:
459                 error = self.log_error()
460                 result = base64.b64encode(error)
461                 reply = "%d|%s" % (ERROR, result)
462         log_reply(self, reply)
463         return reply
464
465 class TestbedControllerServer(BaseServer):
466     def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
467         super(TestbedControllerServer, self).__init__(root_dir, log_level, 
468             environment_setup = environment_setup )
469         self._testbed_id = testbed_id
470         self._testbed_version = testbed_version
471         self._testbed = None
472
473     def post_daemonize(self):
474         self._testbed = _build_testbed_controller(self._testbed_id, 
475                 self._testbed_version)
476
477     @Marshalling.handles(GUIDS)
478     @Marshalling.args()
479     @Marshalling.retval( Marshalling.pickled_data )
480     def guids(self):
481         return self._testbed.guids
482
483     @Marshalling.handles(TESTBED_ID)
484     @Marshalling.args()
485     @Marshalling.retval()
486     def testbed_id(self):
487         return str(self._testbed.testbed_id)
488
489     @Marshalling.handles(TESTBED_VERSION)
490     @Marshalling.args()
491     @Marshalling.retval()
492     def testbed_version(self):
493         return str(self._testbed.testbed_version)
494
495     @Marshalling.handles(CREATE)
496     @Marshalling.args(int, str)
497     @Marshalling.retvoid
498     def defer_create(self, guid, factory_id):
499         self._testbed.defer_create(guid, factory_id)
500
501     @Marshalling.handles(TRACE)
502     @Marshalling.args(int, str, Marshalling.base64_data)
503     @Marshalling.retval()
504     def trace(self, guid, trace_id, attribute):
505         return self._testbed.trace(guid, trace_id, attribute)
506
507     @Marshalling.handles(TRACES_INFO)
508     @Marshalling.args()
509     @Marshalling.retval( Marshalling.pickled_data )
510     def traces_info(self):
511         return self._testbed.traces_info()
512
513     @Marshalling.handles(START)
514     @Marshalling.args()
515     @Marshalling.retvoid
516     def start(self):
517         self._testbed.start()
518
519     @Marshalling.handles(STOP)
520     @Marshalling.args()
521     @Marshalling.retvoid
522     def stop(self):
523         self._testbed.stop()
524
525     @Marshalling.handles(SHUTDOWN)
526     @Marshalling.args()
527     @Marshalling.retvoid
528     def shutdown(self):
529         self._testbed.shutdown()
530
531     @Marshalling.handles(CONFIGURE)
532     @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
533     @Marshalling.retvoid
534     def defer_configure(self, name, value):
535         self._testbed.defer_configure(name, value)
536
537     @Marshalling.handles(CREATE_SET)
538     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
539     @Marshalling.retvoid
540     def defer_create_set(self, guid, name, value):
541         self._testbed.defer_create_set(guid, name, value)
542
543     @Marshalling.handles(FACTORY_SET)
544     @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
545     @Marshalling.retvoid
546     def defer_factory_set(self, name, value):
547         self._testbed.defer_factory_set(name, value)
548
549     @Marshalling.handles(CONNECT)
550     @Marshalling.args(int, str, int, str)
551     @Marshalling.retvoid
552     def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
553         self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
554             connector_type_name2)
555
556     @Marshalling.handles(CROSS_CONNECT)
557     @Marshalling.args(int, str, int, int, str, str, str)
558     @Marshalling.retvoid
559     def defer_cross_connect(self, 
560             guid, connector_type_name,
561             cross_guid, cross_testbed_guid,
562             cross_testbed_id, cross_factory_id,
563             cross_connector_type_name):
564         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
565             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
566             cross_connector_type_name)
567
568     @Marshalling.handles(ADD_TRACE)
569     @Marshalling.args(int, str)
570     @Marshalling.retvoid
571     def defer_add_trace(self, guid, trace_id):
572         self._testbed.defer_add_trace(guid, trace_id)
573
574     @Marshalling.handles(ADD_ADDRESS)
575     @Marshalling.args(int, str, int, Marshalling.pickled_data)
576     @Marshalling.retvoid
577     def defer_add_address(self, guid, address, netprefix, broadcast):
578         self._testbed.defer_add_address(guid, address, netprefix,
579                 broadcast)
580
581     @Marshalling.handles(ADD_ROUTE)
582     @Marshalling.args(int, str, int, str, int)
583     @Marshalling.retvoid
584     def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
585         self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
586
587     @Marshalling.handles(DO_SETUP)
588     @Marshalling.args()
589     @Marshalling.retvoid
590     def do_setup(self):
591         self._testbed.do_setup()
592
593     @Marshalling.handles(DO_CREATE)
594     @Marshalling.args()
595     @Marshalling.retvoid
596     def do_create(self):
597         self._testbed.do_create()
598
599     @Marshalling.handles(DO_CONNECT_INIT)
600     @Marshalling.args()
601     @Marshalling.retvoid
602     def do_connect_init(self):
603         self._testbed.do_connect_init()
604
605     @Marshalling.handles(DO_CONNECT_COMPL)
606     @Marshalling.args()
607     @Marshalling.retvoid
608     def do_connect_compl(self):
609         self._testbed.do_connect_compl()
610
611     @Marshalling.handles(DO_CONFIGURE)
612     @Marshalling.args()
613     @Marshalling.retvoid
614     def do_configure(self):
615         self._testbed.do_configure()
616
617     @Marshalling.handles(DO_PRECONFIGURE)
618     @Marshalling.args()
619     @Marshalling.retvoid
620     def do_preconfigure(self):
621         self._testbed.do_preconfigure()
622
623     @Marshalling.handles(DO_PRESTART)
624     @Marshalling.args()
625     @Marshalling.retvoid
626     def do_prestart(self):
627         self._testbed.do_prestart()
628
629     @Marshalling.handles(DO_CROSS_CONNECT_INIT)
630     @Marshalling.args( Marshalling.Decoders.pickled_data )
631     @Marshalling.retvoid
632     def do_cross_connect_init(self, cross_data):
633         self._testbed.do_cross_connect_init(cross_data)
634
635     @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
636     @Marshalling.args( Marshalling.Decoders.pickled_data )
637     @Marshalling.retvoid
638     def do_cross_connect_compl(self, cross_data):
639         self._testbed.do_cross_connect_compl(cross_data)
640
641     @Marshalling.handles(GET)
642     @Marshalling.args(int, Marshalling.base64_data, str)
643     @Marshalling.retval( Marshalling.pickled_data )
644     def get(self, guid, name, time):
645         return self._testbed.get(guid, name, time)
646
647     @Marshalling.handles(SET)
648     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
649     @Marshalling.retvoid
650     def set(self, guid, name, value, time):
651         self._testbed.set(guid, name, value, time)
652
653     @Marshalling.handles(GET_ADDRESS)
654     @Marshalling.args(int, int, Marshalling.base64_data)
655     @Marshalling.retval()
656     def get_address(self, guid, index, attribute):
657         return str(self._testbed.get_address(guid, index, attribute))
658
659     @Marshalling.handles(GET_ROUTE)
660     @Marshalling.args(int, int, Marshalling.base64_data)
661     @Marshalling.retval()
662     def get_route(self, guid, index, attribute):
663         return str(self._testbed.get_route(guid, index, attribute))
664
665     @Marshalling.handles(ACTION)
666     @Marshalling.args(str, int, Marshalling.base64_data)
667     @Marshalling.retvoid
668     def action(self, time, guid, command):
669         self._testbed.action(time, guid, command)
670
671     @Marshalling.handles(STATUS)
672     @Marshalling.args(Marshalling.nullint)
673     @Marshalling.retval(int)
674     def status(self, guid):
675         return self._testbed.status(guid)
676
677     @Marshalling.handles(GET_ATTRIBUTE_LIST)
678     @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
679     @Marshalling.retval( Marshalling.pickled_data )
680     def get_attribute_list(self, guid, filter_flags = None, exclude = False):
681         return self._testbed.get_attribute_list(guid, filter_flags, exclude)
682
683     @Marshalling.handles(GET_FACTORY_ID)
684     @Marshalling.args(int)
685     @Marshalling.retval()
686     def get_factory_id(self, guid):
687         return self._testbed.get_factory_id(guid)
688
689 class ExperimentControllerServer(BaseServer):
690     def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
691         super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
692             environment_setup = environment_setup )
693         self._experiment_xml = experiment_xml
694         self._experiment = None
695
696     def post_daemonize(self):
697         from nepi.core.execute import ExperimentController
698         self._experiment = ExperimentController(self._experiment_xml, 
699             root_dir = self._root_dir)
700
701     @Marshalling.handles(GUIDS)
702     @Marshalling.args()
703     @Marshalling.retval( Marshalling.pickled_data )
704     def guids(self):
705         return self._experiment.guids
706
707     @Marshalling.handles(XML)
708     @Marshalling.args()
709     @Marshalling.retval()
710     def experiment_design_xml(self):
711         return self._experiment.experiment_design_xml
712         
713     @Marshalling.handles(EXEC_XML)
714     @Marshalling.args()
715     @Marshalling.retval()
716     def experiment_execute_xml(self):
717         return self._experiment.experiment_execute_xml
718         
719     @Marshalling.handles(TRACE)
720     @Marshalling.args(int, str, Marshalling.base64_data)
721     @Marshalling.retval()
722     def trace(self, guid, trace_id, attribute):
723         return str(self._experiment.trace(guid, trace_id, attribute))
724
725     @Marshalling.handles(TRACES_INFO)
726     @Marshalling.args()
727     @Marshalling.retval( Marshalling.pickled_data )
728     def traces_info(self):
729         return self._experiment.traces_info()
730
731     @Marshalling.handles(FINISHED)
732     @Marshalling.args(int)
733     @Marshalling.retval(Marshalling.bool)
734     def is_finished(self, guid):
735         return self._experiment.is_finished(guid)
736
737     @Marshalling.handles(GET)
738     @Marshalling.args(int, Marshalling.base64_data, str)
739     @Marshalling.retval( Marshalling.pickled_data )
740     def get(self, guid, name, time):
741         return self._experiment.get(guid, name, time)
742
743     @Marshalling.handles(SET)
744     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
745     @Marshalling.retvoid
746     def set(self, guid, name, value, time):
747         self._experiment.set(guid, name, value, time)
748
749     @Marshalling.handles(START)
750     @Marshalling.args()
751     @Marshalling.retvoid
752     def start(self):
753         self._experiment.start()
754
755     @Marshalling.handles(STOP)
756     @Marshalling.args()
757     @Marshalling.retvoid
758     def stop(self):
759         self._experiment.stop()
760
761     @Marshalling.handles(RECOVER)
762     @Marshalling.args()
763     @Marshalling.retvoid
764     def recover(self):
765         self._experiment.recover()
766
767     @Marshalling.handles(SHUTDOWN)
768     @Marshalling.args()
769     @Marshalling.retvoid
770     def shutdown(self):
771         self._experiment.shutdown()
772
773     @Marshalling.handles(GET_TESTBED_ID)
774     @Marshalling.args(int)
775     @Marshalling.retval()
776     def get_testbed_id(self, guid):
777         return self._experiment.get_testbed_id(guid)
778
779     @Marshalling.handles(GET_FACTORY_ID)
780     @Marshalling.args(int)
781     @Marshalling.retval()
782     def get_factory_id(self, guid):
783         return self._experiment.get_factory_id(guid)
784
785     @Marshalling.handles(GET_TESTBED_VERSION)
786     @Marshalling.args(int)
787     @Marshalling.retval()
788     def get_testbed_version(self, guid):
789         return self._experiment.get_testbed_version(guid)
790
791 class BaseProxy(object):
792     _ServerClass = None
793     _ServerClassModule = "nepi.util.proxy"
794     
795     def __init__(self, 
796             ctor_args, root_dir, 
797             launch = True, host = None, 
798             port = None, user = None, ident_key = None, agent = None,
799             environment_setup = ""):
800         if launch:
801             # ssh
802             if host != None:
803                 python_code = (
804                     "from %(classmodule)s import %(classname)s;"
805                     "s = %(classname)s%(ctor_args)r;"
806                     "s.run()" 
807                 % dict(
808                     classname = self._ServerClass.__name__,
809                     classmodule = self._ServerClassModule,
810                     ctor_args = ctor_args
811                 ) )
812                 proc = server.popen_ssh_subprocess(python_code, host = host,
813                     port = port, user = user, agent = agent,
814                     ident_key = ident_key,
815                     environment_setup = environment_setup,
816                     waitcommand = True)
817                 if proc.poll():
818                     err = proc.stderr.read()
819                     raise RuntimeError, "Server could not be executed: %s" % (err,)
820             else:
821                 # launch daemon
822                 s = self._ServerClass(*ctor_args)
823                 s.run()
824
825         # connect client to server
826         self._client = server.Client(root_dir, host = host, port = port, 
827                 user = user, agent = agent, 
828                 environment_setup = environment_setup)
829     
830     @staticmethod
831     def _make_message(argtypes, argencoders, command, methname, classname, *args):
832         if len(argtypes) != len(argencoders):
833             raise ValueError, "Invalid arguments for _make_message: "\
834                 "in stub method %s of class %s "\
835                 "argtypes and argencoders must match in size" % (
836                     methname, classname )
837         if len(argtypes) != len(args):
838             raise ValueError, "Invalid arguments for _make_message: "\
839                 "in stub method %s of class %s "\
840                 "expected %d arguments, got %d" % (
841                     methname, classname,
842                     len(argtypes), len(args))
843         
844         buf = []
845         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
846             try:
847                 buf.append(fmt % encode(val))
848             except:
849                 import traceback
850                 raise TypeError, "Argument %d of stub method %s of class %s "\
851                     "requires a value of type %s, but got %s - nested error: %s" % (
852                         argnum, methname, classname,
853                         getattr(typ, '__name__', typ), type(val),
854                         traceback.format_exc()
855                 )
856         
857         return "%d|%s" % (command, '|'.join(buf))
858     
859     @staticmethod
860     def _parse_reply(rvtype, methname, classname, reply):
861         if not reply:
862             raise RuntimeError, "Invalid reply: %r "\
863                 "for stub method %s of class %s" % (
864                     reply,
865                     methname,
866                     classname)
867         
868         try:
869             result = reply.split("|")
870             code = int(result[0])
871             text = result[1]
872         except:
873             import traceback
874             raise TypeError, "Return value of stub method %s of class %s "\
875                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
876                     methname, classname,
877                     getattr(rvtype, '__name__', rvtype), reply,
878                     traceback.format_exc()
879             )
880         if code == ERROR:
881             text = base64.b64decode(text)
882             raise RuntimeError(text)
883         elif code == OK:
884             try:
885                 if rvtype is None:
886                     return
887                 else:
888                     return rvtype(text)
889             except:
890                 import traceback
891                 raise TypeError, "Return value of stub method %s of class %s "\
892                     "cannot be parsed: must be of type %s - nested error: %s" % (
893                         methname, classname,
894                         getattr(rvtype, '__name__', rvtype),
895                         traceback.format_exc()
896                 )
897         else:
898             raise RuntimeError, "Invalid reply: %r "\
899                 "for stub method %s of class %s - unknown code" % (
900                     reply,
901                     methname,
902                     classname)
903     
904     @staticmethod
905     def _make_stubs(server_class, template_class):
906         """
907         Returns a dictionary method_name -> method
908         with stub methods.
909         
910         Usage:
911         
912             class SomeProxy(BaseProxy):
913                ...
914                
915                locals().update( BaseProxy._make_stubs(
916                     ServerClass,
917                     TemplateClass
918                ) )
919         
920         ServerClass is the corresponding Server class, as
921         specified in the _ServerClass class method (_make_stubs
922         is static and can't access the method), and TemplateClass
923         is the ultimate implementation class behind the server,
924         from which argument names and defaults are taken, to
925         maintain meaningful interfaces.
926         """
927         rv = {}
928         
929         class NONE: pass
930         
931         import os.path
932         func_template_path = os.path.join(
933             os.path.dirname(__file__),
934             'proxy_stub.tpl')
935         func_template_file = open(func_template_path, "r")
936         func_template = func_template_file.read()
937         func_template_file.close()
938         
939         for methname in vars(template_class).copy():
940             if methname.endswith('_deferred'):
941                 # cannot wrap deferreds...
942                 continue
943             dmethname = methname+'_deferred'
944             if hasattr(server_class, methname) and not methname.startswith('_'):
945                 template_meth = getattr(template_class, methname)
946                 server_meth = getattr(server_class, methname)
947                 
948                 command = getattr(server_meth, '_handles_command', None)
949                 argtypes = getattr(server_meth, '_argtypes', None)
950                 argencoders = getattr(server_meth, '_argencoders', None)
951                 rvtype = getattr(server_meth, '_retval', None)
952                 doprop = False
953                 
954                 if hasattr(template_meth, 'fget'):
955                     # property getter
956                     template_meth = template_meth.fget
957                     doprop = True
958                 
959                 if command is not None and argtypes is not None and argencoders is not None:
960                     # We have an interface method...
961                     code = template_meth.func_code
962                     argnames = code.co_varnames[:code.co_argcount]
963                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
964                                   + (template_meth.func_defaults or ()) )
965                     
966                     func_globals = dict(
967                         BaseProxy = BaseProxy,
968                         argtypes = argtypes,
969                         argencoders = argencoders,
970                         rvtype = rvtype,
971                         functools = functools,
972                     )
973                     context = dict()
974                     
975                     func_text = func_template % dict(
976                         self = argnames[0],
977                         args = '%s' % (','.join(argnames[1:])),
978                         argdefs = ','.join([
979                             argname if argdef is NONE
980                             else "%s=%r" % (argname, argdef)
981                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
982                         ]),
983                         command = command,
984                         methname = methname,
985                         classname = server_class.__name__
986                     )
987                     
988                     func_text = compile(
989                         func_text,
990                         func_template_path,
991                         'exec')
992                     
993                     exec func_text in func_globals, context
994                     
995                     if doprop:
996                         rv[methname] = property(context[methname])
997                         rv[dmethname] = property(context[dmethname])
998                     else:
999                         rv[methname] = context[methname]
1000                         rv[dmethname] = context[dmethname]
1001                     
1002                     # inject _deferred into core classes
1003                     if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1004                         def freezename(methname, dmethname):
1005                             def dmeth(self, *p, **kw): 
1006                                 return getattr(self, methname)(*p, **kw)
1007                             dmeth.__name__ = dmethname
1008                             return dmeth
1009                         dmeth = freezename(methname, dmethname)
1010                         setattr(template_class, dmethname, dmeth)
1011         
1012         return rv
1013                         
1014 class TestbedControllerProxy(BaseProxy):
1015     
1016     _ServerClass = TestbedControllerServer
1017     
1018     def __init__(self, root_dir, log_level, testbed_id = None, 
1019             testbed_version = None, launch = True, host = None, 
1020             port = None, user = None, ident_key = None, agent = None,
1021             environment_setup = ""):
1022         if launch and (testbed_id == None or testbed_version == None):
1023             raise RuntimeError("To launch a TesbedControllerServer a "
1024                     "testbed_id and testbed_version are required")
1025         super(TestbedControllerProxy,self).__init__(
1026             ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
1027             root_dir = root_dir,
1028             launch = launch, host = host, port = port, user = user,
1029             ident_key = ident_key, agent = agent, 
1030             environment_setup = environment_setup)
1031
1032     locals().update( BaseProxy._make_stubs(
1033         server_class = TestbedControllerServer,
1034         template_class = nepi.core.execute.TestbedController,
1035     ) )
1036     
1037     # Shutdown stops the serverside...
1038     def shutdown(self, _stub = shutdown):
1039         rv = _stub(self)
1040         self._client.send_stop()
1041         self._client.read_reply() # wait for it
1042         return rv
1043     
1044
1045 class ExperimentControllerProxy(BaseProxy):
1046     _ServerClass = ExperimentControllerServer
1047     
1048     def __init__(self, root_dir, log_level, experiment_xml = None, 
1049             launch = True, host = None, port = None, user = None, 
1050             ident_key = None, agent = None, environment_setup = ""):
1051         super(ExperimentControllerProxy,self).__init__(
1052             ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
1053             root_dir = root_dir,
1054             launch = launch, host = host, port = port, user = user,
1055             ident_key = ident_key, agent = agent, 
1056             environment_setup = environment_setup)
1057
1058     locals().update( BaseProxy._make_stubs(
1059         server_class = ExperimentControllerServer,
1060         template_class = nepi.core.execute.ExperimentController,
1061     ) )
1062
1063     
1064     # Shutdown stops the serverside...
1065     def shutdown(self, _stub = shutdown):
1066         rv = _stub(self)
1067         self._client.send_stop()
1068         self._client.read_reply() # wait for it
1069         return rv
1070